diff --git a/pkg/operator/controller.go b/pkg/operator/controller.go index f2e3e2acb..6def96b61 100644 --- a/pkg/operator/controller.go +++ b/pkg/operator/controller.go @@ -7,6 +7,7 @@ import ( "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" metaapi "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -49,10 +50,14 @@ type Controller struct { isWorkqueue workqueue.RateLimitingInterface tWorkqueue workqueue.RateLimitingInterface + ocSecWorkqueue workqueue.RateLimitingInterface + crInformer cache.SharedIndexInformer isInformer cache.SharedIndexInformer tInformer cache.SharedIndexInformer + ocSecInformer cache.SharedIndexInformer + kubeOCNSInformerFactory kubeinformers.SharedInformerFactory imageInformerFactory imageinformers.SharedInformerFactory templateInformerFactory templateinformers.SharedInformerFactory @@ -75,12 +80,13 @@ func NewController() (*Controller, error) { listers := &sampopclient.Listers{} c := &Controller{ - restconfig: kubeconfig, - cvowrapper: operatorstatus.NewClusterOperatorHandler(operatorClient), - crWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "samplesconfig-changes"), - isWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "imagestream-changes"), - tWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "template-changes"), - listers: listers, + restconfig: kubeconfig, + cvowrapper: operatorstatus.NewClusterOperatorHandler(operatorClient), + crWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "samplesconfig-changes"), + isWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "imagestream-changes"), + tWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "template-changes"), + ocSecWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "openshift-config-namespace-secret-changes"), + listers: listers, } // Initial event to bootstrap CR if it doesn't exist. @@ -114,6 +120,16 @@ func NewController() (*Controller, error) { c.templateInformerFactory = templateinformers.NewSharedInformerFactoryWithOptions(templateClient, defaultResyncDuration, templateinformers.WithNamespace("openshift")) c.sampopInformerFactory = sampopinformers.NewSharedInformerFactory(sampopClient, defaultResyncDuration) + // A note on the fact we are listening on secrets in the openshift-config namespace, even though we no longer + // copy that secret to the openshift namespace for imagestream import + // 1) we still inspect that secret to make sure it has credentials for registry.redhat.io, unless the samples + // registry is overriden. If those credentials don't exist, the imagestream imports will fail. We capture these + // results in prometheus metrics/alerts. + // 2) we employ the lister/sharedinformer/workqueue controller apparatus to get cached versions of the data, so + // we do not have to hit the API server everytime somebody queries our prometheus stuff + // 3) however, you have to go all the way with this, including to workqueue, to get the underlying watches so the + // cache is at least initially populated + c.ocSecInformer = c.kubeOCNSInformerFactory.Core().V1().Secrets().Informer() c.listers.ConfigNamespaceSecrets = c.kubeOCNSInformerFactory.Core().V1().Secrets().Lister().Secrets("openshift-config") c.isInformer = c.imageInformerFactory.Image().V1().ImageStreams().Informer() @@ -141,13 +157,15 @@ func (c *Controller) Run(stopCh <-chan struct{}) error { defer c.crWorkqueue.ShutDown() defer c.isWorkqueue.ShutDown() defer c.tWorkqueue.ShutDown() + defer c.ocSecWorkqueue.ShutDown() c.imageInformerFactory.Start(stopCh) c.templateInformerFactory.Start(stopCh) c.sampopInformerFactory.Start(stopCh) + c.kubeOCNSInformerFactory.Start(stopCh) logrus.Println("waiting for informer caches to sync") - if !cache.WaitForCacheSync(stopCh, c.isInformer.HasSynced, c.tInformer.HasSynced, c.crInformer.HasSynced) { + if !cache.WaitForCacheSync(stopCh, c.isInformer.HasSynced, c.tInformer.HasSynced, c.crInformer.HasSynced, c.ocSecInformer.HasSynced) { return fmt.Errorf("failed to wait for caches to sync") } @@ -173,6 +191,12 @@ func (c *Controller) Run(stopCh <-chan struct{}) error { for i := 0; i < 5; i++ { go wait.Until(tQueueWorker.workqueueProcessor, time.Second, stopCh) } + ocSecQueueWorker := queueWorker{ + c: c, + workQueue: c.ocSecWorkqueue, + getter: &ocSecretGetter{}, + } + go wait.Until(ocSecQueueWorker.workqueueProcessor, time.Second, stopCh) logrus.Println("started events processor") <-stopCh @@ -193,6 +217,12 @@ func (g *crGetter) Get(c *Controller, key string) (runtime.Object, error) { return c.listers.Config.Get(sampopapi.ConfigName) } +type ocSecretGetter struct{} + +func (g *ocSecretGetter) Get(c *Controller, key string) (runtime.Object, error) { + return c.listers.ConfigNamespaceSecrets.Get(key) +} + type isGetter struct{} func (g *isGetter) Get(c *Controller, key string) (runtime.Object, error) { @@ -255,6 +285,13 @@ func (c *crQueueKeyGen) Key(o interface{}) string { return cr.Name } +type secretQueueKeyGen struct{} + +func (c *secretQueueKeyGen) Key(o interface{}) string { + secret := o.(*corev1.Secret) + return secret.Name +} + type imagestreamQueueKeyGen struct{} func (c *imagestreamQueueKeyGen) Key(o interface{}) string { @@ -365,6 +402,10 @@ func (c *Controller) crInformerEventHandler() cache.ResourceEventHandlerFuncs { return c.commonInformerEventHandler(&crQueueKeyGen{}, c.crWorkqueue) } +func (c *Controller) ocSecretInformerEventHandler() cache.ResourceEventHandlerFuncs { + return c.commonInformerEventHandler(&secretQueueKeyGen{}, c.ocSecWorkqueue) +} + func (c *Controller) imagestreamInformerEventHandler() cache.ResourceEventHandlerFuncs { return c.commonInformerEventHandler(&imagestreamQueueKeyGen{}, c.isWorkqueue) }