Move scaledJob metric logic under scaledJob package
Signed-off-by: Yoon Park <[email protected]>
yoongon committed Jul 8, 2023
1 parent fe0c68d commit 5832b46
Showing 2 changed files with 91 additions and 80 deletions.
92 changes: 12 additions & 80 deletions pkg/scaling/scale_handler.go
@@ -19,7 +19,6 @@ package scaling
 import (
 	"context"
 	"fmt"
-	"math"
 	"strings"
 	"sync"
 	"time"
@@ -617,25 +616,19 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
 	return isScaledObjectActive, isScalerError, metricsRecord, nil
 }
 
-type scalerMetrics struct {
-	queueLength float64
-	maxValue    float64
-	isActive    bool
-}
-
 /// --------------------------------------------------------------------------- ///
 /// ---------- ScaledJob related methods --------- ///
 /// --------------------------------------------------------------------------- ///
 
 // getScaledJobMetrics returns metrics for specified metric name for a ScaledJob identified by its name and namespace.
 // It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler.
-func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scalerMetrics {
+func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scaledjob.ScalerMetrics {
 	cache, err := h.GetScalersCache(ctx, scaledJob)
 	if err != nil {
 		log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name)
 		return nil
 	}
-	var scalersMetrics []scalerMetrics
+	var scalersMetrics []scaledjob.ScalerMetrics
 	scalers, _ := cache.GetScalers()
 	for i, s := range scalers {
 		var queueLength float64
@@ -678,12 +671,12 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav
 
 		if targetAverageValue != 0 {
 			averageLength := queueLength / targetAverageValue
-			maxValue = min(float64(scaledJob.MaxReplicaCount()), averageLength)
+			maxValue = scaledjob.GetMaxValue(averageLength, scaledJob.MaxReplicaCount())
 		}
-		scalersMetrics = append(scalersMetrics, scalerMetrics{
-			queueLength: queueLength,
-			maxValue:    maxValue,
-			isActive:    isActive,
+		scalersMetrics = append(scalersMetrics, scaledjob.ScalerMetrics{
+			QueueLength: queueLength,
+			MaxValue:    maxValue,
+			IsActive:    isActive,
 		})
 	}
 	return scalersMetrics
@@ -693,73 +686,12 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav
 // is active as the first return value,
 // the second and the third return values indicate queueLength and maxValue for scale
 func (h *scaleHandler) isScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
-	var queueLength float64
-	var maxValue float64
-	isActive := false
-
 	logger := logf.Log.WithName("scalemetrics")
-	scalersMetrics := h.getScaledJobMetrics(ctx, scaledJob)
-	switch scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation {
-	case "min":
-		for _, metrics := range scalersMetrics {
-			if (queueLength == 0 || metrics.queueLength < queueLength) && metrics.isActive {
-				queueLength = metrics.queueLength
-				maxValue = metrics.maxValue
-				isActive = metrics.isActive
-			}
-		}
-	case "avg":
-		queueLengthSum := float64(0)
-		maxValueSum := float64(0)
-		length := 0
-		for _, metrics := range scalersMetrics {
-			if metrics.isActive {
-				queueLengthSum += metrics.queueLength
-				maxValueSum += metrics.maxValue
-				isActive = metrics.isActive
-				length++
-			}
-		}
-		if length != 0 {
-			queueLength = queueLengthSum / float64(length)
-			maxValue = maxValueSum / float64(length)
-		}
-	case "sum":
-		for _, metrics := range scalersMetrics {
-			if metrics.isActive {
-				queueLength += metrics.queueLength
-				maxValue += metrics.maxValue
-				isActive = metrics.isActive
-			}
-		}
-	default: // max
-		for _, metrics := range scalersMetrics {
-			if metrics.queueLength > queueLength && metrics.isActive {
-				queueLength = metrics.queueLength
-				maxValue = metrics.maxValue
-				isActive = metrics.isActive
-			}
-		}
-	}
-
-	if scaledJob.MinReplicaCount() > 0 {
-		isActive = true
-	}
-
-	maxValue = min(float64(scaledJob.MaxReplicaCount()), maxValue)
-	logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation)
-
-	return isActive, ceilToInt64(queueLength), ceilToInt64(maxValue)
-}
-
-func ceilToInt64(x float64) int64 {
-	return int64(math.Ceil(x))
-}
+	scalersMetrics := h.getScaledJobMetrics(ctx, scaledJob)
+	isActive, queueLength, maxValue, maxFloatValue :=
+		scaledjob.IsScaledJobActive(scalersMetrics, scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation, scaledJob.MinReplicaCount(), scaledJob.MaxReplicaCount())
 
-// Min function for float64
-func min(x, y float64) float64 {
-	if x > y {
-		return y
-	}
-	return x
+	logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxFloatValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation)
+	return isActive, queueLength, maxValue
 }
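
The net effect of this hunk: the handler-side min/ceil helpers are gone, and the clamp that was previously inlined as min(float64(scaledJob.MaxReplicaCount()), averageLength) now goes through scaledjob.GetMaxValue. A minimal sketch of that step in isolation (hypothetical values; the import path assumes KEDA's v2 module layout):

    package main

    import (
    	"fmt"

    	"github.com/kedacore/keda/v2/pkg/scaling/scaledjob" // assumed module path
    )

    func main() {
    	// Hypothetical readings for one scaler.
    	queueLength := 120.0       // total pending work reported by the scaler
    	targetAverageValue := 10.0 // desired work per replica
    	maxReplicaCount := int64(8)

    	// averageLength is the replica count the queue "asks for";
    	// GetMaxValue clamps it to the ScaledJob's MaxReplicaCount.
    	averageLength := queueLength / targetAverageValue
    	maxValue := scaledjob.GetMaxValue(averageLength, maxReplicaCount)

    	fmt.Println(averageLength, maxValue) // 12 8
    }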
79 changes: 79 additions & 0 deletions pkg/scaling/scaledjob/metrics.go
@@ -3,6 +3,7 @@ package scaledjob
 import (
 	v2 "k8s.io/api/autoscaling/v2"
 	"k8s.io/apimachinery/pkg/api/resource"
+	"math"
 )
 
 // GetTargetAverageValue returns the average of all the metrics' average value.
@@ -39,3 +40,81 @@ func CreateMetricSpec(averageValue int64, metricName string) v2.MetricSpec {
 		},
 	}
 }
+
+type ScalerMetrics struct {
+	QueueLength float64
+	MaxValue    float64
+	IsActive    bool
+}
+
+func IsScaledJobActive(scalersMetrics []ScalerMetrics, multipleScalersCalculation string, minReplicaCount, maxReplicaCount int64) (bool, int64, int64, float64) {
+	var queueLength float64
+	var maxValue float64
+	isActive := false
+
+	switch multipleScalersCalculation {
+	case "min":
+		for _, metrics := range scalersMetrics {
+			if (queueLength == 0 || metrics.QueueLength < queueLength) && metrics.IsActive {
+				queueLength = metrics.QueueLength
+				maxValue = metrics.MaxValue
+				isActive = metrics.IsActive
+			}
+		}
+	case "avg":
+		queueLengthSum := float64(0)
+		maxValueSum := float64(0)
+		length := 0
+		for _, metrics := range scalersMetrics {
+			if metrics.IsActive {
+				queueLengthSum += metrics.QueueLength
+				maxValueSum += metrics.MaxValue
+				isActive = metrics.IsActive
+				length++
+			}
+		}
+		if length != 0 {
+			queueLength = queueLengthSum / float64(length)
+			maxValue = maxValueSum / float64(length)
+		}
+	case "sum":
+		for _, metrics := range scalersMetrics {
+			if metrics.IsActive {
+				queueLength += metrics.QueueLength
+				maxValue += metrics.MaxValue
+				isActive = metrics.IsActive
+			}
+		}
+	default: // max
+		for _, metrics := range scalersMetrics {
+			if metrics.QueueLength > queueLength && metrics.IsActive {
+				queueLength = metrics.QueueLength
+				maxValue = metrics.MaxValue
+				isActive = metrics.IsActive
+			}
+		}
+	}
+
+	if minReplicaCount > 0 {
+		isActive = true
+	}
+
+	maxValue = GetMaxValue(maxValue, maxReplicaCount)
+	return isActive, ceilToInt64(queueLength), ceilToInt64(maxValue), maxValue
+}
+
+func ceilToInt64(x float64) int64 {
+	return int64(math.Ceil(x))
+}
+
+// Min function for float64
+func min(x, y float64) float64 {
+	if x > y {
+		return y
+	}
+	return x
+}
+
+func GetMaxValue(maxValue float64, maxReplicaCount int64) float64 {
+	return min(maxValue, float64(maxReplicaCount))
+}
