
Asynchronous business logic today

In short:


  • A proof of concept is already implemented in C++, JS and PHP, and the approach suits Java as well.
  • Faster than coroutines and Promise, with more features.
  • Does not require allocating a separate software stack.
  • Works well with all security and debugging tools.
  • Works on any architecture and requires no special compiler flags.





A look back


At the dawn of computing there was a single control flow with blocking I/O. Then hardware interrupts were added, which made it possible to use slow and unpredictable devices efficiently.


As hardware became more capable while remaining scarce, the need arose to run several tasks at once, and hardware support was added for that. This is how isolated processes appeared, with hardware interrupts abstracted as signals.


The next evolutionary stage was multithreading, built on the same foundation as processes but with shared access to memory and other resources. This approach has its limitations and a significant overhead for switching into the protected OS.


For communication between processes, and even between different machines, the Promise/Future abstraction was proposed more than 40 years ago.


User interfaces and the now laughable 10K-clients problem led to the heyday of the Event Loop, Reactor and Proactor approaches, which are oriented more towards events than towards clear, sequential business logic.


Finally, we arrived at the modern coroutine, which is essentially an emulation of threads on top of the abstractions described above, with the corresponding technical limitations and deterministic transfer of control.


To pass events, results and exceptions, everyone returned to the same Promise/Future concept. Some companies chose to name it slightly differently: "Task".


In the end, everything was wrapped in a neat async/await package, which requires support from the compiler or transpiler, depending on the technology.


Problems with the current state of asynchronous business logic


We consider only coroutines and Promise decorated with async/await, since the very fact of evolution confirms that the older approaches had problems.


These two terms are not identical. For example, ECMAScript has no coroutines, but it has syntactic sugar for using Promise, which in turn merely organizes the work with callback hell. In practice, script engines such as V8 go further and apply special optimizations to pure async/await functions and calls.


Expert opinions on co_async/co_await, which did not make it into C++17, can be found on this site, but under pressure from the software giants coroutines may enter the standard in exactly that form. In the meantime, the traditionally recognized solutions are Boost.Context, Boost.Fiber and Boost.Coroutine2.


Java still has no async/await at the language level, but there are solutions such as EA Async which, like Boost.Context, have to be adapted to each JVM version and bytecode.


Go has its own coroutines, but a careful look at the articles and bug reports of open projects shows that things are not so smooth there either. Perhaps losing the coroutine as an explicitly managed entity in the interface was not the best idea.


Author's opinion: coroutines on bare metal are dangerous


Personally, the author has very little against coroutines in dynamic languages, but is extremely wary of any tinkering with the stack at the machine-code level.


A few theses:


  1. A stack has to be allocated:
    • a stack on the heap has a number of flaws: problems with timely overflow detection, corruption by neighbors and other reliability / safety issues,
    • a protected stack requires at least one page of physical memory, one guard page and extra overhead on every async function call: 4+ KB (minimum) plus raised system limits,
    • in the end, a significant part of the memory allocated for stacks may sit unused while the coroutines are idle.
  2. Complex logic is needed for saving, restoring and deleting coroutine state:
    • for every combination of processor architecture (even model) and binary interface (ABI): an example,
    • new or optional architecture features introduce potentially latent problems (for example, Intel TSX, ARM or MIPS co-processors),
    • other potential problems due to the closed documentation of proprietary systems (the Boost documentation mentions this).
  3. Potential problems with dynamic analysis tools and with security in general:
    • for example, integration with Valgrind is required because of the same stack switching,
    • it is hard to speak for antivirus software, but judging by past problems with the JVM it probably does not like this much,
    • I am sure that new kinds of attacks will appear and that vulnerabilities related to coroutine implementations will be discovered.

Author's opinion: generators and yield are a fundamental evil


This seemingly unrelated topic is directly connected to the concept of coroutines and to the property of being resumable.


In short, any collection should have a full-fledged iterator. Why create the problem of a cut-down iterator in the form of a generator is unclear. For example, the range() case in Python looks more like a one-off showpiece than a justification for the technical complication.


For an infinite generator, the implementation logic is elementary. Why create extra technical difficulties just to squeeze an infinite loop inside?


The only sound justification, which appeared later and which coroutine supporters put forward, is stream parsers of every kind with inversion of control. In fact, this is a narrow, specialized case for solving isolated library-level problems, not application business logic. At the same time, there is an elegant, simple and more descriptive solution based on finite state machines. The domain of these technical problems is much smaller than the domain of ordinary business logic.
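To make the finite state machine alternative concrete, here is a minimal sketch of a push-style stream parser written without generators or coroutines. The `KeyValueParser` class, the "key=value;" record format and the `parse_in_chunks()` helper are assumptions invented for this illustration; they do not come from the article or from any library.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical push-style parser for "key=value;" records.
// All state lives in ordinary members, so no separate stack is needed.
class KeyValueParser {
public:
    // Feed the next chunk; chunk boundaries may fall anywhere in a record.
    void feed(const std::string& chunk) {
        for (char c : chunk) {
            switch (state_) {
            case State::Key:
                if (c == '=') {
                    state_ = State::Value;
                } else {
                    key_ += c;
                }
                break;
            case State::Value:
                if (c == ';') {
                    result_[key_] = value_;  // record complete
                    key_.clear();
                    value_.clear();
                    state_ = State::Key;
                } else {
                    value_ += c;
                }
                break;
            }
        }
    }

    const std::map<std::string, std::string>& result() const { return result_; }

private:
    enum class State { Key, Value };

    State state_ = State::Key;
    std::string key_;
    std::string value_;
    std::map<std::string, std::string> result_;
};

// Demo helper: the input arrives split at arbitrary points.
std::map<std::string, std::string> parse_in_chunks() {
    KeyValueParser parser;
    parser.feed("na");
    parser.feed("me=fu");
    parser.feed("toin;lang=c");
    parser.feed("pp;");
    return parser.result();
}
```

The caller drives the parser by calling `feed()` whenever data arrives, so control never has to be suspended in the middle of a function.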


In essence, the problem being solved turns out to be contrived, and it requires a relatively serious effort for the initial implementation and for long-term support. So much so that some projects may ban machine-code-level coroutines, following the example of bans on goto or on dynamic memory allocation in certain industries.


Author's opinion: the async/await model on top of Promise from ECMAScript is more reliable, but requires adaptation


Unlike resumable coroutines, in this model the code is implicitly split into uninterruptible blocks wrapped as anonymous functions. In C++ this is not entirely suitable because of the way memory is managed, for example:


    struct SomeObject {
        using Value = std::vector<int>;

        Promise funcPromise() {
            return Promise::resolved(value_);
        }

        void funcCallback(std::function<void()> &&cb, const Value& val) {
            somehow_call_later(cb);
        }

        Value value_;
    };

    Promise example() {
        SomeObject some_obj;

        return some_obj.funcPromise()
            .catch([](const std::exception &e){
                // ...
            })
            .then([&](SomeObject::Value &&val){
                return Promise([&](Resolve&& resolve, Reject&&){
                    some_obj.funcCallback(resolve, val);
                });
            });
    }

First, some_obj is destroyed on exit from example(), before the lambda functions are called.


Second, lambda functions that capture variables or references are objects and implicitly add copy / move operations, which can hurt performance when there are many captures and when heap allocation is needed for type erasure in a regular std::function.


Third, the Promise interface itself is built around the concept of "promising" a result rather than around the sequential execution of business logic.


A schematic NOT optimal solution might look something like this:


    Promise example() {
        struct LocalContext {
            SomeObject some_obj;
        };

        auto ctx = std::make_shared<LocalContext>();

        return ctx->some_obj.funcPromise()
            .catch([](const std::exception &e){
                // ...
            })
            .then([ctx](SomeObject::Value &&val){
                struct LocalContext2 {
                    LocalContext2(std::shared_ptr<LocalContext> ctx, SomeObject::Value &&val)
                        : ctx(std::move(ctx)), val(std::move(val)) {}

                    std::shared_ptr<LocalContext> ctx;
                    SomeObject::Value val;
                };

                // ctx is captured by value in a non-mutable lambda, so it is copied here
                auto ctx2 = std::make_shared<LocalContext2>(ctx, std::move(val));

                return Promise([ctx2](Resolve&& resolve, Reject&&){
                    ctx2->ctx->some_obj.funcCallback([ctx2, resolve](){ resolve(); }, ctx2->val);
                });
            });
    }

Note: std::move in place of std::shared_ptr is not suitable, because the object cannot be moved into several lambdas at once and because it would increase their size.
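The lifetime issue can be reproduced with the standard library alone. The following is a minimal sketch: the global `pending` queue stands in for an event loop, and `Context`, `start_work()` and `run_demo()` are hypothetical names introduced only for this example.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <vector>

// Hypothetical stand-in for an event loop: callbacks are executed later.
std::queue<std::function<void()>> pending;

struct Context {
    std::vector<int> values{1, 2, 3};
    int sum = 0;
};

// Schedules deferred work and returns a non-owning observer of the context.
std::weak_ptr<Context> start_work(std::function<void(int)> done) {
    auto ctx = std::make_shared<Context>();
    // Capturing the shared_ptr by value keeps Context alive
    // after this function has returned.
    pending.push([ctx, done]() {
        for (int v : ctx->values) {
            ctx->sum += v;
        }
        done(ctx->sum);
    });
    return ctx;  // the local owner goes away, the copy in the lambda remains
}

int run_demo() {
    int result = 0;
    std::weak_ptr<Context> watch =
        start_work([&result](int sum) { result = sum; });
    const bool alive_while_pending = !watch.expired();

    while (!pending.empty()) {
        auto cb = std::move(pending.front());
        pending.pop();
        cb();
    }  // cb is destroyed here, releasing the last owner of Context

    const bool freed_after_run = watch.expired();
    return (alive_while_pending && freed_after_run) ? result : -1;
}
```

The price is the one named in the text: a heap allocation for the context plus reference counting on every captured copy.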


With async/await added, the asynchronous "horrors" become digestible:


    async void example() {
        SomeObject some_obj;
        SomeObject::Value val;

        try {
            val = await some_obj.func();
        } catch (const std::exception& e) {
            // ...
        }

        // Capture "async context"
        return Promise([async](Resolve&& resolve, Reject&&){
            some_obj.funcCallback([async](){ resolve(); }, val);
        });
    }

Author's opinion: a coroutine scheduler is overkill


Some critics see a problem in the lack of a scheduler and in the "unfair" use of processor resources. Data locality and efficient use of the processor cache are perhaps a more serious problem.


On the first problem: prioritization at the level of individual coroutines looks like a large overhead. Instead, coroutines can be handled together per unified task, the way traffic flows are managed.


This can be done by creating separate Event Loop instances, each with its own hardware thread and scheduled at the OS level. The second option is to synchronize coroutines on a primitive (Mutex, Throttle) that limits concurrency and / or throughput.


Asynchronous programming does not make processor resources elastic; it still requires perfectly ordinary limits on the number of tasks processed simultaneously and on total execution time.


Protection against one coroutine blocking for a long time requires the same measures as with callbacks: avoid blocking system calls and long data-processing loops.


The second problem needs research, but at the very least the coroutine stacks themselves and the implementation details of Future / Promise already break data locality. One could try to continue executing the same coroutine when the Future already holds a value. That requires some mechanism for counting the run time or the number of such continuations, so that a single coroutine cannot seize all processor time. This may either give no result or give very mixed results, depending on the size of the processor cache and the number of threads.


There is also a third point: many coroutine scheduler implementations allow coroutines to run on different processor cores, which, on the contrary, adds problems due to the mandatory synchronization on access to shared resources. With a single Event Loop thread, such synchronization is needed only at the logical level, since each synchronous callback block is guaranteed to run without racing against the others.


Author's opinion: everything is good in moderation


The presence of threads in modern operating systems does not rule out the use of separate processes. Likewise, handling a large number of clients in an Event Loop does not rule out separate hardware threads for other needs.


In any case, coroutines and the various Event Loop variants complicate debugging when the tools lack the necessary support, and with local variables on a coroutine stack it gets even harder: there is practically no way to reach them.





FutoIn AsyncSteps - an alternative to coroutines


We take as a basis the well-established Event Loop pattern and the way callbacks are organized in ECMAScript (JavaScript) Promise.


In terms of execution scheduling, we need the following operations from the Event Loop:


  1. An immediate callback, Handle immediate(callback), with the requirement of a clean call stack.
  2. A deferred callback, Handle deferred(delay, callback).
  3. Callback cancellation, handle.cancel().

This gives us an interface called AsyncTool, which can be implemented in many ways, including on top of existing proven solutions. It has no direct bearing on writing business logic, so we will not go into further detail.
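The three operations can be sketched as a toy single-threaded loop. This is not the FutoIn AsyncTool implementation: `MiniLoop`, its nested `Handle`, the use of virtual time instead of real sleeping and the `run_demo()` helper are all assumptions made for illustration.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Hypothetical toy loop; it advances virtual time instead of sleeping.
class MiniLoop {
public:
    using Callback = std::function<void()>;
    using Millis = std::chrono::milliseconds;

    class Handle {
    public:
        // Safe to call even after the callback has already run.
        void cancel() {
            if (auto flag = cancelled_.lock()) {
                *flag = true;
            }
        }

    private:
        friend class MiniLoop;
        std::weak_ptr<bool> cancelled_;
    };

    Handle immediate(Callback cb) { return deferred(Millis{0}, std::move(cb)); }

    Handle deferred(Millis delay, Callback cb) {
        auto flag = std::make_shared<bool>(false);
        queue_.emplace(now_ + delay, Entry{std::move(cb), flag});
        Handle handle;
        handle.cancelled_ = flag;
        return handle;
    }

    // Every callback is invoked from here, i.e. with a clean call stack.
    void run() {
        while (!queue_.empty()) {
            auto it = queue_.begin();
            now_ = it->first;
            Entry entry = std::move(it->second);
            queue_.erase(it);
            if (!*entry.cancelled) {
                entry.cb();
            }
        }
    }

private:
    struct Entry {
        Callback cb;
        std::shared_ptr<bool> cancelled;
    };

    Millis now_{0};
    std::multimap<Millis, Entry> queue_;  // ordered by due time, FIFO per time
};

std::vector<std::string> run_demo() {
    MiniLoop loop;
    std::vector<std::string> log;
    loop.deferred(MiniLoop::Millis{20}, [&] { log.push_back("deferred 20ms"); });
    auto handle =
        loop.deferred(MiniLoop::Millis{10}, [&] { log.push_back("cancelled"); });
    loop.immediate([&] {
        log.push_back("immediate");
        handle.cancel();
    });
    loop.run();
    return log;
}
```

In the demo the immediate callback runs first and cancels the 10 ms callback, so only two entries are logged.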


Step tree:


In the AsyncSteps concept, an abstract tree of synchronous steps is built and executed by depth-first traversal in creation order. The steps of each deeper level are added dynamically as the traversal proceeds.


All interaction goes through a single AsyncSteps interface, which by convention is passed as the first parameter to every step. By convention the parameter is named asi, or as in the deprecated form. This approach almost completely decouples a specific implementation from the business logic written in plug-ins and libraries.


In canonical implementations, each step receives its own instance of an object implementing AsyncSteps, which allows misuse of the interface to be detected promptly.


Abstract example:


    asi.add(
        // Level 0 step 1
        func( asi ){
            print( "Level 0 func 1" )
            asi.add(
                // Level 1 step 1
                func( asi ){
                    print( "Level 1 func 1" )
                    asi.error( "MyError" )
                },
                onerror( asi, error ){ // Level 1 step 1 catch
                    print( "Level 1 onerror 1: " + error )
                    asi.error( "NewError" )
                }
            )
        },
        onerror( asi, error ){ // Level 0 step 1 catch
            print( "Level 0 onerror 1: " + error )
            if ( error strequal "NewError" ) {
                asi.success( "Prm", 123, [1, 2, 3], true )
            }
        }
    )
    asi.add(
        // Level 0 step 2
        func( asi, str_param, int_param, array_param ){
            print( "Level 0 func 2: " + str_param )
        }
    )

Execution result:


    Level 0 func 1
    Level 1 func 1
    Level 1 onerror 1: MyError
    Level 0 onerror 1: NewError
    Level 0 func 2: Prm

In synchronous form it would look like this:


    str_res, int_res, array_res, bool_res // undefined

    try {
        // Level 0 step 1
        print( "Level 0 func 1" )

        try {
            // Level 1 step 1
            print( "Level 1 func 1" )
            throw "MyError"
        } catch( error ){
            // Level 1 step 1 catch
            print( "Level 1 onerror 1: " + error )
            throw "NewError"
        }
    } catch( error ){
        // Level 0 step 1 catch
        print( "Level 0 onerror 1: " + error )

        if ( error strequal "NewError" ) {
            str_res = "Prm"
            int_res = 123
            array_res = [1, 2, 3]
            bool_res = true
        } else {
            re-throw
        }
    }

    {
        // Level 0 step 2
        print( "Level 0 func 2: " + str_res )
    }

The close resemblance to traditional synchronous code is immediately visible, which should help readability.
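The traversal order and the try-catch semantics can be modelled in a few dozen lines. The sketch below is a deliberately simplified, fully synchronous toy and not the FutoIn implementation: `ToySteps`, `StepError` and `run_demo()` are hypothetical names, results are not passed between steps, and steps added inside an error handler are ignored.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

struct StepError {
    std::string code;
};

// Hypothetical toy model of a step tree with error handlers.
class ToySteps {
public:
    using Func = std::function<void(ToySteps&)>;
    using OnError = std::function<void(ToySteps&, const std::string&)>;

    void add(Func func, OnError onerror = nullptr) {
        steps_.push_back(Step{std::move(func), std::move(onerror)});
    }

    [[noreturn]] void error(const std::string& code) { throw StepError{code}; }

    void success() { succeeded_ = true; }

    // Depth-first execution in creation order.
    void run() {
        for (std::size_t i = 0; i < steps_.size(); ++i) {
            Step step = steps_[i];
            try {
                ToySteps sub;    // every step gets its own instance
                step.func(sub);  // may add steps of the next level
                sub.run();       // the deeper level runs before the next sibling
            } catch (const StepError& e) {
                if (!step.onerror) {
                    throw;
                }
                ToySteps handler_ctx;
                step.onerror(handler_ctx, e.code);  // may throw a new error
                if (!handler_ctx.succeeded_) {
                    throw;  // not recovered: propagate the original error
                }
            }
        }
    }

private:
    struct Step {
        Func func;
        OnError onerror;
    };

    std::vector<Step> steps_;
    bool succeeded_ = false;
};

std::vector<std::string> run_demo() {
    std::vector<std::string> log;
    ToySteps root;
    root.add(
        [&](ToySteps& asi) {
            log.push_back("Level 0 func 1");
            asi.add(
                [&](ToySteps& asi) {
                    log.push_back("Level 1 func 1");
                    asi.error("MyError");
                },
                [&](ToySteps& asi, const std::string& code) {
                    log.push_back("Level 1 onerror 1: " + code);
                    asi.error("NewError");
                });
        },
        [&](ToySteps& asi, const std::string& code) {
            log.push_back("Level 0 onerror 1: " + code);
            if (code == "NewError") {
                asi.success();
            }
        });
    root.add([&](ToySteps&) { log.push_back("Level 0 func 2"); });
    root.run();
    return log;
}
```

A real implementation schedules each step through the Event Loop instead of recursing, which is what allows timeouts, cancellation and external events; the toy only reproduces the ordering.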


From the business-logic point of view, a large number of requirements accumulate over time, but they can be divided into easily understood parts. What follows is the result of four years of practical use.


Base Runtime APIs:


  1. add(func[, onerror]) - an imitation of try-catch.
  2. success([args...]) - an explicit indication of successful completion:
    • implied by default,
    • can pass results to the next step.
  3. error(code[, reason]) - interrupts execution with an error:
    • code has a string type, for better integration with network protocols in a microservice architecture,
    • reason is an arbitrary human-readable explanation.
  4. state() - an analogue of Thread Local Storage. Predefined associative keys:
    • error_info - a human-readable explanation of the last error,
    • last_exception - a pointer to the last exception object,
    • async_stack - the stack of asynchronous calls, as far as the technology allows,
    • the rest are set by the user.

The previous example, now as real C++ code with some additional features:


    #include <cassert>
    #include <iostream>
    #include <vector>

    #include <futoin/iasyncsteps.hpp>

    using namespace futoin;

    void some_api(IAsyncSteps& asi) {
        asi.add(
            [](IAsyncSteps& asi) {
                std::cout << "Level 0 func 1" << std::endl;
                asi.add(
                    [](IAsyncSteps& asi) {
                        std::cout << "Level 1 func 1" << std::endl;
                        asi.error("MyError");
                    },
                    [](IAsyncSteps& asi, ErrorCode code) {
                        std::cout << "Level 1 onerror 1: " << code << std::endl;
                        asi.error("NewError", "Human-readable description");
                    }
                );
            },
            [](IAsyncSteps& asi, ErrorCode code) {
                std::cout << "Level 0 onerror 1: " << code << std::endl;

                if (code == "NewError") {
                    // Human-readable error info
                    assert(asi.state().error_info ==
                           "Human-readable description");

                    // Last exception thrown is also available in state
                    std::exception_ptr e = asi.state().last_exception;

                    // NOTE: smart conversion of "const char*"
                    asi.success("Prm", 123, std::vector<int>({1, 2, 3}), true);
                }
            }
        );
        asi.add(
            [](IAsyncSteps& asi, const futoin::string& str_res, int int_res, std::vector<int>&& arr_res) {
                std::cout << "Level 0 func 2: " << str_res << std::endl;
            }
        );
    }

API for creating loops:


  1. loop( func [, label] ) - a step with an infinitely repeated body.
  2. forEach( map|list, func [, label] ) - a step that iterates over a collection object.
  3. repeat( count, func [, label] ) - a step that iterates the specified number of times.
  4. break( [label] ) - an analogue of the traditional loop break.
  5. continue( [label] ) - an analogue of the traditional loop continue, starting a new iteration.

The specification suggests alternative names such as breakLoop, continueLoop and others in case of a conflict with reserved words.


C++ example:


    asi.loop([](IAsyncSteps& asi) {
        // infinite loop
        asi.breakLoop();
    });

    asi.repeat(10, [](IAsyncSteps& asi, size_t i) {
        // range loop from i=0 till i=9 (inclusive)
        asi.continueLoop();
    });

    asi.forEach(
        std::vector<int>{1, 2, 3},
        [](IAsyncSteps& asi, size_t i, int v) {
            // Iteration of vector-like and list-like objects
        });

    asi.forEach(
        std::list<futoin::string>{"1", "2", "3"},
        [](IAsyncSteps& asi, size_t i, const futoin::string& v) {
            // Iteration of vector-like and list-like objects
        });

    asi.forEach(
        std::map<futoin::string, futoin::string>(),
        [](IAsyncSteps& asi,
           const futoin::string& key,
           const futoin::string& v) {
            // Iteration of map-like objects
        });

    std::map<std::string, futoin::string> non_const_map;
    asi.forEach(
        non_const_map,
        [](IAsyncSteps& asi, const std::string& key, futoin::string& v) {
            // Iteration of map-like objects, note the value reference type
        });

API for integration with external events:


  1. setTimeout( timeout_ms ) - raises a Timeout error once the time has elapsed, if the step and its subtree have not finished executing.
  2. setCancel( handler ) - sets a cancellation handler, which is called when the whole flow is canceled and when the asynchronous step stack unwinds during error handling.
  3. waitExternal() - simply waits for an external event.
    • Note: safe to use only in technologies with a garbage collector.

Calling any of these functions makes an explicit call to success() mandatory.


C++ example:


    asi.add([](IAsyncSteps& asi) {
        auto handle = schedule_external_callback([&](bool err) {
            if (err) {
                try {
                    asi.error("ExternalError");
                } catch (...) {
                    // pass
                }
            } else {
                asi.success();
            }
        });

        asi.setCancel([=](IAsyncSteps& asi) { external_cancel(handle); });
    });

    asi.add(
        [](IAsyncSteps& asi) {
            // Raises Timeout error after specified period
            asi.setTimeout(std::chrono::seconds{10});

            asi.loop([](IAsyncSteps& asi) {
                // infinite loop
            });
        },
        [](IAsyncSteps& asi, ErrorCode code) {
            if (code == futoin::errors::Timeout) {
                asi();
            }
        });

ECMAScript example:


    asi.add( (asi) => {
        asi.waitExternal(); // disable implicit success()

        some_obj.read( (err, data) => {
            if (!asi.state) {
                // ignore as AsyncSteps execution got canceled
            } else if (err) {
                try {
                    asi.error( 'IOError', err );
                } catch (_) {
                    // ignore error thrown as there are no
                    // AsyncSteps frames on stack.
                }
            } else {
                asi.success( data );
            }
        } );
    } );

Future / Promise integration API:


  1. await(promise_future[, on_error]) - waits for a Future / Promise as a step.
  2. promise() - turns the entire execution flow into a Future / Promise; used instead of execute().

C++ example:


    [](IAsyncSteps& asi) {
        // Proper way to create new AsyncSteps instances
        // without hard dependency on implementation.
        auto new_steps = asi.newInstance();
        new_steps->add([](IAsyncSteps& asi) {});

        // Can be called outside of AsyncSteps event loop
        // new_steps->promise().wait();
        // or
        // new_steps->promise<int>().get();

        // Proper way to wait for standard std::future
        asi.await(new_steps->promise());

        // Ensure instance lifetime
        asi.state()["some_obj"] = std::move(new_steps);
    };

Business logic flow control API:


  1. AsyncSteps(AsyncTool&) - a constructor that binds an execution flow to a specific Event Loop.
  2. execute() - starts the execution flow.
  3. cancel() - cancels the execution flow.

This already requires a concrete implementation of the interface.


C++ example:


    #include <chrono>
    #include <thread>

    #include <futoin/ri/asyncsteps.hpp>
    #include <futoin/ri/asynctool.hpp>

    void example() {
        futoin::ri::AsyncTool at;
        futoin::ri::AsyncSteps asi{at};

        asi.loop([&](futoin::IAsyncSteps &asi){
            // Some infinite loop logic
        });

        asi.execute();
        std::this_thread::sleep_for(std::chrono::seconds{10});
        asi.cancel(); // also called by the destructor
    }

Other APIs:


  1. newInstance() - creates a new execution flow without a direct dependency on the implementation.
  2. sync(object, func, onerror) - the same as add(), but synchronized on an object that implements the corresponding interface.
  3. parallel([on_error]) - a special add() whose substeps are separate AsyncSteps flows:
    • all flows share a common state(),
    • the parent flow continues once all children have completed,
    • an uncaught error in any child immediately cancels all other child flows.

C++ examples:


    #include <futoin/ri/mutex.hpp>

    using namespace futoin;

    ri::Mutex mtx_a;

    void sync_example(IAsyncSteps& asi) {
        asi.sync(mtx_a, [](IAsyncSteps& asi) {
            // synchronized section
            asi.add([](IAsyncSteps& asi) {
                // inner step in the section

                // This synchronization is NOOP for already
                // acquired Mutex.
                asi.sync(mtx_a, [](IAsyncSteps& asi) {
                });
            });
        });
    }

    void parallel_example(IAsyncSteps& asi) {
        using OrderVector = std::vector<int>;

        asi.state("order", OrderVector{});

        auto& p = asi.parallel([](IAsyncSteps& asi, ErrorCode) {
            // Overall error handler
            asi.success();
        });

        p.add([](IAsyncSteps& asi) {
            // regular flow
            asi.state<OrderVector>("order").push_back(1);

            asi.add([](IAsyncSteps& asi) {
                asi.state<OrderVector>("order").push_back(4);
            });
        });

        p.add([](IAsyncSteps& asi) {
            asi.state<OrderVector>("order").push_back(2);

            asi.add([](IAsyncSteps& asi) {
                asi.state<OrderVector>("order").push_back(5);
                asi.error("SomeError");
            });
        });

        p.add([](IAsyncSteps& asi) {
            asi.state<OrderVector>("order").push_back(3);

            asi.add([](IAsyncSteps& asi) {
                asi.state<OrderVector>("order").push_back(6);
            });
        });

        asi.add([](IAsyncSteps& asi) {
            asi.state<OrderVector>("order"); // 1, 2, 3, 4, 5
        });
    };

Standard synchronization primitives


  1. Mutex - limits simultaneous execution to N flows with a queue of up to Q; the default is N=1, Q=unlimited.
  2. Throttle - limits the number of entries to N per period P with a queue of up to Q; the default is N=1, P=1s, Q=0.
  3. Limiter - a combination of Mutex and Throttle, typically used at the entry point for processing external requests and when calling external systems, in order to stay stable under load.

If the queue limit is exceeded, a DefenseRejected error is raised; its meaning is clear from the description of Limiter.
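The N / Q semantics of Mutex can be illustrated with a small single-threaded model. This is only a sketch under stated assumptions: `ToyMutex`, `enter()`, `leave()` and `run_demo()` are hypothetical names, the rejection is modelled with a plain std::runtime_error, and the real futoin::ri::Mutex is driven through asi.sync() rather than called directly.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical single-threaded model of a Mutex with limits N and Q.
class ToyMutex {
public:
    using Task = std::function<void()>;

    ToyMutex(std::size_t max_running, std::size_t max_queue)
        : max_running_(max_running), max_queue_(max_queue) {}

    // Runs the task if a slot is free, queues it if the queue has room,
    // otherwise rejects it.
    void enter(Task on_acquired) {
        if (running_ < max_running_) {
            ++running_;
            on_acquired();
        } else if (queue_.size() < max_queue_) {
            queue_.push_back(std::move(on_acquired));
        } else {
            throw std::runtime_error("DefenseRejected");
        }
    }

    // Releases one slot; the oldest waiter, if any, takes it over directly.
    void leave() {
        if (!queue_.empty()) {
            Task next = std::move(queue_.front());
            queue_.pop_front();
            next();
        } else {
            --running_;
        }
    }

private:
    std::size_t max_running_;
    std::size_t max_queue_;
    std::size_t running_ = 0;
    std::deque<Task> queue_;
};

std::vector<std::string> run_demo() {
    std::vector<std::string> log;
    ToyMutex mtx(1, 1);  // N=1, Q=1

    mtx.enter([&] { log.push_back("A acquired"); });
    mtx.enter([&] { log.push_back("B acquired"); });  // queued behind A
    try {
        mtx.enter([&] { log.push_back("C acquired"); });  // queue is full
    } catch (const std::runtime_error& e) {
        log.push_back(std::string("C rejected: ") + e.what());
    }

    mtx.leave();  // A is done, B acquires
    mtx.leave();  // B is done
    return log;
}
```

Rejecting early instead of queueing without bound is what keeps a service responsive under overload, which matches the stated purpose of Limiter.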


Key benefits


The AsyncSteps concept was not an end in itself; it was born from the need for more controlled asynchronous program execution in terms of time limits, cancellation and the overall coherence of individual callbacks. No universal solution, then or now, provides the same functionality. Therefore:


FTN12 β€” .


setCancel() β€” . , . RAII atexit() .


cancel() β€” , . SIGTERM pthread_cancel() , .


setTimeout() β€” . , "Timeout".


β€” FutoIn AsyncSteps .


β€” ABI , . Embedded MMU.







Test platform: Intel Xeon E3-1245v2 / DDR1333, Debian Stretch.


:


  1. Boost.Fiber protected_fixedsize_stack .
  2. Boost.Fiber pooled_fixedsize_stack .
  3. FutoIn AsyncSteps .
  4. FutoIn AsyncSteps ( FUTOIN_USE_MEMPOOL=false ).
    • futoin::IMemPool .
  5. FutoIn NitroSteps<> β€” .
    • .

Boost.Fiber :


  1. 1 . .
  2. 30 . 1 . .
    • 30 . mmap()/mprotect() boost::fiber::protected_fixedsize_stack .
    • .
  3. 30 . 10 . .
    • "" .

"" , .. , . . .


GCC 6.3.0. lang tcmalloc , .


GitHub GitLab .


1.


Technology                      Time      Hz
Boost.Fiber protected           4.8s      208333.333Hz
Boost.Fiber pooled              0.23s     4347826.086Hz
FutoIn AsyncSteps               0.21s     4761904.761Hz
FutoIn AsyncSteps no mempool    0.31s     3225806.451Hz
FutoIn NitroSteps               0.255s    3921568.627Hz




Boost.Fiber - , pooled_fixedsize_stack , AsyncSteps.


2.


Technology                      Time      Hz
Boost.Fiber protected           6.31s     158478.605Hz
Boost.Fiber pooled              1.558s    641848.523Hz
FutoIn AsyncSteps               1.13s     884955.752Hz
FutoIn AsyncSteps no mempool    1.353s    739098.300Hz
FutoIn NitroSteps               1.43s     699300.699Hz




, . , β€” .


3.


Technology                      Time      Hz
Boost.Fiber protected           5.096s    1962323.390Hz
Boost.Fiber pooled              5.077s    1969667.126Hz
FutoIn AsyncSteps               5.361s    1865323.633Hz
FutoIn AsyncSteps no mempool    8.288s    1206563.706Hz
FutoIn NitroSteps               3.68s     2717391.304Hz




, Boost.Fiber AsyncSteps, NitroSteps.


Memory usage (RSS)


Technology                      Memory
Boost.Fiber protected           124M
Boost.Fiber pooled              505M
FutoIn AsyncSteps               124M
FutoIn AsyncSteps no mempool    84M
FutoIn NitroSteps               115M




, Boost.Fiber .


: Node.js


- Promise : + 10 . . 10 . JIT NODE_ENV=production , @futoin/optihelp .


GitHub GitLab . Node.js v8.12.0 v10.11.0, FutoIn CID .


Tech                 Simple           Loop
Node.js v10
FutoIn AsyncSteps    1342899.520Hz    587.777Hz
async/await          524983.234Hz     630.863Hz
Node.js v8
FutoIn AsyncSteps    682420.735Hz     588.336Hz
async/await          365050.395Hz     400.575Hz





async/await ? , V8 Node.js v10 .


, Promise async/await Node.js Event Loop. ( ), FutoIn AsyncSteps .


AsyncSteps Node.js Event Loop async/await - Node.js v10.


, ++ β€” . , Node.js 10 .


Findings


C++, FutoIn AsyncSteps Boost.Fiber , Boost.Fiber mmap()/mprotect .


, - , . .


FutoIn AsyncSteps JavaScript async/await Node.js v10.


, -, . .


- "" . β€” API.




Conclusion


, FutoIn AsyncSteps , "" async/await . , . Promise ECMAScript, AsyncSteps "" .


. AsyncSteps NitroSteps .


, - .


Java/JVM β€” . .


, GitHub / GitLab .



Source: https://habr.com/ru/post/424311/

