Skip to content

Commit

Permalink
stream: use bitmap in readable state
Browse files Browse the repository at this point in the history
PR-URL: nodejs#49745
Reviewed-By: Yagiz Nizipli <[email protected]>
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Moshe Atlow <[email protected]>
  • Loading branch information
benjamingr authored and alexfernandez committed Nov 1, 2023
1 parent 6341b14 commit c305b72
Showing 1 changed file with 91 additions and 59 deletions.
150 changes: 91 additions & 59 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,75 @@ const nop = () => {};

const { errorOrDestroy } = destroyImpl;

const kObjectMode = 1 << 0;
const kEnded = 1 << 1;
const kEndEmitted = 1 << 2;
const kReading = 1 << 3;
const kConstructed = 1 << 4;
const kSync = 1 << 5;
const kNeedReadable = 1 << 6;
const kEmittedReadable = 1 << 7;
const kReadableListening = 1 << 8;
const kResumeScheduled = 1 << 9;
const kErrorEmitted = 1 << 10;
const kEmitClose = 1 << 11;
const kAutoDestroy = 1 << 12;
const kDestroyed = 1 << 13;
const kClosed = 1 << 14;
const kCloseEmitted = 1 << 15;
const kMultiAwaitDrain = 1 << 16;
const kReadingMore = 1 << 17;
const kDataEmitted = 1 << 18;

// TODO(benjamingr) it is likely slower to do it this way than with free functions
function makeBitMapDescriptor(bit) {
return {
enumerable: false,
get() { return (this.state & bit) !== 0; },
set(value) {
if (value) this.state |= bit;
else this.state &= ~bit;
},
};
}
ObjectDefineProperties(ReadableState.prototype, {
objectMode: makeBitMapDescriptor(kObjectMode),
ended: makeBitMapDescriptor(kEnded),
endEmitted: makeBitMapDescriptor(kEndEmitted),
reading: makeBitMapDescriptor(kReading),
// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
// Async construction is opt in, therefore we start as
// constructed.
constructed: makeBitMapDescriptor(kConstructed),
// A flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
// not happen before the first read call.
sync: makeBitMapDescriptor(kSync),
// Whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission.
needReadable: makeBitMapDescriptor(kNeedReadable),
emittedReadable: makeBitMapDescriptor(kEmittedReadable),
readableListening: makeBitMapDescriptor(kReadableListening),
resumeScheduled: makeBitMapDescriptor(kResumeScheduled),
// True if the error was already emitted and should not be thrown again.
errorEmitted: makeBitMapDescriptor(kErrorEmitted),
emitClose: makeBitMapDescriptor(kEmitClose),
autoDestroy: makeBitMapDescriptor(kAutoDestroy),
// Has it been destroyed.
destroyed: makeBitMapDescriptor(kDestroyed),
// Indicates whether the stream has finished destroying.
closed: makeBitMapDescriptor(kClosed),
// True if close has been emitted or would have been emitted
// depending on emitClose.
closeEmitted: makeBitMapDescriptor(kCloseEmitted),
multiAwaitDrain: makeBitMapDescriptor(kMultiAwaitDrain),
// If true, a maybeReadMore has been scheduled.
readingMore: makeBitMapDescriptor(kReadingMore),
dataEmitted: makeBitMapDescriptor(kDataEmitted),
});

function ReadableState(options, stream, isDuplex) {
// Duplex streams are both readable and writable, but share
// the same options object.
Expand All @@ -92,13 +161,15 @@ function ReadableState(options, stream, isDuplex) {
if (typeof isDuplex !== 'boolean')
isDuplex = stream instanceof Stream.Duplex;

// Bit map field to store ReadableState more effciently with 1 bit per field
// instead of a V8 slot per field.
this.state = kEmitClose | kAutoDestroy | kConstructed | kSync;
// Object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away.
this.objectMode = !!(options && options.objectMode);
if (options && options.objectMode) this.state |= kObjectMode;

if (isDuplex)
this.objectMode = this.objectMode ||
!!(options && options.readableObjectMode);
if (isDuplex && options && options.readableObjectMode)
this.state |= kObjectMode;

// The point at which it stops calling _read() to fill the buffer
// Note: 0 is a valid value, means "don't call _read preemptively ever"
Expand All @@ -113,54 +184,22 @@ function ReadableState(options, stream, isDuplex) {
this.length = 0;
this.pipes = [];
this.flowing = null;
this.ended = false;
this.endEmitted = false;
this.reading = false;

// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
// Async construction is opt in, therefore we start as
// constructed.
this.constructed = true;

// A flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
// not happen before the first read call.
this.sync = true;

// Whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission.
this.needReadable = false;
this.emittedReadable = false;
this.readableListening = false;
this.resumeScheduled = false;
this[kPaused] = null;

// True if the error was already emitted and should not be thrown again.
this.errorEmitted = false;

// Should close be emitted on destroy. Defaults to true.
this.emitClose = !options || options.emitClose !== false;
if (options && options.emitClose === false) this.state &= ~kEmitClose;

// Should .destroy() be called after 'end' (and potentially 'finish').
this.autoDestroy = !options || options.autoDestroy !== false;
if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy;

// Has it been destroyed.
this.destroyed = false;

// Indicates whether the stream has errored. When true no further
// _read calls, 'data' or 'readable' events should occur. This is needed
// since when autoDestroy is disabled we need a way to tell whether the
// stream has failed.
this.errored = null;

// Indicates whether the stream has finished destroying.
this.closed = false;

// True if close has been emitted or would have been emitted
// depending on emitClose.
this.closeEmitted = false;

// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
Expand All @@ -177,12 +216,6 @@ function ReadableState(options, stream, isDuplex) {
// Ref the piped dest which we need a drain event on it
// type: null | Writable | Set<Writable>.
this.awaitDrainWriters = null;
this.multiAwaitDrain = false;

// If true, a maybeReadMore has been scheduled.
this.readingMore = false;

this.dataEmitted = false;

this.decoder = null;
this.encoding = null;
Expand Down Expand Up @@ -263,7 +296,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
const state = stream._readableState;

let err;
if (!state.objectMode) {
if ((state.state & kObjectMode) === 0) {
if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (state.encoding !== encoding) {
Expand All @@ -290,11 +323,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
if (err) {
errorOrDestroy(stream, err);
} else if (chunk === null) {
state.reading = false;
state.state &= ~kReading;
onEofChunk(stream, state);
} else if (state.objectMode || (chunk && chunk.length > 0)) {
} else if (((state.state & kObjectMode) !== 0) || (chunk && chunk.length > 0)) {
if (addToFront) {
if (state.endEmitted)
if ((state.state & kEndEmitted) !== 0)
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
else if (state.destroyed || state.errored)
return false;
Expand All @@ -305,7 +338,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
} else if (state.destroyed || state.errored) {
return false;
} else {
state.reading = false;
state.state &= ~kReading;
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
if (state.objectMode || chunk.length !== 0)
Expand All @@ -317,7 +350,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
}
}
} else if (!addToFront) {
state.reading = false;
state.state &= ~kReading;
maybeReadMore(stream, state);
}

Expand All @@ -333,7 +366,7 @@ function addChunk(stream, state, chunk, addToFront) {
stream.listenerCount('data') > 0) {
// Use the guard to avoid creating `Set()` repeatedly
// when we have multiple pipes.
if (state.multiAwaitDrain) {
if ((state.state & kMultiAwaitDrain) !== 0) {
state.awaitDrainWriters.clear();
} else {
state.awaitDrainWriters = null;
Expand All @@ -349,7 +382,7 @@ function addChunk(stream, state, chunk, addToFront) {
else
state.buffer.push(chunk);

if (state.needReadable)
if ((state.state & kNeedReadable) !== 0)
emitReadable(stream);
}
maybeReadMore(stream, state);
Expand Down Expand Up @@ -404,7 +437,7 @@ function computeNewHighWaterMark(n) {
function howMuchToRead(n, state) {
if (n <= 0 || (state.length === 0 && state.ended))
return 0;
if (state.objectMode)
if ((state.state & kObjectMode) !== 0)
return 1;
if (NumberIsNaN(n)) {
// Only flow one buffer at a time.
Expand Down Expand Up @@ -435,7 +468,7 @@ Readable.prototype.read = function(n) {
state.highWaterMark = computeNewHighWaterMark(n);

if (n !== 0)
state.emittedReadable = false;
state.state &= ~kEmittedReadable;

// If we're doing read(0) to trigger a readable event, but we
// already have a bunch of data in the buffer, then just trigger
Expand Down Expand Up @@ -486,7 +519,7 @@ Readable.prototype.read = function(n) {
// 3. Actually pull the requested chunks out of the buffer and return.

// if we need a readable event, then we need to do some reading.
let doRead = state.needReadable;
let doRead = (state.state & kNeedReadable) !== 0;
debug('need readable', doRead);

// If we currently have less than the highWaterMark, then also read some.
Expand All @@ -504,20 +537,19 @@ Readable.prototype.read = function(n) {
debug('reading, ended or constructing', doRead);
} else if (doRead) {
debug('do read');
state.reading = true;
state.sync = true;
state.state |= kReading | kSync;
// If the length is currently zero, then we *need* a readable event.
if (state.length === 0)
state.needReadable = true;
state.state |= kNeedReadable;

// Call internal read method
try {
this._read(state.highWaterMark);
} catch (err) {
errorOrDestroy(this, err);
}
state.state &= ~kSync;

state.sync = false;
// If _read pushed data synchronously, then `reading` will be false,
// and we need to re-evaluate how much data we can return to the user.
if (!state.reading)
Expand Down

0 comments on commit c305b72

Please sign in to comment.