A little about multithreaded programming. Part 1. Synchronization: evil or not?

In my work I often deal with high-load multithreaded or multi-process services (application, web, and index servers).
It is quite interesting, though sometimes thankless, work: optimizing this whole household.
Customers' growing needs often cannot be met by simply replacing the hardware with something more modern, because computer performance and the read/write speeds of hard drives and networks grow much more slowly than customer demands.
Adding nodes to the cluster (the system is usually distributed) rarely helps either.
More often you have to run the profiler, look for bottlenecks, dig into the source code, and fix blunders left by colleagues and, to be honest, sometimes by myself many years ago.
I will try to describe some of the synchronization-related problems here. This is not an introductory course on multithreaded programming: I assume the reader is familiar with threads and context switches and knows what a mutex, a semaphore, and so on are for.

Any developer who has designed something multithreaded beyond “Hello world” knows that writing fully asynchronous code is incredibly difficult: sooner or later you need to write something to a shared channel, change a structure in memory (for example, rebalance the tree of a hash table), take something from a queue, and so on.
By synchronizing such access, we limit the simultaneous execution of certain critical sections of code. As a rule, only one thread, and less often several, may execute them at a time (for example, 1 writer / N readers).
The need for synchronization is undeniable. But excessive synchronization is very harmful: a piece of the program that runs more or less briskly on 2 or 3 threads may run almost “single-threaded” on 5 threads, and on 20 threads it practically goes to sleep even on very good hardware.

However, practice shows that insufficient synchronization sometimes leads to the same result: the system stalls. This happens when the code executed in parallel contains, for example, hard-disk access (continuous seeks), or when several large chunks of memory are accessed repeatedly (for example, the cache is constantly flushed on context switches, so the CPU cache simply stops helping).

Use semaphores

Semaphores were not invented only for building things like a ReadWriteMutex on top of them. Semaphores can and should be used to reduce the load on the hardware in a piece of code that is executed in parallel.
As a rule, this cures many of the “stalls” that are easy to find by profiling: as the number of threads grows, the execution time of certain functions grows noticeably, while other functions run at the same or a comparable speed.
Profiler output:

 =============================================================================================================
 Run # 1 (5 Threads)
 rpcsd (hbgsrv0189, PID:0718, TID:2648) # 03-09-2012 | 13:50:45 | Servlet: A::RpcsServlet, URI: /index-search
 =============================================================================================================
 NS                       | Name               |  C |  T | Tot(s) | TwR(s) | Avg(s) | AwR(s) | Max(s) | Min(s)
 =============================================================================================================
 ::RPC::Service           | service            |  1 |  1 |  1.593 |  1.593 |  1.593 |  1.593 |  1.593 |  1.593
 ::A::RpcsServlet         | service            |  1 |  1 |  1.592 |  1.592 |  1.592 |  1.592 |  1.592 |  1.592
 ::IndexSrvRpc            | index-search       |  1 |  1 |  1.584 |  1.584 |  1.584 |  1.584 |  1.584 |  1.584
 ::Indexer::Search        | Search             |  1 |  1 |  1.584 |  1.584 |  1.584 |  1.584 |  1.584 |  1.584
 ::Indexer::Search        | ParallelSearch     |  2 |  2 |  1.256 |  1.256 |  0.628 |  0.628 |  0.655 |  0.601
 ::Indexer::Search::Cache | SearchL2Index      | 44 | 44 |  0.686 |  0.686 |  0.016 |  0.016 |  0.016 |  0.015
 ::Indexer::Search        | InvalidateCacheIdx | 20 | 20 |  0.570 |  0.570 |  0.028 |  0.028 |  0.031 |  0.020
 ::Indexer::Search::Cache | InvalidateIdx      | 20 | 20 |  0.276 |  0.276 |  0.014 |  0.014 |  0.016 |  0.002
 ::Indexer::Search        | SearchL1Index      |  1 | 14 |  0.203 |  0.203 |  0.203 |  0.016 |  0.203 |  0.016
 ::Indexer::Search        | MergeJoin          |  1 |  1 |  0.125 |  0.125 |  0.125 |  0.125 |  0.125 |  0.125
 =============================================================================================================
 Run # 2 (25 Threads w/o semaphore)
 rpcsd (hbgsrv0189, PID:0718, TID:2648) # 03-09-2012 | 13:52:03 | Servlet: A::RpcsServlet, URI: /index-search
 =============================================================================================================
 NS                       | Name               |  C |  T | Tot(s) | TwR(s) | Avg(s) | AwR(s) | Max(s) | Min(s)
 =============================================================================================================
 ::RPC::Service           | service            |  1 |  1 |  4.255 |  4.255 |  4.255 |  4.255 |  4.255 |  4.255
 ::A::RpcsServlet         | service            |  1 |  1 |  4.254 |  4.254 |  4.254 |  4.254 |  4.254 |  4.254
 ::IndexSrvRpc            | index-search       |  1 |  1 |  4.244 |  4.244 |  4.244 |  4.244 |  4.244 |  4.244
 ::Indexer::Search        | Search             |  1 |  1 |  4.244 |  4.244 |  4.244 |  4.244 |  4.244 |  4.244
 ::Indexer::Search        | ParallelSearch     |  2 |  2 |  3.729 |  3.729 |  1.865 |  1.865 |  1.889 |  1.840
 ::Indexer::Search        | InvalidateCacheIdx | 20 | 20 |  2.497 |  2.497 |  0.125 |  0.125 |  0.126 |  0.125
 ::Indexer::Search::Cache | InvalidateIdx      | 20 | 20 |  2.188 |  2.188 |  0.109 |  0.109 |  0.113 |  0.109
 ::Indexer::Search::Cache | SearchL2Index      | 44 | 44 |  1.231 |  1.231 |  0.028 |  0.028 |  0.031 |  0.015
 ::Indexer::Search        | SearchL1Index      |  1 | 14 |  0.360 |  0.360 |  0.360 |  0.028 |  0.360 |  0.016
 ::Indexer::Search        | MergeJoin          |  1 |  1 |  0.155 |  0.155 |  0.155 |  0.155 |  0.155 |  0.155
 =============================================================================================================
 Run # 3 (25 Threads with semaphore in InvalidateCacheIdx, before InvalidateIdx)
 rpcsd (hbgsrv0189, PID:0718, TID:2648) # 03-09-2012 | 14:02:51 | Servlet: A::RpcsServlet, URI: /index-search
 =============================================================================================================
 NS                       | Name               |  C |  T | Tot(s) | TwR(s) | Avg(s) | AwR(s) | Max(s) | Min(s)
 =============================================================================================================
 ::RPC::Service           | service            |  1 |  1 |  2.213 |  2.213 |  2.213 |  2.213 |  2.213 |  2.213
 ::A::RpcsServlet         | service            |  1 |  1 |  2.213 |  2.213 |  2.213 |  2.213 |  2.213 |  2.213
 ::IndexSrvRpc            | index-search       |  1 |  1 |  2.205 |  2.205 |  2.205 |  2.205 |  2.205 |  2.205
 ::Indexer::Search        | Search             |  1 |  1 |  2.205 |  2.205 |  2.205 |  2.205 |  2.205 |  2.205
 ::Indexer::Search        | ParallelSearch     |  2 |  2 |  1.690 |  1.690 |  0.845 |  0.845 |  0.889 |  0.801
 ::Indexer::Search::Cache | SearchL2Index      | 44 | 44 |  1.153 |  1.153 |  0.026 |  0.026 |  0.031 |  0.016
 ::Indexer::Search        | InvalidateCacheIdx | 20 | 20 |  0.537 |  0.537 |  0.027 |  0.027 |  0.031 |  0.007
 ::Indexer::Search        | SearchL1Index      |  1 | 14 |  0.359 |  0.359 |  0.359 |  0.028 |  0.359 |  0.017
 ::Indexer::Search::Cache | InvalidateIdx      | 20 | 20 |  0.278 |  0.278 |  0.014 |  0.014 |  0.016 |  0.004
 ::Indexer::Search        | MergeJoin          |  1 |  1 |  0.156 |  0.156 |  0.156 |  0.156 |  0.156 |  0.156
 =============================================================================================================


In the third profiler run you can see how the execution time of the InvalidateIdx method and, accordingly, of the InvalidateCacheIdx method changed after the call to InvalidateIdx was wrapped in the semaphore invCI_semaphore:
 semaphore invCI_semaphore(config.InvCI_Count /* = 5 */);
 ...
 int InvalidateCacheIdx()
 {
   ...
   while (...) {
     cache.SearchL2Index();
     invCI_semaphore++;
     while (cache.InvalidateIdx()) {};
     invCI_semaphore--;
   }
   ...
 }

This way of using semaphores is quite simple and does not necessarily require a full understanding of the process, but it has many drawbacks. One of them is that the maximum number of threads for each block will most likely be tuned in the field (in production, on the customer's system), which is not always a good thing. The great advantage of this optimization method is that you can quickly increase the number of threads of the entire service without changing the execution plan, i.e. practically without reworking the whole engine: you simply place a few semaphores, set to the previous thread count, at the bottlenecks. I am not a fan of using semaphores thoughtlessly, but as a temporary solution (to calm the customer down) I have used this method more than once, so that later I could calmly redo it “properly” after digging into the source code.
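The throttling idea can be sketched in standard C++ roughly as follows. This is only an illustration under assumptions: the Semaphore and SemaphoreGuard classes and the maxConcurrentInSection function are hypothetical names, not the classes from the service described above, and the "heavy" section is simulated with a yield. (C++20 also offers std::counting_semaphore; a hand-written one is used here so the sketch works with older compilers.)

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Minimal counting semaphore built on a mutex and a condition variable.
class Semaphore {
public:
    explicit Semaphore(int permits) : permits_(permits) { assert(permits > 0); }

    void acquire() {
        std::unique_lock<std::mutex> lock(mutex_);
        available_.wait(lock, [this] { return permits_ > 0; });
        --permits_;
    }

    void release() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++permits_;
        }
        available_.notify_one();
    }

private:
    std::mutex mutex_;
    std::condition_variable available_;
    int permits_;
};

// RAII guard: the permit is returned even if the guarded code throws.
class SemaphoreGuard {
public:
    explicit SemaphoreGuard(Semaphore& s) : sem_(s) { sem_.acquire(); }
    ~SemaphoreGuard() { sem_.release(); }
    SemaphoreGuard(const SemaphoreGuard&) = delete;
    SemaphoreGuard& operator=(const SemaphoreGuard&) = delete;

private:
    Semaphore& sem_;
};

// Hypothetical stand-in for a hardware-heavy section. Starts `threads` workers,
// lets at most `limit` of them into the section at once, and returns the
// highest number of workers that were observed inside it simultaneously.
int maxConcurrentInSection(int threads, int limit) {
    Semaphore sem(limit);
    std::atomic<int> inside{0};
    std::atomic<int> peak{0};
    std::vector<std::thread> pool;
    for (int i = 0; i < threads; ++i) {
        pool.emplace_back([&] {
            for (int n = 0; n < 100; ++n) {
                SemaphoreGuard guard(sem);
                const int now = ++inside;
                int seen = peak.load();
                // Raise the recorded peak if this thread saw a higher value.
                while (now > seen && !peak.compare_exchange_weak(seen, now)) {}
                std::this_thread::yield();  // pretend to do the heavy work
                --inside;
            }
        });
    }
    for (auto& t : pool) t.join();
    return peak.load();
}
```

With 25 workers and a limit of 5, the observed peak should never exceed 5, which mirrors the third profiler run: the total number of threads stays at 25, but only a handful hit the expensive section at once.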
Prioritize

Priorities are a very convenient mechanism that also makes it quite easy to “lighten” an application. For example, if the system logs are written in a separate thread, lowering its priority to the minimum can noticeably “lighten” the process without reducing the log level.
For example, a construct like the following can be used when a pool with several threads processes tasks of different priorities:
 // before doing ...
 if ( thisThread.pool.count() > 1 && !(currentTaskType in (asap, immediately, now)) ) {
   thisThread.priority = 2 * thisThread.pool.priority;
 } else {
   thisThread.priority = 5 * thisThread.pool.priority;
 }
 // do current task ...

Keep in mind that a thread's priority applies within the whole process, not only within the pool the thread belongs to, so use it with care.

Divide et impera (Divide and conquer)

Quite often a code segment does not have to be executed immediately, i.e. some action or part of a task can be deferred: for example, writing logs, counting visits, re-indexing a cache, and so on.
You can significantly increase execution speed by extracting pieces of synchronous code into separate tasks and performing them later (for example, in a background service). This can be a separate thread, a pool of threads, or even another process called via RPC (for example, an asynchronous WebService call). Naturally, the cost of handing off the task (placing it in a queue, etc.) should be lower than the cost of executing it.
An example with a separate log thread:
 // synchronous write to the log file:
 int log(int level, ...)
 {
   if (level >= level2log) {
     logMutex.lock();
     try {
       file.write(...);
       file.flush();
     } finally {
       logMutex.release();
     }
   }
 }


 // asynchronous (background) write to the log:
 int log(int level, ...)
 {
   if (level >= level2log) {
     // lock the queue and add the message:
     logQueue.mutex.lock();
     logQueue.add(currentThread.id, ...);
     logQueue.mutex.release();
     // wake up the log worker:
     logQueue.threadEvent.pulse();
   }
 }

 // background-logging thread:
 int logThreadProc()
 {
   ...
   while (true) {
     // keep waiting until the latency /* 500 ms */ has elapsed or enough messages /* 10 */ have piled up:
     sleepTime = currentTime - lastTime;
     if ( logQueue.count < config.LogMaxCount /* = 10 */
          && sleepTime < config.LogLatency /* = 500 */ ) {
       logQueue.threadEvent.wait(config.LogLatency - sleepTime);
       continue;
     };
     // write the queued messages and remove them from the queue:
     logQueue.mutex.lock();
     try {
       foreach (... in logQueue) {
         file.write(...);
         logQueue.delete(...);
       }
     } finally {
       logQueue.mutex.release();
     }
     // flush once for the whole batch:
     file.flush();
     // sleep until the next message arrives:
     logQueue.threadEvent.wait();
     lastTime = currentTime;
   }
   ...
 }

Such a simple construct can significantly reduce the cost of logging and the impact of context switches, and that cost will practically not depend on the number of threads calling the log method.
It is important to understand that any additional logic hung on logging now loads only the thread that actually writes to the log. I.e. you can make the log as intelligent as you like: introduce the concept of LogLatency (as in the example), add some log analyzer (something like fail2ban), or, for example, hold back all debug messages and log them only in case of an error, group them by TID, and so on. All this will put practically no load on the other threads.
In addition, with the first method (the message is written synchronously, directly to the log file), the threads get “serialized”, so to speak. I.e. the more synchronization objects there are (mutexes, critical sections, wait events) and the higher the cost of a context switch, the more likely it is that all threads passing through these objects will execute one after another.
I.e. the speed of a multithreaded task approaches, or even becomes worse than, the speed of single-threaded execution. Reducing the time between lock() and release() improves the code in two ways at once: the thread itself runs faster, and the probability of the process being “serialized” decreases.
Once you have an event queue, you can sometimes build such constructs without creating any additional threads at all. For example, put some actions into a queue so that the same thread performs them later, one after another, e.g. during “idle time”.
This is easy to illustrate in Tcl:
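For readers who prefer something compilable, the background-logging scheme can be sketched in standard C++ like this. It is a minimal sketch under assumptions, not the logger from the service discussed here: the BackgroundLog class, its parameters, and the countLoggedLines helper are hypothetical names. Unlike the pseudocode, the worker swaps the queue out under the lock and does the slow I/O after releasing it.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <utility>

class BackgroundLog {
public:
    BackgroundLog(std::ostream& out, std::size_t maxCount,
                  std::chrono::milliseconds latency)
        : out_(out), maxCount_(maxCount), latency_(latency),
          worker_([this] { run(); }) {
        assert(maxCount > 0);
    }

    ~BackgroundLog() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        wakeup_.notify_one();
        worker_.join();  // the worker writes out what is left before exiting
    }

    // Called by any thread: only a short queue operation happens under the lock.
    void log(std::string message) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(message));
        }
        wakeup_.notify_one();
    }

private:
    void run() {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            // Sleep until the batch is full, the latency has elapsed, or we stop.
            wakeup_.wait_for(lock, latency_, [this] {
                return stopping_ || queue_.size() >= maxCount_;
            });
            std::deque<std::string> batch;
            batch.swap(queue_);
            const bool stop = stopping_;
            lock.unlock();  // slow I/O happens outside the lock
            for (const auto& line : batch) out_ << line << '\n';
            out_.flush();   // one flush per batch
            if (stop) return;
            lock.lock();
        }
    }

    std::ostream& out_;
    const std::size_t maxCount_;
    const std::chrono::milliseconds latency_;
    std::mutex mutex_;
    std::condition_variable wakeup_;
    std::deque<std::string> queue_;
    bool stopping_ = false;
    std::thread worker_;  // declared last so it starts after the other members
};

// Hypothetical usage: two threads log concurrently into an in-memory sink;
// returns the number of lines that reached the sink.
std::size_t countLoggedLines(int perThread) {
    std::ostringstream sink;
    {
        BackgroundLog log(sink, 10, std::chrono::milliseconds(50));
        std::thread a([&] { for (int i = 0; i < perThread; ++i) log.log("a"); });
        std::thread b([&] { for (int i = 0; i < perThread; ++i) log.log("b"); });
        a.join();
        b.join();
    }  // the destructor drains the queue and joins the worker
    const std::string text = sink.str();
    std::size_t lines = 0;
    for (char c : text) if (c == '\n') ++lines;
    return lines;
}
```

The calling threads hold the mutex only for a push_back, so the time between lock and release stays tiny no matter how expensive the actual write is.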
 ## ...
 ...
 ## read the counter:
 set counter [db onecolumn {select cntr from accesslog where userid = $userid}]
 %> ... <%= $counter %> ... <%
 ## update the access log in the background, on "update idle":
 after idle UpdateAccess $userid [clock seconds]
 ## ...
 ....
 ## somewhere in the code:
 proc UpdateAccess {userid lasttime} {
   db exec {update accesslog set cntr = cntr + 1, lastaccess = $lasttime where userid = $userid}
 }
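The same "do it later, in the same thread" idea can be sketched in C++ as well. This is a hypothetical illustration: IdleQueue, defer, and runPending are made-up names, and the owning thread is assumed to call runPending itself whenever it has nothing urgent to do.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Single-threaded queue of postponed actions. No locking is needed because
// only the owning thread ever touches it.
class IdleQueue {
public:
    // Remember an action instead of executing it right away.
    void defer(std::function<void()> action) {
        assert(action);
        pending_.push_back(std::move(action));
    }

    // Run everything that was postponed, in the order it was queued.
    // Returns the number of actions executed.
    std::size_t runPending() {
        std::size_t done = 0;
        while (!pending_.empty()) {
            auto action = std::move(pending_.front());
            pending_.pop_front();
            action();
            ++done;
        }
        return done;
    }

private:
    std::deque<std::function<void()>> pending_;
};
```

The request handler would call defer with the bookkeeping (such as the counter update), answer the client, and call runPending only once the response has been sent.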


Queues, FIFOs, LIFOs, and Multithreading

Organizing a queue, a data pool, or a sequential buffer is not difficult, but keep in mind that with multithreading, all other things being equal, a LIFO queue should be the first choice (if the order of processing does not matter, of course). Sometimes LIFO and FIFO can be combined or grouped (put LIFO elements into small FIFO queues or, for example, build a buffer from the end, etc.). The point of such contortions lies in the processor cache and partly in the virtual memory organization. I.e. the probability that the most recent elements of a LIFO are still in the processor cache is far higher than for a FIFO of the same length.

An example from life: in our own memory manager, a hash table of pools of free objects of the same size was set up (anyone who calls malloc / free very often knows why this is done :). The pools were organized on the FIFO principle: the mymalloc function returned the element that had been put into the pool by myfree the longest time ago. The reason that led the developer to use FIFO is banally simple: if some careless “programmer” keeps using an object for a while after myfree, the program will probably keep working longer. After switching to LIFO, the whole arsenal (the application server), which uses the memory manager heavily, ran about 30% faster.
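A LIFO pool of free blocks of one size can be sketched as below. This is an assumption-laden illustration, not the memory manager from the story: FreeListPool, allocate, and release are hypothetical names, a single pool stands in for the whole hash table of pools, and the sketch is deliberately not thread-safe.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <vector>

// Pool of free blocks of one fixed size, reused in LIFO order.
class FreeListPool {
public:
    explicit FreeListPool(std::size_t blockSize) : blockSize_(blockSize) {
        assert(blockSize > 0);
    }
    ~FreeListPool() {
        for (void* block : free_) std::free(block);
    }
    FreeListPool(const FreeListPool&) = delete;
    FreeListPool& operator=(const FreeListPool&) = delete;

    void* allocate() {
        if (free_.empty()) return std::malloc(blockSize_);
        // Hand out the most recently released block: of all the free blocks
        // it is the one most likely to still be in the CPU cache.
        void* block = free_.back();
        free_.pop_back();
        return block;
    }

    // Return a block to the pool instead of giving it back to the system.
    void release(void* block) { free_.push_back(block); }

private:
    std::size_t blockSize_;
    std::vector<void*> free_;  // used as a stack
};
```

A FIFO variant would take blocks from the front of the container instead, returning the block that has been idle the longest and is therefore the least likely to be cached.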

ReadWriteMutex

Very often synchronization is needed only when an object changes: for example, when writing to a shared file, changing the structure of lists or hash tables, etc. As a rule, only one thread is allowed to do this, and often even the reading threads are blocked (to rule out dirty reads and crashes with an exception, since the records are not fully valid until the change is complete).
It is more correct to lock such objects with an RW-mutex, where reading threads do not block each other, and full synchronization (execution by a single thread) happens only when a write lock is taken.
When using a read/write mutex, you must always know exactly how the object is read, since in some cases the object can change even while being read (for example, when it builds an internal cache on first initialization or reinitializes after a write). Ideally the API then provides a callback for locking, or locks by itself in a multithreaded environment, or the API documentation describes in detail whether an RW-mutex can be used and with what exceptions. Some RW-mutex implementations need to know in advance (be told) the number of reader threads, and sometimes of writer threads. This is due to the specific implementation of the write lock (as a rule, semaphores are used). Despite these and other limitations, if there are several reader threads it is advisable to synchronize on such a mutex whenever possible.
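In modern C++ the reader/writer pattern can be sketched with std::shared_mutex (this assumes a C++17 compiler). The SharedTable class and the fillConcurrently driver are hypothetical names introduced only for the illustration. Note that std::shared_mutex does not need to be told the number of readers in advance, and the standard makes no fairness promise between readers and writers.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <string>
#include <thread>
#include <vector>

class SharedTable {
public:
    void put(const std::string& key, int value) {
        std::unique_lock<std::shared_mutex> lock(mutex_);  // exclusive: one writer
        data_[key] = value;
    }

    std::optional<int> get(const std::string& key) const {
        std::shared_lock<std::shared_mutex> lock(mutex_);  // shared: many readers
        const auto it = data_.find(key);
        if (it == data_.end()) return std::nullopt;
        return it->second;
    }

    std::size_t size() const {
        std::shared_lock<std::shared_mutex> lock(mutex_);
        return data_.size();
    }

private:
    mutable std::shared_mutex mutex_;
    std::map<std::string, int> data_;
};

// Hypothetical driver: several writers insert distinct keys while one reader
// keeps polling. Returns the final number of entries.
std::size_t fillConcurrently(SharedTable& table, int writers, int perWriter) {
    assert(writers > 0 && perWriter > 0);
    std::vector<std::thread> pool;
    for (int w = 0; w < writers; ++w) {
        pool.emplace_back([&table, w, perWriter] {
            for (int i = 0; i < perWriter; ++i)
                table.put(std::to_string(w) + ":" + std::to_string(i), i);
        });
    }
    pool.emplace_back([&table] {
        for (int i = 0; i < 1000; ++i) (void)table.get("0:0");
    });
    for (auto& t : pool) t.join();
    return table.size();
}
```

The read path here really is read-only; if get had to build a cache lazily, it would need the exclusive lock (or its own synchronization), which is exactly the pitfall described above.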

Read the documentation, read the source code

Not knowing, or misunderstanding, what is hidden behind a particular class or object is especially critical in a multithreaded application. This applies above all to the basic synchronization objects. I will try to explain what I mean with an example of incorrect use of an RW-mutex.
A colleague of mine once used a fair RW-mutex built on semaphores. He was too lazy to pass the number of reader threads to the RWMutex class dynamically (he statically set the “maximum possible” value of 500) and wrote the following code for the writer thread:
 ...
 RWMutex mtx(500);
 ...
 mtx.lockWrite();
 hashTab.add(...);
 mtx.releaseWrite();
 ...

And under a good load the server went into a deep stupor and fell into hibernation. The point is that he made two mistakes: he took the static value of 500, and he did not find out how such an RW-mutex would behave on this particular platform. Because the RW-mutex was made fair, code similar to the following was used:
 void RWMutex::lockWrite()
 {
   writeMutex.lock();
   for (register int i = 0; i < readersCount /* 500 in our case */; i++)
     readSemaphore++;
 }
 void RWMutex::releaseWrite()
 {
   if (!f4read)
     writeMutex.release();
   readSemaphore -= readersCount;
   if (f4read)
     writeMutex.release();
 }

Because lockWrite increments with readSemaphore++ in a loop instead of doing readSemaphore += readersCount, this construct gives reader and writer threads equal chances. Perhaps he did not know that the semaphore class used to build this RWMutex came from a cross-platform library, which on this particular platform produced simple code that looked something like this:
 int Semaphore::operator ++()
 {
   mutex.lock();
   if (sema++ > MaxFlowCount)
     flowMutex.lock();
   mutex.release();
 }

I.e. when adding 100 values to the hash table hashTab while several threads were reading it at the same time, we got 100 * 500 locks (and stalls of several milliseconds because of context switches). The most interesting thing about this story is that this was the base class RWSyncHashTable, which is used actively throughout our code.
Remember clearly: some API constructs may already be synchronized. Sometimes this is true even of an object's constructor and destructor. In that case additional synchronization is often harmful. This is exactly the case where you can spoil the porridge with butter.
Read the sources and look into the API documentation, and such blunders are more likely to pass you by.

Synchronous != Waiting

Synchronized execution does not at all mean that our process does nothing but wait. The locking methods of modern systems are quite flexible and allow constructs like the following:
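 static int mtx_locked = 0;
 // while someone holds the lock, or we cannot get it within 1 ms:
 while ( mtx_locked || !mtx.lock(config.MaxWaitTime /* = 1 ms */) ) {
   // don't wait - do something else in the meantime ...
   processNextRequest();
 }
 // got the lock ...
 mtx_locked++;
 // do the locked work ...
 processInLock();
 // unlock ...
 mtx_locked--;
 mtx.release();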

Code of this kind lets you avoid waiting for the mutex lock and going to sleep, and try to do something else in the meantime. The concept of asynchronous programming is based on a similar principle, although it is often implemented a bit differently (callbacks or event handlers, transactional nowait locking, per-thread caching, etc.). Here you follow one very simple rule: “do not wait”.
This example also shows another technique for avoiding or minimizing context switches: the static variable mtx_locked. It lets you skip the call to mtx.lock when you already know that the code is locked (mtx_locked > 0), and we do not need to know this for certain: we just do something else.
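With the standard library, the "do not wait" loop can be sketched using std::timed_mutex::try_lock_for. This is a hedged illustration: doInsteadOfWaiting and demoDetours are hypothetical names, the 1 ms timeout mirrors MaxWaitTime from the pseudocode, and the mtx_locked hint is left out to keep the sketch short.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <mutex>
#include <thread>

// Keeps doing `otherWork` until the mutex can be acquired, then runs `critical`
// under the lock. Returns how many times `otherWork` ran in the meantime.
int doInsteadOfWaiting(std::timed_mutex& mtx, std::chrono::milliseconds maxWait,
                       const std::function<void()>& otherWork,
                       const std::function<void()>& critical) {
    assert(otherWork && critical);
    int detours = 0;
    while (!mtx.try_lock_for(maxWait)) {
        otherWork();  // e.g. process the next request instead of sleeping
        ++detours;
    }
    // The mutex is already locked here; adopt it so it is released on return.
    std::lock_guard<std::timed_mutex> lock(mtx, std::adopt_lock);
    critical();
    return detours;
}

// Hypothetical demo: a second thread holds the mutex until the caller has done
// at least `rounds` units of other work. Returns the units actually done.
int demoDetours(int rounds) {
    std::timed_mutex mtx;
    std::atomic<int> processed{0};
    std::atomic<bool> held{false};
    std::thread holder([&] {
        std::lock_guard<std::timed_mutex> lock(mtx);
        held = true;
        while (processed.load() < rounds) std::this_thread::yield();
    });
    while (!held.load()) std::this_thread::yield();  // let the holder lock first

    bool ranCritical = false;
    doInsteadOfWaiting(mtx, std::chrono::milliseconds(1),
                       [&] { ++processed; },
                       [&] { ranCritical = true; });
    holder.join();
    assert(ranCritical);
    return processed.load();
}
```

Keep in mind that try_lock_for is allowed to fail spuriously, so an occasional extra unit of "other work" is possible even when the mutex is free; for this pattern that is harmless.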

I should probably finish the first part here (it is already a lot of text). If somewhere I have stated what is common knowledge to some readers, I humbly ask your forgiveness: no harm was meant. Suggestions, wishes, and criticism are welcome.

In the next part:

Source: https://habr.com/ru/post/150801/
