Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/agent/core/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func run(ctx context.Context, c *cli.Command, backends []types.Backend) error {
return nil
}

log.Debug().Msg("polling new steps")
log.Debug().Msg("polling new workflow")
if err := runner.Run(agentCtx, shutdownCtx); err != nil {
log.Error().Err(err).Msg("runner error, retrying...")
// Check if context is canceled
Expand Down
25 changes: 11 additions & 14 deletions server/pipeline/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,31 +39,28 @@ func Cancel(ctx context.Context, _forge forge.Forge, store store.Store, repo *mo
return &ErrNotFound{Msg: err.Error()}
}

// First cancel/evict steps in the queue in one go
// First cancel/evict workflows in the queue in one go
var (
stepsToCancel []string
stepsToEvict []string
workflowsToCancel []string
workflowsToEvict []string
)
for _, workflow := range workflows {
if workflow.State == model.StatusRunning {
stepsToCancel = append(stepsToCancel, fmt.Sprint(workflow.ID))
workflowsToCancel = append(workflowsToCancel, fmt.Sprint(workflow.ID))
}
if workflow.State == model.StatusPending {
stepsToEvict = append(stepsToEvict, fmt.Sprint(workflow.ID))
workflowsToEvict = append(workflowsToEvict, fmt.Sprint(workflow.ID))
}
}

if len(stepsToEvict) != 0 {
if err := server.Config.Services.Queue.EvictAtOnce(ctx, stepsToEvict); err != nil {
log.Error().Err(err).Msgf("queue: evict_at_once: %v", stepsToEvict)
}
if err := server.Config.Services.Queue.ErrorAtOnce(ctx, stepsToEvict, queue.ErrCancel); err != nil {
log.Error().Err(err).Msgf("queue: evict_at_once: %v", stepsToEvict)
if len(workflowsToEvict) != 0 {
if err := server.Config.Services.Queue.ErrorAtOnce(ctx, workflowsToEvict, queue.ErrCancel); err != nil {
Comment thread
lafriks marked this conversation as resolved.
log.Error().Err(err).Msgf("queue: evict_at_once: %v", workflowsToEvict)
}
}
if len(stepsToCancel) != 0 {
if err := server.Config.Services.Queue.ErrorAtOnce(ctx, stepsToCancel, queue.ErrCancel); err != nil {
log.Error().Err(err).Msgf("queue: evict_at_once: %v", stepsToCancel)
if len(workflowsToCancel) != 0 {
if err := server.Config.Services.Queue.ErrorAtOnce(ctx, workflowsToCancel, queue.ErrCancel); err != nil {
log.Error().Err(err).Msgf("queue: evict_at_once: %v", workflowsToCancel)
}
}

Expand Down
29 changes: 0 additions & 29 deletions server/queue/LICENSE

This file was deleted.

115 changes: 55 additions & 60 deletions server/queue/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package queue
import (
"container/list"
"context"
"fmt"
"errors"
"slices"
"sync"
"time"
Expand Down Expand Up @@ -58,8 +58,6 @@ type fifo struct {
// as the agent pull in 10 milliseconds we should also give them work asap.
const processTimeInterval = 100 * time.Millisecond

var ErrWorkerKicked = fmt.Errorf("worker was kicked")

// NewMemoryQueue returns a new fifo queue.
func NewMemoryQueue(ctx context.Context) Queue {
q := &fifo{
Expand Down Expand Up @@ -90,23 +88,23 @@ func (q *fifo) Poll(c context.Context, agentID int64, filter FilterFn) (*model.T
q.Lock()
ctx, stop := context.WithCancelCause(c)

_worker := &worker{
w := &worker{
agentID: agentID,
channel: make(chan *model.Task, 1),
filter: filter,
stop: stop,
}
q.workers[_worker] = struct{}{}
q.workers[w] = struct{}{}
q.Unlock()

for {
select {
case <-ctx.Done():
q.Lock()
delete(q.workers, _worker)
delete(q.workers, w)
q.Unlock()
return nil, ctx.Err()
case t := <-_worker.channel:
case t := <-w.channel:
return t, nil
}
}
Expand All @@ -122,47 +120,40 @@ func (q *fifo) Error(_ context.Context, id string, err error) error {
return q.finished([]string{id}, model.StatusFailure, err)
}

// ErrorAtOnce signals multiple done are complete with an error.
// ErrorAtOnce signals multiple tasks are done and complete with an error.
// If still pending they will just get removed from the queue.
func (q *fifo) ErrorAtOnce(_ context.Context, ids []string, err error) error {
if errors.Is(err, ErrCancel) {
return q.finished(ids, model.StatusKilled, err)
}
return q.finished(ids, model.StatusFailure, err)
}

// locks the queue itself!
func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) error {
q.Lock()
defer q.Unlock()

var errs []error
// we first process the tasks themselves
for _, id := range ids {
taskEntry, ok := q.running[id]
if ok {
if taskEntry, ok := q.running[id]; ok {
taskEntry.error = err
close(taskEntry.done)
delete(q.running, id)
} else {
q.removeFromPending(id)
errs = append(errs, q.removeFromPendingAndWaiting(id))
}
q.updateDepStatusInQueue(id, exitStatus)
}

q.Unlock()
return nil
}

// EvictAtOnce removes multiple pending tasks from the queue.
func (q *fifo) EvictAtOnce(_ context.Context, taskIDs []string) error {
q.Lock()
defer q.Unlock()

for _, id := range taskIDs {
var next *list.Element
for element := q.pending.Front(); element != nil; element = next {
next = element.Next()
task, ok := element.Value.(*model.Task)
if ok && task.ID == id {
q.pending.Remove(element)
return nil
}
}
// next we take care of their dependencies
// we do this in a separate pass because our ids list could contain tasks as well as their dependencies,
// so as not to mess things up
for _, id := range ids {
q.updateDepStatusInQueue(id, exitStatus)
}
return ErrNotFound

return errors.Join(errs...)
}

// Wait waits until the item is done executing.
Expand Down Expand Up @@ -286,19 +277,15 @@ func (q *fifo) process() {

func (q *fifo) filterWaiting() {
// resubmits all waiting tasks to pending, deps may have cleared
var nextWaiting *list.Element
for e := q.waitingOnDeps.Front(); e != nil; e = nextWaiting {
nextWaiting = e.Next()
task, _ := e.Value.(*model.Task)
for element := q.waitingOnDeps.Front(); element != nil; element = element.Next() {
task, _ := element.Value.(*model.Task)
q.pending.PushBack(task)
}

// rebuild waitingOnDeps
q.waitingOnDeps = list.New()
var filtered []*list.Element
var nextPending *list.Element
for element := q.pending.Front(); element != nil; element = nextPending {
nextPending = element.Next()
for element := q.pending.Front(); element != nil; element = element.Next() {
task, _ := element.Value.(*model.Task)
if q.depsInQueue(task) {
log.Debug().Msgf("queue: waiting due to unmet dependencies %v", task.ID)
Expand All @@ -314,12 +301,10 @@ func (q *fifo) filterWaiting() {
}

func (q *fifo) assignToWorker() (*list.Element, *worker) {
var next *list.Element
var bestWorker *worker
var bestScore int

for element := q.pending.Front(); element != nil; element = next {
next = element.Next()
for element := q.pending.Front(); element != nil; element = element.Next() {
task, _ := element.Value.(*model.Task)
log.Debug().Msgf("queue: trying to assign task: %v with deps %v", task.ID, task.Dependencies)

Expand Down Expand Up @@ -352,9 +337,7 @@ func (q *fifo) resubmitExpiredPipelines() {
}

func (q *fifo) depsInQueue(task *model.Task) bool {
var next *list.Element
for element := q.pending.Front(); element != nil; element = next {
next = element.Next()
for element := q.pending.Front(); element != nil; element = element.Next() {
possibleDep, ok := element.Value.(*model.Task)
log.Debug().Msgf("queue: pending right now: %v", possibleDep.ID)
for _, dep := range task.Dependencies {
Expand All @@ -372,13 +355,12 @@ func (q *fifo) depsInQueue(task *model.Task) bool {
return false
}

// expects the q to be currently owned, i.e. locked by the caller!
func (q *fifo) updateDepStatusInQueue(taskID string, status model.StatusValue) {
var next *list.Element
for element := q.pending.Front(); element != nil; element = next {
next = element.Next()
pending, ok := element.Value.(*model.Task)
for element := q.pending.Front(); element != nil; element = element.Next() {
pending, _ := element.Value.(*model.Task)
for _, dep := range pending.Dependencies {
if ok && taskID == dep {
if taskID == dep {
pending.DepStatus[dep] = status
}
}
Expand All @@ -392,27 +374,40 @@ func (q *fifo) updateDepStatusInQueue(taskID string, status model.StatusValue) {
}
}

for element := q.waitingOnDeps.Front(); element != nil; element = next {
next = element.Next()
waiting, ok := element.Value.(*model.Task)
for element := q.waitingOnDeps.Front(); element != nil; element = element.Next() {
waiting, _ := element.Value.(*model.Task)
for _, dep := range waiting.Dependencies {
if ok && taskID == dep {
if taskID == dep {
waiting.DepStatus[dep] = status
}
}
}
}

func (q *fifo) removeFromPending(taskID string) {
// expects the q to be currently owned, i.e. locked by the caller!
func (q *fifo) removeFromPendingAndWaiting(taskID string) error {
log.Debug().Msgf("queue: trying to remove %s", taskID)
var next *list.Element
for element := q.pending.Front(); element != nil; element = next {
next = element.Next()

// we assume pending first
for element := q.pending.Front(); element != nil; element = element.Next() {
task, _ := element.Value.(*model.Task)
if task.ID == taskID {
log.Debug().Msgf("queue: %s is removed from pending", taskID)
q.pending.Remove(element)
return
_ = q.pending.Remove(element)
return nil
}
}

// well looks like it's waiting
for element := q.waitingOnDeps.Front(); element != nil; element = element.Next() {
task, _ := element.Value.(*model.Task)
if task.ID == taskID {
log.Debug().Msgf("queue: %s is removed from waitingOnDeps", taskID)
_ = q.waitingOnDeps.Remove(element)
return nil
}
}

// well it could not be found
return ErrNotFound
}
Loading