diff --git a/server/queue/fifo.go b/server/queue/fifo.go index 3b370186ebe..e215c817a1c 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -134,6 +134,9 @@ func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) e q.Lock() defer q.Unlock() + // it's an external error so we wrap it + err = NewErrExternal(err) + var errs []error // we first process the tasks itself for _, id := range ids { @@ -166,7 +169,10 @@ func (q *fifo) Wait(ctx context.Context, taskID string) error { select { case <-ctx.Done(): case <-state.done: - return state.error + // only return queue errors and no workflow errors + if !errors.Is(state.error, new(ErrExternal)) { + return state.error + } } } return nil diff --git a/server/queue/fifo_test.go b/server/queue/fifo_test.go index d6ba5eabbfd..2904d503f8e 100644 --- a/server/queue/fifo_test.go +++ b/server/queue/fifo_test.go @@ -114,6 +114,103 @@ func TestFifoBasicOperations(t *testing.T) { } }) + t.Run("external error filtered by Wait", func(t *testing.T) { + // Test that external errors (from Error/ErrorAtOnce) are wrapped as ErrExternal + // and filtered out by Wait(), while internal errors like context cancellation + // are passed through + + // Test 1: External error is filtered by Wait + task1 := &model.Task{ID: "wait-external-1"} + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task1})) + waitForProcess() + + got1, err := q.Poll(ctx, 1, filterFnTrue) + assert.NoError(t, err) + + // Start waiting on the task + waitDone := make(chan error, 1) + go func() { + waitDone <- q.Wait(ctx, got1.ID) + }() + + time.Sleep(10 * time.Millisecond) + + // Report an external error (agent reported error) + externalErr := fmt.Errorf("agent reported error") + assert.NoError(t, q.Error(ctx, got1.ID, externalErr)) + + // Wait should return nil (external error filtered out) + select { + case err := <-waitDone: + assert.NoError(t, err, "Wait should filter ErrExternal and return nil") + case <-time.After(time.Second): + 
t.Fatal("Wait should have returned") + } + + // Test 2: Internal error (context cancellation) passes through Wait + task2 := &model.Task{ID: "wait-internal-1"} + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2})) + waitForProcess() + + got2, err := q.Poll(ctx, 2, filterFnTrue) + assert.NoError(t, err) + + waitCtx, waitCancel := context.WithCancelCause(ctx) + waitDone2 := make(chan error, 1) + go func() { + waitDone2 <- q.Wait(waitCtx, got2.ID) + }() + + time.Sleep(10 * time.Millisecond) + waitCancel(nil) + + // Context cancellation should cause Wait to return (internal error handling) + select { + case err := <-waitDone2: + // Wait returns nil when context is canceled (normal behavior) + assert.NoError(t, err, "Wait should return nil when context is canceled") + case <-time.After(time.Second): + t.Fatal("Wait should return when context is canceled") + } + + // Clean up + assert.NoError(t, q.Done(ctx, got2.ID, model.StatusSuccess)) + waitForProcess() + + // Test 3: Multiple waiters all get nil when external error occurs + task3 := &model.Task{ID: "wait-multi-1"} + assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task3})) + waitForProcess() + + got3, err := q.Poll(ctx, 3, filterFnTrue) + assert.NoError(t, err) + + // Start multiple waiters + numWaiters := 3 + waitResults := make(chan error, numWaiters) + for i := 0; i < numWaiters; i++ { + go func() { + waitResults <- q.Wait(ctx, got3.ID) + }() + } + + time.Sleep(10 * time.Millisecond) + + // Report an external error + batchErr := fmt.Errorf("external batch failure") + assert.NoError(t, q.ErrorAtOnce(ctx, []string{got3.ID}, batchErr)) + + // All waiters should return nil (external error filtered) + for i := 0; i < numWaiters; i++ { + select { + case err := <-waitResults: + assert.NoError(t, err, "All waiters should get nil when ErrExternal is filtered") + case <-time.After(time.Second): + t.Fatalf("Waiter %d didn't return in time", i) + } + } + }) + t.Run("error at once", func(t *testing.T) { task1 := 
// ErrExternal wraps an error that was handed to the queue from outside
// (an "external" error), so that callers can tell it apart from errors the
// queue produced itself, e.g. with errors.Is(err, new(ErrExternal)).
type ErrExternal struct {
	err error
}

// Compile-time check that *ErrExternal satisfies the error interface.
var _ error = (*ErrExternal)(nil)

// Error implements the error interface. It prefixes the wrapped error's
// message with "external error: ". A nil receiver or a zero-value
// ErrExternal (no wrapped error) yields just "external error" instead of
// panicking or printing a fmt placeholder.
func (e *ErrExternal) Error() string {
	if e == nil || e.err == nil {
		return "external error"
	}
	return fmt.Sprintf("external error: %s", e.err)
}

// Unwrap returns the wrapped error so errors.Is and errors.As can inspect it.
// It returns nil for a nil receiver.
func (e *ErrExternal) Unwrap() error {
	if e == nil {
		return nil
	}
	return e.err
}

// Is reports whether target is of type *ErrExternal. The comparison is purely
// by type: any *ErrExternal target matches, regardless of what it wraps, which
// is what makes errors.Is(err, new(ErrExternal)) usable as a type test.
func (e *ErrExternal) Is(target error) bool {
	_, ok := target.(*ErrExternal)
	return ok
}

// NewErrExternal marks err as an external error so the queue can filter it out
// if needed.
//
// It returns nil when err is nil, so a nil error never becomes a non-nil
// wrapper. When err itself is already a non-nil *ErrExternal it is returned
// unchanged, so repeated wrapping does not stack "external error: " prefixes.
func NewErrExternal(err error) error {
	if err == nil {
		return nil
	}
	if ext, ok := err.(*ErrExternal); ok && ext != nil {
		return ext
	}
	return &ErrExternal{err: err}
}