From 083c0a1b4ae8db55cb22180aea4750ca89413e05 Mon Sep 17 00:00:00 2001 From: "m.huber" Date: Wed, 28 Jan 2026 23:46:18 +0100 Subject: [PATCH 1/9] first itteration: detached steps / services report back --- pipeline/pipeline.go | 60 ++++++++++++++++++++++++++++++++------------ 1 file changed, 44 insertions(+), 16 deletions(-) diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 93e8f9467a4..60a733964ff 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -233,23 +233,43 @@ func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend.Step) <-ch Str("step", step.Name). Msg("executing") - processState, err := r.exec(runnerCtx, step) + // setup exec func in a way it can be detached if needed + // wg will signal once + execAndTrace := func(wg *sync.WaitGroup) error { + processState, err := r.exec(runnerCtx, step, wg) - logger.Debug(). - Str("step", step.Name). - Msg("complete") + logger.Debug(). + Str("step", step.Name). + Msg("complete") - // normalize context cancel error - if errors.Is(err, context.Canceled) { - err = ErrCancel + // normalize context cancel error + if errors.Is(err, context.Canceled) { + err = ErrCancel + } + + // Return the error after tracing it. + err = r.traceStep(processState, err, step) + if err != nil && step.Failure == metadata.FailureIgnore { + return nil + } + return err } - // Return the error after tracing it. - err = r.traceStep(processState, err, step) - if err != nil && step.Failure == metadata.FailureIgnore { - return nil + // we report all errors till setup happened + // afterwards they just ged dropped + if step.Detached { + var wg sync.WaitGroup + wg.Add(1) + var setupErr error + go func() { + setupErr = execAndTrace(&wg) + }() + wg.Wait() + return setupErr } - return err + + // run blocking + return execAndTrace(nil) }) } @@ -262,7 +282,13 @@ func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend.Step) <-ch } // Executes the step and returns the state and error. -func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step) (*backend.State, error) { +func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step, setupWg *sync.WaitGroup) (*backend.State, error) { + defer func() { + if setupWg != nil { + setupWg.Done() + } + }() + if err := r.engine.StartStep(r.ctx, step, r.taskUUID); err != nil { //nolint:contextcheck return nil, err } @@ -287,9 +313,11 @@ func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step) (*backend. }() } - // nothing else to do, this is a detached process. - if step.Detached { - return nil, nil + // nothing else to block for detached process. + if setupWg != nil { + setupWg.Done() + // set to nil so the setupWg.Done in defer does not call it a second time + setupWg = nil } // We wait until all data was logged. (Needed for some backends like local as WaitStep kills the log stream) From 3df799a514f60f42237d7c1c886df5f8fe4ae13f Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Fri, 30 Jan 2026 16:28:13 +0100 Subject: [PATCH 2/9] backend docker: first softKill containers still running --- pipeline/backend/docker/docker.go | 16 +++++++++++++++- pipeline/backend/dummy/dummy.go | 2 +- pipeline/backend/local/local.go | 3 +++ pipeline/backend/types/backend.go | 1 + 4 files changed, 20 insertions(+), 2 deletions(-) diff --git a/pipeline/backend/docker/docker.go b/pipeline/backend/docker/docker.go index 8a48fdd72e6..2704e863632 100644 --- a/pipeline/backend/docker/docker.go +++ b/pipeline/backend/docker/docker.go @@ -21,6 +21,7 @@ import ( "os" "path/filepath" "strings" + "time" "github.com/containerd/errdefs" "github.com/docker/docker/api/types/container" @@ -41,6 +42,8 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/shared/utils" ) +var containerKillTimeout = time.Second + type docker struct { client client.APIClient info system.Info @@ -305,6 +308,12 @@ func (e *docker) DestroyStep(ctx context.Context, step *backend.Step, taskUUID s containerName := toContainerName(step) + softKillCtx, _ := context.WithTimeout(ctx, containerKillTimeout) //nolint:govet + if err := e.client.ContainerKill(softKillCtx, containerName, ""); err != nil && !isErrContainerNotFoundOrNotRunning(err) { + return err + } + + // if soft kill did now work just force kill it if err := e.client.ContainerKill(ctx, containerName, "9"); err != nil && !isErrContainerNotFoundOrNotRunning(err) { return err } @@ -349,8 +358,13 @@ func isErrContainerNotFoundOrNotRunning(err error) bool { // Error response from daemon: Cannot kill container: ...: No such container: ... // Error response from daemon: Cannot kill container: ...: Container ... is not running" // Error response from podman daemon: can only kill running containers. ... is in state exited + // Error response from daemon: removal of container ... is already in progress // Error: No such container: ... - return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers")) + return err != nil && + (strings.Contains(err.Error(), "No such container") || + strings.Contains(err.Error(), "is not running") || + strings.Contains(err.Error(), "can only kill running containers") || + (strings.Contains(err.Error(), "removal of container") && strings.Contains(err.Error(), "is already in progress"))) } // normalizeArchType converts the arch type reported by docker info into diff --git a/pipeline/backend/dummy/dummy.go b/pipeline/backend/dummy/dummy.go index 650b63dcb18..69eb8d5aa59 100644 --- a/pipeline/backend/dummy/dummy.go +++ b/pipeline/backend/dummy/dummy.go @@ -200,7 +200,7 @@ func (e *dummy) DestroyStep(_ context.Context, step *backend.Step, taskUUID stri _, exist := e.kv.Load("task_" + taskUUID) if !exist { - return fmt.Errorf("expect env of workflow %s to exist but found none to destroy", taskUUID) + return nil } // check state diff --git a/pipeline/backend/local/local.go b/pipeline/backend/local/local.go index 2529698199f..f6c13a2ef11 100644 --- a/pipeline/backend/local/local.go +++ b/pipeline/backend/local/local.go @@ -231,6 +231,9 @@ func (e *local) TailStep(_ context.Context, step *types.Step, taskUUID string) ( func (e *local) DestroyStep(_ context.Context, step *types.Step, taskUUID string) error { state, err := e.getStepState(taskUUID, step.UUID) if err != nil { + if errors.Is(err, ErrStepStateNotFound) { + return nil + } return err } diff --git a/pipeline/backend/types/backend.go b/pipeline/backend/types/backend.go index 229174c1dc9..0877c422a12 100644 --- a/pipeline/backend/types/backend.go +++ b/pipeline/backend/types/backend.go @@ -142,6 +142,7 @@ type Backend interface { // - Clean up step-specific resources (containers, processes) // - Close any open log streams // - Not affect other steps in the same or other workflows + // - Must not fail if already invoked once // // Must be safe to call even if StartStep failed or the step was never started. // This function must be thread-safe for concurrent calls. From bd851e2c1b289231649a044c442c7f83ceb8761f Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Fri, 30 Jan 2026 16:32:23 +0100 Subject: [PATCH 3/9] use stop --- pipeline/backend/docker/docker.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pipeline/backend/docker/docker.go b/pipeline/backend/docker/docker.go index 2704e863632..1b671f57c6b 100644 --- a/pipeline/backend/docker/docker.go +++ b/pipeline/backend/docker/docker.go @@ -21,7 +21,6 @@ import ( "os" "path/filepath" "strings" - "time" "github.com/containerd/errdefs" "github.com/docker/docker/api/types/container" @@ -42,7 +41,7 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/shared/utils" ) -var containerKillTimeout = time.Second +var containerKillTimeout = 1 // seconds type docker struct { client client.APIClient @@ -308,8 +307,9 @@ func (e *docker) DestroyStep(ctx context.Context, step *backend.Step, taskUUID s containerName := toContainerName(step) - softKillCtx, _ := context.WithTimeout(ctx, containerKillTimeout) //nolint:govet - if err := e.client.ContainerKill(softKillCtx, containerName, ""); err != nil && !isErrContainerNotFoundOrNotRunning(err) { + if err := e.client.ContainerStop(ctx, containerName, container.StopOptions{ + Timeout: &containerKillTimeout, + }); err != nil && !isErrContainerNotFoundOrNotRunning(err) { return err } From 8e7ae4bff2425a7e1a308ca469180dc44719fe69 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Fri, 30 Jan 2026 17:41:10 +0100 Subject: [PATCH 4/9] docker backend: refactor DestroyStep --- pipeline/backend/docker/docker.go | 34 ++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/pipeline/backend/docker/docker.go b/pipeline/backend/docker/docker.go index 1b671f57c6b..422656d6e20 100644 --- a/pipeline/backend/docker/docker.go +++ b/pipeline/backend/docker/docker.go @@ -16,6 +16,8 @@ package docker import ( "context" + "errors" + "fmt" "io" "net/http" "os" @@ -35,13 +37,14 @@ import ( "github.com/moby/term" "github.com/rs/zerolog/log" "github.com/urfave/cli/v3" + "golang.org/x/sync/errgroup" backend "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types" "go.woodpecker-ci.org/woodpecker/v3/shared/httputil" "go.woodpecker-ci.org/woodpecker/v3/shared/utils" ) -var containerKillTimeout = 1 // seconds +var containerKillTimeout = 5 // seconds type docker struct { client client.APIClient @@ -306,20 +309,24 @@ func (e *docker) DestroyStep(ctx context.Context, step *backend.Step, taskUUID s log.Trace().Str("taskUUID", taskUUID).Msgf("stop step %s", step.Name) containerName := toContainerName(step) + var stopErr error + // we first signal to the container to stop ... if err := e.client.ContainerStop(ctx, containerName, container.StopOptions{ Timeout: &containerKillTimeout, }); err != nil && !isErrContainerNotFoundOrNotRunning(err) { - return err + // we do not return error jet as we try to kill it first + stopErr = fmt.Errorf("could not stop container '%s': %w", step.Name, err) } - // if soft kill did now work just force kill it + // ... and if stop does not work just force kill it if err := e.client.ContainerKill(ctx, containerName, "9"); err != nil && !isErrContainerNotFoundOrNotRunning(err) { - return err + return errors.Join(stopErr, fmt.Errorf("could not kill container '%s': %w", step.Name, err)) } + // now we clean up files left if err := e.client.ContainerRemove(ctx, containerName, removeOpts); err != nil && !isErrContainerNotFoundOrNotRunning(err) { - return err + return fmt.Errorf("could not remove container '%s': %w", step.Name, err) } return nil @@ -328,17 +335,20 @@ func (e *docker) DestroyStep(ctx context.Context, step *backend.Step, taskUUID s func (e *docker) DestroyWorkflow(ctx context.Context, conf *backend.Config, taskUUID string) error { log.Trace().Str("taskUUID", taskUUID).Msgf("delete workflow environment") + errWG := errgroup.Group{} + for _, stage := range conf.Stages { for _, step := range stage.Steps { - containerName := toContainerName(step) - if err := e.client.ContainerKill(ctx, containerName, "9"); err != nil && !isErrContainerNotFoundOrNotRunning(err) { - log.Error().Err(err).Msgf("could not kill container '%s'", step.Name) - } - if err := e.client.ContainerRemove(ctx, containerName, removeOpts); err != nil && !isErrContainerNotFoundOrNotRunning(err) { - log.Error().Err(err).Msgf("could not remove container '%s'", step.Name) - } + errWG.Go(func() error { + return e.DestroyStep(ctx, step, taskUUID) + }) } } + + if err := errWG.Wait(); err != nil { + log.Error().Err(err).Msgf("could not destroy all containers") + } + if err := e.client.VolumeRemove(ctx, conf.Volume, true); err != nil { log.Error().Err(err).Msgf("could not remove volume '%s'", conf.Volume) } From ec28ee6c292f7291e73a9c5661a07b2a80a2de08 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Fri, 30 Jan 2026 18:23:35 +0100 Subject: [PATCH 5/9] queue: enhance error logs in agents --- server/queue/fifo.go | 3 +++ server/queue/queue.go | 30 ++++++++++++++++++++++++++++++ server/rpc/rpc.go | 5 ++++- 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/server/queue/fifo.go b/server/queue/fifo.go index 3b370186ebe..bde6ae1cc51 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -134,6 +134,9 @@ func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) e q.Lock() defer q.Unlock() + // it's an external error so we wrap it + err = NewErrExternal(err) + var errs []error // we first process the tasks itself for _, id := range ids { diff --git a/server/queue/queue.go b/server/queue/queue.go index 77fa3ed8ec8..2dcb7925946 100644 --- a/server/queue/queue.go +++ b/server/queue/queue.go @@ -41,6 +41,36 @@ var ( ErrWorkerKicked = errors.New("worker was kicked") ) +// ErrExternal wraps an external error +type ErrExternal struct { + err error +} + +func (e *ErrExternal) Error() string { + if e.err != nil { + return "external error: " + e.err.Error() + } + return "external error" +} + +// Unwrap allows errors.Is and errors.As to work with the wrapped error +func (e *ErrExternal) Unwrap() error { + return e.err +} + +// Is allows errors.Is to match against ErrExternal types +func (e *ErrExternal) Is(target error) bool { + _, ok := target.(*ErrExternal) + return ok +} + +func NewErrExternal(err error) error { + if err == nil { + return nil + } + return &ErrExternal{err: err} +} + // InfoT provides runtime information. type InfoT struct { Pending []*model.Task `json:"pending"` diff --git a/server/rpc/rpc.go b/server/rpc/rpc.go index 5ff78e275be..99b04394bfd 100644 --- a/server/rpc/rpc.go +++ b/server/rpc/rpc.go @@ -118,7 +118,10 @@ func (s *RPC) Wait(c context.Context, workflowID string) (canceled bool, err err // we explicit send a cancel signal return true, nil } - // unknown error happened + if errors.Is(err, new(queue.ErrExternal)) { + // we do not have to give back the error an agent already told us + return false, nil + } return false, err } From 26c38edaa8d79a838916f99e6cd53fae128b3188 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Fri, 30 Jan 2026 18:28:47 +0100 Subject: [PATCH 6/9] fix code comment --- server/queue/fifo.go | 2 +- server/rpc/rpc.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/queue/fifo.go b/server/queue/fifo.go index bde6ae1cc51..a25581d97e7 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -134,7 +134,7 @@ func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) e q.Lock() defer q.Unlock() - // it's an external error so we wrap it + // it's an external error so we wrap it err = NewErrExternal(err) var errs []error diff --git a/server/rpc/rpc.go b/server/rpc/rpc.go index 99b04394bfd..e1dbebf80ce 100644 --- a/server/rpc/rpc.go +++ b/server/rpc/rpc.go @@ -119,7 +119,7 @@ func (s *RPC) Wait(c context.Context, workflowID string) (canceled bool, err err return true, nil } if errors.Is(err, new(queue.ErrExternal)) { - // we do not have to give back the error an agent already told us + // we do not have to give back the same error the agent already told us return false, nil } return false, err From f50de70db336fa220c4e1bd1346cf8f702845902 Mon Sep 17 00:00:00 2001 From: "m.huber" Date: Thu, 5 Feb 2026 23:23:12 +0100 Subject: [PATCH 7/9] move into its own pull: https://github.com/woodpecker-ci/woodpecker/pull/6056 --- server/queue/fifo.go | 3 --- server/queue/queue.go | 30 ------------------------------ server/rpc/rpc.go | 5 +---- 3 files changed, 1 insertion(+), 37 deletions(-) diff --git a/server/queue/fifo.go b/server/queue/fifo.go index a25581d97e7..3b370186ebe 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -134,9 +134,6 @@ func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) e q.Lock() defer q.Unlock() - // it's an external error so we wrap it - err = NewErrExternal(err) - var errs []error // we first process the tasks itself for _, id := range ids { diff --git a/server/queue/queue.go b/server/queue/queue.go index 2dcb7925946..77fa3ed8ec8 100644 --- a/server/queue/queue.go +++ b/server/queue/queue.go @@ -41,36 +41,6 @@ var ( ErrWorkerKicked = errors.New("worker was kicked") ) -// ErrExternal wraps an external error -type ErrExternal struct { - err error -} - -func (e *ErrExternal) Error() string { - if e.err != nil { - return "external error: " + e.err.Error() - } - return "external error" -} - -// Unwrap allows errors.Is and errors.As to work with the wrapped error -func (e *ErrExternal) Unwrap() error { - return e.err -} - -// Is allows errors.Is to match against ErrExternal types -func (e *ErrExternal) Is(target error) bool { - _, ok := target.(*ErrExternal) - return ok -} - -func NewErrExternal(err error) error { - if err == nil { - return nil - } - return &ErrExternal{err: err} -} - // InfoT provides runtime information. type InfoT struct { Pending []*model.Task `json:"pending"` diff --git a/server/rpc/rpc.go b/server/rpc/rpc.go index e1dbebf80ce..5ff78e275be 100644 --- a/server/rpc/rpc.go +++ b/server/rpc/rpc.go @@ -118,10 +118,7 @@ func (s *RPC) Wait(c context.Context, workflowID string) (canceled bool, err err // we explicit send a cancel signal return true, nil } - if errors.Is(err, new(queue.ErrExternal)) { - // we do not have to give back the same error the agent already told us - return false, nil - } + // unknown error happened return false, err } From 8ece665b7cbcc783714bc88ea7a96b765db819cf Mon Sep 17 00:00:00 2001 From: "m.huber" Date: Fri, 6 Feb 2026 00:37:54 +0100 Subject: [PATCH 8/9] update test acordingly --- pipeline/backend/dummy/dummy_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pipeline/backend/dummy/dummy_test.go b/pipeline/backend/dummy/dummy_test.go index c0d78d656a9..90a3555b3c0 100644 --- a/pipeline/backend/dummy/dummy_test.go +++ b/pipeline/backend/dummy/dummy_test.go @@ -47,9 +47,6 @@ func TestSmalPipelineDummyRun(t *testing.T) { _, err = dummyEngine.WaitStep(ctx, step, nonExistWorkflowID) assert.Error(t, err) - - err = dummyEngine.DestroyStep(ctx, step, nonExistWorkflowID) - assert.Error(t, err) }) t.Run("step exec successfully", func(t *testing.T) { From 788f9be807c73a123db422785720024807d621fc Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Fri, 6 Feb 2026 23:54:57 +0100 Subject: [PATCH 9/9] a "jet" is a plane :laugh: Co-authored-by: qwerty287 <80460567+qwerty287@users.noreply.github.com> --- pipeline/backend/docker/docker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipeline/backend/docker/docker.go b/pipeline/backend/docker/docker.go index 422656d6e20..057284c2649 100644 --- a/pipeline/backend/docker/docker.go +++ b/pipeline/backend/docker/docker.go @@ -315,7 +315,7 @@ func (e *docker) DestroyStep(ctx context.Context, step *backend.Step, taskUUID s if err := e.client.ContainerStop(ctx, containerName, container.StopOptions{ Timeout: &containerKillTimeout, }); err != nil && !isErrContainerNotFoundOrNotRunning(err) { - // we do not return error jet as we try to kill it first + // we do not return error yet as we try to kill it first stopErr = fmt.Errorf("could not stop container '%s': %w", step.Name, err) }