Skip to content

Commit

Permalink
stream: async iteration should work with destroyed stream
Browse files Browse the repository at this point in the history
Fixes #23730.

PR-URL: #23785
Reviewed-By: Gus Caplan <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Matheus Marchini <[email protected]>
  • Loading branch information
mcollina authored and MylesBorins committed Nov 26, 2018
1 parent 9844188 commit dbd37ab
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 27 deletions.
69 changes: 43 additions & 26 deletions lib/internal/streams/async_iterator.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict';

const finished = require('internal/streams/end-of-stream');

const kLastResolve = Symbol('lastResolve');
const kLastReject = Symbol('lastReject');
const kError = Symbol('error');
Expand Down Expand Up @@ -34,30 +36,6 @@ function onReadable(iter) {
process.nextTick(readAndResolve, iter);
}

function onEnd(iter) {
const resolve = iter[kLastResolve];
if (resolve !== null) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
resolve(createIterResult(null, true));
}
iter[kEnded] = true;
}

function onError(iter, err) {
const reject = iter[kLastReject];
// reject if we are waiting for data in the Promise
// returned by next() and store the error
if (reject !== null) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
reject(err);
}
iter[kError] = err;
}

function wrapForNext(lastPromise, iter) {
return function(resolve, reject) {
lastPromise.then(function() {
Expand Down Expand Up @@ -86,6 +64,22 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
return Promise.resolve(createIterResult(null, true));
}

if (this[kStream].destroyed) {
// We need to defer via nextTick because if .destroy(err) is
// called, the error will be emitted via nextTick, and
// we cannot guarantee that there is no error lingering around
// waiting to be emitted.
return new Promise((resolve, reject) => {
process.nextTick(() => {
if (this[kError]) {
reject(this[kError]);
} else {
resolve(createIterResult(null, true));
}
});
});
}

// if we have multiple next() calls
// we will wait for the previous Promise to finish
// this logic is optimized to support for await loops,
Expand Down Expand Up @@ -155,9 +149,32 @@ const createReadableStreamAsyncIterator = (stream) => {
},
});

finished(stream, (err) => {
if (err) {
const reject = iterator[kLastReject];
// reject if we are waiting for data in the Promise
// returned by next() and store the error
if (reject !== null) {
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
reject(err);
}
iterator[kError] = err;
return;
}

const resolve = iterator[kLastResolve];
if (resolve !== null) {
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
resolve(createIterResult(null, true));
}
iterator[kEnded] = true;
});

stream.on('readable', onReadable.bind(null, iterator));
stream.on('end', onEnd.bind(null, iterator));
stream.on('error', onError.bind(null, iterator));

return iterator;
};
Expand Down
40 changes: 39 additions & 1 deletion test/parallel/test-stream-readable-async-iterators.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';

const common = require('../common');
const { Readable } = require('stream');
const { Readable, PassThrough, pipeline } = require('stream');
const assert = require('assert');

async function tests() {
Expand Down Expand Up @@ -324,6 +324,44 @@ async function tests() {

assert.strictEqual(data, expected);
})();

await (async function() {
console.log('.next() on destroyed stream');
const readable = new Readable({
read() {
// no-op
}
});

readable.destroy();

try {
await readable[Symbol.asyncIterator]().next();
} catch (e) {
assert.strictEqual(e.code, 'ERR_STREAM_PREMATURE_CLOSE');
}
})();

await (async function() {
console.log('.next() on pipelined stream');
const readable = new Readable({
read() {
// no-op
}
});

const passthrough = new PassThrough();
const err = new Error('kaboom');
pipeline(readable, passthrough, common.mustCall((e) => {
assert.strictEqual(e, err);
}));
readable.destroy(err);
try {
await readable[Symbol.asyncIterator]().next();
} catch (e) {
assert.strictEqual(e, err);
}
})();
}

// to avoid missing some tests if a promise does not resolve
Expand Down

0 comments on commit dbd37ab

Please sign in to comment.