From eabe4ea82f2919d66ed9253d9cc6acbde2ce0fb2 Mon Sep 17 00:00:00 2001 From: Jan Safranek Date: Fri, 2 Aug 2019 13:42:38 +0200 Subject: [PATCH] Send event on successful provisioning This restores event "Successfully provisioned volume XYZ" when saving queue is used. --- controller/controller.go | 47 ++++++++++++++++++----------------- controller/controller_test.go | 2 +- controller/volume_store.go | 35 ++++++++++++++++++++++---- 3 files changed, 55 insertions(+), 29 deletions(-) diff --git a/controller/controller.go b/controller/controller.go index 2e2c3bb..117089a 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -617,29 +617,6 @@ func NewProvisionController( controller.claimQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims") controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes") - if controller.createProvisionerPVLimiter != nil { - glog.V(2).Infof("Using saving PVs to API server in background") - controller.volumeStore = NewVolumeStoreQueue(client, controller.createProvisionerPVLimiter) - } else { - if controller.createProvisionedPVBackoff == nil { - // Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default. - if controller.createProvisionedPVInterval == 0 { - controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval - } - if controller.createProvisionedPVRetryCount == 0 { - controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount - } - controller.createProvisionedPVBackoff = &wait.Backoff{ - Duration: controller.createProvisionedPVInterval, - Factor: 1, // linear backoff - Steps: controller.createProvisionedPVRetryCount, - //Cap: controller.createProvisionedPVInterval, - } - } - glog.V(2).Infof("Using blocking saving PVs to API server") - controller.volumeStore = NewBackoffStore(client, controller.eventRecorder, controller.createProvisionedPVBackoff, controller) - } - informer := informers.NewSharedInformerFactory(client, controller.resyncPeriod) // ---------------------- @@ -698,6 +675,30 @@ func NewProvisionController( } } controller.classes = controller.classInformer.GetStore() + + if controller.createProvisionerPVLimiter != nil { + glog.V(2).Infof("Using saving PVs to API server in background") + controller.volumeStore = NewVolumeStoreQueue(client, controller.createProvisionerPVLimiter, controller.claimsIndexer, controller.eventRecorder) + } else { + if controller.createProvisionedPVBackoff == nil { + // Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default. + if controller.createProvisionedPVInterval == 0 { + controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval + } + if controller.createProvisionedPVRetryCount == 0 { + controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount + } + controller.createProvisionedPVBackoff = &wait.Backoff{ + Duration: controller.createProvisionedPVInterval, + Factor: 1, // linear backoff + Steps: controller.createProvisionedPVRetryCount, + //Cap: controller.createProvisionedPVInterval, + } + } + glog.V(2).Infof("Using blocking saving PVs to API server") + controller.volumeStore = NewBackoffStore(client, controller.eventRecorder, controller.createProvisionedPVBackoff, controller) + } + return controller } diff --git a/controller/controller_test.go b/controller/controller_test.go index c8b4599..a258b9d 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -394,7 +394,7 @@ func TestController(t *testing.T) { } if test.volumeQueueStore { - ctrl.volumeStore = NewVolumeStoreQueue(client, workqueue.DefaultItemBasedRateLimiter()) + ctrl.volumeStore = NewVolumeStoreQueue(client, workqueue.DefaultItemBasedRateLimiter(), ctrl.claimsIndexer, ctrl.eventRecorder) } if test.enqueueClaim != nil { diff --git a/controller/volume_store.go b/controller/volume_store.go index 1d47ca4..d535f2a 100644 --- a/controller/volume_store.go +++ b/controller/volume_store.go @@ -24,11 +24,12 @@ import ( "k8s.io/client-go/tools/record" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog" ) @@ -56,8 +57,10 @@ type VolumeStore interface { // PVs to API server using a workqueue running in its own goroutine(s). // After failed save, volume is re-qeueued with exponential backoff. type queueStore struct { - client kubernetes.Interface - queue workqueue.RateLimitingInterface + client kubernetes.Interface + queue workqueue.RateLimitingInterface + eventRecorder record.EventRecorder + claimsIndexer cache.Indexer volumes sync.Map } @@ -68,11 +71,15 @@ var _ VolumeStore = &queueStore{} func NewVolumeStoreQueue( client kubernetes.Interface, limiter workqueue.RateLimiter, + claimsIndexer cache.Indexer, + eventRecorder record.EventRecorder, ) VolumeStore { return &queueStore{ - client: client, - queue: workqueue.NewNamedRateLimitingQueue(limiter, "unsavedpvs"), + client: client, + queue: workqueue.NewNamedRateLimitingQueue(limiter, "unsavedpvs"), + claimsIndexer: claimsIndexer, + eventRecorder: eventRecorder, } } @@ -148,11 +155,29 @@ func (q *queueStore) doSaveVolume(volume *v1.PersistentVolume) error { _, err := q.client.CoreV1().PersistentVolumes().Create(volume) if err == nil || apierrs.IsAlreadyExists(err) { klog.V(5).Infof("Volume %s saved", volume.Name) + q.sendSuccessEvent(volume) return nil } return fmt.Errorf("error saving volume %s: %s", volume.Name, err) } +func (q *queueStore) sendSuccessEvent(volume *v1.PersistentVolume) { + claimObjs, err := q.claimsIndexer.ByIndex(uidIndex, string(volume.Spec.ClaimRef.UID)) + if err != nil { + klog.V(2).Infof("Error sending event to claim %s: %s", volume.Spec.ClaimRef.UID, err) + return + } + if len(claimObjs) != 1 { + return + } + claim, ok := claimObjs[0].(*v1.PersistentVolumeClaim) + if !ok { + return + } + msg := fmt.Sprintf("Successfully provisioned volume %s", volume.Name) + q.eventRecorder.Event(claim, v1.EventTypeNormal, "ProvisioningSucceeded", msg) +} + // backoffStore is implementation of VolumeStore that blocks and tries to save // a volume to API server with configurable backoff. If saving fails, // StoreVolume() deletes the storage asset in the end and returns appropriate