Skip to content

Commit

Permalink
stream: improve webstream readable async iterator performance
Browse files Browse the repository at this point in the history
PR-URL: nodejs#49662
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Yagiz Nizipli <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Moshe Atlow <[email protected]>
  • Loading branch information
rluvaton authored Sep 18, 2023
1 parent b03a757 commit 5c39ee6
Showing 1 changed file with 44 additions and 29 deletions.
73 changes: 44 additions & 29 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,13 @@ class ReadableStream {

// eslint-disable-next-line no-use-before-define
const reader = new ReadableStreamDefaultReader(this);
let done = false;

// No __proto__ here to avoid the performance hit.
const state = {
done: false,
current: undefined,
};
let started = false;
let current;

// The nextSteps function is not an async function in order
// to make it more efficient. Because nextSteps explicitly
Expand All @@ -488,7 +492,7 @@ class ReadableStream {
// unnecessary Promise allocations to occur, which just add
// cost.
function nextSteps() {
if (done)
if (state.done)
return PromiseResolve({ done: true, value: undefined });

if (reader[kState].stream === undefined) {
Expand All @@ -498,31 +502,15 @@ class ReadableStream {
}
const promise = createDeferredPromise();

readableStreamDefaultReaderRead(reader, {
[kChunk](chunk) {
current = undefined;
promise.resolve({ value: chunk, done: false });
},
[kClose]() {
current = undefined;
done = true;
readableStreamReaderGenericRelease(reader);
promise.resolve({ done: true, value: undefined });
},
[kError](error) {
current = undefined;
done = true;
readableStreamReaderGenericRelease(reader);
promise.reject(error);
},
});
// eslint-disable-next-line no-use-before-define
readableStreamDefaultReaderRead(reader, new ReadableStreamAsyncIteratorReadRequest(reader, state, promise));
return promise.promise;
}

async function returnSteps(value) {
if (done)
if (state.done)
return { done: true, value }; // eslint-disable-line node-core/avoid-prototype-pollution
done = true;
state.done = true;

if (reader[kState].stream === undefined) {
throw new ERR_INVALID_STATE.TypeError(
Expand Down Expand Up @@ -559,19 +547,19 @@ class ReadableStream {
// need to investigate if it's a bug in our impl or
// the spec.
if (!started) {
current = PromiseResolve();
state.current = PromiseResolve();
started = true;
}
current = current !== undefined ?
PromisePrototypeThen(current, nextSteps, nextSteps) :
state.current = state.current !== undefined ?
PromisePrototypeThen(state.current, nextSteps, nextSteps) :
nextSteps();
return current;
return state.current;
},

return(error) {
return current ?
return state.current ?
PromisePrototypeThen(
current,
state.current,
() => returnSteps(error),
() => returnSteps(error)) :
returnSteps(error);
Expand Down Expand Up @@ -773,6 +761,33 @@ function createReadableStreamBYOBRequest(controller, view) {
return stream;
}

class ReadableStreamAsyncIteratorReadRequest {
constructor(reader, state, promise) {
this.reader = reader;
this.state = state;
this.promise = promise;
}

[kChunk](chunk) {
this.state.current = undefined;
this.promise.resolve({ value: chunk, done: false });
}

[kClose]() {
this.state.current = undefined;
this.state.done = true;
readableStreamReaderGenericRelease(this.reader);
this.promise.resolve({ done: true, value: undefined });
}

[kError](error) {
this.state.current = undefined;
this.state.done = true;
readableStreamReaderGenericRelease(this.reader);
this.promise.reject(error);
}
}

class DefaultReadRequest {
constructor() {
this[kState] = createDeferredPromise();
Expand Down

0 comments on commit 5c39ee6

Please sign in to comment.