Not long ago I had the chance to implement daemons (workers) in Node.js for use across all projects developed by our company. We often build projects that need to process video files and distribute them across several servers, so we needed a ready-made tool for this.
Problem statement:
Here is a link to the resulting project on GitHub: https://github.com/pipll/node-daemons
Each worker is a separate Node.js process. Worker processes are created with the built-in `cluster` module, which also detects workers that have crashed and restarts them.
'use strict';

const config = require('./config/config');
const _ = require('lodash');
const path = require('path');
const cluster = require('cluster');
const logger = require('log4js').getLogger('app');
const models = require('./models');

// Timer that polls until every worker has exited during a shutdown.
let shutdownInterval = null;
// Set once the master starts a graceful shutdown. After that point a worker
// that exits with a non-zero code is NOT replaced; otherwise the poll below
// (which waits for cluster.workers to become empty) could never finish.
let shuttingDown = false;

if (cluster.isMaster) {
    // Master: fork one process per worker enabled in the config.
    _.each(config.workers, (conf, name) => {
        if (conf.enabled) {
            startWorker(name);
        }
    });
} else {
    // Child: the master passes the worker name through the environment.
    runWorkerProcess(process.env.WORKER_NAME);
}

// Graceful shutdown. In child processes shutdownCluster does nothing, so a
// child ignores the signal itself and waits for the master's 'shutdown'
// message instead.
process.on('SIGTERM', shutdownCluster);
process.on('SIGINT', shutdownCluster);

/**
 * Body of a child process: load workers/<name>.js, start it, and exit once it
 * reports that it has stopped.
 *
 * @param {string} name - Worker name (key in config.workers and file name).
 */
function runWorkerProcess(name) {
    // NOTE: require() throws if the file is missing; the resulting non-zero
    // exit makes the master fork this worker again.
    const WorkerClass = require(path.join(__dirname, 'workers', name + '.js'));
    let worker = null;

    if (WorkerClass) {
        worker = new WorkerClass(name, config.workers[name]);
        // Listen before starting so no 'stop' emission can be missed.
        worker.on('stop', () => {
            process.exit();
        });
        worker.start();
    }

    // The master asks for a graceful stop by sending 'shutdown'.
    process.on('message', message => {
        if ('shutdown' === message) {
            if (worker) {
                worker.stop();
            } else {
                process.exit();
            }
        }
    });
}

/**
 * Fork a child process for the named worker. Replace it if it crashes.
 *
 * @param {string} name - Worker name, passed to the child as WORKER_NAME.
 */
function startWorker(name) {
    const worker = cluster.fork({WORKER_NAME: name});

    worker.on('online', () => {
        logger.info('Start %s worker #%d.', name, worker.id);
    });

    worker.on('exit', status => {
        // `suicide` is the pre-Node-6 name of `exitedAfterDisconnect`.
        const exitedNormally = (worker.exitedAfterDisconnect || worker.suicide) === true || status === 0;
        if (exitedNormally || shuttingDown) {
            logger.info('Worker %s #%d was killed.', name, worker.id);
        } else {
            // Unexpected exit outside of a shutdown: start a replacement.
            logger.warn('Worker %s #%d was died. Replace it with a new one.', name, worker.id);
            startWorker(name);
        }
    });
}

/**
 * SIGINT/SIGTERM handler (master only). Ask every worker to stop, then exit
 * once the cluster has no workers left.
 */
function shutdownCluster() {
    if (!cluster.isMaster) {
        return;
    }

    shuttingDown = true;
    // A repeated signal restarts the poll instead of stacking intervals.
    clearInterval(shutdownInterval);

    if (_.size(cluster.workers) === 0) {
        process.exit();
    }

    logger.info('Shutdown workers:', _.size(cluster.workers));
    _.each(cluster.workers, worker => {
        try {
            worker.send('shutdown');
        } catch (err) {
            logger.warn('Cannot send shutdown message to worker:', err);
        }
    });

    // Exited workers are removed from cluster.workers; leave once it is empty.
    shutdownInterval = setInterval(() => {
        if (_.size(cluster.workers) === 0) {
            process.exit();
        }
    }, config.shutdownInterval);
}
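I would like to draw attention to a few points:
- The name of the worker being started is passed to the child process in the `WORKER_NAME` environment variable.
- A worker that exits cleanly (for example via `process.exit()` after it has stopped) is not restarted; a worker that crashes is replaced with a new one.
- On SIGINT/SIGTERM the master sends a `shutdown` message to every worker, then polls with `setInterval` and calls `process.exit()` once all workers have stopped.

The `Worker` component below is designed to run periodic jobs; it does not work with the task queue.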
'use strict';

const _ = require('lodash');
const Promise = require('bluebird');
const log4js = require('log4js');
const EventEmitter = require('events');
const WorkerStates = require('./worker_states');

/**
 * Base class for periodic workers.
 *
 * Subclasses override `loop()`. After each run the next one is scheduled
 * `conf.sleep` ms later. `stop()` takes effect immediately when the worker is
 * idle; otherwise it takes effect when the current `loop()` finishes. The
 * worker emits 'stop' once it is fully stopped.
 */
class Worker extends EventEmitter {
    /**
     * @param {string} name - Worker name, also used for the logger category.
     * @param {Object} conf - Worker settings; `sleep` defaults to 1000 ms.
     */
    constructor(name, conf) {
        super();
        this.name = name;
        // Pause between the end of one loop() and the start of the next.
        this.conf = _.defaults({}, conf, {
            sleep: 1000
        });
        this.logger = log4js.getLogger('worker-' + name);
        // True until start() is called and again after stop().
        this.stopped = true;
        // Pending setTimeout handle for the next loop() run.
        this.timer = null;
        // One of WorkerStates.*; null before start().
        this.state = null;
    }

    /**
     * Start the loop.
     *
     * @returns {Promise} Settles when the first loop() run has finished.
     */
    start() {
        this.logger.info('Start');
        this.stopped = false;
        this.state = WorkerStates.STATE_IDLE;
        return this._startLoop();
    }

    /**
     * Request a stop. If idle, cancel the pending run and emit 'stop' right
     * away. Otherwise `_startLoop` emits it when the current run ends.
     */
    stop() {
        this.logger.info('Stop');
        this.stopped = true;
        if (this.state === WorkerStates.STATE_IDLE) {
            if (this.timer) {
                clearTimeout(this.timer);
                this.timer = null;
            }
            this.state = WorkerStates.STATE_STOP;
            this.emit('stop');
        }
    }

    /**
     * One unit of periodic work. Override in subclasses.
     *
     * @returns {Promise}
     */
    loop() {
        return Promise.resolve();
    }

    /**
     * Run loop() once, then either schedule the next run or finish stopping.
     *
     * @returns {Promise}
     */
    _startLoop() {
        // Any timer that triggered this call has already fired.
        this.timer = null;
        this.state = WorkerStates.STATE_WORK;
        // Promise.try turns a synchronous throw or a non-promise return value
        // from an overridden loop() into a (rejected/resolved) promise, so a
        // faulty subclass cannot silently end the loop.
        return Promise.try(() => this.loop()).catch(err => {
            this.logger.warn('Loop error:', err);
        }).finally(() => {
            this.state = WorkerStates.STATE_IDLE;
            if (!this.stopped) {
                this.timer = setTimeout(() => {
                    this._startLoop();
                }, this.conf.sleep);
            } else {
                this.state = WorkerStates.STATE_STOP;
                this.emit('stop');
            }
        });
    }
}

module.exports = Worker;
The simplest worker code might look like this:
'use strict';

const Promise = require('bluebird');
const Worker = require('../components/worker');

/**
 * Minimal periodic worker: logs one line per run, then simulates
 * 30 seconds of work before the run is considered finished.
 */
class Sample extends Worker {
    loop() {
        this.logger.info("Loop method");
        // Resolves with undefined after 30 s, like Promise.resolve().delay().
        return Promise.delay(30000);
    }
}

module.exports = Sample;
Some features:
- The `loop` method is meant to be overridden in subclasses to implement the business logic. It must return a Promise.
- When the `loop` method finishes, it is started again after the delay given by the worker's `sleep` setting.
- If the worker is stopped between runs of the `loop` method, it stops immediately.
- If the `loop` method is running when the worker is stopped, the worker waits for it to finish before stopping.

The next component, `TaskWorker`, is the main one: it processes tasks from a MySQL table in parallel.
'use strict';

const config = require('../config/config');
const _ = require('lodash');
const Promise = require('bluebird');
const Worker = require('./worker');
const WorkerStates = require('./worker_states');
const models = require('../models');

/**
 * Worker that takes tasks from the `tasks` table (its configured `queue`) and
 * processes up to `conf.count` of them concurrently.
 *
 * Subclasses override `handleTask(plainTask)`:
 * - if the returned promise resolves, the task is marked done;
 * - if it rejects, the task is marked failed and postponed by `delay(task)` ms.
 */
class TaskWorker extends Worker {
    /**
     * @param {string} name - Worker name.
     * @param {Object} conf - Worker settings (see defaults below).
     */
    constructor(name, conf) {
        super(name, conf);
        this.conf = _.defaults({}, this.conf, {
            maxAttempts: 3,     // read by the Manager via config.workers
            delayRatio: 300000, // ms of retry delay per previous attempt
            count: 1,           // max tasks processed at the same time
            queue: '',          // queue name this worker consumes
            update: 3000        // heartbeat (checked_at) interval, ms
        });
        // Number of tasks currently being processed.
        this.count = 0;
    }

    /**
     * Claim at most one task and start processing it in the background.
     * Resolves as soon as the claim step is done, not when the task finishes.
     *
     * @returns {Promise}
     */
    loop() {
        if (this.count >= this.conf.count || this.stopped) {
            return Promise.resolve();
        }
        return this._getTask().then(task => {
            if (task) {
                // Deliberately not awaited: the next loop() run may claim
                // another task while this one is still being processed.
                this._processTask(task);
            }
            return null;
        });
    }

    /**
     * Process one claimed task: send heartbeats, run handleTask, and record
     * the outcome. Never rejects; errors are logged.
     *
     * @param {Object} task - Task model instance already marked 'working'.
     */
    _processTask(task) {
        this.count++;

        // Heartbeat: refresh checked_at so the Manager does not treat the
        // task as frozen. The model's heartbeat method is `check`.
        const heartbeat = setInterval(() => {
            models.sequelize.transaction(t => {
                return task.check({transaction: t});
            }).catch(err => {
                this.logger.warn('Heartbeat error:', err);
            });
        }, this.conf.update);

        // Promise.try also covers a synchronous throw or a native promise
        // returned from an overridden handleTask().
        Promise.try(() => this.handleTask(task.get({plain: true}))).then(() => {
            return models.sequelize.transaction(t => {
                return task.complete({transaction: t}).then(() => {
                    this.logger.info('Task completed:', task.id);
                });
            });
        }).catch(err => {
            // handleTask (or saving the completion) failed: schedule a retry.
            this.logger.warn('Handle error:', err);
            return Promise.resolve(this.delay(task)).then(delay => {
                return models.sequelize.transaction(t => {
                    return task.fail(delay, {transaction: t}).then(() => {
                        this.logger.warn('Task failed:', task.id);
                    });
                });
            });
        }).catch(err => {
            // Even recording the failure failed (e.g. DB outage). Log it
            // instead of letting .done() crash the process and every other
            // task running in it; the Manager later recovers the task as frozen.
            this.logger.warn('Cannot update task status:', err);
        }).finally(() => {
            clearInterval(heartbeat);
            this.count--;
        }).done();
    }

    /**
     * Business logic for one task. Override in subclasses.
     *
     * @param {Object} task - Plain-object copy of the task row.
     * @returns {Promise}
     */
    handleTask() {
        return Promise.resolve();
    }

    /**
     * Retry delay in ms for a failed task: attempts * delayRatio.
     *
     * @param {Object} task - Task model instance.
     * @returns {Promise<number>}
     */
    delay(task) {
        return Promise.resolve().then(() => {
            return task.attempts * this.conf.delayRatio;
        });
    }

    /**
     * Claim the next task of this queue for this node. SELECT ... FOR UPDATE
     * and the status update run in one transaction, so concurrent workers on
     * other servers cannot claim the same row.
     *
     * @returns {Promise<Object|undefined>} The claimed task, if any.
     */
    _getTask() {
        return models.sequelize.transaction({autocommit: false}, t => {
            return models.Task.scope({
                method: ['forWork', this.conf.queue, config.node_id]
            }).find({transaction: t, lock: t.LOCK.UPDATE}).then(task => {
                if (task) {
                    return task.work(config.node_id, {transaction: t});
                }
            });
        });
    }

    /**
     * Variant of Worker#_startLoop that only reports idle/stop once no task
     * is being processed, so a stop waits for in-flight tasks.
     *
     * @returns {Promise}
     */
    _startLoop() {
        this.timer = null;
        this.state = WorkerStates.STATE_WORK;
        return Promise.try(() => this.loop()).catch(err => {
            this.logger.warn('Loop error:', err);
        }).finally(() => {
            if (this.count === 0) {
                this.state = WorkerStates.STATE_IDLE;
            }
            if (this.stopped && this.count === 0) {
                this.state = WorkerStates.STATE_STOP;
                this.emit('stop');
            } else {
                // Keep polling: either for new tasks, or (when stopping) until
                // the in-flight tasks have drained.
                this.timer = setTimeout(() => {
                    this._startLoop();
                }, this.conf.sleep);
            }
        });
    }
}

module.exports = TaskWorker;
The simplest worker code might look like this:
'use strict';

const Promise = require('bluebird');
const TaskWorker = require('../components/task_worker');

/**
 * Minimal queue worker: logs each task it receives and simulates
 * 30 seconds of processing, after which the task counts as completed.
 */
class Sample extends TaskWorker {
    handleTask(task) {
        this.logger.info('Sample Task:', task);
        // Resolves with undefined after 30 s, like Promise.resolve().delay().
        return Promise.delay(30000);
    }
}

module.exports = Sample;
Features of the component:
- A task is claimed with `SELECT ... FOR UPDATE` followed by an `UPDATE` of the record, in a single transaction with autocommit disabled. This gives exclusive access to the task even when several servers request tasks simultaneously.
- The outcome of a task is determined by the state of the Promise returned by the `handleTask` method. If it resolves, the task is marked as completed. Otherwise the task is marked as failed and is restarted after the delay computed by the `delay` method.

The `sequelize` module is used to work with database models. All tasks are stored in the `tasks` table, which has the following structure:
Field | Type | Description |
---|---|---|
id | integer autoincrement | Task id |
node_id | integer nullable | ID of the node for which the task is intended |
queue | string | Task queue |
status | enum | Task status |
attempts | integer | Number of task start attempts |
priority | integer | Task priority |
body | string | Task body in JSON format |
start_at | datetime, nullable | Earliest date and time at which the task may be started (NULL = immediately) |
finish_at | datetime, nullable | Deadline after which the task is no longer picked up (similar to a TTL) |
worker_node_id | integer nullable | ID of the node that started processing the task |
worker_started_at | datetime, nullable | Date and time the task began processing |
checked_at | datetime, nullable | Date and time of the last status check (heartbeat) of the task |
created_at | datetime, nullable | Date and time of task creation |
updated_at | datetime, nullable | Date and time of task change |
A task can be assigned either to a specific node or to any node in the cluster; this is controlled by the `node_id` field when the task is created. A task can be started with a delay (the `start_at` field) and can be given a deadline (the `finish_at` field).
Different workers consume different task queues, specified in the `queue` field. The processing priority is set in the `priority` field (a higher value means a higher priority). The number of task start attempts is stored in the `attempts` field. The parameters the worker needs are passed in the task body (the `body` field) in JSON format.
The `checked_at` field serves as a heartbeat of a running task: its value is updated continuously while the task is being processed. If the value of `checked_at` has not changed for a long time and the task is still in the `working` status, the task is considered failed and its status is changed to `failure`.
'use strict'; const moment = require('moment'); module.exports = function(sequelize, Sequelize) { return sequelize.define('Task', { id: { type: Sequelize.INTEGER, primaryKey: true, autoIncrement: true }, node_id: { type: Sequelize.INTEGER }, queue: { type: Sequelize.STRING, allowNull: false }, status: { type: Sequelize.ENUM, values: ['pending', 'working', 'done', 'failure'], defaultValue: 'pending', allowNull: false }, attempts: { type: Sequelize.INTEGER, defaultValue: 0, allowNull: false }, priority: { type: Sequelize.INTEGER, defaultValue: 10, allowNull: false }, body: { type: Sequelize.TEXT, set: function(body) { return this.setDataValue('body', JSON.stringify(body)); }, get: function() { try { return JSON.parse(this.getDataValue('body')); } catch (e) { return null; } } }, start_at: { type: Sequelize.DATE }, finish_at: { type: Sequelize.DATE }, worker_node_id: { type: Sequelize.INTEGER }, worker_started_at: { type: Sequelize.DATE }, checked_at: { type: Sequelize.DATE } }, { tableName: 'tasks', freezeTableName: true, underscored: true, scopes: { forWork: function(queue, node_id) { return { where: { node_id: { $or: [ null, node_id ] }, queue: queue, status: 'pending', start_at: { $or: [ null, { $lt: moment().toDate() } ] }, finish_at: { $or: [ null, { $gte: moment().toDate() } ] } }, order: [ ['priority', 'DESC'], ['attempts', 'ASC'], [sequelize.fn('IFNULL', sequelize.col('start_at'), sequelize.col('created_at')), 'ASC'] ] }; } }, instanceMethods: { fail: function(delay, options) { this.start_at = delay ? 
moment().add(delay, 'ms').toDate() : null; this.attempts = sequelize.literal('attempts + 1'); this.status = 'failure'; return this.save(options); }, complete: function(options) { this.status = 'done'; return this.save(options); }, work: function(node_id, options) { this.status = 'working'; this.worker_node_id = node_id; this.worker_started_at = moment().toDate(); return this.save(options); }, check: function(options) { this.checked_at = moment().toDate(); return this.save(options); } } }); };
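All tasks go through the following life cycle:
1. A new task is created with the `pending` status.
2. When a worker takes the task, its status changes to `working` and the `worker_node_id` and `worker_started_at` fields are filled in.
3. While the task is being processed, the worker periodically updates the `checked_at` field to signal that it is operating correctly.
4. If processing succeeds, the status changes to `done`.
5. If processing fails, the status changes to `failure`, `attempts` is incremented, and the next start is postponed by an amount of time computed in the `delay` method from the number of attempts and the `delayRatio` setting.

The project also has a built-in `Manager` module that runs on each node and performs the following task maintenance:
- marks `working` tasks whose `checked_at` has not been updated for too long as `failure` (frozen tasks);
- returns failed tasks to `pending` while their number of attempts is below the queue's `maxAttempts`;
- deletes `pending` tasks whose `finish_at` deadline has passed;
- deletes old completed and failed tasks.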
In addition, this module activates and pauses cluster nodes depending on whether they have reported in recently.
'use strict';

const _ = require('lodash');
const moment = require('moment');
const Promise = require('bluebird');
const Worker = require('../components/worker');
const models = require('../models');
const config = require('../config/config');

/**
 * Housekeeping worker that runs on every node. Each loop() run performs, in a
 * single transaction: node liveness bookkeeping, recovery of frozen and
 * failed tasks, and cleanup of expired, completed and exhausted tasks.
 */
class Manager extends Worker {
    /**
     * @param {string} name - Worker name.
     * @param {Object} conf - Worker settings (see defaults below).
     */
    constructor(name, conf) {
        super(name, conf);
        this.conf = _.defaults({}, this.conf, {
            maxUpdate: 30000, // 30 seconds
            maxCompleted: 3600000, // 1 hour
            maxFailed: 259200000 // 3 days
        });
    }

    /**
     * Run all maintenance steps sequentially in one transaction.
     * Order matters: _deleteFailedTasks relies on _restoreFailedTasks having
     * already moved every retryable failed task back to 'pending'.
     *
     * @returns {Promise}
     */
    loop() {
        return models.sequelize.transaction(t => {
            return Promise.resolve()
                .then(() => this._checkCurrentNode(t))
                .then(() => this._activateNodes(t))
                .then(() => this._pauseNodes(t))
                .then(() => this._restoreFrozenTasks(t))
                .then(() => this._restoreFailedTasks(t))
                .then(() => this._deleteDeadTasks(t))
                .then(() => this._deleteCompletedTasks(t))
                .then(() => this._deleteFailedTasks(t));
        });
    }

    /** Refresh this node's own liveness record. */
    _checkCurrentNode(t) {
        return models.Node.findById(config.node_id, {transaction: t}).then(node => {
            if (node) {
                // NOTE(review): this save runs outside transaction `t`; the
                // Node model is not shown here, so confirm whether check()
                // accepts options before passing {transaction: t}.
                return node.check();
            }
        });
    }

    /** Mark nodes active if they reported within the last two sleep periods. */
    _activateNodes(t) {
        return models.Node.update({
            is_active: true
        }, {
            where: {
                is_active: false,
                checked_at: {
                    $gte: moment().subtract(2 * this.conf.sleep).toDate()
                }
            },
            transaction: t
        }).spread(count => {
            if (count > 0) {
                this.logger.info('Activate nodes:', count);
            }
        });
    }

    /** Mark nodes inactive if they have not reported for two sleep periods. */
    _pauseNodes(t) {
        return models.Node.update({
            is_active: false
        }, {
            where: {
                is_active: true,
                checked_at: {
                    $lt: moment().subtract(2 * this.conf.sleep).toDate()
                }
            },
            transaction: t
        }).spread(count => {
            if (count > 0) {
                this.logger.info('Pause nodes:', count);
            }
        });
    }

    /**
     * Fail 'working' tasks that have been neither started nor heart-beaten
     * within maxUpdate ms. NULL timestamps count as old, so a worker that died
     * before its first heartbeat no longer leaves the task stuck. A task that
     * was just (re)started is not failed because of a stale checked_at from a
     * previous attempt.
     */
    _restoreFrozenTasks(t) {
        const threshold = moment().subtract(this.conf.maxUpdate).toDate();
        return models.Task.update({
            status: 'failure',
            attempts: models.sequelize.literal('attempts + 1')
        }, {
            where: {
                status: 'working',
                checked_at: {
                    $or: [
                        null,
                        { $lt: threshold }
                    ]
                },
                worker_started_at: {
                    $or: [
                        null,
                        { $lt: threshold }
                    ]
                }
            },
            transaction: t
        }).spread(count => {
            if (count > 0) {
                this.logger.info('Restore frozen tasks:', count);
            }
        });
    }

    /** Return failed tasks that still have attempts left to 'pending'. */
    _restoreFailedTasks(t) {
        const where = [{status: 'failure'}];
        const conditions = this._failedTasksConditions();
        if (conditions.length) {
            where.push({$or: conditions});
        }
        return models.Task.update({
            status: 'pending',
            worker_node_id: null,
            worker_started_at: null
        }, {
            where: where,
            transaction: t
        }).spread(count => {
            if (count > 0) {
                this.logger.info('Restore failure tasks:', count);
            }
        });
    }

    /** Delete pending tasks whose finish_at deadline has passed. */
    _deleteDeadTasks(t) {
        return models.Task.destroy({
            where: {
                status: 'pending',
                finish_at: {
                    $lt: moment().toDate()
                }
            },
            transaction: t
        }).then(count => {
            if (count > 0) {
                this.logger.info('Delete dead tasks:', count);
            }
        });
    }

    /** Delete completed tasks older than maxCompleted ms. */
    _deleteCompletedTasks(t) {
        return models.Task.destroy({
            where: {
                status: 'done',
                checked_at: {
                    $lt: moment().subtract(this.conf.maxCompleted).toDate()
                }
            },
            transaction: t
        }).then(count => {
            if (count > 0) {
                this.logger.info('Delete completed tasks:', count);
            }
        });
    }

    /**
     * Delete failed tasks older than maxFailed ms.
     *
     * This runs after _restoreFailedTasks in the same transaction, so every
     * 'failure' row still present is non-retryable (attempts exhausted or
     * queue not configured). The previous version also filtered on the
     * retryable conditions, which matched exactly the rows just moved to
     * 'pending', so it never deleted anything.
     */
    _deleteFailedTasks(t) {
        return models.Task.destroy({
            where: {
                status: 'failure',
                checked_at: {
                    $lt: moment().subtract(this.conf.maxFailed).toDate()
                }
            },
            transaction: t
        }).then(count => {
            if (count > 0) {
                this.logger.info('Delete failed tasks:', count);
            }
        });
    }

    /**
     * Build one condition per configured queue: a failed task is retryable
     * when it belongs to that queue and (if maxAttempts is set) has fewer
     * attempts than maxAttempts.
     *
     * @returns {Array<Object>} Sequelize where-fragments for use under $or.
     */
    _failedTasksConditions() {
        const conditions = [];
        _.each(config.workers, (worker) => {
            if (worker.queue) {
                const item = {queue: worker.queue};
                if (worker.maxAttempts !== undefined) {
                    item.attempts = {
                        $lt: worker.maxAttempts
                    };
                }
                conditions.push(item);
            }
        });
        return conditions;
    }
}

module.exports = Manager;
Overall, the result is a good and fairly reliable tool for running background tasks, and we are happy to share it with the community. The main ideas and principles behind this project were developed over several years of work on various projects.
Plans for further development:
- Storing task progress in the database (a `progress` field) and adding a method to the `TaskWorker` module to update this information.

I welcome constructive criticism and will be glad to answer questions in the comments.
Source: https://habr.com/ru/post/303574/
All Articles