Skip to content

Commit

Permalink
fix/datadog-scaler-null-last-point (kedacore#3954)
Browse files Browse the repository at this point in the history
Signed-off-by: Tony Lee <[email protected]>
Signed-off-by: Tony Lee <[email protected]>
Signed-off-by: Zbynek Roubalik <[email protected]>
Co-authored-by: Tony Lee <[email protected]>
Co-authored-by: Zbynek Roubalik <[email protected]>
Signed-off-by: Pedro Tanaka <[email protected]>
  • Loading branch information
3 people authored and pedro-stanaka committed Jan 18, 2023
1 parent 41fccb7 commit b4ae488
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 32 deletions.
11 changes: 6 additions & 5 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ linters:
- goprintffuncname
- govet
- nolintlint
- rowserrcheck
#- rowserrcheck
- gofmt
- revive
- goimports
Expand All @@ -27,8 +27,6 @@ linters:
- ineffassign
- staticcheck
- exportloopref
- structcheck
- deadcode
- depguard
- dogsled
- errcheck
Expand All @@ -40,7 +38,6 @@ linters:
- gosimple
- stylecheck
- unused
- varcheck
- unparam
- unconvert
- whitespace
Expand Down Expand Up @@ -91,7 +88,11 @@ issues:
- path: stan_scaler.go
linters:
- dupl

# Exclude for datadog_scaler, reason:
# Introduce new parameters to fix DataDog API response issue #3906 (PR #3954)
- path: datadog_scaler.go
linters:
- gocyclo
linters-settings:
funlen:
lines: 80
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md
- **General:** Respect optional parameter inside envs for ScaledJobs ([#3568](https://github.com/kedacore/keda/issues/3568))
- **General:** Close is called twice on PushScaler's deletion ([#3881](https://github.com/kedacore/keda/issues/3881))
- **Azure Blob Scaler** Store forgotten logger ([#3811](https://github.com/kedacore/keda/issues/3811))
- **Datadog Scaler:** The last data point of some specific query is always null ([#3906](https://github.com/kedacore/keda/issues/3906))
- **GCP Stackdriver Scaler:** Update Stackdriver client to handle detecting double and int64 value types ([#3777](https://github.com/kedacore/keda/issues/3777))
- **New Relic Scaler** Store forgotten logger ([#3945](https://github.com/kedacore/keda/issues/3945))
- **Prometheus Scaler:** Treat Inf the same as Null result ([#3644](https://github.com/kedacore/keda/issues/3644))
Expand Down
145 changes: 119 additions & 26 deletions pkg/scalers/datadog_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,25 @@ type datadogScaler struct {
}

type datadogMetadata struct {
apiKey string
appKey string
datadogSite string
query string
queryValue float64
activationQueryValue float64
vType v2beta2.MetricTargetType
metricName string
age int
useFiller bool
fillValue float64
apiKey string
appKey string
datadogSite string
query string
queryValue float64
queryAggegrator string
activationQueryValue float64
vType v2beta2.MetricTargetType
metricName string
age int
timeWindowOffset int
lastAvailablePointOffset int
useFiller bool
fillValue float64
}

const maxString = "max"
const avgString = "average"

var filter *regexp.Regexp

func init() {
Expand Down Expand Up @@ -83,13 +89,43 @@ func parseDatadogMetadata(config *ScalerConfig, logger logr.Logger) (*datadogMet
}
meta.age = age

if age < 0 {
return nil, fmt.Errorf("age should not be smaller than 0 seconds")
}
if age < 60 {
logger.Info("selecting a window smaller than 60 seconds can cause Datadog not finding a metric value for the query")
}
} else {
meta.age = 90 // Default window 90 seconds
}

if val, ok := config.TriggerMetadata["timeWindowOffset"]; ok {
timeWindowOffset, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("timeWindowOffset parsing error %s", err.Error())
}
if timeWindowOffset < 0 {
return nil, fmt.Errorf("timeWindowOffset should not be smaller than 0 seconds")
}
meta.timeWindowOffset = timeWindowOffset
} else {
meta.timeWindowOffset = 0 // Default delay 0 seconds
}

if val, ok := config.TriggerMetadata["lastAvailablePointOffset"]; ok {
lastAvailablePointOffset, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("lastAvailablePointOffset parsing error %s", err.Error())
}

if lastAvailablePointOffset < 0 {
return nil, fmt.Errorf("lastAvailablePointOffset should not be smaller than 0")
}
meta.lastAvailablePointOffset = lastAvailablePointOffset
} else {
meta.lastAvailablePointOffset = 0 // Default use the last point
}

if val, ok := config.TriggerMetadata["query"]; ok {
_, err := parseDatadogQuery(val)

Expand All @@ -111,6 +147,18 @@ func parseDatadogMetadata(config *ScalerConfig, logger logr.Logger) (*datadogMet
return nil, fmt.Errorf("no queryValue given")
}

if val, ok := config.TriggerMetadata["queryAggregator"]; ok && val != "" {
queryAggregator := strings.ToLower(val)
switch queryAggregator {
case avgString, maxString:
meta.queryAggegrator = queryAggregator
default:
return nil, fmt.Errorf("queryAggregator value %s has to be one of '%s, %s'", queryAggregator, avgString, maxString)
}
} else {
meta.queryAggegrator = ""
}

meta.activationQueryValue = 0
if val, ok := config.TriggerMetadata["activationQueryValue"]; ok {
activationQueryValue, err := strconv.ParseFloat(val, 64)
Expand All @@ -136,7 +184,7 @@ func parseDatadogMetadata(config *ScalerConfig, logger logr.Logger) (*datadogMet
}
val = strings.ToLower(val)
switch val {
case "average":
case avgString:
meta.vType = v2beta2.AverageValueMetricType
case "global":
meta.vType = v2beta2.ValueMetricType
Expand Down Expand Up @@ -247,7 +295,9 @@ func (s *datadogScaler) getQueryResult(ctx context.Context) (float64, error) {
"site": s.metadata.datadogSite,
})

resp, r, err := s.apiClient.MetricsApi.QueryMetrics(ctx, time.Now().Unix()-int64(s.metadata.age), time.Now().Unix(), s.metadata.query) //nolint:bodyclose
timeWindowTo := time.Now().Unix() - int64(s.metadata.timeWindowOffset)
timeWindowFrom := timeWindowTo - int64(s.metadata.age)
resp, r, err := s.apiClient.MetricsApi.QueryMetrics(ctx, timeWindowFrom, timeWindowTo, s.metadata.query) //nolint:bodyclose
if err != nil {
return -1, fmt.Errorf("error when retrieving Datadog metrics: %s", err)
}
Expand All @@ -272,29 +322,52 @@ func (s *datadogScaler) getQueryResult(ctx context.Context) (float64, error) {

series := resp.GetSeries()

if len(series) > 1 {
return 0, fmt.Errorf("query returned more than 1 series; modify the query to return only 1 series")
}

if len(series) == 0 {
if !s.metadata.useFiller {
return 0, fmt.Errorf("no Datadog metrics returned for the given time window")
}
return s.metadata.fillValue, nil
}

points := series[0].GetPointlist()
// Require queryAggregator be set explicitly for multi-query
if len(series) > 1 && s.metadata.queryAggegrator == "" {
return 0, fmt.Errorf("query returned more than 1 series; modify the query to return only 1 series or add a queryAggregator")
}

index := len(points) - 1
if len(points) == 0 || len(points[index]) < 2 || points[index][1] == nil {
if !s.metadata.useFiller {
return 0, fmt.Errorf("no Datadog metrics returned for the given time window")
// Collect all latest point values from any/all series
results := make([]float64, len(series))
for i := 0; i < len(series); i++ {
points := series[i].GetPointlist()
index := len(points) - 1
// Find out the last point != nil
for j := index; j >= 0; j-- {
if len(points[j]) >= 2 && points[j][1] != nil {
index = j
break
}
}
return s.metadata.fillValue, nil
if index < s.metadata.lastAvailablePointOffset {
return 0, fmt.Errorf("index is smaller than the lastAvailablePointOffset")
}
index -= s.metadata.lastAvailablePointOffset

if len(points) == 0 || len(points[index]) < 2 || points[index][1] == nil {
if !s.metadata.useFiller {
return 0, fmt.Errorf("no Datadog metrics returned for the given time window")
}
return s.metadata.fillValue, nil
}
// Return the last point from the series
results[i] = *points[index][1]
}

// Return the last point from the series
return *points[index][1], nil
switch s.metadata.queryAggegrator {
case avgString:
return AvgFloatFromSlice(results), nil
default:
// Aggregate Results - default Max value:
return MaxFloatFromSlice(results), nil
}
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
Expand All @@ -312,7 +385,7 @@ func (s *datadogScaler) GetMetricSpecForScaling(context.Context) []v2beta2.Metri
}

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *datadogScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
func (s *datadogScaler) GetMetrics(ctx context.Context, metricName string, _ labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
num, err := s.getQueryResult(ctx)
if err != nil {
s.logger.Error(err, "error getting metrics from Datadog")
Expand All @@ -323,3 +396,23 @@ func (s *datadogScaler) GetMetrics(ctx context.Context, metricName string, metri

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// MaxFloatFromSlice returns the largest value in a non-empty slice of
// float64s. It panics on an empty slice, so callers must guarantee at
// least one element (getQueryResult only calls it with >= 1 series).
func MaxFloatFromSlice(results []float64) float64 {
	// Seed with the first element so negative-only slices work correctly.
	maxValue := results[0]
	for _, result := range results {
		if result > maxValue {
			maxValue = result
		}
	}
	return maxValue
}

// AvgFloatFromSlice returns the arithmetic mean of a non-empty slice of
// float64s. An empty slice yields NaN (0/0); callers must guarantee at
// least one element (getQueryResult only calls it with >= 1 series).
func AvgFloatFromSlice(results []float64) float64 {
	total := 0.0
	for _, result := range results {
		total += result
	}
	return total / float64(len(results))
}
40 changes: 39 additions & 1 deletion pkg/scalers/datadog_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,31 @@ type datadogAuthMetadataTestData struct {
isError bool
}

// assertEqual fails the running test when a and b are not equal
// (compared with ==, so operands must be comparable types).
func assertEqual(t *testing.T, a interface{}, b interface{}) {
	// Mark this as a helper so a failure is reported at the caller's
	// line rather than inside this function.
	t.Helper()
	if a == b {
		return
	}
	t.Errorf("%v != %v", a, b)
}

// TestMaxFloatFromSlice verifies that the largest element of a slice is returned.
func TestMaxFloatFromSlice(t *testing.T) {
	values := []float64{1.0, 2.0, 3.0, 4.0}
	want := float64(4.0)

	got := MaxFloatFromSlice(values)

	assertEqual(t, got, want)
}

// TestAvgFloatFromSlice verifies that the mean of a slice is computed.
func TestAvgFloatFromSlice(t *testing.T) {
	values := []float64{1.0, 2.0, 3.0, 4.0}
	want := float64(2.5)

	got := AvgFloatFromSlice(values)

	assertEqual(t, got, want)
}

var testParseQueries = []datadogQueries{
{"", false, true},
// All properly formed
Expand All @@ -36,9 +61,14 @@ var testParseQueries = []datadogQueries{
{"top(per_second(abs(sum:http.requests{service:myapp,dc:us-west-2}.rollup(max, 2))), 5, 'mean', 'desc')", true, false},
{"system.cpu.user{*}.rollup(sum, 30)", true, false},
{"min:system.cpu.user{*}", true, false},
// Multi-query
{"avg:system.cpu.user{*}.rollup(sum, 30),sum:system.cpu.user{*}.rollup(30)", true, false},

// Missing filter
{"min:system.cpu.user", false, true},

// Find out last point with value
{"sum:trace.express.request.hits{*}.as_rate()/avg:kubernetes.cpu.requests{*}.rollup(10)", true, false},
}

func TestDatadogScalerParseQueries(t *testing.T) {
Expand All @@ -62,9 +92,15 @@ var testDatadogMetadata = []datadogAuthMetadataTestData{
{"", map[string]string{}, map[string]string{}, true},

// all properly formed
{"", map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "metricUnavailableValue": "1.5", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false},
{"", map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "metricUnavailableValue": "1.5", "type": "average", "age": "60", "timeWindowOffset": "30", "lastAvailablePointOffset": "1"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false},
// Multi-query all properly formed
{"", map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count(),sum:trace.redis.command.hits{env:none,service:redis}.as_count()/2", "queryValue": "7", "queryAggregator": "average", "metricUnavailableValue": "1.5", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false},
// default age
{"", map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "type": "average"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false},
// default timeWindowOffset
{"", map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "metricUnavailableValue": "1.5", "type": "average", "age": "60", "lastAvailablePointOffset": "1"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false},
// default lastAvailablePointOffset
{"", map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "metricUnavailableValue": "1.5", "type": "average", "age": "60", "timeWindowOffset": "30"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false},
// default type
{"", map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false},
// wrong type
Expand All @@ -77,6 +113,8 @@ var testDatadogMetadata = []datadogAuthMetadataTestData{
{"", map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true},
// wrong query value type
{"", map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "notanint", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true},
// wrong queryAggregator value
{"", map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "notanint", "queryAggegrator": "1.0", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true},
// wrong activation query value type
{"", map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "1", "activationQueryValue": "notanint", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true},
// malformed query
Expand Down

0 comments on commit b4ae488

Please sign in to comment.