From 46ce09a9e9ad75d388fa9ff678afb814e6c46e89 Mon Sep 17 00:00:00 2001 From: Peng Liu Date: Wed, 27 Aug 2025 07:54:20 +0000 Subject: [PATCH 1/2] Clear stale conntrack UDP entries for nodePorts When an EndpointSlice for a UDP NodePort or LoadBalancer service is updated, stale conntrack entries for removed endpoints must be flushed. The existing logic failed to do this correctly when the backend pod was on a different node. This patch fixes the issue by also flushing conntrack entries filtered by the NodePort when the node is not hosting the backend pod. When the backend pod is on the same node as the service, the issue does not occur, since all of the old pod's conntrack entries are removed from the node by the function deletePodConntrack when the pod is deleted. Signed-off-by: Peng Liu (cherry picked from commit b426934bc27b426d861c3ee0723b7f64fb35a031) Signed-off-by: Venkata Charan Sunku --- .../node/default_node_network_controller.go | 13 +- .../default_node_network_controller_test.go | 269 +++++++++--------- .../pkg/node/gateway_localnet_linux_test.go | 40 +-- 3 files changed, 175 insertions(+), 147 deletions(-) diff --git a/go-controller/pkg/node/default_node_network_controller.go b/go-controller/pkg/node/default_node_network_controller.go index 66722d9694..d8cde1fab2 100644 --- a/go-controller/pkg/node/default_node_network_controller.go +++ b/go-controller/pkg/node/default_node_network_controller.go @@ -1474,13 +1474,24 @@ func (nc *DefaultNodeNetworkController) reconcileConntrackUponEndpointSliceEvent klog.Errorf("Failed to get service port for endpoint %s: %v", oldIPStr, err) continue } - // upon update and delete events, flush conntrack only for UDP + // upon update and delete events, flush UDP conntrack for the Service port klog.V(5).Infof("Deleting conntrack entry for endpoint %s, port %d, protocol %s", oldIPStr, servicePort.Port, *oldPort.Protocol) if err := util.DeleteConntrackServicePort(oldIPStr, servicePort.Port, *oldPort.Protocol, netlink.ConntrackReplyAnyIP, nil); err != nil { klog.Errorf("Failed to delete conntrack entry for %s port %d: %v", oldIPStr, servicePort.Port, err) errors = append(errors, err) } + + // Flush UDP conntrack entries for the NodePort (and for LoadBalancer services that allocate NodePorts) + // TODO: Once vishvananda/netlink supports a ConntrackFilterType for '--reply-port-src', we can use a single DeleteConntrackServicePort() call + // to flush conntrack entries for both the ClusterIP and the NodePort.
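+ // Example with illustrative values: for a UDP Service with port 80 and NodePort 30000 whose
+ // endpoint 10.128.0.1 was removed, the call above flushes conntrack entries for 10.128.0.1
+ // on port 80, and the block below flushes entries for 10.128.0.1 on NodePort 30000.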
+ if util.ServiceTypeHasNodePort(svc) && servicePort.NodePort > 0 { + if err := util.DeleteConntrackServicePort(oldIPStr, servicePort.NodePort, *oldPort.Protocol, + netlink.ConntrackReplyAnyIP, nil); err != nil { + klog.Errorf("Failed to delete conntrack entry for %s NodePort %d: %v", oldIPStr, servicePort.NodePort, err) + errors = append(errors, err) + } + } } } } diff --git a/go-controller/pkg/node/default_node_network_controller_test.go b/go-controller/pkg/node/default_node_network_controller_test.go index 6ecd5bdeb6..eda939a8a9 100644 --- a/go-controller/pkg/node/default_node_network_controller_test.go +++ b/go-controller/pkg/node/default_node_network_controller_test.go @@ -1769,7 +1769,6 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } ip string port uint16 protocol uint8 - family netlink.InetFamily } // Test data structure for table-driven tests @@ -1782,12 +1781,21 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } expectedFilters []expectedConntrackFilter } - // Helper to create EndpointSlice - makeEndpointSlice := func(portConfigs []struct { + type endpointPortConfig struct { name *string port int32 protocol corev1.Protocol - }, addresses []string) *discovery.EndpointSlice { + } + + type servicePortConfig struct { + name string + port int32 + targetPort int32 + protocol corev1.Protocol + } + + // Helper to create EndpointSlice + makeEndpointSlice := func(portConfigs []endpointPortConfig, addresses []string) *discovery.EndpointSlice { ports := make([]discovery.EndpointPort, len(portConfigs)) for i, pc := range portConfigs { p := pc.port @@ -1815,12 +1823,7 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } } // Helper to create Service - makeService := func(portConfigs []struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }) *corev1.Service { + makeService := func(portConfigs []servicePortConfig) *corev1.Service { ports := make([]corev1.ServicePort, len(portConfigs)) for i, pc := range portConfigs { ports[i] = corev1.ServicePort{ @@ -1842,6 +1845,16 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } } } + // Helper to create NodePort or LoadBalancer Service by invoking makeService + makeServiceWithNodePort := func(portConfigs []servicePortConfig, nodePorts []int32, svcType corev1.ServiceType) *corev1.Service { + svc := makeService(portConfigs) + svc.Spec.Type = svcType + for i := 0; i < len(nodePorts) && i < len(svc.Spec.Ports); i++ { + svc.Spec.Ports[i].NodePort = nodePorts[i] + } + return svc + } + // Helper function to build expected ConntrackFilter for verification buildExpectedFilter := func(ef expectedConntrackFilter) *netlink.ConntrackFilter { filter := &netlink.ConntrackFilter{} @@ -1942,13 +1955,8 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } Entry("old endpointslice is nil", reconcileConntrackTestCase{ - desc: "should not delete any conntrack entries when old endpoint is nil", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should not delete any conntrack entries when old endpoint is nil", + service: makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: nil, newEndpointSlice: &discovery.EndpointSlice{}, expectedConntrackCalls: 0, @@ -1957,69 +1965,42 @@ add element inet 
ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } Entry("service exists with matching unnamed port", reconcileConntrackTestCase{ - desc: "should delete conntrack with service port for unnamed port", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should delete conntrack with service port for unnamed port", + service: makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.0.0.1"}, ), newEndpointSlice: nil, expectedConntrackCalls: 1, expectedFilters: []expectedConntrackFilter{ - {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, + {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, }, }, ), Entry("service exists with matching named port", reconcileConntrackTestCase{ - desc: "should delete conntrack with service port for named port", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "http", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should delete conntrack with service port for named port", + service: makeService([]servicePortConfig{{name: "http", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: strPtr("http"), port: testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: strPtr("http"), port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.0.0.1"}, ), newEndpointSlice: nil, expectedConntrackCalls: 1, expectedFilters: []expectedConntrackFilter{ - {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, + {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, }, }, ), Entry("service exists but port name mismatch", reconcileConntrackTestCase{ - desc: "should skip conntrack deletion when port name doesn't match", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "http", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should skip conntrack deletion when port name doesn't match", + service: makeService([]servicePortConfig{{name: "http", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: strPtr("grpc"), port: testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: strPtr("grpc"), port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.0.0.1"}, ), newEndpointSlice: nil, @@ -2032,11 +2013,7 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } desc: "should return early without deleting conntrack when service not found", service: nil, oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: nil, port: 
testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.0.0.1"}, ), newEndpointSlice: nil, @@ -2046,19 +2023,10 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } Entry("TCP protocol should be skipped", reconcileConntrackTestCase{ - desc: "should skip conntrack deletion for TCP protocol", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: tcpProtocol}}), + desc: "should skip conntrack deletion for TCP protocol", + service: makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: tcpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: nil, port: testEndpointPort1, protocol: tcpProtocol}}, + []endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: tcpProtocol}}, []string{"10.0.0.1"}, ), newEndpointSlice: nil, @@ -2068,78 +2036,51 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } Entry("multiple endpoints", reconcileConntrackTestCase{ - desc: "should delete conntrack for each endpoint", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should delete conntrack for each endpoint", + service: makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, ), newEndpointSlice: nil, expectedConntrackCalls: 3, expectedFilters: []expectedConntrackFilter{ - {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, - {ip: "10.0.0.2", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, - {ip: "10.0.0.3", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, + {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.0.0.2", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.0.0.3", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, }, }, ), Entry("IPv6 endpoint", reconcileConntrackTestCase{ - desc: "should delete conntrack for IPv6 endpoint", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should delete conntrack for IPv6 endpoint", + service: makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"fd00::1"}, ), newEndpointSlice: nil, expectedConntrackCalls: 1, expectedFilters: []expectedConntrackFilter{ - {ip: 
"fd00::1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V6}, + {ip: "fd00::1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, }, }, ), Entry("dual-stack endpoints", reconcileConntrackTestCase{ - desc: "should delete conntrack for both IPv4 and IPv6", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should delete conntrack for both IPv4 and IPv6", + service: makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.0.0.1", "fd00::1"}, ), newEndpointSlice: nil, expectedConntrackCalls: 2, expectedFilters: []expectedConntrackFilter{ - {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, - {ip: "fd00::1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V6}, + {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "fd00::1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, }, }, ), @@ -2147,21 +2088,12 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } Entry("multiple service ports with matching names", reconcileConntrackTestCase{ desc: "should match correct service port by name for multiple ports", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{ + service: makeService([]servicePortConfig{ {name: "http", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}, {name: "https", port: testServicePort2, targetPort: testEndpointPort2, protocol: udpProtocol}, }), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{ + []endpointPortConfig{ {name: strPtr("http"), port: testEndpointPort1, protocol: udpProtocol}, {name: strPtr("https"), port: testEndpointPort2, protocol: udpProtocol}, }, @@ -2170,11 +2102,94 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } newEndpointSlice: nil, expectedConntrackCalls: 2, expectedFilters: []expectedConntrackFilter{ - {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, - {ip: "10.0.0.1", port: uint16(testServicePort2), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, + {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.0.0.1", port: uint16(testServicePort2), protocol: syscall.IPPROTO_UDP}, }, }, ), + Entry("NodePort service", reconcileConntrackTestCase{ + desc: "should delete conntrack entries for both service port and NodePort", + service: makeServiceWithNodePort([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}, + []int32{30000}, corev1.ServiceTypeNodePort), + oldEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.1"}), + newEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, 
[]string{"10.128.0.2"}), + expectedConntrackCalls: 2, + expectedFilters: []expectedConntrackFilter{ + {ip: "10.128.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.128.0.1", port: 30000, protocol: syscall.IPPROTO_UDP}, + }, + }), + Entry("NodePort service with mixed protocols should only clean UDP NodePort", reconcileConntrackTestCase{ + desc: "should only delete conntrack for UDP NodePort, not TCP (protocol filtering)", + service: makeServiceWithNodePort([]servicePortConfig{ + {name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}, + {name: "", port: testServicePort2, targetPort: testEndpointPort1, protocol: tcpProtocol}, + }, []int32{30000, 30001}, corev1.ServiceTypeNodePort), + oldEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.1"}), + newEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.2"}), + expectedConntrackCalls: 2, // Only UDP: service port + NodePort (TCP port 30001 should be skipped) + expectedFilters: []expectedConntrackFilter{ + {ip: "10.128.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.128.0.1", port: 30000, protocol: syscall.IPPROTO_UDP}, + }, + }), + Entry("NodePort service with multiple UDP ports", reconcileConntrackTestCase{ + desc: "should delete conntrack entries only for the specific NodePort that changed", + service: makeServiceWithNodePort([]servicePortConfig{ + {name: "dns", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}, + {name: "snmp", port: testServicePort2, targetPort: testEndpointPort1, protocol: udpProtocol}, + }, []int32{30000, 30002}, corev1.ServiceTypeNodePort), + oldEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: strPtr("dns"), port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.1"}), + newEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: strPtr("dns"), port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.2"}), + expectedConntrackCalls: 2, // service port + NodePort for "dns" only + expectedFilters: []expectedConntrackFilter{ + {ip: "10.128.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.128.0.1", port: 30000, protocol: syscall.IPPROTO_UDP}, + }, + }), + Entry("LoadBalancer service with NodePort allocation", reconcileConntrackTestCase{ + desc: "should delete conntrack entries for both service port and NodePort", + service: func() *corev1.Service { + svc := makeServiceWithNodePort([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}, + []int32{30000}, corev1.ServiceTypeLoadBalancer) + svc.Status = corev1.ServiceStatus{ + LoadBalancer: corev1.LoadBalancerStatus{ + Ingress: []corev1.LoadBalancerIngress{{IP: "5.5.5.5"}}, + }, + } + return svc + }(), + oldEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.1"}), + newEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.2"}), + expectedConntrackCalls: 2, + expectedFilters: []expectedConntrackFilter{ + {ip: "10.128.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.128.0.1", port: 30000, protocol: syscall.IPPROTO_UDP}, + }, + }), + Entry("LoadBalancer service with 
AllocateLoadBalancerNodePorts=false", func() reconcileConntrackTestCase { + allocateNodePorts := false + return reconcileConntrackTestCase{ + desc: "should only delete conntrack entries for service port (no NodePort)", + service: func() *corev1.Service { + svc := makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}) + svc.Spec.Type = corev1.ServiceTypeLoadBalancer + svc.Spec.AllocateLoadBalancerNodePorts = &allocateNodePorts + svc.Status = corev1.ServiceStatus{ + LoadBalancer: corev1.LoadBalancerStatus{ + Ingress: []corev1.LoadBalancerIngress{{IP: "5.5.5.5"}}, + }, + } + return svc + }(), + oldEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.1"}), + newEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.2"}), + expectedConntrackCalls: 1, + expectedFilters: []expectedConntrackFilter{ + {ip: "10.128.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + }, + } + }()), ) }) }) diff --git a/go-controller/pkg/node/gateway_localnet_linux_test.go b/go-controller/pkg/node/gateway_localnet_linux_test.go index 71d112b491..d328d7dffb 100644 --- a/go-controller/pkg/node/gateway_localnet_linux_test.go +++ b/go-controller/pkg/node/gateway_localnet_linux_test.go @@ -202,7 +202,7 @@ func newEndpointSlice(svcName, namespace string, endpoints []discovery.Endpoint, } } -func makeConntrackFilter(ip string, port int, protocol corev1.Protocol) *netlink.ConntrackFilter { +func makeConntrackFilter(ip string, port int, protocol corev1.Protocol, filterType netlink.ConntrackFilterType) *netlink.ConntrackFilter { filter := &netlink.ConntrackFilter{} var err error @@ -221,15 +221,17 @@ func makeConntrackFilter(ip string, port int, protocol corev1.Protocol) *netlink } ipAddress := net.ParseIP(ip) Expect(ipAddress).NotTo(BeNil()) - err = filter.AddIP(netlink.ConntrackOrigDstIP, ipAddress) + err = filter.AddIP(filterType, ipAddress) Expect(err).NotTo(HaveOccurred()) return filter } type ctFilterDesc struct { - ip string - port int + ip string + port int + protocol corev1.Protocol + filterType netlink.ConntrackFilterType } func addConntrackMocks(nlMock *mocks.NetLinkOps, filterDescs []ctFilterDesc) { @@ -240,7 +242,7 @@ func addConntrackMocks(nlMock *mocks.NetLinkOps, filterDescs []ctFilterDesc) { OnCallMethodArgs: []interface{}{ netlink.ConntrackTableType(netlink.ConntrackTable), netlink.InetFamily(netlink.FAMILY_V4), - makeConntrackFilter(ctf.ip, ctf.port, corev1.ProtocolTCP), + makeConntrackFilter(ctf.ip, ctf.port, ctf.protocol, ctf.filterType), }, RetArgList: []interface{}{uint(1), nil}, }) @@ -1619,7 +1621,7 @@ var _ = Describe("Node Operations", func() { fNPW.watchFactory = wf Expect(startNodePortWatcher(fNPW, fakeClient)).To(Succeed()) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"1.1.1.1", 8032}, {"10.129.0.2", 8032}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"1.1.1.1", 8032, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"10.129.0.2", 8032, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) Eventually(func() bool { @@ -1708,7 +1710,7 @@ var _ = Describe("Node Operations", func() { fNPW.watchFactory = wf Expect(startNodePortWatcher(fNPW, fakeClient)).To(Succeed()) - addConntrackMocks(netlinkMock, 
[]ctFilterDesc{{"10.129.0.2", 0}, {"192.168.18.15", 31111}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 0, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) Eventually(fExec.CalledMatchesExpected, "2s").Should(BeTrue(), fExec.ErrorDesc) @@ -1848,7 +1850,7 @@ var _ = Describe("Node Operations", func() { return nodenft.MatchNFTRules(expectedNFT, nft.Dump()) }).Should(Succeed()) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.10.10.1", 8034}, {"10.129.0.2", 8034}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.10.10.1", 8034, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"10.129.0.2", 8034, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) @@ -1976,11 +1978,11 @@ var _ = Describe("Node Operations", func() { }).Should(Equal(expectedLBExternalIPFlows2)) addConntrackMocks(netlinkMock, []ctFilterDesc{ - {"1.1.1.1", 8080}, - {"1.1.1.2", 8080}, - {"5.5.5.5", 8080}, - {"192.168.18.15", 31111}, - {"10.129.0.2", 8080}, + {"1.1.1.1", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, + {"1.1.1.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, + {"5.5.5.5", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, + {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, + {"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, }) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( @@ -2195,7 +2197,7 @@ var _ = Describe("Node Operations", func() { return nodenft.MatchNFTRules(expectedNFT, nft.Dump()) }).Should(Succeed()) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080}, {"192.168.18.15", 38034}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 38034, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) @@ -2332,7 +2334,7 @@ var _ = Describe("Node Operations", func() { flows := fNPW.ofm.getFlowsByKey("NodePort_namespace1_service1_tcp_31111") Expect(flows).To(BeNil()) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080}, {"192.168.18.15", 31111}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) @@ -2478,7 +2480,7 @@ var _ = Describe("Node Operations", func() { flows := fNPW.ofm.getFlowsByKey("NodePort_namespace1_service1_tcp_31111") Expect(flows).To(Equal(expectedFlows)) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080}, {"192.168.18.15", 31111}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, 
metav1.DeleteOptions{})).To(Succeed()) @@ -2628,7 +2630,7 @@ var _ = Describe("Node Operations", func() { flows := fNPW.ofm.getFlowsByKey("NodePort_namespace1_service1_tcp_31111") Expect(flows).To(Equal(expectedFlows)) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080}, {"192.168.18.15", 31111}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) @@ -2773,7 +2775,7 @@ var _ = Describe("Node Operations", func() { flows := fNPW.ofm.getFlowsByKey("NodePort_namespace1_service1_tcp_31111") Expect(flows).To(Equal(expectedFlows)) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080}, {"192.168.18.15", 31111}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) @@ -2921,7 +2923,7 @@ var _ = Describe("Node Operations", func() { Expect(fNPW.ofm.getFlowsByKey("NodePort_namespace1_service1_tcp_31111")).To(Equal(expectedFlows)) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080}, {"192.168.18.15", 31111}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) From 679442dab871efc5e7e1c2d08f7ccefacc06632b Mon Sep 17 00:00:00 2001 From: Peng Liu Date: Mon, 8 Sep 2025 07:38:25 +0000 Subject: [PATCH 2/2] Add an e2e test for NodePort services Verify that UDP traffic is preserved when the server pod cycles for a NodePort service reached via a different node.
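For reference, the node-side code path this test exercises is the NodePort conntrack flush from the first patch in this series, condensed below (logging omitted):

    if util.ServiceTypeHasNodePort(svc) && servicePort.NodePort > 0 {
        // Flush UDP conntrack entries that match the stale endpoint IP and the NodePort.
        if err := util.DeleteConntrackServicePort(oldIPStr, servicePort.NodePort,
            *oldPort.Protocol, netlink.ConntrackReplyAnyIP, nil); err != nil {
            errors = append(errors, err)
        }
    }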
Signed-off-by: Peng Liu (cherry picked from commit 4e5502658c23cc1a22ca1bbc646fbcb73e4b9bb8) Signed-off-by: Venkata Charan Sunku --- test/e2e/service.go | 145 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/test/e2e/service.go b/test/e2e/service.go index d859a7b019..ee4ae3ce65 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -789,6 +789,151 @@ var _ = ginkgo.Describe("Services", feature.Service, func() { // network is removed by provider Context API }) + ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a NodePort service via a different node", func(ctx context.Context) { + const ( + serviceName = "svc-udp" + srcPort = 12345 + podClient = "pod-client" + podBackend1 = "pod-server-1" + podBackend2 = "pod-server-2" + ) + var clientNodeInfo, serverNodeInfo, backendNodeInfo nodeInfo + + cs := f.ClientSet + ns := f.Namespace.Name + + nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 3) + framework.ExpectNoError(err) + if len(nodes.Items) < 3 { + e2eskipper.Skipf( + "Test requires >= 3 Ready nodes, but there are only %v nodes", + len(nodes.Items)) + } + + family := v1.IPv4Protocol + if IsIPv6Cluster(cs) { + family = v1.IPv6Protocol + } + + ips := e2enode.GetAddressesByTypeAndFamily(&nodes.Items[0], v1.NodeInternalIP, family) + gomega.Expect(ips).ToNot(gomega.BeEmpty()) + + clientNodeInfo = nodeInfo{ + name: nodes.Items[0].Name, + nodeIP: ips[0], + } + + ips = e2enode.GetAddressesByTypeAndFamily(&nodes.Items[1], v1.NodeInternalIP, family) + gomega.Expect(ips).ToNot(gomega.BeEmpty()) + + backendNodeInfo = nodeInfo{ + name: nodes.Items[1].Name, + nodeIP: ips[0], + } + + ips = e2enode.GetAddressesByTypeAndFamily(&nodes.Items[2], v1.NodeInternalIP, family) + gomega.Expect(ips).ToNot(gomega.BeEmpty()) + + serverNodeInfo = nodeInfo{ + name: nodes.Items[2].Name, + nodeIP: ips[0], + } + + // Create a NodePort service + udpJig := e2eservice.NewTestJig(cs, ns, serviceName) + ginkgo.By("creating a UDP service " + serviceName + " with type=NodePort in " + ns) + udpService, err := udpJig.CreateUDPService(ctx, func(svc *v1.Service) { + svc.Spec.Type = v1.ServiceTypeNodePort + svc.Spec.Ports = []v1.ServicePort{ + {Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt32(80)}, + } + }) + framework.ExpectNoError(err) + + // Create a client pod on one node that sends UDP traffic to the NodePort service every 5 seconds + ginkgo.By("creating a client pod for probing the service " + serviceName) + clientPod := e2epod.NewAgnhostPod(ns, podClient, nil, nil, nil) + nodeSelection := e2epod.NodeSelection{Name: clientNodeInfo.name} + e2epod.SetNodeSelection(&clientPod.Spec, nodeSelection) + cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, serverNodeInfo.nodeIP, udpService.Spec.Ports[0].NodePort) + clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd} + clientPod.Spec.Containers[0].Name = podClient + e2epod.NewPodClient(f).CreateSync(ctx, clientPod) + + // Read the client pod logs + logs, err := e2epod.GetPodLogs(ctx, cs, ns, podClient, podClient) + framework.ExpectNoError(err) + framework.Logf("Pod client logs: %s", logs) + + // Add a backend pod to the service on another node + ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName + " at node " + backendNodeInfo.name) + serverPod1 := e2epod.NewAgnhostPod(ns, podBackend1, nil, nil, nil, "netexec",
fmt.Sprintf("--udp-port=%d", 80)) + serverPod1.Labels = udpJig.Labels + nodeSelection = e2epod.NodeSelection{Name: backendNodeInfo.name} + e2epod.SetNodeSelection(&serverPod1.Spec, nodeSelection) + e2epod.NewPodClient(f).CreateSync(ctx, serverPod1) + + ginkgo.By("Waiting for the endpoint to be ready") + err = framework.WaitForServiceEndpointsNum(ctx, f.ClientSet, f.Namespace.Name, + serviceName, 1, time.Second, wait.ForeverTestTimeout) + framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", + serviceName, f.Namespace.Name) + + logContainsFn := func(text, podName string) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { + logs, err := e2epod.GetPodLogs(ctx, cs, ns, podName, podName) + if err != nil { + // Retry the error next time. + return false, nil + } + if !strings.Contains(string(logs), text) { + return false, nil + } + return true, nil + } + } + // Note that the fact that Endpoints object already exists, does NOT mean + // that openflows were already programmed. + // Additionally take into account that UDP conntract entries timeout is + // 30 seconds by default. + // Based on the above check if the pod receives the traffic. + ginkgo.By("checking client pod connected to the backend 1 on Node IP " + serverNodeInfo.nodeIP) + if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, time.Minute, true, logContainsFn(podBackend1, podClient)); err != nil { + logs, err = e2epod.GetPodLogs(ctx, cs, ns, podClient, podClient) + framework.ExpectNoError(err) + framework.Logf("Pod client logs: %s", logs) + framework.Failf("Failed to connect to backend 1") + } + + // Create a second pod + ginkgo.By("creating a second backend pod " + podBackend2 + " for the service " + serviceName + " at node " + backendNodeInfo.name) + serverPod2 := e2epod.NewAgnhostPod(ns, podBackend2, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80)) + serverPod2.Labels = udpJig.Labels + nodeSelection = e2epod.NodeSelection{Name: backendNodeInfo.name} + e2epod.SetNodeSelection(&serverPod2.Spec, nodeSelection) + e2epod.NewPodClient(f).CreateSync(ctx, serverPod2) + + // and delete the first pod + framework.Logf("Cleaning up %s pod", podBackend1) + e2epod.NewPodClient(f).DeleteSync(ctx, podBackend1, metav1.DeleteOptions{}, e2epod.DefaultPodDeletionTimeout) + + ginkgo.By("Waiting for the endpoint to be ready") + err = framework.WaitForServiceEndpointsNum(ctx, f.ClientSet, f.Namespace.Name, + serviceName, 1, time.Second, wait.ForeverTestTimeout) + framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", + serviceName, f.Namespace.Name) + + // Check that the second pod keeps receiving traffic + // UDP conntrack entries timeout is 30 sec by default + ginkgo.By("checking client pod connected to the backend 2 on Node IP " + serverNodeInfo.nodeIP) + if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, time.Minute, true, logContainsFn(podBackend2, podClient)); err != nil { + logs, err = e2epod.GetPodLogs(ctx, cs, ns, podClient, podClient) + framework.ExpectNoError(err) + framework.Logf("Pod client logs: %s", logs) + framework.Failf("Failed to connect to backend 2") + } + }) + ginkgo.It("should listen on each host addresses", func() { endPoints := make([]*v1.Pod, 0) endpointsSelector := map[string]string{"servicebackend": "true"}