
Thread-safe signals that are really convenient to use.

There are many libraries that implement signals in C++. Unfortunately, every implementation I have encountered has several problems that make it hard to write simple multithreaded code with it. Here I will talk about these problems and how they can be solved.

What are signals?


I think many readers are already familiar with this concept, but I will describe it just in case.

A signal is a way to send a notification of an arbitrary event to recipients that can be registered independently of each other. A callback with multiple recipients, if you like. Or, for those who have worked with .NET, a multicast delegate.

A couple of examples with boost::signals2.
Declaring a signal:

    struct Button
    {
        boost::signals2::signal<void()> OnClick;
    };

Connecting to the signal and disconnecting from it:

    void ClickHandler()
    { cout << "Button clicked" << endl; }

    // ...
    boost::signals2::connection c = button->OnClick.connect(&ClickHandler);
    // ...
    c.disconnect();

Invoking the signal:

    struct Button
    {
        boost::signals2::signal<void()> OnClick;

    private:
        void MouseDownHandler()
        { OnClick(); }
    };


Now about the problems


In single-threaded code everything looks great and works well, but what about multithreaded code?

Here, unfortunately, there are three problems common to the different implementations:

  1. There is no way to atomically connect to a signal and get the current state.
  2. Disconnecting from a signal is non-blocking.
  3. Disconnecting an asynchronous handler does not cancel calls that are already in the queue of its thread.

Let's consider each of them in detail. To do this, we will write part of the firmware of an imaginary media set-top box, namely three classes:

  • StorageManager, which keeps track of the storage devices inserted into the box
  • MediaScanner, which looks for media files on those storage devices
  • MediaUiModel, which displays the found media files in the UI

I'll say up front that the code you see here is extremely simplified and contains nothing superfluous, so that we can concentrate on these problems. You will also see type names like TypePtr. This is just std::shared_ptr<Type>, don't be alarmed.

There is no way to atomically connect to a signal and get a connected state.


So, StorageManager. It needs a getter for the storage devices that are already inserted into the box, and a signal that notifies about new ones.

    class StorageManager
    {
    public:
        std::vector<StoragePtr> GetStorages() const;
        boost::signals2::signal<void(const StoragePtr&)> OnStorageAdded;
        // ...
    };

Alas, such an interface cannot be used without running into a race condition.

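It does not work in this order...

    storageManager->OnStorageAdded.connect(&StorageHandler);
    // If a storage is added here, the handler is called for it twice:
    // once by the signal and once by the loop below.
    for (auto&& storage : storageManager->GetStorages())
        StorageHandler(storage);

... and it does not work in this order either.

    for (auto&& storage : storageManager->GetStorages())
        StorageHandler(storage);
    // If a storage is added here, the handler never learns about it.
    storageManager->OnStorageAdded.connect(&StorageHandler);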

Common solution


Obviously, since we got the race condition, we need a mutex.

    class StorageManager
    {
        mutable std::recursive_mutex _mutex;
        std::vector<StoragePtr> _storages;

    public:
        StorageManager()
        { /* ... */ }

        boost::signals2::signal<void(const StoragePtr&)> OnStorageAdded;

        std::recursive_mutex& GetMutex() const
        { return _mutex; }

        std::vector<StoragePtr> GetStorages() const
        {
            std::lock_guard<std::recursive_mutex> l(_mutex);
            return _storages;
        }

    private:
        void ReportNewStorage(const StoragePtr& storage)
        {
            std::lock_guard<std::recursive_mutex> l(_mutex);
            _storages.push_back(storage);
            OnStorageAdded(storage);
        }
    };

    // ...

    {
        std::lock_guard<std::recursive_mutex> l(storageManager->GetMutex());
        storageManager->OnStorageAdded.connect(&StorageHandler);
        for (auto&& storage : storageManager->GetStorages())
            StorageHandler(storage);
    }

This code will work, but it has several flaws:


How can we do better?


Let's move everything we do around the connect call (locking the mutex and traversing the collection) inside it.

It is important to understand that the algorithm for obtaining the current state depends on the nature of that state. If it is a collection, the handler must be called for each element; if it is, for example, an enum, the handler must be called exactly once. Accordingly, we need some kind of abstraction.

Let's add a populator to the signal: a function that accepts the handler being connected. The signal owner (StorageManager, in our case) then decides how the current state is sent to this handler.

    template < typename Signature >
    class signal
    {
        using populator_type = std::function<void(const std::function<Signature>&)>;

        mutable std::mutex _mutex;
        std::list<std::function<Signature> > _handlers;
        populator_type _populator;

    public:
        signal(populator_type populator)
            : _populator(std::move(populator))
        { }

        std::mutex& get_mutex() const
        { return _mutex; }

        signal_connection connect(std::function<Signature> handler)
        {
            std::lock_guard<std::mutex> l(_mutex);
            _populator(handler); // Send the current state to the new handler
            _handlers.push_back(std::move(handler));
            return signal_connection([&]() { /* remove the handler from _handlers */ } );
        }

        // ...
    };

For now, the signal_connection class accepts a lambda that removes the handler from the list in the signal. I will show slightly updated code later.

Rewrite StorageManager using this new concept:

    class StorageManager
    {
        std::vector<StoragePtr> _storages;

    public:
        StorageManager()
            : OnStorageAdded([&](const std::function<void(const StoragePtr&)>& h) { for (auto&& s : _storages) h(s); })
        { /* ... */ }

        signal<void(const StoragePtr&)> OnStorageAdded;

    private:
        void ReportNewStorage(const StoragePtr& storage)
        {
            // Lock the mutex here rather than inside the signal call,
            // because it also protects the _storages collection
            std::lock_guard<std::mutex> l(OnStorageAdded.get_mutex());
            _storages.push_back(storage);
            OnStorageAdded(storage);
        }
    };

If you are using C++14, the populator can be quite short:

    StorageManager()
        : OnStorageAdded([&](auto&& h) { for (auto&& s : _storages) h(s); })
    { }

Note that when the populator is called, the mutex is already locked by the signal::connect method, so you do not need to lock it in the body of the populator itself.

The client code becomes quite short:

 storageManager->OnStorageAdded.connect(&StorageHandler); 

In a single line we both connect to the signal and get the current state of the object. Great!
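To make the idea concrete, here is a minimal sketch of a signal with a populator that uses only the standard library. The names populated_signal, storage_registry and demo_populator are made up for this illustration and do not come from any library; disconnecting is left out on purpose, and strings stand in for StoragePtr.

```cpp
#include <cassert>
#include <functional>
#include <list>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical minimal signal with a populator (illustration only).
template <typename Signature>
class populated_signal {
public:
    using handler_type = std::function<Signature>;
    using populator_type = std::function<void(const handler_type&)>;

    explicit populated_signal(populator_type populator)
        : _populator(std::move(populator)) {}

    std::mutex& get_mutex() const { return _mutex; }

    // Replays the current state to the new handler and registers it
    // under one lock, so no event is missed or delivered twice.
    void connect(handler_type handler) {
        assert(handler != nullptr);
        std::lock_guard<std::mutex> l(_mutex);
        _populator(handler);
        _handlers.push_back(std::move(handler));
    }

    // The caller is expected to hold get_mutex() while invoking the signal.
    template <typename... Args>
    void operator()(const Args&... args) const {
        for (const auto& h : _handlers)
            h(args...);
    }

private:
    mutable std::mutex _mutex;
    std::list<handler_type> _handlers;
    populator_type _populator;
};

// Toy signal owner: its state is a list of storage names.
class storage_registry {
    std::vector<std::string> _storages;

public:
    populated_signal<void(const std::string&)> on_storage_added;

    storage_registry()
        : on_storage_added([this](const std::function<void(const std::string&)>& h) {
              for (const auto& s : _storages) h(s);
          }) {}

    void add(const std::string& name) {
        // The signal's mutex also protects _storages.
        std::lock_guard<std::mutex> l(on_storage_added.get_mutex());
        _storages.push_back(name);
        on_storage_added(name);
    }
};

// Connects after one storage already exists and before another one is added.
inline std::vector<std::string> demo_populator() {
    storage_registry registry;
    std::vector<std::string> seen;
    registry.add("usb0");
    registry.on_storage_added.connect([&seen](const std::string& s) { seen.push_back(s); });
    registry.add("dvd0");
    return seen;
}
```

In this sketch demo_populator() returns usb0 followed by dvd0: the first entry is delivered by the populator during connect, the second by the signal itself.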

Non-blocking disconnect from a signal


Now it's time to write MediaScanner. In the constructor we connect to the StorageManager::OnStorageAdded signal, and in the destructor we disconnect.

    class MediaScanner
    {
    private:
        boost::signals2::connection _connection;

    public:
        MediaScanner(const StorageManagerPtr& storageManager)
        { _connection = storageManager->OnStorageAdded.connect([&](const StoragePtr& s) { this->StorageHandler(s); }); }

        ~MediaScanner()
        {
            _connection.disconnect();
            // The handler may still be running on the thread that invoked the signal.
            // In that case it goes on working with a destroyed MediaScanner object.
        }

    private:
        void StorageHandler(const StoragePtr& storage)
        { /* do something */ }
    };

Alas, this code will crash from time to time. The reason is how the disconnect method works in all the implementations I know. It guarantees that the next time the signal is invoked, the corresponding handler will not be called. But if the handler is executing on another thread at that moment, it is not interrupted, and it continues to work with the destroyed MediaScanner object.

Qt Solution


In Qt, each object belongs to a specific thread, and its handlers are called exclusively on that thread. To disconnect from a signal safely, call the QObject::deleteLater method, which guarantees that the actual deletion happens on the right thread and that no handler is called after the deletion.

    mediaScanner->deleteLater();

This is a good option if you are ready to integrate fully with Qt (that is, to give up std::thread at the core of your program in favor of QObject, QThread, and the rest).

Solution in boost::signals2


To solve this problem, boost suggests using the track / track_foreign methods of the slot (that is, the handler). These methods accept a weak_ptr to an arbitrary object, and the connection between the handler and the signal exists as long as every object the slot "tracks" is alive.

It works quite simply: each slot holds a collection of weak_ptrs to the tracked objects, which get "locked" (sorry) for the duration of the handler's execution. Thus, these objects are guaranteed not to be destroyed while the handler code has access to them. If any of the objects has already been destroyed, the connection is broken.
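The locking of tracked objects described above can be pictured with a small sketch that uses only the standard library. It is not how boost::signals2 is implemented internally; tracking_slot and demo_tracking are hypothetical names, and the sketch ignores the thread safety of the slot itself.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <vector>

// Hypothetical stand-in for a slot that tracks object lifetimes.
class tracking_slot {
    std::function<void()> _handler;
    std::vector<std::weak_ptr<void>> _tracked;

public:
    explicit tracking_slot(std::function<void()> handler)
        : _handler(std::move(handler)) {}

    void track(const std::shared_ptr<void>& object) { _tracked.push_back(object); }

    // Returns false if a tracked object is gone, meaning the slot
    // should be treated as disconnected.
    bool invoke() const {
        std::vector<std::shared_ptr<void>> locked;
        for (const auto& weak : _tracked) {
            std::shared_ptr<void> strong = weak.lock();
            if (!strong)
                return false;
            locked.push_back(std::move(strong));
        }
        _handler(); // tracked objects stay alive while `locked` is in scope
        return true;
    }
};

// Invokes the slot once while the tracked object is alive and once after it is gone.
inline bool demo_tracking() {
    int calls = 0;
    auto object = std::make_shared<int>(42);
    tracking_slot slot([&calls] { ++calls; });
    slot.track(object);

    bool first = slot.invoke();   // object alive: the handler runs
    object.reset();
    bool second = slot.invoke();  // object destroyed: the handler is skipped
    return first && !second && calls == 1;
}
```

The price of this scheme is visible even here: every invocation has to lock each tracked weak_ptr before the handler can run.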

The problem is that for this we need a weak_ptr to the object being subscribed. In my opinion, the most reasonable way to get one is to add a factory method to the MediaScanner class and subscribe the newly created object there to all the signals it is interested in:

    class MediaScanner
    {
    public:
        static std::shared_ptr<MediaScanner> Create(const StorageManagerPtr& storageManager)
        {
            std::lock_guard<std::recursive_mutex> l(storageManager->GetMutex());
            MediaScannerPtr result(new MediaScanner);
            boost::signals2::signal<void(const StoragePtr&)>::slot_type
                slot(bind(&MediaScanner::StorageHandler, result.get(), _1));
            slot.track_foreign(result);
            storageManager->OnStorageAdded.connect(slot);
            for (auto&& storage : storageManager->GetStorages())
                result->StorageHandler(storage);
            return result;
        }

    private:
        MediaScanner() // private constructor!
        { /* no subscriptions here, they are all set up in Create */ }

        void StorageHandler(const StoragePtr& storage)
        { /* do something */ }
    };

So, the disadvantages:


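How can we do better?


Let's make the disconnect method blocking, so that once it returns, it is guaranteed that everything the signal handler has access to can be destroyed. Something like the std::thread::join method.

Looking ahead, I will say that we need three classes for this:

  • life_token, which is stored in signal_connection; it can be marked as dead and can wait for a running handler to finish
  • life_token::checker, which is stored in the signal next to the handler and refers to its life_token
  • life_token::checker::execution_guard, which is created for the duration of the handler's execution and locks the corresponding life_token

The code of the signal_connection class:

    class signal_connection
    {
        life_token _token;
        std::function<void()> _eraseHandlerFunc;

    public:
        signal_connection(life_token token, std::function<void()> eraseHandlerFunc)
            : _token(token), _eraseHandlerFunc(eraseHandlerFunc)
        { }

        ~signal_connection()
        { disconnect(); }

        void disconnect()
        {
            if (_token.released())
                return;
            _token.release(); // Wait here if the handler is running right now (see below)
            _eraseHandlerFunc(); // Call the lambda that removes the handler from the list
        }
    };

I should say here that I am a proponent of a RAII-style connection object. I will not dwell on this in detail; I will only say that it does not matter in this context.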

The signal class will also change a bit here:

    template < typename Signature >
    class signal
    {
        using populator_type = std::function<void(const std::function<Signature>&)>;

        struct handler
        {
            std::function<Signature> handler_func;
            life_token::checker life_checker;
        };

        mutable std::mutex _mutex;
        std::list<handler> _handlers;
        populator_type _populator;

    public:
        // ...

        signal_connection connect(std::function<Signature> handler)
        {
            std::lock_guard<std::mutex> l(_mutex);
            life_token token;
            _populator(handler);
            _handlers.push_back({std::move(handler), life_token::checker(token)});
            return signal_connection(token, [&]() { /* remove the handler from _handlers */ } );
        }

        template < typename... Args >
        void operator() (Args&&... args) const
        {
            for (auto&& handler : _handlers)
            {
                life_token::checker::execution_guard g(handler.life_checker);
                if (g.is_alive())
                    handler.handler_func(std::forward<Args>(args)...);
            }
        }
    };

Now, next to each handler, we have a life_token::checker object that refers to the life_token stored in signal_connection. We lock it for the duration of the handler's execution using a life_token::checker::execution_guard object.

The implementation of these objects follows. If you are tired, you can skip it.
Inside life_token we need the following things:

  • Some operating system primitive for waiting in life_token::release (here, for simplicity, a mutex)
  • An "alive / dead" flag
  • A counter of locks taken via execution_guard (omitted here for simplicity)

    class life_token
    {
        struct impl
        {
            std::mutex mutex;
            bool alive = true;
        };

        std::shared_ptr<impl> _impl;

    public:
        life_token() : _impl(std::make_shared<impl>()) { }
        ~life_token() { release(); }

        bool released() const { return !_impl; }

        void release()
        {
            if (released())
                return;
            {
                // Blocks while an execution_guard holds the mutex
                std::lock_guard<std::mutex> l(_impl->mutex);
                _impl->alive = false;
            }
            _impl.reset(); // Drop our reference only after the mutex is unlocked
        }

        class checker
        {
            std::shared_ptr<impl> _impl;

        public:
            checker(const life_token& t) : _impl(t._impl) { }

            class execution_guard
            {
                std::shared_ptr<impl> _impl;

            public:
                execution_guard(const checker& c) : _impl(c._impl) { _impl->mutex.lock(); }
                ~execution_guard() { _impl->mutex.unlock(); }

                bool is_alive() const { return _impl->alive; }
            };
        };
    };

The mutex is held for the lifetime of execution_guard. Accordingly, if the life_token::release method is called on another thread at this time, it blocks trying to lock the same mutex and waits until the signal handler finishes. After that, it resets the alive flag, and all subsequent invocations of the signal no longer call the handler.
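The blocking behaviour described above can be checked with a small self-contained sketch. It is a variation on the article's classes rather than a copy of them: life_token_sketch is a hypothetical move-only token (an assumption made here so that copies cannot release it early), and demo_blocking_release is a made-up test scenario.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <memory>
#include <mutex>
#include <thread>

// Hypothetical move-only variant of life_token (illustration only).
class life_token_sketch {
    struct impl {
        std::mutex mutex;
        bool alive = true;
    };
    std::shared_ptr<impl> _impl;

public:
    life_token_sketch() : _impl(std::make_shared<impl>()) {}
    life_token_sketch(life_token_sketch&&) = default; // moved-from token holds nothing
    ~life_token_sketch() { release(); }

    // Marks the token dead; blocks while a handler holds an execution_guard.
    void release() {
        if (!_impl)
            return;
        {
            std::lock_guard<std::mutex> l(_impl->mutex);
            _impl->alive = false;
        }
        _impl.reset();
    }

    class checker {
        std::shared_ptr<impl> _impl;

    public:
        explicit checker(const life_token_sketch& t) : _impl(t._impl) {}

        class execution_guard {
            std::shared_ptr<impl> _impl;
            std::unique_lock<std::mutex> _lock; // unlocked before _impl is dropped

        public:
            explicit execution_guard(const checker& c)
                : _impl(c._impl), _lock(_impl->mutex) {}
            bool is_alive() const { return _impl->alive; }
        };
    };
};

// Returns true if release() came back only after the running handler had finished
// and later invocations see the token as dead.
inline bool demo_blocking_release() {
    life_token_sketch token;
    life_token_sketch::checker checker(token);
    std::atomic<bool> started{false};
    std::atomic<bool> finished{false};

    std::thread signal_thread([&] {
        life_token_sketch::checker::execution_guard g(checker);
        if (!g.is_alive())
            return;
        started = true;
        std::this_thread::sleep_for(std::chrono::milliseconds(50)); // the "handler body"
        finished = true;
    });

    while (!started)
        std::this_thread::yield();
    token.release(); // blocks until the guard in signal_thread is destroyed
    bool handler_done = finished;
    signal_thread.join();

    life_token_sketch::checker::execution_guard late(checker);
    return handler_done && !late.is_alive();
}
```

The join-like semantics are what matters here: by the time release() returns, the handler is no longer running, so the data it uses can be destroyed.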

What does the MediaScanner code look like now? Exactly the way we wanted to write it at the very beginning:

    class MediaScanner
    {
    private:
        signal_connection _connection;

    public:
        MediaScanner(const StorageManagerPtr& storageManager)
        { _connection = storageManager->OnStorageAdded.connect([&](const StoragePtr& s) { this->StorageHandler(s); }); }

        ~MediaScanner()
        { _connection.disconnect(); }

    private:
        void StorageHandler(const StoragePtr& storage)
        { /* do something */ }
    };

Disconnecting an asynchronous handler does not cancel calls that are already in the queue of its thread.


Let's write MediaUiModel, which will react to the media files that are found and add rows to display them.

To do this, add the following signal to MediaScanner :

 signal<void(const MediaPtr&)> OnMediaFound; 

There are two important things here:


    class MediaUiModel : public UiModel<MediaUiModelRow>
    {
    private:
        boost::io_service& _uiThread;
        boost::signals2::connection _connection;

    public:
        MediaUiModel(boost::io_service& uiThread, const MediaScanner& scanner)
            : _uiThread(uiThread)
        {
            std::lock_guard<std::recursive_mutex> l(scanner.GetMutex());
            _connection = scanner.OnMediaFound.connect([&](const MediaPtr& m) { this->MediaHandler(m); });
            for (auto&& m : scanner.GetMedia())
                AppendRow(MediaUiModelRow(m));
        }

        ~MediaUiModel()
        { _connection.disconnect(); }

    private:
        // This method is called on the MediaScanner thread and forwards the real work to the UI thread.
        void MediaHandler(const MediaPtr& m)
        { _uiThread.post([this, m]() { this->AppendRow(MediaUiModelRow(m)); }); }
    };

In addition to the previous problem, there is one more. Every time the signal fires, we post the handler to the UI thread. If at some point we delete the model (for example, the user leaves the Gallery application), all those queued handlers later arrive at a dead object. And we crash again.

Qt Solution


The same deleteLater, with the same caveats.

Solution in boost::signals2


If you are lucky and your UI framework lets you call something like deleteLater, you are saved. You just need a public method that first disconnects the model from the signals and then calls deleteLater, and you get roughly the same behavior as in Qt. True, you still have to solve the previous problem. To do that, you will most likely keep inside the model a shared_ptr to some helper class and subscribe that class to the signals. That is a fair amount of code, but it is a technicality.

If you are unlucky and your UI framework requires the model to be deleted exactly when it wants, you will end up inventing your own life_token.

For example, something like this (also better not to read if you are tired).

    template < typename Signature_ >
    class AsyncToUiHandlerWrapper
    {
    private:
        boost::io_service& _uiThread;
        std::function<Signature_> _realHandler;
        bool _released;
        mutable std::mutex _mutex;

    public:
        AsyncToUiHandlerWrapper(boost::io_service& uiThread, std::function<Signature_> realHandler)
            : _uiThread(uiThread), _realHandler(realHandler), _released(false)
        { }

        void Release()
        {
            std::lock_guard<std::mutex> l(_mutex);
            _released = true;
        }

        template < typename... Args_ >
        static void AsyncHandler(const std::weak_ptr<AsyncToUiHandlerWrapper>& selfWeak, Args_&&... args)
        {
            auto self = selfWeak.lock();
            if (!self)
                return;
            std::lock_guard<std::mutex> l(self->_mutex);
            if (!self->_released) // AsyncToUiHandlerWrapper is not released, so _uiThread still refers to a live object
                self->_uiThread.post(std::bind(&AsyncToUiHandlerWrapper::UiThreadHandler<Args_&...>,
                    selfWeak, std::forward<Args_>(args)...));
        }

    private:
        template < typename... Args_ >
        static void UiThreadHandler(const std::weak_ptr<AsyncToUiHandlerWrapper>& selfWeak, Args_&&... args)
        {
            auto self = selfWeak.lock();
            if (!self)
                return;
            if (!self->_released) // AsyncToUiHandlerWrapper is not released, so _realHandler can still be called
                self->_realHandler(std::forward<Args_>(args)...);
        }
    };

    class MediaUiModel : public UiModel<MediaUiModelRow>
    {
    private:
        using AsyncMediaHandler = AsyncToUiHandlerWrapper<void(const MediaPtr&)>;

    private:
        std::shared_ptr<AsyncMediaHandler> _asyncHandler;

    public:
        MediaUiModel(boost::io_service& uiThread, const MediaScanner& scanner)
        {
            try
            {
                _asyncHandler = std::make_shared<AsyncMediaHandler>(std::ref(uiThread),
                    [&](const MediaPtr& m) { this->AppendRow(MediaUiModelRow(m)); });
                std::lock_guard<std::recursive_mutex> l(scanner.GetMutex());
                boost::signals2::signal<void(const MediaPtr&)>::slot_type slot(
                    std::bind(&AsyncMediaHandler::AsyncHandler<const MediaPtr&>,
                        std::weak_ptr<AsyncMediaHandler>(_asyncHandler), std::placeholders::_1));
                slot.track_foreign(_asyncHandler);
                scanner.OnMediaFound.connect(slot);
                for (auto&& m : scanner.GetMedia())
                    AppendRow(MediaUiModelRow(m));
            }
            catch (...)
            {
                Destroy();
                throw;
            }
        }

        ~MediaUiModel()
        { Destroy(); }

    private:
        void Destroy()
        {
            if (_asyncHandler)
                _asyncHandler->Release(); // From here on MediaUiModel can be destroyed: the wrapper will not call into it
            _asyncHandler.reset();
        }
    };

I will not even comment on this code; let's just be sad for a moment.

How can we do better?


Very simply. First, define an interface for a thread as a task queue:

    struct task_executor
    {
        virtual ~task_executor() { }
        virtual void add_task(const std::function<void()>& task) = 0;
    };

Second, add to the signal an overload of the connect method that accepts a thread:

 signal_connection connect(const std::shared_ptr<task_executor>& worker, std::function<Signature> handler); 

In this method, we put into the _handlers collection a wrapper over the handler which, when called, posts the pair of the handler and the corresponding life_token::checker to the required thread. To call the real handler on the target thread, we use execution_guard in the same way as before.

Thus, the disconnect method guarantees that asynchronous handlers will not be called after we disconnect from the signal either.

I will not give the code of the wrapper and of the overloaded connect method here. I think the idea is clear as it is.
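For readers who prefer something tangible, one possible shape of such a wrapper is sketched below. It is an assumption about how the pieces might fit together, not the actual implementation: manual_executor, life_state and make_async_handler are hypothetical names, and life_state is a simplified stand-in for life_token together with its checker.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>

// Interface from the article: a thread seen as a task queue.
struct task_executor {
    virtual ~task_executor() {}
    virtual void add_task(const std::function<void()>& task) = 0;
};

// Hypothetical executor for the demo: queued tasks run when drain() is called.
class manual_executor : public task_executor {
    std::mutex _mutex;
    std::deque<std::function<void()>> _tasks;

public:
    void add_task(const std::function<void()>& task) override {
        std::lock_guard<std::mutex> l(_mutex);
        _tasks.push_back(task);
    }

    void drain() {
        for (;;) {
            std::function<void()> task;
            {
                std::lock_guard<std::mutex> l(_mutex);
                if (_tasks.empty())
                    return;
                task = std::move(_tasks.front());
                _tasks.pop_front();
            }
            task(); // run outside the queue lock
        }
    }
};

// Simplified stand-in for life_token and its checker.
struct life_state {
    std::mutex mutex;
    bool alive = true;
};

// Wraps a handler so that each signal invocation only posts a task to `worker`;
// the real handler runs there, and only if the connection is still alive.
template <typename... Args>
std::function<void(Args...)> make_async_handler(std::shared_ptr<task_executor> worker,
                                                std::shared_ptr<life_state> life,
                                                std::function<void(Args...)> handler) {
    return [worker, life, handler](Args... args) {
        worker->add_task([life, handler, args...] {
            std::lock_guard<std::mutex> guard(life->mutex); // plays the role of execution_guard
            if (life->alive)
                handler(args...);
        });
    };
}

// Fires the "signal" twice and disconnects while the second call is still queued.
inline int demo_async_disconnect() {
    auto executor = std::make_shared<manual_executor>();
    auto life = std::make_shared<life_state>();
    int calls = 0;
    std::function<void(int)> real_handler = [&calls](int) { ++calls; };
    auto async_handler = make_async_handler(executor, life, real_handler);

    async_handler(1);   // signal fired: a task is queued
    executor->drain();  // the "UI thread" runs it
    async_handler(2);   // queued, but not run yet
    {
        std::lock_guard<std::mutex> l(life->mutex); // the blocking part of disconnect
        life->alive = false;
    }
    executor->drain();  // the queued call is skipped
    return calls;
}
```

In this sketch demo_async_disconnect() returns 1: the call that was still in the queue at the moment of disconnect never reaches the real handler.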

The model code becomes quite simple:

    class MediaUiModel : public UiModel<MediaUiModelRow>
    {
    private:
        signal_connection _connection;

    public:
        MediaUiModel(const std::shared_ptr<task_executor>& uiThread, const MediaScanner& scanner)
        {
            _connection = scanner.OnMediaFound.connect(uiThread,
                [&](const MediaPtr& m) { this->AppendRow(MediaUiModelRow(m)); });
        }

        ~MediaUiModel()
        { _connection.disconnect(); }
    };

Here, the AppendRow method will be called strictly in the UI thread, and only until we disconnect.

Summing up


So, there are three key things that let you write much simpler code with signals:

  1. Populators let you conveniently get the current state while connecting to the signal.
  2. A blocking disconnect method lets an object unsubscribe in its own destructor.
  3. For the previous item to hold for asynchronous handlers, disconnect must also mark the calls that are already in the queue of the thread as "irrelevant".

Of course, the signal code I have given here is very simple and primitive, and it is not very fast. My goal was to describe an alternative approach that seems more attractive to me than the one that dominates today. In reality, all of this can be written much more efficiently.

We have been using this approach in our project for about five years and are very happy with it.

A ready-made implementation


I rewrote the signals we had from scratch in C++11 and improved the parts of the implementation that had long been due for improvement.
Feel free to use it: https://github.com/koplyarov/wigwag .

Mini FAQ


Judging by people's reactions on Reddit and Twitter, everyone is mostly concerned with three questions:

Q: So you need to lock the life_token for every handler call. Won't that be slow?
A: Oddly enough, no. You can use atomic variables instead of a mutex, and wait on a std::condition_variable only if disconnect is called while the handler is executing. Then the result is quite the opposite: thanks to the absence of the overhead of track / track_foreign (which require working with collections of weak_ptrs), this implementation leaves boost::signals2 far behind in both memory and speed, and even outperforms Qt.
Benchmarks can be found here.
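As a rough illustration of the atomics-plus-condition_variable idea from the answer above, here is a hedged sketch. It is not the code used in wigwag; atomic_life_token, try_enter, leave and demo_atomic_token are hypothetical names. The sketch also assumes that the token outlives every thread that may still call leave(), which a real implementation would have to ensure, for example with shared ownership.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical token: the fast path touches only one atomic variable.
class atomic_life_token {
    enum : unsigned { dead_bit = 0x80000000u };
    std::atomic<unsigned> _state{0}; // dead flag plus the number of running handlers
    std::mutex _mutex;               // used only when release() actually has to wait
    std::condition_variable _cv;

public:
    // Called before invoking the handler; returns false after disconnect.
    bool try_enter() {
        unsigned s = _state.load();
        while (!(s & dead_bit)) {
            if (_state.compare_exchange_weak(s, s + 1))
                return true;
        }
        return false;
    }

    // Called after the handler returns.
    void leave() {
        unsigned prev = _state.fetch_sub(1);
        if (prev == (dead_bit | 1u)) { // last running handler, and release() is waiting
            std::lock_guard<std::mutex> l(_mutex);
            _cv.notify_all();
        }
    }

    // Marks the token dead and waits for running handlers, if there are any.
    void release() {
        unsigned prev = _state.fetch_or(dead_bit);
        if ((prev & ~dead_bit) == 0)
            return; // fast path: no handler is running
        std::unique_lock<std::mutex> l(_mutex);
        _cv.wait(l, [this] { return (_state.load() & ~dead_bit) == 0; });
    }
};

// Returns true if release() waited for the running handler
// and the token refuses new handlers afterwards.
inline bool demo_atomic_token() {
    atomic_life_token token;
    std::atomic<bool> started{false};
    std::atomic<bool> finished{false};

    std::thread t([&] {
        if (!token.try_enter())
            return;
        started = true;
        std::this_thread::sleep_for(std::chrono::milliseconds(50)); // the "handler body"
        finished = true;
        token.leave();
    });

    while (!started)
        std::this_thread::yield();
    token.release(); // slow path: waits on the condition variable
    bool waited = finished;
    t.join();
    return waited && !token.try_enter();
}
```

When no disconnect is in progress, invoking a handler costs one compare-and-swap and one atomic decrement; the mutex and the condition variable are touched only when disconnect races with a running handler.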

Q: Will there be deadlocks due to the blocking disconnect method?
A: Yes, a deadlock really is a bit easier to get here than in boost and Qt. In my opinion, this is paid back by simpler code that uses the signals and by their higher speed. Besides, if you keep careful track of who is subscribed to whom, such situations are the exception rather than the rule.

And, naturally, deadlocks need to be caught and fixed. On Linux, I recommend Helgrind. For Windows, a two-minute Google search turns up Intel Inspector and CHESS.

If for some reason you cannot afford any of the above (for example, your platform does not have enough memory to run Helgrind, or it runs some marginal operating system), there is a solution in the form of this (again, simplified) mutex class:

    class mutex
    {
    private:
        std::timed_mutex _m;

    public:
        void lock()
        {
            if (_m.try_lock())
                return;
            while (!_m.try_lock_for(std::chrono::seconds(10)))
                Logger::Warning() << "Could not lock mutex " << (void*)this << " for a long time:\n" << get_backtrace_string();
        }

        // ...
    };

Both Visual Studio and GCC provide means of getting a backtrace in code. There is also the good libunwind library.
With this approach, QA will catch most of your deadlocks, and one glance at the logs will tell you where everything is blocked. All that remains is to fix it.

Q: Can I use one mutex for several signals? Can I handle exceptions the way I want? Is it possible to drop synchronization and get fast single-threaded signals?
A: Yes, yes, and yes. There are template policies for all of this. Read more in the documentation.
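A runnable variant of the same watchdog idea might look like the sketch below. Since Logger and get_backtrace_string are not available here, the warning is replaced by a callback; watchdog_mutex, its on_stall hook and demo_watchdog are hypothetical names introduced for this illustration.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical mutex that reports when it cannot be locked for a while.
class watchdog_mutex {
    std::timed_mutex _m;
    std::chrono::milliseconds _timeout;
    std::function<void()> _on_stall; // stand-in for logging a backtrace

public:
    watchdog_mutex(std::chrono::milliseconds timeout, std::function<void()> on_stall)
        : _timeout(timeout), _on_stall(std::move(on_stall)) {}

    void lock() {
        if (_m.try_lock())
            return;
        while (!_m.try_lock_for(_timeout))
            _on_stall(); // still blocked after a full timeout: report it
    }

    void unlock() { _m.unlock(); }
};

// One thread holds the mutex until the watchdog has fired at least once.
inline int demo_watchdog() {
    std::atomic<int> warnings{0};
    std::atomic<bool> held{false};
    watchdog_mutex m(std::chrono::milliseconds(10), [&warnings] { ++warnings; });

    std::thread holder([&] {
        std::lock_guard<watchdog_mutex> l(m);
        held = true;
        while (warnings == 0)
            std::this_thread::yield(); // keep the lock until a stall is reported
    });

    while (!held)
        std::this_thread::yield();
    {
        std::lock_guard<watchdog_mutex> l(m); // stalls, reports, then succeeds
    }
    holder.join();
    return warnings;
}
```

In a real program the callback would log the address of the mutex and a backtrace, as in the snippet above, so that the blocked call sites show up in the logs.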

Source: https://habr.com/ru/post/279851/

