stream: always defer 'readable' with nextTick #17979

Closed · 3 commits

Changes from all commits:
doc/api/stream.md: 14 additions & 0 deletions

@@ -747,6 +747,11 @@ The listener callback will be passed a single `Error` object.
 ##### Event: 'readable'
 <!-- YAML
 added: v0.9.4
+changes:
+  - version: REPLACEME
+    pr-url: https://github.com/nodejs/node/pull/17979
+    description: 'readable' is always emitted in the next tick after
+                 .push() is called
 -->

 The `'readable'` event is emitted when there is data available to be read from
@@ -1647,6 +1652,13 @@ const myReadable = new Readable({
 ```

 #### readable.\_read(size)
+<!-- YAML
+added: v0.9.4
+changes:
+  - version: REPLACEME
+    pr-url: https://github.com/nodejs/node/pull/17979
+    description: call _read() only once per microtick
+-->

 * `size` {number} Number of bytes to read asynchronously

@@ -1666,6 +1678,8 @@ additional data onto the queue.

 *Note*: Once the `readable._read()` method has been called, it will not be
 called again until the [`readable.push()`][stream-push] method is called.
+`readable._read()` is guaranteed to be called only once within a
+synchronous execution, i.e. a microtick.

 The `size` argument is advisory. For implementations where a "read" is a
 single operation that returns data can use the `size` argument to determine how
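Taken together, the two doc additions describe observable timing. A minimal sketch (illustration only, not part of this diff) of that timing on a patched Node: synchronous `push()` calls are batched into a single `'readable'`, emitted on the next tick:

```js
const { Readable } = require('stream');

const r = new Readable({ read() {} }); // no-op _read(); data is pushed below

r.on('readable', () => {
  let chunk;
  while ((chunk = r.read()) !== null)
    console.log('got:', chunk.toString());
});

// Both pushes happen in the same synchronous execution, so they are
// batched into a single 'readable', emitted on the next tick.
r.push('foo');
r.push('bar');
r.push(null); // end the stream
console.log('pushed'); // logs before the 'readable' handler runs
```

Running this prints `pushed` first, then `got: foobar` — the handler never runs inside `push()`.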
lib/_stream_readable.js: 14 additions & 8 deletions

@@ -267,7 +267,6 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
 function addChunk(stream, state, chunk, addToFront) {
   if (state.flowing && state.length === 0 && !state.sync) {
     stream.emit('data', chunk);
-    stream.read(0);
   } else {
     // update the buffer info.
     state.length += state.objectMode ? 1 : chunk.length;
@@ -496,7 +495,11 @@ function onEofChunk(stream, state) {
   state.ended = true;

   // emit 'readable' now to make sure it gets picked up.
-  emitReadable(stream);
+  state.needReadable = false;
+  if (!state.emittedReadable) {
+    state.emittedReadable = true;
+    emitReadable_(stream);
+  }
 }

 // Don't emit readable right away in sync mode, because this can trigger
@@ -508,16 +511,15 @@ function emitReadable(stream) {
   if (!state.emittedReadable) {
     debug('emitReadable', state.flowing);
     state.emittedReadable = true;
-    if (state.sync)
-      process.nextTick(emitReadable_, stream);
-    else
-      emitReadable_(stream);
+    process.nextTick(emitReadable_, stream);
   }
 }

 function emitReadable_(stream) {
   var state = stream._readableState;
   debug('emit readable');
   stream.emit('readable');
+  state.needReadable = !state.flowing && !state.ended;
   flow(stream);
 }

@@ -537,7 +539,7 @@ function maybeReadMore(stream, state) {

 function maybeReadMore_(stream, state) {
   var len = state.length;
-  while (!state.reading && !state.flowing && !state.ended &&
+  while (!state.reading && !state.ended &&
          state.length < state.highWaterMark) {
     debug('maybeReadMore read 0');
     stream.read(0);
@@ -644,6 +646,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
     debug('ondata');
     increasedAwaitDrain = false;
     var ret = dest.write(chunk);
+    debug('dest.write', ret);
     if (false === ret && !increasedAwaitDrain) {
       // If the user unpiped during `dest.write()`, it is possible
       // to get stuck in a permanently paused state if that write
@@ -824,8 +827,8 @@ function resume(stream, state) {
 }

 function resume_(stream, state) {
+  debug('resume', state.reading);
   if (!state.reading) {
-    debug('resume read 0');
     stream.read(0);
   }

@@ -1087,13 +1090,16 @@ function copyFromBuffer(n, list) {
 function endReadable(stream) {
   var state = stream._readableState;

+  debug('endReadable', state.endEmitted);
   if (!state.endEmitted) {
     state.ended = true;
     process.nextTick(endReadableNT, state, stream);
   }
 }

 function endReadableNT(state, stream) {
+  debug('endReadableNT', state.endEmitted, state.length);
+
   // Check that we didn't get one last unshift.
   if (!state.endEmitted && state.length === 0) {
     state.endEmitted = true;
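The net effect of the lib/_stream_readable.js changes above is an ordering guarantee. A sketch for illustration (not part of the PR); the "before" ordering in the comments is the pre-patch behavior that the test changes below compensate for, and it is also why several tests here move pushes from process.nextTick() to setImmediate():

```js
const { Readable } = require('stream');

const r = new Readable({ read() {} });

r.on('readable', () => {
  console.log('C: readable fired');
  r.read();
});

// Push after the first read cycle completes, so the stream is no longer
// in sync mode — the case where the old code emitted 'readable' inline.
setImmediate(() => {
  console.log('A: before push');
  r.push('hello');
  console.log('B: after push');
  // Before this patch: A, C, B ('readable' emitted synchronously by push()).
  // With this patch:   A, B, C ('readable' deferred with process.nextTick()).
});
```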
test/parallel/test-net-end-close.js: 8 additions & 4 deletions

@@ -8,18 +8,22 @@ const uv = process.binding('uv');
 const s = new net.Socket({
   handle: {
     readStart: function() {
-      process.nextTick(() => this.onread(uv.UV_EOF, null));
+      setImmediate(() => this.onread(uv.UV_EOF, null));
     },
-    close: (cb) => process.nextTick(cb)
+    close: (cb) => setImmediate(cb)
   },
   writable: false
 });
 assert.strictEqual(s, s.resume());

 const events = [];

-s.on('end', () => events.push('end'));
-s.on('close', () => events.push('close'));
+s.on('end', () => {
+  events.push('end');
+});
+s.on('close', () => {
+  events.push('close');
+});

 process.on('exit', () => {
   assert.deepStrictEqual(events, [ 'end', 'close' ]);
test/parallel/test-stream-pipe-await-drain-push-while-write.js: 12 additions & 20 deletions

@@ -3,32 +3,24 @@ const common = require('../common');
 const stream = require('stream');
 const assert = require('assert');

-const awaitDrainStates = [
-  1, // after first chunk before callback
-  1, // after second chunk before callback
-  0 // resolving chunk pushed after first chunk, awaitDrain is decreased
-];
-
 // A writable stream which pushes data onto the stream which pipes into it,
 // but only the first time it's written to. Since it's not paused at this time,
 // a second write will occur. If the pipe increases awaitDrain twice, we'll
 // never get subsequent chunks because 'drain' is only emitted once.
 const writable = new stream.Writable({
   write: common.mustCall(function(chunk, encoding, cb) {
-    if (chunk.length === 32 * 1024) { // first chunk
-      const beforePush = readable._readableState.awaitDrain;
-      readable.push(Buffer.alloc(34 * 1024)); // above hwm
-      // We should check if awaitDrain counter is increased.
-      const afterPush = readable._readableState.awaitDrain;
-      assert.strictEqual(afterPush - beforePush, 1,
-                         'Counter is not increased for awaitDrain');
-    }
-
     assert.strictEqual(
-      awaitDrainStates.shift(),
       readable._readableState.awaitDrain,
+      0,
       'State variable awaitDrain is not correct.'
     );

+    if (chunk.length === 32 * 1024) { // first chunk
+      readable.push(Buffer.alloc(34 * 1024)); // above hwm
+      // We should check that the awaitDrain counter is increased in the next
+      // tick, because awaitDrain is incremented after this method has finished
+      process.nextTick(() => {
+        assert.strictEqual(readable._readableState.awaitDrain, 1,
+                           'Counter is not increased for awaitDrain');
+      });
+    }
+
     cb();
   }, 3)
 });
test/parallel/test-stream-readable-emittedReadable.js: 10 additions & 7 deletions

@@ -10,30 +10,33 @@ const readable = new Readable({
 // Initialized to false.
 assert.strictEqual(readable._readableState.emittedReadable, false);

+const expected = [Buffer.from('foobar'), Buffer.from('quo'), null];
 readable.on('readable', common.mustCall(() => {
   // emittedReadable should be true when the readable event is emitted
   assert.strictEqual(readable._readableState.emittedReadable, true);
-  readable.read();
+  assert.deepStrictEqual(readable.read(), expected.shift());
   // emittedReadable is reset to false during read()
   assert.strictEqual(readable._readableState.emittedReadable, false);
-}, 4));
+}, 3));

 // When the first readable listener is just attached,
 // emittedReadable should be false
 assert.strictEqual(readable._readableState.emittedReadable, false);

-// Each one of these should trigger a readable event.
+// These trigger a single 'readable', as things are batched up
 process.nextTick(common.mustCall(() => {
   readable.push('foo');
 }));
 process.nextTick(common.mustCall(() => {
   readable.push('bar');
 }));
-process.nextTick(common.mustCall(() => {
+
+// These trigger two 'readable' events.
+setImmediate(common.mustCall(() => {
   readable.push('quo');
-}));
-process.nextTick(common.mustCall(() => {
-  readable.push(null);
+  process.nextTick(common.mustCall(() => {
+    readable.push(null);
+  }));
 }));

 const noRead = new Readable({
test/parallel/test-stream-readable-needReadable.js: 12 additions & 11 deletions

@@ -38,16 +38,17 @@ asyncReadable.on('readable', common.mustCall(() => {
     // then we need to notify the reader on future changes.
     assert.strictEqual(asyncReadable._readableState.needReadable, true);
   }
-}, 3));
+}, 2));

 process.nextTick(common.mustCall(() => {
   asyncReadable.push('foooo');
 }));
 process.nextTick(common.mustCall(() => {
   asyncReadable.push('bar');
 }));
-process.nextTick(common.mustCall(() => {
+setImmediate(common.mustCall(() => {
   asyncReadable.push(null);
+  assert.strictEqual(asyncReadable._readableState.needReadable, false);
 }));

 const flowing = new Readable({

@@ -84,13 +85,13 @@ slowProducer.on('readable', common.mustCall(() => {

 process.nextTick(common.mustCall(() => {
   slowProducer.push('foo');
-}));
-process.nextTick(common.mustCall(() => {
-  slowProducer.push('foo');
-}));
-process.nextTick(common.mustCall(() => {
-  slowProducer.push('foo');
-}));
-process.nextTick(common.mustCall(() => {
-  slowProducer.push(null);
+  process.nextTick(common.mustCall(() => {
+    slowProducer.push('foo');
+    process.nextTick(common.mustCall(() => {
+      slowProducer.push('foo');
+      process.nextTick(common.mustCall(() => {
+        slowProducer.push(null);
+      }));
+    }));
+  }));
 }));