1+ import { AsyncResource } from 'async_hooks' ;
12import { Worker } from 'worker_threads' ;
23import { cpus } from 'os' ;
34import { EventEmitter } from 'events' ;
45
56import serializeJavascript from 'serialize-javascript' ;
67
8+ import { freeWorker , taskInfo } from './constants' ;
9+
710import type {
811 WorkerCallback ,
912 WorkerContext ,
1013 WorkerOutput ,
1114 WorkerPoolOptions ,
12- WorkerPoolTask
15+ WorkerPoolTask ,
16+ WorkerWithTaskInfo
1317} from './type' ;
1418
15- const symbol = Symbol . for ( 'FreeWoker' ) ;
19+ class WorkerPoolTaskInfo extends AsyncResource {
20+ constructor ( private callback : WorkerCallback ) {
21+ super ( 'WorkerPoolTaskInfo' ) ;
22+ }
23+
24+ done ( err : Error | null , result : any ) {
25+ this . runInAsyncScope ( this . callback , null , err , result ) ;
26+ this . emitDestroy ( ) ;
27+ }
28+ }
1629
1730export class WorkerPool extends EventEmitter {
1831 protected maxInstances : number ;
@@ -21,37 +34,30 @@ export class WorkerPool extends EventEmitter {
2134
2235 protected tasks : WorkerPoolTask [ ] = [ ] ;
2336
24- protected workers = 0 ;
37+ protected workers : WorkerWithTaskInfo [ ] = [ ] ;
38+ protected freeWorkers : WorkerWithTaskInfo [ ] = [ ] ;
2539
2640 constructor ( options : WorkerPoolOptions ) {
2741 super ( ) ;
2842
2943 this . maxInstances = options . maxWorkers || cpus ( ) . length ;
3044 this . filePath = options . filePath ;
3145
32- this . on ( symbol , ( ) => {
46+ this . on ( freeWorker , ( ) => {
3347 if ( this . tasks . length > 0 ) {
34- this . run ( ) ;
48+ const { context, cb } = this . tasks . shift ( ) ! ;
49+ this . runTask ( context , cb ) ;
3550 }
3651 } ) ;
3752 }
3853
39- add ( context : WorkerContext , cb : WorkerCallback ) {
40- this . tasks . push ( {
41- context,
42- cb
43- } ) ;
44-
45- if ( this . workers >= this . maxInstances ) {
46- return ;
47- }
48-
49- this . run ( ) ;
54+ get numWorkers ( ) : number {
55+ return this . workers . length ;
5056 }
5157
52- async addAsync ( context : WorkerContext ) : Promise < WorkerOutput > {
58+ addAsync ( context : WorkerContext ) : Promise < WorkerOutput > {
5359 return new Promise ( ( resolve , reject ) => {
54- this . add ( context , ( err , output ) => {
60+ this . runTask ( context , ( err , output ) => {
5561 if ( err ) {
5662 reject ( err ) ;
5763 return ;
@@ -67,51 +73,54 @@ export class WorkerPool extends EventEmitter {
6773 } ) ;
6874 }
6975
70- private run ( ) {
71- if ( this . tasks . length === 0 ) {
72- return ;
73- }
74-
75- const task = this . tasks . shift ( ) ;
76-
77- if ( typeof task === 'undefined' ) {
78- return ;
76+ close ( ) {
77+ for ( let i = 0 ; i < this . workers . length ; i ++ ) {
78+ const worker = this . workers [ i ] ;
79+ worker . terminate ( ) ;
7980 }
81+ }
8082
81- this . workers += 1 ;
82-
83- let called = false ;
84- const callCallback = ( err : Error | null , output ?: WorkerOutput ) => {
85- if ( called ) {
86- return ;
87- }
88- called = true ;
89-
90- this . workers -= 1 ;
91-
92- task . cb ( err , output ) ;
93- this . emit ( symbol ) ;
94- } ;
95-
96- const worker = new Worker ( this . filePath , {
97- workerData : {
98- code : task . context . code ,
99- options : serializeJavascript ( task . context . options )
100- }
101- } ) ;
83+ private addNewWorker ( ) {
84+ const worker : WorkerWithTaskInfo = new Worker ( this . filePath ) ;
10285
103- worker . on ( 'message' , ( data ) => {
104- callCallback ( null , data ) ;
86+ worker . on ( 'message' , ( result ) => {
87+ worker [ taskInfo ] ?. done ( null , result ) ;
88+ worker [ taskInfo ] = null ;
89+ this . freeWorkers . push ( worker ) ;
90+ this . emit ( freeWorker ) ;
10591 } ) ;
10692
10793 worker . on ( 'error' , ( err ) => {
108- callCallback ( err ) ;
94+ if ( worker [ taskInfo ] ) {
95+ worker [ taskInfo ] . done ( err , null ) ;
96+ } else {
97+ this . emit ( 'error' , err ) ;
98+ }
99+ this . workers . splice ( this . workers . indexOf ( worker ) , 1 ) ;
100+ this . addNewWorker ( ) ;
109101 } ) ;
110102
111- worker . on ( 'exit' , ( code ) => {
112- if ( code !== 0 ) {
113- callCallback ( new Error ( `Minify worker stopped with exit code ${ code } ` ) ) ;
103+ this . workers . push ( worker ) ;
104+ this . freeWorkers . push ( worker ) ;
105+ this . emit ( freeWorker ) ;
106+ }
107+
108+ private runTask ( context : WorkerContext , cb : WorkerCallback ) {
109+ if ( this . freeWorkers . length === 0 ) {
110+ this . tasks . push ( { context, cb } ) ;
111+ if ( this . numWorkers < this . maxInstances ) {
112+ this . addNewWorker ( ) ;
114113 }
115- } ) ;
114+ return ;
115+ }
116+
117+ const worker = this . freeWorkers . pop ( ) ;
118+ if ( worker ) {
119+ worker [ taskInfo ] = new WorkerPoolTaskInfo ( cb ) ;
120+ worker . postMessage ( {
121+ code : context . code ,
122+ options : serializeJavascript ( context . options )
123+ } ) ;
124+ }
116125 }
117126}
0 commit comments