diff --git a/pipeline/backend/kubernetes/kubernetes.go b/pipeline/backend/kubernetes/kubernetes.go
index 8ba29b4cd26..8fedf6f2dde 100644
--- a/pipeline/backend/kubernetes/kubernetes.go
+++ b/pipeline/backend/kubernetes/kubernetes.go
@@ -28,6 +28,8 @@ import (
 	"sync"
 	"time"
 
+	"golang.org/x/sync/errgroup"
+
 	"github.com/cenkalti/backoff/v5"
 	"github.com/rs/zerolog/log"
 	"github.com/urfave/cli/v3"
@@ -350,7 +352,8 @@ func (e *kube) WaitStep(ctx context.Context, step *types.Step, taskUUID string)
 	select {
 	case <-finished:
 	case <-ctx.Done():
-		return nil, ctx.Err()
+		// Report a clean exit instead of an error so the runtime does not skip cleanup.
+		return &types.State{ExitCode: 0, Exited: true}, nil
 	}
 
 	pod, err := e.client.CoreV1().Pods(e.config.GetNamespace(step.OrgID)).Get(ctx, podName, kube_meta_v1.GetOptions{})
@@ -502,27 +505,32 @@ func (e *kube) DestroyStep(ctx context.Context, step *types.Step, taskUUID strin
 func (e *kube) DestroyWorkflow(ctx context.Context, conf *types.Config, taskUUID string) error {
 	log.Trace().Str("taskUUID", taskUUID).Msg("deleting Kubernetes primitives")
 
+	// Destroy every step concurrently; one failed deletion must not block the rest.
+	var g errgroup.Group
+
 	for _, stage := range conf.Stages {
 		for _, step := range stage.Steps {
-			err := stopPod(ctx, e, step, e.config.newDefaultDeleteOptions())
-			if err != nil {
-				return err
-			}
+			g.Go(func() error {
+				return e.DestroyStep(ctx, step, taskUUID)
+			})
 		}
 	}
 
+	// Cleanup is best-effort from here on: failures are logged, not returned.
+	if err := g.Wait(); err != nil {
+		log.Error().Err(err).Str("taskUUID", taskUUID).Msg("could not destroy all pods")
+	}
+
 	namespace := e.config.GetNamespace(conf.Stages[0].Steps[0].OrgID)
 
 	log.Trace().Str("taskUUID", taskUUID).Msgf("deleting workflow headless service")
-	err := e.stopHeadlessService(ctx, e, namespace, taskUUID)
-	if err != nil {
-		return err
+	if err := e.stopHeadlessService(ctx, e, namespace, taskUUID); err != nil {
+		log.Error().Err(err).Str("taskUUID", taskUUID).Msg("could not delete headless service")
 	}
 
 	log.Trace().Str("taskUUID", taskUUID).Msgf("deleting workflow volume")
-	err = stopVolume(ctx, e, conf.Volume, e.config.GetNamespace(conf.Stages[0].Steps[0].OrgID), e.config.newDefaultDeleteOptions())
-	if err != nil {
-		return err
+	if err := stopVolume(ctx, e, conf.Volume, namespace, e.config.newDefaultDeleteOptions()); err != nil {
+		log.Error().Err(err).Str("taskUUID", taskUUID).Msg("could not delete workflow volume")
 	}
 
 	return nil
diff --git a/pipeline/backend/kubernetes/kubernetes_test.go b/pipeline/backend/kubernetes/kubernetes_test.go
index ab4e014378e..270c066c8e3 100644
--- a/pipeline/backend/kubernetes/kubernetes_test.go
+++ b/pipeline/backend/kubernetes/kubernetes_test.go
@@ -27,7 +27,9 @@ import (
 	"github.com/urfave/cli/v3"
 	kube_core_v1 "k8s.io/api/core/v1"
 	kube_meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	kube_runtime "k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/kubernetes/fake"
+	kube_testing "k8s.io/client-go/testing"
 
 	"go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
 )
@@ -252,8 +254,9 @@ func TestWaitStepReturnsOnContextCancel(t *testing.T) {
 
 	select {
 	case r := <-ch:
-		assert.Nil(t, r.state)
-		assert.ErrorIs(t, r.err, context.Canceled)
+		require.NoError(t, r.err)
+		require.NotNil(t, r.state)
+		assert.True(t, r.state.Exited)
 	case <-time.After(3 * time.Second):
 		t.Fatal("WaitStep did not return after context cancellation")
 	}
@@ -335,3 +338,70 @@ func TestWaitStepNoGoroutineLeak(t *testing.T) {
 		"goroutines leaked after canceling %d WaitStep calls: got %d leaked",
 		numSteps, leaked)
 }
+
+func makeServiceStep(uuid string) *types.Step {
+	return &types.Step{
+		UUID:     uuid,
+		Name:     "svc-" + uuid,
+		OrgID:    1,
+		Type:     types.StepTypeService,
+		Detached: true,
+	}
+}
+
+func TestDestroyWorkflowDeletesRemainingServicesOnPartialFailure(t *testing.T) {
+	t.Parallel()
+
+	namespace := "test-ns"
+	client := fake.NewClientset()
+	engine := makeEngine(client)
+
+	psqlStep := makeServiceStep("psql-01")
+	redisStep := makeServiceStep("redis-01")
+	buildStep := makeStep("build-01")
+
+	psqlPodName := createPod(t, client, psqlStep, namespace)
+	redisPodName := createPod(t, client, redisStep, namespace)
+	createPod(t, client, buildStep, namespace)
+
+	client.PrependReactor("delete", "pods", func(action kube_testing.Action) (bool, kube_runtime.Object, error) {
+		da, ok := action.(kube_testing.DeleteAction)
+		if ok && da.GetName() == psqlPodName {
+			return true, nil, fmt.Errorf("simulated transient API error")
+		}
+		return false, nil, nil
+	})
+
+	conf := &types.Config{
+		Stages: []*types.Stage{
+			{Steps: []*types.Step{psqlStep, redisStep, buildStep}},
+			{Steps: []*types.Step{makeStep("test-01")}},
+		},
+	}
+
+	_ = engine.DestroyWorkflow(context.Background(), conf, "task-1")
+
+	_, err := client.CoreV1().Pods(namespace).Get(context.Background(), redisPodName, kube_meta_v1.GetOptions{})
+	assert.Error(t, err, "redis service pod should be deleted despite psql failure")
+
+	_, err = client.CoreV1().Pods(namespace).Get(context.Background(), psqlPodName, kube_meta_v1.GetOptions{})
+	assert.NoError(t, err, "psql pod should still exist since its delete was rejected")
+}
+
+func TestWaitStepReturnsCleanStateOnContextExpiry(t *testing.T) {
+	t.Parallel()
+
+	client := fake.NewClientset()
+	engine := makeEngine(client)
+	step := makeServiceStep("db-01")
+	createPod(t, client, step, "test-ns")
+
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	state, err := engine.WaitStep(ctx, step, "task-1")
+
+	require.NoError(t, err, "must not return an error that causes the runtime to skip cleanup")
+	require.NotNil(t, state, "must return a state so the runtime can trace the service exit")
+	assert.True(t, state.Exited)
+}