Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 105 additions & 29 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ import (
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
)

const annSnapshotCompleted = "snapshot.storage.kubernetes.io/completed"

type CSISnapshotController struct {
clientset clientset.Interface
client kubernetes.Interface
Expand Down Expand Up @@ -429,16 +427,18 @@ func (ctrl *CSISnapshotController) syncContent(content *crdv1.VolumeSnapshotCont
func (ctrl *CSISnapshotController) syncSnapshot(snapshot *crdv1.VolumeSnapshot) error {
glog.V(4).Infof("synchonizing VolumeSnapshot[%s]: %s", snapshotKey(snapshot), getSnapshotStatusForLogging(snapshot))

if !metav1.HasAnnotation(snapshot.ObjectMeta, annSnapshotCompleted) {
return ctrl.syncUncompleteSnapshot(snapshot)
if !snapshot.Status.Bound {
return ctrl.syncUnboundSnapshot(snapshot)
} else {
return ctrl.syncCompleteSnapshot(snapshot)
return ctrl.syncBoundSnapshot(snapshot)
}
}

func (ctrl *CSISnapshotController) syncCompleteSnapshot(snapshot *crdv1.VolumeSnapshot) error {
// syncCompleteSnapshot checks the snapshot which has been bound to snapshot content succesfully before.
// If there is any problem with the binding (e.g., snapshot points to a non-exist snapshot content), update the snapshot status and emit event.
func (ctrl *CSISnapshotController) syncBoundSnapshot(snapshot *crdv1.VolumeSnapshot) error {
if snapshot.Spec.SnapshotContentName == "" {
if _, err := ctrl.updateSnapshotStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotLost", "Bound snapshot has lost reference to VolumeSnapshotContent"); err != nil {
if _, err := ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotLost", "Bound snapshot has lost reference to VolumeSnapshotContent"); err != nil {
return err
}
return nil
Expand All @@ -448,7 +448,7 @@ func (ctrl *CSISnapshotController) syncCompleteSnapshot(snapshot *crdv1.VolumeSn
return err
}
if !found {
if _, err = ctrl.updateSnapshotStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotLost", "Bound snapshot has lost reference to VolumeSnapshotContent"); err != nil {
if _, err = ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotLost", "Bound snapshot has lost reference to VolumeSnapshotContent"); err != nil {
return err
}
return nil
Expand All @@ -459,24 +459,26 @@ func (ctrl *CSISnapshotController) syncCompleteSnapshot(snapshot *crdv1.VolumeSn
}

glog.V(4).Infof("syncCompleteSnapshot[%s]: VolumeSnapshotContent %q found", snapshotKey(snapshot), content.Name)
if content.Spec.VolumeSnapshotRef == nil || content.Spec.VolumeSnapshotRef.Name != snapshot.Name ||
content.Spec.VolumeSnapshotRef.UID != snapshot.UID {
if !IsSnapshotBound(snapshot, content) {
// snapshot is bound but content is not bound to snapshot correctly
if _, err = ctrl.updateSnapshotStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotMisbound", "VolumeSnapshotContent is not bound to the VolumeSnapshot correctly"); err != nil {
if _, err = ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotMisbound", "VolumeSnapshotContent is not bound to the VolumeSnapshot correctly"); err != nil {
return err
}
return nil
}
// Snapshot is correctly bound.
if _, err = ctrl.updateSnapshotBoundWithEvent(snapshot, v1.EventTypeNormal, "SnapshotBound", "Snapshot is bound to its VolumeSnapshotContent"); err != nil {
return err
}
return nil
}
}

func (ctrl *CSISnapshotController) syncUncompleteSnapshot(snapshot *crdv1.VolumeSnapshot) error {
func (ctrl *CSISnapshotController) syncUnboundSnapshot(snapshot *crdv1.VolumeSnapshot) error {
uniqueSnapshotName := snapshotKey(snapshot)
glog.V(4).Infof("syncSnapshot %s", uniqueSnapshotName)

// Snsapshot has errors before its completion. Controller will not try to fix it. Nothing to do.
// Snsapshot has errors during its creation. Controller will not try to fix it. Nothing to do.
if snapshot.Status.Error != nil {
return nil
}
Expand All @@ -487,13 +489,20 @@ func (ctrl *CSISnapshotController) syncUncompleteSnapshot(snapshot *crdv1.Volume
return err
}
if !found {
// vs is bound to a non-existing content.
// snapshot is bound to a non-existing content.
return fmt.Errorf("snapshot %s is bound to a non-existing content %s", uniqueSnapshotName, snapshot.Spec.SnapshotContentName)
}
content, ok := contentObj.(*crdv1.VolumeSnapshotContent)
if !ok {
return fmt.Errorf("expected vs, got %+v", contentObj)
return fmt.Errorf("expected volume snapshot content, got %+v", contentObj)
}

if err := ctrl.BindSnapshotContent(snapshot, content); err != nil {
// snapshot is bound but content is not bound to snapshot correctly
return fmt.Errorf("snapshot %s is bound, but VolumeSnapshotContent %s is not bound to the VolumeSnapshot correctly, %v", uniqueSnapshotName, content.Name, err)
}

// snapshot is already bound correctly, check the status and update if it is ready.
glog.V(4).Infof("Check and update snapshot %s status", uniqueSnapshotName)
if err = ctrl.checkandUpdateSnapshotStatus(snapshot, content); err != nil {
return err
Expand Down Expand Up @@ -695,42 +704,109 @@ func (ctrl *CSISnapshotController) checkandUpdateSnapshotStatus(snapshot *crdv1.
// Parameters:
// snapshot - snapshot to update
// eventtype, reason, message - event to send, see EventRecorder.Event()
func (ctrl *CSISnapshotController) updateSnapshotStatusWithEvent(snapshot *crdv1.VolumeSnapshot, eventtype, reason, message string) (*crdv1.VolumeSnapshot, error) {
func (ctrl *CSISnapshotController) updateSnapshotErrorStatusWithEvent(snapshot *crdv1.VolumeSnapshot, eventtype, reason, message string) (*crdv1.VolumeSnapshot, error) {
glog.V(4).Infof("updateSnapshotStatusWithEvent[%s]", snapshotKey(snapshot))
if snapshot.Status.Error != nil {
// Nothing to do.

// Emit the event only when the status change happens
ctrl.eventRecorder.Event(snapshot, eventtype, reason, message)

if snapshot.Status.Error != nil && snapshot.Status.Bound == false {
glog.V(4).Infof("updateClaimStatusWithEvent[%s]: error %v already set", snapshot.Status.Error)
return snapshot, nil
}
statusError := &storage.VolumeError{
Time: metav1.Time{
Time: time.Now(),
},
Message: message,
snapshotClone := snapshot.DeepCopy()
if snapshot.Status.Error == nil {
statusError := &storage.VolumeError{
Time: metav1.Time{
Time: time.Now(),
},
Message: message,
}
snapshotClone.Status.Error = statusError
}
snapshotClone.Status.Bound = false
newSnapshot, err := ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshotClone.Namespace).Update(snapshotClone)
if err != nil {
glog.V(4).Infof("updating VolumeSnapshot[%s] error status failed %v", snapshotKey(snapshot), err)
return newSnapshot, err
}

_, err = ctrl.storeSnapshotUpdate(newSnapshot)
if err != nil {
glog.V(4).Infof("updating VolumeSnapshot[%s] error status: cannot update internal cache %v", snapshotKey(snapshot), err)
return newSnapshot, err
}
// Emit the event only when the status change happens
ctrl.eventRecorder.Event(snapshot, eventtype, reason, message)

return newSnapshot, nil
}

func (ctrl *CSISnapshotController) updateSnapshotBoundWithEvent(snapshot *crdv1.VolumeSnapshot, eventtype, reason, message string) (*crdv1.VolumeSnapshot, error) {
glog.V(4).Infof("updateSnapshotBoundWithEvent[%s]", snapshotKey(snapshot))
if snapshot.Status.Bound && snapshot.Status.Error == nil {
// Nothing to do.
glog.V(4).Infof("updateSnapshotBoundWithEvent[%s]: Bound %v already set", snapshot.Status.Bound)
return snapshot, nil
}

// Emit the event only when the status change happens
//ctrl.eventRecorder.Event(snapshot, eventtype, reason, message)

snapshotClone := snapshot.DeepCopy()
snapshotClone.Status.Error = statusError
snapshotClone.Status.Bound = true
snapshotClone.Status.Error = nil
newSnapshot, err := ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshotClone.Namespace).Update(snapshotClone)

if err != nil {
glog.V(4).Infof("updating VolumeSnapshot[%s] error status failed %v", snapshotKey(snapshot), err)
return newSnapshot, err
}
// Emit the event only when the status change happens
ctrl.eventRecorder.Event(snapshot, eventtype, reason, message)

_, err = ctrl.storeSnapshotUpdate(newSnapshot)
if err != nil {
glog.V(4).Infof("updating VolumeSnapshot[%s] error status: cannot update internal cache %v", snapshotKey(snapshot), err)
return newSnapshot, err
}
// Emit the event only when the status change happens
ctrl.eventRecorder.Event(newSnapshot, eventtype, reason, message)


return newSnapshot, nil
}


// Stateless functions
func getSnapshotStatusForLogging(snapshot *crdv1.VolumeSnapshot) string {
complete := metav1.HasAnnotation(snapshot.ObjectMeta, annSnapshotCompleted)
return fmt.Sprintf("bound to: %q, Completed: %v", snapshot.Spec.SnapshotContentName, complete)
return fmt.Sprintf("bound to: %q, Completed: %v", snapshot.Spec.SnapshotContentName, snapshot.Status.Bound)
}


func IsSnapshotBound(snapshot *crdv1.VolumeSnapshot, content *crdv1.VolumeSnapshotContent) bool {
if content.Spec.VolumeSnapshotRef != nil && content.Spec.VolumeSnapshotRef.Name == snapshot.Name &&
content.Spec.VolumeSnapshotRef.UID == snapshot.UID {
return true
}
return false
}


func (ctrl *CSISnapshotController) BindSnapshotContent(snapshot *crdv1.VolumeSnapshot, content *crdv1.VolumeSnapshotContent) error {
if content.Spec.VolumeSnapshotRef == nil || content.Spec.VolumeSnapshotRef.Name != snapshot.Name {
return fmt.Errorf("Could not bind snapshot %s and content %s, the VolumeSnapshotRef does not match", snapshot.Name, content.Name)
} else if content.Spec.VolumeSnapshotRef.UID != "" && content.Spec.VolumeSnapshotRef.UID != snapshot.UID {
return fmt.Errorf("Could not bind snapshot %s and content %s, the VolumeSnapshotRef does not match", snapshot.Name, content.Name)
} else if content.Spec.VolumeSnapshotRef.UID == "" {
content.Spec.VolumeSnapshotRef.UID = snapshot.UID
newContent, err := ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshotContents().Update(content)
if err != nil {
glog.V(4).Infof("updating VolumeSnapshotContent[%s] error status failed %v", content.Name, err)
return err
}
_, err = ctrl.storeContentUpdate(newContent)
if err != nil {
glog.V(4).Infof("updating VolumeSnapshotContent[%s] error status: cannot update internal cache %v", newContent.Name, err)
return err
}
}
return nil
}
55 changes: 11 additions & 44 deletions pkg/controller/csi_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ type Handler interface {
GetSnapshotStatus(content *crdv1.VolumeSnapshotContent) (*csi.SnapshotStatus, int64, error)
BindandUpdateVolumeSnapshot(snapshotContent *crdv1.VolumeSnapshotContent, snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, error)
GetClassFromVolumeSnapshot(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshotClass, error)
UpdateVolumeSnapshotStatus(snapshot *crdv1.VolumeSnapshot, status *crdv1.VolumeSnapshotStatus) (*crdv1.VolumeSnapshot, error)
}

// csiHandler is a handler that calls CSI to create/delete volume snapshot.
Expand Down Expand Up @@ -114,40 +113,22 @@ func (handler *csiHandler) GetSnapshotStatus(content *crdv1.VolumeSnapshotConten
if err != nil {
return nil, 0, fmt.Errorf("failed to list snapshot data %s: %q", content.Name, err)
}

return csiSnapshotStatus, timestamp, nil
}

func (handler *csiHandler) CheckandUpdateSnapshotStatusOperation(snapshot *crdv1.VolumeSnapshot, content *crdv1.VolumeSnapshotContent) (*crdv1.VolumeSnapshot, error) {
if snapshot.Status.AvailableAt != nil {
return handler.markSnapshotCompleted(snapshot)
}
status, _, err := handler.GetSnapshotStatus(content)
if err != nil {
return nil, fmt.Errorf("failed to check snapshot status %s with error %v", snapshot.Name, err)
}

newSnapshot, err := handler.UpdateSnapshotStatus(snapshot, status, time.Now())
newSnapshot, err := handler.updateSnapshotStatus(snapshot, status, time.Now(), true)
if err != nil {
return nil, err
} else {
if newSnapshot.Status.AvailableAt != nil {
// mark snapshot ready and bound
return handler.markSnapshotCompleted(newSnapshot)
}
}
return newSnapshot, nil
}

func (handler *csiHandler) markSnapshotCompleted(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, error) {
metav1.SetMetaDataAnnotation(&snapshot.ObjectMeta, annSnapshotCompleted, "yes")
updateSnapshot, err := handler.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshot.Namespace).Update(snapshot)
if err != nil {
return nil, err
}
return updateSnapshot, nil
}

func makeSnapshotName(prefix, snapshotUID string, snapshotNameUUIDLength int) (string, error) {
// create persistent name based on a volumeNamePrefix and volumeNameUUIDLength
// of PVC's UID
Expand All @@ -166,7 +147,7 @@ func makeSnapshotName(prefix, snapshotUID string, snapshotNameUUIDLength int) (s

// The function goes through the whole snapshot creation process.
// 1. Trigger the snapshot through csi storage provider.
// 2. Update VolumeSnapshot 'status with timestamp information
// 2. Update VolumeSnapshot status with creationtimestamp information
// 3. Create the VolumeSnapshotContent object with the snapshot id information.
// 4. Bind the VolumeSnapshot and VolumeSnapshotContent object
func (handler *csiHandler) CreateSnapshotOperation(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, error) {
Expand Down Expand Up @@ -198,7 +179,7 @@ func (handler *csiHandler) CreateSnapshotOperation(snapshot *crdv1.VolumeSnapsho
glog.Infof("Create snapshot driver %s, snapshotId %s, timestamp %d, csiSnapshotStatus %v", driverName, snapshotID, timestamp, csiSnapshotStatus)

// Update snapshot status with timestamp
newSnapshot, err := handler.UpdateSnapshotStatus(snapshot, csiSnapshotStatus, time.Unix(0, timestamp))
newSnapshot, err := handler.updateSnapshotStatus(snapshot, csiSnapshotStatus, time.Unix(0, timestamp), false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -299,42 +280,26 @@ func (handler *csiHandler) BindandUpdateVolumeSnapshot(snapshotContent *crdv1.Vo

// Copy the snapshot object before updating it
snapshotCopy := snapshotObj.DeepCopy()
var updateSnapshot *crdv1.VolumeSnapshot

if snapshotObj.Spec.SnapshotContentName == snapshotContent.Name {
glog.Infof("bindVolumeSnapshotContentToVolumeSnapshot: VolumeSnapshot %s already bind to volumeSnapshotContent [%s]", snapshot.Name, snapshotContent.Name)
} else {
glog.Infof("bindVolumeSnapshotContentToVolumeSnapshot: before bind VolumeSnapshot %s to volumeSnapshotContent [%s]", snapshot.Name, snapshotContent.Name)
snapshotCopy.Spec.SnapshotContentName = snapshotContent.Name
updateSnapshot, err = handler.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshot.Namespace).Update(snapshotCopy)
updateSnapshot, err := handler.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshot.Namespace).Update(snapshotCopy)
if err != nil {
glog.Infof("bindVolumeSnapshotContentToVolumeSnapshot: Error binding VolumeSnapshot %s to volumeSnapshotContent [%s]. Error [%#v]", snapshot.Name, snapshotContent.Name, err)
return nil, fmt.Errorf("error updating snapshot object %s on the API server: %v", snapshotKey(updateSnapshot), err)
}
snapshotCopy = updateSnapshot
}

glog.V(5).Infof("bindandUpdateVolumeSnapshot for snapshot completed [%#v]", snapshotCopy)
return snapshotCopy, nil
}

// UpdateVolumeSnapshotStatus update VolumeSnapshot status.
func (handler *csiHandler) UpdateVolumeSnapshotStatus(snapshot *crdv1.VolumeSnapshot, status *crdv1.VolumeSnapshotStatus) (*crdv1.VolumeSnapshot, error) {
snapshotObj, err := handler.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshot.Namespace).Get(snapshot.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error get volume snapshot %s from api server: %s", snapshotKey(snapshot), err)
}

snapshotObj.Status = *status
newSnapshotObj, err := handler.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshot.Namespace).Update(snapshotObj)
if err != nil {
return nil, fmt.Errorf("error update status for volume snapshot %s: %s", snapshotKey(snapshot), err)
}

glog.Infof("UpdateVolumeSnapshotStatus finishes %+v", newSnapshotObj)
return newSnapshotObj, nil
}

// UpdateSnapshotStatus converts snapshot status to crdv1.VolumeSnapshotCondition
func (handler *csiHandler) UpdateSnapshotStatus(snapshot *crdv1.VolumeSnapshot, csistatus *csi.SnapshotStatus, timestamp time.Time) (*crdv1.VolumeSnapshot, error) {
func (handler *csiHandler) updateSnapshotStatus(snapshot *crdv1.VolumeSnapshot, csistatus *csi.SnapshotStatus, timestamp time.Time, bound bool) (*crdv1.VolumeSnapshot, error) {
glog.V(4).Infof("updating VolumeSnapshot[]%s, set status %v, timestamp %v", snapshotKey(snapshot), csistatus, timestamp)
status := snapshot.Status
change := false
Expand All @@ -345,12 +310,14 @@ func (handler *csiHandler) UpdateSnapshotStatus(snapshot *crdv1.VolumeSnapshot,
snapshotClone := snapshot.DeepCopy()
switch csistatus.Type {
case csi.SnapshotStatus_READY:
if status.AvailableAt == nil {
status.AvailableAt = timeAt
if bound {
status.Bound = true
handler.eventRecorder.Event(snapshotClone, v1.EventTypeNormal, "BoundSnapshot", fmt.Sprintf("The requested snapshot %s is created and bound to a VolumeSnapshotContent", snapshotKey(snapshotClone)))
change = true
}
if status.CreatedAt == nil {
status.CreatedAt = timeAt
handler.eventRecorder.Event(snapshotClone, v1.EventTypeNormal, "CreateSnapshot", fmt.Sprintf("The requested snapshot %s is created, waiting to bind to a VolumeSnapshotContent", snapshotKey(snapshotClone)))
change = true
}
case csi.SnapshotStatus_ERROR_UPLOADING:
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ func storeObjectUpdate(store cache.Store, obj interface{}, className string) (bo
return true, nil
}

// getSnapshotContentNameForSnapshot returns SnapshotData.Name for the create VolumeSnapshotContent.
// GetSnapshotContentNameForSnapshot returns SnapshotData.Name for the create VolumeSnapshotContent.
// The name must be unique.
func GetSnapshotContentNameForSnapshot(snapshot *crdv1.VolumeSnapshot) string {
return "snapdata-" + string(snapshot.UID)
}
}