In this article, we will solve a real problem using Node.js Streams and a pinch of Reactive Programming. I am less sure about the latter: RP is, to some extent, a buzzword that everyone talks about but almost no one actually practices.
The article walks through a practical example and is aimed at a reader already familiar with the platform, so it deliberately does not explain basic concepts: if something in the Stream API is unclear, consult the platform documentation or some retelling of it (for example, this one).
Let's start with a description of the problem: we need to build a “spider” that will pull all the data from a third-party REST API, process it in some way, and write it into our database. For ease of modeling, we omit the details of the specific API and database (in reality it was the API of a well-known hotel-related startup and a Postgres database).
Imagine that we have two functions (their code, like all the code from this article, can be found here):
getAPI(n, count)  // reads `count` entities from the API starting at id `n`, returns a promise
insertDB(entries) // writes the entries into the database, returns a promise

// For example:
getAPI(0, 2).then(console.log)               // [{id: 0}, {id: 1}]
getAPI(LAST_ITEM_ID, 1000).then(console.log) // [{id: LAST_ITEM_ID}] – the last item, the API is exhausted
// `count` is capped at 1000: request 1001 rows and you still get 1000
insertDB([{id: 0}]).then(console.log)        // { count: 1 }
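The linked simulators are not shown in the article; purely for self-containedness, here is one way they could be faked. This is my own approximation: LAST_ITEM_ID and the delays are invented, and I use Bluebird promises because the article later calls Bluebird's asCallback on them.

const Promise = require('bluebird')

const LAST_ITEM_ID = 9999

// Fakes a paginated REST API: resolves with up to `count` (max 1000)
// consecutive entities starting from id `n`, after a small artificial delay.
function getAPI(n, count) {
  const items = []
  for (let id = n; id < n + Math.min(count, 1000) && id <= LAST_ITEM_ID; id++) {
    items.push({id})
  }
  return Promise.resolve(items).delay(50)
}

// Fakes a database insert: resolves with the number of "written" rows.
function insertDB(entries) {
  return Promise.resolve({count: entries.length}).delay(50)
}

module.exports = {getAPI, insertDB}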
For simplicity, we deliberately ignore the error handling required when working with the API and the database. If there is interest, we will cover it in a separate article.
Well, so that we don't get bored, let's say our customer is a bit twisted and has set the following task: we don't want to see in our database any entities whose id contains the digit 3, and to entities whose id contains the digit 9 we want to add the current timestamp: {id: 9} -> {id: 9, timestamp: 1490571732068}. A little far-fetched, but quite similar to the processing and filtering tasks that such “spiders” actually have to solve.
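The rule itself is easy to state as a plain function over a single entity. This is just an illustration of the requirement, not code from the article, and the function name is made up:

// Returns null for entities that should be dropped, otherwise the entity
// itself, with a timestamp added when the id contains the digit 9.
function processEntity(entity) {
  const id = String(entity.id)
  if (id.includes('3')) return null
  if (id.includes('9')) return Object.assign({}, entity, {timestamp: Date.now()})
  return entity
}

So processEntity({id: 13}) returns null, while processEntity({id: 9}) returns a stamped copy. We will effectively reproduce this logic inside the stream pipeline later.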
Well, let's get started. First, let's try to solve this problem head-on. Most likely we will end up with code similar to this:
const _ = require('lodash')
const {getAPI, insertDB} = require('./io-simulators')

function grab(offset = 0, rows = 1000) {
  return getAPI(offset, rows).then((items) => {
    if (_.isEmpty(items)) {
      return
    } else {
      return insertDB(items).then(() => grab(offset + rows))
    }
  })
}

console.time('transition')
grab().then(() => {
  console.timeEnd('transition')
})
What is wrong with this code? Mainly that everything happens strictly sequentially: while we are writing a batch to the database we are not reading the next one from the API, and vice versa, so neither the API nor the database is ever kept busy.
As you may have guessed, this problem is easy to solve with Streams. To begin with, we divide this task into two subtasks: reading and writing.
Let's start with reading and try to implement our own Readable stream:
const _ = require('lodash')
const {Writable, Readable} = require('stream')
const {getAPI, insertDB} = require('./io-simulators')

const ROWS = 1000

class APIReadable extends Readable {
  constructor(options) {
    super({objectMode: true})
    this.offset = 0
  }

  _read(size) {
    getAPI(this.offset, ROWS).then(data => {
      if (_.isEmpty(data)) {
        this.push(null)
      } else {
        this.push(data)
      }
    })
    this.offset = this.offset + ROWS
  }
}
This looks a little bulkier. It is worth paying attention to objectMode: true — we want to operate on objects rather than on buffers and strings, so we pass this flag to the constructor.
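Before wiring up the writing side, we can sanity-check the reader on its own by subscribing to its standard events. This is a throwaway test of mine, not part of the final pipeline:

const apiReadable = new APIReadable()

apiReadable
  .on('data', chunk => console.log('read a chunk of %d items', chunk.length))
  .on('end', () => console.log('no more data'))

Note that each chunk here is an array of up to 1000 entities, not a single object — this detail will matter later.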
Okay, now for the writing side. We implement our Writable stream, something like this:
class DBWritable extends Writable {
  constructor(options) {
    super({highWaterMark: 5, objectMode: true})
  }

  _write(chunk, encoding, callback) {
    insertDB(chunk).asCallback(callback)
  }

  _writev(chunks, callback) {
    const entries = _.map(chunks, 'chunk')
    // asCallback comes with Bluebird promises and adapts a promise to a node-style callback
    insertDB(_.flatten(entries)).asCallback(callback)
  }
}
What you should pay attention to:
- highWaterMark: 5 — the size of the internal buffer (measured in objects, since we are in objectMode);
- _writev — it is called when several chunks have accumulated in the buffer, letting us write them to the database in a single batch;
- asCallback — a Bluebird method that adapts a promise to the node-style callback the Stream API expects (see the note below).
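The last point deserves a note: asCallback exists only on Bluebird promises. If insertDB returned native promises, we could do the adaptation ourselves with a tiny helper (a sketch of mine, the name is made up):

// A plain-promise equivalent of Bluebird's promise.asCallback(callback)
function asCallback(promise, callback) {
  promise.then(result => callback(null, result), err => callback(err))
}

// _write could then be expressed as:
// _write(chunk, encoding, callback) {
//   asCallback(insertDB(chunk), callback)
// }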
Well, now we can use our code like this:
const dbWritable = new DBWritable()
const apiReadable = new APIReadable()

apiReadable.pipe(dbWritable)
It seems to me this is very cool: it is now extremely clear from the code that we read from one place and write to another. In addition, the reader can verify that our code is genuinely efficient and uses a buffer, plus there are small perks like the fact that it does not block the event loop.
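As a side note, pipe() does not return a promise, so if we want to time the whole run like in the naive version, we can listen for the writable's standard 'finish' event (a small addition of mine, not from the original article):

console.time('transition')
apiReadable.pipe(dbWritable)
dbWritable.on('finish', () => console.timeEnd('transition'))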
“Hmm,” an attentive reader will ask, “but what about the processing of the data?” For that we could write another Transform stream, but that is somehow flat and boring (a sketch of that alternative is shown below, after the Highland version), so we use the Highland.js library, which lets us apply our favorite map and filter over the elements of our “stream” of entities. Highland is much more than this simple use case, but that is a topic for a separate and not small article. Something like this:
const H = require('highland')

H(apiReadable)
  .flatten()
  .reject(x => _.includes(String(x.id), 3))
  .map(function(x) {
    if (_.includes(String(x.id), 9)) {
      return _.extend(x, {timestamp: Date.now()})
    } else {
      return x
    }
  })
  .batchWithTimeOrCount(100, 1000)
  .pipe(dbWritable)
To me this reads very much like ordinary list operations, and quite legibly at that. We need .flatten() and .batchWithTimeOrCount(100, 1000) only because our stream operates on arrays rather than individual objects: flatten unpacks each array into a stream of separate entities, and batchWithTimeOrCount groups them back into batches (every 100 ms or every 1000 items, whichever comes first) before writing.
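For completeness, here is roughly what the “flat and boring” Transform alternative mentioned above might look like. This is my own sketch, not code from the article, and the class name is made up:

const {Transform} = require('stream')

// Drops entities whose id contains 3 and stamps those whose id contains 9,
// working on the arrays of entities that APIReadable emits.
class ProcessTransform extends Transform {
  constructor() {
    super({objectMode: true})
  }

  _transform(items, encoding, callback) {
    const processed = items
      .filter(x => !_.includes(String(x.id), 3))
      .map(x => _.includes(String(x.id), 9)
        ? _.extend({}, x, {timestamp: Date.now()})
        : x)
    callback(null, processed)
  }
}

// apiReadable.pipe(new ProcessTransform()).pipe(dbWritable)

It works, but the filtering and stamping logic is buried inside stream plumbing, which is exactly why the Highland version above reads better.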
That's actually all. I hope I have achieved my goal and gotten the reader interested in exploring Streams and Highland.js.
Source: https://habr.com/ru/post/325320/