fix: change from patch to update for the controller
alexec committed Jun 4, 2021
1 parent 4a32fb9 commit c73ce5b
Showing 11 changed files with 91 additions and 99 deletions.
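In short: the controllers previously wrote status with a JSON merge patch (`r.Status().Patch` with `types.MergePatchType`, visible in the old code below) and now use the status subresource's `Update`, which carries the resourceVersion read earlier in the reconcile and therefore fails fast on concurrent writes instead of silently merging over them. A minimal sketch of the new write path, assuming a controller-runtime client; the `dfv1` import path is inferred from the diff and may differ:

```go
package controllers

import (
	"context"
	"fmt"

	apierr "k8s.io/apimachinery/pkg/api/errors"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1" // assumed module path
)

// writeStatus persists step.Status via the status subresource. Unlike the
// old merge patch, Update sends the resourceVersion from the earlier Get,
// so a concurrent writer produces a conflict rather than a lost update.
func writeStatus(ctx context.Context, c client.Client, step *dfv1.Step) (ctrl.Result, error) {
	if err := c.Status().Update(ctx, step); err != nil {
		if apierr.IsConflict(err) {
			return ctrl.Result{}, nil // benign: the next watch event re-triggers reconciliation
		}
		return ctrl.Result{}, fmt.Errorf("failed to update status: %w", err)
	}
	return ctrl.Result{}, nil
}
```

Treating a conflict as success is deliberate here: the controller will be re-queued with a fresh object, so retrying in place buys nothing.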
17 changes: 17 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,22 @@
# Changelog

## v0.0.33 (2021-06-04)

* [60eab31](https://github.com/argoproj/argo-workflows/commit/60eab313fe856ee34b44d8f7cfcb4057d01c6344) fix: prevent violent scale-up and scale-down by only scaling by 1 each time

### Contributors

* Alex Collins

## v0.0.32 (2021-06-04)

* [ca363fe](https://github.com/argoproj/argo-workflows/commit/ca363fe3638fa6c329dc786dedb4aaf8d230a8f3) feat: add pending metric
* [f3a4813](https://github.com/argoproj/argo-workflows/commit/f3a4813aaf23d630f4d8292792d6b147d53515f0) fix: fix metrics

### Contributors

* Alex Collins

## v0.0.31 (2021-06-04)

* [3dd39f1](https://github.com/argoproj/argo-workflows/commit/3dd39f16bf6d1104af3d2a3e4c23b98c22170639) fix(sidecar): updated to use fixed counters
8 changes: 4 additions & 4 deletions api/v1alpha1/step_status.go
@@ -6,13 +6,13 @@ import (

type StepStatus struct {
Phase StepPhase `json:"phase" protobuf:"bytes,1,opt,name=phase,casttype=StepPhase"`
Reason string `json:"reason" protobuf:"bytes,8,opt,name=reason"`
Message string `json:"message" protobuf:"bytes,2,opt,name=message"`
Reason string `json:"reason,omitempty" protobuf:"bytes,8,opt,name=reason"`
Message string `json:"message,omitempty" protobuf:"bytes,2,opt,name=message"`
Replicas uint32 `json:"replicas" protobuf:"varint,5,opt,name=replicas"`
Selector string `json:"selector,omitempty" protobuf:"bytes,7,opt,name=selector"`
LastScaledAt metav1.Time `json:"lastScaledAt,omitempty" protobuf:"bytes,6,opt,name=lastScaledAt"`
SourceStatuses SourceStatuses `json:"sourceStatuses" protobuf:"bytes,3,rep,name=sourceStatuses"`
SinkStatues SourceStatuses `json:"sinkStatuses" protobuf:"bytes,4,rep,name=sinkStatuses"`
SourceStatuses SourceStatuses `json:"sourceStatuses,omitempty" protobuf:"bytes,3,rep,name=sourceStatuses"`
SinkStatues SourceStatuses `json:"sinkStatuses,omitempty" protobuf:"bytes,4,rep,name=sinkStatuses"`
}

func (m StepStatus) GetReplicas() int {
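The struct change above adds `omitempty` to `Reason`, `Message`, `SourceStatuses`, and `SinkStatuses`, so empty values are dropped from the serialized status; the generated CRD diffs below shrink the `required` lists to match (only `phase` and `replicas` remain). A standalone sketch of the effect, using a trimmed stand-in for `StepStatus`:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// status is an illustrative, trimmed stand-in for StepStatus.
type status struct {
	Phase   string `json:"phase"`
	Reason  string `json:"reason,omitempty"`
	Message string `json:"message,omitempty"`
}

func main() {
	b, _ := json.Marshal(status{Phase: "Running"})
	fmt.Println(string(b))
	// With omitempty the empty fields vanish: {"phase":"Running"}
	// Without it they serialize as "reason":"" and "message":"", which is
	// why controller-gen previously marked those fields as required in the CRD.
}
```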
6 changes: 1 addition & 5 deletions config/ci.yaml
@@ -3748,12 +3748,8 @@ spec:
type: object
type: object
required:
- message
- phase
- reason
- replicas
- sinkStatuses
- sourceStatuses
type: object
required:
- spec
@@ -3926,7 +3922,7 @@ rules:
resources:
- steps/status
verbs:
- patch
- update
- apiGroups:
- ""
resources:
4 changes: 0 additions & 4 deletions config/crd/bases/dataflow.argoproj.io_steps.yaml
@@ -2775,12 +2775,8 @@ spec:
type: object
type: object
required:
- message
- phase
- reason
- replicas
- sinkStatuses
- sourceStatuses
type: object
required:
- spec
6 changes: 1 addition & 5 deletions config/default.yaml
@@ -3748,12 +3748,8 @@ spec:
type: object
type: object
required:
- message
- phase
- reason
- replicas
- sinkStatuses
- sourceStatuses
type: object
required:
- spec
@@ -3926,7 +3922,7 @@ rules:
resources:
- steps/status
verbs:
- patch
- update
- apiGroups:
- ""
resources:
6 changes: 1 addition & 5 deletions config/dev.yaml
@@ -3748,12 +3748,8 @@ spec:
type: object
type: object
required:
- message
- phase
- reason
- replicas
- sinkStatuses
- sourceStatuses
type: object
required:
- spec
@@ -3926,7 +3922,7 @@ rules:
resources:
- steps/status
verbs:
- patch
- update
- apiGroups:
- ""
resources:
6 changes: 1 addition & 5 deletions config/quick-start.yaml
@@ -3748,12 +3748,8 @@ spec:
type: object
type: object
required:
- message
- phase
- reason
- replicas
- sinkStatuses
- sourceStatuses
type: object
required:
- spec
@@ -3926,7 +3922,7 @@ rules:
resources:
- steps/status
verbs:
- patch
- update
- apiGroups:
- ""
resources:
2 changes: 1 addition & 1 deletion config/rbac/role.yaml
@@ -33,7 +33,7 @@ rules:
resources:
- steps/status
verbs:
- patch
- update
- apiGroups:
- ""
resources:
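This role change is the crux of the commit: `Update` calls require the `update` verb on the `steps/status` subresource, where the old code path required `patch`. If the role is generated by controller-gen, the matching kubebuilder marker in the controller source would look roughly like this (a sketch; the exact verbs in the repo's marker may differ):

```go
// +kubebuilder:rbac:groups=dataflow.argoproj.io,resources=steps/status,verbs=get;update
```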
1 change: 0 additions & 1 deletion examples/103-scaling-pipeline.yaml
@@ -16,7 +16,6 @@ spec:
steps:
- cat: {}
name: main
replicas: 2
sinks:
- kafka:
topic: output-topic
2 changes: 1 addition & 1 deletion manager/controllers/pipeline_controller.go
@@ -88,7 +88,7 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
if err := r.Client.Get(ctx, client.ObjectKeyFromObject(obj), old); err != nil {
return ctrl.Result{}, err
}
old.Spec.Replicas = nil // nil this field as it can be user/HPA modified
step.Replicas = old.Spec.Replicas // copy this field, as it should only be modified by `kubectl scale` or edited by the user
if notEqual, patch := util.NotEqual(step, old.Spec); notEqual {
log.Info("updating step due to changed spec", "patch", patch)
old.Spec = step
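The pipeline controller previously nil-ed `old.Spec.Replicas` before comparing desired and live specs; it now copies the live value into the desired spec, so the one field owned externally (via `kubectl scale` or a user edit) can never register as drift. A minimal illustration of the pattern, with hypothetical types:

```go
package controllers

// spec is a hypothetical, trimmed stand-in for a step spec.
type spec struct {
	Image    string
	Replicas *uint32
}

// needsUpdate reports whether desired differs from live, ignoring
// Replicas, which external actors own.
func needsUpdate(desired, live spec) bool {
	desired.Replicas = live.Replicas // copy the externally-owned field before comparing
	return desired != live
}
```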
132 changes: 64 additions & 68 deletions manager/controllers/step_controller.go
@@ -22,14 +22,15 @@ import (
"strconv"
"time"

apierr "k8s.io/apimachinery/pkg/api/errors"

"k8s.io/apimachinery/pkg/util/intstr"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -70,75 +71,24 @@ func (r *StepReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.

log.Info("reconciling", "pipelineName", pipelineName, "stepName", stepName)

pods := &corev1.PodList{}
selector, _ := labels.Parse(dfv1.KeyPipelineName + "=" + pipelineName + "," + dfv1.KeyStepName + "=" + stepName)
if err := r.Client.List(ctx, pods, &client.ListOptions{Namespace: step.Namespace, LabelSelector: selector}); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to list pods: %w", err)
}

hash := util.MustHash(step.Spec)
currentReplicas := len(pods.Items)

oldStatus := dfv1.StepStatus{
Phase: step.Status.Phase,
Reason: step.Status.Reason,
Message: step.Status.Message,
Replicas: step.Status.Replicas,
Selector: step.Status.Selector,
LastScaledAt: step.Status.LastScaledAt,
SinkStatues: dfv1.SourceStatuses{},
SourceStatuses: dfv1.SourceStatuses{},
}
currentReplicas := int(step.Status.Replicas)

newStatus := dfv1.StepStatus{
Phase: dfv1.StepUnknown,
Replicas: uint32(currentReplicas),
Selector: selector.String(),
LastScaledAt: step.Status.LastScaledAt,
SinkStatues: dfv1.SourceStatuses{},
SourceStatuses: dfv1.SourceStatuses{},
}

for _, pod := range pods.Items {
phase, reason, message := inferPhase(pod)
x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(newStatus.Phase, newStatus.Reason, newStatus.Message), dfv1.NewStepPhaseMessage(phase, reason, message))
newStatus.Phase, newStatus.Reason, newStatus.Message = x.GetPhase(), x.GetReason(), x.GetMessage()
}
oldStatus := step.Status.DeepCopy()
step.Status.Phase, step.Status.Reason, step.Status.Message = dfv1.StepUnknown, "", ""
step.Status.Selector = selector.String()

targetReplicas := step.GetTargetReplicas(currentReplicas, scalingDelay, peekDelay)

if currentReplicas != targetReplicas {
log.Info("scaling", "currentReplicas", currentReplicas, "targetReplicas", targetReplicas)
newStatus.LastScaledAt = metav1.Time{Time: time.Now()}
step.Status.Replicas = uint32(targetReplicas)
step.Status.LastScaledAt = metav1.Time{Time: time.Now()}
r.Recorder.Eventf(step, "Normal", eventReason(currentReplicas, targetReplicas), "Scaling from %d to %d", currentReplicas, targetReplicas)
}

for _, pod := range pods.Items {
if i, _ := strconv.Atoi(pod.GetAnnotations()[dfv1.KeyReplica]); i >= targetReplicas || hash != pod.GetAnnotations()[dfv1.KeyHash] {
log.Info("deleting excess pod", "podName", pod.Name)
if err := r.Client.Delete(ctx, &pod); client.IgnoreNotFound(err) != nil {
x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(newStatus.Phase, newStatus.Reason, newStatus.Message), dfv1.NewStepPhaseMessage(dfv1.StepFailed, "", fmt.Sprintf("failed to delete excess pod %s: %v", pod.Name, err)))
newStatus.Phase, newStatus.Reason, newStatus.Message = x.GetPhase(), x.GetReason(), x.GetMessage()
}
} else {
// if the main container has terminated, kill all sidecars
mainCtrTerminated := false
for _, s := range pod.Status.ContainerStatuses {
mainCtrTerminated = mainCtrTerminated || (s.Name == dfv1.CtrMain && s.State.Terminated != nil && s.State.Terminated.ExitCode == 0)
}
log.Info("pod", "name", pod.Name, "mainCtrTerminated", mainCtrTerminated)
if mainCtrTerminated {
for _, s := range pod.Status.ContainerStatuses {
if s.Name != dfv1.CtrMain {
if err := r.ContainerKiller.KillContainer(pod, s.Name); err != nil {
log.Error(err, "failed to kill container", "pod", pod.Name, "container", s.Name)
}
}
}
}
}
}

for replica := 0; replica < targetReplicas; replica++ {
podName := fmt.Sprintf("%s-%d", step.Name, replica)
_labels := map[string]string{}
@@ -185,9 +135,13 @@ func (r *StepReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
},
),
},
); util.IgnoreAlreadyExists(err) != nil {
x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(newStatus.Phase, newStatus.Reason, newStatus.Message), dfv1.NewStepPhaseMessage(dfv1.StepFailed, "", fmt.Sprintf("failed to create pod %s: %v", podName, err)))
newStatus.Phase, newStatus.Reason, newStatus.Message = x.GetPhase(), x.GetReason(), x.GetMessage()
); apierr.IsAlreadyExists(err) {
// ignore
} else if err != nil {
x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(step.Status.Phase, step.Status.Reason, step.Status.Message), dfv1.NewStepPhaseMessage(dfv1.StepFailed, "", fmt.Sprintf("failed to create pod %s: %v", podName, err)))
step.Status.Phase, step.Status.Reason, step.Status.Message = x.GetPhase(), x.GetReason(), x.GetMessage()
} else {
log.Info("pod created", "pod", podName)
}
}

@@ -213,16 +167,58 @@ func (r *StepReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
},
},
); util.IgnoreAlreadyExists(err) != nil {
x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(newStatus.Phase, newStatus.Reason, newStatus.Message), dfv1.NewStepPhaseMessage(dfv1.StepFailed, "", fmt.Sprintf("failed to create service %s: %v", step.Name, err)))
newStatus.Phase, newStatus.Reason, newStatus.Message = x.GetPhase(), x.GetReason(), x.GetMessage()
x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(step.Status.Phase, step.Status.Reason, step.Status.Message), dfv1.NewStepPhaseMessage(dfv1.StepFailed, "", fmt.Sprintf("failed to create service %s: %v", step.Name, err)))
step.Status.Phase, step.Status.Reason, step.Status.Message = x.GetPhase(), x.GetReason(), x.GetMessage()
}
}

if notEqual, patch := util.NotEqual(oldStatus, newStatus); notEqual {
log.Info("patching step status", "patch", patch)
if err := r.Status().
Patch(ctx, step, client.RawPatch(types.MergePatchType, []byte(util.MustJSON(&dfv1.Step{Status: newStatus})))); util.IgnoreConflict(err) != nil { // conflict is ok, we will reconcile again soon
return ctrl.Result{}, fmt.Errorf("failed to patch status: %w", err)
pods := &corev1.PodList{}
if err := r.Client.List(ctx, pods, &client.ListOptions{Namespace: step.Namespace, LabelSelector: selector}); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to list pods: %w", err)
}

for _, pod := range pods.Items {
replica, err := strconv.Atoi(pod.GetAnnotations()[dfv1.KeyReplica])
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to parse replica of pod %q: %w", pod.Name, err)
}
if replica >= targetReplicas || hash != pod.GetAnnotations()[dfv1.KeyHash] {
log.Info("deleting excess pod", "podName", pod.Name)
if err := r.Client.Delete(ctx, &pod); client.IgnoreNotFound(err) != nil {
x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(step.Status.Phase, step.Status.Reason, step.Status.Message), dfv1.NewStepPhaseMessage(dfv1.StepFailed, "", fmt.Sprintf("failed to delete excess pod %s: %v", pod.Name, err)))
step.Status.Phase, step.Status.Reason, step.Status.Message = x.GetPhase(), x.GetReason(), x.GetMessage()
}
} else {
phase, reason, message := inferPhase(pod)
x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(step.Status.Phase, step.Status.Reason, step.Status.Message), dfv1.NewStepPhaseMessage(phase, reason, message))
step.Status.Phase, step.Status.Reason, step.Status.Message = x.GetPhase(), x.GetReason(), x.GetMessage()

// if the main container has terminated, kill all sidecars
mainCtrTerminated := false
for _, s := range pod.Status.ContainerStatuses {
mainCtrTerminated = mainCtrTerminated || (s.Name == dfv1.CtrMain && s.State.Terminated != nil && s.State.Terminated.ExitCode == 0)
}
log.Info("pod", "name", pod.Name, "mainCtrTerminated", mainCtrTerminated)
if mainCtrTerminated {
for _, s := range pod.Status.ContainerStatuses {
if s.Name != dfv1.CtrMain {
if err := r.ContainerKiller.KillContainer(pod, s.Name); err != nil {
log.Error(err, "failed to kill container", "pod", pod.Name, "container", s.Name)
}
}
}
}
}
}

if notEqual, patch := util.NotEqual(oldStatus, step.Status); notEqual {
log.Info("updating step", "patch", patch, "oldStatus", util.MustJSON(oldStatus), "newStatus", step.Status)
if err := r.Status().Update(ctx, step); err != nil {
if apierr.IsConflict(err) {
return ctrl.Result{}, nil // conflict is ok, we will reconcile again soon
} else {
return ctrl.Result{}, fmt.Errorf("failed to update status: %w", err)
}
}
}

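In the reconcile above, `GetTargetReplicas(currentReplicas, scalingDelay, peekDelay)` computes where the step should be; per the v0.0.33 changelog entry, scaling then moves by at most one replica per reconcile to avoid violent swings. A hypothetical sketch of that clamping (not the repo's implementation):

```go
package controllers

// stepToward moves current one replica toward desired per reconcile, so
// repeated reconciles converge without violent scale-up or scale-down.
func stepToward(current, desired int) int {
	switch {
	case desired > current:
		return current + 1
	case desired < current:
		return current - 1
	default:
		return current
	}
}
```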
