Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refactor ScaledJob to be used with scale_handler #4707

Merged
merged 21 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ New deprecation(s):

### Other

- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX))
- **General**: Refactor ScaledJob related methods to be located at scale_handler ([#4781](https://github.com/kedacore/keda/issues/4781))

## v2.11.1

Expand Down
124 changes: 93 additions & 31 deletions apis/keda/v1alpha1/identifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,37 @@ type testData struct {
soKind string
}

var tests = []testData{
{
name: "all lowercase",
expectedIdentifier: "scaledobject.namespace.name",
soName: "name",
soNamespace: "namespace",
soKind: "scaledobject",
},
{
name: "all uppercase",
expectedIdentifier: "scaledobject.namespace.name",
soName: "NAME",
soNamespace: "NAMESPACE",
soKind: "SCALEDOBJECT",
},
{
name: "camel case",
expectedIdentifier: "scaledobject.namespace.name",
soName: "name",
soNamespace: "namespace",
soKind: "scaledobject",
},
{
name: "missing namespace",
expectedIdentifier: "scaledobject..name",
soName: "name",
soKind: "scaledobject",
},
}

func TestGeneratedIdentifierForScaledObject(t *testing.T) {
tests := []testData{
{
name: "all lowercase",
expectedIdentifier: "scaledobject.namespace.name",
soName: "name",
soNamespace: "namespace",
soKind: "scaledobject",
},
{
name: "all uppercase",
expectedIdentifier: "scaledobject.namespace.name",
soName: "NAME",
soNamespace: "NAMESPACE",
soKind: "SCALEDOBJECT",
},
{
name: "camel case",
expectedIdentifier: "scaledobject.namespace.name",
soName: "name",
soNamespace: "namespace",
soKind: "scaledobject",
},
{
name: "missing namespace",
expectedIdentifier: "scaledobject..name",
soName: "name",
soKind: "scaledobject",
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
expectedIdentifier := test.expectedIdentifier
genericIdentifier := GenerateIdentifier(test.soKind, test.soNamespace, test.soName)
Expand Down Expand Up @@ -79,3 +77,67 @@ func TestGeneratedIdentifierForScaledObject(t *testing.T) {
})
}
}

func TestGeneratedIdentifierForScaledJob(t *testing.T) {
tests := []testData{
{
name: "all lowercase",
expectedIdentifier: "scaledjob.namespace.name",
soName: "name",
soNamespace: "namespace",
soKind: "scaledjob",
},
{
name: "all uppercase",
expectedIdentifier: "scaledjob.namespace.name",
soName: "NAME",
soNamespace: "NAMESPACE",
soKind: "SCALEDJOB",
},
{
name: "camel case",
expectedIdentifier: "scaledjob.namespace.name",
soName: "name",
soNamespace: "namespace",
soKind: "scaledjob",
},
{
name: "missing namespace",
expectedIdentifier: "scaledjob..name",
soName: "name",
soKind: "scaledjob",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
expectedIdentifier := test.expectedIdentifier
genericIdentifier := GenerateIdentifier(test.soKind, test.soNamespace, test.soName)

scaledJob := &ScaledJob{
ObjectMeta: metav1.ObjectMeta{
Name: test.soName,
Namespace: test.soNamespace,
},
}
scaledJobIdentifier := scaledJob.GenerateIdentifier()

withTriggers, err := AsDuckWithTriggers(scaledJob)
if err != nil {
t.Errorf("got error while converting to WithTriggers object: %s", err)
}
withTriggersIdentifier := withTriggers.GenerateIdentifier()

if expectedIdentifier != genericIdentifier {
t.Errorf("genericIdentifier=%q doesn't equal the expectedIdentifier=%q", genericIdentifier, expectedIdentifier)
}

if expectedIdentifier != scaledJobIdentifier {
t.Errorf("scaledJobIdentifier=%q doesn't equal the expectedIdentifier=%q", scaledJobIdentifier, expectedIdentifier)
}

if expectedIdentifier != withTriggersIdentifier {
t.Errorf("withTriggersIdentifier=%q doesn't equal the expectedIdentifier=%q", withTriggersIdentifier, expectedIdentifier)
}
})
}
}
4 changes: 4 additions & 0 deletions apis/keda/v1alpha1/scaledjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,7 @@ func (s ScaledJob) MinReplicaCount() int64 {
}
return defaultScaledJobMinReplicaCount
}

func (s *ScaledJob) GenerateIdentifier() string {
return GenerateIdentifier("ScaledJob", s.Namespace, s.Name)
}
2 changes: 1 addition & 1 deletion apis/keda/v1alpha1/withtriggers_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (t *WithTriggers) GenerateIdentifier() string {
return GenerateIdentifier(t.InternalKind, t.Namespace, t.Name)
}

// AsDuckWithTriggers tries to generates WithTriggers object for input object
// AsDuckWithTriggers tries to generate WithTriggers object for input object
// returns error if input object is unknown
func AsDuckWithTriggers(scalableObject interface{}) (*WithTriggers, error) {
switch obj := scalableObject.(type) {
Expand Down
167 changes: 0 additions & 167 deletions pkg/scaling/cache/scalers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,14 @@ package cache
import (
"context"
"fmt"
"math"
"time"

v2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/scalers"
)

Expand Down Expand Up @@ -141,68 +138,6 @@ func (c *ScalersCache) GetMetricsAndActivityForScaler(ctx context.Context, index
return metric, activity, time.Since(startTime).Milliseconds(), err
}

// TODO needs refactor - move ScaledJob related methods to scale_handler, the similar way ScaledObject methods are
// refactor logic
func (c *ScalersCache) IsScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
var queueLength float64
var maxValue float64
isActive := false

logger := logf.Log.WithName("scalemetrics")
scalersMetrics := c.getScaledJobMetrics(ctx, scaledJob)
switch scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation {
case "min":
for _, metrics := range scalersMetrics {
if (queueLength == 0 || metrics.queueLength < queueLength) && metrics.isActive {
queueLength = metrics.queueLength
maxValue = metrics.maxValue
isActive = metrics.isActive
}
}
case "avg":
queueLengthSum := float64(0)
maxValueSum := float64(0)
length := 0
for _, metrics := range scalersMetrics {
if metrics.isActive {
queueLengthSum += metrics.queueLength
maxValueSum += metrics.maxValue
isActive = metrics.isActive
length++
}
}
if length != 0 {
queueLength = queueLengthSum / float64(length)
maxValue = maxValueSum / float64(length)
}
case "sum":
for _, metrics := range scalersMetrics {
if metrics.isActive {
queueLength += metrics.queueLength
maxValue += metrics.maxValue
isActive = metrics.isActive
}
}
default: // max
for _, metrics := range scalersMetrics {
if metrics.queueLength > queueLength && metrics.isActive {
queueLength = metrics.queueLength
maxValue = metrics.maxValue
isActive = metrics.isActive
}
}
}

if scaledJob.MinReplicaCount() > 0 {
isActive = true
}

maxValue = min(float64(scaledJob.MaxReplicaCount()), maxValue)
logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation)

return isActive, ceilToInt64(queueLength), ceilToInt64(maxValue)
}

func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scaler, error) {
if id < 0 || id >= len(c.Scalers) {
return nil, fmt.Errorf("scaler with id %d not found, len = %d, cache has been probably already invalidated", id, len(c.Scalers))
Expand All @@ -226,105 +161,3 @@ func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scale

return ns, nil
}

type scalerMetrics struct {
queueLength float64
maxValue float64
isActive bool
}

func (c *ScalersCache) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scalerMetrics {
// TODO this loop should be probably done similar way the ScaledObject loop is done
var scalersMetrics []scalerMetrics
for i, s := range c.Scalers {
var queueLength float64
var targetAverageValue float64
isActive := false
maxValue := float64(0)
scalerType := fmt.Sprintf("%T:", s)

scalerLogger := log.WithValues("ScaledJob", scaledJob.Name, "Scaler", scalerType)

metricSpecs := s.Scaler.GetMetricSpecForScaling(ctx)

// skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata)
// or skip cpu/memory resource scaler
if len(metricSpecs) < 1 || metricSpecs[0].External == nil {
continue
}

// TODO here we should probably loop through all metrics in a Scaler
// as it is done for ScaledObject
metrics, isTriggerActive, err := s.Scaler.GetMetricsAndActivity(ctx, metricSpecs[0].External.Metric.Name)
if err != nil {
var ns scalers.Scaler
ns, err = c.refreshScaler(ctx, i)
if err == nil {
metrics, isTriggerActive, err = ns.GetMetricsAndActivity(ctx, metricSpecs[0].External.Metric.Name)
}
}

if err != nil {
scalerLogger.V(1).Info("Error getting scaler metrics and activity, but continue", "error", err)
c.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
continue
}

targetAverageValue = getTargetAverageValue(metricSpecs)

var metricValue float64
for _, m := range metrics {
if m.MetricName == metricSpecs[0].External.Metric.Name {
metricValue = m.Value.AsApproximateFloat64()
queueLength += metricValue
}
}
scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, metricSpecs[0].External.Metric.Name, queueLength, "targetAverageValue", targetAverageValue)

if isTriggerActive {
isActive = true
}

if targetAverageValue != 0 {
averageLength := queueLength / targetAverageValue
maxValue = min(float64(scaledJob.MaxReplicaCount()), averageLength)
}
scalersMetrics = append(scalersMetrics, scalerMetrics{
queueLength: queueLength,
maxValue: maxValue,
isActive: isActive,
})
}
return scalersMetrics
}

func getTargetAverageValue(metricSpecs []v2.MetricSpec) float64 {
var targetAverageValue float64
var metricValue float64
for _, metric := range metricSpecs {
if metric.External.Target.AverageValue == nil {
metricValue = 0
} else {
metricValue = metric.External.Target.AverageValue.AsApproximateFloat64()
}

targetAverageValue += metricValue
}
count := float64(len(metricSpecs))
if count != 0 {
return targetAverageValue / count
}
return 0
}

func ceilToInt64(x float64) int64 {
return int64(math.Ceil(x))
}

// Min function for float64
func min(x, y float64) float64 {
if x > y {
return y
}
return x
}
Loading
Loading