Skip to content

Commit 1dbe765

Browse files
addaleaxcodebytere
authored andcommitted
doc: add AsyncResource + Worker pool example
Use Worker thread pools as an example of how `AsyncResource` can be used to track async state across callbacks. PR-URL: #31601 Reviewed-By: Gireesh Punathil <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Chengzhong Wu <[email protected]> Reviewed-By: Rich Trott <[email protected]> Reviewed-By: Anto Aravinth <[email protected]>
1 parent 435b9c9 commit 1dbe765

File tree

2 files changed

+126
-1
lines changed

2 files changed

+126
-1
lines changed

doc/api/async_hooks.md

+122
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,128 @@ never be called.
682682
* Returns: {number} The same `triggerAsyncId` that is passed to the
683683
`AsyncResource` constructor.
684684

685+
<a id="async-resource-worker-pool"></a>
686+
### Using `AsyncResource` for a `Worker` thread pool
687+
688+
The following example shows how to use the `AsyncResource` class to properly
689+
provide async tracking for a [`Worker`][] pool. Other resource pools, such as
690+
database connection pools, can follow a similar model.
691+
692+
Assuming that the task is adding two numbers, using a file named
693+
`task_processor.js` with the following content:
694+
695+
```js
696+
const { parentPort } = require('worker_threads');
697+
parentPort.on('message', (task) => {
698+
parentPort.postMessage(task.a + task.b);
699+
});
700+
```
701+
702+
a Worker pool around it could use the following structure:
703+
704+
```js
705+
const { AsyncResource } = require('async_hooks');
706+
const { EventEmitter } = require('events');
707+
const path = require('path');
708+
const { Worker } = require('worker_threads');
709+
710+
const kTaskInfo = Symbol('kTaskInfo');
711+
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
712+
713+
class WorkerPoolTaskInfo extends AsyncResource {
714+
constructor(callback) {
715+
super('WorkerPoolTaskInfo');
716+
this.callback = callback;
717+
}
718+
719+
done(err, result) {
720+
this.runInAsyncScope(this.callback, null, err, result);
721+
this.emitDestroy(); // `TaskInfo`s are used only once.
722+
}
723+
}
724+
725+
class WorkerPool extends EventEmitter {
726+
constructor(numThreads) {
727+
super();
728+
this.numThreads = numThreads;
729+
this.workers = [];
730+
this.freeWorkers = [];
731+
732+
for (let i = 0; i < numThreads; i++)
733+
this.addNewWorker();
734+
}
735+
736+
addNewWorker() {
737+
const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
738+
worker.on('message', (result) => {
739+
// In case of success: Call the callback that was passed to `runTask`,
740+
// remove the `TaskInfo` associated with the Worker, and mark it as free
741+
// again.
742+
worker[kTaskInfo].done(null, result);
743+
worker[kTaskInfo] = null;
744+
this.freeWorkers.push(worker);
745+
this.emit(kWorkerFreedEvent);
746+
});
747+
worker.on('error', (err) => {
748+
// In case of an uncaught exception: Call the callback that was passed to
749+
// `runTask` with the error.
750+
if (worker[kTaskInfo])
751+
worker[kTaskInfo].done(err, null);
752+
else
753+
this.emit('error', err);
754+
// Remove the worker from the list and start a new Worker to replace the
755+
// current one.
756+
this.workers.splice(this.workers.indexOf(worker), 1);
757+
this.addNewWorker();
758+
});
759+
this.workers.push(worker);
760+
this.freeWorkers.push(worker);
761+
}
762+
763+
runTask(task, callback) {
764+
if (this.freeWorkers.length === 0) {
765+
// No free threads, wait until a worker thread becomes free.
766+
this.once(kWorkerFreedEvent, () => this.runTask(task, callback));
767+
return;
768+
}
769+
770+
const worker = this.freeWorkers.pop();
771+
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
772+
worker.postMessage(task);
773+
}
774+
775+
close() {
776+
for (const worker of this.workers) worker.terminate();
777+
}
778+
}
779+
780+
module.exports = WorkerPool;
781+
```
782+
783+
Without the explicit tracking added by the `WorkerPoolTaskInfo` objects,
784+
it would appear that the callbacks are associated with the individual `Worker`
785+
objects. However, the creation of the `Worker`s is not associated with the
786+
creation of the tasks and does not provide information about when tasks
787+
were scheduled.
788+
789+
This pool could be used as follows:
790+
791+
```js
792+
const WorkerPool = require('./worker_pool.js');
793+
const os = require('os');
794+
795+
const pool = new WorkerPool(os.cpus().length);
796+
797+
let finished = 0;
798+
for (let i = 0; i < 10; i++) {
799+
pool.runTask({ a: 42, b: 100 }, (err, result) => {
800+
console.log(i, err, result);
801+
if (++finished === 10)
802+
pool.close();
803+
});
804+
}
805+
```
806+
685807
[`after` callback]: #async_hooks_after_asyncid
686808
[`before` callback]: #async_hooks_before_asyncid
687809
[`destroy` callback]: #async_hooks_destroy_asyncid

doc/api/worker_threads.md

+4-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ overhead of creating Workers would likely exceed their benefit.
5151

5252
When implementing a worker pool, use the [`AsyncResource`][] API to inform
5353
diagnostic tools (e.g. in order to provide asynchronous stack traces) about the
54-
correlation between tasks and their outcomes.
54+
correlation between tasks and their outcomes. See
55+
["Using `AsyncResource` for a `Worker` thread pool"][async-resource-worker-pool]
56+
in the `async_hooks` documentation for an example implementation.
5557

5658
## `worker.isMainThread`
5759
<!-- YAML
@@ -767,6 +769,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
767769
[`worker.terminate()`]: #worker_threads_worker_terminate
768770
[`worker.threadId`]: #worker_threads_worker_threadid_1
769771
[Addons worker support]: addons.html#addons_worker_support
772+
[async-resource-worker-pool]: async_hooks.html#async-resource-worker-pool
770773
[HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm
771774
[Signals events]: process.html#process_signal_events
772775
[Web Workers]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API

0 commit comments

Comments
 (0)