Multithreaded Observer in C++ (practice)

There are many variations of the Observer pattern, but most examples are not suitable for multithreaded applications.
In this article, I want to share my experience of using the pattern in multithreaded applications and describe the main problems I ran into.
The purpose of this article is to draw developers' attention to the problems they can encounter when building multithreaded applications, and to point out the pitfalls in implementing communication between components of such applications.
If you need a ready-made solution, take a look at the Signals2 library, which has been part of Boost since May 2009.
I am not trying to provide a production-ready solution. Nevertheless, after working through this material you should be able to do without third-party libraries in projects where they are unavailable or undesirable for some reason (drivers, low-level applications, etc.).

Problem domain


Participants

NotificationSender: an object that sends messages.
Typically, this is a worker thread that reports changes in its state that need to be displayed in the user interface.
NotificationListener: an object that handles notifications.
Typically, this is an object that controls the part of the user interface associated with the background task.
There can be many such objects, and they can be connected and disconnected dynamically (for example, when a window showing the details of the task's execution is opened).
NotificationDispatcher: an object that manages subscribers and sends messages.

Interaction between objects

Sending messages to all subscribers.
Subscribing and unsubscribing.
Object lifetime.
This article describes synchronous messaging: the call to SendMessage is synchronous, and the calling thread waits until all subscribers have processed the message. In some cases this approach is more convenient than asynchronous delivery, but it also has its own difficulties with ending a subscription.

The simplest implementation for a single-threaded environment


    #include <vector>

    typedef unsigned __int64 SubscriberId;  // MSVC-specific 64-bit integer type

    class CSubscriber {
    public:
        virtual ~CSubscriber(){}
        virtual void MessageHandler(void* pContext) = 0;
        // The address of the CSubscriber subobject serves as the unique id
        SubscriberId GetSubscriberId() {return (SubscriberId)this;}
    };

    class CDispatcher {
    private:
        typedef std::vector<CSubscriber*> CSubscriberList;
    public:
        SubscriberId Subscribe(CSubscriber* pNewSubscriber) {
            for(size_t i = 0; i < m_SubscriberList.size(); ++i) {
                if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId()) {
                    return 0;  // already subscribed
                }
            }
            m_SubscriberList.push_back(pNewSubscriber);
            return pNewSubscriber->GetSubscriberId();
        }
        bool Unsubscribe(SubscriberId id) {
            for(size_t i = 0; i < m_SubscriberList.size(); ++i) {
                if(m_SubscriberList[i]->GetSubscriberId() == id) {
                    m_SubscriberList.erase(m_SubscriberList.begin() + i);
                    return true;
                }
            }
            return false;  // not subscribed
        }
        void SendMessage(void* pContext) {
            for(size_t i = 0; i < m_SubscriberList.size(); ++i) {
                m_SubscriberList[i]->MessageHandler(pContext);
            }
        }
    private:
        CSubscriberList m_SubscriberList;
    };

Here the subscriber's unique identifier is the address of the subscriber object. Because GetSubscriberId is a non-virtual member of CSubscriber, `this` inside it always points to the CSubscriber subobject, so the function returns the same value for a given subscriber object regardless of how the pointer to it was cast.
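The stability of the id can be checked with a small portable sketch (not the article's code). It assumes `std::uintptr_t` and `reinterpret_cast` in place of `unsigned __int64` and the C-style cast; CWindowBase, CWindowListener, CIds and GetIdsBothWays are hypothetical names. The listener deliberately lists another base class first, so the CSubscriber subobject need not sit at the start of the object.

```cpp
#include <cstdint>

// Portable stand-in for the article's "unsigned __int64" (assumption:
// std::uintptr_t can hold any object address on the target platform).
typedef std::uintptr_t SubscriberId;

class CSubscriber {
public:
    virtual ~CSubscriber() {}
    virtual void MessageHandler(void* pContext) = 0;
    // Non-virtual: inside this function "this" is always a CSubscriber*,
    // i.e. the address of the CSubscriber subobject.
    SubscriberId GetSubscriberId() { return reinterpret_cast<SubscriberId>(this); }
};

// Hypothetical extra base class, listed first in the derived class.
class CWindowBase {
public:
    virtual ~CWindowBase() {}
    int m_Data = 0;
};

// Hypothetical listener that uses multiple inheritance.
class CWindowListener : public CWindowBase, public CSubscriber {
public:
    void MessageHandler(void*) override {}
};

// Ids obtained through the derived object and through a base pointer.
struct CIds {
    SubscriberId viaDerived;
    SubscriberId viaBase;
};

CIds GetIdsBothWays(CWindowListener& listener) {
    CSubscriber* pBase = &listener;  // the implicit upcast adjusts the pointer
    CIds ids;
    ids.viaDerived = listener.GetSubscriberId();
    ids.viaBase = pBase->GetSubscriberId();
    return ids;
}
```

Because the id is the address of the CSubscriber subobject, it may differ from the address of the full object in the multiple-inheritance case, so ids are best obtained only through GetSubscriberId.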
Usage example

    class CListener: public CSubscriber {
        virtual void MessageHandler(void* pContext) {
            wprintf(L"%d\n", *((int*)pContext));
        }
    };

    int _tmain(int argc, _TCHAR* argv[]) {
        CDispatcher Dispatcher;
        CListener Listener1;
        CListener Listener2;
        Dispatcher.Subscribe(&Listener1);
        Dispatcher.Subscribe(&Listener2);
        for(int i = 0; i < 5; ++i) {
            Dispatcher.SendMessage(&i);
        }
        Dispatcher.Unsubscribe(Listener2.GetSubscriberId());
        Dispatcher.Unsubscribe(Listener1.GetSubscriberId());
        return 0;
    }

Unsubscribing inside the message handler


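The example has a problem that is unrelated to multithreading. It appears when a subscriber tries to unsubscribe inside its MessageHandler: Unsubscribe erases an element from the same vector that SendMessage is iterating over, so the loop can skip the subscriber that shifts into the erased slot (with iterators instead of indices, the loop would go on using an invalidated iterator). The fix is to copy the list of subscribers before calling MessageHandler.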
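A minimal single-threaded sketch of that fix, assuming a portable `std::uintptr_t` id instead of `unsigned __int64`; Subscribe is trimmed (no duplicate check), and COneShotListener and CCountingListener are hypothetical test listeners. SendMessage iterates over a copy, so a handler that unsubscribes itself no longer causes the next subscriber to be skipped.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Portable stand-in for "unsigned __int64" (assumption).
typedef std::uintptr_t SubscriberId;

class CSubscriber {
public:
    virtual ~CSubscriber() {}
    virtual void MessageHandler(void* pContext) = 0;
    SubscriberId GetSubscriberId() { return reinterpret_cast<SubscriberId>(this); }
};

class CDispatcher {
public:
    // Trimmed: no duplicate check, unlike the article's Subscribe.
    void Subscribe(CSubscriber* pNewSubscriber) { m_SubscriberList.push_back(pNewSubscriber); }

    bool Unsubscribe(SubscriberId id) {
        for (std::size_t i = 0; i < m_SubscriberList.size(); ++i) {
            if (m_SubscriberList[i]->GetSubscriberId() == id) {
                m_SubscriberList.erase(m_SubscriberList.begin() + i);
                return true;
            }
        }
        return false;
    }

    void SendMessage(void* pContext) {
        // Iterate over a copy: a handler may change m_SubscriberList
        // without shifting the elements this loop is walking over.
        std::vector<CSubscriber*> SubscriberList(m_SubscriberList);
        for (std::size_t i = 0; i < SubscriberList.size(); ++i) {
            SubscriberList[i]->MessageHandler(pContext);
        }
    }

private:
    std::vector<CSubscriber*> m_SubscriberList;
};

// Hypothetical listener that unsubscribes itself on its first message.
class COneShotListener : public CSubscriber {
public:
    explicit COneShotListener(CDispatcher& dispatcher) : m_Dispatcher(dispatcher) {}
    void MessageHandler(void*) override {
        ++m_Calls;
        m_Dispatcher.Unsubscribe(GetSubscriberId());
    }
    int m_Calls = 0;
private:
    CDispatcher& m_Dispatcher;
};

// Hypothetical listener that only counts messages.
class CCountingListener : public CSubscriber {
public:
    void MessageHandler(void*) override { ++m_Calls; }
    int m_Calls = 0;
};
```

With the original index-based loop, the same scenario skips CCountingListener on the first SendMessage, because erase moves it into the slot the loop has just passed.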

Moving to a multithreaded environment


With a single thread, this code works reliably.
Let's see what happens when several threads are running.
    CDispatcher g_Dispatcher;

    DWORD WINAPI WorkingThread(PVOID pParam) {
        for(int i = 0;;++i) {
            g_Dispatcher.SendMessage(&i);
        }
    }

    int _tmain(int argc, _TCHAR* argv[]) {
        ::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL);
        CListener Listener1;
        CListener Listener2;
        for(;;) {
            g_Dispatcher.Subscribe(&Listener1);
            g_Dispatcher.Subscribe(&Listener2);
            g_Dispatcher.Unsubscribe(Listener1.GetSubscriberId());
            g_Dispatcher.Unsubscribe(Listener2.GetSubscriberId());
        }
        return 0;
    }

Sooner or later, the program will crash.
The cause is that subscribers are added and removed while notifications are being sent, i.e. CDispatcher::m_SubscriberList is accessed from several threads without synchronization.
Access to the list of subscribers has to be synchronized.

Synchronize access to the list of subscribers


    class CDispatcher {
    private:
        typedef std::vector<CSubscriber*> CSubscriberList;
    public:
        SubscriberId Subscribe(CSubscriber* pNewSubscriber) {
            CScopeLocker ScopeLocker(m_Lock);
            for(size_t i = 0; i < m_SubscriberList.size(); ++i) {
                if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId()) {
                    return 0;
                }
            }
            m_SubscriberList.push_back(pNewSubscriber);
            return pNewSubscriber->GetSubscriberId();
        }
        bool Unsubscribe(SubscriberId id) {
            CScopeLocker ScopeLocker(m_Lock);
            for(size_t i = 0; i < m_SubscriberList.size(); ++i) {
                if(m_SubscriberList[i]->GetSubscriberId() == id) {
                    m_SubscriberList.erase(m_SubscriberList.begin() + i);
                    return true;
                }
            }
            return false;
        }
        void SendMessage(void* pContext) {
            CScopeLocker ScopeLocker(m_Lock);
            for(size_t i = 0; i < m_SubscriberList.size(); ++i) {
                m_SubscriberList[i]->MessageHandler(pContext);
            }
        }
    private:
        CSubscriberList m_SubscriberList;
        CLock m_Lock;
    };

Access is synchronized with a synchronization object (a critical section or a mutex).
For greater portability, and so as not to be distracted from the essence of what is happening, we abstract away direct calls to platform-dependent functions such as EnterCriticalSection; that is what the CLock class is for.
To stay correct in the presence of C++ exceptions, it is convenient to use the RAII idiom, namely the CScopeLocker class, which acquires the synchronization object in its constructor and releases it in its destructor.
With this implementation the program no longer crashes, but another unpleasant situation awaits us.
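The article does not list CLock and CScopeLocker themselves. Below is a minimal portable sketch, assuming std::mutex in place of a platform critical section; TryLock and LockReleasedAfterThrow are hypothetical helpers added only to show that the guard releases the lock when an exception leaves the guarded scope.

```cpp
#include <mutex>
#include <stdexcept>

// Hypothetical portable CLock. The article's version wraps a platform
// primitive (e.g. a critical section); std::mutex stands in for it here.
class CLock {
public:
    CLock() {}
    CLock(const CLock&) = delete;
    CLock& operator=(const CLock&) = delete;
    void Lock() { m_Mutex.lock(); }
    void Unlock() { m_Mutex.unlock(); }
    // Hypothetical helper, used only by the check below.
    bool TryLock() { return m_Mutex.try_lock(); }
private:
    std::mutex m_Mutex;
};

// RAII guard: locks in the constructor, unlocks in the destructor,
// so the lock is released on every exit path, including exceptions.
class CScopeLocker {
public:
    explicit CScopeLocker(CLock& lock) : m_Lock(lock) { m_Lock.Lock(); }
    ~CScopeLocker() { m_Lock.Unlock(); }
    CScopeLocker(const CScopeLocker&) = delete;
    CScopeLocker& operator=(const CScopeLocker&) = delete;
private:
    CLock& m_Lock;
};

// Returns true if the lock is free again after an exception has left the
// guarded scope. Must be called by a thread that does not hold the lock.
bool LockReleasedAfterThrow(CLock& lock) {
    try {
        CScopeLocker guard(lock);
        throw std::runtime_error("handler failed");
    } catch (const std::runtime_error&) {
        // guard's destructor already ran during stack unwinding
    }
    if (!lock.TryLock()) {
        return false;
    }
    lock.Unlock();
    return true;
}
```

Note that std::mutex is not recursive, whereas a Win32 critical section is; a CLock built on std::mutex must never be re-acquired by the thread that already holds it.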

Combating deadlock


Suppose we have a thread that performs some background task, and a window that displays the task's progress.
Typically, the thread sends a notification to the window class, which in turn calls the system function SendMessage to trigger some action in the context of the window procedure.
SendMessage is a blocking call: it delivers the message to the thread that owns the window and waits until that thread has processed it.
If the listener object is also subscribed and unsubscribed in the context of the window procedure (that is, in the window thread), the two threads can block each other: the so-called deadlock.
Such a deadlock is hard to reproduce: it occurs only when Subscribe/Unsubscribe is called at the same moment as MessageHandler runs in the other thread.
The following code emulates the blocking call to the SendMessage system function with a global lock.

    CDispatcher g_Dispatcher;
    CLock g_Lock;

    class CListener: public CSubscriber {
        virtual void MessageHandler(void* pContext) {
            // Emulate the blocking SendMessage system call
            g_Lock.Lock();
            wprintf(L"%d\n", *((int*)pContext));
            g_Lock.Unlock();
        }
    };

    DWORD WINAPI WorkingThread(PVOID pParam) {
        for(int i = 0;;++i) {
            g_Dispatcher.SendMessage(&i);
        }
    }

    int _tmain(int argc, _TCHAR* argv[]) {
        ::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL);
        CListener Listener1;
        CListener Listener2;
        for(;;) {
            // Emulate the window procedure (runs in the window thread)
            g_Lock.Lock();
            g_Dispatcher.Subscribe(&Listener1);
            g_Dispatcher.Subscribe(&Listener2);
            g_Lock.Unlock();
            Sleep(0);
            g_Lock.Lock();
            g_Dispatcher.Unsubscribe(Listener1.GetSubscriberId());
            g_Dispatcher.Unsubscribe(Listener2.GetSubscriberId());
            g_Lock.Unlock();
        }
        return 0;
    }

The problem is that the main thread acquires the global synchronization object g_Lock (like a window procedure, it runs in the window thread) and then calls Subscribe/Unsubscribe, which internally tries to acquire a second synchronization object, CDispatcher::m_Lock.
At the same moment the worker thread sends a notification: it acquires CDispatcher::m_Lock in CDispatcher::SendMessage and then tries to acquire the global synchronization object g_Lock (just as the window class would call the blocking SendMessage system function).

 Window thread: A -> B (A = g_Lock, B = CDispatcher::m_Lock)
 Worker thread: B -> A

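This is the classic lock-ordering deadlock: each thread holds one lock and waits for the one held by the other.
The problem lies in the CDispatcher::SendMessage() function.
The rule to follow here: never call a callback function while holding any synchronization object.
So we release the lock before sending notifications: the list of subscribers is copied under the lock, and the handlers are called on the copy.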

    void SendMessage(void* pContext) {
        CSubscriberList SubscriberList;
        {
            // Hold the lock only while copying the list
            CScopeLocker ScopeLocker(m_Lock);
            SubscriberList = m_SubscriberList;
        }
        // Call the handlers with no lock held
        for(size_t i = 0; i < SubscriberList.size(); ++i) {
            SubscriberList[i]->MessageHandler(pContext);
        }
    }
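The same rule also covers the simplest case of re-entrancy: a handler that calls back into the dispatcher on its own thread. A portable sketch under stated assumptions: std::mutex and std::lock_guard replace CLock and CScopeLocker, ids are std::uintptr_t, and CReentrantListener is hypothetical. Because SendMessage drops the lock before calling handlers, Unsubscribe inside a handler takes the lock normally; with the lock-holding SendMessage shown earlier and a non-recursive lock such as std::mutex, the same call would try to re-acquire a mutex its own thread already holds.

```cpp
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <vector>

typedef std::uintptr_t SubscriberId;  // portable stand-in (assumption)

class CSubscriber {
public:
    virtual ~CSubscriber() {}
    virtual void MessageHandler(void* pContext) = 0;
    SubscriberId GetSubscriberId() { return reinterpret_cast<SubscriberId>(this); }
};

class CDispatcher {
public:
    void Subscribe(CSubscriber* pNewSubscriber) {
        std::lock_guard<std::mutex> guard(m_Lock);
        m_SubscriberList.push_back(pNewSubscriber);
    }
    bool Unsubscribe(SubscriberId id) {
        std::lock_guard<std::mutex> guard(m_Lock);
        for (std::size_t i = 0; i < m_SubscriberList.size(); ++i) {
            if (m_SubscriberList[i]->GetSubscriberId() == id) {
                m_SubscriberList.erase(m_SubscriberList.begin() + i);
                return true;
            }
        }
        return false;
    }
    void SendMessage(void* pContext) {
        std::vector<CSubscriber*> SubscriberList;
        {
            // Hold the lock only while copying the list...
            std::lock_guard<std::mutex> guard(m_Lock);
            SubscriberList = m_SubscriberList;
        }
        // ...and call the handlers with no lock held.
        for (std::size_t i = 0; i < SubscriberList.size(); ++i) {
            SubscriberList[i]->MessageHandler(pContext);
        }
    }
private:
    std::vector<CSubscriber*> m_SubscriberList;
    std::mutex m_Lock;
};

// Hypothetical listener that calls back into the dispatcher from its handler.
class CReentrantListener : public CSubscriber {
public:
    explicit CReentrantListener(CDispatcher& dispatcher) : m_Dispatcher(dispatcher) {}
    void MessageHandler(void*) override {
        ++m_Calls;
        m_Dispatcher.Unsubscribe(GetSubscriberId());  // takes m_Lock: safe here
    }
    int m_Calls = 0;
private:
    CDispatcher& m_Dispatcher;
};
```

Raw pointers are still used here, so this sketch does not yet address subscriber lifetime; that is the next problem.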

Controlling subscriber lifetime


Removing the deadlock introduced another problem: the lifetime of subscriber objects.
We no longer have a guarantee that MessageHandler will not be called after Unsubscribe returns, because SendMessage in another thread may still be iterating over a copy of the list that contains the subscriber. Therefore the subscriber object cannot be deleted immediately after calling Unsubscribe.
In this situation, the easiest way to control the lifetime of subscriber objects is reference counting.
One option is COM: derive the CSubscriber interface from IUnknown and hold the subscribers through ATL's CComPtr, that is, replace std::vector<CSubscriber*> with std::vector<CComPtr<CSubscriber>>.
However, this adds overhead to every subscriber class, since each of them has to implement AddRef/Release and the otherwise unnecessary QueryInterface; that said, if the project already uses COM extensively, this approach can pay off.
Reference-counting smart pointers are well suited to controlling the lifetime of subscriber objects.

Simple implementation for multithreaded environment


    #include <vector>
    #include <boost/shared_ptr.hpp>

    // CLock and CScopeLocker are the lock wrapper and RAII guard described above.

    typedef unsigned __int64 SubscriberId;

    class CSubscriber {
    public:
        virtual ~CSubscriber(){}
        virtual void MessageHandler(void* pContext) = 0;
        SubscriberId GetSubscriberId() {return (SubscriberId)this;}
    };

    typedef boost::shared_ptr<CSubscriber> CSubscriberPtr;

    class CDispatcher {
    private:
        typedef std::vector<CSubscriberPtr> CSubscriberList;
    public:
        SubscriberId Subscribe(CSubscriberPtr pNewSubscriber) {
            CScopeLocker ScopeLocker(m_Lock);
            for(size_t i = 0; i < m_SubscriberList.size(); ++i) {
                if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId()) {
                    return 0;
                }
            }
            m_SubscriberList.push_back(pNewSubscriber);
            return pNewSubscriber->GetSubscriberId();
        }
        bool Unsubscribe(SubscriberId id) {
            // Declared before ScopeLocker, so it is destroyed after the lock is released
            CSubscriberPtr toRelease;
            CScopeLocker ScopeLocker(m_Lock);
            for(size_t i = 0; i < m_SubscriberList.size(); ++i) {
                if(m_SubscriberList[i]->GetSubscriberId() == id) {
                    toRelease = m_SubscriberList[i];
                    m_SubscriberList.erase(m_SubscriberList.begin() + i);
                    return true;
                }
            }
            return false;
        }
        void SendMessage(void* pContext) {
            // The copied smart pointers keep every subscriber alive until the loop ends
            CSubscriberList SubscriberList;
            {
                CScopeLocker ScopeLocker(m_Lock);
                SubscriberList = m_SubscriberList;
            }
            for(size_t i = 0; i < SubscriberList.size(); ++i) {
                SubscriberList[i]->MessageHandler(pContext);
            }
        }
    private:
        CSubscriberList m_SubscriberList;
        CLock m_Lock;
    };

In this implementation, I replaced the bare CSubscriber* pointer with a reference-counted smart pointer, boost::shared_ptr from the Boost library.
I also added the toRelease variable to Unsubscribe so that the subscriber's destructor (if toRelease holds the last reference) runs after the lock is released: toRelease is declared before ScopeLocker, and local objects are destroyed in reverse order of declaration. No callback, including the destructor of the subscriber object, may be called while holding a synchronization object.
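The destruction-order argument can be observed in a trimmed sketch. Assumptions: std::shared_ptr stands in for boost::shared_ptr, a hypothetical CLock records whether it is held, the dispatcher is reduced to a global list plus a free Unsubscribe function, and CProbe is a hypothetical subscriber that notes the lock state in its destructor.

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <vector>

// Hypothetical CLock that also records whether it is held, so the example
// can observe when the subscriber's destructor runs.
class CLock {
public:
    void Lock() { m_Mutex.lock(); m_Held = true; }
    void Unlock() { m_Held = false; m_Mutex.unlock(); }
    bool IsHeld() const { return m_Held; }  // meaningful only in this single-threaded demo
private:
    std::mutex m_Mutex;
    bool m_Held = false;
};

class CScopeLocker {
public:
    explicit CScopeLocker(CLock& lock) : m_Lock(lock) { m_Lock.Lock(); }
    ~CScopeLocker() { m_Lock.Unlock(); }
    CScopeLocker(const CScopeLocker&) = delete;
    CScopeLocker& operator=(const CScopeLocker&) = delete;
private:
    CLock& m_Lock;
};

typedef std::uintptr_t SubscriberId;

// Trimmed subscriber: only what the destruction-order check needs.
class CSubscriber {
public:
    virtual ~CSubscriber() {}
    SubscriberId GetSubscriberId() { return reinterpret_cast<SubscriberId>(this); }
};
typedef std::shared_ptr<CSubscriber> CSubscriberPtr;

CLock g_Lock;                        // plays the role of CDispatcher::m_Lock
std::vector<CSubscriberPtr> g_List;  // plays the role of m_SubscriberList

bool Unsubscribe(SubscriberId id) {
    CSubscriberPtr toRelease;        // declared first, so destroyed last
    CScopeLocker ScopeLocker(g_Lock);
    for (std::size_t i = 0; i < g_List.size(); ++i) {
        if (g_List[i]->GetSubscriberId() == id) {
            toRelease = g_List[i];
            g_List.erase(g_List.begin() + i);
            return true;  // ~ScopeLocker unlocks first, then ~toRelease runs
        }
    }
    return false;
}

// Hypothetical subscriber that records the lock state in its destructor.
bool g_DestroyedUnderLock = true;
class CProbe : public CSubscriber {
public:
    ~CProbe() { g_DestroyedUnderLock = g_Lock.IsHeld(); }
};
```

If toRelease were declared after ScopeLocker, the destructor would run while g_Lock is still held, and any callback it triggers would break the rule above.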
Note that SendMessage copies the list of smart pointers: copying increments the reference count of every subscriber, and leaving the function decrements it again, so a subscriber cannot be destroyed while SendMessage is still using it.
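The effect of that copy can be replayed on a single thread. In this sketch (assumptions: std::shared_ptr in place of boost::shared_ptr; CListener, CTrace and ReplayInterleaving are hypothetical), the steps another thread would perform, Unsubscribe plus the owner dropping its pointer, are executed inline between taking the snapshot and calling the handler.

```cpp
#include <memory>
#include <vector>

// Stand-in subscriber for this replay.
struct CListener {
    void MessageHandler(void*) { ++m_Calls; }
    int m_Calls = 0;
};
typedef std::shared_ptr<CListener> CListenerPtr;

struct CTrace {
    long useCountAfterSnapshot = 0;   // references while SendMessage runs
    bool aliveDuringHandler = false;  // survived Unsubscribe + owner release
    bool destroyedAfterSend = false;  // freed once the snapshot went away
};

CTrace ReplayInterleaving() {
    CTrace trace;
    std::vector<CListenerPtr> dispatcherList;
    CListenerPtr owner = std::make_shared<CListener>();
    std::weak_ptr<CListener> watcher = owner;
    dispatcherList.push_back(owner);

    {
        // SendMessage: copy the list (under the lock in the real code).
        std::vector<CListenerPtr> snapshot = dispatcherList;
        trace.useCountAfterSnapshot = owner.use_count();

        // Meanwhile "another thread" unsubscribes and the owner lets go.
        dispatcherList.clear();
        owner.reset();

        // The snapshot still holds a reference, so the call is safe.
        trace.aliveDuringHandler = !watcher.expired();
        snapshot[0]->MessageHandler(nullptr);
    }  // snapshot destroyed here: the last reference is released

    trace.destroyedAfterSend = watcher.expired();
    return trace;
}
```

The three references counted after the snapshot are the owner, the dispatcher's list, and the snapshot; the object is freed only when the last of them, the snapshot, goes away.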

Testing


    CDispatcher g_Dispatcher;
    CLock g_Lock;

    class CListener: public CSubscriber {
        virtual void MessageHandler(void* pContext) {
            // Emulate the blocking SendMessage system call
            g_Lock.Lock();
            wprintf(L"%d\n", *((int*)pContext));
            g_Lock.Unlock();
        }
    };

    DWORD WINAPI WorkingThread(PVOID pParam) {
        for(int i = 0;;++i) {
            g_Dispatcher.SendMessage(&i);
        }
    }

    int _tmain(int argc, _TCHAR* argv[]) {
        ::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL);
        for(;;) {
            boost::shared_ptr<CListener> pListener1(new CListener);
            boost::shared_ptr<CListener> pListener2(new CListener);
            // Emulate the window procedure (runs in the window thread)
            g_Lock.Lock();
            g_Dispatcher.Subscribe(pListener1);
            g_Dispatcher.Subscribe(pListener2);
            g_Lock.Unlock();
            Sleep(0);
            g_Lock.Lock();
            g_Dispatcher.Unsubscribe(pListener1->GetSubscriberId());
            g_Dispatcher.Unsubscribe(pListener2->GetSubscriberId());
            g_Lock.Unlock();
        }
        return 0;
    }

Optimized implementation for multithreaded environment


As a rule, SendMessage is called much more often than Subscribe/Unsubscribe. With a large number of subscribers, copying the subscriber list inside SendMessage can become a bottleneck.
The copying can instead be moved into Subscribe/Unsubscribe; the technique is similar to the one used in lock-free algorithms.
The CDispatcher object will no longer hold the list of subscribers directly, but through a smart pointer. SendMessage obtains a pointer to the current list of subscribers and works with it. Subscribe/Unsubscribe create a new list each time and redirect the pointer inside the CDispatcher object to it. So while the pointer in CDispatcher already points to the new list, a SendMessage call that started earlier still works with the old one. Since nobody modifies a list once it has been published, this works reliably in a multithreaded environment.
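The copy-on-write idea can be isolated in a small sketch, reduced to a list of ints so the snapshot behaviour is easy to observe. Assumptions: std::shared_ptr and std::mutex replace boost::shared_ptr and CLock, and CCowList, Snapshot and Add are hypothetical names; Snapshot plays the part of the read at the start of SendMessage, Add the part of Subscribe.

```cpp
#include <memory>
#include <mutex>
#include <vector>

class CCowList {
public:
    typedef std::vector<int> List;
    typedef std::shared_ptr<const List> ListPtr;

    CCowList() : m_pList(std::make_shared<List>()) {}

    // Reader side (SendMessage): take the current list under the lock.
    // A published list is never modified afterwards.
    ListPtr Snapshot() {
        std::lock_guard<std::mutex> guard(m_Lock);
        return m_pList;
    }

    // Writer side (Subscribe/Unsubscribe): build a new list and publish it.
    void Add(int value) {
        ListPtr pOld;  // declared before the guard: released after unlocking
        std::lock_guard<std::mutex> guard(m_Lock);
        pOld = m_pList;
        std::shared_ptr<List> pNew = std::make_shared<List>(*pOld);
        pNew->push_back(value);
        m_pList = pNew;  // readers holding pOld keep seeing the old list
    }

private:
    ListPtr m_pList;
    std::mutex m_Lock;
};
```

Readers pay only for copying one shared_ptr under the lock; the cost of copying the whole list moves to the much rarer writers.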
In principle, Subscribe/Unsubscribe can be modified slightly to make the algorithm completely lock-free, but that is another topic.
The Unsubscribe method is asynchronous: when it returns, delivery to that subscriber may not have stopped yet. As a partial solution, the subscriber is notified that its subscription has ended through the UnsubscribeHandler function. To implement this behavior, an intermediate class CSubscriberItem was added; its destructor calls UnsubscribeHandler.
    #include <vector>
    #include <boost/shared_ptr.hpp>

    // CLock and CScopeLocker are the lock wrapper and RAII guard described earlier.

    namespace Observer {

    //////////////////////////
    // Subscriber
    //////////////////////////

    typedef unsigned __int64 SubscriberId;

    class CSubscriber {
    public:
        virtual ~CSubscriber(){}
        virtual void MessageHandler(void* pContext) = 0;
        virtual void UnsubscribeHandler() = 0;
        SubscriberId GetSubscriberId() {return (SubscriberId)this;}
    };

    typedef boost::shared_ptr<CSubscriber> CSubscriberPtr;

    //////////////////////////
    // Dispatcher
    //////////////////////////

    class CDispatcher {
    private:
        // Wrapper whose destructor tells the subscriber that delivery has ended
        class CSubscriberItem {
        public:
            CSubscriberItem(CSubscriberPtr pSubscriber)
                :m_pSubscriber(pSubscriber) {
            }
            ~CSubscriberItem() {
                m_pSubscriber->UnsubscribeHandler();
            }
            CSubscriberPtr Subscriber()const {return m_pSubscriber;}
        private:
            CSubscriberPtr m_pSubscriber;
        };
        typedef boost::shared_ptr<CSubscriberItem> CSubscriberItemPtr;
        typedef std::vector<CSubscriberItemPtr> CSubscriberList;
        typedef boost::shared_ptr<CSubscriberList> CSubscriberListPtr;

    public:
        CDispatcher() {
        }

    private:
        // Copying is disabled
        CDispatcher(const CDispatcher&){}
        CDispatcher& operator=(const CDispatcher&){return *this;}

    public:
        SubscriberId Subscribe(CSubscriberPtr pNewSubscriber) {
            //Declaring the next shared pointer before ScopeLocker
            //prevents subscribers from being released while the lock is held
            CSubscriberListPtr pNewSubscriberList(new CSubscriberList());
            //Enter the locked section
            CScopeLocker ScopeLocker(m_Lock);
            if(m_pSubscriberList) {
                //Copy existing subscribers
                pNewSubscriberList->assign(m_pSubscriberList->begin(), m_pSubscriberList->end());
            }
            for(size_t i = 0; i < pNewSubscriberList->size(); ++i) {
                CSubscriberItemPtr pSubscriberItem = (*pNewSubscriberList)[i];
                if(pSubscriberItem->Subscriber()->GetSubscriberId() == pNewSubscriber->GetSubscriberId()) {
                    return 0;
                }
            }
            //Add the new subscriber to the new subscriber list
            pNewSubscriberList->push_back(CSubscriberItemPtr(new CSubscriberItem(pNewSubscriber)));
            //Swap in the new subscriber list
            m_pSubscriberList = pNewSubscriberList;
            return pNewSubscriber->GetSubscriberId();
        }
        bool Unsubscribe(SubscriberId id) {
            //Declaring the next shared pointers before ScopeLocker
            //prevents subscribers from being released while the lock is held
            CSubscriberItemPtr pSubscriberItemToRelease;
            CSubscriberListPtr pNewSubscriberList;
            //Enter the locked section
            CScopeLocker ScopeLocker(m_Lock);
            if(!m_pSubscriberList) {
                //No subscribers
                return false;
            }
            pNewSubscriberList = CSubscriberListPtr(new CSubscriberList());
            for(size_t i = 0; i < m_pSubscriberList->size(); ++i) {
                CSubscriberItemPtr pSubscriberItem = (*m_pSubscriberList)[i];
                if(pSubscriberItem->Subscriber()->GetSubscriberId() == id) {
                    pSubscriberItemToRelease = pSubscriberItem;
                } else {
                    pNewSubscriberList->push_back(pSubscriberItem);
                }
            }
            //Swap in the new subscriber list
            m_pSubscriberList = pNewSubscriberList;
            if(!pSubscriberItemToRelease.get()) {
                return false;
            }
            return true;
        }
        void SendMessage(void* pContext) {
            CSubscriberListPtr pSubscriberList;
            {
                CScopeLocker ScopeLocker(m_Lock);
                if(!m_pSubscriberList) {
                    //No subscribers
                    return;
                }
                //Get a shared pointer to the current list of subscribers
                pSubscriberList = m_pSubscriberList;
            }
            //pSubscriberList keeps this version of the list alive; nobody modifies it
            for(size_t i = 0; i < pSubscriberList->size(); ++i) {
                (*pSubscriberList)[i]->Subscriber()->MessageHandler(pContext);
            }
        }
    private:
        CSubscriberListPtr m_pSubscriberList;
        CLock m_Lock;
    };

    } //namespace Observer
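When exactly UnsubscribeHandler fires can be replayed on a single thread. This sketch reuses the CSubscriberItem idea with std::shared_ptr in place of boost::shared_ptr (assumption); CRecordingListener, CTimeline and ReplayUnsubscribeDuringSend are hypothetical. A SendMessage call has already taken the list pointer when Unsubscribe publishes a list without the item.

```cpp
#include <memory>
#include <vector>

class CSubscriber {
public:
    virtual ~CSubscriber() {}
    virtual void MessageHandler(void* pContext) = 0;
    virtual void UnsubscribeHandler() = 0;
};
typedef std::shared_ptr<CSubscriber> CSubscriberPtr;

// Same idea as CDispatcher::CSubscriberItem: when the last list holding the
// item is released, the subscriber is told that delivery has stopped.
class CSubscriberItem {
public:
    explicit CSubscriberItem(CSubscriberPtr pSubscriber) : m_pSubscriber(pSubscriber) {}
    ~CSubscriberItem() { m_pSubscriber->UnsubscribeHandler(); }
    CSubscriberPtr Subscriber() const { return m_pSubscriber; }
private:
    CSubscriberPtr m_pSubscriber;
};
typedef std::shared_ptr<CSubscriberItem> CSubscriberItemPtr;
typedef std::vector<CSubscriberItemPtr> CSubscriberList;
typedef std::shared_ptr<CSubscriberList> CSubscriberListPtr;

// Hypothetical listener that records what it was told.
class CRecordingListener : public CSubscriber {
public:
    void MessageHandler(void*) override { ++m_Messages; }
    void UnsubscribeHandler() override { m_Unsubscribed = true; }
    int m_Messages = 0;
    bool m_Unsubscribed = false;
};

struct CTimeline {
    bool notifiedBeforeSendFinished = true;
    bool notifiedAfterSendFinished = false;
    int messages = 0;
};

CTimeline ReplayUnsubscribeDuringSend() {
    CTimeline t;
    std::shared_ptr<CRecordingListener> pListener = std::make_shared<CRecordingListener>();

    CSubscriberListPtr pCurrent = std::make_shared<CSubscriberList>();
    pCurrent->push_back(std::make_shared<CSubscriberItem>(pListener));

    CSubscriberListPtr pInFlight = pCurrent;         // SendMessage's reference
    pCurrent = std::make_shared<CSubscriberList>();  // Unsubscribe swaps lists
    t.notifiedBeforeSendFinished = pListener->m_Unsubscribed;

    (*pInFlight)[0]->Subscriber()->MessageHandler(nullptr);  // still delivered
    pInFlight.reset();                                       // SendMessage returns
    t.notifiedAfterSendFinished = pListener->m_Unsubscribed;
    t.messages = pListener->m_Messages;
    return t;
}
```

UnsubscribeHandler therefore runs on whichever thread drops the last reference to the item: at the end of Unsubscribe (after the lock is released, thanks to pSubscriberItemToRelease) if no delivery is in progress, otherwise in the thread running SendMessage when it returns. Once it has been called, no further MessageHandler calls will arrive for that subscription.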

Links


Boost.Signals2 library, article
Smart pointers, Jeff Alger
Resource Acquisition Is Initialization (RAII), Wikipedia
Comments on the first version of this article can be found here.

Source: https://habr.com/ru/post/108857/

