From f57d9e7da832d0f2feba3fe6530fe39fe2f31d0b Mon Sep 17 00:00:00 2001 From: Tim Rozet Date: Thu, 28 Apr 2022 12:24:32 -0400 Subject: [PATCH 1/3] Fix ip allocator release to not return error type The nested allocator calls were propagating an error type up during an IP release. However in the bitmap allocator function it was never possible to error during an IP release. Remove the return type. Signed-off-by: Tim Rozet --- go-controller/pkg/ovn/ipallocator/allocator.go | 17 ++++++++--------- .../pkg/ovn/ipallocator/allocator/bitmap.go | 5 ++--- .../ovn/ipallocator/allocator/bitmap_test.go | 16 ++++------------ .../pkg/ovn/ipallocator/allocator/interfaces.go | 2 +- .../pkg/ovn/ipallocator/allocator_test.go | 8 ++------ .../logical_switch_manager.go | 14 +++++--------- 6 files changed, 22 insertions(+), 40 deletions(-) diff --git a/go-controller/pkg/ovn/ipallocator/allocator.go b/go-controller/pkg/ovn/ipallocator/allocator.go index 294a06ac4f..13abcadb9d 100644 --- a/go-controller/pkg/ovn/ipallocator/allocator.go +++ b/go-controller/pkg/ovn/ipallocator/allocator.go @@ -31,18 +31,17 @@ import ( type Interface interface { Allocate(net.IP) error AllocateNext() (net.IP, error) - Release(net.IP) error + Release(net.IP) ForEach(func(net.IP)) CIDR() net.IPNet - // For testing + // Has is used for testing Has(ip net.IP) bool } var ( - ErrFull = errors.New("range is full") - ErrAllocated = errors.New("provided IP is already allocated") - ErrMismatchedNetwork = errors.New("the provided network does not match the current range") + ErrFull = errors.New("range is full") + ErrAllocated = errors.New("provided IP is already allocated") ) type ErrNotInRange struct { @@ -109,7 +108,7 @@ func NewAllocatorCIDRRange(cidr *net.IPNet, allocatorFactory allocator.Allocator return &r, err } -// Helper that wraps NewAllocatorCIDRRange, for creating a range backed by an in-memory store. +// NewCIDRRange is a helper that wraps NewAllocatorCIDRRange, for creating a range backed by an in-memory store. func NewCIDRRange(cidr *net.IPNet) (*Range, error) { return NewAllocatorCIDRRange(cidr, func(max int, rangeSpec string) (allocator.Interface, error) { return allocator.NewAllocationMap(max, rangeSpec), nil @@ -174,13 +173,13 @@ func (r *Range) AllocateNext() (net.IP, error) { // Release releases the IP back to the pool. Releasing an // unallocated IP or an IP out of the range is a no-op and // returns no error. -func (r *Range) Release(ip net.IP) error { +func (r *Range) Release(ip net.IP) { ok, offset := r.contains(ip) if !ok { - return nil + return } - return r.alloc.Release(offset) + r.alloc.Release(offset) } // ForEach calls the provided function for each allocated IP. diff --git a/go-controller/pkg/ovn/ipallocator/allocator/bitmap.go b/go-controller/pkg/ovn/ipallocator/allocator/bitmap.go index 21446c50ce..eb3c70a231 100644 --- a/go-controller/pkg/ovn/ipallocator/allocator/bitmap.go +++ b/go-controller/pkg/ovn/ipallocator/allocator/bitmap.go @@ -129,17 +129,16 @@ func (r *AllocationBitmap) AllocateNext() (int, bool, error) { // Release releases the item back to the pool. Releasing an // unallocated item or an item out of the range is a no-op and // returns no error. -func (r *AllocationBitmap) Release(offset int) error { +func (r *AllocationBitmap) Release(offset int) { r.lock.Lock() defer r.lock.Unlock() if r.allocated.Bit(offset) == 0 { - return nil + return } r.allocated = r.allocated.SetBit(r.allocated, offset, 0) r.count-- - return nil } const ( diff --git a/go-controller/pkg/ovn/ipallocator/allocator/bitmap_test.go b/go-controller/pkg/ovn/ipallocator/allocator/bitmap_test.go index 2fa87af82c..ad57752bf2 100644 --- a/go-controller/pkg/ovn/ipallocator/allocator/bitmap_test.go +++ b/go-controller/pkg/ovn/ipallocator/allocator/bitmap_test.go @@ -75,9 +75,7 @@ func TestRelease(t *testing.T) { t.Errorf("expect offset %v allocated", offset) } - if err := m.Release(offset); err != nil { - t.Errorf("unexpected error: %v", err) - } + m.Release(offset) if m.Has(offset) { t.Errorf("expect offset %v not allocated", offset) @@ -196,9 +194,7 @@ func TestRoundRobinAllocationOrdering(t *testing.T) { } // Release one of the pre-allocated entries - if err := m.Release(0); err != nil { - t.Fatalf("unexpected error: %v", err) - } + m.Release(0) // Next allocation should be after the most recently allocated entry, // not one of the just-released ones @@ -264,9 +260,7 @@ func TestRoundRobinRelease(t *testing.T) { t.Fatalf("expect offset %d allocated", offset) } - if err := m.Release(offset); err != nil { - t.Fatalf("unexpected error: %v", err) - } + m.Release(offset) if m.Has(offset) { t.Fatalf("expect offset %d not allocated", offset) @@ -289,9 +283,7 @@ func TestRoundRobinWrapAround(t *testing.T) { t.Fatalf("got offset %d but expected offset %d", offset, i) } - if err := m.Release(offset); err != nil { - t.Fatalf("unexpected release error: %v", err) - } + m.Release(offset) } } diff --git a/go-controller/pkg/ovn/ipallocator/allocator/interfaces.go b/go-controller/pkg/ovn/ipallocator/allocator/interfaces.go index 8f1b2f179b..b078f38075 100644 --- a/go-controller/pkg/ovn/ipallocator/allocator/interfaces.go +++ b/go-controller/pkg/ovn/ipallocator/allocator/interfaces.go @@ -21,7 +21,7 @@ package allocator type Interface interface { Allocate(int) (bool, error) AllocateNext() (int, bool, error) - Release(int) error + Release(int) ForEach(func(int)) // For testing diff --git a/go-controller/pkg/ovn/ipallocator/allocator_test.go b/go-controller/pkg/ovn/ipallocator/allocator_test.go index 79ed00a89a..65b98179ab 100644 --- a/go-controller/pkg/ovn/ipallocator/allocator_test.go +++ b/go-controller/pkg/ovn/ipallocator/allocator_test.go @@ -114,9 +114,7 @@ func TestAllocate(t *testing.T) { t.Fatal(err) } released := net.ParseIP(tc.released) - if err := r.Release(released); err != nil { - t.Fatal(err) - } + r.Release(released) if f := r.Free(); f != 1 { t.Errorf("Test %s unexpected free %d", tc.name, f) } @@ -131,9 +129,7 @@ func TestAllocate(t *testing.T) { t.Errorf("Test %s unexpected %s : %s", tc.name, ip, released) } - if err := r.Release(released); err != nil { - t.Fatal(err) - } + r.Release(released) for _, outOfRange := range tc.outOfRange { err = r.Allocate(net.ParseIP(outOfRange)) if _, ok := err.(*ErrNotInRange); !ok { diff --git a/go-controller/pkg/ovn/logical_switch_manager/logical_switch_manager.go b/go-controller/pkg/ovn/logical_switch_manager/logical_switch_manager.go index 89ba486b50..254aeea763 100644 --- a/go-controller/pkg/ovn/logical_switch_manager/logical_switch_manager.go +++ b/go-controller/pkg/ovn/logical_switch_manager/logical_switch_manager.go @@ -215,9 +215,8 @@ func (manager *LogicalSwitchManager) AllocateIPs(nodeName string, ipnets []*net. // iterate over range of already allocated indices and release // ips allocated before the error occurred. for relIdx, relIPNet := range allocated { - if relErr := lsi.ipams[relIdx].Release(relIPNet.IP); relErr != nil { - klog.Errorf("Error while releasing IP: %s, err: %v", relIPNet.IP, relErr) - } else if relIPNet.IP != nil { + lsi.ipams[relIdx].Release(relIPNet.IP) + if relIPNet.IP != nil { klog.Warningf("Reserved IP: %s was released", relIPNet.IP.String()) } } @@ -271,9 +270,8 @@ func (manager *LogicalSwitchManager) AllocateNextIPs(nodeName string) ([]*net.IP // iterate over range of already allocated indices and release // ips allocated before the error occurred. for relIdx, relIPNet := range ipnets { - if relErr := lsi.ipams[relIdx].Release(relIPNet.IP); relErr != nil { - klog.Errorf("Error while releasing IP: %s, err: %v", relIPNet.IP, relErr) - } else if relIPNet.IP != nil { + lsi.ipams[relIdx].Release(relIPNet.IP) + if relIPNet.IP != nil { klog.Warningf("Reserved IP: %s was released", relIPNet.IP.String()) } } @@ -315,9 +313,7 @@ func (manager *LogicalSwitchManager) ReleaseIPs(nodeName string, ipnets []*net.I for _, ipam := range lsi.ipams { cidr := ipam.CIDR() if cidr.Contains(ipnet.IP) { - if err := ipam.Release(ipnet.IP); err != nil { - return err - } + ipam.Release(ipnet.IP) break } } From aaa7104fb9c507df07f9e298a61e9d14ec8260e5 Mon Sep 17 00:00:00 2001 From: Tim Rozet Date: Wed, 27 Apr 2022 17:22:37 -0400 Subject: [PATCH 2/3] Fixes various issues with completed pods Changes-Include: - During pod sync we allocate IPs of all existing pods, but we should ignore completed pods - During namespace add we add all of the pod IPs to the ns address set, but we should ignore completed pods - During processing of delete event for a completed pod, we were trying to delete the pod again, which would try to free the IP that was previously released and could be in use by another pod. We should ignore delete events for completed resources as they would have been handled during update. - On node add, we add all existing pods on that node back to retry as an "add". We should skip completed pods here. - We now check during deletion of a completed pod (should happen on update only) to make sure no other running pods are using this IP as a failsafe to ensure we never release an IP in use by another pod or the related OVN config Signed-off-by: Tim Rozet --- go-controller/pkg/ovn/egressgw.go | 8 + .../pkg/ovn/ipallocator/allocator.go | 2 - .../logical_switch_manager.go | 34 +++ go-controller/pkg/ovn/master.go | 3 + go-controller/pkg/ovn/namespace.go | 2 +- go-controller/pkg/ovn/obj_retry.go | 12 +- go-controller/pkg/ovn/pods.go | 68 +++++- go-controller/pkg/ovn/pods_test.go | 207 ++++++++++++++++++ 8 files changed, 321 insertions(+), 15 deletions(-) diff --git a/go-controller/pkg/ovn/egressgw.go b/go-controller/pkg/ovn/egressgw.go index 461a33d7da..160f8ed2cb 100644 --- a/go-controller/pkg/ovn/egressgw.go +++ b/go-controller/pkg/ovn/egressgw.go @@ -1015,6 +1015,10 @@ func (oc *Controller) buildClusterECMPCacheFromNamespaces(clusterRouteCache map[ } for _, gwIP := range gwIPs { for _, nsPod := range nsPods { + // ignore completed pods, host networked pods, pods not scheduled + if !util.PodWantsNetwork(nsPod) || util.PodCompleted(nsPod) || !util.PodScheduled(nsPod) { + continue + } for _, podIP := range nsPod.Status.PodIPs { if utilnet.IsIPv6(gwIP) != utilnet.IsIPv6String(podIP.IP) { continue @@ -1068,6 +1072,10 @@ func (oc *Controller) buildClusterECMPCacheFromPods(clusterRouteCache map[string } for _, gwIP := range gwIPs { for _, nsPod := range nsPods { + // ignore completed pods, host networked pods, pods not scheduled + if !util.PodWantsNetwork(nsPod) || util.PodCompleted(nsPod) || !util.PodScheduled(nsPod) { + continue + } for _, podIP := range nsPod.Status.PodIPs { if utilnet.IsIPv6(gwIP) != utilnet.IsIPv6String(podIP.IP) { continue diff --git a/go-controller/pkg/ovn/ipallocator/allocator.go b/go-controller/pkg/ovn/ipallocator/allocator.go index 13abcadb9d..76fc56510f 100644 --- a/go-controller/pkg/ovn/ipallocator/allocator.go +++ b/go-controller/pkg/ovn/ipallocator/allocator.go @@ -34,8 +34,6 @@ type Interface interface { Release(net.IP) ForEach(func(net.IP)) CIDR() net.IPNet - - // Has is used for testing Has(ip net.IP) bool } diff --git a/go-controller/pkg/ovn/logical_switch_manager/logical_switch_manager.go b/go-controller/pkg/ovn/logical_switch_manager/logical_switch_manager.go index 254aeea763..96532ade59 100644 --- a/go-controller/pkg/ovn/logical_switch_manager/logical_switch_manager.go +++ b/go-controller/pkg/ovn/logical_switch_manager/logical_switch_manager.go @@ -321,6 +321,40 @@ func (manager *LogicalSwitchManager) ReleaseIPs(nodeName string, ipnets []*net.I return nil } +// ConditionalIPRelease determines if any IP is available to be released from an IPAM conditionally if func is true. +// It guarantees state of the allocator will not change while executing the predicate function +// TODO(trozet): add unit testing for this function +func (manager *LogicalSwitchManager) ConditionalIPRelease(nodeName string, ipnets []*net.IPNet, predicate func() (bool, error)) (bool, error) { + manager.RLock() + defer manager.RUnlock() + if ipnets == nil || nodeName == "" { + klog.V(5).Infof("Node name is empty or ip slice to release is nil") + return false, nil + } + lsi, ok := manager.cache[nodeName] + if !ok { + return false, nil + } + if len(lsi.ipams) == 0 { + return false, nil + } + + // check if ipam has one of the ip addresses, and then execute the predicate function to determine + // if this IP should be released or not + for _, ipnet := range ipnets { + for _, ipam := range lsi.ipams { + cidr := ipam.CIDR() + if cidr.Contains(ipnet.IP) { + if ipam.Has(ipnet.IP) { + return predicate() + } + } + } + } + + return false, nil +} + // IP allocator manager for join switch's IPv4 and IPv6 subnets. type JoinSwitchIPManager struct { lsm *LogicalSwitchManager diff --git a/go-controller/pkg/ovn/master.go b/go-controller/pkg/ovn/master.go index ccaad81b5d..358e5b52d8 100644 --- a/go-controller/pkg/ovn/master.go +++ b/go-controller/pkg/ovn/master.go @@ -1374,6 +1374,9 @@ func (oc *Controller) addUpdateNodeEvent(node *kapi.Node, nSyncs *nodeSyncs) err klog.V(5).Infof("When adding node %s, found %d pods to add to retryPods", node.Name, len(pods.Items)) for _, pod := range pods.Items { pod := pod + if util.PodCompleted(&pod) { + continue + } klog.V(5).Infof("Adding pod %s/%s to retryPods", pod.Namespace, pod.Name) oc.retryPods.addRetryObj(&pod) } diff --git a/go-controller/pkg/ovn/namespace.go b/go-controller/pkg/ovn/namespace.go index d823e5ed26..ad9118db5f 100644 --- a/go-controller/pkg/ovn/namespace.go +++ b/go-controller/pkg/ovn/namespace.go @@ -639,7 +639,7 @@ func (oc *Controller) createNamespaceAddrSetAllPods(ns string) (addressset.Addre } else { ips = make([]net.IP, 0, len(existingPods)) for _, pod := range existingPods { - if !pod.Spec.HostNetwork { + if util.PodWantsNetwork(pod) && !util.PodCompleted(pod) && util.PodScheduled(pod) { podIPs, err := util.GetAllPodIPs(pod) if err != nil { klog.Warningf(err.Error()) diff --git a/go-controller/pkg/ovn/obj_retry.go b/go-controller/pkg/ovn/obj_retry.go index 51568ed5d7..8f889ee17e 100644 --- a/go-controller/pkg/ovn/obj_retry.go +++ b/go-controller/pkg/ovn/obj_retry.go @@ -997,7 +997,7 @@ func (oc *Controller) WatchResource(objectsToRetry *retryObjs) *factory.Handler klog.Errorf("Upon add event: %v", err) return } - klog.Infof("Add event received for resource %v, key=%s", objectsToRetry.oType, key) + klog.V(5).Infof("Add event received for resource %v, key=%s", objectsToRetry.oType, key) objectsToRetry.initRetryObjWithAdd(obj, key) objectsToRetry.skipRetryObj(key) @@ -1074,7 +1074,7 @@ func (oc *Controller) WatchResource(objectsToRetry *retryObjs) *factory.Handler return } - klog.Infof("Update event received for resource %v, oldKey=%s, newKey=%s", + klog.V(5).Infof("Update event received for resource %v, oldKey=%s, newKey=%s", objectsToRetry.oType, oldKey, newKey) objectsToRetry.skipRetryObj(newKey) @@ -1165,7 +1165,13 @@ func (oc *Controller) WatchResource(objectsToRetry *retryObjs) *factory.Handler klog.Errorf("Delete of resource %v failed: %v", objectsToRetry.oType, err) return } - klog.Infof("Delete event received for resource %v %s", objectsToRetry.oType, key) + klog.V(5).Infof("Delete event received for resource %v %s", objectsToRetry.oType, key) + // If object is in terminal state, we would have already deleted it during update. + // No reason to attempt to delete it here again. + if oc.isObjectInTerminalState(objectsToRetry.oType, obj) { + klog.Infof("Ignoring delete event for completed resource %v %s", objectsToRetry.oType, key) + return + } objectsToRetry.skipRetryObj(key) internalCacheEntry := oc.getInternalCacheEntry(objectsToRetry.oType, obj) objectsToRetry.initRetryObjWithDelete(obj, key, internalCacheEntry) // set up the retry obj for deletion diff --git a/go-controller/pkg/ovn/pods.go b/go-controller/pkg/ovn/pods.go index 998274fc85..5ebf3f1b8a 100644 --- a/go-controller/pkg/ovn/pods.go +++ b/go-controller/pkg/ovn/pods.go @@ -30,7 +30,7 @@ func (oc *Controller) syncPodsRetriable(pods []interface{}) error { return fmt.Errorf("spurious object in syncPods: %v", podInterface) } annotations, err := util.UnmarshalPodAnnotation(pod.Annotations) - if util.PodScheduled(pod) && util.PodWantsNetwork(pod) && err == nil { + if util.PodScheduled(pod) && util.PodWantsNetwork(pod) && !util.PodCompleted(pod) && err == nil { // skip nodes that are not running ovnk (inferred from host subnets) if oc.lsManager.IsNonHostSubnetSwitch(pod.Spec.NodeName) { continue @@ -120,12 +120,53 @@ func (oc *Controller) deleteLogicalPort(pod *kapi.Pod, portInfo *lpInfo) (err er podIfAddrs = portInfo.ips } - var allOps, ops []ovsdb.Operation - if ops, err = oc.deletePodFromNamespace(pod.Namespace, podIfAddrs, portUUID); err != nil { - return fmt.Errorf("unable to delete pod %s from namespace: %w", podDesc, err) + shouldRelease := true + // check to make sure no other pods are using this IP before we try to release it if this is a completed pod. + if util.PodCompleted(pod) { + if shouldRelease, err = oc.lsManager.ConditionalIPRelease(pod.Spec.NodeName, podIfAddrs, func() (bool, error) { + pods, err := oc.watchFactory.GetAllPods() + if err != nil { + return false, fmt.Errorf("unable to get pods to determine if completed pod IP is in use by another pod. "+ + "Will not release pod %s/%s IP: %#v from allocator", pod.Namespace, pod.Name, podIfAddrs) + } + // iterate through all pods, ignore pods on other nodes + for _, p := range pods { + if util.PodCompleted(p) || !util.PodWantsNetwork(p) || !util.PodScheduled(p) || p.Spec.NodeName != pod.Spec.NodeName { + continue + } + // check if the pod addresses match in the OVN annotation + pAddrs, err := util.GetAllPodIPs(p) + if err != nil { + continue + } + + for _, pAddr := range pAddrs { + for _, podAddr := range podIfAddrs { + if pAddr.Equal(podAddr.IP) { + klog.Infof("Will not release IP address: %s for pod %s/%s. Detected another pod"+ + " using this IP: %s/%s", pAddr.String(), pod.Namespace, pod.Name, p.Namespace, p.Name) + return false, nil + } + } + } + } + klog.Infof("Releasing IPs for Completed pod: %s/%s, ips: %s", pod.Namespace, pod.Name, + util.JoinIPNetIPs(podIfAddrs, " ")) + return true, nil + }); err != nil { + return fmt.Errorf("cannot determine if IPs are safe to release for completed pod: %s: %w", podDesc, err) + } } - allOps = append(allOps, ops...) + var allOps, ops []ovsdb.Operation + + // if the ip is in use by another pod we should not try to remove it from the address set + if shouldRelease { + if ops, err = oc.deletePodFromNamespace(pod.Namespace, podIfAddrs, portUUID); err != nil { + return fmt.Errorf("unable to delete pod %s from namespace: %w", podDesc, err) + } + allOps = append(allOps, ops...) + } ops, err = oc.delLSPOps(logicalPort, pod.Spec.NodeName, portUUID) if err != nil { return fmt.Errorf("failed to create delete ops for the lsp: %s: %s", logicalPort, err) @@ -137,10 +178,9 @@ func (oc *Controller) deleteLogicalPort(pod *kapi.Pod, portInfo *lpInfo) (err er return fmt.Errorf("cannot delete logical switch port %s, %v", logicalPort, err) } - klog.Infof("Attempting to release IPs for pod: %s/%s, ips: %s", pod.Namespace, pod.Name, - util.JoinIPNetIPs(podIfAddrs, " ")) - if err := oc.lsManager.ReleaseIPs(pod.Spec.NodeName, podIfAddrs); err != nil { - return fmt.Errorf("cannot release IPs for pod %s: %w", podDesc, err) + // do not remove SNATs/GW routes/IPAM for an IP address unless we have validated no other pod is using it + if !shouldRelease { + return nil } if config.Gateway.DisableSNATMultipleGWs { @@ -153,6 +193,16 @@ func (oc *Controller) deleteLogicalPort(pod *kapi.Pod, portInfo *lpInfo) (err er return fmt.Errorf("cannot delete GW Routes for pod %s: %w", podDesc, err) } + // Releasing IPs needs to happen last so that we can deterministically know that if delete failed that + // the IP of the pod needs to be released. Otherwise we could have a completed pod failed to be removed + // and we dont know if the IP was released or not, and subsequently could accidentally release the IP + // while it is now on another pod + klog.Infof("Attempting to release IPs for pod: %s/%s, ips: %s", pod.Namespace, pod.Name, + util.JoinIPNetIPs(podIfAddrs, " ")) + if err := oc.lsManager.ReleaseIPs(pod.Spec.NodeName, podIfAddrs); err != nil { + return fmt.Errorf("cannot release IPs for pod %s: %w", podDesc, err) + } + return nil } diff --git a/go-controller/pkg/ovn/pods_test.go b/go-controller/pkg/ovn/pods_test.go index 5dae86018f..14a3bdbb45 100644 --- a/go-controller/pkg/ovn/pods_test.go +++ b/go-controller/pkg/ovn/pods_test.go @@ -12,6 +12,7 @@ import ( "github.com/urfave/cli/v2" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/ipallocator" ovstypes "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config" @@ -439,6 +440,7 @@ var _ = ginkgo.Describe("OVN Pod Operations", func() { return fakeOvn.controller.retryPods.checkRetryObj(myPod2Key) }).Should(gomega.BeTrue()) + gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(getExpectedDataPodsAndSwitches([]testPod{t}, []string{"node1"}))) ginkgo.By("Marking myPod as completed should free IP") myPod.Status.Phase = v1.PodSucceeded @@ -462,7 +464,212 @@ var _ = ginkgo.Describe("OVN Pod Operations", func() { gomega.Eventually(func() string { return getPodAnnotations(fakeOvn.fakeClient.KubeClient, t2.namespace, t2.podName) }, 2).Should(gomega.MatchJSON(t2.getAnnotationsJson())) + gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(getExpectedDataPodsAndSwitches([]testPod{t2}, []string{"node1"}))) + return nil + } + + err := app.Run([]string{app.Name}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }) + + ginkgo.It("should not deallocate in-use and previously freed completed pods IP", func() { + app.Action = func(ctx *cli.Context) error { + namespaceT := *newNamespace("namespace1") + t := newTPod( + "node1", + "10.128.1.0/24", + "10.128.1.2", + "10.128.1.1", + "myPod", + "10.128.1.3", + "0a:58:0a:80:01:03", + namespaceT.Name, + ) + + fakeOvn.startWithDBSetup(initialDB, + &v1.NamespaceList{ + Items: []v1.Namespace{ + namespaceT, + }, + }, + &v1.PodList{ + Items: []v1.Pod{}, + }, + ) + + t.populateLogicalSwitchCache(fakeOvn, getLogicalSwitchUUID(fakeOvn.controller.nbClient, "node1")) + fakeOvn.controller.WatchNamespaces() + fakeOvn.controller.WatchPods() + + pod, _ := fakeOvn.fakeClient.KubeClient.CoreV1().Pods(t.namespace).Get(context.TODO(), t.podName, metav1.GetOptions{}) + gomega.Expect(pod).To(gomega.BeNil()) + + myPod := newPod(t.namespace, t.podName, t.nodeName, t.podIP) + _, err := fakeOvn.fakeClient.KubeClient.CoreV1().Pods(t.namespace).Create(context.TODO(), + myPod, metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Eventually(func() string { + return getPodAnnotations(fakeOvn.fakeClient.KubeClient, t.namespace, t.podName) + }, 2).Should(gomega.MatchJSON(t.getAnnotationsJson())) + + gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(getExpectedDataPodsAndSwitches([]testPod{t}, []string{"node1"}))) + + ginkgo.By("Allocating all of the rest of the node subnet") + // allocate all the rest of the IPs in the subnet + fakeOvn.controller.lsManager.AllocateUntilFull("node1") + + ginkgo.By("Creating another pod which will fail due to allocation full") + t2 := newTPod( + "node1", + "10.128.1.0/24", + "10.128.1.2", + "10.128.1.1", + "myPod2", + "10.128.1.3", + "0a:58:0a:80:01:03", + namespaceT.Name, + ) + + myPod2, err := fakeOvn.fakeClient.KubeClient.CoreV1().Pods(t.namespace).Create(context.TODO(), + newPod(t2.namespace, t2.podName, t2.nodeName, t2.podIP), metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Eventually(func() string { + return getPodAnnotations(fakeOvn.fakeClient.KubeClient, t2.namespace, t2.podName) + }, 2).Should(gomega.HaveLen(0)) + + myPod2Key, err := getResourceKey(factory.PodType, myPod2) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Eventually(func() bool { + return fakeOvn.controller.retryPods.checkRetryObj(myPod2Key) + }).Should(gomega.BeTrue()) + gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(getExpectedDataPodsAndSwitches([]testPod{t}, []string{"node1"}))) + ginkgo.By("Marking myPod as completed should free IP") + myPod.Status.Phase = v1.PodSucceeded + + _, err = fakeOvn.fakeClient.KubeClient.CoreV1().Pods(t.namespace).UpdateStatus(context.TODO(), + myPod, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // port should be gone or marked for removal in logical port cache + logicalPort := util.GetLogicalPortName(myPod.Namespace, myPod.Name) + gomega.Eventually(func() bool { + info, err := fakeOvn.controller.logicalPortCache.get(logicalPort) + return err != nil || !info.expires.IsZero() + }, 2).Should(gomega.BeTrue()) + + // there should also be no entry for this pod in the retry cache + gomega.Eventually(func() bool { + return fakeOvn.controller.retryPods.getObjRetryEntry(myPod2Key) == nil + }, retryObjInterval+time.Second).Should(gomega.BeTrue()) + ginkgo.By("Freed IP should now allow mypod2 to come up") + fakeOvn.controller.retryPods.requestRetryObjs() + gomega.Eventually(func() string { + return getPodAnnotations(fakeOvn.fakeClient.KubeClient, t2.namespace, t2.podName) + }, 2).Should(gomega.MatchJSON(t2.getAnnotationsJson())) + + gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(getExpectedDataPodsAndSwitches([]testPod{t2}, []string{"node1"}))) + // 2nd pod should now have the IP + myPod2, err = fakeOvn.fakeClient.KubeClient.CoreV1().Pods(t.namespace).Get(context.TODO(), + t2.podName, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(myPod2.Status.PodIP).To(gomega.Equal(t2.podIP)) + + ginkgo.By("Updating the completed pod should not free the IP") + patch := struct { + Metadata map[string]interface{} `json:"metadata"` + }{ + Metadata: map[string]interface{}{ + "annotations": map[string]string{"dummy": "data"}, + }, + } + patchData, err := json.Marshal(&patch) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + // trigger update event + _, err = fakeOvn.fakeClient.KubeClient.CoreV1().Pods(t.namespace).Patch(context.TODO(), myPod.Name, types.MergePatchType, patchData, metav1.PatchOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + // sleep a small amount to ensure the event was processed + time.Sleep(time.Second) + // try to allocate the IP and it should not work + annotation, err := util.UnmarshalPodAnnotation(myPod2.Annotations) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + err = fakeOvn.controller.lsManager.AllocateIPs(t.nodeName, annotation.IPs) + gomega.Expect(err).To(gomega.Equal(ipallocator.ErrAllocated)) + gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(getExpectedDataPodsAndSwitches([]testPod{t2}, []string{"node1"}))) + + ginkgo.By("Deleting the completed pod should not allow a third pod to take the IP") + // now delete the completed pod + err = fakeOvn.fakeClient.KubeClient.CoreV1().Pods(t.namespace).Delete(context.TODO(), + t.podName, metav1.DeleteOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // now create a 3rd pod + t3 := newTPod( + "node1", + "10.128.1.0/24", + "10.128.1.2", + "10.128.1.1", + "myPod3", + "10.128.1.3", + "0a:58:0a:80:01:03", + namespaceT.Name, + ) + myPod3, err := fakeOvn.fakeClient.KubeClient.CoreV1().Pods(t.namespace).Create(context.TODO(), + newPod(t3.namespace, t3.podName, t3.nodeName, t3.podIP), metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Eventually(func() string { + return getPodAnnotations(fakeOvn.fakeClient.KubeClient, t3.namespace, t3.podName) + }, 2).Should(gomega.HaveLen(0)) + + // should be in retry because there are no more IPs left + myPod3Key, err := getResourceKey(factory.PodType, myPod3) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Eventually(func() bool { + return fakeOvn.controller.retryPods.checkRetryObj(myPod3Key) + }).Should(gomega.BeTrue()) + // TODO validate that the pods also have correct GW SNATs and route policies + gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(getExpectedDataPodsAndSwitches([]testPod{t2}, []string{"node1"}))) + return nil + } + + err := app.Run([]string{app.Name}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }) + + ginkgo.It("should not allocate a completed pod on start up", func() { + app.Action = func(ctx *cli.Context) error { + namespaceT := *newNamespace("namespace1") + t := newTPod( + "node1", + "10.128.1.0/24", + "10.128.1.2", + "10.128.1.1", + "myPod", + "10.128.1.3", + "0a:58:0a:80:01:03", + namespaceT.Name, + ) + myPod := newPod(t.namespace, t.podName, t.nodeName, t.podIP) + myPod.Status.Phase = v1.PodSucceeded + + fakeOvn.startWithDBSetup(initialDB, + &v1.NamespaceList{ + Items: []v1.Namespace{ + namespaceT, + }, + }, + &v1.PodList{ + Items: []v1.Pod{*myPod}, + }, + ) + + t.populateLogicalSwitchCache(fakeOvn, getLogicalSwitchUUID(fakeOvn.controller.nbClient, "node1")) + fakeOvn.controller.WatchNamespaces() + fakeOvn.controller.WatchPods() + + expectedData := []libovsdbtest.TestData{getExpectedDataPodsAndSwitches([]testPod{}, []string{"node1"})} + gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(expectedData...)) + fakeOvn.asf.ExpectAddressSetWithIPs(namespaceT.Name, []string{}) return nil } From b66b80473de554dc23241c705c5ac785f9f4821b Mon Sep 17 00:00:00 2001 From: Tim Rozet Date: Thu, 28 Apr 2022 19:01:53 -0400 Subject: [PATCH 3/3] Do not fail on deleting NATs if they dont exist Upon fetching all of the NATs on a router, if the NATs or the router dont exist this should not be an error for deletion. Signed-off-by: Tim Rozet --- go-controller/pkg/libovsdbops/router.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go-controller/pkg/libovsdbops/router.go b/go-controller/pkg/libovsdbops/router.go index d5e8ecd209..d4f5dd1668 100644 --- a/go-controller/pkg/libovsdbops/router.go +++ b/go-controller/pkg/libovsdbops/router.go @@ -885,6 +885,9 @@ func CreateOrUpdateNATs(nbClient libovsdbclient.Client, router *nbdb.LogicalRout // logical router and returns the corresponding ops func DeleteNATsOps(nbClient libovsdbclient.Client, ops []libovsdb.Operation, router *nbdb.LogicalRouter, nats ...*nbdb.NAT) ([]libovsdb.Operation, error) { routerNats, err := getRouterNATs(nbClient, router) + if err == libovsdbclient.ErrNotFound { + return ops, nil + } if err != nil { return ops, fmt.Errorf("unable to get NAT entries for router %+v: %w", router, err) }