Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor cron scaler config #6098

Merged
merged 1 commit into from
Aug 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 50 additions & 59 deletions pkg/scalers/cron_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package scalers
import (
"context"
"fmt"
"strconv"
"strings"
"time"

Expand All @@ -23,16 +22,47 @@ const (

type cronScaler struct {
metricType v2.MetricTargetType
metadata *cronMetadata
metadata cronMetadata
logger logr.Logger
}

type cronMetadata struct {
start string
end string
timezone string
desiredReplicas int64
triggerIndex int
Start string `keda:"name=start, order=triggerMetadata"`
End string `keda:"name=end, order=triggerMetadata"`
Timezone string `keda:"name=timezone, order=triggerMetadata"`
DesiredReplicas int64 `keda:"name=desiredReplicas, order=triggerMetadata"`
TriggerIndex int
}

func (m *cronMetadata) Validate() error {
if m.Timezone == "" {
return fmt.Errorf("no timezone specified")
}

parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
if m.Start == "" {
return fmt.Errorf("no start schedule specified")
}
if _, err := parser.Parse(m.Start); err != nil {
return fmt.Errorf("error parsing start schedule: %w", err)
}

if m.End == "" {
return fmt.Errorf("no end schedule specified")
}
if _, err := parser.Parse(m.End); err != nil {
return fmt.Errorf("error parsing end schedule: %w", err)
}

if m.Start == m.End {
return fmt.Errorf("start and end can not have exactly same time input")
}

if m.DesiredReplicas == 0 {
return fmt.Errorf("no desiredReplicas specified")
}

return nil
}

// NewCronScaler creates a new cronScaler
Expand All @@ -42,9 +72,9 @@ func NewCronScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
}

meta, parseErr := parseCronMetadata(config)
if parseErr != nil {
return nil, fmt.Errorf("error parsing cron metadata: %w", parseErr)
meta, err := parseCronMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing cron metadata: %w", err)
}

return &cronScaler{
Expand All @@ -68,51 +98,12 @@ func getCronTime(location *time.Location, spec string) (int64, error) {
return cronTime, nil
}

func parseCronMetadata(config *scalersconfig.ScalerConfig) (*cronMetadata, error) {
if len(config.TriggerMetadata) == 0 {
return nil, fmt.Errorf("invalid Input Metadata. %s", config.TriggerMetadata)
func parseCronMetadata(config *scalersconfig.ScalerConfig) (cronMetadata, error) {
meta := cronMetadata{TriggerIndex: config.TriggerIndex}
if err := config.TypedConfig(&meta); err != nil {
return meta, err
}

meta := cronMetadata{}
if val, ok := config.TriggerMetadata["timezone"]; ok && val != "" {
meta.timezone = val
} else {
return nil, fmt.Errorf("no timezone specified. %s", config.TriggerMetadata)
}
parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
if val, ok := config.TriggerMetadata["start"]; ok && val != "" {
_, err := parser.Parse(val)
if err != nil {
return nil, fmt.Errorf("error parsing start schedule: %w", err)
}
meta.start = val
} else {
return nil, fmt.Errorf("no start schedule specified. %s", config.TriggerMetadata)
}
if val, ok := config.TriggerMetadata["end"]; ok && val != "" {
_, err := parser.Parse(val)
if err != nil {
return nil, fmt.Errorf("error parsing end schedule: %w", err)
}
meta.end = val
} else {
return nil, fmt.Errorf("no end schedule specified. %s", config.TriggerMetadata)
}
if meta.start == meta.end {
return nil, fmt.Errorf("error parsing schedule. %s: start and end can not have exactly same time input", config.TriggerMetadata)
}
if val, ok := config.TriggerMetadata["desiredReplicas"]; ok && val != "" {
metadataDesiredReplicas, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("error parsing desiredReplicas metadata. %s", config.TriggerMetadata)
}

meta.desiredReplicas = int64(metadataDesiredReplicas)
} else {
return nil, fmt.Errorf("no DesiredReplicas specified. %s", config.TriggerMetadata)
}
meta.triggerIndex = config.TriggerIndex
return &meta, nil
return meta, nil
}

func (s *cronScaler) Close(context.Context) error {
Expand All @@ -132,7 +123,7 @@ func (s *cronScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
var specReplicas int64 = 1
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("cron-%s-%s-%s", s.metadata.timezone, parseCronTimeFormat(s.metadata.start), parseCronTimeFormat(s.metadata.end)))),
Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, kedautil.NormalizeString(fmt.Sprintf("cron-%s-%s-%s", s.metadata.Timezone, parseCronTimeFormat(s.metadata.Start), parseCronTimeFormat(s.metadata.End)))),
},
Target: GetMetricTarget(s.metricType, specReplicas),
}
Expand All @@ -144,20 +135,20 @@ func (s *cronScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
func (s *cronScaler) GetMetricsAndActivity(_ context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
var defaultDesiredReplicas = int64(defaultDesiredReplicas)

location, err := time.LoadLocation(s.metadata.timezone)
location, err := time.LoadLocation(s.metadata.Timezone)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("unable to load timezone. Error: %w", err)
}

// Since we are considering the timestamp here and not the exact time, timezone does matter.
currentTime := time.Now().Unix()

nextStartTime, startTimecronErr := getCronTime(location, s.metadata.start)
nextStartTime, startTimecronErr := getCronTime(location, s.metadata.Start)
if startTimecronErr != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error initializing start cron: %w", startTimecronErr)
}

nextEndTime, endTimecronErr := getCronTime(location, s.metadata.end)
nextEndTime, endTimecronErr := getCronTime(location, s.metadata.End)
if endTimecronErr != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error intializing end cron: %w", endTimecronErr)
}
Expand All @@ -167,7 +158,7 @@ func (s *cronScaler) GetMetricsAndActivity(_ context.Context, metricName string)
metric := GenerateMetricInMili(metricName, float64(defaultDesiredReplicas))
return []external_metrics.ExternalMetricValue{metric}, false, nil
case currentTime <= nextEndTime:
metric := GenerateMetricInMili(metricName, float64(s.metadata.desiredReplicas))
metric := GenerateMetricInMili(metricName, float64(s.metadata.DesiredReplicas))
return []external_metrics.ExternalMetricValue{metric}, true, nil
default:
metric := GenerateMetricInMili(metricName, float64(defaultDesiredReplicas))
Expand Down
Loading