diff --git a/docs/EXAMPLES.md b/docs/EXAMPLES.md index 98376ed5..6da25de2 100644 --- a/docs/EXAMPLES.md +++ b/docs/EXAMPLES.md @@ -232,15 +232,6 @@ This example creates errors randomly kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/301-erroring-pipeline.yaml ``` -### [Log sink](examples/302-log-sink-pipeline.yaml) - -This example uses a log sink to debug messages - - -``` -kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/302-log-sink-pipeline.yaml -``` - ### [Default Kafka config](examples/dataflow-kafka-default-secret.yaml) This is an example of providing a namespace named Kafka configuration. diff --git a/docs/examples/302-log-sink-pipeline.yaml b/docs/examples/302-log-sink-pipeline.yaml deleted file mode 100644 index d63c8cb0..00000000 --- a/docs/examples/302-log-sink-pipeline.yaml +++ /dev/null @@ -1,18 +0,0 @@ -apiVersion: dataflow.argoproj.io/v1alpha1 -kind: Pipeline -metadata: - annotations: - dataflow.argoproj.io/description: | - This example uses a log sink to debug messages - dataflow.argoproj.io/name: Log sink - creationTimestamp: null - name: log-sink -spec: - steps: - - map: msg - name: main - sinks: - - log: {} - sources: - - kafka: - topic: input-topic diff --git a/manager/controllers/step_controller.go b/manager/controllers/step_controller.go index c40b9210..8bba4dd8 100644 --- a/manager/controllers/step_controller.go +++ b/manager/controllers/step_controller.go @@ -144,17 +144,14 @@ func (r *StepReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. r.Recorder.Eventf(step, "Normal", eventReason(currentReplicas, targetReplicas), "Scaling from %d to %d", currentReplicas, targetReplicas) } - deletedPods := false // we only want to delete one pod per sync for _, pod := range pods.Items { - if i, _ := strconv.Atoi(pod.GetAnnotations()[dfv1.KeyReplica]); (i >= targetReplicas || hash != pod.GetAnnotations()[dfv1.KeyHash]) && !deletedPods { + if i, _ := strconv.Atoi(pod.GetAnnotations()[dfv1.KeyReplica]); i >= targetReplicas || hash != pod.GetAnnotations()[dfv1.KeyHash] { log.Info("deleting excess pod", "podName", pod.Name) if err := r.Client.Delete(ctx, &pod); client.IgnoreNotFound(err) != nil { return ctrl.Result{}, fmt.Errorf("failed to delete excess pod %s: %w", pod.Name, err) } - deletedPods = true } else { phase, message := inferPhase(pod) - log.Info("pod", "name", pod.Name, "phase", phase, "message", message) x := dfv1.MinStepPhaseMessage(dfv1.NewStepPhaseMessage(newStatus.Phase, newStatus.Message), dfv1.NewStepPhaseMessage(phase, message)) newStatus.Phase, newStatus.Message = x.GetPhase(), x.GetMessage() // if the main container has terminated, kill all sidecars @@ -162,6 +159,7 @@ func (r *StepReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. for _, s := range pod.Status.ContainerStatuses { mainCtrTerminated = mainCtrTerminated || (s.Name == dfv1.CtrMain && s.State.Terminated != nil) } + log.Info("pod", "name", pod.Name, "phase", phase, "message", message, "mainCtrTerminated", mainCtrTerminated) if mainCtrTerminated { for _, s := range pod.Status.ContainerStatuses { if s.Name != dfv1.CtrMain { diff --git a/manager/main.go b/manager/main.go index eadb701b..cc7be9ae 100644 --- a/manager/main.go +++ b/manager/main.go @@ -21,6 +21,8 @@ import ( "fmt" "os" + "k8s.io/client-go/kubernetes" + "github.com/argoproj-labs/argo-dataflow/manager/controllers/bus" "k8s.io/apimachinery/pkg/runtime" @@ -70,11 +72,13 @@ func main() { panic(fmt.Errorf("unable to start manager: %w", err)) } + clientset := kubernetes.NewForConfigOrDie(restConfig) + containerKiller := containerkiller.New(clientset, restConfig) if err = (&controllers.PipelineReconciler{ Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("Pipeline"), Scheme: mgr.GetScheme(), - ContainerKiller: containerkiller.Fake, + ContainerKiller: containerKiller, Installer: bus.NewInstaller(), }).SetupWithManager(mgr); err != nil { panic(fmt.Errorf("unable to create controller manager: %w", err)) @@ -85,7 +89,7 @@ func main() { Log: ctrl.Log.WithName("controllers").WithName("Step"), Scheme: mgr.GetScheme(), Recorder: mgr.GetEventRecorderFor("step-reconciler"), - ContainerKiller: containerkiller.Fake, + ContainerKiller: containerKiller, }).SetupWithManager(mgr); err != nil { panic(fmt.Errorf("unable to create controller manager: %w", err)) } diff --git a/runner/sidecar/sidecar.go b/runner/sidecar/sidecar.go index f86d72f3..f47c3e6c 100644 --- a/runner/sidecar/sidecar.go +++ b/runner/sidecar/sidecar.go @@ -495,7 +495,7 @@ func connectSink() (func([]byte) error, error) { debug.Info("◷ → stan", "subject", s.Subject, "m", printable(m)) err := sc.Publish(s.Subject, m) if err != nil { - withLock(func() {sinkStatues.IncErrors(sink.Name, replica, err)}) + withLock(func() { sinkStatues.IncErrors(sink.Name, replica, err) }) } return err })