diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 68445d97898..f496a404148 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -27,6 +27,7 @@ import ( discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/runtime" apimachinerytypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" @@ -104,6 +105,13 @@ type proxier struct { serviceHealthServer healthcheck.ServiceHealthServer numLocalEndpoints map[apimachinerytypes.NamespacedName]int + // serviceIPRouteReferences tracks the references of Service IP routes. The key is the Service IP and the value is + // the set of ServiceInfo strings. Because a Service could have multiple ports and each port will generate a + // ServicePort (which is the unit of the processing), a Service IP route may be required by several ServicePorts. + // With the references, we install a route exactly once as long as it's used by any ServicePorts and uninstall it + // exactly once when it's no longer used by any ServicePorts. + // It applies to ClusterIP and LoadBalancerIP. + serviceIPRouteReferences map[string]sets.String // syncedOnce returns true if the proxier has synced rules at least once. syncedOnce bool syncedOnceMutex sync.RWMutex @@ -157,16 +165,24 @@ func (p *proxier) removeStaleServices() { } delete(p.endpointsInstalledMap, svcPortName) } - // Remove NodePort flows and configurations. - if p.proxyAll && svcInfo.NodePort() > 0 { - if err := p.uninstallNodePortService(uint16(svcInfo.NodePort()), svcInfo.OFProtocol); err != nil { - klog.ErrorS(err, "Error when uninstalling NodePort flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) - continue + // Remove NodePort and ClusterIP flows and configurations. + if p.proxyAll { + if svcInfo.NodePort() > 0 { + if err := p.uninstallNodePortService(uint16(svcInfo.NodePort()), svcInfo.OFProtocol); err != nil { + klog.ErrorS(err, "Error when uninstalling NodePort flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) + continue + } + } + if svcInfo.ClusterIP() != nil { + if err := p.deleteRouteForServiceIP(svcInfoStr, svcInfo.ClusterIP(), p.routeClient.DeleteClusterIPRoute); err != nil { + klog.ErrorS(err, "Failed to remove ClusterIP Service routes", "Service", svcPortName) + continue + } } } // Remove LoadBalancer flows and configurations. if p.proxyLoadBalancerIPs && len(svcInfo.LoadBalancerIPStrings()) > 0 { - if err := p.uninstallLoadBalancerService(svcInfo.LoadBalancerIPStrings(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil { + if err := p.uninstallLoadBalancerService(svcInfoStr, svcInfo.LoadBalancerIPStrings(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil { klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } @@ -313,37 +329,73 @@ func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Prot return nil } -func (p *proxier) installLoadBalancerService(groupID binding.GroupIDType, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error { +func (p *proxier) installLoadBalancerService(svcInfoStr string, groupID binding.GroupIDType, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error { for _, ingress := range loadBalancerIPStrings { if ingress != "" { - if err := p.ofClient.InstallServiceFlows(groupID, net.ParseIP(ingress), svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeLoadBalancer, false); err != nil { + ip := net.ParseIP(ingress) + if err := p.ofClient.InstallServiceFlows(groupID, ip, svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeLoadBalancer, false); err != nil { return fmt.Errorf("failed to install LoadBalancer load balancing flows: %w", err) } + if p.proxyAll { + if err := p.addRouteForServiceIP(svcInfoStr, ip, p.routeClient.AddLoadBalancer); err != nil { + return fmt.Errorf("failed to install LoadBalancer traffic redirecting routes: %w", err) + } + } } } - if p.proxyAll { - if err := p.routeClient.AddLoadBalancer(loadBalancerIPStrings); err != nil { - return fmt.Errorf("failed to install LoadBalancer traffic redirecting routes: %w", err) + return nil +} + +func (p *proxier) addRouteForServiceIP(svcInfoStr string, ip net.IP, addRouteFn func(net.IP) error) error { + ipStr := ip.String() + references, exists := p.serviceIPRouteReferences[ipStr] + // If the IP was not referenced by any Service port, install a route for it. + // Otherwise, just reference it. + if !exists { + if err := addRouteFn(ip); err != nil { + return err } + references = sets.NewString(svcInfoStr) + p.serviceIPRouteReferences[ipStr] = references + } else { + references.Insert(svcInfoStr) } - return nil } -func (p *proxier) uninstallLoadBalancerService(loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol) error { +func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol) error { for _, ingress := range loadBalancerIPStrings { if ingress != "" { - if err := p.ofClient.UninstallServiceFlows(net.ParseIP(ingress), svcPort, protocol); err != nil { + ip := net.ParseIP(ingress) + if err := p.ofClient.UninstallServiceFlows(ip, svcPort, protocol); err != nil { return fmt.Errorf("failed to remove LoadBalancer load balancing flows: %w", err) } + if p.proxyAll { + if err := p.deleteRouteForServiceIP(svcInfoStr, ip, p.routeClient.DeleteLoadBalancer); err != nil { + return fmt.Errorf("failed to remove LoadBalancer traffic redirecting routes: %w", err) + } + } } } - if p.proxyAll { - if err := p.routeClient.DeleteLoadBalancer(loadBalancerIPStrings); err != nil { - return fmt.Errorf("failed to remove LoadBalancer traffic redirecting routes: %w", err) + return nil +} + +func (p *proxier) deleteRouteForServiceIP(svcInfoStr string, ip net.IP, deleteRouteFn func(net.IP) error) error { + ipStr := ip.String() + references, exists := p.serviceIPRouteReferences[ipStr] + // If the IP was not referenced by this Service port, skip it. + if exists && references.Has(svcInfoStr) { + // Delete the IP only if this Service port is the last one referencing it. + // Otherwise, just dereference it. + if references.Len() == 1 { + if err := deleteRouteFn(ip); err != nil { + return err + } + delete(p.serviceIPRouteReferences, ipStr) + } else { + references.Delete(svcInfoStr) } } - return nil } @@ -548,8 +600,8 @@ func (p *proxier) installServices() { } } // If previous Service which has ClusterIP should be removed, remove ClusterIP routes. - if svcInfo.ClusterIP() != nil { - if err := p.routeClient.DeleteClusterIPRoute(pSvcInfo.ClusterIP()); err != nil { + if pSvcInfo.ClusterIP() != nil { + if err := p.deleteRouteForServiceIP(pSvcInfo.String(), pSvcInfo.ClusterIP(), p.routeClient.DeleteClusterIPRoute); err != nil { klog.ErrorS(err, "Error when uninstalling ClusterIP route for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } @@ -580,7 +632,7 @@ func (p *proxier) installServices() { // is created, the routing target IP block will be recalculated for expansion to be able to route the new // created ClusterIP. Deleting a ClusterIP will not shrink the target routing IP block. The Service CIDR // can be finally calculated after creating enough ClusterIPs. - if err := p.routeClient.AddClusterIPRoute(svcInfo.ClusterIP()); err != nil { + if err := p.addRouteForServiceIP(svcInfo.String(), svcInfo.ClusterIP(), p.routeClient.AddClusterIPRoute); err != nil { klog.ErrorS(err, "Error when installing ClusterIP route for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } @@ -612,7 +664,7 @@ func (p *proxier) installServices() { } // Remove LoadBalancer flows and configurations. if len(toDelete) > 0 { - if err := p.uninstallLoadBalancerService(toDelete, uint16(pSvcInfo.Port()), pSvcInfo.OFProtocol); err != nil { + if err := p.uninstallLoadBalancerService(pSvcInfo.String(), toDelete, uint16(pSvcInfo.Port()), pSvcInfo.OFProtocol); err != nil { klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } @@ -624,7 +676,7 @@ func (p *proxier) installServices() { klog.ErrorS(nil, "Group for Service externalTrafficPolicy was not installed", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr, "externalTrafficPolicy", externalPolicyLocal) continue } - if err := p.installLoadBalancerService(groupID, toAdd, uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.ExternalPolicyLocal()); err != nil { + if err := p.installLoadBalancerService(svcInfo.String(), groupID, toAdd, uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.ExternalPolicyLocal()); err != nil { klog.ErrorS(err, "Error when installing LoadBalancer flows and configurations for Service", "Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } @@ -1002,6 +1054,7 @@ func NewProxier( endpointsInstalledMap: types.EndpointsMap{}, endpointsMap: types.EndpointsMap{}, endpointReferenceCounter: map[string]int{}, + serviceIPRouteReferences: map[string]sets.String{}, nodeLabels: map[string]string{}, serviceStringMap: map[string]k8sproxy.ServicePortName{}, groupCounter: groupCounter, diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 8dec1d4f06a..8a8a5ab874b 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -28,10 +28,12 @@ import ( discovery "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apimachinerytypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/testutil" + "k8s.io/utils/pointer" mccommon "antrea.io/antrea/multicluster/controllers/multicluster/common" agentconfig "antrea.io/antrea/pkg/agent/config" @@ -531,7 +533,7 @@ func testLoadBalancerAdd(t *testing.T, } mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) if proxyLoadBalancerIPs { - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) } mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) @@ -760,6 +762,163 @@ func TestLoadBalancerAdd(t *testing.T) { }) } +func TestLoadBalancerServiceWithMultiplePorts(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockOFClient, mockRouteClient := getMockClients(ctrl) + groupAllocator := openflow.NewGroupAllocator(false) + nodePortAddresses := []net.IP{net.ParseIP("0.0.0.0")} + fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, false, withProxyAll, withEndpointSlice) + + port80Str := "port80" + port80Int32 := int32(80) + port443Str := "port443" + port443Int32 := int32(443) + port30001Int32 := int32(30001) + port30002Int32 := int32(30002) + protocolTCP := corev1.ProtocolTCP + endpoint1Address := "192.168.0.11" + endpoint2Address := "192.168.1.11" + endpoint1NodeName := fp.hostname + endpoint2NodeName := "node2" + + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: port80Str, + Protocol: protocolTCP, + Port: port80Int32, + TargetPort: intstr.FromInt(int(port80Int32)), + NodePort: port30001Int32, + }, + { + Name: port443Str, + Protocol: protocolTCP, + Port: port443Int32, + TargetPort: intstr.FromInt(int(port443Int32)), + NodePort: port30002Int32, + }, + }, + ClusterIP: svc1IPv4.String(), + ClusterIPs: []string{svc1IPv4.String()}, + Type: corev1.ServiceTypeLoadBalancer, + ExternalTrafficPolicy: corev1.ServiceExternalTrafficPolicyTypeLocal, + HealthCheckNodePort: 40000, + IPFamilies: []corev1.IPFamily{corev1.IPv4Protocol}, + }, + Status: corev1.ServiceStatus{ + LoadBalancer: corev1.LoadBalancerStatus{Ingress: []corev1.LoadBalancerIngress{ + {IP: loadBalancerIPv4.String()}, + }}, + }, + } + makeServiceMap(fp, svc) + + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc-x5ks2", + Namespace: svc.Namespace, + Labels: map[string]string{ + discovery.LabelServiceName: svc.Name, + }, + }, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{ + endpoint1Address, + }, + Conditions: discovery.EndpointConditions{ + Ready: pointer.Bool(true), + Serving: pointer.Bool(true), + Terminating: pointer.Bool(false), + }, + NodeName: &endpoint1NodeName, + }, + { + Addresses: []string{ + endpoint2Address, + }, + Conditions: discovery.EndpointConditions{ + Ready: pointer.Bool(true), + Serving: pointer.Bool(true), + Terminating: pointer.Bool(false), + }, + NodeName: &endpoint2NodeName, + }, + }, + Ports: []discovery.EndpointPort{ + { + Name: &port80Str, + Port: &port80Int32, + Protocol: &protocolTCP, + }, + { + Name: &port443Str, + Port: &port443Int32, + Protocol: &protocolTCP, + }, + }, + } + makeEndpointSliceMap(fp, endpointSlice) + + localEndpointForPort80 := k8sproxy.NewBaseEndpointInfo(endpoint1Address, endpoint1NodeName, "", int(port80Int32), true, true, true, false, nil) + localEndpointForPort443 := k8sproxy.NewBaseEndpointInfo(endpoint1Address, endpoint1NodeName, "", int(port443Int32), true, true, true, false, nil) + remoteEndpointForPort80 := k8sproxy.NewBaseEndpointInfo(endpoint2Address, endpoint2NodeName, "", int(port80Int32), false, true, true, false, nil) + remoteEndpointForPort443 := k8sproxy.NewBaseEndpointInfo(endpoint2Address, endpoint2NodeName, "", int(port443Int32), false, true, true, false, nil) + + mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort80, remoteEndpointForPort80})).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(gomock.Any(), false, []k8sproxy.Endpoint{localEndpointForPort80}).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(gomock.Any(), false, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort80, remoteEndpointForPort80})).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), svc1IPv4, uint16(port80Int32), binding.ProtocolTCP, uint16(0), true, corev1.ServiceTypeClusterIP, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), agentconfig.VirtualNodePortDNATIPv4, uint16(port30001Int32), binding.ProtocolTCP, uint16(0), true, corev1.ServiceTypeNodePort, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), loadBalancerIPv4, uint16(port80Int32), binding.ProtocolTCP, uint16(0), true, corev1.ServiceTypeLoadBalancer, false).Times(1) + mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP).Times(1) + // The route for the ClusterIP and the LoadBalancer IP should only be installed once. + mockRouteClient.EXPECT().AddClusterIPRoute(svc1IPv4).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIPv4).Times(1) + + mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort443, remoteEndpointForPort443})).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(gomock.Any(), false, []k8sproxy.Endpoint{localEndpointForPort443}).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(gomock.Any(), false, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort443, remoteEndpointForPort443})).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), svc1IPv4, uint16(port443Int32), binding.ProtocolTCP, uint16(0), true, corev1.ServiceTypeClusterIP, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), agentconfig.VirtualNodePortDNATIPv4, uint16(port30002Int32), binding.ProtocolTCP, uint16(0), true, corev1.ServiceTypeNodePort, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), loadBalancerIPv4, uint16(port443Int32), binding.ProtocolTCP, uint16(0), true, corev1.ServiceTypeLoadBalancer, false).Times(1) + mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP).Times(1) + + fp.syncProxyRules() + + // Remove the service. + fp.serviceChanges.OnServiceUpdate(svc, nil) + fp.endpointsChanges.OnEndpointSliceUpdate(endpointSlice, true) + + mockOFClient.EXPECT().UninstallEndpointFlows(binding.ProtocolTCP, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort80, remoteEndpointForPort80})) + mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) + mockOFClient.EXPECT().UninstallServiceFlows(svc1IPv4, uint16(port80Int32), binding.ProtocolTCP) + mockOFClient.EXPECT().UninstallServiceFlows(agentconfig.VirtualNodePortDNATIPv4, uint16(port30001Int32), binding.ProtocolTCP) + mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIPv4, uint16(port80Int32), binding.ProtocolTCP) + mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP) + + mockOFClient.EXPECT().UninstallEndpointFlows(binding.ProtocolTCP, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort443, remoteEndpointForPort443})) + mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) + mockOFClient.EXPECT().UninstallServiceFlows(svc1IPv4, uint16(port443Int32), binding.ProtocolTCP) + mockOFClient.EXPECT().UninstallServiceFlows(agentconfig.VirtualNodePortDNATIPv4, uint16(port30002Int32), binding.ProtocolTCP) + mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIPv4, uint16(port443Int32), binding.ProtocolTCP) + mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP) + // The route for the ClusterIP and the LoadBalancer IP should only be uninstalled once. + mockRouteClient.EXPECT().DeleteClusterIPRoute(svc1IPv4) + mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIPv4) + + fp.syncProxyRules() + + assert.Emptyf(t, fp.serviceIPRouteReferences, "serviceIPRouteReferences was not cleaned up after Service was removed") +} + func TestNodePortAdd(t *testing.T) { t.Run("IPv4", func(t *testing.T) { t.Run("Endpoints", func(t *testing.T) { @@ -932,6 +1091,7 @@ func testClusterIPRemove(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6, nodeLo mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) } mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1) + mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -990,6 +1150,7 @@ func testNodePortRemove(t *testing.T, nodePortAddresses []net.IP, svcIP net.IP, mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) + mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) fp.syncProxyRules() @@ -1048,15 +1209,16 @@ func testLoadBalancerRemove(t *testing.T, nodePortAddresses []net.IP, svcIP net. mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, expectedAllEps).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) + mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - mockRouteClient.EXPECT().DeleteLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIP).Times(1) fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -1397,12 +1559,12 @@ func testServiceClusterIPUpdate(t *testing.T, } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().DeleteLoadBalancer([]string{loadBalancerIP.String()}).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) } fp.syncProxyRules() @@ -1503,14 +1665,14 @@ func testServicePortUpdate(t *testing.T, } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) s1 = mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol) s2 = mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort+1), bindingProtocol, uint16(0), false, corev1.ServiceTypeLoadBalancer, false).Times(1) s2.After(s1) - mockRouteClient.EXPECT().DeleteLoadBalancer([]string{loadBalancerIP.String()}).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) } fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -1606,12 +1768,12 @@ func testServiceNodePortUpdate(t *testing.T, } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol) mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().DeleteLoadBalancer([]string{loadBalancerIP.String()}).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) } fp.syncProxyRules() @@ -1694,7 +1856,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) } fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -1725,8 +1887,8 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, s2 := mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, corev1.ServiceTypeLoadBalancer, false).Times(1) s2.After(s1) - mockRouteClient.EXPECT().DeleteLoadBalancer([]string{loadBalancerIP.String()}).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) } fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -1880,21 +2042,22 @@ func testServiceIngressIPsUpdate(t *testing.T, } mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer(gomock.InAnyOrder(loadBalancerIPStrs)).Times(1) + for _, ip := range loadBalancerIPs { + mockRouteClient.EXPECT().AddLoadBalancer(ip).Times(1) + } toDeleteLoadBalancerIPs := smallSliceDifference(loadBalancerIPStrs, updatedLoadBalancerIPStrs) toAddLoadBalancerIPs := smallSliceDifference(updatedLoadBalancerIPStrs, loadBalancerIPStrs) for _, ipStr := range toDeleteLoadBalancerIPs { mockOFClient.EXPECT().UninstallServiceFlows(net.ParseIP(ipStr), uint16(svcPort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancer(net.ParseIP(ipStr)).Times(1) } for _, ipStr := range toAddLoadBalancerIPs { mockOFClient.EXPECT().InstallServiceFlows(groupID, net.ParseIP(ipStr), uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeLoadBalancer, false).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(net.ParseIP(ipStr)).Times(1) } - mockRouteClient.EXPECT().DeleteLoadBalancer(gomock.InAnyOrder(toDeleteLoadBalancerIPs)).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer(gomock.InAnyOrder(toAddLoadBalancerIPs)).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -1971,7 +2134,6 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(updatedAffinitySeconds), false, corev1.ServiceTypeClusterIP, false).Times(1) - mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(affinitySeconds), false, corev1.ServiceTypeNodePort, false).Times(1) @@ -1979,7 +2141,7 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(affinitySeconds), false, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) } fp.syncProxyRules() @@ -2081,12 +2243,12 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T, } if svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol) mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(affinitySeconds), false, corev1.ServiceTypeLoadBalancer, false).Times(1) - mockRouteClient.EXPECT().DeleteLoadBalancer([]string{loadBalancerIP.String()}).Times(1) - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) } fp.syncProxyRules() diff --git a/pkg/agent/route/interfaces.go b/pkg/agent/route/interfaces.go index 630c0dc57c8..c55e9f7ae5a 100644 --- a/pkg/agent/route/interfaces.go +++ b/pkg/agent/route/interfaces.go @@ -65,11 +65,11 @@ type Interface interface { // ClusterIP Service traffic from host network. DeleteClusterIPRoute(svcIP net.IP) error - // AddLoadBalancer adds configurations when a LoadBalancer Service is created. - AddLoadBalancer(externalIPs []string) error + // AddLoadBalancer adds configurations when a LoadBalancer IP is added. + AddLoadBalancer(externalIP net.IP) error - // DeleteLoadBalancer deletes related configurations when a LoadBalancer Service is deleted. - DeleteLoadBalancer(externalIPs []string) error + // DeleteLoadBalancer deletes related configurations when a LoadBalancer IP is deleted. + DeleteLoadBalancer(externalIP net.IP) error // Run starts the sync loop. Run(stopCh <-chan struct{}) diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index 8794bc80598..62853fce9de 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -1420,11 +1420,10 @@ func (c *Client) addVirtualNodePortDNATIPRoute(isIPv6 bool) error { return nil } -// addLoadBalancerIngressIPRoute is used to add routing entry which is used to route LoadBalancer ingress IP to Antrea +// AddLoadBalancer is used to add routing entry which is used to route LoadBalancer ingress IP to Antrea // gateway on host. -func (c *Client) addLoadBalancerIngressIPRoute(svcIPStr string) error { +func (c *Client) AddLoadBalancer(svcIP net.IP) error { linkIndex := c.nodeConfig.GatewayConfig.LinkIndex - svcIP := net.ParseIP(svcIPStr) isIPv6 := utilnet.IsIPv6(svcIP) var gw net.IP var mask int @@ -1446,11 +1445,10 @@ func (c *Client) addLoadBalancerIngressIPRoute(svcIPStr string) error { return nil } -// deleteLoadBalancerIngressIPRoute is used to delete routing entry which is used to route LoadBalancer ingress IP to Antrea +// DeleteLoadBalancer is used to delete routing entry which is used to route LoadBalancer ingress IP to Antrea // gateway on host. -func (c *Client) deleteLoadBalancerIngressIPRoute(svcIPStr string) error { +func (c *Client) DeleteLoadBalancer(svcIP net.IP) error { linkIndex := c.nodeConfig.GatewayConfig.LinkIndex - svcIP := net.ParseIP(svcIPStr) isIPv6 := utilnet.IsIPv6(svcIP) var gw net.IP var mask int @@ -1476,28 +1474,6 @@ func (c *Client) deleteLoadBalancerIngressIPRoute(svcIPStr string) error { return nil } -// AddLoadBalancer is used to add routing entries when a LoadBalancer Service is added. -func (c *Client) AddLoadBalancer(externalIPs []string) error { - for _, svcIPStr := range externalIPs { - if err := c.addLoadBalancerIngressIPRoute(svcIPStr); err != nil { - return err - } - } - - return nil -} - -// DeleteLoadBalancer is used to delete routing entries when a LoadBalancer Service is deleted. -func (c *Client) DeleteLoadBalancer(externalIPs []string) error { - for _, svcIPStr := range externalIPs { - if err := c.deleteLoadBalancerIngressIPRoute(svcIPStr); err != nil { - return err - } - } - - return nil -} - // AddLocalAntreaFlexibleIPAMPodRule is used to add IP to target ip set when an AntreaFlexibleIPAM Pod is added. An entry is added // for every Pod IP. func (c *Client) AddLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error { diff --git a/pkg/agent/route/route_linux_test.go b/pkg/agent/route/route_linux_test.go index da9d7f7abc3..831fcce1214 100644 --- a/pkg/agent/route/route_linux_test.go +++ b/pkg/agent/route/route_linux_test.go @@ -1418,12 +1418,12 @@ func TestAddLoadBalancer(t *testing.T) { nodeConfig := &config.NodeConfig{GatewayConfig: &config.GatewayConfig{LinkIndex: 10}} tests := []struct { name string - externalIPs []string + externalIP string expectedCalls func(mockNetlink *netlinktest.MockInterfaceMockRecorder) }{ { - name: "IPv4", - externalIPs: []string{"1.1.1.1", "1.1.1.2"}, + name: "IPv4", + externalIP: "1.1.1.1", expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { mockNetlink.RouteReplace(&netlink.Route{ Dst: &net.IPNet{ @@ -1434,20 +1434,11 @@ func TestAddLoadBalancer(t *testing.T) { Scope: netlink.SCOPE_UNIVERSE, LinkIndex: 10, }) - mockNetlink.RouteReplace(&netlink.Route{ - Dst: &net.IPNet{ - IP: net.ParseIP("1.1.1.2"), - Mask: net.CIDRMask(32, 32), - }, - Gw: config.VirtualServiceIPv4, - Scope: netlink.SCOPE_UNIVERSE, - LinkIndex: 10, - }) }, }, { - name: "IPv6", - externalIPs: []string{"fd00:1234:5678:dead:beaf::1", "fd00:1234:5678:dead:beaf::a"}, + name: "IPv6", + externalIP: "fd00:1234:5678:dead:beaf::1", expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { mockNetlink.RouteReplace(&netlink.Route{ Dst: &net.IPNet{IP: net.ParseIP("fd00:1234:5678:dead:beaf::1"), Mask: net.CIDRMask(128, 128)}, @@ -1455,12 +1446,6 @@ func TestAddLoadBalancer(t *testing.T) { Scope: netlink.SCOPE_UNIVERSE, LinkIndex: 10, }) - mockNetlink.RouteReplace(&netlink.Route{ - Dst: &net.IPNet{IP: net.ParseIP("fd00:1234:5678:dead:beaf::a"), Mask: net.CIDRMask(128, 128)}, - Gw: config.VirtualServiceIPv6, - Scope: netlink.SCOPE_UNIVERSE, - LinkIndex: 10, - }) }, }, } @@ -1475,7 +1460,7 @@ func TestAddLoadBalancer(t *testing.T) { } tt.expectedCalls(mockNetlink.EXPECT()) - assert.NoError(t, c.AddLoadBalancer(tt.externalIPs)) + assert.NoError(t, c.AddLoadBalancer(net.ParseIP(tt.externalIP))) }) } } @@ -1484,12 +1469,12 @@ func TestDeleteLoadBalancer(t *testing.T) { nodeConfig := &config.NodeConfig{GatewayConfig: &config.GatewayConfig{LinkIndex: 10}} tests := []struct { name string - externalIPs []string + externalIP string expectedCalls func(mockNetlink *netlinktest.MockInterfaceMockRecorder) }{ { - name: "IPv4", - externalIPs: []string{"1.1.1.1", "1.1.1.2"}, + name: "IPv4", + externalIP: "1.1.1.1", expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { mockNetlink.RouteDel(&netlink.Route{ Dst: &net.IPNet{ @@ -1500,20 +1485,11 @@ func TestDeleteLoadBalancer(t *testing.T) { Scope: netlink.SCOPE_UNIVERSE, LinkIndex: 10, }) - mockNetlink.RouteDel(&netlink.Route{ - Dst: &net.IPNet{ - IP: net.ParseIP("1.1.1.2"), - Mask: net.CIDRMask(32, 32), - }, - Gw: config.VirtualServiceIPv4, - Scope: netlink.SCOPE_UNIVERSE, - LinkIndex: 10, - }) }, }, { - name: "IPv6", - externalIPs: []string{"fd00:1234:5678:dead:beaf::1", "fd00:1234:5678:dead:beaf::a"}, + name: "IPv6", + externalIP: "fd00:1234:5678:dead:beaf::1", expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { mockNetlink.RouteDel(&netlink.Route{ Dst: &net.IPNet{IP: net.ParseIP("fd00:1234:5678:dead:beaf::1"), Mask: net.CIDRMask(128, 128)}, @@ -1521,12 +1497,6 @@ func TestDeleteLoadBalancer(t *testing.T) { Scope: netlink.SCOPE_UNIVERSE, LinkIndex: 10, }) - mockNetlink.RouteDel(&netlink.Route{ - Dst: &net.IPNet{IP: net.ParseIP("fd00:1234:5678:dead:beaf::a"), Mask: net.CIDRMask(128, 128)}, - Gw: config.VirtualServiceIPv6, - Scope: netlink.SCOPE_UNIVERSE, - LinkIndex: 10, - }) }, }, } @@ -1541,7 +1511,7 @@ func TestDeleteLoadBalancer(t *testing.T) { } tt.expectedCalls(mockNetlink.EXPECT()) - assert.NoError(t, c.DeleteLoadBalancer(tt.externalIPs)) + assert.NoError(t, c.DeleteLoadBalancer(net.ParseIP(tt.externalIP))) }) } } diff --git a/pkg/agent/route/route_windows.go b/pkg/agent/route/route_windows.go index 40a1e27dc20..006dd4f7415 100644 --- a/pkg/agent/route/route_windows.go +++ b/pkg/agent/route/route_windows.go @@ -404,22 +404,12 @@ func (c *Client) DeleteNodePort(nodePortAddresses []net.IP, port uint16, protoco return util.RemoveNetNatStaticMapping(antreaNatNodePort, "0.0.0.0", port, string(protocol)) } -func (c *Client) AddLoadBalancer(externalIPs []string) error { - for _, svcIPStr := range externalIPs { - if err := c.addServiceRoute(net.ParseIP(svcIPStr)); err != nil { - return err - } - } - return nil +func (c *Client) AddLoadBalancer(externalIP net.IP) error { + return c.addServiceRoute(externalIP) } -func (c *Client) DeleteLoadBalancer(externalIPs []string) error { - for _, svcIPStr := range externalIPs { - if err := c.deleteServiceRoute(net.ParseIP(svcIPStr)); err != nil { - return err - } - } - return nil +func (c *Client) DeleteLoadBalancer(externalIP net.IP) error { + return c.deleteServiceRoute(externalIP) } func (c *Client) AddLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error { diff --git a/pkg/agent/route/testing/mock_route.go b/pkg/agent/route/testing/mock_route.go index 2f23c07ca38..ba8086898f5 100644 --- a/pkg/agent/route/testing/mock_route.go +++ b/pkg/agent/route/testing/mock_route.go @@ -1,4 +1,4 @@ -// Copyright 2022 Antrea Authors +// Copyright 2023 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -65,7 +65,7 @@ func (mr *MockInterfaceMockRecorder) AddClusterIPRoute(arg0 interface{}) *gomock } // AddLoadBalancer mocks base method -func (m *MockInterface) AddLoadBalancer(arg0 []string) error { +func (m *MockInterface) AddLoadBalancer(arg0 net.IP) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AddLoadBalancer", arg0) ret0, _ := ret[0].(error) @@ -149,7 +149,7 @@ func (mr *MockInterfaceMockRecorder) DeleteClusterIPRoute(arg0 interface{}) *gom } // DeleteLoadBalancer mocks base method -func (m *MockInterface) DeleteLoadBalancer(arg0 []string) error { +func (m *MockInterface) DeleteLoadBalancer(arg0 net.IP) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteLoadBalancer", arg0) ret0, _ := ret[0].(error)