From b84f10104970b990d599636b9d3a02b37fc816d4 Mon Sep 17 00:00:00 2001 From: Mestery Date: Thu, 23 Sep 2021 19:26:25 +0200 Subject: [PATCH] stream: support array of streams in promises pipeline Fixes: https://github.com/nodejs/node/issues/40191 PR-URL: https://github.com/nodejs/node/pull/40193 Reviewed-By: Luigi Pinca Reviewed-By: Robert Nagy Reviewed-By: James M Snell --- lib/internal/streams/pipeline.js | 11 +++---- test/parallel/test-stream-pipeline.js | 41 +++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 012d99de0357f2..8dc4e5792c47d8 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -166,17 +166,14 @@ async function pump(iterable, writable, finish) { } function pipeline(...streams) { - const callback = once(popCallback(streams)); + return pipelineImpl(streams, once(popCallback(streams))); +} - // stream.pipeline(streams, callback) - if (ArrayIsArray(streams[0]) && streams.length === 1) { +function pipelineImpl(streams, callback, opts) { + if (streams.length === 1 && ArrayIsArray(streams[0])) { streams = streams[0]; } - return pipelineImpl(streams, callback); -} - -function pipelineImpl(streams, callback, opts) { if (streams.length < 2) { throw new ERR_MISSING_ARGS('streams'); } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 4b0f11ea41218a..061ef923d03a59 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1406,3 +1406,44 @@ const tsp = require('timers/promises'); })); ac.abort(); } + +{ + async function run() { + let finished = false; + let text = ''; + const write = new Writable({ + write(data, enc, cb) { + text += data; + cb(); + } + }); + write.on('finish', () => { + finished = true; + }); + + await pipelinep([Readable.from('Hello World!'), write]); + assert(finished); + assert.strictEqual(text, 'Hello World!'); + } + + run(); +} + +{ + let finished = false; + let text = ''; + const write = new Writable({ + write(data, enc, cb) { + text += data; + cb(); + } + }); + write.on('finish', () => { + finished = true; + }); + + pipeline([Readable.from('Hello World!'), write], common.mustSucceed(() => { + assert(finished); + assert.strictEqual(text, 'Hello World!'); + })); +}