📜 ⬆️ ⬇️

Collect millions of likes or task queues in Node.js

Last week, we celebrated one round date - in the Likeastore database, we have accumulated as many as one million user “likes”.

We use JavaScript, all current services are written in JavaScript / Node.js. In general, I do not regret the use of Node.js in our project, it has proven itself as the best means of implementing the HTTP API. But to collect likes, it must be a daemon that works all the time. Probably not the most typical task for Node.js - about the specifics of the implementation and some of the pitfalls, read on.

ollector


Collector (collector), a key component of the Likeastore architecture. It collects data through open API services, processes responses, saves data and its current state. In essence, this is an “infinite loop” that builds a list of tasks to be executed, starts them, after which the procedure is repeated.

For maximum efficiency, we run 2 instances of the collector operating in different modes: initial, normal. In initial mode, the collector only processes the newly connected networks. Thus, the user quickly receives "likes", after connecting the network. After all the “likes” have been unloaded, the network goes into normal mode, is processed by another instance, where the time between charges is much higher.
')
 var argv = require('optimist').argv; var env = process.env.NODE_ENV = process.env.NODE_ENV || 'development'; var mode = process.env.COLLECTOR_MODE = process.env.COLLECTOR_MODE || argv.mode || 'normal'; var scheduler = require('./source/engine/scheduler'); scheduler(mode).run(); 

Scheduler


The scheduler is essentially that while(true) written for Node.js. I admit honestly, switching my thinking from “synchronous” to “asynchronous” mode was not the easiest process for me. Running an infinite number of tasks in Node.js seemed to be not an easy task, as a result of thinking this question was born on SO .

One option was to use async.queue , which seemed obvious, but not the best for this task. After several attempts, the best asynchronous while (true) was setTimeout.

 function scheduler (mode) { function schedulerLoop() { runCollectingTasks(restartScheduler); } function restartScheduler (err, results) { if (err) { logger.error(err); } // http://stackoverflow.com/questions/16072699/nodejs-settimeout-memory-leak var timeout = setTimeout(schedulerLoop, config.collector.schedulerRestart); } // ... return { run: function () { schedulerLoop(); } }; } 


It should be noted that the same pitfall daemon'ov Node.js - memory leaks. I noticed that after the long-term work of the collector, he began to consume a large amount of memory and the most unexpected moment just stopped. Pay attention to the comment in the code with the question on SO . After I added var timeout = , the situation improved, but not radically.

Another reason opened after an epic post about memory leaks and investigations by Joyent and Wallmart engineers. With the transition to Node.js v0.10.22, the collector began to work even more stable. However, spontaneous stops occur, the reason is extremely difficult to understand.

Networks and states


When a user connects a new network, we put in the networks collection a document that contains: user ID, access token and other service information. In the same document, the collector denormalizes its current state (in which mode it works, whether there were errors, how many of them, which current data page is being processed). Those. in fact, it is one and the same document, on the basis of which the executable task is created.

 function runCollectingTasks(callback) { prepareCollectingTasks(function (err, tasks) { if (err) { return callback(err); } runTasks(tasks, 'collecting', callback); }); } function prepareCollectingTasks(callback) { networks.findByMode(mode, function (err, states) { if (err) { return callback({message: 'error during networks query', err: err}); } if (!states) { return callback({message: 'failed to read networks states'}); } callback(null, createCollectingTasks(states)); }); } 


Tasks


Based on the state, we create a list of executable tasks. Almost all open APIs of popular services have limitations on the number of requests for a period of time. The collector's task is to execute the most effective number of requests and not go to rate-limit.

Only those tasks that were scheduled after the current point in time are allowed to start.

 function createCollectingTasks(states) { var tasks = states.map(function (state) { return allowedToExecute(state) ? collectingTask(state) : null; }).filter(function (task) { return task !== null; }); return tasks; } function allowedToExecute (state) { return moment().diff(state.scheduledTo) > 0; } function collectingTask(state) { return function (callback) { return executor(state, connectors, callback); }; } 


The data array is converted to an array of functions that go to the runTasks input. runTasks consistently performs each task through async.series .

 function runTasks(tasks, type, callback) { async.series(tasks, function (err) { if (err) { // report error but continue execution to do not break execution chain.. logger.error(err); } callback(null, tasks.length); }); } 


Executor


A generic function that accepts the current state, a list of existing connectors, and a callback function (I’ve picked up all error handling and logging to show its essence).

 function executor(state, connectors, callback) { var service = state.service; var connector = connectors[service]; var connectorStarted = moment(); connector(state, user, connectorExecuted); function connectorExecuted(err, updatedState, results) { saveConnectorState(state, connectorStateSaved); function saveConnectorState (state, callback) { // ... } function connectorStateSaved (err) { // ... saveConnectorResults(results, connectorResultsSaved); } function saveConnectorResults(results, callback) { // ... connectorResultsSaved(results, connectorResultsSaved); } function connectorResultsSaved (err, saveDuration) { // ... callback(null); } } } 


Connectors


A connector is a function that performs an HTTP request to the API, processes its response, updates the status of the task (scheduled next launch), and returns the collected data. It is the connector that distinguishes the state in which the “likes” collection is located, depending on which makes the necessary request (builds the required request URI).

At the moment, we have implemented 9 connectors, a code that is more or less similar. Initially, I was always looking for ready-made API access libraries, but this turned out to be the wrong strategy: you have to choose from several options, they don’t work or have an inappropriate interface, lag behind the current API version, etc. The most flexible and simplest solution was to use request (the best HTTP client for Node.js).

I will give an example of code for GitHub (again, I will shorten the details to show the essence).

 var API = 'https://api.github.com'; function connector(state, user, callback) { var accessToken = state.accessToken; if (!accessToken) { return callback('missing accessToken for user: ' + state.user); } initState(state); var uri = formatRequestUri(accessToken, state); var headers = { 'Content-Type': 'application/json', 'User-Agent': 'likeastore/collector'}; request({uri: uri, headers: headers, timeout: config.collector.request.timeout, json: true}, function (err, response, body) { if (err) { return handleUnexpected(response, body, state, err, function (err) { callback (err, state); }); } return handleResponse(response, body); }); function initState(state) { // intialize state fields (page, errors, mode etc.) ... } function formatRequestUri(accessToken, state) { // create request URI based on values from state ... } function handleResponse(response, body) { var rateLimit = +response.headers['x-ratelimit-remaining']; var stars = body.map(function (r) { return { // transforms response into likeastore item }; }); return callback(null, scheduleTo(updateState(state, stars, rateLimit, false)), stars); } function updateState(state, data, rateLimit, failed) { state.lastExecution = moment().toDate(); // update other state fields (next page, mode) ... return state; } } 


scheduleTo


Finally, when the connector is completed and the status is updated, you need to calculate the next moment of launch. It is calculated based on API limitations and collector operation mode (for initial mode the pause is minimal, for normal mode more, usually 15 minutes, if the connector goes to rate limit then the maximum pause).

 function scheduleTo(state) { var currentMoment = moment(); var service = state.service; var scheduleForMode = { initial: config.collector.quotes[service].runAfter, normal: config.collector.nextNormalRunAfter, rateLimit: config.collector.nextRateLimitRunAfter }; var next = scheduleForMode[state.mode]; var scheduledTo = currentMoment.add(next, 'milliseconds'); state.scheduledTo = scheduledTo.toDate(); return state; } 


Here is such a straightforward code that “settled” around September of last year and all we add from those times are new connectors, the engine itself remains unchanged. I think about allocating a separate library for running task queues in Node.js, but I’m not sure how much this is a generalized task.

As time goes on, the number of users is growing and at the moment the processing of 3,000 tasks takes about 30 minutes, which is quite a long time (we try to keep the cycle time no more than 15 minutes). I think that in the future the architecture of the collector will change in the direction of message queues and separation of collectors not according to the mode of operation, but according to another characteristic (network type, user cluster) for easier horizontal scaling.

Source: https://habr.com/ru/post/214781/


All Articles