Skip to content

Commit

Permalink
stream: add abort signal for ReadableStream and WritableStream
Browse files Browse the repository at this point in the history
  • Loading branch information
debadree25 committed Feb 2, 2023
1 parent 23effb2 commit 3eaadf3
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 10 deletions.
28 changes: 19 additions & 9 deletions lib/internal/streams/add-abort-signal.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ const {
codes,
} = require('internal/errors');

const {
isNodeStream,
isWebStream,
kControllerErrorFunction,
} = require('internal/streams/utils');

const eos = require('internal/streams/end-of-stream');
const { ERR_INVALID_ARG_TYPE } = codes;

Expand All @@ -18,24 +24,28 @@ const validateAbortSignal = (signal, name) => {
}
};

function isNodeStream(obj) {
return !!(obj && typeof obj.pipe === 'function');
}

module.exports.addAbortSignal = function addAbortSignal(signal, stream) {
validateAbortSignal(signal, 'signal');
if (!isNodeStream(stream)) {
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream);
if (!isNodeStream(stream) && !isWebStream(stream)) {
throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream);
}
return module.exports.addAbortSignalNoValidate(signal, stream);
};

module.exports.addAbortSignalNoValidate = function(signal, stream) {
if (typeof signal !== 'object' || !('aborted' in signal)) {
return stream;
}
const onAbort = () => {
stream.destroy(new AbortError(undefined, { cause: signal.reason }));
};
let onAbort;
if (isNodeStream(stream)) {
onAbort = () => {
stream.destroy(new AbortError(undefined, { cause: signal.reason }));
};
} else {
onAbort = () => {
stream[kControllerErrorFunction](new AbortError(undefined, { cause: signal.reason }));
};
}
if (signal.aborted) {
onAbort();
} else {
Expand Down
2 changes: 2 additions & 0 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const kIsReadable = Symbol('kIsReadable');
const kIsDisturbed = Symbol('kIsDisturbed');

const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise');
const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction');

function isReadableNodeStream(obj, strict = false) {
return !!(
Expand Down Expand Up @@ -305,6 +306,7 @@ module.exports = {
isReadable,
kIsReadable,
kIsClosedPromise,
kControllerErrorFunction,
isClosed,
isDestroyed,
isDuplexNodeStream,
Expand Down
4 changes: 3 additions & 1 deletion lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ const {
kIsErrored,
kIsReadable,
kIsClosedPromise,
kControllerErrorFunction,
} = require('internal/streams/utils');

const {
Expand Down Expand Up @@ -263,6 +264,7 @@ class ReadableStream {
};

this[kIsClosedPromise] = createDeferredPromise();
this[kControllerErrorFunction] = () => {};

// The spec requires handling of the strategy first
// here. Specifically, if getting the size and
Expand Down Expand Up @@ -1893,7 +1895,6 @@ function readableStreamClose(stream) {
assert(stream[kState].state === 'readable');
stream[kState].state = 'closed';
stream[kIsClosedPromise].resolve();

const {
reader,
} = stream[kState];
Expand Down Expand Up @@ -2332,6 +2333,7 @@ function setupReadableStreamDefaultController(
stream,
};
stream[kState].controller = controller;
stream[kControllerErrorFunction] = controller.error.bind(controller);

const startResult = startAlgorithm();

Expand Down
4 changes: 4 additions & 0 deletions lib/internal/webstreams/writablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const {

const {
kIsClosedPromise,
kControllerErrorFunction,
} = require('internal/streams/utils');

const {
Expand Down Expand Up @@ -199,6 +200,7 @@ class WritableStream {
};

this[kIsClosedPromise] = createDeferredPromise();
this[kControllerErrorFunction] = () => {};

const size = extractSizeAlgorithm(strategy?.size);
const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1);
Expand Down Expand Up @@ -370,6 +372,7 @@ function TransferredWritableStream() {
},
};
this[kIsClosedPromise] = createDeferredPromise();
this[kControllerErrorFunction] = () => {};
},
[], WritableStream));
}
Expand Down Expand Up @@ -1282,6 +1285,7 @@ function setupWritableStreamDefaultController(
writeAlgorithm,
};
stream[kState].controller = controller;
stream[kControllerErrorFunction] = controller.error.bind(controller);

writableStreamUpdateBackpressure(
stream,
Expand Down
76 changes: 76 additions & 0 deletions test/parallel/test-webstreams-abort-controller.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
'use strict';

const common = require('../common');
const { finished, addAbortSignal } = require('stream');
const { ReadableStream, WritableStream } = require('stream/web');
const assert = require('assert');

{
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello');
controller.close();
}
});

const reader = rs.getReader();

const ac = new AbortController();

addAbortSignal(ac.signal, rs);

finished(rs, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));

ac.abort();

assert.rejects(reader.read(), 'AbortError: The operation was aborted.');
}

{
const rs = new ReadableStream({
start(controller) {
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
controller.close();
}
});

const ac = new AbortController();

addAbortSignal(ac.signal, rs);

assert.rejects((async () => {
for await (const chunk of rs) {
if (chunk === 'b') {
ac.abort();
}
}
})(), /AbortError/);
}

{
const values = [];
const ws = new WritableStream({
write(chunk) {
values.push(chunk);
}
});

finished(ws, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
assert.deepStrictEqual(values, ['a']);
}));

const ac = new AbortController();

addAbortSignal(ac.signal, ws);

const writer = ws.getWriter();

writer.write('a').then(() => {
ac.abort();
});
}

0 comments on commit 3eaadf3

Please sign in to comment.