Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions agent/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package agent

import (
"io"
"sync"

"github.com/rs/zerolog"

Expand All @@ -28,17 +27,14 @@ import (
"go.woodpecker-ci.org/woodpecker/v3/rpc"
)

func (r *Runner) createLogger(_logger zerolog.Logger, uploads *sync.WaitGroup, workflow *rpc.Workflow) logging.Logger {
func (r *Runner) createLogger(_logger zerolog.Logger, workflow *rpc.Workflow) logging.Logger {
return func(step *backend_types.Step, rc io.ReadCloser) error {
defer rc.Close()

logger := _logger.With().
Str("image", step.Image).
Logger()

uploads.Add(1)
defer uploads.Done()

var secrets []string
for _, secret := range workflow.Config.Secrets {
secrets = append(secrets, secret.Value)
Expand Down
20 changes: 11 additions & 9 deletions agent/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"runtime"
"time"

"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -159,16 +159,23 @@ func (r *Runner) Run(runnerCtx context.Context) error {
return err
}

var uploads sync.WaitGroup
// Enrich workflow env with agent info
// TODO: find better way to track this state
Comment thread
6543 marked this conversation as resolved.
for _, stage := range workflow.Config.Stages {
for _, step := range stage.Steps {
step.Environment["CI_MACHINE"] = r.hostname
step.Environment["CI_SYSTEM_PLATFORM"] = runtime.GOOS + "/" + runtime.GOARCH
}
}

// Run pipeline
err = pipeline_runtime.New(
workflow.Config,
r.backend,
pipeline_runtime.WithContext(workflowCtx),
pipeline_runtime.WithTaskUUID(fmt.Sprint(workflow.ID)),
pipeline_runtime.WithLogger(r.createLogger(logger, &uploads, workflow)),
pipeline_runtime.WithTracer(r.createTracer(ctxMeta, &uploads, logger, workflow)),
pipeline_runtime.WithLogger(r.createLogger(logger, workflow)),
pipeline_runtime.WithTracer(r.createTracer(ctxMeta, logger, workflow)),
pipeline_runtime.WithDescription(map[string]string{
"workflow_id": workflow.ID,
"repo": repoName,
Expand All @@ -192,11 +199,6 @@ func (r *Runner) Run(runnerCtx context.Context) error {
Bool("canceled", state.Canceled).
Msg("workflow finished")

// Ensure all logs/traces are uploaded before finishing
logger.Debug().Msg("waiting for logs and traces upload")
uploads.Wait()
logger.Debug().Msg("logs and traces uploaded")

// Update workflow state
doneCtx := runnerCtx //nolint:contextcheck
if doneCtx.Err() != nil {
Expand Down
38 changes: 4 additions & 34 deletions agent/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ package agent
import (
"context"
"errors"
"runtime"
"strconv"
"sync"
"time"

"github.com/rs/zerolog"
Expand All @@ -30,11 +27,8 @@ import (
"go.woodpecker-ci.org/woodpecker/v3/rpc"
)

func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup, logger zerolog.Logger, workflow *rpc.Workflow) tracing.TraceFunc {
func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, workflow *rpc.Workflow) tracing.TraceFunc {
return func(state *state.State) error {
uploads.Add(1)
defer uploads.Done()

stepLogger := logger.With().
Str("image", state.CurrStep.Image).
Str("workflow_id", workflow.ID).
Expand All @@ -58,33 +52,9 @@ func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup,
stepState.Finished = time.Now().Unix()
}

defer func() {
stepLogger.Debug().Msg("update step status")

if err := r.client.Update(ctxMeta, workflow.ID, stepState); err != nil {
stepLogger.Debug().
Err(err).
Msg("update step status error")
}

stepLogger.Debug().Msg("update step status complete")
}()
if state.CurrStepState.Exited {
return nil
}
if state.CurrStep.Environment == nil {
state.CurrStep.Environment = map[string]string{}
}

// TODO: find better way to update this state and move it to pipeline to have the same env in cli-exec
state.CurrStep.Environment["CI_MACHINE"] = r.hostname

state.CurrStep.Environment["CI_PIPELINE_STARTED"] = strconv.FormatInt(state.Workflow.Started, 10)

state.CurrStep.Environment["CI_STEP_STARTED"] = strconv.FormatInt(state.Workflow.Started, 10)

state.CurrStep.Environment["CI_SYSTEM_PLATFORM"] = runtime.GOOS + "/" + runtime.GOARCH
stepLogger.Debug().Msg("update step status")
defer stepLogger.Debug().Msg("update step status complete")

return nil
return r.client.Update(ctxMeta, workflow.ID, stepState)
}
}
2 changes: 1 addition & 1 deletion cli/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func execWithAxis(ctx context.Context, c *cli.Command, file, repoPath string, ax

return pipeline_runtime.New(compiled, backendEngine,
pipeline_runtime.WithContext(pipelineCtx), //nolint:contextcheck
pipeline_runtime.WithTracer(tracing.DefaultTracer),
pipeline_runtime.WithTracer(tracing.NoOpTracer),
pipeline_runtime.WithLogger(defaultLogger),
pipeline_runtime.WithDescription(map[string]string{
"CLI": "exec",
Expand Down
10 changes: 10 additions & 0 deletions pipeline/runtime/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,13 @@ func getTracerStates(tracer *tracer_mocks.MockTracer) []state.State {
}
return states
}

// indexOfTrace returns the first index where predicate matches, or -1.
func indexOfTrace(traces []state.State, match func(s state.State) bool) int {
for i := range traces {
if match(traces[i]) {
return i
}
}
return -1
}
2 changes: 2 additions & 0 deletions pipeline/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type Runtime struct {
tracerLock sync.Mutex
logger logging.Logger

uploadWait sync.WaitGroup

taskUUID string
description map[string]string
}
Expand Down
198 changes: 171 additions & 27 deletions pipeline/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,35 +193,77 @@ func TestWorkflowWithServiceStep(t *testing.T) {
WithLogger(newTestLogger(t)),
)

assert.NoError(t, r.Run(t.Context()))
require.NoError(t, r.Run(t.Context()))
traces := getTracerStates(tracer)
if assert.Len(t, traces, 5) {
assert.EqualValues(t, backend_types.State{}, traces[0].CurrStepState)
assert.Greater(t, traces[2].CurrStepState.Started, int64(0))
assert.EqualValues(t, backend_types.State{Started: traces[2].CurrStepState.Started, Exited: true}, traces[2].CurrStepState)
assert.EqualValues(t, backend_types.State{}, traces[3].CurrStepState)
assert.Greater(t, traces[4].CurrStepState.Started, int64(0))
assert.EqualValues(t, backend_types.State{Started: traces[4].CurrStepState.Started, Exited: true}, traces[4].CurrStepState)

assert.Greater(t, traces[4].Workflow.Started, int64(0))
assert.EqualValues(t, state.State{
Workflow: state.Workflow{
Started: traces[4].Workflow.Started,
},
CurrStep: &backend_types.Step{
Name: "test",
UUID: "test-uuid",
Type: "commands",
OnSuccess: true,
Environment: map[string]string{},
Commands: []string{"echo test"},
},
CurrStepState: backend_types.State{
Started: traces[4].CurrStepState.Started,
Exited: true,
},
}, traces[4])

// Each step should emit exactly one "started" and one "exited" trace:
// db (service/detached), build, test — 3 * 2 = 6 traces total.
require.Len(t, traces, 6)

// Per-step invariants: started trace is the zero state, exited trace is
// Exited=true with a monotonic Started timestamp.
for _, name := range []string{"db", "build", "test"} {
started := findFirstTraceByName(traces, name)
require.NotNil(t, started, "%s should have a started trace", name)
assert.EqualValues(t, backend_types.State{}, started.CurrStepState,
"%s started trace should be zero-valued", name)

last := findLastTraceByName(traces, name)
require.NotNil(t, last, "%s should have an exited trace", name)
assert.True(t, last.CurrStepState.Exited, "%s should be exited", name)
assert.Equal(t, 0, last.CurrStepState.ExitCode, "%s should exit 0", name)
assert.Greater(t, last.CurrStepState.Started, int64(0),
"%s should have a non-zero Started timestamp", name)
}

// Per-step ordering: started trace precedes exited trace for the same step.
for _, name := range []string{"db", "build", "test"} {
startedIdx := indexOfTrace(traces, func(s state.State) bool {
return s.CurrStep != nil && s.CurrStep.Name == name && !s.CurrStepState.Exited
})
exitedIdx := indexOfTrace(traces, func(s state.State) bool {
return s.CurrStep != nil && s.CurrStep.Name == name && s.CurrStepState.Exited
})
assert.Less(t, startedIdx, exitedIdx, "%s started must precede %s exited", name, name)
}

// The contract of a service/detached step in stage 1: its exit trace arrives
// AFTER stage 2's steps have already been traced. That's the whole point of
// detaching — it must not block the next stage.
dbExitIdx := indexOfTrace(traces, func(s state.State) bool {
return s.CurrStep != nil && s.CurrStep.Name == "db" && s.CurrStepState.Exited
})
testExitIdx := indexOfTrace(traces, func(s state.State) bool {
return s.CurrStep != nil && s.CurrStep.Name == "test" && s.CurrStepState.Exited
})
assert.Greater(t, dbExitIdx, testExitIdx,
"db (service) must complete after test (next stage) — otherwise it wasn't really detached")

// Runtime-injected env vars should be present on the test step's exit trace.
testExit := findLastTraceByName(traces, "test")
require.NotNil(t, testExit)
assert.NotEmpty(t, testExit.CurrStep.Environment["CI_PIPELINE_STARTED"])
assert.NotEmpty(t, testExit.CurrStep.Environment["CI_STEP_STARTED"])
assert.Greater(t, testExit.Workflow.Started, int64(0))

// Strip runtime-injected env for a structural comparison of the step itself.
delete(testExit.CurrStep.Environment, "CI_PIPELINE_STARTED")
delete(testExit.CurrStep.Environment, "CI_STEP_STARTED")
assert.EqualValues(t, state.State{
Workflow: state.Workflow{Started: testExit.Workflow.Started},
CurrStep: &backend_types.Step{
Name: "test",
UUID: "test-uuid",
Type: "commands",
OnSuccess: true,
Environment: map[string]string{},
Commands: []string{"echo test"},
},
CurrStepState: backend_types.State{
Started: testExit.CurrStepState.Started,
Exited: true,
},
}, *testExit)
}

func TestWorkflowDetachedStepDoesNotBlockWorkflow(t *testing.T) {
Expand Down Expand Up @@ -409,6 +451,9 @@ func TestWorkflowPluginStep(t *testing.T) {

lastPluginTrace := findLastTraceByName(getTracerStates(tracer), "publish")
if assert.NotNil(t, lastPluginTrace) {
delete(lastPluginTrace.CurrStep.Environment, "CI_PIPELINE_STARTED")
delete(lastPluginTrace.CurrStep.Environment, "CI_STEP_STARTED")

assert.EqualValues(t, map[string]string{
"DRONE_BUILD_STATUS": "success",
"DRONE_REPO_SCM": "git",
Expand Down Expand Up @@ -1223,3 +1268,102 @@ func TestWorkflowCancelDuringStepSleep(t *testing.T) {
assert.Nil(t, findFirstTraceByName(getTracerStates(tracer), "never-reached"),
"never-reached must not have been traced")
}

// TestWorkflowFailingServiceDoesNotFailWorkflow pins down the intentional design:
// a service/detached step that fails in the background has its failure logged
// and traced, but it must NOT propagate to the workflow error. Subsequent
// stages must still run, and Run() must return nil.
//
// This is the explicit contract in runDetachedStep:
// "Any error that occurs after setup is logged but not propagated — it cannot
//
// influence the pipeline outcome at that point."
func TestWorkflowFailingServiceDoesNotFailWorkflow(t *testing.T) {
t.Parallel()
tracer := newTestTracer(t)
r := New(
&backend_types.Config{
Stages: []*backend_types.Stage{
{Steps: []*backend_types.Step{
// Service runs ~100ms (from withService), then exits non-zero.
cmdStep("db", withService(), withExitCode(1)),
cmdStep("build"),
}},
{Steps: []*backend_types.Step{cmdStep("deploy")}},
},
},
dummy.New(),
WithTracer(tracer),
WithLogger(newTestLogger(t)),
)

// Contract 1: workflow succeeds even though the service failed.
assert.NoError(t, r.Run(t.Context()),
"service failure must not fail the workflow (detached errors are not propagated)")

traces := getTracerStates(tracer)

// Contract 2: the service's failure IS visible in traces. This is the
// observability guarantee — the failure is logged and recorded even though
// it doesn't kill the workflow.
dbExit := findLastTraceByName(traces, "db")
require.NotNil(t, dbExit, "db must have an exit trace")
assert.True(t, dbExit.CurrStepState.Exited, "db should be marked exited")
assert.Equal(t, 1, dbExit.CurrStepState.ExitCode, "db exit code must be preserved in trace")

// Contract 3: deploy must run normally — NOT skipped — because the service
// failure didn't set r.err.
deployExit := findLastTraceByName(traces, "deploy")
require.NotNil(t, deployExit, "deploy must be traced")
assert.False(t, deployExit.CurrStepState.Skipped, "deploy must run when only a service failed")
assert.True(t, deployExit.CurrStepState.Exited, "deploy should complete normally")
assert.Equal(t, 0, deployExit.CurrStepState.ExitCode)

// Contract 4: uploadWait at the end of Run() guarantees the detached trace
// has been emitted BEFORE Run() returns. This is non-timing-dependent:
// if Run() returned, the exit trace for every detached step must exist.
// This is what the uploadWait plumbing in this PR is actually for.
assert.NotNil(t, findLastTraceByName(traces, "db"),
"detached step exit trace must be emitted before Run() returns (uploadWait contract)")
}

// TestWorkflowFailingDetachedStepDoesNotFailWorkflow is the non-service
// counterpart: Detached=true, Type=commands (a background worker). Same
// contract — failures don't propagate.
func TestWorkflowFailingDetachedStepDoesNotFailWorkflow(t *testing.T) {
t.Parallel()
tracer := newTestTracer(t)
r := New(
&backend_types.Config{
Stages: []*backend_types.Stage{
{Steps: []*backend_types.Step{
// Detached (non-service) worker, ~100ms (from withDetached), exits code 2.
cmdStep("background-worker", withDetached(), withExitCode(2)),
cmdStep("main-build"),
}},
{Steps: []*backend_types.Step{cmdStep("deploy")}},
},
},
dummy.New(),
WithTracer(tracer),
WithLogger(newTestLogger(t)),
)

assert.NoError(t, r.Run(t.Context()),
"detached worker failure must not fail the workflow")

traces := getTracerStates(tracer)

workerExit := findLastTraceByName(traces, "background-worker")
require.NotNil(t, workerExit, "background-worker must have an exit trace")
assert.True(t, workerExit.CurrStepState.Exited)
assert.Equal(t, 2, workerExit.CurrStepState.ExitCode,
"exit code from detached step must be preserved in trace")

deployExit := findLastTraceByName(traces, "deploy")
require.NotNil(t, deployExit, "deploy must be traced")
assert.False(t, deployExit.CurrStepState.Skipped,
"deploy must run when only a detached worker failed")
assert.True(t, deployExit.CurrStepState.Exited)
assert.Equal(t, 0, deployExit.CurrStepState.ExitCode)
}
Loading