There are many libraries that implement signals in C++. Unfortunately, every implementation I have encountered has several problems that make it hard to write simple multithreaded code with it. Here I will talk about these problems and how they can be solved.
What are signals?
I think many readers are already familiar with this concept, but I will describe it just in case.
A signal is a way to send a notification of an arbitrary event to recipients that register independently of each other. A callback with multiple recipients, if you like. Or, for those who have worked with .NET, a multicast delegate.
A couple of examples with boost::signals2.
Signal declaration:
struct Button { boost::signals2::signal<void()> OnClick; };
Connect to the signal and disconnect from it:
void ClickHandler() { cout << "Button clicked" << endl; }
Invoking the signal:
struct Button { boost::signals2::signal<void()> OnClick; private: void MouseDownHandler() { OnClick(); } };
Now about the problems
In single-threaded code, everything looks great and works well, but what about multithreaded code?
Here, unfortunately, there are three problems common to the different implementations:
- There is no way to atomically connect to a signal and get the current state.
- Disconnecting from a signal is non-blocking.
- Disconnecting an asynchronous handler does not cancel calls that are already queued on its thread.
Let's consider each of them in detail. To do this, we will write part of the firmware of an imaginary media set-top box, namely three classes:
- StorageManager - a class that reacts to flash drives, DVDs, and other media that the user inserts into the set-top box
- MediaScanner - a class that searches for media files on each of these devices
- MediaUiModel - a model for displaying these media files in an imaginary Model-View framework
I will say right away that the code you see here is extremely simplified and contains nothing superfluous, so that we can concentrate on these problems. You will also see types like TypePtr. This is just std::shared_ptr<Type>, don't be alarmed.
There is no way to atomically connect to a signal and get the current state
So, StorageManager. We need a getter for the media already inserted into the set-top box, and a signal that notifies about new ones.
    class StorageManager
    {
    public:
        std::vector<StoragePtr> GetStorages() const;
        boost::signals2::signal<void(const StoragePtr&)> OnStorageAdded;
        // ...
    };
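Alas, such an interface cannot be used without getting a race condition.
It does not work in this order...
    storageManager->OnStorageAdded.connect(&StorageHandler);
    // A storage added at this point is reported twice: once by the signal, once by the loop below
    for (auto&& storage : storageManager->GetStorages())
        StorageHandler(storage);
...and it does not work in the reverse order either.
    for (auto&& storage : storageManager->GetStorages())
        StorageHandler(storage);
    // A storage added at this point is never reported to StorageHandler
    storageManager->OnStorageAdded.connect(&StorageHandler);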
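The two failure modes can be reproduced deterministically in a single thread by performing the "concurrent" insertion by hand between the two steps. The sketch below is only an illustration: `ToyStorageManager` and the helper functions are hypothetical stand-ins, storages are plain strings, and there is no locking at all.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

using Handler = std::function<void(const std::string&)>;

// Hypothetical stand-in for StorageManager: a state getter plus a naive signal.
struct ToyStorageManager {
    std::vector<std::string> storages;
    std::vector<Handler> handlers;

    void Connect(Handler h) { handlers.push_back(std::move(h)); }
    std::vector<std::string> GetStorages() const { return storages; }
    void ReportNewStorage(const std::string& s) {
        storages.push_back(s);
        for (auto& h : handlers) h(s);
    }
};

static int CountUsb(const std::vector<std::string>& seen) {
    return static_cast<int>(std::count(seen.begin(), seen.end(), std::string("usb")));
}

// Connect first, then enumerate: "usb" arrives in between and is seen twice.
int UsbSeenConnectFirst() {
    ToyStorageManager m;
    m.ReportNewStorage("dvd");
    std::vector<std::string> seen;
    m.Connect([&seen](const std::string& s) { seen.push_back(s); });
    m.ReportNewStorage("usb");  // plays the role of another thread
    for (auto&& s : m.GetStorages()) seen.push_back(s);
    return CountUsb(seen);
}

// Enumerate first, then connect: "usb" arrives in between and is never seen.
int UsbSeenEnumerateFirst() {
    ToyStorageManager m;
    m.ReportNewStorage("dvd");
    std::vector<std::string> seen;
    for (auto&& s : m.GetStorages()) seen.push_back(s);
    m.ReportNewStorage("usb");  // plays the role of another thread
    m.Connect([&seen](const std::string& s) { seen.push_back(s); });
    return CountUsb(seen);
}

inline void RunRaceDemo() {
    assert(UsbSeenConnectFirst() == 2);    // duplicate notification
    assert(UsbSeenEnumerateFirst() == 0);  // lost notification
}
```

In real code the insertion comes from another thread at an unpredictable moment, which is why both bugs show up only occasionally.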
Common solution
Obviously, since we have a race condition, we need a mutex.
    class StorageManager
    {
        mutable std::recursive_mutex _mutex;
        std::vector<StoragePtr> _storages;

    public:
        StorageManager() { }

        boost::signals2::signal<void(const StoragePtr&)> OnStorageAdded;

        std::recursive_mutex& GetMutex() const { return _mutex; }

        std::vector<StoragePtr> GetStorages() const
        {
            std::lock_guard<std::recursive_mutex> l(_mutex);
            return _storages;
        }

    private:
        void ReportNewStorage(const StoragePtr& storage)
        {
            std::lock_guard<std::recursive_mutex> l(_mutex);
            _storages.push_back(storage);
            OnStorageAdded(storage);
        }
    };
This code will work, but it has several flaws:
- If you want to use std::mutex instead of std::recursive_mutex, you lose the ability to lock it inside the GetStorages method, which makes the StorageManager class non-thread-safe.
- You cannot get rid of copying the collection inside GetStorages without losing StorageManager's thread safety.
- You have to expose the type std::vector<StoragePtr>, although in reality it is just an implementation detail.
- The code for connecting to a signal and receiving the current state is fairly bulky, and it is almost identical for different signals.
How to do better?
Let's move everything we do around the call to connect (locking the mutex and traversing the collection) inside it.
It is important to understand that the algorithm for obtaining the current state depends on the nature of the state itself. If it is a collection, the handler must be called for each element; if it is, for example, an enum, the handler must be called exactly once. Accordingly, we need some kind of abstraction.
Let's add a populator to the signal: a function that accepts the handler being connected. This lets the signal owner (StorageManager, in our case) determine how the current state is sent to that handler.
    template < typename Signature >
    class signal
    {
        using populator_type = std::function<void(const std::function<Signature>&)>;

        mutable std::mutex _mutex;
        std::list<std::function<Signature> > _handlers;
        populator_type _populator;

    public:
        signal(populator_type populator)
            : _populator(std::move(populator))
        { }

        std::mutex& get_mutex() const { return _mutex; }

        signal_connection connect(std::function<Signature> handler)
        {
            std::lock_guard<std::mutex> l(_mutex);
            _populator(handler); // Send the current state to the new handler
            // ...
        }
        // ...
    };
The signal_connection class currently accepts a lambda function that removes the handler from the signal's list. I will show slightly updated code later.
Let's rewrite StorageManager using this new concept:
    class StorageManager
    {
        std::vector<StoragePtr> _storages;

    public:
        // The populator is passed to the signal's constructor
        StorageManager()
            : OnStorageAdded([&](const std::function<void(const StoragePtr&)>& h) { for (auto&& s : _storages) h(s); })
        { }

        signal<void(const StoragePtr&)> OnStorageAdded;

    private:
        void ReportNewStorage(const StoragePtr& storage)
        {
            // ...
        }
    };
If you are using C++14, the populator can be quite short:
    StorageManager() : OnStorageAdded([&](auto&& h) { for (auto&& s : _storages) h(s); }) { }
Note that when the populator is called, the mutex is already locked by the signal::connect method, so you don't need to lock it in the body of the populator itself.
The client code becomes quite short:
storageManager->OnStorageAdded.connect(&StorageHandler);
In one line we both connect to the signal and get the current state of the object. Excellent!
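For reference, here is a minimal self-contained sketch of the populator idea that compiles with the standard library alone. It is an assumption-laden toy, not the implementation from the article: `toy_signal` and `ToyStorageManager` are hypothetical names, storages are plain strings, and `connect` returns a plain disconnect function instead of a signal_connection.

```cpp
#include <cassert>
#include <functional>
#include <list>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

template <typename Signature>
class toy_signal {
public:
    using handler_type = std::function<Signature>;
    using populator_type = std::function<void(const handler_type&)>;

    explicit toy_signal(populator_type populator) : _populator(std::move(populator)) {}

    std::mutex& get_mutex() const { return _mutex; }

    // Replays the current state into the new handler and registers it under one lock,
    // so no event can slip in between. The returned function removes the handler;
    // call it at most once.
    std::function<void()> connect(handler_type handler) {
        std::lock_guard<std::mutex> l(_mutex);
        _populator(handler);
        auto it = _handlers.insert(_handlers.end(), std::move(handler));
        return [this, it] {
            std::lock_guard<std::mutex> l(_mutex);
            _handlers.erase(it);
        };
    }

    // The caller is expected to hold get_mutex() while invoking the signal.
    template <typename... Args>
    void operator()(Args&&... args) const {
        for (auto& h : _handlers) h(args...);
    }

private:
    mutable std::mutex _mutex;
    std::list<handler_type> _handlers;
    populator_type _populator;
};

class ToyStorageManager {
    std::vector<std::string> _storages;

public:
    toy_signal<void(const std::string&)> OnStorageAdded;

    ToyStorageManager()
        : OnStorageAdded([this](const std::function<void(const std::string&)>& h) {
              for (auto&& s : _storages) h(s);  // the signal's mutex is already held here
          }) {}

    void ReportNewStorage(const std::string& s) {
        std::lock_guard<std::mutex> l(OnStorageAdded.get_mutex());
        _storages.push_back(s);
        OnStorageAdded(s);
    }
};

inline void RunPopulatorDemo() {
    ToyStorageManager m;
    m.ReportNewStorage("dvd");
    std::vector<std::string> seen;
    auto disconnect = m.OnStorageAdded.connect([&seen](const std::string& s) { seen.push_back(s); });
    m.ReportNewStorage("usb");
    disconnect();
    m.ReportNewStorage("sd");
    assert(seen.size() == 2);  // "dvd" from the populator, "usb" from the signal
}
```

Because handlers run while the signal's mutex is held, a handler in this toy must not call connect or the disconnect function of the same signal.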
Disconnecting from a signal is non-blocking
Now it's time to write MediaScanner. We connect to the StorageManager::OnStorageAdded signal in the constructor and disconnect in the destructor.
    class MediaScanner
    {
    private:
        boost::signals2::connection _connection;

    public:
        MediaScanner(const StorageManagerPtr& storageManager)
        { _connection = storageManager->OnStorageAdded.connect([&](const StoragePtr& s) { this->StorageHandler(s); }); }

        ~MediaScanner()
        { _connection.disconnect(); }

        // ...
    };
Alas, this code will crash from time to time. The reason is how the disconnect method works in all the implementations I know. It guarantees that the next time the signal is invoked, the corresponding handler will not run. However, if the handler is executing in another thread at that moment, it will not be interrupted and will continue to work with the destroyed MediaScanner object.
Qt Solution
In Qt, each object belongs to a specific thread, and its handlers are called exclusively in that thread. To safely disconnect from a signal, you call the QObject::deleteLater method, which guarantees that the actual deletion happens in the right thread and that no handler is called after the deletion.
mediaScanner->deleteLater();
This is a good option if you are ready to fully integrate with Qt (give up std::thread at the core of your program in favor of QObject, QThread, and so on).
Solution in boost::signals2
To solve this problem, Boost suggests using the track / track_foreign methods of the slot (i.e., the handler). These methods accept a weak_ptr to an arbitrary object, and the connection between the handler and the signal exists as long as every object the slot "tracks" is alive.
It works quite simply: each slot holds a collection of weak_ptrs to the tracked objects, which are locked for the duration of the handler's execution. Thus, these objects are guaranteed not to be destroyed while the handler code has access to them. If any of the objects has already been destroyed, the connection is broken.
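The tracking mechanism described above can be sketched with the standard library alone. This is a toy model of the idea, not the boost::signals2 implementation: `tracked_slot`, its `track` and `invoke` members, and `Counter` are hypothetical names chosen for the illustration.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Toy slot: a handler plus weak references to the objects it depends on.
class tracked_slot {
    std::function<void()> _func;
    std::vector<std::weak_ptr<void>> _tracked;

public:
    explicit tracked_slot(std::function<void()> f) : _func(std::move(f)) {}

    tracked_slot& track(const std::weak_ptr<void>& obj) {
        _tracked.push_back(obj);
        return *this;
    }

    // Returns false if some tracked object is gone, meaning the slot should be dropped.
    bool invoke() const {
        std::vector<std::shared_ptr<void>> locked;
        for (auto& w : _tracked) {
            auto s = w.lock();
            if (!s) return false;
            locked.push_back(std::move(s));
        }
        _func();  // tracked objects cannot be destroyed while 'locked' is in scope
        return true;
    }
};

struct Counter { int calls = 0; };

inline void RunTrackingDemo() {
    auto c = std::make_shared<Counter>();
    Counter* raw = c.get();
    tracked_slot slot([raw] { ++raw->calls; });
    slot.track(c);

    assert(slot.invoke());   // object alive: handler runs
    assert(c->calls == 1);

    c.reset();               // last owner gone
    assert(!slot.invoke());  // handler is skipped, raw pointer is never touched
}
```

Note how the handler's owner has to live in a shared_ptr for this to work at all, which is exactly the constraint discussed next.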
The problem is that we need a weak_ptr to the subscribing object for this. In my opinion, the most reasonable way to get one is to add a factory method to the MediaScanner class that subscribes the created object to all the signals it is interested in:
    class MediaScanner
    {
    public:
        static std::shared_ptr<MediaScanner> Create(const StorageManagerPtr& storageManager)
        {
            std::lock_guard<std::recursive_mutex> l(storageManager->GetMutex());

            MediaScannerPtr result(new MediaScanner);

            boost::signals2::signal<void(const StoragePtr&)>::slot_type
                slot(bind(&MediaScanner::StorageHandler, result.get(), _1));
            slot.track_foreign(result);
            storageManager->OnStorageAdded.connect(slot);

            for (auto&& storage : storageManager->GetStorages())
                result->StorageHandler(storage);

            return result;
        }

    private:
        MediaScanner()
        { /* ... */ }
        // ...
    };
So, the disadvantages:
- Quite a lot of code that you copy-paste every time
- MediaScanner's initialization is split into two parts: subscribing to signals in the Create method, and everything else in the constructor
- You must use shared_ptr to store MediaScanner
- You cannot be sure that MediaScanner is deleted when you release the last external reference to it. This can be a problem if it uses a limited resource that you want to reuse after releasing MediaScanner
How to do better?
Let's make the disconnect method blocking, so that once it returns, it is safe to destroy everything the signal handler has access to. Something like the std::thread::join method.
Looking ahead, I will say that we need three classes for this:
- life_token - controls the lifetime of the handler, allows marking it as "dying", and waits for its execution to finish if necessary
- life_token::checker - stored inside the signal next to the handler, refers to its life_token
- life_token::checker::execution_guard - created on the stack while the handler executes, locks the corresponding life_token, and lets you check whether the handler has "died"
The code of the signal_connection class:
    class signal_connection
    {
        life_token _token;
        std::function<void()> _eraseHandlerFunc;

    public:
        signal_connection(life_token token, std::function<void()> eraseHandlerFunc)
            : _token(token), _eraseHandlerFunc(eraseHandlerFunc)
        { }

        ~signal_connection()
        { disconnect(); }

        void disconnect()
        {
            if (_token.released())
                return;
            _token.release(); // Blocks while the handler is running
            // ...
        }
    };
Here I must say that I am a proponent of a RAII-style connection object. I will not dwell on this in detail; I will only say that it is not essential in this context.
The signal class also changes a bit:
    template < typename Signature >
    class signal
    {
        using populator_type = std::function<void(const std::function<Signature>&)>;

        struct handler
        {
            std::function<Signature> handler_func;
            life_token::checker life_checker;
        };

        mutable std::mutex _mutex;
        std::list<handler> _handlers;
        populator_type _populator;

    public:
        // ...
    };
Now, next to each handler, we have a life_token::checker object that refers to the life_token stored in signal_connection. We lock it for the duration of the handler's execution using a life_token::checker::execution_guard object.
The implementation of these objects follows. If you are tired, you can skip it.
Inside life_token we need the following things:
- Some operating system primitive for waiting in life_token::release (here, for simplicity, let's take a mutex)
- An "alive / dead" flag
- A counter of locks taken through execution_guard (omitted here for simplicity)
    class life_token
    {
        struct impl
        {
            std::mutex mutex;
            bool alive = true;
        };

        std::shared_ptr<impl> _impl;

    public:
        life_token() : _impl(std::make_shared<impl>()) { }
        ~life_token() { release(); }

        bool released() const { return !_impl; }

        void release()
        {
            if (released())
                return;
            // Keep the state in a local so that the mutex outlives the lock taken on it
            std::shared_ptr<impl> impl_ptr(std::move(_impl));
            std::lock_guard<std::mutex> l(impl_ptr->mutex);
            impl_ptr->alive = false;
        }

        class checker
        {
            std::shared_ptr<impl> _impl;

        public:
            checker(const life_token& t) : _impl(t._impl) { }

            class execution_guard
            {
                std::shared_ptr<impl> _impl;

            public:
                execution_guard(const checker& c) : _impl(c._impl) { _impl->mutex.lock(); }
                ~execution_guard() { _impl->mutex.unlock(); }

                bool is_alive() const { return _impl->alive; }
            };
        };
    };
The mutex is held for the lifetime of execution_guard. Accordingly, if the life_token::release method is called in another thread during that time, it blocks on acquiring the same mutex and waits until the signal handler finishes executing. After that, it resets the alive flag, and no subsequent invocation of the signal will call the handler.
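The blocking behavior is easy to observe with two threads. The sketch below uses a compact, hypothetical `toy_life_token` (the checker level is folded into the guard to keep it short) and a worker thread that plays the role of the signal invoking a slow handler.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <memory>
#include <mutex>
#include <thread>

// Compact mutex-based token; 'guard' plays the role of execution_guard.
class toy_life_token {
    struct impl {
        std::mutex mutex;
        bool alive = true;
    };
    std::shared_ptr<impl> _impl = std::make_shared<impl>();

public:
    class guard {
        std::shared_ptr<impl> _impl;

    public:
        explicit guard(const toy_life_token& t) : _impl(t._impl) { _impl->mutex.lock(); }
        ~guard() { _impl->mutex.unlock(); }
        guard(const guard&) = delete;
        guard& operator=(const guard&) = delete;
        bool is_alive() const { return _impl->alive; }
    };

    // Waits for a running handler (if any), then marks the token dead.
    void release() {
        std::lock_guard<std::mutex> l(_impl->mutex);
        _impl->alive = false;
    }
};

// Returns true if release() came back only after the running handler had finished.
bool ReleaseWaitsForRunningHandler() {
    toy_life_token token;
    std::atomic<bool> started{false};
    std::atomic<bool> finished{false};

    std::thread worker([&] {
        toy_life_token::guard g(token);  // what the signal does around each call
        if (!g.is_alive()) return;
        started = true;
        std::this_thread::sleep_for(std::chrono::milliseconds(50));  // "handler body"
        finished = true;
    });

    while (!started) std::this_thread::yield();
    token.release();  // blocks until the guard in the worker is destroyed
    bool finishedBeforeReturn = finished;
    worker.join();
    return finishedBeforeReturn;
}

// After release(), a guard reports that the handler must be skipped.
bool HandlerSkippedAfterRelease() {
    toy_life_token token;
    token.release();
    toy_life_token::guard g(token);
    return !g.is_alive();
}

inline void RunLifeTokenDemo() {
    assert(ReleaseWaitsForRunningHandler());
    assert(HandlerSkippedAfterRelease());
}
```

One consequence of this design: calling release from inside the handler itself would try to lock a mutex the same thread already holds, so a handler must not disconnect itself in this simplified version.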
What does the MediaScanner code look like now? Exactly the way we wanted to write it at the very beginning:
    class MediaScanner
    {
    private:
        signal_connection _connection;

    public:
        MediaScanner(const StorageManagerPtr& storageManager)
        { _connection = storageManager->OnStorageAdded.connect([&](const StoragePtr& s) { this->StorageHandler(s); }); }

        ~MediaScanner()
        { _connection.disconnect(); }

    private:
        void StorageHandler(const StoragePtr& storage)
        { }
    };
Disconnecting an asynchronous handler does not cancel calls that are already queued on its thread
Let's write MediaUiModel, which will react to the media files found and add rows to display them.
To do this, add the following signal to MediaScanner:
signal<void(const MediaPtr&)> OnMediaFound;
There are two important things here:
- A model is an object of a UI library, so all actions on it must be performed from the UI thread.
- UI libraries often use their own ownership hierarchy, so we cannot use shared_ptr to store the model. Accordingly, the trick with track / track_foreign will not work here, but that is not the main point right now, so let's pretend that everything is fine.
    class MediaUiModel : public UiModel<MediaUiModelRow>
    {
    private:
        boost::asio::io_service& _uiThread;
        boost::signals2::connection _connection;

    public:
        MediaUiModel(boost::asio::io_service& uiThread, const MediaScanner& scanner)
            : _uiThread(uiThread)
        {
            std::lock_guard<std::recursive_mutex> l(scanner.GetMutex());
            _connection = scanner.OnMediaFound.connect([&](const MediaPtr& m) { this->MediaHandler(m); });
            for (auto&& m : scanner.GetMedia())
                AppendRow(MediaUiModelRow(m));
        }

        ~MediaUiModel()
        { _connection.disconnect(); }

    private:
        // ...
    };
In addition to the previous problem, there is one more. Every time the signal fires, we post the handler to the UI thread. If at some point we delete the model (for example, the user has left the Gallery application), all those queued handlers later arrive at a dead object. And we crash again.
Qt Solution
The same deleteLater, with the same caveats.
Solution in boost::signals2
If you're lucky and your UI framework lets you say deleteLater, you are saved. You just need to make a public method that first disconnects the model from the signals and then calls deleteLater, and you will get roughly the same behavior as in Qt. True, you still have to solve the previous problem. To do that, you will most likely keep inside the model a shared_ptr to some helper class, and subscribe that class to the signals. It is quite a bit of code, but that is a technicality.
If you are unlucky and your UI framework insists on deleting the model exactly when it wants to, you will end up inventing your own life_token.
For example, something like this (also better skipped if you are tired):
    template < typename Signature_ >
    class AsyncToUiHandlerWrapper
    {
    private:
        boost::asio::io_service& _uiThread;
        std::function<Signature_> _realHandler;
        bool _released;
        mutable std::mutex _mutex;

    public:
        AsyncToUiHandlerWrapper(boost::asio::io_service& uiThread, std::function<Signature_> realHandler)
            : _uiThread(uiThread), _realHandler(realHandler), _released(false)
        { }

        void Release()
        {
            std::lock_guard<std::mutex> l(_mutex);
            _released = true;
        }

        template < typename... Args_ >
        static void AsyncHandler(const std::weak_ptr<AsyncToUiHandlerWrapper>& selfWeak, Args_&&... args)
        {
            auto self = selfWeak.lock();
            std::lock_guard<std::mutex> l(self->_mutex);
            if (!self->_released)
            {
                // ...
            }
        }
    };
I will not even comment on this code; let's just be a little sad.
How to do better?
Very simple. First, define an interface for a thread as a task queue:
struct task_executor { virtual ~task_executor() { } virtual void add_task(const std::function<void()>& task) = 0; };
Second, add to the signal an overload of the connect method that accepts a thread:
signal_connection connect(const std::shared_ptr<task_executor>& worker, std::function<Signature> handler);
In this method, we put into the _handlers collection a wrapper around the handler which, when called, posts the pair of the handler and the corresponding life_token::checker to the target thread. To call the real handler in that thread, we use execution_guard in the same way as before.
Thus, the disconnect method also guarantees that asynchronous handlers will not be called after we disconnect from the signal.
I will not give the code for the wrapper and the connect overload here. I think the idea is clear enough.
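For readers who prefer something concrete, here is one possible shape of such a wrapper, written as a hedged sketch rather than the actual implementation. `manual_executor`, `token_state`, `make_async_handler`, and `disconnect` are hypothetical names; the token is reduced to a mutex plus a flag, and the handler signature is fixed to a single argument to keep the code short.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>

struct task_executor {
    virtual ~task_executor() {}
    virtual void add_task(const std::function<void()>& task) = 0;
};

// Demo executor: tasks run only when run_all() is called, standing in for a UI event loop.
class manual_executor : public task_executor {
    std::mutex _mutex;
    std::deque<std::function<void()>> _tasks;

public:
    void add_task(const std::function<void()>& task) override {
        std::lock_guard<std::mutex> l(_mutex);
        _tasks.push_back(task);
    }

    void run_all() {
        for (;;) {
            std::function<void()> task;
            {
                std::lock_guard<std::mutex> l(_mutex);
                if (_tasks.empty()) return;
                task = std::move(_tasks.front());
                _tasks.pop_front();
            }
            task();  // run outside the queue lock
        }
    }
};

// Minimal stand-in for the shared state of life_token.
struct token_state {
    std::mutex mutex;
    bool alive = true;
};

// Wraps 'handler' so that each signal call is queued on 'worker' together with the token.
// The liveness check happens in the worker thread, right before the real call.
template <typename Arg>
std::function<void(const Arg&)> make_async_handler(std::shared_ptr<task_executor> worker,
                                                   std::shared_ptr<token_state> token,
                                                   std::function<void(const Arg&)> handler) {
    return [worker, token, handler](const Arg& arg) {
        worker->add_task([token, handler, arg] {  // 'arg' is copied into the task
            std::lock_guard<std::mutex> guard(token->mutex);  // role of execution_guard
            if (token->alive) handler(arg);
        });
    };
}

// Blocking disconnect: waits for a running call, then invalidates the queued ones.
inline void disconnect(token_state& token) {
    std::lock_guard<std::mutex> l(token.mutex);
    token.alive = false;
}

inline void RunAsyncDemo() {
    auto ui = std::make_shared<manual_executor>();
    auto token = std::make_shared<token_state>();
    int sum = 0;
    auto h = make_async_handler<int>(ui, token, [&sum](const int& v) { sum += v; });

    h(1);
    ui->run_all();
    assert(sum == 1);

    h(10);               // queued, but not executed yet
    disconnect(*token);  // after this, 'sum' may be safely destroyed
    ui->run_all();       // the queued call is dropped
    assert(sum == 1);
}
```

The queued task owns copies of the handler and the token, so it stays valid even if the signal or the connection is destroyed before the task runs.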
The model code becomes quite simple:
    class MediaUiModel : public UiModel<MediaUiModelRow>
    {
    private:
        signal_connection _connection;

    public:
        MediaUiModel(const std::shared_ptr<task_executor>& uiThread, const MediaScanner& scanner)
        { _connection = scanner.OnMediaFound.connect(uiThread, [&](const MediaPtr& m) { this->AppendRow(MediaUiModelRow(m)); }); }

        ~MediaUiModel()
        { _connection.disconnect(); }
    };
Here, the AppendRow method will be called strictly in the UI thread, and only until we disconnect.
Summing up
So there are three key things that allow you to write much simpler code using signals:
- Populators allow you to conveniently get the current state during connection to the signal
- The blocking disconnect method allows you to unsubscribe an object in its own destructor.
- In order for the previous item to be true for asynchronous handlers, disconnect must also mark those calls that are already in the queue of the thread as "irrelevant"
Of course, the code of the signals that I gave here is very simple and primitive, and does not work very quickly.
My goal was to present an alternative approach, which seems to me more attractive than the one that dominates today. In reality, all of this can be written much more efficiently.
We have been using this approach in our project for about five years and are very happy with it.
Ready implementation
I rewrote the signals we had from scratch in C++11 and improved the parts of the implementation that had long needed improving.
Feel free to use it: https://github.com/koplyarov/wigwag
Mini FAQ
Judging by people's reactions on Reddit and Twitter, everyone is mostly concerned with three questions:
Q: Here you need to lock life_token to call each handler. Won't that be slow?
A: Oddly enough, no. You can use atomic variables instead of a mutex, and wait on a std::condition_variable only if the disconnect call arrives while the handler is executing. Then the result is quite the opposite: with no overhead from track / track_foreign (which require working with collections of weak_ptrs), this implementation leaves boost::signals2 far behind in memory and speed, and even outperforms Qt.
Benchmarks can be found here.
Q: Will there be deadlocks because of the blocking disconnect method?
A: Yes, a deadlock really is somewhat easier to get here than with Boost and Qt. In my opinion, this is paid off by simpler code for using signals and by their higher speed. Besides, if you keep careful track of who is subscribed to whom, such situations are the exception rather than the rule.
And naturally, deadlocks need to be caught and fixed. On Linux, I recommend Helgrind. For Windows, a two-minute Google search turns up Intel Inspector and CHESS.
If for some reason you cannot afford any of the above (for example, your platform does not have enough memory to run Helgrind, or you are on some marginal operating system), there is a workaround in the form of the following (again, simplified) mutex class:
    class mutex
    {
    private:
        std::timed_mutex _m;

    public:
        void lock()
        {
            if (_m.try_lock())
                return;
            while (!_m.try_lock_for(std::chrono::seconds(10)))
                Logger::Warning() << "Could not lock mutex " << (void*)this << " for a long time:\n" << get_backtrace_string();
        }
        // ...
    };
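The class above is shown only in part. A self-contained variant of the same idea might look like the sketch below. The names are hypothetical: `watchdog_mutex` takes the warning interval as a constructor parameter and counts warnings in place of the `Logger` and `get_backtrace_string` calls, which are not defined in this article.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <mutex>
#include <thread>

class watchdog_mutex {
    std::timed_mutex _m;
    std::chrono::milliseconds _warnAfter;
    std::atomic<int> _warnings{0};

public:
    explicit watchdog_mutex(std::chrono::milliseconds warnAfter) : _warnAfter(warnAfter) {}

    void lock() {
        if (_m.try_lock()) return;              // fast path, no timing involved
        while (!_m.try_lock_for(_warnAfter))
            ++_warnings;                        // a real build would log 'this' and a backtrace here
    }

    void unlock() { _m.unlock(); }

    int warnings() const { return _warnings; }
};

// One thread holds the mutex until the other has waited long enough to trigger a warning.
int CountWarningsUnderContention() {
    watchdog_mutex m(std::chrono::milliseconds(10));
    std::atomic<bool> held{false};

    std::thread holder([&] {
        std::lock_guard<watchdog_mutex> l(m);
        held = true;
        while (m.warnings() == 0)
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
    });

    while (!held) std::this_thread::yield();
    {
        std::lock_guard<watchdog_mutex> l(m);   // waits, warning every 10 ms
    }
    holder.join();
    return m.warnings();
}

inline void RunWatchdogDemo() {
    assert(CountWarningsUnderContention() >= 1);
}
```

Since the class provides lock and unlock, it works with std::lock_guard like any other mutex, so it can be swapped in without touching the calling code.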
Both Visual Studio and GCC provide means for getting a backtrace in code. There is also the good libunwind library.
With this approach, QA will catch most of your deadlocks, and one glance at the logs will tell you where everything is blocked. All that remains is to fix it.
Q: Can I use one mutex for several signals? Can I handle exceptions the way I want? Is it possible to drop synchronization and get fast single-threaded signals?
A: Yes, yes, and yes. There are template policies for all of this. Read more in the documentation.
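As a closing illustration of the first FAQ answer, here is a rough sketch of a token that uses atomics on the fast path and touches a mutex and a std::condition_variable only when a disconnect overlaps with a running handler. It is an assumption about how such a token could be built, not the code of the library; `atomic_life_token` and its members are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

class atomic_life_token {
    struct impl {
        std::atomic<bool> alive{true};
        std::atomic<int> running{0};   // handlers currently inside a guard
        std::mutex mutex;              // used only on the slow path
        std::condition_variable cv;
    };
    std::shared_ptr<impl> _impl = std::make_shared<impl>();

public:
    class guard {
        std::shared_ptr<impl> _impl;
        bool _alive;

    public:
        explicit guard(const atomic_life_token& t) : _impl(t._impl), _alive(false) {
            _impl->running.fetch_add(1);    // announce ourselves first...
            _alive = _impl->alive.load();   // ...then check the flag
        }
        ~guard() {
            _impl->running.fetch_sub(1);
            if (!_impl->alive.load()) {     // release() may be waiting for us
                std::lock_guard<std::mutex> l(_impl->mutex);
                _impl->cv.notify_all();
            }
        }
        guard(const guard&) = delete;
        guard& operator=(const guard&) = delete;
        bool is_alive() const { return _alive; }
    };

    // Marks the token dead, then waits for handlers that got in before the flag flipped.
    void release() {
        _impl->alive.store(false);
        std::unique_lock<std::mutex> l(_impl->mutex);
        _impl->cv.wait(l, [this] { return _impl->running.load() == 0; });
    }
};

// Returns true if release() came back only after the running handler had finished.
bool AtomicReleaseWaitsForHandler() {
    atomic_life_token token;
    std::atomic<bool> started{false};
    std::atomic<bool> finished{false};

    std::thread worker([&] {
        atomic_life_token::guard g(token);
        if (!g.is_alive()) return;
        started = true;
        std::this_thread::sleep_for(std::chrono::milliseconds(50));  // "handler body"
        finished = true;
    });

    while (!started) std::this_thread::yield();
    token.release();
    bool finishedBeforeReturn = finished;
    worker.join();
    return finishedBeforeReturn;
}

inline void RunAtomicTokenDemo() {
    assert(AtomicReleaseWaitsForHandler());
    atomic_life_token dead;
    dead.release();
    atomic_life_token::guard g(dead);
    assert(!g.is_alive());
}
```

The order of operations matters: the guard increments the counter before reading the flag, and release writes the flag before reading the counter, so with the default sequentially consistent atomics at least one side always notices the other.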