
RabbitMQ Spring tutorial

The RabbitMQ website (rabbitmq.com) already has detailed examples and a Java client. However, if a project already uses Spring, it is much more convenient to use the Spring AMQP library. This article implements all six official RabbitMQ tutorial examples with it.

First of all, here is the link to the projects on GitHub.

For the examples I use the simplest possible Spring application. When the user opens a given URL, a message is sent to RabbitMQ, which delivers it to one of the listeners. The listener, in turn, simply writes the message to the log. Habr has already published translations of the official tutorials for PHP and Python, and I assume many readers are familiar with how RabbitMQ works, so I will concentrate on working with Spring AMQP.

Preparation


RabbitMQ installation


RabbitMQ installation is described in detail on the official website. There should be no problems.

Spring Setup


For simplicity I used Spring Boot. It is great for getting a Spring application running quickly without lengthy configuration. Spring AMQP itself, however, I will configure the “classic” way, i.e. the way I configured it in a real project without Spring Boot (except that the ConnectionFactory here omits some Heroku-specific details).

Below is the minimal pom.xml needed to run the examples. It already includes Spring Boot and Spring AMQP.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>rabbitmq</groupId>
    <artifactId>example-1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.2.4.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.4.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
</project>

The main configuration file. Apart from the class name, its contents are the same for all of our examples.

package com.rabbitmq.example1;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Import;

@EnableAutoConfiguration
@ComponentScan
@Import(RabbitConfiguration.class)
public class Example1Configuration {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(Example1Configuration.class, args);
    }
}


Example 1. “Hello World!”




To work with RabbitMQ we need the following beans:
- connectionFactory - to connect to RabbitMQ;
- amqpAdmin (a RabbitAdmin) - to declare and delete queues, exchanges, and so on;
- rabbitTemplate - to send messages (producer);
- myQueue1 - the queue the messages are sent to;
- messageListenerContainer1 - receives the messages (consumer).

The configuration code for these beans:
package com.rabbitmq.example1;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfiguration {
    Logger logger = Logger.getLogger(RabbitConfiguration.class);

    // connection settings for RabbitMQ
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    // declare the queue named queue1
    @Bean
    public Queue myQueue1() {
        return new Queue("queue1");
    }

    // declare the container that holds the message listener
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer1() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueNames("queue1");
        container.setMessageListener(new MessageListener() {
            // called for every message that arrives in queue1
            public void onMessage(Message message) {
                logger.info("received from queue1 : " + new String(message.getBody()));
            }
        });
        return container;
    }
}


In this and the following examples, the producer is a controller that sends messages to RabbitMQ.

package com.rabbitmq.example1;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class SampleController {
    Logger logger = Logger.getLogger(SampleController.class);

    @Autowired
    AmqpTemplate template;

    @RequestMapping("/emit")
    @ResponseBody
    String queue1() {
        logger.info("Emit to queue1");
        template.convertAndSend("queue1", "Message to queue");
        return "Emit to queue";
    }
}

Now, if you run Example1Configuration and open http://localhost:8080/emit in a browser, the console shows something like:

2015-06-23 21:16:26.250 INFO 6460 --- [nio-8080-exec-2] com.rabbitmq.example1.SampleController : Emit to queue1
2015-06-23 21:16:26.252 INFO 6460 --- [cTaskExecutor-1] c.rabbitmq.example1.RabbitConfiguration : received from queue1 : Message to queue


Let us look at the result in more detail. Here we send the message in SampleController.java:

 template.convertAndSend("queue1","Message to queue"); 

And here we get it:

public void onMessage(Message message) {
    logger.info("received from queue1 : " + new String(message.getBody()));
}
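A side note on `new String(message.getBody())`: it decodes the bytes with the platform default charset. The sketch below shows decoding with an explicit charset instead, assuming the body was encoded as UTF-8. `BodyDecoder` is a hypothetical helper written for this illustration; it is not part of Spring AMQP.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Spring AMQP: decodes a raw message body
// with an explicit charset instead of the platform default.
final class BodyDecoder {
    private BodyDecoder() {
    }

    // Assumes the producer encoded the text as UTF-8.
    static String decodeUtf8(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

Inside the listener this would be called as `BodyDecoder.decodeUtf8(message.getBody())`.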

Nothing complicated, but describing all listeners in the configuration is not very convenient, especially when there are a lot of them. It is much more convenient to configure listeners with annotations.

Example 1.1. “Hello World!” with annotations


Instead of declaring the listener in the configuration, add a RabbitMqListener class to the project; it will receive the messages. Accordingly, messageListenerContainer1 is no longer needed.

RabbitMqListener is an ordinary Spring component (@Component) with a method marked with the @RabbitListener annotation. Messages are delivered to this method. The method can accept either the complete Message, with headers and the body as a byte array, or just the converted body in the form in which it was sent.

@RabbitListener(queues = "queue1")
public void processQueue1(String message) {
    logger.info("Received from queue 1: " + message);
}

Source code for RabbitMqListener.java and updated RabbitConfiguration.java
package com.rabbitmq.example1annotated;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@EnableRabbit // needed to activate processing of @RabbitListener annotations
@Component
public class RabbitMqListener {
    Logger logger = Logger.getLogger(RabbitMqListener.class);

    @RabbitListener(queues = "queue1")
    public void processQueue1(String message) {
        logger.info("Received from queue 1: " + message);
    }
}

package com.rabbitmq.example1annotated;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfiguration {
    Logger logger = Logger.getLogger(RabbitConfiguration.class);

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    @Bean
    public Queue myQueue1() {
        return new Queue("queue1");
    }
}


Example 2. Work Queues




In this example two listeners consume from the same queue. Thread.sleep emulates useful work. Importantly, listeners of the same queue can run in different instances of the program, so queue processing can be parallelized across several computers or cloud nodes.

@RabbitListener(queues = "query-example-2")
public void worker1(String message) throws InterruptedException {
    logger.info("worker 1 : " + message);
    Thread.sleep(100 * random.nextInt(20));
}

@RabbitListener(queues = "query-example-2")
public void worker2(String message) throws InterruptedException {
    logger.info("worker 2 : " + message);
    Thread.sleep(100 * random.nextInt(20));
}

Result:
2015-06-23 22:03:48.018 INFO 6784 --- [nio-8080-exec-1] com.rabbitmq.example2.SampleController : Emit to queue
2015-06-23 22:03:48.029 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 2 : Message 1
2015-06-23 22:03:48.029 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 1 : Message 0
2015-06-23 22:03:48.830 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 2 : Message 2
2015-06-23 22:03:49.331 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 2 : Message 3
2015-06-23 22:03:49.432 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 2 : Message 4
2015-06-23 22:03:49.634 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 1 : Message 5
2015-06-23 22:03:49.733 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 2 : Message 6
2015-06-23 22:03:49.735 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 1 : Message 7
2015-06-23 22:03:50.236 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 1 : Message 8
2015-06-23 22:03:50.537 INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener : worker 1 : Message 9
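The log shows that every message is handled by exactly one of the two workers. The following is a minimal in-memory sketch of that competing-consumers behavior using only java.util.concurrent. It does not talk to RabbitMQ, and `WorkQueueSketch` is a hypothetical name introduced for this illustration.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative only: an in-memory stand-in for one queue with competing consumers.
final class WorkQueueSketch {
    private WorkQueueSketch() {
    }

    // Publishes messageCount messages, lets workerCount workers drain the queue,
    // and returns "worker N : Message M" records in the order they were handled.
    static List<String> run(int workerCount, int messageCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messageCount; i++) {
            queue.add("Message " + i);
        }
        List<String> handled = new CopyOnWriteArrayList<>();
        Thread[] workers = new Thread[workerCount];
        for (int w = 0; w < workerCount; w++) {
            final int id = w + 1;
            workers[w] = new Thread(() -> {
                try {
                    String message;
                    // poll() hands each message to exactly one worker
                    while ((message = queue.poll(50, TimeUnit.MILLISECONDS)) != null) {
                        handled.add("worker " + id + " : " + message);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[w].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return handled;
    }
}
```

Calling `WorkQueueSketch.run(2, 10)` returns ten records. Which worker handles which message varies from run to run, just as in the log above.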


RabbitMqListener.java source code and updated SampleController.java
package com.rabbitmq.example2;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Random;

@Component
public class RabbitMqListener {
    Logger logger = Logger.getLogger(RabbitMqListener.class);
    Random random = new Random();

    @RabbitListener(queues = "query-example-2")
    public void worker1(String message) throws InterruptedException {
        logger.info("worker 1 : " + message);
        Thread.sleep(100 * random.nextInt(20));
    }

    @RabbitListener(queues = "query-example-2")
    public void worker2(String message) throws InterruptedException {
        logger.info("worker 2 : " + message);
        Thread.sleep(100 * random.nextInt(20));
    }
}

package com.rabbitmq.example2;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class SampleController {
    Logger logger = Logger.getLogger(SampleController.class);

    @Autowired
    AmqpTemplate template;

    @RequestMapping("/queue")
    @ResponseBody
    String queue1() {
        logger.info("Emit to queue");
        for (int i = 0; i < 10; i++) {
            template.convertAndSend("query-example-2", "Message " + i);
        }
        return "Emit to queue";
    }
}


Example 3. Publish / Subscribe




Here the same message is delivered to two consumers at once.

2015-06-23 22:12:24.669 INFO 1664 --- [nio-8080-exec-1] com.rabbitmq.example3.SampleController : Emit to exchange-example-3
2015-06-23 22:12:24.684 INFO 1664 --- [cTaskExecutor-1] com.rabbitmq.example3.RabbitMqListener : accepted on worker 1 : Fanout message
2015-06-23 22:12:24.684 INFO 1664 --- [cTaskExecutor-1] com.rabbitmq.example3.RabbitMqListener : accepted on worker 2 : Fanout message

To do this, bind both queues to a FanoutExchange:

@Bean
public FanoutExchange fanoutExchangeA() {
    return new FanoutExchange("exchange-example-3");
}

@Bean
public Binding binding1() {
    return BindingBuilder.bind(myQueue1()).to(fanoutExchangeA());
}

@Bean
public Binding binding2() {
    return BindingBuilder.bind(myQueue2()).to(fanoutExchangeA());
}

And we send the message not to a queue but to the exchange exchange-example-3:

template.setExchange("exchange-example-3");
template.convertAndSend("Fanout message");

Specifying the exchange on every send is not necessary. You can set it once when creating the RabbitTemplate.

Full source code

package com.rabbitmq.example3;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@EnableRabbit
@Configuration
public class RabbitConfiguration {
    Logger logger = Logger.getLogger(RabbitConfiguration.class);

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }

    @Bean
    public Queue myQueue1() {
        return new Queue("query-example-3-1");
    }

    @Bean
    public Queue myQueue2() {
        return new Queue("query-example-3-2");
    }

    @Bean
    public FanoutExchange fanoutExchangeA() {
        return new FanoutExchange("exchange-example-3");
    }

    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(myQueue1()).to(fanoutExchangeA());
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(myQueue2()).to(fanoutExchangeA());
    }
}

package com.rabbitmq.example3;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Random;

@Component
public class RabbitMqListener {
    Logger logger = Logger.getLogger(RabbitMqListener.class);
    Random random = new Random();

    @RabbitListener(queues = "query-example-3-1")
    public void worker1(String message) {
        logger.info("accepted on worker 1 : " + message);
    }

    @RabbitListener(queues = "query-example-3-2")
    public void worker2(String message) {
        logger.info("accepted on worker 2 : " + message);
    }
}

package com.rabbitmq.example3;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class SampleController {
    Logger logger = Logger.getLogger(SampleController.class);

    @Autowired
    RabbitTemplate template;

    @RequestMapping("/")
    @ResponseBody
    String home() {
        return "Empty mapping";
    }

    @RequestMapping("/emit")
    @ResponseBody
    String emit() {
        logger.info("Emit to exchange-example-3");
        template.setExchange("exchange-example-3");
        template.convertAndSend("Fanout message");
        return "Emit to exchange-example-3";
    }
}



Example 4. Routing




This example uses a routing key: depending on the key, a message goes to one of the queues or to both at once. To do this, use a DirectExchange instead of a FanoutExchange:

@Bean
public DirectExchange directExchange() {
    return new DirectExchange("exchange-example-4");
}

@Bean
public Binding errorBinding1() {
    return BindingBuilder.bind(myQueue1()).to(directExchange()).with("error");
}

@Bean
public Binding errorBinding2() {
    return BindingBuilder.bind(myQueue2()).to(directExchange()).with("error");
}

@Bean
public Binding infoBinding() {
    return BindingBuilder.bind(myQueue2()).to(directExchange()).with("info");
}

@Bean
public Binding warningBinding() {
    return BindingBuilder.bind(myQueue2()).to(directExchange()).with("warning");
}


When sending, we specify the routing key, for example:

  template.convertAndSend("info", "Info"); 

The result is:

2015-06-23 22:29:24.480 INFO 5820 --- [nio-8080-exec-2] com.rabbitmq.example4.SampleController : Emit as info
2015-06-23 22:29:24.483 INFO 5820 --- [cTaskExecutor-1] com.rabbitmq.example4.RabbitMqListener : accepted on worker 2 : Info
2015-06-23 22:29:29.721 INFO 5820 --- [nio-8080-exec-4] com.rabbitmq.example4.SampleController : Emit as error
2015-06-23 22:29:29.727 INFO 5820 --- [cTaskExecutor-1] com.rabbitmq.example4.RabbitMqListener : accepted on worker 2 : Error
2015-06-23 22:29:29.731 INFO 5820 --- [cTaskExecutor-1] com.rabbitmq.example4.RabbitMqListener : accepted on worker 1 : Error
2015-06-23 22:29:36.779 INFO 5820 --- [nio-8080-exec-5] com.rabbitmq.example4.SampleController : Emit as warning
2015-06-23 22:29:36.781 INFO 5820 --- [cTaskExecutor-1] com.rabbitmq.example4.RabbitMqListener : accepted on worker 2 : Warning
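To see why "error" reaches both workers while "info" and "warning" reach only worker 2, the routing can be modeled as a lookup from routing key to bound queues. The sketch below is a simplified in-memory model, not Spring AMQP code. `DirectRoutingSketch` and its methods are hypothetical names, and the bindings mirror the ones declared in this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative only: mimics how a direct exchange selects queues by exact routing key.
final class DirectRoutingSketch {
    private final Map<String, Set<String>> bindings = new LinkedHashMap<>();

    // Records that the queue receives messages published with this routing key.
    void bind(String queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new LinkedHashSet<>()).add(queue);
    }

    // Returns the queues a message with this routing key would be copied to.
    List<String> route(String routingKey) {
        Set<String> queues = bindings.getOrDefault(routingKey, Collections.<String>emptySet());
        return new ArrayList<>(queues);
    }

    // The same four bindings as in the configuration of this example.
    static DirectRoutingSketch example4() {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("query-example-4-1", "error");
        exchange.bind("query-example-4-2", "error");
        exchange.bind("query-example-4-2", "info");
        exchange.bind("query-example-4-2", "warning");
        return exchange;
    }
}
```

In this model a key with no binding routes to no queue at all, so the message is not delivered to any listener.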


Full source code

package com.rabbitmq.example4;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@EnableRabbit
@Configuration
public class RabbitConfiguration {
    Logger logger = Logger.getLogger(RabbitConfiguration.class);

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setExchange("exchange-example-4");
        return rabbitTemplate;
    }

    @Bean
    public Queue myQueue1() {
        return new Queue("query-example-4-1");
    }

    @Bean
    public Queue myQueue2() {
        return new Queue("query-example-4-2");
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("exchange-example-4");
    }

    @Bean
    public Binding errorBinding1() {
        return BindingBuilder.bind(myQueue1()).to(directExchange()).with("error");
    }

    @Bean
    public Binding errorBinding2() {
        return BindingBuilder.bind(myQueue2()).to(directExchange()).with("error");
    }

    @Bean
    public Binding infoBinding() {
        return BindingBuilder.bind(myQueue2()).to(directExchange()).with("info");
    }

    @Bean
    public Binding warningBinding() {
        return BindingBuilder.bind(myQueue2()).to(directExchange()).with("warning");
    }
}

package com.rabbitmq.example4;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Random;

@Component
public class RabbitMqListener {
    Logger logger = Logger.getLogger(RabbitMqListener.class);
    Random random = new Random();

    @RabbitListener(queues = "query-example-4-1")
    public void worker1(String message) {
        logger.info("accepted on worker 1 : " + message);
    }

    @RabbitListener(queues = "query-example-4-2")
    public void worker2(String message) {
        logger.info("accepted on worker 2 : " + message);
    }
}

package com.rabbitmq.example4;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class SampleController {
    Logger logger = Logger.getLogger(SampleController.class);

    @Autowired
    RabbitTemplate template;

    @RequestMapping("/")
    @ResponseBody
    String home() {
        return "Empty mapping";
    }

    @RequestMapping("/emit/error")
    @ResponseBody
    String error() {
        logger.info("Emit as error");
        template.convertAndSend("error", "Error");
        return "Emit as error";
    }

    @RequestMapping("/emit/info")
    @ResponseBody
    String info() {
        logger.info("Emit as info");
        template.convertAndSend("info", "Info");
        return "Emit as info";
    }

    @RequestMapping("/emit/warning")
    @ResponseBody
    String warning() {
        logger.info("Emit as warning");
        template.convertAndSend("warning", "Warning");
        return "Emit as warning";
    }
}


Example 5. Topics




Here, instead of DirectExchange, we use TopicExchange.

@Bean
public TopicExchange topicExchange() {
    return new TopicExchange("exchange-example-5");
}

@Bean
public Binding binding1() {
    return BindingBuilder.bind(myQueue1()).to(topicExchange()).with("*.orange.*");
}

@Bean
public Binding binding2() {
    return BindingBuilder.bind(myQueue2()).to(topicExchange()).with("*.*.rabbit");
}

@Bean
public Binding binding3() {
    return BindingBuilder.bind(myQueue2()).to(topicExchange()).with("lazy.#");
}

The result is:

2015-06-23 22:42:28.414 INFO 6560 --- [nio-8080-exec-1] com.rabbitmq.example5.SampleController : Emit 'to 1 and 2' to 'quick.orange.rabbit'
2015-06-23 22:42:28.428 INFO 6560 --- [cTaskExecutor-1] com.rabbitmq.example5.RabbitMqListener : accepted on worker 2 : to 1 and 2
2015-06-23 22:42:28.428 INFO 6560 --- [cTaskExecutor-1] com.rabbitmq.example5.RabbitMqListener : accepted on worker 1 : to 1 and 2
2015-06-23 22:42:55.802 INFO 6560 --- [nio-8080-exec-2] com.rabbitmq.example5.SampleController : Emit 'to 2' to 'lazy.black.cat'
2015-06-23 22:42:55.805 INFO 6560 --- [cTaskExecutor-1] com.rabbitmq.example5.RabbitMqListener : accepted on worker 2 : to 2
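In a topic binding pattern, `*` stands for exactly one dot-separated word and `#` for zero or more words. The sketch below is a simplified stand-alone matcher for such patterns, written to show why 'quick.orange.rabbit' reaches both queues while 'lazy.black.cat' reaches only the second. `TopicMatcher` is a hypothetical helper for illustration; the broker performs the real matching and this is not its implementation.

```java
// Illustrative only: a simplified matcher for AMQP topic binding patterns.
final class TopicMatcher {
    private TopicMatcher() {
    }

    // Returns true if the routing key matches the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            // the pattern is exhausted: the key must be exhausted as well
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words: try every possible split
            for (int next = k; next <= key.length; next++) {
                if (matches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false;
        }
        // '*' matches exactly one word; any other word must match literally
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(key[k]);
        return wordMatches && matches(pattern, p + 1, key, k + 1);
    }
}
```

For instance, `TopicMatcher.matches("lazy.#", "lazy.black.cat")` is true, whereas `TopicMatcher.matches("*.orange.*", "lazy.black.cat")` is false because the second word is not "orange".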


Full source code

package com.rabbitmq.example5;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class SampleController {
    Logger logger = Logger.getLogger(SampleController.class);

    @Autowired
    RabbitTemplate template;

    @RequestMapping("/")
    @ResponseBody
    String home() {
        return "Empty mapping";
    }

    @RequestMapping("/emit/{key}/{message}")
    @ResponseBody
    String error(@PathVariable("key") String key, @PathVariable("message") String message) {
        logger.info(String.format("Emit '%s' to '%s'", message, key));
        template.convertAndSend(key, message);
        return String.format("Emit '%s' to '%s'", message, key);
    }
}

package com.rabbitmq.example5;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Random;

@Component
public class RabbitMqListener {
    Logger logger = Logger.getLogger(RabbitMqListener.class);
    Random random = new Random();

    @RabbitListener(queues = "query-example-5-1")
    public void worker1(String message) {
        logger.info("accepted on worker 1 : " + message);
    }

    @RabbitListener(queues = "query-example-5-2")
    public void worker2(String message) {
        logger.info("accepted on worker 2 : " + message);
    }
}



Example 6. Remote procedure call (RPC)




Spring AMQP lets you use convertSendAndReceive to receive a response to a sent message. With the default settings, if the RabbitMQ version is older than 3.4.0, a temporary queue is created for the reply message. This approach is not very efficient, so it is not recommended; instead, create a queue for reply messages yourself and set it as the reply queue of the RabbitTemplate. With RabbitMQ 3.4.0 and later, the Direct reply-to mechanism is used, which is much faster. Read more in the Spring AMQP documentation.

Thus, you can make a remote procedure call in just one line:

  String response = (String) template.convertSendAndReceive("query-example-6",message); 

And this is how the call is handled on the consumer:

@RabbitListener(queues = "query-example-6")
public String worker1(String message) throws InterruptedException {
    logger.info("Received on worker : " + message);
    Thread.sleep(3000); // emulate useful work
    return "Received on worker : " + message;
}

The result is:

2015-06-23 23:12:36.677 INFO 6536 --- [nio-8080-exec-5] com.rabbitmq.example6.SampleController : Emit 'Hello world'
2015-06-23 23:12:36.679 INFO 6536 --- [cTaskExecutor-1] com.rabbitmq.example6.RabbitMqListener : Received on worker : Hello world
2015-06-23 23:12:39.681 INFO 6536 --- [nio-8080-exec-5] com.rabbitmq.example6.SampleController : Received on producer 'Received on worker : Hello world'
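The three-second gap between the second and third log lines is the producer blocking until the reply arrives. The sketch below models that request/reply pattern in memory with a correlation id and a reply timeout. It is not Spring AMQP code; `RpcSketch` and its methods are hypothetical names. Returning null on timeout is a design choice of this sketch, so check the Spring AMQP documentation for how convertSendAndReceive reports a timeout in your version.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative only: request/reply over an in-memory queue, matched by correlation id.
final class RpcSketch {
    private final BlockingQueue<String[]> requests = new LinkedBlockingQueue<>();
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Starts a daemon "worker" thread that answers every request after workMillis.
    RpcSketch(long workMillis) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String[] request = requests.take(); // {correlationId, body}
                    Thread.sleep(workMillis);           // emulate useful work
                    CompletableFuture<String> reply = pending.remove(request[0]);
                    if (reply != null) {
                        reply.complete("Received on worker : " + request[1]);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Sends a request and blocks until the matching reply arrives.
    // Returns null if no reply arrives within timeoutMillis.
    String call(String message, long timeoutMillis) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        requests.add(new String[] {correlationId, message});
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            pending.remove(correlationId); // a late reply is simply dropped
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            return null;
        }
    }
}
```

The correlation id is what lets several concurrent callers share one request queue and still receive their own replies.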


Full source code

package com.rabbitmq.example6;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@EnableRabbit
@Configuration
public class RabbitConfiguration {
    Logger logger = Logger.getLogger(RabbitConfiguration.class);

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setQueue("query-example-6");
        rabbitTemplate.setReplyTimeout(60 * 1000);
        // no reply queue is set: we use direct reply-to
        return rabbitTemplate;
    }

    @Bean
    public Queue myQueue() {
        return new Queue("query-example-6");
    }
}

package com.rabbitmq.example6;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Random;

@Component
public class RabbitMqListener {
    Logger logger = Logger.getLogger(RabbitMqListener.class);

    @RabbitListener(queues = "query-example-6")
    public String worker1(String message) throws InterruptedException {
        logger.info("Received on worker : " + message);
        Thread.sleep(3000);
        return "Received on worker : " + message;
    }
}

package com.rabbitmq.example6;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class SampleController {
    Logger logger = Logger.getLogger(SampleController.class);

    @Autowired
    RabbitTemplate template;

    @RequestMapping("/")
    @ResponseBody
    String home() {
        return "Empty mapping";
    }

    @RequestMapping("/process/{message}")
    @ResponseBody
    String error(@PathVariable("message") String message) {
        logger.info(String.format("Emit '%s'", message));
        String response = (String) template.convertSendAndReceive("query-example-6", message);
        logger.info(String.format("Received on producer '%s'", response));
        return String.valueOf("returned from worker : " + response);
    }
}


Conclusion


I used RabbitMQ in a project of mine hosted in the Heroku cloud. Using RabbitMQ on Heroku is quite simple: connect one of the RabbitMQ providers in the administration console, and the address for accessing the broker appears in the environment variables. This address must be used when creating the connectionFactory.

@Bean
public ConnectionFactory connectionFactory() {
    // take the AMQP address from the environment variable
    String uri = System.getenv("CLOUDAMQP_URL");
    if (uri == null) {
        // fall back to a local RabbitMQ when the variable is not set
        uri = "amqp://guest:guest@localhost";
    }
    URI url;
    try {
        url = new URI(uri);
    } catch (URISyntaxException e) {
        // fail fast: continuing with a null URI would only cause a NullPointerException below
        throw new IllegalStateException("Invalid AMQP URL", e);
    }
    String[] userInfo = url.getUserInfo().split(":");
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost(url.getHost());
    connectionFactory.setUsername(userInfo[0]);
    connectionFactory.setPassword(userInfo[1]);
    // StringUtils.isNotBlank comes from Apache Commons Lang
    if (StringUtils.isNotBlank(url.getPath())) {
        connectionFactory.setVirtualHost(url.getPath().replace("/", ""));
    }
    connectionFactory.setConnectionTimeout(3000);
    connectionFactory.setRequestedHeartBeat(30);
    return connectionFactory;
}
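The URL handling can be tried in isolation with nothing but java.net.URI. The sketch below splits an AMQP URL into host, port, credentials, and virtual host. `AmqpUrlSketch` is a hypothetical class for illustration, and the fallbacks it applies (guest/guest, port 5672, virtual host "/") are assumptions based on the usual RabbitMQ defaults rather than anything the Heroku add-on guarantees.

```java
import java.net.URI;

// Illustrative only: extracts connection settings from an AMQP URL of the form
// amqp://user:password@host[:port][/vhost].
final class AmqpUrlSketch {
    final String host;
    final int port;
    final String username;
    final String password;
    final String virtualHost;

    private AmqpUrlSketch(String host, int port, String username, String password, String virtualHost) {
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        this.virtualHost = virtualHost;
    }

    // Throws IllegalArgumentException (from URI.create) if the URL is malformed.
    static AmqpUrlSketch parse(String url) {
        URI uri = URI.create(url);
        String username = "guest"; // assumed default when the URL carries no credentials
        String password = "guest";
        String userInfo = uri.getUserInfo();
        if (userInfo != null) {
            // limit 2 keeps any ':' inside the password intact
            String[] parts = userInfo.split(":", 2);
            username = parts[0];
            if (parts.length > 1) {
                password = parts[1];
            }
        }
        int port = uri.getPort() == -1 ? 5672 : uri.getPort();
        String path = uri.getPath();
        // an empty path or a bare "/" means the default virtual host
        String virtualHost = (path == null || path.length() <= 1) ? "/" : path.substring(1);
        return new AmqpUrlSketch(uri.getHost(), port, username, password, virtualHost);
    }
}
```

The parsed fields map directly onto the setHost, setUsername, setPassword, and setVirtualHost calls used when building the connection factory.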

The rest of the code differs little from that shown in Example 4 (Routing).

Sources


Spring AMQP project page
Spring Boot project page
RabbitMQ examples page

Source: https://habr.com/ru/post/262069/

