@@ -682,6 +682,128 @@ never be called.
682
682
* Returns: {number} The same ` triggerAsyncId ` that is passed to the
683
683
` AsyncResource ` constructor.
684
684
685
+ <a id =" async-resource-worker-pool " ></a >
686
+ ### Using ` AsyncResource ` for a ` Worker ` thread pool
687
+
688
+ The following example shows how to use the ` AsyncResource ` class to properly
689
+ provide async tracking for a [ ` Worker ` ] [ ] pool. Other resource pools, such as
690
+ database connection pools, can follow a similar model.
691
+
692
+ Assuming that the task is adding two numbers, using a file named
693
+ ` task_processor.js ` with the following content:
694
+
695
+ ``` js
696
+ const { parentPort } = require (' worker_threads' );
697
+ parentPort .on (' message' , (task ) => {
698
+ parentPort .postMessage (task .a + task .b );
699
+ });
700
+ ```
701
+
702
+ a Worker pool around it could use the following structure:
703
+
704
+ ``` js
705
+ const { AsyncResource } = require (' async_hooks' );
706
+ const { EventEmitter } = require (' events' );
707
+ const path = require (' path' );
708
+ const { Worker } = require (' worker_threads' );
709
+
710
+ const kTaskInfo = Symbol (' kTaskInfo' );
711
+ const kWorkerFreedEvent = Symbol (' kWorkerFreedEvent' );
712
+
713
+ class WorkerPoolTaskInfo extends AsyncResource {
714
+ constructor (callback ) {
715
+ super (' WorkerPoolTaskInfo' );
716
+ this .callback = callback;
717
+ }
718
+
719
+ done (err , result ) {
720
+ this .runInAsyncScope (this .callback , null , err, result);
721
+ this .emitDestroy (); // `TaskInfo`s are used only once.
722
+ }
723
+ }
724
+
725
+ class WorkerPool extends EventEmitter {
726
+ constructor (numThreads ) {
727
+ super ();
728
+ this .numThreads = numThreads;
729
+ this .workers = [];
730
+ this .freeWorkers = [];
731
+
732
+ for (let i = 0 ; i < numThreads; i++ )
733
+ this .addNewWorker ();
734
+ }
735
+
736
+ addNewWorker () {
737
+ const worker = new Worker (path .resolve (__dirname , ' task_processor.js' ));
738
+ worker .on (' message' , (result ) => {
739
+ // In case of success: Call the callback that was passed to `runTask`,
740
+ // remove the `TaskInfo` associated with the Worker, and mark it as free
741
+ // again.
742
+ worker[kTaskInfo].done (null , result);
743
+ worker[kTaskInfo] = null ;
744
+ this .freeWorkers .push (worker);
745
+ this .emit (kWorkerFreedEvent);
746
+ });
747
+ worker .on (' error' , (err ) => {
748
+ // In case of an uncaught exception: Call the callback that was passed to
749
+ // `runTask` with the error.
750
+ if (worker[kTaskInfo])
751
+ worker[kTaskInfo].done (err, null );
752
+ else
753
+ this .emit (' error' , err);
754
+ // Remove the worker from the list and start a new Worker to replace the
755
+ // current one.
756
+ this .workers .splice (this .workers .indexOf (worker), 1 );
757
+ this .addNewWorker ();
758
+ });
759
+ this .workers .push (worker);
760
+ this .freeWorkers .push (worker);
761
+ }
762
+
763
+ runTask (task , callback ) {
764
+ if (this .freeWorkers .length === 0 ) {
765
+ // No free threads, wait until a worker thread becomes free.
766
+ this .once (kWorkerFreedEvent, () => this .runTask (task, callback));
767
+ return ;
768
+ }
769
+
770
+ const worker = this .freeWorkers .pop ();
771
+ worker[kTaskInfo] = new WorkerPoolTaskInfo (callback);
772
+ worker .postMessage (task);
773
+ }
774
+
775
+ close () {
776
+ for (const worker of this .workers ) worker .terminate ();
777
+ }
778
+ }
779
+
780
+ module .exports = WorkerPool;
781
+ ```
782
+
783
+ Without the explicit tracking added by the ` WorkerPoolTaskInfo ` objects,
784
+ it would appear that the callbacks are associated with the individual ` Worker `
785
+ objects. However, the creation of the ` Worker ` s is not associated with the
786
+ creation of the tasks and does not provide information about when tasks
787
+ were scheduled.
788
+
789
+ This pool could be used as follows:
790
+
791
+ ``` js
792
+ const WorkerPool = require (' ./worker_pool.js' );
793
+ const os = require (' os' );
794
+
795
+ const pool = new WorkerPool (os .cpus ().length );
796
+
797
+ let finished = 0 ;
798
+ for (let i = 0 ; i < 10 ; i++ ) {
799
+ pool .runTask ({ a: 42 , b: 100 }, (err , result ) => {
800
+ console .log (i, err, result);
801
+ if (++ finished === 10 )
802
+ pool .close ();
803
+ });
804
+ }
805
+ ```
806
+
685
807
[ `after` callback ] : #async_hooks_after_asyncid
686
808
[ `before` callback ] : #async_hooks_before_asyncid
687
809
[ `destroy` callback ] : #async_hooks_destroy_asyncid
0 commit comments