Skip to content

Commit

Permalink
Add ReadableStreamBYOBReader.prototype.read(view, { min })
Browse files Browse the repository at this point in the history
When `read(view)` is called with the `min` option, the read will only be fulfilled as soon as `min` number of elements are available in the stream.

Fixes #1143, and fixes #1175.
  • Loading branch information
MattiasBuelens authored Nov 13, 2023
1 parent d58a68c commit 4dc123a
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 58 deletions.
171 changes: 132 additions & 39 deletions index.bs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ exports.implementation = class ReadableByteStreamControllerImpl {
byteOffset: 0,
byteLength: autoAllocateChunkSize,
bytesFilled: 0,
minimumFill: 1,
elementSize: 1,
viewConstructor: Uint8Array,
readerType: 'default'
Expand Down
21 changes: 19 additions & 2 deletions reference-implementation/lib/ReadableStreamBYOBReader-impl.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class ReadableStreamBYOBReaderImpl {
aos.SetUpReadableStreamBYOBReader(this, stream);
}

read(view) {
read(view, options) {
if (view.byteLength === 0) {
return promiseRejectedWith(new TypeError('view must have non-zero byteLength'));
}
Expand All @@ -22,6 +22,23 @@ class ReadableStreamBYOBReaderImpl {
return promiseRejectedWith(new TypeError('view\'s buffer has been detached'));
}

if (options.min === 0) {
return promiseRejectedWith(
new TypeError('options.min must be greater than 0')
);
}
if (view.constructor !== DataView) {
if (options.min > view.length) {
return promiseRejectedWith(
new RangeError('options.min must be less than or equal to view\'s length')
);
}
} else if (options.min > view.byteLength) {
return promiseRejectedWith(
new RangeError('options.min must be less than or equal to view\'s byteLength')
);
}

if (this._stream === undefined) {
return promiseRejectedWith(readerLockException('read'));
}
Expand All @@ -32,7 +49,7 @@ class ReadableStreamBYOBReaderImpl {
closeSteps: chunk => resolvePromise(promise, { value: chunk, done: true }),
errorSteps: e => rejectPromise(promise, e)
};
aos.ReadableStreamBYOBReaderRead(this, view, readIntoRequest);
aos.ReadableStreamBYOBReaderRead(this, view, options.min, readIntoRequest);
return promise;
}

Expand Down
6 changes: 5 additions & 1 deletion reference-implementation/lib/ReadableStreamBYOBReader.webidl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
interface ReadableStreamBYOBReader {
constructor(ReadableStream stream);

Promise<ReadableStreamReadResult> read(ArrayBufferView view);
Promise<ReadableStreamReadResult> read(ArrayBufferView view, optional ReadableStreamBYOBReaderReadOptions options = {});
undefined releaseLock();
};
ReadableStreamBYOBReader includes ReadableStreamGenericReader;

dictionary ReadableStreamBYOBReaderReadOptions {
[EnforceRange] unsigned long long min = 1;
};
37 changes: 22 additions & 15 deletions reference-implementation/lib/abstract-ops/readable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ function ReadableByteStreamTee(stream) {
reading = false;
}
};
ReadableStreamBYOBReaderRead(reader, view, readIntoRequest);
ReadableStreamBYOBReaderRead(reader, view, 1, readIntoRequest);
}

function pull1Algorithm() {
Expand Down Expand Up @@ -913,7 +913,7 @@ function ReadableStreamReaderGenericRelease(reader) {
reader._stream = undefined;
}

function ReadableStreamBYOBReaderRead(reader, view, readIntoRequest) {
function ReadableStreamBYOBReaderRead(reader, view, min, readIntoRequest) {
const stream = reader._stream;

assert(stream !== undefined);
Expand All @@ -923,7 +923,7 @@ function ReadableStreamBYOBReaderRead(reader, view, readIntoRequest) {
if (stream._state === 'errored') {
readIntoRequest.errorSteps(stream._storedError);
} else {
ReadableByteStreamControllerPullInto(stream._controller, view, readIntoRequest);
ReadableByteStreamControllerPullInto(stream._controller, view, min, readIntoRequest);
}
}

Expand Down Expand Up @@ -1272,7 +1272,7 @@ function ReadableByteStreamControllerClose(controller) {

if (controller._pendingPullIntos.length > 0) {
const firstPendingPullInto = controller._pendingPullIntos[0];
if (firstPendingPullInto.bytesFilled > 0) {
if (firstPendingPullInto.bytesFilled % firstPendingPullInto.elementSize !== 0) {
const e = new TypeError('Insufficient bytes to fill elements in the given buffer');
ReadableByteStreamControllerError(controller, e);

Expand All @@ -1290,7 +1290,7 @@ function ReadableByteStreamControllerCommitPullIntoDescriptor(stream, pullIntoDe

let done = false;
if (stream._state === 'closed') {
assert(pullIntoDescriptor.bytesFilled === 0);
assert(pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize === 0);
done = true;
}

Expand Down Expand Up @@ -1419,18 +1419,18 @@ function ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, size
}

function ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) {
const elementSize = pullIntoDescriptor.elementSize;

const currentAlignedBytes = pullIntoDescriptor.bytesFilled - pullIntoDescriptor.bytesFilled % elementSize;

const maxBytesToCopy = Math.min(controller._queueTotalSize,
pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled);
const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy;
const maxAlignedBytes = maxBytesFilled - maxBytesFilled % elementSize;

let totalBytesToCopyRemaining = maxBytesToCopy;
let ready = false;
if (maxAlignedBytes > currentAlignedBytes) {
assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.minimumFill);
const remainderBytes = maxBytesFilled % pullIntoDescriptor.elementSize;
const maxAlignedBytes = maxBytesFilled - remainderBytes;
// A descriptor for a read() request that is not yet filled up to its minimum length will stay at the head
// of the queue, so the underlying source can keep filling it.
if (maxAlignedBytes >= pullIntoDescriptor.minimumFill) {
totalBytesToCopyRemaining = maxAlignedBytes - pullIntoDescriptor.bytesFilled;
ready = true;
}
Expand Down Expand Up @@ -1461,7 +1461,7 @@ function ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller,
if (ready === false) {
assert(controller._queueTotalSize === 0);
assert(pullIntoDescriptor.bytesFilled > 0);
assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize);
assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.minimumFill);
}

return ready;
Expand Down Expand Up @@ -1563,14 +1563,18 @@ function ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller) {
}
}

function ReadableByteStreamControllerPullInto(controller, view, readIntoRequest) {
function ReadableByteStreamControllerPullInto(controller, view, min, readIntoRequest) {
const stream = controller._stream;

let elementSize = 1;
if (view.constructor !== DataView) {
elementSize = view.constructor.BYTES_PER_ELEMENT;
}

const minimumFill = min * elementSize;
assert(minimumFill >= elementSize && minimumFill <= view.byteLength);
assert(minimumFill % elementSize === 0);

const ctor = view.constructor;

let buffer;
Expand All @@ -1587,6 +1591,7 @@ function ReadableByteStreamControllerPullInto(controller, view, readIntoRequest)
byteOffset: view.byteOffset,
byteLength: view.byteLength,
bytesFilled: 0,
minimumFill,
elementSize,
viewConstructor: ctor,
readerType: 'byob'
Expand Down Expand Up @@ -1660,7 +1665,7 @@ function ReadableByteStreamControllerRespond(controller, bytesWritten) {
}

function ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor) {
assert(firstDescriptor.bytesFilled === 0);
assert(firstDescriptor.bytesFilled % firstDescriptor.elementSize === 0);

if (firstDescriptor.readerType === 'none') {
ReadableByteStreamControllerShiftPendingPullInto(controller);
Expand All @@ -1686,7 +1691,9 @@ function ReadableByteStreamControllerRespondInReadableState(controller, bytesWri
return;
}

if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) {
if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.minimumFill) {
// A descriptor for a read() request that is not yet filled up to its minimum length will stay at the head
// of the queue, so the underlying source can keep filling it.
return;
}

Expand Down
2 changes: 1 addition & 1 deletion reference-implementation/web-platform-tests

0 comments on commit 4dc123a

Please sign in to comment.