📜 ⬆️ ⬇️

An example of using the RabbitMQ Delayed Message Exchange in the Java Spring Framework

image In this post I want to show how to use deferred messages in RabbitMQ. As an example of a problem where it is convenient to use a delayed queue, I will take the postback mechanism ( s2s ping back , s2s pixel ).

In a few words about the postback mechanism:


1. Some event occurs
2. Your application must notify this event of a third-party service.
3. If the third-party service was unavailable, then you need to repeat the notification again in a few minutes

For re-notification, I will use a pending queue.

RabbitMQ by default does not know how to delay messages, they are delivered immediately after publication. The deferred delivery functionality is available as a rabbitmq-delayed-message-exchange plugin.
')
Just want to note that the plugin is experimental . Despite the fact that it is generally quite stable, it should be used in production at your own peril and risk.

Build a Docker Container with RMQ and Plugin


I will take the official image with the management plugin as a basis, useful for testing.

Dockerfile:

FROM rabbitmq:3.6-management RUN apt-get update && apt-get install -y curl RUN curl http://www.rabbitmq.com/community-plugins/v3.6.x/rabbitmq_delayed_message_exchange-0.0.1.ez > $RABBITMQ_HOME/plugins/rabbitmq_delayed_message_exchange-0.0.1.ez RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange 

Assembly
 docker build --tag=x25/rmq-delayed-message-exchange . 

Launch
 docker run -d --name rmq -p 5672:5672 -p 15672:15672 x25/rmq-delayed-message-exchange 

Spring AMQP


The Spring Framework fully supports the plugin in the pring-rabbit project. Starting from version 1.6.4, you can use both xml / bean configurations and annotations.

I will use the Spring Boot Amqp Starter.

Dependency for Maven
 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 

Dependency for Gradle
 compile "org.springframework.boot:spring-boot-starter-amqp" 

Configuration through annotations


When using bootstrappers and annotations, Spring takes over the majority of the work. Just write:

 @RabbitListener(bindings = @QueueBinding(value = @Queue(value = DELAY_QUEUE_NAME), exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = DELAY_QUEUE_NAME)) public void onMessage(Message<?> message) { //... } 

And when the application is launched, RabbitAdmin will automatically declare a delayed exchange , queue , create event handlers and bind them to the annotated method.

Need more threads to process messages? This is configured through the external configuration file (the spring.rabbitmq.listener.concurrency property) or through the containerFactory parameter of the annotation:

Example
 // : @Configuration public class RabbitConfiguration { @Bean(name = "containerFactory") @Autowired public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrentConsumers(10); factory.setPrefetchCount(30); return factory; } } // : @RabbitListener(containerFactory = "containerFactory", ...) 

To send a deferred message, use RabbitTemplate:

 rabbitTemplate.send( DELAY_EXCHANGE_NAME, DELAY_QUEUE_NAME, MessageBuilder .withBody(data) .setHeader("x-delay", MESSAGE_DELAY_MS).build() ); 

It will be sent instantly, but will be delivered with a delay specified in the x-delay header. The maximum allowed delay time (2 ^ 32-1) ms.

Ready sample application:

 @SpringBootApplication public class Application { private final Logger log = LoggerFactory.getLogger(Application.class); private final static String DELAY_QUEUE_NAME = "delayed.queue"; private final static String DELAY_EXCHANGE_NAME = "delayed.exchange"; private final static String DELAY_HEADER = "x-delay"; private final static String NUM_ATTEMPT_HEADER = "x-num-attempt"; private final static long RETRY_BACKOFF = 5000; @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(bindings = @QueueBinding(value = @Queue(value = DELAY_QUEUE_NAME), exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = DELAY_QUEUE_NAME)) public void onMessage(Message<byte[]> message) { String endpointUrl = new String(message.getPayload()); Long numAttempt = (Long) message.getHeaders().getOrDefault(NUM_ATTEMPT_HEADER, 1L); log.info("Message received, url={}, attempt={}", endpointUrl, numAttempt); if (!doNotifyEndpoint(endpointUrl) && numAttempt < 3) { rabbitTemplate.send( DELAY_EXCHANGE_NAME, DELAY_QUEUE_NAME, MessageBuilder .withBody(message.getPayload()) .setHeader(DELAY_HEADER, numAttempt * RETRY_BACKOFF) .setHeader(NUM_ATTEMPT_HEADER, numAttempt + 1) .build() ); } } private boolean doNotifyEndpoint(String url) { try { HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection(); /* @todo: set up connection timeouts */ return (connection.getResponseCode() == 200); } catch (Exception e) { log.error(e.getMessage()); return false; } } public static void main(String[] args) { SpringApplication.run(Application.class, args); } } 

application.yml
 spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / listener: prefetch: 10 concurrency: 50 

build.gradle
 buildscript { repositories { mavenCentral() } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:1.4.2.RELEASE") } } apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'idea' apply plugin: 'org.springframework.boot' jar { baseName = 'rmq-delayed-demo' version = '0.1.0' } repositories { mavenCentral() } sourceCompatibility = 1.8 targetCompatibility = 1.8 dependencies { compile("org.springframework.boot:spring-boot-starter-amqp") testCompile("org.springframework.boot:spring-boot-starter-test") } 

We check the delayed delivery through the control panel (rmq-management), it is available on port 15672 :

image

Logs:

 2016-12-21 14:27:25.927: Message received, url=http://localhost:1234, attempt=1 2016-12-21 14:27:25.941: Connection refused (Connection refused) 2016-12-21 14:27:30.946: Message received, url=http://localhost:1234, attempt=2 2016-12-21 14:27:30.951: Connection refused (Connection refused) 2016-12-21 14:27:40.954: Message received, url=http://localhost:1234, attempt=3 

Configuration via XML


When using XML configurations, you just need to set the delayed property to true in the exchange-bean and RabbitAdmin does the rest for you.

Sample xml configuration in conjunction with the Integration Framework
 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int:channel id="to-delayed-rmq" /> <int-amqp:outbound-channel-adapter channel="to-delayed-rmq" amqp-template="rabbitTemplate" exchange-name="delayed.exchange" routing-key="delayed.binding" mapped-request-headers="x-delay" /> <int-amqp:inbound-channel-adapter channel="from-delayed-rmq-queue" queue-names="delayed.queue" message-converter="amqpMessageConverter" connection-factory="rabbitConnectionFactory" concurrent-consumers="10" prefetch-count="50" /> <int:service-activator input-channel="from-delayed-rmq-queue" method="onMessage"> <bean id="postbackServiceActivator" class="PostbackServiceActivator" /> </int:service-activator> <rabbit:queue name="delayed.queue" /> <rabbit:direct-exchange name="delayed.exchange" delayed="true"> <rabbit:bindings> <rabbit:binding queue="delayed.queue" key="delayed.binding" /> </rabbit:bindings> </rabbit:direct-exchange> </beans> 

useful links


Source: https://habr.com/ru/post/318118/


All Articles