Skip to content

Commit

Permalink
Added tests for cloning protection
Browse files Browse the repository at this point in the history
  • Loading branch information
Danil-Grigorev committed Mar 13, 2020
1 parent 9dda579 commit 5b36954
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 8 deletions.
12 changes: 5 additions & 7 deletions cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,10 @@ func main() {
factory := informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)

// -------------------------------
// StorageClass lister
// Listers
// Create informer to prevent hit the API server for all resource request
scLister := factory.Storage().V1().StorageClasses().Lister()
claimLister := factory.Core().V1().PersistentVolumeClaims().Lister()

var csiNodeLister storagelistersv1beta1.CSINodeLister
var nodeLister v1.NodeLister
Expand All @@ -190,9 +191,7 @@ func main() {
// PersistentVolumeClaims informer
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)
claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")

claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()
claimLister := factory.Core().V1().PersistentVolumeClaims().Lister()

// Setup options
provisionerOptions := []func(*controller.ProvisionController) error{
Expand Down Expand Up @@ -256,16 +255,15 @@ func main() {
)

run := func(context.Context) {
stopCh := context.Background().Done()
factory.Start(stopCh)
cacheSyncResult := factory.WaitForCacheSync(stopCh)
factory.Start(context.Background().Done())
cacheSyncResult := factory.WaitForCacheSync(context.Background().Done())
for _, v := range cacheSyncResult {
if !v {
klog.Fatalf("Failed to sync Informers!")
}
}

go csiClaimController.Run(int(*workerThreads), stopCh)
go csiClaimController.Run(int(*workerThreads), context.Background().Done())
provisionController.Run(wait.NeverStop)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ func (p *CloningProtectionController) syncClaim(obj interface{}) error {
return nil
}

pvcList, err := p.claimLister.PersistentVolumeClaims("").List(labels.Everything())
// Checking for PVCs in the same namespace to have other states aside from Pending, which means that cloning is still in progress
pvcList, err := p.claimLister.PersistentVolumeClaims(claim.Namespace).List(labels.Everything())
if err != nil {
return err
}
Expand Down
204 changes: 204 additions & 0 deletions pkg/controller/clone_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package controller

import (
"context"
"testing"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
fakeclientset "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/workqueue"
)

var requestedBytes int64 = 1000
var fakeSc1 string = "fake-sc-1"

const (
srcName = "clone-source-pvc"
dstName = "destination-pvc"
srcNamespace = "fake-pvc-namespace"
pvName = "test-testi"
)

func pvcFinalizers(pvc *v1.PersistentVolumeClaim, finalizers ...string) *v1.PersistentVolumeClaim {
pvc.Finalizers = append(pvc.Finalizers, finalizers...)
return pvc
}

func pvcDataSourceClone(pvc *v1.PersistentVolumeClaim, srcName string) *v1.PersistentVolumeClaim {
apiGr := ""
pvc.Spec.DataSource = &v1.TypedLocalObjectReference{
APIGroup: &apiGr,
Kind: pvcKind,
Name: srcName,
}
return pvc
}

func pvcNamed(pvc *v1.PersistentVolumeClaim, name string) *v1.PersistentVolumeClaim {
pvc.Name = name
return pvc
}

func pvcNamespaced(pvc *v1.PersistentVolumeClaim, namespace string) *v1.PersistentVolumeClaim {
pvc.Namespace = namespace
return pvc
}

func pvcPhase(pvc *v1.PersistentVolumeClaim, phase v1.PersistentVolumeClaimPhase) *v1.PersistentVolumeClaim {
pvc.Status.Phase = phase
return pvc
}

func baseClaim() *v1.PersistentVolumeClaim {
return fakeClaim(srcName, srcNamespace, "fake-claim-uid", requestedBytes, pvName, v1.ClaimBound, &fakeSc1, "").DeepCopy()
}

func pvcDeletionMarked(pvc *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim {
timeX := metav1.NewTime(time.Now())
pvc.DeletionTimestamp = &timeX
return pvc
}

// TestCloneFinalizerRemoval tests create volume clone
func TestCloneFinalizerRemoval(t *testing.T) {

testcases := map[string]struct {
initialClaims []runtime.Object
expectClaim bool
dstPVCStatusPhase v1.PersistentVolumeClaimPhase
}{
"delete source pvc with no cloning in progress": {
initialClaims: []runtime.Object{
pvcFinalizers(baseClaim(), pvcCloneFinalizer),
pvcDataSourceClone(
pvcNamed(baseClaim(), dstName),
srcName,
),
},
},
"delete source pvc when destination pvc status is claim pending": {
initialClaims: []runtime.Object{
pvcFinalizers(baseClaim(), pvcCloneFinalizer),
pvcPhase(
pvcDataSourceClone(
pvcNamed(baseClaim(), dstName),
srcName,
),
v1.ClaimPending)},
expectClaim: true,
},
"delete source pvc when at least one destination pvc status is claim pending": {
initialClaims: []runtime.Object{
pvcFinalizers(baseClaim(), pvcCloneFinalizer),
pvcDataSourceClone(
pvcNamed(baseClaim(), dstName),
srcName,
),
pvcPhase(
pvcDataSourceClone(
pvcNamed(baseClaim(), dstName+"1"),
srcName,
),
v1.ClaimPending)},
expectClaim: true,
},
"delete source pvc located in another namespace should not block": {
initialClaims: []runtime.Object{
pvcNamespaced(
pvcFinalizers(
baseClaim(), pvcCloneFinalizer,
),
srcNamespace+"1"),
pvcPhase(
pvcDataSourceClone(
pvcNamed(baseClaim(), dstName+"1"),
srcName,
),
v1.ClaimPending)},
},
"delete source pvc which is not cloned by any other pvc": {
initialClaims: []runtime.Object{pvcFinalizers(baseClaim(), pvcCloneFinalizer)},
},
"delete source pvc without finalizer": {
initialClaims: []runtime.Object{baseClaim()},
},
}

for k, tc := range testcases {
tc := tc
t.Run(k, func(t *testing.T) {
t.Parallel()
var clientSet *fakeclientset.Clientset

utilruntime.ReallyCrash = false

clientSet = fakeclientset.NewSimpleClientset(tc.initialClaims...)
informerFactory := informers.NewSharedInformerFactory(clientSet, 1*time.Second)
claimInformer := informerFactory.Core().V1().PersistentVolumeClaims().Informer()
claimLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 2*time.Second)
claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")

for _, claim := range tc.initialClaims {
claimInformer.GetStore().Add(claim)
}

informerFactory.WaitForCacheSync(context.TODO().Done())
go informerFactory.Start(context.TODO().Done())

cloningProtector := NewCloningProtectionController(
clientSet,
claimLister,
claimInformer,
claimQueue,
)

go cloningProtector.Run(1, context.TODO().Done())

// Get PVC as accepted by fake client
var claim *v1.PersistentVolumeClaim
claims, _ := claimLister.List(labels.Everything())
for _, c := range claims {
if c.Name == srcName {
claim = c
}
}

// Simulate Delete behavior
clientSet.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(pvcDeletionMarked(claim))

// Wait for couple reconciles for controller to adjust finalizers
time.Sleep(2 * time.Second)

claim, _ = clientSet.CoreV1().PersistentVolumeClaims(claim.Namespace).Get(claim.Name, metav1.GetOptions{})
if !checkFinalizer(claim, pvcCloneFinalizer) {
clientSet.CoreV1().PersistentVolumeClaims(claim.Namespace).Delete(claim.Name, &metav1.DeleteOptions{})
// Wait for deletion and cache update
time.Sleep(2 * time.Second)
}

// Check finalizers removal
if tc.expectClaim {
_, err := claimLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name)
if err != nil {
t.Errorf("Source claim does not exist: %s", err)
}
} else {
claims, err := claimLister.List(labels.Everything())
if err != nil {
t.Errorf("error listing claims: %s", err)
}
if len(claims) == len(tc.initialClaims) {
t.Error("Claim was not removed")
}
}
})
}

}

0 comments on commit 5b36954

Please sign in to comment.