Skip to content

Commit

Permalink
stream: defer readable and flow when sync
Browse files Browse the repository at this point in the history
PR-URL: nodejs#18515
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: James M Snell <[email protected]>
  • Loading branch information
mafintosh authored and MayaLekova committed May 8, 2018
1 parent 360abc8 commit e0c9d43
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 5 deletions.
17 changes: 12 additions & 5 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -486,11 +486,18 @@ function onEofChunk(stream, state) {
}
state.ended = true;

// emit 'readable' now to make sure it gets picked up.
state.needReadable = false;
if (!state.emittedReadable) {
state.emittedReadable = true;
emitReadable_(stream);
if (state.sync && state.length) {
// if we are sync and have data in the buffer, wait until next tick
// to emit the data. otherwise we risk emitting data in the flow()
// the readable code triggers during a read() call
emitReadable(stream);
} else {
// emit 'readable' now to make sure it gets picked up.
state.needReadable = false;
if (!state.emittedReadable) {
state.emittedReadable = true;
emitReadable_(stream);
}
}
}

Expand Down
67 changes: 67 additions & 0 deletions test/parallel/test-stream-pipe-flow.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
'use strict';
const common = require('../common');
const { Readable, Writable, PassThrough } = require('stream');

{
let ticks = 17;

const rs = new Readable({
objectMode: true,
read: () => {
if (ticks-- > 0)
return process.nextTick(() => rs.push({}));
rs.push({});
rs.push(null);
}
});

const ws = new Writable({
highWaterMark: 0,
objectMode: true,
write: (data, end, cb) => setImmediate(cb)
});

rs.on('end', common.mustCall());
ws.on('finish', common.mustCall());
rs.pipe(ws);
}

{
let missing = 8;

const rs = new Readable({
objectMode: true,
read: () => {
if (missing--) rs.push({});
else rs.push(null);
}
});

const pt = rs
.pipe(new PassThrough({ objectMode: true, highWaterMark: 2 }))
.pipe(new PassThrough({ objectMode: true, highWaterMark: 2 }));

pt.on('end', function() {
wrapper.push(null);
});

const wrapper = new Readable({
objectMode: true,
read: () => {
process.nextTick(function() {
let data = pt.read();
if (data === null) {
pt.once('readable', function() {
data = pt.read();
if (data !== null) wrapper.push(data);
});
} else {
wrapper.push(data);
}
});
}
});

wrapper.resume();
wrapper.on('end', common.mustCall());
}
40 changes: 40 additions & 0 deletions test/parallel/test-stream-readable-pause-and-resume.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict';

const { Readable } = require('stream');
const common = require('../common');

let ticks = 18;
let expectedData = 19;

const rs = new Readable({
objectMode: true,
read: () => {
if (ticks-- > 0)
return process.nextTick(() => rs.push({}));
rs.push({});
rs.push(null);
}
});

rs.on('end', common.mustCall());
readAndPause();

function readAndPause() {
// Does a on(data) -> pause -> wait -> resume -> on(data) ... loop.
// Expects on(data) to never fire if the stream is paused.
const ondata = common.mustCall((data) => {
rs.pause();

expectedData--;
if (expectedData <= 0)
return;

setImmediate(function() {
rs.removeListener('data', ondata);
readAndPause();
rs.resume();
});
}, 1); // only call ondata once

rs.on('data', ondata);
}

0 comments on commit e0c9d43

Please sign in to comment.