-- Call specification that publishes the Java static method
-- com.samtrest.ParallelRunner.run_parallel to PL/SQL.
--   pi_Select_Txt     : text of the query whose rows are turned into procedure calls
--   pi_Proc_Name      : name of the procedure invoked once per fetched row
--   pi_Parallel_Count : number of worker threads, passed as text (parsed with Integer.parseInt on the Java side)
--   Pi_Password       : password handed to the Java side
-- Returns the string produced by the Java method: 'OK' on success, otherwise the
-- message of the exception that was caught there.
FUNCTION run_pipe_parallel(
    pi_Select_Txt     IN VARCHAR2,
    pi_Proc_Name      IN VARCHAR2,
    pi_Parallel_Count IN VARCHAR2,
    Pi_Password       IN VARCHAR2
) RETURN VARCHAR2
IS LANGUAGE JAVA
NAME 'com.samtrest.ParallelRunner.run_parallel(java.lang.String, java.lang.String,java.lang.String, java.lang.String) return java.lang.String';
-- Create table
-- Staging table for the generated "begin <procedure>(...); end;" call strings.
-- The Java runner inserts into it and its workers read it back when the number of
-- rows to process exceeds the runner's CHUNK_LIMIT. ON COMMIT PRESERVE ROWS keeps the
-- staged rows available after the runner commits them.
create global temporary table parallel_calls_tmp
(
  call_str varchar2(4000)
)
on commit preserve rows;
/**
 * Static entry point published to PL/SQL: builds a ParallelRunner and runs it.
 *
 * @param selectTxt     query whose rows are processed
 * @param procedureName procedure invoked for each row
 * @param threadCount   number of worker threads, as decimal text
 * @param password      password forwarded to the runner
 * @return "OK" when runProc() completes, otherwise the message of the
 *         SQLException or ClassNotFoundException it raised
 * @throws NumberFormatException if threadCount is not a valid integer (it is parsed
 *         before the try block, so this is not converted into a return value)
 */
public static String run_parallel(String selectTxt, String procedureName,
                                  String threadCount, String password)
        throws NumberFormatException, SQLException, ClassNotFoundException {
    final int workers = Integer.parseInt(threadCount);
    ParallelRunner runner = new ParallelRunner(selectTxt, procedureName, workers, password);

    String status;
    try {
        runner.runProc();
        status = "OK";
    } catch (SQLException sqlFailure) {
        sqlFailure.printStackTrace();
        status = sqlFailure.getMessage();
    } catch (ClassNotFoundException driverMissing) {
        driverMissing.printStackTrace();
        status = driverMissing.getMessage();
    }
    return status;
}
// Excerpt of statements (no enclosing method header is shown here).
// Execute the prepared query and record the JDBC type code of every result column,
// so that each fetched value can later be rendered as literal text.
res = stm.executeQuery(); ResultSetMetaData meta = res.getMetaData(); columnCount = meta.getColumnCount(); int [] types = new int[columnCount]; for (int k = 0; k < columnCount; k++) { types[k] = meta.getColumnType(k+1); }
// For each fetched row, build the text "begin <procedureName>(<processSeq>,<j+1>,<chunkCount>,<col 1>,...); end;".
// Rendering per column: VARCHAR (or type code 1) -> value wrapped in single quotes;
// NUMERIC -> BigDecimal.toString(); DATE/TIMESTAMP -> to_date('...','yyyy-mm-dd hh24:mi:ss')
// using the Timestamp text up to the '.'; a SQL NULL in any of these -> the word null.
// NOTE(review): string values are quoted without doubling embedded apostrophes, and a column
// of any other type only prints its type code and contributes an empty argument — confirm
// that callers never select such data.
// The closing brace of this while loop is not part of this excerpt.
while (res.next()){ callStr = "begin "+procedureName+"("+processSeq+","+(j+1)+","+chunkCount; for (int k = 0; k < columnCount; k++) { callStr = callStr+","; String value = ""; if ( types[k] == java.sql.Types.VARCHAR || types[k] == 1){ value = res.getString(k+1); if (value == null){ value = "null"; }else{ value = "'"+value+"'"; } }else if (types[k] == java.sql.Types.NUMERIC){ BigDecimal number = res.getBigDecimal(k+1); if (number == null){ value = "null"; }else{ value = number.toString(); } }else if (types[k] == java.sql.Types.DATE || types[k] == java.sql.Types.TIMESTAMP){ Timestamp date = res.getTimestamp(k+1); if (date == null){ value = "null"; }else{ value = "to_date('"+date.toString().substring(0,date.toString().indexOf('.'))+ "','yyyy-mm-dd hh24:mi:ss')"; } }else{ System.out.println(""+types[k]); } callStr = callStr + value; } callStr = callStr + "); end;";
// When rowCount exceeds CHUNK_LIMIT the call text is written through the prepared `insert`
// statement; otherwise it is stored in the in-memory chunks array at position [i][j].
if (rowCount > CHUNK_LIMIT){ insert.setString(1, callStr); insert.executeUpdate(); }else{ chunks[i][j] = callStr; }
create or replace and compile java source named "ParallelRunner" as
package com.samtrest;

import java.math.BigDecimal;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;

/**
 * Runs a stored procedure once per row of a query, spreading the rows over a fixed
 * number of worker threads that each own a separate JDBC connection.
 *
 * Flow: count the rows of the query, split them into threadCount consecutive chunks,
 * turn every row into the text of an anonymous block
 * "begin <procedure>(<processSeq>,<row in chunk>,<chunk length>,<column values...>); end;",
 * and hand each completed chunk to a ChunkRunner thread. Up to CHUNK_LIMIT rows the
 * call texts are kept in memory; above it they are staged in table parallel_calls_tmp
 * through the worker's own connection and read back by the worker.
 */
public class ParallelRunner {
    /** Stages one generated call text for the worker that owns the connection. */
    private static final String INSERT_CALL_SQL = "insert into parallel_calls_tmp values (?)";

    String selectTxt;
    String procedureName;
    String tableName = "";
    String additional;
    // NOTE(review): processSeq is never assigned in this class, so it is always 0 in the
    // generated calls and module names — confirm whether a real sequence value was intended.
    int threadCount, processSeq;
    String host = "127.0.0.1", instance, port = "1521", userName, password;
    Connection[] connPool;
    Connection defaultConn;
    ChunkRunner[] chunkRunners;
    String[][] chunks;
    int CHUNK_LIMIT = 200000;

    /**
     * @param selectTxt     query whose rows are processed
     * @param procedureName procedure called for every row
     * @param threadCount   number of worker threads / connections; must be at least 1
     * @param psw           password used to open the worker connections
     * @throws SQLException if threadCount is smaller than 1
     */
    public ParallelRunner(String selectTxt, String procedureName, int threadCount, String psw)
            throws SQLException, ClassNotFoundException {
        super();
        if (threadCount < 1) {
            // Fail with a readable message instead of a later division by zero
            // or NegativeArraySizeException.
            throw new SQLException("Parallel thread count must be at least 1, got " + threadCount);
        }
        this.selectTxt = selectTxt;
        this.procedureName = procedureName;
        this.threadCount = threadCount;
        this.port = "1521";
        this.password = psw;
        connPool = new Connection[threadCount];
        chunkRunners = new ChunkRunner[threadCount];
    }

    /**
     * Static entry point published to PL/SQL.
     *
     * @return "OK" when the run completes, otherwise the message of the
     *         SQLException or ClassNotFoundException raised by runProc()
     * @throws NumberFormatException if threadCount is not a valid integer
     */
    public static String run_parallel(String selectTxt, String procedureName, String threadCount, String psw)
            throws NumberFormatException, SQLException, ClassNotFoundException {
        String rc = "OK";
        ParallelRunner parallelRunner =
                new ParallelRunner(selectTxt, procedureName, Integer.parseInt(threadCount), psw);
        try {
            parallelRunner.runProc();
        } catch (SQLException e) {
            e.printStackTrace();
            rc = e.getMessage();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            rc = e.getMessage();
        }
        return rc;
    }

    /**
     * Opens the default (caller's session) connection, reads the session user and
     * database name from it, and opens one thin-driver connection per worker with
     * auto-commit switched off.
     */
    public void populateConnectionPool() throws SQLException, ClassNotFoundException {
        Class.forName("oracle.jdbc.driver.OracleDriver");
        defaultConn = DriverManager.getConnection("jdbc:default:connection:");

        userName = queryFirstString("SELECT SYS_CONTEXT('USERENV','SESSION_USER') from dual");
        instance = queryFirstString("SELECT SYS_CONTEXT('USERENV','DB_NAME') from dual");

        String url = "jdbc:oracle:thin:@" + host + ":" + port + ":" + instance;
        for (int i = 0; i < connPool.length; i++) {
            connPool[i] = DriverManager.getConnection(url, userName, password);
            connPool[i].setAutoCommit(false);
            PreparedStatement tag = connPool[i].prepareStatement(
                    "begin dbms_application_info.set_module('Java Parallel; process:'||"
                    + processSeq + ",'" + connPool.length + " threads'); end;");
            try {
                tag.executeUpdate();
            } finally {
                tag.close();
            }
        }
    }

    /**
     * Distributes the rows of the query over the workers, waits for all of them and
     * releases every connection.
     *
     * @throws SQLException on any database failure in this thread, or carrying the
     *         error message of the first worker that failed
     */
    public void runProc() throws SQLException, ClassNotFoundException {
        try {
            populateConnectionPool();
            dispatchRows();
        } finally {
            // Always wait for workers that were started before closing their connections.
            joinRunners();
            closeConnections();
        }
        for (int k = 0; k < chunkRunners.length; k++) {
            if (chunkRunners[k] != null && !"".equals(chunkRunners[k].errorMsg)) {
                throw new SQLException(chunkRunners[k].errorMsg);
            }
        }
    }

    /** Returns the first column of the first row of a query on the default connection, or null. */
    private String queryFirstString(String sql) throws SQLException {
        PreparedStatement stm = defaultConn.prepareStatement(sql);
        try {
            ResultSet res = stm.executeQuery();
            try {
                return res.next() ? res.getString(1) : null;
            } finally {
                res.close();
            }
        } finally {
            stm.close();
        }
    }

    /** Counts the rows the caller's query returns. */
    private int countRows() throws SQLException {
        PreparedStatement stm = defaultConn.prepareStatement("select count(*) from (" + selectTxt + ")");
        try {
            ResultSet res = stm.executeQuery();
            try {
                return res.next() ? res.getInt(1) : 0;
            } finally {
                res.close();
            }
        } finally {
            stm.close();
        }
    }

    /** Reads the query, builds one call per row and starts a worker for every completed chunk. */
    private void dispatchRows() throws SQLException {
        Statement info = defaultConn.createStatement();
        try {
            info.executeUpdate("begin dbms_application_info.set_module('Parallel process:'||"
                    + processSeq + ",'" + threadCount + " threads'); end;");
            System.out.println(selectTxt);

            int rowCount = countRows();
            // Chunk length = ceil(rowCount / threadCount).
            int chunkCount = rowCount / threadCount;
            if (chunkCount * threadCount < rowCount) {
                chunkCount++;
            }
            boolean staged = rowCount > CHUNK_LIMIT;

            info.executeUpdate("begin dbms_application_info.set_action('"
                    + connPool.length + "," + rowCount + "," + chunkCount + "'); end;");

            PreparedStatement stm = defaultConn.prepareStatement(selectTxt);
            ResultSet res = null;
            PreparedStatement insert = null;
            try {
                res = stm.executeQuery();
                // Validates the column types before any worker is started.
                int[] types = readColumnTypes(res.getMetaData());

                int i = 0; // index of the worker currently being filled
                int j = 0; // number of rows already placed in that worker's chunk
                if (staged) {
                    insert = connPool[i].prepareStatement(INSERT_CALL_SQL);
                } else {
                    chunks = new String[threadCount][chunkCount];
                }

                while (res.next()) {
                    if (i >= connPool.length || j >= chunkCount) {
                        // The data changed between the count and the fetch; there is no chunk left.
                        throw new SQLException("Query returned more rows than the " + rowCount
                                + " counted before distribution");
                    }
                    String callStr = buildCall(res, types, j + 1, chunkCount);
                    if (i == 0 && j == 0) {
                        System.out.println(callStr); // sample of the generated call
                    }
                    if (staged) {
                        insert.setString(1, callStr);
                        insert.executeUpdate();
                    } else {
                        chunks[i][j] = callStr;
                    }
                    j++;
                    if (j == chunkCount) {
                        // Release the staging statement before the worker takes over the connection.
                        closeStatement(insert);
                        insert = null;
                        startRunner(i, staged);
                        i++;
                        if (i < connPool.length) {
                            if (staged) {
                                insert = connPool[i].prepareStatement(INSERT_CALL_SQL);
                            }
                            j = 0;
                        }
                    }
                    info.executeUpdate("begin dbms_application_info.set_action('"
                            + connPool.length + "," + rowCount + "," + chunkCount
                            + " threads " + i + "," + j + "'); end;");
                }

                // Trailing, partially filled chunk. When the last chunk was exactly full,
                // i equals connPool.length and there is nothing left to start.
                if (i < connPool.length && j > 0) {
                    closeStatement(insert);
                    insert = null;
                    startRunner(i, staged);
                }
            } finally {
                closeStatement(insert);
                closeResultSet(res);
                closeStatement(stm);
            }
        } finally {
            closeStatement(info);
        }
    }

    /** Commits the worker's connection (making staged rows durable) and starts its thread. */
    private void startRunner(int idx, boolean staged) throws SQLException {
        connPool[idx].commit();
        if (staged) {
            chunkRunners[idx] = new ChunkRunner(connPool[idx], processSeq);
        } else {
            chunkRunners[idx] = new ChunkRunner(connPool[idx], processSeq, chunks[idx]);
        }
        chunkRunners[idx].start();
    }

    /** Returns the JDBC type code of every column; rejects types that cannot be rendered. */
    private static int[] readColumnTypes(ResultSetMetaData meta) throws SQLException {
        int[] types = new int[meta.getColumnCount()];
        for (int k = 0; k < types.length; k++) {
            types[k] = meta.getColumnType(k + 1);
            if (!isSupportedType(types[k])) {
                throw new SQLException("Unsupported JDBC type " + types[k] + " in column "
                        + (k + 1) + " (" + meta.getColumnName(k + 1) + ")");
            }
        }
        return types;
    }

    private static boolean isSupportedType(int type) {
        return type == java.sql.Types.VARCHAR
                || type == java.sql.Types.CHAR
                || type == java.sql.Types.NUMERIC
                || type == java.sql.Types.DATE
                || type == java.sql.Types.TIMESTAMP;
    }

    /** Builds "begin proc(processSeq,rowInChunk,chunkCount,col1,...); end;" for the current row. */
    private String buildCall(ResultSet res, int[] types, int rowInChunk, int chunkCount) throws SQLException {
        StringBuilder call = new StringBuilder("begin ");
        call.append(procedureName).append("(")
            .append(processSeq).append(",")
            .append(rowInChunk).append(",")
            .append(chunkCount);
        for (int k = 0; k < types.length; k++) {
            call.append(",").append(sqlLiteral(res, k + 1, types[k]));
        }
        call.append("); end;");
        return call.toString();
    }

    /** Renders one column of the current row as a PL/SQL literal; SQL NULL becomes "null". */
    private static String sqlLiteral(ResultSet res, int column, int type) throws SQLException {
        if (type == java.sql.Types.VARCHAR || type == java.sql.Types.CHAR) {
            String text = res.getString(column);
            // Double embedded apostrophes so the value cannot terminate the literal.
            return text == null ? "null" : "'" + text.replace("'", "''") + "'";
        }
        if (type == java.sql.Types.NUMERIC) {
            BigDecimal number = res.getBigDecimal(column);
            return number == null ? "null" : number.toString();
        }
        if (type == java.sql.Types.DATE || type == java.sql.Types.TIMESTAMP) {
            Timestamp date = res.getTimestamp(column);
            if (date == null) {
                return "null";
            }
            // Timestamp.toString() is "yyyy-mm-dd hh:mm:ss.fffffffff"; drop the fraction.
            String text = date.toString();
            return "to_date('" + text.substring(0, text.indexOf('.')) + "','yyyy-mm-dd hh24:mi:ss')";
        }
        throw new SQLException("Unsupported JDBC type " + type + " in column " + column);
    }

    /** Waits for every worker that was started. */
    private void joinRunners() {
        for (int k = 0; k < chunkRunners.length; k++) {
            if (chunkRunners[k] != null) {
                try {
                    chunkRunners[k].join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /** Closes every pool connection (also those that never got a worker) and the default connection. */
    private void closeConnections() {
        for (int k = 0; k < connPool.length; k++) {
            closeConnection(connPool[k]);
        }
        closeConnection(defaultConn);
    }

    // Best-effort cleanup helpers: a failure to close must not hide the primary error.
    private static void closeStatement(Statement statement) {
        if (statement != null) {
            try {
                statement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    private static void closeResultSet(ResultSet resultSet) {
        if (resultSet != null) {
            try {
                resultSet.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    private static void closeConnection(Connection connection) {
        if (connection != null) {
            try {
                if (!connection.isClosed()) {
                    connection.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Worker thread: executes its call texts one after another on its own connection,
     * then commits and closes that connection. The first SQL error stops the worker
     * and is recorded in errorMsg ("" means no error).
     */
    public class ChunkRunner extends Thread {
        Connection conn;
        String errorMsg = "";
        String[] chunk;
        String internal;
        int processSeq;

        /** Worker that reads its call texts from parallel_calls_tmp (chunk stays null). */
        public ChunkRunner(Connection conn, int process) {
            super();
            this.conn = conn;
            this.processSeq = process;
        }

        /** Worker that executes the call texts held in memory; null entries are skipped. */
        public ChunkRunner(Connection conn, int process, String[] chunk) {
            super();
            this.conn = conn;
            this.chunk = chunk;
            this.processSeq = process;
        }

        public ChunkRunner(Connection conn, int process, String inter) {
            super();
            this.conn = conn;
            this.processSeq = process;
            this.internal = inter;
        }

        public void run() {
            Statement stm = null;
            PreparedStatement select = null;
            ResultSet res = null;
            String stmt = "";
            try {
                stm = conn.createStatement();
                if ("".equals(tableName)) {
                    if (chunk == null) {
                        select = conn.prepareStatement("select * from parallel_calls_tmp");
                        res = select.executeQuery();
                        while (res.next()) {
                            stmt = res.getString(1);
                            if (stmt != null && !"".equals(stmt)) {
                                stm.executeUpdate(stmt);
                            }
                        }
                    } else {
                        for (int i = 0; i < chunk.length; i++) {
                            stmt = chunk[i];
                            if (stmt != null && !"".equals(stmt)) {
                                stm.executeUpdate(stmt);
                            }
                        }
                    }
                }
            } catch (SQLException e) {
                System.out.println(stmt); // the call that failed
                e.printStackTrace();
                errorMsg = e.getMessage();
            } finally {
                closeResultSet(res);
                closeStatement(select);
                closeStatement(stm);
                // Work done before a failure is still committed, as before.
                try {
                    conn.commit();
                } catch (SQLException e) {
                    e.printStackTrace();
                    if ("".equals(errorMsg)) {
                        // A lost commit must not be reported as success.
                        errorMsg = e.getMessage();
                    }
                }
                closeConnection(conn);
            }
        }
    }
}
/
-- Declaration of the per-row procedure whose name the parallel driver passes to the
-- Java runner ('fr_support_pkg.close_row_process').
-- The first two parameters carry the row's position within its chunk and the chunk
-- length; the remaining eleven correspond, in order, to the columns selected by the
-- driver query.
-- NOTE(review): the Java call builder emits three leading numeric arguments
-- (processSeq, row-in-chunk, chunk length) before the column values, while this
-- declaration has only two — confirm which side is current.
PROCEDURE close_row_process(
    pi_CurrentInChunk      IN NUMBER,
    pi_ChunkLength         IN NUMBER,
    pi_activity_folder_seq IN fr_activity_folders.activity_folder_seq%TYPE,
    pi_action_seq          IN fr_folder_actions.action_seq%TYPE,
    pi_demand_sum_min      IN NUMBER,
    pi_action_code         IN fr_folder_actions.action_code%TYPE,
    pi_demand_sum          IN fr_folder_actions.demand_sum%TYPE,
    pi_contract_acc_num    IN fr_activity_folders.contract_acc_num%TYPE,
    pi_debt_reduction_sum  IN NUMBER,
    pi_response_type       IN NUMBER,
    pi_section_num         IN NUMBER,
    pi_client_id           IN fr_activity_folders.client_id%TYPE,
    pi_client_type         IN fr_activity_folders.client_type%TYPE
);
-- Driver fragment: builds the query text, then lets the Java runner call
-- fr_support_pkg.close_row_process once per returned row in parallel.
dbms_application_info.set_module(module_name => 'close_folders_parallel,Process: ' || l_ProcessSeq,
                                 action_name => '');
v_Step := 1;

-- Query text handed to the Java runner. The select list order must match the
-- positional parameters of the row procedure, because the runner passes the
-- columns positionally.
l_SelectTxt := 'SELECT af.activity_folder_seq,' ||
               ' fa.action_seq,' ||
               ' CASE WHEN fa.action_code = 3 THEN' ||
               ' rs.folder_closing_sum' ||
               ' ELSE' ||
               ' fac.demand_sum_min' ||
               ' END demand_sum_min,' ||
               ' fa.action_code,' ||
               ' fa.demand_sum,' ||
               ' af.contract_acc_num,' ||
               ' nvl(rs.debt_reduction_sum,0) debt_reduction_sum,' ||
               ' fab.response_type,rca.section_num,af.client_id,af.client_type' ||
               ' FROM fr_activity_folders af,' ||
               ' fr_folder_actions fa,' ||
               ' rm_contract_acc rca,' ||
               ' fr_actions_codes fac,' ||
               ' rm_section rs,' ||
               ' fr_action_bank_answers fab' ||
               ' WHERE af.current_action_seq = fa.action_seq' ||
               ' AND rca.contract_acc_num = af.contract_acc_num' ||
               ' AND fa.action_code = fac.action_code' ||
               ' AND rca.section_num = rs.section_num(+)' ||
               ' AND fa.action_seq= fab.action_seq(+)' ||
               ' AND af.folder_status = ' || fr_check_potential_pkg.c_FOLDER_OPEN_STATUS
               --||' AND rownum <= 1000'
               ;
l_ProcName := 'fr_support_pkg.close_row_process';

-- Get amount limits for future actions
------------------------------------------------------
-- populate temporary table with calc balance results in parallel
dbms_application_info.set_action(action_name => 'Parallel Java');
dbms_java.set_output(1000000);

-- Number of parallel workers configured for task 203.
SELECT t.parallel_num
  INTO v_ParallelCount
  FROM sr_task_codes t
 WHERE t.task_seq = 203;

-- NOTE(review): the password is a hard-coded literal here — confirm how the real
-- credential is meant to be supplied.
v_Msg := run_pipe_parallel(pi_Select_Txt     => l_SelectTxt,
                           pi_Proc_Name      => l_ProcName,
                           pi_Parallel_Count => v_ParallelCount,
                           Pi_Password       => 'psw');

-- The Java side returns 'OK' on success and the exception message otherwise. That
-- message can be NULL, and NULL <> 'OK' is not TRUE, so NULL is tested explicitly.
IF v_Msg IS NULL OR v_Msg <> 'OK' THEN
  RAISE ErrException;
END IF;
------------------------------------------------------
Source: https://habr.com/ru/post/269693/
All Articles