Skip to content

Commit

Permalink
feat(bucketclaim): added EventRecorder
Browse files Browse the repository at this point in the history
  • Loading branch information
shanduur committed May 25, 2023
1 parent 5240fb3 commit ee3d153
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 18 deletions.
67 changes: 49 additions & 18 deletions pkg/bucketclaim/bucketclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

Expand All @@ -17,18 +19,20 @@ import (
"sigs.k8s.io/container-object-storage-interface-controller/pkg/util"
)

// bucketClaimListener is a resource handler for bucket requests objects
type bucketClaimListener struct {
// BucketClaimListener is a resource handler for bucket requests objects
type BucketClaimListener struct {
eventRecorder record.EventRecorder

kubeClient kubeclientset.Interface
bucketClient bucketclientset.Interface
}

func NewBucketClaimListener() *bucketClaimListener {
return &bucketClaimListener{}
func NewBucketClaimListener() *BucketClaimListener {
return &BucketClaimListener{}
}

// Add creates a bucket in response to a bucketClaim
func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error {
func (b *BucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error {
klog.V(3).InfoS("Add BucketClaim",
"name", bucketClaim.ObjectMeta.Name,
"ns", bucketClaim.ObjectMeta.Namespace,
Expand Down Expand Up @@ -65,7 +69,7 @@ func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.Buc
}

// update processes any updates made to the bucket request
func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.BucketClaim) error {
func (b *BucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.BucketClaim) error {
klog.V(3).InfoS("Update BucketClaim",
"name", old.Name,
"ns", old.Namespace)
Expand Down Expand Up @@ -94,22 +98,28 @@ func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.Buc
}

// Delete processes a bucket for which bucket request is deleted
func (b *bucketClaimListener) Delete(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error {
klog.V(3).Info("Delete BucketClaim",
func (b *BucketClaimListener) Delete(ctx context.Context, bucketClaim *v1alpha1.BucketClaim) error {
klog.V(3).InfoS("Delete BucketClaim",
"name", bucketClaim.ObjectMeta.Name,
"ns", bucketClaim.ObjectMeta.Namespace)

return nil
}

// provisionBucketClaimOperation attempts to provision a bucket for a given bucketClaim.
//
// Recorded events
//
// InvalidBucket - Bucket provided in the BucketClaim does not exist
// InvalidBucketClass - BucketClass provided in the BucketClaim does not exist
//
// Return values
//
// nil - BucketClaim successfully processed
// ErrInvalidBucketClass - BucketClass does not exist [requeue'd with exponential backoff]
// ErrBucketAlreadyExists - BucketClaim already processed
// non-nil err - Internal error [requeue'd with exponential backoff]
func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context, inputBucketClaim *v1alpha1.BucketClaim) error {
func (b *BucketClaimListener) provisionBucketClaimOperation(ctx context.Context, inputBucketClaim *v1alpha1.BucketClaim) error {
bucketClaim := inputBucketClaim.DeepCopy()
if bucketClaim.Status.BucketReady {
return util.ErrBucketAlreadyExists
Expand All @@ -121,7 +131,10 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
if bucketClaim.Spec.ExistingBucketName != "" {
bucketName = bucketClaim.Spec.ExistingBucketName
bucket, err := b.buckets().Get(ctx, bucketName, metav1.GetOptions{})
if err != nil {
if kubeerrors.IsNotFound(err) {
b.recordEvent(inputBucketClaim, v1.EventTypeWarning, "InvalidBucket", "Bucket provided in the BucketClaim does not exist")
return err
} else if err != nil {
klog.V(3).ErrorS(err, "Get Bucket with ExistingBucketName error", "name", bucketClaim.Spec.ExistingBucketName)
return err
}
Expand Down Expand Up @@ -153,7 +166,10 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
}

bucketClass, err := b.bucketClasses().Get(ctx, bucketClassName, metav1.GetOptions{})
if err != nil {
if kubeerrors.IsNotFound(err) {
b.recordEvent(inputBucketClaim, v1.EventTypeWarning, "InvalidBucketClass", "BucketClass provided in the BucketClaim does not exist")
return util.ErrInvalidBucketClass
} else if err != nil {
klog.V(3).ErrorS(err, "Get Bucketclass Error", "name", bucketClassName)
return util.ErrInvalidBucketClass
}
Expand All @@ -180,7 +196,7 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,

bucket.Spec.Protocols = protocolCopy
bucket, err = b.buckets().Create(ctx, bucket, metav1.CreateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
if err != nil && !kubeerrors.IsAlreadyExists(err) {
klog.V(3).ErrorS(err, "Error creationg bucket",
"bucket", bucketName,
"bucketClaim", bucketClaim.ObjectMeta.Name)
Expand Down Expand Up @@ -212,31 +228,46 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
return nil
}

func (b *bucketClaimListener) InitializeKubeClient(k kubeclientset.Interface) {
// InitializeKubeClient initializes the kubernetes client
func (b *BucketClaimListener) InitializeKubeClient(k kubeclientset.Interface) {
b.kubeClient = k
}

func (b *bucketClaimListener) InitializeBucketClient(bc bucketclientset.Interface) {
// InitializeBucketClient initializes the object storage bucket client
func (b *BucketClaimListener) InitializeBucketClient(bc bucketclientset.Interface) {
b.bucketClient = bc
}

func (b *bucketClaimListener) buckets() objectstoragev1alpha1.BucketInterface {
// InitializeEventRecorder initializes the event recorder
func (b *BucketClaimListener) InitializeEventRecorder(er record.EventRecorder) {
b.eventRecorder = er
}

func (b *BucketClaimListener) buckets() objectstoragev1alpha1.BucketInterface {
if b.bucketClient != nil {
return b.bucketClient.ObjectstorageV1alpha1().Buckets()
}
panic("uninitialized listener")
}

func (b *bucketClaimListener) bucketClasses() objectstoragev1alpha1.BucketClassInterface {
func (b *BucketClaimListener) bucketClasses() objectstoragev1alpha1.BucketClassInterface {
if b.bucketClient != nil {
return b.bucketClient.ObjectstorageV1alpha1().BucketClasses()
}
panic("uninitialized listener")
}

func (b *bucketClaimListener) bucketClaims(namespace string) objectstoragev1alpha1.BucketClaimInterface {
func (b *BucketClaimListener) bucketClaims(namespace string) objectstoragev1alpha1.BucketClaimInterface {
if b.bucketClient != nil {
return b.bucketClient.ObjectstorageV1alpha1().BucketClaims(namespace)
}
panic("uninitialized listener")
}

// recordEvent during the processing of the objects
func (b *BucketClaimListener) recordEvent(subject runtime.Object, eventtype, reason, message string) {
if b.eventRecorder == nil {
return
}
b.eventRecorder.Event(subject, eventtype, reason, message)
}
7 changes: 7 additions & 0 deletions pkg/bucketclaim/bucketclaim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"

types "sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage/v1alpha1"
bucketclientset "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned/fake"
Expand Down Expand Up @@ -84,10 +85,12 @@ func runCreateBucket(t *testing.T, name string) {

client := bucketclientset.NewSimpleClientset()
kubeClient := fake.NewSimpleClientset()
eventRecorder := record.NewFakeRecorder(3)

listener := NewBucketClaimListener()
listener.InitializeKubeClient(kubeClient)
listener.InitializeBucketClient(client)
listener.InitializeEventRecorder(eventRecorder)

bucketclass, err := util.CreateBucketClass(ctx, client, &goldClass)
if err != nil {
Expand Down Expand Up @@ -127,10 +130,12 @@ func runCreateBucketWithMultipleBR(t *testing.T, name string) {

client := bucketclientset.NewSimpleClientset()
kubeClient := fake.NewSimpleClientset()
eventRecorder := record.NewFakeRecorder(3)

listener := NewBucketClaimListener()
listener.InitializeKubeClient(kubeClient)
listener.InitializeBucketClient(client)
listener.InitializeEventRecorder(eventRecorder)

bucketclass, err := util.CreateBucketClass(ctx, client, &goldClass)
if err != nil {
Expand Down Expand Up @@ -181,10 +186,12 @@ func runCreateBucketIdempotency(t *testing.T, name string) {

client := bucketclientset.NewSimpleClientset()
kubeClient := fake.NewSimpleClientset()
eventRecorder := record.NewFakeRecorder(3)

listener := NewBucketClaimListener()
listener.InitializeKubeClient(kubeClient)
listener.InitializeBucketClient(client)
listener.InitializeEventRecorder(eventRecorder)

bucketclass, err := util.CreateBucketClass(ctx, client, &goldClass)
if err != nil {
Expand Down

0 comments on commit ee3d153

Please sign in to comment.