📜 ⬆️ ⬇️

RabbitMQ - delayed messages

image

On Habré there is a series of translations of the official manual for RabbitMQ ( 1 , 2 , 3 , 4 , 5 ). Unfortunately, the official guide does not deal with the organization of deferred messages, but I consider this question to be very important. Therefore, I decided to write such an article myself.

Code examples will be on Pearl, but there will be no Pearl-specific details in the code, so examples can be relatively easily adapted to any other language.

Formulation of the problem


Sometimes you need to perform any task not "right-right-second-second," but after a while.
')
For example, we have a script that occasionally accesses an API, and, if the answer has not changed, it “spills” for a while, then “wakes up” and checks again.

Or, for example, we saved a temporary file and we need to start a timer to delete the file after a specified time.

In such cases, we need a mechanism to create a delayed message in RabbitMQ (unless, of course, we want to do this using RabbitMQ).

Unfortunately, in RabbitMQ itself there is no ready mechanism for publishing pending messages. Messages posted by RabbitMQ senders are delivered instantly to recipients. Of course, the recipient may not be connected to RabbitMQ, in which case the message will be delivered after connection, but if the recipient is connected, the message will be delivered immediately.

You can’t just post a message and say to him: “Lie down so far unnoticed in the little corner, and after 10 minutes, get out and be delivered to the recipient.”

Therefore, the problem arises - how, with the help of RabbitMQ, to organize deferred messages?

Decision


To do this, you have to make a workaround. The key idea is that if a message sent to a queue is immediately delivered to the recipient who is listening to this queue, then you need to send this message to another queue!

In general, the scheme of work will be as follows:

image

  1. We create an exchanger to which deferred messages will be sent.
  2. Create a queue to store pending messages.
  3. We make a binding between the queue and the exchanger
  4. We set up the queue so that the messages, having lain in it for some specified time, are sent to the usual exchanger for immediate delivery to the recipient

Recipient


Consider the consumer_dlx.pl script:

#!/usr/bin/perl use strict; use warnings; use Net::AMQP::RabbitMQ; my $mq = Net::AMQP::RabbitMQ->new(); my $user = 'guest'; my $password = 'guest'; my $exchange = 'myexchange'; my $queue = 'myqueue'; my $routing_key = 'mykey'; #  $mq->connect("localhost", {user => $user, password => $password}); #  $mq->channel_open(1); #  $mq->exchange_declare(1, $exchange, {exchange_type => 'direct'}); #  $mq->queue_declare(1, $queue); #  $mq->queue_bind(1, $queue, $exchange, $routing_key); #  $mq->consume(1, $queue); #   -- $mq->queue_declare(1, $queue.'2'); $mq->queue_bind(1, $queue.'2', $exchange, $routing_key.'2'); $mq->consume(1, $queue.'2'); #   ( ) while ( my $msg = $mq->recv() ) { print "$msg->{body} ($msg->{routing_key})\n"; } 

I will not focus on each line of this script, since there is nothing new for a person who has read the above-mentioned articles from the manual. This is quite a normal recipient of messages, there is not even any specifics associated with the topic in question - deferred messages. The recipient is needed only for demonstration, all the salt will be in the sender.

I will note only one thing:

Please note that the recipient creates and listens to two different queues, intertwined with the exchanger with two different routing_key. In principle, one queue is enough, but with two it will be clearer, plus it will help to further demonstrate one useful feature.

Sender


Now consider the producer_dlx.pl script:

 #!/usr/bin/perl use strict; use warnings; use Net::AMQP::RabbitMQ; my $mq = Net::AMQP::RabbitMQ->new(); my $user = 'guest'; my $password = 'guest'; my $exchange = 'myexchange'; my $exchange_dlx = 'myexchange.dlx'; my $queue_dlx = 'myqueue.dlx'; my $message = $ARGV[0] || 'mymessage'; my $routing_key = $ARGV[1] || 'mykey'; my $expiration = $ARGV[2] || 0; #  $mq->connect("localhost", {user => $user, password => $password}); #  $mq->channel_open(1); #  $mq->exchange_declare(1, $exchange, {exchange_type => 'direct'}); #  dlx $mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'fanout'}); #  dlx $mq->queue_declare(1, $queue_dlx, {}, {'x-dead-letter-exchange' => $exchange}); #  $mq->queue_bind(1, $queue_dlx, $exchange_dlx, $routing_key); #   $mq->publish(1, $routing_key , $message, {exchange => $exchange_dlx}, {expiration => $expiration}); 

Let's sort separate sections of the code.

 #  $mq->exchange_declare(1, $exchange, {exchange_type => 'direct'}); 

This is the same exchanger that is used in the recipient. Our sender does not send messages directly to this exchanger, but you still need to create an exchanger, since it will still be used, albeit indirectly.

 #  dlx $mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'fanout'}); 

This is an exchanger to which we will send pending messages.

Pay attention to the type of exchanger being created - 'fanout', in contrast to the first exchanger having the type 'direct'. Next, I will explain why it is 'fanout'.

 #  dlx $mq->queue_declare(1, $queue_dlx, {}, {'x-dead-letter-exchange' => $exchange}); 

Here we create a queue in which the pending messages will be placed.

The 'x-dead-letter-exchange' argument is the nail that holds the entire mechanism for deferred messages. If this argument is specified for the queue, then messages that have expired are automatically transferred from this queue to the exchanger specified in this argument.

Accordingly, as the exchanger you need to specify the usual exchanger, with which the recipient works.

Just in case, a mark for those that are not familiar with Pearl: the construction of {} in the third parameter means that in this place you need to pass a reference to the hash with options, but since in this particular case no options are required, a link to empty hash.

 #   $mq->publish(1, $routing_key , $message, {exchange => $exchange_dlx}, {expiration => $expiration}); 

Send a message to the exchanger for deferred messages.

The 'expiration' parameter is important here. This parameter specifies the message storage time in milliseconds. After this time, the message will be removed from the queue. But, as mentioned above, if the 'x-dead-letter-exchange' argument is specified for the queue, then simultaneously with the removal from the queue, the message will be sent to the exchanger specified in the argument, which in turn will send the message to the usual queue associated with it for immediate delivery.

Thin moment with routing_key


As you remember, in the recipient we created one exchanger of the type 'direct' and two queues interlaced with it for different keys. Such a scheme can be used to send messages on one topic to two different recipients, for example, sending a log to a file or to the console, depending on the situation. For which queue to send a message, the routing_key key is responsible.

Now imagine that two messages with two different keys need to be set aside. We will send them to the exchanger for deferred messages, the exchanger will have to decide which queue to send them to. But they have different routing_key, so the exchanger, if it were of the type 'direct', could not send them to the same queue.

That is why the exchanger for deferred messages is done like 'fanout' - this type of exchanger ignores routing_key and sends messages to all the queues that are intertwined with it. In our case, only one queue is interlaced - the queue for pending messages. Accordingly, all messages with any routing_key sent to the exchanger for pending messages will go to this queue.

The attentive reader in this place should ask: “And with what routing_key will the messages be sent to the usual exchanger after the expiration of their retention in the message queue?”

They will be sent with the same routing_key that they had. The value of routing_key does not change if you do not specifically do anything for this (but if you wish, you can configure the queue to change the routing_key).

Launch


First you need to run consumer_dlx.pl, then you can run producer_dlx.pl with different parameters.

Parameters: [message] [key mykey or mykey2] [delay in milliseconds].

image

It is not visible on the static picture, but after launching producer_dlx.pl with an indication of the delay, this very delay occurs, and then consumer_dlx.pl displays a message (the key is displayed in brackets).

WARNING


As the user Tsyganov_Ivan correctly suggested to me here, there is a problem with messages that have different expired. The fact is that messages “exit” from the queue in a strictly sequential manner (that’s why it’s a queue). Because of this, a situation is possible when a message with a large expired, going ahead, will “block” the output from the queue to messages that have a small expired, even if this small expired has already expired.

Therefore, if all of a sudden you have to specify different 'expired' for different queues, then instead of one common deferred queue, make several individual deferred queues - each has its own deferred queue.

A more universal solution for arbitrary values ​​of 'expired' is described in the second part of the article .

Source: https://habr.com/ru/post/235505/


All Articles