📜 ⬆️ ⬇️

Using Database Connections in Multi-threaded Qt Applications

When writing a multi-service system in bark, each service must be multithreaded, faced with the problem of using a database connection. Services are developed in QT, so they used the QtSql module to interact with the database.

Problems


  1. Each thread requires its own database connection (QSqlDatabase). When using one connection from different streams, a segmentation error occurs.
  2. Since at the current time it is possible to keep open a limited number of connections to the database, it is necessary to implement the capture, release and waiting for the connection by threads.
  3. In the context of the flow, in order to work properly with transactions, you need to work with only one connection. For example: Entity order contains Entity entities. When you save the order must be saved all the goods. If an exceptional situation arises while saving the goods, then the whole saving transaction must cancel the order.
  4. The library should be able to work with several databases simultaneously, and of different types (Mysql, PostgreSQL)


Decision


As a result, we got 3 classes:


Connection

In the class constructor, the QSqlDatabase _conn member is initialized and the connection is opened (open):
Connection::Connection(const QString& ident, const QString& driver, const QString& dbHost, const QString& dbName, const QString& dbUser, const QString& dbPassword, int dbPort) : _threadId(0), _countRef(0), _countBegins(0), _retryCount(0),_invalid(false) { _conn = QSqlDatabase::addDatabase(driver, ident); _conn.setHostName(dbHost); _conn.setDatabaseName(dbName); _conn.setUserName(dbUser); _conn.setPassword(dbPassword); _conn.setPort(dbPort), open(); } 

Basic prototypes method for working with DB
  void exec(QSqlQuery& sql); void exec(const char* sql); void exec(const QString& sql); /** *   ,         */ template <typename T> void fetchCol(QSqlQuery& sql,QList<T>& result)... template <typename T> QList<T> fetchCol(QSqlQuery& sql) ... template <typename T> QList<T> fetchCol(const char* sql) ... template <typename T> QList<T> fetchCol(const QString& sql) ... /* *         */ template <typename T> T fetchOne(QSqlQuery& sql, bool* ok = 0) ... template <typename T> T fetchOne(const QString& sql, bool* ok = 0) ... template <typename T> T fetchOne(const char* sql, bool* ok = 0) ... /* *     */ void fetchRow(QSqlQuery& sql,QVariantMap& res); QVariantMap fetchRow(const QString& sql); QVariantMap fetchRow(const char* sql); QVariantMap fetchRow(QSqlQuery& sql); /** *     */ void query(QSqlQuery& sql,QList<QVariant>& result); QList<QVariant> query(const char* sql); QList<QVariant> query(QSqlQuery& sql); QList<QVariant> query(const QString& sql); /*  query */ void fetchAll(QSqlQuery& sql,QList<QVariant>& result); QList<QVariant> fetchAll(const char* sql); QList<QVariant> fetchAll(QSqlQuery& sql); QList<QVariant> fetchAll(const QString& sql); 

Since in QT, to work with a database, QSqlQuery is used, which depends on QSqlDatabase, then it is strictly necessary to use methods for creating queries:
  QSqlQuery createQuery() const { return QSqlQuery(_conn);} QSqlQuery createQuery(const QString& sql) { return QSqlQuery(sql, _conn); } 

')
ManagedConnection

The class "links" Connection and ConnetcionManager.
When an object is created, an attempt is made to request a connection from the ConnetcionManager by identifier (for example, db1conn). After capture, the member pointer to the connection is initialized. For convenience, the operator is overridden -> so that Connection methods are called.
Typically, an application requires connecting to only one database. Therefore, it was customary to give it the identifier “default”.
The typedef type of ManagedConnection DConn will allow to get a connection. for example
 DB::DConn conn; // DB::ManagedConnection conn("default'); //   DB::ManagedConnection c1("db1conn); DB::ManagedConnection c2("db2conn); 


Take for example the call stack on pseudocode. Order (Order) saves its data in the database and causes its member to save (there are a lot of them in the IDAL). Item saves its data in the database and causes its Data member to be saved. Data saves its data in the database. As a result, the nesting on 3 levels:
 Order : save(){ DB::DBConn conn; // . countRef = 1 conn->query('INSERT INTO order...'); item->save(); Item:save() { DB::DBConn conn;// . countRef = 2 conn->query('INSERT INTO item...'); data->save(); Data:save() { DB::DBConn conn;// . countRef = 3 conn->query('INSERT INTO data...'); // ,   ~ManagedConnection countRef = 2 } // ,   ~ManagedConnection countRef = 1 } // ,   ~ManagedConnection countRef = 0 } } } 

Connectionmanager

Keep the settings for the connection: host, port, database type, etc.
For example, for an application, you must have a connection with the db1 database of the MySQL type and db2 of the PostgresSQL type. The configuration in the configuration file will look like this:
 [database] size = 2 ; 1\ident=db1conn ; ,    MySQL 1\driver=QMYSQL ;  1\host=localhost ;   1\name=db1 ;  1\user=db1_user ; 1\password=lol ; 1\port=3306 ; -         1\max_count = 30 2\ident=db2conn 2\driver=QPSQL 2\host=localhost 2\name=test 2\user=postgres 2\password= 2\port=5432 2\max_count = 30 

When the application starts, the config is read and converted to QVariantMap.
An example of initialization in Application
 Application::Application(int& argc, char** argv): QCoreApplication(argc, argv) { ... QVariantMap stgs = settings(); DB::ConnectionManager::init(stgs); ... } 


The static member of the ConnectionManager is initialized from the config (static QMap <QString, ConnectionManager *> _instances;)
The identifier from the ident config will be used as the key in the map.
 void ConnectionManager::init(const QVariantMap& settings) { const int size = settings.value("database/size").toInt(); for (int i = 1; i <= size; ++i) { const QString ident = settings.value(QString("database/%1/ident").arg(i), "default").toString(); ConnectionManager* inst = new ConnectionManager( ident, settings.value(QString("database/%1/driver").arg(i)).toString(), ... ); _instances[ident] = inst; Log::info(QString("ConnectionManager::init: [%1] [%2@%3:%4] ").arg(inst->_driver).arg(inst->_dbUser).arg(inst->_dbHost).arg(inst->_dbPort)); } } 


The identifier from the ident config will be used as the key in the map.
The main class method getConnection (explained in the code comments):

 Connection* ConnectionManager::getConnection() { Connection* conn = 0; int count = 0; while(count++ < MAX_RETRY_GET_CONN_COUNT) { pthread_t thread_id = pthread_self(); //  ,           { QMutexLocker mlocker(&_mutex); for (int i = 0; i < _pool.size(); ++i) { conn = _pool.at(i); //    if (conn && conn->threadID() == thread_id && conn->isValid()) { //         conn->lock(); //Log::debug(QString("ConnectionManager::getConnection          thread [%1])").arg(conn->name())); return conn; } } } //       ,       { QMutexLocker mlocker(&_mutex); for (int i = 0; i < _pool.size(); ++i) { conn = _pool.at(i); if (conn && !conn->isLocked() && conn->isValid() ) { //Log::debug(QString("ConnectionManager::getConnection    [%1])").arg(conn->name())); //   conn->lock(); return conn; } } } //       { QMutexLocker mlocker(&_mutex); if(_currentCount < _maxCount) { //      //    //try { conn = new Connection( QString("%1_%2").arg(_ident).arg(_currentCount), _driver, _dbHost, _dbName, _dbUser, _dbPassword,_dbPort ); _currentCount++; conn->lock(); _pool.append(conn); return conn; /*} catch(exc::Message& ex) { delete conn; throw ex; }*/ } else { //   //Log::warn("  - [%d]    DB  [%d]",_maxCount,count); /*for (int i = 0; i < _pool.size(); ++i) { conn = _pool.at(i); if (!conn->isValid() && !conn->isLocked() ) { removeConnection(conn); break; } }*/ } } // , //     sleep(2); } Log::crit(" %d       ",MAX_RETRY_GET_CONN_COUNT); { QMutexLocker mlocker(&_mutex); for (int i = 0; i < _pool.size(); ++i) { conn = _pool.at(i); if (!conn->isValid() && !conn->isLocked()) { removeConnection(conn); break; } } } throw exc::Message("     "); //return 0; } 

Work logic


ConnetctionManager is initialized from the config in order to know with what settings to create connections, and what is their maximum number.
When creating a DB :: ManagedConnection instance, the ConnetctionManager is accessed and an attempt is made to get a pointer to the Connetction from the ConnetctionManager :: getConnetction.
In ConnetctionManager :: getConnetction using several attempts occurs:
  1. An attempt to take from the drive a connection whose thread_id matches the current one. If found then return by increasing the refCount of the connection by 1
  2. Attempt to take a free connection from the drive. If it is found to return, increasing the connection refCount by 1
  3. If all connections are busy and the max is not reached. limit, then create a new connection, put it in the pool and return

After deleting an instance of the DB :: ManagedConnection class, the connection's refCount decreases. If refCount == 0, the connection becomes available for capturing other streams.

Usage example



 QList<QVariant> Bank::banksByAccountNumber(const QString& accountNumber) { QList<QVariant> res; DB::ManagedConnection conn; foreach(const QVariant& row, conn->fetchAll( "SELECT `real` as state ,namen as name,namep as full_name," "newnum as bik,ksnp as korr_acc,okpo as okpo,nnp as city," "ind as zip, adr as address,regn as regnum, telef as phones FROM `bankdinfo` WHERE `real` = '' ORDER BY RAND()" )) { if(isBelongToBank((row.toMap()["bik"]).toString(),accountNumber)) { res.append(row); } } Log::debug("Banks found %d",res.size()); return res; } 


Sources on Github
UPD
At peak load, a maximum number of connections is created (specified in the config), but after a load drops, the connections do not close and take up resources. It turns out that it is necessary to launch a thread that will close the connections, leaving their optimal number, only it is not clear on the basis of what criteria it will determine at the moment this number.
During the operation, we faced the fact that MySQL breaks the connection once a day, and it happens that the DBMS must be reloaded. For application fault tolerance, we had to add not very flexible code in Connection:
 bool Connection::tryRetry(const QSqlError& err) { //    bool needRecon = false; QRegExp mrx("MySQL server has gone away"); if (err.type() == QSqlError::NoError && qstrcmp(_conn.driver()->handle().typeName(), "PGconn*") == 0) { needRecon = true; } else if ( /*(qstrcmp(_conn.driver()->handle().typeName(), "MYSQL*") == 0)*/ err.text().contains(mrx)) { needRecon = true; } bool ok = false; if (needRecon) { Log::err( "     - GONE AWAY.   ..."); bool lol = true; while(lol) { if (_retryCount >= MAX_RETRY_COUNT) { _retryCount = 0; _invalid = true; Log::crit( "       ."); throw exc::Message("     "); } lol = !reconnect(); if(!lol) { ok = true; _retryCount = 0; break; } _retryCount++; sleep(2); } } //_retryCount return ok; } void Connection::query(QSqlQuery& sql, QList<QVariant>& result) { if (!sql.exec()) { if (tryRetry(sql.lastError())) { if(isValid()) { QSqlQuery qs = createQuery(); cpQuery(sql, qs); Log::warn("   ..."); return query(qs, result); } else { throw exc::Message("    ");; } } else throw exc::Sql(sql); } ... } 

Source: https://habr.com/ru/post/164377/


All Articles