📜 ⬆️ ⬇️

SObjectizer: from simple to complex. Part III

In the next article about SObjectizer, we will continue to follow the evolution of a simple at first agent, which becomes more and more complicated as it develops. Consider how to deal with deferred messages in which we are no longer interested. And we will use some functionality of hierarchical finite automata.



In the previous article, we stopped at the fact that we have an agent email_analyzer, which can be considered more or less reliably solving its task. However, he himself, sequentially, performs three stages of checking email: first he checks the headers, then the contents, then the attachments.


Most likely, each of these operations will not be exclusively CPU-bound. It is much more likely that after isolating some values ​​from the fragment being checked (for example, from letter headers), you will need to make a request somewhere to check the validity of this value. For example, a query in the database in order to check whether the name of the sending host is blacklisted. While this request will be executed, it would be possible to perform some other operation, for example, to parse the contents of the text of the letter into separate key phrases, so that they can be checked using a dictionary of spam markers. Or check if there are archives in attachments and initiate antivirus check. In general, it makes sense to parallelize the analysis of email.


Let's try to use individual agents for each operation. Those. You can write agents like:


class email_headers_checker : public agent_t { public : struct result { check_status status_ }; /*    */ email_headers_checker( context_t ctx, ... /* -  */ ) {...} virtual void so_evt_start() override { ... /*      */ } ... /* -   */ }; class email_body_checker : public agent_t {...}; class email_attachment_checker : public agent_t {...}; 

Each such agent will perform operations specific to its operation, and then send the result email_analyzer as a message. Our email_analyzer will need to create instances of these agents at home and wait for messages from them with the results of the analysis:


 void on_load_succeed( const load_email_succeed & msg ) { try { auto parsed_data = parse_email( msg.content_ ); introduce_child_coop( *this, // -checker-      // thread-pool-,     //   . disp::thread_pool::create_disp_binder( "checkers", disp::thread_pool::bind_params_t{} ), [&]( coop_t & coop ) { coop.make_agent< email_headers_checker >( so_direct_mbox(), parsed_data->headers() ); coop.make_agent< email_body_checker >( so_direct_mbox(), parsed_data->body() ); coop.make_agent< email_attach_checker >( so_direct_mbox(), parsed_data->attachments() ); } ); } catch( const exception & ) {...} } 

Those who carefully read the previous articles, the phrase "wait for messages from them" should have been alerted. Waiting without time limit is not good, it is a direct way to get in vain dangling in the system and not doing anything agent. Therefore, when waiting for responses from checkers, it makes sense for us to do the same as when waiting for the result of an IO operation: to send ourselves some sort of deferred signal, having received which we will understand that it is no use waiting any longer. Those. we would have to write something like:


 //    email_analyzer    . class email_analyzer : public agent_t { //     ,    //   IO-    . struct io_agent_response_timeout : public signal_t {}; //     ,    //     email-. struct checkers_responses_timeout : public signal_t {}; ... virtual void so_evt_start() override { ... /*   IO- */ //      -    IO-. send_delayed< io_agent_response_timeout >( *this, 1500ms ); } ... void on_load_succeed( const load_succeed & msg ) { ... /*     checker- */ //     -    -checker-. send_delayed< checkers_responses_timeout >( *this, 750ms ); } ... void on_checkers_responses_timeout() { ... /*   . */ } }; 

However, having gone this way we will step on a rake: waiting for a response from the checkers, we can easily get a pending signal io_agent_response_timeout. After all, it has not been canceled. And when this signal comes, we will generate a negative response due to the supposedly existing I / O timeout, which is not. Let's try to get around this rake.


Often, developers who are not used to asynchronous messaging try to cancel the delayed signal. This can be done if you save the timer identifier when referring to send_periodic:


 //    email_analyzer    //  io_agent_response_timeout. class email_analyzer : public agent_t { struct io_agent_response_timeout : public signal_t {}; ... virtual void so_evt_start() override { ... /*   IO- */ //  ,      // send_periodic  send_delayed,   period //   0,     , //   . io_response_timer_ = send_periodic< io_agent_response_timeout >( *this, 1500ms, 0ms ); } ... void on_load_succeed( const load_succeed & msg ) { //   . io_response_timer_.reset(); ... /*     checker- */ //     -    -checker-. send_delayed< checkers_responses_timeout >( *this, 750ms ); } ... //       -  IO-. timer_id_t io_response_timer_; }; 

Unfortunately, this simple method does not always work. The problem is that the delayed signal can be sent to the email_analyzer agent just a moment before the email_analyzer agent resets the timer for this delayed signal. There's nothing to be done - the wonders of multithreading, they are.


The email_analyzer agent can enter on_load_succeed on the context of its working thread, it may even have time to enter the reset () call for the timer ... But then its thread will be forced out, the control will receive the SObjectizer timer thread, on which the delayed signal will be sent. After that, the management will again receive the email_analyzer () agent's working thread and the reset () method for the timer will cancel the signal that has already been sent. However, the signal is already in the agent's message queue, from which no one will throw it away - once the message has been queued up to the agent, it cannot be removed from there.


The worst thing about this situation is that such an error will occur sporadically. Because of that, it will be difficult to understand exactly what is happening and what exactly the error is. So it must be remembered that canceling a deferred message is not at all a guarantee that it will not be sent.


So, if you simply cancel the postponed message incorrectly, then what to do?


For example, you can use agent states. When email_analyzer waits for a response from an IO agent, it is in one state. When the response from the IO agent arrives, the email_analyzer agent changes to a different state in which it will wait for responses from checkers. Since in the second state, the email_analyzer to the io_agent_response_timeout signal is not signed, then this signal will simply be ignored.


With the introduction of states into the email_analyzer agent, we might get something like:


 //    email_analyzer   //  . class email_analyzer : public agent_t { struct io_agent_response_timeout : public signal_t {}; struct checkers_responses_timeout : public signal_t {}; // ,       IO-. state_t st_wait_io{ this }; // ,        checker-. state_t st_wait_checkers{ this }; ... virtual void so_define_agent() override { //        . //  ,    ,    //   –    state_t. st_wait_io .event( &email_analyzer::on_load_succeed ) .event( &email_analyzer::on_load_failed ) .event< io_agent_response_timeout >( &email_analyzer::on_io_timeout ); st_wait_checkers .event( &email_analyzer::on_header_check_result ) .event( &email_analyzer::on_body_check_result ) .event( &email_analyzer::on_attach_check_result ) .event< checkers_responses_timeout >( &email_analyzer::on_checkers_timeout ); } ... }; 

However, in SObjectizer you can do even easier: you can set a time limit for the agent to be in a particular state. When this limit expires, the agent will be forcibly transferred to another state. Those. we can write something like:


 //    email_analyzer     //      . class email_analyzer : public agent_t { state_t st_wait_io{ this }; state_t st_io_timeout{ this }; state_t st_wait_checkers{ this }; state_t st_checkers_timeout{ this }; ... virtual void so_define_agent() override { st_wait_io .event( &email_analyzer::on_load_succeed ) .event( &email_analyzer::on_load_failed ) //   . .time_limit( 1500ms, st_io_timeout ); st_wait_checkers .event( &email_analyzer::on_header_check_result ) .event( &email_analyzer::on_body_check_result ) .event( &email_analyzer::on_attach_check_result ) .time_limit( 750ms, st_checkers_timeout ); } }; 

But just to limit the time spent in some state is not enough. We still need to take some action when this time expires. How to do it?


Use such a thing as a state entry handler. When the agent enters a specific state, the SObjectizer calls the input handler function in that state, if the user has assigned such a function. This means that at the entrance to st_io_timeout we can hang the handler, which sends a check_result with a negative result and terminates the agent:


 st_io_timeout.on_enter( [this]{ send< check_result >( reply_to_, email_file_, check_status::check_failure ); so_deregister_agent_coop_normally(); } ); 

We will hang the same handler on the input to st_checkers_timeout. And since actions inside these handlers will be the same, then we can put them in a separate agent method email_analyzer and specify this method as an input handler for both the st_io_timeout state and the st_checkers_timeout state:


 class email_analyzer : public agent_t { state_t st_wait_io{ this }; state_t st_io_timeout{ this }; state_t st_wait_checkers{ this }; state_t st_checkers_timeout{ this }; ... virtual void so_define_agent() override { ... st_io_timeout .on_enter( &email_analyzer::on_enter_timeout_state ); ... st_checkers_timeout .on_enter( &email_analyzer::on_enter_timeout_state ); }; ... void on_enter_timeout_state() { send< check_result >( reply_to_, email_file_, check_status::check_failure ); so_deregister_agent_coop_normally(); } }; 

But that's not all. Since we have touched on the topic of agent states and their capabilities, we can further develop it and refactor the email_analyzer code.


It is easy to see that a couple of actions are often duplicated in the code: sending a check_result message and deregistration of agent cooperation. Such duplication is not good, you should get rid of it.


In essence, the email_analyzer agent’s job is to end up with the agent in one of two states: either everything ended normally and a positive result should be sent, then shut down, or everything ended with an error, a negative result should be sent and, again did shut down the agent. So let's express it directly in the code using two agent states: st_success and st_failure.


 //    email_analyzer    //  st_success  st_failure. class email_analyzer : public agent_t { state_t st_wait_io{ this }; state_t st_wait_checkers{ this }; state_t st_failure{ this }; state_t st_success{ this }; ... virtual void so_define_agent() override { st_wait_io .event( &email_analyzer::on_load_succeed ) .event( &email_analyzer::on_load_failed ) //   . .time_limit( 1500ms, st_failure ); st_wait_checkers .event( &email_analyzer::on_header_check_result ) .event( &email_analyzer::on_body_check_result ) .event( &email_analyzer::on_attach_check_result ) .time_limit( 750ms, st_failure ); st_failure .on_enter( [this]{ send< check_result >( reply_to_, email_file_, status_ ); so_deregister_agent_coop_normally(); } ); st_success .on_enter( [this]{ send< check_result >( reply_to_, email_file_, check_status::safe ); so_deregister_agent_coop_normally(); } ); }; ... //        . check_status status_{ check_status::check_failure }; }; 

This will allow us in the agent code to simply change the state to shut down the agent in one way or another:


 void on_load_failed( const load_email_failed & ) { st_failure.activate(); } void on_checker_result( check_status status ) { //        . if( check_status::safe != status ) { status_ = status; st_failure.activate(); } else { ++checks_passed_; if( 3 == checks_passed_ ) //   .     //  . st_success.activate(); } } 

But you can go even further. For the st_failure and st_success states, there is one general action that needs to be performed when entering any of these states - a call to so_deregister_agent_coop_normally (). And this is no accident, because both of these states are responsible for the completion of the agent. And if so, then we can use the nested states. Those. we will enter the st_finishing state, for which st_failure and st_success will be substates. When you enter st_finishing, so_deregister_agent_coop_normally () will be called. And at the entrance to st_failure and st_success - the corresponding message will only be sent.


Since the st_failure and st_success states are nested in st_finishing, then upon entering any of them, the input handler for st_finishing will be called first, and only then the input handler for st_failure or st_success will be called. It turns out that when we enter st_finishing, we deregister the agent, and then, upon entering st_failure or st_success, we send the message check_result.


If one of the readers feels uncomfortable with the mention of nested states, state entry handlers, restrictions on the time they are in a state, then it makes sense to get acquainted with one of the fundamental articles on hierarchical finite automata: David Harel, Statecharts: A visual formalism for complex systems. Science of Computer Programming . The states of agents in SObjectizer realize a fair amount of the features described there.


As a result of all these transformations, the email_analyzer agent will take the form shown below.


 //    email_analyzer,      //  email-    . class email_analyzer : public agent_t { state_t st_wait_io{ this }; state_t st_wait_checkers{ this }; state_t st_finishing{ this }; state_t st_failure{ initial_substate_of{ st_finishing } }; state_t st_success{ substate_of{ st_finishing } }; public : email_analyzer( context_t ctx, string email_file, mbox_t reply_to ) : agent_t(ctx), email_file_(move(email_file)), reply_to_(move(reply_to)) {} virtual void so_define_agent() override { st_wait_io .event( &email_analyzer::on_load_succeed ) .event( &email_analyzer::on_load_failed ) //  -   . .time_limit( 1500ms, st_failure ); st_wait_checkers .event( [this]( const email_headers_checker::result & msg ) { on_checker_result( msg.status_ ); } ) .event( [this]( const email_body_checker::result & msg ) { on_checker_result( msg.status_ ); } ) .event( [this]( const email_attach_checker::result & msg ) { on_checker_result( msg.status_ ); } ) //   -  . .time_limit( 750ms, st_failure ); //  ,     , //     . st_finishing.on_enter( [this]{ so_deregister_agent_coop_normally(); } ); st_failure.on_enter( [this]{ send< check_result >( reply_to_, email_file_, status_ ); } ); st_success.on_enter( [this]{ send< check_result >( reply_to_, email_file_, check_status::safe ); } ); } virtual void so_evt_start() override { //      ,  //      . st_wait_io.activate(); //       IO-   //  email . send< load_email_request >( so_environment().create_mbox( "io_agent" ), email_file_, so_direct_mbox() ); } private : const string email_file_; const mbox_t reply_to_; //      ,   //      st_failure. check_status status_{ check_status::check_failure }; int checks_passed_{}; void on_load_succeed( const load_email_succeed & msg ) { //   ..    . st_wait_checkers.activate(); try { auto parsed_data = parse_email( msg.content_ ); introduce_child_coop( *this, // -checker-      // thread-pool-,     //   . disp::thread_pool::create_disp_binder( "checkers", disp::thread_pool::bind_params_t{} ), [&]( coop_t & coop ) { coop.make_agent< email_headers_checker >( so_direct_mbox(), parsed_data->headers() ); coop.make_agent< email_body_checker >( so_direct_mbox(), parsed_data->body() ); coop.make_agent< email_attach_checker >( so_direct_mbox(), parsed_data->attachments() ); } ); } catch( const exception & ) { st_failure.activate(); } } void on_load_failed( const load_email_failed & ) { st_failure.activate(); } void on_checker_result( check_status status ) { //        . if( check_status::safe != status ) { status_ = status; st_failure.activate(); } else { ++checks_passed_; if( 3 == checks_passed_ ) //   .     //  . st_success.activate(); } } }; 

Well, now it makes sense to look at the code of the resulting email_analyzer agent and ask yourself a simple but important question: was it worth it?


Obviously, the answer to this question is not so simple. But we will try to talk about this in the next article. In which we will touch on the topic of lessons that we have learned after more than ten years of using SObjectizer in the development of software systems.


Source codes for the examples shown in the article can be found in this repository .


')

Source: https://habr.com/ru/post/308084/


All Articles