Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions pkg/route/secret/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package secret

import (
"fmt"
"sync"
"time"

routev1 "github.com/openshift/api/route/v1"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

// this looks somewhat confused, I'm not sure this layer is necessary. Given a SecretMonitor and a controller
// that observes Routes, it seems as though you'd simply wire handles to trigger evaluation for the route in question
// and eventhandler for that controller would do the diff/deletion on its own.
// If you have a sample controller I can help there
type Monitor interface {
// Get secret by secret namespace and name.
GetSecret(namespace, name string) (*v1.Secret, error)

// WARNING: Register/UnregisterRoute functions should be efficient,
// i.e. should not block on network operations.

// RegisterRoute registers all secrets from a given Route.
RegisterRoute(*routev1.Route, func(*routev1.Route) sets.String)

// UnregisterRoute unregisters secrets from a given Route that are not
// used by any other registered Route.
UnregisterRoute(*routev1.Route, func(*routev1.Route) sets.String)
}

// SecretMonitor keeps a store with secrets necessary
// for registered routes.
type Manager struct {
monitor SecretMonitor
registeredHandlers map[string]SecretEventHandlerRegistration

lock sync.RWMutex

stopCh <-chan struct{}

// monitors are the producer of the resourceChanges queue
resourceChanges workqueue.RateLimitingInterface

secretHandler cache.ResourceEventHandlerFuncs
}

func NewRouteManager(kubeClient *kubernetes.Clientset, queue workqueue.RateLimitingInterface) *Manager {
return &Manager{
monitor: NewSecretMonitor(kubeClient),
lock: sync.RWMutex{},
stopCh: make(<-chan struct{}),
resourceChanges: queue,
registeredHandlers: make(map[string]SecretEventHandlerRegistration),

// default secret handler
secretHandler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
UpdateFunc: func(oldObj, newObj interface{}) {},
DeleteFunc: func(obj interface{}) {},
},
}
}

func (m *Manager) WithSecretHandler(handler cache.ResourceEventHandlerFuncs) *Manager {
m.secretHandler = handler
return m
}

func (m *Manager) GetSecret(parent *routev1.Route, namespace, name string) (*v1.Secret, error) {
key := fmt.Sprintf("%s/%s", parent.Namespace, parent.Name)
gr := appsv1.Resource("secret")

m.lock.RLock()
handle, exists := m.registeredHandlers[key]
m.lock.RUnlock()

if !exists {
return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
}

if err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (done bool, err error) { return handle.HasSynced(), nil }); err != nil {
return nil, fmt.Errorf("failed to sync %s cache: %v", gr.String(), err)
}

obj, err := m.monitor.GetSecret(handle)
if err != nil {
return nil, err
}

return obj, nil
}

func (m *Manager) RegisterRoute(parent *routev1.Route, getReferencedObjects func(*routev1.Route) sets.String) {
// TODO refactor later
// names := getReferencedObjects(parent)

m.lock.Lock()
defer m.lock.Unlock()

// TODO iterate refererenced objects if we have 1-many mappings between route and secrets
// TODO hard coded to test since externalCertificate is TP
handle, err := m.monitor.AddEventHandler(parent.Namespace, fmt.Sprintf("%s_%s", parent.Name, "dummy-secret"), m.secretHandler)
if err != nil {
// TODO handle errors, sig change
}

key := fmt.Sprintf("%s/%s", parent.Namespace, parent.Name)
m.registeredHandlers[key] = handle

}

func (m *Manager) UnregisterRoute(parent *routev1.Route, getReferencedObjects func(*routev1.Route) sets.String) {
key := fmt.Sprintf("%s/%s", parent.Namespace, parent.Name)

m.lock.Lock()
defer m.lock.Unlock()

handle, ok := m.registeredHandlers[key]
if !ok {
// TODO handle errors, sig change
}

err := m.monitor.RemoveEventHandler(handle)
if err != nil {
// TODO handle errors, sig change
}

delete(m.registeredHandlers, key)
}
76 changes: 76 additions & 0 deletions pkg/route/secret/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package secret

import (
"fmt"
"sync"
"sync/atomic"

"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)

type ObjectKey struct {
Namespace string
Name string
}

type singleItemMonitor struct {
key ObjectKey
informer cache.SharedInformer

lock sync.Mutex
numHandlers atomic.Int32
stopped bool
stopCh chan struct{}
}

func newSingleItemMonitor(key ObjectKey, informer cache.SharedInformer) *singleItemMonitor {
return &singleItemMonitor{
key: key,
informer: informer,
stopCh: make(chan struct{}),
}
}

func (i *singleItemMonitor) Stop() bool {
i.lock.Lock()
defer i.lock.Unlock()
if i.stopped {
return false
}
i.stopped = true
close(i.stopCh)
return true
}

func (i *singleItemMonitor) StartInformer() {
i.lock.Lock()
defer i.lock.Unlock()
klog.Info("starting informer")
i.informer.Run(i.stopCh)
}

func (i *singleItemMonitor) AddEventHandler(handler cache.ResourceEventHandler) (SecretEventHandlerRegistration, error) {
registration, err := i.informer.AddEventHandler(handler)
if err != nil {
return nil, err
}
i.numHandlers.Add(1)

return &secretEventHandlerRegistration{
ResourceEventHandlerRegistration: registration,
objectKey: i.key,
}, nil
}

func (i *singleItemMonitor) RemoveEventHandler(handle SecretEventHandlerRegistration) error {
if err := i.informer.RemoveEventHandler(handle); err != nil {
return err
}
i.numHandlers.Add(-1)
return nil
}

func (i *singleItemMonitor) GetItem() (item interface{}, exists bool, err error) {
return i.informer.GetStore().Get(fmt.Sprintf("%s/%s", i.key.Namespace, i.key.Name))
}
138 changes: 138 additions & 0 deletions pkg/route/secret/secret_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package secret

import (
"context"
"fmt"
"sync"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)

type SecretEventHandlerRegistration interface {
cache.ResourceEventHandlerRegistration

GetKey() ObjectKey
}

type SecretMonitor interface {
AddEventHandler(namespace, name string, handler cache.ResourceEventHandler) (SecretEventHandlerRegistration, error)

RemoveEventHandler(SecretEventHandlerRegistration) error

GetSecret(namespace, name string) (*corev1.Secret, error)
}

type secretEventHandlerRegistration struct {
cache.ResourceEventHandlerRegistration

objectKey ObjectKey
}

func (r *secretEventHandlerRegistration) GetKey() ObjectKey {
return r.objectKey
}

type sm struct {
kubeClient kubernetes.Interface

lock sync.RWMutex
monitors map[ObjectKey]*singleItemMonitor
}

func NewSecretMonitor(kubeClient *kubernetes.Clientset) SecretMonitor {
return &sm{
kubeClient: kubeClient,
monitors: map[ObjectKey]*singleItemMonitor{}
}
}

func (s *sm) AddEventHandler(namespace, name string, handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
s.lock.Lock()
defer s.lock.Unlock()

key := ObjectKey{Namespace: namespace, Name: name}
m, exists := s.monitors[key]

if !exists {
sharedInformer := cache.NewSharedInformer(
cache.NewListWatchFromClient(
s.kubeClient.CoreV1().RESTClient(),
"secrets",
namespace,
fields.OneTermEqualSelector("metadata.name", name),
),
&corev1.Secret{},
0)

m := newSingleItemMonitor(key, sharedInformer)
go m.StartInformer()

s.monitors[key] = m

klog.Info("secret informer started", " item key ", key)
}

klog.Info("secret handler added", " item key ", key)
return m.AddEventHandler(handler)
}

func (s *sm) RemoveEventHandler(handle SecretEventHandlerRegistration) error {
s.lock.Lock()
defer s.lock.Unlock()

key := handle.GetKey()
m, ok := s.monitors[key]
if !ok {
// already gone
return nil
}

klog.Info("secret handler removed", " item key ", key)
if err := m.RemoveEventHandler(handle); err != nil {
return err
}

if m.numHandlers.Load() <= 0 {
klog.Info("secret informer stopped", " item key ", key)
m.Stop()
delete(s.monitors, key)
}

return nil
}

func (s *sm) GetSecret(namespace, name string) (*corev1.Secret, error) {
s.lock.RLock()
defer s.lock.RUnlock()

key := ObjectKey{Namespace: namespace, Name: name}
m, exists := s.monitors[key]

if !exists {
return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "secrets"}, name)
}

uncast, exists, err := m.GetItem()
if !exists {
return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "secrets"}, name)
}
if err != nil {
return nil, err
}

ret, ok := uncast.(*corev1.Secret)
if !ok {
return nil, fmt.Errorf("unexpected type: %T", uncast)
}

return ret, nil
}