Skip to content

Commit

Permalink
revert eos change
Browse files Browse the repository at this point in the history
  • Loading branch information
RaisinTen committed Jul 4, 2021
1 parent e178f04 commit 93ee36e
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 78 deletions.
3 changes: 0 additions & 3 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -806,9 +806,6 @@ function onSocketNT(req, socket, err) {
socket.emit('free');
} else {
finished(socket.destroy(err || req[kError]), (er) => {
if (er?.code === 'ERR_STREAM_PREMATURE_CLOSE') {
er = null;
}
_destroy(req, er || err);
});
return;
Expand Down
3 changes: 0 additions & 3 deletions lib/_http_incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,6 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) {
if (this.socket && !this.socket.destroyed && this.aborted) {
this.socket.destroy(err);
const cleanup = finished(this.socket, (e) => {
if (e?.code === 'ERR_STREAM_PREMATURE_CLOSE') {
e = null;
}
cleanup();
process.nextTick(onError, this, e || err, cb);
});
Expand Down
61 changes: 26 additions & 35 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ function eos(stream, options, callback) {
isWritable(stream) === writable
);

let writableFinished = stream.writableFinished || wState?.finished;
let writableFinished = stream.writableFinished ||
(wState && wState.finished);
const onfinish = () => {
writableFinished = true;
// Stream should not be destroyed here. If it is that
Expand All @@ -110,7 +111,8 @@ function eos(stream, options, callback) {
if (!readable || readableEnded) callback.call(stream);
};

let readableEnded = stream.readableEnded || rState?.endEmitted;
let readableEnded = stream.readableEnded ||
(rState && rState.endEmitted);
const onend = () => {
readableEnded = true;
// Stream should not be destroyed here. If it is that
Expand All @@ -126,17 +128,7 @@ function eos(stream, options, callback) {
callback.call(stream, err);
};

let closed = wState?.closed || rState?.closed;

const onclose = () => {
closed = true;

const errored = wState?.errored || rState?.errored;

if (errored && typeof errored !== 'boolean') {
return callback.call(stream, errored);
}

if (readable && !readableEnded) {
if (!isReadableEnded(stream))
return callback.call(stream,
Expand All @@ -147,7 +139,6 @@ function eos(stream, options, callback) {
return callback.call(stream,
new ERR_STREAM_PREMATURE_CLOSE());
}

callback.call(stream);
};

Expand Down Expand Up @@ -177,29 +168,29 @@ function eos(stream, options, callback) {
if (options.error !== false) stream.on('error', onerror);
stream.on('close', onclose);

// _closed is for OutgoingMessage which is not a proper Writable.
const closed = (!wState && !rState && stream._closed === true) || (
(wState && wState.closed) ||
(rState && rState.closed) ||
(wState && wState.errorEmitted) ||
(rState && rState.errorEmitted) ||
(rState && stream.req && stream.aborted) ||
(
(!writable || (wState && wState.finished)) &&
(!readable || (rState && rState.endEmitted))
)
);

if (closed) {
process.nextTick(onclose);
} else if (wState?.errorEmitted || rState?.errorEmitted) {
if (!willEmitClose) {
process.nextTick(onclose);
}
} else if (
!readable &&
(!willEmitClose || stream.readable) &&
writableFinished
) {
process.nextTick(onclose);
} else if (
!writable &&
(!willEmitClose || stream.writable) &&
readableEnded
) {
process.nextTick(onclose);
} else if (!wState && !rState && stream._closed === true) {
// _closed is for OutgoingMessage which is not a proper Writable.
process.nextTick(onclose);
} else if ((rState && stream.req && stream.aborted)) {
process.nextTick(onclose);
// TODO(ronag): Re-throw error if errorEmitted?
// TODO(ronag): Throw premature close as if finished was called?
// before being closed? i.e. if closed but not errored, ended or finished.
// TODO(ronag): Throw some kind of error? Does it make sense
// to call finished() on a "finished" stream?
// TODO(ronag): willEmitClose?
process.nextTick(() => {
callback();
});
}

const cleanup = () => {
Expand Down
34 changes: 20 additions & 14 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,12 @@ const {
getDefaultHighWaterMark
} = require('internal/streams/state');

const eos = require('internal/streams/end-of-stream');

const {
ERR_INVALID_ARG_TYPE,
ERR_STREAM_PUSH_AFTER_EOF,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_STREAM_UNSHIFT_AFTER_END_EVENT
ERR_STREAM_PREMATURE_CLOSE,
ERR_STREAM_PUSH_AFTER_EOF,
ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
} = require('internal/errors').codes;
const { validateObject } = require('internal/validators');

Expand Down Expand Up @@ -1111,18 +1110,23 @@ async function* createAsyncIterator(stream, options) {
let error = state.errored;
let errorEmitted = state.errorEmitted;
let endEmitted = state.endEmitted;
let closeEmitted = state.closeEmitted;

stream.on('readable', next);

eos(stream, (err) => {
if (err) {
errorEmitted = true;
stream
.on('readable', next)
.on('error', function(err) {
error = err;
}

endEmitted = true;
next.call(stream);
});
errorEmitted = true;
next.call(this);
})
.on('end', function() {
endEmitted = true;
next.call(this);
})
.on('close', function() {
closeEmitted = true;
next.call(this);
});

let errorThrown = false;
try {
Expand All @@ -1134,6 +1138,8 @@ async function* createAsyncIterator(stream, options) {
throw error;
} else if (endEmitted) {
break;
} else if (closeEmitted) {
throw new ERR_STREAM_PREMATURE_CLOSE();
} else {
await new Promise(next);
}
Expand Down
23 changes: 0 additions & 23 deletions test/parallel/test-stream-finished.js
Original file line number Diff line number Diff line change
Expand Up @@ -608,26 +608,3 @@ testClosed((opts) => new Writable({ write() {}, ...opts }));
assert.strictEqual(closed, true);
}));
}

{
const w = new Writable();
const _err = new Error();
w.destroy(_err);
finished(w, common.mustCall((err) => {
assert.strictEqual(_err, err);
finished(w, common.mustCall((err) => {
assert.strictEqual(_err, err);
}));
}));
}

{
const w = new Writable();
w.destroy();
finished(w, common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
finished(w, common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}));
}));
}

0 comments on commit 93ee36e

Please sign in to comment.