
Coroutines in PHP and working with non-blocking functions

One of the biggest innovations in PHP 5.5 will be support for generators and coroutines. Generators have already been covered well in the documentation and in several other posts (for example, in this or this). Coroutines have received very little attention. They are a much more powerful tool, but also one that is harder to understand and explain.

In this article, I will show you how to implement a task scheduler using coroutines, so that you understand what you can do with them and how to use them. Let's start with a few introductory words. If you think you already know how generators and coroutines work, you can skip straight to the section “Cooperative multitasking”.

Generators


The essence of a generator is that it is a function that returns not just one value but a sequence of values, each emitted one after the other. In other words, generators let you implement an iterator without the boilerplate code.

A very simple example is the xrange() function:
    function xrange($start, $end, $step = 1) {
        for ($i = $start; $i <= $end; $i += $step) {
            yield $i;
        }
    }

    foreach (xrange(1, 1000000) as $num) {
        echo $num, "\n";
    }

This function does the same as PHP's built-in range(). The only difference is that range() would give you an array of a million numbers, while xrange() returns an iterator that emits these numbers but never builds an array holding all of them.

The advantages of this approach should be obvious. This way you can work with very large data sets without loading them all into memory. You can even work with endless data streams.
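To illustrate the endless-stream case, here is a minimal sketch of a generator that never terminates on its own. The helper names naturals() and take() are made up for this example and are not part of PHP.

```php
<?php
// Hypothetical helper: yields 1, 2, 3, ... forever.
function naturals() {
    $n = 1;
    while (true) {
        yield $n++;
    }
}

// Hypothetical helper: collects at most $limit values from any iterator.
function take($iterator, $limit) {
    $result = [];
    foreach ($iterator as $value) {
        if (count($result) >= $limit) {
            break; // stop pulling values; the generator is simply abandoned
        }
        $result[] = $value;
    }
    return $result;
}

$firstFive = take(naturals(), 5);
echo implode(', ', $firstFive), "\n"; // 1, 2, 3, 4, 5
```

Since values are produced only on demand, the infinite loop inside naturals() is harmless as long as the consumer stops asking.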

All this can also be done without generators, by writing a class that implements the Iterator interface. Generators simply make this (much) easier, because you don't need to implement five different methods for each iterator.
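For comparison, this is roughly what a hand-written counterpart of xrange() could look like. It is only a sketch: the class name XRangeIterator is chosen for this example, and the return type declarations assume PHP 7.1 or later.

```php
<?php
// Hand-written equivalent of the xrange() generator (integers only).
class XRangeIterator implements Iterator {
    private $start;
    private $end;
    private $step;
    private $value;
    private $key = 0;

    public function __construct(int $start, int $end, int $step = 1) {
        $this->start = $start;
        $this->end = $end;
        $this->step = $step;
        $this->value = $start;
    }

    // The five methods required by the Iterator interface:
    public function rewind(): void { $this->value = $this->start; $this->key = 0; }
    public function valid(): bool  { return $this->value <= $this->end; }
    public function current(): int { return $this->value; }
    public function key(): int     { return $this->key; }
    public function next(): void   { $this->value += $this->step; $this->key++; }
}

$values = iterator_to_array(new XRangeIterator(1, 10, 3));
echo implode(', ', $values), "\n"; // 1, 4, 7, 10
```

The generator version expresses the same thing in a five-line function, with the loop variable playing the role of the object's state.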

Generators as interruptible functions


To move from generators to coroutines, it is important to understand how they work internally: generators are interruptible functions, in which the yield keyword marks the interruption points.

Returning to the previous example: when you call xrange(1, 1000000), nothing in the body of xrange() is actually executed. Instead, PHP simply returns an object of the Generator class, which implements the Iterator interface:
    <?php

    $range = xrange(1, 1000000);
    var_dump($range); // object(Generator)#1
    var_dump($range instanceof Iterator); // bool(true)


The code runs only when you call certain iterator methods. For example, if you call $range->rewind(), the code of xrange() is executed up to the first yield. In this case, that means $i = $start runs first, and then yield $i. Whatever is passed to yield can be retrieved by calling $range->current().

To continue executing the code, you call $range->next(). This again makes the generator run up to the next yield. Thus, with successive calls to next() and current(), you can get all the values from the generator until it reaches the point where the code simply ends. For xrange() this happens once $i exceeds $end. Execution then reaches the end of the function, with no code left to run. After that, valid() returns false and the iteration stops.
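The sequence of calls described above can be written out by hand. The following sketch drives the generator manually with the Iterator methods, which is essentially what foreach does behind the scenes; the $seen array exists only to record the values.

```php
<?php
function xrange($start, $end, $step = 1) {
    for ($i = $start; $i <= $end; $i += $step) {
        yield $i;
    }
}

$range = xrange(1, 3);
$seen = [];

$range->rewind();                 // run the body up to the first yield
while ($range->valid()) {         // false once the function body has finished
    $seen[] = $range->current();  // the value passed to the current yield
    $range->next();               // resume until the next yield (or the end)
}

echo implode(' ', $seen), "\n";   // 1 2 3
```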

Coroutines


The main thing coroutines add to the functionality above is the ability to send values to the generator. This turns the generator's usual monologue into a full dialogue, in which the caller can also say something to the generator.

Values are passed to a coroutine by calling the send() method instead of next(). Here is an example of such a coroutine:
    function logger($fileName) {
        $fileHandle = fopen($fileName, 'a');
        while (true) {
            // yield evaluates to whatever the caller passes to send()
            fwrite($fileHandle, (yield) . "\n");
        }
    }

    $logger = logger(__DIR__ . '/log');
    $logger->send('Foo');
    $logger->send('Bar');


As you can see, yield is used here not as a statement (like return or echo) but as an expression, that is, it evaluates to a value: whatever was sent via send(). In this example, yield first evaluates to "Foo" and then to "Bar".

This example shows how yield can act as a pure receiver. But you can also combine both uses, so that you both send and receive values. Here is an example of how this works:
    function gen() {
        $ret = (yield 'yield1');
        var_dump($ret);
        $ret = (yield 'yield2');
        var_dump($ret);
    }

    $gen = gen();
    var_dump($gen->current());    // string(6) "yield1"
    var_dump($gen->send('ret1')); // string(4) "ret1"   (the first var_dump in gen)
                                  // string(6) "yield2" (the var_dump of the ->send() return value)
    var_dump($gen->send('ret2')); // string(4) "ret2"   (again from within gen)
                                  // NULL               (the return value of ->send())


The exact order of the output may be a bit hard to follow at first, so re-read the code and try running it yourself to see why everything happens in that order. There are two things I would like to draw your attention to. First, the parentheses around yield are no accident: they are required for technical reasons (though I have been considering adding an exception to this rule for direct assignment). Second, you may have noticed that current() is called without a prior rewind(); in that case rewind() is invoked implicitly.
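If you want to watch the implicit rewind() happen, a small sketch like the following may help. It records events in a $log array instead of printing them; the greeter() coroutine and the log messages are invented for this illustration.

```php
<?php
$log = [];

// Hypothetical coroutine that records when its body actually runs.
function greeter(array &$log) {
    $log[] = 'body started';
    $name = (yield 'ready');
    $log[] = "hello $name";
}

$gen = greeter($log);
$log[] = 'generator created'; // nothing inside greeter() has run yet

$first = $gen->current();     // implicit rewind(): runs up to the first yield
$gen->send('world');          // resumes the body; the yield evaluates to 'world'

echo implode(' | ', $log), "\n";
// generator created | body started | hello world
```

Note that 'generator created' is logged before 'body started', even though greeter() was called first.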

Cooperative multitasking


If, while reading the logger() example, you thought “Why would I use a coroutine for this? Why not just write an ordinary class?”, you were absolutely right. That example only demonstrates the basic usage; nothing about it actually calls for a coroutine. As mentioned in the introduction, coroutines are a very powerful concept, but their applications are rare and often quite complicated, which makes it hard to come up with simple, non-contrived examples.

I decided to show you an implementation of cooperative multitasking using coroutines. The problem is that we have several tasks that should run in parallel, but the processor (translator's note: an idealized one, "spherical in a vacuum") can run only one task at a time. So the processor has to switch between the tasks and let each one run “for a little while”.

The multitasking is “cooperative” because it relies on each task voluntarily handing control back to the scheduler so that another task can run. This is in contrast to preemptive multitasking, where the scheduler itself can interrupt a task. Cooperative multitasking was used in early versions of Windows (before Win95) and Mac OS, which later switched to preemption. The reason is obvious: if you rely on programs to give up control voluntarily, any single program can simply hog the entire CPU.

Now you should already see the connection between coroutines and task scheduling: yield lets a task interrupt itself and hand control back to the scheduler, which can then run another task. In addition, yield can be used for communication between the task and the scheduler.
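Before wrapping anything in classes, the basic idea can be sketched with bare generators and a queue. This is only an illustration under simplified assumptions: the worker() function and the [generator, started] pairs are invented here, and nothing is sent back into the tasks.

```php
<?php
// Hypothetical worker: does one step of "work", then yields control.
function worker($name, $steps, array &$log) {
    for ($i = 1; $i <= $steps; ++$i) {
        $log[] = "$name:$i";
        yield; // voluntarily hand control back to the loop below
    }
}

$log = [];
$queue = new SplQueue();
$queue->enqueue([worker('A', 3, $log), false]); // [generator, started?]
$queue->enqueue([worker('B', 2, $log), false]);

// Round-robin: resume each task for one step, re-queue it if it is not done.
while (!$queue->isEmpty()) {
    list($gen, $started) = $queue->dequeue();
    if ($started) {
        $gen->next();   // continue after the yield where the task stopped
    } else {
        $gen->rewind(); // first visit: run up to the first yield
    }
    if ($gen->valid()) {
        $queue->enqueue([$gen, true]);
    }
}

echo implode(' ', $log), "\n"; // A:1 B:1 A:2 B:2 A:3
```

The two workers interleave even though only one of them runs at any moment, which is exactly the behaviour a cooperative scheduler is after.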

In our case, a task will be a thin wrapper around a coroutine function:
    class Task {
        protected $taskId;
        protected $coroutine;
        protected $sendValue = null;
        protected $beforeFirstYield = true;

        public function __construct($taskId, Generator $coroutine) {
            $this->taskId = $taskId;
            $this->coroutine = $coroutine;
        }

        public function getTaskId() {
            return $this->taskId;
        }

        public function setSendValue($sendValue) {
            $this->sendValue = $sendValue;
        }

        public function run() {
            if ($this->beforeFirstYield) {
                $this->beforeFirstYield = false;
                return $this->coroutine->current();
            } else {
                $retval = $this->coroutine->send($this->sendValue);
                $this->sendValue = null;
                return $retval;
            }
        }

        public function isFinished() {
            return !$this->coroutine->valid();
        }
    }


Each task has its own identifier (taskId). With the setSendValue() method you can specify which value is sent to the task on its next run (you will see what this is needed for later). The run() method really does nothing more than call the send() method of the coroutine.
To understand why we need the additional beforeFirstYield flag, look at the following code:
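    function gen() {
        yield 'foo';
        yield 'bar';
    }

    $gen = gen();
    var_dump($gen->send('something')); // string(3) "bar"

    // Since send() is called before the first yield, rewind() is invoked
    // implicitly, so what really happens is equivalent to:
    //
    //     $gen->rewind();
    //     var_dump($gen->send('something'));
    //
    // rewind() advances to the first yield (and ignores its value), send()
    // advances to the second yield (and var_dump prints its value).
    // So the first yielded value is lost.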


The beforeFirstYield flag tells us whether the value of the first yield has already been returned, so that run() can fetch it with current() instead of skipping over it.

The scheduler itself has to do little more than cycle through the tasks and run them:
    class Scheduler {
        protected $maxTaskId = 0;
        protected $taskMap = []; // taskId => task
        protected $taskQueue;

        public function __construct() {
            $this->taskQueue = new SplQueue();
        }

        public function newTask(Generator $coroutine) {
            $tid = ++$this->maxTaskId;
            $task = new Task($tid, $coroutine);
            $this->taskMap[$tid] = $task;
            $this->schedule($task);
            return $tid;
        }

        public function schedule(Task $task) {
            $this->taskQueue->enqueue($task);
        }

        public function run() {
            while (!$this->taskQueue->isEmpty()) {
                $task = $this->taskQueue->dequeue();
                $task->run();

                if ($task->isFinished()) {
                    unset($this->taskMap[$task->getTaskId()]);
                } else {
                    $this->schedule($task);
                }
            }
        }
    }


The newTask() method creates a new task (with the next free task ID) and puts it into the taskMap. It also schedules the task by adding it to the taskQueue. The run() method then walks through this queue and runs the tasks. If a task has finished, it is removed; otherwise it is put back at the end of the queue.

Let's try the scheduler with two simple (and rather pointless) tasks:
    function task1() {
        for ($i = 1; $i <= 10; ++$i) {
            echo "This is task 1 iteration $i.\n";
            yield;
        }
    }

    function task2() {
        for ($i = 1; $i <= 5; ++$i) {
            echo "This is task 2 iteration $i.\n";
            yield;
        }
    }

    $scheduler = new Scheduler;

    $scheduler->newTask(task1());
    $scheduler->newTask(task2());

    $scheduler->run();


Both tasks simply print a message and then hand control back to the scheduler. Here is the output:
  This is task 1 iteration 1.
 This is task 2 iteration 1.
 This is task 1 iteration 2.
 This is task 2 iteration 2.
 This is task 1 iteration 3.
 This is task 2 iteration 3.
 This is task 1 iteration 4.
 This is task 2 iteration 4.
 This is task 1 iteration 5.
 This is task 2 iteration 5.
 This is task 1 iteration 6.
 This is task 1 iteration 7.
 This is task 1 iteration 8.
 This is task 1 iteration 9.
 This is task 1 iteration 10. 


Scheduler Interaction


So, our scheduler works, and we can move on to the next item on the agenda: communication between tasks and the scheduler. We will use the same method that processes use to talk to the operating system: system calls (syscalls). System calls are needed because the operating system runs at a different privilege level than the processes. So, to perform actions that require higher privileges (for example, killing other processes), there must be a way to hand control to the kernel so that it can perform them. Internally, this is implemented with interrupt instructions.

Our scheduler will mirror this design: instead of simply passing the scheduler to the task (which could then do anything with it), tasks will communicate through system calls passed via yield. Here yield acts both as the interrupt and as the way to pass information to (and from) the scheduler.

To represent a system call, I will use a small wrapper around a callable:
    class SystemCall {
        protected $callback;

        public function __construct(callable $callback) {
            $this->callback = $callback;
        }

        public function __invoke(Task $task, Scheduler $scheduler) {
            $callback = $this->callback; // Can't call it directly in PHP :/
            return $callback($task, $scheduler);
        }
    }


It behaves like any other callable, but takes the task and the scheduler as arguments. To handle it, we need to modify the scheduler's run() method slightly:
    public function run() {
        while (!$this->taskQueue->isEmpty()) {
            $task = $this->taskQueue->dequeue();
            $retval = $task->run();

            if ($retval instanceof SystemCall) {
                $retval($task, $this);
                continue;
            }

            if ($task->isFinished()) {
                unset($this->taskMap[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }


Our first system call simply returns the task ID:
    function getTaskId() {
        return new SystemCall(function(Task $task, Scheduler $scheduler) {
            $task->setSendValue($task->getTaskId());
            $scheduler->schedule($task);
        });
    }


It does this by setting the task ID as the value to send and putting the task back into the scheduler's queue. For system calls the scheduler does not reschedule the task automatically; we have to do it manually (you will see why later).
Using this new system call, we can rewrite the previous example:
    function task($max) {
        $tid = (yield getTaskId()); // <-- here's the syscall!
        for ($i = 1; $i <= $max; ++$i) {
            echo "This is task $tid iteration $i.\n";
            yield;
        }
    }

    $scheduler = new Scheduler;

    $scheduler->newTask(task(10));
    $scheduler->newTask(task(5));

    $scheduler->run();


This code produces the same output as the previous example. Next come two more system calls, for creating new tasks and for killing existing ones:
    function newTask(Generator $coroutine) {
        return new SystemCall(
            function(Task $task, Scheduler $scheduler) use ($coroutine) {
                $task->setSendValue($scheduler->newTask($coroutine));
                $scheduler->schedule($task);
            }
        );
    }

    function killTask($tid) {
        return new SystemCall(
            function(Task $task, Scheduler $scheduler) use ($tid) {
                $task->setSendValue($scheduler->killTask($tid));
                $scheduler->schedule($task);
            }
        );
    }


The killTask() system call needs an additional method in the scheduler:
    public function killTask($tid) {
        if (!isset($this->taskMap[$tid])) {
            return false;
        }

        unset($this->taskMap[$tid]);

        // Walk the queue to find and remove the task. This linear scan is
        // a bit ugly, but acceptable as long as killing tasks is rare.
        foreach ($this->taskQueue as $i => $task) {
            if ($task->getTaskId() === $tid) {
                unset($this->taskQueue[$i]);
                break;
            }
        }

        return true;
    }


A small script to test this functionality:
    function childTask() {
        $tid = (yield getTaskId());
        while (true) {
            echo "Child task $tid still alive!\n";
            yield;
        }
    }

    function task() {
        $tid = (yield getTaskId());
        $childTid = (yield newTask(childTask()));

        for ($i = 1; $i <= 6; ++$i) {
            echo "Parent task $tid iteration $i.\n";
            yield;

            if ($i == 3) yield killTask($childTid);
        }
    }

    $scheduler = new Scheduler;
    $scheduler->newTask(task());
    $scheduler->run();


The output will be as follows:
  Parent task 1 iteration 1.
 Child task 2 still alive!
 Parent task 1 iteration 2.
 Child task 2 still alive!
 Parent task 1 iteration 3.
 Child task 2 still alive!
 Parent task 1 iteration 4.
 Parent task 1 iteration 5.
 Parent task 1 iteration 6. 


The child task is killed after three iterations, at which point the “Child task still alive!” messages stop. Note that this is not a real parent-child relationship, since the child could keep running after the parent has finished, or could even kill the parent. We could modify the scheduler to get a proper parent-child hierarchy, but not in this article.

There are many more kinds of calls that could be implemented, such as wait (wait until a task has finished completely), exec (replace the task that is currently running) and fork (create a clone of the current task). Forking is a pretty cool feature, and at the time of writing it could be implemented with coroutines because they supported cloning (in released PHP versions, Generator objects cannot be cloned).

Non-blocking interaction


A really cool application of our scheduler is, obviously, a web server. One task listens on a socket for new connections, and each time a connection is made, a new task is created to handle it.

The difficulty is that socket operations such as reading data are blocking in PHP: PHP waits until the client has finished sending. For a web server this is obviously no good at all: it means it can only handle one request at a time.

As a solution, we need to ask the socket if it is “ready” before reading or writing data to it. To find out which sockets are ready to send or receive data, we will use the stream_select() function.

To begin, let's add a couple of new system calls that make a task wait until a given socket is ready for reading or writing:
    function waitForRead($socket) {
        return new SystemCall(
            function(Task $task, Scheduler $scheduler) use ($socket) {
                $scheduler->waitForRead($socket, $task);
            }
        );
    }

    function waitForWrite($socket) {
        return new SystemCall(
            function(Task $task, Scheduler $scheduler) use ($socket) {
                $scheduler->waitForWrite($socket, $task);
            }
        );
    }


These system calls are just proxies for the corresponding scheduler methods:
    <?php

    // resourceID => [socket, tasks]
    protected $waitingForRead = [];
    protected $waitingForWrite = [];

    public function waitForRead($socket, Task $task) {
        if (isset($this->waitingForRead[(int) $socket])) {
            $this->waitingForRead[(int) $socket][1][] = $task;
        } else {
            $this->waitingForRead[(int) $socket] = [$socket, [$task]];
        }
    }

    public function waitForWrite($socket, Task $task) {
        if (isset($this->waitingForWrite[(int) $socket])) {
            $this->waitingForWrite[(int) $socket][1][] = $task;
        } else {
            $this->waitingForWrite[(int) $socket] = [$socket, [$task]];
        }
    }


waitingForRead and waitingForWrite are simply arrays containing the sockets being waited on and the tasks waiting for them. The most interesting part is the following method, which checks whether the sockets are ready and reschedules their tasks:
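The bookkeeping behind these arrays is easy to try in isolation. In the sketch below, php://memory streams stand in for sockets and plain strings stand in for Task objects; the addWaiter() helper is invented for this illustration and mirrors what waitForRead() does.

```php
<?php
// php://memory streams stand in for sockets; any stream resource would do.
$a = fopen('php://memory', 'r+');
$b = fopen('php://memory', 'r+');

$waiting = []; // resourceID => [stream, list of waiters]

// Hypothetical helper mirroring the bookkeeping in waitForRead().
function addWaiter(array &$waiting, $stream, $waiter) {
    $id = (int) $stream; // casting a resource to int yields its resource ID
    if (isset($waiting[$id])) {
        $waiting[$id][1][] = $waiter;        // another waiter for a known stream
    } else {
        $waiting[$id] = [$stream, [$waiter]]; // first waiter for this stream
    }
}

addWaiter($waiting, $a, 'task 1');
addWaiter($waiting, $a, 'task 2');
addWaiter($waiting, $b, 'task 3');

echo count($waiting), " streams are being waited on\n"; // 2 streams are being waited on
```

Keeping the stream itself next to the waiters matters, because the array of streams has to be rebuilt for every stream_select() call.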
    protected function ioPoll($timeout) {
        $rSocks = [];
        foreach ($this->waitingForRead as list($socket)) {
            $rSocks[] = $socket;
        }

        $wSocks = [];
        foreach ($this->waitingForWrite as list($socket)) {
            $wSocks[] = $socket;
        }

        $eSocks = []; // dummy

        if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
            return;
        }

        foreach ($rSocks as $socket) {
            list(, $tasks) = $this->waitingForRead[(int) $socket];
            unset($this->waitingForRead[(int) $socket]);

            foreach ($tasks as $task) {
                $this->schedule($task);
            }
        }

        foreach ($wSocks as $socket) {
            list(, $tasks) = $this->waitingForWrite[(int) $socket];
            unset($this->waitingForWrite[(int) $socket]);

            foreach ($tasks as $task) {
                $this->schedule($task);
            }
        }
    }


The stream_select() function takes arrays of sockets to check for reading, writing and exceptional conditions (we ignore the last one). The arrays are passed by reference, and the function leaves in them only the elements whose state has changed. We can then walk through these arrays and reschedule all the tasks associated with those sockets.
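The way stream_select() prunes its arrays can be observed without a real server. The following sketch assumes a Unix-like system, where stream_socket_pair() can create connected local sockets; the variable names are chosen for this example only.

```php
<?php
// Two connected pairs of local sockets (assumes a Unix-like system).
list($a, $b) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
list($c, $d) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

fwrite($b, 'ping'); // makes $a readable; nothing is ever written towards $c

$read = [$a, $c];   // the sockets we are interested in
$write = [];
$except = [];

// Timeout 0: just report the current state, do not wait.
$ready = stream_select($read, $write, $except, 0);

// $read has been modified in place and now holds only the readable socket.
$stillListed = count($read);
$data = fread($a, 4);

echo "$ready ready, $stillListed left in the array, data: $data\n";
// 1 ready, 1 left in the array, data: ping
```

Because the input arrays are overwritten, the scheduler has to rebuild $rSocks and $wSocks from its own bookkeeping on every poll.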

To perform all these actions, we will add the following method to the scheduler:
    protected function ioPollTask() {
        while (true) {
            if ($this->taskQueue->isEmpty()) {
                $this->ioPoll(null);
            } else {
                $this->ioPoll(0);
            }
            yield;
        }
    }


This task must be registered at some point; for example, you can add $this->newTask($this->ioPollTask()) to the beginning of the run() method. It will then work like any other task, polling for ready sockets once on each full pass through the task queue. While other tasks are queued, ioPollTask() calls ioPoll() with a timeout of 0, which means stream_select() returns immediately without waiting.

Only if the task queue is empty do we use null as the timeout, in which case stream_select() waits until one of the sockets becomes ready. If we did not do this, the polling task would run again and again in a busy loop and eat the whole CPU (or at least one core) until someone connects.
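The difference between the timeout values is easy to measure. This sketch again assumes a Unix-like system for stream_socket_pair(); it compares a timeout of 0 with a short finite timeout of 0.3 seconds, since a null timeout would block forever on a socket that never receives data. All variable names are invented for the example.

```php
<?php
// A connected pair of local sockets on which nothing is ever sent.
list($a, $b) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

$write = [];
$except = [];

// Timeout 0: pure polling, returns at once even though nothing is ready.
$pollRead = [$a];
$started = microtime(true);
$pollReady = stream_select($pollRead, $write, $except, 0);
$pollElapsed = microtime(true) - $started;

// Timeout of 0 seconds + 300000 microseconds: waits until data arrives
// or the time is up. A null timeout would wait without any time limit.
$waitRead = [$a];
$started = microtime(true);
$waitReady = stream_select($waitRead, $write, $except, 0, 300000);
$waitElapsed = microtime(true) - $started;

printf("poll: %d ready, wait: %d ready\n", $pollReady, $waitReady);
// poll: 0 ready, wait: 0 ready
```

The second call sleeps inside the operating system instead of spinning, which is why the scheduler switches to a blocking wait as soon as it has nothing else to run.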

The server itself looks pretty simple:
    function server($port) {
        echo "Starting server at port $port...\n";

        $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
        if (!$socket) throw new Exception($errStr, $errNo);

        stream_set_blocking($socket, 0);

        while (true) {
            yield waitForRead($socket);
            $clientSocket = stream_socket_accept($socket, 0);
            yield newTask(handleClient($clientSocket));
        }
    }

    function handleClient($socket) {
        yield waitForRead($socket);
        $data = fread($socket, 8192);

        $msg = "Received following request:\n\n$data";
        $msgLength = strlen($msg);

        $response = <<<RES
    HTTP/1.1 200 OK\r
    Content-Type: text/plain\r
    Content-Length: $msgLength\r
    Connection: close\r
    \r
    $msg
    RES;

        yield waitForWrite($socket);
        fwrite($socket, $response);

        fclose($socket);
    }

    $scheduler = new Scheduler;
    $scheduler->newTask(server(8000));
    $scheduler->run();


It accepts connections on port 8000 and simply sends back the contents of the request. Doing anything “real” would be much more complicated (handling HTTP requests properly is not the topic of this article).

You can now try this server with something like ab -n 10000 -c 100 localhost:8000/. This sends 10,000 requests, 100 of them concurrently. With this benchmark I got an average response time of 10 milliseconds. But some requests took a very long time to process (around 5 minutes), so the total throughput was only 2000 requests per second. With higher concurrency (for example, -c 500) the script still works quite well, but some connections fail with a “Connection reset by peer” error.

Stacked coroutines


If you start building a larger system with our scheduler, you will soon run into a problem: we are used to splitting code into smaller functions and calling them. With coroutines this is no longer possible. Consider this code:
    function echoTimes($msg, $max) {
        for ($i = 1; $i <= $max; ++$i) {
            echo "$msg iteration $i\n";
            yield;
        }
    }

    function task() {
        echoTimes('foo', 10); // print foo ten times
        echo "---\n";
        echoTimes('bar', 5); // print bar five times
        yield; // force it to be a coroutine
    }

    $scheduler = new Scheduler;
    $scheduler->newTask(task());
    $scheduler->run();


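In this code we tried to move the repeated “print n times” logic into a separate coroutine and call it from task(). But this will not work. As mentioned at the beginning of the article, calling a generator (or coroutine) does not execute anything; it only returns a Generator object. That is what happens here: the echoTimes() calls do nothing except return a coroutine object, which is then discarded.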

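To make this possible anyway, we need a small wrapper around the bare coroutines that manages a stack of nested coroutine calls. A sub-coroutine is then called by yielding it: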
 $retval = (yield someCoroutine($foo, $bar)); 

A sub-coroutine can also return a value, again by using yield:
 yield retval("I'm return value!"); 

The retval() function does nothing but return a wrapper around the value, which signals that this is a return value:
    class CoroutineReturnValue {
        protected $value;

        public function __construct($value) {
            $this->value = $value;
        }

        public function getValue() {
            return $this->value;
        }
    }

    function retval($value) {
        return new CoroutineReturnValue($value);
    }


To turn a coroutine into a stacked coroutine (one that supports sub-calls), we need one more function (which is, of course, itself a coroutine):
    function stackedCoroutine(Generator $gen) {
        $stack = new SplStack;

        for (;;) {
            $value = $gen->current();

            if ($value instanceof Generator) {
                $stack->push($gen);
                $gen = $value;
                continue;
            }

            $isReturnValue = $value instanceof CoroutineReturnValue;
            if (!$gen->valid() || $isReturnValue) {
                if ($stack->isEmpty()) {
                    return;
                }

                $gen = $stack->pop();
                $gen->send($isReturnValue ? $value->getValue() : NULL);
                continue;
            }

            $gen->send(yield $gen->key() => $value);
        }
    }


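This function acts as a simple proxy between the caller and the sub-coroutine that is currently running. If a yielded value is itself a generator, it pushes the current coroutine onto the stack and starts running the new one. When it receives a CoroutineReturnValue, it pops the previous coroutine off the stack and resumes it with that value.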

To use stacked coroutines in tasks, the line $this->coroutine = $coroutine; in the Task constructor has to be replaced with $this->coroutine = stackedCoroutine($coroutine);.
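As a side note for readers on newer versions: PHP 7.0 and later have built-in support for delegating to a sub-generator (yield from) and for generator return values (Generator::getReturn()). The sketch below is not the wrapper approach used in this article, just the same idea expressed with those language features; the functions yield strings instead of echoing them so that the result is easy to inspect.

```php
<?php
// Requires PHP 7.0 or later.
function echoTimes($msg, $max) {
    for ($i = 1; $i <= $max; ++$i) {
        yield "$msg iteration $i";
    }
    return $max; // becomes the value of the "yield from" expression
}

function task() {
    $fooCount = yield from echoTimes('foo', 2); // runs the sub-generator to completion
    yield '---';
    $barCount = yield from echoTimes('bar', 1);
    return $fooCount + $barCount;
}

$gen = task();
$lines = [];
foreach ($gen as $line) { // values of the sub-generators pass straight through
    $lines[] = $line;
}
$total = $gen->getReturn();

echo implode("\n", $lines), "\n"; // foo iteration 1, foo iteration 2, ---, bar iteration 1
echo "total: $total\n";           // total: 3
```

The loop appends to $lines rather than using iterator_to_array(), because delegated generators keep their own keys and those keys can collide.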

Now we can improve the web server example a little by grouping the wait-and-read (and wait-and-write, wait-and-accept) actions into functions. To keep the related functionality together, I will use a class:
    class CoSocket {
        protected $socket;

        public function __construct($socket) {
            $this->socket = $socket;
        }

        public function accept() {
            yield waitForRead($this->socket);
            yield retval(new CoSocket(stream_socket_accept($this->socket, 0)));
        }

        public function read($size) {
            yield waitForRead($this->socket);
            yield retval(fread($this->socket, $size));
        }

        public function write($string) {
            yield waitForWrite($this->socket);
            fwrite($this->socket, $string);
        }

        public function close() {
            @fclose($this->socket);
        }
    }


The server can now be rewritten a little more cleanly:
    function server($port) {
        echo "Starting server at port $port...\n";

        $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
        if (!$socket) throw new Exception($errStr, $errNo);

        stream_set_blocking($socket, 0);

        $socket = new CoSocket($socket);
        while (true) {
            yield newTask(
                handleClient(yield $socket->accept())
            );
        }
    }

    function handleClient($socket) {
        $data = (yield $socket->read(8192));

        $msg = "Received following request:\n\n$data";
        $msgLength = strlen($msg);

        $response = <<<RES
    HTTP/1.1 200 OK\r
    Content-Type: text/plain\r
    Content-Length: $msgLength\r
    Connection: close\r
    \r
    $msg
    RES;

        yield $socket->write($response);
        yield $socket->close();
    }



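Error handling


As you have probably noticed, none of the examples above handle errors, even though almost every socket operation can fail. Error handling for sockets is tedious and would have made the code (much) longer.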

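Still, I would like to show how error handling works for coroutines in general: exceptions can be thrown into a coroutine with the throw() method.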

The throw() method takes an exception and throws it at the yield where the coroutine is currently suspended (the point whose value current() returns):
    function gen() {
        echo "Foo\n";
        try {
            yield;
        } catch (Exception $e) {
            echo "Exception: {$e->getMessage()}\n";
        }
        echo "Bar\n";
    }

    $gen = gen();
    $gen->rewind();                     // echos "Foo"
    $gen->throw(new Exception('Test')); // echos "Exception: Test"
                                        // and "Bar"


This is a very cool thing, because it lets us, for example, throw exceptions from system calls and sub-coroutines. For system calls, the Scheduler::run() method needs a small change:
    if ($retval instanceof SystemCall) {
        try {
            $retval($task, $this);
        } catch (Exception $e) {
            $task->setException($e);
            $this->schedule($task);
        }
        continue;
    }


And the Task class must handle throw() calls as well:
    class Task {
        // ...
        protected $exception = null;

        public function setException($exception) {
            $this->exception = $exception;
        }

        public function run() {
            if ($this->beforeFirstYield) {
                $this->beforeFirstYield = false;
                return $this->coroutine->current();
            } elseif ($this->exception) {
                $retval = $this->coroutine->throw($this->exception);
                $this->exception = null;
                return $retval;
            } else {
                $retval = $this->coroutine->send($this->sendValue);
                $this->sendValue = null;
                return $retval;
            }
        }

        // ...
    }


Now we can start throwing exceptions from system calls! For example, let's make killTask throw an exception if the passed task ID is invalid:
    function killTask($tid) {
        return new SystemCall(
            function(Task $task, Scheduler $scheduler) use ($tid) {
                if ($scheduler->killTask($tid)) {
                    $scheduler->schedule($task);
                } else {
                    throw new InvalidArgumentException('Invalid task ID!');
                }
            }
        );
    }


Now try:
    function task() {
        try {
            yield killTask(500);
        } catch (Exception $e) {
            echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "\n";
        }
    }


For now this will not work, because the stackedCoroutine() function does not handle exceptions. To fix this, it needs a few modifications:
    function stackedCoroutine(Generator $gen) {
        $stack = new SplStack;
        $exception = null;

        for (;;) {
            try {
                if ($exception) {
                    $gen->throw($exception);
                    $exception = null;
                    continue;
                }

                $value = $gen->current();

                if ($value instanceof Generator) {
                    $stack->push($gen);
                    $gen = $value;
                    continue;
                }

                $isReturnValue = $value instanceof CoroutineReturnValue;
                if (!$gen->valid() || $isReturnValue) {
                    if ($stack->isEmpty()) {
                        return;
                    }

                    $gen = $stack->pop();
                    $gen->send($isReturnValue ? $value->getValue() : NULL);
                    continue;
                }

                try {
                    $sendValue = (yield $gen->key() => $value);
                } catch (Exception $e) {
                    $gen->throw($e);
                    continue;
                }

                $gen->send($sendValue);
            } catch (Exception $e) {
                if ($stack->isEmpty()) {
                    throw $e;
                }

                $gen = $stack->pop();
                $exception = $e;
            }
        }
    }


Summarizing


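In this article we built a task scheduler based on cooperative multitasking, with support for system calls, non-blocking I/O and error handling. The really nice thing is that the resulting task code looks completely synchronous, even though it performs many asynchronous operations. If you want to read data from a socket, you do not need to pass a callback or register an event listener. You simply write yield $socket->read().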

, , , PHP. . , , . , , , - .

, :)

: , , .. . — .

Source: https://habr.com/ru/post/164173/

