From c73ce5b33a6bfc42abf772d6f794c6af885499a0 Mon Sep 17 00:00:00 2001
From: Alex Collins
Date: Fri, 4 Jun 2021 14:49:27 -0700
Subject: [PATCH] fix: change from patch to update for the controller

---
 CHANGELOG.md                                  |  17 +++
 api/v1alpha1/step_status.go                   |   8 +-
 config/ci.yaml                                |   6 +-
 .../crd/bases/dataflow.argoproj.io_steps.yaml |   4 -
 config/default.yaml                           |   6 +-
 config/dev.yaml                               |   6 +-
 config/quick-start.yaml                       |   6 +-
 config/rbac/role.yaml                         |   2 +-
 examples/103-scaling-pipeline.yaml            |   1 -
 manager/controllers/pipeline_controller.go    |   2 +-
 manager/controllers/step_controller.go        | 132 +++++++++--------
 11 files changed, 91 insertions(+), 99 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1c6108fb..6f374291 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,22 @@
 # Changelog
 
+## v0.0.33 (2021-06-04)
+
+ * [60eab31](https://github.com/argoproj/argo-workflows/commit/60eab313fe856ee34b44d8f7cfcb4057d01c6344) fix: prevent violent scale-up and scale-down by only scaling by 1 each time
+
+### Contributors
+
+ * Alex Collins
+
+## v0.0.32 (2021-06-04)
+
+ * [ca363fe](https://github.com/argoproj/argo-workflows/commit/ca363fe3638fa6c329dc786dedb4aaf8d230a8f3) feat: add pending metric
+ * [f3a4813](https://github.com/argoproj/argo-workflows/commit/f3a4813aaf23d630f4d8292792d6b147d53515f0) fix: fix metrics
+
+### Contributors
+
+ * Alex Collins
+
 ## v0.0.31 (2021-06-04)
 
  * [3dd39f1](https://github.com/argoproj/argo-workflows/commit/3dd39f16bf6d1104af3d2a3e4c23b98c22170639) fix(sidecar): updated to use fixed counters

diff --git a/api/v1alpha1/step_status.go b/api/v1alpha1/step_status.go
index a734c26d..5cf34fba 100644
--- a/api/v1alpha1/step_status.go
+++ b/api/v1alpha1/step_status.go
@@ -6,13 +6,13 @@ import (
 
 type StepStatus struct {
 	Phase          StepPhase      `json:"phase" protobuf:"bytes,1,opt,name=phase,casttype=StepPhase"`
-	Reason         string         `json:"reason" protobuf:"bytes,8,opt,name=reason"`
-	Message        string         `json:"message" protobuf:"bytes,2,opt,name=message"`
+	Reason         string         `json:"reason,omitempty" protobuf:"bytes,8,opt,name=reason"`
+	Message        string         `json:"message,omitempty" protobuf:"bytes,2,opt,name=message"`
 	Replicas       uint32         `json:"replicas" protobuf:"varint,5,opt,name=replicas"`
 	Selector       string         `json:"selector,omitempty" protobuf:"bytes,7,opt,name=selector"`
 	LastScaledAt   metav1.Time    `json:"lastScaledAt,omitempty" protobuf:"bytes,6,opt,name=lastScaledAt"`
-	SourceStatuses SourceStatuses `json:"sourceStatuses" protobuf:"bytes,3,rep,name=sourceStatuses"`
-	SinkStatues    SourceStatuses `json:"sinkStatuses" protobuf:"bytes,4,rep,name=sinkStatuses"`
+	SourceStatuses SourceStatuses `json:"sourceStatuses,omitempty" protobuf:"bytes,3,rep,name=sourceStatuses"`
+	SinkStatues    SourceStatuses `json:"sinkStatuses,omitempty" protobuf:"bytes,4,rep,name=sinkStatuses"`
 }
 
 func (m StepStatus) GetReplicas() int {
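Note: the `omitempty` additions above are what let the generated CRDs below drop `message`, `reason`, `sinkStatuses`, and `sourceStatuses` from their `required` lists: empty values are now omitted from the serialized status instead of being forced to appear. A minimal sketch of the encoding/json behaviour, using a hypothetical cut-down struct (not the real StepStatus):

```go
package example

import (
	"encoding/json"
	"fmt"
)

// status is a hypothetical, simplified stand-in for StepStatus.
type status struct {
	Phase   string `json:"phase"`
	Reason  string `json:"reason,omitempty"`
	Message string `json:"message,omitempty"`
}

func Demo() {
	b, _ := json.Marshal(status{Phase: "Running"})
	// Empty reason/message are omitted entirely, rather than encoded as "".
	fmt.Println(string(b)) // {"phase":"Running"}
}
```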
diff --git a/config/ci.yaml b/config/ci.yaml
index 6897566b..455a6ba3 100644
--- a/config/ci.yaml
+++ b/config/ci.yaml
@@ -3748,12 +3748,8 @@ spec:
                 type: object
             type: object
           required:
-          - message
           - phase
-          - reason
           - replicas
-          - sinkStatuses
-          - sourceStatuses
           type: object
       required:
       - spec
@@ -3926,7 +3922,7 @@ rules:
   resources:
   - steps/status
   verbs:
-  - patch
+  - update
 - apiGroups:
   - ""
   resources:
diff --git a/config/crd/bases/dataflow.argoproj.io_steps.yaml b/config/crd/bases/dataflow.argoproj.io_steps.yaml
index 71416a26..3e5c09fd 100644
--- a/config/crd/bases/dataflow.argoproj.io_steps.yaml
+++ b/config/crd/bases/dataflow.argoproj.io_steps.yaml
@@ -2775,12 +2775,8 @@ spec:
                 type: object
             type: object
           required:
-          - message
           - phase
-          - reason
           - replicas
-          - sinkStatuses
-          - sourceStatuses
           type: object
       required:
       - spec
diff --git a/config/default.yaml b/config/default.yaml
index 022fc282..d3082ef7 100644
--- a/config/default.yaml
+++ b/config/default.yaml
@@ -3748,12 +3748,8 @@ spec:
                 type: object
             type: object
           required:
-          - message
           - phase
-          - reason
           - replicas
-          - sinkStatuses
-          - sourceStatuses
           type: object
       required:
       - spec
@@ -3926,7 +3922,7 @@ rules:
   resources:
   - steps/status
   verbs:
-  - patch
+  - update
 - apiGroups:
   - ""
   resources:
diff --git a/config/dev.yaml b/config/dev.yaml
index 59739bc4..9334fc6e 100644
--- a/config/dev.yaml
+++ b/config/dev.yaml
@@ -3748,12 +3748,8 @@ spec:
                 type: object
             type: object
           required:
-          - message
           - phase
-          - reason
           - replicas
-          - sinkStatuses
-          - sourceStatuses
           type: object
       required:
       - spec
@@ -3926,7 +3922,7 @@ rules:
   resources:
   - steps/status
   verbs:
-  - patch
+  - update
 - apiGroups:
   - ""
   resources:
diff --git a/config/quick-start.yaml b/config/quick-start.yaml
index 09bbb39e..5bf87ab2 100644
--- a/config/quick-start.yaml
+++ b/config/quick-start.yaml
@@ -3748,12 +3748,8 @@ spec:
                 type: object
             type: object
           required:
-          - message
           - phase
-          - reason
           - replicas
-          - sinkStatuses
-          - sourceStatuses
           type: object
       required:
       - spec
@@ -3926,7 +3922,7 @@ rules:
  resources:
   - steps/status
   verbs:
-  - patch
+  - update
 - apiGroups:
   - ""
   resources:
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 1aaf573d..b59a772f 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -33,7 +33,7 @@ rules:
   resources:
   - steps/status
   verbs:
-  - patch
+  - update
 - apiGroups:
   - ""
   resources:
diff --git a/examples/103-scaling-pipeline.yaml b/examples/103-scaling-pipeline.yaml
index 294b9761..2e524534 100644
--- a/examples/103-scaling-pipeline.yaml
+++ b/examples/103-scaling-pipeline.yaml
@@ -16,7 +16,6 @@ spec:
   steps:
   - cat: {}
     name: main
-    replicas: 2
     sinks:
     - kafka:
         topic: output-topic
diff --git a/manager/controllers/pipeline_controller.go b/manager/controllers/pipeline_controller.go
index 9ac0103d..f94bf738 100644
--- a/manager/controllers/pipeline_controller.go
+++ b/manager/controllers/pipeline_controller.go
@@ -88,7 +88,7 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 			if err := r.Client.Get(ctx, client.ObjectKeyFromObject(obj), old); err != nil {
 				return ctrl.Result{}, err
 			}
-			old.Spec.Replicas = nil // nil this field as it can be user/HPA modified
+			step.Replicas = old.Spec.Replicas // copy this field, as it should only be modified by `kubectl scale` or edited by the user
 			if notEqual, patch := util.NotEqual(step, old.Spec); notEqual {
 				log.Info("updating step due to changed spec", "patch", patch)
 				old.Spec = step
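Note: the pipeline controller change above inverts the old approach. Instead of nil-ing `Replicas` before comparing specs, the desired spec now inherits the live value, so a reconcile can no longer revert a scale applied via `kubectl scale` or a user edit. A sketch of the idea with illustrative types, using `reflect.DeepEqual` as a stand-in for `util.NotEqual`:

```go
package example

import "reflect"

// stepSpec is illustrative; the real Step spec has many more fields.
type stepSpec struct {
	Image    string
	Replicas *uint32
}

// desiredSpec returns the spec to write back, preserving the live replica
// count so a reconcile never reverts `kubectl scale` (or a user edit).
func desiredSpec(desired, live stepSpec) (stepSpec, bool) {
	desired.Replicas = live.Replicas // this field belongs to the scaler/user
	return desired, !reflect.DeepEqual(desired, live)
}
```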
log.Info("reconciling", "pipelineName", pipelineName, "stepName", stepName) - pods := &corev1.PodList{} selector, _ := labels.Parse(dfv1.KeyPipelineName + "=" + pipelineName + "," + dfv1.KeyStepName + "=" + stepName) - if err := r.Client.List(ctx, pods, &client.ListOptions{Namespace: step.Namespace, LabelSelector: selector}); err != nil { - return ctrl.Result{}, fmt.Errorf("failed to list pods: %w", err) - } hash := util.MustHash(step.Spec) - currentReplicas := len(pods.Items) - - oldStatus := dfv1.StepStatus{ - Phase: step.Status.Phase, - Reason: step.Status.Reason, - Message: step.Status.Message, - Replicas: step.Status.Replicas, - Selector: step.Status.Selector, - LastScaledAt: step.Status.LastScaledAt, - SinkStatues: dfv1.SourceStatuses{}, - SourceStatuses: dfv1.SourceStatuses{}, - } + currentReplicas := int(step.Status.Replicas) - newStatus := dfv1.StepStatus{ - Phase: dfv1.StepUnknown, - Replicas: uint32(currentReplicas), - Selector: selector.String(), - LastScaledAt: step.Status.LastScaledAt, - SinkStatues: dfv1.SourceStatuses{}, - SourceStatuses: dfv1.SourceStatuses{}, - } - - for _, pod := range pods.Items { - phase, reason, message := inferPhase(pod) - x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(newStatus.Phase, newStatus.Reason, newStatus.Message), dfv1.NewStepPhaseMessage(phase, reason, message)) - newStatus.Phase, newStatus.Reason, newStatus.Message = x.GetPhase(), x.GetReason(), x.GetMessage() - } + oldStatus := step.Status.DeepCopy() + step.Status.Phase, step.Status.Reason, step.Status.Message = dfv1.StepUnknown, "", "" + step.Status.Selector = selector.String() targetReplicas := step.GetTargetReplicas(currentReplicas, scalingDelay, peekDelay) if currentReplicas != targetReplicas { log.Info("scaling", "currentReplicas", currentReplicas, "targetReplicas", targetReplicas) - newStatus.LastScaledAt = metav1.Time{Time: time.Now()} + step.Status.Replicas = uint32(targetReplicas) + step.Status.LastScaledAt = metav1.Time{Time: time.Now()} r.Recorder.Eventf(step, "Normal", eventReason(currentReplicas, targetReplicas), "Scaling from %d to %d", currentReplicas, targetReplicas) } - for _, pod := range pods.Items { - if i, _ := strconv.Atoi(pod.GetAnnotations()[dfv1.KeyReplica]); i >= targetReplicas || hash != pod.GetAnnotations()[dfv1.KeyHash] { - log.Info("deleting excess pod", "podName", pod.Name) - if err := r.Client.Delete(ctx, &pod); client.IgnoreNotFound(err) != nil { - x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(newStatus.Phase, newStatus.Reason, newStatus.Message), dfv1.NewStepPhaseMessage(dfv1.StepFailed, "", fmt.Sprintf("failed to delete excess pod %s: %v", pod.Name, err))) - newStatus.Phase, newStatus.Reason, newStatus.Message = x.GetPhase(), x.GetReason(), x.GetMessage() - } - } else { - // if the main container has terminated, kill all sidecars - mainCtrTerminated := false - for _, s := range pod.Status.ContainerStatuses { - mainCtrTerminated = mainCtrTerminated || (s.Name == dfv1.CtrMain && s.State.Terminated != nil && s.State.Terminated.ExitCode == 0) - } - log.Info("pod", "name", pod.Name, "mainCtrTerminated", mainCtrTerminated) - if mainCtrTerminated { - for _, s := range pod.Status.ContainerStatuses { - if s.Name != dfv1.CtrMain { - if err := r.ContainerKiller.KillContainer(pod, s.Name); err != nil { - log.Error(err, "failed to kill container", "pod", pod.Name, "container", s.Name) - } - } - } - } - } - } - for replica := 0; replica < targetReplicas; replica++ { podName := fmt.Sprintf("%s-%d", step.Name, replica) _labels := map[string]string{} @@ 
-185,9 +135,13 @@ func (r *StepReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. }, ), }, - ); util.IgnoreAlreadyExists(err) != nil { - x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(newStatus.Phase, newStatus.Reason, newStatus.Message), dfv1.NewStepPhaseMessage(dfv1.StepFailed, "", fmt.Sprintf("failed to create pod %s: %v", podName, err))) - newStatus.Phase, newStatus.Reason, newStatus.Message = x.GetPhase(), x.GetReason(), x.GetMessage() + ); apierr.IsAlreadyExists(err) { + // ignore + } else if err != nil { + x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(step.Status.Phase, step.Status.Reason, step.Status.Message), dfv1.NewStepPhaseMessage(dfv1.StepFailed, "", fmt.Sprintf("failed to create pod %s: %v", podName, err))) + step.Status.Phase, step.Status.Reason, step.Status.Message = x.GetPhase(), x.GetReason(), x.GetMessage() + } else { + log.Info("pod created", "pod", podName) } } @@ -213,16 +167,58 @@ func (r *StepReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. }, }, ); util.IgnoreAlreadyExists(err) != nil { - x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(newStatus.Phase, newStatus.Reason, newStatus.Message), dfv1.NewStepPhaseMessage(dfv1.StepFailed, "", fmt.Sprintf("failed to create service %s: %v", step.Name, err))) - newStatus.Phase, newStatus.Reason, newStatus.Message = x.GetPhase(), x.GetReason(), x.GetMessage() + x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(step.Status.Phase, step.Status.Reason, step.Status.Message), dfv1.NewStepPhaseMessage(dfv1.StepFailed, "", fmt.Sprintf("failed to create service %s: %v", step.Name, err))) + step.Status.Phase, step.Status.Reason, step.Status.Message = x.GetPhase(), x.GetReason(), x.GetMessage() } } - if notEqual, patch := util.NotEqual(oldStatus, newStatus); notEqual { - log.Info("patching step status", "patch", patch) - if err := r.Status(). 
- Patch(ctx, step, client.RawPatch(types.MergePatchType, []byte(util.MustJSON(&dfv1.Step{Status: newStatus})))); util.IgnoreConflict(err) != nil { // conflict is ok, we will reconcile again soon - return ctrl.Result{}, fmt.Errorf("failed to patch status: %w", err) + pods := &corev1.PodList{} + if err := r.Client.List(ctx, pods, &client.ListOptions{Namespace: step.Namespace, LabelSelector: selector}); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to list pods: %w", err) + } + + for _, pod := range pods.Items { + replica, err := strconv.Atoi(pod.GetAnnotations()[dfv1.KeyReplica]) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to parse replica of pod %q: %w", pod.Name, err) + } + if replica >= targetReplicas || hash != pod.GetAnnotations()[dfv1.KeyHash] { + log.Info("deleting excess pod", "podName", pod.Name) + if err := r.Client.Delete(ctx, &pod); client.IgnoreNotFound(err) != nil { + x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(step.Status.Phase, step.Status.Reason, step.Status.Message), dfv1.NewStepPhaseMessage(dfv1.StepFailed, "", fmt.Sprintf("failed to delete excess pod %s: %v", pod.Name, err))) + step.Status.Phase, step.Status.Reason, step.Status.Message = x.GetPhase(), x.GetReason(), x.GetMessage() + } + } else { + phase, reason, message := inferPhase(pod) + x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(step.Status.Phase, step.Status.Reason, step.Status.Message), dfv1.NewStepPhaseMessage(phase, reason, message)) + step.Status.Phase, step.Status.Reason, step.Status.Message = x.GetPhase(), x.GetReason(), x.GetMessage() + + // if the main container has terminated, kill all sidecars + mainCtrTerminated := false + for _, s := range pod.Status.ContainerStatuses { + mainCtrTerminated = mainCtrTerminated || (s.Name == dfv1.CtrMain && s.State.Terminated != nil && s.State.Terminated.ExitCode == 0) + } + log.Info("pod", "name", pod.Name, "mainCtrTerminated", mainCtrTerminated) + if mainCtrTerminated { + for _, s := range pod.Status.ContainerStatuses { + if s.Name != dfv1.CtrMain { + if err := r.ContainerKiller.KillContainer(pod, s.Name); err != nil { + log.Error(err, "failed to kill container", "pod", pod.Name, "container", s.Name) + } + } + } + } + } + } + + if notEqual, patch := util.NotEqual(oldStatus, step.Status); notEqual { + log.Info("updating step", "patch", patch, "oldStatus", util.MustJSON(oldStatus), "newStatus", step.Status) + if err := r.Status().Update(ctx, step); err != nil { + if apierr.IsConflict(err) { + return ctrl.Result{}, nil // conflict is ok, we will reconcile again soon + } else { + return ctrl.Result{}, fmt.Errorf("failed to update status: %w", err) + } } }
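Note: the heart of this change, and the reason the RBAC verb moves from `patch` to `update`, is that the step status is now written with an optimistic-concurrency `Update` instead of a merge `Patch`. An `Update` carries the object's `resourceVersion`, so a conflict proves the stored object changed after it was read; the write can then be safely dropped, because the newer version will trigger another reconcile. A minimal sketch of the pattern with a hypothetical helper (not the project's code):

```go
package example

import (
	"context"
	"fmt"

	apierr "k8s.io/apimachinery/pkg/api/errors"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// updateStatus is a hypothetical helper; obj stands in for the Step resource.
func updateStatus(ctx context.Context, c client.Client, obj client.Object) (ctrl.Result, error) {
	if err := c.Status().Update(ctx, obj); err != nil {
		if apierr.IsConflict(err) {
			// The stored object moved on since we read it; our copy is stale.
			// Dropping the write is safe: the newer version re-queues a reconcile.
			return ctrl.Result{}, nil
		}
		return ctrl.Result{}, fmt.Errorf("failed to update status: %w", err)
	}
	return ctrl.Result{}, nil
}
```

A merge patch, by contrast, carries no `resourceVersion`, so concurrent writers can silently overwrite each other; that appears to be the failure mode this commit addresses.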