Not long ago, I was given the task of implementing daemons (workers) in Node.js for use in all projects developed by our company. We often develop projects that need to process video files and distribute them across several servers, so we needed a ready-made tool for this.
Problem statement:
Here is a link to the resulting project on GitHub: https://github.com/pipll/node-daemons
Each worker is a separate Node.js process. Worker processes are created with the built-in cluster module, which is also used to detect worker crashes and restart the crashed workers.
'use strict';

const config = require('./config/config');
const _ = require('lodash');
const path = require('path');
const cluster = require('cluster');
const logger = require('log4js').getLogger('app');
const models = require('./models');

// Master only: timer that polls for all workers having exited during shutdown.
let shutdownInterval = null;
// Master only: set once shutdown has begun, so that a worker exiting abnormally
// during shutdown is not respawned (a respawn would keep the master alive forever).
let shuttingDown = false;

if (cluster.isMaster) {
    // Fork one process for every enabled worker in the configuration.
    _.each(config.workers, (conf, name) => {
        if (conf.enabled) {
            startWorker(name);
        }
    });
} else {
    // Worker process: the master passes the worker name via WORKER_NAME.
    let name = process.env.WORKER_NAME;
    let WorkerClass = require(path.join(__dirname, 'workers', name + '.js'));
    let worker = null;

    if (WorkerClass) {
        worker = new WorkerClass(name, config.workers[name]);
        // Start the worker's processing loop.
        worker.start();
        // Once the worker has fully stopped, terminate this process (exit code 0).
        worker.on('stop', () => {
            process.exit();
        });
    }

    // Graceful shutdown request sent by the master.
    process.on('message', message => {
        if ('shutdown' === message) {
            if (worker) {
                worker.stop();
            } else {
                process.exit();
            }
        }
    });
}

// Shutdown.
// shutdownCluster() does nothing in worker processes, so in a worker these
// handlers only prevent the signal from killing it directly; workers wait for
// the master's 'shutdown' message instead.
process.on('SIGTERM', shutdownCluster);
process.on('SIGINT', shutdownCluster);

/**
 * Fork a process for the named worker and replace it if it dies unexpectedly.
 *
 * @param {string} name - Key of the worker in config.workers.
 */
function startWorker(name) {
    let worker = cluster.fork({WORKER_NAME: name}).on('online', () => {
        logger.info('Start %s worker #%d.', name, worker.id);
    }).on('exit', status => {
        if ((worker.exitedAfterDisconnect || worker.suicide) === true || status === 0) {
            // Deliberate exit: killed/disconnected by the master, or a clean exit.
            logger.info('Worker %s #%d was killed.', name, worker.id);
        } else if (shuttingDown) {
            // Abnormal exit while the cluster is shutting down: do not replace it.
            logger.warn('Worker %s #%d died during shutdown.', name, worker.id);
        } else {
            // Abnormal exit: replace the worker with a new process.
            logger.warn('Worker %s #%d was died. Replace it with a new one.', name, worker.id);
            startWorker(name);
        }
    });
}

/**
 * Master-only graceful shutdown: ask every worker to stop, then exit the master
 * once no worker processes remain (checked every config.shutdownInterval ms).
 */
function shutdownCluster() {
    if (cluster.isMaster) {
        shuttingDown = true;
        // A repeated signal restarts the polling timer rather than adding a second one.
        clearInterval(shutdownInterval);
        if (_.size(cluster.workers) > 0) {
            logger.info('Shutdown workers:', _.size(cluster.workers));
            _.each(cluster.workers, worker => {
                try {
                    worker.send('shutdown');
                } catch (err) {
                    logger.warn('Cannot send shutdown message to worker:', err);
                }
            });
            // Exit the master once every worker process has gone.
            shutdownInterval = setInterval(() => {
                if (_.size(cluster.workers) === 0) {
                    process.exit();
                }
            }, config.shutdownInterval);
        } else {
            process.exit();
        }
    }
}
// I would like to draw attention to a few points:
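The name of the worker being started is passed to its process in the WORKER_NAME environment variable. When a worker stops, its process exits via process.exit(). On shutdown (SIGTERM or SIGINT), the master sends every worker a shutdown message, then polls with setInterval and calls process.exit() once all workers have stopped. The base Worker component shown next is designed to run periodic jobs and does not work with the task queue.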
'use strict';

const _ = require('lodash');
const Promise = require('bluebird');
const log4js = require('log4js');
const EventEmitter = require('events');
const WorkerStates = require('./worker_states');

/**
 * Base class for periodic workers.
 *
 * Subclasses override loop(). After each loop() run finishes (successfully or
 * not), the next run is scheduled `conf.sleep` ms later, until stop() is called.
 * Emits 'stop' once the worker has fully stopped.
 */
class Worker extends EventEmitter {
    /**
     * @param {string} name - Worker name; used in the logger category.
     * @param {Object} conf - Worker configuration (`sleep` defaults to 1000 ms).
     */
    constructor(name, conf) {
        super();
        this.name = name;
        // Configuration with defaults applied.
        this.conf = _.defaults({}, conf, {
            sleep: 1000 // pause between loop() runs, in ms
        });
        this.logger = log4js.getLogger('worker-' + name);
        // True until start() is called, and again after stop().
        this.stopped = true;
        // Timer handle of the next scheduled loop() run.
        this.timer = null;
        // Current WorkerStates value; null before start().
        this.state = null;
    }

    /**
     * Start the periodic loop.
     *
     * @returns {Promise} Settles when the first loop() run has finished.
     */
    start() {
        this.logger.info('Start');
        this.stopped = false;
        this.state = WorkerStates.STATE_IDLE;
        return this._startLoop();
    }

    /**
     * Request a graceful stop.
     *
     * If the worker is idle (waiting for its next run), the scheduled run is
     * cancelled and 'stop' is emitted immediately. Otherwise 'stop' is emitted
     * once the running loop() has finished.
     */
    stop() {
        this.logger.info('Stop');
        this.stopped = true;
        if (this.state === WorkerStates.STATE_IDLE) {
            // Cancel the scheduled next run.
            if (this.timer) {
                clearTimeout(this.timer);
                this.timer = null;
            }
            this.state = WorkerStates.STATE_STOP;
            this.emit('stop');
        }
    }

    /**
     * Business logic of one iteration; meant to be overridden by subclasses.
     *
     * @returns {Promise}
     */
    loop() {
        return Promise.resolve();
    }

    /**
     * Run loop() once, log any error, then either schedule the next run or
     * finish stopping.
     */
    _startLoop() {
        this.state = WorkerStates.STATE_WORK;
        // Promise.try turns a synchronous throw, or a non-Bluebird return value from
        // an overridden loop(), into a Bluebird promise. This keeps the error logged
        // and guarantees .finally() is available.
        return Promise.try(() => this.loop()).catch(err => {
            this.logger.warn('Loop error:', err);
        }).finally(() => {
            this.state = WorkerStates.STATE_IDLE;
            if (!this.stopped) {
                // Schedule the next loop() run.
                this.timer = setTimeout(() => {
                    this._startLoop();
                }, this.conf.sleep);
            } else {
                // stop() was requested while loop() was running.
                this.state = WorkerStates.STATE_STOP;
                this.emit('stop');
            }
        });
    }
}

module.exports = Worker;
// The simplest worker code might look like this:
'use strict';

const Promise = require('bluebird');
const Worker = require('../components/worker');

/**
 * Minimal periodic worker example.
 *
 * Each iteration logs a message and then stays busy for 30 seconds before resolving.
 */
class Sample extends Worker {
    loop() {
        const busyMs = 30000;
        this.logger.info("Loop method");
        return Promise.delay(busyMs);
    }
}

module.exports = Sample;
// Some features:
The loop method is meant to be overridden in subclasses to implement the business logic, and it must return a Promise. After the loop method finishes, it is started again once the delay specified in the worker's settings (sleep) has elapsed. Errors in the loop method are logged without stopping the worker. If the worker is stopped while the loop method is running, it waits for the method to finish before stopping. The TaskWorker component is the main one: it is designed for parallel processing of tasks stored in a MySQL table.
'use strict';

const config = require('../config/config');
const _ = require('lodash');
const Promise = require('bluebird');
const Worker = require('./worker');
const WorkerStates = require('./worker_states');
const models = require('../models');

/**
 * Worker that claims tasks from the `tasks` table and processes up to
 * `conf.count` of them concurrently. Subclasses override handleTask().
 */
class TaskWorker extends Worker {
    /**
     * @param {string} name - Worker name.
     * @param {Object} conf - Worker configuration; see the defaults below.
     */
    constructor(name, conf) {
        super(name, conf);
        this.conf = _.defaults({}, this.conf, {
            // NOTE(review): not read by TaskWorker itself; the Manager reads
            // maxAttempts directly from config.workers.
            maxAttempts: 3,
            delayRatio: 300000, // retry delay per previous attempt, in ms (see delay())
            count: 1, // maximum number of tasks processed at the same time
            queue: '', // name of the queue to take tasks from
            update: 3000 // heartbeat (checked_at refresh) interval, in ms
        });
        // Number of tasks currently being processed.
        this.count = 0;
    }

    /**
     * Claim at most one task per iteration and process it in the background.
     *
     * The returned promise resolves as soon as the task is claimed, so the next
     * iteration can claim another task while this one is still running.
     *
     * @returns {Promise}
     */
    loop() {
        if (this.count >= this.conf.count || this.stopped) {
            return Promise.resolve();
        }
        return this._getTask().then(task => {
            if (task) {
                this._processTask(task);
            }
            return null;
        });
    }

    /**
     * Process one claimed task: keep its heartbeat fresh, run handleTask(), then
     * mark the task done or failed (with a retry delay).
     *
     * @param {Object} task - Task model instance already marked as working.
     */
    _processTask(task) {
        this.count++;

        // Refresh checked_at periodically so the Manager does not treat the task
        // as frozen. The Task model exposes this as check().
        const interval = setInterval(() => {
            models.sequelize.transaction(t => {
                return task.check({transaction: t});
            }).catch(err => {
                this.logger.warn('Heartbeat error:', err);
            });
        }, this.conf.update);

        // Promise.try converts a synchronous throw or a non-Bluebird result of
        // handleTask() into a Bluebird promise, so the failure path and
        // .finally()/.done() below always run.
        Promise.try(() => this.handleTask(task.get({plain: true}))).then(() => {
            // Success: mark the task as done.
            return models.sequelize.transaction(t => {
                return task.complete({transaction: t}).then(() => {
                    this.logger.info('Task completed:', task.id);
                });
            });
        }).catch(err => {
            // Failure: mark the task as failed and postpone the next attempt.
            this.logger.warn('Handle error:', err);
            return this.delay(task).then(delay => {
                return models.sequelize.transaction(t => {
                    return task.fail(delay, {transaction: t}).then(() => {
                        this.logger.warn('Task failed:', task.id);
                    });
                });
            });
        }).finally(() => {
            clearInterval(interval);
            this.count--;
        }).done(); // an error while marking the task failed is rethrown as uncaught
    }

    /**
     * Process a single task; meant to be overridden by subclasses.
     *
     * @returns {Promise} Fulfilled on success, rejected on failure.
     */
    handleTask() {
        return Promise.resolve();
    }

    /**
     * Delay before retrying a failed task: previous attempts multiplied by delayRatio.
     *
     * @param {Object} task - Task model instance.
     * @returns {Promise<number>} Delay in ms.
     */
    delay(task) {
        return Promise.resolve().then(() => {
            return task.attempts * this.conf.delayRatio;
        });
    }

    /**
     * Atomically claim the best available task in this worker's queue.
     *
     * Uses SELECT ... FOR UPDATE plus the UPDATE in work(), in one transaction.
     *
     * @returns {Promise<Object|undefined>} The claimed task, or undefined.
     */
    _getTask() {
        return models.sequelize.transaction({autocommit: false}, t => {
            return models.Task.scope({
                method: ['forWork', this.conf.queue, config.node_id]
            }).find({transaction: t, lock: t.LOCK.UPDATE}).then(task => {
                if (task) {
                    return task.work(config.node_id, {transaction: t});
                }
            });
        });
    }

    /**
     * Variant of the base loop driver that keeps the worker in the WORK state
     * while tasks are still running in the background. The worker stops only
     * once no tasks remain.
     */
    _startLoop() {
        this.state = WorkerStates.STATE_WORK;
        return Promise.try(() => this.loop()).catch(err => {
            this.logger.warn('Loop error:', err);
        }).finally(() => {
            if (this.count === 0) {
                this.state = WorkerStates.STATE_IDLE;
            }
            if (this.stopped && this.count === 0) {
                this.state = WorkerStates.STATE_STOP;
                this.emit('stop');
            } else {
                // Keep iterating: either to claim more tasks or to wait for the
                // running ones to finish after stop() was requested.
                this.timer = setTimeout(() => {
                    this._startLoop();
                }, this.conf.sleep);
            }
        });
    }
}

module.exports = TaskWorker;
// The simplest worker code might look like this:
'use strict';

const Promise = require('bluebird');
const TaskWorker = require('../components/task_worker');

/**
 * Minimal task worker example.
 *
 * Logs every task it receives, then reports success after a 30-second pause.
 */
class Sample extends TaskWorker {
    handleTask(task) {
        const workMs = 30000;
        this.logger.info('Sample Task:', task);
        return Promise.delay(workMs);
    }
}

module.exports = Sample;
// Features of the component:
Tasks are claimed using SELECT ... FOR UPDATE followed by an UPDATE of the record, both in the same transaction with autocommit disabled. This guarantees exclusive access to a task even when several servers request tasks simultaneously. The outcome of a task is determined by the state of the Promise returned by the handleTask method. On success, the task is marked as completed. Otherwise, it is marked as failed and restarted after the delay computed by the delay method. The sequelize module is used to work with the database models. All tasks are stored in the tasks table, which has the following structure:
| Field | Type | Description |
|---|---|---|
id | integer autoincrement | Task id |
node_id | integer nullable | ID of the node for which the task is intended |
queue | string | Task queue |
status | enum | Task status |
attempts | integer | Number of task start attempts |
priority | integer | Task priority |
body | string | Task body in JSON format |
start_at | datetime, nullable | Earliest date and time at which the task may be started (delayed start) |
finish_at | datetime, nullable | Deadline after which a pending task is no longer started (similar to a TTL) |
worker_node_id | integer nullable | ID of the node that started processing the task |
worker_started_at | datetime, nullable | Date and time the task began processing |
checked_at | datetime, nullable | Date and time of the last task heartbeat (status update) |
created_at | datetime, nullable | Date and time of task creation |
updated_at | datetime, nullable | Date and time of task change |
A task can be assigned either to a specific node or to any node in the cluster; this is controlled by the node_id field set when the task is created. A task can also be given a delayed start (start_at field) and a limited processing window (finish_at field).
Different workers process different task queues, specified in the queue field. Processing priority is set in the priority field (a higher value means a higher priority). The number of task start attempts is stored in the attempts field. The task body (the body field) carries, in JSON format, the parameters the worker needs.
The checked_at field serves as a heartbeat of a running task: its value is updated continually while the task is running. If checked_at has not changed for a long time while the task is in the working status, the task is considered failed and its status is changed to failure.
'use strict'; const moment = require('moment'); module.exports = function(sequelize, Sequelize) { return sequelize.define('Task', { id: { type: Sequelize.INTEGER, primaryKey: true, autoIncrement: true }, node_id: { type: Sequelize.INTEGER }, queue: { type: Sequelize.STRING, allowNull: false }, status: { type: Sequelize.ENUM, values: ['pending', 'working', 'done', 'failure'], defaultValue: 'pending', allowNull: false }, attempts: { type: Sequelize.INTEGER, defaultValue: 0, allowNull: false }, priority: { type: Sequelize.INTEGER, defaultValue: 10, allowNull: false }, body: { type: Sequelize.TEXT, set: function(body) { return this.setDataValue('body', JSON.stringify(body)); }, get: function() { try { return JSON.parse(this.getDataValue('body')); } catch (e) { return null; } } }, start_at: { type: Sequelize.DATE }, finish_at: { type: Sequelize.DATE }, worker_node_id: { type: Sequelize.INTEGER }, worker_started_at: { type: Sequelize.DATE }, checked_at: { type: Sequelize.DATE } }, { tableName: 'tasks', freezeTableName: true, underscored: true, scopes: { forWork: function(queue, node_id) { return { where: { node_id: { $or: [ null, node_id ] }, queue: queue, status: 'pending', start_at: { $or: [ null, { $lt: moment().toDate() } ] }, finish_at: { $or: [ null, { $gte: moment().toDate() } ] } }, order: [ ['priority', 'DESC'], ['attempts', 'ASC'], [sequelize.fn('IFNULL', sequelize.col('start_at'), sequelize.col('created_at')), 'ASC'] ] }; } }, instanceMethods: { fail: function(delay, options) { this.start_at = delay ? 
moment().add(delay, 'ms').toDate() : null; this.attempts = sequelize.literal('attempts + 1'); this.status = 'failure'; return this.save(options); }, complete: function(options) { this.status = 'done'; return this.save(options); }, work: function(node_id, options) { this.status = 'working'; this.worker_node_id = node_id; this.worker_started_at = moment().toDate(); return this.save(options); }, check: function(options) { this.checked_at = moment().toDate(); return this.save(options); } } }); }; All tasks go through the following life cycle:
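When a worker picks up a task, the task's status becomes working and the worker_node_id and worker_started_at fields are filled in. While the task is being processed, the worker periodically updates the checked_at field to report that it is still running correctly. If processing succeeds, the status becomes done. If it fails, the status becomes failure, the attempts counter is incremented, and the next launch is postponed by a specified amount of time. That delay is calculated in the delay method from the number of attempts and the delayRatio setting. The project also has a built-in Manager module that runs on each node and performs the following task maintenance: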
In addition, this module activates and pauses nodes depending on how recently each node last reported in (its checked_at field).
'use strict';

const _ = require('lodash');
const moment = require('moment');
const Promise = require('bluebird');
const Worker = require('../components/worker');
const models = require('../models');
const config = require('../config/config');

/**
 * Housekeeping worker that runs on each node.
 *
 * On every loop, inside a single transaction, it:
 * - refreshes this node's heartbeat;
 * - activates or pauses nodes by heartbeat age;
 * - marks frozen tasks as failed;
 * - re-queues retryable failed tasks;
 * - deletes expired pending tasks, old completed tasks and old exhausted failed tasks.
 */
class Manager extends Worker {
    /**
     * @param {string} name - Worker name.
     * @param {Object} conf - Worker configuration; see the defaults below.
     */
    constructor(name, conf) {
        super(name, conf);
        this.conf = _.defaults({}, this.conf, {
            maxUpdate: 30000, // 30 seconds
            maxCompleted: 3600000, // 1 hour
            maxFailed: 259200000 // 3 days
        });
    }

    loop() {
        return models.sequelize.transaction(t => {
            return Promise.resolve()
                .then(() => this._checkCurrentNode(t))
                .then(() => this._activateNodes(t))
                .then(() => this._pauseNodes(t))
                .then(() => this._restoreFrozenTasks(t))
                .then(() => this._restoreFailedTasks(t))
                .then(() => this._deleteDeadTasks(t))
                .then(() => this._deleteCompletedTasks(t))
                .then(() => this._deleteFailedTasks(t));
        });
    }

    // Refresh the heartbeat of the node this process runs on.
    _checkCurrentNode(t) {
        return models.Node.findById(config.node_id, {transaction: t}).then(node => {
            if (node) {
                return node.check();
            }
        });
    }

    // Re-activate inactive nodes that reported within the last two sleep periods.
    _activateNodes(t) {
        return models.Node.update({
            is_active: true
        }, {
            where: {
                is_active: false,
                checked_at: {
                    $gte: moment().subtract(2 * this.conf.sleep).toDate()
                }
            },
            transaction: t
        }).spread(count => {
            if (count > 0) {
                this.logger.info('Activate nodes:', count);
            }
        });
    }

    // Pause active nodes that have not reported within the last two sleep periods.
    _pauseNodes(t) {
        return models.Node.update({
            is_active: false
        }, {
            where: {
                is_active: true,
                checked_at: {
                    $lt: moment().subtract(2 * this.conf.sleep).toDate()
                }
            },
            transaction: t
        }).spread(count => {
            if (count > 0) {
                this.logger.info('Pause nodes:', count);
            }
        });
    }

    // Fail 'working' tasks whose heartbeat is older than maxUpdate.
    _restoreFrozenTasks(t) {
        return models.Task.update({
            status: 'failure',
            attempts: models.sequelize.literal('attempts + 1')
        }, {
            // A task that stalled before its first heartbeat has checked_at = NULL;
            // fall back to worker_started_at so it cannot stay 'working' forever.
            where: _.assign({status: 'working'}, this._olderThan(this.conf.maxUpdate, 'worker_started_at')),
            transaction: t
        }).spread(count => {
            if (count > 0) {
                this.logger.info('Restore frozen tasks:', count);
            }
        });
    }

    // Put failed tasks that may still be retried back into 'pending'.
    _restoreFailedTasks(t) {
        let where = [{status: 'failure'}];
        let conditions = this._failedTasksConditions();
        if (conditions.length) {
            where.push({$or: conditions});
        }
        return models.Task.update({
            status: 'pending',
            worker_node_id: null,
            worker_started_at: null
        }, {
            where: where,
            transaction: t
        }).spread(count => {
            if (count > 0) {
                this.logger.info('Restore failure tasks:', count);
            }
        });
    }

    // Delete pending tasks whose deadline (finish_at) has passed.
    _deleteDeadTasks(t) {
        return models.Task.destroy({
            where: {
                status: 'pending',
                finish_at: {
                    $lt: moment().toDate()
                }
            },
            transaction: t
        }).then(count => {
            if (count > 0) {
                this.logger.info('Delete dead tasks:', count);
            }
        });
    }

    // Delete completed tasks older than maxCompleted.
    _deleteCompletedTasks(t) {
        return models.Task.destroy({
            // Tasks finished before their first heartbeat have checked_at = NULL;
            // fall back to updated_at (set when the task was saved as done).
            where: _.assign({status: 'done'}, this._olderThan(this.conf.maxCompleted, 'updated_at')),
            transaction: t
        }).then(count => {
            if (count > 0) {
                this.logger.info('Delete completed tasks:', count);
            }
        });
    }

    /**
     * Delete failed tasks older than maxFailed that have exhausted their retries.
     *
     * _restoreFailedTasks has already re-queued every retry-eligible failed task
     * in this transaction, so only tasks at or above their queue's maxAttempts
     * are candidates. Without any configured maxAttempts, nothing is deleted.
     */
    _deleteFailedTasks(t) {
        let conditions = this._exhaustedTasksConditions();
        if (!conditions.length) {
            return Promise.resolve();
        }
        let where = [
            {status: 'failure'},
            this._olderThan(this.conf.maxFailed, 'updated_at'),
            {$or: conditions}
        ];
        return models.Task.destroy({
            where: where,
            transaction: t
        }).then(count => {
            if (count > 0) {
                this.logger.info('Delete failed tasks:', count);
            }
        });
    }

    // Per configured queue: failed tasks that are still allowed to be retried.
    _failedTasksConditions() {
        let conditions = [];
        _.each(config.workers, (worker) => {
            if (worker.queue) {
                let item = {queue: worker.queue};
                if (worker.maxAttempts !== undefined) {
                    item.attempts = {
                        $lt: worker.maxAttempts
                    };
                }
                conditions.push(item);
            }
        });
        return conditions;
    }

    // Per configured queue with a retry limit: failed tasks that used up their attempts.
    // This is the complement of the maxAttempts part of _failedTasksConditions().
    _exhaustedTasksConditions() {
        let conditions = [];
        _.each(config.workers, (worker) => {
            if (worker.queue && worker.maxAttempts !== undefined) {
                conditions.push({
                    queue: worker.queue,
                    attempts: {
                        $gte: worker.maxAttempts
                    }
                });
            }
        });
        return conditions;
    }

    // Where-fragment: last heartbeat (checked_at) older than `ms`. Rows that never
    // had a heartbeat (checked_at IS NULL) are judged by `fallbackColumn` instead.
    _olderThan(ms, fallbackColumn) {
        let cutoff = moment().subtract(ms).toDate();
        let neverChecked = {checked_at: null};
        neverChecked[fallbackColumn] = {$lt: cutoff};
        return {
            $or: [
                {checked_at: {$lt: cutoff}},
                neverChecked
            ]
        };
    }
}

module.exports = Manager;
// In general, it turned out to be a good and fairly reliable tool for working
// with background tasks that we can share with the community. We have developed
// the main ideas and principles used in this project over several years of work
// on various projects.
Plans for further development of the project:
Storing information about task progress (a progress field) and adding a method to the TaskWorker module to update it. I welcome constructive criticism and will be glad to answer your questions in the comments.
Source: https://habr.com/ru/post/303574/
All Articles