diff --git a/CHANGELOG.md b/CHANGELOG.md index c1b816eb2b6..c21fdd83ca8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ - Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092)) - Add Cassandra Scaler ([#2211](https://github.com/kedacore/keda/pull/2211)) - Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181)) +- Add GCP identity authentication when using Pubsub Scaler ([#2225](https://github.com/kedacore/keda/pull/2225)) ### Improvements diff --git a/go.mod b/go.mod index 186c8198a74..78e1d5573ac 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/kedacore/keda/v2 go 1.16 require ( + cloud.google.com/go v0.97.0 // indirect cloud.google.com/go/monitoring v1.0.0 github.com/Azure/azure-amqp-common-go/v3 v3.2.1 github.com/Azure/azure-event-hubs-go/v3 v3.3.16 diff --git a/go.sum b/go.sum index 8bd4f0c8857..2bf81b7a7a7 100644 --- a/go.sum +++ b/go.sum @@ -27,6 +27,8 @@ cloud.google.com/go v0.93.3/go.mod h1:8utlLll2EF5XMAV15woO4lSbWQlk8rer9aLOfLh7+Y cloud.google.com/go v0.94.1/go.mod h1:qAlAugsXlC+JWO+Bke5vCtc9ONxjQT3drlTTnAplMW4= cloud.google.com/go v0.96.0 h1:r9XIwQ9FrJspMjHulRm1kl1uanw5gSolzSK+dukeH0E= cloud.google.com/go v0.96.0/go.mod h1:GF7l59pYBVlXQIBLx3a761cZ41F9bBH3JUlihCt2Udc= +cloud.google.com/go v0.97.0 h1:3DXvAyifywvq64LfkKaMOmkWPS1CikIQdMe2lY9vxU8= +cloud.google.com/go v0.97.0/go.mod h1:GF7l59pYBVlXQIBLx3a761cZ41F9bBH3JUlihCt2Udc= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= diff --git a/pkg/scalers/gcp_pub_sub_scaler.go b/pkg/scalers/gcp_pub_sub_scaler.go index 00c88290ad9..5cf94f73398 100644 --- a/pkg/scalers/gcp_pub_sub_scaler.go +++ b/pkg/scalers/gcp_pub_sub_scaler.go @@ -12,6 +12,7 @@ import ( "k8s.io/metrics/pkg/apis/external_metrics" logf "sigs.k8s.io/controller-runtime/pkg/log" + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" kedautil "github.com/kedacore/keda/v2/pkg/util" ) @@ -23,6 +24,7 @@ const ( type gcpAuthorizationMetadata struct { GoogleApplicationCredentials string podIdentityOwner bool + podIdentityProviderEnabled bool } type pubsubScaler struct { @@ -74,7 +76,7 @@ func parsePubSubMetadata(config *ScalerConfig) (*pubsubMetadata, error) { return nil, fmt.Errorf("no subscription name given") } - auth, err := getGcpAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv) + auth, err := getGcpAuthorization(config, config.ResolvedEnv) if err != nil { return nil, err } @@ -153,7 +155,13 @@ func (s *pubsubScaler) GetMetrics(ctx context.Context, metricName string, metric // Stackdriver api func (s *pubsubScaler) GetSubscriptionSize(ctx context.Context) (int64, error) { if s.client == nil { - client, err := NewStackDriverClient(ctx, s.metadata.gcpAuthorization.GoogleApplicationCredentials) + var client *StackDriverClient + var err error + if s.metadata.gcpAuthorization.podIdentityProviderEnabled { + client, err = NewStackDriverClientPodIdentity(ctx) + } else { + client, err = NewStackDriverClient(ctx, s.metadata.gcpAuthorization.GoogleApplicationCredentials) + } if err != nil { return -1, err } @@ -165,15 +173,21 @@ func (s *pubsubScaler) GetSubscriptionSize(ctx context.Context) (int64, error) { return s.client.GetMetrics(ctx, filter) } -func getGcpAuthorization(authParams, metadata, resolvedEnv map[string]string) (*gcpAuthorizationMetadata, error) { +func getGcpAuthorization(config *ScalerConfig, resolvedEnv map[string]string) (*gcpAuthorizationMetadata, error) { + metadata := config.TriggerMetadata + authParams := config.AuthParams meta := gcpAuthorizationMetadata{} if metadata["identityOwner"] == "operator" { meta.podIdentityOwner = false } else if metadata["identityOwner"] == "" || metadata["identityOwner"] == "pod" { meta.podIdentityOwner = true - if authParams["GoogleApplicationCredentials"] != "" { + switch { + case config.PodIdentity == kedav1alpha1.PodIdentityProviderGCP: + // do nothing, rely on underneath metadata google + meta.podIdentityProviderEnabled = true + case authParams["GoogleApplicationCredentials"] != "": meta.GoogleApplicationCredentials = authParams["GoogleApplicationCredentials"] - } else { + default: if metadata["credentialsFromEnv"] != "" { meta.GoogleApplicationCredentials = resolvedEnv[metadata["credentialsFromEnv"]] } else { diff --git a/pkg/scalers/stackdriver_client.go b/pkg/scalers/stackdriver_client.go index 0f62f39b8c6..cb72037629f 100644 --- a/pkg/scalers/stackdriver_client.go +++ b/pkg/scalers/stackdriver_client.go @@ -4,8 +4,10 @@ import ( "context" "encoding/json" "fmt" + "net/http" "time" + "cloud.google.com/go/compute/metadata" monitoring "cloud.google.com/go/monitoring/apiv3/v2" timestamp "github.com/golang/protobuf/ptypes/timestamp" "google.golang.org/api/iterator" @@ -18,6 +20,7 @@ import ( type StackDriverClient struct { metricsClient *monitoring.MetricClient credentials GoogleApplicationCredentials + projectID string } // NewStackDriverClient creates a new stackdriver client with the credentials that are passed @@ -41,6 +44,23 @@ func NewStackDriverClient(ctx context.Context, credentials string) (*StackDriver }, nil } +// NewStackDriverClient creates a new stackdriver client with the credentials underlying +func NewStackDriverClientPodIdentity(ctx context.Context) (*StackDriverClient, error) { + client, err := monitoring.NewMetricClient(ctx) + if err != nil { + return nil, err + } + c := metadata.NewClient(&http.Client{}) + project, err := c.ProjectID() + if err != nil { + return nil, err + } + return &StackDriverClient{ + metricsClient: client, + projectID: project, + }, nil +} + // GetMetrics fetches metrics from stackdriver for a specific filter for the last minute func (s StackDriverClient) GetMetrics(ctx context.Context, filter string) (int64, error) { // Set the start time to 1 minute ago @@ -50,19 +70,34 @@ func (s StackDriverClient) GetMetrics(ctx context.Context, filter string) (int64 endTime := time.Now().UTC() // Create a request with the filter and the GCP project ID - req := &monitoringpb.ListTimeSeriesRequest{ - Name: "projects/" + s.credentials.ProjectID, - Filter: filter, - Interval: &monitoringpb.TimeInterval{ - StartTime: ×tamp.Timestamp{ - Seconds: startTime.Unix(), + var req *monitoringpb.ListTimeSeriesRequest + if len(s.projectID) > 0 { + req = &monitoringpb.ListTimeSeriesRequest{ + Name: "projects/" + s.projectID, + Filter: filter, + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamp.Timestamp{ + Seconds: startTime.Unix(), + }, + EndTime: ×tamp.Timestamp{ + Seconds: endTime.Unix(), + }, }, - EndTime: ×tamp.Timestamp{ - Seconds: endTime.Unix(), + } + } else { + req = &monitoringpb.ListTimeSeriesRequest{ + Name: "projects/" + s.credentials.ProjectID, + Filter: filter, + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamp.Timestamp{ + Seconds: startTime.Unix(), + }, + EndTime: ×tamp.Timestamp{ + Seconds: endTime.Unix(), + }, }, - }, + } } - // Get an iterator with the list of time series it := s.metricsClient.ListTimeSeries(ctx, req)