-
-
Notifications
You must be signed in to change notification settings - Fork 6.5k
/
Copy pathBaseWorkerPool.ts
127 lines (103 loc) · 3.34 KB
/
BaseWorkerPool.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/**
* Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
import * as path from 'path';
import mergeStream = require('merge-stream');
import {
CHILD_MESSAGE_END,
PoolExitResult,
WorkerInterface,
WorkerOptions,
WorkerPoolOptions,
} from '../types';
// Grace period, in milliseconds, that a child process is given to terminate
// on its own after CHILD_MESSAGE_END has been sent, before it is force-exited.
const FORCE_EXIT_DELAY = 500;

// No-op used wherever a callback is required but nothing needs to happen.
/* istanbul ignore next */
function emptyMethod(): void {}
/**
 * Base implementation of a pool of workers.
 *
 * On construction it creates `options.numWorkers` workers through
 * `createWorker`, merges every worker's stdout and stderr into one stream
 * each, and keeps the workers indexed by their id. `createWorker` throws in
 * this base class, so a subclass has to override it.
 */
export default class BaseWorkerPool {
  private readonly _stderr: NodeJS.ReadableStream;
  private readonly _stdout: NodeJS.ReadableStream;
  protected readonly _options: WorkerPoolOptions;
  private readonly _workers: Array<WorkerInterface>;

  /**
   * @param workerPath Path of the worker module. A non-absolute path is
   *   resolved with `require.resolve`.
   * @param options Pool options; `numWorkers` determines how many workers are
   *   created, the remaining fields used here are forwarded to each worker.
   */
  constructor(workerPath: string, options: WorkerPoolOptions) {
    // Both fields are set before any worker is created, so `createWorker`
    // overrides can already rely on them.
    this._options = options;
    this._workers = new Array(options.numWorkers);

    const resolvedWorkerPath = path.isAbsolute(workerPath)
      ? workerPath
      : require.resolve(workerPath);

    const mergedStdout = mergeStream();
    const mergedStderr = mergeStream();

    // Options that are identical for every worker; read from `options` once.
    const sharedWorkerOptions = {
      forkOptions: options.forkOptions,
      maxRetries: options.maxRetries,
      resourceLimits: options.resourceLimits,
      setupArgs: options.setupArgs,
    };

    for (let workerId = 0; workerId < options.numWorkers; workerId++) {
      const spawned = this.createWorker({
        ...sharedWorkerOptions,
        workerId,
        workerPath: resolvedWorkerPath,
      });

      const spawnedStdout = spawned.getStdout();
      const spawnedStderr = spawned.getStderr();

      // A worker may not expose a stream; only real streams are merged.
      if (spawnedStdout) {
        mergedStdout.add(spawnedStdout);
      }
      if (spawnedStderr) {
        mergedStderr.add(spawnedStderr);
      }

      this._workers[workerId] = spawned;
    }

    this._stdout = mergedStdout;
    this._stderr = mergedStderr;
  }

  /** Returns the stream that merges the stderr of all workers. */
  getStderr(): NodeJS.ReadableStream {
    return this._stderr;
  }

  /** Returns the stream that merges the stdout of all workers. */
  getStdout(): NodeJS.ReadableStream {
    return this._stdout;
  }

  /** Returns the array holding every worker of the pool, indexed by id. */
  getWorkers(): Array<WorkerInterface> {
    return this._workers;
  }

  /** Returns the worker stored at index `workerId`. */
  getWorkerById(workerId: number): WorkerInterface {
    return this._workers[workerId];
  }

  /**
   * Factory for a single worker. The base implementation always throws;
   * concrete pools are expected to override it.
   *
   * @throws Error always, in this base class.
   */
  createWorker(_workerOptions: WorkerOptions): WorkerInterface {
    throw new Error('Missing method createWorker in WorkerPool');
  }

  /**
   * Asks every worker to end and waits until all of them have exited.
   *
   * @returns An object whose `forceExited` flag is `true` when at least one
   *   worker had to be force-exited.
   */
  async end(): Promise<PoolExitResult> {
    const pendingExits = this._workers.map(async worker => {
      // The END request is built anew for each worker instead of being shared:
      // a cached request would only be processed by one of the workers, and
      // all of them have to close.
      worker.send(
        [CHILD_MESSAGE_END, false],
        emptyMethod,
        emptyMethod,
        emptyMethod,
      );

      // Arm a force exit so that a worker which fails to exit gracefully does
      // not keep `waitForExit()` pending for longer than FORCE_EXIT_DELAY.
      let wasForceExited = false;
      const forceExitTimer = setTimeout(() => {
        worker.forceExit();
        wasForceExited = true;
      }, FORCE_EXIT_DELAY);

      await worker.waitForExit();

      // The worker is gone; if the timer has not fired yet it must not fire.
      clearTimeout(forceExitTimer);

      return wasForceExited;
    });

    const forceExitFlags = await Promise.all(pendingExits);

    return {forceExited: forceExitFlags.some(flag => flag)};
  }
}