Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/csi-snapshotter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ var (
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
createSnapshotContentRetryCount = flag.Int("create-snapshotcontent-retrycount", 5, "Number of retries when we create a snapshot content object for a snapshot.")
createSnapshotContentInterval = flag.Duration("create-snapshotcontent-interval", 10*time.Second, "Interval between retries when we create a snapshot content object for a snapshot.")
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed snapshot creation. It doubles with each failure, up to retry-interval-max.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed snapshot creation.")
resyncPeriod = flag.Duration("resync-period", 60*time.Second, "Resync interval of the controller.")
snapshotNamePrefix = flag.String("snapshot-name-prefix", "snapshot", "Prefix to apply to the name of a created snapshot")
snapshotNameUUIDLength = flag.Int("snapshot-name-uuid-length", -1, "Length in characters for the generated uuid of a created snapshot. Defaults behavior is to NOT truncate.")
Expand Down Expand Up @@ -176,6 +178,8 @@ func main() {
coreFactory.Core().V1().PersistentVolumeClaims(),
*createSnapshotContentRetryCount,
*createSnapshotContentInterval,
*retryIntervalStart,
*retryIntervalMax,
snapShotter,
*csiTimeout,
*resyncPeriod,
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/csi_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

// Handler is responsible for handling VolumeSnapshot events from informer.
// The snapshot controller invokes these methods to create, delete and query
// snapshots.
type Handler interface {
// CreateSnapshot requests a snapshot of volume on behalf of the given
// VolumeSnapshot. Callers read the results as driver name, snapshot ID,
// creation time, size and readyToUse, followed by the error.
CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, error)
// This form additionally reports a snapshotter.SnapshottingState just before
// the error; the controller uses it to decide whether the snapshot should
// stay tracked as in progress.
CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, snapshotter.SnapshottingState, error)
// DeleteSnapshot takes the VolumeSnapshotContent to act on and the
// snapshotter credentials, and reports only an error.
DeleteSnapshot(content *crdv1.VolumeSnapshotContent, snapshotterCredentials map[string]string) error
// GetSnapshotStatus queries the status for the given VolumeSnapshotContent.
// NOTE(review): the (bool, time.Time, int64) results presumably mirror
// readyToUse, creation time and size as in CreateSnapshot — confirm against
// the implementation.
GetSnapshotStatus(content *crdv1.VolumeSnapshotContent) (bool, time.Time, int64, error)
}
Expand Down Expand Up @@ -58,19 +58,20 @@ func NewCSIHandler(
}
}

// CreateSnapshot builds the snapshot name from the handler's name prefix, the
// VolumeSnapshot UID and the configured UUID length, passes the supplied
// parameters through removePrefixedParameters, and then delegates to the
// underlying snapshotter. The snapshotter's results are returned unchanged.
func (handler *csiHandler) CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, error) {
func (handler *csiHandler) CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, snapshotter.SnapshottingState, error) {

// Bound the downstream snapshotter call by the handler's configured timeout.
ctx, cancel := context.WithTimeout(context.Background(), handler.timeout)
defer cancel()

snapshotName, err := makeSnapshotName(handler.snapshotNamePrefix, string(snapshot.UID), handler.snapshotNameUUIDLength)
if err != nil {
// Nothing has been sent to the snapshotter at this point; the
// state-returning form reports SnapshottingFinished.
return "", "", time.Time{}, 0, false, err
return "", "", time.Time{}, 0, false, snapshotter.SnapshottingFinished, err
}
newParameters, err := removePrefixedParameters(parameters)
if err != nil {
// Still before the snapshotter call, so the same state applies.
return "", "", time.Time{}, 0, false, fmt.Errorf("failed to remove CSI Parameters of prefixed keys: %v", err)
return "", "", time.Time{}, 0, false, snapshotter.SnapshottingFinished, fmt.Errorf("failed to remove CSI Parameters of prefixed keys: %v", err)
}

// Only the filtered parameters are forwarded to the snapshotter.
return handler.snapshotter.CreateSnapshot(ctx, snapshotName, volume, newParameters, snapshotterCredentials)
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/controller/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
snapshotscheme "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/scheme"
informers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions"
storagelisters "github.com/kubernetes-csi/external-snapshotter/pkg/client/listers/volumesnapshot/v1beta1"
"github.com/kubernetes-csi/external-snapshotter/pkg/snapshotter"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -757,6 +758,8 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
coreFactory.Core().V1().PersistentVolumeClaims(),
3,
5*time.Millisecond,
5*time.Millisecond,
10*time.Second,
fakeSnapshot,
5*time.Millisecond,
60*time.Second,
Expand Down Expand Up @@ -1372,10 +1375,10 @@ type fakeSnapshotter struct {
t *testing.T
}

func (f *fakeSnapshotter) CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, error) {
func (f *fakeSnapshotter) CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, snapshotter.SnapshottingState, error) {
if f.createCallCounter >= len(f.createCalls) {
f.t.Errorf("Unexpected CSI Create Snapshot call: snapshotName=%s, volume=%v, index: %d, calls: %+v", snapshotName, volume.Name, f.createCallCounter, f.createCalls)
return "", "", time.Time{}, 0, false, fmt.Errorf("unexpected call")
return "", "", time.Time{}, 0, false, snapshotter.SnapshottingFinished, fmt.Errorf("unexpected call")
}
call := f.createCalls[f.createCallCounter]
f.createCallCounter++
Expand All @@ -1402,9 +1405,10 @@ func (f *fakeSnapshotter) CreateSnapshot(ctx context.Context, snapshotName strin
}

if err != nil {
return "", "", time.Time{}, 0, false, fmt.Errorf("unexpected call")
return "", "", time.Time{}, 0, false, snapshotter.SnapshottingFinished, fmt.Errorf("unexpected call")
}
return call.driverName, call.snapshotId, call.creationTime, call.size, call.readyToUse, call.err

return call.driverName, call.snapshotId, call.creationTime, call.size, call.readyToUse, snapshotter.SnapshottingFinished, call.err
}

func (f *fakeSnapshotter) DeleteSnapshot(ctx context.Context, snapshotID string, snapshotterCredentials map[string]string) error {
Expand Down
56 changes: 37 additions & 19 deletions pkg/controller/snapshot_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ package controller

import (
"fmt"
"strings"
"time"

crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1beta1"
"github.com/kubernetes-csi/external-snapshotter/pkg/snapshotter"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -34,6 +32,8 @@ import (
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
"k8s.io/kubernetes/pkg/util/slice"
"strings"
"time"
)

// ==================================================================
Expand Down Expand Up @@ -363,20 +363,37 @@ func (ctrl *csiSnapshotController) storeContentUpdate(content interface{}) (bool

// createSnapshot starts new asynchronous operation to create snapshot
func (ctrl *csiSnapshotController) createSnapshot(snapshot *crdv1.VolumeSnapshot) error {
klog.V(5).Infof("createSnapshot[%s]: started", snapshotKey(snapshot))
opName := fmt.Sprintf("create-%s[%s]", snapshotKey(snapshot), string(snapshot.UID))
key := snapshotKey(snapshot)
klog.V(5).Infof("createSnapshot[%s]: started", key)
opName := fmt.Sprintf("create-%s[%s]", key, string(snapshot.UID))
ctrl.scheduleOperation(opName, func() error {
snapshotObj, err := ctrl.createSnapshotOperation(snapshot)
snapshotObj, state, err := ctrl.createSnapshotOperation(snapshot)
if err != nil {
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot: %v", err))
klog.Errorf("createSnapshot [%s]: error occurred in createSnapshotOperation: %v", opName, err)

// Handle state:
if state == snapshotter.SnapshottingFinished {
// Snapshotting finished, remove obj from snapshotsInProgress.
ctrl.snapshotsInProgress.Delete(key)
} else if state == snapshotter.SnapshottingInBackground {
// Snapshotting still in progress.
klog.V(4).Infof("createSnapshot [%s]: Temporary error received, adding Snapshot back in queue: %v", key, err)
ctrl.snapshotsInProgress.Store(key, snapshotObj)
} else {
// State is SnapshottingNoChange. Don't change snapshotsInProgress.
}

return err
}

// If no errors, update the snapshot.
_, updateErr := ctrl.storeSnapshotUpdate(snapshotObj)
if updateErr != nil {
// We will get a "snapshot update" event soon, so this is not a big error
klog.V(4).Infof("createSnapshot [%s]: cannot update internal cache: %v", snapshotKey(snapshotObj), updateErr)
klog.V(4).Infof("createSnapshot [%s]: cannot update internal cache: %v", key, updateErr)
}

return nil
})
return nil
Expand Down Expand Up @@ -588,7 +605,7 @@ func (ctrl *csiSnapshotController) checkandUpdateBoundSnapshotStatusOperation(sn
if err != nil {
return nil, err
}
driverName, snapshotID, creationTime, size, readyToUse, err = ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials)
driverName, snapshotID, creationTime, size, readyToUse, _, err = ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials)
if err != nil {
klog.Errorf("checkandUpdateBoundSnapshotStatusOperation: failed to call create snapshot to check whether the snapshot is ready to use %q", err)
return nil, err
Expand Down Expand Up @@ -622,35 +639,35 @@ func (ctrl *csiSnapshotController) checkandUpdateBoundSnapshotStatusOperation(sn
// 2. Update VolumeSnapshot status with creationtimestamp information
// 3. Create the VolumeSnapshotContent object with the snapshot id information.
// 4. Bind the VolumeSnapshot and VolumeSnapshotContent object
func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, error) {
func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, snapshotter.SnapshottingState, error) {
klog.Infof("createSnapshot: Creating snapshot %s through the plugin ...", snapshotKey(snapshot))

if snapshot.Status != nil && snapshot.Status.Error != nil && snapshot.Status.Error.Message != nil && !isControllerUpdateFailError(snapshot.Status.Error) {
klog.V(4).Infof("error is already set in snapshot, do not retry to create: %s", *snapshot.Status.Error.Message)
return snapshot, nil
return snapshot, snapshotter.SnapshottingNoChange, nil
}

// If PVC is not being deleted and finalizer is not added yet, a finalizer should be added.
klog.V(5).Infof("createSnapshotOperation: Check if PVC is not being deleted and add Finalizer for source of snapshot [%s] if needed", snapshot.Name)
err := ctrl.ensureSnapshotSourceFinalizer(snapshot)
if err != nil {
klog.Errorf("createSnapshotOperation failed to add finalizer for source of snapshot %s", err)
return nil, err
return nil, snapshotter.SnapshottingNoChange, err
}

class, volume, contentName, snapshotterSecretRef, err := ctrl.getCreateSnapshotInput(snapshot)
if err != nil {
return nil, fmt.Errorf("failed to get input parameters to create snapshot %s: %q", snapshot.Name, err)
return nil, snapshotter.SnapshottingNoChange, fmt.Errorf("failed to get input parameters to create snapshot %s: %q", snapshot.Name, err)
}

snapshotterCredentials, err := getCredentials(ctrl.client, snapshotterSecretRef)
if err != nil {
return nil, err
return nil, snapshotter.SnapshottingNoChange, err
}

driverName, snapshotID, creationTime, size, readyToUse, err := ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials)
driverName, snapshotID, creationTime, size, readyToUse, state, err := ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials)
if err != nil {
return nil, fmt.Errorf("failed to take snapshot of the volume, %s: %q", volume.Name, err)
return nil, state, fmt.Errorf("failed to take snapshot of the volume, %s: %q", volume.Name, err)
}

klog.V(5).Infof("Created snapshot: driver %s, snapshotId %s, creationTime %v, size %d, readyToUse %t", driverName, snapshotID, creationTime, size, readyToUse)
Expand All @@ -667,12 +684,12 @@ func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.Volum
}

if err != nil {
return nil, err
return nil, snapshotter.SnapshottingInBackground, err
}
// Create VolumeSnapshotContent in the database
snapshotRef, err := ref.GetReference(scheme.Scheme, snapshot)
if err != nil {
return nil, err
return nil, snapshotter.SnapshottingInBackground, err
}

timestamp := creationTime.UnixNano()
Expand Down Expand Up @@ -730,9 +747,10 @@ func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.Volum
strerr := fmt.Sprintf("Error creating volume snapshot content object for snapshot %s: %v.", snapshotKey(snapshot), err)
klog.Error(strerr)
ctrl.eventRecorder.Event(newSnapshot, v1.EventTypeWarning, "CreateSnapshotContentFailed", strerr)
return nil, newControllerUpdateError(snapshotKey(snapshot), err.Error())
return nil, snapshotter.SnapshottingInBackground, newControllerUpdateError(snapshotKey(snapshot), err.Error())
}
return newSnapshot, nil

return newSnapshot, snapshotter.SnapshottingFinished, nil
}

// Delete a snapshot
Expand Down
64 changes: 52 additions & 12 deletions pkg/controller/snapshot_controller_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"fmt"
"sync"
"time"

crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1beta1"
Expand Down Expand Up @@ -50,6 +51,9 @@ type csiSnapshotController struct {
snapshotQueue workqueue.RateLimitingInterface
contentQueue workqueue.RateLimitingInterface

// Map of snapshot key -> *VolumeSnapshot for all snapshots in progress in the background.
snapshotsInProgress sync.Map

snapshotLister storagelisters.VolumeSnapshotLister
snapshotListerSynced cache.InformerSynced
contentLister storagelisters.VolumeSnapshotContentLister
Expand All @@ -68,6 +72,8 @@ type csiSnapshotController struct {

createSnapshotContentRetryCount int
createSnapshotContentInterval time.Duration
retryIntervalStart time.Duration
retryIntervalMax time.Duration
resyncPeriod time.Duration
}

Expand All @@ -82,6 +88,8 @@ func NewCSISnapshotController(
pvcInformer coreinformers.PersistentVolumeClaimInformer,
createSnapshotContentRetryCount int,
createSnapshotContentInterval time.Duration,
retryIntervalStart time.Duration,
retryIntervalMax time.Duration,
snapshotter snapshotter.Snapshotter,
timeout time.Duration,
resyncPeriod time.Duration,
Expand All @@ -103,10 +111,12 @@ func NewCSISnapshotController(
runningOperations: goroutinemap.NewGoRoutineMap(true),
createSnapshotContentRetryCount: createSnapshotContentRetryCount,
createSnapshotContentInterval: createSnapshotContentInterval,
retryIntervalStart: retryIntervalStart,
retryIntervalMax: retryIntervalMax,
resyncPeriod: resyncPeriod,
snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
snapshotQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-snapshot"),
snapshotQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(retryIntervalStart, retryIntervalMax), "csi-snapshotter-snapshot"),
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"),
}

Expand Down Expand Up @@ -215,22 +225,38 @@ func (ctrl *csiSnapshotController) snapshotWorker() {
klog.Errorf("error getting namespace & name of snapshot %q to get snapshot from informer: %v", key, err)
return false
}
snapshot, err := ctrl.snapshotLister.VolumeSnapshots(namespace).Get(name)
if err == nil {
// The volume snapshot still exists in informer cache, the event must have
// been add/update/sync

// Attempt to get snapshot from the informer
var snapshot *crdv1.VolumeSnapshot
snapshot, err = ctrl.snapshotLister.VolumeSnapshots(namespace).Get(name)
if err != nil && !errors.IsNotFound(err) {
klog.V(2).Infof("error getting snapshot %q from informer: %v", key, err)
return false
} else if errors.IsNotFound(err) {
// Check snapshotsInProgress for the snapshot if not found from the informer
inProgressObj, ok := ctrl.snapshotsInProgress.Load(key)
if ok {
snapshot, ok = inProgressObj.(*crdv1.VolumeSnapshot)
if !ok {
klog.Errorf("expected vs, got %+v", inProgressObj)
return false
}
}

}

if snapshot != nil {
// If the volume snapshot still exists in informer cache, the event must have
// been add/update/sync. Otherwise, the volume snapshot was still in progress.
newSnapshot, err := ctrl.checkAndUpdateSnapshotClass(snapshot)
if err == nil {
klog.V(5).Infof("passed checkAndUpdateSnapshotClass for snapshot %q", key)
ctrl.updateSnapshot(newSnapshot)
}
return false
}
if err != nil && !errors.IsNotFound(err) {
klog.V(2).Infof("error getting snapshot %q from informer: %v", key, err)
return false
}
// The snapshot is not in informer cache, the event must have been "delete"

// The snapshot is not in informer cache or in progress, the event must have been "delete"
vsObj, found, err := ctrl.snapshotStore.GetByKey(key)
if err != nil {
klog.V(2).Infof("error getting snapshot %q from cache: %v", key, err)
Expand All @@ -251,6 +277,10 @@ func (ctrl *csiSnapshotController) snapshotWorker() {
if err == nil {
ctrl.deleteSnapshot(newSnapshot)
}

ctrl.snapshotQueue.Forget(keyObj)
ctrl.snapshotsInProgress.Delete(key)

return false
}

Expand Down Expand Up @@ -377,12 +407,22 @@ func (ctrl *csiSnapshotController) updateSnapshot(snapshot *crdv1.VolumeSnapshot
}
err = ctrl.syncSnapshot(snapshot)
if err != nil {
sKey := snapshotKey(snapshot)

// if the snapshot has been deleted, remove from snapshots in progress
if _, exists, _ := ctrl.snapshotStore.Get(sKey); !exists {
ctrl.snapshotsInProgress.Delete(sKey)
} else {
// otherwise, add back to the snapshot queue to retry.
ctrl.snapshotQueue.AddRateLimited(sKey)
}

if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
// recovers from it easily.
klog.V(3).Infof("could not sync claim %q: %+v", snapshotKey(snapshot), err)
klog.V(3).Infof("could not sync claim %q: %+v", sKey, err)
} else {
klog.Errorf("could not sync volume %q: %+v", snapshotKey(snapshot), err)
klog.Errorf("could not sync volume %q: %+v", sKey, err)
}
}
}
Expand Down
Loading