Skip to content

Commit

Permalink
Merge branch 'main' into unique_names_index_in_scaler
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorge Turrado Ferrero authored Oct 10, 2021
2 parents 1298324 + 7e08fc6 commit 2b43756
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 32 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@
- Escape `queueName` and `vhostName` in RabbitMQ Scaler before use them in query string (bug fix) ([#2055](https://github.com/kedacore/keda/pull/2055))
- TriggerAuthentication/Vault: add support for HashiCorp Vault namespace (Vault Enterprise) ([#2085](https://github.com/kedacore/keda/pull/2085))
- Add custom http timeout in RabbitMQ Scaler ([#2086](https://github.com/kedacore/keda/pull/2086))
- Artemis Scaler parses out broker config parameters in case `restAPITemplate` is given ([#2104](https://github.com/kedacore/keda/pull/2104))
- Add support to get connection data from Trigger Authorization in MongoDB Scaler ([#2115](https://github.com/kedacore/keda/pull/2115))
- Add support to get connection data from Trigger Authorization in MySQL Scaler ([#2113](https://github.com/kedacore/keda/pull/2113))
- Add support to get connection data from Trigger Authorization in MSSQL Scaler ([#2112](https://github.com/kedacore/keda/pull/2112))
- Add support to get connection data from Trigger Authorization in PostgreSQL Scaler ([#2114](https://github.com/kedacore/keda/pull/2114))
- Add support to provide the metric name in Azure Log Analytics Scaler ([#2106](https://github.com/kedacore/keda/pull/2106))
- Add `pageSize` (using regex) in RabbitMQ Scaler ([#2162](https://github.com/kedacore/keda/pull/2162))
- Add `unsafeSsl` parameter in InfluxDB scaler ([#2157](https://github.com/kedacore/keda/pull/2157))
- Improve metric name creation to be unique using scaler index inside the scaler ([#2161](https://github.com/kedacore/keda/pull/2161))

### Breaking Changes
Expand Down
74 changes: 55 additions & 19 deletions pkg/scalers/artemis_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"

Expand Down Expand Up @@ -80,36 +81,39 @@ func parseArtemisMetadata(config *ScalerConfig) (*artemisMetadata, error) {

if val, ok := config.TriggerMetadata["restApiTemplate"]; ok && val != "" {
meta.restAPITemplate = config.TriggerMetadata["restApiTemplate"]
var err error
if meta, err = getAPIParameters(meta); err != nil {
return nil, fmt.Errorf("can't parse restApiTemplate : %s ", err)
}
} else {
meta.restAPITemplate = defaultRestAPITemplate
}
if config.TriggerMetadata["managementEndpoint"] == "" {
return nil, errors.New("no management endpoint given")
}
meta.managementEndpoint = config.TriggerMetadata["managementEndpoint"]

if config.TriggerMetadata["queueName"] == "" {
return nil, errors.New("no queue name given")
}
meta.queueName = config.TriggerMetadata["queueName"]

if config.TriggerMetadata["brokerName"] == "" {
return nil, errors.New("no broker name given")
}
meta.brokerName = config.TriggerMetadata["brokerName"]

if config.TriggerMetadata["managementEndpoint"] == "" {
return nil, errors.New("no management endpoint given")
if config.TriggerMetadata["brokerAddress"] == "" {
return nil, errors.New("no broker address given")
}
meta.brokerAddress = config.TriggerMetadata["brokerAddress"]
}
meta.managementEndpoint = config.TriggerMetadata["managementEndpoint"]

if val, ok := config.TriggerMetadata["corsHeader"]; ok && val != "" {
meta.corsHeader = config.TriggerMetadata["corsHeader"]
} else {
meta.corsHeader = fmt.Sprintf(defaultCorsHeader, meta.managementEndpoint)
}

if config.TriggerMetadata["queueName"] == "" {
return nil, errors.New("no queue name given")
}
meta.queueName = config.TriggerMetadata["queueName"]

if config.TriggerMetadata["brokerName"] == "" {
return nil, errors.New("no broker name given")
}
meta.brokerName = config.TriggerMetadata["brokerName"]

if config.TriggerMetadata["brokerAddress"] == "" {
return nil, errors.New("no broker address given")
}
meta.brokerAddress = config.TriggerMetadata["brokerAddress"]

if val, ok := config.TriggerMetadata["queueLength"]; ok {
queueLength, err := strconv.Atoi(val)
if err != nil {
Expand Down Expand Up @@ -167,6 +171,38 @@ func (s *artemisScaler) IsActive(ctx context.Context) (bool, error) {
return messages > 0, nil
}

// getAPIParameters parse restAPITemplate to provide managementEndpoint , brokerName, brokerAddress, queueName
func getAPIParameters(meta artemisMetadata) (artemisMetadata, error) {
u, err := url.ParseRequestURI(meta.restAPITemplate)
if err != nil {
return meta, fmt.Errorf("unable to parse the artemis restAPITemplate: %s", err)
}
meta.managementEndpoint = u.Host
splitURL := strings.Split(strings.Split(u.RawPath, ":")[1], "/")[0] // This returns : broker="<<brokerName>>",component=addresses,address="<<brokerAddress>>",subcomponent=queues,routing-type="anycast",queue="<<queueName>>"
replacer := strings.NewReplacer(",", "&", "\"\"", "")
v, err := url.ParseQuery(replacer.Replace(splitURL)) // This returns a map with key: string types and element type [] string. : map[address:["<<brokerAddress>>"] broker:["<<brokerName>>"] component:[addresses] queue:["<<queueName>>"] routing-type:["anycast"] subcomponent:[queues]]
if err != nil {
return meta, fmt.Errorf("unable to parse the artemis restAPITemplate: %s", err)
}

if len(v["address"][0]) == 0 {
return meta, errors.New("no brokerAddress given")
}
meta.brokerAddress = v["address"][0]

if len(v["queue"][0]) == 0 {
return meta, errors.New("no queueName is given")
}
meta.queueName = v["queue"][0]

if len(v["broker"][0]) == 0 {
return meta, fmt.Errorf("no brokerName given: %s", meta.restAPITemplate)
}
meta.brokerName = v["broker"][0]

return meta, nil
}

func (s *artemisScaler) getMonitoringEndpoint() string {
replacer := strings.NewReplacer("<<managementEndpoint>>", s.metadata.managementEndpoint,
"<<queueName>>", s.metadata.queueName,
Expand Down
3 changes: 3 additions & 0 deletions pkg/scalers/artemis_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ var testArtemisMetadata = []parseArtemisMetadataTestData{
// Missing password, should fail
{map[string]string{"managementEndpoint": "localhost:8161", "queueName": "queue1", "brokerName": "broker-activemq", "brokerAddress": "test", "username": "myUserName", "password": ""}, true},
{map[string]string{"managementEndpoint": "localhost:8161", "queueName": "queue1", "brokerName": "broker-activemq", "brokerAddress": "test", "username": "myUserName", "password": "myPassword"}, false},
{map[string]string{"restApiTemplate": "http://localhost:8161/console/jolokia/read/org.apache.activemq.artemis:broker=\"broker-activemq\",component=addresses,address=\"test\",subcomponent=queues,routing-type=\"anycast\",queue=\"queue1\"/MessageCount", "username": "myUserName", "password": "myPassword"}, false},
// Missing brokername , should fail
{map[string]string{"restApiTemplate": "http://localhost:8161/console/jolokia/read/org.apache.activemq.artemis:broker=\"\",component=addresses,address=\"test\",subcomponent=queues,routing-type=\"anycast\",queue=\"queue1\"/MessageCount", "username": "myUserName", "password": "myPassword"}, true},
}

var artemisMetricIdentifiers = []artemisMetricIdentifier{
Expand Down
20 changes: 19 additions & 1 deletion pkg/scalers/influxdb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scalers

import (
"context"
"crypto/tls"
"fmt"
"net/url"
"strconv"
Expand All @@ -28,6 +29,7 @@ type influxDBMetadata struct {
organizationName string
query string
serverURL string
unsafeSsL bool
thresholdValue float64
scalerIndex int
}
Expand All @@ -42,7 +44,13 @@ func NewInfluxDBScaler(config *ScalerConfig) (Scaler, error) {
}

influxDBLog.Info("starting up influxdb client")

// In case unsafeSsL is enabled.
if meta.unsafeSsL {
return &influxDBScaler{
client: influxdb2.NewClientWithOptions(meta.serverURL, meta.authToken, influxdb2.DefaultOptions().SetTLSConfig(&tls.Config{InsecureSkipVerify: true})),
metadata: meta,
}, nil
}
return &influxDBScaler{
client: influxdb2.NewClient(meta.serverURL, meta.authToken),
metadata: meta,
Expand All @@ -56,6 +64,7 @@ func parseInfluxDBMetadata(config *ScalerConfig) (*influxDBMetadata, error) {
var organizationName string
var query string
var serverURL string
var unsafeSsL bool
var thresholdValue float64

val, ok := config.TriggerMetadata["authToken"]
Expand Down Expand Up @@ -128,6 +137,14 @@ func parseInfluxDBMetadata(config *ScalerConfig) (*influxDBMetadata, error) {
} else {
return nil, fmt.Errorf("no threshold value given")
}
unsafeSsL = false
if val, ok := config.TriggerMetadata["unsafeSsL"]; ok {
parsedVal, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing unsafeSsL: %s", err)
}
unsafeSsL = parsedVal
}

return &influxDBMetadata{
authToken: authToken,
Expand All @@ -137,6 +154,7 @@ func parseInfluxDBMetadata(config *ScalerConfig) (*influxDBMetadata, error) {
serverURL: serverURL,
thresholdValue: thresholdValue,
scalerIndex: config.ScalerIndex,
unsafeSsL: unsafeSsL,
}, nil
}

Expand Down
18 changes: 10 additions & 8 deletions pkg/scalers/influxdb_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,23 @@ var testInfluxDBMetadata = []parseInfluxDBMetadataTestData{
// nothing passed
{map[string]string{}, true, map[string]string{}},
// everything is passed in verbatim
{map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken"}, false, map[string]string{}},
{map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken", "unsafeSsL": "false"}, false, map[string]string{}},
// everything is passed in (environment variables)
{map[string]string{"serverURL": "https://influxdata.com", "organizationNameFromEnv": "INFLUX_ORG", "query": "from(bucket: hello)", "thresholdValue": "10", "authTokenFromEnv": "INFLUX_TOKEN"}, false, map[string]string{}},
{map[string]string{"serverURL": "https://influxdata.com", "organizationNameFromEnv": "INFLUX_ORG", "query": "from(bucket: hello)", "thresholdValue": "10", "authTokenFromEnv": "INFLUX_TOKEN", "unsafeSsL": "false"}, false, map[string]string{}},
// no serverURL passed
{map[string]string{"metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken"}, true, map[string]string{}},
{map[string]string{"metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken", "unsafeSsL": "false"}, true, map[string]string{}},
// no organization name passed
{map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken"}, true, map[string]string{}},
{map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken", "unsafeSsL": "false"}, true, map[string]string{}},
// no query passed
{map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "thresholdValue": "10", "authToken": "myToken"}, true, map[string]string{}},
{map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "thresholdValue": "10", "authToken": "myToken", "unsafeSsL": "false"}, true, map[string]string{}},
// no threshold value passed
{map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "authToken": "myToken"}, true, map[string]string{}},
{map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "authToken": "myToken", "unsafeSsL": "false"}, true, map[string]string{}},
// no auth token passed
{map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10"}, true, map[string]string{}},
{map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "unsafeSsL": "false"}, true, map[string]string{}},
// authToken, organizationName, and serverURL are defined in authParams
{map[string]string{"query": "from(bucket: hello)", "thresholdValue": "10"}, false, map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "authToken": "myToken"}},
{map[string]string{"query": "from(bucket: hello)", "thresholdValue": "10", "unsafeSsL": "false"}, false, map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "authToken": "myToken"}},
// no sunsafeSsl value passed
{map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken"}, false, map[string]string{}},
}

var influxDBMetricIdentifiers = []influxDBMetricIdentifier{
Expand Down
19 changes: 17 additions & 2 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type rabbitMQMetadata struct {
protocol string // either http or amqp protocol
vhostName *string // override the vhost from the connection info
useRegex bool // specify if the queueName contains a rexeg
pageSize int // specify the page size if useRegex is enabled
operation string // specify the operation to apply in case of multiples queues
metricName string // custom metric name for trigger
timeout time.Duration // custom http timeout for a specific trigger
Expand Down Expand Up @@ -198,6 +199,20 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) {
meta.useRegex = useRegex
}

// Resolve pageSize
if val, ok := config.TriggerMetadata["pageSize"]; ok {
pageSize, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("pageSize has invalid value")
}
meta.pageSize = int(pageSize)
if meta.pageSize < 1 {
return nil, fmt.Errorf("pageSize should be 1 or greater than 1")
}
} else {
meta.pageSize = 100
}

// Resolve operation
meta.operation = defaultOperation
if val, ok := config.TriggerMetadata["operation"]; ok {
Expand Down Expand Up @@ -415,9 +430,9 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {
parsedURL.Path = ""
var getQueueInfoManagementURI string
if s.metadata.useRegex {
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s", parsedURL.String(), "api/queues?page=1&use_regex=true&pagination=false&name=", url.QueryEscape(s.metadata.queueName))
getQueueInfoManagementURI = fmt.Sprintf("%s/api/queues?page=1&use_regex=true&pagination=false&name=%s&page_size=%d", parsedURL.String(), url.QueryEscape(s.metadata.queueName), s.metadata.pageSize)
} else {
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s/%s", parsedURL.String(), "api/queues", vhost, url.QueryEscape(s.metadata.queueName))
getQueueInfoManagementURI = fmt.Sprintf("%s/api/queues%s/%s", parsedURL.String(), vhost, url.QueryEscape(s.metadata.queueName))
}

var info queueInfo
Expand Down
72 changes: 70 additions & 2 deletions pkg/scalers/rabbitmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{
{map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "http://", "timeout": "error"}, true, map[string]string{}},
// amqp timeout
{map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "amqp://", "timeout": "10"}, true, map[string]string{}},
// valid pageSize
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "pageSize": "100"}, false, map[string]string{}},
// pageSize less than 1
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "pageSize": "-1"}, true, map[string]string{}},
// invalid pageSize
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "pageSize": "a"}, true, map[string]string{}},
}

var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{
Expand Down Expand Up @@ -322,7 +328,7 @@ var testRegexQueueInfoTestData = []getQueueInfoTestData{
func TestGetQueueInfoWithRegex(t *testing.T) {
for _, testData := range testRegexQueueInfoTestData {
var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=%5Eevaluate_trials%24"
expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=%5Eevaluate_trials%24&page_size=100"
if r.RequestURI != expectedPath {
t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
}
Expand Down Expand Up @@ -379,6 +385,68 @@ func TestGetQueueInfoWithRegex(t *testing.T) {
}
}

type getRegexPageSizeTestData struct {
queueInfo getQueueInfoTestData
pageSize int
}

var testRegexPageSizeTestData = []getRegexPageSizeTestData{
{testRegexQueueInfoTestData[0], 100},
{testRegexQueueInfoTestData[0], 200},
{testRegexQueueInfoTestData[0], 500},
}

func TestGetPageSizeWithRegex(t *testing.T) {
for _, testData := range testRegexPageSizeTestData {
var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
expectedPath := fmt.Sprintf("/api/queues?page=1&use_regex=true&pagination=false&name=%%5Eevaluate_trials%%24&page_size=%d", testData.pageSize)
if r.RequestURI != expectedPath {
t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
}

w.WriteHeader(testData.queueInfo.responseStatus)
_, err := w.Write([]byte(testData.queueInfo.response))
if err != nil {
t.Error("Expect request path to =", testData.queueInfo.response, "but it is", err)
}
}))

resolvedEnv := map[string]string{host: fmt.Sprintf("%s%s", apiStub.URL, testData.queueInfo.vhostPath), "plainHost": apiStub.URL}

metadata := map[string]string{
"queueName": "^evaluate_trials$",
"hostFromEnv": host,
"protocol": "http",
"useRegex": "true",
"pageSize": fmt.Sprint(testData.pageSize),
}

s, err := NewRabbitMQScaler(
&ScalerConfig{
ResolvedEnv: resolvedEnv,
TriggerMetadata: metadata,
AuthParams: map[string]string{},
GlobalHTTPTimeout: 1000 * time.Millisecond,
},
)

if err != nil {
t.Error("Expect success", err)
}

ctx := context.TODO()
active, err := s.IsActive(ctx)

if err != nil {
t.Error("Expect success", err)
}

if !active {
t.Error("Expect to be active")
}
}
}

func TestRabbitMQGetMetricSpecForScaling(t *testing.T) {
for _, testData := range rabbitMQMetricIdentifiers {
meta, err := parseRabbitMQMetadata(&ScalerConfig{ResolvedEnv: sampleRabbitMqResolvedEnv, TriggerMetadata: testData.metadataTestData.metadata, AuthParams: nil, ScalerIndex: testData.index})
Expand Down Expand Up @@ -453,7 +521,7 @@ var testRegexQueueInfoNavigationTestData = []getQueueInfoNavigationTestData{
func TestRegexQueueMissingError(t *testing.T) {
for _, testData := range testRegexQueueInfoNavigationTestData {
var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=evaluate_trials"
expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=evaluate_trials&page_size=100"
if r.RequestURI != expectedPath {
t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
}
Expand Down

0 comments on commit 2b43756

Please sign in to comment.