From 8336fa0371ff7f09f16b5f6902e03668d106121a Mon Sep 17 00:00:00 2001 From: Periyasamy Palanisamy Date: Wed, 11 Feb 2026 18:02:04 +0100 Subject: [PATCH 1/5] Minimize ACLs by combining ipBlocks into single ACL Combine all ipBlocks in a NetworkPolicy rule into single ACL instead of creating one ACL per ipBlock. This reduces ACL bloat when policy having multiple ipBlocks for ingress/egress rules. Signed-off-by: Periyasamy Palanisamy (cherry picked from commit fd2701582dca1961435745697b018e636e045dfe) (cherry picked from commit 14b73098cbbdb1833fd714430d707b6ef268b886) (cherry picked from commit 21ef647488cfad3e15e727eb5a0708ddd47c01ed) --- .../pkg/libovsdb/ops/db_object_types.go | 5 +- go-controller/pkg/ovn/gress_policy.go | 48 +++-- go-controller/pkg/ovn/gress_policy_test.go | 19 +- go-controller/pkg/ovn/policy_test.go | 182 +++++++++++++++++- 4 files changed, 212 insertions(+), 42 deletions(-) diff --git a/go-controller/pkg/libovsdb/ops/db_object_types.go b/go-controller/pkg/libovsdb/ops/db_object_types.go index 45c2777637..87102451c7 100644 --- a/go-controller/pkg/libovsdb/ops/db_object_types.go +++ b/go-controller/pkg/libovsdb/ops/db_object_types.go @@ -242,9 +242,10 @@ var ACLNetworkPolicyPortIndex = newObjectIDsType(acl, NetworkPolicyPortIndexOwne // ingress/egress + NetworkPolicy[In/E]gressRule idx - defines given gressPolicy. // ACLs are created for gp.portPolicies which are grouped by protocol: // - for empty policy (no selectors and no ip blocks) - empty ACL (see allIPsMatch) +// with idx=emptyIdx (-1) // OR -// - all selector-based peers ACL -// - for every IPBlock +1 ACL +// - all selector-based peers ACL with idx=emptyIdx (-1) +// - all ipBlocks combined into a single ACL with idx=ipBlockCombinedIdx (-2) // Therefore unique id for a given gressPolicy is protocol name + IPBlock idx // (protocol will be "None" if no port policy is defined, and empty policy and all // selector-based peers ACLs will have idx=-1) diff --git a/go-controller/pkg/ovn/gress_policy.go b/go-controller/pkg/ovn/gress_policy.go index ad20fadfb3..b1f844123c 100644 --- a/go-controller/pkg/ovn/gress_policy.go +++ b/go-controller/pkg/ovn/gress_policy.go @@ -22,6 +22,11 @@ import ( const ( // emptyIdx is used to create ACL for gressPolicy that doesn't have ipBlocks emptyIdx = -1 + // ipBlockCombinedIdx is used when creating an ACL for a gressPolicy + // that contains ipBlocks. Previously, one ACL was created per ipBlock. + // This is changed to create a single combined ACL for all ipBlocks, + // and this special index value identifies those new ACLs. 
+ ipBlockCombinedIdx = -2 ) type gressPolicy struct { @@ -167,14 +172,14 @@ func (gp *gressPolicy) allIPsMatch() string { } } -func (gp *gressPolicy) getMatchFromIPBlock(lportMatch, l4Match string) []string { +func (gp *gressPolicy) getMatchFromIPBlock(lportMatch, l4Match string) string { var direction string if gp.policyType == knet.PolicyTypeIngress { direction = "src" } else { direction = "dst" } - var matchStrings []string + var ipBlockMatches []string var matchStr, ipVersion string for _, ipBlock := range gp.ipBlocks { if utilnet.IsIPv6CIDRString(ipBlock.CIDR) { @@ -185,17 +190,22 @@ func (gp *gressPolicy) getMatchFromIPBlock(lportMatch, l4Match string) []string if len(ipBlock.Except) == 0 { matchStr = fmt.Sprintf("%s.%s == %s", ipVersion, direction, ipBlock.CIDR) } else { - matchStr = fmt.Sprintf("%s.%s == %s && %s.%s != {%s}", ipVersion, direction, ipBlock.CIDR, + matchStr = fmt.Sprintf("(%s.%s == %s && %s.%s != {%s})", ipVersion, direction, ipBlock.CIDR, ipVersion, direction, strings.Join(ipBlock.Except, ", ")) } - if l4Match == libovsdbutil.UnspecifiedL4Match { - matchStr = fmt.Sprintf("%s && %s", matchStr, lportMatch) - } else { - matchStr = fmt.Sprintf("%s && %s && %s", matchStr, l4Match, lportMatch) - } - matchStrings = append(matchStrings, matchStr) + ipBlockMatches = append(ipBlockMatches, matchStr) } - return matchStrings + var l3Match string + if len(ipBlockMatches) == 1 { + l3Match = ipBlockMatches[0] + } else { + l3Match = fmt.Sprintf("(%s)", strings.Join(ipBlockMatches, " || ")) + } + + if l4Match == libovsdbutil.UnspecifiedL4Match { + return fmt.Sprintf("%s && %s", l3Match, lportMatch) + } + return fmt.Sprintf("%s && %s && %s", l3Match, l4Match, lportMatch) } // addNamespaceAddressSet adds a namespace address set to the gress policy. @@ -285,13 +295,11 @@ func (gp *gressPolicy) buildLocalPodACLs(portGroupName string, aclLogging *libov for protocol, l4Match := range libovsdbutil.GetL4MatchesFromNetworkPolicyPorts(gp.portPolicies) { if len(gp.ipBlocks) > 0 { // Add ACL allow rule for IPBlock CIDR - ipBlockMatches := gp.getMatchFromIPBlock(lportMatch, l4Match) - for ipBlockIdx, ipBlockMatch := range ipBlockMatches { - aclIDs := gp.getNetpolACLDbIDs(ipBlockIdx, protocol) - acl := libovsdbutil.BuildACLWithDefaultTier(aclIDs, types.DefaultAllowPriority, ipBlockMatch, action, - aclLogging, gp.aclPipeline) - createdACLs = append(createdACLs, acl) - } + ipBlockMatch := gp.getMatchFromIPBlock(lportMatch, l4Match) + aclIDs := gp.getNetpolACLDbIDs(ipBlockCombinedIdx, protocol) + acl := libovsdbutil.BuildACLWithDefaultTier(aclIDs, types.DefaultAllowPriority, ipBlockMatch, action, + aclLogging, gp.aclPipeline) + createdACLs = append(createdACLs, acl) } // if there are pod/namespace selector, then allow packets from/to that address_set or // if the NetworkPolicyPeer is empty, then allow from all sources or to all destinations. 
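To make the effect of the combined match concrete, the following is a minimal, standalone Go sketch that mirrors the logic of getMatchFromIPBlock above. It is illustrative only: the ipBlock struct and the crude IPv6 check stand in for the knet.IPBlock type and utilnet.IsIPv6CIDRString used by the real code. For the two blocks below it prints the same single match string that the updated gress_policy_test.go expects, instead of one match (and one ACL) per ipBlock.

package main

import (
	"fmt"
	"strings"
)

// ipBlock is a simplified stand-in for knet.IPBlock, used only for this sketch.
type ipBlock struct {
	cidr   string
	except []string
}

// combinedIPBlockMatch builds one OVN match string covering every ipBlock:
// each CIDR (with optional excepts) becomes one clause, all clauses are OR'ed
// into a single l3 match, and the result is AND'ed with the l4 and port matches.
func combinedIPBlockMatch(blocks []ipBlock, direction, l4Match, lportMatch string) string {
	clauses := make([]string, 0, len(blocks))
	for _, b := range blocks {
		ipVersion := "ip4"
		if strings.Contains(b.cidr, ":") { // crude IPv6 detection, for the sketch only
			ipVersion = "ip6"
		}
		clause := fmt.Sprintf("%s.%s == %s", ipVersion, direction, b.cidr)
		if len(b.except) > 0 {
			clause = fmt.Sprintf("(%s && %s.%s != {%s})", clause, ipVersion, direction,
				strings.Join(b.except, ", "))
		}
		clauses = append(clauses, clause)
	}
	l3Match := clauses[0]
	if len(clauses) > 1 {
		l3Match = "(" + strings.Join(clauses, " || ") + ")"
	}
	if l4Match == "" {
		return fmt.Sprintf("%s && %s", l3Match, lportMatch)
	}
	return fmt.Sprintf("%s && %s && %s", l3Match, l4Match, lportMatch)
}

func main() {
	blocks := []ipBlock{
		{cidr: "0.0.0.0/0", except: []string{"10.1.0.0/16"}},
		{cidr: "10.1.0.0/16"},
	}
	// Prints:
	// ((ip4.src == 0.0.0.0/0 && ip4.src != {10.1.0.0/16}) || ip4.src == 10.1.0.0/16) && input && fake
	fmt.Println(combinedIPBlockMatch(blocks, "src", "input", "fake"))
}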
@@ -334,10 +342,10 @@ func (gp *gressPolicy) getNetpolACLDbIDs(ipBlockIdx int, protocol string) *libov // gress rule index libovsdbops.GressIdxKey: strconv.Itoa(gp.idx), // acls are created for every gp.portPolicies which are grouped by protocol: - // - for empty policy (no selectors and no ip blocks) - empty ACL + // - for empty policy (no selectors and no ip blocks) - empty ACL with idx=emptyIdx (-1) // OR - // - all selector-based peers ACL - // - for every IPBlock +1 ACL + // - all selector-based peers ACL with idx=emptyIdx (-1) + // - all ipBlocks combined into a single ACL with idx=ipBlockCombinedIdx (-2) // Therefore unique id for a given gressPolicy is protocol name + IPBlock idx // (protocol will be "None" if no port policy is defined, and empty policy and all // selector-based peers ACLs will have idx=-1) diff --git a/go-controller/pkg/ovn/gress_policy_test.go b/go-controller/pkg/ovn/gress_policy_test.go index 14b2a65a7c..f45be5385a 100644 --- a/go-controller/pkg/ovn/gress_policy_test.go +++ b/go-controller/pkg/ovn/gress_policy_test.go @@ -16,7 +16,7 @@ func TestGetMatchFromIPBlock(t *testing.T) { ipBlocks []*knet.IPBlock lportMatch string l4Match string - expected []string + expected string }{ { desc: "IPv4 only no except", @@ -27,7 +27,7 @@ func TestGetMatchFromIPBlock(t *testing.T) { }, lportMatch: "fake", l4Match: "input", - expected: []string{"ip4.src == 0.0.0.0/0 && input && fake"}, + expected: "ip4.src == 0.0.0.0/0 && input && fake", }, { desc: "multiple IPv4 only no except", @@ -41,8 +41,7 @@ func TestGetMatchFromIPBlock(t *testing.T) { }, lportMatch: "fake", l4Match: "input", - expected: []string{"ip4.src == 0.0.0.0/0 && input && fake", - "ip4.src == 10.1.0.0/16 && input && fake"}, + expected: "(ip4.src == 0.0.0.0/0 || ip4.src == 10.1.0.0/16) && input && fake", }, { desc: "IPv6 only no except", @@ -53,7 +52,7 @@ func TestGetMatchFromIPBlock(t *testing.T) { }, lportMatch: "fake", l4Match: "input", - expected: []string{"ip6.src == fd00:10:244:3::49/32 && input && fake"}, + expected: "ip6.src == fd00:10:244:3::49/32 && input && fake", }, { desc: "mixed IPv4 and IPv6 no except", @@ -67,8 +66,7 @@ func TestGetMatchFromIPBlock(t *testing.T) { }, lportMatch: "fake", l4Match: "input", - expected: []string{"ip6.src == ::/0 && input && fake", - "ip4.src == 0.0.0.0/0 && input && fake"}, + expected: "(ip6.src == ::/0 || ip4.src == 0.0.0.0/0) && input && fake", }, { desc: "IPv4 only with except", @@ -80,7 +78,7 @@ func TestGetMatchFromIPBlock(t *testing.T) { }, lportMatch: "fake", l4Match: "input", - expected: []string{"ip4.src == 0.0.0.0/0 && ip4.src != {10.1.0.0/16} && input && fake"}, + expected: "(ip4.src == 0.0.0.0/0 && ip4.src != {10.1.0.0/16}) && input && fake", }, { desc: "multiple IPv4 with except", @@ -95,8 +93,7 @@ func TestGetMatchFromIPBlock(t *testing.T) { }, lportMatch: "fake", l4Match: "input", - expected: []string{"ip4.src == 0.0.0.0/0 && ip4.src != {10.1.0.0/16} && input && fake", - "ip4.src == 10.1.0.0/16 && input && fake"}, + expected: "((ip4.src == 0.0.0.0/0 && ip4.src != {10.1.0.0/16}) || ip4.src == 10.1.0.0/16) && input && fake", }, { desc: "IPv4 with IPv4 except", @@ -108,7 +105,7 @@ func TestGetMatchFromIPBlock(t *testing.T) { }, lportMatch: "fake", l4Match: "input", - expected: []string{"ip4.src == 0.0.0.0/0 && ip4.src != {10.1.0.0/16} && input && fake"}, + expected: "(ip4.src == 0.0.0.0/0 && ip4.src != {10.1.0.0/16}) && input && fake", }, } diff --git a/go-controller/pkg/ovn/policy_test.go b/go-controller/pkg/ovn/policy_test.go index ba0c3f9592..08d0ff434b 
100644 --- a/go-controller/pkg/ovn/policy_test.go +++ b/go-controller/pkg/ovn/policy_test.go @@ -6,6 +6,7 @@ import ( "net" "runtime" "sort" + "strings" "time" "github.com/onsi/ginkgo/v2" @@ -294,13 +295,27 @@ func getGressACLs(gressIdx int, peers []knet.NetworkPolicyPeer, policyType knet. acl.UUID = dbIDs.String() + "-UUID" acls = append(acls, acl) } - for i, ipBlock := range ipBlocks { - match := fmt.Sprintf("ip4.%s == %s && %s == @%s", ipDir, ipBlock, portDir, pgName) + if len(ipBlocks) > 0 { + var ipBlockMatches []string + for _, ipBlock := range ipBlocks { + ipVersion := "ip4" + if utilnet.IsIPv6CIDRString(ipBlock) { + ipVersion = "ip6" + } + ipBlockMatches = append(ipBlockMatches, fmt.Sprintf("%s.%s == %s", ipVersion, ipDir, ipBlock)) + } + var match string + if len(ipBlockMatches) == 1 { + match = ipBlockMatches[0] + } else { + match = fmt.Sprintf("(%s)", strings.Join(ipBlockMatches, " || ")) + } + match = fmt.Sprintf("%s && %s == @%s", match, portDir, pgName) action := nbdb.ACLActionAllowRelated if params.statelessNetPol { action = nbdb.ACLActionAllowStateless } - dbIDs := gp.getNetpolACLDbIDs(i, libovsdbutil.UnspecifiedL4Protocol) + dbIDs := gp.getNetpolACLDbIDs(ipBlockCombinedIdx, libovsdbutil.UnspecifiedL4Protocol) acl := libovsdbops.BuildACL( libovsdbutil.GetACLName(dbIDs), direction, @@ -363,6 +378,17 @@ func getPolicyData(params *netpolDataParams) []libovsdbtest.TestData { acls = append(acls, getGressACLs(i, egress.To, knet.PolicyTypeEgress, params)...) } + pg := getPolicyPortGroup(params, acls) + + data := []libovsdbtest.TestData{} + for _, acl := range acls { + data = append(data, acl) + } + data = append(data, pg) + return data +} + +func getPolicyPortGroup(params *netpolDataParams, acls []*nbdb.ACL) *nbdb.PortGroup { lsps := []*nbdb.LogicalSwitchPort{} for _, uuid := range params.localPortUUIDs { lsps = append(lsps, &nbdb.LogicalSwitchPort{UUID: uuid}) @@ -377,12 +403,7 @@ func getPolicyData(params *netpolDataParams) []libovsdbtest.TestData { ) pg.UUID = pg.Name + "-UUID" - data := []libovsdbtest.TestData{} - for _, acl := range acls { - data = append(data, acl) - } - data = append(data, pg) - return data + return pg } func newNetpolDataParams(networkPolicy *knet.NetworkPolicy) *netpolDataParams { @@ -958,6 +979,149 @@ var _ = ginkgo.Describe("OVN NetworkPolicy Operations", func() { } gomega.Expect(app.Run([]string{app.Name})).To(gomega.Succeed()) }) + + ginkgo.It("reconciles existing networkPolicies with has legacy ipBlock ACLs", func() { + app.Action = func(*cli.Context) error { + namespace1 := *newNamespace(namespaceName1) + namespace1AddressSetv4, _ := buildNamespaceAddressSets(namespace1.Name, nil) + peer := knet.NetworkPolicyPeer{ + IPBlock: &knet.IPBlock{ + CIDR: "1.1.1.1", + }, + } + // equivalent rules in one peer + networkPolicy1 := newNetworkPolicy(netPolicyName1, namespace1.Name, metav1.LabelSelector{}, + []knet.NetworkPolicyIngressRule{{ + From: []knet.NetworkPolicyPeer{peer, peer}, + }}, nil) + // equivalent rules in different peers + networkPolicy2 := newNetworkPolicy(netPolicyName2, namespace1.Name, metav1.LabelSelector{}, + []knet.NetworkPolicyIngressRule{ + { + From: []knet.NetworkPolicyPeer{peer}, + }, + { + From: []knet.NetworkPolicyPeer{peer}, + }, + }, nil) + initialData := initialDB.NBData + initialData = append(initialData, namespace1AddressSetv4) + defaultDenyExpectedData := getDefaultDenyDataMultiplePolicies([]*knet.NetworkPolicy{networkPolicy1, networkPolicy2}) + initialData = append(initialData, defaultDenyExpectedData...) 
+ + // NetworkPolicy 1 contains a single gress policy that previously + // created one legacy ACL per ipBlock. Simulate two legacy ACLs + // corresponding to ipBlock indexes 0 and 1 of the gress policy. + // ACL1 => libovsdbops.GressIdxKey: 0, libovsdbops.IpBlockIndexKey: 0 + // ACL2 => libovsdbops.GressIdxKey: 0, libovsdbops.IpBlockIndexKey: 1 + netInfo := &util.DefaultNetInfo{} + fakeController := getFakeBaseController(netInfo) + controllerName := getNetworkControllerName(netInfo.GetNetworkName()) + pgName1 := fakeController.getNetworkPolicyPGName(namespace1.Name, networkPolicy1.Name) + gp1 := gressPolicy{ + policyNamespace: networkPolicy1.Namespace, + policyName: networkPolicy1.Name, + policyType: knet.PolicyTypeIngress, + idx: 0, + controllerName: controllerName, + } + var legacyACLPolicy1 []*nbdb.ACL + for idx := 0; idx < 2; idx++ { + legacyACLIDs := gp1.getNetpolACLDbIDs(idx, libovsdbutil.UnspecifiedL4Protocol) + legacyACL := libovsdbops.BuildACL( + libovsdbutil.GetACLName(legacyACLIDs), + nbdb.ACLDirectionToLport, + types.DefaultAllowPriority, + fmt.Sprintf("ip4.src == 1.1.1.1 && outport == @%s", pgName1), + nbdb.ACLActionAllowRelated, + types.OvnACLLoggingMeter, + "", + false, + legacyACLIDs.GetExternalIDs(), + nil, + types.DefaultACLTier, + ) + legacyACL.UUID = legacyACLIDs.String() + "-UUID" + initialData = append(initialData, legacyACL) + legacyACLPolicy1 = append(legacyACLPolicy1, legacyACL) + } + pgNetworkPolicy1 := getPolicyPortGroup(newNetpolDataParams(networkPolicy1), legacyACLPolicy1) + initialData = append(initialData, pgNetworkPolicy1) + + // NetworkPolicy 2 contains two gress policies, each with one legacy + // ACL per ipBlock. Simulate two legacy ACL corresponding to gress + // policy indexes 0 and 1, respectively. + // ACL1 => libovsdbops.GressIdxKey: 0, libovsdbops.IpBlockIndexKey: 0 + // ACL2 => libovsdbops.GressIdxKey: 1, libovsdbops.IpBlockIndexKey: 0 + pgName2 := fakeController.getNetworkPolicyPGName(namespace1.Name, networkPolicy2.Name) + firstgp2 := gressPolicy{ + policyNamespace: networkPolicy2.Namespace, + policyName: networkPolicy2.Name, + policyType: knet.PolicyTypeIngress, + idx: 0, + controllerName: controllerName, + } + secondgp2 := gressPolicy{ + policyNamespace: networkPolicy2.Namespace, + policyName: networkPolicy2.Name, + policyType: knet.PolicyTypeIngress, + idx: 1, + controllerName: controllerName, + } + legacyACLID := firstgp2.getNetpolACLDbIDs(0, libovsdbutil.UnspecifiedL4Protocol) + legacyACL := libovsdbops.BuildACL( + libovsdbutil.GetACLName(legacyACLID), + nbdb.ACLDirectionToLport, + types.DefaultAllowPriority, + fmt.Sprintf("ip4.src == 1.1.1.1 && outport == @%s", pgName2), + nbdb.ACLActionAllowRelated, + types.OvnACLLoggingMeter, + "", + false, + legacyACLID.GetExternalIDs(), + nil, + types.DefaultACLTier, + ) + legacyACL.UUID = legacyACLID.String() + "-UUID" + initialData = append(initialData, legacyACL) + + legacyACLID2 := secondgp2.getNetpolACLDbIDs(0, libovsdbutil.UnspecifiedL4Protocol) + legacyACL2 := libovsdbops.BuildACL( + libovsdbutil.GetACLName(legacyACLID2), + nbdb.ACLDirectionToLport, + types.DefaultAllowPriority, + fmt.Sprintf("ip4.src == 1.1.1.1 && outport == @%s", pgName2), + nbdb.ACLActionAllowRelated, + types.OvnACLLoggingMeter, + "", + false, + legacyACLID2.GetExternalIDs(), + nil, + types.DefaultACLTier, + ) + legacyACL2.UUID = legacyACLID2.String() + "-UUID" + initialData = append(initialData, legacyACL2) + pgNetworkPolicy2 := getPolicyPortGroup(newNetpolDataParams(networkPolicy2), []*nbdb.ACL{legacyACL, legacyACL2}) + 
initialData = append(initialData, pgNetworkPolicy2) + + startOvn(libovsdbtest.TestSetup{NBData: initialData}, []corev1.Namespace{namespace1}, + []knet.NetworkPolicy{*networkPolicy1, *networkPolicy2}, + nil, nil) + + // check the initial data is updated and all legacy ACLs should be cleaned up + gressPolicy1ExpectedData := getPolicyData(newNetpolDataParams(networkPolicy1)) + gressPolicy2ExpectedData := getPolicyData(newNetpolDataParams(networkPolicy2)) + finalData := initialDB.NBData + finalData = append(finalData, namespace1AddressSetv4) + finalData = append(finalData, gressPolicy1ExpectedData...) + finalData = append(finalData, gressPolicy2ExpectedData...) + finalData = append(finalData, defaultDenyExpectedData...) + gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(finalData)) + + return nil + } + gomega.Expect(app.Run([]string{app.Name})).To(gomega.Succeed()) + }) }) ginkgo.Context("during execution", func() { From 46ce09a9e9ad75d388fa9ff678afb814e6c46e89 Mon Sep 17 00:00:00 2001 From: Peng Liu Date: Wed, 27 Aug 2025 07:54:20 +0000 Subject: [PATCH 2/5] Clear stale conntrack UDP entries for nodePorts When an EndpointSlice for a UDP NodePort or loadbalancer type of service is updated, stale conntrack entries for removed endpoints must be flushed. The existing logic failed to do this correctly if the backend pod was on a different node. This patch fixes the issue by flushing conntrack entries by filtering the nodePort when the node is not hosting the backend pod. In case that the backend pod was on the same node as the service, this issue won't happen. Since all old pod entries are removed from the node by the function deletePodConntrack when the pod is deleted. Signed-off-by: Peng Liu (cherry picked from commit b426934bc27b426d861c3ee0723b7f64fb35a031) Signed-off-by: Venkata Charan Sunku --- .../node/default_node_network_controller.go | 13 +- .../default_node_network_controller_test.go | 269 +++++++++--------- .../pkg/node/gateway_localnet_linux_test.go | 40 +-- 3 files changed, 175 insertions(+), 147 deletions(-) diff --git a/go-controller/pkg/node/default_node_network_controller.go b/go-controller/pkg/node/default_node_network_controller.go index 66722d9694..d8cde1fab2 100644 --- a/go-controller/pkg/node/default_node_network_controller.go +++ b/go-controller/pkg/node/default_node_network_controller.go @@ -1474,13 +1474,24 @@ func (nc *DefaultNodeNetworkController) reconcileConntrackUponEndpointSliceEvent klog.Errorf("Failed to get service port for endpoint %s: %v", oldIPStr, err) continue } - // upon update and delete events, flush conntrack only for UDP + // upon update and delete events, flush UDP conntrack for Service port klog.V(5).Infof("Deleting conntrack entry for endpoint %s, port %d, protocol %s", oldIPStr, servicePort.Port, *oldPort.Protocol) if err := util.DeleteConntrackServicePort(oldIPStr, servicePort.Port, *oldPort.Protocol, netlink.ConntrackReplyAnyIP, nil); err != nil { klog.Errorf("Failed to delete conntrack entry for %s port %d: %v", oldIPStr, servicePort.Port, err) errors = append(errors, err) } + + // Flush UDP conntrack entries for NodePort (and LoadBalancer services that allocate NodePorts) + // TODO: Once vishvananda/netlink support ConntrackFilterType '--reply-port-src', we can use one DeleteConntrackServicePort() call + // conntrack entries for both ClusterIP and NodePort. 
+ if util.ServiceTypeHasNodePort(svc) && servicePort.NodePort > 0 { + if err := util.DeleteConntrackServicePort(oldIPStr, servicePort.NodePort, *oldPort.Protocol, + netlink.ConntrackReplyAnyIP, nil); err != nil { + klog.Errorf("Failed to delete conntrack entry for %s NodePort %d: %v", oldIPStr, servicePort.NodePort, err) + errors = append(errors, err) + } + } } } } diff --git a/go-controller/pkg/node/default_node_network_controller_test.go b/go-controller/pkg/node/default_node_network_controller_test.go index 6ecd5bdeb6..eda939a8a9 100644 --- a/go-controller/pkg/node/default_node_network_controller_test.go +++ b/go-controller/pkg/node/default_node_network_controller_test.go @@ -1769,7 +1769,6 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } ip string port uint16 protocol uint8 - family netlink.InetFamily } // Test data structure for table-driven tests @@ -1782,12 +1781,21 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } expectedFilters []expectedConntrackFilter } - // Helper to create EndpointSlice - makeEndpointSlice := func(portConfigs []struct { + type endpointPortConfig struct { name *string port int32 protocol corev1.Protocol - }, addresses []string) *discovery.EndpointSlice { + } + + type servicePortConfig struct { + name string + port int32 + targetPort int32 + protocol corev1.Protocol + } + + // Helper to create EndpointSlice + makeEndpointSlice := func(portConfigs []endpointPortConfig, addresses []string) *discovery.EndpointSlice { ports := make([]discovery.EndpointPort, len(portConfigs)) for i, pc := range portConfigs { p := pc.port @@ -1815,12 +1823,7 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } } // Helper to create Service - makeService := func(portConfigs []struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }) *corev1.Service { + makeService := func(portConfigs []servicePortConfig) *corev1.Service { ports := make([]corev1.ServicePort, len(portConfigs)) for i, pc := range portConfigs { ports[i] = corev1.ServicePort{ @@ -1842,6 +1845,16 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } } } + // Helper to create NodePort or LoadBalancer Service by invoking makeService + makeServiceWithNodePort := func(portConfigs []servicePortConfig, nodePorts []int32, svcType corev1.ServiceType) *corev1.Service { + svc := makeService(portConfigs) + svc.Spec.Type = svcType + for i := 0; i < len(nodePorts) && i < len(svc.Spec.Ports); i++ { + svc.Spec.Ports[i].NodePort = nodePorts[i] + } + return svc + } + // Helper function to build expected ConntrackFilter for verification buildExpectedFilter := func(ef expectedConntrackFilter) *netlink.ConntrackFilter { filter := &netlink.ConntrackFilter{} @@ -1942,13 +1955,8 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } Entry("old endpointslice is nil", reconcileConntrackTestCase{ - desc: "should not delete any conntrack entries when old endpoint is nil", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should not delete any conntrack entries when old endpoint is nil", + service: makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: nil, newEndpointSlice: &discovery.EndpointSlice{}, expectedConntrackCalls: 0, @@ -1957,69 +1965,42 @@ add element inet 
ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } Entry("service exists with matching unnamed port", reconcileConntrackTestCase{ - desc: "should delete conntrack with service port for unnamed port", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should delete conntrack with service port for unnamed port", + service: makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.0.0.1"}, ), newEndpointSlice: nil, expectedConntrackCalls: 1, expectedFilters: []expectedConntrackFilter{ - {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, + {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, }, }, ), Entry("service exists with matching named port", reconcileConntrackTestCase{ - desc: "should delete conntrack with service port for named port", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "http", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should delete conntrack with service port for named port", + service: makeService([]servicePortConfig{{name: "http", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: strPtr("http"), port: testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: strPtr("http"), port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.0.0.1"}, ), newEndpointSlice: nil, expectedConntrackCalls: 1, expectedFilters: []expectedConntrackFilter{ - {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, + {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, }, }, ), Entry("service exists but port name mismatch", reconcileConntrackTestCase{ - desc: "should skip conntrack deletion when port name doesn't match", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "http", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should skip conntrack deletion when port name doesn't match", + service: makeService([]servicePortConfig{{name: "http", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: strPtr("grpc"), port: testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: strPtr("grpc"), port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.0.0.1"}, ), newEndpointSlice: nil, @@ -2032,11 +2013,7 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } desc: "should return early without deleting conntrack when service not found", service: nil, oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: nil, port: 
testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.0.0.1"}, ), newEndpointSlice: nil, @@ -2046,19 +2023,10 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } Entry("TCP protocol should be skipped", reconcileConntrackTestCase{ - desc: "should skip conntrack deletion for TCP protocol", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: tcpProtocol}}), + desc: "should skip conntrack deletion for TCP protocol", + service: makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: tcpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: nil, port: testEndpointPort1, protocol: tcpProtocol}}, + []endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: tcpProtocol}}, []string{"10.0.0.1"}, ), newEndpointSlice: nil, @@ -2068,78 +2036,51 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } Entry("multiple endpoints", reconcileConntrackTestCase{ - desc: "should delete conntrack for each endpoint", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should delete conntrack for each endpoint", + service: makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, ), newEndpointSlice: nil, expectedConntrackCalls: 3, expectedFilters: []expectedConntrackFilter{ - {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, - {ip: "10.0.0.2", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, - {ip: "10.0.0.3", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, + {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.0.0.2", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.0.0.3", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, }, }, ), Entry("IPv6 endpoint", reconcileConntrackTestCase{ - desc: "should delete conntrack for IPv6 endpoint", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should delete conntrack for IPv6 endpoint", + service: makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"fd00::1"}, ), newEndpointSlice: nil, expectedConntrackCalls: 1, expectedFilters: []expectedConntrackFilter{ - {ip: 
"fd00::1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V6}, + {ip: "fd00::1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, }, }, ), Entry("dual-stack endpoints", reconcileConntrackTestCase{ - desc: "should delete conntrack for both IPv4 and IPv6", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), + desc: "should delete conntrack for both IPv4 and IPv6", + service: makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, + []endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.0.0.1", "fd00::1"}, ), newEndpointSlice: nil, expectedConntrackCalls: 2, expectedFilters: []expectedConntrackFilter{ - {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, - {ip: "fd00::1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V6}, + {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "fd00::1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, }, }, ), @@ -2147,21 +2088,12 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } Entry("multiple service ports with matching names", reconcileConntrackTestCase{ desc: "should match correct service port by name for multiple ports", - service: makeService([]struct { - name string - port int32 - targetPort int32 - protocol corev1.Protocol - }{ + service: makeService([]servicePortConfig{ {name: "http", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}, {name: "https", port: testServicePort2, targetPort: testEndpointPort2, protocol: udpProtocol}, }), oldEndpointSlice: makeEndpointSlice( - []struct { - name *string - port int32 - protocol corev1.Protocol - }{ + []endpointPortConfig{ {name: strPtr("http"), port: testEndpointPort1, protocol: udpProtocol}, {name: strPtr("https"), port: testEndpointPort2, protocol: udpProtocol}, }, @@ -2170,11 +2102,94 @@ add element inet ovn-kubernetes remote-node-ips-v6 { 2002:db8:1::4 } newEndpointSlice: nil, expectedConntrackCalls: 2, expectedFilters: []expectedConntrackFilter{ - {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, - {ip: "10.0.0.1", port: uint16(testServicePort2), protocol: syscall.IPPROTO_UDP, family: netlink.FAMILY_V4}, + {ip: "10.0.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.0.0.1", port: uint16(testServicePort2), protocol: syscall.IPPROTO_UDP}, }, }, ), + Entry("NodePort service", reconcileConntrackTestCase{ + desc: "should delete conntrack entries for both service port and NodePort", + service: makeServiceWithNodePort([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}, + []int32{30000}, corev1.ServiceTypeNodePort), + oldEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.1"}), + newEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, 
[]string{"10.128.0.2"}), + expectedConntrackCalls: 2, + expectedFilters: []expectedConntrackFilter{ + {ip: "10.128.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.128.0.1", port: 30000, protocol: syscall.IPPROTO_UDP}, + }, + }), + Entry("NodePort service with mixed protocols should only clean UDP NodePort", reconcileConntrackTestCase{ + desc: "should only delete conntrack for UDP NodePort, not TCP (protocol filtering)", + service: makeServiceWithNodePort([]servicePortConfig{ + {name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}, + {name: "", port: testServicePort2, targetPort: testEndpointPort1, protocol: tcpProtocol}, + }, []int32{30000, 30001}, corev1.ServiceTypeNodePort), + oldEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.1"}), + newEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.2"}), + expectedConntrackCalls: 2, // Only UDP: service port + NodePort (TCP port 30001 should be skipped) + expectedFilters: []expectedConntrackFilter{ + {ip: "10.128.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.128.0.1", port: 30000, protocol: syscall.IPPROTO_UDP}, + }, + }), + Entry("NodePort service with multiple UDP ports", reconcileConntrackTestCase{ + desc: "should delete conntrack entries only for the specific NodePort that changed", + service: makeServiceWithNodePort([]servicePortConfig{ + {name: "dns", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}, + {name: "snmp", port: testServicePort2, targetPort: testEndpointPort1, protocol: udpProtocol}, + }, []int32{30000, 30002}, corev1.ServiceTypeNodePort), + oldEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: strPtr("dns"), port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.1"}), + newEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: strPtr("dns"), port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.2"}), + expectedConntrackCalls: 2, // service port + NodePort for "dns" only + expectedFilters: []expectedConntrackFilter{ + {ip: "10.128.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.128.0.1", port: 30000, protocol: syscall.IPPROTO_UDP}, + }, + }), + Entry("LoadBalancer service with NodePort allocation", reconcileConntrackTestCase{ + desc: "should delete conntrack entries for both service port and NodePort", + service: func() *corev1.Service { + svc := makeServiceWithNodePort([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}, + []int32{30000}, corev1.ServiceTypeLoadBalancer) + svc.Status = corev1.ServiceStatus{ + LoadBalancer: corev1.LoadBalancerStatus{ + Ingress: []corev1.LoadBalancerIngress{{IP: "5.5.5.5"}}, + }, + } + return svc + }(), + oldEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.1"}), + newEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.2"}), + expectedConntrackCalls: 2, + expectedFilters: []expectedConntrackFilter{ + {ip: "10.128.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + {ip: "10.128.0.1", port: 30000, protocol: syscall.IPPROTO_UDP}, + }, + }), + Entry("LoadBalancer service with 
AllocateLoadBalancerNodePorts=false", func() reconcileConntrackTestCase { + allocateNodePorts := false + return reconcileConntrackTestCase{ + desc: "should only delete conntrack entries for service port (no NodePort)", + service: func() *corev1.Service { + svc := makeService([]servicePortConfig{{name: "", port: testServicePort1, targetPort: testEndpointPort1, protocol: udpProtocol}}) + svc.Spec.Type = corev1.ServiceTypeLoadBalancer + svc.Spec.AllocateLoadBalancerNodePorts = &allocateNodePorts + svc.Status = corev1.ServiceStatus{ + LoadBalancer: corev1.LoadBalancerStatus{ + Ingress: []corev1.LoadBalancerIngress{{IP: "5.5.5.5"}}, + }, + } + return svc + }(), + oldEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.1"}), + newEndpointSlice: makeEndpointSlice([]endpointPortConfig{{name: nil, port: testEndpointPort1, protocol: udpProtocol}}, []string{"10.128.0.2"}), + expectedConntrackCalls: 1, + expectedFilters: []expectedConntrackFilter{ + {ip: "10.128.0.1", port: uint16(testServicePort1), protocol: syscall.IPPROTO_UDP}, + }, + } + }()), ) }) }) diff --git a/go-controller/pkg/node/gateway_localnet_linux_test.go b/go-controller/pkg/node/gateway_localnet_linux_test.go index 71d112b491..d328d7dffb 100644 --- a/go-controller/pkg/node/gateway_localnet_linux_test.go +++ b/go-controller/pkg/node/gateway_localnet_linux_test.go @@ -202,7 +202,7 @@ func newEndpointSlice(svcName, namespace string, endpoints []discovery.Endpoint, } } -func makeConntrackFilter(ip string, port int, protocol corev1.Protocol) *netlink.ConntrackFilter { +func makeConntrackFilter(ip string, port int, protocol corev1.Protocol, filterType netlink.ConntrackFilterType) *netlink.ConntrackFilter { filter := &netlink.ConntrackFilter{} var err error @@ -221,15 +221,17 @@ func makeConntrackFilter(ip string, port int, protocol corev1.Protocol) *netlink } ipAddress := net.ParseIP(ip) Expect(ipAddress).NotTo(BeNil()) - err = filter.AddIP(netlink.ConntrackOrigDstIP, ipAddress) + err = filter.AddIP(filterType, ipAddress) Expect(err).NotTo(HaveOccurred()) return filter } type ctFilterDesc struct { - ip string - port int + ip string + port int + protocol corev1.Protocol + filterType netlink.ConntrackFilterType } func addConntrackMocks(nlMock *mocks.NetLinkOps, filterDescs []ctFilterDesc) { @@ -240,7 +242,7 @@ func addConntrackMocks(nlMock *mocks.NetLinkOps, filterDescs []ctFilterDesc) { OnCallMethodArgs: []interface{}{ netlink.ConntrackTableType(netlink.ConntrackTable), netlink.InetFamily(netlink.FAMILY_V4), - makeConntrackFilter(ctf.ip, ctf.port, corev1.ProtocolTCP), + makeConntrackFilter(ctf.ip, ctf.port, ctf.protocol, ctf.filterType), }, RetArgList: []interface{}{uint(1), nil}, }) @@ -1619,7 +1621,7 @@ var _ = Describe("Node Operations", func() { fNPW.watchFactory = wf Expect(startNodePortWatcher(fNPW, fakeClient)).To(Succeed()) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"1.1.1.1", 8032}, {"10.129.0.2", 8032}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"1.1.1.1", 8032, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"10.129.0.2", 8032, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) Eventually(func() bool { @@ -1708,7 +1710,7 @@ var _ = Describe("Node Operations", func() { fNPW.watchFactory = wf Expect(startNodePortWatcher(fNPW, fakeClient)).To(Succeed()) - addConntrackMocks(netlinkMock, 
[]ctFilterDesc{{"10.129.0.2", 0}, {"192.168.18.15", 31111}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 0, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) Eventually(fExec.CalledMatchesExpected, "2s").Should(BeTrue(), fExec.ErrorDesc) @@ -1848,7 +1850,7 @@ var _ = Describe("Node Operations", func() { return nodenft.MatchNFTRules(expectedNFT, nft.Dump()) }).Should(Succeed()) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.10.10.1", 8034}, {"10.129.0.2", 8034}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.10.10.1", 8034, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"10.129.0.2", 8034, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) @@ -1976,11 +1978,11 @@ var _ = Describe("Node Operations", func() { }).Should(Equal(expectedLBExternalIPFlows2)) addConntrackMocks(netlinkMock, []ctFilterDesc{ - {"1.1.1.1", 8080}, - {"1.1.1.2", 8080}, - {"5.5.5.5", 8080}, - {"192.168.18.15", 31111}, - {"10.129.0.2", 8080}, + {"1.1.1.1", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, + {"1.1.1.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, + {"5.5.5.5", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, + {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, + {"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, }) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( @@ -2195,7 +2197,7 @@ var _ = Describe("Node Operations", func() { return nodenft.MatchNFTRules(expectedNFT, nft.Dump()) }).Should(Succeed()) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080}, {"192.168.18.15", 38034}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 38034, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) @@ -2332,7 +2334,7 @@ var _ = Describe("Node Operations", func() { flows := fNPW.ofm.getFlowsByKey("NodePort_namespace1_service1_tcp_31111") Expect(flows).To(BeNil()) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080}, {"192.168.18.15", 31111}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) @@ -2478,7 +2480,7 @@ var _ = Describe("Node Operations", func() { flows := fNPW.ofm.getFlowsByKey("NodePort_namespace1_service1_tcp_31111") Expect(flows).To(Equal(expectedFlows)) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080}, {"192.168.18.15", 31111}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, 
metav1.DeleteOptions{})).To(Succeed()) @@ -2628,7 +2630,7 @@ var _ = Describe("Node Operations", func() { flows := fNPW.ofm.getFlowsByKey("NodePort_namespace1_service1_tcp_31111") Expect(flows).To(Equal(expectedFlows)) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080}, {"192.168.18.15", 31111}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) @@ -2773,7 +2775,7 @@ var _ = Describe("Node Operations", func() { flows := fNPW.ofm.getFlowsByKey("NodePort_namespace1_service1_tcp_31111") Expect(flows).To(Equal(expectedFlows)) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080}, {"192.168.18.15", 31111}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) @@ -2921,7 +2923,7 @@ var _ = Describe("Node Operations", func() { Expect(fNPW.ofm.getFlowsByKey("NodePort_namespace1_service1_tcp_31111")).To(Equal(expectedFlows)) - addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080}, {"192.168.18.15", 31111}}) + addConntrackMocks(netlinkMock, []ctFilterDesc{{"10.129.0.2", 8080, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}, {"192.168.18.15", 31111, corev1.ProtocolTCP, netlink.ConntrackOrigDstIP}}) Expect(fakeClient.KubeClient.CoreV1().Services(service.Namespace).Delete( context.Background(), service.Name, metav1.DeleteOptions{})).To(Succeed()) From 679442dab871efc5e7e1c2d08f7ccefacc06632b Mon Sep 17 00:00:00 2001 From: Peng Liu Date: Mon, 8 Sep 2025 07:38:25 +0000 Subject: [PATCH 3/5] Add a e2e test for NodePort service It should be able to preserve UDP traffic when server pod cycles for a NodePort service via a different node. 
Signed-off-by: Peng Liu (cherry picked from commit 4e5502658c23cc1a22ca1bbc646fbcb73e4b9bb8) Signed-off-by: Venkata Charan Sunku --- test/e2e/service.go | 145 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/test/e2e/service.go b/test/e2e/service.go index d859a7b019..ee4ae3ce65 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -789,6 +789,151 @@ var _ = ginkgo.Describe("Services", feature.Service, func() { // network is removed by provider Context API }) + ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a NodePort service via a different node", func(ctx context.Context) { + const ( + serviceName = "svc-udp" + srcPort = 12345 + podClient = "pod-client" + podBackend1 = "pod-server-1" + podBackend2 = "pod-server-2" + ) + var clientNodeInfo, serverNodeInfo, backendNodeInfo nodeInfo + + cs := f.ClientSet + ns := f.Namespace.Name + + nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 3) + framework.ExpectNoError(err) + if len(nodes.Items) < 3 { + e2eskipper.Skipf( + "Test requires >= 3 Ready nodes, but there are only %v nodes", + len(nodes.Items)) + } + + family := v1.IPv4Protocol + if IsIPv6Cluster(cs) { + family = v1.IPv6Protocol + } + + ips := e2enode.GetAddressesByTypeAndFamily(&nodes.Items[0], v1.NodeInternalIP, family) + gomega.Expect(ips).ToNot(gomega.BeEmpty()) + + clientNodeInfo = nodeInfo{ + name: nodes.Items[0].Name, + nodeIP: ips[0], + } + + ips = e2enode.GetAddressesByTypeAndFamily(&nodes.Items[1], v1.NodeInternalIP, family) + gomega.Expect(ips).ToNot(gomega.BeEmpty()) + + backendNodeInfo = nodeInfo{ + name: nodes.Items[1].Name, + nodeIP: ips[0], + } + + ips = e2enode.GetAddressesByTypeAndFamily(&nodes.Items[2], v1.NodeInternalIP, family) + gomega.Expect(ips).ToNot(gomega.BeEmpty()) + + serverNodeInfo = nodeInfo{ + name: nodes.Items[2].Name, + nodeIP: ips[0], + } + + // Create a NodePort service + udpJig := e2eservice.NewTestJig(cs, ns, serviceName) + ginkgo.By("creating a UDP service " + serviceName + " with type=NodePort in " + ns) + udpService, err := udpJig.CreateUDPService(ctx, func(svc *v1.Service) { + svc.Spec.Type = v1.ServiceTypeNodePort + svc.Spec.Ports = []v1.ServicePort{ + {Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt32(80)}, + } + }) + framework.ExpectNoError(err) + + // Create a pod in one node to create the UDP traffic against the NodePort service every 5 seconds + ginkgo.By("creating a client pod for probing the service " + serviceName) + clientPod := e2epod.NewAgnhostPod(ns, podClient, nil, nil, nil) + nodeSelection := e2epod.NodeSelection{Name: clientNodeInfo.name} + e2epod.SetNodeSelection(&clientPod.Spec, nodeSelection) + cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, serverNodeInfo.nodeIP, udpService.Spec.Ports[0].NodePort) + clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd} + clientPod.Spec.Containers[0].Name = podClient + e2epod.NewPodClient(f).CreateSync(ctx, clientPod) + + // Read the client pod logs + logs, err := e2epod.GetPodLogs(ctx, cs, ns, podClient, podClient) + framework.ExpectNoError(err) + framework.Logf("Pod client logs: %s", logs) + + // Add a backend pod to the service in the other node + ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName + " at node " + backendNodeInfo.name) + serverPod1 := e2epod.NewAgnhostPod(ns, podBackend1, nil, nil, nil, "netexec", 
fmt.Sprintf("--udp-port=%d", 80)) + serverPod1.Labels = udpJig.Labels + nodeSelection = e2epod.NodeSelection{Name: backendNodeInfo.name} + e2epod.SetNodeSelection(&serverPod1.Spec, nodeSelection) + e2epod.NewPodClient(f).CreateSync(ctx, serverPod1) + + ginkgo.By("Waiting for the endpoint to be ready") + err = framework.WaitForServiceEndpointsNum(ctx, f.ClientSet, f.Namespace.Name, + serviceName, 1, time.Second, wait.ForeverTestTimeout) + framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", + serviceName, f.Namespace.Name) + + logContainsFn := func(text, podName string) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { + logs, err := e2epod.GetPodLogs(ctx, cs, ns, podName, podName) + if err != nil { + // Retry the error next time. + return false, nil + } + if !strings.Contains(string(logs), text) { + return false, nil + } + return true, nil + } + } + // Note that the fact that Endpoints object already exists, does NOT mean + // that openflows were already programmed. + // Additionally take into account that UDP conntract entries timeout is + // 30 seconds by default. + // Based on the above check if the pod receives the traffic. + ginkgo.By("checking client pod connected to the backend 1 on Node IP " + serverNodeInfo.nodeIP) + if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, time.Minute, true, logContainsFn(podBackend1, podClient)); err != nil { + logs, err = e2epod.GetPodLogs(ctx, cs, ns, podClient, podClient) + framework.ExpectNoError(err) + framework.Logf("Pod client logs: %s", logs) + framework.Failf("Failed to connect to backend 1") + } + + // Create a second pod + ginkgo.By("creating a second backend pod " + podBackend2 + " for the service " + serviceName + " at node " + backendNodeInfo.name) + serverPod2 := e2epod.NewAgnhostPod(ns, podBackend2, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80)) + serverPod2.Labels = udpJig.Labels + nodeSelection = e2epod.NodeSelection{Name: backendNodeInfo.name} + e2epod.SetNodeSelection(&serverPod2.Spec, nodeSelection) + e2epod.NewPodClient(f).CreateSync(ctx, serverPod2) + + // and delete the first pod + framework.Logf("Cleaning up %s pod", podBackend1) + e2epod.NewPodClient(f).DeleteSync(ctx, podBackend1, metav1.DeleteOptions{}, e2epod.DefaultPodDeletionTimeout) + + ginkgo.By("Waiting for the endpoint to be ready") + err = framework.WaitForServiceEndpointsNum(ctx, f.ClientSet, f.Namespace.Name, + serviceName, 1, time.Second, wait.ForeverTestTimeout) + framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", + serviceName, f.Namespace.Name) + + // Check that the second pod keeps receiving traffic + // UDP conntrack entries timeout is 30 sec by default + ginkgo.By("checking client pod connected to the backend 2 on Node IP " + serverNodeInfo.nodeIP) + if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, time.Minute, true, logContainsFn(podBackend2, podClient)); err != nil { + logs, err = e2epod.GetPodLogs(ctx, cs, ns, podClient, podClient) + framework.ExpectNoError(err) + framework.Logf("Pod client logs: %s", logs) + framework.Failf("Failed to connect to backend 2") + } + }) + ginkgo.It("should listen on each host addresses", func() { endPoints := make([]*v1.Pod, 0) endpointsSelector := map[string]string{"servicebackend": "true"} From 225848ac4d84b1ce7eec88514428d96ece658b40 Mon Sep 17 00:00:00 2001 From: arkadeepsen Date: Wed, 21 Jan 2026 23:45:04 +0530 Subject: [PATCH 4/5] EgressFirewall: Use exponential backoff to 
refresh IP addresses for DNS names Signed-off-by: arkadeepsen (cherry picked from commit 1d5ee82ff968cbdefd741130ec49aec63d6086e3) --- go-controller/pkg/util/dns.go | 53 ++++-- go-controller/pkg/util/dns_test.go | 261 ++++++++++++++++++++++++++++- 2 files changed, 298 insertions(+), 16 deletions(-) diff --git a/go-controller/pkg/util/dns.go b/go-controller/pkg/util/dns.go index 9466ad16f5..86d8a9e054 100644 --- a/go-controller/pkg/util/dns.go +++ b/go-controller/pkg/util/dns.go @@ -16,8 +16,12 @@ import ( ) const ( - // defaultTTL is used if an invalid or zero TTL is provided. - defaultTTL = 30 * time.Minute + // defaultMinTTL is the minimum TTL value that will be used for a domain name if an invalid or zero TTL is found + defaultMinTTL = 5 * time.Second + // defaultMaxTTL is the maximum TTL value that will be used for a domain name if an invalid or zero TTL is found + defaultMaxTTL = 2 * time.Minute + // maxRetryBeforeBackoff is the maximum number of times to retry a DNS lookup before exponential backoff starts + maxRetryBeforeBackoff = 10 ) type dnsValue struct { @@ -27,6 +31,8 @@ type dnsValue struct { ttl time.Duration // Holds (last dns lookup time + ttl), tells when to refresh IPs next time nextQueryTime time.Time + // Number of times the DNS lookup has been retried before backoff starts + retryCount int } type DNS struct { @@ -105,11 +111,22 @@ func (d *DNS) updateOne(dns string) (bool, error) { return false, fmt.Errorf("DNS value not found in dnsMap for domain: %q", dns) } - ips, ttl, err := d.getIPsAndMinTTL(dns) - if err != nil { - res.nextQueryTime = time.Now().Add(defaultTTL) - d.dnsMap[dns] = res - return false, err + ips, ttl, retry, err := d.getIPsAndMinTTL(dns) + if retry { + // If the DNS lookup has been retried maxRetryCount times, use exponential backoff + // by doubling the previous TTL. The TTL is capped at defaultMaxTTL. + if res.retryCount >= maxRetryBeforeBackoff { + ttl = min(res.ttl*2, defaultMaxTTL) + } else { + // Increment the retry count + res.retryCount++ + } + // If no valid IPs were found, use the previous IPs as fallback. + if len(ips) == 0 { + ips = res.ips + } + } else { + res.retryCount = 0 } changed := false @@ -120,10 +137,10 @@ func (d *DNS) updateOne(dns string) (bool, error) { res.ttl = ttl res.nextQueryTime = time.Now().Add(res.ttl) d.dnsMap[dns] = res - return changed, nil + return changed, err } -func (d *DNS) getIPsAndMinTTL(domain string) ([]net.IP, time.Duration, error) { +func (d *DNS) getIPsAndMinTTL(domain string) ([]net.IP, time.Duration, bool, error) { ips := []net.IP{} ttlSet := false var ttlSeconds uint32 @@ -197,19 +214,27 @@ func (d *DNS) getIPsAndMinTTL(domain string) ([]net.IP, time.Duration, error) { } if !ttlSet || (len(ips) == 0) { - return nil, defaultTTL, fmt.Errorf("IPv4 or IPv6 addr not found for domain: %q, nameservers: %v", domain, d.nameservers) + return nil, defaultMinTTL, true, fmt.Errorf("IPv4 or IPv6 addr not found for domain: %q, nameservers: %v", domain, d.nameservers) } + ips = removeDuplicateIPs(ips) + ttl, err := time.ParseDuration(fmt.Sprintf("%ds", minTTL)) if err != nil { - utilruntime.HandleError(fmt.Errorf("invalid TTL value for domain: %q, err: %v, defaulting ttl=%s", domain, err, defaultTTL.String())) - ttl = defaultTTL + utilruntime.HandleError(fmt.Errorf("invalid TTL value for domain: %q, err: %v", domain, err)) + return ips, defaultMinTTL, true, nil } if ttl == 0 { - ttl = defaultTTL + // If the TTL is 0, return the default minimum TTL. The retry is set to false as this + // is not an error scenario. 
TTL being 0 is a valid scenario for some DNS servers + // and it means that the IP addresses should be refreshed everytime whenever the DNS + // name is being used. From the point of view of OVN-Kubernetes, the IP addresses are + // refreshed every defaultMinTTL. + klog.V(5).Infof("TTL value is 0 for domain: %q, defaulting ttl=%s", domain, defaultMinTTL.String()) + return ips, defaultMinTTL, false, nil } - return removeDuplicateIPs(ips), ttl, nil + return ips, ttl, false, nil } func (d *DNS) GetNextQueryTime() (time.Time, string, bool) { diff --git a/go-controller/pkg/util/dns_test.go b/go-controller/pkg/util/dns_test.go index a9d248042b..9f40c176ba 100644 --- a/go-controller/pkg/util/dns_test.go +++ b/go-controller/pkg/util/dns_test.go @@ -70,13 +70,16 @@ func TestGetIPsAndMinTTL(t *testing.T) { tests := []struct { desc string errExp bool + retry bool ipv4Mode bool ipv6Mode bool dnsOpsMockHelper []ovntest.TestifyMockHelper + expectedTTL time.Duration }{ { desc: "call to Exchange fails IPv4 only", errExp: true, + retry: true, ipv4Mode: true, ipv6Mode: false, dnsOpsMockHelper: []ovntest.TestifyMockHelper{ @@ -89,10 +92,12 @@ func TestGetIPsAndMinTTL(t *testing.T) { CallTimes: 1, }, }, + expectedTTL: defaultMinTTL, }, { desc: "Exchange returns correctly but Rcode != RcodeSuccess IPv4 only", errExp: true, + retry: true, ipv4Mode: true, ipv6Mode: false, dnsOpsMockHelper: []ovntest.TestifyMockHelper{ @@ -105,6 +110,46 @@ func TestGetIPsAndMinTTL(t *testing.T) { CallTimes: 1, }, }, + expectedTTL: defaultMinTTL, + }, + { + desc: "Exchange returns correctly but with TTL 0 IPv4 only", + errExp: false, + retry: false, + ipv4Mode: true, + ipv6Mode: false, + dnsOpsMockHelper: []ovntest.TestifyMockHelper{ + {OnCallMethodName: "SetQuestion", OnCallMethodArgType: []string{"*dns.Msg", "string", "uint16"}, RetArgList: []interface{}{&dns.Msg{}}, CallTimes: 1}, + {OnCallMethodName: "Fqdn", OnCallMethodArgType: []string{"string"}, RetArgList: []interface{}{"www.test.com"}, CallTimes: 1}, + {OnCallMethodName: "Exchange", OnCallMethodArgType: []string{"*dns.Client", "*dns.Msg", "string"}, RetArgList: []interface{}{&dns.Msg{MsgHdr: dns.MsgHdr{Rcode: dns.RcodeSuccess}, Answer: []dns.RR{&dns.A{A: net.ParseIP("1.2.3.4")}}}, 0 * time.Second, nil}, CallTimes: 1}, + }, + expectedTTL: defaultMinTTL, + }, + { + desc: "Exchange returns correctly but no Answer IPv4 only", + errExp: true, + retry: true, + ipv4Mode: true, + ipv6Mode: false, + dnsOpsMockHelper: []ovntest.TestifyMockHelper{ + {OnCallMethodName: "SetQuestion", OnCallMethodArgType: []string{"*dns.Msg", "string", "uint16"}, RetArgList: []interface{}{&dns.Msg{}}, CallTimes: 1}, + {OnCallMethodName: "Fqdn", OnCallMethodArgType: []string{"string"}, RetArgList: []interface{}{"www.test.com"}, CallTimes: 1}, + {OnCallMethodName: "Exchange", OnCallMethodArgType: []string{"*dns.Client", "*dns.Msg", "string"}, RetArgList: []interface{}{&dns.Msg{MsgHdr: dns.MsgHdr{Rcode: dns.RcodeSuccess}, Answer: []dns.RR{}}, 0 * time.Second, nil}, CallTimes: 1}, + }, + expectedTTL: defaultMinTTL, + }, + { + desc: "Exchange returns correctly but with non-zero TTL IPv4 only", + errExp: false, + retry: false, + ipv4Mode: true, + ipv6Mode: false, + dnsOpsMockHelper: []ovntest.TestifyMockHelper{ + {OnCallMethodName: "SetQuestion", OnCallMethodArgType: []string{"*dns.Msg", "string", "uint16"}, RetArgList: []interface{}{&dns.Msg{}}, CallTimes: 1}, + {OnCallMethodName: "Fqdn", OnCallMethodArgType: []string{"string"}, RetArgList: []interface{}{"www.test.com"}, CallTimes: 1}, + {OnCallMethodName: 
"Exchange", OnCallMethodArgType: []string{"*dns.Client", "*dns.Msg", "string"}, RetArgList: []interface{}{&dns.Msg{MsgHdr: dns.MsgHdr{Rcode: dns.RcodeSuccess}, Answer: []dns.RR{&dns.A{Hdr: dns.RR_Header{Ttl: 100}, A: net.ParseIP("1.2.3.4")}}}, 0 * time.Second, nil}, CallTimes: 1}, + }, + expectedTTL: 100 * time.Second, }, } @@ -128,19 +173,22 @@ func TestGetIPsAndMinTTL(t *testing.T) { } config.IPv4Mode = tc.ipv4Mode config.IPv6Mode = tc.ipv6Mode - res, _, err := testDNS.getIPsAndMinTTL("www.test.com") - t.Log(res, err) + res, ttl, retry, err := testDNS.getIPsAndMinTTL("www.test.com") + t.Log(res, ttl, retry, err) if tc.errExp { require.Error(t, err) } else { require.NoError(t, err) } + assert.Equal(t, tc.retry, retry, "the exponentialBackoff variable should match the return from dns.getIPsAndMinTTL()") + assert.Equal(t, tc.expectedTTL, ttl, "the ttl variable should match the return from dns.getIPsAndMinTTL()") mockDNSOps.AssertExpectations(t) }) } } func TestUpdate(t *testing.T) { + config.IPv4Mode = true mockDNSOps := new(util_mocks.DNSOps) SetDNSLibOpsMockInst(mockDNSOps) @@ -252,6 +300,7 @@ func TestUpdate(t *testing.T) { } func TestAdd(t *testing.T) { + config.IPv4Mode = true dnsName := "www.testing.com" mockDNSOps := new(util_mocks.DNSOps) SetDNSLibOpsMockInst(mockDNSOps) @@ -319,3 +368,211 @@ func TestAdd(t *testing.T) { } } + +func TestIPsEqual(t *testing.T) { + tests := []struct { + desc string + oldips []net.IP + newips []net.IP + expEqual bool + }{ + { + desc: "oldips and newips are the same", + oldips: []net.IP{net.ParseIP("1.2.3.4")}, + newips: []net.IP{net.ParseIP("1.2.3.4")}, + expEqual: true, + }, + { + desc: "oldips and newips are different", + oldips: []net.IP{net.ParseIP("1.2.3.4")}, + newips: []net.IP{net.ParseIP("1.2.3.5")}, + expEqual: false, + }, + { + desc: "oldips and newips are different length", + oldips: []net.IP{net.ParseIP("1.2.3.4")}, + newips: []net.IP{net.ParseIP("1.2.3.4"), net.ParseIP("1.2.3.5")}, + expEqual: false, + }, + { + desc: "oldips is nil and newips is not nil", + oldips: nil, + newips: []net.IP{net.ParseIP("1.2.3.4"), net.ParseIP("1.2.3.5")}, + expEqual: false, + }, + { + desc: "oldips is empty and newips is not empty", + oldips: []net.IP{}, + newips: []net.IP{net.ParseIP("1.2.3.4"), net.ParseIP("1.2.3.5")}, + expEqual: false, + }, + { + desc: "oldips is not nil and newips is nil", + oldips: []net.IP{net.ParseIP("1.2.3.4"), net.ParseIP("1.2.3.5")}, + newips: nil, + expEqual: false, + }, + { + desc: "oldips is not empty and newips is empty", + oldips: []net.IP{net.ParseIP("1.2.3.4"), net.ParseIP("1.2.3.5")}, + newips: []net.IP{}, + expEqual: false, + }, + { + desc: "oldips and newips are both nil", + oldips: nil, + newips: nil, + expEqual: true, + }, + { + desc: "oldips and newips are both empty", + oldips: []net.IP{}, + newips: []net.IP{}, + expEqual: true, + }, + { + desc: "oldips is nil and newips is empty", + oldips: nil, + newips: []net.IP{}, + expEqual: true, + }, + { + desc: "oldips is empty and newips is nil", + oldips: []net.IP{}, + newips: nil, + expEqual: true, + }, + } + for i, tc := range tests { + t.Run(fmt.Sprintf("%d:%s", i, tc.desc), func(t *testing.T) { + res := ipsEqual(tc.oldips, tc.newips) + assert.Equal(t, tc.expEqual, res) + }) + } +} + +func TestUpdateOne(t *testing.T) { + config.IPv4Mode = true + dnsName := "www.testing.com" + newIP := net.ParseIP("1.2.3.4") + fqdnOpsMockHelper := ovntest.TestifyMockHelper{ + OnCallMethodName: "Fqdn", OnCallMethodArgType: []string{"string"}, RetArgList: []interface{}{dnsName}, CallTimes: 
1, + } + setQuestionOpsMockHelper := ovntest.TestifyMockHelper{ + OnCallMethodName: "SetQuestion", OnCallMethodArgType: []string{"*dns.Msg", "string", "uint16"}, RetArgList: []interface{}{&dns.Msg{}}, CallTimes: 1, + } + exchangeSuccessNoAnswerOpsMockHelper := ovntest.TestifyMockHelper{ + OnCallMethodName: "Exchange", OnCallMethodArgType: []string{"*dns.Client", "*dns.Msg", "string"}, RetArgList: []interface{}{&dns.Msg{MsgHdr: dns.MsgHdr{Rcode: dns.RcodeSuccess}, Answer: []dns.RR{}}, 0 * time.Second, nil}, CallTimes: 1, + } + exchangeSuccessZeroTTLOpsMockHelper := ovntest.TestifyMockHelper{ + OnCallMethodName: "Exchange", OnCallMethodArgType: []string{"*dns.Client", "*dns.Msg", "string"}, RetArgList: []interface{}{&dns.Msg{MsgHdr: dns.MsgHdr{Rcode: dns.RcodeSuccess}, Answer: []dns.RR{&dns.A{A: newIP}}}, 0 * time.Second, nil}, CallTimes: 1, + } + exchangeSuccessNonZeroTTLOpsMockHelper := ovntest.TestifyMockHelper{ + OnCallMethodName: "Exchange", OnCallMethodArgType: []string{"*dns.Client", "*dns.Msg", "string"}, RetArgList: []interface{}{&dns.Msg{MsgHdr: dns.MsgHdr{Rcode: dns.RcodeSuccess}, Answer: []dns.RR{&dns.A{Hdr: dns.RR_Header{Ttl: 100}, A: newIP}}}, 0 * time.Second, nil}, CallTimes: 1, + } + exchangeFailureOpsMockHelper := ovntest.TestifyMockHelper{ + OnCallMethodName: "Exchange", OnCallMethodArgType: []string{"*dns.Client", "*dns.Msg", "string"}, RetArgList: []interface{}{&dns.Msg{MsgHdr: dns.MsgHdr{Rcode: dns.RcodeServerFailure}}, 0 * time.Second, nil}, CallTimes: 1, + } + tests := []struct { + desc string + numCalls int + exchangeOpsMockHelper ovntest.TestifyMockHelper + expTTL time.Duration + }{ + { + desc: "when Exchange function returns with Rcode != RcodeSuccess, defaultMinTTL is used", + numCalls: 1, + exchangeOpsMockHelper: exchangeFailureOpsMockHelper, + expTTL: defaultMinTTL, + }, + { + desc: "when Exchange function returns successfully but without Answer, defaultMinTTL is used", + numCalls: 1, + exchangeOpsMockHelper: exchangeSuccessNoAnswerOpsMockHelper, + expTTL: defaultMinTTL, + }, + { + desc: "when TTL returned is 0 by Exchange function, defaultMinTTL is used", + numCalls: 1, + exchangeOpsMockHelper: exchangeSuccessZeroTTLOpsMockHelper, + expTTL: defaultMinTTL, + }, + { + desc: "when TTL returned is 0 by Exchange function 2 times, defaultMinTTL is used", + numCalls: 2, + exchangeOpsMockHelper: exchangeSuccessZeroTTLOpsMockHelper, + expTTL: defaultMinTTL, + }, + { + desc: "when TTL returned is 0 by Exchange function 11 times, defaultMinTTL is used", + numCalls: 11, + exchangeOpsMockHelper: exchangeSuccessZeroTTLOpsMockHelper, + expTTL: defaultMinTTL, + }, + { + desc: "when Exchange function returns with Rcode != RcodeSuccess twice, defaultMinTTL is used", + numCalls: 2, + exchangeOpsMockHelper: exchangeFailureOpsMockHelper, + expTTL: defaultMinTTL, + }, + { + desc: "when Exchange function returns with Rcode != RcodeSuccess 10 times, defaultMinTTL is used", + numCalls: 10, + exchangeOpsMockHelper: exchangeFailureOpsMockHelper, + expTTL: defaultMinTTL, + }, + { + desc: "when Exchange function returns with Rcode != RcodeSuccess 11 times, defaultMinTTL is doubled", + numCalls: 11, + exchangeOpsMockHelper: exchangeFailureOpsMockHelper, + expTTL: 2 * defaultMinTTL, + }, + { + desc: "when Exchange function returns with Rcode != RcodeSuccess 14 times, 16 (2^4) times defaultMinTTL is used", + numCalls: 14, + exchangeOpsMockHelper: exchangeFailureOpsMockHelper, + expTTL: 16 * defaultMinTTL, + }, + { + desc: "when Exchange function returns with Rcode != RcodeSuccess 15 times, 
defaultMaxTTL is used", + numCalls: 15, + exchangeOpsMockHelper: exchangeFailureOpsMockHelper, + expTTL: defaultMaxTTL, + }, + { + desc: "when TTL returned is non-zero by Exchange function, it is used", + numCalls: 1, + exchangeOpsMockHelper: exchangeSuccessNonZeroTTLOpsMockHelper, + expTTL: 100 * time.Second, + }, + } + for i, tc := range tests { + t.Run(fmt.Sprintf("%d:%s", i, tc.desc), func(t *testing.T) { + mockDNSOps := new(util_mocks.DNSOps) + SetDNSLibOpsMockInst(mockDNSOps) + dnsOpsMockHelper := []ovntest.TestifyMockHelper{fqdnOpsMockHelper, setQuestionOpsMockHelper, tc.exchangeOpsMockHelper} + for index := 0; index < tc.numCalls; index++ { + for _, item := range dnsOpsMockHelper { + call := mockDNSOps.On(item.OnCallMethodName) + for _, arg := range item.OnCallMethodArgType { + call.Arguments = append(call.Arguments, mock.AnythingOfType(arg)) + } + for _, ret := range item.RetArgList { + call.ReturnArguments = append(call.ReturnArguments, ret) + } + call.Once() + } + } + dns := DNS{ + dnsMap: make(map[string]dnsValue), + nameservers: []string{"1.1.1.1"}, + } + dns.dnsMap[dnsName] = dnsValue{} + for i := 0; i < tc.numCalls; i++ { + _, _ = dns.updateOne(dnsName) + } + assert.Equal(t, tc.expTTL, dns.dnsMap[dnsName].ttl) + mockDNSOps.AssertExpectations(t) + }) + } +} From ab20d0a283c4509e6b8c8495730764171839cac8 Mon Sep 17 00:00:00 2001 From: Peng Liu Date: Thu, 5 Mar 2026 11:07:11 +0800 Subject: [PATCH 5/5] node: fix serviceUpdateNotNeeded nil pointer comparison serviceUpdateNotNeeded() used explicit nil guards before dereferencing InternalTrafficPolicy and AllocateLoadBalancerNodePorts. When both old and new are nil (all non-LoadBalancer services), (nil != nil && ...) evaluates to false, incorrectly indicating an update is needed. Use reflect.DeepEqual on the pointer directly, which handles nil == nil. Signed-off-by: Peng Liu (cherry picked from commit 40caf4c2077e55f30666d73bcc74b858e70aa253) (cherry picked from commit 581bfdeff13113b56a3bd443ffacb88aaf5f8217) (cherry picked from commit 2a5bf31ca99ff81ce9db6dc43906e91f9f632856) --- go-controller/pkg/node/gateway_shared_intf.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/go-controller/pkg/node/gateway_shared_intf.go b/go-controller/pkg/node/gateway_shared_intf.go index 51f747fdfb..98738f2daf 100644 --- a/go-controller/pkg/node/gateway_shared_intf.go +++ b/go-controller/pkg/node/gateway_shared_intf.go @@ -804,10 +804,8 @@ func serviceUpdateNotNeeded(old, new *corev1.Service) bool { reflect.DeepEqual(new.Spec.Type, old.Spec.Type) && reflect.DeepEqual(new.Status.LoadBalancer.Ingress, old.Status.LoadBalancer.Ingress) && reflect.DeepEqual(new.Spec.ExternalTrafficPolicy, old.Spec.ExternalTrafficPolicy) && - (new.Spec.InternalTrafficPolicy != nil && old.Spec.InternalTrafficPolicy != nil && - reflect.DeepEqual(*new.Spec.InternalTrafficPolicy, *old.Spec.InternalTrafficPolicy)) && - (new.Spec.AllocateLoadBalancerNodePorts != nil && old.Spec.AllocateLoadBalancerNodePorts != nil && - reflect.DeepEqual(*new.Spec.AllocateLoadBalancerNodePorts, *old.Spec.AllocateLoadBalancerNodePorts)) + reflect.DeepEqual(new.Spec.InternalTrafficPolicy, old.Spec.InternalTrafficPolicy) && + reflect.DeepEqual(new.Spec.AllocateLoadBalancerNodePorts, old.Spec.AllocateLoadBalancerNodePorts) } // AddService handles configuring shared gateway bridge flows to steer External IP, Node Port, Ingress LB traffic into OVN
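
Editor's note (illustration only, not part of the patch): a minimal standalone sketch of the comparison behavior described in PATCH 5/5. With the old explicit nil guards, two services that both leave the optional pointer fields unset compare as "different"; reflect.DeepEqual on the pointers themselves treats nil == nil as equal. The service values below are hypothetical.

package main

import (
	"fmt"
	"reflect"

	corev1 "k8s.io/api/core/v1"
)

func main() {
	// Two non-LoadBalancer services; InternalTrafficPolicy is left unset (nil) in both.
	oldSvc := &corev1.Service{}
	newSvc := &corev1.Service{}

	// Old guard: requires both pointers to be non-nil before comparing the values,
	// so (nil != nil && ...) is false and an update is (incorrectly) considered needed.
	oldStyleEqual := newSvc.Spec.InternalTrafficPolicy != nil && oldSvc.Spec.InternalTrafficPolicy != nil &&
		reflect.DeepEqual(*newSvc.Spec.InternalTrafficPolicy, *oldSvc.Spec.InternalTrafficPolicy)

	// New comparison: DeepEqual on the pointers; two nil pointers of the same type are equal.
	newStyleEqual := reflect.DeepEqual(newSvc.Spec.InternalTrafficPolicy, oldSvc.Spec.InternalTrafficPolicy)

	fmt.Println(oldStyleEqual, newStyleEqual) // false true
}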
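
Editor's note (illustration only, not part of the patch): a rough sketch of the retry/backoff progression that TestUpdateOne in PATCH 4/5 above exercises, using the constants from that patch (defaultMinTTL=5s, defaultMaxTTL=2m, maxRetryBeforeBackoff=10). The nextTTL helper is a simplification of updateOne's retry branch, not the actual implementation.

package main

import (
	"fmt"
	"time"
)

const (
	defaultMinTTL         = 5 * time.Second
	defaultMaxTTL         = 2 * time.Minute
	maxRetryBeforeBackoff = 10
)

// nextTTL mirrors the retry branch of updateOne: the first maxRetryBeforeBackoff
// failed lookups keep the defaultMinTTL returned by the lookup; after that the
// previous TTL is doubled and capped at defaultMaxTTL.
func nextTTL(prevTTL time.Duration, retryCount int) (time.Duration, int) {
	if retryCount >= maxRetryBeforeBackoff {
		return min(prevTTL*2, defaultMaxTTL), retryCount
	}
	return defaultMinTTL, retryCount + 1
}

func main() {
	var ttl time.Duration
	retries := 0
	for i := 1; i <= 15; i++ {
		ttl, retries = nextTTL(ttl, retries)
		fmt.Printf("failed lookup %2d -> ttl=%v\n", i, ttl)
	}
	// Lookups 1-10 stay at 5s; lookup 11 doubles to 10s, then 20s, 40s, 1m20s,
	// and lookup 15 is capped at the 2m defaultMaxTTL, matching the test cases.
}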