
Creating a synchronization barrier using C++11

Introduction

Comparing two parallel programming technologies, POSIX threads and C++11 threads, you will notice that the latter lacks an analogue of the pthread_barrier_t type from the pthread library.

It is rather strange that such an important synchronization primitive is missing from the standard library. This article discusses how to build a barrier using only the headers included in the C++11 standard.
Definition
A barrier is one of the synchronization primitives. It is created for a fixed number of threads. When the first thread finishes its work, it stops at the barrier and waits until the other threads arrive.
As soon as exactly as many threads as the barrier was created for have accumulated at it, all the threads waiting at the barrier continue their work.
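
For comparison, this is how the same idea looks with the pthread primitive mentioned in the introduction. A minimal sketch: it assumes a POSIX system where pthread barriers are available (e.g. Linux), and the worker function and the count of three threads are purely illustrative.

    // For comparison: the pthread barrier mentioned in the introduction.
    #include <iostream>
    #include <pthread.h>

    pthread_barrier_t gate;

    void* worker(void* arg) {
        long id = (long)arg;
        std::cout << "thread " << id << " before the barrier" << std::endl;
        pthread_barrier_wait(&gate);               // blocks until all 3 threads arrive
        std::cout << "thread " << id << " after the barrier" << std::endl;
        return nullptr;
    }

    int main() {
        pthread_barrier_init(&gate, nullptr, 3);   // a barrier for exactly 3 threads
        pthread_t t[3];
        for (long i = 0; i < 3; ++i)
            pthread_create(&t[i], nullptr, worker, (void*)i);
        for (int i = 0; i < 3; ++i)
            pthread_join(t[i], nullptr);
        pthread_barrier_destroy(&gate);
        return 0;
    }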

Let's start building our own barrier, with blackjack and ...

First of all, we need to include the following headers from the C++11 standard library:
    #include <thread>
    #include <atomic>
    #include <condition_variable>
    #include <mutex>


Now you will probably ask: why do we need all of these headers? Well, the first one is not needed for the barrier itself, but I doubt you will be able to test your code without it.

But first things first!

What is the most important field for the barrier? Obviously, the number of threads.
What else does the barrier need to know? The number of threads currently waiting at it.

One's hand immediately itches to write:
    class barrier {
        const unsigned int threadCount;
        unsigned int threadsWaiting;
    public:
        barrier(unsigned int n) : threadCount(n) {
            threadsWaiting = 0;
        }
    };


However, let's think for a moment. A barrier already slows the application down, since synchronization takes time, so we would like to keep the cost of creating and operating the barrier itself as low as possible.
Therefore, atomic operations are a better fit for changing the number of threads waiting at the barrier.
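
As a quick illustration of why std::atomic is attractive here, consider a minimal sketch (the thread and iteration counts are arbitrary): several threads increment one shared counter without any mutex, and the result is still exact.

    #include <atomic>
    #include <iostream>
    #include <thread>
    #include <vector>

    int main() {
        std::atomic<unsigned int> counter(0);
        std::vector<std::thread> pool;
        for (int i = 0; i < 4; ++i)
            pool.emplace_back([&counter] {
                for (int j = 0; j < 100000; ++j)
                    counter.fetch_add(1);          // atomic read-modify-write, no lock needed
            });
        for (auto& t : pool)
            t.join();
        // always prints 400000; with a plain unsigned int this would be a data race
        std::cout << counter.load() << std::endl;
        return 0;
    }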

So, our class now looks like this:

    class barrier {
        const unsigned int threadCount;
        std::atomic<unsigned int> threadsWaiting;
    public:
        barrier(unsigned int n) : threadCount(n) {
            threadsWaiting = 0;
        }
    };


So we have written the skeleton of the class. We can create an object of this class; there is a constructor, there is a copy constructor ...
Wait, what did I just say? When combining object-oriented and parallel programming, it is better to get rid of copy constructors in order to protect yourself from unpleasant consequences.

Fortunately, C++11 allows you to explicitly delete this constructor.

    class barrier {
        const unsigned int threadCount;
        std::atomic<unsigned int> threadsWaiting;
    public:
        barrier(unsigned int n) : threadCount(n) {
            threadsWaiting = 0;
        }
        barrier(const barrier &) = delete;
    };


So, that is settled. All that remains is to write the method this was all about: waiting at the barrier.

The following idea comes to mind: let's add a boolean flag that says whether threads should still wait at the barrier or may pass through it, and implement the behavior with a condition variable checking that flag.

So let's extend our class with the new fields:
    class barrier {
        const unsigned int threadCount;
        std::atomic<unsigned int> threadsWaiting;
        bool isNotWaiting;
        std::condition_variable waitVariable;
        std::mutex mutex;
    public:
        barrier(unsigned int n) : threadCount(n) {
            threadsWaiting = 0;
        }
        barrier(const barrier &) = delete;
    };


Now let's deal with the method itself. If not all the threads have arrived yet, the threads that have reached the barrier should sleep on the condition variable, i.e. the following code should be executed:

    std::unique_lock<std::mutex> lock(mutex);
    waitVariable.wait(lock, [&]{ return isNotWaiting; });


If all the threads have arrived, we must notify the waiting threads that there is no need to wait at the barrier any longer. The following code does exactly that:
    isNotWaiting = true;
    waitVariable.notify_all();
    threadsWaiting.store(0);


The last call atomically writes 0 to the threadsWaiting variable.

Now one simple question remains: how do we combine these two cases? How do we know how many threads are waiting at the barrier?

Recall how the barrier works. For threads to wait at the barrier, each of them must call the barrier's wait function. So, as soon as the wait method is called, we must immediately increase our threadsWaiting variable by 1.
For this we use fetch_add. It is one of the so-called RMW (read-modify-write) operations: it atomically reads the value of an atomic variable, adds the argument to it, writes the new value back and returns the old one.
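
To make the return value explicit, here is a tiny sketch (the values are arbitrary). It also shows why the check in the wait() method below compares the returned old value with threadCount - 1: the thread that sees threadCount - 1 is the last one to arrive.

    #include <atomic>
    #include <iostream>

    int main() {
        std::atomic<unsigned int> waiting(0);
        unsigned int old1 = waiting.fetch_add(1);  // returns 0, waiting becomes 1
        unsigned int old2 = waiting.fetch_add(1);  // returns 1, waiting becomes 2
        std::cout << old1 << " " << old2 << " " << waiting.load() << std::endl;  // prints: 0 1 2
        return 0;
    }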

Thus, the two cases described above are combined with a conditional statement, and our class now looks like this:

    class barrier {
        const unsigned int threadCount;
        std::atomic<unsigned int> threadsWaiting;
        bool isNotWaiting;
        std::condition_variable waitVariable;
        std::mutex mutex;
    public:
        barrier(unsigned int n) : threadCount(n) {
            threadsWaiting = 0;
        }
        barrier(const barrier &) = delete;
        void wait() {
            if (threadsWaiting.fetch_add(1) >= threadCount - 1) {
                isNotWaiting = true;
                waitVariable.notify_all();
                threadsWaiting.store(0);
            } else {
                std::unique_lock<std::mutex> lock(mutex);
                waitVariable.wait(lock, [&]{ return isNotWaiting; });
            }
        }
    };


Now it only remains to set the initial value of the isNotWaiting variable, which is obviously false.

    class barrier {
        const unsigned int threadCount;
        std::atomic<unsigned int> threadsWaiting;
        bool isNotWaiting;
        std::condition_variable waitVariable;
        std::mutex mutex;
    public:
        barrier(unsigned int n) : threadCount(n) {
            threadsWaiting = 0;
            isNotWaiting = false;
        }
        barrier(const barrier &) = delete;
        void wait() {
            if (threadsWaiting.fetch_add(1) >= threadCount - 1) {
                isNotWaiting = true;
                waitVariable.notify_all();
                threadsWaiting.store(0);
            } else {
                std::unique_lock<std::mutex> lock(mutex);
                waitVariable.wait(lock, [&]{ return isNotWaiting; });
            }
        }
    };


So, we have written a barrier class using only standard C++11, without any third-party libraries.

Now you may object: fine, you wrote some code, but where is the proof that it works?

So, the most important part: a demonstration of the barrier.

    #include <iostream>
    #include <thread>
    #include <atomic>
    #include <condition_variable>
    #include <mutex>
    #include <new>

    class barrier {
        const unsigned int threadCount;
        std::atomic<unsigned int> threadsWaiting;
        bool isNotWaiting;
        std::condition_variable waitVariable;
        std::mutex mutex;
    public:
        barrier(unsigned int n) : threadCount(n) {
            threadsWaiting = 0;
            isNotWaiting = false;
        }
        barrier(const barrier &) = delete;
        void wait() {
            if (threadsWaiting.fetch_add(1) >= threadCount - 1) {
                // the last thread has arrived: open the barrier
                isNotWaiting = true;
                waitVariable.notify_all();
                threadsWaiting.store(0);
            } else {
                // not everyone is here yet: sleep until the flag is raised
                std::unique_lock<std::mutex> lock(mutex);
                waitVariable.wait(lock, [&]{ return isNotWaiting; });
            }
        }
    };

    barrier *myBarrier;

    class Thread {
    private:
        std::thread* cppthread;

        static void threadFunction(Thread* arg) {
            arg->run();
        }
    public:
        Thread() : cppthread(nullptr) {}
        Thread(const Thread&) = delete;
        virtual ~Thread() { delete cppthread; }
        virtual void run() = 0;

        void start() {
            cppthread = new std::thread(Thread::threadFunction, this);
        }
        void wait() {
            cppthread->join();
        }
    };

    class BarrierDemo : public Thread {
        int id;
    public:
        BarrierDemo(int i) { id = i; }
        void run() {
            std::cout << "Thread " << id << " runs before barrier" << std::endl;
            myBarrier->wait();
            std::cout << "Thread " << id << " runs after barrier" << std::endl;
        }
    };

    int main() {
        int threads;
        std::cin >> threads;
        myBarrier = new barrier(threads);
        BarrierDemo* bardemos = static_cast<BarrierDemo*>(::operator new(sizeof(BarrierDemo) * threads));
        for (int i = 0; i < threads; i++) {
            new (&bardemos[i]) BarrierDemo(i);
            bardemos[i].start();
        }
        for (int i = 0; i < threads; i++) {
            bardemos[i].wait();
        }
        for (int i = 0; i < threads; i++) {
            bardemos[i].~BarrierDemo();   // destroy the placement-new'd objects before freeing the raw memory
        }
        ::operator delete(bardemos);
        delete myBarrier;
        return 0;
    }


You can copy the above code into any compiler with C++11 support to check that it works (a sample build command is shown below). This article ends here.
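
For example, with GCC on Linux the demo can be built roughly like this (the file name is an assumption; -pthread is required because std::thread is used):

    g++ -std=c++11 -pthread barrier_demo.cpp -o barrier_demo
    ./barrier_demo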

PS It is easy to guess from the above code that this is a “one-time” barrier: as soon as all threads pass through it, you cannot reuse the same instance of the class as a barrier.
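
For those who need a reusable barrier, here is a minimal sketch of one common approach (this is not the code from the article, just an illustration): keep the counter under the mutex and add a generation counter, so that threads from the previous round cannot be confused with threads from the next one.

    #include <condition_variable>
    #include <mutex>

    // A sketch of a reusable (cyclic) barrier; not part of the original article.
    class cyclic_barrier {
        const unsigned int threadCount;
        unsigned int threadsWaiting;
        unsigned int generation;            // incremented every time the barrier opens
        std::condition_variable waitVariable;
        std::mutex mutex;
    public:
        explicit cyclic_barrier(unsigned int n)
            : threadCount(n), threadsWaiting(0), generation(0) {}
        cyclic_barrier(const cyclic_barrier &) = delete;

        void wait() {
            std::unique_lock<std::mutex> lock(mutex);
            unsigned int myGeneration = generation;
            if (++threadsWaiting == threadCount) {
                // the last thread to arrive opens the barrier and starts a new round
                threadsWaiting = 0;
                ++generation;
                waitVariable.notify_all();
            } else {
                waitVariable.wait(lock, [&] { return generation != myGeneration; });
            }
        }
    };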

Source: https://habr.com/ru/post/246947/

