
AzaThread - multithreading for PHP with blackjack

There are many solutions for emulating multithreading in PHP floating around the net. Most of them are based on forks, but there are variations on the theme using curl, proc_open, and so on.

None of these options suited me, for one reason or another, so I had to write my own solution to meet my own set of requirements.

The result is the AzaThread library (the old name is CThread).

For the impatient, here is the link to the source right away:
github.com/Anizoptera/AzaThread

Description


AzaThread provides a simple interface for creating thread classes. They actually use separate processes for asynchronous work, but you do not need to care about that. You can send events from a thread, return results, reuse one thread many times by passing it new arguments on each run, or create a pool of 16 threads that chew through your tasks like hotcakes, without paying any attention to the fact that the work happens in different processes.
In addition, you can easily test the library's performance in different modes and choose the optimal number of threads and the data transfer option that works best for your particular configuration.

For full functionality, the following extensions are required: libevent, posix and pcntl.

The library uses LibEvent and socket pairs for communication between processes. It supports five ways of transferring data (arguments, results and event data)!

The options are listed below together with performance data. They were tested with a pool of eight threads on an Intel Core i7 2600K 3.40 GHz (Ubuntu 11.04 in a VMware virtual machine). The figures are averages over 10 test runs, in jps (jobs per second: the number of tasks per second, where each task simply receives arguments and returns data).
No. | jps  | Description
1   | 6501 | Data is transferred in serialized form through the same sockets. The default option.
2   | 6625 | The same, but with igbinary serialization (the fastest option). Used by default if igbinary is installed.
3   | 6194 | System V message queue (sysvmsg)
4   | 6008 | System V shared memory (sysvshm)
5   | 6052 | Shared memory (shmop)

The extension used for working with sockets is selected automatically. If the sockets extension is available, it is used, which improves performance. Otherwise, the library falls back to stream functions.
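To make the default transfer option more concrete, here is a minimal sketch of passing serialized data through a socket pair using PHP's stream functions. It is not AzaThread's actual code: the helper names (sendMessage, readExactly, receiveMessage) and the 4-byte length prefix are assumptions made for illustration, and both ends live in one process here so the example can run without forking.

```php
<?php
// Hypothetical helpers, not part of AzaThread's API.

// Serialize a value and send it with a 4-byte big-endian length prefix.
function sendMessage($socket, $data): void
{
    $payload = serialize($data);
    $packet  = pack('N', strlen($payload)) . $payload;
    $written = 0;
    $total   = strlen($packet);
    while ($written < $total) {
        $n = fwrite($socket, substr($packet, $written));
        if ($n === false || $n === 0) {
            throw new RuntimeException('Socket write failed');
        }
        $written += $n;
    }
}

// Read exactly $length bytes, looping over partial reads.
function readExactly($socket, int $length): string
{
    $buffer = '';
    while (strlen($buffer) < $length) {
        $chunk = fread($socket, $length - strlen($buffer));
        if ($chunk === false || $chunk === '') {
            throw new RuntimeException('Socket closed before the message was complete');
        }
        $buffer .= $chunk;
    }
    return $buffer;
}

// Read the length prefix, then the payload, and unserialize it.
function receiveMessage($socket)
{
    $header = unpack('Nlength', readExactly($socket, 4));
    return unserialize(readExactly($socket, $header['length']));
}

// In a forking library the parent would keep one end and the child the other.
[$parentEnd, $childEnd] = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

sendMessage($parentEnd, ['job' => 1, 'args' => [123]]);
$received = receiveMessage($childEnd);

fclose($parentEnd);
fclose($childEnd);
```

The length prefix is one simple way to tell where a message ends on a stream socket. Swapping serialize for igbinary_serialize would presumably correspond to the second option in the table, provided the igbinary extension is installed.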

The child process listens for all available signals. By default, all of them (except SIGWINCH and SIGINFO) cause it to shut down. This can easily be overridden by creating a method named after the signal in the thread class, for example sigWinch.

In the parent process, all signals are also intercepted by default. This can be changed by setting the class parameter listenMasterSignals to false, in which case only SIGCHLD is handled. You can easily add your own handlers by creating a static method named m<signal name>, for example mSigTerm.
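The name-based handler lookup described for child processes can be sketched roughly as follows. This is an illustration only: the classes SignalAwareWorker and ResizeAwareWorker, the dispatchSignal method and the log property are hypothetical, and signals are passed in by name so the sketch runs without the pcntl extension. A real worker would presumably register such a dispatcher with pcntl_signal.

```php
<?php
// Hypothetical sketch, not AzaThread's implementation.
abstract class SignalAwareWorker
{
    /** @var string[] Record of what happened, kept for demonstration. */
    public array $log = [];

    // Route a signal such as "SIGWINCH" to a method such as sigWinch(),
    // falling back to the default behaviour when no such method exists.
    public function dispatchSignal(string $name): void
    {
        $method = 'sig' . ucfirst(strtolower(substr($name, 3)));
        if (method_exists($this, $method)) {
            $this->$method();
        } else {
            $this->defaultHandler($name);
        }
    }

    // Default behaviour in this sketch: note that the worker would shut down.
    protected function defaultHandler(string $name): void
    {
        $this->log[] = "shutdown on $name";
    }
}

class ResizeAwareWorker extends SignalAwareWorker
{
    // Defining this method overrides the default handling for SIGWINCH.
    public function sigWinch(): void
    {
        $this->log[] = 'window resized, keep running';
    }
}

$worker = new ResizeAwareWorker();
$worker->dispatchSignal('SIGWINCH'); // custom handler
$worker->dispatchSignal('SIGTERM');  // no sigTerm() method, default applies
```

The appeal of this convention is that adding a handler requires no registration code: declaring the method is enough.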

If the child process dies for some reason, the class automatically forks again when the next task starts. This happens transparently, so you do not need to think about it or re-create the instance after an error.

The child process checks from time to time that the parent still exists. If the parent dies unexpectedly, the child terminates automatically.
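The text does not say how this check is implemented. One common approach, shown here purely as an assumption, is to remember the parent's PID at fork time and compare it with posix_getppid() later: once the parent exits, the orphaned child is adopted by another process and the value changes. The function name parentIsAlive is hypothetical, and the sketch needs the posix extension.

```php
<?php
// Hypothetical helper, not part of AzaThread's API.
function parentIsAlive(int $originalParentPid): bool
{
    // After the parent exits, the child is re-parented (usually to init),
    // so the reported parent PID no longer matches the remembered one.
    return posix_getppid() === $originalParentPid;
}

// In a real worker this value would be captured right after the fork.
$originalParentPid = posix_getppid();

// A worker loop could call this periodically and exit when it returns false.
$alive = parentIsAlive($originalParentPid);
```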

All resources used by a thread or thread pool are released automatically when the destructor is called. They can also be released explicitly by calling the cleanup method, after which the thread or pool can no longer be used.

With the default settings, the thread is initialized in advance, as soon as the class instance is created. If the prefork parameter is set to false, the fork happens only when a task is started.

There are quite a few configurable options: the name of the child process after the fork (the pName parameter of the constructor), the timeout for task execution (timeoutWork), the maximum time the child process waits for tasks (timeoutMaxWait), the timeout for pre-initialization (timeoutInit), and the sizes of the socket read buffers (pipeReadSize, pipeMasterReadSize).
You can also disable multitasking for threads (multitask). In that case the child process dies each time a task is completed and is forked again for the next run. This significantly reduces performance.

The code is covered by tests and documented in detail. Usage examples can be viewed and run in the file example.php.
More complex examples with error handling can be found in the unit test code.

There is a debugging mode which displays very detailed information about what is happening and where exactly.

Usage examples


The main feature is maximum simplicity. If you just want to run something in a separate “thread”, the following code is enough:
    class ExampleThread extends Thread
    {
        protected function process()
        {
            // Some work here
        }
    }

    $thread = new ExampleThread();
    $thread->wait()->run();

If everything needed for full operation is available (see the required extensions above), the task is performed asynchronously. If not, everything still works, but in synchronous mode.

Passing a parameter and getting the result makes the code only slightly more complicated:

    class ExampleThread extends Thread
    {
        protected function process()
        {
            return $this->getParam(0);
        }
    }

    $thread = new ExampleThread();
    $thread->wait()->run(123);
    $result = $thread->wait()->getResult();


Similarly, with a slight wave of the hand, we add handling of events sent from the thread:

    class ExampleThread extends Thread
    {
        const EV_PROCESS = 'process';

        protected function process()
        {
            $events = $this->getParam(0);
            for ($i = 0; $i < $events; $i++) {
                $event_data = $i;
                $this->trigger(self::EV_PROCESS, $event_data);
            }
        }
    }

    // Create the thread before binding handlers to it
    $thread = new ExampleThread();

    // Additional argument passed to the handler
    $additionalArgument = 123;

    $thread->bind(ExampleThread::EV_PROCESS, function($event_name, $event_data, $additional_arg) {
        // Handle the event
    }, $additionalArgument);

    $events = 10; // number of events the thread will trigger

    // To avoid waiting manually before the first run,
    // the preforkWait property can be overridden to TRUE in the subclass
    $thread->wait();

    $thread->run($events)->wait();


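Finally, using a pool of eight threads with error handling:

    $threads = 8; // number of threads
    $pool = new ThreadPool('ExampleThread', $threads);

    $num = 25;    // number of tasks
    $left = $num; // number of tasks still to be started

    do {
        // While the pool has free threads
        // and there are tasks left to start
        while ($pool->hasWaiting() && $left > 0) {
            // Starting a task returns the thread id
            $threadId = $pool->run();
            $left--;
        }
        if ($results = $pool->wait($failed)) {
            foreach ($results as $threadId => $result) {
                // The task completed successfully.
                // The result can be identified
                // by the thread id ($threadId)
                $num--;
            }
        }
        if ($failed) {
            // Handle execution errors, for example when
            // the child process died during the task
            // or the task timed out: queue the task again
            foreach ($failed as $threadId) {
                $left++;
            }
        }
    } while ($num > 0);

    // Terminate all child processes and release the pool's resources.
    $pool->cleanup();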


Performance Test Results



The tests were run on two machines with Ubuntu 11.04.
The first is an Intel Core i3 540 3.07 GHz.
The second is an Intel Core i7 2600K 3.40 GHz (Ubuntu runs in a VMware virtual machine).

I give the results only so that the performance gain can be estimated.
Again, these are averages over a series of 10 test runs, in jps (jobs per second: the number of tasks per second).

As a task, the threads perform the following busywork:

    for ($i = 0; $i < 1000; $i++) {
        $r = mt_rand(0, PHP_INT_MAX) * mt_rand(0, PHP_INT_MAX);
    }
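For reference, a jps figure for the synchronous case could be measured roughly as shown here. This is only a sketch and not the library's speed_test.php: the function names busyJob and measureJps and the job count of 200 are assumptions chosen for illustration.

```php
<?php
// Hypothetical benchmark helpers, not taken from AzaThread.

// One job: the same busywork loop as in the article.
function busyJob(): void
{
    for ($i = 0; $i < 1000; $i++) {
        $r = mt_rand(0, PHP_INT_MAX) * mt_rand(0, PHP_INT_MAX);
    }
}

// Run $jobs jobs one after another and return jobs per second.
function measureJps(callable $job, int $jobs): float
{
    $start = microtime(true);
    for ($n = 0; $n < $jobs; $n++) {
        $job();
    }
    $elapsed = microtime(true) - $start;
    // Guard against a zero interval on very coarse clocks.
    return $elapsed > 0 ? $jobs / $elapsed : INF;
}

$jps = measureJps('busyJob', 200);
printf("%.0f jobs per second (synchronous)\n", $jps);
```

Numbers obtained this way exclude the cost of passing arguments and results between processes, so they are not directly comparable with the pooled results.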

The first row (0 threads) shows the result for synchronous operation (without forks).
I did not try 18 and 20 threads on the first configuration, since performance had already started to drop at 12.
Number of threads | First configuration | Second configuration
0                 | 553                 | 763
1                 | 330                 | 669
2                 | 580                 | 1254
4                 | 1015                | 2188
8                 | 1040                | 2618
10                | 1027                | 2719
12                | 970                 | 2739
16                | 958                 | 2904
18                | -                   | 2830
20                | -                   | 2730


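In other words, performance rises roughly two- to fourfold depending on the processor: comparing peak jps with the synchronous run gives about 1.9x on the first configuration and about 3.8x on the second.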
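The speedup can be checked with a few lines of arithmetic over the measured figures. The sketch below copies the jps values from the results table; the function name peakSpeedup is hypothetical.

```php
<?php
// jps figures from the results table: thread count => jps (0 = synchronous run).
$results = [
    'first'  => [0 => 553, 1 => 330, 2 => 580, 4 => 1015, 8 => 1040,
                 10 => 1027, 12 => 970, 16 => 958],
    'second' => [0 => 763, 1 => 669, 2 => 1254, 4 => 2188, 8 => 2618,
                 10 => 2719, 12 => 2739, 16 => 2904, 18 => 2830, 20 => 2730],
];

// Find the best threaded result and compare it with the synchronous one.
function peakSpeedup(array $jpsByThreads): array
{
    $sync = $jpsByThreads[0];
    unset($jpsByThreads[0]); // leave only the threaded runs
    $best    = max($jpsByThreads);
    $threads = array_search($best, $jpsByThreads, true);
    return ['threads' => $threads, 'speedup' => round($best / $sync, 2)];
}

$first  = peakSpeedup($results['first']);
$second = peakSpeedup($results['second']);

printf("first: best at %d threads, %.2fx\n", $first['threads'], $first['speedup']);
printf("second: best at %d threads, %.2fx\n", $second['threads'], $second['speedup']);
```

Note that a single thread is slower than the synchronous run on both machines (330 vs 553 and 669 vs 763 jps), which reflects the overhead of passing data between processes.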

The code that runs the series of tests with the necessary parameters is in the file examples/speed_test.php, so you can easily test the performance and choose the optimal number of threads on your own machine.



And in conclusion, here is the link to the source once more, in case someone missed it above:
github.com/Anizoptera/AzaThread

I would be very happy if the library proves useful to anyone. Feature requests and bug reports can be left on GitHub, and I will promptly fix and improve the library.

UPD:
A major update of the library has been released, and it has been renamed from CThread to AzaThread. Commenters objected to the "C" in the name. So now the library is called AzaThread, uses namespaces and supports PSR-0 :)
Accordingly, I have slightly revised the article: the code, the title and the links to GitHub.

UPD2:
AzaThread now has a Composer package. The library has joined the other open-source components of Anizoptera CMF. We now write about our new developments and updates to the open components on our blog, AzaGroup.ru.

Source: https://habr.com/ru/post/134501/

