Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Scaling Strategy for ScaledJob #1227

Merged
merged 2 commits into from
Oct 8, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions api/v1alpha1/scaledjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,14 @@ type ScaledJobSpec struct {
// +optional
EnvSourceContainerName string `json:"envSourceContainerName,omitempty"`
// +optional
MaxReplicaCount *int32 `json:"maxReplicaCount,omitempty"`
Triggers []ScaleTriggers `json:"triggers"`
MaxReplicaCount *int32 `json:"maxReplicaCount,omitempty"`
// +optional
ScalingStrategy string `json:"scalingStrategy,omitempty"`
TsuyoshiUshio marked this conversation as resolved.
Show resolved Hide resolved
// +optional
CustomScalingQueueLengthDeduction *int32 `json:"customScalingQueueLengthDeduction,omitempty"`
// +optional
CustomScalingRunningJobPercentage string `json:"customScalingRunningJobPercentage,omitempty"`
Triggers []ScaleTriggers `json:"triggers"`
}

// ScaledJobStatus defines the observed state of ScaledJob
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions config/crd/bases/keda.sh_scaledjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ spec:
spec:
description: ScaledJobSpec defines the desired state of ScaledJob
properties:
customScalingQueueLengthDeduction:
format: int32
type: integer
customScalingRunningJobPercentage:
type: string
envSourceContainerName:
type: string
failedJobsHistoryLimit:
Expand Down Expand Up @@ -6343,6 +6348,8 @@ spec:
pollingInterval:
format: int32
type: integer
scalingStrategy:
type: string
successfulJobsHistoryLimit:
format: int32
type: integer
Expand Down
73 changes: 67 additions & 6 deletions pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package executor
import (
"context"
"sort"
"strconv"

"github.com/go-logr/logr"
batchv1 "k8s.io/api/batch/v1"
Expand All @@ -12,6 +13,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/kedacore/keda/api/v1alpha1"
kedav1alpha1 "github.com/kedacore/keda/api/v1alpha1"
version "github.com/kedacore/keda/version"
)
Expand All @@ -27,12 +29,7 @@ func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1al
runningJobCount := e.getRunningJobCount(scaledJob)
logger.Info("Scaling Jobs", "Number of running Jobs", runningJobCount)

var effectiveMaxScale int64
if (maxScale + runningJobCount) > scaledJob.MaxReplicaCount() {
effectiveMaxScale = scaledJob.MaxReplicaCount() - runningJobCount
} else {
effectiveMaxScale = maxScale
}
effectiveMaxScale := NewScalingStrategy(logger, scaledJob).GetEffectiveMaxScale(maxScale, runningJobCount, scaledJob.MaxReplicaCount())

if effectiveMaxScale < 0 {
effectiveMaxScale = 0
Expand Down Expand Up @@ -227,3 +224,67 @@ func (e *scaleExecutor) getFinishedJobConditionType(j *batchv1.Job) batchv1.JobC
}
return ""
}

// NewScalingStrategy returns ScalingStrategy instance
func NewScalingStrategy(logger logr.Logger, scaledJob *v1alpha1.ScaledJob) ScalingStrategy {
switch scaledJob.Spec.ScalingStrategy {
case "custom":
logger.Info("Selecting Scale Strategy", "specified", scaledJob.Spec.ScalingStrategy, "selected:", "custom", "customScalingQueueLength", scaledJob.Spec.CustomScalingQueueLengthDeduction, "customScallingRunningJobPercentage", scaledJob.Spec.CustomScalingRunningJobPercentage)
TsuyoshiUshio marked this conversation as resolved.
Show resolved Hide resolved
var err error
if percentage, err := strconv.ParseFloat(scaledJob.Spec.CustomScalingRunningJobPercentage, 64); err == nil {
return customScalingStrategy{
CustomScalingQueueLengthDeduction: scaledJob.Spec.CustomScalingQueueLengthDeduction,
CustomScalingRunningJobPercentage: &percentage,
}
}

logger.Error(err, "Fail to convert CustomScalingRunningJobPercentage into float", scaledJob.Spec.CustomScalingRunningJobPercentage)
logger.Info("Selecting Scale has been changed", "selected", "default")
TsuyoshiUshio marked this conversation as resolved.
Show resolved Hide resolved
return defaultScalingStrategy{}

case "accurate":
logger.Info("Selecting Scale Strategy", "specified", scaledJob.Spec.ScalingStrategy, "selected", "accurate")
TsuyoshiUshio marked this conversation as resolved.
Show resolved Hide resolved
return accurateScalingStrategy{}
default:
logger.Info("Selecting Scale Strategy", "specified", scaledJob.Spec.ScalingStrategy, "selected", "default")
TsuyoshiUshio marked this conversation as resolved.
Show resolved Hide resolved
return defaultScalingStrategy{}
}
}

// ScalingStrategy is an interface for switching scaling algorithm
type ScalingStrategy interface {
GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64
}

type defaultScalingStrategy struct {
}

func (s defaultScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64 {
return maxScale - runningJobCount
}

type customScalingStrategy struct {
CustomScalingQueueLengthDeduction *int32
CustomScalingRunningJobPercentage *float64
}

func (s customScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64 {
return min(maxScale-int64(*s.CustomScalingQueueLengthDeduction)-int64(float64(runningJobCount)*(*s.CustomScalingRunningJobPercentage)), maxReplicaCount)
}

type accurateScalingStrategy struct {
}

func (s accurateScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64 {
if (maxScale + runningJobCount) > maxReplicaCount {
return maxReplicaCount - runningJobCount
}
return maxScale
}

func min(x, y int64) int64 {
if x > y {
return y
}
return x
}
91 changes: 91 additions & 0 deletions pkg/scaling/executor/scale_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,67 @@ func TestCleanUpNormalCase(t *testing.T) {
assert.True(t, ok)
}

func TestNewNewScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", int32(10), "0"))
assert.Equal(t, "executor.customScalingStrategy", fmt.Sprintf("%T", strategy))
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("accurate", "accurate", int32(0), "0"))
assert.Equal(t, "executor.accurateScalingStrategy", fmt.Sprintf("%T", strategy))
strategy = NewScalingStrategy(logger, getMockScaledJobWithDefaultStrategy("default"))
assert.Equal(t, "executor.defaultScalingStrategy", fmt.Sprintf("%T", strategy))
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("default", "default", int32(0), "0"))
assert.Equal(t, "executor.defaultScalingStrategy", fmt.Sprintf("%T", strategy))
}

func TestDefaultScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
strategy := NewScalingStrategy(logger, getMockScaledJobWithDefaultStrategy("default"))
// maxScale doesn't exceed MaxReplicaCount. You can ignore on this sceanrio
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5))
assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 5))
}

func TestCustomScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
customScalingQueueLengthDeduction := int32(1)
customScalingRunningJobPercentage := "0.5"
strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
// maxScale doesn't exceed MaxReplicaCount. You can ignore on this sceanrio
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5))
assert.Equal(t, int64(9), strategy.GetEffectiveMaxScale(10, 0, 10))
strategy = NewScalingStrategy(logger, getMockScaledJobWithCustomStrategyWithNilParameter("custom", "custom"))

// If you don't set the two parameters is the same behavior as DefaultStrategy
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5))
assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 5))

// Empty String will be DefaultStrategy
customScalingQueueLengthDeduction = int32(1)
customScalingRunningJobPercentage = ""
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
assert.Equal(t, "executor.defaultScalingStrategy", fmt.Sprintf("%T", strategy))

// Set 0 as customScalingRunningJobPercentage
customScalingQueueLengthDeduction = int32(2)
customScalingRunningJobPercentage = "0"
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5))

// Exceed the MaxReplicaCount
customScalingQueueLengthDeduction = int32(-2)
customScalingRunningJobPercentage = "0"
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
assert.Equal(t, int64(4), strategy.GetEffectiveMaxScale(3, 2, 4))
}

func TestAccurateScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("accurate", "accurate", 0, "0"))
// maxScale doesn't exceed MaxReplicaCount. You can ignore on this sceanrio
assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(3, 2, 5))
assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(5, 2, 5))
}

func TestCleanUpMixedCaseWithSortByTime(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -151,6 +212,36 @@ func getMockScaledJobWithDefault() *kedav1alpha1.ScaledJob {
return scaledJob
}

func getMockScaledJobWithStrategy(name, scalingStrategy string, customScalingQueueLengthDeduction int32, customScalingRunningJobPercentage string) *kedav1alpha1.ScaledJob {
scaledJob := &kedav1alpha1.ScaledJob{
Spec: kedav1alpha1.ScaledJobSpec{
ScalingStrategy: scalingStrategy,
CustomScalingQueueLengthDeduction: &customScalingQueueLengthDeduction,
CustomScalingRunningJobPercentage: customScalingRunningJobPercentage,
},
}
scaledJob.ObjectMeta.Name = name
return scaledJob
}

func getMockScaledJobWithCustomStrategyWithNilParameter(name, scalingStrategy string) *kedav1alpha1.ScaledJob {
scaledJob := &kedav1alpha1.ScaledJob{
Spec: kedav1alpha1.ScaledJobSpec{
ScalingStrategy: scalingStrategy,
},
}
scaledJob.ObjectMeta.Name = name
return scaledJob
}

func getMockScaledJobWithDefaultStrategy(name string) *kedav1alpha1.ScaledJob {
scaledJob := &kedav1alpha1.ScaledJob{
Spec: kedav1alpha1.ScaledJobSpec{},
}
scaledJob.ObjectMeta.Name = name
return scaledJob
}

func getMockClient(t *testing.T, ctrl *gomock.Controller, jobs *[]mockJobParameter, deletedJobName *map[string]string) *mock_client.MockClient {
client := mock_client.NewMockClient(ctrl)
client.EXPECT().
Expand Down