Skip to content

Commit

Permalink
stream: fix finished w/ 'close' before 'end'
Browse files Browse the repository at this point in the history
Emitting 'close' before 'end' on a Readable should
result in a premature close error.
  • Loading branch information
ronag committed Feb 28, 2020
1 parent 3d894d0 commit f1cf385
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 13 deletions.
9 changes: 8 additions & 1 deletion lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ function isWritableFinished(stream) {

function nop() {}

function isReadableEnded(stream) {
if (stream.readableEnded) return true;
const rState = stream._readableState;
if (!rState || rState.errored) return false;
return rState.endEmitted || (rState.ended && rState.length === 0);
}

function eos(stream, opts, callback) {
if (arguments.length === 2) {
callback = opts;
Expand Down Expand Up @@ -84,7 +91,7 @@ function eos(stream, opts, callback) {
const onclose = () => {
let err;
if (readable && !readableEnded) {
if (!rState || !rState.ended)
if (!isReadableEnded(stream))
err = new ERR_STREAM_PREMATURE_CLOSE();
return callback.call(stream, err);
}
Expand Down
38 changes: 26 additions & 12 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,37 @@ let PassThrough;
let createReadableStreamAsyncIterator;

function destroyer(stream, reading, writing, callback) {
callback = once(callback);
let destroyed = false;
const _destroy = once((err) => {
destroyImpl.destroyer(stream, err);
callback(err);
});

if (eos === undefined) eos = require('internal/streams/end-of-stream');
eos(stream, { readable: reading, writable: writing }, (err) => {
if (destroyed) return;
destroyed = true;
destroyImpl.destroyer(stream, err);
callback(err);
const rState = stream._readableState;
if (
err &&
err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
reading &&
(rState && rState.ended && !rState.errored && !rState.errorEmitted)
) {
// Some readable streams will emit 'close' before 'end'. However, since
// this is on the readable side 'end' should still be emitted if the
// stream has been ended and no error emitted. This should be allowed in
// favor of backwards compatibility. Since the stream is piped to a
// destination this should not result in any observable difference.
// We don't need to check if this is a writable premature close since
// eos will only fail with premature close on the reading side for
// duplex streams.
stream
.once('end', _destroy)
.once('error', _destroy);
} else {
_destroy(err);
}
});

return (err) => {
if (destroyed) return;
destroyed = true;
destroyImpl.destroyer(stream, err);
callback(err || new ERR_STREAM_DESTROYED('pipe'));
};
return (err) => _destroy(err || new ERR_STREAM_DESTROYED('pipe'));
}

function popCallback(streams) {
Expand Down
10 changes: 10 additions & 0 deletions test/parallel/test-stream-finished.js
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,13 @@ testClosed((opts) => new Writable({ write() {}, ...opts }));
d._writableState.errored = true;
d.emit('close');
}

{
const r = new Readable();
finished(r, common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}));
r.push('asd');
r.push(null);
r.destroy();
}
18 changes: 18 additions & 0 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -922,3 +922,21 @@ const { promisify } = require('util');
}));
src.end();
}

{
// Make sure 'close' before 'end' finishes without error
// if readable has received eof.
// Ref: https://github.com/nodejs/node/issues/29699
const r = new Readable();
const w = new Writable({
write(chunk, encoding, cb) {
cb();
}
});
pipeline(r, w, (err) => {
assert.strictEqual(err, undefined);
});
r.push('asd');
r.push(null);
r.emit('close');
}

0 comments on commit f1cf385

Please sign in to comment.