Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aws: Reference ScaledObject's/ScaledJob's name in the scalers log #3488

Merged
merged 1 commit into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 36 additions & 36 deletions pkg/scalers/aws_cloudwatch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/go-logr/logr"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)
Expand All @@ -32,6 +32,7 @@ type awsCloudwatchScaler struct {
metricType v2beta2.MetricTargetType
metadata *awsCloudwatchMetadata
cwClient cloudwatchiface.CloudWatchAPI
logger logr.Logger
}

type awsCloudwatchMetadata struct {
Expand All @@ -58,8 +59,6 @@ type awsCloudwatchMetadata struct {
scalerIndex int
}

var cloudwatchLog = logf.Log.WithName("aws_cloudwatch_scaler")

// NewAwsCloudwatchScaler creates a new awsCloudwatchScaler
func NewAwsCloudwatchScaler(config *ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
Expand All @@ -76,6 +75,7 @@ func NewAwsCloudwatchScaler(config *ScalerConfig) (Scaler, error) {
metricType: metricType,
metadata: meta,
cwClient: createCloudwatchClient(meta),
logger: InitializeLogger(config, "aws_cloudwatch_scaler"),
}, nil
}

Expand Down Expand Up @@ -290,11 +290,11 @@ func computeQueryWindow(current time.Time, metricPeriodSec, metricEndTimeOffsetS
return
}

func (c *awsCloudwatchScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
metricValue, err := c.GetCloudwatchMetrics()
func (s *awsCloudwatchScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
metricValue, err := s.GetCloudwatchMetrics()

if err != nil {
cloudwatchLog.Error(err, "Error getting metric value")
s.logger.Error(err, "Error getting metric value")
return []external_metrics.ExternalMetricValue{}, err
}

Expand All @@ -303,70 +303,70 @@ func (c *awsCloudwatchScaler) GetMetrics(ctx context.Context, metricName string,
return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (c *awsCloudwatchScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
func (s *awsCloudwatchScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
var metricNameSuffix string

if c.metadata.expression != "" {
metricNameSuffix = c.metadata.metricsName
if s.metadata.expression != "" {
metricNameSuffix = s.metadata.metricsName
} else {
metricNameSuffix = c.metadata.dimensionName[0]
metricNameSuffix = s.metadata.dimensionName[0]
}

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(c.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-cloudwatch-%s", metricNameSuffix))),
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-cloudwatch-%s", metricNameSuffix))),
},
Target: GetMetricTargetMili(c.metricType, c.metadata.targetMetricValue),
Target: GetMetricTargetMili(s.metricType, s.metadata.targetMetricValue),
}
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta2.MetricSpec{metricSpec}
}

func (c *awsCloudwatchScaler) IsActive(ctx context.Context) (bool, error) {
val, err := c.GetCloudwatchMetrics()
func (s *awsCloudwatchScaler) IsActive(ctx context.Context) (bool, error) {
val, err := s.GetCloudwatchMetrics()

if err != nil {
return false, err
}

return val > c.metadata.activationTargetMetricValue, nil
return val > s.metadata.activationTargetMetricValue, nil
}

func (c *awsCloudwatchScaler) Close(context.Context) error {
func (s *awsCloudwatchScaler) Close(context.Context) error {
return nil
}

func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
func (s *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
var input cloudwatch.GetMetricDataInput

startTime, endTime := computeQueryWindow(time.Now(), c.metadata.metricStatPeriod, c.metadata.metricEndTimeOffset, c.metadata.metricCollectionTime)
startTime, endTime := computeQueryWindow(time.Now(), s.metadata.metricStatPeriod, s.metadata.metricEndTimeOffset, s.metadata.metricCollectionTime)

if c.metadata.expression != "" {
if s.metadata.expression != "" {
input = cloudwatch.GetMetricDataInput{
StartTime: aws.Time(startTime),
EndTime: aws.Time(endTime),
ScanBy: aws.String(cloudwatch.ScanByTimestampDescending),
MetricDataQueries: []*cloudwatch.MetricDataQuery{
{
Expression: aws.String(c.metadata.expression),
Expression: aws.String(s.metadata.expression),
Id: aws.String("q1"),
Period: aws.Int64(c.metadata.metricStatPeriod),
Label: aws.String(c.metadata.metricsName),
Period: aws.Int64(s.metadata.metricStatPeriod),
Label: aws.String(s.metadata.metricsName),
},
},
}
} else {
dimensions := []*cloudwatch.Dimension{}
for i := range c.metadata.dimensionName {
for i := range s.metadata.dimensionName {
dimensions = append(dimensions, &cloudwatch.Dimension{
Name: &c.metadata.dimensionName[i],
Value: &c.metadata.dimensionValue[i],
Name: &s.metadata.dimensionName[i],
Value: &s.metadata.dimensionValue[i],
})
}

var metricUnit *string
if c.metadata.metricUnit != "" {
metricUnit = aws.String(c.metadata.metricUnit)
if s.metadata.metricUnit != "" {
metricUnit = aws.String(s.metadata.metricUnit)
}

input = cloudwatch.GetMetricDataInput{
Expand All @@ -378,12 +378,12 @@ func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
Id: aws.String("c1"),
MetricStat: &cloudwatch.MetricStat{
Metric: &cloudwatch.Metric{
Namespace: aws.String(c.metadata.namespace),
Namespace: aws.String(s.metadata.namespace),
Dimensions: dimensions,
MetricName: aws.String(c.metadata.metricsName),
MetricName: aws.String(s.metadata.metricsName),
},
Period: aws.Int64(c.metadata.metricStatPeriod),
Stat: aws.String(c.metadata.metricStat),
Period: aws.Int64(s.metadata.metricStatPeriod),
Stat: aws.String(s.metadata.metricStat),
Unit: metricUnit,
},
ReturnData: aws.Bool(true),
Expand All @@ -392,20 +392,20 @@ func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
}
}

output, err := c.cwClient.GetMetricData(&input)
output, err := s.cwClient.GetMetricData(&input)

if err != nil {
cloudwatchLog.Error(err, "Failed to get output")
s.logger.Error(err, "Failed to get output")
return -1, err
}

cloudwatchLog.V(1).Info("Received Metric Data", "data", output)
s.logger.V(1).Info("Received Metric Data", "data", output)
var metricValue float64
if len(output.MetricDataResults) > 0 && len(output.MetricDataResults[0].Values) > 0 {
metricValue = *output.MetricDataResults[0].Values[0]
} else {
cloudwatchLog.Info("empty metric data received, returning minMetricValue")
metricValue = c.metadata.minMetricValue
s.logger.Info("empty metric data received, returning minMetricValue")
metricValue = s.metadata.minMetricValue
}

return metricValue, nil
Expand Down
5 changes: 3 additions & 2 deletions pkg/scalers/aws_cloudwatch_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/labels"
)
Expand Down Expand Up @@ -487,7 +488,7 @@ func TestAWSCloudwatchGetMetricSpecForScaling(t *testing.T) {
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockAWSCloudwatchScaler := awsCloudwatchScaler{"", meta, &mockCloudwatch{}}
mockAWSCloudwatchScaler := awsCloudwatchScaler{"", meta, &mockCloudwatch{}, logr.Discard()}

metricSpec := mockAWSCloudwatchScaler.GetMetricSpecForScaling(ctx)
metricName := metricSpec[0].External.Metric.Name
Expand All @@ -500,7 +501,7 @@ func TestAWSCloudwatchGetMetricSpecForScaling(t *testing.T) {
func TestAWSCloudwatchScalerGetMetrics(t *testing.T) {
var selector labels.Selector
for _, meta := range awsCloudwatchGetMetricTestData {
mockAWSCloudwatchScaler := awsCloudwatchScaler{"", &meta, &mockCloudwatch{}}
mockAWSCloudwatchScaler := awsCloudwatchScaler{"", &meta, &mockCloudwatch{}, logr.Discard()}
value, err := mockAWSCloudwatchScaler.GetMetrics(context.Background(), meta.metricsName, selector)
switch meta.metricsName {
case testAWSCloudwatchErrorMetric:
Expand Down
40 changes: 20 additions & 20 deletions pkg/scalers/aws_dynamodb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/go-logr/logr"
"go.mongodb.org/mongo-driver/bson"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)
Expand All @@ -26,6 +26,7 @@ type awsDynamoDBScaler struct {
metricType v2beta2.MetricTargetType
metadata *awsDynamoDBMetadata
dbClient dynamodbiface.DynamoDBAPI
logger logr.Logger
}

type awsDynamoDBMetadata struct {
Expand All @@ -41,8 +42,6 @@ type awsDynamoDBMetadata struct {
metricName string
}

var dynamoDBLog = logf.Log.WithName("aws_dynamodb_scaler")

func NewAwsDynamoDBScaler(config *ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
Expand All @@ -58,6 +57,7 @@ func NewAwsDynamoDBScaler(config *ScalerConfig) (Scaler, error) {
metricType: metricType,
metadata: meta,
dbClient: createDynamoDBClient(meta),
logger: InitializeLogger(config, "aws_dynamodb_scaler"),
}, nil
}

Expand Down Expand Up @@ -171,10 +171,10 @@ func createDynamoDBClient(meta *awsDynamoDBMetadata) *dynamodb.DynamoDB {
return dbClient
}

func (c *awsDynamoDBScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
metricValue, err := c.GetQueryMetrics()
func (s *awsDynamoDBScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
metricValue, err := s.GetQueryMetrics()
if err != nil {
dynamoDBLog.Error(err, "Error getting metric value")
s.logger.Error(err, "Error getting metric value")
return []external_metrics.ExternalMetricValue{}, err
}

Expand All @@ -183,12 +183,12 @@ func (c *awsDynamoDBScaler) GetMetrics(ctx context.Context, metricName string, m
return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (c *awsDynamoDBScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
func (s *awsDynamoDBScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: c.metadata.metricName,
Name: s.metadata.metricName,
},
Target: GetMetricTarget(c.metricType, c.metadata.targetValue),
Target: GetMetricTarget(s.metricType, s.metadata.targetValue),
}
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType}

Expand All @@ -197,30 +197,30 @@ func (c *awsDynamoDBScaler) GetMetricSpecForScaling(context.Context) []v2beta2.M
}
}

func (c *awsDynamoDBScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := c.GetQueryMetrics()
func (s *awsDynamoDBScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := s.GetQueryMetrics()
if err != nil {
return false, fmt.Errorf("error inspecting aws-dynamodb: %s", err)
}

return messages > float64(c.metadata.activationTargetValue), nil
return messages > float64(s.metadata.activationTargetValue), nil
}

func (c *awsDynamoDBScaler) Close(context.Context) error {
func (s *awsDynamoDBScaler) Close(context.Context) error {
return nil
}

func (c *awsDynamoDBScaler) GetQueryMetrics() (float64, error) {
func (s *awsDynamoDBScaler) GetQueryMetrics() (float64, error) {
dimensions := dynamodb.QueryInput{
TableName: aws.String(c.metadata.tableName),
KeyConditionExpression: aws.String(c.metadata.keyConditionExpression),
ExpressionAttributeNames: c.metadata.expressionAttributeNames,
ExpressionAttributeValues: c.metadata.expressionAttributeValues,
TableName: aws.String(s.metadata.tableName),
KeyConditionExpression: aws.String(s.metadata.keyConditionExpression),
ExpressionAttributeNames: s.metadata.expressionAttributeNames,
ExpressionAttributeValues: s.metadata.expressionAttributeValues,
}

res, err := c.dbClient.Query(&dimensions)
res, err := s.dbClient.Query(&dimensions)
if err != nil {
dynamoDBLog.Error(err, "Failed to get output")
s.logger.Error(err, "Failed to get output")
return 0, err
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/scalers/aws_dynamodb_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/labels"
)
Expand Down Expand Up @@ -275,7 +276,7 @@ func TestDynamoGetMetrics(t *testing.T) {

for _, meta := range awsDynamoDBGetMetricTestData {
t.Run(meta.tableName, func(t *testing.T) {
scaler := awsDynamoDBScaler{"", &meta, &mockDynamoDB{}}
scaler := awsDynamoDBScaler{"", &meta, &mockDynamoDB{}, logr.Discard()}

value, err := scaler.GetMetrics(context.Background(), "aws-dynamodb", selector)
switch meta.tableName {
Expand All @@ -293,7 +294,7 @@ func TestDynamoGetMetrics(t *testing.T) {
func TestDynamoGetQueryMetrics(t *testing.T) {
for _, meta := range awsDynamoDBGetMetricTestData {
t.Run(meta.tableName, func(t *testing.T) {
scaler := awsDynamoDBScaler{"", &meta, &mockDynamoDB{}}
scaler := awsDynamoDBScaler{"", &meta, &mockDynamoDB{}, logr.Discard()}

value, err := scaler.GetQueryMetrics()
switch meta.tableName {
Expand All @@ -311,7 +312,7 @@ func TestDynamoGetQueryMetrics(t *testing.T) {
func TestDynamoIsActive(t *testing.T) {
for _, meta := range awsDynamoDBGetMetricTestData {
t.Run(meta.tableName, func(t *testing.T) {
scaler := awsDynamoDBScaler{"", &meta, &mockDynamoDB{}}
scaler := awsDynamoDBScaler{"", &meta, &mockDynamoDB{}, logr.Discard()}

value, err := scaler.IsActive(context.Background())
switch meta.tableName {
Expand Down
Loading