#!/usr/bin/perl
# Consumer: declares two topic exchanges, creates two queues on each of them,
# subscribes to every queue and prints each message it receives.
use strict;
use warnings;

use Net::AMQP::RabbitMQ;

my $mq = Net::AMQP::RabbitMQ->new();

my $user     = 'guest';
my $password = 'guest';

# Connect to the broker on localhost.
$mq->connect("localhost", {user => $user, password => $password});

# All operations below use channel 1.
$mq->channel_open(1);

# Declare the exchanges and, for each of them, its queues.
for my $i (1..2) {
    my $exchange = 'myexchange' . $i;

    # Topic exchange, so that queues can be bound with wildcard patterns.
    $mq->exchange_declare(1, $exchange, {exchange_type => 'topic'});

    for my $j (1..2) {
        my $queue = 'myqueue' . $j;

        # The queue name is prefixed with the exchange name, giving one
        # distinct queue per (exchange, queue) pair.
        my $queue_full = "$exchange.$queue";
        $mq->queue_declare(1, $queue_full, {auto_delete => 0});

        # Bind with "<queue>.*": the publisher script uses routing keys of
        # the form "<queue>.<endtime>", and '*' accepts any single word in
        # the second position, so the time part is ignored here.
        my $routing_key = $queue . '.*';
        $mq->queue_bind(1, $queue_full, $exchange, $routing_key);

        # Subscribe to the queue.
        $mq->consume(1, $queue_full);
    }
}

# Receive loop: print the body and routing key of every message.
while ( my $msg = $mq->recv() ) {
    print "$msg->{body} ($msg->{routing_key})\n";
}
my $routing_key = $queue . '.*';
Note that the last part of this routing key is the wildcard '*'
# Article text preceding this script: "..., which is why all messages will be
# delivered to the normal queue, without taking into account time (and this is
# what is required for a normal queue)."

#!/usr/bin/perl
# Publisher: sends one message that reaches its normal queue after a delay.
#
# Usage: <script> [message [exchange [queue [delay_seconds]]]]
#
# The message is published to the "<exchange>.dlx" exchange, where it waits in
# a pending queue created for its delivery time. When the message TTL runs
# out, the queue's 'x-dead-letter-exchange' argument sends it on to <exchange>.
use strict;
use warnings;

use Net::AMQP::RabbitMQ;

my $mq = Net::AMQP::RabbitMQ->new();

my $user     = 'guest';
my $password = 'guest';

# A message of "0" is legitimate, so test for presence instead of truth.
my $message  = ( defined $ARGV[0] && length $ARGV[0] ) ? $ARGV[0] : 'mymessage';
my $exchange = $ARGV[1] || 'myexchange1';
my $queue    = $ARGV[2] || 'myqueue1';
my $delay    = $ARGV[3] || 0;

# The delay ends up in the routing key as "<queue>.<endtime>". A fractional
# value would add another dot-separated word to that key, and a negative one
# would produce a negative TTL, so only whole seconds are accepted.
die "delay must be a non-negative whole number of seconds, got '$delay'\n"
    unless $delay =~ /\A[0-9]+\z/;

# Connect to the broker on localhost.
$mq->connect("localhost", {user => $user, password => $password});

# All operations below use channel 1.
$mq->channel_open(1);

# Normal exchange: expired messages are dead-lettered here.
$mq->exchange_declare(1, $exchange, {exchange_type => 'topic'});

# dlx exchange: delayed messages are published here first.
my $exchange_dlx = $exchange . '.dlx';
$mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'topic'});

# Pending queue, one per delivery time (unix time in seconds).
# NOTE(review): two runs whose time() + delay coincide but whose delays differ
# would declare the same queue name with different arguments -- presumably the
# broker rejects that; confirm.
my $endtime    = time() + $delay;
my $queue_full = "$exchange.$queue.$endtime";
$mq->queue_declare(1, $queue_full, {}, {
    # Message lifetime; $delay is in seconds, hence the factor of 1000.
    'x-message-ttl'          => $delay * 1000,
    # Where messages go once they expire.
    'x-dead-letter-exchange' => $exchange,
    # Queue lifetime: 10 seconds longer than the message lifetime.
    'x-expires'              => $delay * 1000 + 10000,
});

# Bind the pending queue to the dlx exchange under its own routing key.
my $routing_key = "$queue.$endtime";
$mq->queue_bind(1, $queue_full, $exchange_dlx, $routing_key);

# Publish the message into the pending queue via the dlx exchange.
$mq->publish(1, $routing_key, $message, {exchange => $exchange_dlx});

# Close the connection explicitly instead of relying on object destruction.
$mq->disconnect();
# dlx exchange: named after the normal exchange with a '.dlx' suffix.
my $exchange_dlx = $exchange . '.dlx';
$mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'topic'});
# dlx pending queue: its name carries the delivery time (unix time, seconds).
my $endtime    = time() + $delay;
my $queue_full = "$exchange.$queue.$endtime";
$mq->queue_declare(1, $queue_full, {}, {
    'x-message-ttl'          => $delay * 1000,
    'x-dead-letter-exchange' => $exchange,
    'x-expires'              => $delay * 1000 + 10000,
});
$endtime
is computed by adding the specified delay to the current time (both values are in seconds, Unix time).
'x-message-ttl'
is the lifetime of messages. As you remember, we have a separate queue for each delay, so we can set the delay once for all messages in the queue instead of specifying the same 'expiration'
value on each individual message.
'x-dead-letter-exchange'
is the name of the exchange to which messages from this queue are moved once they expire.
'x-expires'
is the lifetime of the queue itself. Since we create a new pending queue for every delay, these queues would keep accumulating. So that they do not pile up needlessly, we give them a time to live, after which they are removed automatically. Important! The queue lifetime must be longer than the message lifetime. If the queue lifetime is equal to the message lifetime, delivery of messages is not guaranteed: the queue may be deleted before its messages have been moved out of it. In this example, the queue lifetime is set 10 seconds longer than the message lifetime.
# bind the pending queue to the dlx exchange
my $routing_key = "$queue.$endtime";
$mq->queue_bind(1, $queue_full, $exchange_dlx, $routing_key);
Source: https://habr.com/ru/post/235983/
All Articles