-
Notifications
You must be signed in to change notification settings - Fork 0
/
chantask.go
191 lines (172 loc) · 4.43 KB
/
chantask.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package chantask
import (
"sync"
)
type (
	// SenderFunc is a producer callback. It receives the task it belongs to and
	// the extra arguments that were registered together with it in AddSender.
	SenderFunc func(task *ChanTask, args ...interface{})

	// ReceiverFunc is a consumer callback. It receives the task it belongs to and
	// the extra arguments that were registered together with it in AddReceiver.
	ReceiverFunc func(task *ChanTask, args ...interface{})

	// ChanTask owns one data channel plus the senders and receivers that work on
	// it. Build it with CreateChanTask, register callbacks with AddSender and
	// AddReceiver, then call Start.
	ChanTask struct {
		// ret carries the data handed over from senders to receivers.
		ret chan interface{}
		// closeSend is closed by StopSend to signal that sending must stop.
		closeSend chan struct{}
		// closeRecv is closed by StopReceive to signal that receiving must stop.
		closeRecv chan struct{}
		// routineSega is a buffered channel used as a semaphore; its capacity
		// is the maximum number of senders running at the same time.
		routineSega chan struct{}

		// wgSenders tracks the sender goroutines launched by Start.
		wgSenders sync.WaitGroup
		// wgReceivers tracks the receiver goroutines launched by Start.
		wgReceivers sync.WaitGroup

		// senders and senderArgs are parallel slices: senderArgs[i] holds the
		// []interface{} argument list registered for senders[i].
		senders    []SenderFunc
		senderArgs []interface{}
		// receivers and receiverArgs are parallel slices, laid out like the
		// sender pair above.
		receivers    []ReceiverFunc
		receiverArgs []interface{}

		// started is set once Start has been entered.
		started bool
		// ended is set by Start when the task is over.
		ended bool
	}
)
// CreateChanTask creates a task that manages one channel through which any
// data can be sent and received.
//
// Typical usage: create the task, register senders with AddSender and
// receivers with AddReceiver, then call Start, which runs all of them on
// goroutines.
//
// chanBufferSize is the buffer size of the data channel; a negative value is
// treated as 0 (unbuffered) instead of panicking in make.
// routineNum is the maximum number of senders running at the same time; a
// value <= 0 is treated as 1.
func CreateChanTask(chanBufferSize, routineNum int) *ChanTask {
	if chanBufferSize < 0 {
		// make(chan T, n) panics for n < 0; clamp like routineNum below.
		chanBufferSize = 0
	}
	if routineNum <= 0 {
		routineNum = 1
	}
	return &ChanTask{
		closeSend:   make(chan struct{}),
		closeRecv:   make(chan struct{}),
		ret:         make(chan interface{}, chanBufferSize),
		routineSega: make(chan struct{}, routineNum),
	}
}
// AddSender registers senderFunc, together with the arguments it will be
// called with, to be run by Start.
//
// Be careful with args: if the sender needs a value that lives outside of the
// SenderFunc, pass it through args rather than capturing it, unless that value
// is guaranteed not to change until the task stops.
//
// It returns ErrTaskStoped when the task has already ended and
// ErrTaskIsRunning when the task has already been started; in both cases
// nothing is registered.
func (task *ChanTask) AddSender(senderFunc SenderFunc, args ...interface{}) error {
	switch {
	case task.ended:
		return ErrTaskStoped
	case task.started:
		return ErrTaskIsRunning
	}
	// The whole args slice is stored as ONE element so that senderArgs stays
	// index-aligned with senders.
	task.senders, task.senderArgs = append(task.senders, senderFunc), append(task.senderArgs, args)
	return nil
}
// AddReceiver registers receiverFunc, together with the arguments it will be
// called with, to be run by Start.
//
// Be careful with args: if the receiver needs a value that lives outside of
// the ReceiverFunc, pass it through args rather than capturing it, unless that
// value is guaranteed not to change until the task stops.
//
// It returns ErrTaskStoped when the task has already ended and
// ErrTaskIsRunning when the task has already been started; in both cases
// nothing is registered.
func (task *ChanTask) AddReceiver(receiverFunc ReceiverFunc, args ...interface{}) error {
	switch {
	case task.ended:
		return ErrTaskStoped
	case task.started:
		return ErrTaskIsRunning
	}
	// The whole args slice is stored as ONE element so that receiverArgs stays
	// index-aligned with receivers.
	task.receivers, task.receiverArgs = append(task.receivers, receiverFunc), append(task.receiverArgs, args)
	return nil
}
// Start runs every registered sender and receiver on its own goroutine and
// blocks until the task is over: all receivers have returned, all launched
// senders have returned and the data channel has been closed and drained.
//
// At most cap(routineSega) senders (the routineNum given to CreateChanTask)
// run at the same time. Senders that have not been launched yet when StopSend
// is called are never run.
//
// It returns ErrTaskStoped when the task has already ended and
// ErrTaskIsRunning when Start has already been called; otherwise nil.
func (task *ChanTask) Start() error {
	if task.ended {
		return ErrTaskStoped
	}
	if task.started {
		return ErrTaskIsRunning
	}
	task.started = true

	// Sender part: a dispatcher goroutine launches the senders, waits for them
	// and then closes the data channel.
	go func() {
		// The semaphore is only needed when there are more senders than slots.
		needBlockRoutine := cap(task.routineSega) < len(task.senders)
		for k, produceFn := range task.senders {
			if task.IsSendStopped() {
				break
			}
			task.wgSenders.Add(1)
			if needBlockRoutine {
				task.routineSega <- struct{}{} // blocks while all slots are taken
			}
			go func(fn SenderFunc, args []interface{}) {
				defer func() {
					task.wgSenders.Done()
					if needBlockRoutine {
						<-task.routineSega // free the slot for the next sender
					}
				}()
				if task.IsSendStopped() {
					return
				}
				fn(task, args...)
			}(produceFn, task.senderArgs[k].([]interface{}))
		}
		// wgSenders is only ever waited on here, in the goroutine that also
		// calls Add, so Add can never run concurrently with a Wait.
		task.wgSenders.Wait()
		// Closing ret makes sure receivers are not blocked forever once no
		// more data can arrive.
		close(task.ret)
	}()

	// Receiver part.
	for k, recdFn := range task.receivers {
		task.wgReceivers.Add(1)
		go func(fn ReceiverFunc, args []interface{}) {
			defer task.wgReceivers.Done()
			if task.IsReceiveStopped() {
				return
			}
			fn(task, args...)
		}(recdFn, task.receiverArgs[k].([]interface{}))
	}

	task.wgReceivers.Wait()
	// Drain whatever the receivers left behind. This must happen BEFORE waiting
	// for the senders: a sender blocked on a full channel after the receivers
	// have gone can only finish if somebody keeps reading. The loop ends when
	// the dispatcher closes ret, which it does only after every launched sender
	// has returned, so no separate wait on wgSenders is needed here.
	for range task.ret {
	}
	task.ended = true
	return nil
}
// IsSendStopped reports, without blocking, whether sending has been stopped,
// i.e. whether closeSend has been closed (see StopSend).
func (task *ChanTask) IsSendStopped() bool {
	stopped := false
	select {
	case <-task.closeSend:
		stopped = true
	default:
		// closeSend is still open: sending has not been stopped.
	}
	return stopped
}
// Send puts data on the task's channel and reports whether it was sent.
//
// It returns false without sending when StopSend has been called, either
// before Send is entered or while Send is blocked waiting for room in the
// channel. Otherwise it blocks until the data has been handed over and
// returns true.
//
// NOTE: Start closes the channel after all senders have returned, so Send is
// meant to be called from a running SenderFunc.
func (task *ChanTask) Send(data interface{}) (ok bool) {
	// A stop requested before the call always wins, even if the channel has
	// room; a single select would pick randomly between two ready cases.
	select {
	case <-task.closeSend:
		return false
	default:
	}
	// Wait for room in the channel, but let StopSend interrupt the wait.
	select {
	case <-task.closeSend:
		return false
	case task.ret <- data:
		return true
	}
}
// StopSend signals that sending must stop by closing closeSend; calling it
// again afterwards is a no-op. Once stopped, Send returns false and Start does
// not launch the senders that are still waiting to run.
//
// NOTE: the check and the close are two separate steps, so this method does
// not by itself serialize callers racing to stop at the same instant.
func (task *ChanTask) StopSend() {
	if task.IsSendStopped() {
		return
	}
	close(task.closeSend)
}
// IsReceiveStopped reports, without blocking, whether receiving has been
// stopped, i.e. whether closeRecv has been closed (see StopReceive).
func (task *ChanTask) IsReceiveStopped() bool {
	stopped := false
	select {
	case <-task.closeRecv:
		stopped = true
	default:
		// closeRecv is still open: receiving has not been stopped.
	}
	return stopped
}
// Receive takes the next value from the task's channel.
//
// It returns (nil, false) when StopReceive has been called, either before
// Receive is entered or while Receive is blocked waiting for data, and also
// when the channel has been closed and holds no more data. Otherwise it blocks
// until a value is available and returns (value, true).
func (task *ChanTask) Receive() (data interface{}, ok bool) {
	// A stop requested before the call always wins, even if data is buffered;
	// a single select would pick randomly between two ready cases.
	select {
	case <-task.closeRecv:
		return nil, false
	default:
	}
	// Wait for data, but let StopReceive interrupt the wait.
	select {
	case <-task.closeRecv:
		return nil, false
	case data, ok = <-task.ret:
		// ok is false once ret is closed and empty; data is nil in that case.
		return data, ok
	}
}
// StopReceive signals that receiving must stop by closing closeRecv; calling
// it again afterwards is a no-op. Once stopped, Receive returns nil data with
// ok == false and Start does not run the receivers that are still waiting to
// run.
//
// NOTE: the check and the close are two separate steps, so this method does
// not by itself serialize callers racing to stop at the same instant.
func (task *ChanTask) StopReceive() {
	if task.IsReceiveStopped() {
		return
	}
	close(task.closeRecv)
}