📜 ⬆️ ⬇️

parallelizing task execution with stream_select ()

Not many people know that some tasks in PHP can be made to run in parallel - and this is not necessary to resort to forks. PHP5 has stream functions, and among them is stream_select ().

After reading the Cameron Laird article (http://www.ibm.com/developerworks/opensource/library/os-php-multitask/index.html?S_TACT=105AGX44&S_CMP=EDU), anyone who has not done so can learn this technique , I offer you a simple small class Parastreams in this topic, which will itself be parallelized - and you can decide what to do with the data obtained from the streams by specifying data handlers.


')
Scope of technology:
It is necessary to get some data on the network from several sockets. Using stream_select (), you will receive data from all sockets for the time equal to the time of receiving data from the slowest of them (with the traditional approach, the total time will be equal to the sum of the times for receiving data from each socket).
Suppose you are using search with pom. Sphinx . With the help of stream_select () you can force several requests to the search daemon to be executed in parallel (of course, you have to tweak and pick up the sphinxapi, but there is nothing super-complicated). This can be useful when a search results in two requests to a search daemon (for example, we search in posts and in comments): these two requests to two, respectively, indices will be executed in parallel — that is, we obtain optimization and search acceleration.

And here is the class code:

<?php

/**
* Parastreams PHP class:
* a simple tool for performing multiple tasks with PHP - simultaneously (in parallel).
*
* example of usage:
* $ps = new Parastreams();
*
* function parastreams_callback($data) {
* echo $data."\n";
* }
*
* $s = stream_socket_client("localhost:80", $errno,
* $errstr, 10,
* STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);
* fwrite($s, "GET /sleep.php?delay=1 HTTP/1.0\r\nHost: localhost\r\n\r\n");
* $ps->add($s, 'parastreams_callback');
* ... // repeat the above 5 lines as many times as you wish to, adding new streams to $ps.
* $ps->run();// process the streams
*
* Author: Victor Bolshov ( crocodile2u ( the at symbol here ) yandex.ru )
*
* License: use this script without any retrictions.
*
* Based on code by Cameron Laird, you may find his code here:
* www.ibm.com/developerworks/opensource/library/os-php-multitask/index.html?S_TACT=105AGX44&S_CMP=EDU
*
* PHP version used: PHP 5.3.0alpha1 (should be compatible with older versions of PHP5)
*/

class Parastreams {
/**
* streams served by this instance
* @var resource[]
*/
private $streams = array();
/**
* stream events listeners
* @var array
*/
private $listeners = array();
/**
* @var int
*/
private $timeout = 10;
/**
* Constructor
* @param array $arg when specified, add() is called and $arg is passed to add()
* @see add()
*/
function __construct($arg = null )
{
if ($arg)
{
$ this ->add($arg);
}
}
/**
* add new stream(s)
* @param array | resource $arg either a stream resource or an array like this:
* array(
* array(stream1, listener1),
* array(stream2, listener2),..
*)
* where streamN is a stream resource created with stream_socket_client(),
* and listenerN is a Closure object which is called once the stream becomes readable,
* with the only argument: string $data (the data read from the stream)
* @param callable $arg2 the listener to stream; matters only in case when the first arg is not an array
* @return void
* @throws ParastreamsException
*/
function add($arg1, $arg2 = null )
{
if (is_array($arg1))
{
foreach ($arg1 as $offset => $s)
{
if (! is_array($s))
{
throw new ParastreamsException( "Illegal input at offset " . $offset . " (not an array)" );
} elseif (count($s = array_values($s)) < 2) {
throw new ParastreamsException( "Illegal input at offset " . $offset . " (length is less then 2)" );
} elseif (! is_resource($s[0])) {
throw new ParastreamsException( "Illegal input at offset " . $offset . " (not a stream resource)" );
} elseif (! is_callable($s[1])) {
throw new ParastreamsException( "Illegal input at offset " . $offset . " (not a callable)" );
}

$ this ->addOne($s[0], $s[1]);
}
} elseif (is_resource($arg1)) {
if (! is_callable($arg2))
{
throw new ParastreamsException( "Argument 2 is expected to be a callable, " . gettype($arg2) . " given" );
}
$ this ->addOne($arg1, $arg2);
} else {
throw new ParastreamsException( "Argument 1 is expected to be a resource or an array, " . gettype($arg1) . " given" );
}
}
/**
* Start listening to stream events
* @return void
* @throws ParastreamsException
*/
function run()
{
while (count($ this ->streams))
{
$events = $ this ->streams;
if ( false === stream_select($events, $w = null , $e = null , $ this ->timeout))
{
throw new ParastreamsException( "stream_select() failed!" );
} elseif (count($events)) {
$ this ->processStreamEvents($events);
} else {
throw new ParastreamsException( "Time out!" );
}
}
}

/* Starting private methods */

private function processStreamEvents($events)
{
foreach ($events as $fp) {
$id = array_search($fp, $ this ->streams);

$ this ->invokeListener($fp);

fclose($fp);
unset($ this ->streams[$id]);
}
}
private function invokeListener($fp)
{
foreach ($ this ->listeners as $index => $spec) {
if ($spec[0] == $fp)
{
$data = "" ;
while (! feof($fp))
{
$data .= fread($fp, 1024);
}
call_user_func($spec[1], $data);
unset($ this ->listeners[$index]);
return ;
}
}
}
private function addOne($stream, $listener)
{
$ this ->streams[] = $stream;
$ this ->listeners[] = array($stream, $listener);
}
}

class ParastreamsException extends RuntimeException {}


* This source code was highlighted with Source Code Highlighter .


Example of use (there is in the comments, but nonetheless):

test.php:
<?php

require_once 'Parastreams.php' ;

function parastreams_callback($data) {
echo $data. "\n" ;
};

$streams = array();
for ($i = 1; $i <= 3; ++$i) {
$s = stream_socket_client( "localhost:80" , $errno,
$errstr, 10,
STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);
fwrite($s, "GET /sleep.php?delay=" . $i . " HTTP/1.0\r\nHost: localhost\r\n\r\n" );
$streams[$i] = array($s, 'parastreams_callback' );
}

$ps = new Parastreams($streams);
$ps->run();


* This source code was highlighted with Source Code Highlighter .


The example uses sleep.php, to complete the picture, here it is:

<?php

$delay = filter_input(INPUT_GET, 'delay' , FILTER_VALIDATE_INT);
if ($delay <= 0) {
$delay = 1;
}

sleep($delay);

echo "was sleeping for $delay seconds\n" ;


* This source code was highlighted with Source Code Highlighter .

Source: https://habr.com/ru/post/43637/


All Articles