From 3eaadf32e068a3af4a825c706155f6486643a6d5 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Thu, 19 Jan 2023 19:31:29 +0530 Subject: [PATCH] stream: add abort signal for ReadableStream and WritableStream Refs: https://github.com/nodejs/node/issues/39316 --- lib/internal/streams/add-abort-signal.js | 28 ++++--- lib/internal/streams/utils.js | 2 + lib/internal/webstreams/readablestream.js | 4 +- lib/internal/webstreams/writablestream.js | 4 + .../test-webstreams-abort-controller.js | 76 +++++++++++++++++++ 5 files changed, 104 insertions(+), 10 deletions(-) create mode 100644 test/parallel/test-webstreams-abort-controller.js diff --git a/lib/internal/streams/add-abort-signal.js b/lib/internal/streams/add-abort-signal.js index 9bcd202ec63c1e..94763d75140076 100644 --- a/lib/internal/streams/add-abort-signal.js +++ b/lib/internal/streams/add-abort-signal.js @@ -5,6 +5,12 @@ const { codes, } = require('internal/errors'); +const { + isNodeStream, + isWebStream, + kControllerErrorFunction, +} = require('internal/streams/utils'); + const eos = require('internal/streams/end-of-stream'); const { ERR_INVALID_ARG_TYPE } = codes; @@ -18,24 +24,28 @@ const validateAbortSignal = (signal, name) => { } }; -function isNodeStream(obj) { - return !!(obj && typeof obj.pipe === 'function'); -} - module.exports.addAbortSignal = function addAbortSignal(signal, stream) { validateAbortSignal(signal, 'signal'); - if (!isNodeStream(stream)) { - throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream); + if (!isNodeStream(stream) && !isWebStream(stream)) { + throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream); } return module.exports.addAbortSignalNoValidate(signal, stream); }; + module.exports.addAbortSignalNoValidate = function(signal, stream) { if (typeof signal !== 'object' || !('aborted' in signal)) { return stream; } - const onAbort = () => { - stream.destroy(new AbortError(undefined, { cause: signal.reason })); - }; + let onAbort; + if (isNodeStream(stream)) { + onAbort = () => { + stream.destroy(new AbortError(undefined, { cause: signal.reason })); + }; + } else { + onAbort = () => { + stream[kControllerErrorFunction](new AbortError(undefined, { cause: signal.reason })); + }; + } if (signal.aborted) { onAbort(); } else { diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 74faca5fe9bb2a..c9e61ca8cdd8eb 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -13,6 +13,7 @@ const kIsReadable = Symbol('kIsReadable'); const kIsDisturbed = Symbol('kIsDisturbed'); const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise'); +const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction'); function isReadableNodeStream(obj, strict = false) { return !!( @@ -305,6 +306,7 @@ module.exports = { isReadable, kIsReadable, kIsClosedPromise, + kControllerErrorFunction, isClosed, isDestroyed, isDuplexNodeStream, diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 0df8f7aa7f6f35..fe18d352c29a37 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -86,6 +86,7 @@ const { kIsErrored, kIsReadable, kIsClosedPromise, + kControllerErrorFunction, } = require('internal/streams/utils'); const { @@ -263,6 +264,7 @@ class ReadableStream { }; this[kIsClosedPromise] = createDeferredPromise(); + this[kControllerErrorFunction] = () => {}; // The spec requires handling of the strategy first // here. Specifically, if getting the size and @@ -1893,7 +1895,6 @@ function readableStreamClose(stream) { assert(stream[kState].state === 'readable'); stream[kState].state = 'closed'; stream[kIsClosedPromise].resolve(); - const { reader, } = stream[kState]; @@ -2332,6 +2333,7 @@ function setupReadableStreamDefaultController( stream, }; stream[kState].controller = controller; + stream[kControllerErrorFunction] = controller.error.bind(controller); const startResult = startAlgorithm(); diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index a8922c08456358..f5e2d5a5071a59 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -71,6 +71,7 @@ const { const { kIsClosedPromise, + kControllerErrorFunction, } = require('internal/streams/utils'); const { @@ -199,6 +200,7 @@ class WritableStream { }; this[kIsClosedPromise] = createDeferredPromise(); + this[kControllerErrorFunction] = () => {}; const size = extractSizeAlgorithm(strategy?.size); const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1); @@ -370,6 +372,7 @@ function TransferredWritableStream() { }, }; this[kIsClosedPromise] = createDeferredPromise(); + this[kControllerErrorFunction] = () => {}; }, [], WritableStream)); } @@ -1282,6 +1285,7 @@ function setupWritableStreamDefaultController( writeAlgorithm, }; stream[kState].controller = controller; + stream[kControllerErrorFunction] = controller.error.bind(controller); writableStreamUpdateBackpressure( stream, diff --git a/test/parallel/test-webstreams-abort-controller.js b/test/parallel/test-webstreams-abort-controller.js new file mode 100644 index 00000000000000..5e8c150c99058d --- /dev/null +++ b/test/parallel/test-webstreams-abort-controller.js @@ -0,0 +1,76 @@ +'use strict'; + +const common = require('../common'); +const { finished, addAbortSignal } = require('stream'); +const { ReadableStream, WritableStream } = require('stream/web'); +const assert = require('assert'); + +{ + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('hello'); + controller.close(); + } + }); + + const reader = rs.getReader(); + + const ac = new AbortController(); + + addAbortSignal(ac.signal, rs); + + finished(rs, common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + })); + + ac.abort(); + + assert.rejects(reader.read(), 'AbortError: The operation was aborted.'); +} + +{ + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('a'); + controller.enqueue('b'); + controller.enqueue('c'); + controller.close(); + } + }); + + const ac = new AbortController(); + + addAbortSignal(ac.signal, rs); + + assert.rejects((async () => { + for await (const chunk of rs) { + if (chunk === 'b') { + ac.abort(); + } + } + })(), /AbortError/); +} + +{ + const values = []; + const ws = new WritableStream({ + write(chunk) { + values.push(chunk); + } + }); + + finished(ws, common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + assert.deepStrictEqual(values, ['a']); + })); + + const ac = new AbortController(); + + addAbortSignal(ac.signal, ws); + + const writer = ws.getWriter(); + + writer.write('a').then(() => { + ac.abort(); + }); +}