
Asynchronous execution of PHP scripts in subprocesses



Good afternoon, dear Habr readers.

Today I would like to talk about something non-trivial: asynchronous (parallel) computation in PHP.
PHP is a scripting language that never claimed to be multi-threaded. But the further you go, the more serious the tasks developers face, and the more they have to contort PHP to solve them, because many companies are simply afraid of, or unwilling to, migrate to a language better suited to those tasks. So we have to work with what we are given.
Details under the cut...

Some time ago, I faced a rather non-trivial task.
In short, the project had about 20 very heavy modules for calculating the cost of goods.
All of this sat on top of several relational tables, and each module had its own calculation rules and so on. But the results had to be delivered to the client as a single package, and quickly. Very quickly. Caching helped, but only to a very limited extent, nowhere near enough to meet the technical requirements.

The algorithm was quite simple: the required arguments were passed in, all the modules were instantiated into an array, and everything was calculated in a loop. The results were collected into a single object and sent to the client for post-processing.

At a certain point the team and I hit a dead end: we realized that each new module increased the processing time not linearly but at a growing rate.

As you have already guessed, the suggestion was to parallelize the process somehow. But with PHP that is not easy, because it cannot do this out of the box.

Different solutions were tested:


Unfortunately, nothing came of it in the end. It was decided to wind the project down.

But for me the question remained open, because there had to be a solution. Even back then we were thinking about some kind of “subprocesses” spawned by the main script (something like the exec() function).

A lot of time has passed since then, and I left that project long ago. But just last week I got another very non-trivial task: to write a script that would, in a certain way, capture the current state of some entity and part of its heavy relational dependencies. Two classes are used for this; they prepare the data correctly and store it in the database. The problem is that there are about 2800 such objects. My script kept dying with:

PHP Fatal error: Allowed memory size of <over9000> bytes exhausted. 

Each batch of 50 entities took about 190 MB of memory on average, and memory usage grew with every new batch. When I removed the memory limit entirely, I got the same error plus a segmentation fault.
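Growth like this can be made visible by recording memory usage after every batch. The following is only a minimal sketch: `trackMemoryPerBatch()` and the deliberately leaky callback are hypothetical stand-ins, not code from the project, and the numbers they produce have nothing to do with the 190 MB figure above.

```php
<?php
// Hypothetical helper (not from the project): runs $processBatch on each
// chunk of $items and records the memory still in use after every chunk.
function trackMemoryPerBatch(array $items, int $batchSize, callable $processBatch): array
{
    $readings = [];
    foreach (array_chunk($items, $batchSize) as $index => $batch) {
        $processBatch($batch);
        gc_collect_cycles();                  // free collectable reference cycles first
        $readings[$index] = memory_get_usage();
    }
    return $readings;
}

// Deliberately leaky stand-in for the real work: every batch stays referenced.
$retained = [];
$readings = trackMemoryPerBatch(range(1, 200), 50, function (array $batch) use (&$retained) {
    $retained[] = array_map(function ($n) {
        return str_repeat('x', 10000);
    }, $batch);
});

foreach ($readings as $index => $bytes) {
    printf("batch %d: %.1f MB\n", $index, $bytes / 1048576);
}
```

If the readings keep climbing even after `gc_collect_cycles()`, something is still holding references to data from earlier batches.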

In other words, one way or another we had to figure out how to avoid exhausting RAM in the script, and try to make it a bit faster as well. First we tried to find out why memory consumption grew from iteration to iteration. It turned out that the root of the problem lay in how Symfony's ServiceContainer and EventDispatcher work: the whole container is pushed into the event, and then it turns recursive. Frankly, we were too lazy to work around all of that, and my colleague suggested a rather elegant solution.

The Symfony2 component set includes a great thing: the Symfony Process component.
This wunderwaffe lets you spawn a subprocess while the script is running and execute it in CLI mode (like a regular console command).

At first we simply tried spawning one process at a time to limit RAM usage. But then we read in the docs that this thing can also work asynchronously.
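To make the idea of "several subprocesses at once, but no more than N" concrete, here is a minimal sketch built directly on PHP's proc_open() instead of the Symfony component. The helper `runInParallel()` is hypothetical and is not the project's code; the sketch assumes a Unix-like system (non-blocking pipes) and a PHP CLI binary reachable through `PHP_BINARY`.

```php
<?php
// Hypothetical helper: runs shell commands with at most $maxParallel
// subprocesses alive at a time; returns each command's stdout by key.
function runInParallel(array $commands, int $maxParallel): array
{
    $pending = array_keys($commands);
    $running = [];
    $outputs = [];

    while ($pending || $running) {
        // Fill free slots with new subprocesses.
        while ($pending && count($running) < $maxParallel) {
            $key = array_shift($pending);
            $pipes = [];
            $handle = proc_open($commands[$key], [1 => ['pipe', 'w']], $pipes);
            if ($handle === false) {
                throw new RuntimeException("Could not start command '$key'");
            }
            stream_set_blocking($pipes[1], false); // never block while polling
            $running[$key] = [$handle, $pipes[1]];
            $outputs[$key] = '';
        }

        // Collect output and release the slots of finished subprocesses.
        foreach ($running as $key => [$handle, $stdout]) {
            $outputs[$key] .= stream_get_contents($stdout);
            if (!proc_get_status($handle)['running']) {
                $outputs[$key] .= stream_get_contents($stdout); // drain the rest
                fclose($stdout);
                proc_close($handle);
                unset($running[$key]);
            }
        }

        usleep(10000); // sleep 10 ms so the parent does not spin at 100% CPU
    }

    return $outputs;
}

// Usage: four short PHP one-liners, two at a time.
$commands = [];
foreach (['a', 'b', 'c', 'd'] as $name) {
    $code = sprintf('usleep(200000); echo "%s";', $name);
    $commands[$name] = escapeshellarg(PHP_BINARY) . ' -r ' . escapeshellarg($code);
}
$outputs = runInParallel($commands, 2);
print_r($outputs);
```

Because each unit of work runs in its own process, whatever memory it allocates is returned to the operating system when that process exits, which is the property we were after.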

We decided to try it in practice. The result was something like this (below is an example from the Example repository on GitHub; the logic of the subprocesses themselves is very simple, but artificially heavy):

MainCommand
    <?php

    namespace Example\Command;

    use Symfony\Component\Console\Command\Command;
    use Symfony\Component\Console\Input\InputInterface;
    use Symfony\Component\Console\Output\OutputInterface;
    use Symfony\Component\Process\Process;

    class MainCommand extends Command
    {
        protected function configure()
        {
            $this->setName('example:main')
                ->setDescription('Run example command with optional number of CPUs')
                ->addArgument('CPUs', null, 'number of working CPUs', 2);
        }

        protected function execute(InputInterface $input, OutputInterface $output)
        {
            $channels = [];
            $maxChannels = $input->getArgument('CPUs');
            $exampleArray = $this->getExampleArray();
            $output->writeln('<fg=green>Start example process</>');

            // Loop until the queue is empty and every subprocess has finished.
            while (count($exampleArray) > 0 || count($channels) > 0) {
                // Release the slots of subprocesses that have terminated.
                foreach ($channels as $key => $channel) {
                    if ($channel instanceof Process && $channel->isTerminated()) {
                        unset($channels[$key]);
                    }
                }

                // All slots are busy: check again on the next iteration.
                if (count($channels) >= $maxChannels) {
                    continue;
                }

                // Nothing left to start: keep waiting for the running ones.
                if (!$item = array_pop($exampleArray)) {
                    continue;
                }

                // start() returns immediately; the subprocess runs in the background.
                $process = new Process(sprintf('php index.php example:sub-process %s', $item), __DIR__ . '/../../../');
                $process->start();
                if (!$process->isStarted()) {
                    throw new \Exception($process->getErrorOutput());
                }
                $channels[] = $process;
            }

            $output->writeln('<bg=green;fg=black>Done.</>');
        }

        /**
         * Builds 30 items of the form "name.x1.y1.x2.y2" with random coordinates.
         *
         * @return array
         */
        private function getExampleArray()
        {
            $array = [];
            for ($i = 0; $i < 30; $i++) {
                $name = 'No' . $i;
                $x1 = rand(1, 10);
                $y1 = rand(1, 10);
                $x2 = rand(1, 10);
                $y2 = rand(1, 10);
                $array[] = $name . '.' . $x1 . '.' . $y1 . '.' . $x2 . '.' . $y2;
            }
            return $array;
        }
    }


SubProcessCommand
    <?php

    namespace Example\Command;

    use Symfony\Component\Console\Command\Command;
    use Symfony\Component\Console\Input\InputInterface;
    use Symfony\Component\Console\Output\OutputInterface;

    class SubProcessCommand extends Command
    {
        protected function configure()
        {
            $this->setName('example:sub-process')
                ->setDescription('Run example sub-process command')
                ->addArgument('item');
        }

        protected function execute(InputInterface $input, OutputInterface $output)
        {
            // The item arrives as "name.x1.y1.x2.y2".
            $items = explode('.', $input->getArgument('item'));
            $pointName = $items[0];
            $x1 = $items[1];
            $y1 = $items[2];
            $x2 = $items[3];
            $y2 = $items[4];

            // Used for mocking heavy execution.
            $sum = 0;
            for ($i = 1; $i <= 30000000; $i++) {
                $sum += $i;
            }

            // Distance between (x1, y1) and (x2, y2); bcsqrt() needs the bcmath extension.
            $distance = bcsqrt(pow(($x2 - $x1), 2) + pow(($y2 - $y1), 2));
            $data = sprintf('Point %s: %s', $pointName, (string)$distance);

            // Each subprocess appends its result to its own file in output/.
            file_put_contents(__DIR__ . '/../../../output/Point' . $pointName, print_r($data, 1), FILE_APPEND);
        }
    }


index.php
    <?php

    require __DIR__ . '/vendor/autoload.php';

    use Symfony\Component\Console\Application;

    $application = new Application();
    $application->add(new \Example\Command\MainCommand());
    $application->add(new \Example\Command\SubProcessCommand());
    $application->run();



As a result, we get roughly the following picture:


Frankly, I was very impressed by these capabilities.
I hope this article helps someone. As additional material, I will leave a link to the repository where the example above is implemented.

Repository

Thanks for your attention. I will be glad to receive feedback and comments.

UPD
Many thanks to AlmazDelDiablo and skvot for the reminder.
This solution works only if the project does not disable the proc_open() function, on which the Symfony Process component is based.

Updated the htop screenshot. It now shows data on the processes. Thank you, hell0w0rd.
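A quick way to check this before relying on subprocesses is sketched below. `procOpenAvailable()` is a hypothetical helper, not part of Symfony; it looks both at whether the function exists and at the `disable_functions` ini directive, on the assumption that checking both is the safest option.

```php
<?php
// Hypothetical helper: true when proc_open() exists and is not listed
// in the disable_functions ini directive.
function procOpenAvailable(): bool
{
    $disabled = array_map('trim', explode(',', (string) ini_get('disable_functions')));

    return function_exists('proc_open') && !in_array('proc_open', $disabled, true);
}

echo procOpenAvailable()
    ? "proc_open() is available\n"
    : "proc_open() is disabled, subprocesses cannot be started\n";
```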

Source: https://habr.com/ru/post/266615/

