
Commit

fix: fixed bug with pipeline conditions and messages computation
alexec committed May 24, 2021
1 parent 45f51fa commit a561027
Showing 10 changed files with 50 additions and 29 deletions.
10 changes: 10 additions & 0 deletions api/v1alpha1/step_spec.go
@@ -5,6 +5,8 @@ import (
"strconv"
"time"

"k8s.io/apimachinery/pkg/util/intstr"

corev1 "k8s.io/api/core/v1"
"k8s.io/utils/pointer"
)
@@ -105,6 +107,14 @@ func (in *StepSpec) GetPodSpec(req GetPodSpecReq) corev1.PodSpec {
Ports: []corev1.ContainerPort{
{ContainerPort: 3569},
},
Lifecycle: &corev1.Lifecycle{
PreStop: &corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/pre-stop",
Port: intstr.FromInt(3569),
},
},
},
},
in.GetContainer(
req.ImageFormat,
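The Lifecycle block added above gives the dataflow sidecar container (the one serving on port 3569) a preStop hook: before the container is stopped, Kubernetes performs an HTTP GET against its /pre-stop endpoint, so the sidecar gets a chance to drain in-flight messages first. A rough, self-contained sketch of the serving side of such an endpoint (the drain step and names here are illustrative assumptions; the project's real handler is in runner/sidecar/sidecar.go further down):

package main

import (
	"log"
	"net/http"
)

func main() {
	http.HandleFunc("/pre-stop", func(w http.ResponseWriter, r *http.Request) {
		// Illustrative only: flush sources and sinks here before the container is terminated.
		w.WriteHeader(http.StatusNoContent) // 204: draining is done
	})
	log.Fatal(http.ListenAndServe(":3569", nil))
}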
2 changes: 1 addition & 1 deletion examples/201-vetinary-pipeline.yaml
@@ -5,7 +5,7 @@ metadata:
dataflow.argoproj.io/description: |
This pipeline processes pets (cats and dogs).
dataflow.argoproj.io/name: Vetinary
dataflow.argoproj.io/timeout: 2m
dataflow.argoproj.io/test: "false"
name: vet
spec:
steps:
2 changes: 1 addition & 1 deletion examples/201-word-count-pipeline.yaml
@@ -7,7 +7,7 @@ metadata:
It also shows an example of a pipeline that terminates based on a single step's status.
dataflow.argoproj.io/name: Word count
dataflow.argoproj.io/timeout: 2m
dataflow.argoproj.io/test: "false"
name: word-count
spec:
steps:
2 changes: 2 additions & 0 deletions examples/301-cron-pipeline.yaml
@@ -17,6 +17,8 @@ spec:
steps:
- cat: {}
name: main
sinks:
- log: {}
sources:
- cron:
layout: "15:04:05"
2 changes: 1 addition & 1 deletion examples/301-erroring-pipeline.yaml
@@ -11,7 +11,7 @@ metadata:
spec:
steps:
- handler:
code: "package main\n\nimport (\n\t\"fmt\"\n\t\"math/rand\"\n)\n\nfunc Handler(m []byte) ([][]byte, error) {\n\tif rand.Int()%2 == 0 {\n\t\treturn nil, fmt.Errorf(\"random error\")\n\t}\n\treturn [][]byte{[]byte(\"hi \" + string(m))}, nil\n}\n"
code: "package main\n\nimport (\n\t\"fmt\"\n\t\"math/rand\"\n)\n\nfunc Handler(m []byte) ([]byte, error) {\n\tif rand.Int()%2 == 0 {\n\t\treturn nil, fmt.Errorf(\"random error\")\n\t}\n\treturn []byte(\"hi \" + string(m)), nil\n}\n"
runtime: go1-16
name: main
sinks:
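For readability, here is the updated handler from the manifest above, unescaped; the signature changes from func Handler(m []byte) ([][]byte, error) to func Handler(m []byte) ([]byte, error), returning a single message rather than a slice of messages:

package main

import (
	"fmt"
	"math/rand"
)

// Handler fails on roughly half of its inputs, otherwise it prefixes the message with "hi ".
func Handler(m []byte) ([]byte, error) {
	if rand.Int()%2 == 0 {
		return nil, fmt.Errorf("random error")
	}
	return []byte("hi " + string(m)), nil
}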
1 change: 1 addition & 0 deletions examples/301-parallel-pipeline.yaml
@@ -5,6 +5,7 @@ metadata:
dataflow.argoproj.io/description: |
This example uses parallel to 2x the amount of data it processes.
dataflow.argoproj.io/name: Parallel
dataflow.argoproj.io/test: "false"
creationTimestamp: null
name: parallel
spec:
4 changes: 2 additions & 2 deletions examples/examples_test.go
@@ -31,7 +31,7 @@ func Test(t *testing.T) {
assert.NoError(t, err)
for _, info := range infos {
pipeline := info.Items[0]
if pipeline.GetAnnotations()["dataflow.argoproj.io/test"] != "true" {
if pipeline.GetAnnotations()["dataflow.argoproj.io/test"] == "false" {
continue
}
t.Run(info.Name(), func(t *testing.T) {
@@ -40,7 +40,7 @@
pipelines := dynamicInterface.Resource(dfv1.PipelineGroupVersionResource).Namespace(namespace)
assert.NoError(t, pipelines.DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{}))
condition := "SunkMessages"
timeout := 3 * time.Minute // typically actually about 30s
timeout := 1 * time.Minute // typically actually about 30s
pipelineName := pipeline.GetName()
if v := pipeline.GetAnnotations()["dataflow.argoproj.io/wait-for"]; v != "" {
condition = v
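The test harness flips from opt-in to opt-out: previously only examples annotated dataflow.argoproj.io/test: "true" were exercised, now every example runs unless it carries dataflow.argoproj.io/test: "false" (the annotation the pipelines above gain), and the per-example timeout drops from 3 minutes to 1 minute. A tiny stand-alone illustration of why an un-annotated example changes behaviour under the new check:

package main

import "fmt"

func main() {
	annotations := map[string]string{} // an example manifest with no test annotation

	// Old, opt-in check: a missing annotation is not "true", so the example was skipped.
	fmt.Println("old check skips it:", annotations["dataflow.argoproj.io/test"] != "true") // true

	// New, opt-out check: a missing annotation is not "false", so the example now runs.
	fmt.Println("new check skips it:", annotations["dataflow.argoproj.io/test"] == "false") // false
}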
29 changes: 16 additions & 13 deletions manager/controllers/pipeline_controller.go
@@ -114,7 +114,6 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
terminate, sunkMessages, errors := false, false, false
for _, step := range steps.Items {
stepName := step.Spec.Name

if !pipeline.Spec.HasStep(stepName) { // this happens when a pipeline changes and a step is removed
log.Info("deleting excess step", "stepName", stepName)
if err := r.Client.Delete(ctx, &step); err != nil {
@@ -143,12 +142,16 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
errors = errors || step.Status.AnyErrors()
}

if newStatus.Phase.Completed() {
terminate = false
}

var ss []string
for n, s := range map[int]string{
pending: "pending",
running: "running",
succeeded: "succeeded",
failed: "failed",
for s, n := range map[string]int{
"pending": pending,
"running": running,
"succeeded": succeeded,
"failed": failed,
} {
if n > 0 {
ss = append(ss, fmt.Sprintf("%d %s", n, s))
@@ -160,16 +163,16 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c

newStatus.Message = strings.Join(ss, ", ")

for ok, c := range map[bool]string{
newStatus.Phase == dfv1.PipelineRunning: dfv1.ConditionRunning,
newStatus.Phase.Completed(): dfv1.ConditionCompleted,
sunkMessages: dfv1.ConditionSunkMessages,
errors: dfv1.ConditionErrors,
terminate: dfv1.ConditionTerminating,
for c, ok := range map[string]bool{
dfv1.ConditionRunning: newStatus.Phase == dfv1.PipelineRunning,
dfv1.ConditionCompleted: newStatus.Phase.Completed(),
dfv1.ConditionSunkMessages: sunkMessages,
dfv1.ConditionErrors: errors,
dfv1.ConditionTerminating: terminate,
} {
if ok {
meta.SetStatusCondition(&newStatus.Conditions, metav1.Condition{Type: c, Status: metav1.ConditionTrue, Reason: c})
} else if meta.FindStatusCondition(newStatus.Conditions, c) != nil { // guard only needed because RemoveStatusCondition panics on zero length conditions
} else if len(newStatus.Conditions) > 0 { // guard only needed because RemoveStatusCondition panics on zero length conditions
meta.RemoveStatusCondition(&newStatus.Conditions, c)
}
}
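This hunk is the bug named in the commit title: both the status message and the conditions were computed by ranging over maps keyed by the value (the replica count, or the condition's boolean), so entries with equal values collided and silently overwrote one another; with two conditions both true, for instance, only one of them ever got set. Keying the maps by the unique name instead fixes it (the hunk also clears terminate once the pipeline has completed, so a finished pipeline is not marked Terminating). A minimal stand-alone illustration, with variable names of my own rather than the controller's:

package main

import "fmt"

func main() {
	sunkMessages, errors := true, true

	// Buggy shape: keyed by the boolean, so the two true conditions share one key
	// and the later entry silently overwrites the earlier one.
	byValue := map[bool]string{
		sunkMessages: "SunkMessages",
		errors:       "Errors",
	}
	fmt.Println(len(byValue)) // 1: only one condition would ever be set

	// Fixed shape: keyed by the condition name, so every entry survives.
	byName := map[string]bool{
		"SunkMessages": sunkMessages,
		"Errors":       errors,
	}
	fmt.Println(len(byName)) // 2
}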
6 changes: 3 additions & 3 deletions manager/controllers/step_controller.go
@@ -131,7 +131,7 @@ func (r *StepReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
for _, s := range pod.Status.ContainerStatuses {
if s.Name != dfv1.CtrMain {
if err := r.ContainerKiller.KillContainer(pod, s.Name); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to kill container %s/%s: %w", pod.Name, s.Name, err)
log.Error(err, "failed to kill container", "pod", pod.Name, "container", s.Name)
}
}
}
@@ -156,8 +156,8 @@ func (r *StepReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
annotations[dfv1.KeyReplica] = strconv.Itoa(replica)
annotations[dfv1.KeyHash] = hash
annotations[dfv1.KeyDefaultContainer] = dfv1.CtrMain
annotations[dfv1.KeyKillCmd(dfv1.CtrMain)] = util.MustJSON([]string{dfv1.PathVarRun, "1"})
annotations[dfv1.KeyKillCmd(dfv1.CtrSidecar)] = util.MustJSON([]string{dfv1.PathVarRun, "1"})
annotations[dfv1.KeyKillCmd(dfv1.CtrMain)] = util.MustJSON([]string{dfv1.PathKill, "1"})
annotations[dfv1.KeyKillCmd(dfv1.CtrSidecar)] = util.MustJSON([]string{dfv1.PathKill, "1"})
if err := r.Client.Create(
ctx,
&corev1.Pod{
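Two small behavioural changes here: a failed KillContainer call is now logged rather than returned, so one stuck container no longer aborts the reconcile and the remaining sidecars still get killed on the same pass, and the kill-command annotations now reference dfv1.PathKill instead of dfv1.PathVarRun. A sketch of the log-and-continue shape (the kill stub and container names are illustrative, not the controller's code):

package main

import "log"

// kill stands in for the real container killer; it is illustrative only.
func kill(name string) error { return nil }

func main() {
	for _, name := range []string{"sidecar", "init"} {
		if err := kill(name); err != nil {
			// Log and keep going: the other containers are still killed on this
			// pass, and the next reconcile can retry this one.
			log.Printf("failed to kill container %s: %v", name, err)
		}
	}
}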
21 changes: 13 additions & 8 deletions runner/sidecar/sidecar.go
@@ -34,7 +34,7 @@ import (

var (
logger = zap.New()
preStopCh = make(chan bool, 1)
preStopCh = make(chan bool, 16)
beforeClosers []func(ctx context.Context) error // should be closed before main container exits
afterClosers []func(ctx context.Context) error // should be closed after the main container exits
dynamicInterface dynamic.Interface
@@ -96,9 +96,7 @@ func Exec(ctx context.Context) error {
logger.Info("sidecar config", "stepName", spec.Name, "pipelineName", pipelineName, "replica", replica, "updateInterval", updateInterval.String())

defer func() {
logger.Info("waiting for pre-stop")
<-preStopCh
logger.Info("waited for pre-stop")
preStop()
stop(afterClosers)
}()

@@ -115,11 +113,8 @@ func Exec(ctx context.Context) error {
// we listen to this message, but it does not come from Kubernetes, it actually comes from the main container's
// pre-stop hook
http.HandleFunc("/pre-stop", func(w http.ResponseWriter, r *http.Request) {
logger.Info("pre-stop")
stop(beforeClosers)
preStop()
w.WriteHeader(204)
logger.Info("pre-stop done")
preStopCh <- true
})

connectOut(toSink)
@@ -141,6 +136,16 @@ func Exec(ctx context.Context) error {
return nil
}

func preStop() {
logger.Info("pre-stop")
mu.Lock()
defer mu.Unlock()
stop(beforeClosers)
beforeClosers = nil
preStopCh <- true
logger.Info("pre-stop done")
}

func stop(closers []func(ctx context.Context) error) {
logger.Info("closing closers", "len", len(closers))
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
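The pre-stop handling is consolidated into a single preStop() function, called both from the /pre-stop HTTP handler and from the deferred shutdown path in Exec: a mutex (mu, presumably a package-level lock that is not visible in these hunks) serialises the calls, beforeClosers is set to nil so the closers only ever run once, and preStopCh is signalled with a buffer grown from 1 to 16 so repeated triggers cannot block. A stripped-down sketch of that drain-once pattern, with simplified names rather than the project's exact code:

package main

import (
	"fmt"
	"sync"
)

var (
	mu        sync.Mutex
	closers   []func() error
	preStopCh = make(chan bool, 16) // buffered so several triggers can signal without blocking
)

func preStop() {
	mu.Lock()
	defer mu.Unlock()
	for _, c := range closers {
		if err := c(); err != nil {
			fmt.Println("failed to close:", err)
		}
	}
	closers = nil // a second call finds nothing left to close
	preStopCh <- true
}

func main() {
	closers = append(closers, func() error { fmt.Println("closing source"); return nil })
	preStop() // e.g. triggered by the /pre-stop HTTP handler
	preStop() // e.g. triggered again by the deferred shutdown path; effectively a no-op
	fmt.Println("pre-stop signals:", len(preStopCh))
}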
