Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions go-controller/pkg/ovn/default_network_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ func (oc *DefaultNetworkController) Run(ctx context.Context) error {

if config.OVNKubernetesFeature.EnableEgressIP {
// This is probably the best starting order for all egress IP handlers.
// WatchEgressIPNamespaces and WatchEgressIPPods only use the informer
// WatchEgressIPPods and WatchEgressIPNamespaces only use the informer
// cache to retrieve the egress IPs when determining if namespace/pods
// match. It is thus better if we initialize them first and allow
// WatchEgressNodes / WatchEgressIP to initialize after. Those handlers
Expand All @@ -490,10 +490,14 @@ func (oc *DefaultNetworkController) Run(ctx context.Context) error {
// risk performing a bunch of modifications on the EgressIP objects when
// we restart and then have these handlers act on stale data when they
// sync.
if err := WithSyncDurationMetric("egress ip namespace", oc.WatchEgressIPNamespaces); err != nil {
// Initialize WatchEgressIPPods before WatchEgressIPNamespaces to ensure
// that no pod events are missed by the EgressIPController. It's acceptable
// to miss a namespace event, as it will be handled indirectly through
// the pod delete event within that namespace.
if err := WithSyncDurationMetric("egress ip pod", oc.WatchEgressIPPods); err != nil {
return err
}
if err := WithSyncDurationMetric("egress ip pod", oc.WatchEgressIPPods); err != nil {
if err := WithSyncDurationMetric("egress ip namespace", oc.WatchEgressIPNamespaces); err != nil {
return err
}
if err := WithSyncDurationMetric("egress node", oc.WatchEgressNodes); err != nil {
Expand Down Expand Up @@ -1126,13 +1130,13 @@ func (h *defaultNetworkControllerEventHandler) SyncFunc(objs []interface{}) erro
case factory.EgressFirewallType:
syncFunc = h.oc.syncEgressFirewall

case factory.EgressIPNamespaceType:
case factory.EgressIPPodType:
syncFunc = h.oc.syncEgressIPs

case factory.EgressNodeType:
syncFunc = h.oc.initClusterEgressPolicies

case factory.EgressIPPodType,
case factory.EgressIPNamespaceType,
factory.EgressIPType:
syncFunc = nil

Expand Down
107 changes: 80 additions & 27 deletions go-controller/pkg/ovn/egressip.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (oc *DefaultNetworkController) reconcileEgressIP(old, new *egressipv1.Egres
for _, pod := range pods {
podLabels := labels.Set(pod.Labels)
if !newPodSelector.Matches(podLabels) && oldPodSelector.Matches(podLabels) {
if err := oc.deletePodEgressIPAssignments(oldEIP.Name, oldEIP.Status.Items, pod); err != nil {
if err := oc.deletePodEgressIPAssignmentsWithLock(oldEIP.Name, oldEIP.Status.Items, pod); err != nil {
return err
}
}
Expand Down Expand Up @@ -279,7 +279,7 @@ func (oc *DefaultNetworkController) reconcileEgressIP(old, new *egressipv1.Egres
for _, pod := range pods {
podLabels := labels.Set(pod.Labels)
if !newPodSelector.Matches(podLabels) && oldPodSelector.Matches(podLabels) {
if err := oc.deletePodEgressIPAssignments(oldEIP.Name, oldEIP.Status.Items, pod); err != nil {
if err := oc.deletePodEgressIPAssignmentsWithLock(oldEIP.Name, oldEIP.Status.Items, pod); err != nil {
return err
}
}
Expand Down Expand Up @@ -432,7 +432,7 @@ func (oc *DefaultNetworkController) reconcileEgressIPPod(old, new *v1.Pod) (err
// Check if the pod stopped matching. If the pod was deleted,
// "new" will be nil, so this must account for that case.
if !newMatches && oldMatches {
if err := oc.deletePodEgressIPAssignments(egressIP.Name, egressIP.Status.Items, oldPod); err != nil {
if err := oc.deletePodEgressIPAssignmentsWithLock(egressIP.Name, egressIP.Status.Items, oldPod); err != nil {
return err
}
continue
Expand All @@ -453,7 +453,7 @@ func (oc *DefaultNetworkController) reconcileEgressIPPod(old, new *v1.Pod) (err
// to match all pods in the namespace) and the pod has been deleted:
// "new" will be nil and we need to remove the setup
if new == nil {
if err := oc.deletePodEgressIPAssignments(egressIP.Name, egressIP.Status.Items, oldPod); err != nil {
if err := oc.deletePodEgressIPAssignmentsWithLock(egressIP.Name, egressIP.Status.Items, oldPod); err != nil {
return err
}
continue
Expand Down Expand Up @@ -531,25 +531,7 @@ func (oc *DefaultNetworkController) addPodEgressIPAssignments(name string, statu
if len(statusAssignments) == 0 {
return nil
}
// We need to proceed with add only under two conditions
// 1) egressNode present in at least one status is local to this zone
// (NOTE: The relation between egressIPName and nodeName is 1:1 i.e in the same object the given node will be present only in one status)
// 2) the pod being added is local to this zone
proceed := false
for _, status := range statusAssignments {
oc.eIPC.nodeZoneState.LockKey(status.Node)
isLocalZoneEgressNode, loadedEgressNode := oc.eIPC.nodeZoneState.Load(status.Node)
if loadedEgressNode && isLocalZoneEgressNode {
proceed = true
oc.eIPC.nodeZoneState.UnlockKey(status.Node)
break
}
oc.eIPC.nodeZoneState.UnlockKey(status.Node)
}
if !proceed && !oc.isPodScheduledinLocalZone(pod) {
return nil // nothing to do if none of the status nodes are local to this master and pod is also remote
}
var remainingAssignments []egressipv1.EgressIPStatusItem
var remainingAssignments, staleAssignments []egressipv1.EgressIPStatusItem
var podIPs []*net.IPNet
var err error
if oc.isPodScheduledinLocalZone(pod) {
Expand Down Expand Up @@ -588,15 +570,21 @@ func (oc *DefaultNetworkController) addPodEgressIPAssignments(name string, statu
egressStatuses: egressStatuses{make(map[egressipv1.EgressIPStatusItem]string)},
standbyEgressIPNames: sets.New[string](),
}
oc.eIPC.podAssignment[podKey] = podState
} else if podState.egressIPName == name || podState.egressIPName == "" {
// We do the setup only if this egressIP object is the one serving this pod OR
// podState.egressIPName can be empty if no re-routes were found in
// syncPodAssignmentCache for the existing pod, we will treat this case as a new add
for _, status := range statusAssignments {
if exists := podState.egressStatuses.contains(status); !exists {
// Add the status if it's not already in the cache, or if it exists but is in pending state
// (meaning it was populated during EIP sync and needs to be processed for the pod).
if value, exists := podState.egressStatuses.statusMap[status]; !exists || value == egressStatusStatePending {
remainingAssignments = append(remainingAssignments, status)
}
// Detect stale EIP status entries (same EgressIP reassigned to a different node)
// and queue the outdated entry for cleanup.
if staleStatus := podState.egressStatuses.hasStaleEIPStatus(status); staleStatus != nil {
staleAssignments = append(staleAssignments, *staleStatus)
}
}
podState.egressIPName = name
podState.standbyEgressIPNames.Delete(name)
Expand All @@ -616,6 +604,36 @@ func (oc *DefaultNetworkController) addPodEgressIPAssignments(name string, statu
podState.standbyEgressIPNames.Insert(name)
return nil
}
for _, staleStatus := range staleAssignments {
klog.V(2).Infof("Deleting stale pod egress IP status: %v for EgressIP: %s and pod: %s/%s/%v", staleStatus, name, pod.Namespace, pod.Name, podIPs)
err = oc.deletePodEgressIPAssignment(name, []egressipv1.EgressIPStatusItem{staleStatus}, pod)
if err != nil {
klog.Warningf("Failed to delete stale EgressIP status %s/%v for pod %s: %v", name, staleStatus, podKey, err)
}
delete(podState.egressStatuses.statusMap, staleStatus)
}
// We store podState into podAssignment cache at this place for two reasons.
// 1. When podAssignmentState is newly created.
// 2. deletePodEgressIPAssignments might clean the podAssignment cache, make sure we add it back.
oc.eIPC.podAssignment[podKey] = podState
// We need to proceed with add only under two conditions
// 1) egressNode present in at least one status is local to this zone
// (NOTE: The relation between egressIPName and nodeName is 1:1 i.e in the same object the given node will be present only in one status)
// 2) the pod being added is local to this zone
proceed := false
for _, status := range statusAssignments {
oc.eIPC.nodeZoneState.LockKey(status.Node)
isLocalZoneEgressNode, loadedEgressNode := oc.eIPC.nodeZoneState.Load(status.Node)
if loadedEgressNode && isLocalZoneEgressNode {
proceed = true
oc.eIPC.nodeZoneState.UnlockKey(status.Node)
break
}
oc.eIPC.nodeZoneState.UnlockKey(status.Node)
}
if !proceed && !oc.isPodScheduledinLocalZone(pod) {
return nil // nothing to do if none of the status nodes are local to this master and pod is also remote
}
for _, status := range remainingAssignments {
klog.V(2).Infof("Adding pod egress IP status: %v for EgressIP: %s and pod: %s/%s/%v", status, name, pod.Namespace, pod.Name, podIPs)
err = oc.eIPC.nodeZoneState.DoWithLock(status.Node, func(key string) error {
Expand Down Expand Up @@ -745,16 +763,20 @@ func (oc *DefaultNetworkController) deleteNamespaceEgressIPAssignment(name strin
}
}
for _, pod := range pods {
if err := oc.deletePodEgressIPAssignments(name, statusAssignments, pod); err != nil {
if err := oc.deletePodEgressIPAssignmentsWithLock(name, statusAssignments, pod); err != nil {
return err
}
}
return nil
}

func (oc *DefaultNetworkController) deletePodEgressIPAssignments(name string, statusesToRemove []egressipv1.EgressIPStatusItem, pod *kapi.Pod) error {
func (oc *DefaultNetworkController) deletePodEgressIPAssignmentsWithLock(name string, statusesToRemove []egressipv1.EgressIPStatusItem, pod *kapi.Pod) error {
oc.eIPC.podAssignmentMutex.Lock()
defer oc.eIPC.podAssignmentMutex.Unlock()
return oc.deletePodEgressIPAssignment(name, statusesToRemove, pod)
}

func (oc *DefaultNetworkController) deletePodEgressIPAssignment(name string, statusesToRemove []egressipv1.EgressIPStatusItem, pod *kapi.Pod) error {
podKey := getPodKey(pod)
podStatus, exists := oc.eIPC.podAssignment[podKey]
if !exists {
Expand Down Expand Up @@ -988,6 +1010,13 @@ func (oc *DefaultNetworkController) syncPodAssignmentCache(egressIPCache map[str
klog.Infof("EgressIP %s is managing pod %s", egressIPName, podKey)
}
}
// populate podState.egressStatuses with assigned node for active egressIP IPs.
if podState.egressIPName == egressIPName {
for egressIPIP, nodeName := range state.egressIPs {
podState.egressStatuses.statusMap[egressipv1.EgressIPStatusItem{
EgressIP: egressIPIP, Node: nodeName}] = egressStatusStatePending
}
}
oc.eIPC.podAssignment[podKey] = podState
}
}
Expand Down Expand Up @@ -1354,9 +1383,18 @@ func InitClusterEgressPolicies(nbClient libovsdbclient.Client, addressSetFactory
return nil
}

// egressStatusStatePending marks entries populated during EIP sync and
// indicates they must be reconciled again for the pod.
const egressStatusStatePending = "pending"

type statusMap map[egressipv1.EgressIPStatusItem]string

type egressStatuses struct {
// statusMap tracks per EIP status assignment for a pod.
// Key: egressipv1.EgressIPStatusItem {EgressIP, Node}
// Values:
// "" -> applied/reconciled
// egressStatusStatePending -> populated during EIP sync, pending reconcile.
statusMap
}

Expand All @@ -1368,6 +1406,21 @@ func (e egressStatuses) contains(potentialStatus egressipv1.EgressIPStatusItem)
return false
}

// hasStaleEIPStatus checks for stale EIP status entries already in cache.
// This addresses the race condition where an EIP is reassigned to a different node
// but the cache still contains the old assignment, leading to stale SNAT/LRP entries.
func (e egressStatuses) hasStaleEIPStatus(potentialStatus egressipv1.EgressIPStatusItem) *egressipv1.EgressIPStatusItem {
var staleStatus *egressipv1.EgressIPStatusItem
for status := range e.statusMap {
if status.EgressIP == potentialStatus.EgressIP &&
status.Node != potentialStatus.Node {
staleStatus = &egressipv1.EgressIPStatusItem{EgressIP: status.EgressIP, Node: status.Node}
break
}
}
return staleStatus
}

func (e egressStatuses) delete(deleteStatus egressipv1.EgressIPStatusItem) {
delete(e.statusMap, deleteStatus)
}
Expand Down
Loading