From 4d04a5ed88c21eaa3661dcc33f814e2511c37743 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 22 Jan 2023 19:20:59 +0530 Subject: [PATCH] stream: add pipeline() for webstreams Refs: https://github.com/nodejs/node/issues/39316 --- lib/internal/streams/pipeline.js | 40 ++- lib/internal/streams/utils.js | 10 + test/parallel/test-webstreams-pipeline.js | 306 ++++++++++++++++++++++ 3 files changed, 350 insertions(+), 6 deletions(-) create mode 100644 test/parallel/test-webstreams-pipeline.js diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index b8a756330536c5..5610ce1bb489ca 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -35,11 +35,29 @@ const { isReadable, isReadableNodeStream, isNodeStream, + isReadableStream, + isWritableStream, + isTransformStream, } = require('internal/streams/utils'); const { AbortController } = require('internal/abort_controller'); let PassThrough; let Readable; +let Writable; + +function lazyloadReadable() { + if (!Readable) { + Readable = require('internal/streams/readable'); + } + return Readable; +} + +function lazyloadWritable() { + if (!Writable) { + Writable = require('internal/streams/writable'); + } + return Writable; +} function destroyer(stream, reading, writing) { let finished = false; @@ -81,11 +99,7 @@ function makeAsyncIterable(val) { } async function* fromReadable(val) { - if (!Readable) { - Readable = require('internal/streams/readable'); - } - - yield* Readable.prototype[SymbolAsyncIterator].call(val); + yield* lazyloadReadable().prototype[SymbolAsyncIterator].call(val); } async function pump(iterable, writable, finish, { end }) { @@ -147,6 +161,20 @@ async function pump(iterable, writable, finish, { end }) { } } +function convertToNodeStreamIfWebstream(stream) { + if (isReadableStream(stream)) { + return lazyloadReadable().fromWeb(stream); + } else if (isWritableStream(stream)) { + return lazyloadWritable().fromWeb(stream); + } else if (isTransformStream(stream)) { + return Duplex.from({ + writable: stream.writable, + readable: stream.readable + }); + } + return stream; +} + function pipeline(...streams) { return pipelineImpl(streams, once(popCallback(streams))); } @@ -212,7 +240,7 @@ function pipelineImpl(streams, callback, opts) { let ret; for (let i = 0; i < streams.length; i++) { - const stream = streams[i]; + const stream = convertToNodeStreamIfWebstream(streams[i]); const reading = i < streams.length - 1; const writing = i > 0; const end = reading || opts?.end !== false; diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 9d08af6f31a280..1471b8b7eb07d7 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -77,6 +77,15 @@ function isWritableStream(obj) { ); } +function isTransformStream(obj) { + return !!( + obj && + !isNodeStream(obj) && + typeof obj.readable === 'object' && + typeof obj.writable === 'object' + ); +} + function isIterable(obj, isAsync) { if (obj == null) return false; if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function'; @@ -312,4 +321,5 @@ module.exports = { isServerRequest, isServerResponse, willEmitClose, + isTransformStream, }; diff --git a/test/parallel/test-webstreams-pipeline.js b/test/parallel/test-webstreams-pipeline.js new file mode 100644 index 00000000000000..e8448065d32cb8 --- /dev/null +++ b/test/parallel/test-webstreams-pipeline.js @@ -0,0 +1,306 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { Readable, Writable, Transform, pipeline } = require('stream'); +const { pipeline: pipelinePromise } = require('stream/promises'); +const { ReadableStream, WritableStream, TransformStream } = require('stream/web'); +const http = require('http'); + +{ + const values = []; + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + const ws = new WritableStream({ + write(chunk) { + values.push(chunk?.toString()); + } + }); + + pipeline(rs, ws, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['hello', 'world']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ws = new WritableStream({ + write() { } + }); + + pipeline(rs, ws, common.mustCall((err) => { + assert.strictEqual(err?.message, 'kaboom'); + })); + + c.error(new Error('kaboom')); +} + +{ + let c; + const values = []; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ts = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk?.toString().toUpperCase()); + } + }); + + const ws = new WritableStream({ + write(chunk) { + values.push(chunk?.toString()); + } + }); + + pipeline(rs, ts, ws, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['HELLO', 'WORLD']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + function makeTransformStream() { + return new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk?.toString()); + } + }); + } + + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ws = new WritableStream({ + write() { } + }); + + pipeline(rs, + makeTransformStream(), + makeTransformStream(), + makeTransformStream(), + makeTransformStream(), + ws, + common.mustCall((err) => { + assert.strictEqual(err?.message, 'kaboom'); + })); + + c.error(new Error('kaboom')); +} + +{ + const values = []; + + const r = new Readable({ + read() { } + }); + + const ws = new WritableStream({ + write(chunk) { + values.push(chunk?.toString()); + } + }); + + pipeline(r, ws, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['hello', 'world']); + })); + + r.push('hello'); + r.push('world'); + r.push(null); +} + +{ + const values = []; + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const w = new Writable({ + write(chunk, encoding, callback) { + values.push(chunk?.toString()); + callback(); + } + }); + + pipeline(rs, w, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['hello', 'world']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + const values = []; + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ws = new WritableStream({ + write(chunk) { + values.push(chunk?.toString()); + } + }); + + const t = new Transform({ + transform(chunk, encoding, callback) { + callback(null, chunk?.toString().toUpperCase()); + } + }); + + pipeline(rs, t, ws, common.mustSucceed(() => { + assert.deepStrictEqual(values, ['HELLO', 'WORLD']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + const server = http.createServer((req, res) => { + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('hello'); + controller.enqueue('world'); + controller.close(); + } + }); + pipeline(rs, res, common.mustSucceed(() => {})); + }); + + server.listen(0, common.mustCall(() => { + const req = http.request({ + port: server.address().port + }); + req.end(); + const values = []; + req.on('response', (res) => { + res.on('data', (chunk) => { + values.push(chunk?.toString()); + }); + res.on('end', common.mustCall(() => { + assert.deepStrictEqual(values, ['hello', 'world']); + server.close(); + })); + }); + })); +} + +{ + const values = []; + const server = http.createServer((req, res) => { + const ts = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk?.toString().toUpperCase()); + } + }); + pipeline(req, ts, res, common.mustSucceed()); + }); + + server.listen(0, () => { + const req = http.request({ + port: server.address().port + }); + + let c; + + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + c.enqueue('hello'); + c.close(); + + pipeline(rs, req, common.mustSucceed(() => { + server.close(); + })); + + req.on('response', (res) => { + res.on('data', (chunk) => { + values.push(chunk?.toString()); + } + ); + res.on('end', common.mustCall(() => { + assert.deepStrictEqual(values, ['HELLO']); + server.close(); + })); + }); + }); +} + +{ + const values = []; + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + const ws = new WritableStream({ + write(chunk) { + values.push(chunk?.toString()); + } + }); + + pipelinePromise(rs, ws).then(common.mustCall(() => { + assert.deepStrictEqual(values, ['hello', 'world']); + })); + + c.enqueue('hello'); + c.enqueue('world'); + c.close(); +} + +{ + let c; + const rs = new ReadableStream({ + start(controller) { + c = controller; + } + }); + + const ws = new WritableStream({ + write() { } + }); + + pipelinePromise(rs, ws).then(common.mustNotCall()).catch(common.mustCall((err) => { + assert.strictEqual(err?.message, 'kaboom'); + })); + + c.error(new Error('kaboom')); +}