Skip to content

Commit

Permalink
New API: async generator
Browse files Browse the repository at this point in the history
  • Loading branch information
rxaviers committed Mar 18, 2022
1 parent a234e39 commit b2d8122
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 187 deletions.
56 changes: 32 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,35 @@

## Why?

Existing solutions also re-implement Promise 😩...
The goal of this library is to use native async iterator (ES9), native async functions and native Promise to implement the concurrency behavior (look our source code).

The goal of this library is to use native async functions (if ES7 is available) and/or native Promise (ES6) including `Promise.race()` and `Promise.all()` to implement the concurrency behavior (look our source code).
If you need ES6 as baseline, please use our version [1.x](https://github.com/rxaviers/async-pool/tree/1.x).

## What?

`asyncPool` runs multiple promise-returning & async functions in a limited concurrency pool. It rejects immediately as soon as one of the promises rejects. It resolves when all the promises completes. It calls the iterator function as soon as possible (under concurrency limit). For example:

```js
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
const timeout = ms => new Promise(resolve => setTimeout(() => resolve(ms), ms));

for await (const ms of asyncPool(2, [1000, 5000, 3000, 2000], timeout)) {
console.log(ms);
}
// Call iterator timeout(1000)
// Call iterator timeout(5000)
// Concurrency limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// for await...of outputs "1000"
// Call iterator timeout(3000)
// Concurrency limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// for await...of outputs "3000"
// Call iterator timeout(2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// for await...of outputs "5000"
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
// for await...of outputs "2000"
```

## Usage
Expand All @@ -37,33 +43,35 @@ $ npm install tiny-async-pool
import asyncPool from "tiny-async-pool";
```

### ES7 async
### ES9 for await...of

```js
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
const results = await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
for await (const value of asyncPool(concurrency, iterable, iteratorFn)) {
...
}
```

Note: Something really nice will be possible soon https://github.com/tc39/proposal-async-iteration
## Migrating from 1.x

### ES6 Promise
The main difference: [1.x API](https://github.com/rxaviers/async-pool/tree/1.x) waits until all of the promises completes, then all results are returned (example below). The new API (thanks to [async iteration](https://github.com/tc39/proposal-async-iteration)) let each result be returned as soon as it completes (example above).

```js
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
return asyncPool(2, [1000, 5000, 3000, 2000], timeout).then(results => {
...
});
// ES7 API available on our previous 1.x version
const results = await asyncPool(concurrency, iterable, iteratorFn);

// ES6 API available on our previous 1.x version
return asyncPool(2, [1000, 5000, 3000, 2000], timeout).then(results => {...});
```

## API

### `asyncPool(poolLimit, iterable, iteratorFn)`
### `asyncPool(concurrency, iterable, iteratorFn)`

Runs multiple promise-returning & async functions in a limited concurrency pool. It rejects immediately as soon as one of the promises rejects. It resolves when all the promises completes. It calls the iterator function as soon as possible (under concurrency limit).
Runs multiple promise-returning & async functions in a limited concurrency pool. It rejects immediately as soon as one of the promises rejects. It calls the iterator function as soon as possible (under concurrency limit). It returns an async iterator that yields as soon as a promise completes (under concurrency limit).

#### poolLimit
#### concurrency

The pool limit number (>= 1).
The concurrency limit number (>= 1).

#### iterable

Expand Down
11 changes: 0 additions & 11 deletions dist/node.js

This file was deleted.

24 changes: 0 additions & 24 deletions lib/es6.js

This file was deleted.

17 changes: 0 additions & 17 deletions lib/es7.js

This file was deleted.

20 changes: 20 additions & 0 deletions lib/es9.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
async function* asyncPool(concurrency, iterable, iteratorFn) {
let done;
const ret = [];
const executing = new Set();
for (const item of iterable) {
const promise = Promise.resolve().then(() => iteratorFn(item, iterable));
ret.push(promise);
executing.add(promise);
const clean = () => executing.delete(promise);
promise.then(clean).catch(clean);
if (executing.size >= concurrency) {
yield await Promise.race(executing);
}
}
while (executing.size) {
yield await Promise.race(executing);
}
}

module.exports = asyncPool;
23 changes: 0 additions & 23 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 7 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
{
"name": "tiny-async-pool",
"version": "1.3.0",
"description": "Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7",
"main": "dist/node.js",
"description": "Run multiple promise-returning & async functions with limited concurrency using native ES9",
"main": "lib/es9.js",
"files": [
"LICENSE-MIT",
"dist",
"lib"
],
"scripts": {
Expand All @@ -21,8 +20,10 @@
"concurrency",
"promise",
"async",
"es6",
"es7"
"async iterator",
"async iteration",
"async generator",
"es9"
],
"author": "Rafael Xavier de Souza",
"license": "MIT",
Expand All @@ -35,7 +36,5 @@
"mocha": "^5.0.0",
"prettier": "^1.10.2"
},
"dependencies": {
"semver": "^5.5.0"
}
"dependencies": {}
}
148 changes: 68 additions & 80 deletions test/unit/index.js
Original file line number Diff line number Diff line change
@@ -1,90 +1,78 @@
const es6AsyncPool = require("../../lib/es6");
const es7AsyncPool = require("../../lib/es7");
const asyncPool = require("../../lib/es9");
const { expect } = require("chai");

describe("asyncPool", function() {
for (const [title, asyncPool] of [
["ES6 support", es6AsyncPool],
["ES7 support", es7AsyncPool]
]) {
describe(title, function() {
it("only runs as many promises in parallel as given by the pool limit", async function() {
const results = [];
const timeout = i =>
new Promise(resolve =>
setTimeout(() => {
results.push(i);
resolve();
}, i)
);
await asyncPool(2, [100, 500, 300, 200], timeout);
expect(results).to.deep.equal([100, 300, 500, 200]);
});

it("runs all promises in parallel when the pool is bigger than needed", async function() {
const results = [];
const timeout = i =>
new Promise(resolve =>
setTimeout(() => {
results.push(i);
resolve();
}, i)
);
await asyncPool(5, [100, 500, 300, 200], timeout);
expect(results).to.deep.equal([100, 200, 300, 500]);
});
const timeout = i =>
new Promise(resolve =>
setTimeout(() => {
resolve(i);
}, i)
);

it("rejects on error (but does not leave unhandled rejections) (1/2)", async function() {
const timeout = _ => Promise.reject();
return expect(
asyncPool(5, [100, 500, 300, 200], timeout)
).to.be.rejected;
// check console - no UnhandledPromiseRejectionWarning should appear
});
describe("asyncPool", function() {
it("only runs as many promises in parallel as given by the pool limit", async function() {
const gen = asyncPool(2, [10, 50, 30, 20], timeout);
expect((await gen.next()).value).to.equal(10);
expect((await gen.next()).value).to.equal(30);
expect((await gen.next()).value).to.equal(50);
expect((await gen.next()).value).to.equal(20);
});

it("rejects on error (but does not leave unhandled rejections) (2/2)", async function() {
return expect(
asyncPool(
2,
[0, 1, 2],
(i, a) =>
i < a.length - 1 ? Promise.resolve(i) : Promise.reject(i)
)
).to.be.rejected;
// check console - no UnhandledPromiseRejectionWarning should appear
});
it("runs all promises in parallel when the pool is bigger than needed", async function() {
const gen = asyncPool(5, [10, 50, 30, 20], timeout);
expect((await gen.next()).value).to.equal(10);
expect((await gen.next()).value).to.equal(20);
expect((await gen.next()).value).to.equal(30);
expect((await gen.next()).value).to.equal(50);
});

it("rejects as soon as first promise rejects", async function() {
const startedTasks = [];
const finishedTasks = [];
const timeout = i => {
startedTasks.push(i);
return new Promise((resolve, reject) =>
setTimeout(() => {
if (i === 300) {
reject(new Error("Oops"));
} else {
finishedTasks.push(i);
resolve();
}
}, i)
);
};
it("rejects on error (but does not leave unhandled rejections) (1/2)", async function() {
const timeout = _ => Promise.reject();
const gen = asyncPool(5, [10, 50, 30, 20], timeout);
expect(gen.next()).to.be.rejected;
// check console - no UnhandledPromiseRejectionWarning should appear
});

const testResult = await expect(
asyncPool(2, [100, 500, 300, 200], timeout)
).to.be.rejected;
it("rejects on error (but does not leave unhandled rejections) (2/2)", async function() {
const gen = asyncPool(
2,
[0, 1, 2],
(i, a) => (i < a.length - 1 ? Promise.resolve(i) : Promise.reject(i))
);
expect((await gen.next()).value).to.equal(0);
expect(gen.next()).to.be.rejected;
// check console - no UnhandledPromiseRejectionWarning should appear
});

expect(startedTasks).to.deep.equal([100, 500, 300]);
expect(finishedTasks).to.deep.equal([100]);
it("rejects as soon as first promise rejects", async function() {
const startedTasks = [];
const finishedTasks = [];
const timeout = i => {
startedTasks.push(i);
return new Promise((resolve, reject) =>
setTimeout(() => {
if (i === 30) {
reject(new Error("Oops"));
} else {
finishedTasks.push(i);
resolve();
}
}, i)
);
};

// tasks started before the error will continue, though - just wait a bit
await new Promise(resolve => setTimeout(() => resolve(), 500));
expect(startedTasks).to.deep.equal([100, 500, 300]);
expect(finishedTasks).to.deep.equal([100, 500]);
const gen = asyncPool(2, [10, 50, 30, 20], timeout);
const step1 = gen.next();
const step2 = gen.next();
const step3 = gen.next();
expect(step1).to.be.fulfilled;
await expect(step2).to.be.rejected;
expect(startedTasks).to.deep.equal([10, 50, 30]);
expect(finishedTasks).to.deep.equal([10]);
await expect(step3).to.be.fulfilled;

return testResult;
});
});
}
// tasks started before the exception will continue, though - just wait a bit
await new Promise(resolve => setTimeout(() => resolve(), 50));
expect(startedTasks).to.deep.equal([10, 50, 30]);
expect(finishedTasks).to.deep.equal([10, 50]);
});
});

0 comments on commit b2d8122

Please sign in to comment.