From ee3d1530061fb15c087794d1ea3fe8529d7c80b7 Mon Sep 17 00:00:00 2001 From: Mateusz Urbanek Date: Thu, 11 May 2023 15:06:23 +0200 Subject: [PATCH] feat(bucketclaim): added EventRecorder --- pkg/bucketclaim/bucketclaim.go | 67 +++++++++++++++++++++-------- pkg/bucketclaim/bucketclaim_test.go | 7 +++ 2 files changed, 56 insertions(+), 18 deletions(-) diff --git a/pkg/bucketclaim/bucketclaim.go b/pkg/bucketclaim/bucketclaim.go index 963e2cb..539a27f 100644 --- a/pkg/bucketclaim/bucketclaim.go +++ b/pkg/bucketclaim/bucketclaim.go @@ -4,9 +4,11 @@ import ( "context" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" + kubeerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" kubeclientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -17,18 +19,20 @@ import ( "sigs.k8s.io/container-object-storage-interface-controller/pkg/util" ) -// bucketClaimListener is a resource handler for bucket requests objects -type bucketClaimListener struct { +// BucketClaimListener is a resource handler for bucket requests objects +type BucketClaimListener struct { + eventRecorder record.EventRecorder + kubeClient kubeclientset.Interface bucketClient bucketclientset.Interface } -func NewBucketClaimListener() *bucketClaimListener { - return &bucketClaimListener{} +func NewBucketClaimListener() *BucketClaimListener { + return &BucketClaimListener{} } // Add creates a bucket in response to a bucketClaim -func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error { +func (b *BucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error { klog.V(3).InfoS("Add BucketClaim", "name", bucketClaim.ObjectMeta.Name, "ns", bucketClaim.ObjectMeta.Namespace, @@ -65,7 +69,7 @@ func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.Buc } // update processes any updates made to the bucket request -func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.BucketClaim) error { +func (b *BucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.BucketClaim) error { klog.V(3).InfoS("Update BucketClaim", "name", old.Name, "ns", old.Namespace) @@ -94,8 +98,8 @@ func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.Buc } // Delete processes a bucket for which bucket request is deleted -func (b *bucketClaimListener) Delete(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error { - klog.V(3).Info("Delete BucketClaim", +func (b *BucketClaimListener) Delete(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error { + klog.V(3).InfoS("Delete BucketClaim", "name", bucketClaim.ObjectMeta.Name, "ns", bucketClaim.ObjectMeta.Namespace) @@ -103,13 +107,19 @@ func (b *bucketClaimListener) Delete(ctx context.Context, bucketClaim *v1alpha1. } // provisionBucketClaimOperation attempts to provision a bucket for a given bucketClaim. +// +// Recorded events +// +// InvalidBucket - Bucket provided in the BucketClaim does not exist +// InvalidBucketClass - BucketClass provided in the BucketClaim does not exist +// // Return values // // nil - BucketClaim successfully processed // ErrInvalidBucketClass - BucketClass does not exist [requeue'd with exponential backoff] // ErrBucketAlreadyExists - BucketClaim already processed // non-nil err - Internal error [requeue'd with exponential backoff] -func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context, inputBucketClaim *v1alpha1.BucketClaim) error { +func (b *BucketClaimListener) provisionBucketClaimOperation(ctx context.Context, inputBucketClaim *v1alpha1.BucketClaim) error { bucketClaim := inputBucketClaim.DeepCopy() if bucketClaim.Status.BucketReady { return util.ErrBucketAlreadyExists @@ -121,7 +131,10 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context, if bucketClaim.Spec.ExistingBucketName != "" { bucketName = bucketClaim.Spec.ExistingBucketName bucket, err := b.buckets().Get(ctx, bucketName, metav1.GetOptions{}) - if err != nil { + if kubeerrors.IsNotFound(err) { + b.recordEvent(inputBucketClaim, v1.EventTypeWarning, "InvalidBucket", "Bucket provided in the BucketClaim does not exist") + return err + } else if err != nil { klog.V(3).ErrorS(err, "Get Bucket with ExistingBucketName error", "name", bucketClaim.Spec.ExistingBucketName) return err } @@ -153,7 +166,10 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context, } bucketClass, err := b.bucketClasses().Get(ctx, bucketClassName, metav1.GetOptions{}) - if err != nil { + if kubeerrors.IsNotFound(err) { + b.recordEvent(inputBucketClaim, v1.EventTypeWarning, "InvalidBucketClass", "BucketClass provided in the BucketClaim does not exist") + return util.ErrInvalidBucketClass + } else if err != nil { klog.V(3).ErrorS(err, "Get Bucketclass Error", "name", bucketClassName) return util.ErrInvalidBucketClass } @@ -180,7 +196,7 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context, bucket.Spec.Protocols = protocolCopy bucket, err = b.buckets().Create(ctx, bucket, metav1.CreateOptions{}) - if err != nil && !errors.IsAlreadyExists(err) { + if err != nil && !kubeerrors.IsAlreadyExists(err) { klog.V(3).ErrorS(err, "Error creationg bucket", "bucket", bucketName, "bucketClaim", bucketClaim.ObjectMeta.Name) @@ -212,31 +228,46 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context, return nil } -func (b *bucketClaimListener) InitializeKubeClient(k kubeclientset.Interface) { +// InitializeKubeClient initializes the kubernetes client +func (b *BucketClaimListener) InitializeKubeClient(k kubeclientset.Interface) { b.kubeClient = k } -func (b *bucketClaimListener) InitializeBucketClient(bc bucketclientset.Interface) { +// InitializeBucketClient initializes the object storage bucket client +func (b *BucketClaimListener) InitializeBucketClient(bc bucketclientset.Interface) { b.bucketClient = bc } -func (b *bucketClaimListener) buckets() objectstoragev1alpha1.BucketInterface { +// InitializeEventRecorder initializes the event recorder +func (b *BucketClaimListener) InitializeEventRecorder(er record.EventRecorder) { + b.eventRecorder = er +} + +func (b *BucketClaimListener) buckets() objectstoragev1alpha1.BucketInterface { if b.bucketClient != nil { return b.bucketClient.ObjectstorageV1alpha1().Buckets() } panic("uninitialized listener") } -func (b *bucketClaimListener) bucketClasses() objectstoragev1alpha1.BucketClassInterface { +func (b *BucketClaimListener) bucketClasses() objectstoragev1alpha1.BucketClassInterface { if b.bucketClient != nil { return b.bucketClient.ObjectstorageV1alpha1().BucketClasses() } panic("uninitialized listener") } -func (b *bucketClaimListener) bucketClaims(namespace string) objectstoragev1alpha1.BucketClaimInterface { +func (b *BucketClaimListener) bucketClaims(namespace string) objectstoragev1alpha1.BucketClaimInterface { if b.bucketClient != nil { return b.bucketClient.ObjectstorageV1alpha1().BucketClaims(namespace) } panic("uninitialized listener") } + +// recordEvent during the processing of the objects +func (b *BucketClaimListener) recordEvent(subject runtime.Object, eventtype, reason, message string) { + if b.eventRecorder == nil { + return + } + b.eventRecorder.Event(subject, eventtype, reason, message) +} diff --git a/pkg/bucketclaim/bucketclaim_test.go b/pkg/bucketclaim/bucketclaim_test.go index f377a78..e010c96 100644 --- a/pkg/bucketclaim/bucketclaim_test.go +++ b/pkg/bucketclaim/bucketclaim_test.go @@ -6,6 +6,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/record" types "sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage/v1alpha1" bucketclientset "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned/fake" @@ -84,10 +85,12 @@ func runCreateBucket(t *testing.T, name string) { client := bucketclientset.NewSimpleClientset() kubeClient := fake.NewSimpleClientset() + eventRecorder := record.NewFakeRecorder(3) listener := NewBucketClaimListener() listener.InitializeKubeClient(kubeClient) listener.InitializeBucketClient(client) + listener.InitializeEventRecorder(eventRecorder) bucketclass, err := util.CreateBucketClass(ctx, client, &goldClass) if err != nil { @@ -127,10 +130,12 @@ func runCreateBucketWithMultipleBR(t *testing.T, name string) { client := bucketclientset.NewSimpleClientset() kubeClient := fake.NewSimpleClientset() + eventRecorder := record.NewFakeRecorder(3) listener := NewBucketClaimListener() listener.InitializeKubeClient(kubeClient) listener.InitializeBucketClient(client) + listener.InitializeEventRecorder(eventRecorder) bucketclass, err := util.CreateBucketClass(ctx, client, &goldClass) if err != nil { @@ -181,10 +186,12 @@ func runCreateBucketIdempotency(t *testing.T, name string) { client := bucketclientset.NewSimpleClientset() kubeClient := fake.NewSimpleClientset() + eventRecorder := record.NewFakeRecorder(3) listener := NewBucketClaimListener() listener.InitializeKubeClient(kubeClient) listener.InitializeBucketClient(client) + listener.InitializeEventRecorder(eventRecorder) bucketclass, err := util.CreateBucketClass(ctx, client, &goldClass) if err != nil {