📜 ⬆️ ⬇️

EventMachine proxy daemon

Despite the fact that EventMachine is a fairly convenient framework for writing high-performance and well-scalable network applications, the Internet is not happy with an abundance of examples of its use and testing. And those examples that exist, for example, in Habré, will not work correctly, since they do not take into account the peculiarities of data transmission (for some reason they do not take into account that the data, in general, are transmitted in parts). Actually, this article is intended for those who are familiar with the basic principles of EM, for example, in the article Ruby and EventMachine , and want to learn how to write something more complicated on its basis and how then to test the resulting code.

Recently, I was sent a test task, the essence of which was to write a demon proxy on EM, which would asynchronously accept connections from clients through a unix domain socket, lined them up and forwarded these commands to a socket connected to some abstract system, and a response from this system would send them back to customers. The format of the client message is {id: 1, text: "req1"}, which must be converted by the server into an answer - {id: 1, text: "answ1"}.

To simplify the task, the clients and the abstract system I also emulated using EM, which allowed using the built-in data transfer protocol EM :: P :: ObjectProtocol , which takes into account that the data can be taken not as a single reactor tick, but also serializes the data for transmission.

Let's start by writing client code. When a client connects to a proxy daemon, the post_init function is called , in which the client sends a message in the form of a hash using send_object .
Then it waits for a response from the daemon, and displays it in the console. Since it was necessary to emulate the connections of several clients at once, variables were entered that store data on the number of connections and the number of clients that have already received a response and were disconnected from the daemon. The total number of connections is stored in the TOTAL_CONNECTIONS constant, which is set when the clients are started. When the client disconnects from the server, unbind is called. When all customers receive a response, the reactor is stopped.
')
module EMClient<br/> include EM::P::ObjectProtocol<br/> <br/> @@connection_number = 0 <br/> @@dissconnected = 0 <br/> <br/> #send request as the connection has been established <br/> def post_init <br/> @@connection_number += 1 <br/> send_object({ 'id' => rand ( 10 ), 'text' => "req#{@@connection_number}" })<br/> end <br/> <br/> <br/> def receive_object (obj)<br/> #display response from server <br/> p obj.inspect<br/> end <br/> <br/> def unbind <br/> @@dissconnected += 1 <br/> #stop reactor after all requests have been processed <br/> EM.stop if @@dissconnected == TOTAL_CONNECTIONS<br/> end <br/> end <br/>

Next, the server code that emulates the abstract system. His task is to get the object, convert it and send it back. Using EM.add_time (), you can delay the execution of a block of code passed by the second argument of the function, in this case, the response to the request from the daemon.

module SocketServer<br/> include EM::P::ObjectProtocol<br/> <br/> def receive_object (obj)<br/> #emulation of job on server <br/> EM.add_timer( 1 + rand ( 5 )) do <br/> #validation of obj goes here)) <br/> obj[ 'text' ]. sub !(/req/, 'answ' )<br/> send_object(obj)<br/> end <br/> p "Server received object: #{obj.inspect}" <br/> end <br/> end <br/>

Now let's move on to the more interesting part, creating a message queue and a connection pool with an abstract server. For this, we used the EM :: Queue class, which has two methods pop (* a, & b) and push (* items) , which allow you to add items to the queue and retrieve them from it. The pop method takes the last argument a block of code that will be executed when an element appears in the queue.

To establish a connection with the server, the EMConnection module was used , in which the send_request (obj, & block) method was defined, the essence of which is to send a server message and send a block that will be executed when the server receives a response.

The ConnectionPool class is responsible for creating connection pools. When it is initialized, the size of the pool is determined and the queue is initialized. Then, in the start_queue method, the specified number of connections is established and for each connection, a worker ( queue_worker_loop ) is started , which is represented by a proc, which accepts connections as an argument. The essence of his work is to obtain from the queue an element that is an object that must be sent to the server, and a block of code that must be executed after receiving the object. Moreover, after executing this block of code, proc calls itself, thus some kind of infinite loop is obtained.

module EMConnection<br/> include EM::P::ObjectProtocol<br/> <br/> def receive_object (obj)<br/> #calling callback on object receiving <br/> @callback. call (obj)<br/> end <br/> <br/> def send_request obj, &block<br/> #sending data to server and setting callback <br/> send_object obj<br/> @callback = block<br/> end <br/> <br/> end <br/> <br/> #simple connection pool using EM queue, default size 10 <br/> class ConnectionPool <br/> <br/> def initialize (conf)<br/> @pool_size = conf[:size] || 10 <br/> @connections = []<br/> @query_queue = EM:: Queue .new<br/> start_queue conf<br/> end <br/> <br/> def queue_worker_loop <br/> proc { |connection|<br/> @query_queue.pop do | request |<br/> connection. send_request ( request [:obj]) do |response|<br/> request [:callback].call response #if request[:callback] <br/> queue_worker_loop .call connection<br/> end <br/> end <br/> }<br/> end <br/> <br/> def start_queue (conf)<br/> @pool_size.times do <br/> connection = EM.connect( '0.0.0.0' , 8080 , EMConnection)<br/> @connections << connection<br/> queue_worker_loop .call connection<br/> end <br/> end <br/> <br/> def request (obj, &block)<br/> @query_queue.push :obj => obj, :callback => block<br/> end <br/> end <br/>

We now turn to the code responsible for the work of the proxy daemon. His job is to initialize the connection pool, of course, this can be done not with the daemon code, but the pool will only be initialized this way when it is needed. When an object is received from a client, it sends an object and a block of the connection pool queue code, and when a response is received from an abstract server, sends a message back to clients and closes the connection using the close_connection_after_writing method, making sure that the message is completely sent to the client (unlike the close_connection method, which immediately closes the connection).

module DaemonServer<br/> include EM::P::ObjectProtocol<br/> <br/> def post_init <br/> @@connections_pool ||= ConnectionPool. new (:size => 5 )<br/> end <br/> <br/> def receive_object (obj)<br/> @@connections_pool.request obj do |response|<br/> send_object(response)<br/> close_connection_after_writing<br/> end <br/> end <br/> end <br/>

Now let's move on to the scripts responsible for starting the server, clients and the proxy daemon.
We start the server on TCPSocket. It's all very simple.

EventMachine.run {<br/> EventMachine.start_server "127.0.0.1" , 8080 , SocketServer<br/>} <br/>


It is a little more difficult with clients, since it is necessary to give the opportunity to set the number of emulated clients, which is realized by passing the parameter when the script is started. Running a client on a Unix Domain Socket differs from the case of TCP Socket, only by calling connect_unix_domain instead of connect and passing the address and port, file name, first argument of the function instead of ip.

tc = ARGV[ 0 ].to_i<br/>TOTAL_CONNECTIONS = tc > 0 ? tc : 25 <br/> <br/>file = File .expand_path( '../tmp/daemon.sock' , __FILE__ )<br/> p "Starting #{TOTAL_CONNECTIONS} client(s)" <br/>EventMachine::run {<br/> TOTAL_CONNECTIONS.times{ EM.connect_unix_domain(file, EMClient) }<br/>} <br/>

In order for a proxy to become a demon, it must of course be demonized (captain). For this, I used the 'daemons' gem. If the script starts with the -d key, it is demonized by calling the Daemons.daemonize method with additional options that define where to store the logs and the file containing the pid of the process, so that the daemon can then be stopped.

options = {<br/> :app_name => 'ProxyServer' ,<br/> :backtrace => true ,<br/> :log_output => true ,<br/> :dir_mode => :normal,<br/> :dir => File .expand_path( '../tmp' , __FILE__ )<br/>}<br/> <br/>file = File .expand_path( '../tmp/daemon.sock' , __FILE__ )<br/> File .unlink(file) if File .exists?(file)<br/> <br/>Daemons.daemonize(options) if ARGV.index( '-d' )<br/> <br/>EventMachine::run {<br/> EventMachine::start_unix_domain_server(file, DaemonServer)<br/>} <br/>

I believe that almost any code without tests costs very little, even if it is working, since it can deliver a lot of problems and headaches in the future both to the person who wrote it and those who need to edit it.

There is a ready solution for testing programs created on the basis of EM, - EMSpec . But I did not use it and therefore I will demonstrate how to do without it, using rspec .
First you need to create a test client. In it, we define the send_request method (obj, & block) , which allows you to send proxy requests to the daemon and set the callback as a block of code that will be called when the client receives a response. We also create an onclose = (proc) method that will determine the callback that will be called when the connection is closed.

module TestClient<br/> include EM::P::ObjectProtocol<br/> <br/> #on object received callback <br/> def receive_object (obj)<br/> @onresponse. call (obj)<br/> p "Client received object: #{obj.inspect}" <br/> end <br/> <br/> def send_request obj, &block<br/> @onresponse = block<br/> send_object obj<br/> end <br/> <br/> # on disconnect callback <br/> def onclose =( proc )<br/> @onclosed = proc <br/> end <br/> <br/> def unbind <br/> @onclosed.call<br/> end <br/> <br/> end <br/>

Now you can move on to creating methods that allow you to test written code. The first method start_serv is responsible for starting the server, the test client and the proxy, while taking the block argument, which will have at its disposal the client variable that allows manipulating the client. The timer method is needed in case something goes wrong and the client does not receive a response from the server, then rspec will indicate that the test has failed, and not just hang. The basis for the tests is the server_test method, which uses the above methods to start the server, the client and the proxy, determines that the reactor should be stopped when closing the connection, and also accepts a query request that should be sent by the client to the server.

module HelperMethods<br/> def start_serv <br/> File .unlink(SOCK_FILE) if File .exists?(SOCK_FILE)<br/> EM.run {<br/> EventMachine.start_server "127.0.0.1" , 8080 , SocketServer<br/> EventMachine.start_unix_domain_server(SOCK_FILE, DaemonServer)<br/> client = EM.connect_unix_domain(SOCK_FILE, TestClient)<br/> yield client<br/> }<br/> end <br/> <br/> # if request takes to long it will show fail <br/> def timer start<br/> timeout = 6 <br/> EM.add_timer(timeout){<br/> ( Time .now-start).should be_within( 0 ).of(timeout)<br/> EM.stop<br/> }<br/> end <br/> <br/> #main wrapper for test starts server daemon and client <br/> def server_test request<br/> time = Time .now<br/> start_serv do |client|<br/> client.send_request request do |response|<br/> yield response<br/> end <br/> client.onclose= lambda {EM.stop}<br/> timer (time)<br/> end <br/> end <br/> end <br/>

For example, I will give a test verifying that this task is being carried out correctly.

describe "on sending test request" do <br/> include HelperMethods<br/> it "should responsend with right answer" do <br/> server_test({ 'id' => 0 , 'text' => "req1" }) do |response|<br/> response[ 'text' ].should == "answ1" <br/> response[ 'id' ].should == 0 <br/> end <br/> end <br/> end <br/>

That's all, I hope this article will be useful to someone. All code is available on zhitkhab .

PS It turned out quite a lot of text, who mastered to read to the end, thanks.

Source: https://habr.com/ru/post/126231/


All Articles