





UI, Mem Cache, Disk Cache and Network are the objects that perform the corresponding operations for our newly created Handler. The Handler performs a simple sequence:

1. It first queries Mem Cache and Disk Cache. On success, that is, when at least one cache responds that the result was found, it returns the result immediately. On failure (as in the diagram), execution continues.
2. The Handler goes to the Network to retrieve the object remotely. To do this it connects to the service (connect), sends a request (send) and receives a response (receive). These operations are performed asynchronously and do not block other network interactions.
3. The value received from the Network component is written to both caches in parallel.

To implement this we need:

- a UI thread that initiates the Handler operation and to which the result should return;
- a thread pool for Mem Cache and Disk Cache;
- a thread pool for the Network. It is created separately from the main thread pool so that load on the main pool does not affect the network threads.

```cpp
// disk cache stub: logs the operation and always misses
struct DiskCache
{
    boost::optional<std::string> get(const std::string& key)
    {
        JLOG("get: " << key);
        return boost::optional<std::string>();
    }

    void set(const std::string& key, const std::string& val)
    {
        JLOG("set: " << key << ";" << val);
    }
};

// in-memory cache: a plain hash map
struct MemCache
{
    boost::optional<std::string> get(const std::string& key)
    {
        auto it = map.find(key);
        return it == map.end()
            ? boost::optional<std::string>()
            : boost::optional<std::string>(it->second);
    }

    void set(const std::string& key, const std::string& val)
    {
        map[key] = val;
    }

private:
    std::unordered_map<std::string, std::string> map;
};

struct Network
{
    // ...

    // retrieve the value over the network
    std::string get(const std::string& key)
    {
        net::Socket socket;
        JLOG("connecting");
        socket.connect(address, port);
        // send the key size
        Buffer sz(1, char(key.size()));
        socket.write(sz);
        // send the key itself
        socket.write(key);
        // read the value size
        socket.read(sz);
        Buffer val(size_t(sz[0]), 0);
        // read the value
        socket.read(val);
        JLOG("val received");
        return val;
    }

private:
    std::string address;
    int port;
    // ...
};

// UI scheduler: schedules handlers onto the UI thread
struct UI : IScheduler
{
    void schedule(Handler handler)
    {
        // post the handler into the UI event loop
        // ...
    }

    void handleResult(const std::string& key, const std::string& val)
    {
        TLOG("UI result inside UI thread: " << key << ";" << val);
        // TODO: add some actions
    }
};
```

Every UI framework provides a way to schedule an action onto its UI thread (Android: Activity.runOnUiThread, Ultimate++: PostCallback, Qt: via the signal-slot mechanism). These methods should be used in the implementation of the UI::schedule method. The wiring then looks like this:

```cpp
// thread pool for handlers and caches
ThreadPool cpu(3, "cpu");
// thread pool for network operations
ThreadPool net(2, "net");
// serialized schedulers on top of the cpu pool
Alone diskStorage(cpu, "disk storage");
Alone memStorage(cpu, "mem storage");

// default scheduler for new coroutines
scheduler<DefaultTag>().attach(cpu);
// services used by the network and timeout machinery
service<NetworkTag>().attach(net);
service<TimeoutTag>().attach(cpu);

// portals to the components
portal<DiskCache>().attach(diskStorage);
portal<MemCache>().attach(memStorage);
portal<Network>().attach(net);

UI& ui = single<UI>();
// the UI portal teleports calls into the UI thread
portal<UI>().attach(ui);

go([key] {
    // total timeout for the operation: 1 s = 1000 ms
    Timeout t(1000);
    std::string val;
    // query both caches; take the first non-empty answer
    boost::optional<std::string> result = goAnyResult<std::string>({
        [&key] {
            return portal<DiskCache>()->get(key);
        },
        [&key] {
            return portal<MemCache>()->get(key);
        }
    });
    if (result)
    {
        // cache hit
        val = std::move(*result);
        JLOG("cache val: " << val);
    }
    else
    {
        // cache miss: go to the network
        {
            // network timeout: 0.5 s = 500 ms
            Timeout tNet(500);
            val = portal<Network>()->get(key);
        }
        JLOG("net val: " << val);
        // update both caches; disable events (cancellation and
        // timeouts) while doing so
        EventsGuard guard;
        goWait({
            [&key, &val] {
                portal<DiskCache>()->set(key, val);
            },
            [&key, &val] {
                portal<MemCache>()->set(key, val);
            }
        });
        JLOG("cache updated");
    }
    // hand the result over to the UI
    portal<UI>()->handleResult(key, val);
});
```

Let's start with the simplest primitive, which waits for the completion of a single nested operation:

```cpp
void goWait(Handler handler)
{
    deferProceed([&handler](Handler proceed) {
        go([proceed, &handler] {
            handler();    // run the nested operation
            proceed();    // resume the waiting coroutine
        });
    });
}
```

It is built on the deferProceed function, which is implemented as follows:

```cpp
typedef std::function<void(Handler)> ProceedHandler;

void deferProceed(ProceedHandler proceed)
{
    auto& coro = currentCoro();
    defer([&coro, proceed] {
        proceed([&coro] {
            coro.resume();
        });
    });
}
```

This function wraps defer (what defer is and why it should be used is described in my previous article): it accepts not a Handler but a ProceedHandler, which receives as its input parameter a Handler for continuing the execution of the coroutine. That proceed handler captures a reference to the current coroutine and calls coro.resume(). Thus all the work with coroutines is encapsulated, and the user only needs to deal with the proceed handler.

Back to goWait. After the call to deferProceed we have a proceed that must be invoked when the handler finishes. All we have to do is create a new coroutine, run our handler in it, and immediately call proceed when it completes; proceed internally calls coro.resume(), thus continuing the execution of the original coroutine. In effect, goWait pauses the operations of the current coroutine, and when the passed handler finishes, execution continues as if nothing had happened.

The same idea extends to waiting for a whole group of handlers:

```cpp
void goWait(std::initializer_list<Handler> handlers);
```

Here a counter must keep track of proceed. Each handler updates the counter on completion, so the last handler to finish continues the execution of the original coroutine. There is one small difficulty, however: the counter must be shared between the running coroutines, and the last handler must not only call proceed but also free the counter.
All this can be implemented as follows:

```cpp
void goWait(std::initializer_list<Handler> handlers)
{
    deferProceed([&handlers](Handler proceed) {
        std::shared_ptr<void> proceeder(nullptr, [proceed](void*) {
            proceed();
        });
        for (const auto& handler: handlers)
        {
            go([proceeder, &handler] {
                handler();
            });
        }
    });
}
```

The structure mirrors deferProceed, but a little magic is hidden inside. Not everyone knows that when constructing a shared_ptr you can pass not only a pointer to the data but also a deleter, which will be invoked instead of `delete ptr` when the object is released. That is exactly where we put the call to proceed, so that the original coroutine is eventually continued. There is nothing to actually delete, since we stored "nothing" there: nullptr. The rest is simple: in the loop we go through all the handlers and run each one in a freshly created coroutine. The key nuance is that we capture the proceeder by value, which copies it and increments the atomic reference counter inside the shared_ptr. When a handler finishes, its lambda with the captured proceeder is destroyed, decrementing the counter. Whichever coroutine is last to drop the counter to zero triggers the deleter of the shared shared_ptr, which finally calls proceed and resumes the original coroutine.

```cpp
int fibo(int v)
{
    if (v < 2)
        return v;
    int v1, v2;
    goWait({
        [v, &v1] { v1 = fibo(v - 1); },
        [v, &v2] { v2 = fibo(v - 2); }
    });
    return v1 + v2;
}
```

Each recursive call of the fibo function occurs in its own coroutine.

Sometimes it is more convenient to start handlers first and wait for them later. For this there is the Waiter primitive with the following interface:

```cpp
struct Waiter
{
    Waiter& go(Handler);
    void wait();
};
```

Handlers are started through the Waiter object. Again we have a proceeder that continues the work of our coroutine, with one added subtlety: now the proceeder is shared between the running coroutines and the Waiter object itself. Accordingly, at the moment the wait method is called, we need to get rid of the copy stored in the Waiter. Here is how to do it:

```cpp
void Waiter::wait()
{
    if (proceeder.unique())
    {
        // only the Waiter holds the proceeder => nothing is running
        JLOG("everything done, nothing to do");
        return;
    }
    defer([this] {
        // drop the Waiter's copy of the proceeder; the last running
        // handler will then resume this coroutine
        auto toDestroy = std::move(proceeder);
    });
    // the proceeder has fired; re-create it so the Waiter can be reused
    init0();
}
```

All the magic, once again, is done by shared_ptr. Amen!
Calculating Fibonacci numbers with a Waiter:

```cpp
int fibo(int v)
{
    if (v < 2)
        return v;
    int v1;
    Waiter w;
    w.go([v, &v1] { v1 = fibo(v - 1); });
    int v2 = fibo(v - 2);
    w.wait();
    return v1 + v2;
}
```

Or, chaining the calls:

```cpp
int fibo(int v)
{
    if (v < 2)
        return v;
    int v1, v2;
    Waiter()
        .go([v, &v1] { v1 = fibo(v - 1); })
        .go([v, &v2] { v2 = fibo(v - 2); })
        .wait();
    return v1 + v2;
}
```

The next primitive waits for the completion of any one handler out of a group:

```cpp
size_t goAnyWait(std::initializer_list<Handler> handlers);
```

It returns the index of the first handler to complete. The trick is the same as before, except that the shared state is not a `void* ptr == nullptr` but quite a specific atomic counter. At the very beginning it is initialized to 0. Each handler increments the counter at the end of its work, and the handler that changes the value from 0 to 1 — and only that handler — calls proceed():

```cpp
size_t goAnyWait(std::initializer_list<Handler> handlers)
{
    VERIFY(handlers.size() >= 1, "Handlers amount must be positive");
    size_t index = static_cast<size_t>(-1);
    deferProceed([&handlers, &index](Handler proceed) {
        std::shared_ptr<std::atomic<int>> counter =
            std::make_shared<std::atomic<int>>();
        size_t i = 0;
        for (const auto& handler: handlers)
        {
            go([counter, proceed, &handler, i, &index] {
                handler();
                if (++ *counter == 1)
                {
                    // we are first: remember the index and resume
                    index = i;
                    proceed();
                }
            });
            ++ i;
        }
    });
    VERIFY(index < handlers.size(), "Incorrect index returned");
    return index;
}
```

Often, though, we want not just any completed handler but the first non-empty result, delivered as boost::optional<T_result>. goAnyResult has this plain prototype:

```cpp
template<typename T_result>
boost::optional<T_result> goAnyResult(
    std::initializer_list<
        std::function<
            boost::optional<T_result>()
        >
    > handlers)
```

That is, the handlers must have the signature:

```cpp
boost::optional<T_result> handler();
```

The approach is almost the same: increment a counter and proceed on the transition from 0 to 1. The difference is that a handler may complete without producing a result; if the counter then reaches its final increment and still shows 1, an "empty" value must be returned, since nobody managed to latch the counter with an actual result.
Thus, instead of a simple atomic value, the counter becomes a whole Counter object:

```cpp
template<typename T_result>
boost::optional<T_result> goAnyResult(
    std::initializer_list<
        std::function<
            boost::optional<T_result>()
        >
    > handlers)
{
    typedef boost::optional<T_result> Result;
    typedef std::function<void(Result&&)> ResultHandler;

    struct Counter
    {
        Counter(ResultHandler proceed_) : proceed(std::move(proceed_)) {}

        ~Counter()
        {
            // no handler produced a result: resume with an empty value
            tryProceed(Result());
        }

        void tryProceed(Result&& result)
        {
            if (++ counter == 1)
                proceed(std::move(result));
        }

    private:
        std::atomic<int> counter{0};
        ResultHandler proceed;
    };

    Result result;
    deferProceed([&handlers, &result](Handler proceed) {
        std::shared_ptr<Counter> counter = std::make_shared<Counter>(
            [&result, proceed](Result&& res) {
                result = std::move(res);
                proceed();
            }
        );
        for (const auto& handler: handlers)
        {
            go([counter, &handler] {
                Result result = handler();
                if (result)    // pass the result on only if it is non-empty
                    counter->tryProceed(std::move(result));
            });
        }
    });
    return result;
}
```

Note that std::move only moves the result when the condition inside tryProceed is satisfied. That is because std::move does not perform any movement by itself, however much one might wish it did: it is merely a cast operation on references.

Now for the schedulers. Their interface is:

```cpp
struct IScheduler : IObject
{
    virtual void schedule(Handler handler) = 0;
};
```
```cpp
typedef boost::asio::io_service IoService;

struct IService : IObject
{
    virtual IoService& ioService() = 0;
};

struct ThreadPool : IScheduler, IService
{
    ThreadPool(size_t threadCount);

    void schedule(Handler handler)
    {
        service.post(std::move(handler));
    }

private:
    IoService& ioService();

    std::unique_ptr<boost::asio::io_service::work> work;
    boost::asio::io_service service;
    std::vector<std::thread> threads;
};
```

Scheduling is a thin wrapper over boost::asio::io_service::post. The work object keeps the io_service event loop alive: without it, the loop would finish as soon as there were no pending events and the threads would exit.

The pool also implements the IService interface, whose ioService method returns the underlying boost::asio::io_service. This looks strange, so let me explain the trick. The components introduced later need access to the boost::asio::io_service instance somehow. To prevent casual access to it, I introduced the IService interface, which allows the coveted instance to be obtained, while in ThreadPool the implementation of the method is private. This provides some protection against misuse: to pull the object out, you must first convert the ThreadPool to IService and only then call the method. An alternative would be friend classes,
but I did not want to burden ThreadPool with knowledge of its possible uses, so I decided the chosen approach was a reasonable price for encapsulation.

Now for the coroutine engine itself, the Journey class (why it is called that will become clear later):

```cpp
struct Journey
{
    void proceed();
    Handler proceedHandler();
    void defer(Handler handler);
    void deferProceed(ProceedHandler proceed);

    static void create(Handler handler, mt::IScheduler& s);

private:
    Journey(mt::IScheduler& s);

    struct CoroGuard
    {
        CoroGuard(Journey& j_) : j(j_)  { j.onEnter0(); }
        ~CoroGuard()                    { j.onExit0(); }

        coro::Coro* operator->()        { return &j.coro; }
    private:
        Journey& j;
    };

    void start0(Handler handler);
    void schedule0(Handler handler);
    CoroGuard guardedCoro0();
    void proceed0();
    void onEnter0();
    void onExit0();

    mt::IScheduler* sched;
    coro::Coro coro;
    Handler deferHandler;
};
```

Instances can only be created through the static create method. A Journey stores the scheduler it runs on (sched), the coroutine itself (coro) and the handler saved by defer (deferHandler). The CoroGuard helper automatically invokes onEnter0 on every entry into the coroutine and onExit0 on every exit.

```cpp
void Journey::schedule0(Handler handler)
{
    VERIFY(sched != nullptr, "Scheduler must be set in journey");
    sched->schedule(std::move(handler));
}

void Journey::proceed0()
{
    // resume the coroutine under the guard
    guardedCoro0()->resume();
}

Journey::CoroGuard Journey::guardedCoro0()
{
    return CoroGuard(*this);
}

// resume the coroutine on its scheduler
void Journey::proceed()
{
    schedule0([this] {
        proceed0();
    });
}

// a handler that, when invoked, resumes the coroutine
Handler Journey::proceedHandler()
{
    return [this] {
        proceed();
    };
}

// start the coroutine on the scheduler; we will return to this
// function later
void Journey::start0(Handler handler)
{
    schedule0([handler, this] {
        // start the coroutine under the guard
        guardedCoro0()->start([handler] {
            JLOG("started");
            // catch everything so an exception cannot escape the coroutine
            try
            {
                handler();
            }
            catch (std::exception& e)
            {
                (void) e;
                JLOG("exception in coro: " << e.what());
            }
            JLOG("ended");
        });
    });
}

void Journey::defer(Handler handler)
{
    // remember the handler and leave the coroutine
    deferHandler = handler;
    coro::yield();
}

// deferProceed, already familiar from above
void Journey::deferProceed(ProceedHandler proceed)
{
    defer([this, proceed] {
        proceed(proceedHandler());
    });
}
```

The most interesting part is what happens with deferHandler. The current Journey is tracked in TLS:

```cpp
TLS Journey* t_journey = nullptr;

void Journey::onEnter0()
{
    t_journey = this;
}
```
```cpp
void Journey::onExit0()
{
    if (deferHandler == nullptr)
    {
        // nothing was deferred => the coroutine has finished,
        // so the Journey destroys itself
        delete this;
    }
    else
    {
        // run the deferred handler and reset it
        deferHandler();
        deferHandler = nullptr;
    }
    // either way, we are no longer inside this coroutine
    t_journey = nullptr;
}
```

Creation, as promised, goes through create:

```cpp
void Journey::create(Handler handler, mt::IScheduler& s)
{
    (new Journey(s))->start0(std::move(handler));
}
```

So a Journey deletes itself when its coroutine completes, and defers all rescheduling through its scheduler. Now for the reason behind the name: a Journey can travel between thread pools. The operation is called teleport:

```cpp
void Journey::teleport(mt::IScheduler& s)
{
    if (&s == sched)
    {
        JLOG("the same destination, skipping teleport <-> " << s.name());
        return;
    }
    JLOG("teleport " << sched->name() << " -> " << s.name());
    sched = &s;
    defer(proceedHandler());
}
```

It simply switches the scheduler and, through defer, reschedules the continuation: execution leaves the coroutine, and the proceed handler resumes it on the new scheduler. From the coroutine's point of view nothing happened, except that the code after the call now runs in a different thread pool.
(Diagram: the coroutine begins on Scheduler/Thread and, after the teleport, continues on Scheduler2/Thread2.)
For example:

```cpp
auto result = someCalculations();
teleport(uiScheduler);
showResult(result);
teleport(calcScheduler);
auto newResult = continueSmartCalculations(result);
teleport(uiScheduler);
updateResult(newResult);
// ...
```
A portal teleports into the destination scheduler on construction and returns to the source scheduler on destruction, RAII-style:

```cpp
struct Portal
{
    Portal(mt::IScheduler& destination)
        : source(journey().scheduler())
    {
        JLOG("creating portal " << source.name()
            << " <=> " << destination.name());
        teleport(destination);
    }

    ~Portal()
    {
        teleport(source);
    }

private:
    mt::IScheduler& source;
};

ThreadPool tp1(1, "tp1");
ThreadPool tp2(1, "tp2");

go([&tp2] {
    Portal p(tp2);
    JLOG("throwing exception");
    throw std::runtime_error("exception occur");
}, tp1);
```
```cpp
struct Scheduler
{
    Scheduler();

    void attach(mt::IScheduler& s)
    {
        scheduler = &s;
    }

    void detach()
    {
        scheduler = nullptr;
    }

    operator mt::IScheduler&() const
    {
        VERIFY(scheduler != nullptr, "Scheduler is not attached");
        return *scheduler;
    }

private:
    mt::IScheduler* scheduler;
};

struct DefaultTag;

template<typename T_tag>
Scheduler& scheduler()
{
    return single<Scheduler, T_tag>();
}

template<typename T>
struct WithPortal : Scheduler
{
    struct Access : Portal
    {
        Access(Scheduler& s) : Portal(s) {}
        T* operator->() { return &single<T>(); }
    };

    Access operator->() { return *this; }
};

template<typename T>
WithPortal<T>& portal()
{
    return single<WithPortal<T>>();
}

ThreadPool tp1(1, "tp1");
ThreadPool tp2(1, "tp2");

struct X { void op() {} };
portal<X>().attach(tp2);

go([] {
    portal<X>()->op();
}, tp1);
```

Here the call to X::op executes in the tp2 pool: operator-> produces a temporary Access whose Portal base teleports the coroutine to tp2, the call goes to the singleton instance of X (return &single<T>()), and when the temporary is destroyed the Journey teleports back. Transparent proxying of calls into another thread pool, with no effort on the caller's side!
```cpp
struct Alone : mt::IScheduler
{
    Alone(mt::IService& service);

    void schedule(Handler handler)
    {
        strand.post(std::move(handler));
    }

private:
    boost::asio::io_service::strand strand;
};
```

Alone is constructed from an IService and wraps io_service::strand from boost.asio. A strand guarantees that handlers posted to it are never executed concurrently, which gives us exactly the mutual exclusion we need: scheduling through Alone serializes all operations on the object it guards, without any explicit locks.

Usage:

```cpp
struct MemCache
{
    boost::optional<std::string> get(const std::string& key);
    void set(const std::string& key, const std::string& val);
};

// a shared pool
ThreadPool common_pool(3);
// a serialized scheduler on top of it
Alone mem_alone(common_pool);
// attach the cache portal to the serialized scheduler
portal<MemCache>().attach(mem_alone);

// now these calls never race with each other
auto value = portal<MemCache>()->get(key);
portal<MemCache>()->set(anotherKey, anotherValue);
```

For cancellation and timeouts we introduce events:

```cpp
enum EventStatus
{
    ES_NORMAL,
    ES_CANCELLED,
    ES_TIMEDOUT,
};

struct EventException : std::runtime_error
{
    EventException(EventStatus s);
    EventStatus status();

private:
    EventStatus st;
};

struct Goer
{
    Goer();

    EventStatus reset();

    bool cancel();
    bool timedout();

private:
    struct State
    {
        State() : status(ES_NORMAL) {}
        EventStatus status;
    };

    bool setStatus0(EventStatus s);
    State& state0();

    std::shared_ptr<State> state;
};
```

Event handling inside Journey:

```cpp
void Journey::handleEvents()
{
    // skip handling if events are disabled
    // or an exception is already in flight
    if (!eventsAllowed || std::uncaught_exception())
        return;
    auto s = gr.reset();
    if (s == ES_NORMAL)
        return;    // nothing happened, keep going
    throw EventException(s);
}

void Journey::disableEvents()
{
    handleEvents();
    eventsAllowed = false;
}

void Journey::enableEvents()
{
    eventsAllowed = true;
    handleEvents();
}

struct EventsGuard
{
    EventsGuard();     // calls disableEvents()
    ~EventsGuard();    // calls enableEvents()
};
```

Where is handleEvents called? On every rescheduling:

```cpp
void Journey::defer(Handler handler)
{
    // check events before leaving the coroutine
    handleEvents();
    deferHandler = handler;
    coro::yield();
    // ... and immediately after resuming
    handleEvents();
}
```

So every asynchronous operation becomes a cancellation point, while purely synchronous code runs undisturbed. To cancel an operation we need a handle to it, so go now returns a Goer:

```cpp
Goer go(Handler handler, mt::IScheduler& scheduler)
{
    return Journey::create(std::move(handler), scheduler);
}
```

Journey::create returns the Goer accordingly:

```cpp
struct Journey
{
    // ...
    Goer goer() const
    {
        return gr;
    }
    // ...
private:
    // ...
    Goer gr;
};

Goer Journey::create(Handler handler, mt::IScheduler& s)
{
    return (new Journey(s))->start0(std::move(handler));
}

// we will return to this function later
Goer Journey::start0(Handler handler)
{
    // ...
    return goer();
}
```

Usage:

```cpp
Goer op = go(myMegaHandler);
// ...
if (weDontNeedMegaHandlerAnymore)
    op.cancel();
```

After op.cancel() sets the flag, the coroutine will throw at its next call to handleEvents().
Where does the current Journey come from inside go, defer, deferProceed and friends? It is the one stored in TLS, as shown earlier. Timeouts are built on top of the same event mechanism:

```cpp
struct Timeout
{
    Timeout(int ms);
    ~Timeout();

private:
    boost::asio::deadline_timer timer;
};
```

The implementation uses boost::asio::deadline_timer:

```cpp
Timeout::Timeout(int ms)
    : timer(service<TimeoutTag>(), boost::posix_time::milliseconds(ms))
{
    // take a handle to the current coroutine
    Goer goer = journey().goer();
    // start the asynchronous wait
    timer.async_wait([goer](const Error& error) mutable {
        // mutable, because goer.timedout() mutates the shared state
        if (!error)    // the timer fired and was not cancelled
            goer.timedout();
    });
}

Timeout::~Timeout()
{
    // cancel the timer if it has not fired yet
    timer.cancel_one();
    // check the events: the timeout may already have expired
    handleEvents();
}
```

RAII again: the timeout acts within the lifetime of the Timeout object. Synchronous code can opt into timeout checking by calling handleEvents explicitly:

```cpp
// the operations below must finish within 100 ms
Timeout t(100);
for (auto element: container)
{
    performOperation(element);
    handleEvents();
}
```
Timeouts nest naturally:

```cpp
// total timeout for everything below: 200 ms
Timeout outer(200);
portal<MyObject>()->performOp();
{
    // timeout for the inner operation: 100 ms
    Timeout inner(100);
    portal<MyAnotherObject>()->performAnotherOp();
    // disable events while performing the guarded action
    EventsGuard guard;
    performGuardedAction();
}
```

Now for a subtle bug. Here is start0 again:

```cpp
Goer Journey::start0(Handler handler)
{
    schedule0([handler, this] {
        guardedCoro0()->start([handler] {
            JLOG("started");
            try
            {
                handler();
            }
            catch (std::exception& e)
            {
                (void) e;
                JLOG("exception in coro: " << e.what());
            }
            JLOG("ended");
        });
    });
    return goer();
}
```

The scheduled coroutine may run to completion, and delete the Journey, before schedule0 even returns, making the final call to goer() a use-after-free. The fix is to copy the Goer before scheduling:

```diff
 Goer Journey::start0(Handler handler)
 {
+    Goer gr = goer();
     schedule0([handler, this] {
         guardedCoro0()->start([handler] {
             JLOG("started");
@@ -121,7 +122,7 @@
             JLOG("ended");
         });
     });
-    return goer();
+    return gr;
 }
```
A similar problem lurks in onExit0:

```cpp
void Journey::onExit0()
{
    if (deferHandler == nullptr)
    {
        delete this;
    }
    else
    {
        deferHandler();
        deferHandler = nullptr;
    }
    t_journey = nullptr;
}
```

The deferred handler may reschedule the coroutine, which can then resume on another thread and touch deferHandler (or even delete the Journey) before we get to the assignment; writing `deferHandler = nullptr` after the call therefore races with that. Moving the handler out before invoking it fixes the problem:

```diff
@@ -153,8 +154,8 @@
-        deferHandler();
-        deferHandler = nullptr;
+        Handler handler = std::move(deferHandler);
+        handler();
     }
```

Finally, a per-coroutine garbage collector:

```cpp
struct A   { ~A() { TLOG("~A"); } };
struct B:A { ~B() { TLOG("~B"); } };
struct C   { ~C() { TLOG("~C"); } };

ThreadPool tp(1, "tp");

go([] {
    A* a = gcnew<B>();
    C* c = gcnew<C>();
}, tp);
```

Output:

```
tp#1: ~C
tp#1: ~B
tp#1: ~A
```

```cpp
template<typename T, typename... V>
T* gcnew(V&&... v)
{
    return gc().add(new T(std::forward<V>(v)...));
}

GC& gc()
{
    return journey().gc;
}

struct GC
{
    ~GC()
    {
        // delete the objects in reverse order of creation
        for (auto& deleter: boost::adaptors::reverse(deleters))
            deleter();
    }

    template<typename T>
    T* add(T* t)
    {
        // remember how to delete this particular T
        deleters.emplace_back([t] { delete t; });
        return t;
    }

private:
    std::vector<Handler> deleters;
};
```

The GC lives inside the Journey, so the registered objects are destroyed automatically when the coroutine finishes. Note that each deleter is created for the concrete type T passed to add, so ~B fires for the object created as B even though it is held through an A* — no virtual destructor required.
Source: https://habr.com/ru/post/240525/