diff --git a/pkg/route/secret/manager.go b/pkg/route/secret/manager.go new file mode 100644 index 0000000000..1a54af6b7b --- /dev/null +++ b/pkg/route/secret/manager.go @@ -0,0 +1,118 @@ +package secret + +import ( + "fmt" + "sync" + "time" + + routev1 "github.com/openshift/api/route/v1" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +type Manager struct { + monitor SecretMonitor + registeredHandlers map[string]SecretEventHandlerRegistration + + lock sync.RWMutex + + // monitors are the producer of the resourceChanges queue + resourceChanges workqueue.RateLimitingInterface + + secretHandler cache.ResourceEventHandlerFuncs +} + +func NewManager(kubeClient *kubernetes.Clientset, queue workqueue.RateLimitingInterface) *Manager { + return &Manager{ + monitor: NewSecretMonitor(kubeClient), + lock: sync.RWMutex{}, + resourceChanges: queue, + registeredHandlers: make(map[string]SecretEventHandlerRegistration), + + // default secret handler + secretHandler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) {}, + UpdateFunc: func(oldObj, newObj interface{}) {}, + DeleteFunc: func(obj interface{}) {}, + }, + } +} + +func (m *Manager) WithSecretHandler(handler cache.ResourceEventHandlerFuncs) *Manager { + m.secretHandler = handler + return m +} + +func (m *Manager) GetSecret(parent *routev1.Route, namespace, name string) (*v1.Secret, error) { + m.lock.Lock() + defer m.lock.Unlock() + + key := fmt.Sprintf("%s/%s", parent.Namespace, parent.Name) + handle, exists := m.registeredHandlers[key] + + if !exists { + return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "routes"}, key) + } + + if err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (done bool, err error) { return handle.HasSynced(), nil }); err != nil { + return nil, apierrors.NewInternalError(err) + } + + obj, err := m.monitor.GetSecret(handle) + if err != nil { + return nil, err + } + + return obj, nil +} + +func (m *Manager) RegisterRoute(parent *routev1.Route, getReferencedObjects func(*routev1.Route) sets.String) error { + // TODO refactor later + // names := getReferencedObjects(parent) + + m.lock.Lock() + defer m.lock.Unlock() + + // TODO hard coded to test since externalCertificate is TP + handle, err := m.monitor.AddEventHandler(parent.Namespace, fmt.Sprintf("%s/%s", parent.Name, "dummy-secret"), m.secretHandler) + if err != nil { + return apierrors.NewInternalError(err) + } + + key := fmt.Sprintf("%s/%s", parent.Namespace, parent.Name) + m.registeredHandlers[key] = handle + + klog.Info("secret manager registered route", " route", key) + + return nil + +} + +func (m *Manager) UnregisterRoute(parent *routev1.Route, getReferencedObjects func(*routev1.Route) sets.String) error { + m.lock.Lock() + defer m.lock.Unlock() + + key := fmt.Sprintf("%s/%s", parent.Namespace, parent.Name) + handle, ok := m.registeredHandlers[key] + if !ok { + return apierrors.NewNotFound(schema.GroupResource{Resource: "routes"}, key) + } + + err := m.monitor.RemoveEventHandler(handle) + if err != nil { + return apierrors.NewNotFound(schema.GroupResource{Resource: "routes"}, key) + } + + delete(m.registeredHandlers, key) + + klog.Info("secret manager unregistered route", " route", key) + + return nil +} diff --git a/pkg/route/secret/monitor.go b/pkg/route/secret/monitor.go new file mode 100644 index 0000000000..ff1f9fcbf0 --- /dev/null +++ b/pkg/route/secret/monitor.go @@ -0,0 +1,89 @@ +package secret + +import ( + "fmt" + "strings" + "sync" + "sync/atomic" + + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +type ObjectKey struct { + Namespace string + Name string +} + +type singleItemMonitor struct { + key ObjectKey + informer cache.SharedInformer + numHandlers atomic.Int32 + + lock sync.Mutex + stopped bool + stopCh chan struct{} +} + +func newSingleItemMonitor(key ObjectKey, informer cache.SharedInformer) *singleItemMonitor { + return &singleItemMonitor{ + key: key, + informer: informer, + stopCh: make(chan struct{}), + } +} + +func (i *singleItemMonitor) Stop() bool { + i.lock.Lock() + defer i.lock.Unlock() + + if i.stopped { + return false + } + i.stopped = true + close(i.stopCh) + return true +} + +func (i *singleItemMonitor) HasSynced() bool { + return i.informer.HasSynced() +} + +func (i *singleItemMonitor) StartInformer() { + klog.Info("starting informer") + i.informer.Run(i.stopCh) +} + +func (i *singleItemMonitor) AddEventHandler(handler cache.ResourceEventHandler) (SecretEventHandlerRegistration, error) { + registration, err := i.informer.AddEventHandler(handler) + if err != nil { + return nil, err + } + i.numHandlers.Add(1) + + return &secretEventHandlerRegistration{ + ResourceEventHandlerRegistration: registration, + objectKey: i.key, + }, nil +} + +func (i *singleItemMonitor) RemoveEventHandler(handle SecretEventHandlerRegistration) error { + if err := i.informer.RemoveEventHandler(handle); err != nil { + return err + } + i.numHandlers.Add(-1) + return nil +} + +func (i *singleItemMonitor) GetItemKey() string { + if keys := strings.Split(i.key.Name, "_"); len(keys) == 1 { + return keys[1] + } + + return "" +} + +func (i *singleItemMonitor) GetItem() (item interface{}, exists bool, err error) { + itemKey := i.GetItemKey() + return i.informer.GetStore().Get(fmt.Sprintf("%s/%s", i.key.Namespace, itemKey)) +} diff --git a/pkg/route/secret/secret_monitor.go b/pkg/route/secret/secret_monitor.go new file mode 100644 index 0000000000..d06f8b4eca --- /dev/null +++ b/pkg/route/secret/secret_monitor.go @@ -0,0 +1,149 @@ +package secret + +import ( + "fmt" + "strings" + "sync" + + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +type listObjectFunc func(string, metav1.ListOptions) (runtime.Object, error) +type watchObjectFunc func(string, metav1.ListOptions) (watch.Interface, error) + +type SecretEventHandlerRegistration interface { + cache.ResourceEventHandlerRegistration + + GetKey() ObjectKey +} + +type SecretMonitor interface { + AddEventHandler(namespace, name string, handler cache.ResourceEventHandler) (SecretEventHandlerRegistration, error) + + RemoveEventHandler(SecretEventHandlerRegistration) error + + GetSecret(SecretEventHandlerRegistration) (*v1.Secret, error) +} + +type secretEventHandlerRegistration struct { + cache.ResourceEventHandlerRegistration + objectKey ObjectKey +} + +func (r *secretEventHandlerRegistration) GetKey() ObjectKey { + return r.objectKey +} + +type sm struct { + kubeClient kubernetes.Interface + + lock sync.RWMutex + monitors map[ObjectKey]*singleItemMonitor +} + +func NewSecretMonitor(kubeClient *kubernetes.Clientset) SecretMonitor { + return &sm{ + kubeClient: kubeClient, + monitors: map[ObjectKey]*singleItemMonitor{}, + } +} + +func (s *sm) AddEventHandler(namespace, name string, handler cache.ResourceEventHandler) (SecretEventHandlerRegistration, error) { + s.lock.Lock() + defer s.lock.Unlock() + + // name is a combination or routename_secretname + key := ObjectKey{Namespace: namespace, Name: name} + m, exists := s.monitors[key] + + // TODO refactor this later + secretName := strings.Split(name, "_")[1] + if !exists { + sharedInformer := cache.NewSharedInformer( + cache.NewListWatchFromClient( + s.kubeClient.CoreV1().RESTClient(), + "secrets", + namespace, + fields.OneTermEqualSelector("metadata.name", secretName), + ), + &corev1.Secret{}, + 0, + ) + + m = newSingleItemMonitor(key, sharedInformer) + go m.StartInformer() + + s.monitors[key] = m + + klog.Info("secret informer started", " item key ", key) + } + + klog.Info("secret handler added", " item key ", key) + + return m.AddEventHandler(handler) +} + +func (s *sm) RemoveEventHandler(handle SecretEventHandlerRegistration) error { + s.lock.Lock() + defer s.lock.Unlock() + + key := handle.GetKey() + m, ok := s.monitors[key] + if !ok { + // already removed + return nil + } + + if err := m.RemoveEventHandler(handle); err != nil { + return err + } + klog.Info("secret handler removed", " item key", key) + + if m.numHandlers.Load() <= 0 { + m.Stop() + delete(s.monitors, key) + klog.Info("secret informer stopped ", " item key ", key) + } + + return nil +} + +func (s *sm) GetSecret(handle SecretEventHandlerRegistration) (*v1.Secret, error) { + s.lock.RLock() + defer s.lock.RUnlock() + + key := handle.GetKey() + + m, exists := s.monitors[key] + + if !exists { + return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "secrets"}, m.GetItemKey()) + } + + uncast, exists, err := m.GetItem() + if !exists { + return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "secrets"}, m.GetItemKey()) + } + + if err != nil { + return nil, err + } + + ret, ok := uncast.(*v1.Secret) + if !ok { + return nil, fmt.Errorf("unexpected type: %T", uncast) + } + + return ret, nil + +}