diff --git a/go-controller/hybrid-overlay/pkg/controller/node.go b/go-controller/hybrid-overlay/pkg/controller/node.go index cdf65da4e4..3d551ba257 100644 --- a/go-controller/hybrid-overlay/pkg/controller/node.go +++ b/go-controller/hybrid-overlay/pkg/controller/node.go @@ -92,8 +92,9 @@ func NewNode( ) (*Node, error) { nodeLister := listers.NewNodeLister(nodeInformer.GetIndexer()) + podLister := listers.NewPodLister(podInformer.GetIndexer()) - controller, err := newNodeController(kube, nodeName, nodeLister) + controller, err := newNodeController(kube, nodeName, nodeLister, podLister) if err != nil { return nil, err } diff --git a/go-controller/hybrid-overlay/pkg/controller/node_linux.go b/go-controller/hybrid-overlay/pkg/controller/node_linux.go index b9da7e7d60..3dd051adaa 100644 --- a/go-controller/hybrid-overlay/pkg/controller/node_linux.go +++ b/go-controller/hybrid-overlay/pkg/controller/node_linux.go @@ -42,7 +42,10 @@ type flowCacheEntry struct { // NodeController is the node hybrid overlay controller type NodeController struct { nodeName string - // an atomic uint32 for testing purposes 0 = uninitialized and 1 = initialized + // an atomic uint32 to check the initialization status + // 0 = not initialized + // 1 = drIP and drMAC are set but the initial pods are not wired for hybrid overlay + // 2 = drIP and drMAC are set and the initial pods are wired for hybrid overlay initialized uint32 drMAC net.HardwareAddr drIP net.IP @@ -55,6 +58,7 @@ type NodeController struct { flowChan chan struct{} nodeLister listers.NodeLister + podLister listers.PodLister } // newNodeController returns a node handler that listens for node events @@ -66,6 +70,7 @@ func newNodeController( _ kube.Interface, nodeName string, nodeLister listers.NodeLister, + podLister listers.PodLister, ) (nodeController, error) { node := &NodeController{ @@ -75,7 +80,9 @@ func newNodeController( flowMutex: sync.Mutex{}, flowChan: make(chan struct{}, 1), nodeLister: nodeLister, + podLister: 
podLister, } + atomic.StoreUint32(&node.initialized, 0) return node, nil } @@ -269,6 +276,20 @@ func (n *NodeController) AddNode(node *kapi.Node) error { klog.Infof("Add hybridOverlay Node %s", node.Name) err = n.hybridOverlayNodeUpdate(node) } + if atomic.LoadUint32(&n.initialized) == 1 { + pods, err := n.podLister.List(labels.Everything()) + if err != nil { + return fmt.Errorf("cannot fully initialize node %s for hybrid overlay, cannot list pods: %v", n.nodeName, err) + } + + for _, pod := range pods { + err := n.AddPod(pod) + if err != nil { + klog.Errorf("Cannot wire pod %s for hybrid overlay, %v", pod.Name, err) + } + } + atomic.StoreUint32(&n.initialized, 2) + } return err } @@ -476,7 +497,7 @@ func (n *NodeController) handleHybridOverlayMACIPChange(node *kapi.Node) error { // EnsureHybridOverlayBridge sets up the hybrid overlay bridge func (n *NodeController) EnsureHybridOverlayBridge(node *kapi.Node) error { - if atomic.LoadUint32(&n.initialized) == 1 { + if atomic.LoadUint32(&n.initialized) >= 1 { if node.Annotations[hotypes.HybridOverlayDRIP] != n.drIP.String() || node.Annotations[hotypes.HybridOverlayDRMAC] != n.drMAC.String() { if err := n.handleHybridOverlayMACIPChange(node); err != nil { diff --git a/go-controller/hybrid-overlay/pkg/controller/node_linux_test.go b/go-controller/hybrid-overlay/pkg/controller/node_linux_test.go index a2c7c1d313..aa21067887 100644 --- a/go-controller/hybrid-overlay/pkg/controller/node_linux_test.go +++ b/go-controller/hybrid-overlay/pkg/controller/node_linux_test.go @@ -7,6 +7,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/urfave/cli/v2" v1 "k8s.io/api/core/v1" @@ -425,7 +426,7 @@ var _ = Describe("Hybrid Overlay Node Linux Operations", func() { linuxNode, okay := n.controller.(*NodeController) Expect(okay).To(BeTrue()) Eventually(func() bool { - return atomic.LoadUint32(&linuxNode.initialized) == 1 + return atomic.LoadUint32(&linuxNode.initialized) == 2 }, 2).Should(BeTrue()) 
Eventually(fexec.CalledMatchesExpected, 2).Should(BeTrue(), fexec.ErrorDesc) @@ -483,7 +484,7 @@ var _ = Describe("Hybrid Overlay Node Linux Operations", func() { linuxNode, okay := n.controller.(*NodeController) Expect(okay).To(BeTrue()) Eventually(func() bool { - return atomic.LoadUint32(&linuxNode.initialized) == 1 + return atomic.LoadUint32(&linuxNode.initialized) == 2 }, 2).Should(BeTrue()) Eventually(fexec.CalledMatchesExpected, 2).Should(BeTrue(), fexec.ErrorDesc) @@ -516,6 +517,87 @@ var _ = Describe("Hybrid Overlay Node Linux Operations", func() { } appRun(app) }) + ovntest.OnSupportedPlatformsIt("on startup will add a local linux pod that times out on the initial addPod event", func() { + app.Action = func(ctx *cli.Context) error { + const ( + pod1IP string = "1.2.3.5" + pod1CIDR string = pod1IP + "/24" + pod1MAC string = "aa:bb:cc:dd:ee:ff" + ) + + annotations := createNodeAnnotationsForSubnet(thisNodeSubnet) + annotations[hotypes.HybridOverlayDRMAC] = thisNodeDRMAC + annotations["k8s.ovn.org/node-gateway-router-lrp-ifaddr"] = "{\"ipv4\":\"100.64.0.3/16\"}" + annotations[hotypes.HybridOverlayDRIP] = thisNodeDRIP + node := createNode(thisNode, "linux", thisNodeIP, annotations) + testPod := createPod("test", "pod1", thisNode, pod1CIDR, pod1MAC) + fakeClient := fake.NewSimpleClientset( + //&v1.NodeList{ + // Items: []v1.Node{*node}, + //}, + &v1.PodList{ + Items: []v1.Pod{*testPod}, + }, + ) + + // Node setup from initial node sync + addNodeSetupCmds(fexec, thisNode) + _, err := config.InitConfig(ctx, fexec, nil) + Expect(err).NotTo(HaveOccurred()) + + f := informers.NewSharedInformerFactory(fakeClient, informer.DefaultResyncInterval) + + n, err := NewNode( + &kube.Kube{KClient: fakeClient}, + thisNode, + f.Core().V1().Nodes().Informer(), + f.Core().V1().Pods().Informer(), + informer.NewTestEventHandler, + ) + Expect(err).NotTo(HaveOccurred()) + + addEnsureHybridOverlayBridgeMocks(nlMock, thisNodeDRIP, "") + // initial flowSync + addSyncFlows(fexec) + // 
flowsync after EnsureHybridOverlayBridge() + addSyncFlows(fexec) + addSyncFlows(fexec) + + f.Start(stopChan) + wg.Add(1) + go func() { + defer wg.Done() + n.Run(stopChan) + }() + + linuxNode, okay := n.controller.(*NodeController) + Expect(okay).To(BeTrue()) + time.Sleep(2 * time.Second) + _, err = fakeClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + + Eventually(func() bool { + return atomic.LoadUint32(&linuxNode.initialized) == 2 + }, 2).Should(BeTrue()) + + initialFlowCache := map[string]*flowCacheEntry{ + "0x0": generateInitialFlowCacheEntry(mgmtIfAddr.IP.String(), thisNodeDRIP, thisNodeDRMAC), + } + + initialFlowCache[podIPToCookie(net.ParseIP(pod1IP))] = &flowCacheEntry{ + flows: []string{"table=10,cookie=0x" + podIPToCookie(net.ParseIP(pod1IP)) + ",priority=100,ip,nw_dst=" + pod1IP + ",actions=set_field:" + thisNodeDRMAC + "->eth_src,set_field:" + pod1MAC + "->eth_dst,output:ext"}, + ignoreLearn: true, + } + Eventually(func() error { + linuxNode.flowMutex.Lock() + defer linuxNode.flowMutex.Unlock() + return compareFlowCache(linuxNode.flowCache, initialFlowCache) + }, 2).Should(BeNil()) + Eventually(fexec.CalledMatchesExpected, 2).Should(BeTrue(), fexec.ErrorDesc) + return nil + } + appRun(app) + }) ovntest.OnSupportedPlatformsIt("sets up tunnels for Windows nodes", func() { app.Action = func(ctx *cli.Context) error { @@ -569,7 +651,7 @@ var _ = Describe("Hybrid Overlay Node Linux Operations", func() { linuxNode, okay := n.controller.(*NodeController) Expect(okay).To(BeTrue()) Eventually(func() bool { - return atomic.LoadUint32(&linuxNode.initialized) == 1 + return atomic.LoadUint32(&linuxNode.initialized) == 2 }, 2).Should(BeTrue()) Eventually(fexec.CalledMatchesExpected, 2).Should(BeTrue(), fexec.ErrorDesc) @@ -668,7 +750,7 @@ var _ = Describe("Hybrid Overlay Node Linux Operations", func() { linuxNode, okay := n.controller.(*NodeController) Expect(okay).To(BeTrue()) Eventually(func() bool { - 
return atomic.LoadUint32(&linuxNode.initialized) == 1 + return atomic.LoadUint32(&linuxNode.initialized) == 2 }, 2).Should(BeTrue()) Eventually(fexec.CalledMatchesExpected, 2).Should(BeTrue(), fexec.ErrorDesc) @@ -803,7 +885,7 @@ var _ = Describe("Hybrid Overlay Node Linux Operations", func() { linuxNode, okay := n.controller.(*NodeController) Expect(okay).To(BeTrue()) Eventually(func() bool { - return atomic.LoadUint32(&linuxNode.initialized) == 1 + return atomic.LoadUint32(&linuxNode.initialized) == 2 }, 2).Should(BeTrue()) Eventually(fexec.CalledMatchesExpected, 2).Should(BeTrue(), fexec.ErrorDesc) diff --git a/go-controller/hybrid-overlay/pkg/controller/node_windows.go b/go-controller/hybrid-overlay/pkg/controller/node_windows.go index 61d8d23b7c..903cf3d2a4 100644 --- a/go-controller/hybrid-overlay/pkg/controller/node_windows.go +++ b/go-controller/hybrid-overlay/pkg/controller/node_windows.go @@ -38,6 +38,7 @@ type NodeController struct { func newNodeController(kube kube.Interface, nodeName string, nodeLister listers.NodeLister, + podLister listers.PodLister, ) (nodeController, error) { supportedFeatures := hcn.GetSupportedFeatures() if !supportedFeatures.HostRoute {