diff --git a/CHANGELOG.md b/CHANGELOG.md index 557774ab611..1fe5c0f9375 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -74,7 +74,7 @@ New deprecation(s): ### Other -- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX)) +- **General**: Refactor ScaledJob related methods to be located at scale_handler ([#4781](https://github.com/kedacore/keda/issues/4781)) ## v2.11.1 diff --git a/apis/keda/v1alpha1/identifier_test.go b/apis/keda/v1alpha1/identifier_test.go index 251829c190a..312f88db914 100644 --- a/apis/keda/v1alpha1/identifier_test.go +++ b/apis/keda/v1alpha1/identifier_test.go @@ -14,39 +14,37 @@ type testData struct { soKind string } -var tests = []testData{ - { - name: "all lowercase", - expectedIdentifier: "scaledobject.namespace.name", - soName: "name", - soNamespace: "namespace", - soKind: "scaledobject", - }, - { - name: "all uppercase", - expectedIdentifier: "scaledobject.namespace.name", - soName: "NAME", - soNamespace: "NAMESPACE", - soKind: "SCALEDOBJECT", - }, - { - name: "camel case", - expectedIdentifier: "scaledobject.namespace.name", - soName: "name", - soNamespace: "namespace", - soKind: "scaledobject", - }, - { - name: "missing namespace", - expectedIdentifier: "scaledobject..name", - soName: "name", - soKind: "scaledobject", - }, -} - func TestGeneratedIdentifierForScaledObject(t *testing.T) { + tests := []testData{ + { + name: "all lowercase", + expectedIdentifier: "scaledobject.namespace.name", + soName: "name", + soNamespace: "namespace", + soKind: "scaledobject", + }, + { + name: "all uppercase", + expectedIdentifier: "scaledobject.namespace.name", + soName: "NAME", + soNamespace: "NAMESPACE", + soKind: "SCALEDOBJECT", + }, + { + name: "camel case", + expectedIdentifier: "scaledobject.namespace.name", + soName: "name", + soNamespace: "namespace", + soKind: "scaledobject", + }, + { + name: "missing namespace", + expectedIdentifier: "scaledobject..name", + soName: "name", + soKind: "scaledobject", + }, + } for _, test := range tests { - test := test t.Run(test.name, func(t *testing.T) { expectedIdentifier := test.expectedIdentifier genericIdentifier := GenerateIdentifier(test.soKind, test.soNamespace, test.soName) @@ -79,3 +77,67 @@ func TestGeneratedIdentifierForScaledObject(t *testing.T) { }) } } + +func TestGeneratedIdentifierForScaledJob(t *testing.T) { + tests := []testData{ + { + name: "all lowercase", + expectedIdentifier: "scaledjob.namespace.name", + soName: "name", + soNamespace: "namespace", + soKind: "scaledjob", + }, + { + name: "all uppercase", + expectedIdentifier: "scaledjob.namespace.name", + soName: "NAME", + soNamespace: "NAMESPACE", + soKind: "SCALEDJOB", + }, + { + name: "camel case", + expectedIdentifier: "scaledjob.namespace.name", + soName: "name", + soNamespace: "namespace", + soKind: "scaledjob", + }, + { + name: "missing namespace", + expectedIdentifier: "scaledjob..name", + soName: "name", + soKind: "scaledjob", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + expectedIdentifier := test.expectedIdentifier + genericIdentifier := GenerateIdentifier(test.soKind, test.soNamespace, test.soName) + + scaledJob := &ScaledJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: test.soName, + Namespace: test.soNamespace, + }, + } + scaledJobIdentifier := scaledJob.GenerateIdentifier() + + withTriggers, err := AsDuckWithTriggers(scaledJob) + if err != nil { + t.Errorf("got error while converting to WithTriggers object: %s", err) + } + withTriggersIdentifier := withTriggers.GenerateIdentifier() + + if expectedIdentifier != 
genericIdentifier { + t.Errorf("genericIdentifier=%q doesn't equal the expectedIdentifier=%q", genericIdentifier, expectedIdentifier) + } + + if expectedIdentifier != scaledJobIdentifier { + t.Errorf("scaledJobIdentifier=%q doesn't equal the expectedIdentifier=%q", scaledJobIdentifier, expectedIdentifier) + } + + if expectedIdentifier != withTriggersIdentifier { + t.Errorf("withTriggersIdentifier=%q doesn't equal the expectedIdentifier=%q", withTriggersIdentifier, expectedIdentifier) + } + }) + } +} diff --git a/apis/keda/v1alpha1/scaledjob_types.go b/apis/keda/v1alpha1/scaledjob_types.go index d6aa1fa4852..366f22a14cd 100644 --- a/apis/keda/v1alpha1/scaledjob_types.go +++ b/apis/keda/v1alpha1/scaledjob_types.go @@ -142,3 +142,7 @@ func (s ScaledJob) MinReplicaCount() int64 { } return defaultScaledJobMinReplicaCount } + +func (s *ScaledJob) GenerateIdentifier() string { + return GenerateIdentifier("ScaledJob", s.Namespace, s.Name) +} diff --git a/apis/keda/v1alpha1/withtriggers_types.go b/apis/keda/v1alpha1/withtriggers_types.go index 5d280499498..5df47ff2b08 100644 --- a/apis/keda/v1alpha1/withtriggers_types.go +++ b/apis/keda/v1alpha1/withtriggers_types.go @@ -94,7 +94,7 @@ func (t *WithTriggers) GenerateIdentifier() string { return GenerateIdentifier(t.InternalKind, t.Namespace, t.Name) } -// AsDuckWithTriggers tries to generates WithTriggers object for input object +// AsDuckWithTriggers tries to generate WithTriggers object for input object // returns error if input object is unknown func AsDuckWithTriggers(scalableObject interface{}) (*WithTriggers, error) { switch obj := scalableObject.(type) { diff --git a/pkg/scaling/cache/scalers_cache.go b/pkg/scaling/cache/scalers_cache.go index de14500b9cf..da3a0d05100 100644 --- a/pkg/scaling/cache/scalers_cache.go +++ b/pkg/scaling/cache/scalers_cache.go @@ -19,17 +19,14 @@ package cache import ( "context" "fmt" - "math" "time" v2 "k8s.io/api/autoscaling/v2" - corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/record" "k8s.io/metrics/pkg/apis/external_metrics" logf "sigs.k8s.io/controller-runtime/pkg/log" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" - "github.com/kedacore/keda/v2/pkg/eventreason" "github.com/kedacore/keda/v2/pkg/scalers" ) @@ -141,68 +138,6 @@ func (c *ScalersCache) GetMetricsAndActivityForScaler(ctx context.Context, index return metric, activity, time.Since(startTime).Milliseconds(), err } -// TODO needs refactor - move ScaledJob related methods to scale_handler, the similar way ScaledObject methods are -// refactor logic -func (c *ScalersCache) IsScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) { - var queueLength float64 - var maxValue float64 - isActive := false - - logger := logf.Log.WithName("scalemetrics") - scalersMetrics := c.getScaledJobMetrics(ctx, scaledJob) - switch scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation { - case "min": - for _, metrics := range scalersMetrics { - if (queueLength == 0 || metrics.queueLength < queueLength) && metrics.isActive { - queueLength = metrics.queueLength - maxValue = metrics.maxValue - isActive = metrics.isActive - } - } - case "avg": - queueLengthSum := float64(0) - maxValueSum := float64(0) - length := 0 - for _, metrics := range scalersMetrics { - if metrics.isActive { - queueLengthSum += metrics.queueLength - maxValueSum += metrics.maxValue - isActive = metrics.isActive - length++ - } - } - if length != 0 { - queueLength = queueLengthSum / float64(length) - maxValue = maxValueSum / float64(length) - } - 
case "sum": - for _, metrics := range scalersMetrics { - if metrics.isActive { - queueLength += metrics.queueLength - maxValue += metrics.maxValue - isActive = metrics.isActive - } - } - default: // max - for _, metrics := range scalersMetrics { - if metrics.queueLength > queueLength && metrics.isActive { - queueLength = metrics.queueLength - maxValue = metrics.maxValue - isActive = metrics.isActive - } - } - } - - if scaledJob.MinReplicaCount() > 0 { - isActive = true - } - - maxValue = min(float64(scaledJob.MaxReplicaCount()), maxValue) - logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation) - - return isActive, ceilToInt64(queueLength), ceilToInt64(maxValue) -} - func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scaler, error) { if id < 0 || id >= len(c.Scalers) { return nil, fmt.Errorf("scaler with id %d not found, len = %d, cache has been probably already invalidated", id, len(c.Scalers)) @@ -226,105 +161,3 @@ func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scale return ns, nil } - -type scalerMetrics struct { - queueLength float64 - maxValue float64 - isActive bool -} - -func (c *ScalersCache) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scalerMetrics { - // TODO this loop should be probably done similar way the ScaledObject loop is done - var scalersMetrics []scalerMetrics - for i, s := range c.Scalers { - var queueLength float64 - var targetAverageValue float64 - isActive := false - maxValue := float64(0) - scalerType := fmt.Sprintf("%T:", s) - - scalerLogger := log.WithValues("ScaledJob", scaledJob.Name, "Scaler", scalerType) - - metricSpecs := s.Scaler.GetMetricSpecForScaling(ctx) - - // skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata) - // or skip cpu/memory resource scaler - if len(metricSpecs) < 1 || metricSpecs[0].External == nil { - continue - } - - // TODO here we should probably loop through all metrics in a Scaler - // as it is done for ScaledObject - metrics, isTriggerActive, err := s.Scaler.GetMetricsAndActivity(ctx, metricSpecs[0].External.Metric.Name) - if err != nil { - var ns scalers.Scaler - ns, err = c.refreshScaler(ctx, i) - if err == nil { - metrics, isTriggerActive, err = ns.GetMetricsAndActivity(ctx, metricSpecs[0].External.Metric.Name) - } - } - - if err != nil { - scalerLogger.V(1).Info("Error getting scaler metrics and activity, but continue", "error", err) - c.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) - continue - } - - targetAverageValue = getTargetAverageValue(metricSpecs) - - var metricValue float64 - for _, m := range metrics { - if m.MetricName == metricSpecs[0].External.Metric.Name { - metricValue = m.Value.AsApproximateFloat64() - queueLength += metricValue - } - } - scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, metricSpecs[0].External.Metric.Name, queueLength, "targetAverageValue", targetAverageValue) - - if isTriggerActive { - isActive = true - } - - if targetAverageValue != 0 { - averageLength := queueLength / targetAverageValue - maxValue = min(float64(scaledJob.MaxReplicaCount()), averageLength) - } - scalersMetrics = append(scalersMetrics, scalerMetrics{ - queueLength: queueLength, - maxValue: maxValue, - isActive: isActive, - }) - } - return 
scalersMetrics -} - -func getTargetAverageValue(metricSpecs []v2.MetricSpec) float64 { - var targetAverageValue float64 - var metricValue float64 - for _, metric := range metricSpecs { - if metric.External.Target.AverageValue == nil { - metricValue = 0 - } else { - metricValue = metric.External.Target.AverageValue.AsApproximateFloat64() - } - - targetAverageValue += metricValue - } - count := float64(len(metricSpecs)) - if count != 0 { - return targetAverageValue / count - } - return 0 -} - -func ceilToInt64(x float64) int64 { - return int64(math.Ceil(x)) -} - -// Min function for float64 -func min(x, y float64) float64 { - if x > y { - return y - } - return x -} diff --git a/pkg/scaling/cache/scalers_cache_test.go b/pkg/scaling/cache/scalers_cache_test.go deleted file mode 100644 index a8559288d7f..00000000000 --- a/pkg/scaling/cache/scalers_cache_test.go +++ /dev/null @@ -1,280 +0,0 @@ -package cache - -import ( - "context" - "fmt" - "testing" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" - v2 "k8s.io/api/autoscaling/v2" - "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/client-go/tools/record" - "k8s.io/metrics/pkg/apis/external_metrics" - - kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" - mock_scalers "github.com/kedacore/keda/v2/pkg/mock/mock_scaler" - "github.com/kedacore/keda/v2/pkg/scalers" -) - -func TestTargetAverageValue(t *testing.T) { - // count = 0 - specs := []v2.MetricSpec{} - metricName := "s0-messageCount" - targetAverageValue := getTargetAverageValue(specs) - assert.Equal(t, float64(0), targetAverageValue) - // 1 1 - specs = []v2.MetricSpec{ - createMetricSpec(1, metricName), - createMetricSpec(1, metricName), - } - targetAverageValue = getTargetAverageValue(specs) - assert.Equal(t, float64(1), targetAverageValue) - // 5 5 3 -> 4.333333333333333 - specs = []v2.MetricSpec{ - createMetricSpec(5, metricName), - createMetricSpec(5, metricName), - createMetricSpec(3, metricName), - } - targetAverageValue = getTargetAverageValue(specs) - assert.Equal(t, 4.333333333333333, targetAverageValue) - - // 5 5 4 -> 4.666666666666667 - specs = []v2.MetricSpec{ - createMetricSpec(5, metricName), - createMetricSpec(5, metricName), - createMetricSpec(4, metricName), - } - targetAverageValue = getTargetAverageValue(specs) - assert.Equal(t, 4.666666666666667, targetAverageValue) -} - -func createMetricSpec(averageValue int64, metricName string) v2.MetricSpec { - qty := resource.NewQuantity(averageValue, resource.DecimalSI) - return v2.MetricSpec{ - External: &v2.ExternalMetricSource{ - Target: v2.MetricTarget{ - AverageValue: qty, - }, - Metric: v2.MetricIdentifier{ - Name: metricName, - }, - }, - } -} - -func TestIsScaledJobActive(t *testing.T) { - metricName := "s0-queueLength" - ctrl := gomock.NewController(t) - recorder := record.NewFakeRecorder(1) - // Keep the current behavior - // Assme 1 trigger only - scaledJobSingle := createScaledJob(0, 100, "") // testing default = max - scalerSingle := []ScalerBuilder{{ - Scaler: createScaler(ctrl, int64(20), int64(2), true, metricName), - Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { - return createScaler(ctrl, int64(20), int64(2), true, metricName), &scalers.ScalerConfig{}, nil - }, - }} - - cache := ScalersCache{ - Scalers: scalerSingle, - Recorder: recorder, - } - - isActive, queueLength, maxValue := cache.IsScaledJobActive(context.TODO(), scaledJobSingle) - assert.Equal(t, true, isActive) - assert.Equal(t, int64(20), queueLength) - assert.Equal(t, int64(10), maxValue) - 
cache.Close(context.Background()) - - // Non-Active trigger only - scalerSingle = []ScalerBuilder{{ - Scaler: createScaler(ctrl, int64(0), int64(2), false, metricName), - Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { - return createScaler(ctrl, int64(0), int64(2), false, metricName), &scalers.ScalerConfig{}, nil - }, - }} - - cache = ScalersCache{ - Scalers: scalerSingle, - Recorder: recorder, - } - - isActive, queueLength, maxValue = cache.IsScaledJobActive(context.TODO(), scaledJobSingle) - assert.Equal(t, false, isActive) - assert.Equal(t, int64(0), queueLength) - assert.Equal(t, int64(0), maxValue) - cache.Close(context.Background()) - - // Test the valiation - scalerTestDatam := []scalerTestData{ - newScalerTestData("s0-queueLength", 100, "max", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 20, 20), - newScalerTestData("queueLength", 100, "min", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 5, 2), - newScalerTestData("messageCount", 100, "avg", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 12, 9), - newScalerTestData("s3-messageCount", 100, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 27), - newScalerTestData("s10-messageCount", 25, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 25), - } - - for index, scalerTestData := range scalerTestDatam { - scaledJob := createScaledJob(scalerTestData.MinReplicaCount, scalerTestData.MaxReplicaCount, scalerTestData.MultipleScalersCalculation) - scalersToTest := []ScalerBuilder{{ - Scaler: createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive, scalerTestData.MetricName), - Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { - return createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil - }, - }, { - Scaler: createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive, scalerTestData.MetricName), - Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { - return createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil - }, - }, { - Scaler: createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive, scalerTestData.MetricName), - Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { - return createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil - }, - }, { - Scaler: createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive, scalerTestData.MetricName), - Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { - return createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil - }, - }} - - cache = ScalersCache{ - Scalers: scalersToTest, - Recorder: recorder, - } - fmt.Printf("index: %d", index) - isActive, queueLength, maxValue = cache.IsScaledJobActive(context.TODO(), scaledJob) - // assert.Equal(t, 5, index) - assert.Equal(t, scalerTestData.ResultIsActive, isActive) - assert.Equal(t, 
scalerTestData.ResultQueueLength, queueLength) - assert.Equal(t, scalerTestData.ResultMaxValue, maxValue) - cache.Close(context.Background()) - } -} - -func TestIsScaledJobActiveIfQueueEmptyButMinReplicaCountGreaterZero(t *testing.T) { - metricName := "s0-queueLength" - ctrl := gomock.NewController(t) - recorder := record.NewFakeRecorder(1) - // Keep the current behavior - // Assme 1 trigger only - scaledJobSingle := createScaledJob(1, 100, "") // testing default = max - scalerSingle := []ScalerBuilder{{ - Scaler: createScaler(ctrl, int64(0), int64(1), true, metricName), - Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { - return createScaler(ctrl, int64(0), int64(1), true, metricName), &scalers.ScalerConfig{}, nil - }, - }} - - cache := ScalersCache{ - Scalers: scalerSingle, - Recorder: recorder, - } - - isActive, queueLength, maxValue := cache.IsScaledJobActive(context.TODO(), scaledJobSingle) - assert.Equal(t, true, isActive) - assert.Equal(t, int64(0), queueLength) - assert.Equal(t, int64(0), maxValue) - cache.Close(context.Background()) -} - -func newScalerTestData( - metricName string, - maxReplicaCount int, - multipleScalersCalculation string, - scaler1QueueLength, //nolint:golint,unparam - scaler1AverageValue int, //nolint:golint,unparam - scaler1IsActive bool, //nolint:golint,unparam - scaler2QueueLength, //nolint:golint,unparam - scaler2AverageValue int, //nolint:golint,unparam - scaler2IsActive bool, //nolint:golint,unparam - scaler3QueueLength, //nolint:golint,unparam - scaler3AverageValue int, //nolint:golint,unparam - scaler3IsActive bool, //nolint:golint,unparam - scaler4QueueLength, //nolint:golint,unparam - scaler4AverageValue int, //nolint:golint,unparam - scaler4IsActive bool, //nolint:golint,unparam - resultIsActive bool, //nolint:golint,unparam - resultQueueLength, - resultMaxLength int) scalerTestData { - return scalerTestData{ - MetricName: metricName, - MaxReplicaCount: int32(maxReplicaCount), - MultipleScalersCalculation: multipleScalersCalculation, - Scaler1QueueLength: int64(scaler1QueueLength), - Scaler1AverageValue: int64(scaler1AverageValue), - Scaler1IsActive: scaler1IsActive, - Scaler2QueueLength: int64(scaler2QueueLength), - Scaler2AverageValue: int64(scaler2AverageValue), - Scaler2IsActive: scaler2IsActive, - Scaler3QueueLength: int64(scaler3QueueLength), - Scaler3AverageValue: int64(scaler3AverageValue), - Scaler3IsActive: scaler3IsActive, - Scaler4QueueLength: int64(scaler4QueueLength), - Scaler4AverageValue: int64(scaler4AverageValue), - Scaler4IsActive: scaler4IsActive, - ResultIsActive: resultIsActive, - ResultQueueLength: int64(resultQueueLength), - ResultMaxValue: int64(resultMaxLength), - } -} - -type scalerTestData struct { - MetricName string - MaxReplicaCount int32 - MultipleScalersCalculation string - Scaler1QueueLength int64 - Scaler1AverageValue int64 - Scaler1IsActive bool - Scaler2QueueLength int64 - Scaler2AverageValue int64 - Scaler2IsActive bool - Scaler3QueueLength int64 - Scaler3AverageValue int64 - Scaler3IsActive bool - Scaler4QueueLength int64 - Scaler4AverageValue int64 - Scaler4IsActive bool - ResultIsActive bool - ResultQueueLength int64 - ResultMaxValue int64 - MinReplicaCount int32 -} - -func createScaledJob(minReplicaCount int32, maxReplicaCount int32, multipleScalersCalculation string) *kedav1alpha1.ScaledJob { - if multipleScalersCalculation != "" { - return &kedav1alpha1.ScaledJob{ - Spec: kedav1alpha1.ScaledJobSpec{ - MinReplicaCount: &minReplicaCount, - MaxReplicaCount: &maxReplicaCount, - ScalingStrategy: 
kedav1alpha1.ScalingStrategy{ - MultipleScalersCalculation: multipleScalersCalculation, - }, - }, - } - } - return &kedav1alpha1.ScaledJob{ - Spec: kedav1alpha1.ScaledJobSpec{ - MinReplicaCount: &minReplicaCount, - MaxReplicaCount: &maxReplicaCount, - }, - } -} - -func createScaler(ctrl *gomock.Controller, queueLength int64, averageValue int64, isActive bool, metricName string) *mock_scalers.MockScaler { - scaler := mock_scalers.NewMockScaler(ctrl) - metricsSpecs := []v2.MetricSpec{createMetricSpec(averageValue, metricName)} - - metrics := []external_metrics.ExternalMetricValue{ - { - MetricName: metricName, - Value: *resource.NewQuantity(queueLength, resource.DecimalSI), - }, - } - scaler.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs) - scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Any()).Return(metrics, isActive, nil) - scaler.EXPECT().Close(gomock.Any()) - return scaler -} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 192cc8ff2a2..649d19fd1a7 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -42,6 +42,7 @@ import ( "github.com/kedacore/keda/v2/pkg/scaling/cache/metricscache" "github.com/kedacore/keda/v2/pkg/scaling/executor" "github.com/kedacore/keda/v2/pkg/scaling/resolver" + "github.com/kedacore/keda/v2/pkg/scaling/scaledjob" ) var log = logf.Log.WithName("scale_handler") @@ -246,19 +247,13 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac h.scaledObjectsMetricCache.StoreRecords(obj.GenerateIdentifier(), metricsRecords) } case *kedav1alpha1.ScaledJob: - cache, err := h.GetScalersCache(ctx, scalableObject) - if err != nil { - log.Error(err, "error getting scalers cache", "scaledJob.Namespace", obj.Namespace, "scaledJob.Name", obj.Name) - return - } - - err = h.client.Get(ctx, types.NamespacedName{Name: obj.Name, Namespace: obj.Namespace}, obj) + err := h.client.Get(ctx, types.NamespacedName{Name: obj.Name, Namespace: obj.Namespace}, obj) if err != nil { log.Error(err, "error getting scaledJob", "scaledJob.Namespace", obj.Namespace, "scaledJob.Name", obj.Name) return } - isActive, scaleTo, maxScale := cache.IsScaledJobActive(ctx, obj) + isActive, scaleTo, maxScale := h.isScaledJobActive(ctx, obj) h.scaleExecutor.RequestJobScale(ctx, obj, isActive, scaleTo, maxScale) } } @@ -404,7 +399,7 @@ func (h *scaleHandler) ClearScalersCache(ctx context.Context, scalableObject int /// ---------- ScaledObject related methods --------- /// /// --------------------------------------------------------------------------- /// -// GetScaledObjectMetrics returns metrics for specified metric name for a ScaledObject identified by it's name and namespace. +// GetScaledObjectMetrics returns metrics for specified metric name for a ScaledObject identified by its name and namespace. // It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler. 
func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricName string) (*external_metrics.ExternalMetricValueList, error) { logger := log.WithValues("scaledObject.Namespace", scaledObjectNamespace, "scaledObject.Name", scaledObjectName) @@ -514,9 +509,9 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN // getScaledObjectState returns whether the input ScaledObject: // is active as the first return value, -// the second return value indicates whether there was any error during quering scalers, -// the third return value is a map of metrics record - a metric value for each scaler and it's metric -// the fourth return value contains error if is not able access scalers cache +// the second return value indicates whether there was any error during querying scalers, +// the third return value is a map of metrics record - a metric value for each scaler and its metric +// the fourth return value contains error if is not able to access scalers cache func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) (bool, bool, map[string]metricscache.MetricsRecord, error) { logger := log.WithValues("scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name) @@ -620,3 +615,68 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k return isScaledObjectActive, isScalerError, metricsRecord, nil } + +// / --------------------------------------------------------------------------- /// +// / ---------- ScaledJob related methods --------- /// +// / --------------------------------------------------------------------------- /// + +// getScaledJobMetrics returns metrics for specified metric name for a ScaledJob identified by its name and namespace. +// It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler. 
+func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scaledjob.ScalerMetrics { + cache, err := h.GetScalersCache(ctx, scaledJob) + if err != nil { + log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name) + return nil + } + var scalersMetrics []scaledjob.ScalerMetrics + scalers, _ := cache.GetScalers() + for i, s := range scalers { + isActive := false + scalerType := fmt.Sprintf("%T:", s) + + scalerLogger := log.WithValues("ScaledJob", scaledJob.Name, "Scaler", scalerType) + + metricSpecs := s.GetMetricSpecForScaling(ctx) + + // skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata) + // or skip cpu/memory resource scaler + if len(metricSpecs) < 1 || metricSpecs[0].External == nil { + continue + } + + metrics, isTriggerActive, _, err := cache.GetMetricsAndActivityForScaler(ctx, i, metricSpecs[0].External.Metric.Name) + if err != nil { + scalerLogger.V(1).Info("Error getting scaler metrics and activity, but continue", "error", err) + cache.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) + continue + } + if isTriggerActive { + isActive = true + } + + queueLength, maxValue, targetAverageValue := scaledjob.CalculateQueueLengthAndMaxValue(metrics, metricSpecs, scaledJob.MaxReplicaCount()) + + scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, metricSpecs[0].External.Metric.Name, queueLength, "targetAverageValue", targetAverageValue) + + scalersMetrics = append(scalersMetrics, scaledjob.ScalerMetrics{ + QueueLength: queueLength, + MaxValue: maxValue, + IsActive: isActive, + }) + } + return scalersMetrics +} + +// isScaledJobActive returns whether the input ScaledJob: +// is active as the first return value, +// the second and the third return values indicate queueLength and maxValue for scale +func (h *scaleHandler) isScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) { + logger := logf.Log.WithName("scalemetrics") + + scalersMetrics := h.getScaledJobMetrics(ctx, scaledJob) + isActive, queueLength, maxValue, maxFloatValue := + scaledjob.IsScaledJobActive(scalersMetrics, scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation, scaledJob.MinReplicaCount(), scaledJob.MaxReplicaCount()) + + logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxFloatValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation) + return isActive, queueLength, maxValue +} diff --git a/pkg/scaling/scale_handler_test.go b/pkg/scaling/scale_handler_test.go index dc9b40fdf5d..b10a8dab183 100644 --- a/pkg/scaling/scale_handler_test.go +++ b/pkg/scaling/scale_handler_test.go @@ -19,6 +19,7 @@ package scaling import ( "context" "errors" + "fmt" "sync" "testing" "time" @@ -26,6 +27,8 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" v2 "k8s.io/api/autoscaling/v2" + batchv1 "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" @@ -357,6 +360,263 @@ func TestCheckScaledObjectFindFirstActiveNotIgnoreOthers(t *testing.T) { assert.Equal(t, true, isError) } +func TestIsScaledJobActive(t *testing.T) { + metricName := "s0-queueLength" + ctrl := gomock.NewController(t) + recorder := 
record.NewFakeRecorder(1) + // Keep the current behavior + // Assme 1 trigger only + scaledJobSingle := createScaledJob(1, 100, "") // testing default = max + scalerCache := cache.ScalersCache{ + Scalers: []cache.ScalerBuilder{{ + Scaler: createScaler(ctrl, int64(20), int64(2), true, metricName), + Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { + return createScaler(ctrl, int64(20), int64(2), true, metricName), &scalers.ScalerConfig{}, nil + }, + }}, + Recorder: recorder, + } + + caches := map[string]*cache.ScalersCache{} + caches[scaledJobSingle.GenerateIdentifier()] = &scalerCache + + sh := scaleHandler{ + scaleLoopContexts: &sync.Map{}, + globalHTTPTimeout: time.Duration(1000), + recorder: recorder, + scalerCaches: caches, + scalerCachesLock: &sync.RWMutex{}, + scaledObjectsMetricCache: metricscache.NewMetricsCache(), + } + isActive, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle) + assert.Equal(t, true, isActive) + assert.Equal(t, int64(20), queueLength) + assert.Equal(t, int64(10), maxValue) + scalerCache.Close(context.Background()) + + // Test the valiation + scalerTestDatam := []scalerTestData{ + newScalerTestData("s0-queueLength", 100, "max", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 20, 20), + newScalerTestData("queueLength", 100, "min", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 5, 2), + newScalerTestData("messageCount", 100, "avg", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 12, 9), + newScalerTestData("s3-messageCount", 100, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 27), + newScalerTestData("s10-messageCount", 25, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 25), + } + + for index, scalerTestData := range scalerTestDatam { + scaledJob := createScaledJob(scalerTestData.MinReplicaCount, scalerTestData.MaxReplicaCount, scalerTestData.MultipleScalersCalculation) + scalersToTest := []cache.ScalerBuilder{{ + Scaler: createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive, scalerTestData.MetricName), + Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { + return createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil + }, + }, { + Scaler: createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive, scalerTestData.MetricName), + Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { + return createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil + }, + }, { + Scaler: createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive, scalerTestData.MetricName), + Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { + return createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil + }, + }, { + Scaler: createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive, scalerTestData.MetricName), + Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { + return createScaler(ctrl, scalerTestData.Scaler4QueueLength, 
scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil + }, + }} + + scalerCache = cache.ScalersCache{ + Scalers: scalersToTest, + Recorder: recorder, + } + + caches = map[string]*cache.ScalersCache{} + caches[scaledJobSingle.GenerateIdentifier()] = &scalerCache + + sh = scaleHandler{ + scaleLoopContexts: &sync.Map{}, + globalHTTPTimeout: time.Duration(1000), + recorder: recorder, + scalerCaches: caches, + scalerCachesLock: &sync.RWMutex{}, + scaledObjectsMetricCache: metricscache.NewMetricsCache(), + } + fmt.Printf("index: %d", index) + isActive, queueLength, maxValue = sh.isScaledJobActive(context.TODO(), scaledJob) + // assert.Equal(t, 5, index) + assert.Equal(t, scalerTestData.ResultIsActive, isActive) + assert.Equal(t, scalerTestData.ResultQueueLength, queueLength) + assert.Equal(t, scalerTestData.ResultMaxValue, maxValue) + scalerCache.Close(context.Background()) + } +} + +func TestIsScaledJobActiveIfQueueEmptyButMinReplicaCountGreaterZero(t *testing.T) { + metricName := "s0-queueLength" + ctrl := gomock.NewController(t) + recorder := record.NewFakeRecorder(1) + // Keep the current behavior + // Assme 1 trigger only + scaledJobSingle := createScaledJob(1, 100, "") // testing default = max + scalerSingle := []cache.ScalerBuilder{{ + Scaler: createScaler(ctrl, int64(0), int64(1), true, metricName), + Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) { + return createScaler(ctrl, int64(0), int64(1), true, metricName), &scalers.ScalerConfig{}, nil + }, + }} + + scalerCache := cache.ScalersCache{ + Scalers: scalerSingle, + Recorder: recorder, + } + + caches := map[string]*cache.ScalersCache{} + caches[scaledJobSingle.GenerateIdentifier()] = &scalerCache + + sh := scaleHandler{ + scaleLoopContexts: &sync.Map{}, + globalHTTPTimeout: time.Duration(1000), + recorder: recorder, + scalerCaches: caches, + scalerCachesLock: &sync.RWMutex{}, + scaledObjectsMetricCache: metricscache.NewMetricsCache(), + } + + isActive, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle) + assert.Equal(t, true, isActive) + assert.Equal(t, int64(0), queueLength) + assert.Equal(t, int64(0), maxValue) + scalerCache.Close(context.Background()) +} + +func newScalerTestData( + metricName string, + maxReplicaCount int, + multipleScalersCalculation string, + scaler1QueueLength, //nolint:golint,unparam + scaler1AverageValue int, //nolint:golint,unparam + scaler1IsActive bool, //nolint:golint,unparam + scaler2QueueLength, //nolint:golint,unparam + scaler2AverageValue int, //nolint:golint,unparam + scaler2IsActive bool, //nolint:golint,unparam + scaler3QueueLength, //nolint:golint,unparam + scaler3AverageValue int, //nolint:golint,unparam + scaler3IsActive bool, //nolint:golint,unparam + scaler4QueueLength, //nolint:golint,unparam + scaler4AverageValue int, //nolint:golint,unparam + scaler4IsActive bool, //nolint:golint,unparam + resultIsActive bool, //nolint:golint,unparam + resultQueueLength, + resultMaxLength int) scalerTestData { + return scalerTestData{ + MetricName: metricName, + MaxReplicaCount: int32(maxReplicaCount), + MultipleScalersCalculation: multipleScalersCalculation, + Scaler1QueueLength: int64(scaler1QueueLength), + Scaler1AverageValue: int64(scaler1AverageValue), + Scaler1IsActive: scaler1IsActive, + Scaler2QueueLength: int64(scaler2QueueLength), + Scaler2AverageValue: int64(scaler2AverageValue), + Scaler2IsActive: scaler2IsActive, + Scaler3QueueLength: int64(scaler3QueueLength), + 
Scaler3AverageValue: int64(scaler3AverageValue), + Scaler3IsActive: scaler3IsActive, + Scaler4QueueLength: int64(scaler4QueueLength), + Scaler4AverageValue: int64(scaler4AverageValue), + Scaler4IsActive: scaler4IsActive, + ResultIsActive: resultIsActive, + ResultQueueLength: int64(resultQueueLength), + ResultMaxValue: int64(resultMaxLength), + } +} + +type scalerTestData struct { + MetricName string + MaxReplicaCount int32 + MultipleScalersCalculation string + Scaler1QueueLength int64 + Scaler1AverageValue int64 + Scaler1IsActive bool + Scaler2QueueLength int64 + Scaler2AverageValue int64 + Scaler2IsActive bool + Scaler3QueueLength int64 + Scaler3AverageValue int64 + Scaler3IsActive bool + Scaler4QueueLength int64 + Scaler4AverageValue int64 + Scaler4IsActive bool + ResultIsActive bool + ResultQueueLength int64 + ResultMaxValue int64 + MinReplicaCount int32 +} + +func createScaledJob(minReplicaCount int32, maxReplicaCount int32, multipleScalersCalculation string) *kedav1alpha1.ScaledJob { + if multipleScalersCalculation != "" { + return &kedav1alpha1.ScaledJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + Spec: kedav1alpha1.ScaledJobSpec{ + MinReplicaCount: &minReplicaCount, + MaxReplicaCount: &maxReplicaCount, + ScalingStrategy: kedav1alpha1.ScalingStrategy{ + MultipleScalersCalculation: multipleScalersCalculation, + }, + JobTargetRef: &batchv1.JobSpec{ + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + }, + EnvSourceContainerName: "test", + }, + } + } + return &kedav1alpha1.ScaledJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + Spec: kedav1alpha1.ScaledJobSpec{ + MinReplicaCount: &minReplicaCount, + MaxReplicaCount: &maxReplicaCount, + JobTargetRef: &batchv1.JobSpec{ + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + }, + EnvSourceContainerName: "test", + }, + } +} + +func createScaler(ctrl *gomock.Controller, queueLength int64, averageValue int64, isActive bool, metricName string) *mock_scalers.MockScaler { + scaler := mock_scalers.NewMockScaler(ctrl) + metricsSpecs := []v2.MetricSpec{createMetricSpec(averageValue, metricName)} + + metrics := []external_metrics.ExternalMetricValue{ + { + MetricName: metricName, + Value: *resource.NewQuantity(queueLength, resource.DecimalSI), + }, + } + scaler.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs) + scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Any()).Return(metrics, isActive, nil) + scaler.EXPECT().Close(gomock.Any()) + return scaler +} + +// createMetricSpec creates MetricSpec for given metric name and target value. func createMetricSpec(averageValue int64, metricName string) v2.MetricSpec { qty := resource.NewQuantity(averageValue, resource.DecimalSI) return v2.MetricSpec{ diff --git a/pkg/scaling/scaledjob/metrics.go b/pkg/scaling/scaledjob/metrics.go new file mode 100644 index 00000000000..4164cee4868 --- /dev/null +++ b/pkg/scaling/scaledjob/metrics.go @@ -0,0 +1,126 @@ +package scaledjob + +import ( + "math" + + v2 "k8s.io/api/autoscaling/v2" + "k8s.io/metrics/pkg/apis/external_metrics" +) + +// GetTargetAverageValue returns the average of all the metrics' average value. 
+func getTargetAverageValue(metricSpecs []v2.MetricSpec) float64 { + var totalAverageValue float64 + var metricValue float64 + for _, metric := range metricSpecs { + if metric.External.Target.AverageValue == nil { + metricValue = 0 + } else { + metricValue = metric.External.Target.AverageValue.AsApproximateFloat64() + } + + totalAverageValue += metricValue + } + count := float64(len(metricSpecs)) + if count != 0 { + return totalAverageValue / count + } + return 0 +} + +// CalculateQueueLengthAndMaxValue returns queueLength, maxValue, and targetAverageValue for the given metrics +func CalculateQueueLengthAndMaxValue(metrics []external_metrics.ExternalMetricValue, metricSpecs []v2.MetricSpec, maxReplicaCount int64) (queueLength, maxValue, targetAverageValue float64) { + var metricValue float64 + for _, m := range metrics { + if m.MetricName == metricSpecs[0].External.Metric.Name { + metricValue = m.Value.AsApproximateFloat64() + queueLength += metricValue + } + } + targetAverageValue = getTargetAverageValue(metricSpecs) + if targetAverageValue != 0 { + averageLength := queueLength / targetAverageValue + maxValue = getMaxValue(averageLength, maxReplicaCount) + } + return queueLength, maxValue, targetAverageValue +} + +type ScalerMetrics struct { + QueueLength float64 + MaxValue float64 + IsActive bool +} + +// IsScaledJobActive returns whether the input ScaledJob is active and queueLength and maxValue for scale +func IsScaledJobActive(scalersMetrics []ScalerMetrics, multipleScalersCalculation string, minReplicaCount, maxReplicaCount int64) (bool, int64, int64, float64) { + var queueLength float64 + var maxValue float64 + isActive := false + + switch multipleScalersCalculation { + case "min": + for _, metrics := range scalersMetrics { + if (queueLength == 0 || metrics.QueueLength < queueLength) && metrics.IsActive { + queueLength = metrics.QueueLength + maxValue = metrics.MaxValue + isActive = metrics.IsActive + } + } + case "avg": + queueLengthSum := float64(0) + maxValueSum := float64(0) + length := 0 + for _, metrics := range scalersMetrics { + if metrics.IsActive { + queueLengthSum += metrics.QueueLength + maxValueSum += metrics.MaxValue + isActive = metrics.IsActive + length++ + } + } + if length != 0 { + queueLength = queueLengthSum / float64(length) + maxValue = maxValueSum / float64(length) + } + case "sum": + for _, metrics := range scalersMetrics { + if metrics.IsActive { + queueLength += metrics.QueueLength + maxValue += metrics.MaxValue + isActive = metrics.IsActive + } + } + default: // max + for _, metrics := range scalersMetrics { + if metrics.QueueLength > queueLength && metrics.IsActive { + queueLength = metrics.QueueLength + maxValue = metrics.MaxValue + isActive = metrics.IsActive + } + } + } + + if minReplicaCount > 0 { + isActive = true + } + + maxValue = getMaxValue(maxValue, maxReplicaCount) + return isActive, ceilToInt64(queueLength), ceilToInt64(maxValue), maxValue +} + +// ceilToInt64 returns the int64 ceil value for the float64 input +func ceilToInt64(x float64) int64 { + return int64(math.Ceil(x)) +} + +// min returns the minimum for input float64 values +func min(x, y float64) float64 { + if x > y { + return y + } + return x +} + +// getMaxValue returns maxValue, unless it is exceeding the MaxReplicaCount. 
+func getMaxValue(maxValue float64, maxReplicaCount int64) float64 { + return min(maxValue, float64(maxReplicaCount)) +} diff --git a/pkg/scaling/scaledjob/metrics_test.go b/pkg/scaling/scaledjob/metrics_test.go new file mode 100644 index 00000000000..de8d07ffca0 --- /dev/null +++ b/pkg/scaling/scaledjob/metrics_test.go @@ -0,0 +1,61 @@ +package scaledjob + +import ( + "testing" + + "github.com/stretchr/testify/assert" + v2 "k8s.io/api/autoscaling/v2" + "k8s.io/apimachinery/pkg/api/resource" +) + +func TestTargetAverageValue(t *testing.T) { + // count = 0 + specs := []v2.MetricSpec{} + metricName := "s0-messageCount" + targetAverageValue := getTargetAverageValue(specs) + assert.Equal(t, float64(0), targetAverageValue) + // 1 1 + specs = []v2.MetricSpec{ + createMetricSpec(1, metricName), + createMetricSpec(1, metricName), + } + + metricName = "s1-messageCount" + targetAverageValue = getTargetAverageValue(specs) + assert.Equal(t, float64(1), targetAverageValue) + // 5 5 3 -> 4.333333333333333 + specs = []v2.MetricSpec{ + createMetricSpec(5, metricName), + createMetricSpec(5, metricName), + createMetricSpec(3, metricName), + } + + metricName = "s2-messageCount" + targetAverageValue = getTargetAverageValue(specs) + assert.Equal(t, 4.333333333333333, targetAverageValue) + + // 5 5 4 -> 4.666666666666667 + specs = []v2.MetricSpec{ + createMetricSpec(5, metricName), + createMetricSpec(5, metricName), + createMetricSpec(4, metricName), + } + + targetAverageValue = getTargetAverageValue(specs) + assert.Equal(t, 4.666666666666667, targetAverageValue) +} + +// createMetricSpec creates MetricSpec for given metric name and target value. +func createMetricSpec(averageValue int64, metricName string) v2.MetricSpec { + qty := resource.NewQuantity(averageValue, resource.DecimalSI) + return v2.MetricSpec{ + External: &v2.ExternalMetricSource{ + Target: v2.MetricTarget{ + AverageValue: qty, + }, + Metric: v2.MetricIdentifier{ + Name: metricName, + }, + }, + } +}
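
Illustrative note (not part of the patch): the sketch below shows how the new `scaledjob.IsScaledJobActive` helper aggregates per-scaler results under the `multipleScalersCalculation` strategies, using the `ScalerMetrics` struct and the four-value return signature introduced in `pkg/scaling/scaledjob/metrics.go`. The metric values, the `package main` wrapper, and the replica-count arguments are hypothetical; in the handler itself the slice is built by `h.getScaledJobMetrics` from the scalers cache.

```go
package main

import (
	"fmt"

	"github.com/kedacore/keda/v2/pkg/scaling/scaledjob"
)

func main() {
	// Per-scaler results, as getScaledJobMetrics would collect them (hypothetical values).
	metrics := []scaledjob.ScalerMetrics{
		{QueueLength: 20, MaxValue: 10, IsActive: true},
		{QueueLength: 5, MaxValue: 2, IsActive: true},
		{QueueLength: 7, MaxValue: 4, IsActive: false},
	}

	// Empty strategy falls through to the default ("max"): the busiest active scaler wins.
	isActive, queueLength, maxValue, _ := scaledjob.IsScaledJobActive(metrics, "", 0, 100)
	fmt.Println(isActive, queueLength, maxValue) // true 20 10

	// "avg" averages queue length and max value across active scalers only.
	isActive, queueLength, maxValue, _ = scaledjob.IsScaledJobActive(metrics, "avg", 0, 100)
	fmt.Println(isActive, queueLength, maxValue) // true 13 6 (12.5 and 6 before ceiling)
}
```

A minReplicaCount greater than zero forces the ScaledJob to be reported as active even when no scaler is, and maxValue is always capped at maxReplicaCount via getMaxValue, matching the behavior previously embedded in ScalersCache.IsScaledJobActive.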