
Commit c2756ca

feat(nodeScaleDownTimeTracker): add a new metric to track unprocessed nodes during scaleDown
1 parent 9c42198 commit c2756ca

5 files changed (+262 −0 lines)

cluster-autoscaler/config/autoscaling_options.go

Lines changed: 5 additions & 0 deletions
@@ -349,6 +349,11 @@ type AutoscalingOptions struct {
     CapacitybufferControllerEnabled bool
     // CapacitybufferPodInjectionEnabled tells if CA should inject fake pods for capacity buffers that are ready for provisioning
     CapacitybufferPodInjectionEnabled bool
+    // LongestNodeScaleDownTimeTrackerEnabled is used to enable/disable tracking of the longest node ScaleDown evaluation time.
+    // We want to track all the nodes that were marked as unneeded but were left unprocessed during the ScaleDown.
+    // If a node was unneeded but unprocessed multiple times consecutively, we store only the earliest time it happened.
+    // The difference between the current time and the earliest time among all unprocessed nodes gives the longest time.
+    LongestNodeScaleDownTimeTrackerEnabled bool
 }

 // KubeClientOptions specify options for kube client

cluster-autoscaler/config/flags/flags.go

Lines changed: 2 additions & 0 deletions
@@ -230,6 +230,7 @@ var (
     nodeDeletionCandidateTTL = flag.Duration("node-deletion-candidate-ttl", time.Duration(0), "Maximum time a node can be marked as removable before the marking becomes stale. This sets the TTL of Cluster-Autoscaler's state if the Cluster-Autoscaler deployment becomes inactive")
     capacitybufferControllerEnabled = flag.Bool("capacity-buffer-controller-enabled", false, "Whether to enable the default controller for capacity buffers or not")
     capacitybufferPodInjectionEnabled = flag.Bool("capacity-buffer-pod-injection-enabled", false, "Whether to enable pod list processor that processes ready capacity buffers and injects fake pods accordingly")
+    longestNodeScaleDownTimeTrackerEnabled = flag.Bool("longest-node-scaledown-timetracker-enabled", false, "Whether to track the longest node ScaleDown evaluation time")

     // Deprecated flags
     ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Deprecated, use startup-taints instead)")
@@ -414,6 +415,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
         NodeDeletionCandidateTTL: *nodeDeletionCandidateTTL,
         CapacitybufferControllerEnabled: *capacitybufferControllerEnabled,
         CapacitybufferPodInjectionEnabled: *capacitybufferPodInjectionEnabled,
+        LongestNodeScaleDownTimeTrackerEnabled: *longestNodeScaleDownTimeTrackerEnabled,
     }
 }
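Note that the tracker is opt-in: the new flag defaults to false, so no extra bookkeeping or metric reporting happens unless the Cluster Autoscaler is started with --longest-node-scaledown-timetracker-enabled=true, which createAutoscalingOptions() then copies into AutoscalingOptions.LongestNodeScaleDownTimeTrackerEnabled.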

cluster-autoscaler/core/scaledown/planner/planner.go

Lines changed: 73 additions & 0 deletions
@@ -30,6 +30,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
+    "k8s.io/autoscaler/cluster-autoscaler/metrics"
     "k8s.io/autoscaler/cluster-autoscaler/processors"
     "k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
     "k8s.io/autoscaler/cluster-autoscaler/simulator"
@@ -76,6 +77,7 @@ type Planner struct {
     cc                    controllerReplicasCalculator
     scaleDownSetProcessor nodes.ScaleDownSetProcessor
     scaleDownContext      *nodes.ScaleDownContext
+    longestNodeScaleDownT *longestNodeScaleDownEvalTime
 }

 // New creates a new Planner object.
@@ -91,6 +93,11 @@ func New(autoscalingCtx *ca_context.AutoscalingContext, processors *processors.A
         unneededNodes.LoadFromExistingTaints(autoscalingCtx.ListerRegistry, time.Now(), autoscalingCtx.AutoscalingOptions.NodeDeletionCandidateTTL)
     }

+    var longestNodeScaleDownTime *longestNodeScaleDownEvalTime
+    if autoscalingCtx.AutoscalingOptions.LongestNodeScaleDownTimeTrackerEnabled {
+        longestNodeScaleDownTime = newLongestNodeScaleDownEvalTime(time.Now())
+    }
+
     return &Planner{
         autoscalingCtx:   autoscalingCtx,
         unremovableNodes: unremovable.NewNodes(),
@@ -104,6 +111,7 @@ func New(autoscalingCtx *ca_context.AutoscalingContext, processors *processors.A
         scaleDownSetProcessor: processors.ScaleDownSetProcessor,
         scaleDownContext:      nodes.NewDefaultScaleDownContext(),
         minUpdateInterval:     minUpdateInterval,
+        longestNodeScaleDownT: longestNodeScaleDownTime,
     }
 }

@@ -277,13 +285,16 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
     }
     p.nodeUtilizationMap = utilizationMap
     timer := time.NewTimer(p.autoscalingCtx.ScaleDownSimulationTimeout)
+    endedPrematurely := false

     for i, node := range currentlyUnneededNodeNames {
         if timedOut(timer) {
+            p.handleUnprocessedNodes(currentlyUnneededNodeNames[i:], time.Now(), &endedPrematurely)
             klog.Warningf("%d out of %d nodes skipped in scale down simulation due to timeout.", len(currentlyUnneededNodeNames)-i, len(currentlyUnneededNodeNames))
             break
         }
         if len(removableList)-atomicScaleDownNodesCount >= p.unneededNodesLimit() {
+            p.handleUnprocessedNodes(currentlyUnneededNodeNames[i:], time.Now(), &endedPrematurely)
             klog.V(4).Infof("%d out of %d nodes skipped in scale down simulation: there are already %d unneeded nodes so no point in looking for more. Total atomic scale down nodes: %d", len(currentlyUnneededNodeNames)-i, len(currentlyUnneededNodeNames), len(removableList), atomicScaleDownNodesCount)
             break
         }
@@ -306,6 +317,7 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
             p.unremovableNodes.AddTimeout(unremovable, unremovableTimeout)
         }
     }
+    p.handleUnprocessedNodes(nil, time.Now(), &endedPrematurely)
     p.unneededNodes.Update(removableList, p.latestUpdate)
     if unremovableCount > 0 {
         klog.V(1).Infof("%v nodes found to be unremovable in simulation, will re-check them at %v", unremovableCount, unremovableTimeout)
@@ -435,3 +447,64 @@ func timedOut(timer *time.Timer) bool {
         return false
     }
 }
+
+func (p *Planner) handleUnprocessedNodes(unprocessedNodeNames []string, currentTime time.Time, endedPrematurely *bool) {
+    // If p.longestNodeScaleDownT is not set (the flag is disabled) or endedPrematurely is already true (nodes were already reported in this iteration), do nothing.
+    if p.longestNodeScaleDownT == nil || *endedPrematurely {
+        return
+    }
+    *endedPrematurely = true
+    p.longestNodeScaleDownT.update(unprocessedNodeNames, currentTime)
+}
+
+type longestNodeScaleDownEvalTime struct {
+    // defaultTime is the time of the previous currentlyUnneededNodeNames parsing
+    defaultTime time.Time
+    nodeNamesWithTimeStamps map[string]time.Time
+    // minimumTime is the earliest time stored in nodeNamesWithTimeStamps
+    minimumTime time.Time
+}
+
+func newLongestNodeScaleDownEvalTime(currentTime time.Time) *longestNodeScaleDownEvalTime {
+    return &longestNodeScaleDownEvalTime{defaultTime: currentTime}
+}
+
+func (l *longestNodeScaleDownEvalTime) get(nodeName string) time.Time {
+    if _, ok := l.nodeNamesWithTimeStamps[nodeName]; ok {
+        return l.nodeNamesWithTimeStamps[nodeName]
+    }
+    return l.defaultTime
+}
+
+func (l *longestNodeScaleDownEvalTime) update(nodeNames []string, currentTime time.Time) time.Duration {
+    var longestTime time.Duration
+    // If nodeNames is nil, all nodes were processed.
+    if nodeNames == nil {
+        // If l.minimumTime is zero, all nodes were processed in the previous iteration as well, so the longest time is 0;
+        // otherwise report the longest time from the previous iteration and reset minimumTime.
+        if l.minimumTime.IsZero() {
+            longestTime = 0
+        } else {
+            longestTime = currentTime.Sub(l.minimumTime)
+            l.minimumTime = time.Time{}
+        }
+        l.nodeNamesWithTimeStamps = make(map[string]time.Time)
+    } else {
+        newNodes := make(map[string]time.Time, len(nodeNames))
+        l.minimumTime = l.defaultTime
+        for _, nodeName := range nodeNames {
+            // If a node is not in nodeNamesWithTimeStamps, use the default time;
+            // if it is already there, carry over the value from the previous iteration.
+            valueFromPrevIter := l.get(nodeName)
+            newNodes[nodeName] = valueFromPrevIter
+            if l.minimumTime.After(valueFromPrevIter) {
+                l.minimumTime = valueFromPrevIter
+            }
+        }
+        l.nodeNamesWithTimeStamps = newNodes
+        longestTime = currentTime.Sub(l.minimumTime)
+    }
+    l.defaultTime = currentTime
+    metrics.ObserveLongestNodeScaleDownEvalTime(longestTime)
+    return longestTime
+}
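For readers skimming the diff, here is a small standalone sketch of the bookkeeping that longestNodeScaleDownEvalTime implements (toyTracker and its timestamps are illustrative only, not part of this commit): a node that stays unprocessed keeps the timestamp of the iteration in which it was first skipped, the reported duration is the current time minus the earliest such timestamp, and once every node is processed again the leftover from the previous iteration is reported one last time before the tracker resets.

package main

import (
    "fmt"
    "time"
)

// toyTracker mirrors the bookkeeping of longestNodeScaleDownEvalTime, re-implemented
// standalone purely for illustration.
type toyTracker struct {
    defaultTime time.Time            // time of the previous update call
    firstSeen   map[string]time.Time // first time each still-unprocessed node was skipped
    minimum     time.Time            // earliest timestamp currently held in firstSeen
}

func (t *toyTracker) update(unprocessed []string, now time.Time) time.Duration {
    var longest time.Duration
    if unprocessed == nil {
        // Everything was processed: report the leftover from the previous iteration once, then reset.
        if !t.minimum.IsZero() {
            longest = now.Sub(t.minimum)
            t.minimum = time.Time{}
        }
        t.firstSeen = map[string]time.Time{}
    } else {
        next := make(map[string]time.Time, len(unprocessed))
        t.minimum = t.defaultTime
        for _, n := range unprocessed {
            ts, ok := t.firstSeen[n]
            if !ok {
                ts = t.defaultTime // newly skipped nodes start at the previous update time
            }
            next[n] = ts
            if t.minimum.After(ts) {
                t.minimum = ts
            }
        }
        t.firstSeen = next
        longest = now.Sub(t.minimum)
    }
    t.defaultTime = now
    return longest
}

func main() {
    start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
    tr := &toyTracker{defaultTime: start, firstSeen: map[string]time.Time{}}

    fmt.Println(tr.update([]string{"n1", "n2"}, start.Add(10*time.Second))) // 10s: n1 and n2 skipped since start
    fmt.Println(tr.update([]string{"n2"}, start.Add(20*time.Second)))       // 20s: n2 still skipped since start
    fmt.Println(tr.update(nil, start.Add(30*time.Second)))                  // 30s: leftover for n2 reported once, tracker resets
    fmt.Println(tr.update(nil, start.Add(40*time.Second)))                  // 0s: nothing was skipped in the previous iteration
}

Running the sketch prints 10s, 20s, 30s and 0s, the same kind of sequence the new TestLongestNodeScaleDownTime test below walks through.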

cluster-autoscaler/core/scaledown/planner/planner_test.go

Lines changed: 165 additions & 0 deletions
@@ -1035,6 +1035,171 @@ func TestNodesToDelete(t *testing.T) {
     }
 }

+func TestLongestNodeScaleDownTime(t *testing.T) {
+    testCases := []struct {
+        name                        string
+        nodes                       []*apiv1.Node
+        unprocessedNodesInFirstIter []string
+        unprocessedNodes            []string
+        run                         int
+    }{
+        {
+            name: "Test the functionality of longestNodeScaleDownT with all nodes processed in the first iteration",
+            nodes: []*apiv1.Node{
+                BuildTestNode("n1", 1000, 10),
+                BuildTestNode("n2", 1000, 10),
+                BuildTestNode("n3", 1000, 10),
+            },
+            unprocessedNodesInFirstIter: nil,
+            unprocessedNodes:            []string{"n1", "n2"},
+            run:                         2,
+        },
+        {
+            name: "Test the functionality of longestNodeScaleDownT with not all nodes processed in the first iteration",
+            nodes: []*apiv1.Node{
+                BuildTestNode("n1", 1000, 10),
+                BuildTestNode("n2", 1000, 10),
+                BuildTestNode("n3", 1000, 10),
+            },
+            unprocessedNodesInFirstIter: []string{"n1", "n2"},
+            unprocessedNodes:            []string{"n1", "n2"},
+            run:                         2,
+        },
+    }
+    for _, tc := range testCases {
+        tc := tc
+        t.Run(tc.name, func(t *testing.T) {
+            t.Parallel()
+            start := time.Now()
+            timestamp := start
+            longestScaleDownEvalT := newLongestNodeScaleDownEvalTime(start)
+            timestamp = timestamp.Add(1 * time.Second)
+            if tc.unprocessedNodesInFirstIter == nil {
+                assert.Equal(t, longestScaleDownEvalT.update(tc.unprocessedNodesInFirstIter, timestamp), time.Duration(0))
+                start = timestamp
+            } else {
+                assert.Equal(t, longestScaleDownEvalT.update(tc.unprocessedNodesInFirstIter, timestamp), timestamp.Sub(start))
+            }
+            for i := 0; i < tc.run; i++ {
+                timestamp = timestamp.Add(1 * time.Second)
+                longestScaleDownEvalT.update(tc.unprocessedNodes, timestamp)
+                assert.Equal(t, len(longestScaleDownEvalT.nodeNamesWithTimeStamps), len(tc.unprocessedNodes))
+                for _, val := range longestScaleDownEvalT.nodeNamesWithTimeStamps {
+                    assert.Equal(t, val, start)
+                }
+            }
+            tc.unprocessedNodes = []string{"n2", "n3"}
+            timestamp = timestamp.Add(1 * time.Second)
+            prevDefaultTime := longestScaleDownEvalT.defaultTime
+            assert.Equal(t, longestScaleDownEvalT.update(tc.unprocessedNodes, timestamp), timestamp.Sub(start)) // longestTime is for node n2
+            assert.Equal(t, longestScaleDownEvalT.get("n1"), longestScaleDownEvalT.defaultTime)
+            assert.Equal(t, longestScaleDownEvalT.get("n2"), start)
+            assert.Equal(t, longestScaleDownEvalT.get("n3"), prevDefaultTime) // timestamp for new nodes is the default time before the update
+            timestamp = timestamp.Add(1 * time.Second)
+            assert.Equal(t, longestScaleDownEvalT.update(nil, timestamp), timestamp.Sub(start)) // leftover from the previous iteration is the time for node n2
+            timestamp = timestamp.Add(1 * time.Second)
+            assert.Equal(t, longestScaleDownEvalT.update(nil, timestamp), time.Duration(0)) // no leftover, so the longestTime will be 0
+        })
+    }
+}
+
+func TestLongestNodeScaleDownTimeWithTimeout(t *testing.T) {
+    testCases := []struct {
+        name                string
+        nodes               []*apiv1.Node
+        actuationStatus     *fakeActuationStatus
+        eligible            []string
+        maxParallel         int
+        isSimulationTimeout bool
+        unprocessedNodes    int
+        isFlagEnabled       bool
+    }{
+        {
+            name: "Unneeded node limit is exceeded",
+            nodes: []*apiv1.Node{
+                BuildTestNode("n1", 1000, 10),
+                BuildTestNode("n2", 1000, 10),
+                BuildTestNode("n3", 1000, 10),
+            },
+            actuationStatus:     &fakeActuationStatus{},
+            eligible:            []string{"n1", "n2"},
+            maxParallel:         0,
+            isSimulationTimeout: false,
+            // maxParallel=0 forces p.unneededNodesLimit() to be 0, so we break in the second check inside p.categorizeNodes() right away
+            unprocessedNodes: 2,
+            isFlagEnabled:    true,
+        },
+        {
+            name: "Simulation timeout is hit",
+            nodes: []*apiv1.Node{
+                BuildTestNode("n1", 1000, 10),
+                BuildTestNode("n2", 1000, 10),
+                BuildTestNode("n3", 1000, 10),
+            },
+            actuationStatus:     &fakeActuationStatus{},
+            eligible:            []string{"n1", "n2"},
+            maxParallel:         1,
+            isSimulationTimeout: true,
+            // the first node will be deleted and the timeout will be triggered for the second
+            unprocessedNodes: 1,
+            isFlagEnabled:    true,
+        },
+        {
+            name: "longestLastScaleDownEvalDuration flag is disabled",
+            nodes: []*apiv1.Node{
+                BuildTestNode("n1", 1000, 10),
+                BuildTestNode("n2", 1000, 10),
+                BuildTestNode("n3", 1000, 10),
+            },
+            actuationStatus:     &fakeActuationStatus{},
+            eligible:            []string{"n1", "n2"},
+            maxParallel:         1,
+            isSimulationTimeout: false,
+            isFlagEnabled:       false,
+        },
+    }
+    for _, tc := range testCases {
+        tc := tc
+        t.Run(tc.name, func(t *testing.T) {
+            t.Parallel()
+            registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil)
+            provider := testprovider.NewTestCloudProviderBuilder().Build()
+            provider.AddNodeGroup("ng1", 0, 0, 0)
+            for _, node := range tc.nodes {
+                provider.AddNode("ng1", node)
+            }
+            autoscalingCtx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{
+                NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
+                    ScaleDownUnneededTime: 10 * time.Minute,
+                },
+                ScaleDownSimulationTimeout:             1 * time.Second,
+                MaxScaleDownParallelism:                tc.maxParallel,
+                LongestNodeScaleDownTimeTrackerEnabled: tc.isFlagEnabled,
+            }, &fake.Clientset{}, registry, provider, nil, nil)
+            assert.NoError(t, err)
+            clustersnapshot.InitializeClusterSnapshotOrDie(t, autoscalingCtx.ClusterSnapshot, tc.nodes, nil)
+            deleteOptions := options.NodeDeleteOptions{}
+            p := New(&autoscalingCtx, processorstest.NewTestProcessors(&autoscalingCtx), deleteOptions, nil)
+            p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(tc.eligible)}
+            if tc.isSimulationTimeout {
+                autoscalingCtx.AutoscalingOptions.ScaleDownSimulationTimeout = 1 * time.Second
+                rs := &fakeRemovalSimulator{
+                    nodes: tc.nodes,
+                    sleep: 2 * time.Second,
+                }
+                p.rs = rs
+            }
+            assert.NoError(t, p.UpdateClusterState(tc.nodes, tc.nodes, &fakeActuationStatus{}, time.Now()))
+            if !tc.isFlagEnabled {
+                // if the flag is disabled, p.longestNodeScaleDownT is not initialized
+                assert.Nil(t, p.longestNodeScaleDownT)
+            } else {
+                assert.Equal(t, len(p.longestNodeScaleDownT.nodeNamesWithTimeStamps), tc.unprocessedNodes)
+            }
+        })
+    }
+}
+
 func sizedNodeGroup(id string, size int, atomic bool) cloudprovider.NodeGroup {
     ng := testprovider.NewTestNodeGroup(id, 10000, 0, size, true, false, "n1-standard-2", nil, nil)
     ng.SetOptions(&config.NodeGroupAutoscalingOptions{

cluster-autoscaler/metrics/metrics.go

Lines changed: 17 additions & 0 deletions
@@ -425,6 +425,14 @@ var (
             Buckets: k8smetrics.ExponentialBuckets(1, 2, 6), // 1, 2, 4, ..., 32
         }, []string{"instance_type", "cpu_count", "namespace_count"},
     )
+
+    longestLastScaleDownEvalDuration = k8smetrics.NewGauge(
+        &k8smetrics.GaugeOpts{
+            Namespace: caNamespace,
+            Name:      "longest_node_scale_down_eval_duration_seconds",
+            Help:      "Longest node evaluation time during ScaleDown.",
+        },
+    )
 )

 // RegisterAll registers all metrics.
@@ -461,6 +469,7 @@ func RegisterAll(emitPerNodeGroupMetrics bool) {
     legacyregistry.MustRegister(nodeTaintsCount)
     legacyregistry.MustRegister(inconsistentInstancesMigsCount)
    legacyregistry.MustRegister(binpackingHeterogeneity)
+    legacyregistry.MustRegister(longestLastScaleDownEvalDuration)

     if emitPerNodeGroupMetrics {
         legacyregistry.MustRegister(nodesGroupMinNodes)
@@ -748,3 +757,11 @@
 func ObserveBinpackingHeterogeneity(instanceType, cpuCount, namespaceCount string, pegCount int) {
     binpackingHeterogeneity.WithLabelValues(instanceType, cpuCount, namespaceCount).Observe(float64(pegCount))
 }
+
+// ObserveLongestNodeScaleDownEvalTime records the longest time during which a node was not processed during ScaleDown.
+// If a node is not processed multiple times consecutively, we store only the earliest timestamp.
+// Here we report the difference between the current time and the earliest time among all unprocessed nodes in the current ScaleDown iteration.
+// If we never timed out in categorizeNodes() or never exceeded p.unneededNodesLimit(), this value will be 0.
+func ObserveLongestNodeScaleDownEvalTime(duration time.Duration) {
+    longestLastScaleDownEvalDuration.Set(duration.Seconds())
+}
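As a quick sanity check of the new gauge, here is a hedged sketch that observes a value and reads it back from the legacy registry. The fully qualified metric name assumes caNamespace expands to cluster_autoscaler, which this diff does not show; adjust the name if the namespace differs.

package main

import (
    "fmt"
    "time"

    "k8s.io/autoscaler/cluster-autoscaler/metrics"
    "k8s.io/component-base/metrics/legacyregistry"
)

func main() {
    metrics.RegisterAll(false)
    metrics.ObserveLongestNodeScaleDownEvalTime(90 * time.Second)

    // Gather from the same legacy registry the gauge was registered with and print its current value.
    families, err := legacyregistry.DefaultGatherer.Gather()
    if err != nil {
        panic(err)
    }
    for _, mf := range families {
        if mf.GetName() == "cluster_autoscaler_longest_node_scale_down_eval_duration_seconds" {
            fmt.Println(mf.GetMetric()[0].GetGauge().GetValue()) // 90 (seconds)
        }
    }
}

In a running cluster the same value is exposed on the Cluster Autoscaler's metrics endpoint under that (assumed) name.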
