-
Notifications
You must be signed in to change notification settings - Fork 0
/
waitGroup.go
170 lines (135 loc) · 3.69 KB
/
waitGroup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package errors
import (
"context"
"sync"
)
// WaitGroup wraps a sync.WaitGroup and adds error collection, an internal
// cancellable context and an optional limit on concurrently running tasks.
// Create instances with NewWaitGroup.
type WaitGroup struct {
// noCopy marks the struct as one that must not be copied after first use.
noCopy noCopy
// options holds the configuration assembled by NewWaitGroup.
options *WaitGroupOptions
// errors accumulates every non-nil error passed to Done.
errors MultiError
// gch is a buffered channel with capacity TaskLimit; DoWithContext sends to
// it before starting a task and receives from it once the task has finished.
// It is nil when no positive TaskLimit was configured.
gch chan struct{}
// ctx is derived from options.Ctx and is cancelled through cancel.
ctx context.Context
// cancel cancels ctx and records the given cause.
cancel context.CancelCauseFunc
// cancelOnce ensures Stop invokes cancel at most once.
cancelOnce sync.Once
}
// NewWaitGroup creates a WaitGroup configured by the given options.
//
// Defaults, used when no option overrides them: a fresh sync.WaitGroup, a
// task runner that starts each task in its own goroutine,
// context.Background() as the parent context and no task limit.
//
// Nil options are skipped, and a nil Wg, TaskRunner or Ctx left behind by an
// option is replaced by its default, so construction never panics on nil
// input. A TaskLimit greater than zero bounds the number of tasks that
// Do/DoWithContext run at the same time.
func NewWaitGroup(options ...WaitGroupOption) *WaitGroup {
	ops := &WaitGroupOptions{
		Wg:         &sync.WaitGroup{},
		TaskRunner: func(task func()) { go task() },
		Ctx:        context.Background(),
	}
	for _, op := range options {
		if op == nil {
			continue
		}
		op(ops)
	}

	// Restore defaults that an option may have cleared: a nil parent makes
	// context.WithCancelCause panic, and a nil Wg or TaskRunner would panic
	// on the first Add/Do call.
	if ops.Wg == nil {
		ops.Wg = &sync.WaitGroup{}
	}
	if ops.TaskRunner == nil {
		ops.TaskRunner = func(task func()) { go task() }
	}
	if ops.Ctx == nil {
		ops.Ctx = context.Background()
	}

	// The limiter channel stays nil (limiting disabled) unless a positive
	// limit was requested.
	var gch chan struct{}
	if ops.TaskLimit > 0 {
		gch = make(chan struct{}, ops.TaskLimit)
	}

	ctx, cancel := context.WithCancelCause(ops.Ctx)
	return &WaitGroup{options: ops, gch: gch, ctx: ctx, cancel: cancel}
}
// Context returns the group's internal context. It is derived from the
// parent context the group was created with and is cancelled by Stop.
func (g *WaitGroup) Context() context.Context {
	ctx := g.ctx
	return ctx
}
// Stop cancels the group's internal context, recording err as the
// cancellation cause. Only the first call has an effect; later calls are
// ignored.
func (g *WaitGroup) Stop(err error) {
	cancelWithCause := func() {
		g.cancel(err)
	}
	g.cancelOnce.Do(cancelWithCause)
}
// Wait blocks until the underlying sync.WaitGroup counter reaches zero, then
// returns whatever the collected errors report through MultiError.Err.
// Before returning it calls Stop with that same value, so the internal
// context is cancelled once waiting is over.
func (g *WaitGroup) Wait() (err error) {
	g.options.Wg.Wait()

	// Deferred so that Stop receives the final value of the named result.
	defer func() {
		g.Stop(err)
	}()

	err = g.errors.Err()
	return err
}
// Add forwards delta to the Add method of the underlying sync.WaitGroup.
func (g *WaitGroup) Add(delta int) {
	counter := g.options.Wg
	counter.Add(delta)
}
// Done marks one task as finished, like sync.WaitGroup.Done, and additionally
// accepts the task's result.
//
// A non-nil err is added to the group's collected errors; when the
// StopOnError option is set it is also passed to Stop, cancelling the
// group's context with err as the cause. A nil err only decrements the
// counter.
func (g *WaitGroup) Done(err error) {
	// Decrement the counter last. If it were released first, a concurrent
	// Wait could wake up, read the collected errors and cancel the context
	// before this error has been recorded, losing the error and the cause.
	defer g.options.Wg.Done()

	if err == nil {
		return
	}
	if g.options.StopOnError {
		g.Stop(err)
	}
	g.errors.Add(err)
}
// Do schedules f on the group's task runner and hands it the group's
// internal context. The error returned by f is reported through Done.
func (g *WaitGroup) Do(f func(ctx context.Context) error) {
	internal := g.ctx
	g.DoWithContext(internal, f)
}
// DoWithContext schedules f on the group's task runner, passing it ctx.
// A nil ctx is replaced by the group's internal context.
//
// When a task limit is configured, the call blocks until a slot is free; the
// slot is given back after f has returned and its result was reported via
// Done.
func (g *WaitGroup) DoWithContext(ctx context.Context, f func(ctx context.Context) error) {
	if ctx == nil {
		ctx = g.ctx
	}

	// Take a slot first; this blocks while the limit is reached.
	if g.gch != nil {
		g.gch <- struct{}{}
	}

	g.Add(1)
	task := func() {
		result := f(ctx)
		g.Done(result)
		if g.gch == nil {
			return
		}
		<-g.gch
	}
	g.options.TaskRunner(task)
}
// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
//
// NOTE(review): no Lock/Unlock methods are declared for noCopy in this file;
// go vet's copylocks check keys on those methods, so confirm they are
// defined elsewhere in the package.
type noCopy struct{}
// WaitGroupOptions holds the configuration of a WaitGroup. NewWaitGroup
// fills in defaults and then applies every WaitGroupOption to it.
type WaitGroupOptions struct {
// Wg is the sync.WaitGroup that Add, Done and Wait delegate to.
// NewWaitGroup defaults it to a fresh sync.WaitGroup.
Wg *sync.WaitGroup
// TaskLimit, when greater than zero, is the capacity of the channel that
// DoWithContext uses to bound the number of running tasks.
TaskLimit int
// TaskRunner executes each task scheduled via Do/DoWithContext.
// NewWaitGroup defaults it to starting the task in a new goroutine.
TaskRunner WaitGroupTaskRunner
// StopOnError makes Done call Stop when it receives a non-nil error.
StopOnError bool
// Ctx is the parent of the group's internal context.
// NewWaitGroup defaults it to context.Background().
Ctx context.Context
}
// WaitGroupOption mutates the WaitGroupOptions used by NewWaitGroup.
type WaitGroupOption func(group *WaitGroupOptions)
// WaitGroupTaskRunner runs a task scheduled through Do or DoWithContext.
type WaitGroupTaskRunner func(task func())
// WaitGroupWithSyncWaitGroup returns an option that makes the group use wg,
// for example a parent's sync.WaitGroup, instead of creating its own.
func WaitGroupWithSyncWaitGroup(wg *sync.WaitGroup) WaitGroupOption {
	apply := func(opts *WaitGroupOptions) {
		opts.Wg = wg
	}
	return apply
}
// WaitGroupWithTaskLimit returns an option that sets the task limit.
// The limit applies to tasks started through Do and DoWithContext; calling
// Add and Done directly is not limited.
func WaitGroupWithTaskLimit(limit int) WaitGroupOption {
	return WaitGroupOption(func(opts *WaitGroupOptions) {
		opts.TaskLimit = limit
	})
}
// WaitGroupWithTaskRunner returns an option that replaces the default
// runner, which starts one goroutine per task, with the supplied runner.
func WaitGroupWithTaskRunner(runner WaitGroupTaskRunner) WaitGroupOption {
	apply := func(opts *WaitGroupOptions) {
		opts.TaskRunner = runner
	}
	return apply
}
// WaitGroupWithStopOnError returns an option that enables StopOnError, so
// that a non-nil error reported to Done also triggers Stop.
func WaitGroupWithStopOnError() WaitGroupOption {
	return WaitGroupOption(func(opts *WaitGroupOptions) {
		opts.StopOnError = true
	})
}
// WaitGroupWithContext returns an option that sets ctx as the parent of the
// group's internal context.
func WaitGroupWithContext(ctx context.Context) WaitGroupOption {
	apply := func(opts *WaitGroupOptions) {
		opts.Ctx = ctx
	}
	return apply
}
// WaitChanel exposes wg.Wait as a channel: it starts a goroutine that calls
// Wait and sends the result on the returned channel exactly once. The
// channel is never closed.
func WaitChanel(wg *WaitGroup) chan error {
	// Capacity 1 lets the goroutine deliver its result and exit even if the
	// caller has stopped receiving (for instance after a select timeout);
	// with an unbuffered channel it would stay blocked on the send forever.
	wait := make(chan error, 1)
	go func() { wait <- wg.Wait() }()
	return wait
}