Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 44 additions & 18 deletions pipeline/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ func (e *local) WaitStep(_ context.Context, step *types.Step, taskUUID string) (
return nil, err
}

if state.cmd == nil {
return nil, errors.New("exec: step command not set up")
}

stepState := &types.State{
Exited: true,
}

// normally we use cmd.Wait() to wait for *exec.Cmd, but cmd.StdoutPipe() tells us not
// as Wait() would close the io pipe even if not all logs where read and send back
// so we have to do use the underlying functions
Expand All @@ -190,13 +198,19 @@ func (e *local) WaitStep(_ context.Context, step *types.Step, taskUUID string) (
if err != nil {
return nil, err
}
state.cmd.ProcessState = cmdState
if cmdState == nil {
return nil, errors.New("exec: cmd state after Wait() can not be nil but is")
}
stepState.ExitCode = cmdState.ExitCode()
// can be nil if step got canceled
if state.cmd != nil {
state.cmd.ProcessState = cmdState
}
} else {
stepState.ExitCode = state.cmd.ProcessState.ExitCode()
}

return &types.State{
Exited: true,
ExitCode: state.cmd.ProcessState.ExitCode(),
}, err
return stepState, err
}

func (e *local) TailStep(_ context.Context, step *types.Step, taskUUID string) (io.ReadCloser, error) {
Expand All @@ -217,13 +231,20 @@ func (e *local) DestroyStep(_ context.Context, step *types.Step, taskUUID string

// As WaitStep can not use cmd.Wait() witch ensures the process already finished and
// the io pipe is closed on process end, we make sure it is done.
_ = state.output.Close()
state.output = nil
_ = state.cmd.Cancel()
state.cmd = nil
workflowState, _ := e.getWorkflowState(taskUUID)
workflowState.stepState.Delete(step.UUID)
if state.output != nil {
_ = state.output.Close()
state.output = nil
}
if state.cmd != nil {
_ = state.cmd.Cancel()
state.cmd = nil
}
workflowState, err := e.getWorkflowState(taskUUID)
if err != nil {
return err
}

workflowState.stepState.Delete(step.UUID)
return nil
}

Expand All @@ -237,11 +258,16 @@ func (e *local) DestroyWorkflow(_ context.Context, _ *types.Config, taskUUID str

// clean up steps not cleaned up because of context cancel or detached function
state.stepState.Range(func(_, value any) bool {
state, _ := value.(*stepState)
_ = state.output.Close()
state.output = nil
_ = state.cmd.Cancel()
state.cmd = nil
if state, ok := value.(*stepState); ok && state != nil {
if state.output != nil {
_ = state.output.Close()
state.output = nil
}
if state.cmd != nil {
_ = state.cmd.Cancel()
state.cmd = nil
}
}
return true
})

Expand All @@ -264,7 +290,7 @@ func (e *local) getWorkflowState(taskUUID string) (*workflowState, error) {
}

s, ok := state.(*workflowState)
if !ok {
if !ok || s == nil {
return nil, fmt.Errorf("could not parse state: %v", state)
}

Expand All @@ -283,7 +309,7 @@ func (e *local) getStepState(taskUUID, stepUUID string) (*stepState, error) {
}

s, ok := state.(*stepState)
if !ok {
if !ok || s == nil {
return nil, fmt.Errorf("could not parse state: %v", state)
}

Expand Down