diff --git a/charts/fluid/fluid/templates/controller/goosefsruntime_controller.yaml b/charts/fluid/fluid/templates/controller/goosefsruntime_controller.yaml index e03ad2f4c15..00287a8d21e 100644 --- a/charts/fluid/fluid/templates/controller/goosefsruntime_controller.yaml +++ b/charts/fluid/fluid/templates/controller/goosefsruntime_controller.yaml @@ -52,6 +52,7 @@ spec: - --runtime-workers={{ .Values.runtime.goosefs.runtimeWorkers }} - --pprof-addr=:6060 - --enable-leader-election + - --port-allocate-policy={{ .Values.runtime.goosefs.portAllocatePolicy }} env: {{- if .Values.workdir }} - name: FLUID_WORKDIR diff --git a/charts/fluid/fluid/templates/controller/jindoruntime_controller.yaml b/charts/fluid/fluid/templates/controller/jindoruntime_controller.yaml index 702efb8c58d..aed70ca866a 100644 --- a/charts/fluid/fluid/templates/controller/jindoruntime_controller.yaml +++ b/charts/fluid/fluid/templates/controller/jindoruntime_controller.yaml @@ -50,6 +50,7 @@ spec: - --runtime-workers={{ .Values.runtime.jindo.runtimeWorkers }} - --pprof-addr=:6060 - --enable-leader-election + - --port-allocate-policy={{ .Values.runtime.jindo.portAllocatePolicy }} env: {{- if .Values.workdir }} - name: FLUID_WORKDIR diff --git a/charts/fluid/fluid/values.yaml b/charts/fluid/fluid/values.yaml index 3eabf3d6603..eb36f7242b1 100644 --- a/charts/fluid/fluid/values.yaml +++ b/charts/fluid/fluid/values.yaml @@ -34,7 +34,7 @@ runtime: replicas: 1 runtimeWorkers: 3 portRange: 20000-26000 - portAllocatePolicy: bitmap + portAllocatePolicy: random enabled: false init: image: fluidcloudnative/init-users:v0.8.3-336791f @@ -50,6 +50,7 @@ runtime: replicas: 1 runtimeWorkers: 3 portRange: 18000-19999 + portAllocatePolicy: random enabled: false engine: jindofsx queryUfsTotal: true @@ -67,6 +68,7 @@ runtime: replicas: 1 runtimeWorkers: 3 portRange: 26000-32000 + portAllocatePolicy: random enabled: false init: image: fluidcloudnative/init-users:v0.8.3-336791f diff --git a/charts/juicefs/CHANGELOG.md b/charts/juicefs/CHANGELOG.md index c4590535120..a132ca5c441 100644 --- a/charts/juicefs/CHANGELOG.md +++ b/charts/juicefs/CHANGELOG.md @@ -38,3 +38,12 @@ Support configurable tieredstore's volume type 0.2.9 - Add updateStrategy for fuse + +0.2.10 +- Set root user in worker & fuse pod + +0.2.11 +- Support credential key in secret + +0.2.12 +- Set cache dir in volumes & volumeMounts for worker & fuse diff --git a/charts/juicefs/Chart.yaml b/charts/juicefs/Chart.yaml index dbd51fb2b40..11a20831553 100644 --- a/charts/juicefs/Chart.yaml +++ b/charts/juicefs/Chart.yaml @@ -1,7 +1,7 @@ name: juicefs apiVersion: v1 description: FileSystem aimed for data analytics and machine learning in any cloud. 
-version: 0.2.9 +version: 0.2.12 appVersion: v1.0.0 home: https://juicefs.com/ maintainers: diff --git a/charts/juicefs/templates/fuse/daemonset.yaml b/charts/juicefs/templates/fuse/daemonset.yaml index 41d6fdfd2c5..5377c06d5aa 100644 --- a/charts/juicefs/templates/fuse/daemonset.yaml +++ b/charts/juicefs/templates/fuse/daemonset.yaml @@ -83,28 +83,28 @@ spec: valueFrom: secretKeyRef: name: {{ .Values.configs.metaurlSecret }} - key: metaurl + key: {{ .Values.configs.metaurlSecretKey }} {{- end }} {{- if .Values.configs.accesskeySecret }} - name: ACCESS_KEY valueFrom: secretKeyRef: name: {{ .Values.configs.accesskeySecret }} - key: access-key + key: {{ .Values.configs.accesskeySecretKey }} {{- end }} {{- if .Values.configs.secretkeySecret }} - name: SECRET_KEY valueFrom: secretKeyRef: name: {{ .Values.configs.secretkeySecret }} - key: secret-key + key: {{ .Values.configs.secretkeySecretKey }} {{- end }} {{- if .Values.configs.tokenSecret }} - name: TOKEN valueFrom: secretKeyRef: name: {{ .Values.configs.tokenSecret }} - key: token + key: {{ .Values.configs.tokenSecretKey }} {{- end }} - name: FLUID_RUNTIME_TYPE value: "juicefs" @@ -127,8 +127,9 @@ spec: - containerPort: 9567 name: metrics protocol: TCP - {{- if .Values.fuse.privileged }} securityContext: + runAsUser: 0 + {{- if .Values.fuse.privileged }} privileged: true {{- end }} lifecycle: @@ -136,15 +137,11 @@ spec: exec: command: ["sh", "-c", "umount {{ .Values.fuse.mountPath }}"] volumeMounts: - - name: juicefs-fuse-mount - mountPath: {{ .Values.fuse.hostMountPath }} - mountPropagation: Bidirectional - - mountPath: /root/script - name: script - {{- range $name, $mount := .Values.cacheDirs }} - - name: cache-dir-{{ $name }} - mountPath: "{{ $mount.path }}" - {{- end }} + - name: juicefs-fuse-mount + mountPath: {{ .Values.fuse.hostMountPath }} + mountPropagation: Bidirectional + - mountPath: /root/script + name: script {{- if .Values.fuse.volumeMounts }} {{ toYaml .Values.fuse.volumeMounts | indent 12 }} {{- end }} @@ -154,18 +151,6 @@ spec: hostPath: path: {{ .Values.fuse.hostMountPath }} type: DirectoryOrCreate - {{- range $name, $mount := .Values.cacheDirs }} - {{- if eq $mount.type "hostPath" }} - - hostPath: - path: "{{ $mount.path }}" - type: DirectoryOrCreate - name: cache-dir-{{ $name }} - {{- else if eq $mount.type "emptyDir" }} - - emptyDir: {} - name: cache-dir-{{ $name }} - {{- /* todo: support volume template */}} - {{- end }} - {{- end }} - name: script configMap: name: {{ template "juicefs.fullname" . 
}}-fuse-script diff --git a/charts/juicefs/templates/worker/statefuleset.yaml b/charts/juicefs/templates/worker/statefuleset.yaml index 0e0e2374eec..1d9f5d5fb2c 100644 --- a/charts/juicefs/templates/worker/statefuleset.yaml +++ b/charts/juicefs/templates/worker/statefuleset.yaml @@ -76,8 +76,9 @@ spec: {{ toYaml .Values.worker.resources | trim | indent 12 }} {{- end }} command: ["sh", "/root/script/script.sh"] - {{- if .Values.worker.privileged }} securityContext: + runAsUser: 0 + {{- if .Values.worker.privileged }} privileged: true {{- end }} {{- if .Values.worker.ports }} @@ -95,28 +96,28 @@ spec: valueFrom: secretKeyRef: name: {{ .Values.configs.metaurlSecret }} - key: metaurl + key: {{ .Values.configs.metaurlSecretKey }} {{- end }} {{- if .Values.configs.accesskeySecret }} - name: ACCESS_KEY valueFrom: secretKeyRef: name: {{ .Values.configs.accesskeySecret }} - key: access-key + key: {{ .Values.configs.accesskeySecretKey }} {{- end }} {{- if .Values.configs.secretkeySecret }} - name: SECRET_KEY valueFrom: secretKeyRef: name: {{ .Values.configs.secretkeySecret }} - key: secret-key + key: {{ .Values.configs.secretkeySecretKey }} {{- end }} {{- if .Values.configs.tokenSecret }} - name: TOKEN valueFrom: secretKeyRef: name: {{ .Values.configs.tokenSecret }} - key: token + key: {{ .Values.configs.tokenSecretKey }} {{- end }} lifecycle: preStop: @@ -125,27 +126,11 @@ spec: volumeMounts: - mountPath: /root/script name: script - {{- range $name, $mount := .Values.cacheDirs }} - - name: cache-dir-{{ $name }} - mountPath: "{{ $mount.path }}" - {{- end }} {{- if .Values.worker.volumeMounts }} {{ toYaml .Values.worker.volumeMounts | indent 12 }} {{- end }} restartPolicy: Always volumes: - {{- range $name, $mount := .Values.cacheDirs }} - {{- if eq $mount.type "hostPath" }} - - hostPath: - path: "{{ $mount.path }}" - type: DirectoryOrCreate - name: cache-dir-{{ $name }} - {{- else if eq $mount.type "emptyDir" }} - - emptyDir: {} - name: cache-dir-{{ $name }} - {{- /* todo: support volume template */}} - {{- end }} - {{- end }} - name: script configMap: name: {{ template "juicefs.fullname" . 
}}-worker-script diff --git a/charts/juicefs/values.yaml b/charts/juicefs/values.yaml index 0315f616f48..d5328d91706 100644 --- a/charts/juicefs/values.yaml +++ b/charts/juicefs/values.yaml @@ -60,11 +60,15 @@ worker: configs: name: "" accesskeySecret: "" + accesskeySecretKey: "" secretkeySecret: "" + secretkeySecretKey: "" bucket: "" metaurlSecret: "" + metaurlSecretKey: "" storage: "" tokenSecret: "" + tokenSecretKey: "" formatCmd : "" ## FUSE ## diff --git a/cmd/alluxio/app/alluxio.go b/cmd/alluxio/app/alluxio.go index 595719f787b..475b6123401 100644 --- a/cmd/alluxio/app/alluxio.go +++ b/cmd/alluxio/app/alluxio.go @@ -74,7 +74,7 @@ func init() { alluxioCmd.Flags().StringVar(&portRange, "runtime-node-port-range", "20000-25000", "Set available port range for Alluxio") alluxioCmd.Flags().IntVar(&maxConcurrentReconciles, "runtime-workers", 3, "Set max concurrent workers for AlluxioRuntime controller") alluxioCmd.Flags().StringVarP(&pprofAddr, "pprof-addr", "", "", "The address for pprof to use while exporting profiling results") - alluxioCmd.Flags().StringVar(&portAllocatePolicy, "port-allocate-policy", "bitmap", "Set port allocating policy, available choice is bitmap or random(default bitmap).") + alluxioCmd.Flags().StringVar(&portAllocatePolicy, "port-allocate-policy", "random", "Set port allocating policy, available choice is bitmap or random(default random).") } func handle() { @@ -128,7 +128,12 @@ func handle() { } setupLog.Info("port range parsed", "port range", pr.String()) - portallocator.SetupRuntimePortAllocatorWithType(mgr.GetClient(), pr, portallocator.AllocatePolicy(portAllocatePolicy), alluxio.GetReservedPorts) + err = portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, portAllocatePolicy, alluxio.GetReservedPorts) + if err != nil { + setupLog.Error(err, "failed to setup runtime port allocator") + os.Exit(1) + } + setupLog.Info("Set up runtime port allocator", "policy", portAllocatePolicy) setupLog.Info("starting alluxioruntime-controller") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { diff --git a/cmd/goosefs/app/goosefs.go b/cmd/goosefs/app/goosefs.go index 2f1e2ede793..e943dbffde6 100644 --- a/cmd/goosefs/app/goosefs.go +++ b/cmd/goosefs/app/goosefs.go @@ -49,6 +49,7 @@ var ( portRange string maxConcurrentReconciles int pprofAddr string + portAllocatePolicy string ) var cmd = &cobra.Command{ @@ -73,6 +74,7 @@ func init() { startCmd.Flags().StringVarP(&leaderElectionNamespace, "leader-election-namespace", "", "fluid-system", "The namespace in which the leader election resource will be created.") startCmd.Flags().BoolVarP(&development, "development", "", true, "Enable development mode for fluid controller.") startCmd.Flags().StringVar(&portRange, "runtime-node-port-range", "20000-25000", "Set available port range for GooseFS") + startCmd.Flags().StringVar(&portAllocatePolicy, "port-allocate-policy", "random", "Set port allocating policy, available choice is bitmap or random(default random).") startCmd.Flags().StringVarP(&pprofAddr, "pprof-addr", "", "", "The address for pprof to use while exporting profiling results") startCmd.Flags().IntVar(&maxConcurrentReconciles, "runtime-workers", 3, "Set max concurrent workers for GooseFSRuntime controller") cmd.AddCommand(startCmd) @@ -129,7 +131,12 @@ func handle() { } setupLog.Info("port range parsed", "port range", pr.String()) - portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, goosefs.GetReservedPorts) + err = portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, portAllocatePolicy, 
goosefs.GetReservedPorts) + if err != nil { + setupLog.Error(err, "failed to setup runtime port allocator") + os.Exit(1) + } + setupLog.Info("Set up runtime port allocator", "policy", portAllocatePolicy) setupLog.Info("starting goosefsruntime-controller") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { diff --git a/cmd/jindo/app/jindo.go b/cmd/jindo/app/jindo.go index c9c308e4177..25a2aa765d4 100644 --- a/cmd/jindo/app/jindo.go +++ b/cmd/jindo/app/jindo.go @@ -52,9 +52,10 @@ var ( development bool // The new mode eventDriven bool - portRange string maxConcurrentReconciles int pprofAddr string + portRange string + portAllocatePolicy string ) var jindoCmd = &cobra.Command{ @@ -74,6 +75,7 @@ func init() { jindoCmd.Flags().StringVarP(&leaderElectionNamespace, "leader-election-namespace", "", "fluid-system", "The namespace in which the leader election resource will be created.") jindoCmd.Flags().BoolVarP(&development, "development", "", true, "Enable development mode for fluid controller.") jindoCmd.Flags().StringVar(&portRange, "runtime-node-port-range", "18000-19999", "Set available port range for Jindo") + jindoCmd.Flags().StringVar(&portAllocatePolicy, "port-allocate-policy", "random", "Set port allocating policy, available choice is bitmap or random(default random).") jindoCmd.Flags().IntVar(&maxConcurrentReconciles, "runtime-workers", 3, "Set max concurrent workers for JindoRuntime controller") jindoCmd.Flags().BoolVar(&eventDriven, "event-driven", true, "The reconciler's loop strategy. if it's false, it indicates period driven.") jindoCmd.Flags().StringVarP(&pprofAddr, "pprof-addr", "", "", "The address for pprof to use while exporting profiling results") @@ -130,7 +132,12 @@ func handle() { } setupLog.Info("port range parsed", "port range", pr.String()) - portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, jindo.GetReservedPorts) + err = portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, portAllocatePolicy, jindo.GetReservedPorts) + if err != nil { + setupLog.Error(err, "failed to setup runtime port allocator") + os.Exit(1) + } + setupLog.Info("Set up runtime port allocator", "policy", portAllocatePolicy) setupLog.Info("starting jindoruntime-controller") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { diff --git a/pkg/controllers/v1alpha1/fluidapp/fluidapp_controller.go b/pkg/controllers/v1alpha1/fluidapp/fluidapp_controller.go index 64c1aead00e..b4d9d6b23e5 100644 --- a/pkg/controllers/v1alpha1/fluidapp/fluidapp_controller.go +++ b/pkg/controllers/v1alpha1/fluidapp/fluidapp_controller.go @@ -18,10 +18,7 @@ package fluidapp import ( "context" - "github.com/fluid-cloudnative/fluid/pkg/common" - "github.com/fluid-cloudnative/fluid/pkg/ctrl/watch" - "github.com/fluid-cloudnative/fluid/pkg/utils" - "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" + "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" @@ -33,6 +30,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/ctrl/watch" + "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" ) const controllerName string = "FluidAppController" @@ -98,8 +100,8 @@ func (f *FluidAppReconciler) Reconcile(ctx context.Context, request reconcile.Re func (f *FluidAppReconciler) internalReconcile(ctx reconcileRequestContext) 
(ctrl.Result, error) { pod := ctx.pod - // umount fuse sidecar - err := f.umountFuseSidecar(pod) + // umount fuse sidecars + err := f.umountFuseSidecars(pod) if err != nil { ctx.Log.Error(err, "umount fuse sidecar error", "podName", pod.Name, "podNamespace", pod.Namespace) return utils.RequeueIfError(err) } diff --git a/pkg/controllers/v1alpha1/fluidapp/implement.go b/pkg/controllers/v1alpha1/fluidapp/implement.go index e9b15d73d68..08b632b9ab6 100644 --- a/pkg/controllers/v1alpha1/fluidapp/implement.go +++ b/pkg/controllers/v1alpha1/fluidapp/implement.go @@ -17,13 +17,15 @@ package fluidapp import ( - "github.com/fluid-cloudnative/fluid/pkg/common" - "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" + "strings" + "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" - "strings" + + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" ) type FluidAppReconcilerImplement struct { @@ -41,13 +43,18 @@ func NewFluidAppReconcilerImplement(client client.Client, log logr.Logger, recor return r } -func (i *FluidAppReconcilerImplement) umountFuseSidecar(pod *corev1.Pod) (err error) { - var fuseContainer corev1.Container +func (i *FluidAppReconcilerImplement) umountFuseSidecars(pod *corev1.Pod) (err error) { for _, cn := range pod.Spec.Containers { - if cn.Name == common.FuseContainerName { - fuseContainer = cn + if strings.Contains(cn.Name, common.FuseContainerName) { + if e := i.umountFuseSidecar(pod, cn); e != nil { + return e + } } } + return +} + +func (i *FluidAppReconcilerImplement) umountFuseSidecar(pod *corev1.Pod, fuseContainer corev1.Container) (err error) { if fuseContainer.Name == "" { return } @@ -72,7 +79,7 @@ func (i *FluidAppReconcilerImplement) umountFuseSidecar(pod *corev1.Pod) (err er } i.Log.Info("exec cmd in pod fuse container", "cmd", cmd, "podName", pod.Name, "namespace", pod.Namespace) - stdout, stderr, err := kubeclient.ExecCommandInContainer(pod.Name, common.FuseContainerName, pod.Namespace, cmd) + stdout, stderr, err := kubeclient.ExecCommandInContainer(pod.Name, fuseContainer.Name, pod.Namespace, cmd) if err != nil { i.Log.Info("exec output", "stdout", stdout, "stderr", stderr) if strings.Contains(stderr, "not mounted") { diff --git a/pkg/controllers/v1alpha1/fluidapp/implement_test.go b/pkg/controllers/v1alpha1/fluidapp/implement_test.go index c7784e5c0bd..c0c225a07f9 100644 --- a/pkg/controllers/v1alpha1/fluidapp/implement_test.go +++ b/pkg/controllers/v1alpha1/fluidapp/implement_test.go @@ -20,17 +20,18 @@ import ( "testing" "github.com/brahma-adshonor/gohook" - "github.com/fluid-cloudnative/fluid/pkg/common" - "github.com/fluid-cloudnative/fluid/pkg/utils/fake" - "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" ) -func TestFluidAppReconcilerImplement_umountFuseSidecar(t *testing.T) { +func TestFluidAppReconcilerImplement_umountFuseSidecars(t *testing.T) { mockExec := func(p1, p2, p3 string, p4 []string) (stdout string, stderr string, e error) { return "", "", nil } @@ -78,7 +79,7 @@ func TestFluidAppReconcilerImplement_umountFuseSidecar(t *testing.T) { pod: &corev1.Pod{ ObjectMeta:
metav1.ObjectMeta{Name: "test"}, Spec: corev1.PodSpec{ - Containers: []corev1.Container{{Name: common.FuseContainerName}}, + Containers: []corev1.Container{{Name: common.FuseContainerName + "-0"}}, }, }, }, @@ -91,7 +92,7 @@ func TestFluidAppReconcilerImplement_umountFuseSidecar(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "test"}, Spec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: common.FuseContainerName, + Name: common.FuseContainerName + "-0", Lifecycle: &corev1.Lifecycle{ PreStop: &corev1.LifecycleHandler{ Exec: &corev1.ExecAction{Command: []string{"umount"}}, @@ -110,7 +111,7 @@ func TestFluidAppReconcilerImplement_umountFuseSidecar(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "test"}, Spec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: common.FuseContainerName, + Name: common.FuseContainerName + "-0", VolumeMounts: []corev1.VolumeMount{{ Name: "juicefs-fuse-mount", MountPath: "/mnt/jfs", @@ -121,13 +122,40 @@ func TestFluidAppReconcilerImplement_umountFuseSidecar(t *testing.T) { }, wantErr: false, }, + { + name: "test-multi-sidecar", + args: args{ + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test"}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: common.FuseContainerName + "-0", + VolumeMounts: []corev1.VolumeMount{{ + Name: "juicefs-fuse-mount", + MountPath: "/mnt/jfs", + }}, + }, + { + Name: common.FuseContainerName + "-1", + VolumeMounts: []corev1.VolumeMount{{ + Name: "juicefs-fuse-mount", + MountPath: "/mnt/jfs", + }}, + }, + }, + }, + }, + }, + wantErr: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { i := &FluidAppReconcilerImplement{ Log: fake.NullLogger(), } - if err := i.umountFuseSidecar(tt.args.pod); (err != nil) != tt.wantErr { + if err := i.umountFuseSidecars(tt.args.pod); (err != nil) != tt.wantErr { t.Errorf("umountFuseSidecar() error = %v, wantErr %v", err, tt.wantErr) } }) diff --git a/pkg/ctrl/watch/pod.go b/pkg/ctrl/watch/pod.go index e9ac4d7ad97..34b3cb98963 100644 --- a/pkg/ctrl/watch/pod.go +++ b/pkg/ctrl/watch/pod.go @@ -19,6 +19,9 @@ package watch import ( "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/utils" + + "strings" + corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/event" ) @@ -108,7 +111,7 @@ func ShouldInQueue(pod *corev1.Pod) bool { // ignore if no fuse container exist := false for _, cn := range pod.Spec.Containers { - if cn.Name == common.FuseContainerName { + if strings.Contains(cn.Name, common.FuseContainerName) { exist = true break } @@ -126,7 +129,7 @@ func ShouldInQueue(pod *corev1.Pod) bool { // reconcile if all app containers exit 0 and fuse container not exit for _, containerStatus := range pod.Status.ContainerStatuses { - if containerStatus.Name != common.FuseContainerName { + if !strings.Contains(containerStatus.Name, common.FuseContainerName) { log.V(1).Info("container status", "status", containerStatus) if containerStatus.State.Terminated == nil { log.Info("fluid app not exited", "pod", pod.Name, "container", containerStatus.Name, "namespace", pod.Namespace) @@ -134,7 +137,7 @@ func ShouldInQueue(pod *corev1.Pod) bool { return false } } - if containerStatus.Name == common.FuseContainerName { + if strings.Contains(containerStatus.Name, common.FuseContainerName) { if containerStatus.State.Running == nil { log.Info("fluid fuse not running", "pod", pod.Name, "container", containerStatus.Name, "namespace", pod.Namespace) return false diff --git a/pkg/ctrl/watch/pod_test.go 
b/pkg/ctrl/watch/pod_test.go index c34d18ff318..eacc90b00c9 100644 --- a/pkg/ctrl/watch/pod_test.go +++ b/pkg/ctrl/watch/pod_test.go @@ -17,13 +17,15 @@ package watch import ( - "github.com/fluid-cloudnative/fluid/pkg/common" + "testing" + "time" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/event" - "testing" - "time" + + "github.com/fluid-cloudnative/fluid/pkg/common" ) func Test_podEventHandler_onDeleteFunc(t *testing.T) { @@ -80,7 +82,7 @@ func Test_podEventHandler_onUpdateFunc(t *testing.T) { common.InjectServerless: common.True, }, }, - Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName}}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName + "-0"}}}, Status: corev1.PodStatus{ Phase: corev1.PodRunning, ContainerStatuses: []corev1.ContainerStatus{ @@ -93,7 +95,7 @@ func Test_podEventHandler_onUpdateFunc(t *testing.T) { }, }, { - Name: common.FuseContainerName, + Name: common.FuseContainerName + "-0", State: corev1.ContainerState{ Running: &corev1.ContainerStateRunning{ StartedAt: metav1.Time{Time: time.Now()}, @@ -110,7 +112,7 @@ func Test_podEventHandler_onUpdateFunc(t *testing.T) { common.InjectServerless: common.True, }, }, - Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName}}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName + "-0"}}}, Status: corev1.PodStatus{ Phase: corev1.PodRunning, ContainerStatuses: []corev1.ContainerStatus{ @@ -124,7 +126,7 @@ func Test_podEventHandler_onUpdateFunc(t *testing.T) { }, }, { - Name: common.FuseContainerName, + Name: common.FuseContainerName + "-0", State: corev1.ContainerState{ Running: &corev1.ContainerStateRunning{ StartedAt: metav1.Time{Time: time.Now()}, @@ -225,7 +227,7 @@ func Test_shouldRequeue(t *testing.T) { Name: "test", Labels: map[string]string{common.InjectServerless: common.True}, }, - Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName}}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName + "-0"}}}, Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{ { Name: "app", @@ -236,7 +238,7 @@ func Test_shouldRequeue(t *testing.T) { }, }, { - Name: common.FuseContainerName, + Name: common.FuseContainerName + "-0", State: corev1.ContainerState{ Running: &corev1.ContainerStateRunning{ StartedAt: metav1.Time{Time: time.Now()}, @@ -256,7 +258,7 @@ func Test_shouldRequeue(t *testing.T) { Name: "test", Labels: map[string]string{common.InjectServerless: common.True}, }, - Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName}}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName + "-0"}}}, Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{ { Name: "app", @@ -268,7 +270,7 @@ func Test_shouldRequeue(t *testing.T) { }, }, { - Name: common.FuseContainerName, + Name: common.FuseContainerName + "-0", State: corev1.ContainerState{ Terminated: &corev1.ContainerStateTerminated{ StartedAt: metav1.Time{Time: time.Now()}, @@ -289,7 +291,7 @@ func Test_shouldRequeue(t *testing.T) { Name: "test", Labels: map[string]string{common.InjectServerless: common.True}, }, - Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: 
common.FuseContainerName}}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName + "-0"}}}, Status: corev1.PodStatus{ Phase: corev1.PodRunning, ContainerStatuses: []corev1.ContainerStatus{ @@ -303,7 +305,7 @@ func Test_shouldRequeue(t *testing.T) { }, }, { - Name: common.FuseContainerName, + Name: common.FuseContainerName + "-0", State: corev1.ContainerState{ Running: &corev1.ContainerStateRunning{ StartedAt: metav1.Time{Time: time.Now()}, @@ -323,7 +325,7 @@ func Test_shouldRequeue(t *testing.T) { Name: "test", Labels: map[string]string{common.InjectServerless: common.True}, }, - Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: "app2"}, {Name: common.FuseContainerName}}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: "app2"}, {Name: common.FuseContainerName + "-0"}}}, Status: corev1.PodStatus{ Phase: corev1.PodRunning, ContainerStatuses: []corev1.ContainerStatus{ @@ -342,7 +344,7 @@ func Test_shouldRequeue(t *testing.T) { }}, }, { - Name: common.FuseContainerName, + Name: common.FuseContainerName + "-0", State: corev1.ContainerState{ Running: &corev1.ContainerStateRunning{ StartedAt: metav1.Time{Time: time.Now()}, @@ -361,7 +363,7 @@ func Test_shouldRequeue(t *testing.T) { Name: "test", Labels: map[string]string{common.InjectServerless: common.True}, }, - Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: "app2"}, {Name: common.FuseContainerName}}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: "app2"}, {Name: common.FuseContainerName + "-0"}}}, Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{ { Name: "app", @@ -381,7 +383,7 @@ func Test_shouldRequeue(t *testing.T) { }, }, { - Name: common.FuseContainerName, + Name: common.FuseContainerName + "-0", State: corev1.ContainerState{ Running: &corev1.ContainerStateRunning{ StartedAt: metav1.Time{Time: time.Now()}, @@ -400,7 +402,7 @@ func Test_shouldRequeue(t *testing.T) { Name: "test", Labels: map[string]string{common.InjectServerless: common.True}, }, - Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName}}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName + "-0"}}}, Status: corev1.PodStatus{ Phase: corev1.PodPending, ContainerStatuses: []corev1.ContainerStatus{}}, diff --git a/pkg/ddc/alluxio/master_internal_test.go b/pkg/ddc/alluxio/master_internal_test.go index d0cb5e0d7fd..517b69870fd 100644 --- a/pkg/ddc/alluxio/master_internal_test.go +++ b/pkg/ddc/alluxio/master_internal_test.go @@ -112,8 +112,11 @@ func TestSetupMasterInternal(t *testing.T) { }, }, } - portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, GetReservedPorts) - err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil) + err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts) + if err != nil { + t.Fatal(err.Error()) + } + err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil) if err != nil { t.Fatal(err.Error()) } @@ -238,8 +241,11 @@ func TestGenerateAlluxioValueFile(t *testing.T) { }, } - portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, GetReservedPorts) - err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil) + err := 
portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts) + if err != nil { + t.Fatal(err.Error()) + } + err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil) if err != nil { t.Fatal(err.Error()) } diff --git a/pkg/ddc/alluxio/shutdown_engine_test.go b/pkg/ddc/alluxio/shutdown_engine_test.go index 956dc883c9e..833cd1073ca 100644 --- a/pkg/ddc/alluxio/shutdown_engine_test.go +++ b/pkg/ddc/alluxio/shutdown_engine_test.go @@ -127,7 +127,10 @@ func TestShutdown(t *testing.T) { pr := net.ParsePortRangeOrDie("20000-21000") - portallocator.SetupRuntimePortAllocator(nil, pr, dummy) + err = portallocator.SetupRuntimePortAllocator(nil, pr, "bitmap", dummy) + if err != nil { + t.Fatalf("failed to set up runtime port allocator due to %v", err) + } var testCase = []struct { expectedWorkers int32 diff --git a/pkg/ddc/alluxio/shutdown_test.go b/pkg/ddc/alluxio/shutdown_test.go index c48caffeb81..665ee637107 100644 --- a/pkg/ddc/alluxio/shutdown_test.go +++ b/pkg/ddc/alluxio/shutdown_test.go @@ -335,7 +335,10 @@ func TestAlluxioEngineReleasePorts(t *testing.T) { Log: fake.NullLogger(), } - portallocator.SetupRuntimePortAllocator(client, pr, GetReservedPorts) + err := portallocator.SetupRuntimePortAllocator(client, pr, "bitmap", GetReservedPorts) + if err != nil { + t.Fatal(err.Error()) + } allocator, _ := portallocator.GetRuntimePortAllocator() patch1 := ApplyMethod(reflect.TypeOf(allocator), "ReleaseReservedPorts", func(_ *portallocator.RuntimePortAllocator, ports []int) { diff --git a/pkg/ddc/alluxio/transform_test.go b/pkg/ddc/alluxio/transform_test.go index d2fa6e6ab4e..38375a255a5 100644 --- a/pkg/ddc/alluxio/transform_test.go +++ b/pkg/ddc/alluxio/transform_test.go @@ -1,11 +1,8 @@ /* - Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,11 +13,12 @@ limitations under the License. 
package alluxio import ( + "reflect" + "testing" + "github.com/agiledragon/gomonkey/v2" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/base" - "reflect" - "testing" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -176,6 +174,7 @@ func TestTransformWorkers(t *testing.T) { Worker: datav1alpha1.AlluxioCompTemplateSpec{ NetworkMode: datav1alpha1.HostNetworkMode, }, + TieredStore: datav1alpha1.TieredStore{}, }, }, wantValue: &Alluxio{ @@ -184,11 +183,33 @@ func TestTransformWorkers(t *testing.T) { }, }, }, + "test network mode case 4": { + runtime: &datav1alpha1.AlluxioRuntime{ + Spec: datav1alpha1.AlluxioRuntimeSpec{ + Worker: datav1alpha1.AlluxioCompTemplateSpec{ + NetworkMode: datav1alpha1.HostNetworkMode, + NodeSelector: map[string]string{ + "workerSelector": "true", + }, + }, + TieredStore: datav1alpha1.TieredStore{}, + }, + }, + wantValue: &Alluxio{ + Worker: Worker{ + HostNetwork: true, + NodeSelector: map[string]string{ + "workerSelector": "true", + }, + }, + }, + }, } engine := &AlluxioEngine{Log: fake.NullLogger()} for k, v := range testCases { gotValue := &Alluxio{} + engine.runtimeInfo, _ = base.BuildRuntimeInfo("test", "test", "alluxio", v.runtime.Spec.TieredStore) if err := engine.transformWorkers(v.runtime, gotValue); err == nil { if gotValue.Worker.HostNetwork != v.wantValue.Worker.HostNetwork { t.Errorf("check %s failure, got:%t,want:%t", @@ -197,6 +218,15 @@ func TestTransformWorkers(t *testing.T) { v.wantValue.Worker.HostNetwork, ) } + if len(v.wantValue.Worker.NodeSelector) > 0 { + if !reflect.DeepEqual(v.wantValue.Worker.NodeSelector, gotValue.Worker.NodeSelector) { + t.Errorf("check %s failure, got:%v,want:%v", + k, + gotValue.Worker.NodeSelector, + v.wantValue.Worker.NodeSelector, + ) + } + } } } } diff --git a/pkg/ddc/base/portallocator/port_allocator.go b/pkg/ddc/base/portallocator/port_allocator.go index 54519dc6dd0..dcb297ebc07 100644 --- a/pkg/ddc/base/portallocator/port_allocator.go +++ b/pkg/ddc/base/portallocator/port_allocator.go @@ -17,6 +17,8 @@ limitations under the License. package portallocator import ( + "fmt" + "github.com/go-logr/logr" "github.com/pkg/errors" "k8s.io/apimachinery/pkg/util/net" @@ -31,6 +33,15 @@ const ( BitMap AllocatePolicy = "bitmap" ) +func ValidateEnum(allocatePolicyStr string) (AllocatePolicy, error) { + switch AllocatePolicy(allocatePolicyStr) { + case Random, BitMap: + return AllocatePolicy(allocatePolicyStr), nil + default: + return AllocatePolicy(allocatePolicyStr), fmt.Errorf("runtime-port-allocator can only be random or bitmap") + } +} + type BatchAllocatorInterface interface { Allocate(int) error @@ -60,9 +71,15 @@ type RuntimePortAllocator struct { // rpa is a global singleton of type RuntimePortAllocator var rpa *RuntimePortAllocator -// SetupRuntimePortAllocator instantiates the global singleton rpa, use BitMap port allocating policy -func SetupRuntimePortAllocator(client client.Client, pr *net.PortRange, getReservedPorts func(client client.Client) (ports []int, err error)) { - SetupRuntimePortAllocatorWithType(client, pr, BitMap, getReservedPorts) +// SetupRuntimePortAllocator instantiates the global singleton rpa, set up port allocating policy according to the given allocatePolicyStr. +// Currently the valid policies are either "random" or "bitmap". 
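+// Usage (an illustrative sketch mirroring the cmd/alluxio call site in this PR; mgr, pr, portAllocatePolicy and setupLog are assumed to come from that caller): +// if err := portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, portAllocatePolicy, alluxio.GetReservedPorts); err != nil { +// setupLog.Error(err, "failed to setup runtime port allocator") +// os.Exit(1) +// }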
+func SetupRuntimePortAllocator(client client.Client, pr *net.PortRange, allocatePolicyStr string, getReservedPorts func(client client.Client) (ports []int, err error)) error { + policy, err := ValidateEnum(allocatePolicyStr) + if err != nil { + return err + } + SetupRuntimePortAllocatorWithType(client, pr, policy, getReservedPorts) + return nil } // SetupRuntimePortAllocatorWithType instantiates the global singleton rpa with specified port allocating policy @@ -90,7 +107,7 @@ func (alloc *RuntimePortAllocator) createAndRestorePortAllocator() (err error) { case BitMap: alloc.pa, err = newBitMapAllocator(alloc.pr, alloc.log) default: - err = errors.New("allocate-port-policy can only be random or bitmap") + err = errors.New("runtime-port-allocator can only be random or bitmap") } if err != nil { diff --git a/pkg/ddc/base/portallocator/port_allocator_test.go b/pkg/ddc/base/portallocator/port_allocator_test.go index 974675a63aa..6b07cf76757 100644 --- a/pkg/ddc/base/portallocator/port_allocator_test.go +++ b/pkg/ddc/base/portallocator/port_allocator_test.go @@ -33,9 +33,12 @@ var errDummy = func(client client.Client) (ports []int, err error) { func TestRuntimePortAllocatorWithError(t *testing.T) { pr := net.ParsePortRangeOrDie("20000-21000") - SetupRuntimePortAllocator(nil, pr, errDummy) + err := SetupRuntimePortAllocator(nil, pr, "bitmap", errDummy) + if err != nil { + t.Fatalf("failed to setup runtime port allocator due to %v", err) + } - _, err := GetRuntimePortAllocator() + _, err = GetRuntimePortAllocator() if err == nil { t.Errorf("Expecetd error when GetRuntimePortAllocator") } @@ -43,7 +46,11 @@ func TestRuntimePortAllocatorWithError(t *testing.T) { func TestRuntimePortAllocator(t *testing.T) { pr := net.ParsePortRangeOrDie("20000-21000") - SetupRuntimePortAllocator(nil, pr, dummy) + err := SetupRuntimePortAllocator(nil, pr, "bitmap", dummy) + if err != nil { + t.Errorf("get non-nil err when GetRuntimePortAllocator") + return + } allocator, err := GetRuntimePortAllocator() if err != nil { @@ -71,7 +78,11 @@ func TestRuntimePortAllocator(t *testing.T) { func TestRuntimePortAllocatorRelease(t *testing.T) { pr := net.ParsePortRangeOrDie("20000-20010") - SetupRuntimePortAllocator(nil, pr, dummy) + err := SetupRuntimePortAllocator(nil, pr, "bitmap", dummy) + if err != nil { + t.Errorf("get non-nil err when GetRuntimePortAllocator") + return + } preservedPorts, _ := dummy(nil) diff --git a/pkg/ddc/goosefs/master_internal_test.go b/pkg/ddc/goosefs/master_internal_test.go index 852b7cb0377..f9020c29c3b 100644 --- a/pkg/ddc/goosefs/master_internal_test.go +++ b/pkg/ddc/goosefs/master_internal_test.go @@ -184,9 +184,12 @@ func TestSetupMasterInternal(t *testing.T) { }, } - portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, GetReservedPorts) + err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts) + if err != nil { + t.Fatal(err.Error()) + } - err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil) + err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil) if err != nil { @@ -400,9 +403,11 @@ func TestGenerateGooseFSValueFile(t *testing.T) { }, } - portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, GetReservedPorts) - - err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil) + err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, 
Size: 50}, "bitmap", GetReservedPorts) + if err != nil { + t.Fatal(err.Error()) + } + err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil) if err != nil { diff --git a/pkg/ddc/goosefs/shutdown_test.go b/pkg/ddc/goosefs/shutdown_test.go index 8fadb2740d4..4f133c8e80c 100644 --- a/pkg/ddc/goosefs/shutdown_test.go +++ b/pkg/ddc/goosefs/shutdown_test.go @@ -467,7 +467,10 @@ func TestGooseFSEngineReleasePorts(t *testing.T) { Log: fake.NullLogger(), } - portallocator.SetupRuntimePortAllocator(client, pr, GetReservedPorts) + err := portallocator.SetupRuntimePortAllocator(client, pr, "bitmap", GetReservedPorts) + if err != nil { + t.Fatalf("Failed to set up runtime port allocator due to %v", err) + } allocator, _ := portallocator.GetRuntimePortAllocator() patch1 := ApplyMethod(reflect.TypeOf(allocator), "ReleaseReservedPorts", func(_ *portallocator.RuntimePortAllocator, ports []int) { diff --git a/pkg/ddc/jindo/master_internal_test.go b/pkg/ddc/jindo/master_internal_test.go index 0d0f8983318..56402e4c9ee 100644 --- a/pkg/ddc/jindo/master_internal_test.go +++ b/pkg/ddc/jindo/master_internal_test.go @@ -111,8 +111,11 @@ func TestSetupMasterInternal(t *testing.T) { }, }, } - portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, GetReservedPorts) - err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil) + err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts) + if err != nil { + t.Fatal(err.Error()) + } + err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil) if err != nil { t.Fatal(err.Error()) } @@ -234,8 +237,11 @@ func TestGenerateJindoValueFile(t *testing.T) { }, } - portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, GetReservedPorts) - err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil) + err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts) + if err != nil { + t.Fatal(err.Error()) + } + err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil) if err != nil { t.Fatal(err.Error()) } diff --git a/pkg/ddc/jindofsx/master_internal_test.go b/pkg/ddc/jindofsx/master_internal_test.go index 1e471be02b0..b2acf82b148 100644 --- a/pkg/ddc/jindofsx/master_internal_test.go +++ b/pkg/ddc/jindofsx/master_internal_test.go @@ -111,8 +111,11 @@ func TestSetupMasterInternal(t *testing.T) { }, }, } - portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, GetReservedPorts) - err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil) + err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts) + if err != nil { + t.Fatal(err.Error()) + } + err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil) if err != nil { t.Fatal(err.Error()) } @@ -234,8 +237,11 @@ func TestGenerateJindoValueFile(t *testing.T) { }, } - portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, GetReservedPorts) - err := gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil) + err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts) + if err != nil { + t.Fatal(err.Error()) + } + err = 
gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil) if err != nil { t.Fatal(err.Error()) } diff --git a/pkg/ddc/jindofsx/transform_test.go b/pkg/ddc/jindofsx/transform_test.go index b5c28346513..382e220740c 100644 --- a/pkg/ddc/jindofsx/transform_test.go +++ b/pkg/ddc/jindofsx/transform_test.go @@ -549,8 +549,11 @@ func TestJindoFSxEngine_transform(t *testing.T) { Log: fake.NullLogger(), } tt.args.runtime = tt.fields.runtime - portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, GetReservedPorts) - _, err := e.transform(tt.args.runtime) + err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts) + if err != nil { + t.Fatalf("failed to set up runtime port allocator due to %v", err) + } + _, err = e.transform(tt.args.runtime) if (err != nil) != tt.wantErr { t.Errorf("JindoFSxEngine.transform() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/pkg/ddc/juicefs/shutdown.go b/pkg/ddc/juicefs/shutdown.go index 620d8b06839..0428d867602 100644 --- a/pkg/ddc/juicefs/shutdown.go +++ b/pkg/ddc/juicefs/shutdown.go @@ -82,10 +82,10 @@ func (j *JuiceFSEngine) destroyMaster() (err error) { // cleanupCache cleans up the cache func (j *JuiceFSEngine) cleanupCache() (err error) { runtime, err := j.getRuntime() - j.Log.Info("get runtime info", "runtime", runtime) if err != nil { return err } + j.Log.Info("get runtime info", "runtime", runtime) cacheDir := common.JuiceFSDefaultCacheDir if len(runtime.Spec.TieredStore.Levels) != 0 { diff --git a/pkg/ddc/juicefs/transform.go b/pkg/ddc/juicefs/transform.go index 96084e3c12b..9cee122d987 100644 --- a/pkg/ddc/juicefs/transform.go +++ b/pkg/ddc/juicefs/transform.go @@ -109,6 +109,12 @@ func (j *JuiceFSEngine) transformWorkers(runtime *datav1alpha1.JuiceFSRuntime, v if err != nil { j.Log.Error(err, "failed to transform volumes for worker") } + // transform cache volumes for worker + err = j.transformWorkerCacheVolumes(runtime, value) + if err != nil { + j.Log.Error(err, "failed to transform cache volumes for worker") + return err + } // parse work pod network mode value.Worker.HostNetwork = datav1alpha1.IsHostNetwork(runtime.Spec.Worker.NetworkMode) diff --git a/pkg/ddc/juicefs/transform_fuse.go b/pkg/ddc/juicefs/transform_fuse.go index 444d6cd0062..b08334086cd 100644 --- a/pkg/ddc/juicefs/transform_fuse.go +++ b/pkg/ddc/juicefs/transform_fuse.go @@ -81,6 +81,12 @@ func (j *JuiceFSEngine) transformFuse(runtime *datav1alpha1.JuiceFSRuntime, data j.Log.Error(err, "failed to transform volumes for fuse") return err } + // transform cache volumes for fuse + err = j.transformFuseCacheVolumes(runtime, value) + if err != nil { + j.Log.Error(err, "failed to transform cache volumes for fuse") + return err + } // set critical fuse pod to avoid eviction value.Fuse.CriticalPod = common.CriticalFusePodEnabled() @@ -133,6 +139,7 @@ func (j *JuiceFSEngine) genValue(mount datav1alpha1.Mount, tiredStoreLevel *data case JuiceMetaUrl: source = "${METAURL}" value.Configs.MetaUrlSecret = secretKeyRef.Name + value.Configs.MetaUrlSecretKey = secretKeyRef.Key _, ok := secret.Data[secretKeyRef.Key] if !ok { return nil, fmt.Errorf("can't get metaurl from secret %s", secret.Name) @@ -140,10 +147,13 @@ func (j *JuiceFSEngine) genValue(mount datav1alpha1.Mount, tiredStoreLevel *data value.Edition = CommunityEdition case JuiceAccessKey: value.Configs.AccessKeySecret = secretKeyRef.Name + value.Configs.AccessKeySecretKey = secretKeyRef.Key case JuiceSecretKey: 
value.Configs.SecretKeySecret = secretKeyRef.Name + value.Configs.SecretKeySecretKey = secretKeyRef.Key case JuiceToken: value.Configs.TokenSecret = secretKeyRef.Name + value.Configs.TokenSecretKey = secretKeyRef.Key } } diff --git a/pkg/ddc/juicefs/transform_volume.go b/pkg/ddc/juicefs/transform_volume.go index c97ac025dc1..5d7eb4e102f 100644 --- a/pkg/ddc/juicefs/transform_volume.go +++ b/pkg/ddc/juicefs/transform_volume.go @@ -18,8 +18,13 @@ package juicefs import ( "fmt" - datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "strconv" + "strings" + corev1 "k8s.io/api/core/v1" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" ) // transform worker volumes @@ -54,6 +59,70 @@ func (j *JuiceFSEngine) transformWorkerVolumes(runtime *datav1alpha1.JuiceFSRunt return err } +// transform worker cache volumes +// after genValue & genMount function +func (j *JuiceFSEngine) transformWorkerCacheVolumes(runtime *datav1alpha1.JuiceFSRuntime, value *JuiceFS) (err error) { + caches := map[string]cache{} + cacheDir := "" + + // if cache-dir is set in worker option, it will override the cache-dir of worker in runtime + workerOptions := runtime.Spec.Worker.Options + for k, v := range workerOptions { + if k == "cache-dir" { + cacheDir = v + break + } + } + if cacheDir != "" { + originPath := strings.Split(cacheDir, ",") + for i, p := range originPath { + var volumeType = common.VolumeTypeHostPath + caches[strconv.Itoa(i+1)] = cache{ + Path: p, + Type: string(volumeType), + } + } + } else { + caches = value.CacheDirs + } + + // set volumes & volumeMounts for cache + volumeMap := map[string]corev1.VolumeMount{} + for _, v := range runtime.Spec.Worker.VolumeMounts { + volumeMap[v.MountPath] = v + } + for i, cache := range caches { + if _, ok := volumeMap[cache.Path]; ok { + // cache path is already in volumeMounts + continue + } + value.Worker.VolumeMounts = append(value.Worker.VolumeMounts, corev1.VolumeMount{ + Name: "cache-dir-" + i, + MountPath: cache.Path, + }) + v := corev1.Volume{ + Name: "cache-dir-" + i, + } + switch cache.Type { + case string(common.VolumeTypeHostPath): + dir := corev1.HostPathDirectoryOrCreate + v.VolumeSource = corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: cache.Path, + Type: &dir, + }, + } + case string(common.VolumeTypeEmptyDir): + v.VolumeSource = corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + } + // todo: support volume template + } + value.Worker.Volumes = append(value.Worker.Volumes, v) + } + return err +} + // transform fuse volumes func (j *JuiceFSEngine) transformFuseVolumes(runtime *datav1alpha1.JuiceFSRuntime, value *JuiceFS) (err error) { if len(runtime.Spec.Fuse.VolumeMounts) > 0 { @@ -84,3 +153,45 @@ func (j *JuiceFSEngine) transformFuseVolumes(runtime *datav1alpha1.JuiceFSRuntim return err } + +// transform fuse cache volumes +// after genValue & genMount function +func (j *JuiceFSEngine) transformFuseCacheVolumes(runtime *datav1alpha1.JuiceFSRuntime, value *JuiceFS) (err error) { + caches := value.CacheDirs + + // set volumes & volumeMounts for cache + volumeMap := map[string]corev1.VolumeMount{} + for _, v := range runtime.Spec.Fuse.VolumeMounts { + volumeMap[v.MountPath] = v + } + for i, cache := range caches { + if _, ok := volumeMap[cache.Path]; ok { + // cache path is already in volumeMounts + continue + } + value.Fuse.VolumeMounts = append(value.Fuse.VolumeMounts, corev1.VolumeMount{ + Name: "cache-dir-" + i, + MountPath: cache.Path, + 
}) + v := corev1.Volume{ + Name: "cache-dir-" + i, + } + switch cache.Type { + case string(common.VolumeTypeHostPath): + dir := corev1.HostPathDirectoryOrCreate + v.VolumeSource = corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: cache.Path, + Type: &dir, + }, + } + case string(common.VolumeTypeEmptyDir): + v.VolumeSource = corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + } + // todo: support volume template + } + value.Fuse.Volumes = append(value.Fuse.Volumes, v) + } + return err +} diff --git a/pkg/ddc/juicefs/transform_volume_test.go b/pkg/ddc/juicefs/transform_volume_test.go index 3eead76d904..6c258b76b63 100644 --- a/pkg/ddc/juicefs/transform_volume_test.go +++ b/pkg/ddc/juicefs/transform_volume_test.go @@ -17,10 +17,13 @@ package juicefs import ( - datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" - corev1 "k8s.io/api/core/v1" "reflect" "testing" + + corev1 "k8s.io/api/core/v1" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" ) func TestTransformWorkerVolumes(t *testing.T) { @@ -240,3 +243,287 @@ func TestTransformFuseVolumes(t *testing.T) { } } + +func TestJuiceFSEngine_transformWorkerCacheVolumes(t *testing.T) { + dir := corev1.HostPathDirectoryOrCreate + type args struct { + runtime *datav1alpha1.JuiceFSRuntime + value *JuiceFS + } + tests := []struct { + name string + args args + wantErr bool + wantVolumes []corev1.Volume + wantVolumeMounts []corev1.VolumeMount + }{ + { + name: "test-normal", + args: args{ + runtime: &datav1alpha1.JuiceFSRuntime{}, + value: &JuiceFS{ + CacheDirs: map[string]cache{ + "1": {Path: "/cache", Type: string(common.VolumeTypeHostPath)}, + }, + }, + }, + wantErr: false, + wantVolumes: []corev1.Volume{{ + Name: "cache-dir-1", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/cache", + Type: &dir, + }, + }, + }}, + wantVolumeMounts: []corev1.VolumeMount{{ + Name: "cache-dir-1", + MountPath: "/cache", + }}, + }, + { + name: "test-option-overwrite", + args: args{ + runtime: &datav1alpha1.JuiceFSRuntime{ + Spec: datav1alpha1.JuiceFSRuntimeSpec{ + Worker: datav1alpha1.JuiceFSCompTemplateSpec{ + Options: map[string]string{"cache-dir": "/worker-cache1,/worker-cache2"}, + }, + }, + }, + value: &JuiceFS{ + CacheDirs: map[string]cache{ + "1": {Path: "/cache", Type: string(common.VolumeTypeHostPath)}, + }, + }, + }, + wantErr: false, + wantVolumes: []corev1.Volume{ + { + Name: "cache-dir-1", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/worker-cache1", + Type: &dir, + }, + }, + }, + { + Name: "cache-dir-2", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/worker-cache2", + Type: &dir, + }, + }, + }, + }, + wantVolumeMounts: []corev1.VolumeMount{ + { + Name: "cache-dir-1", + MountPath: "/worker-cache1", + }, + { + Name: "cache-dir-2", + MountPath: "/worker-cache2", + }, + }, + }, + { + name: "test-volume-overwrite", + args: args{ + runtime: &datav1alpha1.JuiceFSRuntime{ + Spec: datav1alpha1.JuiceFSRuntimeSpec{ + Worker: datav1alpha1.JuiceFSCompTemplateSpec{ + Options: map[string]string{"cache-dir": "/worker-cache1,/worker-cache2"}, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "cache", + MountPath: "/worker-cache2", + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "cache", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + }, + }, + value: &JuiceFS{ + CacheDirs: 
map[string]cache{ + "1": {Path: "/cache", Type: string(common.VolumeTypeHostPath)}, + }, + }, + }, + wantErr: false, + wantVolumes: []corev1.Volume{ + { + Name: "cache-dir-1", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/worker-cache1", + Type: &dir, + }, + }, + }, + }, + wantVolumeMounts: []corev1.VolumeMount{ + { + Name: "cache-dir-1", + MountPath: "/worker-cache1", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + j := &JuiceFSEngine{} + if err := j.transformWorkerCacheVolumes(tt.args.runtime, tt.args.value); (err != nil) != tt.wantErr { + t.Errorf("transformWorkerCacheVolumes() error = %v, wantErr %v", err, tt.wantErr) + } + + // compare volumes + if len(tt.args.value.Worker.Volumes) != len(tt.wantVolumes) { + t.Errorf("want volumes %v, got %v for testcase %s", tt.wantVolumes, tt.args.value.Worker.Volumes, tt.name) + } + wantVolumeMap := make(map[string]corev1.Volume) + for _, v := range tt.wantVolumes { + wantVolumeMap[v.Name] = v + } + for _, v := range tt.args.value.Worker.Volumes { + if wv := wantVolumeMap[v.Name]; !reflect.DeepEqual(wv, v) { + t.Errorf("want volumes %v, got %v for testcase %s", tt.wantVolumes, tt.args.value.Worker.Volumes, tt.name) + } + } + + // compare volumeMounts + if len(tt.args.value.Worker.VolumeMounts) != len(tt.wantVolumeMounts) { + t.Errorf("want volumeMounts %v, got %v for testcase %s", tt.wantVolumeMounts, tt.args.value.Worker.VolumeMounts, tt.name) + } + wantVolumeMountsMap := make(map[string]corev1.VolumeMount) + for _, v := range tt.wantVolumeMounts { + wantVolumeMountsMap[v.Name] = v + } + for _, v := range tt.args.value.Worker.VolumeMounts { + if wv := wantVolumeMountsMap[v.Name]; !reflect.DeepEqual(wv, v) { + t.Errorf("want volumeMounts %v, got %v for testcase %s", tt.wantVolumeMounts, tt.args.value.Worker.VolumeMounts, tt.name) + } + } + }) + } +} + +func TestJuiceFSEngine_transformFuseCacheVolumes(t *testing.T) { + dir := corev1.HostPathDirectoryOrCreate + type args struct { + runtime *datav1alpha1.JuiceFSRuntime + value *JuiceFS + } + tests := []struct { + name string + args args + wantErr bool + wantVolumes []corev1.Volume + wantVolumeMounts []corev1.VolumeMount + }{ + { + name: "test-normal", + args: args{ + runtime: &datav1alpha1.JuiceFSRuntime{}, + value: &JuiceFS{ + CacheDirs: map[string]cache{ + "1": {Path: "/cache", Type: string(common.VolumeTypeHostPath)}, + }, + }, + }, + wantErr: false, + wantVolumes: []corev1.Volume{{ + Name: "cache-dir-1", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/cache", + Type: &dir, + }, + }, + }}, + wantVolumeMounts: []corev1.VolumeMount{{ + Name: "cache-dir-1", + MountPath: "/cache", + }}, + }, + { + name: "test-volume-overwrite", + args: args{ + runtime: &datav1alpha1.JuiceFSRuntime{ + Spec: datav1alpha1.JuiceFSRuntimeSpec{ + Fuse: datav1alpha1.JuiceFSFuseSpec{ + VolumeMounts: []corev1.VolumeMount{ + { + Name: "cache", + MountPath: "/cache", + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "cache", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + }, + }, + value: &JuiceFS{ + CacheDirs: map[string]cache{ + "1": {Path: "/cache", Type: string(common.VolumeTypeHostPath)}, + }, + }, + }, + wantErr: false, + wantVolumes: nil, + wantVolumeMounts: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + j := &JuiceFSEngine{} + if err := j.transformFuseCacheVolumes(tt.args.runtime, tt.args.value); (err != nil) 
!= tt.wantErr { + t.Errorf("transformFuseCacheVolumes() error = %v, wantErr %v", err, tt.wantErr) + } + + // compare volumes + if len(tt.args.value.Fuse.Volumes) != len(tt.wantVolumes) { + t.Errorf("want volumes %v, got %v for testcase %s", tt.wantVolumes, tt.args.value.Fuse.Volumes, tt.name) + } + wantVolumeMap := make(map[string]corev1.Volume) + for _, v := range tt.wantVolumes { + wantVolumeMap[v.Name] = v + } + for _, v := range tt.args.value.Fuse.Volumes { + if wv := wantVolumeMap[v.Name]; !reflect.DeepEqual(wv, v) { + t.Errorf("want volumes %v, got %v for testcase %s", tt.wantVolumes, tt.args.value.Fuse.Volumes, tt.name) + } + } + + // compare volumeMounts + if len(tt.args.value.Fuse.VolumeMounts) != len(tt.wantVolumeMounts) { + t.Errorf("want volumeMounts %v, got %v for testcase %s", tt.wantVolumeMounts, tt.args.value.Fuse.VolumeMounts, tt.name) + } + wantVolumeMountsMap := make(map[string]corev1.VolumeMount) + for _, v := range tt.wantVolumeMounts { + wantVolumeMountsMap[v.Name] = v + } + for _, v := range tt.args.value.Fuse.VolumeMounts { + if wv := wantVolumeMountsMap[v.Name]; !reflect.DeepEqual(wv, v) { + t.Errorf("want volumeMounts %v, got %v for testcase %s", tt.wantVolumeMounts, tt.args.value.Fuse.VolumeMounts, tt.name) + } + } + }) + } +} diff --git a/pkg/ddc/juicefs/type.go b/pkg/ddc/juicefs/type.go index a374e5b3242..668b3575560 100644 --- a/pkg/ddc/juicefs/type.go +++ b/pkg/ddc/juicefs/type.go @@ -44,14 +44,18 @@ type JuiceFS struct { } type Configs struct { - Name string `json:"name"` - AccessKeySecret string `json:"accesskeySecret,omitempty"` - SecretKeySecret string `json:"secretkeySecret,omitempty"` - Bucket string `json:"bucket,omitempty"` - MetaUrlSecret string `json:"metaurlSecret,omitempty"` - TokenSecret string `json:"tokenSecret,omitempty"` - Storage string `json:"storage,omitempty"` - FormatCmd string `json:"formatCmd,omitempty"` + Name string `json:"name"` + AccessKeySecret string `json:"accesskeySecret,omitempty"` + AccessKeySecretKey string `json:"accesskeySecretKey,omitempty"` + SecretKeySecret string `json:"secretkeySecret,omitempty"` + SecretKeySecretKey string `json:"secretkeySecretKey,omitempty"` + Bucket string `json:"bucket,omitempty"` + MetaUrlSecret string `json:"metaurlSecret,omitempty"` + MetaUrlSecretKey string `json:"metaurlSecretKey,omitempty"` + TokenSecret string `json:"tokenSecret,omitempty"` + TokenSecretKey string `json:"tokenSecretKey,omitempty"` + Storage string `json:"storage,omitempty"` + FormatCmd string `json:"formatCmd,omitempty"` } type Worker struct { diff --git a/test/prow/juicefs_access_data.py b/test/prow/juicefs_access_data.py new file mode 100644 index 00000000000..a0c0bcfe186 --- /dev/null +++ b/test/prow/juicefs_access_data.py @@ -0,0 +1,412 @@ +# Copyright 2022 The Fluid Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +TestCase: Pod accesses Juicefs data +DDC Engine: Juicefs(Community) with local redis and minio + +Prerequisite: +1. 
docker run -d -p 9000:9000 \ +    --name minio \ +    -e "MINIO_ROOT_USER=minioadmin" \ +    -e "MINIO_ROOT_PASSWORD=minioadmin" \ +    minio/minio server /data +2. docker run -itd --name redis -p 6379:6379 redis +3. Make sure the hostnames "minio" and "redis" resolve from inside the cluster (the script reaches both by hostname) +4. The script creates the following secret itself (shown here for reference; note the key names match what create_redis_secret() writes): +``` +apiVersion: v1 +kind: Secret +metadata: +  name: jfs-secret +stringData: +  metaurl: redis://redis:6379/0 +  accesskey: minioadmin +  secretkey: minioadmin +``` + +Steps: +1. create secret +2. create Dataset & Runtime +3. check if dataset is bound +4. check if PVC & PV are created +5. submit data write job +6. wait until data write job completes +7. submit data read job +8. check if data content is consistent +9. clean up +""" + +import time + +from kubernetes import client, config + +# hostname of the minio service as seen from inside the cluster +NODE_IP = "minio" +APP_NAMESPACE = "default" +SECRET_NAME = "jfs-secret" + + +def create_redis_secret(): +    api = client.CoreV1Api() +    jfs_secret = { +        "apiVersion": "v1", +        "kind": "Secret", +        "metadata": {"name": SECRET_NAME}, +        "stringData": {"metaurl": "redis://redis:6379/0", "accesskey": "minioadmin", "secretkey": "minioadmin"} +    } + +    api.create_namespaced_secret(namespace=APP_NAMESPACE, body=jfs_secret) +    print("Created secret {}".format(SECRET_NAME)) + + +def create_dataset_and_runtime(dataset_name): +    api = client.CustomObjectsApi() +    my_dataset = { +        "apiVersion": "data.fluid.io/v1alpha1", +        "kind": "Dataset", +        "metadata": {"name": dataset_name, "namespace": APP_NAMESPACE}, +        "spec": { +            "mounts": [{ +                "mountPoint": "juicefs:///", +                "name": "juicefs-community", +                "options": {"bucket": "http://%s:9000/minio/test" % NODE_IP, "storage": "minio"}, +                "encryptOptions": [ +                    {"name": "metaurl", "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": "metaurl"}}}, +                    {"name": "access-key", "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": "accesskey"}}}, +                    {"name": "secret-key", "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": "secretkey"}}} +                ] +            }], +            "accessModes": ["ReadWriteMany"] +        } +    } + +    my_juicefsruntime = { +        "apiVersion": "data.fluid.io/v1alpha1", +        "kind": "JuiceFSRuntime", +        "metadata": {"name": dataset_name, "namespace": APP_NAMESPACE}, +        "spec": { +            "replicas": 1, +            "tieredstore": {"levels": [{"mediumtype": "MEM", "path": "/dev/shm", "quota": "40960", "low": "0.1"}]} +        } +    } + +    api.create_namespaced_custom_object( +        group="data.fluid.io", +        version="v1alpha1", +        namespace=APP_NAMESPACE, +        plural="datasets", +        body=my_dataset, +    ) +    print("Created dataset {}".format(dataset_name)) + +    api.create_namespaced_custom_object( +        group="data.fluid.io", +        version="v1alpha1", +        namespace=APP_NAMESPACE, +        plural="juicefsruntimes", +        body=my_juicefsruntime +    ) +    print("Created juicefs runtime {}".format(dataset_name)) + + +def check_dataset_bound(dataset_name): +    api = client.CustomObjectsApi() + +    count = 0 +    while count < 300: +        resource = api.get_namespaced_custom_object( +            group="data.fluid.io", +            version="v1alpha1", +            name=dataset_name, +            namespace=APP_NAMESPACE, +            plural="datasets" +        ) + +        # a Dataset without status/phase is simply not bound yet +        if resource.get("status", {}).get("phase") == "Bound": +            print("Dataset {} is bound.".format(dataset_name)) +            return True +        time.sleep(1) +        count += 1 +    print("Dataset {} is not bound within 300s.".format(dataset_name)) +    return False + + +def check_volume_resources_ready(dataset_name): +    pv_name = "{}-{}".format(APP_NAMESPACE, dataset_name) +    pvc_name = dataset_name +    count = 0 +    while count < 300: +        count += 1 +        try: +            client.CoreV1Api().read_persistent_volume(name=pv_name) +        except client.exceptions.ApiException as
e: +            if e.status == 404: +                time.sleep(1) +                continue + +        try: +            client.CoreV1Api().read_namespaced_persistent_volume_claim(name=pvc_name, namespace=APP_NAMESPACE) +        except client.exceptions.ApiException as e: +            if e.status == 404: +                time.sleep(1) +                continue + +        print("PersistentVolume {} & PersistentVolumeClaim {} Ready.".format(pv_name, pvc_name)) +        return True +    print("PersistentVolume {} & PersistentVolumeClaim {} not ready within 300s.".format(pv_name, pvc_name)) +    return False + + +def create_data_write_job(dataset_name, job_name, use_sidecar=False): +    pvc_name = dataset_name +    api = client.BatchV1Api() + +    container = client.V1Container( +        name="demo", +        image="debian:buster", +        command=["/bin/bash"], +        args=["-c", "dd if=/dev/zero of=/data/allzero.file bs=100M count=10 && sha256sum /data/allzero.file"], +        volume_mounts=[client.V1VolumeMount(mount_path="/data", name="demo")] +    ) + +    template = client.V1PodTemplateSpec( +        metadata=client.V1ObjectMeta(labels={"app": "datawrite"}), +        spec=client.V1PodSpec( +            restart_policy="Never", +            containers=[container], +            volumes=[client.V1Volume( +                name="demo", +                persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(claim_name=pvc_name) +            )] +        ) +    ) +    if use_sidecar: +        template.metadata.labels["serverless.fluid.io/inject"] = "true" + +    spec = client.V1JobSpec(template=template, backoff_limit=4) + +    job = client.V1Job( +        api_version="batch/v1", +        kind="Job", +        metadata=client.V1ObjectMeta(name=job_name, namespace=APP_NAMESPACE), +        spec=spec +    ) + +    api.create_namespaced_job(namespace=APP_NAMESPACE, body=job) +    print("Job {} created.".format(job_name)) + + +def create_data_read_job(dataset_name, job_name, use_sidecar=False): +    pvc_name = dataset_name +    api = client.BatchV1Api() + +    container = client.V1Container( +        name="demo", +        image="debian:buster", +        command=["/bin/bash"], +        args=["-c", "time sha256sum /data/allzero.file && rm /data/allzero.file"], +        volume_mounts=[client.V1VolumeMount(mount_path="/data", name="demo")] +    ) + +    template = client.V1PodTemplateSpec( +        metadata=client.V1ObjectMeta(labels={"app": "dataread"}), +        spec=client.V1PodSpec( +            restart_policy="Never", +            containers=[container], +            volumes=[client.V1Volume( +                name="demo", +                persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(claim_name=pvc_name) +            )] +        ) +    ) +    if use_sidecar: +        template.metadata.labels["serverless.fluid.io/inject"] = "true" + +    spec = client.V1JobSpec(template=template, backoff_limit=4) + +    job = client.V1Job( +        api_version="batch/v1", +        kind="Job", +        metadata=client.V1ObjectMeta(name=job_name, namespace=APP_NAMESPACE), +        spec=spec +    ) + +    api.create_namespaced_job(namespace=APP_NAMESPACE, body=job) +    print("Data Read Job {} created.".format(job_name)) + + +def check_data_job_status(job_name): +    api = client.BatchV1Api() + +    count = 0 +    while count < 300: +        count += 1 +        response = api.read_namespaced_job_status(name=job_name, namespace=APP_NAMESPACE) +        # a failed job must not be reported as success: callers treat False as a test failure +        if response.status.succeeded is not None: +            print("Job {} completed.".format(job_name)) +            return True +        if response.status.failed is not None: +            print("Job {} failed.".format(job_name)) +            return False +        time.sleep(1) +    print("Job {} not completed within 300s.".format(job_name)) +    return False + + +def clean_job(job_name): +    batch_api = client.BatchV1Api() + +    # See https://github.com/kubernetes-client/python/issues/234 +    body = client.V1DeleteOptions(propagation_policy='Background') +    try: +        batch_api.delete_namespaced_job(name=job_name, namespace=APP_NAMESPACE, body=body) +    except client.exceptions.ApiException as e: +        if e.status == 404:
print("job {} deleted".format(job_name)) + return True + + count = 0 + while count < 300: + count += 1 + print("job {} still exists...".format(job_name)) + try: + batch_api.read_namespaced_job(name=job_name, namespace=APP_NAMESPACE) + except client.exceptions.ApiException as e: + if e.status == 404: + print("job {} deleted".format(job_name)) + return True + time.sleep(1) + + print("job {} not deleted within 300s".format(job_name)) + return False + + +def clean_up_dataset_and_runtime(dataset_name): + custom_api = client.CustomObjectsApi() + custom_api.delete_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + name=dataset_name, + namespace=APP_NAMESPACE, + plural="datasets" + ) + print("Dataset {} deleted".format(dataset_name)) + + count = 0 + while count < 300: + count += 1 + print("JuiceFSRuntime {} still exists...".format(dataset_name)) + try: + custom_api.get_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + name=dataset_name, + namespace=APP_NAMESPACE, + plural="juicefsruntimes" + ) + except client.exceptions.ApiException as e: + if e.status == 404: + print("JuiceFSRuntime {} is cleaned up".format(dataset_name)) + return True + time.sleep(1) + print("JuiceFSRuntime {} is not cleaned up within 300s".format(dataset_name)) + return False + + +def clean_up_secret(): + core_api = client.CoreV1Api() + core_api.delete_namespaced_secret(name=SECRET_NAME, namespace=APP_NAMESPACE) + print("secret {} is cleaned up".format(SECRET_NAME)) + + +def main(): + config.load_incluster_config() + + # ******************************** + # ------- test normal mode ------- + # ******************************** + dataset_name = "jfsdemo" + test_write_job = "demo-write" + test_read_job = "demo-read" + try: + # 1. create secret + create_redis_secret() + + # 2. create dataset and runtime + create_dataset_and_runtime(dataset_name) + if not check_dataset_bound(dataset_name): + raise Exception("dataset {} in normal mode is not bound.".format(dataset_name)) + if not check_volume_resources_ready(dataset_name): + raise Exception("volume resources of dataset {} in normal mode are not ready.".format(dataset_name)) + + # 3. create write & read data job + create_data_write_job(dataset_name, test_write_job) + if not check_data_job_status(test_write_job): + raise Exception("write job {} in normal mode failed.".format(test_write_job)) + create_data_read_job(dataset_name, test_read_job) + if not check_data_job_status(test_read_job): + raise Exception("read job {} in normal mode failed.".format(test_read_job)) + except Exception as e: + print(e) + exit(-1) + finally: + # 4. clean up write & read data job + clean_job(test_write_job) + clean_job(test_read_job) + + # 5. clean up dataset and runtime + clean_up_dataset_and_runtime(dataset_name) + + # 6. clean up secret + clean_up_secret() + + # ******************************** + # ------- test sidecar mode ------- + # ******************************** + dataset_name = "jfsdemo-sidecar" + test_write_job = "demo-write-sidecar" + test_read_job = "demo-read-sidecar" + try: + # 1. create secret + create_redis_secret() + + # 2. create dataset and runtime + create_dataset_and_runtime(dataset_name) + if not check_dataset_bound(dataset_name): + raise Exception("dataset {} in sidecar mode is not bound.".format(dataset_name)) + if not check_volume_resources_ready(dataset_name): + raise Exception("volume resources of dataset {} in sidecar mode are not ready.".format(dataset_name)) + + # 3. 
create write & read data job + create_data_write_job(dataset_name, test_write_job, use_sidecar=True) + if not check_data_job_status(test_write_job): + raise Exception("write job {} in sidecar mode failed.".format(test_write_job)) + create_data_read_job(dataset_name, test_read_job, use_sidecar=True) + if not check_data_job_status(test_read_job): + raise Exception("read job {} in sidecar mode failed.".format(test_read_job)) + except Exception as e: + print(e) + exit(-1) + finally: + # 4. clean up write & read data job + clean_job(test_write_job) + clean_job(test_read_job) + + # 5. clean up dataset and runtime + clean_up_dataset_and_runtime(dataset_name) + + # 6. clean up secret + clean_up_secret() + + +if __name__ == '__main__': + main()
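
Note on the cache-dir volume tests above: the `test-volume-overwrite` case expects nil `wantVolumes`/`wantVolumeMounts` because a cache dir whose mount path is already covered by a user-supplied volume must not receive a second, generated `hostPath` volume. A rough Python paraphrase of the rule those tests assert (illustrative only; the actual logic lives in the Go `transformWorkerCacheVolumes`/`transformFuseCacheVolumes` functions):

```python
# Illustrative paraphrase of the behavior asserted by the Go tests above,
# not the actual implementation.

def transform_cache_volumes(cache_dirs, user_volume_mounts):
    """cache_dirs: {name: host path}; user_volume_mounts: [{"name", "mountPath"}]."""
    taken = {m["mountPath"] for m in user_volume_mounts}
    volumes, mounts = [], []
    for name, path in sorted(cache_dirs.items()):
        if path in taken:
            # the user already mounts something at this path -> skip generation
            continue
        vol_name = "cache-dir-{}".format(name)
        volumes.append({"name": vol_name,
                        "hostPath": {"path": path, "type": "DirectoryOrCreate"}})
        mounts.append({"name": vol_name, "mountPath": path})
    return volumes, mounts

# mirrors "test-normal": no user mounts -> one generated volume + mount
assert transform_cache_volumes({"1": "/cache"}, []) == (
    [{"name": "cache-dir-1", "hostPath": {"path": "/cache", "type": "DirectoryOrCreate"}}],
    [{"name": "cache-dir-1", "mountPath": "/cache"}],
)
# mirrors "test-volume-overwrite": /cache is already mounted -> nothing generated
assert transform_cache_volumes({"1": "/cache"}, [{"name": "cache", "mountPath": "/cache"}]) == ([], [])
```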
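
The `*SecretKey` fields added to `Configs` in pkg/ddc/juicefs/type.go make the key names inside a credentials Secret configurable rather than fixed; the e2e script above already depends on this by storing its credentials under `accesskey`/`secretkey`. A minimal sketch of the user-facing effect, using the same kubernetes client calls as the script; the secret name and the key names `meta-url`, `ak` and `sk` are hypothetical:

```python
from kubernetes import client, config

config.load_kube_config()  # assumes a reachable cluster with Fluid installed

# store credentials under arbitrary (here: hypothetical) key names ...
client.CoreV1Api().create_namespaced_secret(
    namespace="default",
    body={
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": "jfs-custom-secret"},
        "stringData": {"meta-url": "redis://redis:6379/0", "ak": "minioadmin", "sk": "minioadmin"},
    },
)

# ... and reference those key names from the Dataset's encryptOptions; the
# juicefs engine carries them into the chart via the new *SecretKey fields.
client.CustomObjectsApi().create_namespaced_custom_object(
    group="data.fluid.io", version="v1alpha1", namespace="default", plural="datasets",
    body={
        "apiVersion": "data.fluid.io/v1alpha1",
        "kind": "Dataset",
        "metadata": {"name": "jfsdemo-custom-keys"},
        "spec": {
            "mounts": [{
                "mountPoint": "juicefs:///",
                "name": "juicefs-community",
                "options": {"bucket": "http://minio:9000/minio/test", "storage": "minio"},
                "encryptOptions": [
                    {"name": "metaurl", "valueFrom": {"secretKeyRef": {"name": "jfs-custom-secret", "key": "meta-url"}}},
                    {"name": "access-key", "valueFrom": {"secretKeyRef": {"name": "jfs-custom-secret", "key": "ak"}}},
                    {"name": "secret-key", "valueFrom": {"secretKeyRef": {"name": "jfs-custom-secret", "key": "sk"}}},
                ],
            }],
            "accessModes": ["ReadWriteMany"],
        },
    },
)
```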
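
Finally, test/prow/juicefs_access_data.py calls `config.load_incluster_config()` in `main()`, so it only runs as a pod inside the test cluster. For ad-hoc local debugging against the same cluster, a kubeconfig fallback along these lines should work (a sketch, not part of the change):

```python
from kubernetes import config

try:
    config.load_incluster_config()   # running inside a pod (prow job)
except config.ConfigException:
    config.load_kube_config()        # local run: fall back to ~/.kube/config
```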