-
Notifications
You must be signed in to change notification settings - Fork 19
/
index.js
74 lines (61 loc) · 1.76 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
'use strict'
const {Worker} = require('worker_threads')
const {AsyncResource} = require('async_hooks')
const afterAll = require('after-all')
// Shared do-nothing callback, used as the default when a caller omits one.
const noop = () => {}
module.exports = class Pool {
constructor (opts) {
opts = opts || {}
this._workers = new Set()
this._queue = []
this._max = opts.max || 1
this._maxWaiting = opts.maxWaiting || Infinity
}
get size () {
return this._workers.size
}
acquire (filename, opts, cb) {
if (typeof opts === 'function') return this.acquire(filename, undefined, opts)
if (this._workers.size === this._max) {
if (this._queue.length === this._maxWaiting) {
process.nextTick(cb.bind(null, new Error('Pool queue is full')))
return
}
this._queue.push(new QueuedWorkerThread(this, filename, opts, cb))
return
}
const self = this
const worker = new Worker(filename, opts)
worker.once('error', done)
worker.once('exit', done)
this._workers.add(worker)
process.nextTick(cb.bind(null, null, worker))
function done () {
self._workers.delete(worker)
worker.removeListener('error', done)
worker.removeListener('exit', done)
const resource = self._queue.shift()
if (resource) resource.addToPool()
}
}
destroy (cb = noop) {
const next = afterAll(cb)
for (let worker of this._workers) {
worker.terminate(next())
}
}
}
/**
 * A pending `acquire()` request waiting for a free slot in a pool.
 *
 * Extends AsyncResource so the caller's callback is later run via
 * `runInAsyncScope`, i.e. in the async context associated with this
 * resource rather than that of whichever worker event freed the slot.
 */
class QueuedWorkerThread extends AsyncResource {
  /**
   * @param {object} pool Pool to re-issue the request against.
   * @param {string} filename Forwarded to `pool.acquire`.
   * @param {object} [opts] Forwarded to `pool.acquire`.
   * @param {function(?Error, Worker=)} cb The caller's acquire callback.
   */
  constructor (pool, filename, opts, cb) {
    super('worker-threads-pool:enqueue')
    this.pool = pool
    this.filename = filename
    this.opts = opts
    this.cb = cb
  }

  /**
   * Re-issue the queued request against the pool and deliver the outcome to
   * the original callback.
   */
  addToPool () {
    try {
      this.pool.acquire(this.filename, this.opts, (err, worker) => {
        this.runInAsyncScope(this.cb, null, err, worker)
      })
    } catch (err) {
      // This method runs from inside a worker 'exit'/'error' listener, so a
      // synchronous throw (e.g. the Worker constructor rejecting its
      // arguments) would otherwise become an uncaught exception and the
      // caller would never hear back. Report it through the callback instead.
      try {
        this.runInAsyncScope(this.cb, null, err)
      } finally {
        // No worker was started, so the slot is still free: pass it on to the
        // next queued request rather than leaving the queue stalled.
        const queued = this.pool._queue.shift()
        if (queued) queued.addToPool()
      }
    }
  }
}