Skip to content

Commit

Permalink
fix(manager): re-instate killing terminated steps
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed May 14, 2021
1 parent 98917eb commit 8c2dd33
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 34 deletions.
9 changes: 0 additions & 9 deletions docs/EXAMPLES.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,15 +232,6 @@ This example creates errors randomly
kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-erroring-pipeline.yaml
```

### [Log sink](examples/302-log-sink-pipeline.yaml)

This example uses a log sink to debug messages


```
kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/302-log-sink-pipeline.yaml
```

### [Default Kafka config](examples/dataflow-kafka-default-secret.yaml)

This is an example of providing a namespace named Kafka configuration.
Expand Down
18 changes: 0 additions & 18 deletions docs/examples/302-log-sink-pipeline.yaml

This file was deleted.

6 changes: 2 additions & 4 deletions manager/controllers/step_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,24 +144,22 @@ func (r *StepReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
r.Recorder.Eventf(step, "Normal", eventReason(currentReplicas, targetReplicas), "Scaling from %d to %d", currentReplicas, targetReplicas)
}

deletedPods := false // we only want to delete one pod per sync
for _, pod := range pods.Items {
if i, _ := strconv.Atoi(pod.GetAnnotations()[dfv1.KeyReplica]); (i >= targetReplicas || hash != pod.GetAnnotations()[dfv1.KeyHash]) && !deletedPods {
if i, _ := strconv.Atoi(pod.GetAnnotations()[dfv1.KeyReplica]); i >= targetReplicas || hash != pod.GetAnnotations()[dfv1.KeyHash] {
log.Info("deleting excess pod", "podName", pod.Name)
if err := r.Client.Delete(ctx, &pod); client.IgnoreNotFound(err) != nil {
return ctrl.Result{}, fmt.Errorf("failed to delete excess pod %s: %w", pod.Name, err)
}
deletedPods = true
} else {
phase, message := inferPhase(pod)
log.Info("pod", "name", pod.Name, "phase", phase, "message", message)
x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(newStatus.Phase, newStatus.Message), dfv1.NewStepPhaseMessage(phase, message))
newStatus.Phase, newStatus.Message = x.GetPhase(), x.GetMessage()
// if the main container has terminated, kill all sidecars
mainCtrTerminated := false
for _, s := range pod.Status.ContainerStatuses {
mainCtrTerminated = mainCtrTerminated || (s.Name == dfv1.CtrMain && s.State.Terminated != nil)
}
log.Info("pod", "name", pod.Name, "phase", phase, "message", message, "mainCtrTerminated", mainCtrTerminated)
if mainCtrTerminated {
for _, s := range pod.Status.ContainerStatuses {
if s.Name != dfv1.CtrMain {
Expand Down
8 changes: 6 additions & 2 deletions manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"os"

"k8s.io/client-go/kubernetes"

"github.com/argoproj-labs/argo-dataflow/manager/controllers/bus"

"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -70,11 +72,13 @@ func main() {
panic(fmt.Errorf("unable to start manager: %w", err))
}

clientset := kubernetes.NewForConfigOrDie(restConfig)
containerKiller := containerkiller.New(clientset, restConfig)
if err = (&controllers.PipelineReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("Pipeline"),
Scheme: mgr.GetScheme(),
ContainerKiller: containerkiller.Fake,
ContainerKiller: containerKiller,
Installer: bus.NewInstaller(),
}).SetupWithManager(mgr); err != nil {
panic(fmt.Errorf("unable to create controller manager: %w", err))
Expand All @@ -85,7 +89,7 @@ func main() {
Log: ctrl.Log.WithName("controllers").WithName("Step"),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("step-reconciler"),
ContainerKiller: containerkiller.Fake,
ContainerKiller: containerKiller,
}).SetupWithManager(mgr); err != nil {
panic(fmt.Errorf("unable to create controller manager: %w", err))
}
Expand Down
2 changes: 1 addition & 1 deletion runner/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func connectSink() (func([]byte) error, error) {
debug.Info("◷ → stan", "subject", s.Subject, "m", printable(m))
err := sc.Publish(s.Subject, m)
if err != nil {
withLock(func() {sinkStatues.IncErrors(sink.Name, replica, err)})
withLock(func() { sinkStatues.IncErrors(sink.Name, replica, err) })
}
return err
})
Expand Down

0 comments on commit 8c2dd33

Please sign in to comment.