
Continuing from the first lesson on the basics of RabbitMQ, here is a translation of the second lesson from the official site. As before, all examples are in Python, but they can be implemented in most popular programming languages.
Task queues
In the first lesson we wrote two programs: one sent messages, the second received them. In this lesson, we will create a queue that will be used to distribute resource-intensive tasks among multiple subscribers.
The main idea behind such a queue is to avoid running a resource-intensive task immediately and waiting for it to finish. Instead, the task is scheduled for later. Each message corresponds to one task. A handler program running in the background picks the task up and eventually executes it. When you run several handlers, the tasks are shared between them.
This approach is especially useful in web applications, where a resource-intensive task cannot be processed within the lifetime of a single HTTP request.
Preparation
In the previous lesson we sent a message with the text “Hello World!”. Now we will send messages that stand for resource-intensive tasks. We will not perform real work, such as resizing an image or rendering a PDF file; instead we will fake it with the time.sleep() function. The complexity of a task is determined by the number of dots in the message text: each dot takes one second to “execute”. For example, a task with the message “Hello...” runs for 3 seconds.
We will slightly modify the send.py program from the previous example so that arbitrary messages can be sent from the command line. This program will put messages into our queue, scheduling new tasks. Let's call it new_task.py:
import sys
# channel is set up as in send.py from the previous lesson
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='', routing_key='hello', body=message)
print(" [x] Sent %r" % (message,))
The receive.py program from the previous example also needs a change: it has to simulate useful work, one second for each dot in the message text. The program takes a message from the queue and performs the task. Let's call it worker.py:
import time
def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))
    # one second of simulated work per dot; body arrives as bytes under Python 3
    time.sleep(body.count(b'.'))
    print(" [x] Done")
Round-robin distribution
One of the advantages of a task queue is that the work can be done in parallel by several programs. If we cannot keep up with the incoming tasks, we can simply increase the number of handlers.
To begin, let's launch two worker.py programs at once. Both will receive messages from the queue, but how exactly? Let's see.
Open three terminal windows. Run worker.py in two of them; these will be our two subscribers, C1 and C2.
shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
In the third window, we will publish new tasks. After subscribers are started, you can send any number of messages:
shell3$ python new_task.py First message.
shell3$ python new_task.py Second message..
shell3$ python new_task.py Third message...
shell3$ python new_task.py Fourth message....
shell3$ python new_task.py Fifth message.....
Let's see what has been delivered to subscribers:
shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'
By default, RabbitMQ sends each new message to the next subscriber in sequence. On average, every subscriber therefore receives the same number of messages. This way of distributing messages is called round-robin. Try the same with three or more subscribers.
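The distribution seen in the two terminals can be reproduced with a small simulation. This is only a sketch of the round-robin idea, not RabbitMQ's actual dispatch code; the function name round_robin and the labels C1 and C2 are chosen here for illustration.

```python
from itertools import cycle


def round_robin(messages, consumers):
    """Assign each message to the next consumer in turn, wrapping around."""
    assignment = {name: [] for name in consumers}
    for message, name in zip(messages, cycle(consumers)):
        assignment[name].append(message)
    return assignment


messages = [
    "First message.",
    "Second message..",
    "Third message...",
    "Fourth message....",
    "Fifth message.....",
]
result = round_robin(messages, ["C1", "C2"])
print(result["C1"])  # the odd-numbered messages
print(result["C2"])  # the even-numbered messages
```

Passing three names instead of two shows how the same five messages would be spread over three subscribers.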
Message acknowledgment
Our tasks take several seconds to complete. You may have wondered what happens if a handler starts a task and then stops unexpectedly, having completed it only partially. In the current implementation, a message is deleted as soon as RabbitMQ delivers it to a subscriber. So if you stop a handler while it is working, the task is never finished and the message is lost. Messages that were delivered to that handler but not yet processed are lost as well.
But we do not want to lose any tasks. If one handler exits abnormally, its message should be passed to another handler.
To make sure no messages are lost, RabbitMQ supports message acknowledgments. The subscriber sends an acknowledgment to tell RabbitMQ that the received message has been processed and can be deleted.
If a subscriber stops working without sending an acknowledgment, RabbitMQ understands that the message was not processed and redelivers it to another subscriber. This way no message is lost, even if a handler stops unexpectedly.
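In the RabbitMQ version this lesson was written for, there is no processing timeout: RabbitMQ redelivers a message only when the connection to the original subscriber is closed, so a task may take as long as it needs. Newer RabbitMQ releases do enforce a configurable acknowledgment timeout on consumers, so check the documentation for the version you run.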
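The acknowledgment and redelivery rules can be illustrated with a toy in-memory model. This is a sketch for intuition only: the class ToyQueue and its methods are invented for this example and are not part of RabbitMQ or pika.

```python
from collections import deque


class ToyQueue:
    """In-memory stand-in for a broker queue with manual acknowledgments."""

    def __init__(self, messages):
        self.ready = deque(messages)  # messages waiting to be delivered
        self.unacked = {}             # delivery_tag -> (consumer, message)
        self._next_tag = 1

    def deliver(self, consumer):
        """Hand the next ready message to a consumer and track it as unacked."""
        message = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = (consumer, message)
        return tag, message

    def ack(self, tag):
        """The consumer finished the task, so the message can be deleted."""
        del self.unacked[tag]

    def connection_closed(self, consumer):
        """Requeue everything the vanished consumer had not acknowledged."""
        lost = [tag for tag, (owner, _) in self.unacked.items() if owner == consumer]
        # Requeue newest first so the original order is restored at the front.
        for tag in sorted(lost, reverse=True):
            _, message = self.unacked.pop(tag)
            self.ready.appendleft(message)


queue = ToyQueue(["task A", "task B"])
tag_a, _ = queue.deliver("C1")
queue.ack(tag_a)               # C1 completes task A
queue.deliver("C1")            # C1 takes task B ...
queue.connection_closed("C1")  # ... and dies before acknowledging it
tag_b, redelivered = queue.deliver("C2")
print(redelivered)             # task B reaches C2 instead of being lost
```

Task A was acknowledged and is gone for good; task B was still unacknowledged when C1 disappeared, so it went back to the queue and was delivered to C2.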
Message acknowledgments are manual by default. In the previous example we explicitly switched to automatic acknowledgment by specifying no_ack=True (this flag is called auto_ack in pika 1.0 and later). Now we will remove the flag and send the acknowledgment from the handler as soon as the task is completed.
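def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    # acknowledge only after the work is done
    ch.basic_ack(delivery_tag=method.delivery_tag)
# pika < 1.0 signature; pika 1.x uses basic_consume(queue='hello', on_message_callback=callback)
channel.basic_consume(callback, queue='hello')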
Now nothing is lost even if you stop a handler with Ctrl+C while it is processing a message. After the handler stops, RabbitMQ redelivers its unacknowledged messages.
Do not forget to acknowledge messages
Developers sometimes forget to add basic_ack to the code. The consequences of this small mistake can be serious. Messages are redelivered only when the handler program stops, and in the meantime RabbitMQ consumes more and more memory, because it cannot delete unacknowledged messages.
To debug this kind of error, use rabbitmqctl to print the messages_unacknowledged field (the number of unacknowledged messages):
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0    0
...done.
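If you check this often, the listing can be filtered with a few lines of Python. The helper below is a hypothetical sketch: it assumes the three-column output shown above, with whitespace-separated fields and queue names that contain no spaces, and the task_queue row in the sample is made-up data.

```python
def queues_with_unacked(listing):
    """Return {queue_name: unacked_count} for queues holding unacked messages."""
    stuck = {}
    for line in listing.splitlines():
        fields = line.split()
        # Data rows have three fields: name, messages_ready, messages_unacknowledged.
        if len(fields) != 3 or not (fields[1].isdigit() and fields[2].isdigit()):
            continue  # skip banner lines such as "Listing queues ..." and "...done."
        name, _ready, unacked = fields
        if int(unacked) > 0:
            stuck[name] = int(unacked)
    return stuck


# Illustrative output only; the task_queue numbers are invented.
sample = """Listing queues ...
hello\t0\t0
task_queue\t3\t7
...done.
"""
print(queues_with_unacked(sample))
```

A count that keeps growing while the handlers are idle is the typical sign of a missing basic_ack.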
[or use the more convenient monitoring script that I gave in the first part]
Message persistence
We have learned how to avoid losing a task when a subscriber suddenly stops working. But tasks will still be lost if the RabbitMQ server itself stops.
By default, when the RabbitMQ server stops or crashes, all queues and messages are lost, but this behavior can be changed. For messages to remain in the queue after a server restart, both the queue and the messages must be made durable.
First, let's make sure the queue itself is not lost. To do that, declare it as durable:
channel.queue_declare(queue='hello', durable=True)
Although this command is correct in itself, it will not work right now, because the hello queue has already been declared as non-durable. RabbitMQ does not allow an existing queue to be redeclared with different parameters and returns an error if you try. There is a simple workaround: declare a queue with a different name, for example task_queue:
channel.queue_declare(queue='task_queue', durable=True)
This change must be made in both the publisher and the subscriber code.
Now we can be sure that the task_queue queue will not be lost when the RabbitMQ server restarts. Next, the messages have to be marked as persistent. To do this, pass the delivery_mode property with a value of 2:
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make the message persistent
                      ))
Note on message persistence
Marking a message as persistent does not fully guarantee that it will not be lost. Although it tells RabbitMQ to save the message to disk, there is a short window in which RabbitMQ has accepted the message but has not yet written it. In addition, RabbitMQ does not call fsync(2) for every message, so a message may sit in the cache without yet being written to disk. The persistence guarantee is not absolute, but it is more than enough for our task queue. If you need stronger guarantees, you can wrap the publishing code in a transaction.
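Wrapping the publish call in a transaction could look roughly like the sketch below. It relies on the tx_select, tx_commit and tx_rollback methods of a pika blocking channel; the helper name publish_in_transaction is ours, and RecordingChannel is a hypothetical stand-in that only records calls so the example runs without a broker.

```python
def publish_in_transaction(channel, body, properties=None):
    """Publish one task and commit it; roll back if publishing raises.

    Assumes channel.tx_select() has already put the channel into
    transactional mode.
    """
    try:
        channel.basic_publish(exchange='', routing_key='task_queue',
                              body=body, properties=properties)
        channel.tx_commit()
    except Exception:
        channel.tx_rollback()
        raise


class RecordingChannel:
    """Hypothetical stand-in for a pika channel; it only records calls."""

    def __init__(self):
        self.calls = []

    def tx_select(self):
        self.calls.append("tx_select")

    def basic_publish(self, exchange, routing_key, body, properties=None):
        self.calls.append(("basic_publish", routing_key, body))

    def tx_commit(self):
        self.calls.append("tx_commit")

    def tx_rollback(self):
        self.calls.append("tx_rollback")


channel = RecordingChannel()
channel.tx_select()  # once per channel, before the first publish
publish_in_transaction(channel, "Hello...")
print(channel.calls)
```

With a real channel you would also pass properties=pika.BasicProperties(delivery_mode=2), as in the publishing code above.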
Uniform distribution of messages
You may have noticed that the distribution of messages still does not work the way we need. For example, with two subscribers, if all odd messages carry heavy tasks [ones that take a long time to run] and all even messages carry light ones, the first handler will be constantly busy while the second will be idle most of the time. RabbitMQ knows nothing about this and keeps sending messages to the subscribers in turn.
This happens because RabbitMQ dispatches a message as soon as it enters the queue and does not take into account how many unacknowledged messages each subscriber holds. It simply sends every n-th message to the n-th subscriber.

To change this behavior, we can use the basic_qos method with the prefetch_count=1 option. This tells RabbitMQ not to give a subscriber more than one message at a time. In other words, a subscriber does not receive a new message until it has processed and acknowledged the previous one; RabbitMQ hands the message to the first subscriber that becomes free.
channel.basic_qos(prefetch_count=1)
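The effect of prefetch_count=1 can be estimated with a small simulation. This is a simplified model, not RabbitMQ's scheduler: it assumes all tasks are already queued at time zero and ignores network delays, and the durations are invented for illustration.

```python
import heapq


def round_robin_finish(durations, workers):
    """Total time when task i always goes to worker i % workers."""
    busy = [0] * workers
    for i, seconds in enumerate(durations):
        busy[i % workers] += seconds
    return max(busy)


def fair_dispatch_finish(durations, workers):
    """Total time when each task goes to the first worker that becomes free."""
    free_at = [0] * workers  # min-heap of the moments at which workers free up
    for seconds in durations:
        start = heapq.heappop(free_at)
        heapq.heappush(free_at, start + seconds)
    return max(free_at)


durations = [5, 1, 5, 1, 5, 1]  # odd tasks are heavy, even tasks are light
print(round_robin_finish(durations, 2))    # 15: one worker gets every heavy task
print(fair_dispatch_finish(durations, 2))  # 11: heavy tasks are shared
```

With round-robin, one handler gets all three heavy tasks and needs 15 seconds while the other is done after 3; handing each task to whichever handler is free finishes the same batch in 11 seconds.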
Remark on queue size
If all subscribers are busy, the queue size may increase. You should pay attention to this and, possibly, increase the number of subscribers.
Putting it all together
The full code of new_task.py:
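The listing itself is not reproduced here, so what follows is a sketch assembled from the fragments above rather than the tutorial's exact file. The connection setup (a broker on localhost reached through pika.BlockingConnection) is an assumption carried over from the first lesson, and the helper build_message and the main wrapper are ours.

```python
import sys


def build_message(args):
    """Join command-line words into one message, with the lesson's default."""
    return ' '.join(args) or "Hello World!"


def main(args=None):
    # Imported here so build_message() can be tried without pika installed.
    import pika

    if args is None:
        args = sys.argv[1:]

    # Assumption: the broker runs on localhost, as in the first lesson.
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Durable queue: it survives a broker restart.
    channel.queue_declare(queue='task_queue', durable=True)

    message = build_message(args)
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(delivery_mode=2),  # persistent message
    )
    print(" [x] Sent %r" % (message,))
    connection.close()


# Dry run of the message-building step; call main() to publish to a real broker.
print(build_message(["Third", "message..."]))
```

To use it as a command-line program, call main() at the end of the file with a RabbitMQ server running.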
The full code of worker.py:
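As with new_task.py, the listing is not reproduced here; the sketch below is assembled from the fragments in this lesson. The connection setup is an assumption carried over from the first lesson, the basic_consume call uses the pika 1.x signature, and the sleep parameter plus the FakeChannel and FakeMethod stubs are ours, added so the callback can be tried without a broker.

```python
import time


def callback(ch, method, properties, body, sleep=time.sleep):
    """Simulate one second of work per dot, then acknowledge the message."""
    print(" [x] Received %r" % (body,))
    sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


def main():
    # Imported here so callback() can be exercised without pika installed.
    import pika

    # Assumption: the broker runs on localhost, as in the first lesson.
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='task_queue', durable=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')

    # At most one unacknowledged message per handler at a time.
    channel.basic_qos(prefetch_count=1)
    # pika 1.x signature; releases before 1.0 used
    # basic_consume(callback, queue='task_queue').
    channel.basic_consume(queue='task_queue', on_message_callback=callback)
    channel.start_consuming()


class FakeChannel:
    """Hypothetical stub that records acknowledged delivery tags."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


class FakeMethod:
    """Hypothetical stub carrying only a delivery tag."""
    delivery_tag = 1


# Dry run without a broker: record the requested sleep instead of waiting.
slept = []
fake = FakeChannel()
callback(fake, FakeMethod(), None, b"Hello...", sleep=slept.append)
```

In the dry run the callback asks for 3 seconds of work and acknowledges delivery tag 1; call main() with a RabbitMQ server running to process real tasks.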
With message acknowledgments and prefetch_count you can build a task queue. The durability settings let tasks survive a RabbitMQ server restart.
In the third lesson we will look at how to send one message to several subscribers.