
Parallel processing of a large select in several sessions

Imagine: there is a select that returns records, each of which needs to be processed. Either there are many records, or processing each record takes a long time, and the processing of one record does not depend on the processing of the others.
The classic answer is multithreading or, in the case of databases, processing in several sessions. In Oracle, the /*+ parallel */ hint and pipelined functions are used for this. That is great, but what if you have Oracle Standard Edition (where parallel does not work), or you want to process records not one at a time (on the grounds that it is better to accumulate the work and then execute it in one stroke)? Then you can divide the entire output of the select into pieces and process each piece separately.
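For comparison, here is a minimal sketch of that classic pipelined approach; all object names in it (t_id_tab, process_rows, big_table) are hypothetical and are not part of this article's code:

-- a parallel-enabled pipelined function: Oracle itself spreads the cursor's rows
-- across parallel slaves (Enterprise Edition only)
create type t_id_tab as table of number;
/
create or replace function process_rows(p_cur sys_refcursor)
  return t_id_tab pipelined parallel_enable(partition p_cur by any) is
  l_id number;
begin
  loop
    fetch p_cur into l_id;
    exit when p_cur%notfound;
    -- ... process one record here ...
    pipe row (l_id);
  end loop;
  return;
end;
/
-- the degree of parallelism comes from the cursor expression:
select *
  from table(process_rows(cursor(select /*+ parallel(t 4) */ t.id from big_table t)));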

The task is set as follows:

Write a Java stored procedure that receives the following parameters:

- the text of the select;
- the name of the procedure that processes one record;
- the number of parallel sessions;
- the password of the current schema (needed to open the additional sessions).

First, here is the plan for emulating what the pipelined function would give us:

- Java opens the result set in the default connection.
- The first thing it does is run select count(*) from (<select text>); to learn how many rows there are.
- It creates a connection pool of the size given in the 3rd parameter, opening separate sessions through JDBC connections. The data for this comes from the 4th parameter; by and large only the password is needed, everything else we will get ourselves (plus, perhaps, another port if it differs from 1521).
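The "everything else we will get ourselves" part is taken from the default session itself; the class below runs exactly these queries to learn the user name and the instance for the JDBC thin URL:

SELECT SYS_CONTEXT('USERENV','SESSION_USER') FROM dual;
SELECT SYS_CONTEXT('USERENV','DB_NAME') FROM dual;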

We will read rows from the default connection and copy them into a session from the pool. As soon as we decide that enough has accumulated, we create a thread, pass that connection to it as a parameter, and let it work, while we move on to the next session; once everything has been read, we wait for all the threads to finish.

Now let's write the processing procedure. It receives all the fields of the select as parameters.

It is convenient, for example, for the first two parameters to be the record's position within the chunk and the chunk size. This lets each thread display its percentage of completion through dbms_application_info.
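A minimal sketch of such a handler, assuming a hypothetical procedure proc1 (only the progress-reporting part is shown):

-- Hypothetical handler skeleton: the first two parameters drive progress reporting.
PROCEDURE proc1(pi_CurrentInChunk NUMBER,   -- position of this record in its chunk
                pi_ChunkLength    NUMBER,   -- total number of records in the chunk
                pi_field1         VARCHAR2  -- ... the fields of the select follow ...
               ) IS
BEGIN
  -- report this session's completion percentage
  dbms_application_info.set_client_info(
      round(100 * pi_CurrentInChunk / pi_ChunkLength) || '% done');
  -- ... the actual processing of one record ...
END;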

Based on the metadata of the select, we construct its call as a string of roughly this form:

begin proc1(23, 14000, 'a1', 3, 'tratata', 35.48); end;

Only this string is stored.

Initially this was a two-dimensional array (i, j), where i is the thread number and j is the position within the chunk. Then I saw that with a large number of records the cost to Oracle of maintaining such a large array becomes excessive, and I decided to also use a temporary table.

I set the boundary at 200,000 entries. If count(*) returns fewer than 200,000, the Java code uses an in-memory two-dimensional String array at runtime; if more, it writes into the parallel_calls_tmp temporary table, which has a single varchar2(4000) column.

So, in the PL/SQL package we create the function:

FUNCTION run_pipe_parallel(pi_Select_Txt     VARCHAR2,
                           pi_Proc_Name      VARCHAR2,
                           pi_Parallel_Count VARCHAR2,
                           Pi_Password       VARCHAR2) RETURN VARCHAR2 AS
LANGUAGE JAVA NAME 'com.samtrest.ParallelRunner.run_parallel(java.lang.String, java.lang.String, java.lang.String, java.lang.String) return java.lang.String';

Create the temporary table:

-- Create table
create global temporary table parallel_calls_tmp
(
  call_str varchar2(4000)
)
on commit preserve rows;

Note that the contents of a global temporary table are private to each session; this is exactly what keeps the chunks apart when the table is used: every pooled session sees only the calls written through its own connection.

On the Java side there is a function:

public static String run_parallel(String selectTxt, String procedureName,
                                  String threadCount, String password)
        throws NumberFormatException, SQLException, ClassNotFoundException {
    String rc = "OK";
    ParallelRunner parallelRunner =
            new ParallelRunner(selectTxt, procedureName, Integer.parseInt(threadCount), password);
    try {
        parallelRunner.runProc();
    } catch (SQLException e) {
        e.printStackTrace();
        rc = e.getMessage();
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
        rc = e.getMessage();
    }
    return rc;
}


Getting an array of the data types of the select's fields:

res = stm.executeQuery();
ResultSetMetaData meta = res.getMetaData();
columnCount = meta.getColumnCount();
int[] types = new int[columnCount];
for (int k = 0; k < columnCount; k++) {
    types[k] = meta.getColumnType(k + 1);
}

This is how the call string is built:

while (res.next()) {
    // "begin proc(processSeq, positionInChunk, chunkSize, <fields...>"
    callStr = "begin " + procedureName + "(" + processSeq + "," + (j + 1) + "," + chunkCount;
    for (int k = 0; k < columnCount; k++) {
        callStr = callStr + ",";
        String value = "";
        if (types[k] == java.sql.Types.VARCHAR || types[k] == java.sql.Types.CHAR) {
            value = res.getString(k + 1);
            if (value == null) {
                value = "null";
            } else {
                value = "'" + value + "'";
            }
        } else if (types[k] == java.sql.Types.NUMERIC) {
            BigDecimal number = res.getBigDecimal(k + 1);
            if (number == null) {
                value = "null";
            } else {
                value = number.toString();
            }
        } else if (types[k] == java.sql.Types.DATE || types[k] == java.sql.Types.TIMESTAMP) {
            Timestamp date = res.getTimestamp(k + 1);
            if (date == null) {
                value = "null";
            } else {
                value = "to_date('" +
                        date.toString().substring(0, date.toString().indexOf('.')) +
                        "','yyyy-mm-dd hh24:mi:ss')";
            }
        } else {
            System.out.println("" + types[k]); // unsupported type: log its code
        }
        callStr = callStr + value;
    }
    callStr = callStr + "); end;";

We accumulate it in the array or in the table:

if (rowCount > CHUNK_LIMIT) {
    insert.setString(1, callStr);
    insert.executeUpdate();
} else {
    chunks[i][j] = callStr;
}


And now the whole class that needs to be loaded into the database.
create or replace and compile java source named "ParallelRunner" as
package com.samtrest;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;

public class ParallelRunner {
    String selectTxt;
    String procedureName;
    String tableName = "";
    String additional;
    int threadCount, processSeq;
    String host = "127.0.0.1", instance, port = "1521", userName, password;
    Connection[] connPool;
    Connection defaultConn;
    ChunkRunner[] chunkRunners;
    String[][] chunks;
    int CHUNK_LIMIT = 200000; // above this row count we switch to the temporary table

    public ParallelRunner(String selectTxt, String procedureName, int threadCount, String psw)
            throws SQLException, ClassNotFoundException {
        super();
        this.selectTxt = selectTxt;
        this.procedureName = procedureName;
        this.threadCount = threadCount;
        this.port = "1521";
        this.password = psw;
        connPool = new Connection[threadCount];
        chunkRunners = new ChunkRunner[threadCount];
    }

    // Entry point published to PL/SQL
    public static String run_parallel(String selectTxt, String procedureName,
                                      String threadCount, String psw)
            throws NumberFormatException, SQLException, ClassNotFoundException {
        String rc = "OK";
        ParallelRunner parallelRunner =
                new ParallelRunner(selectTxt, procedureName, Integer.parseInt(threadCount), psw);
        try {
            parallelRunner.runProc();
        } catch (SQLException e) {
            e.printStackTrace();
            rc = e.getMessage();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            rc = e.getMessage();
        }
        return rc;
    }

    public void populateConnectionPool() throws SQLException, ClassNotFoundException {
        ResultSet res;
        Class.forName("oracle.jdbc.driver.OracleDriver");
        defaultConn = DriverManager.getConnection("jdbc:default:connection:");
        // Only the password comes from outside; the user name and instance
        // are taken from the current session.
        PreparedStatement stm = defaultConn.prepareStatement(
                "SELECT SYS_CONTEXT('USERENV','SESSION_USER') from dual");
        res = stm.executeQuery();
        if (res.next()) {
            userName = res.getString(1);
        }
        res.close();
        stm = defaultConn.prepareStatement(
                "SELECT SYS_CONTEXT('USERENV','DB_NAME') from dual");
        res = stm.executeQuery();
        if (res.next()) {
            instance = res.getString(1);
        }
        res.close();
        for (int i = 0; i < connPool.length; i++) {
            connPool[i] = DriverManager.getConnection(
                    "jdbc:oracle:thin:@" + host + ":" + port + ":" + instance, userName, password);
            connPool[i].setAutoCommit(false);
            stm = connPool[i].prepareStatement(
                    "begin dbms_application_info.set_module('Java Parallel; process:'||" +
                    processSeq + ",'" + connPool.length + " threads'); end;");
            stm.executeUpdate();
        }
        stm.close();
    }

    public void runProc() throws SQLException, ClassNotFoundException {
        int rowCount = 0, columnCount, chunkCount, i, j;
        String callStr = "";
        PreparedStatement stm;
        Statement info;
        PreparedStatement insert = null;

        populateConnectionPool();
        info = defaultConn.createStatement();
        info.executeUpdate("begin dbms_application_info.set_module('Parallel process:'||" +
                processSeq + ",'" + threadCount + " threads'); end;");
        System.out.println(selectTxt);

        // How many rows are there to process?
        stm = defaultConn.prepareStatement("select count(*) from (" + selectTxt + ")");
        ResultSet res = stm.executeQuery();
        if (res.next())
            rowCount = res.getInt(1);
        res.close();
        stm.close();

        chunkCount = rowCount / threadCount;
        if (chunkCount * threadCount < rowCount) {
            chunkCount++;
        }
        i = 0; // current thread (chunk) number
        j = 0; // current position within the chunk
        // System.out.println("Count of parallel threads: " + connPool.length);
        // System.out.println("Count of processing rows: " + rowCount);
        // System.out.println("Chunk length: " + chunkCount);
        info.executeUpdate("begin dbms_application_info.set_action('" + connPool.length + "," +
                rowCount + "," + chunkCount + "'); end;");

        stm = defaultConn.prepareStatement(selectTxt);
        res = stm.executeQuery();
        ResultSetMetaData meta = res.getMetaData();
        columnCount = meta.getColumnCount();
        int[] types = new int[columnCount];
        for (int k = 0; k < columnCount; k++) {
            types[k] = meta.getColumnType(k + 1);
        }

        if (rowCount > CHUNK_LIMIT) {
            insert = connPool[i].prepareStatement("insert into parallel_calls_tmp values (?)");
        } else {
            chunks = new String[threadCount][chunkCount];
        }

        while (res.next()) {
            // Build "begin proc(processSeq, positionInChunk, chunkSize, <fields...>); end;"
            callStr = "begin " + procedureName + "(" + processSeq + "," + (j + 1) + "," + chunkCount;
            for (int k = 0; k < columnCount; k++) {
                callStr = callStr + ",";
                String value = "";
                if (types[k] == java.sql.Types.VARCHAR || types[k] == java.sql.Types.CHAR) {
                    value = res.getString(k + 1);
                    if (value == null) {
                        value = "null";
                    } else {
                        value = "'" + value + "'";
                    }
                } else if (types[k] == java.sql.Types.NUMERIC) {
                    BigDecimal number = res.getBigDecimal(k + 1);
                    if (number == null) {
                        value = "null";
                    } else {
                        value = number.toString();
                    }
                } else if (types[k] == java.sql.Types.DATE || types[k] == java.sql.Types.TIMESTAMP) {
                    Timestamp date = res.getTimestamp(k + 1);
                    if (date == null) {
                        value = "null";
                    } else {
                        value = "to_date('" +
                                date.toString().substring(0, date.toString().indexOf('.')) +
                                "','yyyy-mm-dd hh24:mi:ss')";
                    }
                } else {
                    System.out.println("" + types[k]); // unsupported type: log its code
                }
                callStr = callStr + value;
            }
            callStr = callStr + "); end;";

            if (i == 0 && j == 0) {
                System.out.println(callStr); // print the very first call for debugging
            }
            if (rowCount > CHUNK_LIMIT) {
                insert.setString(1, callStr);
                insert.executeUpdate();
            } else {
                chunks[i][j] = callStr;
            }
            j++;
            if (j == chunkCount) {
                // The chunk is complete: hand its session over to a worker thread
                connPool[i].commit();
                if (rowCount > CHUNK_LIMIT) {
                    chunkRunners[i] = new ChunkRunner(connPool[i], processSeq);
                } else {
                    chunkRunners[i] = new ChunkRunner(connPool[i], processSeq, chunks[i]);
                }
                chunkRunners[i].start();
                i++;
                if (i < connPool.length) {
                    if (rowCount > CHUNK_LIMIT) {
                        insert = connPool[i].prepareStatement("insert into parallel_calls_tmp values (?)");
                    }
                    j = 0;
                }
            }
            info.executeUpdate("begin dbms_application_info.set_action('" + connPool.length + "," +
                    rowCount + "," + chunkCount + " threads " + i + "," + j + "'); end;");
        }
        res.close();
        stm.close();
        info.close();

        // Dispatch the last, possibly incomplete chunk
        // (the guard avoids an out-of-bounds access when rowCount divides evenly)
        if (i < connPool.length) {
            connPool[i].commit();
            if (j < chunkCount) {
                if (rowCount > CHUNK_LIMIT) {
                    chunkRunners[i] = new ChunkRunner(connPool[i], processSeq);
                } else {
                    chunkRunners[i] = new ChunkRunner(connPool[i], processSeq, chunks[i]);
                }
                chunkRunners[i].start();
            }
        }

        // Wait for all worker threads to finish
        for (int k = 0; k < chunkRunners.length; k++) {
            if (chunkRunners[k] != null) {
                try {
                    chunkRunners[k].join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        // Close the pool and re-raise the first error, if any
        for (int k = 0; k < chunkRunners.length; k++) {
            if (chunkRunners[k] != null) {
                if (!connPool[k].isClosed()) {
                    connPool[k].close();
                }
                if (!"".equals(chunkRunners[k].errorMsg)) {
                    throw (new SQLException(chunkRunners[k].errorMsg));
                }
            }
        }
        defaultConn.close();
    }

    public class ChunkRunner extends Thread {
        Connection conn;
        String errorMsg = "";
        String[] chunk;
        String internal;
        int processSeq;

        public ChunkRunner(Connection conn, int process) {
            super();
            this.conn = conn;
            this.processSeq = process;
        }

        public ChunkRunner(Connection conn, int process, String[] chunk) {
            super();
            this.conn = conn;
            this.chunk = chunk;
            this.processSeq = process;
        }

        public ChunkRunner(Connection conn, int process, String inter) {
            super();
            this.conn = conn;
            this.processSeq = process;
            this.internal = inter;
        }

        public void run() {
            Statement stm = null;
            PreparedStatement select = null;
            String stmt = "";
            try {
                stm = conn.createStatement();
                if ("".equals(tableName)) {
                    if (chunk == null) {
                        // Large run: the calls were spooled into this session's
                        // private copy of the temporary table
                        select = conn.prepareStatement("select * from parallel_calls_tmp");
                        ResultSet res = select.executeQuery();
                        while (res.next()) {
                            stmt = res.getString(1);
                            if (stmt != null && !"".equals(stmt)) {
                                stm.executeUpdate(stmt);
                            }
                        }
                    } else {
                        // Small run: the calls are in the in-memory array
                        for (int i = 0; i < chunk.length; i++) {
                            stmt = chunk[i];
                            if (stmt != null && !"".equals(stmt)) {
                                stm.executeUpdate(stmt);
                            }
                        }
                    }
                    stm.close();
                }
            } catch (SQLException e) {
                System.out.println(stmt); // show the call that failed
                e.printStackTrace();
                errorMsg = e.getMessage();
            } finally {
                try {
                    conn.commit();
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
/

An example handler procedure:

PROCEDURE close_row_process(pi_CurrentInChunk      NUMBER,
                            pi_ChunkLength         NUMBER,
                            pi_activity_folder_seq fr_activity_folders.activity_folder_seq%TYPE,
                            pi_action_seq          fr_folder_actions.action_seq%TYPE,
                            pi_demand_sum_min      NUMBER,
                            pi_action_code         fr_folder_actions.action_code%TYPE,
                            pi_demand_sum          fr_folder_actions.demand_sum%TYPE,
                            pi_contract_acc_num    fr_activity_folders.contract_acc_num%TYPE,
                            pi_debt_reduction_sum  NUMBER,
                            pi_response_type       NUMBER,
                            pi_section_num         NUMBER,
                            pi_client_id           fr_activity_folders.client_id%TYPE,
                            pi_client_type         fr_activity_folders.client_type%TYPE);

Usage example:

dbms_application_info.set_module(module_name => 'close_folders_parallel,Process: ' || l_ProcessSeq,
                                 action_name => '');
v_Step := 1;
l_SelectTxt := 'SELECT af.activity_folder_seq,' ||
               '       fa.action_seq,' ||
               '       CASE WHEN fa.action_code = 3 THEN' ||
               '         rs.folder_closing_sum' ||
               '       ELSE' ||
               '         fac.demand_sum_min' ||
               '       END demand_sum_min,' ||
               '       fa.action_code,' ||
               '       fa.demand_sum,' ||
               '       af.contract_acc_num,' ||
               '       nvl(rs.debt_reduction_sum,0) debt_reduction_sum,' ||
               '       fab.response_type,rca.section_num,af.client_id,af.client_type' ||
               '  FROM fr_activity_folders af,' ||
               '       fr_folder_actions fa,' ||
               '       rm_contract_acc rca,' ||
               '       fr_actions_codes fac,' ||
               '       rm_section rs,' ||
               '       fr_action_bank_answers fab' ||
               ' WHERE af.current_action_seq = fa.action_seq' ||
               '   AND rca.contract_acc_num = af.contract_acc_num' ||
               '   AND fa.action_code = fac.action_code' ||
               '   AND rca.section_num = rs.section_num(+)' ||
               '   AND fa.action_seq = fab.action_seq(+)' ||
               '   AND af.folder_status = ' || fr_check_potential_pkg.c_FOLDER_OPEN_STATUS
               --||' AND rownum <= 1000'
               ;
l_ProcName := 'fr_support_pkg.close_row_process';

-- Get amount limits for future actions
------------------------------------------------------
-- populate temporary table with calc balance results in parallel
dbms_application_info.set_action(action_name => 'Parallel Java');
dbms_java.set_output(1000000);

SELECT t.parallel_num
  INTO v_ParallelCount
  FROM sr_task_codes t
 WHERE t.task_seq = 203;

v_Msg := run_pipe_parallel(pi_Select_Txt     => l_SelectTxt,
                           pi_Proc_Name      => l_ProcName,
                           pi_Parallel_Count => v_ParallelCount,
                           Pi_Password       => 'psw');
IF v_Msg <> 'OK' THEN
  RAISE ErrException;
END IF;
------------------------------------------------------

Along the way it occurred to me that this would work not only with Oracle, but with any database ...

If anyone is interested, I can describe what I added in order to work not with this emulation of pipelined functions, but with the execution of individual batches ...

I can say that as a result I got a real gain in time: 12 hours in one session versus an hour and a half in 25 sessions; all 16 server processors were loaded at 100%.

Source: https://habr.com/ru/post/269693/

