Skip to content
Merged
44 changes: 34 additions & 10 deletions pipeline/backend/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package docker

import (
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
Expand All @@ -35,12 +37,15 @@ import (
"github.com/moby/term"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v3"
"golang.org/x/sync/errgroup"

backend "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v3/shared/httputil"
"go.woodpecker-ci.org/woodpecker/v3/shared/utils"
)

// containerKillTimeout is the stop timeout, in seconds, passed to
// ContainerStop when a step is destroyed. It is declared as a variable
// because its address is used for container.StopOptions.Timeout.
var containerKillTimeout = 5

type docker struct {
client client.APIClient
info system.Info
Expand Down Expand Up @@ -304,13 +309,24 @@ func (e *docker) DestroyStep(ctx context.Context, step *backend.Step, taskUUID s
log.Trace().Str("taskUUID", taskUUID).Msgf("stop step %s", step.Name)

containerName := toContainerName(step)
var stopErr error

// we first signal to the container to stop ...
if err := e.client.ContainerStop(ctx, containerName, container.StopOptions{
Timeout: &containerKillTimeout,
}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
// we do not return error yet as we try to kill it first
stopErr = fmt.Errorf("could not stop container '%s': %w", step.Name, err)
}

// ... and if stop does not work just force kill it
if err := e.client.ContainerKill(ctx, containerName, "9"); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
return err
return errors.Join(stopErr, fmt.Errorf("could not kill container '%s': %w", step.Name, err))
}

// now we clean up files left
if err := e.client.ContainerRemove(ctx, containerName, removeOpts); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
return err
return fmt.Errorf("could not remove container '%s': %w", step.Name, err)
}

return nil
Expand All @@ -319,17 +335,20 @@ func (e *docker) DestroyStep(ctx context.Context, step *backend.Step, taskUUID s
func (e *docker) DestroyWorkflow(ctx context.Context, conf *backend.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("delete workflow environment")

errWG := errgroup.Group{}

for _, stage := range conf.Stages {
for _, step := range stage.Steps {
containerName := toContainerName(step)
if err := e.client.ContainerKill(ctx, containerName, "9"); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
log.Error().Err(err).Msgf("could not kill container '%s'", step.Name)
}
if err := e.client.ContainerRemove(ctx, containerName, removeOpts); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
log.Error().Err(err).Msgf("could not remove container '%s'", step.Name)
}
errWG.Go(func() error {
return e.DestroyStep(ctx, step, taskUUID)
})
}
}

if err := errWG.Wait(); err != nil {
log.Error().Err(err).Msgf("could not destroy all containers")
}

if err := e.client.VolumeRemove(ctx, conf.Volume, true); err != nil {
log.Error().Err(err).Msgf("could not remove volume '%s'", conf.Volume)
}
Expand All @@ -349,8 +368,13 @@ func isErrContainerNotFoundOrNotRunning(err error) bool {
// Error response from daemon: Cannot kill container: ...: No such container: ...
// Error response from daemon: Cannot kill container: ...: Container ... is not running"
// Error response from podman daemon: can only kill running containers. ... is in state exited
// Error response from daemon: removal of container ... is already in progress
// Error: No such container: ...
return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))
return err != nil &&
(strings.Contains(err.Error(), "No such container") ||
strings.Contains(err.Error(), "is not running") ||
strings.Contains(err.Error(), "can only kill running containers") ||
(strings.Contains(err.Error(), "removal of container") && strings.Contains(err.Error(), "is already in progress")))
}

// normalizeArchType converts the arch type reported by docker info into
Expand Down
2 changes: 1 addition & 1 deletion pipeline/backend/dummy/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (e *dummy) DestroyStep(_ context.Context, step *backend.Step, taskUUID stri

_, exist := e.kv.Load("task_" + taskUUID)
if !exist {
return fmt.Errorf("expect env of workflow %s to exist but found none to destroy", taskUUID)
return nil
}

// check state
Expand Down
3 changes: 0 additions & 3 deletions pipeline/backend/dummy/dummy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ func TestSmalPipelineDummyRun(t *testing.T) {

_, err = dummyEngine.WaitStep(ctx, step, nonExistWorkflowID)
assert.Error(t, err)

err = dummyEngine.DestroyStep(ctx, step, nonExistWorkflowID)
assert.Error(t, err)
})

t.Run("step exec successfully", func(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions pipeline/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ func (e *local) TailStep(_ context.Context, step *types.Step, taskUUID string) (
func (e *local) DestroyStep(_ context.Context, step *types.Step, taskUUID string) error {
state, err := e.getStepState(taskUUID, step.UUID)
if err != nil {
if errors.Is(err, ErrStepStateNotFound) {
return nil
}
return err
}

Expand Down
1 change: 1 addition & 0 deletions pipeline/backend/types/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ type Backend interface {
// - Clean up step-specific resources (containers, processes)
// - Close any open log streams
// - Not affect other steps in the same or other workflows
// - Must not fail if already invoked once
//
// Must be safe to call even if StartStep failed or the step was never started.
// This function must be thread-safe for concurrent calls.
Expand Down
60 changes: 44 additions & 16 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,23 +233,43 @@ func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend.Step) <-ch
Str("step", step.Name).
Msg("executing")

processState, err := r.exec(runnerCtx, step)
// set up the exec func in a way that it can be detached if needed;
// wg (if non-nil) is signaled exactly once by r.exec
execAndTrace := func(wg *sync.WaitGroup) error {
processState, err := r.exec(runnerCtx, step, wg)

logger.Debug().
Str("step", step.Name).
Msg("complete")
logger.Debug().
Str("step", step.Name).
Msg("complete")

// normalize context cancel error
if errors.Is(err, context.Canceled) {
err = ErrCancel
// normalize context cancel error
if errors.Is(err, context.Canceled) {
err = ErrCancel
}

// Return the error after tracing it.
err = r.traceStep(processState, err, step)
if err != nil && step.Failure == metadata.FailureIgnore {
return nil
}
return err
}

// Return the error after tracing it.
err = r.traceStep(processState, err, step)
if err != nil && step.Failure == metadata.FailureIgnore {
return nil
// we report all errors until setup has happened,
// afterwards they just get dropped
Comment thread
6543 marked this conversation as resolved.
if step.Detached {
var wg sync.WaitGroup
wg.Add(1)
var setupErr error
go func() {
setupErr = execAndTrace(&wg)
}()
wg.Wait()
return setupErr
}
return err

// run blocking
return execAndTrace(nil)
})
}

Expand All @@ -262,7 +282,13 @@ func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend.Step) <-ch
}

// Executes the step and returns the state and error.
func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step) (*backend.State, error) {
func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step, setupWg *sync.WaitGroup) (*backend.State, error) {
defer func() {
if setupWg != nil {
setupWg.Done()
}
}()

if err := r.engine.StartStep(r.ctx, step, r.taskUUID); err != nil { //nolint:contextcheck
return nil, err
}
Expand All @@ -287,9 +313,11 @@ func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step) (*backend.
}()
}

// nothing else to do, this is a detached process.
if step.Detached {
return nil, nil
// nothing else to block for detached process.
if setupWg != nil {
setupWg.Done()
// set to nil so the setupWg.Done in defer does not call it a second time
setupWg = nil
}

// We wait until all data was logged. (Needed for some backends like local as WaitStep kills the log stream)
Expand Down