Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go-controller/hybrid-overlay/pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ func NewNode(
) (*Node, error) {

nodeLister := listers.NewNodeLister(nodeInformer.GetIndexer())
podLister := listers.NewPodLister(podInformer.GetIndexer())

controller, err := newNodeController(kube, nodeName, nodeLister)
controller, err := newNodeController(kube, nodeName, nodeLister, podLister)
if err != nil {
return nil, err
}
Expand Down
25 changes: 23 additions & 2 deletions go-controller/hybrid-overlay/pkg/controller/node_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ type flowCacheEntry struct {
// NodeController is the node hybrid overlay controller
type NodeController struct {
nodeName string
// an atomic uint32 for testing purposes 0 = uninitialized and 1 = initialized
// an atomic uint32 to check the initialization status
// 0 = not initialized
// 1 = drIP and drMAC are set but the initial pods are not wired for hybrid overlay
// 2 = drIP and drMAC are set and the intiial pods are wired for hybrid overlay
initialized uint32
drMAC net.HardwareAddr
drIP net.IP
Expand All @@ -55,6 +58,7 @@ type NodeController struct {
flowChan chan struct{}

nodeLister listers.NodeLister
podLister listers.PodLister
}

// newNodeController returns a node handler that listens for node events
Expand All @@ -66,6 +70,7 @@ func newNodeController(
_ kube.Interface,
nodeName string,
nodeLister listers.NodeLister,
podLister listers.PodLister,
) (nodeController, error) {

node := &NodeController{
Expand All @@ -75,7 +80,9 @@ func newNodeController(
flowMutex: sync.Mutex{},
flowChan: make(chan struct{}, 1),
nodeLister: nodeLister,
podLister: podLister,
}
atomic.StoreUint32(&node.initialized, 0)
return node, nil
}

Expand Down Expand Up @@ -269,6 +276,20 @@ func (n *NodeController) AddNode(node *kapi.Node) error {
klog.Infof("Add hybridOverlay Node %s", node.Name)
err = n.hybridOverlayNodeUpdate(node)
}
if atomic.LoadUint32(&n.initialized) == 1 {
pods, err := n.podLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("cannot fully initialize node %s for hybrid overlay, cannot list pods: %v", n.nodeName, err)
}

for _, pod := range pods {
err := n.AddPod(pod)
if err != nil {
klog.Errorf("Cannot wire pod %s for hybrid overlay, %v", pod.Name, err)
}
}
atomic.StoreUint32(&n.initialized, 2)
}
return err
}

Expand Down Expand Up @@ -476,7 +497,7 @@ func (n *NodeController) handleHybridOverlayMACIPChange(node *kapi.Node) error {

// EnsureHybridOverlayBridge sets up the hybrid overlay bridge
func (n *NodeController) EnsureHybridOverlayBridge(node *kapi.Node) error {
if atomic.LoadUint32(&n.initialized) == 1 {
if atomic.LoadUint32(&n.initialized) >= 1 {
if node.Annotations[hotypes.HybridOverlayDRIP] != n.drIP.String() ||
node.Annotations[hotypes.HybridOverlayDRMAC] != n.drMAC.String() {
if err := n.handleHybridOverlayMACIPChange(node); err != nil {
Expand Down
92 changes: 87 additions & 5 deletions go-controller/hybrid-overlay/pkg/controller/node_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/urfave/cli/v2"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -425,7 +426,7 @@ var _ = Describe("Hybrid Overlay Node Linux Operations", func() {
linuxNode, okay := n.controller.(*NodeController)
Expect(okay).To(BeTrue())
Eventually(func() bool {
return atomic.LoadUint32(&linuxNode.initialized) == 1
return atomic.LoadUint32(&linuxNode.initialized) == 2
}, 2).Should(BeTrue())

Eventually(fexec.CalledMatchesExpected, 2).Should(BeTrue(), fexec.ErrorDesc)
Expand Down Expand Up @@ -483,7 +484,7 @@ var _ = Describe("Hybrid Overlay Node Linux Operations", func() {
linuxNode, okay := n.controller.(*NodeController)
Expect(okay).To(BeTrue())
Eventually(func() bool {
return atomic.LoadUint32(&linuxNode.initialized) == 1
return atomic.LoadUint32(&linuxNode.initialized) == 2
}, 2).Should(BeTrue())

Eventually(fexec.CalledMatchesExpected, 2).Should(BeTrue(), fexec.ErrorDesc)
Expand Down Expand Up @@ -516,6 +517,87 @@ var _ = Describe("Hybrid Overlay Node Linux Operations", func() {
}
appRun(app)
})
ovntest.OnSupportedPlatformsIt("on startup will add a local linux pod that times out on the initial addPod event", func() {
app.Action = func(ctx *cli.Context) error {
const (
pod1IP string = "1.2.3.5"
pod1CIDR string = pod1IP + "/24"
pod1MAC string = "aa:bb:cc:dd:ee:ff"
)

annotations := createNodeAnnotationsForSubnet(thisNodeSubnet)
annotations[hotypes.HybridOverlayDRMAC] = thisNodeDRMAC
annotations["k8s.ovn.org/node-gateway-router-lrp-ifaddr"] = "{\"ipv4\":\"100.64.0.3/16\"}"
annotations[hotypes.HybridOverlayDRIP] = thisNodeDRIP
node := createNode(thisNode, "linux", thisNodeIP, annotations)
testPod := createPod("test", "pod1", thisNode, pod1CIDR, pod1MAC)
fakeClient := fake.NewSimpleClientset(
//&v1.NodeList{
// Items: []v1.Node{*node},
//},
&v1.PodList{
Items: []v1.Pod{*testPod},
},
)

// Node setup from initial node sync
addNodeSetupCmds(fexec, thisNode)
_, err := config.InitConfig(ctx, fexec, nil)
Expect(err).NotTo(HaveOccurred())

f := informers.NewSharedInformerFactory(fakeClient, informer.DefaultResyncInterval)

n, err := NewNode(
&kube.Kube{KClient: fakeClient},
thisNode,
f.Core().V1().Nodes().Informer(),
f.Core().V1().Pods().Informer(),
informer.NewTestEventHandler,
)
Expect(err).NotTo(HaveOccurred())

addEnsureHybridOverlayBridgeMocks(nlMock, thisNodeDRIP, "")
// initial flowSync
addSyncFlows(fexec)
// flowsync after EnsureHybridOverlayBridge()
addSyncFlows(fexec)
addSyncFlows(fexec)

f.Start(stopChan)
wg.Add(1)
go func() {
defer wg.Done()
n.Run(stopChan)
}()

linuxNode, okay := n.controller.(*NodeController)
Expect(okay).To(BeTrue())
time.Sleep(2 * time.Second)
_, err = fakeClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())

Eventually(func() bool {
return atomic.LoadUint32(&linuxNode.initialized) == 2
}, 2).Should(BeTrue())

initialFlowCache := map[string]*flowCacheEntry{
"0x0": generateInitialFlowCacheEntry(mgmtIfAddr.IP.String(), thisNodeDRIP, thisNodeDRMAC),
}

initialFlowCache[podIPToCookie(net.ParseIP(pod1IP))] = &flowCacheEntry{
flows: []string{"table=10,cookie=0x" + podIPToCookie(net.ParseIP(pod1IP)) + ",priority=100,ip,nw_dst=" + pod1IP + ",actions=set_field:" + thisNodeDRMAC + "->eth_src,set_field:" + pod1MAC + "->eth_dst,output:ext"},
ignoreLearn: true,
}
Eventually(func() error {
linuxNode.flowMutex.Lock()
defer linuxNode.flowMutex.Unlock()
return compareFlowCache(linuxNode.flowCache, initialFlowCache)
}, 2).Should(BeNil())
Eventually(fexec.CalledMatchesExpected, 2).Should(BeTrue(), fexec.ErrorDesc)
return nil
}
appRun(app)
})

ovntest.OnSupportedPlatformsIt("sets up tunnels for Windows nodes", func() {
app.Action = func(ctx *cli.Context) error {
Expand Down Expand Up @@ -569,7 +651,7 @@ var _ = Describe("Hybrid Overlay Node Linux Operations", func() {
linuxNode, okay := n.controller.(*NodeController)
Expect(okay).To(BeTrue())
Eventually(func() bool {
return atomic.LoadUint32(&linuxNode.initialized) == 1
return atomic.LoadUint32(&linuxNode.initialized) == 2
}, 2).Should(BeTrue())

Eventually(fexec.CalledMatchesExpected, 2).Should(BeTrue(), fexec.ErrorDesc)
Expand Down Expand Up @@ -668,7 +750,7 @@ var _ = Describe("Hybrid Overlay Node Linux Operations", func() {
linuxNode, okay := n.controller.(*NodeController)
Expect(okay).To(BeTrue())
Eventually(func() bool {
return atomic.LoadUint32(&linuxNode.initialized) == 1
return atomic.LoadUint32(&linuxNode.initialized) == 2
}, 2).Should(BeTrue())

Eventually(fexec.CalledMatchesExpected, 2).Should(BeTrue(), fexec.ErrorDesc)
Expand Down Expand Up @@ -803,7 +885,7 @@ var _ = Describe("Hybrid Overlay Node Linux Operations", func() {
linuxNode, okay := n.controller.(*NodeController)
Expect(okay).To(BeTrue())
Eventually(func() bool {
return atomic.LoadUint32(&linuxNode.initialized) == 1
return atomic.LoadUint32(&linuxNode.initialized) == 2
}, 2).Should(BeTrue())

Eventually(fexec.CalledMatchesExpected, 2).Should(BeTrue(), fexec.ErrorDesc)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type NodeController struct {
func newNodeController(kube kube.Interface,
nodeName string,
nodeLister listers.NodeLister,
podLister listers.PodLister,
) (nodeController, error) {
supportedFeatures := hcn.GetSupportedFeatures()
if !supportedFeatures.HostRoute {
Expand Down