diff --git a/pkg/cmd/controller/config.go b/pkg/cmd/controller/config.go
index f4c80a554..280b7afac 100644
--- a/pkg/cmd/controller/config.go
+++ b/pkg/cmd/controller/config.go
@@ -9,7 +9,7 @@ var ControllerInitializers = map[openshiftcontrolplanev1.OpenShiftControllerName
 	openshiftcontrolplanev1.OpenShiftDefaultRoleBindingsController: RunDefaultRoleBindingController,
-	openshiftcontrolplanev1.OpenShiftServiceAccountPullSecretsController: RunServiceAccountPullSecretsController,
+	openshiftcontrolplanev1.OpenShiftServiceAccountPullSecretsController: RunInternalImageRegistryPullSecretsController,
 	openshiftcontrolplanev1.OpenShiftOriginNamespaceController: RunOriginNamespaceController,
 	openshiftcontrolplanev1.OpenShiftBuilderServiceAccountController: RunBuilderServiceAccountController,
diff --git a/pkg/cmd/controller/imageregistry.go b/pkg/cmd/controller/imageregistry.go
new file mode 100644
index 000000000..088ee16c7
--- /dev/null
+++ b/pkg/cmd/controller/imageregistry.go
@@ -0,0 +1,28 @@
+package controller
+
+import "github.com/openshift/openshift-controller-manager/pkg/internalregistry/controllers"
+
+// RunInternalImageRegistryPullSecretsController starts the control loops that manage
+// the image pull secrets for the internal image registry.
+func RunInternalImageRegistryPullSecretsController(ctx *ControllerContext) (bool, error) {
+	kc := ctx.HighRateLimitClientBuilder.ClientOrDie(infraServiceAccountPullSecretsControllerServiceAccountName)
+	secrets := ctx.KubernetesInformers.Core().V1().Secrets()
+	serviceAccounts := ctx.KubernetesInformers.Core().V1().ServiceAccounts()
+	services := ctx.KubernetesInformers.Core().V1().Services()
+	additionalRegistryURLs := ctx.OpenshiftControllerConfig.DockerPullSecret.RegistryURLs
+
+	serviceAccountController := controllers.NewServiceAccountController(kc, serviceAccounts, secrets)
+	imagePullSecretController, kids, urls := controllers.NewImagePullSecretController(kc, secrets)
+	keyIDObservationController := controllers.NewKeyIDObservationController(kc, secrets, kids)
+	registryURLObservationController := controllers.NewRegistryURLObservationController(services, additionalRegistryURLs, urls)
+	legacyTokenSecretController := controllers.NewLegacyTokenSecretController(kc, secrets)
+	legacyImagePullSecretController := controllers.NewLegacyImagePullSecretController(kc, secrets)
+
+	go serviceAccountController.Run(ctx.Context, 5)
+	go keyIDObservationController.Run(ctx.Context, 1)
+	go registryURLObservationController.Run(ctx.Context, 1)
+	go imagePullSecretController.Run(ctx.Context, 5)
+	go legacyTokenSecretController.Run(ctx.Context, 5)
+	go legacyImagePullSecretController.Run(ctx.Context, 5)
+	return true, nil
+}
diff --git a/pkg/internalregistry/controllers/annotations.go b/pkg/internalregistry/controllers/annotations.go
new file mode 100644
index 000000000..4ec379b56
--- /dev/null
+++ b/pkg/internalregistry/controllers/annotations.go
@@ -0,0 +1,18 @@
+package controllers
+
+const (
+	// Annotation added to managed image pull secrets to indicate the service account used in the token.
+	InternalRegistryAuthTokenServiceAccountAnnotation = "openshift.io/internal-registry-auth-token.service-account"
+
+	// Annotation added to managed image pull secrets to indicate the service account token's binding type.
+	InternalRegistryAuthTokenTypeAnnotation = "openshift.io/internal-registry-auth-token.binding"
+
+	// Annotation added to service accounts to document the corresponding managed image pull secret.
+ InternalRegistryImagePullSecretRefKey = "openshift.io/internal-registry-pull-secret-ref" + + // Indicates a bound service account token is used for authentication. + AuthTokenTypeBound = "bound" + + // Indicates a legacy, long-lived, service account token is used for authentication. + AuthTokenTypeLegacy = "legacy" +) diff --git a/pkg/internalregistry/controllers/image_pull_secret_controller.go b/pkg/internalregistry/controllers/image_pull_secret_controller.go new file mode 100644 index 000000000..5aa470cd4 --- /dev/null +++ b/pkg/internalregistry/controllers/image_pull_secret_controller.go @@ -0,0 +1,401 @@ +package controllers + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "reflect" + "sync" + "time" + + "go.uber.org/atomic" + "golang.org/x/exp/slices" + "gopkg.in/square/go-jose.v2/jwt" + authenticationv1 "k8s.io/api/authentication/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + applycorev1 "k8s.io/client-go/applyconfigurations/core/v1" + informers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/credentialprovider" +) + +type imagePullSecretController struct { + client kubernetes.Interface + secrets listers.SecretLister + cacheSyncs []cache.InformerSynced + queue workqueue.RateLimitingInterface + urls *atomic.Pointer[[]string] + urlsC chan []string + kids *atomic.Pointer[[]string] + kidsC chan []string +} + +// some handy types so we don't mixup these channels +type urlsChan chan<- []string +type kidsChan chan<- []string + +func NewImagePullSecretController(kubeclient kubernetes.Interface, secrets informers.SecretInformer) (*imagePullSecretController, kidsChan, urlsChan) { + c := &imagePullSecretController{ + client: kubeclient, + secrets: secrets.Lister(), + cacheSyncs: []cache.InformerSynced{secrets.Informer().HasSynced}, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "bound-token-managed-image-pull-secrets"), + kids: atomic.NewPointer[[]string](nil), + urls: atomic.NewPointer[[]string](nil), + kidsC: make(chan []string), + urlsC: make(chan []string), + } + secrets.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: isManagedImagePullSecret, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + c.queue.Add(key) + } + }, + UpdateFunc: func(_ any, new any) { + key, err := cache.MetaNamespaceKeyFunc(new) + if err == nil { + c.queue.Add(key) + } + }, + DeleteFunc: func(obj any) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err == nil { + c.queue.Add(key) + } + }, + }, + }) + return c, c.kidsC, c.urlsC +} + +func isManagedImagePullSecret(obj any) bool { + secret, ok := obj.(*corev1.Secret) + if !ok { + return false + } + return secret.Type == corev1.SecretTypeDockercfg && len(secret.Annotations[InternalRegistryAuthTokenServiceAccountAnnotation]) > 0 +} + +func (c *imagePullSecretController) resync() { + secrets, err := c.secrets.List(labels.Everything()) + if err != nil { + klog.V(1).ErrorS(err, "error listing secrets") + return + } + for _, s := range secrets { + if isManagedImagePullSecret(s) { + key, err := 
cache.MetaNamespaceKeyFunc(s) + if err == nil { + c.queue.Add(key) + } + } + } +} + +const imagePullSecretControllerFieldManager = "openshift.io/image-registry-pull-secrets_image-pull-secret-controller" + +func (c *imagePullSecretController) sync(ctx context.Context, key string) (error, time.Duration) { + klog.V(4).InfoS("sync", "key", key) + + kids := c.kids.Load() + urls := c.urls.Load() + + // if we don't have a kid yet, requeue + if kids == nil { + // return error to requeue + return fmt.Errorf("service account token keys have not been observed yet"), 0 + } + + // if we don't have registry urls yet, requeue + if urls == nil { + // return error to requeue + return fmt.Errorf("image registry urls have not been observed yet"), 0 + } + + ns, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err, 0 + } + secret, err := c.secrets.Secrets(ns).Get(name) + if errors.IsNotFound(err) { + return nil, 0 + } + if err != nil { + return err, 0 + } + + refreshNow, refreshAt := c.isSecretRefreshNeeded(secret, *urls, *kids) + if !refreshNow { + + // the annotation is missing or incorrect, fix it + if secret.Annotations[InternalRegistryAuthTokenTypeAnnotation] != AuthTokenTypeBound { + patch, err := applycorev1.ExtractSecret(secret, imagePullSecretControllerFieldManager) + if err != nil { + return err, 0 + } + patch.WithAnnotations(map[string]string{InternalRegistryAuthTokenTypeAnnotation: AuthTokenTypeBound}) + _, err = c.client.CoreV1().Secrets(secret.Namespace).Apply(ctx, patch, metav1.ApplyOptions{Force: true, FieldManager: imagePullSecretControllerFieldManager}) + if err != nil { + return err, 0 + } + } + + // token is not expired and not expiring soon, requeue when expected to need a refresh + refreshAfter := refreshAt.Sub(time.Now()) + klog.V(4).InfoS(key, "refreshAfter", refreshAfter) + return nil, refreshAfter + } + + var serviceAccountName = serviceAccountNameForManagedSecret(secret) + klog.V(2).InfoS("Refreshing image pull secret", "ns", secret.Namespace, "name", secret.Name, "serviceaccount", serviceAccountName) + + // request new token, bound by default time and bound to this secret + tokenRequest, err := c.client.CoreV1().ServiceAccounts(secret.Namespace).CreateToken(ctx, serviceAccountName, + &authenticationv1.TokenRequest{Spec: authenticationv1.TokenRequestSpec{BoundObjectRef: &authenticationv1.BoundObjectReference{ + APIVersion: "v1", Kind: "Secret", Name: secret.Name, UID: secret.UID, + }}}, + metav1.CreateOptions{}, + ) + if err != nil { + return err, 0 + } + + // compute registry authentication data + data, err := json.Marshal(dockerConfig(tokenRequest.Status.Token, *urls)) + if err != nil { + return fmt.Errorf("unable to serialize registry auth data: %w", err), 0 + } + + patch := applycorev1.Secret(name, ns). + WithAnnotations(map[string]string{ + InternalRegistryAuthTokenTypeAnnotation: AuthTokenTypeBound, + }). + WithType(corev1.SecretTypeDockercfg). 
+		WithData(map[string][]byte{corev1.DockerConfigKey: data})
+	_, err = c.client.CoreV1().Secrets(secret.Namespace).Apply(ctx, patch, metav1.ApplyOptions{Force: true, FieldManager: imagePullSecretControllerFieldManager})
+
+	if err != nil {
+		return err, 0
+	}
+
+	refreshAfter := refreshAt.Sub(time.Now())
+	return nil, refreshAfter
+}
+
+func dockerConfig(token string, urls []string) any {
+	// not using credentialprovider.DockerConfig to keep redundant username/password/email out of secret
+	auth := map[string]map[string]string{}
+	entry := map[string]string{
+		"auth": base64.StdEncoding.EncodeToString([]byte(":" + token)),
+	}
+	for _, url := range urls {
+		auth[url] = entry
+	}
+	return auth
+}
+
+func (c *imagePullSecretController) isSecretRefreshNeeded(secret *corev1.Secret, urls, kids []string) (bool, time.Time) {
+	valid, refreshAt := c.registryAuthenticationFileValid(secret, urls, kids)
+	return !valid, refreshAt
+}
+
+func (c *imagePullSecretController) registryAuthenticationFileValid(imagePullSecret *corev1.Secret, urls, kids []string) (bool, time.Time) {
+	if imagePullSecret.Type != corev1.SecretTypeDockercfg {
+		klog.V(2).InfoS("Internal registry pull secret type is incorrect.", "ns", imagePullSecret.Namespace, "name", imagePullSecret.Name, "type", imagePullSecret.Type)
+		return false, time.Now()
+	}
+	// registry authentication file must exist
+	if _, ok := imagePullSecret.Data[corev1.DockerConfigKey]; !ok {
+		klog.V(2).InfoS("Internal registry pull secret does not contain the expected key", "ns", imagePullSecret.Namespace, "name", imagePullSecret.Name, "keys", reflect.ValueOf(imagePullSecret.Data).MapKeys())
+		return false, time.Now()
+	}
+	// parse the registry authentication file
+	auth := credentialprovider.DockerConfig{}
+	if err := json.Unmarshal(imagePullSecret.Data[corev1.DockerConfigKey], &auth); err != nil {
+		klog.V(2).InfoS("Internal registry pull secret auth data cannot be parsed", "ns", imagePullSecret.Namespace, "name", imagePullSecret.Name)
+		return false, time.Now()
+	}
+	// there should be an entry for each internal registry url
+	if len(auth) != len(urls) {
+		klog.V(2).InfoS("Internal registry pull secret auth data does not contain the correct number of entries", "ns", imagePullSecret.Namespace, "name", imagePullSecret.Name, "expected", len(urls), "actual", len(auth))
+		return false, time.Now()
+	}
+	matches := 0
+CheckUrl:
+	for _, url := range urls {
+		for key := range auth {
+			if key == url {
+				matches++
+				continue CheckUrl
+			}
+		}
+	}
+	if matches != len(urls) {
+		klog.V(2).InfoS("Internal registry pull secret needs to be refreshed", "reason", "auth data does not contain the correct entries", "ns", imagePullSecret.Namespace, "name", imagePullSecret.Name, "expected", urls, "actual", reflect.ValueOf(auth).MapKeys())
+		return false, time.Now()
+	}
+
+	// track the earliest refresh time of the token (they should all be the same, but check anyway)
+	var requeueAt time.Time
+
+	// check the token embedded in the registry authentication file
+	for url, entry := range auth {
+		token, err := jwt.ParseSigned(entry.Password)
+		if err != nil {
+			klog.V(2).InfoS("Internal registry pull secret needs to be refreshed", "reason", "auth token cannot be parsed", "ns", imagePullSecret.Namespace, "name", imagePullSecret.Name, "url", url, "error", err)
+			return false, time.Now()
+		}
+
+		// was the token created with a previous token signing cert?
+		var validKeyID bool
+		for _, kid := range kids {
+			if token.Headers[0].KeyID == kid {
+				validKeyID = true
+				break
+			}
+		}
+		if !validKeyID {
+			klog.V(2).InfoS("Internal registry pull secret needs to be refreshed", "reason", "auth token was not signed by a current signer", "ns", imagePullSecret.Namespace, "name", imagePullSecret.Name, "url", url, "error", err)
+			return false, time.Now()
+		}
+
+		var claims jwt.Claims
+		// "unsafe" in the following API just means we are not validating the signature
+		err = token.UnsafeClaimsWithoutVerification(&claims)
+		if err != nil {
+			klog.V(2).InfoS("Internal registry pull secret needs to be refreshed", "reason", "auth token claim cannot be parsed", "ns", imagePullSecret.Namespace, "name", imagePullSecret.Name, "url", url, "error", err)
+			return false, time.Now()
+		}
+		// if the token is expired, or will only be valid for less than 40% of its remaining duration, we want to trigger a new token request
+		refreshTime := claims.Expiry.Time().Add(time.Duration(-int64(float64(time.Now().Sub(claims.Expiry.Time())) * 0.4)))
+		klog.V(4).InfoS("Token expiration check.", "ns", imagePullSecret.Namespace, "name", imagePullSecret.Name, "url", url, "expiryTime", claims.Expiry.Time(), "refreshTime", refreshTime)
+		if time.Now().After(refreshTime) {
+			klog.V(2).InfoS("Internal registry pull secret needs to be refreshed", "reason", "auth token needs to be refreshed", "ns", imagePullSecret.Namespace, "name", imagePullSecret.Name, "url", url, "expiryTime", claims.Expiry.Time(), "refreshTime", refreshTime)
+			return false, time.Now()
+		}
+		if requeueAt.IsZero() || requeueAt.After(refreshTime) {
+			requeueAt = refreshTime
+		}
+	}
+	klog.V(4).InfoS("Internal registry pull secret does not need to be refreshed.", "ns", imagePullSecret.Namespace, "name", imagePullSecret.Name)
+	return true, requeueAt
+}
+
+func (c *imagePullSecretController) Run(ctx context.Context, workers int) {
+	defer runtime.HandleCrash()
+	defer c.queue.ShutDown()
+	const name = "openshift.io/internal-image-registry-pull-secrets_image-pull-secret"
+	klog.InfoS("Starting controller", "name", name)
+
+	if !cache.WaitForNamedCacheSync(name, ctx.Done(), c.cacheSyncs...)
{ + return + } + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + var v []string + for len(v) == 0 { + klog.V(2).Info("Waiting for image registry urls to be observed") + select { + case v = <-c.urlsC: + c.urls.Store(&v) + klog.V(2).InfoS("Observed image registry urls", "urls", v) + case <-ctx.Done(): + return + } + } + }() + go func() { + defer wg.Done() + var v []string + for len(v) == 0 { + klog.V(2).Info("Waiting for service account token signing cert to be observed") + select { + case v = <-c.kidsC: + klog.V(2).InfoS("Observed service account token signing certs", "kids", v) + c.kids.Store(&v) + case <-ctx.Done(): + return + } + } + }() + wg.Wait() + + // start workers + for i := 0; i < workers; i++ { + go wait.UntilWithContext(ctx, c.runWorker, time.Second) + } + + // start observers + go func() { + for { + select { + case v := <-c.urlsC: + if len(v) == 0 { + klog.V(1).ErrorS(nil, "unable to observe at least one image registry url") + continue // controller can not do anything useful without a value, so do nothing + } + klog.V(2).InfoS("Observed image registry urls", "urls", v) + old := c.urls.Swap(&v) + if !slices.Equal(*old, v) { + c.resync() + } + case v := <-c.kidsC: + if len(v) == 0 { + klog.V(1).ErrorS(nil, "unable to observe at least one service account token signing cert") + continue // controller can not do anything useful without a value, so do nothing + } + klog.V(2).InfoS("Observed service account token signing certs", "kids", v) + old := c.kids.Swap(&v) + if !slices.Equal(*old, v) { + c.resync() + } + case <-ctx.Done(): + return + } + } + }() + + klog.InfoS("Started controller", "name", name) + <-ctx.Done() + klog.InfoS("Shutting down controller", "name", name) +} + +func (c *imagePullSecretController) runWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { + } +} + +func (c *imagePullSecretController) processNextWorkItem(ctx context.Context) bool { + key, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(key) + err, requeueAfter := c.sync(ctx, key.(string)) + if err == nil { + c.queue.Forget(key) + if requeueAfter > 0 { + c.queue.AddAfter(key, requeueAfter) + } + return true + } + runtime.HandleError(fmt.Errorf("%v failed with : %v", key, err)) + c.queue.AddRateLimited(key) + return true +} diff --git a/pkg/internalregistry/controllers/keyid_observation_controller.go b/pkg/internalregistry/controllers/keyid_observation_controller.go new file mode 100644 index 000000000..428d7ccfa --- /dev/null +++ b/pkg/internalregistry/controllers/keyid_observation_controller.go @@ -0,0 +1,196 @@ +package controllers + +import ( + "context" + "crypto" + "crypto/x509" + "encoding/base64" + "fmt" + "time" + + "golang.org/x/exp/slices" + "gopkg.in/square/go-jose.v2/jwt" + authenticationv1 "k8s.io/api/authentication/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + informers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/keyutil" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +type keyIDObservation struct { + client kubernetes.Interface + secrets listers.SecretLister + cacheSyncs []cache.InformerSynced + queue workqueue.RateLimitingInterface + ch kidsChan +} + +func NewKeyIDObservationController(kubeclient kubernetes.Interface, secrets 
informers.SecretInformer, ch kidsChan) *keyIDObservation { + c := &keyIDObservation{ + client: kubeclient, + secrets: secrets.Lister(), + ch: ch, + cacheSyncs: []cache.InformerSynced{secrets.Informer().HasSynced}, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "sa-signing-key-secrets"), + } + secrets.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj any) bool { + if secret, ok := obj.(*corev1.Secret); ok { + return (secret.Namespace == "openshift-kube-apiserver") && (secret.Name == "bound-service-account-signing-key") + } + return false + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + c.queue.Add(key) + } + }, + UpdateFunc: func(_, obj any) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + c.queue.Add(key) + } + }, + }, + }) + return c +} + +func (c *keyIDObservation) sync(ctx context.Context, key string) error { + klog.V(4).InfoS("sync", "key", key) + ns, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + secret, err := c.secrets.Secrets(ns).Get(name) + if errors.IsNotFound(err) { + return nil + } + if err != nil { + return err + } + pem, ok := secret.Data["service-account.pub"] + if !ok { + return fmt.Errorf("expected data service-account.pub not found") + } + keys, err := keyutil.ParsePublicKeysPEM(pem) + if err != nil { + return err + } + // compute key ID (e.g. hash) for token signing private keys from the public keys + var kids []string + for _, key := range keys { + der, err := x509.MarshalPKIXPublicKey(key) + if err != nil { + return err + } + hashFunc := crypto.SHA256.New() + hashFunc.Reset() + _, err = hashFunc.Write(der) + if err != nil { + return err + } + kids = append(kids, base64.RawURLEncoding.EncodeToString(hashFunc.Sum(nil))) + } + slices.Sort(kids) + select { + case c.ch <- kids: + case <-ctx.Done(): + } + return nil +} + +func (c *keyIDObservation) fallbackWorker(ctx context.Context) { + // if the signing key secret exists, return until next call of this method. + // if an error occurs trying to get the hash, retry for 1 minute before giving up until the next call of this method. 
+ err := wait.PollUntilContextTimeout(ctx, 10*time.Second, 1*time.Minute, true, func(ctx context.Context) (bool, error) { + _, err := c.secrets.Secrets("openshift-kube-apiserver").Get("bound-service-account-signing-key") + if err == nil { + // signing key secret exists, skip and let sync handle + return true, nil + } + if !errors.IsNotFound(err) { + // something went wrong, retry + runtime.HandleError(err) + return false, nil + } + // signing key secret not found, continue + + // create a throwaway API token + expirationSeconds := int64(10 * time.Minute / time.Second) + tokenRequest := &authenticationv1.TokenRequest{Spec: authenticationv1.TokenRequestSpec{Audiences: []string{"not-api"}, ExpirationSeconds: &expirationSeconds}} + tokenRequest, err = c.client.CoreV1().ServiceAccounts("default").CreateToken(ctx, "default", tokenRequest, metav1.CreateOptions{}) + if err != nil { + runtime.HandleError(fmt.Errorf("unable to create throw-away token: %w", err)) + return false, nil + } + + // parse token and extract the kids + token, err := jwt.ParseSigned(tokenRequest.Status.Token) + if err != nil { + runtime.HandleError(fmt.Errorf("unable to parse throw-away token: %w", err)) + return false, nil + } + var kids []string + for _, header := range token.Headers { + kids = append(kids, header.KeyID) + } + slices.Sort(kids) + select { + case c.ch <- kids: + case <-ctx.Done(): + } + return true, nil + }) + if err != nil { + runtime.HandleError(err) + } +} + +func (c *keyIDObservation) Run(ctx context.Context, workers int) { + defer runtime.HandleCrash() + defer c.queue.ShutDown() + const name = "openshift.io/internal-image-registry-pull-secrets_kids" + klog.InfoS("Starting controller", "name", name) + if !cache.WaitForNamedCacheSync(name, ctx.Done(), c.cacheSyncs...) 
{ + return + } + go wait.UntilWithContext(ctx, c.fallbackWorker, 1*time.Minute) + for i := 0; i < workers; i++ { + go wait.UntilWithContext(ctx, c.runWorker, time.Second) + } + klog.InfoS("Started controller", "name", name) + <-ctx.Done() + klog.InfoS("Shutting down controller", "name", name) +} + +func (c *keyIDObservation) runWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { + } +} + +func (c *keyIDObservation) processNextWorkItem(ctx context.Context) bool { + key, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(key) + err := c.sync(ctx, key.(string)) + if err == nil { + c.queue.Forget(key) + return true + } + runtime.HandleError(fmt.Errorf("%v failed with : %v", key, err)) + c.queue.AddRateLimited(key) + return true +} diff --git a/pkg/internalregistry/controllers/legacy_image_pull_secret_controller.go b/pkg/internalregistry/controllers/legacy_image_pull_secret_controller.go new file mode 100644 index 000000000..f8adb0779 --- /dev/null +++ b/pkg/internalregistry/controllers/legacy_image_pull_secret_controller.go @@ -0,0 +1,159 @@ +package controllers + +import ( + "context" + "fmt" + "time" + + "golang.org/x/exp/slices" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + applycorev1 "k8s.io/client-go/applyconfigurations/core/v1" + informers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +const legacyImagePullSecretControllerFieldManager = "openshift.io/image-registry-pull-secrets_legacy-token-secrets-controller" + +type legacyImagePullSecretController struct { + client kubernetes.Interface + secrets listers.SecretLister + cacheSyncs []cache.InformerSynced + queue workqueue.RateLimitingInterface +} + +func NewLegacyImagePullSecretController(client kubernetes.Interface, secrets informers.SecretInformer) *legacyImagePullSecretController { + c := &legacyImagePullSecretController{ + client: client, + secrets: secrets.Lister(), + cacheSyncs: []cache.InformerSynced{secrets.Informer().HasSynced}, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "legacy-image-pull-secrets"), + } + secrets.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj any) bool { + secret := obj.(*corev1.Secret) + if secret.Type != corev1.SecretTypeDockercfg { + // not an image pull secret + return false + } + if _, ok := secret.Annotations["openshift.io/token-secret.name"]; !ok { + // does not appear to be a legacy managed image pull secret + return false + } + return true + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + c.queue.Add(key) + } + }, + UpdateFunc: func(_ any, new any) { + key, err := cache.MetaNamespaceKeyFunc(new) + if err == nil { + c.queue.Add(key) + } + }, + DeleteFunc: func(obj any) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err == nil { + c.queue.Add(key) + } + }, + }, + }) + return c +} + +func (c *legacyImagePullSecretController) sync(ctx context.Context, key string) error { + klog.V(4).InfoS("sync", "key", key) + ns, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + secret, err := c.secrets.Secrets(ns).Get(name) + if errors.IsNotFound(err) 
{ + return nil + } + if err != nil { + return err + } + + if !secret.DeletionTimestamp.IsZero() { + // legacy image pull secret is being deleted, delete the corresponding legacy token secret + if slices.Contains(secret.Finalizers, "openshift.io/legacy-token") { + t := secret.Annotations["openshift.io/token-secret.name"] + if len(t) > 0 { + err := c.client.CoreV1().Secrets(ns).Delete(ctx, t, metav1.DeleteOptions{}) + if err != nil && !errors.IsNotFound(err) { + return err + } + } + // either no token secret was specified, or it was successfully deleted. clear finalizer + var finalizers []string + for _, f := range secret.Finalizers { + if f != "openshift.io/legacy-token" { + finalizers = append(finalizers, f) + } + } + patch := applycorev1.Secret(name, ns). + WithAnnotations(map[string]string{InternalRegistryAuthTokenTypeAnnotation: AuthTokenTypeLegacy}). + WithFinalizers(finalizers...) + _, err = c.client.CoreV1().Secrets(ns).Apply(ctx, patch, metav1.ApplyOptions{FieldManager: legacyTokenSecretControllerFieldManager}) + return err + } + // finalizer has already been removed, nothing to do, delete in progress + return nil + } + patch := applycorev1.Secret(name, ns). + WithAnnotations(map[string]string{InternalRegistryAuthTokenTypeAnnotation: AuthTokenTypeLegacy}). + WithFinalizers("openshift.io/legacy-token") + + _, err = c.client.CoreV1().Secrets(ns).Apply(ctx, patch, metav1.ApplyOptions{Force: true, FieldManager: legacyImagePullSecretControllerFieldManager}) + return err +} + +func (c *legacyImagePullSecretController) Run(ctx context.Context, workers int) { + defer runtime.HandleCrash() + defer c.queue.ShutDown() + const name = "openshift.io/internal-image-registry-pull-secrets_legacy-image-pull-secret" + klog.InfoS("Starting controller", "name", name) + if !cache.WaitForNamedCacheSync(name, ctx.Done(), c.cacheSyncs...) 
{ + return + } + for i := 0; i < workers; i++ { + go wait.UntilWithContext(ctx, c.runWorker, time.Second) + } + klog.InfoS("Started controller", "name", name) + <-ctx.Done() + klog.InfoS("Shutting down controller", "name", name) +} + +func (c *legacyImagePullSecretController) runWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { + } +} + +func (c *legacyImagePullSecretController) processNextWorkItem(ctx context.Context) bool { + key, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(key) + err := c.sync(ctx, key.(string)) + if err == nil { + c.queue.Forget(key) + return true + } + runtime.HandleError(fmt.Errorf("%v failed with : %v", key, err)) + c.queue.AddRateLimited(key) + return true +} diff --git a/pkg/internalregistry/controllers/legacy_token_secret_controller.go b/pkg/internalregistry/controllers/legacy_token_secret_controller.go new file mode 100644 index 000000000..2c0f28aae --- /dev/null +++ b/pkg/internalregistry/controllers/legacy_token_secret_controller.go @@ -0,0 +1,137 @@ +package controllers + +import ( + "context" + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + v1 "k8s.io/client-go/applyconfigurations/core/v1" + informers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +const legacyTokenSecretControllerFieldManager = "openshift.io/image-registry-pull-secrets_legacy-token-secrets-controller" + +type legacyTokenSecretController struct { + client kubernetes.Interface + secrets listers.SecretLister + cacheSyncs []cache.InformerSynced + queue workqueue.RateLimitingInterface +} + +func NewLegacyTokenSecretController(client kubernetes.Interface, secrets informers.SecretInformer) *legacyTokenSecretController { + c := &legacyTokenSecretController{ + client: client, + secrets: secrets.Lister(), + cacheSyncs: []cache.InformerSynced{secrets.Informer().HasSynced}, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "legacy-service-account-token-secrets"), + } + secrets.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj any) bool { + secret := obj.(*corev1.Secret) + if secret.Type != corev1.SecretTypeServiceAccountToken { + // not a service account token + return false + } + if _, ok := secret.Labels["openshift.io/legacy-token"]; ok { + // already has the needed label + return false + } + if secret.Annotations["kubernetes.io/created-by"] != "openshift.io/create-dockercfg-secrets" { + // not a secret previously managed by openshift-controller-manager + return false + } + if _, ok := secret.Annotations[corev1.ServiceAccountNameKey]; !ok { + // not expected, can't handle + return false + } + return true + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + c.queue.Add(key) + } + }, + UpdateFunc: func(_ any, new any) { + key, err := cache.MetaNamespaceKeyFunc(new) + if err == nil { + c.queue.Add(key) + } + }, + DeleteFunc: func(obj any) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err == nil { + c.queue.Add(key) + } + }, + }, + }) + return c +} + +func (c *legacyTokenSecretController) sync(ctx context.Context, key string) error { + 
klog.V(4).InfoS("secret", "sync", key) + ns, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + _, err = c.secrets.Secrets(ns).Get(name) + if errors.IsNotFound(err) { + return nil + } + if err != nil { + return err + } + patch := v1.Secret(name, ns) + patch.WithLabels(map[string]string{"openshift.io/legacy-token": "true"}) + _, err = c.client.CoreV1().Secrets(ns).Apply(ctx, patch, metav1.ApplyOptions{Force: true, FieldManager: legacyTokenSecretControllerFieldManager}) + return err +} + +func (c *legacyTokenSecretController) Run(ctx context.Context, workers int) { + defer runtime.HandleCrash() + defer c.queue.ShutDown() + const name = "openshift.io/internal-image-registry-pull-secrets_legacy-token-secret" + klog.InfoS("Starting controller", "name", name) + if !cache.WaitForNamedCacheSync(name, ctx.Done(), c.cacheSyncs...) { + return + } + for i := 0; i < workers; i++ { + go wait.UntilWithContext(ctx, c.runWorker, time.Second) + } + klog.InfoS("Started controller", "name", name) + <-ctx.Done() + klog.InfoS("Shutting down controller", "name", name) +} + +func (c *legacyTokenSecretController) runWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { + } +} + +func (c *legacyTokenSecretController) processNextWorkItem(ctx context.Context) bool { + key, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(key) + err := c.sync(ctx, key.(string)) + if err == nil { + c.queue.Forget(key) + return true + } + runtime.HandleError(fmt.Errorf("%v failed with : %v", key, err)) + c.queue.AddRateLimited(key) + return true +} diff --git a/pkg/internalregistry/controllers/registry_urls_observation_controller.go b/pkg/internalregistry/controllers/registry_urls_observation_controller.go new file mode 100644 index 000000000..28ee4cb97 --- /dev/null +++ b/pkg/internalregistry/controllers/registry_urls_observation_controller.go @@ -0,0 +1,167 @@ +package controllers + +import ( + "context" + "fmt" + "net" + "time" + + "golang.org/x/exp/slices" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + informers "k8s.io/client-go/informers/core/v1" + listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +const clusterDNSSuffix = "cluster.local" + +type serviceLocation struct { + namespace string + name string +} + +var serviceLocations = []serviceLocation{ + {namespace: "default", name: "docker-registry"}, + {namespace: "openshift-image-registry", name: "image-registry"}, +} + +type registryURLObservation struct { + services listers.ServiceLister + additionalRegistryURLs []string + cacheSyncs []cache.InformerSynced + queue workqueue.RateLimitingInterface + ch urlsChan +} + +func NewRegistryURLObservationController(services informers.ServiceInformer, additionalRegistryURLs []string, ch urlsChan) *registryURLObservation { + c := ®istryURLObservation{ + services: services.Lister(), + additionalRegistryURLs: additionalRegistryURLs, + ch: ch, + cacheSyncs: []cache.InformerSynced{services.Informer().HasSynced}, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "internal-registry-services"), + } + services.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj any) bool { + service := obj.(*corev1.Service) + for _, l := range serviceLocations { + if l.name == service.Name && l.namespace == service.Namespace { + return true + } + } + return false + }, + Handler: 
cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + c.queue.Add(key) + } + }, + UpdateFunc: func(_, obj any) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + c.queue.Add(key) + } + }, + }, + }) + return c +} + +func (c *registryURLObservation) sync(ctx context.Context, key string) error { + klog.V(4).InfoS("sync", "key", key) + // urlsForInternalRegistry returns the dns form and the ip form of the service + urls := append([]string{}, c.additionalRegistryURLs...) + for _, location := range serviceLocations { + urls = append(urls, urlsForInternalRegistryService(c.services, location)...) + } + slices.Sort(urls) + select { + case c.ch <- urls: + case <-ctx.Done(): + } + return nil +} + +func urlsForInternalRegistryService(services listers.ServiceLister, location serviceLocation) []string { + service, err := services.Services(location.namespace).Get(location.name) + if err != nil { + return []string{} + } + + ip := net.ParseIP(service.Spec.ClusterIP) + if ip == nil { + return []string{} + } + + if len(service.Spec.Ports) == 0 { + return []string{} + } + + svcPort := service.Spec.Ports[0].Port + ret := []string{ + net.JoinHostPort(fmt.Sprintf("%s.%s.svc", service.Name, service.Namespace), fmt.Sprintf("%d", svcPort)), + } + + // Bug 1780376: add ClusterIP as a location if service supports IPv4 + // IPv6 addresses are not valid locations in an image pull spec + ipv4 := ip.To4() + if ipv4 != nil { + ret = append(ret, net.JoinHostPort(ipv4.String(), fmt.Sprintf("%d", svcPort))) + } + // Bug 1701422: if using HTTP/S default ports, add locations without the port number + if svcPort == 80 || svcPort == 443 { + ret = append(ret, fmt.Sprintf("%s.%s.svc", service.Name, service.Namespace)) + if ipv4 != nil { + ret = append(ret, ipv4.String()) + } + } + ret = append(ret, net.JoinHostPort(fmt.Sprintf("%s.%s.svc."+clusterDNSSuffix, service.Name, service.Namespace), fmt.Sprintf("%d", svcPort))) + // Bug 1701422: if using HTTP/S default ports, add locations without the port number + if svcPort == 80 || svcPort == 443 { + ret = append(ret, fmt.Sprintf("%s.%s.svc."+clusterDNSSuffix, service.Name, service.Namespace)) + } + return ret +} + +func (c *registryURLObservation) Run(ctx context.Context, workers int) { + defer runtime.HandleCrash() + defer c.queue.ShutDown() + const name = "openshift.io/internal-image-registry-pull-secrets_urls" + klog.InfoS("Starting controller", "name", name) + if !cache.WaitForNamedCacheSync(name, ctx.Done(), c.cacheSyncs...) 
{ + return + } + for i := 0; i < workers; i++ { + go wait.UntilWithContext(ctx, c.runWorker, time.Second) + } + klog.InfoS("Started controller", "name", name) + <-ctx.Done() + klog.InfoS("Shutting down controller", "name", name) +} + +func (c *registryURLObservation) runWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { + } +} + +func (c *registryURLObservation) processNextWorkItem(ctx context.Context) bool { + key, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(key) + err := c.sync(ctx, key.(string)) + if err == nil { + c.queue.Forget(key) + return true + } + runtime.HandleError(fmt.Errorf("%v failed with : %v", key, err)) + c.queue.AddRateLimited(key) + return true +} diff --git a/pkg/internalregistry/controllers/service_account_controller.go b/pkg/internalregistry/controllers/service_account_controller.go new file mode 100644 index 000000000..1e0ecae7f --- /dev/null +++ b/pkg/internalregistry/controllers/service_account_controller.go @@ -0,0 +1,354 @@ +package controllers + +import ( + "context" + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/storage/names" + applycorev1 "k8s.io/client-go/applyconfigurations/core/v1" + applymetav1 "k8s.io/client-go/applyconfigurations/meta/v1" + informers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + "github.com/openshift/library-go/pkg/build/naming" + "github.com/openshift/openshift-controller-manager/pkg/serviceaccounts/controllers" +) + +type serviceAccountController struct { + client kubernetes.Interface + serviceAccounts listers.ServiceAccountLister + secrets listers.SecretLister + cacheSyncs []cache.InformerSynced + queue workqueue.RateLimitingInterface +} + +func serviceAccountNameForManagedSecret(secret *corev1.Secret) string { + n := secret.Annotations[InternalRegistryAuthTokenServiceAccountAnnotation] + if len(n) > 0 { + return n + } + // legacy fallback + return secret.Annotations[corev1.ServiceAccountNameKey] +} + +// NewServiceAccountController creates a controller that for each service +// account in the cluster, creates an image pull secret that can be used +// to pull images from the integrated image registry. 
+func NewServiceAccountController(kubeclient kubernetes.Interface, serviceAccounts informers.ServiceAccountInformer, secrets informers.SecretInformer) *serviceAccountController { + c := &serviceAccountController{ + client: kubeclient, + serviceAccounts: serviceAccounts.Lister(), + secrets: secrets.Lister(), + cacheSyncs: []cache.InformerSynced{serviceAccounts.Informer().HasSynced, secrets.Informer().HasSynced}, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "service-accounts"), + } + + serviceAccounts.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + c.queue.Add(key) + } + }, + UpdateFunc: func(old any, new any) { + key, err := cache.MetaNamespaceKeyFunc(new) + if err == nil { + c.queue.Add(key) + } + }, + DeleteFunc: func(obj any) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err == nil { + c.queue.Add(key) + } + }, + }) + + secrets.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj any) bool { + secret, ok := obj.(*corev1.Secret) + return ok && secret.Type == corev1.SecretTypeDockercfg + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + secret := obj.(*corev1.Secret) + serviceAccountName := serviceAccountNameForManagedSecret(secret) + if len(serviceAccountName) > 0 { + key := cache.NewObjectName(secret.Namespace, serviceAccountName).String() + c.queue.Add(key) + } + }, + UpdateFunc: func(old any, new any) { + secret := old.(*corev1.Secret) + serviceAccountName := serviceAccountNameForManagedSecret(secret) + if len(serviceAccountName) > 0 { + key := cache.NewObjectName(secret.Namespace, serviceAccountName).String() + c.queue.Add(key) + } + }, + DeleteFunc: func(obj any) { + var secret *corev1.Secret + switch o := obj.(type) { + case cache.DeletedFinalStateUnknown: + var ok bool + if secret, ok = o.Obj.(*corev1.Secret); !ok { + return + } + case *corev1.Secret: + secret = o + } + serviceAccountName := serviceAccountNameForManagedSecret(secret) + if len(serviceAccountName) > 0 { + key := cache.NewObjectName(secret.Namespace, serviceAccountName).String() + c.queue.Add(key) + } + }, + }, + }) + return c +} + +const serviceAccountControllerFieldManager = "openshift.io/image-registry-pull-secrets_service-account-controller" + +func (c *serviceAccountController) sync(ctx context.Context, key string) error { + klog.V(4).InfoS("sync", "key", key) + ns, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + serviceAccount, err := c.serviceAccounts.ServiceAccounts(ns).Get(name) + if kerrors.IsNotFound(err) { + return nil + } + if err != nil { + return err + } + + // name of managed secret + secretName, err := c.managedImagePullSecretName(ctx, serviceAccount) + + // ensure secret ref annotation is set + if serviceAccount.Annotations[InternalRegistryImagePullSecretRefKey] != secretName { + patch, err := applycorev1.ExtractServiceAccount(serviceAccount, serviceAccountControllerFieldManager) + if err != nil { + return err + } + patch.WithAnnotations(map[string]string{InternalRegistryImagePullSecretRefKey: secretName}) + // we apply this now to ensure the secret name stays stable in case a error occurs while reconciling + serviceAccount, err = c.client.CoreV1().ServiceAccounts(ns).Apply(ctx, patch, metav1.ApplyOptions{Force: true, FieldManager: serviceAccountControllerFieldManager}) + if err != nil { + return err + } + } + + // get the managed 
image pull secret
+	secret, err := c.secrets.Secrets(serviceAccount.Namespace).Get(secretName)
+	if err != nil && !kerrors.IsNotFound(err) {
+		return err
+	}
+
+	// nothing more to do if the managed secret is a legacy image pull secret
+	if secret != nil && isLegacyImagePullSecret(secret) {
+		return nil
+	}
+
+	// if the secret does not exist, or its owner reference is missing, apply the secret
+	var secretOwnerRefNeedsUpdate, secretSARefNeedsUpdate bool
+	if secret != nil {
+		secretSARefNeedsUpdate = secret.Annotations[InternalRegistryAuthTokenServiceAccountAnnotation] != serviceAccount.Name
+		secretOwnerRefNeedsUpdate = true
+		for _, ref := range secret.OwnerReferences {
+			if ref.Name == serviceAccount.Name && ref.UID == serviceAccount.UID && ref.Kind == "ServiceAccount" && ref.APIVersion == "v1" {
+				secretOwnerRefNeedsUpdate = false
+				break
+			}
+		}
+	}
+	if secret == nil || secretSARefNeedsUpdate || secretOwnerRefNeedsUpdate {
+		patch := applycorev1.Secret(secretName, ns).
+			WithAnnotations(map[string]string{
+				InternalRegistryAuthTokenServiceAccountAnnotation: serviceAccount.Name,
+			}).
+			WithOwnerReferences(
+				applymetav1.OwnerReference().
+					WithAPIVersion("v1").
+					WithKind("ServiceAccount").
+					WithName(serviceAccount.Name).
+					WithUID(serviceAccount.UID),
+			).
+			WithType(corev1.SecretTypeDockercfg).
+			WithData(map[string][]byte{corev1.DockerConfigKey: []byte("{}")})
+		if secret != nil {
+			patch.WithData(map[string][]byte{corev1.DockerConfigKey: secret.Data[corev1.DockerConfigKey]})
+		}
+		secret, err = c.client.CoreV1().Secrets(serviceAccount.Namespace).Apply(ctx, patch, metav1.ApplyOptions{FieldManager: serviceAccountControllerFieldManager, Force: true})
+		if err != nil {
+			return fmt.Errorf("unable to update managed image pull secret: %v", err)
+		}
+	}
+
+	// nothing else to do if we are not dealing with a bound image pull secret
+	if secret.Annotations[InternalRegistryAuthTokenTypeAnnotation] != AuthTokenTypeBound {
+		return nil
+	}
+
+	// new patch
+	patch := applycorev1.ServiceAccount(name, ns)
+
+	// don't leave out the annotation
+	patch.WithAnnotations(map[string]string{InternalRegistryImagePullSecretRefKey: secretName})
+
+	// ensure managed image pull secret is referenced, only if there is data
+	if len(secret.Data[corev1.DockerConfigKey]) > len([]byte("{}")) {
+		patch.WithImagePullSecrets(applycorev1.LocalObjectReference().WithName(secretName))
+		// TODO remove the following line as part of API-1798
+		patch.WithSecrets(applycorev1.ObjectReference().WithName(secretName))
+	}
+	serviceAccount, err = c.client.CoreV1().ServiceAccounts(ns).Apply(ctx, patch, metav1.ApplyOptions{Force: true, FieldManager: serviceAccountControllerFieldManager})
+	if err != nil {
+		return err
+	}
+
+	// TODO add the commented out code as part of API-1798
+	/*
+		// TODO haven't figured out how to remove the secret reference using Apply
+		if slices.ContainsFunc(serviceAccount.Secrets, func(ref corev1.ObjectReference) bool { return ref.Name == secretName }) {
+			sa := serviceAccount.DeepCopy()
+			var a []corev1.ObjectReference
+			for _, ref := range serviceAccount.Secrets {
+				if ref.Name != secretName {
+					a = append(a, ref)
+				}
+			}
+			sa.Secrets = a
+			_, err = c.client.CoreV1().ServiceAccounts(ns).Update(ctx, sa, metav1.UpdateOptions{FieldManager: serviceAccountControllerFieldManager})
+		}
+	*/
+	return err
+}
+
+func ownerReferenceDeSynced(serviceAccount *corev1.ServiceAccount, secret *corev1.Secret) bool {
+	for _, ref := range secret.OwnerReferences {
+		if ref.Name == serviceAccount.Name && ref.UID == serviceAccount.UID && ref.Kind == "ServiceAccount"
&& ref.APIVersion == "v1" { + return true + } + } + return false +} + +func (c *serviceAccountController) managedImagePullSecretName(ctx context.Context, serviceAccount *corev1.ServiceAccount) (string, error) { + // happy path + name := serviceAccount.Annotations[InternalRegistryImagePullSecretRefKey] + if len(name) != 0 { + return name, nil + } + // try to reuse the legacy image pull secret name. + name, err := c.legacyImagePullSecretName(ctx, serviceAccount) + if err != nil { + return "", err + } + if len(name) > 0 { + return name, nil + } + // no existing name found, generate one + name = names.SimpleNameGenerator.GenerateName(naming.GetName(serviceAccount.Name, "dockercfg-", 58)) + return name, nil +} + +func (c *serviceAccountController) legacyImagePullSecretName(ctx context.Context, serviceAccount *corev1.ServiceAccount) (string, error) { + // find a legacy image pull secret in the same namespace + for _, ref := range serviceAccount.ImagePullSecrets { + secret, err := c.secrets.Secrets(serviceAccount.Namespace).Get(ref.Name) + if kerrors.IsNotFound(err) { + // reference image pull secret does not exist, ignore + continue + } + if err != nil { + return "", err + } + if isLegacyImagePullSecretForServiceAccount(secret, serviceAccount) { + // return the first one found + klog.V(1).InfoS("found legacy managed image pull secret", "ns", serviceAccount.Namespace, "serviceAccount", serviceAccount.Name, "secret", secret.Name) + return secret.Name, nil + } + } + return "", nil +} + +var expectedLegacyAnnotations = map[string]func(*corev1.ServiceAccount, string) bool{ + corev1.ServiceAccountNameKey: func(sa *corev1.ServiceAccount, v string) bool { return sa.Name == v }, + corev1.ServiceAccountUIDKey: func(sa *corev1.ServiceAccount, v string) bool { return sa.UID == types.UID(v) }, + controllers.ServiceAccountTokenSecretNameKey: func(sa *corev1.ServiceAccount, v string) bool { return true }, + controllers.ServiceAccountTokenValueAnnotation: func(sa *corev1.ServiceAccount, v string) bool { return true }, +} + +func isLegacyImagePullSecret(secret *corev1.Secret) bool { + for key := range expectedLegacyAnnotations { + if _, ok := secret.Annotations[key]; !ok { + return false + } + } + return true +} + +func isLegacyImagePullSecretForServiceAccount(secret *corev1.Secret, serviceAccount *corev1.ServiceAccount) bool { + for key, valueOK := range expectedLegacyAnnotations { + value, ok := secret.Annotations[key] + if !ok { + return false + } + if !valueOK(serviceAccount, value) { + return false + } + } + return true +} + +func (c *serviceAccountController) Run(ctx context.Context, workers int) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + const name = "openshift.io/internal-image-registry-pull-secrets_service-account" + klog.InfoS("Starting controller", "name", name) + if !cache.WaitForNamedCacheSync(name, ctx.Done(), c.cacheSyncs...) { + return + } + for i := 0; i < workers; i++ { + go wait.UntilWithContext(ctx, c.runWorker, time.Second) + } + klog.InfoS("Started controller", "name", name) + <-ctx.Done() + klog.InfoS("Shutting down controller", "name", name) +} + +func (c *serviceAccountController) runWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { + } +} + +// processNextWorkItem deals with one key off the queue. It returns false +// when it's time to quit. 
+func (c *serviceAccountController) processNextWorkItem(ctx context.Context) bool { + key, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(key) + err := c.sync(ctx, key.(string)) + if err == nil { + c.queue.Forget(key) + return true + } + utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err)) + c.queue.AddRateLimited(key) + return true +}
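
Reviewer note (illustrative sketch, not part of the patch): the standalone program below mirrors the unexported dockerConfig helper from image_pull_secret_controller.go to show the payload written under the ".dockercfg" key of each managed pull secret. The token value and registry URLs are made-up examples, not values produced by the controller.

package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

func main() {
	token := "example.bound.token" // hypothetical bound service account token
	urls := []string{
		"image-registry.openshift-image-registry.svc:5000",
		"image-registry.openshift-image-registry.svc.cluster.local:5000",
	}
	// mirror dockerConfig: one identical {"auth": base64(":<token>")} entry per observed registry URL
	entry := map[string]string{"auth": base64.StdEncoding.EncodeToString([]byte(":" + token))}
	auth := map[string]map[string]string{}
	for _, u := range urls {
		auth[u] = entry
	}
	out, _ := json.MarshalIndent(auth, "", "  ")
	fmt.Println(string(out)) // JSON of this shape is stored under the ".dockercfg" key of the secret
}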