Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions pkg/route/secret/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package secret

import (
"fmt"
"sync"
"time"

routev1 "github.com/openshift/api/route/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)

type Manager struct {
monitor SecretMonitor
registeredHandlers map[string]SecretEventHandlerRegistration

lock sync.RWMutex

// monitors are the producer of the resourceChanges queue
resourceChanges workqueue.RateLimitingInterface

secretHandler cache.ResourceEventHandlerFuncs
}

func NewManager(kubeClient *kubernetes.Clientset, queue workqueue.RateLimitingInterface) *Manager {
return &Manager{
monitor: NewSecretMonitor(kubeClient),
lock: sync.RWMutex{},
resourceChanges: queue,
registeredHandlers: make(map[string]SecretEventHandlerRegistration),

// default secret handler
secretHandler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
UpdateFunc: func(oldObj, newObj interface{}) {},
DeleteFunc: func(obj interface{}) {},
},
}
}

func (m *Manager) WithSecretHandler(handler cache.ResourceEventHandlerFuncs) *Manager {
m.secretHandler = handler
return m
}

func (m *Manager) GetSecret(parent *routev1.Route, namespace, name string) (*v1.Secret, error) {
m.lock.Lock()
defer m.lock.Unlock()

key := fmt.Sprintf("%s/%s", parent.Namespace, parent.Name)
handle, exists := m.registeredHandlers[key]

if !exists {
return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "routes"}, key)
}

if err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (done bool, err error) { return handle.HasSynced(), nil }); err != nil {
return nil, apierrors.NewInternalError(err)
}

obj, err := m.monitor.GetSecret(handle)
if err != nil {
return nil, err
}

return obj, nil
}

func (m *Manager) RegisterRoute(parent *routev1.Route, getReferencedObjects func(*routev1.Route) sets.String) error {
// TODO refactor later
// names := getReferencedObjects(parent)

m.lock.Lock()
defer m.lock.Unlock()

// TODO hard coded to test since externalCertificate is TP
handle, err := m.monitor.AddEventHandler(parent.Namespace, fmt.Sprintf("%s/%s", parent.Name, "dummy-secret"), m.secretHandler)
if err != nil {
return apierrors.NewInternalError(err)
}

key := fmt.Sprintf("%s/%s", parent.Namespace, parent.Name)
m.registeredHandlers[key] = handle

klog.Info("secret manager registered route", " route", key)

return nil

}

func (m *Manager) UnregisterRoute(parent *routev1.Route, getReferencedObjects func(*routev1.Route) sets.String) error {
m.lock.Lock()
defer m.lock.Unlock()

key := fmt.Sprintf("%s/%s", parent.Namespace, parent.Name)
handle, ok := m.registeredHandlers[key]
if !ok {
return apierrors.NewNotFound(schema.GroupResource{Resource: "routes"}, key)
}

err := m.monitor.RemoveEventHandler(handle)
if err != nil {
return apierrors.NewNotFound(schema.GroupResource{Resource: "routes"}, key)
}

delete(m.registeredHandlers, key)

klog.Info("secret manager unregistered route", " route", key)

return nil
}
89 changes: 89 additions & 0 deletions pkg/route/secret/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package secret

import (
"fmt"
"strings"
"sync"
"sync/atomic"

"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)

type ObjectKey struct {
Namespace string
Name string
}

type singleItemMonitor struct {
key ObjectKey
informer cache.SharedInformer
numHandlers atomic.Int32

lock sync.Mutex
stopped bool
stopCh chan struct{}
}

func newSingleItemMonitor(key ObjectKey, informer cache.SharedInformer) *singleItemMonitor {
return &singleItemMonitor{
key: key,
informer: informer,
stopCh: make(chan struct{}),
}
}

func (i *singleItemMonitor) Stop() bool {
i.lock.Lock()
defer i.lock.Unlock()

if i.stopped {
return false
}
i.stopped = true
close(i.stopCh)
return true
}

func (i *singleItemMonitor) HasSynced() bool {
return i.informer.HasSynced()
}

func (i *singleItemMonitor) StartInformer() {
klog.Info("starting informer")
i.informer.Run(i.stopCh)
}

func (i *singleItemMonitor) AddEventHandler(handler cache.ResourceEventHandler) (SecretEventHandlerRegistration, error) {
registration, err := i.informer.AddEventHandler(handler)
if err != nil {
return nil, err
}
i.numHandlers.Add(1)

return &secretEventHandlerRegistration{
ResourceEventHandlerRegistration: registration,
objectKey: i.key,
}, nil
}

func (i *singleItemMonitor) RemoveEventHandler(handle SecretEventHandlerRegistration) error {
if err := i.informer.RemoveEventHandler(handle); err != nil {
return err
}
i.numHandlers.Add(-1)
return nil
}

func (i *singleItemMonitor) GetItemKey() string {
if keys := strings.Split(i.key.Name, "_"); len(keys) == 1 {
return keys[1]
}

return ""
}

func (i *singleItemMonitor) GetItem() (item interface{}, exists bool, err error) {
itemKey := i.GetItemKey()
return i.informer.GetStore().Get(fmt.Sprintf("%s/%s", i.key.Namespace, itemKey))
}
149 changes: 149 additions & 0 deletions pkg/route/secret/secret_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package secret

import (
"fmt"
"strings"
"sync"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)

type listObjectFunc func(string, metav1.ListOptions) (runtime.Object, error)
type watchObjectFunc func(string, metav1.ListOptions) (watch.Interface, error)

type SecretEventHandlerRegistration interface {
cache.ResourceEventHandlerRegistration

GetKey() ObjectKey
}

type SecretMonitor interface {
AddEventHandler(namespace, name string, handler cache.ResourceEventHandler) (SecretEventHandlerRegistration, error)

RemoveEventHandler(SecretEventHandlerRegistration) error

GetSecret(SecretEventHandlerRegistration) (*v1.Secret, error)
}

type secretEventHandlerRegistration struct {
cache.ResourceEventHandlerRegistration
objectKey ObjectKey
}

func (r *secretEventHandlerRegistration) GetKey() ObjectKey {
return r.objectKey
}

type sm struct {
kubeClient kubernetes.Interface

lock sync.RWMutex
monitors map[ObjectKey]*singleItemMonitor
}

func NewSecretMonitor(kubeClient *kubernetes.Clientset) SecretMonitor {
return &sm{
kubeClient: kubeClient,
monitors: map[ObjectKey]*singleItemMonitor{},
}
}

func (s *sm) AddEventHandler(namespace, name string, handler cache.ResourceEventHandler) (SecretEventHandlerRegistration, error) {
s.lock.Lock()
defer s.lock.Unlock()

// name is a combination or routename_secretname
key := ObjectKey{Namespace: namespace, Name: name}
m, exists := s.monitors[key]

// TODO refactor this later
secretName := strings.Split(name, "_")[1]
if !exists {
sharedInformer := cache.NewSharedInformer(
cache.NewListWatchFromClient(
s.kubeClient.CoreV1().RESTClient(),
"secrets",
namespace,
fields.OneTermEqualSelector("metadata.name", secretName),
),
&corev1.Secret{},
0,
)

m = newSingleItemMonitor(key, sharedInformer)
go m.StartInformer()

s.monitors[key] = m

klog.Info("secret informer started", " item key ", key)
}

klog.Info("secret handler added", " item key ", key)

return m.AddEventHandler(handler)
}

func (s *sm) RemoveEventHandler(handle SecretEventHandlerRegistration) error {
s.lock.Lock()
defer s.lock.Unlock()

key := handle.GetKey()
m, ok := s.monitors[key]
if !ok {
// already removed
return nil
}

if err := m.RemoveEventHandler(handle); err != nil {
return err
}
klog.Info("secret handler removed", " item key", key)

if m.numHandlers.Load() <= 0 {
m.Stop()
delete(s.monitors, key)
klog.Info("secret informer stopped ", " item key ", key)
}

return nil
}

func (s *sm) GetSecret(handle SecretEventHandlerRegistration) (*v1.Secret, error) {
s.lock.RLock()
defer s.lock.RUnlock()

key := handle.GetKey()

m, exists := s.monitors[key]

if !exists {
return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "secrets"}, m.GetItemKey())
}

uncast, exists, err := m.GetItem()
if !exists {
return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "secrets"}, m.GetItemKey())
}

if err != nil {
return nil, err
}

ret, ok := uncast.(*v1.Secret)
if !ok {
return nil, fmt.Errorf("unexpected type: %T", uncast)
}

return ret, nil

}