forked from zenthangplus/goccm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
goccm.go
141 lines (112 loc) · 3.38 KB
/
goccm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package goccm
import "sync/atomic"
// ConcurrencyManager is the public contract of the manager: it hands out a
// bounded number of slots to goroutines and reports when they are finished.
type ConcurrencyManager interface {
	// Wait blocks until a slot is available for a new goroutine.
	Wait()

	// Done marks one goroutine as finished.
	Done()

	// Close closes the manager manually.
	Close()

	// WaitAllDone blocks until all goroutines are done.
	WaitAllDone()

	// RunningCount returns the number of goroutines that are running.
	RunningCount() int32
}

// concurrencyManager holds the channels and the counter used to coordinate
// the goroutines.
type concurrencyManager struct {
	// max is the number of goroutines allowed to run concurrently.
	max int

	// managerCh holds one placeholder value per free slot.
	managerCh chan interface{}

	// doneCh carries one value for every goroutine that has finished its job.
	doneCh chan bool

	// allDoneCh carries the signal that all goroutines have finished.
	allDoneCh chan bool

	// closed is closed (never sent on) to mark the manager as closed.
	closed chan bool

	// runningCount is the number of goroutines currently running; it is
	// accessed through sync/atomic.
	runningCount int32
}
// New creates a manager with maxGoRoutines slots and starts its controller
// goroutine. Every slot is free when New returns.
func New(maxGoRoutines int) *concurrencyManager {
	// One placeholder value per free slot; Wait takes one out, the
	// controller puts one back.
	slots := make(chan interface{}, maxGoRoutines)
	for free := 0; free < maxGoRoutines; free++ {
		slots <- nil
	}

	m := &concurrencyManager{
		max:       maxGoRoutines,
		managerCh: slots,
		doneCh:    make(chan bool),
		allDoneCh: make(chan bool),
		closed:    make(chan bool),
	}

	// The controller collects finished jobs in the background.
	go m.controller()
	return m
}
// controller is the manager's bookkeeping loop; New starts it in its own
// goroutine.
//
// Each value received on doneCh returns one slot to managerCh so that a
// blocked Wait can proceed. Once the manager has been closed and every slot
// is back in managerCh, the loop ends and a single value is sent on
// allDoneCh, which is what WaitAllDone blocks on.
//
// The loop also wakes up when the closed channel is closed. Without that, a
// manager closed while idle is never re-examined (no further Done call
// arrives to wake the loop) and WaitAllDone blocks forever.
func (c *concurrencyManager) controller() {
	// Local copy so the case can be switched off after the close has been
	// observed: a receive from a nil channel never becomes ready, whereas a
	// closed channel would make the select spin.
	closed := c.closed

	for {
		select {
		case <-c.doneCh:
			// A goroutine finished: another one can now start.
			c.managerCh <- nil
		case <-closed:
			closed = nil
		}

		// Quiescence is judged by returned slots rather than by
		// RunningCount: Done lowers the counter before it sends on doneCh,
		// so the counter can read zero while a Done call is still waiting
		// to be received. Exiting at that point would leave that caller
		// blocked forever.
		if c.isClosed() && len(c.managerCh) == cap(c.managerCh) {
			break
		}
	}

	// Say that all goroutines are finished; WaitAllDone receives this.
	c.allDoneCh <- true
}
// Wait blocks until a slot is available and then claims it. Call it before
// starting the goroutine that will use the slot.
func (c *concurrencyManager) Wait() {
	// Each value in managerCh stands for one free slot, so this receive
	// blocks for as long as no slot is free.
	<-c.managerCh

	// The slot is ours: count the goroutine as running.
	atomic.AddInt32(&c.runningCount, 1)
}
// Done marks one goroutine as finished: it lowers the running count and then
// signals the controller through doneCh.
func (c *concurrencyManager) Done() {
	atomic.AddInt32(&c.runningCount, -1)

	// doneCh is unbuffered, so this blocks until the controller receives it.
	c.doneCh <- true
}
// Close marks the manager as closed by closing the closed channel.
// Calling it again after it has returned does nothing.
func (c *concurrencyManager) Close() {
	// Closing an already closed channel panics, so bail out first.
	if c.isClosed() {
		return
	}
	close(c.closed)
}
// isClosed reports, without blocking, whether the closed channel has been
// closed.
func (c *concurrencyManager) isClosed() bool {
	closed := false
	select {
	case <-c.closed:
		// A receive on a closed channel is always ready.
		closed = true
	default:
		// Still open: the receive would block.
	}
	return closed
}
// WaitAllDone closes the manager and then blocks until the controller
// signals on allDoneCh that all goroutines are done.
func (c *concurrencyManager) WaitAllDone() {
	// The controller only sends on allDoneCh once the manager is closed,
	// so close it here on the caller's behalf.
	c.Close()

	<-c.allDoneCh
}
// RunningCount returns the number of goroutines that are running, i.e. the
// current value of the counter that Wait increments and Done decrements.
func (c *concurrencyManager) RunningCount() int32 {
	// A plain atomic load; adding zero would perform a needless
	// read-modify-write just to read the value.
	return atomic.LoadInt32(&c.runningCount)
}