Skip to content

Commit

Permalink
Fix route deletion for Service ClusterIP and LoadBalancerIP
Browse files Browse the repository at this point in the history
When proxyAll is enabled, AntreaProxy needs to install routes in the
host network namespace to redirect traffic to OVS for load balancing.
For a Service with multiple ports, multiple ServicePorts are generated
and processed. The previous code installed the route for a ClusterIP or
a LoadBalancerIP multiple times when such a Service was created, and
uninstalled the route multiple times when it was deleted, leading to a
few problems.

This patch adds a serviceIPRouteReferences which tracks the references
of Service IPs' routes. The key is the Service IP and the value is the
the set of ServiceInfo strings. With the references, we install a route
exactly once as long as it's used by any ServicePorts and uninstall it
exactly once when it's no longer used by any ServicePorts.

This patch also fixes an issue that the route for ClusterIP was not
removed on Windows Nodes after the Service was removed.

Fixes #4361

Signed-off-by: Quan Tian <[email protected]>
  • Loading branch information
tnqn authored and luolanzone committed Mar 24, 2023
1 parent 66be428 commit eea9bb8
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 122 deletions.
112 changes: 82 additions & 30 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
k8sapitypes "k8s.io/apimachinery/pkg/types"
apimachinerytypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -105,6 +105,13 @@ type proxier struct {
// oversizeServiceSet records the Services that have more than 800 Endpoints.
oversizeServiceSet sets.String

// serviceIPRouteReferences tracks the references of Service IP routes. The key is the Service IP and the value is
// the set of ServiceInfo strings. Because a Service could have multiple ports and each port will generate a
// ServicePort (which is the unit of the processing), a Service IP route may be required by several ServicePorts.
// With the references, we install a route exactly once as long as it's used by any ServicePorts and uninstall it
// exactly once when it's no longer used by any ServicePorts.
// It applies to ClusterIP and LoadBalancerIP.
serviceIPRouteReferences map[string]sets.String
// syncedOnce returns true if the proxier has synced rules at least once.
syncedOnce bool
syncedOnceMutex sync.RWMutex
Expand Down Expand Up @@ -145,28 +152,35 @@ func (p *proxier) removeStaleServices() {
continue
}
svcInfo := svcPort.(*types.ServiceInfo)
klog.V(2).Infof("Removing stale Service: %s %s", svcPortName.Name, svcInfo.String())
svcInfoStr := svcInfo.String()
klog.V(2).InfoS("Removing stale Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr)
if p.oversizeServiceSet.Has(svcPortName.String()) {
p.oversizeServiceSet.Delete(svcPortName.String())
}
if err := p.ofClient.UninstallServiceFlows(svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil {
klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName)
klog.ErrorS(err, "Error when uninstalling ClusterIP flows for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr)
continue
}

// Remove NodePort and ClusterIP flows and configurations.
if p.proxyAll {
// Remove NodePort flows and configurations.
if svcInfo.NodePort() > 0 {
if err := p.uninstallNodePortService(uint16(svcInfo.NodePort()), svcInfo.OFProtocol); err != nil {
klog.ErrorS(err, "Failed to remove flows and configurations of Service", "Service", svcPortName)
klog.ErrorS(err, "Error when uninstalling NodePort flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr)
continue
}
}
if svcInfo.ClusterIP() != nil {
if err := p.deleteRouteForServiceIP(svcInfoStr, svcInfo.ClusterIP(), p.routeClient.DeleteClusterIPRoute); err != nil {
klog.ErrorS(err, "Failed to remove ClusterIP Service routes", "Service", svcPortName)
continue
}
}
}
// Remove LoadBalancer flows and configurations.
if p.proxyLoadBalancerIPs && len(svcInfo.LoadBalancerIPStrings()) > 0 {
if err := p.uninstallLoadBalancerService(svcInfo.LoadBalancerIPStrings(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil {
klog.ErrorS(err, "Failed to remove flows and configurations of Service", "Service", svcPortName)
if err := p.uninstallLoadBalancerService(svcInfoStr, svcInfo.LoadBalancerIPStrings(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil {
klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr)
continue
}
}
Expand Down Expand Up @@ -314,43 +328,80 @@ func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Prot
return nil
}

func (p *proxier) installLoadBalancerService(groupID binding.GroupIDType, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error {
func (p *proxier) installLoadBalancerService(svcInfoStr string, groupID binding.GroupIDType, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error {
for _, ingress := range loadBalancerIPStrings {
if ingress != "" {
if err := p.ofClient.InstallServiceFlows(groupID, net.ParseIP(ingress), svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeLoadBalancer); err != nil {
return fmt.Errorf("failed to install Service LoadBalancer load balancing flows: %w", err)
ip := net.ParseIP(ingress)
if err := p.ofClient.InstallServiceFlows(groupID, ip, svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeLoadBalancer); err != nil {
return fmt.Errorf("failed to install LoadBalancer load balancing flows: %w", err)
}
if p.proxyAll {
if err := p.addRouteForServiceIP(svcInfoStr, ip, p.routeClient.AddLoadBalancer); err != nil {
return fmt.Errorf("failed to install LoadBalancer traffic redirecting routes: %w", err)
}
}
}
}
if p.proxyAll {
if err := p.routeClient.AddLoadBalancer(loadBalancerIPStrings); err != nil {
return fmt.Errorf("failed to install Service LoadBalancer traffic redirecting flows: %w", err)
return nil
}

func (p *proxier) addRouteForServiceIP(svcInfoStr string, ip net.IP, addRouteFn func(net.IP) error) error {
ipStr := ip.String()
references, exists := p.serviceIPRouteReferences[ipStr]
// If the IP was not referenced by any Service port, install a route for it.
// Otherwise, just reference it.
if !exists {
if err := addRouteFn(ip); err != nil {
return err
}
references = sets.NewString(svcInfoStr)
p.serviceIPRouteReferences[ipStr] = references
} else {
references.Insert(svcInfoStr)
}

return nil
}

func (p *proxier) uninstallLoadBalancerService(loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol) error {
func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol) error {
for _, ingress := range loadBalancerIPStrings {
if ingress != "" {
if err := p.ofClient.UninstallServiceFlows(net.ParseIP(ingress), svcPort, protocol); err != nil {
return fmt.Errorf("failed to remove Service LoadBalancer load balancing flows: %w", err)
ip := net.ParseIP(ingress)
if err := p.ofClient.UninstallServiceFlows(ip, svcPort, protocol); err != nil {
return fmt.Errorf("failed to remove LoadBalancer load balancing flows: %w", err)
}
if p.proxyAll {
if err := p.deleteRouteForServiceIP(svcInfoStr, ip, p.routeClient.DeleteLoadBalancer); err != nil {
return fmt.Errorf("failed to remove LoadBalancer traffic redirecting routes: %w", err)
}
}
}
}
if p.proxyAll {
if err := p.routeClient.DeleteLoadBalancer(loadBalancerIPStrings); err != nil {
return fmt.Errorf("failed to remove Service LoadBalancer traffic redirecting flows: %w", err)
return nil
}

func (p *proxier) deleteRouteForServiceIP(svcInfoStr string, ip net.IP, deleteRouteFn func(net.IP) error) error {
ipStr := ip.String()
references, exists := p.serviceIPRouteReferences[ipStr]
// If the IP was not referenced by this Service port, skip it.
if exists && references.Has(svcInfoStr) {
// Delete the IP only if this Service port is the last one referencing it.
// Otherwise, just dereference it.
if references.Len() == 1 {
if err := deleteRouteFn(ip); err != nil {
return err
}
delete(p.serviceIPRouteReferences, ipStr)
} else {
references.Delete(svcInfoStr)
}
}

return nil
}

func (p *proxier) installServices() {
for svcPortName, svcPort := range p.serviceMap {
svcInfo := svcPort.(*types.ServiceInfo)
svcInfoStr := svcInfo.String()
endpointsInstalled, ok := p.endpointsInstalledMap[svcPortName]
if !ok {
endpointsInstalled = map[string]k8sproxy.Endpoint{}
Expand Down Expand Up @@ -591,9 +642,9 @@ func (p *proxier) installServices() {
}
}
// If previous Service which has ClusterIP should be removed, remove ClusterIP routes.
if svcInfo.ClusterIP() != nil {
if err := p.routeClient.DeleteClusterIPRoute(pSvcInfo.ClusterIP()); err != nil {
klog.ErrorS(err, "Failed to remove ClusterIP Service routes", "Service", svcPortName)
if pSvcInfo.ClusterIP() != nil {
if err := p.deleteRouteForServiceIP(pSvcInfo.String(), pSvcInfo.ClusterIP(), p.routeClient.DeleteClusterIPRoute); err != nil {
klog.ErrorS(err, "Error when uninstalling ClusterIP route for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr)
continue
}
}
Expand All @@ -613,8 +664,8 @@ func (p *proxier) installServices() {
// is created, the routing target IP block will be recalculated for expansion to be able to route the new
// created ClusterIP. Deleting a ClusterIP will not shrink the target routing IP block. The Service CIDR
// can be finally calculated after creating enough ClusterIPs.
if err := p.routeClient.AddClusterIPRoute(svcInfo.ClusterIP()); err != nil {
klog.ErrorS(err, "Failed to install ClusterIP route of Service", "Service", svcPortName)
if err := p.addRouteForServiceIP(svcInfo.String(), svcInfo.ClusterIP(), p.routeClient.AddClusterIPRoute); err != nil {
klog.ErrorS(err, "Error when installing ClusterIP route for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr)
continue
}

Expand All @@ -641,14 +692,14 @@ func (p *proxier) installServices() {
}
// Remove LoadBalancer flows and configurations.
if len(toDelete) > 0 {
if err := p.uninstallLoadBalancerService(toDelete, uint16(pSvcInfo.Port()), pSvcInfo.OFProtocol); err != nil {
klog.ErrorS(err, "Failed to remove flows and configurations of Service", "Service", svcPortName)
if err := p.uninstallLoadBalancerService(pSvcInfo.String(), toDelete, uint16(pSvcInfo.Port()), pSvcInfo.OFProtocol); err != nil {
klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr)
continue
}
}
// Install LoadBalancer flows and configurations.
if len(toAdd) > 0 {
if err := p.installLoadBalancerService(nGroupID, toAdd, uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.NodeLocalExternal()); err != nil {
if err := p.installLoadBalancerService(svcInfo.String(), nGroupID, toAdd, uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.NodeLocalExternal()); err != nil {
klog.ErrorS(err, "Failed to install LoadBalancer flows and configurations of Service", "Service", svcPortName)
continue
}
Expand Down Expand Up @@ -838,7 +889,7 @@ func (p *proxier) GetProxyProvider() k8sproxy.Provider {
}

func (p *proxier) GetServiceFlowKeys(serviceName, namespace string) ([]string, []binding.GroupIDType, bool) {
namespacedName := k8sapitypes.NamespacedName{Namespace: namespace, Name: serviceName}
namespacedName := apimachinerytypes.NamespacedName{Namespace: namespace, Name: serviceName}
p.serviceEndpointsMapsMutex.Lock()
defer p.serviceEndpointsMapsMutex.Unlock()

Expand Down Expand Up @@ -914,6 +965,7 @@ func NewProxier(
endpointsInstalledMap: types.EndpointsMap{},
endpointsMap: types.EndpointsMap{},
endpointReferenceCounter: map[string]int{},
serviceIPRouteReferences: map[string]sets.String{},
serviceStringMap: map[string]k8sproxy.ServicePortName{},
oversizeServiceSet: sets.NewString(),
groupCounter: groupCounter,
Expand Down
5 changes: 4 additions & 1 deletion pkg/agent/proxy/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
apimachinerytypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/component-base/metrics/testutil"

Expand Down Expand Up @@ -145,6 +146,7 @@ func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, nodeP
serviceInstalledMap: k8sproxy.ServiceMap{},
endpointsInstalledMap: types.EndpointsMap{},
endpointReferenceCounter: map[string]int{},
serviceIPRouteReferences: map[string]sets.String{},
endpointsMap: types.EndpointsMap{},
groupCounter: types.NewGroupCounter(groupIDAllocator, make(chan string, 100)),
ofClient: ofClient,
Expand Down Expand Up @@ -351,7 +353,7 @@ func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep
}
mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1)
if proxyLoadBalancerIPs {
mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1)
mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1)
}
mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1)

Expand Down Expand Up @@ -749,6 +751,7 @@ func testClusterIPRemoval(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool)
mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1)
mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1)
mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1)
fp.syncProxyRules()

fp.serviceChanges.OnServiceUpdate(service, nil)
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/route/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ type Interface interface {
// ClusterIP Service traffic from host network.
DeleteClusterIPRoute(svcIP net.IP) error

// AddLoadBalancer adds configurations when a LoadBalancer Service is created.
AddLoadBalancer(externalIPs []string) error
// AddLoadBalancer adds configurations when a LoadBalancer IP is added.
AddLoadBalancer(externalIP net.IP) error

// DeleteLoadBalancer deletes related configurations when a LoadBalancer Service is deleted.
DeleteLoadBalancer(externalIPs []string) error
// DeleteLoadBalancer deletes related configurations when a LoadBalancer IP is deleted.
DeleteLoadBalancer(externalIP net.IP) error

// Run starts the sync loop.
Run(stopCh <-chan struct{})
Expand Down
32 changes: 4 additions & 28 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -1357,11 +1357,10 @@ func (c *Client) addVirtualNodePortDNATIPRoute(isIPv6 bool) error {
return nil
}

// addLoadBalancerIngressIPRoute is used to add routing entry which is used to route LoadBalancer ingress IP to Antrea
// AddLoadBalancer is used to add routing entry which is used to route LoadBalancer ingress IP to Antrea
// gateway on host.
func (c *Client) addLoadBalancerIngressIPRoute(svcIPStr string) error {
func (c *Client) AddLoadBalancer(svcIP net.IP) error {
linkIndex := c.nodeConfig.GatewayConfig.LinkIndex
svcIP := net.ParseIP(svcIPStr)
isIPv6 := utilnet.IsIPv6(svcIP)
var gw net.IP
var mask int
Expand All @@ -1383,11 +1382,10 @@ func (c *Client) addLoadBalancerIngressIPRoute(svcIPStr string) error {
return nil
}

// deleteLoadBalancerIngressIPRoute is used to delete routing entry which is used to route LoadBalancer ingress IP to Antrea
// DeleteLoadBalancer is used to delete routing entry which is used to route LoadBalancer ingress IP to Antrea
// gateway on host.
func (c *Client) deleteLoadBalancerIngressIPRoute(svcIPStr string) error {
func (c *Client) DeleteLoadBalancer(svcIP net.IP) error {
linkIndex := c.nodeConfig.GatewayConfig.LinkIndex
svcIP := net.ParseIP(svcIPStr)
isIPv6 := utilnet.IsIPv6(svcIP)
var gw net.IP
var mask int
Expand All @@ -1413,28 +1411,6 @@ func (c *Client) deleteLoadBalancerIngressIPRoute(svcIPStr string) error {
return nil
}

// AddLoadBalancer is used to add routing entries when a LoadBalancer Service is added.
func (c *Client) AddLoadBalancer(externalIPs []string) error {
for _, svcIPStr := range externalIPs {
if err := c.addLoadBalancerIngressIPRoute(svcIPStr); err != nil {
return err
}
}

return nil
}

// DeleteLoadBalancer is used to delete routing entries when a LoadBalancer Service is deleted.
func (c *Client) DeleteLoadBalancer(externalIPs []string) error {
for _, svcIPStr := range externalIPs {
if err := c.deleteLoadBalancerIngressIPRoute(svcIPStr); err != nil {
return err
}
}

return nil
}

// AddLocalAntreaFlexibleIPAMPodRule is used to add IP to target ip set when an AntreaFlexibleIPAM Pod is added. An entry is added
// for every Pod IP.
func (c *Client) AddLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error {
Expand Down
Loading

0 comments on commit eea9bb8

Please sign in to comment.