Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
421e7bb
backend local: windows cmd escape propper
6543 Sep 29, 2025
a997537
save
6543 Sep 29, 2025
b07d3c9
jup
6543 Sep 29, 2025
702ed80
tests
6543 Sep 29, 2025
04d2aac
tests
6543 Sep 29, 2025
4f91d84
Merge branch 'main' into backend-local-exec-windows-cmd-qoute-broke
6543 Sep 29, 2025
c5d3df5
Update pipeline/backend/local/command.go
6543 Sep 29, 2025
61a592a
Merge branch 'backend-local-exec-windows-cmd-qoute-broke' into backen…
6543 Sep 29, 2025
73e4539
more text
6543 Sep 29, 2025
6e5ad92
rm
6543 Sep 29, 2025
4af2fb1
Merge branch 'backend-local-exec-windows-cmd-qoute-broke' into backen…
6543 Sep 29, 2025
189b711
fmt
6543 Sep 29, 2025
4f36ac7
Merge branch 'backend-local-exec-windows-cmd-qoute-broke' into backen…
6543 Sep 29, 2025
e4d71c6
Merge branch 'main' into backend-local-test-shell-posix
6543 Sep 30, 2025
27d78c7
better wording
6543 Sep 30, 2025
86b6b45
clean
6543 Sep 30, 2025
3829834
make lint happy and even better error txt
6543 Sep 30, 2025
1c024d7
common :/
6543 Sep 30, 2025
2c98487
common :/ (2)
6543 Sep 30, 2025
14ee26c
local backend: check shell existence first
6543 Sep 30, 2025
76048ca
isolate log streaming between steps and use race save types
6543 Sep 30, 2025
6ec9c36
Merge branch 'main' into enhance-local-backend
6543 Sep 30, 2025
4fa0b58
more test coverage
6543 Sep 30, 2025
498df8b
gc
6543 Sep 30, 2025
2c497f7
split better and refactor
6543 Oct 1, 2025
26bba5a
mv code and add doc string etc
6543 Oct 1, 2025
170c6d7
satisfy lint
6543 Oct 1, 2025
74dfb93
lint
6543 Oct 1, 2025
53ddc08
local backend: proper wayt
6543 Oct 1, 2025
664e4a0
cleanup detached steps still running
6543 Oct 1, 2025
c166462
Merge branch 'main' into enhance-local-backend
6543 Oct 1, 2025
54d7ee2
changes from upstream
6543 Oct 1, 2025
5906744
fmt
6543 Oct 1, 2025
e44e610
Merge branch 'main' into enhance-local-backend
6543 Oct 1, 2025
4838b29
make less flaki test
6543 Oct 1, 2025
b7c05ae
better comments
6543 Oct 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions pipeline/backend/local/clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,20 @@ func (e *local) execClone(ctx context.Context, step *types.Step, state *workflow
cmd.Env = env
cmd.Dir = state.workspaceDir

reader, err := cmd.StdoutPipe()
if err != nil {
return err
}

// Save state
state.stepState.Store(step.UUID, &stepState{
cmd: cmd,
output: reader,
})

// Get output and redirect Stderr to Stdout
e.output, _ = cmd.StdoutPipe()
cmd.Stderr = cmd.Stdout

state.stepCMDs[step.UUID] = cmd

return cmd.Start()
}

Expand Down
52 changes: 52 additions & 0 deletions pipeline/backend/local/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,66 @@
package local

import (
"context"
"fmt"
"io"
"os"
"os/exec"
"strings"

"al.essio.dev/pkg/shellescape"
"golang.org/x/text/encoding/unicode"
"golang.org/x/text/transform"

"go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
)

// execCommands use step.Image as shell and run the commands in it.
func (e *local) execCommands(ctx context.Context, step *types.Step, state *workflowState, env []string) error {
if err := checkShellExistence(step.Image); err != nil {
return err
}

// Prepare commands
// TODO: support `entrypoint` from pipeline config
args, err := e.genCmdByShell(step.Image, step.Commands)
if err != nil {
return fmt.Errorf("could not convert commands into args: %w", err)
}

// Use "image name" as run command (indicate shell)
cmd := exec.CommandContext(ctx, step.Image, args...)
cmd.Env = env
cmd.Dir = state.workspaceDir

reader, err := cmd.StdoutPipe()
if err != nil {
return err
}

if e.os == "windows" {
// we get non utf8 output from windows so just sanitize it
// TODO: remove hack
reader = io.NopCloser(transform.NewReader(reader, unicode.UTF8.NewDecoder().Transformer))
}

// Get output and redirect Stderr to Stdout
cmd.Stderr = cmd.Stdout

// Save state
state.stepState.Store(step.UUID, &stepState{
cmd: cmd,
output: reader,
})

return cmd.Start()
}

func checkShellExistence(shell string) error {
_, err := exec.LookPath(shell)
return err
}

func (e *local) genCmdByShell(shell string, cmdList []string) (args []string, err error) {
if len(cmdList) == 0 {
return nil, ErrNoCmdSet
Expand Down
6 changes: 0 additions & 6 deletions pipeline/backend/local/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package local

import (
"errors"
"fmt"
)

Expand All @@ -30,11 +29,6 @@ var notAllowedEnvVarOverwrites = []string{
"CI_WORKSPACE",
}

var (
ErrUnsupportedStepType = errors.New("unsupported step type")
ErrWorkflowStateNotFound = errors.New("workflow state not found")
)

const netrcFile = `
machine %s
login %s
Expand Down
8 changes: 6 additions & 2 deletions pipeline/backend/local/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ import (
)

var (
ErrNoShellSet = errors.New("no shell was set")
ErrNoCmdSet = errors.New("no commands where set")
ErrUnsupportedStepType = errors.New("unsupported step type")
ErrStepReaderNotFound = errors.New("could not found pipe reader for step")
ErrWorkflowStateNotFound = errors.New("workflow state not found")
ErrStepStateNotFound = errors.New("step state not found")
ErrNoShellSet = errors.New("no shell was set")
ErrNoCmdSet = errors.New("no commands where set")
)

// ErrNoPosixShell indicates that a shell was assumed to be POSIX-compatible but failed the test.
Expand Down
162 changes: 74 additions & 88 deletions pipeline/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,26 @@ import (

"github.com/rs/zerolog/log"
"github.com/urfave/cli/v3"
"golang.org/x/text/encoding/unicode"
"golang.org/x/text/transform"

"go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
)

type workflowState struct {
stepCMDs map[string]*exec.Cmd
stepState sync.Map // map of *stepState
baseDir string
homeDir string
workspaceDir string
pluginGitBinary string
}

type stepState struct {
cmd *exec.Cmd
output io.ReadCloser
}

type local struct {
tempDir string
workflows sync.Map
output io.ReadCloser
pluginGitBinary string
os, arch string
}
Expand Down Expand Up @@ -89,7 +91,6 @@ func (e *local) Load(ctx context.Context) (*types.BackendInfo, error) {
}, nil
}

// SetupWorkflow the pipeline environment.
func (e *local) SetupWorkflow(_ context.Context, _ *types.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msg("create workflow environment")

Expand All @@ -99,7 +100,6 @@ func (e *local) SetupWorkflow(_ context.Context, _ *types.Config, taskUUID strin
}

state := &workflowState{
stepCMDs: make(map[string]*exec.Cmd),
baseDir: baseDir,
workspaceDir: filepath.Join(baseDir, "workspace"),
homeDir: filepath.Join(baseDir, "home"),
Expand All @@ -113,16 +113,15 @@ func (e *local) SetupWorkflow(_ context.Context, _ *types.Config, taskUUID strin
return err
}

e.saveState(taskUUID, state)
e.workflows.Store(taskUUID, state)

return nil
}

// StartStep the pipeline step.
func (e *local) StartStep(ctx context.Context, step *types.Step, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("start step %s", step.Name)

state, err := e.getState(taskUUID)
state, err := e.getWorkflowState(taskUUID)
if err != nil {
return err
}
Expand Down Expand Up @@ -153,117 +152,93 @@ func (e *local) StartStep(ctx context.Context, step *types.Step, taskUUID string
}
}

// execCommands use step.Image as shell and run the commands in it.
func (e *local) execCommands(ctx context.Context, step *types.Step, state *workflowState, env []string) error {
// Prepare commands
// TODO: support `entrypoint` from pipeline config
args, err := e.genCmdByShell(step.Image, step.Commands)
if err != nil {
return fmt.Errorf("could not convert commands into args: %w", err)
}

// Use "image name" as run command (indicate shell)
cmd := exec.CommandContext(ctx, step.Image, args...)
cmd.Env = env
cmd.Dir = state.workspaceDir

// Get output and redirect Stderr to Stdout
e.output, _ = cmd.StdoutPipe()
cmd.Stderr = cmd.Stdout

if e.os == "windows" {
// we get non utf8 output from windows so just sanitize it
// TODO: remove hack
e.output = io.NopCloser(transform.NewReader(e.output, unicode.UTF8.NewDecoder().Transformer))
}

state.stepCMDs[step.UUID] = cmd

return cmd.Start()
}

// execPlugin use step.Image as exec binary.
func (e *local) execPlugin(ctx context.Context, step *types.Step, state *workflowState, env []string) error {
binary, err := exec.LookPath(step.Image)
if err != nil {
return fmt.Errorf("lookup plugin binary: %w", err)
}

cmd := exec.CommandContext(ctx, binary)
cmd.Env = env
cmd.Dir = state.workspaceDir

// Get output and redirect Stderr to Stdout
e.output, _ = cmd.StdoutPipe()
cmd.Stderr = cmd.Stdout

state.stepCMDs[step.UUID] = cmd

return cmd.Start()
}

// WaitStep for the pipeline step to complete and returns
// the completion results.
func (e *local) WaitStep(_ context.Context, step *types.Step, taskUUID string) (*types.State, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("wait for step %s", step.Name)

state, err := e.getState(taskUUID)
state, err := e.getStepState(taskUUID, step.UUID)
if err != nil {
return nil, err
}

cmd, ok := state.stepCMDs[step.UUID]
if !ok {
return nil, fmt.Errorf("step cmd for %s not found", step.UUID)
// normally we use cmd.Wait() to wait for *exec.Cmd, but cmd.StdoutPipe() tells us not
// as Wait() would close the io pipe even if not all logs where read and send back
// so we have to do use the underlying functions
if state.cmd.Process == nil {
return nil, errors.New("exec: not started")
}

err = cmd.Wait()
ExitCode := 0

var execExitError *exec.ExitError
if errors.As(err, &execExitError) {
ExitCode = execExitError.ExitCode()
// Non-zero exit code is a pipeline failure, but not an agent error.
err = nil
if state.cmd.ProcessState == nil {
cmdState, err := state.cmd.Process.Wait()
if err != nil {
return nil, err
}
state.cmd.ProcessState = cmdState
}

return &types.State{
Exited: true,
ExitCode: ExitCode,
ExitCode: state.cmd.ProcessState.ExitCode(),
}, err
}

// TailStep the pipeline step logs.
func (e *local) TailStep(_ context.Context, step *types.Step, taskUUID string) (io.ReadCloser, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("tail logs of step %s", step.Name)
return e.output, nil
state, err := e.getStepState(taskUUID, step.UUID)
if err != nil {
return nil, err
} else if state.output == nil {
return nil, ErrStepReaderNotFound
}
return state.output, nil
}

func (e *local) DestroyStep(_ context.Context, _ *types.Step, _ string) error {
// WaitStep already waits for the command to finish, so there is nothing to do here.
func (e *local) DestroyStep(_ context.Context, step *types.Step, taskUUID string) error {
state, err := e.getStepState(taskUUID, step.UUID)
if err != nil {
return err
}

// As WaitStep can not use cmd.Wait() witch ensures the process already finished and
// the io pipe is closed on process end, we make sure it is done.
_ = state.output.Close()
state.output = nil
_ = state.cmd.Cancel()
state.cmd = nil
workflowState, _ := e.getWorkflowState(taskUUID)
workflowState.stepState.Delete(step.UUID)

return nil
}

// DestroyWorkflow the pipeline environment.
func (e *local) DestroyWorkflow(_ context.Context, _ *types.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msg("delete workflow environment")

state, err := e.getState(taskUUID)
state, err := e.getWorkflowState(taskUUID)
if err != nil {
return err
}

// clean up steps not cleaned up because of context cancel or detached function
state.stepState.Range(func(_, value any) bool {
state, _ := value.(*stepState)
_ = state.output.Close()
state.output = nil
_ = state.cmd.Cancel()
state.cmd = nil
return true
})

err = os.RemoveAll(state.baseDir)
if err != nil {
return err
}

e.deleteState(taskUUID)
// hint for the gc to clean stuff
state.stepState.Clear()
e.workflows.Delete(taskUUID)

return err
}

func (e *local) getState(taskUUID string) (*workflowState, error) {
func (e *local) getWorkflowState(taskUUID string) (*workflowState, error) {
state, ok := e.workflows.Load(taskUUID)
if !ok {
return nil, ErrWorkflowStateNotFound
Expand All @@ -277,10 +252,21 @@ func (e *local) getState(taskUUID string) (*workflowState, error) {
return s, nil
}

func (e *local) saveState(taskUUID string, state *workflowState) {
e.workflows.Store(taskUUID, state)
}
func (e *local) getStepState(taskUUID, stepUUID string) (*stepState, error) {
wState, err := e.getWorkflowState(taskUUID)
if err != nil {
return nil, err
}

func (e *local) deleteState(taskUUID string) {
e.workflows.Delete(taskUUID)
state, ok := wState.stepState.Load(stepUUID)
if !ok {
return nil, ErrStepStateNotFound
}

s, ok := state.(*stepState)
if !ok {
return nil, fmt.Errorf("could not parse state: %v", state)
}

return s, nil
}
Loading