go.mod: 2 changes (1 addition, 1 deletion)
@@ -50,7 +50,7 @@ require (
github.com/google/go-cmp v0.7.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/google/uuid v1.6.0
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
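A note on this go.mod change: dropping the // indirect marker typically means a package in the module now imports github.com/google/uuid directly, since go mod tidy records direct requirements without the marker. A minimal, purely hypothetical illustration of such a direct import (the helper below is not part of this PR):

package disruption

import "github.com/google/uuid"

// newCommandID returns a fresh identifier; shown only to illustrate the kind of
// direct use of uuid that turns it into a direct dependency of the module.
func newCommandID() string {
	return uuid.NewString()
}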
pkg/controllers/controllers.go: 3 changes (1 addition, 2 deletions)
@@ -32,7 +32,6 @@ import (
v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/disruption"
"sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration"
metricsnode "sigs.k8s.io/karpenter/pkg/controllers/metrics/node"
metricsnodepool "sigs.k8s.io/karpenter/pkg/controllers/metrics/nodepool"
metricspod "sigs.k8s.io/karpenter/pkg/controllers/metrics/pod"
@@ -70,7 +69,7 @@ func NewControllers(
) []controller.Controller {
p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster, clock)
evictionQueue := terminator.NewQueue(kubeClient, recorder)
disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p)
disruptionQueue := disruption.NewQueue(kubeClient, recorder, cluster, clock, p)

controllers := []controller.Controller{
p, evictionQueue, disruptionQueue,
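The practical effect of this file's change is that NewControllers no longer needs the disruption/orchestration import: the queue is now owned by the disruption package. A minimal sketch of the new wiring, assuming the surrounding NewControllers arguments are exactly as shown in the diff:

// Before: the queue constructor lived in the orchestration subpackage.
// disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p)

// After: the disruption package provides the queue directly.
disruptionQueue := disruption.NewQueue(kubeClient, recorder, cluster, clock, p)

controllers := []controller.Controller{
	p, evictionQueue, disruptionQueue,
	// ... remaining controllers elided, as in the truncated hunk above
}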
pkg/controllers/disruption/consolidation.go: 58 changes (30 additions, 28 deletions)
@@ -34,7 +34,6 @@ import (
v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
disruptionevents "sigs.k8s.io/karpenter/pkg/controllers/disruption/events"
"sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning"
pscheduling "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
"sigs.k8s.io/karpenter/pkg/controllers/state"
@@ -53,7 +52,7 @@ const MinInstanceTypesForSpotToSpotConsolidation = 15
// consolidation methods.
type consolidation struct {
// Consolidation needs to be aware of the queue for validation
queue *orchestration.Queue
queue *Queue
clock clock.Clock
cluster *state.Cluster
kubeClient client.Client
@@ -64,7 +63,7 @@ type consolidation struct {
}

func MakeConsolidation(clock clock.Clock, cluster *state.Cluster, kubeClient client.Client, provisioner *provisioning.Provisioner,
cloudProvider cloudprovider.CloudProvider, recorder events.Recorder, queue *orchestration.Queue) consolidation {
cloudProvider cloudprovider.CloudProvider, recorder events.Recorder, queue *Queue) consolidation {
return consolidation{
queue: queue,
clock: clock,
@@ -131,16 +130,16 @@ func (c *consolidation) sortCandidates(candidates []*Candidate) []*Candidate {
// computeConsolidation computes a consolidation action to take
//
// nolint:gocyclo
func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...*Candidate) (Command, pscheduling.Results, error) {
func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...*Candidate) (Command, error) {
var err error
// Run scheduling simulation to compute consolidation option
results, err := SimulateScheduling(ctx, c.kubeClient, c.cluster, c.provisioner, candidates...)
if err != nil {
// if a candidate node is now deleting, just retry
if errors.Is(err, errCandidateDeleting) {
return Command{}, pscheduling.Results{}, nil
return Command{}, nil
}
return Command{}, pscheduling.Results{}, err
return Command{}, err
}

// if not all of the pods were scheduled, we can't do anything
@@ -149,29 +148,30 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
if len(candidates) == 1 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, pretty.Sentence(results.NonPendingPodSchedulingErrors()))...)
}
return Command{}, pscheduling.Results{}, nil
return Command{}, nil
}

// were we able to schedule all the pods on the inflight candidates?
if len(results.NewNodeClaims) == 0 {
return Command{
candidates: candidates,
}, results, nil
Candidates: candidates,
Results: results,
}, nil
}

// we're not going to turn a single node into multiple candidates
if len(results.NewNodeClaims) != 1 {
if len(candidates) == 1 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, fmt.Sprintf("Can't remove without creating %d candidates", len(results.NewNodeClaims)))...)
}
return Command{}, pscheduling.Results{}, nil
return Command{}, nil
}

// get the current node price based on the offering
// fallback if we can't find the specific zonal pricing data
candidatePrice, err := getCandidatePrices(candidates)
if err != nil {
return Command{}, pscheduling.Results{}, fmt.Errorf("getting offering price from candidate node, %w", err)
return Command{}, fmt.Errorf("getting offering price from candidate node, %w", err)
}

allExistingAreSpot := true
@@ -200,13 +200,13 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
if len(candidates) == 1 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, fmt.Sprintf("Filtering by price: %v", err))...)
}
return Command{}, pscheduling.Results{}, nil
return Command{}, nil
}
if len(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions) == 0 {
if len(candidates) == 1 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "Can't replace with a cheaper node")...)
}
return Command{}, pscheduling.Results{}, nil
return Command{}, nil
}

// We are consolidating a node from OD -> [OD,Spot] but have filtered the instance types by cost based on the
@@ -220,25 +220,25 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
}

return Command{
candidates: candidates,
replacements: results.NewNodeClaims,
}, results, nil
Candidates: candidates,
Replacements: replacementsFromNodeClaims(results.NewNodeClaims...),
Results: results,
}, nil
}

// Compute command to execute spot-to-spot consolidation if:
// 1. The SpotToSpotConsolidation feature flag is set to true.
// 2. For single-node consolidation:
// a. There are at least 15 cheapest instance type replacement options to consolidate.
// b. The current candidate is NOT part of the first 15 cheapest instance types in order to avoid repeated consolidation.
func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, candidates []*Candidate, results pscheduling.Results,
candidatePrice float64) (Command, pscheduling.Results, error) {
func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, candidates []*Candidate, results pscheduling.Results, candidatePrice float64) (Command, error) {

// Spot consolidation is turned off.
if !options.FromContext(ctx).FeatureGates.SpotToSpotConsolidation {
if len(candidates) == 1 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "SpotToSpotConsolidation is disabled, can't replace a spot node with a spot node")...)
}
return Command{}, pscheduling.Results{}, nil
return Command{}, nil
}

// Since we are sure that the replacement nodeclaim considered for the spot candidates are spot, we will enforce it through the requirements.
@@ -253,22 +253,23 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
if len(candidates) == 1 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, fmt.Sprintf("Filtering by price: %v", err))...)
}
return Command{}, pscheduling.Results{}, nil
return Command{}, nil
}
if len(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions) == 0 {
if len(candidates) == 1 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "Can't replace with a cheaper node")...)
}
return Command{}, pscheduling.Results{}, nil
return Command{}, nil
}

// For multi-node consolidation:
// We don't have any requirement to check the remaining instance type flexibility, so exit early in this case.
if len(candidates) > 1 {
return Command{
candidates: candidates,
replacements: results.NewNodeClaims,
}, results, nil
Candidates: candidates,
Replacements: replacementsFromNodeClaims(results.NewNodeClaims...),
Results: results,
}, nil
}

// For single-node consolidation:
@@ -279,7 +280,7 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
if len(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions) < MinInstanceTypesForSpotToSpotConsolidation {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, fmt.Sprintf("SpotToSpotConsolidation requires %d cheaper instance type options than the current candidate to consolidate, got %d",
MinInstanceTypesForSpotToSpotConsolidation, len(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions)))...)
return Command{}, pscheduling.Results{}, nil
return Command{}, nil
}

// If a user has minValues set in their NodePool requirements, then we cap the number of instancetypes at 100 which would be the actual number of instancetypes sent for launch to enable spot-to-spot consolidation.
@@ -300,9 +301,10 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
}

return Command{
candidates: candidates,
replacements: results.NewNodeClaims,
}, results, nil
Candidates: candidates,
Replacements: replacementsFromNodeClaims(results.NewNodeClaims...),
Results: results,
}, nil
}

// getCandidatePrices returns the sum of the prices of the given candidates
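Taken together, the changes in this file move computeConsolidation and computeSpotToSpotConsolidation from returning (Command, pscheduling.Results, error) to returning (Command, error), with the simulation results carried on the command itself. A hedged sketch of how a caller inside the package might consume the new shape; the Candidates, Replacements, and Results fields and the Decision method are taken from this diff and its tests, while the wrapper function, error wrapping, and logging are illustrative only:

// consolidateOnce is a hypothetical helper (not part of this PR) showing how the
// single-return-value API is consumed after the change.
func consolidateOnce(ctx context.Context, c *consolidation, candidates ...*Candidate) error {
	cmd, err := c.computeConsolidation(ctx, candidates...)
	if err != nil {
		return fmt.Errorf("computing consolidation, %w", err)
	}
	// An empty command signals "nothing to do": a candidate was already deleting,
	// not every pod could be rescheduled, or no cheaper replacement exists.
	if len(cmd.Candidates) == 0 {
		return nil
	}
	// Scheduling simulation results now travel with the command rather than being
	// returned as a separate value.
	_ = cmd.Results
	log.Printf("disrupting %d candidate(s) with %d replacement(s), decision %v",
		len(cmd.Candidates), len(cmd.Replacements), cmd.Decision())
	return nil
}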
pkg/controllers/disruption/consolidation_test.go: 36 changes (14 additions, 22 deletions)
@@ -236,10 +236,9 @@ var _ = Describe("Consolidation", func() {

var wg sync.WaitGroup
ExpectToWait(fakeClock, &wg)
cmd, results, err := emptyConsolidation.ComputeCommand(ctx, budgets, candidates...)
cmd, err := emptyConsolidation.ComputeCommand(ctx, budgets, candidates...)
wg.Wait()
Expect(err).To(Succeed())
Expect(results).To(Equal(pscheduling.Results{}))
Expect(cmd).To(Equal(disruption.Command{}))

Expect(emptyConsolidation.IsConsolidated()).To(BeFalse())
@@ -297,9 +296,8 @@ var _ = Describe("Consolidation", func() {

multiNodeConsolidation.Validator = NewTestConsolidationValidator(cluster, nodePool, multiNodeConsolidation.Validator.(*disruption.ConsolidationValidator), validatorOpt)

cmd, results, err := multiNodeConsolidation.ComputeCommand(ctx, budgets, candidates...)
cmd, err := multiNodeConsolidation.ComputeCommand(ctx, budgets, candidates...)
Expect(err).To(Succeed())
Expect(results).To(Equal(pscheduling.Results{}))
Expect(cmd).To(Equal(disruption.Command{}))

Expect(multiNodeConsolidation.IsConsolidated()).To(BeFalse())
@@ -342,9 +340,8 @@ var _ = Describe("Consolidation", func() {

singleNodeConsolidation.Validator = NewTestConsolidationValidator(cluster, nodePool, singleNodeConsolidation.Validator.(*disruption.ConsolidationValidator), validatorOpt)

cmd, results, err := singleNodeConsolidation.ComputeCommand(ctx, budgets, candidates...)
cmd, err := singleNodeConsolidation.ComputeCommand(ctx, budgets, candidates...)
Expect(err).To(Succeed())
Expect(results).To(Equal(pscheduling.Results{}))
Expect(cmd).To(Equal(disruption.Command{}))

Expect(singleNodeConsolidation.IsConsolidated()).To(BeFalse())
@@ -763,9 +760,8 @@ var _ = Describe("Consolidation", func() {
candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, emptyConsolidation.ShouldDisrupt, emptyConsolidation.Class(), queue)
Expect(err).To(Succeed())

cmd, results, err := emptyConsolidation.ComputeCommand(ctx, budgets, candidates...)
cmd, err := emptyConsolidation.ComputeCommand(ctx, budgets, candidates...)
Expect(err).To(Succeed())
Expect(results).To(Equal(pscheduling.Results{}))
Expect(cmd).To(Equal(disruption.Command{}))

Expect(emptyConsolidation.IsConsolidated()).To(BeFalse())
@@ -826,9 +822,9 @@ var _ = Describe("Consolidation", func() {
candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, emptyConsolidation.ShouldDisrupt, emptyConsolidation.Class(), queue)
Expect(err).To(Succeed())

cmd, results, err := emptyConsolidation.ComputeCommand(ctx, budgets, candidates...)
cmd, err := emptyConsolidation.ComputeCommand(ctx, budgets, candidates...)
Expect(err).To(Succeed())
Expect(results).To(Equal(pscheduling.Results{}))
Expect(cmd.Results).To(Equal(pscheduling.Results{}))
Expect(cmd).To(Equal(disruption.Command{}))

Expect(emptyConsolidation.IsConsolidated()).To(BeFalse())
@@ -850,9 +846,8 @@ var _ = Describe("Consolidation", func() {
candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, multiConsolidation.ShouldDisrupt, multiConsolidation.Class(), queue)
Expect(err).To(Succeed())

cmd, results, err := multiConsolidation.ComputeCommand(ctx, budgets, candidates...)
cmd, err := multiConsolidation.ComputeCommand(ctx, budgets, candidates...)
Expect(err).To(Succeed())
Expect(results).To(Equal(pscheduling.Results{}))
Expect(cmd).To(Equal(disruption.Command{}))

Expect(multiConsolidation.IsConsolidated()).To(BeFalse())
@@ -913,9 +908,8 @@ var _ = Describe("Consolidation", func() {
candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, multiConsolidation.ShouldDisrupt, multiConsolidation.Class(), queue)
Expect(err).To(Succeed())

cmd, results, err := multiConsolidation.ComputeCommand(ctx, budgets, candidates...)
cmd, err := multiConsolidation.ComputeCommand(ctx, budgets, candidates...)
Expect(err).To(Succeed())
Expect(results).To(Equal(pscheduling.Results{}))
Expect(cmd).To(Equal(disruption.Command{}))

Expect(multiConsolidation.IsConsolidated()).To(BeFalse())
@@ -937,9 +931,8 @@ var _ = Describe("Consolidation", func() {
candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, singleConsolidation.ShouldDisrupt, singleConsolidation.Class(), queue)
Expect(err).To(Succeed())

cmd, results, err := singleConsolidation.ComputeCommand(ctx, budgets, candidates...)
cmd, err := singleConsolidation.ComputeCommand(ctx, budgets, candidates...)
Expect(err).To(Succeed())
Expect(results).To(Equal(pscheduling.Results{}))
Expect(cmd).To(Equal(disruption.Command{}))

Expect(singleConsolidation.IsConsolidated()).To(BeFalse())
@@ -1000,9 +993,8 @@ var _ = Describe("Consolidation", func() {
candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, singleConsolidation.ShouldDisrupt, singleConsolidation.Class(), queue)
Expect(err).To(Succeed())

cmd, results, err := singleConsolidation.ComputeCommand(ctx, budgets, candidates...)
cmd, err := singleConsolidation.ComputeCommand(ctx, budgets, candidates...)
Expect(err).To(Succeed())
Expect(results).To(Equal(pscheduling.Results{}))
Expect(cmd).To(Equal(disruption.Command{}))

Expect(singleConsolidation.IsConsolidated()).To(BeFalse())
@@ -2468,13 +2460,13 @@ var _ = Describe("Consolidation", func() {

var wg sync.WaitGroup
ExpectToWait(fakeClock, &wg)
cmd, results, err := emptyConsolidation.ComputeCommand(ctx, budgets, candidates...)
cmd, err := emptyConsolidation.ComputeCommand(ctx, budgets, candidates...)
wg.Wait()
Expect(err).To(Succeed())
Expect(results).To(Equal(pscheduling.Results{}))
Expect(cmd.Candidates()).To(HaveLen(1))
Expect(cmd.Results).To(Equal(pscheduling.Results{}))
Expect(cmd.Candidates).To(HaveLen(1))
// the test validator manually binds a pod to nodes[0], causing it to no longer be eligible
Expect(cmd.Candidates()[0].StateNode.Node.Name).To(Equal(nodes[1].Name))
Expect(cmd.Candidates[0].StateNode.Node.Name).To(Equal(nodes[1].Name))
Expect(cmd.Decision()).To(Equal(disruption.DeleteDecision))

Expect(emptyConsolidation.IsConsolidated()).To(BeFalse())
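For test authors, the migration in this file is mechanical: ComputeCommand now returns two values instead of three, the scheduling results are read off the command, and Candidates becomes an exported field rather than a method. A sketch of the before/after assertion pattern, matching the Gomega usage above:

// Before:
// cmd, results, err := emptyConsolidation.ComputeCommand(ctx, budgets, candidates...)
// Expect(results).To(Equal(pscheduling.Results{}))
// Expect(cmd.Candidates()).To(HaveLen(1))

// After:
cmd, err := emptyConsolidation.ComputeCommand(ctx, budgets, candidates...)
Expect(err).To(Succeed())
Expect(cmd.Results).To(Equal(pscheduling.Results{}))
Expect(cmd.Candidates).To(HaveLen(1))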