// Opaque handle that identifies a subscriber. 0 is never a valid id; it is the
// value Subscribe() returns on failure.
// `unsigned long long` is the same type as MSVC's `unsigned __int64`, but it is
// also accepted by other compilers.
typedef unsigned long long SubscriberId;

// Interface implemented by every object that wants to receive messages.
class CSubscriber
{
public:
    virtual ~CSubscriber(){}

    // Called by CDispatcher::SendMessage with the context pointer passed to it.
    virtual void MessageHandler(void* pContext) = 0;

    // Returns the id of this subscriber: the object's address converted to an
    // integer. Two distinct live subscribers therefore never share an id.
    SubscriberId GetSubscriberId() const {return reinterpret_cast<SubscriberId>(this);}
};

// Keeps a list of non-owned subscribers and forwards messages to all of them.
// This variant performs no synchronisation.
class CDispatcher
{
private:
    typedef std::vector<CSubscriber*> CSubscriberList;

public:
    // Registers pNewSubscriber.
    // Returns the subscriber's id on success, or 0 when the pointer is null
    // or the subscriber is already registered.
    SubscriberId Subscribe(CSubscriber* pNewSubscriber)
    {
        if(!pNewSubscriber)
        {
            // A null entry would crash the next SendMessage call.
            return 0;
        }
        const SubscriberId NewId = pNewSubscriber->GetSubscriberId();
        if(FindIndex(NewId) != m_SubscriberList.size())
        {
            return 0;
        }
        m_SubscriberList.push_back(pNewSubscriber);
        return NewId;
    }

    // Removes the subscriber with the given id.
    // Returns true when it was registered, false otherwise.
    bool Unsubscribe(SubscriberId id)
    {
        const CSubscriberList::size_type Index = FindIndex(id);
        if(Index == m_SubscriberList.size())
        {
            return false;
        }
        m_SubscriberList.erase(m_SubscriberList.begin() + Index);
        return true;
    }

    // Calls MessageHandler(pContext) on every registered subscriber, in
    // registration order.
    void SendMessage(void* pContext)
    {
        // Index-based loop with the size re-read on every pass: the position
        // stays usable even if a handler changes the list while it runs.
        for(CSubscriberList::size_type i = 0; i < m_SubscriberList.size(); ++i)
        {
            m_SubscriberList[i]->MessageHandler(pContext);
        }
    }

private:
    // Returns the position of the subscriber with the given id, or
    // m_SubscriberList.size() when there is none.
    CSubscriberList::size_type FindIndex(SubscriberId id) const
    {
        CSubscriberList::size_type i = 0;
        for(; i < m_SubscriberList.size(); ++i)
        {
            if(m_SubscriberList[i]->GetSubscriberId() == id)
            {
                break;
            }
        }
        return i;
    }

    CSubscriberList m_SubscriberList;
};
// Subscriber that prints the integer it is handed.
class CListener: public CSubscriber
{
    // Treats pContext as a pointer to int and prints that value on its own line.
    virtual void MessageHandler(void* pContext)
    {
        const int* pValue = static_cast<int*>(pContext);
        wprintf(L"%d\n", *pValue);
    }
};

// Single-threaded demo: two listeners receive the values 0..4, then both are
// unsubscribed in reverse order of registration.
int _tmain(int argc, _TCHAR* argv[])
{
    CDispatcher Dispatcher;
    CListener First;
    CListener Second;

    Dispatcher.Subscribe(&First);
    Dispatcher.Subscribe(&Second);

    int Counter = 0;
    while(Counter < 5)
    {
        Dispatcher.SendMessage(&Counter);
        ++Counter;
    }

    Dispatcher.Unsubscribe(Second.GetSubscriberId());
    Dispatcher.Unsubscribe(First.GetSubscriberId());
    return 0;
}
CDispatcher g_Dispatcher; DWORD WINAPI WorkingThread(PVOID pParam) { for(int i = 0;;++i) { g_Dispatcher.SendMessage(&i); } }; int _tmain(int argc, _TCHAR* argv[]) { ::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL); CListener Listener1; CListener Listener2; for(;;) { g_Dispatcher.Subscribe(&Listener1); g_Dispatcher.Subscribe(&Listener2); g_Dispatcher.Unsubscribe(Listener1.GetSubscriberId()); g_Dispatcher.Unsubscribe(Listener2.GetSubscriberId()); } return 0; }
class CDispatcher { private: typedef std::vector<CSubscriber*> CSubscriberList; public: SubscriberId Subscribe(CSubscriber* pNewSubscriber) { CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId()) { return 0; } } m_SubscriberList.push_back(pNewSubscriber); return pNewSubscriber->GetSubscriberId(); } bool Unsubscribe(SubscriberId id) { CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == id) { m_SubscriberList.erase(m_SubscriberList.begin() + i); return true; } } return false; } void SendMessage(void* pContext) { CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { m_SubscriberList[i]->MessageHandler(pContext); } } private: CSubscriberList m_SubscriberList; CLock m_Lock; };
CDispatcher g_Dispatcher; CLock g_Lock; class CListener: public CSubscriber { virtual void MessageHandler(void* pContext) { // SendMessage g_Lock.Lock(); wprintf(L"%d\n", *((int*)pContext)); g_Lock.Unlock(); } }; DWORD WINAPI WorkingThread(PVOID pParam) { for(int i = 0;;++i) { g_Dispatcher.SendMessage(&i); } }; int _tmain(int argc, _TCHAR* argv[]) { ::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL); CListener Listener1; CListener Listener2; for(;;) { // ( ) g_Lock.Lock(); g_Dispatcher.Subscribe(&Listener1); g_Dispatcher.Subscribe(&Listener2); g_Lock.Unlock(); Sleep(0); g_Lock.Lock(); g_Dispatcher.Unsubscribe(Listener1.GetSubscriberId()); g_Dispatcher.Unsubscribe(Listener2.GetSubscriberId()); g_Lock.Unlock(); } return 0; }
Lock acquisition order. Window thread: A -> B. Worker thread: B -> A.
void SendMessage(void* pContext) { CSubscriberList SubscriberList; { CScopeLocker ScopeLocker(m_Lock); SubscriberList = m_SubscriberList; } for(size_t i = 0; i < SubscriberList.size(); ++i) { SubscriberList[i]->MessageHandler(pContext); } }
typedef unsigned __int64 SubscriberId; class CSubscriber { public: virtual ~CSubscriber(){} virtual void MessageHandler(void* pContext) = 0; SubscriberId GetSubscriberId() {return (SubscriberId)this;} }; typedef boost::shared_ptr<CSubscriber> CSubscriberPtr; class CDispatcher { private: typedef std::vector<CSubscriberPtr> CSubscriberList; public: SubscriberId Subscribe(CSubscriberPtr pNewSubscriber) { CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId()) { return 0; } } m_SubscriberList.push_back(pNewSubscriber); return pNewSubscriber->GetSubscriberId(); } bool Unsubscribe(SubscriberId id) { CSubscriberPtr toRelease; CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == id) { toRelease = m_SubscriberList[i]; m_SubscriberList.erase(m_SubscriberList.begin() + i); return true; } } return false; } void SendMessage(void* pContext) { CSubscriberList SubscriberList; { CScopeLocker ScopeLocker(m_Lock); SubscriberList = m_SubscriberList; } for(size_t i = 0; i < SubscriberList.size(); ++i) { SubscriberList[i]->MessageHandler(pContext); } } private: CSubscriberList m_SubscriberList; CLock m_Lock; };
CDispatcher g_Dispatcher; CLock g_Lock; class CListener: public CSubscriber { virtual void MessageHandler(void* pContext) { // SendMessage g_Lock.Lock(); wprintf(L"%d\n", *((int*)pContext)); g_Lock.Unlock(); } }; DWORD WINAPI WorkingThread(PVOID pParam) { for(int i = 0;;++i) { g_Dispatcher.SendMessage(&i); } }; int _tmain(int argc, _TCHAR* argv[]) { ::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL); for(;;) { boost::shared_ptr<CListener> pListener1(new CListener); boost::shared_ptr<CListener> pListener2(new CListener); // ( ) g_Lock.Lock(); g_Dispatcher.Subscribe(pListener1); g_Dispatcher.Subscribe(pListener2); g_Lock.Unlock(); Sleep(0); g_Lock.Lock(); g_Dispatcher.Unsubscribe(pListener1->GetSubscriberId()); g_Dispatcher.Unsubscribe(pListener2->GetSubscriberId()); g_Lock.Unlock(); } return 0; }
namespace Observer { ////////////////////////// // Subscriber ////////////////////////// typedef unsigned __int64 SubscriberId; class CSubscriber { public: virtual ~CSubscriber(){} virtual void MessageHandler(void* pContext) = 0; virtual void UnsubscribeHandler() = 0; SubscriberId GetSubscriberId() {return (SubscriberId)this;} }; typedef boost::shared_ptr<CSubscriber> CSubscriberPtr; ////////////////////////////////////////////////////////////////////// // Dispatcher /////////////////////////////////// class CDispatcher { private: class CSubscriberItem { public: CSubscriberItem(CSubscriberPtr pSubscriber) :m_pSubscriber(pSubscriber) { } ~CSubscriberItem() { m_pSubscriber->UnsubscribeHandler(); }; CSubscriberPtr Subscriber()const {return m_pSubscriber;} private: CSubscriberPtr m_pSubscriber; }; typedef boost::shared_ptr<CSubscriberItem> CSubscriberItemPtr; typedef std::vector<CSubscriberItemPtr> CSubscriberList; typedef boost::shared_ptr<CSubscriberList> CSubscriberListPtr; public: CDispatcher() { } private: CDispatcher(const CDispatcher&){} CDispatcher& operator=(const CDispatcher&){return *this;} public: SubscriberId Subscribe(CSubscriberPtr pNewSubscriber) { //Declaration of the next shared pointer before ScopeLocker //prevents release of subscribers from under lock CSubscriberListPtr pNewSubscriberList(new CSubscriberList()); //Enter to locked section CScopeLocker ScopeLocker(m_Lock); if(m_pSubscriberList) { //Copy existing subscribers pNewSubscriberList->assign(m_pSubscriberList->begin(), m_pSubscriberList->end()); } for(size_t i = 0; i < pNewSubscriberList->size(); ++i) { CSubscriberItemPtr pSubscriberItem = (*pNewSubscriberList)[i]; if(pSubscriberItem->Subscriber()->GetSubscriberId() == pNewSubscriber->GetSubscriberId()) { return 0; } } //Add new subscriber to new subscriber list pNewSubscriberList->push_back(CSubscriberItemPtr(new CSubscriberItem(pNewSubscriber))); //Exchange subscriber lists m_pSubscriberList = pNewSubscriberList; return 
pNewSubscriber->GetSubscriberId(); } bool Unsubscribe(SubscriberId id) { //Declaration of the next shared pointers before ScopeLocker //prevents release of subscribers from under lock CSubscriberItemPtr pSubscriberItemToRelease; CSubscriberListPtr pNewSubscriberList; //Enter to locked section CScopeLocker ScopeLocker(m_Lock); if(!m_pSubscriberList) { //No subscribers return false; } pNewSubscriberList = CSubscriberListPtr(new CSubscriberList()); for(size_t i = 0; i < m_pSubscriberList->size(); ++i) { CSubscriberItemPtr pSubscriberItem = (*m_pSubscriberList)[i]; if(pSubscriberItem->Subscriber()->GetSubscriberId() == id) { pSubscriberItemToRelease = pSubscriberItem; } else { pNewSubscriberList->push_back(pSubscriberItem); } } //Exchange subscriber lists m_pSubscriberList = pNewSubscriberList; if(!pSubscriberItemToRelease.get()) { return false; } return true; } void SendMessage(void* pContext) { CSubscriberListPtr pSubscriberList; { CScopeLocker ScopeLocker(m_Lock); if(!m_pSubscriberList) { //No subscribers return; } //Get shared pointer to an existing list of subscribers pSubscriberList = m_pSubscriberList; } //pSubscriberList pointer to copy of subscribers' list for(size_t i = 0; i < pSubscriberList->size(); ++i) { (*pSubscriberList)[i]->Subscriber()->MessageHandler(pContext); } } private: CSubscriberListPtr m_pSubscriberList; CLock m_Lock; }; }; //namespace Observer
Source: https://habr.com/ru/post/108857/
All Articles