
Some time ago we
talked about the server queues, its principles of operation and internal structure. Now, finally, it's time to move on to reviewing the queues from a more grocery point of view and talk about the infrastructure used for processing tasks. Let's start a little from afar, with what we stopped in the last article: why, in fact, the queue can be used.
The benefits of queues
Of course, the most common method of application is for asynchronous execution of operations generated during the processing of user requests, the results of which are not immediately required for feedback. In our case, for example, sending mail and notifications, friend requests, updating search indexes, resetting caches, post-processing data received from the user (video transcoding, cutting photos), sending data to subsystems whose response time / reliability is not guaranteed.
In practice, it turned out that, having a convenient infrastructure for distributed task processing, the range of applications is significantly higher:
- Due to the fact that each event in the queue server has an activation time, you can use queues as a scheduler for pending tasks . Thus, for example, we made postponed posts in groups. Instead of posting to a group feed, the post is placed in temporary storage, and a task is created in the queue with an activation time equal to the post publication time.
- Using the activation time, you can spread the load on the database in time , postponing the execution of some non-priority tasks from peak time to night time.
- You can rotate content displayed by a user group using queues. We apply for social moderation of photos. Images that need to be moderated are added to a special queue, and when the content is delivered to the user, the user takes one photo from the queue and shows it to the moderator.
- The queue can be embedded as an interlayer in the data receiving API, for example, push notifications from colleagues from a neighboring project, to increase reliability and load smoothing.
- To organize the receipt of data from external sources , for example, by downloading some content from a third-party resource.
- You can also force the synchronization of data with friendly projects by circumventing our database and sending multiple requests to their API. This can be done periodically and for a long time by adjusting the load on the API of a third-party project by limiting the number of handlers or by spreading the activation time of events.
- Similarly, you can bypass your own repositories in order to correct the data, beaten due to logical errors in the code, or to update the structure of the stored data in the new format.
- It is very convenient with the help of queues to carry out modifications of the distributed and related data . For example, you can take such an operation as deleting a post, which also requires the removal of all comments and likes. Accordingly, it can be divided into three separate lines: one is responsible for removing the post, the other for deleting comments, and the third for deleting likes. Thus, we find that the speed of clearing each queue depends only on the state of the specific storage and, for example, problems with the storage of likes will not affect the removal of the posts themselves.
Processing infrastructure
We have to admit that for some time we tried to process jobs from queues without creating a specialized infrastructure for this. As the number of types of queues increased, it became more and more obvious that it became more and more difficult to keep track of the increasing number of scripts and demonos that are very similar but slightly different. It is necessary to monitor the load and run a sufficient number of processes, do not forget to monitor them, when server drops out, to timely lift processes on other machines. There is an understanding that we need infrastructure, which can be said: "here's a cluster of server handlers, here's a queue server, let's dig it out." Therefore, we thought a little, looked at whether there were ready-made solutions, took Perl (because it is one of the main languages, along with C, in which we develop) and rushed.
')
When designing the infrastructure for processing queues, the following characteristics were key:
- Distribution - tasks of the same type can be performed on different servers;
- resiliency - dropping ten to twenty percent of the cluster servers should not lead to the degradation of important queues;
- homogeneity - the absence of any server specialization, any task can be performed on any server;
- prioritization - there are critical queues, but there are those that can wait;
- automation - a minimum of manual intervention (especially admin): balancing - the redistribution of handlers of each type, depending on the needs; adaptation to the server - determining the number of processors on each server depending on its characteristics, tracking memory and processor consumption;
- batch processing - tasks of the same type should be grouped to enable the use of optimizations;
- statistics — collection of data on queue behavior, task flow, number, presence of errors;
- monitoring — alerts about abnormal situations and changes in the standard behavior of queues.
At the same time, we applied a rather classical scheme, highlighting three components: a manager, a master and a slot.
A slot is the actual process handler of the queue (worker). There can be a lot of slots running on each server, each slot is characterized by the type of queue that it is currently processing. Specialization of slots on certain types of queues increases the locality of data, connections to databases and external services, simplifies debugging (including in case of process memory damage and the formation of "crusts").
A master is a process foreman responsible for the operation of slots on a specific server. His responsibilities include starting and stopping slots, maintaining the optimal number of slots on the server, depending on its configuration, available memory and LA, monitoring the life of slots, destroying stuck and using too many system resources of slots.
A manager is a cluster process manager, runs in one instance (under pacemaker for reliability) and is responsible for the entire cluster. He is involved in micromanagement: on the basis of data from the queue server and statistical data from the wizards, he determines the required number of slots of each type and decides which particular slot will deal with what type of tasks.
Cluster management
The definition of the required number of slots for processing each type of queue by the manager is carried out on the basis of:
- configuration parameters, including: queue priority; the minimum and maximum number of processors allowed; the number of jobs processed in one iteration;
- recent statistics, including: the number of active jobs in the queue; number of tasks processed; total task processing time; total lifetime of handlers.
At the same time, in order to exclude the blinking of slots, some inaccuracy is intentionally allowed: if the required number of slots slightly differs from the current one, then reconfiguration does not occur (“insignificance” upward and downward deviations are defined as a function of queue priority). With the same purpose, the compilation of a new slot card is always carried out on the basis of the current one. This leads to the fact that the slots are stably engaged in processing one type of tasks. Also, when distributing slots across servers, the manager tries to prevent the accumulation of handlers of the same type and spreads them across the cluster.
It should be recognized that we did not succeed in choosing a formula that balances the number of slots of each type the first time - at first we tried to do with fewer parameters and coefficients. In the process of selecting the formula, the administrator’s manager was drawn, according to which you can analyze the correctness of his actions, and also see how certain slots of the types are distributed across servers.

By the way, this picture shows a couple of funny effects, for example, that the number of slots (including the average) may be less than the minimum. This means that the slot lifetime is not very long, most likely due to memory intensive use, and they are often restarted. Or the fact that the load can be much more than 100%, since it takes into account not only the processing tasks, but also those that accumulate in the queue. It is on the basis of this indicator that the manager determines that the number of slots should be reduced or increased (highlighted in red). For some types of queues, we deliberately overestimate the minimum number of slots of this type. This is necessary to ensure for these queues the minimum response time to the event and to be sure that as soon as it enters the queue, it will immediately go into processing, without waiting for the previous tasks to be completed.

Local government
Servers on which masters are started, different in power: some are old and so-so, some are new and nothing at all. Therefore, it is necessary to automatically, without settings from the admins, determine the available resources, current slots appetites and adjust their number. In our case, it turned out that the most valuable resource is RAM. That it determines how many slots can be run on the server. Of course, to save it, we tried to make maximum use of copy-on-write and load all the code necessary for the execution of slots in the wizard so that after the fork of the slots they share all this memory. In order to calculate the allowable number of slots on one server, standard VSZ and RSS are not enough - they do not contain information about how much process memory is shared between processes. Fortunately, linux for some time has a wonderful parameter PSS (Proportional Set Size), which for each memory page is inversely proportional to the number of processes between which it is fumbled (you can subtract and calculate the PSS process from / proc / $ pid / smaps). The sum of the PSS master and all slots divided by the number of slots corresponds approximately to the average size of the slot (a bit more, since we do not specifically divide the master). By adding the amount of PSS to the free memory and dividing it by the average slot size, you can get their permissible amount (minus some reserve just in case).
In the course of work, the slots constantly interact with the master, receiving commands from it to change the slot parameters and passing it statistical information, which is subsequently used by the manager when planning. The slot periodically calculates the amount of memory it has exclusively (Private_Dirty), and if it exceeds the specified limit, it completes its work. Subsequently, such a slot is restarted by the master from scratch. This saves memory and does not allow individual slots to disrupt the functioning of the server as a whole.
Task Processing Algorithm

The task processing process itself is built according to a fairly simple cyclical algorithm. The slot constantly sends a request to the next queue server to the queue server. If there are events, they are processed. In this case, whenever possible, all events from the pack are processed together. This allows you to reduce the number of database queries by grouping the same type and sending multi-requests. Or, in the case of lack of support for multi-requests from the repositories, speed up communication with them by parallelizing asynchronous requests by using libev or libcoro. The logical solution, of course, would be to use asynchronous task processing as part of an event machine. But in practice, such a solution is fraught with development difficulties (the need to execute the same code from under a synchronous web server and an asynchronous queue handler) and debugging.
Error handling is built in the simplest way possible and uses the feature of the queue server, which, when issuing jobs to the handler, locks them for a while (as a rule, for two hours, but this is configured for each queue). All situations not foreseen by the developer simply result in the task being locked in a queue for two hours and then picked up by another handler. This approach allows the same type of response to problems in the processing logic, as well as errors in the low-level code, which can lead to the fall of the handler in the "shell". More or less regular situations can be handled in a more reasonable way, for example, by postponing the activation time of an event until a later time. In any case, the event is considered processed only when the handler sends a command to the queue server for deletion.
The entire process of job processing is neatly overlaid with sending statistical information, drawing various graphs and monitoring. Information on the number of tasks in the queue and how many of them are locked is used as the primary monitoring. An increase in the number of locked events clearly indicates that events are accumulating in the queue that cannot be processed due to the presence of fatal errors. The increase in the number of active tasks indicates that the incoming stream does not have time to be processed.
Conclusion
In conclusion, I would like to note that the described infrastructure has been used in My World not for the first couple of years and did not require any significant intervention or modification during this time, even though the number of queues and the number of servers used have significantly increased. During this time, we have managed to virtually nullify the zoo of scripts and demonok, transferring them to the infrastructure of the queue manager. Its convenience has led to the fact that new queues appear in the project no less than new ajax functions. And currently, about 350 million tasks of 150 different types are processed on a cluster of 40 servers (about 3,500 handlers) per day.
PS Of course, such ready-made solutions as the
gearman have already appeared, and if there is a need for distributed processing of tasks, then it makes sense to look in their direction. But we were lucky to attend to this task a bit earlier, and we got a lot of pleasure in the process of developing our own solution.