diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go
index 27b5a2c1e..ab4d540a2 100644
--- a/pkg/cvo/cvo.go
+++ b/pkg/cvo/cvo.go
@@ -298,12 +298,16 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder s
 // shutdownContext.Done().
 func (optr *Operator) Run(runContext context.Context, shutdownContext context.Context, workers int) error {
 	defer optr.queue.ShutDown()
+	defer optr.availableUpdatesQueue.ShutDown()
+	defer optr.upgradeableQueue.ShutDown()
 	stopCh := runContext.Done()
-	workerStopCh := make(chan struct{})
 
 	klog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval)
 	defer klog.Info("Shutting down ClusterVersionOperator")
 
+	resultChannel := make(chan asyncResult, 1)
+	resultChannelCount := 0
+
 	if !cache.WaitForCacheSync(stopCh, optr.cacheSynced...) {
 		return fmt.Errorf("caches never synchronized: %w", runContext.Err())
 	}
@@ -311,33 +315,96 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
 	// trigger the first cluster version reconcile always
 	optr.queue.Add(optr.queueKey())
 
-	// start the config sync loop, and have it notify the queue when new status is detected
-	go runThrottledStatusNotifier(runContext, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
-	go optr.configSync.Start(runContext, 16, optr.name, optr.cvLister)
-	go wait.UntilWithContext(runContext, func(runContext context.Context) {
-		optr.worker(runContext, optr.availableUpdatesQueue, optr.availableUpdatesSync)
-	}, time.Second)
-	go wait.UntilWithContext(runContext, func(runContext context.Context) { optr.worker(runContext, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second)
-	go wait.UntilWithContext(runContext, func(runContext context.Context) {
-		defer close(workerStopCh)
-
-		// run the worker, then when the queue is closed sync one final time to flush any pending status
-		optr.worker(runContext, optr.queue, func(runContext context.Context, key string) error { return optr.sync(runContext, key) })
-		if err := optr.sync(shutdownContext, optr.queueKey()); err != nil {
-			utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
-		}
-	}, time.Second)
-	if optr.signatureStore != nil {
-		go optr.signatureStore.Run(runContext, optr.minimumUpdateCheckInterval*2)
-	}
+	resultChannelCount++
+	go func() {
+		defer utilruntime.HandleCrash()
+		runThrottledStatusNotifier(runContext, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
+		resultChannel <- asyncResult{name: "status notifier"}
+	}()
 
-	<-stopCh
+	resultChannelCount++
+	go func() {
+		defer utilruntime.HandleCrash()
+		optr.configSync.Start(runContext, 16, optr.name, optr.cvLister)
+		resultChannel <- asyncResult{name: "sync worker"}
+	}()
 
-	// stop the queue, then wait for the worker to exit
-	optr.queue.ShutDown()
-	<-workerStopCh
+	resultChannelCount++
+	go func() {
+		defer utilruntime.HandleCrash()
+		wait.UntilWithContext(runContext, func(runContext context.Context) {
+			optr.worker(runContext, optr.availableUpdatesQueue, optr.availableUpdatesSync)
+		}, time.Second)
+		resultChannel <- asyncResult{name: "available updates"}
+	}()
 
-	return nil
+	resultChannelCount++
+	go func() {
+		defer utilruntime.HandleCrash()
+		wait.UntilWithContext(runContext, func(runContext context.Context) { optr.worker(runContext, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second)
+		resultChannel <- asyncResult{name: "upgradeable"}
+	}()
+
+	resultChannelCount++
+	go func() {
+		defer utilruntime.HandleCrash()
+		wait.UntilWithContext(runContext, func(runContext context.Context) {
+			// run the worker, then when the queue is closed sync one final time to flush any pending status
+			optr.worker(runContext, optr.queue, func(runContext context.Context, key string) error { return optr.sync(runContext, key) })
+			if err := optr.sync(shutdownContext, optr.queueKey()); err != nil {
+				utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
+			}
+		}, time.Second)
+		resultChannel <- asyncResult{name: "cluster version sync"}
+	}()
+
+	if optr.signatureStore != nil {
+		resultChannelCount++
+		go func() {
+			defer utilruntime.HandleCrash()
+			optr.signatureStore.Run(runContext, optr.minimumUpdateCheckInterval*2)
+			resultChannel <- asyncResult{name: "signature store"}
+		}()
+	}
+
+	shutdown := false
+	var loopError error
+	for resultChannelCount > 0 {
+		klog.Infof("Waiting on %d outstanding goroutines.", resultChannelCount)
+		if shutdown {
+			select {
+			case result := <-resultChannel:
+				resultChannelCount--
+				if result.error == nil {
+					klog.Infof("Collected %s goroutine.", result.name)
+				} else {
+					klog.Errorf("Collected %s goroutine: %v", result.name, result.error)
+					loopError = result.error
+				}
+			case <-shutdownContext.Done(): // out of time
+				klog.Errorf("Abandoning %d uncollected goroutines", resultChannelCount)
+				return shutdownContext.Err()
+			}
+		} else {
+			select {
+			case <-runContext.Done(): // clean shutdown
+			case result := <-resultChannel: // crashed before a shutdown was requested
+				resultChannelCount--
+				if result.error == nil {
+					klog.Infof("Collected %s goroutine.", result.name)
+				} else {
+					klog.Errorf("Collected %s goroutine: %v", result.name, result.error)
+					loopError = result.error
+				}
+			}
+			shutdown = true
+			optr.queue.ShutDown()
+			optr.availableUpdatesQueue.ShutDown()
+			optr.upgradeableQueue.ShutDown()
+		}
+	}
+
+	return loopError
 }
 
 func (optr *Operator) queueKey() string {
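
Note: the hunks above construct and read asyncResult values (result.name, result.error) but do not show the type itself, which is declared elsewhere in pkg/cvo. A minimal sketch consistent with those accesses, offered only as an illustration of the shape the collection loop relies on, not the package's actual declaration:

	// asyncResult is a sketch of the value each Run goroutine reports when it
	// exits; the fields mirror the result.name / result.error accesses above.
	type asyncResult struct {
		name  string // label for the goroutine, e.g. "available updates"
		error error  // non-nil when a goroutine reports a failure; the loop surfaces it as loopError
	}

Because resultChannel is buffered with capacity one, at most one finished goroutine can hand off its result without blocking; the collection loop drains the rest as it decrements resultChannelCount, and the shutdownContext deadline bounds how long that collection may take before the remaining goroutines are abandoned.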