Skip to content

Commit

Permalink
fix: make promise throttle level consistent (#240)
Browse files Browse the repository at this point in the history
* fix: make promising throttle level consistent

@W-13709025@

* test: handling of undefineds

* chore: add some really good tests

Thanks Shane

* test: ut for ensureArray

* style: concise test assertions

* chore: properly remove entries from concurrency pool

* chore: make concurrency pool a map

* test: another ensureArray UT

* chore: apply review comment

---------

Co-authored-by: mshanemc <[email protected]>
  • Loading branch information
peternhale and mshanemc authored Jul 5, 2023
1 parent 1991aa5 commit 8778166
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/collections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/

export const ensureArray = <T>(entryOrArray: T | T[] | undefined): T[] => {
if (entryOrArray) {
if (entryOrArray !== undefined && entryOrArray !== null) {
return Array.isArray(entryOrArray) ? entryOrArray : [entryOrArray];
}
return [];
Expand Down
60 changes: 49 additions & 11 deletions src/throttledPromiseAll.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ export type PromiseItem<T, O = T | undefined> = {
producer: (source: T, throttledPromise: ThrottledPromiseAll<T, O | undefined>) => Promise<O | undefined>;
};

type IndexedProducer<T, O = T> = PromiseItem<T, O> & {
index: number;
};

type IndexedResult<O> = {
index: number;
result: O | undefined;
};

/**
* A promise that throttles the number of promises running at a time.
*
Expand All @@ -39,7 +48,7 @@ export class ThrottledPromiseAll<T, O = T> {
private readonly concurrency: number;
private wait: Duration;
private timeout: NodeJS.Timeout | undefined;
readonly #results: Array<O | undefined> = [];
readonly #results: Array<IndexedResult<O> | undefined> = [];

/**
* Construct a new ThrottledPromiseAll.
Expand All @@ -56,7 +65,7 @@ export class ThrottledPromiseAll<T, O = T> {
* Returns the results of the promises that have been resolved.
*/
public get results(): Array<O | undefined> {
return this.#results;
return this.#results.sort((a, b) => (a?.index ?? 0) - (b?.index ?? 0)).map((r) => r?.result);
}

/**
Expand All @@ -73,7 +82,7 @@ export class ThrottledPromiseAll<T, O = T> {
source: T | T[],
producer: (source: T, throttledPromise: ThrottledPromiseAll<T, O | undefined>) => Promise<O | undefined>
): void {
ensureArray(source).forEach((s) => this.queue.push({ source: s, producer }));
ensureArray<T>(source).forEach((s) => this.queue.push({ source: s, producer }));
}

/**
Expand Down Expand Up @@ -109,7 +118,7 @@ export class ThrottledPromiseAll<T, O = T> {
await this.dequeue();
}
this.stop();
return this.#results;
return this.results;
} catch (e) {
this.stop();
throw e;
Expand All @@ -124,14 +133,43 @@ export class ThrottledPromiseAll<T, O = T> {
}

private async dequeue(): Promise<void> {
while (this.queue.length > 0) {
const next = this.queue.slice(0, this.concurrency);
this.queue.splice(0, this.concurrency);
const generator = function* (
data: Array<PromiseItem<T, O | undefined>>
): Generator<PromiseItem<T, O | undefined> | undefined> {
while (data.length > 0) {
yield data.shift();
}
};
const concurrencyPool: Map<number, Promise<IndexedResult<O> | undefined>> = new Map<
number,
Promise<IndexedResult<O> | undefined>
>();
const get = generator(this.queue);
let index = 0;
while (this.queue.length > 0 || concurrencyPool.size > 0) {
while (concurrencyPool.size < this.concurrency) {
const item = get.next().value as PromiseItem<T, O | undefined>;
if (!item) {
break;
}

const p: IndexedProducer<T, O> = { ...item, index: index++ };
concurrencyPool.set(
p.index,
p
.producer(item.source, this)
.then((result) => ({ index: p.index, result }))
.catch((e) => Promise.reject(e))
);
}
// eslint-disable-next-line no-await-in-loop
const results = await Promise.all(
next.map((item) => item.producer(item.source, this).catch((e) => Promise.reject(e)))
);
this.#results.push(...results);
const r = await Promise.race(concurrencyPool.values());
const rIndex = r?.index ?? -1;
if (!concurrencyPool.has(rIndex)) {
throw new Error(`PromiseQueue: Could not find index ${r?.index} in pool`);
}
concurrencyPool.delete(rIndex);
this.#results.push(r);
}
}
}
12 changes: 10 additions & 2 deletions test/collections.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,16 @@ import { ensureArray } from '../src/collections';
describe('collections', () => {
describe('ensureArray', () => {
it('undefined => empty array', () => {
const input = undefined;
expect(ensureArray(input)).to.deep.equal([]);
expect(ensureArray(undefined)).to.deep.equal([]);
});
it('null => empty array', () => {
expect(ensureArray(null)).to.deep.equal([]);
});
it('zero => array with zero', () => {
expect(ensureArray(0)).to.deep.equal([0]);
});
it('empty array => empty array', () => {
expect(ensureArray([])).to.deep.equal([]);
});
it('an array => the array', () => {
const input = ['a', 'b'];
Expand Down
51 changes: 47 additions & 4 deletions test/promiseQueue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
*/
import { expect } from 'chai';
import { ThrottledPromiseAll } from '../src/throttledPromiseAll';
import { Duration } from '../src/duration';
import { ThrottledPromiseAll } from '../src';
import { Duration } from '../src';

describe('throttledPromiseAll', () => {
const numberProducer = (
Expand All @@ -19,7 +19,10 @@ describe('throttledPromiseAll', () => {
const throttledPromiseAll: ThrottledPromiseAll<number, number> = new ThrottledPromiseAll({ concurrency: 1 });
for (const i of [1, 2, 3, 4, 5]) {
// eslint-disable-next-line no-await-in-loop
throttledPromiseAll.add(i, numberProducer);
throttledPromiseAll.add(
i,
(source) => new Promise((resolve) => setTimeout(() => resolve(source + 1), (5 - i) * 100))
);
}
await throttledPromiseAll.all();
const results = throttledPromiseAll.results as number[];
Expand Down Expand Up @@ -65,7 +68,7 @@ describe('throttledPromiseAll', () => {
});
throttledPromiseAll.add(
[1, 2, 3, 4, 5],
(source) => new Promise((resolve) => setTimeout(() => resolve(source + 1), 10000))
(source) => new Promise((resolve) => setTimeout(() => resolve(source + 1), 200))
);
await throttledPromiseAll.all();
} catch (e) {
Expand All @@ -87,4 +90,44 @@ describe('throttledPromiseAll', () => {
const results = throttledPromiseAll.results as number[];
expect(results).to.deep.equal([1, 2, 3, 4, 5].map((i) => i + 1));
});

it('empty array', async () => {
const throttledPromiseAll: ThrottledPromiseAll<number, number> = new ThrottledPromiseAll({ concurrency: 5 });
await throttledPromiseAll.all();
expect(throttledPromiseAll.results).to.deep.equal([]);
});

it('add single arg that returns undefined', async () => {
const throttledPromiseAll: ThrottledPromiseAll<number, number> = new ThrottledPromiseAll({ concurrency: 5 });
throttledPromiseAll.add(0, () => Promise.resolve(undefined));
await throttledPromiseAll.all();
expect(throttledPromiseAll.results).to.deep.equal([undefined]);
});

it('add single arg that returns null', async () => {
const throttledPromiseAll: ThrottledPromiseAll<number, number> = new ThrottledPromiseAll({ concurrency: 5 });
throttledPromiseAll.add(0, () => Promise.resolve(-10));
await throttledPromiseAll.all();
expect(throttledPromiseAll.results).to.deep.equal([-10]);
});

it('add with producer of undefined', async () => {
const throttledPromiseAll: ThrottledPromiseAll<number, number> = new ThrottledPromiseAll({ concurrency: 5 });
throttledPromiseAll.add(0, () => Promise.resolve(undefined));
[1, 2, 3, 4, 5].forEach((i) => throttledPromiseAll.add(i, numberProducer));
throttledPromiseAll.add(6, () => Promise.resolve(undefined));
await throttledPromiseAll.all();
expect(throttledPromiseAll.results).to.deep.equal([undefined, 2, 3, 4, 5, 6, undefined]);
});

it('multiple adds to check order/sort', async () => {
const throttledPromiseAll: ThrottledPromiseAll<number, number> = new ThrottledPromiseAll({ concurrency: 2 });
throttledPromiseAll.add(0, () => Promise.resolve(undefined));
[1, 2].forEach((i) => throttledPromiseAll.add(i, numberProducer));
throttledPromiseAll.add(6, () => Promise.resolve(undefined));
[6, 7].forEach((i) => throttledPromiseAll.add(i, numberProducer));
throttledPromiseAll.add(6, () => Promise.resolve(undefined));
await throttledPromiseAll.all();
expect(throttledPromiseAll.results).to.deep.equal([undefined, 2, 3, undefined, 7, 8, undefined]);
});
});
7 changes: 1 addition & 6 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -716,12 +716,7 @@ acorn-walk@^8.1.1:
version "8.1.1"
resolved "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.1.1.tgz"

acorn@^8.4.1:
version "8.8.0"
resolved "https://registry.npmjs.org/acorn/-/acorn-8.8.0.tgz#88c0187620435c7f6015803f5539dae05a9dbea8"
integrity sha512-QOxyigPVrpZ2GXT+PFyZTl6TtOFc5egxHIP9IlQ+RbupQuX4RkT/Bee4/kQuC02Xkzg84JcT7oLYtDIQxp+v7w==

acorn@^8.9.0:
acorn@^8.4.1, acorn@^8.9.0:
version "8.9.0"
resolved "https://registry.npmjs.org/acorn/-/acorn-8.9.0.tgz#78a16e3b2bcc198c10822786fa6679e245db5b59"
integrity sha512-jaVNAFBHNLXspO543WnNNPZFRtavh3skAkITqD0/2aeMkKZTN+254PyhwxFYrk3vQ1xfY+2wbesJMs/JC8/PwQ==
Expand Down

0 comments on commit 8778166

Please sign in to comment.