
Implementation of a small asynchronous server

This post presents to the Habrahabr audience the code of a small asynchronous server written in Python on almost "bare" sockets.

I have had to write many applications that run as network services. They were written in different languages and for different loads, and each new implementation differed somewhat from the previous one. Below the cut I give an example of what is, in my opinion, a fairly successful implementation of a "training" server, with comments on the code where needed.

Introduction


First, a few words about what the server actually does. Since this is a learning example, its task is simple: on startup it creates a socket listening on a port, and when a client connects it starts sending data to that client. The data is an endless stream of digits of Pi computed on the server. The server generates digits only while at least one client is connected, and it remembers the sequence generated so far. When all clients disconnect, generation is suspended and the server waits for a new connection.

The server implementation is single-threaded. The advantage of this approach is that it avoids the extra overhead of a separate thread, or even a separate process, per client that other common techniques incur.
By and large, this difference is hardly decisive, since Python is an interpreted language and that imposes its own limits on where it can be used. In the long run, though, the threads "saved" by this architecture can be used in the application for parallel computational tasks.

Before proceeding, I strongly recommend downloading the code and skimming through it. The code can also be viewed online on Showroom.
To start the server, users of UNIX-like systems can run the following commands:
chmod +x ./pypi.py
./pypi.py

To connect to the server, you can use the following command:
telnet 127.0.0.1 45067


Code walkthrough


The core of the application is the main() function, which starts the dispatch loop of the asyncore module:
def main():
    try:
        # timeout 0.1 s, prefer poll(), use the server object as the socket map
        asyncore.loop(0.1, True, Server('127.0.0.1'))
    except KeyboardInterrupt:
        print('\nBye :-*')
        sys.exit(0)

if __name__ == '__main__':
    main()

The asyncore module provides a loop function that accepts four optional arguments: (1) the timeout, (2) a flag that prefers the poll mechanism over the usual select, (3) a dictionary of socket descriptors, (4) the number of iterations. The third parameter is the important one for us: we pass the newly created server object as that dictionary.
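To make the role of the descriptor dictionary more concrete, here is a minimal sketch of a dispatch loop built directly on select. It does not use asyncore and is not how asyncore is implemented; the loop function, the Echo handler and the method names readable, writable, handle_read and handle_write are assumptions chosen to mirror the vocabulary of this article.

```python
import select
import socket


def loop(timeout, socket_map, count):
    """Run `count` iterations of a select()-based dispatch loop.

    socket_map is a dict {descriptor: handler}, like the third
    argument described above.
    """
    for _ in range(count):
        readers = [fd for fd, h in socket_map.items() if h.readable()]
        writers = [fd for fd, h in socket_map.items() if h.writable()]
        if not readers and not writers:
            break
        r, w, _x = select.select(readers, writers, [], timeout)
        for fd in r:
            socket_map[fd].handle_read()
        for fd in w:
            socket_map[fd].handle_write()


class Echo(object):
    """Hypothetical handler: sends one greeting, then only reads."""

    def __init__(self, sock, greeting):
        self.sock, self.out, self.received = sock, greeting, b''

    def readable(self):
        return True

    def writable(self):
        return len(self.out) > 0

    def handle_read(self):
        self.received += self.sock.recv(1024)

    def handle_write(self):
        sent = self.sock.send(self.out)
        self.out = self.out[sent:]


# Two connected sockets stand in for a server/client pair.
a, b = socket.socketpair()
left, right = Echo(a, b'ping'), Echo(b, b'pong')
loop(0.1, {a.fileno(): left, b.fileno(): right}, 3)
a.close()
b.close()
```

After three iterations each handler has sent its greeting and received the other's, all in a single thread.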

Thanks to "special Python magic", the Server class inherits both from the dispatcher class of the asyncore module and from the built-in dict class, which lets it act at once as the server socket and as the storage for the socket descriptors of all connected clients.

The start of the Server class declaration looks like this:
class Server(dispatcher, dict):
    # the listening socket never needs to be polled for writing
    writable = lambda x: False

    def __init__(self, host=None, port=0xB00B):
        dispatcher.__init__(self)

        # the socket must exist before fileno() can be used as a key
        self.create_socket(AF_INET, SOCK_STREAM)
        dict.__init__(self, {self.fileno(): self})

        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(0xA)

        self.dataSource = PiGenerator()

In the constructor, the object is first initialized as a server socket handler and then as a dictionary with a single entry: the server's own socket descriptor, mapped to the server object. It is important that create_socket creates the socket before the dictionary is initialized, because the descriptor is not available until the socket exists. Next, the server socket is bound to the port on the specified host, listening starts, and a generator of Pi digits is created, which will later supply the data stream for clients.
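The ordering constraint in the constructor can be tried out without asyncore. The sketch below is only an illustration: Handler is a hypothetical stand-in for dispatcher that does nothing except own a socket, and Registry plays the part of Server.

```python
import socket


class Handler(object):
    """Hypothetical stand-in for asyncore.dispatcher: it only owns a socket."""

    def __init__(self):
        self.socket = None

    def create_socket(self, family, kind):
        self.socket = socket.socket(family, kind)

    def fileno(self):
        return self.socket.fileno()

    def close(self):
        self.socket.close()


class Registry(Handler, dict):
    """Acts as a socket handler and as the {descriptor: handler} map."""

    def __init__(self):
        Handler.__init__(self)
        # Create the socket first: fileno() has nothing to return before this.
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        # Only now can the object register itself under its own descriptor.
        dict.__init__(self, {self.fileno(): self})


registry = Registry()
fd = registry.fileno()
is_self_registered = registry[fd] is registry
registry.close()
```

Swapping the two calls in Registry.__init__ would fail, because self.socket is still None when fileno() is called.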

After the dispatch loop is started, most of the work falls to the asyncore module, which calls the server object's handle_accept method to handle each incoming connection:
def handle_accept(self):
    sock, (host, port) = self.accept()
    print('new client from %s:%d' % (host, port))

    # each client gets its own stream fed by the shared generator
    stream = Stream(self.dataSource)
    self[sock.fileno()] = Client(sock, self, stream)

Inside the handler, the new client is accepted with the accept function, which returns a newly created socket for talking to the client and the host/port pair from which the connection was made. Having received the client socket, the server creates a new data stream (implemented by the Stream class) for reading data from the generator. A new client object, initialized with that stream, is then added to the dictionary of clients.
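The shape of the value returned by accept can be checked with plain sockets. This is a small sketch that assumes the loopback interface is available; binding to port 0 asks the operating system for any free port, and the clients dictionary is a simplified stand-in for the server's registry.

```python
import socket

# Listening socket on an OS-chosen free port (port 0), loopback only.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(('127.0.0.1', 0))
listener.listen(1)
server_port = listener.getsockname()[1]

# A client connects; the connection waits in the backlog until accepted.
client = socket.create_connection(('127.0.0.1', server_port))

# accept() returns a new socket plus the peer's (host, port) pair.
sock, (host, port) = listener.accept()

clients = {sock.fileno(): sock}   # registry keyed by descriptor
peer_matches = (host, port) == client.getsockname()

for s in (sock, client, listener):
    s.close()
```

The address reported by accept is the client's own local address, which is why the server can print it as the origin of the connection.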

The client reads data from the stream inside its writable() method:
def writable(self):
    data = self.stream.read()
    if data is None:
        # the stream is exhausted: nothing more will ever arrive
        print('client finished reading')
        self.close()
        return False

    self.buffer += data
    return len(self.buffer) > 0

The asyncore module calls writable for each socket before every iteration of the dispatch loop to find out whether the socket should be checked for write availability. We use this call to try to read data from the stream and to report that a write is needed if there is data to send. If the stream returns None, no more data will ever arrive and the socket is closed. In this example that should never happen, because the digits are generated endlessly.
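The three possible outcomes of writable (nothing yet, data ready, stream finished) can be traced with a toy stream. The Stream class is not shown in this article, so the version below, including its finished flag and the convention that read() returns an empty string when nothing is pending, is an assumption made for illustration only.

```python
class Stream(object):
    """Hypothetical stream: buffers pushed data until it is read."""

    def __init__(self):
        self.data = ''
        self.finished = False

    def update(self, data):
        self.data += data

    def read(self):
        # None means "no more data ever"; '' means "nothing right now".
        if self.finished and not self.data:
            return None
        data, self.data = self.data, ''
        return data


class Client(object):
    """Hypothetical client holding only the state writable() needs."""

    def __init__(self, stream):
        self.stream = stream
        self.buffer = ''
        self.closed = False

    def writable(self):
        data = self.stream.read()
        if data is None:
            self.closed = True
            return False
        self.buffer += data
        return len(self.buffer) > 0


stream = Stream()
client = Client(stream)
idle = client.writable()      # nothing buffered yet
stream.update('314')
ready = client.writable()     # digits arrived and were moved to the buffer
stream.finished = True
done = client.writable()      # stream drained and finished: client closes
```

Here idle and done are False while ready is True, so the dispatch loop would poll the socket for writing only in the middle case.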

Once asyncore learns that the client socket is ready for writing, it calls the handle_write() method, which sends the data previously read from the stream through the socket:
def handle_write(self):
    sent = self.send(self.buffer)
    # send() may accept only part of the data; keep the unsent tail
    self.buffer = self.buffer[sent:]
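The slicing in handle_write matters because a send call may accept fewer bytes than it was given. The sketch below imitates that with a hypothetical FakeSocket that takes at most four bytes per call; flush is an illustrative helper that repeats the handle_write step, whereas in the real server the repetition comes from the dispatch loop.

```python
class FakeSocket(object):
    """Hypothetical socket that accepts at most `limit` bytes per send()."""

    def __init__(self, limit):
        self.limit = limit
        self.wire = b''

    def send(self, data):
        chunk = data[:self.limit]
        self.wire += chunk
        return len(chunk)


def flush(sock, buffer):
    """Repeat the handle_write step until the buffer is empty."""
    calls = 0
    while buffer:
        sent = sock.send(buffer)
        buffer = buffer[sent:]   # keep only the unsent tail
        calls += 1
    return calls


sock = FakeSocket(limit=4)
calls = flush(sock, b'3141592653')
```

Ten bytes go out in three calls (4 + 4 + 2), and nothing is lost or sent twice.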

The generator and the stream are tied together by the observer pattern. The generator acts as the observable object and provides subscribe and unsubscribe methods for subscribing to its events and unsubscribing from them:
class PiGenerator(list):
    def subscribe(self, obj):
        self.lock.acquire()
        try:
            self.append(obj)
            # send the digits computed so far to the new subscriber
            self._notify(obj=obj)
        finally:
            self.lock.release()

        if not self.calculator:
            self.calculator = PiCalcThread(self, self.lock)
            self.calculator.start()
        else:
            if len(self) > 0:
                self._resumeCalculator()

    def unsubscribe(self, obj):
        self.lock.acquire()
        self.remove(obj)
        self.lock.release()

        if len(self) <= 0:
            self._pauseCalculator()

The digits themselves are generated by a separate class, PiCalcThread, which performs the calculation in its own thread, so a lock is used to synchronize adding and removing subscriptions. The same lock object is used to suspend the thread when no streams are subscribed. When a new stream subscribes, the _notify() method sends it the digits already computed and stored, unless it is the first subscriber.
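The bodies of _pauseCalculator and _resumeCalculator are not shown here, so the following is only a guess at the idea of suspending a thread with a lock it needs: while the owner holds the lock, the worker blocks at its critical section. All names in the sketch (worker, produced, stop) are hypothetical.

```python
import threading
import time

lock = threading.Lock()
stop = threading.Event()
produced = []


def worker():
    n = 0
    while not stop.is_set():
        with lock:                 # blocks here while the owner holds the lock
            produced.append(n)
        n += 1
        time.sleep(0.001)


thread = threading.Thread(target=worker)
thread.daemon = True
thread.start()

lock.acquire()                     # "pause": the worker cannot append
count_at_pause = len(produced)
time.sleep(0.05)
count_while_paused = len(produced)
lock.release()                     # "resume"

# Wait (with a generous deadline) until the worker produces something new.
deadline = time.time() + 2.0
while len(produced) == count_while_paused and time.time() < deadline:
    time.sleep(0.001)
count_after_resume = len(produced)

stop.set()
thread.join()
```

Because the append happens only under the lock, the count cannot change between the two measurements taken while the main thread holds it.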

The _notify() method iterates over the subscribed streams and calls update on each, passing the new digits to the stream:
def _notify(self, digits=None, obj=None):
    # one target when obj is given, otherwise every subscriber
    objs = [obj] if obj else self
    digits = digits or self.digits

    for obj in objs:
        obj.update(digits)

The update() method of the stream simply appends the new data to what it already holds:
def update(self, data):
    self.data += data
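How subscription, replay and broadcast fit together can be seen in a single-threaded sketch with the locking and the calculator thread left out. Generator, new_digits and the guard that skips the replay when no digits exist yet are assumptions for illustration; only the shapes of _notify and update follow the listings above.

```python
class Stream(object):
    """Observer: accumulates whatever the generator pushes to it."""

    def __init__(self):
        self.data = ''

    def update(self, data):
        self.data += data


class Generator(list):
    """Hypothetical observable: a list of subscribers plus the digits so far."""

    def __init__(self):
        list.__init__(self)
        self.digits = ''

    def subscribe(self, obj):
        self.append(obj)
        if self.digits:
            self._notify(obj=obj)      # replay history to the newcomer only

    def new_digits(self, digits):
        self.digits += digits
        self._notify(digits=digits)    # broadcast only the fresh digits

    def _notify(self, digits=None, obj=None):
        targets = [obj] if obj is not None else self
        digits = digits or self.digits
        for target in targets:
            target.update(digits)


early, late = Stream(), Stream()
gen = Generator()
gen.subscribe(early)
gen.new_digits('3')
gen.new_digits('1')
gen.subscribe(late)       # receives '31' at once
gen.new_digits('4')       # both streams receive '4'
```

Both streams end up holding '314', although the second one subscribed after two digits had already been produced.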

The Pi digit generation thread class uses the algorithm proposed by Jeremy Gibbons in the paper "Unbounded Spigot Algorithms for the Digits of Pi":
class PiCalcThread(Thread):
    def __init__(self, buffer, lock):
        Thread.__init__(self)
        self.buffer = buffer
        self.lock = lock

    def run(self):
        q, r, t, k, n, l = 1, 0, 1, 1, 3, 3

        while True:
            if 4 * q + r - t < n * t:
                # n is confirmed as the next digit: publish it under the lock
                self.lock.acquire()
                self.buffer.newDigits(str(n))
                self.lock.release()

                q, r, t, k, n, l = (10 * q, 10 * (r - n * t), t, k, (10 * (3 * q + r)) // t - 10 * n, l)
            else:
                q, r, t, k, n, l = (q * k, (2 * q + r) * l, t * l, k + 1, (q * (7 * k + 2) + r * l) // (t * l), l + 2)

            # artificial delay to throttle the data rate
            time.sleep(0.001)

The run() method computes the digits of Pi one after another without end and, again under the lock, reports each one to the generator, which in turn passes it to the data streams subscribed to it. An artificial delay is added so that the server does not produce too much data per unit of time.
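The same recurrence can be tried on its own, without threads or locks, by writing it as a generator. This is a sketch of the arithmetic only; the name pi_digits is made up for the example, and integer division is spelled // so that the result does not depend on the Python version.

```python
from itertools import islice


def pi_digits():
    """Yield the decimal digits of Pi one at a time (3, 1, 4, 1, 5, ...)."""
    q, r, t, k, n, l = 1, 0, 1, 1, 3, 3
    while True:
        if 4 * q + r - t < n * t:
            # The next digit is settled: emit it and rescale the state.
            yield n
            q, r, t, k, n, l = (10 * q, 10 * (r - n * t), t, k,
                                (10 * (3 * q + r)) // t - 10 * n, l)
        else:
            # Not enough precision yet: absorb one more term of the series.
            q, r, t, k, n, l = (q * k, (2 * q + r) * l, t * l, k + 1,
                                (q * (7 * k + 2) + r * l) // (t * l), l + 2)


first_ten = ''.join(str(d) for d in islice(pi_digits(), 10))
print(first_ten)  # 3141592653
```

All six state variables are arbitrary-precision integers that keep growing, so each further digit costs a little more than the previous one.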

The syntax highlighting for this article was done with http://highlight.hohli.com/. I really hope this article will be useful to someone, even though the description turned out somewhat muddled and rather long.

Source: https://habr.com/ru/post/64229/

