From cc01d8c5edb463578792582cce5a7d196f770086 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 6 Apr 2026 20:31:41 +0200 Subject: [PATCH 01/12] Canceled workflows still get step updates to mark running steps done --- server/rpc/rpc.go | 3 --- server/rpc/sanitize.go | 7 +++++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/server/rpc/rpc.go b/server/rpc/rpc.go index 0bf0e2c63d7..df7c3478230 100644 --- a/server/rpc/rpc.go +++ b/server/rpc/rpc.go @@ -200,9 +200,6 @@ func (s *RPC) Update(c context.Context, strWorkflowID string, state rpc.StepStat } // sanitize agent input - if err := checkPipelineState(currentPipeline); err != nil { - return err - } if err := checkWorkflowStepStates(workflow, step); err != nil { return err } diff --git a/server/rpc/sanitize.go b/server/rpc/sanitize.go index 84af183a8df..67872d90b95 100644 --- a/server/rpc/sanitize.go +++ b/server/rpc/sanitize.go @@ -102,6 +102,13 @@ func checkWorkflowStepStates(currWorkflow *model.Workflow, currStep *model.Step) case model.StatusBlocked: err = ErrAgentIllegalWorkflowRun + case model.StatusCanceled: + if currStep.State != model.StatusCanceled && + currStep.State != model.StatusKilled && + currStep.State != model.StatusSkipped { + err = ErrAgentIllegalWorkflowReRunStateChange + } + default: err = ErrAgentIllegalWorkflowReRunStateChange } From bf15b5cc21d038a9c909cbedae4e4378e7101576 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 6 Apr 2026 20:43:38 +0200 Subject: [PATCH 02/12] Canceled pipelines still get workflow updates to mark as done --- server/rpc/rpc.go | 4 ++-- server/rpc/sanitize.go | 11 ++++++++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/server/rpc/rpc.go b/server/rpc/rpc.go index df7c3478230..501d306b611 100644 --- a/server/rpc/rpc.go +++ b/server/rpc/rpc.go @@ -258,7 +258,7 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt } // sanitize agent input - if err := 
checkPipelineState(currentPipeline); err != nil { + if err := checkPipelineState(currentPipeline, workflow); err != nil { return err } if err := checkWorkflowStepStates(workflow, nil); err != nil { @@ -331,7 +331,7 @@ func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowSt } // sanitize agent input - if err := checkPipelineState(currentPipeline); err != nil { + if err := checkPipelineState(currentPipeline, workflow); err != nil { return err } if err := checkWorkflowStepStates(workflow, nil); err != nil { diff --git a/server/rpc/sanitize.go b/server/rpc/sanitize.go index 67872d90b95..33e9a3a3d8f 100644 --- a/server/rpc/sanitize.go +++ b/server/rpc/sanitize.go @@ -68,7 +68,7 @@ func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Age // checkPipelineState checks if an agent is allowed to change/update a workflow/pipeline state // by the state the parent pipeline is in. -func checkPipelineState(currPipeline *model.Pipeline) (err error) { +func checkPipelineState(currPipeline *model.Pipeline, currWorkflow *model.Workflow) (err error) { // check if pipeline was already run and marked finished or is blocked switch currPipeline.Status { case model.StatusCreated, @@ -79,6 +79,15 @@ func checkPipelineState(currPipeline *model.Pipeline) (err error) { case model.StatusBlocked: err = ErrAgentIllegalPipelineWorkflowRun + case model.StatusCanceled, + model.StatusFailure, + model.StatusKilled: + if currWorkflow.State != model.StatusCanceled && + currWorkflow.State != model.StatusKilled && + currWorkflow.State != model.StatusSkipped { + err = ErrAgentIllegalPipelineWorkflowReRunStateChange + } + default: err = ErrAgentIllegalPipelineWorkflowReRunStateChange } From 87e9c26e8cec97cbcafa0bd0d6922fba55565079 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 6 Apr 2026 21:00:13 +0200 Subject: [PATCH 03/12] fix tests --- server/rpc/rpc_integration_test.go | 12 +- server/rpc/sanitize_test.go | 177 
+++++++++++++++++++++++------ 2 files changed, 148 insertions(+), 41 deletions(-) diff --git a/server/rpc/rpc_integration_test.go b/server/rpc/rpc_integration_test.go index f05204eca66..d8c0efca359 100644 --- a/server/rpc/rpc_integration_test.go +++ b/server/rpc/rpc_integration_test.go @@ -148,7 +148,7 @@ func TestRPCUpdate(t *testing.T) { mockStore := store_mocks.NewMockStore(t) agent := defaultAgent() pipeline := defaultPipeline(model.StatusSuccess) - workflow := defaultWorkflow(model.StatusRunning) + workflow := defaultWorkflow(model.StatusSuccess) step := defaultStep(model.StatusRunning) mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) @@ -161,14 +161,14 @@ func TestRPCUpdate(t *testing.T) { ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"}) - assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowReRunStateChange) + assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) }) t.Run("reject pipeline already failed", func(t *testing.T) { mockStore := store_mocks.NewMockStore(t) agent := defaultAgent() pipeline := defaultPipeline(model.StatusFailure) - workflow := defaultWorkflow(model.StatusRunning) + workflow := defaultWorkflow(model.StatusFailure) step := defaultStep(model.StatusRunning) mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) @@ -181,14 +181,14 @@ func TestRPCUpdate(t *testing.T) { ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"}) - assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowReRunStateChange) + assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) }) t.Run("reject pipeline blocked", func(t *testing.T) { mockStore := store_mocks.NewMockStore(t) agent := defaultAgent() pipeline := defaultPipeline(model.StatusBlocked) - workflow := defaultWorkflow(model.StatusRunning) + workflow := 
defaultWorkflow(model.StatusBlocked) step := defaultStep(model.StatusRunning) mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) @@ -201,7 +201,7 @@ func TestRPCUpdate(t *testing.T) { ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"}) - assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowRun) + assert.ErrorIs(t, err, ErrAgentIllegalWorkflowRun) }) t.Run("reject workflow already finished", func(t *testing.T) { diff --git a/server/rpc/sanitize_test.go b/server/rpc/sanitize_test.go index 6791bbf25d0..74686c74d5f 100644 --- a/server/rpc/sanitize_test.go +++ b/server/rpc/sanitize_test.go @@ -28,60 +28,143 @@ func TestCheckPipelineState(t *testing.T) { t.Parallel() tests := []struct { - name string - status model.StatusValue - wantErr error - expectNil bool + name string + status model.StatusValue + workflowState model.StatusValue + wantErr error + expectNil bool }{ { - name: "created is allowed", - status: model.StatusCreated, - expectNil: true, + name: "created is allowed", + status: model.StatusCreated, + workflowState: model.StatusRunning, + expectNil: true, }, { - name: "pending is allowed", - status: model.StatusPending, - expectNil: true, + name: "pending is allowed", + status: model.StatusPending, + workflowState: model.StatusRunning, + expectNil: true, }, { - name: "running is allowed", - status: model.StatusRunning, - expectNil: true, + name: "running is allowed", + status: model.StatusRunning, + workflowState: model.StatusRunning, + expectNil: true, }, { - name: "blocked is rejected", - status: model.StatusBlocked, - wantErr: ErrAgentIllegalPipelineWorkflowRun, + name: "blocked is rejected", + status: model.StatusBlocked, + workflowState: model.StatusRunning, + wantErr: ErrAgentIllegalPipelineWorkflowRun, }, { - name: "success is rejected as re-run", - status: model.StatusSuccess, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, + 
name: "success is rejected as re-run", + status: model.StatusSuccess, + workflowState: model.StatusRunning, + wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, }, { - name: "failure is rejected as re-run", - status: model.StatusFailure, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, + name: "failure is rejected as re-run", + status: model.StatusFailure, + workflowState: model.StatusRunning, + wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, }, { - name: "killed is rejected as re-run", - status: model.StatusKilled, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, + name: "killed is rejected as re-run", + status: model.StatusKilled, + workflowState: model.StatusRunning, + wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, }, { - name: "error is rejected as re-run", - status: model.StatusError, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, + name: "error is rejected as re-run", + status: model.StatusError, + workflowState: model.StatusRunning, + wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, }, { - name: "skipped is rejected as re-run", - status: model.StatusSkipped, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, + name: "skipped is rejected as re-run", + status: model.StatusSkipped, + workflowState: model.StatusRunning, + wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, }, { - name: "declined is rejected as re-run", - status: model.StatusDeclined, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, + name: "declined is rejected as re-run", + status: model.StatusDeclined, + workflowState: model.StatusRunning, + wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, + }, + { + name: "pipeline canceled, workflow canceled → allowed", + status: model.StatusCanceled, + workflowState: model.StatusCanceled, + expectNil: true, + }, + { + name: "pipeline canceled, workflow killed → allowed", + status: model.StatusCanceled, + workflowState: model.StatusKilled, + expectNil: 
true, + }, + { + name: "pipeline canceled, workflow skipped → allowed", + status: model.StatusCanceled, + workflowState: model.StatusSkipped, + expectNil: true, + }, + { + name: "pipeline canceled, workflow running → rejected", + status: model.StatusCanceled, + workflowState: model.StatusRunning, + wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, + }, + { + name: "pipeline canceled, workflow success → rejected", + status: model.StatusCanceled, + workflowState: model.StatusSuccess, + wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, + }, + { + name: "pipeline failure, workflow canceled → allowed", + status: model.StatusFailure, + workflowState: model.StatusCanceled, + expectNil: true, + }, + { + name: "pipeline failure, workflow killed → allowed", + status: model.StatusFailure, + workflowState: model.StatusKilled, + expectNil: true, + }, + { + name: "pipeline failure, workflow skipped → allowed", + status: model.StatusFailure, + workflowState: model.StatusSkipped, + expectNil: true, + }, + { + name: "pipeline killed, workflow canceled → allowed", + status: model.StatusKilled, + workflowState: model.StatusCanceled, + expectNil: true, + }, + { + name: "pipeline killed, workflow killed → allowed", + status: model.StatusKilled, + workflowState: model.StatusKilled, + expectNil: true, + }, + { + name: "pipeline killed, workflow skipped → allowed", + status: model.StatusKilled, + workflowState: model.StatusSkipped, + expectNil: true, + }, + { + name: "pipeline killed, workflow running → rejected", + status: model.StatusKilled, + workflowState: model.StatusRunning, + wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, }, } @@ -90,7 +173,8 @@ func TestCheckPipelineState(t *testing.T) { t.Parallel() pipeline := &model.Pipeline{Status: tt.status} - err := checkPipelineState(pipeline) + workflow := &model.Workflow{State: tt.workflowState} + err := checkPipelineState(pipeline, workflow) if tt.expectNil { assert.NoError(t, err) @@ -246,6 +330,29 @@ func 
TestCheckWorkflowStepStates(t *testing.T) { assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) assert.ErrorIs(t, err, ErrAgentIllegalStepReRunStateChange) }) + + t.Run("workflow canceled, step success → rejected", func(t *testing.T) { + t.Parallel() + + workflow := &model.Workflow{State: model.StatusCanceled} + step := &model.Step{State: model.StatusSuccess} + err := checkWorkflowStepStates(workflow, step) + assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) + }) + + t.Run("workflow canceled, nil step → rejects (workflow default branch)", func(t *testing.T) { + t.Parallel() + + // checkWorkflowStepStates is never called with (canceled-workflow, nil-step) + // in production (Update always has a step, Init/Done pass nil only when + // workflow is checked alone). When step is nil the canceled branch would + // panic, so production code always supplies a non-nil step for Update. + // Test the observable behavior: a non-exempt step state triggers rejection. + workflow := &model.Workflow{State: model.StatusCanceled} + step := &model.Step{State: model.StatusRunning} // non-exempt → rejected + err := checkWorkflowStepStates(workflow, step) + assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) + }) } // AllowAppendingLogs — updated for the new (pipeline, step) signature From 0d29e13f507ab5b65e5edc64a6544d9f5bf9d208 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Wed, 8 Apr 2026 13:01:47 +0200 Subject: [PATCH 04/12] almost finished --- server/rpc/errors.go | 5 +- server/rpc/rpc.go | 15 +- server/rpc/rpc_integration_test.go | 47 +-- server/rpc/sanitize.go | 85 ++-- server/rpc/sanitize_test.go | 635 ++++++++--------------------- 5 files changed, 230 insertions(+), 557 deletions(-) diff --git a/server/rpc/errors.go b/server/rpc/errors.go index 2065904a778..f8f0369105f 100644 --- a/server/rpc/errors.go +++ b/server/rpc/errors.go @@ -23,8 +23,7 @@ var ( ErrAgentIllegalWorkflowReRunStateChange = errors.New("workflow was already 
marked as finished") ErrAgentIllegalWorkflowRun = errors.New("workflow is currently in blocked state") - ErrAgentIllegalStepReRunStateChange = errors.New("step was already marked as finished") - ErrAgentIllegalStepRun = errors.New("step is currently in blocked state") - ErrAgentIllegalLogStreaming = errors.New("agent can not append logs to a step that is marked not running") + + ErrAgentIllegalRepo = errors.New("agent is not allowed to interact with repo") ) diff --git a/server/rpc/rpc.go b/server/rpc/rpc.go index 501d306b611..11ada50b4a1 100644 --- a/server/rpc/rpc.go +++ b/server/rpc/rpc.go @@ -200,7 +200,10 @@ func (s *RPC) Update(c context.Context, strWorkflowID string, state rpc.StepStat } // sanitize agent input - if err := checkWorkflowStepStates(workflow, step); err != nil { + if err := checkParentState(currentPipeline.Status, step.State, true); err != nil { + return err + } + if err := checkParentState(workflow.State, step.State, true); err != nil { return err } @@ -258,10 +261,7 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt } // sanitize agent input - if err := checkPipelineState(currentPipeline, workflow); err != nil { - return err - } - if err := checkWorkflowStepStates(workflow, nil); err != nil { + if err := checkParentState(currentPipeline.Status, workflow.State, false); err != nil { return err } @@ -331,10 +331,7 @@ func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowSt } // sanitize agent input - if err := checkPipelineState(currentPipeline, workflow); err != nil { - return err - } - if err := checkWorkflowStepStates(workflow, nil); err != nil { + if err := checkParentState(currentPipeline.Status, workflow.State, false); err != nil { return err } diff --git a/server/rpc/rpc_integration_test.go b/server/rpc/rpc_integration_test.go index d8c0efca359..349dcb641ba 100644 --- a/server/rpc/rpc_integration_test.go +++ b/server/rpc/rpc_integration_test.go @@ -224,26 +224,6 @@ func 
TestRPCUpdate(t *testing.T) { assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) }) - t.Run("reject step already finished", func(t *testing.T) { - mockStore := store_mocks.NewMockStore(t) - agent := defaultAgent() - pipeline := defaultPipeline(model.StatusRunning) - workflow := defaultWorkflow(model.StatusRunning) - step := defaultStep(model.StatusSuccess) // finished - - mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) - mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) - mockStore.On("AgentFind", int64(1)).Return(agent, nil) - mockStore.On("StepByUUID", "step-uuid-123").Return(step, nil) - mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - - rpcInst := newTestRPC(t, mockStore) - ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) - - err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"}) - assert.ErrorIs(t, err, ErrAgentIllegalStepReRunStateChange) - }) - t.Run("reject step belongs to different pipeline", func(t *testing.T) { mockStore := store_mocks.NewMockStore(t) agent := defaultAgent() @@ -446,6 +426,15 @@ func TestRPCInit(t *testing.T) { mockStore.On("AgentFind", int64(1)).Return(agent, nil) mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) + // Init calls updateForgeStatus (→ GetUser) before UpdateWorkflowStatusToRunning + // checks the workflow state. UpdateWorkflowStatusToRunning itself calls + // WorkflowUpdate, and the deferred notify calls WorkflowGetTree — all of + // these execute even though the function ultimately returns an error. 
+ mockStore.On("GetUser", mock.Anything).Return(nil, errors.New("user not found")) + mockStore.On("WorkflowUpdate", mock.Anything).Return(nil) + mockStore.On("WorkflowGetTree", mock.Anything).Return([]*model.Workflow{workflow}, nil) + // updateAgentLastWork -> AgentUpdate (agent.LastWork is 0, so it always updates) + mockStore.On("AgentUpdate", mock.Anything).Return(nil) rpcInst := newTestRPC(t, mockStore) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) @@ -464,6 +453,13 @@ func TestRPCInit(t *testing.T) { mockStore.On("AgentFind", int64(1)).Return(agent, nil) mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) + // Same as above: updateForgeStatus, WorkflowUpdate, and the deferred + // WorkflowGetTree all run before the error surfaces to the caller. + mockStore.On("GetUser", mock.Anything).Return(nil, errors.New("user not found")) + mockStore.On("WorkflowUpdate", mock.Anything).Return(nil) + mockStore.On("WorkflowGetTree", mock.Anything).Return([]*model.Workflow{workflow}, nil) + // updateAgentLastWork -> AgentUpdate (agent.LastWork is 0, so it always updates) + mockStore.On("AgentUpdate", mock.Anything).Return(nil) rpcInst := newTestRPC(t, mockStore) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) @@ -604,6 +600,11 @@ func TestRPCDone(t *testing.T) { mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) mockStore.On("AgentFind", int64(1)).Return(agent, nil) + // checkParentState passes (pipeline=running), so UpdateWorkflowStatusToDone + // runs (→ WorkflowUpdate) and Done then calls WorkflowGetTree before the + // error from UpdateWorkflowStatusToDone propagates to the caller. 
+ mockStore.On("WorkflowUpdate", mock.Anything).Return(nil) + mockStore.On("WorkflowGetTree", mock.Anything).Return([]*model.Workflow{workflow}, nil) rpcInst := newTestRPC(t, mockStore) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) @@ -769,10 +770,6 @@ func TestRPCLog(t *testing.T) { }) t.Run("reject: pipeline finished stale and step not running", func(t *testing.T) { - // This replaces the old "reject pipeline already finished" test. - // Previously the rejection came from checkPipelineState returning - // ErrAgentIllegalPipelineWorkflowReRunStateChange. - // Now it comes from allowAppendingLogs returning ErrAgentIllegalLogStreaming. mockStore := store_mocks.NewMockStore(t) agent := defaultAgent() pipeline := stalePipeline(model.StatusSuccess) @@ -792,8 +789,6 @@ func TestRPCLog(t *testing.T) { require.Error(t, err) assert.Contains(t, err.Error(), "can not alter logs") assert.ErrorIs(t, err, ErrAgentIllegalLogStreaming) - // The old error is no longer returned from Log() — allowAppendingLogs - // now handles the pipeline-finished case itself. 
assert.False(t, errors.Is(err, ErrAgentIllegalPipelineWorkflowReRunStateChange)) }) diff --git a/server/rpc/sanitize.go b/server/rpc/sanitize.go index 33e9a3a3d8f..01f8b302072 100644 --- a/server/rpc/sanitize.go +++ b/server/rpc/sanitize.go @@ -16,7 +16,6 @@ package rpc import ( "context" - "errors" "fmt" "strconv" "time" @@ -61,80 +60,52 @@ func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Age return nil } - msg := fmt.Sprintf("agent '%d' is not allowed to interact with repo[%d] '%s'", agent.ID, repo.ID, repo.FullName) - log.Error().Int64("repoId", repo.ID).Msg(msg) - return errors.New(msg) + log.Error().Err(ErrAgentIllegalRepo).Int64("agentID", agent.ID).Int64("repoId", repo.ID).Send() + return fmt.Errorf("%w: agentId=%d repoID=%d", ErrAgentIllegalRepo, agent.ID, repo.ID) } -// checkPipelineState checks if an agent is allowed to change/update a workflow/pipeline state -// by the state the parent pipeline is in. -func checkPipelineState(currPipeline *model.Pipeline, currWorkflow *model.Workflow) (err error) { +// checkParentState checks if an agent is allowed to change/update a workflow/step state +// by the state the parent pipeline/workflow is in. 
+func checkParentState(parentState, childState model.StatusValue, isStep bool) (err error) { // check if pipeline was already run and marked finished or is blocked - switch currPipeline.Status { + switch parentState { case model.StatusCreated, model.StatusPending, model.StatusRunning: - break + return nil case model.StatusBlocked: - err = ErrAgentIllegalPipelineWorkflowRun + if isStep { + err = ErrAgentIllegalWorkflowRun + } else { + err = ErrAgentIllegalPipelineWorkflowRun + } case model.StatusCanceled, model.StatusFailure, model.StatusKilled: - if currWorkflow.State != model.StatusCanceled && - currWorkflow.State != model.StatusKilled && - currWorkflow.State != model.StatusSkipped { - err = ErrAgentIllegalPipelineWorkflowReRunStateChange - } - - default: - err = ErrAgentIllegalPipelineWorkflowReRunStateChange - } - - if err != nil { - log.Error().Err(err).Msg("caught agent performing illegal instruction") - } - return err -} -// checkWorkflowStepStates checks if a workflow/step state or its logs can be altered -// depending on what state the workflow and step currently is in. 
-func checkWorkflowStepStates(currWorkflow *model.Workflow, currStep *model.Step) (err error) { - if currWorkflow != nil { - switch currWorkflow.State { - case model.StatusCreated, - model.StatusPending, - model.StatusRunning: - break - - case model.StatusBlocked: - err = ErrAgentIllegalWorkflowRun + switch childState { + case model.StatusCanceled, + model.StatusKilled, + model.StatusSkipped, + model.StatusFailure, + model.StatusSuccess: + return nil - case model.StatusCanceled: - if currStep.State != model.StatusCanceled && - currStep.State != model.StatusKilled && - currStep.State != model.StatusSkipped { + default: + if isStep { err = ErrAgentIllegalWorkflowReRunStateChange + } else { + err = ErrAgentIllegalPipelineWorkflowReRunStateChange } - - default: - err = ErrAgentIllegalWorkflowReRunStateChange } - } - if currStep != nil { - switch currStep.State { - case model.StatusCreated, - model.StatusPending, - model.StatusRunning: - break - - case model.StatusBlocked: - err = errors.Join(err, ErrAgentIllegalStepRun) - - default: - err = errors.Join(err, ErrAgentIllegalStepReRunStateChange) + default: + if isStep { + err = ErrAgentIllegalWorkflowReRunStateChange + } else { + err = ErrAgentIllegalPipelineWorkflowReRunStateChange } } diff --git a/server/rpc/sanitize_test.go b/server/rpc/sanitize_test.go index 74686c74d5f..cb86fab2cb1 100644 --- a/server/rpc/sanitize_test.go +++ b/server/rpc/sanitize_test.go @@ -15,7 +15,7 @@ package rpc import ( - "errors" + "fmt" "testing" "time" @@ -24,517 +24,228 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/server/model" ) -func TestCheckPipelineState(t *testing.T) { +// TestCheckParentState covers checkParentState for both pipeline-level +// (isStep=false) and workflow-level (isStep=true) checks in a single +// table-driven test, eliminating duplicate cases shared by the two levels. 
+func TestCheckParentState(t *testing.T) { t.Parallel() - tests := []struct { - name string - status model.StatusValue - workflowState model.StatusValue - wantErr error - expectNil bool - }{ - { - name: "created is allowed", - status: model.StatusCreated, - workflowState: model.StatusRunning, - expectNil: true, - }, - { - name: "pending is allowed", - status: model.StatusPending, - workflowState: model.StatusRunning, - expectNil: true, - }, - { - name: "running is allowed", - status: model.StatusRunning, - workflowState: model.StatusRunning, - expectNil: true, - }, - { - name: "blocked is rejected", - status: model.StatusBlocked, - workflowState: model.StatusRunning, - wantErr: ErrAgentIllegalPipelineWorkflowRun, - }, - { - name: "success is rejected as re-run", - status: model.StatusSuccess, - workflowState: model.StatusRunning, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, - }, - { - name: "failure is rejected as re-run", - status: model.StatusFailure, - workflowState: model.StatusRunning, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, - }, - { - name: "killed is rejected as re-run", - status: model.StatusKilled, - workflowState: model.StatusRunning, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, - }, - { - name: "error is rejected as re-run", - status: model.StatusError, - workflowState: model.StatusRunning, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, - }, - { - name: "skipped is rejected as re-run", - status: model.StatusSkipped, - workflowState: model.StatusRunning, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, - }, - { - name: "declined is rejected as re-run", - status: model.StatusDeclined, - workflowState: model.StatusRunning, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, - }, - { - name: "pipeline canceled, workflow canceled → allowed", - status: model.StatusCanceled, - workflowState: model.StatusCanceled, - expectNil: true, - }, - { - name: "pipeline canceled, workflow 
killed → allowed", - status: model.StatusCanceled, - workflowState: model.StatusKilled, - expectNil: true, - }, - { - name: "pipeline canceled, workflow skipped → allowed", - status: model.StatusCanceled, - workflowState: model.StatusSkipped, - expectNil: true, - }, - { - name: "pipeline canceled, workflow running → rejected", - status: model.StatusCanceled, - workflowState: model.StatusRunning, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, - }, - { - name: "pipeline canceled, workflow success → rejected", - status: model.StatusCanceled, - workflowState: model.StatusSuccess, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, - }, - { - name: "pipeline failure, workflow canceled → allowed", - status: model.StatusFailure, - workflowState: model.StatusCanceled, - expectNil: true, - }, - { - name: "pipeline failure, workflow killed → allowed", - status: model.StatusFailure, - workflowState: model.StatusKilled, - expectNil: true, - }, - { - name: "pipeline failure, workflow skipped → allowed", - status: model.StatusFailure, - workflowState: model.StatusSkipped, - expectNil: true, - }, - { - name: "pipeline killed, workflow canceled → allowed", - status: model.StatusKilled, - workflowState: model.StatusCanceled, - expectNil: true, - }, - { - name: "pipeline killed, workflow killed → allowed", - status: model.StatusKilled, - workflowState: model.StatusKilled, - expectNil: true, - }, + // States that always allow a child to proceed, regardless of level. + allowedParents := []model.StatusValue{ + model.StatusCreated, + model.StatusPending, + model.StatusRunning, + } + + // Terminal parent states that reject a running child as an illegal re-run. + terminalParents := []model.StatusValue{ + model.StatusSuccess, + model.StatusFailure, + model.StatusKilled, + model.StatusError, + model.StatusSkipped, + } + + // Parents whose terminal children are exempt (allowed through). 
+ exemptParents := []model.StatusValue{ + model.StatusCanceled, + model.StatusFailure, + model.StatusKilled, + } + + // Child states considered exempt under a terminal/canceled parent. + exemptChildren := []model.StatusValue{ + model.StatusCanceled, + model.StatusKilled, + model.StatusSkipped, + } + + // Error sentinels per level. + type levelConfig struct { + isStep bool + blockedErr error + reRunErr error + extraExempt []model.StatusValue // additional exempt child states beyond the shared set + extraRejects []model.StatusValue // additional terminal parents beyond the shared set + } + + levels := []levelConfig{ { - name: "pipeline killed, workflow skipped → allowed", - status: model.StatusKilled, - workflowState: model.StatusSkipped, - expectNil: true, + isStep: false, + blockedErr: ErrAgentIllegalPipelineWorkflowRun, + reRunErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, + extraExempt: []model.StatusValue{model.StatusFailure, model.StatusSuccess}, + extraRejects: []model.StatusValue{model.StatusDeclined}, }, { - name: "pipeline killed, workflow running → rejected", - status: model.StatusKilled, - workflowState: model.StatusRunning, - wantErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, + isStep: true, + blockedErr: ErrAgentIllegalWorkflowRun, + reRunErr: ErrAgentIllegalWorkflowReRunStateChange, + extraExempt: []model.StatusValue{model.StatusFailure, model.StatusSuccess}, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + for _, lc := range levels { + label := "pipeline" + if lc.isStep { + label = "step" + } + + t.Run(label, func(t *testing.T) { t.Parallel() - pipeline := &model.Pipeline{Status: tt.status} - workflow := &model.Workflow{State: tt.workflowState} - err := checkPipelineState(pipeline, workflow) + // Allowed parent states. 
+ for _, ps := range allowedParents { + t.Run(fmt.Sprintf("%s allows", ps), func(t *testing.T) { + t.Parallel() + assert.NoError(t, checkParentState(ps, model.StatusRunning, lc.isStep)) + }) + } + + // Blocked parent. + t.Run("blocked rejects", func(t *testing.T) { + t.Parallel() + assert.ErrorIs(t, checkParentState(model.StatusBlocked, model.StatusRunning, lc.isStep), lc.blockedErr) + }) - if tt.expectNil { - assert.NoError(t, err) - } else { - assert.ErrorIs(t, err, tt.wantErr) + // Terminal parents reject a running child. + allTerminal := append(terminalParents, lc.extraRejects...) + for _, ps := range allTerminal { + t.Run(fmt.Sprintf("%s running child rejected", ps), func(t *testing.T) { + t.Parallel() + assert.ErrorIs(t, checkParentState(ps, model.StatusRunning, lc.isStep), lc.reRunErr) + }) + } + + // Canceled parent with running child is also rejected. + t.Run("canceled running child rejected", func(t *testing.T) { + t.Parallel() + assert.ErrorIs(t, checkParentState(model.StatusCanceled, model.StatusRunning, lc.isStep), lc.reRunErr) + }) + + // Exempt parent + exempt child combinations → allowed. + allExemptChildren := append(exemptChildren, lc.extraExempt...) + for _, ps := range exemptParents { + for _, cs := range allExemptChildren { + t.Run(fmt.Sprintf("%s parent %s child allowed", ps, cs), func(t *testing.T) { + t.Parallel() + assert.NoError(t, checkParentState(ps, cs, lc.isStep)) + }) + } } }) } } -func TestCheckWorkflowStepStates(t *testing.T) { +func TestAllowAppendingLogs(t *testing.T) { t.Parallel() - t.Run("workflow only", func(t *testing.T) { + recentFinish := time.Now().Add(-30 * time.Second).Unix() + staleFinish := time.Now().Add(-10 * time.Minute).Unix() + + // Step running always allows logs, regardless of pipeline state or age. 
+ t.Run("step running always allowed", func(t *testing.T) { t.Parallel() - tests := []struct { - name string - state model.StatusValue - wantErr error + for _, tc := range []struct { + name string + status model.StatusValue + finish int64 }{ - {"created allows", model.StatusCreated, nil}, - {"pending allows", model.StatusPending, nil}, - {"running allows", model.StatusRunning, nil}, - {"blocked rejects", model.StatusBlocked, ErrAgentIllegalWorkflowRun}, - {"success rejects", model.StatusSuccess, ErrAgentIllegalWorkflowReRunStateChange}, - {"failure rejects", model.StatusFailure, ErrAgentIllegalWorkflowReRunStateChange}, - {"killed rejects", model.StatusKilled, ErrAgentIllegalWorkflowReRunStateChange}, - {"error rejects", model.StatusError, ErrAgentIllegalWorkflowReRunStateChange}, - {"skipped rejects", model.StatusSkipped, ErrAgentIllegalWorkflowReRunStateChange}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + {"pipeline running", model.StatusRunning, 0}, + {"pipeline success stale", model.StatusSuccess, staleFinish}, + {"pipeline failure stale", model.StatusFailure, staleFinish}, + {"pipeline killed stale", model.StatusKilled, staleFinish}, + } { + t.Run(tc.name, func(t *testing.T) { t.Parallel() - - workflow := &model.Workflow{State: tt.state} - err := checkWorkflowStepStates(workflow, nil) - - if tt.wantErr == nil { - assert.NoError(t, err) - } else { - assert.ErrorIs(t, err, tt.wantErr) - } + p := &model.Pipeline{Status: tc.status, Finished: tc.finish} + assert.NoError(t, allowAppendingLogs(p, &model.Step{State: model.StatusRunning})) }) } }) - t.Run("step only (nil workflow)", func(t *testing.T) { + // Pipeline running allows logs for any step state. 
+ t.Run("pipeline running any step allowed", func(t *testing.T) { t.Parallel() - tests := []struct { - name string - state model.StatusValue - wantErr error - }{ - {"created allows", model.StatusCreated, nil}, - {"pending allows", model.StatusPending, nil}, - {"running allows", model.StatusRunning, nil}, - {"blocked rejects", model.StatusBlocked, ErrAgentIllegalStepRun}, - {"success rejects", model.StatusSuccess, ErrAgentIllegalStepReRunStateChange}, - {"failure rejects", model.StatusFailure, ErrAgentIllegalStepReRunStateChange}, - {"killed rejects", model.StatusKilled, ErrAgentIllegalStepReRunStateChange}, - {"error rejects", model.StatusError, ErrAgentIllegalStepReRunStateChange}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + for _, ss := range []model.StatusValue{ + model.StatusSuccess, model.StatusFailure, model.StatusPending, model.StatusKilled, + } { + t.Run(string(ss), func(t *testing.T) { t.Parallel() - - step := &model.Step{State: tt.state} - err := checkWorkflowStepStates(nil, step) - - if tt.wantErr == nil { - assert.NoError(t, err) - } else { - assert.ErrorIs(t, err, tt.wantErr) - } + p := &model.Pipeline{Status: model.StatusRunning} + assert.NoError(t, allowAppendingLogs(p, &model.Step{State: ss})) }) } }) - t.Run("nil workflow and nil step", func(t *testing.T) { - t.Parallel() - - assert.NoError(t, checkWorkflowStepStates(nil, nil)) - }) - - t.Run("workflow running, step running", func(t *testing.T) { - t.Parallel() - - workflow := &model.Workflow{State: model.StatusRunning} - step := &model.Step{State: model.StatusRunning} - assert.NoError(t, checkWorkflowStepStates(workflow, step)) - }) - - t.Run("workflow running, step finished", func(t *testing.T) { - t.Parallel() - - workflow := &model.Workflow{State: model.StatusRunning} - step := &model.Step{State: model.StatusSuccess} - err := checkWorkflowStepStates(workflow, step) - assert.ErrorIs(t, err, ErrAgentIllegalStepReRunStateChange) - // should not contain workflow error 
- assert.False(t, errors.Is(err, ErrAgentIllegalWorkflowReRunStateChange)) - }) - - t.Run("workflow running, step blocked", func(t *testing.T) { - t.Parallel() - - workflow := &model.Workflow{State: model.StatusRunning} - step := &model.Step{State: model.StatusBlocked} - err := checkWorkflowStepStates(workflow, step) - assert.ErrorIs(t, err, ErrAgentIllegalStepRun) - }) - - t.Run("both finished - joined errors", func(t *testing.T) { - t.Parallel() - - workflow := &model.Workflow{State: model.StatusSuccess} - step := &model.Step{State: model.StatusSuccess} - err := checkWorkflowStepStates(workflow, step) - assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) - assert.ErrorIs(t, err, ErrAgentIllegalStepReRunStateChange) - }) - - t.Run("both blocked - joined errors", func(t *testing.T) { - t.Parallel() - - workflow := &model.Workflow{State: model.StatusBlocked} - step := &model.Step{State: model.StatusBlocked} - err := checkWorkflowStepStates(workflow, step) - assert.ErrorIs(t, err, ErrAgentIllegalWorkflowRun) - assert.ErrorIs(t, err, ErrAgentIllegalStepRun) - }) - - t.Run("workflow finished, step blocked - joined errors", func(t *testing.T) { + // Recent finish → drain window allows logs. 
+ t.Run("recent finish drain allowed", func(t *testing.T) { t.Parallel() - workflow := &model.Workflow{State: model.StatusKilled} - step := &model.Step{State: model.StatusBlocked} - err := checkWorkflowStepStates(workflow, step) - assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) - assert.ErrorIs(t, err, ErrAgentIllegalStepRun) - }) - - t.Run("workflow finished (failure), step finished (failure) - joined errors", func(t *testing.T) { - t.Parallel() - - workflow := &model.Workflow{State: model.StatusFailure} - step := &model.Step{State: model.StatusFailure} - err := checkWorkflowStepStates(workflow, step) - assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) - assert.ErrorIs(t, err, ErrAgentIllegalStepReRunStateChange) - }) - - t.Run("workflow canceled, step success → rejected", func(t *testing.T) { - t.Parallel() - - workflow := &model.Workflow{State: model.StatusCanceled} - step := &model.Step{State: model.StatusSuccess} - err := checkWorkflowStepStates(workflow, step) - assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) + for _, tc := range []struct { + pStatus model.StatusValue + sState model.StatusValue + }{ + {model.StatusSuccess, model.StatusSuccess}, + {model.StatusFailure, model.StatusFailure}, + {model.StatusKilled, model.StatusPending}, + } { + t.Run(fmt.Sprintf("%s/%s", tc.pStatus, tc.sState), func(t *testing.T) { + t.Parallel() + p := &model.Pipeline{Status: tc.pStatus, Finished: recentFinish} + assert.NoError(t, allowAppendingLogs(p, &model.Step{State: tc.sState})) + }) + } }) - t.Run("workflow canceled, nil step → rejects (workflow default branch)", func(t *testing.T) { + // Stale finish → drain window expired → reject. + t.Run("stale finish drain rejected", func(t *testing.T) { t.Parallel() - // checkWorkflowStepStates is never called with (canceled-workflow, nil-step) - // in production (Update always has a step, Init/Done pass nil only when - // workflow is checked alone). 
When step is nil the canceled branch would - // panic, so production code always supplies a non-nil step for Update. - // Test the observable behavior: a non-exempt step state triggers rejection. - workflow := &model.Workflow{State: model.StatusCanceled} - step := &model.Step{State: model.StatusRunning} // non-exempt → rejected - err := checkWorkflowStepStates(workflow, step) - assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) + for _, tc := range []struct { + pStatus model.StatusValue + sState model.StatusValue + finish int64 + }{ + {model.StatusSuccess, model.StatusSuccess, staleFinish}, + {model.StatusFailure, model.StatusFailure, staleFinish}, + {model.StatusKilled, model.StatusPending, staleFinish}, + {model.StatusError, model.StatusCreated, staleFinish}, + {model.StatusSuccess, model.StatusSuccess, 0}, // zero = never recorded + } { + t.Run(fmt.Sprintf("%s/%s/fin=%d", tc.pStatus, tc.sState, tc.finish), func(t *testing.T) { + t.Parallel() + p := &model.Pipeline{Status: tc.pStatus, Finished: tc.finish} + assert.ErrorIs(t, allowAppendingLogs(p, &model.Step{State: tc.sState}), ErrAgentIllegalLogStreaming) + }) + } }) } -// AllowAppendingLogs — updated for the new (pipeline, step) signature -// -// New logic: -// Allow if step.State == Running (step is actively running) -// Allow if pipeline.Status == Running (pipeline still running, step may -// have just finished but pipeline hasn't caught up yet) -// Allow if pipeline.Finished is within the last logStreamDelayAllowed -// (drain window after a server restart / network blip) -// Reject otherwise. - -func TestAllowAppendingLogs(t *testing.T) { - t.Parallel() - - // recentFinish is a pipeline.Finished timestamp just 30 seconds ago — - // well within the 5-minute drain window. - recentFinish := time.Now().Add(-30 * time.Second).Unix() - - // staleFinish is a pipeline.Finished timestamp 10 minutes ago — - // outside the drain window. 
- staleFinish := time.Now().Add(-10 * time.Minute).Unix() - - tests := []struct { - name string - pipelineStatus model.StatusValue - pipelineFinish int64 - stepState model.StatusValue - wantErr error - }{ - // --- step is running: always allowed regardless of pipeline state ---- - { - name: "step running, pipeline running → allow", - pipelineStatus: model.StatusRunning, - stepState: model.StatusRunning, - }, - { - name: "step running, pipeline success → allow (step takes priority)", - pipelineStatus: model.StatusSuccess, - pipelineFinish: staleFinish, - stepState: model.StatusRunning, - }, - { - name: "step running, pipeline failure → allow", - pipelineStatus: model.StatusFailure, - pipelineFinish: staleFinish, - stepState: model.StatusRunning, - }, - { - name: "step running, pipeline killed → allow", - pipelineStatus: model.StatusKilled, - pipelineFinish: staleFinish, - stepState: model.StatusRunning, - }, - - // --- pipeline still running: allow even if step finished ------------ - { - name: "step success, pipeline still running → allow", - pipelineStatus: model.StatusRunning, - stepState: model.StatusSuccess, - }, - { - name: "step failure, pipeline still running → allow", - pipelineStatus: model.StatusRunning, - stepState: model.StatusFailure, - }, - { - name: "step pending, pipeline still running → allow", - pipelineStatus: model.StatusRunning, - stepState: model.StatusPending, - }, - { - name: "step killed, pipeline still running → allow", - pipelineStatus: model.StatusRunning, - stepState: model.StatusKilled, - }, - - // --- pipeline finished recently: drain window allows logs ----------- - { - name: "step success, pipeline finished recently → allow (drain window)", - pipelineStatus: model.StatusSuccess, - pipelineFinish: recentFinish, - stepState: model.StatusSuccess, - }, - { - name: "step failure, pipeline failed recently → allow (drain window)", - pipelineStatus: model.StatusFailure, - pipelineFinish: recentFinish, - stepState: model.StatusFailure, - }, 
- { - name: "step pending, pipeline killed recently → allow (drain window)", - pipelineStatus: model.StatusKilled, - pipelineFinish: recentFinish, - stepState: model.StatusPending, - }, - - // --- pipeline finished and drain window expired: reject ------------- - { - name: "step success, pipeline success, stale finish → reject", - pipelineStatus: model.StatusSuccess, - pipelineFinish: staleFinish, - stepState: model.StatusSuccess, - wantErr: ErrAgentIllegalLogStreaming, - }, - { - name: "step failure, pipeline failure, stale finish → reject", - pipelineStatus: model.StatusFailure, - pipelineFinish: staleFinish, - stepState: model.StatusFailure, - wantErr: ErrAgentIllegalLogStreaming, - }, - { - name: "step pending, pipeline killed, stale finish → reject", - pipelineStatus: model.StatusKilled, - pipelineFinish: staleFinish, - stepState: model.StatusPending, - wantErr: ErrAgentIllegalLogStreaming, - }, - { - name: "step created, pipeline error, stale finish → reject", - pipelineStatus: model.StatusError, - pipelineFinish: staleFinish, - stepState: model.StatusCreated, - wantErr: ErrAgentIllegalLogStreaming, - }, - - // --- zero Finished timestamp (never recorded): outside drain window - - { - name: "step success, pipeline success, Finished=0 → reject", - pipelineStatus: model.StatusSuccess, - pipelineFinish: 0, - stepState: model.StatusSuccess, - wantErr: ErrAgentIllegalLogStreaming, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - pipeline := &model.Pipeline{ - Status: tt.pipelineStatus, - Finished: tt.pipelineFinish, - } - step := &model.Step{State: tt.stepState} - - err := allowAppendingLogs(pipeline, step) - - if tt.wantErr == nil { - assert.NoError(t, err) - } else { - assert.ErrorIs(t, err, tt.wantErr) - } - }) - } -} - -// TestAllowAppendingLogsDrainBoundary checks the exact boundary of the -// 5-minute drain window to guard against off-by-one errors. 
+// TestAllowAppendingLogsDrainBoundary guards the exact edge of the 5-minute +// drain window against off-by-one errors. func TestAllowAppendingLogsDrainBoundary(t *testing.T) { t.Parallel() step := &model.Step{State: model.StatusSuccess} - t.Run("finished exactly at drain window boundary is allowed", func(t *testing.T) { + t.Run("just inside drain window allowed", func(t *testing.T) { t.Parallel() - - // Finished just barely inside the window (1 second of headroom). - finishedAt := time.Now().Add(-(logStreamDelayAllowed - time.Second)).Unix() - pipeline := &model.Pipeline{Status: model.StatusSuccess, Finished: finishedAt} - - assert.NoError(t, allowAppendingLogs(pipeline, step)) + p := &model.Pipeline{ + Status: model.StatusSuccess, + Finished: time.Now().Add(-(logStreamDelayAllowed - time.Second)).Unix(), + } + assert.NoError(t, allowAppendingLogs(p, step)) }) - t.Run("finished just outside drain window is rejected", func(t *testing.T) { + t.Run("just outside drain window rejected", func(t *testing.T) { t.Parallel() - - // Finished 1 second past the allowed window. 
- finishedAt := time.Now().Add(-(logStreamDelayAllowed + time.Second)).Unix() - pipeline := &model.Pipeline{Status: model.StatusSuccess, Finished: finishedAt} - - assert.ErrorIs(t, allowAppendingLogs(pipeline, step), ErrAgentIllegalLogStreaming) + p := &model.Pipeline{ + Status: model.StatusSuccess, + Finished: time.Now().Add(-(logStreamDelayAllowed + time.Second)).Unix(), + } + assert.ErrorIs(t, allowAppendingLogs(p, step), ErrAgentIllegalLogStreaming) }) } From 4d72da4045722f5742a386beacb5cc1b07f84564 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Wed, 8 Apr 2026 13:04:10 +0200 Subject: [PATCH 05/12] finish --- server/rpc/rpc.go | 10 ++++++++ server/rpc/rpc_integration_test.go | 21 --------------- server/rpc/sanitize.go | 24 +++++++++++++++++ server/rpc/sanitize_test.go | 41 ++++++++++++++++++++++++++++++ 4 files changed, 75 insertions(+), 21 deletions(-) diff --git a/server/rpc/rpc.go b/server/rpc/rpc.go index 11ada50b4a1..202457c2e43 100644 --- a/server/rpc/rpc.go +++ b/server/rpc/rpc.go @@ -265,6 +265,11 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt return err } + // check workflow's own state to prevent re-initializing a finished or blocked workflow + if err := checkWorkflowState(workflow.State); err != nil { + return err + } + if currentPipeline.Status == model.StatusPending { if currentPipeline, err = pipeline.UpdateToStatusRunning(s.store, *currentPipeline, state.Started); err != nil { log.Error().Err(err).Msgf("init: cannot update pipeline %d state", currentPipeline.ID) @@ -335,6 +340,11 @@ func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowSt return err } + // check workflow's own state to prevent finishing an already-finished or blocked workflow + if err := checkWorkflowState(workflow.State); err != nil { + return err + } + logger := log.With(). Str("repo_id", fmt.Sprint(repo.ID)). Str("pipeline_id", fmt.Sprint(currentPipeline.ID)). 
diff --git a/server/rpc/rpc_integration_test.go b/server/rpc/rpc_integration_test.go index 349dcb641ba..df56bee566d 100644 --- a/server/rpc/rpc_integration_test.go +++ b/server/rpc/rpc_integration_test.go @@ -426,15 +426,6 @@ func TestRPCInit(t *testing.T) { mockStore.On("AgentFind", int64(1)).Return(agent, nil) mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - // Init calls updateForgeStatus (→ GetUser) before UpdateWorkflowStatusToRunning - // checks the workflow state. UpdateWorkflowStatusToRunning itself calls - // WorkflowUpdate, and the deferred notify calls WorkflowGetTree — all of - // these execute even though the function ultimately returns an error. - mockStore.On("GetUser", mock.Anything).Return(nil, errors.New("user not found")) - mockStore.On("WorkflowUpdate", mock.Anything).Return(nil) - mockStore.On("WorkflowGetTree", mock.Anything).Return([]*model.Workflow{workflow}, nil) - // updateAgentLastWork -> AgentUpdate (agent.LastWork is 0, so it always updates) - mockStore.On("AgentUpdate", mock.Anything).Return(nil) rpcInst := newTestRPC(t, mockStore) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) @@ -453,13 +444,6 @@ func TestRPCInit(t *testing.T) { mockStore.On("AgentFind", int64(1)).Return(agent, nil) mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - // Same as above: updateForgeStatus, WorkflowUpdate, and the deferred - // WorkflowGetTree all run before the error surfaces to the caller. 
- mockStore.On("GetUser", mock.Anything).Return(nil, errors.New("user not found")) - mockStore.On("WorkflowUpdate", mock.Anything).Return(nil) - mockStore.On("WorkflowGetTree", mock.Anything).Return([]*model.Workflow{workflow}, nil) - // updateAgentLastWork -> AgentUpdate (agent.LastWork is 0, so it always updates) - mockStore.On("AgentUpdate", mock.Anything).Return(nil) rpcInst := newTestRPC(t, mockStore) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) @@ -600,11 +584,6 @@ func TestRPCDone(t *testing.T) { mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) mockStore.On("AgentFind", int64(1)).Return(agent, nil) - // checkParentState passes (pipeline=running), so UpdateWorkflowStatusToDone - // runs (→ WorkflowUpdate) and Done then calls WorkflowGetTree before the - // error from UpdateWorkflowStatusToDone propagates to the caller. - mockStore.On("WorkflowUpdate", mock.Anything).Return(nil) - mockStore.On("WorkflowGetTree", mock.Anything).Return([]*model.Workflow{workflow}, nil) rpcInst := newTestRPC(t, mockStore) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) diff --git a/server/rpc/sanitize.go b/server/rpc/sanitize.go index 01f8b302072..0abd25fdaa3 100644 --- a/server/rpc/sanitize.go +++ b/server/rpc/sanitize.go @@ -115,6 +115,30 @@ func checkParentState(parentState, childState model.StatusValue, isStep bool) (e return err } +// checkWorkflowState checks if a workflow's own state allows it to be +// initialized or marked as done. A workflow that is already in a terminal +// state (success, failure, killed, …) must not be re-run, and a blocked +// workflow must not be started by an agent. 
+func checkWorkflowState(state model.StatusValue) (err error) { + switch state { + case model.StatusCreated, + model.StatusPending, + model.StatusRunning: + return nil + + case model.StatusBlocked: + err = ErrAgentIllegalWorkflowRun + + default: + err = ErrAgentIllegalWorkflowReRunStateChange + } + + if err != nil { + log.Error().Err(err).Msg("caught agent performing illegal instruction") + } + return err +} + func allowAppendingLogs(currPipeline *model.Pipeline, currStep *model.Step) error { // As long as pipeline is running just let the agent send logs if currStep.State == model.StatusRunning || currPipeline.Status == model.StatusRunning { diff --git a/server/rpc/sanitize_test.go b/server/rpc/sanitize_test.go index cb86fab2cb1..f2d96b15beb 100644 --- a/server/rpc/sanitize_test.go +++ b/server/rpc/sanitize_test.go @@ -137,6 +137,47 @@ func TestCheckParentState(t *testing.T) { } } +func TestCheckWorkflowState(t *testing.T) { + t.Parallel() + + t.Run("allowed states", func(t *testing.T) { + t.Parallel() + for _, s := range []model.StatusValue{ + model.StatusCreated, + model.StatusPending, + model.StatusRunning, + } { + t.Run(string(s), func(t *testing.T) { + t.Parallel() + assert.NoError(t, checkWorkflowState(s)) + }) + } + }) + + t.Run("blocked rejects", func(t *testing.T) { + t.Parallel() + assert.ErrorIs(t, checkWorkflowState(model.StatusBlocked), ErrAgentIllegalWorkflowRun) + }) + + t.Run("terminal states reject", func(t *testing.T) { + t.Parallel() + for _, s := range []model.StatusValue{ + model.StatusSuccess, + model.StatusFailure, + model.StatusKilled, + model.StatusError, + model.StatusSkipped, + model.StatusCanceled, + model.StatusDeclined, + } { + t.Run(string(s), func(t *testing.T) { + t.Parallel() + assert.ErrorIs(t, checkWorkflowState(s), ErrAgentIllegalWorkflowReRunStateChange) + }) + } + }) +} + func TestAllowAppendingLogs(t *testing.T) { t.Parallel() From a8318cd295e110797243dc24a4093a3575c7fa24 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> 
Date: Wed, 8 Apr 2026 13:22:08 +0200 Subject: [PATCH 06/12] Split step state calc from update --- server/pipeline/step_status.go | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/server/pipeline/step_status.go b/server/pipeline/step_status.go index 7adbee00b5c..c0946edc2b5 100644 --- a/server/pipeline/step_status.go +++ b/server/pipeline/step_status.go @@ -28,8 +28,7 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/server/store" ) -// UpdateStepStatus updates step status based on agent reports via RPC. -func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, state rpc.StepState) error { +func CalcStepStatus(step *model.Step, state rpc.StepState) (_ *model.Step, cancelPipelineFromStep bool, _ error) { log.Debug().Str("StepUUID", step.UUID).Msgf("Update step %#v state %#v", *step, state) switch step.State { @@ -41,7 +40,7 @@ func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, if state.Finished != 0 { step.Finished = state.Finished } - return store.StepUpdate(step) + return step, false, nil } // Transition from pending to running when started @@ -68,10 +67,7 @@ func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, step.State = model.StatusFailure if step.Failure == model.FailureCancel { - err := cancelPipelineFromStep(ctx, store, step) - if err != nil { - return err - } + cancelPipelineFromStep = true } } } @@ -92,16 +88,13 @@ func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, step.State = model.StatusFailure if step.Failure == model.FailureCancel { - err := cancelPipelineFromStep(ctx, store, step) - if err != nil { - return err - } + cancelPipelineFromStep = true } } } default: - return fmt.Errorf("step has state %s and does not expect rpc state updates", step.State) + return nil, false, fmt.Errorf("step has state %s and does not expect rpc state updates", step.State) } // Handle cancellation across both cases 
@@ -112,6 +105,22 @@ func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, } } + return step, cancelPipelineFromStep, nil +} + +// UpdateStepStatus updates step status based on agent reports via RPC. +func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, state rpc.StepState) error { + log.Debug().Str("StepUUID", step.UUID).Msgf("Update step %#v state %#v", *step, state) + + step, shouldCancelPipelineFromStep, err := CalcStepStatus(step, state) + if err != nil { + return err + } + if shouldCancelPipelineFromStep { + if err := cancelPipelineFromStep(ctx, store, step); err != nil { + return err + } + } return store.StepUpdate(step) } From a30e5f1c18f5048421a09e0b82c86e91eac7cf4b Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Wed, 8 Apr 2026 13:32:40 +0200 Subject: [PATCH 07/12] aww proper check --- server/pipeline/step_status.go | 10 +- server/rpc/rpc.go | 19 +-- server/rpc/rpc_integration_test.go | 145 +++++-------------- server/rpc/sanitize.go | 91 ++++++------ server/rpc/sanitize_test.go | 219 ++++++++++++++++------------- 5 files changed, 209 insertions(+), 275 deletions(-) diff --git a/server/pipeline/step_status.go b/server/pipeline/step_status.go index c0946edc2b5..ce44525dc31 100644 --- a/server/pipeline/step_status.go +++ b/server/pipeline/step_status.go @@ -28,8 +28,8 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/server/store" ) -func CalcStepStatus(step *model.Step, state rpc.StepState) (_ *model.Step, cancelPipelineFromStep bool, _ error) { - log.Debug().Str("StepUUID", step.UUID).Msgf("Update step %#v state %#v", *step, state) +func CalcStepStatus(step model.Step, state rpc.StepState) (_ *model.Step, cancelPipelineFromStep bool, _ error) { + log.Debug().Str("StepUUID", step.UUID).Msgf("Update step %#v state %#v", step, state) switch step.State { case model.StatusPending: @@ -40,7 +40,7 @@ func CalcStepStatus(step *model.Step, state rpc.StepState) (_ *model.Step, cance if state.Finished != 
0 { step.Finished = state.Finished } - return step, false, nil + return &step, false, nil } // Transition from pending to running when started @@ -105,14 +105,14 @@ func CalcStepStatus(step *model.Step, state rpc.StepState) (_ *model.Step, cance } } - return step, cancelPipelineFromStep, nil + return &step, cancelPipelineFromStep, nil } // UpdateStepStatus updates step status based on agent reports via RPC. func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, state rpc.StepState) error { log.Debug().Str("StepUUID", step.UUID).Msgf("Update step %#v state %#v", *step, state) - step, shouldCancelPipelineFromStep, err := CalcStepStatus(step, state) + step, shouldCancelPipelineFromStep, err := CalcStepStatus(*step, state) if err != nil { return err } diff --git a/server/rpc/rpc.go b/server/rpc/rpc.go index 202457c2e43..d3f09082cd1 100644 --- a/server/rpc/rpc.go +++ b/server/rpc/rpc.go @@ -199,11 +199,8 @@ func (s *RPC) Update(c context.Context, strWorkflowID string, state rpc.StepStat return err } - // sanitize agent input - if err := checkParentState(currentPipeline.Status, step.State, true); err != nil { - return err - } - if err := checkParentState(workflow.State, step.State, true); err != nil { + // sanitize agent input: only allow step updates that the workflow state permits + if err := checkWorkflowAllowsStepUpdate(workflow.State, step, state); err != nil { return err } @@ -260,11 +257,6 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt return err } - // sanitize agent input - if err := checkParentState(currentPipeline.Status, workflow.State, false); err != nil { - return err - } - // check workflow's own state to prevent re-initializing a finished or blocked workflow if err := checkWorkflowState(workflow.State); err != nil { return err @@ -295,7 +287,7 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt return s.updateAgentLastWork(agent) } -// Done marks the workflow 
with the given ID as stope. +// Done marks the workflow with the given ID as stopped. func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowState) error { workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64) if err != nil { @@ -335,11 +327,6 @@ func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowSt return err } - // sanitize agent input - if err := checkParentState(currentPipeline.Status, workflow.State, false); err != nil { - return err - } - // check workflow's own state to prevent finishing an already-finished or blocked workflow if err := checkWorkflowState(workflow.State); err != nil { return err diff --git a/server/rpc/rpc_integration_test.go b/server/rpc/rpc_integration_test.go index df56bee566d..26ec74589f4 100644 --- a/server/rpc/rpc_integration_test.go +++ b/server/rpc/rpc_integration_test.go @@ -144,11 +144,18 @@ func TestRPCUpdate(t *testing.T) { assert.NoError(t, err) }) - t.Run("reject pipeline already succeeded", func(t *testing.T) { + t.Run("allow terminal step update when workflow already finished", func(t *testing.T) { + // When the workflow is already finished, a step update that moves the + // step to a terminal state (e.g. reporting exit code) should be allowed. 
mockStore := store_mocks.NewMockStore(t) + mockLogStore := log_mocks.NewMockService(t) + origLogStore := server.Config.Services.LogStore + server.Config.Services.LogStore = mockLogStore + t.Cleanup(func() { server.Config.Services.LogStore = origLogStore }) + agent := defaultAgent() - pipeline := defaultPipeline(model.StatusSuccess) - workflow := defaultWorkflow(model.StatusSuccess) + pipeline := defaultPipeline(model.StatusRunning) + workflow := defaultWorkflow(model.StatusSuccess) // finished step := defaultStep(model.StatusRunning) mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) @@ -156,19 +163,29 @@ func TestRPCUpdate(t *testing.T) { mockStore.On("AgentFind", int64(1)).Return(agent, nil) mockStore.On("StepByUUID", "step-uuid-123").Return(step, nil) mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) + mockStore.On("StepUpdate", mock.Anything).Return(nil) + mockStore.On("WorkflowGetTree", mock.Anything).Return([]*model.Workflow{workflow}, nil) + mockLogStore.On("StepFinished", mock.Anything).Return() rpcInst := newTestRPC(t, mockStore) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) - err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"}) - assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) + // Step reports exit → it will transition to success/failure (terminal) + err := rpcInst.Update(ctx, "30", rpc.StepState{ + StepUUID: "step-uuid-123", + Exited: true, + ExitCode: 0, + }) + assert.NoError(t, err) }) - t.Run("reject pipeline already failed", func(t *testing.T) { + t.Run("reject non-terminal step update when workflow already finished", func(t *testing.T) { + // When the workflow is already finished, a step update that would keep + // the step in a non-terminal state (e.g. just started, no exit) is rejected. 
mockStore := store_mocks.NewMockStore(t) agent := defaultAgent() - pipeline := defaultPipeline(model.StatusFailure) - workflow := defaultWorkflow(model.StatusFailure) + pipeline := defaultPipeline(model.StatusRunning) + workflow := defaultWorkflow(model.StatusSuccess) // finished step := defaultStep(model.StatusRunning) mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) @@ -180,11 +197,12 @@ func TestRPCUpdate(t *testing.T) { rpcInst := newTestRPC(t, mockStore) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) + // Step reports started but not exited → still running (non-terminal) err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"}) assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) }) - t.Run("reject pipeline blocked", func(t *testing.T) { + t.Run("reject step update when workflow blocked", func(t *testing.T) { mockStore := store_mocks.NewMockStore(t) agent := defaultAgent() pipeline := defaultPipeline(model.StatusBlocked) @@ -200,26 +218,6 @@ func TestRPCUpdate(t *testing.T) { rpcInst := newTestRPC(t, mockStore) ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) - err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"}) - assert.ErrorIs(t, err, ErrAgentIllegalWorkflowRun) - }) - - t.Run("reject workflow already finished", func(t *testing.T) { - mockStore := store_mocks.NewMockStore(t) - agent := defaultAgent() - pipeline := defaultPipeline(model.StatusRunning) - workflow := defaultWorkflow(model.StatusSuccess) // finished - step := defaultStep(model.StatusRunning) - - mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) - mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) - mockStore.On("AgentFind", int64(1)).Return(agent, nil) - mockStore.On("StepByUUID", "step-uuid-123").Return(step, nil) - mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - - rpcInst := newTestRPC(t, mockStore) - ctx := 
metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) - err := rpcInst.Update(ctx, "30", rpc.StepState{StepUUID: "step-uuid-123"}) assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) }) @@ -380,42 +378,6 @@ func TestRPCInit(t *testing.T) { assert.NoError(t, err) }) - t.Run("reject pipeline already succeeded", func(t *testing.T) { - mockStore := store_mocks.NewMockStore(t) - agent := defaultAgent() - pipeline := defaultPipeline(model.StatusSuccess) - workflow := defaultWorkflow(model.StatusPending) - - mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) - mockStore.On("AgentFind", int64(1)).Return(agent, nil) - mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) - mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - - rpcInst := newTestRPC(t, mockStore) - ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) - - err := rpcInst.Init(ctx, "30", rpc.WorkflowState{Started: 100}) - assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowReRunStateChange) - }) - - t.Run("reject pipeline blocked", func(t *testing.T) { - mockStore := store_mocks.NewMockStore(t) - agent := defaultAgent() - pipeline := defaultPipeline(model.StatusBlocked) - workflow := defaultWorkflow(model.StatusPending) - - mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) - mockStore.On("AgentFind", int64(1)).Return(agent, nil) - mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) - mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - - rpcInst := newTestRPC(t, mockStore) - ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) - - err := rpcInst.Init(ctx, "30", rpc.WorkflowState{Started: 100}) - assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowRun) - }) - t.Run("reject workflow already finished", func(t *testing.T) { mockStore := store_mocks.NewMockStore(t) agent := defaultAgent() @@ -516,49 +478,11 @@ func TestRPCDone(t *testing.T) { assert.NoError(t, err) 
}) - t.Run("reject pipeline already succeeded", func(t *testing.T) { - mockStore := store_mocks.NewMockStore(t) - agent := defaultAgent() - pipeline := defaultPipeline(model.StatusSuccess) - workflow := defaultWorkflow(model.StatusRunning) - - mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) - mockStore.On("StepListFromWorkflowFind", mock.Anything).Return([]*model.Step{}, nil) - mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) - mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - mockStore.On("AgentFind", int64(1)).Return(agent, nil) - - rpcInst := newTestRPC(t, mockStore) - ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) - - err := rpcInst.Done(ctx, "30", rpc.WorkflowState{Finished: 200}) - assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowReRunStateChange) - }) - - t.Run("reject pipeline killed", func(t *testing.T) { - mockStore := store_mocks.NewMockStore(t) - agent := defaultAgent() - pipeline := defaultPipeline(model.StatusKilled) - workflow := defaultWorkflow(model.StatusRunning) - - mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) - mockStore.On("StepListFromWorkflowFind", mock.Anything).Return([]*model.Step{}, nil) - mockStore.On("GetPipeline", int64(20)).Return(pipeline, nil) - mockStore.On("GetRepo", int64(10)).Return(defaultRepo(), nil) - mockStore.On("AgentFind", int64(1)).Return(agent, nil) - - rpcInst := newTestRPC(t, mockStore) - ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) - - err := rpcInst.Done(ctx, "30", rpc.WorkflowState{Finished: 200}) - assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowReRunStateChange) - }) - - t.Run("reject pipeline blocked", func(t *testing.T) { + t.Run("reject workflow already finished", func(t *testing.T) { mockStore := store_mocks.NewMockStore(t) agent := defaultAgent() - pipeline := defaultPipeline(model.StatusBlocked) - workflow := defaultWorkflow(model.StatusRunning) + pipeline := 
defaultPipeline(model.StatusRunning) + workflow := defaultWorkflow(model.StatusSuccess) mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) mockStore.On("StepListFromWorkflowFind", mock.Anything).Return([]*model.Step{}, nil) @@ -570,14 +494,14 @@ func TestRPCDone(t *testing.T) { ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Done(ctx, "30", rpc.WorkflowState{Finished: 200}) - assert.ErrorIs(t, err, ErrAgentIllegalPipelineWorkflowRun) + assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) }) - t.Run("reject workflow already finished", func(t *testing.T) { + t.Run("reject workflow blocked", func(t *testing.T) { mockStore := store_mocks.NewMockStore(t) agent := defaultAgent() pipeline := defaultPipeline(model.StatusRunning) - workflow := defaultWorkflow(model.StatusSuccess) + workflow := defaultWorkflow(model.StatusBlocked) mockStore.On("WorkflowLoad", int64(30)).Return(workflow, nil) mockStore.On("StepListFromWorkflowFind", mock.Anything).Return([]*model.Step{}, nil) @@ -589,7 +513,7 @@ func TestRPCDone(t *testing.T) { ctx := metadata.NewIncomingContext(t.Context(), metadata.Pairs("agent_id", "1")) err := rpcInst.Done(ctx, "30", rpc.WorkflowState{Finished: 200}) - assert.ErrorIs(t, err, ErrAgentIllegalWorkflowReRunStateChange) + assert.ErrorIs(t, err, ErrAgentIllegalWorkflowRun) }) t.Run("reject agent wrong org", func(t *testing.T) { @@ -768,7 +692,6 @@ func TestRPCLog(t *testing.T) { require.Error(t, err) assert.Contains(t, err.Error(), "can not alter logs") assert.ErrorIs(t, err, ErrAgentIllegalLogStreaming) - assert.False(t, errors.Is(err, ErrAgentIllegalPipelineWorkflowReRunStateChange)) }) t.Run("reject: pipeline failed stale and step not running", func(t *testing.T) { diff --git a/server/rpc/sanitize.go b/server/rpc/sanitize.go index 0abd25fdaa3..e858bddb2de 100644 --- a/server/rpc/sanitize.go +++ b/server/rpc/sanitize.go @@ -22,14 +22,16 @@ import ( "github.com/rs/zerolog/log" + 
"go.woodpecker-ci.org/woodpecker/v3/rpc" "go.woodpecker-ci.org/woodpecker/v3/server/model" + "go.woodpecker-ci.org/woodpecker/v3/server/pipeline" ) const logStreamDelayAllowed = 5 * time.Minute -func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Agent, strWorkflowID string, pipeline *model.Pipeline, repo *model.Repo) error { +func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Agent, strWorkflowID string, _pipeline *model.Pipeline, repo *model.Repo) error { var err error - if repo == nil && pipeline == nil { + if repo == nil && _pipeline == nil { workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64) if err != nil { return err @@ -41,7 +43,7 @@ func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Age return err } - pipeline, err = s.store.GetPipeline(workflow.PipelineID) + _pipeline, err = s.store.GetPipeline(workflow.PipelineID) if err != nil { log.Error().Err(err).Msgf("cannot find pipeline with id %d", workflow.PipelineID) return err @@ -49,9 +51,9 @@ func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Age } if repo == nil { - repo, err = s.store.GetRepo(pipeline.RepoID) + repo, err = s.store.GetRepo(_pipeline.RepoID) if err != nil { - log.Error().Err(err).Msgf("cannot find repo with id %d", pipeline.RepoID) + log.Error().Err(err).Msgf("cannot find repo with id %d", _pipeline.RepoID) return err } } @@ -64,55 +66,56 @@ func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Age return fmt.Errorf("%w: agentId=%d repoID=%d", ErrAgentIllegalRepo, agent.ID, repo.ID) } -// checkParentState checks if an agent is allowed to change/update a workflow/step state -// by the state the parent pipeline/workflow. 
-func checkParentState(parentState, childState model.StatusValue, isStep bool) (err error) { - // check if pipeline was already run and marked finished or is blocked - switch parentState { +// isActiveState returns true for states where work is in progress or not yet started. +func isActiveState(state model.StatusValue) bool { + switch state { case model.StatusCreated, model.StatusPending, model.StatusRunning: - return nil - - case model.StatusBlocked: - if isStep { - err = ErrAgentIllegalWorkflowRun - } else { - err = ErrAgentIllegalPipelineWorkflowRun - } + return true + default: + return false + } +} - case model.StatusCanceled, +// isDoneState returns true for terminal states where no further work will happen. +func isDoneState(state model.StatusValue) bool { + switch state { + case model.StatusSuccess, model.StatusFailure, - model.StatusKilled: - - switch childState { - case model.StatusCanceled, - model.StatusKilled, - model.StatusSkipped, - model.StatusFailure, - model.StatusSuccess: - return nil - - default: - if isStep { - err = ErrAgentIllegalWorkflowReRunStateChange - } else { - err = ErrAgentIllegalPipelineWorkflowReRunStateChange - } - } - + model.StatusKilled, + model.StatusCanceled, + model.StatusSkipped, + model.StatusError, + model.StatusDeclined: + return true default: - if isStep { - err = ErrAgentIllegalWorkflowReRunStateChange - } else { - err = ErrAgentIllegalPipelineWorkflowReRunStateChange - } + return false + } +} + +// checkWorkflowAllowsStepUpdate validates whether the workflow state permits +// the given step state update. If the workflow is active (created/pending/running), +// any step update is allowed. If the workflow is in a terminal state, only +// updates that would move the step into a terminal state are permitted — this +// lets the agent report final results for steps that completed after the +// workflow was already marked done. 
+func checkWorkflowAllowsStepUpdate(workflowState model.StatusValue, step *model.Step, state rpc.StepState) error { + if isActiveState(workflowState) { + return nil } + newStep, _, err := pipeline.CalcStepStatus(*step, state) if err != nil { - log.Error().Err(err).Msg("caught agent performing illegal instruction") + return err } - return err + if isDoneState(newStep.State) { + return nil + } + + retErr := ErrAgentIllegalWorkflowReRunStateChange + log.Error().Err(retErr).Msg("caught agent performing illegal instruction") + return retErr } // checkWorkflowState checks if a workflow's own state allows it to be diff --git a/server/rpc/sanitize_test.go b/server/rpc/sanitize_test.go index f2d96b15beb..03a2ceda8e3 100644 --- a/server/rpc/sanitize_test.go +++ b/server/rpc/sanitize_test.go @@ -21,120 +21,90 @@ import ( "github.com/stretchr/testify/assert" + "go.woodpecker-ci.org/woodpecker/v3/rpc" "go.woodpecker-ci.org/woodpecker/v3/server/model" ) -// TestCheckParentState covers checkParentState for both pipeline-level -// (isStep=false) and workflow-level (isStep=true) checks in a single -// table-driven test, eliminating duplicate cases shared by the two levels. -func TestCheckParentState(t *testing.T) { +func TestCheckWorkflowAllowsStepUpdate(t *testing.T) { t.Parallel() - // States that always allow a child to proceed, regardless of level. - allowedParents := []model.StatusValue{ - model.StatusCreated, - model.StatusPending, - model.StatusRunning, - } - - // Terminal parent states that reject a running child as an illegal re-run. 
- terminalParents := []model.StatusValue{ - model.StatusSuccess, - model.StatusFailure, - model.StatusKilled, - model.StatusError, - model.StatusSkipped, - } + t.Run("workflow running allows any step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning} + // Non-terminal update (step stays running) + assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusRunning, step, rpc.StepState{})) + }) - // Parents whose terminal children are exempt (allowed through). - exemptParents := []model.StatusValue{ - model.StatusCanceled, - model.StatusFailure, - model.StatusKilled, - } + t.Run("workflow pending allows any step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusPending} + assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusPending, step, rpc.StepState{})) + }) - // Child states considered exempt under a terminal/canceled parent. - exemptChildren := []model.StatusValue{ - model.StatusCanceled, - model.StatusKilled, - model.StatusSkipped, - } + t.Run("workflow created allows any step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusPending} + assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusCreated, step, rpc.StepState{})) + }) - // Error sentinels per level. 
- type levelConfig struct { - isStep bool - blockedErr error - reRunErr error - extraExempt []model.StatusValue // additional exempt child states beyond the shared set - extraRejects []model.StatusValue // additional terminal parents beyond the shared set - } + t.Run("workflow finished allows terminal step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning} + // Step exits with code 0 → CalcStepStatus produces StatusSuccess (terminal) + state := rpc.StepState{Exited: true, ExitCode: 0} + assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusSuccess, step, state)) + }) - levels := []levelConfig{ - { - isStep: false, - blockedErr: ErrAgentIllegalPipelineWorkflowRun, - reRunErr: ErrAgentIllegalPipelineWorkflowReRunStateChange, - extraExempt: []model.StatusValue{model.StatusFailure, model.StatusSuccess}, - extraRejects: []model.StatusValue{model.StatusDeclined}, - }, - { - isStep: true, - blockedErr: ErrAgentIllegalWorkflowRun, - reRunErr: ErrAgentIllegalWorkflowReRunStateChange, - extraExempt: []model.StatusValue{model.StatusFailure, model.StatusSuccess}, - }, - } + t.Run("workflow finished allows failed step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning} + state := rpc.StepState{Exited: true, ExitCode: 1} + assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusFailure, step, state)) + }) - for _, lc := range levels { - label := "pipeline" - if lc.isStep { - label = "step" - } + t.Run("workflow finished allows canceled step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning} + state := rpc.StepState{Canceled: true} + assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusKilled, step, state)) + }) - t.Run(label, func(t *testing.T) { - t.Parallel() + t.Run("workflow finished allows skipped step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusPending} + state := rpc.StepState{Skipped: 
true} + assert.NoError(t, checkWorkflowAllowsStepUpdate(model.StatusSuccess, step, state)) + }) - // Allowed parent states. - for _, ps := range allowedParents { - t.Run(fmt.Sprintf("%s allows", ps), func(t *testing.T) { - t.Parallel() - assert.NoError(t, checkParentState(ps, model.StatusRunning, lc.isStep)) - }) - } + t.Run("workflow finished rejects non-terminal step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning} + // No exit, no cancel → step stays Running (non-terminal) + state := rpc.StepState{} + assert.ErrorIs(t, checkWorkflowAllowsStepUpdate(model.StatusSuccess, step, state), ErrAgentIllegalWorkflowReRunStateChange) + }) - // Blocked parent. - t.Run("blocked rejects", func(t *testing.T) { - t.Parallel() - assert.ErrorIs(t, checkParentState(model.StatusBlocked, model.StatusRunning, lc.isStep), lc.blockedErr) - }) + t.Run("workflow killed rejects non-terminal step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning} + state := rpc.StepState{} + assert.ErrorIs(t, checkWorkflowAllowsStepUpdate(model.StatusKilled, step, state), ErrAgentIllegalWorkflowReRunStateChange) + }) - // Terminal parents reject a running child. - allTerminal := append(terminalParents, lc.extraRejects...) - for _, ps := range allTerminal { - t.Run(fmt.Sprintf("%s running child rejected", ps), func(t *testing.T) { - t.Parallel() - assert.ErrorIs(t, checkParentState(ps, model.StatusRunning, lc.isStep), lc.reRunErr) - }) - } - - // Canceled parent with running child is also rejected. 
- t.Run("canceled running child rejected", func(t *testing.T) { - t.Parallel() - assert.ErrorIs(t, checkParentState(model.StatusCanceled, model.StatusRunning, lc.isStep), lc.reRunErr) - }) + t.Run("workflow blocked rejects non-terminal step update", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusRunning} + state := rpc.StepState{} + assert.ErrorIs(t, checkWorkflowAllowsStepUpdate(model.StatusBlocked, step, state), ErrAgentIllegalWorkflowReRunStateChange) + }) - // Exempt parent + exempt child combinations → allowed. - allExemptChildren := append(exemptChildren, lc.extraExempt...) - for _, ps := range exemptParents { - for _, cs := range allExemptChildren { - t.Run(fmt.Sprintf("%s parent %s child allowed", ps, cs), func(t *testing.T) { - t.Parallel() - assert.NoError(t, checkParentState(ps, cs, lc.isStep)) - }) - } - } - }) - } + t.Run("workflow finished rejects pending-to-running transition", func(t *testing.T) { + t.Parallel() + step := &model.Step{State: model.StatusPending} + // No skip, no exit → CalcStepStatus produces Running (non-terminal) + state := rpc.StepState{Started: 100} + assert.ErrorIs(t, checkWorkflowAllowsStepUpdate(model.StatusSuccess, step, state), ErrAgentIllegalWorkflowReRunStateChange) + }) } func TestCheckWorkflowState(t *testing.T) { @@ -178,6 +148,57 @@ func TestCheckWorkflowState(t *testing.T) { }) } +func TestIsActiveState(t *testing.T) { + t.Parallel() + + active := []model.StatusValue{model.StatusCreated, model.StatusPending, model.StatusRunning} + inactive := []model.StatusValue{ + model.StatusSuccess, model.StatusFailure, model.StatusKilled, + model.StatusBlocked, model.StatusCanceled, model.StatusSkipped, + model.StatusError, model.StatusDeclined, + } + + for _, s := range active { + t.Run(fmt.Sprintf("%s is active", s), func(t *testing.T) { + t.Parallel() + assert.True(t, isActiveState(s)) + }) + } + for _, s := range inactive { + t.Run(fmt.Sprintf("%s is not active", s), func(t *testing.T) { + 
t.Parallel() + assert.False(t, isActiveState(s)) + }) + } +} + +func TestIsDoneState(t *testing.T) { + t.Parallel() + + done := []model.StatusValue{ + model.StatusSuccess, model.StatusFailure, model.StatusKilled, + model.StatusCanceled, model.StatusSkipped, model.StatusError, + model.StatusDeclined, + } + notDone := []model.StatusValue{ + model.StatusCreated, model.StatusPending, model.StatusRunning, + model.StatusBlocked, + } + + for _, s := range done { + t.Run(fmt.Sprintf("%s is done", s), func(t *testing.T) { + t.Parallel() + assert.True(t, isDoneState(s)) + }) + } + for _, s := range notDone { + t.Run(fmt.Sprintf("%s is not done", s), func(t *testing.T) { + t.Parallel() + assert.False(t, isDoneState(s)) + }) + } +} + func TestAllowAppendingLogs(t *testing.T) { t.Parallel() From 6a4f63d94bcc813513d6bef6b95100efdbb8466b Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Wed, 8 Apr 2026 13:41:51 +0200 Subject: [PATCH 08/12] rm unused --- server/rpc/errors.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/rpc/errors.go b/server/rpc/errors.go index f8f0369105f..9955d437deb 100644 --- a/server/rpc/errors.go +++ b/server/rpc/errors.go @@ -17,9 +17,6 @@ package rpc import "errors" var ( - ErrAgentIllegalPipelineWorkflowReRunStateChange = errors.New("workflow has parent pipeline marked as finished") - ErrAgentIllegalPipelineWorkflowRun = errors.New("workflow has parent pipeline in blocked state") - ErrAgentIllegalWorkflowReRunStateChange = errors.New("workflow was already marked as finished") ErrAgentIllegalWorkflowRun = errors.New("workflow is currently in blocked state") From 13e69c74940e253f1148d9c79cbe7c49bf690b80 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Wed, 8 Apr 2026 16:21:13 +0200 Subject: [PATCH 09/12] nit --- server/rpc/sanitize.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/rpc/sanitize.go b/server/rpc/sanitize.go index e858bddb2de..e99775c258b 100644 --- a/server/rpc/sanitize.go 
+++ b/server/rpc/sanitize.go @@ -29,9 +29,9 @@ import ( const logStreamDelayAllowed = 5 * time.Minute -func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Agent, strWorkflowID string, _pipeline *model.Pipeline, repo *model.Repo) error { +func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Agent, strWorkflowID string, p *model.Pipeline, repo *model.Repo) error { var err error - if repo == nil && _pipeline == nil { + if repo == nil && p == nil { workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64) if err != nil { return err @@ -43,7 +43,7 @@ func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Age return err } - _pipeline, err = s.store.GetPipeline(workflow.PipelineID) + p, err = s.store.GetPipeline(workflow.PipelineID) if err != nil { log.Error().Err(err).Msgf("cannot find pipeline with id %d", workflow.PipelineID) return err @@ -51,9 +51,9 @@ func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Age } if repo == nil { - repo, err = s.store.GetRepo(_pipeline.RepoID) + repo, err = s.store.GetRepo(p.RepoID) if err != nil { - log.Error().Err(err).Msgf("cannot find repo with id %d", _pipeline.RepoID) + log.Error().Err(err).Msgf("cannot find repo with id %d", p.RepoID) return err } } From 97f4581a2c9fa1a79e0eba77b07fd50cc47bcedc Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Wed, 8 Apr 2026 17:06:29 +0200 Subject: [PATCH 10/12] we do copy by val for Calc but want copy-by-ref behaviour for update --- server/pipeline/step_status.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/pipeline/step_status.go b/server/pipeline/step_status.go index ce44525dc31..a90e524f2f9 100644 --- a/server/pipeline/step_status.go +++ b/server/pipeline/step_status.go @@ -112,7 +112,7 @@ func CalcStepStatus(step model.Step, state rpc.StepState) (_ *model.Step, cancel func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, state
rpc.StepState) error { log.Debug().Str("StepUUID", step.UUID).Msgf("Update step %#v state %#v", *step, state) - step, shouldCancelPipelineFromStep, err := CalcStepStatus(*step, state) + updatedStep, shouldCancelPipelineFromStep, err := CalcStepStatus(*step, state) if err != nil { return err } @@ -121,6 +121,7 @@ func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, return err } } + *step = *updatedStep // update step for external callers return store.StepUpdate(step) } From 38a5d7928749de706f356d61454e7cb08b478458 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Wed, 8 Apr 2026 17:10:08 +0200 Subject: [PATCH 11/12] well make codeflow work as before --- server/pipeline/step_status.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/pipeline/step_status.go b/server/pipeline/step_status.go index a90e524f2f9..e2f0d2cbbd0 100644 --- a/server/pipeline/step_status.go +++ b/server/pipeline/step_status.go @@ -116,12 +116,12 @@ func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, if err != nil { return err } + *step = *updatedStep // update step for external callers if shouldCancelPipelineFromStep { if err := cancelPipelineFromStep(ctx, store, step); err != nil { return err } } - *step = *updatedStep // update step for external callers return store.StepUpdate(step) } From 23f7043ad08b2972fd8fdd55af90195315c2f6fa Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Wed, 8 Apr 2026 17:10:30 +0200 Subject: [PATCH 12/12] fmt --- server/pipeline/step_status.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/pipeline/step_status.go b/server/pipeline/step_status.go index e2f0d2cbbd0..100a8a00acb 100644 --- a/server/pipeline/step_status.go +++ b/server/pipeline/step_status.go @@ -117,6 +117,7 @@ func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, return err } *step = *updatedStep // update step for external callers + if shouldCancelPipelineFromStep { if 
err := cancelPipelineFromStep(ctx, store, step); err != nil { return err