Skip to content

Commit bea9efc

Browse files
committed
streams: add stream.pipelinify
Refs: nodejs#32020
1 parent f179eb0 commit bea9efc

File tree

4 files changed

+355
-0
lines changed

4 files changed

+355
-0
lines changed

doc/api/stream.md

+33
Original file line numberDiff line numberDiff line change
@@ -1859,6 +1859,39 @@ run().catch(console.error);
18591859
after the `callback` has been invoked. In the case of reuse of streams after
18601860
failure, this can cause event listener leaks and swallowed errors.
18611861

1862+
### `stream.pipelinify(...streams)`
1863+
<!-- YAML
1864+
added: REPLACEME
1865+
-->
1866+
1867+
* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]}
1868+
* Returns: {stream.Duplex}
1869+
1870+
Combines multiple streams into a `Duplex` stream. This works by
1871+
calling `stream.pipeline` on all the passed streams and then creating
1872+
a `Duplex` stream which writes to the first passed stream and reads from
1873+
the stream returned from `stream.pipeline`.
1874+
1875+
```js
1876+
const { pipelinify, Transform } = require('stream');
1877+
const removeSpaces = new Transform({
1878+
transform(chunk, encoding, callback) {
1879+
callback(null, String(chunk).replace(/ /g, ''));
1880+
}
1881+
});
1882+
const toUpper = new Transform({
1883+
transform(chunk, encoding, callback) {
1884+
callback(null, String(chunk).toUpperCase());
1885+
}
1886+
});
1887+
const removeSpacesAndToUpper = pipelinify(removeSpaces, toUpper);
1888+
removeSpacesAndToUpper
1889+
.end('hello world')
1890+
.on('data', (buf) => {
1891+
console.log(buf); // prints 'HELLOWORLD'
1892+
});
1893+
```
1894+
18621895
### `stream.Readable.from(iterable, [options])`
18631896
<!-- YAML
18641897
added:

lib/internal/streams/pipelinify.js

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
'use strict';
2+
3+
const pipeline = require('internal/streams/pipeline');
4+
const Duplex = require('internal/streams/duplex');
5+
const { destroyer } = require('internal/streams/destroy');
6+
7+
module.exports = function pipe(...streams) {
8+
let ondrain;
9+
let onfinish;
10+
let onreadable;
11+
let onclose;
12+
let d;
13+
14+
const r = pipeline(streams, function(err) {
15+
if (onclose) {
16+
onclose(err);
17+
} else if (err) {
18+
d.destroy(err);
19+
}
20+
onclose = null;
21+
});
22+
const w = streams[0];
23+
24+
// TODO (ronag): Avoid double buffering.
25+
// Implement Writable/Readable/Duplex traits.
26+
// See, https://github.com/nodejs/node/pull/33515.
27+
d = new Duplex({
28+
writable: w.writable,
29+
readable: r.readable,
30+
objectMode: w.readableObjectMode
31+
});
32+
33+
if (w.writable) {
34+
d._write = function(chunk, encoding, callback) {
35+
if (w.write(chunk, encoding)) {
36+
callback();
37+
} else {
38+
ondrain = callback;
39+
}
40+
};
41+
42+
d._final = function(callback) {
43+
w.end();
44+
onfinish = callback;
45+
};
46+
47+
w.on('drain', function() {
48+
if (ondrain) {
49+
const cb = ondrain;
50+
ondrain = null;
51+
cb();
52+
}
53+
});
54+
55+
r.on('finish', function() {
56+
if (onfinish) {
57+
const cb = onfinish;
58+
onfinish = null;
59+
cb();
60+
}
61+
});
62+
}
63+
64+
if (r.readable) {
65+
r.on('readable', function() {
66+
if (onreadable) {
67+
const cb = onreadable;
68+
onreadable = null;
69+
cb();
70+
}
71+
});
72+
73+
r.on('end', function() {
74+
d.push(null);
75+
});
76+
77+
d._read = function() {
78+
while (true) {
79+
const buf = r.read();
80+
81+
if (buf === null) {
82+
onreadable = d._read;
83+
return;
84+
}
85+
86+
if (!d.push(buf)) {
87+
return;
88+
}
89+
}
90+
};
91+
}
92+
93+
d._destroy = function(err, callback) {
94+
onreadable = null;
95+
ondrain = null;
96+
onfinish = null;
97+
98+
if (onclose === null) {
99+
callback(err);
100+
} else {
101+
onclose = callback;
102+
destroyer(r, err);
103+
}
104+
};
105+
106+
return d;
107+
};

lib/stream.js

+2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const {
3030
} = require('internal/util');
3131

3232
const pipeline = require('internal/streams/pipeline');
33+
const pipelinify = require('internal/streams/pipelinify');
3334
const { destroyer } = require('internal/streams/destroy');
3435
const eos = require('internal/streams/end-of-stream');
3536
const internalBuffer = require('internal/buffer');
@@ -43,6 +44,7 @@ Stream.Duplex = require('internal/streams/duplex');
4344
Stream.Transform = require('internal/streams/transform');
4445
Stream.PassThrough = require('internal/streams/passthrough');
4546
Stream.pipeline = pipeline;
47+
Stream.pipelinify = pipelinify;
4648
const { addAbortSignal } = require('internal/streams/add-abort-signal');
4749
Stream.addAbortSignal = addAbortSignal;
4850
Stream.finished = eos;
+213
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
'use strict';

// Tests for stream.pipelinify: Duplex composition of transforms,
// iterables, async generators and writables, including error propagation.
const common = require('../common');
const {
  Readable,
  Transform,
  Writable,
  pipelinify
} = require('stream');
const assert = require('assert');

{
  // Two chained transforms: duplicate then uppercase.
  let res = '';
  pipelinify(
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk + chunk);
      })
    }),
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    })
  )
    .end('asd')
    .on('data', common.mustCall((buf) => {
      res += buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res, 'ASDASD');
    }));
}

{
  // Readable head: the composed stream is read-only.
  let res = '';
  pipelinify(
    Readable.from(['asd']),
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    })
  )
    .on('data', common.mustCall((buf) => {
      res += buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res, 'ASD');
    }));
}

{
  // Async generator head.
  let res = '';
  pipelinify(
    async function* () {
      yield 'asd';
    },
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    })
  )
    .on('data', common.mustCall((buf) => {
      res += buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res, 'ASD');
    }));
}

{
  // Writable tail: the composed stream is write-only; wait for 'finish'.
  let res = '';
  pipelinify(
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    new Writable({
      write: common.mustCall((chunk, encoding, callback) => {
        res += chunk;
        callback(null);
      })
    })
  )
    .end('asd')
    .on('finish', common.mustCall(() => {
      assert.strictEqual(res, 'ASD');
    }));
}

{
  // Async function tail consuming the source.
  let res = '';
  pipelinify(
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    async function(source) {
      for await (const chunk of source) {
        res += chunk;
      }
    }
  )
    .end('asd')
    .on('finish', common.mustCall(() => {
      assert.strictEqual(res, 'ASD');
    }));
}

{
  // Object mode end-to-end: each transform wraps the chunk once.
  let res;
  pipelinify(
    new Transform({
      objectMode: true,
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, { chunk });
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    new Transform({
      objectMode: true,
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, { chunk });
      })
    })
  )
    .end(true)
    .on('data', common.mustCall((buf) => {
      res = buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res.chunk.chunk, true);
    }));
}

{
  // Transform error propagates; downstream must never run.
  const _err = new Error('asd');
  pipelinify(
    new Transform({
      objectMode: true,
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(_err);
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    new Transform({
      objectMode: true,
      transform: common.mustNotCall((chunk, encoding, callback) => {
        callback(null, { chunk });
      })
    })
  )
    .end(true)
    .on('data', common.mustNotCall())
    .on('end', common.mustNotCall())
    // mustCall ensures the test fails loudly if the error never surfaces.
    .on('error', common.mustCall((err) => {
      assert.strictEqual(err, _err);
    }));
}

{
  // Async generator throw propagates; downstream must never run.
  const _err = new Error('asd');
  pipelinify(
    new Transform({
      objectMode: true,
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk);
      })
    }),
    async function*(source) {
      let tmp = '';
      for await (const chunk of source) {
        tmp += chunk;
        throw _err;
      }
      return tmp;
    },
    new Transform({
      objectMode: true,
      transform: common.mustNotCall((chunk, encoding, callback) => {
        callback(null, { chunk });
      })
    })
  )
    .end(true)
    .on('data', common.mustNotCall())
    .on('end', common.mustNotCall())
    // mustCall ensures the test fails loudly if the error never surfaces.
    .on('error', common.mustCall((err) => {
      assert.strictEqual(err, _err);
    }));
}

0 commit comments

Comments
 (0)