Skip to content

Commit

Permalink
stream: implement min option for ReadableStreamBYOBReader.read
Browse files Browse the repository at this point in the history
PR-URL: #50888
Backport-PR-URL: #54044
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Debadree Chatterjee <[email protected]>
  • Loading branch information
MattiasBuelens committed Jul 25, 2024
1 parent 30b859f commit 22340d9
Show file tree
Hide file tree
Showing 15 changed files with 968 additions and 114 deletions.
15 changes: 12 additions & 3 deletions doc/api/webstreams.md
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ added: v16.5.0
-->

* Returns: A promise fulfilled with an object:
* `value` {ArrayBuffer}
* `value` {any}
* `done` {boolean}

Requests the next chunk of data from the underlying {ReadableStream}
Expand Down Expand Up @@ -613,15 +613,24 @@ added: v16.5.0
{ReadableStream} is closed or rejected if the stream errors or the reader's
lock is released before the stream finishes closing.
#### `readableStreamBYOBReader.read(view)`
#### `readableStreamBYOBReader.read(view[, options])`
<!-- YAML
added: v16.5.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/50888

Check warning on line 622 in doc/api/webstreams.md

View workflow job for this annotation

GitHub Actions / lint-pr-url

pr-url doesn't match the URL of the current PR.
description: Added `min` option.
-->
* `view` {Buffer|TypedArray|DataView}
* `options` {Object}
* `min` {number} When set, the returned promise will only be
fulfilled as soon as `min` number of elements are available.
When not set, the promise fulfills when at least one element
is available.
* Returns: A promise fulfilled with an object:
* `value` {ArrayBuffer}
* `value` {TypedArray|DataView}
* `done` {boolean}
Requests the next chunk of data from the underlying {ReadableStream}
Expand Down
8 changes: 1 addition & 7 deletions lib/internal/encoding.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ const {
const {
validateString,
validateObject,
kValidateObjectAllowNullable,
kValidateObjectAllowArray,
kValidateObjectAllowFunction,
kValidateObjectAllowObjectsAndNull,
} = require('internal/validators');
const binding = internalBinding('encoding_binding');
const {
Expand Down Expand Up @@ -393,10 +391,6 @@ const TextDecoder =
makeTextDecoderICU() :
makeTextDecoderJS();

const kValidateObjectAllowObjectsAndNull = kValidateObjectAllowNullable |
kValidateObjectAllowArray |
kValidateObjectAllowFunction;

function makeTextDecoderICU() {
const {
decode: _decode,
Expand Down
7 changes: 7 additions & 0 deletions lib/internal/validators.js
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ const kValidateObjectNone = 0;
const kValidateObjectAllowNullable = 1 << 0;
const kValidateObjectAllowArray = 1 << 1;
const kValidateObjectAllowFunction = 1 << 2;
const kValidateObjectAllowObjects = kValidateObjectAllowArray |
kValidateObjectAllowFunction;
const kValidateObjectAllowObjectsAndNull = kValidateObjectAllowNullable |
kValidateObjectAllowArray |
kValidateObjectAllowFunction;

/**
* @callback validateObject
Expand Down Expand Up @@ -583,6 +588,8 @@ module.exports = {
kValidateObjectAllowNullable,
kValidateObjectAllowArray,
kValidateObjectAllowFunction,
kValidateObjectAllowObjects,
kValidateObjectAllowObjectsAndNull,
validateOneOf,
validatePlainFunction,
validatePort,
Expand Down
107 changes: 67 additions & 40 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const {
SymbolAsyncIterator,
SymbolDispose,
SymbolToStringTag,
TypedArrayPrototypeGetLength,
Uint8Array,
} = primordials;

Expand All @@ -34,6 +35,7 @@ const {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_STATE,
ERR_INVALID_THIS,
ERR_OUT_OF_RANGE,
},
} = require('internal/errors');

Expand All @@ -59,8 +61,8 @@ const {
validateAbortSignal,
validateBuffer,
validateObject,
kValidateObjectAllowNullable,
kValidateObjectAllowFunction,
kValidateObjectAllowObjects,
kValidateObjectAllowObjectsAndNull,
} = require('internal/validators');

const {
Expand Down Expand Up @@ -247,9 +249,9 @@ class ReadableStream {
* @param {UnderlyingSource} [source]
* @param {QueuingStrategy} [strategy]
*/
constructor(source = {}, strategy = kEmptyObject) {
if (source === null)
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
constructor(source = kEmptyObject, strategy = kEmptyObject) {
validateObject(source, 'source', kValidateObjectAllowObjects);
validateObject(strategy, 'strategy', kValidateObjectAllowObjectsAndNull);
this[kState] = createReadableStreamState();

this[kIsClosedPromise] = createDeferredPromise();
Expand Down Expand Up @@ -335,7 +337,7 @@ class ReadableStream {
getReader(options = kEmptyObject) {
if (!isReadableStream(this))
throw new ERR_INVALID_THIS('ReadableStream');
validateObject(options, 'options', kValidateObjectAllowNullable | kValidateObjectAllowFunction);
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
const mode = options?.mode;

if (mode === undefined)
Expand Down Expand Up @@ -373,6 +375,7 @@ class ReadableStream {

// The web platform tests require that these be handled one at a
// time and in a specific order. options can be null or undefined.
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
const preventAbort = options?.preventAbort;
const preventCancel = options?.preventCancel;
const preventClose = options?.preventClose;
Expand Down Expand Up @@ -415,6 +418,7 @@ class ReadableStream {
destination);
}

validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
const preventAbort = options?.preventAbort;
const preventCancel = options?.preventCancel;
const preventClose = options?.preventClose;
Expand Down Expand Up @@ -459,10 +463,8 @@ class ReadableStream {
values(options = kEmptyObject) {
if (!isReadableStream(this))
throw new ERR_INVALID_THIS('ReadableStream');
validateObject(options, 'options');
const {
preventCancel = false,
} = options;
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
const preventCancel = !!(options?.preventCancel);

// eslint-disable-next-line no-use-before-define
const reader = new ReadableStreamDefaultReader(this);
Expand Down Expand Up @@ -926,47 +928,62 @@ class ReadableStreamBYOBReader {

/**
* @param {ArrayBufferView} view
* @param {{
* min? : number
* }} [options]
* @returns {Promise<{
* view : ArrayBufferView,
* value : ArrayBufferView,
* done : boolean,
* }>}
*/
read(view) {
async read(view, options = kEmptyObject) {
if (!isReadableStreamBYOBReader(this))
return PromiseReject(new ERR_INVALID_THIS('ReadableStreamBYOBReader'));
throw new ERR_INVALID_THIS('ReadableStreamBYOBReader');
if (!isArrayBufferView(view)) {
return PromiseReject(
new ERR_INVALID_ARG_TYPE(
'view',
[
'Buffer',
'TypedArray',
'DataView',
],
view));
throw new ERR_INVALID_ARG_TYPE(
'view',
[
'Buffer',
'TypedArray',
'DataView',
],
view,
);
}
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);

const viewByteLength = ArrayBufferViewGetByteLength(view);
const viewBuffer = ArrayBufferViewGetBuffer(view);
const viewBufferByteLength = ArrayBufferPrototypeGetByteLength(viewBuffer);

if (viewByteLength === 0 || viewBufferByteLength === 0) {
return PromiseReject(
new ERR_INVALID_STATE.TypeError(
'View or Viewed ArrayBuffer is zero-length or detached',
),
);
throw new ERR_INVALID_STATE.TypeError(
'View or Viewed ArrayBuffer is zero-length or detached');
}

// Supposed to assert here that the view's buffer is not
// detached, but there's no API available to use to check that.

const min = options?.min ?? 1;
if (typeof min !== 'number')
throw new ERR_INVALID_ARG_TYPE('options.min', 'number', min);
if (!NumberIsInteger(min))
throw new ERR_INVALID_ARG_VALUE('options.min', min, 'must be an integer');
if (min <= 0)
throw new ERR_INVALID_ARG_VALUE('options.min', min, 'must be greater than 0');
if (!isDataView(view)) {
if (min > TypedArrayPrototypeGetLength(view)) {
throw new ERR_OUT_OF_RANGE('options.min', '<= view.length', min);
}
} else if (min > viewByteLength) {
throw new ERR_OUT_OF_RANGE('options.min', '<= view.byteLength', min);
}

if (this[kState].stream === undefined) {
return PromiseReject(
new ERR_INVALID_STATE.TypeError(
'The reader is not attached to a stream'));
throw new ERR_INVALID_STATE.TypeError('The reader is not attached to a stream');
}
const readIntoRequest = new ReadIntoRequest();
readableStreamBYOBReaderRead(this, view, readIntoRequest);
readableStreamBYOBReaderRead(this, view, min, readIntoRequest);
return readIntoRequest.promise;
}

Expand Down Expand Up @@ -1880,7 +1897,7 @@ function readableByteStreamTee(stream) {
reading = false;
},
};
readableStreamBYOBReaderRead(reader, view, readIntoRequest);
readableStreamBYOBReaderRead(reader, view, 1, readIntoRequest);
}

function pull1Algorithm() {
Expand Down Expand Up @@ -2207,7 +2224,7 @@ function readableStreamReaderGenericRelease(reader) {
reader[kState].stream = undefined;
}

function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
function readableStreamBYOBReaderRead(reader, view, min, readIntoRequest) {
const {
stream,
} = reader[kState];
Expand All @@ -2220,6 +2237,7 @@ function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
readableByteStreamControllerPullInto(
stream[kState].controller,
view,
min,
readIntoRequest);
}

Expand Down Expand Up @@ -2492,7 +2510,7 @@ function readableByteStreamControllerClose(controller) {

if (pendingPullIntos.length) {
const firstPendingPullInto = pendingPullIntos[0];
if (firstPendingPullInto.bytesFilled > 0) {
if (firstPendingPullInto.bytesFilled % firstPendingPullInto.elementSize !== 0) {
const error = new ERR_INVALID_STATE.TypeError('Partial read');
readableByteStreamControllerError(controller, error);
throw error;
Expand All @@ -2509,7 +2527,7 @@ function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) {

let done = false;
if (stream[kState].state === 'closed') {
desc.bytesFilled = 0;
assert(desc.bytesFilled % desc.elementSize === 0);
done = true;
}

Expand Down Expand Up @@ -2598,6 +2616,7 @@ function readableByteStreamControllerHandleQueueDrain(controller) {
function readableByteStreamControllerPullInto(
controller,
view,
min,
readIntoRequest) {
const {
closeRequested,
Expand All @@ -2610,6 +2629,11 @@ function readableByteStreamControllerPullInto(
elementSize = view.constructor.BYTES_PER_ELEMENT;
ctor = view.constructor;
}

const minimumFill = min * elementSize;
assert(minimumFill >= elementSize && minimumFill <= view.byteLength);
assert(minimumFill % elementSize === 0);

const buffer = ArrayBufferViewGetBuffer(view);
const byteOffset = ArrayBufferViewGetByteOffset(view);
const byteLength = ArrayBufferViewGetByteLength(view);
Expand All @@ -2628,6 +2652,7 @@ function readableByteStreamControllerPullInto(
byteOffset,
byteLength,
bytesFilled: 0,
minimumFill,
elementSize,
ctor,
type: 'byob',
Expand Down Expand Up @@ -2715,7 +2740,7 @@ function readableByteStreamControllerRespond(controller, bytesWritten) {
}

function readableByteStreamControllerRespondInClosedState(controller, desc) {
assert(!desc.bytesFilled);
assert(desc.bytesFilled % desc.elementSize === 0);
if (desc.type === 'none') {
readableByteStreamControllerShiftPendingPullInto(controller);
}
Expand Down Expand Up @@ -2892,17 +2917,18 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
byteLength,
byteOffset,
bytesFilled,
minimumFill,
elementSize,
} = desc;
const currentAlignedBytes = bytesFilled - (bytesFilled % elementSize);
const maxBytesToCopy = MathMin(
controller[kState].queueTotalSize,
byteLength - bytesFilled);
const maxBytesFilled = bytesFilled + maxBytesToCopy;
const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
let totalBytesToCopyRemaining = maxBytesToCopy;
let ready = false;
if (maxAlignedBytes > currentAlignedBytes) {
assert(bytesFilled < minimumFill);
if (maxAlignedBytes >= minimumFill) {
totalBytesToCopyRemaining = maxAlignedBytes - bytesFilled;
ready = true;
}
Expand Down Expand Up @@ -2945,7 +2971,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
if (!ready) {
assert(!controller[kState].queueTotalSize);
assert(desc.bytesFilled > 0);
assert(desc.bytesFilled < elementSize);
assert(desc.bytesFilled < minimumFill);
}
return ready;
}
Expand Down Expand Up @@ -3001,7 +3027,7 @@ function readableByteStreamControllerRespondInReadableState(
return;
}

if (desc.bytesFilled < desc.elementSize)
if (desc.bytesFilled < desc.minimumFill)
return;

readableByteStreamControllerShiftPendingPullInto(controller);
Expand Down Expand Up @@ -3186,6 +3212,7 @@ function readableByteStreamControllerPullSteps(controller, readRequest) {
byteOffset: 0,
byteLength: autoAllocateChunkSize,
bytesFilled: 0,
minimumFill: 1,
elementSize: 1,
ctor: Uint8Array,
type: 'default',
Expand Down
11 changes: 10 additions & 1 deletion lib/internal/webstreams/transformstream.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ const {
kEnumerableProperty,
} = require('internal/util');

const {
validateObject,
kValidateObjectAllowObjects,
kValidateObjectAllowObjectsAndNull,
} = require('internal/validators');

const {
kDeserialize,
kTransfer,
Expand Down Expand Up @@ -119,9 +125,12 @@ class TransformStream {
* @param {QueuingStrategy} [readableStrategy]
*/
constructor(
transformer = null,
transformer = kEmptyObject,
writableStrategy = kEmptyObject,
readableStrategy = kEmptyObject) {
validateObject(transformer, 'transformer', kValidateObjectAllowObjects);
validateObject(writableStrategy, 'writableStrategy', kValidateObjectAllowObjectsAndNull);
validateObject(readableStrategy, 'readableStrategy', kValidateObjectAllowObjectsAndNull);
const readableType = transformer?.readableType;
const writableType = transformer?.writableType;
const start = transformer?.start;
Expand Down
Loading

0 comments on commit 22340d9

Please sign in to comment.