Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions source/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,18 @@ type serviceSource struct {
}

// NewServiceSource creates a new serviceSource with the given config.
func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, namespace, annotationFilter, fqdnTemplate string, combineFqdnAnnotation bool, compatibility string, publishInternal, publishHostIP, alwaysPublishNotReadyAddresses bool, serviceTypeFilter []string, ignoreHostnameAnnotation bool, labelSelector labels.Selector, resolveLoadBalancerHostname, listenEndpointEvents bool, exposeInternalIPv6 bool) (Source, error) {
func NewServiceSource(
ctx context.Context,
kubeClient kubernetes.Interface,
namespace, annotationFilter, fqdnTemplate string,
combineFqdnAnnotation bool, compatibility string,
publishInternal, publishHostIP, alwaysPublishNotReadyAddresses bool,
serviceTypeFilter []string,
ignoreHostnameAnnotation bool,
labelSelector labels.Selector,
resolveLoadBalancerHostname,
listenEndpointEvents, exposeInternalIPv6 bool,
) (Source, error) {
tmpl, err := fqdn.ParseTemplate(fqdnTemplate)
if err != nil {
return nil, err
Expand Down Expand Up @@ -139,7 +150,7 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
// Transformer is used to reduce the memory usage of the informer.
// The pod informer will otherwise store a full in-memory, go-typed copy of all pod schemas in the cluster.
// If watchList is not used it will not prevent memory bursts on the initial informer sync.
podInformer.Informer().SetTransform(func(i interface{}) (interface{}, error) {
_ = podInformer.Informer().SetTransform(func(i interface{}) (interface{}, error) {
pod, ok := i.(*v1.Pod)
if !ok {
return nil, fmt.Errorf("object is not a pod")
Expand Down Expand Up @@ -349,6 +360,7 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
targetsByHeadlessDomainAndType := sc.processHeadlessEndpointsFromSlices(
svc, pods, endpointSlices, hostname, endpointsType, publishPodIPs, publishNotReadyAddresses)
endpoints = buildHeadlessEndpoints(svc, targetsByHeadlessDomainAndType, ttl)

return endpoints
}

Expand Down Expand Up @@ -436,7 +448,11 @@ func findPodForEndpoint(ep discoveryv1.Endpoint, pods []*v1.Pod) *v1.Pod {
}

// Helper to get targets for domain
func (sc *serviceSource) getTargetsForDomain(pod *v1.Pod, ep discoveryv1.Endpoint, endpointSlice *discoveryv1.EndpointSlice, endpointsType string, headlessDomain string) endpoint.Targets {
func (sc *serviceSource) getTargetsForDomain(
pod *v1.Pod,
ep discoveryv1.Endpoint,
endpointSlice *discoveryv1.EndpointSlice,
endpointsType, headlessDomain string) endpoint.Targets {
targets := annotations.TargetsFromTargetAnnotation(pod.Annotations)
if len(targets) == 0 {
if endpointsType == EndpointsTypeNodeExternalIP {
Expand Down
152 changes: 144 additions & 8 deletions source/service_fqdn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ import (
func TestServiceSourceFqdnTemplatingExamples(t *testing.T) {

for _, tt := range []struct {
title string
services []*v1.Service
endpointSlices []*discoveryv1.EndpointSlice
fqdnTemplate string
combineFQDN bool
publishHostIp bool
expected []*endpoint.Endpoint
title string
services []*v1.Service
endpointSlices []*discoveryv1.EndpointSlice
fqdnTemplate string
combineFQDN bool
publishHostIp bool
serviceTypesFilter []string
expected []*endpoint.Endpoint
}{
{
title: "templating with multiple services",
Expand Down Expand Up @@ -182,6 +183,129 @@ func TestServiceSourceFqdnTemplatingExamples(t *testing.T) {
{DNSName: "www.service-two.website.example.tld", RecordType: endpoint.RecordTypeCNAME, Targets: endpoint.Targets{"www.bucket-name.amazonaws.com"}},
},
},
{
title: "fqdn with endpoint-type annotation and loose service type filtering",
serviceTypesFilter: []string{},
services: []*v1.Service{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "svc-ns",
Name: "svc-one",
Annotations: map[string]string{
annotations.EndpointsTypeKey: EndpointsTypeNodeExternalIP,
},
},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeClusterIP,
ClusterIP: v1.ClusterIPNone,
ClusterIPs: []string{v1.ClusterIPNone},
},
Status: v1.ServiceStatus{
LoadBalancer: v1.LoadBalancerStatus{},
},
},
},
endpointSlices: []*discoveryv1.EndpointSlice{
{
ObjectMeta: metav1.ObjectMeta{
Name: "svc-one-xxxxx",
Namespace: "svc-ns",
Labels: map[string]string{
discoveryv1.LabelServiceName: "svc-one",
v1.IsHeadlessService: "",
},
},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{
{
Addresses: []string{"100.66.2.246"},
Hostname: testutils.ToPtr("ip-10-1-164-158.internal"),
NodeName: testutils.ToPtr("test-node"),
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-1",
Namespace: "svc-ns",
},
},
{
Addresses: []string{"100.66.2.247"},
Hostname: testutils.ToPtr("ip-10-1-164-158.internal"),
NodeName: testutils.ToPtr("test-node"),
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-2",
Namespace: "svc-ns",
},
},
},
},
},
fqdnTemplate: "{{.Name}}.{{.Namespace}}.cluster.com",
expected: []*endpoint.Endpoint{
{DNSName: "ip-10-1-164-158.internal.svc-one.svc-ns.cluster.com", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"203.0.113.10"}},
{DNSName: "svc-one.svc-ns.cluster.com", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"203.0.113.10"}},
},
},
{
title: "fqdn with endpoint-type annotation and service type filtering does not include required type",
serviceTypesFilter: []string{string(v1.ServiceTypeClusterIP)},
services: []*v1.Service{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "svc-ns",
Name: "svc-one",
Annotations: map[string]string{
annotations.EndpointsTypeKey: EndpointsTypeNodeExternalIP,
},
},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeClusterIP,
ClusterIP: v1.ClusterIPNone,
ClusterIPs: []string{v1.ClusterIPNone},
},
Status: v1.ServiceStatus{
LoadBalancer: v1.LoadBalancerStatus{},
},
},
},
endpointSlices: []*discoveryv1.EndpointSlice{
{
ObjectMeta: metav1.ObjectMeta{
Name: "svc-one-xxxxx",
Namespace: "svc-ns",
Labels: map[string]string{
discoveryv1.LabelServiceName: "svc-one",
v1.IsHeadlessService: "",
},
},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{
{
Addresses: []string{"100.66.2.246"},
Hostname: testutils.ToPtr("ip-10-1-164-158.internal"),
NodeName: testutils.ToPtr("test-node"),
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-1",
Namespace: "svc-ns",
},
},
{
Addresses: []string{"100.66.2.247"},
Hostname: testutils.ToPtr("ip-10-1-164-158.internal"),
NodeName: testutils.ToPtr("test-node"),
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "pod-2",
Namespace: "svc-ns",
},
},
},
},
},
fqdnTemplate: "{{.Name}}.{{.Namespace}}.cluster.com",
expected: []*endpoint.Endpoint{},
},
{
title: "templating resolve service with zone PreferSameTrafficDistribution and topology.kubernetes.io/zone annotation",
services: []*v1.Service{
Expand Down Expand Up @@ -571,6 +695,17 @@ func TestServiceSourceFqdnTemplatingExamples(t *testing.T) {
require.NoError(t, err)
}

_, err := kubeClient.CoreV1().Nodes().Create(t.Context(), &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "test-node"},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{Type: v1.NodeExternalIP, Address: "203.0.113.10"},
{Type: v1.NodeInternalIP, Address: "10.0.0.10"},
},
},
}, metav1.CreateOptions{})
require.NoError(t, err)

// Create endpoints and pods for the services
for _, el := range tt.endpointSlices {
_, err := kubeClient.DiscoveryV1().EndpointSlices(el.Namespace).Create(t.Context(), el, metav1.CreateOptions{})
Expand All @@ -583,6 +718,7 @@ func TestServiceSourceFqdnTemplatingExamples(t *testing.T) {
},
Spec: v1.PodSpec{
Hostname: *ep.Hostname,
NodeName: "test-node",
},
Status: v1.PodStatus{
HostIP: fmt.Sprintf("10.1.20.4%d", i),
Expand All @@ -603,7 +739,7 @@ func TestServiceSourceFqdnTemplatingExamples(t *testing.T) {
true,
tt.publishHostIp,
true,
[]string{},
tt.serviceTypesFilter,
false,
labels.Everything(),
false,
Expand Down
102 changes: 99 additions & 3 deletions source/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3171,6 +3171,102 @@ func TestHeadlessServices(t *testing.T) {
},
false,
},
{
"headless service with endpoints-type annotation is outside of serviceTypeFilter scope",
"",
"testing",
"foo",
v1.ServiceTypeClusterIP,
"",
"",
false,
false,
map[string]string{"component": "foo"},
map[string]string{
annotations.HostnameKey: "service.example.org",
annotations.EndpointsTypeKey: EndpointsTypeNodeExternalIP,
},
map[string]string{},
v1.ClusterIPNone,
[]string{"2001:db8::1"},
[]string{"2001:db8::4"},
map[string]string{
"component": "foo",
},
[]string{},
[]string{"foo"},
[]string{"", "", ""},
[]bool{true, true, true},
false,
[]v1.Node{
{
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Type: v1.NodeExternalIP,
Address: "1.2.3.4",
},
{
Type: v1.NodeInternalIP,
Address: "10.0.10.12",
},
},
},
},
},
[]string{string(v1.ServiceTypeClusterIP)},
[]*endpoint.Endpoint{},
false,
},
{
"headless service with endpoints-type annotation is in the scope of serviceTypeFilter",
"",
"testing",
"foo",
v1.ServiceTypeClusterIP,
"",
"",
false,
false,
map[string]string{"component": "foo"},
map[string]string{
annotations.HostnameKey: "service.example.org",
annotations.EndpointsTypeKey: EndpointsTypeNodeExternalIP,
},
map[string]string{},
v1.ClusterIPNone,
[]string{"2001:db8::1"},
[]string{"1.2.3.4"},
map[string]string{
"component": "foo",
},
[]string{},
[]string{"foo"},
[]string{"", "", ""},
[]bool{true, true, true},
false,
[]v1.Node{
{
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Type: v1.NodeExternalIP,
Address: "1.2.3.4",
},
{
Type: v1.NodeInternalIP,
Address: "10.0.10.12",
},
},
},
},
},
[]string{string(v1.ServiceTypeClusterIP), string(v1.ServiceTypeNodePort)},
[]*endpoint.Endpoint{
{DNSName: "service.example.org", RecordType: endpoint.RecordTypeA, Targets: endpoint.Targets{"1.2.3.4"}},
},
false,
},
} {

t.Run(tc.title, func(t *testing.T) {
Expand Down Expand Up @@ -3198,15 +3294,15 @@ func TestHeadlessServices(t *testing.T) {
require.NoError(t, err)

var endpointSliceEndpoints []discoveryv1.Endpoint
for i, podname := range tc.podnames {
for i, podName := range tc.podnames {
pod := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{},
Hostname: tc.hostnames[i],
},
ObjectMeta: metav1.ObjectMeta{
Namespace: tc.svcNamespace,
Name: podname,
Name: podName,
Labels: tc.labels,
Annotations: tc.podAnnotations,
},
Expand All @@ -3224,7 +3320,7 @@ func TestHeadlessServices(t *testing.T) {
TargetRef: &v1.ObjectReference{
APIVersion: "",
Kind: "Pod",
Name: podname,
Name: podName,
},
Conditions: discoveryv1.EndpointConditions{
Ready: &tc.podsReady[i],
Expand Down
Loading