diff --git a/CHANGELOG.md b/CHANGELOG.md
index b38dabcce81..fc2a7f61b7c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -35,11 +35,14 @@
 - Escape `queueName` and `vhostName` in RabbitMQ Scaler before use them in query string (bug fix) ([#2055](https://github.com/kedacore/keda/pull/2055))
 - TriggerAuthentication/Vault: add support for HashiCorp Vault namespace (Vault Enterprise) ([#2085](https://github.com/kedacore/keda/pull/2085))
 - Add custom http timeout in RabbitMQ Scaler ([#2086](https://github.com/kedacore/keda/pull/2086))
+- Artemis Scaler parses out broker config parameters in case `restAPITemplate` is given ([#2104](https://github.com/kedacore/keda/pull/2104))
 - Add support to get connection data from Trigger Authorization in MongoDB Scaler ([#2115](https://github.com/kedacore/keda/pull/2115))
 - Add support to get connection data from Trigger Authorization in MySQL Scaler ([#2113](https://github.com/kedacore/keda/pull/2113))
 - Add support to get connection data from Trigger Authorization in MSSQL Scaler ([#2112](https://github.com/kedacore/keda/pull/2112))
 - Add support to get connection data from Trigger Authorization in PostgreSQL Scaler ([#2114](https://github.com/kedacore/keda/pull/2114))
 - Add support to provide the metric name in Azure Log Analytics Scaler ([#2106](https://github.com/kedacore/keda/pull/2106))
+- Add `pageSize` (using regex) in RabbitMQ Scaler ([#2162](https://github.com/kedacore/keda/pull/2162))
+- Add `unsafeSsl` parameter in InfluxDB scaler ([#2157](https://github.com/kedacore/keda/pull/2157))
 - Improve metric name creation to be unique using scaler index inside the scaler ([#2161](https://github.com/kedacore/keda/pull/2161))
 
 ### Breaking Changes
diff --git a/pkg/scalers/artemis_scaler.go b/pkg/scalers/artemis_scaler.go
index 30c5106c554..a9c1fc7c16e 100644
--- a/pkg/scalers/artemis_scaler.go
+++ b/pkg/scalers/artemis_scaler.go
@@ -6,6 +6,7 @@ import (
 	"errors"
 	"fmt"
 	"net/http"
+	"net/url"
 	"strconv"
 	"strings"
 
@@ -80,14 +81,32 @@ func parseArtemisMetadata(config *ScalerConfig) (*artemisMetadata, error) {
 
 	if val, ok := config.TriggerMetadata["restApiTemplate"]; ok && val != "" {
 		meta.restAPITemplate = config.TriggerMetadata["restApiTemplate"]
+		var err error
+		if meta, err = getAPIParameters(meta); err != nil {
+			return nil, fmt.Errorf("can't parse restApiTemplate: %s", err)
+		}
 	} else {
 		meta.restAPITemplate = defaultRestAPITemplate
-	}
+		if config.TriggerMetadata["managementEndpoint"] == "" {
+			return nil, errors.New("no management endpoint given")
+		}
+		meta.managementEndpoint = config.TriggerMetadata["managementEndpoint"]
+
+		if config.TriggerMetadata["queueName"] == "" {
+			return nil, errors.New("no queue name given")
+		}
+		meta.queueName = config.TriggerMetadata["queueName"]
+
+		if config.TriggerMetadata["brokerName"] == "" {
+			return nil, errors.New("no broker name given")
+		}
+		meta.brokerName = config.TriggerMetadata["brokerName"]
 
-	if config.TriggerMetadata["managementEndpoint"] == "" {
-		return nil, errors.New("no management endpoint given")
+		if config.TriggerMetadata["brokerAddress"] == "" {
+			return nil, errors.New("no broker address given")
+		}
+		meta.brokerAddress = config.TriggerMetadata["brokerAddress"]
 	}
-	meta.managementEndpoint = config.TriggerMetadata["managementEndpoint"]
 
 	if val, ok := config.TriggerMetadata["corsHeader"]; ok && val != "" {
 		meta.corsHeader = config.TriggerMetadata["corsHeader"]
@@ -95,21 +114,6 @@ func parseArtemisMetadata(config *ScalerConfig) (*artemisMetadata, error) {
 		meta.corsHeader = fmt.Sprintf(defaultCorsHeader, meta.managementEndpoint)
 	}
 
-	if config.TriggerMetadata["queueName"] == "" {
-		return nil, errors.New("no queue name given")
-	}
-	meta.queueName = config.TriggerMetadata["queueName"]
-
-	if config.TriggerMetadata["brokerName"] == "" {
-		return nil, errors.New("no broker name given")
-	}
-	meta.brokerName = config.TriggerMetadata["brokerName"]
-
-	if config.TriggerMetadata["brokerAddress"] == "" {
-		return nil, errors.New("no broker address given")
-	}
-	meta.brokerAddress = config.TriggerMetadata["brokerAddress"]
-
 	if val, ok := config.TriggerMetadata["queueLength"]; ok {
 		queueLength, err := strconv.Atoi(val)
 		if err != nil {
@@ -167,6 +171,38 @@ func (s *artemisScaler) IsActive(ctx context.Context) (bool, error) {
 	return messages > 0, nil
 }
 
+// getAPIParameters parses the restAPITemplate to provide managementEndpoint, brokerName, brokerAddress and queueName
+func getAPIParameters(meta artemisMetadata) (artemisMetadata, error) {
+	u, err := url.ParseRequestURI(meta.restAPITemplate)
+	if err != nil {
+		return meta, fmt.Errorf("unable to parse the artemis restAPITemplate: %s", err)
+	}
+	meta.managementEndpoint = u.Host
+	splitURL := strings.Split(strings.Split(u.RawPath, ":")[1], "/")[0] // This returns: broker="<<brokerName>>",component=addresses,address="<<brokerAddress>>",subcomponent=queues,routing-type="anycast",queue="<<queueName>>"
+	replacer := strings.NewReplacer(",", "&", "\"\"", "")
+	v, err := url.ParseQuery(replacer.Replace(splitURL)) // This returns a map with string keys and []string values: map[address:["<<brokerAddress>>"] broker:["<<brokerName>>"] component:[addresses] queue:["<<queueName>>"] routing-type:["anycast"] subcomponent:[queues]]
+	if err != nil {
+		return meta, fmt.Errorf("unable to parse the artemis restAPITemplate: %s", err)
+	}
+
+	if len(v["address"][0]) == 0 {
+		return meta, errors.New("no brokerAddress given")
+	}
+	meta.brokerAddress = v["address"][0]
+
+	if len(v["queue"][0]) == 0 {
+		return meta, errors.New("no queueName given")
+	}
+	meta.queueName = v["queue"][0]
+
+	if len(v["broker"][0]) == 0 {
+		return meta, fmt.Errorf("no brokerName given: %s", meta.restAPITemplate)
+	}
+	meta.brokerName = v["broker"][0]
+
+	return meta, nil
+}
+
 func (s *artemisScaler) getMonitoringEndpoint() string {
 	replacer := strings.NewReplacer("<<managementEndpoint>>", s.metadata.managementEndpoint,
 		"<<queueName>>", s.metadata.queueName,
diff --git a/pkg/scalers/artemis_scaler_test.go b/pkg/scalers/artemis_scaler_test.go
index e8f3b4b294c..7f127ddd6e2 100644
--- a/pkg/scalers/artemis_scaler_test.go
+++ b/pkg/scalers/artemis_scaler_test.go
@@ -54,6 +54,9 @@ var testArtemisMetadata = []parseArtemisMetadataTestData{
 	// Missing password, should fail
 	{map[string]string{"managementEndpoint": "localhost:8161", "queueName": "queue1", "brokerName": "broker-activemq", "brokerAddress": "test", "username": "myUserName", "password": ""}, true},
 	{map[string]string{"managementEndpoint": "localhost:8161", "queueName": "queue1", "brokerName": "broker-activemq", "brokerAddress": "test", "username": "myUserName", "password": "myPassword"}, false},
+	{map[string]string{"restApiTemplate": "http://localhost:8161/console/jolokia/read/org.apache.activemq.artemis:broker=\"broker-activemq\",component=addresses,address=\"test\",subcomponent=queues,routing-type=\"anycast\",queue=\"queue1\"/MessageCount", "username": "myUserName", "password": "myPassword"}, false},
+	// Missing brokerName, should fail
+	{map[string]string{"restApiTemplate": "http://localhost:8161/console/jolokia/read/org.apache.activemq.artemis:broker=\"\",component=addresses,address=\"test\",subcomponent=queues,routing-type=\"anycast\",queue=\"queue1\"/MessageCount", "username": "myUserName", "password": "myPassword"}, true},
 }
 
 var artemisMetricIdentifiers = []artemisMetricIdentifier{
diff --git a/pkg/scalers/influxdb_scaler.go b/pkg/scalers/influxdb_scaler.go
index cd064daf452..57b9b4582b0 100644
--- a/pkg/scalers/influxdb_scaler.go
+++ b/pkg/scalers/influxdb_scaler.go
@@ -2,6 +2,7 @@ package scalers
 
 import (
 	"context"
+	"crypto/tls"
 	"fmt"
 	"net/url"
 	"strconv"
@@ -28,6 +29,7 @@ type influxDBMetadata struct {
 	organizationName string
 	query            string
 	serverURL        string
+	unsafeSsL        bool
 	thresholdValue   float64
 	scalerIndex      int
 }
@@ -42,7 +44,13 @@ func NewInfluxDBScaler(config *ScalerConfig) (Scaler, error) {
 	}
 
 	influxDBLog.Info("starting up influxdb client")
-
+	// Skip TLS certificate verification when unsafeSsL is enabled.
+	if meta.unsafeSsL {
+		return &influxDBScaler{
+			client:   influxdb2.NewClientWithOptions(meta.serverURL, meta.authToken, influxdb2.DefaultOptions().SetTLSConfig(&tls.Config{InsecureSkipVerify: true})),
+			metadata: meta,
+		}, nil
+	}
 	return &influxDBScaler{
 		client:   influxdb2.NewClient(meta.serverURL, meta.authToken),
 		metadata: meta,
@@ -56,6 +64,7 @@ func parseInfluxDBMetadata(config *ScalerConfig) (*influxDBMetadata, error) {
 	var organizationName string
 	var query string
 	var serverURL string
+	var unsafeSsL bool
 	var thresholdValue float64
 
 	val, ok := config.TriggerMetadata["authToken"]
@@ -128,6 +137,14 @@ func parseInfluxDBMetadata(config *ScalerConfig) (*influxDBMetadata, error) {
 	} else {
 		return nil, fmt.Errorf("no threshold value given")
 	}
+	unsafeSsL = false
+	if val, ok := config.TriggerMetadata["unsafeSsL"]; ok {
+		parsedVal, err := strconv.ParseBool(val)
+		if err != nil {
+			return nil, fmt.Errorf("error parsing unsafeSsL: %s", err)
+		}
+		unsafeSsL = parsedVal
+	}
 
 	return &influxDBMetadata{
 		authToken:        authToken,
@@ -137,6 +154,7 @@ func parseInfluxDBMetadata(config *ScalerConfig) (*influxDBMetadata, error) {
 		serverURL:        serverURL,
 		thresholdValue:   thresholdValue,
 		scalerIndex:      config.ScalerIndex,
+		unsafeSsL:        unsafeSsL,
 	}, nil
 }
 
diff --git a/pkg/scalers/influxdb_scaler_test.go b/pkg/scalers/influxdb_scaler_test.go
index 88959e0d312..d528b7e19ea 100644
--- a/pkg/scalers/influxdb_scaler_test.go
+++ b/pkg/scalers/influxdb_scaler_test.go
@@ -27,21 +27,23 @@ var testInfluxDBMetadata = []parseInfluxDBMetadataTestData{
 	// nothing passed
 	{map[string]string{}, true, map[string]string{}},
 	// everything is passed in verbatim
-	{map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken"}, false, map[string]string{}},
+	{map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken", "unsafeSsL": "false"}, false, map[string]string{}},
 	// everything is passed in (environment variables)
-	{map[string]string{"serverURL": "https://influxdata.com", "organizationNameFromEnv": "INFLUX_ORG", "query": "from(bucket: hello)", "thresholdValue": "10", "authTokenFromEnv": "INFLUX_TOKEN"}, false, map[string]string{}},
+	{map[string]string{"serverURL": "https://influxdata.com", "organizationNameFromEnv": "INFLUX_ORG", "query": "from(bucket: hello)", "thresholdValue": "10", "authTokenFromEnv": "INFLUX_TOKEN", "unsafeSsL": "false"}, false, map[string]string{}},
 	// no serverURL passed
-	{map[string]string{"metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken"}, true, map[string]string{}},
+	{map[string]string{"metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken", "unsafeSsL": "false"}, true, map[string]string{}},
 	// no organization name passed
-	{map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken"}, true, map[string]string{}},
+	{map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken", "unsafeSsL": "false"}, true, map[string]string{}},
 	// no query passed
-	{map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "thresholdValue": "10", "authToken": "myToken"}, true, map[string]string{}},
+	{map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "thresholdValue": "10", "authToken": "myToken", "unsafeSsL": "false"}, true, map[string]string{}},
 	// no threshold value passed
-	{map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "authToken": "myToken"}, true, map[string]string{}},
+	{map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "authToken": "myToken", "unsafeSsL": "false"}, true, map[string]string{}},
 	// no auth token passed
-	{map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10"}, true, map[string]string{}},
+	{map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "unsafeSsL": "false"}, true, map[string]string{}},
 	// authToken, organizationName, and serverURL are defined in authParams
-	{map[string]string{"query": "from(bucket: hello)", "thresholdValue": "10"}, false, map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "authToken": "myToken"}},
+	{map[string]string{"query": "from(bucket: hello)", "thresholdValue": "10", "unsafeSsL": "false"}, false, map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "authToken": "myToken"}},
+	// no unsafeSsL value passed
+	{map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken"}, false, map[string]string{}},
 }
 
 var influxDBMetricIdentifiers = []influxDBMetricIdentifier{
diff --git a/pkg/scalers/rabbitmq_scaler.go b/pkg/scalers/rabbitmq_scaler.go
index 67452d9a59a..81e51c2aa15 100644
--- a/pkg/scalers/rabbitmq_scaler.go
+++ b/pkg/scalers/rabbitmq_scaler.go
@@ -67,6 +67,7 @@ type rabbitMQMetadata struct {
 	protocol   string        // either http or amqp protocol
 	vhostName  *string       // override the vhost from the connection info
 	useRegex   bool          // specify if the queueName contains a rexeg
+	pageSize   int           // specify the page size if useRegex is enabled
 	operation  string        // specify the operation to apply in case of multiples queues
 	metricName string        // custom metric name for trigger
 	timeout    time.Duration // custom http timeout for a specific trigger
@@ -198,6 +199,20 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) {
 		meta.useRegex = useRegex
 	}
 
+	// Resolve pageSize
+	if val, ok := config.TriggerMetadata["pageSize"]; ok {
+		pageSize, err := strconv.ParseInt(val, 10, 64)
+		if err != nil {
+			return nil, fmt.Errorf("pageSize has invalid value")
+		}
+		meta.pageSize = int(pageSize)
+		if meta.pageSize < 1 {
+			return nil, fmt.Errorf("pageSize should be 1 or greater")
+		}
+	} else {
+		meta.pageSize = 100
+	}
+
 	// Resolve operation
 	meta.operation = defaultOperation
 	if val, ok := config.TriggerMetadata["operation"]; ok {
@@ -415,9 +430,9 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {
 	parsedURL.Path = ""
 	var getQueueInfoManagementURI string
 	if s.metadata.useRegex {
-		getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s", parsedURL.String(), "api/queues?page=1&use_regex=true&pagination=false&name=", url.QueryEscape(s.metadata.queueName))
+		getQueueInfoManagementURI = fmt.Sprintf("%s/api/queues?page=1&use_regex=true&pagination=false&name=%s&page_size=%d", parsedURL.String(), url.QueryEscape(s.metadata.queueName), s.metadata.pageSize)
 	} else {
-		getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s/%s", parsedURL.String(), "api/queues", vhost, url.QueryEscape(s.metadata.queueName))
+		getQueueInfoManagementURI = fmt.Sprintf("%s/api/queues%s/%s", parsedURL.String(), vhost, url.QueryEscape(s.metadata.queueName))
 	}
 
 	var info queueInfo
diff --git a/pkg/scalers/rabbitmq_scaler_test.go b/pkg/scalers/rabbitmq_scaler_test.go
index 8047a228465..2677851aab0 100644
--- a/pkg/scalers/rabbitmq_scaler_test.go
+++ b/pkg/scalers/rabbitmq_scaler_test.go
@@ -105,6 +105,12 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{
 	{map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "http://", "timeout": "error"}, true, map[string]string{}},
 	// amqp timeout
 	{map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "amqp://", "timeout": "10"}, true, map[string]string{}},
+	// valid pageSize
+	{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "pageSize": "100"}, false, map[string]string{}},
+	// pageSize less than 1
+	{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "pageSize": "-1"}, true, map[string]string{}},
+	// invalid pageSize
+	{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "pageSize": "a"}, true, map[string]string{}},
 }
 
 var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{
@@ -322,7 +328,7 @@ var testRegexQueueInfoTestData = []getQueueInfoTestData{
 func TestGetQueueInfoWithRegex(t *testing.T) {
 	for _, testData := range testRegexQueueInfoTestData {
 		var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-			expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=%5Eevaluate_trials%24"
+			expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=%5Eevaluate_trials%24&page_size=100"
 			if r.RequestURI != expectedPath {
 				t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
 			}
@@ -379,6 +385,68 @@ func TestGetQueueInfoWithRegex(t *testing.T) {
 	}
 }
 
+type getRegexPageSizeTestData struct {
+	queueInfo getQueueInfoTestData
+	pageSize  int
+}
+
+var testRegexPageSizeTestData = []getRegexPageSizeTestData{
+	{testRegexQueueInfoTestData[0], 100},
+	{testRegexQueueInfoTestData[0], 200},
+	{testRegexQueueInfoTestData[0], 500},
+}
+
+func TestGetPageSizeWithRegex(t *testing.T) {
+	for _, testData := range testRegexPageSizeTestData {
+		var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			expectedPath := fmt.Sprintf("/api/queues?page=1&use_regex=true&pagination=false&name=%%5Eevaluate_trials%%24&page_size=%d", testData.pageSize)
+			if r.RequestURI != expectedPath {
+				t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
+			}
+
+			w.WriteHeader(testData.queueInfo.responseStatus)
+			_, err := w.Write([]byte(testData.queueInfo.response))
+			if err != nil {
+				t.Error("Expect request path to =", testData.queueInfo.response, "but it is", err)
+			}
+		}))
+
+		resolvedEnv := map[string]string{host: fmt.Sprintf("%s%s", apiStub.URL, testData.queueInfo.vhostPath), "plainHost": apiStub.URL}
+
+		metadata := map[string]string{
+			"queueName":   "^evaluate_trials$",
+			"hostFromEnv": host,
+			"protocol":    "http",
+			"useRegex":    "true",
+			"pageSize":    fmt.Sprint(testData.pageSize),
+		}
+
+		s, err := NewRabbitMQScaler(
+			&ScalerConfig{
+				ResolvedEnv:       resolvedEnv,
+				TriggerMetadata:   metadata,
+				AuthParams:        map[string]string{},
+				GlobalHTTPTimeout: 1000 * time.Millisecond,
+			},
+		)
+
+		if err != nil {
+			t.Error("Expect success", err)
+		}
+
+		ctx := context.TODO()
+		active, err := s.IsActive(ctx)
+
+		if err != nil {
+			t.Error("Expect success", err)
+		}
+
+		if !active {
+			t.Error("Expect to be active")
+		}
+	}
+}
+
 func TestRabbitMQGetMetricSpecForScaling(t *testing.T) {
 	for _, testData := range rabbitMQMetricIdentifiers {
 		meta, err := parseRabbitMQMetadata(&ScalerConfig{ResolvedEnv: sampleRabbitMqResolvedEnv, TriggerMetadata: testData.metadataTestData.metadata, AuthParams: nil, ScalerIndex: testData.index})
@@ -453,7 +521,7 @@ var testRegexQueueInfoNavigationTestData = []getQueueInfoNavigationTestData{
 func TestRegexQueueMissingError(t *testing.T) {
 	for _, testData := range testRegexQueueInfoNavigationTestData {
 		var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-			expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=evaluate_trials"
+			expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=evaluate_trials&page_size=100"
 			if r.RequestURI != expectedPath {
 				t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
 			}