Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
106 commits
Select commit Hold shift + click to select a range
503be2a
local backend: show "Canceled" as error if cancled
6543 Jan 25, 2026
1b74c2b
return err proper
6543 Jan 25, 2026
14d8548
Update 05-architecture.md
6543 Jan 25, 2026
6c0d17b
refactor ./...; fix code comments; simplify queue interface; fix fifo…
6543 Jan 25, 2026
2761696
wowo
6543 Jan 26, 2026
5f1ec79
i am done for today
6543 Jan 26, 2026
8809aee
mv CopyLineByLine to shared utils
6543 Jan 26, 2026
6254c19
move agent specific log func into agent
6543 Jan 26, 2026
3c56e56
move "package pipeline/rpc" into "rpc"
6543 Jan 26, 2026
e206cc4
Merge branch 'main' into wip-docu-restructure-rpc
6543 Jan 26, 2026
baf16c0
add depguard lint rules for rpc
6543 Jan 26, 2026
f68f130
cspell ignore generated mocks
6543 Jan 26, 2026
e8c2310
cspell conf sort
6543 Jan 26, 2026
375dfcc
update architecture docs
6543 Jan 26, 2026
b7ed93e
mv
6543 Jan 26, 2026
d383c0d
fmt
6543 Jan 26, 2026
1c5fa6a
Merge branch 'main' into wip-docu-restructure-rpc
6543 Jan 26, 2026
8132d74
Merge branch 'main' into rework-step-status-signaling
6543 Jan 26, 2026
b02e4a3
server queue: simplify and fix itteration of ErrorAtOnce
6543 Jan 26, 2026
867a16b
server queue: adjust consumer of ErrorAtOnce
6543 Jan 26, 2026
96a615f
fix code comment nit
6543 Jan 26, 2026
2114fb5
looking via git blame we can rm it as the queue code was already touc…
6543 Jan 26, 2026
0e43894
its ok we cast only internally
6543 Jan 26, 2026
eea8984
Merge branch 'simplify-fix-queue' into rework-step-status-signaling
6543 Jan 26, 2026
e80cb9b
Merge remote-tracking branch 'upstream/wip-docu-restructure-rpc' into…
6543 Jan 26, 2026
40b2aae
fix tests lint etc
6543 Jan 26, 2026
e1c876f
fifo 98% coverage
6543 Jan 26, 2026
c4774e1
group tests
6543 Jan 26, 2026
679aba5
same coverage and less test code
6543 Jan 26, 2026
8ed5a3c
more coverage
6543 Jan 26, 2026
b2fe8c0
nice test
6543 Jan 26, 2026
08f0c43
add edgecases
6543 Jan 26, 2026
7cd9299
fix tests lint etc
6543 Jan 26, 2026
3319b5f
fifo 98% coverage
6543 Jan 26, 2026
d97c108
group tests
6543 Jan 26, 2026
7ab6540
same coverage and less test code
6543 Jan 26, 2026
069359a
more coverage
6543 Jan 26, 2026
3f183c3
nice test
6543 Jan 26, 2026
df05202
add edgecases
6543 Jan 26, 2026
5c1780e
Merge branch 'main' into simplify-fix-queue
6543 Jan 26, 2026
3c6d9f5
fix lint errors
6543 Jan 26, 2026
1f44beb
Merge branch 'main' into rework-step-status-signaling
6543 Jan 26, 2026
1314ac6
Merge remote-tracking branch 'upstream_own/simplify-fix-queue' into r…
6543 Jan 26, 2026
5799e7f
clean
6543 Jan 26, 2026
ebb2f01
fix(agent): properly cancel workflow execution on server cancel
Pnkcaht Jan 26, 2026
26c2686
no init signal is err
6543 Jan 27, 2026
12a4ffa
use defer to ensure waitgroup gets closed
6543 Jan 27, 2026
2222240
workflow state contains cancel info
6543 Jan 27, 2026
b096589
Merge branch 'main' into rework-step-status-signaling
6543 Jan 27, 2026
ebed073
Apply suggestion from @6543
6543 Jan 27, 2026
604687c
Apply suggestion from @6543
6543 Jan 27, 2026
a4d291f
Merge branch 'main' into rework-step-status-signaling
6543 Jan 27, 2026
b1111a7
Apply suggestion from @6543
6543 Jan 27, 2026
5843766
simplify
6543 Jan 27, 2026
b32bffc
enhance
6543 Jan 27, 2026
607c34f
also add cancled bool to grpc stepstate
6543 Jan 27, 2026
9d8ba18
more docu
6543 Jan 27, 2026
cea5241
cleanup
6543 Jan 27, 2026
41bca26
steps are now marked as cancled too
6543 Jan 27, 2026
44e40a9
better debug msgs
6543 Jan 28, 2026
9138cd4
Revert "Fix/docker kill container on cancel (#6018)"
6543 Jan 28, 2026
06b4192
more context usage
6543 Jan 28, 2026
24e3313
simplify shutdown ctx
6543 Jan 28, 2026
65f5087
cleanup fixes and docs
6543 Jan 28, 2026
32a8091
fix moved into won pull -> https://github.com/woodpecker-ci/woodpecke…
6543 Jan 28, 2026
80d2115
fix moved into won pull -> https://github.com/woodpecker-ci/woodpecke…
6543 Jan 28, 2026
c683d9b
step status: only set start time if not set once
6543 Jan 28, 2026
c586bfe
harden server state mashine for steps (UpdateStepStatus)
6543 Jan 28, 2026
3907d1c
Merge branch 'main' into rework-step-status-signaling
6543 Jan 28, 2026
c7b0c78
Merge branch 'main' into rework-step-status-signaling
6543 Jan 28, 2026
26b595f
Merge branch 'main' into rework-step-status-signaling
6543 Jan 28, 2026
6ab1958
non missleading canceled signal from server to agent
6543 Jan 28, 2026
43fe779
refactor rpc
6543 Jan 28, 2026
8e5ddad
document agent-server gRPC interface
6543 Jan 28, 2026
f2eec2f
persistentqueue returns errors on cancle where there should be nonoe
6543 Jan 28, 2026
ecd6058
document more
6543 Jan 28, 2026
658834b
persistent queue dont error on expected task cancle
6543 Jan 28, 2026
4bcf11a
Merge branch 'main' into rework-step-status-signaling
6543 Jan 28, 2026
2f9772a
misspell
6543 Jan 28, 2026
02b83ab
ignore lint errors we know what we do
6543 Jan 28, 2026
2fc3346
fix-test
6543 Jan 28, 2026
8804a24
Merge branch 'main' into rework-step-status-signaling
6543 Jan 28, 2026
939b9f6
Apply suggestion from @6543
6543 Jan 28, 2026
9801f50
move error checking more to source
6543 Jan 28, 2026
9a19c6b
Apply suggestion from @6543
6543 Jan 28, 2026
416659d
better code comments
6543 Jan 28, 2026
c0a7a69
better desc for Peer interface
6543 Jan 28, 2026
2ab98b2
common lint
6543 Jan 28, 2026
9bd7903
Apply suggestions from code review
6543 Jan 29, 2026
7bb50cf
Merge branch 'main' into rework-step-status-signaling
6543 Jan 29, 2026
39ce00c
wording
6543 Jan 29, 2026
c99239a
Update agent/runner.go
6543 Jan 30, 2026
1bc226b
Merge branch 'main' into rework-step-status-signaling
6543 Jan 30, 2026
fc0afa6
Merge branch 'main' into rework-step-status-signaling
6543 Jan 31, 2026
07d48b0
Merge branch 'main' into rework-step-status-signaling
6543 Feb 1, 2026
f2bcd6d
Merge branch 'main' into rework-step-status-signaling
6543 Feb 2, 2026
b69f68f
we never expect state.Pipeline to be nil so we dont need a ref
6543 Feb 2, 2026
fe20d56
better wording
6543 Feb 2, 2026
a7e2131
we dont need this magic number
6543 Feb 2, 2026
2d71e77
Merge branch 'main' into rework-step-status-signaling
6543 Feb 2, 2026
ff2e406
Merge branch 'main' into rework-step-status-signaling
6543 Feb 3, 2026
b4748f6
Merge branch 'main' into rework-step-status-signaling
6543 Feb 3, 2026
155e445
Merge branch 'main' into rework-step-status-signaling
6543 Feb 3, 2026
bedaecd
Merge branch 'main' into rework-step-status-signaling
6543 Feb 3, 2026
54910e8
Merge branch 'main' into rework-step-status-signaling
6543 Feb 4, 2026
620ca43
Merge branch 'main' into rework-step-status-signaling
6543 Feb 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions agent/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (r *Runner) createLogger(_logger zerolog.Logger, uploads *sync.WaitGroup, w
Logger()

uploads.Add(1)
defer uploads.Done()

var secrets []string
for _, secret := range workflow.Config.Secrets {
Expand All @@ -50,8 +51,6 @@ func (r *Runner) createLogger(_logger zerolog.Logger, uploads *sync.WaitGroup, w
}

logger.Debug().Msg("log stream copied, close ...")
uploads.Done()

return nil
}
}
25 changes: 14 additions & 11 deletions agent/rpc/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,26 +148,27 @@ func (c *client) Next(ctx context.Context, filter rpc.Filter) (*rpc.Workflow, er
return w, nil
}

// Wait blocks until the workflow is complete.
func (c *client) Wait(ctx context.Context, workflowID string) (err error) {
// Wait blocks until the workflow with the given ID is marked as completed or canceled by the server.
func (c *client) Wait(ctx context.Context, workflowID string) (canceled bool, err error) {
retry := c.newBackOff()
req := new(proto.WaitRequest)
req.Id = workflowID
for {
_, err = c.client.Wait(ctx, req)
resp, err := c.client.Wait(ctx, req)
if err == nil {
break
// wait block was released normally as expected by server
return resp.GetCanceled(), nil
}

switch status.Code(err) {
case codes.Canceled:
if ctx.Err() != nil {
// expected as context was canceled
log.Debug().Err(err).Msgf("grpc error: wait(): context canceled")
return nil
return false, nil
}
log.Error().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err))
return err
return false, err
case
codes.Aborted,
codes.DataLoss,
Expand All @@ -178,16 +179,15 @@ func (c *client) Wait(ctx context.Context, workflowID string) (err error) {
log.Warn().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err))
default:
log.Error().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err))
return err
return false, err
}

select {
case <-time.After(retry.NextBackOff()):
case <-ctx.Done():
return ctx.Err()
return false, ctx.Err()
}
}
return nil
}

// Init signals the workflow is initialized.
Expand All @@ -199,6 +199,7 @@ func (c *client) Init(ctx context.Context, workflowID string, state rpc.Workflow
req.State.Started = state.Started
req.State.Finished = state.Finished
req.State.Error = state.Error
req.State.Canceled = state.Canceled
for {
_, err = c.client.Init(ctx, req)
if err == nil {
Expand Down Expand Up @@ -238,7 +239,7 @@ func (c *client) Init(ctx context.Context, workflowID string, state rpc.Workflow
return nil
}

// Done signals the workflow is complete.
// Done let agent signal to server the workflow has stopped.
func (c *client) Done(ctx context.Context, workflowID string, state rpc.WorkflowState) (err error) {
retry := c.newBackOff()
req := new(proto.DoneRequest)
Expand All @@ -247,6 +248,7 @@ func (c *client) Done(ctx context.Context, workflowID string, state rpc.Workflow
req.State.Started = state.Started
req.State.Finished = state.Finished
req.State.Error = state.Error
req.State.Canceled = state.Canceled
for {
_, err = c.client.Done(ctx, req)
if err == nil {
Expand Down Expand Up @@ -330,7 +332,7 @@ func (c *client) Extend(ctx context.Context, workflowID string) (err error) {
return nil
}

// Update updates the workflow state.
// Update let agent updates the step state at the server.
func (c *client) Update(ctx context.Context, workflowID string, state rpc.StepState) (err error) {
retry := c.newBackOff()
req := new(proto.UpdateRequest)
Expand All @@ -342,6 +344,7 @@ func (c *client) Update(ctx context.Context, workflowID string, state rpc.StepSt
req.State.Exited = state.Exited
req.State.ExitCode = int32(state.ExitCode)
req.State.Error = state.Error
req.State.Canceled = state.Canceled
for {
_, err = c.client.Update(ctx, req)
if err == nil {
Expand Down
59 changes: 31 additions & 28 deletions agent/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -51,6 +50,7 @@ func NewRunner(workEngine rpc.Peer, f rpc.Filter, h string, state *State, backen
}
}

// Run executes a workflow using a backend, tracks its state and reports the state back to the server.
func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
log.Debug().Msg("request next execution")

Expand Down Expand Up @@ -90,34 +90,32 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {

// Workflow execution context.
// This context is the SINGLE source of truth for cancellation.
workflowCtx, cancel := context.WithTimeout(ctxMeta, timeout)
defer cancel()
workflowCtx, _ := context.WithTimeout(ctxMeta, timeout) //nolint:govet
workflowCtx, cancelWorkflowCtx := context.WithCancelCause(workflowCtx)
defer cancelWorkflowCtx(nil)

// Handle SIGTERM (k8s, docker, system shutdown)
// Add sigterm support for internal context.
// Required to be able to terminate the running workflow by external signals.
workflowCtx = utils.WithContextSigtermCallback(workflowCtx, func() {
logger.Error().Msg("received sigterm termination signal")
cancel()
// WithContextSigtermCallback would cancel the context too, but we want our own custom error
cancelWorkflowCtx(pipeline.ErrCancel)
})

// canceled indicates whether the workflow was canceled remotely (UI/API).
// Must be atomic because it is written from a goroutine and read later.
var canceled atomic.Bool

// Listen for remote cancel events (UI / API).
// When canceled, we MUST cancel the workflow context
// so that pipeline execution and backend processes stop immediately.
// so that workflow execution stop immediately.
go func() {
logger.Debug().Msg("listening for cancel signal")

if err := r.client.Wait(workflowCtx, workflow.ID); err != nil {
logger.Warn().Err(err).Msg("cancel signal received from server")

// Mark workflow as canceled (thread-safe)
canceled.Store(true)
logger.Debug().Msg("start listening for server side cancel signal")

// Propagate cancellation to pipeline + backend
cancel()
if canceled, err := r.client.Wait(workflowCtx, workflow.ID); err != nil {
logger.Error().Err(err).Msg("server returned unexpected err while waiting for workflow to finish run")
cancelWorkflowCtx(err)
} else {
if canceled {
logger.Debug().Err(err).Msg("server side cancel signal received")
cancelWorkflowCtx(pipeline.ErrCancel)
}
// Wait returned without error, meaning the workflow finished normally
logger.Debug().Msg("cancel listener exited normally")
}
Expand All @@ -143,9 +141,13 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
state := rpc.WorkflowState{
Started: time.Now().Unix(),
}

if err := r.client.Init(runnerCtx, workflow.ID, state); err != nil {
logger.Error().Err(err).Msg("workflow initialization failed")
// TODO: should we return here?
logger.Error().Err(err).Msg("signaling workflow initialization to server failed")
// We have an error, maybe the server is currently unreachable or other server-side errors occurred.
// So let's clean up and end this not yet started workflow run.
cancelWorkflowCtx(err)
return err
}

var uploads sync.WaitGroup
Expand All @@ -167,19 +169,18 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {

state.Finished = time.Now().Unix()

// Normalize cancellation error
if errors.Is(err, pipeline.ErrCancel) || canceled.Load() {
canceled.Store(true)
err = pipeline.ErrCancel
}

if err != nil {
state.Error = err.Error()
if errors.Is(err, pipeline.ErrCancel) {
state.Canceled = true
// cleanup joined error messages
state.Error = pipeline.ErrCancel.Error()
}
}

logger.Debug().
Str("error", state.Error).
Bool("canceled", canceled.Load()).
Bool("canceled", state.Canceled).
Msg("workflow finished")

// Ensure all logs/traces are uploaded before finishing
Expand All @@ -195,6 +196,8 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {

if err := r.client.Done(doneCtx, workflow.ID, state); err != nil {
logger.Error().Err(err).Msg("failed to update workflow status")
} else {
logger.Debug().Msg("signaling workflow stopped done")
}

return nil
Expand Down
10 changes: 7 additions & 3 deletions agent/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package agent

import (
"context"
"errors"
"runtime"
"strconv"
"sync"
Expand All @@ -30,6 +31,7 @@ import (
func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup, logger zerolog.Logger, workflow *rpc.Workflow) pipeline.TraceFunc {
return func(state *pipeline.State) error {
uploads.Add(1)
defer uploads.Done()

stepLogger := logger.With().
Str("image", state.Pipeline.Step.Image).
Expand All @@ -43,12 +45,15 @@ func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup,
StepUUID: state.Pipeline.Step.UUID,
Exited: state.Process.Exited,
ExitCode: state.Process.ExitCode,
Started: time.Now().Unix(), // TODO: do not do this
Finished: time.Now().Unix(),
Started: state.Process.Started,
Canceled: errors.Is(state.Process.Error, pipeline.ErrCancel),
}
if state.Process.Error != nil {
stepState.Error = state.Process.Error.Error()
}
if state.Process.Exited {
stepState.Finished = time.Now().Unix()
}

defer func() {
stepLogger.Debug().Msg("update step status")
Expand All @@ -60,7 +65,6 @@ func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup,
}

stepLogger.Debug().Msg("update step status complete")
uploads.Done()
}()
if state.Process.Exited {
return nil
Expand Down
23 changes: 1 addition & 22 deletions pipeline/backend/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,29 +250,11 @@ func (e *docker) StartStep(ctx context.Context, step *backend.Step, taskUUID str
return e.client.ContainerStart(ctx, containerName, container.StartOptions{})
}

// WaitStep waits for a step container to exit.
//
// When the context is canceled, the container is immediately killed to prevent
// orphaned containers from continuing to run after agent shutdown.
func (e *docker) WaitStep(ctx context.Context, step *backend.Step, taskUUID string) (*backend.State, error) {
log := log.Logger.With().
Str("taskUUID", taskUUID).
Str("stepUUID", step.UUID).
Logger()

log := log.Logger.With().Str("taskUUID", taskUUID).Str("stepUUID", step.UUID).Logger()
log.Trace().Msgf("wait for step %s", step.Name)

containerName := toContainerName(step)
done := make(chan struct{})

// Ensure container is killed if context is canceled (SIGTERM / pipeline cancel)
go func() {
select {
case <-ctx.Done():
_ = e.client.ContainerKill(context.Background(), containerName, "9") //nolint:contextcheck
case <-done:
}
}()

wait, errC := e.client.ContainerWait(ctx, containerName, "")
select {
Expand All @@ -282,9 +264,6 @@ func (e *docker) WaitStep(ctx context.Context, step *backend.Step, taskUUID stri
log.Trace().Msgf("ContainerWait returned with err: %v", err)
}

// Stop cancellation watcher
close(done)

info, err := e.client.ContainerInspect(ctx, containerName)
if err != nil {
return nil, err
Expand Down
15 changes: 10 additions & 5 deletions pipeline/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,18 @@ func (e *local) StartStep(ctx context.Context, step *types.Step, taskUUID string
}
}

func (e *local) WaitStep(_ context.Context, step *types.Step, taskUUID string) (*types.State, error) {
func (e *local) WaitStep(ctx context.Context, step *types.Step, taskUUID string) (*types.State, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("wait for step %s", step.Name)

stepState := &types.State{
Exited: true,
}

if err := ctx.Err(); err != nil {
stepState.Error = err
return stepState, nil
}

state, err := e.getStepState(taskUUID, step.UUID)
if err != nil {
return nil, err
Expand All @@ -183,10 +192,6 @@ func (e *local) WaitStep(_ context.Context, step *types.Step, taskUUID string) (
return nil, errors.New("exec: step command not set up")
}

stepState := &types.State{
Exited: true,
}

// normally we use cmd.Wait() to wait for *exec.Cmd, but cmd.StdoutPipe() tells us not
// as Wait() would close the io pipe even if not all logs where read and send back
// so we have to do use the underlying functions
Expand Down
3 changes: 3 additions & 0 deletions pipeline/backend/types/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ package types

// State defines a container state.
type State struct {
// Unix start time
Started int64 `json:"started"`
// Container exit code
ExitCode int `json:"exit_code"`
// Container exited, true or false
Exited bool `json:"exited"`
// Container is oom killed, true or false
// TODO (6024): well known errors as string enum into ./errors.go
OOMKilled bool `json:"oom_killed"`
// Container error
Error error
Expand Down
Loading