Skip to content

Commit

Permalink
stream: readable use bitmap accessors
Browse files Browse the repository at this point in the history
PR-URL: #50350
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Raz Luvaton <[email protected]>
  • Loading branch information
ronag authored Oct 25, 2023
1 parent 69c0571 commit 31e0759
Showing 1 changed file with 31 additions and 27 deletions.
58 changes: 31 additions & 27 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ function readableAddChunkUnshiftObjectMode(stream, state, chunk) {
function readableAddChunkUnshiftValue(stream, state, chunk) {
if ((state[kState] & kEndEmitted) !== 0)
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
else if (state.destroyed || state.errored)
else if ((state[kState] & (kDestroyed | kErrored)) !== 0)
return false;
else
addChunk(stream, state, chunk, true);
Expand Down Expand Up @@ -608,7 +608,7 @@ function computeNewHighWaterMark(n) {
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function howMuchToRead(n, state) {
if (n <= 0 || (state.length === 0 && state.ended))
if (n <= 0 || (state.length === 0 && (state[kState] & kEnded) !== 0))
return 0;
if ((state[kState] & kObjectMode) !== 0)
return 1;
Expand Down Expand Up @@ -652,7 +652,7 @@ Readable.prototype.read = function(n) {
state.length >= state.highWaterMark :
state.length > 0) ||
(state[kState] & kEnded) !== 0)) {
debug('read: emitReadable', state.length, (state[kState] & kEnded) !== 0);
debug('read: emitReadable');
if (state.length === 0 && (state[kState] & kEnded) !== 0)
endReadable(this);
else
Expand Down Expand Up @@ -810,7 +810,7 @@ function emitReadable(stream) {
function emitReadable_(stream) {
const state = stream._readableState;
debug('emitReadable_');
if ((state[kState] & (kDestroyed | kErrored)) === 0 && (state.length || state.ended)) {
if ((state[kState] & (kDestroyed | kErrored)) === 0 && (state.length || (state[kState] & kEnded) !== 0)) {
stream.emit('readable');
state[kState] &= ~kEmittedReadable;
}
Expand Down Expand Up @@ -891,7 +891,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
const state = this._readableState;

if (state.pipes.length === 1) {
if (!state.multiAwaitDrain) {
if ((state[kState] & kMultiAwaitDrain) === 0) {
state[kState] |= kMultiAwaitDrain;
state.awaitDrainWriters = new SafeSet(
state.awaitDrainWriters ? [state.awaitDrainWriters] : [],
Expand All @@ -907,7 +907,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest !== process.stderr;

const endFn = doEnd ? onend : unpipe;
if (state.endEmitted)
if ((state[kState] & kEndEmitted) !== 0)
process.nextTick(endFn);
else
src.once('end', endFn);
Expand Down Expand Up @@ -966,7 +966,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
if (state.pipes.length === 1 && state.pipes[0] === dest) {
debug('false write response, pause', 0);
state.awaitDrainWriters = dest;
state.multiAwaitDrain = false;
state[kState] &= ~kMultiAwaitDrain;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
debug('false write response, pause', state.awaitDrainWriters.size);
state.awaitDrainWriters.add(dest);
Expand Down Expand Up @@ -1038,7 +1038,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {

if (dest.writableNeedDrain === true) {
pause();
} else if (!state.flowing) {
} else if ((state[kState] & kFlowing) === 0) {
debug('pipe resume');
src.resume();
}
Expand All @@ -1056,7 +1056,7 @@ function pipeOnDrain(src, dest) {
if (state.awaitDrainWriters === dest) {
debug('pipeOnDrain', 1);
state.awaitDrainWriters = null;
} else if (state.multiAwaitDrain) {
} else if ((state[kState] & kMultiAwaitDrain) !== 0) {
debug('pipeOnDrain', state.awaitDrainWriters.size);
state.awaitDrainWriters.delete(dest);
}
Expand Down Expand Up @@ -1111,20 +1111,20 @@ Readable.prototype.on = function(ev, fn) {
if (ev === 'data') {
// Update readableListening so that resume() may be a no-op
// a few lines down. This is needed to support once('readable').
state.readableListening = this.listenerCount('readable') > 0;
state[kState] |= this.listenerCount('readable') > 0 ? kReadableListening : 0;

// Try start flowing on next tick if stream isn't explicitly paused.
if (state.flowing !== false)
if ((state[kState] & (kHasFlowing | kFlowing)) !== kHasFlowing) {
this.resume();
}
} else if (ev === 'readable') {
if (!state.endEmitted && !state.readableListening) {
state.readableListening = state.needReadable = true;
state.flowing = false;
state.emittedReadable = false;
debug('on readable', state.length, state.reading);
if ((state[kState] & (kEndEmitted | kReadableListening)) === 0) {
state[kState] |= kReadableListening | kNeedReadable | kHasFlowing;
state[kState] &= ~(kFlowing | kEmittedReadable);
debug('on readable');
if (state.length) {
emitReadable(this);
} else if (!state.reading) {
} else if ((state[kState] & kReading) === 0) {
process.nextTick(nReadingNextTick, this);
}
}
Expand Down Expand Up @@ -1171,7 +1171,12 @@ Readable.prototype.removeAllListeners = function(ev) {

function updateReadableListening(self) {
const state = self._readableState;
state.readableListening = self.listenerCount('readable') > 0;

if (self.listenerCount('readable') > 0) {
state[kState] |= kReadableListening;
} else {
state[kState] &= ~kReadableListening;
}

if ((state[kState] & (kHasPaused | kPaused | kResumeScheduled)) === (kHasPaused | kResumeScheduled)) {
// Flowing needs to be set to true now, otherwise
Expand Down Expand Up @@ -1201,7 +1206,7 @@ Readable.prototype.resume = function() {
// for readable, but we still have to call
// resume().
state[kState] |= kHasFlowing;
if (!state.readableListening) {
if ((state[kState] & kReadableListening) === 0) {
state[kState] |= kFlowing;
} else {
state[kState] &= ~kFlowing;
Expand All @@ -1214,8 +1219,8 @@ Readable.prototype.resume = function() {
};

function resume(stream, state) {
if (!state.resumeScheduled) {
state.resumeScheduled = true;
if ((state[kState] & kResumeScheduled) === 0) {
state[kState] |= kResumeScheduled;
process.nextTick(resume_, stream, state);
}
}
Expand All @@ -1236,7 +1241,7 @@ function resume_(stream, state) {
Readable.prototype.pause = function() {
const state = this._readableState;
debug('call pause');
if (state.flowing !== false) {
if ((state[kState] & (kHasFlowing | kFlowing)) !== kHasFlowing) {
debug('pause');
state[kState] |= kHasFlowing;
state[kState] &= ~kFlowing;
Expand Down Expand Up @@ -1651,20 +1656,19 @@ function fromList(n, state) {
function endReadable(stream) {
const state = stream._readableState;

debug('endReadable', (state[kState] & kEndEmitted) !== 0);
debug('endReadable');
if ((state[kState] & kEndEmitted) === 0) {
state[kState] |= kEnded;
process.nextTick(endReadableNT, state, stream);
}
}

function endReadableNT(state, stream) {
debug('endReadableNT', state.endEmitted, state.length);
debug('endReadableNT');

// Check that we didn't get one last unshift.
if (!state.errored && !state.closeEmitted &&
!state.endEmitted && state.length === 0) {
state.endEmitted = true;
if ((state[kState] & (kErrored | kCloseEmitted | kEndEmitted)) === 0 && state.length === 0) {
state[kState] |= kEndEmitted;
stream.emit('end');

if (stream.writable && stream.allowHalfOpen === false) {
Expand Down

0 comments on commit 31e0759

Please sign in to comment.