diff --git a/go-controller/pkg/libovsdbops/router.go b/go-controller/pkg/libovsdbops/router.go
index d5e8ecd209..d4f5dd1668 100644
--- a/go-controller/pkg/libovsdbops/router.go
+++ b/go-controller/pkg/libovsdbops/router.go
@@ -885,6 +885,9 @@ func CreateOrUpdateNATs(nbClient libovsdbclient.Client, router *nbdb.LogicalRout
 // logical router and returns the corresponding ops
 func DeleteNATsOps(nbClient libovsdbclient.Client, ops []libovsdb.Operation, router *nbdb.LogicalRouter, nats ...*nbdb.NAT) ([]libovsdb.Operation, error) {
 	routerNats, err := getRouterNATs(nbClient, router)
+	if err == libovsdbclient.ErrNotFound {
+		return ops, nil
+	}
 	if err != nil {
 		return ops, fmt.Errorf("unable to get NAT entries for router %+v: %w", router, err)
 	}
diff --git a/go-controller/pkg/ovn/egressgw.go b/go-controller/pkg/ovn/egressgw.go
index 461a33d7da..160f8ed2cb 100644
--- a/go-controller/pkg/ovn/egressgw.go
+++ b/go-controller/pkg/ovn/egressgw.go
@@ -1015,6 +1015,10 @@ func (oc *Controller) buildClusterECMPCacheFromNamespaces(clusterRouteCache map[
 	}
 	for _, gwIP := range gwIPs {
 		for _, nsPod := range nsPods {
+			// ignore completed pods, host networked pods, pods not scheduled
+			if !util.PodWantsNetwork(nsPod) || util.PodCompleted(nsPod) || !util.PodScheduled(nsPod) {
+				continue
+			}
 			for _, podIP := range nsPod.Status.PodIPs {
 				if utilnet.IsIPv6(gwIP) != utilnet.IsIPv6String(podIP.IP) {
 					continue
@@ -1068,6 +1072,10 @@ func (oc *Controller) buildClusterECMPCacheFromPods(clusterRouteCache map[string
 	}
 	for _, gwIP := range gwIPs {
 		for _, nsPod := range nsPods {
+			// ignore completed pods, host networked pods, pods not scheduled
+			if !util.PodWantsNetwork(nsPod) || util.PodCompleted(nsPod) || !util.PodScheduled(nsPod) {
+				continue
+			}
 			for _, podIP := range nsPod.Status.PodIPs {
 				if utilnet.IsIPv6(gwIP) != utilnet.IsIPv6String(podIP.IP) {
 					continue
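
// A minimal sketch (not from this patch) of the three-part guard now shared by
// both ECMP cache builders above. The helper name isRoutablePod is an assumption
// for illustration; it relies on the usual semantics of the util helpers:
// PodWantsNetwork is false for host-networked pods, PodCompleted is true once
// the pod phase is terminal, and PodScheduled is true once the pod is bound to
// a node.
func isRoutablePod(pod *kapi.Pod) bool {
	return util.PodWantsNetwork(pod) && !util.PodCompleted(pod) && util.PodScheduled(pod)
}
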
diff --git a/go-controller/pkg/ovn/ipallocator/allocator.go b/go-controller/pkg/ovn/ipallocator/allocator.go
index 294a06ac4f..76fc56510f 100644
--- a/go-controller/pkg/ovn/ipallocator/allocator.go
+++ b/go-controller/pkg/ovn/ipallocator/allocator.go
@@ -31,18 +31,15 @@ import (
 type Interface interface {
 	Allocate(net.IP) error
 	AllocateNext() (net.IP, error)
-	Release(net.IP) error
+	Release(net.IP)
 	ForEach(func(net.IP))
 	CIDR() net.IPNet
-
-	// For testing
 	Has(ip net.IP) bool
 }
 
 var (
-	ErrFull              = errors.New("range is full")
-	ErrAllocated         = errors.New("provided IP is already allocated")
-	ErrMismatchedNetwork = errors.New("the provided network does not match the current range")
+	ErrFull      = errors.New("range is full")
+	ErrAllocated = errors.New("provided IP is already allocated")
 )
 
 type ErrNotInRange struct {
@@ -109,7 +106,7 @@ func NewAllocatorCIDRRange(cidr *net.IPNet, allocatorFactory allocator.Allocator
 	return &r, err
 }
 
-// Helper that wraps NewAllocatorCIDRRange, for creating a range backed by an in-memory store.
+// NewCIDRRange is a helper that wraps NewAllocatorCIDRRange, for creating a range backed by an in-memory store.
 func NewCIDRRange(cidr *net.IPNet) (*Range, error) {
 	return NewAllocatorCIDRRange(cidr, func(max int, rangeSpec string) (allocator.Interface, error) {
 		return allocator.NewAllocationMap(max, rangeSpec), nil
@@ -174,13 +171,13 @@ func (r *Range) AllocateNext() (net.IP, error) {
 // Release releases the IP back to the pool. Releasing an
 // unallocated IP or an IP out of the range is a no-op and
 // returns no error.
-func (r *Range) Release(ip net.IP) error {
+func (r *Range) Release(ip net.IP) {
 	ok, offset := r.contains(ip)
 	if !ok {
-		return nil
+		return
 	}
 
-	return r.alloc.Release(offset)
+	r.alloc.Release(offset)
 }
 
 // ForEach calls the provided function for each allocated IP.
diff --git a/go-controller/pkg/ovn/ipallocator/allocator/bitmap.go b/go-controller/pkg/ovn/ipallocator/allocator/bitmap.go
index 21446c50ce..eb3c70a231 100644
--- a/go-controller/pkg/ovn/ipallocator/allocator/bitmap.go
+++ b/go-controller/pkg/ovn/ipallocator/allocator/bitmap.go
@@ -129,17 +129,16 @@ func (r *AllocationBitmap) AllocateNext() (int, bool, error) {
 // Release releases the item back to the pool. Releasing an
 // unallocated item or an item out of the range is a no-op and
 // returns no error.
-func (r *AllocationBitmap) Release(offset int) error {
+func (r *AllocationBitmap) Release(offset int) {
 	r.lock.Lock()
 	defer r.lock.Unlock()
 
 	if r.allocated.Bit(offset) == 0 {
-		return nil
+		return
 	}
 
 	r.allocated = r.allocated.SetBit(r.allocated, offset, 0)
 	r.count--
-	return nil
 }
 
 const (
diff --git a/go-controller/pkg/ovn/ipallocator/allocator/bitmap_test.go b/go-controller/pkg/ovn/ipallocator/allocator/bitmap_test.go
index 2fa87af82c..ad57752bf2 100644
--- a/go-controller/pkg/ovn/ipallocator/allocator/bitmap_test.go
+++ b/go-controller/pkg/ovn/ipallocator/allocator/bitmap_test.go
@@ -75,9 +75,7 @@ func TestRelease(t *testing.T) {
 		t.Errorf("expect offset %v allocated", offset)
 	}
 
-	if err := m.Release(offset); err != nil {
-		t.Errorf("unexpected error: %v", err)
-	}
+	m.Release(offset)
 
 	if m.Has(offset) {
 		t.Errorf("expect offset %v not allocated", offset)
@@ -196,9 +194,7 @@ func TestRoundRobinAllocationOrdering(t *testing.T) {
 	}
 
 	// Release one of the pre-allocated entries
-	if err := m.Release(0); err != nil {
-		t.Fatalf("unexpected error: %v", err)
-	}
+	m.Release(0)
 
 	// Next allocation should be after the most recently allocated entry,
 	// not one of the just-released ones
@@ -264,9 +260,7 @@ func TestRoundRobinRelease(t *testing.T) {
 		t.Fatalf("expect offset %d allocated", offset)
 	}
 
-	if err := m.Release(offset); err != nil {
-		t.Fatalf("unexpected error: %v", err)
-	}
+	m.Release(offset)
 
 	if m.Has(offset) {
 		t.Fatalf("expect offset %d not allocated", offset)
@@ -289,9 +283,7 @@ func TestRoundRobinWrapAround(t *testing.T) {
 			t.Fatalf("got offset %d but expected offset %d", offset, i)
 		}
 
-		if err := m.Release(offset); err != nil {
-			t.Fatalf("unexpected release error: %v", err)
-		}
+		m.Release(offset)
 	}
 }
diff --git a/go-controller/pkg/ovn/ipallocator/allocator/interfaces.go b/go-controller/pkg/ovn/ipallocator/allocator/interfaces.go
index 8f1b2f179b..b078f38075 100644
--- a/go-controller/pkg/ovn/ipallocator/allocator/interfaces.go
+++ b/go-controller/pkg/ovn/ipallocator/allocator/interfaces.go
@@ -21,7 +21,7 @@ package allocator
 type Interface interface {
 	Allocate(int) (bool, error)
 	AllocateNext() (int, bool, error)
-	Release(int) error
+	Release(int)
 	ForEach(func(int))
 
 	// For testing
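
// A minimal sketch of the new Release contract, assuming a Range built with
// NewCIDRRange as above. Release can no longer fail: releasing an unallocated
// or out-of-range IP remains a no-op, so callers can drop their error plumbing.
_, cidr, _ := net.ParseCIDR("10.128.1.0/24")
r, err := ipallocator.NewCIDRRange(cidr)
if err != nil {
	panic(err) // construction can still fail
}
ip := net.ParseIP("10.128.1.3")
_ = r.Allocate(ip) // can still return ErrAllocated or *ErrNotInRange
r.Release(ip)      // no return value
r.Release(ip)      // double-release is a harmless no-op
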
diff --git a/go-controller/pkg/ovn/ipallocator/allocator_test.go b/go-controller/pkg/ovn/ipallocator/allocator_test.go
index 79ed00a89a..65b98179ab 100644
--- a/go-controller/pkg/ovn/ipallocator/allocator_test.go
+++ b/go-controller/pkg/ovn/ipallocator/allocator_test.go
@@ -114,9 +114,7 @@ func TestAllocate(t *testing.T) {
 			t.Fatal(err)
 		}
 		released := net.ParseIP(tc.released)
-		if err := r.Release(released); err != nil {
-			t.Fatal(err)
-		}
+		r.Release(released)
 		if f := r.Free(); f != 1 {
 			t.Errorf("Test %s unexpected free %d", tc.name, f)
 		}
@@ -131,9 +129,7 @@ func TestAllocate(t *testing.T) {
 			t.Errorf("Test %s unexpected %s : %s", tc.name, ip, released)
 		}
-		if err := r.Release(released); err != nil {
-			t.Fatal(err)
-		}
+		r.Release(released)
 		for _, outOfRange := range tc.outOfRange {
 			err = r.Allocate(net.ParseIP(outOfRange))
 			if _, ok := err.(*ErrNotInRange); !ok {
diff --git a/go-controller/pkg/ovn/logical_switch_manager/logical_switch_manager.go b/go-controller/pkg/ovn/logical_switch_manager/logical_switch_manager.go
index 89ba486b50..96532ade59 100644
--- a/go-controller/pkg/ovn/logical_switch_manager/logical_switch_manager.go
+++ b/go-controller/pkg/ovn/logical_switch_manager/logical_switch_manager.go
@@ -215,9 +215,8 @@ func (manager *LogicalSwitchManager) AllocateIPs(nodeName string, ipnets []*net.
 			// iterate over range of already allocated indices and release
 			// ips allocated before the error occurred.
 			for relIdx, relIPNet := range allocated {
-				if relErr := lsi.ipams[relIdx].Release(relIPNet.IP); relErr != nil {
-					klog.Errorf("Error while releasing IP: %s, err: %v", relIPNet.IP, relErr)
-				} else if relIPNet.IP != nil {
+				lsi.ipams[relIdx].Release(relIPNet.IP)
+				if relIPNet.IP != nil {
 					klog.Warningf("Reserved IP: %s was released", relIPNet.IP.String())
 				}
 			}
@@ -271,9 +270,8 @@ func (manager *LogicalSwitchManager) AllocateNextIPs(nodeName string) ([]*net.IP
 			// iterate over range of already allocated indices and release
 			// ips allocated before the error occurred.
 			for relIdx, relIPNet := range ipnets {
-				if relErr := lsi.ipams[relIdx].Release(relIPNet.IP); relErr != nil {
-					klog.Errorf("Error while releasing IP: %s, err: %v", relIPNet.IP, relErr)
-				} else if relIPNet.IP != nil {
+				lsi.ipams[relIdx].Release(relIPNet.IP)
+				if relIPNet.IP != nil {
 					klog.Warningf("Reserved IP: %s was released", relIPNet.IP.String())
 				}
 			}
@@ -315,9 +313,7 @@ func (manager *LogicalSwitchManager) ReleaseIPs(nodeName string, ipnets []*net.I
 		for _, ipam := range lsi.ipams {
 			cidr := ipam.CIDR()
 			if cidr.Contains(ipnet.IP) {
-				if err := ipam.Release(ipnet.IP); err != nil {
-					return err
-				}
+				ipam.Release(ipnet.IP)
 				break
 			}
 		}
@@ -325,6 +321,40 @@ func (manager *LogicalSwitchManager) ReleaseIPs(nodeName string, ipnets []*net.I
 	return nil
 }
 
+// ConditionalIPRelease checks whether any of the given IPs is allocated in an IPAM and, if so, runs the
+// predicate to decide whether it should be released. It guarantees that the state of the allocator will
+// not change while the predicate function executes.
+// TODO(trozet): add unit testing for this function
+func (manager *LogicalSwitchManager) ConditionalIPRelease(nodeName string, ipnets []*net.IPNet, predicate func() (bool, error)) (bool, error) {
+	manager.RLock()
+	defer manager.RUnlock()
+	if ipnets == nil || nodeName == "" {
+		klog.V(5).Infof("Node name is empty or ip slice to release is nil")
+		return false, nil
+	}
+	lsi, ok := manager.cache[nodeName]
+	if !ok {
+		return false, nil
+	}
+	if len(lsi.ipams) == 0 {
+		return false, nil
+	}
+
+	// check if ipam has one of the ip addresses, and then execute the predicate function to determine
+	// if this IP should be released or not
+	for _, ipnet := range ipnets {
+		for _, ipam := range lsi.ipams {
+			cidr := ipam.CIDR()
+			if cidr.Contains(ipnet.IP) {
+				if ipam.Has(ipnet.IP) {
+					return predicate()
+				}
+			}
+		}
+	}
+
+	return false, nil
+}
+
 // IP allocator manager for join switch's IPv4 and IPv6 subnets.
 type JoinSwitchIPManager struct {
 	lsm *LogicalSwitchManager
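
// A sketch of how a caller consumes ConditionalIPRelease, assuming lsManager is
// a *LogicalSwitchManager and podIfAddrs a []*net.IPNet. The predicate runs
// under the manager's read lock, so the allocator state it inspects cannot
// change while it executes; the method only answers whether a release is safe,
// and the caller still performs the actual ReleaseIPs.
shouldRelease, err := lsManager.ConditionalIPRelease(nodeName, podIfAddrs, func() (bool, error) {
	// return true only when no other live pod still owns one of podIfAddrs
	return true, nil
})
if err == nil && shouldRelease {
	if err := lsManager.ReleaseIPs(nodeName, podIfAddrs); err != nil {
		klog.Errorf("Failed to release IPs on node %s: %v", nodeName, err)
	}
}
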
diff --git a/go-controller/pkg/ovn/master.go b/go-controller/pkg/ovn/master.go
index ccaad81b5d..358e5b52d8 100644
--- a/go-controller/pkg/ovn/master.go
+++ b/go-controller/pkg/ovn/master.go
@@ -1374,6 +1374,9 @@ func (oc *Controller) addUpdateNodeEvent(node *kapi.Node, nSyncs *nodeSyncs) err
 	klog.V(5).Infof("When adding node %s, found %d pods to add to retryPods", node.Name, len(pods.Items))
 	for _, pod := range pods.Items {
 		pod := pod
+		if util.PodCompleted(&pod) {
+			continue
+		}
 		klog.V(5).Infof("Adding pod %s/%s to retryPods", pod.Namespace, pod.Name)
 		oc.retryPods.addRetryObj(&pod)
 	}
diff --git a/go-controller/pkg/ovn/namespace.go b/go-controller/pkg/ovn/namespace.go
index d823e5ed26..ad9118db5f 100644
--- a/go-controller/pkg/ovn/namespace.go
+++ b/go-controller/pkg/ovn/namespace.go
@@ -639,7 +639,7 @@ func (oc *Controller) createNamespaceAddrSetAllPods(ns string) (addressset.Addre
 	} else {
 		ips = make([]net.IP, 0, len(existingPods))
 		for _, pod := range existingPods {
-			if !pod.Spec.HostNetwork {
+			if util.PodWantsNetwork(pod) && !util.PodCompleted(pod) && util.PodScheduled(pod) {
 				podIPs, err := util.GetAllPodIPs(pod)
 				if err != nil {
 					klog.Warningf(err.Error())
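
// Both hunks above gate on util.PodCompleted. Its definition is not part of
// this diff; a plausible shape, inferred from how the callers use it (treat
// this as an assumption, not the actual implementation):
func PodCompleted(pod *kapi.Pod) bool {
	// a pod in a terminal phase will never run again, so its IP is reclaimable
	return pod.Status.Phase == kapi.PodSucceeded || pod.Status.Phase == kapi.PodFailed
}
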
diff --git a/go-controller/pkg/ovn/obj_retry.go b/go-controller/pkg/ovn/obj_retry.go
index 51568ed5d7..8f889ee17e 100644
--- a/go-controller/pkg/ovn/obj_retry.go
+++ b/go-controller/pkg/ovn/obj_retry.go
@@ -997,7 +997,7 @@ func (oc *Controller) WatchResource(objectsToRetry *retryObjs) *factory.Handler
 				klog.Errorf("Upon add event: %v", err)
 				return
 			}
-			klog.Infof("Add event received for resource %v, key=%s", objectsToRetry.oType, key)
+			klog.V(5).Infof("Add event received for resource %v, key=%s", objectsToRetry.oType, key)
 
 			objectsToRetry.initRetryObjWithAdd(obj, key)
 			objectsToRetry.skipRetryObj(key)
@@ -1074,7 +1074,7 @@ func (oc *Controller) WatchResource(objectsToRetry *retryObjs) *factory.Handler
 				return
 			}
 
-			klog.Infof("Update event received for resource %v, oldKey=%s, newKey=%s",
+			klog.V(5).Infof("Update event received for resource %v, oldKey=%s, newKey=%s",
 				objectsToRetry.oType, oldKey, newKey)
 
 			objectsToRetry.skipRetryObj(newKey)
@@ -1165,7 +1165,13 @@ func (oc *Controller) WatchResource(objectsToRetry *retryObjs) *factory.Handler
 				klog.Errorf("Delete of resource %v failed: %v", objectsToRetry.oType, err)
 				return
 			}
-			klog.Infof("Delete event received for resource %v %s", objectsToRetry.oType, key)
+			klog.V(5).Infof("Delete event received for resource %v %s", objectsToRetry.oType, key)
+			// If the object is in a terminal state, we already deleted it during the update.
+			// There is no reason to attempt to delete it again here.
+			if oc.isObjectInTerminalState(objectsToRetry.oType, obj) {
+				klog.Infof("Ignoring delete event for completed resource %v %s", objectsToRetry.oType, key)
+				return
+			}
 			objectsToRetry.skipRetryObj(key)
 			internalCacheEntry := oc.getInternalCacheEntry(objectsToRetry.oType, obj)
 			objectsToRetry.initRetryObjWithDelete(obj, key, internalCacheEntry) // set up the retry obj for deletion
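
// The delete handler above now short-circuits for objects already in a terminal
// state, because the update path has already torn them down. The body of
// isObjectInTerminalState is not shown in this diff; for pods it plausibly
// reduces to the sketch below (the standalone name and the type assertion are
// assumptions for illustration):
func isPodInTerminalState(obj interface{}) bool {
	pod, ok := obj.(*kapi.Pod)
	return ok && util.PodCompleted(pod)
}
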
diff --git a/go-controller/pkg/ovn/pods.go b/go-controller/pkg/ovn/pods.go
index 998274fc85..5ebf3f1b8a 100644
--- a/go-controller/pkg/ovn/pods.go
+++ b/go-controller/pkg/ovn/pods.go
@@ -30,7 +30,7 @@ func (oc *Controller) syncPodsRetriable(pods []interface{}) error {
 			return fmt.Errorf("spurious object in syncPods: %v", podInterface)
 		}
 		annotations, err := util.UnmarshalPodAnnotation(pod.Annotations)
-		if util.PodScheduled(pod) && util.PodWantsNetwork(pod) && err == nil {
+		if util.PodScheduled(pod) && util.PodWantsNetwork(pod) && !util.PodCompleted(pod) && err == nil {
 			// skip nodes that are not running ovnk (inferred from host subnets)
 			if oc.lsManager.IsNonHostSubnetSwitch(pod.Spec.NodeName) {
 				continue
@@ -120,12 +120,53 @@ func (oc *Controller) deleteLogicalPort(pod *kapi.Pod, portInfo *lpInfo) (err er
 		podIfAddrs = portInfo.ips
 	}
 
-	var allOps, ops []ovsdb.Operation
-	if ops, err = oc.deletePodFromNamespace(pod.Namespace, podIfAddrs, portUUID); err != nil {
-		return fmt.Errorf("unable to delete pod %s from namespace: %w", podDesc, err)
+	shouldRelease := true
+	// if this is a completed pod, make sure no other pod is using its IP before we try to release it.
+	if util.PodCompleted(pod) {
+		if shouldRelease, err = oc.lsManager.ConditionalIPRelease(pod.Spec.NodeName, podIfAddrs, func() (bool, error) {
+			pods, err := oc.watchFactory.GetAllPods()
+			if err != nil {
+				return false, fmt.Errorf("unable to get pods to determine if completed pod IP is in use by another pod. "+
+					"Will not release pod %s/%s IP: %#v from allocator", pod.Namespace, pod.Name, podIfAddrs)
+			}
+			// iterate through all pods, ignoring pods on other nodes
+			for _, p := range pods {
+				if util.PodCompleted(p) || !util.PodWantsNetwork(p) || !util.PodScheduled(p) || p.Spec.NodeName != pod.Spec.NodeName {
+					continue
+				}
+				// check if the pod addresses match in the OVN annotation
+				pAddrs, err := util.GetAllPodIPs(p)
+				if err != nil {
+					continue
+				}
+
+				for _, pAddr := range pAddrs {
+					for _, podAddr := range podIfAddrs {
+						if pAddr.Equal(podAddr.IP) {
+							klog.Infof("Will not release IP address: %s for pod %s/%s. Detected another pod"+
+								" using this IP: %s/%s", pAddr.String(), pod.Namespace, pod.Name, p.Namespace, p.Name)
+							return false, nil
+						}
+					}
+				}
+			}
+			klog.Infof("Releasing IPs for Completed pod: %s/%s, ips: %s", pod.Namespace, pod.Name,
+				util.JoinIPNetIPs(podIfAddrs, " "))
+			return true, nil
+		}); err != nil {
+			return fmt.Errorf("cannot determine if IPs are safe to release for completed pod: %s: %w", podDesc, err)
+		}
 	}
-	allOps = append(allOps, ops...)
 
+	var allOps, ops []ovsdb.Operation
+
+	// if the IP is in use by another pod, we should not try to remove it from the address set
+	if shouldRelease {
+		if ops, err = oc.deletePodFromNamespace(pod.Namespace, podIfAddrs, portUUID); err != nil {
+			return fmt.Errorf("unable to delete pod %s from namespace: %w", podDesc, err)
+		}
+		allOps = append(allOps, ops...)
+	}
 	ops, err = oc.delLSPOps(logicalPort, pod.Spec.NodeName, portUUID)
 	if err != nil {
 		return fmt.Errorf("failed to create delete ops for the lsp: %s: %s", logicalPort, err)
@@ -137,10 +178,9 @@ func (oc *Controller) deleteLogicalPort(pod *kapi.Pod, portInfo *lpInfo) (err er
 		return fmt.Errorf("cannot delete logical switch port %s, %v", logicalPort, err)
 	}
 
-	klog.Infof("Attempting to release IPs for pod: %s/%s, ips: %s", pod.Namespace, pod.Name,
-		util.JoinIPNetIPs(podIfAddrs, " "))
-	if err := oc.lsManager.ReleaseIPs(pod.Spec.NodeName, podIfAddrs); err != nil {
-		return fmt.Errorf("cannot release IPs for pod %s: %w", podDesc, err)
+	// do not remove SNATs/GW routes/IPAM for an IP address unless we have validated that no other pod is using it
+	if !shouldRelease {
+		return nil
 	}
 
 	if config.Gateway.DisableSNATMultipleGWs {
@@ -153,6 +193,16 @@ func (oc *Controller) deleteLogicalPort(pod *kapi.Pod, portInfo *lpInfo) (err er
 		return fmt.Errorf("cannot delete GW Routes for pod %s: %w", podDesc, err)
 	}
 
+	// Releasing IPs needs to happen last so that we can deterministically know that if delete failed, the
+	// IP of the pod still needs to be released. Otherwise a completed pod could fail to be removed and we
+	// would not know whether its IP was released, and we could subsequently release the IP by accident
+	// while it is already assigned to another pod.
+	klog.Infof("Attempting to release IPs for pod: %s/%s, ips: %s", pod.Namespace, pod.Name,
+		util.JoinIPNetIPs(podIfAddrs, " "))
+	if err := oc.lsManager.ReleaseIPs(pod.Spec.NodeName, podIfAddrs); err != nil {
+		return fmt.Errorf("cannot release IPs for pod %s: %w", podDesc, err)
+	}
+
 	return nil
 }
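
// The completed-pod branch in deleteLogicalPort above guards IP release behind
// a scan of live pods on the same node. A condensed standalone sketch of that
// predicate (function and parameter names are illustrative only):
func ipsInUseByAnotherPod(podsOnNode []*kapi.Pod, podIfAddrs []*net.IPNet) bool {
	for _, p := range podsOnNode {
		// only live, scheduled, OVN-networked pods can legitimately own an IP
		if util.PodCompleted(p) || !util.PodWantsNetwork(p) || !util.PodScheduled(p) {
			continue
		}
		pAddrs, err := util.GetAllPodIPs(p)
		if err != nil {
			continue
		}
		for _, pAddr := range pAddrs {
			for _, podAddr := range podIfAddrs {
				if pAddr.Equal(podAddr.IP) {
					return true
				}
			}
		}
	}
	return false
}
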
diff --git a/go-controller/pkg/ovn/pods_test.go b/go-controller/pkg/ovn/pods_test.go
index 5dae86018f..14a3bdbb45 100644
--- a/go-controller/pkg/ovn/pods_test.go
+++ b/go-controller/pkg/ovn/pods_test.go
@@ -12,6 +12,7 @@ import (
 	"github.com/urfave/cli/v2"
 
 	"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory"
+	"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/ipallocator"
 	ovstypes "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
 
 	"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
@@ -439,6 +440,7 @@ var _ = ginkgo.Describe("OVN Pod Operations", func() {
 				return fakeOvn.controller.retryPods.checkRetryObj(myPod2Key)
 			}).Should(gomega.BeTrue())
 
+			gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(getExpectedDataPodsAndSwitches([]testPod{t}, []string{"node1"})))
 			ginkgo.By("Marking myPod as completed should free IP")
 			myPod.Status.Phase = v1.PodSucceeded
@@ -462,7 +464,212 @@ var _ = ginkgo.Describe("OVN Pod Operations", func() {
 			gomega.Eventually(func() string {
 				return getPodAnnotations(fakeOvn.fakeClient.KubeClient, t2.namespace, t2.podName)
 			}, 2).Should(gomega.MatchJSON(t2.getAnnotationsJson()))
+			gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(getExpectedDataPodsAndSwitches([]testPod{t2}, []string{"node1"})))
+			return nil
+		}
+
+		err := app.Run([]string{app.Name})
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+	})
+
+	ginkgo.It("should not deallocate in-use and previously freed completed pods IP", func() {
+		app.Action = func(ctx *cli.Context) error {
+			namespaceT := *newNamespace("namespace1")
+			t := newTPod(
+				"node1",
+				"10.128.1.0/24",
+				"10.128.1.2",
+				"10.128.1.1",
+				"myPod",
+				"10.128.1.3",
+				"0a:58:0a:80:01:03",
+				namespaceT.Name,
+			)
+
+			fakeOvn.startWithDBSetup(initialDB,
+				&v1.NamespaceList{
+					Items: []v1.Namespace{
+						namespaceT,
+					},
+				},
+				&v1.PodList{
+					Items: []v1.Pod{},
+				},
+			)
+
+			t.populateLogicalSwitchCache(fakeOvn, getLogicalSwitchUUID(fakeOvn.controller.nbClient, "node1"))
+			fakeOvn.controller.WatchNamespaces()
+			fakeOvn.controller.WatchPods()
+
+			pod, _ := fakeOvn.fakeClient.KubeClient.CoreV1().Pods(t.namespace).Get(context.TODO(), t.podName, metav1.GetOptions{})
+			gomega.Expect(pod).To(gomega.BeNil())
+
+			myPod := newPod(t.namespace, t.podName, t.nodeName, t.podIP)
+			_, err := fakeOvn.fakeClient.KubeClient.CoreV1().Pods(t.namespace).Create(context.TODO(),
+				myPod, metav1.CreateOptions{})
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+			gomega.Eventually(func() string {
+				return getPodAnnotations(fakeOvn.fakeClient.KubeClient, t.namespace, t.podName)
+			}, 2).Should(gomega.MatchJSON(t.getAnnotationsJson()))
+
+			gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(getExpectedDataPodsAndSwitches([]testPod{t}, []string{"node1"})))
+
+			ginkgo.By("Allocating all of the rest of the node subnet")
+			// allocate all the rest of the IPs in the subnet
+			fakeOvn.controller.lsManager.AllocateUntilFull("node1")
+
+			ginkgo.By("Creating another pod which will fail due to allocation full")
+			t2 := newTPod(
+				"node1",
+				"10.128.1.0/24",
+				"10.128.1.2",
+				"10.128.1.1",
+				"myPod2",
+				"10.128.1.3",
+				"0a:58:0a:80:01:03",
+				namespaceT.Name,
+			)
+
+			myPod2, err := fakeOvn.fakeClient.KubeClient.CoreV1().Pods(t.namespace).Create(context.TODO(),
+				newPod(t2.namespace, t2.podName, t2.nodeName, t2.podIP), metav1.CreateOptions{})
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+			gomega.Eventually(func() string {
+				return getPodAnnotations(fakeOvn.fakeClient.KubeClient, t2.namespace, t2.podName)
+			}, 2).Should(gomega.HaveLen(0))
+
+			myPod2Key, err := getResourceKey(factory.PodType, myPod2)
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+			gomega.Eventually(func() bool {
+				return fakeOvn.controller.retryPods.checkRetryObj(myPod2Key)
+			}).Should(gomega.BeTrue())
+			gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(getExpectedDataPodsAndSwitches([]testPod{t}, []string{"node1"})))
+			ginkgo.By("Marking myPod as completed should free IP")
+			myPod.Status.Phase = v1.PodSucceeded
+
+			_, err = fakeOvn.fakeClient.KubeClient.CoreV1().Pods(t.namespace).UpdateStatus(context.TODO(),
+				myPod, metav1.UpdateOptions{})
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+			// port should be gone or marked for removal in logical port cache
+			logicalPort := util.GetLogicalPortName(myPod.Namespace, myPod.Name)
+			gomega.Eventually(func() bool {
+				info, err := fakeOvn.controller.logicalPortCache.get(logicalPort)
+				return err != nil || !info.expires.IsZero()
+			}, 2).Should(gomega.BeTrue())
+
+			// there should also be no entry for this pod in the retry cache
+			gomega.Eventually(func() bool {
+				return fakeOvn.controller.retryPods.getObjRetryEntry(myPod2Key) == nil
+			}, retryObjInterval+time.Second).Should(gomega.BeTrue())
+			ginkgo.By("Freed IP should now allow mypod2 to come up")
+			fakeOvn.controller.retryPods.requestRetryObjs()
+			gomega.Eventually(func() string {
+				return getPodAnnotations(fakeOvn.fakeClient.KubeClient, t2.namespace, t2.podName)
+			}, 2).Should(gomega.MatchJSON(t2.getAnnotationsJson()))
+
+			gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(getExpectedDataPodsAndSwitches([]testPod{t2}, []string{"node1"})))
+			// 2nd pod should now have the IP
+			myPod2, err = fakeOvn.fakeClient.KubeClient.CoreV1().Pods(t.namespace).Get(context.TODO(),
+				t2.podName, metav1.GetOptions{})
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+			gomega.Expect(myPod2.Status.PodIP).To(gomega.Equal(t2.podIP))
+
+			ginkgo.By("Updating the completed pod should not free the IP")
+			patch := struct {
+				Metadata map[string]interface{} `json:"metadata"`
+			}{
+				Metadata: map[string]interface{}{
+					"annotations": map[string]string{"dummy": "data"},
+				},
+			}
+			patchData, err := json.Marshal(&patch)
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+			// trigger update event
+			_, err = fakeOvn.fakeClient.KubeClient.CoreV1().Pods(t.namespace).Patch(context.TODO(), myPod.Name, types.MergePatchType, patchData, metav1.PatchOptions{})
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+			// sleep a small amount to ensure the event was processed
+			time.Sleep(time.Second)
+			// try to allocate the IP and it should not work
+			annotation, err := util.UnmarshalPodAnnotation(myPod2.Annotations)
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+			err = fakeOvn.controller.lsManager.AllocateIPs(t.nodeName, annotation.IPs)
+			gomega.Expect(err).To(gomega.Equal(ipallocator.ErrAllocated))
+			gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(getExpectedDataPodsAndSwitches([]testPod{t2}, []string{"node1"})))
+
+			ginkgo.By("Deleting the completed pod should not allow a third pod to take the IP")
+			// now delete the completed pod
+			err = fakeOvn.fakeClient.KubeClient.CoreV1().Pods(t.namespace).Delete(context.TODO(),
+				t.podName, metav1.DeleteOptions{})
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+			// now create a 3rd pod
+			t3 := newTPod(
+				"node1",
+				"10.128.1.0/24",
+				"10.128.1.2",
+				"10.128.1.1",
+				"myPod3",
+				"10.128.1.3",
+				"0a:58:0a:80:01:03",
+				namespaceT.Name,
+			)
+			myPod3, err := fakeOvn.fakeClient.KubeClient.CoreV1().Pods(t.namespace).Create(context.TODO(),
+				newPod(t3.namespace, t3.podName, t3.nodeName, t3.podIP), metav1.CreateOptions{})
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+			gomega.Eventually(func() string {
+				return getPodAnnotations(fakeOvn.fakeClient.KubeClient, t3.namespace, t3.podName)
+			}, 2).Should(gomega.HaveLen(0))
+
+			// should be in retry because there are no more IPs left
+			myPod3Key, err := getResourceKey(factory.PodType, myPod3)
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+			gomega.Eventually(func() bool {
+				return fakeOvn.controller.retryPods.checkRetryObj(myPod3Key)
+			}).Should(gomega.BeTrue())
+			// TODO validate that the pods also have correct GW SNATs and route policies
+			gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(getExpectedDataPodsAndSwitches([]testPod{t2}, []string{"node1"})))
+			return nil
+		}
+
+		err := app.Run([]string{app.Name})
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+	})
+
+	ginkgo.It("should not allocate a completed pod on start up", func() {
+		app.Action = func(ctx *cli.Context) error {
+			namespaceT := *newNamespace("namespace1")
+			t := newTPod(
+				"node1",
+				"10.128.1.0/24",
+				"10.128.1.2",
+				"10.128.1.1",
+				"myPod",
+				"10.128.1.3",
+				"0a:58:0a:80:01:03",
+				namespaceT.Name,
+			)
+			myPod := newPod(t.namespace, t.podName, t.nodeName, t.podIP)
+			myPod.Status.Phase = v1.PodSucceeded
+
+			fakeOvn.startWithDBSetup(initialDB,
+				&v1.NamespaceList{
+					Items: []v1.Namespace{
+						namespaceT,
+					},
+				},
+				&v1.PodList{
+					Items: []v1.Pod{*myPod},
+				},
+			)
+
+			t.populateLogicalSwitchCache(fakeOvn, getLogicalSwitchUUID(fakeOvn.controller.nbClient, "node1"))
+			fakeOvn.controller.WatchNamespaces()
+			fakeOvn.controller.WatchPods()
+
+			expectedData := []libovsdbtest.TestData{getExpectedDataPodsAndSwitches([]testPod{}, []string{"node1"})}
+			gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(expectedData...))
+			fakeOvn.asf.ExpectAddressSetWithIPs(namespaceT.Name, []string{})
 			return nil
 		}
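
// The tests above exhaust the node subnet via the test helper
// fakeOvn.controller.lsManager.AllocateUntilFull("node1"), whose body is not
// included in this diff. A plausible sketch, assuming it simply loops
// AllocateNextIPs until the allocator reports the range is full:
func (manager *LogicalSwitchManager) AllocateUntilFull(nodeName string) {
	for {
		if _, err := manager.AllocateNextIPs(nodeName); err != nil {
			// typically ipallocator.ErrFull once the subnet is exhausted
			return
		}
	}
}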