From ec70cc04f4d6c11ebacc543bed6246b2c80b65eb Mon Sep 17 00:00:00 2001 From: Harri Avellan Date: Thu, 28 May 2026 10:49:19 +0300 Subject: [PATCH] fix(kubernetes): return clean exit state from WaitStep on context expiry There's a corner case where the pipeline service on Kubernetes backend is not cleaned up appropriately when the context is cancelled, leading to services being shown as running, even while the service pods are terminated. Also the DestroyWorkflow changed to handler destroying steps with errgroup like handled out with Docker backend, instead of failing on the first error. --- pipeline/backend/kubernetes/kubernetes.go | 27 ++++--- .../backend/kubernetes/kubernetes_test.go | 74 ++++++++++++++++++- 2 files changed, 88 insertions(+), 13 deletions(-) diff --git a/pipeline/backend/kubernetes/kubernetes.go b/pipeline/backend/kubernetes/kubernetes.go index 8ba29b4cd26..8fedf6f2dde 100644 --- a/pipeline/backend/kubernetes/kubernetes.go +++ b/pipeline/backend/kubernetes/kubernetes.go @@ -28,6 +28,8 @@ import ( "sync" "time" + "golang.org/x/sync/errgroup" + "github.com/cenkalti/backoff/v5" "github.com/rs/zerolog/log" "github.com/urfave/cli/v3" @@ -350,7 +352,7 @@ func (e *kube) WaitStep(ctx context.Context, step *types.Step, taskUUID string) select { case <-finished: case <-ctx.Done(): - return nil, ctx.Err() + return &types.State{ExitCode: 0, Exited: true}, nil } pod, err := e.client.CoreV1().Pods(e.config.GetNamespace(step.OrgID)).Get(ctx, podName, kube_meta_v1.GetOptions{}) @@ -502,27 +504,30 @@ func (e *kube) DestroyStep(ctx context.Context, step *types.Step, taskUUID strin func (e *kube) DestroyWorkflow(ctx context.Context, conf *types.Config, taskUUID string) error { log.Trace().Str("taskUUID", taskUUID).Msg("deleting Kubernetes primitives") + var g errgroup.Group + for _, stage := range conf.Stages { for _, step := range stage.Steps { - err := stopPod(ctx, e, step, e.config.newDefaultDeleteOptions()) - if err != nil { - return err - } + g.Go(func() error { + return e.DestroyStep(ctx, step, taskUUID) + }) } } + if err := g.Wait(); err != nil { + log.Error().Err(err).Msg("could not destroy all pods") + } + namespace := e.config.GetNamespace(conf.Stages[0].Steps[0].OrgID) log.Trace().Str("taskUUID", taskUUID).Msgf("deleting workflow headless service") - err := e.stopHeadlessService(ctx, e, namespace, taskUUID) - if err != nil { - return err + if err := e.stopHeadlessService(ctx, e, namespace, taskUUID); err != nil { + log.Error().Err(err).Msg("could not delete headless service") } log.Trace().Str("taskUUID", taskUUID).Msgf("deleting workflow volume") - err = stopVolume(ctx, e, conf.Volume, e.config.GetNamespace(conf.Stages[0].Steps[0].OrgID), e.config.newDefaultDeleteOptions()) - if err != nil { - return err + if err := stopVolume(ctx, e, conf.Volume, e.config.GetNamespace(conf.Stages[0].Steps[0].OrgID), e.config.newDefaultDeleteOptions()); err != nil { + log.Error().Err(err).Msg("could not delete workflow volume") } return nil diff --git a/pipeline/backend/kubernetes/kubernetes_test.go b/pipeline/backend/kubernetes/kubernetes_test.go index ab4e014378e..270c066c8e3 100644 --- a/pipeline/backend/kubernetes/kubernetes_test.go +++ b/pipeline/backend/kubernetes/kubernetes_test.go @@ -27,7 +27,9 @@ import ( "github.com/urfave/cli/v3" kube_core_v1 "k8s.io/api/core/v1" kube_meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kube_runtime "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" + kube_testing "k8s.io/client-go/testing" "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types" ) @@ -252,8 +254,9 @@ func TestWaitStepReturnsOnContextCancel(t *testing.T) { select { case r := <-ch: - assert.Nil(t, r.state) - assert.ErrorIs(t, r.err, context.Canceled) + require.NoError(t, r.err) + require.NotNil(t, r.state) + assert.True(t, r.state.Exited) case <-time.After(3 * time.Second): t.Fatal("WaitStep did not return after context cancellation") } @@ -335,3 +338,70 @@ func TestWaitStepNoGoroutineLeak(t *testing.T) { "goroutines leaked after canceling %d WaitStep calls: got %d leaked", numSteps, leaked) } + +func makeServiceStep(uuid string) *types.Step { + return &types.Step{ + UUID: uuid, + Name: "svc-" + uuid, + OrgID: 1, + Type: types.StepTypeService, + Detached: true, + } +} + +func TestDestroyWorkflowDeletesRemainingServicesOnPartialFailure(t *testing.T) { + t.Parallel() + + namespace := "test-ns" + client := fake.NewClientset() + engine := makeEngine(client) + + psqlStep := makeServiceStep("psql-01") + redisStep := makeServiceStep("redis-01") + buildStep := makeStep("build-01") + + psqlPodName := createPod(t, client, psqlStep, namespace) + redisPodName := createPod(t, client, redisStep, namespace) + createPod(t, client, buildStep, namespace) + + client.PrependReactor("delete", "pods", func(action kube_testing.Action) (bool, kube_runtime.Object, error) { + da, ok := action.(kube_testing.DeleteAction) + if ok && da.GetName() == psqlPodName { + return true, nil, fmt.Errorf("simulated transient API error") + } + return false, nil, nil + }) + + conf := &types.Config{ + Stages: []*types.Stage{ + {Steps: []*types.Step{psqlStep, redisStep, buildStep}}, + {Steps: []*types.Step{makeStep("test-01")}}, + }, + } + + _ = engine.DestroyWorkflow(context.Background(), conf, "task-1") + + _, err := client.CoreV1().Pods(namespace).Get(context.Background(), redisPodName, kube_meta_v1.GetOptions{}) + assert.Error(t, err, "redis service pod should be deleted despite psql failure") + + _, err = client.CoreV1().Pods(namespace).Get(context.Background(), psqlPodName, kube_meta_v1.GetOptions{}) + assert.NoError(t, err, "psql pod should still exist since its delete was rejected") +} + +func TestWaitStepReturnsCleanStateOnContextExpiry(t *testing.T) { + t.Parallel() + + client := fake.NewClientset() + engine := makeEngine(client) + step := makeServiceStep("db-01") + createPod(t, client, step, "test-ns") + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + state, err := engine.WaitStep(ctx, step, "task-1") + + require.NoError(t, err, "must not return an error that causes the runtime to skip cleanup") + require.NotNil(t, state, "must return a state so the runtime can trace the service exit") + assert.True(t, state.Exited) +}