diff --git a/go-controller/pkg/node/default_node_network_controller.go b/go-controller/pkg/node/default_node_network_controller.go index 66722d9694..d8cde1fab2 100644 --- a/go-controller/pkg/node/default_node_network_controller.go +++ b/go-controller/pkg/node/default_node_network_controller.go @@ -1474,13 +1474,24 @@ func (nc *DefaultNodeNetworkController) reconcileConntrackUponEndpointSliceEvent klog.Errorf("Failed to get service port for endpoint %s: %v", oldIPStr, err) continue } - // upon update and delete events, flush conntrack only for UDP + // upon update and delete events, flush UDP conntrack for Service port klog.V(5).Infof("Deleting conntrack entry for endpoint %s, port %d, protocol %s", oldIPStr, servicePort.Port, *oldPort.Protocol) if err := util.DeleteConntrackServicePort(oldIPStr, servicePort.Port, *oldPort.Protocol, netlink.ConntrackReplyAnyIP, nil); err != nil { klog.Errorf("Failed to delete conntrack entry for %s port %d: %v", oldIPStr, servicePort.Port, err) errors = append(errors, err) } + + // Flush UDP conntrack entries for NodePort (and LoadBalancer services that allocate NodePorts) + // TODO: Once vishvananda/netlink support ConntrackFilterType '--reply-port-src', we can use one DeleteConntrackServicePort() call + // conntrack entries for both ClusterIP and NodePort. + if util.ServiceTypeHasNodePort(svc) && servicePort.NodePort > 0 { + if err := util.DeleteConntrackServicePort(oldIPStr, servicePort.NodePort, *oldPort.Protocol, + netlink.ConntrackReplyAnyIP, nil); err != nil { + klog.Errorf("Failed to delete conntrack entry for %s NodePort %d: %v", oldIPStr, servicePort.NodePort, err) + errors = append(errors, err) + } + } } } } diff --git a/go-controller/pkg/node/default_node_network_controller_test.go b/go-controller/pkg/node/default_node_network_controller_test.go index 6ecd5bdeb6..eda939a8a9 100644 --- a/go-controller/pkg/node/default_node_network_controller_test.go +++ b/go-controller/pkg/node/default_node_network_controller_test.go @@ -1769,7 +1769,6 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } ip string port uint16 protocol uint8 - family netlink.InetFamily } // Test data structure for table-driven tests @@ -1782,12 +1781,21 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } expectedFilters []expectedConntrackFilter } - // Helper to create EndpointSlice - makeEndpointSlice := func(portConfigs []struct { + type endpointPortConfig struct { name *string port int32 protocol corev1.Protocol - }, addresses []string) *discovery.EndpointSlice { + } + + type servicePortConfig struct { + name string + port int32 + targetPort int32 + protocol corev1.Protocol + } + + // Helper to create EndpointSlice + makeEndpointSlice := func(portConfigs []endpointPortConfig, addresses []string) *discovery.EndpointSlice { ports := make([]discovery.EndpointPort, len(portConfigs)) for i, pc := range portConfigs { p := pc.port @@ -1815,12 +1823,7 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } } // Helper to create Service - makeService := func(portConfigs []struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }) *corev1.Service { + makeService := func(portConfigs []servicePortConfig) *corev1.Service { ports := make([]corev1.ServicePort, len(portConfigs)) for i, pc := range portConfigs { ports[i] = corev1.ServicePort{ @@ -1842,6 +1845,16 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } } } + // Helper to create NodePort or LoadBalancer Service by invoking makeService + makeServiceWithNodePort := func(portConfigs []servicePortConfig, nodePorts []int32, svcType corev1.ServiceType) *corev1.Service { + svc := makeService(portConfigs) + svc.Spec.Type = svcType + for i := 0; i < len(nodePorts) && i < len(svc.Spec.Ports); i++ { + svc.Spec.Ports[i].NodePort = nodePorts[i] + } + return svc + } + // Helper function to build expected ConntrackFilter for verification buildExpectedFilter := func(ef expectedConntrackFilter) *netlink.ConntrackFilter { filter := &netlink.ConntrackFilter{} @@ -1942,13 +1955,8 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } Entry("old endpointslice is nil", reconcileConntrackTestCase{ - desc: "should not delete any conntrack entries when old endpoint is nil", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should not delete any conntrack entries when old endpoint is nil", + service: makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: nil, newEndpointSlice: &discovery.EndpointSlice{}, expectedConntrackCalls: 0, @@ -1957,69 +1965,42 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } Entry("service exists with matching unnamed port", reconcileConntrackTestCase{ - desc: "should delete conntrack with service port for unnamed port", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should delete conntrack with service port for unnamed port", + service: makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.0.0.1"}, ), newEndpointSlice: nil, expectedConntrackCalls: 1, expectedFilters: []expectedConntrackFilter{ - {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, + {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, }, }, ), Entry("service exists with matching named port", reconcileConntrackTestCase{ - desc: "should delete conntrack with service port for named port", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "http", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should delete conntrack with service port for named port", + service: makeService([]servicePortConfig{{name: "http", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: strPtr("http"), port: testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: strPtr("http"), port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.0.0.1"}, ), newEndpointSlice: nil, expectedConntrackCalls: 1, expectedFilters: []expectedConntrackFilter{ - {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, + {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, }, }, ), Entry("service exists but port name mismatch", reconcileConntrackTestCase{ - desc: "should skip conntrack deletion when port name doesn't match", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "http", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should skip conntrack deletion when port name doesn't match", + service: makeService([]servicePortConfig{{name: "http", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: strPtr("grpc"), port: testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: strPtr("grpc"), port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.0.0.1"}, ), newEndpointSlice: nil, @@ -2032,11 +2013,7 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } desc: "should return early without deleting conntrack when service not found", service: nil, oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.0.0.1"}, ), newEndpointSlice: nil, @@ -2046,19 +2023,10 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } Entry("TCP protocol should be skipped", reconcileConntrackTestCase{ - desc: "should skip conntrack deletion for TCP protocol", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: tcpProtocol}}), + desc: "should skip conntrack deletion for TCP protocol", + service: makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: tcpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: nil, port: testEndpointPort1, protocol: tcpProtocol}}, + []endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: tcpProtocol}}, []string{"10.0.0.1"}, ), newEndpointSlice: nil, @@ -2068,78 +2036,51 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } Entry("multiple endpoints", reconcileConntrackTestCase{ - desc: "should delete conntrack for each endpoint", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should delete conntrack for each endpoint", + service: makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, ), newEndpointSlice: nil, expectedConntrackCalls: 3, expectedFilters: []expectedConntrackFilter{ - {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, - {ip: "10.0.0.2", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, - {ip: "10.0.0.3", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, + {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.0.0.2", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.0.0.3", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, }, }, ), Entry("IPv6 endpoint", reconcileConntrackTestCase{ - desc: "should delete conntrack for IPv6 endpoint", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should delete conntrack for IPv6 endpoint", + service: makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"fd00::1"}, ), newEndpointSlice: nil, expectedConntrackCalls: 1, expectedFilters: []expectedConntrackFilter{ - {ip: "fd00::1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V6}, + {ip: "fd00::1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, }, }, ), Entry("dual-stack endpoints", reconcileConntrackTestCase{ - desc: "should delete conntrack for both IPv4 and IPv6", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should delete conntrack for both IPv4 and IPv6", + service: makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.0.0.1", "fd00::1"}, ), newEndpointSlice: nil, expectedConntrackCalls: 2, expectedFilters: []expectedConntrackFilter{ - {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, - {ip: "fd00::1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V6}, + {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "fd00::1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, }, }, ), @@ -2147,21 +2088,12 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } Entry("multiple service ports with matching names", reconcileConntrackTestCase{ desc: "should match correct service port by name for multiple ports", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{ + service: makeService([]servicePortConfig{ {name: "http", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}, {name: "https", port: testServicePort2, targetPort: testEndpointPort2, protocol: udpProtocol}, }), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{ + []endpointPortConfig{ {name: strPtr("http"), port: testEndpointPort1, protocol: udpProtocol}, {name: strPtr("https"), port: testEndpointPort2, protocol: udpProtocol}, }, @@ -2170,11 +2102,94 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } newEndpointSlice: nil, expectedConntrackCalls: 2, expectedFilters: []expectedConntrackFilter{ - {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, - {ip: "10.0.0.1", port: uint16(testServicePort2), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, + {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.0.0.1", port: uint16(testServicePort2), protocol: syscall.IPPROTO_UDP}, }, }, ), + Entry("NodePort service", reconcileConntrackTestCase{ + desc: "should delete conntrack entries for both service port and NodePort", + service: makeServiceWithNodePort([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}, + []int32{30000}, corev1.ServiceTypeNodePort), + oldEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.1"}), + newEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.2"}), + expectedConntrackCalls: 2, + expectedFilters: []expectedConntrackFilter{ + {ip: "10.128.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.128.0.1", port: 30000, protocol: syscall.IPPROTO_UDP}, + }, + }), + Entry("NodePort service with mixed protocols should only clean UDP NodePort", reconcileConntrackTestCase{ + desc: "should only delete conntrack for UDP NodePort, not TCP (protocol filtering)", + service: makeServiceWithNodePort([]servicePortConfig{ + {name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}, + {name: "", port: testServicePort2, targetPort: testEndpointPort1, protocol: tcpProtocol}, + }, []int32{30000, 30001}, corev1.ServiceTypeNodePort), + oldEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.1"}), + newEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.2"}), + expectedConntrackCalls: 2, // Only UDP: service port + NodePort (TCP port 30001 should be skipped) + expectedFilters: []expectedConntrackFilter{ + {ip: "10.128.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.128.0.1", port: 30000, protocol: syscall.IPPROTO_UDP}, + }, + }), + Entry("NodePort service with multiple UDP ports", reconcileConntrackTestCase{ + desc: "should delete conntrack entries only for the specific NodePort that changed", + service: makeServiceWithNodePort([]servicePortConfig{ + {name: "dns", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}, + {name: "snmp", port: testServicePort2, targetPort: testEndpointPort1, protocol: udpProtocol}, + }, []int32{30000, 30002}, corev1.ServiceTypeNodePort), + oldEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: strPtr("dns"), port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.1"}), + newEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: strPtr("dns"), port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.2"}), + expectedConntrackCalls: 2, // service port + NodePort for "dns" only + expectedFilters: []expectedConntrackFilter{ + {ip: "10.128.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.128.0.1", port: 30000, protocol: syscall.IPPROTO_UDP}, + }, + }), + Entry("LoadBalancer service with NodePort allocation", reconcileConntrackTestCase{ + desc: "should delete conntrack entries for both service port and NodePort", + service: func() *corev1.Service { + svc := makeServiceWithNodePort([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}, + []int32{30000}, corev1.ServiceTypeLoadBalancer) + svc.Status = corev1.ServiceStatus{ + LoadBalancer: corev1.LoadBalancerStatus{ + Ingress: []corev1.LoadBalancerIngress{{IP: "5.5.5.5"}}, + }, + } + return svc + }(), + oldEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.1"}), + newEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.2"}), + expectedConntrackCalls: 2, + expectedFilters: []expectedConntrackFilter{ + {ip: "10.128.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.128.0.1", port: 30000, protocol: syscall.IPPROTO_UDP}, + }, + }), + Entry("LoadBalancer service with AllocateLoadBalancerNodePorts=false", func() reconcileConntrackTestCase { + allocateNodePorts := false + return reconcileConntrackTestCase{ + desc: "should only delete conntrack entries for service port (no NodePort)", + service: func() *corev1.Service { + svc := makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}) + svc.Spec.Type = corev1.ServiceTypeLoadBalancer + svc.Spec.AllocateLoadBalancerNodePorts = &allocateNodePorts + svc.Status = corev1.ServiceStatus{ + LoadBalancer: corev1.LoadBalancerStatus{ + Ingress: []corev1.LoadBalancerIngress{{IP: "5.5.5.5"}}, + }, + } + return svc + }(), + oldEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.1"}), + newEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.2"}), + expectedConntrackCalls: 1, + expectedFilters: []expectedConntrackFilter{ + {ip: "10.128.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + }, + } + }()), ) }) }) diff --git a/go-controller/pkg/node/gateway_localnet_linux_test.go b/go-controller/pkg/node/gateway_localnet_linux_test.go index 71d112b491..d328d7dffb 100644 --- a/go-controller/pkg/node/gateway_localnet_linux_test.go +++ b/go-controller/pkg/node/gateway_localnet_linux_test.go @@ -202,7 +202,7 @@ func newEndpointSlice(svcName, namespace string, endpoints []discovery.Endpoint, } } -func makeConntrackFilter(ip string, port int, protocol corev1.Protocol) *netlink.ConntrackFilter { +func makeConntrackFilter(ip string, port int, protocol corev1.Protocol, filterType netlink.ConntrackFilterType) *netlink.ConntrackFilter { filter := &netlink.ConntrackFilter{} var err error @@ -221,15 +221,17 @@ func makeConntrackFilter(ip string, port int, protocol corev1.Protocol) *netlink } ipAddress := net.ParseIP(ip) Expect(ipAddress).NotTo(BeNil()) - err = filter.AddIP(netlink.ConntrackOrigDstIP, ipAddress) + err = filter.AddIP(filterType, ipAddress) Expect(err).NotTo(HaveOccurred()) return filter } type ctFilterDesc struct { - ip string - port int + ip string + port int + protocol corev1.Protocol + filterType netlink.ConntrackFilterType } func addConntrackMocks(nlMock *mocks.NetLinkOps, filterDescs []ctFilterDesc) { @@ -240,7 +242,7 @@ func addConntrackMocks(nlMock *mocks.NetLinkOps, filterDescs []ctFilterDesc) { OnCallMethodArgs: []interface{}{ netlink.ConntrackTableType(netlink.ConntrackTable), netlink.InetFamily(netlink.FAMILY_V4), - makeConntrackFilter(ctf.ip, ctf.port, corev1.ProtocolTCP), + makeConntrackFilter(ctf.ip, ctf.port, ctf.protocol, ctf.filterType), }, RetArgList: []interface{}{uint(1), nil}, }) @@ -1619,7 +1621,7 @@ var _ = Describe("Node Operations", func() { fNPW.watchFactory = wf Expect(startNodePortWatcher(fNPW, fakeClient)).To(Succeed()) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"1.1.1.1", 8032}, {"10.129.0.2", 8032}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"1.1.1.1", 8032, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"10.129.0.2", 8032, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) Eventually(func() bool { @@ -1708,7 +1710,7 @@ var _ = Describe("Node Operations", func() { fNPW.watchFactory = wf Expect(startNodePortWatcher(fNPW, fakeClient)).To(Succeed()) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 0}, {"192.168.18.15", 31111}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 0, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) Eventually(fExec.CalledMatchesExpected, "2s").Should(BeTrue(), fExec.ErrorDesc) @@ -1848,7 +1850,7 @@ var _ = Describe("Node Operations", func() { return nodenft.MatchNFTRules(expectedNFT, nft.Dump()) }).Should(Succeed()) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.10.10.1", 8034}, {"10.129.0.2", 8034}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.10.10.1", 8034, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"10.129.0.2", 8034, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) @@ -1976,11 +1978,11 @@ var _ = Describe("Node Operations", func() { }).Should(Equal(expectedLBExternalIPFlows2)) addConntrackMocks(netlinkMock, []ctFilterDesc{ - {"1.1.1.1", 8080}, - {"1.1.1.2", 8080}, - {"5.5.5.5", 8080}, - {"192.168.18.15", 31111}, - {"10.129.0.2", 8080}, + {"1.1.1.1", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, + {"1.1.1.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, + {"5.5.5.5", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, + {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, + {"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, }) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( @@ -2195,7 +2197,7 @@ var _ = Describe("Node Operations", func() { return nodenft.MatchNFTRules(expectedNFT, nft.Dump()) }).Should(Succeed()) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080}, {"192.168.18.15", 38034}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 38034, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) @@ -2332,7 +2334,7 @@ var _ = Describe("Node Operations", func() { flows := fNPW.ofm.getFlowsByKey("NodePort_namespace1_service1_tcp_31111") Expect(flows).To(BeNil()) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080}, {"192.168.18.15", 31111}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) @@ -2478,7 +2480,7 @@ var _ = Describe("Node Operations", func() { flows := fNPW.ofm.getFlowsByKey("NodePort_namespace1_service1_tcp_31111") Expect(flows).To(Equal(expectedFlows)) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080}, {"192.168.18.15", 31111}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) @@ -2628,7 +2630,7 @@ var _ = Describe("Node Operations", func() { flows := fNPW.ofm.getFlowsByKey("NodePort_namespace1_service1_tcp_31111") Expect(flows).To(Equal(expectedFlows)) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080}, {"192.168.18.15", 31111}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) @@ -2773,7 +2775,7 @@ var _ = Describe("Node Operations", func() { flows := fNPW.ofm.getFlowsByKey("NodePort_namespace1_service1_tcp_31111") Expect(flows).To(Equal(expectedFlows)) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080}, {"192.168.18.15", 31111}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) @@ -2921,7 +2923,7 @@ var _ = Describe("Node Operations", func() { Expect(fNPW.ofm.getFlowsByKey("NodePort_namespace1_service1_tcp_31111")).To(Equal(expectedFlows)) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080}, {"192.168.18.15", 31111}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) diff --git a/test/e2e/service.go b/test/e2e/service.go index d859a7b019..ee4ae3ce65 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -789,6 +789,151 @@ var _ = ginkgo.Describe("Services", feature.Service, func() { // network is removed by provider Context API }) + ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a NodePort service via a different node", func(ctx context.Context) { + const ( + serviceName = "svc-udp" + srcPort = 12345 + podClient = "pod-client" + podBackend1 = "pod-server-1" + podBackend2 = "pod-server-2" + ) + var clientNodeInfo, serverNodeInfo, backendNodeInfo nodeInfo + + cs := f.ClientSet + ns := f.Namespace.Name + + nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 3) + framework.ExpectNoError(err) + if len(nodes.Items) < 3 { + e2eskipper.Skipf( + "Test requires >= 3 Ready nodes, but there are only %v nodes", + len(nodes.Items)) + } + + family := v1.IPv4Protocol + if IsIPv6Cluster(cs) { + family = v1.IPv6Protocol + } + + ips := e2enode.GetAddressesByTypeAndFamily(&nodes.Items[0], v1.NodeInternalIP, family) + gomega.Expect(ips).ToNot(gomega.BeEmpty()) + + clientNodeInfo = nodeInfo{ + name: nodes.Items[0].Name, + nodeIP: ips[0], + } + + ips = e2enode.GetAddressesByTypeAndFamily(&nodes.Items[1], v1.NodeInternalIP, family) + gomega.Expect(ips).ToNot(gomega.BeEmpty()) + + backendNodeInfo = nodeInfo{ + name: nodes.Items[1].Name, + nodeIP: ips[0], + } + + ips = e2enode.GetAddressesByTypeAndFamily(&nodes.Items[2], v1.NodeInternalIP, family) + gomega.Expect(ips).ToNot(gomega.BeEmpty()) + + serverNodeInfo = nodeInfo{ + name: nodes.Items[2].Name, + nodeIP: ips[0], + } + + // Create a NodePort service + udpJig := e2eservice.NewTestJig(cs, ns, serviceName) + ginkgo.By("creating a UDP service " + serviceName + " with type=NodePort in " + ns) + udpService, err := udpJig.CreateUDPService(ctx, func(svc *v1.Service) { + svc.Spec.Type = v1.ServiceTypeNodePort + svc.Spec.Ports = []v1.ServicePort{ + {Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt32(80)}, + } + }) + framework.ExpectNoError(err) + + // Create a pod in one node to create the UDP traffic against the NodePort service every 5 seconds + ginkgo.By("creating a client pod for probing the service " + serviceName) + clientPod := e2epod.NewAgnhostPod(ns, podClient, nil, nil, nil) + nodeSelection := e2epod.NodeSelection{Name: clientNodeInfo.name} + e2epod.SetNodeSelection(&clientPod.Spec, nodeSelection) + cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, serverNodeInfo.nodeIP, udpService.Spec.Ports[0].NodePort) + clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd} + clientPod.Spec.Containers[0].Name = podClient + e2epod.NewPodClient(f).CreateSync(ctx, clientPod) + + // Read the client pod logs + logs, err := e2epod.GetPodLogs(ctx, cs, ns, podClient, podClient) + framework.ExpectNoError(err) + framework.Logf("Pod client logs: %s", logs) + + // Add a backend pod to the service in the other node + ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName + " at node " + backendNodeInfo.name) + serverPod1 := e2epod.NewAgnhostPod(ns, podBackend1, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80)) + serverPod1.Labels = udpJig.Labels + nodeSelection = e2epod.NodeSelection{Name: backendNodeInfo.name} + e2epod.SetNodeSelection(&serverPod1.Spec, nodeSelection) + e2epod.NewPodClient(f).CreateSync(ctx, serverPod1) + + ginkgo.By("Waiting for the endpoint to be ready") + err = framework.WaitForServiceEndpointsNum(ctx, f.ClientSet, f.Namespace.Name, + serviceName, 1, time.Second, wait.ForeverTestTimeout) + framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", + serviceName, f.Namespace.Name) + + logContainsFn := func(text, podName string) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { + logs, err := e2epod.GetPodLogs(ctx, cs, ns, podName, podName) + if err != nil { + // Retry the error next time. + return false, nil + } + if !strings.Contains(string(logs), text) { + return false, nil + } + return true, nil + } + } + // Note that the fact that Endpoints object already exists, does NOT mean + // that openflows were already programmed. + // Additionally take into account that UDP conntract entries timeout is + // 30 seconds by default. + // Based on the above check if the pod receives the traffic. + ginkgo.By("checking client pod connected to the backend 1 on Node IP " + serverNodeInfo.nodeIP) + if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, time.Minute, true, logContainsFn(podBackend1, podClient)); err != nil { + logs, err = e2epod.GetPodLogs(ctx, cs, ns, podClient, podClient) + framework.ExpectNoError(err) + framework.Logf("Pod client logs: %s", logs) + framework.Failf("Failed to connect to backend 1") + } + + // Create a second pod + ginkgo.By("creating a second backend pod " + podBackend2 + " for the service " + serviceName + " at node " + backendNodeInfo.name) + serverPod2 := e2epod.NewAgnhostPod(ns, podBackend2, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80)) + serverPod2.Labels = udpJig.Labels + nodeSelection = e2epod.NodeSelection{Name: backendNodeInfo.name} + e2epod.SetNodeSelection(&serverPod2.Spec, nodeSelection) + e2epod.NewPodClient(f).CreateSync(ctx, serverPod2) + + // and delete the first pod + framework.Logf("Cleaning up %s pod", podBackend1) + e2epod.NewPodClient(f).DeleteSync(ctx, podBackend1, metav1.DeleteOptions{}, e2epod.DefaultPodDeletionTimeout) + + ginkgo.By("Waiting for the endpoint to be ready") + err = framework.WaitForServiceEndpointsNum(ctx, f.ClientSet, f.Namespace.Name, + serviceName, 1, time.Second, wait.ForeverTestTimeout) + framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", + serviceName, f.Namespace.Name) + + // Check that the second pod keeps receiving traffic + // UDP conntrack entries timeout is 30 sec by default + ginkgo.By("checking client pod connected to the backend 2 on Node IP " + serverNodeInfo.nodeIP) + if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, time.Minute, true, logContainsFn(podBackend2, podClient)); err != nil { + logs, err = e2epod.GetPodLogs(ctx, cs, ns, podClient, podClient) + framework.ExpectNoError(err) + framework.Logf("Pod client logs: %s", logs) + framework.Failf("Failed to connect to backend 2") + } + }) + ginkgo.It("should listen on each host addresses", func() { endPoints := make([]*v1.Pod, 0) endpointsSelector := map[string]string{"servicebackend": "true"}