RabbitMQ tutorial 6 - Remote procedure call

Continuing from the fifth lesson on the basics of RabbitMQ, here is a translation of the sixth lesson from the official site. All examples are written in Python (using pika version 0.9.8), but they can be implemented in most popular programming languages.

In the second lesson, we looked at the use of task queues to distribute resource-intensive tasks among several subscribers.

But what if we want to run a function on a remote machine and wait for the result? Well, that's another story. This pattern is commonly known as Remote Procedure Call, or RPC.

In this guide, we will use RabbitMQ to build an RPC system consisting of a client and a scalable RPC server. Since we do not have a real time-consuming task that needs distributing, we will create a simple RPC server that returns Fibonacci numbers.

Client interface


To illustrate how the RPC service is used, we will create a simple client class. It exposes a method named call that sends an RPC request and blocks until the response is received:

    fibonacci_rpc = FibonacciRpcClient()
    result = fibonacci_rpc.call(4)
    print "fib(4) is %r" % (result,)

RPC Note

Although RPC is a fairly common pattern, it is often criticized. Problems usually arise when a developer does not know whether the function being called is local or a slow remote call performed over RPC. Confusion like this leads to unpredictable system behavior and adds unnecessary complexity to debugging. Instead of simplifying software, misused RPC can result in unmaintainable and unreadable code.

Based on the above, the following recommendations can be made:
  • Make sure that it is obvious which function is called in each case: local or remote;
  • Document your system. Make the dependencies between the components explicit;
  • Handle errors. How should the client respond if the RPC server does not respond within a long period of time?
  • If in doubt, do not use RPC. Where possible, use an asynchronous pipeline instead of blocking RPC, so that results are pushed asynchronously to the next processing stage.
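
The error-handling recommendation above can be made concrete by putting a deadline around the blocking wait. The following is a minimal sketch in plain Python 3 that does not touch RabbitMQ at all. The names RpcTimeoutError, wait_for_reply and poll are hypothetical and introduced only for illustration; poll stands in for whatever the client uses to check whether a reply has arrived.

```python
import time


class RpcTimeoutError(Exception):
    """Hypothetical error raised when no reply arrives before the deadline."""


def wait_for_reply(poll, timeout_seconds, interval_seconds=0.01):
    """Call poll() repeatedly until it returns a non-None reply.

    poll is an assumed zero-argument callable that returns the reply,
    or None while the reply has not arrived yet.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        reply = poll()
        if reply is not None:
            return reply
        if time.monotonic() >= deadline:
            raise RpcTimeoutError("no reply within %.2f s" % timeout_seconds)
        time.sleep(interval_seconds)
```

With a wrapper like this, the caller decides explicitly what happens when the server stays silent: retry, fall back, or report the failure.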

Results queue


In general, doing RPC over RabbitMQ is easy. The client sends a request and the server replies to it. To receive the reply, the client must send the name of a results queue along with the request, so that the server knows where to post the result. Let's see how this looks in code:

    result = channel.queue_declare(exclusive=True)
    callback_queue = result.method.queue

    channel.basic_publish(exchange='',
                          routing_key='rpc_queue',
                          properties=pika.BasicProperties(
                              reply_to = callback_queue,
                          ),
                          body=request)

    # ... code that reads the response message from callback_queue ...

Message Properties


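The AMQP protocol has 14 predefined message properties. Most of them are rarely used, with the exception of the following:

  • delivery_mode: marks a message as persistent (value 2) or transient (any other value);
  • content_type: describes the MIME type of the encoding, for example application/json for JSON;
  • reply_to: commonly used to name the results queue;
  • correlation_id: used to correlate RPC responses with requests.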



Correlation id


In the method presented above, we suggested creating a results queue for every RPC request. This is rather inefficient, but fortunately there is a better way: create a single results queue per client.

This raises a new problem: when a response arrives in that queue, it is not clear which request it belongs to. This is where the correlation_id property comes in handy. We assign it a unique value for every request. Later, when we receive a message from the results queue, the value of this property lets us match the response to its request unambiguously. If we see an unknown correlation_id value, we can safely discard the message, since it does not belong to any of our requests.

You may ask why we simply ignore unknown messages in the results queue instead of failing with an error. The reason is a possible race condition on the server side. Although unlikely, it is possible for the RPC server to die right after sending us the answer but before sending an acknowledgment for the request. If that happens, the restarted RPC server will process the request again. That is why the client has to handle duplicate responses gracefully, and why an RPC should ideally be idempotent.
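
The matching rule described above can be sketched without a broker. This is a minimal illustration in plain Python 3, not the tutorial's client: PendingRequests and its methods are hypothetical names, and a real client would call on_response from its consumer callback.

```python
import uuid


class PendingRequests(object):
    """Tracks outstanding requests by correlation id (hypothetical helper)."""

    def __init__(self):
        self._pending = {}   # correlation_id -> request payload
        self.results = {}    # correlation_id -> response body

    def register(self, payload):
        """Remember a request and return its fresh correlation id."""
        corr_id = str(uuid.uuid4())
        self._pending[corr_id] = payload
        return corr_id

    def on_response(self, correlation_id, body):
        """Accept a response only if it matches an outstanding request.

        Returns True when the response was accepted, False when it was
        ignored (unknown id, or a duplicate of an already answered request).
        """
        if correlation_id not in self._pending:
            return False
        del self._pending[correlation_id]
        self.results[correlation_id] = body
        return True
```

Because an answered request is removed from the pending set, a duplicate response produced by a restarted server is ignored in the same way as a response with an unknown id.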

Summary



Our RPC will work as follows:

- When the client starts, it creates an anonymous, exclusive results queue;
- To make an RPC request, the client sends a message with two properties: reply_to, set to the name of the results queue, and correlation_id, set to a unique value for each request;
- The request is sent to the rpc_queue queue;
- The server waits for requests on that queue. When a request arrives, the server does its work and sends a message with the result back to the client, using the queue named in the reply_to property;
- The client waits for data on the results queue. When a message arrives, the client checks its correlation_id property. If it matches the value from the request, the response is returned to the application.
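
The five items above, taken as steps 1 to 5, can be traced end to end with in-memory queues in place of a broker. This is only a sketch in plain Python 3 under that assumption: the queues dictionary, the helper functions and the "callback-" queue name are hypothetical stand-ins, and only rpc_queue, reply_to and correlation_id come from the tutorial.

```python
import queue
import uuid

# In-memory stand-ins for broker queues, keyed by queue name.
queues = {"rpc_queue": queue.Queue()}


def fib(n):
    return n if n < 2 else fib(n - 1) + fib(n - 2)


def client_send(n, callback_queue):
    """Steps 2-3: publish a request carrying reply_to and correlation_id."""
    corr_id = str(uuid.uuid4())
    queues["rpc_queue"].put(
        {"reply_to": callback_queue, "correlation_id": corr_id, "body": str(n)})
    return corr_id


def server_handle_one():
    """Step 4: take one request, compute, reply to the reply_to queue."""
    request = queues["rpc_queue"].get_nowait()
    result = fib(int(request["body"]))
    queues[request["reply_to"]].put(
        {"correlation_id": request["correlation_id"], "body": str(result)})


def client_receive(callback_queue, corr_id):
    """Step 5: return the result only if the correlation_id matches."""
    message = queues[callback_queue].get_nowait()
    if message["correlation_id"] == corr_id:
        return int(message["body"])
    return None


# Step 1: the client creates its own uniquely named results queue.
callback_queue = "callback-" + uuid.uuid4().hex  # illustrative name only
queues[callback_queue] = queue.Queue()

corr_id = client_send(10, callback_queue)
server_handle_one()
result = client_receive(callback_queue, corr_id)
print("fib(10) is %r" % (result,))  # prints: fib(10) is 55
```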

Putting it all together


Server code rpc_server.py:

    #!/usr/bin/env python
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))

    channel = connection.channel()

    # Queue on which the server receives RPC requests
    channel.queue_declare(queue='rpc_queue')

    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n-1) + fib(n-2)

    def on_request(ch, method, props, body):
        n = int(body)

        print " [.] fib(%s)" % (n,)
        response = fib(n)

        # Reply to the queue named by the client, echoing its correlation_id
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                         body=str(response))
        # Acknowledge the request only after the reply has been published
        ch.basic_ack(delivery_tag = method.delivery_tag)

    # Hand each server at most one unacknowledged request at a time
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(on_request, queue='rpc_queue')

    print " [x] Awaiting RPC requests"
    channel.start_consuming()


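The server code is fairly simple:

  • As usual, we start by establishing a connection and declaring the queue;
  • We declare the fib function, which computes Fibonacci numbers recursively and assumes a valid non-negative integer as input;
  • We declare on_request, the callback for basic_consume and the core of the RPC server. It runs when a request is received, does the work and sends the response back;
  • We may want to run more than one server process. To spread the load evenly across them, we set prefetch_count to 1.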



Client code rpc_client.py:

    #!/usr/bin/env python
    import pika
    import uuid

    class FibonacciRpcClient(object):
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                    host='localhost'))

            self.channel = self.connection.channel()

            # Exclusive, server-named queue that will receive the responses
            result = self.channel.queue_declare(exclusive=True)
            self.callback_queue = result.method.queue

            self.channel.basic_consume(self.on_response, no_ack=True,
                                       queue=self.callback_queue)

        def on_response(self, ch, method, props, body):
            # Keep only the response that matches the pending request
            if self.corr_id == props.correlation_id:
                self.response = body

        def call(self, n):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',
                                       properties=pika.BasicProperties(
                                             reply_to = self.callback_queue,
                                             correlation_id = self.corr_id,
                                             ),
                                       body=str(n))
            # Block until on_response stores the matching reply
            while self.response is None:
                self.connection.process_data_events()
            return int(self.response)

    fibonacci_rpc = FibonacciRpcClient()

    print " [x] Requesting fib(30)"
    response = fibonacci_rpc.call(30)
    print " [.] Got %r" % (response,)


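The client code is somewhat more involved:

  • We establish a connection and a channel, and declare an exclusive results queue for responses;
  • We subscribe to the results queue so that we can receive RPC responses;
  • The on_response callback, executed for every response, checks whether correlation_id is the one we are waiting for. If so, it stores the response in self.response;
  • The call method performs the actual RPC request: it generates a unique correlation_id and saves it, publishes the request with the reply_to and correlation_id properties, waits until the response arrives, and returns it to the caller.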



Our RPC service is ready. We can start the server:

    $ python rpc_server.py
     [x] Awaiting RPC requests

To get Fibonacci numbers, run the Client:

    $ python rpc_client.py
     [x] Requesting fib(30)

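The RPC design presented here is not the only possible one, but it has the following advantages:

  • If the RPC server is too slow, you can scale up by simply running another one. Try starting a second rpc_server.py in a new console;
  • On the client side, an RPC requires sending and receiving only one message. No synchronous calls such as queue_declare are needed, so a single RPC request takes only one network round trip.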



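Our code, however, is simplified and does not even try to solve more complex (but certainly important) problems like these:

  • How should the client react if no servers are running?
  • Should the client have some kind of timeout for the RPC?
  • If the server malfunctions and raises an exception, should it be forwarded to the client?
  • How do we protect against invalid incoming messages (for example, by checking bounds) before processing them?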
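
One simplification in rpc_server.py is that on_request passes the message body straight to int() and fib() without any checks, so a non-numeric body raises an exception and a negative number never reaches a base case. A minimal sketch of validating the request first might look like this, in plain Python 3. The names parse_request and MAX_N are hypothetical, and the bound of 35 is an arbitrary assumption chosen because the recursive fib slows down quickly.

```python
MAX_N = 35  # assumed upper bound, not a value from the tutorial


def parse_request(body):
    """Return n as an int, or raise ValueError for an unacceptable request."""
    try:
        n = int(body)
    except (TypeError, ValueError):
        raise ValueError("request body is not an integer: %r" % (body,))
    if n < 0 or n > MAX_N:
        raise ValueError("n must be between 0 and %d, got %d" % (MAX_N, n))
    return n
```

A server could call such a function at the top of its request callback and decide, on ValueError, whether to drop the message or send an error reply to the client.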



All guide articles


RabbitMQ tutorial 1 - Hello World (python)
RabbitMQ tutorial 2 - Task Queue (python)
RabbitMQ tutorial 3 - Publish / Subscribe (php)
RabbitMQ tutorial 4 - Routing (php)
RabbitMQ tutorial 5 - Topics (php)
RabbitMQ tutorial 6 - Remote procedure call (this article, python)

Source: https://habr.com/ru/post/236221/

