From b79ae0307946b9d238f5dc941489724d15382950 Mon Sep 17 00:00:00 2001 From: Ondra Kupka Date: Wed, 22 Apr 2026 14:08:06 +0200 Subject: [PATCH 1/2] certregenerationcontroller: Improve goroutine mgmt Use a WaitGroup to manage all goroutines. --- pkg/cmd/certregenerationcontroller/cmd.go | 28 +++++++++++++++-------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/pkg/cmd/certregenerationcontroller/cmd.go b/pkg/cmd/certregenerationcontroller/cmd.go index 085592fa3f..28f9d3a685 100644 --- a/pkg/cmd/certregenerationcontroller/cmd.go +++ b/pkg/cmd/certregenerationcontroller/cmd.go @@ -3,6 +3,7 @@ package certregenerationcontroller import ( "context" "fmt" + "sync" "time" "github.com/spf13/cobra" @@ -117,8 +118,17 @@ func (o *Options) Run(ctx context.Context, clock clock.Clock) error { o.controllerContext.EventRecorder, ) - go configInformers.Start(ctx.Done()) - go featureGateAccessor.Run(ctx) + var wg sync.WaitGroup + defer wg.Wait() + // cancel must happen before wg.Wait (so in a later defer), otherwise we can get stuck on early return. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + configInformers.Start(ctx.Done()) + + wg.Go(func() { + featureGateAccessor.Run(ctx) + }) var featureGates featuregates.FeatureGate select { @@ -128,6 +138,8 @@ func (o *Options) Run(ctx context.Context, clock clock.Clock) error { case <-time.After(1 * time.Minute): klog.Errorf("timed out waiting for FeatureGate detection") return fmt.Errorf("timed out waiting for FeatureGate detection") + case <-ctx.Done(): + return ctx.Err() } kubeAPIServerCertRotationController, err := certrotationcontroller.NewCertRotationControllerOnlyWhenExpired( @@ -156,16 +168,12 @@ func (o *Options) Run(ctx context.Context, clock clock.Clock) error { dynamicInformers.Start(ctx.Done()) configInformers.Start(ctx.Done()) - // FIXME: These are missing a wait group to track goroutines and handle graceful termination - // (@deads2k wants time to think it through) - - go func() { + wg.Go(func() { kubeAPIServerCertRotationController.Run(ctx, 1) - }() - - go func() { + }) + wg.Go(func() { caBundleController.Run(ctx) - }() + }) <-ctx.Done() From e6bd2b12a5716734cad8ad98e00a0d1b8af80509 Mon Sep 17 00:00:00 2001 From: Ondra Kupka Date: Wed, 22 Apr 2026 14:09:04 +0200 Subject: [PATCH 2/2] certregenerationcontroller: Use factory.New Rewrite the controller init to use factory.New to remove unnecessary boilerplate code. --- .../cabundlesyncer.go | 95 +++---------------- pkg/cmd/certregenerationcontroller/cmd.go | 7 +- 2 files changed, 16 insertions(+), 86 deletions(-) diff --git a/pkg/cmd/certregenerationcontroller/cabundlesyncer.go b/pkg/cmd/certregenerationcontroller/cabundlesyncer.go index 02d6c2916d..ee6604085f 100644 --- a/pkg/cmd/certregenerationcontroller/cabundlesyncer.go +++ b/pkg/cmd/certregenerationcontroller/cabundlesyncer.go @@ -2,17 +2,13 @@ package certregenerationcontroller import ( "context" - "fmt" "time" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" corev1listers "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "github.com/openshift/library-go/pkg/controller/factory" "github.com/openshift/library-go/pkg/operator/events" "github.com/openshift/library-go/pkg/operator/v1helpers" @@ -20,109 +16,46 @@ import ( "github.com/openshift/cluster-kube-apiserver-operator/pkg/operator/targetconfigcontroller" ) -const workQueueKey = "key" - -// CABundleController composes individual certs into CA bundle that is used +// caBundleController composes individual certs into CA bundle that is used // by kube-apiserver to validate clients. // Cert recovery refreshes "kube-control-plane-signer-ca" and needs the containing // bundle regenerated so kube-controller-manager and kube-scheduler can connect // using client certs. -type CABundleController struct { +type caBundleController struct { configMapGetter corev1client.ConfigMapsGetter configMapLister corev1listers.ConfigMapLister - - eventRecorder events.Recorder - - cachesToSync []cache.InformerSynced - - // queue only ever has one item, but it has nice error handling backoff/retry semantics - queue workqueue.RateLimitingInterface + eventRecorder events.Recorder } func NewCABundleController( configMapGetter corev1client.ConfigMapsGetter, kubeInformersForNamespaces v1helpers.KubeInformersForNamespaces, eventRecorder events.Recorder, -) (*CABundleController, error) { - c := &CABundleController{ +) factory.Controller { + c := &caBundleController{ configMapGetter: configMapGetter, configMapLister: kubeInformersForNamespaces.ConfigMapLister(), eventRecorder: eventRecorder.WithComponentSuffix("manage-client-ca-bundle-recovery-controller"), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "CABundleRecoveryController"), } - handler := cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { c.queue.Add(workQueueKey) }, - UpdateFunc: func(old, new interface{}) { c.queue.Add(workQueueKey) }, - DeleteFunc: func(obj interface{}) { c.queue.Add(workQueueKey) }, - } - - // we react to some config changes namespaces := []string{ operatorclient.GlobalUserSpecifiedConfigNamespace, operatorclient.GlobalMachineSpecifiedConfigNamespace, operatorclient.OperatorNamespace, operatorclient.TargetNamespace, } - for _, namespace := range namespaces { - informers := kubeInformersForNamespaces.InformersFor(namespace) - informers.Core().V1().ConfigMaps().Informer().AddEventHandler(handler) - c.cachesToSync = append(c.cachesToSync, informers.Core().V1().ConfigMaps().Informer().HasSynced) - } - - return c, nil -} - -func (c *CABundleController) Run(ctx context.Context) { - defer utilruntime.HandleCrash() - - // FIXME: These are missing a wait group to track goroutines and handle graceful termination - // (@deads2k wants time to think it through) - - klog.Info("Starting CA bundle controller") - defer func() { - klog.Info("Shutting down CA bundle controller") - c.queue.ShutDown() - klog.Info("CA bundle controller shut down") - }() - - if !cache.WaitForNamedCacheSync("CABundleController", ctx.Done(), c.cachesToSync...) { - return + var informers []factory.Informer + for _, ns := range namespaces { + informers = append(informers, kubeInformersForNamespaces.InformersFor(ns).Core().V1().ConfigMaps().Informer()) } - go func() { - wait.UntilWithContext(ctx, c.runWorker, time.Second) - }() - - <-ctx.Done() -} - -func (c *CABundleController) runWorker(ctx context.Context) { - for c.processNextItem(ctx) { - } -} - -func (c *CABundleController) processNextItem(ctx context.Context) bool { - key, quit := c.queue.Get() - if quit { - return false - } - defer c.queue.Done(key) - - err := c.sync(ctx) - - if err == nil { - c.queue.Forget(key) - return true - } - - utilruntime.HandleError(fmt.Errorf("%v failed with : %w", key, err)) - c.queue.AddRateLimited(key) - - return true + return factory.New(). + WithInformers(informers...). + WithSync(c.sync). + ToController("CABundleRecoveryController", c.eventRecorder) } -func (c *CABundleController) sync(ctx context.Context) error { +func (c *caBundleController) sync(ctx context.Context, _ factory.SyncContext) error { // Always start 10 seconds later after a change occurred. Makes us less likely to steal work and logs from the operator. timer := time.NewTimer(10 * time.Second) defer timer.Stop() diff --git a/pkg/cmd/certregenerationcontroller/cmd.go b/pkg/cmd/certregenerationcontroller/cmd.go index 28f9d3a685..dea96836ec 100644 --- a/pkg/cmd/certregenerationcontroller/cmd.go +++ b/pkg/cmd/certregenerationcontroller/cmd.go @@ -154,14 +154,11 @@ func (o *Options) Run(ctx context.Context, clock clock.Clock) error { return err } - caBundleController, err := NewCABundleController( + caBundleController := NewCABundleController( kubeClient.CoreV1(), kubeAPIServerInformersForNamespaces, o.controllerContext.EventRecorder, ) - if err != nil { - return err - } // We can't start informers until after the resources have been requested. Now is the time. kubeAPIServerInformersForNamespaces.Start(ctx.Done()) @@ -172,7 +169,7 @@ func (o *Options) Run(ctx context.Context, clock clock.Clock) error { kubeAPIServerCertRotationController.Run(ctx, 1) }) wg.Go(func() { - caBundleController.Run(ctx) + caBundleController.Run(ctx, 1) }) <-ctx.Done()