From d99602f648fe5a3a01f33b4da36c3c3f43eb4117 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sat, 14 Jan 2023 02:28:30 +0530 Subject: [PATCH] streams: implement finished() for webstreams Refs: https://github.com/nodejs/node/issues/39316 --- lib/internal/streams/end-of-stream.js | 23 ++++++++++-- lib/internal/streams/utils.js | 10 +++--- lib/internal/webstreams/readablestream.js | 14 +++++++- lib/internal/webstreams/writablestream.js | 14 +++++++- test/parallel/test-stream-end-of-streams.js | 20 ----------- test/parallel/test-stream-finished.js | 7 +--- test/parallel/test-webstreams-finished.js | 40 +++++++++++++++++++++ 7 files changed, 92 insertions(+), 36 deletions(-) delete mode 100644 test/parallel/test-stream-end-of-streams.js create mode 100644 test/parallel/test-webstreams-finished.js diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index ca42174c86459a..06684f032ce00c 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -22,7 +22,7 @@ const { validateBoolean } = require('internal/validators'); -const { Promise } = primordials; +const { Promise, PromisePrototypeThen } = primordials; const { isClosed, @@ -38,6 +38,15 @@ const { willEmitClose: _willEmitClose, } = require('internal/streams/utils'); +const { + isBrandCheck, +} = require('internal/webstreams/util'); + +const isReadableStream = + isBrandCheck('ReadableStream'); +const isWritableStream = + isBrandCheck('WritableStream'); + function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function'; } @@ -62,8 +71,7 @@ function eos(stream, options, callback) { const writable = options.writable ?? isWritableNodeStream(stream); if (!isNodeStream(stream)) { - // TODO: Webstreams. - throw new ERR_INVALID_ARG_TYPE('stream', 'Stream', stream); + return eosWeb(stream, options, callback); } const wState = stream._writableState; @@ -255,6 +263,15 @@ function eos(stream, options, callback) { return cleanup; } +function eosWeb(stream, opts, callback) { + PromisePrototypeThen( + stream.streamClosed, + () => callback.call(stream), + (err) => callback.call(stream, err) + ); + return nop; +} + function finished(stream, opts) { let autoCleanup = false; if (opts === null) { diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 4d4f00ab456fa7..e5089e4d4f4983 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -83,7 +83,7 @@ function isWritableEnded(stream) { // Have emitted 'finish'. function isWritableFinished(stream, strict) { - if (!isWritableNodeStream(stream)) return null; + if (!isWritableNodeStream(stream)) return stream?.state === 'closed' ? true : null; if (stream.writableFinished === true) return true; const wState = stream._writableState; if (wState?.errored) return false; @@ -106,7 +106,7 @@ function isReadableEnded(stream) { // Have emitted 'end'. function isReadableFinished(stream, strict) { - if (!isReadableNodeStream(stream)) return null; + if (!isReadableNodeStream(stream)) stream?.state === 'closed' ? true : null; const rState = stream._readableState; if (rState?.errored) return false; if (typeof rState?.endEmitted !== 'boolean') return null; @@ -155,7 +155,7 @@ function isFinished(stream, opts) { function isWritableErrored(stream) { if (!isNodeStream(stream)) { - return null; + return stream?.state === 'errored' ? true : null; } if (stream.writableErrored) { @@ -167,7 +167,7 @@ function isWritableErrored(stream) { function isReadableErrored(stream) { if (!isNodeStream(stream)) { - return null; + return stream?.state === 'errored' ? true : null; } if (stream.readableErrored) { @@ -179,7 +179,7 @@ function isReadableErrored(stream) { function isClosed(stream) { if (!isNodeStream(stream)) { - return null; + return stream?.state === 'closed' ? true : null; } if (typeof stream.closed === 'boolean') { diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 5344785b90cd3e..ecbbaa071e5767 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -231,7 +231,8 @@ class ReadableStream { port1: undefined, port2: undefined, promise: undefined, - } + }, + streamClosed: createDeferredPromise(), }; // The spec requires handling of the strategy first @@ -288,6 +289,12 @@ class ReadableStream { return isReadableStreamLocked(this); } + get streamClosed() { + if (!isReadableStream(this)) + throw new ERR_INVALID_THIS('ReadableStream'); + return this[kState].streamClosed.promise; + } + /** * @param {any} [reason] * @returns { Promise } @@ -1869,6 +1876,7 @@ function readableStreamCancel(stream, reason) { function readableStreamClose(stream) { assert(stream[kState].state === 'readable'); stream[kState].state = 'closed'; + stream[kState].streamClosed?.resolve?.(); const { reader, @@ -1900,6 +1908,10 @@ function readableStreamError(stream, error) { reader[kState].close.reject(error); setPromiseHandled(reader[kState].close.promise); + if (stream[kState].streamClosed?.promise !== undefined) { + stream[kState].streamClosed?.reject?.(error); + setPromiseHandled(stream[kState].streamClosed?.promise); + } if (readableStreamHasDefaultReader(stream)) { for (let n = 0; n < reader[kState].readRequests.length; n++) diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index ba66cea7a4850d..ba5cf75ca71885 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -175,7 +175,8 @@ class WritableStream { port1: undefined, port2: undefined, promise: undefined, - } + }, + streamClosed: createDeferredPromise(), }; const size = extractSizeAlgorithm(strategy?.size); @@ -201,6 +202,12 @@ class WritableStream { return isWritableStreamLocked(this); } + get streamClosed() { + if (!isWritableStream(this)) + throw new ERR_INVALID_THIS('WritableStream'); + return this[kState].streamClosed.promise; + } + /** * @param {any} reason * @returns {Promise} @@ -733,6 +740,10 @@ function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) { writer[kState].close.reject?.(stream[kState].storedError); setPromiseHandled(writer[kState].close.promise); } + if (stream[kState].streamClosed?.promise !== undefined) { + stream[kState].streamClosed.reject?.(stream[kState]?.storedError); + setPromiseHandled(stream[kState].streamClosed?.promise); + } } function writableStreamMarkFirstWriteRequestInFlight(stream) { @@ -839,6 +850,7 @@ function writableStreamFinishInFlightClose(stream) { stream[kState].state = 'closed'; if (stream[kState].writer !== undefined) stream[kState].writer[kState].close.resolve?.(); + stream[kState].streamClosed?.resolve?.(); assert(stream[kState].pendingAbortRequest.abort.promise === undefined); assert(stream[kState].storedError === undefined); } diff --git a/test/parallel/test-stream-end-of-streams.js b/test/parallel/test-stream-end-of-streams.js deleted file mode 100644 index 80a39d052bf8b4..00000000000000 --- a/test/parallel/test-stream-end-of-streams.js +++ /dev/null @@ -1,20 +0,0 @@ -'use strict'; -require('../common'); -const assert = require('assert'); - -const { Duplex, finished } = require('stream'); - -assert.throws( - () => { - // Passing empty object to mock invalid stream - // should throw error - finished({}, () => {}); - }, - { code: 'ERR_INVALID_ARG_TYPE' } -); - -const streamObj = new Duplex(); -streamObj.end(); -// Below code should not throw any errors as the -// streamObj is `Stream` -finished(streamObj, () => {}); diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index c7513805e7ac6f..6c850a63a7c56b 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -260,12 +260,7 @@ const http = require('http'); const streamLike = new EE(); streamLike.readableEnded = true; streamLike.readable = true; - assert.throws( - () => { - finished(streamLike, () => {}); - }, - { code: 'ERR_INVALID_ARG_TYPE' } - ); + finished(streamLike, common.mustCall()); streamLike.emit('close'); } diff --git a/test/parallel/test-webstreams-finished.js b/test/parallel/test-webstreams-finished.js new file mode 100644 index 00000000000000..c2c1e03564a6f7 --- /dev/null +++ b/test/parallel/test-webstreams-finished.js @@ -0,0 +1,40 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { ReadableStream, WritableStream } = require('stream/web'); +const { finished } = require('stream'); + +{ + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('asd'); + controller.close(); + }, + }); + finished(rs, common.mustSucceed()); + async function test() { + const values = []; + for await (const chunk of rs) { + values.push(chunk); + } + assert.deepStrictEqual(values, ['asd']); + } + test(); +} + +{ + let str = ''; + const ws = new WritableStream({ + write(chunk) { + console.log(chunk); + str += chunk; + } + }); + finished(ws, common.mustSucceed(() => { + assert.strictEqual(str, 'asd'); + })); + const writer = ws.getWriter(); + writer.write('asd'); + writer.close(); +}