From cb963ffb9a04742acd1892ce8492a63ad4eae148 Mon Sep 17 00:00:00 2001 From: Quan Tian Date: Thu, 16 Mar 2023 00:37:55 +0800 Subject: [PATCH] Fix route deletion for Service ClusterIP and LoadBalancerIP When proxyAll is enabled, AntreaProxy needs to install routes in the host network namespace to redirect traffic to OVS for load balancing. For a Service with multiple ports, multiple ServicePorts are generated and processed. The previous code installed the route for a ClusterIP or a LoadBalancerIP multiple times when such a Service was created, and uninstalled the route multiple times when it was deleted, leading to a few problems. This patch adds a serviceIPRouteReferences which tracks the references of Service IPs' routes. The key is the Service IP and the value is the the set of ServiceInfo strings. With the references, we install a route exactly once as long as it's used by any ServicePorts and uninstall it exactly once when it's no longer used by any ServicePorts. This patch also fixes an issue that the route for ClusterIP was not removed on Windows Nodes after the Service was removed. Fixes #4361 Signed-off-by: Quan Tian --- pkg/agent/proxy/proxier.go | 99 +++++++++--- pkg/agent/proxy/proxier_test.go | 210 +++++++++++++++++++++++--- pkg/agent/route/interfaces.go | 8 +- pkg/agent/route/route_linux.go | 32 +--- pkg/agent/route/route_linux_test.go | 54 ++----- pkg/agent/route/route_windows.go | 18 +-- pkg/agent/route/testing/mock_route.go | 6 +- 7 files changed, 289 insertions(+), 138 deletions(-) diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 68445d97898..f496a404148 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -27,6 +27,7 @@ import ( discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/runtime" apimachinerytypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" @@ -104,6 +105,13 @@ type proxier struct { serviceHealthServer healthcheck.ServiceHealthServer numLocalEndpoints map[apimachinerytypes.NamespacedName]int + // serviceIPRouteReferences tracks the references of Service IP routes. The key is the Service IP and the value is + // the set of ServiceInfo strings. Because a Service could have multiple ports and each port will generate a + // ServicePort (which is the unit of the processing), a Service IP route may be required by several ServicePorts. + // With the references, we install a route exactly once as long as it's used by any ServicePorts and uninstall it + // exactly once when it's no longer used by any ServicePorts. + // It applies to ClusterIP and LoadBalancerIP. + serviceIPRouteReferences map[string]sets.String // syncedOnce returns true if the proxier has synced rules at least once. syncedOnce bool syncedOnceMutex sync.RWMutex @@ -157,16 +165,24 @@ func (p *proxier) removeStaleServices() { } delete(p.endpointsInstalledMap, svcPortName) } - // Remove NodePort flows and configurations. - if p.proxyAll && svcInfo.NodePort() > 0 { - if err := p.uninstallNodePortService(uint16(svcInfo.NodePort()), svcInfo.OFProtocol); err != nil { - klog.ErrorS(err, "Error when uninstalling NodePort flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) - continue + // Remove NodePort and ClusterIP flows and configurations. + if p.proxyAll { + if svcInfo.NodePort() > 0 { + if err := p.uninstallNodePortService(uint16(svcInfo.NodePort()), svcInfo.OFProtocol); err != nil { + klog.ErrorS(err, "Error when uninstalling NodePort flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) + continue + } + } + if svcInfo.ClusterIP() != nil { + if err := p.deleteRouteForServiceIP(svcInfoStr, svcInfo.ClusterIP(), p.routeClient.DeleteClusterIPRoute); err != nil { + klog.ErrorS(err, "Failed to remove ClusterIP Service routes", "Service", svcPortName) + continue + } } } // Remove LoadBalancer flows and configurations. if p.proxyLoadBalancerIPs && len(svcInfo.LoadBalancerIPStrings()) > 0 { - if err := p.uninstallLoadBalancerService(svcInfo.LoadBalancerIPStrings(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil { + if err := p.uninstallLoadBalancerService(svcInfoStr, svcInfo.LoadBalancerIPStrings(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil { klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } @@ -313,37 +329,73 @@ func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Prot return nil } -func (p *proxier) installLoadBalancerService(groupID binding.GroupIDType, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error { +func (p *proxier) installLoadBalancerService(svcInfoStr string, groupID binding.GroupIDType, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error { for _, ingress := range loadBalancerIPStrings { if ingress != "" { - if err := p.ofClient.InstallServiceFlows(groupID, net.ParseIP(ingress), svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeLoadBalancer, false); err != nil { + ip := net.ParseIP(ingress) + if err := p.ofClient.InstallServiceFlows(groupID, ip, svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeLoadBalancer, false); err != nil { return fmt.Errorf("failed to install LoadBalancer load balancing flows: %w", err) } + if p.proxyAll { + if err := p.addRouteForServiceIP(svcInfoStr, ip, p.routeClient.AddLoadBalancer); err != nil { + return fmt.Errorf("failed to install LoadBalancer traffic redirecting routes: %w", err) + } + } } } - if p.proxyAll { - if err := p.routeClient.AddLoadBalancer(loadBalancerIPStrings); err != nil { - return fmt.Errorf("failed to install LoadBalancer traffic redirecting routes: %w", err) + return nil +} + +func (p *proxier) addRouteForServiceIP(svcInfoStr string, ip net.IP, addRouteFn func(net.IP) error) error { + ipStr := ip.String() + references, exists := p.serviceIPRouteReferences[ipStr] + // If the IP was not referenced by any Service port, install a route for it. + // Otherwise, just reference it. + if !exists { + if err := addRouteFn(ip); err != nil { + return err } + references = sets.NewString(svcInfoStr) + p.serviceIPRouteReferences[ipStr] = references + } else { + references.Insert(svcInfoStr) } - return nil } -func (p *proxier) uninstallLoadBalancerService(loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol) error { +func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol) error { for _, ingress := range loadBalancerIPStrings { if ingress != "" { - if err := p.ofClient.UninstallServiceFlows(net.ParseIP(ingress), svcPort, protocol); err != nil { + ip := net.ParseIP(ingress) + if err := p.ofClient.UninstallServiceFlows(ip, svcPort, protocol); err != nil { return fmt.Errorf("failed to remove LoadBalancer load balancing flows: %w", err) } + if p.proxyAll { + if err := p.deleteRouteForServiceIP(svcInfoStr, ip, p.routeClient.DeleteLoadBalancer); err != nil { + return fmt.Errorf("failed to remove LoadBalancer traffic redirecting routes: %w", err) + } + } } } - if p.proxyAll { - if err := p.routeClient.DeleteLoadBalancer(loadBalancerIPStrings); err != nil { - return fmt.Errorf("failed to remove LoadBalancer traffic redirecting routes: %w", err) + return nil +} + +func (p *proxier) deleteRouteForServiceIP(svcInfoStr string, ip net.IP, deleteRouteFn func(net.IP) error) error { + ipStr := ip.String() + references, exists := p.serviceIPRouteReferences[ipStr] + // If the IP was not referenced by this Service port, skip it. + if exists && references.Has(svcInfoStr) { + // Delete the IP only if this Service port is the last one referencing it. + // Otherwise, just dereference it. + if references.Len() == 1 { + if err := deleteRouteFn(ip); err != nil { + return err + } + delete(p.serviceIPRouteReferences, ipStr) + } else { + references.Delete(svcInfoStr) } } - return nil } @@ -548,8 +600,8 @@ func (p *proxier) installServices() { } } // If previous Service which has ClusterIP should be removed, remove ClusterIP routes. - if svcInfo.ClusterIP() != nil { - if err := p.routeClient.DeleteClusterIPRoute(pSvcInfo.ClusterIP()); err != nil { + if pSvcInfo.ClusterIP() != nil { + if err := p.deleteRouteForServiceIP(pSvcInfo.String(), pSvcInfo.ClusterIP(), p.routeClient.DeleteClusterIPRoute); err != nil { klog.ErrorS(err, "Error when uninstalling ClusterIP route for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } @@ -580,7 +632,7 @@ func (p *proxier) installServices() { // is created, the routing target IP block will be recalculated for expansion to be able to route the new // created ClusterIP. Deleting a ClusterIP will not shrink the target routing IP block. The Service CIDR // can be finally calculated after creating enough ClusterIPs. - if err := p.routeClient.AddClusterIPRoute(svcInfo.ClusterIP()); err != nil { + if err := p.addRouteForServiceIP(svcInfo.String(), svcInfo.ClusterIP(), p.routeClient.AddClusterIPRoute); err != nil { klog.ErrorS(err, "Error when installing ClusterIP route for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } @@ -612,7 +664,7 @@ func (p *proxier) installServices() { } // Remove LoadBalancer flows and configurations. if len(toDelete) > 0 { - if err := p.uninstallLoadBalancerService(toDelete, uint16(pSvcInfo.Port()), pSvcInfo.OFProtocol); err != nil { + if err := p.uninstallLoadBalancerService(pSvcInfo.String(), toDelete, uint16(pSvcInfo.Port()), pSvcInfo.OFProtocol); err != nil { klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } @@ -624,7 +676,7 @@ func (p *proxier) installServices() { klog.ErrorS(nil, "Group for Service externalTrafficPolicy was not installed", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr, "externalTrafficPolicy", externalPolicyLocal) continue } - if err := p.installLoadBalancerService(groupID, toAdd, uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.ExternalPolicyLocal()); err != nil { + if err := p.installLoadBalancerService(svcInfo.String(), groupID, toAdd, uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.ExternalPolicyLocal()); err != nil { klog.ErrorS(err, "Error when installing LoadBalancer flows and configurations for Service", "Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } @@ -1002,6 +1054,7 @@ func NewProxier( endpointsInstalledMap: types.EndpointsMap{}, endpointsMap: types.EndpointsMap{}, endpointReferenceCounter: map[string]int{}, + serviceIPRouteReferences: map[string]sets.String{}, nodeLabels: map[string]string{}, serviceStringMap: map[string]k8sproxy.ServicePortName{}, groupCounter: groupCounter, diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 8dec1d4f06a..8a8a5ab874b 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -28,10 +28,12 @@ import ( discovery "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apimachinerytypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/testutil" + "k8s.io/utils/pointer" mccommon "antrea.io/antrea/multicluster/controllers/multicluster/common" agentconfig "antrea.io/antrea/pkg/agent/config" @@ -531,7 +533,7 @@ func testLoadBalancerAdd(t *testing.T, } mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) if proxyLoadBalancerIPs { - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) } mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) @@ -760,6 +762,163 @@ func TestLoadBalancerAdd(t *testing.T) { }) } +func TestLoadBalancerServiceWithMultiplePorts(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockOFClient, mockRouteClient := getMockClients(ctrl) + groupAllocator := openflow.NewGroupAllocator(false) + nodePortAddresses := []net.IP{net.ParseIP("0.0.0.0")} + fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, false, withProxyAll, withEndpointSlice) + + port80Str := "port80" + port80Int32 := int32(80) + port443Str := "port443" + port443Int32 := int32(443) + port30001Int32 := int32(30001) + port30002Int32 := int32(30002) + protocolTCP := corev1.ProtocolTCP + endpoint1Address := "192.168.0.11" + endpoint2Address := "192.168.1.11" + endpoint1NodeName := fp.hostname + endpoint2NodeName := "node2" + + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: port80Str, + Protocol: protocolTCP, + Port: port80Int32, + TargetPort: intstr.FromInt(int(port80Int32)), + NodePort: port30001Int32, + }, + { + Name: port443Str, + Protocol: protocolTCP, + Port: port443Int32, + TargetPort: intstr.FromInt(int(port443Int32)), + NodePort: port30002Int32, + }, + }, + ClusterIP: svc1IPv4.String(), + ClusterIPs: []string{svc1IPv4.String()}, + Type: corev1.ServiceTypeLoadBalancer, + ExternalTrafficPolicy: corev1.ServiceExternalTrafficPolicyTypeLocal, + HealthCheckNodePort: 40000, + IPFamilies: []corev1.IPFamily{corev1.IPv4Protocol}, + }, + Status: corev1.ServiceStatus{ + LoadBalancer: corev1.LoadBalancerStatus{Ingress: []corev1.LoadBalancerIngress{ + {IP: loadBalancerIPv4.String()}, + }}, + }, + } + makeServiceMap(fp, svc) + + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc-x5ks2", + Namespace: svc.Namespace, + Labels: map[string]string{ + discovery.LabelServiceName: svc.Name, + }, + }, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{ + endpoint1Address, + }, + Conditions: discovery.EndpointConditions{ + Ready: pointer.Bool(true), + Serving: pointer.Bool(true), + Terminating: pointer.Bool(false), + }, + NodeName: &endpoint1NodeName, + }, + { + Addresses: []string{ + endpoint2Address, + }, + Conditions: discovery.EndpointConditions{ + Ready: pointer.Bool(true), + Serving: pointer.Bool(true), + Terminating: pointer.Bool(false), + }, + NodeName: &endpoint2NodeName, + }, + }, + Ports: []discovery.EndpointPort{ + { + Name: &port80Str, + Port: &port80Int32, + Protocol: &protocolTCP, + }, + { + Name: &port443Str, + Port: &port443Int32, + Protocol: &protocolTCP, + }, + }, + } + makeEndpointSliceMap(fp, endpointSlice) + + localEndpointForPort80 := k8sproxy.NewBaseEndpointInfo(endpoint1Address, endpoint1NodeName, "", int(port80Int32), true, true, true, false, nil) + localEndpointForPort443 := k8sproxy.NewBaseEndpointInfo(endpoint1Address, endpoint1NodeName, "", int(port443Int32), true, true, true, false, nil) + remoteEndpointForPort80 := k8sproxy.NewBaseEndpointInfo(endpoint2Address, endpoint2NodeName, "", int(port80Int32), false, true, true, false, nil) + remoteEndpointForPort443 := k8sproxy.NewBaseEndpointInfo(endpoint2Address, endpoint2NodeName, "", int(port443Int32), false, true, true, false, nil) + + mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort80, remoteEndpointForPort80})).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(gomock.Any(), false, []k8sproxy.Endpoint{localEndpointForPort80}).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(gomock.Any(), false, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort80, remoteEndpointForPort80})).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), svc1IPv4, uint16(port80Int32), binding.ProtocolTCP, uint16(0), true, corev1.ServiceTypeClusterIP, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), agentconfig.VirtualNodePortDNATIPv4, uint16(port30001Int32), binding.ProtocolTCP, uint16(0), true, corev1.ServiceTypeNodePort, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), loadBalancerIPv4, uint16(port80Int32), binding.ProtocolTCP, uint16(0), true, corev1.ServiceTypeLoadBalancer, false).Times(1) + mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP).Times(1) + // The route for the ClusterIP and the LoadBalancer IP should only be installed once. + mockRouteClient.EXPECT().AddClusterIPRoute(svc1IPv4).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIPv4).Times(1) + + mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort443, remoteEndpointForPort443})).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(gomock.Any(), false, []k8sproxy.Endpoint{localEndpointForPort443}).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(gomock.Any(), false, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort443, remoteEndpointForPort443})).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), svc1IPv4, uint16(port443Int32), binding.ProtocolTCP, uint16(0), true, corev1.ServiceTypeClusterIP, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), agentconfig.VirtualNodePortDNATIPv4, uint16(port30002Int32), binding.ProtocolTCP, uint16(0), true, corev1.ServiceTypeNodePort, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), loadBalancerIPv4, uint16(port443Int32), binding.ProtocolTCP, uint16(0), true, corev1.ServiceTypeLoadBalancer, false).Times(1) + mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP).Times(1) + + fp.syncProxyRules() + + // Remove the service. + fp.serviceChanges.OnServiceUpdate(svc, nil) + fp.endpointsChanges.OnEndpointSliceUpdate(endpointSlice, true) + + mockOFClient.EXPECT().UninstallEndpointFlows(binding.ProtocolTCP, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort80, remoteEndpointForPort80})) + mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) + mockOFClient.EXPECT().UninstallServiceFlows(svc1IPv4, uint16(port80Int32), binding.ProtocolTCP) + mockOFClient.EXPECT().UninstallServiceFlows(agentconfig.VirtualNodePortDNATIPv4, uint16(port30001Int32), binding.ProtocolTCP) + mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIPv4, uint16(port80Int32), binding.ProtocolTCP) + mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP) + + mockOFClient.EXPECT().UninstallEndpointFlows(binding.ProtocolTCP, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort443, remoteEndpointForPort443})) + mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) + mockOFClient.EXPECT().UninstallServiceFlows(svc1IPv4, uint16(port443Int32), binding.ProtocolTCP) + mockOFClient.EXPECT().UninstallServiceFlows(agentconfig.VirtualNodePortDNATIPv4, uint16(port30002Int32), binding.ProtocolTCP) + mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIPv4, uint16(port443Int32), binding.ProtocolTCP) + mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP) + // The route for the ClusterIP and the LoadBalancer IP should only be uninstalled once. + mockRouteClient.EXPECT().DeleteClusterIPRoute(svc1IPv4) + mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIPv4) + + fp.syncProxyRules() + + assert.Emptyf(t, fp.serviceIPRouteReferences, "serviceIPRouteReferences was not cleaned up after Service was removed") +} + func TestNodePortAdd(t *testing.T) { t.Run("IPv4", func(t *testing.T) { t.Run("Endpoints", func(t *testing.T) { @@ -932,6 +1091,7 @@ func testClusterIPRemove(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6, nodeLo mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) } mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1) + mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -990,6 +1150,7 @@ func testNodePortRemove(t *testing.T, nodePortAddresses []net.IP, svcIP net.IP, mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) + mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) fp.syncProxyRules() @@ -1048,15 +1209,16 @@ func testLoadBalancerRemove(t *testing.T, nodePortAddresses []net.IP, svcIP net. mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, expectedAllEps).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) + mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - mockRouteClient.EXPECT().DeleteLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIP).Times(1) fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -1397,12 +1559,12 @@ func testServiceClusterIPUpdate(t *testing.T, } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().DeleteLoadBalancer([]string{loadBalancerIP.String()}).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) } fp.syncProxyRules() @@ -1503,14 +1665,14 @@ func testServicePortUpdate(t *testing.T, } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) s1 = mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol) s2 = mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort+1), bindingProtocol, uint16(0), false, corev1.ServiceTypeLoadBalancer, false).Times(1) s2.After(s1) - mockRouteClient.EXPECT().DeleteLoadBalancer([]string{loadBalancerIP.String()}).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) } fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -1606,12 +1768,12 @@ func testServiceNodePortUpdate(t *testing.T, } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol) mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().DeleteLoadBalancer([]string{loadBalancerIP.String()}).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) } fp.syncProxyRules() @@ -1694,7 +1856,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) } fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -1725,8 +1887,8 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, s2 := mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, corev1.ServiceTypeLoadBalancer, false).Times(1) s2.After(s1) - mockRouteClient.EXPECT().DeleteLoadBalancer([]string{loadBalancerIP.String()}).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) } fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -1880,21 +2042,22 @@ func testServiceIngressIPsUpdate(t *testing.T, } mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer(gomock.InAnyOrder(loadBalancerIPStrs)).Times(1) + for _, ip := range loadBalancerIPs { + mockRouteClient.EXPECT().AddLoadBalancer(ip).Times(1) + } toDeleteLoadBalancerIPs := smallSliceDifference(loadBalancerIPStrs, updatedLoadBalancerIPStrs) toAddLoadBalancerIPs := smallSliceDifference(updatedLoadBalancerIPStrs, loadBalancerIPStrs) for _, ipStr := range toDeleteLoadBalancerIPs { mockOFClient.EXPECT().UninstallServiceFlows(net.ParseIP(ipStr), uint16(svcPort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancer(net.ParseIP(ipStr)).Times(1) } for _, ipStr := range toAddLoadBalancerIPs { mockOFClient.EXPECT().InstallServiceFlows(groupID, net.ParseIP(ipStr), uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeLoadBalancer, false).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(net.ParseIP(ipStr)).Times(1) } - mockRouteClient.EXPECT().DeleteLoadBalancer(gomock.InAnyOrder(toDeleteLoadBalancerIPs)).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer(gomock.InAnyOrder(toAddLoadBalancerIPs)).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -1971,7 +2134,6 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(updatedAffinitySeconds), false, corev1.ServiceTypeClusterIP, false).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(affinitySeconds), false, corev1.ServiceTypeNodePort, false).Times(1) @@ -1979,7 +2141,7 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(affinitySeconds), false, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) } fp.syncProxyRules() @@ -2081,12 +2243,12 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T, } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol) mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(affinitySeconds), false, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().DeleteLoadBalancer([]string{loadBalancerIP.String()}).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) } fp.syncProxyRules() diff --git a/pkg/agent/route/interfaces.go b/pkg/agent/route/interfaces.go index 630c0dc57c8..c55e9f7ae5a 100644 --- a/pkg/agent/route/interfaces.go +++ b/pkg/agent/route/interfaces.go @@ -65,11 +65,11 @@ type Interface interface { // ClusterIP Service traffic from host network. DeleteClusterIPRoute(svcIP net.IP) error - // AddLoadBalancer adds configurations when a LoadBalancer Service is created. - AddLoadBalancer(externalIPs []string) error + // AddLoadBalancer adds configurations when a LoadBalancer IP is added. + AddLoadBalancer(externalIP net.IP) error - // DeleteLoadBalancer deletes related configurations when a LoadBalancer Service is deleted. - DeleteLoadBalancer(externalIPs []string) error + // DeleteLoadBalancer deletes related configurations when a LoadBalancer IP is deleted. + DeleteLoadBalancer(externalIP net.IP) error // Run starts the sync loop. Run(stopCh <-chan struct{}) diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index 8794bc80598..62853fce9de 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -1420,11 +1420,10 @@ func (c *Client) addVirtualNodePortDNATIPRoute(isIPv6 bool) error { return nil } -// addLoadBalancerIngressIPRoute is used to add routing entry which is used to route LoadBalancer ingress IP to Antrea +// AddLoadBalancer is used to add routing entry which is used to route LoadBalancer ingress IP to Antrea // gateway on host. -func (c *Client) addLoadBalancerIngressIPRoute(svcIPStr string) error { +func (c *Client) AddLoadBalancer(svcIP net.IP) error { linkIndex := c.nodeConfig.GatewayConfig.LinkIndex - svcIP := net.ParseIP(svcIPStr) isIPv6 := utilnet.IsIPv6(svcIP) var gw net.IP var mask int @@ -1446,11 +1445,10 @@ func (c *Client) addLoadBalancerIngressIPRoute(svcIPStr string) error { return nil } -// deleteLoadBalancerIngressIPRoute is used to delete routing entry which is used to route LoadBalancer ingress IP to Antrea +// DeleteLoadBalancer is used to delete routing entry which is used to route LoadBalancer ingress IP to Antrea // gateway on host. -func (c *Client) deleteLoadBalancerIngressIPRoute(svcIPStr string) error { +func (c *Client) DeleteLoadBalancer(svcIP net.IP) error { linkIndex := c.nodeConfig.GatewayConfig.LinkIndex - svcIP := net.ParseIP(svcIPStr) isIPv6 := utilnet.IsIPv6(svcIP) var gw net.IP var mask int @@ -1476,28 +1474,6 @@ func (c *Client) deleteLoadBalancerIngressIPRoute(svcIPStr string) error { return nil } -// AddLoadBalancer is used to add routing entries when a LoadBalancer Service is added. -func (c *Client) AddLoadBalancer(externalIPs []string) error { - for _, svcIPStr := range externalIPs { - if err := c.addLoadBalancerIngressIPRoute(svcIPStr); err != nil { - return err - } - } - - return nil -} - -// DeleteLoadBalancer is used to delete routing entries when a LoadBalancer Service is deleted. -func (c *Client) DeleteLoadBalancer(externalIPs []string) error { - for _, svcIPStr := range externalIPs { - if err := c.deleteLoadBalancerIngressIPRoute(svcIPStr); err != nil { - return err - } - } - - return nil -} - // AddLocalAntreaFlexibleIPAMPodRule is used to add IP to target ip set when an AntreaFlexibleIPAM Pod is added. An entry is added // for every Pod IP. func (c *Client) AddLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error { diff --git a/pkg/agent/route/route_linux_test.go b/pkg/agent/route/route_linux_test.go index da9d7f7abc3..831fcce1214 100644 --- a/pkg/agent/route/route_linux_test.go +++ b/pkg/agent/route/route_linux_test.go @@ -1418,12 +1418,12 @@ func TestAddLoadBalancer(t *testing.T) { nodeConfig := &config.NodeConfig{GatewayConfig: &config.GatewayConfig{LinkIndex: 10}} tests := []struct { name string - externalIPs []string + externalIP string expectedCalls func(mockNetlink *netlinktest.MockInterfaceMockRecorder) }{ { - name: "IPv4", - externalIPs: []string{"1.1.1.1", "1.1.1.2"}, + name: "IPv4", + externalIP: "1.1.1.1", expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { mockNetlink.RouteReplace(&netlink.Route{ Dst: &net.IPNet{ @@ -1434,20 +1434,11 @@ func TestAddLoadBalancer(t *testing.T) { Scope: netlink.SCOPE_UNIVERSE, LinkIndex: 10, }) - mockNetlink.RouteReplace(&netlink.Route{ - Dst: &net.IPNet{ - IP: net.ParseIP("1.1.1.2"), - Mask: net.CIDRMask(32, 32), - }, - Gw: config.VirtualServiceIPv4, - Scope: netlink.SCOPE_UNIVERSE, - LinkIndex: 10, - }) }, }, { - name: "IPv6", - externalIPs: []string{"fd00:1234:5678:dead:beaf::1", "fd00:1234:5678:dead:beaf::a"}, + name: "IPv6", + externalIP: "fd00:1234:5678:dead:beaf::1", expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { mockNetlink.RouteReplace(&netlink.Route{ Dst: &net.IPNet{IP: net.ParseIP("fd00:1234:5678:dead:beaf::1"), Mask: net.CIDRMask(128, 128)}, @@ -1455,12 +1446,6 @@ func TestAddLoadBalancer(t *testing.T) { Scope: netlink.SCOPE_UNIVERSE, LinkIndex: 10, }) - mockNetlink.RouteReplace(&netlink.Route{ - Dst: &net.IPNet{IP: net.ParseIP("fd00:1234:5678:dead:beaf::a"), Mask: net.CIDRMask(128, 128)}, - Gw: config.VirtualServiceIPv6, - Scope: netlink.SCOPE_UNIVERSE, - LinkIndex: 10, - }) }, }, } @@ -1475,7 +1460,7 @@ func TestAddLoadBalancer(t *testing.T) { } tt.expectedCalls(mockNetlink.EXPECT()) - assert.NoError(t, c.AddLoadBalancer(tt.externalIPs)) + assert.NoError(t, c.AddLoadBalancer(net.ParseIP(tt.externalIP))) }) } } @@ -1484,12 +1469,12 @@ func TestDeleteLoadBalancer(t *testing.T) { nodeConfig := &config.NodeConfig{GatewayConfig: &config.GatewayConfig{LinkIndex: 10}} tests := []struct { name string - externalIPs []string + externalIP string expectedCalls func(mockNetlink *netlinktest.MockInterfaceMockRecorder) }{ { - name: "IPv4", - externalIPs: []string{"1.1.1.1", "1.1.1.2"}, + name: "IPv4", + externalIP: "1.1.1.1", expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { mockNetlink.RouteDel(&netlink.Route{ Dst: &net.IPNet{ @@ -1500,20 +1485,11 @@ func TestDeleteLoadBalancer(t *testing.T) { Scope: netlink.SCOPE_UNIVERSE, LinkIndex: 10, }) - mockNetlink.RouteDel(&netlink.Route{ - Dst: &net.IPNet{ - IP: net.ParseIP("1.1.1.2"), - Mask: net.CIDRMask(32, 32), - }, - Gw: config.VirtualServiceIPv4, - Scope: netlink.SCOPE_UNIVERSE, - LinkIndex: 10, - }) }, }, { - name: "IPv6", - externalIPs: []string{"fd00:1234:5678:dead:beaf::1", "fd00:1234:5678:dead:beaf::a"}, + name: "IPv6", + externalIP: "fd00:1234:5678:dead:beaf::1", expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { mockNetlink.RouteDel(&netlink.Route{ Dst: &net.IPNet{IP: net.ParseIP("fd00:1234:5678:dead:beaf::1"), Mask: net.CIDRMask(128, 128)}, @@ -1521,12 +1497,6 @@ func TestDeleteLoadBalancer(t *testing.T) { Scope: netlink.SCOPE_UNIVERSE, LinkIndex: 10, }) - mockNetlink.RouteDel(&netlink.Route{ - Dst: &net.IPNet{IP: net.ParseIP("fd00:1234:5678:dead:beaf::a"), Mask: net.CIDRMask(128, 128)}, - Gw: config.VirtualServiceIPv6, - Scope: netlink.SCOPE_UNIVERSE, - LinkIndex: 10, - }) }, }, } @@ -1541,7 +1511,7 @@ func TestDeleteLoadBalancer(t *testing.T) { } tt.expectedCalls(mockNetlink.EXPECT()) - assert.NoError(t, c.DeleteLoadBalancer(tt.externalIPs)) + assert.NoError(t, c.DeleteLoadBalancer(net.ParseIP(tt.externalIP))) }) } } diff --git a/pkg/agent/route/route_windows.go b/pkg/agent/route/route_windows.go index 40a1e27dc20..006dd4f7415 100644 --- a/pkg/agent/route/route_windows.go +++ b/pkg/agent/route/route_windows.go @@ -404,22 +404,12 @@ func (c *Client) DeleteNodePort(nodePortAddresses []net.IP, port uint16, protoco return util.RemoveNetNatStaticMapping(antreaNatNodePort, "0.0.0.0", port, string(protocol)) } -func (c *Client) AddLoadBalancer(externalIPs []string) error { - for _, svcIPStr := range externalIPs { - if err := c.addServiceRoute(net.ParseIP(svcIPStr)); err != nil { - return err - } - } - return nil +func (c *Client) AddLoadBalancer(externalIP net.IP) error { + return c.addServiceRoute(externalIP) } -func (c *Client) DeleteLoadBalancer(externalIPs []string) error { - for _, svcIPStr := range externalIPs { - if err := c.deleteServiceRoute(net.ParseIP(svcIPStr)); err != nil { - return err - } - } - return nil +func (c *Client) DeleteLoadBalancer(externalIP net.IP) error { + return c.deleteServiceRoute(externalIP) } func (c *Client) AddLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error { diff --git a/pkg/agent/route/testing/mock_route.go b/pkg/agent/route/testing/mock_route.go index 2f23c07ca38..ba8086898f5 100644 --- a/pkg/agent/route/testing/mock_route.go +++ b/pkg/agent/route/testing/mock_route.go @@ -1,4 +1,4 @@ -// Copyright 2022 Antrea Authors +// Copyright 2023 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -65,7 +65,7 @@ func (mr *MockInterfaceMockRecorder) AddClusterIPRoute(arg0 interface{}) *gomock } // AddLoadBalancer mocks base method -func (m *MockInterface) AddLoadBalancer(arg0 []string) error { +func (m *MockInterface) AddLoadBalancer(arg0 net.IP) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AddLoadBalancer", arg0) ret0, _ := ret[0].(error) @@ -149,7 +149,7 @@ func (mr *MockInterfaceMockRecorder) DeleteClusterIPRoute(arg0 interface{}) *gom } // DeleteLoadBalancer mocks base method -func (m *MockInterface) DeleteLoadBalancer(arg0 []string) error { +func (m *MockInterface) DeleteLoadBalancer(arg0 net.IP) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteLoadBalancer", arg0) ret0, _ := ret[0].(error)