
Streams in Node.js - rivers you enter twice


A stream is a concept first implemented on UNIX systems for transferring data from one program to another in I/O operations. It allows each program to stay highly specialized in what it does, that is, to be an independent module. Combining such simple programs into a chain of calls makes it possible to build more complex systems.

Streams let you exchange data in small portions, which in turn means your program does not need to hold much in memory. How much, of course, depends on how you implement the stream's internal logic.
A common task is parsing a large file. For example, in a text file of logs you need to find a line containing certain text. Instead of loading the whole file into memory and then searching through its lines, we can read it in small portions. That way we occupy only as much memory as is needed to buffer the data being read. As soon as we find the required entry, we stop. Or we can pass the found record down the chain to another stream, for example to convert it to another format or save it to another file.

The stream module provides the basic streaming API in Node.js. The Node.js documentation is quite enough to sort this topic out, but here we will try to put together something like a cheat sheet with explanations of a few points.

Types of streams


There are four types of streams:

  • Readable - streams from which data can be read;
  • Writable - streams to which data can be written;
  • Duplex - streams that are both Readable and Writable;
  • Transform - Duplex streams that modify the data as it passes through.


Stream instanceof EventEmitter


All streams are EventEmitter instances: you can emit events with StreamClass.emit('eventName', data) and handle them with StreamClass.on('eventName', (data) => {}).

Pipe method


The easiest way to transfer data from one stream to another is to call the pipe method on the streams:

```javascript
Readable.pipe(Writable);                 // e.g. read from DataBase -> write to File
Readable.pipe(Transform).pipe(Writable); // DataBase -> convert to JSON -> write JSON to File
Duplex.pipe(Transform).pipe(Duplex);     // read from DataBase -> transform -> write back to DataBase
```

The last chain of calls shows that it is best to implement your stream classes so that each of them performs a single task.

As you can see, the pipe method returns the stream instance that was passed to it, which allows streams to be chained together.

The pipe method is implemented so that it handles the problem of matching the "speed" of data transfer between streams (preventing the internal stream buffer from overflowing). Suppose, for example, the Writable stream writes data more slowly than the Readable source supplies it. In that case the transfer is "suspended" until the Writable "informs" (by clearing its internal buffer) that it is ready to receive the next piece of data.

Buffering


Streams keep data in an internal buffer. The buffer size limit is set with the highWaterMark parameter, which can be passed to the class constructor.

The meaning of the highWaterMark value depends on another option, objectMode.

```javascript
new StreamObject({objectMode: false, highWaterMark: __}); // default is 16384 (16Kb)
new StreamObject({objectMode: true, highWaterMark: __});  // default is 16 objects
```

In a Readable stream, data is buffered when the push(data) method is invoked on it, and remains in the buffer until it is consumed by a read() call. As soon as the total size of the internal buffer reaches the threshold set by highWaterMark, the stream temporarily stops reading data.

For a Writable stream, buffering occurs when the write(data) method is called on it. The method returns true while the buffer size is below the highWaterMark value, and false once the buffer is full.
The pipe() method relies on exactly this: when write() returns false, it "pauses" reading, waits for the 'drain' event, and then resumes the data transfer.

Object mode


By default, streams work with data in the form of Buffers, but they can also work with strings and with other JavaScript objects (for example, {"user": {"name": "Ivan", "last_name": "Petrov"}}). The exception is null, which plays a special role in data transfer: if a stream receives null, it is a signal that there is no more data to process and reading or writing is complete. How to set a stream's mode at initialization is shown in the examples below.

Flowing and paused states of a Readable stream



The stream switches to the flowing state (flowing === true) automatically if:

  • a 'data' event handler is added;
  • the resume() method is called;
  • the pipe() method is called to direct the data to a Writable.

You can switch from flowing to the paused state (flowing === false) by:

  • calling the pause() method when there are no pipe destinations;
  • removing all pipe destinations with the unpipe() method.
At the moment a Readable class is initialized, flowing === null: no mechanism for consuming the data has been set up yet, so no data is produced.

Readable Streams - streams as a data source




Readable streams work in one of two states: flowing and paused. In the paused state you must call the read() method explicitly to get data. When you transfer data from one stream to another (R.pipe(W)), read() is called automatically.

The entire current data buffer can be inspected via the Readable._readableState.buffer property (an internal, undocumented property, used here only for demonstration).



Readable.js example
```javascript
'use strict';
const { Readable } = require('stream');

/**
 * To create your own Readable stream it is enough to inherit from Readable
 * and implement the _read() method.
 * Here we also log the internal state (_readableState) at key moments,
 * including when the data runs out (on('end', () => {})).
 */
class Source extends Readable {
  constructor(array_of_data = [], opt = {}) {
    super(opt);
    this._array_of_data = array_of_data;
    console.log('objectMode ', this._readableState.objectMode);       // false by default
    console.log('highWaterMark ', this._readableState.highWaterMark); // 16384
    console.log('buffer ', this._readableState.buffer);               // [] - internal buffer
    console.log('length ', this._readableState.length);               // 0 - amount of buffered data
    console.log('flowing ', this._readableState.flowing);             // null

    // Subscribing to events to observe the stream's behaviour
    this.on('data', (chunk) => {
      // subscribing to 'data' switches the stream into flowing mode
      console.log('\n---');
      console.log('Readable on data');
      // chunk is a Buffer by default
      console.log(`chunk = ${chunk} chunk isBuffer ${Buffer.isBuffer(chunk)} and chunk.length is ${chunk.length}`);
      console.log('buffer.length ', this._readableState.buffer.length);
      console.log('data: ', chunk.toString(), ' buffer ', this._readableState.buffer, ' buffer as string ', this._readableState.buffer.toString());
    })
    .on('error', (err) => {
      console.log('Readable on error ', err);
    })
    .on('end', () => {
      console.log('Readable on end');
      console.log('objectMode ', this._readableState.objectMode);       // false
      console.log('highWaterMark ', this._readableState.highWaterMark); // 16384
      console.log('buffer ', this._readableState.buffer);               // [] - the buffer is empty
      console.log('buffer.length ', this._readableState.buffer.length); // 0
      console.log('flowing ', this._readableState.flowing); // true !!! because we subscribed to 'data'
    })
    .on('close', () => {
      console.log('Readable on close - the underlying resource has been released');
    });
  }

  _read() {
    let data = this._array_of_data.shift();
    if (!data) {
      // no data left, signal the end of the stream
      this.push(null);
    } else {
      this.push(data);
    }
  }
}

module.exports = Source; // exported so that writable.js below can require it

/* The elements are strings on purpose: without objectMode a stream only
   accepts strings and Buffers; otherwise this.push(data) fails with
   Readable on error TypeError: Invalid non-string/buffer chunk */
let array_of_data = ['1', '2', '3', '4', '5'];
let opts = {/* default options */};
const R = new Source(array_of_data, opts);

array_of_data = ['1', '2', '3', '4', '5'];
opts = {
  objectMode: false,
  highWaterMark: 1 // a 1-byte buffer: _readableState.buffer.length === 1
};
const R2 = new Source(array_of_data, opts);

array_of_data = ['1', '2', '3', '4', '5'];
opts = {
  objectMode: false,
  encoding: 'utf8' // any encoding supported by Node.js; chunks arrive as strings instead of Buffers
};
const R3 = new Source(array_of_data, opts); // the same effect can be achieved later via R3.setEncoding('utf8')

array_of_data = [1, 2, 3, 4, 5];
/* "raw" numbers as data: with objectMode: true any JS value is allowed,
   and the encoding option (like Readable.setEncoding('utf8')) is ignored */
opts = {
  objectMode: true,
  encoding: 'utf8'
};
const R4 = new Source(array_of_data, opts);

// objectMode: true lets the stream carry numbers (Number) as-is
array_of_data = [1, 2, 3, 4, 5];
opts = { objectMode: true };
const R5 = new Source(array_of_data, opts);
// highWaterMark defaults to 16 objects in this mode

/* Pausing the stream by hand (normally this is done when
   Writable.write(someData) === false). pause()/resume() put reading under
   our control: we stop the source, do some work, then let the data flow again */
array_of_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
opts = { objectMode: true };
const R6 = new Source(array_of_data, opts);
R6.on('data', (chunk) => {
  // pause for 1 second on every chunk
  R6.pause();
  setTimeout(() => {
    R6.resume(); // resume reading
  }, 1000);
});
```



Writable Streams - streams for writing data




The entire current data buffer can be inspected via the internal writable._writableState.getBuffer() method (again, used here only for demonstration).


Example writable.js
```javascript
'use strict';
const Source = require('./readable.js');
const { Writable } = require('stream');

class Writer extends Writable {
  constructor(opt = {}) {
    super(opt);
    console.log('objectMode ', this._writableState.objectMode);       // false by default, we will also try true
    console.log('highWaterMark ', this._writableState.highWaterMark); // 16384
    console.log('decodeStrings ', this._writableState.decodeStrings); // true by default; strings are converted to Buffer before reaching _write()
    console.log('buffer ', this._writableState.getBuffer());          // [] - internal buffer

    this.on('drain', () => {
      console.log('\n------ writable on drain');
    })
    .on('error', (err) => {
      console.log('\n------ writable on error', err);
    })
    .on('finish', () => {
      console.log('\n------ writable on finish');
      console.log('_writableState.getBuffer()', this._writableState.getBuffer());
    });
  }

  /**
   * @param chunk - Buffer|String|Object
   * @param encoding - encoding of the chunk; with objectMode === true encoding is ignored
   * @param done - callback to call when the chunk has been processed.
   *   Until done() is called, _write will not receive the next chunk -
   *   this is how backpressure works. On failure call done(err) with a new Error(...)
   * @private
   */
  _write(chunk, encoding, done) {
    console.log('_writableState.getBuffer()', this._writableState.getBuffer());
    console.log(typeof chunk);
    // objects arrive here from the Transform example
    if (typeof chunk === 'object') {
      console.log('chunk = ', chunk.get(), chunk.get() + ' in pow ' + chunk.get() + ' = ' + chunk.inPow(chunk.get()));
    } else {
      console.log(`chunk = ${chunk}; isBuffer ${Buffer.isBuffer(chunk)}; chunk.length is ${chunk.length}; encoding = ${encoding}`);
    }
    /* Error handling options. For example:
       1) handle the error in on('error', (err) => {...})
       2) notify the Readable source so that it stops sending data.
          There is no single built-in mechanism for this - you could call
          Readable.emit('error', err), or Readable.pause() and later Readable.resume().
       To emulate an error on a particular chunk:
       // if (chunk > 3) return done(new Error('chunk > 3')); */
    done();
  }
}

let array_of_data = ['1', '2', '3', '4', '5'];
let r_opts = {/* default options */};
const R = new Source(array_of_data, r_opts);
let w_opts = {/* default options */};
const W = new Writer(w_opts);
R.pipe(W);

array_of_data = ['1', '2', '3', '4', '5'];
r_opts = { encoding: 'utf8' };
const R1 = new Source(array_of_data, r_opts);
w_opts = {
  decodeStrings: false // _write receives 'utf8' strings as-is instead of Buffers (matching r_opts)
};
const W1 = new Writer(w_opts);
R1.pipe(W1);

array_of_data = [1, 2, 3, 4, 5];
r_opts = { objectMode: true };
const R2 = new Source(array_of_data, r_opts);
w_opts = {
  objectMode: true // with false (mismatching r_opts) we would get "TypeError: Invalid non-string/buffer chunk"
};
const W2 = new Writer(w_opts);
R2.pipe(W2);

array_of_data = [1, 2, 3, 4, 5];
r_opts = { objectMode: true };
const R3 = new Source(array_of_data, r_opts);
w_opts = {
  objectMode: true,
  highWaterMark: 1 // a tiny buffer; watch how often the 'drain' event fires
};
const W3 = new Writer(w_opts);
R3.pipe(W3);

// the same without pipe()
const R3_1 = new Source(array_of_data, r_opts);
const W3_1 = new Writer(w_opts);
R3_1.on('data', (chunk) => {
  // R3_1._readableState.flowing === true
  console.log('R3_1 in flowing mode', R3_1._readableState.flowing, 'R3_1 _readableState.buffer', R3_1._readableState.buffer);
  toWriteOrNotToWriteThatIsTheQuestion(chunk, onDrain);
});

function onDrain() {
  // R3_1._readableState.flowing === false here, after R3_1.pause() in toWriteOrNotToWriteThatIsTheQuestion
  console.log('R3_1 in flowing mode', R3_1._readableState.flowing);
  R3_1.resume();
}

/**
 * When the Writable buffer is full we pause the Readable (R3_1.pause());
 * once the buffer is flushed (the 'drain' event) we resume the Readable
 * (R3_1.resume() in the callback) and keep writing to the Writable.
 * @param data
 * @param cb
 */
function toWriteOrNotToWriteThatIsTheQuestion(data, cb) {
  // the "public" write(...) wraps our _write(...)
  if (!W3_1.write(data)) {
    R3_1.pause();
    W3_1.once('drain', cb);
  } else {
    process.nextTick(cb);
  }
}
```



Transform Streams - streams that modify data



Transform is a variation of the Duplex stream, so we will start with an example of it.



Transform.js example
```javascript
'use strict';
const Readable = require('./readable.js');
const Writable = require('./writable.js');
const { Transform } = require('stream');

/* A helper class to show that streams can carry not only strings
   and Buffers, but arbitrary JS objects as well */
class Chunk {
  constructor(chunk) {
    this.set(chunk);
  }
  set(chunk) {
    this._chunk = chunk;
  }
  get() {
    return this._chunk;
  }
  inPow(pow = 2) {
    return Math.pow(this.get(), pow);
  }
}

class Transformer extends Transform {
  constructor(opt = {}) {
    super(opt);
    console.log('\n -------- Transform in constructor');
    console.log('objectMode ', this._writableState.objectMode);       // false by default, we will also try true
    console.log('highWaterMark ', this._writableState.highWaterMark); // 16384
    console.log('decodeStrings ', this._writableState.decodeStrings); // true by default; strings become Buffers before reaching _write()
    console.log('buffer ', this._writableState.getBuffer());          // [] - internal buffer

    this.on('close', () => { console.log('\n------ Transform on close'); })
    .on('drain', () => { console.log('\n------ Transform on drain'); })
    .on('error', (err) => { console.log('\n------ Transform on error', err); })
    .on('finish', () => { console.log('\n------ Transform on finish'); })
    .on('end', () => { console.log('\n------ Transform on end'); })
    .on('pipe', () => { console.log('\n------ Transform on pipe'); });
  }

  /**
   * Receives a chunk from the writable side (the data piped into the
   * Transform) and pushes the transformed result to the readable side.
   * @param chunk
   * @param encoding
   * @param done - call as done(err, chunk)
   * @private
   */
  _transform(chunk, encoding, done) {
    /* After transforming the chunk, pass it on in one of two equivalent ways:
       done(null, chunk);
       done(err, chunk); - with err set, the 'error' event is emitted
       or:
       this.push(chunk); done();
       this.push(chunk); done(err); */
    // wrap each chunk into a Chunk object (see writable.js)
    this.push(new Chunk(chunk));
    done();
  }

  /**
   * A Transform stream may also implement _flush. It is called when there is
   * no more incoming data, right before the 'end' event of the readable side
   * (useful when the transform accumulates data and must emit a final piece).
   * @param done - done(err), on failure pass an Error
   * @private
   */
  _flush(done) {
    // TODO ... emit any remaining accumulated data here
    done();
  }
}

let array_of_data = ['1', '2', '3', '4', '5'];
let r_opts = { encoding: 'utf8' };
const R = new Readable(array_of_data, r_opts);

let t_opts = {
  readableObjectMode: true,  // the readable side of the Transform emits objects
  writableObjectMode: false, // the writable side of the Transform receives strings/Buffers
  decodeStrings: false
};
const T = new Transformer(t_opts);

let w_opts = {
  objectMode: true // with false the Writable would reject the Chunk objects
};
const W = new Writable(w_opts);

R.pipe(T).pipe(W);
```



Duplex Streams - streams for reading and writing


A Duplex stream implements both the Readable and Writable interfaces, and the two sides operate independently of each other.

If the topic of streams interests you, we suggest experimenting with implementations of your own Duplex streams.

new stream.Duplex(options)

The options object is passed to both the Writable and Readable constructors of the Duplex stream.

  • allowHalfOpen boolean, defaults to true. If false, then when the readable side ends, the writable side is automatically ended as well;
  • readableObjectMode boolean, defaults to false. Sets objectMode for the readable side of the stream; ignored if objectMode is true;
  • writableObjectMode boolean, defaults to false. Sets objectMode for the writable side of the stream; ignored if objectMode is true.


Error handling


When the 'error' event fires in one link of the chain and you need to notify the "previous" streams, you also have to emit the 'error' event on them: StreamClass.emit('error', err), and handle the situation yourself. Alternatively, use the pump module (https://github.com/mafintosh/pump), which solves this problem for you.

Summing up


Using streams, you can solve almost any data-processing task.


As they say - for every taste.

Source: https://habr.com/ru/post/339900/

