Skip to content

Commit

Permalink
streams: add stream.pipe
Browse files Browse the repository at this point in the history
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
  • Loading branch information
ronag committed Jun 15, 2021
1 parent 4e17ffc commit 175b4d4
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 0 deletions.
105 changes: 105 additions & 0 deletions lib/internal/streams/pipelinify.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
'use strict';

const pipeline = require('internal/streams/pipeline');
const Duplex = require('internal/streams/duplex');
const { destroyer } = require('internal/streams/destroy');

module.exports = function pipe(...streams) {
// TODO (ronag): Avoid double buffering.
// Implement Writable/Readable/Duplex traits.
// See, https://github.com/nodejs/node/pull/33515.

let ondrain;
let onfinish;
let onreadable;
let onclose;
let d;

const r = pipeline(streams, function(err) {
if (onclose) {
const cb = onclose;
onclose = null;
cb(err);
} else {
d.destroy(err);
}
});
const w = streams[0];

const writable = w.writable;
const readable = r.readable;
const objectMode = w.readableObjectMode;

d = new Duplex({
writable,
readable,
objectMode,
highWaterMark: 1
});

if (writable) {
d._write = function(chunk, encoding, callback) {
if (w.write(chunk, encoding)) {
callback();
} else {
ondrain = callback;
}
};

d._final = function(callback) {
w.end();
onfinish = callback;
};

d.on('drain', function() {
if (ondrain) {
const cb = ondrain;
ondrain = null;
cb();
}
});

d.on('finish', function() {
if (onfinish) {
const cb = onfinish;
onfinish = null;
cb();
}
});
}

if (readable) {
r.on('readable', function() {
if (onreadable) {
const cb = onreadable;
onreadable = null;
cb();
}
});

d._read = function() {
while (true) {
const buf = r.read();

if (buf === null) {
onreadable = d._read;
return;
}

if (!d.push(buf)) {
return;
}
}
};
}

d._destroy = function(err, callback) {
onclose = callback;
onreadable = null;
ondrain = null;
onfinish = null;
destroyer(r, err);
};

return d;
};
2 changes: 2 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const {
} = require('internal/util');

const pipeline = require('internal/streams/pipeline');
const pipelinify = require('internal/streams/pipelinify');
const eos = require('internal/streams/end-of-stream');
const internalBuffer = require('internal/buffer');

Expand All @@ -42,6 +43,7 @@ Stream.Duplex = require('internal/streams/duplex');
Stream.Transform = require('internal/streams/transform');
Stream.PassThrough = require('internal/streams/passthrough');
Stream.pipeline = pipeline;
Stream.pipelinify = pipelinify;
const { addAbortSignal } = require('internal/streams/add-abort-signal');
Stream.addAbortSignal = addAbortSignal;
Stream.finished = eos;
Expand Down

0 comments on commit 175b4d4

Please sign in to comment.