SObjectizer-4 once offered the ability to build distributed applications out of the box. But it did not always work as well as we would have liked. As a result, in SObjectizer-5 we dropped support for distribution from the SObjectizer core (this issue is discussed in more detail
here). We dropped it in favor of choosing a specific transport for each specific task, taking the peculiarities of that task into account, and writing an appropriate binding that hides the details of the selected transport from the programmer.
In this article we will talk about one such binding, built around
MQTT and
libmosquitto, through which the parts of a distributed application were able to interact.
What and why
A couple of years ago, we helped one team develop a prototype of a distributed measurement system. The prototype got up and running and, as far as we know, still works in demo mode. Unfortunately, the team had difficulty finding further funding, and the project was frozen. Because of the project's "suspended" status, we can tell only what we are allowed to tell, without names, addresses, or passwords.
So, the idea was to create a distributed system for collecting some measurement information. The plan was to have a central server, to which all collected information flows and where it is processed, plus a set of "measuring stations" (from several hundred to several thousand).
Initially, the central server distributes commands for collecting information to the measuring stations. From then on, the stations themselves can send information to the central server at any time, and the central server can issue new commands to the stations at any time.
For the software of the measuring stations, it was decided to use C++, both to save the stations' resources and because of the need to work with the system API in places.
For the central server, it was possible to use Java and/or something more convenient and safer than C++. Within the prototype, though, a demo "server" was also written in C++; it supported several scenarios for collecting information from the measuring stations, but did not process that information.
MQTT as a transport
So, there is a measuring station running a process written in C++ that must interact with the central server: receive commands from the server, send the results of command execution back, monitor the connection to the server, reconnect when the connection is broken, and so on.
A natural question arose: what kind of transport should be used for communication between the central server and the measuring stations?
It must be said that we did not look for the answer for very long. We settled on the MQTT protocol, which:
- has an official specification, with implementations of it for different programming languages;
- is very simple and lightweight. That is, even if some very specific conditions require writing our own implementation of the protocol, it will not be difficult;
- makes it easy to exchange information in both directions;
- is built around the simple but powerful and convenient Publish/Subscribe model.
In general, in our opinion, since MQTT was created for such tasks, it is a sin not to use it in such conditions.
But MQTT is only a transport. It was decided to encode the data in JSON. Fortunately, JSON is quite simple, understandable, and lightweight. Again, for any more or less popular programming language there are libraries for working with JSON.
Thus, in our C++ code we had to make SObjectizer friends with MQTT and JSON. How we did it is what we will talk about next.
What have we got?
We made a small add-on over SObjectizer and libmosquitto called
mosquitto_transport. This add-on takes over interaction with the MQTT broker and gives the developer a small API for publishing messages and subscribing to them.
The mosquitto_transport library is written in C++14, runs under Linux, and supports only a part of MQTT's capabilities (in particular, only QoS 0 is used). In general, it implements the minimum that was required for the prototype.
What does using mosquitto_transport look like?
To use mosquitto_transport, a developer needs to take a few required steps, which we'll take a closer look at below.
The mosquitto_transport library places all its contents in the mosquitto_transport namespace. That name is too long to be convenient in examples, so below we use the alias mosqtt, introduced as follows: namespace mosqtt = mosquitto_transport;
Step one. Initializing mosquitto_transport and libmosquitto
mosquitto_transport is built on libmosquitto, and libmosquitto needs to be explicitly initialized at the beginning of the program and explicitly deinitialized at the end. Therefore, to work with mosquitto_transport, you need to create an object of type lib_initializer_t somewhere in the program. This object initializes libmosquitto in its constructor and deinitializes it in its destructor. A reference to lib_initializer_t then needs to be passed to the constructor of the a_transport_manager_t agent.
Usually in the program it looks like this:
int main() {
  mosqtt::lib_initializer_t mosqtt_lib;
  ...
}
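The constructor/destructor pairing described in this step is the usual RAII idiom. Below is a minimal stand-alone sketch of that idea; it is not the library's actual code. The names fake_lib_init, fake_lib_cleanup, lib_guard_t and transport_stub_t are hypothetical stand-ins so that the block compiles without libmosquitto (in libmosquitto itself the corresponding C calls are mosquitto_lib_init() and mosquitto_lib_cleanup()).

```cpp
#include <cassert>

// Hypothetical stand-ins for a C library's global init/cleanup pair.
// They only count calls so the sketch can be checked without libmosquitto.
static int g_init_calls = 0;
static int g_cleanup_calls = 0;
inline void fake_lib_init() { ++g_init_calls; }
inline void fake_lib_cleanup() { ++g_cleanup_calls; }

// RAII guard: initializes the library on construction and
// deinitializes it on destruction.
class lib_guard_t {
public:
  lib_guard_t() { fake_lib_init(); }
  ~lib_guard_t() { fake_lib_cleanup(); }

  // Global library state must not be initialized or cleaned up twice,
  // so copying is forbidden (which also disables moving).
  lib_guard_t(const lib_guard_t &) = delete;
  lib_guard_t & operator=(const lib_guard_t &) = delete;
};

// Hypothetical consumer that needs the library to be alive. Taking the
// guard by reference makes that dependency explicit in the signature.
class transport_stub_t {
public:
  explicit transport_stub_t(lib_guard_t & lib) : lib_{&lib} {}
  bool has_library() const { return lib_ != nullptr; }
private:
  lib_guard_t * lib_;
};
```

Requiring a reference to the guard in the consumer's constructor, as the a_transport_manager_t constructor does with lib_initializer_t, makes it hard to forget the initialization step: the consumer cannot be created without a guard object already existing.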
Step two. Run the a_transport_manager_t agent and get an instance_t instance
The whole flow of incoming and outgoing messages is handled by a special agent, a_transport_manager_t. The developer must create an agent of this type in the program and register it as part of some cooperation.
In addition to actually creating and registering the a_transport_manager_t agent, you need to do one more important thing: take an instance of an object of a special type instance_t from the agent. This instance_t will be needed to publish and receive messages.
If there are only a few agents in the application, then all of them can be put into one cooperation with the a_transport_manager_t agent:
int main() {
  ...
  mosqtt::lib_initializer_t mosqtt_lib;
  ...
  so_5::launch([&](so_5::environment_t & env) {
    // A single cooperation holds both the transport manager
    // and the application agents.
    env.introduce_coop([&](so_5::coop_t & coop) {
      // Create the transport manager.
      auto transport = coop.make_agent<mosqtt::a_transport_manager_t>(...);
      // Create the application agents; each one receives the instance.
      coop.make_agent<first_app_agent>(..., transport->instance(), ...);
      coop.make_agent<second_app_agent>(..., transport->instance(), ...);
      coop.make_agent<third_app_agent>(..., transport->instance(), ...);
    });
  });
}
But in more complex cases, when there are more application agents and/or application agents can appear and disappear while the application runs, it makes sense to create a separate cooperation with a_transport_manager_t and bind this agent to its own dispatcher. In this case, it may be reasonable to move the creation of a_transport_manager_t into a separate helper function:
mosqtt::instance_t make_transport(
  so_5::environment_t & env,
  mosqtt::lib_initializer_t & mosqtt_lib,
  ... )
{
  mosqtt::instance_t instance;
  env.introduce_coop(
This allows you to use make_transport, for example, as follows:
int main() {
  ...
  mosqtt::lib_initializer_t mosqtt_lib;
  ...
  so_5::launch([&](so_5::environment_t & env) {
    // Start the MQTT transport.
    auto mqtt = make_transport(env, mosqtt_lib, ...);
    // The first cooperation with application agents.
    env.introduce_coop([&](so_5::coop_t & coop) {
      coop.make_agent<first_app_agent>(..., mqtt, ...);
      coop.make_agent<second_app_agent>(..., mqtt, ...);
      coop.make_agent<third_app_agent>(..., mqtt, ...);
    });
    // Another cooperation with application agents.
    env.introduce_coop(...);
    ...
    // And so on.
  });
}
A notable feature of the a_transport_manager_t agent is that some of its event handlers are marked as thread-safe. This makes it possible to bind the agent to an adv_thread_pool dispatcher, in which case the agent can process some of its events in parallel on several worker threads at once. In practice, though, we did not need this functionality.
Step three. Implementation of serialization / deserialization of application messages
The most important part of using mosquitto_transport is defining the representation format of the transmitted messages. For example, if we have a message like:
struct sensor_data_t {
  data_category_t category_;
  timestamp_t when_;
  int value_;
  ...
};
then, in order to send it to the MQTT broker and on to subscribers, the message has to be converted to some format, for example XML, JSON, or ProtoBuf. For this to happen automatically when working with mosquitto_transport, the developer must implement serialization/deserialization. This question is considered in more detail below; for now we will show the main idea.
First, the developer must decide how the data will be represented. For example, it may be a textual JSON representation, or a binary ProtoBuf representation. Or even both: mosquitto_transport does not prevent this, but the developer must keep track of which message format is used in which topics.
Second, for each format the developer must define a tag type. For example, if we want to use only JSON, we define the following tag type:
struct json_encoding {};
This tag type is required for parameterization of templates and selection of overloaded functions.
Third, for each of the formats the developer must define, in the mosquitto_transport namespace, partial specializations of the encoder_t and decoder_t template classes.
The result is something like:
// Tag type that identifies our message format.
struct my_encoding_tag {};
// The encoder/decoder specializations must live in the
// mosquitto_transport namespace.
namespace mosquitto_transport {
// Serialization.
template<typename Msg>
struct encoder_t<my_encoding_tag, Msg> {
  static std::string encode(const Msg & what) {
    ... // Turn the message into a string.
  }
};
// Deserialization.
template<typename Msg>
struct decoder_t<my_encoding_tag, Msg> {
  static Msg decode(const std::string & payload) {
    ... // Build the message from the string.
  }
};
} /* namespace mosquitto_transport */
Later, the publishing and subscribing operations will need the tag type as a marker, which tells mosquitto_transport exactly how to serialize and deserialize messages.
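To make the role of the tag type more tangible, here is a minimal stand-alone sketch of tag-based selection of an encoder and decoder. It is only an illustration: the namespace transport_sketch, the helper functions encode_with and decode_with, the plain_text_tag format and the reading_t message are all hypothetical and are not part of mosquitto_transport.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for the library namespace: only the two
// primary templates that user code is expected to specialize.
namespace transport_sketch {
template<typename Tag, typename Msg> struct encoder_t;
template<typename Tag, typename Msg> struct decoder_t;

// Picks the encoder by tag type at compile time. A real publish
// operation would also hand the resulting string to the broker.
template<typename Tag, typename Msg>
std::string encode_with(const Msg & msg) {
  return encoder_t<Tag, Msg>::encode(msg);
}

// Picks the decoder by tag type; the target type is named by the caller.
template<typename Tag, typename Msg>
Msg decode_with(const std::string & payload) {
  return decoder_t<Tag, Msg>::decode(payload);
}
} // namespace transport_sketch

// Tag type for a toy "decimal text" format (an assumption of this sketch).
struct plain_text_tag {};

// A toy message with a single field.
struct reading_t { int value_; };

namespace transport_sketch {
// Full specializations for one tag/message pair. A real format would
// usually use a partial specialization over Msg instead.
template<> struct encoder_t<plain_text_tag, reading_t> {
  static std::string encode(const reading_t & r) {
    return std::to_string(r.value_);
  }
};
template<> struct decoder_t<plain_text_tag, reading_t> {
  static reading_t decode(const std::string & payload) {
    return reading_t{std::stoi(payload)};
  }
};
} // namespace transport_sketch
```

With these definitions, transport_sketch::encode_with<plain_text_tag>(reading_t{42}) selects the encoder purely from the tag type, so several formats can coexist in one program as long as each has its own tag.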
Next steps. Posting and receiving messages
After the three required steps have been completed, you can publish messages and subscribe to them.
Publishing looks like this:
mosqtt::topic_publisher_t<Tag_Type>::publish(
  // The instance obtained from a_transport_manager_t.
  instance,
  // Name of the MQTT topic to publish
  // the message to.
  "some/topic/name",
  // The message to be published.
  some_message_instance);
With this call we say that we want to publish a message to a specific topic and that the message should be serialized in a specific way (the format is defined by the Tag_Type template parameter). For example, we could write this:
mosqtt::topic_publisher_t<my_encoding_tag>::publish(
  instance,
  "/sensor/1023/updates",
  sensor_data_t{
    data_category_t::recent_value,
    current_timestamp,
    value});
In this case, mosquitto_transport would serialize the data from the sensor_data_t message into the format defined for my_encoding_tag and publish the resulting message to the topic "/sensor/1023/updates".
To receive messages that someone publishes to topics of interest to us, you need to do something like a "double subscription": call the special subscribe() method and, inside that call, subscribe to messages from a special mbox. For example, if we want to receive all messages from the "/sensor/+/updates" topics, we can make the following subscribe() call inside one of our agents:
class my_agent_t : public so_5::agent_t {
  ...
  void so_define_agent() override {
    mosqtt::topic_subscriber_t<my_encoding_tag>::subscribe(
      // The instance obtained from a_transport_manager_t.
      instance_,
      // Topic filter we want to subscribe to.
      "/sensor/+/updates",
      // Lambda that subscribes the agent to the mbox
      // created for this topic filter.
      [this](so_5::mbox_t mbox) {
        // Incoming messages for the topic filter
        // arrive via this mbox.
        so_subscribe(mbox).event(&my_agent_t::on_sensor_data);
      });
  }
  ...
  void on_sensor_data(
    mhood_t<mosqtt::incoming_message_t<my_encoding_tag>> cmd) {
    ...
  }
};
A double subscription is needed here because, in essence, we have two different actions:
- first, we need to make the transport manager subscribe to a specific topic on the MQTT broker. Without this subscription, the broker simply will not deliver new incoming messages to us. This is exactly what the subscribe() method does;
- second, when the MQTT broker delivers a message from a topic we have subscribed to, that message has to be delivered to the receiving agent. This is done with SObjectizer's own subscription mechanism. That is why a new mbox is created inside subscribe() and associated with a specific subscription on the MQTT broker. When the broker gives us a new message, the message is sent to that mbox and so reaches the agent that subscribed to it.
Simplest Hello, World
To demonstrate mosquitto_transport "in action", we will make a typical Hello, World example. The example includes two applications:
- demo_server_app. It connects to the local MQTT broker and subscribes to messages from the "/client/+/hello" topics. When a message arrives, it sends a reply message to the topic "/client/:ID/replies";
- demo_client_app. It connects to the local MQTT broker, subscribes to the topic "/client/:ID/replies", and publishes a message to the topic "/client/:ID/hello". It finishes its work as soon as a reply arrives in "/client/:ID/replies".
The ":ID" part of the topic names is an integer identifier that demo_client_app generates randomly.
Messages are transmitted in JSON format.
Sample sources can be found
in this repository . We will go through the main points of the example implementation.
Defining messages for “client” and “server” interactions
The header file
demo/messages.hpp defines the data structures that act as messages between the "client" and the "server". The client sends the server a message of type client_hello_t, and the server replies with a message of type server_hello_t. As MQTT messages, they are of course serialized to JSON, but the program itself works with the client_hello_t/server_hello_t types.
For messages of types client_hello_t/server_hello_t to be serialized and deserialized properly, we define a tag type named json_format_t and make partial specializations of encoder_t and decoder_t, as described above. To work with JSON, we use a small add-on over RapidJSON called
json_dto. That is why the client_hello_t/server_hello_t types contain json_io template methods: this is a json_dto specific.
So, this is what messages.hpp contains:
#pragma once
#include <json_dto/pub.hpp>
#include <mosquitto_transport/pub.hpp>
namespace demo {
// Tag type that identifies the message format
// used in the demo.
struct json_format_t {};
// Short names for the publisher and subscriber types.
using publisher_t = mosquitto_transport::topic_publisher_t<json_format_t>;
using subscriber_t = mosquitto_transport::topic_subscriber_t<json_format_t>;
// Message from the client to the server.
struct client_hello_t {
  int id_;
  std::string greeting_;
  std::string reply_topic_;
  template<typename Json_Io>
  void json_io(Json_Io & io) {
    using namespace json_dto;
    io & mandatory("id", id_)
      & mandatory("greeting", greeting_)
      & mandatory("reply_topic", reply_topic_);
  }
};
// Reply from the server to the client.
struct server_hello_t {
  std::string greeting_;
  template<typename Json_Io>
  void json_io(Json_Io & io) {
    io & json_dto::mandatory("greeting", greeting_);
  }
};
} /* namespace demo */
// Serialization/deserialization for the demo format.
namespace mosquitto_transport {
template<typename Msg>
struct encoder_t<demo::json_format_t, Msg> {
  static std::string encode(const Msg & msg) {
    return json_dto::to_json(msg);
  }
};
template<typename Msg>
struct decoder_t<demo::json_format_t, Msg> {
  static Msg decode(const std::string & json) {
    return json_dto::from_json<Msg>(json);
  }
};
} /* namespace mosquitto_transport */
The definitions of the names publisher_t and subscriber_t perhaps deserve a separate explanation. They are needed so that application code can say, for example:
publisher_t::publish(...)
instead of:
mosqtt::topic_publisher_t<json_format_t>::publish(...)
Common Utility Code
Another header file,
demo/common.hpp, contains helper functions that both the client and the server need.
The make_loggers() function, which we will not look at, creates objects for logging. We need two such objects: the first is used by mosquitto_transport, the second by the demo application itself. In the example, the loggers' level is set to spdlog::level::trace. That is, almost everything that happens inside the application gets logged.
The run_transport_manager() function, whose code we will look at, serves to create and launch the a_transport_manager_t agent. It returns a mosquitto_transport::instance_t, which is then required for publishing and subscribing:
auto run_transport_manager(
  so_5::environment_t & env,
  const std::string & client_id,
  std::shared_ptr<spdlog::logger> logger)
{
  mosquitto_transport::instance_t mqtt;
  env.introduce_coop([&](so_5::coop_t & coop) {
    auto lib = coop.take_under_control(
      std::make_unique<mosquitto_transport::lib_initializer_t>());
    auto tm = coop.make_agent<mosquitto_transport::a_transport_manager_t>(
      std::ref(*lib),
      mosquitto_transport::connection_params_t{
        client_id, "localhost", 1883u, 60u},
      std::move(logger));
    mqtt = tm->instance();
  });
  return mqtt;
}
Here a new cooperation is created that contains a single agent, a_transport_manager_t. The parameters of the connection to the MQTT broker (since this is a hello-world example, an MQTT broker on localhost is enough) and the logger to be used are passed to the agent's constructor. But the most interesting thing here is the creation of the lib_initializer_t object. It is created dynamically, and responsibility for destroying it is handed over to the cooperation that holds the a_transport_manager_t agent. As a result, libmosquitto is initialized right before the a_transport_manager_t agent is created and deinitialized after the agent ceases to exist.
Main demo_server_app
In
demo_server_app, all of the application logic is implemented by the listener_t agent:
class listener_t final : public so_5::agent_t {
public:
  listener_t(
    context_t ctx,
    mosqtt::instance_t mqtt,
    std::shared_ptr<spdlog::logger> logger)
    : so_5::agent_t{std::move(ctx)}
    , mqtt_{std::move(mqtt)}
    , logger_{std::move(logger)}
  {}
  virtual void so_define_agent() override {
    demo::subscriber_t::subscribe(mqtt_, "/client/+/hello",
      [&](const so_5::mbox_t & mbox) {
        so_subscribe(mbox).event(&listener_t::on_hello);
      });
  }
private:
  mosqtt::instance_t mqtt_;
  std::shared_ptr<spdlog::logger> logger_;
  void on_hello(mhood_t<demo::subscriber_t::msg_type> cmd) {
    logger_->trace("message received from topic: {}, payload={}",
      cmd->topic_name(), cmd->payload());
    const auto msg = cmd->decode<demo::client_hello_t>();
    logger_->info("hello received. client={}, greeting={}",
      msg.id_, msg.greeting_);
    demo::publisher_t::publish(
      mqtt_,
      msg.reply_topic_,
      demo::server_hello_t{"World, hello!"});
  }
};
This is, one could say, a typical small SObjectizer agent. In its overridden so_define_agent() method, it subscribes to messages from topics that match the "/client/+/hello" filter. When such messages arrive from the broker, the on_hello() method is called to process them.
The on_hello() method receives a special object that contains the name of the topic in which the message was published and the message itself, exactly in the form in which it came from the MQTT broker, that is, in its JSON representation. But we want to get the message as an object of type client_hello_t. For this, the decode() method is called, which deserializes the message body from JSON into client_hello_t.
Finally, at the end of on_hello(), a reply message is published. The name of the topic for the reply is taken from client_hello_t.
Main demo_client_app
In
demo_client_app, all the work is likewise done by a single agent, of type client_t. But this agent is somewhat more complicated than listener_t in demo_server_app. Here is the client_t code:
class client_t final : public so_5::agent_t {
public:
  client_t(
    context_t ctx,
    int id,
    mosqtt::instance_t mqtt,
    std::shared_ptr<spdlog::logger> logger)
    : so_5::agent_t{std::move(ctx)}
    , id_{id}
    , mqtt_{std::move(mqtt)}
    , logger_{std::move(logger)}
  {}
  virtual void so_define_agent() override {
    // Wait for the connection to the broker.
    so_subscribe(mqtt_.mbox()).event(&client_t::on_broker_connected);
    demo::subscriber_t::subscribe(mqtt_, make_topic_name("/replies"),
      [&](const so_5::mbox_t & mbox) {
        so_subscribe(mbox).event(&client_t::on_reply);
      });
  }
private:
  const int id_;
  mosqtt::instance_t mqtt_;
  std::shared_ptr<spdlog::logger> logger_;
  std::string make_topic_name(const char * suffix) const {
    return std::string("/client/") + std::to_string(id_) + suffix;
  }
  void on_broker_connected(mhood_t<mosqtt::broker_connected_t>) {
    // The connection is established, so the
    // hello message can be published.
    demo::publisher_t::publish(
      mqtt_,
      make_topic_name("/hello"),
      demo::client_hello_t{
        id_, "Hello, World", make_topic_name("/replies")});
  }
  void on_reply(mhood_t<demo::subscriber_t::msg_type> cmd) {
    logger_->trace("message received from topic: {}, payload={}",
      cmd->topic_name(), cmd->payload());
    const auto msg = cmd->decode<demo::server_hello_t>();
    logger_->info("hello received. greeting={}", msg.greeting_);
    // The reply is received, the work is done.
    logger_->warn("finishing");
    so_environment().stop();
  }
};
Here the client_t agent handles two events. The event handled by on_reply() should be clear: it is the processing of the message that comes from the MQTT broker.
The on_broker_connected() handler needs an explanation. When the client starts, it has to pick the moment at which the initial message can be published to the topic "/client/:ID/hello". If the client_t agent did it right after its start, the message would go nowhere and be lost, since there is no connection to the MQTT broker yet. Therefore, the client_t agent needs to wait until the connection is established.
Interaction with the MQTT broker is handled by a_transport_manager_t. When a connection to the broker appears (that is, when not only the TCP connection is established but the CONNECT/CONNACK exchange with the broker has also completed), the a_transport_manager_t agent sends the broker_connected_t signal to a special mbox. The client_t agent subscribes to this signal and, when it arrives, publishes the client_hello_t message.
The results of the application
Running demo_server_app you can see approximately the following picture:

Here you can see how demo_server_app connected to the broker, how it made the subscription, how it received the incoming message and how it responded to it.
Running demo_client_app, you can see something like this:

Here you can see how demo_client_app connected to the broker, how it made a subscription, how it sent its welcome message and how it received a message from demo_server_app in response.
A few words about the design decisions made
Above, we have shown what the
mosquitto_transport library currently looks like. Now we can say a few words about some of the design decisions behind its implementation.
Small disclaimer: mosquitto_transport is itself a prototype
When we faced the task of letting SObjectizer agents communicate with the outside world through MQTT, we quickly became convinced that using widely used libraries such as
libmosquitto and
Paho directly is not convenient. With primitive C++ wrappers over them, things were just as sad.
The main problem, with libmosquitto in particular, was that libmosquitto expects all incoming messages to be processed in one and the same callback. We, on the other hand, need several independent agents in the application, each of which should receive only the messages it is interested in.
Accordingly, some layer had to be made between the application agents and libmosquitto, one that hides all the work with libmosquitto calls and callbacks from the agents. But what should this layer look like?
Naturally, we did not have a ready answer. In mosquitto_transport we tried to find one. It turned out more or less fine, especially given that, quite frankly, very little effort was invested in mosquitto_transport. Most of the work went into supporting wildcards in topic names in subscriptions, and into writing an understandable README, so that we ourselves would not forget how to use mosquitto_transport.
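To give a feel for what wildcard support involves, here is a minimal stand-alone sketch of matching a concrete topic name against an MQTT topic filter. It is not the code from mosquitto_transport: the function names split_levels and topic_matches are hypothetical, and the sketch assumes the filter is well-formed and ignores the special treatment of topics starting with "$".

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Splits a topic into levels on '/'. Empty levels are kept, because in
// MQTT "/a" and "a" are different topics.
inline std::vector<std::string> split_levels(const std::string & s) {
  std::vector<std::string> levels;
  std::string::size_type start = 0;
  for (;;) {
    const auto pos = s.find('/', start);
    if (pos == std::string::npos) {
      levels.push_back(s.substr(start));
      return levels;
    }
    levels.push_back(s.substr(start, pos - start));
    start = pos + 1;
  }
}

// Returns true if a concrete topic name matches a subscription filter.
// '+' matches exactly one level; '#' matches the rest of the topic,
// including zero remaining levels.
inline bool topic_matches(const std::string & filter,
                          const std::string & topic) {
  const auto f = split_levels(filter);
  const auto t = split_levels(topic);
  std::size_t i = 0;
  for (; i < f.size(); ++i) {
    if (f[i] == "#") return true;            // the rest matches anything
    if (i >= t.size()) return false;         // topic is shorter than filter
    if (f[i] != "+" && f[i] != t[i]) return false;
  }
  return i == t.size();                      // no unmatched levels remain
}
```

Note that the leading slash is significant: under these rules the filter "sensor/+/updates" does not match the topic "/sensor/1023/updates", because the latter starts with an empty level.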
Surely it could be done better. In addition, there are serious blank spots in the existing implementation, for example, support for QoS levels other than 0, or performance issues that were not essential for the prototype being developed.
So we ourselves regard the current implementation of mosquitto_transport as a fully working and debugged prototype, one that can be taken and tried, but that will certainly need some filing down for any other conditions.
All the main work is done by a_transport_manager_t and one auxiliary working thread
In mosquitto_transport, almost all the work is performed by the a_transport_manager_t agent. It creates the mosquitto instance, implements all the necessary callbacks, launches libmosquitto's main event loop on a separate working thread using mosquitto_loop_start(), and later stops that loop via mosquitto_loop_stop().
But there are several tricks in a_transport_manager_t that are worth focusing on.
Delegating work from one worker thread to another
libmosquitto's main event processing loop runs on a separate working thread. This means that callbacks such as mosquitto_log_callback, mosquitto_connect_callback, mosquitto_subscribe_callback, etc., are called on that particular thread. But these callbacks need to access the data of the a_transport_manager_t agent, which in turn works on another working thread (or even on several other threads in the case of an adv_thread_pool dispatcher).
We did not protect the a_transport_manager_t agent's data with synchronization primitives (like std::mutex). Instead, the callback sends a message to the a_transport_manager_t agent, and the message is processed on the agent's main working context. Because of this, when processing a message we do not need to think about synchronizing access to data at all.
Here, for example, is how it looks in code. The implementation of mosquitto_message_callback:
void a_transport_manager_t::on_message_callback(
  mosquitto *,
  void * this_object,
  const mosquitto_message * msg )
{
  auto tm = reinterpret_cast< a_transport_manager_t * >(this_object);
  tm->m_logger->trace( "on_message, topic={}, payloadlen={}, qos={}"
    ", retain={}",
    msg->topic, msg->payloadlen, msg->qos, msg->retain );
  so_5::send< message_received_t >( tm->m_self_mbox, *msg );
}
And the message_received_t message handler:
void a_transport_manager_t::on_message_received(
  const message_received_t & cmd )
{
  auto subscribers = m_delivery_map.match( cmd.m_topic );
  if( !subscribers.empty() ) {
    for( auto * s : subscribers )
      s->deliver_message( cmd.m_topic, cmd.m_payload );
  }
  else
    m_logger->warn( "message for unregistered topic, topic={}, "
      "payloadlen={}",
      cmd.m_topic, cmd.m_payload.size() );
}
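The hand-off between threads described in this section can be reduced to a small stand-alone sketch. It only illustrates the idea: the classes inbox_t and manager_t are hypothetical, and the inbox plays the role that SObjectizer's message delivery plays for the real agent. The callback never touches the owner's data; it only copies what it needs into the inbox, and the owner's data is modified from a single context.

```cpp
#include <cassert>
#include <mutex>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// A minimal thread-safe inbox: the only state shared between threads.
class inbox_t {
public:
  void push(std::string msg) {
    std::lock_guard<std::mutex> lock{mutex_};
    queue_.push(std::move(msg));
  }
  // Moves all pending messages out and returns them in arrival order.
  std::vector<std::string> drain() {
    std::lock_guard<std::mutex> lock{mutex_};
    std::vector<std::string> out;
    while (!queue_.empty()) {
      out.push_back(std::move(queue_.front()));
      queue_.pop();
    }
    return out;
  }
private:
  std::mutex mutex_;
  std::queue<std::string> queue_;
};

// Hypothetical owner of some state that is touched from one context only.
class manager_t {
public:
  // Intended to be called on the library's thread: it does not touch
  // topics_seen_, it only forwards a copy of the data to the inbox.
  static void on_message_callback(void * this_object, const char * topic) {
    static_cast<manager_t *>(this_object)->inbox_.push(topic);
  }
  // Intended to be called on the owner's own context: no locking is
  // needed around topics_seen_.
  void process_pending() {
    for (auto & topic : inbox_.drain())
      topics_seen_.push_back(std::move(topic));
  }
  const std::vector<std::string> & topics_seen() const {
    return topics_seen_;
  }
private:
  inbox_t inbox_;
  std::vector<std::string> topics_seen_;
};
```

The price of this approach is one extra copy of the incoming data per message; in exchange, all locking is confined to the queue.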
Asynchronous work with subscriptions
When an agent calls subscribe(), the subscription to the corresponding MQTT topic is performed not synchronously inside subscribe(), but asynchronously. First, a subscribe_topic_t message is sent to the a_transport_manager_t agent. Then the a_transport_manager_t agent processes this message and tries to create a subscription on the MQTT broker, which may be impossible at that moment if there is currently no connection to the broker.
Accordingly, a call to subscribe() does not mean that upon return from subscribe() the subscription has already been created and the agent will immediately begin to receive messages published in the corresponding topic. This is usually not a problem, but it is a fact worth remembering.
Sometimes an agent wants to know whether a subscription to a topic exists or is missing for some reason. For example, an agent wants to subscribe to a topic first and only then begin to publish its own messages. In this case, the agent can subscribe to the special messages subscription_available_t, subscription_unavailable_t, and subscription_failed_t. Here is how the code for a topic subscription that also receives a notification about the successful subscription can look:
mosqtt::topic_subscriber_t<my_encoding_tag>::subscribe(
  mqtt,
  "some/topic/name",
  [&](const so_5::mbox_t & mbox) {
    // The mbox delivers incoming messages from the topic...
    so_subscribe(mbox).event(
      [&](mhood_t<mosqtt::topic_subscriber_t<my_encoding_tag>::msg_type> cmd) {
        const auto msg = cmd->decode<my_type>();
        ...
      });
    // ...and the subscription_available_t notification.
    so_subscribe(mbox).event(
      [&](mhood_t<mosqtt::subscription_available_t> cmd) {
        ...
      });
  });
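The deferred nature of subscriptions can be illustrated with a small stand-alone sketch. It is a simplification made under assumptions, not the actual logic of a_transport_manager_t: the class subscription_book_t and the send_subscribe callback (which stands in for the real request to the broker) are hypothetical.

```cpp
#include <cassert>
#include <functional>
#include <set>
#include <string>
#include <utility>

// Hypothetical sketch of deferred subscriptions: subscribe() only records
// the request; the broker is contacted when a connection is available.
class subscription_book_t {
public:
  // 'send_subscribe' stands in for the real call to the broker.
  explicit subscription_book_t(
    std::function<void(const std::string &)> send_subscribe)
    : send_subscribe_{std::move(send_subscribe)} {}

  // Never blocks: the topic is remembered and, if the connection is
  // already up, forwarded to the broker right away.
  void subscribe(const std::string & topic) {
    const bool is_new = wanted_.insert(topic).second;
    if (is_new && connected_) send_subscribe_(topic);
  }

  // On (re)connect every remembered subscription is created again.
  void on_connected() {
    connected_ = true;
    for (const auto & topic : wanted_) send_subscribe_(topic);
  }

  // On disconnect broker-side subscriptions are gone, but the
  // remembered requests stay.
  void on_disconnected() { connected_ = false; }

private:
  std::function<void(const std::string &)> send_subscribe_;
  std::set<std::string> wanted_;
  bool connected_ = false;
};
```

In this sketch, a subscribe() call made before the connection is established produces no traffic at all; the request reaches the broker only from on_connected(), which is why the caller cannot assume the subscription exists when subscribe() returns.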
Automatic Unsubscribe from Topics
Subscriptions to MQTT topics are created via calls to subscribe(). But they are deleted automatically. This happens thanks to a little street magic hidden in SObjectizer's mboxes (which you can create yourself for your own, perhaps very specific, tasks).
The point is that the subscribe() method creates a special service mbox, and it is this mbox that is then passed to the lambda function in which the programmer subscribes the agent. The service mbox knows exactly how many subscriptions have been made to it. When all subscriptions are destroyed, for example when the agent is deregistered or its subscriptions are removed manually, the mbox sees that there are no more subscriptions and asks a_transport_manager_t to delete the subscription to the MQTT topic.
The so_define_agent() method as a classic example of setting up a multi-state agent
Usually we encounter two opposite reactions from people who get acquainted with SObjectizer. The first group says that everything is somehow too complicated: agents must be inherited from something, you need to know about some so_define_agent(), so_evt_start(), and so on. And agents have some incomprehensible states, for unclear reasons.
The second group, by contrast, says that agent states are cool, and that the so_define_agent(), so_evt_start() and so_evt_finish() methods simplify both writing agents and, especially, dealing with other people's agents: you know exactly where to look and what to look for.
Naturally, the second point of view is much closer to us. And we think the contents of a_transport_manager_t confirm it well. In particular, the so_define_agent() method immediately gives an idea of what this agent processes, how, and when:
void a_transport_manager_t::so_define_agent()
{
  st_working
    .event( m_self_mbox, &a_transport_manager_t::on_subscribe_topic )
    .event( m_self_mbox, &a_transport_manager_t::on_unsubscribe_topic )
    .event( m_self_mbox,
        &a_transport_manager_t::on_message_received,
        so_5::thread_safe );
  st_disconnected
    .on_enter( [this] {
        // Everyone should be informed that connection lost.
        so_5::send< broker_disconnected_t >( m_self_mbox );
      } )
    .event< connected_t >(
        m_self_mbox, &a_transport_manager_t::on_connected );
  st_connected
    .on_enter( [this] {
        // Everyone should be informed that connection established.
        so_5::send< broker_connected_t >( m_self_mbox );
        // All registered subscriptions must be restored.
        restore_subscriptions_on_reconnect();
      } )
    .on_exit( [this] {
        // All subscriptions are lost.
        drop_subscription_statuses();
        // No more pending subscriptions.
        m_pending_subscriptions.clear();
      } )
    .event< disconnected_t >(
        m_self_mbox, &a_transport_manager_t::on_disconnected )
    .event( m_self_mbox, &a_transport_manager_t::on_subscription_result )
    .event( m_self_mbox,
        &a_transport_manager_t::on_publish_message,
        so_5::thread_safe )
    .event( &a_transport_manager_t::on_pending_subscriptions_timer );
}
Moreover, the a_transport_manager_t agent uses a state hierarchy, albeit a very simple one:

    state_t st_working{ this, "working" };
    state_t st_disconnected{ initial_substate_of{ st_working }, "disconnected" };
    state_t st_connected{ substate_of{ st_working }, "connected" };
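The practical effect of this hierarchy is that handlers subscribed in st_working stay active while the agent is in either substate. The toy model below only illustrates that lookup rule. It does not use SObjectizer, and toy_state_t, handles() and toy_transport_manager_t are hypothetical names invented for this sketch. The event names are borrowed from the handlers listed in so_define_agent().

```cpp
#include <cassert>
#include <set>
#include <string>

// Toy model (not SObjectizer): a state knows its parent and the events
// it subscribes to itself.
struct toy_state_t {
    const toy_state_t* parent;
    std::set<std::string> handled;
};

// An event is handled if the current state or any of its ancestors
// subscribes to it.
bool handles(const toy_state_t& current, const std::string& event) {
    for (const toy_state_t* s = &current; s != nullptr; s = s->parent) {
        if (s->handled.count(event) != 0) return true;
    }
    return false;
}

// Mirrors the shape of the hierarchy: one parent state, two substates.
struct toy_transport_manager_t {
    toy_state_t working{nullptr,
        {"subscribe_topic", "unsubscribe_topic", "message_received"}};
    toy_state_t disconnected{&working, {"connected"}};
    toy_state_t connected{&working,
        {"disconnected", "subscription_result", "publish_message",
         "pending_subscriptions_timer"}};

    toy_transport_manager_t() = default;
    // The substates hold pointers to `working`, so copying is disabled.
    toy_transport_manager_t(const toy_transport_manager_t&) = delete;
    toy_transport_manager_t& operator=(const toy_transport_manager_t&) = delete;
};
```

In this model a subscribe_topic request is accepted in both substates, while publish_message is accepted only when connected, which matches the way the handlers are grouped by state in so_define_agent().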
It should be noted that a_transport_manager_t is far from the most complicated SObjectizer agent we have had to implement. The larger and more complex the agent, the more useful the dedicated methods so_define_agent(), so_evt_start() and so_evt_finish() become.

More about serialization / deserialization of messages
Above, we showed how a user must configure the serialization and deserialization of their messages, but we did not explain why it is done this way. This is a fundamental question, one could even call it the cornerstone, so it deserves a few words.

Firstly, although we wrote mosquitto_transport for a specific project, we wanted it to be reusable for other tasks. Therefore mosquitto_transport must not be tied to a specific data format. It should be possible to transfer data in JSON, in XML, in ProtoBuf, or in any other representation, perhaps even using several different representations in the same program.

Secondly, we needed to decide who spends resources on serializing / deserializing messages: the a_transport_manager_t agent or the user's application agents. We did not want a_transport_manager_t to spend resources on these operations, since that is a direct way to turn a_transport_manager_t into a bottleneck. Therefore we chose the second option: serialization and deserialization are performed in the user's context. Serialization is performed automatically inside topic_publisher_t::publish. Deserialization is done manually: the user must specify the type they want to receive when calling the incoming_message_t::decode() method, as in this example:

    void on_reply(mhood_t<demo::subscriber_t::msg_type> cmd) {
        ...
        const auto msg = cmd->decode<demo::server_hello_t>();
        ...
    }
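To make the format independence and the "work in the user's context" idea more concrete, here is a minimal, self-contained sketch of selecting an encoder and a decoder by a format tag at compile time. It does not use mosquitto_transport. The names encoder_t and decoder_t are taken from the article, but their layout here is an assumption, and kv_format, temperature_t, to_kv(), from_kv(), incoming_t and decode_fails() are hypothetical names invented for the illustration.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical format tag: an empty type that names a wire format.
struct kv_format {};

// Primary templates are only declared. A (format, message) pair with no
// matching specialization therefore fails at compile time.
template <typename Tag, typename Msg> struct encoder_t;
template <typename Tag, typename Msg> struct decoder_t;

// Hypothetical application message with its format-specific helpers.
struct temperature_t { int celsius; };

std::string to_kv(const temperature_t& m) {
    return "celsius=" + std::to_string(m.celsius);
}

void from_kv(const std::string& payload, temperature_t& out) {
    const std::string prefix = "celsius=";
    if (payload.compare(0, prefix.size(), prefix) != 0)
        throw std::runtime_error("not a temperature payload: " + payload);
    out.celsius = std::stoi(payload.substr(prefix.size()));
}

// Partial specializations: the tag is fixed, the message type stays open.
template <typename Msg> struct encoder_t<kv_format, Msg> {
    static std::string encode(const Msg& m) { return to_kv(m); }
};

template <typename Msg> struct decoder_t<kv_format, Msg> {
    static Msg decode(const std::string& payload) {
        Msg result{};
        from_kv(payload, result);
        return result;
    }
};

// Stand-in for an incoming message: decoding runs in the caller's context,
// so the caller decides how to handle a failure.
struct incoming_t {
    std::string payload;
    template <typename Tag, typename Msg> Msg decode() const {
        return decoder_t<Tag, Msg>::decode(payload);
    }
};

// Returns true if decoding `payload` as temperature_t throws.
bool decode_fails(const std::string& payload) {
    try {
        incoming_t{payload}.decode<kv_format, temperature_t>();
        return false;
    } catch (const std::exception&) {
        return true;
    }
}
```

Because the decoder runs where decode() is called, a malformed payload surfaces as an exception in the calling code rather than inside a shared transport component.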
Thirdly, there is the matter of errors that can occur when serializing / deserializing messages. For example, a message arrives in XML and we try to parse it as JSON. We get an error, and the question arises: how should this error be reported, and how should it be handled?

This is another reason why serialization / deserialization is placed in the user's context. If a call to publish() or decode() throws an exception caused by a serialization / deserialization error, the user can handle that exception in whatever way suits them.

The combination of all these factors led to the scheme described earlier: for each data format, the user defines a special tag type and provides partial specializations of encoder_t and decoder_t for it. This scheme allows several message formats to coexist in one application. In addition, everything is resolved statically, so if serialization / deserialization is not implemented for some type or format, this is detected at compile time.

General impressions of using mosquitto_transport
It's funny, but there are no special impressions from using mosquitto_transport. It just works.

From the above, it is clear that most of the code you have to write when working with mosquitto_transport serves two purposes:

- first, creating and starting the a_transport_manager_t agent;
- second, serializing / deserializing messages.
But, as it turned out, in practice neither of these caused any problems. The code for creating a_transport_manager_t had to be written only a few times: in a couple of tests and in the main application for which all this was intended. Moreover, compared to the other code needed to initialize an application (parsing command line arguments, reading the configuration, creating and configuring loggers, and so on), the few lines that start a_transport_manager_t are, if not a drop in the ocean, then still very little.

Thanks to json_dto, very little code has to be written for serializing / deserializing messages. And it would have to be written in one form or another anyway, at least until reflection is introduced in C++.

So we did not find mosquitto_transport particularly laborious to use when implementing a dozen agents that communicate with the outside world through MQTT and use about three dozen messages in their work.

What left a much more vivid impression was working with libmosquitto and figuring out the behavior of an MQTT broker in some situations by studying its source code. One can only regret that we did not come across anything more worthy than libmosquitto and Paho. But that's another story...

Conclusion
That, perhaps, is all we wanted to tell about our experiment with implementing distribution for SObjectizer applications on top of MQTT and libmosquitto. If something you are interested in was left out, ask questions, and we will try to cover it in the comments.

In conclusion, as has become a tradition, we suggest trying SObjectizer (github) in practice and sharing your impressions: almost all constructive feedback is used to improve SObjectizer.

But today I want to touch on one more aspect. Having distribution in the core of SObjectizer is good in some respects and not so good in others (as the SObjectizer-4 experience shows). Not having distribution in the core is, again, good in some respects and not so good in others. It turns out like the well-known saying: whatever you choose, you will regret it anyway. For example, one of the most common questions about SObjectizer-5 is: "What about distribution?"

Explaining the same thing for the hundredth time is getting tiresome, so the thought creeps in: "Wouldn't it be easier to build a distribution mechanism for SO-5?"

In this connection, please share your thoughts on this topic in the comments.

Do you need distribution in SO-5? If so, for which tasks? Do you need to send many small messages? Or do you transfer large blobs? Do you need interoperability with other languages? Or are you satisfied with a protocol / format that is supported by only one framework? Do you need delivery confirmation for individual messages, automatic resending, and so on? Any means of introspection, monitoring and collecting traffic statistics?

In general, the more wishes are voiced, the easier it will be for us to decide whether to support transparent distribution and, if so, in what form. Or maybe it will not be easier. But in any case, there will be some food for thought.