unlimited_pool.go
package pool

import (
	"fmt"
	"math"
	"runtime"
	"sync"
)
var _ Pool = new(unlimitedPool)

// unlimitedPool contains all information for an unlimited pool instance.
type unlimitedPool struct {
	units  []*workUnit
	cancel chan struct{}
	closed bool
	m      sync.Mutex
}
// New returns a new unlimited pool instance.
func New() Pool {

	p := &unlimitedPool{
		units: make([]*workUnit, 0, 4), // initial capacity of 4; anyone using a pool is likely to queue at least a few work units, which reduces slice resizes
	}
	p.initialize()

	return p
}

func (p *unlimitedPool) initialize() {
	p.cancel = make(chan struct{})
	p.closed = false
}
// Queue queues the work to be run, and starts processing immediately.
func (p *unlimitedPool) Queue(fn WorkFunc) WorkUnit {

	w := &workUnit{
		done: make(chan struct{}),
		fn:   fn,
	}

	p.m.Lock()

	if p.closed {
		w.err = &ErrPoolClosed{s: errClosed}
		// if w.cancelled.Load() == nil {
		close(w.done)
		// }
		p.m.Unlock()
		return w
	}

	p.units = append(p.units, w)

	go func(w *workUnit) {

		defer func(w *workUnit) {
			if err := recover(); err != nil {

				trace := make([]byte, 1<<16)
				n := runtime.Stack(trace, true)

				s := fmt.Sprintf(errRecovery, err, string(trace[:int(math.Min(float64(n), float64(7000)))]))

				w.cancelled.Store(struct{}{})
				w.err = &ErrRecovery{s: s}
				close(w.done)
			}
		}(w)

		// support for individual WorkUnit cancellation
		// and batch job cancellation
		if w.cancelled.Load() == nil {

			val, err := w.fn(w)

			w.writing.Store(struct{}{})

			// need to check again in case the WorkFunc cancelled this unit of work,
			// otherwise we'll have a race condition
			if w.cancelled.Load() == nil && w.cancelling.Load() == nil {
				w.value, w.err = val, err

				// the done channel may be listened on anywhere by the caller, and we
				// don't want this to block just because the caller is waiting on another
				// unit of work to finish first, so we close rather than send
				close(w.done)
			}
		}
	}(w)

	p.m.Unlock()

	return w
}
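// Usage sketch for Queue: a minimal example of queueing a unit of work and
// waiting for its result. It assumes the package's exported WorkUnit interface
// provides Wait, Value, Error and IsCancelled, as in the public pool API.
//
//	p := New()
//	defer p.Close()
//
//	wu := p.Queue(func(u WorkUnit) (interface{}, error) {
//		if u.IsCancelled() {
//			return nil, nil // work was cancelled before it ran
//		}
//		return 1 + 1, nil
//	})
//
//	wu.Wait() // blocks until the unit's done channel is closed
//	if err := wu.Error(); err != nil {
//		// handle ErrPoolClosed, ErrCancelled, ErrRecovery, ...
//	}
//	_ = wu.Value() // interface{} holding 2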
// Reset reinitializes a pool that has been closed/cancelled back to a working state.
// If the pool has not been closed/cancelled, nothing happens as the pool is still in
// a valid running state.
func (p *unlimitedPool) Reset() {

	p.m.Lock()

	if !p.closed {
		p.m.Unlock()
		return
	}

	// the pool was cancelled, not closed for good; it will be usable again after calling initialize().
	p.initialize()
	p.m.Unlock()
}
func (p *unlimitedPool) closeWithError(err error) {

	p.m.Lock()

	if !p.closed {
		close(p.cancel)
		p.closed = true

		// clear out slice values for garbage collection, but keep the underlying array in case the pool is reused.
		// go in reverse order to try and cancel as many units as possible;
		// the ones at the end are less likely to have run than those at the beginning.
		for i := len(p.units) - 1; i >= 0; i-- {
			p.units[i].cancelWithError(err)
			p.units[i] = nil
		}

		p.units = p.units[0:0]
	}

	p.m.Unlock()
}
// Cancel cleans up the pool workers and channels and cancels any pending
// work still yet to be processed.
// Call Reset() to reinitialize the pool for use.
func (p *unlimitedPool) Cancel() {
	err := &ErrCancelled{s: errCancelled}
	p.closeWithError(err)
}
// Close cleans up the pool workers and channels and cancels any pending
// work still yet to be processed.
// Call Reset() to reinitialize the pool for use.
func (p *unlimitedPool) Close() {
	err := &ErrPoolClosed{s: errClosed}
	p.closeWithError(err)
}
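// Lifecycle sketch for Cancel/Close and Reset, following the behaviour documented
// above: once the pool is cancelled or closed, further Queue calls fail fast with
// a closed-pool error, and Reset makes the pool usable again. someWork stands in
// for any WorkFunc.
//
//	p := New()
//	p.Queue(someWork) // runs
//	p.Cancel()        // pending units are cancelled with ErrCancelled
//
//	wu := p.Queue(someWork)
//	// wu.Error() reports the pool as closed (*ErrPoolClosed)
//
//	p.Reset()         // reinitializes the cancelled/closed pool
//	p.Queue(someWork) // runs again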
// Batch creates a new Batch object for queueing Work Units separate from any others
// that may be running on the pool. Grouping these Work Units together allows for individual
// cancellation of the Batch's Work Units without affecting anything else running on the pool,
// as well as outputting the results on a channel as they complete.
// NOTE: a Batch is not reusable; once QueueComplete() has been called, its lifetime has been sealed
// to completing the queued items.
func (p *unlimitedPool) Batch() Batch {
	return newBatch(p)
}
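// Batch usage sketch, assuming the Batch interface exposes Queue, QueueComplete
// and Results as in the public pool API. QueueComplete must be called once
// queueing is finished, otherwise ranging over Results never terminates.
//
//	b := p.Batch()
//
//	go func() {
//		for i := 0; i < 10; i++ {
//			b.Queue(someWork) // someWork is any WorkFunc
//		}
//		b.QueueComplete() // seals the batch; no more units may be queued
//	}()
//
//	for wu := range b.Results() {
//		if err := wu.Error(); err != nil {
//			// handle error
//		}
//	}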