Skip to content

Commit

Permalink
stream: finished should error on errored stream
Browse files Browse the repository at this point in the history
Calling finished before or after a stream has errored or closed
should end up with the same behavior.

PR-URL: #39235
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Rich Trott <[email protected]>
  • Loading branch information
ronag authored and Trott committed Jul 5, 2021
1 parent 68548fd commit 0738a2b
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 26 deletions.
3 changes: 3 additions & 0 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,9 @@ function onSocketNT(req, socket, err) {
socket.emit('free');
} else {
finished(socket.destroy(err || req[kError]), (er) => {
if (er?.code === 'ERR_STREAM_PREMATURE_CLOSE') {
er = null;
}
_destroy(req, er || err);
});
return;
Expand Down
3 changes: 3 additions & 0 deletions lib/_http_incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) {
if (this.socket && !this.socket.destroyed && this.aborted) {
this.socket.destroy(err);
const cleanup = finished(this.socket, (e) => {
if (e?.code === 'ERR_STREAM_PREMATURE_CLOSE') {
e = null;
}
cleanup();
process.nextTick(onError, this, e || err, cb);
});
Expand Down
61 changes: 35 additions & 26 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ function eos(stream, options, callback) {
isWritable(stream) === writable
);

let writableFinished = stream.writableFinished ||
(wState && wState.finished);
let writableFinished = stream.writableFinished || wState?.finished;
const onfinish = () => {
writableFinished = true;
// Stream should not be destroyed here. If it is that
Expand All @@ -111,8 +110,7 @@ function eos(stream, options, callback) {
if (!readable || readableEnded) callback.call(stream);
};

let readableEnded = stream.readableEnded ||
(rState && rState.endEmitted);
let readableEnded = stream.readableEnded || rState?.endEmitted;
const onend = () => {
readableEnded = true;
// Stream should not be destroyed here. If it is that
Expand All @@ -128,7 +126,17 @@ function eos(stream, options, callback) {
callback.call(stream, err);
};

let closed = wState?.closed || rState?.closed;

const onclose = () => {
closed = true;

const errored = wState?.errored || rState?.errored;

if (errored && typeof errored !== 'boolean') {
return callback.call(stream, errored);
}

if (readable && !readableEnded) {
if (!isReadableEnded(stream))
return callback.call(stream,
Expand All @@ -139,6 +147,7 @@ function eos(stream, options, callback) {
return callback.call(stream,
new ERR_STREAM_PREMATURE_CLOSE());
}

callback.call(stream);
};

Expand Down Expand Up @@ -168,29 +177,29 @@ function eos(stream, options, callback) {
if (options.error !== false) stream.on('error', onerror);
stream.on('close', onclose);

// _closed is for OutgoingMessage which is not a proper Writable.
const closed = (!wState && !rState && stream._closed === true) || (
(wState && wState.closed) ||
(rState && rState.closed) ||
(wState && wState.errorEmitted) ||
(rState && rState.errorEmitted) ||
(rState && stream.req && stream.aborted) ||
(
(!writable || (wState && wState.finished)) &&
(!readable || (rState && rState.endEmitted))
)
);

if (closed) {
// TODO(ronag): Re-throw error if errorEmitted?
// TODO(ronag): Throw premature close as if finished was called?
// before being closed? i.e. if closed but not errored, ended or finished.
// TODO(ronag): Throw some kind of error? Does it make sense
// to call finished() on a "finished" stream?
// TODO(ronag): willEmitClose?
process.nextTick(() => {
callback();
});
process.nextTick(onclose);
} else if (wState?.errorEmitted || rState?.errorEmitted) {
if (!willEmitClose) {
process.nextTick(onclose);
}
} else if (
!readable &&
(!willEmitClose || stream.readable) &&
writableFinished
) {
process.nextTick(onclose);
} else if (
!writable &&
(!willEmitClose || stream.writable) &&
readableEnded
) {
process.nextTick(onclose);
} else if (!wState && !rState && stream._closed === true) {
// _closed is for OutgoingMessage which is not a proper Writable.
process.nextTick(onclose);
} else if ((rState && stream.req && stream.aborted)) {
process.nextTick(onclose);
}

const cleanup = () => {
Expand Down
23 changes: 23 additions & 0 deletions test/parallel/test-stream-finished.js
Original file line number Diff line number Diff line change
Expand Up @@ -608,3 +608,26 @@ testClosed((opts) => new Writable({ write() {}, ...opts }));
assert.strictEqual(closed, true);
}));
}

{
const w = new Writable();
const _err = new Error();
w.destroy(_err);
finished(w, common.mustCall((err) => {
assert.strictEqual(_err, err);
finished(w, common.mustCall((err) => {
assert.strictEqual(_err, err);
}));
}));
}

{
const w = new Writable();
w.destroy();
finished(w, common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
finished(w, common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}));
}));
}

0 comments on commit 0738a2b

Please sign in to comment.