Skip to content

Commit

Permalink
Add GCP identity authentication when using Pubsub Scaler (#2225)
Browse files Browse the repository at this point in the history
Signed-off-by: Jose Maria Alvarez <[email protected]>
  • Loading branch information
jmalvarezf-lmes authored Nov 3, 2021
1 parent 2669219 commit 744fa09
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092))
- Add Cassandra Scaler ([#2211](https://github.com/kedacore/keda/pull/2211))
- Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181))
- Add GCP identity authentication when using Pubsub Scaler ([#2225](https://github.com/kedacore/keda/pull/2225))

### Improvements

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/kedacore/keda/v2
go 1.16

require (
cloud.google.com/go v0.97.0 // indirect
cloud.google.com/go/monitoring v1.0.0
github.com/Azure/azure-amqp-common-go/v3 v3.2.1
github.com/Azure/azure-event-hubs-go/v3 v3.3.16
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ cloud.google.com/go v0.93.3/go.mod h1:8utlLll2EF5XMAV15woO4lSbWQlk8rer9aLOfLh7+Y
cloud.google.com/go v0.94.1/go.mod h1:qAlAugsXlC+JWO+Bke5vCtc9ONxjQT3drlTTnAplMW4=
cloud.google.com/go v0.96.0 h1:r9XIwQ9FrJspMjHulRm1kl1uanw5gSolzSK+dukeH0E=
cloud.google.com/go v0.96.0/go.mod h1:GF7l59pYBVlXQIBLx3a761cZ41F9bBH3JUlihCt2Udc=
cloud.google.com/go v0.97.0 h1:3DXvAyifywvq64LfkKaMOmkWPS1CikIQdMe2lY9vxU8=
cloud.google.com/go v0.97.0/go.mod h1:GF7l59pYBVlXQIBLx3a761cZ41F9bBH3JUlihCt2Udc=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc=
Expand Down
24 changes: 19 additions & 5 deletions pkg/scalers/gcp_pub_sub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

Expand All @@ -23,6 +24,7 @@ const (
type gcpAuthorizationMetadata struct {
GoogleApplicationCredentials string
podIdentityOwner bool
podIdentityProviderEnabled bool
}

type pubsubScaler struct {
Expand Down Expand Up @@ -74,7 +76,7 @@ func parsePubSubMetadata(config *ScalerConfig) (*pubsubMetadata, error) {
return nil, fmt.Errorf("no subscription name given")
}

auth, err := getGcpAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv)
auth, err := getGcpAuthorization(config, config.ResolvedEnv)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -153,7 +155,13 @@ func (s *pubsubScaler) GetMetrics(ctx context.Context, metricName string, metric
// Stackdriver api
func (s *pubsubScaler) GetSubscriptionSize(ctx context.Context) (int64, error) {
if s.client == nil {
client, err := NewStackDriverClient(ctx, s.metadata.gcpAuthorization.GoogleApplicationCredentials)
var client *StackDriverClient
var err error
if s.metadata.gcpAuthorization.podIdentityProviderEnabled {
client, err = NewStackDriverClientPodIdentity(ctx)
} else {
client, err = NewStackDriverClient(ctx, s.metadata.gcpAuthorization.GoogleApplicationCredentials)
}
if err != nil {
return -1, err
}
Expand All @@ -165,15 +173,21 @@ func (s *pubsubScaler) GetSubscriptionSize(ctx context.Context) (int64, error) {
return s.client.GetMetrics(ctx, filter)
}

func getGcpAuthorization(authParams, metadata, resolvedEnv map[string]string) (*gcpAuthorizationMetadata, error) {
func getGcpAuthorization(config *ScalerConfig, resolvedEnv map[string]string) (*gcpAuthorizationMetadata, error) {
metadata := config.TriggerMetadata
authParams := config.AuthParams
meta := gcpAuthorizationMetadata{}
if metadata["identityOwner"] == "operator" {
meta.podIdentityOwner = false
} else if metadata["identityOwner"] == "" || metadata["identityOwner"] == "pod" {
meta.podIdentityOwner = true
if authParams["GoogleApplicationCredentials"] != "" {
switch {
case config.PodIdentity == kedav1alpha1.PodIdentityProviderGCP:
// do nothing, rely on underneath metadata google
meta.podIdentityProviderEnabled = true
case authParams["GoogleApplicationCredentials"] != "":
meta.GoogleApplicationCredentials = authParams["GoogleApplicationCredentials"]
} else {
default:
if metadata["credentialsFromEnv"] != "" {
meta.GoogleApplicationCredentials = resolvedEnv[metadata["credentialsFromEnv"]]
} else {
Expand Down
55 changes: 45 additions & 10 deletions pkg/scalers/stackdriver_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"

"cloud.google.com/go/compute/metadata"
monitoring "cloud.google.com/go/monitoring/apiv3/v2"
timestamp "github.com/golang/protobuf/ptypes/timestamp"
"google.golang.org/api/iterator"
Expand All @@ -18,6 +20,7 @@ import (
type StackDriverClient struct {
metricsClient *monitoring.MetricClient
credentials GoogleApplicationCredentials
projectID string
}

// NewStackDriverClient creates a new stackdriver client with the credentials that are passed
Expand All @@ -41,6 +44,23 @@ func NewStackDriverClient(ctx context.Context, credentials string) (*StackDriver
}, nil
}

// NewStackDriverClient creates a new stackdriver client with the credentials underlying
func NewStackDriverClientPodIdentity(ctx context.Context) (*StackDriverClient, error) {
client, err := monitoring.NewMetricClient(ctx)
if err != nil {
return nil, err
}
c := metadata.NewClient(&http.Client{})
project, err := c.ProjectID()
if err != nil {
return nil, err
}
return &StackDriverClient{
metricsClient: client,
projectID: project,
}, nil
}

// GetMetrics fetches metrics from stackdriver for a specific filter for the last minute
func (s StackDriverClient) GetMetrics(ctx context.Context, filter string) (int64, error) {
// Set the start time to 1 minute ago
Expand All @@ -50,19 +70,34 @@ func (s StackDriverClient) GetMetrics(ctx context.Context, filter string) (int64
endTime := time.Now().UTC()

// Create a request with the filter and the GCP project ID
req := &monitoringpb.ListTimeSeriesRequest{
Name: "projects/" + s.credentials.ProjectID,
Filter: filter,
Interval: &monitoringpb.TimeInterval{
StartTime: &timestamp.Timestamp{
Seconds: startTime.Unix(),
var req *monitoringpb.ListTimeSeriesRequest
if len(s.projectID) > 0 {
req = &monitoringpb.ListTimeSeriesRequest{
Name: "projects/" + s.projectID,
Filter: filter,
Interval: &monitoringpb.TimeInterval{
StartTime: &timestamp.Timestamp{
Seconds: startTime.Unix(),
},
EndTime: &timestamp.Timestamp{
Seconds: endTime.Unix(),
},
},
EndTime: &timestamp.Timestamp{
Seconds: endTime.Unix(),
}
} else {
req = &monitoringpb.ListTimeSeriesRequest{
Name: "projects/" + s.credentials.ProjectID,
Filter: filter,
Interval: &monitoringpb.TimeInterval{
StartTime: &timestamp.Timestamp{
Seconds: startTime.Unix(),
},
EndTime: &timestamp.Timestamp{
Seconds: endTime.Unix(),
},
},
},
}
}

// Get an iterator with the list of time series
it := s.metricsClient.ListTimeSeries(ctx, req)

Expand Down

0 comments on commit 744fa09

Please sign in to comment.