From 9a6becb699516a49be9f121e9abc17e7899c60c0 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Tue, 13 Jun 2023 23:24:30 +0900 Subject: [PATCH 01/21] Refator to use GetMetricsAndActivityForScaler Signed-off-by: Yoon Park --- pkg/scaling/cache/scalers_cache.go | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/pkg/scaling/cache/scalers_cache.go b/pkg/scaling/cache/scalers_cache.go index de14500b9cf..c34d457dbaa 100644 --- a/pkg/scaling/cache/scalers_cache.go +++ b/pkg/scaling/cache/scalers_cache.go @@ -254,16 +254,7 @@ func (c *ScalersCache) getScaledJobMetrics(ctx context.Context, scaledJob *kedav } // TODO here we should probably loop through all metrics in a Scaler - // as it is done for ScaledObject - metrics, isTriggerActive, err := s.Scaler.GetMetricsAndActivity(ctx, metricSpecs[0].External.Metric.Name) - if err != nil { - var ns scalers.Scaler - ns, err = c.refreshScaler(ctx, i) - if err == nil { - metrics, isTriggerActive, err = ns.GetMetricsAndActivity(ctx, metricSpecs[0].External.Metric.Name) - } - } - + metrics, isTriggerActive, _, err := c.GetMetricsAndActivityForScaler(ctx, i, metricSpecs[0].External.Metric.Name) if err != nil { scalerLogger.V(1).Info("Error getting scaler metrics and activity, but continue", "error", err) c.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) From 37e75b64965cad884e950936dd0390b1ceebbead Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Thu, 15 Jun 2023 21:59:52 +0900 Subject: [PATCH 02/21] Implement test cases inside scale_handler_test.go Signed-off-by: Yoon Park --- apis/keda/v1alpha1/scaledjob_types.go | 4 + pkg/scaling/scale_handler.go | 177 ++++++++++++++++++- pkg/scaling/scale_handler_test.go | 241 ++++++++++++++++++++++++++ 3 files changed, 415 insertions(+), 7 deletions(-) diff --git a/apis/keda/v1alpha1/scaledjob_types.go b/apis/keda/v1alpha1/scaledjob_types.go index d6aa1fa4852..1d68bfe0b09 100644 --- a/apis/keda/v1alpha1/scaledjob_types.go +++ b/apis/keda/v1alpha1/scaledjob_types.go @@ -142,3 +142,7 @@ func (s ScaledJob) MinReplicaCount() int64 { } return defaultScaledJobMinReplicaCount } + +func (so *ScaledJob) GenerateIdentifier() string { + return GenerateIdentifier("ScaledJob", so.Namespace, so.Name) +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 192cc8ff2a2..1f12c26bc2a 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -19,6 +19,8 @@ package scaling import ( "context" "fmt" + v2 "k8s.io/api/autoscaling/v2" + "math" "strings" "sync" "time" @@ -246,19 +248,19 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac h.scaledObjectsMetricCache.StoreRecords(obj.GenerateIdentifier(), metricsRecords) } case *kedav1alpha1.ScaledJob: - cache, err := h.GetScalersCache(ctx, scalableObject) - if err != nil { - log.Error(err, "error getting scalers cache", "scaledJob.Namespace", obj.Namespace, "scaledJob.Name", obj.Name) - return - } + //cache, err := h.GetScalersCache(ctx, scalableObject) + //if err != nil { + // log.Error(err, "error getting scalers cache", "scaledJob.Namespace", obj.Namespace, "scaledJob.Name", obj.Name) + // return + //} - err = h.client.Get(ctx, types.NamespacedName{Name: obj.Name, Namespace: obj.Namespace}, obj) + err := h.client.Get(ctx, types.NamespacedName{Name: obj.Name, Namespace: obj.Namespace}, obj) if err != nil { log.Error(err, "error getting scaledJob", "scaledJob.Namespace", obj.Namespace, "scaledJob.Name", obj.Name) return } - isActive, scaleTo, maxScale 
:= cache.IsScaledJobActive(ctx, obj) + isActive, scaleTo, maxScale := h.isScaledJobActive(ctx, obj) h.scaleExecutor.RequestJobScale(ctx, obj, isActive, scaleTo, maxScale) } } @@ -620,3 +622,164 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k return isScaledObjectActive, isScalerError, metricsRecord, nil } + +func (h *scaleHandler) isScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) { + var queueLength float64 + var maxValue float64 + isActive := false + + logger := logf.Log.WithName("scalemetrics") + scalersMetrics := h.getScaledJobMetrics(ctx, scaledJob) + switch scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation { + case "min": + for _, metrics := range scalersMetrics { + if (queueLength == 0 || metrics.queueLength < queueLength) && metrics.isActive { + queueLength = metrics.queueLength + maxValue = metrics.maxValue + isActive = metrics.isActive + } + } + case "avg": + queueLengthSum := float64(0) + maxValueSum := float64(0) + length := 0 + for _, metrics := range scalersMetrics { + if metrics.isActive { + queueLengthSum += metrics.queueLength + maxValueSum += metrics.maxValue + isActive = metrics.isActive + length++ + } + } + if length != 0 { + queueLength = queueLengthSum / float64(length) + maxValue = maxValueSum / float64(length) + } + case "sum": + for _, metrics := range scalersMetrics { + if metrics.isActive { + queueLength += metrics.queueLength + maxValue += metrics.maxValue + isActive = metrics.isActive + } + } + default: // max + for _, metrics := range scalersMetrics { + if metrics.queueLength > queueLength && metrics.isActive { + queueLength = metrics.queueLength + maxValue = metrics.maxValue + isActive = metrics.isActive + } + } + } + + if scaledJob.MinReplicaCount() > 0 { + isActive = true + } + + maxValue = min(float64(scaledJob.MaxReplicaCount()), maxValue) + logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation) + + return isActive, ceilToInt64(queueLength), ceilToInt64(maxValue) +} + +func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scalerMetrics { + // TODO this loop should be probably done similar way the ScaledObject loop is done + + // yoon GetScalersCache !!! This is all !!! 
+ cache, err := h.GetScalersCache(ctx, scaledJob) + if err != nil { + log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name) + return nil + } + var scalersMetrics []scalerMetrics + scalers, _ := cache.GetScalers() + for i, s := range scalers { + var queueLength float64 + var targetAverageValue float64 + isActive := false + maxValue := float64(0) + scalerType := fmt.Sprintf("%T:", s) + + scalerLogger := log.WithValues("ScaledJob", scaledJob.Name, "Scaler", scalerType) + + metricSpecs := s.GetMetricSpecForScaling(ctx) + + // skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata) + // or skip cpu/memory resource scaler + if len(metricSpecs) < 1 || metricSpecs[0].External == nil { + continue + } + + // TODO here we should probably loop through all metrics in a Scaler + metrics, isTriggerActive, _, err := cache.GetMetricsAndActivityForScaler(ctx, i, metricSpecs[0].External.Metric.Name) + if err != nil { + scalerLogger.V(1).Info("Error getting scaler metrics and activity, but continue", "error", err) + cache.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) + continue + } + + targetAverageValue = getTargetAverageValue(metricSpecs) + + var metricValue float64 + for _, m := range metrics { + if m.MetricName == metricSpecs[0].External.Metric.Name { + metricValue = m.Value.AsApproximateFloat64() + queueLength += metricValue + } + } + scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, metricSpecs[0].External.Metric.Name, queueLength, "targetAverageValue", targetAverageValue) + + if isTriggerActive { + isActive = true + } + + if targetAverageValue != 0 { + averageLength := queueLength / targetAverageValue + maxValue = min(float64(scaledJob.MaxReplicaCount()), averageLength) + } + scalersMetrics = append(scalersMetrics, scalerMetrics{ + queueLength: queueLength, + maxValue: maxValue, + isActive: isActive, + }) + } + return scalersMetrics +} + +func getTargetAverageValue(metricSpecs []v2.MetricSpec) float64 { + var targetAverageValue float64 + var metricValue float64 + for _, metric := range metricSpecs { + if metric.External.Target.AverageValue == nil { + metricValue = 0 + } else { + metricValue = metric.External.Target.AverageValue.AsApproximateFloat64() + } + + targetAverageValue += metricValue + } + count := float64(len(metricSpecs)) + if count != 0 { + return targetAverageValue / count + } + return 0 +} + +func ceilToInt64(x float64) int64 { + return int64(math.Ceil(x)) +} + +// Min function for float64 +func min(x, y float64) float64 { + if x > y { + return y + } + return x +} + +type scalerMetrics struct { + queueLength float64 + maxValue float64 + isActive bool +} diff --git a/pkg/scaling/scale_handler_test.go b/pkg/scaling/scale_handler_test.go index dc9b40fdf5d..8eb6e848e23 100644 --- a/pkg/scaling/scale_handler_test.go +++ b/pkg/scaling/scale_handler_test.go @@ -19,6 +19,8 @@ package scaling import ( "context" "errors" + batchv1 "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" "sync" "testing" "time" @@ -357,6 +359,37 @@ func TestCheckScaledObjectFindFirstActiveNotIgnoreOthers(t *testing.T) { assert.Equal(t, true, isError) } +func TestTargetAverageValue(t *testing.T) { + // count = 0 + specs := []v2.MetricSpec{} + metricName := "s0-messageCount" + targetAverageValue := getTargetAverageValue(specs) + assert.Equal(t, float64(0), targetAverageValue) + // 1 1 + specs = []v2.MetricSpec{ + createMetricSpec(1, 
metricName), + createMetricSpec(1, metricName), + } + targetAverageValue = getTargetAverageValue(specs) + assert.Equal(t, float64(1), targetAverageValue) + // 5 5 3 -> 4.333333333333333 + specs = []v2.MetricSpec{ + createMetricSpec(5, metricName), + createMetricSpec(5, metricName), + createMetricSpec(3, metricName), + } + targetAverageValue = getTargetAverageValue(specs) + assert.Equal(t, 4.333333333333333, targetAverageValue) + + // 5 5 4 -> 4.666666666666667 + specs = []v2.MetricSpec{ + createMetricSpec(5, metricName), + createMetricSpec(5, metricName), + createMetricSpec(4, metricName), + } + targetAverageValue = getTargetAverageValue(specs) + assert.Equal(t, 4.666666666666667, targetAverageValue) +} func createMetricSpec(averageValue int64, metricName string) v2.MetricSpec { qty := resource.NewQuantity(averageValue, resource.DecimalSI) return v2.MetricSpec{ @@ -370,3 +403,211 @@ func createMetricSpec(averageValue int64, metricName string) v2.MetricSpec { }, } } + +func TestIsScaledJobActive(t *testing.T) { + metricName := "s0-queueLength" + ctrl := gomock.NewController(t) + recorder := record.NewFakeRecorder(1) + // Keep the current behavior + // Assme 1 trigger only + scaledJobSingle := createScaledJob(1, 100, "") // testing default = max + scalerSingle := []cache.ScalerBuilder{{ + Scaler: createScaler(ctrl, int64(20), int64(2), true, metricName), + Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { + return createScaler(ctrl, int64(20), int64(2), true, metricName), &scalers.ScalerConfig{}, nil + }, + }} + + scalerCache := cache.ScalersCache{ + //Scalers: scalers, + Scalers: scalerSingle, + Recorder: recorder, + } + + caches := map[string]*cache.ScalersCache{} + caches[scaledJobSingle.GenerateIdentifier()] = &scalerCache + + sh := scaleHandler{ + //client: mockClient, + scaleLoopContexts: &sync.Map{}, + //scaleExecutor: mockExecutor, + globalHTTPTimeout: time.Duration(1000), + recorder: recorder, + scalerCaches: caches, + scalerCachesLock: &sync.RWMutex{}, + scaledObjectsMetricCache: metricscache.NewMetricsCache(), + } + isActive, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle) + assert.Equal(t, true, isActive) + assert.Equal(t, int64(20), queueLength) + assert.Equal(t, int64(10), maxValue) + // yoon + //cache.Close(context.Background()) +} + +func TestIsScaledJobActiveIfQueueEmptyButMinReplicaCountGreaterZero(t *testing.T) { + metricName := "s0-queueLength" + ctrl := gomock.NewController(t) + recorder := record.NewFakeRecorder(1) + // Keep the current behavior + // Assme 1 trigger only + scaledJobSingle := createScaledJob(1, 100, "") // testing default = max + scalerSingle := []cache.ScalerBuilder{{ + Scaler: createScaler(ctrl, int64(0), int64(1), true, metricName), + Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { + return createScaler(ctrl, int64(0), int64(1), true, metricName), &scalers.ScalerConfig{}, nil + }, + }} + + scalerCache := cache.ScalersCache{ + //Scalers: scalers, + Scalers: scalerSingle, + Recorder: recorder, + } + + caches := map[string]*cache.ScalersCache{} + caches[scaledJobSingle.GenerateIdentifier()] = &scalerCache + + sh := scaleHandler{ + //client: mockClient, + scaleLoopContexts: &sync.Map{}, + //scaleExecutor: mockExecutor, + globalHTTPTimeout: time.Duration(1000), + recorder: recorder, + scalerCaches: caches, + scalerCachesLock: &sync.RWMutex{}, + scaledObjectsMetricCache: metricscache.NewMetricsCache(), + } + + isActive, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), 
scaledJobSingle) + assert.Equal(t, true, isActive) + assert.Equal(t, int64(0), queueLength) + assert.Equal(t, int64(0), maxValue) + //cache.Close(context.Background()) +} + +func newScalerTestData( + metricName string, + maxReplicaCount int, + multipleScalersCalculation string, + scaler1QueueLength, //nolint:golint,unparam + scaler1AverageValue int, //nolint:golint,unparam + scaler1IsActive bool, //nolint:golint,unparam + scaler2QueueLength, //nolint:golint,unparam + scaler2AverageValue int, //nolint:golint,unparam + scaler2IsActive bool, //nolint:golint,unparam + scaler3QueueLength, //nolint:golint,unparam + scaler3AverageValue int, //nolint:golint,unparam + scaler3IsActive bool, //nolint:golint,unparam + scaler4QueueLength, //nolint:golint,unparam + scaler4AverageValue int, //nolint:golint,unparam + scaler4IsActive bool, //nolint:golint,unparam + resultIsActive bool, //nolint:golint,unparam + resultQueueLength, + resultMaxLength int) scalerTestData { + return scalerTestData{ + MetricName: metricName, + MaxReplicaCount: int32(maxReplicaCount), + MultipleScalersCalculation: multipleScalersCalculation, + Scaler1QueueLength: int64(scaler1QueueLength), + Scaler1AverageValue: int64(scaler1AverageValue), + Scaler1IsActive: scaler1IsActive, + Scaler2QueueLength: int64(scaler2QueueLength), + Scaler2AverageValue: int64(scaler2AverageValue), + Scaler2IsActive: scaler2IsActive, + Scaler3QueueLength: int64(scaler3QueueLength), + Scaler3AverageValue: int64(scaler3AverageValue), + Scaler3IsActive: scaler3IsActive, + Scaler4QueueLength: int64(scaler4QueueLength), + Scaler4AverageValue: int64(scaler4AverageValue), + Scaler4IsActive: scaler4IsActive, + ResultIsActive: resultIsActive, + ResultQueueLength: int64(resultQueueLength), + ResultMaxValue: int64(resultMaxLength), + } +} + +type scalerTestData struct { + MetricName string + MaxReplicaCount int32 + MultipleScalersCalculation string + Scaler1QueueLength int64 + Scaler1AverageValue int64 + Scaler1IsActive bool + Scaler2QueueLength int64 + Scaler2AverageValue int64 + Scaler2IsActive bool + Scaler3QueueLength int64 + Scaler3AverageValue int64 + Scaler3IsActive bool + Scaler4QueueLength int64 + Scaler4AverageValue int64 + Scaler4IsActive bool + ResultIsActive bool + ResultQueueLength int64 + ResultMaxValue int64 + MinReplicaCount int32 +} + +func createScaledJob(minReplicaCount int32, maxReplicaCount int32, multipleScalersCalculation string) *kedav1alpha1.ScaledJob { + if multipleScalersCalculation != "" { + return &kedav1alpha1.ScaledJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + Spec: kedav1alpha1.ScaledJobSpec{ + MinReplicaCount: &minReplicaCount, + MaxReplicaCount: &maxReplicaCount, + ScalingStrategy: kedav1alpha1.ScalingStrategy{ + MultipleScalersCalculation: multipleScalersCalculation, + }, + JobTargetRef: &batchv1.JobSpec{ + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + }, + EnvSourceContainerName: "test", + }, + } + } + return &kedav1alpha1.ScaledJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + Spec: kedav1alpha1.ScaledJobSpec{ + MinReplicaCount: &minReplicaCount, + MaxReplicaCount: &maxReplicaCount, + JobTargetRef: &batchv1.JobSpec{ + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + }, + EnvSourceContainerName: "test", + }, + } +} + +func createScaler(ctrl *gomock.Controller, queueLength int64, averageValue int64, isActive bool, metricName string) 
*mock_scalers.MockScaler { + scaler := mock_scalers.NewMockScaler(ctrl) + metricsSpecs := []v2.MetricSpec{createMetricSpec(averageValue, metricName)} + + metrics := []external_metrics.ExternalMetricValue{ + { + MetricName: metricName, + Value: *resource.NewQuantity(queueLength, resource.DecimalSI), + }, + } + scaler.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs) + scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Any()).Return(metrics, isActive, nil) + // yoon + //scaler.EXPECT().Close(gomock.Any()) + return scaler +} From 9fce15896b8f97c009073e5ded0958bb23a9e392 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Fri, 16 Jun 2023 22:19:22 +0900 Subject: [PATCH 03/21] All tests work Signed-off-by: Yoon Park --- pkg/scaling/scale_handler_test.go | 72 +++++++++++++++++++++++++++++-- 1 file changed, 68 insertions(+), 4 deletions(-) diff --git a/pkg/scaling/scale_handler_test.go b/pkg/scaling/scale_handler_test.go index 8eb6e848e23..d733b62d5ab 100644 --- a/pkg/scaling/scale_handler_test.go +++ b/pkg/scaling/scale_handler_test.go @@ -19,6 +19,7 @@ package scaling import ( "context" "errors" + "fmt" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" "sync" @@ -411,7 +412,7 @@ func TestIsScaledJobActive(t *testing.T) { // Keep the current behavior // Assme 1 trigger only scaledJobSingle := createScaledJob(1, 100, "") // testing default = max - scalerSingle := []cache.ScalerBuilder{{ + scalers2 := []cache.ScalerBuilder{{ Scaler: createScaler(ctrl, int64(20), int64(2), true, metricName), Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { return createScaler(ctrl, int64(20), int64(2), true, metricName), &scalers.ScalerConfig{}, nil @@ -419,8 +420,7 @@ func TestIsScaledJobActive(t *testing.T) { }} scalerCache := cache.ScalersCache{ - //Scalers: scalers, - Scalers: scalerSingle, + Scalers: scalers2, Recorder: recorder, } @@ -441,8 +441,72 @@ func TestIsScaledJobActive(t *testing.T) { assert.Equal(t, true, isActive) assert.Equal(t, int64(20), queueLength) assert.Equal(t, int64(10), maxValue) - // yoon + //yoon //cache.Close(context.Background()) + + // Test the valiation + scalerTestDatam := []scalerTestData{ + newScalerTestData("s0-queueLength", 100, "max", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 20, 20), + newScalerTestData("queueLength", 100, "min", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 5, 2), + newScalerTestData("messageCount", 100, "avg", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 12, 9), + newScalerTestData("s3-messageCount", 100, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 27), + newScalerTestData("s10-messageCount", 25, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 25), + } + + for index, scalerTestData := range scalerTestDatam { + scaledJob := createScaledJob(scalerTestData.MinReplicaCount, scalerTestData.MaxReplicaCount, scalerTestData.MultipleScalersCalculation) + scalersToTest := []cache.ScalerBuilder{{ + Scaler: createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive, scalerTestData.MetricName), + Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { + return createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil + }, + }, { + Scaler: createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive, 
scalerTestData.MetricName), + Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { + return createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil + }, + }, { + Scaler: createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive, scalerTestData.MetricName), + Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { + return createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil + }, + }, { + Scaler: createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive, scalerTestData.MetricName), + Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { + return createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil + }, + }} + + //cache = ScalersCache{ + // Scalers: scalersToTest, + // Recorder: recorder, + //} + scalerCache = cache.ScalersCache{ + Scalers: scalersToTest, + Recorder: recorder, + } + + caches := map[string]*cache.ScalersCache{} + caches[scaledJobSingle.GenerateIdentifier()] = &scalerCache + + sh := scaleHandler{ + //client: mockClient, + scaleLoopContexts: &sync.Map{}, + //scaleExecutor: mockExecutor, + globalHTTPTimeout: time.Duration(1000), + recorder: recorder, + scalerCaches: caches, + scalerCachesLock: &sync.RWMutex{}, + scaledObjectsMetricCache: metricscache.NewMetricsCache(), + } + fmt.Printf("index: %d", index) + isActive, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJob) + // assert.Equal(t, 5, index) + assert.Equal(t, scalerTestData.ResultIsActive, isActive) + assert.Equal(t, scalerTestData.ResultQueueLength, queueLength) + assert.Equal(t, scalerTestData.ResultMaxValue, maxValue) + //cache.Close(context.Background()) + } } func TestIsScaledJobActiveIfQueueEmptyButMinReplicaCountGreaterZero(t *testing.T) { From b9818da4f71a8432ba887c78e5d63c6b4721934c Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Sun, 18 Jun 2023 15:17:05 +0900 Subject: [PATCH 04/21] Make all test cases runnable under scale_hanlder_test Signed-off-by: Yoon Park --- apis/keda/v1alpha1/withtriggers_types.go | 2 +- pkg/scaling/scale_handler.go | 146 ++++++++++++----------- pkg/scaling/scale_handler_test.go | 47 +++----- 3 files changed, 93 insertions(+), 102 deletions(-) diff --git a/apis/keda/v1alpha1/withtriggers_types.go b/apis/keda/v1alpha1/withtriggers_types.go index 5d280499498..5df47ff2b08 100644 --- a/apis/keda/v1alpha1/withtriggers_types.go +++ b/apis/keda/v1alpha1/withtriggers_types.go @@ -94,7 +94,7 @@ func (t *WithTriggers) GenerateIdentifier() string { return GenerateIdentifier(t.InternalKind, t.Namespace, t.Name) } -// AsDuckWithTriggers tries to generates WithTriggers object for input object +// AsDuckWithTriggers tries to generate WithTriggers object for input object // returns error if input object is unknown func AsDuckWithTriggers(scalableObject interface{}) (*WithTriggers, error) { switch obj := scalableObject.(type) { diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 1f12c26bc2a..a1e0eadf180 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -406,7 +406,7 @@ func (h *scaleHandler) 
ClearScalersCache(ctx context.Context, scalableObject int /// ---------- ScaledObject related methods --------- /// /// --------------------------------------------------------------------------- /// -// GetScaledObjectMetrics returns metrics for specified metric name for a ScaledObject identified by it's name and namespace. +// GetScaledObjectMetrics returns metrics for specified metric name for a ScaledObject identified by its name and namespace. // It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler. func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricName string) (*external_metrics.ExternalMetricValueList, error) { logger := log.WithValues("scaledObject.Namespace", scaledObjectNamespace, "scaledObject.Name", scaledObjectName) @@ -516,9 +516,9 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN // getScaledObjectState returns whether the input ScaledObject: // is active as the first return value, -// the second return value indicates whether there was any error during quering scalers, -// the third return value is a map of metrics record - a metric value for each scaler and it's metric -// the fourth return value contains error if is not able access scalers cache +// the second return value indicates whether there was any error during querying scalers, +// the third return value is a map of metrics record - a metric value for each scaler and its metric +// the fourth return value contains error if is not able to access scalers cache func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) (bool, bool, map[string]metricscache.MetricsRecord, error) { logger := log.WithValues("scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name) @@ -623,70 +623,19 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k return isScaledObjectActive, isScalerError, metricsRecord, nil } -func (h *scaleHandler) isScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) { - var queueLength float64 - var maxValue float64 - isActive := false - - logger := logf.Log.WithName("scalemetrics") - scalersMetrics := h.getScaledJobMetrics(ctx, scaledJob) - switch scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation { - case "min": - for _, metrics := range scalersMetrics { - if (queueLength == 0 || metrics.queueLength < queueLength) && metrics.isActive { - queueLength = metrics.queueLength - maxValue = metrics.maxValue - isActive = metrics.isActive - } - } - case "avg": - queueLengthSum := float64(0) - maxValueSum := float64(0) - length := 0 - for _, metrics := range scalersMetrics { - if metrics.isActive { - queueLengthSum += metrics.queueLength - maxValueSum += metrics.maxValue - isActive = metrics.isActive - length++ - } - } - if length != 0 { - queueLength = queueLengthSum / float64(length) - maxValue = maxValueSum / float64(length) - } - case "sum": - for _, metrics := range scalersMetrics { - if metrics.isActive { - queueLength += metrics.queueLength - maxValue += metrics.maxValue - isActive = metrics.isActive - } - } - default: // max - for _, metrics := range scalersMetrics { - if metrics.queueLength > queueLength && metrics.isActive { - queueLength = metrics.queueLength - maxValue = metrics.maxValue - isActive = metrics.isActive - } - } - } - - if scaledJob.MinReplicaCount() > 0 { - isActive = true - } - - 
maxValue = min(float64(scaledJob.MaxReplicaCount()), maxValue) - logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation) - - return isActive, ceilToInt64(queueLength), ceilToInt64(maxValue) +type scalerMetrics struct { + queueLength float64 + maxValue float64 + isActive bool } +// / --------------------------------------------------------------------------- /// +// / ---------- ScaledJob related methods --------- /// +// / --------------------------------------------------------------------------- /// +// getScaledJobMetrics returns metrics for specified metric name for a ScaledJob identified by its name and namespace. +// It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler. func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scalerMetrics { // TODO this loop should be probably done similar way the ScaledObject loop is done - - // yoon GetScalersCache !!! This is all !!! cache, err := h.GetScalersCache(ctx, scaledJob) if err != nil { log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name) @@ -747,6 +696,69 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav return scalersMetrics } +// isScaledJobActive returns whether the input ScaledJob: +// is active as the first return value, +// the second and the third return values indicate queueLength and maxValue for scale +func (h *scaleHandler) isScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) { + var queueLength float64 + var maxValue float64 + isActive := false + + logger := logf.Log.WithName("scalemetrics") + scalersMetrics := h.getScaledJobMetrics(ctx, scaledJob) + switch scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation { + case "min": + for _, metrics := range scalersMetrics { + if (queueLength == 0 || metrics.queueLength < queueLength) && metrics.isActive { + queueLength = metrics.queueLength + maxValue = metrics.maxValue + isActive = metrics.isActive + } + } + case "avg": + queueLengthSum := float64(0) + maxValueSum := float64(0) + length := 0 + for _, metrics := range scalersMetrics { + if metrics.isActive { + queueLengthSum += metrics.queueLength + maxValueSum += metrics.maxValue + isActive = metrics.isActive + length++ + } + } + if length != 0 { + queueLength = queueLengthSum / float64(length) + maxValue = maxValueSum / float64(length) + } + case "sum": + for _, metrics := range scalersMetrics { + if metrics.isActive { + queueLength += metrics.queueLength + maxValue += metrics.maxValue + isActive = metrics.isActive + } + } + default: // max + for _, metrics := range scalersMetrics { + if metrics.queueLength > queueLength && metrics.isActive { + queueLength = metrics.queueLength + maxValue = metrics.maxValue + isActive = metrics.isActive + } + } + } + + if scaledJob.MinReplicaCount() > 0 { + isActive = true + } + + maxValue = min(float64(scaledJob.MaxReplicaCount()), maxValue) + logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation) + + return isActive, ceilToInt64(queueLength), ceilToInt64(maxValue) +} + func 
getTargetAverageValue(metricSpecs []v2.MetricSpec) float64 { var targetAverageValue float64 var metricValue float64 @@ -777,9 +789,3 @@ func min(x, y float64) float64 { } return x } - -type scalerMetrics struct { - queueLength float64 - maxValue float64 - isActive bool -} diff --git a/pkg/scaling/scale_handler_test.go b/pkg/scaling/scale_handler_test.go index d733b62d5ab..58bc053377c 100644 --- a/pkg/scaling/scale_handler_test.go +++ b/pkg/scaling/scale_handler_test.go @@ -412,15 +412,13 @@ func TestIsScaledJobActive(t *testing.T) { // Keep the current behavior // Assme 1 trigger only scaledJobSingle := createScaledJob(1, 100, "") // testing default = max - scalers2 := []cache.ScalerBuilder{{ - Scaler: createScaler(ctrl, int64(20), int64(2), true, metricName), - Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { - return createScaler(ctrl, int64(20), int64(2), true, metricName), &scalers.ScalerConfig{}, nil - }, - }} - scalerCache := cache.ScalersCache{ - Scalers: scalers2, + Scalers: []cache.ScalerBuilder{{ + Scaler: createScaler(ctrl, int64(20), int64(2), true, metricName), + Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { + return createScaler(ctrl, int64(20), int64(2), true, metricName), &scalers.ScalerConfig{}, nil + }, + }}, Recorder: recorder, } @@ -428,9 +426,7 @@ func TestIsScaledJobActive(t *testing.T) { caches[scaledJobSingle.GenerateIdentifier()] = &scalerCache sh := scaleHandler{ - //client: mockClient, - scaleLoopContexts: &sync.Map{}, - //scaleExecutor: mockExecutor, + scaleLoopContexts: &sync.Map{}, globalHTTPTimeout: time.Duration(1000), recorder: recorder, scalerCaches: caches, @@ -441,8 +437,7 @@ func TestIsScaledJobActive(t *testing.T) { assert.Equal(t, true, isActive) assert.Equal(t, int64(20), queueLength) assert.Equal(t, int64(10), maxValue) - //yoon - //cache.Close(context.Background()) + scalerCache.Close(context.Background()) // Test the valiation scalerTestDatam := []scalerTestData{ @@ -477,22 +472,16 @@ func TestIsScaledJobActive(t *testing.T) { }, }} - //cache = ScalersCache{ - // Scalers: scalersToTest, - // Recorder: recorder, - //} scalerCache = cache.ScalersCache{ Scalers: scalersToTest, Recorder: recorder, } - caches := map[string]*cache.ScalersCache{} + caches = map[string]*cache.ScalersCache{} caches[scaledJobSingle.GenerateIdentifier()] = &scalerCache - sh := scaleHandler{ - //client: mockClient, - scaleLoopContexts: &sync.Map{}, - //scaleExecutor: mockExecutor, + sh = scaleHandler{ + scaleLoopContexts: &sync.Map{}, globalHTTPTimeout: time.Duration(1000), recorder: recorder, scalerCaches: caches, @@ -500,12 +489,12 @@ func TestIsScaledJobActive(t *testing.T) { scaledObjectsMetricCache: metricscache.NewMetricsCache(), } fmt.Printf("index: %d", index) - isActive, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJob) + isActive, queueLength, maxValue = sh.isScaledJobActive(context.TODO(), scaledJob) // assert.Equal(t, 5, index) assert.Equal(t, scalerTestData.ResultIsActive, isActive) assert.Equal(t, scalerTestData.ResultQueueLength, queueLength) assert.Equal(t, scalerTestData.ResultMaxValue, maxValue) - //cache.Close(context.Background()) + scalerCache.Close(context.Background()) } } @@ -524,7 +513,6 @@ func TestIsScaledJobActiveIfQueueEmptyButMinReplicaCountGreaterZero(t *testing.T }} scalerCache := cache.ScalersCache{ - //Scalers: scalers, Scalers: scalerSingle, Recorder: recorder, } @@ -533,9 +521,7 @@ func TestIsScaledJobActiveIfQueueEmptyButMinReplicaCountGreaterZero(t *testing.T 
caches[scaledJobSingle.GenerateIdentifier()] = &scalerCache sh := scaleHandler{ - //client: mockClient, - scaleLoopContexts: &sync.Map{}, - //scaleExecutor: mockExecutor, + scaleLoopContexts: &sync.Map{}, globalHTTPTimeout: time.Duration(1000), recorder: recorder, scalerCaches: caches, @@ -547,7 +533,7 @@ func TestIsScaledJobActiveIfQueueEmptyButMinReplicaCountGreaterZero(t *testing.T assert.Equal(t, true, isActive) assert.Equal(t, int64(0), queueLength) assert.Equal(t, int64(0), maxValue) - //cache.Close(context.Background()) + scalerCache.Close(context.Background()) } func newScalerTestData( @@ -671,7 +657,6 @@ func createScaler(ctrl *gomock.Controller, queueLength int64, averageValue int64 } scaler.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs) scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Any()).Return(metrics, isActive, nil) - // yoon - //scaler.EXPECT().Close(gomock.Any()) + scaler.EXPECT().Close(gomock.Any()) return scaler } From 2d3ed5d1dfd30a48a258a962b939c041f55423ff Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Sun, 18 Jun 2023 15:19:25 +0900 Subject: [PATCH 05/21] Delete unnecessary lines Signed-off-by: Yoon Park --- pkg/scaling/scale_handler.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index a1e0eadf180..fe1dbced57f 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -248,12 +248,6 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac h.scaledObjectsMetricCache.StoreRecords(obj.GenerateIdentifier(), metricsRecords) } case *kedav1alpha1.ScaledJob: - //cache, err := h.GetScalersCache(ctx, scalableObject) - //if err != nil { - // log.Error(err, "error getting scalers cache", "scaledJob.Namespace", obj.Namespace, "scaledJob.Name", obj.Name) - // return - //} - err := h.client.Get(ctx, types.NamespacedName{Name: obj.Name, Namespace: obj.Namespace}, obj) if err != nil { log.Error(err, "error getting scaledJob", "scaledJob.Namespace", obj.Namespace, "scaledJob.Name", obj.Name) @@ -632,6 +626,7 @@ type scalerMetrics struct { // / --------------------------------------------------------------------------- /// // / ---------- ScaledJob related methods --------- /// // / --------------------------------------------------------------------------- /// + // getScaledJobMetrics returns metrics for specified metric name for a ScaledJob identified by its name and namespace. // It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler. 
func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scalerMetrics { From 145a357c90922078e61817ff64ea08a3634b4b4f Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Sun, 18 Jun 2023 15:22:17 +0900 Subject: [PATCH 06/21] Delete scalers_cache_tests.go after migration Signed-off-by: Yoon Park --- pkg/scaling/cache/scalers_cache_test.go | 280 ------------------------ 1 file changed, 280 deletions(-) delete mode 100644 pkg/scaling/cache/scalers_cache_test.go diff --git a/pkg/scaling/cache/scalers_cache_test.go b/pkg/scaling/cache/scalers_cache_test.go deleted file mode 100644 index a8559288d7f..00000000000 --- a/pkg/scaling/cache/scalers_cache_test.go +++ /dev/null @@ -1,280 +0,0 @@ -package cache - -import ( - "context" - "fmt" - "testing" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" - v2 "k8s.io/api/autoscaling/v2" - "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/client-go/tools/record" - "k8s.io/metrics/pkg/apis/external_metrics" - - kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" - mock_scalers "github.com/kedacore/keda/v2/pkg/mock/mock_scaler" - "github.com/kedacore/keda/v2/pkg/scalers" -) - -func TestTargetAverageValue(t *testing.T) { - // count = 0 - specs := []v2.MetricSpec{} - metricName := "s0-messageCount" - targetAverageValue := getTargetAverageValue(specs) - assert.Equal(t, float64(0), targetAverageValue) - // 1 1 - specs = []v2.MetricSpec{ - createMetricSpec(1, metricName), - createMetricSpec(1, metricName), - } - targetAverageValue = getTargetAverageValue(specs) - assert.Equal(t, float64(1), targetAverageValue) - // 5 5 3 -> 4.333333333333333 - specs = []v2.MetricSpec{ - createMetricSpec(5, metricName), - createMetricSpec(5, metricName), - createMetricSpec(3, metricName), - } - targetAverageValue = getTargetAverageValue(specs) - assert.Equal(t, 4.333333333333333, targetAverageValue) - - // 5 5 4 -> 4.666666666666667 - specs = []v2.MetricSpec{ - createMetricSpec(5, metricName), - createMetricSpec(5, metricName), - createMetricSpec(4, metricName), - } - targetAverageValue = getTargetAverageValue(specs) - assert.Equal(t, 4.666666666666667, targetAverageValue) -} - -func createMetricSpec(averageValue int64, metricName string) v2.MetricSpec { - qty := resource.NewQuantity(averageValue, resource.DecimalSI) - return v2.MetricSpec{ - External: &v2.ExternalMetricSource{ - Target: v2.MetricTarget{ - AverageValue: qty, - }, - Metric: v2.MetricIdentifier{ - Name: metricName, - }, - }, - } -} - -func TestIsScaledJobActive(t *testing.T) { - metricName := "s0-queueLength" - ctrl := gomock.NewController(t) - recorder := record.NewFakeRecorder(1) - // Keep the current behavior - // Assme 1 trigger only - scaledJobSingle := createScaledJob(0, 100, "") // testing default = max - scalerSingle := []ScalerBuilder{{ - Scaler: createScaler(ctrl, int64(20), int64(2), true, metricName), - Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { - return createScaler(ctrl, int64(20), int64(2), true, metricName), &scalers.ScalerConfig{}, nil - }, - }} - - cache := ScalersCache{ - Scalers: scalerSingle, - Recorder: recorder, - } - - isActive, queueLength, maxValue := cache.IsScaledJobActive(context.TODO(), scaledJobSingle) - assert.Equal(t, true, isActive) - assert.Equal(t, int64(20), queueLength) - assert.Equal(t, int64(10), maxValue) - cache.Close(context.Background()) - - // Non-Active trigger only - scalerSingle = []ScalerBuilder{{ - Scaler: createScaler(ctrl, int64(0), int64(2), false, metricName), - 
Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { - return createScaler(ctrl, int64(0), int64(2), false, metricName), &scalers.ScalerConfig{}, nil - }, - }} - - cache = ScalersCache{ - Scalers: scalerSingle, - Recorder: recorder, - } - - isActive, queueLength, maxValue = cache.IsScaledJobActive(context.TODO(), scaledJobSingle) - assert.Equal(t, false, isActive) - assert.Equal(t, int64(0), queueLength) - assert.Equal(t, int64(0), maxValue) - cache.Close(context.Background()) - - // Test the valiation - scalerTestDatam := []scalerTestData{ - newScalerTestData("s0-queueLength", 100, "max", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 20, 20), - newScalerTestData("queueLength", 100, "min", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 5, 2), - newScalerTestData("messageCount", 100, "avg", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 12, 9), - newScalerTestData("s3-messageCount", 100, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 27), - newScalerTestData("s10-messageCount", 25, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 25), - } - - for index, scalerTestData := range scalerTestDatam { - scaledJob := createScaledJob(scalerTestData.MinReplicaCount, scalerTestData.MaxReplicaCount, scalerTestData.MultipleScalersCalculation) - scalersToTest := []ScalerBuilder{{ - Scaler: createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive, scalerTestData.MetricName), - Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { - return createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil - }, - }, { - Scaler: createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive, scalerTestData.MetricName), - Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { - return createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil - }, - }, { - Scaler: createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive, scalerTestData.MetricName), - Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { - return createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil - }, - }, { - Scaler: createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive, scalerTestData.MetricName), - Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { - return createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil - }, - }} - - cache = ScalersCache{ - Scalers: scalersToTest, - Recorder: recorder, - } - fmt.Printf("index: %d", index) - isActive, queueLength, maxValue = cache.IsScaledJobActive(context.TODO(), scaledJob) - // assert.Equal(t, 5, index) - assert.Equal(t, scalerTestData.ResultIsActive, isActive) - assert.Equal(t, scalerTestData.ResultQueueLength, queueLength) - assert.Equal(t, scalerTestData.ResultMaxValue, maxValue) - cache.Close(context.Background()) - } -} - -func 
TestIsScaledJobActiveIfQueueEmptyButMinReplicaCountGreaterZero(t *testing.T) { - metricName := "s0-queueLength" - ctrl := gomock.NewController(t) - recorder := record.NewFakeRecorder(1) - // Keep the current behavior - // Assme 1 trigger only - scaledJobSingle := createScaledJob(1, 100, "") // testing default = max - scalerSingle := []ScalerBuilder{{ - Scaler: createScaler(ctrl, int64(0), int64(1), true, metricName), - Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { - return createScaler(ctrl, int64(0), int64(1), true, metricName), &scalers.ScalerConfig{}, nil - }, - }} - - cache := ScalersCache{ - Scalers: scalerSingle, - Recorder: recorder, - } - - isActive, queueLength, maxValue := cache.IsScaledJobActive(context.TODO(), scaledJobSingle) - assert.Equal(t, true, isActive) - assert.Equal(t, int64(0), queueLength) - assert.Equal(t, int64(0), maxValue) - cache.Close(context.Background()) -} - -func newScalerTestData( - metricName string, - maxReplicaCount int, - multipleScalersCalculation string, - scaler1QueueLength, //nolint:golint,unparam - scaler1AverageValue int, //nolint:golint,unparam - scaler1IsActive bool, //nolint:golint,unparam - scaler2QueueLength, //nolint:golint,unparam - scaler2AverageValue int, //nolint:golint,unparam - scaler2IsActive bool, //nolint:golint,unparam - scaler3QueueLength, //nolint:golint,unparam - scaler3AverageValue int, //nolint:golint,unparam - scaler3IsActive bool, //nolint:golint,unparam - scaler4QueueLength, //nolint:golint,unparam - scaler4AverageValue int, //nolint:golint,unparam - scaler4IsActive bool, //nolint:golint,unparam - resultIsActive bool, //nolint:golint,unparam - resultQueueLength, - resultMaxLength int) scalerTestData { - return scalerTestData{ - MetricName: metricName, - MaxReplicaCount: int32(maxReplicaCount), - MultipleScalersCalculation: multipleScalersCalculation, - Scaler1QueueLength: int64(scaler1QueueLength), - Scaler1AverageValue: int64(scaler1AverageValue), - Scaler1IsActive: scaler1IsActive, - Scaler2QueueLength: int64(scaler2QueueLength), - Scaler2AverageValue: int64(scaler2AverageValue), - Scaler2IsActive: scaler2IsActive, - Scaler3QueueLength: int64(scaler3QueueLength), - Scaler3AverageValue: int64(scaler3AverageValue), - Scaler3IsActive: scaler3IsActive, - Scaler4QueueLength: int64(scaler4QueueLength), - Scaler4AverageValue: int64(scaler4AverageValue), - Scaler4IsActive: scaler4IsActive, - ResultIsActive: resultIsActive, - ResultQueueLength: int64(resultQueueLength), - ResultMaxValue: int64(resultMaxLength), - } -} - -type scalerTestData struct { - MetricName string - MaxReplicaCount int32 - MultipleScalersCalculation string - Scaler1QueueLength int64 - Scaler1AverageValue int64 - Scaler1IsActive bool - Scaler2QueueLength int64 - Scaler2AverageValue int64 - Scaler2IsActive bool - Scaler3QueueLength int64 - Scaler3AverageValue int64 - Scaler3IsActive bool - Scaler4QueueLength int64 - Scaler4AverageValue int64 - Scaler4IsActive bool - ResultIsActive bool - ResultQueueLength int64 - ResultMaxValue int64 - MinReplicaCount int32 -} - -func createScaledJob(minReplicaCount int32, maxReplicaCount int32, multipleScalersCalculation string) *kedav1alpha1.ScaledJob { - if multipleScalersCalculation != "" { - return &kedav1alpha1.ScaledJob{ - Spec: kedav1alpha1.ScaledJobSpec{ - MinReplicaCount: &minReplicaCount, - MaxReplicaCount: &maxReplicaCount, - ScalingStrategy: kedav1alpha1.ScalingStrategy{ - MultipleScalersCalculation: multipleScalersCalculation, - }, - }, - } - } - return &kedav1alpha1.ScaledJob{ - Spec: 
kedav1alpha1.ScaledJobSpec{ - MinReplicaCount: &minReplicaCount, - MaxReplicaCount: &maxReplicaCount, - }, - } -} - -func createScaler(ctrl *gomock.Controller, queueLength int64, averageValue int64, isActive bool, metricName string) *mock_scalers.MockScaler { - scaler := mock_scalers.NewMockScaler(ctrl) - metricsSpecs := []v2.MetricSpec{createMetricSpec(averageValue, metricName)} - - metrics := []external_metrics.ExternalMetricValue{ - { - MetricName: metricName, - Value: *resource.NewQuantity(queueLength, resource.DecimalSI), - }, - } - scaler.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs) - scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Any()).Return(metrics, isActive, nil) - scaler.EXPECT().Close(gomock.Any()) - return scaler -} From b9083136b63ec9517e7e191ffec37b4ed8f48b40 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Sun, 18 Jun 2023 15:27:45 +0900 Subject: [PATCH 07/21] Remove methods at scalers_cache.go as well Signed-off-by: Yoon Park --- pkg/scaling/cache/scalers_cache.go | 158 ----------------------------- 1 file changed, 158 deletions(-) diff --git a/pkg/scaling/cache/scalers_cache.go b/pkg/scaling/cache/scalers_cache.go index c34d457dbaa..da3a0d05100 100644 --- a/pkg/scaling/cache/scalers_cache.go +++ b/pkg/scaling/cache/scalers_cache.go @@ -19,17 +19,14 @@ package cache import ( "context" "fmt" - "math" "time" v2 "k8s.io/api/autoscaling/v2" - corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/record" "k8s.io/metrics/pkg/apis/external_metrics" logf "sigs.k8s.io/controller-runtime/pkg/log" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" - "github.com/kedacore/keda/v2/pkg/eventreason" "github.com/kedacore/keda/v2/pkg/scalers" ) @@ -141,68 +138,6 @@ func (c *ScalersCache) GetMetricsAndActivityForScaler(ctx context.Context, index return metric, activity, time.Since(startTime).Milliseconds(), err } -// TODO needs refactor - move ScaledJob related methods to scale_handler, the similar way ScaledObject methods are -// refactor logic -func (c *ScalersCache) IsScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) { - var queueLength float64 - var maxValue float64 - isActive := false - - logger := logf.Log.WithName("scalemetrics") - scalersMetrics := c.getScaledJobMetrics(ctx, scaledJob) - switch scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation { - case "min": - for _, metrics := range scalersMetrics { - if (queueLength == 0 || metrics.queueLength < queueLength) && metrics.isActive { - queueLength = metrics.queueLength - maxValue = metrics.maxValue - isActive = metrics.isActive - } - } - case "avg": - queueLengthSum := float64(0) - maxValueSum := float64(0) - length := 0 - for _, metrics := range scalersMetrics { - if metrics.isActive { - queueLengthSum += metrics.queueLength - maxValueSum += metrics.maxValue - isActive = metrics.isActive - length++ - } - } - if length != 0 { - queueLength = queueLengthSum / float64(length) - maxValue = maxValueSum / float64(length) - } - case "sum": - for _, metrics := range scalersMetrics { - if metrics.isActive { - queueLength += metrics.queueLength - maxValue += metrics.maxValue - isActive = metrics.isActive - } - } - default: // max - for _, metrics := range scalersMetrics { - if metrics.queueLength > queueLength && metrics.isActive { - queueLength = metrics.queueLength - maxValue = metrics.maxValue - isActive = metrics.isActive - } - } - } - - if scaledJob.MinReplicaCount() > 0 { - isActive = true - } - - maxValue = 
min(float64(scaledJob.MaxReplicaCount()), maxValue) - logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation) - - return isActive, ceilToInt64(queueLength), ceilToInt64(maxValue) -} - func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scaler, error) { if id < 0 || id >= len(c.Scalers) { return nil, fmt.Errorf("scaler with id %d not found, len = %d, cache has been probably already invalidated", id, len(c.Scalers)) @@ -226,96 +161,3 @@ func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scale return ns, nil } - -type scalerMetrics struct { - queueLength float64 - maxValue float64 - isActive bool -} - -func (c *ScalersCache) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scalerMetrics { - // TODO this loop should be probably done similar way the ScaledObject loop is done - var scalersMetrics []scalerMetrics - for i, s := range c.Scalers { - var queueLength float64 - var targetAverageValue float64 - isActive := false - maxValue := float64(0) - scalerType := fmt.Sprintf("%T:", s) - - scalerLogger := log.WithValues("ScaledJob", scaledJob.Name, "Scaler", scalerType) - - metricSpecs := s.Scaler.GetMetricSpecForScaling(ctx) - - // skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata) - // or skip cpu/memory resource scaler - if len(metricSpecs) < 1 || metricSpecs[0].External == nil { - continue - } - - // TODO here we should probably loop through all metrics in a Scaler - metrics, isTriggerActive, _, err := c.GetMetricsAndActivityForScaler(ctx, i, metricSpecs[0].External.Metric.Name) - if err != nil { - scalerLogger.V(1).Info("Error getting scaler metrics and activity, but continue", "error", err) - c.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) - continue - } - - targetAverageValue = getTargetAverageValue(metricSpecs) - - var metricValue float64 - for _, m := range metrics { - if m.MetricName == metricSpecs[0].External.Metric.Name { - metricValue = m.Value.AsApproximateFloat64() - queueLength += metricValue - } - } - scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, metricSpecs[0].External.Metric.Name, queueLength, "targetAverageValue", targetAverageValue) - - if isTriggerActive { - isActive = true - } - - if targetAverageValue != 0 { - averageLength := queueLength / targetAverageValue - maxValue = min(float64(scaledJob.MaxReplicaCount()), averageLength) - } - scalersMetrics = append(scalersMetrics, scalerMetrics{ - queueLength: queueLength, - maxValue: maxValue, - isActive: isActive, - }) - } - return scalersMetrics -} - -func getTargetAverageValue(metricSpecs []v2.MetricSpec) float64 { - var targetAverageValue float64 - var metricValue float64 - for _, metric := range metricSpecs { - if metric.External.Target.AverageValue == nil { - metricValue = 0 - } else { - metricValue = metric.External.Target.AverageValue.AsApproximateFloat64() - } - - targetAverageValue += metricValue - } - count := float64(len(metricSpecs)) - if count != 0 { - return targetAverageValue / count - } - return 0 -} - -func ceilToInt64(x float64) int64 { - return int64(math.Ceil(x)) -} - -// Min function for float64 -func min(x, y float64) float64 { - if x > y { - return y - } - return x -} From 764b4d803bd3c33bf6379f3411df8fe093903d20 Mon Sep 17 
00:00:00 2001 From: Yoon Park Date: Sun, 18 Jun 2023 22:27:18 +0900 Subject: [PATCH 08/21] Change receiver name Signed-off-by: Yoon Park --- apis/keda/v1alpha1/scaledjob_types.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apis/keda/v1alpha1/scaledjob_types.go b/apis/keda/v1alpha1/scaledjob_types.go index 1d68bfe0b09..366f22a14cd 100644 --- a/apis/keda/v1alpha1/scaledjob_types.go +++ b/apis/keda/v1alpha1/scaledjob_types.go @@ -143,6 +143,6 @@ func (s ScaledJob) MinReplicaCount() int64 { return defaultScaledJobMinReplicaCount } -func (so *ScaledJob) GenerateIdentifier() string { - return GenerateIdentifier("ScaledJob", so.Namespace, so.Name) +func (s *ScaledJob) GenerateIdentifier() string { + return GenerateIdentifier("ScaledJob", s.Namespace, s.Name) } From 7480fcdf44ef1e6fc26382763e788796647df395 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Sun, 18 Jun 2023 22:57:41 +0900 Subject: [PATCH 09/21] Sort module to pass statick analysis Signed-off-by: Yoon Park --- pkg/scaling/scale_handler.go | 2 +- pkg/scaling/scale_handler_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index fe1dbced57f..02772c2f6a9 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -19,12 +19,12 @@ package scaling import ( "context" "fmt" - v2 "k8s.io/api/autoscaling/v2" "math" "strings" "sync" "time" + v2 "k8s.io/api/autoscaling/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" diff --git a/pkg/scaling/scale_handler_test.go b/pkg/scaling/scale_handler_test.go index 58bc053377c..5e59358548e 100644 --- a/pkg/scaling/scale_handler_test.go +++ b/pkg/scaling/scale_handler_test.go @@ -20,8 +20,6 @@ import ( "context" "errors" "fmt" - batchv1 "k8s.io/api/batch/v1" - v1 "k8s.io/api/core/v1" "sync" "testing" "time" @@ -29,6 +27,8 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" v2 "k8s.io/api/autoscaling/v2" + batchv1 "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" From 62ec041cda077084bb20b679142a2a883eb667d4 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Tue, 20 Jun 2023 18:41:51 +0900 Subject: [PATCH 10/21] Use separate package for the method getTargetAverageValue Signed-off-by: Yoon Park --- pkg/scaling/scale_handler.go | 23 ++--------------------- pkg/scaling/scale_handler_test.go | 9 +++++---- pkg/scaling/utils/metrics.go | 22 ++++++++++++++++++++++ 3 files changed, 29 insertions(+), 25 deletions(-) create mode 100644 pkg/scaling/utils/metrics.go diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 02772c2f6a9..d77a9f5c53d 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -24,7 +24,6 @@ import ( "sync" "time" - v2 "k8s.io/api/autoscaling/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -44,6 +43,7 @@ import ( "github.com/kedacore/keda/v2/pkg/scaling/cache/metricscache" "github.com/kedacore/keda/v2/pkg/scaling/executor" "github.com/kedacore/keda/v2/pkg/scaling/resolver" + "github.com/kedacore/keda/v2/pkg/scaling/utils" ) var log = logf.Log.WithName("scale_handler") @@ -663,7 +663,7 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav continue } - targetAverageValue = getTargetAverageValue(metricSpecs) + targetAverageValue = 
utils.GetTargetAverageValue(metricSpecs) var metricValue float64 for _, m := range metrics { @@ -754,25 +754,6 @@ func (h *scaleHandler) isScaledJobActive(ctx context.Context, scaledJob *kedav1a return isActive, ceilToInt64(queueLength), ceilToInt64(maxValue) } -func getTargetAverageValue(metricSpecs []v2.MetricSpec) float64 { - var targetAverageValue float64 - var metricValue float64 - for _, metric := range metricSpecs { - if metric.External.Target.AverageValue == nil { - metricValue = 0 - } else { - metricValue = metric.External.Target.AverageValue.AsApproximateFloat64() - } - - targetAverageValue += metricValue - } - count := float64(len(metricSpecs)) - if count != 0 { - return targetAverageValue / count - } - return 0 -} - func ceilToInt64(x float64) int64 { return int64(math.Ceil(x)) } diff --git a/pkg/scaling/scale_handler_test.go b/pkg/scaling/scale_handler_test.go index 5e59358548e..f949de8b6d1 100644 --- a/pkg/scaling/scale_handler_test.go +++ b/pkg/scaling/scale_handler_test.go @@ -41,6 +41,7 @@ import ( "github.com/kedacore/keda/v2/pkg/scalers" "github.com/kedacore/keda/v2/pkg/scaling/cache" "github.com/kedacore/keda/v2/pkg/scaling/cache/metricscache" + "github.com/kedacore/keda/v2/pkg/scaling/utils" ) func TestGetScaledObjectMetrics_DirectCall(t *testing.T) { @@ -364,14 +365,14 @@ func TestTargetAverageValue(t *testing.T) { // count = 0 specs := []v2.MetricSpec{} metricName := "s0-messageCount" - targetAverageValue := getTargetAverageValue(specs) + targetAverageValue := utils.GetTargetAverageValue(specs) assert.Equal(t, float64(0), targetAverageValue) // 1 1 specs = []v2.MetricSpec{ createMetricSpec(1, metricName), createMetricSpec(1, metricName), } - targetAverageValue = getTargetAverageValue(specs) + targetAverageValue = utils.GetTargetAverageValue(specs) assert.Equal(t, float64(1), targetAverageValue) // 5 5 3 -> 4.333333333333333 specs = []v2.MetricSpec{ @@ -379,7 +380,7 @@ func TestTargetAverageValue(t *testing.T) { createMetricSpec(5, metricName), createMetricSpec(3, metricName), } - targetAverageValue = getTargetAverageValue(specs) + targetAverageValue = utils.GetTargetAverageValue(specs) assert.Equal(t, 4.333333333333333, targetAverageValue) // 5 5 4 -> 4.666666666666667 @@ -388,7 +389,7 @@ func TestTargetAverageValue(t *testing.T) { createMetricSpec(5, metricName), createMetricSpec(4, metricName), } - targetAverageValue = getTargetAverageValue(specs) + targetAverageValue = utils.GetTargetAverageValue(specs) assert.Equal(t, 4.666666666666667, targetAverageValue) } func createMetricSpec(averageValue int64, metricName string) v2.MetricSpec { diff --git a/pkg/scaling/utils/metrics.go b/pkg/scaling/utils/metrics.go new file mode 100644 index 00000000000..b848487fe42 --- /dev/null +++ b/pkg/scaling/utils/metrics.go @@ -0,0 +1,22 @@ +package utils + +import v2 "k8s.io/api/autoscaling/v2" + +func GetTargetAverageValue(metricSpecs []v2.MetricSpec) float64 { + var totalAverageValue float64 + var metricValue float64 + for _, metric := range metricSpecs { + if metric.External.Target.AverageValue == nil { + metricValue = 0 + } else { + metricValue = metric.External.Target.AverageValue.AsApproximateFloat64() + } + + totalAverageValue += metricValue + } + count := float64(len(metricSpecs)) + if count != 0 { + return totalAverageValue / count + } + return 0 +} From c44fcfbf07459120fc7ec1aa2dcf4504a67fdb02 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Tue, 20 Jun 2023 19:14:48 +0900 Subject: [PATCH 11/21] Relocate createMetricSpec and related test cases Signed-off-by: Yoon Park --- 
pkg/scaling/scale_handler_test.go | 55 +++---------------------------- pkg/scaling/utils/metrics.go | 19 ++++++++++- pkg/scaling/utils/metrics_test.go | 39 ++++++++++++++++++++++ 3 files changed, 62 insertions(+), 51 deletions(-) create mode 100644 pkg/scaling/utils/metrics_test.go diff --git a/pkg/scaling/scale_handler_test.go b/pkg/scaling/scale_handler_test.go index f949de8b6d1..80f0c8c3b59 100644 --- a/pkg/scaling/scale_handler_test.go +++ b/pkg/scaling/scale_handler_test.go @@ -56,7 +56,7 @@ func TestGetScaledObjectMetrics_DirectCall(t *testing.T) { mockExecutor := mock_executor.NewMockScaleExecutor(ctrl) mockStatusWriter := mock_client.NewMockStatusWriter(ctrl) - metricsSpecs := []v2.MetricSpec{createMetricSpec(10, metricName)} + metricsSpecs := []v2.MetricSpec{utils.CreateMetricSpec(10, metricName)} metricValue := scalers.GenerateMetricInMili(metricName, float64(10)) metricsRecord := map[string]metricscache.MetricsRecord{} @@ -147,7 +147,7 @@ func TestGetScaledObjectMetrics_FromCache(t *testing.T) { mockExecutor := mock_executor.NewMockScaleExecutor(ctrl) mockStatusWriter := mock_client.NewMockStatusWriter(ctrl) - metricsSpecs := []v2.MetricSpec{createMetricSpec(10, metricName)} + metricsSpecs := []v2.MetricSpec{utils.CreateMetricSpec(10, metricName)} metricValue := scalers.GenerateMetricInMili(metricName, float64(10)) metricsRecord := map[string]metricscache.MetricsRecord{} @@ -231,7 +231,7 @@ func TestCheckScaledObjectScalersWithError(t *testing.T) { mockExecutor := mock_executor.NewMockScaleExecutor(ctrl) recorder := record.NewFakeRecorder(1) - metricsSpecs := []v2.MetricSpec{createMetricSpec(1, "metric-name")} + metricsSpecs := []v2.MetricSpec{utils.CreateMetricSpec(1, "metric-name")} scaler := mock_scalers.NewMockScaler(ctrl) scaler.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs) @@ -292,7 +292,7 @@ func TestCheckScaledObjectFindFirstActiveNotIgnoreOthers(t *testing.T) { mockExecutor := mock_executor.NewMockScaleExecutor(ctrl) recorder := record.NewFakeRecorder(1) - metricsSpecs := []v2.MetricSpec{createMetricSpec(1, "metric-name")} + metricsSpecs := []v2.MetricSpec{utils.CreateMetricSpec(1, "metric-name")} activeFactory := func() (scalers.Scaler, *scalers.ScalerConfig, error) { scaler := mock_scalers.NewMockScaler(ctrl) @@ -361,51 +361,6 @@ func TestCheckScaledObjectFindFirstActiveNotIgnoreOthers(t *testing.T) { assert.Equal(t, true, isError) } -func TestTargetAverageValue(t *testing.T) { - // count = 0 - specs := []v2.MetricSpec{} - metricName := "s0-messageCount" - targetAverageValue := utils.GetTargetAverageValue(specs) - assert.Equal(t, float64(0), targetAverageValue) - // 1 1 - specs = []v2.MetricSpec{ - createMetricSpec(1, metricName), - createMetricSpec(1, metricName), - } - targetAverageValue = utils.GetTargetAverageValue(specs) - assert.Equal(t, float64(1), targetAverageValue) - // 5 5 3 -> 4.333333333333333 - specs = []v2.MetricSpec{ - createMetricSpec(5, metricName), - createMetricSpec(5, metricName), - createMetricSpec(3, metricName), - } - targetAverageValue = utils.GetTargetAverageValue(specs) - assert.Equal(t, 4.333333333333333, targetAverageValue) - - // 5 5 4 -> 4.666666666666667 - specs = []v2.MetricSpec{ - createMetricSpec(5, metricName), - createMetricSpec(5, metricName), - createMetricSpec(4, metricName), - } - targetAverageValue = utils.GetTargetAverageValue(specs) - assert.Equal(t, 4.666666666666667, targetAverageValue) -} -func createMetricSpec(averageValue int64, metricName string) v2.MetricSpec { - qty := 
resource.NewQuantity(averageValue, resource.DecimalSI) - return v2.MetricSpec{ - External: &v2.ExternalMetricSource{ - Target: v2.MetricTarget{ - AverageValue: qty, - }, - Metric: v2.MetricIdentifier{ - Name: metricName, - }, - }, - } -} - func TestIsScaledJobActive(t *testing.T) { metricName := "s0-queueLength" ctrl := gomock.NewController(t) @@ -648,7 +603,7 @@ func createScaledJob(minReplicaCount int32, maxReplicaCount int32, multipleScale func createScaler(ctrl *gomock.Controller, queueLength int64, averageValue int64, isActive bool, metricName string) *mock_scalers.MockScaler { scaler := mock_scalers.NewMockScaler(ctrl) - metricsSpecs := []v2.MetricSpec{createMetricSpec(averageValue, metricName)} + metricsSpecs := []v2.MetricSpec{utils.CreateMetricSpec(averageValue, metricName)} metrics := []external_metrics.ExternalMetricValue{ { diff --git a/pkg/scaling/utils/metrics.go b/pkg/scaling/utils/metrics.go index b848487fe42..c1a98ddda42 100644 --- a/pkg/scaling/utils/metrics.go +++ b/pkg/scaling/utils/metrics.go @@ -1,6 +1,9 @@ package utils -import v2 "k8s.io/api/autoscaling/v2" +import ( + v2 "k8s.io/api/autoscaling/v2" + "k8s.io/apimachinery/pkg/api/resource" +) func GetTargetAverageValue(metricSpecs []v2.MetricSpec) float64 { var totalAverageValue float64 @@ -20,3 +23,17 @@ func GetTargetAverageValue(metricSpecs []v2.MetricSpec) float64 { } return 0 } + +func CreateMetricSpec(averageValue int64, metricName string) v2.MetricSpec { + qty := resource.NewQuantity(averageValue, resource.DecimalSI) + return v2.MetricSpec{ + External: &v2.ExternalMetricSource{ + Target: v2.MetricTarget{ + AverageValue: qty, + }, + Metric: v2.MetricIdentifier{ + Name: metricName, + }, + }, + } +} diff --git a/pkg/scaling/utils/metrics_test.go b/pkg/scaling/utils/metrics_test.go new file mode 100644 index 00000000000..3df1b85ba9c --- /dev/null +++ b/pkg/scaling/utils/metrics_test.go @@ -0,0 +1,39 @@ +package utils + +import ( + "github.com/stretchr/testify/assert" + v2 "k8s.io/api/autoscaling/v2" + "testing" +) + +func TestTargetAverageValue(t *testing.T) { + // count = 0 + specs := []v2.MetricSpec{} + metricName := "s0-messageCount" + targetAverageValue := GetTargetAverageValue(specs) + assert.Equal(t, float64(0), targetAverageValue) + // 1 1 + specs = []v2.MetricSpec{ + CreateMetricSpec(1, metricName), + CreateMetricSpec(1, metricName), + } + targetAverageValue = GetTargetAverageValue(specs) + assert.Equal(t, float64(1), targetAverageValue) + // 5 5 3 -> 4.333333333333333 + specs = []v2.MetricSpec{ + CreateMetricSpec(5, metricName), + CreateMetricSpec(5, metricName), + CreateMetricSpec(3, metricName), + } + targetAverageValue = GetTargetAverageValue(specs) + assert.Equal(t, 4.333333333333333, targetAverageValue) + + // 5 5 4 -> 4.666666666666667 + specs = []v2.MetricSpec{ + CreateMetricSpec(5, metricName), + CreateMetricSpec(5, metricName), + CreateMetricSpec(4, metricName), + } + targetAverageValue = GetTargetAverageValue(specs) + assert.Equal(t, 4.666666666666667, targetAverageValue) +} From ce7c4642c67d0b354d54d99b5f389cc8de38d92e Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Tue, 20 Jun 2023 19:38:02 +0900 Subject: [PATCH 12/21] Rearrange import module to pass static analysis Signed-off-by: Yoon Park --- pkg/scaling/utils/metrics_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/scaling/utils/metrics_test.go b/pkg/scaling/utils/metrics_test.go index 3df1b85ba9c..f96b5446a69 100644 --- a/pkg/scaling/utils/metrics_test.go +++ b/pkg/scaling/utils/metrics_test.go @@ -1,9 +1,10 
@@ package utils import ( + "testing" + "github.com/stretchr/testify/assert" v2 "k8s.io/api/autoscaling/v2" - "testing" ) func TestTargetAverageValue(t *testing.T) { From d486a66a03e7e95fc88ce048af51b314eb76874b Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Wed, 21 Jun 2023 08:05:43 +0900 Subject: [PATCH 13/21] Add tests for scaledJob identifier Signed-off-by: Yoon Park --- apis/keda/v1alpha1/identifier_test.go | 124 +++++++++++++++++++------- 1 file changed, 93 insertions(+), 31 deletions(-) diff --git a/apis/keda/v1alpha1/identifier_test.go b/apis/keda/v1alpha1/identifier_test.go index 251829c190a..312f88db914 100644 --- a/apis/keda/v1alpha1/identifier_test.go +++ b/apis/keda/v1alpha1/identifier_test.go @@ -14,39 +14,37 @@ type testData struct { soKind string } -var tests = []testData{ - { - name: "all lowercase", - expectedIdentifier: "scaledobject.namespace.name", - soName: "name", - soNamespace: "namespace", - soKind: "scaledobject", - }, - { - name: "all uppercase", - expectedIdentifier: "scaledobject.namespace.name", - soName: "NAME", - soNamespace: "NAMESPACE", - soKind: "SCALEDOBJECT", - }, - { - name: "camel case", - expectedIdentifier: "scaledobject.namespace.name", - soName: "name", - soNamespace: "namespace", - soKind: "scaledobject", - }, - { - name: "missing namespace", - expectedIdentifier: "scaledobject..name", - soName: "name", - soKind: "scaledobject", - }, -} - func TestGeneratedIdentifierForScaledObject(t *testing.T) { + tests := []testData{ + { + name: "all lowercase", + expectedIdentifier: "scaledobject.namespace.name", + soName: "name", + soNamespace: "namespace", + soKind: "scaledobject", + }, + { + name: "all uppercase", + expectedIdentifier: "scaledobject.namespace.name", + soName: "NAME", + soNamespace: "NAMESPACE", + soKind: "SCALEDOBJECT", + }, + { + name: "camel case", + expectedIdentifier: "scaledobject.namespace.name", + soName: "name", + soNamespace: "namespace", + soKind: "scaledobject", + }, + { + name: "missing namespace", + expectedIdentifier: "scaledobject..name", + soName: "name", + soKind: "scaledobject", + }, + } for _, test := range tests { - test := test t.Run(test.name, func(t *testing.T) { expectedIdentifier := test.expectedIdentifier genericIdentifier := GenerateIdentifier(test.soKind, test.soNamespace, test.soName) @@ -79,3 +77,67 @@ func TestGeneratedIdentifierForScaledObject(t *testing.T) { }) } } + +func TestGeneratedIdentifierForScaledJob(t *testing.T) { + tests := []testData{ + { + name: "all lowercase", + expectedIdentifier: "scaledjob.namespace.name", + soName: "name", + soNamespace: "namespace", + soKind: "scaledjob", + }, + { + name: "all uppercase", + expectedIdentifier: "scaledjob.namespace.name", + soName: "NAME", + soNamespace: "NAMESPACE", + soKind: "SCALEDJOB", + }, + { + name: "camel case", + expectedIdentifier: "scaledjob.namespace.name", + soName: "name", + soNamespace: "namespace", + soKind: "scaledjob", + }, + { + name: "missing namespace", + expectedIdentifier: "scaledjob..name", + soName: "name", + soKind: "scaledjob", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + expectedIdentifier := test.expectedIdentifier + genericIdentifier := GenerateIdentifier(test.soKind, test.soNamespace, test.soName) + + scaledJob := &ScaledJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: test.soName, + Namespace: test.soNamespace, + }, + } + scaledJobIdentifier := scaledJob.GenerateIdentifier() + + withTriggers, err := AsDuckWithTriggers(scaledJob) + if err != nil { + t.Errorf("got error while converting to 
WithTriggers object: %s", err) + } + withTriggersIdentifier := withTriggers.GenerateIdentifier() + + if expectedIdentifier != genericIdentifier { + t.Errorf("genericIdentifier=%q doesn't equal the expectedIdentifier=%q", genericIdentifier, expectedIdentifier) + } + + if expectedIdentifier != scaledJobIdentifier { + t.Errorf("scaledJobIdentifier=%q doesn't equal the expectedIdentifier=%q", scaledJobIdentifier, expectedIdentifier) + } + + if expectedIdentifier != withTriggersIdentifier { + t.Errorf("withTriggersIdentifier=%q doesn't equal the expectedIdentifier=%q", withTriggersIdentifier, expectedIdentifier) + } + }) + } +} From 0b862a1c3d692b5d806ddbddc3167f5847160f26 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Sat, 8 Jul 2023 13:41:11 +0900 Subject: [PATCH 14/21] rename the package name and delete comments Signed-off-by: Yoon Park --- pkg/scaling/scale_handler.go | 6 ++---- pkg/scaling/scale_handler_test.go | 12 ++++++------ pkg/scaling/{utils => scaledjob}/metrics.go | 4 +++- pkg/scaling/{utils => scaledjob}/metrics_test.go | 2 +- 4 files changed, 12 insertions(+), 12 deletions(-) rename pkg/scaling/{utils => scaledjob}/metrics.go (83%) rename pkg/scaling/{utils => scaledjob}/metrics_test.go (98%) diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index d77a9f5c53d..badbe2ea0e9 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -43,7 +43,7 @@ import ( "github.com/kedacore/keda/v2/pkg/scaling/cache/metricscache" "github.com/kedacore/keda/v2/pkg/scaling/executor" "github.com/kedacore/keda/v2/pkg/scaling/resolver" - "github.com/kedacore/keda/v2/pkg/scaling/utils" + "github.com/kedacore/keda/v2/pkg/scaling/scaledjob" ) var log = logf.Log.WithName("scale_handler") @@ -630,7 +630,6 @@ type scalerMetrics struct { // getScaledJobMetrics returns metrics for specified metric name for a ScaledJob identified by its name and namespace. // It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler. 
func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scalerMetrics { - // TODO this loop should be probably done similar way the ScaledObject loop is done cache, err := h.GetScalersCache(ctx, scaledJob) if err != nil { log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name) @@ -655,7 +654,6 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav continue } - // TODO here we should probably loop through all metrics in a Scaler metrics, isTriggerActive, _, err := cache.GetMetricsAndActivityForScaler(ctx, i, metricSpecs[0].External.Metric.Name) if err != nil { scalerLogger.V(1).Info("Error getting scaler metrics and activity, but continue", "error", err) @@ -663,7 +661,7 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav continue } - targetAverageValue = utils.GetTargetAverageValue(metricSpecs) + targetAverageValue = scaledjob.GetTargetAverageValue(metricSpecs) var metricValue float64 for _, m := range metrics { diff --git a/pkg/scaling/scale_handler_test.go b/pkg/scaling/scale_handler_test.go index 80f0c8c3b59..f057e0026ef 100644 --- a/pkg/scaling/scale_handler_test.go +++ b/pkg/scaling/scale_handler_test.go @@ -41,7 +41,7 @@ import ( "github.com/kedacore/keda/v2/pkg/scalers" "github.com/kedacore/keda/v2/pkg/scaling/cache" "github.com/kedacore/keda/v2/pkg/scaling/cache/metricscache" - "github.com/kedacore/keda/v2/pkg/scaling/utils" + "github.com/kedacore/keda/v2/pkg/scaling/scaledjob" ) func TestGetScaledObjectMetrics_DirectCall(t *testing.T) { @@ -56,7 +56,7 @@ func TestGetScaledObjectMetrics_DirectCall(t *testing.T) { mockExecutor := mock_executor.NewMockScaleExecutor(ctrl) mockStatusWriter := mock_client.NewMockStatusWriter(ctrl) - metricsSpecs := []v2.MetricSpec{utils.CreateMetricSpec(10, metricName)} + metricsSpecs := []v2.MetricSpec{scaledjob.CreateMetricSpec(10, metricName)} metricValue := scalers.GenerateMetricInMili(metricName, float64(10)) metricsRecord := map[string]metricscache.MetricsRecord{} @@ -147,7 +147,7 @@ func TestGetScaledObjectMetrics_FromCache(t *testing.T) { mockExecutor := mock_executor.NewMockScaleExecutor(ctrl) mockStatusWriter := mock_client.NewMockStatusWriter(ctrl) - metricsSpecs := []v2.MetricSpec{utils.CreateMetricSpec(10, metricName)} + metricsSpecs := []v2.MetricSpec{scaledjob.CreateMetricSpec(10, metricName)} metricValue := scalers.GenerateMetricInMili(metricName, float64(10)) metricsRecord := map[string]metricscache.MetricsRecord{} @@ -231,7 +231,7 @@ func TestCheckScaledObjectScalersWithError(t *testing.T) { mockExecutor := mock_executor.NewMockScaleExecutor(ctrl) recorder := record.NewFakeRecorder(1) - metricsSpecs := []v2.MetricSpec{utils.CreateMetricSpec(1, "metric-name")} + metricsSpecs := []v2.MetricSpec{scaledjob.CreateMetricSpec(1, "metric-name")} scaler := mock_scalers.NewMockScaler(ctrl) scaler.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs) @@ -292,7 +292,7 @@ func TestCheckScaledObjectFindFirstActiveNotIgnoreOthers(t *testing.T) { mockExecutor := mock_executor.NewMockScaleExecutor(ctrl) recorder := record.NewFakeRecorder(1) - metricsSpecs := []v2.MetricSpec{utils.CreateMetricSpec(1, "metric-name")} + metricsSpecs := []v2.MetricSpec{scaledjob.CreateMetricSpec(1, "metric-name")} activeFactory := func() (scalers.Scaler, *scalers.ScalerConfig, error) { scaler := mock_scalers.NewMockScaler(ctrl) @@ -603,7 +603,7 @@ func createScaledJob(minReplicaCount 
int32, maxReplicaCount int32, multipleScale func createScaler(ctrl *gomock.Controller, queueLength int64, averageValue int64, isActive bool, metricName string) *mock_scalers.MockScaler { scaler := mock_scalers.NewMockScaler(ctrl) - metricsSpecs := []v2.MetricSpec{utils.CreateMetricSpec(averageValue, metricName)} + metricsSpecs := []v2.MetricSpec{scaledjob.CreateMetricSpec(averageValue, metricName)} metrics := []external_metrics.ExternalMetricValue{ { diff --git a/pkg/scaling/utils/metrics.go b/pkg/scaling/scaledjob/metrics.go similarity index 83% rename from pkg/scaling/utils/metrics.go rename to pkg/scaling/scaledjob/metrics.go index c1a98ddda42..d535dbe0d25 100644 --- a/pkg/scaling/utils/metrics.go +++ b/pkg/scaling/scaledjob/metrics.go @@ -1,10 +1,11 @@ -package utils +package scaledjob import ( v2 "k8s.io/api/autoscaling/v2" "k8s.io/apimachinery/pkg/api/resource" ) +// GetTargetAverageValue returns the average of all the metrics' average value. func GetTargetAverageValue(metricSpecs []v2.MetricSpec) float64 { var totalAverageValue float64 var metricValue float64 @@ -24,6 +25,7 @@ func GetTargetAverageValue(metricSpecs []v2.MetricSpec) float64 { return 0 } +// CreateMetricSpec creates MetricSpec for given metric name and target value. func CreateMetricSpec(averageValue int64, metricName string) v2.MetricSpec { qty := resource.NewQuantity(averageValue, resource.DecimalSI) return v2.MetricSpec{ diff --git a/pkg/scaling/utils/metrics_test.go b/pkg/scaling/scaledjob/metrics_test.go similarity index 98% rename from pkg/scaling/utils/metrics_test.go rename to pkg/scaling/scaledjob/metrics_test.go index f96b5446a69..db3a5a92f40 100644 --- a/pkg/scaling/utils/metrics_test.go +++ b/pkg/scaling/scaledjob/metrics_test.go @@ -1,4 +1,4 @@ -package utils +package scaledjob import ( "testing" From ff5cd11b08aa403539b57b138ca30c3b33a9f257 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Sat, 8 Jul 2023 14:46:36 +0900 Subject: [PATCH 15/21] Move scaledJob metric logic under scaledJob package Signed-off-by: Yoon Park --- pkg/scaling/scale_handler.go | 92 +++++--------------------------- pkg/scaling/scaledjob/metrics.go | 79 +++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 80 deletions(-) diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index badbe2ea0e9..434deb37c01 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -19,7 +19,6 @@ package scaling import ( "context" "fmt" - "math" "strings" "sync" "time" @@ -617,25 +616,19 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k return isScaledObjectActive, isScalerError, metricsRecord, nil } -type scalerMetrics struct { - queueLength float64 - maxValue float64 - isActive bool -} - // / --------------------------------------------------------------------------- /// // / ---------- ScaledJob related methods --------- /// // / --------------------------------------------------------------------------- /// // getScaledJobMetrics returns metrics for specified metric name for a ScaledJob identified by its name and namespace. // It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler. 
-func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scalerMetrics { +func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scaledjob.ScalerMetrics { cache, err := h.GetScalersCache(ctx, scaledJob) if err != nil { log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name) return nil } - var scalersMetrics []scalerMetrics + var scalersMetrics []scaledjob.ScalerMetrics scalers, _ := cache.GetScalers() for i, s := range scalers { var queueLength float64 @@ -678,12 +671,12 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav if targetAverageValue != 0 { averageLength := queueLength / targetAverageValue - maxValue = min(float64(scaledJob.MaxReplicaCount()), averageLength) + maxValue = scaledjob.GetMaxValue(averageLength, scaledJob.MaxReplicaCount()) } - scalersMetrics = append(scalersMetrics, scalerMetrics{ - queueLength: queueLength, - maxValue: maxValue, - isActive: isActive, + scalersMetrics = append(scalersMetrics, scaledjob.ScalerMetrics{ + QueueLength: queueLength, + MaxValue: maxValue, + IsActive: isActive, }) } return scalersMetrics @@ -693,73 +686,12 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav // is active as the first return value, // the second and the third return values indicate queueLength and maxValue for scale func (h *scaleHandler) isScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) { - var queueLength float64 - var maxValue float64 - isActive := false - logger := logf.Log.WithName("scalemetrics") - scalersMetrics := h.getScaledJobMetrics(ctx, scaledJob) - switch scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation { - case "min": - for _, metrics := range scalersMetrics { - if (queueLength == 0 || metrics.queueLength < queueLength) && metrics.isActive { - queueLength = metrics.queueLength - maxValue = metrics.maxValue - isActive = metrics.isActive - } - } - case "avg": - queueLengthSum := float64(0) - maxValueSum := float64(0) - length := 0 - for _, metrics := range scalersMetrics { - if metrics.isActive { - queueLengthSum += metrics.queueLength - maxValueSum += metrics.maxValue - isActive = metrics.isActive - length++ - } - } - if length != 0 { - queueLength = queueLengthSum / float64(length) - maxValue = maxValueSum / float64(length) - } - case "sum": - for _, metrics := range scalersMetrics { - if metrics.isActive { - queueLength += metrics.queueLength - maxValue += metrics.maxValue - isActive = metrics.isActive - } - } - default: // max - for _, metrics := range scalersMetrics { - if metrics.queueLength > queueLength && metrics.isActive { - queueLength = metrics.queueLength - maxValue = metrics.maxValue - isActive = metrics.isActive - } - } - } - - if scaledJob.MinReplicaCount() > 0 { - isActive = true - } - - maxValue = min(float64(scaledJob.MaxReplicaCount()), maxValue) - logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation) - return isActive, ceilToInt64(queueLength), ceilToInt64(maxValue) -} - -func ceilToInt64(x float64) int64 { - return int64(math.Ceil(x)) -} + scalersMetrics := h.getScaledJobMetrics(ctx, scaledJob) + isActive, queueLength, maxValue, maxFloatValue := + scaledjob.IsScaledJobActive(scalersMetrics, 
scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation, scaledJob.MinReplicaCount(), scaledJob.MaxReplicaCount()) -// Min function for float64 -func min(x, y float64) float64 { - if x > y { - return y - } - return x + logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxFloatValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation) + return isActive, queueLength, maxValue } diff --git a/pkg/scaling/scaledjob/metrics.go b/pkg/scaling/scaledjob/metrics.go index d535dbe0d25..b04444a1061 100644 --- a/pkg/scaling/scaledjob/metrics.go +++ b/pkg/scaling/scaledjob/metrics.go @@ -3,6 +3,7 @@ package scaledjob import ( v2 "k8s.io/api/autoscaling/v2" "k8s.io/apimachinery/pkg/api/resource" + "math" ) // GetTargetAverageValue returns the average of all the metrics' average value. @@ -39,3 +40,81 @@ func CreateMetricSpec(averageValue int64, metricName string) v2.MetricSpec { }, } } + +type ScalerMetrics struct { + QueueLength float64 + MaxValue float64 + IsActive bool +} + +func IsScaledJobActive(scalersMetrics []ScalerMetrics, multipleScalersCalculation string, minReplicaCount, maxReplicaCount int64) (bool, int64, int64, float64) { + var queueLength float64 + var maxValue float64 + isActive := false + + switch multipleScalersCalculation { + case "min": + for _, metrics := range scalersMetrics { + if (queueLength == 0 || metrics.QueueLength < queueLength) && metrics.IsActive { + queueLength = metrics.QueueLength + maxValue = metrics.MaxValue + isActive = metrics.IsActive + } + } + case "avg": + queueLengthSum := float64(0) + maxValueSum := float64(0) + length := 0 + for _, metrics := range scalersMetrics { + if metrics.IsActive { + queueLengthSum += metrics.QueueLength + maxValueSum += metrics.MaxValue + isActive = metrics.IsActive + length++ + } + } + if length != 0 { + queueLength = queueLengthSum / float64(length) + maxValue = maxValueSum / float64(length) + } + case "sum": + for _, metrics := range scalersMetrics { + if metrics.IsActive { + queueLength += metrics.QueueLength + maxValue += metrics.MaxValue + isActive = metrics.IsActive + } + } + default: // max + for _, metrics := range scalersMetrics { + if metrics.QueueLength > queueLength && metrics.IsActive { + queueLength = metrics.QueueLength + maxValue = metrics.MaxValue + isActive = metrics.IsActive + } + } + } + + if minReplicaCount > 0 { + isActive = true + } + + maxValue = GetMaxValue(maxValue, maxReplicaCount) + return isActive, ceilToInt64(queueLength), ceilToInt64(maxValue), maxValue +} + +func ceilToInt64(x float64) int64 { + return int64(math.Ceil(x)) +} + +// Min function for float64 +func min(x, y float64) float64 { + if x > y { + return y + } + return x +} + +func GetMaxValue(maxValue float64, maxReplicaCount int64) float64 { + return min(maxValue, float64(maxReplicaCount)) +} From 080651c6d0d63f37b6461ccbdd41ce5bf556796c Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Sat, 8 Jul 2023 16:04:58 +0900 Subject: [PATCH 16/21] Add change log Signed-off-by: Yoon Park --- CHANGELOG.md | 1 + pkg/scaling/scaledjob/metrics.go | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 557774ab611..14141d86fce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -172,6 +172,7 @@ New deprecation(s): - **General**: Bump `kubernetes-sigs/controller-runtime` to v0.15.0 and code alignment ([#4582](https://github.com/kedacore/keda/pull/4582)) - **General**: Drop a transitive dependency on 
bou.ke/monkey ([#4364](https://github.com/kedacore/keda/issues/4364)) - **General**: Fix odd number of arguments passed as key-value pairs for logging ([#4368](https://github.com/kedacore/keda/issues/4368)) +- **General**: Refactor ScaledJob related methods to be located at scale_handler ([#4781](https://github.com/kedacore/keda/issues/4781)) - **General**: Refactor several functions for Status & Conditions handling into pkg util functions ([#2906](https://github.com/kedacore/keda/pull/2906)) - **General**: Stop logging errors for paused ScaledObject (with `autoscaling.keda.sh/paused-replicas` annotation) by skipping reconciliation loop for the object (stop the scale loop and delete the HPA) ([#4253](https://github.com/kedacore/keda/pull/4253)) - **General**: Trying to prevent operator crash when accessing `ScaledObject.Status.ScaleTargetGVKR` ([#4389](https://github.com/kedacore/keda/issues/4389)) diff --git a/pkg/scaling/scaledjob/metrics.go b/pkg/scaling/scaledjob/metrics.go index b04444a1061..92001d8ced6 100644 --- a/pkg/scaling/scaledjob/metrics.go +++ b/pkg/scaling/scaledjob/metrics.go @@ -1,9 +1,10 @@ package scaledjob import ( + "math" + v2 "k8s.io/api/autoscaling/v2" "k8s.io/apimachinery/pkg/api/resource" - "math" ) // GetTargetAverageValue returns the average of all the metrics' average value. From 8197f20e9d319a68aa5cbef85660db1fad01d059 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Wed, 12 Jul 2023 21:20:26 +0900 Subject: [PATCH 17/21] Add CalculateQueueLengthAndMaxValue at scaledJob Signed-off-by: Yoon Park --- CHANGELOG.md | 3 +-- pkg/scaling/scale_handler.go | 23 ++++------------------- pkg/scaling/scaledjob/metrics.go | 29 +++++++++++++++++++++++++---- 3 files changed, 30 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 14141d86fce..1fe5c0f9375 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -74,7 +74,7 @@ New deprecation(s): ### Other -- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX)) +- **General**: Refactor ScaledJob related methods to be located at scale_handler ([#4781](https://github.com/kedacore/keda/issues/4781)) ## v2.11.1 @@ -172,7 +172,6 @@ New deprecation(s): - **General**: Bump `kubernetes-sigs/controller-runtime` to v0.15.0 and code alignment ([#4582](https://github.com/kedacore/keda/pull/4582)) - **General**: Drop a transitive dependency on bou.ke/monkey ([#4364](https://github.com/kedacore/keda/issues/4364)) - **General**: Fix odd number of arguments passed as key-value pairs for logging ([#4368](https://github.com/kedacore/keda/issues/4368)) -- **General**: Refactor ScaledJob related methods to be located at scale_handler ([#4781](https://github.com/kedacore/keda/issues/4781)) - **General**: Refactor several functions for Status & Conditions handling into pkg util functions ([#2906](https://github.com/kedacore/keda/pull/2906)) - **General**: Stop logging errors for paused ScaledObject (with `autoscaling.keda.sh/paused-replicas` annotation) by skipping reconciliation loop for the object (stop the scale loop and delete the HPA) ([#4253](https://github.com/kedacore/keda/pull/4253)) - **General**: Trying to prevent operator crash when accessing `ScaledObject.Status.ScaleTargetGVKR` ([#4389](https://github.com/kedacore/keda/issues/4389)) diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 434deb37c01..649d19fd1a7 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -631,10 +631,7 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav var 
scalersMetrics []scaledjob.ScalerMetrics scalers, _ := cache.GetScalers() for i, s := range scalers { - var queueLength float64 - var targetAverageValue float64 isActive := false - maxValue := float64(0) scalerType := fmt.Sprintf("%T:", s) scalerLogger := log.WithValues("ScaledJob", scaledJob.Name, "Scaler", scalerType) @@ -653,26 +650,14 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav cache.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) continue } - - targetAverageValue = scaledjob.GetTargetAverageValue(metricSpecs) - - var metricValue float64 - for _, m := range metrics { - if m.MetricName == metricSpecs[0].External.Metric.Name { - metricValue = m.Value.AsApproximateFloat64() - queueLength += metricValue - } - } - scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, metricSpecs[0].External.Metric.Name, queueLength, "targetAverageValue", targetAverageValue) - if isTriggerActive { isActive = true } - if targetAverageValue != 0 { - averageLength := queueLength / targetAverageValue - maxValue = scaledjob.GetMaxValue(averageLength, scaledJob.MaxReplicaCount()) - } + queueLength, maxValue, targetAverageValue := scaledjob.CalculateQueueLengthAndMaxValue(metrics, metricSpecs, scaledJob.MaxReplicaCount()) + + scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, metricSpecs[0].External.Metric.Name, queueLength, "targetAverageValue", targetAverageValue) + scalersMetrics = append(scalersMetrics, scaledjob.ScalerMetrics{ QueueLength: queueLength, MaxValue: maxValue, diff --git a/pkg/scaling/scaledjob/metrics.go b/pkg/scaling/scaledjob/metrics.go index 92001d8ced6..8d61fbb6d42 100644 --- a/pkg/scaling/scaledjob/metrics.go +++ b/pkg/scaling/scaledjob/metrics.go @@ -5,10 +5,11 @@ import ( v2 "k8s.io/api/autoscaling/v2" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/metrics/pkg/apis/external_metrics" ) // GetTargetAverageValue returns the average of all the metrics' average value. -func GetTargetAverageValue(metricSpecs []v2.MetricSpec) float64 { +func getTargetAverageValue(metricSpecs []v2.MetricSpec) float64 { var totalAverageValue float64 var metricValue float64 for _, metric := range metricSpecs { @@ -27,6 +28,23 @@ func GetTargetAverageValue(metricSpecs []v2.MetricSpec) float64 { return 0 } +// CalculateQueueLengthAndMaxValue returns queueLength, maxValue, and targetAverageValue for the given metrics +func CalculateQueueLengthAndMaxValue(metrics []external_metrics.ExternalMetricValue, metricSpecs []v2.MetricSpec, maxReplicaCount int64) (queueLength, maxValue, targetAverageValue float64) { + var metricValue float64 + for _, m := range metrics { + if m.MetricName == metricSpecs[0].External.Metric.Name { + metricValue = m.Value.AsApproximateFloat64() + queueLength += metricValue + } + } + targetAverageValue = getTargetAverageValue(metricSpecs) + if targetAverageValue != 0 { + averageLength := queueLength / targetAverageValue + maxValue = getMaxValue(averageLength, maxReplicaCount) + } + return queueLength, maxValue, targetAverageValue +} + // CreateMetricSpec creates MetricSpec for given metric name and target value. 
func CreateMetricSpec(averageValue int64, metricName string) v2.MetricSpec { qty := resource.NewQuantity(averageValue, resource.DecimalSI) @@ -48,6 +66,7 @@ type ScalerMetrics struct { IsActive bool } +// IsScaledJobActive returns whether the input ScaledJob is active and queueLength and maxValue for scale func IsScaledJobActive(scalersMetrics []ScalerMetrics, multipleScalersCalculation string, minReplicaCount, maxReplicaCount int64) (bool, int64, int64, float64) { var queueLength float64 var maxValue float64 @@ -100,15 +119,16 @@ func IsScaledJobActive(scalersMetrics []ScalerMetrics, multipleScalersCalculatio isActive = true } - maxValue = GetMaxValue(maxValue, maxReplicaCount) + maxValue = getMaxValue(maxValue, maxReplicaCount) return isActive, ceilToInt64(queueLength), ceilToInt64(maxValue), maxValue } +// ceilToInt64 returns the int64 ceil value for the float64 input func ceilToInt64(x float64) int64 { return int64(math.Ceil(x)) } -// Min function for float64 +// min returns the minimum for input float64 values func min(x, y float64) float64 { if x > y { return y @@ -116,6 +136,7 @@ func min(x, y float64) float64 { return x } -func GetMaxValue(maxValue float64, maxReplicaCount int64) float64 { +// getMaxValue returns maxValue, unless it is exceeding the MaxReplicaCount. +func getMaxValue(maxValue float64, maxReplicaCount int64) float64 { return min(maxValue, float64(maxReplicaCount)) } From 40922d3befec8412f494dd15c3ed4ec7fa88f641 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Wed, 12 Jul 2023 21:34:51 +0900 Subject: [PATCH 18/21] Use renamed function getTargetAverageValue Signed-off-by: Yoon Park --- pkg/scaling/scaledjob/metrics_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/scaling/scaledjob/metrics_test.go b/pkg/scaling/scaledjob/metrics_test.go index db3a5a92f40..9934fc3eeaa 100644 --- a/pkg/scaling/scaledjob/metrics_test.go +++ b/pkg/scaling/scaledjob/metrics_test.go @@ -11,14 +11,14 @@ func TestTargetAverageValue(t *testing.T) { // count = 0 specs := []v2.MetricSpec{} metricName := "s0-messageCount" - targetAverageValue := GetTargetAverageValue(specs) + targetAverageValue := getTargetAverageValue(specs) assert.Equal(t, float64(0), targetAverageValue) // 1 1 specs = []v2.MetricSpec{ CreateMetricSpec(1, metricName), CreateMetricSpec(1, metricName), } - targetAverageValue = GetTargetAverageValue(specs) + targetAverageValue = getTargetAverageValue(specs) assert.Equal(t, float64(1), targetAverageValue) // 5 5 3 -> 4.333333333333333 specs = []v2.MetricSpec{ @@ -26,7 +26,7 @@ func TestTargetAverageValue(t *testing.T) { CreateMetricSpec(5, metricName), CreateMetricSpec(3, metricName), } - targetAverageValue = GetTargetAverageValue(specs) + targetAverageValue = getTargetAverageValue(specs) assert.Equal(t, 4.333333333333333, targetAverageValue) // 5 5 4 -> 4.666666666666667 @@ -35,6 +35,6 @@ func TestTargetAverageValue(t *testing.T) { CreateMetricSpec(5, metricName), CreateMetricSpec(4, metricName), } - targetAverageValue = GetTargetAverageValue(specs) + targetAverageValue = getTargetAverageValue(specs) assert.Equal(t, 4.666666666666667, targetAverageValue) } From 68e6779bed5746b16283acd1e51efd2db9e823f3 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Wed, 12 Jul 2023 22:41:45 +0900 Subject: [PATCH 19/21] Move createMetricSpec inside test code Signed-off-by: Yoon Park --- pkg/scaling/scale_handler_test.go | 26 +++++++++++++++++----- pkg/scaling/scaledjob/metrics.go | 16 -------------- pkg/scaling/scaledjob/metrics_test.go | 32 
++++++++++++++++++++------- 3 files changed, 44 insertions(+), 30 deletions(-) diff --git a/pkg/scaling/scale_handler_test.go b/pkg/scaling/scale_handler_test.go index f057e0026ef..b10a8dab183 100644 --- a/pkg/scaling/scale_handler_test.go +++ b/pkg/scaling/scale_handler_test.go @@ -41,7 +41,6 @@ import ( "github.com/kedacore/keda/v2/pkg/scalers" "github.com/kedacore/keda/v2/pkg/scaling/cache" "github.com/kedacore/keda/v2/pkg/scaling/cache/metricscache" - "github.com/kedacore/keda/v2/pkg/scaling/scaledjob" ) func TestGetScaledObjectMetrics_DirectCall(t *testing.T) { @@ -56,7 +55,7 @@ func TestGetScaledObjectMetrics_DirectCall(t *testing.T) { mockExecutor := mock_executor.NewMockScaleExecutor(ctrl) mockStatusWriter := mock_client.NewMockStatusWriter(ctrl) - metricsSpecs := []v2.MetricSpec{scaledjob.CreateMetricSpec(10, metricName)} + metricsSpecs := []v2.MetricSpec{createMetricSpec(10, metricName)} metricValue := scalers.GenerateMetricInMili(metricName, float64(10)) metricsRecord := map[string]metricscache.MetricsRecord{} @@ -147,7 +146,7 @@ func TestGetScaledObjectMetrics_FromCache(t *testing.T) { mockExecutor := mock_executor.NewMockScaleExecutor(ctrl) mockStatusWriter := mock_client.NewMockStatusWriter(ctrl) - metricsSpecs := []v2.MetricSpec{scaledjob.CreateMetricSpec(10, metricName)} + metricsSpecs := []v2.MetricSpec{createMetricSpec(10, metricName)} metricValue := scalers.GenerateMetricInMili(metricName, float64(10)) metricsRecord := map[string]metricscache.MetricsRecord{} @@ -231,7 +230,7 @@ func TestCheckScaledObjectScalersWithError(t *testing.T) { mockExecutor := mock_executor.NewMockScaleExecutor(ctrl) recorder := record.NewFakeRecorder(1) - metricsSpecs := []v2.MetricSpec{scaledjob.CreateMetricSpec(1, "metric-name")} + metricsSpecs := []v2.MetricSpec{createMetricSpec(1, "metric-name")} scaler := mock_scalers.NewMockScaler(ctrl) scaler.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs) @@ -292,7 +291,7 @@ func TestCheckScaledObjectFindFirstActiveNotIgnoreOthers(t *testing.T) { mockExecutor := mock_executor.NewMockScaleExecutor(ctrl) recorder := record.NewFakeRecorder(1) - metricsSpecs := []v2.MetricSpec{scaledjob.CreateMetricSpec(1, "metric-name")} + metricsSpecs := []v2.MetricSpec{createMetricSpec(1, "metric-name")} activeFactory := func() (scalers.Scaler, *scalers.ScalerConfig, error) { scaler := mock_scalers.NewMockScaler(ctrl) @@ -603,7 +602,7 @@ func createScaledJob(minReplicaCount int32, maxReplicaCount int32, multipleScale func createScaler(ctrl *gomock.Controller, queueLength int64, averageValue int64, isActive bool, metricName string) *mock_scalers.MockScaler { scaler := mock_scalers.NewMockScaler(ctrl) - metricsSpecs := []v2.MetricSpec{scaledjob.CreateMetricSpec(averageValue, metricName)} + metricsSpecs := []v2.MetricSpec{createMetricSpec(averageValue, metricName)} metrics := []external_metrics.ExternalMetricValue{ { @@ -616,3 +615,18 @@ func createScaler(ctrl *gomock.Controller, queueLength int64, averageValue int64 scaler.EXPECT().Close(gomock.Any()) return scaler } + +// createMetricSpec creates MetricSpec for given metric name and target value. 
+func createMetricSpec(averageValue int64, metricName string) v2.MetricSpec { + qty := resource.NewQuantity(averageValue, resource.DecimalSI) + return v2.MetricSpec{ + External: &v2.ExternalMetricSource{ + Target: v2.MetricTarget{ + AverageValue: qty, + }, + Metric: v2.MetricIdentifier{ + Name: metricName, + }, + }, + } +} diff --git a/pkg/scaling/scaledjob/metrics.go b/pkg/scaling/scaledjob/metrics.go index 8d61fbb6d42..4164cee4868 100644 --- a/pkg/scaling/scaledjob/metrics.go +++ b/pkg/scaling/scaledjob/metrics.go @@ -4,7 +4,6 @@ import ( "math" v2 "k8s.io/api/autoscaling/v2" - "k8s.io/apimachinery/pkg/api/resource" "k8s.io/metrics/pkg/apis/external_metrics" ) @@ -45,21 +44,6 @@ func CalculateQueueLengthAndMaxValue(metrics []external_metrics.ExternalMetricVa return queueLength, maxValue, targetAverageValue } -// CreateMetricSpec creates MetricSpec for given metric name and target value. -func CreateMetricSpec(averageValue int64, metricName string) v2.MetricSpec { - qty := resource.NewQuantity(averageValue, resource.DecimalSI) - return v2.MetricSpec{ - External: &v2.ExternalMetricSource{ - Target: v2.MetricTarget{ - AverageValue: qty, - }, - Metric: v2.MetricIdentifier{ - Name: metricName, - }, - }, - } -} - type ScalerMetrics struct { QueueLength float64 MaxValue float64 diff --git a/pkg/scaling/scaledjob/metrics_test.go b/pkg/scaling/scaledjob/metrics_test.go index 9934fc3eeaa..634d96f0a36 100644 --- a/pkg/scaling/scaledjob/metrics_test.go +++ b/pkg/scaling/scaledjob/metrics_test.go @@ -5,6 +5,7 @@ import ( "github.com/stretchr/testify/assert" v2 "k8s.io/api/autoscaling/v2" + "k8s.io/apimachinery/pkg/api/resource" ) func TestTargetAverageValue(t *testing.T) { @@ -15,26 +16,41 @@ func TestTargetAverageValue(t *testing.T) { assert.Equal(t, float64(0), targetAverageValue) // 1 1 specs = []v2.MetricSpec{ - CreateMetricSpec(1, metricName), - CreateMetricSpec(1, metricName), + createMetricSpec(1, metricName), + createMetricSpec(1, metricName), } targetAverageValue = getTargetAverageValue(specs) assert.Equal(t, float64(1), targetAverageValue) // 5 5 3 -> 4.333333333333333 specs = []v2.MetricSpec{ - CreateMetricSpec(5, metricName), - CreateMetricSpec(5, metricName), - CreateMetricSpec(3, metricName), + createMetricSpec(5, metricName), + createMetricSpec(5, metricName), + createMetricSpec(3, metricName), } targetAverageValue = getTargetAverageValue(specs) assert.Equal(t, 4.333333333333333, targetAverageValue) // 5 5 4 -> 4.666666666666667 specs = []v2.MetricSpec{ - CreateMetricSpec(5, metricName), - CreateMetricSpec(5, metricName), - CreateMetricSpec(4, metricName), + createMetricSpec(5, metricName), + createMetricSpec(5, metricName), + createMetricSpec(4, metricName), } targetAverageValue = getTargetAverageValue(specs) assert.Equal(t, 4.666666666666667, targetAverageValue) } + +// createMetricSpec creates MetricSpec for given metric name and target value. 
+func createMetricSpec(averageValue int64, metricName string) v2.MetricSpec { + qty := resource.NewQuantity(averageValue, resource.DecimalSI) + return v2.MetricSpec{ + External: &v2.ExternalMetricSource{ + Target: v2.MetricTarget{ + AverageValue: qty, + }, + Metric: v2.MetricIdentifier{ + Name: metricName, + }, + }, + } +} From f9344e9c4a17562882c051311c7f870745efa0aa Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Wed, 12 Jul 2023 23:17:33 +0900 Subject: [PATCH 20/21] Use different metric name to pass static analysis Signed-off-by: Yoon Park --- pkg/scaling/scaledjob/metrics_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/scaling/scaledjob/metrics_test.go b/pkg/scaling/scaledjob/metrics_test.go index 634d96f0a36..502bde885e5 100644 --- a/pkg/scaling/scaledjob/metrics_test.go +++ b/pkg/scaling/scaledjob/metrics_test.go @@ -19,6 +19,8 @@ func TestTargetAverageValue(t *testing.T) { createMetricSpec(1, metricName), createMetricSpec(1, metricName), } + + metricName = "s1-messageCount" targetAverageValue = getTargetAverageValue(specs) assert.Equal(t, float64(1), targetAverageValue) // 5 5 3 -> 4.333333333333333 @@ -27,6 +29,8 @@ func TestTargetAverageValue(t *testing.T) { createMetricSpec(5, metricName), createMetricSpec(3, metricName), } + + metricName = "s2-messageCount" targetAverageValue = getTargetAverageValue(specs) assert.Equal(t, 4.333333333333333, targetAverageValue) @@ -36,6 +40,8 @@ func TestTargetAverageValue(t *testing.T) { createMetricSpec(5, metricName), createMetricSpec(4, metricName), } + + metricName = "s3-messageCount" targetAverageValue = getTargetAverageValue(specs) assert.Equal(t, 4.666666666666667, targetAverageValue) } From 4259185b0178b852af8d643ceb816ec12874b0d8 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Wed, 12 Jul 2023 23:36:34 +0900 Subject: [PATCH 21/21] Delete ineffectual assignment Signed-off-by: Yoon Park --- pkg/scaling/scaledjob/metrics_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/scaling/scaledjob/metrics_test.go b/pkg/scaling/scaledjob/metrics_test.go index 502bde885e5..de8d07ffca0 100644 --- a/pkg/scaling/scaledjob/metrics_test.go +++ b/pkg/scaling/scaledjob/metrics_test.go @@ -41,7 +41,6 @@ func TestTargetAverageValue(t *testing.T) { createMetricSpec(4, metricName), } - metricName = "s3-messageCount" targetAverageValue = getTargetAverageValue(specs) assert.Equal(t, 4.666666666666667, targetAverageValue) }
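
The series above ends with the ScaledJob activity math (queue length, max value, and the MultipleScalersCalculation strategies) living in the new pkg/scaling/scaledjob package, reachable without a scale handler. As a rough illustration of how the exported helpers could be driven on their own, here is a minimal standalone sketch; the metric name, the numeric values, the replica counts, and the choice of the "max" strategy are made-up inputs for illustration, not anything taken from the patches themselves.

package main

import (
	"fmt"

	v2 "k8s.io/api/autoscaling/v2"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/metrics/pkg/apis/external_metrics"

	"github.com/kedacore/keda/v2/pkg/scaling/scaledjob"
)

func main() {
	// One external metric spec with targetAverageValue=10, shaped like what a
	// trigger's GetMetricSpecForScaling would report (illustrative values only).
	metricName := "s0-queueLength"
	specs := []v2.MetricSpec{{
		External: &v2.ExternalMetricSource{
			Target: v2.MetricTarget{
				AverageValue: resource.NewQuantity(10, resource.DecimalSI),
			},
			Metric: v2.MetricIdentifier{Name: metricName},
		},
	}}

	// A matching measured value of 30, shaped like a GetMetricsAndActivity result.
	metrics := []external_metrics.ExternalMetricValue{{
		MetricName: metricName,
		Value:      *resource.NewQuantity(30, resource.DecimalSI),
	}}

	// queueLength=30, targetAverageValue=10, so maxValue=min(30/10, maxReplicaCount=5)=3.
	queueLength, maxValue, target := scaledjob.CalculateQueueLengthAndMaxValue(metrics, specs, 5)

	// Feed the per-scaler result into the strategy aggregation; an unset
	// MultipleScalersCalculation falls through to the "max" branch.
	scalersMetrics := []scaledjob.ScalerMetrics{{
		QueueLength: queueLength,
		MaxValue:    maxValue,
		IsActive:    true,
	}}
	isActive, scaleTo, maxScale, _ := scaledjob.IsScaledJobActive(scalersMetrics, "max", 0, 5)

	fmt.Println(isActive, scaleTo, maxScale, target) // true 30 3 10
}

Because the strategy switch now takes plain ScalerMetrics values, the "min", "avg", "sum", and "max" branches can be exercised with table-driven cases like the snippet above, without the gomock scalers that scale_handler_test.go still needs for the handler-level tests.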