Skip to content
37 changes: 24 additions & 13 deletions server/pipeline/step_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ import (
"go.woodpecker-ci.org/woodpecker/v3/server/store"
)

// UpdateStepStatus updates step status based on agent reports via RPC.
func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, state rpc.StepState) error {
log.Debug().Str("StepUUID", step.UUID).Msgf("Update step %#v state %#v", *step, state)
func CalcStepStatus(step model.Step, state rpc.StepState) (_ *model.Step, cancelPipelineFromStep bool, _ error) {
log.Debug().Str("StepUUID", step.UUID).Msgf("Update step %#v state %#v", step, state)

switch step.State {
case model.StatusPending:
Expand All @@ -41,7 +40,7 @@ func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step,
if state.Finished != 0 {
step.Finished = state.Finished
}
return store.StepUpdate(step)
return &step, false, nil
}

// Transition from pending to running when started
Expand All @@ -68,10 +67,7 @@ func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step,
step.State = model.StatusFailure

if step.Failure == model.FailureCancel {
err := cancelPipelineFromStep(ctx, store, step)
if err != nil {
return err
}
cancelPipelineFromStep = true
}
}
}
Expand All @@ -92,16 +88,13 @@ func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step,
step.State = model.StatusFailure

if step.Failure == model.FailureCancel {
err := cancelPipelineFromStep(ctx, store, step)
if err != nil {
return err
}
cancelPipelineFromStep = true
}
}
}

default:
return fmt.Errorf("step has state %s and does not expect rpc state updates", step.State)
return nil, false, fmt.Errorf("step has state %s and does not expect rpc state updates", step.State)
}

// Handle cancellation across both cases
Expand All @@ -112,6 +105,24 @@ func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step,
}
}

return &step, cancelPipelineFromStep, nil
}

// UpdateStepStatus updates step status based on agent reports via RPC.
func UpdateStepStatus(ctx context.Context, store store.Store, step *model.Step, state rpc.StepState) error {
log.Debug().Str("StepUUID", step.UUID).Msgf("Update step %#v state %#v", *step, state)

updatedStep, shouldCancelPipelineFromStep, err := CalcStepStatus(*step, state)
if err != nil {
return err
}
*step = *updatedStep // update step for external callers

if shouldCancelPipelineFromStep {
if err := cancelPipelineFromStep(ctx, store, step); err != nil {
return err
}
}
return store.StepUpdate(step)
}

Expand Down
8 changes: 2 additions & 6 deletions server/rpc/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@ package rpc
import "errors"

var (
ErrAgentIllegalPipelineWorkflowReRunStateChange = errors.New("workflow has parent pipeline marked as finished")
ErrAgentIllegalPipelineWorkflowRun = errors.New("workflow has parent pipeline in blocked state")

ErrAgentIllegalWorkflowReRunStateChange = errors.New("workflow was already marked as finished")
ErrAgentIllegalWorkflowRun = errors.New("workflow is currently in blocked state")

ErrAgentIllegalStepReRunStateChange = errors.New("step was already marked as finished")
ErrAgentIllegalStepRun = errors.New("step is currently in blocked state")

ErrAgentIllegalLogStreaming = errors.New("agent can not append logs to a step that is marked not running")

ErrAgentIllegalRepo = errors.New("agent is not allowed to interact with repo")
)
23 changes: 7 additions & 16 deletions server/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,8 @@ func (s *RPC) Update(c context.Context, strWorkflowID string, state rpc.StepStat
return err
}

// sanitize agent input
if err := checkPipelineState(currentPipeline); err != nil {
return err
}
if err := checkWorkflowStepStates(workflow, step); err != nil {
// sanitize agent input: only allow step updates that the workflow state permits
if err := checkWorkflowAllowsStepUpdate(workflow.State, step, state); err != nil {
return err
}

Expand Down Expand Up @@ -260,11 +257,8 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt
return err
}

// sanitize agent input
if err := checkPipelineState(currentPipeline); err != nil {
return err
}
if err := checkWorkflowStepStates(workflow, nil); err != nil {
// check workflow's own state to prevent re-initializing a finished or blocked workflow
if err := checkWorkflowState(workflow.State); err != nil {
return err
}

Expand Down Expand Up @@ -293,7 +287,7 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt
return s.updateAgentLastWork(agent)
}

// Done marks the workflow with the given ID as stope.
// Done marks the workflow with the given ID as stopped.
func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowState) error {
workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64)
if err != nil {
Expand Down Expand Up @@ -333,11 +327,8 @@ func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowSt
return err
}

// sanitize agent input
if err := checkPipelineState(currentPipeline); err != nil {
return err
}
if err := checkWorkflowStepStates(workflow, nil); err != nil {
// check workflow's own state to prevent finishing an already-finished or blocked workflow
if err := checkWorkflowState(workflow.State); err != nil {
return err
}

Expand Down
Loading