
Queue Server



As many projects grow, a number of tasks arise that involve queues. Message queues are often used as the link between different internal subsystems. Some classic examples:


There are several approaches to queuing:

For a while, My World kept its queues in a relational database, but as the project grew, performance problems began. We had a choice: adopt an existing solution or develop our own system.

To begin with, we evaluated the first option. There are many queue servers for various tasks. We mainly considered the best-known one, RabbitMQ, which is rightfully regarded as the leader among similar open-source systems.

Several advantages:


But every system has its shortcomings. Here are some of those that influenced our choice in favor of developing our own solution:


So, after analyzing the pros and cons of this option and giving it some thought, we chose to develop our own system. One of the main reasons was that in 2009 (when the first release of our own queue server took place) the existing solutions were not very stable under load. A lot has been fixed and improved since then, but there are still enough arguments in favor of our decision.

Once we understood what we needed, we prepared a technical specification and determined the requirements for our queue server:


Architecture




The queue server is implemented in C as a module for our original network storage framework, from which Tarantool also grew. It is a single-threaded asynchronous server that uses libev to organize its event loop. All requests are processed using a simple binary protocol based on IPROTO.
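As a rough illustration of this design, here is a minimal sketch of a single-threaded libev server reading a fixed binary header; the three-field header and its names are assumptions made for the example, not the actual IPROTO definitions:

    /* Sketch: single-threaded libev server reading a simple binary header.
       The header layout (type, body length, sync id) is an assumption loosely
       modelled on IPROTO; the real field layout may differ. */
    #include <ev.h>
    #include <stdint.h>
    #include <unistd.h>

    struct proto_header {
        uint32_t type;     /* request code, e.g. put/take/delete   */
        uint32_t body_len; /* length of the body that follows      */
        uint32_t sync;     /* request id echoed back to the client */
    };

    static void client_cb(struct ev_loop *loop, ev_io *w, int revents)
    {
        struct proto_header h;
        ssize_t n = read(w->fd, &h, sizeof(h));   /* sketch: assumes a full header */
        if (n <= 0) {                             /* disconnect or error           */
            ev_io_stop(loop, w);
            close(w->fd);
            return;
        }
        /* ... read h.body_len bytes of body, dispatch on h.type,
           queue the reply for writing ... */
    }

    int main(void)
    {
        struct ev_loop *loop = EV_DEFAULT;
        /* accepting connections is omitted; each accepted fd gets a watcher:
           ev_io_init(&w, client_cb, client_fd, EV_READ);
           ev_io_start(loop, &w);                                              */
        ev_run(loop, 0);   /* the single event loop drives everything */
        return 0;
    }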

WAL process


All changes are written to disk as binary logs by a separate WAL process. Conceptually, everything is very similar to Tarantool; the common roots show. Each record is checksummed with crc32 so that the data can be verified for integrity while the logs are loaded. The queue server interacts with the WAL process more, perhaps, than any of our other modules, since almost all commands, including handing messages out to consumers, modify the server state and must be written to disk.
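A rough sketch of what a crc32-protected log record and its verification during loading might look like; the field names are illustrative, and zlib's crc32() stands in for whatever implementation the server actually uses:

    /* Sketch: crc32-protected WAL record, verified while replaying logs.
       Field names and layout are illustrative, not the server's real format. */
    #include <stdint.h>
    #include <zlib.h>   /* crc32() */

    struct wal_record {
        uint32_t crc;   /* checksum of everything after this field */
        uint32_t len;   /* payload length                          */
        uint64_t lsn;   /* log sequence number                     */
        /* payload of `len` bytes follows on disk */
    };

    /* Returns 1 if a record read from disk is intact, 0 otherwise. */
    static int wal_record_ok(const struct wal_record *rec, const unsigned char *payload)
    {
        uLong crc = crc32(0L, Z_NULL, 0);
        crc = crc32(crc, (const Bytef *)&rec->len, sizeof(rec->len));
        crc = crc32(crc, (const Bytef *)&rec->lsn, sizeof(rec->lsn));
        crc = crc32(crc, payload, rec->len);
        return crc == rec->crc;
    }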

Dumper


From time to time, a process is spawned that saves a complete image of the current state of user data and the necessary service information to disk. Strictly speaking, Dumper is not required, but it speeds up server startup after a restart: it is enough to read the latest snapshot and apply only the binary logs written after that image was taken.
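The startup sequence would then look roughly like this (a sketch under the assumption that snapshots and log records carry an LSN, as in Tarantool; all function names are hypothetical):

    /* Sketch of recovery at startup: read the newest snapshot, then replay
       only the binary-log records written after it. */
    #include <stdint.h>

    uint64_t load_latest_snapshot(void);      /* returns the snapshot's LSN        */
    void     replay_wal_since(uint64_t lsn);  /* applies records with a higher LSN */
    void     start_accepting_requests(void);

    void server_recover(void)
    {
        uint64_t snap_lsn = load_latest_snapshot();  /* full image of all queues */
        replay_wal_since(snap_lsn);                  /* only the tail of the log */
        start_accepting_requests();
    }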

Say logger


The last process is responsible for writing text logs to disk. Logs are often disabled entirely on production servers because they degrade performance; we tried to avoid that fate. To do so, a separate process is spawned in which an external logger, for example cronolog, is executed. Communication is implemented over sockets in such a way that we can work in one of two modes:


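Whatever the mode, the basic mechanics of handing log lines to an external logger over a socket can be sketched as follows (the socketpair() approach and the log path template are assumptions for illustration, not the server's actual code):

    /* Sketch: spawning an external logger (e.g. cronolog) connected through a
       socket pair, so the main process only write()s log lines to the returned fd. */
    #include <sys/socket.h>
    #include <sys/types.h>
    #include <unistd.h>

    static int spawn_logger(void)
    {
        int sv[2];
        if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
            return -1;

        pid_t pid = fork();
        if (pid < 0)
            return -1;

        if (pid == 0) {                     /* child: becomes the logger      */
            dup2(sv[1], STDIN_FILENO);      /* read log lines from the socket */
            close(sv[0]);
            close(sv[1]);
            execlp("cronolog", "cronolog", "/var/log/queue/%Y%m%d.log", (char *)NULL);
            _exit(1);                       /* exec failed                    */
        }

        close(sv[1]);                       /* parent keeps only its end          */
        return sv[0];                       /* write log lines to this descriptor */
    }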
Diving deeper




All events in the queue are in one of three states:


Each queue maintains an index over the events in each of the three states, plus one central index over all events in the queue.

The queue server works with two types of queues, which differ in the policy for issuing message IDs: the ID is issued either by the server or by the client. Having an identifier for every message makes it possible to implement more advanced queue logic. In addition to insertion, retrieval, and deletion, there are commands to modify a message's data or its activation time. This allows messages to be reordered and their processing status to be changed within a single queue. If you have periodic actions, there is no need to delete an event after it has been processed: it is enough to push its activation time forward by the required number of minutes/hours/days.
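As an illustration of rescheduling instead of deleting, here is a minimal sketch; the structure fields and the helper are assumptions, not the actual server code:

    /* Sketch: instead of deleting a periodic event after processing,
       push its activation time forward. Names and fields are illustrative. */
    #include <stdint.h>
    #include <time.h>

    struct queue_event {
        uint64_t id;            /* issued by the server or by the client        */
        int      status;        /* one of the three states mentioned above      */
        time_t   activate_at;   /* when the event becomes eligible for delivery */
        /* ... payload ... */
    };

    /* Re-arm a periodic event `delay` seconds into the future. */
    static void event_reschedule(struct queue_event *e, time_t delay)
    {
        e->activate_at = time(NULL) + delay;   /* minutes/hours/days ahead         */
        /* the event is then moved into the index of not-yet-active events         */
    }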

The queue server supports transactions. All actions needed to produce the response (allocating temporary data, checking that the required events exist, preparing connection buffers) are performed before writing to disk. If an error occurs, all changes are rolled back. Multiple statements within a single transaction are not supported. The transaction mechanism provides exclusive ownership of the right to change an event: if two clients simultaneously attempt actions that change the state of the same event, the second client gets back an error with the corresponding code.
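The overall flow of such a single-statement transaction might be sketched like this (every function name here is a hypothetical placeholder):

    /* Sketch: everything needed for the reply is prepared before the WAL write,
       and all changes are rolled back on any error. */
    struct request;                          /* opaque; hypothetical                */
    int tx_prepare(struct request *);        /* temp data, checks, buffers          */
    int tx_acquire_event(struct request *);  /* exclusive ownership of the event    */
    int wal_write(struct request *);         /* state change must reach disk        */
    int tx_commit(struct request *);
    int tx_rollback(struct request *);       /* returns an error code to the client */

    static int tx_execute(struct request *req)
    {
        if (tx_prepare(req) != 0)
            return tx_rollback(req);
        if (tx_acquire_event(req) != 0)      /* a second concurrent client gets */
            return tx_rollback(req);         /* an error instead of the lock    */
        if (wal_write(req) != 0)
            return tx_rollback(req);
        return tx_commit(req);               /* send the already-prepared reply */
    }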

For correct activation of events, the server implements an internal command that cannot be called over the network; it is triggered by a timer. Internally, a service queue spanning all user queues is maintained and is responsible for activating events.
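With libev, wiring up such a timer-driven internal command could look roughly like this (the one-second period and the activation function are assumptions):

    /* Sketch: internal "activate events" command fired by a libev timer,
       never reachable over the network. */
    #include <ev.h>

    void activate_due_events(void);     /* walks the service queue of user queues */

    static void activation_cb(struct ev_loop *loop, ev_timer *w, int revents)
    {
        (void)loop; (void)w; (void)revents;
        activate_due_events();          /* move events whose time has come */
    }

    static ev_timer activation_timer;

    void start_activation_timer(struct ev_loop *loop)
    {
        ev_timer_init(&activation_timer, activation_cb, 1.0, 1.0);  /* every second */
        ev_timer_start(loop, &activation_timer);
    }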

When a request for a new batch of events (usually around 1000) in a specific queue arrives from a handler, the server performs the following actions:


Handling duplicate message inserts. When there are network problems or the server is under heavy load, the client may decide that the timeout has expired and the server did not process the message, so it needs to be sent again; by that time, however, the server may already have processed it. For queues in which the client issues the ID, everything is simple: the request data contains an ID, so we check whether it is already in the queue and, if it is, return an error.

For queues with internally issued IDs the situation is more complicated: the server has no unambiguous attribute by which it can tell that the current request is in fact a retry of a request processed a few seconds earlier. "So we need to add such an attribute," we decided, and added two extra fields to the packet: RequestID, guaranteed to be unique within a single client process, and the process PID. On the queue server, an insert cache is organized with the key {ClientIP, RequestID, PID}, which makes it possible to track duplicate requests for 10-15 minutes. In practice, this is more than enough. A potential drawback is that the method does not work through NAT, since all clients will have the same IP and false positives are therefore possible.
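A sketch of the deduplication key and the cache check (the cache itself and the TTL handling are simplified assumptions; only the {ClientIP, RequestID, PID} key follows the text):

    /* Sketch: deduplication of retried inserts for queues with server-issued IDs. */
    #include <stdint.h>
    #include <time.h>

    #define DEDUP_TTL (15 * 60)               /* keep entries ~10-15 minutes       */

    struct dedup_key {
        uint32_t client_ip;                   /* breaks behind NAT, as noted above */
        uint64_t request_id;                  /* unique within one client process  */
        uint32_t pid;                         /* client process id                 */
    };

    struct dedup_entry {
        struct dedup_key key;
        uint64_t assigned_id;                 /* message ID issued the first time  */
        time_t   inserted_at;
    };

    /* Hypothetical cache lookup; how a hit is answered (old ID or error) is
       up to the server and not shown here. */
    struct dedup_entry *dedup_lookup(const struct dedup_key *key);

    static int is_duplicate(const struct dedup_key *key)
    {
        struct dedup_entry *e = dedup_lookup(key);
        return e != NULL && time(NULL) - e->inserted_at < DEDUP_TTL;
    }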

Creating and configuring queues. To simplify configuration, a queue is automatically created with default settings on the first attempt to insert a message into it. The configuration can specify the settings of a particular queue: the batch size for event activation attempts, the time after which an event goes from the blocked state back to the active state, and so on.
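The per-queue settings mentioned above might be sketched roughly like this (names and default values are purely illustrative, not the real configuration format):

    /* Sketch: per-queue settings with illustrative defaults. */
    #include <stdint.h>

    struct queue_config {
        uint32_t activation_batch;    /* how many events to try to activate per pass */
        uint32_t unlock_timeout_sec;  /* blocked -> active if not confirmed in time  */
        /* ... other per-queue options ... */
    };

    static const struct queue_config default_queue_config = {
        .activation_batch   = 100,    /* illustrative */
        .unlock_timeout_sec = 600,    /* illustrative */
    };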

By the way, I will note that today I would not implement automatic queue creation. The business-logic developers, who use queues left and right, liked it, but debugging all this magnificence took a lot of time and effort.
A surprising number of corner cases surfaced during testing, and a lot of code had to be written for the sake of rare situations. The main problems appeared when rolling back the transaction of an event that creates a new queue. If, while the new queue is being created and the event is being written to disk, other clients try to add events to the not-yet-created queue, we have to recognize that the queue is in the process of being created. The situation is further complicated for queues with internal IDs, in which the server itself issues message IDs in response to insert commands. All events added to the queue being created are blocked until queue creation completes, and they have already been assigned IDs. If the transaction creating the queue has to be rolled back, all transactions of the dependent events waiting for the queue to be created must be rolled back as well. It sounds scary, and the code is even worse.

Summing up


Good


Bad and challenges for the future


Using Queues in My World




In the project we use clients written in Perl and C; for other projects, clients in PHP, Ruby, and Java have been implemented.

PS: I deliberately did not include tables comparing performance with existing systems. It is impossible to compare against the same functionality we use in production (I know of no systems with suitable capabilities), and without that we would get yet another spherical-horse-in-a-vacuum benchmark.

PPS: Some components (administrative interface, local replica, etc.) have been omitted because they are implemented similarly in Tarantool.

PPPS: In one of the following articles we will try to describe our infrastructure for working with queues: how we manage resources, how we monitor the state of queues, how events are sharded across queue servers, and much more.

If you have any questions, ask in the comments.

Source: https://habr.com/ru/post/216363/

