Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GCP identity authentication when using Pubsub Scaler #2225

Merged
merged 4 commits into from
Nov 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092))
- Add Cassandra Scaler ([#2211](https://github.com/kedacore/keda/pull/2211))
- Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181))
- Add GCP identity authentication when using Pubsub Scaler ([#2225](https://github.com/kedacore/keda/pull/2225))

### Improvements

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/kedacore/keda/v2
go 1.16

require (
cloud.google.com/go v0.97.0 // indirect
cloud.google.com/go/monitoring v1.0.0
github.com/Azure/azure-amqp-common-go/v3 v3.2.1
github.com/Azure/azure-event-hubs-go/v3 v3.3.16
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ cloud.google.com/go v0.93.3/go.mod h1:8utlLll2EF5XMAV15woO4lSbWQlk8rer9aLOfLh7+Y
cloud.google.com/go v0.94.1/go.mod h1:qAlAugsXlC+JWO+Bke5vCtc9ONxjQT3drlTTnAplMW4=
cloud.google.com/go v0.96.0 h1:r9XIwQ9FrJspMjHulRm1kl1uanw5gSolzSK+dukeH0E=
cloud.google.com/go v0.96.0/go.mod h1:GF7l59pYBVlXQIBLx3a761cZ41F9bBH3JUlihCt2Udc=
cloud.google.com/go v0.97.0 h1:3DXvAyifywvq64LfkKaMOmkWPS1CikIQdMe2lY9vxU8=
cloud.google.com/go v0.97.0/go.mod h1:GF7l59pYBVlXQIBLx3a761cZ41F9bBH3JUlihCt2Udc=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc=
Expand Down
24 changes: 19 additions & 5 deletions pkg/scalers/gcp_pub_sub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

Expand All @@ -23,6 +24,7 @@ const (
type gcpAuthorizationMetadata struct {
GoogleApplicationCredentials string
podIdentityOwner bool
podIdentityProviderEnabled bool
}

type pubsubScaler struct {
Expand Down Expand Up @@ -74,7 +76,7 @@ func parsePubSubMetadata(config *ScalerConfig) (*pubsubMetadata, error) {
return nil, fmt.Errorf("no subscription name given")
}

auth, err := getGcpAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv)
auth, err := getGcpAuthorization(config, config.ResolvedEnv)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -153,7 +155,13 @@ func (s *pubsubScaler) GetMetrics(ctx context.Context, metricName string, metric
// Stackdriver api
func (s *pubsubScaler) GetSubscriptionSize(ctx context.Context) (int64, error) {
if s.client == nil {
client, err := NewStackDriverClient(ctx, s.metadata.gcpAuthorization.GoogleApplicationCredentials)
var client *StackDriverClient
var err error
if s.metadata.gcpAuthorization.podIdentityProviderEnabled {
client, err = NewStackDriverClientPodIdentity(ctx)
} else {
client, err = NewStackDriverClient(ctx, s.metadata.gcpAuthorization.GoogleApplicationCredentials)
}
if err != nil {
return -1, err
}
Expand All @@ -165,15 +173,21 @@ func (s *pubsubScaler) GetSubscriptionSize(ctx context.Context) (int64, error) {
return s.client.GetMetrics(ctx, filter)
}

func getGcpAuthorization(authParams, metadata, resolvedEnv map[string]string) (*gcpAuthorizationMetadata, error) {
func getGcpAuthorization(config *ScalerConfig, resolvedEnv map[string]string) (*gcpAuthorizationMetadata, error) {
metadata := config.TriggerMetadata
authParams := config.AuthParams
meta := gcpAuthorizationMetadata{}
if metadata["identityOwner"] == "operator" {
meta.podIdentityOwner = false
} else if metadata["identityOwner"] == "" || metadata["identityOwner"] == "pod" {
meta.podIdentityOwner = true
if authParams["GoogleApplicationCredentials"] != "" {
switch {
case config.PodIdentity == kedav1alpha1.PodIdentityProviderGCP:
// do nothing, rely on underneath metadata google
meta.podIdentityProviderEnabled = true
case authParams["GoogleApplicationCredentials"] != "":
meta.GoogleApplicationCredentials = authParams["GoogleApplicationCredentials"]
} else {
default:
if metadata["credentialsFromEnv"] != "" {
meta.GoogleApplicationCredentials = resolvedEnv[metadata["credentialsFromEnv"]]
} else {
Expand Down
55 changes: 45 additions & 10 deletions pkg/scalers/stackdriver_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"

"cloud.google.com/go/compute/metadata"
monitoring "cloud.google.com/go/monitoring/apiv3/v2"
timestamp "github.com/golang/protobuf/ptypes/timestamp"
"google.golang.org/api/iterator"
Expand All @@ -18,6 +20,7 @@ import (
type StackDriverClient struct {
metricsClient *monitoring.MetricClient
credentials GoogleApplicationCredentials
projectID string
}

// NewStackDriverClient creates a new stackdriver client with the credentials that are passed
Expand All @@ -41,6 +44,23 @@ func NewStackDriverClient(ctx context.Context, credentials string) (*StackDriver
}, nil
}

// NewStackDriverClient creates a new stackdriver client with the credentials underlying
func NewStackDriverClientPodIdentity(ctx context.Context) (*StackDriverClient, error) {
client, err := monitoring.NewMetricClient(ctx)
if err != nil {
return nil, err
}
c := metadata.NewClient(&http.Client{})
project, err := c.ProjectID()
if err != nil {
return nil, err
}
return &StackDriverClient{
metricsClient: client,
projectID: project,
}, nil
}

// GetMetrics fetches metrics from stackdriver for a specific filter for the last minute
func (s StackDriverClient) GetMetrics(ctx context.Context, filter string) (int64, error) {
// Set the start time to 1 minute ago
Expand All @@ -50,19 +70,34 @@ func (s StackDriverClient) GetMetrics(ctx context.Context, filter string) (int64
endTime := time.Now().UTC()

// Create a request with the filter and the GCP project ID
req := &monitoringpb.ListTimeSeriesRequest{
Name: "projects/" + s.credentials.ProjectID,
Filter: filter,
Interval: &monitoringpb.TimeInterval{
StartTime: &timestamp.Timestamp{
Seconds: startTime.Unix(),
var req *monitoringpb.ListTimeSeriesRequest
if len(s.projectID) > 0 {
req = &monitoringpb.ListTimeSeriesRequest{
Name: "projects/" + s.projectID,
Filter: filter,
Interval: &monitoringpb.TimeInterval{
StartTime: &timestamp.Timestamp{
Seconds: startTime.Unix(),
},
EndTime: &timestamp.Timestamp{
Seconds: endTime.Unix(),
},
},
EndTime: &timestamp.Timestamp{
Seconds: endTime.Unix(),
}
} else {
req = &monitoringpb.ListTimeSeriesRequest{
Name: "projects/" + s.credentials.ProjectID,
Filter: filter,
Interval: &monitoringpb.TimeInterval{
StartTime: &timestamp.Timestamp{
Seconds: startTime.Unix(),
},
EndTime: &timestamp.Timestamp{
Seconds: endTime.Unix(),
},
},
},
}
}

// Get an iterator with the list of time series
it := s.metricsClient.ListTimeSeries(ctx, req)

Expand Down