
Asynchronous tasks in C++11

Good day. I would like to share my small library with the community.

I program in C/C++ and, unfortunately, cannot use the C++11 standard in my work projects. But the May holidays came, I had some free time, and I decided to experiment with this forbidden fruit. The best way to learn is practice: reading articles about a programming language mainly teaches you how to read articles, so I decided to write a small library for asynchronous function execution.

Let me say right away that I know about std::future, std::async and so on. I was interested in implementing something very similar myself and diving headfirst into the world of lambda functions, threads and mutexes. Holidays are a great time for reinventing the wheel.





So, let's begin


I decided that my library would function as follows.

There is a pool with a fixed number of threads.

Tasks are added to it using lambda function syntax.

From the task itself you can retrieve the result of its execution, or simply wait until it finishes.

Looking ahead, it looks like this:



    ...
    act::control control(N_THREADS);

    auto some_task = act::make_task(
        [](std::vector<double>::const_iterator begin,
           std::vector<double>::const_iterator end) {
            double sum = 0;
            for (auto i = begin; i != end; ++i) {
                sum += (*i);
            }
            return sum;
        },
        data.begin(), data.end());

    control << some_task;
    cout << some_task->get() << endl;
    ...



The task class


First you need to create a class that describes the task:



    // Primary template: everything is deduced from the type of the functor's operator().
    template <typename T>
    class task : public task<decltype(&T::operator())> {
    };

    template <typename ClassType, typename ReturnType, typename ... Args>
    class task<ReturnType(ClassType::*)(Args...) const> {
    protected:
        const ClassType &m_func;      // reference to the functor, not a copy
        std::tuple<Args...> m_vars;   // stored call arguments
        ReturnType m_return;          // result of the call
    public:
        task(const ClassType &v, Args... args): m_func(v), m_vars(args ...) {}
        virtual ~task() {}
    private:
    };




As is well known, a lambda function is compiled into a functor class with an operator().

Our task class is a template; its type is deduced from the type of the functor's call operator, &T::operator().

The class stores a reference to the functor, the function arguments as a std::tuple, and the return value.
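The deduction trick described above can be tried in isolation. The following is a minimal sketch, not code from the library: the names callable_traits and traits_of are hypothetical and exist only for this illustration. It shows how a partial specialization on the type of &T::operator() exposes a lambda's return type and argument list.

```cpp
#include <cstddef>
#include <tuple>
#include <type_traits>

// Primary template: for a functor type T, look at the type of T::operator().
template <typename T>
struct callable_traits : callable_traits<decltype(&T::operator())> {};

// Specialization matching a pointer to a const member function,
// which is what the operator() of a non-mutable lambda looks like.
template <typename ClassType, typename ReturnType, typename... Args>
struct callable_traits<ReturnType (ClassType::*)(Args...) const> {
    typedef ReturnType return_type;
    typedef std::tuple<Args...> args_tuple;
    static constexpr std::size_t arity = sizeof...(Args);
};

// Hypothetical helper: lets the compiler deduce the lambda's type for us.
template <typename F>
callable_traits<F> traits_of(const F&) { return callable_traits<F>(); }
```

Note that the specialization only matches a const, non-template operator(), so a mutable or generic lambda would not be accepted by this sketch.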



So now we can store a lambda function together with its parameters in an object; next we need to learn how to call it.

To do this, we have to call operator() on m_func with the parameters stored in m_vars.

At first I did not know how to do this, but some intensive googling and a click on the second link brought the result:



    template<int ...>
    struct seq { };

    template<int N, int ...S>
    struct gens : gens<N-1, N-1, S...> { };

    template<int ...S>
    struct gens<0, S...> {
        typedef seq<S...> type;
    };




With this construct, you can add the following functions to a class:



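    ...
    public:
        void invoke()
        {
            // Store the result in the member so it can be retrieved later.
            m_return = caller(typename gens<sizeof...(Args)>::type());
        }
    private:
        template<int ...S>
        ReturnType caller(seq<S...>) const
        {
            // Expands to m_func(std::get<0>(m_vars), std::get<1>(m_vars), ...)
            return m_func(std::get<S>(m_vars) ...);
        }
    ...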
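To see the index trick work outside the class, here is a small self-contained sketch. It reuses the seq and gens templates from the article; apply_tuple and apply_impl are hypothetical helper names introduced only for this example.

```cpp
#include <tuple>

template <int...> struct seq {};
template <int N, int... S> struct gens : gens<N - 1, N - 1, S...> {};
template <int... S> struct gens<0, S...> { typedef seq<S...> type; };

// Expands to func(std::get<0>(args), std::get<1>(args), ...).
template <typename Func, typename... Args, int... S>
auto apply_impl(Func& func, const std::tuple<Args...>& args, seq<S...>)
    -> decltype(func(std::get<S>(args)...)) {
    return func(std::get<S>(args)...);
}

// Builds the index list 0 .. sizeof...(Args)-1 and forwards to apply_impl.
template <typename Func, typename... Args>
auto apply_tuple(Func func, const std::tuple<Args...>& args)
    -> decltype(apply_impl(func, args, typename gens<sizeof...(Args)>::type())) {
    return apply_impl(func, args, typename gens<sizeof...(Args)>::type());
}
```

For three arguments, gens<3> unrolls through gens<2, 2> and gens<1, 1, 2> to gens<0, 0, 1, 2>, whose nested type is seq<0, 1, 2>. Since C++14 the standard library offers std::index_sequence for the same purpose.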




Base class for tasks


Now let's implement the base class for tasks:



    class abstract_task {
    protected:
        mutable std::mutex m_mutex;
        mutable std::condition_variable m_cond_var;
        mutable bool m_complete;
    public:
        abstract_task(): m_complete(false) {}
        virtual ~abstract_task() {}
        virtual void invoke() = 0;
        virtual void wait() const
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            while (!m_complete) {
                m_cond_var.wait(lock);
            }
        }
    };


The class contains a mutex and a condition variable that signals the completion of the task. Accordingly, our task class needs some changes, which I omit here since the source code is available on GitHub.
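The omitted part presumably includes the code that sets m_complete and wakes the waiters. The following is only a sketch of how such signalling is commonly written, not the library's actual code: completion_flag, mark_complete and run_and_wait are hypothetical names.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-alone version of the "task finished" signal.
class completion_flag {
    mutable std::mutex m_mutex;
    mutable std::condition_variable m_cond_var;
    bool m_complete = false;
public:
    // Called by the thread that finished the work.
    void mark_complete() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_complete = true;
        }
        m_cond_var.notify_all();  // wake every thread blocked in wait()
    }
    // Blocks until mark_complete() has been called; the predicate
    // protects against spurious wakeups, like the while loop in wait().
    void wait() const {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cond_var.wait(lock, [this] { return m_complete; });
    }
};

// Runs a piece of work on another thread and blocks until it signals.
int run_and_wait() {
    completion_flag done;
    int result = 0;
    std::thread worker([&] {
        result = 42;           // the "work"
        done.mark_complete();  // publish completion under the mutex
    });
    done.wait();               // result is safe to read after this returns
    worker.join();
    return result;
}
```

Setting the flag while holding the mutex matters: without it a waiter could check the flag, miss the notification, and sleep forever.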



Creating tasks


Let's make a wrapper function for creating tasks:



    template <typename T, typename ... Args>
    std::shared_ptr<task<decltype(&T::operator())>> make_task(T func, Args ... args)
    {
        return std::shared_ptr<task<decltype(&T::operator())>>(
            new task<decltype(&T::operator())>(func, args ...));
    }


Since the class is polymorphic, it is natural to work with it through a pointer, and not a raw pointer but a smart one.



The control class


Now let's implement the entity that executes tasks in background threads.

I will show only part of the code:



    ...
    class control {
        std::deque<std::shared_ptr<abstract_task>> m_tasks;
        std::vector<std::thread> m_pool;
        std::mutex m_mutex;
        std::condition_variable m_cond_var;
        std::condition_variable m_empty_cond;
        std::atomic<bool> m_run;
        std::vector<bool> m_active;
    public:
        control(std::size_t pool_size = 2)
        {
            m_run.store(true, std::memory_order_relaxed);
            auto func = [this](int n) {
                while (m_run.load(std::memory_order_relaxed)) {
                    std::unique_lock<std::mutex> lock(m_mutex);
                    m_active[n] = true;
                    if (m_tasks.empty()) {
                        m_empty_cond.notify_all();
                        m_active[n] = false;
                        m_cond_var.wait(lock);
                    } else {
                        std::shared_ptr<abstract_task> t = m_tasks.front();
                        m_tasks.pop_front();
                        lock.unlock();
                        t->invoke();
                        lock.lock();
                        m_active[n] = false;
                    }
                }
            };
            pool_size = pool_size > 0 ? pool_size : 1;
            m_active.resize(pool_size, false);
            for (std::size_t i = 0; i < pool_size; ++i) {
                m_pool.emplace_back(func, i);
            }
        }
    ...




Out of curiosity, I used every feature of the new standard whose use I could at least somehow justify.

This class creates an array of threads and an array of activity flags that track whether each worker thread is currently executing a task.

The main loop of each worker thread is controlled by an atomic variable. It is tempting to think that declaring it volatile would be enough, since the main thread only writes to it and the workers only read it. But in C++11 volatile guarantees neither atomicity nor ordering between threads, and an unsynchronized write that is concurrent with reads of a non-atomic variable is formally a data race, so std::atomic is the correct choice here.
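The stop-flag pattern can be sketched on its own. This is an illustration under my own assumptions, not the library's shutdown code: spinning_worker and run_briefly are hypothetical names, and a real pool would also have to wake workers that are blocked on the condition variable.

```cpp
#include <atomic>
#include <thread>

// Hypothetical worker that loops until the owner clears the flag.
class spinning_worker {
    std::atomic<bool> m_run;
    std::atomic<long> m_iterations;
    std::thread m_thread;  // declared last so both flags exist before the thread starts
public:
    spinning_worker()
        : m_run(true), m_iterations(0),
          m_thread([this] {
              // The worker only reads the flag.
              while (m_run.load(std::memory_order_relaxed)) {
                  m_iterations.fetch_add(1, std::memory_order_relaxed);
                  std::this_thread::yield();
              }
          }) {}

    ~spinning_worker() {
        if (m_thread.joinable()) stop();
    }

    // The owner only writes the flag, then waits for the loop to exit.
    long stop() {
        m_run.store(false, std::memory_order_relaxed);
        m_thread.join();
        return m_iterations.load(std::memory_order_relaxed);
    }

    long iterations() const { return m_iterations.load(std::memory_order_relaxed); }
};

// Starts a worker, waits until it has made at least one pass, then stops it.
long run_briefly() {
    spinning_worker w;
    while (w.iterations() == 0) std::this_thread::yield();
    return w.stop();
}
```

Relaxed ordering is sufficient for the flag itself because no other data is published through it; join() provides the synchronization needed to read the final counter.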



Performance


I most likely would not have written this article if not for the performance test in which I compared this solution with std::async.

Configuration:

Intel Core (TM) i7-2600 CPU @ 3.40GHz

$ gcc --version

gcc (Debian 4.8.2-21) 4.8.2





The test sums arrays in parallel and then asynchronously adds up the results of all those sums. The expected result of the operation is:

res = sum(array) * N_P
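The benchmark source is not shown in the article, so the following is only a guess at what the std::async side might look like. It assumes that N_P is the number of tasks and that each task sums the whole array; sum_n_times_async is a hypothetical name.

```cpp
#include <cstddef>
#include <future>
#include <numeric>
#include <vector>

// Launches n_tasks asynchronous sums over the same array and adds the results,
// so the return value should equal sum(data) * n_tasks.
double sum_n_times_async(const std::vector<double>& data, std::size_t n_tasks) {
    std::vector<std::future<double>> parts;
    parts.reserve(n_tasks);
    for (std::size_t i = 0; i < n_tasks; ++i) {
        // std::launch::async forces each call to run on its own thread.
        parts.push_back(std::async(std::launch::async, [&data] {
            return std::accumulate(data.begin(), data.end(), 0.0);
        }));
    }
    double total = 0.0;
    for (auto& part : parts) {
        total += part.get();  // blocks until that task has finished
    }
    return total;
}
```

Checking the result against sum(array) * N_P is presumably what the OK marker in the output below refers to.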




Numbers are in milliseconds.



Test 1


Optimization is turned off, the number of elements in the array is 100000000, the number of generated tasks is 73, the number of threads in the pool is 6

Results:

test_act 16775 OK

test_async 16028 OK


Performance is comparable.



Test 2


Optimization is enabled, the number of elements in the array is 100000000, the number of generated tasks is 73, the number of threads in the pool is 6

Results:

test_act 1597.6 OK

test_async 2530.5 OK


My implementation is roughly one and a half times faster.



Test 3


Optimization is enabled, the number of elements in the array is 100000000, the number of generated tasks is 73, the number of threads in the pool is 7

Results:

test_act 1313.1 OK

test_async 2503.7 OK




Test 4


Optimization is enabled, the number of elements in the array is 100000000, the number of generated tasks is 73, the number of threads in the pool is 8

Results:

test_act 1402 OK

test_async 2492.2 OK




Test 5


Optimization is enabled, the number of elements in the array is 100000000, the number of generated tasks is 173, the number of threads in the pool is 8

Results:

test_act 4435.7 OK

test_async 5789.4 OK




Conclusions and bugs


These results are most likely explained by the fact that std::async spawns a thread for each task, whereas in my implementation the number of threads is fixed and there is no thread-creation overhead.

A known bug: capturing variables from the enclosing scope (via []) in the lambda function causes a SIGSEGV, although passing them as parameters works fine.
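One plausible cause, judging only by the snippets in this article, is that task keeps the functor as const ClassType &m_func while make_task receives it by value, so the reference dangles as soon as make_task returns. A captureless lambda never touches its own object, which would explain why it survives. The sketch below is an assumption about a possible fix, not the library's code: owning_task, make_owning_task and captured_sum are hypothetical names, and the functor is simply stored by value.

```cpp
#include <memory>
#include <utility>
#include <vector>

// Hypothetical task that owns a copy of the functor, including its captures.
template <typename F>
class owning_task {
    F m_func;  // stored by value, so it lives as long as the task
public:
    explicit owning_task(F func) : m_func(std::move(func)) {}

    auto invoke() const -> decltype(std::declval<const F&>()()) {
        return m_func();
    }
};

template <typename F>
std::shared_ptr<owning_task<F>> make_owning_task(F func) {
    return std::make_shared<owning_task<F>>(std::move(func));
}

// The lambda captures a vector by value; the task keeps that copy alive.
int captured_sum() {
    std::vector<int> data{1, 2, 3, 4};
    auto task = make_owning_task([data] {
        int sum = 0;
        for (int x : data) sum += x;
        return sum;
    });
    data.clear();  // the original can change; the task holds its own copy
    return task->invoke();
}
```

Capturing by reference would still require the captured objects to outlive the task, which is the caller's responsibility in any design.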



I don't know how useful this article and the library itself are, but at least I got to apply some features of the new standard in practice.


Source: https://habr.com/ru/post/222227/


