Skip to content

Commit

Permalink
streams: add stream.pipe
Browse files Browse the repository at this point in the history
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
  • Loading branch information
ronag committed Jun 15, 2021
1 parent 4e17ffc commit fbe89e6
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 0 deletions.
98 changes: 98 additions & 0 deletions lib/internal/streams/pipelinify.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
'use strict';

const pipeline = require('internal/streams/pipeline');
const Duplex = require('internal/streams/duplex');
const { destroyer } = require('internal/streams/destroy');

module.exports = function pipe(...streams) {
let onclose;
let ret;

const r = pipeline(streams, function(err) {
if (onclose) {
const cb = onclose;
onclose = null;
cb(err);
} else {
ret.destroy(err);
}
});
const w = streams[0];

const writable = w.writable;
const readable = r.readable;
const objectMode = w.readableObjectMode;

ret = new Duplex({
writable,
readable,
objectMode,
highWaterMark: 1
});

if (writable) {
let ondrain;
let onfinish;

ret._write = function(chunk, encoding, callback) {
if (w.write(chunk, encoding)) {
callback();
} else {
ondrain = callback;
}
};

ret._final = function(chunk, encoding, callback) {
w.end(chunk, encoding);
onfinish = callback;
};

ret.on('drain', function () {
if (ondrain) {
const cb = ondrain;
ondrain = null;
cb();
}
});

ret.on('finish', function () {
if (onfinish) {
const cb = onfinish
onfinish = null;
cb();
}
});
}

if (readable) {
let onreadable;

r.on('readable', function () {
if (onreadable) {
const cb = onreadable
onreadable = null;
cb();
}
});

ret._read = function () {
while (true) {
const buf = r.read();

if (buf === null) {
onreadable = ret._read;
return;
}

if (!ret.push(buf)) {
return;
}
}
};
}

ret._destroy = function(err, callback) {
onclose = callback;
r.destroy(err);
};
}
2 changes: 2 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const {
} = require('internal/util');

const pipeline = require('internal/streams/pipeline');
const pipelinify = require('internal/streams/pipelinify');
const eos = require('internal/streams/end-of-stream');
const internalBuffer = require('internal/buffer');

Expand All @@ -42,6 +43,7 @@ Stream.Duplex = require('internal/streams/duplex');
Stream.Transform = require('internal/streams/transform');
Stream.PassThrough = require('internal/streams/passthrough');
Stream.pipeline = pipeline;
Stream.pipelinify = pipelinify;
const { addAbortSignal } = require('internal/streams/add-abort-signal');
Stream.addAbortSignal = addAbortSignal;
Stream.finished = eos;
Expand Down

0 comments on commit fbe89e6

Please sign in to comment.