Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions source/endpoint_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/external-dns/source/informers"

v1alpha3 "istio.io/api/networking/v1alpha3"
istiov1a "istio.io/client-go/pkg/apis/networking/v1"
Expand Down Expand Up @@ -85,21 +86,16 @@ func BenchmarkEndpointTargetsFromServicesHighIterateOverGateways(b *testing.B) {
}
}

// helperToPopulateFakeClientWithServices populates a fake Kubernetes client with a specified services.
// svcInformerWithServices populates a fake Kubernetes client with a specified services.
func svcInformerWithServices(toLookup, underTest int) (coreinformers.ServiceInformer, error) {
client := fake.NewClientset()
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(client, 0, kubeinformers.WithNamespace("default"))
svcInformer := informerFactory.Core().V1().Services()
ctx := context.Background()

_, err := svcInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
},
},
)
err := informers.ServiceWithDefaultOptions(svcInformer, "default")
if err != nil {
return nil, fmt.Errorf("failed to add event handler: %w", err)
return nil, fmt.Errorf("failed to set default options for service informer: %w", err)
}

services := fixturesSvcWithLabels(toLookup, underTest)
Expand Down
20 changes: 17 additions & 3 deletions source/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import (
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
coreinformers "k8s.io/client-go/informers/core/v1"

Check failure on line 21 in source/endpoints.go

View workflow job for this annotation

GitHub Actions / Markdown, Go and OAS

File is not properly formatted (goimports)
"sigs.k8s.io/external-dns/source/informers"

"sigs.k8s.io/external-dns/endpoint"
)
Expand Down Expand Up @@ -84,14 +86,26 @@
func EndpointTargetsFromServices(svcInformer coreinformers.ServiceInformer, namespace string, selector map[string]string) (endpoint.Targets, error) {
targets := endpoint.Targets{}

services, err := svcInformer.Lister().Services(namespace).List(labels.Everything())
var indexName string
var indexedValue string

if namespace == "" {
indexName = informers.SvcSpecSelectorIndex
indexedValue = informers.ToSHA(labels.Set(selector).String())
} else {
indexName = informers.SvcNamespaceSpecSelectorIndex
indexedValue = informers.ToSHA(namespace + "/" + labels.Set(selector).String())
}

services, err := svcInformer.Informer().GetIndexer().ByIndex(indexName, indexedValue)

if err != nil {
return nil, fmt.Errorf("failed to list labels for services in namespace %q: %w", namespace, err)
}

for _, service := range services {
if !MatchesServiceSelector(selector, service.Spec.Selector) {
for _, svc := range services {
service, ok := svc.(*corev1.Service)
if !ok {
continue
}

Expand Down
10 changes: 6 additions & 4 deletions source/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/external-dns/source/informers"

"sigs.k8s.io/external-dns/endpoint"
)
Expand Down Expand Up @@ -244,15 +245,16 @@ func TestEndpointTargetsFromServices(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client := fake.NewClientset()
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(client, 0,
kubeinformers.WithNamespace(tt.namespace))
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(client, 0, kubeinformers.WithNamespace(tt.namespace))
serviceInformer := informerFactory.Core().V1().Services()
err := informers.ServiceWithDefaultOptions(serviceInformer, tt.namespace)
assert.NoError(t, err)

for _, svc := range tt.services {
_, err := client.CoreV1().Services(tt.namespace).Create(context.Background(), svc, metav1.CreateOptions{})
err = serviceInformer.Informer().GetIndexer().Add(svc)
assert.NoError(t, err)

err = serviceInformer.Informer().GetIndexer().Add(svc)
_, err := client.CoreV1().Services(tt.namespace).Create(context.Background(), svc, metav1.CreateOptions{})
assert.NoError(t, err)
}

Expand Down
103 changes: 103 additions & 0 deletions source/informers/service_indexers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package informers

import (
"crypto/sha1"
"encoding/hex"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
)

const (
SvcSpecSelectorIndex = "spec.selector"
SvcNamespaceSpecSelectorIndex = "namespace/spec.selector"
)

var (
// ServiceIndexers of indexers to allow fast lookups of services by their spec.selector.
// This indexer is used to find services that match a specific label selector.
// Usage:
// serviceInformer.Informer().AddIndexers(ServiceIndexers)
// serviceInformer.Lister().ByIndex(SvcSpecSelectorIndex, ToSHA(labels.Set(selector).String()))
ServiceIndexers = cache.Indexers{
SvcSpecSelectorIndex: func(obj any) ([]string, error) {
svc, ok := obj.(*corev1.Service)
if !ok {
return nil, nil
}
return []string{ToSHA(labels.Set(svc.Spec.Selector).String())}, nil
},
}
// ServiceNsSelectorIndexers for namespace/spec.selector to allow fast lookups of services by their spec.selector and namespace.
// Usage:
// serviceInformer.Informer().AddIndexers(ServiceNsSelectorIndexers)
// serviceInformer.Lister().ByIndex(SvcNamespaceSpecSelectorIndex, ToSHA(svc.Namespace + "/" + labels.Set(svc.Spec.Selector).String()))
ServiceNsSelectorIndexers = cache.Indexers{
SvcNamespaceSpecSelectorIndex: func(obj any) ([]string, error) {
svc, ok := obj.(*corev1.Service)
if !ok {
return nil, nil
}
return []string{ToSHA(svc.Namespace + "/" + labels.Set(svc.Spec.Selector).String())}, nil
},
}
)

func ServiceWithDefaultOptions(serviceInformer corev1informers.ServiceInformer, namespace string) error {
_, _ = serviceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
if !log.IsLevelEnabled(log.DebugLevel) {
return
}
u, ok := obj.(*unstructured.Unstructured)
if !ok {
log.Debugf("%s added", u.GetKind())
} else {
log.WithFields(log.Fields{
"apiVersion": u.GetAPIVersion(),
"kind": u.GetKind(),
"name": u.GetName(),
"namespace": u.GetNamespace(),
}).Info("added")
}
},
},
)

if namespace == "" {
return serviceInformer.Informer().AddIndexers(ServiceIndexers)
}
return serviceInformer.Informer().AddIndexers(ServiceNsSelectorIndexers)
}

// ToSHA returns the SHA1 hash of the input string as a hex string.
// Using a SHA1 hash of the label selector string (as in ToSHA(labels.Set(selector).String())) is useful:
// - It provides a consistent and compact representation of the selector.
// - It allows for efficient indexing and lookup in Kubernetes informers.
// - It avoids issues with long label selector strings that could exceed index length limits.
func ToSHA(s string) string {
h := sha1.New()
h.Write([]byte(s))
return hex.EncodeToString(h.Sum(nil))
}
114 changes: 114 additions & 0 deletions source/informers/service_indexers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package informers

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)

func TestServiceIndexers_SpecSelectorIndex(t *testing.T) {
svc := &corev1.Service{}
svc.Spec.Selector = map[string]string{"app": "nginx", "env": "prod"}

indexFunc := ServiceIndexers[SvcSpecSelectorIndex]
indexKeys, err := indexFunc(svc)
expectedKey := ToSHA(labels.Set(svc.Spec.Selector).String())

assert.NoError(t, err)
assert.Len(t, indexKeys, 1)
assert.Equal(t, expectedKey, indexKeys[0])
}

func TestServiceNsSelectorIndexers_NamespaceSpecSelectorIndex(t *testing.T) {
svc := &corev1.Service{}
svc.Namespace = "test-ns"
svc.Spec.Selector = map[string]string{"app": "nginx", "env": "prod"}

indexFunc := ServiceNsSelectorIndexers[SvcNamespaceSpecSelectorIndex]
indexKeys, err := indexFunc(svc)

expectedKey := ToSHA(svc.Namespace + "/" + labels.Set(svc.Spec.Selector).String())

assert.NoError(t, err)
assert.Len(t, indexKeys, 1)
assert.Equal(t, expectedKey, indexKeys[0])
}

func TestServiceWithDefaultOptions_AddsCorrectIndexers(t *testing.T) {
tests := []struct {
name string
namespace string
want cache.Indexers
}{
{"empty namespace", "", ServiceIndexers},
{"non-empty namespace", "ns", ServiceNsSelectorIndexers},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockInf := &mockInformer{}

si := &fakeSvcInformer{
informer: mockInf,
}
err := ServiceWithDefaultOptions(si, tt.namespace)
assert.NoError(t, err)
assert.Equal(t, 1, mockInf.addedIndexersTimes, "AddIndexers should be called once")
assert.Equal(t, 1, mockInf.addedHandlerTimes, "AddEventHandler should be called once")
assert.Equal(t, tt.want, mockInf.addedIndexers)
})
}
}

// mocks
type fakeSvcInformer struct {
mock.Mock
informer *mockInformer
}

func (f *fakeSvcInformer) Informer() cache.SharedIndexInformer {
return f.informer
}

func (f *fakeSvcInformer) Lister() corev1listers.ServiceLister {
return nil
}

// mockInformer implements the minimal methods needed for testing.
type mockInformer struct {
cache.SharedIndexInformer
addedIndexers cache.Indexers
addedIndexersTimes int
addedHandlerTimes int
}

func (m *mockInformer) AddEventHandler(_ cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
m.addedHandlerTimes++
return nil, nil

Check failure on line 107 in source/informers/service_indexers_test.go

View workflow job for this annotation

GitHub Actions / Markdown, Go and OAS

return both a `nil` error and an invalid value: use a sentinel error instead (nilnil)
}

func (m *mockInformer) AddIndexers(indexers cache.Indexers) error {
m.addedIndexers = indexers
m.addedIndexersTimes++
return nil
}
25 changes: 11 additions & 14 deletions source/istio_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,17 @@ func NewIstioGatewaySource(
// Use shared informers to listen for add/update/delete of services/pods/nodes in the specified namespace.
// Set resync period to 0, to prevent processing when nothing has changed
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
serviceInformer := informerFactory.Core().V1().Services()

istioInformerFactory := istioinformers.NewSharedInformerFactory(istioClient, 0)
gatewayInformer := istioInformerFactory.Networking().V1alpha3().Gateways()

// Add default resource event handlers to properly initialize informer.
serviceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("service added")
},
},
)
serviceInformer := informerFactory.Core().V1().Services()
err = informers.ServiceWithDefaultOptions(serviceInformer, namespace)
if err != nil {
return nil, err
}

gatewayInformer.Informer().AddEventHandler(
_, _ = gatewayInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("gateway added")
Expand All @@ -104,10 +101,10 @@ func NewIstioGatewaySource(
istioInformerFactory.Start(ctx.Done())

// wait for the local cache to be populated.
if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil {
return nil, err
}
if err := informers.WaitForCacheSync(context.Background(), istioInformerFactory); err != nil {
if err := informers.WaitForCacheSync(ctx, istioInformerFactory); err != nil {
return nil, err
}

Expand Down Expand Up @@ -195,10 +192,10 @@ func (sc *gatewaySource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, e
}

// AddEventHandler adds an event handler that should be triggered if the watched Istio Gateway changes.
func (sc *gatewaySource) AddEventHandler(ctx context.Context, handler func()) {
func (sc *gatewaySource) AddEventHandler(_ context.Context, handler func()) {
log.Debug("Adding event handler for Istio Gateway")

sc.gatewayInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
_, _ = sc.gatewayInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
}

// filterByAnnotations filters a list of configs by a given annotation selector.
Expand Down
Loading
Loading