Continuing from the fifth lesson on the basics of RabbitMQ, here is my translation of the sixth lesson from the official site. All examples are written in Python (using pika version 0.9.8), but they can be implemented in most popular programming languages.
In the second lesson, we learned how to use task queues to distribute time-consuming tasks among several workers.
But what if we need to run a function on a remote computer and wait for the result? That is a different story. This pattern is commonly known as Remote Procedure Call, or RPC.
In this tutorial we will use RabbitMQ to build an RPC system consisting of a client and a scalable RPC server. Since we do not have any genuinely time-consuming tasks worth distributing, we will create a dummy RPC service that returns Fibonacci numbers.
Client interface
To illustrate how an RPC service could be used, we will create a simple client class. It exposes a method named call, which sends an RPC request and blocks until the response is received:
```python
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % (result,))
```
A note on RPC
Although RPC is a fairly common pattern, it is often criticized. Problems arise when a programmer is not aware whether a function call is local or whether it is a slow RPC. Confusion like this leads to unpredictable system behavior and adds unnecessary complexity to debugging. Instead of simplifying software, misused RPC can result in unmaintainable, unreadable code.
With that in mind, consider the following advice:
- Make sure it is obvious which function call is local and which is remote;
- Document your system. Make the dependencies between components explicit;
- Handle error cases. How should the client react when the RPC server has not responded for a long time?
- When in doubt, avoid RPC. If you can, use an asynchronous pipeline instead of blocking RPC, so that results are pushed asynchronously to the next stage of processing.
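The first and third recommendations can be combined: make the remote nature of a call visible in its name, and never wait forever for the reply. The sketch below is broker-free. A `queue.Queue` stands in for the RabbitMQ results queue, and `RemoteFibonacci`, `remote_fib` and `RpcTimeoutError` are hypothetical names chosen for illustration.

```python
import queue


class RpcTimeoutError(Exception):
    """Raised when no reply arrives before the deadline (hypothetical name)."""


class RemoteFibonacci:
    """Hypothetical wrapper: the 'remote_' prefix makes it obvious the call is RPC."""

    def __init__(self, send, replies, timeout=5.0):
        self._send = send          # callable that ships a request to the server
        self._replies = replies    # queue.Queue standing in for the results queue
        self._timeout = timeout    # seconds to wait before giving up

    def remote_fib(self, n):
        self._send(n)
        try:
            # Block for at most `timeout` seconds instead of forever.
            return self._replies.get(timeout=self._timeout)
        except queue.Empty:
            raise RpcTimeoutError(
                "no reply for fib(%d) within %ss" % (n, self._timeout)
            ) from None
```

The caller then decides explicitly what a missing server means (retry, fall back, or report an error) instead of hanging.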
Results queue
In general, doing RPC over RabbitMQ is easy. The client sends a request message and the server replies with a response message. To receive the response, the client must send the name of a results queue along with the request. Here is how it looks in code:
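```python
# Declare a server-named, exclusive queue to receive the replies.
result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

# Publish the request and tell the server where to send the answer.
channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                          reply_to=callback_queue,
                      ),
                      body=request)
```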
Message Properties
The AMQP 0-9-1 protocol predefines a set of 14 properties that can accompany a message. Most of them are rarely used, with the exception of the following:
- delivery_mode: marks a message as persistent (value 2) or transient (any other value). You may remember this property from the second lesson;
- content_type: describes the MIME type of the encoding. For example, for the frequently used JSON encoding it is good practice to set this property to application/json;
- reply_to: commonly used to name the results queue;
- correlation_id: used to match RPC responses with requests.
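To make the four properties concrete, the sketch below collects them in a plain dictionary whose keys mirror the `pika.BasicProperties` keyword arguments of the same names, and encodes the body as JSON. The helper `build_request` is hypothetical. In pika code, the dictionary could be passed as `pika.BasicProperties(**props)`.

```python
import json
import uuid


def build_request(payload, reply_to, persistent=False):
    """Return (properties, body) for an RPC request (hypothetical helper)."""
    props = {
        "content_type": "application/json",        # the body below is JSON
        "delivery_mode": 2 if persistent else 1,    # 2 = persistent, 1 = transient
        "reply_to": reply_to,                       # queue the server should answer to
        "correlation_id": str(uuid.uuid4()),        # unique value for this request
    }
    body = json.dumps(payload).encode("utf-8")
    return props, body
```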
Correlation id
In the method presented above, we suggested creating a results queue for every RPC request. That is rather inefficient, but fortunately there is a better way: let's create a single results queue per client.
This raises a new issue: when a response arrives in that queue, it is not clear which request it belongs to. This is where the correlation_id property comes in. We set it to a unique value for every request. Later, when we receive a message from the results queue, we look at this property and use it to match the response with its request. If we see an unknown correlation_id value, we can safely discard the message, since it does not belong to any of our requests.
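With a single results queue per client, the client needs a table mapping each correlation_id to its pending request. The broker-free sketch below uses a hypothetical `PendingCalls` class, and replies are fed in as `(correlation_id, body)` pairs rather than real pika deliveries. As the text recommends, replies with an unknown id are ignored.

```python
import uuid


class PendingCalls:
    """Tracks outstanding RPC requests that share one results queue (hypothetical)."""

    def __init__(self):
        self._results = {}  # correlation_id -> reply body (None until answered)

    def new_request(self):
        corr_id = str(uuid.uuid4())  # unique value attached to the outgoing request
        self._results[corr_id] = None
        return corr_id

    def on_reply(self, corr_id, body):
        """Store a reply; return False if the id matches none of our requests."""
        if corr_id not in self._results:
            return False  # unknown correlation_id: safely ignored
        self._results[corr_id] = body
        return True

    def result(self, corr_id):
        return self._results[corr_id]
```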
You may ask why we ignore unknown messages in the results queue instead of failing with an error. The reason is a possible race condition on the server side. Although unlikely, the RPC server could die just after sending us the answer but before sending an acknowledgment for the request. If that happens, the restarted RPC server will process the request again. That is why the client must handle duplicate responses gracefully, and why the RPC should ideally be idempotent.
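The race described above can be reproduced without a broker. In this simulation, a `queue.Queue` stands in for the results queue, and the server's reply is enqueued twice to mimic a request that was processed again after a restart. The client keeps the first matching reply and drops the duplicate. The function names are hypothetical. Computing a Fibonacci number has no side effects, so processing the request twice is harmless. That is exactly why idempotent RPCs are preferred.

```python
import queue
import uuid


def server_reply(corr_id, n):
    """Idempotent handler: the same input always yields the same reply."""
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return corr_id, a


def collect(callback_queue, corr_id):
    """Drain the queue and return the first reply for corr_id; duplicates are ignored."""
    answer = None
    while not callback_queue.empty():
        reply_id, value = callback_queue.get()
        if reply_id == corr_id and answer is None:
            answer = value  # first matching reply wins
        # duplicates and unknown ids fall through and are discarded
    return answer


corr_id = str(uuid.uuid4())
callback_queue = queue.Queue()
callback_queue.put(server_reply(corr_id, 10))  # first delivery
callback_queue.put(server_reply(corr_id, 10))  # repeat after the server restarted
print(collect(callback_queue, corr_id))  # 55
```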
Summary
Our RPC will work as follows:
- When the client starts up, it creates an anonymous exclusive results queue;
- For an RPC request, the client sends a message with two properties: reply_to, set to the results queue, and correlation_id, set to a unique value for every request;
- The request is sent to the rpc_queue queue;
- The server waits for requests on that queue. When a request arrives, the server does the job and sends a message with the result back to the client, using the queue named in the reply_to property;
- The client waits for data on the results queue. When a message arrives, the client checks the correlation_id property. If it matches the value from the request, the client returns the response to the application.
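To see the five steps interact, here is a broker-free model. `queue.Queue` objects play the roles of `rpc_queue` and the client's exclusive results queue, and a thread plays the server. The `queues` mapping and the dictionaries used as messages are assumptions made for the model, not pika's API.

```python
import queue
import threading
import uuid

# Named queues standing in for the broker: rpc_queue plus per-client results queues.
queues = {"rpc_queue": queue.Queue()}


def fib(n):
    return n if n < 2 else fib(n - 1) + fib(n - 2)


def server():
    while True:
        msg = queues["rpc_queue"].get()      # wait for a request
        if msg is None:                       # sentinel used to stop the thread
            break
        result = fib(msg["body"])
        # Reply to the queue named in reply_to, echoing the correlation_id.
        queues[msg["reply_to"]].put(
            {"correlation_id": msg["correlation_id"], "body": result})


class Client:
    def __init__(self):
        # Unique name standing in for an anonymous exclusive queue.
        self.callback_queue = "cb-" + uuid.uuid4().hex
        queues[self.callback_queue] = queue.Queue()

    def call(self, n):
        corr_id = str(uuid.uuid4())
        queues["rpc_queue"].put({"reply_to": self.callback_queue,
                                 "correlation_id": corr_id, "body": n})
        while True:
            reply = queues[self.callback_queue].get()
            if reply["correlation_id"] == corr_id:  # ignore anything else
                return reply["body"]


worker = threading.Thread(target=server, daemon=True)
worker.start()
print(Client().call(10))  # 55
queues["rpc_queue"].put(None)  # stop the server
worker.join()
```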
Putting it all together
Server code rpc_server.py:
The server code is rather straightforward:
- (4) As usual, we start by establishing the connection and declaring the queue;
- (11) We declare our Fibonacci function. It assumes only valid positive integer input. (Don't expect it to work for large numbers; it is probably the slowest possible implementation);
- (19) We declare the on_request callback for basic_consume, which is the core of the RPC server. It is executed when a request is received. It does the work and sends the result back;
- (32) We will probably want to run more than one server someday. To spread the load evenly over multiple servers, we set prefetch_count.
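For reference, here is a sketch of a server along the lines described above. It assumes pika 1.x, whose `basic_consume(queue=..., on_message_callback=...)` signature differs from the 0.9.8 release used in this lesson. It also assumes a broker on localhost and plain-text integer request bodies. The numbered references above point to the original listing, not to this sketch.

```python
try:
    import pika
except ImportError:  # lets fib() be imported and tested without pika installed
    pika = None


def fib(n):
    """Deliberately naive recursive Fibonacci; expects a non-negative integer."""
    if n == 0:
        return 0
    elif n == 1:
        return 1
    return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    """Compute fib(n) and publish the result to the client's reply_to queue."""
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)
    ch.basic_publish(
        exchange="",
        routing_key=props.reply_to,  # the client's results queue
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=str(response),
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)  # ack only after replying


def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="rpc_queue")
    channel.basic_qos(prefetch_count=1)  # at most one unacknowledged request per server
    channel.basic_consume(queue="rpc_queue", on_message_callback=on_request)
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
```

Calling `main()` starts the consumer loop. Because the acknowledgment is sent after the reply, a crash between the two leads to the duplicate-response scenario described earlier.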
Client code rpc_client.py:
The client code is slightly more involved:
- (7) We establish a connection and a channel, and declare an exclusive results queue for replies;
- (16) We subscribe to the results queue so that we can receive RPC responses;
- (18) The on_response callback, executed on every response, does a simple job: for every response message it checks whether the correlation_id is the one we are looking for. If so, it saves the response in self.response and breaks out of the waiting loop;
- (23) Next, we define our call method, which performs the actual RPC request;
- (24) In this method, we first generate a unique correlation_id and save it. The on_response callback uses this value to pick out the matching response;
- (25) Next, we publish the request message with two properties: reply_to and correlation_id;
- (32) Then we wait until the matching response arrives;
- (33) Finally, we return the response to the user.
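A matching client sketch follows, again assuming pika 1.x rather than 0.9.8, a broker on localhost, and a server listening on `rpc_queue`. The `process_data_events(time_limit=None)` loop pumps network I/O until `on_response` has stored a reply with the expected correlation_id.

```python
import uuid

try:
    import pika
except ImportError:  # lets the class be defined and inspected without pika installed
    pika = None


class FibonacciRpcClient:
    def __init__(self, host="localhost"):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
        self.channel = self.connection.channel()
        # Server-named, exclusive results queue for this client's replies.
        result = self.channel.queue_declare(queue="", exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True,
        )
        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, props, body):
        # Keep the reply only if it answers our current request.
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange="",
            routing_key="rpc_queue",
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n),
        )
        # Pump network events until on_response has stored a matching reply.
        while self.response is None:
            self.connection.process_data_events(time_limit=None)
        return int(self.response)
```

With the server sketch running, `FibonacciRpcClient().call(30)` would return 832040. Note that the loop has no timeout, which is one of the open problems listed at the end of this lesson.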
Our RPC service is now ready. We can start the server:
```
$ python rpc_server.py
 [x] Awaiting RPC requests
```
To request Fibonacci numbers, run the client:
```
$ python rpc_client.py
 [x] Requesting fib(30)
```
The design presented here is not the only possible implementation of an RPC service, but it has some important advantages:
- If the RPC server is too slow, you can scale up by simply running another one. Try running a second rpc_server.py in a new console;
- On the client side, RPC requires sending and receiving only one message. No synchronous calls such as queue_declare are needed per request. As a result, the RPC client needs only one network round trip for a single RPC request.
Our code, however, is simplified and does not try to solve more complex (but certainly important) problems such as these:
- How should the client react if no server is running?
- Should the client have a timeout for the RPC?
- If the server malfunctions at some point and raises an exception, should the exception be forwarded to the client?
- Protecting against invalid incoming messages (for example, checking bounds) before processing.
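The last two points can be handled on the server before any work is done. Here is a minimal sketch that assumes a plain-text integer body. `MAX_N = 35` is an arbitrary limit chosen for the naive recursive function. The `ERROR:` reply prefix is an invented convention for reporting failures to the client, not part of AMQP or pika.

```python
MAX_N = 35  # hypothetical limit; the naive fib() slows down exponentially


def fib(n):
    return n if n < 2 else fib(n - 1) + fib(n - 2)


def handle_request(body):
    """Validate the raw request body, then compute; return the reply body as bytes."""
    try:
        n = int(body)  # int() accepts bytes such as b"30"
    except ValueError:
        return b"ERROR: not an integer"
    if not 0 <= n <= MAX_N:
        return ("ERROR: n must be between 0 and %d" % MAX_N).encode()
    return str(fib(n)).encode()
```

An on_request callback would publish whatever `handle_request` returns. The client can then check for the prefix instead of waiting for a reply that never comes.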
All guide articles
RabbitMQ tutorial 1 - Hello World (python)
RabbitMQ tutorial 2 - Task Queue (python)
RabbitMQ tutorial 3 - Publish / Subscribe (php)
RabbitMQ tutorial 4 - Routing (php)
RabbitMQ tutorial 5 - Topics (php)
RabbitMQ tutorial 6 - Remote procedure call (this article, python)