Skip to content

Commit

Permalink
Clone from snapshot annotation
Browse files Browse the repository at this point in the history
Add cloneFromSnapshot annotation and functionality
  • Loading branch information
jharrod authored Dec 19, 2024
1 parent 5bdc66a commit 281c33f
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 6 deletions.
1 change: 1 addition & 0 deletions frontend/csi/controller_helpers/kubernetes/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
AnnBlockSize = annPrefix + "/blockSize"
AnnFileSystem = annPrefix + "/fileSystem"
AnnCloneFromPVC = annPrefix + "/cloneFromPVC"
AnnCloneFromSnapshot = annPrefix + "/cloneFromSnapshot"
AnnSplitOnClone = annPrefix + "/splitOnClone"
AnnNotManaged = annPrefix + "/notManaged"
AnnImportOriginalName = annPrefix + "/importOriginalName"
Expand Down
137 changes: 131 additions & 6 deletions frontend/csi/controller_helpers/kubernetes/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,28 @@ func (h *helper) GetVolumeConfig(
annotations := processPVCAnnotations(pvc, fsType)
volumeConfig := getVolumeConfig(ctx, pvc, pvName, pvcSize, annotations, sc, requisiteTopology, preferredTopology)

// Get clone annotations
sourceSnapshotName := getAnnotation(annotations, AnnCloneFromSnapshot)
sourcePVCName := getAnnotation(annotations, AnnCloneFromPVC)

// Check if we're cloning a PVC, and if so, do some further validation
if cloneSourcePVName, err := h.getCloneSourceInfo(ctx, pvc); err != nil {
return nil, err
} else if cloneSourcePVName != "" {
volumeConfig.CloneSourceVolume = cloneSourcePVName
if sourcePVCName != "" && sourceSnapshotName == "" {
if cloneSourcePVName, err := h.getCloneSourceInfo(ctx, pvc); err != nil {
return nil, err
} else if cloneSourcePVName != "" {
volumeConfig.CloneSourceVolume = cloneSourcePVName
}
}

// Check if we're cloning from a snapshot, and if so, do some further validation
if sourceSnapshotName != "" {
cloneSourceVolume, cloneSourceSnapshot, err := h.getSnapshotCloneSourceInfo(ctx, pvc)
if err != nil {
return nil, err
} else if cloneSourceVolume != "" && cloneSourceSnapshot != "" {
volumeConfig.CloneSourceVolume = cloneSourceVolume
volumeConfig.CloneSourceSnapshot = cloneSourceSnapshot
}
}

// Check if we're importing a volume and do some further validation
Expand Down Expand Up @@ -276,8 +293,73 @@ func (h *helper) getStorageClass(ctx context.Context, name string) (*k8sstoragev
return sc, nil
}

// getSnapshotCloneSourceInfo accepts the PVC of a volume being provisioned by CSI and inspects it
// for the annotations indicating a snapshot clone operation (of which CSI is unaware).
// The method completes several checks on the source snapshot, and PVC if provided, and returns the
// name of the source PV and snapshot as needed by Trident to clone a volume.
func (h *helper) getSnapshotCloneSourceInfo(
ctx context.Context, clonePVC *v1.PersistentVolumeClaim,
) (string, string, error) {
// Check if this is a snapshot clone operation
annotations := processPVCAnnotations(clonePVC, "")
sourceSnapshotName := getAnnotation(annotations, AnnCloneFromSnapshot)
if sourceSnapshotName == "" {
return "", "", fmt.Errorf("annotation 'cloneFromSnapshot' is empty")
}

// Get the VolumeSnapshot
snapshot, err := h.getVolumeSnapshot(ctx, sourceSnapshotName, clonePVC.Namespace)
if err != nil {
return "", "", err
}

// If the clone from PVC annotation is also set, ensure it matches the snapshot
sourcePVCName := getAnnotation(annotations, AnnCloneFromPVC)
if sourcePVCName != "" {
snapSourcePVC := snapshot.Spec.Source.PersistentVolumeClaimName
if snapSourcePVC == nil {
return "", "", fmt.Errorf("cannot verify clone source PVC for snapshot '%s', "+
"PersistentVolumeClaimName is not set in the snapshot spec", sourceSnapshotName)
}
if sourcePVCName != *snapSourcePVC {
return "", "", fmt.Errorf("clone source snapshot '%s' does not originate from the given source "+
"PVC '%s'", sourceSnapshotName, sourcePVCName)
}
}

// Ensure the VolumeSnapshot is ready to use
if snapshot.Status.ReadyToUse == nil || !*snapshot.Status.ReadyToUse {
return "", "", fmt.Errorf("snapshot '%s' is not ready to use", snapshot.Name)
}

snapshotContent, err := h.getSnapshotContentFromSnapshot(ctx, snapshot)
if err != nil {
Logc(ctx).WithFields(LogFields{
"sourceSnapshotName": sourceSnapshotName,
}).Errorf("Clone source snapshot content not found: %v", err)
return "", "", err
}

// Ensure the VolumeSnapshotContent is ready to use
if snapshotContent.Status.ReadyToUse == nil || !*snapshotContent.Status.ReadyToUse {
return "", "", fmt.Errorf("volumeSnapshotContent '%s' is not ready to use", snapshotContent.Name)
}

if snapshotContent.Status.SnapshotHandle == nil {
return "", "", fmt.Errorf("volumeSnapshotContent '%s' does not have a snapshot handle",
snapshotContent.Name)
}

volumeName, snapshotName, err := storage.ParseSnapshotID(*snapshotContent.Status.SnapshotHandle)
if err != nil {
return "", "", err
}

return volumeName, snapshotName, nil
}

// getCloneSourceInfo accepts the PVC of a volume being provisioned by CSI and inspects it
// for the annotations indicating a clone operation (of which CSI is unaware). If a clone is
// for the annotations indicating a PVC clone operation (of which CSI is unaware). If a clone is
// being created, the method completes several checks on the source PVC/PV and returns the
// name of the source PV as needed by Trident to clone a volume as well as an optional
// snapshot name (also potentially unknown to CSI). Note that these legacy clone annotations
Expand All @@ -287,7 +369,7 @@ func (h *helper) getCloneSourceInfo(ctx context.Context, clonePVC *v1.Persistent
annotations := processPVCAnnotations(clonePVC, "")
sourcePVCName := getAnnotation(annotations, AnnCloneFromPVC)
if sourcePVCName == "" {
return "", nil
return "", fmt.Errorf("annotation 'cloneFromPVC' is empty")
}

// Check that the source PVC is in the same namespace.
Expand Down Expand Up @@ -476,6 +558,49 @@ func (h *helper) getSnapshotContentByName(ctx context.Context, name string) (*vs
return vsc, nil
}

// getVolumeSnapshot returns a VolumeSnapshot if it exists.
func (h *helper) getVolumeSnapshot(
ctx context.Context, name, namespace string,
) (*vsv1.VolumeSnapshot, error) {
fields := LogFields{"snapshotName": name, "namespace": namespace}
Logc(ctx).WithFields(fields).Trace(">>>> getVolumeSnapshot")
defer Logc(ctx).WithFields(fields).Trace("<<<< getVolumeSnapshot")

// Get the VolumeSnapshot
snapshot, err := h.snapClient.SnapshotV1().VolumeSnapshots(namespace).Get(ctx, name, getOpts)
if err != nil {
statusErr, ok := err.(*apierrors.StatusError)
if ok && statusErr.Status().Reason == metav1.StatusReasonNotFound {
return nil, errors.NotFoundError("snapshot %s not found; %v", name, statusErr)
}
return nil, err
}
return snapshot, nil
}

// getSnapshotContentBySnapshotName returns the VolumeSnapshotContent referenced by a VolumeSnapshot
func (h *helper) getSnapshotContentFromSnapshot(
ctx context.Context, snapshot *vsv1.VolumeSnapshot,
) (*vsv1.VolumeSnapshotContent, error) {
fields := LogFields{"snapshotName": snapshot.Name}
Logc(ctx).WithFields(fields).Trace(">>>> getSnapshotContentFromSnapshot")
defer Logc(ctx).WithFields(fields).Trace("<<<< getSnapshotContentFromSnapshot")

// Extract the VolumeSnapshotContent name from the VolumeSnapshot status
snapshotContentName := snapshot.Status.BoundVolumeSnapshotContentName
if snapshotContentName == nil || *snapshotContentName == "" {
return nil, errors.NotFoundError("boundVolumeSnapshotContentName not found for snapshot %s", snapshot.Name)
}

// Get the VolumeSnapshotContent
vsc, err := h.getSnapshotContentByName(ctx, *snapshotContentName)
if err != nil {
return nil, err
}

return vsc, nil
}

// getSnapshotInternalNameFromAnnotation gets the snapshotInternalName from an annotation on a VolumeSnapshotContent
// for snapshot import.
func (h *helper) getSnapshotInternalNameFromAnnotation(
Expand Down

0 comments on commit 281c33f

Please sign in to comment.