📜 ⬆️ ⬇️

Pooling database connections on node.js

In this article I describe two classes, written with standard Node.js tools, that distribute requests over open channels (TCP sockets). The total system load is taken into account: if there are not enough channels, new ones are opened, and as the total number of requests decreases, the “extra” channels are closed.

This client can be used to distribute requests across any channels that are essentially net.Socket instances. To do so, you need to change the methods that open and close a channel and the method that adds a request to a channel.

In the example that I will describe, the pg library is used, which provides functionality for opening sockets to the server with the database. In this case, the default connection pool management provided by the library is not used in any way.

To begin, consider the Connect class, with the help of which one entity will be managed - a connection:
')
Connect class
/**
 * Connect manages a single database connection obtained through
 * `pg.connect` and runs queued queries on it.
 *
 * Events (subscribe with `on`):
 *   'open'     - the client was obtained and queries can be executed;
 *   'error'    - connection or argument-validation error (thrown by default);
 *   'maxCount' - more than `maxQueryCount()` queries are pending;
 *   'drain'    - the last pending query has finished;
 *   'close'    - `close()` was called on an open connection.
 *
 * @param {string} connString - connection string of the form
 *   "pg://USER:PASSWORD@HOST:PORT/DATABASE".
 */
function Connect(connString) {
  var self = this;

  this._connString = connString;
  // True between start() and stop(): queued queries may be sent.
  this._isRun = false;
  // Pending-query threshold above which 'maxCount' is emitted.
  this._maxQueryCount = 100;
  // Re-entrancy guard for _nextTick.
  this._worked = false;
  // Queries added and not yet answered (queued + in flight).
  this._queryCount = 0;
  // Set by 'maxCount', cleared when the connection drains.
  this._setMax = false;
  // The queue exists from the start so that addQuery()/start() are safe
  // to call before the connection has been opened.
  this._arrayQuery = [];
  this._emitter = new (require('events').EventEmitter)();

  // Flush whatever was queued while the connection was still opening.
  this._emitter.on('open', function () {
    self._nextTick();
  });
  // Default error policy: crash loudly. Replaced by the first user handler
  // registered through on('error', ...).
  this._emitter.on('error', function (err) {
    throw err;
  });
  this._emitter.on('maxCount', function () {
    self._setMax = true;
  });

  pg.connect(this._connString, function (err, client, done) {
    if (err) {
      return self._emitter.emit('error', err);
    }
    self._client = client;
    // NOTE(review): `done` presumably hands the client back to pg's pool;
    // it is only ever invoked from close().
    self._done = done;
    self._emitter.emit('open');
  });
}

/**
 * Subscribes `func` to `typeEvent`. Registering an 'error' handler removes
 * every previously registered 'error' handler, including the default one
 * that throws.
 */
Connect.prototype.on = function (typeEvent, func) {
  if (typeEvent === 'error') {
    this._emitter.removeAllListeners('error');
  }
  this._emitter.addListener(typeEvent, func);
};

/** Allows queued queries to be sent and sends the ones already queued. */
Connect.prototype.start = function () {
  this._isRun = true;
  this._nextTick();
};

/** Stops sending queued queries; queries already in flight are unaffected. */
Connect.prototype.stop = function () {
  this._isRun = false;
};

/**
 * @returns {boolean} true once 'maxCount' has fired and the connection has
 *   not drained since.
 */
Connect.prototype.isFull = function () {
  return this._setMax;
};

/**
 * Emits 'close' and invokes the `done` callback received from `pg.connect`.
 * Emits 'error' when the connection was never opened.
 */
Connect.prototype.close = function () {
  if (this._done) {
    this._emitter.emit('close');
    this._done();
  } else {
    this._emitter.emit('error', new Error('connect is not active'));
  }
};

/** @returns {Array} the live queue of queries that have not been sent yet. */
Connect.prototype.queryQueue = function () {
  return this._arrayQuery;
};

/**
 * Queues a query. Invalid arguments are reported through the 'error' event
 * and the query is not queued.
 *
 * @param {string} query - SQL text.
 * @param {Array} params - query parameters.
 * @param {function(Error, Object=)} cb - called with (err) or (null, result).
 */
Connect.prototype.addQuery = function (query, params, cb) {
  if (typeof query !== 'string') {
    return this._emitter.emit('error', new Error('not valid query'));
  }
  if (!Array.isArray(params)) {
    return this._emitter.emit('error', new Error('not valid argument'));
  }
  // Fail at the call site instead of crashing later, when the result arrives.
  if (typeof cb !== 'function') {
    return this._emitter.emit('error', new Error('not valid callback'));
  }

  this._queryCount++;
  this._arrayQuery.push({ query: query, params: params, callback: cb });

  if (this._queryCount > this._maxQueryCount) {
    this._emitter.emit('maxCount', 'in queue added too many requests, the waiting time increases');
  }
  this._nextTick();
};

/**
 * Getter/setter for the pending-query threshold: a truthy `count` sets it,
 * otherwise the current value is returned.
 */
Connect.prototype.maxQueryCount = function (count) {
  if (count) {
    this._maxQueryCount = count;
  } else {
    return this._maxQueryCount;
  }
};

/** @returns {number} queries added and not yet answered. */
Connect.prototype.queryCount = function () {
  return this._queryCount;
};

/**
 * Sends every queued query while the connection is started and open.
 * Does nothing when re-entered or when no client is available yet.
 */
Connect.prototype._nextTick = function () {
  if (this._worked || !this._client) {
    return;
  }
  this._worked = true;
  try {
    while (this._isRun && this._arrayQuery.length > 0) {
      this._execute(this._arrayQuery.shift());
    }
  } finally {
    // Never leave the guard set if client.query throws synchronously.
    this._worked = false;
  }
};

/**
 * Sends one queued task. A separate function gives each task its own scope,
 * so the asynchronous result reaches that task's callback and not the
 * callback of whichever task was dequeued last.
 */
Connect.prototype._execute = function (task) {
  var self = this;
  this._client.query(task.query, task.params, function (err, result) {
    self._queryCount--;
    if (err) {
      task.callback(err);
    } else {
      task.callback(null, result);
    }
    // Checked after the callback (which may have queued more work) and for
    // failed queries too, otherwise a failing last query leaves the
    // connection marked as full forever.
    if (self._queryCount === 0) {
      self._setMax = false;
      self._emitter.emit('drain');
    }
  });
};




Now the Balancer class itself, which manages our connections: it opens new ones, closes unnecessary ones, distributes requests between them, and provides a single entry point for the service.

Class balancer
 /**
  * Balancer distributes queries over a set of Connect instances. It opens
  * `minCountConnect` connections up front, opens more (up to
  * `maxCountConnect`) when every connection reports isFull(), and retires
  * connections again when the load drops.
  *
  * Emits 'ready' once the initial connections are open.
  *
  * @param {number} minCountConnect - connections opened at start-up and kept.
  * @param {number} maxCountConnect - upper bound on open connections.
  * @param {string} [connectionString] - passed to every Connect. When
  *   omitted, a `connString` variable visible in the enclosing scope is used,
  *   which is what the original code relied on.
  */
 function Balancer(minCountConnect, maxCountConnect, connectionString) {
   this._maxCountConnect = maxCountConnect;
   this._minCountConnect = minCountConnect;
   this._connString = connectionString !== undefined
     ? connectionString
     : (typeof connString !== 'undefined' ? connString : undefined);
   // Connections that currently receive queries.
   this._connectArray = [];
   // Retired connections still finishing their pending queries.
   this._closedConnect = [];
   // Queries waiting to be handed to a connection.
   this._taskArray = [];
   // True while _distribution is running or waiting for a new connection.
   this._run = false;
   this._emitter = new (require('events').EventEmitter)();
   this._init();
 }

 /** Opens the initial connections one after another, then emits 'ready'. */
 Balancer.prototype._init = function () {
   var self = this;
   var opened = 0;

   this._cursor = 0;
   this.activQuery = 0;

   var cycle = function () {
     opened++;
     if (opened < self._minCountConnect) {
       self._addNewConnect(cycle);
     } else {
       self._emitter.emit('ready');
     }
   };
   this._addNewConnect(cycle);
 };

 /**
  * Creates a connection and, once it reports 'open', adds it to the active
  * set and calls `cb`.
  */
 Balancer.prototype._addNewConnect = function (cb) {
   var self = this;
   var connect = new Connect(this._connString);
   connect.on('open', function () {
     self._connectArray.push(connect);
     cb();
   });
 };

 /**
  * @param {number} pos - index to start scanning from.
  * @returns {number} index of the first connection at or after `pos` that is
  *   not full; a value >= the number of connections when there is none.
  */
 Balancer.prototype._cycle = function (pos) {
   for (var i = pos; i < this._connectArray.length; i++) {
     if (!this._connectArray[i].isFull()) break;
   }
   return i;
 };

 /** Hands one task to `connect` and makes sure the connection is started. */
 Balancer.prototype._next = function (connect, el) {
   connect.addQuery(el.query, el.params, el.cb);
   connect.start();
 };

 /**
  * Drains the task queue. For each task, in order of preference:
  *   1. the next non-full connection at or after the cursor (round-robin);
  *   2. the first non-full connection from the start of the list;
  *   3. a newly opened connection, if the maximum has not been reached -
  *      distribution pauses until it is open;
  *   4. otherwise the first connection holding fewer queries than the
  *      average, or connection 0 when there is none.
  *
  * Implemented as a loop: recursing once per task overflows the call stack
  * when a large backlog has built up while a connection was being opened.
  */
 Balancer.prototype._distribution = function () {
   while (this._taskArray.length > 0) {
     var el = this._taskArray.shift();
     var total = this._connectArray.length;

     var idx = this._cycle(this._cursor);
     if (idx >= total) {
       idx = this._cycle(0);
     }
     if (idx < total) {
       this._cursor = idx + 1;
       this._next(this._connectArray[idx], el);
       continue;
     }

     if (total < this._maxCountConnect) {
       // _run stays true, so addQuery keeps queueing meanwhile.
       this._awaitNewConnect(el);
       return;
     }

     idx = this._leastLoaded();
     this._cursor = idx;
     this._next(this._connectArray[idx], el);
   }
   this._run = false;
 };

 /** Opens a connection, gives it `el`, then resumes distribution. */
 Balancer.prototype._awaitNewConnect = function (el) {
   var self = this;
   this._addNewConnect(function () {
     self._cursor = self._connectArray.length - 1;
     self._next(self._connectArray[self._cursor], el);
     self._distribution();
   });
 };

 /**
  * @returns {number} index of the first connection whose pending-query count
  *   is below the average per connection, or 0 when there is none.
  */
 Balancer.prototype._leastLoaded = function () {
   var average = this.activQuery / this._connectArray.length;
   for (var i = 0; i < this._connectArray.length; i++) {
     if (average > this._connectArray[i].queryCount()) {
       return i;
     }
   }
   return 0;
 };

 /** Subscribes `func` to a balancer event (e.g. 'ready'). */
 Balancer.prototype.on = function (typeEvent, func) {
   this._emitter.addListener(typeEvent, func);
 };

 /**
  * Retires connections the current load no longer needs. The number kept is
  * the number required to hold `activQuery` queries at the per-connection
  * limit, but never fewer than `minCountConnect`.
  */
 Balancer.prototype._removeLoad = function () {
   if (this._connectArray.length === 0) {
     // Nothing is open yet (queries added before 'ready').
     return;
   }
   var perConnection = this._connectArray[0].maxQueryCount();
   var needed = Math.max(
     this._minCountConnect,
     Math.ceil(this.activQuery / perConnection)
   );
   while (this._connectArray.length > needed) {
     this._retire(this._connectArray.pop());
   }
 };

 /**
  * Closes `connect` right away when it is idle; otherwise parks it in
  * `_closedConnect` and closes it on its next 'drain'. A separate function
  * gives each connection its own scope, so the 'drain' handler closes that
  * connection and not the one retired last.
  */
 Balancer.prototype._retire = function (connect) {
   var self = this;
   if (connect.queryCount() === 0) {
     connect.close();
     return;
   }
   this._closedConnect.push(connect);
   connect.on('drain', function () {
     // Looked up at drain time: positions shift as other entries leave.
     var at = self._closedConnect.indexOf(connect);
     if (at === -1) {
       return; // already closed by an earlier 'drain'
     }
     self._closedConnect.splice(at, 1);
     connect.close();
   });
 };

 /**
  * Single entry point: queues a query and starts distribution if it is not
  * already running.
  *
  * @param {*} tube - accepted for interface compatibility; not used here.
  * @param {string} query - SQL text.
  * @param {Array} params - query parameters.
  * @param {function} cb - called with the arguments the connection passes
  *   to its query callback.
  */
 Balancer.prototype.addQuery = function (tube, query, params, cb) {
   var self = this;
   this.activQuery++;
   this._removeLoad();

   var wrappCb = function () {
     self.activQuery--;
     cb.apply(this, arguments);
   };
   this._taskArray.push({ query: query, params: params, cb: wrappCb });

   if (!this._run) {
     this._run = true;
     this._distribution();
   }
 };



How to check all this? For testing, I use the query “select pg_sleep (1)”, which runs for 1 second and simulates a query to the database.

The balancer processed 10,000 such requests in ~101,590 ms, with the maximum number of queued requests per connection set to 100 and the total number of channels (sockets) limited to between 10 and 100.

Used script:

 // Benchmark: once the balancer is ready, push 10000 one-second queries
 // through it and report how many connections ended up open and how long
 // the whole run took (in milliseconds).
 var balancer = new Balancer(10, 100);

 balancer.on('ready', function () {
   var totalQueries = 10000;
   var finished = 0;
   var startedAt = new Date().getTime();

   // Shared completion handler: log failures, count every answer and print
   // the summary after the last one.
   var onResult = function (err, result) {
     if (err) {
       console.log(err);
     }
     finished += 1;
     if (finished == totalQueries) {
       console.log(balancer._connectArray.length);
       console.log(new Date().getTime() - startedAt);
     }
   };

   for (var sentCount = 0; sentCount < totalQueries; sentCount += 1) {
     balancer.addQuery('gps', 'select pg_sleep(1)', [], onResult);
   }
 });


All sources are available on github .
The client is still rough, of course, and much needs to be finished or rewritten, so please do not judge it too harshly. If necessary, I can work on it in more depth.

Source: https://habr.com/ru/post/241255/


All Articles