diff --git a/cmd/csi-snapshotter/main.go b/cmd/csi-snapshotter/main.go index 9d7c4ef5a..dd4d62fbb 100644 --- a/cmd/csi-snapshotter/main.go +++ b/cmd/csi-snapshotter/main.go @@ -62,6 +62,8 @@ var ( csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.") createSnapshotContentRetryCount = flag.Int("create-snapshotcontent-retrycount", 5, "Number of retries when we create a snapshot content object for a snapshot.") createSnapshotContentInterval = flag.Duration("create-snapshotcontent-interval", 10*time.Second, "Interval between retries when we create a snapshot content object for a snapshot.") + retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed snapshot creation. It doubles with each failure, up to retry-interval-max.") + retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed snapshot creation.") resyncPeriod = flag.Duration("resync-period", 60*time.Second, "Resync interval of the controller.") snapshotNamePrefix = flag.String("snapshot-name-prefix", "snapshot", "Prefix to apply to the name of a created snapshot") snapshotNameUUIDLength = flag.Int("snapshot-name-uuid-length", -1, "Length in characters for the generated uuid of a created snapshot. Defaults behavior is to NOT truncate.") @@ -176,6 +178,8 @@ func main() { coreFactory.Core().V1().PersistentVolumeClaims(), *createSnapshotContentRetryCount, *createSnapshotContentInterval, + *retryIntervalStart, + *retryIntervalMax, snapShotter, *csiTimeout, *resyncPeriod, diff --git a/pkg/controller/csi_handler.go b/pkg/controller/csi_handler.go index 840ab150e..92a11ecb4 100644 --- a/pkg/controller/csi_handler.go +++ b/pkg/controller/csi_handler.go @@ -30,7 +30,7 @@ import ( // Handler is responsible for handling VolumeSnapshot events from informer. type Handler interface { - CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, error) + CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, snapshotter.SnapshottingState, error) DeleteSnapshot(content *crdv1.VolumeSnapshotContent, snapshotterCredentials map[string]string) error GetSnapshotStatus(content *crdv1.VolumeSnapshotContent) (bool, time.Time, int64, error) } @@ -58,19 +58,20 @@ func NewCSIHandler( } } -func (handler *csiHandler) CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, error) { +func (handler *csiHandler) CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, snapshotter.SnapshottingState, error) { ctx, cancel := context.WithTimeout(context.Background(), handler.timeout) defer cancel() snapshotName, err := makeSnapshotName(handler.snapshotNamePrefix, string(snapshot.UID), handler.snapshotNameUUIDLength) if err != nil { - return "", "", time.Time{}, 0, false, err + return "", "", time.Time{}, 0, false, snapshotter.SnapshottingFinished, err } newParameters, err := removePrefixedParameters(parameters) if err != nil { - return "", "", time.Time{}, 0, false, fmt.Errorf("failed to remove CSI Parameters of prefixed keys: %v", err) + return "", "", time.Time{}, 0, false, snapshotter.SnapshottingFinished, fmt.Errorf("failed to remove CSI Parameters of prefixed keys: %v", err) } + return handler.snapshotter.CreateSnapshot(ctx, snapshotName, volume, newParameters, snapshotterCredentials) } diff --git a/pkg/controller/framework_test.go b/pkg/controller/framework_test.go index b2120843c..13dec40a0 100644 --- a/pkg/controller/framework_test.go +++ b/pkg/controller/framework_test.go @@ -35,6 +35,7 @@ import ( snapshotscheme "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/scheme" informers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions" storagelisters "github.com/kubernetes-csi/external-snapshotter/pkg/client/listers/volumesnapshot/v1beta1" + "github.com/kubernetes-csi/external-snapshotter/pkg/snapshotter" v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -757,6 +758,8 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte coreFactory.Core().V1().PersistentVolumeClaims(), 3, 5*time.Millisecond, + 5*time.Millisecond, + 10*time.Second, fakeSnapshot, 5*time.Millisecond, 60*time.Second, @@ -1372,10 +1375,10 @@ type fakeSnapshotter struct { t *testing.T } -func (f *fakeSnapshotter) CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, error) { +func (f *fakeSnapshotter) CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, snapshotter.SnapshottingState, error) { if f.createCallCounter >= len(f.createCalls) { f.t.Errorf("Unexpected CSI Create Snapshot call: snapshotName=%s, volume=%v, index: %d, calls: %+v", snapshotName, volume.Name, f.createCallCounter, f.createCalls) - return "", "", time.Time{}, 0, false, fmt.Errorf("unexpected call") + return "", "", time.Time{}, 0, false, snapshotter.SnapshottingFinished, fmt.Errorf("unexpected call") } call := f.createCalls[f.createCallCounter] f.createCallCounter++ @@ -1402,9 +1405,10 @@ func (f *fakeSnapshotter) CreateSnapshot(ctx context.Context, snapshotName strin } if err != nil { - return "", "", time.Time{}, 0, false, fmt.Errorf("unexpected call") + return "", "", time.Time{}, 0, false, snapshotter.SnapshottingFinished, fmt.Errorf("unexpected call") } - return call.driverName, call.snapshotId, call.creationTime, call.size, call.readyToUse, call.err + + return call.driverName, call.snapshotId, call.creationTime, call.size, call.readyToUse, snapshotter.SnapshottingFinished, call.err } func (f *fakeSnapshotter) DeleteSnapshot(ctx context.Context, snapshotID string, snapshotterCredentials map[string]string) error { diff --git a/pkg/controller/snapshot_controller.go b/pkg/controller/snapshot_controller.go index 88f6ed206..482de1bcd 100644 --- a/pkg/controller/snapshot_controller.go +++ b/pkg/controller/snapshot_controller.go @@ -18,10 +18,8 @@ package controller import ( "fmt" - "strings" - "time" - crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1beta1" + "github.com/kubernetes-csi/external-snapshotter/pkg/snapshotter" v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" @@ -34,6 +32,8 @@ import ( "k8s.io/kubernetes/pkg/util/goroutinemap" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" "k8s.io/kubernetes/pkg/util/slice" + "strings" + "time" ) // ================================================================== @@ -363,20 +363,37 @@ func (ctrl *csiSnapshotController) storeContentUpdate(content interface{}) (bool // createSnapshot starts new asynchronous operation to create snapshot func (ctrl *csiSnapshotController) createSnapshot(snapshot *crdv1.VolumeSnapshot) error { - klog.V(5).Infof("createSnapshot[%s]: started", snapshotKey(snapshot)) - opName := fmt.Sprintf("create-%s[%s]", snapshotKey(snapshot), string(snapshot.UID)) + key := snapshotKey(snapshot) + klog.V(5).Infof("createSnapshot[%s]: started", key) + opName := fmt.Sprintf("create-%s[%s]", key, string(snapshot.UID)) ctrl.scheduleOperation(opName, func() error { - snapshotObj, err := ctrl.createSnapshotOperation(snapshot) + snapshotObj, state, err := ctrl.createSnapshotOperation(snapshot) if err != nil { ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot: %v", err)) klog.Errorf("createSnapshot [%s]: error occurred in createSnapshotOperation: %v", opName, err) + + // Handle state: + if state == snapshotter.SnapshottingFinished { + // Snapshotting finished, remove obj from snapshotsInProgress. + ctrl.snapshotsInProgress.Delete(key) + } else if state == snapshotter.SnapshottingInBackground { + // Snapshotting still in progress. + klog.V(4).Infof("createSnapshot [%s]: Temporary error received, adding Snapshot back in queue: %v", key, err) + ctrl.snapshotsInProgress.Store(key, snapshotObj) + } else { + // State is SnapshottingNoChange. Don't change snapshotsInProgress. + } + return err } + + // If no errors, update the snapshot. _, updateErr := ctrl.storeSnapshotUpdate(snapshotObj) if updateErr != nil { // We will get an "snapshot update" event soon, this is not a big error - klog.V(4).Infof("createSnapshot [%s]: cannot update internal cache: %v", snapshotKey(snapshotObj), updateErr) + klog.V(4).Infof("createSnapshot [%s]: cannot update internal cache: %v", key, updateErr) } + return nil }) return nil @@ -588,7 +605,7 @@ func (ctrl *csiSnapshotController) checkandUpdateBoundSnapshotStatusOperation(sn if err != nil { return nil, err } - driverName, snapshotID, creationTime, size, readyToUse, err = ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials) + driverName, snapshotID, creationTime, size, readyToUse, _, err = ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials) if err != nil { klog.Errorf("checkandUpdateBoundSnapshotStatusOperation: failed to call create snapshot to check whether the snapshot is ready to use %q", err) return nil, err @@ -622,12 +639,12 @@ func (ctrl *csiSnapshotController) checkandUpdateBoundSnapshotStatusOperation(sn // 2. Update VolumeSnapshot status with creationtimestamp information // 3. Create the VolumeSnapshotContent object with the snapshot id information. // 4. Bind the VolumeSnapshot and VolumeSnapshotContent object -func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, error) { +func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, snapshotter.SnapshottingState, error) { klog.Infof("createSnapshot: Creating snapshot %s through the plugin ...", snapshotKey(snapshot)) if snapshot.Status != nil && snapshot.Status.Error != nil && snapshot.Status.Error.Message != nil && !isControllerUpdateFailError(snapshot.Status.Error) { klog.V(4).Infof("error is already set in snapshot, do not retry to create: %s", *snapshot.Status.Error.Message) - return snapshot, nil + return snapshot, snapshotter.SnapshottingNoChange, nil } // If PVC is not being deleted and finalizer is not added yet, a finalizer should be added. @@ -635,22 +652,22 @@ func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.Volum err := ctrl.ensureSnapshotSourceFinalizer(snapshot) if err != nil { klog.Errorf("createSnapshotOperation failed to add finalizer for source of snapshot %s", err) - return nil, err + return nil, snapshotter.SnapshottingNoChange, err } class, volume, contentName, snapshotterSecretRef, err := ctrl.getCreateSnapshotInput(snapshot) if err != nil { - return nil, fmt.Errorf("failed to get input parameters to create snapshot %s: %q", snapshot.Name, err) + return nil, snapshotter.SnapshottingNoChange, fmt.Errorf("failed to get input parameters to create snapshot %s: %q", snapshot.Name, err) } snapshotterCredentials, err := getCredentials(ctrl.client, snapshotterSecretRef) if err != nil { - return nil, err + return nil, snapshotter.SnapshottingNoChange, err } - driverName, snapshotID, creationTime, size, readyToUse, err := ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials) + driverName, snapshotID, creationTime, size, readyToUse, state, err := ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials) if err != nil { - return nil, fmt.Errorf("failed to take snapshot of the volume, %s: %q", volume.Name, err) + return nil, state, fmt.Errorf("failed to take snapshot of the volume, %s: %q", volume.Name, err) } klog.V(5).Infof("Created snapshot: driver %s, snapshotId %s, creationTime %v, size %d, readyToUse %t", driverName, snapshotID, creationTime, size, readyToUse) @@ -667,12 +684,12 @@ func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.Volum } if err != nil { - return nil, err + return nil, snapshotter.SnapshottingInBackground, err } // Create VolumeSnapshotContent in the database snapshotRef, err := ref.GetReference(scheme.Scheme, snapshot) if err != nil { - return nil, err + return nil, snapshotter.SnapshottingInBackground, err } timestamp := creationTime.UnixNano() @@ -730,9 +747,10 @@ func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.Volum strerr := fmt.Sprintf("Error creating volume snapshot content object for snapshot %s: %v.", snapshotKey(snapshot), err) klog.Error(strerr) ctrl.eventRecorder.Event(newSnapshot, v1.EventTypeWarning, "CreateSnapshotContentFailed", strerr) - return nil, newControllerUpdateError(snapshotKey(snapshot), err.Error()) + return nil, snapshotter.SnapshottingInBackground, newControllerUpdateError(snapshotKey(snapshot), err.Error()) } - return newSnapshot, nil + + return newSnapshot, snapshotter.SnapshottingFinished, nil } // Delete a snapshot diff --git a/pkg/controller/snapshot_controller_base.go b/pkg/controller/snapshot_controller_base.go index 7f58554ca..9b796fa38 100644 --- a/pkg/controller/snapshot_controller_base.go +++ b/pkg/controller/snapshot_controller_base.go @@ -18,6 +18,7 @@ package controller import ( "fmt" + "sync" "time" crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1beta1" @@ -50,6 +51,9 @@ type csiSnapshotController struct { snapshotQueue workqueue.RateLimitingInterface contentQueue workqueue.RateLimitingInterface + // Map UID -> *Snapshot with all snapshots in progress in the background. + snapshotsInProgress sync.Map + snapshotLister storagelisters.VolumeSnapshotLister snapshotListerSynced cache.InformerSynced contentLister storagelisters.VolumeSnapshotContentLister @@ -68,6 +72,8 @@ type csiSnapshotController struct { createSnapshotContentRetryCount int createSnapshotContentInterval time.Duration + retryIntervalStart time.Duration + retryIntervalMax time.Duration resyncPeriod time.Duration } @@ -82,6 +88,8 @@ func NewCSISnapshotController( pvcInformer coreinformers.PersistentVolumeClaimInformer, createSnapshotContentRetryCount int, createSnapshotContentInterval time.Duration, + retryIntervalStart time.Duration, + retryIntervalMax time.Duration, snapshotter snapshotter.Snapshotter, timeout time.Duration, resyncPeriod time.Duration, @@ -103,10 +111,12 @@ func NewCSISnapshotController( runningOperations: goroutinemap.NewGoRoutineMap(true), createSnapshotContentRetryCount: createSnapshotContentRetryCount, createSnapshotContentInterval: createSnapshotContentInterval, + retryIntervalStart: retryIntervalStart, + retryIntervalMax: retryIntervalMax, resyncPeriod: resyncPeriod, snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), - snapshotQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-snapshot"), + snapshotQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(retryIntervalStart, retryIntervalMax), "csi-snapshotter-snapshot"), contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"), } @@ -215,10 +225,29 @@ func (ctrl *csiSnapshotController) snapshotWorker() { klog.Errorf("error getting namespace & name of snapshot %q to get snapshot from informer: %v", key, err) return false } - snapshot, err := ctrl.snapshotLister.VolumeSnapshots(namespace).Get(name) - if err == nil { - // The volume snapshot still exists in informer cache, the event must have - // been add/update/sync + + // Attempt to get snapshot from the informer + var snapshot *crdv1.VolumeSnapshot + snapshot, err = ctrl.snapshotLister.VolumeSnapshots(namespace).Get(name) + if err != nil && !errors.IsNotFound(err) { + klog.V(2).Infof("error getting snapshot %q from informer: %v", key, err) + return false + } else if errors.IsNotFound(err) { + // Check snapshotsInProgress for the snapshot if not found from the informer + inProgressObj, ok := ctrl.snapshotsInProgress.Load(key) + if ok { + snapshot, ok = inProgressObj.(*crdv1.VolumeSnapshot) + if !ok { + klog.Errorf("expected vs, got %+v", inProgressObj) + return false + } + } + + } + + if snapshot != nil { + // If the volume snapshot still exists in informer cache, the event must have + // been add/update/sync. Otherwise, the volume snapshot was still in progress. newSnapshot, err := ctrl.checkAndUpdateSnapshotClass(snapshot) if err == nil { klog.V(5).Infof("passed checkAndUpdateSnapshotClass for snapshot %q", key) @@ -226,11 +255,8 @@ func (ctrl *csiSnapshotController) snapshotWorker() { } return false } - if err != nil && !errors.IsNotFound(err) { - klog.V(2).Infof("error getting snapshot %q from informer: %v", key, err) - return false - } - // The snapshot is not in informer cache, the event must have been "delete" + + // The snapshot is not in informer cache or in progress, the event must have been "delete" vsObj, found, err := ctrl.snapshotStore.GetByKey(key) if err != nil { klog.V(2).Infof("error getting snapshot %q from cache: %v", key, err) @@ -251,6 +277,10 @@ func (ctrl *csiSnapshotController) snapshotWorker() { if err == nil { ctrl.deleteSnapshot(newSnapshot) } + + ctrl.snapshotQueue.Forget(keyObj) + ctrl.snapshotsInProgress.Delete(key) + return false } @@ -377,12 +407,22 @@ func (ctrl *csiSnapshotController) updateSnapshot(snapshot *crdv1.VolumeSnapshot } err = ctrl.syncSnapshot(snapshot) if err != nil { + sKey := snapshotKey(snapshot) + + // if the snapshot has been deleted, remove from snapshots in progress + if _, exists, _ := ctrl.snapshotStore.Get(sKey); !exists { + ctrl.snapshotsInProgress.Delete(sKey) + } else { + // otherwise, add back to the snapshot queue to retry. + ctrl.snapshotQueue.AddRateLimited(sKey) + } + if errors.IsConflict(err) { // Version conflict error happens quite often and the controller // recovers from it easily. - klog.V(3).Infof("could not sync claim %q: %+v", snapshotKey(snapshot), err) + klog.V(3).Infof("could not sync claim %q: %+v", sKey, err) } else { - klog.Errorf("could not sync volume %q: %+v", snapshotKey(snapshot), err) + klog.Errorf("could not sync volume %q: %+v", sKey, err) } } } diff --git a/pkg/snapshotter/snapshotter.go b/pkg/snapshotter/snapshotter.go index 157f08972..af5b321e6 100644 --- a/pkg/snapshotter/snapshotter.go +++ b/pkg/snapshotter/snapshotter.go @@ -26,15 +26,18 @@ import ( csirpc "github.com/kubernetes-csi/csi-lib-utils/rpc" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "k8s.io/api/core/v1" + "k8s.io/klog" ) // Snapshotter implements CreateSnapshot/DeleteSnapshot operations against a remote CSI driver. type Snapshotter interface { // CreateSnapshot creates a snapshot for a volume - CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (driverName string, snapshotId string, timestamp time.Time, size int64, readyToUse bool, err error) + CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (driverName string, snapshotId string, timestamp time.Time, size int64, readyToUse bool, snapshotterState SnapshottingState, err error) // DeleteSnapshot deletes a snapshot from a volume DeleteSnapshot(ctx context.Context, snapshotID string, snapshotterCredentials map[string]string) (err error) @@ -47,23 +50,40 @@ type snapshot struct { conn *grpc.ClientConn } +// SnapshottingState is the state of volume snapshotting. It tells the controller if snapshotting could +// be in progress in the background after a CreateSnapshot() call returns or the snapshotting is +// 100% finished (with or without success) +type SnapshottingState string + +const ( + // SnapshottingInBackground tells the controller that snapshotting may be in progress + // after CreateSnapshot exits. + SnapshottingInBackground SnapshottingState = "Background" + // SnapshottingFinished tells the controller that snapshotting is not running in the background + // and has exited successfully or with errors. + SnapshottingFinished SnapshottingState = "Finished" + // SnapshottingNoChange tells the controller that snapshotting status has not changed since + // CreateSnapshot was called. + SnapshottingNoChange SnapshottingState = "NoChange" +) + func NewSnapshotter(conn *grpc.ClientConn) Snapshotter { return &snapshot{ conn: conn, } } -func (s *snapshot) CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, error) { +func (s *snapshot) CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, SnapshottingState, error) { klog.V(5).Infof("CSI CreateSnapshot: %s", snapshotName) if volume.Spec.CSI == nil { - return "", "", time.Time{}, 0, false, fmt.Errorf("CSIPersistentVolumeSource not defined in spec") + return "", "", time.Time{}, 0, false, SnapshottingFinished, fmt.Errorf("CSIPersistentVolumeSource not defined in spec") } client := csi.NewControllerClient(s.conn) driverName, err := csirpc.GetDriverName(ctx, s.conn) if err != nil { - return "", "", time.Time{}, 0, false, err + return "", "", time.Time{}, 0, false, SnapshottingFinished, err } req := csi.CreateSnapshotRequest{ @@ -75,15 +95,43 @@ func (s *snapshot) CreateSnapshot(ctx context.Context, snapshotName string, volu rsp, err := client.CreateSnapshot(ctx, &req) if err != nil { - return "", "", time.Time{}, 0, false, err + if isFinalError(err) { + return "", "", time.Time{}, 0, false, SnapshottingFinished, err + } + return "", "", time.Time{}, 0, false, SnapshottingInBackground, err } klog.V(5).Infof("CSI CreateSnapshot: %s driver name [%s] snapshot ID [%s] time stamp [%d] size [%d] readyToUse [%v]", snapshotName, driverName, rsp.Snapshot.SnapshotId, rsp.Snapshot.CreationTime, rsp.Snapshot.SizeBytes, rsp.Snapshot.ReadyToUse) creationTime, err := ptypes.Timestamp(rsp.Snapshot.CreationTime) if err != nil { - return "", "", time.Time{}, 0, false, err + return "", "", time.Time{}, 0, false, SnapshottingFinished, err } - return driverName, rsp.Snapshot.SnapshotId, creationTime, rsp.Snapshot.SizeBytes, rsp.Snapshot.ReadyToUse, nil + return driverName, rsp.Snapshot.SnapshotId, creationTime, rsp.Snapshot.SizeBytes, rsp.Snapshot.ReadyToUse, SnapshottingFinished, nil +} + +func isFinalError(err error) bool { + // Sources: + // https://github.com/kubernetes-csi/external-provisioner/commit/8203a03c47ce2b86a2a2c2421d74345b76183b14 + // https://github.com/grpc/grpc/blob/master/doc/statuscodes.md + // https://github.com/container-storage-interface/spec/blob/master/spec.md + st, ok := status.FromError(err) + if !ok { + // This is not gRPC error. The operation must have failed before gRPC + // method was called, otherwise we would get gRPC error. + // We don't know if any previous CreateSnapshot is in progress, be on the safe side. + return false + } + switch st.Code() { + case codes.Canceled, // gRPC: Client Application cancelled the request + codes.DeadlineExceeded, // gRPC: Timeout + codes.Unavailable, // gRPC: Server shutting down, TCP connection broken - previous CreateSnapshot() may be still in progress. + codes.ResourceExhausted, // gRPC: Server temporarily out of resources - previous CreateSnapshot() may be still in progress. + codes.Aborted: // CSI: Operation pending for snapshot + return false + } + // All other errors mean that snapshot creation either did not + // even start or failed. It is for sure not in progress. + return true } func (s *snapshot) DeleteSnapshot(ctx context.Context, snapshotID string, snapshotterCredentials map[string]string) (err error) { diff --git a/pkg/snapshotter/snapshotter_test.go b/pkg/snapshotter/snapshotter_test.go index b0ff50f10..37217c0b3 100644 --- a/pkg/snapshotter/snapshotter_test.go +++ b/pkg/snapshotter/snapshotter_test.go @@ -143,6 +143,7 @@ func TestCreateSnapshot(t *testing.T) { injectError codes.Code expectError bool expectResult *snapshotResult + expectState SnapshottingState }{ { name: "success", @@ -152,6 +153,7 @@ func TestCreateSnapshot(t *testing.T) { output: defaultResponse, expectError: false, expectResult: result, + expectState: SnapshottingFinished, }, { name: "attributes", @@ -162,6 +164,7 @@ func TestCreateSnapshot(t *testing.T) { output: defaultResponse, expectError: false, expectResult: result, + expectState: SnapshottingFinished, }, { name: "secrets", @@ -172,6 +175,7 @@ func TestCreateSnapshot(t *testing.T) { output: defaultResponse, expectError: false, expectResult: result, + expectState: SnapshottingFinished, }, { name: "fail for volume without csi source", @@ -180,6 +184,7 @@ func TestCreateSnapshot(t *testing.T) { input: nil, output: nil, expectError: true, + expectState: SnapshottingFinished, }, { name: "gRPC transient error", @@ -189,6 +194,7 @@ func TestCreateSnapshot(t *testing.T) { output: nil, injectError: codes.DeadlineExceeded, expectError: true, + expectState: SnapshottingInBackground, }, { name: "gRPC final error", @@ -198,6 +204,7 @@ func TestCreateSnapshot(t *testing.T) { output: nil, injectError: codes.NotFound, expectError: true, + expectState: SnapshottingFinished, }, } @@ -224,7 +231,7 @@ func TestCreateSnapshot(t *testing.T) { } s := NewSnapshotter(csiConn) - driverName, snapshotId, timestamp, size, readyToUse, err := s.CreateSnapshot(context.Background(), test.snapshotName, test.volume, test.parameters, test.secrets) + driverName, snapshotId, timestamp, size, readyToUse, snapState, err := s.CreateSnapshot(context.Background(), test.snapshotName, test.volume, test.parameters, test.secrets) if test.expectError && err == nil { t.Errorf("test %q: Expected error, got none", test.name) } @@ -252,6 +259,9 @@ func TestCreateSnapshot(t *testing.T) { t.Errorf("test %q: expected readyToUse: %v, got: %v", test.name, test.expectResult.readyToUse, readyToUse) } } + if test.expectState != snapState { + t.Errorf("test %q: expected snapState: %v, got: %v", test.name, test.expectState, snapState) + } } }