Skip to content
14 changes: 9 additions & 5 deletions go-controller/pkg/ovn/default_network_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func (oc *DefaultNetworkController) run(_ context.Context) error {

if config.OVNKubernetesFeature.EnableEgressIP {
// This is probably the best starting order for all egress IP handlers.
// WatchEgressIPNamespaces and WatchEgressIPPods only use the informer
// WatchEgressIPPods and WatchEgressIPNamespaces only use the informer
// cache to retrieve the egress IPs when determining if namespace/pods
// match. It is thus better if we initialize them first and allow
// WatchEgressNodes / WatchEgressIP to initialize after. Those handlers
Expand All @@ -465,10 +465,14 @@ func (oc *DefaultNetworkController) run(_ context.Context) error {
// risk performing a bunch of modifications on the EgressIP objects when
// we restart and then have these handlers act on stale data when they
// sync.
if err := WithSyncDurationMetric("egress ip namespace", oc.WatchEgressIPNamespaces); err != nil {
// Initialize WatchEgressIPPods before WatchEgressIPNamespaces to ensure
// that no pod events are missed by the EgressIPController. It's acceptable
// to miss a namespace event, as it will be handled indirectly through
// the pod delete event within that namespace.
if err := WithSyncDurationMetric("egress ip pod", oc.WatchEgressIPPods); err != nil {
return err
}
if err := WithSyncDurationMetric("egress ip pod", oc.WatchEgressIPPods); err != nil {
if err := WithSyncDurationMetric("egress ip namespace", oc.WatchEgressIPNamespaces); err != nil {
return err
}
if err := WithSyncDurationMetric("egress node", oc.WatchEgressNodes); err != nil {
Expand Down Expand Up @@ -1183,13 +1187,13 @@ func (h *defaultNetworkControllerEventHandler) SyncFunc(objs []interface{}) erro
case factory.EgressFirewallType:
syncFunc = h.oc.syncEgressFirewall

case factory.EgressIPNamespaceType:
case factory.EgressIPPodType:
syncFunc = h.oc.eIPC.syncEgressIPs

case factory.EgressNodeType:
syncFunc = h.oc.eIPC.initClusterEgressPolicies

case factory.EgressIPPodType,
case factory.EgressIPNamespaceType,
factory.EgressIPType:
syncFunc = nil

Expand Down
183 changes: 132 additions & 51 deletions go-controller/pkg/ovn/egressip.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,25 +788,7 @@ func (e *EgressIPController) addPodEgressIPAssignments(ni util.NetInfo, name str
if len(statusAssignments) == 0 {
return nil
}
// We need to proceed with add only under two conditions
// 1) egressNode present in at least one status is local to this zone
// (NOTE: The relation between egressIPName and nodeName is 1:1 i.e in the same object the given node will be present only in one status)
// 2) the pod being added is local to this zone
proceed := false
for _, status := range statusAssignments {
e.nodeZoneState.LockKey(status.Node)
isLocalZoneEgressNode, loadedEgressNode := e.nodeZoneState.Load(status.Node)
if loadedEgressNode && isLocalZoneEgressNode {
proceed = true
e.nodeZoneState.UnlockKey(status.Node)
break
}
e.nodeZoneState.UnlockKey(status.Node)
}
if !proceed && !e.isPodScheduledinLocalZone(pod) {
return nil // nothing to do if none of the status nodes are local to this master and pod is also remote
}
var remainingAssignments []egressipv1.EgressIPStatusItem
var remainingAssignments, staleAssignments []egressipv1.EgressIPStatusItem
nadName := ni.GetNetworkName()
if ni.IsUserDefinedNetwork() {
nadNames := ni.GetNADs()
Expand Down Expand Up @@ -836,15 +818,21 @@ func (e *EgressIPController) addPodEgressIPAssignments(ni util.NetInfo, name str
podIPs: podIPs,
network: ni,
}
e.podAssignment.Store(podKey, podState)
} else if podState.egressIPName == name || podState.egressIPName == "" {
// We do the setup only if this egressIP object is the one serving this pod OR
// podState.egressIPName can be empty if no re-routes were found in
// syncPodAssignmentCache for the existing pod, we will treat this case as a new add
for _, status := range statusAssignments {
if exists := podState.egressStatuses.contains(status); !exists {
// Add the status if it's not already in the cache, or if it exists but is in pending state
// (meaning it was populated during EIP sync and needs to be processed for the pod).
if value, exists := podState.egressStatuses.statusMap[status]; !exists || value == egressStatusStatePending {
remainingAssignments = append(remainingAssignments, status)
}
// Detect stale EIP status entries (same EgressIP reassigned to a different node)
// and queue the outdated entry for cleanup.
if staleStatus := podState.egressStatuses.hasStaleEIPStatus(status); staleStatus != nil {
staleAssignments = append(staleAssignments, *staleStatus)
}
}
podState.podIPs = podIPs
podState.egressIPName = name
Expand All @@ -866,6 +854,36 @@ func (e *EgressIPController) addPodEgressIPAssignments(ni util.NetInfo, name str
podState.standbyEgressIPNames.Insert(name)
return nil
}
for _, staleStatus := range staleAssignments {
klog.V(2).Infof("Deleting stale pod egress IP status: %v for EgressIP: %s and pod: %s/%s/%v", staleStatus, name, pod.Namespace, pod.Name, podIPNets)
err = e.deletePodEgressIPAssignments(ni, name, []egressipv1.EgressIPStatusItem{staleStatus}, pod)
if err != nil {
klog.Warningf("Failed to delete stale EgressIP status %s/%v for pod %s: %v", name, staleStatus, podKey, err)
}
delete(podState.egressStatuses.statusMap, staleStatus)
}
// We store podState into podAssignment cache at this place for two reasons.
// 1. When podAssignmentState is newly created.
// 2. deletePodEgressIPAssignments might clean the podAssignment cache, make sure we add it back.
e.podAssignment.Store(podKey, podState)
// We need to proceed with add only under two conditions
// 1) egressNode present in at least one status is local to this zone
// (NOTE: The relation between egressIPName and nodeName is 1:1 i.e in the same object the given node will be present only in one status)
// 2) the pod being added is local to this zone
proceed := false
for _, status := range statusAssignments {
e.nodeZoneState.LockKey(status.Node)
isLocalZoneEgressNode, loadedEgressNode := e.nodeZoneState.Load(status.Node)
if loadedEgressNode && isLocalZoneEgressNode {
proceed = true
e.nodeZoneState.UnlockKey(status.Node)
break
}
e.nodeZoneState.UnlockKey(status.Node)
}
if !proceed && !e.isPodScheduledinLocalZone(pod) {
return nil // nothing to do if none of the status nodes are local to this master and pod is also remote
}
for _, status := range remainingAssignments {
klog.V(2).Infof("Adding pod egress IP status: %v for EgressIP: %s and pod: %s/%s/%v", status, name, pod.Namespace, pod.Name, podIPNets)
nodesToLock := []string{status.Node, pod.Spec.NodeName}
Expand Down Expand Up @@ -1155,6 +1173,8 @@ type egressIPCache struct {
egressLocalNodesCache sets.Set[string]
// egressIP IP -> assigned node name
egressIPIPToNodeCache map[string]string
// egressIP name -> egress IP -> assigned node name
egressIPToAssignedNodes map[string]map[string]string
// node name -> network name -> redirect IPs
egressNodeRedirectsCache nodeNetworkRedirects
// network name -> OVN cluster router name
Expand Down Expand Up @@ -1594,6 +1614,14 @@ func (e *EgressIPController) syncPodAssignmentCache(egressIPCache egressIPCache)
}
}

// populate podState.egressStatuses with assigned node for active egressIP IPs.
if podState.egressIPName == egressIPName {
for egressIPIP, nodeName := range egressIPCache.egressIPToAssignedNodes[egressIPName] {
podState.egressStatuses.statusMap[egressipv1.EgressIPStatusItem{
EgressIP: egressIPIP, Node: nodeName}] = egressStatusStatePending
}
}

e.podAssignment.Store(podKey, podState)
return nil
}); err != nil {
Expand All @@ -1611,6 +1639,21 @@ func (e *EgressIPController) syncPodAssignmentCache(egressIPCache egressIPCache)
// It also removes stale nexthops from router policies used by EgressIPs.
// Upon failure, it may be invoked multiple times in order to avoid a pod restart.
func (e *EgressIPController) syncStaleEgressReroutePolicy(cache egressIPCache) error {
// limit Nodes only to egress node(s) for the EgressIP name
limitToValidEgressNodes := func(eipName string, nodeRedirectCache map[string]redirectIPs) map[string]redirectIPs {
filteredEgressNodesRedirectsCache := make(map[string]redirectIPs, 0)
egressNodeNames, ok := cache.egressIPNameToAssignedNodes[eipName]
if !ok {
return filteredEgressNodesRedirectsCache
}
for _, egressNode := range egressNodeNames {
if nodeRedirect, ok := nodeRedirectCache[egressNode]; ok {
filteredEgressNodesRedirectsCache[egressNode] = nodeRedirect
}
}
return filteredEgressNodesRedirectsCache
}

for eipName, networkCache := range cache.egressIPNameToPods {
for networkName, data := range networkCache {
logicalRouterPolicyStaleNexthops := []*nbdb.LogicalRouterPolicy{}
Expand All @@ -1619,11 +1662,6 @@ func (e *EgressIPController) syncStaleEgressReroutePolicy(cache egressIPCache) e
if item.Priority != types.EgressIPReroutePriority || item.ExternalIDs[libovsdbops.NetworkKey.String()] != networkName {
return false
}
networkNodeRedirectCache, ok := cache.egressNodeRedirectsCache.cache[networkName]
if !ok || len(networkNodeRedirectCache) == 0 {
klog.Infof("syncStaleEgressReroutePolicy found invalid logical router policy (UUID: %s) because no assigned Nodes for EgressIP %s", item.UUID, eipName)
return true
}
extractedEgressIPName, _ := getEIPLRPObjK8MetaData(item.ExternalIDs)
if extractedEgressIPName == "" {
klog.Errorf("syncStaleEgressReroutePolicy found logical router policy (UUID: %s) with invalid meta data associated with network %s", item.UUID, networkName)
Expand All @@ -1634,6 +1672,11 @@ func (e *EgressIPController) syncStaleEgressReroutePolicy(cache egressIPCache) e
_, ok := cache.egressIPNameToPods[extractedEgressIPName]
return !ok
}
networkNodeRedirectCache := limitToValidEgressNodes(eipName, cache.egressNodeRedirectsCache.cache[networkName])
if len(networkNodeRedirectCache) == 0 {
klog.Infof("syncStaleEgressReroutePolicy deleting invalid logical router policy %q because there are no existing nodes assigned to its EgressIP %s", item.UUID, eipName)
return true
}
splitMatch := strings.Split(item.Match, " ")
podIPStr := splitMatch[len(splitMatch)-1]
podIP := net.ParseIP(podIPStr)
Expand Down Expand Up @@ -1689,13 +1732,13 @@ func (e *EgressIPController) syncStaleEgressReroutePolicy(cache egressIPCache) e
// Update Logical Router Policies that have stale nexthops. Notice that we must do this separately
// because logicalRouterPolicyStaleNexthops must be populated first
for _, staleNextHopLogicalRouterPolicy := range logicalRouterPolicyStaleNexthops {
if staleNextHopLogicalRouterPolicy.Nexthop == nil {
continue
}
klog.Infof("syncStaleEgressReroutePolicy will remove stale nexthops for LRP %q for network %s: %s",
staleNextHopLogicalRouterPolicy.UUID, networkName, *staleNextHopLogicalRouterPolicy.Nexthop)
klog.Infof("syncStaleEgressReroutePolicy will remove stale nexthops for LRP %q for network %s: %v",
staleNextHopLogicalRouterPolicy.UUID, networkName, staleNextHopLogicalRouterPolicy.Nexthops)
}
// nothing to do if there's no stale next hops
if len(logicalRouterPolicyStaleNexthops) == 0 {
continue
}

err = libovsdbops.DeleteNextHopsFromLogicalRouterPolicies(e.nbClient, cache.networkToRouter[networkName], logicalRouterPolicyStaleNexthops...)
if err != nil {
return fmt.Errorf("unable to remove stale next hops from logical router policies for network %s: %v", networkName, err)
Expand Down Expand Up @@ -1874,28 +1917,37 @@ func (e *EgressIPController) generateCacheForEgressIP() (egressIPCache, error) {
r := redirectIPs{}
mgmtPort := &nbdb.LogicalSwitchPort{Name: ni.GetNetworkScopedK8sMgmtIntfName(node.Name)}
mgmtPort, err := libovsdbops.GetLogicalSwitchPort(e.nbClient, mgmtPort)
if err != nil {
// if switch port isnt created, we can assume theres nothing to sync
if errors.Is(err, libovsdbclient.ErrNotFound) {
continue
}
// return if error is anything other than not found to allow retry
if err != nil && !errors.Is(err, libovsdbclient.ErrNotFound) {
return cache, fmt.Errorf("failed to find management port for node %s: %v", node.Name, err)
}
mgmtPortAddresses := mgmtPort.GetAddresses()
if len(mgmtPortAddresses) == 0 {
return cache, fmt.Errorf("management switch port %s for node %s does not contain any addresses", ni.GetNetworkScopedK8sMgmtIntfName(node.Name), node.Name)
}
// assuming only one IP per IP family
for _, mgmtPortAddress := range mgmtPortAddresses {
mgmtPortAddressesStr := strings.Fields(mgmtPortAddress)
mgmtPortIP := net.ParseIP(mgmtPortAddressesStr[1])
if utilnet.IsIPv6(mgmtPortIP) {
if ip := mgmtPortIP.To16(); ip != nil {
r.v6MgtPort = ip.String()
// if management port is available, gather the data. If it's not available, OVN constructs that depend on a deleted
// management port IP will fail and be cleaned up in sync LRPs func.
if mgmtPort != nil {
mgmtPortAddresses := mgmtPort.GetAddresses()
if len(mgmtPortAddresses) == 0 {
return cache, fmt.Errorf("management switch port %s for node %s does not contain any addresses", ni.GetNetworkScopedK8sMgmtIntfName(node.Name), node.Name)
}
// Extract at most one IP per family; entries are "MAC IP [IP ...]"
for _, macPlusIPs := range mgmtPortAddresses {
parts := strings.Fields(macPlusIPs)
if len(parts) < 2 {
continue // no IPs
}
} else {
if ip := mgmtPortIP.To4(); ip != nil {
r.v4MgtPort = ip.String()
for _, ipStr := range parts[1:] {
ip := net.ParseIP(ipStr)
if ip == nil {
continue
}
if utilnet.IsIPv6(ip) {
if r.v6MgtPort == "" && ip.To16() != nil {
r.v6MgtPort = ip.String()
}
} else {
if r.v4MgtPort == "" && ip.To4() != nil {
r.v4MgtPort = ip.String()
}
}
}
}
}
Expand Down Expand Up @@ -1951,6 +2003,9 @@ func (e *EgressIPController) generateCacheForEgressIP() (egressIPCache, error) {
// egressIP IP -> node name. Assigned node for EIP.
egressIPIPNodeCache := make(map[string]string, 0)
cache.egressIPIPToNodeCache = egressIPIPNodeCache
// egressIP name -> egressIP IP -> node name.
egressIPToAssignedNodes := make(map[string]map[string]string, 0)
cache.egressIPToAssignedNodes = egressIPToAssignedNodes
cache.markCache = make(map[string]string)
egressIPs, err := e.watchFactory.GetEgressIPs()
if err != nil {
Expand All @@ -1964,13 +2019,15 @@ func (e *EgressIPController) generateCacheForEgressIP() (egressIPCache, error) {
cache.markCache[egressIP.Name] = mark.String()
egressIPsCache[egressIP.Name] = make(map[string]selectedPods, 0)
egressIPNameNodesCache[egressIP.Name] = make([]string, 0, len(egressIP.Status.Items))
egressIPToAssignedNodes[egressIP.Name] = make(map[string]string, 0)
for _, status := range egressIP.Status.Items {
eipIP := net.ParseIP(status.EgressIP)
if eipIP == nil {
klog.Errorf("Failed to parse EgressIP %s IP %q from status", egressIP.Name, status.EgressIP)
continue
}
egressIPIPNodeCache[eipIP.String()] = status.Node
egressIPToAssignedNodes[egressIP.Name][eipIP.String()] = status.Node
if localZoneNodes.Has(status.Node) {
egressLocalNodesCache.Insert(status.Node)
}
Expand Down Expand Up @@ -2227,9 +2284,18 @@ func InitClusterEgressPolicies(nbClient libovsdbclient.Client, addressSetFactory
return nil
}

// egressStatusStatePending marks entries populated during EIP sync and
// indicates they must be reconciled again for the pod.
const egressStatusStatePending = "pending"

// statusMap maps an EgressIP status item ({EgressIP, Node} pair) to a
// per-entry state string (see egressStatuses.statusMap for the values).
type statusMap map[egressipv1.EgressIPStatusItem]string

type egressStatuses struct {
	// statusMap tracks per EIP status assignment for a pod.
	// Key: egressipv1.EgressIPStatusItem {EgressIP, Node}
	// Values:
	//    "" -> applied/reconciled
	//    egressStatusStatePending -> populated during EIP sync, pending reconcile.
	statusMap
}

Expand All @@ -2241,6 +2307,21 @@ func (e egressStatuses) contains(potentialStatus egressipv1.EgressIPStatusItem)
return false
}

// hasStaleEIPStatus reports whether the cache already holds an entry for the
// same egress IP as potentialStatus but assigned to a different node. Such an
// entry is stale: the EgressIP was reassigned to another node, and keeping the
// old assignment around would leave stale SNAT/LRP configuration behind.
// It returns a copy of the stale entry, or nil when none is found.
func (e egressStatuses) hasStaleEIPStatus(potentialStatus egressipv1.EgressIPStatusItem) *egressipv1.EgressIPStatusItem {
	for cached := range e.statusMap {
		if cached.EgressIP != potentialStatus.EgressIP || cached.Node == potentialStatus.Node {
			continue
		}
		stale := egressipv1.EgressIPStatusItem{EgressIP: cached.EgressIP, Node: cached.Node}
		return &stale
	}
	return nil
}

// delete removes the given status entry from the cache; deleting a key that
// is not present is a no-op.
func (e egressStatuses) delete(status egressipv1.EgressIPStatusItem) {
	delete(e.statusMap, status)
}
Expand Down
Loading