Skip to content

Commit 5541300

Browse files
stream: handle generator destruction from Duplex.from()
PR-URL: #55096 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Jason Zhang <[email protected]>
1 parent 6cd1805 commit 5541300

File tree

2 files changed

+243
-7
lines changed

2 files changed

+243
-7
lines changed

lib/internal/streams/duplexify.js

+52-7
Original file line numberDiff line numberDiff line change
@@ -83,15 +83,19 @@ module.exports = function duplexify(body, name) {
8383
}
8484

8585
if (typeof body === 'function') {
86-
const { value, write, final, destroy } = fromAsyncGen(body);
86+
let d;
87+
88+
const { value, write, final, destroy } = fromAsyncGen(body, () => {
89+
destroyer(d);
90+
});
8791

8892
// Body might be a constructor function instead of an async generator function.
8993
if (isDuplexNodeStream(value)) {
90-
return value;
94+
return d = value;
9195
}
9296

9397
if (isIterable(value)) {
94-
return from(Duplexify, value, {
98+
return d = from(Duplexify, value, {
9599
// TODO (ronag): highWaterMark?
96100
objectMode: true,
97101
write,
@@ -102,12 +106,16 @@ module.exports = function duplexify(body, name) {
102106

103107
const then = value?.then;
104108
if (typeof then === 'function') {
105-
let d;
109+
let finalized = false;
106110

107111
const promise = FunctionPrototypeCall(
108112
then,
109113
value,
110114
(val) => {
115+
// The function returned without (fully) consuming the generator.
116+
if (!finalized) {
117+
destroyer(d);
118+
}
111119
if (val != null) {
112120
throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val);
113121
}
@@ -123,6 +131,7 @@ module.exports = function duplexify(body, name) {
123131
readable: false,
124132
write,
125133
final(cb) {
134+
finalized = true;
126135
final(async () => {
127136
try {
128137
await promise;
@@ -208,11 +217,12 @@ module.exports = function duplexify(body, name) {
208217
body);
209218
};
210219

211-
function fromAsyncGen(fn) {
220+
function fromAsyncGen(fn, destructor) {
212221
let { promise, resolve } = PromiseWithResolvers();
213222
const ac = new AbortController();
214223
const signal = ac.signal;
215-
const value = fn(async function*() {
224+
225+
const asyncGenerator = (async function* () {
216226
while (true) {
217227
const _promise = promise;
218228
promise = null;
@@ -222,9 +232,44 @@ function fromAsyncGen(fn) {
222232
if (signal.aborted)
223233
throw new AbortError(undefined, { cause: signal.reason });
224234
({ promise, resolve } = PromiseWithResolvers());
235+
// Next line will "break" the loop if the generator is returned/thrown.
225236
yield chunk;
226237
}
227-
}(), { signal });
238+
})();
239+
240+
const originalReturn = asyncGenerator.return;
241+
asyncGenerator.return = async function(value) {
242+
try {
243+
return await originalReturn.call(this, value);
244+
} finally {
245+
if (promise) {
246+
const _promise = promise;
247+
promise = null;
248+
const { cb } = await _promise;
249+
process.nextTick(cb);
250+
251+
process.nextTick(destructor);
252+
}
253+
}
254+
};
255+
256+
const originalThrow = asyncGenerator.throw;
257+
asyncGenerator.throw = async function(err) {
258+
try {
259+
return await originalThrow.call(this, err);
260+
} finally {
261+
if (promise) {
262+
const _promise = promise;
263+
promise = null;
264+
const { cb } = await _promise;
265+
266+
// asyncGenerator.throw(undefined) should cause a callback error
267+
process.nextTick(cb, err ?? new AbortError());
268+
}
269+
}
270+
};
271+
272+
const value = fn(asyncGenerator, { signal });
228273

229274
return {
230275
value,

test/parallel/test-stream-duplex-from.js

+191
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const assert = require('assert');
55
const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream');
66
const { ReadableStream, WritableStream } = require('stream/web');
77
const { Blob } = require('buffer');
8+
const sleep = require('util').promisify(setTimeout);
89

910
{
1011
const d = Duplex.from({
@@ -401,3 +402,193 @@ function makeATestWritableStream(writeFunc) {
401402
assert.strictEqual(d.writable, false);
402403
}));
403404
}
405+
406+
{
407+
const r = Readable.from(['foo', 'bar', 'baz']);
408+
pipeline(
409+
r,
410+
Duplex.from(async function(asyncGenerator) {
411+
const values = await Array.fromAsync(asyncGenerator);
412+
assert.deepStrictEqual(values, ['foo', 'bar', 'baz']);
413+
414+
await asyncGenerator.return();
415+
await asyncGenerator.return();
416+
await asyncGenerator.return();
417+
}),
418+
common.mustSucceed(() => {
419+
assert.strictEqual(r.destroyed, true);
420+
})
421+
);
422+
}
423+
424+
{
425+
const r = Readable.from(['foo', 'bar', 'baz']);
426+
pipeline(
427+
r,
428+
Duplex.from(async function(asyncGenerator) {
429+
// eslint-disable-next-line no-unused-vars
430+
for await (const _ of asyncGenerator) break;
431+
}),
432+
common.mustSucceed(() => {
433+
assert.strictEqual(r.destroyed, true);
434+
})
435+
);
436+
}
437+
438+
{
439+
const r = Readable.from(['foo', 'bar', 'baz']);
440+
pipeline(
441+
r,
442+
Duplex.from(async function(asyncGenerator) {
443+
const a = await asyncGenerator.next();
444+
assert.strictEqual(a.done, false);
445+
assert.strictEqual(a.value.toString(), 'foo');
446+
const b = await asyncGenerator.return();
447+
assert.strictEqual(b.done, true);
448+
}),
449+
common.mustSucceed(() => {
450+
assert.strictEqual(r.destroyed, true);
451+
})
452+
);
453+
}
454+
455+
{
456+
const r = Readable.from(['foo', 'bar', 'baz']);
457+
pipeline(
458+
r,
459+
Duplex.from(async function(asyncGenerator) {
460+
// Note: the generator is not even started at this point
461+
await asyncGenerator.return();
462+
}),
463+
common.mustSucceed(() => {
464+
assert.strictEqual(r.destroyed, true);
465+
})
466+
);
467+
}
468+
469+
{
470+
const r = Readable.from(['foo', 'bar', 'baz']);
471+
pipeline(
472+
r,
473+
Duplex.from(async function(asyncGenerator) {
474+
// Same as before, with a delay
475+
await sleep(100);
476+
await asyncGenerator.return();
477+
}),
478+
common.mustSucceed(() => {
479+
assert.strictEqual(r.destroyed, true);
480+
})
481+
);
482+
}
483+
484+
{
485+
const r = Readable.from(['foo', 'bar', 'baz']);
486+
pipeline(
487+
r,
488+
Duplex.from(async function(asyncGenerator) {}),
489+
common.mustCall((err) => {
490+
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
491+
assert.strictEqual(r.destroyed, true);
492+
})
493+
);
494+
}
495+
496+
{
497+
const r = Readable.from(['foo', 'bar', 'baz']);
498+
pipeline(
499+
r,
500+
Duplex.from(async function(asyncGenerator) {
501+
await sleep(100);
502+
}),
503+
common.mustCall((err) => {
504+
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
505+
assert.strictEqual(r.destroyed, true);
506+
})
507+
);
508+
}
509+
510+
{
511+
const r = Readable.from(['foo', 'bar', 'baz']);
512+
const d = Duplex.from(async function(asyncGenerator) {
513+
while (!(await asyncGenerator.next()).done) await sleep(100);
514+
});
515+
516+
setTimeout(() => d.destroy(), 150);
517+
518+
pipeline(
519+
r,
520+
d,
521+
common.mustCall((err) => {
522+
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
523+
assert.strictEqual(r.destroyed, true);
524+
})
525+
);
526+
}
527+
528+
{
529+
const r = Duplex.from(async function* () {
530+
for (const value of ['foo', 'bar', 'baz']) {
531+
await sleep(50);
532+
yield value;
533+
}
534+
});
535+
const d = Duplex.from(async function(asyncGenerator) {
536+
while (!(await asyncGenerator.next()).done);
537+
});
538+
539+
setTimeout(() => r.destroy(), 75);
540+
541+
pipeline(
542+
r,
543+
d,
544+
common.mustCall((err) => {
545+
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
546+
assert.strictEqual(r.destroyed, true);
547+
assert.strictEqual(d.destroyed, true);
548+
})
549+
);
550+
}
551+
552+
{
553+
const r = Readable.from(['foo']);
554+
pipeline(
555+
r,
556+
Duplex.from(async function(asyncGenerator) {
557+
await asyncGenerator.throw(new Error('my error'));
558+
}),
559+
common.mustCall((err) => {
560+
assert.strictEqual(err.message, 'my error');
561+
assert.strictEqual(r.destroyed, true);
562+
})
563+
);
564+
}
565+
566+
{
567+
const r = Readable.from(['foo', 'bar']);
568+
pipeline(
569+
r,
570+
Duplex.from(async function(asyncGenerator) {
571+
await asyncGenerator.next();
572+
await asyncGenerator.throw(new Error('my error'));
573+
}),
574+
common.mustCall((err) => {
575+
assert.strictEqual(err.message, 'my error');
576+
assert.strictEqual(r.destroyed, true);
577+
})
578+
);
579+
}
580+
581+
{
582+
const r = Readable.from(['foo', 'bar']);
583+
pipeline(
584+
r,
585+
Duplex.from(async function(asyncGenerator) {
586+
await asyncGenerator.next();
587+
await asyncGenerator.throw();
588+
}),
589+
common.mustCall((err) => {
590+
assert.strictEqual(err.code, 'ABORT_ERR');
591+
assert.strictEqual(r.destroyed, true);
592+
})
593+
);
594+
}

0 commit comments

Comments
 (0)