From 727b1d7b0ba90c6cc0ce91a907a82490b69e4179 Mon Sep 17 00:00:00 2001 From: Akash Kumar Date: Sun, 24 May 2026 02:15:16 +0530 Subject: [PATCH 1/6] Fix approved gated pipeline scheduling --- e2e/scenarios/gated_test.go | 80 +++++++++++++++++++++++++++++++++++++ server/pipeline/approve.go | 51 ++++++++--------------- server/pipeline/items.go | 48 +++++++++++++++------- 3 files changed, 131 insertions(+), 48 deletions(-) create mode 100644 e2e/scenarios/gated_test.go diff --git a/e2e/scenarios/gated_test.go b/e2e/scenarios/gated_test.go new file mode 100644 index 00000000000..63a28857c6d --- /dev/null +++ b/e2e/scenarios/gated_test.go @@ -0,0 +1,80 @@ +// Copyright 2026 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build test + +package scenarios + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.woodpecker-ci.org/woodpecker/v3/e2e/setup" + forge_types "go.woodpecker-ci.org/woodpecker/v3/server/forge/types" + "go.woodpecker-ci.org/woodpecker/v3/server/model" + "go.woodpecker-ci.org/woodpecker/v3/server/pipeline" +) + +func TestApprovedGatedPipelineRuns(t *testing.T) { + env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{ + {Name: ".woodpecker.yaml", Data: simpleSuccessYAML}, + }) + agent := setup.StartAgent(t, env.GRPCAddr) + setup.WaitForAgentRegistered(t, env.Store, agent) + + env.Fixtures.Repo.RequireApproval = model.RequireApprovalAllEvents + require.NoError(t, env.Store.UpdateRepo(env.Fixtures.Repo)) + + created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{ + Event: model.EventPush, + Branch: "main", + Commit: "deadbeef", + Ref: "refs/heads/main", + Author: "external-contributor", + Sender: "external-contributor", + }) + require.NoError(t, err, "create gated pipeline") + require.NotNil(t, created) + require.Equal(t, model.StatusBlocked, created.Status) + + blockedWorkflows, err := env.Store.WorkflowGetTree(created) + require.NoError(t, err) + require.Len(t, blockedWorkflows, 1) + assert.Equal(t, model.StatusBlocked, blockedWorkflows[0].State) + require.NotEmpty(t, blockedWorkflows[0].Children) + for _, step := range blockedWorkflows[0].Children { + assert.Equal(t, model.StatusBlocked, step.State) + } + + approved, err := pipeline.Approve(t.Context(), env.Store, created, env.Fixtures.Owner, env.Fixtures.Repo) + require.NoError(t, err, "approve gated pipeline") + require.NotNil(t, approved) + assert.Equal(t, env.Fixtures.Owner.Login, approved.Reviewer) + assert.NotZero(t, approved.Reviewed) + + finished := setup.WaitForPipeline(t, env.Store, approved.ID) + assert.Equal(t, model.StatusSuccess, finished.Status, "approved gated pipeline should run") + + workflows, err := env.Store.WorkflowGetTree(finished) + require.NoError(t, err) + require.Len(t, workflows, 1) + assert.Equal(t, model.StatusSuccess, workflows[0].State) + assert.Greater(t, workflows[0].AgentID, int64(0), "approved workflow should be assigned to an agent") + require.Len(t, workflows[0].Children, len(blockedWorkflows[0].Children)) + for _, step := range workflows[0].Children { + assert.Equal(t, model.StatusSuccess, step.State) + } +} diff --git a/server/pipeline/approve.go b/server/pipeline/approve.go index d66b128acd3..46a0ccd824a 100644 --- a/server/pipeline/approve.go +++ b/server/pipeline/approve.go @@ -21,6 +21,7 @@ import ( "github.com/rs/zerolog/log" + pipeline_errors "go.woodpecker-ci.org/woodpecker/v3/pipeline/errors" "go.woodpecker-ci.org/woodpecker/v3/server" forge_types "go.woodpecker-ci.org/woodpecker/v3/server/forge/types" "go.woodpecker-ci.org/woodpecker/v3/server/model" @@ -52,48 +53,30 @@ func Approve(ctx context.Context, store store.Store, currentPipeline *model.Pipe yamls = append(yamls, &forge_types.FileMeta{Data: y.Data, Name: y.Name}) } - if currentPipeline.Workflows, err = store.WorkflowGetTree(currentPipeline); err != nil { - return nil, fmt.Errorf("error: loading workflows. %w", err) - } - - if currentPipeline, err = UpdateToStatusPending(store, *currentPipeline, user.Login); err != nil { - return nil, fmt.Errorf("error updating pipeline. %w", err) - } - - for _, wf := range currentPipeline.Workflows { - if wf.State != model.StatusBlocked { - continue - } - wf.State = model.StatusPending - if err := store.WorkflowUpdate(wf); err != nil { - return nil, fmt.Errorf("error updating workflow. %w", err) - } - - for _, step := range wf.Children { - if step.State != model.StatusBlocked { - continue - } - step.State = model.StatusPending - if err := store.StepUpdate(step); err != nil { - return nil, fmt.Errorf("error updating step. %w", err) - } + pipelineItems, parseErr := parsePipeline(ctx, forge, store, currentPipeline, user, repo, yamls, nil) + if pipeline_errors.HasBlockingErrors(parseErr) { + if err := updatePipelineWithErr(ctx, forge, store, currentPipeline, repo, user, parseErr); err != nil { + log.Error().Err(err).Msgf("error setting error status of pipeline for %s#%d after approval", repo.FullName, currentPipeline.Number) } - } - - currentPipeline, pipelineItems, err := createPipelineItems(ctx, forge, store, currentPipeline, user, repo, yamls, nil) - if err != nil { - msg := fmt.Sprintf("failure to createPipelineItems for %s", repo.FullName) - log.Error().Err(err).Msg(msg) + msg := fmt.Sprintf("failure to parse pipeline config for %s", repo.FullName) + log.Error().Err(parseErr).Msg(msg) return nil, errors.New(msg) + } else if parseErr != nil { + currentPipeline.Errors = pipeline_errors.GetPipelineErrors(parseErr) } - // we have no way to link old workflows and steps in database to new engine generated steps, - // so we just delete the old and insert the new ones - if err := store.WorkflowsReplace(currentPipeline, currentPipeline.Workflows); err != nil { + enrichPipelineItemSteps(pipelineItems, repo) + currentPipeline.Status = model.StatusPending + currentPipeline, err = replaceWorkflowsFromPipelineBuilder(store, currentPipeline, pipelineItems) + if err != nil { log.Error().Err(err).Str("repo", repo.FullName).Msgf("error persisting new steps for %s#%d after approval", repo.FullName, currentPipeline.Number) return nil, err } + if currentPipeline, err = UpdateToStatusPending(store, *currentPipeline, user.Login); err != nil { + return nil, fmt.Errorf("error updating pipeline. %w", err) + } + publishPipeline(ctx, forge, currentPipeline, repo, user) currentPipeline, err = start(ctx, forge, store, currentPipeline, user, repo, pipelineItems) diff --git a/server/pipeline/items.go b/server/pipeline/items.go index 731e1da6fd8..c8633888f05 100644 --- a/server/pipeline/items.go +++ b/server/pipeline/items.go @@ -203,6 +203,34 @@ func enrichPipelineItemSteps(items []*builder.Item, repo *model.Repo) { // to be specific this func currently is used to convert the pipeline.Item list (crafted by PipelineBuilder.Build()) into // a pipeline that can be stored in the database by the server and save converted workflows. func saveWorkflowsFromPipelineBuilder(store store.Store, pipeline *model.Pipeline, pipelineItems []*builder.Item) (*model.Pipeline, error) { + if pipeline.Workflows != nil { + return nil, errors.New("cannot save new workflows from pipeline builder: pipeline already has workflows loaded") + } + + workflows := workflowsFromPipelineBuilder(pipeline, pipelineItems) + if err := store.WorkflowsCreate(workflows); err != nil { + return nil, err + } + + pipeline.Workflows = workflows + setPipelineItemWorkflowIDs(pipelineItems, pipeline.Workflows) + + return pipeline, nil +} + +func replaceWorkflowsFromPipelineBuilder(store store.Store, pipeline *model.Pipeline, pipelineItems []*builder.Item) (*model.Pipeline, error) { + workflows := workflowsFromPipelineBuilder(pipeline, pipelineItems) + if err := store.WorkflowsReplace(pipeline, workflows); err != nil { + return nil, err + } + + pipeline.Workflows = workflows + setPipelineItemWorkflowIDs(pipelineItems, pipeline.Workflows) + + return pipeline, nil +} + +func workflowsFromPipelineBuilder(pipeline *model.Pipeline, pipelineItems []*builder.Item) []*model.Workflow { var pidSequence int for _, item := range pipelineItems { if pidSequence < item.Workflow.PID { @@ -210,12 +238,7 @@ func saveWorkflowsFromPipelineBuilder(store store.Store, pipeline *model.Pipelin } } - // The workflows in the pipeline should be empty, only we populate them. - // But if a pipeline was already loaded from the database and contains workflows, - // we error out to prevent harm. - if pipeline.Workflows != nil { - return nil, errors.New("cannot save new workflows from pipeline builder: pipeline already has workflows loaded") - } + workflows := make([]*model.Workflow, 0, len(pipelineItems)) for _, item := range pipelineItems { workflow := &model.Workflow{ @@ -254,17 +277,14 @@ func saveWorkflowsFromPipelineBuilder(store store.Store, pipeline *model.Pipelin } } - pipeline.Workflows = append(pipeline.Workflows, workflow) + workflows = append(workflows, workflow) } - if err := store.WorkflowsCreate(pipeline.Workflows); err != nil { - return nil, err - } + return workflows +} - // now thread IDs back to the builder items - for i, wf := range pipeline.Workflows { +func setPipelineItemWorkflowIDs(pipelineItems []*builder.Item, workflows []*model.Workflow) { + for i, wf := range workflows { pipelineItems[i].Workflow.ID = wf.ID } - - return pipeline, nil } From 8cb1922f6f491fea5e2c3b8e4f10f0d72cdd0d51 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Tue, 26 May 2026 13:14:48 +0200 Subject: [PATCH 2/6] better test --- e2e/scenarios/gated_test.go | 54 ++++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/e2e/scenarios/gated_test.go b/e2e/scenarios/gated_test.go index 63a28857c6d..a684cc2df12 100644 --- a/e2e/scenarios/gated_test.go +++ b/e2e/scenarios/gated_test.go @@ -28,53 +28,59 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/server/pipeline" ) -func TestApprovedGatedPipelineRuns(t *testing.T) { +// TestGatedPipeline verifies the approval gate on a repo that requires +// approval for every event. The pipeline is created in StatusBlocked, and +// once pipeline.Approve releases it the pipeline runs to completion on an +// agent like any normal pipeline, steps included. +func TestGatedPipeline(t *testing.T) { env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{ {Name: ".woodpecker.yaml", Data: simpleSuccessYAML}, }) agent := setup.StartAgent(t, env.GRPCAddr) setup.WaitForAgentRegistered(t, env.Store, agent) + // Require approval for every event, gating every pipeline regardless of author. env.Fixtures.Repo.RequireApproval = model.RequireApprovalAllEvents - require.NoError(t, env.Store.UpdateRepo(env.Fixtures.Repo)) + require.NoError(t, env.Store.UpdateRepo(env.Fixtures.Repo), "enable repo approval") + // Pipeline must come back blocked, not running. created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{ Event: model.EventPush, Branch: "main", Commit: "deadbeef", Ref: "refs/heads/main", - Author: "external-contributor", - Sender: "external-contributor", + Author: env.Fixtures.Owner.Login, + Sender: env.Fixtures.Owner.Login, }) require.NoError(t, err, "create gated pipeline") require.NotNil(t, created) - require.Equal(t, model.StatusBlocked, created.Status) - - blockedWorkflows, err := env.Store.WorkflowGetTree(created) - require.NoError(t, err) - require.Len(t, blockedWorkflows, 1) - assert.Equal(t, model.StatusBlocked, blockedWorkflows[0].State) - require.NotEmpty(t, blockedWorkflows[0].Children) - for _, step := range blockedWorkflows[0].Children { - assert.Equal(t, model.StatusBlocked, step.State) - } + require.Equal(t, model.StatusBlocked, created.Status, "untrusted author pipeline must be blocked") + // Approve as the repo owner, releasing the gate. approved, err := pipeline.Approve(t.Context(), env.Store, created, env.Fixtures.Owner, env.Fixtures.Repo) require.NoError(t, err, "approve gated pipeline") require.NotNil(t, approved) - assert.Equal(t, env.Fixtures.Owner.Login, approved.Reviewer) - assert.NotZero(t, approved.Reviewed) + assert.Equal(t, env.Fixtures.Owner.Login, approved.Reviewer, "reviewer should be the approver") + assert.NotZero(t, approved.Reviewed, "reviewed timestamp should be set") + // Wait for the agent to actually pick it up and run it to a terminal state. finished := setup.WaitForPipeline(t, env.Store, approved.ID) - assert.Equal(t, model.StatusSuccess, finished.Status, "approved gated pipeline should run") + assert.Equal(t, model.StatusSuccess, finished.Status, "approved gated pipeline should succeed") + // Workflow outcome: one workflow, succeeded, assigned to an agent. workflows, err := env.Store.WorkflowGetTree(finished) - require.NoError(t, err) - require.Len(t, workflows, 1) - assert.Equal(t, model.StatusSuccess, workflows[0].State) - assert.Greater(t, workflows[0].AgentID, int64(0), "approved workflow should be assigned to an agent") - require.Len(t, workflows[0].Children, len(blockedWorkflows[0].Children)) - for _, step := range workflows[0].Children { - assert.Equal(t, model.StatusSuccess, step.State) + require.NoError(t, err, "get workflow tree") + require.Len(t, workflows, 1, "approved pipeline should produce exactly one workflow") + assert.Equal(t, model.StatusSuccess, workflows[0].State, "workflow should succeed") + assert.Greater(t, workflows[0].AgentID, int64(0), "workflow should record the agent that ran it") + + // Step outcome: every step from simpleSuccessYAML ran and exited cleanly. + steps, err := env.Store.StepList(finished.ID) + require.NoError(t, err, "list steps") + require.ElementsMatch(t, []string{"clone", "step-one", "step-two"}, modelStepsToName(steps), + "approved pipeline should run exactly the steps from the YAML") + for _, step := range steps { + assert.Equalf(t, model.StatusSuccess, step.State, "step %q status", step.Name) + assert.Equalf(t, 0, step.ExitCode, "step %q exit code", step.Name) } } From 5e85adcdf1a587017fa00c4768419b3c92607d41 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Tue, 26 May 2026 13:45:56 +0200 Subject: [PATCH 3/6] dedup code and propper fix --- server/pipeline/approve.go | 16 +++---- server/pipeline/create.go | 17 +++---- server/pipeline/items.go | 85 ++++++++++++++++++++--------------- server/pipeline/items_test.go | 45 ++++++++++++++++++- server/pipeline/restart.go | 12 ++++- 5 files changed, 116 insertions(+), 59 deletions(-) diff --git a/server/pipeline/approve.go b/server/pipeline/approve.go index 46a0ccd824a..e3540fffad1 100644 --- a/server/pipeline/approve.go +++ b/server/pipeline/approve.go @@ -21,7 +21,6 @@ import ( "github.com/rs/zerolog/log" - pipeline_errors "go.woodpecker-ci.org/woodpecker/v3/pipeline/errors" "go.woodpecker-ci.org/woodpecker/v3/server" forge_types "go.woodpecker-ci.org/woodpecker/v3/server/forge/types" "go.woodpecker-ci.org/woodpecker/v3/server/model" @@ -53,21 +52,20 @@ func Approve(ctx context.Context, store store.Store, currentPipeline *model.Pipe yamls = append(yamls, &forge_types.FileMeta{Data: y.Data, Name: y.Name}) } - pipelineItems, parseErr := parsePipeline(ctx, forge, store, currentPipeline, user, repo, yamls, nil) - if pipeline_errors.HasBlockingErrors(parseErr) { + // Release the gate before building workflows: saveWorkflowsFromPipelineBuilder + // derives workflow and step state from the pipeline status, so the status + // must already be pending when the new workflows are persisted. + currentPipeline.Status = model.StatusPending + + currentPipeline, pipelineItems, parseErr, err := createPipelineItems(ctx, forge, store, currentPipeline, user, repo, yamls, nil, true) + if handleParseErrors(currentPipeline, parseErr) { if err := updatePipelineWithErr(ctx, forge, store, currentPipeline, repo, user, parseErr); err != nil { log.Error().Err(err).Msgf("error setting error status of pipeline for %s#%d after approval", repo.FullName, currentPipeline.Number) } msg := fmt.Sprintf("failure to parse pipeline config for %s", repo.FullName) log.Error().Err(parseErr).Msg(msg) return nil, errors.New(msg) - } else if parseErr != nil { - currentPipeline.Errors = pipeline_errors.GetPipelineErrors(parseErr) } - - enrichPipelineItemSteps(pipelineItems, repo) - currentPipeline.Status = model.StatusPending - currentPipeline, err = replaceWorkflowsFromPipelineBuilder(store, currentPipeline, pipelineItems) if err != nil { log.Error().Err(err).Str("repo", repo.FullName).Msgf("error persisting new steps for %s#%d after approval", repo.FullName, currentPipeline.Number) return nil, err diff --git a/server/pipeline/create.go b/server/pipeline/create.go index 2e8585c4565..26a6a62949a 100644 --- a/server/pipeline/create.go +++ b/server/pipeline/create.go @@ -21,7 +21,6 @@ import ( "github.com/rs/zerolog/log" - pipeline_errors "go.woodpecker-ci.org/woodpecker/v3/pipeline/errors" "go.woodpecker-ci.org/woodpecker/v3/pipeline/frontend/metadata" "go.woodpecker-ci.org/woodpecker/v3/pipeline/frontend/yaml/constraint" "go.woodpecker-ci.org/woodpecker/v3/server" @@ -94,12 +93,14 @@ func Create(ctx context.Context, _store store.Store, repo *model.Repo, pipeline return nil, updatePipelineWithErr(ctx, _forge, _store, pipeline, repo, repoUser, fmt.Errorf("could not load config from forge: %w", configFetchErr)) } - pipelineItems, parseErr := parsePipeline(ctx, _forge, _store, pipeline, repoUser, repo, forgeYamlConfigs, nil) - if pipeline_errors.HasBlockingErrors(parseErr) { + currentPipeline, pipelineItems, parseErr, err := createPipelineItems(ctx, _forge, _store, pipeline, repoUser, repo, forgeYamlConfigs, nil, false) + *pipeline = *currentPipeline + if handleParseErrors(pipeline, parseErr) { log.Debug().Str("repo", repo.FullName).Err(parseErr).Msg("failed to parse yaml") return pipeline, updatePipelineWithErr(ctx, _forge, _store, pipeline, repo, repoUser, parseErr) - } else if parseErr != nil { - pipeline.Errors = pipeline_errors.GetPipelineErrors(parseErr) + } + if err != nil { + return nil, fmt.Errorf("createPipelineItems failed: %w", err) } if len(pipelineItems) == 0 { @@ -111,12 +112,6 @@ func Create(ctx context.Context, _store store.Store, repo *model.Repo, pipeline return nil, ErrFiltered } - enrichPipelineItemSteps(pipelineItems, repo) - pipeline, err = saveWorkflowsFromPipelineBuilder(_store, pipeline, pipelineItems) - if err != nil { - return nil, fmt.Errorf("saveWorkflowsFromPipelineBuilder failed: %w", err) - } - // persist the pipeline config for historical correctness, restarts, etc var configs []*model.Config for _, forgeYamlConfig := range forgeYamlConfigs { diff --git a/server/pipeline/items.go b/server/pipeline/items.go index c8633888f05..d9ed22bde05 100644 --- a/server/pipeline/items.go +++ b/server/pipeline/items.go @@ -155,31 +155,45 @@ func parsePipeline(ctx context.Context, forge forge.Forge, store store.Store, cu return b.Build() } -func createPipelineItems(c context.Context, forge forge.Forge, store store.Store, - currentPipeline *model.Pipeline, user *model.User, repo *model.Repo, - yamls []*forge_types.FileMeta, envs map[string]string, -) (*model.Pipeline, []*builder.Item, error) { - pipelineItems, err := parsePipeline(c, forge, store, currentPipeline, user, repo, yamls, envs) - if pipeline_errors.HasBlockingErrors(err) { - currentPipeline, uErr := UpdateToStatusError(store, *currentPipeline, err) - if uErr != nil { - log.Error().Err(uErr).Msgf("error setting error status of pipeline for %s#%d", repo.FullName, currentPipeline.Number) - } else { - updatePipelineStatus(c, forge, currentPipeline, repo, user) - } +// handleParseErrors classifies the error returned by parsePipeline. Blocking +// errors abort the run, so true is returned and the caller decides how to +// report and persist the failure. Non-blocking errors are recorded on the +// pipeline so they surface to the user without stopping the run. +func handleParseErrors(pipeline *model.Pipeline, parseErr error) (blocking bool) { + if pipeline_errors.HasBlockingErrors(parseErr) { + return true + } + if parseErr != nil { + pipeline.Errors = pipeline_errors.GetPipelineErrors(parseErr) + } + return false +} - return currentPipeline, nil, err - } else if err != nil { - currentPipeline.Errors = pipeline_errors.GetPipelineErrors(err) - if err := updatePipelinePending(c, forge, store, currentPipeline, repo, user); err != nil { - return nil, nil, err - } +// createPipelineItems parses the pipeline config and persists the resulting +// workflows. It is the shared core of Create, Approve and Restart. +// +// It returns two errors. parseErr carries pipeline config diagnostics: callers +// classify it with handleParseErrors and report a blocking failure in their +// own way. err is a hard failure (e.g. persisting workflows) that always +// aborts the run. When the pipeline already has persisted workflows (a gated +// pipeline being approved), replaceExisting swaps them for the freshly built +// ones. +func createPipelineItems(ctx context.Context, forge forge.Forge, store store.Store, + currentPipeline *model.Pipeline, user *model.User, repo *model.Repo, + yamls []*forge_types.FileMeta, envs map[string]string, replaceExisting bool, +) (pipeline *model.Pipeline, items []*builder.Item, parseErr, err error) { + pipelineItems, parseErr := parsePipeline(ctx, forge, store, currentPipeline, user, repo, yamls, envs) + if pipeline_errors.HasBlockingErrors(parseErr) { + return currentPipeline, nil, parseErr, nil } enrichPipelineItemSteps(pipelineItems, repo) - currentPipeline, err = saveWorkflowsFromPipelineBuilder(store, currentPipeline, pipelineItems) + currentPipeline, err = saveWorkflowsFromPipelineBuilder(store, currentPipeline, pipelineItems, replaceExisting) + if err != nil { + return currentPipeline, nil, parseErr, err + } - return currentPipeline, pipelineItems, err + return currentPipeline, pipelineItems, parseErr, nil } // enrichPipelineItemSteps stamps server-side fields onto the backend step @@ -199,28 +213,25 @@ func enrichPipelineItemSteps(items []*builder.Item, repo *model.Repo) { } } -// saveWorkflowsFromPipelineBuilder is the link between pipeline representation in "pipeline package" and server -// to be specific this func currently is used to convert the pipeline.Item list (crafted by PipelineBuilder.Build()) into -// a pipeline that can be stored in the database by the server and save converted workflows. -func saveWorkflowsFromPipelineBuilder(store store.Store, pipeline *model.Pipeline, pipelineItems []*builder.Item) (*model.Pipeline, error) { - if pipeline.Workflows != nil { +// saveWorkflowsFromPipelineBuilder converts the pipeline.Item list crafted by +// PipelineBuilder.Build() into model workflows and persists them. +// +// A freshly created pipeline has no workflows yet, so they are inserted. A +// gated pipeline already persisted its workflows when it was created, so on +// approval the stored workflows must be swapped for the freshly built ones: +// pass replaceExisting to delete the old workflows and steps before inserting. +func saveWorkflowsFromPipelineBuilder(store store.Store, pipeline *model.Pipeline, pipelineItems []*builder.Item, replaceExisting bool) (*model.Pipeline, error) { + if pipeline.Workflows != nil && !replaceExisting { return nil, errors.New("cannot save new workflows from pipeline builder: pipeline already has workflows loaded") } workflows := workflowsFromPipelineBuilder(pipeline, pipelineItems) - if err := store.WorkflowsCreate(workflows); err != nil { - return nil, err - } - - pipeline.Workflows = workflows - setPipelineItemWorkflowIDs(pipelineItems, pipeline.Workflows) - return pipeline, nil -} - -func replaceWorkflowsFromPipelineBuilder(store store.Store, pipeline *model.Pipeline, pipelineItems []*builder.Item) (*model.Pipeline, error) { - workflows := workflowsFromPipelineBuilder(pipeline, pipelineItems) - if err := store.WorkflowsReplace(pipeline, workflows); err != nil { + if replaceExisting { + if err := store.WorkflowsReplace(pipeline, workflows); err != nil { + return nil, err + } + } else if err := store.WorkflowsCreate(workflows); err != nil { return nil, err } diff --git a/server/pipeline/items_test.go b/server/pipeline/items_test.go index f20c6e220bf..353c6292e6f 100644 --- a/server/pipeline/items_test.go +++ b/server/pipeline/items_test.go @@ -69,7 +69,7 @@ func TestSetPipelineStepsOnPipeline(t *testing.T) { s := store_mocks.NewMockStore(t) s.On("WorkflowsCreate", mock.Anything).Return(nil) - pipeline, err := saveWorkflowsFromPipelineBuilder(s, pipeline, pipelineItems) + pipeline, err := saveWorkflowsFromPipelineBuilder(s, pipeline, pipelineItems, false) require.NoError(t, err) if len(pipeline.Workflows) != 1 { t.Fatal("Should generate three in total") @@ -82,6 +82,49 @@ func TestSetPipelineStepsOnPipeline(t *testing.T) { } } +func TestSaveWorkflowsReplaceExisting(t *testing.T) { + t.Parallel() + + pipeline := &model.Pipeline{ + ID: 1, + Event: model.EventPush, + // a gated pipeline already carries persisted workflows on approval + Workflows: []*model.Workflow{{ID: 99, PID: 1}}, + } + + pipelineItems := []*builder.Item{{ + Workflow: &builder.Workflow{ID: 1, PID: 1}, + Config: &backend_types.Config{ + Stages: []*backend_types.Stage{ + {Steps: []*backend_types.Step{{Name: "clone"}}}, + }, + }, + }} + + s := store_mocks.NewMockStore(t) + s.On("WorkflowsReplace", mock.Anything, mock.Anything).Return(nil) + + pipeline, err := saveWorkflowsFromPipelineBuilder(s, pipeline, pipelineItems, true) + require.NoError(t, err) + assert.Len(t, pipeline.Workflows, 1) + assert.Equal(t, int64(1), pipeline.Workflows[0].PipelineID) +} + +func TestSaveWorkflowsRejectsExistingWithoutReplace(t *testing.T) { + t.Parallel() + + pipeline := &model.Pipeline{ + ID: 1, + Event: model.EventPush, + Workflows: []*model.Workflow{{ID: 99, PID: 1}}, + } + + s := store_mocks.NewMockStore(t) + + _, err := saveWorkflowsFromPipelineBuilder(s, pipeline, nil, false) + require.Error(t, err) +} + func TestParsePipeline(t *testing.T) { t.Parallel() diff --git a/server/pipeline/restart.go b/server/pipeline/restart.go index 1e75c634185..89014a6a37c 100644 --- a/server/pipeline/restart.go +++ b/server/pipeline/restart.go @@ -89,7 +89,17 @@ func Restart(ctx context.Context, store store.Store, lastPipeline *model.Pipelin return nil, errors.New(msg) } - newPipeline, pipelineItems, err := createPipelineItems(ctx, forge, store, newPipeline, user, repo, pipelineFiles, envs) + newPipeline, pipelineItems, parseErr, err := createPipelineItems(ctx, forge, store, newPipeline, user, repo, pipelineFiles, envs, false) + if handleParseErrors(newPipeline, parseErr) { + if newPipeline, uErr := UpdateToStatusError(store, *newPipeline, parseErr); uErr != nil { + log.Error().Err(uErr).Msgf("error setting error status of pipeline for %s#%d", repo.FullName, newPipeline.Number) + } else { + updatePipelineStatus(ctx, forge, newPipeline, repo, user) + } + msg := fmt.Sprintf("failure to parse pipeline config for %s", repo.FullName) + log.Error().Err(parseErr).Msg(msg) + return nil, errors.New(msg) + } if err != nil { msg := fmt.Sprintf("failure to createPipelineItems for %s", repo.FullName) log.Error().Err(err).Msg(msg) From 0dadb1a42fe758f8aa0ec7f160bbc4598ac13cb9 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Tue, 26 May 2026 14:04:02 +0200 Subject: [PATCH 4/6] fix lint --- server/pipeline/items.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/pipeline/items.go b/server/pipeline/items.go index d9ed22bde05..051651f1c0e 100644 --- a/server/pipeline/items.go +++ b/server/pipeline/items.go @@ -169,12 +169,12 @@ func handleParseErrors(pipeline *model.Pipeline, parseErr error) (blocking bool) return false } -// createPipelineItems parses the pipeline config and persists the resulting +// The createPipelineItems parses the pipeline config and persists the resulting // workflows. It is the shared core of Create, Approve and Restart. // // It returns two errors. parseErr carries pipeline config diagnostics: callers // classify it with handleParseErrors and report a blocking failure in their -// own way. err is a hard failure (e.g. persisting workflows) that always +// own way. The err is a hard failure (e.g. persisting workflows) that always // aborts the run. When the pipeline already has persisted workflows (a gated // pipeline being approved), replaceExisting swaps them for the freshly built // ones. From d18be346d437f9b5be5acea2e586831fbea5cc4a Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Tue, 26 May 2026 14:21:36 +0200 Subject: [PATCH 5/6] fix lint --- server/pipeline/items.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/server/pipeline/items.go b/server/pipeline/items.go index 051651f1c0e..320eb7c5179 100644 --- a/server/pipeline/items.go +++ b/server/pipeline/items.go @@ -170,14 +170,14 @@ func handleParseErrors(pipeline *model.Pipeline, parseErr error) (blocking bool) } // The createPipelineItems parses the pipeline config and persists the resulting -// workflows. It is the shared core of Create, Approve and Restart. +// workflows. It is the shared core of Create, Approve, and Restart. // -// It returns two errors. parseErr carries pipeline config diagnostics: callers -// classify it with handleParseErrors and report a blocking failure in their -// own way. The err is a hard failure (e.g. persisting workflows) that always -// aborts the run. When the pipeline already has persisted workflows (a gated -// pipeline being approved), replaceExisting swaps them for the freshly built -// ones. +// Returns two errors: parseErr carries pipeline config diagnostics, which +// callers classify with handleParseErrors and report as a blocking failure in +// their own way. The second error, err, signals a hard failure (e.g. persisting +// workflows) that always aborts the run. When the pipeline already has +// persisted workflows (a gated pipeline being approved), setting replaceExisting +// swaps them out for the freshly built ones. func createPipelineItems(ctx context.Context, forge forge.Forge, store store.Store, currentPipeline *model.Pipeline, user *model.User, repo *model.Repo, yamls []*forge_types.FileMeta, envs map[string]string, replaceExisting bool, From 81003568700772c17e1396183b4a3d0d782ff397 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Tue, 26 May 2026 14:34:27 +0200 Subject: [PATCH 6/6] fix --- server/pipeline/items.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/pipeline/items.go b/server/pipeline/items.go index 320eb7c5179..059cc1bdccc 100644 --- a/server/pipeline/items.go +++ b/server/pipeline/items.go @@ -187,6 +187,13 @@ func createPipelineItems(ctx context.Context, forge forge.Forge, store store.Sto return currentPipeline, nil, parseErr, nil } + // An empty pipeline (e.g. everything filtered out) has no workflows to + // persist. Return early so the caller can filter it without us touching + // the store. + if len(pipelineItems) == 0 { + return currentPipeline, pipelineItems, parseErr, nil + } + enrichPipelineItemSteps(pipelineItems, repo) currentPipeline, err = saveWorkflowsFromPipelineBuilder(store, currentPipeline, pipelineItems, replaceExisting) if err != nil {