Skip to content

Commit 1fa598e

Browse files
addaleaxMayaLekova
authored andcommitted
stream: always reset awaitDrain when emitting data
The complicated `awaitDrain` machinery can be made a bit slimmer, and more correct, by just resetting the value each time `stream.emit('data')` is called. By resetting the value before emitting the data chunk, and seeing whether any pipe destinations return `.write() === false`, we always end up in a consistent state and don’t need to worry about odd situations (like `dest.write(chunk)` emitting more data). PR-URL: nodejs#18516 Fixes: nodejs#18484 Fixes: nodejs#18512 Refs: nodejs#18515 Reviewed-By: Anatoli Papirovski <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Minwoo Jung <[email protected]> Reviewed-By: Ruben Bridgewater <[email protected]>
1 parent 9ec0d81 commit 1fa598e

File tree

2 files changed

+38
-9
lines changed

2 files changed

+38
-9
lines changed

lib/_stream_readable.js

+3-9
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
258258

259259
function addChunk(stream, state, chunk, addToFront) {
260260
if (state.flowing && state.length === 0 && !state.sync) {
261+
state.awaitDrain = 0;
261262
stream.emit('data', chunk);
262263
} else {
263264
// update the buffer info.
@@ -456,6 +457,7 @@ Readable.prototype.read = function(n) {
456457
n = 0;
457458
} else {
458459
state.length -= n;
460+
state.awaitDrain = 0;
459461
}
460462

461463
if (state.length === 0) {
@@ -637,18 +639,12 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
637639
ondrain();
638640
}
639641

640-
// If the user pushes more data while we're writing to dest then we'll end up
641-
// in ondata again. However, we only want to increase awaitDrain once because
642-
// dest will only emit one 'drain' event for the multiple writes.
643-
// => Introduce a guard on increasing awaitDrain.
644-
var increasedAwaitDrain = false;
645642
src.on('data', ondata);
646643
function ondata(chunk) {
647644
debug('ondata');
648-
increasedAwaitDrain = false;
649645
var ret = dest.write(chunk);
650646
debug('dest.write', ret);
651-
if (false === ret && !increasedAwaitDrain) {
647+
if (ret === false) {
652648
// If the user unpiped during `dest.write()`, it is possible
653649
// to get stuck in a permanently paused state if that write
654650
// also returned false.
@@ -658,7 +654,6 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
658654
!cleanedUp) {
659655
debug('false write response, pause', state.awaitDrain);
660656
state.awaitDrain++;
661-
increasedAwaitDrain = true;
662657
}
663658
src.pause();
664659
}
@@ -834,7 +829,6 @@ function resume_(stream, state) {
834829
}
835830

836831
state.resumeScheduled = false;
837-
state.awaitDrain = 0;
838832
stream.emit('resume');
839833
flow(stream);
840834
if (state.flowing && !state.reading)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
'use strict';
2+
const common = require('../common');
3+
const stream = require('stream');
4+
5+
function test(throwCodeInbetween) {
6+
// Check that a pipe does not stall if .read() is called unexpectedly
7+
// (i.e. the stream is not resumed by the pipe).
8+
9+
const n = 1000;
10+
let counter = n;
11+
const rs = stream.Readable({
12+
objectMode: true,
13+
read: common.mustCallAtLeast(() => {
14+
if (--counter >= 0)
15+
rs.push({ counter });
16+
else
17+
rs.push(null);
18+
}, n)
19+
});
20+
21+
const ws = stream.Writable({
22+
objectMode: true,
23+
write: common.mustCall((data, enc, cb) => {
24+
setImmediate(cb);
25+
}, n)
26+
});
27+
28+
setImmediate(() => throwCodeInbetween(rs, ws));
29+
30+
rs.pipe(ws);
31+
}
32+
33+
test((rs) => rs.read());
34+
test((rs) => rs.resume());
35+
test(() => 0);

0 commit comments

Comments
 (0)