
Mailboxes that are not mailboxes at all ...

When the first article about SObjectizer was written in the summer of 2016, we promised that over time we would describe the details of its implementation, so that interested readers could look under the hood. Today's article is about SObjectizer's internals: the mechanism of mboxes ("mailboxes"), which is used to organize interaction between actors (agents, in our terminology).

Why are we talking about mboxes?


Because we are surprised ourselves at how many very similar questions this mechanism raises among those who start studying SObjectizer. It turns out that something well known, clear and familiar to us, the developers of SObjectizer, is by no means so for newcomers. Well, if so, let's try to figure out what mboxes are and how they work. And along the way, let's try to make our own mbox.

Why do we need mboxes?


Mailboxes are needed in SObjectizer to organize interaction between agents. Agents communicate through asynchronous messages, and these messages have to be sent somewhere. The question arises: "Where exactly?"

In the classic Actor Model, the addressee of a message is the receiving actor itself. That is, for actor A to send a message to actor B, actor A must hold a reference to actor B. Without a reference to the receiving actor, there is no way to send it a message. If you want to perform a 1:N broadcast, the sender must hold references to all the recipients. That is how things stand in the classic Actor Model.

However, our situation was different (as usual, being determines consciousness: we started from the needs of the tasks we had to solve and the tools we had at our disposal).

First, we have C++. You cannot simply hand agent A a reference to agent B. If it is an ordinary reference (or a plain raw pointer), then when agent B is destroyed, agent A is left with a "dangling" reference to B. Accordingly, smart references/pointers should be used instead of ordinary ones. But a simple smart pointer is no good either, because agent B will not be deleted (and so the resources it owns will not be released) as long as agent A holds a smart pointer to agent B.

Therefore, in C++ we would have to use not just smart pointers but some special smart proxy links. Agent A may hold a proxy link to B, yet B can be safely destroyed even though A's proxy link continues to exist. Moreover, A may try to send a message to the no longer existing agent B, and this attempt must not lead to catastrophic consequences (such as corrupting someone else's memory or crashing the whole application, as happens when accessing "dangling" references in C++).
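To make the idea of a proxy link more tangible, here is a minimal sketch built on std::weak_ptr. It is only an illustration of the requirement described above, not the way SObjectizer implements mboxes; the names toy_agent and proxy_link are hypothetical.

```cpp
#include <memory>
#include <string>
#include <vector>

// Hypothetical toy agent: it just records the messages it receives.
struct toy_agent {
    std::vector<std::string> inbox;
};

// Hypothetical proxy link. It does not keep the target agent alive.
class proxy_link {
    std::weak_ptr<toy_agent> target_;
public:
    explicit proxy_link(const std::shared_ptr<toy_agent> & target)
        : target_{target} {}

    // Returns true if the message was handed to the agent and false
    // if the agent no longer exists. Freed memory is never touched.
    bool send(const std::string & msg) const {
        if (auto agent = target_.lock()) {
            agent->inbox.push_back(msg);
            return true;
        }
        return false;
    }
};
```

With such a link, destroying the agent (for example, by resetting the last shared_ptr to it) turns every later send() into a harmless no-op that returns false.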

Second, 1:N interaction was very common for us. Moreover, in the beginning it was the only way agents could interact at all. So we really did not want agents B and C, which need to receive information from agent A, to be forced to first send references to themselves to agent A. Nor did we want agent A to have to maintain lists of the agents that want to receive its messages in 1:N mode.

As a result, we arrived at the concept of a "mailbox", which was created to (a) be that very smart proxy link that agents can use to communicate with each other, and (b) be a mechanism that simplifies interaction in 1:N mode.

With mboxes, agents send messages not directly to each other but to mailboxes (mboxes). A message sent to an mbox is delivered to those agents that have subscribed to messages from this mbox.

Thus, for agent A to send a message to agent B, there must be an mbox that both agents know about. Agent A sends a message to this mbox, and agent B subscribes to messages from this mbox, as can be seen in this small example:

    #include <so_5/all.hpp>

    #include <iostream>
    #include <string>

    class A final : public so_5::agent_t {
        const so_5::mbox_t to_;
    public:
        A(context_t ctx, so_5::mbox_t to)
            : so_5::agent_t{std::move(ctx)}, to_{std::move(to)} {}

        virtual void so_evt_start() override {
            // Send a message to agent B.
            so_5::send<std::string>(to_, "Hello!");
        }
    };

    class B final : public so_5::agent_t {
    public:
        B(context_t ctx, const so_5::mbox_t & from)
            : so_5::agent_t{std::move(ctx)}
        {
            // Subscribe to messages from the mbox `from`.
            so_subscribe(from).event(&B::on_string);
        }

    private:
        void on_string(mhood_t<std::string> cmd) {
            std::cout << "Message: " << *cmd << std::endl;
            // Finish the example.
            so_deregister_agent_coop_normally();
        }
    };

    int main() {
        so_5::launch([](so_5::environment_t & env) {
            // Create a cooperation whose agents communicate
            // through a common mbox.
            env.introduce_coop([&](so_5::coop_t & coop) {
                // The mbox through which agents A and B communicate.
                const auto mbox = env.create_mbox();
                // Create the agents and pass the mbox to each of them.
                coop.make_agent<A>(mbox);
                coop.make_agent<B>(mbox);
            });
        });

        return 0;
    }

Moreover, sending and receiving messages in 1:N mode is no different from sending and receiving messages in 1:1 mode. Here is how the example above looks when agent A sends a message to agents B and C at once:

    #include <so_5/all.hpp>

    #include <iostream>
    #include <string>

    class A final : public so_5::agent_t {
        const so_5::mbox_t to_;
    public:
        A(context_t ctx, so_5::mbox_t to)
            : so_5::agent_t{std::move(ctx)}, to_{std::move(to)} {}

        virtual void so_evt_start() override {
            // Send a message; every subscriber of the mbox receives it.
            so_5::send<std::string>(to_, "Hello!");
        }
    };

    class B final : public so_5::agent_t {
    public:
        B(context_t ctx, const so_5::mbox_t & from)
            : so_5::agent_t{std::move(ctx)}
        {
            // Subscribe to messages from the mbox `from`.
            so_subscribe(from).event(&B::on_string);
        }

    private:
        void on_string(mhood_t<std::string> cmd) {
            std::cout << "(B) Message: " << *cmd << std::endl;
            // Finish the example.
            so_deregister_agent_coop_normally();
        }
    };

    class C final : public so_5::agent_t {
    public:
        C(context_t ctx, const so_5::mbox_t & from)
            : so_5::agent_t{std::move(ctx)}
        {
            // Subscribe to messages from the mbox `from`.
            so_subscribe(from).event([](mhood_t<std::string> cmd) {
                // There is no need to finish the example here,
                // agent B takes care of that.
                std::cout << "(C) Message: " << *cmd << std::endl;
            });
        }
    };

    int main() {
        so_5::launch([](so_5::environment_t & env) {
            // Create a cooperation whose agents communicate
            // through a common mbox.
            env.introduce_coop([&](so_5::coop_t & coop) {
                // The mbox through which agents A, B and C communicate.
                const auto mbox = env.create_mbox();
                // Create the agents and pass the mbox to each of them.
                coop.make_agent<A>(mbox);
                coop.make_agent<B>(mbox);
                coop.make_agent<C>(mbox);
            });
        });

        return 0;
    }

How do mboxes work?


Different mboxes work differently :) So before describing how the most widely used types of mboxes work, we first need to say what kinds of mboxes there are.

What kinds of mboxes are there?


Multi-Producer / Multi-Consumer


Historically, this is the first type of mbox, which appeared in SObjectizer-5. Anyone can send a message to this mbox. Anyone can subscribe to messages from this mbox.

Multi-Producer / Single-Consumer


For 1:1 interaction, an MPSC-mbox can be used: anyone can send a message to it, but only the one agent that owns the MPSC-mbox can subscribe to messages from it.

MPSC-mboxes appeared in SObjectizer-5 some time after SObjectizer-5 came into active use, when experience showed that using an MPMC-mbox is inefficient in cases where messages are addressed to one specific agent. So an MPSC-mbox is more of an optimization than a fundamentally different approach to organizing interaction. In addition, users cannot create MPSC-mboxes themselves: SObjectizer automatically creates an MPSC-mbox for each agent.

Additional mboxes from the so_5_extra library


On top of SObjectizer, an additional library, so_5_extra, was built. It contains components that we considered unwise to add to the SObjectizer core, including several additional types of mboxes.

Another interesting use of mboxes is the shutdowner component from so_5_extra, in which an mbox is used to determine when a large SObjectizer application can be shut down correctly.

However, we will not consider in detail the mboxes from so_5_extra in this article.

How does the Multi-Producer / Single-Consumer mbox work?


So, mboxes are different, and they work in different ways. We will start looking at the details with the simplest of them, the MPSC-mbox.

If we leave aside such specific things as message_limits (a mechanism for protecting agents from overload) and msg_tracing (a way to see how a message travels to its recipient), an MPSC-mbox works as a simple one-way conduit: it takes the message and hands it to the receiving agent so that the recipient places the message in its queue of messages waiting to be processed.

In other words, everything here is dead simple: take the message from the sender and hand it to the recipient. Nothing more.

How does Multi-Producer / Multi-Consumer mbox work?


With an MPMC-mbox the situation is somewhat more complicated (again, we leave aside such things as message_limits and msg_tracing). Since a message can have several recipients, an MPMC-mbox keeps an associative container of subscribers. The key in this container is the message type identifier, and the value is the list of subscribers for messages of that type.

When someone sends a message of type M, the MPMC-mbox looks in its associative container for the list of subscribers for messages of type M. If there is such a list, the MPMC-mbox walks it and tries to hand the message to each subscriber.

We say "tries to hand" deliberately, because there are also delivery_filters (filters that allow or forbid delivery of a message to a subscriber depending on the content of the message). Before delivering a message to a subscriber agent, the MPMC-mbox checks whether the subscriber has a delivery_filter. If so, the message is first given to the filter. Only if the filter allows delivery is the message handed to the agent.

In short, the MPMC-mbox walks the list of subscribers for a specific message type and, if delivery of this particular message instance to a subscriber is allowed, hands the message to the subscriber agent so that the subscriber can place it in its queue of messages waiting to be processed.
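The delivery scheme described above can be sketched in plain C++. This is a toy model under our own assumptions, not SObjectizer code: the names toy_mpmc_mbox and toy_subscriber are hypothetical, there is no thread safety, and the "enqueue" step is represented by a plain callback.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <typeindex>
#include <typeinfo>
#include <utility>
#include <vector>

// Hypothetical subscriber record: an optional delivery filter plus
// a callback standing in for "put the message into the agent's queue".
struct toy_subscriber {
    std::function<bool(const void *)> filter;   // empty means "no filter"
    std::function<void(const void *)> enqueue;
};

class toy_mpmc_mbox {
    // Key: message type. Value: subscribers for messages of that type.
    std::map<std::type_index, std::vector<toy_subscriber>> subscribers_;
public:
    template<typename Msg>
    void subscribe(std::function<void(const Msg &)> handler,
                   std::function<bool(const Msg &)> filter = {}) {
        toy_subscriber s;
        if (filter)
            s.filter = [filter](const void * p) {
                return filter(*static_cast<const Msg *>(p));
            };
        s.enqueue = [handler](const void * p) {
            handler(*static_cast<const Msg *>(p));
        };
        subscribers_[std::type_index(typeid(Msg))].push_back(std::move(s));
    }

    // Returns the number of subscribers the message was handed to.
    template<typename Msg>
    std::size_t send(const Msg & msg) const {
        const auto it = subscribers_.find(std::type_index(typeid(Msg)));
        if (it == subscribers_.end())
            return 0;  // Nobody is subscribed to this type.
        std::size_t delivered = 0;
        for (const auto & s : it->second) {
            if (s.filter && !s.filter(&msg))
                continue;  // The filter forbids delivery.
            s.enqueue(&msg);
            ++delivered;
        }
        return delivered;
    }
};
```

For example, if one subscriber takes every int and another has a filter that accepts only values above 10, then send(5) reaches one subscriber, send(42) reaches both, and a std::string message reaches nobody.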

What do regular MPMC- and MPSC-mboxes have in common?


Regular MPMC- and MPSC-mboxes share one important feature: they have no storage of their own for the messages sent to them. That is, mboxes, at least the regular ones, do not store messages. At all.

Therefore, questions such as "How many messages can an mbox hold before it overflows, and what happens when it does?" or "Will agent B receive message M if it subscribes to M after M has been sent?" make no sense for MPMC- and MPSC-mboxes. These mboxes simply do not store messages inside themselves: messages are immediately handed to the agents that are interested in them. Or the messages are ignored if there are currently no recipients for this message type.

For other types of mboxes, too, internal storage for sent messages is the exception rather than the rule. The point is that work with mboxes is based on the push principle: a sent message is "pushed" into the mbox. And this is probably the only opportunity the mbox has to deliver the message to anyone, since nobody periodically polls the mbox to check whether something new has appeared in it. That is, nobody, nobody at all, works with an mbox in pull mode.

So, the bottom line: in the general case, mboxes do not store messages inside themselves.
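A tiny model shows what "no storage" means in practice. This is a sketch under our own assumptions (the class toy_mbox is hypothetical and handles only std::string messages); it is not how SObjectizer's mboxes are written.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical push-only mbox: it knows its subscribers
// but keeps no messages.
class toy_mbox {
    std::vector<std::function<void(const std::string &)>> subscribers_;
public:
    void subscribe(std::function<void(const std::string &)> handler) {
        subscribers_.push_back(std::move(handler));
    }

    // The message is pushed to the current subscribers and then forgotten.
    void send(const std::string & msg) const {
        for (const auto & s : subscribers_)
            s(msg);
    }
};
```

If send("early") is called before anyone subscribes, the message simply disappears. An agent that subscribes afterwards sees only the messages sent after its subscription.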

Complicating the picture: agents do not have their own message queues


Developers who start working with SObjectizer quickly come to understand that mboxes do not store messages, so there is no point in asking about the capacity of mboxes. But since messages are stored not in mboxes but in agents, questions about the capacity of an agent's message queue begin...

And here newcomers are in for another revelation and, possibly, disappointment: in SObjectizer, agents in general do not have their own message queues.

That's right. They simply don't :)

The point is that in SObjectizer message queues for agents are managed by dispatchers. It is the dispatcher that provides an agent with the working context in which the agent processes its messages. And, as a consequence, it is the dispatcher that organizes the storage of messages waiting to be processed.

For example, there is the one_thread dispatcher (one of the most frequently used). All agents bound to this dispatcher work on one common working thread, and all messages for all these agents are stored in one common message queue. The working thread takes the next message from this queue, hands it to the receiving agent, then takes the next one, and so on.

The active_group dispatcher works similarly: a group of agents can be bound to one common working thread, and all agents on this working thread use a common message queue.

The situation with the thread_pool and adv_thread_pool dispatchers is trickier. There you can set FIFO parameters for agents. One of them determines exactly which queue an agent will use. You can give an agent its own queue, which holds only the messages addressed to that agent. Or you can have the agents of one cooperation share a common message queue.

Things get even more fun with dispatchers that support agent priorities. Take, for example, the prio_one_thread::strictly_ordered dispatcher. There, all agents with the same priority share one message queue, but agents with different priorities have different message queues.

In short, the bottom line: in the general case, mboxes hand messages to agents, and agents hand messages to dispatchers, which store the messages in the appropriate queues. So, again in the general case, neither mboxes nor agents have message storage.
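The one_thread case can be modeled in a few lines. This is only a sketch of the idea "one dispatcher, one common queue, many agents"; the names queued_agent and toy_one_thread_dispatcher are hypothetical, and the real dispatcher of course runs its loop on a separate working thread with proper synchronization.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical agent without any queue of its own.
struct queued_agent {
    std::string name;
    std::vector<std::string> handled;
};

// Hypothetical dispatcher: one common queue for all bound agents.
class toy_one_thread_dispatcher {
    std::deque<std::pair<queued_agent *, std::string>> queue_;
public:
    // Called when a message has to be stored for `receiver`.
    void push(queued_agent & receiver, std::string msg) {
        queue_.emplace_back(&receiver, std::move(msg));
    }

    std::size_t pending() const { return queue_.size(); }

    // Takes messages one by one and hands each to its receiver.
    void run_until_empty() {
        while (!queue_.empty()) {
            auto item = std::move(queue_.front());
            queue_.pop_front();
            item.first->handled.push_back(std::move(item.second));
        }
    }
};
```

Messages for different agents sit interleaved in the same queue in arrival order, and each agent sees only its own messages once the dispatcher processes them.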

And how difficult is it to make your own mbox?


If you do it "by the book", with support for message_limits, delivery_filters, msg_tracing and other nuances, it is quite hard. Those interested can look, for example, at the implementation internals of the retained_msg mbox from so_5_extra to see how scary it can all look :)

However, if you make your own mbox for a specific task, things may not be so scary. As a small example, let's make an MPSC-mbox that keeps an agent from receiving messages too often. Say, if message M2 arrives less than 250ms after message M1, it is discarded. If 250ms or more have passed since M1, M2 is delivered to the recipient.
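Before touching SObjectizer at all, the filtering rule itself can be sketched and checked in isolation. The class anti_jitter_gate and the helper delivered_arrivals are hypothetical names used only for this illustration; arrival times are passed in explicitly instead of being read from a clock, and we assume that the interval is measured from the last delivered message.

```cpp
#include <chrono>
#include <vector>

// Hypothetical helper that decides whether a message arriving at `now`
// should be delivered.
class anti_jitter_gate {
    std::chrono::milliseconds threshold_;
    bool has_last_{false};
    std::chrono::milliseconds last_{0};
public:
    explicit anti_jitter_gate(std::chrono::milliseconds threshold)
        : threshold_{threshold} {}

    bool should_deliver(std::chrono::milliseconds now) {
        if (has_last_ && (now - last_) < threshold_)
            return false;  // Too soon after the last delivery: discard.
        has_last_ = true;
        last_ = now;       // Remember the time of this delivery.
        return true;
    }
};

// Returns the arrival times (in ms) of the messages that pass the gate.
std::vector<int> delivered_arrivals(
    const std::vector<int> & arrivals_ms, int threshold_ms)
{
    anti_jitter_gate gate{std::chrono::milliseconds{threshold_ms}};
    std::vector<int> result;
    for (const int t : arrivals_ms)
        if (gate.should_deliver(std::chrono::milliseconds{t}))
            result.push_back(t);
    return result;
}
```

With a 250ms threshold and arrivals at 125, 250, 400, 500, 700, 750 and 800 ms, only the messages at 125, 400 and 700 ms pass: each of the others arrives less than 250ms after the previously delivered one.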

Necessary explanations


So, let's try to make our own mbox under the working name anti-jitter-mbox. This will be an MPSC-mbox bound to one specific agent.

To make our life easier, we will not write a full MPSC-mbox implementation of our own. Instead, we will use the ready-made MPSC-mbox that every agent already has. We simply require that the MPSC-mbox of the agent to which the anti-jitter-mbox belongs be passed to the anti-jitter-mbox constructor.

We need to define our own anti_jitter_mbox class, derived from the special class so_5::abstract_message_box_t. In our class we will have to override the pure virtual methods of abstract_message_box_t. In SObjectizer versions 5.5.* these are the following methods:

id(). It must return the unique ID of the mbox. Since the actual delivery is performed by the agent's MPSC-mbox, which is passed to us in the constructor, we will return the ID of that MPSC-mbox. That is, here we simply delegate the work to the actual MPSC-mbox.

subscribe_event_handler(). This method is called when the agent wants to subscribe to messages of type T. In this method we will register type T. We need this so that when a message of some type M arrives in the mbox, we can check whether the agent is subscribed to it. If it is, we can try to deliver the message (and, accordingly, we must record the time of the last delivery). If it is not, the message must be ignored.

unsubscribe_event_handlers(). This method, the opposite of subscribe_event_handler(), is called when the agent wants to unsubscribe from messages of type T. In this method we will cancel the registration of type T.

query_name(). This method should return the string name of the mbox. It serves debugging and diagnostic purposes. For example, SObjectizer may call this method when generating error messages.

type(). This method should return the mbox type: Multi-Producer/Multi-Consumer or Multi-Producer/Single-Consumer. SObjectizer calls this method to check whether certain actions are allowed. For example, mutable messages can be sent only to MPSC-mboxes.

do_deliver_message(). This method is responsible for handing the message to the receiving agent. In it we have to check whether the type of the message being sent is registered with us. If not, the message is ignored. If it is registered and enough time has passed since the last delivery, the message should be delivered to the recipient (and we record the delivery time). The delivery itself is delegated to the agent's actual MPSC-mbox.

do_deliver_service_request(). This method is similar to do_deliver_message(), but it is called when agent A makes a synchronous request to agent B (that is, when request_future or request_value is used instead of send). For simplicity, our anti-jitter-mbox will not support synchronous requests.

set_delivery_filter() and drop_delivery_filter(). These methods are used to set and remove message delivery filters. Since delivery filters are not intended for MPSC-mboxes, we will not support this functionality in our example.
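The bookkeeping that subscribe_event_handler() and unsubscribe_event_handlers() have to do can be sketched separately from SObjectizer. The class subscription_table below is a hypothetical illustration: it counts subscriptions per message type and forgets a type when its counter drops to zero.

```cpp
#include <cstddef>
#include <map>
#include <typeindex>
#include <typeinfo>

// Hypothetical table of message types that currently have subscribers.
class subscription_table {
    std::map<std::type_index, std::size_t> counts_;
public:
    // Registers one more subscription for the type.
    void subscribe(const std::type_index & type) {
        ++counts_[type];  // operator[] creates a zero entry if needed.
    }

    // Cancels one subscription. Unknown types are silently ignored.
    void unsubscribe(const std::type_index & type) {
        const auto it = counts_.find(type);
        if (it == counts_.end())
            return;
        if (--it->second == 0)
            counts_.erase(it);  // No subscribers left: forget the type.
    }

    // Used at delivery time: messages of unknown types are ignored.
    bool is_subscribed(const std::type_index & type) const {
        return counts_.count(type) != 0;
    }
};
```

A delivery method built on such a table would first call is_subscribed() for the message type and drop the message if the answer is no.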

A note on the constness of some abstract_message_box_t methods


In the example implementation, you will see that the do_deliver_message() and do_deliver_service_request() methods are declared const. But since do_deliver_message() has to modify the internal state of our anti-jitter-mbox, we have to mark that state as mutable in the mbox class definition.

This is a consequence of an old architectural miscalculation in the design of the abstract_message_box_t interface. When this class was designed many years ago, we did not think that anyone would ever need to create their own types of mboxes.

When it turned out, a year and a half or two ago, that this was not merely needed but sometimes badly needed, we faced a choice: break compatibility within the SObjectizer-5.5 branch, or leave everything as is and change the abstract_message_box_t interface in some future major release (such as SObjectizer-5.6). Since we are sticklers for compatibility between releases within one branch, we decided to leave everything as is in SObjectizer-5.5. So now, when implementing your own mboxes, you have to live with the constness of several abstract_message_box_t methods and use the mutable keyword.
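For readers who rarely meet the mutable keyword, here is a minimal standalone sketch of the technique. The class delivery_counter is hypothetical and has nothing to do with SObjectizer; it only shows a const method updating internal state under a mutex.

```cpp
#include <cstddef>
#include <mutex>

// Hypothetical class whose "delivery" method must be const
// but still has to update internal state.
class delivery_counter {
    // Both members are mutable so that const methods can use them.
    mutable std::mutex lock_;
    mutable std::size_t delivered_{0};
public:
    // Declared const, yet it changes the counter.
    void deliver() const {
        std::lock_guard<std::mutex> guard{lock_};
        ++delivered_;
    }

    std::size_t delivered() const {
        std::lock_guard<std::mutex> guard{lock_};
        return delivered_;
    }
};
```

Even through a const reference to such an object, deliver() can be called and the counter grows. This is the same trick our mbox has to rely on.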

Implementing your own anti-jitter-mbox


Well, now we can look at what our own mbox will be like.

Let's start with the data our mbox will work with:

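    #include <so_5/all.hpp>

    #include <chrono>
    #include <map>
    #include <mutex>
    #include <optional>
    #include <string>
    #include <typeindex>

    using namespace std::chrono;
    using clock_type = steady_clock;

    class anti_jitter_mbox : public so_5::abstract_message_box_t {
        // All modifiable data of the mbox is gathered in one struct
        // so that it can be marked as mutable as a whole.
        struct data {
            // Information about one message type.
            struct item {
                // Number of subscribers for this type. When it drops to 0,
                // the information about the type can be removed.
                std::size_t subscribers_{0};
                // Time of the last delivery.
                // Empty if no message of this type has been delivered yet.
                std::optional<clock_type::time_point> last_received_{};
            };

            // Table of message types that have subscribers.
            using message_table = std::map<std::type_index, item>;

            // The mutex that protects the mbox in a multithreaded
            // environment.
            std::mutex lock_;
            // The table of subscribed message types.
            message_table messages_;
        };

        // The actual mbox, through which messages are really
        // delivered to the agent.
        const so_5::mbox_t mbox_;
        // Threshold for cutting off "extra" messages.
        const clock_type::duration timeout_;
        // Modifiable data of the mbox. In SObjectizer-5.5 it has to be
        // mutable because it is changed from const methods.
        mutable data data_;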

We need the actual mbox through which messages are delivered to the receiving agent, the time threshold for cutting off "extra" messages, and the information about message types together with the time each was last received. Plus we need a mutex, since mbox methods can be called on different working threads and we have to make our mbox thread-safe.

By the way, precisely because of thread safety, most methods will have to lock the internal mutex of our mbox. To make our life easier, let's write a helper template method that locks the mutex and performs the actions we need while the mutex is held:

        template<typename Lambda>
        decltype(auto) lock_and_perform(Lambda l) const noexcept {
            std::lock_guard<std::mutex> lock{data_.lock_};
            return l();
        }

Strictly speaking, this helper is not required. But I decided to use it for another reason: to keep the implementation simple, we will not bother with exception safety. If an exception occurs while performing some action, we just need to terminate the whole application. Marking lock_and_perform as noexcept gives us exactly this behavior: if the lambda throws, the C++ run-time itself calls std::terminate.

Well, now we can look at the implementation of the mbox itself:

    public:
        // Constructor. Receives the agent's MPSC-mbox, through which
        // messages will actually be delivered.
        anti_jitter_mbox(
            so_5::mbox_t actual_mbox,
            clock_type::duration timeout)
            : mbox_{std::move(actual_mbox)}
            , timeout_{timeout}
        {}

        // The mbox ID. We simply reuse the ID of the actual mbox.
        so_5::mbox_id_t id() const override {
            return mbox_->id();
        }

        // Handles a new subscription.
        void subscribe_event_handler(
            const std::type_index & msg_type,
            const so_5::message_limit::control_block_t * limit,
            so_5::agent_t * subscriber) override
        {
            lock_and_perform([&]{
                // Register the message type. If it is not in the table
                // yet, operator[] creates a new entry for it.
                auto & msg_data = data_.messages_[msg_type];
                msg_data.subscribers_ += 1;

                // Delegate the subscription to the actual mbox.
                mbox_->subscribe_event_handler(msg_type, limit, subscriber);
            });
        }

        // Handles the removal of a subscription.
        void unsubscribe_event_handlers(
            const std::type_index & msg_type,
            so_5::agent_t * subscriber) override
        {
            lock_and_perform([&]{
                // Look up the message type in the table.
                // If it is not there, there is nothing to do.
                auto it = data_.messages_.find(msg_type);
                if(it != data_.messages_.end()) {
                    auto & msg_data = it->second;
                    --msg_data.subscribers_;
                    if(!msg_data.subscribers_)
                        // No subscribers are left, so the information
                        // about this message type is no longer needed.
                        data_.messages_.erase(it);

                    // The actual mbox has to drop the subscription too.
                    mbox_->unsubscribe_event_handlers(msg_type, subscriber);
                }
            });
        }

        // The name of the mbox.
        std::string query_name() const override {
            return "<mbox:type=anti-jitter-mpsc:id=" + std::to_string(id()) + ">";
        }

        // The type of the mbox. The same as the type of the actual mbox.
        so_5::mbox_type_t type() const override {
            return mbox_->type();
        }

        // Handles the delivery of an ordinary message.
        void do_deliver_message(
            const std::type_index & msg_type,
            const so_5::message_ref_t & message,
            unsigned int overlimit_reaction_deep) const override
        {
            lock_and_perform([&]{
                // Look up the message type in the table.
                // If it is not there, nobody is subscribed to it
                // and the message is ignored.
                auto it = data_.messages_.find(msg_type);
                if(it != data_.messages_.end()) {
                    auto & msg_data = it->second;
                    const auto now = clock_type::now();

                    // Check whether the message should be delivered.
                    // If nothing has been delivered yet (last_received_
                    // is empty), the message is always delivered.
                    bool should_be_delivered = true;
                    if(msg_data.last_received_) {
                        should_be_delivered =
                            (now - *(msg_data.last_received_)) >= timeout_;
                    }

                    // If delivery is allowed, record the delivery time
                    // and let the actual mbox hand the message to
                    // the receiving agent.
                    if(should_be_delivered) {
                        msg_data.last_received_ = now;
                        mbox_->do_deliver_message(
                            msg_type, message, overlimit_reaction_deep);
                    }
                }
            });
        }

        // Handles the delivery of a synchronous request.
        void do_deliver_service_request(
            const std::type_index & /*msg_type*/,
            const so_5::message_ref_t & /*message*/,
            unsigned int /*overlimit_reaction_deep*/) const override
        {
            // Not supported: throw so_5::exception_t by means of
            // the macro that SObjectizer itself uses.
            SO_5_THROW_EXCEPTION(so_5::rc_not_implemented,
                "anti-jitter-mbox doesn't support service requests");
        }

        // Delivery filters are not intended for MPSC-mboxes,
        // so they are not supported here.
        void set_delivery_filter(
            const std::type_index & /*msg_type*/,
            const so_5::delivery_filter_t & /*filter*/,
            so_5::agent_t & /*subscriber*/) override
        {
            SO_5_THROW_EXCEPTION(so_5::rc_not_implemented,
                "anti-jitter-mbox doesn't support delivery filters");
        }

        // Note: this method is declared noexcept, so an exception
        // thrown from it ends in std::terminate.
        void drop_delivery_filter(
            const std::type_index & /*msg_type*/,
            so_5::agent_t & /*subscriber*/) noexcept override
        {
            SO_5_THROW_EXCEPTION(so_5::rc_not_implemented,
                "anti-jitter-mbox doesn't support delivery filters");
        }
    };

Now let's see how this mbox behaves. First, an agent that receives messages through its ordinary MPSC-mbox:

    class ordinary_subscriber final : public so_5::agent_t {
        const std::string name_;
    public:
        ordinary_subscriber(context_t ctx,
            // The name used in the output.
            std::string name)
            : so_5::agent_t{std::move(ctx)}
            , name_{std::move(name)}
        {
            so_subscribe_self().event([&](mhood_t<std::string> cmd) {
                std::cout << name_ << ": signal received -> " << *cmd << std::endl;
            });
        }

        // The mbox to which messages for this agent should be sent.
        auto target_mbox() const { return so_direct_mbox(); }
    };

And an agent that receives messages through an anti-jitter-mbox:

    class anti_jitter_subscriber final : public so_5::agent_t {
        const std::string name_;
        const so_5::mbox_t anti_jitter_mbox_;
    public:
        anti_jitter_subscriber(context_t ctx,
            // The name used in the output.
            std::string name,
            // The threshold for cutting off "extra" messages.
            clock_type::duration jitter_threshold)
            : so_5::agent_t{std::move(ctx)}
            , name_{std::move(name)}
            , anti_jitter_mbox_{
                new anti_jitter_mbox{so_direct_mbox(), jitter_threshold}}
        {
            // Subscribe through the new mbox.
            so_subscribe(anti_jitter_mbox_).event([&](mhood_t<std::string> cmd) {
                std::cout << name_ << ": signal received -> " << *cmd << std::endl;
            });
        }

        // The mbox to which messages for this agent should be sent.
        auto target_mbox() const { return anti_jitter_mbox_; }
    };

And the code that generates the message sequence and runs the example:

    // Generates the same sequence of delayed messages for both mboxes.
    void generate_msg_sequence(
        so_5::environment_t & env,
        const so_5::mbox_t & ordinary_mbox,
        const so_5::mbox_t & anti_jitter_mbox)
    {
        std::vector<milliseconds> delays{
            125ms, 250ms, 400ms, 500ms, 700ms, 750ms, 800ms };

        for(const auto d : delays) {
            const std::string msg = std::to_string(d.count()) + "ms";
            so_5::send_delayed<std::string>(env, ordinary_mbox, d, msg);
            so_5::send_delayed<std::string>(env, anti_jitter_mbox, d, msg);
        }
    }

    int main() {
        // Launch SObjectizer and do the work inside it.
        so_5::launch([](so_5::environment_t & env) {
            // The two mboxes to which the messages will be sent.
            so_5::mbox_t ordinary, anti_jitter;

            // Create the two agents and take the target mbox from each.
            env.introduce_coop([&](so_5::coop_t & coop) {
                ordinary = coop.make_agent<ordinary_subscriber>(
                    "ordinary-mbox")->target_mbox();
                anti_jitter = coop.make_agent<anti_jitter_subscriber>(
                    "anti-jitter-mbox", 250ms)->target_mbox();
            });

            // Generate the message sequence.
            generate_msg_sequence(env, ordinary, anti_jitter);

            // Give the delayed messages time to arrive.
            std::this_thread::sleep_for(1250ms);

            // Finish the example.
            env.stop();
        });

        return 0;
    }

Running the example shows that far fewer messages come through the anti-jitter-mbox than through the ordinary mbox:

 ordinary-mbox: signal received -> 125ms
anti-jitter-mbox: signal received -> 125ms
ordinary-mbox: signal received -> 250ms
ordinary-mbox: signal received -> 400ms
anti-jitter-mbox: signal received -> 400ms
ordinary-mbox: signal received -> 500ms
ordinary-mbox: signal received -> 700ms
anti-jitter-mbox: signal received -> 700ms
ordinary-mbox: signal received -> 750ms
ordinary-mbox: signal received -> 800ms 



Epilogue


, . , , :)

A note on versions: SObjectizer 5.5.20 and so_5_extra 1.0.3. SObjectizer is also available through vcpkg and can be installed with vcpkg install sobjectizer.

SObjectizer: 5.6, , , , , 5.5. SObjectizer-5.6 . , SObjectizer- , SObjectizer SObjectizer .

- - SObjectizer, . . , .

Source: https://habr.com/ru/post/344580/

