Skip to content

Commit

Permalink
stream: fix readable behavior for highWaterMark === 0
Browse files Browse the repository at this point in the history
Avoid trying to emit 'readable' due to the fact that
state.length is always >= state.highWaterMark if highWaterMark is 0.
Therefore upon .read(0) call (through .on('readable')) stream assumed
that it has enough data to emit 'readable' even though
state.length === 0 instead of issuing _read(). Which led to the TTY
not recognizing that someone is waiting for the input.

Fixes: #20503
Refs: #18372

PR-URL: #21690
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Ruben Bridgewater <[email protected]>
  • Loading branch information
lundibundi authored and mcollina committed Aug 10, 2018
1 parent b854604 commit fe47b8b
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 1 deletion.
6 changes: 5 additions & 1 deletion lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,10 @@ Readable.prototype.read = function(n) {
// the 'readable' event and move on.
if (n === 0 &&
state.needReadable &&
(state.length >= state.highWaterMark || state.ended)) {
((state.highWaterMark !== 0 ?
state.length >= state.highWaterMark :
state.length > 0) ||
state.ended)) {
debug('read: emitReadable', state.length, state.ended);
if (state.length === 0 && state.ended)
endReadable(this);
Expand Down Expand Up @@ -808,6 +811,7 @@ Readable.prototype.on = function(ev, fn) {
if (!state.endEmitted && !state.readableListening) {
state.readableListening = state.needReadable = true;
state.emittedReadable = false;
debug('on readable', state.length, state.reading);
if (state.length) {
emitReadable(this);
} else if (!state.reading) {
Expand Down
16 changes: 16 additions & 0 deletions test/parallel/test-readable-single-end.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
'use strict';

const common = require('../common');
const { Readable } = require('stream');

// This test ensures that there will not be an additional empty 'readable'
// event when stream has ended (only 1 event signalling about end)

const r = new Readable({
read: () => {},
});

r.push(null);

r.on('readable', common.mustCall());
r.on('end', common.mustCall());
30 changes: 30 additions & 0 deletions test/parallel/test-stream-readable-hwm-0.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict';

const common = require('../common');

// This test ensures that Readable stream will call _read() for streams
// with highWaterMark === 0 upon .read(0) instead of just trying to
// emit 'readable' event.

const assert = require('assert');
const { Readable } = require('stream');

const r = new Readable({
// must be called only once upon setting 'readable' listener
read: common.mustCall(),
highWaterMark: 0,
});

let pushedNull = false;
// this will trigger read(0) but must only be called after push(null)
// because the we haven't pushed any data
r.on('readable', common.mustCall(() => {
assert.strictEqual(r.read(), null);
assert.strictEqual(pushedNull, true);
}));
r.on('end', common.mustCall());
process.nextTick(() => {
assert.strictEqual(r.read(), null);
pushedNull = true;
r.push(null);
});
29 changes: 29 additions & 0 deletions test/pseudo-tty/test-readable-tty-keepalive.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
'use strict';

const common = require('../common');
const assert = require('assert');

// This test ensures that Node.js will not ignore tty 'readable' subscribers
// when it's the only tty subscriber and the only thing keeping event loop alive
// https://github.com/nodejs/node/issues/20503

process.stdin.setEncoding('utf8');

const expectedInput = ['foo', 'bar', null];

process.stdin.on('readable', common.mustCall(function() {
const data = process.stdin.read();
assert.strictEqual(data, expectedInput.shift());
}, 3)); // first 2 data, then end

process.stdin.on('end', common.mustCall());

setTimeout(() => {
process.stdin.push('foo');
process.nextTick(() => {
process.stdin.push('bar');
process.nextTick(() => {
process.stdin.push(null);
});
});
}, 1);
Empty file.

0 comments on commit fe47b8b

Please sign in to comment.