From 9ebf028006d23c92de29e5d1179d19b557c074c7 Mon Sep 17 00:00:00 2001 From: ivan katliarchuk Date: Mon, 14 Jul 2025 12:48:54 +0100 Subject: [PATCH 1/4] fix(source/service): disable pod and endpointSlicesInformer when not required Signed-off-by: ivan katliarchuk --- source/informers/fake_informers.go | 60 +++++++++ source/service.go | 89 +++++++------- source/service_test.go | 189 +++++++++++++++++++++++++---- 3 files changed, 270 insertions(+), 68 deletions(-) create mode 100644 source/informers/fake_informers.go diff --git a/source/informers/fake_informers.go b/source/informers/fake_informers.go new file mode 100644 index 0000000000..aed7b3aee7 --- /dev/null +++ b/source/informers/fake_informers.go @@ -0,0 +1,60 @@ +/* +Copyright 2025 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informers + +import ( + "github.com/stretchr/testify/mock" + corev1lister "k8s.io/client-go/listers/core/v1" + discoveryv1lister "k8s.io/client-go/listers/discovery/v1" + "k8s.io/client-go/tools/cache" +) + +type FakeServiceInformer struct { + mock.Mock +} + +func (f *FakeServiceInformer) Informer() cache.SharedIndexInformer { + args := f.Called() + return args.Get(0).(cache.SharedIndexInformer) +} + +func (f *FakeServiceInformer) Lister() corev1lister.ServiceLister { + return corev1lister.NewServiceLister(f.Informer().GetIndexer()) +} + +type FakeEndpointSliceInformer struct { + mock.Mock +} + +func (f *FakeEndpointSliceInformer) Informer() cache.SharedIndexInformer { + args := f.Called() + return args.Get(0).(cache.SharedIndexInformer) +} + +func (f *FakeEndpointSliceInformer) Lister() discoveryv1lister.EndpointSliceLister { + return discoveryv1lister.NewEndpointSliceLister(f.Informer().GetIndexer()) +} + +type FakeNodeInformer struct { + mock.Mock +} + +func (f *FakeNodeInformer) Informer() cache.SharedIndexInformer { + args := f.Called() + return args.Get(0).(cache.SharedIndexInformer) +} + +func (f *FakeNodeInformer) Lister() corev1lister.NodeLister { + return corev1lister.NewNodeLister(f.Informer().GetIndexer()) +} diff --git a/source/service.go b/source/service.go index 24f3b60ee1..eb7e174e1b 100644 --- a/source/service.go +++ b/source/service.go @@ -96,28 +96,9 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name // Set the resync period to 0 to prevent processing when nothing has changed informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace)) serviceInformer := informerFactory.Core().V1().Services() - endpointSlicesInformer := informerFactory.Discovery().V1().EndpointSlices() - podInformer := informerFactory.Core().V1().Pods() // Add default resource event handlers to properly initialize informer. - _, _ = serviceInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - }, - }, - ) - _, _ = endpointSlicesInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - }, - }, - ) - _, _ = podInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - }, - }, - ) + _, _ = serviceInformer.Informer().AddEventHandler(informers.DefaultEventHandler()) // Transform the slice into a map so it will be way much easier and fast to filter later sTypesFilter, err := newServiceTypesFilter(serviceTypeFilter) @@ -125,32 +106,42 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name return nil, err } + var endpointSlicesInformer discoveryinformers.EndpointSliceInformer + var podInformer coreinformers.PodInformer + if sTypesFilter.isAllOrRequired(v1.ServiceTypeNodePort, v1.ServiceTypeClusterIP) { + endpointSlicesInformer = informerFactory.Discovery().V1().EndpointSlices() + podInformer = informerFactory.Core().V1().Pods() + + _, _ = endpointSlicesInformer.Informer().AddEventHandler(informers.DefaultEventHandler()) + _, _ = podInformer.Informer().AddEventHandler(informers.DefaultEventHandler()) + + // Add an indexer to the EndpointSlice informer to index by the service name label + err = endpointSlicesInformer.Informer().AddIndexers(cache.Indexers{ + serviceNameIndexKey: func(obj any) ([]string, error) { + endpointSlice, ok := obj.(*discoveryv1.EndpointSlice) + if !ok { + // This should never happen because the Informer should only contain EndpointSlice objects + return nil, fmt.Errorf("expected %T but got %T instead", endpointSlice, obj) + } + serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName] + if serviceName == "" { + return nil, nil + } + key := types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}.String() + return []string{key}, nil + }, + }) + if err != nil { + return nil, err + } + } + var nodeInformer coreinformers.NodeInformer - if sTypesFilter.isNodeInformerRequired() { + if sTypesFilter.isAllOrRequired(v1.ServiceTypeNodePort) { nodeInformer = informerFactory.Core().V1().Nodes() _, _ = nodeInformer.Informer().AddEventHandler(informers.DefaultEventHandler()) } - // Add an indexer to the EndpointSlice informer to index by the service name label - err = endpointSlicesInformer.Informer().AddIndexers(cache.Indexers{ - serviceNameIndexKey: func(obj any) ([]string, error) { - endpointSlice, ok := obj.(*discoveryv1.EndpointSlice) - if !ok { - // This should never happen because the Informer should only contain EndpointSlice objects - return nil, fmt.Errorf("expected %T but got %T instead", endpointSlice, obj) - } - serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName] - if serviceName == "" { - return nil, nil - } - key := types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}.String() - return []string{key}, nil - }, - }) - if err != nil { - return nil, err - } - informerFactory.Start(ctx.Done()) // wait for the local cache to be populated. @@ -802,10 +793,10 @@ func (sc *serviceSource) AddEventHandler(_ context.Context, handler func()) { // Right now there is no way to remove event handler from informer, see: // https://github.com/kubernetes/kubernetes/issues/79610 _, _ = sc.serviceInformer.Informer().AddEventHandler(eventHandlerFunc(handler)) - if sc.listenEndpointEvents { + if sc.listenEndpointEvents && sc.serviceTypeFilter.isAllOrRequired(v1.ServiceTypeNodePort, v1.ServiceTypeClusterIP) { _, _ = sc.endpointSlicesInformer.Informer().AddEventHandler(eventHandlerFunc(handler)) } - if sc.serviceTypeFilter.isNodeInformerRequired() { + if sc.serviceTypeFilter.isAllOrRequired(v1.ServiceTypeNodePort) { _, _ = sc.nodeInformer.Informer().AddEventHandler(eventHandlerFunc(handler)) } } @@ -842,12 +833,16 @@ func (sc *serviceTypes) isProcessed(serviceType v1.ServiceType) bool { return !sc.enabled || sc.types[serviceType] } -func (sc *serviceTypes) isNodeInformerRequired() bool { - if !sc.enabled { +func (sc *serviceTypes) isAllOrRequired(opts ...v1.ServiceType) bool { + if len(opts) == 0 || !sc.enabled { return true } - _, ok := sc.types[v1.ServiceTypeNodePort] - return ok + for _, opt := range opts { + if _, ok := sc.types[opt]; ok { + return true + } + } + return false } // conditionToBool converts an EndpointConditions condition to a bool value. diff --git a/source/service_test.go b/source/service_test.go index ecbe1fc830..a265b4522b 100644 --- a/source/service_test.go +++ b/source/service_test.go @@ -37,6 +37,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes/fake" + "sigs.k8s.io/external-dns/source/informers" "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/internal/testutils" @@ -251,7 +252,7 @@ func testServiceSourceEndpoints(t *testing.T) { }, externalIPs: []string{}, lbs: []string{"1.2.3.4"}, - serviceTypesFilter: []string{}, + serviceTypesFilter: []string{string(v1.ServiceTypeLoadBalancer)}, expected: []*endpoint.Endpoint{ {DNSName: "foo.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.2.3.4"}}, }, @@ -296,7 +297,7 @@ func testServiceSourceEndpoints(t *testing.T) { annotations: map[string]string{}, externalIPs: []string{}, lbs: []string{"1.2.3.4"}, - serviceTypesFilter: []string{}, + serviceTypesFilter: []string{string(v1.ServiceTypeLoadBalancer), string(v1.ServiceTypeNodePort)}, expected: []*endpoint.Endpoint{ {DNSName: "foo.fqdn.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.2.3.4"}}, {DNSName: "foo.fqdn.com", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.2.3.4"}}, @@ -2443,6 +2444,7 @@ func TestHeadlessServices(t *testing.T) { podsReady []bool publishNotReadyAddresses bool nodes []v1.Node + serviceTypesFilter []string expected []*endpoint.Endpoint expectError bool }{ @@ -2473,6 +2475,7 @@ func TestHeadlessServices(t *testing.T) { []bool{true, true}, false, []v1.Node{}, + []string{}, []*endpoint.Endpoint{ {DNSName: "foo-0.service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.1.1.1"}}, {DNSName: "foo-1.service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.1.1.2"}}, @@ -2507,6 +2510,7 @@ func TestHeadlessServices(t *testing.T) { []bool{true, true}, false, []v1.Node{}, + []string{string(v1.ServiceTypeClusterIP), string(v1.ServiceTypeLoadBalancer)}, []*endpoint.Endpoint{ {DNSName: "foo-0.service.example.org", RecordType: endpoint.RecordTypeAAAA, Targets: endpoint.Targets{"2001:db8::1"}}, {DNSName: "foo-1.service.example.org", RecordType: endpoint.RecordTypeAAAA, Targets: endpoint.Targets{"2001:db8::2"}}, @@ -2541,6 +2545,7 @@ func TestHeadlessServices(t *testing.T) { []bool{true, true}, false, []v1.Node{}, + []string{}, []*endpoint.Endpoint{}, false, }, @@ -2572,6 +2577,7 @@ func TestHeadlessServices(t *testing.T) { []bool{true, true}, false, []v1.Node{}, + []string{}, []*endpoint.Endpoint{ {DNSName: "foo-0.service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.1.1.1"}, RecordTTL: endpoint.TTL(1)}, {DNSName: "foo-1.service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.1.1.2"}, RecordTTL: endpoint.TTL(1)}, @@ -2607,6 +2613,7 @@ func TestHeadlessServices(t *testing.T) { []bool{true, true}, false, []v1.Node{}, + []string{}, []*endpoint.Endpoint{ {DNSName: "foo-0.service.example.org", RecordType: endpoint.RecordTypeAAAA, Targets: endpoint.Targets{"2001:db8::1"}, RecordTTL: endpoint.TTL(1)}, {DNSName: "foo-1.service.example.org", RecordType: endpoint.RecordTypeAAAA, Targets: endpoint.Targets{"2001:db8::2"}, RecordTTL: endpoint.TTL(1)}, @@ -2641,6 +2648,7 @@ func TestHeadlessServices(t *testing.T) { []bool{true, false}, false, []v1.Node{}, + []string{}, []*endpoint.Endpoint{ {DNSName: "foo-0.service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.1.1.1"}}, {DNSName: "service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.1.1.1"}}, @@ -2674,6 +2682,7 @@ func TestHeadlessServices(t *testing.T) { []bool{true, false}, true, []v1.Node{}, + []string{}, []*endpoint.Endpoint{ {DNSName: "foo-0.service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.1.1.1"}}, {DNSName: "foo-1.service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.1.1.2"}}, @@ -2708,6 +2717,7 @@ func TestHeadlessServices(t *testing.T) { []bool{true, true}, false, []v1.Node{}, + []string{}, []*endpoint.Endpoint{ {DNSName: "service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.1.1.1", "1.1.1.2"}}, }, @@ -2740,6 +2750,7 @@ func TestHeadlessServices(t *testing.T) { []bool{true, true, true}, false, []v1.Node{}, + []string{}, []*endpoint.Endpoint{ {DNSName: "service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.1.1.1", "1.1.1.2"}}, }, @@ -2772,6 +2783,7 @@ func TestHeadlessServices(t *testing.T) { []bool{true, true, true}, false, []v1.Node{}, + []string{}, []*endpoint.Endpoint{ {DNSName: "service.example.org", RecordType: endpoint.RecordTypeAAAA, Targets: endpoint.Targets{"2001:db8::1", "2001:db8::2"}}, }, @@ -2806,6 +2818,7 @@ func TestHeadlessServices(t *testing.T) { []bool{true, true, true}, false, []v1.Node{}, + []string{string(v1.ServiceTypeClusterIP)}, []*endpoint.Endpoint{ {DNSName: "service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.2.3.4"}}, }, @@ -2840,6 +2853,7 @@ func TestHeadlessServices(t *testing.T) { []bool{true, true, true}, false, []v1.Node{}, + []string{}, []*endpoint.Endpoint{ {DNSName: "service.example.org", RecordType: endpoint.RecordTypeAAAA, Targets: endpoint.Targets{"2001:db8::4"}}, }, @@ -2884,6 +2898,7 @@ func TestHeadlessServices(t *testing.T) { }, }, }, + []string{}, []*endpoint.Endpoint{ {DNSName: "service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.2.3.4"}}, }, @@ -2932,6 +2947,7 @@ func TestHeadlessServices(t *testing.T) { }, }, }, + []string{}, []*endpoint.Endpoint{ {DNSName: "service.example.org", RecordType: endpoint.RecordTypeAAAA, Targets: endpoint.Targets{"2001:db8::5"}}, }, @@ -2976,6 +2992,7 @@ func TestHeadlessServices(t *testing.T) { }, }, }, + []string{}, []*endpoint.Endpoint{ {DNSName: "service.example.org", RecordType: endpoint.RecordTypeAAAA, Targets: endpoint.Targets{"2001:db8::4"}}, }, @@ -3024,6 +3041,7 @@ func TestHeadlessServices(t *testing.T) { }, }, }, + []string{}, []*endpoint.Endpoint{ {DNSName: "service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.2.3.4"}}, {DNSName: "service.example.org", RecordType: endpoint.RecordTypeAAAA, Targets: endpoint.Targets{"2001:db8::4"}}, @@ -3058,6 +3076,7 @@ func TestHeadlessServices(t *testing.T) { []bool{true, true, true}, false, []v1.Node{}, + []string{}, []*endpoint.Endpoint{ {DNSName: "service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.2.3.4"}}, }, @@ -3091,6 +3110,7 @@ func TestHeadlessServices(t *testing.T) { []bool{true, true, true}, false, []v1.Node{}, + []string{}, []*endpoint.Endpoint{ {DNSName: "service.example.org", RecordType: endpoint.RecordTypeAAAA, Targets: endpoint.Targets{"2001:db8::4"}}, }, @@ -3187,7 +3207,7 @@ func TestHeadlessServices(t *testing.T) { true, false, false, - []string{}, + tc.serviceTypesFilter, tc.ignoreHostnameAnnotation, labels.Everything(), false, @@ -3939,7 +3959,6 @@ func TestHeadlessServicesHostIP(t *testing.T) { t.Run(tc.title, func(t *testing.T) { t.Parallel() - // Create a Kubernetes testing client kubernetes := fake.NewClientset() service := &v1.Service{ @@ -4079,7 +4098,7 @@ func TestExternalServices(t *testing.T) { }, "111.111.111.111", []string{}, - []string{}, + []string{string(v1.ServiceTypeNodePort), string(v1.ServiceTypeExternalName)}, []*endpoint.Endpoint{ {DNSName: "service.example.org", Targets: endpoint.Targets{"111.111.111.111"}, RecordType: endpoint.RecordTypeA}, }, @@ -4121,7 +4140,7 @@ func TestExternalServices(t *testing.T) { }, "remote.example.com", []string{}, - []string{}, + []string{string(v1.ServiceTypeExternalName)}, []*endpoint.Endpoint{ {DNSName: "service.example.org", Targets: endpoint.Targets{"remote.example.com"}, RecordType: endpoint.RecordTypeCNAME}, }, @@ -4316,6 +4335,8 @@ func TestNewServiceSourceInformersEnabled(t *testing.T) { assert.NotNil(t, svc.serviceTypeFilter) assert.False(t, svc.serviceTypeFilter.enabled) assert.NotNil(t, svc.nodeInformer) + assert.NotNil(t, svc.serviceInformer) + assert.NotNil(t, svc.endpointSlicesInformer) }, }, { @@ -4325,17 +4346,49 @@ func TestNewServiceSourceInformersEnabled(t *testing.T) { assert.NotNil(t, svc) assert.NotNil(t, svc.serviceTypeFilter) assert.True(t, svc.serviceTypeFilter.enabled) + assert.NotNil(t, svc.serviceInformer) assert.Nil(t, svc.nodeInformer) + assert.NotNil(t, svc.endpointSlicesInformer) + assert.NotNil(t, svc.podInformer) }, }, { - name: "serviceTypeFilter contains NodePort", - svcFilter: []string{string(v1.ServiceTypeNodePort)}, + name: "serviceTypeFilter contains NodePort and ExternalName", + svcFilter: []string{string(v1.ServiceTypeNodePort), string(v1.ServiceTypeExternalName)}, asserts: func(svc *serviceSource) { assert.NotNil(t, svc) assert.NotNil(t, svc.serviceTypeFilter) assert.True(t, svc.serviceTypeFilter.enabled) + assert.NotNil(t, svc.serviceInformer) assert.NotNil(t, svc.nodeInformer) + assert.NotNil(t, svc.endpointSlicesInformer) + assert.NotNil(t, svc.podInformer) + }, + }, + { + name: "serviceTypeFilter contains ExternalName", + svcFilter: []string{string(v1.ServiceTypeExternalName)}, + asserts: func(svc *serviceSource) { + assert.NotNil(t, svc) + assert.NotNil(t, svc.serviceTypeFilter) + assert.True(t, svc.serviceTypeFilter.enabled) + assert.NotNil(t, svc.serviceInformer) + assert.Nil(t, svc.nodeInformer) + assert.Nil(t, svc.endpointSlicesInformer) + assert.Nil(t, svc.podInformer) + }, + }, + { + name: "serviceTypeFilter contains LoadBalancer", + svcFilter: []string{string(v1.ServiceTypeLoadBalancer)}, + asserts: func(svc *serviceSource) { + assert.NotNil(t, svc) + assert.NotNil(t, svc.serviceTypeFilter) + assert.True(t, svc.serviceTypeFilter.enabled) + assert.NotNil(t, svc.serviceInformer) + assert.Nil(t, svc.nodeInformer) + assert.Nil(t, svc.endpointSlicesInformer) + assert.Nil(t, svc.podInformer) }, }, } @@ -4626,32 +4679,126 @@ func createTestServicesByType(namespace string, typeCounts map[v1.ServiceType]in func TestServiceTypes_isNodeInformerRequired(t *testing.T) { tests := []struct { - name string - filter []string - want bool + name string + filter []string + required []v1.ServiceType + want bool }{ { - name: "NodePort type present", - filter: []string{string(v1.ServiceTypeNodePort)}, - want: true, + name: "NodePort required and filter is empty", + filter: []string{}, + required: []v1.ServiceType{v1.ServiceTypeNodePort}, + want: true, + }, + { + name: "NodePort type present", + filter: []string{string(v1.ServiceTypeNodePort)}, + required: []v1.ServiceType{v1.ServiceTypeNodePort}, + want: true, }, { - name: "NodePort type absent, filter enabled", - filter: []string{string(v1.ServiceTypeLoadBalancer)}, - want: false, + name: "NodePort type absent, filter enabled", + filter: []string{string(v1.ServiceTypeLoadBalancer)}, + required: []v1.ServiceType{v1.ServiceTypeNodePort}, + want: false, }, { - name: "NodePort and other filters present", - filter: []string{string(v1.ServiceTypeLoadBalancer), string(v1.ServiceTypeNodePort)}, - want: true, + name: "NodePort and other filters present", + filter: []string{string(v1.ServiceTypeLoadBalancer), string(v1.ServiceTypeNodePort)}, + required: []v1.ServiceType{v1.ServiceTypeNodePort}, + want: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { filter, _ := newServiceTypesFilter(tt.filter) - got := filter.isNodeInformerRequired() + got := filter.isAllOrRequired(tt.required...) assert.Equal(t, tt.want, got) }) } } + +func TestServiceSource_AddEventHandler(t *testing.T) { + var fakeServiceInformer *informers.FakeServiceInformer + var fakeEdpInformer *informers.FakeEndpointSliceInformer + var fakeNodeInformer *informers.FakeNodeInformer + tests := []struct { + name string + filter []string + times int + asserts func(t *testing.T, s *serviceSource) + }{ + { + name: "AddEventHandler should trigger all event handlers when empty filter is provided", + filter: []string{}, + times: 3, + asserts: func(t *testing.T, s *serviceSource) { + fakeServiceInformer.AssertNumberOfCalls(t, "Informer", 1) + fakeEdpInformer.AssertNumberOfCalls(t, "Informer", 1) + fakeNodeInformer.AssertNumberOfCalls(t, "Informer", 1) + }, + }, + { + name: "AddEventHandler should trigger only service event handler", + filter: []string{string(v1.ServiceTypeExternalName), string(v1.ServiceTypeLoadBalancer)}, + times: 1, + asserts: func(t *testing.T, s *serviceSource) { + fakeServiceInformer.AssertNumberOfCalls(t, "Informer", 1) + fakeEdpInformer.AssertNumberOfCalls(t, "Informer", 0) + fakeNodeInformer.AssertNumberOfCalls(t, "Informer", 0) + }, + }, + { + name: "AddEventHandler should configure only service event handler", + filter: []string{string(v1.ServiceTypeExternalName), string(v1.ServiceTypeLoadBalancer), string(v1.ServiceTypeClusterIP)}, + times: 2, + asserts: func(t *testing.T, s *serviceSource) { + fakeServiceInformer.AssertNumberOfCalls(t, "Informer", 1) + fakeEdpInformer.AssertNumberOfCalls(t, "Informer", 1) + fakeNodeInformer.AssertNumberOfCalls(t, "Informer", 0) + }, + }, + { + name: "AddEventHandler should configure all service event handlers", + filter: []string{string(v1.ServiceTypeNodePort)}, + times: 3, + asserts: func(t *testing.T, s *serviceSource) { + fakeServiceInformer.AssertNumberOfCalls(t, "Informer", 1) + fakeEdpInformer.AssertNumberOfCalls(t, "Informer", 1) + fakeNodeInformer.AssertNumberOfCalls(t, "Informer", 1) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeServiceInformer = new(informers.FakeServiceInformer) + infSvc := testInformer{} + fakeServiceInformer.On("Informer").Return(&infSvc) + + fakeEdpInformer = new(informers.FakeEndpointSliceInformer) + infEdp := testInformer{} + fakeEdpInformer.On("Informer").Return(&infEdp) + + fakeNodeInformer = new(informers.FakeNodeInformer) + infNode := testInformer{} + fakeNodeInformer.On("Informer").Return(&infNode) + + filter, _ := newServiceTypesFilter(tt.filter) + + svcSource := &serviceSource{ + endpointSlicesInformer: fakeEdpInformer, + serviceInformer: fakeServiceInformer, + nodeInformer: fakeNodeInformer, + serviceTypeFilter: filter, + listenEndpointEvents: true, + } + + svcSource.AddEventHandler(t.Context(), func() {}) + + assert.Equal(t, tt.times, infSvc.times+infEdp.times+infNode.times) + + tt.asserts(t, svcSource) + }) + } +} From cce0ba9839f5169eac0e03ad63e37dd4b042d505 Mon Sep 17 00:00:00 2001 From: ivan katliarchuk Date: Mon, 14 Jul 2025 12:55:58 +0100 Subject: [PATCH 2/4] fix(source/service): disable pod and endpointSlicesInformer when not required Signed-off-by: ivan katliarchuk --- source/service.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/service.go b/source/service.go index eb7e174e1b..87a73cadcf 100644 --- a/source/service.go +++ b/source/service.go @@ -833,6 +833,8 @@ func (sc *serviceTypes) isProcessed(serviceType v1.ServiceType) bool { return !sc.enabled || sc.types[serviceType] } +// isAllOrRequired returns true if service type filtering is disabled or if any of the provided service types are present in the filter. +// If no options are provided, it returns true. func (sc *serviceTypes) isAllOrRequired(opts ...v1.ServiceType) bool { if len(opts) == 0 || !sc.enabled { return true From e3ad397621ddc6a5eb0b62e97db1dec6f4b4907f Mon Sep 17 00:00:00 2001 From: ivan katliarchuk Date: Mon, 21 Jul 2025 14:13:15 +0100 Subject: [PATCH 3/4] fix(source/service): disable pod and endpointSlices informers when they are not needed Signed-off-by: ivan katliarchuk --- source/informers/{fake_informers.go => fake.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename source/informers/{fake_informers.go => fake.go} (100%) diff --git a/source/informers/fake_informers.go b/source/informers/fake.go similarity index 100% rename from source/informers/fake_informers.go rename to source/informers/fake.go From 87bd583c34d75a38b0f02896b08ce5182e86c47a Mon Sep 17 00:00:00 2001 From: ivan katliarchuk Date: Tue, 29 Jul 2025 13:37:33 +0100 Subject: [PATCH 4/4] fix(source/service): disable pod and endpointSlices informers when they are not needed Signed-off-by: ivan katliarchuk --- source/service.go | 12 ++++++------ source/service_test.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/source/service.go b/source/service.go index 87a73cadcf..344a8569f5 100644 --- a/source/service.go +++ b/source/service.go @@ -108,7 +108,7 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name var endpointSlicesInformer discoveryinformers.EndpointSliceInformer var podInformer coreinformers.PodInformer - if sTypesFilter.isAllOrRequired(v1.ServiceTypeNodePort, v1.ServiceTypeClusterIP) { + if sTypesFilter.isRequired(v1.ServiceTypeNodePort, v1.ServiceTypeClusterIP) { endpointSlicesInformer = informerFactory.Discovery().V1().EndpointSlices() podInformer = informerFactory.Core().V1().Pods() @@ -137,7 +137,7 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name } var nodeInformer coreinformers.NodeInformer - if sTypesFilter.isAllOrRequired(v1.ServiceTypeNodePort) { + if sTypesFilter.isRequired(v1.ServiceTypeNodePort) { nodeInformer = informerFactory.Core().V1().Nodes() _, _ = nodeInformer.Informer().AddEventHandler(informers.DefaultEventHandler()) } @@ -793,10 +793,10 @@ func (sc *serviceSource) AddEventHandler(_ context.Context, handler func()) { // Right now there is no way to remove event handler from informer, see: // https://github.com/kubernetes/kubernetes/issues/79610 _, _ = sc.serviceInformer.Informer().AddEventHandler(eventHandlerFunc(handler)) - if sc.listenEndpointEvents && sc.serviceTypeFilter.isAllOrRequired(v1.ServiceTypeNodePort, v1.ServiceTypeClusterIP) { + if sc.listenEndpointEvents && sc.serviceTypeFilter.isRequired(v1.ServiceTypeNodePort, v1.ServiceTypeClusterIP) { _, _ = sc.endpointSlicesInformer.Informer().AddEventHandler(eventHandlerFunc(handler)) } - if sc.serviceTypeFilter.isAllOrRequired(v1.ServiceTypeNodePort) { + if sc.serviceTypeFilter.isRequired(v1.ServiceTypeNodePort) { _, _ = sc.nodeInformer.Informer().AddEventHandler(eventHandlerFunc(handler)) } } @@ -833,9 +833,9 @@ func (sc *serviceTypes) isProcessed(serviceType v1.ServiceType) bool { return !sc.enabled || sc.types[serviceType] } -// isAllOrRequired returns true if service type filtering is disabled or if any of the provided service types are present in the filter. +// isRequired returns true if service type filtering is disabled or if any of the provided service types are present in the filter. // If no options are provided, it returns true. -func (sc *serviceTypes) isAllOrRequired(opts ...v1.ServiceType) bool { +func (sc *serviceTypes) isRequired(opts ...v1.ServiceType) bool { if len(opts) == 0 || !sc.enabled { return true } diff --git a/source/service_test.go b/source/service_test.go index a265b4522b..57bd71f48d 100644 --- a/source/service_test.go +++ b/source/service_test.go @@ -4713,7 +4713,7 @@ func TestServiceTypes_isNodeInformerRequired(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { filter, _ := newServiceTypesFilter(tt.filter) - got := filter.isAllOrRequired(tt.required...) + got := filter.isRequired(tt.required...) assert.Equal(t, tt.want, got) }) }