
Queue-processing infrastructure in the My World social network



Some time ago we described our queue server, its principles of operation, and its internal structure. Now it is finally time to look at queues from a more product-oriented point of view and talk about the infrastructure used to process tasks. Let's start a little further back, where we left off in the last article: what queues are actually good for.

The benefits of queues

The most common use, of course, is asynchronous execution of operations that are generated while handling user requests but whose results are not needed immediately for the response. In our case these include sending mail and notifications, friend requests, updating search indexes, invalidating caches, post-processing data received from the user (video transcoding, cutting photos to size), and sending data to subsystems whose response time or reliability is not guaranteed.

In practice, it turned out that once a convenient infrastructure for distributed task processing is available, the range of applications is considerably wider:

Processing infrastructure

We have to admit that for some time we tried to process jobs from queues without building a specialized infrastructure for it. As the number of queue types grew, it became increasingly obvious that keeping track of a growing number of very similar but slightly different scripts and daemons was getting harder and harder. You have to watch the load and run enough processes, remember to monitor them, and, when a server drops out, bring the processes up on other machines in time. We came to understand that we needed an infrastructure that could simply be told: "here is a cluster of handler servers, here is a queue server, get to work." So we thought a little, checked whether there were ready-made solutions, took Perl (because it is one of the main languages we develop in, along with C), and got started.
When designing the infrastructure for processing queues, the following characteristics were key:

We used a fairly classical scheme with three components: a manager, a master, and a slot.



A slot is the actual queue handler process (worker). Many slots can run on each server, and each slot is characterized by the type of queue it is currently processing. Specializing slots on particular queue types improves the locality of data and of connections to databases and external services, and simplifies debugging (including cases of process memory corruption and core dumps).

A master is a foreman process responsible for the slots on a specific server. Its responsibilities include starting and stopping slots, maintaining the optimal number of slots on the server depending on its configuration, available memory, and load average (LA), monitoring slot health, and killing slots that are stuck or consume too many system resources.

A manager is the cluster-wide process manager. It runs as a single instance (under pacemaker for reliability) and is responsible for the entire cluster. It handles the micromanagement: based on data from the queue server and statistics from the masters, it determines the required number of slots of each type and decides which particular slot will handle which type of task.

Cluster management

The manager determines the required number of slots for each queue type on the basis of:

To keep slots from flapping, some inaccuracy is deliberately allowed: if the required number of slots differs only slightly from the current one, no reconfiguration takes place (what counts as an "insignificant" deviation, upward or downward, is defined as a function of queue priority). For the same reason, a new slot map is always built from the current one. As a result, slots stay on one type of task for long periods. In addition, when distributing slots across servers, the manager tries to avoid concentrating handlers of the same type and spreads them across the cluster.
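The two stabilizing rules above (a priority-dependent dead band, and building the new slot map from the current one) can be sketched roughly as follows. This is only an illustration: the function names, the concrete tolerance formula in `dead_band`, and the convention that priority 0 is the most important are all assumptions, not the manager's actual formula.

```python
def dead_band(priority):
    """Hypothetical tolerance: (tolerated shortfall, tolerated surplus) in slots.

    Assumption: priority 0 is the most important queue, so it tolerates
    no shortfall; less important queues tolerate larger deviations.
    """
    return priority, 2 + priority


def next_slot_count(current, required, priority):
    """Return the slot count to configure, ignoring insignificant deviations."""
    shortfall_ok, surplus_ok = dead_band(priority)
    if required > current and required - current <= shortfall_ok:
        return current  # slightly too few slots: leave as is
    if required < current and current - required <= surplus_ok:
        return current  # slightly too many slots: leave as is
    return required


def rebalance(slot_map, targets):
    """Build a new slot map from the current one, moving as few slots as possible.

    slot_map: {slot_id: queue_type or None}, targets: {queue_type: slot count}.
    """
    remaining = dict(targets)
    new_map, free = {}, []
    for slot_id, qtype in sorted(slot_map.items()):
        if remaining.get(qtype, 0) > 0:
            new_map[slot_id] = qtype  # keep the slot on its current queue type
            remaining[qtype] -= 1
        else:
            free.append(slot_id)      # surplus or idle slot, may be reassigned
    for qtype, need in remaining.items():
        for _ in range(need):
            if not free:
                break
            new_map[free.pop()] = qtype
    for slot_id in free:
        new_map[slot_id] = None       # nothing to do: slot stays idle
    return new_map
```

With four slots split evenly between two queue types and a new target of three and one, only a single slot changes its type; the others keep their data and connections warm.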

It must be admitted that we did not get the formula that balances the number of slots of each type right on the first try: at first we tried to get by with fewer parameters and coefficients. While tuning the formula, we built an admin panel for the manager, where you can check whether its decisions are correct and see how slots of each type are distributed across servers.



By the way, this picture shows a couple of curious effects. For example, the number of slots (including the average) can be lower than the minimum. This means the slots are short-lived, most likely because they use memory intensively, and are restarted often. Another effect is that the load can be well above 100%, because it counts not only the tasks being processed but also those piling up in the queue. It is on the basis of this indicator that the manager decides that the number of slots should be reduced or increased (highlighted in red). For some queue types we deliberately set the minimum number of slots higher than strictly needed. This guarantees a minimal response time for these queues: as soon as an event enters the queue, it goes straight into processing without waiting for previous tasks to finish.



Local management

The servers that run masters differ in power: some are old and mediocre, some are new and quite decent. So the available resources and the current appetites of the slots have to be determined automatically, without any settings from the admins, and the number of slots adjusted accordingly. In our case the most valuable resource turned out to be RAM; it is what determines how many slots can run on a server. To save memory, we naturally tried to make maximum use of copy-on-write and load all the code the slots need in the master, so that after forking the slots share all of that memory.

To calculate the permissible number of slots on a server, the standard VSZ and RSS are not enough: they say nothing about how much of a process's memory is shared with other processes. Fortunately, Linux has for some time offered a wonderful metric, PSS (Proportional Set Size), in which each memory page is counted in inverse proportion to the number of processes sharing it (the PSS of a process can be read and summed from /proc/$pid/smaps). The sum of the PSS of the master and all slots, divided by the number of slots, roughly corresponds to the average slot size (slightly more, since we deliberately do not separate out the master). Adding that total PSS to the free memory and dividing by the average slot size gives the permissible number of slots (minus some reserve, just in case).
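The calculation above can be sketched as follows. This is a minimal illustration, not the original Perl code: the function names are hypothetical, and treating the reserve as an amount of memory (rather than a number of slots) is an assumption.

```python
def pss_kb(smaps_text):
    """Sum the 'Pss:' fields (in kB) of a /proc/<pid>/smaps dump."""
    total = 0
    for line in smaps_text.splitlines():
        # The trailing colon skips related fields such as 'Pss_Anon:'.
        if line.startswith("Pss:"):
            total += int(line.split()[1])
    return total


def read_pss_kb(pid):
    """Read the PSS of a live process (Linux only)."""
    with open(f"/proc/{pid}/smaps") as f:
        return pss_kb(f.read())


def allowed_slots(master_pss_kb, slot_pss_kb, free_kb, reserve_kb):
    """Estimate how many slots fit on this server.

    slot_pss_kb is a list with the PSS of every running slot.
    reserve_kb is an assumed safety margin kept free "just in case".
    """
    if not slot_pss_kb:
        return 0  # no running slots yet, so no average size to work with
    total_pss = master_pss_kb + sum(slot_pss_kb)
    avg_slot = total_pss / len(slot_pss_kb)  # master is deliberately included
    budget = total_pss + free_kb - reserve_kb
    return max(0, int(budget // avg_slot))
```

For example, a master with 100,000 kB of PSS, two slots with 50,000 kB each, 300,000 kB of free memory, and a 100,000 kB reserve give an average slot size of 100,000 kB and room for 4 slots.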

While running, the slots constantly interact with the master: they receive commands from it to change slot parameters and send it statistics, which the manager later uses for planning. Each slot periodically calculates the amount of memory it owns exclusively (Private_Dirty) and, if it exceeds the configured limit, shuts itself down. The master then restarts such a slot from scratch. This saves memory and prevents individual slots from disrupting the server as a whole.
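A slot's self-check might look roughly like the sketch below. The function names and the idea of passing the smaps text in as a string are assumptions made for illustration; only the `Private_Dirty` field and the "exit when over the limit" behavior come from the description above.

```python
def private_dirty_kb(smaps_text):
    """Sum the 'Private_Dirty:' fields (in kB) of a /proc/<pid>/smaps dump."""
    return sum(
        int(line.split()[1])
        for line in smaps_text.splitlines()
        if line.startswith("Private_Dirty:")
    )


def should_exit(smaps_text, limit_kb):
    """True when the slot's exclusively owned memory exceeds the limit."""
    return private_dirty_kb(smaps_text) > limit_kb
```

A slot would read `/proc/self/smaps` every so often between tasks and, if `should_exit` returns true, finish the current task and exit so that the master can fork a fresh copy.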

Task Processing Algorithm


Task processing itself follows a fairly simple cyclical algorithm. The slot repeatedly asks the queue server for the next batch of events. If there are events, they are processed. Whenever possible, all events in a batch are processed together. This reduces the number of database queries by grouping requests of the same kind and sending them as multi-requests. Where the storage backends do not support multi-requests, communication with them can be sped up by running asynchronous requests in parallel using libev or libcoro. The logical solution, of course, would be fully asynchronous task processing inside an event loop. In practice, however, such a solution brings difficulties in development (the same code has to run both under a synchronous web server and under an asynchronous queue handler) and in debugging.
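The loop and the batching idea can be sketched as follows. Everything here is hypothetical: `fetch_batch`, `delete`, `multi_get`, and `handle` stand in for the queue-server client, the storage client, and the task logic, and the `user_id` field is just an example of data that several events in one batch may share.

```python
def process_batch(events, multi_get, handle):
    """Process one batch using a single storage round trip."""
    # Collect every key the batch needs and fetch them with one multi-request
    # instead of one query per event.
    user_ids = sorted({ev["user_id"] for ev in events})
    users = multi_get(user_ids)
    processed = []
    for ev in events:
        handle(ev, users[ev["user_id"]])
        processed.append(ev["id"])
    return processed


def run_slot(fetch_batch, delete, multi_get, handle, iterations):
    """Cyclical slot loop: fetch a batch, process it, delete what succeeded."""
    for _ in range(iterations):
        events = fetch_batch()
        if not events:
            continue  # a real slot would wait a little before asking again
        for event_id in process_batch(events, multi_get, handle):
            delete(event_id)  # only now is the event considered processed
```

The loop takes an explicit iteration count only to keep the sketch easy to run and test; a real handler would run until told to stop.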

Error handling is as simple as possible and relies on a feature of the queue server: when it hands jobs to a handler, it locks them for a while (two hours as a rule, but this is configurable per queue). Any situation the developer did not foresee simply results in the task staying locked in the queue for two hours and then being picked up by another handler. This approach gives a uniform response both to problems in the processing logic and to errors in low-level code that can crash the handler with a core dump. More or less expected situations can be handled more sensibly, for example by postponing the activation time of the event. In any case, an event is considered processed only when the handler sends the queue server a command to delete it.
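The lock-then-delete protocol can be illustrated with a small in-memory model. This is not the real queue server or its API: the class, its method names, and the explicit `now` argument (used instead of a wall clock to keep the example deterministic) are assumptions for illustration.

```python
class LockingQueue:
    """Toy model of a queue that locks events when handing them out."""

    def __init__(self, lock_seconds=7200):
        self.lock_seconds = lock_seconds  # two hours by default, per queue
        self._events = {}                 # event_id -> [payload, available_at]
        self._next_id = 1

    def push(self, payload, now):
        event_id = self._next_id
        self._next_id += 1
        self._events[event_id] = [payload, now]
        return event_id

    def take(self, now, limit=10):
        """Hand out available events and lock each one for lock_seconds."""
        batch = []
        for event_id, entry in self._events.items():
            if entry[1] <= now:
                entry[1] = now + self.lock_seconds
                batch.append((event_id, entry[0]))
                if len(batch) == limit:
                    break
        return batch

    def postpone(self, event_id, until):
        """Expected failure: retry at a time chosen by the handler."""
        self._events[event_id][1] = until

    def delete(self, event_id):
        """The only way an event is ever considered processed."""
        self._events.pop(event_id, None)
```

If a handler crashes after `take`, it never calls `delete`, so the event simply becomes available again once the lock expires and another handler picks it up.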

The whole job-processing pipeline is thoroughly instrumented: it sends statistics, feeds various graphs, and is monitored. The primary monitoring signal is the number of tasks in the queue and how many of them are locked. A growing number of locked events clearly indicates that events which cannot be processed because of fatal errors are accumulating in the queue. A growing number of active tasks indicates that the handlers cannot keep up with the incoming stream.

Conclusion

In conclusion, I would like to note that the described infrastructure has been in use in My World for more than a couple of years and has not required any significant intervention or modification during this time, even though the number of queues and the number of servers have grown significantly. Over this period we have practically eliminated the zoo of scripts and daemons by moving them onto the queue manager's infrastructure. It is so convenient that new queues appear in the project no less often than new AJAX functions. Currently a cluster of 40 servers (about 3,500 handlers) processes about 350 million tasks of 150 different types per day.

P.S. Of course, ready-made solutions such as Gearman have already appeared, and if you need distributed task processing, it makes sense to look in their direction. But we were lucky enough to take on this task a bit earlier, and we got a lot of pleasure out of developing our own solution.

Source: https://habr.com/ru/post/228131/

