diff --git a/pipeline/backend/docker/docker.go b/pipeline/backend/docker/docker.go index 8a48fdd72e6..057284c2649 100644 --- a/pipeline/backend/docker/docker.go +++ b/pipeline/backend/docker/docker.go @@ -16,6 +16,8 @@ package docker import ( "context" + "errors" + "fmt" "io" "net/http" "os" @@ -35,12 +37,15 @@ import ( "github.com/moby/term" "github.com/rs/zerolog/log" "github.com/urfave/cli/v3" + "golang.org/x/sync/errgroup" backend "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types" "go.woodpecker-ci.org/woodpecker/v3/shared/httputil" "go.woodpecker-ci.org/woodpecker/v3/shared/utils" ) +var containerKillTimeout = 5 // seconds + type docker struct { client client.APIClient info system.Info @@ -304,13 +309,24 @@ func (e *docker) DestroyStep(ctx context.Context, step *backend.Step, taskUUID s log.Trace().Str("taskUUID", taskUUID).Msgf("stop step %s", step.Name) containerName := toContainerName(step) + var stopErr error + + // we first signal to the container to stop ... + if err := e.client.ContainerStop(ctx, containerName, container.StopOptions{ + Timeout: &containerKillTimeout, + }); err != nil && !isErrContainerNotFoundOrNotRunning(err) { + // we do not return error yet as we try to kill it first + stopErr = fmt.Errorf("could not stop container '%s': %w", step.Name, err) + } + // ... 
and if stop does not work just force kill it if err := e.client.ContainerKill(ctx, containerName, "9"); err != nil && !isErrContainerNotFoundOrNotRunning(err) { - return err + return errors.Join(stopErr, fmt.Errorf("could not kill container '%s': %w", step.Name, err)) } + // now we clean up files left if err := e.client.ContainerRemove(ctx, containerName, removeOpts); err != nil && !isErrContainerNotFoundOrNotRunning(err) { - return err + return fmt.Errorf("could not remove container '%s': %w", step.Name, err) } return nil @@ -319,17 +335,20 @@ func (e *docker) DestroyStep(ctx context.Context, step *backend.Step, taskUUID s func (e *docker) DestroyWorkflow(ctx context.Context, conf *backend.Config, taskUUID string) error { log.Trace().Str("taskUUID", taskUUID).Msgf("delete workflow environment") + errWG := errgroup.Group{} + for _, stage := range conf.Stages { for _, step := range stage.Steps { - containerName := toContainerName(step) - if err := e.client.ContainerKill(ctx, containerName, "9"); err != nil && !isErrContainerNotFoundOrNotRunning(err) { - log.Error().Err(err).Msgf("could not kill container '%s'", step.Name) - } - if err := e.client.ContainerRemove(ctx, containerName, removeOpts); err != nil && !isErrContainerNotFoundOrNotRunning(err) { - log.Error().Err(err).Msgf("could not remove container '%s'", step.Name) - } + errWG.Go(func() error { + return e.DestroyStep(ctx, step, taskUUID) + }) } } + + if err := errWG.Wait(); err != nil { + log.Error().Err(err).Msgf("could not destroy all containers") + } + if err := e.client.VolumeRemove(ctx, conf.Volume, true); err != nil { log.Error().Err(err).Msgf("could not remove volume '%s'", conf.Volume) } @@ -349,8 +368,13 @@ func isErrContainerNotFoundOrNotRunning(err error) bool { // Error response from daemon: Cannot kill container: ...: No such container: ... // Error response from daemon: Cannot kill container: ...: Container ... 
is not running" // Error response from podman daemon: can only kill running containers. ... is in state exited + // Error response from daemon: removal of container ... is already in progress // Error: No such container: ... - return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers")) + return err != nil && + (strings.Contains(err.Error(), "No such container") || + strings.Contains(err.Error(), "is not running") || + strings.Contains(err.Error(), "can only kill running containers") || + (strings.Contains(err.Error(), "removal of container") && strings.Contains(err.Error(), "is already in progress"))) } // normalizeArchType converts the arch type reported by docker info into diff --git a/pipeline/backend/dummy/dummy.go b/pipeline/backend/dummy/dummy.go index 650b63dcb18..69eb8d5aa59 100644 --- a/pipeline/backend/dummy/dummy.go +++ b/pipeline/backend/dummy/dummy.go @@ -200,7 +200,7 @@ func (e *dummy) DestroyStep(_ context.Context, step *backend.Step, taskUUID stri _, exist := e.kv.Load("task_" + taskUUID) if !exist { - return fmt.Errorf("expect env of workflow %s to exist but found none to destroy", taskUUID) + return nil } // check state diff --git a/pipeline/backend/dummy/dummy_test.go b/pipeline/backend/dummy/dummy_test.go index c0d78d656a9..90a3555b3c0 100644 --- a/pipeline/backend/dummy/dummy_test.go +++ b/pipeline/backend/dummy/dummy_test.go @@ -47,9 +47,6 @@ func TestSmalPipelineDummyRun(t *testing.T) { _, err = dummyEngine.WaitStep(ctx, step, nonExistWorkflowID) assert.Error(t, err) - - err = dummyEngine.DestroyStep(ctx, step, nonExistWorkflowID) - assert.Error(t, err) }) t.Run("step exec successfully", func(t *testing.T) { diff --git a/pipeline/backend/local/local.go b/pipeline/backend/local/local.go index 2529698199f..f6c13a2ef11 100644 --- a/pipeline/backend/local/local.go +++ b/pipeline/backend/local/local.go @@ -231,6 
+231,9 @@ func (e *local) TailStep(_ context.Context, step *types.Step, taskUUID string) ( func (e *local) DestroyStep(_ context.Context, step *types.Step, taskUUID string) error { state, err := e.getStepState(taskUUID, step.UUID) if err != nil { + if errors.Is(err, ErrStepStateNotFound) { + return nil + } return err } diff --git a/pipeline/backend/types/backend.go b/pipeline/backend/types/backend.go index 229174c1dc9..0877c422a12 100644 --- a/pipeline/backend/types/backend.go +++ b/pipeline/backend/types/backend.go @@ -142,6 +142,7 @@ type Backend interface { // - Clean up step-specific resources (containers, processes) // - Close any open log streams // - Not affect other steps in the same or other workflows + // - Must not fail if already invoked once // // Must be safe to call even if StartStep failed or the step was never started. // This function must be thread-safe for concurrent calls. diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 93e8f9467a4..60a733964ff 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -233,23 +233,43 @@ func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend.Step) <-ch Str("step", step.Name). Msg("executing") - processState, err := r.exec(runnerCtx, step) + // setup exec func in a way it can be detached if needed + // wg will signal once the step setup has finished or failed + execAndTrace := func(wg *sync.WaitGroup) error { + processState, err := r.exec(runnerCtx, step, wg) - logger.Debug(). - Str("step", step.Name). - Msg("complete") + logger.Debug(). + Str("step", step.Name). + Msg("complete") - // normalize context cancel error - if errors.Is(err, context.Canceled) { - err = ErrCancel + // normalize context cancel error + if errors.Is(err, context.Canceled) { + err = ErrCancel + } + + // Return the error after tracing it. + err = r.traceStep(processState, err, step) + if err != nil && step.Failure == metadata.FailureIgnore { + return nil + } + return err } - // Return the error after tracing it. 
- err = r.traceStep(processState, err, step) - if err != nil && step.Failure == metadata.FailureIgnore { - return nil + // we report all errors till setup happened + // afterwards they just get dropped + if step.Detached { + var wg sync.WaitGroup + wg.Add(1) + var setupErr error + go func() { + setupErr = execAndTrace(&wg) + }() + wg.Wait() + return setupErr } - return err + + // run blocking + return execAndTrace(nil) }) } @@ -262,7 +282,13 @@ func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend.Step) <-ch } // Executes the step and returns the state and error. -func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step) (*backend.State, error) { +func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step, setupWg *sync.WaitGroup) (*backend.State, error) { + defer func() { + if setupWg != nil { + setupWg.Done() + } + }() + if err := r.engine.StartStep(r.ctx, step, r.taskUUID); err != nil { //nolint:contextcheck return nil, err } @@ -287,9 +313,11 @@ func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step) (*backend. }() } - // nothing else to do, this is a detached process. - if step.Detached { - return nil, nil + // nothing else to block for detached process. + if setupWg != nil { + setupWg.Done() + // set to nil so the setupWg.Done in defer does not call it a second time + setupWg = nil } // We wait until all data was logged. (Needed for some backends like local as WaitStep kills the log stream)