📜 ⬆️ ⬇️

Coro and another implementation of rouse-callback

There is such a wonderful family of modules in CPAN - Coro . These modules allow you to program on pearl using corutin.

Small introduction

Imagine that at any moment anywhere in the program (for example, inside the function body or at the next iteration of the cycle) you can save the current state and temporarily "switch" to another point in the program. Having done some useful work in this “other” point, you come back, restore the saved state, and then all the work happens as if this “switch” was not at all. Well, of course, apart from the changes in general data that occurred at the new point. Having several “heavy” functions, each of which does not depend on the results of the work of the others, such “switching” can simulate their parallel execution. That is, from the outside it will look as if the functions are executed in parallel, but, in fact, at each moment of time only one of them is executed “slice”, and you define the size of this “slice”. In other words, it turns out that each function is executed in its thread, all threads use only one processor core (regardless of their number in the system), and in order for each thread to get its processor time, they all must share this time among themselves . Due to the lack of real parallelism, all changes in general data that have occurred in any stream become immediately available in all other streams, and since you set the switching points between the streams, the need for synchronization decreases sharply (basically, it is necessary for working with external resources ).
All this and more can be implemented in a pearl with the help of a family of modules called Coro. The main module of this family allows you to perform functions, or code blocks in separate streams (below I will call these streams coro-streams), and auxiliary modules add synchronization tools, message queues, integration with event loops, etc.

Creating coro streams

In order to create a coro stream, you must use one of the following constructs:

Coro::async { ... } @args; 

notice the absence of a comma between the block and its arguments, or
')
 my $coro = new Coro(\&code_ref, @args); $coro->ready(); 

In the first case, the block passed to the Coro::async function becomes a stream, while the arguments specified immediately after the block become available inside the block as function arguments (via @_ ). In the second case, you create a stream using the function reference and arguments for that function. The second construct returns a reference to the created stream, for which the ready() method is then called. This is the main difference between the second and the first constructions - the created thread will be inactive until it is placed in the ready-turn (more on this later).
In both cases, the thread "lives" as long as the corresponding function or block of code is executed. By the way, the program itself is also running in a separate coro stream - the main thing.

Switch between coro streams

Unlike system streams, switching between which is carried out somewhere in the depths of the operating system, it is necessary to switch between coro streams manually. The most obvious switching points (you can think of more or less obvious):

In the second case, the processor is still not used until the data comes over the network, or is read from the disk (as well as transmitted over the network, or written to disk).
How to transfer control using Coro? In order to save the current state and interrupt the execution of the current coro stream, you must use the static method schedule() ; in addition, this method extracts the next coro stream from the ready queue and starts it to run. Accordingly, in order for the coro flow calling schedule() to get processor time again in the future, it must first place itself at the end of the ready queue using the ready() method (or any other thread must do it for it) ). The interrupted thread remains blocked (does not receive CPU time) until it is placed at the end of the ready queue; if this does not happen by the time other active threads have completed their work, Coro will detect this and abort the program. Since the ready() and schedule() calls are used quite often together, the Coro module provides for convenience the call to cede() , which is analogous to the following pair of lines:

 $Coro::current->ready(); Coro::schedule; 

Consider an example
 #!/usr/bin/perl $| = 1; use strict; use warnings; use Coro; #  coro-   Coro::async Coro::async { my $thread_id = shift; #    coro- $Coro::current->desc("Thread #$thread_id"); for (my $i = 0; $i < 1_000_000; $i++) { if ($i % 1000 == 0) { print "$Coro::current->{desc} - Processed: $i items\n"; #   coro-   ready- $Coro::current->ready(); #      ready- Coro::schedule(); } } } 0; #       coro- sub my_thread { my $thread_id = shift; $Coro::current->desc("Thread #$thread_id"); for (my $i = 0; $i < 1_000_000; $i++) { if ($i % 1000 == 0) { print "$Coro::current->{desc} - Processed: $i items\n"; #     coro- Coro::cede(); } } } my @threads = (); for (my $thread_id = 1; $thread_id < 5; $thread_id++) { #   coro-   Coro::new() my $thread = new Coro(\&my_thread, $thread_id); #   coro-   ready- $thread->ready(); push @threads, $thread; } while (my $thread = shift @threads) { #   coro-   ,   coro-   $thread->join(); } 

Result:
 Thread # 0 - Processed: 0 items
 Thread # 1 - Processed: 0 items
 Thread # 2 - Processed: 0 items
 Thread # 3 - Processed: 0 items
 Thread # 4 - Processed: 0 items
 Thread # 0 - Processed: 1000 items
 Thread # 1 - Processed: 1000 items
 Thread # 2 - Processed: 1000 items
 Thread # 3 - Processed: 1000 items
 Thread # 4 - Processed: 1000 items
 ...
 Thread # 0 - Processed: 999000 items
 Thread # 1 - Processed: 999000 items
 Thread # 2 - Processed: 999000 items
 Thread # 3 - Processed: 999000 items
 Thread # 4 - Processed: 999000 items


In the example, coro threads are created in different ways and in different ways pass each other processor time. All coro-threads perform the same work - every 1000 iterations, report on the progress of work and interrupt their execution, giving the opportunity to work the rest of the coro-streams, first placing themselves at the end of the ready-queue (explicitly, or using cede() ). The program continues to run until the main coro stream completes, and the main coro stream is busy waiting for the end of 4 of the 5 created coro streams (the call to the join() method blocks that coro stream from which the call is made until until the coro flow for which this method was called is completed).

Integration with event loops

The above example demonstrates how coro-threads share CPU time, taking a break from a long-running job. As noted above, a good reason to share processor time is also to perform blocking operations (usually I / O operations).
When we face the problem of effective work with many blocking operations, we usually solve this problem with the help of event loops. For example, we translate sockets into a non-blocking mode and “hang” watchers on them that monitor the readiness of a socket to write or read and create a timer to interrupt timeout operations. As events of interest occur, from the depths of the event cycle, callbacks are invoked that are associated with the corresponding “vsecher”. As the project becomes more complex, it is clear what kind of callback, when and why it is called, it becomes more and more difficult. With the use of Coro, the situation is noticeably improved and the program code becomes more linear and clear (purely my opinion).
First of all, it should be noted that in the Coro family of modules there are three modules for integrating coro flows into event loops - these are Coro :: AnyEvent , Coro :: Event and Coro :: EV (the code pieces below will be for Coro :: EV). In order to integrate the event loop into your program, you need to run the loop itself in any coro stream (for example, in the main one):

 Coro::async { EV::run() }; 

For ease of event handling, the Coro module provides two useful functions — rouse_cb() and rouse_wait() :

Thus, the following code snippets are equivalent:

 # 1.   rouse_cb()  rouse_wait() my $timer = EV::timer(5, 5, sub { my ($watcher, $revents) = @_; print "Timer $wathcer: timeout\n"; }); #2.   rouse_cb()  rouse_wait() my $timer = EV::timer(5, 5, rouse_cb()); my ($watcher, $revents) = rouse_wait(); print "Timer $wathcer: timeout\n"; 


Another implementation of rouse callbacks

The above code rouse_cb() does not convey all the power of rouse_cb() and rouse_wait() , but its understanding comes as we work on real projects. However, for myself I discovered the main minus of the built-in rouse-callbacks - if you save the callback returned by the rouse_cb() function and try to reuse it (which is logical for cyclic operations, because why create a new object at each iteration the same job?), nothing will come of it. Being called at least once, the callback retains its state and all subsequent rouse_wait() calls for this callback immediately return the previously saved arguments.
Therefore, I decided to write my own implementation of the rouse-callback. In this implementation, the callback is an object, and the rouse_wait() method of callback is used instead of the rouse_wait() wait() function:

 my $cb = new My::RouseCallback; my $timer = EV::timer(5, 5, $cb); my ($watcher, $revents) = $cb->wait(); print "Timer $wathcer: timeout\n"; 

Implementing My :: RouseCallback
 package My::RouseCallback; use strict; use warnings; use Coro; # ""      My::RouseCallback my %STORAGE = (); #  : my $cb = new My::RouseCallback; sub new { my ($class) = @_; my $context = {args => [], done => 0, coro => undef}; my $self = bless sub { #     $context->{args} = \@_; #   ,     $context->{done} = 1; if ($context->{coro}) { #   coro- $context->{coro}->ready(); } }, $class; $STORAGE{"$self"} = $context; return $self; }; #   : $cb->wait(); sub wait { my $self = shift; my $context = $STORAGE{"$self"}; #   ,  coro-   $context->{coro} = $Coro::current; #   coro-   ,      while ($context->{done} == 0) { Coro::schedule(); } #        my @args = @{ $context->{args} }; $context->{args} = []; $context->{done} = 0; return @args; } sub DESTROY { my $self = shift; $self->(); delete $STORAGE{"$self"}; }; 1; __END__ 



If you see the possibility of using Coro in your task, be sure to try it, you might like it. Study the documentation, share the knowledge gained in practice.

Ps. If you use modules from the EV and Coro families together, be careful. Both the first and second export the async () function by default. Therefore, when creating coro streams, it is always better to explicitly specify Coro :: async.

Source: https://habr.com/ru/post/197130/


All Articles