Readable.pipe(Writable); // e.g. DataBase -> File
Readable.pipe(Transform).pipe(Writable); // e.g. DataBase -> convert to JSON -> JSON File
Duplex.pipe(Transform).pipe(Duplex); // e.g. DataBase -> transform -> DataBase
new StreamObject({objectMode: false, highWaterMark: __}); // default highWaterMark: 16384 bytes (16 KB)
new StreamObject({objectMode: true, highWaterMark: __}); // default highWaterMark: 16 objects
'use strict'; const { Readable } = require('stream'); /** * Readable , _read(). * * (_readableState) , (on('end', ()=>{})) */ class Source extends Readable { constructor(array_of_data = [], opt = {}) { super(opt); this._array_of_data = array_of_data; console.log('objectMode ', this._readableState.objectMode);//false , console.log('highWaterMark ', this._readableState.highWaterMark);//16384 console.log('buffer ', this._readableState.buffer);//[] - console.log('length ', this._readableState.length);//0 - - console.log('flowing ', this._readableState.flowing);//null // , this.on('data', (chunk)=> { // 'data' - console.log('\n---'); console.log('Readable on data '); // chunk console.log(`chunk = ${chunk} chunk isBuffer ${Buffer.isBuffer(chunk)} and chunk.length is ${chunk.length}`); //- (- ) console.log('buffer.length ', this._readableState.buffer.length); console.log(': ', chunk.toString(), ' buffer of chunk ', this._readableState.buffer, ' buffer of chunk ', this._readableState.buffer.toString()); }) .on('error',(err)=> { console.log('Readable on error ', err); }) .on('end',()=> { console.log('Readable on end '); console.log('objectMode ', this._readableState.objectMode);//false console.log('highWaterMark ', this._readableState.highWaterMark);//16384 console.log('buffer ', this._readableState.buffer);//[] - console.log('buffer.length ', this._readableState.buffer.length);//0 console.log('flowing ', this._readableState.flowing);//true !!! 'data' }) .on('close',()=> { console.log('Readable on close '); }); } _read() { let data = this._array_of_data.shift() if (!data) { //, this.push(null); } else { this.push(data); } } } /* , .. , . 
this.push(data) Readable on error TypeError: Invalid non-string/buffer chunk */ let array_of_data = ['1', '2', '3', '4', '5']; let opts = {/* */}; const R = new Source(array_of_data, opts); array_of_data = ['1', '2', '3', '4', '5']; opts = { objectMode: false, highWaterMark: 1//1 _readableState.buffer.length === 1 }; const R2 = new Source(array_of_data, opts); array_of_data = ['1', '2', '3', '4', '5']; opts = { objectMode: false , encoding: 'utf8'// ( NodeJS), , }; const R3 = new Source(array_of_data, opts);// .setEncoding('utf8') array_of_data = [1, 2, 3, 4, 5]; /* "" . objectMode: true - , Readable.setEncoding('utf8')*/ opts = { objectMode: true , encoding: 'utf8' }; const R4 = new Source(array_of_data, opts); // objectMode: true , (Number) array_of_data = [1, 2, 3, 4, 5]; opts = { objectMode: true }; const R5 = new Source(array_of_data, opts); //highWaterMark 16 - /* ( Writable.write(someData) === false). Node.JS. , , , */ array_of_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; opts = { objectMode: true }; const R6 = new Source(array_of_data, opts); R6.on('data', (chunk) => { // 1 R6.pause(); setTimeout(() => { R6.resume();// }, 1000); });
'use strict';

const Source = require('./readable.js');
const { Writable } = require('stream');

/**
 * Writable stream that logs its internal `_writableState` and every chunk it
 * receives, then acknowledges the chunk immediately.
 */
class Writer extends Writable {
  /**
   * @param {object} opt - options passed unchanged to the Writable constructor
   */
  constructor(opt = {}) {
    super(opt);

    console.log('objectMode ', this._writableState.objectMode); // false unless opt.objectMode is set
    console.log('highWaterMark ', this._writableState.highWaterMark);
    console.log('decodeStrings ', this._writableState.decodeStrings); // when true, written strings reach _write() as Buffers
    console.log('buffer ', this._writableState.getBuffer()); // nothing queued yet

    this.on('drain', () => {
      console.log('\n------ writable on drain');
    })
      .on('error', (err) => {
        console.log('\n------ writable on error', err);
      })
      .on('finish', () => {
        console.log('\n------ writable on finish');
        console.log('_writableState.getBuffer()', this._writableState.getBuffer());
      });
  }

  /**
   * Logs the chunk and signals completion.
   *
   * @param chunk - a Buffer or string, or any value when objectMode is true
   * @param encoding - encoding of a string chunk; carries no meaning in objectMode
   * @param done - callback that must be called once the chunk is handled;
   *   pass an Error (done(err)) to report a failure
   * @private
   */
  _write(chunk, encoding, done) {
    console.log('_writableState.getBuffer()', this._writableState.getBuffer());
    console.log(typeof chunk);

    // Wrapper objects produced by the Transform example expose get()/inPow().
    // Duck-type on those methods instead of `typeof chunk === 'object'`:
    // Buffers are objects too and have neither method.
    const isWrappedChunk = chunk !== null
      && typeof chunk === 'object'
      && typeof chunk.get === 'function'
      && typeof chunk.inPow === 'function';

    if (isWrappedChunk) {
      const value = chunk.get();
      console.log('chunk = ', value, `${value} in pow ${value} = ${chunk.inPow(value)}`);
    } else {
      console.log(`chunk = ${chunk}; isBuffer ${Buffer.isBuffer(chunk)}; chunk.length is ${chunk.length}; encoding = ${encoding}`);
    }

    // Error handling: to fail a write, pass an Error to done(), for example
    //   if (chunk > 3) return done(new Error('chunk > 3'));
    // The 'error' listener registered in the constructor then logs it. The
    // article notes the source Readable is not told about the failure by
    // itself; notify it explicitly (Readable.emit('error', err)) or throttle
    // it with Readable.pause() / Readable.resume().
    done();
  }
}

/**
 * Demo scenarios from the article: four pipe() setups followed by a manual
 * re-implementation of pipe()'s backpressure handling.
 */
function runDemos() {
  // Defaults on both sides: chunks reach _write() as Buffers.
  let array_of_data = ['1', '2', '3', '4', '5'];
  let r_opts = {/* defaults */};
  const R = new Source(array_of_data, r_opts);
  let w_opts = {/* defaults */};
  const W = new Writer(w_opts);
  R.pipe(W);

  array_of_data = ['1', '2', '3', '4', '5'];
  r_opts = { encoding: 'utf8' };
  const R1 = new Source(array_of_data, r_opts);
  w_opts = {
    // Keep strings as strings: _write() receives them with encoding 'utf8',
    // provided the source emits strings (see r_opts).
    decodeStrings: false,
  };
  const W1 = new Writer(w_opts);
  R1.pipe(W1);

  array_of_data = [1, 2, 3, 4, 5];
  r_opts = { objectMode: true };
  const R2 = new Source(array_of_data, r_opts);
  w_opts = {
    // Must match the source: with objectMode false the numeric chunks are
    // rejected with a TypeError (invalid non-string/buffer chunk).
    objectMode: true,
  };
  const W2 = new Writer(w_opts);
  R2.pipe(W2);

  array_of_data = [1, 2, 3, 4, 5];
  r_opts = { objectMode: true };
  const R3 = new Source(array_of_data, r_opts);
  w_opts = {
    objectMode: true,
    highWaterMark: 1, // tiny limit so that 'drain' events show up
  };
  const W3 = new Writer(w_opts);
  R3.pipe(W3);

  // The same transfer without pipe(). R3_1 gets its own array so it cannot
  // compete with R3 for the same items.
  const R3_1 = new Source([1, 2, 3, 4, 5], r_opts);
  const W3_1 = new Writer(w_opts);

  /** Resumes the paused source once the destination has drained. */
  function onDrain() {
    // flowing is false here if toWriteOrNotToWriteThatIsTheQuestion paused R3_1.
    console.log('R3_1 in flowing mode', R3_1._readableState.flowing);
    R3_1.resume();
  }

  /**
   * Writes one chunk to W3_1. If the Writable reports a full buffer
   * (write() returned false), pauses R3_1 and waits for 'drain' before
   * calling `cb`; otherwise calls `cb` on the next tick.
   *
   * @param data - chunk to write
   * @param cb - called when it is fine to continue reading
   */
  function toWriteOrNotToWriteThatIsTheQuestion(data, cb) {
    // Call the public write(...), never _write(...) directly.
    if (!W3_1.write(data)) {
      R3_1.pause();
      W3_1.once('drain', cb);
    } else {
      process.nextTick(cb);
    }
  }

  R3_1.on('data', (chunk) => {
    // A 'data' listener means flowing mode: flowing === true here.
    console.log('R3_1 in flowing mode', R3_1._readableState.flowing, 'R3_1 _readableState.buffer', R3_1._readableState.buffer);
    toWriteOrNotToWriteThatIsTheQuestion(chunk, onDrain);
  });

  // pipe() ends the destination when the source ends; do the same by hand so
  // that W3_1 emits 'finish'.
  R3_1.on('end', () => {
    W3_1.end();
  });
}

// Other files obtain the class via require('./writable.js').
module.exports = Writer;

// Run the demos only when this file is executed directly, not when required.
if (require.main === module) {
  runDemos();
}
'use strict';

// NOTE(review): both local modules are used as constructors below, so each is
// presumably expected to export its stream class — confirm against those files.
const Readable = require('./readable.js');
const Writable = require('./writable.js');
const {Transform} = require('stream');

/**
 * Small wrapper used to show that a Transform may hand arbitrary JS objects
 * to the next stream when its readable side is in object mode.
 */
class Chunk {
  constructor(chunk) {
    this.set(chunk);
  }

  /** Stores the wrapped value. */
  set(chunk) {
    this._chunk = chunk;
  }

  /** Returns the wrapped value. */
  get() {
    return this._chunk;
  }

  /** Returns the wrapped value raised to `pow` (2 by default). */
  inPow(pow = 2) {
    const base = this.get();
    return Math.pow(base, pow);
  }
}

/**
 * Transform stream that wraps every incoming chunk in a Chunk instance and
 * logs its own lifecycle events.
 */
class Transformer extends Transform {
  /**
   * @param {object} opt - options passed unchanged to the Transform constructor
   */
  constructor(opt = {}) {
    super(opt);

    const writableSide = this._writableState;
    console.log('\n -------- Transform in constructor');
    console.log('objectMode ', writableSide.objectMode);
    console.log('highWaterMark ', writableSide.highWaterMark);
    console.log('decodeStrings ', writableSide.decodeStrings);
    console.log('buffer ', writableSide.getBuffer());

    // Events that are only announced; the message is fixed per event.
    const announcements = {
      close: '\n------ Transform on close',
      drain: '\n------ Transform on drain',
      finish: '\n------ Transform on finish',
      end: '\n------ Transform on end',
      pipe: '\n------ Transform on pipe',
    };
    Object.keys(announcements).forEach((eventName) => {
      this.on(eventName, () => {
        console.log(announcements[eventName]);
      });
    });

    // 'error' additionally logs the error object.
    this.on('error', (err) => {
      console.log('\n------ Transform on error', err);
    });
  }

  /**
   * Wraps the incoming chunk in a Chunk and passes it downstream.
   *
   * Handing the result to done(null, value) is the short form of
   * this.push(value); done(); — an error would go in the first argument.
   *
   * @param chunk - incoming data
   * @param encoding - encoding of a string chunk
   * @param done - done(err, chunk)
   * @private
   */
  _transform(chunk, encoding, done) {
    done(null, new Chunk(chunk));
  }

  /**
   * Called after all written data has been transformed and before the
   * readable side ends; nothing is left to emit here.
   *
   * @param done - done(err); pass an Error to report a failure
   * @private
   */
  _flush(done) {
    // TODO: emit trailing data here if it is ever needed.
    done();
  }
}

const sourceItems = ['1', '2', '3', '4', '5'];
const readerOptions = {
  encoding: 'utf8',
};
const R = new Readable(sourceItems, readerOptions);

const transformOptions = {
  readableObjectMode: true, // the output side of the Transform emits objects
  writableObjectMode: false, // the input side takes strings / Buffers
  decodeStrings: false,
};
const T = new Transformer(transformOptions);

const writerOptions = {
  objectMode: true, // the writer receives the Chunk objects produced by T
};
const W = new Writable(writerOptions);

R.pipe(T).pipe(W);
Source: https://habr.com/ru/post/339900/
All Articles