Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/controllers/node/termination/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,15 +389,16 @@ func (c *Controller) nodeTerminationTime(node *corev1.Node, nodeClaim *v1.NodeCl
func (c *Controller) Register(ctx context.Context, m manager.Manager) error {
maxConcurrentReconciles := utilscontroller.LinearScaleReconciles(utilscontroller.CPUCount(ctx), 100, 5000)
log.FromContext(ctx).V(1).Info("node.termination maxConcurrentReconciles set", "maxConcurrentReconciles", maxConcurrentReconciles)
qps, bucketSize := utilscontroller.GetTypedBucketConfigs(10, 100, 5000)
return controllerruntime.NewControllerManagedBy(m).
Named("node.termination").
For(&corev1.Node{}, builder.WithPredicates(nodeutils.IsManagedPredicateFuncs(c.cloudProvider))).
WithOptions(
controller.Options{
RateLimiter: workqueue.NewTypedMaxOfRateLimiter[reconcile.Request](
workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](100*time.Millisecond, 10*time.Second),
// 10 qps, 100 bucket size
&workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
// qps scales linearly at 10% of concurrentReconciles, bucket size is 10 * qps
&workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(qps), bucketSize)},
),
MaxConcurrentReconciles: maxConcurrentReconciles,
},
Expand Down
4 changes: 3 additions & 1 deletion pkg/controllers/node/termination/terminator/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (q *Queue) Name() string {
func (q *Queue) Register(ctx context.Context, m manager.Manager) error {
maxConcurrentReconciles := utilscontroller.LinearScaleReconciles(utilscontroller.CPUCount(ctx), 100, 5000)
log.FromContext(ctx).V(1).Info("eviction-queue maxConcurrentReconciles set", "maxConcurrentReconciles", maxConcurrentReconciles)
qps, bucketSize := utilscontroller.GetTypedBucketConfigs(100, 100, 5000)
return controllerruntime.NewControllerManagedBy(m).
Named(q.Name()).
WatchesRawSource(source.Channel(q.source, handler.TypedFuncs[*corev1.Pod, reconcile.Request]{
Expand All @@ -126,7 +127,8 @@ func (q *Queue) Register(ctx context.Context, m manager.Manager) error {
WithOptions(controller.Options{
RateLimiter: workqueue.NewTypedMaxOfRateLimiter[reconcile.Request](
workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](evictionQueueBaseDelay, evictionQueueMaxDelay),
&workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(100), 1000)},
// qps scales linearly with concurrentReconciles, bucket size is 10 * qps
&workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(qps), bucketSize)},
),
MaxConcurrentReconciles: maxConcurrentReconciles,
}).
Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/nodeclaim/lifecycle/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (c *Controller) Register(ctx context.Context, m manager.Manager) error {
// higher concurrency limit since we want fast reaction to node syncing and launch
maxConcurrentReconciles := utilscontroller.LinearScaleReconciles(utilscontroller.CPUCount(ctx), 1000, 5000)
log.FromContext(ctx).V(1).Info("nodeclaim.lifecycle maxConcurrentReconciles set", "maxConcurrentReconciles", maxConcurrentReconciles)
qps, bucketSize := utilscontroller.GetTypedBucketConfigs(10, 1000, 5000)
return controllerruntime.NewControllerManagedBy(m).
Named(c.Name()).
For(&v1.NodeClaim{}, builder.WithPredicates(nodeclaimutils.IsManagedPredicateFuncs(c.cloudProvider))).
Expand All @@ -95,8 +96,8 @@ func (c *Controller) Register(ctx context.Context, m manager.Manager) error {
RateLimiter: workqueue.NewTypedMaxOfRateLimiter[reconcile.Request](
// back off until last attempt occurs ~90 seconds before nodeclaim expiration
workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](time.Second, time.Minute),
// 10 qps, 100 bucket size
&workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
// qps scales linearly at 1% of concurrentReconciles, bucket size is 10 * qps
&workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(qps), bucketSize)},
),
MaxConcurrentReconciles: maxConcurrentReconciles,
}).
Expand Down
7 changes: 7 additions & 0 deletions pkg/utils/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,10 @@ func LinearScaleReconciles(cpuCount float64, minReconciles int, maxReconciles in
// Clamp to ensure we stay within bounds
return lo.Clamp(result, minReconciles, maxReconciles)
}

func GetTypedBucketConfigs(minQPS int, minReconciles int, concurrentReconciles int) (int, int) {
qpsScaleFactor := float64(minQPS) / float64(minReconciles)
qps := int(math.Ceil(float64(concurrentReconciles) * qpsScaleFactor))
bucketSize := qps * 10
return qps, bucketSize
}
17 changes: 16 additions & 1 deletion pkg/utils/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

func TestReconciles(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Reconciles")
RunSpecs(t, "ControllerUtils")
}

var _ = Describe("ControllerUtils", func() {
Expand All @@ -48,4 +48,19 @@ var _ = Describe("ControllerUtils", func() {
Entry("100 CPU cores should return maxReconciles (clamped)", 100.0, 1000),
)
})
Context("GetTypedBucketConfigs calculations", func() {
DescribeTable("should calculate QPS and bucket size correctly",
func(minQPS, minReconciles, concurrentReconciles, expectedQPS, expectedBucketSize int) {
qps, bucketSize := controller.GetTypedBucketConfigs(minQPS, minReconciles, concurrentReconciles)
Expect(qps).To(Equal(expectedQPS))
Expect(bucketSize).To(Equal(expectedBucketSize))
},
// Arguments are: minQPS, minReconciles, concurrentReconciles, expectedQPS, expectedBucketSize
Entry("scale of QPS is 100%, concurrentReconciles is equal to minimumReconciles", 10, 10, 10, 10, 100),
Entry("scale of QPS is 100%, concurrentReconciles is double minimumReconciles", 10, 10, 20, 20, 200),
Entry("scale of QPS is 10%, concurrentReconciles is equal to minimumReconciles", 10, 100, 100, 10, 100),
Entry("scale of QPS is 10%, concurrentReconciles is double minimumReconciles", 10, 100, 200, 20, 200),
Entry("scale of QPS is 25%, concurrentReconciles is 1.5x minimumReconciles", 25, 100, 150, 38, 380),
)
})
})
Loading