Skip to content

Commit f8d2b30

Browse files
authored
feat(sdk): restart goroutines (#5821)
Signed-off-by: francois samin <[email protected]>
1 parent 2f40a2e commit f8d2b30

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+199
-127
lines changed

cli/cdsctl/events.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ func eventsListenRun(v cli.Values) error {
6060
chanMessageToSend := make(chan []sdk.WebsocketFilter)
6161
chanErrorReceived := make(chan error)
6262

63-
sdk.NewGoRoutines().Run(ctx, "WebsocketEventsListenCmd", func(ctx context.Context) {
64-
client.WebsocketEventsListen(ctx, sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived)
63+
sdk.NewGoRoutines(ctx).Run(ctx, "WebsocketEventsListenCmd", func(ctx context.Context) {
64+
client.WebsocketEventsListen(ctx, sdk.NewGoRoutines(ctx), chanMessageToSend, chanMessageReceived, chanErrorReceived)
6565
})
6666

6767
switch {

cli/cdsctl/workflow_log.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ func workflowLogStreamRun(v cli.Values) error {
437437
chanMsgReceived := make(chan json.RawMessage)
438438
chanErrorReceived := make(chan error)
439439

440-
goRoutines := sdk.NewGoRoutines()
440+
goRoutines := sdk.NewGoRoutines(ctx)
441441
goRoutines.Exec(ctx, "WebsocketEventsListenCmd", func(ctx context.Context) {
442442
for ctx.Err() == nil {
443443
if err := client.RequestWebsocket(ctx, goRoutines, fmt.Sprintf("%s/item/stream", link.CDNURL), chanMessageToSend, chanMsgReceived, chanErrorReceived); err != nil {

cli/cdsctl/workflow_transform_as_code.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ func workflowTransformAsCodeRun(v cli.Values) (interface{}, error) {
5353
chanMessageToSend := make(chan []sdk.WebsocketFilter)
5454
chanErrorReceived := make(chan error)
5555

56-
sdk.NewGoRoutines().Run(ctx, "WebsocketEventsListenCmd", func(ctx context.Context) {
57-
client.WebsocketEventsListen(ctx, sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived)
56+
sdk.NewGoRoutines(ctx).Run(ctx, "WebsocketEventsListenCmd", func(ctx context.Context) {
57+
client.WebsocketEventsListen(ctx, sdk.NewGoRoutines(ctx), chanMessageToSend, chanMessageReceived, chanErrorReceived)
5858
})
5959

6060
ope, err := client.WorkflowTransformAsCode(projectKey, v.GetString(_WorkflowName), branch, message)

engine/api/api.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,7 @@ func (a *API) Serve(ctx context.Context) error {
545545
}
546546

547547
log.Info(ctx, "Initializing HTTP router")
548-
a.GoRoutines = sdk.NewGoRoutines()
548+
a.GoRoutines = sdk.NewGoRoutines(ctx)
549549
a.Router = &Router{
550550
Mux: mux.NewRouter(),
551551
Background: ctx,
@@ -652,12 +652,12 @@ func (a *API) Serve(ctx context.Context) error {
652652
log.Error(ctx, "error while initializing event system: %s", err)
653653
}
654654

655-
a.GoRoutines.Run(ctx, "event.dequeue", func(ctx context.Context) {
655+
a.GoRoutines.RunWithRestart(ctx, "event.dequeue", func(ctx context.Context) {
656656
event.DequeueEvent(ctx, a.mustDB())
657657
})
658658

659659
log.Info(ctx, "Initializing internal routines...")
660-
a.GoRoutines.Run(ctx, "maintenance.Subscribe", func(ctx context.Context) {
660+
a.GoRoutines.RunWithRestart(ctx, "maintenance.Subscribe", func(ctx context.Context) {
661661
if err := a.listenMaintenance(ctx); err != nil {
662662
log.Error(ctx, "error while initializing listen maintenance routine: %s", err)
663663
}
@@ -668,7 +668,7 @@ func (a *API) Serve(ctx context.Context) error {
668668
log.Error(ctx, "error while initializing worker models routine: %s", err)
669669
}
670670
})
671-
a.GoRoutines.Run(ctx, "worker.Initialize", func(ctx context.Context) {
671+
a.GoRoutines.RunWithRestart(ctx, "worker.Initialize", func(ctx context.Context) {
672672
if err := worker.Initialize(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper), a.Cache); err != nil {
673673
log.Error(ctx, "error while initializing workers routine: %s", err)
674674
}
@@ -684,25 +684,25 @@ func (a *API) Serve(ctx context.Context) error {
684684
a.GoRoutines.Run(ctx, "audit.ComputeWorkflowAudit", func(ctx context.Context) {
685685
audit.ComputeWorkflowAudit(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper))
686686
})
687-
a.GoRoutines.Run(ctx, "auditCleanerRoutine(ctx", func(ctx context.Context) {
687+
a.GoRoutines.Run(ctx, "auditCleanerRoutine", func(ctx context.Context) {
688688
auditCleanerRoutine(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper))
689689
})
690-
a.GoRoutines.Run(ctx, "repositoriesmanager.ReceiveEvents", func(ctx context.Context) {
690+
a.GoRoutines.RunWithRestart(ctx, "repositoriesmanager.ReceiveEvents", func(ctx context.Context) {
691691
repositoriesmanager.ReceiveEvents(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper), a.Cache)
692692
})
693-
a.GoRoutines.Run(ctx, "services.KillDeadServices", func(ctx context.Context) {
693+
a.GoRoutines.RunWithRestart(ctx, "services.KillDeadServices", func(ctx context.Context) {
694694
services.KillDeadServices(ctx, a.mustDB)
695695
})
696696
a.GoRoutines.Run(ctx, "broadcast.Initialize", func(ctx context.Context) {
697697
broadcast.Initialize(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper))
698698
})
699-
a.GoRoutines.Run(ctx, "api.serviceAPIHeartbeat", func(ctx context.Context) {
699+
a.GoRoutines.RunWithRestart(ctx, "api.serviceAPIHeartbeat", func(ctx context.Context) {
700700
a.serviceAPIHeartbeat(ctx)
701701
})
702-
a.GoRoutines.Run(ctx, "authentication.SessionCleaner", func(ctx context.Context) {
702+
a.GoRoutines.RunWithRestart(ctx, "authentication.SessionCleaner", func(ctx context.Context) {
703703
authentication.SessionCleaner(ctx, a.mustDB, 10*time.Second)
704704
})
705-
a.GoRoutines.Run(ctx, "api.WorkflowRunCraft", func(ctx context.Context) {
705+
a.GoRoutines.RunWithRestart(ctx, "api.WorkflowRunCraft", func(ctx context.Context) {
706706
a.WorkflowRunCraft(ctx, 100*time.Millisecond)
707707
})
708708

engine/api/api_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func newTestAPI(t *testing.T, bootstrapFunc ...test.Bootstrapf) (*API, *test.Fak
4242
api.AuthenticationDrivers[sdk.ConsumerBuiltin] = builtin.NewDriver()
4343
api.AuthenticationDrivers[sdk.ConsumerTest] = authdrivertest.NewDriver(t)
4444
api.AuthenticationDrivers[sdk.ConsumerTest2] = authdrivertest.NewDriver(t)
45-
api.GoRoutines = sdk.NewGoRoutines()
45+
api.GoRoutines = sdk.NewGoRoutines(context.TODO())
4646

4747
api.InitRouter()
4848

@@ -90,7 +90,7 @@ func newTestServer(t *testing.T, bootstrapFunc ...test.Bootstrapf) (*API, *test.
9090
api.AuthenticationDrivers = make(map[sdk.AuthConsumerType]sdk.AuthDriver)
9191
api.AuthenticationDrivers[sdk.ConsumerLocal] = local.NewDriver(context.TODO(), false, "http://localhost:8080", "")
9292
api.AuthenticationDrivers[sdk.ConsumerBuiltin] = builtin.NewDriver()
93-
api.GoRoutines = sdk.NewGoRoutines()
93+
api.GoRoutines = sdk.NewGoRoutines(context.TODO())
9494

9595
api.InitRouter()
9696
ts := httptest.NewServer(router.Mux)

engine/api/application_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ func TestUpdateAsCodeApplicationHandler(t *testing.T) {
274274
chanMessageReceived := make(chan sdk.WebsocketEvent)
275275
chanMessageToSend := make(chan []sdk.WebsocketFilter)
276276
chanErrorReceived := make(chan error)
277-
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived)
277+
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived)
278278
chanMessageToSend <- []sdk.WebsocketFilter{{
279279
Type: sdk.WebsocketFilterTypeAscodeEvent,
280280
ProjectKey: proj.Key,

engine/api/environment_ascode_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ func TestUpdateAsCodeEnvironmentHandler(t *testing.T) {
233233
chanMessageReceived := make(chan sdk.WebsocketEvent)
234234
chanMessageToSend := make(chan []sdk.WebsocketFilter)
235235
chanErrorReceived := make(chan error)
236-
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived)
236+
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived)
237237
chanMessageToSend <- []sdk.WebsocketFilter{{
238238
Type: sdk.WebsocketFilterTypeAscodeEvent,
239239
ProjectKey: proj.Key,

engine/api/migrate/migration.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func Run(ctx context.Context, db gorp.SqlExecutor) {
4646
wg.Add(1)
4747
}
4848

49-
sdk.NewGoRoutines().Run(ctx, "migrate_"+currentMigration.Name, func(contex context.Context) {
49+
sdk.NewGoRoutines(ctx).Run(ctx, "migrate_"+currentMigration.Name, func(contex context.Context) {
5050
defer func() {
5151
if currentMigration.Blocker {
5252
wg.Done()

engine/api/pipeline_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ func TestUpdateAsCodePipelineHandler(t *testing.T) {
205205
chanMessageToSend := make(chan []sdk.WebsocketFilter)
206206
chanErrorReceived := make(chan error)
207207

208-
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived)
208+
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived)
209209
chanMessageToSend <- []sdk.WebsocketFilter{{
210210
Type: sdk.WebsocketFilterTypeAscodeEvent,
211211
ProjectKey: proj.Key,

engine/api/websocket_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func Test_websocketWrongFilters(t *testing.T) {
4545
InsecureSkipVerifyTLS: true,
4646
BuitinConsumerAuthenticationToken: jws,
4747
})
48-
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived)
48+
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived)
4949

5050
// Subscribe to project without project key
5151
chanMessageToSend <- []sdk.WebsocketFilter{{
@@ -126,7 +126,7 @@ func Test_websocketGetWorkflowEvent(t *testing.T) {
126126
InsecureSkipVerifyTLS: true,
127127
SessionToken: jwt,
128128
})
129-
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived)
129+
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived)
130130
var lastResponse *sdk.WebsocketEvent
131131
go func() {
132132
for e := range chanMessageReceived {
@@ -268,7 +268,7 @@ func TestWebsocketNoEventLoose(t *testing.T) {
268268
InsecureSkipVerifyTLS: true,
269269
SessionToken: jwt,
270270
})
271-
go client1.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chan1MessageToSend, chan1MessageReceived, chan1ErrorReceived)
271+
go client1.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(ctx), chan1MessageToSend, chan1MessageReceived, chan1ErrorReceived)
272272
var client1EventCount int64
273273
go func() {
274274
for {
@@ -299,7 +299,7 @@ func TestWebsocketNoEventLoose(t *testing.T) {
299299
SessionToken: jwt,
300300
})
301301
var client2EventCount int64
302-
go client2.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chan2MessageToSend, chan2MessageReceived, chan2ErrorReceived)
302+
go client2.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(ctx), chan2MessageToSend, chan2MessageReceived, chan2ErrorReceived)
303303
go func() {
304304
for {
305305
select {

engine/api/workflow_ascode_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ func TestPostUpdateWorkflowAsCodeHandler(t *testing.T) {
211211
chanMessageReceived := make(chan sdk.WebsocketEvent)
212212
chanMessageToSend := make(chan []sdk.WebsocketFilter)
213213
chanErrorReceived := make(chan error)
214-
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived)
214+
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived)
215215
chanMessageToSend <- []sdk.WebsocketFilter{{
216216
Type: sdk.WebsocketFilterTypeAscodeEvent,
217217
ProjectKey: proj.Key,
@@ -422,7 +422,7 @@ func TestPostMigrateWorkflowAsCodeHandler(t *testing.T) {
422422
chanMessageReceived := make(chan sdk.WebsocketEvent)
423423
chanMessageToSend := make(chan []sdk.WebsocketFilter)
424424
chanErrorReceived := make(chan error)
425-
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived)
425+
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived)
426426
chanMessageToSend <- []sdk.WebsocketFilter{{
427427
Type: sdk.WebsocketFilterTypeAscodeEvent,
428428
ProjectKey: proj.Key,

engine/api/workflow_purge_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func Test_purgeDryRunHandler(t *testing.T) {
9898
})
9999
contextWS, cancel := context.WithCancel(context.Background())
100100
t.Cleanup(cancel)
101-
go client.WebsocketEventsListen(contextWS, sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived)
101+
go client.WebsocketEventsListen(contextWS, sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived)
102102

103103
// Subscribe to workflow retention
104104
chanMessageToSend <- []sdk.WebsocketFilter{{

engine/api/workflow_run_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1407,7 +1407,7 @@ func Test_postWorkflowRunAsyncFailedHandler(t *testing.T) {
14071407
OperationUUID: ope.UUID,
14081408
}
14091409

1410-
ascode.UpdateAsCodeResult(context.TODO(), api.mustDB(), api.Cache, sdk.NewGoRoutines(), *proj, *w1, app, ed, u)
1410+
ascode.UpdateAsCodeResult(context.TODO(), api.mustDB(), api.Cache, sdk.NewGoRoutines(context.TODO()), *proj, *w1, app, ed, u)
14111411

14121412
// Prepare request
14131413
uri := router.GetRoute("POST", api.postWorkflowRunHandler, map[string]string{

engine/cdn/cdn.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@ const (
3636
// New returns a new service
3737
func New() *Service {
3838
s := new(Service)
39-
s.GoRoutines = sdk.NewGoRoutines()
40-
39+
s.GoRoutines = sdk.NewGoRoutines(context.Background())
4140
return s
4241
}
4342

engine/cdn/cdn_gc_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,15 @@ func TestCleanSynchronizedItem(t *testing.T) {
3939
Cache: cache,
4040
Mapper: m,
4141
}
42-
s.GoRoutines = sdk.NewGoRoutines()
42+
s.GoRoutines = sdk.NewGoRoutines(context.TODO())
4343

4444
tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*")
4545
require.NoError(t, err)
4646

4747
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
4848
t.Cleanup(cancel)
4949

50-
cdnUnits, err := storage.Init(ctx, m, cache, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{
50+
cdnUnits, err := storage.Init(ctx, m, cache, db.DbMap, sdk.NewGoRoutines(ctx), storage.Configuration{
5151
HashLocatorSalt: "thisismysalt",
5252
Buffers: map[string]storage.BufferConfiguration{
5353
"redis_buffer": {
@@ -208,15 +208,15 @@ func TestCleanSynchronizedItemWithDisabledStorage(t *testing.T) {
208208
Cache: cache,
209209
Mapper: m,
210210
}
211-
s.GoRoutines = sdk.NewGoRoutines()
211+
s.GoRoutines = sdk.NewGoRoutines(context.TODO())
212212

213213
tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*")
214214
require.NoError(t, err)
215215

216216
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
217217
t.Cleanup(cancel)
218218

219-
cdnUnits, err := storage.Init(ctx, m, cache, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{
219+
cdnUnits, err := storage.Init(ctx, m, cache, db.DbMap, sdk.NewGoRoutines(ctx), storage.Configuration{
220220
HashLocatorSalt: "thisismysalt",
221221
Buffers: map[string]storage.BufferConfiguration{
222222
"redis_buffer": {
@@ -351,7 +351,7 @@ func TestCleanWaitingItem(t *testing.T) {
351351
Cache: cache,
352352
Mapper: m,
353353
}
354-
s.GoRoutines = sdk.NewGoRoutines()
354+
s.GoRoutines = sdk.NewGoRoutines(context.TODO())
355355

356356
ctx, cancel := context.WithCancel(context.TODO())
357357
t.Cleanup(cancel)
@@ -402,7 +402,7 @@ func TestCleanWaitingItemWithoutItemUnit(t *testing.T) {
402402
Cache: cache,
403403
Mapper: m,
404404
}
405-
s.GoRoutines = sdk.NewGoRoutines()
405+
s.GoRoutines = sdk.NewGoRoutines(context.TODO())
406406

407407
ctx, cancel := context.WithCancel(context.TODO())
408408
t.Cleanup(cancel)
@@ -446,7 +446,7 @@ func TestPurgeItem(t *testing.T) {
446446
Cache: cache,
447447
Mapper: m,
448448
}
449-
s.GoRoutines = sdk.NewGoRoutines()
449+
s.GoRoutines = sdk.NewGoRoutines(context.TODO())
450450

451451
ctx, cancel := context.WithCancel(context.TODO())
452452
t.Cleanup(cancel)

engine/cdn/cdn_item_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func TestGetItemValue(t *testing.T) {
4141
Cache: cache,
4242
Mapper: m,
4343
}
44-
s.GoRoutines = sdk.NewGoRoutines()
44+
s.GoRoutines = sdk.NewGoRoutines(context.TODO())
4545

4646
ctx, cancel := context.WithCancel(context.TODO())
4747
t.Cleanup(cancel)
@@ -202,7 +202,7 @@ func TestGetItemValue_ThousandLines(t *testing.T) {
202202
Cache: cache,
203203
Mapper: m,
204204
}
205-
s.GoRoutines = sdk.NewGoRoutines()
205+
s.GoRoutines = sdk.NewGoRoutines(context.TODO())
206206

207207
ctx, cancel := context.WithCancel(context.TODO())
208208
t.Cleanup(cancel)
@@ -309,7 +309,7 @@ func TestGetItemValue_Reverse(t *testing.T) {
309309
Cache: cache,
310310
Mapper: m,
311311
}
312-
s.GoRoutines = sdk.NewGoRoutines()
312+
s.GoRoutines = sdk.NewGoRoutines(context.TODO())
313313

314314
ctx, cancel := context.WithCancel(context.TODO())
315315
t.Cleanup(cancel)
@@ -419,7 +419,7 @@ func TestGetItemValue_ThousandLinesReverse(t *testing.T) {
419419
Mapper: m,
420420
}
421421
s.Cfg.Log.StepMaxSize = 200000
422-
s.GoRoutines = sdk.NewGoRoutines()
422+
s.GoRoutines = sdk.NewGoRoutines(context.TODO())
423423

424424
ctx, cancel := context.WithCancel(context.TODO())
425425
t.Cleanup(cancel)

engine/cdn/cdn_log_store_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func TestStoreNewStepLog(t *testing.T) {
4040
Cache: cache,
4141
Mapper: m,
4242
}
43-
s.GoRoutines = sdk.NewGoRoutines()
43+
s.GoRoutines = sdk.NewGoRoutines(context.TODO())
4444

4545
ctx, cancel := context.WithCancel(context.TODO())
4646
t.Cleanup(cancel)
@@ -123,7 +123,7 @@ func TestStoreLastStepLog(t *testing.T) {
123123
Cache: cache,
124124
Mapper: m,
125125
}
126-
s.GoRoutines = sdk.NewGoRoutines()
126+
s.GoRoutines = sdk.NewGoRoutines(context.TODO())
127127

128128
ctx, cancel := context.WithCancel(context.TODO())
129129
t.Cleanup(cancel)
@@ -212,7 +212,7 @@ func TestStoreNewServiceLog(t *testing.T) {
212212
Cache: cache,
213213
Mapper: m,
214214
}
215-
s.GoRoutines = sdk.NewGoRoutines()
215+
s.GoRoutines = sdk.NewGoRoutines(context.TODO())
216216

217217
ctx, cancel := context.WithCancel(context.TODO())
218218
t.Cleanup(cancel)

engine/cdn/cdn_log_tcp_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func TestWorkerLogCDNEnabled(t *testing.T) {
6161
}
6262
tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*")
6363
require.NoError(t, err)
64-
cdnUnits, err := storage.Init(context.TODO(), m, store, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{
64+
cdnUnits, err := storage.Init(context.TODO(), m, store, db.DbMap, sdk.NewGoRoutines(context.TODO()), storage.Configuration{
6565
HashLocatorSalt: "thisismysalt",
6666
Buffers: map[string]storage.BufferConfiguration{
6767
"redis_buffer": {
@@ -84,7 +84,7 @@ func TestWorkerLogCDNEnabled(t *testing.T) {
8484
s.Units = cdnUnits
8585

8686
s.Cfg.Log.StepMaxSize = 1000
87-
s.GoRoutines = sdk.NewGoRoutines()
87+
s.GoRoutines = sdk.NewGoRoutines(context.TODO())
8888

8989
signature := cdn.Signature{
9090
Worker: &cdn.SignatureWorker{
@@ -177,10 +177,10 @@ func TestServiceLogCDNDisabled(t *testing.T) {
177177
Mapper: m,
178178
}
179179
s.Cfg.Log.StepMaxSize = 1000
180-
s.GoRoutines = sdk.NewGoRoutines()
180+
s.GoRoutines = sdk.NewGoRoutines(context.TODO())
181181
tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*")
182182
require.NoError(t, err)
183-
cdnUnits, err := storage.Init(context.TODO(), m, store, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{
183+
cdnUnits, err := storage.Init(context.TODO(), m, store, db.DbMap, sdk.NewGoRoutines(context.TODO()), storage.Configuration{
184184
HashLocatorSalt: "thisismysalt",
185185
Buffers: map[string]storage.BufferConfiguration{
186186
"redis_buffer": {
@@ -292,7 +292,7 @@ func TestStoreTruncatedLogs(t *testing.T) {
292292
Cache: cache,
293293
Mapper: m,
294294
}
295-
s.GoRoutines = sdk.NewGoRoutines()
295+
s.GoRoutines = sdk.NewGoRoutines(context.TODO())
296296

297297
ctx, ccl := context.WithCancel(context.TODO())
298298
t.Cleanup(ccl)

0 commit comments

Comments
 (0)