Skip to content

Commit

Permalink
Add a scaler based on number of GCP Storage bucket objects
Browse files Browse the repository at this point in the history
Fixes kedacore#2628

Signed-off-by: Ram Cohen <[email protected]>
  • Loading branch information
RamCohen committed Mar 3, 2022
1 parent 98c72a9 commit 3ace51f
Show file tree
Hide file tree
Showing 7 changed files with 326 additions and 35 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ require (
github.com/xhit/go-str2duration/v2 v2.0.0
go.mongodb.org/mongo-driver v1.8.2
google.golang.org/api v0.66.0
google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350
google.golang.org/genproto v0.0.0-20220201184016-50beb8ab5c44
google.golang.org/grpc v1.44.0
google.golang.org/protobuf v1.27.1
k8s.io/api v0.23.3
Expand Down Expand Up @@ -83,6 +83,9 @@ replace (
)

require (
cloud.google.com/go v0.100.2 // indirect
cloud.google.com/go/iam v0.1.1 // indirect
cloud.google.com/go/storage v1.20.0 // indirect
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/go-amqp v0.16.4 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ cloud.google.com/go v0.94.1/go.mod h1:qAlAugsXlC+JWO+Bke5vCtc9ONxjQT3drlTTnAplMW
cloud.google.com/go v0.97.0/go.mod h1:GF7l59pYBVlXQIBLx3a761cZ41F9bBH3JUlihCt2Udc=
cloud.google.com/go v0.98.0/go.mod h1:ua6Ush4NALrHk5QXDWnjvZHN93OuF0HfuEPq9I1X0cM=
cloud.google.com/go v0.99.0/go.mod h1:w0Xx2nLzqWJPuozYQX+hFfCSI8WioryfRDzkoI/Y2ZA=
cloud.google.com/go v0.100.1/go.mod h1:fs4QogzfH5n2pBXBP9vRiU+eCny7lD2vmFZy79Iuw1U=
cloud.google.com/go v0.100.2 h1:t9Iw5QH5v4XtlEQaCtUY7x6sCABps8sW0acw7e2WQ6Y=
cloud.google.com/go v0.100.2/go.mod h1:4Xra9TjzAeYHrl5+oeLlzbM2k3mjVhZh4UqTZ//w99A=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
Expand All @@ -41,6 +42,8 @@ cloud.google.com/go/compute v1.1.0/go.mod h1:2NIffxgWfORSI7EOYMFatGTfjMLnqrOKBEy
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk=
cloud.google.com/go/iam v0.1.1 h1:4CapQyNFjiksks1/x7jsvsygFPhihslYk5GptIrlX68=
cloud.google.com/go/iam v0.1.1/go.mod h1:CKqrcnI/suGpybEHxZ7BMehL0oA4LpdyJdUlTl9jVMw=
cloud.google.com/go/monitoring v1.2.0 h1:fEvQITrhVcPM6vuDQcgPMbU5kZFeQFwZmE7v6+S8BPo=
cloud.google.com/go/monitoring v1.2.0/go.mod h1:tE8I08OzjWmXLhCopnPaUDpfGOEJOonfWXGR9E9SsFo=
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
Expand All @@ -53,6 +56,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.18.2/go.mod h1:AiIj7BWXyhO5gGVmYJ+S8tbkCx3yb0IMjua8Aw4naVM=
cloud.google.com/go/storage v1.20.0 h1:kv3rQ3clEQdxqokkCCgQo+bxPqcuXiROjxvnKb8Oqdk=
cloud.google.com/go/storage v1.20.0/go.mod h1:TiC1o6FxNCG8y5gB7rqCsFZCIYPMPZCO81ppOoEPLGI=
contrib.go.opencensus.io/exporter/ocagent v0.7.1-0.20200907061046-05415f1de66d/go.mod h1:IshRmMJBhDfFj5Y67nVhMYTTIze91RUeT73ipWKs/GY=
contrib.go.opencensus.io/exporter/prometheus v0.4.0/go.mod h1:o7cosnyfuPVK0tB8q0QmaQNhGnptITnPQB+z1+qeFB0=
contrib.go.opencensus.io/exporter/zipkin v0.1.2/go.mod h1:mP5xM3rrgOjpn79MM8fZbj3gsxcuytSqtH0dxSWW1RE=
Expand Down Expand Up @@ -1384,6 +1389,7 @@ google.golang.org/api v0.57.0/go.mod h1:dVPlbZyBo2/OjBpmvNdpn2GRm6rPy75jyU7bmhdr
google.golang.org/api v0.58.0/go.mod h1:cAbP2FsxoGVNwtgNAmmn3y5G1TWAiVYRmg4yku3lv+E=
google.golang.org/api v0.61.0/go.mod h1:xQRti5UdCmoCEqFxcz93fTl338AVqDgyaDRuOZ3hg9I=
google.golang.org/api v0.63.0/go.mod h1:gs4ij2ffTRXwuzzgJl/56BdwJaA194ijkfn++9tDuPo=
google.golang.org/api v0.64.0/go.mod h1:931CdxA8Rm4t6zqTFGSsgwbAEZ2+GMYurbndwSimebM=
google.golang.org/api v0.65.0/go.mod h1:ArYhxgGadlWmqO1IqVujw6Cs8IdD33bTmzKo2Sh+cbg=
google.golang.org/api v0.66.0 h1:CbGy4LEiXCVCiNEDFgGpWOVwsDT7E2Qej1ZvN1P7KPg=
google.golang.org/api v0.66.0/go.mod h1:I1dmXYpX7HGwz/ejRxwQp2qj5bFAz93HiCU1C1oYd9M=
Expand Down Expand Up @@ -1463,11 +1469,14 @@ google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12/go.mod h1:5CzLGKJ6
google.golang.org/genproto v0.0.0-20211206160659-862468c7d6e0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211221195035-429b39de9b1c/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211223182754-3ac035c7e7cb/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220107163113-42d7afdf6368/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220111164026-67b88f271998/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220114231437-d2e6a121cae0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350 h1:YxHp5zqIcAShDEvRr5/0rVESVS+njYF68PSdazrNLJo=
google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220201184016-50beb8ab5c44 h1:0UVUC7VWA/mIU+5a4hVWH6xa234gLcRX8ZcrFKmWWKA=
google.golang.org/genproto v0.0.0-20220201184016-50beb8ab5c44/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM=
Expand Down
41 changes: 41 additions & 0 deletions pkg/scalers/gcp_common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package scalers

import (
"fmt"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

type gcpAuthorizationMetadata struct {
GoogleApplicationCredentials string
GoogleApplicationCredentialsFile string
podIdentityOwner bool
podIdentityProviderEnabled bool
}

func getGcpAuthorization(config *ScalerConfig, resolvedEnv map[string]string) (*gcpAuthorizationMetadata, error) {
metadata := config.TriggerMetadata
authParams := config.AuthParams
meta := gcpAuthorizationMetadata{}
if metadata["identityOwner"] == "operator" {
meta.podIdentityOwner = false
} else if metadata["identityOwner"] == "" || metadata["identityOwner"] == "pod" {
meta.podIdentityOwner = true
switch {
case config.PodIdentity == kedav1alpha1.PodIdentityProviderGCP:
// do nothing, rely on underneath metadata google
meta.podIdentityProviderEnabled = true
case authParams["GoogleApplicationCredentials"] != "":
meta.GoogleApplicationCredentials = authParams["GoogleApplicationCredentials"]
default:
if metadata["credentialsFromEnv"] != "" {
meta.GoogleApplicationCredentials = resolvedEnv[metadata["credentialsFromEnv"]]
} else if metadata["credentialsFromEnvFile"] != "" {
meta.GoogleApplicationCredentialsFile = resolvedEnv[metadata["credentialsFromEnvFile"]]
} else {
return nil, fmt.Errorf("GoogleApplicationCredentials not found")
}
}
}
return &meta, nil
}
36 changes: 2 additions & 34 deletions pkg/scalers/gcp_pub_sub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

Expand All @@ -32,12 +31,6 @@ const (

var regexpCompositeSubscriptionIDPrefix = regexp.MustCompile(compositeSubscriptionIDPrefix)

type gcpAuthorizationMetadata struct {
GoogleApplicationCredentials string
podIdentityOwner bool
podIdentityProviderEnabled bool
}

type pubsubScaler struct {
client *StackDriverClient
metadata *pubsubMetadata
Expand All @@ -48,7 +41,7 @@ type pubsubMetadata struct {
value int

subscriptionName string
gcpAuthorization gcpAuthorizationMetadata
gcpAuthorization *gcpAuthorizationMetadata
scalerIndex int
}

Expand Down Expand Up @@ -121,7 +114,7 @@ func parsePubSubMetadata(config *ScalerConfig) (*pubsubMetadata, error) {
if err != nil {
return nil, err
}
meta.gcpAuthorization = *auth
meta.gcpAuthorization = auth
meta.scalerIndex = config.ScalerIndex
return &meta, nil
}
Expand Down Expand Up @@ -255,28 +248,3 @@ func getSubscriptionData(s *pubsubScaler) (string, string) {
}
return subscriptionID, projectID
}

func getGcpAuthorization(config *ScalerConfig, resolvedEnv map[string]string) (*gcpAuthorizationMetadata, error) {
metadata := config.TriggerMetadata
authParams := config.AuthParams
meta := gcpAuthorizationMetadata{}
if metadata["identityOwner"] == "operator" {
meta.podIdentityOwner = false
} else if metadata["identityOwner"] == "" || metadata["identityOwner"] == "pod" {
meta.podIdentityOwner = true
switch {
case config.PodIdentity == kedav1alpha1.PodIdentityProviderGCP:
// do nothing, rely on underneath metadata google
meta.podIdentityProviderEnabled = true
case authParams["GoogleApplicationCredentials"] != "":
meta.GoogleApplicationCredentials = authParams["GoogleApplicationCredentials"]
default:
if metadata["credentialsFromEnv"] != "" {
meta.GoogleApplicationCredentials = resolvedEnv[metadata["credentialsFromEnv"]]
} else {
return nil, fmt.Errorf("GoogleApplicationCredentials not found")
}
}
}
return &meta, nil
}
190 changes: 190 additions & 0 deletions pkg/scalers/gcp_storage_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package scalers

import (
"context"
"fmt"
"strconv"

"cloud.google.com/go/storage"
"google.golang.org/api/iterator"
option "google.golang.org/api/option"

"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const (
defaultTargetLength = 1000
)

type gcsScaler struct {
client *storage.Client
bucket *storage.BucketHandle
metadata *gcsMetadata
}

type gcsMetadata struct {
bucketName string
gcpAuthorization *gcpAuthorizationMetadata
targetLength int
scalerIndex int
}

var gcsLog = logf.Log.WithName("gcp_storage_scaler")

// NewGcsScaler creates a new gcsScaler
func NewGcsScaler(config *ScalerConfig) (Scaler, error) {
meta, err := parseGcsMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing GCP storage metadata: %s", err)
}

ctx := context.Background()

var client *storage.Client

if meta.gcpAuthorization.podIdentityProviderEnabled {
client, err = storage.NewClient(ctx, option.WithScopes("ScopeReadOnly"))
} else if meta.gcpAuthorization.GoogleApplicationCredentialsFile != "" {
client, err = storage.NewClient(
ctx, option.WithCredentialsFile(meta.gcpAuthorization.GoogleApplicationCredentialsFile))
} else {
client, err = storage.NewClient(
ctx, option.WithCredentialsJSON([]byte(meta.gcpAuthorization.GoogleApplicationCredentials)))
}

if err != nil {
return nil, fmt.Errorf("storage.NewClient: %v", err)
}

bucket := client.Bucket(meta.bucketName)
if bucket == nil {
return nil, fmt.Errorf("Failed to create a handle to bucket %s", meta.bucketName)
}

return &gcsScaler{
client: client,
bucket: bucket,
metadata: meta,
}, nil
}

func parseGcsMetadata(config *ScalerConfig) (*gcsMetadata, error) {
meta := gcsMetadata{}
meta.targetLength = defaultTargetLength

if val, ok := config.TriggerMetadata["bucketName"]; ok {
if val == "" {
gcsLog.Error(nil, "no bucket name given")
return nil, fmt.Errorf("no bucket name given")
}

meta.bucketName = val
} else {
gcsLog.Error(nil, "no bucket name given")
return nil, fmt.Errorf("no bucket name given")
}

if val, ok := config.TriggerMetadata["targetLength"]; ok {
targetLength, err := strconv.Atoi(val)
if err != nil {
gcsLog.Error(err, "Error parsing targetLength")
return nil, fmt.Errorf("error parsing targetLength: %s", err.Error())
}

meta.targetLength = targetLength
}

auth, err := getGcpAuthorization(config, config.ResolvedEnv)
if err != nil {
return nil, err
}
meta.gcpAuthorization = auth
meta.scalerIndex = config.ScalerIndex
return &meta, nil
}

// IsActive checks if there are any messages in the subscription
func (s *gcsScaler) IsActive(ctx context.Context) (bool, error) {
items, err := s.getItemCount(ctx, 1)
if err != nil {
return false, err
}

return items > 0, nil
}

func (s *gcsScaler) Close(context.Context) error {
if s.client != nil {
s.client.Close()
}
return nil
}

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *gcsScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
// Construct the target value as a quantity
targetValueQty := resource.NewQuantity(int64(s.metadata.targetLength), resource.DecimalSI)

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("gcp-storage-%s", s.metadata.bucketName))),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetValueQty,
},
}

// Create the metric spec for the HPA
metricSpec := v2beta2.MetricSpec{
External: externalMetric,
Type: externalMetricType,
}

return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics connects to Stack Driver and finds the size of the pub sub subscription
func (s *gcsScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
items, err := s.getItemCount(ctx, s.metadata.targetLength)
if err != nil {
return nil, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(items), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// getItemCount gets the number of items in the bucket, up to maxCount
func (s *gcsScaler) getItemCount(ctx context.Context, maxCount int) (int, error) {
query := &storage.Query{Prefix: ""}
query.SetAttrSelection([]string{"Name"})
it := s.bucket.Objects(ctx, query)
count := 0

for count < maxCount {
_, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
gcsLog.Error(err, "Failed to enumerate items in bucket")
return count, err
}
count++
}

return count, nil
}
Loading

0 comments on commit 3ace51f

Please sign in to comment.