diff --git a/frontend/csi/controller_helpers/kubernetes/config.go b/frontend/csi/controller_helpers/kubernetes/config.go index c74af3a2b..737a7526e 100644 --- a/frontend/csi/controller_helpers/kubernetes/config.go +++ b/frontend/csi/controller_helpers/kubernetes/config.go @@ -55,6 +55,8 @@ const ( AnnInternalSnapshotName = annPrefix + "/internalSnapshotName" AnnMirrorRelationship = annPrefix + "/mirrorRelationship" AnnVolumeShareFromPVC = annPrefix + "/shareFromPVC" + AnnVolumeCloneToNS = annPrefix + "/cloneToNamespace" + AnnVolumeCloneFromNS = annPrefix + "/cloneFromNamespace" AnnVolumeShareToNS = annPrefix + "/shareToNamespace" AnnReadOnlyClone = annPrefix + "/readOnlyClone" AnnLUKSEncryption = annPrefix + "/luksEncryption" // import only diff --git a/frontend/csi/controller_helpers/kubernetes/helper.go b/frontend/csi/controller_helpers/kubernetes/helper.go index 0ed9f2e87..deb480f84 100644 --- a/frontend/csi/controller_helpers/kubernetes/helper.go +++ b/frontend/csi/controller_helpers/kubernetes/helper.go @@ -306,13 +306,37 @@ func (h *helper) getSnapshotCloneSourceInfo( if sourceSnapshotName == "" { return "", "", fmt.Errorf("annotation 'cloneFromSnapshot' is empty") } + namespace := clonePVC.Namespace + cloneFromNamespace := getAnnotation(annotations, AnnVolumeCloneFromNS) + if cloneFromNamespace != "" { + // Source snapshot is in a different namespace + namespace = cloneFromNamespace + } // Get the VolumeSnapshot - snapshot, err := h.getVolumeSnapshot(ctx, sourceSnapshotName, clonePVC.Namespace) + snapshot, err := h.getVolumeSnapshot(ctx, sourceSnapshotName, namespace) if err != nil { return "", "", err } + // If cloning from another namespace, ensure the source PVC has the required annotation + if cloneFromNamespace != "" { + snapSourcePVC, err := h.waitForCachedPVCByName(ctx, *snapshot.Spec.Source.PersistentVolumeClaimName, namespace, PreSyncCacheWaitPeriod) + if err != nil { + Logc(ctx).WithFields(LogFields{ + "sourcePVCName": snapSourcePVC.Name, + "namespace": namespace, + }).Errorf("Clone source PVC not found in local cache: %v", err) + return "", "", err + } + sourceAnnotations := processPVCAnnotations(snapSourcePVC, "") + sourceCloneToNamespaces := getAnnotation(sourceAnnotations, AnnVolumeCloneToNS) + // Ensure the source PVC has been explicitly allowed to clone to the subordinate PVC namespace + if !h.matchNamespaceToAnnotation(clonePVC.Namespace, sourceCloneToNamespaces) { + return "", "", fmt.Errorf("cloning to namespace %s is not allowed, it is not listed in cloneToNamespace annotation", clonePVC.Namespace) + } + + } // If the clone from PVC annotation is also set, ensure it matches the snapshot sourcePVCName := getAnnotation(annotations, AnnCloneFromPVC) if sourcePVCName != "" { @@ -371,16 +395,33 @@ func (h *helper) getCloneSourceInfo(ctx context.Context, clonePVC *v1.Persistent if sourcePVCName == "" { return "", fmt.Errorf("annotation 'cloneFromPVC' is empty") } + namespace := clonePVC.Namespace + cloneFromNamespace := getAnnotation(annotations, AnnVolumeCloneFromNS) - // Check that the source PVC is in the same namespace. - // NOTE: For VolumeContentSource this check is performed by CSI - sourcePVC, err := h.waitForCachedPVCByName(ctx, sourcePVCName, clonePVC.Namespace, PreSyncCacheWaitPeriod) + // Determine what namespace to look for the source PVC in + if cloneFromNamespace != "" { + // Source PVC is in a different namespace + namespace = cloneFromNamespace + } + + // Retrieve the source PVC from the cache + sourcePVC, err := h.waitForCachedPVCByName(ctx, sourcePVCName, namespace, PreSyncCacheWaitPeriod) if err != nil { Logc(ctx).WithFields(LogFields{ "sourcePVCName": sourcePVCName, - "namespace": clonePVC.Namespace, + "namespace": namespace, }).Errorf("Clone source PVC not found in local cache: %v", err) - return "", fmt.Errorf("cloning from a PVC requires both PVCs be in the same namespace: %v", err) + return "", fmt.Errorf("source PVC not found in namespace: %v", err) + } + + // If cloning from another namespace, ensure the source PVC has the required annotation + if cloneFromNamespace != "" { + sourceAnnotations := processPVCAnnotations(sourcePVC, "") + sourceCloneToNamespaces := getAnnotation(sourceAnnotations, AnnVolumeCloneToNS) + // Ensure the source PVC has been explicitly allowed to clone to the subordinate PVC namespace + if !h.matchNamespaceToAnnotation(clonePVC.Namespace, sourceCloneToNamespaces) { + return "", fmt.Errorf("cloning to namespace %s is not allowed, it is not listed in cloneToNamespace annotation", clonePVC.Namespace) + } } // Check that both source and clone PVCs have the same storage class @@ -481,10 +522,10 @@ func (h *helper) validateSubordinateVolumeConfig( return nil } -func (h *helper) matchNamespaceToAnnotation(namespace, shareToAnnotation string) bool { - shareToNamespaces := strings.Split(shareToAnnotation, ",") - for _, shareToNamespace := range shareToNamespaces { - if shareToNamespace == namespace || shareToNamespace == "*" { +func (h *helper) matchNamespaceToAnnotation(namespace, annotation string) bool { + availableToNamespaces := strings.Split(annotation, ",") + for _, availableToNamespace := range availableToNamespaces { + if availableToNamespace == namespace || availableToNamespace == "*" { return true } }