
Don't be afraid to reinvent the wheel. Or yet another Grand Central Dispatch (GCD) in C++11

IMHO (or, as the Russian backronym jokes, "I Have an Opinion, Hard to Dispute")


From my point of view, the most useful thing a programmer can do to improve professionally is to reinvent wheels. Reinventing a wheel is a fascinating process; sometimes it is more absorbing than the task for which the wheel was started in the first place. When you write your own wheel (by which I mean your own implementation of something that already exists), you come to a much deeper understanding of the existing solutions and techniques.


Motivation


For more than three years my main working language has been Objective-C, and when I first started writing in it I was pleasantly surprised by its thoughtful high-level concurrency APIs: NSOperationQueue, and later GCD, which to my mind is the quintessence of conciseness and clarity for thread concurrency. Then came the recent articles on Habr: one on writing an analog of C#'s await/async in C++, and one on thread concurrency in C++11. They made me take a look at the new goodies C++ provides for multithreading. And most of them (std::future, for one) look to me like this:


Speculation and Wishlist


Here is the typical scenario in which I use multithreading in my applications: get data from a file in the background, parse it in the background, then update the UI with the new data on the main thread.

It is convenient for each of these operations to have its own queue.
And it is even more convenient when all of this is gathered in one place instead of being scattered across five source files. Something like:
```cpp
file_io_queue.async([=]{
    file_data = get_data_from_file(file_name);
    parser_queue.async([=]{
        parsed_data = parse(file_data);
        main_queue.async([=]{
            update_ui_with_new_data(parsed_data);
        });
    });
});
```
This code reads as absolutely linear, synchronous code. It describes the logic of how the data changes will happen. By and large, it does not matter to me which thread reads the file and which one parses it; what matters is the order of these operations. And I can call this code 100500 times for 100500 files.

The obvious solution is to implement a thread-pool template. But almost all the implementations I have seen on the internet use one std::thread per queue. From my point of view that is not great. For one thing, you have to keep the queue instance alive for as long as its asynchronous operations are running — so when should the queue be destroyed? For another, creating a std::thread is an order of magnitude more expensive than locking and unlocking a mutex. And keeping a large number of threads idle while the queues are unused is not great either.
We will do it differently. We will have N threads (std::thread) and a list of lightweight queues with priorities. When a task is added to a queue, we notify a thread that a new task has appeared. The thread takes a task from the highest-priority queue and executes it. If that queue is already executing a task, it takes one from a lower-priority queue. If there are none, it waits.


Code


Let's start:
Queue
```cpp
namespace dispatch {
    typedef std::function<void ()> function;

    struct queue {
        typedef long priority; // the larger the value, the higher the priority
        const queue::priority queue_priority;
        static std::shared_ptr<queue> main_queue(); // the queue bound to the main thread
        virtual void async(function) const;         // add a task to the queue
        queue(queue::priority priority) : queue_priority(priority) {};
    };
}
```

The implementation of the async method simply forwards the call to the thread pool:
```cpp
void queue::async(dispatch::function task) const {
    thread_pool::shared_pool()->push_task_with_priority(task, this->queue_priority);
};
```

All the work will be done in our thread pool:
```cpp
struct queue_impl {
    const queue::priority priority;
    std::queue<function> tasks;
    bool is_running;
    queue_impl(queue::priority priority): priority(priority), is_running(false) {};
};

struct thread_pool {
    thread_pool();
    static std::shared_ptr<thread_pool>& shared_pool(); // the singleton thread_pool
    virtual ~thread_pool();
    bool stop;
    typedef std::shared_ptr<queue_impl> queue_ptr;
    void push_task_with_priority(const function&, queue::priority); // add a task with the given priority
    bool get_free_queue(queue_ptr*) const;       // find a non-busy queue with the highest priority
    void start_task_in_queue(const queue_ptr&);  // mark the queue as running
    void stop_task_in_queue(const queue_ptr&);   // mark the queue as idle
    std::mutex mutex;                            // guards the queue list
    std::map<queue::priority, queue_ptr> queues; // the queues, keyed by priority
    std::mutex main_thread_mutex;
    std::queue<dispatch::function> main_queue;
    std::condition_variable condition;
    std::vector<std::thread> threads; // the worker threads that execute the tasks
    dispatch::function main_loop_need_update;
    void add_worker(); // spawn another worker thread
};
```

Let's look at the methods in order. First we need to find a free queue with the maximum priority:
```cpp
bool thread_pool::get_free_queue(queue_ptr* out_queue) const {
    // iterate in reverse: std::map is sorted by key, so the highest priority comes first
    auto found = std::find_if(queues.rbegin(), queues.rend(),
                              [](const std::pair<queue::priority, queue_ptr>& iter){
                                  return !iter.second->is_running; // the queue is not busy
                              });
    bool is_free_queue_exist = (found != queues.rend());
    if (is_free_queue_exist)
        *out_queue = found->second;
    return is_free_queue_exist;
}
```

Adding a task to a queue:
```cpp
void thread_pool::push_task_with_priority(const function& task, queue::priority priority) {
    {
        std::unique_lock<std::mutex> lock(mutex); // lock the queue list
        // find the queue with this priority; if there is none, create it
        auto queue = queues[priority];
        if (!queue) {
            queue = std::make_shared<dispatch::queue_impl>(priority);
            queues[priority] = queue;
        }
        queue->tasks.push(task);
        // spawn workers while there are fewer than required
        unsigned max_number_of_threads = std::max<unsigned>(std::thread::hardware_concurrency(), 2);
        unsigned number_of_threads_required = round(log(queues.size()) + 1);
        while (threads.size() < std::min<unsigned>(max_number_of_threads, number_of_threads_required)) {
            add_worker();
        }
    }
    condition.notify_one(); // wake a worker to pick up the new task
}
```

Marking a task as completed:
```cpp
void thread_pool::stop_task_in_queue(const queue_ptr& queue) {
    {
        std::unique_lock<std::mutex> lock(mutex);
        // mark the queue as idle; if it has no tasks left, remove it
        queue->is_running = false;
        if (queue->tasks.size() == 0) {
            queues.erase(queues.find(queue->priority));
        }
    }
    condition.notify_one(); // wake a worker waiting for a free queue
}
```

And, finally, the worker thread itself:
```cpp
void thread_pool::add_worker() {
    threads.push_back(std::thread([=]{
        dispatch::function task;
        thread_pool::queue_ptr queue;
        while (true) {
            {
                std::unique_lock<std::mutex> lock(mutex); // lock the queue list
                while (!stop && !get_free_queue(&queue))  // while there is no work
                    condition.wait(lock);                 // sleep
                if (stop) // the pool is shutting down, exit the thread
                    return;
                task = queue->tasks.front(); // take the next task
                queue->tasks.pop();
                start_task_in_queue(queue);  // mark the queue as running
            }
            task(); // execute the task outside the lock
            stop_task_in_queue(queue); // mark the queue as idle again
        }
    }));
}
```


Main Thread and Run Loop




C++ has no such concept as a main thread, but almost all UI applications are built around it: the UI may only be modified from the main thread. So we need to either run our own run loop or hook into an existing one.

To begin, create a separate queue for the “main thread”:
Main queue
```cpp
struct main_queue : queue {
    virtual void async(dispatch::function task) const override;
    main_queue(): queue(0) {};
};

std::shared_ptr<queue> queue::main_queue() {
    return std::static_pointer_cast<dispatch::queue>(std::make_shared<dispatch::main_queue>());
}
```

And in its async method we add tasks to a separate queue:
```cpp
void main_queue::async(dispatch::function task) const {
    auto pool = thread_pool::shared_pool();
    std::unique_lock<std::mutex> lock(pool->main_thread_mutex);
    pool->main_queue.push(task);
    if (pool->main_loop_need_update != nullptr)
        pool->main_loop_need_update();
}
```

Finally, we need a function that will be called from the main thread:
Code
```cpp
void process_main_loop() {
    auto pool = thread_pool::shared_pool();
    std::unique_lock<std::mutex> lock(pool->main_thread_mutex);
    while (!pool->main_queue.empty()) {
        auto task = pool->main_queue.front();
        pool->main_queue.pop();
        task();
    }
}
```



Now only two questions remain: “How?” and “Why?”


First, “Why?”: C++ is often used to write cross-platform software, and for the sake of portability many conveniences have to be given up. GCD is a very convenient library, providing a simple, intuitive way to manage asynchronous queues.
There is no single answer to “How?”. The run loop can be hooked into in different ways, and many systems provide APIs for exactly that. For example, iOS has “performSelectorOnMainThread:”. We just need to set a callback via dispatch::set_main_loop_process_callback:
```objc
- (void)dispatchMainThread {
    dispatch::process_main_loop();
}

- (BOOL)application:(UIApplication *)application didFinishLaunchingWithOptions:(NSDictionary *)launchOptions {
    dispatch::set_main_loop_process_callback([=]{
        [self performSelectorOnMainThread:@selector(dispatchMainThread)
                               withObject:nil
                            waitUntilDone:NO];
    });
    return YES;
}
```

And if we run the run loop ourselves, we can do something like this:
```cpp
void main_loop(dispatch::function main_loop_function);
void main_loop(dispatch::function main_loop_function) {
    auto main_queue = queue::main_queue();
    while (!thread_pool::shared_pool()->stop) {
        main_queue->async(main_loop_function);
        process_main_loop();
    }
}
```



And now, the thing it was all started for:


Create six queues and push six tasks into each:
```cpp
auto main_thread_id = std::this_thread::get_id();
for (unsigned task = 0; task < 6; ++task)
    for (unsigned priority = 0; priority < 6; ++priority) {
        dispatch::queue(priority).async([=]{
            assert(std::this_thread::get_id() != main_thread_id);
            std::string task_string = std::to_string(task);
            std::string placeholder(1 + priority * 5, ' ');
            dispatch::queue::main_queue()->async([=]{
                assert(std::this_thread::get_id() == main_thread_id);
                std::cout << placeholder << task_string << std::endl;
            });
        });
    }
```

We get roughly the following picture:
 0 1 0 0 2 1 1 3 2 2 4 3 3 5 4 4 0 5 5 1 0 0 2 1 1 3 2 2 4 3 3 5 4 4 5 5 

Each “column” is a queue: the further to the right, the higher the queue's priority. Each row is a callback into the “main thread”.

And here is the code for iOS:
```cpp
for (int i = 0; i < 20; ++i) {
    dispatch::queue(dispatch::QUEUE_PRIORITY::DEFAULT).async([=]{
        NSAssert(![NSThread isMainThread], nil);
        std::string first_string = std::to_string(i);
        dispatch::queue::main_queue()->async([=]{
            NSAssert([NSThread isMainThread], nil);
            std::string second_string = std::to_string(i + 1);
            std::cout << first_string << " -> " << second_string << std::endl;
            [self.tableView reloadData]; // some UI work, which must happen on the main thread
        });
    });
}
```


Conclusion


There will be no conclusion. This wheel was reinvented solely to get a feel for multithreading in C++11. The code — a little over 200 lines of not-so-great C++ — is available on GitHub. It was tested with clang++ 3.3, g++ 4.7/4.8, and the Visual Studio 2012 compiler; that is, the major compilers already support C++11 well enough.

P.S. While I encourage you to write your own wheels, I am not encouraging you to use them in production projects. Although, on the other hand, how else would a wheel of your own ever turn into something serious?

And finally, a couple more reinvented wheels that I could not find a place for in the article.


Source: https://habr.com/ru/post/186200/

