Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 52 additions & 42 deletions agent/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -50,13 +51,14 @@ func NewRunner(workEngine rpc.Peer, f rpc.Filter, h string, state *State, backen
}
}

func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:contextcheck
func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
log.Debug().Msg("request next execution")

// Preserve metadata AND cancellation from runnerCtx.
meta, _ := metadata.FromOutgoingContext(runnerCtx)
ctxMeta := metadata.NewOutgoingContext(context.Background(), meta)
ctxMeta := metadata.NewOutgoingContext(shutdownCtx, meta)

// get the next workflow from the queue
// Fetch next workflow from the queue
workflow, err := r.client.Next(runnerCtx, r.filter)
if err != nil {
return err
Expand All @@ -65,6 +67,7 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co
return nil
}

// Compute workflow timeout
timeout := time.Hour
if minutes := workflow.Timeout; minutes != 0 {
timeout = time.Duration(minutes) * time.Minute
Expand All @@ -73,12 +76,8 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co
repoName := extractRepositoryName(workflow.Config) // hack
pipelineNumber := extractPipelineNumber(workflow.Config) // hack

r.counter.Add(
workflow.ID,
timeout,
repoName,
pipelineNumber,
)
// Track workflow execution in runner state
r.counter.Add(workflow.ID, timeout, repoName, pipelineNumber)
defer r.counter.Done(workflow.ID)

logger := log.With().
Expand All @@ -89,57 +88,71 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co

logger.Debug().Msg("received execution")

// Workflow execution context.
// This context is the SINGLE source of truth for cancellation.
workflowCtx, cancel := context.WithTimeout(ctxMeta, timeout)
defer cancel()

// Add sigterm support for internal context.
// Required when the pipeline is terminated by external signals
// like kubernetes.
// Handle SIGTERM (k8s, docker, system shutdown)
workflowCtx = utils.WithContextSigtermCallback(workflowCtx, func() {
logger.Error().Msg("Received sigterm termination signal")
logger.Error().Msg("received sigterm termination signal")
cancel()
})

canceled := false
// canceled indicates whether the workflow was canceled remotely (UI/API).
// Must be atomic because it is written from a goroutine and read later.
var canceled atomic.Bool
Comment thread
6543 marked this conversation as resolved.

// Listen for remote cancel events (UI / API).
// When canceled, we MUST cancel the workflow context
// so that pipeline execution and backend processes stop immediately.
go func() {
logger.Debug().Msg("listen for cancel signal")
logger.Debug().Msg("listening for cancel signal")

if err := r.client.Wait(workflowCtx, workflow.ID); err != nil {
canceled = true
logger.Warn().Err(err).Msg("cancel signal received")
logger.Warn().Err(err).Msg("cancel signal received from server")

// Mark workflow as canceled (thread-safe)
canceled.Store(true)

// Propagate cancellation to pipeline + backend
cancel()
} else {
logger.Debug().Msg("done listening for cancel signal")
// Wait returned without error, meaning the workflow finished normally
logger.Debug().Msg("cancel listener exited normally")
}
}()

// Periodically extend the workflow lease while running
go func() {
for {
select {
case <-workflowCtx.Done():
logger.Debug().Msg("pipeline done")
logger.Debug().Msg("workflow context done")
return

case <-time.After(constant.TaskTimeout / 3):
logger.Debug().Msg("pipeline lease renewed")
logger.Debug().Msg("renewing workflow lease")
if err := r.client.Extend(workflowCtx, workflow.ID); err != nil {
log.Error().Err(err).Msg("extending pipeline deadline failed")
logger.Error().Err(err).Msg("failed to extend workflow lease")
}
}
}
}()

state := rpc.WorkflowState{}
state.Started = time.Now().Unix()

err = r.client.Init(runnerCtx, workflow.ID, state)
if err != nil {
state := rpc.WorkflowState{
Started: time.Now().Unix(),
}
if err := r.client.Init(runnerCtx, workflow.ID, state); err != nil {
logger.Error().Err(err).Msg("workflow initialization failed")
// TODO: should we return here?
}

var uploads sync.WaitGroup
//nolint:contextcheck
err = pipeline.New(workflow.Config,

// Run pipeline
err = pipeline.New(
workflow.Config,
pipeline.WithContext(workflowCtx),
pipeline.WithTaskUUID(fmt.Sprint(workflow.ID)),
pipeline.WithLogger(r.createLogger(logger, &uploads, workflow)),
Expand All @@ -154,10 +167,10 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co

state.Finished = time.Now().Unix()

if errors.Is(err, pipeline.ErrCancel) {
canceled = true
} else if canceled {
err = errors.Join(err, pipeline.ErrCancel)
// Normalize cancellation error
if errors.Is(err, pipeline.ErrCancel) || canceled.Load() {
canceled.Store(true)
err = pipeline.ErrCancel
}

if err != nil {
Expand All @@ -166,25 +179,22 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co

logger.Debug().
Str("error", state.Error).
Bool("canceled", canceled).
Bool("canceled", canceled.Load()).
Msg("workflow finished")

logger.Debug().Msg("uploading logs and traces / states ...")
// Ensure all logs/traces are uploaded before finishing
logger.Debug().Msg("waiting for logs and traces upload")
uploads.Wait()
logger.Debug().Msg("uploaded logs and traces / states")

logger.Debug().
Str("error", state.Error).
Msg("updating workflow status")
logger.Debug().Msg("logs and traces uploaded")

// Update workflow state
doneCtx := runnerCtx
if doneCtx.Err() != nil {
doneCtx = shutdownCtx
}

if err := r.client.Done(doneCtx, workflow.ID, state); err != nil {
logger.Error().Err(err).Msg("updating workflow status failed")
} else {
logger.Debug().Msg("updating workflow status complete")
logger.Error().Err(err).Msg("failed to update workflow status")
}

return nil
Expand Down