From a9e075a79dcdf8d040620154118c87c37b062eda Mon Sep 17 00:00:00 2001
From: "W. Trevor King"
Date: Thu, 28 Jan 2021 12:10:57 -0800
Subject: [PATCH] pkg/cvo/cvo: Guard Operator.Run goroutine handling from
 early cancels

In [1], we had a CVO fail to gracefully release the leader lock, with:

  $ grep start.go: namespaces/openshift-cluster-version/pods/cluster-version-operator-968d9fd48-25zkz/cluster-version-operator/cluster-version-operator/logs/current.log
  2021-01-26T19:02:48.678276895Z I0126 19:02:48.678120 1 start.go:21] ClusterVersionOperator 4.6.0-202012050130.p0-39a4256
  2021-01-26T19:02:49.273117163Z I0126 19:02:49.273074 1 start.go:260] Waiting on 2 outstanding goroutines.
  2021-01-26T19:02:49.273512006Z E0126 19:02:49.273474 1 start.go:271] Collected metrics server goroutine: listen tcp 0.0.0.0:9099: bind: address already in use
  2021-01-26T19:02:49.273587622Z I0126 19:02:49.273551 1 start.go:260] Waiting on 1 outstanding goroutines.
  2021-01-26T19:02:49.273587622Z I0126 19:02:49.273569 1 start.go:264] Run context completed; beginning two-minute graceful shutdown period.
  2021-01-26T19:02:49.273587622Z I0126 19:02:49.273577 1 start.go:260] Waiting on 1 outstanding goroutines.
  2021-01-26T19:04:49.273938652Z I0126 19:04:49.273877 1 start.go:260] Waiting on 1 outstanding goroutines.
  2021-01-26T19:05:39.264746935Z I0126 19:05:39.264690 1 start.go:190] Shutting down due to terminated

So the bind failed.  That should be rare, but it can happen if the new
CVO pod is scheduled before the kernel gets around to reaping the port
from a previous CVO pod.  The start.go handling notices the metrics
server dying, immediately cancels runContext, and we go into the
two-minute shutdown grace period.  That happened so early that the
context was already done by the time we got into Operator.Run, with
its:

  go wait.UntilWithContext(runContext, func(runContext context.Context) {
    defer close(workerStopCh)

From the godocs for UntilWithContext:

  $ git --no-pager grep -n '// UntilWithContext is syntactic sugar\|f may not be invoked if context\|func JitterUntilWithContext' vendor/k8s.io/apimachinery/pkg/util/wait
  vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:95:// UntilWithContext is syntactic sugar on top of JitterUntilWithContext
  vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:183:// Cancel context to stop. f may not be invoked if context is already expired.
  vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:184:func JitterUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration, jitterFactor float64, sliding bool) {

So when runContext is already done, we never close workerStopCh, and we
hang forever when we block on it before returning from Operator.Run.

This commit addresses that issue by bringing in a resultChannel flow
like the one we already use in pkg/start/start.go and
pkg/cvo/metrics.go.  It gives us logging for the number of outstanding
goroutines and tells us as each of them is reaped, which will make it
easier to diagnose any other issues with hanging children, while also
using shutdownContext to bail out so a stuck child cannot wedge the
whole shutdown.

This commit also adds ShutDown calls for availableUpdatesQueue and
upgradeableQueue, so we don't hang those two goroutines in
processNextWorkItem's queue.Get() call.
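As an illustration of that Get() behavior, here is a standalone sketch
(not code from this repository; the plain workqueue.New() stands in for
the CVO's rate-limited queues, but Get, Done, and ShutDown behave the
same way):

  package main

  import (
  	"fmt"
  	"time"

  	"k8s.io/client-go/util/workqueue"
  )

  func main() {
  	queue := workqueue.New()
  	done := make(chan struct{})

  	// Stand-in for a worker looping over processNextWorkItem.
  	go func() {
  		defer close(done)
  		for {
  			item, shutdown := queue.Get() // blocks until Add(...) or ShutDown()
  			if shutdown {
  				return
  			}
  			fmt.Printf("processed %v\n", item)
  			queue.Done(item)
  		}
  	}()

  	time.Sleep(100 * time.Millisecond)
  	queue.ShutDown() // without this, the receive below would block forever
  	<-done
  	fmt.Println("worker collected")
  }

Once ShutDown() has been called and the queue drains, Get() returns
shutdown=true, which is why the new deferred ShutDown calls are enough
to let those workers exit.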
The shutdowns should have been added when the queues landed in
90e9881bd5 (cvo: Change the core CVO loops to report status to
ClusterVersion, 2018-11-02, #45) and 04528144fe (cvo: long running
upgradeable sync worker, 2019-09-05), but the missing calls went
unnoticed until this commit tightened down on the goroutine collection.

There is a risk with these cluster-touching children: if we hit the
shutdownContext-timeout safety valve, we might exit Operator.Run and
release the leader lock, and then have our abandoned child fighting
over in-cluster state with some subsequent CVO which has just acquired
the leader lock.  We might want to write an event or something to make
it more obvious when that happens, but for now I'm more interested in
closing the context-already-canceled bug.

[1]: https://bugzilla.redhat.com/show_bug.cgi?id=1921413
---
 pkg/cvo/cvo.go | 117 ++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 92 insertions(+), 25 deletions(-)

diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go
index 27b5a2c1e..ab4d540a2 100644
--- a/pkg/cvo/cvo.go
+++ b/pkg/cvo/cvo.go
@@ -298,12 +298,16 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder s
 // shutdownContext.Done().
 func (optr *Operator) Run(runContext context.Context, shutdownContext context.Context, workers int) error {
 	defer optr.queue.ShutDown()
+	defer optr.availableUpdatesQueue.ShutDown()
+	defer optr.upgradeableQueue.ShutDown()
 	stopCh := runContext.Done()
-	workerStopCh := make(chan struct{})
 
 	klog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval)
 	defer klog.Info("Shutting down ClusterVersionOperator")
 
+	resultChannel := make(chan asyncResult, 1)
+	resultChannelCount := 0
+
 	if !cache.WaitForCacheSync(stopCh, optr.cacheSynced...) {
 		return fmt.Errorf("caches never synchronized: %w", runContext.Err())
 	}
@@ -311,33 +315,96 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
 	// trigger the first cluster version reconcile always
 	optr.queue.Add(optr.queueKey())
 
-	// start the config sync loop, and have it notify the queue when new status is detected
-	go runThrottledStatusNotifier(runContext, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
-	go optr.configSync.Start(runContext, 16, optr.name, optr.cvLister)
-	go wait.UntilWithContext(runContext, func(runContext context.Context) {
-		optr.worker(runContext, optr.availableUpdatesQueue, optr.availableUpdatesSync)
-	}, time.Second)
-	go wait.UntilWithContext(runContext, func(runContext context.Context) { optr.worker(runContext, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second)
-	go wait.UntilWithContext(runContext, func(runContext context.Context) {
-		defer close(workerStopCh)
-
-		// run the worker, then when the queue is closed sync one final time to flush any pending status
-		optr.worker(runContext, optr.queue, func(runContext context.Context, key string) error { return optr.sync(runContext, key) })
-		if err := optr.sync(shutdownContext, optr.queueKey()); err != nil {
-			utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
-		}
-	}, time.Second)
-	if optr.signatureStore != nil {
-		go optr.signatureStore.Run(runContext, optr.minimumUpdateCheckInterval*2)
-	}
+	resultChannelCount++
+	go func() {
+		defer utilruntime.HandleCrash()
+		runThrottledStatusNotifier(runContext, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
+		resultChannel <- asyncResult{name: "status notifier"}
+	}()
 
-	<-stopCh
+	resultChannelCount++
+	go func() {
+		defer utilruntime.HandleCrash()
+		optr.configSync.Start(runContext, 16, optr.name, optr.cvLister)
+		resultChannel <- asyncResult{name: "sync worker"}
+	}()
 
-	// stop the queue, then wait for the worker to exit
-	optr.queue.ShutDown()
-	<-workerStopCh
+	resultChannelCount++
+	go func() {
+		defer utilruntime.HandleCrash()
+		wait.UntilWithContext(runContext, func(runContext context.Context) {
+			optr.worker(runContext, optr.availableUpdatesQueue, optr.availableUpdatesSync)
+		}, time.Second)
+		resultChannel <- asyncResult{name: "available updates"}
+	}()
 
-	return nil
+	resultChannelCount++
+	go func() {
+		defer utilruntime.HandleCrash()
+		wait.UntilWithContext(runContext, func(runContext context.Context) { optr.worker(runContext, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second)
+		resultChannel <- asyncResult{name: "upgradeable"}
+	}()
+
+	resultChannelCount++
+	go func() {
+		defer utilruntime.HandleCrash()
+		wait.UntilWithContext(runContext, func(runContext context.Context) {
+			// run the worker, then when the queue is closed sync one final time to flush any pending status
+			optr.worker(runContext, optr.queue, func(runContext context.Context, key string) error { return optr.sync(runContext, key) })
+			if err := optr.sync(shutdownContext, optr.queueKey()); err != nil {
+				utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
+			}
+		}, time.Second)
+		resultChannel <- asyncResult{name: "cluster version sync"}
+	}()
+
+	if optr.signatureStore != nil {
+		resultChannelCount++
+		go func() {
+			defer utilruntime.HandleCrash()
+			optr.signatureStore.Run(runContext, optr.minimumUpdateCheckInterval*2)
+			resultChannel <- asyncResult{name: "signature store"}
+		}()
+	}
+
+	shutdown := false
+	var loopError error
+	for resultChannelCount > 0 {
+		klog.Infof("Waiting on %d outstanding goroutines.", resultChannelCount)
+		if shutdown {
+			select {
+			case result := <-resultChannel:
+				resultChannelCount--
+				if result.error == nil {
+					klog.Infof("Collected %s goroutine.", result.name)
+				} else {
+					klog.Errorf("Collected %s goroutine: %v", result.name, result.error)
+					loopError = result.error
+				}
+			case <-shutdownContext.Done(): // out of time
+				klog.Errorf("Abandoning %d uncollected goroutines", resultChannelCount)
+				return shutdownContext.Err()
+			}
+		} else {
+			select {
+			case <-runContext.Done(): // clean shutdown
+			case result := <-resultChannel: // crashed before a shutdown was requested
+				resultChannelCount--
+				if result.error == nil {
+					klog.Infof("Collected %s goroutine.", result.name)
+				} else {
+					klog.Errorf("Collected %s goroutine: %v", result.name, result.error)
+					loopError = result.error
+				}
+			}
+			shutdown = true
+			optr.queue.ShutDown()
+			optr.availableUpdatesQueue.ShutDown()
+			optr.upgradeableQueue.ShutDown()
+		}
+	}
+
+	return loopError
 }
 
 func (optr *Operator) queueKey() string {
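Note (illustration only, not part of the patch that git applies): a
minimal standalone sketch of the pre-fix hang described above.  With
runContext already canceled, wait.UntilWithContext never invokes its
callback, so workerStopCh is never closed and the old Operator.Run
would block on it forever; the three-second timeout below exists only
to make the hang visible.

  package main

  import (
  	"context"
  	"fmt"
  	"time"

  	"k8s.io/apimachinery/pkg/util/wait"
  )

  func main() {
  	runContext, cancel := context.WithCancel(context.Background())
  	cancel() // simulate start.go canceling runContext before Operator.Run is entered

  	workerStopCh := make(chan struct{})
  	go wait.UntilWithContext(runContext, func(ctx context.Context) {
  		defer close(workerStopCh) // never runs: f is not invoked on an already-expired context
  	}, time.Second)

  	select {
  	case <-workerStopCh:
  		fmt.Println("worker stopped") // what the pre-fix code waited for
  	case <-time.After(3 * time.Second):
  		fmt.Println("hung: workerStopCh was never closed") // what actually happens
  	}
  }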