Skip to content

Commit

Permalink
fix handle multi sidecar in appcontroller
Browse files Browse the repository at this point in the history
Signed-off-by: zwwhdls <[email protected]>
  • Loading branch information
zwwhdls committed Jan 11, 2023
1 parent 6ef44e5 commit b71a535
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 45 deletions.
14 changes: 8 additions & 6 deletions pkg/controllers/v1alpha1/fluidapp/fluidapp_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ package fluidapp

import (
"context"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ctrl/watch"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -33,6 +30,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ctrl/watch"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
)

const controllerName string = "FluidAppController"
Expand Down Expand Up @@ -98,8 +100,8 @@ func (f *FluidAppReconciler) Reconcile(ctx context.Context, request reconcile.Re
func (f *FluidAppReconciler) internalReconcile(ctx reconcileRequestContext) (ctrl.Result, error) {
pod := ctx.pod

// umount fuse sidecar
err := f.umountFuseSidecar(pod)
// umount fuse sidecars
err := f.umountFuseSidecars(pod)
if err != nil {
ctx.Log.Error(err, "umount fuse sidecar error", "podName", pod.Name, "podNamespace", pod.Namespace)
return utils.RequeueIfError(err)
Expand Down
23 changes: 15 additions & 8 deletions pkg/controllers/v1alpha1/fluidapp/implement.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
package fluidapp

import (
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
"strings"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"strings"

"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
)

type FluidAppReconcilerImplement struct {
Expand All @@ -41,13 +43,18 @@ func NewFluidAppReconcilerImplement(client client.Client, log logr.Logger, recor
return r
}

func (i *FluidAppReconcilerImplement) umountFuseSidecar(pod *corev1.Pod) (err error) {
var fuseContainer corev1.Container
func (i *FluidAppReconcilerImplement) umountFuseSidecars(pod *corev1.Pod) (err error) {
for _, cn := range pod.Spec.Containers {
if cn.Name == common.FuseContainerName {
fuseContainer = cn
if strings.Contains(cn.Name, common.FuseContainerName) {
if e := i.umountFuseSidecar(pod, cn); e != nil {
return
}
}
}
return
}

func (i *FluidAppReconcilerImplement) umountFuseSidecar(pod *corev1.Pod, fuseContainer corev1.Container) (err error) {
if fuseContainer.Name == "" {
return
}
Expand All @@ -72,7 +79,7 @@ func (i *FluidAppReconcilerImplement) umountFuseSidecar(pod *corev1.Pod) (err er
}

i.Log.Info("exec cmd in pod fuse container", "cmd", cmd, "podName", pod.Name, "namespace", pod.Namespace)
stdout, stderr, err := kubeclient.ExecCommandInContainer(pod.Name, common.FuseContainerName, pod.Namespace, cmd)
stdout, stderr, err := kubeclient.ExecCommandInContainer(pod.Name, fuseContainer.Name, pod.Namespace, cmd)
if err != nil {
i.Log.Info("exec output", "stdout", stdout, "stderr", stderr)
if strings.Contains(stderr, "not mounted") {
Expand Down
44 changes: 36 additions & 8 deletions pkg/controllers/v1alpha1/fluidapp/implement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ import (
"testing"

"github.com/brahma-adshonor/gohook"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
)

func TestFluidAppReconcilerImplement_umountFuseSidecar(t *testing.T) {
func TestFluidAppReconcilerImplement_umountFuseSidecars(t *testing.T) {
mockExec := func(p1, p2, p3 string, p4 []string) (stdout string, stderr string, e error) {
return "", "", nil
}
Expand Down Expand Up @@ -78,7 +79,7 @@ func TestFluidAppReconcilerImplement_umountFuseSidecar(t *testing.T) {
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "test"},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{Name: common.FuseContainerName}},
Containers: []corev1.Container{{Name: common.FuseContainerName + "-0"}},
},
},
},
Expand All @@ -91,7 +92,7 @@ func TestFluidAppReconcilerImplement_umountFuseSidecar(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "test"},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: common.FuseContainerName,
Name: common.FuseContainerName + "-0",
Lifecycle: &corev1.Lifecycle{
PreStop: &corev1.LifecycleHandler{
Exec: &corev1.ExecAction{Command: []string{"umount"}},
Expand All @@ -110,7 +111,7 @@ func TestFluidAppReconcilerImplement_umountFuseSidecar(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "test"},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: common.FuseContainerName,
Name: common.FuseContainerName + "-0",
VolumeMounts: []corev1.VolumeMount{{
Name: "juicefs-fuse-mount",
MountPath: "/mnt/jfs",
Expand All @@ -121,13 +122,40 @@ func TestFluidAppReconcilerImplement_umountFuseSidecar(t *testing.T) {
},
wantErr: false,
},
{
name: "test-multi-sidecar",
args: args{
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "test"},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: common.FuseContainerName + "-0",
VolumeMounts: []corev1.VolumeMount{{
Name: "juicefs-fuse-mount",
MountPath: "/mnt/jfs",
}},
},
{
Name: common.FuseContainerName + "-1",
VolumeMounts: []corev1.VolumeMount{{
Name: "juicefs-fuse-mount",
MountPath: "/mnt/jfs",
}},
},
},
},
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
i := &FluidAppReconcilerImplement{
Log: fake.NullLogger(),
}
if err := i.umountFuseSidecar(tt.args.pod); (err != nil) != tt.wantErr {
if err := i.umountFuseSidecars(tt.args.pod); (err != nil) != tt.wantErr {
t.Errorf("umountFuseSidecar() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand Down
9 changes: 6 additions & 3 deletions pkg/ctrl/watch/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package watch
import (
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/utils"

"strings"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/event"
)
Expand Down Expand Up @@ -108,7 +111,7 @@ func ShouldInQueue(pod *corev1.Pod) bool {
// ignore if no fuse container
exist := false
for _, cn := range pod.Spec.Containers {
if cn.Name == common.FuseContainerName {
if strings.Contains(cn.Name, common.FuseContainerName) {
exist = true
break
}
Expand All @@ -126,15 +129,15 @@ func ShouldInQueue(pod *corev1.Pod) bool {

// reconcile if all app containers exit 0 and fuse container not exit
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerStatus.Name != common.FuseContainerName {
if !strings.Contains(containerStatus.Name, common.FuseContainerName) {
log.V(1).Info("container status", "status", containerStatus)
if containerStatus.State.Terminated == nil {
log.Info("fluid app not exited", "pod", pod.Name, "container", containerStatus.Name, "namespace", pod.Namespace)
// container not exist
return false
}
}
if containerStatus.Name == common.FuseContainerName {
if strings.Contains(containerStatus.Name, common.FuseContainerName) {
if containerStatus.State.Running == nil {
log.Info("fluid fuse not running", "pod", pod.Name, "container", containerStatus.Name, "namespace", pod.Namespace)
return false
Expand Down
38 changes: 20 additions & 18 deletions pkg/ctrl/watch/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
package watch

import (
"github.com/fluid-cloudnative/fluid/pkg/common"
"testing"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/event"
"testing"
"time"

"github.com/fluid-cloudnative/fluid/pkg/common"
)

func Test_podEventHandler_onDeleteFunc(t *testing.T) {
Expand Down Expand Up @@ -80,7 +82,7 @@ func Test_podEventHandler_onUpdateFunc(t *testing.T) {
common.InjectServerless: common.True,
},
},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName}}},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName + "-0"}}},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
ContainerStatuses: []corev1.ContainerStatus{
Expand All @@ -93,7 +95,7 @@ func Test_podEventHandler_onUpdateFunc(t *testing.T) {
},
},
{
Name: common.FuseContainerName,
Name: common.FuseContainerName + "-0",
State: corev1.ContainerState{
Running: &corev1.ContainerStateRunning{
StartedAt: metav1.Time{Time: time.Now()},
Expand All @@ -110,7 +112,7 @@ func Test_podEventHandler_onUpdateFunc(t *testing.T) {
common.InjectServerless: common.True,
},
},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName}}},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName + "-0"}}},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
ContainerStatuses: []corev1.ContainerStatus{
Expand All @@ -124,7 +126,7 @@ func Test_podEventHandler_onUpdateFunc(t *testing.T) {
},
},
{
Name: common.FuseContainerName,
Name: common.FuseContainerName + "-0",
State: corev1.ContainerState{
Running: &corev1.ContainerStateRunning{
StartedAt: metav1.Time{Time: time.Now()},
Expand Down Expand Up @@ -225,7 +227,7 @@ func Test_shouldRequeue(t *testing.T) {
Name: "test",
Labels: map[string]string{common.InjectServerless: common.True},
},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName}}},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName + "-0"}}},
Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{
{
Name: "app",
Expand All @@ -236,7 +238,7 @@ func Test_shouldRequeue(t *testing.T) {
},
},
{
Name: common.FuseContainerName,
Name: common.FuseContainerName + "-0",
State: corev1.ContainerState{
Running: &corev1.ContainerStateRunning{
StartedAt: metav1.Time{Time: time.Now()},
Expand All @@ -256,7 +258,7 @@ func Test_shouldRequeue(t *testing.T) {
Name: "test",
Labels: map[string]string{common.InjectServerless: common.True},
},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName}}},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName + "-0"}}},
Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{
{
Name: "app",
Expand All @@ -268,7 +270,7 @@ func Test_shouldRequeue(t *testing.T) {
},
},
{
Name: common.FuseContainerName,
Name: common.FuseContainerName + "-0",
State: corev1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
StartedAt: metav1.Time{Time: time.Now()},
Expand All @@ -289,7 +291,7 @@ func Test_shouldRequeue(t *testing.T) {
Name: "test",
Labels: map[string]string{common.InjectServerless: common.True},
},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName}}},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName + "-0"}}},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
ContainerStatuses: []corev1.ContainerStatus{
Expand All @@ -303,7 +305,7 @@ func Test_shouldRequeue(t *testing.T) {
},
},
{
Name: common.FuseContainerName,
Name: common.FuseContainerName + "-0",
State: corev1.ContainerState{
Running: &corev1.ContainerStateRunning{
StartedAt: metav1.Time{Time: time.Now()},
Expand All @@ -323,7 +325,7 @@ func Test_shouldRequeue(t *testing.T) {
Name: "test",
Labels: map[string]string{common.InjectServerless: common.True},
},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: "app2"}, {Name: common.FuseContainerName}}},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: "app2"}, {Name: common.FuseContainerName + "-0"}}},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
ContainerStatuses: []corev1.ContainerStatus{
Expand All @@ -342,7 +344,7 @@ func Test_shouldRequeue(t *testing.T) {
}},
},
{
Name: common.FuseContainerName,
Name: common.FuseContainerName + "-0",
State: corev1.ContainerState{
Running: &corev1.ContainerStateRunning{
StartedAt: metav1.Time{Time: time.Now()},
Expand All @@ -361,7 +363,7 @@ func Test_shouldRequeue(t *testing.T) {
Name: "test",
Labels: map[string]string{common.InjectServerless: common.True},
},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: "app2"}, {Name: common.FuseContainerName}}},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: "app2"}, {Name: common.FuseContainerName + "-0"}}},
Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{
{
Name: "app",
Expand All @@ -381,7 +383,7 @@ func Test_shouldRequeue(t *testing.T) {
},
},
{
Name: common.FuseContainerName,
Name: common.FuseContainerName + "-0",
State: corev1.ContainerState{
Running: &corev1.ContainerStateRunning{
StartedAt: metav1.Time{Time: time.Now()},
Expand All @@ -400,7 +402,7 @@ func Test_shouldRequeue(t *testing.T) {
Name: "test",
Labels: map[string]string{common.InjectServerless: common.True},
},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName}}},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "app"}, {Name: common.FuseContainerName + "-0"}}},
Status: corev1.PodStatus{
Phase: corev1.PodPending,
ContainerStatuses: []corev1.ContainerStatus{}},
Expand Down
9 changes: 7 additions & 2 deletions test/prow/juicefs_access_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,12 @@ def clean_job(job_name):

# See https://github.com/kubernetes-client/python/issues/234
body = client.V1DeleteOptions(propagation_policy='Background')
batch_api.delete_namespaced_job(name=job_name, namespace=APP_NAMESPACE, body=body)
try:
batch_api.delete_namespaced_job(name=job_name, namespace=APP_NAMESPACE, body=body)
except client.exceptions.ApiException as e:
if e.status == 404:
print("job {} deleted".format(job_name))
return True

count = 0
while count < 300:
Expand All @@ -294,7 +299,7 @@ def clean_up_dataset_and_runtime(dataset_name):
namespace=APP_NAMESPACE,
plural="datasets"
)
print("Dataset [] deleted".format(dataset_name))
print("Dataset {} deleted".format(dataset_name))

count = 0
while count < 300:
Expand Down

0 comments on commit b71a535

Please sign in to comment.