Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cancel states - rebased #4699

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
93 changes: 93 additions & 0 deletions agent/runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package agent

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/dummy"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/rpc"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/rpc/mocks"
)

type peery struct {
*mocks.Peer
}

func (p *peery) Done(ctx context.Context, id string, state rpc.WorkflowState) error {
return nil
}

func TestRunnerCanceledState(t *testing.T) {
backend := dummy.New()
_peer := mocks.NewPeer(t)

peer := &peery{_peer}

hostname := "dummy"
filter := rpc.Filter{
Labels: map[string]string{
"hostname": hostname,
"platform": "test",
"backend": backend.Name(),
"repo": "*", // allow all repos by default
},
}
state := &State{
Metadata: map[string]Info{},
Polling: 1, // max workflows to poll
Running: 0,
}
r := NewRunner(peer, filter, hostname, state, &backend)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)

workflow := &rpc.Workflow{
ID: "1",
Config: &types.Config{
Stages: []*types.Stage{
{
Steps: []*types.Step{
{

Name: "test",
Environment: map[string]string{
"SLEEP": "10s",
},
Commands: []string{
"echo 'hello world'",
},
OnSuccess: true,
},
},
},
},
},
Timeout: 1, // 1 minute
}

peer.On("Next", mock.Anything, filter).Return(workflow, nil).Once()
peer.On("Init", mock.Anything, "1", mock.MatchedBy(func(state rpc.WorkflowState) bool {
return state.Started != 0 && state.Finished == 0 && state.Error == ""
})).Return(nil)
peer.On("Done", mock.Anything, "1", mock.MatchedBy(func(state rpc.WorkflowState) bool {
return state.Started != 0 && state.Finished != 0 && state.Error == ""
})).Return(nil)
peer.On("Log", mock.Anything, mock.Anything).Return(nil)
peer.On("Wait", mock.Anything, "1").Return(nil)
peer.On("Update", mock.Anything, "1", mock.Anything).Return(nil)
peer.On("Extend", mock.Anything, "1").Return(nil).Maybe()

go func() {
time.Sleep(100 * time.Millisecond)
fmt.Println("canceling ...")
cancel()
}()

err := r.Run(ctx, ctx)
assert.NoError(t, err)
}
8 changes: 5 additions & 3 deletions agent/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup,

stepState := rpc.StepState{
StepUUID: state.Pipeline.Step.UUID,
Exited: state.Process.Exited,
ExitCode: state.Process.ExitCode,
Started: time.Now().Unix(), // TODO: do not do this
Finished: time.Now().Unix(),
}
if !state.Process.Exited {
stepState.Started = time.Now().Unix() // TODO: do not do this (UpdateStepStatus currently takes care that this is not overwritten)
} else {
stepState.Finished = time.Now().Unix()
}
if state.Process.Error != nil {
stepState.Error = state.Process.Error.Error()
Expand Down
29 changes: 22 additions & 7 deletions server/grpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,14 @@ func (s *RPC) Update(c context.Context, strWorkflowID string, state rpc.StepStat
return err
}

if err := pipeline.UpdateStepStatus(s.store, step, state); err != nil {
log.Error().Err(err).Msg("rpc.update: cannot update step")
if state.Finished == 0 {
if _, err := pipeline.UpdateStepToStatusStarted(s.store, *step, state); err != nil {
log.Error().Err(err).Msg("rpc.update: cannot update step")
}
} else {
if _, err := pipeline.UpdateStepStatusToDone(s.store, *step, state); err != nil {
log.Error().Err(err).Msg("rpc.update: cannot update step")
}
}

if currentPipeline.Workflows, err = s.store.WorkflowGetTree(currentPipeline); err != nil {
Expand Down Expand Up @@ -247,10 +253,19 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt
return err
}

if currentPipeline.Status == model.StatusPending {
if currentPipeline, err = pipeline.UpdateToStatusRunning(s.store, *currentPipeline, state.Started); err != nil {
log.Error().Err(err).Msgf("init: cannot update pipeline %d state", currentPipeline.ID)
}
// Init should only be called on pending pipelines
if currentPipeline.Status != model.StatusPending {
log.Error().Msgf("pipeline %d is not pending", currentPipeline.ID)
return errors.New("pipeline is not pending")
}

if currentPipeline, err = pipeline.UpdateToStatusRunning(s.store, *currentPipeline, state.Started); err != nil {
log.Error().Err(err).Msgf("init: cannot update pipeline %d state", currentPipeline.ID)
}

workflow, err = pipeline.UpdateWorkflowStatusToRunning(s.store, *workflow, state)
if err != nil {
return err
}

s.updateForgeStatus(c, repo, currentPipeline, workflow)
Expand Down Expand Up @@ -541,7 +556,7 @@ func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Age
func (s *RPC) completeChildrenIfParentCompleted(completedWorkflow *model.Workflow) {
for _, c := range completedWorkflow.Children {
if c.Running() {
if _, err := pipeline.UpdateStepToStatusSkipped(s.store, *c, completedWorkflow.Finished); err != nil {
if _, err := pipeline.UpdateStepStatusToSkipped(s.store, *c, completedWorkflow.Finished); err != nil {
log.Error().Err(err).Msgf("done: cannot update step_id %d child state", c.ID)
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/pipeline/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func Cancel(ctx context.Context, _forge forge.Forge, store store.Store, repo *mo
}
for _, step := range workflow.Children {
if step.State == model.StatusPending {
if _, err = UpdateStepToStatusSkipped(store, *step, 0); err != nil {
if _, err = UpdateStepStatusToSkipped(store, *step, 0); err != nil {
log.Error().Err(err).Msgf("cannot update workflow with id %d state", workflow.ID)
}
}
Expand Down
9 changes: 4 additions & 5 deletions server/pipeline/step_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func UpdateStepToStatusStarted(store store.Store, step model.Step, state rpc.Ste
return &step, store.StepUpdate(&step)
}

func UpdateStepToStatusSkipped(store store.Store, step model.Step, finished int64) (*model.Step, error) {
func UpdateStepStatusToSkipped(store store.Store, step model.Step, finished int64) (*model.Step, error) {
step.State = model.StatusSkipped
if step.Started != 0 {
step.State = model.StatusSuccess // for daemons that are killed
Expand All @@ -62,12 +62,11 @@ func UpdateStepStatusToDone(store store.Store, step model.Step, state rpc.StepSt
step.Finished = state.Finished
step.Error = state.Error
step.ExitCode = state.ExitCode
step.State = model.StatusSuccess
if state.Started == 0 {
step.State = model.StatusSkipped
} else {
step.State = model.StatusSuccess
}
if step.ExitCode != 0 || step.Error != "" {
if state.ExitCode != 0 || state.Error != "" {
step.State = model.StatusFailure
}
return &step, store.StepUpdate(&step)
Expand All @@ -79,6 +78,6 @@ func UpdateStepToStatusKilled(store store.Store, step model.Step) (*model.Step,
if step.Started == 0 {
step.Started = step.Finished
}
step.ExitCode = pipeline.ExitCodeKilled

return &step, store.StepUpdate(&step)
}
18 changes: 7 additions & 11 deletions server/pipeline/step_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,13 @@ func TestUpdateStepStatusNotExited(t *testing.T) {
// advertised step status
state := rpc.StepState{
Started: int64(42),
Exited: false,
// Dummy data
Finished: int64(1),
ExitCode: pipeline.ExitCodeKilled,
Error: "not an error",
}

err := UpdateStepStatus(mockStoreStep(t), step, state)
step, err := UpdateStepStatusToDone(mockStoreStep(t), *step, state)
assert.NoError(t, err)
assert.EqualValues(t, model.StatusRunning, step.State)
assert.EqualValues(t, 42, step.Started)
Expand All @@ -74,7 +73,7 @@ func TestUpdateStepStatusNotExitedButStopped(t *testing.T) {
Error: "not an error",
}

err := UpdateStepStatus(mockStoreStep(t), step, state)
step, err := UpdateStepStatusToDone(mockStoreStep(t), *step, state)
assert.NoError(t, err)
assert.EqualValues(t, model.StatusKilled, step.State)
assert.EqualValues(t, 42, step.Started)
Expand All @@ -92,13 +91,12 @@ func TestUpdateStepStatusExited(t *testing.T) {
// advertised step status
state := rpc.StepState{
Started: int64(42),
Exited: true,
Finished: int64(34),
ExitCode: pipeline.ExitCodeKilled,
Error: "an error",
}

err := UpdateStepStatus(mockStoreStep(t), step, state)
step, err := UpdateStepStatusToDone(mockStoreStep(t), *step, state)
assert.NoError(t, err)
assert.EqualValues(t, model.StatusKilled, step.State)
assert.EqualValues(t, 42, step.Started)
Expand All @@ -116,12 +114,11 @@ func TestUpdateStepStatusExitedButNot137(t *testing.T) {
// advertised step status
state := rpc.StepState{
Started: int64(42),
Exited: true,
Finished: int64(34),
Error: "an error",
}

err := UpdateStepStatus(mockStoreStep(t), step, state)
step, err := UpdateStepStatusToDone(mockStoreStep(t), *step, state)
assert.NoError(t, err)
assert.EqualValues(t, model.StatusFailure, step.State)
assert.EqualValues(t, 42, step.Started)
Expand All @@ -136,13 +133,12 @@ func TestUpdateStepStatusExitedWithCode(t *testing.T) {
// advertised step status
state := rpc.StepState{
Started: int64(42),
Exited: true,
Finished: int64(34),
ExitCode: 1,
Error: "an error",
}
step := &model.Step{}
err := UpdateStepStatus(mockStoreStep(t), step, state)
step, err := UpdateStepStatusToDone(mockStoreStep(t), *step, state)
assert.NoError(t, err)

assert.Equal(t, model.StatusFailure, step.State)
Expand All @@ -162,7 +158,7 @@ func TestUpdateStepToStatusStarted(t *testing.T) {
func TestUpdateStepToStatusSkipped(t *testing.T) {
t.Parallel()

step, _ := UpdateStepToStatusSkipped(mockStoreStep(t), model.Step{}, int64(1))
step, _ := UpdateStepStatusToSkipped(mockStoreStep(t), model.Step{}, int64(1))

assert.Equal(t, model.StatusSkipped, step.State)
assert.EqualValues(t, 0, step.Finished)
Expand All @@ -175,7 +171,7 @@ func TestUpdateStepToStatusSkippedButStarted(t *testing.T) {
Started: int64(42),
}

step, _ = UpdateStepToStatusSkipped(mockStoreStep(t), *step, int64(1))
step, _ = UpdateStepStatusToSkipped(mockStoreStep(t), *step, int64(1))

assert.Equal(t, model.StatusSuccess, step.State)
assert.EqualValues(t, 1, step.Finished)
Expand Down