
Implementation of guaranteed asynchronous message delivery

Article authored by Alexander Romanov, developer of integration solutions.

When integrating systems, we often need guaranteed message delivery between them, and a queue is the usual answer. But not every task is as simple as delivering a message from system A to system B. Sometimes the message has to be enriched along the way with data from adjacent systems that take part in the integration. Those systems cannot always be reached through queues; some expose only synchronous services. As a result, our integration has to cope with unavailability, failures, and the other "pleasant" features of synchronous calls. We could push the handling of intermediate failures back to the source system, but that is uncivilized, and it is impossible when we publish events to several systems at once (through a topic).



A convenient, working solution, in our view, is asynchronous step-by-step processing of the message through an internal queue, with a call to an external service at each step. If a service fails because of an error or because it is temporarily unavailable, the message goes to the internal failure queue and is sent again once the problems in the service have been sorted out.


This solution also removes the problem that a transaction cannot be rolled back when working with external services. No successful call is made twice: processing resumes from the step at which the failure occurred.
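The idea of resuming from the failed step can be illustrated without any messaging infrastructure. The following is a minimal in-memory sketch, not the article's application: the class `StepwiseDelivery`, the `nextStep` field, and the `deliver` method are hypothetical names, an `ArrayDeque` stands in for the internal queue, and "commit" and "rollback" are simulated by what is put back into the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.UnaryOperator;

/** In-memory stand-in for step-by-step processing through one internal queue. */
final class StepwiseDelivery {

    /** A message remembers which step must handle it next (hypothetical field). */
    record Message(String body, int nextStep) {}

    /**
     * Runs the message through all steps. A step that throws leaves the message
     * in the queue unchanged, so the retry starts at that same step.
     * Retries are unbounded here, so a step that always fails loops forever.
     * Returns the final body; callLog receives the index of every attempted call.
     */
    static String deliver(String body, List<UnaryOperator<String>> steps, List<Integer> callLog) {
        Deque<Message> innerQueue = new ArrayDeque<>();
        innerQueue.add(new Message(body, 0));
        while (true) {
            Message msg = innerQueue.poll();
            if (msg.nextStep() == steps.size()) {
                return msg.body(); // every step has succeeded
            }
            callLog.add(msg.nextStep());
            try {
                String enriched = steps.get(msg.nextStep()).apply(msg.body());
                innerQueue.add(new Message(enriched, msg.nextStep() + 1)); // simulated commit
            } catch (RuntimeException failure) {
                innerQueue.add(msg); // simulated rollback: the message returns as it was
            }
        }
    }

    public static void main(String[] args) {
        int[] failuresLeft = {1}; // the second step fails exactly once
        List<UnaryOperator<String>> steps = List.of(
                b -> b + "+one",
                b -> {
                    if (failuresLeft[0]-- > 0) throw new IllegalStateException("service down");
                    return b + "+two";
                },
                b -> b + "+three");
        List<Integer> callLog = new ArrayList<>();
        System.out.println(deliver("msg", steps, callLog)); // msg+one+two+three
        System.out.println(callLog);                         // [0, 1, 1, 2]
    }
}
```

The call log shows that the first step is not repeated after the second step fails: only the failed call is attempted again.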

All of the above is very easy to implement on an integration bus, where asynchronous interaction between components via internal queues comes out of the box. But the high price of that "box" can make an integration bus hard to justify. We will show an example of a simple application built on Spring Integration (hereinafter SI) + Rabbit MQ. Let us note up front that we do not use Rabbit MQ in production, because it cannot work with XA.

The heart of the application is spring-integration-context.xml. It describes the component model and initializes the resource beans and the transaction manager for working with MQ. Let us go through it in more detail.

We connect the driver built into SI and register the resources:

<rabbit:queue name="si.test.queue.to"/>
<rabbit:queue name="si.test.queue.from"/>

We need a low-level amqpTemplate bean through which we interact with the resources. We use this bean directly in tests, and the SI components that work with Rabbit MQ require it. The ConnectionFactory needed to connect to the resources is configured by Spring Boot from the settings in application.yml (see org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration).

  <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" mandatory="true"/> 

A TransactionManager is required for transactional work with Rabbit MQ (it returns a message to the queue if an error occurs during processing). Unfortunately, Rabbit MQ does not support XA transactions; otherwise Spring Boot would configure the transaction manager for us. Instead, we manually configure the one that Spring provides.

<bean id="rabbitTransactionManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
    <constructor-arg name="connectionFactory" ref="rabbitConnectionFactory"/>
</bean>

And now the best part: "drawing" the flow! In quotes, because we write it as XML, which is less pleasant.

Flow


We will need:


To get guaranteed delivery in this application, we use asynchronous calls through an internal queue. Because there are many components, the straightforward design would need several queues, which is inconvenient for both the developer and the administrator. We will try to make do with one.

Consider the interaction between two components. SomeComponentOne receives a message from the channel, calls some synchronous REST service (or works with a database, writes to a file, and so on), and passes the message on for further processing by SomeComponentTwo. If SomeComponentOne cannot do its work, it must roll back the transaction and return the message to where it came from. If all is well, it sends the message to the internal queue and completes the transaction. The message that SomeComponentOne sends back to the internal queue is not necessarily in the form in which it was received: it can be enriched or changed, which does not matter to us. The outgoing message is meant to trigger SomeComponentTwo. This raises a routing problem: the message enters the internal queue and must be picked up from there by whichever component is required next.

This application demonstrates routing based on message headers. The message has a custom PartnerComponent header, indicating the component that should work with the message.

Let us write down the technical details of the flow.

Adapter that reads from the input queue. It receives a message and, within a transaction, immediately puts it into the internal queue.

<int-amqp:inbound-channel-adapter channel="innerChannel"
    queue-names="si.test.queue.to"
    connection-factory="rabbitConnectionFactory"
    transaction-manager="rabbitTransactionManager"/>
<int-amqp:channel id="innerChannel"
    queue-name="si.test.queue.inner"
    connection-factory="rabbitConnectionFactory"
    transaction-manager="rabbitTransactionManager"/>

We used the specialized asynchronous channel that Spring provides for working with queues. It gives us the SI channel interface while the messages themselves are stored in a queue, in our case the application's internal MQ queue. When a message is received from this queue-backed channel, a transaction is opened, because we attached our transaction manager.

To this queue channel we attach an SI router that works on message headers.

<int:header-value-router id="wireRouter" input-channel="innerChannel" header-name="PartnerComponent" default-output-channel="component1Channel">
    <int:mapping value="ComponentTwo" channel="component2Channel"/>
    <int:mapping value="ComponentThree" channel="component3Channel"/>
    <int:mapping value="OutboundComponent" channel="outboundRabbitChannel"/>
</int:header-value-router>

A message that is new to the flow has no PartnerComponent technical header, so by default it is processed by someComponentOne, whose duty is to put the name of the next component into the PartnerComponent header and send the message to the internal queue. The router then picks the message up from the internal queue again and sends it to the specified component.

Description of the components to which the router sends messages:
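The routing rule itself is small enough to show in plain Java. The sketch below is an illustration only, not Spring Integration code: the class `HeaderRouterSketch` and its `route` method are hypothetical, and the mappings are copied from the router configuration above. One simplification is an assumption: here an unmapped header value also falls back to the default channel, whereas the real router's behavior for unmapped values depends on its configuration.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java illustration of routing by the PartnerComponent header. */
final class HeaderRouterSketch {

    static final String HEADER = "PartnerComponent";
    static final String DEFAULT_CHANNEL = "component1Channel";

    // Same value-to-channel mappings as in the XML router definition.
    static final Map<String, String> MAPPINGS = Map.of(
            "ComponentTwo", "component2Channel",
            "ComponentThree", "component3Channel",
            "OutboundComponent", "outboundRabbitChannel");

    /** Returns the name of the channel that should receive a message with these headers. */
    static String route(Map<String, String> headers) {
        String target = headers.get(HEADER);
        if (target == null) {
            return DEFAULT_CHANNEL; // a new message has no header yet
        }
        // Assumption of this sketch: unknown values also go to the default channel.
        String channel = MAPPINGS.get(target);
        return channel != null ? channel : DEFAULT_CHANNEL;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        System.out.println(route(headers)); // component1Channel
        headers.put(HEADER, "ComponentTwo");
        System.out.println(route(headers)); // component2Channel
        headers.put(HEADER, "OutboundComponent");
        System.out.println(route(headers)); // outboundRabbitChannel
    }
}
```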

<int:channel id="component1Channel"/>
<int:service-activator input-channel="component1Channel" ref="someComponentOne" method="process"/>
<int:channel id="component2Channel"/>
<int:service-activator input-channel="component2Channel" ref="someComponentTwo" method="process"/>
<int:channel id="component3Channel"/>
<int:service-activator input-channel="component3Channel" ref="someComponentThree" method="process"/>
<int:channel id="outboundRabbitChannel"/>
<int:service-activator input-channel="outboundRabbitChannel" ref="outboundRabbitComponent" method="process"/>

Adapter that sends to the output queue.

<int:channel id="toRabbit"/>
<int-amqp:outbound-channel-adapter channel="toRabbit" amqp-template="amqpTemplate" routing-key="si.test.queue.from"/>

Build (pom.xml)


Good old Maven. A standard Spring Boot build. The SI and AMQP dependencies bring in all the necessary libraries. We also add spring-boot-starter-test to implement the JUnit test cases.

Worker components (SomeComponent*.java)


Transactional beans wired into the SI flow as service activators. Each one calls a REST service via RestTemplate and sends the message to the internal queue via innerChannel. That is enough to demonstrate working with a service, and it is convenient to mock in tests.
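The shape of such a component can be sketched in plain Java. This is not the code from the repository: `ComponentSketch`, `Component`, and `Message` are hypothetical names, a `UnaryOperator` stands in for the RestTemplate call, and a `Consumer` stands in for sending to innerChannel. The point it shows is that a failed service call propagates as an exception before anything is forwarded, which is what lets the surrounding transaction roll back in the real flow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

final class ComponentSketch {

    /** Simplified message: a body plus string headers. */
    record Message(String body, Map<String, String> headers) {}

    /** One processing step: call the service, name the next component, forward. */
    static final class Component {
        private final UnaryOperator<String> service;  // stand-in for the RestTemplate call
        private final String nextComponent;           // value for the PartnerComponent header
        private final Consumer<Message> innerChannel; // stand-in for sending to innerChannel

        Component(UnaryOperator<String> service, String nextComponent,
                  Consumer<Message> innerChannel) {
            this.service = service;
            this.nextComponent = nextComponent;
            this.innerChannel = innerChannel;
        }

        void process(Message in) {
            // If the service throws, nothing below runs and nothing is forwarded.
            String enriched = service.apply(in.body());
            Map<String, String> headers = new HashMap<>(in.headers());
            headers.put("PartnerComponent", nextComponent);
            innerChannel.accept(new Message(enriched, headers));
        }
    }

    public static void main(String[] args) {
        List<Message> sent = new ArrayList<>();
        Component one = new Component(body -> body + " [one]", "ComponentTwo", sent::add);
        one.process(new Message("payload", Map.of()));
        System.out.println(sent.get(0).body());    // payload [one]
        System.out.println(sent.get(0).headers()); // {PartnerComponent=ComponentTwo}
    }
}
```

Passing the service and the channel in through the constructor keeps the step easy to replace with a stub in a test, in the same spirit as mocking the REST calls.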

Testing


The testHappyPath test checks the flow when no REST call fails. We mock all REST service calls to succeed, put a message into the input queue, wait for it at the output, and verify from the body of the received message that it passed through all the components.

The testGuaranteedDelivery test checks guaranteed delivery when one of the REST services fails. We emulate a one-time failure in the service called by the third component, wait for the message to arrive in the output queue, and check the body of the received message.

Conclusion


We have built an application with guaranteed delivery. A few small issues remain unresolved: message resending is endless and uncontrolled, and it is unclear who supports this solution. Is it an administrator, or can the handling be automated? We solve these issues with custom-built applications and individual settings for each message type. Perhaps we will describe those solutions in the future.

The complete application is available on GitHub.

Source: https://habr.com/ru/post/315786/

