Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion server/queue/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) e
q.Lock()
defer q.Unlock()

// it's an external error so we wrap it
err = NewErrExternal(err)

var errs []error
// we first process the tasks itself
for _, id := range ids {
Expand Down Expand Up @@ -166,7 +169,10 @@ func (q *fifo) Wait(ctx context.Context, taskID string) error {
select {
case <-ctx.Done():
case <-state.done:
return state.error
// only return queue errors and no workflow errors
if !errors.Is(state.error, new(ErrExternal)) {
Comment thread
6543 marked this conversation as resolved.
Comment thread
6543 marked this conversation as resolved.
return state.error
}
}
}
return nil
Expand Down
97 changes: 97 additions & 0 deletions server/queue/fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,103 @@ func TestFifoBasicOperations(t *testing.T) {
}
})

t.Run("external error filtered by Wait", func(t *testing.T) {
// Test that external errors (from Error/ErrorAtOnce) are wrapped as ErrExternal
// and filtered out by Wait(), while internal errors like context cancellation
// are passed through

// Test 1: External error is filtered by Wait
task1 := &model.Task{ID: "wait-external-1"}
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task1}))
waitForProcess()

got1, err := q.Poll(ctx, 1, filterFnTrue)
assert.NoError(t, err)

// Start waiting on the task
waitDone := make(chan error, 1)
go func() {
waitDone <- q.Wait(ctx, got1.ID)
}()

time.Sleep(10 * time.Millisecond)

// Report an external error (agent reported error)
externalErr := fmt.Errorf("agent reported error")
assert.NoError(t, q.Error(ctx, got1.ID, externalErr))

// Wait should return nil (external error filtered out)
select {
case err := <-waitDone:
assert.NoError(t, err, "Wait should filter ErrExternal and return nil")
case <-time.After(time.Second):
t.Fatal("Wait should have returned")
}

// Test 2: Internal error (context cancellation) passes through Wait
task2 := &model.Task{ID: "wait-internal-1"}
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2}))
waitForProcess()

got2, err := q.Poll(ctx, 2, filterFnTrue)
assert.NoError(t, err)

waitCtx, waitCancel := context.WithCancelCause(ctx)
waitDone2 := make(chan error, 1)
go func() {
waitDone2 <- q.Wait(waitCtx, got2.ID)
}()

time.Sleep(10 * time.Millisecond)
waitCancel(nil)

// Context cancellation should cause Wait to return (internal error handling)
select {
case err := <-waitDone2:
// Wait returns nil when context is canceled (normal behavior)
assert.NoError(t, err, "Wait should return nil when context is canceled")
case <-time.After(time.Second):
t.Fatal("Wait should return when context is canceled")
}

// Clean up
assert.NoError(t, q.Done(ctx, got2.ID, model.StatusSuccess))
waitForProcess()

// Test 3: Multiple waiters all get nil when external error occurs
task3 := &model.Task{ID: "wait-multi-1"}
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task3}))
waitForProcess()

got3, err := q.Poll(ctx, 3, filterFnTrue)
assert.NoError(t, err)

// Start multiple waiters
numWaiters := 3
waitResults := make(chan error, numWaiters)
for i := 0; i < numWaiters; i++ {
go func() {
waitResults <- q.Wait(ctx, got3.ID)
}()
}

time.Sleep(10 * time.Millisecond)

// Report an external error
batchErr := fmt.Errorf("external batch failure")
assert.NoError(t, q.ErrorAtOnce(ctx, []string{got3.ID}, batchErr))

// All waiters should return nil (external error filtered)
for i := 0; i < numWaiters; i++ {
select {
case err := <-waitResults:
assert.NoError(t, err, "All waiters should get nil when ErrExternal is filtered")
case <-time.After(time.Second):
t.Fatalf("Waiter %d didn't return in time", i)
}
}
})

t.Run("error at once", func(t *testing.T) {
task1 := &model.Task{ID: "batch-1"}
task2 := &model.Task{ID: "batch-2"}
Expand Down
28 changes: 28 additions & 0 deletions server/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,34 @@ var (
ErrWorkerKicked = errors.New("worker was kicked")
)

// ErrExternal wraps an external error.
type ErrExternal struct {
err error
}

func (e *ErrExternal) Error() string {
return fmt.Sprintf("external error: %s", e.err)
}

// Unwrap allows errors.Is and errors.As to work with the wrapped error.
func (e *ErrExternal) Unwrap() error {
return e.err
}

// Is allows errors.Is to match against ErrExternal types.
func (e *ErrExternal) Is(target error) bool {
_, ok := target.(*ErrExternal)
return ok
}

// NewErrExternal wraps an error as external one so queue can filter it out if needed.
func NewErrExternal(err error) error {
if err == nil {
return nil
}
return &ErrExternal{err: err}
}

// InfoT provides runtime information.
type InfoT struct {
Pending []*model.Task `json:"pending"`
Expand Down