Skip to content

Commit

Permalink
[juicefs] fix worker cache when set option (fluid-cloudnative#2563)
Browse files Browse the repository at this point in the history
* fix worker cache when set option

Signed-off-by: zwwhdls <[email protected]>

* update changelog in chart

Signed-off-by: zwwhdls <[email protected]>

* fix unittest

Signed-off-by: zwwhdls <[email protected]>

---------

Signed-off-by: zwwhdls <[email protected]>
  • Loading branch information
zwwhdls authored and niwang committed Feb 8, 2023
1 parent 24d20bf commit a899c7b
Show file tree
Hide file tree
Showing 8 changed files with 424 additions and 42 deletions.
3 changes: 3 additions & 0 deletions charts/juicefs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@

0.2.11
- Support credential key in secret

0.2.12
- Set cache dir in volumes & volumeMounts for worker & fuse
2 changes: 1 addition & 1 deletion charts/juicefs/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: juicefs
apiVersion: v1
description: FileSystem aimed for data analytics and machine learning in any cloud.
version: 0.2.11
version: 0.2.12
appVersion: v1.0.0
home: https://juicefs.com/
maintainers:
Expand Down
26 changes: 5 additions & 21 deletions charts/juicefs/templates/fuse/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,11 @@ spec:
exec:
command: ["sh", "-c", "umount {{ .Values.fuse.mountPath }}"]
volumeMounts:
- name: juicefs-fuse-mount
mountPath: {{ .Values.fuse.hostMountPath }}
mountPropagation: Bidirectional
- mountPath: /root/script
name: script
{{- range $name, $mount := .Values.cacheDirs }}
- name: cache-dir-{{ $name }}
mountPath: "{{ $mount.path }}"
{{- end }}
- name: juicefs-fuse-mount
mountPath: {{ .Values.fuse.hostMountPath }}
mountPropagation: Bidirectional
- mountPath: /root/script
name: script
{{- if .Values.fuse.volumeMounts }}
{{ toYaml .Values.fuse.volumeMounts | indent 12 }}
{{- end }}
Expand All @@ -155,18 +151,6 @@ spec:
hostPath:
path: {{ .Values.fuse.hostMountPath }}
type: DirectoryOrCreate
{{- range $name, $mount := .Values.cacheDirs }}
{{- if eq $mount.type "hostPath" }}
- hostPath:
path: "{{ $mount.path }}"
type: DirectoryOrCreate
name: cache-dir-{{ $name }}
{{- else if eq $mount.type "emptyDir" }}
- emptyDir: {}
name: cache-dir-{{ $name }}
{{- /* todo: support volume template */}}
{{- end }}
{{- end }}
- name: script
configMap:
name: {{ template "juicefs.fullname" . }}-fuse-script
Expand Down
16 changes: 0 additions & 16 deletions charts/juicefs/templates/worker/statefuleset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -126,27 +126,11 @@ spec:
volumeMounts:
- mountPath: /root/script
name: script
{{- range $name, $mount := .Values.cacheDirs }}
- name: cache-dir-{{ $name }}
mountPath: "{{ $mount.path }}"
{{- end }}
{{- if .Values.worker.volumeMounts }}
{{ toYaml .Values.worker.volumeMounts | indent 12 }}
{{- end }}
restartPolicy: Always
volumes:
{{- range $name, $mount := .Values.cacheDirs }}
{{- if eq $mount.type "hostPath" }}
- hostPath:
path: "{{ $mount.path }}"
type: DirectoryOrCreate
name: cache-dir-{{ $name }}
{{- else if eq $mount.type "emptyDir" }}
- emptyDir: {}
name: cache-dir-{{ $name }}
{{- /* todo: support volume template */}}
{{- end }}
{{- end }}
- name: script
configMap:
name: {{ template "juicefs.fullname" . }}-worker-script
Expand Down
9 changes: 8 additions & 1 deletion pkg/ddc/juicefs/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import (
"fmt"
"time"

corev1 "k8s.io/api/core/v1"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/transfromer"
corev1 "k8s.io/api/core/v1"
)

func (j *JuiceFSEngine) transform(runtime *datav1alpha1.JuiceFSRuntime) (value *JuiceFS, err error) {
Expand Down Expand Up @@ -109,6 +110,12 @@ func (j *JuiceFSEngine) transformWorkers(runtime *datav1alpha1.JuiceFSRuntime, v
if err != nil {
j.Log.Error(err, "failed to transform volumes for worker")
}
// transform cache volumes for worker
err = j.transformWorkerCacheVolumes(runtime, value)
if err != nil {
j.Log.Error(err, "failed to transform cache volumes for worker")
return err
}

// parse work pod network mode
value.Worker.HostNetwork = datav1alpha1.IsHostNetwork(runtime.Spec.Worker.NetworkMode)
Expand Down
6 changes: 6 additions & 0 deletions pkg/ddc/juicefs/transform_fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ func (j *JuiceFSEngine) transformFuse(runtime *datav1alpha1.JuiceFSRuntime, data
j.Log.Error(err, "failed to transform volumes for fuse")
return err
}
// transform cache volumes for fuse
err = j.transformFuseCacheVolumes(runtime, value)
if err != nil {
j.Log.Error(err, "failed to transform cache volumes for fuse")
return err
}

// set critical fuse pod to avoid eviction
value.Fuse.CriticalPod = common.CriticalFusePodEnabled()
Expand Down
113 changes: 112 additions & 1 deletion pkg/ddc/juicefs/transform_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@ package juicefs

import (
"fmt"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"strconv"
"strings"

corev1 "k8s.io/api/core/v1"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
)

// transform worker volumes
Expand Down Expand Up @@ -54,6 +59,70 @@ func (j *JuiceFSEngine) transformWorkerVolumes(runtime *datav1alpha1.JuiceFSRunt
return err
}

// transform worker cache volumes
// after genValue & genMount function
func (j *JuiceFSEngine) transformWorkerCacheVolumes(runtime *datav1alpha1.JuiceFSRuntime, value *JuiceFS) (err error) {
caches := map[string]cache{}
cacheDir := ""

// if cache-dir is set in worker option, it will override the cache-dir of worker in runtime
workerOptions := runtime.Spec.Worker.Options
for k, v := range workerOptions {
if k == "cache-dir" {
cacheDir = v
break
}
}
if cacheDir != "" {
originPath := strings.Split(cacheDir, ",")
for i, p := range originPath {
var volumeType = common.VolumeTypeHostPath
caches[strconv.Itoa(i+1)] = cache{
Path: p,
Type: string(volumeType),
}
}
} else {
caches = value.CacheDirs
}

// set volumes & volumeMounts for cache
volumeMap := map[string]corev1.VolumeMount{}
for _, v := range runtime.Spec.Worker.VolumeMounts {
volumeMap[v.MountPath] = v
}
for i, cache := range caches {
if _, ok := volumeMap[cache.Path]; ok {
// cache path is already in volumeMounts
continue
}
value.Worker.VolumeMounts = append(value.Worker.VolumeMounts, corev1.VolumeMount{
Name: "cache-dir-" + i,
MountPath: cache.Path,
})
v := corev1.Volume{
Name: "cache-dir-" + i,
}
switch cache.Type {
case string(common.VolumeTypeHostPath):
dir := corev1.HostPathDirectoryOrCreate
v.VolumeSource = corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: cache.Path,
Type: &dir,
},
}
case string(common.VolumeTypeEmptyDir):
v.VolumeSource = corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
}
// todo: support volume template
}
value.Worker.Volumes = append(value.Worker.Volumes, v)
}
return err
}

// transform fuse volumes
func (j *JuiceFSEngine) transformFuseVolumes(runtime *datav1alpha1.JuiceFSRuntime, value *JuiceFS) (err error) {
if len(runtime.Spec.Fuse.VolumeMounts) > 0 {
Expand Down Expand Up @@ -84,3 +153,45 @@ func (j *JuiceFSEngine) transformFuseVolumes(runtime *datav1alpha1.JuiceFSRuntim

return err
}

// transform fuse cache volumes
// after genValue & genMount function
func (j *JuiceFSEngine) transformFuseCacheVolumes(runtime *datav1alpha1.JuiceFSRuntime, value *JuiceFS) (err error) {
caches := value.CacheDirs

// set volumes & volumeMounts for cache
volumeMap := map[string]corev1.VolumeMount{}
for _, v := range runtime.Spec.Fuse.VolumeMounts {
volumeMap[v.MountPath] = v
}
for i, cache := range caches {
if _, ok := volumeMap[cache.Path]; ok {
// cache path is already in volumeMounts
continue
}
value.Fuse.VolumeMounts = append(value.Fuse.VolumeMounts, corev1.VolumeMount{
Name: "cache-dir-" + i,
MountPath: cache.Path,
})
v := corev1.Volume{
Name: "cache-dir-" + i,
}
switch cache.Type {
case string(common.VolumeTypeHostPath):
dir := corev1.HostPathDirectoryOrCreate
v.VolumeSource = corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: cache.Path,
Type: &dir,
},
}
case string(common.VolumeTypeEmptyDir):
v.VolumeSource = corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
}
// todo: support volume template
}
value.Fuse.Volumes = append(value.Fuse.Volumes, v)
}
return err
}
Loading

0 comments on commit a899c7b

Please sign in to comment.