Skip to content

Commit

Permalink
stream: make finished call the callback if the stream is closed
Browse files Browse the repository at this point in the history
Make stream.finished callback invoked if stream is already
closed/destroyed.

PR-URL: nodejs#28748
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Trivikram Kamat <[email protected]>
  • Loading branch information
ronag authored and mcollina committed Sep 3, 2019
1 parent d62d2b4 commit b03845b
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 30 deletions.
10 changes: 0 additions & 10 deletions lib/internal/streams/async_iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,6 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
return() {
return new Promise((resolve, reject) => {
const stream = this[kStream];

// TODO(ronag): Remove this check once finished() handles
// already ended and/or destroyed streams.
const ended = stream.destroyed || stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);
if (ended) {
resolve(createIterResult(undefined, true));
return;
}

finished(stream, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
reject(err);
Expand Down
40 changes: 23 additions & 17 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,43 +28,49 @@ function eos(stream, opts, callback) {

callback = once(callback);

const onerror = (err) => {
callback.call(stream, err);
};

let writableFinished = stream.writableFinished ||
(stream._writableState && stream._writableState.finished);
let readableEnded = stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);

if (writableFinished || readableEnded || stream.destroyed ||
stream.aborted) {
if (opts.error !== false) stream.on('error', onerror);
// A destroy(err) call emits error in nextTick.
process.nextTick(callback.bind(stream));
return () => {
stream.removeListener('error', onerror);
};
}

let readable = opts.readable || (opts.readable !== false && stream.readable);
let writable = opts.writable || (opts.writable !== false && stream.writable);

const onlegacyfinish = () => {
if (!stream.writable) onfinish();
};

var writableEnded = stream._writableState && stream._writableState.finished;
const onfinish = () => {
writable = false;
writableEnded = true;
writableFinished = true;
if (!readable) callback.call(stream);
};

var readableEnded = stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);
const onend = () => {
readable = false;
readableEnded = true;
if (!writable) callback.call(stream);
};

const onerror = (err) => {
callback.call(stream, err);
};

const onclose = () => {
let err;
if (readable && !readableEnded) {
if (!stream._readableState || !stream._readableState.ended)
err = new ERR_STREAM_PREMATURE_CLOSE();
return callback.call(stream, err);
}
if (writable && !writableEnded) {
if (!stream._writableState || !stream._writableState.ended)
err = new ERR_STREAM_PREMATURE_CLOSE();
return callback.call(stream, err);
callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
} else if (writable && !writableFinished) {
callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
}
};

Expand Down
106 changes: 106 additions & 0 deletions test/parallel/test-http-client-finished.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,109 @@ const { finished } = require('stream');
.end();
}));
}

{
// Test abort before finished.

const server = http.createServer(function(req, res) {
});

server.listen(0, common.mustCall(function() {
const req = http.request({
port: this.address().port
}, common.mustNotCall());
req.abort();
finished(req, common.mustCall(() => {
server.close();
}));
}));
}

{
// Test abort after request.

const server = http.createServer(function(req, res) {
});

server.listen(0, common.mustCall(function() {
const req = http.request({
port: this.address().port
}).end();
finished(req, (err) => {
common.expectsError({
type: Error,
code: 'ERR_STREAM_PREMATURE_CLOSE'
})(err);
finished(req, common.mustCall(() => {
server.close();
}));
});
req.abort();
}));
}

{
// Test abort before end.

const server = http.createServer(function(req, res) {
res.write('test');
});

server.listen(0, common.mustCall(function() {
const req = http.request({
port: this.address().port
}).on('response', common.mustCall((res) => {
req.abort();
finished(res, common.mustCall(() => {
finished(res, common.mustCall(() => {
server.close();
}));
}));
})).end();
}));
}

{
// Test destroy before end.

const server = http.createServer(function(req, res) {
res.write('test');
});

server.listen(0, common.mustCall(function() {
http.request({
port: this.address().port
}).on('response', common.mustCall((res) => {
// TODO(ronag): Bug? Won't emit 'close' unless read.
res.on('data', () => {});
res.destroy();
finished(res, common.mustCall(() => {
finished(res, common.mustCall(() => {
server.close();
}));
}));
})).end();
}));
}

{
// Test finish after end.

const server = http.createServer(function(req, res) {
res.end('asd');
});

server.listen(0, common.mustCall(function() {
http.request({
port: this.address().port
}).on('response', common.mustCall((res) => {
// TODO(ronag): Bug? Won't emit 'close' unless read.
res.on('data', () => {});
finished(res, common.mustCall(() => {
finished(res, common.mustCall(() => {
server.close();
}));
}));
})).end();
}));
}
140 changes: 137 additions & 3 deletions test/parallel/test-stream-finished.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,18 @@ const { promisify } = require('util');
}));
}

{
const rs = new Readable();

finished(rs, common.mustCall((err) => {
assert(err, 'premature close error');
}));

rs.push(null);
rs.emit('close');
rs.resume();
}

{
const rs = new Readable();

Expand All @@ -105,7 +117,9 @@ const { promisify } = require('util');
}));

rs.push(null);
rs.emit('close'); // Should not trigger an error
rs.on('end', common.mustCall(() => {
rs.emit('close'); // Should not trigger an error
}));
rs.resume();
}

Expand Down Expand Up @@ -155,8 +169,9 @@ const { promisify } = require('util');
rs.resume();
}

// Test that calling returned function removes listeners
{
// Nothing happens if disposed.

const ws = new Writable({
write(data, env, cb) {
cb();
Expand All @@ -168,6 +183,8 @@ const { promisify } = require('util');
}

{
// Nothing happens if disposed.

const rs = new Readable();
const removeListeners = finished(rs, common.mustNotCall());
removeListeners();
Expand All @@ -178,9 +195,126 @@ const { promisify } = require('util');
}

{
// Completed if readable-like is ended before.

const streamLike = new EE();
streamLike.readableEnded = true;
streamLike.readable = true;
finished(streamLike, common.mustCall);
finished(streamLike, common.mustCall());
}

{
// Completed if readable-like is never ended.

const streamLike = new EE();
streamLike.readableEnded = false;
streamLike.readable = true;
finished(streamLike, common.expectsError({
code: 'ERR_STREAM_PREMATURE_CLOSE'
}));
streamLike.emit('close');
}

{
// Completed if writable-like is destroyed before.

const streamLike = new EE();
streamLike.destroyed = true;
streamLike.writable = true;
finished(streamLike, common.mustCall());
}

{
// Completed if readable-like is aborted before.

const streamLike = new EE();
streamLike.destroyed = true;
streamLike.readable = true;
finished(streamLike, common.mustCall());
}

{
// Completed if writable-like is aborted before.

const streamLike = new EE();
streamLike.aborted = true;
streamLike.writable = true;
finished(streamLike, common.mustCall());
}

{
// Completed if readable-like is aborted before.

const streamLike = new EE();
streamLike.aborted = true;
streamLike.readable = true;
finished(streamLike, common.mustCall());
}

{
// Completed if streamlike is finished before.

const streamLike = new EE();
streamLike.writableFinished = true;
streamLike.writable = true;
finished(streamLike, common.mustCall());
}

{
// Premature close if stream is not finished.

const streamLike = new EE();
streamLike.writableFinished = false;
streamLike.writable = true;
finished(streamLike, common.expectsError({
code: 'ERR_STREAM_PREMATURE_CLOSE'
}));
streamLike.emit('close');
}

{
// Premature close if stream never emitted 'finish'
// even if writableFinished says something else.

const streamLike = new EE();
streamLike.writable = true;
finished(streamLike, common.expectsError({
code: 'ERR_STREAM_PREMATURE_CLOSE'
}));
streamLike.writableFinished = true;
streamLike.emit('close');
}


{
// Premature close if stream never emitted 'end'
// even if readableEnded says something else.

const streamLike = new EE();
streamLike.readable = true;
finished(streamLike, common.expectsError({
code: 'ERR_STREAM_PREMATURE_CLOSE'
}));
streamLike.readableEnded = true;
streamLike.emit('close');
}

{
// Completes if already finished.

const w = new Writable();
finished(w, common.mustCall(() => {
finished(w, common.mustCall());
}));
w.destroy();
}

{
// Completes if already ended.

const r = new Readable();
finished(r, common.mustCall(() => {
finished(r, common.mustCall());
}));
r.destroy();
}

0 comments on commit b03845b

Please sign in to comment.