
SObjectizer: from simple to complex. Part I

In the first article, we talked about what SObjectizer is and why it turned out the way it did. In this second one, we will try to show what more or less realistic SObjectizer code can look like, and demonstrate the direction in which such code usually evolves. At first, when a developer gets the opportunity to work with the Actor Model, he tends to abuse it, creating problems both for himself and for those who will have to maintain a software product written in the "an actor for every sneeze" style. Only after some time, and a fair number of bumps and bruises, does the understanding come that the beauty of the Actor Model is not at all in the ability to create actors by the tens of thousands, or even mere thousands. But let's proceed step by step, without getting ahead of ourselves.

For the demonstration we will invent the following abstract task: given the name of a file with an email (roughly speaking, this file stores everything that arrived via the POP3 protocol: headers, message body, attachments, etc.), we must produce an assessment of how suspicious its contents are: the email looks safe, it looks suspicious, or some problem arose while evaluating its contents and no definite assessment can be given. The task is abstract; any resemblance to something similar from real life is purely coincidental.

Naturally, there will be more than one such file name. There will be a stream of these names that has to be dealt with, and it is desirable to exploit the capabilities of modern multi-core hardware, i.e. to process several emails in parallel.

We will schematically show how this problem can be solved in SObjectizer head-on. Then we will point out the problems of the chosen approach, do the next iteration, and so on, so that through these examples the reader arrives at the understanding of "convenient use of the Actor Model in C++" that we have developed over ten years of working with SObjectizer in real projects.
To begin with, let's decide how requests to check email files are issued and how the results of the checks are returned. We use simple messages for this purpose:

// We assume that the names from so_5 and std are visible.
// In real code such broad using-directives are questionable,
// but they keep the examples short.
using namespace so_5;
using namespace std;
using namespace chrono_literals;

// A request to check the content of an email file.
struct check_request {
    // Name of the file with the email.
    string email_file_;
    // Where the check result should be sent.
    mbox_t reply_to_;
};

// Possible results of checking an email.
enum class check_status {
    safe,
    suspicious,
    dangerous,
    check_failure,
    check_timedout
};

// Message with the result of checking an email.
// It repeats the file name from the request so that the
// receiver can match the result with the request it issued
// earlier.
struct check_result {
    string email_file_;
    check_status status_;
};

So when we need to check an email, we send a check_request message to a certain mbox. This message carries the file name and the return address to which the check result should be sent. Accordingly, the next step is to determine exactly where check_request messages will be sent.

You could, of course, create a single agent that receives all check_request messages and processes them by itself. But such an agent would very quickly become a bottleneck. Therefore we will arrange things so that there is one manager agent that receives check_request messages and creates an analyzer agent for each received message. It is the analyzer agent that will do the actual checking of the email, while the manager agent acts as a factory of analyzer agents.

We can immediately write the simplest version of the manager agent:

// Agent that creates email_analyzer agents on demand.
class analyzer_manager final : public agent_t {
public:
    analyzer_manager( context_t ctx ) : agent_t( ctx ) {
        // Since the class is final, it is safe to make the
        // subscriptions right in the constructor. If the class
        // were not final, it would be better to subscribe in
        // so_define_agent(), which can be overridden in
        // derived classes.
        so_subscribe_self()
            // Subscription to a message for which there is only
            // one handler, working in the default agent state.
            .event( &analyzer_manager::on_new_check_request );
    }

private:
    void on_new_check_request( const check_request & msg ) {
        // A new analyzer agent is created for every request.
        // Each analyzer lives in its own child cooperation,
        // i.e. the SObjectizer Environment will take care of
        // deregistering the child cooperations when the parent
        // cooperation is deregistered.
        introduce_child_coop( *this,
            [&]( coop_t & coop ) {
                // The child cooperation contains just one agent.
                coop.make_agent< email_analyzer >(
                    msg.email_file_, msg.reply_to_ );
            } );
    }
};

To process emails we need to register an instance of the analyzer_manager agent in the SObjectizer Environment and somehow make its personal mbox (the so-called direct mbox) accessible to everyone. Whoever needs to check an email sends a check_request message to this mbox, the message reaches the analyzer_manager, an email_analyzer agent gets created, and then everything proceeds as intended...
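To make that concrete, here is a minimal sketch of such wiring (this fragment is ours, not from the article's repository; it assumes the analyzer_manager class shown above and a hypothetical reply_mbox on the client side):

```cpp
#include <so_5/all.hpp>

int main() {
    so_5::launch( []( so_5::environment_t & env ) {
        env.introduce_coop( []( so_5::coop_t & coop ) {
            // Register the manager and remember its direct mbox.
            auto manager = coop.make_agent< analyzer_manager >();
            auto manager_mbox = manager->so_direct_mbox();

            // Anyone holding manager_mbox can now request a check.
            // reply_mbox here is a hypothetical mbox of the agent
            // that wants to receive the check_result:
            // so_5::send< check_request >(
            //     manager_mbox, "some-email.eml", reply_mbox );
        } );
    } );
}
```

How exactly manager_mbox is distributed to the producers of check_request messages (a named mbox, a shared variable, a constructor parameter) is an application-level decision.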

Now we need to implement the email_analyzer agent that will analyze emails. The simplest thing that comes to mind is an agent that performs all operations itself: it loads the content from the file, parses this content into its constituent parts (headers, body, attachments), analyzes all of it, and produces a verdict.

In fact, the email_analyzer agent only needs to define its own implementation of the so_evt_start() method, which is automatically called for each agent after the agent has been successfully registered in the SObjectizer Environment. Thus the email_analyzer agent looks very simple:

// Agent for checking the content of an email file.
// It performs all its work at the start and deregisters
// itself at the end of so_evt_start.
class email_analyzer : public agent_t {
public:
    email_analyzer( context_t ctx,
        // Name of the file with the email to check.
        string email_file,
        // Where the check result should be sent.
        mbox_t reply_to )
        : agent_t(ctx)
        , email_file_(move(email_file))
        , reply_to_(move(reply_to))
    {}

    virtual void so_evt_start() override {
        try {
            // All the checking is done right here.
            auto raw_data = load_email_from_file( email_file_ );
            auto parsed_data = parse_email( raw_data );
            auto status = check_headers( parsed_data->headers() );
            if( check_status::safe == status )
                status = check_body( parsed_data->body() );
            if( check_status::safe == status )
                status = check_attachments( parsed_data->attachments() );

            send< check_result >( reply_to_, email_file_, status );
        }
        catch( const exception & ) {
            // If anything went wrong, no actual assessment of
            // the email can be produced, only a check_failure.
            send< check_result >( reply_to_, email_file_,
                check_status::check_failure );
        }

        // The work is done and the agent is no longer needed,
        // so it deregisters itself.
        so_deregister_agent_coop_normally();
    }

private:
    const string email_file_;
    const mbox_t reply_to_;
};

So, we have very trivial implementations of the analyzer_manager and email_analyzer agents, which, unfortunately, have several serious problems.

The first problem is that email_analyzer agents will not work in parallel. The point is that when they are created, no dispatcher to which they should be bound is specified. Therefore these agents are automatically bound to the default dispatcher of the SObjectizer Environment, and this default dispatcher is single-threaded: it has only one worker thread, on which the events of the agents bound to this dispatcher are handled sequentially.

So if we want email_analyzer agents to work independently of each other, we need to bind them explicitly to an appropriate type of dispatcher. In this case a dispatcher with a pool of worker threads is a good fit. Accordingly, someone must create an instance of such a dispatcher and someone must bind email_analyzer-s to this instance. Obviously, that someone is the analyzer_manager agent:

class analyzer_manager final : public agent_t {
public:
    analyzer_manager( context_t ctx )
        : agent_t( ctx )
        , analyzers_disp_(
            // A private dispatcher is created, i.e. this
            // instance belongs to the manager and cannot be
            // used by anyone else.
            disp::thread_pool::create_private_disp(
                // A reference to the SObjectizer Environment in
                // which the dispatcher will work. The dispatcher
                // cannot outlive this environment.
                so_environment(),
                // The size of the thread pool. It is hardcoded
                // here, but in real code it could come from
                // something like thread::hardware_concurrency()
                // or from the application config.
                16 ) )
    {
        so_subscribe_self()
            .event( &analyzer_manager::on_new_check_request );
    }

private:
    disp::thread_pool::private_dispatcher_handle_t analyzers_disp_;

    void on_new_check_request( const check_request & msg ) {
        introduce_child_coop( *this,
            // The child cooperation will be bound to the private
            // dispatcher with the default binding parameters
            // (each agent gets its own event queue).
            analyzers_disp_->binder(
                disp::thread_pool::bind_params_t() ),
            [&]( coop_t & coop ) {
                // The child cooperation contains just one agent.
                coop.make_agent< email_analyzer >(
                    msg.email_file_, msg.reply_to_ );
            } );
    }
};

This simple modification of analyzer_manager got rid of the first problem. But the second one remains: the uncontrolled creation of an unlimited number of email_analyzer agents.

The current implementation of analyzer_manager works on the fire-and-forget principle: it receives a check_request message with the name of the file to check, creates an email_analyzer agent, and forgets about everything. Obviously, this approach is not suitable for any serious load. If you immediately create tens of thousands of email_analyzer agents that will run on a pool of N worker threads, nothing good will come of it except extra memory consumption. It is better to limit the number of concurrently working agents right away and create new ones only after the previous ones have finished, and in addition to keep a queue of pending requests from which work for new agents will be taken.

So once again we modify our analyzer_manager: we add a queue of requests and a limit on the number of concurrently working agents.

class analyzer_manager final : public agent_t {
    // This signal tells the manager to try to create a new
    // analyzer agent if the limit is not exhausted yet.
    struct try_create_next_analyzer : public signal_t {};

    // This signal tells the manager that an analyzer agent has
    // finished its work and has been deregistered.
    struct analyzer_finished : public signal_t {};

public:
    analyzer_manager( context_t ctx )
        : agent_t( ctx )
        , analyzers_disp_( disp::thread_pool::create_private_disp(
            so_environment(), 16 ) )
    {
        so_subscribe_self()
            .event( &analyzer_manager::on_new_check_request )
            // Subscriptions to signals: a signal carries no
            // data, so only the signal type is specified.
            .event< try_create_next_analyzer >(
                &analyzer_manager::on_create_new_analyzer )
            .event< analyzer_finished >(
                &analyzer_manager::on_analyzer_finished );
    }

private:
    const size_t max_parallel_analyzers_{ 16 };
    size_t active_analyzers_{ 0 };

    disp::thread_pool::private_dispatcher_handle_t analyzers_disp_;

    list< check_request > pending_requests_;

    void on_new_check_request( const check_request & msg ) {
        // Request handling is now a two-step process: first the
        // request is stored in the queue, then an attempt to
        // create a new analyzer is initiated. The analyzer will
        // be created only if the limit allows it.
        pending_requests_.push_back( msg );

        // Initiate that attempt.
        send< try_create_next_analyzer >( *this );
    }

    void on_create_new_analyzer() {
        // A new analyzer can be created only if the number of
        // active analyzers is below the limit.
        if( active_analyzers_ >= max_parallel_analyzers_ )
            return;

        launch_new_analyzer();

        // If there are more requests waiting and the limit
        // still allows it, initiate the creation of yet
        // another analyzer.
        if( !pending_requests_.empty() &&
            active_analyzers_ < max_parallel_analyzers_ )
            send< try_create_next_analyzer >( *this );
    }

    void on_analyzer_finished() {
        // One more analyzer has finished its work.
        --active_analyzers_;

        // If there are requests waiting, a new analyzer can
        // be started.
        if( !pending_requests_.empty() )
            launch_new_analyzer();
    }

    void launch_new_analyzer() {
        introduce_child_coop( *this,
            analyzers_disp_->binder(
                disp::thread_pool::bind_params_t() ),
            [this]( coop_t & coop ) {
                coop.make_agent< email_analyzer >(
                    pending_requests_.front().email_file_,
                    pending_requests_.front().reply_to_ );

                // We need to know when the analyzer finishes
                // its work. For that a notificator is added to
                // the child cooperation: on deregistration it
                // will send the analyzer_finished signal.
                coop.add_dereg_notificator(
                    // The notificator itself; its arguments
                    // are not needed here.
                    [this]( environment_t &, const string &,
                        const coop_dereg_reason_t & ) {
                        send< analyzer_finished >( *this );
                    } );
            } );

        // One more analyzer agent is now active.
        ++active_analyzers_;

        // The request has been handed over to the analyzer and
        // can be removed from the queue.
        pending_requests_.pop_front();
    }
};

In principle, we got a more or less normal solution that could be considered satisfactory. If it were not for one "but".

This "but" is that although we are now able to run several analyzer agents in parallel, the parallelization turns out to be so-so. If, say, five agents start at the same time, all five immediately begin I/O operations, and while these operations are in progress nothing else can be done. Then the I/O operations finish and all five agents start analyzing the data read from disk, which keeps the CPU busy. That CPU time could have been overlapped with the I/O operations of the next few analyzer agents, but we cannot do that while the first five agents are busy with their work.

This problem can be solved by removing the I/O operations from email_analyzer. Instead of loading the data from the file itself, the email_analyzer agent can delegate this task to a special IO agent: the email_analyzer agent starts, sends a message to the IO agent, and later receives the result of the I/O operation as a response message, thereby allowing another email_analyzer to do its part of the work (send a message to the IO agent or process a response from it). But the discussion of what this will look like and how good this solution is will be continued in the next article.

In the meantime, we can show one important ability that our current implementation of the manager agent, with its waiting list, gives us: we can easily control how long requests wait in this list.

Indeed, the email-checking operation will surely have some reasonable limit on how long one may wait for an answer. And if the security assessment could not be made within this time, then most likely there is no point in trying any further. Based on this, we can easily modify the manager agent so that it throws out of the waiting list those requests that have been waiting too long (for example, more than 10 seconds). To do this, we use a periodic message that arrives at the manager twice a second. On receiving this message, the manager runs through the waiting list and drops the requests that have been waiting for more than 10 seconds. The approach is, of course, not very precise, but it is very simple and reliable:

class analyzer_manager final : public agent_t {
    struct try_create_next_analyzer : public signal_t {};
    struct analyzer_finished : public signal_t {};

    // A periodic signal that tells the manager it is time to
    // check the lifetime of the requests in the waiting list.
    struct check_lifetime : public signal_t {};

    // Now we need to know how long a request has been waiting,
    // so the waiting list stores not the request itself but a
    // structure with the request and its arrival time.
    using clock = chrono::steady_clock;
    struct pending_request {
        clock::time_point stored_at_;
        check_request request_;
    };

public:
    analyzer_manager( context_t ctx )
        : agent_t( ctx )
        , analyzers_disp_( disp::thread_pool::create_private_disp(
            so_environment(), 16 ) )
    {
        so_subscribe_self()
            .event( &analyzer_manager::on_new_check_request )
            .event< try_create_next_analyzer >(
                &analyzer_manager::on_create_new_analyzer )
            .event< analyzer_finished >(
                &analyzer_manager::on_analyzer_finished )
            // One more subscription: to the periodic signal.
            .event< check_lifetime >(
                &analyzer_manager::on_check_lifetime );
    }

    // This method is needed to start the periodic signal.
    virtual void so_evt_start() override {
        // It is important to store the returned timer_id,
        // otherwise the timer will be canceled immediately.
        check_lifetime_timer_ = send_periodic< check_lifetime >(
            *this, 500ms, 500ms );
    }

private:
    const size_t max_parallel_analyzers_{ 16 };
    size_t active_analyzers_{ 0 };

    disp::thread_pool::private_dispatcher_handle_t analyzers_disp_;

    // Maximum time a request is allowed to wait in the list.
    const chrono::seconds max_lifetime_{ 10 };
    // The id of the timer for the check_lifetime signal.
    timer_id_t check_lifetime_timer_;

    list< pending_request > pending_requests_;

    void on_new_check_request( const check_request & msg ) {
        // The arrival time is stored along with the request.
        pending_requests_.push_back(
            pending_request{ clock::now(), msg } );

        send< try_create_next_analyzer >( *this );
    }

    void on_create_new_analyzer() {
        if( active_analyzers_ >= max_parallel_analyzers_ )
            return;

        launch_new_analyzer();

        if( !pending_requests_.empty() &&
            active_analyzers_ < max_parallel_analyzers_ )
            send< try_create_next_analyzer >( *this );
    }

    void on_analyzer_finished() {
        --active_analyzers_;

        if( !pending_requests_.empty() )
            launch_new_analyzer();
    }

    void on_check_lifetime() {
        // Requests that have been waiting for too long are
        // removed from the head of the list.
        while( !pending_requests_.empty() &&
            pending_requests_.front().stored_at_ + max_lifetime_
                < clock::now() ) {
            // A negative result is sent for each expired email.
            send< check_result >(
                pending_requests_.front().request_.reply_to_,
                pending_requests_.front().request_.email_file_,
                check_status::check_timedout );

            pending_requests_.pop_front();
        }
    }

    void launch_new_analyzer() {
        introduce_child_coop( *this,
            analyzers_disp_->binder(
                disp::thread_pool::bind_params_t() ),
            [this]( coop_t & coop ) {
                coop.make_agent< email_analyzer >(
                    pending_requests_.front().request_.email_file_,
                    pending_requests_.front().request_.reply_to_ );

                coop.add_dereg_notificator(
                    [this]( environment_t &, const string &,
                        const coop_dereg_reason_t & ) {
                        send< analyzer_finished >( *this );
                    } );
            } );

        ++active_analyzers_;
        pending_requests_.pop_front();
    }
};

Perhaps we should stop at this point to keep the article a reasonable size. In subsequent articles we will continue with this example and describe more complex implementations of the agents, demonstrating some specific features of SObjectizer.

In the meantime, note that in the examples shown we have already run into one of the most important problems facing a developer who uses the Actor Model: overload protection.

This problem occurs, for example, when there are so many agents in the system that their events can no longer be serviced in time. If we allow email_analyzer agents to be created without limiting their number, then at some point we may find ourselves in a situation where several thousand such agents are waiting for their turn to handle an event, and waiting for a very long time (in the most pathological cases the delays can run to minutes or even tens of minutes). In this article we have shown one of the most effective ways to deal with this manifestation of the overload problem: limiting the number of agents and creating new ones only as the opportunity arises, i.e. as old agents are destroyed.

There are other manifestations of overload. For example, the arrival of so many messages that the application has no chance to process them in reasonable time. This is also a very unpleasant problem, and SObjectizer provides some tools to deal with it. But we will cover this issue in more detail in one of the following articles.

Besides overload, there is one more problem inherent in systems built on actors/agents: the difficulty of observing what is actually happening inside the application. This is when the application contains thousands upon thousands of agents, each of which seems to work correctly, yet it is not easy to understand whether the application as a whole is working properly. We will also touch upon this question, but in subsequent articles.

In the meantime, we hope the examples and reasoning given in this article were clear. If something remains unclear, we will be happy to answer questions in the comments.

The source code for the examples shown in the article can be found in this repository.

Source: https://habr.com/ru/post/306858/
