daemon
// that works all the time. Probably not the most typical task for Node.js; read on
// for the specifics of the implementation and some of its pitfalls.

// Entry point: resolve the runtime environment and collector mode, then start
// the scheduler loop.
var optimist = require('optimist');
var argv = optimist.argv;

// Default the environment to 'development' and write the resolved value back
// to process.env so anything reading it later sees the same value.
var env = process.env.NODE_ENV || 'development';
process.env.NODE_ENV = env;

// Mode precedence: COLLECTOR_MODE env var, then the --mode CLI flag, then 'normal'.
// `mode` keeps the raw value (not the string re-read from process.env).
var mode = process.env.COLLECTOR_MODE || argv.mode || 'normal';
process.env.COLLECTOR_MODE = mode;

var scheduler = require('./source/engine/scheduler');
scheduler(mode).run();
while(true)
// written for Node.js. Honestly, switching my thinking from "synchronous" to
// "asynchronous" mode was not easy for me. Running an endless series of tasks
// in Node.js did not look simple, and thinking it over produced a question on
// Stack Overflow (SO).

/**
 * Creates the collector scheduler for the given mode.
 *
 * The scheduler runs one round of collecting tasks and, when the round
 * finishes (successfully or not), schedules the next round after
 * `config.collector.schedulerRestart` milliseconds.
 *
 * @param {string} mode - collector mode; used by the round logic elided below.
 * @returns {{run: Function}} object whose `run()` starts the first round.
 */
function scheduler(mode) {
    // Called when a round is over: log a failure, then queue the next round.
    function restartScheduler(err, results) {
        if (err) {
            logger.error(err);
        }

        // http://stackoverflow.com/questions/16072699/nodejs-settimeout-memory-leak
        // The timer handle is kept in a local, as discussed in the linked question.
        var timeout = setTimeout(schedulerLoop, config.collector.schedulerRestart);
    }

    // One round: run every due collecting task, then restart.
    function schedulerLoop() {
        runCollectingTasks(restartScheduler);
    }

    // ...

    function run() {
        schedulerLoop();
    }

    return {
        run: run
    };
}
var timeout =
, the situation improved, but not radically.

The networks
// collection holds a document that contains the user ID, the access token and other
// service information. The collector also denormalizes its current state into the
// same document (which mode it runs in, whether there were errors and how many, which
// data page is currently being processed). In other words, one and the same document
// is the basis on which the executable task is created.

/**
 * Runs one collecting round: builds the tasks and executes them.
 *
 * @param {Function} callback - receives the preparation error, if any;
 *   otherwise it is passed straight to runTasks, which reports the outcome.
 */
function runCollectingTasks(callback) {
    prepareCollectingTasks(function (err, tasks) {
        if (!err) {
            runTasks(tasks, 'collecting', callback);
            return;
        }
        return callback(err);
    });
}

/**
 * Loads the network states for the current `mode` and turns them into tasks.
 * NOTE(review): `mode` is not defined in this block; presumably it is the
 * parameter of the enclosing scheduler(mode) factory — confirm.
 *
 * @param {Function} callback - `callback(errorObject)` on a failed or empty
 *   query, otherwise `callback(null, tasks)`.
 */
function prepareCollectingTasks(callback) {
    networks.findByMode(mode, function (err, states) {
        var failure = null;

        if (err) {
            failure = {message: 'error during networks query', err: err};
        } else if (!states) {
            failure = {message: 'failed to read networks states'};
        }

        if (failure) {
            return callback(failure);
        }
        callback(null, createCollectingTasks(states));
    });
}
/**
 * Builds this round's runnable tasks from the network states.
 * Only states whose `scheduledTo` is already in the past are kept; each kept
 * state is wrapped into a task function suitable for async.series.
 *
 * @param {Object[]} states - network state documents.
 * @returns {Function[]} task functions, in the same order as `states`.
 */
function createCollectingTasks(states) {
    var dueStates = states.filter(function (state) {
        return allowedToExecute(state);
    });
    return dueStates.map(function (state) {
        return collectingTask(state);
    });
}

/**
 * @param {Object} state - network state document.
 * @returns {boolean} true when the current time is strictly after `state.scheduledTo`.
 */
function allowedToExecute (state) {
    var elapsedSinceScheduled = moment().diff(state.scheduledTo);
    return elapsedSinceScheduled > 0;
}

/**
 * Wraps a state into a task that runs the executor with the shared connectors.
 *
 * @param {Object} state - network state document.
 * @returns {Function} `function (callback)` returning whatever executor returns.
 */
function collectingTask(state) {
    var task = function (callback) {
        return executor(state, connectors, callback);
    };
    return task;
}
runTasks
takes the array of tasks as input and
executes them sequentially, one after another, using async.series.
/**
 * Runs the given tasks one after another with async.series.
 *
 * Errors are logged but never propagated: the callback always receives
 * `(null, tasks.length)`, so the scheduler's restart chain is never broken.
 * The count is the size of the batch, not the number of tasks that completed
 * (async.series stops at the first task that fails).
 *
 * @param {Function[]} tasks - task functions of the form `function (callback)`.
 * @param {string} type - label of the batch (e.g. 'collecting'); not used here.
 * @param {Function} callback - called as `callback(null, tasks.length)`.
 */
function runTasks(tasks, type, callback) {
    function onSeriesFinished(err) {
        // Report the failure, but keep going so the execution chain is not broken.
        if (err) {
            logger.error(err);
        }
        callback(null, tasks.length);
    }

    async.series(tasks, onSeriesFinished);
}
/**
 * Runs one collecting task for a network state.
 *
 * Looks up the connector registered for `state.service`, invokes it, and then
 * chains saveConnectorState -> connectorStateSaved -> saveConnectorResults ->
 * connectorResultsSaved. Parts of these steps are elided (`// ...`) in this
 * excerpt. The visible chain always ends with `callback(null)`.
 *
 * @param {Object} state - network state; `state.service` selects the connector.
 * @param {Object} connectors - map from service name to connector function.
 * @param {Function} callback - invoked as `callback(null)` when the chain finishes.
 */
function executor(state, connectors, callback) {
    var service = state.service;
    var connector = connectors[service];
    // Start time of the run. Not used in the visible code; presumably the
    // elided parts use it to measure duration.
    var connectorStarted = moment();

    // NOTE(review): `user` is not defined in this block and must come from an
    // enclosing scope that is not shown — confirm. The GitHub connector in this
    // article ignores this argument and reads `state.user` instead.
    connector(state, user, connectorExecuted);

    // Connector callback.
    // NOTE(review): in the visible code `err` and `updatedState` are never
    // inspected, and the original `state` (not `updatedState`) is saved. Any
    // error handling must live in the elided parts.
    function connectorExecuted(err, updatedState, results) {
        saveConnectorState(state, connectorStateSaved);

        function saveConnectorState (state, callback) {
            // ...
        }

        // After the state is saved, move on to saving the collected results.
        function connectorStateSaved (err) {
            // ...
            saveConnectorResults(results, connectorResultsSaved);
        }

        // NOTE(review): this calls connectorResultsSaved directly instead of its
        // own `callback` parameter, and passes `results` in the `err` position.
        // It looks like a typo for something like callback(err, saveDuration).
        function saveConnectorResults(results, callback) {
            // ...
            connectorResultsSaved(results, connectorResultsSaved);
        }

        // Final step: `callback` here is executor's own callback. It is always
        // called with null, so errors never reach the caller from this point.
        function connectorResultsSaved (err, saveDuration) {
            // ...
            callback(null);
        }
    }
}
var API = 'https://api.github.com';

/**
 * GitHub connector: requests the GitHub API for the user described by `state`
 * and converts the returned array (`stars`) into likeastore items.
 *
 * @param {Object} state - network state document. `accessToken` and `user` are
 *   read, and the object is mutated by initState/updateState/scheduleTo.
 * @param {*} user - not used by this connector (the user id is read from `state.user`).
 * @param {Function} callback - called as:
 *   - `callback('missing accessToken for user: <id>')` when there is no token;
 *   - `callback(err, state)`, with whatever `handleUnexpected` reports, when the
 *     request fails (transport error, non-200 status, or a body that is not an array);
 *   - `callback(null, state, stars)` on success, after `state.scheduledTo` is set.
 */
function connector(state, user, callback) {
    var accessToken = state.accessToken;
    if (!accessToken) {
        return callback('missing accessToken for user: ' + state.user);
    }

    initState(state);

    var uri = formatRequestUri(accessToken, state);
    var headers = { 'Content-Type': 'application/json', 'User-Agent': 'likeastore/collector'};

    request({uri: uri, headers: headers, timeout: config.collector.request.timeout, json: true}, function (err, response, body) {
        if (isFailedResponse(err, response, body)) {
            // NOTE(review): for non-transport failures `err` is null here;
            // handleUnexpected is expected to inspect `response`/`body` — confirm.
            return handleUnexpected(response, body, state, err, function (err) {
                callback (err, state);
            });
        }
        return handleResponse(response, body);
    });

    // A transport error is not the only failure mode. GitHub answers e.g. 401
    // (bad credentials) or 403 (rate limit exceeded) with a JSON object, and
    // body.map in handleResponse would throw on it inside the request callback
    // instead of the failure being reported.
    function isFailedResponse(err, response, body) {
        return Boolean(err) || response.statusCode !== 200 || !Array.isArray(body);
    }

    function initState(state) {
        // initialize state fields (page, errors, mode etc.) ...
    }

    function formatRequestUri(accessToken, state) {
        // create request URI based on values from state ...
    }

    // Converts a successful (array) response and reports it together with the
    // rescheduled state. The remaining rate-limit quota is read from the
    // x-ratelimit-remaining header.
    function handleResponse(response, body) {
        var rateLimit = +response.headers['x-ratelimit-remaining'];
        var stars = body.map(function (r) {
            return {
                // transforms response into likeastore item
            };
        });

        return callback(null, scheduleTo(updateState(state, stars, rateLimit, false)), stars);
    }

    // Records the execution time (other field updates are elided) and returns
    // the same, mutated state object.
    function updateState(state, data, rateLimit, failed) {
        state.lastExecution = moment().toDate();
        // update other state fields (next page, mode) ...
        return state;
    }
}
/**
 * Sets `state.scheduledTo` to the moment the next collection for this state
 * should run, chosen by `state.mode`:
 *   - 'initial'   -> config.collector.quotes[state.service].runAfter
 *   - 'normal'    -> config.collector.nextNormalRunAfter
 *   - 'rateLimit' -> config.collector.nextRateLimitRunAfter
 * Any other mode falls back to the normal interval.
 *
 * @param {Object} state - network state. `service` and `mode` are read and
 *   `scheduledTo` is written.
 * @returns {Object} the same (mutated) `state` object.
 */
function scheduleTo(state) {
    // Reads only the active mode's setting. The previous eager lookup table
    // evaluated quotes[service].runAfter for every call, throwing a TypeError
    // for a service without a quotes entry even in 'normal' or 'rateLimit' mode.
    function delayForMode(service, mode) {
        switch (mode) {
            case 'initial':
                return config.collector.quotes[service].runAfter;
            case 'rateLimit':
                return config.collector.nextRateLimitRunAfter;
            case 'normal':
            default:
                // An unknown mode used to produce an undefined delay, which
                // scheduled the task to run again right away.
                return config.collector.nextNormalRunAfter;
        }
    }

    var currentMoment = moment();
    var next = delayForMode(state.service, state.mode);
    state.scheduledTo = currentMoment.add(next, 'milliseconds').toDate();
    return state;
}
Source: https://habr.com/ru/post/214781/
All Articles