Greetings, readers. I am currently developing my own framework based on a signal architecture (signals and slots), as opposed to the now-dominant MVC model. It is being tested in our startup, and along the way I realized that libraries alone are not enough: real tasks and plans for the future call for varied functionality united by one theme, namely processing and delivering information to many clients in real time (yes, somewhat similar to Comet, which is part of it too). So I decided to try implementing the main component: a fast, flexible and scalable message queue that would become the foundation for the projects to follow.
What came of it? The alpha version of SignalsyMQ, a message queue built on PHP, Redis and Zend Framework.
We have already written about message queues (the first article of the review, the second), but after critically reviewing all the systems once more and even trying some of them, I realized that one niche remains open: small, flexible, configurable queues that sit as close as possible to the main application platform (in this case, PHP). What is missing is a simple server that operates on standard messages (JSON on the wire, just an associative array inside the application) and provides flexible configuration of message delivery, persistent storage of queues, and very fast work with them. On top of that, I want a simple protocol and maximum simplicity in general.
The closest to this are Starling MQ, which is written in Ruby, and MemcacheQ. Both systems speak the memcache protocol, which makes them easier to embed in a heterogeneous environment. But MemcacheQ put me off with its development policy, or rather the lack of one, its limit on the maximum message length, and so on. Starling, although it has been used at Twitter, is also a rather specific product: it is still raw, and it requires a completely different platform. So I decided, in the best traditions, to write my own implementation of a message queue.
As a starting point I took the Redis project, a very fast and flexible NoSQL system with advanced storage and processing of structured data (it has already been covered on Habr; I did not get to it in time myself).
The second pillar is Rediska, a PHP library for working with Redis with convenient syntax and support for many interesting features (for example, working with several servers, built-in serialization, key distribution among servers, and so on; the library is in active development, so features are added constantly).
The third pillar is the Zend Framework, which I now use in most projects, although here it plays only a supporting role (several service classes are used).
I should note right away that ZF already has its own message queue implementation, Zend_Queue, which supports various backends for storing messages, from a plain PHP array to MemcacheQ, databases and the Zend Platform. In practice, however, I have not yet managed to try the most promising backend, MemcacheQ, and working with a database under a dense flow of messages is simply impossible: everything slows down badly until the whole script crashes. Besides, the Zend_Queue interface is very abstract, and in my case I wanted some higher-level functions, so I would have had to extend the existing code significantly. The Rediska library also ships a special adapter for working with queues on top of Redis, but after struggling for a week without getting even a test case to work, I finally decided to write my own system.
To begin, let us define the overall architecture and features. The basic concepts are the message, the queue (channel), the server and the storage.
- A message is a data structure, in our case an associative array containing the information we store. It consists of several service fields and a body. The body holds the data that is passed to the recipient and, most importantly, is never modified. Service fields may change during processing. So far the required fields are the message type (an arbitrary string), the message source (also a string), the channel to which the message should be delivered, and the timestamp of its creation. The body is also one of the fields, but it may well be empty: it is meaningful only to a specific recipient, and the duty of the system as a whole is simply to store and deliver it.
- The queue (channel) is a unique address, the name of the queue to which messages are delivered. I took the simplest and most understandable form of addressing, something like URLs. For example, /users/aleks is the message queue for the user aleks. There is some additional processing inside, but it does not affect how other scripts work with the queue. It is also possible to send a message to several queues at once using the metacharacters '*' (any number of characters), '?' (exactly one arbitrary character) and '[a..z]' (a set of characters, one of which must appear in the name for the message to be delivered). Here my development compares favorably with Zend_Queue: you can easily send a message, for example, to all users named aleks (provided the queues are named after the users). A queue is also characterized by its length, the maximum number of messages it can hold. Since messages are stored in memory (and serialized as well), keep memory consumption in mind and choose the queue length based on the specifics of the project. When a queue overflows, the oldest messages are deleted.
- The queue server is, for now, the library script itself. The current implementation works directly with Redis servers; the next step is to create a dedicated server. That is, the client application will connect to my server over a socket and send messages with meta-information (at the very least, the channel to deliver to), and the rest will be the business of the server (or rather, the daemon). To extend its capabilities, I plan to support several data formats (serialization) and several protocols: sockets, XML/JSON-RPC, HTTP REST, SOAP and others.
- Storage: in the current version it is one or several running Redis servers. In the future I may implement several storage options by analogy with Zend_Queue, although this is unlikely.
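To make the message and channel concepts more concrete, here is a minimal sketch in modern PHP (7.0 or later is assumed for the type hints). The field names (type, source, channel, created, body) and the helper functions makeMessage and matchChannels are hypothetical, chosen only for illustration; they are not taken from the SignalsyMQ source. The mask matching uses PHP's standard fnmatch(), which understands '*', '?' and bracket groups written as [a-z], so it only approximates the '[a..z]' notation described above.

```php
<?php
// Hypothetical message layout: four service fields plus the body.
function makeMessage(string $type, string $source, string $channel, $body = null): array
{
    return array(
        'type'    => $type,     // arbitrary string
        'source'  => $source,   // who produced the message
        'channel' => $channel,  // queue name or mask
        'created' => time(),    // creation timestamp
        'body'    => $body,     // payload, delivered unchanged; may be empty
    );
}

// Returns the queue names that match a channel mask.
function matchChannels(string $mask, array $queueNames): array
{
    $matched = array();
    foreach ($queueNames as $name) {
        if (fnmatch($mask, $name)) {
            $matched[] = $name;
        }
    }
    return $matched;
}

$message = makeMessage('chat', 'web', '/users/al*', array('text' => 'hello'));

// JSON round trip: what would be stored, and what a reader gets back.
$stored   = json_encode($message);
$restored = json_decode($stored, true);

$queues  = array('/users/aleks', '/users/alice', '/users/bob', '/system/log');
$targets = matchChannels($restored['channel'], $queues);
print_r($targets); // the two queues whose names start with /users/al
```

Because the stored form is plain JSON, the same record could be read by a client in any other language, which is the point made about heterogeneous environments.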
One more remark about the architecture and how it differs from Rediska_Queue: as far as I can tell from the sources, they sacrificed the valuable property of distribution for the sake of speed. For example, the getQueues command, which returns an array of all stored queues (not messages, only queue names), works with a local copy of the queue list. So if another client creates a new queue while the first one is running, the first client will not know about it. For a web application, where a request is processed while the page is being built, this may not matter, but I need a message server that can run as a daemon and process many queues and commands from many clients. This does require additional resources, although thanks to Redis all the operations have very good complexity characteristics.
So, the general API consists of a number of methods that are as close as possible to the Zend_Queue interface, but with some extensions, which I describe below.
- getInstance($config): a single instance of the library and of the adapter is used to access Redis, so all work goes through a static method, which itself decides whether to create a new instance or reuse the already initialized one. The configuration defines server parameters (addresses, ports), as well as the serializer functions and the key distribution mechanism for the multi-server case.
private $_options = array(
    'maxQueues'            => 100000,   // maximum number of queues
    'maxMessagesPerQueues' => 1000,     // maximum number of messages in one queue
    'expireQueues'         => 15552000, // maximum queue lifetime in seconds (180 days)
    'expireMessages'       => 604800,   // maximum lifetime of one message in seconds (7 days)
    'maxMessageLength'     => 32768,    // maximum message size in bytes (after serialization)
    'log'                  => false,    // if set, the Zend_Log object to log messages to
    'redis' => array(
        'namespace' => 'smq_',          // namespace for keys, kept short to save memory
        'servers' => array(
            array(
                'host'     => 'localhost', // host
                'port'     => 6379,        // port
                'weight'   => 1,           // server weight for key distribution
                'password' => ''           // password
            )
        ),
        'keyDistributor' => 'crc32',    // type of key distribution
        'serializer'     => array('Signalsy_MQ', '_serialize'),
        'unserializer'   => array('Signalsy_MQ', '_unserialize')
    )
);
It is also possible to set a logger (Zend_Log) to record all significant events, errors, and even the messages themselves. This is mostly for debugging; in production it is enough to configure saving in Redis plus its built-in append-only log mechanism to prevent data loss, or replication to a backup server.
- _serialize / _unserialize are the built-in serialization functions used when messages are written to and read from the storage. Redis can store only numbers and strings as values, so the data has to be serialized before saving. The server does this transparently; you only need to specify the methods for these actions, otherwise PHP's default serialization is used. I chose JSON, which is more convenient and native for AJAX applications, since in the future I want to minimize the number of layers between the client and the storage. The next version will use the new Zend_Serializer component from the Zend incubator, which will allow other algorithms to be used. In addition, data stored by PHP in this form can be read by any program in any language that has a JSON parser, which is very important for distributed systems.
- createQueue is the main method; it creates a queue with the name passed to it. First the name is validated (various special characters are removed, and so on), and the method checks whether such a queue already exists in the system. After creation, the first service message, which contains only a timestamp, is placed in the queue. This is done because Redis creates a list (the List data structure) only when the first item is added to it, so rather than storing a placeholder such as an empty string, I decided to initialize the queue with a special service message type.
- isExists checks whether the specified queue exists on the server. Unlike the Rediska adapter for Zend_Queue, we perform more thorough checks, including whether such an object exists in the database memory and whether its data type matches.
- count - allows you to get the number of messages in the queue.
- deleteQueue - deletes a queue from the server storage; all messages in it are deleted as well. After such changes it may be necessary to force the database state to be written to disk, so that the data is up to date in the event of a crash.
- send is the main method; it sends messages to the specified channels. Besides the required parameter, the message itself, you can control additional checks that guarantee correct operation of the queue system but can slow the work down significantly. skipCheckQueueLength controls the check of the queue length and its truncation when the limit set in the configuration is exceeded. On truncation the oldest messages are deleted. If this parameter is an integer, it sets the probability that the check is performed on this request. This is a compromise between speed and reliability, since the check costs one or two server requests. The skipCheckQueueExists parameter disables the check that the queue exists. As mentioned above, a message is an ordinary array that is stored in the queue after serialization. In the required channel field we specify the queue (or several, using a mask) where the message should be placed. If a mask is given, we first fetch all queue names that match it and then, in a loop, add the message to each of them, rewriting the channel field in the message beforehand. No other service headers are currently changed or analyzed, and neither is the message body, which can be a string or any PHP data type that serializes correctly. Once again: the main difference from other implementations is the ability to send a message to several (or even all) queues with one call. If the logging option is set, the message is also written to the log using Zend_Log.
- receive is the second most important operation: receiving one or more messages from a queue. Masks are not supported here; messages can be received only from one specific queue. The existence of the queue on the server is checked here as well (for maximum performance, especially if no one else works with the Redis server, all these checks could be done just once, at initialization). When receiving, we specify the maximum number of messages we are willing to accept, but fewer may be returned, for example if the queue does not hold that many. Another interesting option (I will not hide that this functionality is not yet ideal in terms of optimization) is to receive only messages sent after a specified date. For example, we may want no more than a hundred messages sent in the last hour. The specified number of messages is fetched first, and then those that do not meet the creation time criterion are checked and discarded. Note that messages are only read from the queue, not deleted right after reading; deletion requires a separate command. The queue itself is persistent, and its state changes only through explicit client actions (or as a result of the queue length check). In the future an expiry mechanism will be implemented: Redis supports it natively, but it has not yet been implemented on the client side (for individual list elements).
- deleteMessage - delete a message from the queue.
- getQueues - allows you to view a list of all available queues on the server.
- getQueuesByPattern - returns a list of the queues that match the specified pattern (using the metacharacters '*' and '?' and character groups '[a..z]').
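The send and receive behavior described above can be sketched without Redis at all. The class below is an in-memory stand-in written for this explanation, not the SignalsyMQ code: the name SketchQueue, the method signatures, and the decision to read the newest messages first are all assumptions. It shows the three ideas from the list: fan-out by mask with the channel field rewritten per queue, a length check that runs only with a given probability, and a receive that first fetches a batch and then filters it by creation time. PHP 7.1 or later is assumed, and the queue length must be at least 1.

```php
<?php
// In-memory stand-in for the storage; hypothetical, no Redis involved.
class SketchQueue
{
    private $queues = array(); // queue name => messages, oldest first
    private $maxLength;        // maximum messages per queue (>= 1)

    public function __construct(int $maxLength)
    {
        $this->maxLength = $maxLength;
    }

    public function createQueue(string $name): void
    {
        if (!isset($this->queues[$name])) {
            $this->queues[$name] = array();
        }
    }

    // Delivers a copy of the message to every queue matching its channel mask.
    // $checkProbability (0..100) is the chance that the length check runs.
    public function send(array $message, int $checkProbability = 100): int
    {
        $delivered = 0;
        foreach (array_keys($this->queues) as $name) {
            if (!fnmatch($message['channel'], $name)) {
                continue;
            }
            $copy = $message;
            $copy['channel'] = $name; // replace the mask with the concrete queue
            $this->queues[$name][] = $copy;
            if (mt_rand(1, 100) <= $checkProbability) {
                // Keep only the newest $maxLength messages; the oldest are dropped.
                $this->queues[$name] = array_slice($this->queues[$name], -$this->maxLength);
            }
            $delivered++;
        }
        return $delivered;
    }

    // Reads (never deletes) the newest $max messages, then drops those created before $since.
    public function receive(string $name, int $max, int $since = 0): array
    {
        $batch = array_slice($this->queues[$name] ?? array(), -$max);
        return array_values(array_filter($batch, function ($m) use ($since) {
            return $m['created'] >= $since;
        }));
    }
}

$mq = new SketchQueue(3);
$mq->createQueue('/users/aleks');
$mq->createQueue('/users/alice');
$mq->createQueue('/system/log');

$now = time();
foreach (array(7200, 60, 10) as $age) {
    $mq->send(array(
        'type' => 'note', 'source' => 'demo', 'channel' => '/users/*',
        'created' => $now - $age, 'body' => "sent $age s ago",
    ));
}

// At most 100 messages from the last hour: the 60 s and 10 s old ones.
$recent = $mq->receive('/users/aleks', 100, $now - 3600);
echo count($recent), "\n";
```

Note the trade-off in receive: because filtering happens after the batch is fetched, a call may return fewer messages than requested even when older, non-matching messages filled the batch.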
To be honest, the source code of the project, although available, is in no way ready for real work; it is just a prototype and a first alpha version. There are still many suboptimal places, inconsistent documentation (including pieces left over from Zend_Queue, whose interface I used as a basis), and algorithms and data structures that are not fully thought through. For example, deleteMessage requires two parameters, the queue name and the message, although the service channel field of the message already holds the name of its queue. Still, the current code already shows results: it works and is suitable for experiments.
It is also too early to talk seriously about performance, but I ran several tests on an Amazon EC2 small instance (32 bit, Debian, PHP 5.2.11, Redis 1.1.9). A single-threaded server writing a large number of messages to random queues (1000 messages, in batches of 100) shows a speed in the range of 1500 to 3000 messages per second. These are writes; since other systems are still running on the same server, the fluctuations depend heavily on the load, and in this mode the result is very sensitive to how the save interval is configured in Redis itself.
When each client works with a small number of messages and a single queue, but there are many parallel clients, the speed reaches its peak, coming close to the maximum that the tests can squeeze out of this configuration (5–7 thousand messages per second when the test and the server run on the same machine). Better results can be achieved with multithreading, by moving Redis to a separate server, and by freeing the server from other tasks as much as possible. Performance is also bound by the server's CPU: a Large instance (2 cores and 7 GB RAM) shows 5-6 times the speed (of course, this is only an average figure, far from real-world conditions).
Such are the experiments. The topic of message queues is interesting and deserves a couple more articles; for example, almost no one knows about the Zend_Queue components and how to apply them, even among developers who actively use ZF in their work. There are also many server improvements ahead: turning it into a network daemon, building on this basis a dedicated server that works with various protocols (possibly the MQ standards, Stomp and AMQP), setting up replication, accounting for the load during background saving, working with several parallel servers, and so on. Would you be interested in reading about it?