
Building blocks of distributed applications. First approach


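In the previous article, we discussed the theoretical foundations of reactive architecture. Now it is time to talk about data flows, ways to implement reactive Erlang / Elixir systems, and the message exchange patterns used in them: request-response, request-chunked response, response with request, publish-subscribe, inverted publish-subscribe, and task distribution.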



SOA, MSA and messaging


SOA and MSA are system architectures that define the rules for building systems, while messaging provides the primitives for implementing them.


I do not want to promote any particular system architecture. I am in favor of applying the practices that are most effective and useful for a given project and business. Whatever paradigm we choose, it is better to design system blocks with the Unix way in mind: loosely coupled components, each responsible for a single entity. API methods should perform the simplest possible operations on those entities.


Messaging, as the name implies, is a message broker. Its main purpose is to receive and deliver messages. It is responsible for the interfaces used to send information, for forming logical channels of information transfer within the system, for routing and balancing, and for handling failures at the system level.
The messaging we are developing does not try to compete with or replace RabbitMQ. Its main features are:



Note. From the point of view of code organization, meta-projects are well suited to complex systems in Erlang / Elixir. All project code lives in one repository, an umbrella project. At the same time, microservices are isolated as much as possible and perform simple operations, each being responsible for a separate entity. With this approach it is easy to maintain the API of the whole system, simple to make changes, and convenient to write unit and integration tests.


System components interact directly or through a broker. From the standpoint of messaging, each service goes through several life-cycle phases:



It looks rather complicated, but the code is not that scary. Commented code samples are given a little later, where the patterns are examined.


Exchanges


An exchange point is a messaging process that implements the logic of interaction with components within a message exchange pattern. In all the examples below, components interact through exchange points, which together make up the messaging.


Message exchange patterns (MEPs)


Broadly, exchange patterns can be divided into two-way and one-way. The former imply a response to the received message, the latter do not. A classic example of a two-way pattern in a client-server architecture is the Request-response pattern. Let us consider the pattern and its modifications.


Request – response or RPC


RPC is used when we need to get a response from another process. That process may run on the same node or on a different continent. Below is a diagram of the interaction between the client and server via messaging.



Since messaging is completely asynchronous, for the client the exchange is divided into 2 phases:


  1. Submit request


    messaging:request(Exchange, ResponseMatchingTag, RequestDefinition, HandlerProcess). 

    Exchange - the unique name of the exchange point
    ResponseMatchingTag - a local label used to match the response, for example when several identical requests belonging to different users are sent
    RequestDefinition - the request body
    HandlerProcess - the PID of the handler. This process will receive the response from the server.


  2. Response processing


    handle_info(#'$msg'{exchange = EXCHANGE, tag = ResponseMatchingTag, message = ResponsePayload}, State)

    ResponsePayload - server response.
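
To make the two client phases more tangible, here is a minimal sketch built on plain Erlang processes. It does not use the messaging library and says nothing about how that library is implemented; the module name rpc_sketch and the message shapes {request, ...} and {response, ...} are assumptions made purely for illustration. What it shows is only the role of the tag: the request is sent without blocking, and the response is later picked out by its matching tag.

```erlang
-module(rpc_sketch).
-export([start_server/0, request/3, await/2]).

%% Start a toy server that answers every request with {echo, Request}.
start_server() ->
    spawn(fun server_loop/0).

server_loop() ->
    receive
        {request, From, Tag, Request} ->
            %% The tag travels back untouched so the client can match on it.
            From ! {response, Tag, {echo, Request}},
            server_loop();
        stop ->
            ok
    end.

%% Phase 1: send the request and return immediately.
request(Server, Tag, Request) ->
    Server ! {request, self(), Tag, Request},
    ok.

%% Phase 2: pick the response that carries the matching tag.
await(Tag, TimeoutMs) ->
    receive
        {response, Tag, Payload} -> {ok, Payload}
    after TimeoutMs ->
        {error, timeout}
    end.
```

Because the receive clause matches on an already bound Tag, two identical requests sent on behalf of different users can be told apart even if their responses are handled in a different order.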



For the server, the process also consists of 2 phases:


  1. Initialization of the exchange point
  2. Processing incoming requests

Let us illustrate this pattern with code. Suppose we need to implement a simple service that provides a single method: the exact time.


Server code


We put the definition of the service API in api.hrl:


    %% =====================================================
    %%  entities
    %% =====================================================
    -record(time, {
      unixtime :: non_neg_integer(),
      datetime :: binary()
    }).

    -record(time_error, {
      code :: non_neg_integer(),
      error :: term()
    }).

    %% =====================================================
    %%  methods
    %% =====================================================
    -record(time_req, {
      opts :: term()
    }).

    -record(time_resp, {
      result :: #time{} | #time_error{}
    }).

We define the service controller in time_controller.erl:


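    %% Only the significant code is shown. It is meant to be placed
    %% inside a gen_server callback module.

    %% gen_server initialization
    init(_Args) ->
      %% connect to the exchange point
      messaging:monitor_exchange(req_resp, ?EXCHANGE, default, self()),
      {ok, #{}}.

    %% The exchange point has been lost. The same event arrives
    %% if the exchange point has not started yet.
    handle_info(#exchange_die{exchange = ?EXCHANGE}, State) ->
      erlang:send(self(), monitor_exchange),
      {noreply, State};

    %% API handling
    handle_info(#time_req{opts = _Opts}, State) ->
      %% NOTE: Client is not bound anywhere in the original snippet;
      %% how messaging passes the requester is not shown here.
      Now = erlang:timestamp(),
      messaging:response_once(Client, #time_resp{
        result = #time{
          unixtime = time_utils:unixtime(Now),
          datetime = time_utils:iso8601_fmt(Now)}
      }),
      {noreply, State};

    %% gen_server termination
    terminate(_Reason, _State) ->
      messaging:demonitor_exchange(req_resp, ?EXCHANGE, default, self()),
      ok.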

Client code


In order to send a request to the service, you can call the messaging request API anywhere:


    case messaging:request(?EXCHANGE, tag, #time_req{opts = #{}}, self()) of
      ok ->
        ok;
      _ ->
        %% repeat or fail logic goes here
        error
    end

In a distributed system, the configuration of components can vary widely: at the time of the request, messaging may not have started yet, or the service controller may not be ready to serve the request. Therefore, we need to check the result returned by messaging and handle the failure case.
After a successful send, the client receives either a response or an error from the service.
We handle both cases in handle_info:


    handle_info(#'$msg'{exchange = ?EXCHANGE, tag = tag,
                        message = #time_resp{result = #time{unixtime = Utime}}}, State) ->
      ?debugVal(Utime),
      {noreply, State};

    handle_info(#'$msg'{exchange = ?EXCHANGE, tag = tag,
                        message = #time_resp{result = #time_error{code = ErrorCode}}}, State) ->
      ?debugVal({error, ErrorCode}),
      {noreply, State};

Request-Chunked Response


It is better to avoid transferring huge messages. The responsiveness and stable operation of the entire system depend on it. If the response to a request takes up a lot of memory, it must be split into parts.



I will give a couple of examples of such cases:



I call such responses a locomotive. In any case, 1024 messages of 1 MB each are better than a single message of 1 GB.


In an Erlang cluster we get an additional gain: the load on the exchange point and the network is reduced, because the responses are sent straight to the recipient, bypassing the exchange point.
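
The splitting itself can be sketched in a few lines of plain Erlang. This is only an illustration under my own assumptions: the module name chunk_sketch, the function names and the {chunk, Tag, N, Total, Part} message shape are hypothetical and are not part of the messaging API. The sketch cuts a binary into parts of a bounded size and sends each part directly to the recipient, numbering them so that the receiver can tell when the last one has arrived.

```erlang
-module(chunk_sketch).
-export([chunks/2, send_chunked/4]).

%% Split a binary into parts of at most Size bytes.
chunks(Bin, Size) when is_binary(Bin), is_integer(Size), Size > 0 ->
    chunks(Bin, Size, []).

chunks(<<>>, _Size, Acc) ->
    lists:reverse(Acc);
chunks(Bin, Size, Acc) when byte_size(Bin) =< Size ->
    %% The remainder fits into one part.
    lists:reverse([Bin | Acc]);
chunks(Bin, Size, Acc) ->
    <<Chunk:Size/binary, Rest/binary>> = Bin,
    chunks(Rest, Size, [Chunk | Acc]).

%% Send every part straight to the recipient process.
%% Returns the number of parts that were sent.
send_chunked(To, Tag, Bin, Size) ->
    Parts = chunks(Bin, Size),
    Total = length(Parts),
    Numbered = lists:zip(lists:seq(1, Total), Parts),
    lists:foreach(
      fun({N, Part}) -> To ! {chunk, Tag, N, Total, Part} end,
      Numbered),
    Total.
```

For example, chunk_sketch:chunks(<<"abcde">>, 2) yields three parts, the last of which is shorter than the others.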


Response with Request


This is a rather rare modification of the RPC pattern for building interactive systems.



Publish-subscribe (data distribution tree)


Event-oriented systems deliver data to consumers as soon as it becomes available. Such systems therefore lean toward the push model rather than pull or poll. This saves the resources that would otherwise be spent on constantly requesting and waiting for data.
The figure shows how a message is distributed to the consumers subscribed to a specific topic.



Classic uses of this pattern involve state propagation: the game world in computer games, market data on exchanges, useful information in data feeds.


Consider the subscriber code:


    init(_Args) ->
      %% subscribe to the exchange point, routing key = key
      messaging:subscribe(?SUBSCRIPTION, key, tag, self()),
      {ok, #{}}.

    handle_info(#exchange_die{exchange = ?SUBSCRIPTION}, State) ->
      %% the exchange point is unavailable: subscribe again
      messaging:subscribe(?SUBSCRIPTION, key, tag, self()),
      {noreply, State};

    %% handle incoming messages
    handle_info(#'$msg'{exchange = ?SUBSCRIPTION, message = Msg}, State) ->
      ?debugVal(Msg),
      {noreply, State};

    %% on shutdown, unsubscribe from the exchange point
    terminate(_Reason, _State) ->
      messaging:unsubscribe(?SUBSCRIPTION, key, tag, self()),
      ok.

The source can call the publish function wherever it is convenient:


 messaging:publish_message(Exchange, Key, Message). 

Exchange - the name of the exchange point
Key - routing key
Message - payload
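
What an exchange point has to do for this pattern can be approximated with a routing table kept in a map. The sketch below is my own simplification in plain Erlang, not the implementation of messaging: the module pubsub_sketch, its functions and the {msg, Key, Message} message shape are all hypothetical. It shows the essence of the distribution tree: a published message is pushed to every process subscribed to the routing key, and to nobody else.

```erlang
-module(pubsub_sketch).
-export([new/0, subscribe/3, unsubscribe/3, publish/3]).

%% The routing table: Key => [SubscriberPid].
new() ->
    #{}.

%% Register Pid under Key; subscribing twice has no extra effect.
subscribe(Key, Pid, Subs) ->
    Pids = maps:get(Key, Subs, []),
    Subs#{Key => lists:usort([Pid | Pids])}.

%% Remove Pid from the subscribers of Key.
unsubscribe(Key, Pid, Subs) ->
    Pids = maps:get(Key, Subs, []),
    Subs#{Key => lists:delete(Pid, Pids)}.

%% Push Message to every subscriber of Key.
%% Returns the number of subscribers that were notified.
publish(Key, Message, Subs) ->
    Pids = maps:get(Key, Subs, []),
    lists:foreach(fun(Pid) -> Pid ! {msg, Key, Message} end, Pids),
    length(Pids).
```

A real exchange point would also have to monitor subscribers and drop those that have died. That part is left out of the sketch on purpose.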


Inverted Publish-subscribe



By turning pub-sub around, you get a pattern that is convenient for logging. The sets of sources and consumers can be completely different. The figure shows the case of a single consumer and multiple sources.


Task distribution pattern


Almost every project has deferred processing tasks, such as generating reports, delivering notifications, and receiving data from third-party systems. The capacity of the system performing these tasks is easily scaled by adding handlers. All that remains for us is to form a cluster of handlers and evenly distribute the tasks between them.


Let us look at the situations that arise, using three handlers as an example. Already at the task distribution stage, two questions come up: the fairness of distribution and the overflow of handlers. Round-robin distribution takes care of fairness, and to keep handlers from overflowing we introduce the prefetch_limit restriction. During transient conditions, prefetch_limit prevents a single handler from receiving all the tasks.
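
The interplay of round-robin and prefetch_limit can be sketched as a pure function. This is an illustration under my own assumptions, not the code of messaging: the module dist_sketch and the function assign/3 are hypothetical, and every handler is assumed to start with no tasks in flight. Tasks are handed out in rotation, a handler that has reached the limit is skipped, and whatever cannot be placed stays pending at the exchange point.

```erlang
-module(dist_sketch).
-export([assign/3]).

%% assign(Tasks, Handlers, PrefetchLimit) ->
%%     {[{Handler, Task}], PendingTasks}
assign(Tasks, Handlers, Limit) when is_integer(Limit), Limit > 0 ->
    %% Each handler is tracked together with its number of unfinished tasks.
    Queue = [{H, 0} || H <- Handlers],
    assign(Tasks, Queue, Limit, []).

assign([], _Queue, _Limit, Acc) ->
    {lists:reverse(Acc), []};
assign([Task | Rest] = Tasks, Queue, Limit, Acc) ->
    case take_free(Queue, Limit, []) of
        none ->
            %% Every handler is at its limit: the rest of the tasks wait.
            {lists:reverse(Acc), Tasks};
        {{H, Load}, Others} ->
            %% The chosen handler moves to the back of the rotation.
            assign(Rest, Others ++ [{H, Load + 1}], Limit, [{H, Task} | Acc])
    end.

%% Find the first handler, in rotation order, that is below the limit.
take_free([], _Limit, _Skipped) ->
    none;
take_free([{_H, Load} = Free | Tail], Limit, Skipped) when Load < Limit ->
    {Free, lists:reverse(Skipped) ++ Tail};
take_free([Busy | Tail], Limit, Skipped) ->
    take_free(Tail, Limit, [Busy | Skipped]).
```

With three handlers and a limit of 2, seven tasks are spread as two per handler, and the seventh remains pending until some handler reports a finished task.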


Messaging manages the queues and processing priorities. Handlers receive tasks as they arrive. A task may complete successfully or fail:




Suppose a compound failure occurred while three tasks were being processed. Handler 1 crashed after receiving its task, before it could report anything to the exchange point. In this case, the exchange point hands the task to another handler once the ack timeout expires. Handler 3 refused its task for some reason and sent a nack; as a result, that task also went to another handler, which completed it successfully.
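
The failure scenario above can be reproduced with a small sketch in plain Erlang. Everything in it is an assumption made for illustration: the module ack_sketch, the function deliver/3 and the representation of a handler as a {Name, Fun} pair are hypothetical, and the real exchange point certainly does more. The sketch tries handlers one after another: an ack finishes the delivery, a nack moves the task on at once, and silence until the ack timeout is treated as a crashed handler.

```erlang
-module(ack_sketch).
-export([deliver/3]).

%% deliver(Task, Handlers, AckTimeoutMs) ->
%%     {ok, HandlerName, Attempts} | {error, no_handlers_left}
%% Handlers is a list of {Name, Fun}. Fun(Task) returns ack or nack,
%% or crashes before reporting anything.
deliver(Task, Handlers, AckTimeoutMs) ->
    deliver(Task, Handlers, AckTimeoutMs, 1).

deliver(_Task, [], _Timeout, _Attempt) ->
    {error, no_handlers_left};
deliver(Task, [{Name, Fun} | Rest], Timeout, Attempt) ->
    Exchange = self(),
    Ref = make_ref(),
    %% The handler runs in its own process and reports back with Ref.
    spawn(fun() -> Exchange ! {Ref, Fun(Task)} end),
    receive
        {Ref, ack} ->
            {ok, Name, Attempt};
        {Ref, nack} ->
            %% Explicit refusal: pass the task to the next handler at once.
            deliver(Task, Rest, Timeout, Attempt + 1)
    after Timeout ->
        %% No report within the ack timeout: treat the handler as dead.
        deliver(Task, Rest, Timeout, Attempt + 1)
    end.
```

The unique reference created for every attempt ensures that a late report from an earlier handler is never mistaken for the answer of the current one.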


Interim summary


We have examined the basic building blocks of distributed systems and gained a basic understanding of how they are used in Erlang / Elixir.


By combining basic patterns, you can build complex paradigms to solve emerging problems.


In the final part of the series we will look at general issues of service organization, routing and balancing, and also talk about the practical side of scalability and fault tolerance.


The end of the second part.


Photos by Marius Christensen
Illustrations prepared using websequencediagrams.com



Source: https://habr.com/ru/post/446108/

