Skip to content

Commit b372493

Browse files
committed
streams: add stream.pipe
pipe is similar to pipeline however it supports stream composition. Refs: nodejs#32020
1 parent 4e17ffc commit b372493

File tree

2 files changed

+99
-0
lines changed

2 files changed

+99
-0
lines changed

lib/internal/streams/pipelinify.js

+97
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
'use strict';
2+
3+
const pipeline = require('internal/streams/pipeline');
4+
const Duplex = require('internal/streams/duplex');
5+
6+
module.exports = function pipe(...streams) {
7+
let onclose;
8+
let ret;
9+
10+
const r = pipeline(streams, function(err) {
11+
if (onclose) {
12+
const cb = onclose;
13+
onclose = null;
14+
cb(err);
15+
} else {
16+
ret.destroy(err);
17+
}
18+
});
19+
const w = streams[0];
20+
21+
const writable = w.writable;
22+
const readable = r.readable;
23+
const objectMode = w.readableObjectMode;
24+
25+
ret = new Duplex({
26+
writable,
27+
readable,
28+
objectMode,
29+
highWaterMark: 1
30+
});
31+
32+
if (writable) {
33+
let ondrain;
34+
let onfinish;
35+
36+
ret._write = function(chunk, encoding, callback) {
37+
if (w.write(chunk, encoding)) {
38+
callback();
39+
} else {
40+
ondrain = callback;
41+
}
42+
};
43+
44+
ret._final = function(chunk, encoding, callback) {
45+
w.end(chunk, encoding);
46+
onfinish = callback;
47+
};
48+
49+
ret.on('drain', function () {
50+
if (ondrain) {
51+
const cb = ondrain;
52+
ondrain = null;
53+
cb();
54+
}
55+
});
56+
57+
ret.on('finish', function () {
58+
if (onfinish) {
59+
const cb = onfinish
60+
onfinish = null;
61+
cb();
62+
}
63+
});
64+
}
65+
66+
if (readable) {
67+
let onreadable;
68+
69+
r.on('readable', function () {
70+
if (onreadable) {
71+
const cb = onreadable
72+
onreadable = null;
73+
cb();
74+
}
75+
});
76+
77+
ret._read = function () {
78+
while (true) {
79+
const buf = r.read();
80+
81+
if (buf === null) {
82+
onreadable = ret._read;
83+
return;
84+
}
85+
86+
if (!ret.push(buf)) {
87+
return;
88+
}
89+
}
90+
};
91+
}
92+
93+
ret._destroy = function(err, callback) {
94+
onclose = callback;
95+
streams[0].destroy(err); // pipeline should propagate destroy(err) to all streams.
96+
};
97+
}

lib/stream.js

+2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const {
3030
} = require('internal/util');
3131

3232
const pipeline = require('internal/streams/pipeline');
33+
const pipelinify = require('internal/streams/pipelinify');
3334
const eos = require('internal/streams/end-of-stream');
3435
const internalBuffer = require('internal/buffer');
3536

@@ -42,6 +43,7 @@ Stream.Duplex = require('internal/streams/duplex');
4243
Stream.Transform = require('internal/streams/transform');
4344
Stream.PassThrough = require('internal/streams/passthrough');
4445
Stream.pipeline = pipeline;
46+
Stream.pipelinify = pipelinify;
4547
const { addAbortSignal } = require('internal/streams/add-abort-signal');
4648
Stream.addAbortSignal = addAbortSignal;
4749
Stream.finished = eos;

0 commit comments

Comments
 (0)