I want to continue the series of translated lessons from the official site. The examples are in PHP, but they can be implemented in most popular programming languages.
Publish / Subscribe
In the previous article we built a working message queue, on the assumption that each message is delivered to exactly one worker. In this article we complicate the task and deliver a message to several subscribers. This pattern is known as "publish/subscribe".
To understand this pattern, we will build a simple logging system. It consists of two programs: the first emits log messages, the second receives and prints them.
In our logging system, every running copy of the subscriber program receives every message. This lets us start one subscriber that saves the logs to disk and, at any later time, start another that displays the logs on the screen.
Essentially, every message will be broadcast to every subscriber.
Exchanges
In the previous articles we sent messages to a queue and received them from it. Now let us look at Rabbit's full messaging model.
Recall the terms from the previous articles:
- Producer - a program that sends messages;
- Queue - a buffer that stores messages;
- Consumer (subscriber) - a program that receives messages.
The core idea of the Rabbit messaging model is that the producer never sends messages directly to a queue. In fact, quite often the producer does not even know whether its message will reach any queue at all.
Instead, the producer sends messages to an exchange. An exchange is a very simple thing. It performs two functions:
- receives messages from the producer;
- pushes these messages to queues.
The exchange must know exactly what to do with each incoming message: send it to one specific queue, send it to several queues, or discard it. These rules are defined by the exchange type.

There are several exchange types: direct, topic, headers and fanout. We will focus on the last one, fanout. Let us create an exchange of this type and call it logs:
$channel->exchange_declare('logs', 'fanout', false, false, false);
The fanout type is very simple: it copies every message it receives to all the queues it knows about. This is exactly what we need for our logging system.
Listing exchanges
To list all the exchanges on the server, run the rabbitmqctl command:
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
 direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
logs fanout
...done.
The list contains several exchanges named amq.* and the unnamed default exchange (the first row, with an empty name), which is not suitable for our task.
Exchange names
In the previous articles we knew nothing about exchanges, yet we could still send messages to queues. This was possible because we were using the default exchange, which is identified by the empty string ''.
Recall how we published messages before:
$channel->basic_publish($msg, '', 'hello');
This call uses the default (unnamed) exchange: the message is routed to the queue whose name equals the routing_key, if such a queue exists. The routing_key is passed as the third argument of basic_publish.
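The routing rule of the default (unnamed) exchange can be pictured with a toy in-memory model. The sketch below is plain PHP and does not talk to RabbitMQ at all; the class ToyDefaultExchange and its methods are hypothetical names made up for illustration, and a real broker does considerably more.

```php
<?php
// Toy in-memory model, NOT the php-amqplib API: every name here is hypothetical.
final class ToyDefaultExchange
{
    /** @var array<string, string[]> queue name => stored message bodies */
    private array $queues = [];

    // Every declared queue is implicitly reachable under its own name.
    public function declareQueue(string $name): void
    {
        if (!isset($this->queues[$name])) {
            $this->queues[$name] = [];
        }
    }

    // Deliver to the queue whose name equals the routing key;
    // return false (message dropped) when no such queue exists.
    public function publish(string $body, string $routingKey): bool
    {
        if (!isset($this->queues[$routingKey])) {
            return false;
        }
        $this->queues[$routingKey][] = $body;
        return true;
    }

    // Return the bodies currently stored in the given queue.
    public function messages(string $name): array
    {
        return $this->queues[$name] ?? [];
    }
}

$exchange = new ToyDefaultExchange();
$exchange->declareQueue('hello');
$exchange->declareQueue('task_queue');

$delivered = $exchange->publish('Hello World!', 'hello');
$dropped = $exchange->publish('nobody listens', 'missing');
```

Here $delivered is true because a queue named 'hello' exists, while $dropped is false because no queue is called 'missing'.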
Now we can publish to our named exchange instead:
$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');
Temporary queues
Until now we have used named queues ("hello" and "task_queue"). Naming a queue matters when workers must be pointed at one specific queue, and when a queue is shared between producers and subscribers.
Our logging system is different: each subscriber needs to receive all messages, not just a subset of them. We are also interested only in current messages, not old ones. For this we need two things:
- Every time we connect to Rabbit we need a fresh, empty queue. We can create a queue with a random name ourselves or let the server choose a random name for us;
- Every time a subscriber disconnects from Rabbit, its queue should be deleted.
In the php-amqplib client, declaring a queue with an empty name creates a temporary queue with an automatically generated name:
list($queue_name, ,) = $channel->queue_declare("");
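The method returns the automatically generated queue name, which ends up in $queue_name. It may look like this: 'amq.gen-JzTY20BRgKO-HjmUJj0wLg'.
When the connection that declared the queue is closed, the queue is deleted automatically, provided it was declared as exclusive (the fourth argument of queue_declare, as in receive_logs.php).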
Bindings

So, we have a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. The relationship between an exchange and a queue is called a binding.
$channel->queue_bind($queue_name, 'logs');
From now on, the logs exchange will deliver messages to our queue.
You can list the existing bindings with the rabbitmqctl list_bindings command.
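To see how a fanout exchange, queues and bindings fit together, here is a toy in-memory model in plain PHP. It is only a sketch: ToyFanoutExchange and its methods are hypothetical names, not part of php-amqplib, and the model ignores everything a real broker does beyond copying messages to bound queues.

```php
<?php
// Toy in-memory model of a fanout exchange; class and method names are
// hypothetical and are not part of php-amqplib or RabbitMQ.
final class ToyFanoutExchange
{
    /** @var array<string, string[]> bound queue name => message bodies */
    private array $boundQueues = [];

    // A binding tells the exchange that this queue wants its messages.
    public function bind(string $queueName): void
    {
        if (!isset($this->boundQueues[$queueName])) {
            $this->boundQueues[$queueName] = [];
        }
    }

    // Copy the message into every bound queue and return the number of copies.
    public function publish(string $body): int
    {
        foreach (array_keys($this->boundQueues) as $queueName) {
            $this->boundQueues[$queueName][] = $body;
        }
        return count($this->boundQueues);
    }

    // Return the bodies currently stored in the given queue.
    public function messages(string $queueName): array
    {
        return $this->boundQueues[$queueName] ?? [];
    }
}

$logs = new ToyFanoutExchange();

// Nothing is bound yet, so this message reaches no queue at all.
$lost = $logs->publish('info: nobody is listening yet');

$logs->bind('disk_writer');
$logs->bind('screen_printer');
$copies = $logs->publish('info: Hello World!');
```

The first publish returns 0 because nothing is bound yet, so the message is simply dropped; the second returns 2, one copy per bound queue.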
Sending to all queues

The producer program, which emits the log messages, has hardly changed since the previous article. The only important difference is that we now publish to our named exchange 'logs' instead of the default exchange. Previously we had to specify a queue name (the routing key) when publishing, but for a fanout exchange this is not necessary.
Here is the code of emit_log.php:
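<?php
require_once __DIR__ . '/vendor/autoload.php';

// In recent php-amqplib releases this class is named AMQPStreamConnection.
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Declare the fanout exchange: passive=false, durable=false, auto_delete=false.
$channel->exchange_declare('logs', 'fanout', false, false, false);

// Use the command-line arguments as the message, with a default fallback.
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "info: Hello World!";
}
$msg = new AMQPMessage($data);

// Publish to the 'logs' exchange; no routing key is needed for fanout.
$channel->basic_publish($msg, 'logs');

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();
?>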
(emit_log.php source)
As you can see, after establishing the connection we declare the exchange. This step is necessary because publishing to a non-existent exchange is forbidden.
If no queue is bound to the exchange yet, the message will be lost. That is fine for us: as long as the exchange has no subscribers, all messages can be safely discarded.
The subscriber code, receive_logs.php:
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPConnection;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

// Empty name: the server generates one. The fourth argument (exclusive = true)
// makes the queue disappear when this connection closes.
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// Ask the 'logs' exchange to copy its messages into our queue.
$channel->queue_bind($queue_name, 'logs');

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){
    echo ' [x] ', $msg->body, "\n";
};

// The fourth argument (no_ack = true) means no manual acknowledgements.
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
?>
(receive_logs.php source)
To save the logs to a file, open a console and run:
$ php receive_logs.php > logs_from_rabbit.log
To see the logs on the screen, open another terminal and run:
$ php receive_logs.php
And, of course, to emit logs, run the producer:
$ php emit_log.php
Using the rabbitmqctl list_bindings command, we can verify that the code really created the queues and bound them to the exchange. With two receive_logs.php programs running, you should see something like this:
$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
...done.
The output shows that messages from the logs exchange go to two queues with automatically generated names. This is exactly what we wanted.
The next article will describe how to listen to only a subset of the messages.