From 46ddf18317946b2ccc25c99a356a01847b6f589e Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 14 Jun 2021 15:22:59 +0200 Subject: [PATCH] streams: add stream.pipe pipe is similar to pipeline however it supports stream composition. --- lib/internal/streams/pipe.js | 56 ++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 lib/internal/streams/pipe.js diff --git a/lib/internal/streams/pipe.js b/lib/internal/streams/pipe.js new file mode 100644 index 00000000000000..6bf0eae503495d --- /dev/null +++ b/lib/internal/streams/pipe.js @@ -0,0 +1,56 @@ +'use strict'; + +const pipeline = require('internal/streams/pipeline'); +const Duplex = require('internal/streams/duplex'); + +module.exports = function pipe(...streams) { + let cb; + let ret; + + const r = pipeline(streams, function(err) { + if (cb) { + cb(err); + } else { + ret.destroy(err); + } + }); + const w = streams[0]; + + ret = new Duplex({ + writable: !!w?.writable, + readable: !!r?.readable, + objectMode: streams[0].readableObjectMode, + highWaterMark: 1 + }); + + if (ret.writable) { + ret._write = function(chunk, encoding, callback) { + w.write(chunk, encoding, callback); + }; + + ret._final = function(chunk, encoding, callback) { + w.end(chunk, encoding, callback); + }; + } + + if (ret.readable) { + ret._read = function() { + r.resume(); + }; + + r + .on('data', function(buf) { + if (!ret.push(buf)) { + this.pause(); + } + }) + .on('end', function() { + ret.push(null); + }); + } + + ret._destroy = function(err, callback) { + cb = callback; + streams[0].destroy(err); + }; +}