📜 ⬆️ ⬇️

Solve RAII problems with std :: thread: cancellation_token as an alternative to pthread_cancel and boost :: thread :: interrupt

The article deals with problems in std :: thread, simultaneously resolving an ancient dispute on the topic "what to use: pthread_cancel, a boolean flag or boost :: thread :: interrupt?"


Problem


The class std :: thread, which was added to C ++ 11, has one unpleasant feature - it does not correspond to the RAII idiom (Resource Acquisition Is Initialization) . Excerpt from standard :


30.3.1.3 thread destructor
~ thread ();
If joinable () then terminate () , otherwise no effects.

What we face such a destructor? The programmer must be very careful when it comes to the destruction of the object std::thread :


 void dangerous_thread() { std::thread t([] { do_something(); }); do_another_thing(); // may throw - can cause termination! t.join(); } 

If an exception is thrown from the do_another_thing function, the std::thread destructor will terminate the entire program by calling std::terminate . What can be done with this? Let's try to write a RAII wrapper around std::thread and see where this attempt takes us.


Add RAII to std :: thread


 class thread_wrapper { public: // Constructors ~thread_wrapper() { reset(); } void reset() { if (joinable()) { // ??? } } // Other methods private: std::thread _impl; }; 

thread_wrapper copies the std::thread interface and implements another additional function - reset . This function should translate the stream into a non-joinable state. The destructor calls this function, so after that _impl will _impl without calling std::terminate .


In order to translate the _impl into a non-joinable state, reset has two options: detach or join . The problem with detach is that the thread will continue to run, wreaking havoc and disrupting the RAII idiom. So our choice is join :


 thread_wrapper::reset() { if (joinable()) join(); } 

Serious problem


Unfortunately, such a thread_wrapper implementation is no better than the usual std::thread . Why? Let's consider the following usage example:


 void use_thread() { std::atomic<bool> alive{true}; thread_wrapper t([&alive] { while(alive) do_something(); }); do_another_thing(); alive = false; } 

If an exception is do_another_thing from do_another_thing , then crash will not occur. However, calling join from the thread_wrapper destructor will thread_wrapper forever , because alive will never thread_wrapper false and the thread will never end.


The thing is that the thread_wrapper object thread_wrapper no way to influence the function being executed in order to “ask” it to complete. The situation is further complicated by the fact that in the do_something function, the execution thread may well “fall asleep” on the condition variable or in the blocking call of the operating system.


Thus, to solve the problem with the std::thread destructor, a more serious problem needs to be solved:


How to interrupt the execution of a long-term function, especially if in this function the thread of execution can "fall asleep" on a conditional variable or in the blocking call of the OS?


A special case of this problem is the interruption of the entire execution flow. Let's look at three existing ways to interrupt the thread: pthread_cancel , boost::thread::interrupt and a boolean flag.


Existing solutions


pthread_cancel


Sends an interrupt request to the selected thread. The POSIX specification contains a special list of interrupted functions ( read , write , etc.). After calling pthread_cancel for any thread, these functions in this thread begin to throw an exception of a special type. This exception cannot be ignored - a catch-block that caught such an exception must throw it further, so this exception completely unwinds the thread stack and ends it. A thread can temporarily prohibit the interruption of its calls using the pthread_setcancelstate function (one possible use: to avoid exceptions from destructors, logging functions, etc.).


Pros:



Minuses:



Problems with std::condition_variable::wait occur because in C ++ 14 std::condition_variable::wait received the noexcept specification. If you enable interrupts with pthread_setcancelstate , then we lose the ability to interrupt waiting on conditional variables, and if interrupts are enabled, then we are not able to meet the noexcept specification, because we cannot "swallow" this special exception.


boost :: thread :: interrupt


The Boost.Thread library provides an optional thread termination mechanism, somewhat similar to pthread_cancel . In order to interrupt the thread of execution, just call the corresponding object boost::thread method interrupt . You can check the status of the current thread using the function boost::this_thread::interruption_point : in the interrupted thread, this function throws an exception of type boost::thread_interrupted . If the use of exceptions is prohibited by using BOOST_NO_EXCEPTIONS, then you can use boost::this_thread::interruption_requested to check the status. Boost.Thread also allows you to interrupt wait in boost::condition_variable::wait . To do this, use thread-local storage and an extra mutex inside the condition variable.


Pros:



Minuses:



Boolean flag


If you read questions about pthread_cancel ( 1 , 2 , 3 , 4 ) on StackOverflow, then one of the most popular answers is: "Use the boolean flag instead of pthread_cancel ".


The atomic variable alive in our example with exceptions is the boolean flag:


 void use_thread() { std::atomic<bool> alive{true}; thread_wrapper t([&alive] { while(alive) do_something(); }); do_another_thing(); // may throw alive = false; } 

Pros:



Minuses:



Cancellation token


What to do? Let's take the boolean flag as the basis and start solving the problems associated with it. Code duplication? Great - let's wrap the boolean flag in a separate class. Let's call it cancellation_token .


 class cancellation_token { public: explicit operator bool() const { return !_cancelled; } void cancel() { _cancelled = true; } private: std::atomic<bool> _cancelled; }; 

Now you can put the cancellation_token in our thread_wrapper :


 class thread_wrapper { public: // Constructors ~thread_wrapper() { reset(); } void reset() { if (joinable()) { _token.cancel(); _impl.join(); } } // Other methods private: std::thread _impl; cancellation_token _token; }; 

Great, now it only remains to pass the reference to the token to the function that is executed in a separate thread:


 template<class Function, class... Args> thread_wrapper(Function&& f, Args&&... args) { _impl = std::thread(f, args..., std::ref(_token)); } 

Since we write thread_wrapper for illustrative purposes, we can still not use std::forward and, at the same time, ignore the problems that will arise in the move-constructor and the swap function.


It is time to recall the example with use_thread and exceptions:


 void use_thread() { std::atomic<bool> alive{true}; thread_wrapper t([&alive] { while(alive) do_something(); }); do_another_thing(); alive = false; } 

In order to add support for the cancellation_token , we just need to add the correct argument to the lambda and remove alive :


 void use_thread() { thread_wrapper t([] (cancellation_token& token) { while(token) do_something(); }); do_another_thing(); } 

Wonderful! Even if an exception is do_another_thing from do_another_thing destructor thread_wrapper still call cancellation_token::cancel and the thread will complete its execution. In addition, by removing the code of the boolean flag in the cancellation_token , we significantly reduced the amount of code in our example.


Interrupt waiting


It is time to teach our tokens to interrupt blocking calls, for example, waiting on conditional variables. To abstract from specific interrupt mechanisms, we need the cancellation_handler interface:


 struct cancellation_handler { virtual void cancel() = 0; }; 

A handler for interrupting waiting on a condition variable looks like this:


 class cv_handler : public cancellation_handler { public: cv_handler(std::condition_variable& condition, std::unique_lock<mutex>& lock) : _condition(condition), _lock(lock) { } virtual void cancel() { unique_lock l(_lock.get_mutex()); _condition.notify_all(); } private: std::condition_variable& _condition; std::unique_lock<mutex>& _lock; }; 

Now it’s enough to put a pointer to cancellation_handler in our cancellation_token and call cancellation_handler::cancel from cancellation_token::cancel :


 class cancellation_token { std::mutex _mutex; std::atomic<bool> _cancelled; cancellation_handler* _handler; public: explicit operator bool() const { return !_cancelled; } void cancel() { std::unique_lock<mutex> l(_mutex); if (_handler) _handler->cancel(); _cancelled = true; } void set_handler(cancellation_handler* handler) { std::unique_lock<mutex> l(_mutex); _handler = handler; } }; 

An interrupted version of waiting on a condition variable looks like this:


 void cancellable_wait(std::condition_variable& cv, std::unique_lock<mutex>& l, cancellation_token& t) { cv_handler handler(cv, l); // implements cancel() t.set_handler(&handler); cv.wait(l); t.set_handler(nullptr); } 

Attention! The implementation is unsafe, both in terms of exceptions and thread safety. She is here only to illustrate how the cancellation_handler works. Links to the correct implementation can be found at the end of the article.


By implementing the corresponding cancellation_handler , you can teach the token to interrupt the blocking calls of the OS and the blocking functions from other libraries (if these functions have at least some mechanism to interrupt the wait).


Rethread library


The described tokens, handlers and threads are implemented as an open-source library: https://github.com/bo-on-software/rethread , with documentation (in English), tests and benchmarks .


Here is a list of the main differences between the above code and the one implemented in the library:



What is in the library:



Performance


The measurements were carried out on a laptop with an Intel Core i7-3630QM @ 2.4GHz processor.


Below are the results of benchmarks tokens from rethread .
The performance of the following operations was measured:



Ubuntu 16.04

CPU time, ns
Check token status1.7
Call interrupted function15.0
Creating a token21.3

Windows 10

CPU time, ns
Check token status2.8
Call interrupted function17.0
Creating a token33.0

Negative Overhead


Such low overhead costs for interruptibility create an interesting effect:
In some situations, the interrupted function is faster than the “normal” approach.
In the code without using tokens, blocking functions cannot be blocked forever - then it will not be possible to achieve "normal" application termination (perversions like exit(1); it cannot be considered the norm). In order to avoid perpetual blocking and regularly check the status, we need a timeout. For example, such:


 while (alive) { _condition.wait_for(lock, std::chrono::milliseconds(100)); // ... } 

First, such a code will wake up every 100 milliseconds only to check the flag (the timeout value can be increased, but it is limited from above by the "reasonable" application completion time).


Secondly, this code is not optimal even without such meaningless awakenings. The fact is that the call condition_variable::wait_for(...) less effective than condition_variable::wait(...) : at a minimum, it needs to get the current time, count the wake-up time, etc.


To prove this statement, two synthetic benchmarks were written in rethread_testing, in which two primitive implementations of a multi-threaded queue were compared: “regular” (with timeout) and interrupted (with tokens). Measured processor time spent waiting for a single object to appear in the queue.


CPU time, ns
Ubuntu 16.04 & g ++ 5.3.1 ("normal" queue)5913
Ubuntu 16.04 & g ++ 5.3.1 (interrupted queue)5824
Windows 10 & MSVS 2015 ("normal" queue)2467
Windows 10 & MSVS 2015 (interrupted queue)1729

So, on MSVS 2015, the interrupted version runs 1.4 faster than the "regular" version with timeouts. On Ubuntu 16.04, the difference is not so noticeable, but even there the interrupted version clearly outperforms the “normal” one.


Conclusion


This is not the only possible solution to this problem. The most tempting alternative is to put a token in the thread-local storage and throw an exception when it is interrupted. The behavior will be similar to boost::thread::interrupt , but without an additional mutex in each conditional variable and with significantly less overhead. The main disadvantage of this approach is the already mentioned violation of the philosophy of exceptions and the non-obviousness of breakpoints.


An important advantage of the approach with tokens is that it is possible not to interrupt entire streams, but to separate tasks, and if you use the cancellation_token_source implemented in the library, then you can have several tasks at the same time.


Almost all of my Wishlist in the library I implemented. In my opinion, there is not enough integration with blocking calls to the system, such as working with files or sockets. Write interruptible versions for read , write , connect , accept , etc. it will not be difficult, the main problems are the unwillingness to put tokens in standard iostreams and the lack of a generally accepted alternative.


')

Source: https://habr.com/ru/post/306332/


All Articles