The other day I had the idea of writing a simple Redis-like database server. Although I have plenty of experience with WSGI applications, a database server was a new challenge, and it turned out to be good practice for learning how to work with sockets in Python. In this article I'll share what I learned along the way.
The goal of my project was to write a simple server that I could use with huey, a task queue. Huey uses Redis as its default storage engine for tracking queued jobs, task results, and other things. For this article I have trimmed the project's source code down to the essentials. You can easily add the missing pieces yourself, or, if you're interested, look at the final result.
The server that we will create will be able to respond to the following commands:
GET <key>
SET <key> <value>
DELETE <key>
FLUSH
MGET <key1> ... <keyn>
MSET <key1> <value1> ... <keyn> <valuen>
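We will also support the following data types:
Strings and binary data
Integers
NULL
Arrays (which may be nested)
Dictionaries (which may be nested)
Error messages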
To handle multiple clients asynchronously we will use gevent, but you could also use the standard library's socketserver module (SocketServer in Python 2) with ForkingMixIn or ThreadingMixIn.
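For comparison, here is a minimal sketch of the standard-library alternative, assuming Python 3's socketserver module. The class names LineHandler and ThreadedServer and the line-echo behaviour are illustrative only: this handler upper-cases each line it receives rather than speaking the Redis protocol.

```python
import socket
import socketserver
import threading


class LineHandler(socketserver.StreamRequestHandler):
    # Hypothetical handler: StreamRequestHandler provides self.rfile and
    # self.wfile, file-like wrappers similar to what makefile() gives us.
    def handle(self):
        for line in self.rfile:  # iteration ends when the client disconnects
            self.wfile.write(line.upper())
            self.wfile.flush()


class ThreadedServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    daemon_threads = True        # don't block interpreter exit on client threads
    allow_reuse_address = True


def start_server(host='127.0.0.1', port=0):
    # port=0 lets the OS choose a free port; see server.server_address.
    server = ThreadedServer((host, port), LineHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    return server


if __name__ == '__main__':
    server = start_server()
    host, port = server.server_address
    with socket.create_connection((host, port)) as conn:
        conn.sendall(b'hello\n')
        with conn.makefile('rb') as reader:
            print(reader.readline())  # b'HELLO\n'
    server.shutdown()
    server.server_close()
```

Each connection gets its own thread here, whereas gevent multiplexes connections as greenlets in a single thread.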
Let's create a skeleton for our server. We need the server itself and a callback function that runs whenever a new client connects. We also need some logic to process requests from the client and send back responses.
Let's start:
from gevent import socket
from gevent.pool import Pool
from gevent.server import StreamServer

from collections import namedtuple
from io import BytesIO
from socket import error as socket_error


# We'll use exceptions to notify the connection-handling loop of problems.
class CommandError(Exception): pass
class Disconnect(Exception): pass

Error = namedtuple('Error', ('message',))


class ProtocolHandler(object):
    def handle_request(self, socket_file):
        # Parse a request from the client into its component parts.
        pass

    def write_response(self, socket_file, data):
        # Serialize the response data and send it to the client.
        pass


class Server(object):
    def __init__(self, host='127.0.0.1', port=31337, max_clients=64):
        self._pool = Pool(max_clients)
        self._server = StreamServer(
            (host, port),
            self.connection_handler,
            spawn=self._pool)

        self._protocol = ProtocolHandler()
        self._kv = {}

    def connection_handler(self, conn, address):
        # Convert "conn" (a socket object) into a file-like object.
        socket_file = conn.makefile('rwb')

        # Process client requests until client disconnects.
        while True:
            try:
                data = self._protocol.handle_request(socket_file)
            except Disconnect:
                break

            try:
                resp = self.get_response(data)
            except CommandError as exc:
                resp = Error(exc.args[0])

            self._protocol.write_response(socket_file, resp)

    def get_response(self, data):
        # Here we'll actually unpack the data sent by the client, execute the
        # command they specified, and pass back the return value.
        pass

    def run(self):
        self._server.serve_forever()
The code above is hopefully clear enough. We have separated concerns so that protocol handling lives in its own class with two public methods, handle_request and write_response. The server itself uses the protocol handler to unpack client requests and to serialize server responses back to the client. The get_response() method will be used to execute the command sent by the client.
Looking at the connection_handler() method in more detail, you will see that we obtain a file-like wrapper around the socket object. This wrapper lets us abstract away some of the quirks usually encountered when working with raw sockets. The method enters an infinite loop, reading requests from the client and sending responses, and finally exits the loop when the client disconnects (signaled by read() returning an empty byte string).
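To see the disconnect signal in isolation, here is a small sketch that uses the standard library's socket.socketpair() instead of gevent's StreamServer (an assumption made so the example runs without a network). Once the peer closes its end, read(1) on the file wrapper returns b'', which is the condition handle_request() turns into a Disconnect. The helper name read_until_disconnect is hypothetical.

```python
import socket


def read_until_disconnect(fh):
    """Read single bytes from a socket file until the peer closes it."""
    chunks = []
    while True:
        byte = fh.read(1)
        if not byte:  # b'' means the other side closed its end
            break
        chunks.append(byte)
    return b''.join(chunks)


if __name__ == '__main__':
    server_sock, client_sock = socket.socketpair()
    server_file = server_sock.makefile('rwb')
    client_sock.sendall(b'+PING\r\n')
    client_sock.close()  # simulate the client hanging up
    print(read_until_disconnect(server_file))  # b'+PING\r\n'
    server_file.close()
    server_sock.close()
```

Data already sent before the close is still delivered; only after it is drained does read() report end-of-stream.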
We use typed exceptions to handle client disconnects and to report errors that occur while processing commands. For example, if a user sends a malformed command to the server, we raise a CommandError, which is serialized into an error response and sent to the client.
Before we go further, let's discuss how the client and server will interact.
The first challenge I faced was how to send binary data over the wire. Most of the examples I found online were trivial echo servers that wrapped the socket in a file-like object and simply called readline(). If I wanted to store pickled data or strings containing newlines, I would need some kind of serialization format.
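The problem is easy to reproduce without a network. This sketch uses io.BytesIO as a stand-in for the socket file (the helper names are hypothetical): a value containing a newline is cut in half by readline(), while a simple length prefix recovers it intact.

```python
from io import BytesIO


def read_line_framed(fh):
    # Naive framing: one value per line.
    return fh.readline().rstrip(b'\n')


def read_length_prefixed(fh):
    # Length-prefixed framing: "<length>\n<payload>".
    length = int(fh.readline().strip())
    return fh.read(length)


value = b'first line\nsecond line'

naive = BytesIO(value + b'\n')
print(read_line_framed(naive))       # b'first line' (the value was split)

framed = BytesIO(b'%d\n' % len(value) + value)
print(read_length_prefixed(framed))  # b'first line\nsecond line'
```

Because the reader knows exactly how many bytes to consume, the payload may contain any bytes at all, newlines included.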
Having spent time trying to come up with something suitable, I decided to read the documentation for the Redis protocol, which turned out to be very simple to implement and has the added advantage of supporting several different types of data.
The Redis protocol uses a request/response pattern. Each message begins with a byte that indicates the data type, followed by the data itself, terminated by a carriage return and line feed (\r\n).
Data type | Prefix | Structure | Example |
---|---|---|---|
Simple string | + | +{string data}\r\n | +this is a simple string\r\n |
Error | - | -{error message}\r\n | -ERR unknown command "FLUHS"\r\n |
Integer | : | :{the number}\r\n | :1337\r\n |
Binary data | $ | ${number of bytes}\r\n{data}\r\n | $6\r\nfoobar\r\n |
Array | * | *{number of elements}\r\n{0 or more of above} | *3\r\n+a simple string element\r\n:12345\r\n$7\r\ntesting\r\n |
Dictionary | % | %{number of keys}\r\n{key, value pairs, each encoded as above} | %3\r\n+key1\r\n+value1\r\n+key2\r\n*2\r\n+value2-0\r\n+value2-1\r\n:3\r\n$7\r\ntesting\r\n |
Null | $ | $-1\r\n (string of length -1) | $-1\r\n |
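One detail in the table deserves a worked example: the $ prefix counts bytes, not characters. This sketch uses a hypothetical encode_bulk helper (not part of the article's code) that encodes text as UTF-8 before measuring it, so a non-ASCII string gets a length larger than its character count.

```python
def encode_bulk(value):
    """Encode a str/bytes value using the table's "$" (binary data) format."""
    if value is None:
        return b'$-1\r\n'                # Null: length -1 and no payload
    if isinstance(value, str):
        value = value.encode('utf-8')    # measure bytes, not characters
    return b'$%d\r\n' % len(value) + value + b'\r\n'


print(encode_bulk('foobar'))  # b'$6\r\nfoobar\r\n'
print(encode_bulk('héllo'))   # b'$6\r\nh\xc3\xa9llo\r\n' (5 characters, 6 bytes)
print(encode_bulk(None))      # b'$-1\r\n'
```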
Let's fill in the protocol handler class so that it implements the Redis protocol.
class ProtocolHandler(object):
    def __init__(self):
        # Map each type-prefix byte to the method that parses that type.
        # The socket file is opened in binary mode, so the keys are bytes.
        self.handlers = {
            b'+': self.handle_simple_string,
            b'-': self.handle_error,
            b':': self.handle_integer,
            b'$': self.handle_string,
            b'*': self.handle_array,
            b'%': self.handle_dict}

    def handle_request(self, socket_file):
        first_byte = socket_file.read(1)
        if not first_byte:
            raise Disconnect()

        # Delegate to the appropriate handler based on the first byte.
        handler = self.handlers.get(first_byte)
        if handler is None:
            raise CommandError('bad request')
        return handler(socket_file)

    def handle_simple_string(self, socket_file):
        return socket_file.readline().rstrip(b'\r\n')

    def handle_error(self, socket_file):
        return Error(socket_file.readline().rstrip(b'\r\n').decode('utf-8'))

    def handle_integer(self, socket_file):
        return int(socket_file.readline().rstrip(b'\r\n'))

    def handle_string(self, socket_file):
        # First read the length ($<length>\r\n).
        length = int(socket_file.readline().rstrip(b'\r\n'))
        if length == -1:
            return None  # Special-case for NULLs.
        length += 2  # Include the trailing \r\n in count.
        return socket_file.read(length)[:-2]

    def handle_array(self, socket_file):
        num_elements = int(socket_file.readline().rstrip(b'\r\n'))
        return [self.handle_request(socket_file) for _ in range(num_elements)]

    def handle_dict(self, socket_file):
        num_items = int(socket_file.readline().rstrip(b'\r\n'))
        elements = [self.handle_request(socket_file)
                    for _ in range(num_items * 2)]
        return dict(zip(elements[::2], elements[1::2]))
For serialization we do the opposite: convert Python objects into their serialized representation.
class ProtocolHandler(object):
    # ... above methods omitted ...

    def write_response(self, socket_file, data):
        buf = BytesIO()
        self._write(buf, data)
        socket_file.write(buf.getvalue())
        socket_file.flush()

    def _write(self, buf, data):
        if isinstance(data, str):
            data = data.encode('utf-8')

        if isinstance(data, bytes):
            buf.write(b'$%d\r\n' % len(data))
            buf.write(data + b'\r\n')
        elif isinstance(data, int):
            buf.write(b':%d\r\n' % data)
        elif isinstance(data, Error):
            # Check Error before tuple below: a namedtuple is also a tuple.
            message = data.message
            if isinstance(message, str):
                message = message.encode('utf-8')
            buf.write(b'-' + message + b'\r\n')
        elif isinstance(data, (list, tuple)):
            buf.write(b'*%d\r\n' % len(data))
            for item in data:
                self._write(buf, item)
        elif isinstance(data, dict):
            buf.write(b'%%%d\r\n' % len(data))
            for key in data:
                self._write(buf, key)
                self._write(buf, data[key])
        elif data is None:
            buf.write(b'$-1\r\n')
        else:
            raise CommandError('unrecognized type: %s' % type(data))
An additional benefit of keeping protocol handling in its own class is that we can reuse the handle_request and write_response methods to build a client library.
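Because parsing and serialization mirror each other, the two halves can be checked against one another without a socket. The sketch below is a condensed, standalone re-implementation of the same wire format (the function names encode, decode and round_trip are hypothetical, and error messages are left out for brevity), operating on io.BytesIO. Strings come back as bytes, since they travel as $ binary data.

```python
from io import BytesIO


def encode(buf, data):
    if isinstance(data, str):
        data = data.encode('utf-8')
    if isinstance(data, bytes):
        buf.write(b'$%d\r\n%b\r\n' % (len(data), data))
    elif isinstance(data, int):
        buf.write(b':%d\r\n' % data)
    elif isinstance(data, (list, tuple)):
        buf.write(b'*%d\r\n' % len(data))
        for item in data:
            encode(buf, item)
    elif isinstance(data, dict):
        buf.write(b'%%%d\r\n' % len(data))
        for key, value in data.items():
            encode(buf, key)
            encode(buf, value)
    elif data is None:
        buf.write(b'$-1\r\n')
    else:
        raise TypeError('unrecognized type: %s' % type(data))


def decode(fh):
    prefix = fh.read(1)
    line = fh.readline().rstrip(b'\r\n')
    if prefix == b'+':
        return line
    if prefix == b':':
        return int(line)
    if prefix == b'$':
        length = int(line)
        if length == -1:
            return None
        return fh.read(length + 2)[:-2]  # drop the trailing \r\n
    if prefix == b'*':
        return [decode(fh) for _ in range(int(line))]
    if prefix == b'%':
        items = [decode(fh) for _ in range(int(line) * 2)]
        return dict(zip(items[::2], items[1::2]))
    raise ValueError('bad prefix: %r' % prefix)


def round_trip(data):
    buf = BytesIO()
    encode(buf, data)
    buf.seek(0)  # rewind so decode() reads what encode() wrote
    return decode(buf)


print(round_trip(['v2-0', 1, None, {'vz': [1, 2, 3]}]))
# [b'v2-0', 1, None, {b'vz': [1, 2, 3]}]
```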
Now the Server class needs a get_response() method. We'll assume that clients send commands either as simple strings or as arrays of command arguments, so the data parameter passed to get_response() will be either a byte string or a list. To simplify processing, if data is a simple string, we convert it into a list by splitting it on whitespace.
The first element is the command name, and any remaining elements are that command's arguments. Just as ProtocolHandler maps first bytes to handlers, let's map command names to callbacks in the Server class:
class Server(object):
    def __init__(self, host='127.0.0.1', port=31337, max_clients=64):
        self._pool = Pool(max_clients)
        self._server = StreamServer(
            (host, port),
            self.connection_handler,
            spawn=self._pool)

        self._protocol = ProtocolHandler()
        self._kv = {}

        self._commands = self.get_commands()

    def get_commands(self):
        return {
            'GET': self.get,
            'SET': self.set,
            'DELETE': self.delete,
            'FLUSH': self.flush,
            'MGET': self.mget,
            'MSET': self.mset}

    def get_response(self, data):
        if not isinstance(data, list):
            try:
                data = data.split()
            except AttributeError:
                raise CommandError('Request must be list or simple string.')

        if not data:
            raise CommandError('Missing command')

        command = data[0]
        if isinstance(command, bytes):
            # Requests arrive as bytes; decode to match the str keys above.
            command = command.decode('utf-8', 'replace')
        command = str(command).upper()
        if command not in self._commands:
            raise CommandError('Unrecognized command: %s' % command)

        return self._commands[command](*data[1:])
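The normalization step at the start of get_response() can be tried on its own. This hypothetical parse_command helper (not in the original code) accepts a simple string or a list, splits strings on whitespace, and decodes a bytes command name so it can be matched against str keys. It raises ValueError rather than CommandError only to stay self-contained.

```python
def parse_command(data):
    """Split a request into (COMMAND, args); raise ValueError if malformed."""
    if isinstance(data, (bytes, str)):
        data = data.split()  # simple string: split on whitespace
    if not isinstance(data, list):
        raise ValueError('Request must be list or simple string.')
    if not data:
        raise ValueError('Missing command')
    command = data[0]
    if isinstance(command, bytes):
        command = command.decode('utf-8')
    if not isinstance(command, str):
        raise ValueError('Command name must be a string')
    return command.upper(), data[1:]


print(parse_command(b'get k1'))              # ('GET', [b'k1'])
print(parse_command([b'MSET', b'a', b'1']))  # ('MSET', [b'a', b'1'])
```

Only the command name is decoded; the arguments stay as bytes, so keys and values are stored exactly as they arrived.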
Our server is almost ready! We just need to implement the six methods for the commands registered in get_commands():
class Server(object):
    # ... __init__(), get_commands() and get_response() omitted ...

    def get(self, key):
        return self._kv.get(key)

    def set(self, key, value):
        self._kv[key] = value
        return 1

    def delete(self, key):
        if key in self._kv:
            del self._kv[key]
            return 1
        return 0

    def flush(self):
        kvlen = len(self._kv)
        self._kv.clear()
        return kvlen

    def mget(self, *keys):
        return [self._kv.get(key) for key in keys]

    def mset(self, *items):
        # zip() is lazy on Python 3, so build a list before calling len().
        data = list(zip(items[::2], items[1::2]))
        for key, value in data:
            self._kv[key] = value
        return len(data)
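A note on the pairing trick in mset(): on Python 3, zip() returns an iterator with no len(), so the pairs must be materialized with list() before they can be counted. The trick also silently drops a trailing key that has no value, as this small sketch shows (pair_items is a hypothetical helper).

```python
def pair_items(items):
    """Group a flat (k1, v1, k2, v2, ...) sequence into (key, value) pairs."""
    return list(zip(items[::2], items[1::2]))


print(pair_items(('k1', 'v1', 'k2', 'v2')))  # [('k1', 'v1'), ('k2', 'v2')]
print(pair_items(('k1', 'v1', 'k2')))        # [('k1', 'v1')] ('k2' is dropped)
print(len(pair_items(('a', 1, 'b', 2))))     # 2
```

If that behaviour is undesirable, a server could check len(items) % 2 and raise a CommandError for an odd count.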
That's it! Our server is now ready to start processing requests. In the next section, we will build a client to communicate with it.
To interact with the server, let's reuse the ProtocolHandler class to implement a simple client. The client will connect to the server and send commands encoded as lists. We will reuse the write_response() logic to encode requests and the handle_request() logic to parse server responses.
class Client(object):
    def __init__(self, host='127.0.0.1', port=31337):
        self._protocol = ProtocolHandler()
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.connect((host, port))
        self._fh = self._socket.makefile('rwb')

    def execute(self, *args):
        # Send the command as an array, then parse the server's reply.
        self._protocol.write_response(self._fh, args)
        resp = self._protocol.handle_request(self._fh)
        if isinstance(resp, Error):
            raise CommandError(resp.message)
        return resp
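To make the encoding concrete, here is what a call such as execute('SET', 'k1', 'v1') puts on the wire. The encode_command helper below is a hypothetical, stripped-down stand-in for write_response() that only handles str arguments.

```python
def encode_command(*args):
    """Encode str arguments as a protocol array of "$" binary strings."""
    out = [b'*%d\r\n' % len(args)]
    for arg in args:
        data = arg.encode('utf-8')
        out.append(b'$%d\r\n%b\r\n' % (len(data), data))
    return b''.join(out)


print(encode_command('SET', 'k1', 'v1'))
# b'*3\r\n$3\r\nSET\r\n$2\r\nk1\r\n$2\r\nv1\r\n'
```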
With the execute() method, we can pass an arbitrary list of parameters, which will be encoded as an array and sent to the server. The server's response is parsed and returned as a Python object. For convenience, we can write client methods for the individual commands:
class Client(object):
    # ...

    def get(self, key):
        return self.execute('GET', key)

    def set(self, key, value):
        return self.execute('SET', key, value)

    def delete(self, key):
        return self.execute('DELETE', key)

    def flush(self):
        return self.execute('FLUSH')

    def mget(self, *keys):
        return self.execute('MGET', *keys)

    def mset(self, *items):
        return self.execute('MSET', *items)
To test our client, let's make the module runnable so the server can be started directly from the command line:
# Add this to bottom of module:
if __name__ == '__main__':
    from gevent import monkey; monkey.patch_all()
    Server().run()
To test the server, run the server module from the command line. In another terminal, open the Python interpreter and import the Client class from the server module. Creating a client instance opens the connection, and you can start running commands. Because strings travel as binary data, values come back as bytes objects:
>>> from server_ex import Client
>>> client = Client()
>>> client.mset('k1', 'v1', 'k2', ['v2-0', 1, 'v2-2'], 'k3', 'v3')
3
>>> client.get('k2')
[b'v2-0', 1, b'v2-2']
>>> client.mget('k3', 'k1')
[b'v3', b'v1']
>>> client.delete('k1')
1
>>> client.get('k1')
>>> client.delete('k1')
0
>>> client.set('kx', {'vx': {'vy': 0, 'vz': [1, 2, 3]}})
1
>>> client.get('kx')
{b'vx': {b'vy': 0, b'vz': [1, 2, 3]}}
>>> client.flush()
3
The code presented in this article is for demonstration purposes only. I hope you enjoyed reading about this project as much as I enjoyed writing about it. Here you can find a complete copy of the source code.
Consider the following if you want to expand the project:
Rewrite the server to use the standard library's socketserver module (SocketServer in Python 2) with ThreadingMixIn.
The author of the publication is Charles Leifer. Translated by Evgeny Zyatev.
The original can be found on Charles' blog.
Source: https://habr.com/ru/post/346286/