Skip to content

Commit

Permalink
stream: allow transfer of readable byte streams
Browse files Browse the repository at this point in the history
Updates the `ReadableStream` constructor to mark byte streams as
transferable. When transferred, byte streams become regular streams.

Refs: #39062
Refs: https://streams.spec.whatwg.org/#rs-transfer
PR-URL: #45955
Reviewed-By: Daeyeon Jeong <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: James M Snell <[email protected]>
  • Loading branch information
mrbbot authored Feb 4, 2023
1 parent 806a516 commit 03854f6
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 9 deletions.
17 changes: 8 additions & 9 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -281,17 +281,16 @@ class ReadableStream {
this,
source,
extractHighWaterMark(highWaterMark, 0));
return;
} else {
if (type !== undefined)
throw new ERR_INVALID_ARG_VALUE('source.type', type);
setupReadableStreamDefaultControllerFromSource(
this,
source,
extractHighWaterMark(highWaterMark, 1),
extractSizeAlgorithm(size));
}

if (type !== undefined)
throw new ERR_INVALID_ARG_VALUE('source.type', type);
setupReadableStreamDefaultControllerFromSource(
this,
source,
extractHighWaterMark(highWaterMark, 1),
extractSizeAlgorithm(size));

// eslint-disable-next-line no-constructor-return
return makeTransferable(this);
}
Expand Down
55 changes: 55 additions & 0 deletions test/parallel/test-whatwg-webstreams-transfer.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const {

const {
isReadableStream,
isReadableByteStreamController,
} = require('internal/webstreams/readablestream');

const {
Expand All @@ -25,6 +26,10 @@ const {
isTransformStream,
} = require('internal/webstreams/transformstream');

const {
kState,
} = require('internal/webstreams/util');

const {
makeTransferable,
kClone,
Expand Down Expand Up @@ -107,6 +112,56 @@ const theData = 'hello';
assert(readable.locked);
}

{
const { port1, port2 } = new MessageChannel();
port1.onmessageerror = common.mustNotCall();
port2.onmessageerror = common.mustNotCall();

// This test repeats the test above, but with a readable byte stream.
// Note transferring a readable byte stream results in a regular
// value-oriented stream on the other side:
// https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable

const theByteData = new Uint8Array([1, 2, 3]);

const readable = new ReadableStream({
type: 'bytes',
start: common.mustCall((controller) => {
// `enqueue` will detach its argument's buffer, so clone first
controller.enqueue(theByteData.slice());
controller.close();
}),
});
assert(isReadableByteStreamController(readable[kState].controller));

port2.onmessage = common.mustCall(({ data }) => {
assert(isReadableStream(data));
assert(!isReadableByteStreamController(data[kState].controller));

const reader = data.getReader();
reader.read().then(common.mustCall((chunk) => {
assert.deepStrictEqual(chunk, { done: false, value: theByteData });
}));

port2.close();
});

port1.onmessage = common.mustCall(({ data }) => {
assert(isReadableStream(data));
assert(!isReadableByteStreamController(data[kState].controller));
assert(!data.locked);
port1.postMessage(data, [data]);
assert(data.locked);
});

assert.throws(() => port2.postMessage(readable), {
code: 'ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST',
});

port2.postMessage(readable, [readable]);
assert(readable.locked);
}

{
const { port1, port2 } = new MessageChannel();
port1.onmessageerror = common.mustNotCall();
Expand Down

0 comments on commit 03854f6

Please sign in to comment.