Hi, Habr! I present to your attention a translation of the article "Surprisingly simple messaging with Spring Cloud Stream" by Richard Seroter.
There are many options for communication between microservices. You can use service discovery (for example, Spring Cloud Discovery Server/Client with a Netflix Eureka implementation) and make direct calls. Or you can share work results through a common database. But message brokers continue to be a popular choice.
They range from simple engines like Amazon SQS or RabbitMQ to event-stream processors like Azure Event Hubs or Apache Kafka, and even service buses like Microsoft BizTalk Server. Whichever engine developers choose, they need deep knowledge to use it effectively. So how can you improve developer productivity? For Java developers, Spring Cloud Stream offers a valuable abstraction.
Spring Cloud Stream offers an interface for developers that hides the nuances of the underlying broker. That broker, whether Apache Kafka or RabbitMQ, is configured by Spring Cloud Stream itself. Communication to and from the broker also goes through the Stream library.
What excites me is that all brokers are treated the same way. Spring Cloud Stream normalizes behavior, even when it is not native to the broker. For example, do you want a competing-consumer model for your subscribers, or partitioned processing? Those concepts behave differently in RabbitMQ and Kafka. No problem: Spring Cloud Stream makes both work transparently. Let's actually try out both of these scenarios.
Competing consumers through "consumer groups"
By default, Spring Cloud Stream establishes a publish/subscribe relationship between publisher and subscribers. This makes it easy to share data among different subscribers. But what if you want multiple instances of one subscriber (to scale out processing)? One solution is "consumer groups", but brokers implement them differently. This is where Spring Cloud Stream gets to work! Let's build a sample application using RabbitMQ.
Before writing the code, we need a running RabbitMQ server. The simplest option is the Docker container for RabbitMQ. If you have Docker installed, you only need to execute the following command:
docker run -d --hostname local-rabbit --name demo-rmq -p 15672:15672 -p 5672:5672 rabbitmq:3.6.11-management

Instead of rabbitmq:3.6.11-management, you can use the latest version from hub.docker.com/_/rabbitmq. After launch, Docker cached the image locally and started a container with the ports mapped, which makes RabbitMQ available on my working machine.
How do we send messages to RabbitMQ? Spring Cloud Stream supports several patterns: we can publish on a schedule or on demand. Here, let's create a web application that publishes a message whenever a user sends a POST request to a REST endpoint.
Message Publishing Application (Producer)
First, create a Spring Boot application that uses spring-cloud-starter-stream-rabbit (and spring-boot-starter-web). That is all I need to use Spring Cloud Stream with RabbitMQ as my message destination.

Add a new class that implements a REST controller. The simple @EnableBinding annotation configures the application as a Spring Cloud Stream project. Here I use the supplied "Source" interface, which defines a single output channel, but you can also create your own. It's just two lines of code:
@EnableBinding(Source.class)
@RestController
public class BriefController {
In this controller class, add an @Autowired field that refers to the bean that Spring Cloud Stream creates from the Source interface. (That's it! Spring Cloud Stream turns the channel defined in the interface into a bean.) Then we can use this variable to publish messages directly to the bound channel; a complete sketch of the controller follows below. The code is the same whether RabbitMQ or Kafka sits underneath. Universal and simple.
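For reference, the Source interface that ships with Spring Cloud Stream declares that single output channel, roughly like this:

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

// Simplified view of Spring Cloud Stream's built-in Source interface
public interface Source {

  String OUTPUT = "output";

  @Output(Source.OUTPUT)
  MessageChannel output();
}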
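Putting it together, here is a minimal sketch of the finished controller, assuming a /briefs endpoint and a plain String payload (the field, method, and parameter names are illustrative, not from the original):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.*;

@EnableBinding(Source.class)
@RestController
public class BriefController {

  // bean created by Spring Cloud Stream from the Source interface
  @Autowired
  private Source source;

  // take in a message via HTTP POST and publish it to the broker
  @RequestMapping(path = "/briefs", method = RequestMethod.POST)
  public String publishBrief(@RequestBody String brief) {
    source.output().send(MessageBuilder.withPayload(brief).build());
    return "SUCCESS";
  }
}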
The message publishing application is almost complete; all that is left is some basic configuration. This configuration tells Spring Cloud Stream how to connect to the broker. Note that we do not have to tell Spring Cloud Stream to use RabbitMQ; that happens automatically because the RabbitMQ starter classes are on the classpath. All we need is the connection information for our broker, an explicit destination name for the messages, and an instruction to send JSON.
server.port=8080

#rabbitmq settings for Spring Cloud Stream to use
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.cloud.stream.bindings.output.destination=legalbriefs
spring.cloud.stream.default.contentType=application/json
Message Consuming Application (Consumer)
This part is almost too simple. Create a new Spring Boot application and select only the spring-cloud-starter-stream-rabbit dependency.
Decorate the class with the @EnableBinding annotation and use the built-in Sink interface. Then all that is left is to create a method that processes any messages found in the broker. We mark that method with the @StreamListener annotation, and all the content-type handling is done for us.
@EnableBinding(Sink.class)
@SpringBootApplication
public class BlogStreamSubscriberDemoApplication {

  public static void main(String[] args) {
    SpringApplication.run(BlogStreamSubscriberDemoApplication.class, args);
  }

  @StreamListener(target=Sink.INPUT)
  public void logfast(String msg) {
    System.out.println(msg);
  }
}
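For reference, the built-in Sink interface mirrors Source with a single input channel, roughly like this:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

// Simplified view of Spring Cloud Stream's built-in Sink interface
public interface Sink {

  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();
}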
The configuration for this application is simple. As before, we have the RabbitMQ connection data. Note that the binding now refers to "input", which is the name of the channel in the standard Sink interface. Also note that I use the same destination as the source did, so that Spring Cloud Stream wires the publisher and the subscriber together. To show the power of Spring Cloud Stream, we will add the consumer group settings a bit later.
server.port=0

#rabbitmq settings for Spring Cloud Stream to use
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.cloud.stream.bindings.input.destination=legalbriefs
Testing the solution
Let's see how this works. First, start three instances of the subscriber application. I built a jar file and launched three instances from the shell, as sketched below.
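Something like this, assuming the jar is named subscriber.jar (run each in its own shell; server.port=0 above lets every instance pick a random port, so they can run side by side):

java -jar subscriber.jar
java -jar subscriber.jar
java -jar subscriber.jar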

When these applications start, Spring Cloud Stream goes to work. Log into the RabbitMQ web-based admin console and note that an exchange has been created. It is called "legalbriefs", matching the destination name we specified in the configuration file above.

We also have three queues, one mapped to each of the three application instances we started.

Finally, start the producer and send a message via an HTTP POST to the /briefs endpoint.
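For example, with the producer listening on port 8080, a request like this would do (the JSON body here is made up for illustration):

curl -X POST http://localhost:8080/briefs -H "Content-Type: application/json" -d '{"attorney": "some-name", "text": "sample brief"}'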

What happens? As expected, each subscriber receives its own copy of the message, because by default everything works in simple publish/subscribe mode.

Add consumer group configuration
We don't want every instance of the subscriber to receive its own copy of the message. Rather, we want these instances to share the messages among themselves: each message should be processed by only one of them. In the subscriber application, we add a single line to the configuration file. It tells Spring Cloud Stream that all the instances form a single group of subscribers that work together.
#adds consumer group processing
spring.cloud.stream.bindings.input.group=briefProcessingGroup
After rebuilding the subscriber's jar file and starting each instance, we see a different setup in RabbitMQ: a single named queue, but with three "consumers" of that queue.

Send two different messages and make sure that each is processed by only one instance of the subscriber. This is an easy way to use a message broker to scale processing.

Performing stateful processing using partitioning
Partitioning looks like a related, but different, scenario from consumer groups. Partitions in Kafka introduce a level of parallel processing by writing data to different partitions. Each subscriber then pulls messages from a given partition to do its work. Here, Spring Cloud Stream's partitioning is useful for parallel processing, but also for stateful processing. When setting it up, you specify a characteristic that routes messages to a given partition; a single application instance then processes all the data in that partition. This can be handy for event handling or any scenario where it is useful for related messages to be processed by the same instance. Think: counters, complex event processing, or time-sensitive calculations.
Unlike consumer groups, partitioning requires configuration changes on both the publisher and the subscriber side. On the publisher (producer) side, all you need to specify is (a) the number of partitions and (b) an expression that describes how data is partitioned. No code changes.
#adding configuration for partition processing
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.attorney
spring.cloud.stream.bindings.output.producer.partitionCount=3
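For the expression payload.attorney to resolve, the published payload needs an attorney property. A minimal sketch of such a payload class (the LegalBrief name and the second field are illustrative assumptions, not from the original):

// Hypothetical payload; "payload.attorney" in the partitionKeyExpression
// resolves against the getAttorney() accessor below.
public class LegalBrief {
  private String attorney;
  private String text;

  public String getAttorney() { return attorney; }
  public void setAttorney(String attorney) { this.attorney = attorney; }
  public String getText() { return text; }
  public void setText(String text) { this.text = text; }
}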
On the consumer side, you set the number of instances and set the "partitioned" property to "true". What is also interesting, but logical, is that as each subscriber starts, you need to give it an "index" so that Spring Cloud Stream knows which partition it should read messages from.
#add partition processing
spring.cloud.stream.bindings.input.consumer.partitioned=true
#spring.cloud.stream.instanceIndex=0
spring.cloud.stream.instanceCount=3
Let's start everything up again. The producer starts the same way as before. Each subscriber instance, however, starts with a "spring.cloud.stream.instanceIndex=X" flag that specifies which index applies to it, as sketched below.
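The launch commands might look something like this (the jar name is an assumption):

java -jar subscriber.jar --spring.cloud.stream.instanceIndex=0
java -jar subscriber.jar --spring.cloud.stream.instanceIndex=1
java -jar subscriber.jar --spring.cloud.stream.instanceIndex=2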

As you can see, the setup in RabbitMQ differs from before. Now we have three queues, each with a different "routing key" that corresponds to its partition.

Send a message and note that all messages with the same attorney name go to the same instance. Change the name and verify that those messages still all go to a single place. Switch the name again and note that a different partition (probably) picks them up. If you have more distinct key values than partitions, some partitions will handle more than one set of data. No problem, just be aware that this happens.

Summary
You do not have to deal directly with message brokers. Of course, there are many scenarios where you want to use a broker's advanced options, but there are also many cases where you just want a reliable intermediary. In those cases, Spring Cloud Stream makes it easy to abstract away knowledge of the broker while also normalizing the behavior of dissimilar broker implementations.
In my latest Pluralsight course, I spent more than an hour digging into Spring Cloud Stream and another 90 minutes working with Spring Cloud Data Flow. That project will help you quickly chain streaming applications together. Check it out for a deeper dive!