From 3af7f987cd5b631aea62145e62114bcd3074a870 Mon Sep 17 00:00:00 2001 From: pnkcaht Date: Mon, 26 Jan 2026 17:19:39 -0500 Subject: [PATCH 1/7] fix(agent): properly cancel workflow execution on server cancel --- agent/runner.go | 95 ++++++++++++++++++++++++++++--------------------- 1 file changed, 54 insertions(+), 41 deletions(-) diff --git a/agent/runner.go b/agent/runner.go index 4cf5b835eaf..8e61edc53ba 100644 --- a/agent/runner.go +++ b/agent/runner.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" "github.com/rs/zerolog/log" @@ -53,10 +54,11 @@ func NewRunner(workEngine rpc.Peer, f rpc.Filter, h string, state *State, backen func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:contextcheck log.Debug().Msg("request next execution") + // Preserve metadata AND cancellation from runnerCtx. meta, _ := metadata.FromOutgoingContext(runnerCtx) - ctxMeta := metadata.NewOutgoingContext(context.Background(), meta) + ctxMeta := metadata.NewOutgoingContext(runnerCtx, meta) - // get the next workflow from the queue + // Fetch next workflow from the queue workflow, err := r.client.Next(runnerCtx, r.filter) if err != nil { return err @@ -65,6 +67,7 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co return nil } + // Compute workflow timeout timeout := time.Hour if minutes := workflow.Timeout; minutes != 0 { timeout = time.Duration(minutes) * time.Minute @@ -73,12 +76,8 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co repoName := extractRepositoryName(workflow.Config) // hack pipelineNumber := extractPipelineNumber(workflow.Config) // hack - r.counter.Add( - workflow.ID, - timeout, - repoName, - pipelineNumber, - ) + // Track workflow execution in runner state + r.counter.Add(workflow.ID, timeout, repoName, pipelineNumber) defer r.counter.Done(workflow.ID) logger := log.With(). @@ -89,57 +88,74 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co logger.Debug().Msg("received execution") + // Workflow execution context. + // This context is the SINGLE source of truth for cancellation. workflowCtx, cancel := context.WithTimeout(ctxMeta, timeout) defer cancel() - // Add sigterm support for internal context. - // Required when the pipeline is terminated by external signals - // like kubernetes. + // Handle SIGTERM (k8s, docker, system shutdown) workflowCtx = utils.WithContextSigtermCallback(workflowCtx, func() { - logger.Error().Msg("Received sigterm termination signal") + logger.Error().Msg("received sigterm termination signal") + cancel() }) - canceled := false + // canceled indicates whether the workflow was canceled remotely (UI/API). + // Must be atomic because it is written from a goroutine and read later. + var canceled atomic.Bool + + // Listen for remote cancel events (UI / API). + // When canceled, we MUST cancel the workflow context + // so that pipeline execution and backend processes stop immediately. go func() { - logger.Debug().Msg("listen for cancel signal") + logger.Debug().Msg("listening for cancel signal") if err := r.client.Wait(workflowCtx, workflow.ID); err != nil { - canceled = true - logger.Warn().Err(err).Msg("cancel signal received") + logger.Warn().Err(err).Msg("cancel signal received from server") + + // Mark workflow as canceled (thread-safe) + canceled.Store(true) + + // Propagate cancellation to pipeline + backend cancel() } else { - logger.Debug().Msg("done listening for cancel signal") + // Wait returned without error, meaning the workflow finished normally + logger.Debug().Msg("cancel listener exited normally") } }() + // Periodically extend the workflow lease while running go func() { for { select { case <-workflowCtx.Done(): - logger.Debug().Msg("pipeline done") + logger.Debug().Msg("workflow context done") return case <-time.After(constant.TaskTimeout / 3): - logger.Debug().Msg("pipeline lease renewed") + logger.Debug().Msg("renewing workflow lease") if err := r.client.Extend(workflowCtx, workflow.ID); err != nil { - log.Error().Err(err).Msg("extending pipeline deadline failed") + logger.Error().Err(err).Msg("failed to extend workflow lease") } } } }() - state := rpc.WorkflowState{} - state.Started = time.Now().Unix() + state := rpc.WorkflowState{ + Started: time.Now().Unix(), + } - err = r.client.Init(runnerCtx, workflow.ID, state) - if err != nil { + // Initialize workflow on the server + if err := r.client.Init(runnerCtx, workflow.ID, state); err != nil { logger.Error().Err(err).Msg("workflow initialization failed") - // TODO: should we return here? + // Do not return here: even if init fails, we still need to run the pipeline + // and ensure proper cancellation, log upload, and final status reporting } var uploads sync.WaitGroup - //nolint:contextcheck - err = pipeline.New(workflow.Config, + + // Run pipeline + err = pipeline.New( + workflow.Config, pipeline.WithContext(workflowCtx), pipeline.WithTaskUUID(fmt.Sprint(workflow.ID)), pipeline.WithLogger(r.createLogger(logger, &uploads, workflow)), @@ -154,10 +170,10 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co state.Finished = time.Now().Unix() - if errors.Is(err, pipeline.ErrCancel) { - canceled = true - } else if canceled { - err = errors.Join(err, pipeline.ErrCancel) + // Normalize cancellation error + if errors.Is(err, pipeline.ErrCancel) || canceled.Load() { + canceled.Store(true) + err = pipeline.ErrCancel } if err != nil { @@ -165,26 +181,23 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co } logger.Debug(). + Bool("canceled", canceled.Load()). Str("error", state.Error). - Bool("canceled", canceled). Msg("workflow finished") - logger.Debug().Msg("uploading logs and traces / states ...") + // Ensure all logs/traces are uploaded before finishing + logger.Debug().Msg("waiting for logs and traces upload") uploads.Wait() - logger.Debug().Msg("uploaded logs and traces / states") - - logger.Debug(). - Str("error", state.Error). - Msg("updating workflow status") + logger.Debug().Msg("logs and traces uploaded") + // Update workflow state doneCtx := runnerCtx if doneCtx.Err() != nil { doneCtx = shutdownCtx } + if err := r.client.Done(doneCtx, workflow.ID, state); err != nil { - logger.Error().Err(err).Msg("updating workflow status failed") - } else { - logger.Debug().Msg("updating workflow status complete") + logger.Error().Err(err).Msg("failed to update workflow status") } return nil From c2ed1af35ccda1f6ecf0f60099319da6d0aab465 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Tue, 27 Jan 2026 01:00:38 +0100 Subject: [PATCH 2/7] Apply suggestion from @6543 --- agent/runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/runner.go b/agent/runner.go index 8e61edc53ba..ca5d8ad8d94 100644 --- a/agent/runner.go +++ b/agent/runner.go @@ -51,7 +51,7 @@ func NewRunner(workEngine rpc.Peer, f rpc.Filter, h string, state *State, backen } } -func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:contextcheck +func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { log.Debug().Msg("request next execution") // Preserve metadata AND cancellation from runnerCtx. From 392e36ec84e2e001a76c39101b38c2cccdb86739 Mon Sep 17 00:00:00 2001 From: pnkcaht Date: Mon, 26 Jan 2026 19:34:00 -0500 Subject: [PATCH 3/7] fix(agent): ensure steps are marked canceled when agent is terminated --- agent/runner.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/agent/runner.go b/agent/runner.go index ca5d8ad8d94..2a7cf0a6103 100644 --- a/agent/runner.go +++ b/agent/runner.go @@ -167,6 +167,11 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { "pipeline_number": pipelineNumber, }), ).Run(runnerCtx) + // If the workflow context was canceled but the pipeline returned no error, + // force a cancellation error so running steps are not reported as success. + if err == nil && workflowCtx.Err() != nil { + err = pipeline.ErrCancel + } state.Finished = time.Now().Unix() From 2b529a34ee4bee5d16e327d6fae8cf88202fca43 Mon Sep 17 00:00:00 2001 From: Sam Richard Date: Mon, 26 Jan 2026 19:56:44 -0500 Subject: [PATCH 4/7] Update agent/runner.go fix(agent): use shutdown context for reporting metadata Co-authored-by: 6543 <6543@obermui.de> --- agent/runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/runner.go b/agent/runner.go index 2a7cf0a6103..3116ce1eca7 100644 --- a/agent/runner.go +++ b/agent/runner.go @@ -56,7 +56,7 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { // Preserve metadata AND cancellation from runnerCtx. meta, _ := metadata.FromOutgoingContext(runnerCtx) - ctxMeta := metadata.NewOutgoingContext(runnerCtx, meta) + ctxMeta := metadata.NewOutgoingContext(shutdownCtx, meta) // Fetch next workflow from the queue workflow, err := r.client.Next(runnerCtx, r.filter) From 9712d1f67d7cbb148d2fdce26113824a20203bf7 Mon Sep 17 00:00:00 2001 From: pnkcaht Date: Mon, 26 Jan 2026 20:02:14 -0500 Subject: [PATCH 5/7] fix(agent): keep Init handling unchanged (handled in #6011) --- agent/runner.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/agent/runner.go b/agent/runner.go index 3116ce1eca7..ada9db3ce62 100644 --- a/agent/runner.go +++ b/agent/runner.go @@ -143,12 +143,9 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { state := rpc.WorkflowState{ Started: time.Now().Unix(), } - - // Initialize workflow on the server if err := r.client.Init(runnerCtx, workflow.ID, state); err != nil { logger.Error().Err(err).Msg("workflow initialization failed") - // Do not return here: even if init fails, we still need to run the pipeline - // and ensure proper cancellation, log upload, and final status reporting + // TODO: should we return here? } var uploads sync.WaitGroup From dd1bcfe9c5310e2f71892656a79ff666e8d62798 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Tue, 27 Jan 2026 02:08:55 +0100 Subject: [PATCH 6/7] Apply suggestion from @6543 --- agent/runner.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/agent/runner.go b/agent/runner.go index ada9db3ce62..c7fc35ef3de 100644 --- a/agent/runner.go +++ b/agent/runner.go @@ -164,11 +164,6 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { "pipeline_number": pipelineNumber, }), ).Run(runnerCtx) - // If the workflow context was canceled but the pipeline returned no error, - // force a cancellation error so running steps are not reported as success. - if err == nil && workflowCtx.Err() != nil { - err = pipeline.ErrCancel - } state.Finished = time.Now().Unix() From 860fc34c1d2445810473d2b3fb49023b579afa8f Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Tue, 27 Jan 2026 02:10:53 +0100 Subject: [PATCH 7/7] less diff --- agent/runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/runner.go b/agent/runner.go index c7fc35ef3de..a618dc7ef52 100644 --- a/agent/runner.go +++ b/agent/runner.go @@ -178,8 +178,8 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { } logger.Debug(). - Bool("canceled", canceled.Load()). Str("error", state.Error). + Bool("canceled", canceled.Load()). Msg("workflow finished") // Ensure all logs/traces are uploaded before finishing