@@ -14,7 +14,7 @@ type executor struct {
14
14
cancel context.CancelFunc
15
15
logger Logger
16
16
stopCh chan struct {}
17
- jobsIDsIn chan uuid. UUID
17
+ jobsIn chan jobIn
18
18
jobIDsOut chan uuid.UUID
19
19
jobOutRequest chan jobOutRequest
20
20
stopTimeout time.Duration
@@ -25,8 +25,13 @@ type executor struct {
25
25
locker Locker
26
26
}
27
27
28
+ type jobIn struct {
29
+ id uuid.UUID
30
+ shouldSendOut bool
31
+ }
32
+
28
33
type singletonRunner struct {
29
- in chan uuid. UUID
34
+ in chan jobIn
30
35
rescheduleLimiter chan struct {}
31
36
}
32
37
@@ -35,7 +40,7 @@ type limitModeConfig struct {
35
40
mode LimitMode
36
41
limit uint
37
42
rescheduleLimiter chan struct {}
38
- in chan uuid. UUID
43
+ in chan jobIn
39
44
// singletonJobs is used to track singleton jobs that are running
40
45
// in the limit mode runner. This is used to prevent the same job
41
46
// from running multiple times across limit mode runners when both
@@ -72,7 +77,7 @@ func (e *executor) start() {
72
77
// are run immediately.
73
78
// 2. sent from time.AfterFuncs in which job schedules
74
79
// are spun up by the scheduler
75
- case id := <- e .jobsIDsIn :
80
+ case jIn := <- e .jobsIn :
76
81
select {
77
82
case <- e .stopCh :
78
83
e .stop (standardJobsWg , singletonJobsWg , limitModeJobsWg )
@@ -111,14 +116,16 @@ func (e *executor) start() {
111
116
// the executor from building up a waiting queue
112
117
// and forces rescheduling
113
118
case e .limitMode .rescheduleLimiter <- struct {}{}:
114
- e .limitMode .in <- id
119
+ e .limitMode .in <- jIn
115
120
default :
116
121
// all runners are busy, reschedule the work for later
117
122
// which means we just skip it here and do nothing
118
123
// TODO when metrics are added, this should increment a rescheduled metric
119
- select {
120
- case e .jobIDsOut <- id :
121
- default :
124
+ if jIn .shouldSendOut {
125
+ select {
126
+ case e .jobIDsOut <- jIn .id :
127
+ default :
128
+ }
122
129
}
123
130
}
124
131
} else {
@@ -127,51 +134,53 @@ func (e *executor) start() {
127
134
// to work through the channel backlog. A hard limit of 1000 is in place
128
135
// at which point this call would block.
129
136
// TODO when metrics are added, this should increment a wait metric
130
- e .limitMode .in <- id
137
+ e .limitMode .in <- jIn
131
138
}
132
139
} else {
133
140
// no limit mode, so we're either running a regular job or
134
141
// a job with a singleton mode
135
142
//
136
143
// get the job, so we can figure out what kind it is and how
137
144
// to execute it
138
- j := requestJobCtx (ctx , id , e .jobOutRequest )
145
+ j := requestJobCtx (ctx , jIn . id , e .jobOutRequest )
139
146
if j == nil {
140
147
// safety check as it'd be strange bug if this occurred
141
148
return
142
149
}
143
150
if j .singletonMode {
144
151
// for singleton mode, get the existing runner for the job
145
152
// or spin up a new one
146
- runner , ok := e .singletonRunners [id ]
153
+ runner , ok := e .singletonRunners [jIn . id ]
147
154
if ! ok {
148
- runner .in = make (chan uuid. UUID , 1000 )
155
+ runner .in = make (chan jobIn , 1000 )
149
156
if j .singletonLimitMode == LimitModeReschedule {
150
157
runner .rescheduleLimiter = make (chan struct {}, 1 )
151
158
}
152
- e .singletonRunners [id ] = runner
159
+ e .singletonRunners [jIn . id ] = runner
153
160
singletonJobsWg .Add (1 )
154
- go e .singletonModeRunner ("singleton-" + id .String (), runner .in , singletonJobsWg , j .singletonLimitMode , runner .rescheduleLimiter )
161
+ go e .singletonModeRunner ("singleton-" + jIn . id .String (), runner .in , singletonJobsWg , j .singletonLimitMode , runner .rescheduleLimiter )
155
162
}
156
163
157
164
if j .singletonLimitMode == LimitModeReschedule {
158
165
// reschedule mode uses the limiter channel to check
159
166
// for a running job and reschedules if the channel is full.
160
167
select {
161
168
case runner .rescheduleLimiter <- struct {}{}:
162
- runner .in <- id
169
+ runner .in <- jIn
163
170
default :
164
171
// runner is busy, reschedule the work for later
165
172
// which means we just skip it here and do nothing
166
173
// TODO when metrics are added, this should increment a rescheduled metric
167
- select {
168
- case e .jobIDsOut <- id :
169
- default :
174
+ if jIn .shouldSendOut {
175
+ select {
176
+ case e .jobIDsOut <- jIn .id :
177
+ default :
178
+ }
170
179
}
171
180
}
172
181
} else {
173
182
// wait mode, fill up that queue (buffered channel, so it's ok)
174
- runner .in <- id
183
+ runner .in <- jIn
175
184
}
176
185
} else {
177
186
select {
@@ -187,7 +196,7 @@ func (e *executor) start() {
187
196
// complete.
188
197
standardJobsWg .Add (1 )
189
198
go func (j internalJob ) {
190
- e .runJob (j )
199
+ e .runJob (j , jIn . shouldSendOut )
191
200
standardJobsWg .Done ()
192
201
}(* j )
193
202
}
@@ -200,11 +209,11 @@ func (e *executor) start() {
200
209
}
201
210
}
202
211
203
- func (e * executor ) limitModeRunner (name string , in chan uuid. UUID , wg * waitGroupWithMutex , limitMode LimitMode , rescheduleLimiter chan struct {}) {
212
+ func (e * executor ) limitModeRunner (name string , in chan jobIn , wg * waitGroupWithMutex , limitMode LimitMode , rescheduleLimiter chan struct {}) {
204
213
e .logger .Debug ("gocron: limitModeRunner starting" , "name" , name )
205
214
for {
206
215
select {
207
- case id := <- in :
216
+ case jIn := <- in :
208
217
select {
209
218
case <- e .ctx .Done ():
210
219
e .logger .Debug ("gocron: limitModeRunner shutting down" , "name" , name )
@@ -214,24 +223,28 @@ func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroup
214
223
}
215
224
216
225
ctx , cancel := context .WithCancel (e .ctx )
217
- j := requestJobCtx (ctx , id , e .jobOutRequest )
226
+ j := requestJobCtx (ctx , jIn . id , e .jobOutRequest )
218
227
cancel ()
219
228
if j != nil {
220
229
if j .singletonMode {
221
230
e .limitMode .singletonJobsMu .Lock ()
222
- _ , ok := e .limitMode .singletonJobs [id ]
231
+ _ , ok := e .limitMode .singletonJobs [jIn . id ]
223
232
if ok {
224
233
// this job is already running, so don't run it
225
234
// but instead reschedule it
226
235
e .limitMode .singletonJobsMu .Unlock ()
227
- select {
228
- case <- e .ctx .Done ():
229
- return
230
- case <- j .ctx .Done ():
231
- return
232
- case e .jobIDsOut <- j .id :
236
+ if jIn .shouldSendOut {
237
+ select {
238
+ case <- e .ctx .Done ():
239
+ return
240
+ case <- j .ctx .Done ():
241
+ return
242
+ case e .jobIDsOut <- j .id :
243
+ }
233
244
}
234
- // remove the limiter block to allow another job to be scheduled
245
+ // remove the limiter block, as this particular job
246
+ // was a singleton already running, and we want to
247
+ // allow another job to be scheduled
235
248
if limitMode == LimitModeReschedule {
236
249
select {
237
250
case <- rescheduleLimiter :
@@ -240,14 +253,14 @@ func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroup
240
253
}
241
254
continue
242
255
}
243
- e .limitMode .singletonJobs [id ] = struct {}{}
256
+ e .limitMode .singletonJobs [jIn . id ] = struct {}{}
244
257
e .limitMode .singletonJobsMu .Unlock ()
245
258
}
246
- e .runJob (* j )
259
+ e .runJob (* j , jIn . shouldSendOut )
247
260
248
261
if j .singletonMode {
249
262
e .limitMode .singletonJobsMu .Lock ()
250
- delete (e .limitMode .singletonJobs , id )
263
+ delete (e .limitMode .singletonJobs , jIn . id )
251
264
e .limitMode .singletonJobsMu .Unlock ()
252
265
}
253
266
}
@@ -267,11 +280,11 @@ func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroup
267
280
}
268
281
}
269
282
270
- func (e * executor ) singletonModeRunner (name string , in chan uuid. UUID , wg * waitGroupWithMutex , limitMode LimitMode , rescheduleLimiter chan struct {}) {
283
+ func (e * executor ) singletonModeRunner (name string , in chan jobIn , wg * waitGroupWithMutex , limitMode LimitMode , rescheduleLimiter chan struct {}) {
271
284
e .logger .Debug ("gocron: limitModeRunner starting" , "name" , name )
272
285
for {
273
286
select {
274
- case id := <- in :
287
+ case jIn := <- in :
275
288
select {
276
289
case <- e .ctx .Done ():
277
290
e .logger .Debug ("gocron: limitModeRunner shutting down" , "name" , name )
@@ -281,10 +294,10 @@ func (e *executor) singletonModeRunner(name string, in chan uuid.UUID, wg *waitG
281
294
}
282
295
283
296
ctx , cancel := context .WithCancel (e .ctx )
284
- j := requestJobCtx (ctx , id , e .jobOutRequest )
297
+ j := requestJobCtx (ctx , jIn . id , e .jobOutRequest )
285
298
cancel ()
286
299
if j != nil {
287
- e .runJob (* j )
300
+ e .runJob (* j , jIn . shouldSendOut )
288
301
}
289
302
290
303
// remove the limiter block to allow another job to be scheduled
@@ -302,7 +315,7 @@ func (e *executor) singletonModeRunner(name string, in chan uuid.UUID, wg *waitG
302
315
}
303
316
}
304
317
305
- func (e * executor ) runJob (j internalJob ) {
318
+ func (e * executor ) runJob (j internalJob , shouldSendOut bool ) {
306
319
if j .ctx == nil {
307
320
return
308
321
}
@@ -327,12 +340,14 @@ func (e *executor) runJob(j internalJob) {
327
340
}
328
341
_ = callJobFuncWithParams (j .beforeJobRuns , j .id , j .name )
329
342
330
- select {
331
- case <- e .ctx .Done ():
332
- return
333
- case <- j .ctx .Done ():
334
- return
335
- case e .jobIDsOut <- j .id :
343
+ if shouldSendOut {
344
+ select {
345
+ case <- e .ctx .Done ():
346
+ return
347
+ case <- j .ctx .Done ():
348
+ return
349
+ case e .jobIDsOut <- j .id :
350
+ }
336
351
}
337
352
338
353
err := callJobFuncWithParams (j .function , j .parameters ... )
0 commit comments