diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ad66a69b84..2acefe8ac9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,9 +31,9 @@ - Optimize Kafka scaler's `getLagForPartition` function. ([#1464](https://github.com/kedacore/keda/pull/1464)) - Reduce unnecessary /scale requests from ScaledObject controller ([#1453](https://github.com/kedacore/keda/pull/1453)) - Add support for the WATCH_NAMESPACE environment variable to the operator ([#1474](https://github.com/kedacore/keda/pull/1474)) +- Automatically determine the RabbitMQ protocol when possible, and support setting the protocl via TriggerAuthentication ([#1459](https://github.com/kedacore/keda/pulls/1459)) - Improve performance when fetching pod information ([#1457](https://github.com/kedacore/keda/pull/1457)) - ### Breaking Changes ### Other diff --git a/pkg/scalers/rabbitmq_scaler.go b/pkg/scalers/rabbitmq_scaler.go index 60effdd4ee5..43bda488aa4 100644 --- a/pkg/scalers/rabbitmq_scaler.go +++ b/pkg/scalers/rabbitmq_scaler.go @@ -29,7 +29,8 @@ const ( const ( httpProtocol = "http" amqpProtocol = "amqp" - defaultProtocol = amqpProtocol + autoProtocol = "auto" + defaultProtocol = autoProtocol ) type rabbitMQScaler struct { @@ -99,12 +100,14 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) { // Resolve protocol type meta.protocol = defaultProtocol + if val, ok := config.AuthParams["protocol"]; ok { + meta.protocol = val + } if val, ok := config.TriggerMetadata["protocol"]; ok { - if val == amqpProtocol || val == httpProtocol { - meta.protocol = val - } else { - return nil, fmt.Errorf("the protocol has to be either `%s` or `%s` but is `%s`", amqpProtocol, httpProtocol, val) - } + meta.protocol = val + } + if meta.protocol != amqpProtocol && meta.protocol != httpProtocol && meta.protocol != autoProtocol { + return nil, fmt.Errorf("the protocol has to be either `%s`, `%s`, or `%s` but is `%s`", amqpProtocol, httpProtocol, autoProtocol, meta.protocol) } // Resolve host value @@ -119,6 +122,22 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) { return nil, fmt.Errorf("no host setting given") } + // If the protocol is auto, check the host scheme. + if meta.protocol == autoProtocol { + parsedURL, err := url.Parse(meta.host) + if err != nil { + return nil, fmt.Errorf("can't parse host to find protocol: %s", err) + } + switch parsedURL.Scheme { + case "amqp", "amqps": + meta.protocol = amqpProtocol + case "http", "https": + meta.protocol = httpProtocol + default: + return nil, fmt.Errorf("unknown host URL scheme `%s`", parsedURL.Scheme) + } + } + // Resolve queueName if val, ok := config.TriggerMetadata["queueName"]; ok { meta.queueName = val diff --git a/pkg/scalers/rabbitmq_scaler_test.go b/pkg/scalers/rabbitmq_scaler_test.go index 9f3708497a2..7bb3ec681f2 100644 --- a/pkg/scalers/rabbitmq_scaler_test.go +++ b/pkg/scalers/rabbitmq_scaler_test.go @@ -50,6 +50,14 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{ {map[string]string{"vhostName": "myVhost", "queueName": "namespace/name", "hostFromEnv": host}, false, map[string]string{}}, // vhost passed but empty {map[string]string{"vhostName": "", "queueName": "namespace/name", "hostFromEnv": host}, false, map[string]string{}}, + // protocol defined in authParams + {map[string]string{"queueName": "sample", "hostFromEnv": host}, false, map[string]string{"protocol": "http"}}, + // auto protocol and a bad URL + {map[string]string{"queueName": "sample", "host": "something://"}, true, map[string]string{}}, + // auto protocol and an HTTP URL + {map[string]string{"queueName": "sample", "host": "http://"}, false, map[string]string{}}, + // auto protocol and an HTTPS URL + {map[string]string{"queueName": "sample", "host": "https://"}, false, map[string]string{}}, } var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{ @@ -71,9 +79,9 @@ func TestRabbitMQParseMetadata(t *testing.T) { var testDefaultQueueLength = []parseRabbitMQMetadataTestData{ // use default queueLength - {map[string]string{"queueName": "sample", "host": host}, false, map[string]string{}}, + {map[string]string{"queueName": "sample", "hostFromEnv": host}, false, map[string]string{}}, // use default queueLength with includeUnacked - {map[string]string{"queueName": "sample", "host": host, "protocol": "http"}, false, map[string]string{}}, + {map[string]string{"queueName": "sample", "hostFromEnv": host, "protocol": "http"}, false, map[string]string{}}, } func TestParseDefaultQueueLength(t *testing.T) { @@ -194,7 +202,7 @@ func TestGetQueueInfo(t *testing.T) { func TestRabbitMQGetMetricSpecForScaling(t *testing.T) { for _, testData := range rabbitMQMetricIdentifiers { - meta, err := parseRabbitMQMetadata(&ScalerConfig{ResolvedEnv: map[string]string{"myHostSecret": "myHostSecret"}, TriggerMetadata: testData.metadataTestData.metadata, AuthParams: nil}) + meta, err := parseRabbitMQMetadata(&ScalerConfig{ResolvedEnv: sampleRabbitMqResolvedEnv, TriggerMetadata: testData.metadataTestData.metadata, AuthParams: nil}) if err != nil { t.Fatal("Could not parse metadata:", err) }