Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MongoDB scaler #1467

Merged
merged 2 commits into from
Jan 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
github.com/stretchr/testify v1.6.1
github.com/tidwall/gjson v1.6.7
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.mongodb.org/mongo-driver v1.1.2
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 // indirect
google.golang.org/api v0.36.0
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
Expand Down Expand Up @@ -1395,6 +1396,7 @@ go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738 h1:VcrIfasaLFkyjk6KNlXQSzO+B0
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
go.mongodb.org/mongo-driver v1.1.2 h1:jxcFYjlkl8xaERsgLo+RNquI0epW6zuy/ZRQs6jnrFA=
go.mongodb.org/mongo-driver v1.1.2/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
go.opencensus.io v0.15.0/go.mod h1:UffZAU+4sDEINUGP/B7UfBBkq4fqLu9zXAX7ke6CHW0=
go.opencensus.io v0.17.0/go.mod h1:mp1VrMQxhlqqDpKvH4UcQUa4YwlzNmymAjPrDdfxNpI=
Expand Down
286 changes: 286 additions & 0 deletions pkg/scalers/mongo_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
package scalers

import (
"context"
"errors"
"fmt"
"strconv"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/x/bsonx"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

// mongoDBScaler is support for mongoDB in keda.
type mongoDBScaler struct {
metadata *mongoDBMetadata
client *mongo.Client
}

// mongoDBMetadata specify mongoDB scaler params.
type mongoDBMetadata struct {
// The string is used by connected with mongoDB.
// +optional
connectionString string
// Specify the host to connect to the mongoDB server,if the connectionString be provided, don't need specify this param.
// +optional
host string
// Specify the port to connect to the mongoDB server,if the connectionString be provided, don't need specify this param.
// +optional
port string
// Specify the username to connect to the mongoDB server,if the connectionString be provided, don't need specify this param.
// +optional
username string
// Specify the password to connect to the mongoDB server,if the connectionString be provided, don't need specify this param.
// +optional
password string

// The name of the database to be queried.
// +required
dbName string
// The name of the collection to be queried.
// +required
collection string
// A mongoDB filter doc,used by specify DB.
// +required
query string
// A threshold that is used as targetAverageValue in HPA
// +required
queryValue int

metricName string
}

// Default variables and settings
const (
mongoDBDefaultTimeOut = 10 * time.Second
defaultCollection = "default"
defaultDB = "test"
defaultQueryValue = 1
)

var mongoDBLog = logf.Log.WithName("mongodb_scaler")

// NewMongoDBScaler creates a new mongoDB scaler
func NewMongoDBScaler(config *ScalerConfig) (Scaler, error) {
ctx, cancel := context.WithTimeout(context.Background(), mongoDBDefaultTimeOut)
defer cancel()

meta, connStr, err := parseMongoDBMetadata(config)
if err != nil {
return nil, fmt.Errorf("failed to parsing mongoDB metadata, because of %v", err)
}

opt := options.Client().ApplyURI(connStr)
client, err := mongo.Connect(ctx, opt)
if err != nil {
return nil, fmt.Errorf("failed to establish connection with mongoDB, because of %v", err)
}

if err = client.Ping(ctx, readpref.Primary()); err != nil {
return nil, fmt.Errorf("failed to ping mongoDB, because of %v", err)
}

return &mongoDBScaler{
metadata: meta,
client: client,
}, nil
}

func parseMongoDBMetadata(config *ScalerConfig) (*mongoDBMetadata, string, error) {
var connStr string
// setting default metadata
meta := mongoDBMetadata{
collection: defaultCollection,
query: "",
queryValue: defaultQueryValue,
dbName: defaultDB,
}

// parse metaData from ScaledJob config
if val, ok := config.TriggerMetadata["collection"]; ok {
meta.collection = val
} else {
return nil, "", fmt.Errorf("no collection given")
}

if val, ok := config.TriggerMetadata["query"]; ok {
meta.query = val
} else {
return nil, "", fmt.Errorf("no query given")
}

if val, ok := config.TriggerMetadata["queryValue"]; ok {
queryValue, err := strconv.Atoi(val)
if err != nil {
return nil, "", fmt.Errorf("failed to convert %v to int, because of %v", queryValue, err.Error())
}
meta.queryValue = queryValue
} else {
return nil, "", fmt.Errorf("no queryValue given")
}

if val, ok := config.TriggerMetadata["dbName"]; ok {
meta.dbName = val
} else {
return nil, "", fmt.Errorf("no dbName given")
}

// Resolve connectionString
if c, ok := config.AuthParams["connectionString"]; ok {
meta.connectionString = c
} else if v, ok := config.TriggerMetadata["connectionStringFromEnv"]; ok {
meta.connectionString = config.ResolvedEnv[v]
} else {
meta.connectionString = ""
if val, ok := config.TriggerMetadata["host"]; ok {
meta.host = val
} else {
return nil, "", fmt.Errorf("no host given")
}
if val, ok := config.TriggerMetadata["port"]; ok {
meta.port = val
} else {
return nil, "", fmt.Errorf("no port given")
}

if val, ok := config.TriggerMetadata["username"]; ok {
meta.username = val
} else {
return nil, "", fmt.Errorf("no username given")
}
// get password from env or authParams
if v, ok := config.AuthParams["password"]; ok {
meta.password = v
} else if v, ok := config.TriggerMetadata["passwordFromEnv"]; ok {
meta.password = config.ResolvedEnv[v]
}

if len(meta.password) == 0 {
return nil, "", fmt.Errorf("no password given")
}
}

if meta.connectionString != "" {
connStr = meta.connectionString
} else {
// Build connection str
addr := fmt.Sprintf("%s:%s", meta.host, meta.port)
auth := fmt.Sprintf("%s:%s", meta.username, meta.password)
connStr = "mongodb://" + auth + "@" + addr
}

if val, ok := config.TriggerMetadata["metricName"]; ok {
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("mongodb-%s", val))
} else {
maskedURL, err := kedautil.MaskPartOfURL(connStr, kedautil.Hostname)
if err != nil {
return nil, "", fmt.Errorf("failure masking part of url")
}
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("mongodb-%s-%s", maskedURL, meta.collection))
}
return &meta, connStr, nil
}

func (s *mongoDBScaler) IsActive(ctx context.Context) (bool, error) {
result, err := s.getQueryResult()
if err != nil {
mongoDBLog.Error(err, fmt.Sprintf("failed to get query result by mongoDB, because of %v", err))
return false, err
}
return result > 0, nil
}

// Close disposes of mongoDB connections
func (s *mongoDBScaler) Close() error {
if s.client != nil {
err := s.client.Disconnect(context.TODO())
if err != nil {
mongoDBLog.Error(err, fmt.Sprintf("failed to close mongoDB connection, because of %v", err))
return err
}
}

return nil
}

// getQueryResult query mongoDB by meta.query
func (s *mongoDBScaler) getQueryResult() (int, error) {
ctx, cancel := context.WithTimeout(context.Background(), mongoDBDefaultTimeOut)
defer cancel()

filter, err := json2BsonDoc(s.metadata.query)
if err != nil {
mongoDBLog.Error(err, fmt.Sprintf("failed to convert query param to bson.Doc, because of %v", err))
return 0, err
}

docsNum, err := s.client.Database(s.metadata.dbName).Collection(s.metadata.collection).CountDocuments(ctx, filter)
if err != nil {
mongoDBLog.Error(err, fmt.Sprintf("failed to query %v in %v, because of %v", s.metadata.dbName, s.metadata.collection, err))
return 0, err
}

return int(docsNum), nil
}

// GetMetrics query from mongoDB,and return to external metrics
func (s *mongoDBScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
num, err := s.getQueryResult()
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("failed to inspect momgoDB, because of %v", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(num), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// GetMetricSpecForScaling get the query value for scaling
func (s *mongoDBScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetQueryValue := resource.NewQuantity(int64(s.metadata.queryValue), resource.DecimalSI)

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: s.metadata.metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetQueryValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2beta2.MetricSpec{metricSpec}
}

// json2BsonDoc convert Json to Bson.Doc
func json2BsonDoc(js string) (doc bsonx.Doc, err error) {
doc = bsonx.Doc{}
err = bson.UnmarshalExtJSON([]byte(js), true, &doc)
if err != nil {
return nil, err
}

if len(doc) == 0 {
return nil, errors.New("empty bson document")
}

return doc, nil
}
87 changes: 87 additions & 0 deletions pkg/scalers/mongo_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package scalers

import (
"testing"

"go.mongodb.org/mongo-driver/mongo"
)

var testMongoDBResolvedEnv = map[string]string{
"MongoDB_CONN_STR": "test_conn_str",
"MongoDB_PASSWORD": "test",
}

type parseMongoDBMetadataTestData struct {
metadata map[string]string
resolvedEnv map[string]string
raisesError bool
}

type mongoDBMetricIdentifier struct {
metadataTestData *parseMongoDBMetadataTestData
name string
}

var testMONGODBMetadata = []parseMongoDBMetadataTestData{
// No metadata
{
metadata: map[string]string{},
resolvedEnv: testMongoDBResolvedEnv,
raisesError: true,
},
// connectionStringFromEnv
{
metadata: map[string]string{"query": `{"name":"John"}`, "collection": "demo", "queryValue": "12", "connectionStringFromEnv": "Mongo_CONN_STR", "dbName": "test"},
resolvedEnv: testMongoDBResolvedEnv,
raisesError: false,
},
// with metric name
{
metadata: map[string]string{"query": `{"name":"John"}`, "metricName": "hpa", "collection": "demo", "queryValue": "12", "connectionStringFromEnv": "Mongo_CONN_STR", "dbName": "test"},
resolvedEnv: testMongoDBResolvedEnv,
raisesError: false,
},
}

var mongoDBMetricIdentifiers = []mongoDBMetricIdentifier{
{metadataTestData: &testMONGODBMetadata[2], name: "mongodb-hpa"},
}

func TestParseMongoDBMetadata(t *testing.T) {
for _, testData := range testMONGODBMetadata {
_, _, err := parseMongoDBMetadata(&ScalerConfig{TriggerMetadata: testData.metadata})
if err != nil && !testData.raisesError {
t.Error("Expected success but got error:", err)
}
if err == nil && testData.raisesError {
t.Error("Expected error but got success")
}
}
}

func TestMongoDBGetMetricSpecForScaling(t *testing.T) {
for _, testData := range mongoDBMetricIdentifiers {
meta, _, err := parseMongoDBMetadata(&ScalerConfig{ResolvedEnv: testData.metadataTestData.resolvedEnv, TriggerMetadata: testData.metadataTestData.metadata})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockMongoDBScaler := mongoDBScaler{meta, &mongo.Client{}}

metricSpec := mockMongoDBScaler.GetMetricSpecForScaling()
metricName := metricSpec[0].External.Metric.Name
if metricName != testData.name {
t.Error("Wrong External metric source name:", metricName)
}
}
}

func TestJson2BsonDoc(t *testing.T) {
var testJSON = `{"name":"carson"}`
doc, err := json2BsonDoc(testJSON)
if err != nil {
t.Error("convert testJson to Bson.Doc err:", err)
}
if doc == nil {
t.Error("the doc is nil")
}
}
Loading