
As projects grow, many of them need to solve a range of tasks that involve queues. Message queues often serve as the link between different internal subsystems. Some classic examples:
- deferred processing of user data;
- transfer of statistics;
- smoothing the load on relatively slow systems;
- performing periodic tasks.
There are several approaches to queuing:
- use a relational database;
- adopt an existing solution (RabbitMQ, etc.);
- reinvent the wheel and write your own.
For a while My World kept its queues in a relational database, but as the project grew, performance problems began. We had a choice: adopt an existing solution or develop our own system.
We started by evaluating the first option. There are many queue servers for various tasks. We mainly considered the best known one, RabbitMQ, which is rightfully regarded as the leader among open-source systems of this kind.
Several advantages:
- good performance, sufficient for a large number of tasks;
- rich functionality that allows you to implement almost any logic to work with queues;
- a subsystem that evicts messages to disk, which lets it run on servers with insufficient RAM, albeit with a loss of performance;
- the ability to develop plug-ins to extend functionality.
But every system has its drawbacks. Here are the ones that pushed us towards developing our own solution:
- Insufficient performance. The information passing through the queues is important to us, so we want every change on the queue server written to disk as binary logs. With message persistence enabled, RabbitMQ performance drops to very low values (<10,000 event inserts per second).
- Performance drops too quickly as message size grows, even without writing to disk.
- No timeout on message processing. If a queue handler hangs because of a bug in the code or for some other reason, the message is not passed to another handler until the hung handler's connection is closed.
- Unstable response time, especially on large queues (>10,000,000 messages).
- The server decides which handler gets which events and how many. There are plenty of articles on tuning these parameters for a particular task. That did not suit us: we wanted to move the logic of managing handler resources out of the server. Looking ahead, this turned out to be a good idea. Around the queue servers we built a large infrastructure that monitors queue priorities, the number of events in the queues, and free resources on the servers running event handlers. It lets us spawn and kill queue handlers dynamically, and set limits on how many events a handler may receive from the queue server in a single request (batch processing).
- A fairly large amount of service data per message. For reasons that are unclear to me, the amount of memory occupied by the same number of messages can differ significantly from launch to launch.
So, after weighing the pros and cons of this option, we chose to develop our own system. One of the main reasons was that in 2009, when the first release of our queue server took place, the existing solutions were not very stable under load. A lot has been fixed and improved since then, but there are still enough arguments in favor of our own solution.
Once we understood what we needed, we wrote a specification and defined the requirements for our queue server:
- A message should be issued to consumers only after a certain condition is met. The condition is a time set by the client, after which the event is considered active and can be received by a handler of the corresponding queue.
- Persist all event changes to disk, so that after a software or hardware failure the server recovers with all its data.
- Make it possible to define the order in which events are issued to handlers (sorted by event activation time).
- Do not deceive the client: answer OK to an insert only after the data has been written to disk.
- Provide consistently low latency when working with queues of up to 100,000,000 events.
- Work with events of various sizes, from 1 byte to several megabytes.
- Sustain at least 15,000 inserts per second.
- Performance must not degrade with at least 1,000 event producers and 1,000 consumers.
- Provide fault tolerance (at least partial) in case of a hardware disk failure and data loss or corruption. On startup the server must be able to identify valid data and discard broken records.
Architecture

The queue server is implemented as a C module for our first network storage framework, the one Tarantool also grew out of. It is a single-threaded asynchronous server that uses libev to run its event loop. All requests are handled over a simple binary protocol based on IPROTO.
WAL process
All changes are written to disk as binary logs by a separate WAL process. Conceptually this is very similar to Tarantool; the common roots show. Each record is signed with crc32 so that its integrity can be checked during loading. The queue server probably interacts with the WAL process more than any of our other modules, since almost all commands, including issuing messages to consumers, modify the server state and must be written to disk.
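To illustrate the idea of CRC-signed records, here is a minimal sketch in Python. The record layout (a 4-byte length and a 4-byte CRC32 followed by the payload) and the function names are assumptions made for the example; the article does not describe the server's actual on-disk format.

```python
import struct
import zlib

# Hypothetical header layout: payload length, then CRC32 of the payload.
HEADER = struct.Struct("<II")


def encode_record(payload: bytes) -> bytes:
    """Frame one log record as header + payload."""
    return HEADER.pack(len(payload), zlib.crc32(payload)) + payload


def read_valid_records(log: bytes) -> list[bytes]:
    """Return payloads up to the first truncated or corrupted record."""
    records = []
    offset = 0
    while offset + HEADER.size <= len(log):
        length, checksum = HEADER.unpack_from(log, offset)
        start = offset + HEADER.size
        payload = log[start:start + length]
        if len(payload) < length or zlib.crc32(payload) != checksum:
            break  # broken record: stop replaying here
        records.append(payload)
        offset = start + length
    return records
```

In this sketch the loader simply stops at the first record that fails the check, which is one simple way to "discard broken records"; a real server may apply a different recovery policy.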
Dumper
From time to time the server spawns a process that saves to disk a complete image of the current user data and the necessary service information. Strictly speaking, the Dumper is not required, but it speeds up server startup after a restart: it is enough to read the latest snapshot and apply only the binary logs written after that snapshot was taken.
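The recovery scheme (latest snapshot plus the newer log records) can be sketched roughly as follows. The sequence numbers, the record tuple shape and the `recover` name are hypothetical and serve only to show the idea.

```python
def recover(snapshot, log_records):
    """Rebuild state from a snapshot plus the log records written after it.

    snapshot: (last_applied_seq, {event_id: data})
    log_records: iterable of (seq, op, event_id, data), op is "put" or "delete".
    Both shapes are assumptions made for this example.
    """
    last_seq, state = snapshot[0], dict(snapshot[1])
    for seq, op, event_id, data in log_records:
        if seq <= last_seq:
            continue  # already reflected in the snapshot
        if op == "put":
            state[event_id] = data
        elif op == "delete":
            state.pop(event_id, None)
        last_seq = seq
    return last_seq, state
```

Without a snapshot the same function replays the whole log from sequence number 0, which gives the same state but takes longer on a large log.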
Say logger
The last process is responsible for writing text logs to disk. On production servers logs are often disabled completely because they hurt performance; we tried to avoid that fate. To do so, the server spawns a separate process that runs an external logger, for example cronolog. Communication goes over sockets in a way that lets us work in one of two modes:
- wait for the write to disk. Logging delays then degrade overall performance.
- ignore overflow of the message queue to the logger process. Some records are lost, but the server no longer depends on the performance of the disk holding the text logs.
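The trade-off between the two modes can be shown with a small sketch. It models the socket to the logger process as a bounded in-memory queue; the `LogChannel` class and its parameters are hypothetical, and the real server uses sockets inside a single-threaded event loop rather than Python's `queue` module.

```python
import queue


class LogChannel:
    """Bounded channel to a logger; a stand-in for the socket buffer."""

    def __init__(self, capacity: int, drop_on_overflow: bool):
        self._queue = queue.Queue(maxsize=capacity)
        self._drop_on_overflow = drop_on_overflow
        self.dropped = 0

    def write(self, line: str) -> None:
        if not self._drop_on_overflow:
            self._queue.put(line)  # mode 1: block until the logger catches up
            return
        try:
            self._queue.put_nowait(line)
        except queue.Full:
            self.dropped += 1  # mode 2: lose the record, stay responsive

    def drain(self) -> list[str]:
        """Take everything the logger side would currently read."""
        lines = []
        while True:
            try:
                lines.append(self._queue.get_nowait())
            except queue.Empty:
                return lines
```

Counting dropped records, as `dropped` does here, is a design choice of the sketch: it makes the cost of the second mode visible.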
Digging deeper

Every event in a queue is in one of three states:
- Inactive. The message has been received by the queue server, but it cannot be issued to queue handlers before its activation time.
- Active. The activation time has arrived and the event can be issued to queue handlers.
- Blocked. The event has already been issued and is awaiting confirmation of processing. It may be issued again if the command to delete the event does not arrive within X seconds.
Each queue keeps an index of events for each of the three states, plus a central index of all events in the queue.
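A rough sketch of the three states and the per-state indices described above. The choice of heaps and dictionaries and all names are assumptions; the article does not say which data structures the server uses.

```python
import heapq
from enum import Enum


class State(Enum):
    INACTIVE = "inactive"
    ACTIVE = "active"
    BLOCKED = "blocked"


class EventQueue:
    def __init__(self):
        self.events = {}    # central index: id -> [state, activation_time, data]
        self.inactive = []  # heap of (activation_time, id)
        self.active = []    # heap of (activation_time, id)
        self.blocked = {}   # id -> time at which the block expires

    def insert(self, event_id, activation_time, data):
        self.events[event_id] = [State.INACTIVE, activation_time, data]
        heapq.heappush(self.inactive, (activation_time, event_id))

    def activate_due(self, now):
        """Move events whose activation time has arrived to the active index."""
        moved = 0
        while self.inactive and self.inactive[0][0] <= now:
            activation_time, event_id = heapq.heappop(self.inactive)
            self.events[event_id][0] = State.ACTIVE
            heapq.heappush(self.active, (activation_time, event_id))
            moved += 1
        return moved
```

Keeping the active index ordered by activation time means events are handed out in activation order, which matches the ordering requirement listed earlier.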
The queue server works with two types of queues. They differ in the policy for assigning message IDs: the ID is issued either by the server or by the client. Because every message has an identifier, advanced queue logic becomes possible. In addition to insertion, retrieval and deletion, there are commands to modify a message's data or its activation time. This lets you reorder messages and change their processing status within a single queue. For periodic actions there is no need to delete the event after processing it: it is enough to move the activation time forward by the required number of minutes, hours or days.
The queue server supports transactions. All actions needed for the response (allocating temporary data, checking that the required events exist, preparing connection buffers) are performed before writing to disk. If an error occurs, all changes are rolled back. Multiple commands within a single transaction are not supported. The transaction mechanism gives exclusive ownership of the right to modify an event: if two clients simultaneously attempt actions that change the state of the same event, the second client receives an error with the corresponding code.
To activate events correctly, the server implements a command that cannot be called over the network; it is triggered by a timer. Internally, a service queue of all user queues is maintained for event activation.
When a handler requests a new batch of events (usually around 1000) from a specific queue, the server performs the following steps:
- create a transaction;
- look up the specified queue;
- lock the required number of messages from the top of the active-message index;
- create a service record about blocking the selected events;
- send the service record to the WAL process;
- receive the reply from the WAL process;
- apply the transaction to the in-memory data, moving the events to the blocked index;
- write the reply to the client from the data prepared in advance;
- complete the transaction, release the internal locks on all events and clear the temporary transaction data;
- if an error occurs at any stage, roll back the transaction and report the reason to the client.
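The steps above can be sketched as a single function. This is a simplified illustration, not the server's code: the dictionary layout of a queue, the `wal_write` callback and the `take_batch` name are all hypothetical, and the real server does this asynchronously in C.

```python
def take_batch(queues, queue_name, limit, now, block_timeout, wal_write):
    """Block up to `limit` active events and return their IDs.

    `queues` maps a name to {"active": [ids in activation order],
    "blocked": {id: expiry}, "locked": set()}; a hypothetical layout.
    `wal_write(record)` must raise if the record could not be persisted.
    """
    queue = queues.get(queue_name)  # look up the queue
    if queue is None:
        raise KeyError(queue_name)
    # Select events from the top of the active index and lock them.
    selected = [e for e in queue["active"] if e not in queue["locked"]][:limit]
    queue["locked"].update(selected)
    try:
        # Service record goes to the WAL first; memory is untouched so far.
        wal_write({"op": "block", "queue": queue_name, "ids": list(selected)})
        chosen = set(selected)
        queue["active"] = [e for e in queue["active"] if e not in chosen]
        for event_id in selected:
            queue["blocked"][event_id] = now + block_timeout
        return selected  # reply is built from data prepared in advance
    finally:
        # Runs on success and on error: release the transaction locks.
        queue["locked"].difference_update(selected)
```

Because in-memory state changes only after `wal_write` returns, a WAL failure leaves the queue exactly as it was, so the rollback in this sketch amounts to releasing the locks.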
Handling duplicate message inserts. If there are network problems or the server is heavily loaded, the client may decide that the timeout has expired, that the server did not process the message, and that it must be sent again; however, the server may already have processed the message by then. With queues where the client issues the ID, everything is simple: the ID is in the request data, so we check whether it is already in the queue and, if so, return an error.
In queues where the server issues IDs, the situation is more complicated: the server has no unambiguous attribute by which to tell that the current request is actually a retry of a request processed several seconds ago. "So we need to add such an attribute," we decided, and added two fields to the packet: RequestID, guaranteed to be unique within one client process, and the PID of the process. On the queue server an insert cache is keyed by {ClientIP, RequestID, PID}, which makes it possible to track duplicate requests for 10 to 15 minutes. In practice this is more than enough. A potential drawback: the method does not work through NAT, since all clients then share the same IP and false positives become possible.
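A minimal sketch of such an insert cache, keyed by (ClientIP, RequestID, PID) with a fixed time to live. The class name, the injectable clock and the behavior on a hit (returning the originally assigned ID) are assumptions; the article does not say how the server answers a detected duplicate.

```python
import time


class InsertCache:
    """Remember recent inserts so a retried request can be recognized."""

    def __init__(self, ttl_seconds=900.0, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        # (client_ip, request_id, pid) -> (event_id, expires_at)
        self._seen = {}

    def lookup_or_store(self, client_ip, request_id, pid, new_event_id):
        """Return (event_id, is_duplicate) for this request key."""
        now = self._clock()
        # Dicts keep insertion order and the TTL is constant,
        # so the oldest entries come first and expire first.
        for key in list(self._seen):
            if self._seen[key][1] > now:
                break
            del self._seen[key]
        key = (client_ip, request_id, pid)
        if key in self._seen:
            return self._seen[key][0], True
        self._seen[key] = (new_event_id, now + self._ttl)
        return new_event_id, False
```

The NAT problem is visible in the key: two client hosts behind one address that happen to use the same PID and RequestID produce the same tuple, and the second insert is treated as a retry.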
Creating and configuring queues. To simplify configuration, a queue is created automatically with default settings on the first attempt to insert a message into it. The configuration can specify settings for a particular queue: the batch size for event activation attempts, the time after which an event moves from the blocked state back to the active state, and so on.
Incidentally, today I would not implement automatic queue creation. The business logic developers, who use queues left and right, liked it very much, but debugging all this splendor took a lot of time and effort.
A lot of corner cases surfaced unexpectedly during testing, and a lot of code had to be written for the sake of rare situations. The main problems appeared when rolling back the transaction of an event that creates a new queue. If, while the new queue is being created and the event is being written to disk, other clients try to add events to the not yet created queue, we have to recognize that the queue is in the process of being created. The situation is more complicated for a queue with internal IDs, where the server itself issues message IDs in response to insert commands. All events in the queue being created are blocked until its creation completes, and they already have IDs assigned. If the transaction creating the queue has to be rolled back, the transactions of all dependent events waiting for the queue must be rolled back as well. It sounds scary, and the code is even worse.
Summing up
Good
- Good performance: at least 50,000 rps on inserts. Performance depends solely on the throughput of the disks and on how many records are written between calls to the fdatasync system call.
- Work with large queues. At one point we had a queue of 170 million messages on a production server.
- Stable operation under uneven load (some queues have constantly high intensity, others often receive peak loads).
- Good results from the SLAB allocator in terms of both performance and fragmentation (usually 90% of the messages in a queue have the same or a similar size).
- Stability of the system as a whole. Every day we process billions of messages on multiple queue servers. Over the past 2-3 years there has not been a single failure caused by the software.
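The dependence of insert throughput on the number of records per fdatasync call can be illustrated with a small group-commit sketch. The `GroupCommitLog` class and its `batch_size` parameter are hypothetical; note that to keep the "answer OK only after the data is on disk" guarantee, replies for a batch would have to be held back until its flush completes.

```python
import os

# fdatasync is not available on every platform; fall back to fsync.
_sync = getattr(os, "fdatasync", os.fsync)


class GroupCommitLog:
    """Append records and sync to disk once per `batch_size` records."""

    def __init__(self, path: str, batch_size: int):
        self._fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o644)
        self._batch_size = batch_size
        self._pending = 0
        self.sync_calls = 0

    def append(self, record: bytes) -> None:
        view = memoryview(record)
        while len(view) > 0:  # os.write may write fewer bytes than asked
            written = os.write(self._fd, view)
            view = view[written:]
        self._pending += 1
        if self._pending >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        if self._pending:
            _sync(self._fd)
            self.sync_calls += 1
            self._pending = 0

    def close(self) -> None:
        self.flush()
        os.close(self._fd)
```

A larger batch means fewer sync calls per insert and higher throughput, at the price of a longer wait before each insert can be acknowledged.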
The bad and challenges for the future
- A number of inherited problems that come from the network framework we use. All of them should go away with the move to the Tarantool code base.
- Sharding is done on the client.
- Handling of duplicate messages should be redone. In principle it works, but the problem with NAT is bothersome.
- Queue creation should be moved into a separate command.
- Sometimes we want stored procedures in Lua to extend what can be done with queues. So far the need has not come up often enough for us to get around to it.
- All events are always in memory. In theory it would be good to evict to disk the events that will not be activated soon. In practice, for now, a stable response time to server requests matters more to us.
Using Queues in My World
- Delayed processing of user actions. It is not a good idea to make the user wait while you save their data to an SQL database or other storage (and often changes have to be made in several systems at once). Worse, in some implementations (mostly in small projects with young developers) the data may not reach the storage at all if the client drops the connection without waiting for the answer. A good practice is to add an event describing the user action to a sufficiently fast queue server and then tell the client that the operation succeeded. The rest of the work will be done reliably and efficiently by queue handlers. As a free bonus, the code on the frontend servers becomes simpler: they only need to talk to the queue daemon to make changes in any storage. Knowledge of the business logic of the various data can be moved into the queue handlers.
- Mailing of messages, letters, etc. You need to send a large amount of data without overloading the storage with a burst of requests. Easy! By varying the number of queue handlers, we spread the peak load to a reasonable level so that processing time for client requests to the same data sources does not get worse. And, most importantly, queues make it easy to avoid duplicate messages. It is extremely unpleasant to receive two letters about the same event. For periodic mailings it is enough to update the message's activation time after processing it instead of deleting it: it will be processed again at the right time.
- Transport for "reliable" statistics. Transfer of important data (everything related to money) to statistics aggregators. Statistics aggregation systems are usually CPU-hungry, and while processing data they may not provide the response time that frontend servers require. Another feature of such systems is uneven load, usually caused by processing data in chunks. Passing statistics through queue servers avoids the problem of unstable latency while preserving the delivery guarantee.
- Event grouping. If a group of events will access the same data set in other systems, it makes sense to give them the same activation time, because events are sorted by activation time even when that time is set in the past. The point of this trick is more efficient use of the caches of the storages that receive the handlers' requests.
- Cascading queues. A state machine is built from several queues by passing data from one queue to the next at the end of each processing stage. Processing a message often requires a series of actions that differ greatly in the resources they need. In that case, splitting "fast" and "slow" actions into different stages (queues) lets you manage the required resources effectively by varying the number of handlers for each queue. In addition, the handler code becomes simpler and errors in business logic are easier to find. From the queue graphs you can see at which stage events are piling up and in which handler to look for the problem.
In this project we use clients in Perl and C; for other projects clients have been implemented in PHP, Ruby and Java.
PS: We deliberately did not draw tables comparing performance with existing systems. It is impossible to compare against the same functionality that we use in production (I do not know of systems with suitable capabilities), and without that we would get yet another test of a spherical horse in a vacuum.
PPS: Some components (administrative interface, local replica, etc.) have been omitted because they are implemented similarly in Tarantool.
PPPS: In one of the following articles we will try to talk about our infrastructure for working with queues - about how we manage resources, how we monitor the status of queues, how the sharding of events between queue servers is organized and about many other things.
If you have any questions, ask in the comments.