📜 ⬆️ ⬇️

RabbitMQ - Deferred messages, part 2

image In the previous article about deferred messages, I considered the option of organizing deferred messages for the simple case in which all deferred messages have the same delay time. However, right there in the comments I was told that this version of the organization of deferred messages will create problems when trying to use it for messages with different delay times.

In this regard, I want to give a more universal (but slightly more complicated) solution that allows you to organize deferred messages with an arbitrary delay time.

What is the problem?


The queues in RabbitMQ are arranged in such a way that no message can leave the queue before the previous messages leave it. Actually, then she and all - you can not climb forward. Even if the message has the “expiration” parameter set, and the life of this message has expired, it still does not give the right to leave the queue while there are other messages in front of it. The outdated message will hang in the queue until its turn comes (pun, aha).

At that moment, when his turn finally comes, one of two things will happen:
')
  1. If the queue does not have the “x-dead-letter-exchange” parameter set, the message will simply be deleted, without being delivered to anyone
  2. If the “x-dead-letter-exchange” parameter is set, the message will be transferred to the specified exchanger

In this connection, in the variant considered in the previous article the following happened:

If all messages had the same "expiration", then they all lined up one after the other in a queue and waited until all previous messages were quit. At the first message the storage time expired, this message left the queue and was delivered to the specified exchanger, freeing the exit from the queue to all other messages. The remaining messages had the same expiration value, therefore they were always obsolete at least at the same time as the previous one, or later, respectively, by the time of their obsolescence they were already the first in the queue and no one had delayed them.

Another thing is if the message in the queue with the value of the parameter “expiration” is larger than that of other messages. In this case, the message with a large “expiration” reached the exit from the queue and “stuck” there until the expiration of its lifetime. And behind him began to accumulate messages with a small «expiration», which could not get out of the queue, even if outdated. Then the message with a large “expiration” left the queue and immediately behind it all the accumulated messages, in which the “expiration” was outdated, fell out.

In short, when sending messages with delays of 30, 20, 10, the order of output was just that, and not the expected 10, 20, 30.

How to beat it


In the same article, in the previous article, I proposed a simple solution: to create not one common delayed queue, but several - for each task, its own delayed queue. It is assumed that for one task one delay will be enough. But, if you need the ability to set different delays even within the same task - let's make a universal way to create pending messages.

The main idea is this - if messages with different delays interfere with each other in the same queue, then we will create our own personal queue for each delay!

image

Key points:

  1. Create queues for deferred messages, which differ in delay - their own queue for each delay
  2. We bind the usual queue with the exchanger using the “name_lead_free. *” Key so that it gets messages regardless of the delay

Recipient


Consider the consumer_dlx.pl script:

#!/usr/bin/perl use strict; use warnings; use Net::AMQP::RabbitMQ; my $mq = Net::AMQP::RabbitMQ->new(); my $user = 'guest'; my $password = 'guest'; #  $mq->connect("localhost", {user => $user, password => $password}); #  $mq->channel_open(1); #      --- for my $i (1..2) { my $exchange = 'myexchange' . $i; #  $mq->exchange_declare(1, $exchange, {exchange_type => 'topic'}); for my $j (1..2) { my $queue = 'myqueue' . $j; #  my $queue_full = "$exchange.$queue"; $mq->queue_declare(1, $queue_full, {auto_delete => 0}); #  my $routing_key = $queue . '.*'; $mq->queue_bind(1, $queue_full, $exchange, $routing_key); #  $mq->consume(1, $queue_full); } } #   ( ) while ( my $msg = $mq->recv() ) { print "$msg->{body} ($msg->{routing_key})\n"; } 

As in the previous article, there are no features in the recipient's script that are directly related to pending queues.

Pay attention to only one important point - the exchanger is created here with the type of “topic”. This is due to the fact that the routing_key, which we will use, will contain two parameters - the name of the queue and the time of the desired delivery.

Moreover, in the recipient itself, the delivery time is not taken into account, as indicated by the following line:

 my $routing_key = $queue . '.*'; 

As you can see, the second parameter is set as '*' , which is why all messages will be delivered to the normal queue, without taking into account time (and this is what is required for a normal queue).

Sender


Now consider the producer_dlx.pl script:

 #!/usr/bin/perl use strict; use warnings; use Net::AMQP::RabbitMQ; my $mq = Net::AMQP::RabbitMQ->new(); my $user = 'guest'; my $password = 'guest'; my $message = $ARGV[0] || 'mymessage'; my $exchange = $ARGV[1] || 'myexchange1'; my $queue = $ARGV[2] || 'myqueue1'; my $delay = $ARGV[3] || 0; #  $mq->connect("localhost", {user => $user, password => $password}); #  $mq->channel_open(1); #  $mq->exchange_declare(1, $exchange, {exchange_type => 'topic'}); #  dlx my $exchange_dlx = $exchange . '.dlx'; $mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'topic'}); #  dlx my $endtime = time() + $delay; my $queue_full = "$exchange.$queue.$endtime"; $mq->queue_declare(1, $queue_full, {}, {'x-message-ttl' => $delay * 1000, 'x-dead-letter-exchange' => $exchange, 'x-expires' => $delay * 1000 + 10000}); #  my $routing_key = "$queue.$endtime"; $mq->queue_bind(1, $queue_full, $exchange_dlx, $routing_key); #   $mq->publish(1, $routing_key, $message, {exchange => $exchange_dlx}); 

Here the following points are important:

 #  dlx my $exchange_dlx = $exchange . '.dlx'; $mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'topic'}); 

The exchanger for temporary messages as well as the usual exchanger must be of the type 'topic'.

 #  dlx my $endtime = time() + $delay; my $queue_full = "$exchange.$queue.$endtime"; $mq->queue_declare(1, $queue_full, {}, {'x-message-ttl' => $delay * 1000, 'x-dead-letter-exchange' => $exchange, 'x-expires' => $delay * 1000 + 10000}); 

We calculate the desired delivery time for the $endtime by adding the specified delay to the current time (using the values ​​in seconds, unixtime).

Then we create a personal queue for the specified delay and enter the delivery time directly in the queue name. Entering the time in the queue name is not a technical necessity, you can give any name to the queue, but to ensure clarity and avoid confusion, it is most convenient to do so.

When creating a queue, you need to pass three arguments:

  1. 'x-message-ttl' is the lifetime of messages. As you remember, we have a separate queue for each delay, so we can set a delay for all messages in the queue at once, instead of specifying the same 'expiration' value for each individual message.
  2. 'x-dead-letter-exchange' is the name of the exchanger where the messages from this queue will be placed.
  3. 'x-expires' is the lifetime of the queue itself. Since we are creating a new pending queue for every delay, these queues will constantly accumulate. So that they do not interfere with how much in vain, we will give them a time to live, after which they will be automatically removed. Important! Queue life time should be longer than the message lifetime. If you set the queue lifetime to be equal to the message lifetime, then the delivery of messages is not guaranteed - the queue may be crashed before the messages are delivered from it. In this example, the queue lifetime is set 10 seconds longer than the message lifetime.

 #  my $routing_key = "$queue.$endtime"; $mq->queue_bind(1, $queue_full, $exchange_dlx, $routing_key); 

The key routing_key is set in the form of "name_learn_degree.the desired_time_delivery." The dot in the middle divides the key into two parameters; these parameters are parsed by the type “topic” exchanger.

As you can see, both parameters are included in the binding here. Accordingly, the exchanger for temporary messages will send messages with this key to one specific queue with a specific delivery time.

Launch


First you need to run consumer_dlx.pl, then you can run producer_dlx.pl with different parameters.

Parameters: [message] [exchanger] [queue] [delay in seconds].

image

As you can see, first messages were sent with a longer delay, but the messages were delivered in the correct order - first, those with less delay.

Source: https://habr.com/ru/post/235983/


All Articles