When I talked about OOP, I did not mean C++.
Alan Kay
I regret having coined the term “objects” many years ago because it forces people to concentrate on small ideas. A really big idea is messages.
Alan Kay
Let's start with a simple example: a `Reader` class, which reads data from some source and returns an object of the `Buffer` type:

```cpp
class Reader {
public:
    Buffer read(Range range, const Options& options);
    // ...
};
```
Let's take a closer look at the `read` method. Its call can be rewritten in the following, almost equivalent, form:

```cpp
Buffer read(Reader* this, Range range, const Options& options);
```
What happens when we call the `read` method? For example:

```cpp
Reader reader;
auto buffer = reader.read(range, options);
```

We can depict this `read` call schematically as follows:

```
reader <- read(range, options) -> buffer;
```
That is, the `reader` object is given some `read(range, options)` input, and the output is an object named `buffer`. What, then, is `read(range, options)`? Essentially, an input message:

```cpp
struct InReadMessage {
    Range range;
    Options options;
};

struct OutReadMessage {
    Buffer buffer;
};
```

```
reader <- InReadMessage{range, options} -> OutReadMessage;
```
In other words, we synchronously send an `InReadMessage` message and then wait for the `OutReadMessage` response message. Why synchronously? Because call semantics implies that we wait for a response. Generally speaking, though, we do not have to wait for the response at the point of the call; in that case the send becomes asynchronous. Let's now make this message passing explicit with an adapter:

```cpp
template <typename T_base>
struct ReaderAdapter : T_base {
    Buffer read(Range range, const Options& options) {
        return T_base::call([range, options](Reader& reader) {
            return reader.read(range, options);
        });
    }
};
```
When the `read` method is called, the call is wrapped into a lambda and forwarded to the `T_base::call` method of the base class. Here the lambda is a function object that carries its closure to the base class `T_base`, which dispatches it automatically. This lambda is our message, and we can pass it through further transformations. For example, let's add locking:

```cpp
template <typename T_base, typename T_locker>
struct BaseLocker : private T_base {
protected:
    template <typename F>
    auto call(F&& f) {
        std::unique_lock<T_locker> _{lock_};
        return f(static_cast<T_base&>(*this));
    }

private:
    T_locker lock_;
};
```
Inside the `call` method the `lock_` lock is acquired, and the lambda is then invoked on an instance of the `T_base` base class, which allows further transformations if necessary. Usage looks like this:

```cpp
ReaderAdapter<BaseLocker<Reader, std::mutex>> reader;
auto buffer = reader.read(range, options);
```
Instead of using `Reader` directly, we now replace the object with a `ReaderAdapter`. When the `read` method is called, the adapter creates a message in the form of a lambda and passes it on, and the lock is automatically taken and then released strictly for the duration of the operation. At the same time, we keep the original interface of the `Reader` class intact! To avoid writing such adapters by hand, a macro can generate them:

```cpp
DECL_ADAPTER(Reader, read)
AdaptedLocked<Reader, std::mutex> reader;
```
`DECL_ADAPTER` generates an adapter that forwards the methods of the `Reader` class specified in its list, in this case only `read`, and `AdaptedLocked` then intercepts the resulting message and wraps its execution in a `std::mutex`.
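For orientation, here is a sketch of what `DECL_ADAPTER(Reader, read)` could plausibly expand to, by analogy with the hand-written `ReaderAdapter` above (the real macro in the library is more general and its exact output is not shown in this article):

```cpp
// Hypothetical expansion, for illustration only: one forwarding method per
// name listed in DECL_ADAPTER, each routed through T_base::call.
template <typename T_base>
struct GeneratedReaderAdapter : T_base {
    template <typename... V>
    auto read(V&&... v) {
        return T_base::call([&](Reader& reader) {
            return reader.read(std::forward<V>(v)...);
        });
    }
};
```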
The details of this machinery are described in a separate article, so I will not dwell on them here. Now let's build the first synchronization primitive on top of coroutines, a spinlock:

```cpp
namespace synca {

struct Spinlock {
    void lock() {
        while (lock_.test_and_set(std::memory_order_acquire)) {
            reschedule();
        }
    }

    void unlock() {
        lock_.clear(std::memory_order_release);
    }

private:
    std::atomic_flag lock_ = ATOMIC_FLAG_INIT;
};

} // namespace synca
```
In the `lock` method we try to atomically change the flag from `false` to `true`. If we succeed, the lock has been taken, and it has been taken by us, so the protected actions can be performed. On unlocking, we simply reset the flag back to its initial value `false`. A conventional spinlock would spin in a busy loop or call `std::this_thread::yield()`. Here I act smarter: instead of heating up the processor or handing control over to the operating system scheduler, I simply reschedule the current coroutine for later execution by calling `synca::reschedule()`. The current execution is frozen, and the scheduler starts another coroutine that is ready to run. This is very similar to `std::this_thread::yield()`, except that instead of switching into kernel space we always stay in user space and keep doing meaningful work rather than pointlessly increasing the entropy of the universe.

Plugging this spinlock into our adapter gives:

```cpp
template <typename T>
using CoSpinlock = AdaptedLocked<T, synca::Spinlock>;

CoSpinlock<Reader> reader;
auto buffer = reader.read(range, options);
```
In exactly the same way we can plug in a mutex for coroutines:

```cpp
template <typename T>
using CoMutex = AdaptedLocked<T, synca::Mutex>;

CoMutex<Reader> reader;
auto buffer = reader.read(range, options);
```

The interesting thing about `CoMutex` is that, unlike `CoSpinlock`, it gives a FIFO guarantee, i.e. it provides fair concurrent access to the object.

Another way to serialize access is to run all calls on a dedicated single-threaded pool through a portal:

```cpp
template <typename T_base>
struct BaseSerializedPortal : T_base {
    // a thread pool with exactly 1 thread
    BaseSerializedPortal() : tp_(1) {}

protected:
    template <typename F>
    auto call(F&& f) {
        // teleport the coroutine into the serialized pool for the duration of the call
        synca::Portal _{tp_};
        return f(static_cast<T_base&>(*this));
    }

private:
    mt::ThreadPool tp_;
};

CoSerializedPortal<Reader> reader;
```
Since all calls go through a single thread, `CoSerializedPortal` gives a similar ordering guarantee.

The next option is the `synca::Alone` scheduler. `synca::Alone` guarantees that no handler will ever be launched in parallel with another one: if there are tasks, exactly one of them is being executed; if there are none, nothing is executed. Clearly, with this approach the actions are serialized, which means that access through such a scheduler is synchronized. Semantically this is very similar to `CoSerializedPortal`. It is worth noting, however, that this scheduler runs its tasks on some existing thread pool, i.e. it does not create any threads of its own but works on top of existing ones.

```cpp
template <typename T_base>
struct BaseAlone : T_base {
    BaseAlone(mt::IScheduler& scheduler) : alone_{scheduler} {}

protected:
    template <typename F>
    auto call(F&& f) {
        // Alone is itself a scheduler, so we can teleport into it through a portal
        synca::Portal _{alone_};
        return f(static_cast<T_base&>(*this));
    }

private:
    synca::Alone alone_;
};

CoAlone<Reader> reader;
```
The only difference from `CoSerializedPortal` is the replacement of the `mt::ThreadPool` with `synca::Alone`.

Finally, there are channels, analogous to `chan` in the Go language. A channel is a queue (not necessarily of bounded size, by the way, as it is in Go) into which several producers can write and from which several consumers can read simultaneously, without any additional synchronization on their part. Simply put, a channel is a pipe into which you can put messages and from which you can take them without fear of a race condition.

```cpp
template <typename T_base>
struct BaseChannel : T_base {
    BaseChannel() {
        // start a coroutine that processes incoming messages
        synca::go([&] { loop(); });
    }

private:
    void loop() {
        // execute the actions arriving through the channel, one by one
        for (auto&& action : channel_) {
            action();
        }
    }

    synca::Channel<Handler> channel_;
};

CoChannel<Reader> reader;
```
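The article does not show `synca::Channel` itself; conceptually it is just a concurrent queue that supports `put()` and range-for iteration. Below is a minimal thread-blocking sketch of such a queue (the real channel presumably suspends coroutines instead of blocking OS threads and can be closed; both aspects are omitted here):

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>

// Toy stand-in for a channel: values are queued by put(), and iteration
// yields one queued value at a time; in this simplified sketch it never ends.
template <typename T>
class ToyChannel {
public:
    void put(T value) {
        {
            std::lock_guard<std::mutex> lock{m_};
            queue_.push(std::move(value));
        }
        cv_.notify_one();
    }

    // blocks until the next value is available
    T take() {
        std::unique_lock<std::mutex> lock{m_};
        cv_.wait(lock, [this] { return !queue_.empty(); });
        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }

    // just enough iterator machinery to support `for (auto&& action : channel_)`
    struct Sentinel {};
    struct Iterator {
        ToyChannel& channel;
        T current;
        T& operator*() { return current; }
        Iterator& operator++() { current = channel.take(); return *this; }
        bool operator!=(Sentinel) const { return true; }  // endless stream
    };

    Iterator begin() { return Iterator{*this, take()}; }
    Sentinel end() { return Sentinel{}; }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<T> queue_;
};
```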
What is `Handler`? `Handler` is just `std::function<void()>`. All the magic happens not here but in how this `Handler` is created for automatic dispatch:

```cpp
template <typename T_base>
struct BaseChannel : T_base {
protected:
    template <typename F>
    auto call(F&& f) {
        // fun performs the actual call on the base object
        auto fun = [&] { return f(static_cast<T_base&>(*this)); };
        // storage for the result of the deferred call
        WrappedResult<decltype(fun())> result;
        channel_.put([&] {
            try {
                // execute the call and store its result
                result.set(wrap(fun));
            } catch (std::exception&) {
                // store the current exception instead of a value
                result.setCurrentError();
            }
            // wake up the coroutine that is waiting for the result
            synca::done();
        });
        // suspend the current coroutine until the handler has been executed
        synca::wait();
        // rethrow the stored exception, if any, or return the value
        return result.get().unwrap();
    }
};
```
Here we wrap the call of `f` into a `WrappedResult`, put the deferred call into the channel, and go to sleep. The deferred call is executed inside the `BaseChannel::loop` method, which fills in the result and wakes up the sleeping source coroutine.

A few words about `WrappedResult`. This class serves several purposes:

1. It stores the result of the call. If the wrapped function returns nothing (the `void` type), assigning the result without a wrapper would be ill-formed: one cannot store a value of type `void`. Returning it is allowed, however, which is exactly what the `WrappedResult<void>` specialization relies on.
2. It stores an exception, if one was thrown, and rethrows it in the calling coroutine through `.get().unwrap()`.
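Since the article does not show `WrappedResult` itself, here is a minimal sketch of how such a wrapper, together with the `wrap` helper used above, could be written. It is an illustration derived from the usage in `BaseChannel::call`, not the library's actual implementation:

```cpp
#include <exception>
#include <type_traits>
#include <utility>

// Value-or-nothing payload; the void specialization lets "results" of void
// functions travel through the same machinery.
template <typename T>
struct Wrapped {
    T value;  // assumes T is default-constructible and movable, for simplicity
    T unwrap() { return std::move(value); }
};

template <>
struct Wrapped<void> {
    void unwrap() {}
};

// Evaluates the callable and packages its result, including the void case.
template <typename F>
auto wrap(F&& f) {
    if constexpr (std::is_void_v<decltype(f())>) {
        f();
        return Wrapped<void>{};
    } else {
        return Wrapped<decltype(f())>{f()};
    }
}

// Holds either the wrapped value or the exception that the call produced.
template <typename T>
struct WrappedResult {
    void set(Wrapped<T> w) { wrapped_ = std::move(w); }
    void setCurrentError() { error_ = std::current_exception(); }

    // Rethrows the stored exception (if any) in the calling coroutine.
    Wrapped<T>& get() {
        if (error_) {
            std::rethrow_exception(error_);
        }
        return wrapped_;
    }

private:
    Wrapped<T> wrapped_{};
    std::exception_ptr error_;
};
```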
Now let's compare this with a purely asynchronous, callback-based approach and write an asynchronous spinlock:

```cpp
struct AsyncSpinlock {
    void lock(std::function<void()> cb) {
        if (lock_.test_and_set(std::memory_order_acquire)) {
            // the lock is busy => try again later by rescheduling ourselves
            currentScheduler().schedule(
                [this, cb = std::move(cb)]() mutable { lock(std::move(cb)); });
        } else {
            // the lock has been acquired, invoke the continuation
            cb();
        }
    }

    void unlock() { lock_.clear(std::memory_order_release); }

private:
    std::atomic_flag lock_ = ATOMIC_FLAG_INIT;
};
```
On top of it we need an `AsyncSpinlockReader` class that will use our asynchronous spinlock:

```cpp
struct AsyncSpinlockReader {
    void read(Range range, const Options& options,
              std::function<void(const Buffer&)> cbBuffer) {
        spinlock_.lock(
            [this, range, options, cbBuffer = std::move(cbBuffer)] {
                auto buffer = reader_.read(range, options);
                // do not forget to unlock before invoking the callback
                spinlock_.unlock();
                cbBuffer(buffer);
            });
    }

private:
    AsyncSpinlock spinlock_;
    Reader reader_;
};
```

In the `read` method the `AsyncSpinlock` has to be taken asynchronously, and the asynchronous spinlock inevitably breaks the existing interfaces of our classes:

```cpp
// for comparison, the coroutine version:
// CoSpinlock<Reader> reader;
// auto buffer = reader.read(range, options);

AsyncSpinlockReader reader;
reader.read(range, options, [](const Buffer& buffer) {
    // continue processing the buffer here
});
```
Things get even more interesting if `Spinlock::unlock` and the `Reader::read` call are themselves asynchronous. That is easy enough to imagine, assuming, for example, that `Reader` pulls its data over the network and a mutex is used instead of the spinlock. Then:

```cpp
struct SuperAsyncSpinlockReader {
    // the callback is now threaded through every asynchronous step
    void read(Range range, const Options& options,
              std::function<void(const Buffer&)> cb) {
        spinlock_.lock(
            [this, range, options, cb = std::move(cb)]() mutable {
                // step 1: the read itself is asynchronous
                reader_.read(range, options,
                    [this, cb = std::move(cb)](const Buffer& buffer) mutable {
                        // step 2: unlocking is asynchronous too
                        spinlock_.unlock(
                            [buffer, cb = std::move(cb)] {
                                // step 3: finally deliver the result
                                cb(buffer);
                            });
                    });
            });
    }

private:
    AsyncSpinlock spinlock_;  // assume a variant whose unlock also takes a callback
    AsyncNetworkReader reader_;
};
```
With stackless coroutines the waiting is expressed through `co_await`. And this, in turn, means that every (!) call to any method wrapped through such an adapter must be prefixed with `co_await`, changing both the semantics and the interfaces:

```cpp
// the original synchronous code
Buffer baseRead() {
    Reader reader;
    return reader.read(range, options);
}

// callback style: the interface changes completely
void baseRead(std::function<void(const Buffer& buffer)> cb) {
    AsyncReader reader;
    reader.read(range, options, cb);
}

// stackless coroutines: co_await and a future-like return type leak into the interface
future_t<Buffer> standardPlannedRead() {
    CoMutex<Reader> reader;
    return co_await reader.read(range, options);
}

// stackful coroutines: the code stays the same as the synchronous original
Buffer myRead() {
    CoMutex<Reader> reader;
    return reader.read(range, options);
}
```
In the stackful variant it is enough to replace `Reader` with `CoMutex<Reader>`. Such an invasive approach significantly limits the applicability of stackless coroutines.

Let's now generalize the portal adapter so that the scheduler can be passed in from outside:

```cpp
template <typename T_base>
struct BasePortal : T_base, private synca::SchedulerRef {
    template <typename... V>
    BasePortal(mt::IScheduler& scheduler, V&&... v)
        // forward the remaining constructor arguments to the wrapped object
        : T_base{std::forward<V>(v)...}
        , synca::SchedulerRef{scheduler}
    {
    }

protected:
    template <typename F>
    auto call(F&& f) {
        // teleport into the supplied scheduler before executing f(...)
        synca::Portal _{scheduler()};
        return f(static_cast<T_base&>(*this));
    }

    using synca::SchedulerRef::scheduler;
};
```

The constructor accepts an `mt::IScheduler`, and `f(static_cast<T_base&>(*this))` is executed inside a portal to that scheduler. This allows, for example, two readers to share one pool with a single thread:

```cpp
// one single-threaded pool serializes access to both readers
mt::ThreadPool serialized{1};
CoPortal<Reader> reader1{serialized};
CoPortal<Reader> reader2{serialized};
```
Both `Reader` objects now perform their operations on the shared `serialized` pool. The same can be done with `CoAlone` and `CoChannel`:

```cpp
// CoAlone and CoChannel also accept an external scheduler,
// so several objects can share one pool of threads
mt::ThreadPool isolated{3};

// all handlers of reader1 are serialized on top of the isolated pool
CoAlone<Reader> reader1{isolated};

// the processing coroutine of reader2 also runs on the isolated pool
CoChannel<Reader> reader2{isolated};
```
So by now we have a whole family of synchronizing adapters: `CoSpinlock`, `CoMutex`, `CoSerializedPortal`, `CoAlone` and `CoChannel`. To avoid hard-wiring a particular one into the code, let's introduce a policy that binds a type to its adapter:

```cpp
#define BIND_SUBJECTOR(D_type, D_subjector, ...)          \
    template <>                                           \
    struct subjector::SubjectorPolicy<D_type>             \
    {                                                     \
        using Type = D_subjector<D_type, ##__VA_ARGS__>;  \
    };

template <typename T>
struct SubjectorPolicy
{
    using Type = CoMutex<T>;
};

template <typename T>
using Subjector = typename SubjectorPolicy<T>::Type;
```
`Subjector<T>` then resolves to one of the 5 adapters listed above, with `CoMutex` as the default. For example:

```cpp
// declare an adapter for Reader with 3 methods: read, open, close
DECL_ADAPTER(Reader, read, open, close)

// bind Reader to CoChannel; without this binding
// the default CoMutex would be used
BIND_SUBJECTOR(Reader, CoChannel)

// use the bound subjector
Subjector<Reader> reader;
```
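Given this binding, `Subjector<Reader>` is now just an alias for the chosen adapter, which an illustrative assertion can confirm:

```cpp
#include <type_traits>

// Subjector<Reader> resolves through SubjectorPolicy<Reader> to the bound adapter.
static_assert(std::is_same_v<Subjector<Reader>, CoChannel<Reader>>,
              "Reader is dispatched through a channel");
```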
If we later decide that `Reader` should be synchronized differently, for example on a dedicated serialized pool, it is enough to change a single line:

```cpp
BIND_SUBJECTOR(Reader, CoSerializedPortal)
```

All the code that works with `Subjector<Reader>` stays exactly the same.
Sometimes we do not need the result of a call at all. Consider a network class:

```cpp
class Network {
public:
    void send(const Packet& packet);
};

DECL_ADAPTER(Network, send)
BIND_SUBJECTOR(Network, CoChannel)
```

```cpp
void sendPacket(const Packet& packet) {
    Subjector<Network> network;
    network.send(packet);
    // we do not need the result of send,
    // yet we still wait for it to complete before continuing
    doSomeOtherStuff();
}
```

Here `doSomeOtherStuff()` will not start until `network.send()` has completed, even though we do not care about its result. It would be nicer to send the packet asynchronously:

```cpp
void sendPacket(const Packet& packet) {
    Subjector<Network> network;
    // the .async() facade sends the message without waiting for completion
    network.async().send(packet);
    // this now runs immediately, in parallel with the send
    doSomeOtherStuff();
}
```
This is where `BaseAsyncWrapper` comes in:

```cpp
template <typename T_derived>
struct BaseAsyncWrapper {
protected:
    template <typename F>
    auto call(F&& f) {
        return static_cast<T_derived&>(*this).asyncCall(std::forward<F>(f));
    }
};
```

The `.async()` accessor returns a view based on `BaseAsyncWrapper`, parameterized by `T_derived`, whose `call` simply forwards the message to `asyncCall` instead of the synchronous `call`. Accordingly, every `Co`-adapter has to implement `asyncCall` in addition to `call`. Let's see what `asyncCall` looks like for each of them.
For `CoSpinlock`, `CoMutex`, `CoSerializedPortal` and `CoAlone` the asynchronous call is the same: we simply start a new coroutine on the object's scheduler.

```cpp
template <typename T_base>
struct Go : T_base {
protected:
    template <typename F>
    auto asyncCall(F&& f) {
        // spawn a separate coroutine that performs the call on the base object
        return synca::go(
            [f = std::move(f), this]() { f(static_cast<T_base&>(*this)); },
            T_base::scheduler());
    }
};
```
For `CoChannel` it is even simpler: the message is just dropped into the channel, and nobody waits for the result.

```cpp
template <typename T_base>
struct BaseChannel : T_base {
    template <typename F>
    auto asyncCall(F&& f) {
        channel_.put([f = std::move(f), this] {
            try {
                f(static_cast<T_base&>(*this));
            } catch (std::exception&) {
                // do nothing due to async call
            }
        });
    }
};
```
A few words about the guarantees these primitives give. `CoSpinlock` is not fair: whichever coroutine manages to set the flag in `lock()` first takes the lock, so, unlike `CoMutex`, it provides no FIFO guarantee. `CoPortal` guarantees only what the scheduler passed into it guarantees, while `CoSerializedPortal` runs everything on a pool with a single thread and therefore processes calls strictly one after another. The same holds for `CoAlone` and `CoChannel`: their handlers never run in parallel, and `CoChannel` additionally processes messages in the order in which they were put into its queue.
That is, these primitives guarantee only that each individual call is executed safely, and nothing more. Consider a simple counter:

```cpp
struct Counter {
    void set(int value);
    int get() const;

private:
    int value_ = 0;
};

DECL_ADAPTER(Counter, set, get)
Subjector<Counter> counter;
```

Both `get` and `set` are synchronized, each on its own. Now let's increment the counter:

```cpp
counter.set(counter.get() + 1);
```
The execution of a program contains a data race if it contains two potentially concurrent conflicting actions, at least one of which is not atomic, and neither happens before the other, except for the special case for signal handlers described below. Any such data race results in undefined behavior.
C++17 Standard N4659, §4.7.1 (20.2)
A data race in the sense of the standard comes from unsynchronized conflicting accesses, for example two threads calling `std::vector::push_back(value)` on the same vector without a lock, which is undefined behavior. In our example there is no data race: every individual call is properly synchronized. Nevertheless, `counter.set(counter.get() + 1)` contains a race condition, a logical error rather than undefined behavior: the operation is not atomic as a whole, and between `.get()` and `.set()` another coroutine may update the counter, so its increment will be lost.
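One straightforward way out, sketched here with the article's own machinery (the `increment` method is added purely for illustration), is to turn the whole read-modify-write into a single message:

```cpp
struct Counter {
    void set(int value);
    int get() const;
    // the composite operation becomes one method, i.e. one message
    void increment() { ++value_; }

private:
    int value_ = 0;
};

DECL_ADAPTER(Counter, set, get, increment)
Subjector<Counter> counter;

// executed atomically with respect to all other messages to the counter
counter.increment();
```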
A more interesting case involves two objects and `CoAlone`:

```cpp
struct User {
    void setName(const std::string& name);
    std::string getName() const;
    void setAge(int age);
    int getAge() const;
};

DECL_ADAPTER(User, setName, getName, setAge, getAge)
BIND_SUBJECTOR(User, CoAlone)

struct UserManager {
    void increaseAge() {
        user_.setAge(user_.getAge() + 1);
    }

private:
    Subjector<User> user_;
};

UserManager manager;
// race condition when called from 2 threads
manager.increaseAge();
```

If `manager.increaseAge()` is called from 2 threads simultaneously, we get exactly the same race condition inside `increaseAge()`. Let's try to fix it by wrapping `UserManager` itself:

```cpp
struct UserManager {
    void increaseAge() {
        user_.setAge(user_.getAge() + 1);
    }

private:
    Subjector<User> user_;
};

DECL_ADAPTER(UserManager, increaseAge)
BIND_SUBJECTOR(UserManager, CoAlone)

Subjector<UserManager> manager;
manager.increaseAge();
```
Now the `increaseAge()` calls also go through `CoAlone`. Does this solve the problem? Unfortunately, no. The handlers of `UserManager` are serialized on its `Alone` scheduler, but the call to `user_.getAge()` teleports the coroutine out of that scheduler and into the scheduler of `User`. That is, another coroutine is now able to enter the method `increaseAge()` in parallel with the current one, which is currently inside `user_.getAge()`. This is possible because `Alone` guarantees only the absence of parallel execution within its own scheduler, and here the execution spans two different schedulers: `CoAlone<User>` and `CoAlone<UserManager>`. The same effect arises with any combination of `CoAlone` and `CoPortal`: the exclusivity is released as soon as the coroutine leaves the scheduler. To fix the problem, the synchronization must be held across the nested call, for example with a mutex:

```cpp
BIND_SUBJECTOR(UserManager, CoMutex)
```
Let's see how these pieces combine in a larger example:

```cpp
struct UI {
    // called when a user is requested from the UI
    void onRequestUser(const std::string& userName);
    // updates the user on the screen
    void updateUser(const User& user);
};
DECL_ADAPTER(UI, onRequestUser, updateUser)
// all UI handlers must run on the UI scheduler
BIND_SUBJECTOR(UI, CoPortal)

struct UserManager {
    // returns the user, loading it if necessary
    User getUser(const std::string& userName);

private:
    void addUser(const User& user);
    User findUser(const std::string& userName);
};
DECL_ADAPTER(UserManager, getUser)
BIND_SUBJECTOR(UserManager, CoAlone)

struct NetworkManager {
    // loads the user over the network
    User getUser(const std::string& userName);
};
DECL_ADAPTER(NetworkManager, getUser)
// network requests are serialized on a dedicated thread
BIND_SUBJECTOR(NetworkManager, CoSerializedPortal)

// accessors for the shared instances used below
Subjector<UserManager>& getUserManager();
Subjector<NetworkManager>& getNetworkManager();
Subjector<UI>& getUI();

void UI::onRequestUser(const std::string& userName) {
    updateUser(getUserManager().getUser(userName));
}

User UserManager::getUser(const std::string& userName) {
    auto user = findUser(userName);
    if (user) {
        // the user is already cached, just return it
        return user;
    }
    // otherwise fetch it over the network
    user = getNetworkManager().getUser(userName);
    // and remember it for next time
    addUser(user);
    return user;
}
```
Everything starts in `UI::onRequestUser`: the UI asks `UserManager::getUser` for a user. The `UI` object is bound to `CoPortal`, so its handlers run on the UI scheduler, but while the request is being processed only the coroutine is suspended; the UI scheduler itself stays free to handle other events. `UserManager` first looks for the user in its own cache and, if it is not there, goes to `NetworkManager`, whose requests are serialized on a dedicated thread. When the result comes back, it is cached in `UserManager` and the coroutine returns to the UI scheduler to update the screen. Not a single thread is blocked anywhere along the way, and there are no deadlocks.

A deadlock, however, is not hard to arrange. Consider users that can befriend each other:

```cpp
// forward declaration, so that the adapter can be declared before the class
struct User;
DECL_ADAPTER(User, addFriend, getId, addFriendId)

struct User {
    void addFriend(Subjector<User>& myFriend) {
        auto friendId = myFriend.getId();
        if (hasFriend(friendId)) {
            // already friends, nothing to do
            return;
        }
        addFriendId(friendId);
        auto myId = getId();
        myFriend.addFriendId(myId);
    }

    Id getId() const;
    void addFriendId(Id id);

private:
    bool hasFriend(Id id);
};

// make two users friends of each other
void makeFriends(Subjector<User>& u1, Subjector<User>& u2) {
    u1.addFriend(u2);
}
```
Since `User` is not explicitly bound here, the default `CoMutex` is used, and two simultaneous `makeFriends` calls in opposite directions can deadlock: each coroutine holds the mutex of its own user while synchronously calling into the other one. What can be done about it? One option is to change the binding:

```cpp
BIND_SUBJECTOR(User, CoAlone)
```

As we saw above, `CoAlone` does not keep its exclusivity across nested calls, so the cyclic wait disappears (at the price of possible interleaving). Another option is to avoid waiting altogether and send the cross-object notifications asynchronously, for example via `CoChannel<T>.async().someMethod(...)`.
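As a rough sketch of that asynchronous style (assuming `User` is bound to `CoChannel` and reusing the `.async()` facade introduced earlier; this restructuring of `makeFriends` is an illustration rather than code from the article):

```cpp
// Assuming BIND_SUBJECTOR(User, CoChannel), each user has its own mailbox.
void makeFriends(Subjector<User>& u1, Subjector<User>& u2) {
    // getId is a plain query that calls nothing else, so no cycle can form here
    auto id1 = u1.getId();
    auto id2 = u2.getId();
    // fire-and-forget: neither coroutine waits for the other user,
    // so there is no cyclic wait and no deadlock
    // (duplicate checks are left to addFriendId itself in this sketch)
    u1.async().addFriendId(id2);
    u2.async().addFriendId(id1);
}
```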
With asynchronous sends through a channel, an object effectively becomes an actor: it has a mailbox into which messages are put and a loop that processes them one by one, and all of this is expressed in plain C++ without changing the interfaces of the original classes.

Source: https://habr.com/ru/post/340732/