The first article dealt with what SObjectizer is.
In the second article we began to talk about what agents may look like, and why, how, and where they evolve. Today we will continue this story by further complicating the implementation of the demonstration agents. Along the way we will examine how reliable asynchronous messaging really is.
Last time we stopped at the conclusion that reading the contents of the file with the email should be left to a separate IO-agent. Let's do that and see what happens.
First, we need a set of messages that will be exchanged between the IO-agent and email_analyzer:
// A request to load the contents of an email file.
struct load_email_request {
    string email_file_;
    // Where the reply should be sent.
    mbox_t reply_to_;
};
// A successful load: the contents of the email.
struct load_email_succeed {
    string content_;
};
// A failed load: a description of the problem.
struct load_email_failed {
    string what_;
};
Second, we need to decide where the email_analyzer agent will send the load_email_request message. We could go the usual way: when registering the IO-agent, save its direct_mbox, pass that mbox as a parameter to the analyzer_manager constructor, then as a parameter to the constructor of every email_analyzer agent... In fact, if we needed several different IO-agents, that is how it should be done. But in our task one IO-agent is enough, and that allows us to demonstrate named mboxes.
A named mbox is created by a call to so_5::environment_t::create_mbox(name). If create_mbox is called several times with the same name, it always returns the same mbox, created on the first call to create_mbox with that name.
The IO-agent creates the named mbox itself and subscribes to it. The email_analyzer agents obtain the same mbox when they need to send a load_email_request message. Thus we get rid of the need to "drag" the IO-agent's mbox through analyzer_manager.
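To make this concrete, here is a minimal sketch of the IO-agent's side of this arrangement. The io_agent class itself is not shown in the article, so its name and structure here are assumptions:
// A hypothetical sketch of the IO-agent subscribing to the named mbox.
class io_agent final : public agent_t {
public:
    io_agent( context_t ctx ) : agent_t( move(ctx) ) {}

    virtual void so_define_agent() override {
        // The agent creates (or retrieves) the named mbox and subscribes
        // to it. Since create_mbox() returns the same mbox for the same
        // name, email_analyzer agents can obtain it independently.
        so_subscribe( so_environment().create_mbox( "io_agent" ) )
            .event( &io_agent::on_load_request );
    }

private:
    void on_load_request( const load_email_request & msg ) {
        // ...perform the actual file IO and send either
        // load_email_succeed or load_email_failed to msg.reply_to_...
    }
};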
Now that we have decided on the interface between the IO-agent and email_analyzer, we can write a new version of the email_analyzer agent:
// A new version of the email_analyzer agent:
// IO operations are delegated to the IO-agent.
class email_analyzer : public agent_t {
public:
    email_analyzer( context_t ctx,
        string email_file,
        mbox_t reply_to )
        :   agent_t(ctx)
        ,   email_file_(move(email_file))
        ,   reply_to_(move(reply_to))
    {}

    // This time the agent needs subscriptions,
    // so so_define_agent() is redefined.
    virtual void so_define_agent() override {
        // We must handle the responses from the IO-agent.
        // The subscriptions are made for the agent's direct mbox.
        so_subscribe_self()
            .event( &email_analyzer::on_load_succeed )
            .event( &email_analyzer::on_load_failed );
    }

    virtual void so_evt_start() override {
        // Ask the IO-agent to load the contents
        // of the email file.
        send< load_email_request >(
            // The named mbox of the IO-agent.
            so_environment().create_mbox( "io_agent" ),
            email_file_,
            // The reply must go to our direct mbox.
            so_direct_mbox() );
    }

private:
    const string email_file_;
    const mbox_t reply_to_;

    void on_load_succeed( const load_email_succeed & msg ) {
        try {
            // Analysis of the loaded email content.
            auto parsed_data = parse_email( msg.content_ );
            auto status = check_headers( parsed_data->headers() );
            if( check_status::safe == status )
                status = check_body( parsed_data->body() );
            if( check_status::safe == status )
                status = check_attachments( parsed_data->attachments() );

            send< check_result >( reply_to_, email_file_, status );
        }
        catch( const exception & ) {
            // Any error during the analysis means the check
            // of this email has failed.
            send< check_result >(
                reply_to_, email_file_, check_status::check_failure );
        }

        // The work is done, the agent is no longer needed
        // and can deregister its cooperation.
        so_deregister_agent_coop_normally();
    }

    void on_load_failed( const load_email_failed & ) {
        // The email contents could not be loaded.
        // Report a negative check result.
        send< check_result >(
            reply_to_, email_file_, check_status::check_failure );
        so_deregister_agent_coop_normally();
    }
};
Now the email_analyzer agents delegate IO operations to another agent that knows how to perform them efficiently. Accordingly, on their worker threads the email_analyzer agents are busy either dispatching tasks to the IO-agent or processing its responses. This lets us revise how many email_analyzer agents we can create and how many worker threads they need.
When each email_analyzer agent performed a synchronous IO operation itself, we needed as many worker threads in the pool as the number of parallel IO operations we wanted to allow. There was no point in creating many more email_analyzer agents than worker threads in the pool: with 16 threads in the pool and 32 agents allowed to exist simultaneously, half of those agents would just wait for one of the worker threads to become free.
Now that the IO operations have been moved to another working context, it becomes possible, first, to reduce the number of worker threads in the pool. In their event handlers the email_analyzer agents will mostly perform processor-intensive operations, so there is no sense in creating more worker threads than there are available cores. If we have a 4-core processor, we will need not 16 threads in the pool but at most 4.
Second, if IO operations take longer than processing the email content, we can create more email_analyzer agents than there are threads in the pool; most of these agents will simply be waiting for the result of their IO operation. If, however, the email load time is comparable to or less than the time needed to analyze its contents, this point loses its relevance, and we will only be able to create one, two, or three email_analyzer agents more than the number of threads in the pool. All these settings are made in one place, the analyzer_manager agent: it is enough to change just a couple of constants in its code and see how the changes affect the performance of our solution (a sketch of what those constants might look like is shown below). Performance tuning, however, is a separate big topic, and it is premature to delve into it now.
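For illustration, a sketch of the tuning "in one place" might look like this. It assumes the SObjectizer thread-pool dispatcher API (make_dispatcher is the SObjectizer-5.6 spelling; earlier versions used create_private_disp), and the function name and constants are ours, not from the demo:
// A hypothetical configuration point for the analyzers pool.
void setup_analyzers_dispatcher( so_5::environment_t & env )
{
    // The tuning knobs discussed above.
    const std::size_t pool_size = 4;     // roughly the number of cores.
    const std::size_t max_analyzers = 8; // analyzers alive at the same time.

    // A private thread-pool dispatcher for email_analyzer agents.
    auto analyzers_disp = so_5::disp::thread_pool::make_dispatcher(
            env, "analyzers", pool_size );

    // New email_analyzer cooperations would then be bound to it, e.g.:
    // env.introduce_coop( analyzers_disp.binder(), ... );
    // while analyzer_manager keeps no more than max_analyzers of them alive.
}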
So, we have yet another version of the email_analyzer agent, one that fixes the problems of the previous versions. Can we consider it acceptable?
No.
The problem is that the resulting implementation cannot be considered reliable.
This implementation is designed for an optimistic scenario in which the messages we send are never lost, always reach the addressee and, having reached it, are always processed, so we always get the answer we need.
The harsh truth of life, however, is that when a system is built on asynchronous messaging between individual actors/agents, that asynchronous exchange cannot be considered absolutely reliable.
Messages may be lost. And that's fine.
Messages can be lost for various reasons. For example, the recipient agent simply did not have time to subscribe to the message. Or the recipient agent does not exist at the moment. Or it exists, but its overload-protection mechanism has kicked in (more on this in a later article). Or the agent exists and the message even reached it, but the agent is in a state in which this message is not handled. Or the agent exists, the message reached it, processing even began, but somewhere along the way an application error occurred and the failed agent did not send anything back.
In general, communication between agents via asynchronous messages is like the interaction of hosts over UDP. In most cases the datagrams reach their recipients. But sometimes they get lost in transit or even during processing.
The above means that load_email_request may not reach the IO-agent. Or the load_email_succeed/load_email_failed response may not reach the email_analyzer agent. What do we do then?
We end up with an email_analyzer agent that is present in the system but does nothing. It does no work. It is not going to die. And it does not hand its work over to any other agent. If we are unlucky, we may face a situation where all the email_analyzer agents created by analyzer_manager turn into such half-corpses that do nothing. After that, analyzer_manager will simply accumulate requests in its queue and then throw them away when their timeout expires. But no useful work will be done.
How do we get out of this situation?
For example, by controlling timeouts. We can introduce control over the execution time of the IO operation in the email_analyzer agent (if there is no response for too long, assume the IO operation has failed). Or we can introduce control over the execution time of the whole email-analysis operation in the analyzer_manager agent. Or do both.
For simplicity, we will restrict ourselves to tracking the timeout of the IO operation in the email_analyzer agent:
// The next version of the agent: now with a timeout for the IO operation.
class email_analyzer : public agent_t {
    // A signal the agent sends to itself to detect that the
    // response from the IO-agent did not arrive in time.
    struct io_agent_response_timeout : public signal_t {};

public:
    email_analyzer( context_t ctx,
        string email_file,
        mbox_t reply_to )
        :   agent_t(ctx)
        ,   email_file_(move(email_file))
        ,   reply_to_(move(reply_to))
    {}

    virtual void so_define_agent() override {
        so_subscribe_self()
            .event( &email_analyzer::on_load_succeed )
            .event( &email_analyzer::on_load_failed )
            // Handle the absence of a timely response from the IO-agent.
            .event< io_agent_response_timeout >(
                &email_analyzer::on_io_timeout );
    }

    virtual void so_evt_start() override {
        // Ask the IO-agent to load the contents
        // of the email file.
        send< load_email_request >(
            so_environment().create_mbox( "io_agent" ),
            email_file_,
            so_direct_mbox() );

        // Limit the time we are ready to wait
        // for the IO-agent's response.
        send_delayed< io_agent_response_timeout >( *this, 1500ms );
    }

private:
    const string email_file_;
    const mbox_t reply_to_;

    void on_load_succeed( const load_email_succeed & msg ) {
        try {
            auto parsed_data = parse_email( msg.content_ );
            auto status = check_headers( parsed_data->headers() );
            if( check_status::safe == status )
                status = check_body( parsed_data->body() );
            if( check_status::safe == status )
                status = check_attachments( parsed_data->attachments() );

            send< check_result >( reply_to_, email_file_, status );
        }
        catch( const exception & ) {
            send< check_result >(
                reply_to_, email_file_, check_status::check_failure );
        }

        so_deregister_agent_coop_normally();
    }

    void on_load_failed( const load_email_failed & ) {
        send< check_result >(
            reply_to_, email_file_, check_status::check_failure );
        so_deregister_agent_coop_normally();
    }

    void on_io_timeout() {
        // No response in time; consider the check failed.
        send< check_result >(
            reply_to_, email_file_, check_status::check_failure );
        so_deregister_agent_coop_normally();
    }
};
This version of email_analyzer can already be considered quite acceptable. Its code begs for a small refactoring: extracting the repeated pair of operations (send and so_deregister_agent_coop_normally) into a separate helper method, as sketched below. This was deliberately left undone so that the code of each subsequent version of the email_analyzer agent differs minimally from the code of the previous one.
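The refactoring in question could look like this. A minimal sketch; the method name is ours, not from the demo:
// A hypothetical private helper inside email_analyzer: all three
// handlers (on_load_succeed, on_load_failed, on_io_timeout) would
// then end with a single call to it.
void send_result_and_finish( check_status status ) {
    // Report the result and finish the agent's work.
    send< check_result >( reply_to_, email_file_, status );
    so_deregister_agent_coop_normally();
}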
Comparing the two versions of the email_analyzer agent shown above highlights a feature that programmers who have used SObjectizer in their daily work for a long time come to appreciate: the simplicity and transparency of extending an agent. Does the agent need to react to one more event? Then add one more subscription and one more event handler. And since subscriptions are usually made in the same place, it is immediately clear where to go and what exactly to change.
SObjectizer does not impose any restrictions on where and how an agent subscribes to its events, but following a simple convention greatly simplifies life: subscriptions are made in so_define_agent() or, in very simple cases and for final agent classes, in the constructor. You look into the code of someone else's agent, or even your own agent written a few years ago, and you immediately know where to look to understand its behavior. It is convenient; although to appreciate this convenience, you probably need to write and debug more than one or two real agents...
However, let us return to the topic of reliability raised above, the one because of which the next, sixth, version of the email_analyzer agent appeared:
the mechanism of asynchronous message exchange between agents is not reliable, and you have to live with that.
An important remark must be made here: it would be wrong to say that the message delivery mechanism in SObjectizer is full of holes and allows itself to lose messages whenever it pleases.
Messages in SObjectizer do not get lost just like that; every loss has a reason. If an agent sends a message to itself and the send function completes successfully, the message will reach the agent, unless the developer explicitly takes some action instructing SObjectizer to throw the message away in a specific case (for example, the agent is not subscribed to the message in one of its states, or limit_then_drop is used for overload protection).
So, if the developer himself does not allow SObjectizer to throw out certain messages in certain situations, a message the agent sent to itself must reach the agent. That is why, in the code shown above, we calmly sent delayed messages to ourselves without fear that they would be lost somewhere along the way.
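For example, this is what an explicit "permission to lose messages" looks like with message limits. A minimal sketch, assuming SObjectizer's standard limit_then_drop facility; the class and the limit value are illustrative, not from the demo:
// A hypothetical agent that itself permits SObjectizer to drop
// excess messages instead of queuing them without bound.
class bounded_io_agent final : public agent_t {
public:
    bounded_io_agent( context_t ctx )
        :   agent_t( ctx
                // Keep at most 100 waiting requests; everything above
                // this limit is silently dropped by SObjectizer.
                + limit_then_drop< load_email_request >( 100 ) )
    {}

    virtual void so_define_agent() override {
        so_subscribe_self().event( &bounded_io_agent::on_request );
    }

private:
    void on_request( const load_email_request & ) {
        // ...serve the request...
    }
};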
However, when a message is sent to another agent, the situation changes somewhat. There are cases when we can be confident in successful delivery, for example, when we implemented the recipient agent ourselves and even included it in the same cooperation in which the sender agent lives.
But if the recipient agent is not written by us, is created and destroyed as part of someone else's cooperation, and we do not control its behavior, do not know how it is protected from overload or how it behaves in any given situation, then our confidence is about the same as when sending a datagram over UDP: if everything is fine, the datagram will most likely reach the recipient, and we will then receive a response. If everything is fine. But if not?
We have come to an interesting point: because of the relative unreliability of asynchronous messaging, developing software on actors/agents may look more laborious than approaches based on synchronous interaction of objects in a program.
Sometimes it is. But in the end, in our opinion, the software turns out more reliable, because the code, willy-nilly, has to handle many abnormal situations related both to the loss of messages and to variations in message delivery and processing times.
Suppose the email_analyzer agents accessed io_agent via a synchronous request rather than an asynchronous message, and io_agent reported failures of IO operations by throwing exceptions. For a long time everything would work fine: an email_analyzer synchronously queries io_agent and receives in response either the contents of the email or an exception. But at some point a hidden bug somewhere inside io_agent manifests itself, and the synchronous call simply hangs. No answer, no exception, just a hang. Accordingly, first one email_analyzer hangs, then another, then another, and so on. As a result, the entire application grinds to a halt.
With asynchronous messaging, io_agent can just as well hang somewhere in its guts. But this will not affect the email_analyzer agents, which can easily track the expiration of the request timeout and report a negative result. So even if one part of the application fails, the other parts can continue their work, even if that work consists of generating a stream of negative responses. Indeed, the very presence of such a stream can be an important symptom, hinting to an observer that something has gone wrong in the application.
Incidentally, a few words on the topic of observing the work of an application written on agents.
Over the years of working with SObjectizer we have come to the conviction that the ability to see what is happening inside an application built on actors/agents is very important. In fact, this was demonstrated in this very article: if you take the fifth version of email_analyzer, without timeout control, and run it, you can watch the processing of requests slow down until it stops altogether. But how exactly do you figure out what is going on?
A good hint would be how many email_analyzer agents exist at the moment and what each of them is doing. This requires the ability to monitor what is happening inside the application. This is exactly why Erlang and its platform are so highly valued: you can connect to a running Erlang VM, see the list of Erlang processes, their parameters, etc. But in Erlang this is possible because an Erlang application runs under the control of the Erlang VM.
In the case of a native C++ application the situation is more complicated. SObjectizer provides tools for monitoring what is happening inside the SObjectizer Environment (although these tools so far offer only the most basic functionality). With their help, the following information can be obtained while our demo application is running:
mbox_repository/named_mbox.count -> 1
coop_repository/coop.reg.count -> 20
coop_repository/coop.dereg.count -> 0
coop_repository/agent.count -> 20
coop_repository/coop.final.dereg.count -> 0
timer_thread/single_shot.count -> 0
timer_thread/periodic.count -> 1
disp/ot/DEFAULT/agent.count -> 3
disp/ot/DEFAULT/wt-0/demands.count -> 8
disp/tp/analyzers/threads.count -> 4
disp/tp/analyzers/agent.count -> 16
disp/tp/analyzers/cq/__so5_au...109__/agent.count -> 1
disp/tp/analyzers/cq/__so5_au...109__/demands.count -> 0
disp/tp/analyzers/cq/__so5_au...124__/agent.count -> 1
disp/tp/analyzers/cq/__so5_au...124__/demands.count -> 0
...
disp/tp/analyzers/cq/__so5_au..._94__/agent.count -> 1
disp/tp/analyzers/cq/__so5_au..._94__/demands.count -> 0
disp/ot/req_initiator/agent.count -> 1
disp/ot/req_initiator/wt-0/demands.count -> 0
This monitoring output lets us see that there is a dispatcher with a thread pool named analyzers, which has 4 worker threads. It is on this dispatcher that the email_analyzer agents from the example work. 16 agents are bound to the dispatcher, each constituting a separate cooperation. And these agents have no pending demands in their queues. That is, the agents exist, but there is no work for them. Which is a reason to figure out why this happened.
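For reference, here is a minimal sketch of how such figures can be received inside the application, assuming SObjectizer's run-time monitoring API (stats_controller and the stats::messages::quantity message); the listener class itself is ours, not part of the demo:
// A hypothetical agent that prints the monitoring data stream.
class stats_listener final : public agent_t {
public:
    stats_listener( context_t ctx ) : agent_t( move(ctx) ) {}

    virtual void so_define_agent() override {
        // All monitoring data is distributed as ordinary messages
        // via the stats_controller's mbox.
        so_subscribe( so_environment().stats_controller().mbox() )
            .event( &stats_listener::on_quantity );
    }

    virtual void so_evt_start() override {
        // Monitoring is turned off by default and
        // must be switched on explicitly.
        so_environment().stats_controller().turn_on();
    }

private:
    void on_quantity(
        const so_5::stats::messages::quantity< std::size_t > & evt ) {
        // Each message carries one "source/metric -> value" line.
        std::cout << evt.m_prefix.c_str() << evt.m_suffix.c_str()
            << " -> " << evt.m_value << std::endl;
    }
};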
Obviously, the low-level information available to the SObjectizer Environment will not always be useful to the application programmer. In the example in question, for instance, the developer would be better served by the number of email_analyzer agents and the size of the request list inside the analyzer_manager agent. But that is application data; SObjectizer knows nothing about it. Therefore, when developing an application on agents, the programmer needs to make sure the application exposes the information that is most useful for assessing its health and viability. But that is already a big topic for another conversation.
Perhaps this is where this article can end. In the next article we will try to show what can be done if we parallelize the email-analysis operations even further, for example, analyzing the headers, the body of the letter, and the attachments in parallel, and what the email_analyzer agent code will then become.
The source code for the examples shown in the article can be found in this repository.