Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion cluster-autoscaler/cloudprovider/resource_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package cloudprovider

import (
"fmt"
"k8s.io/apimachinery/pkg/util/sets"
"math"
"strings"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
)

// ResourceLimiter contains limits (max, min) for resources (cores, memory etc.).
Expand All @@ -29,6 +31,10 @@ type ResourceLimiter struct {
maxLimits map[string]int64
}

// ID returns the identifier of this limiter. The value is the same fixed
// string for every ResourceLimiter.
func (r *ResourceLimiter) ID() string {
	const clusterWideID = "cluster-wide"
	return clusterWideID
}

// NewResourceLimiter creates new ResourceLimiter for map. Maps are deep copied.
func NewResourceLimiter(minLimits map[string]int64, maxLimits map[string]int64) *ResourceLimiter {
minLimitsCopy := make(map[string]int64)
Expand Down Expand Up @@ -88,3 +94,11 @@ func (r *ResourceLimiter) String() string {
}
return strings.Join(resourceDetails, ", ")
}

// AppliesTo reports whether this limiter applies to the given node. It
// returns true for every node; the node argument is not inspected.
func (r *ResourceLimiter) AppliesTo(node *apiv1.Node) bool {
	const appliesToEveryNode = true
	return appliesToEveryNode
}

// Limits returns the maximum limits held by this limiter. The returned map is
// the limiter's own map, not a copy, so writes to it are visible to the
// limiter.
func (r *ResourceLimiter) Limits() map[string]int64 {
	maxByResource := r.maxLimits
	return maxByResource
}
69 changes: 69 additions & 0 deletions cluster-autoscaler/resourcequotas/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package resourcequotas

import (
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
)

// TrackerFactory builds Tracker instances from the quotas reported by its
// providers and the resource usage of the nodes passed to NewQuotasTracker.
type TrackerFactory struct {
// crp is handed to every Tracker this factory creates.
crp customresources.CustomResourcesProcessor
// quotaProviders are queried for quotas each time a Tracker is built.
quotaProviders []Provider
// usageCalculator computes the per-quota resource usage for a set of nodes.
usageCalculator *usageCalculator
}

// TrackerOptions holds the dependencies consumed by NewTrackerFactory.
type TrackerOptions struct {
// CRP is stored on the factory and is also passed to the usage calculator.
CRP customresources.CustomResourcesProcessor
// Providers are the quota sources the factory queries when building a Tracker.
Providers []Provider
// NodeFilter is passed to the usage calculator.
// NOTE(review): presumably it excludes nodes from usage tracking — confirm against usageCalculator.
NodeFilter NodeFilter
}

// NewTrackerFactory returns a TrackerFactory configured from opts. The usage
// calculator is created once here, from opts.CRP and opts.NodeFilter, and is
// reused for every tracker the factory builds.
func NewTrackerFactory(opts TrackerOptions) *TrackerFactory {
	calculator := newUsageCalculator(opts.CRP, opts.NodeFilter)
	factory := new(TrackerFactory)
	factory.crp = opts.CRP
	factory.quotaProviders = opts.Providers
	factory.usageCalculator = calculator
	return factory
}

// NewQuotasTracker builds a Tracker for the given nodes.
//
// It collects quotas from all providers, asks the usage calculator how much
// of each quota the nodes already consume, and seeds the tracker with what is
// left of every limit (limit minus usage, never below zero). An error from
// either the providers or the usage calculator is returned unchanged.
func (f *TrackerFactory) NewQuotasTracker(ctx *context.AutoscalingContext, nodes []*corev1.Node) (*Tracker, error) {
	quotas, err := f.quotas()
	if err != nil {
		return nil, err
	}

	usages, err := f.usageCalculator.calculateUsages(ctx, nodes, quotas)
	if err != nil {
		return nil, err
	}

	limitsLeft := make(map[string]resourceList)
	for _, quota := range quotas {
		id := quota.ID()
		used := usages[id]
		remaining := make(resourceList)
		for resource, limit := range quota.Limits() {
			// Usage above the limit leaves nothing, not a negative amount.
			remaining[resource] = max(0, limit-used[resource])
		}
		limitsLeft[id] = remaining
	}

	return newTracker(f.crp, quotas, limitsLeft), nil
}

// quotas collects the quotas reported by every configured provider, in
// provider order. It stops at the first provider that fails and returns that
// provider's error wrapped with context; no partial result is returned.
func (f *TrackerFactory) quotas() ([]Quota, error) {
	var quotas []Quota
	for _, provider := range f.quotaProviders {
		provQuotas, err := provider.Quotas()
		if err != nil {
			return nil, fmt.Errorf("failed to get quotas from provider: %w", err)
		}
		quotas = append(quotas, provQuotas...)
	}
	return quotas, nil
}
229 changes: 229 additions & 0 deletions cluster-autoscaler/resourcequotas/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
package resourcequotas

import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
cptest "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
"k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
)

// nodeExcludeFn adapts a plain predicate function so the tests can pass it
// wherever a value with an ExcludeFromTracking method is expected.
type nodeExcludeFn func(node *apiv1.Node) bool

// ExcludeFromTracking calls the wrapped function with node and returns its
// result.
func (fn nodeExcludeFn) ExcludeFromTracking(node *apiv1.Node) bool {
	excluded := fn(node)
	return excluded
}

// TestMaxLimitsTracker builds a Tracker from a cloud provider resource limiter
// and a fixed set of nodes, then checks what CheckDelta reports when nodeDelta
// copies of newNode are requested.
//
// Each case may supply a custom resources processor (crp) and a node filter;
// when crp is nil an empty fakeCustomResourcesProcessor is used.
func TestMaxLimitsTracker(t *testing.T) {
	testCases := []struct {
		name       string
		crp        customresources.CustomResourcesProcessor
		nodeFilter NodeFilter
		nodes      []*apiv1.Node
		limits     map[string]int64
		newNode    *apiv1.Node
		nodeDelta  int
		wantResult *CheckDeltaResult
	}{
		{
			name: "default config allowed operation",
			nodes: []*apiv1.Node{
				test.BuildTestNode("n1", 1000, 2*units.GiB),
				test.BuildTestNode("n2", 2000, 4*units.GiB),
				test.BuildTestNode("n3", 3000, 8*units.GiB),
			},
			limits: map[string]int64{
				"cpu":    12,
				"memory": 32 * units.GiB,
			},
			newNode:   test.BuildTestNode("n4", 2000, 4*units.GiB),
			nodeDelta: 2,
			wantResult: &CheckDeltaResult{
				AllowedDelta: 2,
			},
		},
		{
			name: "default config exceeded operation",
			nodes: []*apiv1.Node{
				test.BuildTestNode("n1", 1000, 2*units.GiB),
				test.BuildTestNode("n2", 2000, 4*units.GiB),
				test.BuildTestNode("n3", 3000, 8*units.GiB),
			},
			limits: map[string]int64{
				"cpu":    6,
				"memory": 16 * units.GiB,
			},
			newNode:   test.BuildTestNode("n4", 2000, 4*units.GiB),
			nodeDelta: 2,
			wantResult: &CheckDeltaResult{
				AllowedDelta: 0,
				ExceededResources: map[string][]string{
					"cluster-wide": {"cpu", "memory"},
				},
			},
		},
		{
			// NOTE(review): the name says "partially allowed", yet the expected
			// AllowedDelta is 0, the same outcome as the "exceeded" case above —
			// confirm the limits/newNode values exercise the intended path.
			name: "default config partially allowed operation",
			nodes: []*apiv1.Node{
				test.BuildTestNode("n1", 1000, 2*units.GiB),
				test.BuildTestNode("n2", 2000, 4*units.GiB),
				test.BuildTestNode("n3", 3000, 8*units.GiB),
			},
			limits: map[string]int64{
				"cpu":    7,
				"memory": 16 * units.GiB,
			},
			newNode:   test.BuildTestNode("n4", 2000, 4*units.GiB),
			nodeDelta: 2,
			wantResult: &CheckDeltaResult{
				AllowedDelta: 0,
				ExceededResources: map[string][]string{
					"cluster-wide": {"cpu", "memory"},
				},
			},
		},
		{
			name: "custom resource config allowed operation",
			crp: &fakeCustomResourcesProcessor{
				NodeResourceTargets: func(n *apiv1.Node) []customresources.CustomResourceTarget {
					if n.Name == "n1" {
						return []customresources.CustomResourceTarget{
							{
								ResourceType:  "gpu",
								ResourceCount: 1,
							},
						}
					}
					return nil
				},
			},
			nodes: []*apiv1.Node{
				test.BuildTestNode("n1", 1000, 2*units.GiB),
				test.BuildTestNode("n2", 2000, 4*units.GiB),
				test.BuildTestNode("n3", 3000, 8*units.GiB),
			},
			limits: map[string]int64{
				"cpu":    12,
				"memory": 32 * units.GiB,
				"gpu":    6,
			},
			newNode:   test.BuildTestNode("n4", 2000, 4*units.GiB),
			nodeDelta: 2,
			wantResult: &CheckDeltaResult{
				AllowedDelta: 2,
			},
		},
		{
			name: "custom resource config exceeded operation",
			crp: &fakeCustomResourcesProcessor{
				NodeResourceTargets: func(n *apiv1.Node) []customresources.CustomResourceTarget {
					if n.Name == "n1" || n.Name == "n4" {
						return []customresources.CustomResourceTarget{
							{
								ResourceType:  "gpu",
								ResourceCount: 1,
							},
						}
					}
					return nil
				},
			},
			nodes: []*apiv1.Node{
				test.BuildTestNode("n1", 1000, 2*units.GiB),
				test.BuildTestNode("n2", 2000, 4*units.GiB),
				test.BuildTestNode("n3", 3000, 8*units.GiB),
			},
			limits: map[string]int64{
				"cpu":    12,
				"memory": 32 * units.GiB,
				"gpu":    1,
			},
			newNode:   test.BuildTestNode("n4", 2000, 4*units.GiB),
			nodeDelta: 2,
			wantResult: &CheckDeltaResult{
				AllowedDelta: 0,
				ExceededResources: map[string][]string{
					"cluster-wide": {"gpu"},
				},
			},
		},
		{
			name: "node filter config allowed operation",
			nodeFilter: nodeExcludeFn(func(node *apiv1.Node) bool {
				return node.Name == "n3"
			}),
			nodes: []*apiv1.Node{
				test.BuildTestNode("n1", 1000, 2*units.GiB),
				test.BuildTestNode("n2", 2000, 4*units.GiB),
				test.BuildTestNode("n3", 3000, 8*units.GiB),
			},
			limits: map[string]int64{
				"cpu":    4,
				"memory": 8 * units.GiB,
			},
			newNode:   test.BuildTestNode("n4", 1000, 2*units.GiB),
			nodeDelta: 1,
			wantResult: &CheckDeltaResult{
				AllowedDelta: 1,
			},
		},
		{
			name: "node filter config exceeded operation",
			nodeFilter: nodeExcludeFn(func(node *apiv1.Node) bool {
				return node.Name == "n3"
			}),
			nodes: []*apiv1.Node{
				test.BuildTestNode("n1", 1000, 2*units.GiB),
				test.BuildTestNode("n2", 2000, 4*units.GiB),
				test.BuildTestNode("n3", 3000, 8*units.GiB),
			},
			limits: map[string]int64{
				"cpu":    4,
				"memory": 8 * units.GiB,
			},
			newNode:   test.BuildTestNode("n4", 2000, 4*units.GiB),
			nodeDelta: 1,
			wantResult: &CheckDeltaResult{
				AllowedDelta: 0,
				ExceededResources: map[string][]string{
					"cluster-wide": {"cpu", "memory"},
				},
			},
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			cloudProvider := cptest.NewTestCloudProviderBuilder().Build()
			resourceLimiter := cloudprovider.NewResourceLimiter(nil, tc.limits)
			cloudProvider.SetResourceLimiter(resourceLimiter)
			ctx := &context.AutoscalingContext{CloudProvider: cloudProvider}
			crp := tc.crp
			if crp == nil {
				crp = &fakeCustomResourcesProcessor{}
			}
			factory := NewTrackerFactory(TrackerOptions{
				CRP:        crp,
				Providers:  []Provider{NewCloudQuotasProvider(cloudProvider)},
				NodeFilter: tc.nodeFilter,
			})
			tracker, err := factory.NewQuotasTracker(ctx, tc.nodes)
			if err != nil {
				// Fatal: the tracker is nil on error, so calling it below would panic.
				t.Fatalf("failed to create tracker: %v", err)
			}
			var ng cloudprovider.NodeGroup
			result, err := tracker.CheckDelta(ctx, ng, tc.newNode, tc.nodeDelta)
			if err != nil {
				// Fatal: the result is meaningless on error, so skip the diff.
				t.Fatalf("failed to check delta: %v", err)
			}
			if diff := cmp.Diff(tc.wantResult, result, cmpopts.SortSlices(func(a, b string) bool { return a < b }), cmpopts.EquateEmpty()); diff != "" {
				t.Errorf("CheckDelta() mismatch (-want +got):\n%s", diff)
			}
		})
	}
}
26 changes: 26 additions & 0 deletions cluster-autoscaler/resourcequotas/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package resourcequotas

import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

// Provider is a source of quotas.
type Provider interface {
// Quotas returns the quotas known to this provider, or an error if they
// cannot be retrieved.
Quotas() ([]Quota, error)
}
// CloudQuotasProvider is a Provider that exposes a cloud provider's resource
// limiter as a quota.
type CloudQuotasProvider struct {
// cloudProvider is the source of the resource limiter returned by Quotas.
cloudProvider cloudprovider.CloudProvider
}

// Quotas returns the cloud provider's resource limiter as a single quota.
//
// An error from GetResourceLimiter is returned unchanged. If the cloud
// provider returns a nil limiter without an error, no quotas are returned:
// wrapping a nil pointer in the Quota interface would yield a non-nil
// interface value whose Limits method dereferences the nil pointer.
func (p *CloudQuotasProvider) Quotas() ([]Quota, error) {
	rl, err := p.cloudProvider.GetResourceLimiter()
	if err != nil {
		return nil, err
	}
	if rl == nil {
		return nil, nil
	}
	return []Quota{rl}, nil
}

// NewCloudQuotasProvider returns a CloudQuotasProvider that reads its quotas
// from the given cloud provider.
func NewCloudQuotasProvider(cloudProvider cloudprovider.CloudProvider) *CloudQuotasProvider {
	provider := new(CloudQuotasProvider)
	provider.cloudProvider = cloudProvider
	return provider
}
30 changes: 30 additions & 0 deletions cluster-autoscaler/resourcequotas/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package resourcequotas

import (
"testing"

"github.com/google/go-cmp/cmp"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
)

// TestCloudLimitersProvider checks that CloudQuotasProvider exposes the cloud
// provider's resource limiter as exactly one quota whose limits equal the
// configured max limits.
func TestCloudLimitersProvider(t *testing.T) {
	cloudProvider := test.NewTestCloudProviderBuilder().Build()
	maxLimits := map[string]int64{"cpu": 4, "memory": 16 * units.GiB}
	resourceLimiter := cloudprovider.NewResourceLimiter(nil, maxLimits)
	cloudProvider.SetResourceLimiter(resourceLimiter)

	limitsProvider := NewCloudQuotasProvider(cloudProvider)
	limiters, err := limitsProvider.Quotas()
	if err != nil {
		t.Fatalf("failed to get quotas: %v", err)
	}
	// Fatal: indexing limiters[0] below would panic on an empty slice.
	if len(limiters) != 1 {
		t.Fatalf("got %d quotas, expected 1", len(limiters))
	}
	limiter := limiters[0]
	if diff := cmp.Diff(maxLimits, limiter.Limits()); diff != "" {
		t.Errorf("Limits() mismatch (-want +got):\n%s", diff)
	}
}
Loading
Loading