Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions e2e/scenarios/gated_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2026 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build test

package scenarios

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.woodpecker-ci.org/woodpecker/v3/e2e/setup"
forge_types "go.woodpecker-ci.org/woodpecker/v3/server/forge/types"
"go.woodpecker-ci.org/woodpecker/v3/server/model"
"go.woodpecker-ci.org/woodpecker/v3/server/pipeline"
)

// TestGatedPipeline verifies the approval gate on a repo that requires
// approval for every event. The pipeline is created in StatusBlocked, and
// once pipeline.Approve releases it the pipeline runs to completion on an
// agent like any normal pipeline, steps included.
func TestGatedPipeline(t *testing.T) {
env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{
{Name: ".woodpecker.yaml", Data: simpleSuccessYAML},
})
agent := setup.StartAgent(t, env.GRPCAddr)
setup.WaitForAgentRegistered(t, env.Store, agent)

// Require approval for every event, gating every pipeline regardless of author.
env.Fixtures.Repo.RequireApproval = model.RequireApprovalAllEvents
require.NoError(t, env.Store.UpdateRepo(env.Fixtures.Repo), "enable repo approval")

// Pipeline must come back blocked, not running.
created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{
Event: model.EventPush,
Branch: "main",
Commit: "deadbeef",
Ref: "refs/heads/main",
Author: env.Fixtures.Owner.Login,
Sender: env.Fixtures.Owner.Login,
})
require.NoError(t, err, "create gated pipeline")
require.NotNil(t, created)
require.Equal(t, model.StatusBlocked, created.Status, "untrusted author pipeline must be blocked")

// Approve as the repo owner, releasing the gate.
approved, err := pipeline.Approve(t.Context(), env.Store, created, env.Fixtures.Owner, env.Fixtures.Repo)
require.NoError(t, err, "approve gated pipeline")
require.NotNil(t, approved)
assert.Equal(t, env.Fixtures.Owner.Login, approved.Reviewer, "reviewer should be the approver")
assert.NotZero(t, approved.Reviewed, "reviewed timestamp should be set")

// Wait for the agent to actually pick it up and run it to a terminal state.
finished := setup.WaitForPipeline(t, env.Store, approved.ID)
assert.Equal(t, model.StatusSuccess, finished.Status, "approved gated pipeline should succeed")

// Workflow outcome: one workflow, succeeded, assigned to an agent.
workflows, err := env.Store.WorkflowGetTree(finished)
require.NoError(t, err, "get workflow tree")
require.Len(t, workflows, 1, "approved pipeline should produce exactly one workflow")
assert.Equal(t, model.StatusSuccess, workflows[0].State, "workflow should succeed")
assert.Greater(t, workflows[0].AgentID, int64(0), "workflow should record the agent that ran it")

// Step outcome: every step from simpleSuccessYAML ran and exited cleanly.
steps, err := env.Store.StepList(finished.ID)
require.NoError(t, err, "list steps")
require.ElementsMatch(t, []string{"clone", "step-one", "step-two"}, modelStepsToName(steps),
"approved pipeline should run exactly the steps from the YAML")
for _, step := range steps {
assert.Equalf(t, model.StatusSuccess, step.State, "step %q status", step.Name)
assert.Equalf(t, 0, step.ExitCode, "step %q exit code", step.Name)
}
}
51 changes: 16 additions & 35 deletions server/pipeline/approve.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,48 +52,29 @@ func Approve(ctx context.Context, store store.Store, currentPipeline *model.Pipe
yamls = append(yamls, &forge_types.FileMeta{Data: y.Data, Name: y.Name})
}

if currentPipeline.Workflows, err = store.WorkflowGetTree(currentPipeline); err != nil {
return nil, fmt.Errorf("error: loading workflows. %w", err)
}

if currentPipeline, err = UpdateToStatusPending(store, *currentPipeline, user.Login); err != nil {
return nil, fmt.Errorf("error updating pipeline. %w", err)
}

for _, wf := range currentPipeline.Workflows {
if wf.State != model.StatusBlocked {
continue
}
wf.State = model.StatusPending
if err := store.WorkflowUpdate(wf); err != nil {
return nil, fmt.Errorf("error updating workflow. %w", err)
// Release the gate before building workflows: saveWorkflowsFromPipelineBuilder
// derives workflow and step state from the pipeline status, so the status
// must already be pending when the new workflows are persisted.
currentPipeline.Status = model.StatusPending

currentPipeline, pipelineItems, parseErr, err := createPipelineItems(ctx, forge, store, currentPipeline, user, repo, yamls, nil, true)
if handleParseErrors(currentPipeline, parseErr) {
if err := updatePipelineWithErr(ctx, forge, store, currentPipeline, repo, user, parseErr); err != nil {
log.Error().Err(err).Msgf("error setting error status of pipeline for %s#%d after approval", repo.FullName, currentPipeline.Number)
}

for _, step := range wf.Children {
if step.State != model.StatusBlocked {
continue
}
step.State = model.StatusPending
if err := store.StepUpdate(step); err != nil {
return nil, fmt.Errorf("error updating step. %w", err)
}
}
}

currentPipeline, pipelineItems, err := createPipelineItems(ctx, forge, store, currentPipeline, user, repo, yamls, nil)
if err != nil {
msg := fmt.Sprintf("failure to createPipelineItems for %s", repo.FullName)
log.Error().Err(err).Msg(msg)
msg := fmt.Sprintf("failure to parse pipeline config for %s", repo.FullName)
log.Error().Err(parseErr).Msg(msg)
return nil, errors.New(msg)
}

// we have no way to link old workflows and steps in database to new engine generated steps,
// so we just delete the old and insert the new ones
if err := store.WorkflowsReplace(currentPipeline, currentPipeline.Workflows); err != nil {
if err != nil {
log.Error().Err(err).Str("repo", repo.FullName).Msgf("error persisting new steps for %s#%d after approval", repo.FullName, currentPipeline.Number)
return nil, err
}

if currentPipeline, err = UpdateToStatusPending(store, *currentPipeline, user.Login); err != nil {
return nil, fmt.Errorf("error updating pipeline. %w", err)
}

publishPipeline(ctx, forge, currentPipeline, repo, user)

currentPipeline, err = start(ctx, forge, store, currentPipeline, user, repo, pipelineItems)
Expand Down
17 changes: 6 additions & 11 deletions server/pipeline/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/rs/zerolog/log"

pipeline_errors "go.woodpecker-ci.org/woodpecker/v3/pipeline/errors"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/frontend/metadata"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/frontend/yaml/constraint"
"go.woodpecker-ci.org/woodpecker/v3/server"
Expand Down Expand Up @@ -94,12 +93,14 @@ func Create(ctx context.Context, _store store.Store, repo *model.Repo, pipeline
return nil, updatePipelineWithErr(ctx, _forge, _store, pipeline, repo, repoUser, fmt.Errorf("could not load config from forge: %w", configFetchErr))
}

pipelineItems, parseErr := parsePipeline(ctx, _forge, _store, pipeline, repoUser, repo, forgeYamlConfigs, nil)
if pipeline_errors.HasBlockingErrors(parseErr) {
currentPipeline, pipelineItems, parseErr, err := createPipelineItems(ctx, _forge, _store, pipeline, repoUser, repo, forgeYamlConfigs, nil, false)
*pipeline = *currentPipeline
if handleParseErrors(pipeline, parseErr) {
log.Debug().Str("repo", repo.FullName).Err(parseErr).Msg("failed to parse yaml")
return pipeline, updatePipelineWithErr(ctx, _forge, _store, pipeline, repo, repoUser, parseErr)
} else if parseErr != nil {
pipeline.Errors = pipeline_errors.GetPipelineErrors(parseErr)
}
if err != nil {
return nil, fmt.Errorf("createPipelineItems failed: %w", err)
}

if len(pipelineItems) == 0 {
Expand All @@ -111,12 +112,6 @@ func Create(ctx context.Context, _store store.Store, repo *model.Repo, pipeline
return nil, ErrFiltered
}

enrichPipelineItemSteps(pipelineItems, repo)
pipeline, err = saveWorkflowsFromPipelineBuilder(_store, pipeline, pipelineItems)
if err != nil {
return nil, fmt.Errorf("saveWorkflowsFromPipelineBuilder failed: %w", err)
}

// persist the pipeline config for historical correctness, restarts, etc
var configs []*model.Config
for _, forgeYamlConfig := range forgeYamlConfigs {
Expand Down
112 changes: 75 additions & 37 deletions server/pipeline/items.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,31 +155,52 @@ func parsePipeline(ctx context.Context, forge forge.Forge, store store.Store, cu
return b.Build()
}

func createPipelineItems(c context.Context, forge forge.Forge, store store.Store,
// handleParseErrors classifies the error returned by parsePipeline. Blocking
// errors abort the run, so true is returned and the caller decides how to
// report and persist the failure. Non-blocking errors are recorded on the
// pipeline so they surface to the user without stopping the run.
func handleParseErrors(pipeline *model.Pipeline, parseErr error) (blocking bool) {
if pipeline_errors.HasBlockingErrors(parseErr) {
return true
}
if parseErr != nil {
pipeline.Errors = pipeline_errors.GetPipelineErrors(parseErr)
}
return false
}

// The createPipelineItems parses the pipeline config and persists the resulting
// workflows. It is the shared core of Create, Approve, and Restart.
//
// Returns two errors: parseErr carries pipeline config diagnostics, which
// callers classify with handleParseErrors and report as a blocking failure in
// their own way. The second error, err, signals a hard failure (e.g. persisting
// workflows) that always aborts the run. When the pipeline already has
// persisted workflows (a gated pipeline being approved), setting replaceExisting
// swaps them out for the freshly built ones.
func createPipelineItems(ctx context.Context, forge forge.Forge, store store.Store,
currentPipeline *model.Pipeline, user *model.User, repo *model.Repo,
yamls []*forge_types.FileMeta, envs map[string]string,
) (*model.Pipeline, []*builder.Item, error) {
pipelineItems, err := parsePipeline(c, forge, store, currentPipeline, user, repo, yamls, envs)
if pipeline_errors.HasBlockingErrors(err) {
currentPipeline, uErr := UpdateToStatusError(store, *currentPipeline, err)
if uErr != nil {
log.Error().Err(uErr).Msgf("error setting error status of pipeline for %s#%d", repo.FullName, currentPipeline.Number)
} else {
updatePipelineStatus(c, forge, currentPipeline, repo, user)
}
yamls []*forge_types.FileMeta, envs map[string]string, replaceExisting bool,
) (pipeline *model.Pipeline, items []*builder.Item, parseErr, err error) {
pipelineItems, parseErr := parsePipeline(ctx, forge, store, currentPipeline, user, repo, yamls, envs)
if pipeline_errors.HasBlockingErrors(parseErr) {
return currentPipeline, nil, parseErr, nil
}

return currentPipeline, nil, err
} else if err != nil {
currentPipeline.Errors = pipeline_errors.GetPipelineErrors(err)
if err := updatePipelinePending(c, forge, store, currentPipeline, repo, user); err != nil {
return nil, nil, err
}
// An empty pipeline (e.g. everything filtered out) has no workflows to
// persist. Return early so the caller can filter it without us touching
// the store.
if len(pipelineItems) == 0 {
return currentPipeline, pipelineItems, parseErr, nil
}

enrichPipelineItemSteps(pipelineItems, repo)
currentPipeline, err = saveWorkflowsFromPipelineBuilder(store, currentPipeline, pipelineItems)
currentPipeline, err = saveWorkflowsFromPipelineBuilder(store, currentPipeline, pipelineItems, replaceExisting)
if err != nil {
return currentPipeline, nil, parseErr, err
}

return currentPipeline, pipelineItems, err
return currentPipeline, pipelineItems, parseErr, nil
}

// enrichPipelineItemSteps stamps server-side fields onto the backend step
Expand All @@ -199,23 +220,43 @@ func enrichPipelineItemSteps(items []*builder.Item, repo *model.Repo) {
}
}

// saveWorkflowsFromPipelineBuilder is the link between pipeline representation in "pipeline package" and server
// to be specific this func currently is used to convert the pipeline.Item list (crafted by PipelineBuilder.Build()) into
// a pipeline that can be stored in the database by the server and save converted workflows.
func saveWorkflowsFromPipelineBuilder(store store.Store, pipeline *model.Pipeline, pipelineItems []*builder.Item) (*model.Pipeline, error) {
// saveWorkflowsFromPipelineBuilder converts the pipeline.Item list crafted by
// PipelineBuilder.Build() into model workflows and persists them.
//
// A freshly created pipeline has no workflows yet, so they are inserted. A
// gated pipeline already persisted its workflows when it was created, so on
// approval the stored workflows must be swapped for the freshly built ones:
// pass replaceExisting to delete the old workflows and steps before inserting.
func saveWorkflowsFromPipelineBuilder(store store.Store, pipeline *model.Pipeline, pipelineItems []*builder.Item, replaceExisting bool) (*model.Pipeline, error) {
if pipeline.Workflows != nil && !replaceExisting {
return nil, errors.New("cannot save new workflows from pipeline builder: pipeline already has workflows loaded")
}

workflows := workflowsFromPipelineBuilder(pipeline, pipelineItems)

if replaceExisting {
if err := store.WorkflowsReplace(pipeline, workflows); err != nil {
return nil, err
}
} else if err := store.WorkflowsCreate(workflows); err != nil {
return nil, err
}

pipeline.Workflows = workflows
setPipelineItemWorkflowIDs(pipelineItems, pipeline.Workflows)

return pipeline, nil
}

func workflowsFromPipelineBuilder(pipeline *model.Pipeline, pipelineItems []*builder.Item) []*model.Workflow {
var pidSequence int
for _, item := range pipelineItems {
if pidSequence < item.Workflow.PID {
pidSequence = item.Workflow.PID
}
}

// The workflows in the pipeline should be empty, only we populate them.
// But if a pipeline was already loaded from the database and contains workflows,
// we error out to prevent harm.
if pipeline.Workflows != nil {
return nil, errors.New("cannot save new workflows from pipeline builder: pipeline already has workflows loaded")
}
workflows := make([]*model.Workflow, 0, len(pipelineItems))

for _, item := range pipelineItems {
workflow := &model.Workflow{
Expand Down Expand Up @@ -254,17 +295,14 @@ func saveWorkflowsFromPipelineBuilder(store store.Store, pipeline *model.Pipelin
}
}

pipeline.Workflows = append(pipeline.Workflows, workflow)
workflows = append(workflows, workflow)
}

if err := store.WorkflowsCreate(pipeline.Workflows); err != nil {
return nil, err
}
return workflows
}

// now thread IDs back to the builder items
for i, wf := range pipeline.Workflows {
func setPipelineItemWorkflowIDs(pipelineItems []*builder.Item, workflows []*model.Workflow) {
for i, wf := range workflows {
pipelineItems[i].Workflow.ID = wf.ID
}

return pipeline, nil
}
45 changes: 44 additions & 1 deletion server/pipeline/items_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestSetPipelineStepsOnPipeline(t *testing.T) {
s := store_mocks.NewMockStore(t)
s.On("WorkflowsCreate", mock.Anything).Return(nil)

pipeline, err := saveWorkflowsFromPipelineBuilder(s, pipeline, pipelineItems)
pipeline, err := saveWorkflowsFromPipelineBuilder(s, pipeline, pipelineItems, false)
require.NoError(t, err)
if len(pipeline.Workflows) != 1 {
t.Fatal("Should generate three in total")
Expand All @@ -82,6 +82,49 @@ func TestSetPipelineStepsOnPipeline(t *testing.T) {
}
}

func TestSaveWorkflowsReplaceExisting(t *testing.T) {
t.Parallel()

pipeline := &model.Pipeline{
ID: 1,
Event: model.EventPush,
// a gated pipeline already carries persisted workflows on approval
Workflows: []*model.Workflow{{ID: 99, PID: 1}},
}

pipelineItems := []*builder.Item{{
Workflow: &builder.Workflow{ID: 1, PID: 1},
Config: &backend_types.Config{
Stages: []*backend_types.Stage{
{Steps: []*backend_types.Step{{Name: "clone"}}},
},
},
}}

s := store_mocks.NewMockStore(t)
s.On("WorkflowsReplace", mock.Anything, mock.Anything).Return(nil)

pipeline, err := saveWorkflowsFromPipelineBuilder(s, pipeline, pipelineItems, true)
require.NoError(t, err)
assert.Len(t, pipeline.Workflows, 1)
assert.Equal(t, int64(1), pipeline.Workflows[0].PipelineID)
}

func TestSaveWorkflowsRejectsExistingWithoutReplace(t *testing.T) {
t.Parallel()

pipeline := &model.Pipeline{
ID: 1,
Event: model.EventPush,
Workflows: []*model.Workflow{{ID: 99, PID: 1}},
}

s := store_mocks.NewMockStore(t)

_, err := saveWorkflowsFromPipelineBuilder(s, pipeline, nil, false)
require.Error(t, err)
}

func TestParsePipeline(t *testing.T) {
t.Parallel()

Expand Down
Loading