Skip to content

Commit

Permalink
Add PVC source support for DataImportCron (#3617)
Browse files Browse the repository at this point in the history
* Add PVC source support for DataImportCron

A PVC from any namespace can now be the source for a DataImportCron. The
source digest is based on the PVC UID, which is polled by the schedule
similarly to image stream, so when a new PVC is detected it will be
imported.

Signed-off-by: Arnon Gilboa <[email protected]>

* Cleanups

Signed-off-by: Arnon Gilboa <[email protected]>

---------

Signed-off-by: Arnon Gilboa <[email protected]>
  • Loading branch information
arnongilboa authored Jan 27, 2025
1 parent 9c197bd commit 41b96ed
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 31 deletions.
20 changes: 20 additions & 0 deletions doc/os-image-poll-and-update.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,26 @@ Or on CRC:

More information on image streams is available [here](https://docs.openshift.com/container-platform/4.13/openshift_images/image-streams-manage.html) and [here](https://www.tutorialworks.com/openshift-imagestreams).

## PVC Source

A `PVC` from any namespace can also be the source for a `DataImportCron`. The source digest is based on the `PVC` `UID`, which is polled according to the schedule, so when a new `PVC` is detected it will be imported.

```yaml
apiVersion: cdi.kubevirt.io/v1beta1
kind: DataImportCron
metadata:
name: pvc-import-cron
namespace: ns1
spec:
template:
spec:
source:
pvc:
name: my-pvc
namespace: ns2
...
```

## DataImportCron source formats

* PersistentVolumeClaim
Expand Down
6 changes: 3 additions & 3 deletions pkg/apiserver/webhooks/dataimportcron-validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ func (wh *dataImportCronValidatingWebhook) Admit(ar admissionv1.AdmissionReview)

func (wh *dataImportCronValidatingWebhook) validateDataImportCronSpec(request *admissionv1.AdmissionRequest, field *k8sfield.Path, spec *cdiv1.DataImportCronSpec, namespace *string) []metav1.StatusCause {
var causes []metav1.StatusCause

if spec.Template.Spec.Source == nil || spec.Template.Spec.Source.Registry == nil {
source := spec.Template.Spec.Source
if source == nil || (source.Registry == nil && source.PVC == nil) {
causes = append(causes, metav1.StatusCause{
Type: metav1.CauseTypeFieldValueInvalid,
Message: "Missing registry source",
Message: "Missing source",
Field: field.Child("Template").String(),
})
return causes
Expand Down
116 changes: 88 additions & 28 deletions pkg/controller/dataimportcron-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ const (
AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"

dataImportControllerName = "dataimportcron-controller"
digestPrefix = "sha256:"
digestSha256Prefix = "sha256:"
digestUIDPrefix = "uid:"
digestDvNameSuffixLength = 12
cronJobUIDSuffixLength = 8
defaultImportsToKeepPerCron = 3
Expand Down Expand Up @@ -161,7 +162,7 @@ func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron
if dataImportCron.Spec.Schedule == "" {
return nil
}
if isImageStreamSource(dataImportCron) {
if isControllerPolledSource(dataImportCron) {
if dataImportCron.Annotations[AnnNextCronTime] == "" {
cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
}
Expand Down Expand Up @@ -248,16 +249,25 @@ func splitImageStreamName(imageStreamName string) (string, string, error) {
return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
}

func (r *DataImportCronReconciler) pollImageStreamDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
if nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]; nextTimeStr != "" {
nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
if err != nil {
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
if nextTimeStr == "" {
return r.setNextCronTime(dataImportCron)
}
nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
if err != nil {
return reconcile.Result{}, err
}
if nextTime.After(time.Now()) {
return r.setNextCronTime(dataImportCron)
}
if isImageStreamSource(dataImportCron) {
if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
return reconcile.Result{}, err
}
if nextTime.Before(time.Now()) {
if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
return reconcile.Result{}, err
}
} else if isPvcSource(dataImportCron) {
if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
return reconcile.Result{}, err
}
}
return r.setNextCronTime(dataImportCron)
Expand Down Expand Up @@ -287,11 +297,31 @@ func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
}

func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
if !isCronRegistrySource(cron) {
return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
}
return cron.Spec.Template.Spec.Source.Registry, nil
}

func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
source := cron.Spec.Template.Spec.Source
if source == nil || source.Registry == nil {
return nil, errors.Errorf("Cron with no registry source %s", cron.Name)
return source != nil && source.Registry != nil
}

func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
if !isPvcSource(cron) {
return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
}
return source.Registry, nil
return cron.Spec.Template.Spec.Source.PVC, nil
}

func isPvcSource(cron *cdiv1.DataImportCron) bool {
source := cron.Spec.Template.Spec.Source
return source != nil && source.PVC != nil
}

func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
return isImageStreamSource(cron) || isPvcSource(cron)
}

func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
Expand Down Expand Up @@ -424,9 +454,9 @@ func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *c
}

// Skip if schedule is disabled
if isImageStreamSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
// We use the poll returned reconcile.Result for RequeueAfter if needed
pollRes, err := r.pollImageStreamDigest(ctx, dataImportCron)
pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
if err != nil {
return pollRes, err
}
Expand Down Expand Up @@ -586,6 +616,29 @@ func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Co
return nil
}

func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
pvcSource, err := getCronPvcSource(dataImportCron)
if err != nil {
return err
}
ns := pvcSource.Namespace
if ns == "" {
ns = dataImportCron.Namespace
}
pvc := &corev1.PersistentVolumeClaim{}
if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
return err
}
digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
log.Info("Updating DataImportCron", "digest", digest)
cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
}
return nil
}

func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
log := r.log.WithName("updateDataSource")
dataSource, err := r.getDataSource(ctx, dataImportCron)
Expand Down Expand Up @@ -1434,16 +1487,18 @@ func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj
}

func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string) *cdiv1.DataVolume {
var digestedURL string
dv := cron.Spec.Template.DeepCopy()
if isURLSource(cron) {
digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
} else if isImageStreamSource(cron) {
// No way to import image stream by name when we want specific digest, so we use its docker reference
digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
dv.Spec.Source.Registry.ImageStream = nil
}
dv.Spec.Source.Registry.URL = &digestedURL
if isCronRegistrySource(cron) {
var digestedURL string
if isURLSource(cron) {
digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
} else if isImageStreamSource(cron) {
// No way to import image stream by name when we want specific digest, so we use its docker reference
digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
dv.Spec.Source.Registry.ImageStream = nil
}
dv.Spec.Source.Registry.URL = &digestedURL
}
dv.Name = dataVolumeName
dv.Namespace = cron.Namespace
r.setDataImportCronResourceLabels(cron, dv)
Expand Down Expand Up @@ -1508,13 +1563,18 @@ func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cd
return dataSource
}

// Create DataVolume name based on the DataSource name + prefix of the digest sha256
// Create DataVolume name based on the DataSource name + prefix of the digest
func createDvName(prefix, digest string) (string, error) {
fromIdx := len(digestPrefix)
toIdx := fromIdx + digestDvNameSuffixLength
if !strings.HasPrefix(digest, digestPrefix) {
digestPrefix := ""
if strings.HasPrefix(digest, digestSha256Prefix) {
digestPrefix = digestSha256Prefix
} else if strings.HasPrefix(digest, digestUIDPrefix) {
digestPrefix = digestUIDPrefix
} else {
return "", errors.Errorf("Digest has no supported prefix")
}
fromIdx := len(digestPrefix)
toIdx := fromIdx + digestDvNameSuffixLength
if len(digest) < toIdx {
return "", errors.Errorf("Digest is too short")
}
Expand Down
36 changes: 36 additions & 0 deletions pkg/controller/dataimportcron-controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,33 @@ var _ = Describe("All DataImportCron Tests", func() {
Entry("has no tag", imageStreamName, 1),
)

It("Should fail with non-existing source PVC", func() {
cron := newDataImportCron(cronName)
cron.Spec.Template.Spec.Source = &cdiv1.DataVolumeSource{
PVC: &cdiv1.DataVolumeSourcePVC{
Name: "no-such-pvc",
},
}
reconciler = createDataImportCronReconciler(cron)
_, err := reconciler.Reconcile(context.TODO(), cronReq)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("not found"))
})

It("Should succeed with existing source PVC", func() {
pvc := newPVC("test-pvc")
cron := newDataImportCron(cronName)
cron.Spec.Template.Spec.Source = &cdiv1.DataVolumeSource{
PVC: &cdiv1.DataVolumeSourcePVC{
Name: pvc.Name,
Namespace: pvc.Namespace,
},
}
reconciler = createDataImportCronReconciler(cron, pvc)
_, err := reconciler.Reconcile(context.TODO(), cronReq)
Expect(err).ToNot(HaveOccurred())
})

It("Should succeed garbage collecting old version DVs", func() {
cron = newDataImportCron(cronName)
importsToKeep := int32(1)
Expand Down Expand Up @@ -1464,6 +1491,15 @@ func newDataImportCronWithImageStream(dataImportCronName, taggedImageStreamName
return cron
}

func newPVC(name string) *corev1.PersistentVolumeClaim {
return &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: metav1.NamespaceDefault,
UID: types.UID(metav1.NamespaceDefault + "-" + name),
},
}
}
func newImageStream(name string) *imagev1.ImageStream {
return &imagev1.ImageStream{
TypeMeta: metav1.TypeMeta{APIVersion: imagev1.SchemeGroupVersion.String()},
Expand Down

0 comments on commit 41b96ed

Please sign in to comment.