4 changes: 2 additions & 2 deletions pkg/controllers/disruption/controller.go
@@ -83,10 +83,10 @@ func NewController(clk clock.Clock, kubeClient client.Client, provisioner *provi
cloudProvider: cp,
lastRun: map[string]time.Time{},
methods: []Method{
// Terminate any NodeClaims that have drifted from provisioning specifications, allowing the pods to reschedule.
NewDrift(kubeClient, cluster, provisioner, recorder),
// Delete any empty NodeClaims as there is zero cost in terms of disruption.
NewEmptiness(c),
// Terminate any NodeClaims that have drifted from provisioning specifications, allowing the pods to reschedule.
NewDrift(kubeClient, cluster, provisioner, recorder),
// Attempt to identify multiple NodeClaims that we can consolidate simultaneously to reduce pod churn
NewMultiNodeConsolidation(c),
// And finally fall back our single NodeClaim consolidation to further reduce cluster cost.
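Read together with the drift.go change below, the reorder appears to move emptiness ahead of drift, so empty NodeClaims are reclaimed before drift is evaluated. A minimal sketch of the resulting method list, assuming that direction (the single-node consolidation entry is collapsed in the diff and indicated here only as a comment):

    methods: []Method{
        // Delete any empty NodeClaims as there is zero cost in terms of disruption.
        NewEmptiness(c),
        // Terminate any NodeClaims that have drifted from provisioning specifications, allowing the pods to reschedule.
        NewDrift(kubeClient, cluster, provisioner, recorder),
        // Attempt to identify multiple NodeClaims that we can consolidate simultaneously to reduce pod churn.
        NewMultiNodeConsolidation(c),
        // And finally fall back to single NodeClaim consolidation to further reduce cluster cost
        // (constructor collapsed in the diff above).
    },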
25 changes: 5 additions & 20 deletions pkg/controllers/disruption/drift.go
@@ -62,29 +62,14 @@ func (d *Drift) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[
candidates[j].NodeClaim.StatusConditions().Get(string(d.Reason())).LastTransitionTime.Time)
})

// Do a quick check through the candidates to see if they're empty.
// For each candidate that is empty with a nodePool allowing its disruption
// add it to the existing command.
empty := make([]*Candidate, 0, len(candidates))
for _, candidate := range candidates {
if len(candidate.reschedulablePods) > 0 {
// Filter out empty candidates. If there was an empty node that wasn't consolidated before this, we should
// assume that it was due to budgets. If we don't filter out budgets, users who set a budget for `empty`
// can find their nodes disrupted here, which while that in itself isn't an issue for empty nodes, it could
// constrain the `drift` budget.
if len(candidate.reschedulablePods) == 0 {
continue
}
// If there's disruptions allowed for the candidate's nodepool,
// add it to the list of candidates, and decrement the budget.
if disruptionBudgetMapping[candidate.NodePool.Name] > 0 {
empty = append(empty, candidate)
disruptionBudgetMapping[candidate.NodePool.Name]--
}
}
// Disrupt all empty drifted candidates, as they require no scheduling simulations.
if len(empty) > 0 {
return Command{
candidates: empty,
}, scheduling.Results{}, nil
}

for _, candidate := range candidates {
// If the disruption budget doesn't allow this candidate to be disrupted,
// continue to the next candidate. We don't need to decrement any budget
// counter since drift commands can only have one candidate.
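Net effect of the hunk: the batch fast path that collected empty drifted candidates and returned them as a single command is gone, replaced by a per-candidate filter inside the one remaining loop. A rough sketch of the surviving loop, reconstructed from the added and context lines above (the scheduling simulation at the end is collapsed in the diff and is only indicated by a comment):

    for _, candidate := range candidates {
        // Filter out empty candidates. If an empty node wasn't consolidated before this,
        // assume that was due to budgets; disrupting it here would consume the `drift`
        // budget for a node the `empty` budget was meant to govern.
        if len(candidate.reschedulablePods) == 0 {
            continue
        }
        // If the disruption budget doesn't allow this candidate to be disrupted,
        // continue to the next candidate. No budget counter is decremented since
        // drift commands can only have one candidate.
        if disruptionBudgetMapping[candidate.NodePool.Name] == 0 {
            continue
        }
        // ... simulate scheduling for the candidate and return a single-candidate command.
    }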
140 changes: 13 additions & 127 deletions pkg/controllers/disruption/drift_test.go
@@ -122,7 +122,7 @@ var _ = Describe("Drift", func() {
ExpectApplied(ctx, env.Client, rs)
Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())
})
It("should allow all empty nodes to be disrupted", func() {
It("should not disrupt empty nodes during drift", func() {
nodeClaims, nodes = test.NodeClaimsAndNodes(numNodes, v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
@@ -151,56 +151,18 @@ var _ = Describe("Drift", func() {
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)
ExpectSingletonReconciled(ctx, disruptionController)

metric, found := FindMetricWithLabelValues("karpenter_nodepools_allowed_disruptions", map[string]string{
metric, found := FindMetricWithLabelValues("karpenter_voluntary_disruption_decisions_total", map[string]string{
"nodepool": nodePool.Name,
"reason": string(v1.DisruptionReasonDrifted),
})
Expect(found).To(BeTrue())
Expect(metric.GetGauge().GetValue()).To(BeNumerically("==", 10))

// Execute command, thus deleting all nodes
ExpectSingletonReconciled(ctx, queue)
Expect(len(ExpectNodeClaims(ctx, env.Client))).To(Equal(0))
})
It("should allow no empty nodes to be disrupted", func() {
nodeClaims, nodes = test.NodeClaimsAndNodes(numNodes, v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name,
v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(),
corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(),
},
},
Status: v1.NodeClaimStatus{
Allocatable: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: resource.MustParse("32"),
corev1.ResourcePods: resource.MustParse("100"),
},
},
})

nodePool.Spec.Disruption.Budgets = []v1.Budget{{Nodes: "0%"}}

ExpectApplied(ctx, env.Client, nodePool)
for i := 0; i < numNodes; i++ {
nodeClaims[i].StatusConditions().SetTrue(v1.ConditionTypeDrifted)
ExpectApplied(ctx, env.Client, nodeClaims[i], nodes[i])
}
// inform cluster state about nodes and nodeclaims
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)
ExpectSingletonReconciled(ctx, disruptionController)

metric, found := FindMetricWithLabelValues("karpenter_nodepools_allowed_disruptions", map[string]string{
"nodepool": nodePool.Name,
})
Expect(found).To(BeTrue())
Expect(found).To(BeFalse())
Expect(metric.GetGauge().GetValue()).To(BeNumerically("==", 0))

// Execute command, thus deleting no nodes
// Execute command, thus deleting all nodes
ExpectSingletonReconciled(ctx, queue)
Expect(len(ExpectNodeClaims(ctx, env.Client))).To(Equal(numNodes))
Expect(len(ExpectNodeClaims(ctx, env.Client))).To(Equal(10))
})
It("should only allow 3 empty nodes to be disrupted", func() {
It("should only allow 3 nodes to be disrupted", func() {
nodeClaims, nodes = test.NodeClaimsAndNodes(numNodes, v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
@@ -234,9 +196,9 @@ var _ = Describe("Drift", func() {
Expect(found).To(BeTrue())
Expect(metric.GetGauge().GetValue()).To(BeNumerically("==", 3))

// Execute command, thus deleting 3 nodes
// emptiness is not performed in drift, so expect all nodeclaims to exist
ExpectSingletonReconciled(ctx, queue)
Expect(len(ExpectNodeClaims(ctx, env.Client))).To(Equal(7))
Expect(len(ExpectNodeClaims(ctx, env.Client))).To(Equal(10))
})
It("should disrupt 3 nodes, taking into account commands in progress", func() {
nodeClaims, nodes = test.NodeClaimsAndNodes(numNodes, v1.NodeClaim{
@@ -373,9 +335,9 @@ var _ = Describe("Drift", func() {
Expect(metric.GetGauge().GetValue()).To(BeNumerically("==", 2))
}

// Execute the command in the queue, only deleting 20 nodes
// emptiness is not performed in drift, so expect all nodeclaims to exist
ExpectSingletonReconciled(ctx, queue)
Expect(len(ExpectNodeClaims(ctx, env.Client))).To(Equal(10))
Expect(len(ExpectNodeClaims(ctx, env.Client))).To(Equal(30))
})
It("should allow all nodes from each nodePool to be deleted", func() {
PR author comment: This would now happen in emptiness because the test used to have empty && drifted nodes. There is a test that covers disruptions across multiple nodepools.

// Create 10 nodepools
@@ -434,9 +396,9 @@ var _ = Describe("Drift", func() {
Expect(metric.GetGauge().GetValue()).To(BeNumerically("==", 3))
}

// Execute the command in the queue, deleting all nodes
// emptiness is not performed in drift, so expect all nodeclaims to exist
ExpectSingletonReconciled(ctx, queue)
Expect(len(ExpectNodeClaims(ctx, env.Client))).To(Equal(0))
Expect(len(ExpectNodeClaims(ctx, env.Client))).To(Equal(30))
})
})
Context("Drift", func() {
@@ -616,63 +578,6 @@ var _ = Describe("Drift", func() {
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, nodeClaim)
})
It("can delete drifted nodes", func() {
PR author comment: Covered by the "should delete drifted nodes" test.

ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool)

// inform cluster state about nodes and nodeclaims
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim})

fakeClock.Step(10 * time.Minute)
ExpectSingletonReconciled(ctx, disruptionController)
// Process the item so that the nodes can be deleted.
ExpectSingletonReconciled(ctx, queue)
// Cascade any deletion of the nodeClaim to the node
ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim)

// We should delete the nodeClaim that has drifted
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(0))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0))
ExpectNotFound(ctx, env.Client, nodeClaim, node)
})
It("should disrupt all empty drifted nodes in parallel", func() {
nodeClaims, nodes := test.NodeClaimsAndNodes(100, v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name,
v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(),
corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(),
},
},
Status: v1.NodeClaimStatus{
Allocatable: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: resource.MustParse("32"),
corev1.ResourcePods: resource.MustParse("100"),
},
},
})
for _, m := range nodeClaims {
m.StatusConditions().SetTrue(v1.ConditionTypeDrifted)
ExpectApplied(ctx, env.Client, m)
}
for _, n := range nodes {
ExpectApplied(ctx, env.Client, n)
}
ExpectApplied(ctx, env.Client, nodePool)

// inform cluster state about nodes and nodeClaims
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)
ExpectSingletonReconciled(ctx, disruptionController)

// Process the item so that the nodes can be deleted.
ExpectSingletonReconciled(ctx, queue)
// Cascade any deletion of the nodeClaim to the node
ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaims...)

// Expect that the drifted nodeClaims are gone
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0))
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(0))
})
It("can replace drifted nodes", func() {
PR author comment: Covered by the new "should replace drifted nodes" test.

labels := map[string]string{
"app": "test",
@@ -941,24 +846,5 @@ var _ = Describe("Drift", func() {
ExpectExists(ctx, env.Client, nodeClaim)
ExpectExists(ctx, env.Client, node)
})
It("should delete nodes with the karpenter.sh/do-not-disrupt annotation set to false", func() {
PR author comment: Moved to the other annotation test in the same file.

node.Annotations = lo.Assign(node.Annotations, map[string]string{v1.DoNotDisruptAnnotationKey: "false"})
ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool)

// inform cluster state about nodes and nodeclaims
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim})

fakeClock.Step(10 * time.Minute)
ExpectSingletonReconciled(ctx, disruptionController)
// Process the item so that the nodes can be deleted.
ExpectSingletonReconciled(ctx, queue)
// Cascade any deletion of the nodeClaim to the node
ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim)

// We should delete the nodeClaim that has drifted
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(0))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0))
ExpectNotFound(ctx, env.Client, nodeClaim, node)
})
})
})
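The budget behavior these tests pin down is easiest to see on a NodePool that sets separate budgets per disruption reason. A hypothetical configuration in the style of the test code above (the Reasons field and the concrete values are illustrative assumptions, not taken from this diff):

    nodePool.Spec.Disruption.Budgets = []v1.Budget{
        // Allow at most 3 drifted nodes to be disrupted at a time.
        {Nodes: "3", Reasons: []v1.DisruptionReason{v1.DisruptionReasonDrifted}},
        // Never voluntarily disrupt nodes for emptiness.
        {Nodes: "0%", Reasons: []v1.DisruptionReason{v1.DisruptionReasonEmpty}},
    }

With the fast path removed, empty drifted nodes are left for the emptiness method and its budget instead of being disrupted under, and counted against, the drift budget.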
14 changes: 14 additions & 0 deletions pkg/controllers/disruption/emptiness_test.go
@@ -439,6 +439,20 @@ var _ = Describe("Emptiness", func() {
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, nodeClaim)
})
It("should delete nodes with the karpenter.sh/do-not-disrupt annotation set to false", func() {
node.Annotations = lo.Assign(node.Annotations, map[string]string{v1.DoNotDisruptAnnotationKey: "false"})
ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool)

// inform cluster state about nodes and nodeclaims
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim})

ExpectSingletonReconciled(ctx, disruptionController)

// Expect to not create or delete more nodeclaims
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, nodeClaim)
})
It("should ignore nodes that have pods", func() {
pod := test.Pod()
ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool, pod)