⬆️ ⬇️

Passing messages between threads. Classic blocking algorithms

Once I got out of the sandbox with a scoop in my hand and a post about non-blocking queues and data transfer between threads. That post was not so much about algorithms and their implementation, but about measuring performance. Then in the comments I was asked a perfectly reasonable question about the usual blocking transmission algorithms - how much slower they are and how to choose the optimal algorithm for a specific task.

Of course, I promised, and enthusiastically set to work, I even got funny results, however ... there wasn’t enough zest, it was boring and flat. As a result, my inner perfectionist merged with my undisguised procrastinator and together they overcame me, the post had long settled in drafts and even my conscience did not tremble at the sight of a forgotten headline.

However, everything is changing, new technologies are appearing, old technologies are disappearing in the archives, and I suddenly decided that it was time to repay debts and keep promises. As a punishment, I had to rewrite everything from scratch, if the miser pays twice, then the lazy one reworks twice, which is what I need.

Yes, I apologize for the KDPV - it is of course completely from a different subject area, but it is nevertheless ideally suited to illustrate the interaction between the threads.





So what woke Herzen?


Pushed me into action getting to know
D language
no need to continue the analogy, I did not mean to say that he was terribly far from the people
- an extremely conceptually beautiful language that inherited and powerfully moved C ++ idioms forward and at the same time retained effective low-level tools, down to pointers. Perhaps because of this, in my opinion, in the standard library D there is some kind of duality - most of the functionality can be called either out of the box, in a simple and easy way, or through an interface close to the native, but fully using the resources and capabilities of the system. If C ++ covers the entire range with a continuous spectrum, then this division is usually well visible in D. See for yourself: you need to measure the time interval, there is a wonderful std.datetime module, but the measurement quantum is 100 ns, which is absolutely not enough for me, please - there is a no less remarkable module - core.time . The std.concurrency.spawn, which is lightweight to the limit, does not suit you - you can use the whole bunch of core.thread . And so almost everywhere, except for one, but an extremely important place - the separation of data between threads. Yes, yes, all variables local to this thread are located in the thread local storage and you cannot force another thread to see their address by any means. And for the exchange of data, there are built-in queues, it must be admitted that they are very convenient - polymorphic, with the possibility of an extraordinary sending of important messages and an extremely pleasant interface. It is possible to send data through them naturally or by value, or immutable links. When I read about it for the first time, I just jumped out of indignation - “But how did your nasty hand rise ...” - and then I thought about it, remembered my projects over the past years and acknowledged - yes, the whole exchange between the streams follows this pattern , but what does not pass is a clear design error.

Nevertheless, the question hung in the air - how effective are the queues in D? If not, this negates all the other effectiveness of the language, such a built-in bottle neck. So I woke up and took up the measurements again.



What exactly are we going to measure?


The question is actually not an easy one; I wrote about this last time, and I repeat. The usual “naive” approach when sending N messages, measuring the total time and dividing by N does not work . Let's see, we measure the performance of the queue , right? Therefore, we can assume that in the process of measuring the speed of the message generator and the message receiver tends to infinity , with a reasonable assumption that no data is copied inside the queue, it is beneficial to put as much data into the queue as possible , then perform a one-time transfer of some internal pointer and everything is already there . At the same time, the average time per message will fall as 1 / N (actually limited to the bottom by the insertion / deletion time, which can be a few nanoseconds) while the delivery time of each message in theory remains constant, and even grows like O (N) in practice .

Instead, I use the opposite approach - each message is sent, the time is measured, and only then the next ( latency ) is sent. As a result, the results are presented in the form of histograms, along the X axis - time, along the Y axis - the number of packets delivered during this time. The most interesting are numerically two parameters - the median average distribution time and the percentage of messages not met in some (arbitrary) upper limit.

Strictly speaking, this approach is also not quite adequate; nevertheless, it describes the requirements for speed much more accurately. I will do some self-criticism in conclusion, until I say that a complete description would include the generation of all possible types of traffic and analysis of it using statistical methods, a full-fledged scientific work from the field of QA theory would have turned out, or rather, I would get another procrastination attack.

One more thing, I mention this because last time there was a long debate, the message generator can insert them into the queue as quickly as possible, but on the condition that the recipient on average has time to extract and process them, otherwise the whole measurement is simply meaningless. If your receiving stream does not have time to process the data stream, you need to make the code faster, parallelize the processing, change the message protocol, but in any case the queue itself has nothing to do with it. It seems to be a simple idea, but the last time had to be repeated several times in the comments. Fluctuations of speed, when suddenly there are many messages in the queue, are quite possible and even unavoidable, this is just one of the factors that a well-designed algorithm should smooth out, but this is possible only if the maximum reception speed is greater than the average sending speed.

')

Let's start with




What's this? And this is actually the result, all my works were in one picture, but now I will explain for a long time what and why is drawn here.



Pink. Standard mechanism D


5 microseconds, is it a lot or a little? In almost all cases, it is not enough (that is, it is very good). For the overwhelming majority of real-world projects, this is more than enough speed, moreover, not so long ago this transfer time could be obtained only with the help of special hardware and / or very special software. Here we have a tool from the standard library, with many other tasty buns and fast enough for all practical needs. The rating is excellent. But however, not great, because this implementation has some disadvantages not related to speed, I will tell about it in the abusive part.

Once again, we are pleased to see that the main programming magic is the absence of any magic. If you climb under the hood (of course, I could not help but have a look), we will see that the code is completely normal - simply linked lists protected by mutexes. I will not even give him here because in the sense of the implementation of the queue he will not tell us anything new. But those few who really need faster algorithms, including non-blocking ones, can easily write their own version removing all convenient but slowing down buns. But I will give my code, just to show how D is still concise and expressive language.

code to illustrate
import std.stdio, std.concurrency, std.datetime, core.thread; void main() { immutable int N=1000000; auto tid=spawn(&f, N); foreach(i; 0..N) { tid.send(thisTid, MonoTime.currTime.ticks); // wait for receiver to handle message receiveOnly!int(); } } void f(int n) { foreach(i; 0..n) { auto m=receiveOnly!(Tid,long)(); writeln(MonoTime.currTime.ticks-m[1]); // ask for the next message m[0].send(0); } } 






Blue. Cruel and undisguised C ++.


400 nanoseconds! Bingo! Side by side all non-blocking and other tricky algorithms! Or is it still not?

No, of course, this is a crude provocation, the fact is that in this version the reading thread never falls asleep, it continues to continuously check the queue for incoming messages in a loop. This option works as long as your CPU simply has nothing more to do, as soon as competing processes appear, especially if they are just as careless about shared resources, everything starts to slip unpredictably. Yes, there is an option with the forced assignment of one of the cores to serve this thread, but architecturally this is a very bad decision, I will return to this later. There are places where it is justified or even necessary, but if you work in such a place, you probably already know everything yourself, for you this post is completely superfluous.

However, we received important information - on modern systems, the speed of transactions is not determined by the speed of mutexes or data copying, the main factor is the wake up time for the stream after a forced or voluntary pause. Hence the moral - if you don’t want or can’t afford a dedicated CPU to process messages from the queue, think twice before using quick and complex but inconvenient solutions, the loss of adjusting the application architecture to them will almost completely outweigh the insignificant gain that the algorithm itself during the transaction. And yes, here I mean
boost :: lockfree
This is an exemplary implementation of a non-blocking queue, but the message type must have a trivial destructor and an assignment operator , the condition is so harsh for C ++ that I actually have never brought the code to the final product.



So what can be done while remaining within the bounds of reason?



Red. Reasonable and weighted C ++.


If usleep () and others like it came to mind - forget, you are guaranteed to increase the response time to at least 40 microseconds, this is the best that the modern core can guarantee. Slightly better yield () , although it works well for small loads, it tends to share processor time with anyone.
This is all about seals of course.
Experience shows that on each server there is at least one process that currently draws seals, and it will not give the CPU to anyone until all the seals on the internet have been carefully drawn and clicked
There is only one way out and it is obvious - use std :: condition_variable , since we have already used mutexes for synchronization and the changes in the code will be minimal. In this embodiment, the recipient falls asleep on a variable if the queue is empty, and the message generator sends a signal if it suspects that the partner can sleep. In this case, the kernel has all the possibilities for optimization and we get the result, 3 microseconds. It is already possible to say that wow, we literally step on the heels of every tricky implementation, while the basic code is extremely simple and can be adapted to all occasions. There is no polymorphism here, of course, and did not sleep in D, but it turned out almost twice as fast. No kidding, this is a very real competitor to non-blocking algorithms.



Green. Scalability, scalability.


And this is an architectural solution that I have been searching for and carrying for a long time, although the result looks extremely simple. It is often asked how many maximum messages per second can be transmitted through a queue and the like, forgetting that the opposite situation happens at least as often - even if we have a number of threads that do their work and should send messages from time to time, not too often, but important. We do not want to hang on each such stream an individual listener who will still sleep for the most part, so we will have to create one common processing center that will poll all the queues and serve the messages as they arrive. But since today we don’t have a long code evening, but an evening of short conceptual fragments, I suggest using boost :: asio , as a huge bonus this stream can also serve sockets and timers. Here, by the way, it would be easy to do without a queue at all, capturing data directly in the function being transferred, the queue rather serves as an aggregator and a buffer for data, and of course for the meaningful link with the previous examples.

And what do we get? 4.3 microseconds on the process of only one generator, not bad at all. It should be borne in mind that the result will inevitably deteriorate in a system where many threads simultaneously write messages, but scalability is almost unlimited and it is worth a lot.

Once again I want to emphasize what the philosophical meaning of this fragment is - we are sending to another thread not just data, but data plus a functor, who himself knows how to work with them, something like an inter-stream virtuality. This is such a general concept that it could probably claim to be a separate design pattern.




This completes the experimental part, if you need a code for all tests, then here it is . Carefully, this is not a ready-made library, so I do not advise you to copy it thoughtlessly, but it can serve as a completely suitable tutorial for developing your code. Additions and improvements are welcome.



Different reasoning, in the case and not very.


Why do we need message queues at all? As example D teaches us, this is the most kosher pattern for designing multi-threaded systems, for which the future means the future and behind the queues too. But are all the queues the same? What are the options and what are the differences? That's about it and talk.

First you need to distinguish between data streams and message flows . With data streams everything is relatively simple, each transmitted fragment does not carry a semantic load and the boundaries between fragments are rather arbitrary. Copying costs are comparable to or exceed the resources consumed by the queue itself and the recipe in this case is extremely simple - increase the internal buffer as much as possible, get just incredible speed. A data quantum, a large file, for example, can be considered as one message, so large that it cannot be technically transmitted at once. Well, that's all, there is probably nothing more to say about it. But in the message flow, each next fragment carries a complete piece of information and should cause an immediate reaction, we are talking about them today.

It is also useful to analyze the architecture from the point of view of connectivity , what connects with what. The simplest type is the “pipe” that connects two streams, the writer and the reader, its main purpose is to provide a decoupling of the input and output streams, ideally neither of them should be aware of the problems of the other. The second atomic type of queue is a “funnel” where an arbitrary number of threads can write, but only one reads. This is probably the most requested case, the simplest example is the logger. And in general, this is all, the opposite case, when one thread writes and reads a little, is realized with the help of a bundle of "pipes" and therefore is not atomic, and if you suddenly need a queue, anyone can write and read from anyone else I would strongly advise to revise my attitude to life in general and to the design of multithreaded systems in particular.

Returning to the decoupling of the input and output streams, this inevitably leads to the conclusion that the ideal queue must be dimensionless , that is, if necessary, contain infinitely many messages. A simple example: let an extremely important and responsible stream want to write a short message to the log and return to its most important matters. However, our log is built on the basis of a queue with a fixed-size buffer, and here is just that someone dropped “War and Peace” in full. What to do? Such a low priority task as a logger should not block a calling thread, return an error or an exception is extremely undesirable from an architectural point of view (we shift the responsibility to the calling function, we commit it to track all possible outcomes, we extremely complicate the calling code and the probability of error, and instead of we get nothing - what to do is still not clear). And in general, what was all this talk about non-blocking queues for, if it’s right here before our eyes is blocked ?. That is why I have already mentioned that standard queues D are not a universal solution for inter-thread communication, moreover, a non-blocking boost :: lockfree :: queue in one of the options also uses a fixed buffer and is not actually a non-blocking queue, although it uses a non-blocking algorithm .

Fortunately, RAM is now one of the cheapest resources, so the adaptive strategy will probably be the most optimal among the universal ones - memory, if necessary, is allocated from the heap ( not to run twice in large pieces) and is never released, thus the queue size adjusts to bursts traffic and, with normal statistics, referring to the allocator happens less and less. Experience shows that even on medium-sized servers such an approach easily gives several hours of handicap, in which you can manage to fix something, hang it up to a planned stop, or at least find another job .

And finally - the statistical nature of traffic. I have already spoken about the difference in data transmission and transmission of messages, but messages can also have different distribution in time. Oddly enough, the easiest case if the data arrive as quickly as possible (but not faster than we manage to remove them from the queue), but at the same time evenly. At the same time, various accelerators work most effectively, from spin locks to tools built into the system. More difficult is the case when powerful bursts occur in the message flow, which are guaranteed to surpass the processing speed. In this mode, the queue should effectively accumulate incoming messages, allocating memory if necessary and not allowing for a significant slowdown.
It was here that I cheated
in tests, messages are sent strictly one by one and I have not investigated in any way the behavior of queues D when locked on the record, nor the behavior of C queues if necessary, allocating memory. I also did not explore the mutual influence and struggle for the resources of several threads, especially when there are more of them than physical CPUs. In terms of volume, it easily pulls into a separate post.
However, the most possible heavy mode - when messages come very rarely, but require an immediate response. During this time, anything can happen, including dropping into a swap. If during the normal distribution of intervals such events occur rarely and fall into those fractions of a percent that we have discarded in tests, then the efficiency may fall by orders of magnitude.



Incomplete list of rakes in stock.







It is customary to end on an optimistic note: behind multithreading it’s not that the future is more like the present. And behind the powerful, flexible and universal messaging mechanisms - the future, but they are not really written, apparently waiting for us.

All success.

Source: https://habr.com/ru/post/211717/



All Articles