Skip to content

Commit 3025459

Browse files
committed
fix(merge): catch promise errors to avoid unhandled exceptions
fixes #353
1 parent 1e4dd6f commit 3025459

File tree

1 file changed

+14
-14
lines changed

1 file changed

+14
-14
lines changed

src/asynciterable/merge.ts

+14-14
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ import { safeRace } from '../util/safeRace';
66
// eslint-disable-next-line @typescript-eslint/no-empty-function
77
const NEVER_PROMISE = new Promise(() => {});
88

9-
type MergeResult<T> = { value: T; index: number };
9+
type MergeResult<T> = { value: T; index: number; done?: boolean, error?: any };
1010

11-
function wrapPromiseWithIndex<T>(promise: Promise<T>, index: number) {
12-
return promise.then((value) => ({ value, index })) as Promise<MergeResult<T>>;
11+
function wrapPromiseWithIndex<T>(promise: Promise<IteratorResult<T>>, index: number) {
12+
return promise
13+
.then(({value, done}) => ({ value, done, index }))
14+
.catch((error) => ({ error, index })) as Promise<MergeResult<T>>;
1315
}
1416

1517
/** @ignore */
@@ -25,7 +27,7 @@ export class MergeAsyncIterable<T> extends AsyncIterableX<T> {
2527
throwIfAborted(signal);
2628
const length = this._source.length;
2729
const iterators = new Array<AsyncIterator<T>>(length);
28-
const nexts = new Array<Promise<MergeResult<IteratorResult<T>>>>(length);
30+
const nexts = new Array<Promise<MergeResult<T>>>(length);
2931
let active = length;
3032
for (let i = 0; i < length; i++) {
3133
const iterator = wrapWithAbort(this._source[i], signal)[Symbol.asyncIterator]();
@@ -34,18 +36,16 @@ export class MergeAsyncIterable<T> extends AsyncIterableX<T> {
3436
}
3537

3638
while (active > 0) {
37-
const next = safeRace(nexts);
38-
const {
39-
value: { done: done$, value: value$ },
40-
index,
41-
} = await next;
42-
if (done$) {
43-
nexts[index] = <Promise<MergeResult<IteratorResult<T>>>>NEVER_PROMISE;
39+
const next = await safeRace(nexts);
40+
if (next.hasOwnProperty('error')) {
41+
throw next.error;
42+
} else if (next.done) {
43+
nexts[next.index] = <Promise<MergeResult<T>>>NEVER_PROMISE;
4444
active--;
4545
} else {
46-
const iterator$ = iterators[index];
47-
nexts[index] = wrapPromiseWithIndex(iterator$.next(), index);
48-
yield value$;
46+
const iterator$ = iterators[next.index];
47+
nexts[next.index] = wrapPromiseWithIndex(iterator$.next(), next.index);
48+
yield next.value;
4949
}
5050
}
5151
}

0 commit comments

Comments
 (0)