Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions pkg/controllers/node/termination/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,17 @@ import (
terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/metrics"
utilscontroller "sigs.k8s.io/karpenter/pkg/utils/controller"
nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
"sigs.k8s.io/karpenter/pkg/utils/pod"
volumeutil "sigs.k8s.io/karpenter/pkg/utils/volume"
)

const (
minReconciles = 100
maxReconciles = 5000
)

// Controller for the resource
type Controller struct {
clock clock.Clock
Expand Down Expand Up @@ -385,18 +391,21 @@ func (c *Controller) nodeTerminationTime(node *corev1.Node, nodeClaim *v1.NodeCl
return &expirationTime, nil
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
func (c *Controller) Register(ctx context.Context, m manager.Manager) error {
concurrentReconciles := utilscontroller.LinearScaleReconciles(ctx, minReconciles, maxReconciles)
qps := concurrentReconciles / 10
bucketSize := 10 * qps
return controllerruntime.NewControllerManagedBy(m).
Named("node.termination").
For(&corev1.Node{}, builder.WithPredicates(nodeutils.IsManagedPredicateFuncs(c.cloudProvider))).
WithOptions(
controller.Options{
RateLimiter: workqueue.NewTypedMaxOfRateLimiter[reconcile.Request](
workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](100*time.Millisecond, 10*time.Second),
// 10 qps, 100 bucket size
&workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
// qps scales linearly at 10% of concurrentReconciles, bucket size is 10 * qps
&workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(qps), bucketSize)},
),
MaxConcurrentReconciles: 100,
MaxConcurrentReconciles: concurrentReconciles,
},
).
Complete(reconcile.AsReconciler(m.GetClient(), c))
Expand Down
13 changes: 10 additions & 3 deletions pkg/controllers/node/termination/terminator/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,16 @@ import (
terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/operator/injection"
utilscontroller "sigs.k8s.io/karpenter/pkg/utils/controller"
nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
podutils "sigs.k8s.io/karpenter/pkg/utils/pod"
)

const (
evictionQueueBaseDelay = 100 * time.Millisecond
evictionQueueMaxDelay = 10 * time.Second
minReconciles = 100
maxReconciles = 5000

multiplePodDisruptionBudgetsError = "This pod has more than one PodDisruptionBudget, which the eviction subresource does not support."
)
Expand Down Expand Up @@ -110,7 +113,10 @@ func (q *Queue) Name() string {
return "eviction-queue"
}

func (q *Queue) Register(_ context.Context, m manager.Manager) error {
func (q *Queue) Register(ctx context.Context, m manager.Manager) error {
concurrentReconciles := utilscontroller.LinearScaleReconciles(ctx, minReconciles, maxReconciles)
qps := concurrentReconciles
bucketSize := 10 * qps
return controllerruntime.NewControllerManagedBy(m).
Named(q.Name()).
WatchesRawSource(source.Channel(q.source, handler.TypedFuncs[*corev1.Pod, reconcile.Request]{
Expand All @@ -123,9 +129,10 @@ func (q *Queue) Register(_ context.Context, m manager.Manager) error {
WithOptions(controller.Options{
RateLimiter: workqueue.NewTypedMaxOfRateLimiter[reconcile.Request](
workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](evictionQueueBaseDelay, evictionQueueMaxDelay),
&workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(100), 1000)},
// qps scales linearly with concurrentReconciles, bucket size is 10 * qps
&workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(qps), bucketSize)},
),
MaxConcurrentReconciles: 100,
MaxConcurrentReconciles: concurrentReconciles,
}).
Complete(reconcile.AsReconciler(m.GetClient(), q))
}
Expand Down
18 changes: 14 additions & 4 deletions pkg/controllers/nodeclaim/lifecycle/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,17 @@ import (
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/operator/injection"
utilscontroller "sigs.k8s.io/karpenter/pkg/utils/controller"
nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
"sigs.k8s.io/karpenter/pkg/utils/result"
)

const (
// higher concurrency limit since we want fast reaction to node syncing and launch
minReconciles = 1000
maxReconciles = 5000
)

// Controller is a NodeClaim Lifecycle controller that manages the lifecycle of the NodeClaim up until its termination
// The controller is responsible for ensuring that new Nodes get launched, that they have properly registered with
// the cluster as nodes and that they are properly initialized, ensuring that nodeclaims that do not have matching nodes
Expand Down Expand Up @@ -79,7 +86,10 @@ func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider clou
}
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
func (c *Controller) Register(ctx context.Context, m manager.Manager) error {
concurrentReconciles := utilscontroller.LinearScaleReconciles(ctx, minReconciles, maxReconciles)
qps := concurrentReconciles / 100
bucketSize := 10 * qps
return controllerruntime.NewControllerManagedBy(m).
Named(c.Name()).
For(&v1.NodeClaim{}, builder.WithPredicates(nodeclaimutils.IsManagedPredicateFuncs(c.cloudProvider))).
Expand All @@ -91,10 +101,10 @@ func (c *Controller) Register(_ context.Context, m manager.Manager) error {
RateLimiter: workqueue.NewTypedMaxOfRateLimiter[reconcile.Request](
// back off until last attempt occurs ~90 seconds before nodeclaim expiration
workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](time.Second, time.Minute),
// 10 qps, 100 bucket size
&workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
// qps scales linearly at 1% of concurrentReconciles, bucket size is 10 * qps
&workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(qps), bucketSize)},
),
MaxConcurrentReconciles: 1000, // higher concurrency limit since we want fast reaction to node syncing and launch
MaxConcurrentReconciles: concurrentReconciles,
}).
Complete(reconcile.AsReconciler(m.GetClient(), c))
}
Expand Down
41 changes: 41 additions & 0 deletions pkg/utils/controller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"context"
"math"

"github.com/samber/lo"

"sigs.k8s.io/karpenter/pkg/operator/options"
)

// cpuCount calculates CPU count in cores from context options (in millicores)
func cpuCount(ctx context.Context) int {
return int(math.Ceil(float64(options.FromContext(ctx).CPURequests) / 1000.0))
}

// LinearScaleReconciles calculates maxConcurrentReconciles using linear scaling
func LinearScaleReconciles(ctx context.Context, minReconciles int, maxReconciles int) int {
cpuCount := cpuCount(ctx)
// At 1 core: minReconciles; At 60 cores: maxReconciles
slope := float64(maxReconciles-minReconciles) / 59.0
result := int(slope*float64(cpuCount-1)) + minReconciles
// Clamp to ensure we stay within bounds
return lo.Clamp(result, minReconciles, maxReconciles)
}
85 changes: 85 additions & 0 deletions pkg/utils/controller/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
Copyright The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller_test

import (
"context"
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"sigs.k8s.io/karpenter/pkg/operator/options"
"sigs.k8s.io/karpenter/pkg/utils/controller"
)

func TestReconciles(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Reconciles")
}

func contextWithCPURequests(cpuRequests int64) context.Context {
opts := &options.Options{
CPURequests: cpuRequests,
}
return opts.ToContext(context.Background())
}

var _ = Describe("Reconciles", func() {
minReconciles := 10
maxReconciles := 1000
Context("LinearScaleReconciles Calculations", func() {
It("should calculate minReconciles for 0.5 CPU core", func() {
ctx := contextWithCPURequests(0.5 * 1000.0)
result := controller.LinearScaleReconciles(ctx, minReconciles, maxReconciles)
Expect(result).To(Equal(minReconciles))
})
It("should calculate minReconciles for 1 CPU core", func() {
ctx := contextWithCPURequests(1 * 1000)
result := controller.LinearScaleReconciles(ctx, minReconciles, maxReconciles)
Expect(result).To(Equal(minReconciles))
})
It("should calculate maxReconciles for 60 CPU cores", func() {
ctx := contextWithCPURequests(60 * 1000)
result := controller.LinearScaleReconciles(ctx, minReconciles, maxReconciles)
Expect(result).To(Equal(maxReconciles))
})
It("should calculate maxReconciles for 100 CPU cores", func() {
ctx := contextWithCPURequests(100 * 1000)
result := controller.LinearScaleReconciles(ctx, minReconciles, maxReconciles)
Expect(result).To(Equal(maxReconciles))
})
It("should follow the linear scaling formula", func() {
ctx := contextWithCPURequests(15 * 1000) // 15 cores
result := controller.LinearScaleReconciles(ctx, minReconciles, maxReconciles)
// At 15 cores
// slope = (maxReconciles - minReconciles)/59 = (1000-10)/59 = 990/59 = ~16.78
// result = int(slope * (cores - 1)) + minReconciles ~= 16.78 * (15-1) + 10 = 234 + 10 = 244
expected := 244
Expect(result).To(Equal(expected))
})
It("should handle fractional CPU cores correctly", func() {
ctx := contextWithCPURequests(1.5 * 1000.0) // 1.5 cores
result := controller.LinearScaleReconciles(ctx, minReconciles, maxReconciles)
// At 2 cores (ceil(1.5))
// slope = (maxReconciles - minReconciles)/59 = (1000-10)/59 = 990/59 = ~16.78
// result = int(slope * (cores - 1)) + minReconciles ~= 16.78 * (2-1) + 10 = 16 + 10 = 26
expected := 26
Expect(result).To(Equal(expected))
})
})
})
Loading