From 100d187cd7958bd7486f7f0cb63ae882d2a0979b Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 20 Apr 2026 01:02:03 +0200 Subject: [PATCH 01/10] move upload waitgroup logic out of logger and tracer into pipeline runtime --- agent/logger.go | 6 +----- agent/runner.go | 15 +++++++++++++-- agent/tracer.go | 23 +---------------------- pipeline/runtime/option.go | 7 +++++++ pipeline/runtime/runtime.go | 11 +++++++++++ pipeline/runtime/step.go | 28 ++++++++++++++++++++++++---- 6 files changed, 57 insertions(+), 33 deletions(-) diff --git a/agent/logger.go b/agent/logger.go index cecf15dbe0c..23a420344a9 100644 --- a/agent/logger.go +++ b/agent/logger.go @@ -16,7 +16,6 @@ package agent import ( "io" - "sync" "github.com/rs/zerolog" @@ -28,7 +27,7 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/rpc" ) -func (r *Runner) createLogger(_logger zerolog.Logger, uploads *sync.WaitGroup, workflow *rpc.Workflow) logging.Logger { +func (r *Runner) createLogger(_logger zerolog.Logger, workflow *rpc.Workflow) logging.Logger { return func(step *backend_types.Step, rc io.ReadCloser) error { defer rc.Close() @@ -36,9 +35,6 @@ func (r *Runner) createLogger(_logger zerolog.Logger, uploads *sync.WaitGroup, w Str("image", step.Image). 
Logger() - uploads.Add(1) - defer uploads.Done() - var secrets []string for _, secret := range workflow.Config.Secrets { secrets = append(secrets, secret.Value) diff --git a/agent/runner.go b/agent/runner.go index 7e5d19464c9..348127016d4 100644 --- a/agent/runner.go +++ b/agent/runner.go @@ -19,6 +19,7 @@ import ( "context" "errors" "fmt" + "runtime" "sync" "time" @@ -161,19 +162,29 @@ func (r *Runner) Run(runnerCtx context.Context) error { var uploads sync.WaitGroup + // Enrich workflow env with agent info + // TODO: find better way to track this state + for _, stage := range workflow.Config.Stages { + for _, step := range stage.Steps { + step.Environment["CI_MACHINE"] = r.hostname + step.Environment["CI_SYSTEM_PLATFORM"] = runtime.GOOS + "/" + runtime.GOARCH + } + } + // Run pipeline err = pipeline_runtime.New( workflow.Config, r.backend, pipeline_runtime.WithContext(workflowCtx), pipeline_runtime.WithTaskUUID(fmt.Sprint(workflow.ID)), - pipeline_runtime.WithLogger(r.createLogger(logger, &uploads, workflow)), - pipeline_runtime.WithTracer(r.createTracer(ctxMeta, &uploads, logger, workflow)), + pipeline_runtime.WithLogger(r.createLogger(logger, workflow)), + pipeline_runtime.WithTracer(r.createTracer(ctxMeta, logger, workflow)), pipeline_runtime.WithDescription(map[string]string{ "workflow_id": workflow.ID, "repo": repoName, "pipeline_number": pipelineNumber, }), + pipeline_runtime.WithUploadLock(&uploads), ).Run(runnerCtx) state.Finished = time.Now().Unix() diff --git a/agent/tracer.go b/agent/tracer.go index 836badfbfc0..fa460f0e05d 100644 --- a/agent/tracer.go +++ b/agent/tracer.go @@ -17,9 +17,6 @@ package agent import ( "context" "errors" - "runtime" - "strconv" - "sync" "time" "github.com/rs/zerolog" @@ -30,11 +27,8 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/rpc" ) -func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup, logger zerolog.Logger, workflow *rpc.Workflow) tracing.TraceFunc { +func (r *Runner) createTracer(ctxMeta 
context.Context, logger zerolog.Logger, workflow *rpc.Workflow) tracing.TraceFunc { return func(state *state.State) error { - uploads.Add(1) - defer uploads.Done() - stepLogger := logger.With(). Str("image", state.CurrStep.Image). Str("workflow_id", workflow.ID). @@ -69,21 +63,6 @@ func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup, stepLogger.Debug().Msg("update step status complete") }() - if state.CurrStepState.Exited { - return nil - } - if state.CurrStep.Environment == nil { - state.CurrStep.Environment = map[string]string{} - } - - // TODO: find better way to update this state and move it to pipeline to have the same env in cli-exec - state.CurrStep.Environment["CI_MACHINE"] = r.hostname - - state.CurrStep.Environment["CI_PIPELINE_STARTED"] = strconv.FormatInt(state.Workflow.Started, 10) - - state.CurrStep.Environment["CI_STEP_STARTED"] = strconv.FormatInt(state.Workflow.Started, 10) - - state.CurrStep.Environment["CI_SYSTEM_PLATFORM"] = runtime.GOOS + "/" + runtime.GOARCH return nil } diff --git a/pipeline/runtime/option.go b/pipeline/runtime/option.go index 853b255f09e..605e756b851 100644 --- a/pipeline/runtime/option.go +++ b/pipeline/runtime/option.go @@ -16,6 +16,7 @@ package runtime import ( "context" + "sync" "go.woodpecker-ci.org/woodpecker/v3/pipeline/logging" "go.woodpecker-ci.org/woodpecker/v3/pipeline/tracing" @@ -58,3 +59,9 @@ func WithTaskUUID(uuid string) Option { r.taskUUID = uuid } } + +func WithUploadLock(up *sync.WaitGroup) Option { + return func(r *Runtime) { + r.uploadWait = up + } +} diff --git a/pipeline/runtime/runtime.go b/pipeline/runtime/runtime.go index 1cb262ebe34..955b803cead 100644 --- a/pipeline/runtime/runtime.go +++ b/pipeline/runtime/runtime.go @@ -47,6 +47,8 @@ type Runtime struct { tracerLock sync.Mutex logger logging.Logger + uploadWait *sync.WaitGroup + taskUUID string description map[string]string } @@ -75,3 +77,12 @@ func (r *Runtime) makeLogger() zerolog.Logger { } return logCtx.Logger() } 
+ +func (r *Runtime) uploadSignal() func() { + if r.uploadWait == nil { + // no wait group, so we just return a noop + return func() {} + } + r.uploadWait.Add(1) + return r.uploadWait.Done +} diff --git a/pipeline/runtime/step.go b/pipeline/runtime/step.go index ee971dc6ef8..7536ec174ca 100644 --- a/pipeline/runtime/step.go +++ b/pipeline/runtime/step.go @@ -17,6 +17,7 @@ package runtime import ( "context" "errors" + "strconv" "sync" "time" @@ -122,6 +123,8 @@ func (r *Runtime) startStep(step *backend_types.Step) (func(), int64, error) { // The runnerCtx is intentionally used for DestroyStep so that container cleanup can // still reach the backend even after the workflow context (r.ctx) is canceled. func (r *Runtime) completeStep(runnerCtx context.Context, step *backend_types.Step, waitForLogs func(), startTime int64) (*backend_types.State, error) { + defer r.uploadSignal()() + // Drain the log stream before waiting on the process exit. waitForLogs() @@ -235,6 +238,8 @@ func (r *Runtime) runDetachedStep(runnerCtx context.Context, step *backend_types // // Always returns err unchanged so callers can write: return r.traceStep(state, err, step). func (r *Runtime) traceStep(processState *backend_types.State, err error, step *backend_types.Step) error { + defer r.uploadSignal()() + s := new(state.State) s.Workflow.Started = r.started s.CurrStep = step @@ -253,12 +258,27 @@ func (r *Runtime) traceStep(processState *backend_types.State, err error, step * // processState == nil && err == nil: step just started, leave s.CurrStepState zero-valued. } - // The tracer should just trace changes, but it currently also updates step env vars used in various ways: - // https://github.com/woodpecker-ci/woodpecker/blob/main/agent/tracer.go#L79-L86 . - r.tracerLock.Lock() - defer r.tracerLock.Unlock() if traceErr := r.tracer.Trace(s); traceErr != nil { return traceErr } + + // The traceStep should just trace changes, but it currently also updates step env vars. 
+ { + r.tracerLock.Lock() + defer r.tracerLock.Unlock() + + if s.CurrStepState.Exited { + return err + } + + if s.CurrStep.Environment == nil { + s.CurrStep.Environment = map[string]string{} + } + + // TODO: find better way to insert runtime step environment variables. + s.CurrStep.Environment["CI_PIPELINE_STARTED"] = strconv.FormatInt(s.Workflow.Started, 10) + s.CurrStep.Environment["CI_STEP_STARTED"] = strconv.FormatInt(s.Workflow.Started, 10) + } + return err } From 97252d4c6171432ed33ea53102dd1a3afbef96c3 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 20 Apr 2026 01:08:34 +0200 Subject: [PATCH 02/10] simplify and self contain --- agent/runner.go | 9 --------- pipeline/runtime/option.go | 7 ------- pipeline/runtime/runtime.go | 11 +---------- pipeline/runtime/step.go | 6 ++++-- pipeline/runtime/workflow.go | 5 +++++ 5 files changed, 10 insertions(+), 28 deletions(-) diff --git a/agent/runner.go b/agent/runner.go index 348127016d4..e9a653cab83 100644 --- a/agent/runner.go +++ b/agent/runner.go @@ -20,7 +20,6 @@ import ( "errors" "fmt" "runtime" - "sync" "time" "github.com/rs/zerolog/log" @@ -160,8 +159,6 @@ func (r *Runner) Run(runnerCtx context.Context) error { return err } - var uploads sync.WaitGroup - // Enrich workflow env with agent info // TODO: find better way to track this state for _, stage := range workflow.Config.Stages { @@ -184,7 +181,6 @@ func (r *Runner) Run(runnerCtx context.Context) error { "repo": repoName, "pipeline_number": pipelineNumber, }), - pipeline_runtime.WithUploadLock(&uploads), ).Run(runnerCtx) state.Finished = time.Now().Unix() @@ -203,11 +199,6 @@ func (r *Runner) Run(runnerCtx context.Context) error { Bool("canceled", state.Canceled). 
Msg("workflow finished") - // Ensure all logs/traces are uploaded before finishing - logger.Debug().Msg("waiting for logs and traces upload") - uploads.Wait() - logger.Debug().Msg("logs and traces uploaded") - // Update workflow state doneCtx := runnerCtx //nolint:contextcheck if doneCtx.Err() != nil { diff --git a/pipeline/runtime/option.go b/pipeline/runtime/option.go index 605e756b851..853b255f09e 100644 --- a/pipeline/runtime/option.go +++ b/pipeline/runtime/option.go @@ -16,7 +16,6 @@ package runtime import ( "context" - "sync" "go.woodpecker-ci.org/woodpecker/v3/pipeline/logging" "go.woodpecker-ci.org/woodpecker/v3/pipeline/tracing" @@ -59,9 +58,3 @@ func WithTaskUUID(uuid string) Option { r.taskUUID = uuid } } - -func WithUploadLock(up *sync.WaitGroup) Option { - return func(r *Runtime) { - r.uploadWait = up - } -} diff --git a/pipeline/runtime/runtime.go b/pipeline/runtime/runtime.go index 955b803cead..8003b26a72d 100644 --- a/pipeline/runtime/runtime.go +++ b/pipeline/runtime/runtime.go @@ -47,7 +47,7 @@ type Runtime struct { tracerLock sync.Mutex logger logging.Logger - uploadWait *sync.WaitGroup + uploadWait sync.WaitGroup taskUUID string description map[string]string @@ -77,12 +77,3 @@ func (r *Runtime) makeLogger() zerolog.Logger { } return logCtx.Logger() } - -func (r *Runtime) uploadSignal() func() { - if r.uploadWait == nil { - // no wait group, so we just return a noop - return func() {} - } - r.uploadWait.Add(1) - return r.uploadWait.Done -} diff --git a/pipeline/runtime/step.go b/pipeline/runtime/step.go index 7536ec174ca..c1faca2c848 100644 --- a/pipeline/runtime/step.go +++ b/pipeline/runtime/step.go @@ -123,7 +123,8 @@ func (r *Runtime) startStep(step *backend_types.Step) (func(), int64, error) { // The runnerCtx is intentionally used for DestroyStep so that container cleanup can // still reach the backend even after the workflow context (r.ctx) is canceled. 
func (r *Runtime) completeStep(runnerCtx context.Context, step *backend_types.Step, waitForLogs func(), startTime int64) (*backend_types.State, error) { - defer r.uploadSignal()() + r.uploadWait.Add(1) + defer r.uploadWait.Done() // Drain the log stream before waiting on the process exit. waitForLogs() @@ -238,7 +239,8 @@ func (r *Runtime) runDetachedStep(runnerCtx context.Context, step *backend_types // // Always returns err unchanged so callers can write: return r.traceStep(state, err, step). func (r *Runtime) traceStep(processState *backend_types.State, err error, step *backend_types.Step) error { - defer r.uploadSignal()() + r.uploadWait.Add(1) + defer r.uploadWait.Done() s := new(state.State) s.Workflow.Started = r.started diff --git a/pipeline/runtime/workflow.go b/pipeline/runtime/workflow.go index 793110d4dc1..0f3be10bdf8 100644 --- a/pipeline/runtime/workflow.go +++ b/pipeline/runtime/workflow.go @@ -71,6 +71,11 @@ func (r *Runtime) Run(runnerCtx context.Context) error { } } + // Ensure all logs/traces are uploaded before finishing + logger.Debug().Msg("waiting for logs and traces upload") + r.uploadWait.Wait() + logger.Debug().Msg("logs and traces uploaded") + return r.err.Get() } From ce592f765f9988042873c1fae2b9a0331b291fb1 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 20 Apr 2026 01:17:00 +0200 Subject: [PATCH 03/10] keep behaviour of step var similar as before --- agent/tracer.go | 15 +++++---------- pipeline/runtime/step.go | 21 ++++++++++----------- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/agent/tracer.go b/agent/tracer.go index fa460f0e05d..e2c65e599e8 100644 --- a/agent/tracer.go +++ b/agent/tracer.go @@ -52,18 +52,13 @@ func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, wo stepState.Finished = time.Now().Unix() } - defer func() { - stepLogger.Debug().Msg("update step status") + stepLogger.Debug().Msg("update step status") - if err := r.client.Update(ctxMeta, workflow.ID, 
stepState); err != nil { - stepLogger.Debug(). - Err(err). - Msg("update step status error") - } - - stepLogger.Debug().Msg("update step status complete") - }() + if err := r.client.Update(ctxMeta, workflow.ID, stepState); err != nil { + return err + } + stepLogger.Debug().Msg("update step status complete") return nil } } diff --git a/pipeline/runtime/step.go b/pipeline/runtime/step.go index c1faca2c848..774915b243e 100644 --- a/pipeline/runtime/step.go +++ b/pipeline/runtime/step.go @@ -260,26 +260,25 @@ func (r *Runtime) traceStep(processState *backend_types.State, err error, step * // processState == nil && err == nil: step just started, leave s.CurrStepState zero-valued. } - if traceErr := r.tracer.Trace(s); traceErr != nil { - return traceErr - } - // The traceStep should just trace changes, but it currently also updates step env vars. { r.tracerLock.Lock() defer r.tracerLock.Unlock() if s.CurrStepState.Exited { - return err - } + if s.CurrStep.Environment == nil { + s.CurrStep.Environment = map[string]string{} + } - if s.CurrStep.Environment == nil { - s.CurrStep.Environment = map[string]string{} + // TODO: find better way to insert runtime step environment variables. + s.CurrStep.Environment["CI_PIPELINE_STARTED"] = strconv.FormatInt(s.Workflow.Started, 10) + s.CurrStep.Environment["CI_STEP_STARTED"] = strconv.FormatInt(s.Workflow.Started, 10) } + } - // TODO: find better way to insert runtime step environment variables. 
- s.CurrStep.Environment["CI_PIPELINE_STARTED"] = strconv.FormatInt(s.Workflow.Started, 10) - s.CurrStep.Environment["CI_STEP_STARTED"] = strconv.FormatInt(s.Workflow.Started, 10) + if traceErr := r.tracer.Trace(s); traceErr != nil { + logger := r.makeLogger() + logger.Error().Err(traceErr).Msg("could not trace step state change") } return err From 24eb068a092a84dfa1496ba4ab5f93a8234e76ec Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 20 Apr 2026 01:18:04 +0200 Subject: [PATCH 04/10] clean code --- agent/tracer.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/agent/tracer.go b/agent/tracer.go index e2c65e599e8..5cde77d8da8 100644 --- a/agent/tracer.go +++ b/agent/tracer.go @@ -53,12 +53,8 @@ func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, wo } stepLogger.Debug().Msg("update step status") + defer stepLogger.Debug().Msg("update step status complete") - if err := r.client.Update(ctxMeta, workflow.ID, stepState); err != nil { - return err - } - - stepLogger.Debug().Msg("update step status complete") - return nil + return r.client.Update(ctxMeta, workflow.ID, stepState) } } From bd48c5df0e5b6bf308f08c8ae836fe7137697811 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 20 Apr 2026 01:40:03 +0200 Subject: [PATCH 05/10] fix last race --- pipeline/runtime/step.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pipeline/runtime/step.go b/pipeline/runtime/step.go index 774915b243e..ae5726b276e 100644 --- a/pipeline/runtime/step.go +++ b/pipeline/runtime/step.go @@ -31,6 +31,9 @@ import ( // It checks whether the step should be skipped, emits a "started" trace, // sets up drone-compat env vars, then hands off to blocking or detached execution. 
func (r *Runtime) executeStep(runnerCtx context.Context, step *backend_types.Step) error { + r.uploadWait.Add(1) + defer r.uploadWait.Done() + logger := r.makeLogger() logger.Debug().Str("step", step.Name).Msg("prepare") @@ -123,9 +126,6 @@ func (r *Runtime) startStep(step *backend_types.Step) (func(), int64, error) { // The runnerCtx is intentionally used for DestroyStep so that container cleanup can // still reach the backend even after the workflow context (r.ctx) is canceled. func (r *Runtime) completeStep(runnerCtx context.Context, step *backend_types.Step, waitForLogs func(), startTime int64) (*backend_types.State, error) { - r.uploadWait.Add(1) - defer r.uploadWait.Done() - // Drain the log stream before waiting on the process exit. waitForLogs() @@ -202,8 +202,12 @@ func (r *Runtime) runBlockingStep(runnerCtx context.Context, step *backend_types // Any error that occurs after setup is logged but not propagated — it cannot // influence the pipeline outcome at that point. func (r *Runtime) runDetachedStep(runnerCtx context.Context, step *backend_types.Step) error { + r.uploadWait.Add(1) + waitForLogs, startTime, err := r.startStep(step) if err != nil { + defer r.uploadWait.Done() + // Setup failed before the container was running — treat it like a // blocking failure so the pipeline is aware. return r.traceStep(nil, err, step) @@ -211,6 +215,8 @@ func (r *Runtime) runDetachedStep(runnerCtx context.Context, step *backend_types // Container is up and logging is streaming — hand off to background. go func() { + defer r.uploadWait.Done() + logger := r.makeLogger() processState, err := r.completeStep(runnerCtx, step, waitForLogs, startTime) @@ -239,9 +245,6 @@ func (r *Runtime) runDetachedStep(runnerCtx context.Context, step *backend_types // // Always returns err unchanged so callers can write: return r.traceStep(state, err, step). 
func (r *Runtime) traceStep(processState *backend_types.State, err error, step *backend_types.Step) error { - r.uploadWait.Add(1) - defer r.uploadWait.Done() - s := new(state.State) s.Workflow.Started = r.started s.CurrStep = step From 8d2e499653317cb4b27c17e65589e37f27016632 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 20 Apr 2026 02:00:07 +0200 Subject: [PATCH 06/10] update unit tests --- pipeline/runtime/runtime_test.go | 10 +++++++++- pipeline/runtime/step_test.go | 29 ----------------------------- 2 files changed, 9 insertions(+), 30 deletions(-) diff --git a/pipeline/runtime/runtime_test.go b/pipeline/runtime/runtime_test.go index e497564a2ab..5c618d4e476 100644 --- a/pipeline/runtime/runtime_test.go +++ b/pipeline/runtime/runtime_test.go @@ -195,15 +195,19 @@ func TestWorkflowWithServiceStep(t *testing.T) { assert.NoError(t, r.Run(t.Context())) traces := getTracerStates(tracer) - if assert.Len(t, traces, 5) { + if assert.Len(t, traces, 6) { assert.EqualValues(t, backend_types.State{}, traces[0].CurrStepState) assert.Greater(t, traces[2].CurrStepState.Started, int64(0)) assert.EqualValues(t, backend_types.State{Started: traces[2].CurrStepState.Started, Exited: true}, traces[2].CurrStepState) assert.EqualValues(t, backend_types.State{}, traces[3].CurrStepState) assert.Greater(t, traces[4].CurrStepState.Started, int64(0)) assert.EqualValues(t, backend_types.State{Started: traces[4].CurrStepState.Started, Exited: true}, traces[4].CurrStepState) + assert.Greater(t, traces[5].CurrStepState.Started, int64(0)) + assert.EqualValues(t, backend_types.State{Started: traces[5].CurrStepState.Started, Exited: true}, traces[5].CurrStepState) assert.Greater(t, traces[4].Workflow.Started, int64(0)) + delete(traces[4].CurrStep.Environment, "CI_PIPELINE_STARTED") + delete(traces[4].CurrStep.Environment, "CI_STEP_STARTED") assert.EqualValues(t, state.State{ Workflow: state.Workflow{ Started: traces[4].Workflow.Started, @@ -409,6 +413,10 @@ func 
TestWorkflowPluginStep(t *testing.T) { lastPluginTrace := findLastTraceByName(getTracerStates(tracer), "publish") if assert.NotNil(t, lastPluginTrace) { + + delete(lastPluginTrace.CurrStep.Environment, "CI_PIPELINE_STARTED") + delete(lastPluginTrace.CurrStep.Environment, "CI_STEP_STARTED") + assert.EqualValues(t, map[string]string{ "DRONE_BUILD_STATUS": "success", "DRONE_REPO_SCM": "git", diff --git a/pipeline/runtime/step_test.go b/pipeline/runtime/step_test.go index 7f3dd1b0b02..47c2d1c9d31 100644 --- a/pipeline/runtime/step_test.go +++ b/pipeline/runtime/step_test.go @@ -177,18 +177,6 @@ func TestTraceStep(t *testing.T) { assert.True(t, calls[0].CurrStepState.Exited) }) - t.Run("TracerError", func(t *testing.T) { - t.Parallel() - traceErr := errors.New("tracer unavailable") - tracer := tracer_mocks.NewMockTracer(t) - tracer.On("Trace", mock.Anything).Return(traceErr).Maybe() - r := newDummyRuntime(t, tracer) - - err := r.traceStep(nil, nil, dummyStep("s1")) - - assert.ErrorIs(t, err, traceErr) - }) - t.Run("PipelineErrorPropagated", func(t *testing.T) { t.Parallel() tracer := newTestTracer(t) @@ -518,23 +506,6 @@ func TestExecuteStep(t *testing.T) { return atomic.LoadInt32(&traced) >= 2 }, time.Second, 10*time.Millisecond) }) - - t.Run("TracerErrorOnStarted", func(t *testing.T) { - t.Parallel() - traceErr := errors.New("tracer down") - tracer := tracer_mocks.NewMockTracer(t) - // First call (skip-check passes, this is the "started" trace) → error. - // The step has OnSuccess=true and no prior error, so shouldSkipStep returns false, - // meaning executeStep calls traceStep(nil, nil, step) first. 
- tracer.On("Trace", mock.Anything).Return(traceErr).Once() - - r := newDummyRuntime(t, tracer) - step := dummyStep("s1") // OnSuccess=true, so not skipped - - err := r.executeStep(t.Context(), step) - - assert.ErrorIs(t, err, traceErr) - }) } func TestRunBlockingStep(t *testing.T) { From 39bb4a447d21b0293e5c4325ce8468bc5109f38f Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 20 Apr 2026 02:08:17 +0200 Subject: [PATCH 07/10] only detached steps need upload wait lock --- pipeline/runtime/helpers_test.go | 10 +++ pipeline/runtime/runtime_test.go | 101 +++++++++++++++++++++---------- pipeline/runtime/step.go | 8 +-- 3 files changed, 80 insertions(+), 39 deletions(-) diff --git a/pipeline/runtime/helpers_test.go b/pipeline/runtime/helpers_test.go index ea29c8f6ca4..14f5631dbae 100644 --- a/pipeline/runtime/helpers_test.go +++ b/pipeline/runtime/helpers_test.go @@ -57,3 +57,13 @@ func getTracerStates(tracer *tracer_mocks.MockTracer) []state.State { } return states } + +// indexOfTrace returns the first index where predicate matches, or -1. 
+func indexOfTrace(traces []state.State, match func(s state.State) bool) int { + for i := range traces { + if match(traces[i]) { + return i + } + } + return -1 +} diff --git a/pipeline/runtime/runtime_test.go b/pipeline/runtime/runtime_test.go index 5c618d4e476..f0b3166a33e 100644 --- a/pipeline/runtime/runtime_test.go +++ b/pipeline/runtime/runtime_test.go @@ -193,39 +193,77 @@ func TestWorkflowWithServiceStep(t *testing.T) { WithLogger(newTestLogger(t)), ) - assert.NoError(t, r.Run(t.Context())) + require.NoError(t, r.Run(t.Context())) traces := getTracerStates(tracer) - if assert.Len(t, traces, 6) { - assert.EqualValues(t, backend_types.State{}, traces[0].CurrStepState) - assert.Greater(t, traces[2].CurrStepState.Started, int64(0)) - assert.EqualValues(t, backend_types.State{Started: traces[2].CurrStepState.Started, Exited: true}, traces[2].CurrStepState) - assert.EqualValues(t, backend_types.State{}, traces[3].CurrStepState) - assert.Greater(t, traces[4].CurrStepState.Started, int64(0)) - assert.EqualValues(t, backend_types.State{Started: traces[4].CurrStepState.Started, Exited: true}, traces[4].CurrStepState) - assert.Greater(t, traces[5].CurrStepState.Started, int64(0)) - assert.EqualValues(t, backend_types.State{Started: traces[5].CurrStepState.Started, Exited: true}, traces[5].CurrStepState) - - assert.Greater(t, traces[4].Workflow.Started, int64(0)) - delete(traces[4].CurrStep.Environment, "CI_PIPELINE_STARTED") - delete(traces[4].CurrStep.Environment, "CI_STEP_STARTED") - assert.EqualValues(t, state.State{ - Workflow: state.Workflow{ - Started: traces[4].Workflow.Started, - }, - CurrStep: &backend_types.Step{ - Name: "test", - UUID: "test-uuid", - Type: "commands", - OnSuccess: true, - Environment: map[string]string{}, - Commands: []string{"echo test"}, - }, - CurrStepState: backend_types.State{ - Started: traces[4].CurrStepState.Started, - Exited: true, - }, - }, traces[4]) + + // Each step should emit exactly one "started" and one "exited" trace: + // 
db (service/detached), build, test — 3 * 2 = 6 traces total. + require.Len(t, traces, 6) + + // Per-step invariants: started trace is the zero state, exited trace is + // Exited=true with a monotonic Started timestamp. + for _, name := range []string{"db", "build", "test"} { + started := findFirstTraceByName(traces, name) + require.NotNil(t, started, "%s should have a started trace", name) + assert.EqualValues(t, backend_types.State{}, started.CurrStepState, + "%s started trace should be zero-valued", name) + + last := findLastTraceByName(traces, name) + require.NotNil(t, last, "%s should have an exited trace", name) + assert.True(t, last.CurrStepState.Exited, "%s should be exited", name) + assert.Equal(t, 0, last.CurrStepState.ExitCode, "%s should exit 0", name) + assert.Greater(t, last.CurrStepState.Started, int64(0), + "%s should have a non-zero Started timestamp", name) + } + + // Per-step ordering: started trace precedes exited trace for the same step. + for _, name := range []string{"db", "build", "test"} { + startedIdx := indexOfTrace(traces, func(s state.State) bool { + return s.CurrStep != nil && s.CurrStep.Name == name && !s.CurrStepState.Exited + }) + exitedIdx := indexOfTrace(traces, func(s state.State) bool { + return s.CurrStep != nil && s.CurrStep.Name == name && s.CurrStepState.Exited + }) + assert.Less(t, startedIdx, exitedIdx, "%s started must precede %s exited", name, name) } + + // The contract of a service/detached step in stage 1: its exit trace arrives + // AFTER stage 2's steps have already been traced. That's the whole point of + // detaching — it must not block the next stage. 
+ dbExitIdx := indexOfTrace(traces, func(s state.State) bool { + return s.CurrStep != nil && s.CurrStep.Name == "db" && s.CurrStepState.Exited + }) + testExitIdx := indexOfTrace(traces, func(s state.State) bool { + return s.CurrStep != nil && s.CurrStep.Name == "test" && s.CurrStepState.Exited + }) + assert.Greater(t, dbExitIdx, testExitIdx, + "db (service) must complete after test (next stage) — otherwise it wasn't really detached") + + // Runtime-injected env vars should be present on the test step's exit trace. + testExit := findLastTraceByName(traces, "test") + require.NotNil(t, testExit) + assert.NotEmpty(t, testExit.CurrStep.Environment["CI_PIPELINE_STARTED"]) + assert.NotEmpty(t, testExit.CurrStep.Environment["CI_STEP_STARTED"]) + assert.Greater(t, testExit.Workflow.Started, int64(0)) + + // Strip runtime-injected env for a structural comparison of the step itself. + delete(testExit.CurrStep.Environment, "CI_PIPELINE_STARTED") + delete(testExit.CurrStep.Environment, "CI_STEP_STARTED") + assert.EqualValues(t, state.State{ + Workflow: state.Workflow{Started: testExit.Workflow.Started}, + CurrStep: &backend_types.Step{ + Name: "test", + UUID: "test-uuid", + Type: "commands", + OnSuccess: true, + Environment: map[string]string{}, + Commands: []string{"echo test"}, + }, + CurrStepState: backend_types.State{ + Started: testExit.CurrStepState.Started, + Exited: true, + }, + }, *testExit) } func TestWorkflowDetachedStepDoesNotBlockWorkflow(t *testing.T) { @@ -413,7 +451,6 @@ func TestWorkflowPluginStep(t *testing.T) { lastPluginTrace := findLastTraceByName(getTracerStates(tracer), "publish") if assert.NotNil(t, lastPluginTrace) { - delete(lastPluginTrace.CurrStep.Environment, "CI_PIPELINE_STARTED") delete(lastPluginTrace.CurrStep.Environment, "CI_STEP_STARTED") diff --git a/pipeline/runtime/step.go b/pipeline/runtime/step.go index ae5726b276e..48244031413 100644 --- a/pipeline/runtime/step.go +++ b/pipeline/runtime/step.go @@ -31,9 +31,6 @@ import ( // It checks 
whether the step should be skipped, emits a "started" trace, // sets up drone-compat env vars, then hands off to blocking or detached execution. func (r *Runtime) executeStep(runnerCtx context.Context, step *backend_types.Step) error { - r.uploadWait.Add(1) - defer r.uploadWait.Done() - logger := r.makeLogger() logger.Debug().Str("step", step.Name).Msg("prepare") @@ -202,18 +199,15 @@ func (r *Runtime) runBlockingStep(runnerCtx context.Context, step *backend_types // Any error that occurs after setup is logged but not propagated — it cannot // influence the pipeline outcome at that point. func (r *Runtime) runDetachedStep(runnerCtx context.Context, step *backend_types.Step) error { - r.uploadWait.Add(1) - waitForLogs, startTime, err := r.startStep(step) if err != nil { - defer r.uploadWait.Done() - // Setup failed before the container was running — treat it like a // blocking failure so the pipeline is aware. return r.traceStep(nil, err, step) } // Container is up and logging is streaming — hand off to background. 
+ r.uploadWait.Add(1) go func() { defer r.uploadWait.Done() From acb5a1fee696c414b7240d1fd5950939167455d6 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 20 Apr 2026 02:18:13 +0200 Subject: [PATCH 08/10] add tests from https://github.com/woodpecker-ci/woodpecker/pull/6263 --- pipeline/runtime/runtime_test.go | 99 ++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/pipeline/runtime/runtime_test.go b/pipeline/runtime/runtime_test.go index f0b3166a33e..ffe4ba2e5d8 100644 --- a/pipeline/runtime/runtime_test.go +++ b/pipeline/runtime/runtime_test.go @@ -1268,3 +1268,102 @@ func TestWorkflowCancelDuringStepSleep(t *testing.T) { assert.Nil(t, findFirstTraceByName(getTracerStates(tracer), "never-reached"), "never-reached must not have been traced") } + +// TestWorkflowFailingServiceDoesNotFailWorkflow pins down the intentional design: +// a service/detached step that fails in the background has its failure logged +// and traced, but it must NOT propagate to the workflow error. Subsequent +// stages must still run, and Run() must return nil. +// +// This is the explicit contract in runDetachedStep: +// "Any error that occurs after setup is logged but not propagated — it cannot +// +// influence the pipeline outcome at that point." +func TestWorkflowFailingServiceDoesNotFailWorkflow(t *testing.T) { + t.Parallel() + tracer := newTestTracer(t) + r := New( + &backend_types.Config{ + Stages: []*backend_types.Stage{ + {Steps: []*backend_types.Step{ + // Service runs ~100ms (from withService), then exits non-zero. + cmdStep("db", withService(), withExitCode(1)), + cmdStep("build"), + }}, + {Steps: []*backend_types.Step{cmdStep("deploy")}}, + }, + }, + dummy.New(), + WithTracer(tracer), + WithLogger(newTestLogger(t)), + ) + + // Contract 1: workflow succeeds even though the service failed. 
+ assert.NoError(t, r.Run(t.Context()), + "service failure must not fail the workflow (detached errors are not propagated)") + + traces := getTracerStates(tracer) + + // Contract 2: the service's failure IS visible in traces. This is the + // observability guarantee — the failure is logged and recorded even though + // it doesn't kill the workflow. + dbExit := findLastTraceByName(traces, "db") + require.NotNil(t, dbExit, "db must have an exit trace") + assert.True(t, dbExit.CurrStepState.Exited, "db should be marked exited") + assert.Equal(t, 1, dbExit.CurrStepState.ExitCode, "db exit code must be preserved in trace") + + // Contract 3: deploy must run normally — NOT skipped — because the service + // failure didn't set r.err. + deployExit := findLastTraceByName(traces, "deploy") + require.NotNil(t, deployExit, "deploy must be traced") + assert.False(t, deployExit.CurrStepState.Skipped, "deploy must run when only a service failed") + assert.True(t, deployExit.CurrStepState.Exited, "deploy should complete normally") + assert.Equal(t, 0, deployExit.CurrStepState.ExitCode) + + // Contract 4: uploadWait at the end of Run() guarantees the detached trace + // has been emitted BEFORE Run() returns. This is non-timing-dependent: + // if Run() returned, the exit trace for every detached step must exist. + // This is what the uploadWait plumbing in this PR is actually for. + assert.NotNil(t, findLastTraceByName(traces, "db"), + "detached step exit trace must be emitted before Run() returns (uploadWait contract)") +} + +// TestWorkflowFailingDetachedStepDoesNotFailWorkflow is the non-service +// counterpart: Detached=true, Type=commands (a background worker). Same +// contract — failures don't propagate. 
+func TestWorkflowFailingDetachedStepDoesNotFailWorkflow(t *testing.T) { + t.Parallel() + tracer := newTestTracer(t) + r := New( + &backend_types.Config{ + Stages: []*backend_types.Stage{ + {Steps: []*backend_types.Step{ + // Detached (non-service) worker, ~100ms (from withDetached), exits code 2. + cmdStep("background-worker", withDetached(), withExitCode(2)), + cmdStep("main-build"), + }}, + {Steps: []*backend_types.Step{cmdStep("deploy")}}, + }, + }, + dummy.New(), + WithTracer(tracer), + WithLogger(newTestLogger(t)), + ) + + assert.NoError(t, r.Run(t.Context()), + "detached worker failure must not fail the workflow") + + traces := getTracerStates(tracer) + + workerExit := findLastTraceByName(traces, "background-worker") + require.NotNil(t, workerExit, "background-worker must have an exit trace") + assert.True(t, workerExit.CurrStepState.Exited) + assert.Equal(t, 2, workerExit.CurrStepState.ExitCode, + "exit code from detached step must be preserved in trace") + + deployExit := findLastTraceByName(traces, "deploy") + require.NotNil(t, deployExit, "deploy must be traced") + assert.False(t, deployExit.CurrStepState.Skipped, + "deploy must run when only a detached worker failed") + assert.True(t, deployExit.CurrStepState.Exited) + assert.Equal(t, 0, deployExit.CurrStepState.ExitCode) +} From 60f0ba9e006f0402ed526c5c97df00e4efc6a6b2 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Thu, 23 Apr 2026 20:33:45 +0200 Subject: [PATCH 09/10] cli does not need tracer to update environment anymore ... 
that's in runtime --- cli/exec/exec.go | 2 +- pipeline/tracing/tracer.go | 16 ++-------------- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/cli/exec/exec.go b/cli/exec/exec.go index fc5dfe7f03e..9da07e62ba6 100644 --- a/cli/exec/exec.go +++ b/cli/exec/exec.go @@ -324,7 +324,7 @@ func execWithAxis(ctx context.Context, c *cli.Command, file, repoPath string, ax return pipeline_runtime.New(compiled, backendEngine, pipeline_runtime.WithContext(pipelineCtx), //nolint:contextcheck - pipeline_runtime.WithTracer(tracing.DefaultTracer), + pipeline_runtime.WithTracer(tracing.NoOpTracer), pipeline_runtime.WithLogger(defaultLogger), pipeline_runtime.WithDescription(map[string]string{ "CLI": "exec", diff --git a/pipeline/tracing/tracer.go b/pipeline/tracing/tracer.go index 800cb9e3c65..083a808a2b3 100644 --- a/pipeline/tracing/tracer.go +++ b/pipeline/tracing/tracer.go @@ -34,19 +34,7 @@ func (f TraceFunc) Trace(state *state.State) error { return f(state) } -// DefaultTracer provides a tracer that updates the CI_ environment -// variables to include the correct timestamp and status. -// TODO: find either a new home or better name for this. -var DefaultTracer = TraceFunc(func(state *state.State) error { - if state.CurrStepState.Exited { - return nil - } - if state.CurrStep.Environment == nil { - return nil - } - state.CurrStep.Environment["CI_PIPELINE_STARTED"] = strconv.FormatInt(state.Workflow.Started, 10) - - state.CurrStep.Environment["CI_STEP_STARTED"] = strconv.FormatInt(state.Workflow.Started, 10) - +// NoOpTracer provides a tracer that does nothing.
+var NoOpTracer = TraceFunc(func(state *state.State) error { return nil }) From 3e944d40e0cb1662e1becfb223ca4eead6df860a Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Thu, 23 Apr 2026 21:01:16 +0200 Subject: [PATCH 10/10] fix import --- pipeline/tracing/tracer.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pipeline/tracing/tracer.go b/pipeline/tracing/tracer.go index 083a808a2b3..1f1233af7ee 100644 --- a/pipeline/tracing/tracer.go +++ b/pipeline/tracing/tracer.go @@ -15,8 +15,6 @@ package tracing import ( - "strconv" - "go.woodpecker-ci.org/woodpecker/v3/pipeline/state" )