No QMetaObject::invokeMethod(), no passing of an object's results out of its thread through signals. Only direct function calls, only QFuture!

Each asynchronous method returns a QFuture<T>, whose readiness means that the method has finished executing and has a result of type T.

The QThread subclass responsible for managing the thread has a ScheduleImpl() method. It is called from the main thread (and from others too), accepts a functor, wraps it in a QFuture, and saves everything needed into a special queue, which is then processed inside QThread::run().

    class WorkerThreadBase : public QThread
    {
        Q_OBJECT

        // Guards Functions_, which is filled by callers and drained by the worker.
        QMutex FunctionsMutex_;
        QList<std::function<void ()>> Functions_;
    public:
        using QThread::QThread;
    protected:
        void run () override;

        virtual void Initialize () = 0;
        virtual void Cleanup () = 0;

        template<typename F>
        QFuture<ResultOf_t<F ()>> ScheduleImpl (const F& func)
        {
            QFutureInterface<ResultOf_t<F ()>> iface;
            iface.reportStarted ();

            // void () wrapper that runs func and reports its result to iface.
            auto reporting = [func, iface] () mutable
            {
                ReportFutureResult (iface, func);
            };

            {
                QMutexLocker locker { &FunctionsMutex_ };
                Functions_ << reporting;
            }

            emit rotateFuncs ();

            return iface.future ();
        }
    private:
        void RotateFuncs ();
    signals:
        void rotateFuncs ();
    };
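To make the scheduling idea easier to try outside Qt, here is a rough standard-library analogue. It is only a sketch under stated assumptions, not the author's code: `Worker`, `Schedule` and `ScheduleAnswer` are hypothetical names, `std::packaged_task` and `std::future` stand in for QFutureInterface and QFuture, and a condition variable stands in for the rotateFuncs() signal and the Qt event loop.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for WorkerThreadBase, built on the standard library only.
class Worker
{
    std::mutex Mutex_;
    std::condition_variable Cond_;
    std::deque<std::function<void ()>> Functions_;
    bool Stop_ = false;
    std::thread Thread_;   // declared last, so it starts after the other members exist
public:
    Worker () : Thread_ { [this] { Run (); } } {}

    ~Worker ()
    {
        {
            std::lock_guard<std::mutex> lock { Mutex_ };
            Stop_ = true;
        }
        Cond_.notify_one ();
        Thread_.join ();
    }

    // Wraps func into a void () closure tied to the returned future and queues it.
    template<typename F>
    auto Schedule (F func) -> std::future<decltype (func ())>
    {
        using R = decltype (func ());
        // shared_ptr because std::function requires a copyable callable.
        auto task = std::make_shared<std::packaged_task<R ()>> (std::move (func));
        auto future = task->get_future ();
        {
            std::lock_guard<std::mutex> lock { Mutex_ };
            Functions_.push_back ([task] { (*task) (); });
        }
        Cond_.notify_one ();
        return future;
    }
private:
    void Run ()
    {
        for (;;)
        {
            std::function<void ()> func;
            {
                std::unique_lock<std::mutex> lock { Mutex_ };
                Cond_.wait (lock, [this] { return Stop_ || !Functions_.empty (); });
                if (Functions_.empty ())
                    return;   // only reached once Stop_ is set and the queue is drained
                func = std::move (Functions_.front ());
                Functions_.pop_front ();
            }
            func ();   // runs outside the lock, on the worker thread
        }
    }
};

// Usage: the caller gets futures immediately and blocks only when it needs the results.
int ScheduleAnswer ()
{
    Worker worker;
    auto done = worker.Schedule ([] {});                     // std::future<void>
    auto answer = worker.Schedule ([] { return 6 * 7; });    // std::future<int>
    done.get ();
    const int value = answer.get ();
    assert (value == 6 * 7);
    return value;
}
```

Unlike this sketch, the Qt version does not block on a condition variable: the worker's event loop picks up the queued signal, which is why the original only needs a mutex around the queue.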
ResultOf_t is a direct analogue of std::result_of_t from C++14. My project, unfortunately, still has to support C++11 compilers.

    template<typename T>
    using ResultOf_t = typename std::result_of<T>::type;
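As a small illustration of what such an alias yields for a nullary functor, here is a sketch that is not part of the original code. `NullaryResult_t`, `ReturnsInt` and `ReturnsVoid` are hypothetical names. Since std::result_of was deprecated in C++17 and removed in C++20, the sketch switches to std::invoke_result_t when the compiler reports C++17 or later.

```cpp
#include <type_traits>

#if __cplusplus >= 201703L
// C++17 and later: std::result_of is deprecated, std::invoke_result replaces it.
template<typename F>
using NullaryResult_t = std::invoke_result_t<F>;
#else
// C++11/14: same shape as ResultOf_t<F ()> in the text above.
template<typename F>
using NullaryResult_t = typename std::result_of<F ()>::type;
#endif

// Hypothetical functors used only to exercise the alias.
struct ReturnsInt
{
    int operator() () const { return 0; }
};

struct ReturnsVoid
{
    void operator() () const {}
};

static_assert (std::is_same<NullaryResult_t<ReturnsInt>, int>::value,
        "a functor returning int yields int");
static_assert (std::is_same<NullaryResult_t<ReturnsVoid>, void>::value,
        "a functor returning void yields void");
```

The void case matters for the next step: a result type of void is perfectly valid as a type, but no variable of it can be declared.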
ReportFutureResult takes the functor and its arguments, executes the functor, and marks the corresponding QFutureInterface as ready, passing it the result of the functor at the same time. If the functor throws, the exception is stored in the QFutureInterface instead.

Unfortunately, things are somewhat complicated by functors that return void: they need a separate function, because C++ does not let you declare a variable of type void. Such is our type system: the type exists, it even has a value, but you cannot declare a variable of it.

    template<typename R, typename F, typename... Args>
    EnableIf_t<!std::is_same<R, void>::value>
        ReportFutureResult (QFutureInterface<R>& iface, F&& f, Args... args)
    {
        try
        {
            const auto result = f (args...);
            iface.reportFinished (&result);
        }
        catch (const QtException_t& e)
        {
            iface.reportException (e);
            iface.reportFinished ();
        }
        catch (const std::exception& e)
        {
            iface.reportException (ConcurrentStdException { e });
            iface.reportFinished ();
        }
    }

    template<typename F, typename... Args>
    void ReportFutureResult (QFutureInterface<void>& iface, F&& f, Args... args)
    {
        try
        {
            f (args...);
        }
        catch (const QtException_t& e)
        {
            iface.reportException (e);
        }
        catch (const std::exception& e)
        {
            iface.reportException (ConcurrentStdException { e });
        }

        iface.reportFinished ();
    }
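The same void / non-void split can be tried without Qt. The sketch below is an assumption-laden analogue, not the author's implementation: `FulfillPromise`, `DemoValue` and `DemoVoidThrows` are hypothetical names, std::promise plays the role of QFutureInterface, and any exception is forwarded via std::current_exception() instead of being wrapped in a Qt-specific type.

```cpp
#include <cassert>
#include <exception>
#include <future>
#include <stdexcept>
#include <type_traits>
#include <utility>

// Non-void results: store the functor's return value in the promise.
template<typename R, typename F>
typename std::enable_if<!std::is_same<R, void>::value>::type
    FulfillPromise (std::promise<R>& promise, F&& f)
{
    try
    {
        promise.set_value (std::forward<F> (f) ());
    }
    catch (...)
    {
        promise.set_exception (std::current_exception ());
    }
}

// void results: there is no value to store, so call first, then mark ready.
template<typename F>
void FulfillPromise (std::promise<void>& promise, F&& f)
{
    try
    {
        std::forward<F> (f) ();
        promise.set_value ();
    }
    catch (...)
    {
        promise.set_exception (std::current_exception ());
    }
}

// A value travels from the functor to the future.
int DemoValue ()
{
    std::promise<int> promise;
    auto future = promise.get_future ();
    FulfillPromise (promise, [] { return 42; });
    return future.get ();
}

// An exception thrown by a void functor is rethrown by future.get ().
bool DemoVoidThrows ()
{
    std::promise<void> promise;
    auto future = promise.get_future ();
    FulfillPromise (promise, [] { throw std::runtime_error { "boom" }; });
    try
    {
        future.get ();
    }
    catch (const std::runtime_error&)
    {
        return true;
    }
    return false;
}
```

The first overload is removed by SFINAE when R is void, so each call resolves to exactly one of the two functions, just as with the EnableIf_t version above.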
QtException_t is needed to support building with Qt4:

    #if QT_VERSION < 0x050000
    using QtException_t = QtConcurrent::Exception;
    #else
    using QtException_t = QException;
    #endif
ConcurrentStdException wraps a standard exception into one that Qt's QFuture mechanism understands; its implementation is a bit more involved and not so important here.

ScheduleImpl() takes a functor with a signature of type T (), returns a QFuture<T>, and wraps the functor in a special function, now with the signature void (), that is tied to the returned QFuture<T> and marks it ready once the functor has executed. It then adds this wrapper to the queue.

After that the rotateFuncs() signal is emitted, which inside run() is connected to the RotateFuncs() method, which is responsible for processing the queue of saved functor wrappers.

Here are the run() and RotateFuncs() methods:

    void WorkerThreadBase::run ()
    {
        SlotClosure<NoDeletePolicy> rotator
        {
            [this] { RotateFuncs (); },
            this,
            SIGNAL (rotateFuncs ()),
            nullptr
        };

        Initialize ();

        // The default QThread::run () spins the event loop until quit () is called.
        QThread::run ();

        Cleanup ();
    }

    void WorkerThreadBase::RotateFuncs ()
    {
        decltype (Functions_) funcs;

        {
            // Hold the lock only long enough to take over the queue.
            QMutexLocker locker { &FunctionsMutex_ };

            using std::swap;
            swap (funcs, Functions_);
        }

        for (const auto& func : funcs)
            func ();
    }
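The swap-under-lock step in RotateFuncs() can be isolated into a few lines of standard C++. This is only a sketch with hypothetical names (`TaskQueue`, `Push`, `Rotate`, `DemoRotations`), using std::mutex in place of QMutex. It shows one consequence of the design: because the lock is released before the tasks run, a task may enqueue more work, and that work is picked up by the next rotation.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical queue that mirrors the locking scheme of RotateFuncs ().
class TaskQueue
{
    std::mutex Mutex_;
    std::vector<std::function<void ()>> Functions_;
public:
    void Push (std::function<void ()> func)
    {
        std::lock_guard<std::mutex> lock { Mutex_ };
        Functions_.push_back (std::move (func));
    }

    // Runs everything queued so far and returns how many tasks were executed.
    std::size_t Rotate ()
    {
        decltype (Functions_) funcs;
        {
            std::lock_guard<std::mutex> lock { Mutex_ };
            using std::swap;
            swap (funcs, Functions_);
        }
        // The mutex is released here, so tasks may call Push () without deadlocking.
        for (const auto& func : funcs)
            func ();
        return funcs.size ();
    }
};

// Two tasks are queued; one of them enqueues a follow-up while it runs.
std::pair<std::size_t, std::size_t> DemoRotations ()
{
    TaskQueue queue;
    queue.Push ([] {});
    queue.Push ([&queue] { queue.Push ([] {}); });

    const auto first = queue.Rotate ();    // runs the two original tasks
    const auto second = queue.Rotate ();   // runs the follow-up
    const auto third = queue.Rotate ();    // nothing left
    assert (third == 0);
    return { first, second };
}
```

Holding the lock while running the tasks would be simpler to write, but it would block every caller of Push () for the duration of the slowest task and would deadlock on the follow-up case shown here.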
SlotClosure is a helper class that attaches signals to lambdas rather than slots. Qt5 has a more adequate syntax for this, but unfortunately I still need to support the Qt4 build as well.

SlotClosure is simple: it calls its first argument each time the object given as the second argument emits the signal given as the third argument. The fourth argument is the parent object. Here the SlotClosure lives on the stack, so no parent is needed.

NoDeletePolicy means that the object should not delete itself after the first signal. Other deletion policies include, for example, DeleteLaterPolicy, which deletes the connection object after the signal fires for the first time, which is convenient for one-off tasks.

So, in run() we connect the rotateFuncs() signal to the RotateFuncs() function (hmm, I wonder how many comments there will be about the naming style?), call the initialization function for the thread's objects, defined somewhere in the subclass, and start spinning the thread. When the thread's owner calls quit() on it, QThread::run() returns control, and the subclass can clean up after itself in Cleanup().

The rotateFuncs() signal is emitted from the main thread to trigger RotateFuncs() in our WorkerThreadBase.

RotateFuncs(), in turn, briefly locks the main queue, moves its contents into a local one, and then executes them sequentially.

An example of a thread built on WorkerThreadBase:

    class AvatarsStorageThread final : public Util::WorkerThreadBase
    {
        std::unique_ptr<AvatarsStorageOnDisk> Storage_;
    public:
        using Util::WorkerThreadBase::WorkerThreadBase;

        QFuture<void> SetAvatar (const QString& entryId,
                IHaveAvatars::Size size, const QByteArray& imageData);
        QFuture<boost::optional<QByteArray>> GetAvatar (const QString& entryId,
                IHaveAvatars::Size size);
        QFuture<void> DeleteAvatars (const QString& entryId);
    protected:
        void Initialize () override;
        void Cleanup () override;
    };
    QFuture<void> AvatarsStorageThread::SetAvatar (const QString& entryId,
            IHaveAvatars::Size size, const QByteArray& imageData)
    {
        return ScheduleImpl ([=] { Storage_->SetAvatar (entryId, size, imageData); });
    }

    QFuture<boost::optional<QByteArray>> AvatarsStorageThread::GetAvatar (const QString& entryId,
            IHaveAvatars::Size size)
    {
        return ScheduleImpl ([=] { return Storage_->GetAvatar (entryId, size); });
    }

    QFuture<void> AvatarsStorageThread::DeleteAvatars (const QString& entryId)
    {
        return ScheduleImpl ([=] { Storage_->DeleteAvatars (entryId); });
    }

    void AvatarsStorageThread::Initialize ()
    {
        Storage_.reset (new AvatarsStorageOnDisk);
    }

    void AvatarsStorageThread::Cleanup ()
    {
        Storage_.reset ();
    }
AvatarsStorageOnDisk is a separate interesting topic related to my homegrown not-quite-ORM framework, which generates tables, SQL queries and the corresponding insert / delete / update functions from a description of the data structure via Boost.Fusion. However, its implementation has nothing to do with multithreading, which is, in general, a good thing, especially from the point of view of decomposing the original problem.

In a WorkerThreadBase subclass, you need to duplicate every method you want to call on the object that is moved to a separate thread. I have not yet come up with an effective way to solve this.

Initialize() and Cleanup() are just begging to be turned into some kind of RAII. It is worth coming up with something here.

Source: https://habr.com/ru/post/273743/