Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/csi-attacher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,8 @@ func main() {
handler,
factory.Storage().V1().VolumeAttachments(),
factory.Core().V1().PersistentVolumes(),
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
supportsListVolumesPublishedNodes,
*reconcileSync,
)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/evanphx/json-patch v5.9.11+incompatible
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.4
github.com/kubernetes-csi/csi-lib-utils v0.22.0
github.com/kubernetes-csi/csi-test/v5 v5.3.1
google.golang.org/grpc v1.72.1
google.golang.org/protobuf v1.36.6
k8s.io/api v0.33.3
k8s.io/apimachinery v0.33.3
k8s.io/apiserver v0.33.3
Expand Down Expand Up @@ -43,6 +43,7 @@ require (
github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-openapi/swag v0.23.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/cel-go v0.23.2 // indirect
github.com/google/gnostic-models v0.6.9 // indirect
github.com/google/go-cmp v0.7.0 // indirect
Expand Down Expand Up @@ -94,7 +95,6 @@ require (
golang.org/x/time v0.11.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
3 changes: 1 addition & 2 deletions pkg/attacher/attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ type Attacher interface {
}

type attacher struct {
client csi.ControllerClient
capabilities []csi.ControllerServiceCapability
client csi.ControllerClient
}

var (
Expand Down
6 changes: 3 additions & 3 deletions pkg/attacher/attacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ import (

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/mock/gomock"
"github.com/golang/protobuf/proto"
"github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/csi-lib-utils/metrics"
"github.com/kubernetes-csi/csi-test/v5/driver"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)

type pbMatcher struct {
x proto.Message
}

func (p pbMatcher) Matches(x interface{}) bool {
func (p pbMatcher) Matches(x any) bool {
y := x.(proto.Message)
return proto.Equal(p.x, y)
}
Expand All @@ -48,7 +48,7 @@ func (p pbMatcher) String() string {
return fmt.Sprintf("pb equal to %v", p.x)
}

func pbMatch(x interface{}) gomock.Matcher {
func pbMatch(x any) gomock.Matcher {
v := x.(proto.Message)
return &pbMatcher{v}
}
Expand Down
36 changes: 17 additions & 19 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ type CSIAttachController struct {
client kubernetes.Interface
attacherName string
handler Handler
vaQueue workqueue.RateLimitingInterface
pvQueue workqueue.RateLimitingInterface
vaQueue workqueue.TypedRateLimitingInterface[string]
pvQueue workqueue.TypedRateLimitingInterface[string]

vaLister storagelisters.VolumeAttachmentLister
vaListerSynced cache.InformerSynced
Expand All @@ -64,7 +64,7 @@ type CSIAttachController struct {

// Handler is responsible for handling VolumeAttachment events from informer.
type Handler interface {
Init(vaQueue workqueue.RateLimitingInterface, pvQueue workqueue.RateLimitingInterface)
Init(vaQueue workqueue.TypedRateLimitingInterface[string], pvQueue workqueue.TypedRateLimitingInterface[string])

// SyncNewOrUpdatedVolumeAttachment processes one Add/Updated event from
// VolumeAttachment informers. It runs in a workqueue, guaranting that only
Expand All @@ -88,16 +88,16 @@ func NewCSIAttachController(
handler Handler,
volumeAttachmentInformer storageinformers.VolumeAttachmentInformer,
pvInformer coreinformers.PersistentVolumeInformer,
vaRateLimiter, paRateLimiter workqueue.RateLimiter,
vaRateLimiter, paRateLimiter workqueue.TypedRateLimiter[string],
shouldReconcileVolumeAttachment bool,
reconcileSync time.Duration,
) *CSIAttachController {
ctrl := &CSIAttachController{
client: client,
attacherName: attacherName,
handler: handler,
vaQueue: workqueue.NewNamedRateLimitingQueue(vaRateLimiter, "csi-attacher-va"),
pvQueue: workqueue.NewNamedRateLimitingQueue(paRateLimiter, "csi-attacher-pv"),
vaQueue: workqueue.NewTypedRateLimitingQueueWithConfig(vaRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{Name: "csi-attacher-va"}),
pvQueue: workqueue.NewTypedRateLimitingQueueWithConfig(paRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{Name: "csi-attacher-pv"}),
shouldReconcileVolumeAttachment: shouldReconcileVolumeAttachment,
reconcileSync: reconcileSync,
translator: csitrans.New(),
Expand Down Expand Up @@ -163,7 +163,7 @@ func (ctrl *CSIAttachController) Run(ctx context.Context, workers int, wg *sync.
}()
}
} else {
for i := 0; i < workers; i++ {
for range workers {
go wait.UntilWithContext(ctx, ctrl.syncVA, 0)
go wait.UntilWithContext(ctx, ctrl.syncPV, 0)
}
Expand All @@ -182,14 +182,14 @@ func (ctrl *CSIAttachController) Run(ctx context.Context, workers int, wg *sync.
}

// vaAdded reacts to a VolumeAttachment creation
func (ctrl *CSIAttachController) vaAdded(obj interface{}) {
func (ctrl *CSIAttachController) vaAdded(obj any) {
va := obj.(*storage.VolumeAttachment)
ctrl.vaQueue.Add(va.Name)
}

// vaUpdated return a function that reacts to a VolumeAttachment update
func (ctrl *CSIAttachController) vaUpdatedFunc(logger klog.Logger) func(old, new interface{}) {
return func(old, new interface{}) {
func (ctrl *CSIAttachController) vaUpdatedFunc(logger klog.Logger) func(old, new any) {
return func(old, new any) {
oldVA := old.(*storage.VolumeAttachment)
newVA := new.(*storage.VolumeAttachment)
if shouldEnqueueVAChange(oldVA, newVA) {
Expand All @@ -201,7 +201,7 @@ func (ctrl *CSIAttachController) vaUpdatedFunc(logger klog.Logger) func(old, new
}

// vaDeleted reacts to a VolumeAttachment deleted
func (ctrl *CSIAttachController) vaDeleted(obj interface{}) {
func (ctrl *CSIAttachController) vaDeleted(obj any) {
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
obj = unknown.Obj
}
Expand All @@ -213,7 +213,7 @@ func (ctrl *CSIAttachController) vaDeleted(obj interface{}) {
}

// pvAdded reacts to a PV creation
func (ctrl *CSIAttachController) pvAdded(obj interface{}) {
func (ctrl *CSIAttachController) pvAdded(obj any) {
pv := obj.(*v1.PersistentVolume)
if !ctrl.processFinalizers(pv) {
return
Expand All @@ -222,7 +222,7 @@ func (ctrl *CSIAttachController) pvAdded(obj interface{}) {
}

// pvUpdated reacts to a PV update
func (ctrl *CSIAttachController) pvUpdated(old, new interface{}) {
func (ctrl *CSIAttachController) pvUpdated(old, new any) {
pv := new.(*v1.PersistentVolume)
if !ctrl.processFinalizers(pv) {
return
Expand All @@ -232,13 +232,12 @@ func (ctrl *CSIAttachController) pvUpdated(old, new interface{}) {

// syncVA deals with one key off the queue. It returns false when it's time to quit.
func (ctrl *CSIAttachController) syncVA(ctx context.Context) {
key, quit := ctrl.vaQueue.Get()
vaName, quit := ctrl.vaQueue.Get()
if quit {
return
}
defer ctrl.vaQueue.Done(key)
defer ctrl.vaQueue.Done(vaName)

vaName := key.(string)
logger := klog.LoggerWithValues(klog.FromContext(ctx), "VolumeAttachment", vaName)
ctx = klog.NewContext(ctx, logger)
logger.V(4).Info("Started VolumeAttachment processing")
Expand Down Expand Up @@ -285,13 +284,12 @@ func (ctrl *CSIAttachController) processFinalizers(pv *v1.PersistentVolume) bool

// syncPV deals with one key off the queue. It returns false when it's time to quit.
func (ctrl *CSIAttachController) syncPV(ctx context.Context) {
key, quit := ctrl.pvQueue.Get()
pvName, quit := ctrl.pvQueue.Get()
if quit {
return
}
defer ctrl.pvQueue.Done(key)
defer ctrl.pvQueue.Done(pvName)

pvName := key.(string)
logger := klog.LoggerWithValues(klog.FromContext(ctx), "PersistentVolume", pvName)
ctx = klog.NewContext(ctx, logger)
logger.V(4).Info("Started PersistentVolume processing")
Expand Down
48 changes: 14 additions & 34 deletions pkg/controller/csi_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"slices"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -68,7 +69,7 @@ type csiHandler struct {
pvLister corelisters.PersistentVolumeLister
csiNodeLister storagelisters.CSINodeLister
vaLister storagelisters.VolumeAttachmentLister
vaQueue, pvQueue workqueue.RateLimitingInterface
vaQueue, pvQueue workqueue.TypedRateLimitingInterface[string]
forceSync map[string]bool
forceSyncMux sync.Mutex
timeout time.Duration
Expand Down Expand Up @@ -113,7 +114,7 @@ func NewCSIHandler(
}
}

func (h *csiHandler) Init(vaQueue workqueue.RateLimitingInterface, pvQueue workqueue.RateLimitingInterface) {
func (h *csiHandler) Init(vaQueue workqueue.TypedRateLimitingInterface[string], pvQueue workqueue.TypedRateLimitingInterface[string]) {
h.vaQueue = vaQueue
h.pvQueue = pvQueue
}
Expand Down Expand Up @@ -180,13 +181,7 @@ func (h *csiHandler) ReconcileVA(ctx context.Context) error {
}

// Check whether the volume is published to this node
found := false
for _, gotNodeID := range published[volumeHandle] {
if gotNodeID == nodeID {
found = true
break
}
}
found := slices.Contains(published[volumeHandle], nodeID)

// If ListVolumes Attached Status is different, add to shared workQueue.
if attachedStatus != found {
Expand Down Expand Up @@ -311,12 +306,10 @@ func (h *csiHandler) syncDetach(ctx context.Context, va *storage.VolumeAttachmen

func (h *csiHandler) prepareVAFinalizer(logger klog.Logger, va *storage.VolumeAttachment) (newVA *storage.VolumeAttachment, modified bool) {
finalizerName := GetFinalizerName(h.attacherName)
for _, f := range va.Finalizers {
if f == finalizerName {
// Finalizer is already present
logger.V(4).Info("VolumeAttachment finalizer is already set")
return va, false
}
if slices.Contains(va.Finalizers, finalizerName) {
// Finalizer is already present
logger.V(4).Info("VolumeAttachment finalizer is already set")
return va, false
}

// Finalizer is not present, add it
Expand Down Expand Up @@ -344,12 +337,10 @@ func (h *csiHandler) prepareVANodeID(logger klog.Logger, va *storage.VolumeAttac
func (h *csiHandler) addPVFinalizer(ctx context.Context, pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "PersistentVolume", pv.Name)
finalizerName := GetFinalizerName(h.attacherName)
for _, f := range pv.Finalizers {
if f == finalizerName {
// Finalizer is already present
logger.V(4).Info("PersistentVolume finalizer is already set")
return pv, nil
}
if slices.Contains(pv.Finalizers, finalizerName) {
// Finalizer is already present
logger.V(4).Info("PersistentVolume finalizer is already set")
return pv, nil
}

// Finalizer is not present, add it
Expand All @@ -368,12 +359,7 @@ func (h *csiHandler) addPVFinalizer(ctx context.Context, pv *v1.PersistentVolume

func (h *csiHandler) hasVAFinalizer(va *storage.VolumeAttachment) bool {
finalizerName := GetFinalizerName(h.attacherName)
for _, f := range va.Finalizers {
if f == finalizerName {
return true
}
}
return false
return slices.Contains(va.Finalizers, finalizerName)
}

// Checks if the PV (or) the inline-volume corresponding to the VA could have migrated from
Expand Down Expand Up @@ -687,13 +673,7 @@ func (h *csiHandler) SyncNewOrUpdatedPersistentVolume(ctx context.Context, pv *v

// Check if the PV has finalizer
finalizer := GetFinalizerName(h.attacherName)
found := false
for _, f := range pv.Finalizers {
if f == finalizer {
found = true
break
}
}
found := slices.Contains(pv.Finalizers, finalizer)
if !found {
// No finalizer -> no action required
logger.V(4).Info("CSIHandler: processing PersistentVolume: no finalizer, ignoring")
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/csi_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func secret() *v1.Secret {
}
}

func patch(original, new interface{}) []byte {
func patch(original, new any) []byte {
patch, err := createMergePatch(original, new)
if err != nil {
klog.Background().Error(err, "Failed to create patch")
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func runTests(t *testing.T, handlerFactory handlerFactory, tests []testCase) {
lister := &fakeLister{t: t, publishedNodes: test.listerResponse}
csiConnection := &fakeCSIConnection{t: t, calls: test.expectedCSICalls, lister: lister}
handler := handlerFactory(client, informers, csiConnection, lister)
ctrl := NewCSIAttachController(logger, client, testAttacherName, handler, vaInformer, pvInformer, workqueue.DefaultControllerRateLimiter(), workqueue.DefaultControllerRateLimiter(), test.listerResponse != nil, 1*time.Minute)
ctrl := NewCSIAttachController(logger, client, testAttacherName, handler, vaInformer, pvInformer, workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.DefaultTypedControllerRateLimiter[string](), test.listerResponse != nil, 1*time.Minute)

// Start the test by enqueueing the right event
if test.addedVA != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/trivial_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
// nothing to detach).
type trivialHandler struct {
client kubernetes.Interface
vaQueue, pvQueue workqueue.RateLimitingInterface
vaQueue, pvQueue workqueue.TypedRateLimitingInterface[string]
}

var _ Handler = &trivialHandler{}
Expand All @@ -42,7 +42,7 @@ func NewTrivialHandler(client kubernetes.Interface) Handler {
return &trivialHandler{client: client}
}

func (h *trivialHandler) Init(vaQueue workqueue.RateLimitingInterface, pvQueue workqueue.RateLimitingInterface) {
func (h *trivialHandler) Init(vaQueue workqueue.TypedRateLimitingInterface[string], pvQueue workqueue.TypedRateLimitingInterface[string]) {
h.vaQueue = vaQueue
h.pvQueue = pvQueue
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func GetVolumeAttributes(csiSource *v1.CSIPersistentVolumeSource) (map[string]st
}

// createMergePatch return patch generated from original and new interfaces
func createMergePatch(original, new interface{}) ([]byte, error) {
func createMergePatch(original, new any) ([]byte, error) {
pvByte, err := json.Marshal(original)
if err != nil {
return nil, err
Expand Down