Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Azure monitor scaler #584

Merged
merged 25 commits into from
Feb 13, 2020
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5533065
Just KEDA portion of scaler, possibly working
Jan 22, 2020
c2135a1
Hard code value until get azure monitor working
Jan 22, 2020
126028c
Add metricThreshold to metadata
Jan 22, 2020
0681004
Create metrics client and get azure metric. Still need to create metr…
Jan 27, 2020
cadf1ca
possibly working?
Jan 27, 2020
c090ef5
Fix copy/paste error
Jan 27, 2020
352af5f
Set aggregationType, ResourceProviderNamespace, ResourceType, and Res…
Jan 28, 2020
2e878bb
Make sure that resourceURI includes resource namespace, resource type…
Jan 28, 2020
4ae421a
Check that aggregation interval is provided in the correct format
Jan 30, 2020
a4dc5ce
Change servicePrinciple to client and ad to activeDirectory. Make cli…
Jan 30, 2020
866c2e9
Add support for custom aggregation interval. Clean up code and remove…
Jan 30, 2020
6167448
Add authentication using resolvedEnv or authParams
Jan 31, 2020
561f060
Move aggregation type validation to separate function. Return -1 on e…
Jan 31, 2020
daf76e7
Replace spaces with tabs
Feb 5, 2020
c420b74
Remove unnecessary types and and interfaces. Refactor code to split g…
Feb 5, 2020
c6fc2de
Fix typo: tentant -> tenant
Feb 5, 2020
71bf0f8
Get rid of resolvedEnv usage since was not using it correctly
Feb 11, 2020
fb97e5f
Add test for parsing metadata
Feb 11, 2020
75bbd72
Check that resourceURI is the correct format
Feb 11, 2020
c695e7e
Return error if targetValue is not provided
Feb 11, 2020
06f2939
Set aggregationInterval if provided and in the proper format
Feb 11, 2020
e754344
Clean up function comments, add ref to Azure K8s metrics adapter
Feb 12, 2020
1199841
Remove subscriptionId and tenantId from authParams
Feb 13, 2020
71ad924
Support resolvedEnv for aadClientID and aadClientPassword
Feb 13, 2020
8d945b7
Add resolvedEnv to test and add test for authParams
Feb 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/handler/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,8 @@ func (h *ScaleHandler) getScaler(name, namespace, triggerType string, resolvedEn
return scalers.NewPostgresScaler(resolvedEnv, triggerMetadata, authParams)
case "mysql":
return scalers.NewMySQLScaler(resolvedEnv, triggerMetadata, authParams)
case "azure-monitor":
return scalers.NewAzureMonitorScaler(resolvedEnv, triggerMetadata, authParams)
default:
return nil, fmt.Errorf("no scaler found for type: %s", triggerType)
}
Expand Down
200 changes: 200 additions & 0 deletions pkg/scalers/azure_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package scalers
melmaliacone marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"fmt"
"math"
"strconv"
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/services/preview/monitor/mgmt/2018-03-01/insights"
"github.com/Azure/go-autorest/autorest/azure/auth"
"k8s.io/klog"
)

type azureExternalMetricRequest struct {
MetricName string
SubscriptionID string
ResourceName string
ResourceProviderNamespace string
ResourceType string
Aggregation string
Timespan string
Filter string
ResourceGroup string
}

// GetAzureMetricValue is a func
melmaliacone marked this conversation as resolved.
Show resolved Hide resolved
func GetAzureMetricValue(ctx context.Context, metricMetadata *azureMonitorMetadata) (int32, error) {
client := createMetricsClient(metricMetadata)

requestPtr, err := createMetricsRequest(metricMetadata)
if err != nil {
return -1, err
}

return executeRequest(client, requestPtr)
}

func createMetricsClient(metadata *azureMonitorMetadata) insights.MetricsClient {
client := insights.NewMetricsClient(metadata.subscriptionID)
config := auth.NewClientCredentialsConfig(metadata.clientID, metadata.clientPassword, metadata.tenantID)

authorizer, _ := config.Authorizer()
client.Authorizer = authorizer

return client
}

func createMetricsRequest(metadata *azureMonitorMetadata) (*azureExternalMetricRequest, error) {
metricRequest := azureExternalMetricRequest{
MetricName: metadata.name,
SubscriptionID: metadata.subscriptionID,
Aggregation: metadata.aggregationType,
Filter: metadata.filter,
ResourceGroup: metadata.resourceGroupName,
}

resourceInfo := strings.Split(metadata.resourceURI, "/")
metricRequest.ResourceProviderNamespace = resourceInfo[0]
metricRequest.ResourceType = resourceInfo[1]
metricRequest.ResourceName = resourceInfo[2]

// if no timespan is provided, defaults to 5 minutes
timespan, err := formatTimeSpan(metadata.aggregationInterval)
if err != nil {
return nil, err
}

metricRequest.Timespan = timespan

return &metricRequest, nil
}

func executeRequest(client insights.MetricsClient, request *azureExternalMetricRequest) (int32, error) {
metricResponse, err := getAzureMetric(client, *request)
if err != nil {
azureMonitorLog.Error(err, "error getting azure monitor metric")
return -1, fmt.Errorf("Error getting azure monitor metric %s: %s", request.MetricName, err.Error())
}

// casting drops everything after decimal, so round first
metricValue := int32(math.Round(metricResponse))

return metricValue, nil
}

func getAzureMetric(client insights.MetricsClient, azMetricRequest azureExternalMetricRequest) (float64, error) {
err := azMetricRequest.validate()
if err != nil {
return -1, err
}

metricResourceURI := azMetricRequest.metricResourceURI()
klog.V(2).Infof("resource uri: %s", metricResourceURI)

metricResult, err := client.List(context.Background(), metricResourceURI,
azMetricRequest.Timespan, nil,
azMetricRequest.MetricName, azMetricRequest.Aggregation, nil,
"", azMetricRequest.Filter, "", "")
if err != nil {
return -1, err
}

value, err := extractValue(azMetricRequest, metricResult)

return value, err
}

func extractValue(azMetricRequest azureExternalMetricRequest, metricResult insights.Response) (float64, error) {
metricVals := *metricResult.Value

if len(metricVals) == 0 {
err := fmt.Errorf("Got an empty response for metric %s/%s and aggregate type %s", azMetricRequest.ResourceProviderNamespace, azMetricRequest.MetricName, insights.AggregationType(strings.ToTitle(azMetricRequest.Aggregation)))
return -1, err
}

timeseries := *metricVals[0].Timeseries
if timeseries == nil {
err := fmt.Errorf("Got metric result for %s/%s and aggregate type %s without timeseries", azMetricRequest.ResourceProviderNamespace, azMetricRequest.MetricName, insights.AggregationType(strings.ToTitle(azMetricRequest.Aggregation)))
return -1, err
}

data := *timeseries[0].Data
if data == nil {
err := fmt.Errorf("Got metric result for %s/%s and aggregate type %s without any metric values", azMetricRequest.ResourceProviderNamespace, azMetricRequest.MetricName, insights.AggregationType(strings.ToTitle(azMetricRequest.Aggregation)))
return -1, err
}

valuePtr, err := verifyAggregationTypeIsSupported(azMetricRequest.Aggregation, data)
if err != nil {
return -1, fmt.Errorf("Unable to get value for metric %s/%s with aggregation %s. No value returned by Azure Monitor", azMetricRequest.ResourceProviderNamespace, azMetricRequest.MetricName, azMetricRequest.Aggregation)
}

klog.V(2).Infof("metric type: %s %f", azMetricRequest.Aggregation, *valuePtr)

return *valuePtr, nil
}

func (amr azureExternalMetricRequest) validate() error {
// Shared
if amr.MetricName == "" {
return fmt.Errorf("metricName is required")
}
if amr.ResourceGroup == "" {
return fmt.Errorf("resourceGroup is required")
}
if amr.SubscriptionID == "" {
return fmt.Errorf("subscriptionID is required. set a default or pass via label selectors")
}
return nil
}

func (amr azureExternalMetricRequest) metricResourceURI() string {
return fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/%s/%s/%s",
amr.SubscriptionID,
amr.ResourceGroup,
amr.ResourceProviderNamespace,
amr.ResourceType,
amr.ResourceName)
}

func formatTimeSpan(timeSpan string) (string, error) {
// defaults to last five minutes.
endtime := time.Now().UTC().Format(time.RFC3339)
starttime := time.Now().Add(-(5 * time.Minute)).UTC().Format(time.RFC3339)
if timeSpan != "" {
aggregationInterval := strings.Split(timeSpan, ":")
hours, herr := strconv.Atoi(aggregationInterval[0])
minutes, merr := strconv.Atoi(aggregationInterval[1])
seconds, serr := strconv.Atoi(aggregationInterval[2])

if herr != nil || merr != nil || serr != nil {
return "", fmt.Errorf("Errors parsing metricAggregationInterval: %v, %v, %v", herr, merr, serr)
}

starttime = time.Now().Add(-(time.Duration(hours)*time.Hour + time.Duration(minutes)*time.Minute + time.Duration(seconds)*time.Second)).UTC().Format(time.RFC3339)
}
return fmt.Sprintf("%s/%s", starttime, endtime), nil
}

func verifyAggregationTypeIsSupported(aggregationType string, data []insights.MetricValue) (*float64, error) {
var valuePtr *float64
if strings.EqualFold(string(insights.Average), aggregationType) && data[len(data)-1].Average != nil {
valuePtr = data[len(data)-1].Average
} else if strings.EqualFold(string(insights.Total), aggregationType) && data[len(data)-1].Total != nil {
valuePtr = data[len(data)-1].Total
} else if strings.EqualFold(string(insights.Maximum), aggregationType) && data[len(data)-1].Maximum != nil {
valuePtr = data[len(data)-1].Maximum
} else if strings.EqualFold(string(insights.Minimum), aggregationType) && data[len(data)-1].Minimum != nil {
valuePtr = data[len(data)-1].Minimum
} else if strings.EqualFold(string(insights.Count), aggregationType) && data[len(data)-1].Count != nil {
fValue := float64(*data[len(data)-1].Count)
valuePtr = &fValue
} else {
err := fmt.Errorf("Unsupported aggregation type %s", insights.AggregationType(strings.ToTitle(aggregationType)))
return nil, err
}
return valuePtr, nil
}
189 changes: 189 additions & 0 deletions pkg/scalers/azure_monitor_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package scalers

import (
"context"
"fmt"
"strconv"
"strings"

v2beta1 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

const (
azureMonitorMetricName = "metricName"
targetValueName = "targetValue"
)

type azureMonitorScaler struct {
metadata *azureMonitorMetadata
}

type azureMonitorMetadata struct {
resourceURI string
tenantID string
subscriptionID string
resourceGroupName string
name string
filter string
aggregationInterval string
aggregationType string
clientID string
clientPassword string
targetValue int
}

var azureMonitorLog = logf.Log.WithName("azure_monitor_scaler")

// NewAzureMonitorScaler stuff
func NewAzureMonitorScaler(resolvedEnv, metadata, authParams map[string]string) (Scaler, error) {
meta, err := parseAzureMonitorMetadata(metadata, resolvedEnv, authParams)
if err != nil {
return nil, fmt.Errorf("error parsing azure monitor metadata: %s", err)
}

return &azureMonitorScaler{
metadata: meta,
}, nil
}

func parseAzureMonitorMetadata(metadata, resolvedEnv, authParams map[string]string) (*azureMonitorMetadata, error) {
meta := azureMonitorMetadata{}

if val, ok := metadata[targetValueName]; ok && val != "" {
targetValue, err := strconv.Atoi(val)
if err != nil {
azureMonitorLog.Error(err, "Error parsing azure monitor metadata", "targetValue", targetValueName)
return nil, fmt.Errorf("Error parsing azure monitor metadata %s: %s", targetValueName, err.Error())
}
meta.targetValue = targetValue
} else {
return nil, fmt.Errorf("no targetValue given")
}

if val, ok := metadata["resourceURI"]; ok && val != "" {
resourceURI := strings.Split(val, "/")
if len(resourceURI) != 3 {
return nil, fmt.Errorf("resourceURI not in the correct format. Should be namespace/resource_type/resource_name")
}
meta.resourceURI = val
} else {
return nil, fmt.Errorf("no resourceURI given")
}

if val, ok := metadata["resourceGroupName"]; ok && val != "" {
meta.resourceGroupName = val
} else {
return nil, fmt.Errorf("no resourceGroupName given")
}

if val, ok := metadata[azureMonitorMetricName]; ok && val != "" {
meta.name = val
} else {
return nil, fmt.Errorf("no metricName given")
}

if val, ok := metadata["metricAggregationType"]; ok && val != "" {
melmaliacone marked this conversation as resolved.
Show resolved Hide resolved
meta.aggregationType = val
} else {
return nil, fmt.Errorf("no metricAggregationType given")
}

if val, ok := metadata["metricFilter"]; ok && val != "" {
meta.filter = val
}

if val, ok := metadata["metricAggregationInterval"]; ok && val != "" {
aggregationInterval := strings.Split(val, ":")
if len(aggregationInterval) != 3 {
return nil, fmt.Errorf("metricAggregationInterval not in the correct format. Should be hh:mm:ss")
}
melmaliacone marked this conversation as resolved.
Show resolved Hide resolved
}

// Required authentication parameters below

if val, ok := authParams["subscriptionId"]; ok && val != "" {
meta.subscriptionID = val
} else {
if val, ok := metadata["subscriptionId"]; ok && val != "" {
meta.subscriptionID = val
} else {
return nil, fmt.Errorf("no subscriptionId given")
}
}

if val, ok := authParams["tenantId"]; ok && val != "" {
meta.tenantID = val
} else {
if val, ok := metadata["tenantId"]; ok && val != "" {
meta.tenantID = val
} else {
return nil, fmt.Errorf("no tenantId given")
}
}

if val, ok := authParams["activeDirectoryClientId"]; ok && val != "" {
meta.clientID = val
} else {
if val, ok := metadata["activeDirectoryClientId"]; ok && val != "" {
meta.clientID = val
} else {
return nil, fmt.Errorf("no activeDirectoryClientId given")
}
}

if val, ok := authParams["activeDirectoryClientPassword"]; ok && val != "" {
meta.clientPassword = val
} else {
if val, ok := metadata["activeDirectoryClientPassword"]; ok && val != "" {
melmaliacone marked this conversation as resolved.
Show resolved Hide resolved
meta.clientPassword = val
} else {
return nil, fmt.Errorf("no activeDirectoryClientPassword given")
}
}

return &meta, nil
}

// needs to interact with azure monitor
func (s *azureMonitorScaler) IsActive(ctx context.Context) (bool, error) {
val, err := GetAzureMetricValue(ctx, s.metadata)
if err != nil {
azureMonitorLog.Error(err, "error getting azure monitor metric")
return false, err
}

return val > 0, nil
}

func (s *azureMonitorScaler) Close() error {
return nil
}

func (s *azureMonitorScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec {
targetMetricVal := resource.NewQuantity(int64(s.metadata.targetValue), resource.DecimalSI)
melmaliacone marked this conversation as resolved.
Show resolved Hide resolved
externalMetric := &v2beta1.ExternalMetricSource{MetricName: azureMonitorMetricName, TargetAverageValue: targetMetricVal}
metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta1.MetricSpec{metricSpec}
}

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *azureMonitorScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
val, err := GetAzureMetricValue(ctx, s.metadata)
if err != nil {
azureMonitorLog.Error(err, "error getting azure monitor metric")
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(val), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
Loading