
Learning to intercept unhandled messages, or an example of how SObjectizer acquires new features

We are always glad when new features appear in SObjectizer as a result of hints and wishes from its users, although it is not always easy. On the one hand, we, as the development team and long-time users of SObjectizer, have our own stereotypes about how it is normally used. It is not always possible to appreciate a “fresh look from the outside” right away, to understand what the user really wants to see in the framework and why the existing tools do not satisfy him. On the other hand, SObjectizer is not such a small framework, so adding new features requires some caution. New functionality must not conflict with existing features and, even more importantly, adding something new must not break anything that already exists and has worked for a long time. On top of that, we are particular about keeping compatibility between versions of SObjectizer, so we are strongly opposed to breaking changes...

In short, adding something new to SObjectizer is always pleasant in terms of extending the framework's capabilities and making it more convenient to use. From the point of view of implementation, it is not always as pleasant or as simple.

Below is a short story about how one new feature was added to SObjectizer. Perhaps some readers will be interested to see how an old framework adapts to the requests of new users.

Preamble


It all started when one of the SObjectizer users, PavelVainerman, pointed out that SObjectizer has no convenient ready-made means for episodic, one-time interactions between agents.
Here is what was meant. Suppose agent A wants to send a request to agent B and to receive a response message from agent B, but does not want to wait for the response longer than 5 seconds. A trivial “head-on” solution that immediately comes to mind may look like this:

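// Request message.
struct request {
    const so_5::mbox_t reply_to_; // Where to send the reply.
    ... // Other fields, constructors, etc.
};

// Reply message.
struct reply { ... };

class A : public so_5::agent_t {
    // Signal that tells the agent the reply did not arrive in time.
    struct reply_timed_out final : public so_5::signal_t {};
    ...
    // Event handlers.
    void on_reply(mhood_t<reply> cmd) {...}
    void on_reply_timeout(mhood_t<reply_timed_out>) {...}
    ...
    // Called when something has to be asked from agent B.
    void ask_something(const so_5::mbox_t & B_mbox) {
        // Subscribe to the reply and to the timeout signal.
        so_subscribe_self().event(&A::on_reply);
        so_subscribe_self().event(&A::on_reply_timeout);

        // Send the request. The agent's own mbox is passed
        // as the return address.
        so_5::send<request>(B_mbox, so_direct_mbox(), ...);

        // Send ourselves a delayed signal that limits the waiting time.
        so_5::send_delayed<reply_timed_out>(*this, 5s);
    }
};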

Unfortunately, this simple version only demonstrates the truth of the aphorism that "every complex problem has a simple, easy-to-understand wrong solution." There are several problems here.

The first problem concerns the delayed message A::reply_timed_out. If the response from agent B does not arrive in time, everything is fine with reply_timed_out: we receive it, handle it and forget about it. But what if the response from agent B does arrive in time? What happens to reply_timed_out then?

It will still come to agent A, because nobody has cancelled it. As soon as the SObjectizer timer thread counts off 5 seconds, reply_timed_out will be delivered to agent A, and we will receive and handle it even though we no longer need it. That is wrong. The right thing is to make sure that reply_timed_out does not reach us once we have received the reply from agent B.

The most reliable way to do this is to unsubscribe from reply_timed_out. Why this is so is a topic for a separate long conversation; if anyone is interested, we can discuss it separately. For now, let us just say that unsubscribing from a delayed message is a rock-solid solution to the problems with that message.
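To make the stale timeout problem tangible, here is a minimal toy model. It is not the SObjectizer API: `toy_agent`, `deliver` and `run_scenario` are hypothetical names, and messages are reduced to plain strings. The sketch only shows that a delayed signal which was not cancelled still reaches its handler unless the subscription is dropped when the reply arrives.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

// Toy model, not the SObjectizer API: an agent handles a message only
// while it is subscribed to that message's name.
struct toy_agent {
    std::set<std::string> subscriptions{"reply", "reply_timed_out"};
    std::vector<std::string> handled; // Messages that reached a handler.
    bool drop_on_reply;               // Unsubscribe from the timeout on reply?

    explicit toy_agent(bool drop) : drop_on_reply{drop} {}

    void deliver(const std::string & msg) {
        if (subscriptions.count(msg) == 0)
            return; // No subscription: the message is silently ignored.
        handled.push_back(msg);
        if (msg == "reply" && drop_on_reply)
            subscriptions.erase("reply_timed_out");
    }
};

// The reply arrives in time, but the delayed signal still fires later.
inline std::vector<std::string> run_scenario(bool drop_on_reply) {
    toy_agent a{drop_on_reply};
    a.deliver("reply");
    a.deliver("reply_timed_out");
    return a.handled;
}
```

Calling `run_scenario(false)` yields both messages in the log, while `run_scenario(true)` yields only the reply, which is the behaviour we want from agent A.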

The second problem is that agent A will hardly need to communicate in this way with agent B alone. Most likely, agent A exchanges request/reply messages with several agents at once. So when a request goes to agents B and C simultaneously, agent A has to figure out somehow whose reply has arrived, or whose reply was not received within 5 seconds.

The second problem can be solved more or less conveniently by not using agent A's own mbox as the return address. It is easier to create a new mbox for each new interaction and to use this new mbox both for receiving the reply and for the delayed message that belongs to this particular request.

However, as soon as we introduce a new mbox, we have to make sure that the mbox is destroyed once it is no longer needed. To do that, we must drop the subscriptions to it. If the subscriptions are not dropped, the mbox stays alive, and memory consumption grows steadily: we create a new mbox for every request, and these mboxes are never deleted.
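The lifetime issue can be illustrated with a small sketch built on `std::shared_ptr`. It is a toy model under the assumption that a subscription holds a reference-counted handle to its mbox; `toy_mbox`, `toy_agent`, `subscribe_to_new_mbox` and `drop_subscriptions` are hypothetical names, not SObjectizer types or methods.

```cpp
#include <cassert>
#include <map>
#include <memory>

// Toy stand-in for a mailbox; only its lifetime matters here.
struct toy_mbox { int id = 0; };

struct toy_agent {
    // mbox id -> subscription. The stored shared_ptr keeps the mbox alive.
    std::map<int, std::shared_ptr<toy_mbox>> subscriptions;

    // Creates a unique mbox, subscribes to it and returns a non-owning
    // observer, so the caller can check whether the mbox still exists.
    std::weak_ptr<toy_mbox> subscribe_to_new_mbox(int id) {
        auto mbox = std::make_shared<toy_mbox>();
        mbox->id = id;
        subscriptions[id] = mbox;
        return std::weak_ptr<toy_mbox>{mbox};
    }

    // Dropping the subscription releases the last owning reference.
    void drop_subscriptions(int id) { subscriptions.erase(id); }
};
```

In this model an mbox whose subscriptions were dropped expires immediately, while every mbox that is still referenced from the subscription table stays in memory for as long as the agent lives.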

Once these two problems are taken into account, the simple solution turns into a not-so-simple one:

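class A : public so_5::agent_t {
    // Signal that tells the agent the reply did not arrive in time.
    struct reply_timed_out final : public so_5::signal_t {};
    ...
    // Event handlers now receive an extra argument that tells
    // which request the event belongs to.
    void on_reply(const request_info & info, mhood_t<reply> cmd) {...}
    void on_reply_timeout(const request_info & info, mhood_t<reply_timed_out>) {...}
    ...
    // Called when something has to be asked from another agent.
    // Receives the description of the request and the destination mbox.
    void ask_something(const request_info & info, const so_5::mbox_t & dest) {
        // A unique mbox for this particular interaction.
        const auto uniq_mbox = so_environment().create_mbox();

        // Helper that removes the subscriptions made below.
        auto subscriptions_dropper = [this, uniq_mbox] {
            so_drop_subscription<reply>(uniq_mbox);
            so_drop_subscription<reply_timed_out>(uniq_mbox);
        };

        // Subscribe to the reply and to the timeout signal.
        so_subscribe(uniq_mbox)
            .event([this, info, subscriptions_dropper](mhood_t<reply> cmd) {
                // Drop the subscriptions first.
                subscriptions_dropper();
                // Then handle the reply.
                on_reply(info, cmd);
            })
            .event([this, info, subscriptions_dropper](mhood_t<reply_timed_out> cmd) {
                subscriptions_dropper();
                on_reply_timeout(info, cmd);
            });

        // Send the request. The new unique mbox is passed
        // as the return address.
        so_5::send<request>(dest, uniq_mbox, ...);

        // Send a delayed signal to the same mbox to limit the waiting time.
        so_5::send_delayed<reply_timed_out>(so_environment(), uniq_mbox, 5s);
    }
};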

This is not as simple and compact as we would like, and that is not all. This solution is not exception safe. There is no explicit cancellation of the delayed message once it is no longer needed. More importantly, if agent A has not just the default state, as in the example above, but several states in which it must react to messages differently, things get even worse. They also get worse if the exchange between A and B involves not one reply message but several. For example, with successful_reply and failed_reply instead of reply, the amount of work for the developer of agent A grows significantly.

Why did we not face this problem ourselves?


A small digression. When we understood what PavelVainerman was telling us, we were surprised ourselves. The problem is quite obvious, so why had we not run into it? Or at least not often enough to notice it and to include a solution for it in SObjectizer.

There were probably two factors.

First, we came to the ideas of the SEDA approach quite early. There the number of agents is small and the links between them are stable, so such problems do not arise at all.

Second, one-time interactions are probably used most often between short-lived agents. For an agent that lives only to handle a single operation, these problems are not relevant.

In any case, it is worth noting that as soon as new people start using your tool, it immediately turns out that they want to use it in a completely different way than you are used to.

What did we do in the end?


In the end, we extended so_5_extra, our add-on on top of SObjectizer, with support for so-called asynchronous operations. With asynchronous operations, the example above can be rewritten as follows:

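class A : public so_5::agent_t {
    // Signal that tells the agent the reply did not arrive in time.
    struct reply_timed_out final : public so_5::signal_t {};
    ...
    // Event handlers receive an extra argument that tells
    // which request the event belongs to.
    void on_reply(const request_info & info, mhood_t<reply> cmd) {...}
    void on_reply_timeout(const request_info & info, mhood_t<reply_timed_out>) {...}
    ...
    // Called when something has to be asked from another agent.
    // Receives the description of the request and the destination mbox.
    void ask_something(const request_info & info, const so_5::mbox_t & dest) {
        // A unique mbox for this particular interaction.
        const auto uniq_mbox = so_environment().create_mbox();

        // Prepare and activate the asynchronous operation.
        so_5::extra::async_op::time_limited::make<reply_timed_out>(*this)
            .completed_on(uniq_mbox, so_default_state(),
                [this, info](mhood_t<reply> cmd) { on_reply(info, cmd); })
            .timeout_handler(so_default_state(),
                [this, info](mhood_t<reply_timed_out> cmd) { on_reply_timeout(info, cmd); })
            .activate(5s);

        // Send the request. The new unique mbox is passed
        // as the return address.
        so_5::send<request>(dest, uniq_mbox, ...);
    }
};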

Learn more about new asynchronous operations in so_5_extra here .

But today's story is not about how asynchronous operations themselves are implemented. It is about what had to be done in SObjectizer to make asynchronous operations work in so_5_extra.

What was the problem with the implementation of time_limited asynchronous operations?


so_5_extra includes two implementations of asynchronous operations: time_unlimited, where no limit is imposed on the duration of the operation, and time_limited, where the operation must complete within the allotted time. The example above used time_limited operations, because their implementation involved one of the main snags.

The point is that once a time_limited operation is started, we must necessarily receive and handle the delayed message that limits the duration of the operation. And this “necessarily” turned out to be not so simple.

One of the key features of SObjectizer is agent states. States allow an agent to handle a different set of messages in each state, or to handle the same messages differently in different states. But there is a downside: if some message has to be handled in all states, its handler must be explicitly subscribed in each of them. That is, you have to write something like:

class default_msg_handler_demo : public so_5::agent_t {
    // Agent states.
    state_t st_first{this}, st_second{this}, st_third{this};
    ...
    // Handler that must work in every state.
    void some_msg_default_handler(mhood_t<some_msg> cmd) {...}
    ...
    virtual void so_define_agent() override {
        ...
        // Subscribe the "default" handler in every state.
        so_subscribe(some_mbox)
            .in(st_first).in(st_second).in(st_third)
            .event(&default_msg_handler_demo::some_msg_default_handler);
        ...
    }
};

Naturally, this is neither the best nor the most convenient solution.

With hierarchical state machines, this can be made simpler, more convenient and more reliable:

class default_msg_handler_demo : public so_5::agent_t {
    // Agent states.
    // The parent state.
    state_t st_parent{this},
        // Child states nested into the parent state.
        st_first{initial_substate_of{st_parent}},
        st_second{substate_of{st_parent}},
        st_third{substate_of{st_parent}};
    ...
    // Handler that must work in every state.
    void some_msg_default_handler(mhood_t<some_msg> cmd) {...}
    ...
    virtual void so_define_agent() override {
        ...
        // Subscribe the "default" handler in the parent state only.
        so_subscribe(some_mbox)
            .in(st_parent)
            .event(&default_msg_handler_demo::some_msg_default_handler);
        ...
    }
};

Now the “default” handler will be called regardless of which of these states the agent is in.
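The reason a single subscription in the parent state is enough can be sketched as a walk up the chain of enclosing states. The code below is a toy model and does not reproduce how SObjectizer stores or searches subscriptions; `toy_state`, `handler_table`, `subscribe` and `find_handler` are hypothetical names, and handlers are represented by string labels.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

// Toy model of a hierarchical state: a name and an optional parent.
// These types are hypothetical and are not part of SObjectizer.
struct toy_state {
    std::string name;
    const toy_state * parent;

    explicit toy_state(std::string n, const toy_state * p = nullptr)
        : name{std::move(n)}, parent{p} {}
};

// (state, message name) -> label of the subscribed handler.
using handler_table =
    std::map<std::pair<const toy_state *, std::string>, std::string>;

inline void subscribe(handler_table & table, const toy_state & state,
        const std::string & msg, const std::string & handler) {
    table[std::make_pair(&state, msg)] = handler;
}

// Look for a handler in the current state, then in each enclosing state.
// Returns an empty string if the message stays unhandled.
inline std::string find_handler(const handler_table & table,
        const toy_state * current, const std::string & msg) {
    for (const toy_state * s = current; s != nullptr; s = s->parent) {
        const auto it = table.find(std::make_pair(s, msg));
        if (it != table.end())
            return it->second;
    }
    return std::string{};
}
```

With a handler subscribed only for a parent state, a lookup that starts from any child state finds it; a handler subscribed directly in the child state takes precedence because it is found first.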

Unfortunately, this approach requires the agent to be designed around a hierarchical state machine from the start. Asynchronous operations from so_5_extra would hardly be convenient if they imposed such a strict requirement on users: if you want to use asynchronous operations, kindly create a parent state in your agent.

And it is not always possible even in principle. Suppose someone wrote an agent library for you that contains a base type basic_device_manager. You derive your own class my_device_manager from it and need to use asynchronous operations in my_device_manager. If the library developer did not provide something like st_parent in basic_device_manager, you cannot add your own st_parent there.

In short, we needed something that would allow catching messages that were addressed to the agent but were not handled by it. Such messages are sometimes called deadletters.

What did we end up doing, and how?


Deadletter handlers


We made it possible for a developer to attach a handler of his own to a message that was not handled by a “normal” handler. For example:

class deadletter_handler_demo : public so_5::agent_t {
    state_t st_first{this}, st_second{this}, st_third{this};
    ...
    void deadletter_handler(mhood_t<some_msg> cmd) {...}
    ...
    void normal_handler(mhood_t<some_msg> cmd) {...}
    ...
    virtual void so_define_agent() override {
        ...
        // Subscribe the "normal" handler in the st_first state.
        so_subscribe(some_mbox)
            .in(st_first).event(&deadletter_handler_demo::normal_handler);
        // Register the deadletter handler for all other cases.
        so_subscribe_deadletter_handler(some_mbox,
            &deadletter_handler_demo::deadletter_handler);
        ...
    }
};

Now, if the agent receives some_msg from the mailbox some_mbox while in the st_first state, normal_handler is called to handle the message. If the agent is in any other state, deadletter_handler is called instead.

This is exactly the feature that time_limited operations use. When an operation is activated, a deadletter handler is attached to the timeout message. Whatever state the agent is in when this message arrives, the message will be received and handled, which allows the asynchronous operation to be completed even if the developer made a mistake and did not define all the timeout handlers he needed.

An attractive idea that was not implemented


The first thought that arose as soon as the deadletter handler problem was formulated was to give every agent a parent state and to automatically make all other states its children. That is, the idea was to force a kind of superstate into every agent: it is simply there, and you cannot do anything about it :)

This idea was very attractive from the point of view of the existing mechanism for storing and searching subscriptions (and that mechanism is not so simple).

The idea is also very elegant conceptually: hierarchical state machines as they are.

But we had to give it up (perhaps only for a while?).

The main reason for rejecting it is that a state_t object is rather heavy. Depending on the compiler, the standard library and the compilation options, a state_t in 64-bit mode can occupy from 150 to 250 bytes. If a superstate is forcibly added to every agent, the “weight” of each agent grows by some 150 to 200 bytes. Just like that, out of the blue, even if the agent does not need a superstate at all.
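A quick back-of-the-envelope estimate shows why this matters for applications with many agents. The helper below is purely illustrative: the function name is hypothetical, and the figure of one million agents is an assumption chosen for the example, not a number from our measurements. Only the 150 to 250 byte range comes from the text above.

```cpp
#include <cassert>
#include <cstddef>

// Extra memory taken by one mandatory superstate per agent.
// state_size_bytes: size of a single state object (150..250 bytes above).
constexpr std::size_t superstate_overhead_bytes(
        std::size_t agent_count, std::size_t state_size_bytes) {
    return agent_count * state_size_bytes;
}
```

For a hypothetical application with 1,000,000 agents this gives between 150,000,000 and 250,000,000 bytes, i.e. roughly 150 to 250 MB spent on states that most agents would never use.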

There was, and still is, another reason. A superstate for every agent is too big an innovation for SObjectizer to be introduced offhand, without carefully weighing all the consequences. I personally strongly suspect that once superstates are added to SObjectizer, they will be abused.

So the superstate idea did not make it into version 5.5.21. But we have made a mental note of it. Perhaps it will be implemented some day. If anyone has thoughts on this, it would be interesting to hear and discuss them.

The actual solution


We gave up on the superstate idea, but we did not want to change the existing mechanism for storing subscriptions either. So we found a solution that still requires an additional state_t object, but there is only one such object for all agents, and every agent refers to it.

Thanks to this, the same tools can be used both for registering deadletter handlers and for looking them up. In fact, so_subscribe_deadletter_handler is nothing more than subscribing a message handler for a special state that is invisible to the user. And the search for a deadletter handler is just an ordinary handler search, performed not for the agent's current state but for this special invisible state. There are some additional actions for the case when message delivery tracing is enabled, but those are boring details.
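The trick with one shared invisible state can be sketched as follows. This is a simplified toy model, not the actual SObjectizer implementation: `toy_state`, `deadletter_state`, `toy_agent` and its methods are hypothetical names, handlers are reduced to string labels, and mboxes and message tracing are left out.

```cpp
#include <cassert>
#include <initializer_list>
#include <map>
#include <string>
#include <utility>

struct toy_state { std::string name; };

// One state object shared by all agents; users never see or enter it.
inline const toy_state & deadletter_state() {
    static const toy_state instance{"<deadletter>"};
    return instance;
}

class toy_agent {
    using key = std::pair<const toy_state *, std::string>;
    std::map<key, std::string> subscriptions_; // (state, message) -> handler
    const toy_state * current_;

public:
    explicit toy_agent(const toy_state & initial) : current_{&initial} {}

    void change_state(const toy_state & s) { current_ = &s; }

    void subscribe(const toy_state & s, const std::string & msg,
            const std::string & handler) {
        subscriptions_[key{&s, msg}] = handler;
    }

    // A deadletter subscription is an ordinary subscription
    // made for the shared hidden state.
    void subscribe_deadletter(const std::string & msg,
            const std::string & handler) {
        subscribe(deadletter_state(), msg, handler);
    }

    // Ordinary lookup for the current state, then the very same lookup
    // for the hidden state. Returns an empty string if nothing is found.
    std::string dispatch(const std::string & msg) const {
        for (const toy_state * s : {current_, &deadletter_state()}) {
            const auto it = subscriptions_.find(key{s, msg});
            if (it != subscriptions_.end())
                return it->second;
        }
        return std::string{};
    }
};
```

The point of the design is that the subscription storage needs no new kind of entry: the deadletter handler lives in the same table under a different state key, and no agent has to pay for a state object of its own.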

Was everything so obvious and simple?


When I reread this article before publication, I caught myself thinking that it describes something trivial. Everything seems simple and clear. Yet the path to this “simple and clear” turned out to be neither quick, nor straight, nor obvious. If anyone is interested, traces of how the idea of asynchronous operations evolved can be found in this mini-series of blog posts: #1, #2 and #3. Although, as it turned out, even the final post of the series does not describe the resulting solution. I had to run into a serious miscalculation of my own and rack my brains over how to prevent memory leaks in the presence of cyclic references between objects. But that is another story...

A few words in conclusion


First, the thanks...


I would like to thank everyone who helps us develop SObjectizer: those who use SO-5 and share their thoughts and suggestions (special thanks here to PavelVainerman), those who do not use SO-5 yet but help with advice and more (many thanks, in particular, to masterspline), and simply those who are not too lazy to give +1 to news about SObjectizer on various resources and stars on GitHub :) Thank you all very much!

... and briefly about plans for the near future


In the near future we are going to start work on the next version of SObjectizer, 5.5.22. The main new feature that we want to see in 5.5.22 is support for parallel states of agents. Agents can already use advanced features of hierarchical states, such as nested states, shallow and deep history for states, entry and exit handlers, and time limits for staying in a state. What SObjectizer has not had so far is parallel states.

At the time we did not implement them, for certain reasons. But practice has shown that some users need parallel states and that they make life easier. So we will implement them. Everyone interested is invited to the discussion: any constructive thoughts, and especially examples from practice and personal experience, will be very useful to us.

And in general it would be interesting to know your impressions of SObjectizer: what you like, what you dislike, what you would like to have in SO-5, what gets in your way... And of course, we are ready to answer any questions about SObjectizer.

Source: https://habr.com/ru/post/348680/

