From 5aae5a03f5ae3bff6b5effd49e5b0026c5cd4e65 Mon Sep 17 00:00:00 2001 From: cheyang Date: Mon, 27 Mar 2023 14:01:58 +0800 Subject: [PATCH] Cherry pick for 0.8.5 (#2785) * [juicefs] fix worker cache when set option (#2563) * fix worker cache when set option Signed-off-by: zwwhdls * update changelog in chart Signed-off-by: zwwhdls * fix unittest Signed-off-by: zwwhdls --------- Signed-off-by: zwwhdls Signed-off-by: cheyang * fix multi cache dir (#2639) * fix multi cache dir Signed-off-by: zwwhdls * fix unit test --------- Signed-off-by: zwwhdls Signed-off-by: cheyang * [Enhancement]CSI plugin checks mount point liveness before binding mount points (#2703) * Clean up broken mount point when NodeStageVolume Signed-off-by: dongyun.xzh * Check mount point aliveness when NodePublishVolume Signed-off-by: dongyun.xzh * Clean up broken mount point when NodeStageVolume Signed-off-by: dongyun.xzh * Fix cleaning logic Signed-off-by: dongyun.xzh --------- Signed-off-by: dongyun.xzh Signed-off-by: cheyang * Prettify error messages for exec.Commands in Fluid (#2718) * Prettify error log message when calling NodePublishVolume Signed-off-by: dongyun.xzh * Prettify error logs when calling helm-related funcs Signed-off-by: dongyun.xzh * Use instead `errors.As` Signed-off-by: dongyun.xzh * Use instead `errors.As` Signed-off-by: dongyun.xzh * Set higher log level for helm exec Signed-off-by: dongyun.xzh --------- Signed-off-by: dongyun.xzh Signed-off-by: cheyang * update mount to check mountinfo, To #48327952 Signed-off-by: cheyang * update mount to check mountinfo, To #48327952 Signed-off-by: cheyang * Build docker images for v0.8.5, To #48327952 Signed-off-by: cheyang --------- Signed-off-by: zwwhdls Signed-off-by: cheyang Signed-off-by: dongyun.xzh Co-authored-by: Weiwei Co-authored-by: TzZtzt --- charts/fluid/fluid/Chart.yaml | 2 +- charts/fluid/fluid/values.yaml | 24 +- charts/juicefs/CHANGELOG.md | 3 + charts/juicefs/Chart.yaml | 2 +- charts/juicefs/templates/fuse/daemonset.yaml | 26 +- .../templates/worker/statefuleset.yaml | 16 - csi/shell/check_mount.sh | 27 +- docs/zh/samples/juicefs/juicefs_cache_dir.md | 57 +++ .../samples/{ => juicefs}/juicefs_for_s3.md | 4 +- .../samples/{ => juicefs}/juicefs_runtime.md | 1 + .../zh/samples/{ => juicefs}/juicefs_setup.md | 0 pkg/csi/plugins/nodeserver.go | 37 +- pkg/ddc/juicefs/operations/base.go | 39 +- pkg/ddc/juicefs/operations/base_test.go | 149 +++++++- pkg/ddc/juicefs/shutdown.go | 44 ++- pkg/ddc/juicefs/shutdown_test.go | 141 ++++++- pkg/ddc/juicefs/transform.go | 6 + pkg/ddc/juicefs/transform_fuse.go | 14 +- pkg/ddc/juicefs/transform_volume.go | 113 +++++- pkg/ddc/juicefs/transform_volume_test.go | 343 +++++++++++++++++- pkg/ddc/juicefs/utils_test.go | 3 +- pkg/scripts/poststart/check_fuse.go | 4 +- pkg/utils/helm/utils.go | 21 +- pkg/utils/mount.go | 26 +- test/prow/juicefs_access_data.py | 91 ++++- 25 files changed, 1083 insertions(+), 110 deletions(-) create mode 100644 docs/zh/samples/juicefs/juicefs_cache_dir.md rename docs/zh/samples/{ => juicefs}/juicefs_for_s3.md (96%) rename docs/zh/samples/{ => juicefs}/juicefs_runtime.md (99%) rename docs/zh/samples/{ => juicefs}/juicefs_setup.md (100%) diff --git a/charts/fluid/fluid/Chart.yaml b/charts/fluid/fluid/Chart.yaml index 53ee3b29d4c..d9b13bc4c43 100644 --- a/charts/fluid/fluid/Chart.yaml +++ b/charts/fluid/fluid/Chart.yaml @@ -18,7 +18,7 @@ version: 0.8.5 # This is the version number of the application being deployed. 
This version number should be # incremented each time you make changes to the application. -appVersion: 0.8.4-885b5a7 +appVersion: 0.8.5-00f609e home: https://github.com/fluid-cloudnative/fluid keywords: - category:data diff --git a/charts/fluid/fluid/values.yaml b/charts/fluid/fluid/values.yaml index 00ff25a07a5..26b0268c903 100644 --- a/charts/fluid/fluid/values.yaml +++ b/charts/fluid/fluid/values.yaml @@ -4,7 +4,7 @@ workdir: /tmp crdUpgrade: - image: fluidcloudnative/fluid-crd-upgrader:v0.8.4-885b5a7 + image: fluidcloudnative/fluid-crd-upgrader:v0.8.5-00f609e image: imagePullSecrets: [] @@ -12,7 +12,7 @@ image: dataset: replicas: 1 controller: - image: fluidcloudnative/dataset-controller:v0.8.4-885b5a7 + image: fluidcloudnative/dataset-controller:v0.8.5-00f609e csi: featureGates: "FuseRecovery=false" @@ -21,7 +21,7 @@ csi: registrar: image: registry.aliyuncs.com/acs/csi-node-driver-registrar:v2.3.0-038aeb6-aliyun plugins: - image: fluidcloudnative/fluid-csi:v0.8.4-885b5a7 + image: fluidcloudnative/fluid-csi:v0.8.5-00f609e kubelet: rootDir: /var/lib/kubelet pruneFs: fuse.alluxio-fuse,fuse.jindofs-fuse,fuse.juicefs,fuse.goosefs-fuse,ossfs @@ -37,9 +37,9 @@ runtime: portAllocatePolicy: random enabled: false init: - image: fluidcloudnative/init-users:v0.8.4-885b5a7 + image: fluidcloudnative/init-users:v0.8.5-00f609e controller: - image: fluidcloudnative/alluxioruntime-controller:v0.8.4-885b5a7 + image: fluidcloudnative/alluxioruntime-controller:v0.8.5-00f609e runtime: # image: fluidcloudnative/alluxio:release-2.7.3-SNAPSHOT-a7154f1 image: fluidcloudnative/alluxio:release-2.8.1-SNAPSHOT-0433ade @@ -59,11 +59,11 @@ runtime: fuse: image: registry.cn-shanghai.aliyuncs.com/jindofs/jindo-fuse:4.5.1 controller: - image: fluidcloudnative/jindoruntime-controller:v0.8.4-885b5a7 + image: fluidcloudnative/jindoruntime-controller:v0.8.5-00f609e init: portCheck: enabled: false - image: fluidcloudnative/init-users:v0.8.4-885b5a7 + image: fluidcloudnative/init-users:v0.8.5-00f609e goosefs: replicas: 1 runtimeWorkers: 3 @@ -71,9 +71,9 @@ runtime: portAllocatePolicy: random enabled: false init: - image: fluidcloudnative/init-users:v0.8.4-885b5a7 + image: fluidcloudnative/init-users:v0.8.5-00f609e controller: - image: fluidcloudnative/goosefsruntime-controller:v0.8.4-885b5a7 + image: fluidcloudnative/goosefsruntime-controller:v0.8.5-00f609e runtime: image: ccr.ccs.tencentyun.com/qcloud/goosefs:v1.2.0 fuse: @@ -82,17 +82,17 @@ runtime: replicas: 1 enabled: false controller: - image: fluidcloudnative/juicefsruntime-controller:v0.8.4-885b5a7 + image: fluidcloudnative/juicefsruntime-controller:v0.8.5-00f609e fuse: image: juicedata/juicefs-fuse:v1.0.0-4.8.0 webhook: enabled: true - image: fluidcloudnative/fluid-webhook:v0.8.4-885b5a7 + image: fluidcloudnative/fluid-webhook:v0.8.5-00f609e replicas: 1 fluidapp: enabled: true replicas: 1 controller: - image: fluidcloudnative/application-controller:v0.8.4-885b5a7 + image: fluidcloudnative/application-controller:v0.8.5-00f609e diff --git a/charts/juicefs/CHANGELOG.md b/charts/juicefs/CHANGELOG.md index 98fd0057abb..a132ca5c441 100644 --- a/charts/juicefs/CHANGELOG.md +++ b/charts/juicefs/CHANGELOG.md @@ -44,3 +44,6 @@ Support configurable tieredstore's volume type 0.2.11 - Support credential key in secret + +0.2.12 +- Set cache dir in volumes & volumeMounts for worker & fuse diff --git a/charts/juicefs/Chart.yaml b/charts/juicefs/Chart.yaml index 39181ec0ffb..11a20831553 100644 --- a/charts/juicefs/Chart.yaml +++ b/charts/juicefs/Chart.yaml @@ -1,7 +1,7 @@ name: 
juicefs apiVersion: v1 description: FileSystem aimed for data analytics and machine learning in any cloud. -version: 0.2.11 +version: 0.2.12 appVersion: v1.0.0 home: https://juicefs.com/ maintainers: diff --git a/charts/juicefs/templates/fuse/daemonset.yaml b/charts/juicefs/templates/fuse/daemonset.yaml index 976eb258625..5377c06d5aa 100644 --- a/charts/juicefs/templates/fuse/daemonset.yaml +++ b/charts/juicefs/templates/fuse/daemonset.yaml @@ -137,15 +137,11 @@ spec: exec: command: ["sh", "-c", "umount {{ .Values.fuse.mountPath }}"] volumeMounts: - - name: juicefs-fuse-mount - mountPath: {{ .Values.fuse.hostMountPath }} - mountPropagation: Bidirectional - - mountPath: /root/script - name: script - {{- range $name, $mount := .Values.cacheDirs }} - - name: cache-dir-{{ $name }} - mountPath: "{{ $mount.path }}" - {{- end }} + - name: juicefs-fuse-mount + mountPath: {{ .Values.fuse.hostMountPath }} + mountPropagation: Bidirectional + - mountPath: /root/script + name: script {{- if .Values.fuse.volumeMounts }} {{ toYaml .Values.fuse.volumeMounts | indent 12 }} {{- end }} @@ -155,18 +151,6 @@ spec: hostPath: path: {{ .Values.fuse.hostMountPath }} type: DirectoryOrCreate - {{- range $name, $mount := .Values.cacheDirs }} - {{- if eq $mount.type "hostPath" }} - - hostPath: - path: "{{ $mount.path }}" - type: DirectoryOrCreate - name: cache-dir-{{ $name }} - {{- else if eq $mount.type "emptyDir" }} - - emptyDir: {} - name: cache-dir-{{ $name }} - {{- /* todo: support volume template */}} - {{- end }} - {{- end }} - name: script configMap: name: {{ template "juicefs.fullname" . }}-fuse-script diff --git a/charts/juicefs/templates/worker/statefuleset.yaml b/charts/juicefs/templates/worker/statefuleset.yaml index 735d1881927..1d9f5d5fb2c 100644 --- a/charts/juicefs/templates/worker/statefuleset.yaml +++ b/charts/juicefs/templates/worker/statefuleset.yaml @@ -126,27 +126,11 @@ spec: volumeMounts: - mountPath: /root/script name: script - {{- range $name, $mount := .Values.cacheDirs }} - - name: cache-dir-{{ $name }} - mountPath: "{{ $mount.path }}" - {{- end }} {{- if .Values.worker.volumeMounts }} {{ toYaml .Values.worker.volumeMounts | indent 12 }} {{- end }} restartPolicy: Always volumes: - {{- range $name, $mount := .Values.cacheDirs }} - {{- if eq $mount.type "hostPath" }} - - hostPath: - path: "{{ $mount.path }}" - type: DirectoryOrCreate - name: cache-dir-{{ $name }} - {{- else if eq $mount.type "emptyDir" }} - - emptyDir: {} - name: cache-dir-{{ $name }} - {{- /* todo: support volume template */}} - {{- end }} - {{- end }} - name: script configMap: name: {{ template "juicefs.fullname" . }}-worker-script diff --git a/csi/shell/check_mount.sh b/csi/shell/check_mount.sh index cd364bc2549..1d9dfb94a24 100644 --- a/csi/shell/check_mount.sh +++ b/csi/shell/check_mount.sh @@ -4,19 +4,38 @@ set -ex ConditionPathIsMountPoint="$1" MountType="$2" +SubPath="$3" + #[ -z ${ConditionPathIsMountPoint} ] && ConditionPathIsMountPoint=/alluxio-fuse count=0 -# while ! mount | grep alluxio | grep $ConditionPathIsMountPoint | grep -v grep -while ! mount | grep $ConditionPathIsMountPoint | grep $MountType +# while ! cat /proc/self/mountinfo | grep alluxio | grep $ConditionPathIsMountPoint | grep -v grep +while ! cat /proc/self/mountinfo | grep $ConditionPathIsMountPoint | grep $MountType do sleep 3 count=`expr $count + 1` if test $count -eq 10 then - echo "timed out!" 
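(Editor's note, not part of the patch: the revised `check_mount.sh` polls `/proc/self/mountinfo` instead of the `mount` command, exiting 1 when the mount point never appears within the timeout and, in the following hunk, 2 when the requested sub path is missing. A minimal Go sketch of the same substring-based mountinfo check is shown below; the function name and sample arguments are illustrative and not part of the Fluid code base.)

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
)

// isMounted reports whether a line in /proc/self/mountinfo mentions both the
// mount point and the expected filesystem type (e.g. "fuse.juicefs"),
// mirroring the grep-based readiness loop in check_mount.sh.
func isMounted(mountPoint, mountType string) (bool, error) {
	f, err := os.Open("/proc/self/mountinfo")
	if err != nil {
		return false, err
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		line := scanner.Text()
		if strings.Contains(line, mountPoint) && strings.Contains(line, mountType) {
			return true, nil
		}
	}
	return false, scanner.Err()
}

func main() {
	// Example values only; real paths are passed in by the CSI plugin.
	ready, err := isMounted("/runtime-mnt/juicefs/default/jfsdemo/juicefs-fuse", "fuse.juicefs")
	fmt.Println(ready, err)
}
```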
+ echo "timed out waiting for $ConditionPathIsMountPoint mounted" exit 1 fi done -echo "succeed in checking mount point $ConditionPathIsMountPoint" \ No newline at end of file +count=0 +while ! stat $ConditionPathIsMountPoint +do + sleep 3 + count=`expr $count + 1` + if test $count -eq 10 + then + echo "timed out stating $ConditionPathIsMountPoint returns ready" + exit 1 + fi +done + +if [ ! -e $ConditionPathIsMountPoint/$SubPath ] ; then + echo "sub path [$SubPath] not exist!" + exit 2 +fi + +echo "succeed in checking mount point $ConditionPathIsMountPoint" diff --git a/docs/zh/samples/juicefs/juicefs_cache_dir.md b/docs/zh/samples/juicefs/juicefs_cache_dir.md new file mode 100644 index 00000000000..1baf451e1da --- /dev/null +++ b/docs/zh/samples/juicefs/juicefs_cache_dir.md @@ -0,0 +1,57 @@ +# JuiceFSRuntime 缓存配置 + +如何在 Fluid 中使用 JuiceFS,请参考文档[示例 - 如何在 Fluid 中使用 JuiceFS](juicefs_runtime.md)。本文讲述所有在 Fluid 中有关 JuiceFS 的缓存相关配置。 + +## 设置多个路径缓存 + +缓存路径在 JuiceFSRuntime 中的 tiredstore 设置,worker 和 fuse pod 共享相同的配置。 + +注意:JuiceFS 支持多路径缓存,不支持多级缓存。 + +```yaml +apiVersion: data.fluid.io/v1alpha1 +kind: JuiceFSRuntime +metadata: + name: jfsdemo +spec: + replicas: 1 + tieredstore: + levels: + - mediumtype: SSD + path: /mnt/cache1:/mnt/cache2 + quota: 40Gi + low: "0.1" +``` + +其中: +- `spec.tiredstore.levels.path` 可设置为多个路径,以 `:` 分隔,缓存会被分配在这里设置的所有路径下;但不支持通配符; +- `spec.tiredstore.levels.quota` 为缓存对象的总大小,与路径多少无关; +- `spec.tiredstore.levels.low` 为缓存路径的最小剩余空间比例,无论缓存是否达到限额,都会保证缓存路径的剩余空间; +- `spec.tiredstore.levels.mediumtype` 为缓存路径的类型,目前支持 `SSD` 和 `MEM`。 + + +## 单独设置 worker 的缓存路径 + +默认情况下,worker 和 fuse 的缓存路径都在 `spec.tiredstore.levels.path` 中设置,但是也可以单独设置 worker 的缓存路径。 + +```yaml +apiVersion: data.fluid.io/v1alpha1 +kind: JuiceFSRuntime +metadata: + name: jfsdemo +spec: + worker: + options: + "cache-dir": "/mnt/cache1:/mnt/cache2" + tieredstore: + levels: + - mediumtype: MEM + path: /dev/shm + quota: 500Mi + low: "0.1" +``` + +其中: +- `spec.worker.options` 为 worker 的挂载参数,缓存路径以 `cache-dir` 为 key,以 `:` 分隔的多个路径; + + diff --git a/docs/zh/samples/juicefs_for_s3.md b/docs/zh/samples/juicefs/juicefs_for_s3.md similarity index 96% rename from docs/zh/samples/juicefs_for_s3.md rename to docs/zh/samples/juicefs/juicefs_for_s3.md index 8dc48497daf..4ec6ac3f1d7 100644 --- a/docs/zh/samples/juicefs_for_s3.md +++ b/docs/zh/samples/juicefs/juicefs_for_s3.md @@ -7,7 +7,7 @@ Redis、MySQL、TiKV 等多种数据库中。 ## 部署 JuiceFSRuntime 环境 -具体部署方法参考文档 [如何在 Fluid 中使用 JuiceFS](./juicefs_runtime.md)。 +具体部署方法参考文档 [如何在 Fluid 中使用 JuiceFS](juicefs_runtime.md)。 在 JuiceFSRuntime 和 Dataset 创建成功后,等待 worker pod 启动成功,再进行下面的步骤。 @@ -63,4 +63,4 @@ root@jfsdemo-worker-0:~# 可以看到 bucket 中的文件已经被同步到了 JuiceFS 中。 -最后创建业务 Pod,其中 Pod 使用上面创建的 `Dataset` 的方式为指定同名的 PVC。该步骤与文档 [如何在 Fluid 中使用 JuiceFS](./juicefs_runtime.md) 中一致,这里不再赘述。 +最后创建业务 Pod,其中 Pod 使用上面创建的 `Dataset` 的方式为指定同名的 PVC。该步骤与文档 [如何在 Fluid 中使用 JuiceFS](juicefs_runtime.md) 中一致,这里不再赘述。 diff --git a/docs/zh/samples/juicefs_runtime.md b/docs/zh/samples/juicefs/juicefs_runtime.md similarity index 99% rename from docs/zh/samples/juicefs_runtime.md rename to docs/zh/samples/juicefs/juicefs_runtime.md index fbc941e51dc..e24b5da472a 100644 --- a/docs/zh/samples/juicefs_runtime.md +++ b/docs/zh/samples/juicefs/juicefs_runtime.md @@ -8,6 +8,7 @@ JuiceFS 是一款面向云环境设计的开源高性能共享文件系统,提 ## 安装 + 您可以从 [Fluid Releases](https://github.com/fluid-cloudnative/fluid/releases) 下载最新的 Fluid 安装包。 在 Fluid 的安装 chart values.yaml 中将 `runtime.juicefs.enable` 设置为 `true`,再参考 [安装文档](../userguide/install.md) 完成安装。并检查 Fluid 各组件正常运行: diff --git 
a/docs/zh/samples/juicefs_setup.md b/docs/zh/samples/juicefs/juicefs_setup.md similarity index 100% rename from docs/zh/samples/juicefs_setup.md rename to docs/zh/samples/juicefs/juicefs_setup.md diff --git a/pkg/csi/plugins/nodeserver.go b/pkg/csi/plugins/nodeserver.go index c8a3bf51ed4..4a09ebb7ed0 100644 --- a/pkg/csi/plugins/nodeserver.go +++ b/pkg/csi/plugins/nodeserver.go @@ -31,6 +31,7 @@ import ( "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" "github.com/pkg/errors" v1 "k8s.io/api/core/v1" + "k8s.io/utils/mount" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/container-storage-interface/spec/lib/go/csi" @@ -39,7 +40,6 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "k8s.io/utils/mount" ) const ( @@ -117,7 +117,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis // 1. Wait the runtime fuse ready err = utils.CheckMountReady(fluidPath, mountType) if err != nil { - return nil, status.Error(codes.InvalidArgument, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } args := []string{"--bind"} @@ -262,6 +262,12 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol defer ns.mutex.Unlock() glog.Infof("NodeStageVolume: Starting NodeStage with VolumeId: %s, and VolumeContext: %v", req.GetVolumeId(), req.VolumeContext) + // 1. clean up broken mount point + fluidPath := req.GetVolumeContext()[common.VolumeAttrFluidPath] + if ignoredErr := cleanUpBrokenMountPoint(fluidPath); ignoredErr != nil { + glog.Warningf("Ignoring error when cleaning up broken mount point %v: %v", fluidPath, ignoredErr) + } + // 1. get runtime namespace and name namespace, name, err := ns.getRuntimeNamespacedName(req.GetVolumeContext(), req.GetVolumeId()) if err != nil { @@ -381,3 +387,30 @@ func checkMountInUse(volumeName string) (bool, error) { return inUse, err } + +// cleanUpBrokenMountPoint stats the given mountPoint and umounts it if it's broken mount point(i.e. Stat with errNo 107[Trasport Endpoint is not Connected]). +func cleanUpBrokenMountPoint(mountPoint string) error { + _, err := os.Stat(mountPoint) + if err != nil { + if os.IsNotExist(err) { + return nil + } + + if pathErr, ok := err.(*os.PathError); ok { + if errNo, ok := pathErr.Err.(syscall.Errno); ok { + if errNo == syscall.ENOTCONN { + mounter := mount.New(mountPoint) + if err := mounter.Unmount(mountPoint); err != nil { + return errors.Wrapf(mounter.Unmount(mountPoint), "failed to unmount %s", mountPoint) + } + glog.Infof("Found broken mount point %s, successfully umounted it", mountPoint) + return nil + } + } + } + + return errors.Wrapf(err, "failed to os.Stat(%s)", mountPoint) + } + + return nil +} diff --git a/pkg/ddc/juicefs/operations/base.go b/pkg/ddc/juicefs/operations/base.go index 3ca7b0f0f2f..b8abd74a9b4 100644 --- a/pkg/ddc/juicefs/operations/base.go +++ b/pkg/ddc/juicefs/operations/base.go @@ -184,8 +184,39 @@ func (j JuiceFileUtils) Mkdir(juiceSubPath string) (err error) { return } -// DeleteDir delete dir in pod -func (j JuiceFileUtils) DeleteDir(dir string) (err error) { +// DeleteCacheDirs delete cache dir in pod +func (j JuiceFileUtils) DeleteCacheDirs(dirs []string) (err error) { + for _, dir := range dirs { + // cache dir check + match := ValidCacheDir(dir) + if !match { + j.log.Info("invalid cache directory, skip cleaning up", "cacheDir", dir) + return + } + } + var ( + command = []string{"rm", "-rf"} + stdout string + stderr string + ) + command = append(command, dirs...) 
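(Editor's note, not part of the patch: the NodeStageVolume change above detects a dead FUSE mount by treating a stat() failure with ENOTCONN, "transport endpoint is not connected", as a broken mount point and unmounting it before re-binding. The sketch below restates that logic with simplified error wrapping via `fmt.Errorf`; names are illustrative rather than the exact Fluid implementation.)

```go
package main

import (
	"fmt"
	"os"
	"syscall"

	"k8s.io/utils/mount"
)

// cleanUpBrokenMountPoint stats the mount point and unmounts it when stat
// fails with ENOTCONN, which is what a dead FUSE mount returns.
func cleanUpBrokenMountPoint(mountPoint string) error {
	_, err := os.Stat(mountPoint)
	if err == nil || os.IsNotExist(err) {
		return nil // healthy, or simply absent: nothing to clean up
	}
	if pathErr, ok := err.(*os.PathError); ok {
		if errno, ok := pathErr.Err.(syscall.Errno); ok && errno == syscall.ENOTCONN {
			mounter := mount.New(mountPoint)
			if umountErr := mounter.Unmount(mountPoint); umountErr != nil {
				return fmt.Errorf("failed to unmount broken mount point %s: %w", mountPoint, umountErr)
			}
			return nil
		}
	}
	return fmt.Errorf("failed to stat %s: %w", mountPoint, err)
}

func main() {
	// Example path only.
	fmt.Println(cleanUpBrokenMountPoint("/runtime-mnt/juicefs/default/jfsdemo/juicefs-fuse"))
}
```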
+ + stdout, stderr, err = j.exec(command, true) + if err != nil { + err = fmt.Errorf("execute command %v with expectedErr: %v stdout %s and stderr %s", command, err, stdout, stderr) + return + } + return +} + +// DeleteCacheDir delete cache dir in pod +func (j JuiceFileUtils) DeleteCacheDir(dir string) (err error) { + // cache dir check + match := ValidCacheDir(dir) + if !match { + j.log.Info("invalid cache directory, skip cleaning up", "cacheDir", dir) + return + } var ( command = []string{"rm", "-rf", dir} stdout string @@ -349,3 +380,7 @@ func (j JuiceFileUtils) QueryMetaDataInfoIntoFile(key KeyOfMetaDataFile, filenam } return } + +func ValidCacheDir(dir string) (match bool) { + return strings.HasSuffix(dir, "raw/chunks") +} diff --git a/pkg/ddc/juicefs/operations/base_test.go b/pkg/ddc/juicefs/operations/base_test.go index 3347e99d570..934fa3bc20b 100644 --- a/pkg/ddc/juicefs/operations/base_test.go +++ b/pkg/ddc/juicefs/operations/base_test.go @@ -233,7 +233,7 @@ func TestJuiceFileUtils_GetMetric(t *testing.T) { wrappedUnhookExec() } -func TestJuiceFileUtils_DeleteDir(t *testing.T) { +func TestJuiceFileUtils_DeleteCacheDirs(t *testing.T) { ExecCommon := func(a JuiceFileUtils, command []string, verbose bool) (stdout string, stderr string, err error) { return "juicefs rmr success", "", nil } @@ -252,7 +252,7 @@ func TestJuiceFileUtils_DeleteDir(t *testing.T) { t.Fatal(err.Error()) } a := JuiceFileUtils{} - err = a.DeleteDir("") + err = a.DeleteCacheDirs([]string{"/tmp/raw/chunks"}) if err == nil { t.Error("check failure, want err, got nil") } @@ -262,13 +262,51 @@ func TestJuiceFileUtils_DeleteDir(t *testing.T) { if err != nil { t.Fatal(err.Error()) } - err = a.DeleteDir("") + err = a.DeleteCacheDirs([]string{"/tmp/raw/chunks"}) if err != nil { t.Errorf("check failure, want nil, got err: %v", err) } wrappedUnhookExec() } +func TestJuiceFileUtils_DeleteCacheDir(t *testing.T) { + ExecCommon := func(a JuiceFileUtils, command []string, verbose bool) (stdout string, stderr string, err error) { + return "juicefs rmr success", "", nil + } + ExecErr := func(a JuiceFileUtils, command []string, verbose bool) (stdout string, stderr string, err error) { + return "", "", errors.New("fail to run the command") + } + wrappedUnhookExec := func() { + err := gohook.UnHook(JuiceFileUtils.exec) + if err != nil { + t.Fatal(err.Error()) + } + } + + a := JuiceFileUtils{} + // no error + err := gohook.Hook(JuiceFileUtils.exec, ExecCommon, nil) + if err != nil { + t.Fatal(err.Error()) + } + err = a.DeleteCacheDir("/tmp/raw/chunks") + if err != nil { + t.Errorf("check failure, want nil, got err: %v", err) + } + wrappedUnhookExec() + + // error + err = gohook.Hook(JuiceFileUtils.exec, ExecErr, nil) + if err != nil { + t.Fatal(err.Error()) + } + err = a.DeleteCacheDir("/tmp/raw/chunks") + if err == nil { + t.Error("check failure, want err, got nil") + } + wrappedUnhookExec() +} + func TestJuiceFileUtils_GetStatus(t *testing.T) { ExecCommon := func(a JuiceFileUtils, command []string, verbose bool) (stdout string, stderr string, err error) { return CommonStatus, "", nil @@ -288,7 +326,7 @@ func TestJuiceFileUtils_GetStatus(t *testing.T) { t.Fatal(err.Error()) } a := JuiceFileUtils{} - err = a.DeleteDir("") + err = a.DeleteCacheDir("/tmp/raw/chunks") if err == nil { t.Error("check failure, want err, got nil") } @@ -504,3 +542,106 @@ func TestAlluxioFileUtils_QueryMetaDataInfoIntoFile(t *testing.T) { } wrappedUnhookExec() } + +func TestValidDir(t *testing.T) { + type args struct { + dir string + } + tests := []struct { 
+ name string + args args + wantMatch bool + }{ + { + name: "test-normal", + args: args{ + dir: "/tmp/raw/chunks", + }, + wantMatch: true, + }, + { + name: "test1", + args: args{ + dir: "/t mp/raw/chunks", + }, + wantMatch: true, + }, + { + name: "test2", + args: args{ + dir: "/t..mp/raw/chunks", + }, + wantMatch: true, + }, + { + name: "test3", + args: args{ + dir: "/t__mp/raw/chunks", + }, + wantMatch: true, + }, + { + name: "test4", + args: args{ + dir: "/t--mp/raw/chunks", + }, + wantMatch: true, + }, + { + name: "test5", + args: args{ + dir: "/", + }, + wantMatch: false, + }, + { + name: "test6", + args: args{ + dir: ".", + }, + wantMatch: false, + }, + { + name: "test7", + args: args{ + dir: "/tttt/raw/chunks", + }, + wantMatch: true, + }, + { + name: "test8", + args: args{ + dir: "//", + }, + wantMatch: false, + }, + { + name: "test9", + args: args{ + dir: "/0/raw/chunks", + }, + wantMatch: true, + }, + { + name: "test10", + args: args{ + dir: "/0/1/raw/chunks", + }, + wantMatch: true, + }, + { + name: "test11", + args: args{ + dir: "/a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z/0/raw/chunks", + }, + wantMatch: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if gotMatch := ValidCacheDir(tt.args.dir); gotMatch != tt.wantMatch { + t.Errorf("ValidDir() = %v, want %v", gotMatch, tt.wantMatch) + } + }) + } +} diff --git a/pkg/ddc/juicefs/shutdown.go b/pkg/ddc/juicefs/shutdown.go index 0428d867602..876c780c6e7 100644 --- a/pkg/ddc/juicefs/shutdown.go +++ b/pkg/ddc/juicefs/shutdown.go @@ -29,6 +29,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/yaml" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/juicefs/operations" "github.com/fluid-cloudnative/fluid/pkg/utils" @@ -87,14 +88,7 @@ func (j *JuiceFSEngine) cleanupCache() (err error) { } j.Log.Info("get runtime info", "runtime", runtime) - cacheDir := common.JuiceFSDefaultCacheDir - if len(runtime.Spec.TieredStore.Levels) != 0 { - if runtime.Spec.TieredStore.Levels[0].MediumType == common.Memory { - j.Log.Info("cache in memory, skip clean up cache") - return - } - cacheDir = runtime.Spec.TieredStore.Levels[0].Path - } + cacheDirs := j.getCacheDirs(runtime) workerName := j.getWorkerName() pods, err := j.GetRunningPodsOfStatefulSet(workerName, j.namespace) @@ -118,9 +112,13 @@ func (j *JuiceFSEngine) cleanupCache() (err error) { for _, pod := range pods { fileUtils := operations.NewJuiceFileUtils(pod.Name, common.JuiceFSWorkerContainer, j.namespace, j.Log) - j.Log.Info("Remove cache in worker pod", "pod", pod.Name, "cache", cacheDir) - cacheDirToBeDeleted := filepath.Join(cacheDir, uuid, "raw/chunks") - err := fileUtils.DeleteDir(cacheDirToBeDeleted) + j.Log.Info("Remove cache in worker pod", "pod", pod.Name, "cache", cacheDirs) + + cacheDirsToBeDeleted := []string{} + for _, cacheDir := range cacheDirs { + cacheDirsToBeDeleted = append(cacheDirsToBeDeleted, filepath.Join(cacheDir, uuid, "raw/chunks")) + } + err := fileUtils.DeleteCacheDirs(cacheDirsToBeDeleted) if err != nil { return err } @@ -128,6 +126,30 @@ func (j *JuiceFSEngine) cleanupCache() (err error) { return nil } +func (j *JuiceFSEngine) getCacheDirs(runtime *datav1alpha1.JuiceFSRuntime) (cacheDirs []string) { + cacheDir := common.JuiceFSDefaultCacheDir + if len(runtime.Spec.TieredStore.Levels) != 0 { + cacheDir = "" + // if cache type hostpath, clean it + if runtime.Spec.TieredStore.Levels[0].VolumeType == 
common.VolumeTypeHostPath { + cacheDir = runtime.Spec.TieredStore.Levels[0].Path + } + } + if cacheDir != "" { + cacheDirs = strings.Split(cacheDir, ":") + } + + // if cache-dir is set in worker option, it will override the cache-dir of worker in runtime + workerOptions := runtime.Spec.Worker.Options + for k, v := range workerOptions { + if k == "cache-dir" { + cacheDirs = append(cacheDirs, strings.Split(v, ":")...) + break + } + } + return +} + func (j *JuiceFSEngine) getUUID(pod corev1.Pod, containerName string) (uuid string, err error) { cm, err := j.GetValuesConfigMap() if err != nil { diff --git a/pkg/ddc/juicefs/shutdown_test.go b/pkg/ddc/juicefs/shutdown_test.go index 5d5a334b7a5..ce2286b5a80 100644 --- a/pkg/ddc/juicefs/shutdown_test.go +++ b/pkg/ddc/juicefs/shutdown_test.go @@ -21,22 +21,25 @@ import ( "reflect" "testing" - "github.com/fluid-cloudnative/fluid/pkg/ctrl" "github.com/go-logr/logr" - "github.com/fluid-cloudnative/fluid/pkg/common" - "github.com/fluid-cloudnative/fluid/pkg/ddc/juicefs/operations" + "github.com/fluid-cloudnative/fluid/pkg/ctrl" + apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/ddc/juicefs/operations" + . "github.com/agiledragon/gomonkey/v2" "github.com/brahma-adshonor/gohook" - "github.com/fluid-cloudnative/fluid/pkg/utils/fake" . "github.com/smartystreets/goconvey/convey" corev1 "k8s.io/api/core/v1" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -415,8 +418,8 @@ func TestJuiceFSEngine_cleanupCache(t *testing.T) { return r, nil }) defer patch1.Reset() - patch2 := ApplyMethod(reflect.TypeOf(operations.JuiceFileUtils{}), "DeleteDir", - func(_ operations.JuiceFileUtils, cacheDir string) error { + patch2 := ApplyMethod(reflect.TypeOf(operations.JuiceFileUtils{}), "DeleteCacheDirs", + func(_ operations.JuiceFileUtils, cacheDirs []string) error { return nil }) defer patch2.Reset() @@ -441,7 +444,7 @@ func TestJuiceFSEngine_cleanupCache(t *testing.T) { return r, nil }) defer patch1.Reset() - patch2 := ApplyMethod(reflect.TypeOf(operations.JuiceFileUtils{}), "DeleteDir", + patch2 := ApplyMethod(reflect.TypeOf(operations.JuiceFileUtils{}), "DeleteCacheDir", func(_ operations.JuiceFileUtils, cacheDir string) error { return errors.New("delete dir error") }) @@ -466,7 +469,7 @@ func TestJuiceFSEngine_cleanupCache(t *testing.T) { return r, nil }) defer patch1.Reset() - patch2 := ApplyMethod(reflect.TypeOf(operations.JuiceFileUtils{}), "DeleteDir", + patch2 := ApplyMethod(reflect.TypeOf(operations.JuiceFileUtils{}), "DeleteCacheDir", func(_ operations.JuiceFileUtils, cacheDir string) error { return errors.New("delete dir error") }) @@ -667,3 +670,125 @@ func TestJuiceFSEngine_cleanAll(t *testing.T) { }) } } + +func TestJuiceFSEngine_getCacheDirs(t *testing.T) { + type args struct { + runtime *datav1alpha1.JuiceFSRuntime + } + tests := []struct { + name string + args args + wantCacheDirs []string + }{ + { + name: "test-default", + args: args{ + runtime: &datav1alpha1.JuiceFSRuntime{}, + }, + wantCacheDirs: []string{"/var/jfsCache"}, + }, + { + name: "test-hostpath", + args: args{ + runtime: &datav1alpha1.JuiceFSRuntime{ + Spec: datav1alpha1.JuiceFSRuntimeSpec{ + TieredStore: datav1alpha1.TieredStore{ + Levels: []datav1alpha1.Level{{ + MediumType: common.Memory, + 
VolumeType: common.VolumeTypeHostPath, + Path: "/mnt/ramdisk", + }}, + }, + }, + }, + }, + wantCacheDirs: []string{"/mnt/ramdisk"}, + }, + { + name: "test-emptydir", + args: args{ + runtime: &datav1alpha1.JuiceFSRuntime{ + Spec: datav1alpha1.JuiceFSRuntimeSpec{ + TieredStore: datav1alpha1.TieredStore{ + Levels: []datav1alpha1.Level{{ + MediumType: common.Memory, + VolumeType: common.VolumeTypeEmptyDir, + Path: "/mnt/ramdisk", + }}, + }, + }, + }, + }, + wantCacheDirs: nil, + }, + { + name: "test-multipath-tiredstore", + args: args{ + runtime: &datav1alpha1.JuiceFSRuntime{ + Spec: datav1alpha1.JuiceFSRuntimeSpec{ + TieredStore: datav1alpha1.TieredStore{ + Levels: []datav1alpha1.Level{{ + MediumType: common.Memory, + VolumeType: common.VolumeTypeHostPath, + Path: "/mnt/ramdisk:/mnt/ramdisk2", + }}, + }, + }, + }, + }, + wantCacheDirs: []string{"/mnt/ramdisk", "/mnt/ramdisk2"}, + }, + { + name: "test-worker-cache", + args: args{ + runtime: &datav1alpha1.JuiceFSRuntime{ + Spec: datav1alpha1.JuiceFSRuntimeSpec{ + TieredStore: datav1alpha1.TieredStore{ + Levels: []datav1alpha1.Level{{ + MediumType: common.Memory, + VolumeType: common.VolumeTypeHostPath, + Path: "/mnt/ramdisk:/mnt/ramdisk2", + }}, + }, + Worker: datav1alpha1.JuiceFSCompTemplateSpec{ + Options: map[string]string{ + "cache-dir": "/worker/ramdisk", + }, + }, + }, + }, + }, + wantCacheDirs: []string{"/mnt/ramdisk", "/mnt/ramdisk2", "/worker/ramdisk"}, + }, + { + name: "test-worker-multi-cache", + args: args{ + runtime: &datav1alpha1.JuiceFSRuntime{ + Spec: datav1alpha1.JuiceFSRuntimeSpec{ + TieredStore: datav1alpha1.TieredStore{ + Levels: []datav1alpha1.Level{{ + MediumType: common.Memory, + VolumeType: common.VolumeTypeHostPath, + Path: "/mnt/ramdisk:/mnt/ramdisk2", + }}, + }, + Worker: datav1alpha1.JuiceFSCompTemplateSpec{ + Options: map[string]string{ + "cache-dir": "/worker/ramdisk1:/worker/ramdisk2", + }, + }, + }, + }, + }, + wantCacheDirs: []string{"/mnt/ramdisk", "/mnt/ramdisk2", "/worker/ramdisk1", "/worker/ramdisk2"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + j := &JuiceFSEngine{} + if gotCacheDirs := j.getCacheDirs(tt.args.runtime); !reflect.DeepEqual(gotCacheDirs, tt.wantCacheDirs) { + t.Errorf("getCacheDirs() = %v, want %v", gotCacheDirs, tt.wantCacheDirs) + } + }) + } +} diff --git a/pkg/ddc/juicefs/transform.go b/pkg/ddc/juicefs/transform.go index 96084e3c12b..9cee122d987 100644 --- a/pkg/ddc/juicefs/transform.go +++ b/pkg/ddc/juicefs/transform.go @@ -109,6 +109,12 @@ func (j *JuiceFSEngine) transformWorkers(runtime *datav1alpha1.JuiceFSRuntime, v if err != nil { j.Log.Error(err, "failed to transform volumes for worker") } + // transform cache volumes for worker + err = j.transformWorkerCacheVolumes(runtime, value) + if err != nil { + j.Log.Error(err, "failed to transform cache volumes for worker") + return err + } // parse work pod network mode value.Worker.HostNetwork = datav1alpha1.IsHostNetwork(runtime.Spec.Worker.NetworkMode) diff --git a/pkg/ddc/juicefs/transform_fuse.go b/pkg/ddc/juicefs/transform_fuse.go index 5f11bb19076..ffb6c942e97 100644 --- a/pkg/ddc/juicefs/transform_fuse.go +++ b/pkg/ddc/juicefs/transform_fuse.go @@ -81,6 +81,12 @@ func (j *JuiceFSEngine) transformFuse(runtime *datav1alpha1.JuiceFSRuntime, data j.Log.Error(err, "failed to transform volumes for fuse") return err } + // transform cache volumes for fuse + err = j.transformFuseCacheVolumes(runtime, value) + if err != nil { + j.Log.Error(err, "failed to transform cache volumes for fuse") + return err + } // 
set critical fuse pod to avoid eviction value.Fuse.CriticalPod = common.CriticalFusePodEnabled() @@ -174,7 +180,11 @@ func (j *JuiceFSEngine) genValue(mount datav1alpha1.Mount, tiredStoreLevel *data var storagePath = DefaultCacheDir var volumeType = common.VolumeTypeHostPath if tiredStoreLevel != nil { - storagePath = tiredStoreLevel.Path // /mnt/disk1/bigboot or /mnt/disk1/bigboot,/mnt/disk2/bigboot + // juicefs cache-dir use colon (:) to separate multiple paths + // community doc: https://juicefs.com/docs/community/command_reference/#juicefs-mount + // enterprise doc: https://juicefs.com/docs/cloud/commands_reference#mount + // /mnt/disk1/bigboot or /mnt/disk1/bigboot:/mnt/disk2/bigboot + storagePath = tiredStoreLevel.Path if tiredStoreLevel.Quota != nil { q := tiredStoreLevel.Quota // juicefs cache-size should be integer in MiB @@ -188,7 +198,7 @@ func (j *JuiceFSEngine) genValue(mount datav1alpha1.Mount, tiredStoreLevel *data } volumeType = tiredStoreLevel.VolumeType } - originPath := strings.Split(storagePath, ",") + originPath := strings.Split(storagePath, ":") options["cache-dir"] = storagePath // transform cacheDir diff --git a/pkg/ddc/juicefs/transform_volume.go b/pkg/ddc/juicefs/transform_volume.go index c97ac025dc1..6b6621e56f7 100644 --- a/pkg/ddc/juicefs/transform_volume.go +++ b/pkg/ddc/juicefs/transform_volume.go @@ -18,8 +18,13 @@ package juicefs import ( "fmt" - datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "strconv" + "strings" + corev1 "k8s.io/api/core/v1" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" ) // transform worker volumes @@ -54,6 +59,70 @@ func (j *JuiceFSEngine) transformWorkerVolumes(runtime *datav1alpha1.JuiceFSRunt return err } +// transform worker cache volumes +// after genValue & genMount function +func (j *JuiceFSEngine) transformWorkerCacheVolumes(runtime *datav1alpha1.JuiceFSRuntime, value *JuiceFS) (err error) { + cacheDir := "" + + // if cache-dir is set in worker option, it will override the cache-dir of worker in runtime + workerOptions := runtime.Spec.Worker.Options + for k, v := range workerOptions { + if k == "cache-dir" { + cacheDir = v + break + } + } + // set tiredstore cache as volume also, for clear cache when shut down + caches := value.CacheDirs + index := len(caches) + if cacheDir != "" { + originPath := strings.Split(cacheDir, ":") + for i, p := range originPath { + var volumeType = common.VolumeTypeHostPath + caches[strconv.Itoa(index+i+1)] = cache{ + Path: p, + Type: string(volumeType), + } + } + } + + // set volumes & volumeMounts for cache + volumeMap := map[string]corev1.VolumeMount{} + for _, v := range runtime.Spec.Worker.VolumeMounts { + volumeMap[v.MountPath] = v + } + for i, cache := range caches { + if _, ok := volumeMap[cache.Path]; ok { + // cache path is already in volumeMounts + continue + } + value.Worker.VolumeMounts = append(value.Worker.VolumeMounts, corev1.VolumeMount{ + Name: "cache-dir-" + i, + MountPath: cache.Path, + }) + v := corev1.Volume{ + Name: "cache-dir-" + i, + } + switch cache.Type { + case string(common.VolumeTypeHostPath): + dir := corev1.HostPathDirectoryOrCreate + v.VolumeSource = corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: cache.Path, + Type: &dir, + }, + } + case string(common.VolumeTypeEmptyDir): + v.VolumeSource = corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + } + // todo: support volume template + } + value.Worker.Volumes = append(value.Worker.Volumes, v) + } 
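(Editor's note, not part of the patch: the cache-volume transform above expands each colon-separated cache path into a hostPath volume plus a matching volume mount, skipping paths the user already mounts explicitly. The standalone sketch below shows that expansion for a `cache-dir` option value; function and volume names are assumptions for illustration, not the exact helper in `transform_volume.go`.)

```go
package main

import (
	"fmt"
	"strconv"
	"strings"

	corev1 "k8s.io/api/core/v1"
)

// cacheVolumes turns a colon-separated cache-dir option such as
// "/mnt/cache1:/mnt/cache2" into hostPath volumes and volume mounts.
func cacheVolumes(cacheDirOpt string) ([]corev1.Volume, []corev1.VolumeMount) {
	hostPathType := corev1.HostPathDirectoryOrCreate
	var volumes []corev1.Volume
	var mounts []corev1.VolumeMount
	for i, path := range strings.Split(cacheDirOpt, ":") {
		name := "cache-dir-" + strconv.Itoa(i+1)
		volumes = append(volumes, corev1.Volume{
			Name: name,
			VolumeSource: corev1.VolumeSource{
				HostPath: &corev1.HostPathVolumeSource{Path: path, Type: &hostPathType},
			},
		})
		mounts = append(mounts, corev1.VolumeMount{Name: name, MountPath: path})
	}
	return volumes, mounts
}

func main() {
	vols, mounts := cacheVolumes("/mnt/cache1:/mnt/cache2")
	fmt.Println(len(vols), len(mounts)) // 2 2
}
```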
+ return err +} + // transform fuse volumes func (j *JuiceFSEngine) transformFuseVolumes(runtime *datav1alpha1.JuiceFSRuntime, value *JuiceFS) (err error) { if len(runtime.Spec.Fuse.VolumeMounts) > 0 { @@ -84,3 +153,45 @@ func (j *JuiceFSEngine) transformFuseVolumes(runtime *datav1alpha1.JuiceFSRuntim return err } + +// transform fuse cache volumes +// after genValue & genMount function +func (j *JuiceFSEngine) transformFuseCacheVolumes(runtime *datav1alpha1.JuiceFSRuntime, value *JuiceFS) (err error) { + caches := value.CacheDirs + + // set volumes & volumeMounts for cache + volumeMap := map[string]corev1.VolumeMount{} + for _, v := range runtime.Spec.Fuse.VolumeMounts { + volumeMap[v.MountPath] = v + } + for i, cache := range caches { + if _, ok := volumeMap[cache.Path]; ok { + // cache path is already in volumeMounts + continue + } + value.Fuse.VolumeMounts = append(value.Fuse.VolumeMounts, corev1.VolumeMount{ + Name: "cache-dir-" + i, + MountPath: cache.Path, + }) + v := corev1.Volume{ + Name: "cache-dir-" + i, + } + switch cache.Type { + case string(common.VolumeTypeHostPath): + dir := corev1.HostPathDirectoryOrCreate + v.VolumeSource = corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: cache.Path, + Type: &dir, + }, + } + case string(common.VolumeTypeEmptyDir): + v.VolumeSource = corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + } + // todo: support volume template + } + value.Fuse.Volumes = append(value.Fuse.Volumes, v) + } + return err +} diff --git a/pkg/ddc/juicefs/transform_volume_test.go b/pkg/ddc/juicefs/transform_volume_test.go index 3eead76d904..058c52a6b8f 100644 --- a/pkg/ddc/juicefs/transform_volume_test.go +++ b/pkg/ddc/juicefs/transform_volume_test.go @@ -17,10 +17,13 @@ package juicefs import ( - datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" - corev1 "k8s.io/api/core/v1" "reflect" "testing" + + corev1 "k8s.io/api/core/v1" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" ) func TestTransformWorkerVolumes(t *testing.T) { @@ -240,3 +243,339 @@ func TestTransformFuseVolumes(t *testing.T) { } } + +func TestJuiceFSEngine_transformWorkerCacheVolumes(t *testing.T) { + dir := corev1.HostPathDirectoryOrCreate + type args struct { + runtime *datav1alpha1.JuiceFSRuntime + value *JuiceFS + } + tests := []struct { + name string + args args + wantErr bool + wantVolumes []corev1.Volume + wantVolumeMounts []corev1.VolumeMount + }{ + { + name: "test-normal", + args: args{ + runtime: &datav1alpha1.JuiceFSRuntime{}, + value: &JuiceFS{ + CacheDirs: map[string]cache{ + "1": {Path: "/cache", Type: string(common.VolumeTypeHostPath)}, + }, + }, + }, + wantErr: false, + wantVolumes: []corev1.Volume{{ + Name: "cache-dir-1", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/cache", + Type: &dir, + }, + }, + }}, + wantVolumeMounts: []corev1.VolumeMount{{ + Name: "cache-dir-1", + MountPath: "/cache", + }}, + }, + { + name: "test-option-overwrite", + args: args{ + runtime: &datav1alpha1.JuiceFSRuntime{ + Spec: datav1alpha1.JuiceFSRuntimeSpec{ + Worker: datav1alpha1.JuiceFSCompTemplateSpec{ + Options: map[string]string{"cache-dir": "/worker-cache1:/worker-cache2"}, + }, + }, + }, + value: &JuiceFS{ + CacheDirs: map[string]cache{ + "1": {Path: "/cache", Type: string(common.VolumeTypeHostPath)}, + }, + }, + }, + wantErr: false, + wantVolumes: []corev1.Volume{ + { + Name: "cache-dir-1", + VolumeSource: corev1.VolumeSource{ + HostPath: 
&corev1.HostPathVolumeSource{ + Path: "/cache", + Type: &dir, + }, + }, + }, + { + Name: "cache-dir-2", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/worker-cache1", + Type: &dir, + }, + }, + }, + { + Name: "cache-dir-3", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/worker-cache2", + Type: &dir, + }, + }, + }, + }, + wantVolumeMounts: []corev1.VolumeMount{ + { + Name: "cache-dir-1", + MountPath: "/cache", + }, + { + Name: "cache-dir-2", + MountPath: "/worker-cache1", + }, + { + Name: "cache-dir-3", + MountPath: "/worker-cache2", + }, + }, + }, + { + name: "test-volume-overwrite", + args: args{ + runtime: &datav1alpha1.JuiceFSRuntime{ + Spec: datav1alpha1.JuiceFSRuntimeSpec{ + Worker: datav1alpha1.JuiceFSCompTemplateSpec{ + Options: map[string]string{"cache-dir": "/worker-cache1:/worker-cache2"}, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "cache", + MountPath: "/worker-cache2", + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "cache", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + }, + }, + value: &JuiceFS{ + CacheDirs: map[string]cache{ + "1": {Path: "/cache", Type: string(common.VolumeTypeHostPath)}, + }, + Worker: Worker{ + VolumeMounts: []corev1.VolumeMount{ + { + Name: "cache", + MountPath: "/worker-cache2", + }, + }, + Volumes: []corev1.Volume{ + { + Name: "cache", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + }, + }, + }, + wantErr: false, + wantVolumes: []corev1.Volume{ + { + Name: "cache", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + { + Name: "cache-dir-1", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/cache", + Type: &dir, + }, + }, + }, + { + Name: "cache-dir-2", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/worker-cache1", + Type: &dir, + }, + }, + }, + }, + wantVolumeMounts: []corev1.VolumeMount{ + { + Name: "cache", + MountPath: "/worker-cache2", + }, + { + Name: "cache-dir-1", + MountPath: "/cache", + }, + { + Name: "cache-dir-2", + MountPath: "/worker-cache1", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + j := &JuiceFSEngine{} + if err := j.transformWorkerCacheVolumes(tt.args.runtime, tt.args.value); (err != nil) != tt.wantErr { + t.Errorf("transformWorkerCacheVolumes() error = %v, wantErr %v", err, tt.wantErr) + } + + // compare volumes + if len(tt.args.value.Worker.Volumes) != len(tt.wantVolumes) { + t.Errorf("want volumes %v, got %v for testcase %s", tt.wantVolumes, tt.args.value.Worker.Volumes, tt.name) + } + wantVolumeMap := make(map[string]corev1.Volume) + for _, v := range tt.wantVolumes { + wantVolumeMap[v.Name] = v + } + for _, v := range tt.args.value.Worker.Volumes { + if wv := wantVolumeMap[v.Name]; !reflect.DeepEqual(wv, v) { + t.Errorf("want volumes %v, got %v for testcase %s", tt.wantVolumes, tt.args.value.Worker.Volumes, tt.name) + } + } + + // compare volumeMounts + if len(tt.args.value.Worker.VolumeMounts) != len(tt.wantVolumeMounts) { + t.Errorf("want volumeMounts %v, got %v for testcase %s", tt.wantVolumeMounts, tt.args.value.Worker.VolumeMounts, tt.name) + } + wantVolumeMountsMap := make(map[string]corev1.VolumeMount) + for _, v := range tt.wantVolumeMounts { + wantVolumeMountsMap[v.Name] = v + } + for _, v := range tt.args.value.Worker.VolumeMounts { + if wv := 
wantVolumeMountsMap[v.Name]; !reflect.DeepEqual(wv, v) { + t.Errorf("want volumeMounts %v, got %v for testcase %s", tt.wantVolumeMounts, tt.args.value.Worker.VolumeMounts, tt.name) + } + } + }) + } +} + +func TestJuiceFSEngine_transformFuseCacheVolumes(t *testing.T) { + dir := corev1.HostPathDirectoryOrCreate + type args struct { + runtime *datav1alpha1.JuiceFSRuntime + value *JuiceFS + } + tests := []struct { + name string + args args + wantErr bool + wantVolumes []corev1.Volume + wantVolumeMounts []corev1.VolumeMount + }{ + { + name: "test-normal", + args: args{ + runtime: &datav1alpha1.JuiceFSRuntime{}, + value: &JuiceFS{ + CacheDirs: map[string]cache{ + "1": {Path: "/cache", Type: string(common.VolumeTypeHostPath)}, + }, + }, + }, + wantErr: false, + wantVolumes: []corev1.Volume{{ + Name: "cache-dir-1", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/cache", + Type: &dir, + }, + }, + }}, + wantVolumeMounts: []corev1.VolumeMount{{ + Name: "cache-dir-1", + MountPath: "/cache", + }}, + }, + { + name: "test-volume-overwrite", + args: args{ + runtime: &datav1alpha1.JuiceFSRuntime{ + Spec: datav1alpha1.JuiceFSRuntimeSpec{ + Fuse: datav1alpha1.JuiceFSFuseSpec{ + VolumeMounts: []corev1.VolumeMount{ + { + Name: "cache", + MountPath: "/cache", + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "cache", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + }, + }, + value: &JuiceFS{ + CacheDirs: map[string]cache{ + "1": {Path: "/cache", Type: string(common.VolumeTypeHostPath)}, + }, + }, + }, + wantErr: false, + wantVolumes: nil, + wantVolumeMounts: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + j := &JuiceFSEngine{} + if err := j.transformFuseCacheVolumes(tt.args.runtime, tt.args.value); (err != nil) != tt.wantErr { + t.Errorf("transformFuseCacheVolumes() error = %v, wantErr %v", err, tt.wantErr) + } + + // compare volumes + if len(tt.args.value.Fuse.Volumes) != len(tt.wantVolumes) { + t.Errorf("want volumes %v, got %v for testcase %s", tt.wantVolumes, tt.args.value.Fuse.Volumes, tt.name) + } + wantVolumeMap := make(map[string]corev1.Volume) + for _, v := range tt.wantVolumes { + wantVolumeMap[v.Name] = v + } + for _, v := range tt.args.value.Fuse.Volumes { + if wv := wantVolumeMap[v.Name]; !reflect.DeepEqual(wv, v) { + t.Errorf("want volumes %v, got %v for testcase %s", tt.wantVolumes, tt.args.value.Fuse.Volumes, tt.name) + } + } + + // compare volumeMounts + if len(tt.args.value.Fuse.VolumeMounts) != len(tt.wantVolumeMounts) { + t.Errorf("want volumeMounts %v, got %v for testcase %s", tt.wantVolumeMounts, tt.args.value.Fuse.VolumeMounts, tt.name) + } + wantVolumeMountsMap := make(map[string]corev1.VolumeMount) + for _, v := range tt.wantVolumeMounts { + wantVolumeMountsMap[v.Name] = v + } + for _, v := range tt.args.value.Fuse.VolumeMounts { + if wv := wantVolumeMountsMap[v.Name]; !reflect.DeepEqual(wv, v) { + t.Errorf("want volumeMounts %v, got %v for testcase %s", tt.wantVolumeMounts, tt.args.value.Fuse.VolumeMounts, tt.name) + } + } + }) + } +} diff --git a/pkg/ddc/juicefs/utils_test.go b/pkg/ddc/juicefs/utils_test.go index f63255bda50..5ad428bafc3 100644 --- a/pkg/ddc/juicefs/utils_test.go +++ b/pkg/ddc/juicefs/utils_test.go @@ -23,12 +23,13 @@ import ( "testing" . 
"github.com/agiledragon/gomonkey/v2" - "github.com/fluid-cloudnative/fluid/pkg/utils/fake" "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" diff --git a/pkg/scripts/poststart/check_fuse.go b/pkg/scripts/poststart/check_fuse.go index 13469fdf5ca..0d414d1d650 100644 --- a/pkg/scripts/poststart/check_fuse.go +++ b/pkg/scripts/poststart/check_fuse.go @@ -45,8 +45,8 @@ ConditionPathIsMountPoint="$1" MountType="$2" count=0 -# while ! mount | grep alluxio | grep $ConditionPathIsMountPoint | grep -v grep -while ! mount | grep $ConditionPathIsMountPoint | grep $MountType +# while ! cat /proc/self/mountinfo | grep alluxio | grep $ConditionPathIsMountPoint | grep -v grep +while ! cat /proc/self/mountinfo | grep $ConditionPathIsMountPoint | grep $MountType do sleep 3 count=¬expr $count + 1¬ diff --git a/pkg/utils/helm/utils.go b/pkg/utils/helm/utils.go index 3cb1bc31fda..66f8f840256 100644 --- a/pkg/utils/helm/utils.go +++ b/pkg/utils/helm/utils.go @@ -41,7 +41,6 @@ func InstallRelease(name string, namespace string, valueFile string, chartName s // 4. prepare the arguments args := []string{"install", "-f", valueFile, "--namespace", namespace, name, chartName} - log.V(1).Info("Exec", "args", args) // env := os.Environ() // if types.KubeConfig != "" { @@ -51,15 +50,17 @@ func InstallRelease(name string, namespace string, valueFile string, chartName s // return syscall.Exec(cmd, args, env) // 5. execute the command cmd := exec.Command(binary, args...) + log.Info("Exec", "command", cmd.String()) // cmd.Env = env out, err := cmd.CombinedOutput() log.Info(string(out)) if err != nil { - log.Error(err, "failed to execute", "args", strings.Join(args, " ")) + log.Error(err, "failed to execute InstallRelease() command", "command", cmd.String()) + return fmt.Errorf("failed to create engine-related kubernetes resources") } - return err + return nil } // CheckRelease checks if the release with the given name and namespace exist. @@ -78,7 +79,7 @@ func CheckRelease(name, namespace string) (exist bool, err error) { if err := cmd.Start(); err != nil { // log.Fatalf("cmd.Start: %v", err) // log.Error(err) - log.Error(err, "failed to execute") + log.Error(err, "failed to start CheckRelease() command", "command", cmd.String()) return exist, err } @@ -93,7 +94,7 @@ func CheckRelease(name, namespace string) (exist bool, err error) { } } } else { - log.Error(err, "cmd.Wait") + log.Error(err, "failed to execute CheckRelease() command", "command", cmd.String()) return exist, err } } else { @@ -122,15 +123,19 @@ func DeleteRelease(name, namespace string) error { args := []string{"uninstall", name, "-n", namespace} cmd := exec.Command(binary, args...) 
- + log.Info("Exec", "command", cmd.String()) // env := os.Environ() // if types.KubeConfig != "" { // env = append(env, fmt.Sprintf("KUBECONFIG=%s", types.KubeConfig)) // } // return syscall.Exec(cmd, args, env) out, err := cmd.Output() - log.V(1).Info("delete release", "result", string(out)) - return err + log.Info("delete release", "result", string(out)) + if err != nil { + log.Error(err, "failed to execute DeleteRelease() command", "command", cmd.String()) + return fmt.Errorf("failed to delete engine-related kubernetes resources") + } + return nil } // ListReleases return an array with all releases' names in a given namespace diff --git a/pkg/utils/mount.go b/pkg/utils/mount.go index 0212dd2264b..4971bb82c24 100644 --- a/pkg/utils/mount.go +++ b/pkg/utils/mount.go @@ -18,19 +18,20 @@ package utils import ( "errors" "fmt" - "github.com/golang/glog" "io/ioutil" - corev1 "k8s.io/api/core/v1" - "k8s.io/utils/mount" "os" "os/exec" "path/filepath" "strings" + + "github.com/golang/glog" + corev1 "k8s.io/api/core/v1" + "k8s.io/utils/mount" ) const MountRoot string = "MOUNT_ROOT" -//GetMountRoot gets the value of the env variable named MOUNT_ROOT +// GetMountRoot gets the value of the env variable named MOUNT_ROOT func GetMountRoot() (string, error) { mountRoot := os.Getenv(MountRoot) @@ -50,7 +51,22 @@ func CheckMountReady(fluidPath string, mountType string) error { glog.Infoln(command) stdoutStderr, err := command.CombinedOutput() glog.Infoln(string(stdoutStderr)) - return err + + if err != nil { + var checkMountErr *exec.ExitError + if errors.As(err, &checkMountErr) { + switch checkMountErr.ExitCode() { + case 1: + // exitcode=1 indicates timeout waiting for mount point to be ready + return errors.New("timeout waiting for FUSE mount point to be ready") + case 2: + // exitcode=2 indicates subPath not exists + return fmt.Errorf("subPath not exists under FUSE mount") + } + } + return err + } + return nil } func IsMounted(absPath string) (bool, error) { diff --git a/test/prow/juicefs_access_data.py b/test/prow/juicefs_access_data.py index 738e69e8a27..ee7ff28c513 100644 --- a/test/prow/juicefs_access_data.py +++ b/test/prow/juicefs_access_data.py @@ -96,7 +96,8 @@ def create_dataset_and_runtime(dataset_name): "metadata": {"name": dataset_name, "namespace": APP_NAMESPACE}, "spec": { "replicas": 1, - "tieredstore": {"levels": [{"mediumtype": "MEM", "path": "/dev/shm", "quota": "40960", "low": "0.1"}]} + "tieredstore": {"levels": [ + {"mediumtype": "MEM", "path": "/dev/shm/cache1:/dev/shm/cache2", "quota": "400Mi", "low": "0.1"}]} } } @@ -119,6 +120,67 @@ def create_dataset_and_runtime(dataset_name): print("Create juicefs runtime {}".format(dataset_name)) +def get_worker_node(dataset_name): + api = client.CoreV1Api() + pod_name = "{}-worker-0".format(dataset_name) + count = 0 + while count < 300: + count += 1 + try: + pod = api.read_namespaced_pod(name=pod_name, namespace=APP_NAMESPACE) + return pod.spec.node_name + except client.exceptions.ApiException as e: + if e.status == 404: + time.sleep(1) + continue + return "" + + +def create_check_cache_job(node_name): + print("Create check cache job") + api = client.BatchV1Api() + + container = client.V1Container( + name="demo", + image="debian:buster", + command=["/bin/bash"], + args=["-c", "if [ $(find /dev/shm/* | grep chunks | wc -l) = 0 ]; then exit 0; else exit 1; fi"], + volume_mounts=[client.V1VolumeMount(mount_path="/dev/shm/cache1", name="cache1"), + client.V1VolumeMount(mount_path="/dev/shm/cache2", name="cache2")] + ) + + template = 
client.V1PodTemplateSpec( + metadata=client.V1ObjectMeta(labels={"app": "checkcache"}), + spec=client.V1PodSpec( + restart_policy="Never", + containers=[container], + volumes=[ + client.V1Volume( + name="cache1", + host_path=client.V1HostPathVolumeSource(path="/dev/shm/cache1") + ), + client.V1Volume( + name="cache2", + host_path=client.V1HostPathVolumeSource(path="/dev/shm/cache2") + ) + ], + node_name=node_name, + ) + ) + + spec = client.V1JobSpec(template=template, backoff_limit=4) + + job = client.V1Job( + api_version="batch/v1", + kind="Job", + metadata=client.V1ObjectMeta(name="checkcache", namespace=APP_NAMESPACE), + spec=spec + ) + + api.create_namespaced_job(namespace=APP_NAMESPACE, body=job) + print("Job {} created.".format("checkcache")) + + def check_dataset_bound(dataset_name): api = client.CustomObjectsApi() @@ -254,9 +316,12 @@ def check_data_job_status(job_name): while count < 300: count += 1 response = api.read_namespaced_job_status(name=job_name, namespace=APP_NAMESPACE) - if response.status.succeeded is not None or response.status.failed is not None: + if response.status.succeeded is not None: print("Job {} completed.".format(job_name)) return True + if response.status.failed is not None: + print("Job {} failed.".format(job_name)) + return False time.sleep(1) print("Job {} not completed within 300s.".format(job_name)) return False @@ -331,9 +396,9 @@ def clean_up_secret(): def main(): config.load_incluster_config() - # ******************************** + # **************************************************************** # ------- test normal mode ------- - # ******************************** + # **************************************************************** dataset_name = "jfsdemo" test_write_job = "demo-write" test_read_job = "demo-read" @@ -348,6 +413,8 @@ def main(): if not check_volume_resources_ready(dataset_name): raise Exception("volume resources of dataset {} in normal mode are not ready.".format(dataset_name)) + node_name = get_worker_node(dataset_name) + # 3. create write & read data job create_data_write_job(dataset_name, test_write_job) if not check_data_job_status(test_write_job): @@ -369,7 +436,21 @@ def main(): # 6. clean up secret clean_up_secret() - # ******************************** + # **************************************************************** + # ------- test cache clear after runtime shutdown ------- + # **************************************************************** + try: + create_check_cache_job(node_name) + if not check_data_job_status("checkcache"): + raise Exception("read job {} in normal mode failed.".format("checkcache")) + except Exception as e: + print(e) + exit(-1) + finally: + # clean up check cache job + clean_job("checkcache") + + # **************************************************************** # ------- test sidecar mode ------- # ******************************** dataset_name = "jfsdemo-sidecar"
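(Editor's note, not part of the patch: the `pkg/utils/mount.go` hunk above maps the exit codes of `check_mount.sh` to descriptive errors using `errors.As` with `*exec.ExitError`: 1 means the FUSE mount point never became ready, 2 means the requested sub path does not exist under the mount. The sketch below restates that mapping; the function signature is simplified for illustration.)

```go
package main

import (
	"errors"
	"fmt"
	"os/exec"
)

// checkMountReady runs the mount-check script and translates its exit codes
// into descriptive errors for the CSI NodePublishVolume path.
func checkMountReady(script, fluidPath, mountType, subPath string) error {
	cmd := exec.Command(script, fluidPath, mountType, subPath)
	out, err := cmd.CombinedOutput()
	fmt.Println(string(out))
	if err == nil {
		return nil
	}
	var exitErr *exec.ExitError
	if errors.As(err, &exitErr) {
		switch exitErr.ExitCode() {
		case 1:
			return errors.New("timeout waiting for FUSE mount point to be ready")
		case 2:
			return fmt.Errorf("subPath %q not exists under FUSE mount", subPath)
		}
	}
	return err
}

func main() {
	// Example invocation only.
	fmt.Println(checkMountReady("/check-mount.sh", "/runtime-mnt/juicefs/default/jfsdemo/juicefs-fuse", "fuse.juicefs", "data"))
}
```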