📜 ⬆️ ⬇️

RabbitMQ tutorial 4 - Routing

I continue the series of translation lessons from the official site . Examples will be on php, but they can be implemented on most popular PL .
In the previous article, we developed a logging system. We were able to send messages to multiple recipients. In this article we are updating our program - we will send the recipient only a part of the messages. For example, we can save only messages with critical errors on a disk (saving disk space), and in the console we will display all messages.

Bindings


In the previous article, we created bindings. Recall the code:
$channel->queue_bind($queue_name, 'logs'); 

Binding is the connection between the access point and the queue. This can be interpreted as: the queue wants to receive messages from the access point.
Binding can take the parameter routing_key. In order not to be confused with the $ channel :: basic_publish parameter (it also contains the routing_key parameter), let's call it binding_key. Consider creating a binding with the binding_key key:

 $binding_key = 'black'; $channel->queue_bind($queue_name, $exchange_name, $binding_key); 


The value of this key depends on the type of access point. A fanout access point will simply ignore it.
')

Direct Access Point


Our logging system in the previous article sent all messages to all subscribers. We want to expand our system to filter messages by importance. For example, we will make the script that writes logs to disk not spend its space on messages with warning or info.
Previously, we used an access point with a fanout type, which does not give us complete flexibility - it is only suitable for simple translation.
Instead, we will use the direct type. Its algorithm is very simple - messages go to the queue whose binding_key matches the routing key of the message.
Consider the diagram in the picture:

The diagram shows an access point X and two associated queues. The first stage is associated with binding key = orange, and the second stage has two connections. One with the key binding key = black, and the second with the key green.
Messages with routing key = orange will be sent to the Q1 queue, and messages with the black or green key will be sent to the Q2 queue. All other messages will be deleted.

Multiple bindings



It is perfectly acceptable to bind multiple queues with the same binding key. In this example, we associate an access point X and a queue Q1 with the same black key as the queue Q2. In this example, direct behaves in the same way as fanout: sends messages to all associated queues. Messages with the black key will fall into both queues Q1 and Q2.

Sending logs


Let's build an algorithm for sending messages. Instead of fanout for the access point we will use the direct type. Routing key will match the name of the log type. Assume that the log sending script will know the type of log.
First, create an access point:
 $channel->exchange_declare('direct_logs', 'direct', false, false, false); 

Now we will send the message:
 $channel->exchange_declare('direct_logs', 'direct', false, false, false); $channel->basic_publish($msg, 'direct_logs', $severity); 

The log will have 3 types: 'info', 'warning', 'error'.

Subscription


Sending messages will be the same as in the example of the previous article , with one condition - you need to create your own binding for each type of log.
 foreach($severities as $severity) { $channel->queue_bind($queue_name, 'direct_logs', $severity); } 


Total we get



The script code of the producer emit_log_direct.php:
 <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('direct_logs', 'direct', false, false, false); $severity = $argv[1]; if(empty($severity)) $severity = "info"; $data = implode(' ', array_slice($argv, 2)); if(empty($data)) $data = "Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'direct_logs', $severity); echo " [x] Sent ",$severity,':',$data," \n"; $channel->close(); $connection->close(); ?> 


The script code of the subscriber's receive_logs_direct.php:
 <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPConnection; $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('direct_logs', 'direct', false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $severities = array_slice($argv, 1); if(empty($severities )) { file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n"); exit(1); } foreach($severities as $severity) { $channel->queue_bind($queue_name, 'direct_logs', $severity); } echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; $callback = function($msg){ echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?> 


If you want to save to the file only logs with the error and warning types, type in the console:

$ php receive_logs_direct.php warning error > logs_from_rabbit.log

If you want to display all the logs on the screen, type in the console

 $ php receive_logs_direct.php info warning error [*] Waiting for logs. To exit press CTRL+C 


Or to pull out only the error logs:
 $ php emit_log_direct.php error "Run. Run. Or it will explode." [x] Sent 'error':'Run. Run. Or it will explode.' 

(source (emit_log_direct.php source) and (receive_logs_direct.php source) )

In the following article, the interception of messages corresponding to a template will be considered.

Source: https://habr.com/ru/post/201096/


All Articles