In short:
- A proof of concept is already implemented in C++, JS, and PHP; the approach also suits Java.
- Faster than coroutines and Promise, with more features.
- Does not require allocating a separate software stack.
- Works well with all security and debugging tools.
- Works on any architecture and does not require special compiler flags.
At the dawn of computing, there was a single control flow with blocking I/O. Then hardware interrupts were added, which made it possible to use slow and unpredictable devices efficiently.
As hardware grew more capable while remaining scarce, the need arose to run several tasks at the same time, and hardware support for this followed. This is how isolated processes appeared, with hardware interrupts abstracted as signals.
The next evolutionary stage was multithreading, built on the same foundation as processes but with shared access to memory and other resources. This approach has its limitations and significant overhead from context switches through the OS kernel.
To communicate between processes, and even between different machines, the Promise/Future abstraction was proposed more than 40 years ago.
User interfaces and the now almost laughable problem of 10K clients led to the heyday of the Event Loop, Reactor and Proactor approaches, which are oriented more toward events than toward clear, sequential business logic.
Finally, we arrived at modern coroutines, which are essentially an emulation of threads on top of the abstractions described above, with the corresponding technical limitations and deterministic transfer of control.
To pass events, results and exceptions, everyone returned to the same Promise/Future concept. Some companies chose to name it slightly differently: "Task".
Ultimately, everything was hidden in a pretty `async/await` wrapper, which requires support from the compiler or translator, depending on the technology.
We consider only coroutines and Promise dressed up with `async/await`, since the evolution itself confirms that the older approaches have problems.
These two terms are not identical. For example, ECMAScript has no coroutines, but it has syntactic sugar for using `Promise`, which in turn merely tames callback hell. In practice, scripting engines such as V8 go further and apply special optimizations to pure `async/await` functions and calls.
Expert commentary on `co_async/co_await`, which did not make it into C++17, can be found here on this site, but under pressure from the software giant, coroutines may enter the standard in exactly that form. In the meantime, the traditional, recognized solutions are Boost.Context, Boost.Fiber and Boost.Coroutine2.
Java still has no `async/await` at the language level, but there are solutions such as EA Async, which, like Boost.Context, must be adapted to each JVM version and its bytecode.
Go has its own coroutines, but a careful look at the articles and bug reports of open projects shows that not everything is so smooth. Perhaps giving up the coroutine interface as a managed entity was not the best idea.
Personally, the author has very little against coroutines in dynamic languages, but is extremely wary of any games with the stack at the machine code level.
A few theses:
- `async` function: 4+ KB (minimum) plus increased system limits,
`yield` is a fundamental evil
This seemingly unrelated topic is directly tied to the concept of coroutines and their "continuation" property.
In short, any collection should have a full-fledged iterator. It is not clear why one would create the problem of a truncated iterator in the form of a generator. For example, the case of `range()` in Python looks more like a one-off showcase than a justification for the technical complication.
For an infinite generator, the implementation logic is elementary. Why create additional technical difficulties just to push an endless loop inside it?
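To illustrate how little an infinite generator needs, here is a minimal sketch of an endless Fibonacci sequence as a plain object. The names `FibonacciSequence` and `take_fibonacci` are hypothetical and exist only for this illustration; they are not part of any library discussed here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical example type: an endless Fibonacci sequence as a plain object.
// The whole "generator state" is two integers; no coroutine or extra stack
// is involved.
class FibonacciSequence {
public:
    // Returns the current value and advances to the next one.
    std::uint64_t next() {
        const std::uint64_t result = current_;
        const std::uint64_t following = current_ + next_;
        current_ = next_;
        next_ = following;
        return result;
    }

private:
    std::uint64_t current_ = 0;
    std::uint64_t next_ = 1;
};

// Illustration helper: the caller, not the sequence, decides when to stop.
std::vector<std::uint64_t> take_fibonacci(std::size_t count) {
    FibonacciSequence seq;
    std::vector<std::uint64_t> out;
    out.reserve(count);
    for (std::size_t i = 0; i < count; ++i) {
        out.push_back(seq.next());
    }
    return out;
}
```

Calling `take_fibonacci(8)` yields 0, 1, 1, 2, 3, 5, 8, 13. The sequence object can be stored, copied and inspected in a debugger like any other value.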
The only sound justification, which appeared later and which coroutine supporters cite, is stream parsers of all kinds with inversion of control. In fact, this is a narrow, specialized case for solving isolated library-level problems, not application business logic. At the same time, there is an elegant, simple and more descriptive solution based on finite automata. The scope of these technical problems is much smaller than the scope of ordinary business logic.
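As a rough sketch of the finite-automaton alternative, the push parser below splits a chunked byte stream into lines and handles a `\r\n` pair that is split across two chunks. The `LineParser` class and `parse_chunks` helper are hypothetical names invented for this example; the point is only that the parser state is an explicit enum rather than a suspended stack.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical push parser: the caller feeds chunks as they arrive.
class LineParser {
public:
    void feed(const std::string& chunk) {
        for (const char c : chunk) {
            switch (state_) {
            case State::Text:
                if (c == '\r') {
                    state_ = State::AfterCR;
                } else if (c == '\n') {
                    emit();
                } else {
                    current_.push_back(c);
                }
                break;
            case State::AfterCR:
                if (c == '\n') {
                    emit();
                    state_ = State::Text;
                } else if (c == '\r') {
                    current_.push_back('\r');  // previous CR was plain data
                } else {
                    current_.push_back('\r');
                    current_.push_back(c);
                    state_ = State::Text;
                }
                break;
            }
        }
    }

    // Complete lines seen so far; an unfinished tail stays buffered.
    const std::vector<std::string>& lines() const { return lines_; }

private:
    enum class State { Text, AfterCR };

    void emit() {
        lines_.push_back(current_);
        current_.clear();
    }

    State state_ = State::Text;
    std::string current_;
    std::vector<std::string> lines_;
};

// Illustration helper: runs the parser over a list of chunks.
std::vector<std::string> parse_chunks(const std::vector<std::string>& chunks) {
    LineParser parser;
    for (const auto& chunk : chunks) {
        parser.feed(chunk);
    }
    return parser.lines();
}
```

Feeding the chunks `"ab\r"`, `"\ncd\n"`, `"ef"` produces the lines `ab` and `cd`, while `ef` remains buffered until its terminator arrives.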
In effect, the problem being solved turns out to be contrived, yet it demands relatively serious effort for the initial implementation and long-term support. So much so that some projects may ban machine-code-level coroutines, following the example of bans on `goto` or on dynamic memory allocation in certain industries.
The `async/await` model over Promise from ECMAScript is more reliable, but requires adaptation
Unlike continuable coroutines, in this model pieces of code are implicitly split into uninterruptible blocks wrapped as anonymous functions. In C++ this is not entirely suitable because of how memory management works, for example:

    struct SomeObject {
        using Value = std::vector<int>;

        Promise funcPromise() {
            return Promise::resolved(value_);
        }

        void funcCallback(std::function<void()> &&cb, const Value& val) {
            somehow_call_later(cb);
        }

        Value value_;
    };

    Promise example() {
        SomeObject some_obj;

        return some_obj.funcPromise()
            .catch([](const std::exception &e){
                // ...
            })
            .then([&](SomeObject::Value &&val){
                return Promise([&](Resolve&& resolve, Reject&&){
                    some_obj.funcCallback(resolve, val);
                });
            });
    }

First, `some_obj` will be destroyed on exit from `example()`, before the lambda functions are called.
Second, lambda functions that capture variables or references are objects and implicitly add copy/move operations, which can hurt performance with a large number of captures and with the heap allocation required for type erasure in an ordinary `std::function`.
Third, the `Promise` interface itself was conceived around the notion of "promising" a result, rather than the sequential execution of business logic.
A schematic, NOT optimal solution might look something like this:

    Promise example() {
        struct LocalContext {
            SomeObject some_obj;
        };

        auto ctx = std::make_shared<LocalContext>();

        return ctx->some_obj.funcPromise()
            .catch([](const std::exception &e){
                // ...
            })
            .then([ctx](SomeObject::Value &&val){
                struct LocalContext2 {
                    LocalContext2(std::shared_ptr<LocalContext> ctx, SomeObject::Value &&val)
                        : ctx(std::move(ctx)), val(std::move(val)) {}

                    std::shared_ptr<LocalContext> ctx;
                    SomeObject::Value val;
                };

                // ctx is captured by value in a non-mutable lambda, so it is copied here
                auto ctx2 = std::make_shared<LocalContext2>(ctx, std::move(val));

                return Promise([ctx2](Resolve&& resolve, Reject&&){
                    ctx2->ctx->some_obj.funcCallback(
                        [ctx2, resolve](){ resolve(); },
                        ctx2->val);
                });
            });
    }

Note: `std::move` cannot replace `std::shared_ptr` here, because the same object cannot be moved into several lambdas at once, and moving it would increase their size.
With the addition of `async/await`, the asynchronous "horrors" become digestible:

    async void example() {
        SomeObject some_obj;
        SomeObject::Value val;

        try {
            val = await some_obj.funcPromise();
        } catch (const std::exception& e) {
            // ...
        }

        // Capture "async context"
        return Promise([async](Resolve&& resolve, Reject&&){
            some_obj.funcCallback([async](){ resolve(); }, val);
        });
    }

Some critics name the lack of a scheduler and the "unfair" use of processor resources as problems. Perhaps a more serious problem is data locality and efficient use of the processor cache.
On the first problem: prioritization at the level of individual coroutines looks like large overhead. Instead, coroutines can be managed together as part of a particular unified task. This is how traffic flows are handled.
This is possible by creating separate Event Loop instances, each with its own hardware threads scheduled at the OS level. The second option is to synchronize coroutines on a primitive (Mutex, Throttle) that limits concurrency and/or throughput.
Asynchronous programming does not make processor resources elastic; it still requires perfectly ordinary limits on the number of simultaneously processed tasks and on total execution time.
Protection against one coroutine blocking for a long time requires the same measures as with callbacks: avoid blocking system calls and long data-processing loops.
The second problem requires research, but at a minimum the coroutine stacks themselves and the details of the Future/Promise implementation already violate data locality. One can try to continue executing the same coroutine if the Future already has a value. That requires some mechanism for counting run time or the number of such continuations, so that one coroutine does not capture all processor time. This may give no result at all, or a very mixed result, depending on the size of the processor cache and the number of threads.
There is also a third point: many coroutine scheduler implementations allow coroutines to run on different processor cores, which, on the contrary, adds problems because of mandatory synchronization when accessing shared resources. With a single Event Loop thread, such synchronization is needed only at the logical level, since each synchronous callback block is guaranteed to run without racing the others.
The presence of threads in modern operating systems does not rule out the use of separate processes. Likewise, handling a large number of clients in an Event Loop does not rule out separate hardware threads for other needs.
In any case, coroutines and the various Event Loop variants complicate debugging when the tools lack the necessary support, and with local variables on a coroutine stack things get even harder: there is practically no way to reach them.
We take as a basis the well-established Event Loop pattern and the callback organization of ECMAScript (JavaScript) Promise.
From the point of view of execution scheduling, we are interested in the following Event Loop operations:
- `Handle immediate(callback)` with the requirement of a clean call stack.
- `Handle deferred(delay, callback)`.
- `handle.cancel()`.
This gives an interface called `AsyncTool`, which can be implemented in many ways, including on top of existing, proven solutions. It has no direct bearing on writing business logic, so we will not go into further detail.
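To make the three operations concrete, here is a toy single-threaded sketch with a virtual clock. It is not the FutoIn `AsyncTool`; the class name `ToyAsyncTool`, the `run_until()` method and the `demo_order()` helper are assumptions made up for this illustration. Callbacks are only ever invoked from the loop, which is one way to meet the clean call stack requirement.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical toy event loop driven by a virtual clock (no real waiting).
class ToyAsyncTool {
public:
    using Callback = std::function<void()>;
    using Millis = std::chrono::milliseconds;

    class Handle {
    public:
        explicit Handle(std::shared_ptr<bool> cancelled)
            : cancelled_(std::move(cancelled)) {}
        // Marks the scheduled callback so the loop skips it.
        void cancel() { *cancelled_ = true; }

    private:
        std::shared_ptr<bool> cancelled_;
    };

    // Runs the callback on a later loop iteration, never inline.
    Handle immediate(Callback cb) { return deferred(Millis{0}, std::move(cb)); }

    Handle deferred(Millis delay, Callback cb) {
        auto cancelled = std::make_shared<bool>(false);
        queue_.emplace(Key{now_ + delay, next_seq_++},
                       Task{std::move(cb), cancelled});
        return Handle{cancelled};
    }

    // Executes everything that is due up to the given virtual time.
    void run_until(Millis limit) {
        while (!queue_.empty() && queue_.begin()->first.first <= limit) {
            auto it = queue_.begin();
            now_ = it->first.first;
            Task task = std::move(it->second);
            queue_.erase(it);
            if (!*task.cancelled) {
                task.callback();
            }
        }
        if (limit > now_) {
            now_ = limit;
        }
    }

private:
    struct Task {
        Callback callback;
        std::shared_ptr<bool> cancelled;
    };
    // Ordered by due time, then by scheduling order.
    using Key = std::pair<Millis, std::uint64_t>;

    std::map<Key, Task> queue_;
    Millis now_{0};
    std::uint64_t next_seq_ = 0;
};

// Illustration helper: records the order in which callbacks fire.
std::string demo_order() {
    using Millis = ToyAsyncTool::Millis;
    ToyAsyncTool tool;
    std::string log;
    tool.deferred(Millis{20}, [&] { log += "D"; });
    auto to_cancel = tool.deferred(Millis{10}, [&] { log += "X"; });
    tool.immediate([&] {
        log += "A";
        tool.immediate([&] { log += "B"; });
    });
    to_cancel.cancel();
    tool.run_until(Millis{100});
    return log;
}
```

In `demo_order()` the immediate callbacks run first, the cancelled one never runs, and the 20 ms callback runs last, so the recorded order is `ABD`. A production implementation would use a real clock and thread-safe scheduling, which this sketch deliberately leaves out.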
In the AsyncSteps concept, an abstract tree of synchronous steps is built and executed by depth-first traversal in order of creation. The steps of each deeper level are added dynamically as the traversal proceeds.
All interaction goes through a single `AsyncSteps` interface, which by convention is passed as the first parameter to each step. By convention, the parameter is named `asi`, or `as` in the obsolete form. This approach almost completely breaks the link between a specific implementation and the business logic written in plug-ins and libraries.
In canonical implementations, each step receives its own instance of an object implementing `AsyncSteps`, which makes it possible to catch logical errors in the use of the interface promptly.
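The traversal order described above can be sketched with a toy executor. This is only an illustration of depth-first execution in creation order, assuming a purely synchronous run; the `MiniSteps` class and `demo_trace()` helper are hypothetical and omit error handling, asynchronous waiting and per-step interface instances.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy executor: one queue of pending steps per tree level.
class MiniSteps {
public:
    using Step = std::function<void(MiniSteps&)>;

    MiniSteps() : levels_(1) {}

    // Adds a step to the level that is currently being built.
    void add(Step step) { levels_.back().push_back(std::move(step)); }

    void run() {
        while (!levels_.empty()) {
            if (levels_.back().empty()) {
                levels_.pop_back();  // level finished, go back to the parent
                continue;
            }
            Step step = std::move(levels_.back().front());
            levels_.back().pop_front();
            levels_.emplace_back();  // sub-steps added by `step` land here
            step(*this);
        }
        levels_.emplace_back();  // restore an empty root level for reuse
    }

private:
    std::vector<std::deque<Step>> levels_;
};

// Illustration helper: records the order in which steps execute.
std::vector<std::string> demo_trace() {
    std::vector<std::string> trace;
    MiniSteps root;
    root.add([&](MiniSteps& asi) {
        trace.push_back("0.1");
        asi.add([&](MiniSteps&) { trace.push_back("1.1"); });
        asi.add([&](MiniSteps&) { trace.push_back("1.2"); });
    });
    root.add([&](MiniSteps&) { trace.push_back("0.2"); });
    root.run();
    return trace;
}
```

The recorded order is `0.1`, `1.1`, `1.2`, `0.2`: the sub-steps created inside the first step complete before its sibling starts.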
Abstract example:

    asi.add(
        // Level 0 step 1
        func( asi ){
            print( "Level 0 func 1" )
            asi.add(
                // Level 1 step 1
                func( asi ){
                    print( "Level 1 func 1" )
                    asi.error( "MyError" )
                },
                onerror( asi, error ){ // Level 1 step 1 catch
                    print( "Level 1 onerror 1: " + error )
                    asi.error( "NewError" )
                }
            )
        },
        onerror( asi, error ){ // Level 0 step 1 catch
            print( "Level 0 onerror 1: " + error )
            if ( error strequal "NewError" ) {
                asi.success( "Prm", 123, [1, 2, 3], true)
            }
        }
    )
    asi.add(
        // Level 0 step 2
        func( asi, str_param, int_param, array_param ){
            print( "Level 0 func 2: " + str_param )
        }
    )

Execution result:

    Level 0 func 1
    Level 1 func 1
    Level 1 onerror 1: MyError
    Level 0 onerror 1: NewError
    Level 0 func 2: Prm

In synchronous form it would look like this:

    str_res, int_res, array_res, bool_res // undefined

    try {
        // Level 0 step 1
        print( "Level 0 func 1" )

        try {
            // Level 1 step 1
            print( "Level 1 func 1" )
            throw "MyError"
        } catch( error ){
            // Level 1 step 1 catch
            print( "Level 1 onerror 1: " + error )
            throw "NewError"
        }
    } catch( error ){
        // Level 0 step 1 catch
        print( "Level 0 onerror 1: " + error )

        if ( error strequal "NewError" ) {
            str_res = "Prm"
            int_res = 123
            array_res = [1, 2, 3]
            bool_res = true
        } else {
            re-throw
        }
    }

    {
        // Level 0 step 2
        print( "Level 0 func 2: " + str_res )
    }

The maximum mimicry of traditional synchronous code is immediately visible, which should help readability.
From the point of view of business logic, a large number of requirements accumulates over time, but they can be split into easily understood parts. What follows is the result of four years of practical use.
- `add(func[, onerror])` is an imitation of `try-catch`.
- `success([args...])` is an explicit indication of successful completion.
- `error(code[, reason])` interrupts execution with an error:
  - `code` has a string type to integrate better with network protocols in a microservice architecture,
  - `reason` is an arbitrary human-readable explanation.
- `state()` is an analogue of Thread Local Storage. Predefined associative keys:
  - `error_info` is a human-readable explanation of the last error,
  - `last_exception` is a pointer to the last exception object,
  - `async_stack` is the stack of asynchronous calls, to the extent the technology allows.
The previous example, now as real C++ code with some additional features:

    #include <cassert>
    #include <exception>
    #include <iostream>
    #include <vector>

    #include <futoin/iasyncsteps.hpp>

    using namespace futoin;

    void some_api(IAsyncSteps& asi) {
        asi.add(
            [](IAsyncSteps& asi) {
                std::cout << "Level 0 func 1" << std::endl;
                asi.add(
                    [](IAsyncSteps& asi) {
                        std::cout << "Level 1 func 1" << std::endl;
                        asi.error("MyError");
                    },
                    [](IAsyncSteps& asi, ErrorCode code) {
                        std::cout << "Level 1 onerror 1: " << code << std::endl;
                        asi.error("NewError", "Human-readable description");
                    }
                );
            },
            [](IAsyncSteps& asi, ErrorCode code) {
                std::cout << "Level 0 onerror 1: " << code << std::endl;

                if (code == "NewError") {
                    // Human-readable error info
                    assert(asi.state().error_info ==
                           "Human-readable description");

                    // Last exception thrown is also available in state
                    std::exception_ptr e = asi.state().last_exception;

                    // NOTE: smart conversion of "const char*"
                    asi.success("Prm", 123, std::vector<int>({1, 2, 3}), true);
                }
            }
        );
        asi.add(
            [](IAsyncSteps& asi, const futoin::string& str_res, int int_res,
               std::vector<int>&& arr_res) {
                std::cout << "Level 0 func 2: " << str_res << std::endl;
            }
        );
    }

- `loop( func[, label] )` is a step with an endlessly repeated body.
- `forEach( map|list, func[, label] )` is a step that iterates over a collection object.
- `repeat( count, func[, label] )` is a step that iterates the specified number of times.
- `break( [label] )` is an analogue of the traditional loop break.
- `continue( [label] )` is an analogue of the traditional loop continuation with a new iteration.
The specification suggests the alternative names `breakLoop`, `continueLoop` and others in case of conflicts with reserved words.
C++ example:

    asi.loop([](IAsyncSteps& asi) {
        // infinite loop
        asi.breakLoop();
    });

    asi.repeat(10, [](IAsyncSteps& asi, size_t i) {
        // range loop from i=0 till i=9 (inclusive)
        asi.continueLoop();
    });

    asi.forEach(
        std::vector<int>{1, 2, 3},
        [](IAsyncSteps& asi, size_t i, int v) {
            // Iteration of vector-like and list-like objects
        });

    asi.forEach(
        std::list<futoin::string>{"1", "2", "3"},
        [](IAsyncSteps& asi, size_t i, const futoin::string& v) {
            // Iteration of vector-like and list-like objects
        });

    asi.forEach(
        std::map<futoin::string, futoin::string>(),
        [](IAsyncSteps& asi,
           const futoin::string& key,
           const futoin::string& v) {
            // Iteration of map-like objects
        });

    std::map<std::string, futoin::string> non_const_map;

    asi.forEach(
        non_const_map,
        [](IAsyncSteps& asi, const std::string& key, futoin::string& v) {
            // Iteration of map-like objects, note the value reference type
        });

- `setTimeout( timeout_ms )` raises a `Timeout` error once the time has elapsed if the step and its subtree have not finished executing.
- `setCancel( handler )` sets a cancellation handler that is called when the whole thread of execution is canceled and when the asynchronous step stack unwinds during error handling.
- `waitExternal()` simply waits for an external event.
Calling any of these functions makes an explicit call to `success()` necessary.
C++ example:

    asi.add([](IAsyncSteps& asi) {
        auto handle = schedule_external_callback([&](bool err) {
            if (err) {
                try {
                    asi.error("ExternalError");
                } catch (...) {
                    // pass
                }
            } else {
                asi.success();
            }
        });

        asi.setCancel([=](IAsyncSteps& asi) { external_cancel(handle); });
    });

    asi.add(
        [](IAsyncSteps& asi) {
            // Raises Timeout error after specified period
            asi.setTimeout(std::chrono::seconds{10});

            asi.loop([](IAsyncSteps& asi) {
                // infinite loop
            });
        },
        [](IAsyncSteps& asi, ErrorCode code) {
            if (code == futoin::errors::Timeout) {
                asi();
            }
        });

ECMAScript example:

    asi.add( (asi) => {
        asi.waitExternal(); // disable implicit success()

        some_obj.read( (err, data) => {
            if (!asi.state) {
                // ignore as AsyncSteps execution got canceled
            } else if (err) {
                try {
                    asi.error( 'IOError', err );
                } catch (_) {
                    // ignore error thrown as there are no
                    // AsyncSteps frames on stack.
                }
            } else {
                asi.success( data );
            }
        } );
    } );

- `await(promise_future[, on_error])` waits for a Future/Promise as a step.
- `promise()` turns the whole thread of execution into a Future/Promise; it is used instead of `execute()`.
C++ example:

    [](IAsyncSteps& asi) {
        // Proper way to create new AsyncSteps instances
        // without hard dependency on implementation.
        auto new_steps = asi.newInstance();
        new_steps->add([](IAsyncSteps& asi) {});

        // Can be called outside of AsyncSteps event loop
        // new_steps->promise().wait();
        // or
        // new_steps->promise<int>().get();

        // Proper way to wait for standard std::future
        asi.await(new_steps->promise());

        // Ensure instance lifetime
        asi.state()["some_obj"] = std::move(new_steps);
    };

- `AsyncSteps(AsyncTool&)` is a constructor that binds a thread of execution to a specific Event Loop.
- `execute()` starts the thread of execution.
- `cancel()` cancels the thread of execution.
This already requires a specific implementation of the interface.
C++ example:

    #include <chrono>
    #include <thread>

    #include <futoin/ri/asyncsteps.hpp>
    #include <futoin/ri/asynctool.hpp>

    void example() {
        futoin::ri::AsyncTool at;
        futoin::ri::AsyncSteps asi{at};

        asi.loop([&](futoin::IAsyncSteps &asi){
            // Some infinite loop logic
        });

        asi.execute();
        std::this_thread::sleep_for(std::chrono::seconds{10});
        asi.cancel(); // called in d-tor by fact
    }

- `newInstance()` creates a new thread of execution without a direct dependency on the implementation.
- `sync(object, func, onerror)` works like `add()`, but synchronized on an object that implements the corresponding interface.
- `parallel([on_error])` is a special `add()` whose substeps are separate AsyncSteps threads of execution:
  - they share a common `state()`,
C++ examples:

    #include <futoin/ri/mutex.hpp>

    using namespace futoin;

    ri::Mutex mtx_a;

    void sync_example(IAsyncSteps& asi) {
        asi.sync(mtx_a, [](IAsyncSteps& asi) {
            // synchronized section
            asi.add([](IAsyncSteps& asi) {
                // inner step in the section

                // This synchronization is NOOP for already
                // acquired Mutex.
                asi.sync(mtx_a, [](IAsyncSteps& asi) {
                });
            });
        });
    }

    void parallel_example(IAsyncSteps& asi) {
        using OrderVector = std::vector<int>;

        asi.state("order", OrderVector{});

        auto& p = asi.parallel([](IAsyncSteps& asi, ErrorCode) {
            // Overall error handler
            asi.success();
        });

        p.add([](IAsyncSteps& asi) {
            // regular flow
            asi.state<OrderVector>("order").push_back(1);

            asi.add([](IAsyncSteps& asi) {
                asi.state<OrderVector>("order").push_back(4);
            });
        });

        p.add([](IAsyncSteps& asi) {
            asi.state<OrderVector>("order").push_back(2);

            asi.add([](IAsyncSteps& asi) {
                asi.state<OrderVector>("order").push_back(5);
                asi.error("SomeError");
            });
        });

        p.add([](IAsyncSteps& asi) {
            asi.state<OrderVector>("order").push_back(3);

            asi.add([](IAsyncSteps& asi) {
                asi.state<OrderVector>("order").push_back(6);
            });
        });

        asi.add([](IAsyncSteps& asi) {
            asi.state<OrderVector>("order"); // 1, 2, 3, 4, 5
        });
    }

- `Mutex` restricts simultaneous execution to `N` threads with a queue of up to `Q`; the default is `N=1, Q=unlimited`.
- `Throttle` limits the number of entries to `N` per period `P` with a queue of up to `Q`; the default is `N=1, P=1s, Q=0`.
- `Limiter` is a combination of `Mutex` and `Throttle`, typically used at the entry point for processing external requests and when calling external systems, in order to remain stable under load.
If the queue limits are exceeded, a `DefenseRejected` error is raised, the meaning of which is clear from the description of `Limiter`.
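The `Throttle` rule (at most `N` entries per period `P`, with up to `Q` callers queued) can be sketched as a small counter. This is not the FutoIn implementation; `ToyThrottle`, its `next_period()` tick and the `demo_throttle()` helper are hypothetical, and the `Rejected` outcome stands in for the `DefenseRejected` error mentioned above. The sketch assumes some external timer calls `next_period()` once per period.

```cpp
#include <algorithm>
#include <cassert>
#include <string>

// Hypothetical counter-based throttle; time is advanced explicitly.
class ToyThrottle {
public:
    enum class Result { Entered, Queued, Rejected };

    ToyThrottle(unsigned max_per_period, unsigned max_queue)
        : max_per_period_(max_per_period), max_queue_(max_queue) {}

    // Tries to enter within the current period.
    Result enter() {
        if (used_ < max_per_period_) {
            ++used_;
            return Result::Entered;
        }
        if (queued_ < max_queue_) {
            ++queued_;
            return Result::Queued;
        }
        return Result::Rejected;
    }

    // Starts a new period and lets queued callers in first.
    // Returns how many queued callers were released.
    unsigned next_period() {
        const unsigned released = std::min(queued_, max_per_period_);
        queued_ -= released;
        used_ = released;
        return released;
    }

private:
    unsigned max_per_period_;
    unsigned max_queue_;
    unsigned used_ = 0;
    unsigned queued_ = 0;
};

// Illustration helper: N=2, Q=1, six attempts across two periods.
std::string demo_throttle() {
    ToyThrottle throttle(2, 1);
    std::string log;
    auto record = [&](ToyThrottle::Result r) {
        switch (r) {
        case ToyThrottle::Result::Entered: log += 'E'; break;
        case ToyThrottle::Result::Queued: log += 'Q'; break;
        case ToyThrottle::Result::Rejected: log += 'R'; break;
        }
    };
    for (int i = 0; i < 4; ++i) {
        record(throttle.enter());
    }
    log += std::to_string(throttle.next_period());
    record(throttle.enter());
    record(throttle.enter());
    return log;
}
```

With `N=2` and `Q=1`, four attempts in one period give two entries, one queued caller and one rejection. After the tick releases the queued caller, one slot remains in the new period, so the next attempt enters and the one after it is queued: the log reads `EEQR1EQ`.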
The AsyncSteps concept was not an end in itself; it was born from the need for more controlled asynchronous program execution in terms of time limits, cancellation, and the overall coherence of individual callbacks. No universal solution, then or now, provides the same functionality. Therefore:
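- FTN12 is the specification.
- `setCancel()` allows cleanup on cancellation, comparable to RAII and `atexit()`.
- `cancel()` cancels execution, comparable to SIGTERM or `pthread_cancel()`.
- `setTimeout()` limits execution time and raises a "Timeout" error.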
β FutoIn AsyncSteps .
β ABI , . Embedded MMU.
Test machine: Intel Xeon E3-1245v2/DDR1333, running Debian Stretch.
:
protected_fixedsize_stack
.pooled_fixedsize_stack
.FUTOIN_USE_MEMPOOL=false
).futoin::IMemPool
.Boost.Fiber :
mmap()/mprotect()
boost::fiber::protected_fixedsize_stack
."" , .. , . . .
GCC 6.3.0. lang tcmalloc , .
Technology | Time | Hz |
---|---|---|
Boost.Fiber protected | 4.8s | 208333.333Hz |
Boost.Fiber pooled | 0.23s | 4347826.086Hz |
FutoIn AsyncSteps | 0.21s | 4761904.761Hz |
FutoIn AsyncSteps no mempool | 0.31s | 3225806.451Hz |
FutoIn NitroSteps | 0.255s | 3921568.627Hz |
Boost.Fiber - , pooled_fixedsize_stack
, AsyncSteps.
Technology | Time | Hz |
---|---|---|
Boost.Fiber protected | 6.31s | 158478.605Hz |
Boost.Fiber pooled | 1.558s | 641848.523Hz |
FutoIn AsyncSteps | 1.13s | 884955.752Hz |
FutoIn AsyncSteps no mempool | 1.353s | 739098.300Hz |
FutoIn NitroSteps | 1.43s | 699300.699Hz |
Technology | Time | Hz |
---|---|---|
Boost.Fiber protected | 5.096s | 1962323.390Hz |
Boost.Fiber pooled | 5.077s | 1969667.126Hz |
FutoIn AsyncSteps | 5.361s | 1865323.633Hz |
FutoIn AsyncSteps no mempool | 8.288s | 1206563.706Hz |
FutoIn NitroSteps | 3.68s | 2717391.304Hz |
, Boost.Fiber AsyncSteps, NitroSteps.
Technology | Memory |
---|---|
Boost.Fiber protected | 124M |
Boost.Fiber pooled | 505M |
FutoIn AsyncSteps | 124M |
FutoIn AsyncSteps no mempool | 84M |
FutoIn NitroSteps | 115M |
, Boost.Fiber .
- Promise
: + 10 . . 10 . JIT NODE_ENV=production
, @futoin/optihelp
.
The sources are on GitHub and GitLab. The tests used Node.js v8.12.0 and v10.11.0, installed through FutoIn CID.
Tech | Simple | Loop |
---|---|---|
Node.js v10 | ||
FutoIn AsyncSteps | 1342899.520Hz | 587.777Hz |
async/await | 524983.234Hz | 630.863Hz |
Node.js v8 | ||
FutoIn AsyncSteps | 682420.735Hz | 588.336Hz |
async/await | 365050.395Hz | 400.575Hz |
async/await
? , V8 Node.js v10 .
, Promise async/await
Node.js Event Loop. ( ), FutoIn AsyncSteps .
AsyncSteps Node.js Event Loop async/await
- Node.js v10.
, ++ β . , Node.js 10 .
C++, FutoIn AsyncSteps Boost.Fiber , Boost.Fiber mmap()/mprotect
.
FutoIn AsyncSteps JavaScript async/await
Node.js v10.
- "" . β API.
, FutoIn AsyncSteps , "" async/await
. , . Promise
ECMAScript, AsyncSteps "" .
. AsyncSteps NitroSteps .
Java/JVM β . .
Source: https://habr.com/ru/post/424311/