Skip to content

Commit b440cb1

Browse files
abbshrtargos
authored andcommitted
stream: fix readable state awaitDrain increase in recursion
PR-URL: nodejs#27572 Reviewed-By: Anna Henningsen <[email protected]>
1 parent af392a1 commit b440cb1

6 files changed

+101
-39
lines changed

lib/_stream_readable.js

+51-17
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ const {
2727
NumberIsNaN,
2828
ObjectDefineProperties,
2929
ObjectSetPrototypeOf,
30+
Set,
3031
SymbolAsyncIterator,
3132
Symbol
3233
} = primordials;
@@ -146,8 +147,10 @@ function ReadableState(options, stream, isDuplex) {
146147
// Everything else in the universe uses 'utf8', though.
147148
this.defaultEncoding = (options && options.defaultEncoding) || 'utf8';
148149

149-
// The number of writers that are awaiting a drain event in .pipe()s
150-
this.awaitDrain = 0;
150+
// Ref the piped dest which we need a drain event on it
151+
// type: null | Writable | Set<Writable>
152+
this.awaitDrainWriters = null;
153+
this.multiAwaitDrain = false;
151154

152155
// If true, a maybeReadMore has been scheduled
153156
this.readingMore = false;
@@ -282,7 +285,13 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
282285

283286
function addChunk(stream, state, chunk, addToFront) {
284287
if (state.flowing && state.length === 0 && !state.sync) {
285-
state.awaitDrain = 0;
288+
// Use the guard to avoid creating `Set()` repeatedly
289+
// when we have multiple pipes.
290+
if (state.multiAwaitDrain) {
291+
state.awaitDrainWriters.clear();
292+
} else {
293+
state.awaitDrainWriters = null;
294+
}
286295
stream.emit('data', chunk);
287296
} else {
288297
// Update the buffer info.
@@ -475,7 +484,11 @@ Readable.prototype.read = function(n) {
475484
n = 0;
476485
} else {
477486
state.length -= n;
478-
state.awaitDrain = 0;
487+
if (state.multiAwaitDrain) {
488+
state.awaitDrainWriters.clear();
489+
} else {
490+
state.awaitDrainWriters = null;
491+
}
479492
}
480493

481494
if (state.length === 0) {
@@ -620,6 +633,15 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
620633
const src = this;
621634
const state = this._readableState;
622635

636+
if (state.pipesCount === 1) {
637+
if (!state.multiAwaitDrain) {
638+
state.multiAwaitDrain = true;
639+
state.awaitDrainWriters = new Set(
640+
state.awaitDrainWriters ? [state.awaitDrainWriters] : []
641+
);
642+
}
643+
}
644+
623645
switch (state.pipesCount) {
624646
case 0:
625647
state.pipes = dest;
@@ -684,7 +706,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
684706
// flowing again.
685707
// So, if this is awaiting a drain, then we just call it now.
686708
// If we don't know, then assume that we are waiting for one.
687-
if (ondrain && state.awaitDrain &&
709+
if (ondrain && state.awaitDrainWriters &&
688710
(!dest._writableState || dest._writableState.needDrain))
689711
ondrain();
690712
}
@@ -699,21 +721,23 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
699721
// to get stuck in a permanently paused state if that write
700722
// also returned false.
701723
// => Check whether `dest` is still a piping destination.
702-
if (((state.pipesCount === 1 && state.pipes === dest) ||
703-
(state.pipesCount > 1 && state.pipes.includes(dest))) &&
704-
!cleanedUp) {
705-
debug('false write response, pause', state.awaitDrain);
706-
state.awaitDrain++;
707-
}
708724
if (!cleanedUp) {
725+
if (state.pipesCount === 1 && state.pipes === dest) {
726+
debug('false write response, pause', 0);
727+
state.awaitDrainWriters = dest;
728+
state.multiAwaitDrain = false;
729+
} else if (state.pipesCount > 1 && state.pipes.includes(dest)) {
730+
debug('false write response, pause', state.awaitDrainWriters.size);
731+
state.awaitDrainWriters.add(dest);
732+
}
709733
src.pause();
710734
}
711735
if (!ondrain) {
712736
// When the dest drains, it reduces the awaitDrain counter
713737
// on the source. This would be more elegant with a .once()
714738
// handler in flow(), but adding and removing repeatedly is
715739
// too slow.
716-
ondrain = pipeOnDrain(src);
740+
ondrain = pipeOnDrain(src, dest);
717741
dest.on('drain', ondrain);
718742
}
719743
}
@@ -762,13 +786,23 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
762786
return dest;
763787
};
764788

765-
function pipeOnDrain(src) {
789+
function pipeOnDrain(src, dest) {
766790
return function pipeOnDrainFunctionResult() {
767791
const state = src._readableState;
768-
debug('pipeOnDrain', state.awaitDrain);
769-
if (state.awaitDrain)
770-
state.awaitDrain--;
771-
if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
792+
793+
// `ondrain` will call directly,
794+
// `this` maybe not a reference to dest,
795+
// so we use the real dest here.
796+
if (state.awaitDrainWriters === dest) {
797+
debug('pipeOnDrain', 1);
798+
state.awaitDrainWriters = null;
799+
} else if (state.multiAwaitDrain) {
800+
debug('pipeOnDrain', state.awaitDrainWriters.size);
801+
state.awaitDrainWriters.delete(dest);
802+
}
803+
804+
if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
805+
EE.listenerCount(src, 'data')) {
772806
state.flowing = true;
773807
flow(src);
774808
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
'use strict';
2+
const common = require('../common');
3+
const { PassThrough } = require('stream');
4+
5+
const encode = new PassThrough({
6+
highWaterMark: 1
7+
});
8+
9+
const decode = new PassThrough({
10+
highWaterMark: 1
11+
});
12+
13+
const send = common.mustCall((buf) => {
14+
encode.write(buf);
15+
}, 4);
16+
17+
let i = 0;
18+
const onData = common.mustCall(() => {
19+
if (++i === 2) {
20+
send(Buffer.from([0x3]));
21+
send(Buffer.from([0x4]));
22+
}
23+
}, 4);
24+
25+
encode.pipe(decode).on('data', onData);
26+
27+
send(Buffer.from([0x1]));
28+
send(Buffer.from([0x2]));

test/parallel/test-stream-pipe-await-drain-manual-resume.js

+13-12
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ readable.pipe(writable);
2828

2929
readable.once('pause', common.mustCall(() => {
3030
assert.strictEqual(
31-
readable._readableState.awaitDrain,
32-
1,
33-
'Expected awaitDrain to equal 1 but instead got ' +
34-
`${readable._readableState.awaitDrain}`
31+
readable._readableState.awaitDrainWriters,
32+
writable,
33+
'Expected awaitDrainWriters to be a Writable but instead got ' +
34+
`${readable._readableState.awaitDrainWriters}`
3535
);
3636
// First pause, resume manually. The next write() to writable will still
3737
// return false, because chunks are still being buffered, so it will increase
@@ -43,10 +43,10 @@ readable.once('pause', common.mustCall(() => {
4343

4444
readable.once('pause', common.mustCall(() => {
4545
assert.strictEqual(
46-
readable._readableState.awaitDrain,
47-
1,
48-
'.resume() should not reset the counter but instead got ' +
49-
`${readable._readableState.awaitDrain}`
46+
readable._readableState.awaitDrainWriters,
47+
writable,
48+
'.resume() should not reset the awaitDrainWriters, but instead got ' +
49+
`${readable._readableState.awaitDrainWriters}`
5050
);
5151
// Second pause, handle all chunks from now on. Once all callbacks that
5252
// are currently queued up are handled, the awaitDrain drain counter should
@@ -65,10 +65,11 @@ readable.push(null);
6565

6666
writable.on('finish', common.mustCall(() => {
6767
assert.strictEqual(
68-
readable._readableState.awaitDrain,
69-
0,
70-
'awaitDrain should equal 0 after all chunks are written but instead got' +
71-
`${readable._readableState.awaitDrain}`
68+
readable._readableState.awaitDrainWriters,
69+
null,
70+
`awaitDrainWriters should be reset to null
71+
after all chunks are written but instead got
72+
${readable._readableState.awaitDrainWriters}`
7273
);
7374
// Everything okay, all chunks were written.
7475
}));

test/parallel/test-stream-pipe-await-drain-push-while-write.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@ const assert = require('assert');
66
const writable = new stream.Writable({
77
write: common.mustCall(function(chunk, encoding, cb) {
88
assert.strictEqual(
9-
readable._readableState.awaitDrain,
10-
0
9+
readable._readableState.awaitDrainWriters,
10+
null,
1111
);
1212

1313
if (chunk.length === 32 * 1024) { // first chunk
1414
readable.push(Buffer.alloc(34 * 1024)); // above hwm
1515
// We should check if awaitDrain counter is increased in the next
1616
// tick, because awaitDrain is incremented after this method finished
1717
process.nextTick(() => {
18-
assert.strictEqual(readable._readableState.awaitDrain, 1);
18+
assert.strictEqual(readable._readableState.awaitDrainWriters, writable);
1919
});
2020
}
2121

test/parallel/test-stream-pipe-await-drain.js

+6-6
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ writer1._write = common.mustCall(function(chunk, encoding, cb) {
2424

2525
writer1.once('chunk-received', () => {
2626
assert.strictEqual(
27-
reader._readableState.awaitDrain,
27+
reader._readableState.awaitDrainWriters.size,
2828
0,
2929
'awaitDrain initial value should be 0, actual is ' +
30-
reader._readableState.awaitDrain
30+
reader._readableState.awaitDrainWriters
3131
);
3232
setImmediate(() => {
3333
// This one should *not* get through to writer1 because writer2 is not
@@ -39,10 +39,10 @@ writer1.once('chunk-received', () => {
3939
// A "slow" consumer:
4040
writer2._write = common.mustCall((chunk, encoding, cb) => {
4141
assert.strictEqual(
42-
reader._readableState.awaitDrain,
42+
reader._readableState.awaitDrainWriters.size,
4343
1,
4444
'awaitDrain should be 1 after first push, actual is ' +
45-
reader._readableState.awaitDrain
45+
reader._readableState.awaitDrainWriters
4646
);
4747
// Not calling cb here to "simulate" slow stream.
4848
// This should be called exactly once, since the first .write() call
@@ -51,10 +51,10 @@ writer2._write = common.mustCall((chunk, encoding, cb) => {
5151

5252
writer3._write = common.mustCall((chunk, encoding, cb) => {
5353
assert.strictEqual(
54-
reader._readableState.awaitDrain,
54+
reader._readableState.awaitDrainWriters.size,
5555
2,
5656
'awaitDrain should be 2 after second push, actual is ' +
57-
reader._readableState.awaitDrain
57+
reader._readableState.awaitDrainWriters
5858
);
5959
// Not calling cb here to "simulate" slow stream.
6060
// This should be called exactly once, since the first .write() call

test/parallel/test-stream2-basic.js

-1
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,6 @@ class TestWriter extends EE {
355355
assert.strictEqual(v, null);
356356

357357
const w = new R();
358-
359358
w.write = function(buffer) {
360359
written = true;
361360
assert.strictEqual(ended, false);

0 commit comments

Comments
 (0)