Skip to content

Commit

Permalink
refactor IBM MQ
Browse files Browse the repository at this point in the history
Signed-off-by: Rick Brouwer <[email protected]>
  • Loading branch information
rickbrouwer committed Aug 12, 2024
1 parent 995b612 commit 14fa14d
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 175 deletions.
245 changes: 81 additions & 164 deletions pkg/scalers/ibmmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/go-logr/logr"
v2 "k8s.io/api/autoscaling/v2"
Expand All @@ -20,36 +16,28 @@ import (
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

// Default variables and settings
const (
defaultTargetQueueDepth = 20
)

// IBMMQScaler assigns struct data pointer to metadata variable
type IBMMQScaler struct {
metricType v2.MetricTargetType
metadata *IBMMQMetadata
defaultHTTPTimeout time.Duration
httpClient *http.Client
logger logr.Logger
type ibmmqScaler struct {
metricType v2.MetricTargetType
metadata ibmmqMetadata
httpClient *http.Client
logger logr.Logger
}

// IBMMQMetadata Metadata used by KEDA to query IBM MQ queue depth and scale
type IBMMQMetadata struct {
host string
queueName string
username string
password string
queueDepth int64
activationQueueDepth int64
triggerIndex int

// TLS
ca string
cert string
key string
keyPassword string
unsafeSsl bool
type ibmmqMetadata struct {
Host string `keda:"name=host, order=triggerMetadata"`
QueueName string `keda:"name=queueName, order=triggerMetadata"`
QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"`
ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"`
Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"`
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead
CA string `keda:"name=ca, order=authParams, optional"`
Cert string `keda:"name=cert, order=authParams, optional"`
Key string `keda:"name=key, order=authParams, optional"`
KeyPassword string `keda:"name=keyPassword, order=authParams, optional"`

triggerIndex int
}

// CommandResponse Full structured response from MQ admin REST query
Expand All @@ -68,7 +56,23 @@ type Parameters struct {
Curdepth int `json:"curdepth"`
}

// NewIBMMQScaler creates a new IBM MQ scaler
func (m *ibmmqMetadata) Validate() error {
if (m.Cert == "") != (m.Key == "") {
return fmt.Errorf("both cert and key must be provided when using TLS")
}

if m.TLS && m.UnsafeSsl {
return fmt.Errorf("'tls' and 'unsafeSsl' are both specified. Please use only 'unsafeSsl'")
}

// TODO: DEPRECATED to be removed in v2.17
if m.TLS {
m.UnsafeSsl = m.TLS
}

return nil
}

func NewIBMMQScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
Expand All @@ -77,133 +81,75 @@ func NewIBMMQScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {

logger := InitializeLogger(config, "ibm_mq_scaler")

meta, err := parseIBMMQMetadata(config, logger)
meta, err := parseIBMMQMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing IBM MQ metadata: %w", err)
}

httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.unsafeSsl)
// TODO: DEPRECATED to be removed in v2.17
if meta.TLS {
logger.Info("The 'tls' setting is DEPRECATED and will be removed in v2.17 - Use 'unsafeSsl' instead")
}

httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.UnsafeSsl)

// Configure TLS if cert and key are specified
if meta.cert != "" && meta.key != "" {
tlsConfig, err := kedautil.NewTLSConfigWithPassword(meta.cert, meta.key, meta.keyPassword, meta.ca, meta.unsafeSsl)
if meta.Cert != "" && meta.Key != "" {
tlsConfig, err := kedautil.NewTLSConfigWithPassword(meta.Cert, meta.Key, meta.KeyPassword, meta.CA, meta.UnsafeSsl)
if err != nil {
return nil, err
}
httpClient.Transport = kedautil.CreateHTTPTransportWithTLSConfig(tlsConfig)
}

return &IBMMQScaler{
metricType: metricType,
metadata: meta,
defaultHTTPTimeout: config.GlobalHTTPTimeout,
httpClient: httpClient,
logger: logger,
return &ibmmqScaler{
metricType: metricType,
metadata: meta,
httpClient: httpClient,
logger: logger,
}, nil
}

// Close closes and returns nil
func (s *IBMMQScaler) Close(context.Context) error {
func parseIBMMQMetadata(config *scalersconfig.ScalerConfig) (ibmmqMetadata, error) {
meta := ibmmqMetadata{triggerIndex: config.TriggerIndex}
if err := config.TypedConfig(&meta); err != nil {
return meta, err
}
return meta, nil
}

func (s *ibmmqScaler) Close(context.Context) error {
if s.httpClient != nil {
s.httpClient.CloseIdleConnections()
}
return nil
}

// parseIBMMQMetadata checks the existence of and validates the MQ connection data provided
func parseIBMMQMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*IBMMQMetadata, error) {
meta := IBMMQMetadata{}

if val, ok := config.TriggerMetadata["host"]; ok {
_, err := url.ParseRequestURI(val)
if err != nil {
return nil, fmt.Errorf("invalid URL: %w", err)
}
meta.host = val
} else {
return nil, fmt.Errorf("no host URI given")
}

if val, ok := config.TriggerMetadata["queueName"]; ok {
meta.queueName = val
} else {
return nil, fmt.Errorf("no queue name given")
}

if val, ok := config.TriggerMetadata["queueDepth"]; ok && val != "" {
queueDepth, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid queueDepth - must be an integer")
}
meta.queueDepth = queueDepth
} else {
meta.queueDepth = defaultTargetQueueDepth
}

meta.activationQueueDepth = 0
if val, ok := config.TriggerMetadata["activationQueueDepth"]; ok && val != "" {
activationQueueDepth, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid activationQueueDepth - must be an integer")
}
meta.activationQueueDepth = activationQueueDepth
}

// TODO: Refactor code because 'tls' is DEPRECATED and will be removed in v2.17
// Check if both "tls" and "unsafeSsl" are specified
tlsVal, tlsOk := config.TriggerMetadata["tls"]
unsafeSslVal, unsafeSslOk := config.TriggerMetadata["unsafeSsl"]

switch {
case tlsOk && unsafeSslOk:
return nil, fmt.Errorf("'tls' and 'unsafeSsl' are both specified. Please use only 'unsafeSsl'")
case tlsOk:
logger.Info("The 'tls' setting is DEPRECATED and will be removed in v2.17 - Use 'unsafeSsl' instead")
unsafeSsl, err := strconv.ParseBool(tlsVal)
if err != nil {
return nil, fmt.Errorf("invalid tls setting: %w", err)
}
meta.unsafeSsl = unsafeSsl
case unsafeSslOk:
unsafeSsl, err := strconv.ParseBool(unsafeSslVal)
if err != nil {
return nil, fmt.Errorf("invalid unsafeSsl setting: %w", err)
}
meta.unsafeSsl = unsafeSsl
default:
meta.unsafeSsl = false
}

if val, ok := config.AuthParams["username"]; ok && val != "" {
meta.username = val
} else if val, ok := config.TriggerMetadata["usernameFromEnv"]; ok && val != "" {
meta.username = config.ResolvedEnv[val]
} else {
return nil, fmt.Errorf("no username given")
func (s *ibmmqScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.QueueName))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.QueueDepth),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2.MetricSpec{metricSpec}
}

if val, ok := config.AuthParams["password"]; ok && val != "" {
meta.password = val
} else if val, ok := config.TriggerMetadata["passwordFromEnv"]; ok && val != "" {
meta.password = config.ResolvedEnv[val]
} else {
return nil, fmt.Errorf("no password given")
func (s *ibmmqScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
queueDepth, err := s.getQueueDepthViaHTTP(ctx)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error inspecting IBM MQ queue depth: %w", err)
}

// TLS config (optional)
meta.ca = config.AuthParams["ca"]
meta.cert = config.AuthParams["cert"]
meta.key = config.AuthParams["key"]
meta.keyPassword = config.AuthParams["keyPassword"]
metric := GenerateMetricInMili(metricName, float64(queueDepth))

meta.triggerIndex = config.TriggerIndex
return &meta, nil
return []external_metrics.ExternalMetricValue{metric}, queueDepth > s.metadata.ActivationQueueDepth, nil
}

// getQueueDepthViaHTTP returns the depth of the MQ Queue from the Admin endpoint
func (s *IBMMQScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
queue := s.metadata.queueName
url := s.metadata.host
func (s *ibmmqScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
queue := s.metadata.QueueName
url := s.metadata.Host

var requestJSON = []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queue + `", "responseParameters" : ["CURDEPTH"]}`)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(requestJSON))
Expand All @@ -213,7 +159,7 @@ func (s *IBMMQScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
req.Header.Set("ibm-mq-rest-csrf-token", "value")
req.Header.Set("Content-Type", "application/json")

req.SetBasicAuth(s.metadata.username, s.metadata.password)
req.SetBasicAuth(s.metadata.Username, s.metadata.Password)

resp, err := s.httpClient.Do(req)
if err != nil {
Expand All @@ -237,37 +183,8 @@ func (s *IBMMQScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
}

if response.CommandResponse[0].Parameters == nil {
var reason string
message := strings.Join(response.CommandResponse[0].Message, " ")
if message != "" {
reason = fmt.Sprintf(", reason: %s", message)
}
return 0, fmt.Errorf("failed to get the current queue depth parameter%s", reason)
return 0, fmt.Errorf("failed to get the current queue depth parameter")
}

return int64(response.CommandResponse[0].Parameters.Curdepth), nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *IBMMQScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.queueName))),
},
Target: GetMetricTarget(s.metricType, s.metadata.queueDepth),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2.MetricSpec{metricSpec}
}

// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
func (s *IBMMQScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
queueDepth, err := s.getQueueDepthViaHTTP(ctx)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error inspecting IBM MQ queue depth: %w", err)
}

metric := GenerateMetricInMili(metricName, float64(queueDepth))

return []external_metrics.ExternalMetricValue{metric}, queueDepth > s.metadata.activationQueueDepth, nil
}
23 changes: 12 additions & 11 deletions pkg/scalers/ibmmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"testing"
"time"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"

"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
Expand Down Expand Up @@ -66,6 +65,8 @@ var testIBMMQMetadata = []parseIBMMQMetadataTestData{
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Properly formed authParams Basic Auth and TLS
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123", "ca": "cavalue", "cert": "certvalue", "key": "keyvalue"}},
// No key provided
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123", "ca": "cavalue", "cert": "certvalue"}},
// No username provided
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"password": "Pass123"}},
// No password provided
Expand All @@ -79,7 +80,7 @@ var testIBMMQMetadata = []parseIBMMQMetadataTestData{
// and verify that the password field is handled correctly.
func TestIBMMQParseMetadata(t *testing.T) {
for _, testData := range testIBMMQMetadata {
metadata, err := parseIBMMQMetadata(&scalersconfig.ScalerConfig{ResolvedEnv: sampleIBMMQResolvedEnv, TriggerMetadata: testData.metadata, AuthParams: testData.authParams}, logr.Discard())
metadata, err := parseIBMMQMetadata(&scalersconfig.ScalerConfig{ResolvedEnv: sampleIBMMQResolvedEnv, TriggerMetadata: testData.metadata, AuthParams: testData.authParams})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
fmt.Println(testData)
Expand All @@ -88,8 +89,8 @@ func TestIBMMQParseMetadata(t *testing.T) {
t.Error("Expected error but got success")
fmt.Println(testData)
}
if metadata != nil && metadata.password != "" && metadata.password != testData.authParams["password"] {
t.Error("Expected password from configuration but found something else: ", metadata.password)
if metadata != nil && metadata.Password != "" && metadata.Password != testData.AuthParams["password"] {

Check failure on line 92 in pkg/scalers/ibmmq_scaler_test.go

View workflow job for this annotation

GitHub Actions / Static Checks

invalid operation: metadata != nil (mismatched types ibmmqMetadata and untyped nil)

Check failure on line 92 in pkg/scalers/ibmmq_scaler_test.go

View workflow job for this annotation

GitHub Actions / Static Checks

testData.AuthParams undefined (type parseIBMMQMetadataTestData has no field or method AuthParams, but does have authParams)
t.Error("Expected password from configuration but found something else: ", metadata.Password)
fmt.Println(testData)
}
}
Expand All @@ -103,28 +104,28 @@ var testDefaultQueueDepth = []parseIBMMQMetadataTestData{
// Test that DefaultQueueDepth is set when queueDepth is not provided
func TestParseDefaultQueueDepth(t *testing.T) {
for _, testData := range testDefaultQueueDepth {
metadata, err := parseIBMMQMetadata(&scalersconfig.ScalerConfig{ResolvedEnv: sampleIBMMQResolvedEnv, TriggerMetadata: testData.metadata, AuthParams: testData.authParams}, logr.Discard())
metadata, err := parseIBMMQMetadata(&scalersconfig.ScalerConfig{ResolvedEnv: sampleIBMMQResolvedEnv, TriggerMetadata: testData.metadata, AuthParams: testData.authParams})
switch {
case err != nil && !testData.isError:
t.Error("Expected success but got error", err)
case testData.isError && err == nil:
t.Error("Expected error but got success")
case metadata.queueDepth != defaultTargetQueueDepth:
t.Error("Expected default queueDepth =", defaultTargetQueueDepth, "but got", metadata.queueDepth)
case metadata.QueueDepth != defaultTargetQueueDepth:

Check failure on line 113 in pkg/scalers/ibmmq_scaler_test.go

View workflow job for this annotation

GitHub Actions / Static Checks

undefined: defaultTargetQueueDepth
t.Error("Expected default queueDepth =", defaultTargetQueueDepth, "but got", metadata.QueueDepth)

Check failure on line 114 in pkg/scalers/ibmmq_scaler_test.go

View workflow job for this annotation

GitHub Actions / Static Checks

undefined: defaultTargetQueueDepth
}
}
}

// Create a scaler and check if metrics method is available
func TestIBMMQGetMetricSpecForScaling(t *testing.T) {
for _, testData := range IBMMQMetricIdentifiers {
metadata, err := parseIBMMQMetadata(&scalersconfig.ScalerConfig{ResolvedEnv: sampleIBMMQResolvedEnv, TriggerMetadata: testData.metadataTestData.metadata, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex}, logr.Discard())
metadata, err := parseIBMMQMetadata(&scalersconfig.ScalerConfig{ResolvedEnv: sampleIBMMQResolvedEnv, TriggerMetadata: testData.metadataTestData.metadata, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex})
httpTimeout := 100 * time.Millisecond

if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockIBMMQScaler := IBMMQScaler{
mockIBMMQScaler := ibmmqScaler{
metadata: metadata,
defaultHTTPTimeout: httpTimeout,

Check failure on line 130 in pkg/scalers/ibmmq_scaler_test.go

View workflow job for this annotation

GitHub Actions / Static Checks

unknown field defaultHTTPTimeout in struct literal of type ibmmqScaler
}
Expand Down Expand Up @@ -215,8 +216,8 @@ func TestIBMMQScalerGetQueueDepthViaHTTP(t *testing.T) {
}))
defer server.Close()

scaler := IBMMQScaler{
metadata: &IBMMQMetadata{
scaler := ibmmqScaler{
metadata: &ibmmqMetadata{

Check failure on line 220 in pkg/scalers/ibmmq_scaler_test.go

View workflow job for this annotation

GitHub Actions / Static Checks

cannot use &ibmmqMetadata{…} (value of type *ibmmqMetadata) as ibmmqMetadata value in struct literal
host: server.URL,

Check failure on line 221 in pkg/scalers/ibmmq_scaler_test.go

View workflow job for this annotation

GitHub Actions / Static Checks

unknown field host in struct literal of type ibmmqMetadata (typecheck)
},
httpClient: server.Client(),
Expand Down

0 comments on commit 14fa14d

Please sign in to comment.