18 changes: 10 additions & 8 deletions pkg/controllers/disruption/drift.go
@@ -19,8 +19,10 @@ package disruption
 import (
 	"context"
 	"errors"
+	"slices"
 	"sort"
 
+	"github.com/samber/lo"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	"sigs.k8s.io/karpenter/pkg/utils/pretty"
@@ -61,14 +63,14 @@ func (d *Drift) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[
 			candidates[j].NodeClaim.StatusConditions().Get(string(d.Reason())).LastTransitionTime.Time)
 	})
 
-	for _, candidate := range candidates {
-		// Filter out empty candidates. If there was an empty node that wasn't consolidated before this, we should
-		// assume that it was due to budgets. If we don't filter out budgets, users who set a budget for `empty`
-		// can find their nodes disrupted here, which while that in itself isn't an issue for empty nodes, it could
-		// constrain the `drift` budget.
-		if len(candidate.reschedulablePods) == 0 {
-			continue
-		}
+	emptyCandidates, nonEmptyCandidates := lo.FilterReject(candidates, func(c *Candidate, _ int) bool {
+		return len(c.reschedulablePods) == 0
+	})
+
+	// Prioritize empty candidates so they are disrupted ahead of non-empty candidates when the budget is constrained.
+	// Disrupting empty candidates first also reduces overall churn: if a non-empty candidate were disrupted first,
+	// its pods could reschedule onto the empty nodes and would have to move again when those nodes are disrupted.
+	for _, candidate := range slices.Concat(emptyCandidates, nonEmptyCandidates) {
 		// If the disruption budget doesn't allow this candidate to be disrupted,
 		// continue to the next candidate. We don't need to decrement any budget
 		// counter since drift commands can only have one candidate.
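For readers unfamiliar with the two helpers this change introduces, the snippet below is a minimal, self-contained sketch of the partition-and-prioritize pattern the new loop uses. It runs standalone; the `candidate` struct and pod counts are illustrative stand-ins for Karpenter's actual `*Candidate` type, and `slices.Concat` requires Go 1.22 or newer.

```go
package main

import (
	"fmt"
	"slices"

	"github.com/samber/lo"
)

// candidate is an illustrative stand-in for the controller's *Candidate type.
type candidate struct {
	name              string
	reschedulablePods int
}

func main() {
	candidates := []candidate{
		{name: "node-a", reschedulablePods: 3},
		{name: "node-b", reschedulablePods: 0},
		{name: "node-c", reschedulablePods: 1},
	}

	// FilterReject partitions in one pass: elements matching the predicate
	// come back in the first slice, the rest in the second, with relative
	// order preserved in both.
	empty, nonEmpty := lo.FilterReject(candidates, func(c candidate, _ int) bool {
		return c.reschedulablePods == 0
	})

	// slices.Concat allocates a new slice with the empty candidates first,
	// so a constrained budget is spent on them before any non-empty node.
	for _, c := range slices.Concat(empty, nonEmpty) {
		fmt.Println(c.name)
	}
	// Output: node-b, node-a, node-c
}
```

Because both halves keep their original relative order, the pre-existing sort on the Drifted condition's `LastTransitionTime` still decides ordering within each group.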
184 changes: 113 additions & 71 deletions pkg/controllers/disruption/drift_test.go
@@ -49,7 +49,7 @@ var _ = Describe("Drift", func() {
 	nodePool = test.NodePool(v1.NodePool{
 		Spec: v1.NodePoolSpec{
 			Disruption: v1.Disruption{
-				ConsolidateAfter: v1.MustParseNillableDuration("Never"),
+				ConsolidateAfter: v1.MustParseNillableDuration("1h"),
 				// Disrupt away!
 				Budgets: []v1.Budget{{
 					Nodes: "100%",
@@ -118,76 +118,6 @@ var _ = Describe("Drift", func() {
 		ExpectApplied(ctx, env.Client, rs)
 		Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())
 	})
-	It("should not consider empty nodes for drift budgets", func() {
-		nodeClaims, nodes = test.NodeClaimsAndNodes(2, v1.NodeClaim{
-			ObjectMeta: metav1.ObjectMeta{
-				Labels: map[string]string{
-					v1.NodePoolLabelKey:            nodePool.Name,
-					corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name,
-					v1.CapacityTypeLabelKey:        mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(),
-					corev1.LabelTopologyZone:       mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(),
-				},
-			},
-			Status: v1.NodeClaimStatus{
-				Allocatable: map[corev1.ResourceName]resource.Quantity{
-					corev1.ResourceCPU:  resource.MustParse("32"),
-					corev1.ResourcePods: resource.MustParse("100"),
-				},
-			},
-		})
-
-		nodePool.Spec.Disruption.Budgets = []v1.Budget{
-			{Reasons: []v1.DisruptionReason{v1.DisruptionReasonDrifted}, Nodes: "1"},
-		}
-
-		ExpectApplied(ctx, env.Client, nodePool)
-		for i := 0; i < 2; i++ {
-			nodeClaims[i].StatusConditions().SetTrue(v1.ConditionTypeDrifted)
-			ExpectApplied(ctx, env.Client, nodeClaims[i], nodes[i])
-		}
-		pod := test.Pod(test.PodOptions{
-			ObjectMeta: metav1.ObjectMeta{Labels: labels,
-				OwnerReferences: []metav1.OwnerReference{
-					{
-						APIVersion:         "apps/v1",
-						Kind:               "ReplicaSet",
-						Name:               rs.Name,
-						UID:                rs.UID,
-						Controller:         lo.ToPtr(true),
-						BlockOwnerDeletion: lo.ToPtr(true),
-					},
-				}}})
-
-		emptyNodeClaim, driftedNodeClaim := nodeClaims[0], nodeClaims[1]
-		driftedNode := nodes[1]
-		ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)
-		ExpectApplied(ctx, env.Client, rs, pod, nodePool)
-
-		// bind pod to the node eligible for disruption
-		ExpectManualBinding(ctx, env.Client, pod, driftedNode)
-
-		// inform cluster state about nodes and nodeclaims
-		ExpectSingletonReconciled(ctx, disruptionController)
-
-		// only allow a single node to be disrupted because of budgets
-		ExpectMetricGaugeValue(disruption.NodePoolAllowedDisruptions, 1, map[string]string{
-			metrics.NodePoolLabel: nodePool.Name,
-			metrics.ReasonLabel:   string(v1.DisruptionReasonDrifted),
-		})
-
-		// Execute command, deleting one node
-		ExpectObjectReconciled(ctx, env.Client, queue, driftedNodeClaim)
-		// Cascade any deletion of the nodeClaim to the node
-		ExpectNodeClaimsCascadeDeletion(ctx, env.Client, driftedNodeClaim)
-
-		ExpectNotFound(ctx, env.Client, nodeClaim, driftedNode)
-		ncs := ExpectNodeClaims(ctx, env.Client)
-		Expect(len(ncs)).To(Equal(1))
-		Expect(ncs[0].Name).To(Equal(emptyNodeClaim.Name))
-		ExpectMetricCounterValue(disruption.DecisionsPerformedTotal, 1, map[string]string{
-			metrics.ReasonLabel: "drifted",
-		})
-	})
 	It("should only consider 3 nodes allowed to be disrupted because of budgets", func() {
 		nodeClaims, nodes = test.NodeClaimsAndNodes(numNodes, v1.NodeClaim{
 			ObjectMeta: metav1.ObjectMeta{
@@ -396,6 +326,7 @@ var _ = Describe("Drift", func() {
Context("Drift", func() {
BeforeEach(func() {
nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrifted)
Expect(nodeClaim.StatusConditions().Clear(v1.ConditionTypeConsolidatable)).To(BeNil())
})
It("should continue to the next drifted node if the first cannot reschedule all pods", func() {
pod := test.Pod(test.PodOptions{
@@ -727,6 +658,117 @@ var _ = Describe("Drift", func() {
 			Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
 			ExpectNotFound(ctx, env.Client, nodeClaim, node)
 		})
+		It("should delete drifted nodes when they are empty and consolidation is disabled", func() {
+			nodePool.Spec.Disruption.ConsolidateAfter = v1.MustParseNillableDuration("Never")
+			nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrifted)
+			ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node)
+
+			// inform cluster state about nodes and nodeclaims
+			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim})
+			ExpectSingletonReconciled(ctx, disruptionController)
+			ExpectObjectReconciled(ctx, env.Client, queue, nodeClaim)
+
+			// Cascade any deletion of the nodeClaim to the node
+			ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim)
+
+			// we should delete the empty node
+			Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(0))
+			Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0))
+			ExpectNotFound(ctx, env.Client, nodeClaim, node)
+			ExpectMetricGaugeValue(disruption.EligibleNodes, 1, map[string]string{
+				metrics.ReasonLabel: "drifted",
+			})
+		})
+		It("should drift empty nodes before non-empty nodes", func() {
+			nodePool.Spec.Disruption.ConsolidateAfter = v1.MustParseNillableDuration("Never")
+			nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrifted)
+
+			labels := map[string]string{
+				"app": "test",
+			}
+			// create replicaset
+			rs := test.ReplicaSet()
+			ExpectApplied(ctx, env.Client, rs)
+			Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())
+
+			pod := test.Pod(test.PodOptions{
+				ObjectMeta: metav1.ObjectMeta{Labels: labels,
+					OwnerReferences: []metav1.OwnerReference{
+						{
+							APIVersion:         "apps/v1",
+							Kind:               "ReplicaSet",
+							Name:               rs.Name,
+							UID:                rs.UID,
+							Controller:         lo.ToPtr(true),
+							BlockOwnerDeletion: lo.ToPtr(true),
+						},
+					}}})
+			nodeClaim2, node2 := test.NodeClaimAndNode(v1.NodeClaim{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: map[string]string{
+						v1.NodePoolLabelKey:            nodePool.Name,
+						corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name,
+						v1.CapacityTypeLabelKey:        mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(),
+						corev1.LabelTopologyZone:       mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(),
+					},
+				},
+				Status: v1.NodeClaimStatus{
+					ProviderID: test.RandomProviderID(),
+					Allocatable: map[corev1.ResourceName]resource.Quantity{
+						corev1.ResourceCPU:  resource.MustParse("32"),
+						corev1.ResourcePods: resource.MustParse("100"),
+					},
+				},
+			})
+			nodeClaim2.StatusConditions().SetTrue(v1.ConditionTypeDrifted)
+
+			ExpectApplied(ctx, env.Client, rs, pod, nodePool, nodeClaim, nodeClaim2, node, node2)
+			ExpectManualBinding(ctx, env.Client, pod, node)
+
+			// inform cluster state about nodes and nodeclaims
+			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node, node2}, []*v1.NodeClaim{nodeClaim, nodeClaim2})
+			ExpectSingletonReconciled(ctx, disruptionController)
+			ExpectObjectReconciled(ctx, env.Client, queue, nodeClaim)
+			ExpectObjectReconciled(ctx, env.Client, queue, nodeClaim2)
+
+			// Cascade any deletion of the nodeClaim to the node
+			ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim)
+			ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim2)
+
+			// we should delete the empty node
+			Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1))
+			Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
+			ExpectExists(ctx, env.Client, nodeClaim)
+			ExpectExists(ctx, env.Client, node)
+			ExpectNotFound(ctx, env.Client, nodeClaim2, node2)
+			ExpectMetricGaugeValue(disruption.EligibleNodes, 2, map[string]string{
+				metrics.ReasonLabel: "drifted",
+			})
+			ExpectMetricCounterValue(disruption.DecisionsPerformedTotal, 1, map[string]string{
+				metrics.ReasonLabel: "drifted",
+			})
+		})
+		It("should give emptiness priority to delete drifted nodes when they are empty and consolidatable", func() {
+			nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrifted)
+			nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeConsolidatable)
+			ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node)
+
+			// inform cluster state about nodes and nodeclaims
+			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim})
+			ExpectSingletonReconciled(ctx, disruptionController)
+			ExpectObjectReconciled(ctx, env.Client, queue, nodeClaim)
+
+			// Cascade any deletion of the nodeClaim to the node
+			ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim)
+
+			// we should delete the empty node
+			Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(0))
+			Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0))
+			ExpectNotFound(ctx, env.Client, nodeClaim, node)
+			ExpectMetricGaugeValue(disruption.EligibleNodes, 1, map[string]string{
+				metrics.ReasonLabel: "empty",
+			})
+		})
 		It("should untaint nodes when drift replacement fails", func() {
 			labels := map[string]string{
 				"app": "test",
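As a reference for the fixture changes above: `ConsolidateAfter` gates when empty nodes become consolidatable (the suite default moves from `"Never"` to `"1h"`, and individual tests opt back into `"Never"`), and per-reason budgets cap how many nodes a given reason may disrupt at once. The sketch below pulls those settings together; the `buildNodePool` helper and the import paths are assumptions for illustration, not part of this change.

```go
package disruption_test

import (
	v1 "sigs.k8s.io/karpenter/pkg/apis/v1" // assumed import path for the v1 API types
	"sigs.k8s.io/karpenter/pkg/test"       // assumed import path for the test helpers
)

// buildNodePool is a hypothetical helper showing the disruption settings the
// suite above exercises.
func buildNodePool() *v1.NodePool {
	return test.NodePool(v1.NodePool{
		Spec: v1.NodePoolSpec{
			Disruption: v1.Disruption{
				// "Never" disables consolidation entirely; "1h" delays it,
				// so empty nodes are not immediately consolidatable.
				ConsolidateAfter: v1.MustParseNillableDuration("1h"),
				Budgets: []v1.Budget{
					// At most one node may be disrupted at a time for drift.
					{Reasons: []v1.DisruptionReason{v1.DisruptionReasonDrifted}, Nodes: "1"},
				},
			},
		},
	})
}
```

With the empty-first ordering in `ComputeCommand`, a drift budget of `Nodes: "1"` is spent on empty drifted nodes before any node that would need to reschedule pods, which is exactly what the new tests assert.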