Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions source/endpoint_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/external-dns/source/informers"

v1alpha3 "istio.io/api/networking/v1alpha3"
istiov1a "istio.io/client-go/pkg/apis/networking/v1"
Expand Down Expand Up @@ -92,14 +93,9 @@ func svcInformerWithServices(toLookup, underTest int) (coreinformers.ServiceInfo
svcInformer := informerFactory.Core().V1().Services()
ctx := context.Background()

_, err := svcInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
},
},
)
err := svcInformer.Informer().AddIndexers(informers.IndexerSpecSelector[*corev1.Service]())
if err != nil {
return nil, fmt.Errorf("failed to add event handler: %w", err)
return nil, fmt.Errorf("failed to add indexer: %w", err)
}

services := fixturesSvcWithLabels(toLookup, underTest)
Expand Down
9 changes: 6 additions & 3 deletions source/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package source
import (
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
coreinformers "k8s.io/client-go/informers/core/v1"

"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/source/informers"
)

// EndpointsForHostname returns the endpoint objects for each host-target combination.
Expand Down Expand Up @@ -84,14 +86,15 @@ func EndpointsForHostname(hostname string, targets endpoint.Targets, ttl endpoin
func EndpointTargetsFromServices(svcInformer coreinformers.ServiceInformer, namespace string, selector map[string]string) (endpoint.Targets, error) {
targets := endpoint.Targets{}

services, err := svcInformer.Lister().Services(namespace).List(labels.Everything())
services, err := svcInformer.Informer().GetIndexer().ByIndex(informers.IndexWithSpecSelector, informers.ToSHA(labels.Set(selector).String()))

if err != nil {
return nil, fmt.Errorf("failed to list labels for services in namespace %q: %w", namespace, err)
}

for _, service := range services {
if !MatchesServiceSelector(selector, service.Spec.Selector) {
for _, svc := range services {
service, ok := svc.(*corev1.Service)
if !ok {
continue
}

Expand Down
3 changes: 3 additions & 0 deletions source/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/external-dns/source/informers"

"sigs.k8s.io/external-dns/endpoint"
)
Expand Down Expand Up @@ -247,6 +248,8 @@ func TestEndpointTargetsFromServices(t *testing.T) {
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(client, 0,
kubeinformers.WithNamespace(tt.namespace))
serviceInformer := informerFactory.Core().V1().Services()
err := serviceInformer.Informer().AddIndexers(informers.IndexerSpecSelector[*corev1.Service]())
assert.NoError(t, err)

for _, svc := range tt.services {
_, err := client.CoreV1().Services(tt.namespace).Create(context.Background(), svc, metav1.CreateOptions{})
Expand Down
49 changes: 49 additions & 0 deletions source/informers/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ package informers

import (
"github.com/stretchr/testify/mock"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
corev1lister "k8s.io/client-go/listers/core/v1"
discoveryv1lister "k8s.io/client-go/listers/discovery/v1"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -58,3 +61,49 @@ func (f *FakeNodeInformer) Informer() cache.SharedIndexInformer {
func (f *FakeNodeInformer) Lister() corev1lister.NodeLister {
return corev1lister.NewNodeLister(f.Informer().GetIndexer())
}

func fakeService() *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-service",
Namespace: "ns",
Labels: map[string]string{"env": "prod", "team": "devops"},
Annotations: map[string]string{"description": "Enriched service object"},
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{"app": "demo"},
ExternalIPs: []string{"1.2.3.4"},
Ports: []corev1.ServicePort{
{
Name: "http",
Port: 80,
TargetPort: intstr.FromInt32(8080),
Protocol: corev1.ProtocolTCP,
},
{
Name: "https",
Port: 443,
TargetPort: intstr.FromInt32(8443),
Protocol: corev1.ProtocolTCP,
},
},
Type: corev1.ServiceTypeLoadBalancer,
},
Status: corev1.ServiceStatus{
LoadBalancer: corev1.LoadBalancerStatus{
Ingress: []corev1.LoadBalancerIngress{
{IP: "5.6.7.8", Hostname: "lb.example.com"},
},
},
Conditions: []metav1.Condition{
{
Type: "Available",
Status: metav1.ConditionTrue,
Reason: "MinimumReplicasAvailable",
Message: "Service is available",
LastTransitionTime: metav1.Now(),
},
},
},
}
}
21 changes: 20 additions & 1 deletion source/informers/indexers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package informers
import (
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -25,7 +26,8 @@ import (
)

const (
IndexWithSelectors = "withSelectors"
IndexWithSelectors = "withSelectors"
IndexWithSpecSelector = "spec.selector"
)

type IndexSelectorOptions struct {
Expand Down Expand Up @@ -95,6 +97,23 @@ func IndexerWithOptions[T metav1.Object](optFns ...func(options *IndexSelectorOp
}
}

// IndexerSpecSelector returns a cache.Indexers map that indexes Kubernetes Service objects
// by a hash of their .spec.selector field. This enables efficient lookups of Services
// sharing the same selector. The function is generic over metav1.Object, but only operates
// on *corev1.Service objects at the moment. If the object is not a Service, it does not index.
func IndexerSpecSelector[T metav1.Object]() cache.Indexers {
return cache.Indexers{
IndexWithSpecSelector: func(obj interface{}) ([]string, error) {
entity, ok := obj.(*corev1.Service)
if !ok {
return nil, nil
}
key := ToSHA(labels.Set(entity.Spec.Selector).String())
return []string{key}, nil
},
}
}

// GetByKey retrieves an object of type T (metav1.Object) from the given cache.Indexer by its key.
// It returns the object and an error if the retrieval or type assertion fails.
// If the object does not exist, it returns the zero value of T and nil.
Expand Down
31 changes: 31 additions & 0 deletions source/informers/indexers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,34 @@ func TestGetByKey_TypeAssertionFailure(t *testing.T) {
assert.Contains(t, err.Error(), "object is not of type")
assert.Nil(t, result)
}

func TestIndexerSpecSelector_Service(t *testing.T) {
indexers := IndexerSpecSelector[*corev1.Service]()
svc := &corev1.Service{}
svc.Spec.Selector = map[string]string{"app": "demo", "tier": "backend"}

keys, err := indexers[IndexWithSpecSelector](svc)
assert.NoError(t, err)
expected := ToSHA(labels.Set(svc.Spec.Selector).String())
assert.Equal(t, []string{expected}, keys)
}

func TestIndexerSpecSelector_NonService(t *testing.T) {
indexers := IndexerSpecSelector[*corev1.Service]()
obj := "not-a-service"

keys, err := indexers[IndexWithSpecSelector](obj)
assert.NoError(t, err)
assert.Nil(t, keys)
}

func TestIndexerSpecSelector_EmptySelector(t *testing.T) {
indexers := IndexerSpecSelector[*corev1.Service]()
svc := &corev1.Service{}
svc.Spec.Selector = map[string]string{}

keys, err := indexers[IndexWithSpecSelector](svc)
assert.NoError(t, err)
expected := ToSHA(labels.Set(svc.Spec.Selector).String())
assert.Equal(t, []string{expected}, keys)
}
80 changes: 80 additions & 0 deletions source/informers/transformers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package informers

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
)

type TransformOptions struct {
specSelector bool
specExternalIps bool
statusLb bool
}

func TransformerWithOptions[T metav1.Object](optFns ...func(options *TransformOptions)) cache.TransformFunc {
options := TransformOptions{}
for _, fn := range optFns {
fn(&options)
}
return func(obj any) (any, error) {
// only transform if the object is a Service at the moment
entity, ok := obj.(*corev1.Service)
if !ok {
return nil, nil
}
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: entity.Name,
Namespace: entity.Namespace,
DeletionTimestamp: entity.DeletionTimestamp,
},
Spec: corev1.ServiceSpec{},
Status: corev1.ServiceStatus{},
}
if options.specSelector {
svc.Spec.Selector = entity.Spec.Selector
}
if options.specExternalIps {
svc.Spec.ExternalIPs = entity.Spec.ExternalIPs
}
if options.statusLb {
svc.Status.LoadBalancer = entity.Status.LoadBalancer
}
return svc, nil
}
}

// TransformWithSpecSelector enables copying the Service's .spec.selector field.
func TransformWithSpecSelector() func(options *TransformOptions) {
return func(options *TransformOptions) {
options.specSelector = true
}
}

// TransformWithSpecExternalIPs enables copying the Service's .spec.externalIPs field.
func TransformWithSpecExternalIPs() func(options *TransformOptions) {
return func(options *TransformOptions) {
options.specExternalIps = true
}
}

// TransformWithStatusLoadBalancer enables copying the Service's .status.loadBalancer field.
func TransformWithStatusLoadBalancer() func(options *TransformOptions) {
return func(options *TransformOptions) {
options.statusLb = true
}
}
Loading
Loading