From b5a587e11c679f5f869b06bcc37c152f250e9c92 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Thu, 19 Mar 2026 21:19:18 +0100 Subject: [PATCH 1/2] add edgecase handling for services exiting with error --- pipeline/runtime/runtime.go | 6 ++ pipeline/runtime/runtime_test.go | 107 ++++++++++++++++++++++++++++--- pipeline/runtime/step.go | 9 ++- pipeline/runtime/workflow.go | 5 ++ 4 files changed, 117 insertions(+), 10 deletions(-) diff --git a/pipeline/runtime/runtime.go b/pipeline/runtime/runtime.go index 4512edfe907..48764c47aa9 100644 --- a/pipeline/runtime/runtime.go +++ b/pipeline/runtime/runtime.go @@ -16,6 +16,7 @@ package runtime import ( "context" + "sync" "github.com/oklog/ulid/v2" "github.com/rs/zerolog" @@ -42,6 +43,11 @@ type Runtime struct { // Cleanup operations should use the runnerCtx passed to Run(). ctx context.Context + // detachedWg tracks all background goroutines launched by runDetachedStep. + // Run waits on it before returning so that a service that exits after the + // last stage still has the opportunity to mark the pipeline as failed. + detachedWg sync.WaitGroup + tracer tracing.Tracer logger logging.Logger diff --git a/pipeline/runtime/runtime_test.go b/pipeline/runtime/runtime_test.go index 23ae2d9f68a..c051808eb48 100644 --- a/pipeline/runtime/runtime_test.go +++ b/pipeline/runtime/runtime_test.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -97,6 +98,12 @@ func withOOM() func(*backend_types.Step) { } } +func withSleep(t time.Duration) func(*backend_types.Step) { + return func(s *backend_types.Step) { + s.Environment[dummy.EnvKeyStepSleep] = t.String() + } +} + func withStartFail() func(*backend_types.Step) { return func(s *backend_types.Step) { s.Environment[dummy.EnvKeyStepStartFail] = "true" @@ -194,32 +201,35 @@ func TestWorkflowWithServiceStep(t *testing.T) { assert.NoError(t, r.Run(t.Context())) traces := getTracerStates(tracer) - if assert.Len(t, traces, 5) { + if assert.Len(t, traces, 6) { assert.EqualValues(t, backend_types.State{}, traces[0].CurrStepState) assert.Greater(t, traces[2].CurrStepState.Started, int64(0)) assert.EqualValues(t, backend_types.State{Started: traces[2].CurrStepState.Started, Exited: true}, traces[2].CurrStepState) assert.EqualValues(t, backend_types.State{}, traces[3].CurrStepState) assert.Greater(t, traces[4].CurrStepState.Started, int64(0)) assert.EqualValues(t, backend_types.State{Started: traces[4].CurrStepState.Started, Exited: true}, traces[4].CurrStepState) + assert.Greater(t, calls[5].CurrStepState.Started, int64(0)) + assert.EqualValues(t, backend.State{Started: calls[5].CurrStepState.Started, Exited: true}, calls[5].CurrStepState) - assert.Greater(t, traces[4].Workflow.Started, int64(0)) + assert.Greater(t, traces[5].Workflow.Started, int64(0)) assert.EqualValues(t, state.State{ Workflow: state.Workflow{ Started: traces[4].Workflow.Started, }, CurrStep: &backend_types.Step{ - Name: "test", - UUID: "test-uuid", - Type: "commands", + Name: "db", + UUID: "db-uuid", + Type: "service", + Detached: true, OnSuccess: true, - Environment: map[string]string{}, - Commands: []string{"echo test"}, + Environment: map[string]string{"SLEEP": "100ms"}, + Commands: []string{"echo db"}, }, CurrStepState: backend_types.State{ - Started: traces[4].CurrStepState.Started, + Started: traces[5].CurrStepState.Started, Exited: true, }, - }, traces[4]) + }, traces[5]) } } @@ -668,3 +678,82 @@ func TestWorkflowEmptyStages(t *testing.T) { assert.NoError(t, err) assert.Empty(t, getTracerStates(tracer)) } + +func TestWorkflowFailingServiceMarksWorkflowFailed(t *testing.T) { + t.Parallel() + tracer := newTestTracer(t) + r := New( + &backend_types.Config{ + Stages: []*backend_types.Stage{ + {Steps: []*backend_types.Step{ + // Service starts, runs for ~100ms, then exits non-zero. + // withService sets Detached=true; the exit-code env var + // causes the dummy backend to return code 1 when the + // step finishes. + cmdStep("db", withService(), withExitCode(1)), + // workflow is over before service exits on its own + cmdStep("build", withSleep(150*time.Millisecond)), + }}, + {Steps: []*backend_types.Step{cmdStep("deploy")}}, + }, + }, + dummy.New(), + WithTracer(tracer), + WithLogger(newTestLogger(t)), + ) + + err := r.Run(t.Context()) + + if !assert.Error(t, err, "workflow should fail when a service step exits non-zero") { + t.FailNow() + } + + var exitErr *pipeline_errors.ExitError + if assert.ErrorAs(t, err, &exitErr, "error should be an ExitError when service step fails") { + assert.Equal(t, 1, exitErr.Code) + } + + // deploy must be skipped, not silently dropped + deployTrace := findTraceByName(getTracerStates(tracer), "deploy") + require.NotNil(t, deployTrace, "deploy step should still be traced") + assert.True(t, deployTrace.CurrStepState.Skipped, "deploy should be skipped after service failure") +} + +func TestWorkflowFailingDetachedStepMarksWorkflowFailed(t *testing.T) { + t.Parallel() + tracer := newTestTracer(t) + r := New( + &backend_types.Config{ + Stages: []*backend_types.Stage{ + {Steps: []*backend_types.Step{ + // Detached (non-service) step that exits with code 2. + // withDetached sets Detached=true without changing the + // step type to StepTypeService, so it represents a + // background worker rather than a long-running service. + cmdStep("background-worker", withDetached(), withExitCode(2)), + // detached fails befor workflow is over + cmdStep("main-build", withSleep(200*time.Millisecond)), + }}, + {Steps: []*backend_types.Step{cmdStep("deploy")}}, + }, + }, + dummy.New(), + WithTracer(tracer), + WithLogger(newTestLogger(t)), + ) + + err := r.Run(t.Context()) + + if !assert.Error(t, err, "workflow should fail when a detached step exits non-zero") { + t.FailNow() + } + + var exitErr *pipeline_errors.ExitError + if assert.ErrorAs(t, err, &exitErr, "error should be an ExitError when detached step fails") { + assert.Equal(t, 2, exitErr.Code) + } + + deployTrace := findTraceByName(getTracerStates(tracer), "deploy") + require.NotNil(t, deployTrace, "deploy step should still be traced") + assert.True(t, deployTrace.CurrStepState.Skipped, "deploy should be skipped after detached step failure") +} diff --git a/pipeline/runtime/step.go b/pipeline/runtime/step.go index 6df81648c4a..5ff0913f24d 100644 --- a/pipeline/runtime/step.go +++ b/pipeline/runtime/step.go @@ -205,8 +205,14 @@ func (r *Runtime) runDetachedStep(runnerCtx context.Context, step *backend_types return r.traceStep(nil, err, step) } - // Container is up and logging is streaming — hand off to background. + // Register the goroutine with the WaitGroup before spawning it so there + // is no window between launch and Run()'s detachedWg.Wait() where the + // goroutine could be missed. + r.detachedWg.Add(1) + go func() { + defer r.detachedWg.Done() + logger := r.makeLogger() processState, err := r.completeStep(runnerCtx, step, waitForLogs, startTime) @@ -217,6 +223,7 @@ func (r *Runtime) runDetachedStep(runnerCtx context.Context, step *backend_types } if err != nil { logger.Error().Err(err).Str("step", step.Name).Msg("detached step failed after while running") + r.err.Set(err) } if traceErr := r.traceStep(processState, err, step); traceErr != nil { diff --git a/pipeline/runtime/workflow.go b/pipeline/runtime/workflow.go index 1c841d80404..a750fe6969a 100644 --- a/pipeline/runtime/workflow.go +++ b/pipeline/runtime/workflow.go @@ -69,6 +69,11 @@ func (r *Runtime) Run(runnerCtx context.Context) error { } } + // Wait for all detached step goroutines to finish. A service or background + // worker that exits after the last stage would otherwise be silently ignored; + // waiting here ensures r.err is populated before we return. + r.detachedWg.Wait() + return r.err.Get() } From 3be73a1b03b2357ea68faed4cf5fa4aa9a25e449 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Thu, 19 Mar 2026 22:14:36 +0100 Subject: [PATCH 2/2] fix hang --- pipeline/runtime/option.go | 6 ++++-- pipeline/runtime/runtime.go | 11 ++++++++--- pipeline/runtime/step.go | 16 ++++++++++++---- pipeline/runtime/workflow.go | 11 ++++++++--- 4 files changed, 32 insertions(+), 12 deletions(-) diff --git a/pipeline/runtime/option.go b/pipeline/runtime/option.go index 853b255f09e..bf7de8043d1 100644 --- a/pipeline/runtime/option.go +++ b/pipeline/runtime/option.go @@ -38,10 +38,12 @@ func WithTracer(tracer tracing.Tracer) Option { } } -// WithContext sets the workflow execution context. +// WithContext sets the workflow execution context. The provided context is +// wrapped in a cancelable child so that Run() can always cancel it at the end +// of the stage loop to unblock any still-running detached steps / services. func WithContext(ctx context.Context) Option { return func(r *Runtime) { - r.ctx = ctx + r.ctx, r.cancelCtx = context.WithCancel(ctx) } } diff --git a/pipeline/runtime/runtime.go b/pipeline/runtime/runtime.go index 48764c47aa9..33db9a9e512 100644 --- a/pipeline/runtime/runtime.go +++ b/pipeline/runtime/runtime.go @@ -43,9 +43,14 @@ type Runtime struct { // Cleanup operations should use the runnerCtx passed to Run(). ctx context.Context + // cancelCtx cancels ctx. Called by Run() after the stage loop completes + // so that any still-running detached steps / services are told to stop, + // unblocking detachedWg.Wait(). + cancelCtx context.CancelFunc + // detachedWg tracks all background goroutines launched by runDetachedStep. - // Run waits on it before returning so that a service that exits after the - // last stage still has the opportunity to mark the pipeline as failed. + // Run cancels ctx first, then waits here, so every goroutine unblocks + // promptly and r.err is fully populated before Run returns. detachedWg sync.WaitGroup tracer tracing.Tracer @@ -62,7 +67,7 @@ func New(spec *backend_types.Config, backend backend_types.Backend, opts ...Opti r.description = map[string]string{} r.spec = spec r.engine = backend - r.ctx = context.Background() + r.ctx, r.cancelCtx = context.WithCancel(context.Background()) r.taskUUID = ulid.Make().String() for _, opt := range opts { opt(r) diff --git a/pipeline/runtime/step.go b/pipeline/runtime/step.go index 5ff0913f24d..05a275d7c79 100644 --- a/pipeline/runtime/step.go +++ b/pipeline/runtime/step.go @@ -195,8 +195,12 @@ func (r *Runtime) runBlockingStep(runnerCtx context.Context, step *backend_types // runDetachedStep starts the step and returns as soon as the container is running // and log streaming is set up. The rest of the step lifecycle runs in the background. // -// Any error that occurs after setup is logged but not propagated — it cannot -// influence the pipeline outcome at that point. +// The goroutine is tracked by r.detachedWg so that Run() can wait for it after +// canceling r.ctx. This means: +// - services/detached steps that are still running when stages finish are stopped +// promptly via context cancellation and do not block Run() indefinitely; +// - a service that exits non-zero before Run() cancels r.ctx propagates the +// failure into r.err so Run() returns the correct error. func (r *Runtime) runDetachedStep(runnerCtx context.Context, step *backend_types.Step) error { waitForLogs, startTime, err := r.startStep(step) if err != nil { @@ -218,9 +222,13 @@ func (r *Runtime) runDetachedStep(runnerCtx context.Context, step *backend_types processState, err := r.completeStep(runnerCtx, step, waitForLogs, startTime) logger.Debug().Str("step", step.Name).Msg("complete") - if errors.Is(err, context.Canceled) { - err = pipeline_errors.ErrCancel + // A context.Canceled / ErrCancel result here means Run() cancelled r.ctx + // to stop the service at end-of-pipeline — that is the normal teardown + // path, not a failure. Only propagate genuinely unexpected errors. + if errors.Is(err, context.Canceled) || errors.Is(err, pipeline_errors.ErrCancel) { + err = nil } + if err != nil { logger.Error().Err(err).Str("step", step.Name).Msg("detached step failed after while running") r.err.Set(err) diff --git a/pipeline/runtime/workflow.go b/pipeline/runtime/workflow.go index a750fe6969a..3606a3af440 100644 --- a/pipeline/runtime/workflow.go +++ b/pipeline/runtime/workflow.go @@ -69,9 +69,14 @@ func (r *Runtime) Run(runnerCtx context.Context) error { } } - // Wait for all detached step goroutines to finish. A service or background - // worker that exits after the last stage would otherwise be silently ignored; - // waiting here ensures r.err is populated before we return. + // Cancel r.ctx to tell any still-running detached steps / services to stop. + // Without this, detachedWg.Wait() would block indefinitely for services that + // are designed to outlive the pipeline stages. + r.cancelCtx() + + // Wait for every detached goroutine to finish. Each one will call r.err.Set + // if it exits with an error, so waiting here ensures the final return value + // reflects all outcomes — including a service that fails after the last stage. r.detachedWg.Wait() return r.err.Get()