Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add eagerScalingStrategy for ScaledJob #5872

Merged
merged 9 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ Here is an overview of all new **experimental** features:

### Improvements

- **General**: Added `eagerScalingStrategy` for `ScaledJob` ([#5114](https://github.com/kedacore/keda/issues/5114))
- **Azure queue scaler**: Added new configuration option 'queueLengthStrategy' ([#4478](https://github.com/kedacore/keda/issues/4478))
- **Cassandra Scaler**: Add TLS support for cassandra scaler ([#5802](https://github.com/kedacore/keda/issues/5802))
- **GCP Pub/Sub**: Add optional valueIfNull to allow a default scaling value and prevent errors when GCP metric returns no value. ([#5896](https://github.com/kedacore/keda/issues/5896))
Expand Down
28 changes: 19 additions & 9 deletions pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (e *scaleExecutor) getScalingDecision(scaledJob *kedav1alpha1.ScaledJob, ru
scaleTo = scaleToMinReplica
effectiveMaxScale = scaleToMinReplica
} else {
effectiveMaxScale = NewScalingStrategy(logger, scaledJob).GetEffectiveMaxScale(maxScale, runningJobCount-minReplicaCount, pendingJobCount, scaledJob.MaxReplicaCount())
effectiveMaxScale, scaleTo = NewScalingStrategy(logger, scaledJob).GetEffectiveMaxScale(maxScale, runningJobCount-minReplicaCount, pendingJobCount, scaledJob.MaxReplicaCount(), scaleTo)
}
return effectiveMaxScale, scaleTo
}
Expand Down Expand Up @@ -391,6 +391,9 @@ func NewScalingStrategy(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) S
case "accurate":
logger.V(1).Info("Selecting Scale Strategy", "specified", scaledJob.Spec.ScalingStrategy.Strategy, "selected", "accurate")
return accurateScalingStrategy{}
case "eager":
logger.V(1).Info("Selecting Scale Strategy", "specified", scaledJob.Spec.ScalingStrategy.Strategy, "selected", "eager")
return eagerScalingStrategy{}
default:
logger.V(1).Info("Selecting Scale Strategy", "specified", scaledJob.Spec.ScalingStrategy.Strategy, "selected", "default")
return defaultScalingStrategy{}
Expand All @@ -399,33 +402,40 @@ func NewScalingStrategy(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) S

// ScalingStrategy is an interface for switching scaling algorithm
type ScalingStrategy interface {
GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount int64) int64
GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount, scaleTo int64) (int64, int64)
}

type defaultScalingStrategy struct {
}

func (s defaultScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, _, _ int64) int64 {
return maxScale - runningJobCount
func (s defaultScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, _, _, scaleTo int64) (int64, int64) {
return maxScale - runningJobCount, scaleTo
}

type customScalingStrategy struct {
CustomScalingQueueLengthDeduction *int32
CustomScalingRunningJobPercentage *float64
}

func (s customScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, _, maxReplicaCount int64) int64 {
return min(maxScale-int64(*s.CustomScalingQueueLengthDeduction)-int64(float64(runningJobCount)*(*s.CustomScalingRunningJobPercentage)), maxReplicaCount)
func (s customScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, _, maxReplicaCount, scaleTo int64) (int64, int64) {
return min(maxScale-int64(*s.CustomScalingQueueLengthDeduction)-int64(float64(runningJobCount)*(*s.CustomScalingRunningJobPercentage)), maxReplicaCount), scaleTo
}

type accurateScalingStrategy struct {
}

func (s accurateScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount int64) int64 {
func (s accurateScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount, scaleTo int64) (int64, int64) {
if (maxScale + runningJobCount) > maxReplicaCount {
return maxReplicaCount - runningJobCount
return maxReplicaCount - runningJobCount, scaleTo
}
return maxScale - pendingJobCount
return maxScale - pendingJobCount, scaleTo
}

type eagerScalingStrategy struct {
}

func (s eagerScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount, _ int64) (int64, int64) {
return min(maxReplicaCount-runningJobCount-pendingJobCount, maxScale), maxReplicaCount
}

func min(x, y int64) int64 {
Expand Down
51 changes: 39 additions & 12 deletions pkg/scaling/executor/scale_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,17 @@ func TestNewNewScalingStrategy(t *testing.T) {
assert.Equal(t, "executor.defaultScalingStrategy", fmt.Sprintf("%T", strategy))
}

func maxScaleValue(maxValue, _ int64) int64 {
return maxValue
}

func TestDefaultScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
strategy := NewScalingStrategy(logger, getMockScaledJobWithDefaultStrategy("default"))
// maxScale doesn't exceed MaxReplicaCount. You can ignore on this sceanrio
// pendingJobCount isn't relevant on this scenario
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 0, 5))
assert.Equal(t, int64(1), maxScaleValue(strategy.GetEffectiveMaxScale(3, 2, 0, 5, 1)))
assert.Equal(t, int64(2), maxScaleValue(strategy.GetEffectiveMaxScale(2, 0, 0, 5, 1)))
}

func TestCustomScalingStrategy(t *testing.T) {
Expand All @@ -97,13 +101,13 @@ func TestCustomScalingStrategy(t *testing.T) {
strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
// maxScale doesn't exceed MaxReplicaCount. You can ignore on this sceanrio
// pendingJobCount isn't relevant on this scenario
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(9), strategy.GetEffectiveMaxScale(10, 0, 0, 10))
assert.Equal(t, int64(1), maxScaleValue(strategy.GetEffectiveMaxScale(3, 2, 0, 5, 1)))
assert.Equal(t, int64(9), maxScaleValue(strategy.GetEffectiveMaxScale(10, 0, 0, 10, 1)))
strategy = NewScalingStrategy(logger, getMockScaledJobWithCustomStrategyWithNilParameter("custom", "custom"))

// If you don't set the two parameters is the same behavior as DefaultStrategy
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 0, 5))
assert.Equal(t, int64(1), maxScaleValue(strategy.GetEffectiveMaxScale(3, 2, 0, 5, 1)))
assert.Equal(t, int64(2), maxScaleValue(strategy.GetEffectiveMaxScale(2, 0, 0, 5, 1)))

// Empty String will be DefaultStrategy
customScalingQueueLengthDeduction = int32(1)
Expand All @@ -115,25 +119,48 @@ func TestCustomScalingStrategy(t *testing.T) {
customScalingQueueLengthDeduction = int32(2)
customScalingRunningJobPercentage = "0"
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(1), maxScaleValue(strategy.GetEffectiveMaxScale(3, 2, 0, 5, 1)))

// Exceed the MaxReplicaCount
customScalingQueueLengthDeduction = int32(-2)
customScalingRunningJobPercentage = "0"
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
assert.Equal(t, int64(4), strategy.GetEffectiveMaxScale(3, 2, 0, 4))
assert.Equal(t, int64(4), maxScaleValue(strategy.GetEffectiveMaxScale(3, 2, 0, 4, 1)))
}

func TestAccurateScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("accurate", "accurate", 0, "0"))
// maxScale doesn't exceed MaxReplicaCount. You can ignore on this sceanrio
assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(5, 2, 0, 5))
assert.Equal(t, int64(3), maxScaleValue(strategy.GetEffectiveMaxScale(3, 2, 0, 5, 1)))
assert.Equal(t, int64(3), maxScaleValue(strategy.GetEffectiveMaxScale(5, 2, 0, 5, 1)))

// Test with 2 pending jobs
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 4, 2, 10))
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(5, 4, 2, 5))
assert.Equal(t, int64(1), maxScaleValue(strategy.GetEffectiveMaxScale(3, 4, 2, 10, 1)))
assert.Equal(t, int64(1), maxScaleValue(strategy.GetEffectiveMaxScale(5, 4, 2, 5, 1)))
}

func TestEagerScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("eager", "eager", 0, "0"))

maxScale, scaleTo := strategy.GetEffectiveMaxScale(4, 3, 0, 10, 1)
assert.Equal(t, int64(4), maxScale)
assert.Equal(t, int64(10), scaleTo)
maxScale, scaleTo = strategy.GetEffectiveMaxScale(4, 0, 3, 10, 1)
assert.Equal(t, int64(4), maxScale)
assert.Equal(t, int64(10), scaleTo)

maxScale, scaleTo = strategy.GetEffectiveMaxScale(4, 7, 0, 10, 1)
assert.Equal(t, int64(3), maxScale)
assert.Equal(t, int64(10), scaleTo)
maxScale, scaleTo = strategy.GetEffectiveMaxScale(4, 1, 6, 10, 1)
assert.Equal(t, int64(3), maxScale)
assert.Equal(t, int64(10), scaleTo)

maxScale, scaleTo = strategy.GetEffectiveMaxScale(15, 0, 0, 10, 1)
assert.Equal(t, int64(10), maxScale)
assert.Equal(t, int64(10), scaleTo)
}

func TestCleanUpMixedCaseWithSortByTime(t *testing.T) {
Expand Down
134 changes: 134 additions & 0 deletions tests/internals/scaling_strategies/eager_scaling_strategy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//go:build e2e
// +build e2e

package eager_scaling_strategy_test

import (
"encoding/base64"
"fmt"
"testing"

"github.com/joho/godotenv"
"github.com/stretchr/testify/assert"
"k8s.io/client-go/kubernetes"

. "github.com/kedacore/keda/v2/tests/helper" // For helper methods
. "github.com/kedacore/keda/v2/tests/scalers/rabbitmq"
)

var _ = godotenv.Load("../../.env") // For loading env variables from .env

const (
testName = "eager-scaling-strategy-test"
)

var (
testNamespace = fmt.Sprintf("%s-ns", testName)
rmqNamespace = fmt.Sprintf("%s-rmq", testName)
scaledJobName = fmt.Sprintf("%s-sj", testName)
queueName = "hello"
user = fmt.Sprintf("%s-user", testName)
password = fmt.Sprintf("%s-password", testName)
vhost = "/"
connectionString = fmt.Sprintf("amqp://%s:%s@rabbitmq.%s.svc.cluster.local/", user, password, rmqNamespace)
httpConnectionString = fmt.Sprintf("http://%s:%s@rabbitmq.%s.svc.cluster.local/", user, password, rmqNamespace)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semgrep found a possible database connection string built with string concatenation. Check for proper encoding/escaping of components to prevent parse errors and injection vulnerabilities.

Ignore this finding from db-connection-string.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks okay.

secretName = fmt.Sprintf("%s-secret", testName)
)

// YAML templates for your Kubernetes resources
const (
scaledJobTemplate = `
apiVersion: v1
kind: Secret
metadata:
name: {{.SecretName}}
namespace: {{.TestNamespace}}
data:
RabbitApiHost: {{.Base64Connection}}
---
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
name: {{.ScaledJobName}}
namespace: {{.TestNamespace}}
labels:
app: {{.ScaledJobName}}
spec:
jobTargetRef:
template:
spec:
containers:
- name: sleeper
image: busybox
command:
- sleep
- "300"
imagePullPolicy: IfNotPresent
envFrom:
- secretRef:
name: {{.SecretName}}
restartPolicy: Never
backoffLimit: 1
pollingInterval: 5
maxReplicaCount: 10
scalingStrategy:
strategy: "eager"
triggers:
- type: rabbitmq
metadata:
queueName: {{.QueueName}}
hostFromEnv: RabbitApiHost
mode: QueueLength
value: '1'
`
)

type templateData struct {
ScaledJobName string
TestNamespace string
QueueName string
SecretName string
Base64Connection string
}

func TestScalingStrategy(t *testing.T) {
kc := GetKubernetesClient(t)
data, templates := getTemplateData()
t.Cleanup(func() {
DeleteKubernetesResources(t, testNamespace, data, templates)
RMQUninstall(t, rmqNamespace, user, password, vhost, WithoutOAuth())
})

RMQInstall(t, kc, rmqNamespace, user, password, vhost, WithoutOAuth())
CreateKubernetesResources(t, kc, testNamespace, data, templates)

testEagerScaling(t, kc)
}

func getTemplateData() (templateData, []Template) {
return templateData{
// Populate fields required in YAML templates
ScaledJobName: scaledJobName,
TestNamespace: testNamespace,
QueueName: queueName,
Base64Connection: base64.StdEncoding.EncodeToString([]byte(httpConnectionString)),
SecretName: secretName,
}, []Template{
{Name: "scaledJobTemplate", Config: scaledJobTemplate},
}
}

func testEagerScaling(t *testing.T, kc *kubernetes.Clientset) {
iterationCount := 20
RMQPublishMessages(t, rmqNamespace, connectionString, queueName, 4)
assert.True(t, WaitForScaledJobCount(t, kc, scaledJobName, testNamespace, 4, iterationCount, 1),
"job count should be %d after %d iterations", 4, iterationCount)

RMQPublishMessages(t, rmqNamespace, connectionString, queueName, 4)
assert.True(t, WaitForScaledJobCount(t, kc, scaledJobName, testNamespace, 8, iterationCount, 1),
"job count should be %d after %d iterations", 8, iterationCount)

RMQPublishMessages(t, rmqNamespace, connectionString, queueName, 4)
assert.True(t, WaitForScaledJobCount(t, kc, scaledJobName, testNamespace, 10, iterationCount, 1),
"job count should be %d after %d iterations", 10, iterationCount)
}
Loading