📜 ⬆️ ⬇️

Implementing daemons on Node.js

Not long ago, I had the honor of implementing daemons (workers) on Node.js for use in all projects developed by our company. We often develop projects in which video files have to be processed and distributed across several servers, so we needed a ready-made tool for this.


Formulation of the problem:



Here, right away, is the GitHub link to the result: ( https://github.com/pipll/node-daemons ).


Run Workers


Each worker is a separate Node.js process. Worker processes are created with the built-in cluster module. The startup script also watches for worker crashes and restarts the crashed workers.


Worker startup code
'use strict';

const config = require('./config/config');
const _ = require('lodash');
const path = require('path');
const cluster = require('cluster');
const logger = require('log4js').getLogger('app');
// Not referenced in this script; kept because loading it may have side effects
// (presumably initialises the database models -- confirm before removing).
const models = require('./models');

// Timer that polls, during shutdown, until every worker process has exited.
let shutdownInterval = null;
// Becomes true once shutdown was requested. From then on a crashed worker is
// not replaced: a replacement would never receive the 'shutdown' message and
// the master would wait for it forever.
let shuttingDown = false;

if (cluster.isMaster) {
  // Master process: fork one child per enabled worker in the configuration.
  _.each(config.workers, (conf, name) => {
    if (conf.enabled) {
      startWorker(name);
    }
  });
} else {
  // Child process: load and run the worker class named by WORKER_NAME.
  const name = process.env.WORKER_NAME;
  const WorkerClass = require(path.join(__dirname, 'workers', name + '.js'));
  let worker = null;

  if (WorkerClass) {
    worker = new WorkerClass(name, config.workers[name]);

    // Subscribe before starting so that a 'stop' event can never be missed.
    worker.on('stop', () => {
      process.exit();
    });

    worker.start();
  }

  // The master asks for a graceful stop by sending the 'shutdown' message.
  process.on('message', message => {
    if ('shutdown' === message) {
      if (worker) {
        worker.stop();
      } else {
        process.exit();
      }
    }
  });
}

// Shutdown. The handlers are installed in the master and in every child; in a
// child shutdownCluster() does nothing, so the child keeps running until the
// master sends it the 'shutdown' message.
process.on('SIGTERM', shutdownCluster);
process.on('SIGINT', shutdownCluster);

/**
 * Forks a child process for the named worker and replaces it if it crashes.
 *
 * @param {string} name - worker name; passed to the child as WORKER_NAME.
 */
function startWorker(name) {
  const worker = cluster.fork({WORKER_NAME: name});

  worker.on('online', () => {
    logger.info('Start %s worker #%d.', name, worker.id);
  });

  worker.on('exit', status => {
    // `suicide` is the legacy name of `exitedAfterDisconnect`.
    const intentional =
      (worker.exitedAfterDisconnect || worker.suicide) === true || status === 0;

    if (intentional) {
      // Clean exit: nothing to restart.
      logger.info('Worker %s #%d was killed.', name, worker.id);
    } else if (shuttingDown) {
      // Crashed while the cluster is stopping: do not fork a replacement.
      logger.warn('Worker %s #%d died during shutdown. It will not be replaced.', name, worker.id);
    } else {
      // Unexpected death: start a fresh process for the same worker.
      logger.warn('Worker %s #%d was died. Replace it with a new one.', name, worker.id);
      startWorker(name);
    }
  });
}

/**
 * Gracefully stops the cluster (master only): asks every worker to shut down
 * and exits once no worker processes remain.
 */
function shutdownCluster() {
  if (!cluster.isMaster) {
    return;
  }

  shuttingDown = true;
  // A repeated signal restarts the polling instead of stacking intervals.
  clearInterval(shutdownInterval);

  if (_.size(cluster.workers) === 0) {
    process.exit();
    return;
  }

  logger.info('Shutdown workers:', _.size(cluster.workers));
  _.each(cluster.workers, worker => {
    try {
      worker.send('shutdown');
    } catch (err) {
      logger.warn('Cannot send shutdown message to worker:', err);
    }
  });

  // Exit as soon as the last worker process is gone.
  shutdownInterval = setInterval(() => {
    if (_.size(cluster.workers) === 0) {
      process.exit();
    }
  }, config.shutdownInterval);
}

I would like to draw attention to a few points:



Base Worker Component


This component is designed to perform periodic processes and does not work with the task queue.


Base Worker Code
 'use strict';

const _ = require('lodash');
const Promise = require('bluebird');
const log4js = require('log4js');
const EventEmitter = require('events');
const WorkerStates = require('./worker_states');

/**
 * Base class for periodic workers.
 *
 * Repeatedly runs `loop()` with a pause of `conf.sleep` milliseconds between
 * runs and emits a 'stop' event once the worker has fully stopped.
 */
class Worker extends EventEmitter {
  /**
   * @param {string} name - worker name; also used for the logger category.
   * @param {Object} [conf] - worker settings; `sleep` defaults to 1000 ms.
   */
  constructor(name, conf) {
    super();
    this.name = name;
    this.conf = _.defaults({}, conf, {
      sleep: 1000 // pause between two loop() runs, in milliseconds
    });
    this.logger = log4js.getLogger('worker-' + name);
    // True until start() is called and again after stop() is requested.
    this.stopped = true;
    // Handle of the timeout that schedules the next loop() run.
    this.timer = null;
    // Current WorkerStates value; null until the worker is started.
    this.state = null;
  }

  /**
   * Starts the worker and runs the first iteration.
   *
   * @returns {Promise} settles when the first iteration has finished.
   */
  start() {
    this.logger.info('Start');
    this.stopped = false;
    this.state = WorkerStates.STATE_IDLE;
    return this._startLoop();
  }

  /**
   * Requests a stop. When idle the worker stops immediately; otherwise the
   * running iteration finishes first and 'stop' is emitted afterwards.
   */
  stop() {
    this.logger.info('Stop');
    this.stopped = true;
    if (this.state === WorkerStates.STATE_IDLE) {
      // Cancel the scheduled next iteration.
      if (this.timer) {
        clearTimeout(this.timer);
        this.timer = null;
      }
      this.state = WorkerStates.STATE_STOP;
      this.emit('stop');
    }
  }

  /**
   * One unit of periodic work; meant to be overridden by subclasses.
   *
   * @returns {Promise} settles when the iteration is done.
   */
  loop() {
    return Promise.resolve();
  }

  /**
   * Runs one iteration, then either schedules the next one or finishes the
   * stop sequence.
   *
   * @returns {Promise} settles when this iteration has been handled.
   */
  _startLoop() {
    this.state = WorkerStates.STATE_WORK;
    // Promise.try turns a synchronous throw or a non-bluebird return value of
    // an overridden loop() into a bluebird promise, so the catch/finally
    // handlers below always run and the worker keeps being rescheduled.
    return Promise.try(() => this.loop()).catch(err => {
      this.logger.warn('Loop error:', err);
    }).finally(() => {
      this.state = WorkerStates.STATE_IDLE;
      if (!this.stopped) {
        // Schedule the next iteration.
        this.timer = setTimeout(() => {
          this._startLoop();
        }, this.conf.sleep);
      } else {
        // stop() was requested while the iteration was running.
        this.state = WorkerStates.STATE_STOP;
        this.emit('stop');
      }
    });
  }
}

module.exports = Worker;

The simplest worker code might look like this:


 'use strict';

const Promise = require('bluebird');
const Worker = require('../components/worker');

// How long a single iteration takes, in milliseconds.
const ITERATION_DELAY = 30000;

/**
 * Minimal periodic worker: logs a message and finishes the iteration after a
 * fixed delay.
 */
class Sample extends Worker {
  /**
   * @returns {Promise} resolves once the delay has elapsed.
   */
  loop() {
    this.logger.info("Loop method");
    const started = Promise.resolve();
    return started.delay(ITERATION_DELAY);
  }
}

module.exports = Sample;

Some features:



Task processing component


This is the main component designed for parallel processing of tasks from the MySQL table.


Task Processing Code
 'use strict';

const config = require('../config/config');
const _ = require('lodash');
const Promise = require('bluebird');
const Worker = require('./worker');
const WorkerStates = require('./worker_states');
const models = require('../models');

/**
 * Worker that takes tasks from the tasks table and processes up to
 * `conf.count` of them in parallel. Subclasses implement `handleTask()`.
 */
class TaskWorker extends Worker {
  /**
   * @param {string} name - worker name.
   * @param {Object} [conf] - worker settings, merged over the defaults below.
   */
  constructor(name, conf) {
    super(name, conf);
    this.conf = _.defaults({}, this.conf, {
      maxAttempts: 3,     // not read in this class
      delayRatio: 300000, // ms; multiplied by the attempt number in delay()
      count: 1,           // maximum number of tasks processed at the same time
      queue: '',          // queue name handed to the Task `forWork` scope
      update: 3000        // ms between two heartbeat updates of a running task
    });
    // Number of tasks currently being processed.
    this.count = 0;
  }

  /**
   * Tries to take one task when a processing slot is free and the worker is
   * not stopping. Processing runs in the background; the returned promise
   * only covers fetching the task.
   *
   * @returns {Promise}
   */
  loop() {
    if (this.stopped || this.count >= this.conf.count) {
      return Promise.resolve();
    }
    return this._getTask().then(task => {
      if (task) {
        this._runTask(task);
      }
      // Processing is intentionally not chained to this promise.
      return null;
    });
  }

  /**
   * Processes one claimed task in the background: keeps its heartbeat fresh,
   * runs handleTask() and records the outcome.
   *
   * @param {Object} task - Task model instance returned by _getTask().
   */
  _runTask(task) {
    this.count++;

    // Heartbeat: refresh checked_at while the task is being processed. The
    // Task model exposes this as check(). A failed heartbeat is only logged so
    // that it cannot surface as an unhandled rejection.
    const heartbeat = setInterval(() => {
      models.sequelize.transaction(t => {
        return task.check({transaction: t});
      }).catch(err => {
        this.logger.warn('Cannot update task heartbeat:', err);
      });
    }, this.conf.update);

    // Promise.try routes a synchronous throw from handleTask() into the same
    // failure path, so the slot and the heartbeat timer are always released.
    Promise.try(() => this.handleTask(task.get({plain: true}))).then(() => {
      // Success: mark the task as done.
      return models.sequelize.transaction(t => {
        return task.complete({transaction: t}).then(() => {
          this.logger.info('Task completed:', task.id);
        });
      });
    }).catch(err => {
      // Failure: mark the task as failed and postpone the next attempt.
      this.logger.warn('Handle error:', err);
      return this.delay(task).then(delay => {
        return models.sequelize.transaction(t => {
          return task.fail(delay, {transaction: t}).then(() => {
            this.logger.warn('Task failed:', task.id);
          });
        });
      });
    }).finally(() => {
      clearInterval(heartbeat);
      this.count--;
    }).done();
  }

  /**
   * Processes a single task; meant to be overridden by subclasses.
   *
   * @returns {Promise} settles when the task has been handled.
   */
  handleTask() {
    return Promise.resolve();
  }

  /**
   * Computes the delay before a failed task may be retried.
   *
   * @param {Object} task - Task model instance.
   * @returns {Promise<number>} `task.attempts * conf.delayRatio` milliseconds.
   */
  delay(task) {
    return Promise.resolve().then(() => {
      return task.attempts * this.conf.delayRatio;
    });
  }

  /**
   * Selects the next task for this worker's queue and node with a row lock
   * and marks it as being worked on, all inside one transaction.
   *
   * @returns {Promise} the claimed task, or undefined when none is available.
   */
  _getTask() {
    return models.sequelize.transaction({autocommit: false}, t => {
      return models.Task.scope({
        method: ['forWork', this.conf.queue, config.node_id]
      }).find({transaction: t, lock: t.LOCK.UPDATE}).then(task => {
        if (task) {
          return task.work(config.node_id, {transaction: t});
        }
      });
    });
  }

  /**
   * Runs one iteration and schedules the next one. The worker reports itself
   * stopped only when a stop was requested and no task is still in progress.
   *
   * @returns {Promise}
   */
  _startLoop() {
    this.state = WorkerStates.STATE_WORK;
    // Promise.try also catches a synchronous throw from loop().
    return Promise.try(() => this.loop()).catch(err => {
      this.logger.warn('Loop error:', err);
    }).finally(() => {
      if (this.count === 0) {
        this.state = WorkerStates.STATE_IDLE;
      }
      if (this.stopped && this.count === 0) {
        this.state = WorkerStates.STATE_STOP;
        this.emit('stop');
      } else {
        // Keep polling: either for new tasks or until running tasks finish.
        this.timer = setTimeout(() => {
          this._startLoop();
        }, this.conf.sleep);
      }
    });
  }
}

module.exports = TaskWorker;

The simplest worker code might look like this:


 'use strict';

const Promise = require('bluebird');
const TaskWorker = require('../components/task_worker');

// How long handling one task takes, in milliseconds.
const HANDLE_DELAY = 30000;

/**
 * Minimal task worker: logs the received task and completes it after a fixed
 * delay.
 */
class Sample extends TaskWorker {
  /**
   * @param {Object} task - plain representation of the task being handled.
   * @returns {Promise} resolves once the delay has elapsed.
   */
  handleTask(task) {
    this.logger.info('Sample Task:', task);
    const started = Promise.resolve();
    return started.delay(HANDLE_DELAY);
  }
}

module.exports = Sample;

Features of the component:



Task Model


The sequelize module is used to work with database models. All tasks are in the tasks table. The table has the following structure:


| Field | Type | Description |
| --- | --- | --- |
| id | integer, autoincrement | Task id |
| node_id | integer, nullable | ID of the node for which the task is intended |
| queue | string | Task queue |
| status | enum | Task status |
| attempts | integer | Number of task start attempts |
| priority | integer | Task priority |
| body | string | Task body in JSON format |
| start_at | datetime, nullable | Earliest date and time at which the task may start being processed (delayed start) |
| finish_at | datetime, nullable | Date and time after which the task must no longer be processed (analogue of TTL) |
| worker_node_id | integer, nullable | ID of the node that started processing the task |
| worker_started_at | datetime, nullable | Date and time the task began processing |
| checked_at | datetime, nullable | Date and time of task status update |
| created_at | datetime, nullable | Date and time of task creation |
| updated_at | datetime, nullable | Date and time of task change |

A task can be assigned either to a specific node or to any node in the cluster. This is governed by the node_id field during task creation. The task can be started with a delay ( start_at field) and with a limited processing time ( finish_at field).


Different workers work with different queues of tasks specified in the queue field. Processing priority is set in the priority field (the higher, the higher the priority). The number of task restarts is saved in the attempts field. In the body of the task (the body field) in JSON format the parameters necessary for the worker are passed.


The checked_at field serves as an indication of a running task. Its value changes all the time while the task is running. If the value of the checked_at field has not changed for a long time, and the task is in the working status, the task is considered failed and its status changes to failure .


Task Model Code
 'use strict'; const moment = require('moment'); module.exports = function(sequelize, Sequelize) { return sequelize.define('Task', { id: { type: Sequelize.INTEGER, primaryKey: true, autoIncrement: true }, node_id: { type: Sequelize.INTEGER }, queue: { type: Sequelize.STRING, allowNull: false }, status: { type: Sequelize.ENUM, values: ['pending', 'working', 'done', 'failure'], defaultValue: 'pending', allowNull: false }, attempts: { type: Sequelize.INTEGER, defaultValue: 0, allowNull: false }, priority: { type: Sequelize.INTEGER, defaultValue: 10, allowNull: false }, body: { type: Sequelize.TEXT, set: function(body) { return this.setDataValue('body', JSON.stringify(body)); }, get: function() { try { return JSON.parse(this.getDataValue('body')); } catch (e) { return null; } } }, start_at: { type: Sequelize.DATE }, finish_at: { type: Sequelize.DATE }, worker_node_id: { type: Sequelize.INTEGER }, worker_started_at: { type: Sequelize.DATE }, checked_at: { type: Sequelize.DATE } }, { tableName: 'tasks', freezeTableName: true, underscored: true, scopes: { forWork: function(queue, node_id) { return { where: { node_id: { $or: [ null, node_id ] }, queue: queue, status: 'pending', start_at: { $or: [ null, { $lt: moment().toDate() } ] }, finish_at: { $or: [ null, { $gte: moment().toDate() } ] } }, order: [ ['priority', 'DESC'], ['attempts', 'ASC'], [sequelize.fn('IFNULL', sequelize.col('start_at'), sequelize.col('created_at')), 'ASC'] ] }; } }, instanceMethods: { fail: function(delay, options) { this.start_at = delay ? 
moment().add(delay, 'ms').toDate() : null; this.attempts = sequelize.literal('attempts + 1'); this.status = 'failure'; return this.save(options); }, complete: function(options) { this.status = 'done'; return this.save(options); }, work: function(node_id, options) { this.status = 'working'; this.worker_node_id = node_id; this.worker_started_at = moment().toDate(); return this.save(options); }, check: function(options) { this.checked_at = moment().toDate(); return this.save(options); } } }); }; 

Task life cycle


All tasks go through the following life cycle:


  1. A new task is created with the pending status.
  2. When it is time to process the task, the first free worker receives it, switches it to the working status and fills in the worker_node_id and worker_started_at fields.
  3. While the task is being processed, the worker periodically (every 10 seconds by default) updates the checked_at field to signal that it is still working correctly.
  4. At the end of work on a task there can be several scenarios:
    4.1. If the task is successfully completed, the task is transferred to the done status.
    4.2. If the task fails, it is transferred to the failure status, the number of attempts increases, and the next launch is postponed for a specified amount of time (calculated in the delay method based on the number of attempts and the delayRatio setting).

The project also has a built-in Manager module that runs on each node and does the following task processing:


  1. Transfers hung tasks to failure status.
  2. Transfers failed tasks to pending status for new processing.
  3. Removes outstanding tasks that have already expired.
  4. Removes successfully completed tasks with a delay of 1 hour (can be configured).
  5. Removes failed tasks that have used up all their launch attempts, after a delay of 3 days (can be configured).

In addition, this module handles switching nodes on and off (marking them active or inactive).


Manager Module Code
 'use strict';

const _ = require('lodash');
const moment = require('moment');
const Promise = require('bluebird');
const Worker = require('../components/worker');
const models = require('../models');
const config = require('../config/config');

/**
 * Housekeeping worker: keeps node activity flags up to date, recovers hung
 * and failed tasks and deletes tasks that are no longer needed.
 */
class Manager extends Worker {
  /**
   * @param {string} name - worker name.
   * @param {Object} [conf] - worker settings, merged over the defaults below.
   */
  constructor(name, conf) {
    super(name, conf);
    this.conf = _.defaults({}, this.conf, {
      maxUpdate: 30000, // 30 seconds
      maxCompleted: 3600000, // 1 hour
      maxFailed: 259200000 // 3 days
    });
  }

  /**
   * Runs all maintenance steps sequentially inside one transaction.
   *
   * @returns {Promise}
   */
  loop() {
    return models.sequelize.transaction(t => {
      return Promise.resolve()
        .then(() => this._checkCurrentNode(t))
        .then(() => this._activateNodes(t))
        .then(() => this._pauseNodes(t))
        .then(() => this._restoreFrozenTasks(t))
        .then(() => this._restoreFailedTasks(t))
        .then(() => this._deleteDeadTasks(t))
        .then(() => this._deleteCompletedTasks(t))
        .then(() => this._deleteFailedTasks(t));
    });
  }

  /** Calls check() on the node this process runs on, if it is registered. */
  _checkCurrentNode(t) {
    return models.Node.findById(config.node_id, {transaction: t}).then(node => {
      if (node) {
        // Save inside the same transaction as every other step.
        // NOTE(review): assumes Node#check accepts save options the way
        // Task#check does -- confirm against the Node model.
        return node.check({transaction: t});
      }
    });
  }

  /** Re-activates inactive nodes checked within the last two sleep periods. */
  _activateNodes(t) {
    return models.Node.update({
      is_active: true
    }, {
      where: {
        is_active: false,
        checked_at: {
          $gte: moment().subtract(2 * this.conf.sleep, 'ms').toDate()
        }
      },
      transaction: t
    }).spread(count => {
      if (count > 0) {
        this.logger.info('Activate nodes:', count);
      }
    });
  }

  /** Deactivates active nodes not checked within the last two sleep periods. */
  _pauseNodes(t) {
    return models.Node.update({
      is_active: false
    }, {
      where: {
        is_active: true,
        checked_at: {
          $lt: moment().subtract(2 * this.conf.sleep, 'ms').toDate()
        }
      },
      transaction: t
    }).spread(count => {
      if (count > 0) {
        this.logger.info('Pause nodes:', count);
      }
    });
  }

  /**
   * Fails 'working' tasks whose checked_at is older than `maxUpdate` and
   * counts that as one more attempt.
   */
  _restoreFrozenTasks(t) {
    return models.Task.update({
      status: 'failure',
      attempts: models.sequelize.literal('attempts + 1')
    }, {
      where: {
        status: 'working',
        checked_at: {
          $lt: moment().subtract(this.conf.maxUpdate, 'ms').toDate()
        }
      },
      transaction: t
    }).spread(count => {
      if (count > 0) {
        this.logger.info('Restore frozen tasks:', count);
      }
    });
  }

  /** Returns failed tasks that still have attempts left to 'pending'. */
  _restoreFailedTasks(t) {
    const where = [{status: 'failure'}];
    const retryable = this._failedTasksConditions();
    if (retryable.length) {
      where.push({$or: retryable});
    }
    return models.Task.update({
      status: 'pending',
      worker_node_id: null,
      worker_started_at: null
    }, {
      where: where,
      transaction: t
    }).spread(count => {
      if (count > 0) {
        this.logger.info('Restore failure tasks:', count);
      }
    });
  }

  /** Deletes pending tasks whose finish_at is already in the past. */
  _deleteDeadTasks(t) {
    return models.Task.destroy({
      where: {
        status: 'pending',
        finish_at: {
          $lt: moment().toDate()
        }
      },
      transaction: t
    }).then(count => {
      if (count > 0) {
        this.logger.info('Delete dead tasks:', count);
      }
    });
  }

  /** Deletes done tasks whose checked_at is older than `maxCompleted`. */
  _deleteCompletedTasks(t) {
    return models.Task.destroy({
      where: {
        status: 'done',
        checked_at: {
          $lt: moment().subtract(this.conf.maxCompleted, 'ms').toDate()
        }
      },
      transaction: t
    }).then(count => {
      if (count > 0) {
        this.logger.info('Delete completed tasks:', count);
      }
    });
  }

  /**
   * Deletes failed tasks that have used up their attempts and whose
   * checked_at is older than `maxFailed`.
   *
   * The selection must be the opposite of the one in _restoreFailedTasks():
   * tasks with attempts left were already switched back to 'pending' earlier
   * in the same transaction, so reusing that condition would match nothing.
   */
  _deleteFailedTasks(t) {
    const exhausted = this._failedTasksConditions(true);
    if (exhausted.length === 0) {
      // No configured queue limits its attempts, so no task can be exhausted.
      return Promise.resolve();
    }
    return models.Task.destroy({
      where: [
        {status: 'failure'},
        {checked_at: {
          $lt: moment().subtract(this.conf.maxFailed, 'ms').toDate()
        }},
        {$or: exhausted}
      ],
      transaction: t
    }).then(count => {
      if (count > 0) {
        this.logger.info('Delete failed tasks:', count);
      }
    });
  }

  /**
   * Builds per-queue conditions from the worker configuration.
   *
   * @param {boolean} [exhausted] - when falsy (default) the conditions select
   *   tasks that may still be retried (`attempts < maxAttempts`, or any number
   *   of attempts for a queue without `maxAttempts`); when truthy they select
   *   tasks that used up their attempts (`attempts >= maxAttempts`), skipping
   *   queues without `maxAttempts`.
   * @returns {Array<Object>} conditions to be combined with `$or`.
   */
  _failedTasksConditions(exhausted) {
    const conditions = [];
    _.each(config.workers, worker => {
      if (!worker.queue) {
        return;
      }
      const limited = worker.maxAttempts !== undefined;
      if (exhausted) {
        if (limited) {
          conditions.push({
            queue: worker.queue,
            attempts: {
              $gte: worker.maxAttempts
            }
          });
        }
        return;
      }
      const item = {queue: worker.queue};
      if (limited) {
        item.attempts = {
          $lt: worker.maxAttempts
        };
      }
      conditions.push(item);
    });
    return conditions;
  }
}

module.exports = Manager;

Conclusions and future plans


In general, it turned out to be a good and fairly reliable tool for working with background tasks that we can share with the community. We have developed the main ideas and principles used in this project over several years of work on various projects.


In the project development plans:



I will be glad to receive constructive criticism and to answer any questions in the comments.


')

Source: https://habr.com/ru/post/303574/


All Articles