
Parallel data import

Imagine we have a set of tasks that can be executed in parallel. For example, we need to build an RSS aggregator that updates all of its feeds at a specified interval. Clearly, the main and quite tangible share of the time will be spent downloading data from remote sources. Given that, importing the feeds by loading them sequentially makes no sense: with any large number of feeds, the import will not fit into the time allotted to it.

There are two possible solutions to the problem. The first is to download the feeds in parallel using cURL's multi interface. For example:

// Create a multi handle and register a download for each feed
$rMultiHandler = curl_multi_init();
$aResources = array();
foreach ( $aFeedUrls as $sFeedUrl ) {
    $rResource = curl_init();

    curl_setopt( $rResource, CURLOPT_RETURNTRANSFER, 1 );
    curl_setopt( $rResource, CURLOPT_URL, $sFeedUrl );
    curl_setopt( $rResource, CURLOPT_FOLLOWLOCATION, true );
    curl_setopt( $rResource, CURLOPT_TIMEOUT, 60 );

    curl_multi_add_handle( $rMultiHandler, $rResource );
    $aResources[] = array(
        'url'    => $sFeedUrl,
        'client' => $rResource
    );
}

// Run the cURL transfers until all of them have finished
$iRunningProcesses = null;
do {
    usleep( 1000000 );
    curl_multi_exec( $rMultiHandler, $iRunningProcesses );
} while ( $iRunningProcesses > 0 );

// Collect the results of each download
foreach ( $aResources as $aResource ) {
    $aHeaders = curl_getinfo( $aResource[ 'client' ] );
    $sBody = curl_multi_getcontent( $aResource[ 'client' ] );
}




This option partially solves the problem of idle time while waiting for feeds to load; however, the subsequent parsing of each feed still has to be done sequentially. Besides, it is not very convenient if your domain model treats a feed as an object of a certain class.
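As a side note, the polling loop above sleeps a fixed second between iterations; curl_multi_select() can instead block only until at least one transfer has activity. A minimal sketch of the same download wrapped into a function (the function and variable names here are illustrative, not part of the original article):

```php
<?php
// Download a list of URLs in parallel and return their bodies keyed by URL.
function fetchAll( array $aUrls, $iTimeout = 60 ) {
    $rMulti = curl_multi_init();
    $aHandles = array();

    foreach ( $aUrls as $sUrl ) {
        $rHandle = curl_init( $sUrl );
        curl_setopt( $rHandle, CURLOPT_RETURNTRANSFER, true );
        curl_setopt( $rHandle, CURLOPT_FOLLOWLOCATION, true );
        curl_setopt( $rHandle, CURLOPT_TIMEOUT, $iTimeout );
        curl_multi_add_handle( $rMulti, $rHandle );
        $aHandles[ $sUrl ] = $rHandle;
    }

    // Drive the transfers; wait on the sockets instead of sleeping.
    $iRunning = 0;
    do {
        curl_multi_exec( $rMulti, $iRunning );
        if ( $iRunning > 0 ) {
            curl_multi_select( $rMulti, 1.0 );
        }
    } while ( $iRunning > 0 );

    // Collect the bodies and release the handles.
    $aBodies = array();
    foreach ( $aHandles as $sUrl => $rHandle ) {
        $aBodies[ $sUrl ] = curl_multi_getcontent( $rHandle );
        curl_multi_remove_handle( $rMulti, $rHandle );
        curl_close( $rHandle );
    }
    curl_multi_close( $rMulti );

    return $aBodies;
}
```

This avoids wasting up to a second of idle time per iteration, but the fundamental limitation stands either way: parsing still happens in the parent process.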
The second option is to create a pool of child processes, one per feed. This can be done, for example, with the proc_* family of functions. It would also be reasonable to cap the number of simultaneously running processes (the pool size) in order to control the server load (in fact, this applies to the first option as well). To do that, you will have to implement a dispatcher that monitors the state of the pool and adds new processes to it as the running ones finish.

Below is a commented example implementation of such a pool for the parallel import of RSS feeds:

/**
 * Parallel import of RSS feeds via a pool of child processes
 */
class Import {

    /**
     * Maximum number of simultaneously running child processes
     * @var int
     */
    const POOL_SIZE = 10;

    /**
     * Time limit for a single child process, in seconds
     * @var int
     */
    const POOL_PROC_EXEC_TIME = 180;

    /**
     * Dispatcher: runs the pool and keeps it filled until all feeds are imported
     */
    public function startPool() {

        file_put_contents( 'import.log', "[*] Import started" .
            PHP_EOL, FILE_APPEND );

        // Counters for the final report
        $iSuccess = 0;
        $iFailure = 0;
        $iUpdated = 0;

        // Ids of the feeds to import; in a real application
        // this list would come from storage
        $aFeedId = array( 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
            13, 14, 15 );

        // Fill the pool with the initial batch of processes
        $aPool = array();
        for ( $iIter = 0; $iIter < self::POOL_SIZE && !empty( $aFeedId );
                $iIter++ ) {
            $iFeedId = array_shift( $aFeedId );

            $this->startProcess( $aPool, $iFeedId );
        }

        // Keep polling until the pool is empty
        while ( !empty( $aPool ) ) {

            // Pause between polls (1 second)
            usleep( 1000000 );

            // Check the state of every process in the pool
            foreach ( $aPool as $iKey => &$aProcess ) {

                // Query the current status of the process
                $aProcStatus = proc_get_status( $aProcess[ 'handler' ] );

                // The process has finished
                if ( false === $aProcStatus[ 'running' ] ) {

                    // Read the child's response from its stdout
                    $iResponse = fgets( $aProcess[ 'pipes' ][1] );

                    // Close the pipes and release the process handle
                    fclose( $aProcess[ 'pipes' ][1] );
                    fclose( $aProcess[ 'pipes' ][2] );
                    proc_close( $aProcess[ 'handler' ] );

                    // The exit code is only valid the first time the status
                    // is requested after the process has stopped, see
                    // www.php.net/manual/en/function.proc-get-status.php#92145
                    if ( 0 === $aProcStatus[ 'exitcode' ]
                            && is_numeric( $iResponse ) ) {
                        $iSuccess++;
                        $iUpdated += $iResponse;

                    // The process failed or returned garbage
                    } else
                        $iFailure++;

                    // Remove the finished process from the pool and,
                    // if there are feeds left, start a new one in its place
                    unset( $aPool[ $iKey ] );
                    if ( !empty( $aFeedId ) ) {
                        $iFeedId = array_shift( $aFeedId );

                        $bIsLaunched = $this->startProcess( $aPool,
                            $iFeedId );
                        if ( !$bIsLaunched )
                            $iFailure++;
                    }

                // The process is still running
                } else {

                    // Terminate it (SIGTERM) if it has exceeded the time limit
                    if ( time() - $aProcess[ 'iTimeStart' ] >
                            self::POOL_PROC_EXEC_TIME ) {
                        file_put_contents( 'import.log', "[!] Feed " .
                            "{$aProcess['iFeedId']} process exceeded the " .
                            "time limit, terminating" . PHP_EOL, FILE_APPEND );
                        $iSignalCode = 15;
                        proc_terminate( $aProcess[ 'handler' ], $iSignalCode );
                    }
                }
            }
            unset( $aProcess );
        }

        file_put_contents( 'import.log', "[*] Import finished: " .
            "succeeded {$iSuccess}, failed {$iFailure}, " .
            "items updated {$iUpdated}" . PHP_EOL, FILE_APPEND );
    }

    /**
     * Starts a child process for the given feed and adds it to the pool
     * @param array $aPool
     * @param int $iFeedId
     * @return bool whether the process was started successfully
     */
    public function startProcess( array &$aPool, $iFeedId ) {

        // "exec" makes proc_terminate() kill the PHP process itself
        // rather than the wrapping shell, see
        // www.php.net/manual/en/function.proc-get-status.php#93382
        $sCmd = "exec php -f " . __FILE__ . " {$iFeedId}";

        $aDescriptors = array(
            1 => array( "pipe", "w" ),
            2 => array( "pipe", "w" )
        );
        $aPipes = array();

        // Launch the child process
        $bSuccess = true;
        $rProcess = proc_open( $sCmd, $aDescriptors, $aPipes );
        if ( is_resource( $rProcess ) ) {
            $aPool[] = array(
                'handler'    => $rProcess,
                'pipes'      => $aPipes,
                'iFeedId'    => $iFeedId,
                'iTimeStart' => time()
            );
        } else {
            $bSuccess = false;
            file_put_contents( 'import.log', "[!] Failed to start a " .
                "process for feed {$iFeedId}" . PHP_EOL, FILE_APPEND );
        }

        return $bSuccess;
    }

    /**
     * Imports a single feed; here the actual work is simulated with a delay
     * @param int $iFeedId
     * @return int number of updated items
     */
    public function doImport( $iFeedId ) {

        file_put_contents( 'import.log', "[+] Importing feed {$iFeedId}" .
            PHP_EOL, FILE_APPEND );

        // Simulate the import with a random delay
        $iExecTime = rand( 1, 10 );
        usleep( $iExecTime * 1000000 );
        $iUpdated = rand( 0, 10 );

        file_put_contents( 'import.log', "[-] Feed {$iFeedId} imported " .
            "in {$iExecTime} s" . PHP_EOL, FILE_APPEND );

        // Report the result to the parent through stdout
        echo $iUpdated;

        return $iUpdated;
    }
}

/**
 * Entry point: without arguments the script acts as the dispatcher,
 * with a feed id as the argument it imports that single feed
 */
$oImport = new Import();

// No arguments: start the pool
if ( 1 === $argc ) {
    $oImport->startPool();

// A feed id was passed: import that one feed
} else {
    $iFeedId = $argv[1];
    $oImport->doImport( $iFeedId );
}
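The same script thus plays both roles. Saved as, say, import.php (the filename here is only an illustration), it can be invoked in two ways:

```shell
# Without arguments: acts as the dispatcher and starts the pool
php -f import.php

# With a feed id: imports that single feed; this is exactly the
# command the dispatcher itself launches for every child process
php -f import.php 7
```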




Running the script produces a log along these lines:

[*] Import started
[+] Importing feed 1
[+] Importing feed 2
[+] Importing feed 3
[+] Importing feed 4
[+] Importing feed 5
[+] Importing feed 8
[+] Importing feed 6
[+] Importing feed 7
[+] Importing feed 9
[+] Importing feed 10
[-] Feed 7 imported in 1 s
[+] Importing feed 11
[-] Feed 1 imported in 5 s
[+] Importing feed 12
[-] Feed 10 imported in 5 s
[-] Feed 2 imported in 6 s
[-] Feed 12 imported in 1 s
[+] Importing feed 13
[+] Importing feed 14
[-] Feed 3 imported in 7 s
[-] Feed 6 imported in 7 s
[+] Importing feed 15
[-] Feed 9 imported in 7 s
[-] Feed 14 imported in 1 s
[-] Feed 11 imported in 6 s
[-] Feed 4 imported in 9 s
[-] Feed 5 imported in 10 s
[-] Feed 8 imported in 10 s
[-] Feed 13 imported in 7 s
[-] Feed 15 imported in 6 s
[*] Import finished: succeeded 15, failed 0, items updated 89


The approach has been tested in production and so far has given no cause for complaint.

Source: https://habr.com/ru/post/90487/

