Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion charts/external-dns/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ rules:
{{- end }}
{{- if or (has "service" .Values.sources) (has "contour-httpproxy" .Values.sources) (has "gloo-proxy" .Values.sources) (has "istio-gateway" .Values.sources) (has "istio-virtualservice" .Values.sources) (has "openshift-route" .Values.sources) (has "skipper-routegroup" .Values.sources) }}
- apiGroups: [""]
resources: ["services","endpoints"]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: ["discovery.k8s.io"]
resources: ["endpointslices"]
verbs: ["get","watch","list"]
{{- end }}
{{- if or (has "ingress" .Values.sources) (has "istio-gateway" .Values.sources) (has "istio-virtualservice" .Values.sources) (has "contour-httpproxy" .Values.sources) (has "openshift-route" .Values.sources) (has "skipper-routegroup" .Values.sources) }}
Expand Down
5 changes: 4 additions & 1 deletion charts/external-dns/tests/rbac_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ tests:
resources: ["pods"]
verbs: ["get", "watch", "list"]
- apiGroups: [""]
resources: ["services","endpoints"]
resources: ["services"]
verbs: ["get","watch","list"]
- apiGroups: ["discovery.k8s.io"]
resources: ["endpointslices"]
verbs: ["get","watch","list"]
- apiGroups: ["extensions","networking.k8s.io"]
resources: ["ingresses"]
Expand Down
2 changes: 1 addition & 1 deletion docs/flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
| `--kubeconfig=""` | Retrieve target cluster configuration from a Kubernetes configuration file (default: auto-detect) |
| `--request-timeout=30s` | Request timeout when calling Kubernetes APIs. 0s means no timeout |
| `--[no-]resolve-service-load-balancer-hostname` | Resolve the hostname of LoadBalancer-type Service object to IP addresses in order to create DNS A/AAAA records instead of CNAMEs |
| `--[no-]listen-endpoint-events` | Trigger a reconcile on changes to Endpoints, for Service source (default: false) |
| `--[no-]listen-endpoint-events` | Trigger a reconcile on changes to EndpointSlices, for Service source (default: false) |
| `--cf-api-endpoint=""` | The fully-qualified domain name of the cloud foundry instance you are targeting |
| `--cf-username=""` | The username to log into the cloud foundry API |
| `--cf-password=""` | The password to log into the cloud foundry API |
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/externaldns/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func App(cfg *Config) *kingpin.Application {
app.Flag("kubeconfig", "Retrieve target cluster configuration from a Kubernetes configuration file (default: auto-detect)").Default(defaultConfig.KubeConfig).StringVar(&cfg.KubeConfig)
app.Flag("request-timeout", "Request timeout when calling Kubernetes APIs. 0s means no timeout").Default(defaultConfig.RequestTimeout.String()).DurationVar(&cfg.RequestTimeout)
app.Flag("resolve-service-load-balancer-hostname", "Resolve the hostname of LoadBalancer-type Service object to IP addresses in order to create DNS A/AAAA records instead of CNAMEs").BoolVar(&cfg.ResolveServiceLoadBalancerHostname)
app.Flag("listen-endpoint-events", "Trigger a reconcile on changes to Endpoints, for Service source (default: false)").BoolVar(&cfg.ListenEndpointEvents)
app.Flag("listen-endpoint-events", "Trigger a reconcile on changes to EndpointSlices, for Service source (default: false)").BoolVar(&cfg.ListenEndpointEvents)

// Flags related to cloud foundry
app.Flag("cf-api-endpoint", "The fully-qualified domain name of the cloud foundry instance you are targeting").Default(defaultConfig.CFAPIEndpoint).StringVar(&cfg.CFAPIEndpoint)
Expand Down
100 changes: 79 additions & 21 deletions source/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ import (

log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
discoveryinformers "k8s.io/client-go/informers/discovery/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

Expand All @@ -50,6 +53,7 @@ var (
v1.ServiceTypeLoadBalancer: {}, // Exposes the service externally using a cloud provider's load balancer.
v1.ServiceTypeExternalName: {}, // Maps the service to an external DNS name.
}
serviceNameIndexKey = "serviceName"
)

// serviceSource is an implementation of Source for Kubernetes service objects.
Expand All @@ -72,7 +76,7 @@ type serviceSource struct {
resolveLoadBalancerHostname bool
listenEndpointEvents bool
serviceInformer coreinformers.ServiceInformer
endpointsInformer coreinformers.EndpointsInformer
endpointSlicesInformer discoveryinformers.EndpointSliceInformer
podInformer coreinformers.PodInformer
nodeInformer coreinformers.NodeInformer
serviceTypeFilter *serviceTypes
Expand All @@ -93,7 +97,7 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
// Set the resync period to 0 to prevent processing when nothing has changed
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
serviceInformer := informerFactory.Core().V1().Services()
endpointsInformer := informerFactory.Core().V1().Endpoints()
endpointSlicesInformer := informerFactory.Discovery().V1().EndpointSlices()
podInformer := informerFactory.Core().V1().Pods()
nodeInformer := informerFactory.Core().V1().Nodes()

Expand All @@ -104,7 +108,7 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
},
},
)
endpointsInformer.Informer().AddEventHandler(
endpointSlicesInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
},
Expand All @@ -123,6 +127,26 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
},
)

// Add an indexer to the EndpointSlice informer to index by the service name label
err = endpointSlicesInformer.Informer().AddIndexers(cache.Indexers{
serviceNameIndexKey: func(obj any) ([]string, error) {
endpointSlice, ok := obj.(*discoveryv1.EndpointSlice)
if !ok {
// This should never happen because the Informer should only contain EndpointSlice objects
return nil, fmt.Errorf("expected %T but got %T instead", endpointSlice, obj)
}
serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName]
if serviceName == "" {
return nil, nil
}
key := types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}.String()
return []string{key}, nil
},
})
if err != nil {
return nil, err
}

informerFactory.Start(ctx.Done())

// wait for the local cache to be populated.
Expand All @@ -148,7 +172,7 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
publishHostIP: publishHostIP,
alwaysPublishNotReadyAddresses: alwaysPublishNotReadyAddresses,
serviceInformer: serviceInformer,
endpointsInformer: endpointsInformer,
endpointSlicesInformer: endpointSlicesInformer,
podInformer: podInformer,
nodeInformer: nodeInformer,
serviceTypeFilter: sTypesFilter,
Expand Down Expand Up @@ -278,42 +302,63 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
return nil
}

endpointsObject, err := sc.endpointsInformer.Lister().Endpoints(svc.Namespace).Get(svc.GetName())
serviceKey := cache.ObjectName{Namespace: svc.Namespace, Name: svc.Name}.String()
rawEndpointSlices, err := sc.endpointSlicesInformer.Informer().GetIndexer().ByIndex(serviceNameIndexKey, serviceKey)
if err != nil {
log.Errorf("Get endpoints of service[%s] error:%v", svc.GetName(), err)
return endpoints
// Should never happen as long as the index exists
log.Errorf("Get EndpointSlices of service[%s] error:%v", svc.GetName(), err)
return nil
}

endpointSlices := make([]*discoveryv1.EndpointSlice, 0, len(rawEndpointSlices))
for _, obj := range rawEndpointSlices {
endpointSlice, ok := obj.(*discoveryv1.EndpointSlice)
if !ok {
// Should never happen as the indexer can only contain EndpointSlice objects
log.Errorf("Expected %T but got %T instead, skipping", endpointSlice, obj)
continue
}
endpointSlices = append(endpointSlices, endpointSlice)
}

pods, err := sc.podInformer.Lister().Pods(svc.Namespace).List(selector)
if err != nil {
log.Errorf("List pods of service[%s] error: %v", svc.GetName(), err)
log.Errorf("List Pods of service[%s] error:%v", svc.GetName(), err)
return endpoints
}

endpointsType := getEndpointsTypeFromAnnotations(svc.Annotations)
publishPodIPs := endpointsType != EndpointsTypeNodeExternalIP && endpointsType != EndpointsTypeHostIP && !sc.publishHostIP
publishNotReadyAddresses := svc.Spec.PublishNotReadyAddresses || sc.alwaysPublishNotReadyAddresses

targetsByHeadlessDomainAndType := make(map[endpoint.EndpointKey]endpoint.Targets)
for _, subset := range endpointsObject.Subsets {
addresses := subset.Addresses
if svc.Spec.PublishNotReadyAddresses || sc.alwaysPublishNotReadyAddresses {
addresses = append(addresses, subset.NotReadyAddresses...)
}
for _, endpointSlice := range endpointSlices {
for _, ep := range endpointSlice.Endpoints {
if !conditionToBool(ep.Conditions.Ready) && !publishNotReadyAddresses {
continue
}

if publishPodIPs &&
endpointSlice.AddressType != discoveryv1.AddressTypeIPv4 &&
endpointSlice.AddressType != discoveryv1.AddressTypeIPv6 {
log.Debugf("Skipping EndpointSlice %s/%s because its address type is unsupported: %s", endpointSlice.Namespace, endpointSlice.Name, endpointSlice.AddressType)
continue
}

for _, address := range addresses {
// find pod for this address
if address.TargetRef == nil || address.TargetRef.APIVersion != "" || address.TargetRef.Kind != "Pod" {
log.Debugf("Skipping address because its target is not a pod: %v", address)
if ep.TargetRef == nil || ep.TargetRef.APIVersion != "" || ep.TargetRef.Kind != "Pod" {
log.Debugf("Skipping address because its target is not a pod: %v", ep)
continue
}
var pod *v1.Pod
for _, v := range pods {
if v.Name == address.TargetRef.Name {
if v.Name == ep.TargetRef.Name {
pod = v
break
}
}
if pod == nil {
log.Errorf("Pod %s not found for address %v", address.TargetRef.Name, address)
log.Errorf("Pod %s not found for address %v", ep.TargetRef.Name, ep)
continue
}

Expand Down Expand Up @@ -341,8 +386,13 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
targets = endpoint.Targets{pod.Status.HostIP}
log.Debugf("Generating matching endpoint %s with HostIP %s", headlessDomain, pod.Status.HostIP)
} else {
targets = endpoint.Targets{address.IP}
log.Debugf("Generating matching endpoint %s with EndpointAddress IP %s", headlessDomain, address.IP)
if len(ep.Addresses) == 0 {
log.Warnf("EndpointSlice %s/%s has no addresses for endpoint %v", endpointSlice.Namespace, endpointSlice.Name, ep)
continue
}
address := ep.Addresses[0] // Only use the first address, as additional addresses have no semantic defined
targets = endpoint.Targets{address}
log.Debugf("Generating matching endpoint %s with EndpointSliceAddress IP %s", headlessDomain, address)
}
}
for _, target := range targets {
Expand Down Expand Up @@ -758,7 +808,7 @@ func (sc *serviceSource) AddEventHandler(_ context.Context, handler func()) {
// https://github.com/kubernetes/kubernetes/issues/79610
sc.serviceInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
if sc.listenEndpointEvents {
sc.endpointsInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
sc.endpointSlicesInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
}
}

Expand Down Expand Up @@ -789,3 +839,11 @@ func newServiceTypesFilter(filter []string) (*serviceTypes, error) {
types: types,
}, nil
}

// conditionToBool converts an EndpointConditions condition to a bool value.
func conditionToBool(v *bool) bool {
if v == nil {
return true // nil should be interpreted as "true" as per EndpointConditions spec
}
return *v
}
Loading
Loading