Skip to content

Commit

Permalink
add-graphite-scaler
Browse files Browse the repository at this point in the history
Signed-off-by: 刘烁 <[email protected]>
  • Loading branch information
刘烁 committed Apr 21, 2021
1 parent 33c3cdb commit 1f9d2ef
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 0 deletions.
166 changes: 166 additions & 0 deletions pkg/scalers/graphite_api_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package scalers

import (
"context"
"fmt"
"io/ioutil"
"net/http"
url_pkg "net/url"
"strconv"

"github.com/tidwall/gjson"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const (
grapServerAddress = "serverAddress"
grapMetricName = "metricName"
grapQuery = "query"
grapThreshold = "threshold"
grapqueryTime = "queryTime"
)

type graphiteScaler struct {
metadata *graphiteMetadata
}

type graphiteMetadata struct {
serverAddress string
metricName string
query string
threshold int
from string
}

var graphiteLog = logf.Log.WithName("graphite_scaler")

// NewGraphiteScaler creates a new graphiteScaler
func NewGraphiteScaler(config *ScalerConfig) (Scaler, error) {
meta, err := parseGraphiteMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing graphite metadata: %s", err)
}

return &graphiteScaler{
metadata: meta,
}, nil
}

func parseGraphiteMetadata(config *ScalerConfig) (*graphiteMetadata, error) {
meta := graphiteMetadata{}

if val, ok := config.TriggerMetadata[grapServerAddress]; ok && val != "" {
meta.serverAddress = val
} else {
return nil, fmt.Errorf("no %s given", grapServerAddress)
}

if val, ok := config.TriggerMetadata[grapQuery]; ok && val != "" {
meta.query = val
} else {
return nil, fmt.Errorf("no %s given", grapQuery)
}

if val, ok := config.TriggerMetadata[grapMetricName]; ok && val != "" {
meta.metricName = val
} else {
return nil, fmt.Errorf("no %s given", grapMetricName)
}

if val, ok := config.TriggerMetadata[grapqueryTime]; ok && val != "" {
meta.from = val
} else {
return nil, fmt.Errorf("no %s given", grapqueryTime)
}

if val, ok := config.TriggerMetadata[grapThreshold]; ok && val != "" {
t, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("error parsing %s: %s", grapThreshold, err)
}

meta.threshold = t
}

return &meta, nil
}

func (s *graphiteScaler) IsActive(ctx context.Context) (bool, error) {
val, err := s.ExecuteGrapQuery()
if err != nil {
graphiteLog.Error(err, "error executing graphite query")
return false, err
}

return val > 0, nil
}

func (s *graphiteScaler) Close() error {
return nil
}

func (s *graphiteScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(int64(s.metadata.threshold), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "graphite", s.metadata.serverAddress, s.metadata.metricName)),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetMetricValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2beta2.MetricSpec{metricSpec}
}

func (s *graphiteScaler) ExecuteGrapQuery() (float64, error) {
queryEscaped := url_pkg.QueryEscape(s.metadata.query)
url := fmt.Sprintf("%s/render?target=%s&format=json", s.metadata.serverAddress, queryEscaped)
r, err := http.Get(url)
if err != nil {
return -1, err
}

b, err := ioutil.ReadAll(r.Body)
if err != nil {
return -1, err
}
r.Body.Close()

result := gjson.GetBytes(b, "0.datapoints.#.0")
var v float64 = -1
for _, valur := range result.Array() {
if valur.String() != "" {
if float64(valur.Int()) > v {
v = float64(valur.Int())
}
}
}
return v, nil
}

func (s *graphiteScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
val, err := s.ExecuteGrapQuery()
if err != nil {
graphiteLog.Error(err, "error executing graphite query")
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(val), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
65 changes: 65 additions & 0 deletions pkg/scalers/graphite_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package scalers

import (
"testing"
)

type parseGraphiteMetadataTestData struct {
metadata map[string]string
isError bool
}

type graphiteMetricIdentifier struct {
metadataTestData *parseGraphiteMetadataTestData
name string
}

var testGrapMetadata = []parseGraphiteMetadataTestData{
{map[string]string{}, true},
// all properly formed
{map[string]string{"grapServerAddress": "http://localhost:81", "grapMetricName": "stats.counters.http.hello-world.request.count.count", "threshold": "100", "grapQuery": "up", "disableScaleToZero": "true"}, false},
// missing serverAddress
{map[string]string{"grapServerAddress": "", "grapMetricName": "stats.counters.http.hello-world.request.count.count", "threshold": "100", "grapQuery": "up", "disableScaleToZero": "true"}, true},
// missing metricName
{map[string]string{"grapServerAddress": "http://localhost:81", "grapMetricName": "", "threshold": "100", "grapQuery": "up", "disableScaleToZero": "true"}, true},
// malformed threshold
{map[string]string{"grapServerAddress": "http://localhost:81", "grapMetricName": "stats.counters.http.hello-world.request.count.count", "threshold": "one", "grapQuery": "up", "disableScaleToZero": "true"}, true},
// missing query
{map[string]string{"grapServerAddress": "http://localhost:81", "grapMetricName": "stats.counters.http.hello-world.request.count.count", "threshold": "100", "grapQuery": "", "disableScaleToZero": "true"}, true},
// all properly formed, default disableScaleToZero
{map[string]string{"grapServerAddress": "http://localhost:81", "grapMetricName": "stats.counters.http.hello-world.request.count.count", "threshold": "100", "grapQuery": "up"}, false},
}

var graphiteMetricIdentifiers = []graphiteMetricIdentifier{
{&testGrapMetadata[1], "graphite-http---localhost-9090-http_requests_total"},
}

func TestGraphiteParseMetadata(t *testing.T) {
for _, testData := range testGrapMetadata {
_, err := parseGraphiteMetadata(&ScalerConfig{TriggerMetadata: testData.metadata})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
}
}

func TestGraphiteGetMetricSpecForScaling(t *testing.T) {
for _, testData := range graphiteMetricIdentifiers {
meta, err := parseGraphiteMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockGraphiteScaler := graphiteScaler{
metadata: meta,
}

metricSpec := mockGraphiteScaler.GetMetricSpecForScaling()
metricName := metricSpec[0].External.Metric.Name
if metricName != testData.name {
t.Error("Wrong External metric source name:", metricName)
}
}
}
2 changes: 2 additions & 0 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,8 @@ func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scal
return scalers.NewExternalPushScaler(config)
case "gcp-pubsub":
return scalers.NewPubSubScaler(config)
case "graphite":
return scalers.NewGraphiteScaler(config)
case "huawei-cloudeye":
return scalers.NewHuaweiCloudeyeScaler(config)
case "ibmmq":
Expand Down

0 comments on commit 1f9d2ef

Please sign in to comment.