Skip to content

Commit 094e571

Browse files
stream: handle generator destruction from Duplex.from()
1 parent 3c5ceff commit 094e571

File tree

2 files changed

+197
-14
lines changed

2 files changed

+197
-14
lines changed

lib/internal/streams/duplexify.js

+67-14
Original file line numberDiff line numberDiff line change
@@ -83,15 +83,19 @@ module.exports = function duplexify(body, name) {
8383
}
8484

8585
if (typeof body === 'function') {
86-
const { value, write, final, destroy } = fromAsyncGen(body);
86+
let d;
87+
88+
const { value, write, final, destroy } = fromAsyncGen(body, (err) => {
89+
if (d) destroyer(d, err);
90+
});
8791

8892
// Body might be a constructor function instead of an async generator function.
8993
if (isDuplexNodeStream(value)) {
90-
return value;
94+
return d = value;
9195
}
9296

9397
if (isIterable(value)) {
94-
return from(Duplexify, value, {
98+
return d = from(Duplexify, value, {
9599
// TODO (ronag): highWaterMark?
96100
objectMode: true,
97101
write,
@@ -102,12 +106,11 @@ module.exports = function duplexify(body, name) {
102106

103107
const then = value?.then;
104108
if (typeof then === 'function') {
105-
let d;
106-
107109
const promise = FunctionPrototypeCall(
108110
then,
109111
value,
110112
(val) => {
113+
destroyer(d, null);
111114
if (val != null) {
112115
throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val);
113116
}
@@ -208,11 +211,14 @@ module.exports = function duplexify(body, name) {
208211
body);
209212
};
210213

211-
function fromAsyncGen(fn) {
214+
function fromAsyncGen(fn, onAbort) {
212215
let { promise, resolve } = createDeferredPromise();
213216
const ac = new AbortController();
214217
const signal = ac.signal;
215-
const value = fn(async function*() {
218+
let ended = false;
219+
let error = null;
220+
221+
const asyncGenerator = (async function* () {
216222
while (true) {
217223
const _promise = promise;
218224
promise = null;
@@ -224,21 +230,68 @@ function fromAsyncGen(fn) {
224230
({ promise, resolve } = createDeferredPromise());
225231
yield chunk;
226232
}
227-
}(), { signal });
233+
})();
234+
235+
// Not using try/finally because asyncGenerator.return() should work even
236+
// if the generator was never started.
237+
238+
const originalReturn = asyncGenerator.return;
239+
asyncGenerator.return = async function(value) {
240+
// eslint-disable-next-line node-core/avoid-prototype-pollution
241+
if (ended) return { value, done: true };
242+
243+
const _promise = promise;
244+
promise = null;
245+
const { cb } = await _promise;
246+
process.nextTick(cb);
247+
248+
ended = true;
249+
250+
onAbort(null);
251+
return originalReturn.call(asyncGenerator, value);
252+
};
253+
254+
const originalThrow = asyncGenerator.throw;
255+
asyncGenerator.throw = async function(err) {
256+
if (ended) throw err;
257+
258+
const _promise = promise;
259+
promise = null;
260+
const { cb } = await _promise;
261+
process.nextTick(cb);
262+
263+
ended = true;
264+
error = err;
265+
onAbort(err);
266+
267+
return originalThrow.call(asyncGenerator, err);
268+
};
269+
270+
271+
const value = fn(asyncGenerator, { signal });
228272

229273
return {
230274
value,
231275
write(chunk, encoding, cb) {
232-
const _resolve = resolve;
233-
resolve = null;
234-
_resolve({ chunk, done: false, cb });
276+
if (ended) {
277+
cb(error);
278+
} else {
279+
const _resolve = resolve;
280+
resolve = null;
281+
_resolve({ chunk, done: false, cb });
282+
}
235283
},
236284
final(cb) {
237-
const _resolve = resolve;
238-
resolve = null;
239-
_resolve({ done: true, cb });
285+
if (ended) {
286+
cb(error);
287+
} else {
288+
const _resolve = resolve;
289+
resolve = null;
290+
_resolve({ done: true, cb });
291+
}
240292
},
241293
destroy(err, cb) {
294+
ended = true;
242295
ac.abort();
243296
cb(err);
244297
},

test/parallel/test-stream-duplex-from.js

+130
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const assert = require('assert');
55
const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream');
66
const { ReadableStream, WritableStream } = require('stream/web');
77
const { Blob } = require('buffer');
8+
const sleep = require('util').promisify(setTimeout);
89

910
{
1011
const d = Duplex.from({
@@ -401,3 +402,132 @@ function makeATestWritableStream(writeFunc) {
401402
assert.strictEqual(d.writable, false);
402403
}));
403404
}
405+
406+
{
407+
const r = Readable.from(['foo', 'bar', 'bar']);
408+
pipeline(
409+
r,
410+
Duplex.from(async function(asyncGenerator) {
411+
// eslint-disable-next-line no-unused-vars
412+
for await (const _ of asyncGenerator);
413+
}),
414+
common.mustSucceed(() => {
415+
assert.strictEqual(r.destroyed, true);
416+
})
417+
);
418+
}
419+
420+
{
421+
const r = Readable.from(['foo', 'bar', 'bar']);
422+
pipeline(
423+
r,
424+
Duplex.from(async function(asyncGenerator) {
425+
// eslint-disable-next-line no-unused-vars
426+
for await (const _ of asyncGenerator) break;
427+
}),
428+
common.mustSucceed(() => {
429+
assert.strictEqual(r.destroyed, true);
430+
})
431+
);
432+
}
433+
434+
{
435+
const r = Readable.from(['foo', 'bar', 'baz']);
436+
pipeline(
437+
r,
438+
Duplex.from(async function(asyncGenerator) {
439+
const a = await asyncGenerator.next();
440+
assert.strictEqual(a.done, false);
441+
assert.strictEqual(a.value.toString(), 'foo');
442+
const b = await asyncGenerator.return();
443+
assert.strictEqual(b.done, true);
444+
}),
445+
common.mustSucceed(() => {
446+
assert.strictEqual(r.destroyed, true);
447+
})
448+
);
449+
}
450+
451+
{
452+
const r = Readable.from(['foo', 'bar', 'baz']);
453+
pipeline(
454+
r,
455+
Duplex.from(async function(asyncGenerator) {
456+
// Note: the generator is not even started at this point
457+
await asyncGenerator.return();
458+
}),
459+
common.mustSucceed(() => {
460+
assert.strictEqual(r.destroyed, true);
461+
})
462+
);
463+
}
464+
465+
{
466+
const r = Readable.from(['foo', 'bar', 'baz']);
467+
pipeline(
468+
r,
469+
Duplex.from(async function(asyncGenerator) {
470+
// Same as before, with a delay
471+
await sleep(100);
472+
await asyncGenerator.return();
473+
}),
474+
common.mustSucceed(() => {
475+
assert.strictEqual(r.destroyed, true);
476+
})
477+
);
478+
}
479+
480+
{
481+
const r = Readable.from(['foo', 'bar', 'baz']);
482+
pipeline(
483+
r,
484+
Duplex.from(async function(asyncGenerator) {}),
485+
common.mustCall((err) => {
486+
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
487+
assert.strictEqual(r.destroyed, true);
488+
})
489+
);
490+
}
491+
492+
{
493+
const r = Readable.from(['foo', 'bar', 'baz']);
494+
pipeline(
495+
r,
496+
Duplex.from(function(asyncGenerator) {
497+
return sleep(100);
498+
}),
499+
common.mustCall((err) => {
500+
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
501+
assert.strictEqual(r.destroyed, true);
502+
})
503+
);
504+
}
505+
506+
{
507+
const r = Readable.from(['foo']);
508+
pipeline(
509+
r,
510+
Duplex.from(async function(asyncGenerator) {
511+
await asyncGenerator.throw(new Error('my error'));
512+
}),
513+
common.mustCall((err) => {
514+
assert.strictEqual(err.message, 'my error');
515+
assert.strictEqual(r.destroyed, true);
516+
})
517+
);
518+
}
519+
520+
{
521+
const r = Readable.from(['foo', 'bar']);
522+
pipeline(
523+
r,
524+
Duplex.from(async function(asyncGenerator) {
525+
await asyncGenerator.next();
526+
await asyncGenerator.throw(new Error('my error'));
527+
}),
528+
common.mustCall((err) => {
529+
assert.strictEqual(err.message, 'my error');
530+
assert.strictEqual(r.destroyed, true);
531+
})
532+
);
533+
}

0 commit comments

Comments
 (0)