
QThread + QtSql the right way

Today's article inspired me to share my own way of moving database work into a separate thread. The method suits not only databases but any interaction that fits the pattern “some object lives in a separate thread, and we need to ask it for something or do something with it”. The method also tries to be type-safe and extensible: no stringly-typed QMetaObject::invokeMethod(), no passing the results of poking an object in another thread back through signals. Only direct function calls, only QFuture!

Disclaimer: the code here is a piece of one of my large projects, so it uses some auxiliary library functions from that project. I will, however, try not to miss any such uses and to describe their semantics.

So, let's start with the most important question: how would we like to work with an object in a separate thread? Ideally, we simply call some methods on some object, and the methods return QFuture<T>; the future becoming ready means that the corresponding asynchronous method has finished and has produced a result of type T.

Recall that decomposition is our friend, so let's take the original task, “we have to poke something in a separate thread”, and consider one piece of it: “we need to keep a separate thread around and provide a thread-safe way to call something in it that returns a QFuture”.
Let's solve this problem as follows: the QThread subclass responsible for managing the thread has a ScheduleImpl() method that can be called from the main thread (and from others too). It accepts a functor, wraps it so that it is tied to a QFuture, and saves everything needed into a special queue, which is then processed inside QThread::run().

The result looks something like this:

    class WorkerThreadBase : public QThread
    {
        Q_OBJECT

        // Guards Functions_, which is filled from arbitrary threads.
        QMutex FunctionsMutex_;
        QList<std::function<void ()>> Functions_;
    public:
        using QThread::QThread;
    protected:
        void run () override;

        virtual void Initialize () = 0;
        virtual void Cleanup () = 0;

        template<typename F>
        QFuture<ResultOf_t<F ()>> ScheduleImpl (const F& func)
        {
            QFutureInterface<ResultOf_t<F ()>> iface;
            iface.reportStarted ();

            // The void () wrapper that runs func and reports its result.
            auto reporting = [func, iface] () mutable
            {
                ReportFutureResult (iface, func);
            };

            {
                QMutexLocker locker { &FunctionsMutex_ };
                Functions_ << reporting;
            }

            // Wake up the worker thread so that it processes the queue.
            emit rotateFuncs ();

            return iface.future ();
        }
    private:
        void RotateFuncs ();
    signals:
        void rotateFuncs ();
    };


Explanations of ReportFutureResult and ResultOf_t
ResultOf_t is a direct analogue of std::result_of_t from C++14. My project, unfortunately, still has to support C++11 compilers.

    template<typename T>
    using ResultOf_t = typename std::result_of<T>::type;
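To make the F () notation concrete, here is a small sketch of what ResultOf_t<F ()> resolves to. One caveat that goes beyond the C++11 requirement above: std::result_of was deprecated in C++17 and removed in C++20, so the sketch falls back to std::invoke_result on newer standards. The ResultOfImpl helper and the Answer and Greeting functors are purely illustrative and not part of the project.

```cpp
#include <cassert>
#include <string>
#include <type_traits>

#if __cplusplus >= 201703L
// Hypothetical adapter: keeps the "F (Args...)" spelling on top of
// std::invoke_result, which replaced std::result_of.
template<typename T>
struct ResultOfImpl;

template<typename F, typename... Args>
struct ResultOfImpl<F (Args...)> : std::invoke_result<F, Args...> {};

template<typename T>
using ResultOf_t = typename ResultOfImpl<T>::type;
#else
template<typename T>
using ResultOf_t = typename std::result_of<T>::type;
#endif

// Illustrative functors.
struct Answer { int operator() () const { return 42; } };
struct Greeting { std::string operator() () const { return "hi"; } };

// "F ()" reads as "the type F called with no arguments".
static_assert (std::is_same<ResultOf_t<Answer ()>, int>::value,
        "Answer yields int");
static_assert (std::is_same<ResultOf_t<Greeting ()>, std::string>::value,
        "Greeting yields std::string");
```

This is why ScheduleImpl() can spell its return type as QFuture<ResultOf_t<F ()>> without the caller ever naming T.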


ReportFutureResult takes a functor and its arguments, executes the functor, and marks the corresponding QFutureInterface as finished, passing it the functor's result along the way; if the functor throws, the exception is reported to the QFutureInterface instead. Unfortunately, matters are complicated by functors returning void: they need a separate function, because in C++ you cannot declare a variable of type void. Such is our type system: the type exists, it even has a value, yet you cannot declare a variable of it.

    // Functors returning something other than void.
    template<typename R, typename F, typename... Args>
    EnableIf_t<!std::is_same<R, void>::value>
        ReportFutureResult (QFutureInterface<R>& iface, F&& f, Args... args)
    {
        try
        {
            const auto result = f (args...);
            iface.reportFinished (&result);
        }
        catch (const QtException_t& e)
        {
            iface.reportException (e);
            iface.reportFinished ();
        }
        catch (const std::exception& e)
        {
            iface.reportException (ConcurrentStdException { e });
            iface.reportFinished ();
        }
    }

    // Functors returning void: there is no result to store.
    template<typename F, typename... Args>
    void ReportFutureResult (QFutureInterface<void>& iface, F&& f, Args... args)
    {
        try
        {
            f (args...);
        }
        catch (const QtException_t& e)
        {
            iface.reportException (e);
        }
        catch (const std::exception& e)
        {
            iface.reportException (ConcurrentStdException { e });
        }

        iface.reportFinished ();
    }
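The same void / non-void split can be seen in miniature with the standard library alone. The sketch below is an analogue of ReportFutureResult, not the project's code: std::promise stands in for QFutureInterface, and the name FulfillPromise is made up for the illustration.

```cpp
#include <cassert>
#include <exception>
#include <future>
#include <stdexcept>
#include <type_traits>

// Non-void case: the functor's result is stored in the promise.
// (FulfillPromise is a hypothetical name, not a library function.)
template<typename R, typename F>
typename std::enable_if<!std::is_void<R>::value>::type
    FulfillPromise (std::promise<R>& promise, F&& f)
{
    try
    {
        promise.set_value (f ());
    }
    catch (...)
    {
        // Forward whatever the functor threw to the future's owner.
        promise.set_exception (std::current_exception ());
    }
}

// void case: there is nothing to store, only completion to signal.
template<typename F>
void FulfillPromise (std::promise<void>& promise, F&& f)
{
    try
    {
        f ();
        promise.set_value ();
    }
    catch (...)
    {
        promise.set_exception (std::current_exception ());
    }
}
```

Overload resolution picks the right version from the promise type alone, just as the QFutureInterface<R> and QFutureInterface<void> parameters do above.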


QtException_t is needed to support building with Qt4:

    #if QT_VERSION < 0x050000
    using QtException_t = QtConcurrent::Exception;
    #else
    using QtException_t = QException;
    #endif


ConcurrentStdException wraps a standard exception into one that Qt's QFuture mechanism understands, but its implementation is a bit more involved and not that important here.


That is, ScheduleImpl() takes a functor with a signature like T () and returns a QFuture<T>. It wraps the functor in a special function with the signature void () that is tied to the returned QFuture<T> and marks it as ready once the functor has run, and then adds this wrapper to the queue.
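For readers without the project's helpers at hand, the same scheme can be sketched with the standard library only. This is an analogue under stated assumptions rather than the Qt class itself: std::packaged_task and std::future stand in for QFutureInterface and QFuture, a condition variable stands in for the wake-up signal and the Qt event loop, and the names MiniWorker and Schedule are invented for the example.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical std-only counterpart of the worker thread.
class MiniWorker
{
    std::mutex Mutex_;
    std::condition_variable Cond_;
    std::vector<std::function<void ()>> Functions_;
    bool Stop_ = false;
    std::thread Thread_;    // declared last: starts after the other members exist
public:
    MiniWorker ()
    : Thread_ { [this] { Run (); } }
    {
    }

    ~MiniWorker ()
    {
        {
            std::lock_guard<std::mutex> locker { Mutex_ };
            Stop_ = true;
        }
        Cond_.notify_one ();
        Thread_.join ();
    }

    // Takes a functor with the signature T (), returns std::future<T>.
    template<typename F>
    auto Schedule (F func) -> std::future<decltype (func ())>
    {
        using Result_t = decltype (func ());

        // packaged_task is move-only while std::function wants a copyable
        // callable, hence the shared_ptr.
        auto task = std::make_shared<std::packaged_task<Result_t ()>> (std::move (func));
        auto future = task->get_future ();

        {
            std::lock_guard<std::mutex> locker { Mutex_ };
            Functions_.push_back ([task] { (*task) (); });   // the void () wrapper
        }
        Cond_.notify_one ();

        return future;
    }
private:
    void Run ()
    {
        for (;;)
        {
            std::vector<std::function<void ()>> funcs;
            {
                std::unique_lock<std::mutex> locker { Mutex_ };
                Cond_.wait (locker, [this] { return Stop_ || !Functions_.empty (); });
                if (Functions_.empty ())
                    return;     // stop requested and nothing left to run
                funcs.swap (Functions_);
            }

            for (const auto& func : funcs)
                func ();
        }
    }
};
```

Here std::packaged_task does the job of the reporting wrapper, including the void case and exception forwarding. A call such as worker.Schedule ([] { return 2 + 3; }) returns a std::future<int> whose get() yields the value computed on the worker thread.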

After that, the rotateFuncs() signal is emitted. Inside run() it is connected to the RotateFuncs() method, which is responsible for processing the queue of saved wrappers.

Now let's look at the implementation of the run() and RotateFuncs() methods:
    void WorkerThreadBase::run ()
    {
        // Lives on the stack of the worker thread for as long as it runs.
        SlotClosure<NoDeletePolicy> rotator
        {
            [this] { RotateFuncs (); },
            this,
            SIGNAL (rotateFuncs ()),
            nullptr
        };

        Initialize ();

        QThread::run ();

        Cleanup ();
    }

    void WorkerThreadBase::RotateFuncs ()
    {
        decltype (Functions_) funcs;

        {
            // Hold the lock only long enough to take over the queue.
            QMutexLocker locker { &FunctionsMutex_ };

            using std::swap;
            swap (funcs, Functions_);
        }

        for (const auto& func : funcs)
            func ();
    }


A bit about SlotClosure
SlotClosure is a helper class that lets you connect signals to lambdas rather than slots. Qt5 has a more adequate syntax for this, but unfortunately I still need to support Qt4 builds as well.

SlotClosure is simple: it calls its first argument each time the object given as the second argument emits the signal given as the third argument. The fourth argument is the parent object. Here the SlotClosure lives on the stack, so no parent is needed.

The template argument NoDeletePolicy means that the object should not delete itself after the first signal. Other deletion policies include, for example, DeleteLaterPolicy, which deletes the connection object after the signal fires for the first time; this is convenient for one-off tasks.


With these functions everything is simple: we connect the rotateFuncs() signal to the RotateFuncs() function (hmm, I wonder how many comments there will be about the naming style?), call the function that initializes the thread's objects, defined somewhere in the derived class, and start spinning the thread's event loop. When the thread's owner calls quit(), QThread::run() returns, and the derived class gets a chance to clean up after itself in Cleanup().

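Note that it is Qt's signal-slot mechanism that makes rotateFuncs(), emitted from the main thread, invoke RotateFuncs() in the thread of our WorkerThreadBase: the SlotClosure is created inside run(), so it belongs to the worker thread, and the call is delivered through that thread's event loop.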

RotateFuncs(), in turn, briefly locks the main queue, moves its contents into a local one, and then executes the saved functions one by one.
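The point of swapping under the lock and running outside of it can be shown in isolation. The sketch below is a standard-library analogue, not the project's code: std::mutex and std::vector replace QMutex and QList, and FunctionQueue is an illustrative name. Because the lock is already released while the functions run, a queued function may schedule further work without deadlocking on the non-recursive mutex.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical minimal queue showing only the "swap, then run" step.
class FunctionQueue
{
    std::mutex Mutex_;
    std::vector<std::function<void ()>> Functions_;
public:
    void Push (std::function<void ()> func)
    {
        std::lock_guard<std::mutex> locker { Mutex_ };
        Functions_.push_back (std::move (func));
    }

    // Runs everything queued so far and returns how many functions ran.
    std::size_t Rotate ()
    {
        std::vector<std::function<void ()>> funcs;

        {
            std::lock_guard<std::mutex> locker { Mutex_ };
            funcs.swap (Functions_);
        }

        // The lock is released here, so func () may call Push () safely;
        // whatever it pushes is picked up by the next Rotate ().
        for (const auto& func : funcs)
            func ();

        return funcs.size ();
    }
};
```

The lock is thus held only for the duration of a swap, however long the queued functions themselves take.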

That is actually all. As a usage example, here is a piece of the on-disk avatar storage system from an IM client:
avatarsstoragethread.h
    class AvatarsStorageThread final : public Util::WorkerThreadBase
    {
        std::unique_ptr<AvatarsStorageOnDisk> Storage_;
    public:
        using Util::WorkerThreadBase::WorkerThreadBase;

        QFuture<void> SetAvatar (const QString& entryId, IHaveAvatars::Size size, const QByteArray& imageData);
        QFuture<boost::optional<QByteArray>> GetAvatar (const QString& entryId, IHaveAvatars::Size size);
        QFuture<void> DeleteAvatars (const QString& entryId);
    protected:
        void Initialize () override;
        void Cleanup () override;
    };


avatarsstoragethread.cpp
    QFuture<void> AvatarsStorageThread::SetAvatar (const QString& entryId,
            IHaveAvatars::Size size, const QByteArray& imageData)
    {
        return ScheduleImpl ([=] { Storage_->SetAvatar (entryId, size, imageData); });
    }

    QFuture<boost::optional<QByteArray>> AvatarsStorageThread::GetAvatar (const QString& entryId,
            IHaveAvatars::Size size)
    {
        return ScheduleImpl ([=] { return Storage_->GetAvatar (entryId, size); });
    }

    QFuture<void> AvatarsStorageThread::DeleteAvatars (const QString& entryId)
    {
        return ScheduleImpl ([=] { Storage_->DeleteAvatars (entryId); });
    }

    void AvatarsStorageThread::Initialize ()
    {
        Storage_.reset (new AvatarsStorageOnDisk);
    }

    void AvatarsStorageThread::Cleanup ()
    {
        Storage_.reset ();
    }



The implementation of AvatarsStorageOnDisk is a separate interesting topic, tied to my homegrown almost-ORM framework, which generates tables, SQL queries and the corresponding insert / delete / update functions from a description of a data structure via Boost.Fusion. That implementation, however, has nothing to do with multithreading, which is generally a good thing, especially from the standpoint of decomposing the original problem.

And finally, let's note the shortcomings of the proposed solution:
  1. The public API of the class derived from WorkerThreadBase has to duplicate every method you want to call on the object that is moved to the separate thread. I have not come up with an effective solution to this offhand.
  2. Initialize() and Cleanup() are begging to be turned into some kind of RAII. It is worth coming up with something here.
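As one possible direction for the second point, and only a sketch under my own assumptions rather than something the project does: if the thread function owns the object as a local variable, construction plays the role of Initialize() and the destructor plays the role of Cleanup(), even when the loop exits via an exception. The names RunWithObject, LoggedStorage and Demo are hypothetical, and std::thread stands in for QThread.

```cpp
#include <cassert>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for a storage class: it only logs its lifetime.
struct LoggedStorage
{
    std::vector<std::string>& Log_;

    explicit LoggedStorage (std::vector<std::string>& log)
    : Log_ (log)
    {
        Log_.push_back ("init");
    }

    ~LoggedStorage ()
    {
        Log_.push_back ("cleanup");
    }

    void Use ()
    {
        Log_.push_back ("use");
    }
};

// Constructs T in the calling thread, hands it to the loop, and destroys
// it when the loop returns or throws.
template<typename T, typename Loop, typename... Args>
void RunWithObject (Loop loop, Args&... args)
{
    T object { args... };   // the Initialize () step
    loop (object);          // the QThread::run () step
}                           // ~T () is the Cleanup () step

// Runs the whole lifecycle on a separate thread and returns the log.
std::vector<std::string> Demo ()
{
    std::vector<std::string> log;

    std::thread thread
    {
        [&log]
        {
            RunWithObject<LoggedStorage> ([] (LoggedStorage& storage) { storage.Use (); }, log);
        }
    };
    thread.join ();

    return log;
}
```

With this shape, the object is created and destroyed on the worker thread itself, and the derived class no longer needs the unique_ptr juggling.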

Source: https://habr.com/ru/post/273743/

