Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: throw on premature close in Readable[AsyncIterator] #39117

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ const {

const {
ERR_INVALID_ARG_TYPE,
ERR_STREAM_PUSH_AFTER_EOF,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_STREAM_UNSHIFT_AFTER_END_EVENT
ERR_STREAM_PREMATURE_CLOSE,
ERR_STREAM_PUSH_AFTER_EOF,
ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
} = require('internal/errors').codes;
const { validateObject } = require('internal/validators');

Expand Down Expand Up @@ -1138,7 +1139,7 @@ async function* createAsyncIterator(stream, options) {
} else if (endEmitted) {
break;
} else if (closeEmitted) {
break;
throw new ERR_STREAM_PREMATURE_CLOSE();
} else {
await new Promise(next);
}
Expand All @@ -1152,7 +1153,6 @@ async function* createAsyncIterator(stream, options) {
} finally {
if (!errorThrown && opts.destroyOnReturn) {
if (state.autoDestroy || !endEmitted) {
// TODO(ronag): ERR_PREMATURE_CLOSE?
destroyImpl.destroyer(stream, null);
}
}
Expand Down
75 changes: 60 additions & 15 deletions test/parallel/test-stream-readable-async-iterators.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const {
} = require('stream');
const assert = require('assert');
const http = require('http');
const fs = require('fs');

async function tests() {
{
Expand Down Expand Up @@ -338,11 +339,17 @@ async function tests() {
process.nextTick(async () => {
readable.on('close', common.mustNotCall());
let received = 0;
for await (const k of readable) {
// Just make linting pass. This should never run.
assert.strictEqual(k, 'hello');
received++;
let err = null;
try {
for await (const k of readable) {
// Just make linting pass. This should never run.
assert.strictEqual(k, 'hello');
received++;
}
} catch (_err) {
err = _err;
}
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
assert.strictEqual(received, 0);
});
}
Expand Down Expand Up @@ -412,8 +419,13 @@ async function tests() {

readable.destroy();

const { done } = await readable[Symbol.asyncIterator]().next();
assert.strictEqual(done, true);
const it = await readable[Symbol.asyncIterator]();
const next = it.next();
next
.then(common.mustNotCall())
.catch(common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}));
}

{
Expand Down Expand Up @@ -458,7 +470,7 @@ async function tests() {
}

{
console.log('destroy mid-stream does not error');
console.log('destroy mid-stream errors');
const r = new Readable({
objectMode: true,
read() {
Expand All @@ -467,10 +479,16 @@ async function tests() {
}
});

// eslint-disable-next-line no-unused-vars
for await (const a of r) {
r.destroy(null);
let err = null;
try {
// eslint-disable-next-line no-unused-vars
for await (const a of r) {
r.destroy(null);
}
} catch (_err) {
err = _err;
}
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}

{
Expand Down Expand Up @@ -514,7 +532,7 @@ async function tests() {
}

{
console.log('all next promises must be resolved on destroy');
console.log('all next promises must be rejected on destroy');
const r = new Readable({
objectMode: true,
read() {
Expand All @@ -525,7 +543,11 @@ async function tests() {
const c = b.next();
const d = b.next();
r.destroy();
assert.deepStrictEqual(await c, { done: true, value: undefined });
c
.then(common.mustNotCall())
.catch(common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}));
assert.deepStrictEqual(await d, { done: true, value: undefined });
}

Expand Down Expand Up @@ -675,7 +697,7 @@ async function tests() {
}

{
// AsyncIterator should finish correctly if destroyed.
// AsyncIterator should not finish correctly if destroyed.

const r = new Readable({
objectMode: true,
Expand All @@ -688,11 +710,34 @@ async function tests() {
const it = r[Symbol.asyncIterator]();
const next = it.next();
next
.then(common.mustCall(({ done }) => assert.strictEqual(done, true)))
.catch(common.mustNotCall());
.then(common.mustNotCall())
.catch(common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}));
});
}

{
// AsyncIterator should throw if prematurely closed
// before end has been emitted.
(async function() {
const readable = fs.createReadStream(__filename);

try {
// eslint-disable-next-line no-unused-vars
for await (const chunk of readable) {
readable.close();
}

assert.fail('should have thrown');
} catch (err) {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}

assert.ok(readable.destroyed);
})().then(common.mustCall());
}

// AsyncIterator non-destroying iterator
{
function createReadable() {
Expand Down