Skip to content
Draft
236 changes: 188 additions & 48 deletions cli/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ import (
"strings"

"codeberg.org/6543/xyaml"
"github.com/oklog/ulid/v2"
"github.com/urfave/cli/v3"
"go.uber.org/multierr"

"go.woodpecker-ci.org/woodpecker/v3/cli/common"
"go.woodpecker-ci.org/woodpecker/v3/cli/exec/scheduler"
"go.woodpecker-ci.org/woodpecker/v3/cli/lint"
"go.woodpecker-ci.org/woodpecker/v3/pipeline"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/backend"
Expand All @@ -44,6 +43,7 @@ import (
pipeline_runtime "go.woodpecker-ci.org/woodpecker/v3/pipeline/runtime"
pipeline_utils "go.woodpecker-ci.org/woodpecker/v3/pipeline/utils"
"go.woodpecker-ci.org/woodpecker/v3/shared/constant"
"go.woodpecker-ci.org/woodpecker/v3/shared/logger"
"go.woodpecker-ci.org/woodpecker/v3/shared/utils"
)

Expand All @@ -66,9 +66,7 @@ func run(ctx context.Context, c *cli.Command) error {
return common.RunPipelineFunc(ctx, c, execFile, execDir)
}

// TODO: do parallel runs with output to multiple _windows_ e.g. tmux like
func execDir(ctx context.Context, c *cli.Command, dir string) error {
// TODO: respect pipeline dependency
repoPath := c.String("repo-path")
if repoPath != "" {
repoPath, _ = filepath.Abs(repoPath)
Expand Down Expand Up @@ -152,14 +150,19 @@ func runExec(ctx context.Context, c *cli.Command, yamls []*builder.YamlFile, rep

privilegedPlugins := c.StringSlice("plugins-privileged")

// emulate server prefix for volume/network naming
prefix := "wp_" + ulid.Make().String()
// NOTE: we deliberately do NOT set compiler.WithPrefix here.
// The pipeline builder (pipeline/frontend/builder) generates a
// unique prefix per workflow of the form wp_<ULID>_<workflowID>,
// which becomes the workflow's docker network and volume name.
// Passing a shared WithPrefix would override that per-workflow
// value and cause parallel workflows to collide on the same
// docker network/volume — the exact symptom that appeared when
// the scheduler started running workflows concurrently.

// build compiler options — mirrors server behavior
compilerOpts := []compiler.Option{
compiler.WithEscalated(privilegedPlugins...),
compiler.WithNetworks(c.StringSlice("network")...),
compiler.WithPrefix(prefix),
compiler.WithProxy(compiler.ProxyOptions{
NoProxy: c.String("backend-no-proxy"),
HTTPProxy: c.String("backend-http-proxy"),
Expand All @@ -177,24 +180,24 @@ func runExec(ctx context.Context, c *cli.Command, yamls []*builder.YamlFile, rep

// configure volumes for local execution
volumes := c.StringSlice("volumes")
compilerOpts = append(
compilerOpts,
compiler.WithWorkspace(
c.String("workspace-base"),
c.String("workspace-path"),
),
)
if c.Bool("local") {
compilerOpts = append(compilerOpts,
compiler.WithWorkspace(
c.String("workspace-base"),
c.String("workspace-path"),
),
)
volumes = append(volumes,
prefix+"_default:"+c.String("workspace-base"),
// In local mode we bind-mount the user's repo directory into
// each step so the step sees the working tree as-is instead
// of a cloned copy. The per-workflow workspace volume mount
// (<prefix>_default:<workspace-base>) is added later, after
// the builder has assigned each workflow its own prefix —
// see injectLocalWorkspaceMounts below.
volumes = append(
volumes,
repoPath+":"+c.String("workspace-base")+"/"+c.String("workspace-path"),
)
} else {
compilerOpts = append(compilerOpts,
compiler.WithWorkspace(
c.String("workspace-base"),
c.String("workspace-path"),
),
)
}
compilerOpts = append(compilerOpts, compiler.WithVolumes(volumes...))

Expand Down Expand Up @@ -230,10 +233,28 @@ func runExec(ctx context.Context, c *cli.Command, yamls []*builder.YamlFile, rep
},
}

items, err := b.Build()
if err != nil {
str, fmtErr := lint.FormatLintError("pipeline", err, false)
fmt.Print(str)
items, buildErr := b.Build()

// Decide output mode up front. We need this before printing any
// warnings: in TUI mode the warnings must be captured into the
// model's messages ring so they render in the bottom pane,
// rather than being smeared across the terminal right before the
// alt-screen swap wipes them.
useTUI := !c.Bool("no-tui") && logger.IsInteractiveTerminal()

// preRunMessages collects pre-run diagnostic text (lint
// warnings, "Config is valid" banners, etc.) destined for the
// TUI messages pane. In line mode this stays empty and the
// output goes to stdout as before.
var preRunMessages strings.Builder

if buildErr != nil {
str, fmtErr := lint.FormatLintError("pipeline", buildErr, false)
if useTUI {
preRunMessages.WriteString(str)
} else {
fmt.Print(str)
}
if fmtErr != nil {
return fmtErr
}
Expand All @@ -243,6 +264,17 @@ func runExec(ctx context.Context, c *cli.Command, yamls []*builder.YamlFile, rep
return fmt.Errorf("no workflows to execute (all filtered out)")
}

// Local mode: mount each workflow's docker volume into every
// step's workspace path. This used to be done globally via
// compiler.WithVolumes with a shared prefix, but with parallel
// workflows that collided on the same docker volume name — all
// workflows used "<shared-prefix>_default". The builder now
// generates a per-workflow prefix, so we injecting the mount
// here after the build gives each workflow its own volume.
if c.Bool("local") {
injectLocalWorkspaceMounts(items, c.String("workspace-base"))
}

backendCtx := context.WithValue(ctx, backend_types.CliCommand, c)
backendEngine, err := backend.FindBackend(backendCtx, backends, c.String("backend-engine"))
if err != nil {
Expand All @@ -252,34 +284,147 @@ func runExec(ctx context.Context, c *cli.Command, yamls []*builder.YamlFile, rep
return err
}

var execErr error
// TODO: respect depends_on and run in parallel where possible
for _, item := range items {
fmt.Println("#", item.Workflow.Name)
// The pipeline context carries timeout + SIGTERM cancellation for
// the entire DAG run. Every workflow's runtime derives its own ctx
// from this one, so cancellation fans out to all of them at once.
pipelineCtx, cancel := context.WithTimeout(ctx, c.Duration("timeout"))
defer cancel()

if useTUI {
return runTUIMode(pipelineCtx, items, backendEngine, preRunMessages.String())
}
return runLineMode(pipelineCtx, items, backendEngine)
}

pipelineCtx, cancel := context.WithTimeout(context.Background(), c.Duration("timeout"))
defer cancel()
pipelineCtx = utils.WithContextSigtermCallback(pipelineCtx, func() {
fmt.Printf("ctrl+c received, terminating workflow '%s'\n", item.Workflow.Name)
// runLineMode drives the scheduler with a line-oriented output path:
// per-step output goes through LineWriter to stderr, and workflow
// banners / diagnostics are rendered by handleLineModeEvent.
//
// This is the path used when --no-tui is set, when stdout is not a
// terminal (e.g. CI logs), or as a fallback when the TUI is
// unavailable.
func runLineMode(pipelineCtx context.Context, items []*builder.Item, backendEngine backend_types.Backend) error {
pipelineCtx = utils.WithContextSigtermCallback(pipelineCtx, func() {
fmt.Fprintln(os.Stderr, "ctrl+c received, terminating pipeline")
})

// Whether to emit workflow names in the per-step log prefix. With
// a single workflow the prefix stays terse as "[step]"; with
// multiple workflows running in parallel, interleaved output needs
// the workflow qualifier to stay attributable.
multiWorkflow := len(items) > 1

// Per-workflow logger factory. The runtime calls this once per
// step with an io.ReadCloser streaming that step's stdout+stderr;
// we pipe each line through the workflow-aware LineWriter.
newLogger := func(workflowName string) logging.Logger {
return logging.Logger(func(step *backend_types.Step, rc io.ReadCloser) error {
var lw io.WriteCloser
if multiWorkflow {
lw = NewWorkflowLineWriter(workflowName, step.Name, step.UUID)
} else {
lw = NewLineWriter(step.Name, step.UUID)
}
return pipeline_utils.CopyLineByLine(lw, rc, pipeline.MaxLogLineLength)
})
}

err := pipeline_runtime.New(
// Events channel: consumed by a goroutine that turns scheduler
// state transitions into user-visible banners and diagnostics.
// Buffered generously so a slow terminal never back-pressures the
// scheduler's control loop.
events := make(chan scheduler.Event, schedulerEventBuffer)
eventsDone := make(chan struct{})
go func() {
defer close(eventsDone)
for ev := range events {
handleLineModeEvent(os.Stderr, ev)
}
}()

runFunc := func(runCtx context.Context, item *builder.Item) error {
return pipeline_runtime.New(
item.Config, backendEngine,
pipeline_runtime.WithContext(pipelineCtx), //nolint:contextcheck
pipeline_runtime.WithLogger(defaultLogger),
pipeline_runtime.WithContext(runCtx),
pipeline_runtime.WithLogger(newLogger(item.Workflow.Name)),
pipeline_runtime.WithDescription(map[string]string{
"CLI": "exec",
}),
).Run(ctx)
if err != nil {
fmt.Println(err)
execErr = multierr.Append(execErr, err)
}
fmt.Println("")
).Run(runCtx)
}

sched := scheduler.New(scheduler.Options{
Items: items,
Run: runFunc,
Events: events,
})

execErr := sched.Run(pipelineCtx)
<-eventsDone
return execErr
}

// schedulerEventBuffer is the channel buffer size for scheduler
// events. Generous so a slow consumer (terminal, tea program) does
// not back-pressure the scheduler's control loop.
const schedulerEventBuffer = 64

// handleLineModeEvent renders a workflow-level state transition to
// the given writer for the plain (non-TUI) output path. It emits:
//
// - a "# <name>" banner when a workflow starts running, matching
// the legacy sequential output,
// - a short diagnostic line when a workflow is blocked by a failed
// dependency (so the user understands the skip),
// - nothing for other states — per-step output and the final error
// return already cover success/failure reporting.
func handleLineModeEvent(out io.Writer, ev scheduler.Event) {
switch ev.State {
case scheduler.StateRunning:
WorkflowHeader(out, ev.Workflow)
case scheduler.StateBlocked:
if ev.Err != nil {
fmt.Fprintf(out, "# %s: %s\n", ev.Workflow, ev.Err.Error())
}
case scheduler.StateCanceled:
fmt.Fprintf(out, "# %s: canceled\n", ev.Workflow)
}
}

// injectLocalWorkspaceMounts adds the per-workflow workspace volume
// binding to every step in every item. In local-mode runs (the
// default when invoking `woodpecker-cli exec` directly), steps need
// to share a named docker volume for the workspace so files written
// by one step are visible to the next; the volume itself is created
// by the backend in SetupWorkflow using the name in item.Config.Volume.
//
// Previously the CLI computed one shared prefix upfront and added
// "<prefix>_default:<workspace-base>" to compiler.WithVolumes(),
// which applied to all workflows. That worked when exec ran
// workflows sequentially but collided on the first parallel run:
// every workflow tried to create the same docker volume and docker
// network, producing "already exists" and "unknown network" errors.
//
// Now the builder emits a unique prefix per workflow (see
// pipeline/frontend/builder/builder.go). We read the per-workflow
// volume name off each item's compiled Config and inject the binding
// into every step's Volumes slice. Per-step injection matches what
// compiler.WithVolumes already does internally for the non-local
// path, so the runtime sees an identical shape either way.
func injectLocalWorkspaceMounts(items []*builder.Item, workspaceBase string) {
for _, item := range items {
if item == nil || item.Config == nil || item.Config.Volume == "" {
continue
}
mount := item.Config.Volume + ":" + workspaceBase
for _, stage := range item.Config.Stages {
for _, step := range stage.Steps {
step.Volumes = append(step.Volumes, mount)
}
}
}
}

// convertPathForWindows converts a path to use slash separators
// for Windows. If the path is a Windows volume name like C:, it
// converts it to an absolute root path starting with slash (e.g.
Expand All @@ -298,8 +443,3 @@ func convertPathForWindows(path string) string {

return filepath.ToSlash(path)
}

var defaultLogger = logging.Logger(func(step *backend_types.Step, rc io.ReadCloser) error {
logWriter := NewLineWriter(step.Name, step.UUID)
return pipeline_utils.CopyLineByLine(logWriter, rc, pipeline.MaxLogLineLength)
})
6 changes: 4 additions & 2 deletions cli/exec/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,14 @@ steps:
r.Close()
stdout := buf.String()

assert.Contains(t, stdout,
assert.Contains(
t, stdout,
`[build:L0:0s] StepName: build
[build:L1:0s] StepType: commands
[build:L2:0s] StepUUID: `,
)
assert.Contains(t, stdout,
assert.Contains(
t, stdout,
`[build:L3:0s] StepCommands:
[build:L4:0s] ------------------
[build:L5:0s] echo hello
Expand Down
Loading