-
Notifications
You must be signed in to change notification settings - Fork 2
/
index.js
93 lines (72 loc) · 1.99 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
module.exports = ParallelQueue
function ParallelQueue (parallel, worker) {
if (!(this instanceof ParallelQueue)) return new ParallelQueue(parallel, worker)
this._worker = worker
this._queue = []
this._running = []
this._runningCount = 0
this.parallel = parallel
this.destroyed = false
}
Object.defineProperty(ParallelQueue.prototype, 'pending', {
enumerable: true,
get: function () {
return this._queue.length
}
})
Object.defineProperty(ParallelQueue.prototype, 'running', {
enumerable: true,
get: function () {
return this._runningCount
}
})
ParallelQueue.prototype.push = function (task, cb) {
if (this.destroyed === true) return cb(new Error('Already destroyed'))
var args = {
task: task,
cb: cb
}
this._queue.push(args)
process.nextTick(this._kick.bind(this))
return this._cancel.bind(this, args)
}
ParallelQueue.prototype.destroy = function (err) {
while (this._queue.length) this._cancel(this._queue[0], err)
while (this._running.length) this._cancel(this._running[0], err)
this.destroyed = true
}
ParallelQueue.prototype._cancel = function (args, err) {
var qidx = this._queue.indexOf(args)
if (qidx >= 0) {
this._queue.splice(qidx, 1)
}
var ridx = this._running.indexOf(args)
if (ridx >= 0) {
this._running.splice(ridx, 1)
}
if (ridx < 0 && qidx < 0) return
if (err == null) {
err = new Error('Cancelled operation')
err.cancel = true
}
args.cb(err)
process.nextTick(this._kick.bind(this))
}
ParallelQueue.prototype._kick = function () {
var self = this
if (self._runningCount >= self.parallel) return
var args = self._queue.shift()
if (args == null) return
self._running.push(args)
self._runningCount++
self._worker(args.task, done)
function done (err, res1, res2, res3) {
self._runningCount--
var ridx = self._running.indexOf(args)
if (ridx >= 0) {
self._running.splice(ridx, 1)
args.cb(err, res1, res2, res3)
}
process.nextTick(self._kick.bind(self))
}
}