At MegaFon we regularly run into the same kinds of tasks when working with RabbitMQ, so a natural question arises: how can such tasks be simplified and automated?
The first solution that comes to mind is the HTTP interface: out of the box, RabbitMQ has a good web interface and an HTTP API. However, the HTTP API is not always convenient, and sometimes it is not available at all (say, you lack the access rights but really need to publish a message). At such times you have to work over the AMQP protocol.
I could not find a ready-made solution that suited me, so I decided to write a small application that works with RabbitMQ over AMQP, takes its startup parameters from the command line, and provides the minimum required set of features, namely publishing messages, reading messages, and creating and editing the basic route elements.
Python was chosen as the simplest (and, in my opinion, the most beautiful) tool for the job. (You can argue with that, but what would it change?)
Habr hosts translations of the official RabbitMQ guides (one, two); still, a simple example from practice is sometimes useful. In this article I will use a small application as an example to highlight the main issues that arise when working with the "rabbit" over AMQP from Python. The application itself is available on GitHub.
AMQP is one of the most common protocols today for messaging between components of a distributed system. Its main distinctive feature is the way a message route is constructed from two structural elements: a queue and an exchange point. The queue accumulates messages until they are received. The exchange point is a message distributor that sends messages either to the desired queue or to another exchange point. The distribution rules (bindings), by which the exchange point decides exactly where to send a message, are based on checking the routing key of the message against a given mask. More details about how the AMQP protocol works can be found here.
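The routing idea can be illustrated with a toy in-memory model. This is only a sketch for intuition, not how RabbitMQ is implemented: the names `ToyExchange`, `bind`, and `publish` are invented for the example, and the bindings here use exact key matching only.

```python
from collections import deque


class ToyExchange:
    """Toy exchange point: forwards messages to bound targets by routing key."""

    def __init__(self):
        self.bindings = []  # list of (routing_key, target) pairs

    def bind(self, routing_key, target):
        # target is either a deque acting as a queue or another ToyExchange
        self.bindings.append((routing_key, target))

    def publish(self, routing_key, body):
        for key, target in self.bindings:
            if key != routing_key:
                continue
            if isinstance(target, ToyExchange):
                target.publish(routing_key, body)  # exchange-to-exchange hop
            else:
                target.append(body)  # the queue keeps the message until it is read


orders = deque()
audit = deque()
inner = ToyExchange()
inner.bind("order.created", audit)
outer = ToyExchange()
outer.bind("order.created", orders)
outer.bind("order.created", inner)

outer.publish("order.created", "order #1")
outer.publish("order.deleted", "order #2")  # no binding matches, so it is dropped
```

After these calls both `orders` and `audit` hold only the first message, because the second routing key matches no binding.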
RabbitMQ is an open source application that fully supports the AMQP protocol and offers a number of additional features. To work with RabbitMQ, a large number of libraries have been written in various programming languages, including Python.
You can always throw together a couple of scripts for personal use and never have any trouble with them. Once you start sharing them with colleagues, everything becomes more complicated. Everyone needs to be shown and told how and what to launch, what to change and where, where to get the latest version, and what has changed in it. You inevitably conclude that it is easier to build a simple interface once than to spend time on explanations later. For ease of use, the application was divided into 4 modules: rmq_publish.py for publishing messages, rmq_consume.py for reading messages, rmq_setup.py for changing the route configuration, and rmq_common_tools.py with the parameters and methods shared by the other three.
This approach simplifies the set of launch parameters: you choose the module you need, choose one of its modes of operation, and pass the necessary parameters (the operating modes and parameters are described in the built-in help, --help).
Since the "rabbit" infrastructure at MegaFon consists of a fairly large number of nodes, the connection data for the nodes is kept, for convenience, in the module with shared parameters and methods, rmq_common_tools.py.
To work over AMQP in Python, we will use the Pika library.
import pika
With this library, working with RabbitMQ consists of three main stages: establishing a connection, performing the required operations, and closing the connection.
The first and last stages are the same for all modules and are implemented in rmq_common_tools.py.
To establish a connection:
rmq_parameters = pika.URLParameters(rmq_url_connection_str)
rmq_connection = pika.BlockingConnection(rmq_parameters)
rmq_channel = rmq_connection.channel()
The Pika library offers several ways to specify the connection parameters for RabbitMQ. In this case, the most convenient option was to pass them as a URL string of the following format:
'amqp://rabbit_user:rabbit_password@host:port/vhost'
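One practical detail with URL strings is that special characters in the credentials or in the virtual host name have to be percent-encoded; in particular, the default vhost "/" is written as "%2F". Below is a minimal sketch of assembling such a string with the standard library. The helper name `build_amqp_url` and the host name are invented for the example, and 5672 is the default AMQP port.

```python
from urllib.parse import quote


def build_amqp_url(user, password, host, port=5672, vhost="/"):
    """Assemble an AMQP URL, percent-encoding the parts that may contain special characters."""
    return "amqp://{}:{}@{}:{}/{}".format(
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(vhost, safe=""),  # the default vhost "/" becomes "%2F"
    )


url = build_amqp_url("rabbit_user", "p@ss/word", "rmq.example.local", vhost="/")
print(url)
```

The resulting string can then be passed to pika.URLParameters in the same way as rmq_url_connection_str above.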
To close a connection:
rmq_connection.close()
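Since opening and closing are the same for every module, they can be wrapped in a context manager so that the connection is closed even when an operation fails. This is a sketch rather than the code from rmq_common_tools.py; `open_channel` and its `connect` argument are hypothetical names, and `connect` stands for any zero-argument callable that returns a connection object.

```python
from contextlib import contextmanager


@contextmanager
def open_channel(connect):
    """Yield a channel and always close the connection afterwards.

    `connect` is any zero-argument callable returning a connection, e.g.
    lambda: pika.BlockingConnection(pika.URLParameters(url))
    """
    connection = connect()
    try:
        yield connection.channel()
    finally:
        connection.close()  # runs on normal exit and on exceptions alike
```

With Pika installed, the body of `with open_channel(...) as rmq_channel:` would contain the publish, consume, or setup calls described in the following sections.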
Publishing a message is probably the simplest and, at the same time, the most frequently needed operation when working with the "rabbit".
Tools for publishing messages are collected in rmq_publish.py
To publish a message, use the basic_publish method:
rmq_channel.basic_publish(exchange=params.exch, routing_key=params.r_key, body=text)
Where:
exchange - the name of the exchange point to which the message will be published
routing_key - the routing key with which the message will be published
body - the message body
rmq_publish.py supports two ways of supplying the message to publish: the message is passed as a command-line parameter, or the message is read from a file.
The second mode is, in my opinion, more convenient for large messages or arrays of messages. The first, in turn, lets you send a message without any additional files, which is convenient when the module is integrated into other scripts.
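The two input modes can be sketched as follows. This is an illustration, not the code of rmq_publish.py: the function names are invented, the channel is any object with a `basic_publish` method (such as a Pika channel), and the one-message-per-line file layout is an assumption made for the example.

```python
def read_messages(text=None, path=None):
    """Return message bodies from a literal string or from a file.

    Assumption for this sketch: a file holds one message per line.
    """
    if text is not None:
        return [text]
    if path is None:
        raise ValueError("either text or path must be given")
    with open(path, encoding="utf-8") as source:
        return [line.rstrip("\n") for line in source if line.strip()]


def publish_all(channel, exchange, routing_key, messages):
    """Publish every message with the same exchange and routing key."""
    for body in messages:
        channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)
    return len(messages)
```

Keeping the reading step separate from the publishing step means the same `publish_all` call serves both modes.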
Receiving messages is less trivial than publishing them. When it comes to reading messages, you need to understand:
The message reader is implemented in the file rmq_consume.py
There are two modes of operation:
Creating a queue and routes is discussed below.
The reading itself is implemented as follows:
# pika 0.x call; with pika >= 1.0 use instead:
# channel.basic_consume(queue=params.queue, on_message_callback=on_message)
channel.basic_consume(on_message, queue=params.queue)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
except Exception:
    channel.stop_consuming()
    rmq_tools.console_log(":\n", traceback.format_exc())
Where
on_message - the message handler procedure
params.queue - the name of the queue from which reading will be made
The message handler must perform some operation on the message it has read and acknowledge its delivery (or, if required, not acknowledge it).
def on_message(channel, method_frame, header_frame, body):
    global all_cnt, lim
    # lim == 0 is treated as "no limit", as the progress check below implies
    if lim != 0 and all_cnt >= lim:
        rmq_tools.console_log(' .')
        raise KeyboardInterrupt
    body_str = body.decode("utf-8")[:4000]
    rk = method_frame.routing_key
    rmq_params.file.write(rk + '\n')
    rmq_params.file.write(body_str + '\n\n')
    all_cnt = all_cnt + 1
    # show progress in the console only when the messages themselves go to a file
    if (lim != 0) and (rmq_params.file is not sys.stdout):
        sys.stdout.write(f'[{rmq_tools.time_now()}] - {all_cnt} of {lim} messages consumed.\r')
    channel.basic_ack(delivery_tag=method_frame.delivery_tag)
Where
all_cnt - global counter
lim - the number of messages to read
In this implementation, the handler reads a given number of messages and, when the messages are written to a file, displays the reading progress in the console.
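The global counters can be avoided by building the handler as a closure. The following is a sketch of that alternative, not the code of rmq_consume.py: `make_handler` and `LimitReached` are invented names, a limit of 0 is taken to mean "no limit", and the callback keeps the four-argument shape that Pika passes to consumers.

```python
import sys


class LimitReached(Exception):
    """Raised by the handler once the requested number of messages is read."""


def make_handler(limit, out=sys.stdout):
    """Build a consumer callback that writes messages to `out`.

    limit == 0 means "no limit" in this sketch.
    """
    state = {"count": 0}

    def on_message(channel, method_frame, header_frame, body):
        if limit and state["count"] >= limit:
            raise LimitReached  # this message is left unacknowledged
        out.write(method_frame.routing_key + "\n")
        out.write(body.decode("utf-8", errors="replace") + "\n\n")
        state["count"] += 1
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)

    return on_message
```

The code that starts consuming would then catch `LimitReached` in the same place where the original catches KeyboardInterrupt.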
The messages that are read can also be written to a database. The current implementation does not offer this, but it is not difficult to add.
As an example, consider writing messages to an Oracle database with the cx_Oracle library.
Connect to the database:
import cx_Oracle

ora_address = 'host:port/dbSID'
ora_creds = 'user/pass'
connection_ora = cx_Oracle.connect(ora_creds + '@' + ora_address)
ora_cursor = connection_ora.cursor()
Add to the on_message handler:
global cnt, commit_int
insert_rec = 'insert into ' + tab_name + '(routing_key, text) values (:rkey, :text)'
ora_cursor.execute(insert_rec, text=body_str, rkey=rk)
if cnt > commit_int:
    connection_ora.commit()
    cnt = 1
cnt = cnt + 1
Where
cnt - another counter
commit_int - the number of inserts after which a commit must be made. The parameter exists to reduce the load on the database. However, it should not be set too high: if a failure occurs, the messages read after the last successful commit may be lost.
At the end of the work, as expected, we make the final commit and close the connection:
connection_ora.commit()
connection_ora.close()
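The commit-interval logic can be isolated in a small helper. The sketch below uses sqlite3 from the standard library purely as a stand-in for the Oracle connection so that it runs anywhere; the class name `BatchWriter` and the table name `messages` are invented for the example.

```python
import sqlite3


class BatchWriter:
    """Insert rows and commit once every `commit_every` inserts."""

    def __init__(self, connection, table, commit_every):
        self.connection = connection
        self.cursor = connection.cursor()
        # A table name cannot be a bind variable, so it must come from trusted input.
        self.sql = f"insert into {table} (routing_key, text) values (:rkey, :text)"
        self.commit_every = commit_every
        self.pending = 0  # inserts made since the last commit

    def write(self, routing_key, text):
        self.cursor.execute(self.sql, {"rkey": routing_key, "text": text})
        self.pending += 1
        if self.pending >= self.commit_every:
            self.flush()

    def flush(self):
        self.connection.commit()
        self.pending = 0


db = sqlite3.connect(":memory:")  # stand-in for the Oracle connection
db.execute("create table messages (routing_key text, text text)")
writer = BatchWriter(db, "messages", commit_every=2)
for i in range(5):
    writer.write("demo.key", f"message {i}")
writer.flush()  # final commit for the incomplete last batch
rows = db.execute("select count(*) from messages").fetchone()[0]
```

A larger `commit_every` means fewer commits but more messages at risk if the process dies between commits, which is the trade-off described above.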
This is roughly how messages are read. If you remove the limit on the number of messages, you get a background process that continuously reads messages from the "rabbit".
Although AMQP is primarily intended for publishing and reading messages, it also allows simple manipulations of the route configuration (this does not concern network connections or other settings of RabbitMQ as an application).
The basic configuration operations are creating a queue, creating an exchange point, deleting a queue or an exchange point, creating and removing a forwarding rule (binding), and clearing a queue.
The pika library has a ready-made procedure for each of them, so, for ease of launching, they are simply collected in the rmq_setup.py file. Below we list these procedures with a few comments on their parameters.
Creating a queue
rmq_channel.queue_declare(queue=params.queue, durable = params.durable)
Everything here is simple:
queue - the name of the queue to be created
durable - a boolean parameter. True means that the queue will continue to exist after the "rabbit" is restarted; with False, the queue is deleted on restart. The second option is usually used for temporary queues that are certain not to be needed later.
Creating exchange points (exchange)
rmq_channel.exchange_declare(exchange=params.exch, exchange_type = params.type, durable = params.durable)
The new parameter here is exchange_type, the type of the exchange point. The available exchange types are described here.
exchange - the name of the created exchange point
Deleting a queue or exchange point
rmq_channel.queue_delete(queue=params.queue)
rmq_channel.exchange_delete(exchange=params.exch)
Creating a forwarding rule (binding)
rmq_channel.queue_bind(exchange=params.exch, queue=params.queue, routing_key=params.r_key)
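exchange - the name of the exchange point from which messages will be forwarded
queue - the name of the queue to forward messages to
routing_key - the mask of the routing key by which messages will be forwarded
For a topic exchange point, the mask may contain the wildcards * (exactly one word) and # (zero or more words); the words of a routing key are separated by dots.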
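For intuition, here is how matching a routing key against a mask works on a topic exchange, where `*` stands for exactly one dot-separated word and `#` for zero or more words. The function below is a plain-Python sketch written for this article (the name `mask_matches` is invented); it imitates the matching rule and is not the broker's implementation.

```python
def mask_matches(mask, routing_key):
    """Check a routing key against a topic-style binding mask.

    '*' stands for exactly one dot-separated word, '#' for zero or more words.
    """
    def match(m, k):
        if not m:
            return not k  # mask exhausted: the key must be exhausted too
        if m[0] == "#":
            # '#' may absorb nothing, or absorb one word and try again
            return match(m[1:], k) or (bool(k) and match(m, k[1:]))
        if not k:
            return False
        return m[0] in ("*", k[0]) and match(m[1:], k[1:])

    return match(mask.split("."), routing_key.split("."))
```

For example, the mask `rk.my_key.*` accepts `rk.my_key.a` but not `rk.my_key.a.b`, while `rk.my_key.#` accepts both of them as well as `rk.my_key` itself.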
Remove forwarding rule (binding)
rmq_channel.queue_unbind(exchange=params.exch, queue=params.queue, routing_key=params.r_key)
Everything here is analogous to creating a forwarding rule.
Clearing the queue
rmq_channel.queue_purge(queue=params.queue)
queue - the name of the queue to be cleared
Startup options make life easier. To avoid editing the code before each launch, it is logical to provide a mechanism for passing parameters at startup. The argparse library was chosen for this purpose. I will not go into the details of its use, since there are enough guides on the subject (one, two, three). I will only note that this tool greatly simplified the use of the application (if you can call it that). Even a simple sequence of commands, once wrapped in such an interface, becomes a full-fledged and easy-to-use tool.
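A minimal argparse sketch for a publishing script might look like this. The option names are illustrative and are not necessarily those of the real application: `--exch` and `--r_key` simply mirror the `params.exch` and `params.r_key` attributes used in the snippets above, while `--message` and `--from_file` are assumptions.

```python
import argparse


def build_parser():
    """Command-line interface sketch; all option names here are illustrative."""
    parser = argparse.ArgumentParser(description="Publish a message over AMQP")
    parser.add_argument("--exch", required=True, help="exchange point to publish to")
    parser.add_argument("--r_key", default="", help="routing key of the message")
    # exactly one source of the message body must be given
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument("--message", help="message text passed on the command line")
    source.add_argument("--from_file", help="path to a file with messages")
    return parser


# An explicit argument list is parsed here instead of sys.argv, for demonstration.
params = build_parser().parse_args(["--exch", "demo.exchange", "--message", "hello"])
```

The parser also generates the `--help` output automatically from the `help` strings.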
And now a few impressions of using the AMQP protocol in everyday work.
The most requested feature turned out to be publishing a message. A particular user's access rights do not always allow use of the web interface, yet it is sometimes necessary to test one service or another. This is where AMQP, with authorization on behalf of the service that uses this channel, comes to the rescue.
The second most popular feature was reading messages from a temporary queue. It is useful when setting up new routes and message flows, as well as in preventing incidents.
The remaining features have also found use in various tasks.
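Reading through a temporary queue, as mentioned above, can be sketched like this. The helper name `bind_temporary_queue` is invented and the real rmq_consume.py may do it differently; the `queue_declare` and `queue_bind` calls are the Pika channel methods, and an empty queue name asks the broker to generate one.

```python
def bind_temporary_queue(channel, exchange, mask):
    """Declare a server-named exclusive queue and bind it to `exchange`.

    An exclusive queue is removed by the broker when the declaring
    connection closes, so nothing is left behind after the session.
    """
    result = channel.queue_declare(queue="", exclusive=True)
    queue_name = result.method.queue  # name generated by the broker
    channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=mask)
    return queue_name
```

The returned name is then passed to basic_consume in the same way as params.queue, and the existing consumers of the exchange are not affected because the temporary queue receives its own copies of the matching messages.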
Source: https://habr.com/ru/post/434510/