diff --git a/pkg/scalers/aws_cloudwatch_scaler.go b/pkg/scalers/aws_cloudwatch_scaler.go index 6634db0c7a0..b19012e3acc 100644 --- a/pkg/scalers/aws_cloudwatch_scaler.go +++ b/pkg/scalers/aws_cloudwatch_scaler.go @@ -199,16 +199,24 @@ func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) { sess := session.Must(session.NewSession(&aws.Config{ Region: aws.String(c.metadata.awsRegion), })) - creds := credentials.NewStaticCredentials(c.metadata.awsAuthorization.awsAccessKeyID, c.metadata.awsAuthorization.awsSecretAccessKey, "") - if c.metadata.awsAuthorization.awsRoleArn != "" { - creds = stscreds.NewCredentials(sess, c.metadata.awsAuthorization.awsRoleArn) - } + var cloudwatchClient *cloudwatch.CloudWatch + if c.metadata.awsAuthorization.podIdentityOwner { + creds := credentials.NewStaticCredentials(c.metadata.awsAuthorization.awsAccessKeyID, c.metadata.awsAuthorization.awsSecretAccessKey, "") + + if c.metadata.awsAuthorization.awsRoleArn != "" { + creds = stscreds.NewCredentials(sess, c.metadata.awsAuthorization.awsRoleArn) + } - cloudwatchClient := cloudwatch.New(sess, &aws.Config{ - Region: aws.String(c.metadata.awsRegion), - Credentials: creds, - }) + cloudwatchClient = cloudwatch.New(sess, &aws.Config{ + Region: aws.String(c.metadata.awsRegion), + Credentials: creds, + }) + } else { + cloudwatchClient = cloudwatch.New(sess, &aws.Config{ + Region: aws.String(c.metadata.awsRegion), + }) + } input := cloudwatch.GetMetricDataInput{ StartTime: aws.Time(time.Now().Add(time.Second * -1 * time.Duration(c.metadata.metricCollectionTime))), diff --git a/pkg/scalers/aws_cloudwatch_test.go b/pkg/scalers/aws_cloudwatch_test.go index 7fdad030a05..4ad1c10cf10 100644 --- a/pkg/scalers/aws_cloudwatch_test.go +++ b/pkg/scalers/aws_cloudwatch_test.go @@ -144,6 +144,21 @@ var testAWSCloudwatchMetadata = []parseAWSCloudwatchMetadataTestData{ }, false, "with AWS Role from TriggerAuthentication"}, + {map[string]string{ + "namespace": "AWS/SQS", + "dimensionName": "QueueName", + "dimensionValue": "keda", + "metricName": "ApproximateNumberOfMessagesVisible", + "targetMetricValue": "2", + "minMetricValue": "0", + "metricCollectionTime": "300", + "metricStat": "Average", + "metricStatPeriod": "300", + "awsRegion": "eu-west-1", + "identityOwner": "operator"}, + map[string]string{}, + false, + "with AWS Role assigned on KEDA operator itself"}, } func TestCloudwatchParseMetadata(t *testing.T) { diff --git a/pkg/scalers/aws_iam_authorization.go b/pkg/scalers/aws_iam_authorization.go index 624c50700e1..5754234b324 100644 --- a/pkg/scalers/aws_iam_authorization.go +++ b/pkg/scalers/aws_iam_authorization.go @@ -14,37 +14,44 @@ type awsAuthorizationMetadata struct { awsAccessKeyID string awsSecretAccessKey string awsSessionToken string + + podIdentityOwner bool } func getAwsAuthorization(authParams, metadata, resolvedEnv map[string]string) (awsAuthorizationMetadata, error) { meta := awsAuthorizationMetadata{} - if authParams["awsRoleArn"] != "" { - meta.awsRoleArn = authParams["awsRoleArn"] - } else if (authParams["awsAccessKeyID"] != "" || authParams["awsAccessKeyId"] != "") && authParams["awsSecretAccessKey"] != "" { - meta.awsAccessKeyID = authParams["awsAccessKeyID"] - if meta.awsAccessKeyID == "" { - meta.awsAccessKeyID = authParams["awsAccessKeyId"] - } - meta.awsSecretAccessKey = authParams["awsSecretAccessKey"] - } else { - var keyName string - if keyName = metadata["awsAccessKeyID"]; keyName == "" { - keyName = awsAccessKeyIDEnvVar - } - if val, ok := resolvedEnv[keyName]; ok && val != "" { - meta.awsAccessKeyID = val - } else { - return meta, fmt.Errorf("'%s' doesn't exist in the deployment environment", keyName) - } - - if keyName = metadata["awsSecretAccessKey"]; keyName == "" { - keyName = awsSecretAccessKeyEnvVar - } - if val, ok := resolvedEnv[keyName]; ok && val != "" { - meta.awsSecretAccessKey = val + if metadata["identityOwner"] == "operator" { + meta.podIdentityOwner = false + } else if metadata["identityOwner"] == "" || metadata["identityOwner"] == "pod" { + meta.podIdentityOwner = true + if authParams["awsRoleArn"] != "" { + meta.awsRoleArn = authParams["awsRoleArn"] + } else if (authParams["awsAccessKeyID"] != "" || authParams["awsAccessKeyId"] != "") && authParams["awsSecretAccessKey"] != "" { + meta.awsAccessKeyID = authParams["awsAccessKeyID"] + if meta.awsAccessKeyID == "" { + meta.awsAccessKeyID = authParams["awsAccessKeyId"] + } + meta.awsSecretAccessKey = authParams["awsSecretAccessKey"] } else { - return meta, fmt.Errorf("'%s' doesn't exist in the deployment environment", keyName) + var keyName string + if keyName = metadata["awsAccessKeyID"]; keyName == "" { + keyName = awsAccessKeyIDEnvVar + } + if val, ok := resolvedEnv[keyName]; ok && val != "" { + meta.awsAccessKeyID = val + } else { + return meta, fmt.Errorf("'%s' doesn't exist in the deployment environment", keyName) + } + + if keyName = metadata["awsSecretAccessKey"]; keyName == "" { + keyName = awsSecretAccessKeyEnvVar + } + if val, ok := resolvedEnv[keyName]; ok && val != "" { + meta.awsSecretAccessKey = val + } else { + return meta, fmt.Errorf("'%s' doesn't exist in the deployment environment", keyName) + } } } diff --git a/pkg/scalers/aws_kinesis_stream_scaler.go b/pkg/scalers/aws_kinesis_stream_scaler.go index 846d4ccac1d..8740f427e36 100644 --- a/pkg/scalers/aws_kinesis_stream_scaler.go +++ b/pkg/scalers/aws_kinesis_stream_scaler.go @@ -135,16 +135,24 @@ func (s *awsKinesisStreamScaler) GetAwsKinesisOpenShardCount() (int64, error) { sess := session.Must(session.NewSession(&aws.Config{ Region: aws.String(s.metadata.awsRegion), })) - creds := credentials.NewStaticCredentials(s.metadata.awsAuthorization.awsAccessKeyID, s.metadata.awsAuthorization.awsSecretAccessKey, "") - if s.metadata.awsAuthorization.awsRoleArn != "" { - creds = stscreds.NewCredentials(sess, s.metadata.awsAuthorization.awsRoleArn) - } + var kinesisClinent *kinesis.Kinesis + if s.metadata.awsAuthorization.podIdentityOwner { + creds := credentials.NewStaticCredentials(s.metadata.awsAuthorization.awsAccessKeyID, s.metadata.awsAuthorization.awsSecretAccessKey, "") + + if s.metadata.awsAuthorization.awsRoleArn != "" { + creds = stscreds.NewCredentials(sess, s.metadata.awsAuthorization.awsRoleArn) + } - kinesisClinent := kinesis.New(sess, &aws.Config{ - Region: aws.String(s.metadata.awsRegion), - Credentials: creds, - }) + kinesisClinent = kinesis.New(sess, &aws.Config{ + Region: aws.String(s.metadata.awsRegion), + Credentials: creds, + }) + } else { + kinesisClinent = kinesis.New(sess, &aws.Config{ + Region: aws.String(s.metadata.awsRegion), + }) + } output, err := kinesisClinent.DescribeStreamSummary(input) if err != nil { diff --git a/pkg/scalers/aws_kinesis_stream_test.go b/pkg/scalers/aws_kinesis_stream_test.go index eb62bc99c67..ba55f3df1ff 100644 --- a/pkg/scalers/aws_kinesis_stream_test.go +++ b/pkg/scalers/aws_kinesis_stream_test.go @@ -51,6 +51,7 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{ awsAuthorization: awsAuthorizationMetadata{ awsAccessKeyID: testAWSKinesisAccessKeyID, awsSecretAccessKey: testAWSKinesisSecretAccessKey, + podIdentityOwner: true, }, }, isError: false, @@ -86,6 +87,7 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{ awsAuthorization: awsAuthorizationMetadata{ awsAccessKeyID: testAWSKinesisAccessKeyID, awsSecretAccessKey: testAWSKinesisSecretAccessKey, + podIdentityOwner: true, }, }, isError: false, @@ -103,6 +105,7 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{ awsAuthorization: awsAuthorizationMetadata{ awsAccessKeyID: testAWSKinesisAccessKeyID, awsSecretAccessKey: testAWSKinesisSecretAccessKey, + podIdentityOwner: true, }, }, isError: false, @@ -143,11 +146,28 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{ streamName: testAWSKinesisStreamName, awsRegion: testAWSRegion, awsAuthorization: awsAuthorizationMetadata{ - awsRoleArn: testAWSKinesisRoleArn, + awsRoleArn: testAWSKinesisRoleArn, + podIdentityOwner: true, }, }, isError: false, comment: "with AWS Role from TriggerAuthentication"}, + {metadata: map[string]string{ + "streamName": testAWSKinesisStreamName, + "shardCount": "2", + "awsRegion": testAWSRegion, + "identityOwner": "operator"}, + authParams: map[string]string{}, + expected: &awsKinesisStreamMetadata{ + targetShardCount: 2, + streamName: testAWSKinesisStreamName, + awsRegion: testAWSRegion, + awsAuthorization: awsAuthorizationMetadata{ + podIdentityOwner: false, + }, + }, + isError: false, + comment: "with AWS Role assigned on KEDA operator itself"}, } func TestKinesisParseMetadata(t *testing.T) { diff --git a/pkg/scalers/aws_sqs_queue_scaler.go b/pkg/scalers/aws_sqs_queue_scaler.go index 9834c8e6266..69f18b31d0c 100644 --- a/pkg/scalers/aws_sqs_queue_scaler.go +++ b/pkg/scalers/aws_sqs_queue_scaler.go @@ -152,16 +152,24 @@ func (s *awsSqsQueueScaler) GetAwsSqsQueueLength() (int32, error) { sess := session.Must(session.NewSession(&aws.Config{ Region: aws.String(s.metadata.awsRegion), })) - creds := credentials.NewStaticCredentials(s.metadata.awsAuthorization.awsAccessKeyID, s.metadata.awsAuthorization.awsSecretAccessKey, "") - if s.metadata.awsAuthorization.awsRoleArn != "" { - creds = stscreds.NewCredentials(sess, s.metadata.awsAuthorization.awsRoleArn) - } + var sqsClient *sqs.SQS + if s.metadata.awsAuthorization.podIdentityOwner { + creds := credentials.NewStaticCredentials(s.metadata.awsAuthorization.awsAccessKeyID, s.metadata.awsAuthorization.awsSecretAccessKey, "") + + if s.metadata.awsAuthorization.awsRoleArn != "" { + creds = stscreds.NewCredentials(sess, s.metadata.awsAuthorization.awsRoleArn) + } - sqsClient := sqs.New(sess, &aws.Config{ - Region: aws.String(s.metadata.awsRegion), - Credentials: creds, - }) + sqsClient = sqs.New(sess, &aws.Config{ + Region: aws.String(s.metadata.awsRegion), + Credentials: creds, + }) + } else { + sqsClient = sqs.New(sess, &aws.Config{ + Region: aws.String(s.metadata.awsRegion), + }) + } output, err := sqsClient.GetQueueAttributes(input) if err != nil { diff --git a/pkg/scalers/aws_sqs_queue_test.go b/pkg/scalers/aws_sqs_queue_test.go index 7c143a5bd61..0b55484e889 100644 --- a/pkg/scalers/aws_sqs_queue_test.go +++ b/pkg/scalers/aws_sqs_queue_test.go @@ -117,6 +117,17 @@ var testAWSSQSMetadata = []parseAWSSQSMetadataTestData{ }, false, "with AWS Role from TriggerAuthentication"}, + {map[string]string{ + "queueURL": testAWSSQSProperQueueURL, + "queueLength": "1", + "awsRegion": "eu-west-1", + "identityOwner": "operator"}, + map[string]string{ + "awsAccessKeyId": "", + "awsSecretAccessKey": "", + }, + false, + "with AWS Role assigned on KEDA operator itself"}, } func TestSQSParseMetadata(t *testing.T) {