PostgreSQL libpq connection pool

libpq is a great library for working with PostgreSQL from C++. It is well documented, and Postgres Pro has even published a full Russian translation of the documentation.



While writing a server backend, I found that libpq has no connection pool. The database workload was expected to be fairly heavy, so a single connection was clearly not enough. Opening a new connection every time data arrived would be wasteful, because establishing a connection is the slowest operation, so I decided to write my own connection pool.



The idea is that at the start of the program we create several connections and store them in a queue.



When data arrives, we take a free connection from the queue (waiting for one to appear if none is free), use it to insert the data, and then put the connection back. The idea is simple and quick to implement and, most importantly, it is very fast.


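Let's create a PostgreSQL database called demo, containing a table named demo with the structure shown below. Note that the DEFAULT clause assumes the sequence demo_id_seq already exists.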



structures

    -- Table: public.demo

    -- DROP TABLE public.demo;

    CREATE TABLE public.demo
    (
        id integer NOT NULL DEFAULT nextval('demo_id_seq'::regclass),
        name character varying(256),
        CONSTRAINT demo_pk PRIMARY KEY (id)
    )
    WITH (
        OIDS=FALSE
    );
    ALTER TABLE public.demo
        OWNER TO postgres;




Next we write the class that represents a connection to the database. To keep the example simple, the connection parameters are written directly in the code. In a real application they should of course be stored in a configuration file and read at startup, so that you do not have to recompile the program when the server parameters change.



pgconnection.h

    #ifndef PGCONNECTION_H
    #define PGCONNECTION_H

    #include <memory>
    #include <mutex>
    #include <string>   // needed for the std::string members below
    #include <libpq-fe.h>

    class PGConnection
    {
    public:
        PGConnection();
        std::shared_ptr<PGconn> connection() const;

    private:
        void establish_connection();

        // Connection parameters, hard-coded to keep the example short
        std::string m_dbhost = "localhost";
        int         m_dbport = 5432;
        std::string m_dbname = "demo";
        std::string m_dbuser = "postgres";
        std::string m_dbpass = "postgres";

        // Owns the libpq handle; the deleter is set in the constructor
        std::shared_ptr<PGconn> m_connection;
    };

    #endif //PGCONNECTION_H
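As a rough illustration of moving the hard-coded parameters out of the source, the sketch below reads them from environment variables at startup instead of a configuration file, which is a simplification of what the text recommends. The `DbSettings` struct and the `envOr` and `loadSettings` helpers are hypothetical names introduced here. The variable names PGHOST, PGPORT, PGDATABASE, PGUSER and PGPASSWORD are the ones libpq itself recognizes, and the fallbacks are the values used in this article.

```cpp
#include <cstdlib>
#include <string>

// Hypothetical settings holder; the fields mirror the members of PGConnection.
// The port is kept as a string because PQsetdbLogin takes it as a string.
struct DbSettings {
    std::string host;
    std::string port;
    std::string name;
    std::string user;
    std::string pass;
};

// Return the value of an environment variable,
// or the fallback if the variable is unset or empty.
inline std::string envOr(const char* name, const std::string& fallback) {
    const char* value = std::getenv(name);
    return (value != nullptr && *value != '\0') ? std::string(value) : fallback;
}

// Collect the settings once at startup, defaulting to the article's values.
inline DbSettings loadSettings() {
    return DbSettings{
        envOr("PGHOST", "localhost"),
        envOr("PGPORT", "5432"),
        envOr("PGDATABASE", "demo"),
        envOr("PGUSER", "postgres"),
        envOr("PGPASSWORD", "postgres"),
    };
}
```

A constructor that accepted such a struct could pass `settings.host.c_str()` and the other fields to PQsetdbLogin in place of the member defaults.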


pgconnection.cpp

    #include <stdexcept>
    #include <string>
    #include "pgconnection.h"

    PGConnection::PGConnection()
    {
        // PQfinish is registered as the deleter of the smart pointer
        m_connection.reset(
            PQsetdbLogin(m_dbhost.c_str(), std::to_string(m_dbport).c_str(),
                         nullptr, nullptr,
                         m_dbname.c_str(), m_dbuser.c_str(), m_dbpass.c_str()),
            &PQfinish);

        // Fail if the connection was not established, or if it could not
        // be switched to nonblocking mode
        if (PQstatus(m_connection.get()) != CONNECTION_OK ||
            PQsetnonblocking(m_connection.get(), 1) != 0)
        {
            throw std::runtime_error(PQerrorMessage(m_connection.get()));
        }
    }

    std::shared_ptr<PGconn> PGConnection::connection() const
    {
        return m_connection;
    }




To prevent possible resource leaks, we will store the connection in a smart pointer.



In the constructor, we call the PQsetdbLogin function, which establishes a connection to the database and returns a PGconn * pointer to it, and then switch the connection to nonblocking mode with PQsetnonblocking.



When the work is finished, the connection must be closed with the PQfinish function, which takes the pointer returned by PQsetdbLogin. That is why the last argument of the m_connection.reset() call is the address of that function, &PQfinish. When the last copy of the smart pointer is destroyed and the reference count drops to zero, it calls this function, closing the connection correctly.
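The deleter mechanism can be seen in isolation in the small sketch below. It does not use libpq: `Handle`, `openHandle` and `closeHandle` are hypothetical stand-ins for PGconn, PQsetdbLogin and PQfinish, and the counter exists only to make the call to the deleter observable.

```cpp
#include <memory>

// Hypothetical stand-in for an opaque C handle such as PGconn.
struct Handle {
    int id;
};

// Counts closed handles so the effect of the deleter can be observed.
inline int& closedCount() {
    static int count = 0;
    return count;
}

// Stand-ins for an open/close pair of a C library.
inline Handle* openHandle(int id) { return new Handle{id}; }

inline void closeHandle(Handle* handle) {
    ++closedCount();
    delete handle;
}

// Wrap the raw handle so that closeHandle runs exactly once,
// when the last shared_ptr copy releases it.
inline std::shared_ptr<Handle> makeHandle(int id) {
    return std::shared_ptr<Handle>(openHandle(id), &closeHandle);
}
```

Copying the pointer returned by `makeHandle` only increases the reference count. `closeHandle` is not called until every copy has been reset or destroyed.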



Now we need a class that will create, store and manage the pool of connections.



pgbackend.h

    #ifndef PGBACKEND_H
    #define PGBACKEND_H

    #include <memory>
    #include <mutex>
    #include <string>
    #include <queue>
    #include <condition_variable>
    #include <libpq-fe.h>
    #include "pgconnection.h"

    class PGBackend
    {
    public:
        PGBackend();
        std::shared_ptr<PGConnection> connection();
        void freeConnection(std::shared_ptr<PGConnection>);

    private:
        void createPool();

        std::mutex m_mutex;
        std::condition_variable m_condition;
        std::queue<std::shared_ptr<PGConnection>> m_pool;

        const int POOL = 10;
    };

    #endif //PGBACKEND_H




pgbackend.cpp

    #include <iostream>
    #include <thread>
    #include <fstream>
    #include <sstream>
    #include "pgbackend.h"

    PGBackend::PGBackend()
    {
        createPool();
    }

    void PGBackend::createPool()
    {
        std::lock_guard<std::mutex> locker_( m_mutex );

        for ( auto i = 0; i < POOL; ++i ) {
            m_pool.emplace( std::make_shared<PGConnection>() );
        }
    }

    std::shared_ptr<PGConnection> PGBackend::connection()
    {
        std::unique_lock<std::mutex> lock_( m_mutex );

        while ( m_pool.empty() ) {
            m_condition.wait( lock_ );
        }

        auto conn_ = m_pool.front();
        m_pool.pop();

        return conn_;
    }

    void PGBackend::freeConnection(std::shared_ptr<PGConnection> conn_)
    {
        std::unique_lock<std::mutex> lock_( m_mutex );
        m_pool.push( conn_ );
        lock_.unlock();
        m_condition.notify_one();
    }




In the createPool function we create the pool of connections; I set it to 10 connections. We then create an instance of the PGBackend class and work with it through two functions: connection, which returns a free connection to the database, and freeConnection, which puts the connection back into the queue.
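Because every call to connection must be paired with a call to freeConnection, an exception or an early return between the two would leave the connection out of the pool for good. One possible safeguard, which is not part of the original code, is a small RAII guard. In the sketch below `ConnectionLease` is a hypothetical helper, and `DemoPool` is a stand-in that holds ints instead of database connections so that the example compiles without libpq. It only mimics the two-function interface of PGBackend.

```cpp
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

// Stand-in pool with the same connection()/freeConnection() interface.
class DemoPool {
public:
    explicit DemoPool(int size) {
        for (int i = 0; i < size; ++i) {
            m_pool.push(std::make_shared<int>(i));
        }
    }

    std::shared_ptr<int> connection() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_condition.wait(lock, [this] { return !m_pool.empty(); });
        auto conn = m_pool.front();
        m_pool.pop();
        return conn;
    }

    void freeConnection(std::shared_ptr<int> conn) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_pool.push(std::move(conn));
        }
        m_condition.notify_one();
    }

    // Number of connections currently sitting in the queue.
    std::size_t available() {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_pool.size();
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_condition;
    std::queue<std::shared_ptr<int>> m_pool;
};

// Hypothetical guard: borrows a connection in the constructor and
// returns it in the destructor, even if an exception is thrown.
template <typename Pool>
class ConnectionLease {
public:
    using Connection = decltype(std::declval<Pool&>().connection());

    explicit ConnectionLease(Pool& pool)
        : m_pool(pool), m_conn(pool.connection()) {}

    ~ConnectionLease() { m_pool.freeConnection(std::move(m_conn)); }

    // Non-copyable: exactly one lease owns the borrowed connection.
    ConnectionLease(const ConnectionLease&) = delete;
    ConnectionLease& operator=(const ConnectionLease&) = delete;

    const Connection& get() const { return m_conn; }

private:
    Pool& m_pool;
    Connection m_conn;
};
```

Under these assumptions, code using the article's class could declare `ConnectionLease<PGBackend> lease(*pgbackend);` and drop the explicit freeConnection call.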



All of this is built on a condition variable: if the queue is empty, there are no free connections, and the calling thread sleeps until another thread returns a connection and wakes it through the condition variable.
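A thread waiting this way sleeps indefinitely if no connection is ever returned. If that is undesirable, a bounded wait is one option. The sketch below is a variation, not the article's implementation: `TimedPool` and `tryConnection` are hypothetical names, and the pool holds ints rather than database connections so that it compiles on its own. It uses `std::condition_variable::wait_for` with a predicate, which rechecks the queue after every wakeup just like the while loop in PGBackend::connection.

```cpp
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

// Stand-in pool of ints with a bounded wait for a free connection.
class TimedPool {
public:
    explicit TimedPool(int size) {
        for (int i = 0; i < size; ++i) {
            m_pool.push(std::make_shared<int>(i));
        }
    }

    // Wait up to `timeout` for a free connection.
    // Returns nullptr if none became available in time.
    std::shared_ptr<int> tryConnection(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(m_mutex);
        const bool ready = m_condition.wait_for(
            lock, timeout, [this] { return !m_pool.empty(); });
        if (!ready) {
            return nullptr;
        }
        auto conn = m_pool.front();
        m_pool.pop();
        return conn;
    }

    // Put the connection back and wake one waiting thread.
    void freeConnection(std::shared_ptr<int> conn) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_pool.push(std::move(conn));
        }
        m_condition.notify_one();
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_condition;
    std::queue<std::shared_ptr<int>> m_pool;
};
```

The caller has to check the result for nullptr and decide what to do on a timeout, for example report an error or retry later.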



The simplest example that uses our backend with a connection pool is given in main.cpp. In production you will of course have some kind of event loop, and the database work will happen in response to its events. In my case this is boost::asio, which runs asynchronously, accepts events from the network and writes everything to the database. I leave it out here so as not to obscure the idea of the connection pool. Instead, we simply create 50 threads that work with the server through a single PGBackend instance.



main.cpp

    #include <iostream>
    #include <memory>
    #include <string>
    #include <thread>
    #include <vector>
    #include "pgbackend.h"

    void testConnection(std::shared_ptr<PGBackend> pgbackend)
    {
        // Take a free connection from the pool
        auto conn = pgbackend->connection();

        std::string demo = "SELECT max(id) FROM demo; ";
        PQsendQuery( conn->connection().get(), demo.c_str() );

        // Read results until PQgetResult returns a null pointer
        while ( auto res_ = PQgetResult( conn->connection().get() ) ) {
            if (PQresultStatus(res_) == PGRES_TUPLES_OK && PQntuples(res_)) {
                auto ID = PQgetvalue(res_, 0, 0);
                std::cout << ID << std::endl;
            }

            if (PQresultStatus(res_) == PGRES_FATAL_ERROR) {
                std::cout << PQresultErrorMessage(res_) << std::endl;
            }

            PQclear( res_ );
        }

        // Return the connection to the pool
        pgbackend->freeConnection(conn);
    }

    int main(int argc, char const *argv[])
    {
        auto pgbackend = std::make_shared<PGBackend>();

        std::vector<std::shared_ptr<std::thread>> vec;

        for ( size_t i = 0; i < 50; ++i ) {
            vec.push_back(std::make_shared<std::thread>(std::thread(testConnection, pgbackend)));
        }

        for (auto &i : vec) {
            i.get()->join();
        }

        return 0;
    }




This is compiled with the command:



 g++ main.cpp pgbackend.cpp pgconnection.cpp -o pool -std=c++14 -I/usr/include/postgresql/ -lpq -lpthread 


Be careful with the number of database connections: the server limits it with the max_connections (integer) parameter, so the total number of pooled connections must not exceed it.



Source: https://habr.com/ru/post/322966/


