Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pipeline/runtime/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ func WithTracer(tracer tracing.Tracer) Option {
}
}

// WithContext sets the workflow execution context.
//
// The provided context is wrapped in a cancelable child so that Run() can
// always cancel it at the end of the stage loop to unblock any still-running
// detached steps / services.
//
// If the runtime already holds a cancel function (the default one installed
// by New, or one from an earlier WithContext option), it is invoked before
// being replaced so the superseded child context is released instead of
// staying registered with its parent.
func WithContext(ctx context.Context) Option {
	return func(r *Runtime) {
		// Release the context we are about to discard; nothing can be
		// running on it yet because options are applied during construction.
		if r.cancelCtx != nil {
			r.cancelCtx()
		}
		r.ctx, r.cancelCtx = context.WithCancel(ctx)
	}
}

Expand Down
13 changes: 12 additions & 1 deletion pipeline/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package runtime

import (
"context"
"sync"

"github.com/oklog/ulid/v2"
"github.com/rs/zerolog"
Expand All @@ -42,6 +43,16 @@ type Runtime struct {
// Cleanup operations should use the runnerCtx passed to Run().
ctx context.Context

// cancelCtx cancels ctx. Called by Run() after the stage loop completes
// so that any still-running detached steps / services are told to stop,
// unblocking detachedWg.Wait().
cancelCtx context.CancelFunc

// detachedWg tracks all background goroutines launched by runDetachedStep.
// Run cancels ctx first, then waits here, so every goroutine unblocks
// promptly and r.err is fully populated before Run returns.
detachedWg sync.WaitGroup

tracer tracing.Tracer
logger logging.Logger

Expand All @@ -56,7 +67,7 @@ func New(spec *backend_types.Config, backend backend_types.Backend, opts ...Opti
r.description = map[string]string{}
r.spec = spec
r.engine = backend
r.ctx = context.Background()
r.ctx, r.cancelCtx = context.WithCancel(context.Background())
r.taskUUID = ulid.Make().String()
for _, opt := range opts {
opt(r)
Expand Down
107 changes: 98 additions & 9 deletions pipeline/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -97,6 +98,12 @@ func withOOM() func(*backend_types.Step) {
}
}

// withSleep returns a step modifier that records the duration t, rendered
// with Duration.String, in the step's environment under
// dummy.EnvKeyStepSleep.
func withSleep(t time.Duration) func(*backend_types.Step) {
	sleepValue := t.String()

	return func(step *backend_types.Step) {
		step.Environment[dummy.EnvKeyStepSleep] = sleepValue
	}
}

func withStartFail() func(*backend_types.Step) {
return func(s *backend_types.Step) {
s.Environment[dummy.EnvKeyStepStartFail] = "true"
Expand Down Expand Up @@ -194,32 +201,35 @@ func TestWorkflowWithServiceStep(t *testing.T) {

assert.NoError(t, r.Run(t.Context()))
traces := getTracerStates(tracer)
if assert.Len(t, traces, 5) {
if assert.Len(t, traces, 6) {
assert.EqualValues(t, backend_types.State{}, traces[0].CurrStepState)
assert.Greater(t, traces[2].CurrStepState.Started, int64(0))
assert.EqualValues(t, backend_types.State{Started: traces[2].CurrStepState.Started, Exited: true}, traces[2].CurrStepState)
assert.EqualValues(t, backend_types.State{}, traces[3].CurrStepState)
assert.Greater(t, traces[4].CurrStepState.Started, int64(0))
assert.EqualValues(t, backend_types.State{Started: traces[4].CurrStepState.Started, Exited: true}, traces[4].CurrStepState)
assert.Greater(t, calls[5].CurrStepState.Started, int64(0))
assert.EqualValues(t, backend.State{Started: calls[5].CurrStepState.Started, Exited: true}, calls[5].CurrStepState)

assert.Greater(t, traces[4].Workflow.Started, int64(0))
assert.Greater(t, traces[5].Workflow.Started, int64(0))
assert.EqualValues(t, state.State{
Workflow: state.Workflow{
Started: traces[4].Workflow.Started,
},
CurrStep: &backend_types.Step{
Name: "test",
UUID: "test-uuid",
Type: "commands",
Name: "db",
UUID: "db-uuid",
Type: "service",
Detached: true,
OnSuccess: true,
Environment: map[string]string{},
Commands: []string{"echo test"},
Environment: map[string]string{"SLEEP": "100ms"},
Commands: []string{"echo db"},
},
CurrStepState: backend_types.State{
Started: traces[4].CurrStepState.Started,
Started: traces[5].CurrStepState.Started,
Exited: true,
},
}, traces[4])
}, traces[5])
}
}

Expand Down Expand Up @@ -668,3 +678,82 @@ func TestWorkflowEmptyStages(t *testing.T) {
assert.NoError(t, err)
assert.Empty(t, getTracerStates(tracer))
}

// TestWorkflowFailingServiceMarksWorkflowFailed runs a two-stage workflow
// whose first stage contains a service step configured to exit with code 1
// next to a regular step that sleeps for 150ms. It expects Run to return an
// ExitError carrying code 1 and the second-stage "deploy" step to be traced
// as skipped.
func TestWorkflowFailingServiceMarksWorkflowFailed(t *testing.T) {
	t.Parallel()

	tracer := newTestTracer(t)

	// Stage one: "db" is a service (withService sets Detached=true) and the
	// exit-code env var makes the dummy backend return code 1 when the step
	// finishes. "build" is the blocking step that keeps the stage busy.
	serviceStage := &backend_types.Stage{Steps: []*backend_types.Step{
		cmdStep("db", withService(), withExitCode(1)),
		cmdStep("build", withSleep(150*time.Millisecond)),
	}}
	// Stage two: only reached after stage one; expected to be skipped.
	deployStage := &backend_types.Stage{Steps: []*backend_types.Step{cmdStep("deploy")}}

	r := New(
		&backend_types.Config{
			Stages: []*backend_types.Stage{serviceStage, deployStage},
		},
		dummy.New(),
		WithTracer(tracer),
		WithLogger(newTestLogger(t)),
	)

	err := r.Run(t.Context())

	// Nothing below is meaningful without an error, so stop right here.
	require.Error(t, err, "workflow should fail when a service step exits non-zero")

	var exitErr *pipeline_errors.ExitError
	if matched := assert.ErrorAs(t, err, &exitErr, "error should be an ExitError when service step fails"); matched {
		assert.Equal(t, 1, exitErr.Code)
	}

	// The deploy step must show up in the traces as skipped rather than be
	// silently dropped.
	states := getTracerStates(tracer)
	deployTrace := findTraceByName(states, "deploy")
	require.NotNil(t, deployTrace, "deploy step should still be traced")
	assert.True(t, deployTrace.CurrStepState.Skipped, "deploy should be skipped after service failure")
}

// TestWorkflowFailingDetachedStepMarksWorkflowFailed runs a two-stage
// workflow whose first stage contains a detached, non-service step
// configured to exit with code 2 next to a regular step that sleeps for
// 200ms. It expects Run to return an ExitError carrying code 2 and the
// second-stage "deploy" step to be traced as skipped.
func TestWorkflowFailingDetachedStepMarksWorkflowFailed(t *testing.T) {
	t.Parallel()

	tracer := newTestTracer(t)

	// Stage one: "background-worker" is detached via withDetached, which sets
	// Detached=true without switching the step type to StepTypeService, so it
	// models a background worker rather than a long-running service.
	// "main-build" is the blocking step running alongside the worker.
	workerStage := &backend_types.Stage{Steps: []*backend_types.Step{
		cmdStep("background-worker", withDetached(), withExitCode(2)),
		cmdStep("main-build", withSleep(200*time.Millisecond)),
	}}
	// Stage two: expected to be skipped once the detached step has failed.
	deployStage := &backend_types.Stage{Steps: []*backend_types.Step{cmdStep("deploy")}}

	r := New(
		&backend_types.Config{
			Stages: []*backend_types.Stage{workerStage, deployStage},
		},
		dummy.New(),
		WithTracer(tracer),
		WithLogger(newTestLogger(t)),
	)

	err := r.Run(t.Context())

	// Nothing below is meaningful without an error, so stop right here.
	require.Error(t, err, "workflow should fail when a detached step exits non-zero")

	var exitErr *pipeline_errors.ExitError
	if matched := assert.ErrorAs(t, err, &exitErr, "error should be an ExitError when detached step fails"); matched {
		assert.Equal(t, 2, exitErr.Code)
	}

	// The deploy step must still be traced, and marked as skipped.
	states := getTracerStates(tracer)
	deployTrace := findTraceByName(states, "deploy")
	require.NotNil(t, deployTrace, "deploy step should still be traced")
	assert.True(t, deployTrace.CurrStepState.Skipped, "deploy should be skipped after detached step failure")
}
25 changes: 20 additions & 5 deletions pipeline/runtime/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,12 @@ func (r *Runtime) runBlockingStep(runnerCtx context.Context, step *backend_types
// runDetachedStep starts the step and returns as soon as the container is running
// and log streaming is set up. The rest of the step lifecycle runs in the background.
//
// Any error that occurs after setup is logged but not propagated — it cannot
// influence the pipeline outcome at that point.
// The goroutine is tracked by r.detachedWg so that Run() can wait for it after
// canceling r.ctx. This means:
// - services/detached steps that are still running when stages finish are stopped
// promptly via context cancellation and do not block Run() indefinitely;
// - a service that exits non-zero before Run() cancels r.ctx propagates the
// failure into r.err so Run() returns the correct error.
func (r *Runtime) runDetachedStep(runnerCtx context.Context, step *backend_types.Step) error {
waitForLogs, startTime, err := r.startStep(step)
if err != nil {
Expand All @@ -205,18 +209,29 @@ func (r *Runtime) runDetachedStep(runnerCtx context.Context, step *backend_types
return r.traceStep(nil, err, step)
}

// Container is up and logging is streaming — hand off to background.
// Register the goroutine with the WaitGroup before spawning it so there
// is no window between launch and Run()'s detachedWg.Wait() where the
// goroutine could be missed.
r.detachedWg.Add(1)

go func() {
defer r.detachedWg.Done()

logger := r.makeLogger()

processState, err := r.completeStep(runnerCtx, step, waitForLogs, startTime)
logger.Debug().Str("step", step.Name).Msg("complete")

if errors.Is(err, context.Canceled) {
err = pipeline_errors.ErrCancel
// A context.Canceled / ErrCancel result here means Run() cancelled r.ctx
// to stop the service at end-of-pipeline — that is the normal teardown
// path, not a failure. Only propagate genuinely unexpected errors.
if errors.Is(err, context.Canceled) || errors.Is(err, pipeline_errors.ErrCancel) {
err = nil
}

if err != nil {
logger.Error().Err(err).Str("step", step.Name).Msg("detached step failed after while running")
r.err.Set(err)
}

if traceErr := r.traceStep(processState, err, step); traceErr != nil {
Expand Down
10 changes: 10 additions & 0 deletions pipeline/runtime/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ func (r *Runtime) Run(runnerCtx context.Context) error {
}
}

// Cancel r.ctx to tell any still-running detached steps / services to stop.
// Without this, detachedWg.Wait() would block indefinitely for services that
// are designed to outlive the pipeline stages.
r.cancelCtx()

// Wait for every detached goroutine to finish. Each one will call r.err.Set
// if it exits with an error, so waiting here ensures the final return value
// reflects all outcomes — including a service that fails after the last stage.
r.detachedWg.Wait()

return r.err.Get()
}

Expand Down