diff --git a/cmd/csi-attacher/main.go b/cmd/csi-attacher/main.go index 930d72259..e0075e00c 100644 --- a/cmd/csi-attacher/main.go +++ b/cmd/csi-attacher/main.go @@ -27,6 +27,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/workqueue" csiinformers "k8s.io/csi-api/pkg/client/informers/externalversions" "k8s.io/klog" @@ -61,6 +62,9 @@ var ( showVersion = flag.Bool("version", false, "Show version.") timeout = flag.Duration("timeout", 15*time.Second, "Timeout for waiting for attaching or detaching the volume.") + retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed create volume or deletion. It doubles with each failure, up to retry-interval-max.") + retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed create volume or deletion.") + enableLeaderElection = flag.Bool("leader-election", false, "Enable leader election.") leaderElectionType = flag.String("leader-election-type", "configmap", "the type of leader election, options are 'configmaps' (default) or 'leases' (recommended). The 'configmaps' option is deprecated in favor of 'leases'.") leaderElectionNamespace = flag.String("leader-election-namespace", "", "Namespace where the leader election resource lives. Defaults to the pod namespace if not set.") @@ -173,6 +177,8 @@ func main() { handler, factory.Storage().V1beta1().VolumeAttachments(), factory.Core().V1().PersistentVolumes(), + workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), + workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), ) run := func(ctx context.Context) { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 31342d304..6ba5e3d70 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -21,7 +21,7 @@ import ( "k8s.io/klog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1beta1" "k8s.io/apimachinery/pkg/api/equality" apierrs "k8s.io/apimachinery/pkg/api/errors" @@ -70,7 +70,7 @@ type Handler interface { } // NewCSIAttachController returns a new *CSIAttachController -func NewCSIAttachController(client kubernetes.Interface, attacherName string, handler Handler, volumeAttachmentInformer storageinformers.VolumeAttachmentInformer, pvInformer coreinformers.PersistentVolumeInformer) *CSIAttachController { +func NewCSIAttachController(client kubernetes.Interface, attacherName string, handler Handler, volumeAttachmentInformer storageinformers.VolumeAttachmentInformer, pvInformer coreinformers.PersistentVolumeInformer, vaRateLimiter, paRateLimiter workqueue.RateLimiter) *CSIAttachController { broadcaster := record.NewBroadcaster() broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.CoreV1().Events(v1.NamespaceAll)}) var eventRecorder record.EventRecorder @@ -81,8 +81,8 @@ func NewCSIAttachController(client kubernetes.Interface, attacherName string, ha attacherName: attacherName, handler: handler, eventRecorder: eventRecorder, - vaQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-attacher-va"), - pvQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-attacher-pv"), + vaQueue: workqueue.NewNamedRateLimitingQueue(vaRateLimiter, "csi-attacher-va"), + pvQueue: workqueue.NewNamedRateLimitingQueue(paRateLimiter, "csi-attacher-pv"), } volumeAttachmentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ diff --git a/pkg/controller/framework_test.go b/pkg/controller/framework_test.go index 5a9491f72..926014e77 100644 --- a/pkg/controller/framework_test.go +++ b/pkg/controller/framework_test.go @@ -29,7 +29,7 @@ import ( "github.com/kubernetes-csi/external-attacher/pkg/attacher" "k8s.io/klog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -37,6 +37,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" + "k8s.io/client-go/util/workqueue" ) // This is an unit test framework. It is heavily inspired by serviceaccount @@ -175,7 +176,7 @@ func runTests(t *testing.T, handlerFactory handlerFactory, tests []testCase) { // Construct controller csiConnection := &fakeCSIConnection{t: t, calls: test.expectedCSICalls} handler := handlerFactory(client, informers, csiConnection) - ctrl := NewCSIAttachController(client, testAttacherName, handler, vaInformer, pvInformer) + ctrl := NewCSIAttachController(client, testAttacherName, handler, vaInformer, pvInformer, workqueue.DefaultControllerRateLimiter(), workqueue.DefaultControllerRateLimiter()) // Start the test by enqueueing the right event if test.addedVA != nil {