📜 ⬆️ ⬇️

Threads, locks, and condition variables in C ++ 11 [Part 2]

For a more complete understanding of this article, it is recommended to read its first part , where the focus was on threads and locks, it explained many points (terms, functions, etc.) that will be used here without explanation.
This article will discuss conditional variables ...

Conditional variables


In addition to the synchronization methods described earlier , C ++ 11 provides support for conditional variables that allow one or more threads to be blocked until either a notification from another thread is received or the mythical spurious wakeup occurs.
There are two implementations of condition variables available in the <condition_variable> header:

I will describe how the conditional variables work:

The code below shows an example of using a conditional variable to synchronize threads: during the operation of some threads (let's call them “workers”) an error may occur, while they are placed in a queue. The “logger” thread handles these errors (retrieving them from the queue) and prints them. “Workers” signal to the “registrar” when an error occurs. The registrar is waiting for a conditional signal. To avoid false awakenings, waiting occurs in a loop where the Boolean condition is checked.
 #include <condition_variable> #include <iostream> #include <random> #include <thread> #include <mutex> #include <queue> std::mutex g_lockprint; std::mutex g_lockqueue; std::condition_variable g_queuecheck; std::queue<int> g_codes; bool g_done; bool g_notified; void workerFunc(int id, std::mt19937 &generator) { //   { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\trunning..." << std::endl; } //   std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); //   int errorcode = id*100+1; { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl; } //    { std::unique_lock<std::mutex> locker(g_lockqueue); g_codes.push(errorcode); g_notified = true; g_queuecheck.notify_one(); } } void loggerFunc() { //   { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\trunning..." << std::endl; } //   ,      while(!g_done) { std::unique_lock<std::mutex> locker(g_lockqueue); while(!g_notified) //    g_queuecheck.wait(locker); //     ,   while(!g_codes.empty()) { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\tprocessing error: " << g_codes.front() << std::endl; g_codes.pop(); } g_notified = false; } } int main() { //   -  std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count()); //   std::thread loggerThread(loggerFunc); //   std::vector<std::thread> threads; for(int i = 0; i < 5; ++i) threads.push_back(std::thread(workerFunc, i+1, std::ref(generator))); for(auto &t: threads) t.join(); //        g_done = true; loggerthread.join(); return 0; } 

Execution of this code will give approximately the following result (the result will be different each time, since the workflows work (or rather sleep) random time intervals):
 [logger] running... [worker 1] running... [worker 2] running... [worker 3] running... [worker 4] running... [worker 5] running... [worker 1] an error occurred: 101 [worker 2] an error occurred: 201 [logger] processing error: 101 [logger] processing error: 201 [worker 5] an error occurred: 501 [logger] processing error: 501 [worker 3] an error occurred: 301 [worker 4] an error occurred: 401 [logger] processing error: 301 [logger] processing error: 401 

The wait method, indicated above, has two overloads:

Thus, using the second overload, you can avoid using the g_notified boolean flag in the example above:
 void workerFunc(int id, std::mt19937 &generator) { //   { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\trunning..." << std::endl; } //   std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); //   int errorcode = id*100+1; { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl; } //    { std::unique_lock<std::mutex> locker(g_lockqueue); g_codes.push(errorcode); g_queuecheck.notify_one(); } } void loggerFunc() { //   { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\trunning..." << std::endl; } //   ,      while(!g_done) { std::unique_lock<std::mutex> locker(g_lockqueue); g_queuecheck.wait(locker, [&](){return !g_codes.empty();}); //     ,   while(!g_codes.empty()) { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\tprocessing error: " << g_codes.front() << std::endl; g_codes.pop(); } } } 

In addition to the overloaded wait() method, there are two more similar methods with the same overload for a predicate:

Overloading these methods without a predicate returns cv_status , indicating whether the timeout occurred, or whether the awakening was due to a conditional variable signal, or a false awakening.

Std also provides the notify_all_at_thread_exit function, which implements a mechanism for notifying other threads that the data stream has completed its work, including the destruction of all thread_local objects. Waiting for threads with a mechanism other than join can lead to incorrect behavior when thread_locals have already been used, and their destructors could be called after the thread has been awakened or after it has already ended (see N3070 and N2880 . As a rule, the call This function should occur immediately before the thread starts to exist. Below is an example of how notify_all_at_thread_exit can be used with conditional variables to synchronize two threads:
 std::mutex g_lockprint; std::mutex g_lock; std::condition_variable g_signal; bool g_done; void workerFunc(std::mt19937 &generator) { { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "worker running..." << std::endl; } std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "worker finished..." << std::endl; } std::unique_lock<std::mutex> lock(g_lock); g_done = true; std::notify_all_at_thread_exit(g_signal, std::move(lock)); } int main() { std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count()); std::cout << "main running..." << std::endl; std::thread worker(workerFunc, std::ref(generator)); worker.detach(); std::cout << "main crunching..." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "main waiting for worker..." << std::endl; } std::unique_lock<std::mutex> lock(g_lock); while(!g_done) //    g_signal.wait(lock); std::cout << "main finished..." << std::endl; return 0; } 

If the worker finishes his work before the main thread, then the result will be as follows:
 main running... worker running... main crunching... worker finished... main waiting for worker... main finished... 

If the main thread finishes its work before the worker, the result will be as follows:
 main running... worker running... main crunching... main waiting for worker... worker finished... main finished... 

')

As a conclusion


The C ++ 11 standard allows C ++ developers to write multi-threaded code in a standard, platform-independent way. This article is just a run through threads and synchronization mechanisms from std. The <thread> header provides a class with the same name (and many additional functions) representing threads. The <mutex> header provides the implementation of several mutexes and wrappers for synchronizing access to streams. The <condition_variable> header provides two implementations of condition variables that allow you to block one or more threads until you receive a notification from another thread or until a false wake up. For more information and understanding of the essence of the matter, of course, it is recommended to read additional literature :)

Source: https://habr.com/ru/post/182626/


All Articles