diff --git a/cmd/csi-attacher/main.go b/cmd/csi-attacher/main.go index e11f80163..75dd76ae1 100644 --- a/cmd/csi-attacher/main.go +++ b/cmd/csi-attacher/main.go @@ -291,8 +291,8 @@ func main() { handler, factory.Storage().V1().VolumeAttachments(), factory.Core().V1().PersistentVolumes(), - workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), - workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), + workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax), + workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax), supportsListVolumesPublishedNodes, *reconcileSync, ) diff --git a/go.mod b/go.mod index 435d508ce..ffde476d6 100644 --- a/go.mod +++ b/go.mod @@ -9,10 +9,10 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc github.com/evanphx/json-patch v5.9.11+incompatible github.com/golang/mock v1.6.0 - github.com/golang/protobuf v1.5.4 github.com/kubernetes-csi/csi-lib-utils v0.22.0 github.com/kubernetes-csi/csi-test/v5 v5.3.1 google.golang.org/grpc v1.72.1 + google.golang.org/protobuf v1.36.6 k8s.io/api v0.33.3 k8s.io/apimachinery v0.33.3 k8s.io/apiserver v0.33.3 @@ -43,6 +43,7 @@ require ( github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/swag v0.23.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/google/cel-go v0.23.2 // indirect github.com/google/gnostic-models v0.6.9 // indirect github.com/google/go-cmp v0.7.0 // indirect @@ -94,7 +95,6 @@ require ( golang.org/x/time v0.11.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect - google.golang.org/protobuf v1.36.6 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/pkg/attacher/attacher.go b/pkg/attacher/attacher.go index c11ab171b..b4cc156c4 100644 --- a/pkg/attacher/attacher.go +++ b/pkg/attacher/attacher.go @@ -40,8 +40,7 @@ type Attacher interface { } type attacher struct { - client csi.ControllerClient - capabilities []csi.ControllerServiceCapability + client csi.ControllerClient } var ( diff --git a/pkg/attacher/attacher_test.go b/pkg/attacher/attacher_test.go index 49c44c4c9..565ac7d7b 100644 --- a/pkg/attacher/attacher_test.go +++ b/pkg/attacher/attacher_test.go @@ -26,20 +26,20 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" "github.com/golang/mock/gomock" - "github.com/golang/protobuf/proto" "github.com/kubernetes-csi/csi-lib-utils/connection" "github.com/kubernetes-csi/csi-lib-utils/metrics" "github.com/kubernetes-csi/csi-test/v5/driver" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" ) type pbMatcher struct { x proto.Message } -func (p pbMatcher) Matches(x interface{}) bool { +func (p pbMatcher) Matches(x any) bool { y := x.(proto.Message) return proto.Equal(p.x, y) } @@ -48,7 +48,7 @@ func (p pbMatcher) String() string { return fmt.Sprintf("pb equal to %v", p.x) } -func pbMatch(x interface{}) gomock.Matcher { +func pbMatch(x any) gomock.Matcher { v := x.(proto.Message) return &pbMatcher{v} } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 22e35f1dd..e7efbae31 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -49,8 +49,8 @@ type CSIAttachController struct { client kubernetes.Interface attacherName string handler Handler - vaQueue workqueue.RateLimitingInterface - pvQueue workqueue.RateLimitingInterface + vaQueue workqueue.TypedRateLimitingInterface[string] + pvQueue workqueue.TypedRateLimitingInterface[string] vaLister storagelisters.VolumeAttachmentLister vaListerSynced cache.InformerSynced @@ -64,7 +64,7 @@ type CSIAttachController struct { // Handler is responsible for handling VolumeAttachment events from informer. type Handler interface { - Init(vaQueue workqueue.RateLimitingInterface, pvQueue workqueue.RateLimitingInterface) + Init(vaQueue workqueue.TypedRateLimitingInterface[string], pvQueue workqueue.TypedRateLimitingInterface[string]) // SyncNewOrUpdatedVolumeAttachment processes one Add/Updated event from // VolumeAttachment informers. It runs in a workqueue, guaranting that only @@ -88,7 +88,7 @@ func NewCSIAttachController( handler Handler, volumeAttachmentInformer storageinformers.VolumeAttachmentInformer, pvInformer coreinformers.PersistentVolumeInformer, - vaRateLimiter, paRateLimiter workqueue.RateLimiter, + vaRateLimiter, paRateLimiter workqueue.TypedRateLimiter[string], shouldReconcileVolumeAttachment bool, reconcileSync time.Duration, ) *CSIAttachController { @@ -96,8 +96,8 @@ func NewCSIAttachController( client: client, attacherName: attacherName, handler: handler, - vaQueue: workqueue.NewNamedRateLimitingQueue(vaRateLimiter, "csi-attacher-va"), - pvQueue: workqueue.NewNamedRateLimitingQueue(paRateLimiter, "csi-attacher-pv"), + vaQueue: workqueue.NewTypedRateLimitingQueueWithConfig(vaRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{Name: "csi-attacher-va"}), + pvQueue: workqueue.NewTypedRateLimitingQueueWithConfig(paRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{Name: "csi-attacher-pv"}), shouldReconcileVolumeAttachment: shouldReconcileVolumeAttachment, reconcileSync: reconcileSync, translator: csitrans.New(), @@ -163,7 +163,7 @@ func (ctrl *CSIAttachController) Run(ctx context.Context, workers int, wg *sync. }() } } else { - for i := 0; i < workers; i++ { + for range workers { go wait.UntilWithContext(ctx, ctrl.syncVA, 0) go wait.UntilWithContext(ctx, ctrl.syncPV, 0) } @@ -182,14 +182,14 @@ func (ctrl *CSIAttachController) Run(ctx context.Context, workers int, wg *sync. } // vaAdded reacts to a VolumeAttachment creation -func (ctrl *CSIAttachController) vaAdded(obj interface{}) { +func (ctrl *CSIAttachController) vaAdded(obj any) { va := obj.(*storage.VolumeAttachment) ctrl.vaQueue.Add(va.Name) } // vaUpdated return a function that reacts to a VolumeAttachment update -func (ctrl *CSIAttachController) vaUpdatedFunc(logger klog.Logger) func(old, new interface{}) { - return func(old, new interface{}) { +func (ctrl *CSIAttachController) vaUpdatedFunc(logger klog.Logger) func(old, new any) { + return func(old, new any) { oldVA := old.(*storage.VolumeAttachment) newVA := new.(*storage.VolumeAttachment) if shouldEnqueueVAChange(oldVA, newVA) { @@ -201,7 +201,7 @@ func (ctrl *CSIAttachController) vaUpdatedFunc(logger klog.Logger) func(old, new } // vaDeleted reacts to a VolumeAttachment deleted -func (ctrl *CSIAttachController) vaDeleted(obj interface{}) { +func (ctrl *CSIAttachController) vaDeleted(obj any) { if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil { obj = unknown.Obj } @@ -213,7 +213,7 @@ func (ctrl *CSIAttachController) vaDeleted(obj interface{}) { } // pvAdded reacts to a PV creation -func (ctrl *CSIAttachController) pvAdded(obj interface{}) { +func (ctrl *CSIAttachController) pvAdded(obj any) { pv := obj.(*v1.PersistentVolume) if !ctrl.processFinalizers(pv) { return @@ -222,7 +222,7 @@ func (ctrl *CSIAttachController) pvAdded(obj interface{}) { } // pvUpdated reacts to a PV update -func (ctrl *CSIAttachController) pvUpdated(old, new interface{}) { +func (ctrl *CSIAttachController) pvUpdated(old, new any) { pv := new.(*v1.PersistentVolume) if !ctrl.processFinalizers(pv) { return @@ -232,13 +232,12 @@ func (ctrl *CSIAttachController) pvUpdated(old, new interface{}) { // syncVA deals with one key off the queue. It returns false when it's time to quit. func (ctrl *CSIAttachController) syncVA(ctx context.Context) { - key, quit := ctrl.vaQueue.Get() + vaName, quit := ctrl.vaQueue.Get() if quit { return } - defer ctrl.vaQueue.Done(key) + defer ctrl.vaQueue.Done(vaName) - vaName := key.(string) logger := klog.LoggerWithValues(klog.FromContext(ctx), "VolumeAttachment", vaName) ctx = klog.NewContext(ctx, logger) logger.V(4).Info("Started VolumeAttachment processing") @@ -285,13 +284,12 @@ func (ctrl *CSIAttachController) processFinalizers(pv *v1.PersistentVolume) bool // syncPV deals with one key off the queue. It returns false when it's time to quit. func (ctrl *CSIAttachController) syncPV(ctx context.Context) { - key, quit := ctrl.pvQueue.Get() + pvName, quit := ctrl.pvQueue.Get() if quit { return } - defer ctrl.pvQueue.Done(key) + defer ctrl.pvQueue.Done(pvName) - pvName := key.(string) logger := klog.LoggerWithValues(klog.FromContext(ctx), "PersistentVolume", pvName) ctx = klog.NewContext(ctx, logger) logger.V(4).Info("Started PersistentVolume processing") diff --git a/pkg/controller/csi_handler.go b/pkg/controller/csi_handler.go index 1f6a0e10e..2a90d7fee 100644 --- a/pkg/controller/csi_handler.go +++ b/pkg/controller/csi_handler.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "slices" "strconv" "sync" "time" @@ -68,7 +69,7 @@ type csiHandler struct { pvLister corelisters.PersistentVolumeLister csiNodeLister storagelisters.CSINodeLister vaLister storagelisters.VolumeAttachmentLister - vaQueue, pvQueue workqueue.RateLimitingInterface + vaQueue, pvQueue workqueue.TypedRateLimitingInterface[string] forceSync map[string]bool forceSyncMux sync.Mutex timeout time.Duration @@ -113,7 +114,7 @@ func NewCSIHandler( } } -func (h *csiHandler) Init(vaQueue workqueue.RateLimitingInterface, pvQueue workqueue.RateLimitingInterface) { +func (h *csiHandler) Init(vaQueue workqueue.TypedRateLimitingInterface[string], pvQueue workqueue.TypedRateLimitingInterface[string]) { h.vaQueue = vaQueue h.pvQueue = pvQueue } @@ -180,13 +181,7 @@ func (h *csiHandler) ReconcileVA(ctx context.Context) error { } // Check whether the volume is published to this node - found := false - for _, gotNodeID := range published[volumeHandle] { - if gotNodeID == nodeID { - found = true - break - } - } + found := slices.Contains(published[volumeHandle], nodeID) // If ListVolumes Attached Status is different, add to shared workQueue. if attachedStatus != found { @@ -311,12 +306,10 @@ func (h *csiHandler) syncDetach(ctx context.Context, va *storage.VolumeAttachmen func (h *csiHandler) prepareVAFinalizer(logger klog.Logger, va *storage.VolumeAttachment) (newVA *storage.VolumeAttachment, modified bool) { finalizerName := GetFinalizerName(h.attacherName) - for _, f := range va.Finalizers { - if f == finalizerName { - // Finalizer is already present - logger.V(4).Info("VolumeAttachment finalizer is already set") - return va, false - } + if slices.Contains(va.Finalizers, finalizerName) { + // Finalizer is already present + logger.V(4).Info("VolumeAttachment finalizer is already set") + return va, false } // Finalizer is not present, add it @@ -344,12 +337,10 @@ func (h *csiHandler) prepareVANodeID(logger klog.Logger, va *storage.VolumeAttac func (h *csiHandler) addPVFinalizer(ctx context.Context, pv *v1.PersistentVolume) (*v1.PersistentVolume, error) { logger := klog.LoggerWithValues(klog.FromContext(ctx), "PersistentVolume", pv.Name) finalizerName := GetFinalizerName(h.attacherName) - for _, f := range pv.Finalizers { - if f == finalizerName { - // Finalizer is already present - logger.V(4).Info("PersistentVolume finalizer is already set") - return pv, nil - } + if slices.Contains(pv.Finalizers, finalizerName) { + // Finalizer is already present + logger.V(4).Info("PersistentVolume finalizer is already set") + return pv, nil } // Finalizer is not present, add it @@ -368,12 +359,7 @@ func (h *csiHandler) addPVFinalizer(ctx context.Context, pv *v1.PersistentVolume func (h *csiHandler) hasVAFinalizer(va *storage.VolumeAttachment) bool { finalizerName := GetFinalizerName(h.attacherName) - for _, f := range va.Finalizers { - if f == finalizerName { - return true - } - } - return false + return slices.Contains(va.Finalizers, finalizerName) } // Checks if the PV (or) the inline-volume corresponding to the VA could have migrated from @@ -687,13 +673,7 @@ func (h *csiHandler) SyncNewOrUpdatedPersistentVolume(ctx context.Context, pv *v // Check if the PV has finalizer finalizer := GetFinalizerName(h.attacherName) - found := false - for _, f := range pv.Finalizers { - if f == finalizer { - found = true - break - } - } + found := slices.Contains(pv.Finalizers, finalizer) if !found { // No finalizer -> no action required logger.V(4).Info("CSIHandler: processing PersistentVolume: no finalizer, ignoring") diff --git a/pkg/controller/csi_handler_test.go b/pkg/controller/csi_handler_test.go index ecb1a5777..0c3412be9 100644 --- a/pkg/controller/csi_handler_test.go +++ b/pkg/controller/csi_handler_test.go @@ -265,7 +265,7 @@ func secret() *v1.Secret { } } -func patch(original, new interface{}) []byte { +func patch(original, new any) []byte { patch, err := createMergePatch(original, new) if err != nil { klog.Background().Error(err, "Failed to create patch") diff --git a/pkg/controller/framework_test.go b/pkg/controller/framework_test.go index d9961813a..51bee2ffb 100644 --- a/pkg/controller/framework_test.go +++ b/pkg/controller/framework_test.go @@ -184,7 +184,7 @@ func runTests(t *testing.T, handlerFactory handlerFactory, tests []testCase) { lister := &fakeLister{t: t, publishedNodes: test.listerResponse} csiConnection := &fakeCSIConnection{t: t, calls: test.expectedCSICalls, lister: lister} handler := handlerFactory(client, informers, csiConnection, lister) - ctrl := NewCSIAttachController(logger, client, testAttacherName, handler, vaInformer, pvInformer, workqueue.DefaultControllerRateLimiter(), workqueue.DefaultControllerRateLimiter(), test.listerResponse != nil, 1*time.Minute) + ctrl := NewCSIAttachController(logger, client, testAttacherName, handler, vaInformer, pvInformer, workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.DefaultTypedControllerRateLimiter[string](), test.listerResponse != nil, 1*time.Minute) // Start the test by enqueueing the right event if test.addedVA != nil { diff --git a/pkg/controller/trivial_handler.go b/pkg/controller/trivial_handler.go index a15c28618..4d503d287 100644 --- a/pkg/controller/trivial_handler.go +++ b/pkg/controller/trivial_handler.go @@ -32,7 +32,7 @@ import ( // nothing to detach). type trivialHandler struct { client kubernetes.Interface - vaQueue, pvQueue workqueue.RateLimitingInterface + vaQueue, pvQueue workqueue.TypedRateLimitingInterface[string] } var _ Handler = &trivialHandler{} @@ -42,7 +42,7 @@ func NewTrivialHandler(client kubernetes.Interface) Handler { return &trivialHandler{client: client} } -func (h *trivialHandler) Init(vaQueue workqueue.RateLimitingInterface, pvQueue workqueue.RateLimitingInterface) { +func (h *trivialHandler) Init(vaQueue workqueue.TypedRateLimitingInterface[string], pvQueue workqueue.TypedRateLimitingInterface[string]) { h.vaQueue = vaQueue h.pvQueue = pvQueue } diff --git a/pkg/controller/util.go b/pkg/controller/util.go index 9c615dc11..7be07970a 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -210,7 +210,7 @@ func GetVolumeAttributes(csiSource *v1.CSIPersistentVolumeSource) (map[string]st } // createMergePatch return patch generated from original and new interfaces -func createMergePatch(original, new interface{}) ([]byte, error) { +func createMergePatch(original, new any) ([]byte, error) { pvByte, err := json.Marshal(original) if err != nil { return nil, err