UI, Mem Cache, Disk Cache and Network are the objects that perform the corresponding operations on our newly created Handler.

Handler performs a simple sequence:
- It queries Mem Cache and Disk Cache. If successful, that is, if at least one cache responds with a found result, it returns the result immediately. In case of failure (as in the diagram), execution continues.
- Handler accesses the Network to retrieve the object over the network. To do this, it connects to the service (connect), sends a request (send) and receives a response (receive). These operations are performed asynchronously and do not block other network interactions.
- The result received from the Network component is written in parallel to both caches.

The work is spread over the following threads:
- the UI thread, in which the Handler operation starts and to which the result should return;
- a common thread pool for Mem Cache and Disk Cache;
- a network thread pool for Network. It is created separately from the main thread pool so that the load on the main pool does not affect the network thread pool.

// stub: a disk cache that only logs its operations
struct DiskCache
{
    boost::optional<std::string> get(const std::string& key)
    {
        JLOG("get: " << key);
        return boost::optional<std::string>();
    }
    void set(const std::string& key, const std::string& val)
    {
        JLOG("set: " << key << ";" << val);
    }
};

// memory cache: a plain hash map
struct MemCache
{
    boost::optional<std::string> get(const std::string& key)
    {
        auto it = map.find(key);
        return it == map.end()
            ? boost::optional<std::string>()
            : boost::optional<std::string>(it->second);
    }
    void set(const std::string& key, const std::string& val)
    {
        map[key] = val;
    }
private:
    std::unordered_map<std::string, std::string> map;
};

struct Network
{
    // ...

    // retrieve the value for the key over the network
    std::string get(const std::string& key)
    {
        net::Socket socket;
        JLOG("connecting");
        socket.connect(address, port);
        // first byte: the size of the key
        Buffer sz(1, char(key.size()));
        socket.write(sz);
        // then the key itself
        socket.write(key);
        // read the size of the value
        socket.read(sz);
        Buffer val(size_t(sz[0]), 0);
        // read the value
        socket.read(val);
        JLOG("val received");
        return val;
    }
private:
    std::string address;
    int port;
    // ...
};

// UI: the scheduler that runs handlers in the UI thread
struct UI : IScheduler
{
    void schedule(Handler handler)
    {
        // run the handler in the UI thread
        // ...
    }
    void handleResult(const std::string& key, const std::string& val)
    {
        TLOG("UI result inside UI thread: " << key << ";" << val);
        // TODO: add some actions
    }
};
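Most UI frameworks offer a way to run a handler in the UI thread (Android: Activity.runOnUiThread, Ultimate++: PostCallback, Qt: via the signal-slot mechanism). These methods should be used in the implementation of the UI::schedule method.

// thread pool for general-purpose work
ThreadPool cpu(3, "cpu");
// thread pool for network operations
ThreadPool net(2, "net");
// serialized access to the disk cache
Alone diskStorage(cpu, "disk storage");
// serialized access to the memory cache
Alone memStorage(cpu, "mem storage");
// default scheduler
scheduler<DefaultTag>().attach(cpu);
// service for network operations
service<NetworkTag>().attach(net);
// service for timeouts
service<TimeoutTag>().attach(cpu);
// bind the caches and the network to their schedulers
portal<DiskCache>().attach(diskStorage);
portal<MemCache>().attach(memStorage);
portal<Network>().attach(net);

UI& ui = single<UI>();
// bind the UI portal to the UI scheduler
portal<UI>().attach(ui);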
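go([key] {
    // timeout for the whole operation: 1 s = 1000 ms
    Timeout t(1000);
    std::string val;
    // query both caches in parallel and take the first found result
    boost::optional<std::string> result = goAnyResult<std::string>({
        [&key] { return portal<DiskCache>()->get(key); },
        [&key] { return portal<MemCache>()->get(key); }
    });
    if (result)
    {
        // found in one of the caches
        val = std::move(*result);
        JLOG("cache val: " << val);
    }
    else
    {
        // not found in the caches: go to the network
        {
            // timeout for the network operation: 0.5 s = 500 ms
            Timeout tNet(500);
            val = portal<Network>()->get(key);
        }
        JLOG("net val: " << val);
        // ignore cancellation and timeout events while updating the caches
        EventsGuard guard;
        // write the value to both caches in parallel
        goWait({
            [&key, &val] { portal<DiskCache>()->set(key, val); },
            [&key, &val] { portal<MemCache>()->set(key, val); }
        });
        JLOG("cache updated");
    }
    // hand the result over to the UI
    portal<UI>()->handleResult(key, val);
});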
void goWait(Handler);
void goWait(Handler handler)
{
    deferProceed([&handler](Handler proceed) {
        go([proceed, &handler] {
            // run the handler in a new coroutine
            handler();
            // then resume the original coroutine
            proceed();
        });
    });
}
This uses the deferProceed function, which is implemented as follows:

typedef std::function<void(Handler)> ProceedHandler;

void deferProceed(ProceedHandler proceed)
{
    auto& coro = currentCoro();
    defer([&coro, proceed] {
        proceed([&coro] { coro.resume(); });
    });
}
It works like defer (what defer is and why it should be used is described in my previous article), with one difference: it accepts not a Handler but a ProceedHandler, which receives a Handler as its input parameter for continuing the coroutine. The proceed handler itself stores a reference to the current coroutine and calls coro.resume(). Thus all the work with coroutines is encapsulated, and the user only needs to deal with the proceed handler.

Let us return to the goWait function. When we call deferProceed, we get proceed, which has to be called once the operation in handler has finished. All we have to do is create a new coroutine, run our handler in it, and immediately after it completes call proceed, which internally calls coro.resume() and thus continues the original coroutine.

With goWait we effectively pause the current coroutine, and when the passed handler finishes, we continue execution as if nothing had happened.

void goWait(std::initializer_list<Handler> handlers);
The idea is the same, except that proceed must be called only once, after all the handlers have finished. Each handler updates a counter on completion, so the last handler to finish continues the original coroutine. There is one small difficulty, however: the counter must be shared between the running coroutines, and the last handler must not only call proceed but also free the counter. All this can be implemented as follows:

void goWait(std::initializer_list<Handler> handlers)
{
    deferProceed([&handlers](Handler proceed) {
        std::shared_ptr<void> proceeder(nullptr, [proceed](void*) { proceed(); });
        for (const auto& handler: handlers)
        {
            go([proceeder, &handler] { handler(); });
        }
    });
}
At the very beginning we again call deferProceed, but a little magic is hidden inside it. Few people know that when constructing a shared_ptr you can pass not only a pointer to the data but also a deleter, which is invoked instead of delete ptr when the object has to be destroyed. That is where we place the call to proceed, which eventually continues the original coroutine. There is no need to delete the object itself, since we put nothing there: nullptr. The rest is simple: we loop over all the handlers and run each of them in a newly created coroutine. There is one nuance here too: proceeder is captured by value, so it is copied, which increments the atomic reference counter inside shared_ptr. After a handler finishes, the lambda holding the captured proceeder is destroyed, which decrements the counter. Whoever drops the counter to zero destroys the last copy of proceeder and thereby triggers the deleter of the shared_ptr, that is, it ends up calling proceed(), which resumes the original coroutine through coro.resume().
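The deleter trick described above can be tried outside of any coroutine library. The following is only a minimal sketch under stated assumptions: runAllThenProceed is a hypothetical name, and a plain vector of tasks stands in for the scheduler that would normally run the coroutines. What it keeps from the article is the mechanism itself: a shared_ptr<void> that owns nothing and whose deleter fires once the last copy is gone.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

using Handler = std::function<void()>;

// Hypothetical helper: runs every handler and calls onAllDone exactly once,
// after the last copy of the shared "proceeder" has been destroyed.
void runAllThenProceed(const std::vector<Handler>& handlers, Handler onAllDone) {
    std::vector<Handler> queue; // assumption: stands in for the scheduler's queue
    {
        // No object is owned (nullptr); only the deleter matters.
        std::shared_ptr<void> proceeder(nullptr,
                                        [onAllDone](void*) { onAllDone(); });
        for (const auto& handler : handlers) {
            // Capturing by value copies the shared_ptr and bumps its counter.
            queue.push_back([proceeder, handler] { handler(); });
        }
    } // the local copy dies here; the queued tasks keep the deleter pending

    while (!queue.empty()) {
        Handler task = std::move(queue.back());
        queue.pop_back();
        task();
    } // destroying the last task drops the counter to zero and fires the deleter
}
```

If no handlers are passed, the only copy of the pointer is the local one, so onAllDone runs as soon as that copy goes out of scope.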
. int fibo (int v) { if (v < 2) return v; int v1, v2; goWait({ [v, &v1] { v1 = fibo(v-1); }, [v, &v2] { v2 = fibo(v-2); } }); return v1 + v2; }
fibo
function occurs in its own coroutine.Waiter
primitive with the following interface: struct Waiter { Waiter& go(Handler); void wait(); };
Waiter
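object.

The implementation relies on the same proceeder that continues the work of our coroutine. However, a small subtlety is added: proceeder is now shared between the running coroutines and the Waiter object. Accordingly, when the wait method is called, we need to get rid of the copy held by the Waiter itself. Here is how to do it:

void Waiter::wait()
{
    if (proceeder.unique())
    {
        // only the Waiter holds proceeder => no handlers are still running
        JLOG("everything done, nothing to do");
        return;
    }
    defer([this] {
        // move proceeder out of the Waiter into a local variable
        auto toDestroy = std::move(proceeder);
        // the local copy is destroyed here; once the running handlers
        // drop their copies as well, the deleter resumes the coroutine
    });
    // proceeder has been moved out, so reinitialize it
    // in case the Waiter is used again
    init0();
}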
shared_ptr
. Amen!Waiter
: int fibo (int v) { if (v < 2) return v; int v1; Waiter w; w.go([v, &v1] { v1 = fibo(v-1); }); int v2 = fibo(v-2); w.wait(); return v1 + v2; }
int fibo (int v) { if (v < 2) return v; int v1, v2; Waiter() .go([v, &v1] { v1 = fibo (v-1); }) .go([v, &v2] { v2 = fibo (v-2); }) .wait(); return v1 + v2; }
size_t goAnyWait(std::initializer_list<Handler> handlers);
Here the shared object is no longer void* ptr == nullptr but a concrete atomic counter, counter. At the very beginning it is initialized to 0. Each handler increments the counter when it finishes. The handler that observes the value change from 0 to 1, and only that handler, calls proceed():

size_t goAnyWait(std::initializer_list<Handler> handlers)
{
    VERIFY(handlers.size() >= 1, "Handlers amount must be positive");
    size_t index = static_cast<size_t>(-1);
    deferProceed([&handlers, &index](Handler proceed) {
        std::shared_ptr<std::atomic<int>> counter =
            std::make_shared<std::atomic<int>>();
        size_t i = 0;
        for (const auto& handler: handlers)
        {
            go([counter, proceed, &handler, i, &index] {
                handler();
                if (++ *counter == 1)
                {
                    // we are the first to finish: report the index and resume
                    index = i;
                    proceed();
                }
            });
            ++ i;
        }
    });
    VERIFY(index < handlers.size(), "Incorrect index returned");
    return index;
}
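The "only the 0 to 1 transition wins" rule can be checked in isolation. The sketch below makes several assumptions that are not in the article: firstFinished is a hypothetical name, and the completionOrder parameter replaces a real scheduler by stating in which order the handlers happen to finish. The shared atomic counter is used the same way as above.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <vector>

using Handler = std::function<void()>;

// Hypothetical sketch: returns the index of the handler that finished first.
// completionOrder is an assumption standing in for the scheduler.
std::size_t firstFinished(const std::vector<Handler>& handlers,
                          const std::vector<std::size_t>& completionOrder) {
    auto counter = std::make_shared<std::atomic<int>>(0);
    std::size_t winner = handlers.size(); // sentinel: nobody has won yet

    std::vector<Handler> tasks;
    for (std::size_t i = 0; i < handlers.size(); ++i) {
        tasks.push_back([counter, &handlers, &winner, i] {
            handlers[i]();
            // Pre-increment returns the new value, so exactly one task sees 1.
            if (++*counter == 1) winner = i;
        });
    }
    for (std::size_t i : completionOrder) tasks.at(i)();
    return winner;
}
```

Every handler still runs to completion; the counter only decides which of them gets to report its index.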
Here each handler returns a boost::optional<T_result>, and goAnyResult ends up with this plain prototype:

template<typename T_result>
boost::optional<T_result> goAnyResult(
    std::initializer_list<
        std::function<
            boost::optional<T_result>()
        >
    > handlers)

The template parameter is T_result. That is, handlers must have the signature:

boost::optional<T_result> handler();
The destructor also increments counter, and if incrementing it yields 1, an empty value must be returned, because no handler managed to advance the counter and return the required result. Thus, instead of a simple atomic value for counter, we have a whole Counter object:

template<typename T_result>
boost::optional<T_result> goAnyResult(
    std::initializer_list<
        std::function<
            boost::optional<T_result>()
        >
    > handlers)
{
    typedef boost::optional<T_result> Result;
    typedef std::function<void(Result&&)> ResultHandler;

    struct Counter
    {
        // counter is initialized explicitly: before C++20 a default-initialized
        // std::atomic<int> member has an indeterminate value
        Counter(ResultHandler proceed_) : counter(0), proceed(std::move(proceed_)) {}
        ~Counter()
        {
            // nobody returned a result: proceed with an empty value
            tryProceed(Result());
        }
        void tryProceed(Result&& result)
        {
            if (++ counter == 1)
                proceed(std::move(result));
        }
    private:
        std::atomic<int> counter;
        ResultHandler proceed;
    };

    Result result;
    deferProceed([&handlers, &result](Handler proceed) {
        std::shared_ptr<Counter> counter = std::make_shared<Counter>(
            [&result, proceed](Result&& res) {
                result = std::move(res);
                proceed();
            }
        );
        for (const auto& handler: handlers)
        {
            go([counter, &handler] {
                Result result = handler();
                // only a non-empty result competes for delivery
                if (result)
                    counter->tryProceed(std::move(result));
            });
        }
    });
    return result;
}
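The destructor fallback is the part of Counter that is easiest to get wrong, so here is a small standalone sketch of it. Everything named here is an assumption for illustration: MaybeString replaces boost::optional to keep the example dependency-free, OnceDelivery plays the role of Counter, and deliverAny runs the sources one after another instead of in coroutines.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Assumption: a tiny stand-in for boost::optional<std::string>.
struct MaybeString {
    bool found;
    std::string value;
};

using Source = std::function<MaybeString()>;

// Delivers exactly one value: the first found one, or an empty one from the
// destructor if every source came back empty.
class OnceDelivery {
public:
    explicit OnceDelivery(std::function<void(MaybeString&&)> deliver)
        : fired(0), deliver_(std::move(deliver)) {}
    ~OnceDelivery() { tryDeliver(MaybeString{false, ""}); }
    void tryDeliver(MaybeString&& r) {
        if (++fired == 1) deliver_(std::move(r));
    }
private:
    std::atomic<int> fired;
    std::function<void(MaybeString&&)> deliver_;
};

void deliverAny(const std::vector<Source>& sources,
                std::function<void(MaybeString&&)> deliver) {
    auto once = std::make_shared<OnceDelivery>(std::move(deliver));
    std::vector<std::function<void()>> tasks;
    for (const auto& source : sources) {
        tasks.push_back([once, &source] {
            MaybeString r = source();
            if (r.found) once->tryDeliver(std::move(r));
        });
    }
    for (auto& task : tasks) task();
    // tasks is destroyed first, then once: the last owner going away runs
    // ~OnceDelivery, which delivers the empty value if nothing fired before.
}
```

Whatever the sources return, the callback is invoked exactly once, which is what lets the waiting side resume exactly once.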
Note that std::move only actually moves the result when the condition inside tryProceed is satisfied. This is because std::move does not perform a move by itself, however much one might wish it did: it is merely a cast to an rvalue reference.

struct IScheduler : IObject
{
    virtual void schedule(Handler handler) = 0;
};
typedef boost::asio::io_service IoService; struct IService : IObject { virtual IoService& ioService() = 0; }; struct ThreadPool : IScheduler, IService { ThreadPool(size_t threadCount); void schedule(Handler handler) { service.post(std::move(handler)); } private: IoService& ioService(); std::unique_ptr<boost::asio::io_service::work> work; boost::asio::io_service service; std::vector<std::thread> threads; };
The schedule method simply forwards the handler to boost::asio::io_service::post.

ThreadPool also holds an instance of the work class, which keeps the io_service event loop running; otherwise, if there were no events, the loop would finish and the threads would exit.

There is also the IService interface with the ioService method, which returns IoService, that is, boost::asio::io_service. All this looks strange, so let me explain the trick.

Asynchronous operations in Boost.Asio are tied to an instance of boost::asio::io_service. The remaining components, which I will use later, have to get access to this instance somehow. To prevent casual access to it, I introduced the IService interface, which lets you obtain the coveted instance, while the implementing method in ThreadPool is made private. This gives some protection against misuse: to pull the object out, you first have to convert the ThreadPool to IService and only then call the method. An alternative would be to use friend classes, but I did not want to burden ThreadPool with knowledge of its possible users, so I consider this approach a reasonable price for encapsulation.

Journey
(why it will be clear later): struct Journey { void proceed(); Handler proceedHandler(); void defer(Handler handler); void deferProceed(ProceedHandler proceed); static void create(Handler handler, mt::IScheduler& s); private: Journey(mt::IScheduler& s); struct CoroGuard { CoroGuard(Journey& j_) : j(j_) { j.onEnter0(); } ~CoroGuard() { j.onExit0(); } coro::Coro* operator->() { return &j.coro; } private: Journey& j; }; void start0(Handler handler); void schedule0(Handler handler); CoroGuard guardedCoro0(); void proceed0(); void onEnter0(); void onExit0(); mt::IScheduler* sched; coro::Coro coro; Handler deferHandler; };
create
.Journey
sched
, coro
deferHandler
-, defer
.CoroGuard
– -, onEnter0
onExit0
. void Journey::schedule0(Handler handler) { VERIFY(sched != nullptr, "Scheduler must be set in journey"); sched->schedule(std::move(handler)); } void Journey::proceed0() { // guardedCoro0()->resume(); } Journey::CoroGuard Journey::guardedCoro0() { return CoroGuard(*this); } // void Journey::proceed() { schedule0([this] { proceed0(); }); } // , Handler Journey::proceedHandler() { return [this] { proceed(); }; } // // . 1 void Journey::start0(Handler handler) { schedule0([handler, this] { // guardedCoro0()->start([handler] { JLOG("started"); // try { handler(); } catch (std::exception& e) { (void) e; JLOG("exception in coro: " << e.what()); } JLOG("ended"); }); }); }
void Journey::defer(Handler handler) { // deferHandler = handler; // coro::yield(); } // deferProceed, void Journey::deferProceed(ProceedHandler proceed) { defer([this, proceed] { proceed(proceedHandler()); }); }
deferHandler
. TLS Journey* t_journey = nullptr; void Journey::onEnter0() { t_journey = this; } // . 2 void Journey::onExit0() { if (deferHandler == nullptr) { // => , delete this; } else { // deferHandler(); deferHandler = nullptr; } // , t_journey = nullptr; }
create
: void Journey::create(Handler handler, mt::IScheduler& s) { (new Journey(s))->start0(std::move(handler)); }
Journey
, , . , … void Journey::teleport(mt::IScheduler& s) { if (&s == sched) { JLOG("the same destination, skipping teleport <-> " << s.name()); return; } JLOG("teleport " << sched->name() << " -> " << s.name()); sched = &s; defer(proceedHandler()); }
defer
, . , .Scheduler
/ Thread
Scheduler2
/ Thread2
: auto result = someCalculations(); teleport(uiScheduler); showResult(result); teleport(calcScheduler); auto newResult = continueSmartCalculations(result); teleport(uiScheduler); updateResult(newResult); //…
struct Portal { Portal(mt::IScheduler& destination) : source(journey().scheduler()) { JLOG("creating portal " << source.name() << " <=> " << destination.name()); teleport(destination); } ~Portal() { teleport(source); } private: mt::IScheduler& source; };
ThreadPool tp1(1, "tp1"); ThreadPool tp2(1, "tp2"); go([&tp2] { Portal p(tp2); JLOG("throwing exception"); throw std::runtime_error("exception occur"); }, tp1);
struct Scheduler { Scheduler(); void attach(mt::IScheduler& s) { scheduler = &s; } void detach() { scheduler = nullptr; } operator mt::IScheduler&() const { VERIFY(scheduler != nullptr, "Scheduler is not attached"); return *scheduler; } private: mt::IScheduler* scheduler; }; struct DefaultTag; template<typename T_tag> Scheduler& scheduler() { return single<Scheduler, T_tag>(); } template<typename T> struct WithPortal : Scheduler { struct Access : Portal { Access(Scheduler& s) : Portal(s) {} T* operator->() { return &single<T>(); } }; Access operator->() { return *this; } }; template<typename T> WithPortal<T>& portal() { return single<WithPortal<T>>(); }
ThreadPool tp1(1, "tp1"); ThreadPool tp2(1, "tp2"); struct X { void op() {} }; portal<X>().attach(tp2); go([] { portal<X>()->op(); }, tp1);
X
tp2. , X
( return &single<T>()
) . Journey
-, ! struct Alone : mt::IScheduler { Alone(mt::IService& service); void schedule(Handler handler) { strand.post(std::move(handler)); } private: boost::asio::io_service::strand strand; };
Alone
IService
, io_service::strand
boost.asio
. boost.asio
, , . , (mutual exclusion).Alone
struct MemCache
{
    boost::optional<std::string> get(const std::string& key);
    void set(const std::string& key, const std::string& val);
};

// initialization:
// a common thread pool
ThreadPool common_pool(3);
// serializes access to the memory cache
Alone mem_alone(common_pool);
// bind the memory cache portal to the serializing scheduler
portal<MemCache>().attach(mem_alone);

// usage: every call goes through mem_alone
auto value = portal<MemCache>()->get(key);
portal<MemCache>()->set(anotherKey, anotherValue);
enum EventStatus { ES_NORMAL, ES_CANCELLED, ES_TIMEDOUT, }; struct EventException : std::runtime_error { EventException(EventStatus s); EventStatus status(); private: EventStatus st; };
struct Goer { Goer(); EventStatus reset(); bool cancel(); bool timedout(); private: struct State { State() : status(ES_NORMAL) {} EventStatus status; }; bool setStatus0(EventStatus s); State& state0(); std::shared_ptr<State> state; };
Journey
: void Journey::handleEvents() { // if (!eventsAllowed || std::uncaught_exception()) return; auto s = gr.reset(); if (s == ES_NORMAL) return; // throw EventException(s); } void Journey::disableEvents() { handleEvents(); eventsAllowed = false; } void Journey::enableEvents() { eventsAllowed = true; handleEvents(); }
struct EventsGuard { EventsGuard(); // disableEvents() ~EventsGuard(); // enableEvents() };
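To see how handleEvents, disableEvents and EventsGuard interact, here is a minimal standalone sketch. It is written under assumptions: TaskEvents is a hypothetical stand-in for the event state that the article keeps in Journey and Goer, and Guard mirrors EventsGuard. Unlike the article's enableEvents, the sketch does not re-check events when the guard is released, so that its destructor never throws; the pending event is reported by the next explicit handleEvents call instead.

```cpp
#include <cassert>
#include <stdexcept>

enum class Status { Normal, Cancelled, TimedOut };

struct EventError : std::runtime_error {
    explicit EventError(Status s) : std::runtime_error("event"), status(s) {}
    Status status;
};

// Hypothetical stand-in for the per-coroutine event state.
class TaskEvents {
public:
    void cancel() {
        if (pending == Status::Normal) pending = Status::Cancelled;
    }
    // Throws at most once per event, and only while events are allowed.
    void handleEvents() {
        if (!allowed || pending == Status::Normal) return;
        Status s = pending;
        pending = Status::Normal; // consume the event before reporting it
        throw EventError(s);
    }
    void disable() { handleEvents(); allowed = false; }
    void enable() { allowed = true; } // simplification: no re-check here
private:
    Status pending = Status::Normal;
    bool allowed = true;
};

// RAII guard: events are ignored for as long as the guard is alive.
class Guard {
public:
    explicit Guard(TaskEvents& e) : events(e) { events.disable(); }
    ~Guard() { events.enable(); }
    Guard(const Guard&) = delete;
    Guard& operator=(const Guard&) = delete;
private:
    TaskEvents& events;
};
```

A cancellation that arrives while the guard is alive is not lost: it stays pending and surfaces at the first handleEvents call after the guarded scope.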
handleEvents
? : void Journey::defer(Handler handler) { // handleEvents(); deferHandler = handler; coro::yield(); // handleEvents(); }
handleEvents
. . Goer go(Handler handler, mt::IScheduler& scheduler) { return Journey::create(std::move(handler), scheduler); }
Journey::create
Goer
:

struct Journey
{
    // …
    Goer goer() const { return gr; }
    // …
private:
    // …
    Goer gr;
};

Goer Journey::create(Handler handler, mt::IScheduler& s)
{
    return (new Journey(s))->start0(std::move(handler));
}

// see note 1
Goer Journey::start0(Handler handler)
{
    // …
    return goer();
}

Usage:

Goer op = go(myMegaHandler);
// …
if (weDontNeedMegaHandlerAnymore)
    op.cancel();
op.cancel()
, handleEvents()
.Journey
, -, , go
. , , . : go
, defer
, deferProceed
. ., Journey
, TLS. struct Timeout { Timeout(int ms); ~Timeout(); private: boost::asio::deadline_timer timer; };
boost::asio::deadline_timer
: Timeout::Timeout(int ms) : timer(service<TimeoutTag>(), boost::posix_time::milliseconds(ms)) { // Goer goer = journey().goer(); // timer.async_wait([goer](const Error& error) mutable { // mutable, goer if (!error) // , goer.timedout(); }); } Timeout::~Timeout() { // timer.cancel_one(); // , handleEvents(); }
// Timeout t(100); // 100 for (auto element: container) { performOperation(element); handleEvents(); }
// 200 Timeout outer(200); portal<MyObject>()->performOp(); { // 100 // Timeout inner(100); portal<MyAnotherObject>()->performAnotherOp(); // EventsGuard guard; performGuardedAction(); }
Goer Journey::start0(Handler handler) { schedule0([handler, this] { guardedCoro0()->start([handler] { JLOG("started"); try { handler(); } catch (std::exception& e) { (void) e; JLOG("exception in coro: " << e.what()); } JLOG("ended"); }); }); return goer(); }
Goer Journey::start0(Handler handler) { + Goer gr = goer(); schedule0([handler, this] { guardedCoro0()->start([handler] { JLOG("started"); @@ -121,7 +122,7 @@ JLOG("ended"); }); }); - return goer(); + return gr; }
void Journey::onExit0() { if (deferHandler == nullptr) { delete this; } else { deferHandler(); deferHandler = nullptr; } t_journey = nullptr; }
{ @@ -153,8 +154,8 @@ - deferHandler(); - deferHandler = nullptr; + Handler handler = std::move(deferHandler); + handler(); }
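The second fix follows a general rule: take a stored callback out of the object before invoking it, because the callback may touch that very slot (or, in the article's case, resume a coroutine that does). The sketch below is a single-threaded illustration under assumptions; DeferSlot and takeThenFire are hypothetical names. It also resets the slot explicitly, since the standard leaves a moved-from std::function in a valid but unspecified state.

```cpp
#include <cassert>
#include <functional>
#include <utility>

using Handler = std::function<void()>;

// Hypothetical holder of a one-shot deferred handler.
struct DeferSlot {
    Handler pending;

    // Take the handler out first, then run the local copy. Anything the
    // handler stores into `pending` while running is preserved, and the
    // running callable is never destroyed underneath itself.
    void takeThenFire() {
        Handler handler = std::move(pending);
        pending = nullptr; // moved-from state is unspecified, so reset it
        handler();
    }
};
```

With the original order (call the member, then assign nullptr to it), a handler that installs a follow-up handler would have it wiped right after returning.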
struct A { ~A() { TLOG("~A"); } }; struct B:A { ~B() { TLOG("~B"); } }; struct C { ~C() { TLOG("~C"); } }; ThreadPool tp(1, "tp"); go([] { A* a = gcnew<B>(); C* c = gcnew<C>(); }, tp);
tp#1: ~C tp#1: ~B tp#1: ~A
struct GC
{
    ~GC()
    {
        // destroy the objects in reverse order of creation
        for (auto& deleter: boost::adaptors::reverse(deleters))
            deleter();
    }
    template<typename T>
    T* add(T* t)
    {
        // the deleter remembers the exact type T
        deleters.emplace_back([t] { delete t; });
        return t;
    }
private:
    std::vector<Handler> deleters;
};

GC& gc() { return journey().gc; }

template<typename T, typename... V>
T* gcnew(V&&... v)
{
    return gc().add(new T(std::forward<V>(v)...));
}
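The same idea can be tried without Boost or coroutines. The following is a minimal sketch, not the article's implementation: ScopedGC, make and Tracked are hypothetical names, the collector is tied to an ordinary scope instead of a Journey, and reverse iteration is done with rbegin/rend.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical scope-bound collector: objects created through make() are
// destroyed in reverse order of creation when the collector is destroyed.
class ScopedGC {
public:
    ScopedGC() = default;
    ScopedGC(const ScopedGC&) = delete;
    ScopedGC& operator=(const ScopedGC&) = delete;
    ~ScopedGC() {
        for (auto it = deleters.rbegin(); it != deleters.rend(); ++it) (*it)();
    }

    template <typename T, typename... Args>
    T* make(Args&&... args) {
        // unique_ptr guards the object until its deleter is registered
        std::unique_ptr<T> holder(new T(std::forward<Args>(args)...));
        T* raw = holder.get();
        deleters.emplace_back([raw] { delete raw; }); // remembers the exact T
        holder.release();
        return raw;
    }

private:
    std::vector<std::function<void()>> deleters;
};

// Demo type (an assumption for the example): records its name on destruction.
struct Tracked {
    Tracked(std::vector<std::string>* l, std::string n)
        : log(l), name(std::move(n)) {}
    ~Tracked() { log->push_back(name); }
    std::vector<std::string>* log;
    std::string name;
};
```

Because each deleter captures a pointer of the exact created type, the right destructor runs even if the caller keeps only a base-class pointer and the base destructor is not virtual.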
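The GC instance is a member of Journey (see gc() above), so every object created with gcnew is destroyed together with the coroutine's Journey, in reverse order of creation.

Source: https://habr.com/ru/post/240525/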