Skip to content

Commit

Permalink
add mongo scaler
Browse files Browse the repository at this point in the history
Signed-off-by: carson <[email protected]>
  • Loading branch information
高强 committed Jan 1, 2021
1 parent fbfa738 commit 66c6b16
Show file tree
Hide file tree
Showing 6 changed files with 379 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
### New
- Can use Pod Identity with Azure Event Hub scaler ([#994](https://github.com/kedacore/keda/issues/994))
- Introducing InfluxDB scaler ([#1239](https://github.com/kedacore/keda/issues/1239))
- Introducing Mongo scaler ([#1465](https://github.com/kedacore/keda/issues/1465))

### Improvements
- Support add ScaledJob's label to its job ([#1311](https://github.com/kedacore/keda/issues/1311))
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
github.com/stretchr/testify v1.6.1
github.com/tidwall/gjson v1.6.4
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.mongodb.org/mongo-driver v1.1.2
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 // indirect
google.golang.org/api v0.36.0
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
Expand Down Expand Up @@ -891,6 +892,7 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/kedacore/keda v1.5.0 h1:c8xA1Vo3H7rPwFiWUX3CBXnjBSrbYDmUs9iEfDlf4bQ=
github.com/kelseyhightower/envconfig v1.3.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
Expand Down Expand Up @@ -1395,6 +1397,7 @@ go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738 h1:VcrIfasaLFkyjk6KNlXQSzO+B0
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
go.mongodb.org/mongo-driver v1.1.2 h1:jxcFYjlkl8xaERsgLo+RNquI0epW6zuy/ZRQs6jnrFA=
go.mongodb.org/mongo-driver v1.1.2/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
go.opencensus.io v0.15.0/go.mod h1:UffZAU+4sDEINUGP/B7UfBBkq4fqLu9zXAX7ke6CHW0=
go.opencensus.io v0.17.0/go.mod h1:mp1VrMQxhlqqDpKvH4UcQUa4YwlzNmymAjPrDdfxNpI=
Expand Down
291 changes: 291 additions & 0 deletions pkg/scalers/mongo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
package scalers

import (
"context"
"errors"
"fmt"
"strconv"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/x/bsonx"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

// mongoScaler is support for mongo in keda.
type mongoScaler struct {
metadata *mongoMetadata
client *mongo.Client
}

// mongoMetadata specify mongo scaler params.
type mongoMetadata struct {
// The string is used by connected with mongo.
// +optional
connectionString string
// Specify the host to connect to the mongo server,if the connectionString be provided, don't need specify this param.
// +optional
host string
// Specify the port to connect to the mongo server,if the connectionString be provided, don't need specify this param.
// +optional
port string
// Specify the username to connect to the mongo server,if the connectionString be provided, don't need specify this param.
// +optional
username string
// Specify the password to connect to the mongo server,if the connectionString be provided, don't need specify this param.
// +optional
password string

// The name of the database to be queried.
// +required
dbName string
// The name of the collection to be queried.
// +required
collection string
// A mongo filter doc,used by specify DB.
// +required
query string
// A threshold that is used as targetAverageValue in HPA
// +required
queryValue int
}

// Default variables and settings
const (
mongoDefaultTimeOut = 10 * time.Second
defaultCollection = "default"
defaultDB = "local"
defaultQueryValue = 1
)

type mongoFields struct {
ID primitive.ObjectID `bson:"_id, omitempty"`
}

var mongoLog = logf.Log.WithName("mongo_scaler")

// NewMongoScaler creates a new Mongo scaler
func NewMongoScaler(config *ScalerConfig) (Scaler, error) {
ctx, cancel := context.WithTimeout(context.Background(), mongoDefaultTimeOut)
defer cancel()

meta, connStr, err := parseMongoMetadata(config)
if err != nil {
return nil, fmt.Errorf("failed to parsing mongo metadata,because of %v", err)
}

opt := options.Client().ApplyURI(connStr)
client, err := mongo.Connect(ctx, opt)
if err != nil {
return nil, fmt.Errorf("failed to establish connection with mongo,because of %v", err)
}

if err = client.Ping(ctx, readpref.Primary()); err != nil {
return nil, fmt.Errorf("failed to ping mongo,because of %v", err)
}

return &mongoScaler{
metadata: meta,
client: client,
}, nil
}

func parseMongoMetadata(config *ScalerConfig) (*mongoMetadata, string, error) {
var connStr string
// setting default metadata
meta := mongoMetadata{
collection: defaultCollection,
query: "",
queryValue: defaultQueryValue,
dbName: defaultDB,
}

// parse metaData from ScaledJob config
if val, ok := config.TriggerMetadata["collection"]; ok {
meta.collection = val
} else {
return nil, "", fmt.Errorf("no collection given")
}

if val, ok := config.TriggerMetadata["query"]; ok {
meta.query = val
} else {
return nil, "", fmt.Errorf("no query given")
}

if val, ok := config.TriggerMetadata["queryValue"]; ok {
queryValue, err := strconv.Atoi(val)
if err != nil {
return nil, "", fmt.Errorf("failed to convert %v to int,because of %v", queryValue, err.Error())
}
meta.queryValue = queryValue
} else {
return nil, "", fmt.Errorf("no queryValue given")
}

if val, ok := config.TriggerMetadata["dbName"]; ok {
meta.dbName = val
} else {
return nil, "", fmt.Errorf("no dbName given")
}

// Resolve connectionString
if c, ok := config.AuthParams["connectionString"]; ok {
meta.connectionString = c
} else if v, ok := config.TriggerMetadata["connectionStringFromEnv"]; ok {
meta.connectionString = config.ResolvedEnv[v]
} else {
meta.connectionString = ""
if val, ok := config.TriggerMetadata["host"]; ok {
meta.host = val
} else {
return nil, "", fmt.Errorf("no host given")
}
if val, ok := config.TriggerMetadata["port"]; ok {
meta.port = val
} else {
return nil, "", fmt.Errorf("no port given")
}

if val, ok := config.TriggerMetadata["username"]; ok {
meta.username = val
} else {
return nil, "", fmt.Errorf("no username given")
}
// get password from env or authParams
if v, ok := config.AuthParams["password"]; ok {
meta.password = v
} else if v, ok := config.TriggerMetadata["passwordFromEnv"]; ok {
meta.password = config.ResolvedEnv[v]
}

if len(meta.password) == 0 {
return nil, "", fmt.Errorf("no password given")
}
}

if meta.connectionString != "" {
connStr = meta.connectionString
} else {
// Build connection str
addr := fmt.Sprintf("%s:%s", meta.host, meta.port)
auth := fmt.Sprintf("%s:%s", meta.username, meta.password)
connStr = "mongodb://" + auth + "@" + addr
}

return &meta, connStr, nil
}

func (s *mongoScaler) IsActive(ctx context.Context) (bool, error) {
result, err := s.getQueryResult()
if err != nil {
mongoLog.Error(err, fmt.Sprintf("failed to get query result by mongo,because of %v", err))
return false, err
}
return result > 0, nil
}

// Close disposes of mongo connections
func (s *mongoScaler) Close() error {
if s.client != nil {
err := s.client.Disconnect(context.TODO())
if err != nil {
mongoLog.Error(err, fmt.Sprintf("failed to close mongo connection,because of %v", err))
return err
}
}

return nil
}

// getQueryResult query mongo by meta.query
func (s *mongoScaler) getQueryResult() (int, error) {
var (
results []mongoFields
)

ctx, cancel := context.WithTimeout(context.Background(), mongoDefaultTimeOut)
defer cancel()

filter, err := json2BsonDoc(s.metadata.query)
if err != nil {
mongoLog.Error(err, fmt.Sprintf("failed to convert query param to bson.Doc,because of %v", err))
return 0, err
}

cursor, err := s.client.Database(s.metadata.dbName).Collection(s.metadata.collection).Find(ctx, filter)
if err != nil {
mongoLog.Error(err, fmt.Sprintf("failed to query %v in %v,because of %v", s.metadata.dbName, s.metadata.collection, err))
return 0, err
}

err = cursor.All(ctx, &results)
if err != nil {
mongoLog.Error(err, fmt.Sprintf("failed to traversal the query results,because of %v", err))
return 0, err
}

return len(results), nil
}

// GetMetrics query from mongo,and return to external metrics
func (s *mongoScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
num, err := s.getQueryResult()
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("failed to inspect momgo,because of %v", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(num), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// GetMetricSpecForScaling get the query value for scaling
func (s *mongoScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetQueryValue := resource.NewQuantity(int64(s.metadata.queryValue), resource.DecimalSI)
metricName := kedautil.NormalizeString(fmt.Sprintf("%s-%s", "mongo", s.metadata.connectionString))

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetQueryValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2beta2.MetricSpec{metricSpec}
}

// json2BsonDoc convert Json to Bson.Doc
func json2BsonDoc(js string) (doc bsonx.Doc, err error) {
doc = bsonx.Doc{}
err = bson.UnmarshalExtJSON([]byte(js), true, &doc)
if err != nil {
return nil, err
}

if len(doc) == 0 {
return nil, errors.New("empty bson document")
}

return doc, nil
}
Loading

0 comments on commit 66c6b16

Please sign in to comment.