Skip to content

Commit

Permalink
stream: avoid calls to listenerCount
Browse files Browse the repository at this point in the history
PR-URL: nodejs#50357
Reviewed-By: Raz Luvaton <[email protected]>
Reviewed-By: Vinícius Lourenço Claro Cardoso <[email protected]>
Reviewed-By: Yagiz Nizipli <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
  • Loading branch information
ronag authored and alexfernandez committed Nov 1, 2023
1 parent 27b620e commit b41b18f
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ const kHasFlowing = 1 << 23;
const kFlowing = 1 << 24;
const kHasPaused = 1 << 25;
const kPaused = 1 << 26;
const kDataListening = 1 << 27;

// TODO(benjamingr) it is likely slower to do it this way than with free functions
function makeBitMapDescriptor(bit) {
Expand Down Expand Up @@ -527,8 +528,7 @@ function canPushMore(state) {
}

function addChunk(stream, state, chunk, addToFront) {
if ((state[kState] & (kFlowing | kSync)) === kFlowing && state.length === 0 &&
stream.listenerCount('data') > 0) {
if ((state[kState] & (kFlowing | kSync | kDataListening)) === (kFlowing | kDataListening) && state.length === 0) {
// Use the guard to avoid creating `Set()` repeatedly
// when we have multiple pipes.
if ((state[kState] & kMultiAwaitDrain) !== 0) {
Expand Down Expand Up @@ -1062,7 +1062,7 @@ function pipeOnDrain(src, dest) {
}

if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
src.listenerCount('data')) {
(state[kState] & kDataListening) !== 0) {
src.resume();
}
};
Expand Down Expand Up @@ -1109,6 +1109,8 @@ Readable.prototype.on = function(ev, fn) {
const state = this._readableState;

if (ev === 'data') {
state[kState] |= kDataListening;

// Update readableListening so that resume() may be a no-op
// a few lines down. This is needed to support once('readable').
state[kState] |= this.listenerCount('readable') > 0 ? kReadableListening : 0;
Expand All @@ -1135,6 +1137,8 @@ Readable.prototype.on = function(ev, fn) {
Readable.prototype.addListener = Readable.prototype.on;

Readable.prototype.removeListener = function(ev, fn) {
const state = this._readableState;

const res = Stream.prototype.removeListener.call(this,
ev, fn);

Expand All @@ -1146,6 +1150,8 @@ Readable.prototype.removeListener = function(ev, fn) {
// resume within the same tick will have no
// effect.
process.nextTick(updateReadableListening, this);
} else if (ev === 'data' && this.listenerCount('data') === 0) {
state[kState] &= ~kDataListening;
}

return res;
Expand Down Expand Up @@ -1184,7 +1190,7 @@ function updateReadableListening(self) {
state[kState] |= kHasFlowing | kFlowing;

// Crude way to check if we should resume.
} else if (self.listenerCount('data') > 0) {
} else if ((state[kState] & kDataListening) !== 0) {
self.resume();
} else if ((state[kState] & kReadableListening) === 0) {
state[kState] &= ~(kHasFlowing | kFlowing);
Expand Down

0 comments on commit b41b18f

Please sign in to comment.