Skip to content

Commit

Permalink
aws: Reference ScaledObject's/ScaledJob's name in the scalers log (#3488
Browse files Browse the repository at this point in the history
)

Signed-off-by: Zbynek Roubalik <[email protected]>
  • Loading branch information
zroubalik authored Aug 3, 2022
1 parent f79df64 commit b85c558
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 104 deletions.
72 changes: 36 additions & 36 deletions pkg/scalers/aws_cloudwatch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/go-logr/logr"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)
Expand All @@ -32,6 +32,7 @@ type awsCloudwatchScaler struct {
metricType v2beta2.MetricTargetType
metadata *awsCloudwatchMetadata
cwClient cloudwatchiface.CloudWatchAPI
logger logr.Logger
}

type awsCloudwatchMetadata struct {
Expand All @@ -58,8 +59,6 @@ type awsCloudwatchMetadata struct {
scalerIndex int
}

var cloudwatchLog = logf.Log.WithName("aws_cloudwatch_scaler")

// NewAwsCloudwatchScaler creates a new awsCloudwatchScaler
func NewAwsCloudwatchScaler(config *ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
Expand All @@ -76,6 +75,7 @@ func NewAwsCloudwatchScaler(config *ScalerConfig) (Scaler, error) {
metricType: metricType,
metadata: meta,
cwClient: createCloudwatchClient(meta),
logger: InitializeLogger(config, "aws_cloudwatch_scaler"),
}, nil
}

Expand Down Expand Up @@ -290,11 +290,11 @@ func computeQueryWindow(current time.Time, metricPeriodSec, metricEndTimeOffsetS
return
}

func (c *awsCloudwatchScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
metricValue, err := c.GetCloudwatchMetrics()
func (s *awsCloudwatchScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
metricValue, err := s.GetCloudwatchMetrics()

if err != nil {
cloudwatchLog.Error(err, "Error getting metric value")
s.logger.Error(err, "Error getting metric value")
return []external_metrics.ExternalMetricValue{}, err
}

Expand All @@ -303,70 +303,70 @@ func (c *awsCloudwatchScaler) GetMetrics(ctx context.Context, metricName string,
return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (c *awsCloudwatchScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
func (s *awsCloudwatchScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
var metricNameSuffix string

if c.metadata.expression != "" {
metricNameSuffix = c.metadata.metricsName
if s.metadata.expression != "" {
metricNameSuffix = s.metadata.metricsName
} else {
metricNameSuffix = c.metadata.dimensionName[0]
metricNameSuffix = s.metadata.dimensionName[0]
}

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(c.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-cloudwatch-%s", metricNameSuffix))),
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-cloudwatch-%s", metricNameSuffix))),
},
Target: GetMetricTargetMili(c.metricType, c.metadata.targetMetricValue),
Target: GetMetricTargetMili(s.metricType, s.metadata.targetMetricValue),
}
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta2.MetricSpec{metricSpec}
}

func (c *awsCloudwatchScaler) IsActive(ctx context.Context) (bool, error) {
val, err := c.GetCloudwatchMetrics()
func (s *awsCloudwatchScaler) IsActive(ctx context.Context) (bool, error) {
val, err := s.GetCloudwatchMetrics()

if err != nil {
return false, err
}

return val > c.metadata.activationTargetMetricValue, nil
return val > s.metadata.activationTargetMetricValue, nil
}

func (c *awsCloudwatchScaler) Close(context.Context) error {
func (s *awsCloudwatchScaler) Close(context.Context) error {
return nil
}

func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
func (s *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
var input cloudwatch.GetMetricDataInput

startTime, endTime := computeQueryWindow(time.Now(), c.metadata.metricStatPeriod, c.metadata.metricEndTimeOffset, c.metadata.metricCollectionTime)
startTime, endTime := computeQueryWindow(time.Now(), s.metadata.metricStatPeriod, s.metadata.metricEndTimeOffset, s.metadata.metricCollectionTime)

if c.metadata.expression != "" {
if s.metadata.expression != "" {
input = cloudwatch.GetMetricDataInput{
StartTime: aws.Time(startTime),
EndTime: aws.Time(endTime),
ScanBy: aws.String(cloudwatch.ScanByTimestampDescending),
MetricDataQueries: []*cloudwatch.MetricDataQuery{
{
Expression: aws.String(c.metadata.expression),
Expression: aws.String(s.metadata.expression),
Id: aws.String("q1"),
Period: aws.Int64(c.metadata.metricStatPeriod),
Label: aws.String(c.metadata.metricsName),
Period: aws.Int64(s.metadata.metricStatPeriod),
Label: aws.String(s.metadata.metricsName),
},
},
}
} else {
dimensions := []*cloudwatch.Dimension{}
for i := range c.metadata.dimensionName {
for i := range s.metadata.dimensionName {
dimensions = append(dimensions, &cloudwatch.Dimension{
Name: &c.metadata.dimensionName[i],
Value: &c.metadata.dimensionValue[i],
Name: &s.metadata.dimensionName[i],
Value: &s.metadata.dimensionValue[i],
})
}

var metricUnit *string
if c.metadata.metricUnit != "" {
metricUnit = aws.String(c.metadata.metricUnit)
if s.metadata.metricUnit != "" {
metricUnit = aws.String(s.metadata.metricUnit)
}

input = cloudwatch.GetMetricDataInput{
Expand All @@ -378,12 +378,12 @@ func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
Id: aws.String("c1"),
MetricStat: &cloudwatch.MetricStat{
Metric: &cloudwatch.Metric{
Namespace: aws.String(c.metadata.namespace),
Namespace: aws.String(s.metadata.namespace),
Dimensions: dimensions,
MetricName: aws.String(c.metadata.metricsName),
MetricName: aws.String(s.metadata.metricsName),
},
Period: aws.Int64(c.metadata.metricStatPeriod),
Stat: aws.String(c.metadata.metricStat),
Period: aws.Int64(s.metadata.metricStatPeriod),
Stat: aws.String(s.metadata.metricStat),
Unit: metricUnit,
},
ReturnData: aws.Bool(true),
Expand All @@ -392,20 +392,20 @@ func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
}
}

output, err := c.cwClient.GetMetricData(&input)
output, err := s.cwClient.GetMetricData(&input)

if err != nil {
cloudwatchLog.Error(err, "Failed to get output")
s.logger.Error(err, "Failed to get output")
return -1, err
}

cloudwatchLog.V(1).Info("Received Metric Data", "data", output)
s.logger.V(1).Info("Received Metric Data", "data", output)
var metricValue float64
if len(output.MetricDataResults) > 0 && len(output.MetricDataResults[0].Values) > 0 {
metricValue = *output.MetricDataResults[0].Values[0]
} else {
cloudwatchLog.Info("empty metric data received, returning minMetricValue")
metricValue = c.metadata.minMetricValue
s.logger.Info("empty metric data received, returning minMetricValue")
metricValue = s.metadata.minMetricValue
}

return metricValue, nil
Expand Down
5 changes: 3 additions & 2 deletions pkg/scalers/aws_cloudwatch_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/labels"
)
Expand Down Expand Up @@ -487,7 +488,7 @@ func TestAWSCloudwatchGetMetricSpecForScaling(t *testing.T) {
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockAWSCloudwatchScaler := awsCloudwatchScaler{"", meta, &mockCloudwatch{}}
mockAWSCloudwatchScaler := awsCloudwatchScaler{"", meta, &mockCloudwatch{}, logr.Discard()}

metricSpec := mockAWSCloudwatchScaler.GetMetricSpecForScaling(ctx)
metricName := metricSpec[0].External.Metric.Name
Expand All @@ -500,7 +501,7 @@ func TestAWSCloudwatchGetMetricSpecForScaling(t *testing.T) {
func TestAWSCloudwatchScalerGetMetrics(t *testing.T) {
var selector labels.Selector
for _, meta := range awsCloudwatchGetMetricTestData {
mockAWSCloudwatchScaler := awsCloudwatchScaler{"", &meta, &mockCloudwatch{}}
mockAWSCloudwatchScaler := awsCloudwatchScaler{"", &meta, &mockCloudwatch{}, logr.Discard()}
value, err := mockAWSCloudwatchScaler.GetMetrics(context.Background(), meta.metricsName, selector)
switch meta.metricsName {
case testAWSCloudwatchErrorMetric:
Expand Down
40 changes: 20 additions & 20 deletions pkg/scalers/aws_dynamodb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/go-logr/logr"
"go.mongodb.org/mongo-driver/bson"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)
Expand All @@ -26,6 +26,7 @@ type awsDynamoDBScaler struct {
metricType v2beta2.MetricTargetType
metadata *awsDynamoDBMetadata
dbClient dynamodbiface.DynamoDBAPI
logger logr.Logger
}

type awsDynamoDBMetadata struct {
Expand All @@ -41,8 +42,6 @@ type awsDynamoDBMetadata struct {
metricName string
}

var dynamoDBLog = logf.Log.WithName("aws_dynamodb_scaler")

func NewAwsDynamoDBScaler(config *ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
Expand All @@ -58,6 +57,7 @@ func NewAwsDynamoDBScaler(config *ScalerConfig) (Scaler, error) {
metricType: metricType,
metadata: meta,
dbClient: createDynamoDBClient(meta),
logger: InitializeLogger(config, "aws_dynamodb_scaler"),
}, nil
}

Expand Down Expand Up @@ -171,10 +171,10 @@ func createDynamoDBClient(meta *awsDynamoDBMetadata) *dynamodb.DynamoDB {
return dbClient
}

func (c *awsDynamoDBScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
metricValue, err := c.GetQueryMetrics()
func (s *awsDynamoDBScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
metricValue, err := s.GetQueryMetrics()
if err != nil {
dynamoDBLog.Error(err, "Error getting metric value")
s.logger.Error(err, "Error getting metric value")
return []external_metrics.ExternalMetricValue{}, err
}

Expand All @@ -183,12 +183,12 @@ func (c *awsDynamoDBScaler) GetMetrics(ctx context.Context, metricName string, m
return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (c *awsDynamoDBScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
func (s *awsDynamoDBScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: c.metadata.metricName,
Name: s.metadata.metricName,
},
Target: GetMetricTarget(c.metricType, c.metadata.targetValue),
Target: GetMetricTarget(s.metricType, s.metadata.targetValue),
}
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType}

Expand All @@ -197,30 +197,30 @@ func (c *awsDynamoDBScaler) GetMetricSpecForScaling(context.Context) []v2beta2.M
}
}

func (c *awsDynamoDBScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := c.GetQueryMetrics()
func (s *awsDynamoDBScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := s.GetQueryMetrics()
if err != nil {
return false, fmt.Errorf("error inspecting aws-dynamodb: %s", err)
}

return messages > float64(c.metadata.activationTargetValue), nil
return messages > float64(s.metadata.activationTargetValue), nil
}

func (c *awsDynamoDBScaler) Close(context.Context) error {
func (s *awsDynamoDBScaler) Close(context.Context) error {
return nil
}

func (c *awsDynamoDBScaler) GetQueryMetrics() (float64, error) {
func (s *awsDynamoDBScaler) GetQueryMetrics() (float64, error) {
dimensions := dynamodb.QueryInput{
TableName: aws.String(c.metadata.tableName),
KeyConditionExpression: aws.String(c.metadata.keyConditionExpression),
ExpressionAttributeNames: c.metadata.expressionAttributeNames,
ExpressionAttributeValues: c.metadata.expressionAttributeValues,
TableName: aws.String(s.metadata.tableName),
KeyConditionExpression: aws.String(s.metadata.keyConditionExpression),
ExpressionAttributeNames: s.metadata.expressionAttributeNames,
ExpressionAttributeValues: s.metadata.expressionAttributeValues,
}

res, err := c.dbClient.Query(&dimensions)
res, err := s.dbClient.Query(&dimensions)
if err != nil {
dynamoDBLog.Error(err, "Failed to get output")
s.logger.Error(err, "Failed to get output")
return 0, err
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/scalers/aws_dynamodb_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/labels"
)
Expand Down Expand Up @@ -275,7 +276,7 @@ func TestDynamoGetMetrics(t *testing.T) {

for _, meta := range awsDynamoDBGetMetricTestData {
t.Run(meta.tableName, func(t *testing.T) {
scaler := awsDynamoDBScaler{"", &meta, &mockDynamoDB{}}
scaler := awsDynamoDBScaler{"", &meta, &mockDynamoDB{}, logr.Discard()}

value, err := scaler.GetMetrics(context.Background(), "aws-dynamodb", selector)
switch meta.tableName {
Expand All @@ -293,7 +294,7 @@ func TestDynamoGetMetrics(t *testing.T) {
func TestDynamoGetQueryMetrics(t *testing.T) {
for _, meta := range awsDynamoDBGetMetricTestData {
t.Run(meta.tableName, func(t *testing.T) {
scaler := awsDynamoDBScaler{"", &meta, &mockDynamoDB{}}
scaler := awsDynamoDBScaler{"", &meta, &mockDynamoDB{}, logr.Discard()}

value, err := scaler.GetQueryMetrics()
switch meta.tableName {
Expand All @@ -311,7 +312,7 @@ func TestDynamoGetQueryMetrics(t *testing.T) {
func TestDynamoIsActive(t *testing.T) {
for _, meta := range awsDynamoDBGetMetricTestData {
t.Run(meta.tableName, func(t *testing.T) {
scaler := awsDynamoDBScaler{"", &meta, &mockDynamoDB{}}
scaler := awsDynamoDBScaler{"", &meta, &mockDynamoDB{}, logr.Discard()}

value, err := scaler.IsActive(context.Background())
switch meta.tableName {
Expand Down
Loading

0 comments on commit b85c558

Please sign in to comment.