Coro::async { ... } @args;
my $coro = new Coro(\&code_ref, @args); $coro->ready();
In the first case, the block passed to the Coro::async function becomes a coro thread, and the arguments listed right after the block are available inside it as function arguments (via @_). In the second case, you create a thread from a function reference and the arguments for that function. The second construct returns a reference to the created thread, on which the ready() method is then called. This is the main difference between the two constructs: a thread created with new stays inactive until it is placed in the ready queue (more on this later).

The current coro thread is interrupted by calling schedule(); in addition, this method takes the next coro thread from the ready queue and starts running it. Accordingly, for the thread that called schedule() to get processor time again, it must first place itself at the end of the ready queue with the ready() method (or some other thread must do this for it). The interrupted thread remains blocked (receives no CPU time) until it is placed at the end of the ready queue; if that has not happened by the time all other active threads have finished, Coro detects this and aborts the program. Since ready() and schedule() are so often called together, the Coro module provides cede() for convenience, which is equivalent to the following pair of lines:

$Coro::current->ready();
Coro::schedule;
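The difference between the two constructs, and the ready()/schedule() pair, can be seen in a minimal sketch. It assumes only that the Coro module is installed; the @log array and the thread names are illustrative and not part of Coro.

```perl
use strict;
use warnings;
use Coro;

my @log;    # records the order in which the threads actually run

# async: the new thread goes straight into the ready queue.
async {
    my ($name) = @_;
    push @log, "$name: step 1";
    cede;                       # same as $Coro::current->ready; Coro::schedule;
    push @log, "$name: step 2";
} 'async';

# Coro->new: the thread exists, but it is not in the ready queue yet.
my $coro = Coro->new(sub {
    my ($name) = @_;
    push @log, "$name: step 1";
    $Coro::current->ready;      # queue ourselves again ...
    Coro::schedule;             # ... and let the next ready thread run
    push @log, "$name: step 2";
}, 'new');

cede;                           # $coro cannot run here: nobody made it ready
push @log, 'main: after first cede';

$coro->ready;                   # now $coro is eligible to run
$coro->join;                    # block main until $coro has finished

print "$_\n" for @log;
```

The thread created with Coro->new records nothing until the main thread calls ready() on it, while the async thread already runs during the first cede.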
#!/usr/bin/perl
$| = 1;
use strict;
use warnings;
use Coro;

# Create a coro thread with Coro::async
Coro::async {
    my $thread_id = shift;
    # Set the description of the coro thread
    $Coro::current->desc("Thread #$thread_id");
    for (my $i = 0; $i < 1_000_000; $i++) {
        if ($i % 1000 == 0) {
            print "$Coro::current->{desc} - Processed: $i items\n";
            # Put the current coro thread at the end of the ready queue
            $Coro::current->ready();
            # Hand control to the next thread in the ready queue
            Coro::schedule();
        }
    }
} 0;

# This function will run as a coro thread
sub my_thread {
    my $thread_id = shift;
    $Coro::current->desc("Thread #$thread_id");
    for (my $i = 0; $i < 1_000_000; $i++) {
        if ($i % 1000 == 0) {
            print "$Coro::current->{desc} - Processed: $i items\n";
            # Yield to the other coro threads
            Coro::cede();
        }
    }
}

my @threads = ();
for (my $thread_id = 1; $thread_id < 5; $thread_id++) {
    # Create an inactive coro thread with Coro::new()
    my $thread = new Coro(\&my_thread, $thread_id);
    # Put the new coro thread into the ready queue
    $thread->ready();
    push @threads, $thread;
}
while (my $thread = shift @threads) {
    # Block the main coro thread until this coro thread finishes
    $thread->join();
}
Thread #0 - Processed: 0 items
Thread #1 - Processed: 0 items
Thread #2 - Processed: 0 items
Thread #3 - Processed: 0 items
Thread #4 - Processed: 0 items
Thread #0 - Processed: 1000 items
Thread #1 - Processed: 1000 items
Thread #2 - Processed: 1000 items
Thread #3 - Processed: 1000 items
Thread #4 - Processed: 1000 items
...
Thread #0 - Processed: 999000 items
Thread #1 - Processed: 999000 items
Thread #2 - Processed: 999000 items
Thread #3 - Processed: 999000 items
Thread #4 - Processed: 999000 items
As the output shows, the threads run in turn, each giving up the processor every 1000 items (through ready() and schedule(), or through cede()). The program keeps running until the main coro thread completes, and the main coro thread is busy waiting for 4 of the 5 created coro threads to finish (a call to the join() method blocks the calling coro thread until the thread it was called on completes).

Coro::async { EV::run() };
rouse_cb() and rouse_wait():

rouse_cb() generates and returns a callback that, when called, saves the arguments passed to it and notifies Coro internals that the call happened.

rouse_wait() blocks the current coro thread until the callback most recently created by rouse_cb() is called (you can also pass the callback to wait for as an argument); the function returns the arguments that were passed to the callback.

# 1. Without rouse_cb() and rouse_wait()
my $timer = EV::timer(5, 5, sub {
    my ($watcher, $revents) = @_;
    print "Timer $watcher: timeout\n";
});

# 2. With rouse_cb() and rouse_wait()
my $timer = EV::timer(5, 5, rouse_cb());
my ($watcher, $revents) = rouse_wait();
print "Timer $watcher: timeout\n";
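The same pattern can be tried without an event loop. The sketch below is only an illustration: fetch_later() is a hypothetical stand-in for an asynchronous API and simply invokes the callback from another coro thread; only rouse_cb(), rouse_wait() and async come from Coro.

```perl
use strict;
use warnings;
use Coro;

# Hypothetical asynchronous API: it returns immediately and calls the
# callback later, from another coro thread, much as an event loop would.
sub fetch_later {
    my ($callback) = @_;
    async { $callback->('payload', 200) };
    return;
}

fetch_later(rouse_cb());                  # the callback has not been called yet
my ($body, $status) = rouse_wait();       # blocks this thread until it is called

print "Got '$body' with status $status\n";
```

The calling code reads top to bottom like synchronous code, while the wait itself lets other coro threads run.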
This example does not convey the full power of rouse_cb() and rouse_wait(); that understanding comes with work on real projects. However, I found one major drawback of the built-in rouse callbacks: if you save the callback returned by rouse_cb() and try to reuse it (which is natural for repeated operations, since why create a new object on every iteration for the same job?), it will not work. Once called, the callback keeps its state, and every subsequent rouse_wait() call for that callback returns immediately with the previously saved arguments.

To get around this, I wrote my own callback class, My::RouseCallback. With it, the wait() method of the callback is used instead of the rouse_wait() function:

my $cb = new My::RouseCallback;
my $timer = EV::timer(5, 5, $cb);
my ($watcher, $revents) = $cb->wait();
print "Timer $watcher: timeout\n";
package My::RouseCallback;

use strict;
use warnings;
use Coro;

# Storage for the "private" data of My::RouseCallback objects
my %STORAGE = ();

# Constructor: my $cb = new My::RouseCallback;
sub new {
    my ($class) = @_;
    my $context = {args => [], done => 0, coro => undef};
    my $self = bless sub {
        # Save a copy of the arguments passed to the callback
        $context->{args} = [@_];
        # Mark the callback as called
        $context->{done} = 1;
        if ($context->{coro}) {
            # Wake up the waiting coro thread
            $context->{coro}->ready();
        }
    }, $class;
    $STORAGE{"$self"} = $context;
    return $self;
}

# Wait for the callback: $cb->wait();
sub wait {
    my $self = shift;
    my $context = $STORAGE{"$self"};
    # Remember which coro thread is waiting
    $context->{coro} = $Coro::current;
    # Block the current coro thread until the callback has been called
    while ($context->{done} == 0) {
        Coro::schedule();
    }
    # Return the saved arguments and reset the state for the next use
    my @args = @{ $context->{args} };
    $context->{args} = [];
    $context->{done} = 0;
    return @args;
}

sub DESTROY {
    my $self = shift;
    $self->();
    delete $STORAGE{"$self"};
}

1;

__END__
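Reusing a single callback object in a loop, which is the case the class was written for, might look like the sketch below. It carries a condensed copy of the class above so that it runs on its own, and it replaces the EV timer with a hypothetical producer thread that calls the callback three times; the names $producer, $notify and @seen are illustrative.

```perl
use strict;
use warnings;
use Coro;

{
    # Condensed copy of the class above, included so the sketch is standalone.
    package My::RouseCallback;
    my %STORAGE;
    sub new {
        my ($class) = @_;
        my $context = { args => [], done => 0, coro => undef };
        my $self = bless sub {
            $context->{args} = [@_];
            $context->{done} = 1;
            $context->{coro}->ready if $context->{coro};
        }, $class;
        $STORAGE{"$self"} = $context;
        return $self;
    }
    sub wait {
        my ($self) = @_;
        my $context = $STORAGE{"$self"};
        $context->{coro} = $Coro::current;
        Coro::schedule() until $context->{done};
        my @args = @{ $context->{args} };
        @$context{qw(args done)} = ([], 0);
        return @args;
    }
    sub DESTROY { my ($self) = @_; $self->(); delete $STORAGE{"$self"} }
}

my $cb = My::RouseCallback->new;
my @seen;

# Hypothetical event source: calls the same callback three times.
my $producer = async {
    my ($notify) = @_;
    for my $n (1 .. 3) {
        $notify->("tick $n");
        cede;                       # let the waiting thread handle the event
    }
} $cb;

# One callback object, waited on repeatedly.
for (1 .. 3) {
    my ($message) = $cb->wait;
    push @seen, $message;
}

$producer->join;
undef $cb;                          # drop our reference before the program exits
print join(', ', @seen), "\n";
```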
Source: https://habr.com/ru/post/197130/