-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
scale_loop.go
131 lines (106 loc) · 3.85 KB
/
scale_loop.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package handler
import (
"context"
"time"
kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1"
)
// HandleScaleLoop runs one scale check for scaledObject immediately and then
// repeats it once per polling interval until ctx is canceled.
//
// The interval is scaledObject.Spec.PollingInterval, interpreted as seconds,
// when that field is set, and defaultPollingInterval seconds otherwise. The
// wait is measured from the end of one check to the start of the next, so a
// slow check delays the following one instead of overlapping with it.
//
// The call blocks until ctx is done. It also replaces h.logger with a logger
// that carries the ScaledObject's namespace, name and scale type.
func (h *ScaleHandler) HandleScaleLoop(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) {
	h.logger = h.logger.WithValues("ScaledObject.Namespace", scaledObject.Namespace, "ScaledObject.Name", scaledObject.Name, "ScaledObject.ScaleType", scaledObject.Spec.ScaleType)

	// Check once up front so the first evaluation does not wait a full interval.
	h.handleScale(ctx, scaledObject)

	pollingInterval := time.Second * time.Duration(defaultPollingInterval)
	if scaledObject.Spec.PollingInterval != nil {
		pollingInterval = time.Second * time.Duration(*scaledObject.Spec.PollingInterval)
	}

	h.logger.V(1).Info("Watching scaledObject with pollingInterval", "ScaledObject.PollingInterval", pollingInterval)

	// One reusable timer instead of time.After per iteration: time.After
	// allocates a new timer on every pass, and the pending one cannot be
	// stopped when ctx is canceled.
	timer := time.NewTimer(pollingInterval)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			h.handleScale(ctx, scaledObject)
			// The channel was just drained by the receive above, so Reset is
			// safe here without a Stop/drain dance.
			timer.Reset(pollingInterval)
		case <-ctx.Done():
			h.logger.V(1).Info("Context for scaledObject canceled")
			return
		}
	}
}
// handleScale dispatches a single scale check based on
// scaledObject.Spec.ScaleType: ScaleTypeJob is routed to handleScaleJob and
// every other value to handleScaleDeployment.
func (h *ScaleHandler) handleScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) {
	// Go switch cases do not fall through, so no explicit break is needed.
	switch scaledObject.Spec.ScaleType {
	case kedav1alpha1.ScaleTypeJob:
		h.handleScaleJob(ctx, scaledObject)
	default:
		h.handleScaleDeployment(ctx, scaledObject)
	}
}
// handleScaleJob runs one scale check for a job-type ScaledObject.
//
// For every scaler returned by getJobScalers it:
//   - calls IsActive; any scaler that reports active (without error) marks
//     the whole ScaledObject as active,
//   - adds each metric spec's External.TargetAverageValue to maxValue,
//   - adds the value of each returned metric named "queueLength" to
//     queueLength,
//   - closes the scaler.
//
// The totals accumulated across all scalers and the active flag are then
// handed to scaleJobs. If getJobScalers fails, the error is logged and
// scaleJobs is not called.
func (h *ScaleHandler) handleScaleJob(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) {
	h.logger.V(1).Info("Handle Scale Job called")

	scalers, err := h.getJobScalers(scaledObject)
	if err != nil {
		h.logger.Error(err, "Error getting scalers")
		return
	}
	h.logger.Info("Scalers count", "Count", len(scalers))

	var (
		isScaledObjectActive bool
		queueLength          int64
		maxValue             int64
	)

	for _, scaler := range scalers {
		scalerLogger := h.logger.WithValues("Scaler", scaler)

		// The IsActive error is only acted on at the end of the iteration so
		// that the metric totals below are still collected for this scaler.
		isTriggerActive, err := scaler.IsActive(ctx)
		scalerLogger.Info("Active trigger", "isTriggerActive", isTriggerActive)

		for _, metric := range scaler.GetMetricSpecForScaling() {
			// NOTE(review): assumes every spec has External and
			// TargetAverageValue populated — confirm against the scalers.
			// The second result of AsInt64 is deliberately ignored, so a
			// value it cannot convert contributes whatever AsInt64 returns.
			metricValue, _ := metric.External.TargetAverageValue.AsInt64()
			maxValue += metricValue
		}
		scalerLogger.Info("Scaler max value", "MaxValue", maxValue)

		metrics, metricsErr := scaler.GetMetrics(ctx, "queueLength", nil)
		if metricsErr != nil {
			// Best effort: report the failure and carry on with whatever
			// metrics were returned instead of dropping the error silently.
			scalerLogger.V(1).Info("Error getting queueLength metric, but continue", "Error", metricsErr)
		}
		for _, m := range metrics {
			if m.MetricName == "queueLength" {
				metricValue, _ := m.Value.AsInt64()
				queueLength += metricValue
			}
		}
		scalerLogger.Info("QueueLength Metric value", "queueLength", queueLength)

		switch {
		case err != nil:
			scalerLogger.V(1).Info("Error getting scale decision, but continue", "Error", err)
		case isTriggerActive:
			isScaledObjectActive = true
			scalerLogger.Info("Scaler is active")
		}

		// Close on every path so a scaler whose IsActive call failed is not
		// leaked.
		scaler.Close()
	}

	h.scaleJobs(scaledObject, isScaledObjectActive, queueLength, maxValue)
}
// handleScaleDeployment runs one scale check for a deployment-type
// ScaledObject.
//
// It obtains the scalers and the target deployment from GetDeploymentScalers,
// asks each scaler whether it is active, and passes the combined result to
// scaleDeployment: the ScaledObject counts as active when at least one scaler
// reports active without error. Each scaler is closed as soon as it has been
// queried.
//
// If GetDeploymentScalers returns an error it is logged and nothing is
// scaled. If it returns no deployment, the function returns without scaling.
func (h *ScaleHandler) handleScaleDeployment(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) {
	scalers, deployment, err := h.GetDeploymentScalers(scaledObject)
	// Check the error before the nil deployment so that a failure which also
	// yields no deployment is reported instead of being dropped silently.
	if err != nil {
		h.logger.Error(err, "Error getting scalers")
		return
	}
	if deployment == nil {
		return
	}

	isScaledObjectActive := false
	for _, scaler := range scalers {
		isTriggerActive, err := scaler.IsActive(ctx)
		switch {
		case err != nil:
			h.logger.V(1).Info("Error getting scale decision", "Error", err)
		case isTriggerActive:
			isScaledObjectActive = true
			h.logger.V(1).Info("Scaler for scaledObject is active", "Scaler", scaler)
		}

		// Close per iteration rather than with defer: a defer inside the loop
		// would keep every scaler open until after scaleDeployment returns.
		scaler.Close()
	}

	h.scaleDeployment(deployment, scaledObject, isScaledObjectActive)
}