diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index badbe2ea0e9..434deb37c01 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -19,7 +19,6 @@ package scaling import ( "context" "fmt" - "math" "strings" "sync" "time" @@ -617,25 +616,19 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k return isScaledObjectActive, isScalerError, metricsRecord, nil } -type scalerMetrics struct { - queueLength float64 - maxValue float64 - isActive bool -} - // / --------------------------------------------------------------------------- /// // / ---------- ScaledJob related methods --------- /// // / --------------------------------------------------------------------------- /// // getScaledJobMetrics returns metrics for specified metric name for a ScaledJob identified by its name and namespace. // It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler. -func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scalerMetrics { +func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scaledjob.ScalerMetrics { cache, err := h.GetScalersCache(ctx, scaledJob) if err != nil { log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name) return nil } - var scalersMetrics []scalerMetrics + var scalersMetrics []scaledjob.ScalerMetrics scalers, _ := cache.GetScalers() for i, s := range scalers { var queueLength float64 @@ -678,12 +671,12 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav if targetAverageValue != 0 { averageLength := queueLength / targetAverageValue - maxValue = min(float64(scaledJob.MaxReplicaCount()), averageLength) + maxValue = scaledjob.GetMaxValue(averageLength, scaledJob.MaxReplicaCount()) } - scalersMetrics = append(scalersMetrics, scalerMetrics{ - queueLength: queueLength, - maxValue: maxValue, - isActive: isActive, + scalersMetrics = append(scalersMetrics, scaledjob.ScalerMetrics{ + QueueLength: queueLength, + MaxValue: maxValue, + IsActive: isActive, }) } return scalersMetrics @@ -693,73 +686,12 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav // is active as the first return value, // the second and the third return values indicate queueLength and maxValue for scale func (h *scaleHandler) isScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) { - var queueLength float64 - var maxValue float64 - isActive := false - logger := logf.Log.WithName("scalemetrics") - scalersMetrics := h.getScaledJobMetrics(ctx, scaledJob) - switch scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation { - case "min": - for _, metrics := range scalersMetrics { - if (queueLength == 0 || metrics.queueLength < queueLength) && metrics.isActive { - queueLength = metrics.queueLength - maxValue = metrics.maxValue - isActive = metrics.isActive - } - } - case "avg": - queueLengthSum := float64(0) - maxValueSum := float64(0) - length := 0 - for _, metrics := range scalersMetrics { - if metrics.isActive { - queueLengthSum += metrics.queueLength - maxValueSum += metrics.maxValue - isActive = metrics.isActive - length++ - } - } - if length != 0 { - queueLength = queueLengthSum / float64(length) - maxValue = maxValueSum / float64(length) - } - case "sum": - for _, metrics := range scalersMetrics { - if metrics.isActive { - queueLength += metrics.queueLength - maxValue += metrics.maxValue - isActive = metrics.isActive - } - } - default: // max - for _, metrics := range scalersMetrics { - if metrics.queueLength > queueLength && metrics.isActive { - queueLength = metrics.queueLength - maxValue = metrics.maxValue - isActive = metrics.isActive - } - } - } - - if scaledJob.MinReplicaCount() > 0 { - isActive = true - } - - maxValue = min(float64(scaledJob.MaxReplicaCount()), maxValue) - logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation) - return isActive, ceilToInt64(queueLength), ceilToInt64(maxValue) -} - -func ceilToInt64(x float64) int64 { - return int64(math.Ceil(x)) -} + scalersMetrics := h.getScaledJobMetrics(ctx, scaledJob) + isActive, queueLength, maxValue, maxFloatValue := + scaledjob.IsScaledJobActive(scalersMetrics, scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation, scaledJob.MinReplicaCount(), scaledJob.MaxReplicaCount()) -// Min function for float64 -func min(x, y float64) float64 { - if x > y { - return y - } - return x + logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxFloatValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation) + return isActive, queueLength, maxValue } diff --git a/pkg/scaling/scaledjob/metrics.go b/pkg/scaling/scaledjob/metrics.go index d535dbe0d25..b04444a1061 100644 --- a/pkg/scaling/scaledjob/metrics.go +++ b/pkg/scaling/scaledjob/metrics.go @@ -3,6 +3,7 @@ package scaledjob import ( v2 "k8s.io/api/autoscaling/v2" "k8s.io/apimachinery/pkg/api/resource" + "math" ) // GetTargetAverageValue returns the average of all the metrics' average value. @@ -39,3 +40,81 @@ func CreateMetricSpec(averageValue int64, metricName string) v2.MetricSpec { }, } } + +type ScalerMetrics struct { + QueueLength float64 + MaxValue float64 + IsActive bool +} + +func IsScaledJobActive(scalersMetrics []ScalerMetrics, multipleScalersCalculation string, minReplicaCount, maxReplicaCount int64) (bool, int64, int64, float64) { + var queueLength float64 + var maxValue float64 + isActive := false + + switch multipleScalersCalculation { + case "min": + for _, metrics := range scalersMetrics { + if (queueLength == 0 || metrics.QueueLength < queueLength) && metrics.IsActive { + queueLength = metrics.QueueLength + maxValue = metrics.MaxValue + isActive = metrics.IsActive + } + } + case "avg": + queueLengthSum := float64(0) + maxValueSum := float64(0) + length := 0 + for _, metrics := range scalersMetrics { + if metrics.IsActive { + queueLengthSum += metrics.QueueLength + maxValueSum += metrics.MaxValue + isActive = metrics.IsActive + length++ + } + } + if length != 0 { + queueLength = queueLengthSum / float64(length) + maxValue = maxValueSum / float64(length) + } + case "sum": + for _, metrics := range scalersMetrics { + if metrics.IsActive { + queueLength += metrics.QueueLength + maxValue += metrics.MaxValue + isActive = metrics.IsActive + } + } + default: // max + for _, metrics := range scalersMetrics { + if metrics.QueueLength > queueLength && metrics.IsActive { + queueLength = metrics.QueueLength + maxValue = metrics.MaxValue + isActive = metrics.IsActive + } + } + } + + if minReplicaCount > 0 { + isActive = true + } + + maxValue = GetMaxValue(maxValue, maxReplicaCount) + return isActive, ceilToInt64(queueLength), ceilToInt64(maxValue), maxValue +} + +func ceilToInt64(x float64) int64 { + return int64(math.Ceil(x)) +} + +// Min function for float64 +func min(x, y float64) float64 { + if x > y { + return y + } + return x +} + +func GetMaxValue(maxValue float64, maxReplicaCount int64) float64 { + return min(maxValue, float64(maxReplicaCount)) +}