Skip to content
14 changes: 12 additions & 2 deletions pkg/controllers/disruption/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ import (
const (
queueBaseDelay = 1 * time.Second
queueMaxDelay = 10 * time.Second
maxRetryDuration = 10 * time.Minute
minRetryDuration = 10 * time.Minute
maxRetryDuration = 1 * time.Hour
maxConcurrentReconciles = 100
)

Expand All @@ -80,6 +81,11 @@ func IsUnrecoverableError(err error) bool {
return stderrors.As(err, &unrecoverableError)
}

func CalculateRetryDuration(numCommands int) time.Duration {
retryDuration := time.Duration(numCommands) * 80 * time.Millisecond
return lo.Clamp(retryDuration, minRetryDuration, maxRetryDuration)
}

type Queue struct {
sync.RWMutex
providerIDToCommand map[string]*Command // providerID -> command, maps a candidate to its command
Expand Down Expand Up @@ -168,9 +174,13 @@ func (q *Queue) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reconci
// Once the replacements are ready, it will terminate the candidates.
// nolint:gocyclo
func (q *Queue) waitOrTerminate(ctx context.Context, cmd *Command) (err error) {
q.RLock()
numCommands := len(q.providerIDToCommand)
q.RUnlock()
retryDuration := CalculateRetryDuration(numCommands)
// Wrap an error in an unrecoverable error if it timed out
defer func() {
if q.clock.Since(cmd.CreationTimestamp) > maxRetryDuration {
if q.clock.Since(cmd.CreationTimestamp) > retryDuration {
err = NewUnrecoverableError(serrors.Wrap(fmt.Errorf("command reached timeout, %w", err), "duration", q.clock.Since(cmd.CreationTimestamp)))
}
}()
Expand Down
14 changes: 14 additions & 0 deletions pkg/controllers/disruption/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,5 +413,19 @@ var _ = Describe("Queue", func() {
// And expect the nodeClaim and node to be deleted
ExpectNotFound(ctx, env.Client, nodeClaim2, node2)
})
Context("CalculateRetryDuration", func() {
DescribeTable("should calculate correct timeout based on queue length",
func(numCommands int, expectedDuration time.Duration) {
actualDuration := disruption.CalculateRetryDuration(numCommands)
Expect(actualDuration).To(Equal(expectedDuration))
},
Entry("very small queue - 100 commands", 100, 10*time.Minute), // max(100*80ms, 10min) = 10min
Entry("small queue - 4000 commands", 4000, 10*time.Minute), // max(4000*80ms, 10min) = 10min
Entry("medium queue - 10000 commands", 10000, 13*time.Minute+20*time.Second), // 10000*80ms = 13min 20sec
Entry("large queue - 40000 commands", 40000, 53*time.Minute+20*time.Second), // 40000*80ms = 53min 20sec
Entry("very large queue - 80000 commands (capped)", 80000, 1*time.Hour), // min(80000*80ms, 1hr) = 1hr
Entry("extremely large queue - 100000 commands (capped)", 100000, 1*time.Hour), // min(100000*80ms, 1hr) = 1hr
)
})
})
})
Loading