From c13dc7a329b6c817d2e89b327b775f7739131bc2 Mon Sep 17 00:00:00 2001 From: Pavan Date: Mon, 30 Sep 2024 16:51:40 +0200 Subject: [PATCH] [Misc] Operator: Typed Workqueue used Replace: RateLimitingInterface --> TypedRateLimitingInterface DefaultControllerRateLimiter --> DefaultTypedControllerRateLimiter NewRateLimitingQueue --> NewTypedRateLimitingQueueWithConfig Replaced deprecated StartStructuredLogging with StartLogging. --- internal/controller/controller.go | 33 +++++++++++--------------- internal/controller/controller_test.go | 6 ++--- internal/controller/informers_test.go | 8 +++---- 3 files changed, 20 insertions(+), 27 deletions(-) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index e7c259f..f4e698c 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -53,18 +53,19 @@ type Controller struct { gardenerCertInformerFactory gardenerCertInformers.SharedInformerFactory certManagerInformerFactory certManagerInformers.SharedInformerFactory gardenerDNSInformerFactory gardenerDNSInformers.SharedInformerFactory - queues map[int]workqueue.RateLimitingInterface + queues map[int]workqueue.TypedRateLimitingInterface[QueueItem] eventBroadcaster events.EventBroadcaster eventRecorder events.EventRecorder } func NewController(client kubernetes.Interface, crdClient versioned.Interface, istioClient istio.Interface, gardenerCertificateClient gardenerCert.Interface, certManagerCertificateClient certManager.Interface, gardenerDNSClient gardenerDNS.Interface, apiExtClient apiext.Interface, promClient promop.Interface) *Controller { - queues := map[int]workqueue.RateLimitingInterface{ - ResourceCAPApplication: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - ResourceCAPApplicationVersion: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - ResourceCAPTenant: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - ResourceCAPTenantOperation: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - ResourceOperatorDomains: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + + queues := map[int]workqueue.TypedRateLimitingInterface[QueueItem]{ + ResourceCAPApplication: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPApplication]}), + ResourceCAPApplicationVersion: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPApplicationVersion]}), + ResourceCAPTenant: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPTenant]}), + ResourceCAPTenantOperation: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPTenantOperation]}), + ResourceOperatorDomains: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceOperatorDomains]}), } // Use 30mins as the default Resync interval for kube / proprietary resources @@ -96,7 +97,7 @@ func NewController(client kubernetes.Interface, crdClient versioned.Interface, i v1alpha1scheme.AddToScheme(scheme) istioscheme.AddToScheme(scheme) eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) - eventBroadcaster.StartStructuredLogging(klog.Level(1)) + eventBroadcaster.StartLogging(klog.Background()) recorder := eventBroadcaster.NewRecorder(scheme, "cap-controller.sme.sap.com") c := &Controller{ @@ -222,24 +223,19 @@ func (c *Controller) processQueueItem(ctx context.Context, key int) error { klog.V(2).InfoS("Processing queue item in work queue", "resource", getResourceKindFromKey(key), "queue length", q.Len()) - i, shutdown := q.Get() + item, shutdown := q.Get() if shutdown { return fmt.Errorf("queue (%d) shutdown", key) // stop processing when the queue has been shutdown } // [IMPORTANT] always mark the item as done (after processing it) - defer q.Done(i) + defer q.Done(item) var ( err error skipItem bool result *ReconcileResult ) - item, ok := i.(QueueItem) - if !ok { - klog.ErrorS(err, "unknown item found in queue", "resource", getResourceKindFromKey(key)) - return nil // process next item - } attempts := q.NumRequeues(item) @@ -268,14 +264,14 @@ func (c *Controller) processQueueItem(ctx context.Context, key int) error { klog.ErrorS(err, "queue processing error", "resource", getResourceKindFromKey(key)) if !skipItem { // add back to queue for re-processing - q.AddRateLimited(i) + q.AddRateLimited(item) return nil } } // Forget the item after processing it // This just clears the rate limiter from tracking the item - q.Forget(i) + q.Forget(item) if result != nil { // requeue resources specified in the reconciliation result @@ -300,7 +296,7 @@ func (c *Controller) processReconcileResult(result *ReconcileResult) { } } -func (c *Controller) recoverFromPanic(ctx context.Context, item QueueItem, q workqueue.RateLimitingInterface) { +func (c *Controller) recoverFromPanic(ctx context.Context, item QueueItem, q workqueue.TypedRateLimitingInterface[QueueItem]) { if r := recover(); r != nil { // Log the Error / Stack Trace err := fmt.Errorf("panic: %v", r) @@ -320,5 +316,4 @@ func (c *Controller) recoverFromPanic(ctx context.Context, item QueueItem, q wor // Add the item back to the queue to be processed again with a RateLimited delay q.AddRateLimited(item) } - } diff --git a/internal/controller/controller_test.go b/internal/controller/controller_test.go index 093cef7..d8b4218 100644 --- a/internal/controller/controller_test.go +++ b/internal/controller/controller_test.go @@ -230,7 +230,7 @@ func TestController_processQueueItem(t *testing.T) { c := getTestController(testResources{cas: []*v1alpha1.CAPApplication{ca}, cats: []*v1alpha1.CAPTenant{cat}, preventStart: true}) if tt.resource == 9 || tt.resource == 99 { - c.queues[tt.resource] = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + c.queues[tt.resource] = workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{}) } dummyKubeInformerFactory := &dummyInformerFactoryType{c.kubeInformerFactory, tt.resourceNamespace, nil} @@ -263,10 +263,8 @@ func TestController_processQueueItem(t *testing.T) { cancel() expectedRes = testC.processQueueItem(ctx, tt.resource) } else { - if tt.resource < 4 || tt.resource == 9 { + if tt.resource < 4 || tt.resource == 9 || tt.resource == 99 { q.Add(item) - } else if tt.resource == 99 { - q.Add(tt.resource) } expectedRes = testC.processQueueItem(context.TODO(), tt.resource) } diff --git a/internal/controller/informers_test.go b/internal/controller/informers_test.go index 91ebb96..f64dd1f 100644 --- a/internal/controller/informers_test.go +++ b/internal/controller/informers_test.go @@ -18,14 +18,14 @@ import ( var expectedResult = false type dummyType struct { - workqueue.RateLimitingInterface + workqueue.TypedRateLimitingInterface[QueueItem] } -func (q *dummyType) Add(item interface{}) { +func (q *dummyType) Add(item QueueItem) { expectedResult = true } -func (q *dummyType) AddAfter(item interface{}, duration time.Duration) { +func (q *dummyType) AddAfter(item QueueItem, duration time.Duration) { expectedResult = true } @@ -87,7 +87,7 @@ func TestController_initializeInformers(t *testing.T) { c := getTestController(testResources{}) expectedResult = false - queues := map[int]workqueue.RateLimitingInterface{ + queues := map[int]workqueue.TypedRateLimitingInterface[QueueItem]{ ResourceCAPApplication: &dummyType{}, ResourceCAPApplicationVersion: &dummyType{}, ResourceCAPTenant: &dummyType{},