Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 38 additions & 5 deletions control-plane/pkg/reconciler/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@ package trigger
import (
"context"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/auth"
v1 "knative.dev/eventing/pkg/client/informers/externalversions/eventing/v1"
kubeclient "knative.dev/pkg/client/injection/kube/client"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
Expand All @@ -44,7 +48,8 @@ import (
triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger"
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1"
serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount"

serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered"

"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
Expand All @@ -65,7 +70,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
brokerInformer := brokerinformer.Get(ctx)
triggerInformer := triggerinformer.Get(ctx)
triggerLister := triggerInformer.Lister()
serviceaccountInformer := serviceaccountinformer.Get(ctx)
oidcServiceaccountInformer := serviceaccountinformer.Get(ctx, auth.OIDCLabelSelector)

clientPool := clientpool.Get(ctx)

Expand Down Expand Up @@ -95,7 +100,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
GetKafkaClient: clientPool.GetClient,
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
InitOffsetsFunc: offset.InitOffsets,
ServiceAccountLister: serviceaccountInformer.Lister(),
ServiceAccountLister: oidcServiceaccountInformer.Lister(),
}

impl := triggerreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options {
Expand Down Expand Up @@ -153,8 +158,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(reconciler.Tracker.OnChanged))

// Reconciler Trigger when the OIDC service account changes
serviceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&eventing.Trigger{}),
oidcServiceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: filterOIDCServiceAccounts(triggerInformer.Lister(), brokerInformer.Lister(), kafka.BrokerClass, FinalizerName),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

Expand Down Expand Up @@ -182,6 +187,34 @@ func filterTriggers(lister eventinglisters.BrokerLister, brokerClass string, fin
}
}

// filterOIDCServiceAccounts returns a function that returns true if the resource passed
// is a service account, which is owned by a trigger pointing to a the given broker class.
func filterOIDCServiceAccounts(triggerLister eventinglisters.TriggerLister, brokerLister eventinglisters.BrokerLister, brokerClass string, finalizer string) func(interface{}) bool {
return func(obj interface{}) bool {
controlledByTrigger := controller.FilterController(&eventing.Trigger{})(obj)
if !controlledByTrigger {
return false
}

sa, ok := obj.(*corev1.ServiceAccount)
if !ok {
return false
}

owner := metav1.GetControllerOf(sa)
if owner == nil {
return false
}

trigger, err := triggerLister.Triggers(sa.Namespace).Get(owner.Name)
if err != nil {
return false
}

return filterTriggers(brokerLister, brokerClass, finalizer)(trigger)
}
}

func hasKafkaBrokerTriggerFinalizer(finalizers []string, finalizerName string) bool {
for _, f := range finalizers {
if f == finalizerName {
Expand Down
183 changes: 179 additions & 4 deletions control-plane/pkg/reconciler/trigger/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@
package trigger

import (
"context"
"testing"

triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
"knative.dev/pkg/ptr"

"knative.dev/eventing/pkg/auth"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -27,7 +34,8 @@ import (
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered/fake"
_ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake"
"knative.dev/pkg/configmap"
reconcilertesting "knative.dev/pkg/reconciler/testing"

Expand All @@ -42,8 +50,7 @@ import (
)

func TestNewController(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)

ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector)
ctx = clientpool.WithKafkaClientPool(ctx)

controller := NewController(ctx, configmap.NewStaticWatcher(&corev1.ConfigMap{
Expand All @@ -60,8 +67,13 @@ func TestNewController(t *testing.T) {
}
}

func SetUpInformerSelector(ctx context.Context) context.Context {
ctx = filteredFactory.WithSelectors(ctx, auth.OIDCLabelSelector)
return ctx
}

func TestFilterTriggers(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector)

tt := []struct {
name string
Expand Down Expand Up @@ -184,3 +196,166 @@ func TestFilterTriggers(t *testing.T) {
})
}
}

func TestFilterOIDCServiceAccounts(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector)

tt := []struct {
name string
sa *corev1.ServiceAccount
trigger *eventing.Trigger
brokers []*eventing.Broker
pass bool
}{{
name: "matching owner reference",
sa: &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "sa",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: eventing.SchemeGroupVersion.String(),
Kind: "Trigger",
Name: "tr",
Controller: ptr.Bool(true),
},
},
},
},
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
Finalizers: []string{FinalizerName},
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
Annotations: map[string]string{
eventing.BrokerClassAnnotationKey: kafka.BrokerClass,
},
},
}},
pass: true,
}, {
name: "references trigger for wrong broker class",
sa: &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "sa",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: eventing.SchemeGroupVersion.String(),
Kind: "Trigger",
Name: "tr",
Controller: ptr.Bool(true),
},
},
},
},
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
Annotations: map[string]string{
eventing.BrokerClassAnnotationKey: "another-broker-class",
},
},
}},
pass: false,
}, {
name: "references trigger with correct finalizer",
sa: &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "sa",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: eventing.SchemeGroupVersion.String(),
Kind: "Trigger",
Name: "tr",
Controller: ptr.Bool(true),
},
},
},
},
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
Finalizers: []string{FinalizerName},
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
},
}},
pass: true,
}, {
name: "no owner reference",
sa: &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "sa",
},
},
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
Finalizers: []string{FinalizerName},
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
Annotations: map[string]string{
eventing.BrokerClassAnnotationKey: kafka.BrokerClass,
},
},
}},
pass: false,
}}

for _, tc := range tt {
tc := tc
t.Run(tc.name, func(t *testing.T) {
brokerInformer := brokerinformer.Get(ctx)
for _, obj := range tc.brokers {
err := brokerInformer.Informer().GetStore().Add(obj)
assert.NoError(t, err)
}

triggerInformer := triggerinformer.Get(ctx)
err := triggerInformer.Informer().GetStore().Add(tc.trigger)
assert.NoError(t, err)

filter := filterOIDCServiceAccounts(triggerInformer.Lister(), brokerInformer.Lister(), kafka.BrokerClass, FinalizerName)
pass := filter(tc.sa)
assert.Equal(t, tc.pass, pass)
})
}
}
14 changes: 8 additions & 6 deletions control-plane/pkg/reconciler/trigger/namespaced_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,19 @@ import (
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset"

"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/auth"
kubeclient "knative.dev/pkg/client/injection/kube/client"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount"

serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered"

"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/resolver"

eventing "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/feature"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
Expand All @@ -60,7 +62,7 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con
brokerInformer := brokerinformer.Get(ctx)
triggerInformer := triggerinformer.Get(ctx)
triggerLister := triggerInformer.Lister()
serviceaccountInformer := serviceaccountinformer.Get(ctx)
oidcServiceaccountInformer := serviceaccountinformer.Get(ctx, auth.OIDCLabelSelector)

clientPool := clientpool.Get(ctx)

Expand All @@ -82,7 +84,7 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con
},
BrokerLister: brokerInformer.Lister(),
ConfigMapLister: configmapInformer.Lister(),
ServiceAccountLister: serviceaccountInformer.Lister(),
ServiceAccountLister: oidcServiceaccountInformer.Lister(),
EventingClient: eventingclient.Get(ctx),
Env: configs,
GetKafkaClient: clientPool.GetClient,
Expand Down Expand Up @@ -150,8 +152,8 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con
secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(reconciler.Tracker.OnChanged))

// Reconciler Trigger when the OIDC service account changes
serviceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&eventing.Trigger{}),
oidcServiceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: filterOIDCServiceAccounts(triggerInformer.Lister(), brokerInformer.Lister(), kafka.NamespacedBrokerClass, FinalizerName),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
"knative.dev/pkg/configmap"
reconcilertesting "knative.dev/pkg/reconciler/testing"

_ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered/fake"
_ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake"

_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake"

Expand All @@ -37,7 +40,7 @@ import (
)

func TestNewNamespacedController(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector)

ctx = clientpool.WithKafkaClientPool(ctx)

Expand Down
Loading