From d5d02a379d9c31d3c090826dc74857c81efaf61b Mon Sep 17 00:00:00 2001 From: melisatanrverdi Date: Fri, 19 Nov 2021 17:32:56 +0300 Subject: [PATCH 1/9] add new ActiveMQ scaler with it's unit test Signed-off-by: melisatanrverdi --- config/manager/kustomization.yaml | 2 +- config/metrics-server/kustomization.yaml | 2 +- pkg/mock/mock_client/mock_interfaces.go | 2 +- pkg/mock/mock_scale/mock_interfaces.go | 2 +- pkg/scalers/activemq_scaler.go | 268 +++++++++++++++++++++++ pkg/scalers/activemq_scaler_test.go | 112 ++++++++++ pkg/scaling/scale_handler.go | 2 + 7 files changed, 386 insertions(+), 4 deletions(-) create mode 100644 pkg/scalers/activemq_scaler.go create mode 100644 pkg/scalers/activemq_scaler_test.go diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index 77c0cd34eaf..e3a2a128f46 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -5,5 +5,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: - name: ghcr.io/kedacore/keda - newName: ghcr.io/kedacore/keda + newName: docker.io/melisatanrverdi/keda newTag: main diff --git a/config/metrics-server/kustomization.yaml b/config/metrics-server/kustomization.yaml index bd650d97723..fcf2ffb300a 100644 --- a/config/metrics-server/kustomization.yaml +++ b/config/metrics-server/kustomization.yaml @@ -10,5 +10,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: - name: ghcr.io/kedacore/keda-metrics-apiserver - newName: ghcr.io/kedacore/keda-metrics-apiserver + newName: docker.io/melisatanrverdi/keda-metrics-apiserver newTag: main diff --git a/pkg/mock/mock_client/mock_interfaces.go b/pkg/mock/mock_client/mock_interfaces.go index b33495dcdf2..115bc162416 100644 --- a/pkg/mock/mock_client/mock_interfaces.go +++ b/pkg/mock/mock_client/mock_interfaces.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: /go/pkg/mod/sigs.k8s.io/controller-runtime@v0.10.2/pkg/client/interfaces.go +// Source: /home/melisa/pkg/mod/sigs.k8s.io/controller-runtime@v0.10.3/pkg/client/interfaces.go // Package mock_client is a generated GoMock package. package mock_client diff --git a/pkg/mock/mock_scale/mock_interfaces.go b/pkg/mock/mock_scale/mock_interfaces.go index 48c6afb71bd..460da252f6b 100644 --- a/pkg/mock/mock_scale/mock_interfaces.go +++ b/pkg/mock/mock_scale/mock_interfaces.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: /home/ahmed/go/pkg/mod/k8s.io/client-go@v0.22.2/scale/interfaces.go +// Source: /home/melisa/pkg/mod/k8s.io/client-go@v0.22.3/scale/interfaces.go // Package mock_scale is a generated GoMock package. package mock_scale diff --git a/pkg/scalers/activemq_scaler.go b/pkg/scalers/activemq_scaler.go new file mode 100644 index 00000000000..a49c68caa04 --- /dev/null +++ b/pkg/scalers/activemq_scaler.go @@ -0,0 +1,268 @@ +package scalers + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/url" + "strconv" + "strings" + + v2beta2 "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +type activeMQScaler struct { + metadata *activeMQMetadata + httpClient *http.Client +} + +type activeMQMetadata struct { + managementEndpoint string + destinationName string + brokerName string + username string + password string + restAPITemplate string + targetQueueSize int + metricName string + scalerIndex int +} + +type activeMQMonitoring struct { + MsgCount int `json:"value"` + Status int `json:"status"` + Timestamp int64 `json:"timestamp"` +} + +const ( + defaultTargetQueueSize = 10 + defaultActiveMQRestAPITemplate = "http://<>/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=<>,destinationType=Queue,destinationName=<>/QueueSize" +) + +var activeMQLog = logf.Log.WithName("activeMQ_scaler") + +// NewActiveMQScaler creates a new activeMQ Scaler +func NewActiveMQScaler(config *ScalerConfig) (Scaler, error) { + meta, err := parseActiveMQMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing ActiveMQ metadata: %s", err) + } + httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false) + + return &activeMQScaler{ + metadata: meta, + httpClient: httpClient, + }, nil +} + +func parseActiveMQMetadata(config *ScalerConfig) (*activeMQMetadata, error) { + meta := activeMQMetadata{} + + if val, ok := config.TriggerMetadata["restAPITemplate"]; ok && val != "" { + meta.restAPITemplate = config.TriggerMetadata["restAPITemplate"] + var err error + if meta, err = getRestAPIParameters(meta); err != nil { + return nil, fmt.Errorf("can't parse restAPITemplate : %s ", err) + } + } else { + meta.restAPITemplate = defaultActiveMQRestAPITemplate + if config.TriggerMetadata["managementEndpoint"] == "" { + return nil, errors.New("no management endpoint given") + } + meta.managementEndpoint = config.TriggerMetadata["managementEndpoint"] + + if config.TriggerMetadata["destinationName"] == "" { + return nil, errors.New("no destination name given") + } + meta.destinationName = config.TriggerMetadata["destinationName"] + + if config.TriggerMetadata["brokerName"] == "" { + return nil, errors.New("no broker name given") + } + meta.brokerName = config.TriggerMetadata["brokerName"] + } + + if val, ok := config.TriggerMetadata["targetQueueSize"]; ok { + queueSize, err := strconv.Atoi(val) + if err != nil { + return nil, fmt.Errorf("invalid targetQueueSize - must be an integer") + } + + meta.targetQueueSize = queueSize + } else { + meta.targetQueueSize = defaultTargetQueueSize + } + + if val, ok := config.AuthParams["username"]; ok && val != "" { + meta.username = val + } else if val, ok := config.TriggerMetadata["username"]; ok && val != "" { + username := val + + if val, ok := config.ResolvedEnv[username]; ok && val != "" { + meta.username = val + } else { + meta.username = username + } + } + + if meta.username == "" { + return nil, fmt.Errorf("username cannot be empty") + } + + if val, ok := config.AuthParams["password"]; ok && val != "" { + meta.password = val + } else if val, ok := config.TriggerMetadata["password"]; ok && val != "" { + password := val + + if val, ok := config.ResolvedEnv[password]; ok && val != "" { + meta.password = val + } else { + meta.password = password + } + } + + if meta.password == "" { + return nil, fmt.Errorf("password cannot be empty") + } + + if val, ok := config.TriggerMetadata["metricName"]; ok { + meta.metricName = kedautil.NormalizeString(fmt.Sprintf("activemq-%s", val)) + } else { + meta.metricName = kedautil.NormalizeString(fmt.Sprintf("activemq-%s", meta.destinationName)) + } + + meta.scalerIndex = config.ScalerIndex + + return &meta, nil +} + +func (s *activeMQScaler) IsActive(ctx context.Context) (bool, error) { + queueSize, err := s.getQueueMessageCount(ctx) + if err != nil { + activeMQLog.Error(err, "Unable to access activeMQ management endpoint", "managementEndpoint", s.metadata.managementEndpoint) + return false, err + } + + return queueSize > 0, nil +} + +// getRestAPIParameters parse restAPITemplate to provide managementEndpoint, brokerName, destinationName +func getRestAPIParameters(meta activeMQMetadata) (activeMQMetadata, error) { + u, err := url.ParseRequestURI(meta.restAPITemplate) + if err != nil { + return meta, fmt.Errorf("unable to parse ActiveMQ restAPITemplate: %s", err) + } + + meta.managementEndpoint = u.Host + splitURL := strings.Split(strings.Split(u.Path, ":")[1], "/")[0] // This returns : type=Broker,brokerName=<>,destinationType=Queue,destinationName=<> + replacer := strings.NewReplacer(",", "&") + v, err := url.ParseQuery(replacer.Replace(splitURL)) // This returns a map with key: string types and element type [] string. : map[brokerName:[<>] destinationName:[<>] destinationType:[Queue] type:[Broker]] + if err != nil { + return meta, fmt.Errorf("unable to parse ActiveMQ restAPITemplate: %s", err) + } + + if len(v["destinationName"][0]) == 0 { + return meta, errors.New("no destinationName is given") + } + meta.destinationName = v["destinationName"][0] + + if len(v["brokerName"][0]) == 0 { + return meta, fmt.Errorf("no brokerName given: %s", meta.restAPITemplate) + } + meta.brokerName = v["brokerName"][0] + + return meta, nil +} + +func (s *activeMQScaler) getMonitoringEndpoint() string { + replacer := strings.NewReplacer("<>", s.metadata.managementEndpoint, + "<>", s.metadata.brokerName, + "<>", s.metadata.destinationName) + + monitoringEndpoint := replacer.Replace(s.metadata.restAPITemplate) + + return monitoringEndpoint +} + +func (s *activeMQScaler) getQueueMessageCount(ctx context.Context) (int, error) { + var monitoringInfo *activeMQMonitoring + var queueMessageCount int + + client := s.httpClient + url := s.getMonitoringEndpoint() + + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + + // Add HTTP Auth and Headers + req.SetBasicAuth(s.metadata.username, s.metadata.password) + req.Header.Set("Content-Type", "application/json") + + if err != nil { + return -1, err + } + resp, err := client.Do(req) + if err != nil { + return -1, err + } + + defer resp.Body.Close() + + if err := json.NewDecoder(resp.Body).Decode(&monitoringInfo); err != nil { + return -1, err + } + if resp.StatusCode == 200 && monitoringInfo.Status == 200 { + queueMessageCount = monitoringInfo.MsgCount + } else { + return -1, fmt.Errorf("ActiveMQ management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status) + } + + activeMQLog.V(1).Info(fmt.Sprintf("ActiveMQ scaler: Providing metrics based on current queue size %d queue size limit %d", queueMessageCount, s.metadata.targetQueueSize)) + + return queueMessageCount, nil +} + +// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler +func (s *activeMQScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec { + targetMetricValue := resource.NewQuantity(int64(s.metadata.targetQueueSize), resource.DecimalSI) + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, s.metadata.metricName), + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetMetricValue, + }, + } + metricSpec := v2beta2.MetricSpec{ + External: externalMetric, Type: externalMetricType, + } + return []v2beta2.MetricSpec{metricSpec} +} + +func (s *activeMQScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + queueSize, err := s.getQueueMessageCount(ctx) + if err != nil { + return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting ActiveMQ queue size: %s", err) + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(int64(queueSize), resource.DecimalSI), + Timestamp: metav1.Now(), + } + + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} + +func (s *activeMQScaler) Close(context.Context) error { + return nil +} diff --git a/pkg/scalers/activemq_scaler_test.go b/pkg/scalers/activemq_scaler_test.go new file mode 100644 index 00000000000..631a595b77e --- /dev/null +++ b/pkg/scalers/activemq_scaler_test.go @@ -0,0 +1,112 @@ +package scalers + +import ( + "context" + "fmt" + "net/http" + "testing" +) + +const ( + testInvalidRestAPITemplate = "testInvalidRestAPITemplate" +) + +type parseActiveMQMetadataTestData struct { + metadata map[string]string + isError bool + authParams map[string]string +} + +type activeMQMetricIdentifier struct { + metadataTestData *parseActiveMQMetadataTestData + scalerIndex int + name string +} + +// Setting metric identifier mock name +var activeMQMetricIdentifiers = []activeMQMetricIdentifier{ + {&testActiveMQMetadata[1], 0, "s0-activemq-testMetricName"}, + {&testActiveMQMetadata[2], 1, "s1-activemq-testQueue"}, +} + +var testActiveMQMetadata = []parseActiveMQMetadataTestData{ + // Nothing passed + {map[string]string{}, true, map[string]string{}}, + // Properly formed metadata + {map[string]string{"managementEndpoint": "localhost:8161", "destinationName": "testQueue", "brokerName": "localhost", "targetQueueSize": "10", "metricName": "testMetricName"}, false, map[string]string{"username": "testUsername", "password": "pass123"}}, + // no metricName passed, metricName is generated from destinationName + {map[string]string{"managementEndpoint": "localhost:8161", "destinationName": "testQueue", "brokerName": "localhost", "targetQueueSize": "10"}, false, map[string]string{"username": "testUsername", "password": "pass123"}}, + // Invalid targetQueueSize using a string + {map[string]string{"managementEndpoint": "localhost:8161", "destinationName": "testQueue", "brokerName": "localhost", "targetQueueSize": "AA", "metricName": "testMetricName"}, true, map[string]string{"username": "testUsername", "password": "pass123"}}, + // Missing management endpoint should fail + {map[string]string{"destinationName": "testQueue", "brokerName": "localhost", "targetQueueSize": "10", "metricName": "testMetricName"}, true, map[string]string{"username": "testUsername", "password": "pass123"}}, + // Missing destination name, should fail + {map[string]string{"managementEndpoint": "localhost:8161", "brokerName": "localhost", "targetQueueSize": "10", "metricName": "testMetricName"}, true, map[string]string{"username": "testUsername", "password": "pass123"}}, + // Missing broker name, should fail + {map[string]string{"managementEndpoint": "localhost:8161", "destinationName": "testQueue", "targetQueueSize": "10", "metricName": "testMetricName"}, true, map[string]string{"username": "testUsername", "password": "pass123"}}, + // Missing username, should fail + {map[string]string{"managementEndpoint": "localhost:8161", "destinationName": "testQueue", "brokerName": "localhost", "targetQueueSize": "10", "metricName": "testMetricName"}, true, map[string]string{"password": "pass123"}}, + // Missing password, should fail + {map[string]string{"managementEndpoint": "localhost:8161", "destinationName": "testQueue", "brokerName": "localhost", "targetQueueSize": "10", "metricName": "testMetricName"}, true, map[string]string{"username": "testUsername"}}, + // Properly formed metadata with restAPITemplate + {map[string]string{"restAPITemplate": "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=testQueue/QueueSize", "targetQueueSize": "10", "metricName": "testMetricName"}, false, map[string]string{"username": "testUsername", "password": "pass123"}}, + // Invalid restAPITemplate, should fail + {map[string]string{"restAPITemplate": testInvalidRestAPITemplate, "targetQueueSize": "10", "metricName": "testMetricName"}, true, map[string]string{"username": "testUsername", "password": "pass123"}}, + // Missing username, should fail + {map[string]string{"restAPITemplate": "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=testQueue/QueueSize", "targetQueueSize": "10", "metricName": "testMetricName"}, true, map[string]string{"password": "pass123"}}, + // Missing password, should fail + {map[string]string{"restAPITemplate": "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=testQueue/QueueSize", "targetQueueSize": "10", "metricName": "testMetricName"}, true, map[string]string{"username": "testUsername"}}, +} + +func TestActiveMQParseMetadata(t *testing.T) { + for _, testData := range testActiveMQMetadata { + metadata, err := parseActiveMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + if metadata != nil && metadata.password != "" && metadata.password != testData.authParams["password"] { + t.Error("Expected password from configuration but found something else: ", metadata.password) + fmt.Println(testData) + } + } +} + +var testDefaultTargetQueueSize = []parseActiveMQMetadataTestData{ + {map[string]string{"managementEndpoint": "localhost:8161", "destinationName": "testQueue", "brokerName": "localhost"}, false, map[string]string{"username": "testUsername", "password": "pass123"}}, +} + +func TestParseDefaultTargetQueueSize(t *testing.T) { + for _, testData := range testDefaultTargetQueueSize { + metadata, err := parseActiveMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) + switch { + case err != nil && !testData.isError: + t.Error("Expected success but got error", err) + case testData.isError && err == nil: + t.Error("Expected error but got success") + case metadata.targetQueueSize != defaultTargetQueueSize: + t.Error("Expected default targetQueueSize =", defaultTargetQueueSize, "but got", metadata.targetQueueSize) + } + } +} + +func TestActiveMQGetMetricsSpecForScaling(t *testing.T) { + for _, testData := range activeMQMetricIdentifiers { + metadata, err := parseActiveMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: testData.metadataTestData.authParams, ScalerIndex: testData.scalerIndex}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + mockActiveMQScaler := activeMQScaler{ + metadata: metadata, + httpClient: http.DefaultClient, + } + + metricSpec := mockActiveMQScaler.GetMetricSpecForScaling(context.Background()) + metricName := metricSpec[0].External.Metric.Name + if metricName != testData.name { + t.Errorf("Wrong External metric source name: %s, expected: %s", metricName, testData.name) + } + } +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 68be3bcadb9..e3edb4bcd34 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -327,6 +327,8 @@ func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alp func buildScaler(ctx context.Context, client client.Client, triggerType string, config *scalers.ScalerConfig) (scalers.Scaler, error) { // TRIGGERS-START switch triggerType { + case "activemq": + return scalers.NewActiveMQScaler(config) case "artemis-queue": return scalers.NewArtemisQueueScaler(config) case "aws-cloudwatch": From 79b883cfdc7bb57b14b8b66e84550e8f722df38e Mon Sep 17 00:00:00 2001 From: melisatanrverdi Date: Fri, 19 Nov 2021 20:16:39 +0300 Subject: [PATCH 2/9] restored changes that build command caused Signed-off-by: melisatanrverdi --- config/manager/kustomization.yaml | 2 +- config/metrics-server/kustomization.yaml | 2 +- pkg/mock/mock_client/mock_interfaces.go | 2 +- pkg/mock/mock_scale/mock_interfaces.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index e3a2a128f46..77c0cd34eaf 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -5,5 +5,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: - name: ghcr.io/kedacore/keda - newName: docker.io/melisatanrverdi/keda + newName: ghcr.io/kedacore/keda newTag: main diff --git a/config/metrics-server/kustomization.yaml b/config/metrics-server/kustomization.yaml index fcf2ffb300a..bd650d97723 100644 --- a/config/metrics-server/kustomization.yaml +++ b/config/metrics-server/kustomization.yaml @@ -10,5 +10,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: - name: ghcr.io/kedacore/keda-metrics-apiserver - newName: docker.io/melisatanrverdi/keda-metrics-apiserver + newName: ghcr.io/kedacore/keda-metrics-apiserver newTag: main diff --git a/pkg/mock/mock_client/mock_interfaces.go b/pkg/mock/mock_client/mock_interfaces.go index 115bc162416..b33495dcdf2 100644 --- a/pkg/mock/mock_client/mock_interfaces.go +++ b/pkg/mock/mock_client/mock_interfaces.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: /home/melisa/pkg/mod/sigs.k8s.io/controller-runtime@v0.10.3/pkg/client/interfaces.go +// Source: /go/pkg/mod/sigs.k8s.io/controller-runtime@v0.10.2/pkg/client/interfaces.go // Package mock_client is a generated GoMock package. package mock_client diff --git a/pkg/mock/mock_scale/mock_interfaces.go b/pkg/mock/mock_scale/mock_interfaces.go index 460da252f6b..48c6afb71bd 100644 --- a/pkg/mock/mock_scale/mock_interfaces.go +++ b/pkg/mock/mock_scale/mock_interfaces.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: /home/melisa/pkg/mod/k8s.io/client-go@v0.22.3/scale/interfaces.go +// Source: /home/ahmed/go/pkg/mod/k8s.io/client-go@v0.22.2/scale/interfaces.go // Package mock_scale is a generated GoMock package. package mock_scale From c94b63177a316e44de4c23613f036959655dd885 Mon Sep 17 00:00:00 2001 From: melisatanrverdi Date: Thu, 2 Dec 2021 19:08:44 +0300 Subject: [PATCH 3/9] add e2e test for activemq and update changelog file Signed-off-by: melisatanrverdi --- CHANGELOG.md | 1 + tests/scalers/activemq.test.ts | 487 +++++++++++++++++++++++++++++++++ 2 files changed, 488 insertions(+) create mode 100644 tests/scalers/activemq.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 050667a147a..9878a4c92c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ ### New +- Add ActiveMQ Scaler ([#2305](https://github.com/kedacore/keda/pull/2305)) - ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016)) - ScaledJob: introduce `RolloutStrategy` ([#2164](https://github.com/kedacore/keda/pull/2164)) - Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092)) diff --git a/tests/scalers/activemq.test.ts b/tests/scalers/activemq.test.ts new file mode 100644 index 00000000000..25d5bbdfa2c --- /dev/null +++ b/tests/scalers/activemq.test.ts @@ -0,0 +1,487 @@ +import * as fs from 'fs' +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import test from 'ava' +import {waitForRollout} from './helpers' + +const activeMQNamespace = 'activemq-test' +const activemqConf = '/opt/apache-activemq-5.16.3/conf' +const activemqHome = '/opt/apache-activemq-5.16.3' +const activeMQPath = 'bin/activemq' +const activeMQUsername = 'admin' +const activeMQPassword = 'admin' +const destinationName = 'testQ' +const nginxDeploymentName = 'nginx-deployment' + +test.before(t => { + // install ActiveMQ + sh.exec(`kubectl create namespace ${activeMQNamespace}`) + const activeMQTmpFile = tmp.fileSync() + fs.writeFileSync(activeMQTmpFile.name, activeMQDeployYaml) + + t.is(0, sh.exec(`kubectl apply --namespace ${activeMQNamespace} -f ${activeMQTmpFile.name}`).code, 'creating ActiveMQ deployment should work.') + t.is(0, waitForRollout('deployment', "activemq", activeMQNamespace)) + + const clusterIP = sh.exec(`kubectl get svc -n ${activeMQNamespace} -o jsonpath='{.items[0].spec.clusterIP}'`).stdout + + // ActiveMQ ready check + let activeMQReady + for (let i = 0; i < 30; i++) { + activeMQReady = sh.exec(`curl -u ${activeMQUsername}:${activeMQPassword} -s http://${clusterIP}:8161/api/jolokia/exec/org.apache.activemq:type=Broker,brokerName=localhost,service=Health/healthStatus | sed -e 's/[{}]/''/g' | awk -v RS=',"' -F: '/^status/ {print $2}'`) + if (activeMQReady != 200) { + sh.exec('sleep 5s') + } + else { + break + } + } + + // deploy Nginx, scaledobject etc. + const nginxTmpFile = tmp.fileSync() + fs.writeFileSync(nginxTmpFile.name, nginxDeployYaml.replace('{{CLUSTER_IP}}', clusterIP.toString())) + + t.is(0, sh.exec(`kubectl apply --namespace ${activeMQNamespace} -f ${nginxTmpFile.name}`).code, 'creating Nginx deployment should work.') + t.is(0, waitForRollout('deployment', "nginx-deployment", activeMQNamespace)) +}) + +test.serial('Deployment should have 0 replicas on start', t => { + const replicaCount = sh.exec(`kubectl get deploy/${nginxDeploymentName} --namespace ${activeMQNamespace} -o jsonpath="{.spec.replicas}"`).stdout + t.is(replicaCount, '0', 'replica count should start out as 0') +}) + +test.serial('Deployment should scale to 5 (the max) with 1000 messages on the queue then back to 0', t => { + const clusterIP = sh.exec(`kubectl get svc -n ${activeMQNamespace} -o jsonpath='{.items[0].spec.clusterIP}'`).stdout + const activeMQPod = sh.exec(`kubectl get pods --selector=app=activemq-app -n ${activeMQNamespace} -o jsonpath='{.items[0].metadata.name'}`).stdout + + // produce 1000 messages to ActiveMQ + t.is( + 0, + sh.exec(`kubectl exec -n ${activeMQNamespace} ${activeMQPod} -- ${activeMQPath} producer --destination ${destinationName} --messageCount 1000`).code, + 'produce 1000 message to the ActiveMQ queue' + ) + + let replicaCount = '0' + const maxReplicaCount = '5' + + for (let i = 0; i < 30 && replicaCount !== maxReplicaCount; i++) { + replicaCount = sh.exec(`kubectl get deploy/${nginxDeploymentName} --namespace ${activeMQNamespace} -o jsonpath="{.spec.replicas}"`).stdout + if (replicaCount !== maxReplicaCount) { + sh.exec('sleep 2s') + } + } + t.is(maxReplicaCount, replicaCount, `Replica count should be ${maxReplicaCount} after 60 seconds`) + sh.exec('sleep 30s') + + // consume all messages from ActiveMQ + t.is( + 0, + sh.exec(`kubectl exec -n ${activeMQNamespace} ${activeMQPod} -- ${activeMQPath} consumer --destination ${destinationName} --messageCount 1000`).code, + 'consume all messages' + ) + + for (let i = 0; i < 50 && replicaCount !== '0'; i++) { + replicaCount = sh.exec( + `kubectl get deploy/${nginxDeploymentName} --namespace ${activeMQNamespace} -o jsonpath="{.spec.replicas}"`).stdout + if (replicaCount !== '0') { + sh.exec('sleep 5s') + } + } + t.is('0', replicaCount, 'Replica count should be 0 after 3 minutes') + +}) + +test.after.always((t) => { + t.is(0, sh.exec(`kubectl delete namespace ${activeMQNamespace}`).code, 'Should delete ActiveMQ namespace') +}) + +const activeMQDeployYaml = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: activemq-app + name: activemq +spec: + replicas: 1 + revisionHistoryLimit: 10 + selector: + matchLabels: + app: activemq-app + template: + metadata: + labels: + app: activemq-app + spec: + containers: + - image: symptoma/activemq:5.16.3 + imagePullPolicy: IfNotPresent + name: activemq + ports: + - containerPort: 61616 + name: jmx + protocol: TCP + - containerPort: 8161 + name: ui + protocol: TCP + - containerPort: 61616 + name: openwire + protocol: TCP + - containerPort: 5672 + name: amqp + protocol: TCP + - containerPort: 61613 + name: stomp + protocol: TCP + - containerPort: 1883 + name: mqtt + protocol: TCP + resources: + requests: + memory: 500Mi + cpu: 200m + limits: + memory: 1000Mi + cpu: 400m + volumeMounts: + - name: activemq-config + mountPath: /opt/apache-activemq-5.16.3/webapps/api/WEB-INF/classes/jolokia-access.xml + subPath: jolokia-access.xml + - name: remote-access-cm + mountPath: /opt/apache-activemq-5.16.3/conf/jetty.xml + subPath: jetty.xml + volumes: + - name: activemq-config + configMap: + name: activemq-config + items: + - key: jolokia-access.xml + path: jolokia-access.xml + - name: remote-access-cm + configMap: + name: remote-access-cm + items: + - key: jetty.xml + path: jetty.xml +--- +apiVersion: v1 +kind: Service +metadata: + name: activemq +spec: + type: ClusterIP + selector: + app: activemq-app + ports: + - name: dashboard + port: 8161 + targetPort: 8161 + protocol: TCP + - name: openwire + port: 61616 + targetPort: 61616 + protocol: TCP + - name: amqp + port: 5672 + targetPort: 5672 + protocol: TCP + - name: stomp + port: 61613 + targetPort: 61613 + protocol: TCP + - name: mqtt + port: 1883 + targetPort: 1883 + protocol: TCP +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: activemq-config +data: + jolokia-access.xml: | + + + + 0.0.0.0/0 + + + + + com.sun.management:type=DiagnosticCommand + * + * + + + com.sun.management:type=HotSpotDiagnostic + * + * + + + +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: remote-access-cm +data: + jetty.xml: | + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + index.html + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +` +const nginxDeployYaml = ` +apiVersion: v1 +kind: Secret +metadata: + name: activemq-secret +type: Opaque +data: + activemq-password: YWRtaW4= + activemq-username: YWRtaW4= +--- +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: trigger-auth-activemq +spec: + secretTargetRef: + - parameter: username + name: activemq-secret + key: activemq-username + - parameter: password + name: activemq-secret + key: activemq-password +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: nginx + name: ${nginxDeploymentName} +spec: + replicas: 0 + selector: + matchLabels: + app: nginx + template: + metadata: + labels: + app: nginx + spec: + containers: + - image: nginx + name: nginx + ports: + - containerPort: 80 +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: activemq-scaledobject + labels: + deploymentName: ${nginxDeploymentName} +spec: + scaleTargetRef: + name: ${nginxDeploymentName} + pollingInterval: 5 + cooldownPeriod: 5 + minReplicaCount: 0 + maxReplicaCount: 5 + triggers: + - type: activemq + metadata: + managementEndpoint: "{{CLUSTER_IP}}:8161" + destinationName: "testQ" + brokerName: "localhost" + authenticationRef: + name: trigger-auth-activemq +` From a72454d723b2993873559f5e9748b47721c5edd2 Mon Sep 17 00:00:00 2001 From: melisatanrverdi Date: Fri, 3 Dec 2021 23:33:33 +0300 Subject: [PATCH 4/9] Turn rest API template into a text/template Signed-off-by: melisatanrverdi --- pkg/scalers/activemq_scaler.go | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/pkg/scalers/activemq_scaler.go b/pkg/scalers/activemq_scaler.go index a49c68caa04..3154fdfab6d 100644 --- a/pkg/scalers/activemq_scaler.go +++ b/pkg/scalers/activemq_scaler.go @@ -1,6 +1,7 @@ package scalers import ( + "bytes" "context" "encoding/json" "errors" @@ -9,6 +10,7 @@ import ( "net/url" "strconv" "strings" + "text/template" v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" @@ -45,7 +47,7 @@ type activeMQMonitoring struct { const ( defaultTargetQueueSize = 10 - defaultActiveMQRestAPITemplate = "http://<>/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=<>,destinationType=Queue,destinationName=<>/QueueSize" + defaultActiveMQRestAPITemplate = "http://{{.ManagementEndpoint}}/api/jolokia/read/org.apache.activemq:type=Broker,brokerName={{.BrokerName}},destinationType=Queue,destinationName={{.DestinationName}}/QueueSize" ) var activeMQLog = logf.Log.WithName("activeMQ_scaler") @@ -183,14 +185,23 @@ func getRestAPIParameters(meta activeMQMetadata) (activeMQMetadata, error) { return meta, nil } -func (s *activeMQScaler) getMonitoringEndpoint() string { - replacer := strings.NewReplacer("<>", s.metadata.managementEndpoint, - "<>", s.metadata.brokerName, - "<>", s.metadata.destinationName) - - monitoringEndpoint := replacer.Replace(s.metadata.restAPITemplate) - - return monitoringEndpoint +func (s *activeMQScaler) getMonitoringEndpoint() (string, error) { + var buf bytes.Buffer + endpoint := map[string]string{ + "ManagementEndpoint": s.metadata.managementEndpoint, + "BrokerName": s.metadata.brokerName, + "DestinationName": s.metadata.destinationName, + } + template, err := template.New("monitoring_endpoint").Parse(defaultActiveMQRestAPITemplate) + if err != nil { + return "", fmt.Errorf("error parsing template: %s", err) + } + err = template.Execute(&buf, endpoint) + if err != nil { + return "", fmt.Errorf("error executing template: %s", err) + } + monitoringEndpoint := buf.String() + return monitoringEndpoint, nil } func (s *activeMQScaler) getQueueMessageCount(ctx context.Context) (int, error) { @@ -198,7 +209,10 @@ func (s *activeMQScaler) getQueueMessageCount(ctx context.Context) (int, error) var queueMessageCount int client := s.httpClient - url := s.getMonitoringEndpoint() + url, err := s.getMonitoringEndpoint() + if err != nil { + return -1, err + } req, err := http.NewRequestWithContext(ctx, "GET", url, nil) From fcbb0a778da156d413db5465d25749d66076f00f Mon Sep 17 00:00:00 2001 From: melisatanrverdi Date: Sat, 4 Dec 2021 01:10:42 +0300 Subject: [PATCH 5/9] Add name field to unit test and configure functions accordingly Signed-off-by: melisatanrverdi --- pkg/scalers/activemq_scaler_test.go | 264 ++++++++++++++++++++++------ 1 file changed, 213 insertions(+), 51 deletions(-) diff --git a/pkg/scalers/activemq_scaler_test.go b/pkg/scalers/activemq_scaler_test.go index 631a595b77e..bec06cb41c6 100644 --- a/pkg/scalers/activemq_scaler_test.go +++ b/pkg/scalers/activemq_scaler_test.go @@ -12,9 +12,10 @@ const ( ) type parseActiveMQMetadataTestData struct { + name string metadata map[string]string - isError bool authParams map[string]string + isError bool } type activeMQMetricIdentifier struct { @@ -30,70 +31,231 @@ var activeMQMetricIdentifiers = []activeMQMetricIdentifier{ } var testActiveMQMetadata = []parseActiveMQMetadataTestData{ - // Nothing passed - {map[string]string{}, true, map[string]string{}}, - // Properly formed metadata - {map[string]string{"managementEndpoint": "localhost:8161", "destinationName": "testQueue", "brokerName": "localhost", "targetQueueSize": "10", "metricName": "testMetricName"}, false, map[string]string{"username": "testUsername", "password": "pass123"}}, - // no metricName passed, metricName is generated from destinationName - {map[string]string{"managementEndpoint": "localhost:8161", "destinationName": "testQueue", "brokerName": "localhost", "targetQueueSize": "10"}, false, map[string]string{"username": "testUsername", "password": "pass123"}}, - // Invalid targetQueueSize using a string - {map[string]string{"managementEndpoint": "localhost:8161", "destinationName": "testQueue", "brokerName": "localhost", "targetQueueSize": "AA", "metricName": "testMetricName"}, true, map[string]string{"username": "testUsername", "password": "pass123"}}, - // Missing management endpoint should fail - {map[string]string{"destinationName": "testQueue", "brokerName": "localhost", "targetQueueSize": "10", "metricName": "testMetricName"}, true, map[string]string{"username": "testUsername", "password": "pass123"}}, - // Missing destination name, should fail - {map[string]string{"managementEndpoint": "localhost:8161", "brokerName": "localhost", "targetQueueSize": "10", "metricName": "testMetricName"}, true, map[string]string{"username": "testUsername", "password": "pass123"}}, - // Missing broker name, should fail - {map[string]string{"managementEndpoint": "localhost:8161", "destinationName": "testQueue", "targetQueueSize": "10", "metricName": "testMetricName"}, true, map[string]string{"username": "testUsername", "password": "pass123"}}, - // Missing username, should fail - {map[string]string{"managementEndpoint": "localhost:8161", "destinationName": "testQueue", "brokerName": "localhost", "targetQueueSize": "10", "metricName": "testMetricName"}, true, map[string]string{"password": "pass123"}}, - // Missing password, should fail - {map[string]string{"managementEndpoint": "localhost:8161", "destinationName": "testQueue", "brokerName": "localhost", "targetQueueSize": "10", "metricName": "testMetricName"}, true, map[string]string{"username": "testUsername"}}, - // Properly formed metadata with restAPITemplate - {map[string]string{"restAPITemplate": "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=testQueue/QueueSize", "targetQueueSize": "10", "metricName": "testMetricName"}, false, map[string]string{"username": "testUsername", "password": "pass123"}}, - // Invalid restAPITemplate, should fail - {map[string]string{"restAPITemplate": testInvalidRestAPITemplate, "targetQueueSize": "10", "metricName": "testMetricName"}, true, map[string]string{"username": "testUsername", "password": "pass123"}}, - // Missing username, should fail - {map[string]string{"restAPITemplate": "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=testQueue/QueueSize", "targetQueueSize": "10", "metricName": "testMetricName"}, true, map[string]string{"password": "pass123"}}, - // Missing password, should fail - {map[string]string{"restAPITemplate": "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=testQueue/QueueSize", "targetQueueSize": "10", "metricName": "testMetricName"}, true, map[string]string{"username": "testUsername"}}, + { + name: "nothing passed", + metadata: map[string]string{}, + authParams: map[string]string{}, + isError: true, + }, + { + name: "properly formed metadata", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "brokerName": "localhost", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: false, + }, + { + name: "no metricName passed, metricName is generated from destinationName", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "brokerName": "localhost", + "targetQueueSize": "10", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: false, + }, + { + name: "Invalid targetQueueSize using a string", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "brokerName": "localhost", + "targetQueueSize": "AA", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: true, + }, + { + name: "missing management endpoint should fail", + metadata: map[string]string{ + "destinationName": "testQueue", + "brokerName": "localhost", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: true, + }, + { + name: "missing destination name, should fail", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "brokerName": "localhost", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: true, + }, + { + name: "missing broker name, should fail", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: true, + }, + { + name: "missing username, should fail", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "brokerName": "localhost", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "password": "pass123", + }, + isError: true, + }, + { + name: "missing password, should fail", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "brokerName": "localhost", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + }, + isError: true, + }, + { + name: "properly formed metadata with restAPITemplate", + metadata: map[string]string{ + "restAPITemplate": "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=testQueue/QueueSize", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: false, + }, + { + name: "invalid restAPITemplate, should fail", + metadata: map[string]string{ + "restAPITemplate": testInvalidRestAPITemplate, + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: true, + }, + { + name: "missing username, should fail", + metadata: map[string]string{ + "restAPITemplate": "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=testQueue/QueueSize", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "password": "pass123", + }, + isError: true, + }, + { + name: "missing password, should fail", + metadata: map[string]string{ + "restAPITemplate": "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=testQueue/QueueSize", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + }, + isError: true, + }, } -func TestActiveMQParseMetadata(t *testing.T) { +func TestParseActiveMQMetadata(t *testing.T) { for _, testData := range testActiveMQMetadata { - metadata, err := parseActiveMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) - if err != nil && !testData.isError { - t.Error("Expected success but got error", err) - } - if testData.isError && err == nil { - t.Error("Expected error but got success") - } - if metadata != nil && metadata.password != "" && metadata.password != testData.authParams["password"] { - t.Error("Expected password from configuration but found something else: ", metadata.password) - fmt.Println(testData) - } + t.Run(testData.name, func(t *testing.T) { + metadata, err := parseActiveMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + if metadata != nil && metadata.password != "" && metadata.password != testData.authParams["password"] { + t.Error("Expected password from configuration but found something else: ", metadata.password) + fmt.Println(testData) + } + }) } } var testDefaultTargetQueueSize = []parseActiveMQMetadataTestData{ - {map[string]string{"managementEndpoint": "localhost:8161", "destinationName": "testQueue", "brokerName": "localhost"}, false, map[string]string{"username": "testUsername", "password": "pass123"}}, + { + name: "properly formed metadata", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "brokerName": "localhost", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: false, + }, } func TestParseDefaultTargetQueueSize(t *testing.T) { for _, testData := range testDefaultTargetQueueSize { - metadata, err := parseActiveMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) - switch { - case err != nil && !testData.isError: - t.Error("Expected success but got error", err) - case testData.isError && err == nil: - t.Error("Expected error but got success") - case metadata.targetQueueSize != defaultTargetQueueSize: - t.Error("Expected default targetQueueSize =", defaultTargetQueueSize, "but got", metadata.targetQueueSize) - } + t.Run(testData.name, func(t *testing.T) { + metadata, err := parseActiveMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) + switch { + case err != nil && !testData.isError: + t.Error("Expected success but got error", err) + case testData.isError && err == nil: + t.Error("Expected error but got success") + case metadata.targetQueueSize != defaultTargetQueueSize: + t.Error("Expected default targetQueueSize =", defaultTargetQueueSize, "but got", metadata.targetQueueSize) + } + }) } } -func TestActiveMQGetMetricsSpecForScaling(t *testing.T) { +func TestActiveMQGetMetricSpecForScaling(t *testing.T) { for _, testData := range activeMQMetricIdentifiers { + ctx := context.Background() metadata, err := parseActiveMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: testData.metadataTestData.authParams, ScalerIndex: testData.scalerIndex}) if err != nil { t.Fatal("Could not parse metadata:", err) @@ -103,7 +265,7 @@ func TestActiveMQGetMetricsSpecForScaling(t *testing.T) { httpClient: http.DefaultClient, } - metricSpec := mockActiveMQScaler.GetMetricSpecForScaling(context.Background()) + metricSpec := mockActiveMQScaler.GetMetricSpecForScaling(ctx) metricName := metricSpec[0].External.Metric.Name if metricName != testData.name { t.Errorf("Wrong External metric source name: %s, expected: %s", metricName, testData.name) From 4ec0558191ba9dc98395d690ebd78b27d6d95eba Mon Sep 17 00:00:00 2001 From: melisatanrverdi Date: Tue, 7 Dec 2021 17:48:33 +0300 Subject: [PATCH 6/9] Fix e2e test fail, add suggested changes to scaler Signed-off-by: melisatanrverdi --- pkg/scalers/activemq_scaler.go | 10 +++++----- tests/scalers/activemq.test.ts | 9 ++++----- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/pkg/scalers/activemq_scaler.go b/pkg/scalers/activemq_scaler.go index 3154fdfab6d..dc8c4614271 100644 --- a/pkg/scalers/activemq_scaler.go +++ b/pkg/scalers/activemq_scaler.go @@ -215,14 +215,14 @@ func (s *activeMQScaler) getQueueMessageCount(ctx context.Context) (int, error) } req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return -1, err + } // Add HTTP Auth and Headers req.SetBasicAuth(s.metadata.username, s.metadata.password) req.Header.Set("Content-Type", "application/json") - if err != nil { - return -1, err - } resp, err := client.Do(req) if err != nil { return -1, err @@ -265,7 +265,7 @@ func (s *activeMQScaler) GetMetricSpecForScaling(context.Context) []v2beta2.Metr func (s *activeMQScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { queueSize, err := s.getQueueMessageCount(ctx) if err != nil { - return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting ActiveMQ queue size: %s", err) + return nil, fmt.Errorf("error inspecting ActiveMQ queue size: %s", err) } metric := external_metrics.ExternalMetricValue{ @@ -274,7 +274,7 @@ func (s *activeMQScaler) GetMetrics(ctx context.Context, metricName string, metr Timestamp: metav1.Now(), } - return append([]external_metrics.ExternalMetricValue{}, metric), nil + return []external_metrics.ExternalMetricValue{metric}, nil } func (s *activeMQScaler) Close(context.Context) error { diff --git a/tests/scalers/activemq.test.ts b/tests/scalers/activemq.test.ts index 25d5bbdfa2c..a0a4391ec1a 100644 --- a/tests/scalers/activemq.test.ts +++ b/tests/scalers/activemq.test.ts @@ -22,12 +22,12 @@ test.before(t => { t.is(0, sh.exec(`kubectl apply --namespace ${activeMQNamespace} -f ${activeMQTmpFile.name}`).code, 'creating ActiveMQ deployment should work.') t.is(0, waitForRollout('deployment', "activemq", activeMQNamespace)) - const clusterIP = sh.exec(`kubectl get svc -n ${activeMQNamespace} -o jsonpath='{.items[0].spec.clusterIP}'`).stdout + const activeMQPod = sh.exec(`kubectl get pods --selector=app=activemq-app -n ${activeMQNamespace} -o jsonpath='{.items[0].metadata.name'}`).stdout // ActiveMQ ready check let activeMQReady for (let i = 0; i < 30; i++) { - activeMQReady = sh.exec(`curl -u ${activeMQUsername}:${activeMQPassword} -s http://${clusterIP}:8161/api/jolokia/exec/org.apache.activemq:type=Broker,brokerName=localhost,service=Health/healthStatus | sed -e 's/[{}]/''/g' | awk -v RS=',"' -F: '/^status/ {print $2}'`) + activeMQReady = sh.exec(`kubectl exec -n ${activeMQNamespace} ${activeMQPod} -- curl -u ${activeMQUsername}:${activeMQPassword} -s http://localhost:8161/api/jolokia/exec/org.apache.activemq:type=Broker,brokerName=localhost,service=Health/healthStatus | sed -e 's/[{}]/''/g' | awk -v RS=',"' -F: '/^status/ {print $2}'`) if (activeMQReady != 200) { sh.exec('sleep 5s') } @@ -38,7 +38,7 @@ test.before(t => { // deploy Nginx, scaledobject etc. const nginxTmpFile = tmp.fileSync() - fs.writeFileSync(nginxTmpFile.name, nginxDeployYaml.replace('{{CLUSTER_IP}}', clusterIP.toString())) + fs.writeFileSync(nginxTmpFile.name, nginxDeployYaml) t.is(0, sh.exec(`kubectl apply --namespace ${activeMQNamespace} -f ${nginxTmpFile.name}`).code, 'creating Nginx deployment should work.') t.is(0, waitForRollout('deployment', "nginx-deployment", activeMQNamespace)) @@ -50,7 +50,6 @@ test.serial('Deployment should have 0 replicas on start', t => { }) test.serial('Deployment should scale to 5 (the max) with 1000 messages on the queue then back to 0', t => { - const clusterIP = sh.exec(`kubectl get svc -n ${activeMQNamespace} -o jsonpath='{.items[0].spec.clusterIP}'`).stdout const activeMQPod = sh.exec(`kubectl get pods --selector=app=activemq-app -n ${activeMQNamespace} -o jsonpath='{.items[0].metadata.name'}`).stdout // produce 1000 messages to ActiveMQ @@ -479,7 +478,7 @@ spec: triggers: - type: activemq metadata: - managementEndpoint: "{{CLUSTER_IP}}:8161" + managementEndpoint: "activemq.${activeMQNamespace}:8161" destinationName: "testQ" brokerName: "localhost" authenticationRef: From 47edaaf6a867016b447cac0be2e7fd61f9006680 Mon Sep 17 00:00:00 2001 From: melisatanrverdi Date: Tue, 7 Dec 2021 19:07:27 +0300 Subject: [PATCH 7/9] Update CHANGELOG.md file Signed-off-by: melisatanrverdi --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 072f1d7b55d..5a6d624b23c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,8 @@ - TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX)) +- Add ActiveMQ Scaler ([#2305](https://github.com/kedacore/keda/pull/2305)) + ### Improvements - Graphite Scaler: use the latest datapoint returned, not the earliest ([#2365](https://github.com/kedacore/keda/pull/2365)) From 77df77df6b85af28cb021a5d533a1129e36303a6 Mon Sep 17 00:00:00 2001 From: melisatanrverdi Date: Tue, 7 Dec 2021 22:18:37 +0300 Subject: [PATCH 8/9] Replace TODO line with new ActiveMQ line Signed-off-by: melisatanrverdi --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5a6d624b23c..f6d699c1de6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,10 +23,10 @@ ### New -- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX)) - - Add ActiveMQ Scaler ([#2305](https://github.com/kedacore/keda/pull/2305)) +- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX)) + ### Improvements - Graphite Scaler: use the latest datapoint returned, not the earliest ([#2365](https://github.com/kedacore/keda/pull/2365)) From 509baf59c84597de8e38389d28412e80501d9275 Mon Sep 17 00:00:00 2001 From: melisatanrverdi Date: Fri, 10 Dec 2021 17:39:48 +0300 Subject: [PATCH 9/9] Remove user-defined option of metricName parameter Signed-off-by: melisatanrverdi --- pkg/scalers/activemq_scaler.go | 8 ++------ pkg/scalers/activemq_scaler_test.go | 5 ++--- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/pkg/scalers/activemq_scaler.go b/pkg/scalers/activemq_scaler.go index dc8c4614271..5e854ad26f2 100644 --- a/pkg/scalers/activemq_scaler.go +++ b/pkg/scalers/activemq_scaler.go @@ -136,11 +136,7 @@ func parseActiveMQMetadata(config *ScalerConfig) (*activeMQMetadata, error) { return nil, fmt.Errorf("password cannot be empty") } - if val, ok := config.TriggerMetadata["metricName"]; ok { - meta.metricName = kedautil.NormalizeString(fmt.Sprintf("activemq-%s", val)) - } else { - meta.metricName = kedautil.NormalizeString(fmt.Sprintf("activemq-%s", meta.destinationName)) - } + meta.metricName = GenerateMetricNameWithIndex(config.ScalerIndex, kedautil.NormalizeString(fmt.Sprintf("activemq-%s", meta.destinationName))) meta.scalerIndex = config.ScalerIndex @@ -249,7 +245,7 @@ func (s *activeMQScaler) GetMetricSpecForScaling(context.Context) []v2beta2.Metr targetMetricValue := resource.NewQuantity(int64(s.metadata.targetQueueSize), resource.DecimalSI) externalMetric := &v2beta2.ExternalMetricSource{ Metric: v2beta2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, s.metadata.metricName), + Name: s.metadata.metricName, }, Target: v2beta2.MetricTarget{ Type: v2beta2.AverageValueMetricType, diff --git a/pkg/scalers/activemq_scaler_test.go b/pkg/scalers/activemq_scaler_test.go index bec06cb41c6..46cbabbddee 100644 --- a/pkg/scalers/activemq_scaler_test.go +++ b/pkg/scalers/activemq_scaler_test.go @@ -26,8 +26,8 @@ type activeMQMetricIdentifier struct { // Setting metric identifier mock name var activeMQMetricIdentifiers = []activeMQMetricIdentifier{ - {&testActiveMQMetadata[1], 0, "s0-activemq-testMetricName"}, - {&testActiveMQMetadata[2], 1, "s1-activemq-testQueue"}, + {&testActiveMQMetadata[1], 0, "s0-activemq-testQueue"}, + {&testActiveMQMetadata[9], 1, "s1-activemq-testQueue"}, } var testActiveMQMetadata = []parseActiveMQMetadataTestData{ @@ -44,7 +44,6 @@ var testActiveMQMetadata = []parseActiveMQMetadataTestData{ "destinationName": "testQueue", "brokerName": "localhost", "targetQueueSize": "10", - "metricName": "testMetricName", }, authParams: map[string]string{ "username": "testUsername",