diff --git a/pkg/route/secret/manager.go b/pkg/route/secret/manager.go new file mode 100644 index 0000000000..ecd726f099 --- /dev/null +++ b/pkg/route/secret/manager.go @@ -0,0 +1,135 @@ +package secret + +import ( + "fmt" + "sync" + "time" + + routev1 "github.com/openshift/api/route/v1" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +// this looks somewhat confused, I'm not sure this layer is necessary. Given a SecretMonitor and a controller +// that observes Routes, it seems as though you'd simply wire handles to trigger evaluation for the route in question +// and eventhandler for that controller would do the diff/deletion on its own. +// If you have a sample controller I can help there +type Monitor interface { + // Get secret by secret namespace and name. + GetSecret(namespace, name string) (*v1.Secret, error) + + // WARNING: Register/UnregisterRoute functions should be efficient, + // i.e. should not block on network operations. + + // RegisterRoute registers all secrets from a given Route. + RegisterRoute(*routev1.Route, func(*routev1.Route) sets.String) + + // UnregisterRoute unregisters secrets from a given Route that are not + // used by any other registered Route. + UnregisterRoute(*routev1.Route, func(*routev1.Route) sets.String) +} + +// SecretMonitor keeps a store with secrets necessary +// for registered routes. +type Manager struct { + monitor SecretMonitor + registeredHandlers map[string]SecretEventHandlerRegistration + + lock sync.RWMutex + + stopCh <-chan struct{} + + // monitors are the producer of the resourceChanges queue + resourceChanges workqueue.RateLimitingInterface + + secretHandler cache.ResourceEventHandlerFuncs +} + +func NewRouteManager(kubeClient *kubernetes.Clientset, queue workqueue.RateLimitingInterface) *Manager { + return &Manager{ + monitor: NewSecretMonitor(kubeClient), + lock: sync.RWMutex{}, + stopCh: make(<-chan struct{}), + resourceChanges: queue, + registeredHandlers: make(map[string]SecretEventHandlerRegistration), + + // default secret handler + secretHandler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) {}, + UpdateFunc: func(oldObj, newObj interface{}) {}, + DeleteFunc: func(obj interface{}) {}, + }, + } +} + +func (m *Manager) WithSecretHandler(handler cache.ResourceEventHandlerFuncs) *Manager { + m.secretHandler = handler + return m +} + +func (m *Manager) GetSecret(parent *routev1.Route, namespace, name string) (*v1.Secret, error) { + key := fmt.Sprintf("%s/%s", parent.Namespace, parent.Name) + gr := appsv1.Resource("secret") + + m.lock.RLock() + handle, exists := m.registeredHandlers[key] + m.lock.RUnlock() + + if !exists { + return nil, fmt.Errorf("object %q/%q not registered", namespace, name) + } + + if err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (done bool, err error) { return handle.HasSynced(), nil }); err != nil { + return nil, fmt.Errorf("failed to sync %s cache: %v", gr.String(), err) + } + + obj, err := m.monitor.GetSecret(handle) + if err != nil { + return nil, err + } + + return obj, nil +} + +func (m *Manager) RegisterRoute(parent *routev1.Route, getReferencedObjects func(*routev1.Route) sets.String) { + // TODO refactor later + // names := getReferencedObjects(parent) + + m.lock.Lock() + defer m.lock.Unlock() + + // TODO iterate refererenced objects if we have 1-many mappings between route and secrets + // TODO hard coded to test since externalCertificate is TP + handle, err := m.monitor.AddEventHandler(parent.Namespace, fmt.Sprintf("%s_%s", parent.Name, "dummy-secret"), m.secretHandler) + if err != nil { + // TODO handle errors, sig change + } + + key := fmt.Sprintf("%s/%s", parent.Namespace, parent.Name) + m.registeredHandlers[key] = handle + +} + +func (m *Manager) UnregisterRoute(parent *routev1.Route, getReferencedObjects func(*routev1.Route) sets.String) { + key := fmt.Sprintf("%s/%s", parent.Namespace, parent.Name) + + m.lock.Lock() + defer m.lock.Unlock() + + handle, ok := m.registeredHandlers[key] + if !ok { + // TODO handle errors, sig change + } + + err := m.monitor.RemoveEventHandler(handle) + if err != nil { + // TODO handle errors, sig change + } + + delete(m.registeredHandlers, key) +} diff --git a/pkg/route/secret/monitor.go b/pkg/route/secret/monitor.go new file mode 100644 index 0000000000..e9e523da81 --- /dev/null +++ b/pkg/route/secret/monitor.go @@ -0,0 +1,76 @@ +package secret + +import ( + "fmt" + "sync" + "sync/atomic" + + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +type ObjectKey struct { + Namespace string + Name string +} + +type singleItemMonitor struct { + key ObjectKey + informer cache.SharedInformer + + lock sync.Mutex + numHandlers atomic.Int32 + stopped bool + stopCh chan struct{} +} + +func newSingleItemMonitor(key ObjectKey, informer cache.SharedInformer) *singleItemMonitor { + return &singleItemMonitor{ + key: key, + informer: informer, + stopCh: make(chan struct{}), + } +} + +func (i *singleItemMonitor) Stop() bool { + i.lock.Lock() + defer i.lock.Unlock() + if i.stopped { + return false + } + i.stopped = true + close(i.stopCh) + return true +} + +func (i *singleItemMonitor) StartInformer() { + i.lock.Lock() + defer i.lock.Unlock() + klog.Info("starting informer") + i.informer.Run(i.stopCh) +} + +func (i *singleItemMonitor) AddEventHandler(handler cache.ResourceEventHandler) (SecretEventHandlerRegistration, error) { + registration, err := i.informer.AddEventHandler(handler) + if err != nil { + return nil, err + } + i.numHandlers.Add(1) + + return &secretEventHandlerRegistration{ + ResourceEventHandlerRegistration: registration, + objectKey: i.key, + }, nil +} + +func (i *singleItemMonitor) RemoveEventHandler(handle SecretEventHandlerRegistration) error { + if err := i.informer.RemoveEventHandler(handle); err != nil { + return err + } + i.numHandlers.Add(-1) + return nil +} + +func (i *singleItemMonitor) GetItem() (item interface{}, exists bool, err error) { + return i.informer.GetStore().Get(fmt.Sprintf("%s/%s", i.key.Namespace, i.key.Name)) +} diff --git a/pkg/route/secret/secret_monitor.go b/pkg/route/secret/secret_monitor.go new file mode 100644 index 0000000000..a8deb355e1 --- /dev/null +++ b/pkg/route/secret/secret_monitor.go @@ -0,0 +1,138 @@ +package secret + +import ( + "context" + "fmt" + "sync" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +type SecretEventHandlerRegistration interface { + cache.ResourceEventHandlerRegistration + + GetKey() ObjectKey +} + +type SecretMonitor interface { + AddEventHandler(namespace, name string, handler cache.ResourceEventHandler) (SecretEventHandlerRegistration, error) + + RemoveEventHandler(SecretEventHandlerRegistration) error + + GetSecret(namespace, name string) (*corev1.Secret, error) +} + +type secretEventHandlerRegistration struct { + cache.ResourceEventHandlerRegistration + + objectKey ObjectKey +} + +func (r *secretEventHandlerRegistration) GetKey() ObjectKey { + return r.objectKey +} + +type sm struct { + kubeClient kubernetes.Interface + + lock sync.RWMutex + monitors map[ObjectKey]*singleItemMonitor +} + +func NewSecretMonitor(kubeClient *kubernetes.Clientset) SecretMonitor { + return &sm{ + kubeClient: kubeClient, + monitors: map[ObjectKey]*singleItemMonitor{} + } +} + +func (s *sm) AddEventHandler(namespace, name string, handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { + s.lock.Lock() + defer s.lock.Unlock() + + key := ObjectKey{Namespace: namespace, Name: name} + m, exists := s.monitors[key] + + if !exists { + sharedInformer := cache.NewSharedInformer( + cache.NewListWatchFromClient( + s.kubeClient.CoreV1().RESTClient(), + "secrets", + namespace, + fields.OneTermEqualSelector("metadata.name", name), + ), + &corev1.Secret{}, + 0) + + m := newSingleItemMonitor(key, sharedInformer) + go m.StartInformer() + + s.monitors[key] = m + + klog.Info("secret informer started", " item key ", key) + } + + klog.Info("secret handler added", " item key ", key) + return m.AddEventHandler(handler) +} + +func (s *sm) RemoveEventHandler(handle SecretEventHandlerRegistration) error { + s.lock.Lock() + defer s.lock.Unlock() + + key := handle.GetKey() + m, ok := s.monitors[key] + if !ok { + // already gone + return nil + } + + klog.Info("secret handler removed", " item key ", key) + if err := m.RemoveEventHandler(handle); err != nil { + return err + } + + if m.numHandlers.Load() <= 0 { + klog.Info("secret informer stopped", " item key ", key) + m.Stop() + delete(s.monitors, key) + } + + return nil +} + +func (s *sm) GetSecret(namespace, name string) (*corev1.Secret, error) { + s.lock.RLock() + defer s.lock.RUnlock() + + key := ObjectKey{Namespace: namespace, Name: name} + m, exists := s.monitors[key] + + if !exists { + return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "secrets"}, name) + } + + uncast, exists, err := m.GetItem() + if !exists { + return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "secrets"}, name) + } + if err != nil { + return nil, err + } + + ret, ok := uncast.(*corev1.Secret) + if !ok { + return nil, fmt.Errorf("unexpected type: %T", uncast) + } + + return ret, nil +}