From a53da5d3806f508d5884c01481e194ce43a8586b Mon Sep 17 00:00:00 2001 From: Weiwei Date: Wed, 11 Jan 2023 14:52:50 +0800 Subject: [PATCH] add e2e test for juicefs (#2505) * add e2e test for juicefs Signed-off-by: zwwhdls * fix eci label in e2e test Signed-off-by: zwwhdls * fix eci label in e2e test Signed-off-by: zwwhdls * fix juicefs e2e test Signed-off-by: zwwhdls * fix job volume name in e2e test Signed-off-by: zwwhdls * fix handle multi sidecar in appcontroller Signed-off-by: zwwhdls Signed-off-by: zwwhdls --- .../v1alpha1/fluidapp/fluidapp_controller.go | 14 +- .../v1alpha1/fluidapp/implement.go | 23 +- .../v1alpha1/fluidapp/implement_test.go | 44 +- pkg/ctrl/watch/pod.go | 9 +- pkg/ctrl/watch/pod_test.go | 38 +- pkg/ddc/juicefs/shutdown.go | 2 +- test/prow/juicefs_access_data.py | 412 ++++++++++++++++++ 7 files changed, 498 insertions(+), 44 deletions(-) create mode 100644 test/prow/juicefs_access_data.py diff --git a/pkg/controllers/v1alpha1/fluidapp/fluidapp_controller.go b/pkg/controllers/v1alpha1/fluidapp/fluidapp_controller.go index 64c1aead00e..b4d9d6b23e5 100644 --- a/pkg/controllers/v1alpha1/fluidapp/fluidapp_controller.go +++ b/pkg/controllers/v1alpha1/fluidapp/fluidapp_controller.go @@ -18,10 +18,7 @@ package fluidapp import ( "context" - "github.com/fluid-cloudnative/fluid/pkg/common" - "github.com/fluid-cloudnative/fluid/pkg/ctrl/watch" - "github.com/fluid-cloudnative/fluid/pkg/utils" - "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" + "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" @@ -33,6 +30,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/ctrl/watch" + "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" ) const controllerName string = "FluidAppController" @@ -98,8 +100,8 @@ func (f *FluidAppReconciler) Reconcile(ctx context.Context, request reconcile.Re func (f *FluidAppReconciler) internalReconcile(ctx reconcileRequestContext) (ctrl.Result, error) { pod := ctx.pod - // umount fuse sidecar - err := f.umountFuseSidecar(pod) + // umount fuse sidecars + err := f.umountFuseSidecars(pod) if err != nil { ctx.Log.Error(err, "umount fuse sidecar error", "podName", pod.Name, "podNamespace", pod.Namespace) return utils.RequeueIfError(err) diff --git a/pkg/controllers/v1alpha1/fluidapp/implement.go b/pkg/controllers/v1alpha1/fluidapp/implement.go index e9b15d73d68..08b632b9ab6 100644 --- a/pkg/controllers/v1alpha1/fluidapp/implement.go +++ b/pkg/controllers/v1alpha1/fluidapp/implement.go @@ -17,13 +17,15 @@ package fluidapp import ( - "github.com/fluid-cloudnative/fluid/pkg/common" - "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" + "strings" + "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" - "strings" + + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" ) type FluidAppReconcilerImplement struct { @@ -41,13 +43,18 @@ func NewFluidAppReconcilerImplement(client client.Client, log logr.Logger, recor return r } -func (i *FluidAppReconcilerImplement) umountFuseSidecar(pod *corev1.Pod) (err error) { - var fuseContainer corev1.Container +func (i *FluidAppReconcilerImplement) umountFuseSidecars(pod *corev1.Pod) (err error) { for _, cn := range pod.Spec.Containers { - if cn.Name == common.FuseContainerName { - fuseContainer = cn + if strings.Contains(cn.Name, common.FuseContainerName) { + if e := i.umountFuseSidecar(pod, cn); e != nil { + return + } } } + return +} + +func (i *FluidAppReconcilerImplement) umountFuseSidecar(pod *corev1.Pod, fuseContainer corev1.Container) (err error) { if fuseContainer.Name == "" { return } @@ -72,7 +79,7 @@ func (i *FluidAppReconcilerImplement) umountFuseSidecar(pod *corev1.Pod) (err er } i.Log.Info("exec cmd in pod fuse container", "cmd", cmd, "podName", pod.Name, "namespace", pod.Namespace) - stdout, stderr, err := kubeclient.ExecCommandInContainer(pod.Name, common.FuseContainerName, pod.Namespace, cmd) + stdout, stderr, err := kubeclient.ExecCommandInContainer(pod.Name, fuseContainer.Name, pod.Namespace, cmd) if err != nil { i.Log.Info("exec output", "stdout", stdout, "stderr", stderr) if strings.Contains(stderr, "not mounted") { diff --git a/pkg/controllers/v1alpha1/fluidapp/implement_test.go b/pkg/controllers/v1alpha1/fluidapp/implement_test.go index c7784e5c0bd..c0c225a07f9 100644 --- a/pkg/controllers/v1alpha1/fluidapp/implement_test.go +++ b/pkg/controllers/v1alpha1/fluidapp/implement_test.go @@ -20,17 +20,18 @@ import ( "testing" "github.com/brahma-adshonor/gohook" - "github.com/fluid-cloudnative/fluid/pkg/common" - "github.com/fluid-cloudnative/fluid/pkg/utils/fake" - "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" ) -func TestFluidAppReconcilerImplement_umountFuseSidecar(t *testing.T) { +func TestFluidAppReconcilerImplement_umountFuseSidecars(t *testing.T) { mockExec := func(p1, p2, p3 string, p4 []string) (stdout string, stderr string, e error) { return "", "", nil } @@ -78,7 +79,7 @@ func TestFluidAppReconcilerImplement_umountFuseSidecar(t *testing.T) { pod: &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{Name: "test"}, Spec: corev1.PodSpec{ - Containers: []corev1.Container{{Name: common.FuseContainerName}}, + Containers: []corev1.Container{{Name: common.FuseContainerName + "-0"}}, }, }, }, @@ -91,7 +92,7 @@ func TestFluidAppReconcilerImplement_umountFuseSidecar(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "test"}, Spec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: common.FuseContainerName, + Name: common.FuseContainerName + "-0", Lifecycle: &corev1.Lifecycle{ PreStop: &corev1.LifecycleHandler{ Exec: &corev1.ExecAction{Command: []string{"umount"}}, @@ -110,7 +111,7 @@ func TestFluidAppReconcilerImplement_umountFuseSidecar(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "test"}, Spec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: common.FuseContainerName, + Name: common.FuseContainerName + "-0", VolumeMounts: []corev1.VolumeMount{{ Name: "juicefs-fuse-mount", MountPath: "/mnt/jfs", @@ -121,13 +122,40 @@ func TestFluidAppReconcilerImplement_umountFuseSidecar(t *testing.T) { }, wantErr: false, }, + { + name: "test-multi-sidecar", + args: args{ + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test"}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: common.FuseContainerName + "-0", + VolumeMounts: []corev1.VolumeMount{{ + Name: "juicefs-fuse-mount", + MountPath: "/mnt/jfs", + }}, + }, + { + Name: common.FuseContainerName + "-1", + VolumeMounts: []corev1.VolumeMount{{ + Name: "juicefs-fuse-mount", + MountPath: "/mnt/jfs", + }}, + }, + }, + }, + }, + }, + wantErr: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { i := &FluidAppReconcilerImplement{ Log: fake.NullLogger(), } - if err := i.umountFuseSidecar(tt.args.pod); (err != nil) != tt.wantErr { + if err := i.umountFuseSidecars(tt.args.pod); (err != nil) != tt.wantErr { t.Errorf("umountFuseSidecar() error = %v, wantErr %v", err, tt.wantErr) } }) diff --git a/pkg/ctrl/watch/pod.go b/pkg/ctrl/watch/pod.go index e9ac4d7ad97..34b3cb98963 100644 --- a/pkg/ctrl/watch/pod.go +++ b/pkg/ctrl/watch/pod.go @@ -19,6 +19,9 @@ package watch import ( "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/utils" + + "strings" + corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/event" ) @@ -108,7 +111,7 @@ func ShouldInQueue(pod *corev1.Pod) bool { // ignore if no fuse container exist := false for _, cn := range pod.Spec.Containers { - if cn.Name == common.FuseContainerName { + if strings.Contains(cn.Name, common.FuseContainerName) { exist = true break } @@ -126,7 +129,7 @@ func ShouldInQueue(pod *corev1.Pod) bool { // reconcile if all app containers exit 0 and fuse container not exit for _, containerStatus := range pod.Status.ContainerStatuses { - if containerStatus.Name != common.FuseContainerName { + if !strings.Contains(containerStatus.Name, common.FuseContainerName) { log.V(1).Info("container status", "status", containerStatus) if containerStatus.State.Terminated == nil { log.Info("fluid app not exited", "pod", pod.Name, "container", containerStatus.Name, "namespace", pod.Namespace) @@ -134,7 +137,7 @@ func ShouldInQueue(pod *corev1.Pod) bool { return false } } - if containerStatus.Name == common.FuseContainerName { + if strings.Contains(containerStatus.Name, common.FuseContainerName) { if containerStatus.State.Running == nil { log.Info("fluid fuse not running", "pod", pod.Name, "container", containerStatus.Name, "namespace", pod.Namespace) return false diff --git a/pkg/ctrl/watch/pod_test.go b/pkg/ctrl/watch/pod_test.go index c34d18ff318..eacc90b00c9 100644 --- a/pkg/ctrl/watch/pod_test.go +++ b/pkg/ctrl/watch/pod_test.go @@ -17,13 +17,15 @@ package watch import ( - "github.com/fluid-cloudnative/fluid/pkg/common" + "testing" + "time" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/event" - "testing" - "time" + + "github.com/fluid-cloudnative/fluid/pkg/common" ) func Test_podEventHandler_onDeleteFunc(t *testing.T) { @@ -80,7 +82,7 @@ func Test_podEventHandler_onUpdateFunc(t *testing.T) { common.InjectServerless: common.True, }, }, - Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName}}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName + "-0"}}}, Status: corev1.PodStatus{ Phase: corev1.PodRunning, ContainerStatuses: []corev1.ContainerStatus{ @@ -93,7 +95,7 @@ func Test_podEventHandler_onUpdateFunc(t *testing.T) { }, }, { - Name: common.FuseContainerName, + Name: common.FuseContainerName + "-0", State: corev1.ContainerState{ Running: &corev1.ContainerStateRunning{ StartedAt: metav1.Time{Time: time.Now()}, @@ -110,7 +112,7 @@ func Test_podEventHandler_onUpdateFunc(t *testing.T) { common.InjectServerless: common.True, }, }, - Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName}}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName + "-0"}}}, Status: corev1.PodStatus{ Phase: corev1.PodRunning, ContainerStatuses: []corev1.ContainerStatus{ @@ -124,7 +126,7 @@ func Test_podEventHandler_onUpdateFunc(t *testing.T) { }, }, { - Name: common.FuseContainerName, + Name: common.FuseContainerName + "-0", State: corev1.ContainerState{ Running: &corev1.ContainerStateRunning{ StartedAt: metav1.Time{Time: time.Now()}, @@ -225,7 +227,7 @@ func Test_shouldRequeue(t *testing.T) { Name: "test", Labels: map[string]string{common.InjectServerless: common.True}, }, - Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName}}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName + "-0"}}}, Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{ { Name: "app", @@ -236,7 +238,7 @@ func Test_shouldRequeue(t *testing.T) { }, }, { - Name: common.FuseContainerName, + Name: common.FuseContainerName + "-0", State: corev1.ContainerState{ Running: &corev1.ContainerStateRunning{ StartedAt: metav1.Time{Time: time.Now()}, @@ -256,7 +258,7 @@ func Test_shouldRequeue(t *testing.T) { Name: "test", Labels: map[string]string{common.InjectServerless: common.True}, }, - Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName}}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName + "-0"}}}, Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{ { Name: "app", @@ -268,7 +270,7 @@ func Test_shouldRequeue(t *testing.T) { }, }, { - Name: common.FuseContainerName, + Name: common.FuseContainerName + "-0", State: corev1.ContainerState{ Terminated: &corev1.ContainerStateTerminated{ StartedAt: metav1.Time{Time: time.Now()}, @@ -289,7 +291,7 @@ func Test_shouldRequeue(t *testing.T) { Name: "test", Labels: map[string]string{common.InjectServerless: common.True}, }, - Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName}}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName + "-0"}}}, Status: corev1.PodStatus{ Phase: corev1.PodRunning, ContainerStatuses: []corev1.ContainerStatus{ @@ -303,7 +305,7 @@ func Test_shouldRequeue(t *testing.T) { }, }, { - Name: common.FuseContainerName, + Name: common.FuseContainerName + "-0", State: corev1.ContainerState{ Running: &corev1.ContainerStateRunning{ StartedAt: metav1.Time{Time: time.Now()}, @@ -323,7 +325,7 @@ func Test_shouldRequeue(t *testing.T) { Name: "test", Labels: map[string]string{common.InjectServerless: common.True}, }, - Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: "app2"}, {Name: common.FuseContainerName}}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: "app2"}, {Name: common.FuseContainerName + "-0"}}}, Status: corev1.PodStatus{ Phase: corev1.PodRunning, ContainerStatuses: []corev1.ContainerStatus{ @@ -342,7 +344,7 @@ func Test_shouldRequeue(t *testing.T) { }}, }, { - Name: common.FuseContainerName, + Name: common.FuseContainerName + "-0", State: corev1.ContainerState{ Running: &corev1.ContainerStateRunning{ StartedAt: metav1.Time{Time: time.Now()}, @@ -361,7 +363,7 @@ func Test_shouldRequeue(t *testing.T) { Name: "test", Labels: map[string]string{common.InjectServerless: common.True}, }, - Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: "app2"}, {Name: common.FuseContainerName}}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: "app2"}, {Name: common.FuseContainerName + "-0"}}}, Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{ { Name: "app", @@ -381,7 +383,7 @@ func Test_shouldRequeue(t *testing.T) { }, }, { - Name: common.FuseContainerName, + Name: common.FuseContainerName + "-0", State: corev1.ContainerState{ Running: &corev1.ContainerStateRunning{ StartedAt: metav1.Time{Time: time.Now()}, @@ -400,7 +402,7 @@ func Test_shouldRequeue(t *testing.T) { Name: "test", Labels: map[string]string{common.InjectServerless: common.True}, }, - Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName}}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName + "-0"}}}, Status: corev1.PodStatus{ Phase: corev1.PodPending, ContainerStatuses: []corev1.ContainerStatus{}}, diff --git a/pkg/ddc/juicefs/shutdown.go b/pkg/ddc/juicefs/shutdown.go index 620d8b06839..0428d867602 100644 --- a/pkg/ddc/juicefs/shutdown.go +++ b/pkg/ddc/juicefs/shutdown.go @@ -82,10 +82,10 @@ func (j *JuiceFSEngine) destroyMaster() (err error) { // cleanupCache cleans up the cache func (j *JuiceFSEngine) cleanupCache() (err error) { runtime, err := j.getRuntime() - j.Log.Info("get runtime info", "runtime", runtime) if err != nil { return err } + j.Log.Info("get runtime info", "runtime", runtime) cacheDir := common.JuiceFSDefaultCacheDir if len(runtime.Spec.TieredStore.Levels) != 0 { diff --git a/test/prow/juicefs_access_data.py b/test/prow/juicefs_access_data.py new file mode 100644 index 00000000000..738e69e8a27 --- /dev/null +++ b/test/prow/juicefs_access_data.py @@ -0,0 +1,412 @@ +# Copyright 2022 The Fluid Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +TestCase: Pod accesses Juicefs data +DDC Engine: Juicefs(Community) with local redis and minio + +Prerequisite: +1. docker run -d -p 9000:9000 \ + --name minio \ + -e "MINIO_ROOT_USER=minioadmin" \ + -e "MINIO_ROOT_PASSWORD=minioadmin" \ + minio/minio server /data +2. docker run -itd --name redis -p 6379:6379 redis +3. Write down the node IP +4. Apply the following secret +``` +apiVersion: v1 +kind: Secret +metadata: + name: jfs-secret +stringData: + metaurl: redis://:6379/0 + access-key: minioadmin + secret-key: minioadmin +``` + +Steps: +1. create Dataset & Runtime +2. check if dataset is bound +3. check if PVC & PV is created +4. submit data write job +5. wait until data write job completes +6. submit data read job +7. check if data content consistent +8. clean up +""" + +import time + +from kubernetes import client, config + +NODE_IP = "minio" +APP_NAMESPACE = "default" +SECRET_NAME = "jfs-secret" + + +def create_redis_secret(): + api = client.CoreV1Api() + jfs_secret = { + "apiVersion": "v1", + "kind": "Secret", + "metadata": {"name": SECRET_NAME}, + "stringData": {"metaurl": "redis://redis:6379/0", "access-key": "minioadmin", "secret-key": "minioadmin"} + } + + api.create_namespaced_secret(namespace=APP_NAMESPACE, body=jfs_secret) + print("Created secret {}".format(SECRET_NAME)) + + +def create_dataset_and_runtime(dataset_name): + api = client.CustomObjectsApi() + my_dataset = { + "apiVersion": "data.fluid.io/v1alpha1", + "kind": "Dataset", + "metadata": {"name": dataset_name, "namespace": APP_NAMESPACE}, + "spec": { + "mounts": [{ + "mountPoint": "juicefs:///", + "name": "juicefs-community", + "options": {"bucket": "http://%s:9000/minio/test" % NODE_IP, "storage": "minio"}, + "encryptOptions": [ + {"name": "metaurl", "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": "metaurl"}}}, + {"name": "access-key", "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": "access-key"}}}, + {"name": "secret-key", "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": "secret-key"}}} + ] + }], + "accessModes": ["ReadWriteMany"] + } + } + + my_juicefsruntime = { + "apiVersion": "data.fluid.io/v1alpha1", + "kind": "JuiceFSRuntime", + "metadata": {"name": dataset_name, "namespace": APP_NAMESPACE}, + "spec": { + "replicas": 1, + "tieredstore": {"levels": [{"mediumtype": "MEM", "path": "/dev/shm", "quota": "40960", "low": "0.1"}]} + } + } + + api.create_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + namespace="default", + plural="datasets", + body=my_dataset, + ) + print("Create dataset {}".format(dataset_name)) + + api.create_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + namespace="default", + plural="juicefsruntimes", + body=my_juicefsruntime + ) + print("Create juicefs runtime {}".format(dataset_name)) + + +def check_dataset_bound(dataset_name): + api = client.CustomObjectsApi() + + count = 0 + while count < 300: + resource = api.get_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + name=dataset_name, + namespace=APP_NAMESPACE, + plural="datasets" + ) + + if "status" in resource: + if "phase" in resource["status"]: + if resource["status"]["phase"] == "Bound": + print("Dataset {} is bound.".format(dataset_name)) + return True + time.sleep(1) + count += 1 + print("Dataset {} is not bound within 300s.".format(dataset_name)) + return False + + +def check_volume_resources_ready(dataset_name): + pv_name = "{}-{}".format(APP_NAMESPACE, dataset_name) + pvc_name = dataset_name + count = 0 + while count < 300: + count += 1 + try: + client.CoreV1Api().read_persistent_volume(name=pv_name) + except client.exceptions.ApiException as e: + if e.status == 404: + time.sleep(1) + continue + + try: + client.CoreV1Api().read_namespaced_persistent_volume_claim(name=pvc_name, namespace=APP_NAMESPACE) + except client.exceptions.ApiException as e: + if e.status == 404: + time.sleep(1) + continue + + print("PersistentVolume {} & PersistentVolumeClaim {} Ready.".format(pv_name, pvc_name)) + return True + print("PersistentVolume {} & PersistentVolumeClaim {} not ready within 300s.".format(pv_name, pvc_name)) + return False + + +def create_data_write_job(dataset_name, job_name, use_sidecar=False): + pvc_name = dataset_name + api = client.BatchV1Api() + + container = client.V1Container( + name="demo", + image="debian:buster", + command=["/bin/bash"], + args=["-c", "dd if=/dev/zero of=/data/allzero.file bs=100M count=10 && sha256sum /data/allzero.file"], + volume_mounts=[client.V1VolumeMount(mount_path="/data", name="demo")] + ) + + template = client.V1PodTemplateSpec( + metadata=client.V1ObjectMeta(labels={"app": "datawrite"}), + spec=client.V1PodSpec( + restart_policy="Never", + containers=[container], + volumes=[client.V1Volume( + name="demo", + persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(claim_name=pvc_name) + )] + ) + ) + if use_sidecar: + template.metadata.labels["serverless.fluid.io/inject"] = "true" + + spec = client.V1JobSpec(template=template, backoff_limit=4) + + job = client.V1Job( + api_version="batch/v1", + kind="Job", + metadata=client.V1ObjectMeta(name=job_name, namespace=APP_NAMESPACE), + spec=spec + ) + + api.create_namespaced_job(namespace=APP_NAMESPACE, body=job) + print("Job {} created.".format(job_name)) + + +def create_data_read_job(dataset_name, job_name, use_sidecar=False): + pvc_name = dataset_name + api = client.BatchV1Api() + + container = client.V1Container( + name="demo", + image="debian:buster", + command=["/bin/bash"], + args=["-c", "time sha256sum /data/allzero.file && rm /data/allzero.file"], + volume_mounts=[client.V1VolumeMount(mount_path="/data", name="demo")] + ) + + template = client.V1PodTemplateSpec( + metadata=client.V1ObjectMeta(labels={"app": "dataread"}), + spec=client.V1PodSpec( + restart_policy="Never", + containers=[container], + volumes=[client.V1Volume( + name="demo", + persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(claim_name=pvc_name) + )] + ) + ) + if use_sidecar: + template.metadata.labels["serverless.fluid.io/inject"] = "true" + + spec = client.V1JobSpec(template=template, backoff_limit=4) + + job = client.V1Job( + api_version="batch/v1", + kind="Job", + metadata=client.V1ObjectMeta(name=job_name, namespace=APP_NAMESPACE), + spec=spec + ) + + api.create_namespaced_job(namespace=APP_NAMESPACE, body=job) + print("Data Read Job {} created.".format(job_name)) + + +def check_data_job_status(job_name): + api = client.BatchV1Api() + + count = 0 + while count < 300: + count += 1 + response = api.read_namespaced_job_status(name=job_name, namespace=APP_NAMESPACE) + if response.status.succeeded is not None or response.status.failed is not None: + print("Job {} completed.".format(job_name)) + return True + time.sleep(1) + print("Job {} not completed within 300s.".format(job_name)) + return False + + +def clean_job(job_name): + batch_api = client.BatchV1Api() + + # See https://github.com/kubernetes-client/python/issues/234 + body = client.V1DeleteOptions(propagation_policy='Background') + try: + batch_api.delete_namespaced_job(name=job_name, namespace=APP_NAMESPACE, body=body) + except client.exceptions.ApiException as e: + if e.status == 404: + print("job {} deleted".format(job_name)) + return True + + count = 0 + while count < 300: + count += 1 + print("job {} still exists...".format(job_name)) + try: + batch_api.read_namespaced_job(name=job_name, namespace=APP_NAMESPACE) + except client.exceptions.ApiException as e: + if e.status == 404: + print("job {} deleted".format(job_name)) + return True + time.sleep(1) + + print("job {} not deleted within 300s".format(job_name)) + return False + + +def clean_up_dataset_and_runtime(dataset_name): + custom_api = client.CustomObjectsApi() + custom_api.delete_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + name=dataset_name, + namespace=APP_NAMESPACE, + plural="datasets" + ) + print("Dataset {} deleted".format(dataset_name)) + + count = 0 + while count < 300: + count += 1 + print("JuiceFSRuntime {} still exists...".format(dataset_name)) + try: + custom_api.get_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + name=dataset_name, + namespace=APP_NAMESPACE, + plural="juicefsruntimes" + ) + except client.exceptions.ApiException as e: + if e.status == 404: + print("JuiceFSRuntime {} is cleaned up".format(dataset_name)) + return True + time.sleep(1) + print("JuiceFSRuntime {} is not cleaned up within 300s".format(dataset_name)) + return False + + +def clean_up_secret(): + core_api = client.CoreV1Api() + core_api.delete_namespaced_secret(name=SECRET_NAME, namespace=APP_NAMESPACE) + print("secret {} is cleaned up".format(SECRET_NAME)) + + +def main(): + config.load_incluster_config() + + # ******************************** + # ------- test normal mode ------- + # ******************************** + dataset_name = "jfsdemo" + test_write_job = "demo-write" + test_read_job = "demo-read" + try: + # 1. create secret + create_redis_secret() + + # 2. create dataset and runtime + create_dataset_and_runtime(dataset_name) + if not check_dataset_bound(dataset_name): + raise Exception("dataset {} in normal mode is not bound.".format(dataset_name)) + if not check_volume_resources_ready(dataset_name): + raise Exception("volume resources of dataset {} in normal mode are not ready.".format(dataset_name)) + + # 3. create write & read data job + create_data_write_job(dataset_name, test_write_job) + if not check_data_job_status(test_write_job): + raise Exception("write job {} in normal mode failed.".format(test_write_job)) + create_data_read_job(dataset_name, test_read_job) + if not check_data_job_status(test_read_job): + raise Exception("read job {} in normal mode failed.".format(test_read_job)) + except Exception as e: + print(e) + exit(-1) + finally: + # 4. clean up write & read data job + clean_job(test_write_job) + clean_job(test_read_job) + + # 5. clean up dataset and runtime + clean_up_dataset_and_runtime(dataset_name) + + # 6. clean up secret + clean_up_secret() + + # ******************************** + # ------- test sidecar mode ------- + # ******************************** + dataset_name = "jfsdemo-sidecar" + test_write_job = "demo-write-sidecar" + test_read_job = "demo-read-sidecar" + try: + # 1. create secret + create_redis_secret() + + # 2. create dataset and runtime + create_dataset_and_runtime(dataset_name) + if not check_dataset_bound(dataset_name): + raise Exception("dataset {} in sidecar mode is not bound.".format(dataset_name)) + if not check_volume_resources_ready(dataset_name): + raise Exception("volume resources of dataset {} in sidecar mode are not ready.".format(dataset_name)) + + # 3. create write & read data job + create_data_write_job(dataset_name, test_write_job, use_sidecar=True) + if not check_data_job_status(test_write_job): + raise Exception("write job {} in sidecar mode failed.".format(test_write_job)) + create_data_read_job(dataset_name, test_read_job, use_sidecar=True) + if not check_data_job_status(test_read_job): + raise Exception("read job {} in sidecar mode failed.".format(test_read_job)) + except Exception as e: + print(e) + exit(-1) + finally: + # 4. clean up write & read data job + clean_job(test_write_job) + clean_job(test_read_job) + + # 5. clean up dataset and runtime + clean_up_dataset_and_runtime(dataset_name) + + # 6. clean up secret + clean_up_secret() + + +if __name__ == '__main__': + main()