Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions source/informers/fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package informers

import (
"github.com/stretchr/testify/mock"
corev1lister "k8s.io/client-go/listers/core/v1"
discoveryv1lister "k8s.io/client-go/listers/discovery/v1"
"k8s.io/client-go/tools/cache"
)

type FakeServiceInformer struct {
mock.Mock
}

func (f *FakeServiceInformer) Informer() cache.SharedIndexInformer {
args := f.Called()
return args.Get(0).(cache.SharedIndexInformer)
}

func (f *FakeServiceInformer) Lister() corev1lister.ServiceLister {
return corev1lister.NewServiceLister(f.Informer().GetIndexer())
}

type FakeEndpointSliceInformer struct {
mock.Mock
}

func (f *FakeEndpointSliceInformer) Informer() cache.SharedIndexInformer {
args := f.Called()
return args.Get(0).(cache.SharedIndexInformer)
}

func (f *FakeEndpointSliceInformer) Lister() discoveryv1lister.EndpointSliceLister {
return discoveryv1lister.NewEndpointSliceLister(f.Informer().GetIndexer())
}

type FakeNodeInformer struct {
mock.Mock
}

func (f *FakeNodeInformer) Informer() cache.SharedIndexInformer {
args := f.Called()
return args.Get(0).(cache.SharedIndexInformer)
}

func (f *FakeNodeInformer) Lister() corev1lister.NodeLister {
return corev1lister.NewNodeLister(f.Informer().GetIndexer())
}
91 changes: 44 additions & 47 deletions source/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,61 +96,52 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
// Set the resync period to 0 to prevent processing when nothing has changed
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
serviceInformer := informerFactory.Core().V1().Services()
endpointSlicesInformer := informerFactory.Discovery().V1().EndpointSlices()
podInformer := informerFactory.Core().V1().Pods()

// Add default resource event handlers to properly initialize informer.
_, _ = serviceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
},
},
)
_, _ = endpointSlicesInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
},
},
)
_, _ = podInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
},
},
)
_, _ = serviceInformer.Informer().AddEventHandler(informers.DefaultEventHandler())

// Transform the slice into a map so it will be way much easier and fast to filter later
sTypesFilter, err := newServiceTypesFilter(serviceTypeFilter)
if err != nil {
return nil, err
}

var endpointSlicesInformer discoveryinformers.EndpointSliceInformer
var podInformer coreinformers.PodInformer
if sTypesFilter.isRequired(v1.ServiceTypeNodePort, v1.ServiceTypeClusterIP) {
endpointSlicesInformer = informerFactory.Discovery().V1().EndpointSlices()
podInformer = informerFactory.Core().V1().Pods()

_, _ = endpointSlicesInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
_, _ = podInformer.Informer().AddEventHandler(informers.DefaultEventHandler())

// Add an indexer to the EndpointSlice informer to index by the service name label
err = endpointSlicesInformer.Informer().AddIndexers(cache.Indexers{
serviceNameIndexKey: func(obj any) ([]string, error) {
endpointSlice, ok := obj.(*discoveryv1.EndpointSlice)
if !ok {
// This should never happen because the Informer should only contain EndpointSlice objects
return nil, fmt.Errorf("expected %T but got %T instead", endpointSlice, obj)
}
serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName]
if serviceName == "" {
return nil, nil
}
key := types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}.String()
return []string{key}, nil
},
})
if err != nil {
return nil, err
}
}

var nodeInformer coreinformers.NodeInformer
if sTypesFilter.isNodeInformerRequired() {
if sTypesFilter.isRequired(v1.ServiceTypeNodePort) {
nodeInformer = informerFactory.Core().V1().Nodes()
_, _ = nodeInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
}

// Add an indexer to the EndpointSlice informer to index by the service name label
err = endpointSlicesInformer.Informer().AddIndexers(cache.Indexers{
serviceNameIndexKey: func(obj any) ([]string, error) {
endpointSlice, ok := obj.(*discoveryv1.EndpointSlice)
if !ok {
// This should never happen because the Informer should only contain EndpointSlice objects
return nil, fmt.Errorf("expected %T but got %T instead", endpointSlice, obj)
}
serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName]
if serviceName == "" {
return nil, nil
}
key := types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}.String()
return []string{key}, nil
},
})
if err != nil {
return nil, err
}

informerFactory.Start(ctx.Done())

// wait for the local cache to be populated.
Expand Down Expand Up @@ -802,10 +793,10 @@ func (sc *serviceSource) AddEventHandler(_ context.Context, handler func()) {
// Right now there is no way to remove event handler from informer, see:
// https://github.com/kubernetes/kubernetes/issues/79610
_, _ = sc.serviceInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
if sc.listenEndpointEvents {
if sc.listenEndpointEvents && sc.serviceTypeFilter.isRequired(v1.ServiceTypeNodePort, v1.ServiceTypeClusterIP) {
_, _ = sc.endpointSlicesInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
}
if sc.serviceTypeFilter.isNodeInformerRequired() {
if sc.serviceTypeFilter.isRequired(v1.ServiceTypeNodePort) {
_, _ = sc.nodeInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
}
}
Expand Down Expand Up @@ -842,12 +833,18 @@ func (sc *serviceTypes) isProcessed(serviceType v1.ServiceType) bool {
return !sc.enabled || sc.types[serviceType]
}

func (sc *serviceTypes) isNodeInformerRequired() bool {
if !sc.enabled {
// isRequired returns true if service type filtering is disabled or if any of the provided service types are present in the filter.
// If no options are provided, it returns true.
func (sc *serviceTypes) isRequired(opts ...v1.ServiceType) bool {
if len(opts) == 0 || !sc.enabled {
return true
}
_, ok := sc.types[v1.ServiceTypeNodePort]
return ok
for _, opt := range opts {
if _, ok := sc.types[opt]; ok {
return true
}
}
return false
}

// conditionToBool converts an EndpointConditions condition to a bool value.
Expand Down
Loading
Loading