diff --git a/controller/controller.go b/controller/controller.go index 9bb4786..71aaea6 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -191,6 +191,9 @@ type ProvisionController struct { slowSet *slowset.SlowSet retryIntervalMax time.Duration + + // WaitGroup for all worker go routines created by the controller Run() method + workersWg *sync.WaitGroup } const ( @@ -690,6 +693,7 @@ func NewProvisionController( hasRunLock: &sync.Mutex{}, volumeNameHook: getProvisionedVolumeNameForClaim, retryIntervalMax: DefaultRetryIntervalMax, + workersWg: &sync.WaitGroup{}, } controller.slowSet = slowset.NewSlowSet(controller.retryIntervalMax) @@ -855,6 +859,13 @@ func (ctrl *ProvisionController) forgetVolume(obj any) { ctrl.volumeQueue.Forget(key) } +// Run(ctx) adds all worker go routines it creates to the wait group and shuts them down when the context is done. +// This allows the main process to wait for the provisioner goroutines to be finished and perform any cleanup afterwards, +// for example release leader election. +func (ctrl *ProvisionController) ControllerWaitGroup(wg *sync.WaitGroup) { + ctrl.workersWg = wg +} + // Run starts all of this controller's control loops func (ctrl *ProvisionController) Run(ctx context.Context) { run := func(ctx context.Context) { @@ -906,8 +917,16 @@ func (ctrl *ProvisionController) Run(ctx context.Context) { } for i := 0; i < ctrl.threadiness; i++ { - go wait.Until(func() { ctrl.runClaimWorker(ctx) }, time.Second, ctx.Done()) - go wait.Until(func() { ctrl.runVolumeWorker(ctx) }, time.Second, ctx.Done()) + ctrl.workersWg.Add(1) + go func() { + defer ctrl.workersWg.Done() + wait.Until(func() { ctrl.runClaimWorker(ctx) }, time.Second, ctx.Done()) + }() + ctrl.workersWg.Add(1) + go func() { + defer ctrl.workersWg.Done() + wait.Until(func() { ctrl.runVolumeWorker(ctx) }, time.Second, ctx.Done()) + }() } logger.Info("Started provisioner controller", "component", ctrl.component)