Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions agent/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"go.woodpecker-ci.org/woodpecker/v3/shared/utils"
)

const shutdownTimeout = time.Second * 5

type Runner struct {
client rpc.Peer
filter rpc.Filter
Expand All @@ -51,15 +53,19 @@ func NewRunner(workEngine rpc.Peer, f rpc.Filter, h string, state *State, backen
}
}

func GetShutdownContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), shutdownTimeout)
}

// TODO: refactor this big function into subfunctions in it's own subpackage

// Run executes a workflow using a backend, tracks its state and reports the state back to the server.
func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
func (r *Runner) Run(runnerCtx context.Context) error {
log.Debug().Msg("request next execution")

// Preserve metadata AND cancellation from runnerCtx.
meta, _ := metadata.FromOutgoingContext(runnerCtx)
ctxMeta := metadata.NewOutgoingContext(shutdownCtx, meta)
ctxMeta := metadata.NewOutgoingContext(runnerCtx, meta)

// Fetch next workflow from the queue
workflow, err := r.client.Next(runnerCtx, r.filter)
Expand Down Expand Up @@ -192,8 +198,10 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
logger.Debug().Msg("logs and traces uploaded")

// Update workflow state
doneCtx := runnerCtx
doneCtx := runnerCtx //nolint:contextcheck
if doneCtx.Err() != nil {
shutdownCtx, shutdownCtxCancel := GetShutdownContext()
defer shutdownCtxCancel()
doneCtx = shutdownCtx
}

Expand Down
33 changes: 9 additions & 24 deletions cmd/agent/core/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,33 +54,14 @@ const (
authInterceptorRefreshInterval = time.Minute * 30
)

const (
shutdownTimeout = time.Second * 5
)

var (
stopAgentFunc context.CancelCauseFunc = func(error) {}
shutdownCancelFunc context.CancelFunc = func() {}
shutdownCtx = context.Background()
)

func run(ctx context.Context, c *cli.Command, backends []types.Backend) error {
log.Info().Str("version", version.String()).Msg("Starting Woodpecker agent")

agentCtx, ctxCancel := context.WithCancelCause(ctx)
stopAgentFunc = func(err error) {
msg := "shutdown of whole agent"
if err != nil {
log.Error().Err(err).Msg(msg)
} else {
log.Info().Msg(msg)
}
stopAgentFunc = func(error) {}
shutdownCtx, shutdownCancelFunc = context.WithTimeout(shutdownCtx, shutdownTimeout)
ctxCancel(err)
}
defer stopAgentFunc(nil)
defer shutdownCancelFunc()
defer func() {
log.Info().Msg("shutdown of whole agent")
ctxCancel(nil)
}()

serviceWaitingGroup := errgroup.Group{}

Expand All @@ -107,6 +88,10 @@ func run(ctx context.Context, c *cli.Command, backends []types.Backend) error {
go func() {
<-agentCtx.Done()
log.Info().Msg("shutdown healthcheck server ...")

shutdownCtx, shutdownCtxCancel := agent.GetShutdownContext()
defer shutdownCtxCancel()

if err := server.Shutdown(shutdownCtx); err != nil { //nolint:contextcheck
log.Error().Err(err).Msg("shutdown healthcheck server failed")
} else {
Expand Down Expand Up @@ -302,7 +287,7 @@ func run(ctx context.Context, c *cli.Command, backends []types.Backend) error {
}

log.Debug().Msg("polling new workflow")
if err := runner.Run(agentCtx, shutdownCtx); err != nil {
if err := runner.Run(agentCtx); err != nil {
if singleWorkflow {
log.Error().Err(err).Msg("runner done with error")
ctxCancel(nil)
Expand Down