xrange()
function: function xrange($start, $end, $step = 1) { for ($i = $start; $i <= $end; $i += $step) { yield $i; } } foreach (xrange(1, 1000000) as $num) { echo $num, "\n"; }
range()
. The only difference is that range()
would give you an array of a million numbers, and xrange()
will return an iterator that will throw out these numbers, but will never create an array with all of them.Iterator
. Generators simply make this (much) easier to do, because you don't need to implement five different methods for each iterator.yield
is used for interruptions.xrange(1, 1000000)
, nothing in the body of the xrange
function will actually be called. Instead, PHP will simply return a Generator
class object that implements the Iterator
interface: <?php $range = xrange(1, 1000000); var_dump($range); // object(Generator)#1 var_dump($range instanceof Iterator); // bool(true)
$range->rewind()
the xrange()
function code will be executed before the first yield
. In this case, this means that $i = $start
will be executed first and then yield $i
. Whatever we pass to yield
, this can be obtained by calling $range->current()
.$range->next()
. This again makes the generator execute the code before the next yield
. Thus, using successive calls to next()
and current()
, you can get all the values ​​from the generator until it reaches the point where the code just ends. In the case of xrange()
this will happen when $i
reaches $end
. In this case, the execution thread reaches the end of the function, leaving no more code. After this happens, valid()
will return false
and the iteration will stop.send()
method instead of next()
. An example of how this works is this Korutin: function logger($fileName) { $fileHandle = fopen($fileName, 'a'); while (true) { fwrite($fileHandle, yield . "\n"); } } $logger = logger(__DIR__ . '/log'); $logger->send('Foo'); $logger->send('Bar');
yield
not as a statement (like return
or echo
), but as an expression, that is, it returns some value. It will return what was sent via send()
. In this example, yield
first returns "Foo"
, and then "Bar"
.yield
can act as a simple recipient. But you can also combine both types of usage, so you can both send and receive values. Here is an example of how this works: function gen() { $ret = (yield 'yield1'); var_dump($ret); $ret = (yield 'yield2'); var_dump($ret); } $gen = gen(); var_dump($gen->current()); // string(6) "yield1" var_dump($gen->send('ret1')); // string(4) "ret1" (the first var_dump in gen) // string(6) "yield2" (the var_dump of the ->send() return value) var_dump($gen->send('ret2')); // string(4) "ret2" (again from within gen) // NULL (the return value of ->send())
yield
is not an accident. We need these brackets for technical reasons (I was even thinking of adding a throw-out exception to direct assignment). Secondly, you should have noticed that current()
was used without calling rewind()
. rewind()
, in fact, is then invoked implicitly.logger()
, you thought, “Why would I use corutin for this? Why not make an ordinary class for this? ”, Then you were absolutely right. That example only demonstrates how to use it, but there is no reason for it to use the korutina. As mentioned above, in the introduction, coroutines are a very powerful thing, but their use is very rare and often very complicated, which makes the task of inventing simple and non-contrived examples rather complicated.yield
allows you to interrupt the task yourself, in order to give the control flow to the scheduler and it could start another task. In addition, yield
can be used to communicate the task with the scheduler. class Task { protected $taskId; protected $coroutine; protected $sendValue = null; protected $beforeFirstYield = true; public function __construct($taskId, Generator $coroutine) { $this->taskId = $taskId; $this->coroutine = $coroutine; } public function getTaskId() { return $this->taskId; } public function setSendValue($sendValue) { $this->sendValue = $sendValue; } public function run() { if ($this->beforeFirstYield) { $this->beforeFirstYield = false; return $this->coroutine->current(); } else { $retval = $this->coroutine->send($this->sendValue); $this->sendValue = null; return $retval; } } public function isFinished() { return !$this->coroutine->valid(); } }
taskId
). Using the setSendValue()
method, you can specify which value to send to the task on the next run (you will find out what you need to do later). The run()
method is really just calling the send()
method of cortutina.beforeFirstYield
, look at the following code: function gen() { yield 'foo'; yield 'bar'; } $gen = gen(); var_dump($gen->send('something')); // send(), yield rewind() // : $gen->rewind(); var_dump($gen->send('something')); // rewind() yield ( ), send() // yield ( var_dump- ). //
beforeFirstYield
we will know if the first yield has already been returned. class Scheduler { protected $maxTaskId = 0; protected $taskMap = []; // taskId => task protected $taskQueue; public function __construct() { $this->taskQueue = new SplQueue(); } public function newTask(Generator $coroutine) { $tid = ++$this->maxTaskId; $task = new Task($tid, $coroutine); $this->taskMap[$tid] = $task; $this->schedule($task); return $tid; } public function schedule(Task $task) { $this->taskQueue->enqueue($task); } public function run() { while (!$this->taskQueue->isEmpty()) { $task = $this->taskQueue->dequeue(); $task->run(); if ($task->isFinished()) { unset($this->taskMap[$task->getTaskId()]); } else { $this->schedule($task); } } } }
newTask()
method creates a new task and puts it in the taskMap
. Moreover, it adds it to the taskQueue
. The run()
method then goes through this queue and runs tasks. If the task is completed, it is deleted, otherwise added to the end of the queue. function task1() { for ($i = 1; $i <= 10; ++$i) { echo "This is task 1 iteration $i.\n"; yield; } } function task2() { for ($i = 1; $i <= 5; ++$i) { echo "This is task 2 iteration $i.\n"; yield; } } $scheduler = new Scheduler; $scheduler->newTask(task1()); $scheduler->newTask(task2()); $scheduler->run();
This is task 1 iteration 1. This is task 2 iteration 1. This is task 1 iteration 2. This is task 2 iteration 2. This is task 1 iteration 3. This is task 2 iteration 3. This is task 1 iteration 4. This is task 2 iteration 4. This is task 1 iteration 5. This is task 2 iteration 5. This is task 1 iteration 6. This is task 1 iteration 7. This is task 1 iteration 8. This is task 1 iteration 9. This is task 1 iteration 10.
yield
. In this case, yield
will act both as a breaker and as a way to transfer information (and get) to the scheduler.callable
: class SystemCall { protected $callback; public function __construct(callable $callback) { $this->callback = $callback; } public function __invoke(Task $task, Scheduler $scheduler) { $callback = $this->callback; // Can't call it directly in PHP :/ return $callback($task, $scheduler); } }
callable
, but will take a task and a scheduler into the arguments. To work with this, we need to slightly modify the run()
method of the scheduler: public function run() { while (!$this->taskQueue->isEmpty()) { $task = $this->taskQueue->dequeue(); $retval = $task->run(); if ($retval instanceof SystemCall) { $retval($task, $this); continue; } if ($task->isFinished()) { unset($this->taskMap[$task->getTaskId()]); } else { $this->schedule($task); } } }
function getTaskId() { return new SystemCall(function(Task $task, Scheduler $scheduler) { $task->setSendValue($task->getTaskId()); $scheduler->schedule($task); }); }
function task($max) { $tid = (yield getTaskId()); // <-- here's the syscall! for ($i = 1; $i <= $max; ++$i) { echo "This is task $tid iteration $i.\n"; yield; } } $scheduler = new Scheduler; $scheduler->newTask(task(10)); $scheduler->newTask(task(5)); $scheduler->run();
function newTask(Generator $coroutine) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($coroutine) { $task->setSendValue($scheduler->newTask($coroutine)); $scheduler->schedule($task); } ); } function killTask($tid) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($tid) { $task->setSendValue($scheduler->killTask($tid)); $scheduler->schedule($task); } ); }
killTask
function killTask
we need an additional method in the scheduling: public function killTask($tid) { if (!isset($this->taskMap[$tid])) { return false; } unset($this->taskMap[$tid]); // , // , foreach ($this->taskQueue as $i => $task) { if ($task->getTaskId() === $tid) { unset($this->taskQueue[$i]); break; } } return true; }
function childTask() { $tid = (yield getTaskId()); while (true) { echo "Child task $tid still alive!\n"; yield; } } function task() { $tid = (yield getTaskId()); $childTid = (yield newTask(childTask())); for ($i = 1; $i <= 6; ++$i) { echo "Parent task $tid iteration $i.\n"; yield; if ($i == 3) yield killTask($childTid); } } $scheduler = new Scheduler; $scheduler->newTask(task()); $scheduler->run();
Parent task 1 iteration 1. Child task 2 still alive! Parent task 1 iteration 2. Child task 2 still alive! Parent task 1 iteration 3. Child task 2 still alive! Parent task 1 iteration 4. Parent task 1 iteration 5. Parent task 1 iteration 6.
wait
(wait until the task completes completely), exec
(which sets what task you need to perform now) and fork
(create a clone of the task). The cloning is a pretty cool feature and you can implement it with the corutines, since they support cloning.stream_select()
function. function waitForRead($socket) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($socket) { $scheduler->waitForRead($socket, $task); } ); } function waitForWrite($socket) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($socket) { $scheduler->waitForWrite($socket, $task); } ); }
<?php // resourceID => [socket, tasks] protected $waitingForRead = []; protected $waitingForWrite = []; public function waitForRead($socket, Task $task) { if (isset($this->waitingForRead[(int) $socket])) { $this->waitingForRead[(int) $socket][1][] = $task; } else { $this->waitingForRead[(int) $socket] = [$socket, [$task]]; } } public function waitForWrite($socket, Task $task) { if (isset($this->waitingForWrite[(int) $socket])) { $this->waitingForWrite[(int) $socket][1][] = $task; } else { $this->waitingForWrite[(int) $socket] = [$socket, [$task]]; } }
waitingForRead
and waitingForWrite
simply arrays containing waiting sockets and related tasks. The most interesting part is this method, which checks whether the sockets are ready and reschedules their tasks: protected function ioPoll($timeout) { $rSocks = []; foreach ($this->waitingForRead as list($socket)) { $rSocks[] = $socket; } $wSocks = []; foreach ($this->waitingForWrite as list($socket)) { $wSocks[] = $socket; } $eSocks = []; // dummy if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) { return; } foreach ($rSocks as $socket) { list(, $tasks) = $this->waitingForRead[(int) $socket]; unset($this->waitingForRead[(int) $socket]); foreach ($tasks as $task) { $this->schedule($task); } } foreach ($wSocks as $socket) { list(, $tasks) = $this->waitingForWrite[(int) $socket]; unset($this->waitingForWrite[(int) $socket]); foreach ($tasks as $task) { $this->schedule($task); } } }
stream_select
function takes as input arrays of sockets waiting to be read, write, and exception (ignore the latter). Arrays are passed by reference, and this function will leave in them only those elements whose state has changed. After that we can go through all these arrays and reschedule all the tasks associated with these sockets. protected function ioPollTask() { while (true) { if ($this->taskQueue->isEmpty()) { $this->ioPoll(null); } else { $this->ioPoll(0); } yield; } }
$this->newTask($this->ioPollTask())
to the beginning of the run()
method. Then it will work just like any other task, checking the available ready-made sockets for each switch between tasks. The ioPollTask
method will call ioPoll
with a zero timeout, which means stream_select will return the result immediately, without waiting.null
as a timeout, in which case stream_select
will wait until one of the sockets is ready. If we did not do this, we would have eaten the entire CPU (at least the kernel), since this task would be performed in a loop, time after time, until someone would connect. function server($port) { echo "Starting server at port $port...\n"; $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr); if (!$socket) throw new Exception($errStr, $errNo); stream_set_blocking($socket, 0); while (true) { yield waitForRead($socket); $clientSocket = stream_socket_accept($socket, 0); yield newTask(handleClient($clientSocket)); } } function handleClient($socket) { yield waitForRead($socket); $data = fread($socket, 8192); $msg = "Received following request:\n\n$data"; $msgLength = strlen($msg); $response = <<<RES HTTP/1.1 200 OK\r Content-Type: text/plain\r Content-Length: $msgLength\r Connection: close\r \r $msg RES; yield waitForWrite($socket); fwrite($socket, $response); fclose($socket); } $scheduler = new Scheduler; $scheduler->newTask(server(8000)); $scheduler->run();
ab -n 10000 -c 100 localhost:8000/
. So we will send 10,000 requests, 100 of which will be sent simultaneously. Using this benchmark, I received an average response in 10 milliseconds. But there was a problem with some requests that were processed for a very long time (5 minutes in a district time), so the total throughput is only 2000 requests per second. With higher concurrency (for example -c 500
), the script also works quite well, but some connections throw an error “Connections reset by peer”. function echoTimes($msg, $max) { for ($i = 1; $i <= $max; ++$i) { echo "$msg iteration $i\n"; yield; } } function task() { echoTimes('foo', 10); // print foo ten times echo "---\n"; echoTimes('bar', 5); // print bar five times yield; // force it to be a coroutine } $scheduler = new Scheduler; $scheduler->newTask(task()); $scheduler->run();
task()
and execute it. But it will not work. , , Generator
. , echoTimes()
, . $retval = (yield someCoroutine($foo, $bar));
yield
: yield retval("I'm return value!");
retval()
, , , : class CoroutineReturnValue { protected $value; public function __construct($value) { $this->value = $value; } public function getValue() { return $this->value; } } function retval($value) { return new CoroutineReturnValue($value); }
function stackedCoroutine(Generator $gen) { $stack = new SplStack; for (;;) { $value = $gen->current(); if ($value instanceof Generator) { $stack->push($gen); $gen = $value; continue; } $isReturnValue = $value instanceof CoroutineReturnValue; if (!$gen->valid() || $isReturnValue) { if ($stack->isEmpty()) { return; } $gen = $stack->pop(); $gen->send($isReturnValue ? $value->getValue() : NULL); continue; } $gen->send(yield $gen->key() => $value); } }
CoroutineReturnValue
, .$this->coroutine = $coroutine;
Task
, $this->coroutine = stackedCoroutine($coroutine);
. class CoSocket { protected $socket; public function __construct($socket) { $this->socket = $socket; } public function accept() { yield waitForRead($this->socket); yield retval(new CoSocket(stream_socket_accept($this->socket, 0))); } public function read($size) { yield waitForRead($this->socket); yield retval(fread($this->socket, $size)); } public function write($string) { yield waitForWrite($this->socket); fwrite($this->socket, $string); } public function close() { @fclose($this->socket); } }
function server($port) { echo "Starting server at port $port...\n"; $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr); if (!$socket) throw new Exception($errStr, $errNo); stream_set_blocking($socket, 0); $socket = new CoSocket($socket); while (true) { yield newTask( handleClient(yield $socket->accept()) ); } } function handleClient($socket) { $data = (yield $socket->read(8192)); $msg = "Received following request:\n\n$data"; $msgLength = strlen($msg); $response = <<<RES HTTP/1.1 200 OK\r Content-Type: text/plain\r Content-Length: $msgLength\r Connection: close\r \r $msg RES; yield $socket->write($response); yield $socket->close(); }
throw()
.throw()
, yield ( current()
): function gen() { echo "Foo\n"; try { yield; } catch (Exception $e) { echo "Exception: {$e->getMessage()}\n"; } echo "Bar\n"; } $gen = gen(); $gen->rewind(); // echos "Foo" $gen->throw(new Exception('Test')); // echos "Exception: Test" // and "Bar"
Scheduler::run()
needs a little change: if ($retval instanceof SystemCall) { try { $retval($task, $this); } catch (Exception $e) { $task->setException($e); $this->schedule($task); } continue; }
Task
must handle calls throw()
: class Task { // ... protected $exception = null; public function setException($exception) { $this->exception = $exception; } public function run() { if ($this->beforeFirstYield) { $this->beforeFirstYield = false; return $this->coroutine->current(); } elseif ($this->exception) { $retval = $this->coroutine->throw($this->exception); $this->exception = null; return $retval; } else { $retval = $this->coroutine->send($this->sendValue); $this->sendValue = null; return $retval; } } // ... }
killTask
, let's throw an exception if the passed task ID is invalid: function killTask($tid) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($tid) { if ($scheduler->killTask($tid)) { $scheduler->schedule($task); } else { throw new InvalidArgumentException('Invalid task ID!'); } } ); }
function task() { try { yield killTask(500); } catch (Exception $e) { echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "\n"; } }
stackedCoroutine
does not handle exceptions. To fix this, modify it a bit: function stackedCoroutine(Generator $gen) { $stack = new SplStack; $exception = null; for (;;) { try { if ($exception) { $gen->throw($exception); $exception = null; continue; } $value = $gen->current(); if ($value instanceof Generator) { $stack->push($gen); $gen = $value; continue; } $isReturnValue = $value instanceof CoroutineReturnValue; if (!$gen->valid() || $isReturnValue) { if ($stack->isEmpty()) { return; } $gen = $stack->pop(); $gen->send($isReturnValue ? $value->getValue() : NULL); continue; } try { $sendValue = (yield $gen->key() => $value); } catch (Exception $e) { $gen->throw($e); continue; } $gen->send($sendValue); } catch (Exception $e) { if ($stack->isEmpty()) { throw $e; } $gen = $stack->pop(); $exception = $e; } } }
yield $socket->read()
.Source: https://habr.com/ru/post/164173/
All Articles