Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 48 additions & 7 deletions pkg/operator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/sirupsen/logrus"

corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metaapi "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -49,10 +50,14 @@ type Controller struct {
isWorkqueue workqueue.RateLimitingInterface
tWorkqueue workqueue.RateLimitingInterface

ocSecWorkqueue workqueue.RateLimitingInterface

crInformer cache.SharedIndexInformer
isInformer cache.SharedIndexInformer
tInformer cache.SharedIndexInformer

ocSecInformer cache.SharedIndexInformer

kubeOCNSInformerFactory kubeinformers.SharedInformerFactory
imageInformerFactory imageinformers.SharedInformerFactory
templateInformerFactory templateinformers.SharedInformerFactory
Expand All @@ -75,12 +80,13 @@ func NewController() (*Controller, error) {

listers := &sampopclient.Listers{}
c := &Controller{
restconfig: kubeconfig,
cvowrapper: operatorstatus.NewClusterOperatorHandler(operatorClient),
crWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "samplesconfig-changes"),
isWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "imagestream-changes"),
tWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "template-changes"),
listers: listers,
restconfig: kubeconfig,
cvowrapper: operatorstatus.NewClusterOperatorHandler(operatorClient),
crWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "samplesconfig-changes"),
isWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "imagestream-changes"),
tWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "template-changes"),
ocSecWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "openshift-config-namespace-secret-changes"),
listers: listers,
}

// Initial event to bootstrap CR if it doesn't exist.
Expand Down Expand Up @@ -114,6 +120,16 @@ func NewController() (*Controller, error) {
c.templateInformerFactory = templateinformers.NewSharedInformerFactoryWithOptions(templateClient, defaultResyncDuration, templateinformers.WithNamespace("openshift"))
c.sampopInformerFactory = sampopinformers.NewSharedInformerFactory(sampopClient, defaultResyncDuration)

// A note on the fact we are listening on secrets in the openshift-config namespace, even though we no longer
// copy that secret to the openshift namespace for imagestream import
// 1) we still inspect that secret to make sure it has credentials for registry.redhat.io, unless the samples
// registry is overriden. If those credentials don't exist, the imagestream imports will fail. We capture these
// results in prometheus metrics/alerts.
// 2) we employ the lister/sharedinformer/workqueue controller apparatus to get cached versions of the data, so
// we do not have to hit the API server everytime somebody queries our prometheus stuff
// 3) however, you have to go all the way with this, including to workqueue, to get the underlying watches so the
// cache is at least initially populated
c.ocSecInformer = c.kubeOCNSInformerFactory.Core().V1().Secrets().Informer()
c.listers.ConfigNamespaceSecrets = c.kubeOCNSInformerFactory.Core().V1().Secrets().Lister().Secrets("openshift-config")

c.isInformer = c.imageInformerFactory.Image().V1().ImageStreams().Informer()
Expand Down Expand Up @@ -141,13 +157,15 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {
defer c.crWorkqueue.ShutDown()
defer c.isWorkqueue.ShutDown()
defer c.tWorkqueue.ShutDown()
defer c.ocSecWorkqueue.ShutDown()

c.imageInformerFactory.Start(stopCh)
c.templateInformerFactory.Start(stopCh)
c.sampopInformerFactory.Start(stopCh)
c.kubeOCNSInformerFactory.Start(stopCh)

logrus.Println("waiting for informer caches to sync")
if !cache.WaitForCacheSync(stopCh, c.isInformer.HasSynced, c.tInformer.HasSynced, c.crInformer.HasSynced) {
if !cache.WaitForCacheSync(stopCh, c.isInformer.HasSynced, c.tInformer.HasSynced, c.crInformer.HasSynced, c.ocSecInformer.HasSynced) {
return fmt.Errorf("failed to wait for caches to sync")
}

Expand All @@ -173,6 +191,12 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {
for i := 0; i < 5; i++ {
go wait.Until(tQueueWorker.workqueueProcessor, time.Second, stopCh)
}
ocSecQueueWorker := queueWorker{
c: c,
workQueue: c.ocSecWorkqueue,
getter: &ocSecretGetter{},
}
go wait.Until(ocSecQueueWorker.workqueueProcessor, time.Second, stopCh)

logrus.Println("started events processor")
<-stopCh
Expand All @@ -193,6 +217,12 @@ func (g *crGetter) Get(c *Controller, key string) (runtime.Object, error) {
return c.listers.Config.Get(sampopapi.ConfigName)
}

type ocSecretGetter struct{}

func (g *ocSecretGetter) Get(c *Controller, key string) (runtime.Object, error) {
return c.listers.ConfigNamespaceSecrets.Get(key)
}

type isGetter struct{}

func (g *isGetter) Get(c *Controller, key string) (runtime.Object, error) {
Expand Down Expand Up @@ -255,6 +285,13 @@ func (c *crQueueKeyGen) Key(o interface{}) string {
return cr.Name
}

type secretQueueKeyGen struct{}

func (c *secretQueueKeyGen) Key(o interface{}) string {
secret := o.(*corev1.Secret)
return secret.Name
}

type imagestreamQueueKeyGen struct{}

func (c *imagestreamQueueKeyGen) Key(o interface{}) string {
Expand Down Expand Up @@ -365,6 +402,10 @@ func (c *Controller) crInformerEventHandler() cache.ResourceEventHandlerFuncs {
return c.commonInformerEventHandler(&crQueueKeyGen{}, c.crWorkqueue)
}

func (c *Controller) ocSecretInformerEventHandler() cache.ResourceEventHandlerFuncs {
return c.commonInformerEventHandler(&secretQueueKeyGen{}, c.ocSecWorkqueue)
}

func (c *Controller) imagestreamInformerEventHandler() cache.ResourceEventHandlerFuncs {
return c.commonInformerEventHandler(&imagestreamQueueKeyGen{}, c.isWorkqueue)
}
Expand Down