-
Notifications
You must be signed in to change notification settings - Fork 364
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix route deletion for Service ClusterIP and LoadBalancerIP #4711
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ import ( | |
discovery "k8s.io/api/discovery/v1" | ||
"k8s.io/apimachinery/pkg/runtime" | ||
apimachinerytypes "k8s.io/apimachinery/pkg/types" | ||
"k8s.io/apimachinery/pkg/util/sets" | ||
"k8s.io/client-go/informers" | ||
"k8s.io/client-go/tools/record" | ||
"k8s.io/klog/v2" | ||
|
@@ -104,6 +105,13 @@ type proxier struct { | |
serviceHealthServer healthcheck.ServiceHealthServer | ||
numLocalEndpoints map[apimachinerytypes.NamespacedName]int | ||
|
||
// serviceIPRouteReferences tracks the references of Service IP routes. The key is the Service IP and the value is | ||
// the set of ServiceInfo strings. Because a Service could have multiple ports and each port will generate a | ||
// ServicePort (which is the unit of the processing), a Service IP route may be required by several ServicePorts. | ||
// With the references, we install a route exactly once as long as it's used by any ServicePorts and uninstall it | ||
// exactly once when it's no longer used by any ServicePorts. | ||
// It applies to ClusterIP and LoadBalancerIP. | ||
serviceIPRouteReferences map[string]sets.String | ||
// syncedOnce returns true if the proxier has synced rules at least once. | ||
syncedOnce bool | ||
syncedOnceMutex sync.RWMutex | ||
|
@@ -157,16 +165,24 @@ func (p *proxier) removeStaleServices() { | |
} | ||
delete(p.endpointsInstalledMap, svcPortName) | ||
} | ||
// Remove NodePort flows and configurations. | ||
if p.proxyAll && svcInfo.NodePort() > 0 { | ||
if err := p.uninstallNodePortService(uint16(svcInfo.NodePort()), svcInfo.OFProtocol); err != nil { | ||
klog.ErrorS(err, "Error when uninstalling NodePort flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) | ||
continue | ||
// Remove NodePort and ClusterIP flows and configurations. | ||
if p.proxyAll { | ||
if svcInfo.NodePort() > 0 { | ||
if err := p.uninstallNodePortService(uint16(svcInfo.NodePort()), svcInfo.OFProtocol); err != nil { | ||
klog.ErrorS(err, "Error when uninstalling NodePort flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) | ||
continue | ||
} | ||
} | ||
if svcInfo.ClusterIP() != nil { | ||
if err := p.deleteRouteForServiceIP(svcInfoStr, svcInfo.ClusterIP(), p.routeClient.DeleteClusterIPRoute); err != nil { | ||
klog.ErrorS(err, "Failed to remove ClusterIP Service routes", "Service", svcPortName) | ||
continue | ||
} | ||
} | ||
} | ||
// Remove LoadBalancer flows and configurations. | ||
if p.proxyLoadBalancerIPs && len(svcInfo.LoadBalancerIPStrings()) > 0 { | ||
if err := p.uninstallLoadBalancerService(svcInfo.LoadBalancerIPStrings(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil { | ||
if err := p.uninstallLoadBalancerService(svcInfoStr, svcInfo.LoadBalancerIPStrings(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil { | ||
klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) | ||
continue | ||
} | ||
|
@@ -313,37 +329,73 @@ func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Prot | |
return nil | ||
} | ||
|
||
func (p *proxier) installLoadBalancerService(groupID binding.GroupIDType, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error { | ||
func (p *proxier) installLoadBalancerService(svcInfoStr string, groupID binding.GroupIDType, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error { | ||
for _, ingress := range loadBalancerIPStrings { | ||
if ingress != "" { | ||
if err := p.ofClient.InstallServiceFlows(groupID, net.ParseIP(ingress), svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeLoadBalancer, false); err != nil { | ||
ip := net.ParseIP(ingress) | ||
if err := p.ofClient.InstallServiceFlows(groupID, ip, svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeLoadBalancer, false); err != nil { | ||
return fmt.Errorf("failed to install LoadBalancer load balancing flows: %w", err) | ||
} | ||
if p.proxyAll { | ||
if err := p.addRouteForServiceIP(svcInfoStr, ip, p.routeClient.AddLoadBalancer); err != nil { | ||
return fmt.Errorf("failed to install LoadBalancer traffic redirecting routes: %w", err) | ||
} | ||
} | ||
} | ||
} | ||
if p.proxyAll { | ||
if err := p.routeClient.AddLoadBalancer(loadBalancerIPStrings); err != nil { | ||
return fmt.Errorf("failed to install LoadBalancer traffic redirecting routes: %w", err) | ||
return nil | ||
} | ||
|
||
func (p *proxier) addRouteForServiceIP(svcInfoStr string, ip net.IP, addRouteFn func(net.IP) error) error { | ||
ipStr := ip.String() | ||
references, exists := p.serviceIPRouteReferences[ipStr] | ||
// If the IP was not referenced by any Service port, install a route for it. | ||
// Otherwise, just reference it. | ||
if !exists { | ||
if err := addRouteFn(ip); err != nil { | ||
return err | ||
} | ||
references = sets.NewString(svcInfoStr) | ||
p.serviceIPRouteReferences[ipStr] = references | ||
} else { | ||
references.Insert(svcInfoStr) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (p *proxier) uninstallLoadBalancerService(loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol) error { | ||
func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol) error { | ||
for _, ingress := range loadBalancerIPStrings { | ||
if ingress != "" { | ||
if err := p.ofClient.UninstallServiceFlows(net.ParseIP(ingress), svcPort, protocol); err != nil { | ||
ip := net.ParseIP(ingress) | ||
if err := p.ofClient.UninstallServiceFlows(ip, svcPort, protocol); err != nil { | ||
return fmt.Errorf("failed to remove LoadBalancer load balancing flows: %w", err) | ||
} | ||
if p.proxyAll { | ||
if err := p.deleteRouteForServiceIP(svcInfoStr, ip, p.routeClient.DeleteLoadBalancer); err != nil { | ||
return fmt.Errorf("failed to remove LoadBalancer traffic redirecting routes: %w", err) | ||
} | ||
} | ||
} | ||
} | ||
if p.proxyAll { | ||
if err := p.routeClient.DeleteLoadBalancer(loadBalancerIPStrings); err != nil { | ||
return fmt.Errorf("failed to remove LoadBalancer traffic redirecting routes: %w", err) | ||
return nil | ||
} | ||
|
||
func (p *proxier) deleteRouteForServiceIP(svcInfoStr string, ip net.IP, deleteRouteFn func(net.IP) error) error { | ||
ipStr := ip.String() | ||
references, exists := p.serviceIPRouteReferences[ipStr] | ||
// If the IP was not referenced by this Service port, skip it. | ||
if exists && references.Has(svcInfoStr) { | ||
// Delete the IP only if this Service port is the last one referencing it. | ||
// Otherwise, just dereference it. | ||
if references.Len() == 1 { | ||
if err := deleteRouteFn(ip); err != nil { | ||
return err | ||
} | ||
delete(p.serviceIPRouteReferences, ipStr) | ||
} else { | ||
references.Delete(svcInfoStr) | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
|
@@ -548,8 +600,8 @@ func (p *proxier) installServices() { | |
} | ||
} | ||
// If previous Service which has ClusterIP should be removed, remove ClusterIP routes. | ||
if svcInfo.ClusterIP() != nil { | ||
if err := p.routeClient.DeleteClusterIPRoute(pSvcInfo.ClusterIP()); err != nil { | ||
if pSvcInfo.ClusterIP() != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch |
||
if err := p.deleteRouteForServiceIP(pSvcInfo.String(), pSvcInfo.ClusterIP(), p.routeClient.DeleteClusterIPRoute); err != nil { | ||
klog.ErrorS(err, "Error when uninstalling ClusterIP route for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) | ||
continue | ||
} | ||
|
@@ -580,7 +632,7 @@ func (p *proxier) installServices() { | |
// is created, the routing target IP block will be recalculated for expansion to be able to route the new | ||
// created ClusterIP. Deleting a ClusterIP will not shrink the target routing IP block. The Service CIDR | ||
// can be finally calculated after creating enough ClusterIPs. | ||
if err := p.routeClient.AddClusterIPRoute(svcInfo.ClusterIP()); err != nil { | ||
if err := p.addRouteForServiceIP(svcInfo.String(), svcInfo.ClusterIP(), p.routeClient.AddClusterIPRoute); err != nil { | ||
klog.ErrorS(err, "Error when installing ClusterIP route for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) | ||
continue | ||
} | ||
|
@@ -612,7 +664,7 @@ func (p *proxier) installServices() { | |
} | ||
// Remove LoadBalancer flows and configurations. | ||
if len(toDelete) > 0 { | ||
if err := p.uninstallLoadBalancerService(toDelete, uint16(pSvcInfo.Port()), pSvcInfo.OFProtocol); err != nil { | ||
if err := p.uninstallLoadBalancerService(pSvcInfo.String(), toDelete, uint16(pSvcInfo.Port()), pSvcInfo.OFProtocol); err != nil { | ||
klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) | ||
continue | ||
} | ||
|
@@ -624,7 +676,7 @@ func (p *proxier) installServices() { | |
klog.ErrorS(nil, "Group for Service externalTrafficPolicy was not installed", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr, "externalTrafficPolicy", externalPolicyLocal) | ||
continue | ||
} | ||
if err := p.installLoadBalancerService(groupID, toAdd, uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.ExternalPolicyLocal()); err != nil { | ||
if err := p.installLoadBalancerService(svcInfo.String(), groupID, toAdd, uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.ExternalPolicyLocal()); err != nil { | ||
klog.ErrorS(err, "Error when installing LoadBalancer flows and configurations for Service", "Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) | ||
continue | ||
} | ||
|
@@ -1002,6 +1054,7 @@ func NewProxier( | |
endpointsInstalledMap: types.EndpointsMap{}, | ||
endpointsMap: types.EndpointsMap{}, | ||
endpointReferenceCounter: map[string]int{}, | ||
serviceIPRouteReferences: map[string]sets.String{}, | ||
nodeLabels: map[string]string{}, | ||
serviceStringMap: map[string]k8sproxy.ServicePortName{}, | ||
groupCounter: groupCounter, | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we could merge #3889, I think we could remove the code change for ClusterIP in this PR in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previous releases had windows route issue, so better to merge this one before #3889 to make backport easier.