From 8c627f7d783bdafe7d2ef2ca9fe6183f7a35cb12 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Mon, 27 Feb 2023 14:20:39 +0530 Subject: [PATCH] stream: enable usage of webstreams on compose() Refs: https://github.com/nodejs/node/issues/39316 PR-URL: https://github.com/nodejs/node/pull/46675 Reviewed-By: Matteo Collina Reviewed-By: Benjamin Gruenbaum Reviewed-By: Robert Nagy Reviewed-By: James M Snell --- doc/api/stream.md | 7 +- lib/internal/streams/compose.js | 178 ++++++--- lib/internal/streams/pipeline.js | 3 +- test/parallel/test-webstreams-compose.js | 483 +++++++++++++++++++++++ 4 files changed, 619 insertions(+), 52 deletions(-) create mode 100644 test/parallel/test-webstreams-compose.js diff --git a/doc/api/stream.md b/doc/api/stream.md index 3c1e5aaaf033d3..84228ebc27816e 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2796,11 +2796,16 @@ const server = http.createServer((req, res) => { > Stability: 1 - `stream.compose` is experimental. -* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]} +* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]| + ReadableStream\[]|WritableStream\[]|TransformStream\[]} * Returns: {stream.Duplex} Combines two or more streams into a `Duplex` stream that writes to the diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index 14c68e37b2e7af..16dc10ad69643e 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -7,6 +7,10 @@ const { isNodeStream, isReadable, isWritable, + isWebStream, + isTransformStream, + isWritableStream, + isReadableStream, } = require('internal/streams/utils'); const { AbortError, @@ -15,6 +19,7 @@ const { ERR_MISSING_ARGS, }, } = require('internal/errors'); +const eos = require('internal/streams/end-of-stream'); module.exports = function compose(...streams) { if (streams.length === 0) { @@ -37,18 +42,32 @@ module.exports = function compose(...streams) { } for (let n = 0; n < streams.length; ++n) { - if (!isNodeStream(streams[n])) { + if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) { // TODO(ronag): Add checks for non streams. continue; } - if (n < streams.length - 1 && !isReadable(streams[n])) { + if ( + n < streams.length - 1 && + !( + isReadable(streams[n]) || + isReadableStream(streams[n]) || + isTransformStream(streams[n]) + ) + ) { throw new ERR_INVALID_ARG_VALUE( `streams[${n}]`, orgStreams[n], 'must be readable', ); } - if (n > 0 && !isWritable(streams[n])) { + if ( + n > 0 && + !( + isWritable(streams[n]) || + isWritableStream(streams[n]) || + isTransformStream(streams[n]) + ) + ) { throw new ERR_INVALID_ARG_VALUE( `streams[${n}]`, orgStreams[n], @@ -79,8 +98,16 @@ module.exports = function compose(...streams) { const head = streams[0]; const tail = pipeline(streams, onfinished); - const writable = !!isWritable(head); - const readable = !!isReadable(tail); + const writable = !!( + isWritable(head) || + isWritableStream(head) || + isTransformStream(head) + ); + const readable = !!( + isReadable(tail) || + isReadableStream(tail) || + isTransformStream(tail) + ); // TODO(ronag): Avoid double buffering. // Implement Writable/Readable/Duplex traits. @@ -94,28 +121,55 @@ module.exports = function compose(...streams) { }); if (writable) { - d._write = function(chunk, encoding, callback) { - if (head.write(chunk, encoding)) { - callback(); - } else { - ondrain = callback; - } - }; - - d._final = function(callback) { - head.end(); - onfinish = callback; - }; + if (isNodeStream(head)) { + d._write = function(chunk, encoding, callback) { + if (head.write(chunk, encoding)) { + callback(); + } else { + ondrain = callback; + } + }; + + d._final = function(callback) { + head.end(); + onfinish = callback; + }; + + head.on('drain', function() { + if (ondrain) { + const cb = ondrain; + ondrain = null; + cb(); + } + }); + } else if (isWebStream(head)) { + const writable = isTransformStream(head) ? head.writable : head; + const writer = writable.getWriter(); + + d._write = async function(chunk, encoding, callback) { + try { + await writer.ready; + writer.write(chunk).catch(() => {}); + callback(); + } catch (err) { + callback(err); + } + }; + + d._final = async function(callback) { + try { + await writer.ready; + writer.close().catch(() => {}); + onfinish = callback; + } catch (err) { + callback(err); + } + }; + } - head.on('drain', function() { - if (ondrain) { - const cb = ondrain; - ondrain = null; - cb(); - } - }); + const toRead = isTransformStream(tail) ? tail.readable : tail; - tail.on('finish', function() { + eos(toRead, () => { if (onfinish) { const cb = onfinish; onfinish = null; @@ -125,32 +179,54 @@ module.exports = function compose(...streams) { } if (readable) { - tail.on('readable', function() { - if (onreadable) { - const cb = onreadable; - onreadable = null; - cb(); - } - }); - - tail.on('end', function() { - d.push(null); - }); - - d._read = function() { - while (true) { - const buf = tail.read(); - - if (buf === null) { - onreadable = d._read; - return; + if (isNodeStream(tail)) { + tail.on('readable', function() { + if (onreadable) { + const cb = onreadable; + onreadable = null; + cb(); } - - if (!d.push(buf)) { - return; + }); + + tail.on('end', function() { + d.push(null); + }); + + d._read = function() { + while (true) { + const buf = tail.read(); + if (buf === null) { + onreadable = d._read; + return; + } + + if (!d.push(buf)) { + return; + } } - } - }; + }; + } else if (isWebStream(tail)) { + const readable = isTransformStream(tail) ? tail.readable : tail; + const reader = readable.getReader(); + d._read = async function() { + while (true) { + try { + const { value, done } = await reader.read(); + + if (!d.push(value)) { + return; + } + + if (done) { + d.push(null); + return; + } + } catch { + return; + } + } + }; + } } d._destroy = function(err, callback) { @@ -166,7 +242,9 @@ module.exports = function compose(...streams) { callback(err); } else { onclose = callback; - destroyer(tail, err); + if (isNodeStream(tail)) { + destroyer(tail, err); + } } }; diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 22bb042d71b420..95737d95e48e41 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -286,7 +286,7 @@ function pipelineImpl(streams, callback, opts) { throw new ERR_INVALID_RETURN_VALUE( 'Iterable, AsyncIterable or Stream', 'source', ret); } - } else if (isIterable(stream) || isReadableNodeStream(stream)) { + } else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) { ret = stream; } else { ret = Duplex.from(stream); @@ -385,6 +385,7 @@ function pipelineImpl(streams, callback, opts) { finishCount++; pumpToWeb(ret, stream, finish, { end }); } else if (isTransformStream(ret)) { + finishCount++; pumpToWeb(ret.readable, stream, finish, { end }); } else { throw new ERR_INVALID_ARG_TYPE( diff --git a/test/parallel/test-webstreams-compose.js b/test/parallel/test-webstreams-compose.js new file mode 100644 index 00000000000000..5514d12bd02eb0 --- /dev/null +++ b/test/parallel/test-webstreams-compose.js @@ -0,0 +1,483 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); + +const { + Transform, + Readable, + Writable, + compose +} = require('stream'); + +const { + TransformStream, + ReadableStream, + WritableStream, +} = require('stream/web'); + +{ + let res = ''; + + const d = compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk?.toString()?.replace(' ', '_')); + }) + }), + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk?.toString()?.toUpperCase()); + }) + }) + ); + + d.on('data', common.mustCall((chunk) => { + res += chunk; + })); + + d.on('end', common.mustCall(() => { + assert.strictEqual(res, 'HELLO_WORLD'); + })); + + d.end('hello world'); +} + +{ + let res = ''; + + compose( + new Transform({ + transform: common.mustCall((chunk, encoding, callback) => { + callback(null, chunk + chunk); + }) + }), + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk.toString().toUpperCase()); + }) + }) + ) + .end('asd') + .on('data', common.mustCall((buf) => { + res += buf; + })) + .on('end', common.mustCall(() => { + assert.strictEqual(res, 'ASDASD'); + })); +} + +{ + let res = ''; + + compose( + async function*(source) { + for await (const chunk of source) { + yield chunk + chunk; + } + }, + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk.toString().toUpperCase()); + }), + }) + ) + .end('asd') + .on('data', common.mustCall((buf) => { + res += buf; + })) + .on('end', common.mustCall(() => { + assert.strictEqual(res, 'ASDASD'); + })); +} + +{ + let res = ''; + + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk.toString().toUpperCase()); + }), + }), + async function*(source) { + for await (const chunk of source) { + yield chunk + chunk; + } + }, + new Transform({ + transform: common.mustCall((chunk, enc, clb) => { + clb(null, chunk?.toString()?.replaceAll('A', 'B')); + }) + }) + ) + .end('asd') + .on('data', common.mustCall((buf) => { + res += buf; + })) + .on('end', common.mustCall(() => { + assert.strictEqual(res, 'BSDBSD'); + })); +} + +{ + let res = ''; + + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk.toString().toUpperCase()); + }), + }), + async function*(source) { + for await (const chunk of source) { + yield chunk + chunk; + } + }, + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk?.toString()?.replaceAll('A', 'B')); + }) + }) + ) + .end('asd') + .on('data', common.mustCall((buf) => { + res += buf; + })) + .on('end', common.mustCall(() => { + assert.strictEqual(res, 'BSDBSD'); + })); +} + +{ + let res = ''; + compose( + new ReadableStream({ + start(controller) { + controller.enqueue('asd'); + controller.close(); + } + }), + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk?.toString()?.toUpperCase()); + }) + }) + ) + .on('data', common.mustCall((buf) => { + res += buf; + })) + .on('end', common.mustCall(() => { + assert.strictEqual(res, 'ASD'); + })); +} + +{ + let res = ''; + compose( + new ReadableStream({ + start(controller) { + controller.enqueue('asd'); + controller.close(); + } + }), + new Transform({ + transform: common.mustCall((chunk, enc, clb) => { + clb(null, chunk?.toString()?.toUpperCase()); + }) + }) + ) + .on('data', common.mustCall((buf) => { + res += buf; + })) + .on('end', common.mustCall(() => { + assert.strictEqual(res, 'ASD'); + })); +} + +{ + let res = ''; + compose( + Readable.from(['asd']), + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk?.toString()?.toUpperCase()); + }) + }) + ) + .on('data', common.mustCall((buf) => { + res += buf; + })) + .on('end', common.mustCall(() => { + assert.strictEqual(res, 'ASD'); + })); +} + +{ + let res = ''; + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk.toString().toUpperCase()); + }) + }), + async function*(source) { + for await (const chunk of source) { + yield chunk; + } + }, + new Writable({ + write: common.mustCall((chunk, encoding, callback) => { + res += chunk; + callback(null); + }) + }) + ) + .end('asd') + .on('finish', common.mustCall(() => { + assert.strictEqual(res, 'ASD'); + })); +} + +{ + let res = ''; + compose( + new Transform({ + transform: common.mustCall((chunk, encoding, callback) => { + callback(null, chunk.toString().toUpperCase()); + }) + }), + async function*(source) { + for await (const chunk of source) { + yield chunk; + } + }, + new WritableStream({ + write: common.mustCall((chunk) => { + res += chunk; + }) + }) + ) + .end('asd') + .on('finish', common.mustCall(() => { + assert.strictEqual(res, 'ASD'); + })); +} + +{ + let res = ''; + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk.toString().toUpperCase()); + }) + }), + async function*(source) { + for await (const chunk of source) { + yield chunk; + } + }, + new WritableStream({ + write: common.mustCall((chunk) => { + res += chunk; + }) + }) + ) + .end('asd') + .on('finish', common.mustCall(() => { + assert.strictEqual(res, 'ASD'); + })); +} + +{ + let res = ''; + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk.toString().toUpperCase()); + }) + }), + async function*(source) { + for await (const chunk of source) { + yield chunk; + } + }, + async function(source) { + for await (const chunk of source) { + res += chunk; + } + } + ) + .end('asd') + .on('finish', common.mustCall(() => { + assert.strictEqual(res, 'ASD'); + })); +} + +{ + + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.error(new Error('asd')); + }) + }), + new TransformStream({ + transform: common.mustNotCall() + }) + ) + .on('data', common.mustNotCall()) + .on('end', common.mustNotCall()) + .on('error', (err) => { + assert.strictEqual(err?.message, 'asd'); + }) + .end('xyz'); +} + +{ + + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk); + }) + }), + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.error(new Error('asd')); + }) + }) + ) + .on('data', common.mustNotCall()) + .on('end', common.mustNotCall()) + .on('error', (err) => { + assert.strictEqual(err?.message, 'asd'); + }) + .end('xyz'); +} + +{ + + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk); + }) + }), + async function*(source) { // eslint-disable-line require-yield + let tmp = ''; + for await (const chunk of source) { + tmp += chunk; + throw new Error('asd'); + } + return tmp; + }, + new TransformStream({ + transform: common.mustNotCall() + }) + ) + .on('data', common.mustNotCall()) + .on('end', common.mustNotCall()) + .on('error', (err) => { + assert.strictEqual(err?.message, 'asd'); + }) + .end('xyz'); +} + +{ + + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.error(new Error('asd')); + }) + }), + new Transform({ + transform: common.mustNotCall() + }) + ) + .on('data', common.mustNotCall()) + .on('end', common.mustNotCall()) + .on('error', (err) => { + assert.strictEqual(err?.message, 'asd'); + }) + .end('xyz'); +} + +{ + + compose( + new Transform({ + transform: common.mustCall((chunk, enc, clb) => { + clb(new Error('asd')); + }) + }), + new TransformStream({ + transform: common.mustNotCall() + }) + ) + .on('data', common.mustNotCall()) + .on('end', common.mustNotCall()) + .on('error', (err) => { + assert.strictEqual(err?.message, 'asd'); + }) + .end('xyz'); +} + +{ + compose( + new ReadableStream({ + start(controller) { + controller.enqueue(new Error('asd')); + } + }), + new TransformStream({ + transform: common.mustNotCall() + }) + ) + .on('data', common.mustNotCall()) + .on('end', common.mustNotCall()) + .on('error', (err) => { + assert.strictEqual(err?.message, 'asd'); + }) + .end('xyz'); +} + +{ + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk.toString().toUpperCase()); + }) + }), + new WritableStream({ + write: common.mustCall((chunk, controller) => { + controller.error(new Error('asd')); + }) + }) + ) + .on('error', (err) => { + assert.strictEqual(err?.message, 'asd'); + }) + .end('xyz'); +} + +{ + compose( + new TransformStream({ + transform: common.mustCall((chunk, controller) => { + controller.enqueue(chunk.toString().toUpperCase()); + }) + }), + async function*(source) { + for await (const chunk of source) { + yield chunk; + } + }, + async function(source) { + throw new Error('asd'); + } + ).on('error', (err) => { + assert.strictEqual(err?.message, 'asd'); + }).end('xyz'); +}