Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
31 changes: 20 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ module github.com/openshift/cluster-image-registry-operator
go 1.19

require (
cloud.google.com/go/storage v1.10.0
cloud.google.com/go/resourcemanager v1.9.1
cloud.google.com/go/storage v1.29.0
github.com/Azure/azure-pipeline-go v0.2.3
github.com/Azure/azure-sdk-for-go v55.6.0+incompatible
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.6.0
Expand All @@ -23,6 +24,7 @@ require (
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/googleapis/gax-go/v2 v2.11.0
github.com/gophercloud/gophercloud v1.1.0
github.com/gophercloud/utils v0.0.0-20221124081324-7bac6f5cdf99
github.com/goware/urlx v0.3.2
Expand All @@ -38,7 +40,8 @@ require (
github.com/stretchr/testify v1.8.1
golang.org/x/net v0.11.0
golang.org/x/oauth2 v0.8.0
google.golang.org/api v0.57.0
golang.org/x/time v0.3.0
google.golang.org/api v0.126.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.27.4
k8s.io/apimachinery v0.27.4
Expand All @@ -48,7 +51,11 @@ require (
)

require (
cloud.google.com/go v0.97.0 // indirect
cloud.google.com/go v0.110.2 // indirect
cloud.google.com/go/compute v1.19.3 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.0 // indirect
cloud.google.com/go/longrunning v0.5.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest/adal v0.9.20 // indirect
Expand Down Expand Up @@ -90,9 +97,10 @@ require (
github.com/google/cel-go v0.12.6 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.1.0 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/hashicorp/go-retryablehttp v0.6.6 // indirect
github.com/imdario/mergo v0.3.8 // indirect
Expand Down Expand Up @@ -125,7 +133,7 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.5.7 // indirect
go.etcd.io/etcd/client/v3 v3.5.7 // indirect
go.mongodb.org/mongo-driver v1.5.1 // indirect
go.opencensus.io v0.23.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.35.1 // indirect
go.opentelemetry.io/otel v1.10.0 // indirect
Expand All @@ -140,15 +148,16 @@ require (
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.19.0 // indirect
golang.org/x/crypto v0.10.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.9.0 // indirect
golang.org/x/term v0.9.0 // indirect
golang.org/x/text v0.10.0 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect
google.golang.org/grpc v1.51.0 // indirect
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/grpc v1.55.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/go-playground/validator.v9 v9.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand All @@ -168,4 +177,4 @@ require (
sigs.k8s.io/yaml v1.3.0 // indirect
)

replace google.golang.org/grpc => google.golang.org/grpc v1.40.0
replace google.golang.org/grpc => google.golang.org/grpc v1.55.0
855 changes: 835 additions & 20 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions manifests/01-registry-credentials-request-gcs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ spec:
kind: GCPProviderSpec
predefinedRoles:
- roles/storage.admin
- roles/resourcemanager.tagUser
skipServiceCheck: true
serviceAccountNames:
- cluster-image-registry-operator
Expand Down
260 changes: 256 additions & 4 deletions pkg/storage/gcs/gcp_labels_tags.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,67 @@
package gcs

import (
"context"
"errors"
"fmt"
"net/http"
"strings"
"time"

"k8s.io/klog/v2"

configv1 "github.com/openshift/api/config/v1"
imageregistryv1 "github.com/openshift/api/imageregistry/v1"
operatorapi "github.com/openshift/api/operator/v1"
configlisters "github.com/openshift/client-go/config/listers/config/v1"
regopclient "github.com/openshift/cluster-image-registry-operator/pkg/client"
"github.com/openshift/cluster-image-registry-operator/pkg/defaults"
"github.com/openshift/cluster-image-registry-operator/pkg/storage/util"

rscmgr "cloud.google.com/go/resourcemanager/apiv3"
rscmgrpb "cloud.google.com/go/resourcemanager/apiv3/resourcemanagerpb"
"github.com/googleapis/gax-go/v2"
"github.com/googleapis/gax-go/v2/apierror"
"golang.org/x/time/rate"
"google.golang.org/api/iterator"
"google.golang.org/api/option"

"k8s.io/klog/v2"
)

const (
// ocpDefaultLabelFmt is the format string for the default label
// added to the OpenShift created GCP resources.
ocpDefaultLabelFmt = "kubernetes-io-cluster-%s"

// gcpTagsSuccessStatusReason is the operator condition status reason
// for successful tag operations.
gcpTagsSuccessStatusReason = "SuccessTaggingBucket"

// gcpTagsFailedStatusReason is the operator condition status reason
// for failed tag operations.
gcpTagsFailedStatusReason = "ErrorTaggingBucket"

// gcpMaxTagsPerResource is the maximum number of tags that can
// be attached to a resource.
gcpMaxTagsPerResource = 50

// gcpTagsRequestRateLimit is the tag request rate limit per second.
gcpTagsRequestRateLimit = 8

// gcpTagsRequestTokenBucketSize is the burst/token bucket size used
// for limiting API requests.
gcpTagsRequestTokenBucketSize = 8

// resourceManagerHostSubPath is the endpoint for tag requests.
resourceManagerHostSubPath = "cloudresourcemanager.googleapis.com"

// bucketParentPathFmt is the string format for the parent path of a bucket resource
bucketParentPathFmt = "//storage.googleapis.com/projects/_/buckets/%s"
)

func getUserLabels(infraLister configlisters.InfrastructureLister) (map[string]string, error) {
infra, err := util.GetInfrastructure(infraLister)
if err != nil {
klog.Errorf("getUserLabels: failed to read infrastructure/cluster resource: %w", err)
return nil, err
return nil, fmt.Errorf("getUserLabels: failed to read infrastructure/cluster resource: %w", err)
}
// add OCP default label along with user-defined labels
labels := map[string]string{
Expand All @@ -35,3 +77,213 @@ func getUserLabels(infraLister configlisters.InfrastructureLister) (map[string]s
}
return labels, nil
}

// newLimiter returns token bucket based request rate limiter after initializing
// the passed values for limit, burst(or token bucket) size. If opted for emptyBucket
// all initial tokens are reserved for the first burst.
func newLimiter(limit, burst int, emptyBucket bool) *rate.Limiter {
limiter := rate.NewLimiter(rate.Every(time.Second/time.Duration(limit)), burst)

if emptyBucket {
limiter.AllowN(time.Now(), burst)
}

return limiter
}

// toTagValueList converts the tags to an array containing tagValues
// NamespacedNames.
func toTagValueList(tags []configv1.GCPResourceTag) []string {
if len(tags) <= 0 {
return nil
}

list := make([]string, 0, len(tags))
for _, tag := range tags {
t := fmt.Sprintf("%s/%s/%s", tag.ParentID, tag.Key, tag.Value)
list = append(list, t)
}
return list
}

// getInfraResourceTagsList returns the user-defined tags present in the
// status sub-resource of Infrastructure.
func getInfraResourceTagsList(platformStatus *configv1.PlatformStatus) []configv1.GCPResourceTag {
if platformStatus != nil && platformStatus.GCP != nil && platformStatus.GCP.ResourceTags != nil {
return platformStatus.GCP.ResourceTags
}
klog.V(1).Infof("getInfraResourceTagsList: user-defined tag list is not provided")
return nil
}

// getTagsList returns the list of tags to apply on the resources.
func getTagsList(platformStatus *configv1.PlatformStatus) []string {
return toTagValueList(getInfraResourceTagsList(platformStatus))
}

// getFilteredTagList returns the list of tags to apply on the resources after
// filtering the tags already existing on a given resource.
func getFilteredTagList(ctx context.Context, platformStatus *configv1.PlatformStatus, client *rscmgr.TagBindingsClient, parent string) []string {
return filterTagList(ctx, client, parent, getTagsList(platformStatus))
}

// filterTagList returns the filtered list of tags to apply on the resources.
func filterTagList(ctx context.Context, client *rscmgr.TagBindingsClient, parent string, tagList []string) []string {
dupTags := make(map[string]bool, len(tagList))
for _, k := range tagList {
dupTags[k] = false
}

listBindingsReq := &rscmgrpb.ListEffectiveTagsRequest{
Parent: parent,
}
bindings := client.ListEffectiveTags(ctx, listBindingsReq)
// a resource can have a maximum of {gcpMaxTagsPerResource} tags attached to it.
// Will iterate for {gcpMaxTagsPerResource} times in the worst case scenario, if
// none of the break conditions are met. Should the {gcpMaxTagsPerResource} be
// increased in future, it should not create an issue, since this is an optimization
// attempt to reduce the number the tag write calls by skipping already existing tags,
// since it has a quota restriction.
for i := 0; i < gcpMaxTagsPerResource; i++ {
binding, err := bindings.Next()
if errors.Is(err, iterator.Done) {
break
}
if err != nil || binding == nil {
klog.V(4).Infof("failed to list effective tags on the %s bucket: %v: %v", parent, binding, err)
break
}
tag := binding.GetNamespacedTagValue()
if _, exist := dupTags[tag]; exist {
dupTags[tag] = true
klog.V(4).Infof("filterTagList: skipping tag %s already exists on the %s bucket", tag, parent)
}
}

filteredTags := make([]string, 0, len(tagList))
for tagValue, dup := range dupTags {
if !dup {
filteredTags = append(filteredTags, tagValue)
}
}

return filteredTags
}

// getCreateCallOptions returns a list of additional call options to use for
// the create operations.
func getCreateCallOptions() []gax.CallOption {
return []gax.CallOption{
gax.WithRetry(func() gax.Retryer {
return gax.OnHTTPCodes(gax.Backoff{
Initial: 90 * time.Second,
Max: 5 * time.Minute,
Multiplier: 2,
},
http.StatusTooManyRequests)
}),
}
}

// getTagBindingsClient returns the client to be used for creating tag bindings to
// the resources.
func getTagBindingsClient(ctx context.Context, listers *regopclient.StorageListers, location string) (*rscmgr.TagBindingsClient, error) {
cfg, err := GetConfig(listers)
if err != nil {
return nil, fmt.Errorf("getTagBindingsClient: failed to read gcp config: %w", err)
}

endpoint := fmt.Sprintf("https://%s-%s", location, resourceManagerHostSubPath)
opts := []option.ClientOption{
option.WithCredentialsJSON([]byte(cfg.KeyfileData)),
option.WithEndpoint(endpoint),
}
return rscmgr.NewTagBindingsRESTClient(ctx, opts...)
}

// addTagsToStorageBucket adds the user-defined tags in the Infrastructure resource
// to the passed GCP bucket resource. It's wrapper around addUserTagsToStorageBucket()
// additionally updates status condition.
func addTagsToStorageBucket(ctx context.Context, cr *imageregistryv1.Config, listers *regopclient.StorageListers, bucketName, region string) error {
if err := addUserTagsToStorageBucket(ctx, listers, bucketName, region); err != nil {
util.UpdateCondition(cr, defaults.StorageTagged, operatorapi.ConditionFalse,
gcpTagsFailedStatusReason, err.Error())
return err
}
util.UpdateCondition(cr, defaults.StorageTagged, operatorapi.ConditionTrue,
gcpTagsSuccessStatusReason,
fmt.Sprintf("Successfully added user-defined tags to %s storage bucket", bucketName))
return nil
}

// addUserTagsToStorageBucket adds the user-defined tags in the Infrastructure resource
// to the passed GCP bucket resource.
func addUserTagsToStorageBucket(ctx context.Context, listers *regopclient.StorageListers, bucketName, region string) error {
// Tags are not supported for buckets located in the us-east2 and us-east3 regions.
// https://cloud.google.com/storage/docs/tags-and-labels#tags
if strings.ToLower(region) == "us-east2" ||
strings.ToLower(region) == "us-east3" {
klog.Infof("addUserTagsToStorageBucket: skip tagging bucket %s created in tags unsupported region %s", bucketName, region)
return nil
}

infra, err := util.GetInfrastructure(listers.Infrastructures)
if err != nil {
return fmt.Errorf("addUserTagsToStorageBucket: failed to read infrastructure/cluster resource: %w", err)
}

client, err := getTagBindingsClient(ctx, listers, region)
if err != nil || client == nil {
return fmt.Errorf("failed to create tag binding client for adding tags to %s bucket: %v",
bucketName, err)
}
defer client.Close()

parent := fmt.Sprintf(bucketParentPathFmt, bucketName)
tagValues := getFilteredTagList(ctx, infra.Status.PlatformStatus, client, parent)
if len(tagValues) <= 0 {
return nil
}

// GCP has a rate limit of 600 requests per minute, restricting
// here to 8 requests per second.
limiter := newLimiter(gcpTagsRequestRateLimit, gcpTagsRequestTokenBucketSize, true)

tagBindingReq := &rscmgrpb.CreateTagBindingRequest{
TagBinding: &rscmgrpb.TagBinding{
Parent: parent,
},
}
errFlag := false
for _, value := range tagValues {
if err := limiter.Wait(ctx); err != nil {
errFlag = true
klog.Errorf("rate limiting request to add %s tag to %s bucket failed: %v",
value, bucketName, err)
continue
}

tagBindingReq.TagBinding.TagValueNamespacedName = value
result, err := client.CreateTagBinding(ctx, tagBindingReq, getCreateCallOptions()...)
if err != nil {
e, ok := err.(*apierror.APIError)
if ok && e.HTTPCode() == http.StatusConflict {
klog.Infof("tag binding %s/%s already exists", bucketName, value)
continue
}
errFlag = true
klog.Errorf("request to add %s tag to %s bucket failed: %v", value, bucketName, err)
continue
}

if _, err = result.Wait(ctx); err != nil {
errFlag = true
klog.Errorf("failed to add %s tag to %s bucket: %v", value, bucketName, err)
}
klog.V(1).Infof("binding tag %s to %s bucket successful", value, bucketName)
}
if errFlag {
return fmt.Errorf("failed to add tag(s) to %s bucket", bucketName)
}
return nil
}
Loading