📜 ⬆️ ⬇️

PHP multithreading emulation

The essence of multithreading (for an application) is that a process can consist of several (of the same type) threads running “in parallel”, that is, without sequencing by time — one was executed, the next one went. Using multithreading allows you to speed up the execution of the task and / or reduce the load, thus, improving the speed of the application itself.

PHP refers to languages ​​in which multithreading is not supported. But there are a lot of tasks in which she would be very useful. As a rule, these are tasks in which one action needs to be performed many times, but with different parameters, and the action itself is so resource-intensive, and the number of iterations is so large that it is impossible to use a normal cycle.

I will consider a simple example of how to achieve multithreading emulation in PHP.

Suppose we need to send mailing to a very large number of addresses. Addresses are stored in the database. Since there are many addresses, this option does not suit us, hang the server:
')
$ res = mysql_query ( 'SELECT `email` FROM` user`' );

while ( $ row = mysql_fetch_assoc ( $ res ))
mail ( $ row [ 'email' ], $ theme , $ text );


If we want to use multithreading for this task, then the following option would look good:

$ res = mysql_query ( 'SELECT `email` FROM` user`' );

$ data = array ();

while ( $ row = mysql_fetch_assoc ( $ res ))
$ data [] = $ row ;

$ multithreading = new MultiThreading ();

// mailer.php - a script that sends an email to the address passed to it by the GET method,
// so mailer.php?email=somebody@example.com
$ multithreading -> setScriptName ( 'mailer.php' );

$ multithreading -> setParams ( $ data );

$ multithreading -> execute ();


Everything is simple and convenient. It remains to write the corresponding class, which I did. Here it is, with all the explanations in the comments.

class MultiThreading
{
/ **
* Server Name
*
* @var string
* @access private
* /
private $ server ;

/ **
* Maximum number of threads
*
* @var int
* @access private
* /
private $ maxthreads ;

/ **
* The name of the script that performs the task we need
*
* @var string
* @access private
* /
private $ scriptname ;

/ **
* Parameters that we will pass to the script
*
* @var array
* @access private
* /
private $ params = array ();

/ **
* Array in which streams are stored
*
* @var array
* @access private
* /
private $ threads = array ();

/ **
* Array in which the results are stored
*
* @var array
* @access private
* /
private $ results = array ();

/ **
* Constructor class. In it, we specify the maximum number of threads and the name of the server. Both arguments are optional.
*
* @param int $ maxthreads maximum number of threads, default 10
* @param string $ server server name, by default the name of the server on which the application is running
* @access public
* /
public function __construct ( $ maxthreads = 10 , $ server = '' )
{
if ( $ server )
$ this -> server = $ server ;
else
$ this -> server = $ _SERVER [ 'SERVER_NAME' ];

$ this -> maxthreads = $ maxthreads ;
}

/ **
* Specify the name of the script that performs the task we need.
*
* @param string $ scriptname script name including path to it
* @access public
* /
public function setScriptName ( $ scriptname )
{
if (! $ fp = fopen ( 'http: //' . $ this -> server . '/' . $ scriptname , 'r' ))
throw new Exception ( 'Cant open script file' );

fclose ( $ fp );

$ this -> scriptname = $ scriptname ;
}

/ **
* Set the parameters that we will pass to the script
*
* @param array $ params parameters array
* @access public
* /
public function setParams ( $ params = array ())
{
$ this -> params = $ params ;
}

/ **
* Perform the task, comments in the code
*
* @access public
* /
public function execute ()
{
// Start the mechanism, and it works until all threads are executed.
do {
// If you do not exceed the limit of threads
if ( count ( $ this -> threads ) < $ maxthreads ) {
// If you can get the next set of parameters
if ( $ item = current ( $ this -> params )) {

// Create a request using the GET method

$ query_string = '' ;

foreach ( $ item as $ key => $ value )
$ query_string . = '&' . urlencode ( $ key ). '=' . urlencode ( $ value );

$ query = "GET http: //" . $ this -> server . "/" . $ this -> scriptname . "?" . $ query_string . "HTTP / 1.0 \ r \ n" ;

// Open the connection

if (! $ fsock = fsockopen ( $ this -> server , 80 ))
throw new Exception ( 'Cant open socket connection' );

fputs ( $ fsock , $ query );
fputs ( $ fsock , "Host: $ server \ r \ n" );
fputs ( $ fsock , "\ r \ n" );

stream_set_blocking ( $ fsock , O );
stream_set_timeout ( $ fsock , 3600 );

// write the stream

$ this -> threads [] = $ fsock ;

// Go to the next element

next ( $ this -> params );
}
}

// Looping through threads
foreach ( $ this -> threads as $ key => $ value ) {
// If the thread has completed, close and delete
if ( feof ( $ value )) {
fclose ( $ value );
unset ( $ this -> threads [ $ key ]);
} else {
// Otherwise, read the results
$ this -> results [] = fgets ( $ value );
}
}

// You can put a delay in order not to hang the server
sleep ( 1 );

// ... until all threads are executed
} while ( count ( $ this -> threads )> O );

return $ this -> results ;
}
}


You can also download this class, so as not to copy-paste :-).

UPD: gentlemen, I want to remind you, yet this is rather a detailed example than an ideal ready-made solution.

PS - «$var > 0», (stream_set_blocking($fsock, 0); while (count($this->threads) > 0);) 0 O. .

Original article.

Source: https://habr.com/ru/post/40245/


All Articles