Skip to content

Commit 474b9ab

Browse files
committed
streams: add stream.pipe
pipe is similar to pipeline however it supports stream composition. Refs: nodejs#32020
1 parent 4e17ffc commit 474b9ab

File tree

1 file changed

+56
-0
lines changed

1 file changed

+56
-0
lines changed

lib/internal/streams/pipe.js

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
'use strict';
2+
3+
const pipeline = require('internal/streams/pipeline');
4+
const Duplex = require('internal/streams/duplex');
5+
6+
module.exports = function pipe(...streams) {
7+
let cb;
8+
let ret;
9+
10+
const r = pipeline(streams, function(err) {
11+
if (cb) {
12+
cb(err);
13+
} else {
14+
ret.destroy(err);
15+
}
16+
});
17+
const w = streams[0];
18+
19+
ret = new Duplex({
20+
writable: !!w?.writable,
21+
readable: !!r?.readable,
22+
objectMode: streams[0].readableObjectMode,
23+
highWaterMark: 1
24+
});
25+
26+
if (ret.writable) {
27+
ret._write = function(chunk, encoding, callback) {
28+
w.write(chunk, encoding, callback);
29+
};
30+
31+
ret._final = function(chunk, encoding, callback) {
32+
w.end(chunk, encoding, callback);
33+
};
34+
}
35+
36+
if (ret.readable) {
37+
ret._read = function() {
38+
r.resume();
39+
};
40+
41+
r
42+
.on('data', function(buf) {
43+
if (!ret.push(buf)) {
44+
this.pause();
45+
}
46+
})
47+
.on('end', function() {
48+
ret.push(null);
49+
});
50+
}
51+
52+
ret._destroy = function(err, callback) {
53+
cb = callback;
54+
streams[0].destroy(err);
55+
};
56+
}

0 commit comments

Comments
 (0)