Skip to content

Commit

Permalink
add mongoDB scaler
Browse files Browse the repository at this point in the history
Signed-off-by: 高强 <[email protected]>
  • Loading branch information
NUCsimple authored and 高强 committed Jan 15, 2021
1 parent c2f46c6 commit 3fb066e
Show file tree
Hide file tree
Showing 4 changed files with 600 additions and 0 deletions.
286 changes: 286 additions & 0 deletions pkg/scalers/mongo_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
package scalers

import (
"context"
"errors"
"fmt"
"strconv"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/x/bsonx"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

// mongoDBScaler is support for mongoDB in keda.
type mongoDBScaler struct {
metadata *mongoDBMetadata
client *mongo.Client
}

// mongoDBMetadata specify mongoDB scaler params.
type mongoDBMetadata struct {
// The string is used by connected with mongoDB.
// +optional
connectionString string
// Specify the host to connect to the mongoDB server,if the connectionString be provided, don't need specify this param.
// +optional
host string
// Specify the port to connect to the mongoDB server,if the connectionString be provided, don't need specify this param.
// +optional
port string
// Specify the username to connect to the mongoDB server,if the connectionString be provided, don't need specify this param.
// +optional
username string
// Specify the password to connect to the mongoDB server,if the connectionString be provided, don't need specify this param.
// +optional
password string

// The name of the database to be queried.
// +required
dbName string
// The name of the collection to be queried.
// +required
collection string
// A mongoDB filter doc,used by specify DB.
// +required
query string
// A threshold that is used as targetAverageValue in HPA
// +required
queryValue int

metricName string
}

// Default variables and settings
const (
mongoDBDefaultTimeOut = 10 * time.Second
defaultCollection = "default"
defaultDB = "test"
defaultQueryValue = 1
)

var mongoDBLog = logf.Log.WithName("mongodb_scaler")

// NewMongoDBScaler creates a new mongoDB scaler
func NewMongoDBScaler(config *ScalerConfig) (Scaler, error) {
ctx, cancel := context.WithTimeout(context.Background(), mongoDBDefaultTimeOut)
defer cancel()

meta, connStr, err := parseMongoDBMetadata(config)
if err != nil {
return nil, fmt.Errorf("failed to parsing mongoDB metadata, because of %v", err)
}

opt := options.Client().ApplyURI(connStr)
client, err := mongo.Connect(ctx, opt)
if err != nil {
return nil, fmt.Errorf("failed to establish connection with mongoDB, because of %v", err)
}

if err = client.Ping(ctx, readpref.Primary()); err != nil {
return nil, fmt.Errorf("failed to ping mongoDB, because of %v", err)
}

return &mongoDBScaler{
metadata: meta,
client: client,
}, nil
}

func parseMongoDBMetadata(config *ScalerConfig) (*mongoDBMetadata, string, error) {
var connStr string
// setting default metadata
meta := mongoDBMetadata{
collection: defaultCollection,
query: "",
queryValue: defaultQueryValue,
dbName: defaultDB,
}

// parse metaData from ScaledJob config
if val, ok := config.TriggerMetadata["collection"]; ok {
meta.collection = val
} else {
return nil, "", fmt.Errorf("no collection given")
}

if val, ok := config.TriggerMetadata["query"]; ok {
meta.query = val
} else {
return nil, "", fmt.Errorf("no query given")
}

if val, ok := config.TriggerMetadata["queryValue"]; ok {
queryValue, err := strconv.Atoi(val)
if err != nil {
return nil, "", fmt.Errorf("failed to convert %v to int, because of %v", queryValue, err.Error())
}
meta.queryValue = queryValue
} else {
return nil, "", fmt.Errorf("no queryValue given")
}

if val, ok := config.TriggerMetadata["dbName"]; ok {
meta.dbName = val
} else {
return nil, "", fmt.Errorf("no dbName given")
}

// Resolve connectionString
if c, ok := config.AuthParams["connectionString"]; ok {
meta.connectionString = c
} else if v, ok := config.TriggerMetadata["connectionStringFromEnv"]; ok {
meta.connectionString = config.ResolvedEnv[v]
} else {
meta.connectionString = ""
if val, ok := config.TriggerMetadata["host"]; ok {
meta.host = val
} else {
return nil, "", fmt.Errorf("no host given")
}
if val, ok := config.TriggerMetadata["port"]; ok {
meta.port = val
} else {
return nil, "", fmt.Errorf("no port given")
}

if val, ok := config.TriggerMetadata["username"]; ok {
meta.username = val
} else {
return nil, "", fmt.Errorf("no username given")
}
// get password from env or authParams
if v, ok := config.AuthParams["password"]; ok {
meta.password = v
} else if v, ok := config.TriggerMetadata["passwordFromEnv"]; ok {
meta.password = config.ResolvedEnv[v]
}

if len(meta.password) == 0 {
return nil, "", fmt.Errorf("no password given")
}
}

if meta.connectionString != "" {
connStr = meta.connectionString
} else {
// Build connection str
addr := fmt.Sprintf("%s:%s", meta.host, meta.port)
auth := fmt.Sprintf("%s:%s", meta.username, meta.password)
connStr = "mongodb://" + auth + "@" + addr
}

if val, ok := config.TriggerMetadata["metricName"]; ok {
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("mongodb-%s", val))
} else {
maskedURL, err := kedautil.MaskPartOfURL(connStr, kedautil.Hostname)
if err != nil {
return nil, "", fmt.Errorf("failure masking part of url")
}
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("mongodb-%s-%s", maskedURL, meta.collection))
}
return &meta, connStr, nil
}

func (s *mongoDBScaler) IsActive(ctx context.Context) (bool, error) {
result, err := s.getQueryResult()
if err != nil {
mongoDBLog.Error(err, fmt.Sprintf("failed to get query result by mongoDB, because of %v", err))
return false, err
}
return result > 0, nil
}

// Close disposes of mongoDB connections
func (s *mongoDBScaler) Close() error {
if s.client != nil {
err := s.client.Disconnect(context.TODO())
if err != nil {
mongoDBLog.Error(err, fmt.Sprintf("failed to close mongoDB connection, because of %v", err))
return err
}
}

return nil
}

// getQueryResult query mongoDB by meta.query
func (s *mongoDBScaler) getQueryResult() (int, error) {
ctx, cancel := context.WithTimeout(context.Background(), mongoDBDefaultTimeOut)
defer cancel()

filter, err := json2BsonDoc(s.metadata.query)
if err != nil {
mongoDBLog.Error(err, fmt.Sprintf("failed to convert query param to bson.Doc, because of %v", err))
return 0, err
}

docsNum, err := s.client.Database(s.metadata.dbName).Collection(s.metadata.collection).CountDocuments(ctx, filter)
if err != nil {
mongoDBLog.Error(err, fmt.Sprintf("failed to query %v in %v, because of %v", s.metadata.dbName, s.metadata.collection, err))
return 0, err
}

return int(docsNum), nil
}

// GetMetrics query from mongoDB,and return to external metrics
func (s *mongoDBScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
num, err := s.getQueryResult()
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("failed to inspect momgoDB, because of %v", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(num), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// GetMetricSpecForScaling get the query value for scaling
func (s *mongoDBScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetQueryValue := resource.NewQuantity(int64(s.metadata.queryValue), resource.DecimalSI)

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: s.metadata.metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetQueryValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2beta2.MetricSpec{metricSpec}
}

// json2BsonDoc convert Json to Bson.Doc
func json2BsonDoc(js string) (doc bsonx.Doc, err error) {
doc = bsonx.Doc{}
err = bson.UnmarshalExtJSON([]byte(js), true, &doc)
if err != nil {
return nil, err
}

if len(doc) == 0 {
return nil, errors.New("empty bson document")
}

return doc, nil
}
87 changes: 87 additions & 0 deletions pkg/scalers/mongo_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package scalers

import (
"testing"

"go.mongodb.org/mongo-driver/mongo"
)

var testMongoDBResolvedEnv = map[string]string{
"MongoDB_CONN_STR": "test_conn_str",
"MongoDB_PASSWORD": "test",
}

type parseMongoDBMetadataTestData struct {
metadata map[string]string
resolvedEnv map[string]string
raisesError bool
}

type mongoDBMetricIdentifier struct {
metadataTestData *parseMongoDBMetadataTestData
name string
}

var testMONGODBMetadata = []parseMongoDBMetadataTestData{
// No metadata
{
metadata: map[string]string{},
resolvedEnv: testMongoDBResolvedEnv,
raisesError: true,
},
// connectionStringFromEnv
{
metadata: map[string]string{"query": `{"name":"John"}`, "collection": "demo", "queryValue": "12", "connectionStringFromEnv": "Mongo_CONN_STR", "dbName": "test"},
resolvedEnv: testMongoDBResolvedEnv,
raisesError: false,
},
// with metric name
{
metadata: map[string]string{"query": `{"name":"John"}`, "metricName": "hpa", "collection": "demo", "queryValue": "12", "connectionStringFromEnv": "Mongo_CONN_STR", "dbName": "test"},
resolvedEnv: testMongoDBResolvedEnv,
raisesError: false,
},
}

var mongoDBMetricIdentifiers = []mongoDBMetricIdentifier{
{metadataTestData: &testMONGODBMetadata[2], name: "mongodb-hpa"},
}

func TestParseMongoDBMetadata(t *testing.T) {
for _, testData := range testMONGODBMetadata {
_, _, err := parseMongoDBMetadata(&ScalerConfig{TriggerMetadata: testData.metadata})
if err != nil && !testData.raisesError {
t.Error("Expected success but got error:", err)
}
if err == nil && testData.raisesError {
t.Error("Expected error but got success")
}
}
}

func TestMongoDBGetMetricSpecForScaling(t *testing.T) {
for _, testData := range mongoDBMetricIdentifiers {
meta, _, err := parseMongoDBMetadata(&ScalerConfig{ResolvedEnv: testData.metadataTestData.resolvedEnv, TriggerMetadata: testData.metadataTestData.metadata})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockMongoDBScaler := mongoDBScaler{meta, &mongo.Client{}}

metricSpec := mockMongoDBScaler.GetMetricSpecForScaling()
metricName := metricSpec[0].External.Metric.Name
if metricName != testData.name {
t.Error("Wrong External metric source name:", metricName)
}
}
}

func TestJson2BsonDoc(t *testing.T) {
var testJSON = `{"name":"carson"}`
doc, err := json2BsonDoc(testJSON)
if err != nil {
t.Error("convert testJson to Bson.Doc err:", err)
}
if doc == nil {
t.Error("the doc is nil")
}
}
2 changes: 2 additions & 0 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,8 @@ func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scal
return scalers.NewCPUMemoryScaler(corev1.ResourceMemory, config)
case "metrics-api":
return scalers.NewMetricsAPIScaler(config)
case "mongodb":
return scalers.NewMongoDBScaler(config)
case "mysql":
return scalers.NewMySQLScaler(config)
case "openstack-swift":
Expand Down
Loading

0 comments on commit 3fb066e

Please sign in to comment.