Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions go-controller/pkg/clustermanager/network_cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,32 +203,36 @@ func (ncc *networkClusterController) syncNodeClusterSubnet(node *corev1.Node) er
klog.Warningf("Failed to get node %s host subnets annotations for network %s : %v", node.Name, ncc.networkName, err)
}

// On return validExistingSubnets will contain any valid subnets that
// were already assigned to the node. allocatedSubnets will contain
// any newly allocated subnets required to ensure that the node has one subnet
// from each enabled IP family.
ipv4Mode, ipv6Mode := ncc.IPMode()
hostSubnets, allocatedSubnets, err := ncc.clusterSubnetAllocator.AllocateNodeSubnets(node.Name, existingSubnets, ipv4Mode, ipv6Mode)
validExistingSubnets, allocatedSubnets, err := ncc.clusterSubnetAllocator.AllocateNodeSubnets(node.Name, existingSubnets, ipv4Mode, ipv6Mode)
if err != nil {
return err
}

if len(allocatedSubnets) == 0 {
return nil
}

// Release the allocation on error
defer func() {
// If the existing subnets weren't OK, or new ones were allocated, update the node annotation.
// This happens in a couple cases:
// 1) new node: no existing subnets and one or more new subnets were allocated
// 2) dual-stack to single-stack conversion: two existing subnets but only one will be valid, and no allocated subnets
// 3) bad subnet annotation: one or more existing subnets will be invalid and a correct one might have been allocated
if len(existingSubnets) != len(validExistingSubnets) || len(allocatedSubnets) > 0 {
updatedSubnetsMap := map[string][]*net.IPNet{ncc.networkName: validExistingSubnets}
err = ncc.updateNodeSubnetAnnotationWithRetry(node.Name, updatedSubnetsMap)
if err != nil {
if errR := ncc.clusterSubnetAllocator.ReleaseNodeSubnets(node.Name, allocatedSubnets...); errR != nil {
klog.Warningf("Error releasing node %s subnets: %v", node.Name, errR)
}
return err
}
}()

hostSubnetsMap := map[string][]*net.IPNet{ncc.networkName: hostSubnets}
}

err = ncc.updateNodeSubnetAnnotationWithRetry(node.Name, hostSubnetsMap)
return err
return nil
}

// handleAddUpdateNodeEvent handles the delete node event
// handleDeleteNode handles the delete node event
func (ncc *networkClusterController) handleDeleteNode(node *corev1.Node) error {
if ncc.enableHybridOverlaySubnetAllocator {
ncc.releaseHybridOverlayNodeSubnet(node.Name)
Expand Down Expand Up @@ -260,7 +264,7 @@ func (ncc *networkClusterController) syncNodes(nodes []interface{}) error {
} else if hostSubnet != nil {
klog.V(5).Infof("Node %s contains subnets: %v", node.Name, hostSubnet)
if err := ncc.hybridOverlaySubnetAllocator.MarkSubnetsAllocated(node.Name, hostSubnet); err != nil {
klog.Errorf("Failed to mark the subnet %v as allocated in the hybrid subnet allocator for node %s: %w", hostSubnet, node.Name, err)
klog.Errorf("Failed to mark the subnet %v as allocated in the hybrid subnet allocator for node %s: %v", hostSubnet, node.Name, err)
}
}
}
Expand All @@ -269,7 +273,7 @@ func (ncc *networkClusterController) syncNodes(nodes []interface{}) error {
if len(hostSubnets) > 0 {
klog.V(5).Infof("Node %s contains subnets: %v for network : %s", node.Name, hostSubnets, ncc.networkName)
if err := ncc.clusterSubnetAllocator.MarkSubnetsAllocated(node.Name, hostSubnets...); err != nil {
klog.Errorf("Failed to mark the subnet %v as allocated in the cluster subnet allocator for node %s: %w", hostSubnets, node.Name, err)
klog.Errorf("Failed to mark the subnet %v as allocated in the cluster subnet allocator for node %s: %v", hostSubnets, node.Name, err)
}
} else {
klog.V(5).Infof("Node %s contains no subnets for network : %s", node.Name, ncc.networkName)
Expand Down
203 changes: 203 additions & 0 deletions go-controller/pkg/clustermanager/network_cluster_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package clustermanager

import (
"context"
"net"
"sync"

"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
"github.com/urfave/cli/v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory"
ovntest "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/testing"
ovntypes "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
)

// Validates the cluster manager's per-network host subnet reconciliation:
// on controller startup the node's "k8s.ovn.org/node-subnets" annotation
// must be brought in line with the configured cluster subnets — missing
// subnets allocated, and invalid or no-longer-needed ones removed.
var _ = ginkgo.Describe("Network Cluster Controller", func() {
	var (
		app      *cli.App              // CLI harness used to parse the test's config flags
		f        *factory.WatchFactory // assigned inside each It; shut down in AfterEach
		stopChan chan struct{}
		wg       *sync.WaitGroup
	)

	ginkgo.BeforeEach(func() {
		// Restore global default config values before each testcase.
		gomega.Expect(config.PrepareTestConfig()).To(gomega.Succeed())

		app = cli.NewApp()
		app.Name = "test"
		app.Flags = config.Flags
		stopChan = make(chan struct{})
		wg = &sync.WaitGroup{}
	})

	ginkgo.AfterEach(func() {
		close(stopChan)
		// f is only set once a testcase gets past the watch-factory setup,
		// so guard against a nil shutdown.
		if f != nil {
			f.Shutdown()
		}
		wg.Wait()
	})

	ginkgo.Context("Host Subnets", func() {
		ginkgo.It("removes an unused dual-stack allocation from single-stack cluster", func() {
			app.Action = func(ctx *cli.Context) error {
				// Node starts with both a v4 and a v6 subnet annotation, but
				// the test config (default) is single-stack v4 — the v6
				// subnet should be dropped by the controller.
				nodes := []v1.Node{
					{
						ObjectMeta: metav1.ObjectMeta{
							Name: "node1",
							Annotations: map[string]string{
								"k8s.ovn.org/node-subnets": "{\"default\":[\"10.128.0.0/24\", \"fd02:0:0:2::2895/64\"]}",
							},
						},
					},
				}
				kubeFakeClient := fake.NewSimpleClientset(&v1.NodeList{
					Items: nodes,
				})
				fakeClient := &util.OVNClusterManagerClientset{
					KubeClient: kubeFakeClient,
				}

				_, err := config.InitConfig(ctx, nil, nil)
				gomega.Expect(err).NotTo(gomega.HaveOccurred())
				config.Kubernetes.HostNetworkNamespace = ""

				f, err = factory.NewClusterManagerWatchFactory(fakeClient)
				gomega.Expect(err).NotTo(gomega.HaveOccurred())
				err = f.Start()
				gomega.Expect(err).NotTo(gomega.HaveOccurred())

				ncc := newNetworkClusterController(ovntypes.DefaultNetworkName, config.Default.ClusterSubnets,
					fakeClient, f, false, &util.DefaultNetInfo{}, &util.DefaultNetConfInfo{})
				ncc.Start(ctx.Context)
				defer ncc.Stop()

				// Check that the default network controller removes the unused v6 node subnet annotation.
				gomega.Eventually(func() ([]*net.IPNet, error) {
					updatedNode, err := fakeClient.KubeClient.CoreV1().Nodes().Get(context.TODO(), nodes[0].Name, metav1.GetOptions{})
					if err != nil {
						return nil, err
					}

					return util.ParseNodeHostSubnetAnnotation(updatedNode, ovntypes.DefaultNetworkName)
				}, 2).Should(gomega.Equal(ovntest.MustParseIPNets("10.128.0.0/24")))

				return nil
			}

			err := app.Run([]string{
				app.Name,
			})
			gomega.Expect(err).NotTo(gomega.HaveOccurred())
		})

		ginkgo.It("allocates a subnet for a new node", func() {
			app.Action = func(ctx *cli.Context) error {
				// Node has no subnet annotation at all; the controller must
				// allocate one from the configured cluster subnets.
				nodes := []v1.Node{
					{
						ObjectMeta: metav1.ObjectMeta{
							Name: "node1",
						},
					},
				}
				kubeFakeClient := fake.NewSimpleClientset(&v1.NodeList{
					Items: nodes,
				})
				fakeClient := &util.OVNClusterManagerClientset{
					KubeClient: kubeFakeClient,
				}

				_, err := config.InitConfig(ctx, nil, nil)
				gomega.Expect(err).NotTo(gomega.HaveOccurred())
				config.Kubernetes.HostNetworkNamespace = ""

				f, err = factory.NewClusterManagerWatchFactory(fakeClient)
				gomega.Expect(err).NotTo(gomega.HaveOccurred())
				err = f.Start()
				gomega.Expect(err).NotTo(gomega.HaveOccurred())

				ncc := newNetworkClusterController(ovntypes.DefaultNetworkName, config.Default.ClusterSubnets,
					fakeClient, f, false, &util.DefaultNetInfo{}, &util.DefaultNetConfInfo{})
				ncc.Start(ctx.Context)
				defer ncc.Stop()

				// Check that the default network controller adds a subnet for the new node.
				// 10.128.0.0/23 is presumably the first subnet carved out of the
				// default test cluster CIDR — TODO confirm against PrepareTestConfig defaults.
				gomega.Eventually(func() ([]*net.IPNet, error) {
					updatedNode, err := fakeClient.KubeClient.CoreV1().Nodes().Get(context.TODO(), nodes[0].Name, metav1.GetOptions{})
					if err != nil {
						return nil, err
					}

					return util.ParseNodeHostSubnetAnnotation(updatedNode, ovntypes.DefaultNetworkName)
				}, 2).Should(gomega.Equal(ovntest.MustParseIPNets("10.128.0.0/23")))

				return nil
			}

			err := app.Run([]string{
				app.Name,
			})
			gomega.Expect(err).NotTo(gomega.HaveOccurred())
		})

		ginkgo.It("removes an invalid single-stack annotation", func() {
			app.Action = func(ctx *cli.Context) error {
				// Node carries one valid v4 subnet plus 1.2.3.0/24, which lies
				// outside the configured cluster subnets and must be removed.
				nodes := []v1.Node{
					{
						ObjectMeta: metav1.ObjectMeta{
							Name: "node1",
							Annotations: map[string]string{
								"k8s.ovn.org/node-subnets": "{\"default\":[\"10.128.0.0/24\", \"1.2.3.0/24\"]}",
							},
						},
					},
				}
				kubeFakeClient := fake.NewSimpleClientset(&v1.NodeList{
					Items: nodes,
				})
				fakeClient := &util.OVNClusterManagerClientset{
					KubeClient: kubeFakeClient,
				}

				_, err := config.InitConfig(ctx, nil, nil)
				gomega.Expect(err).NotTo(gomega.HaveOccurred())
				config.Kubernetes.HostNetworkNamespace = ""

				f, err = factory.NewClusterManagerWatchFactory(fakeClient)
				gomega.Expect(err).NotTo(gomega.HaveOccurred())
				err = f.Start()
				gomega.Expect(err).NotTo(gomega.HaveOccurred())

				ncc := newNetworkClusterController(ovntypes.DefaultNetworkName, config.Default.ClusterSubnets,
					fakeClient, f, false, &util.DefaultNetInfo{}, &util.DefaultNetConfInfo{})
				ncc.Start(ctx.Context)
				defer ncc.Stop()

				// Check that the default network controller removes the invalid
				// subnet from the node annotation, keeping only the valid one.
				gomega.Eventually(func() ([]*net.IPNet, error) {
					updatedNode, err := fakeClient.KubeClient.CoreV1().Nodes().Get(context.TODO(), nodes[0].Name, metav1.GetOptions{})
					if err != nil {
						return nil, err
					}

					return util.ParseNodeHostSubnetAnnotation(updatedNode, ovntypes.DefaultNetworkName)
				}, 2).Should(gomega.Equal(ovntest.MustParseIPNets("10.128.0.0/24")))

				return nil
			}

			err := app.Run([]string{
				app.Name,
			})
			gomega.Expect(err).NotTo(gomega.HaveOccurred())
		})
	})
})
21 changes: 12 additions & 9 deletions go-controller/pkg/ovn/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,16 +398,19 @@ func (oc *DefaultNetworkController) addNode(node *kapi.Node) ([]*net.IPNet, erro
return nil, err
}

// OVN can work in single-stack or dual-stack only.
currentHostSubnets := len(hostSubnets)
expectedHostSubnets := 1
// if dual-stack mode we expect one subnet per each IP family
if config.IPv4Mode && config.IPv6Mode {
expectedHostSubnets = 2
// We expect one subnet per configured ClusterNetwork IP family.
var haveV4, haveV6 bool
for _, net := range hostSubnets {
if !haveV4 {
haveV4 = net.IP.To4() != nil
}
if !haveV6 {
haveV6 = net.IP.To4() == nil
}
}

if expectedHostSubnets != currentHostSubnets {
return nil, fmt.Errorf("failed to get expected host subnets for node %s, expected %d subnet(s) but current number of subnet(s) is %d", node.Name, expectedHostSubnets, currentHostSubnets)
if haveV4 != config.IPv4Mode || haveV6 != config.IPv6Mode {
return nil, fmt.Errorf("failed to get expected host subnets for node %s; expected v4 %v have %v, expected v6 %v have %v",
node.Name, config.IPv4Mode, haveV4, config.IPv6Mode, haveV6)
}

gwLRPIPs, err := oc.joinSwIPManager.EnsureJoinLRPIPs(node.Name)
Expand Down
61 changes: 60 additions & 1 deletion go-controller/pkg/ovn/master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ func startFakeController(oc *DefaultNetworkController, wg *sync.WaitGroup) []*ne
return clusterSubnets
}

var _ = ginkgo.Describe("Gateway Init Operations", func() {
var _ = ginkgo.Describe("Default network controller operations", func() {
var (
app *cli.App
f *factory.WatchFactory
Expand Down Expand Up @@ -1456,6 +1456,65 @@ var _ = ginkgo.Describe("Gateway Init Operations", func() {
})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
})

ginkgo.It("reconciles node host subnets after dual-stack to single-stack downgrade", func() {
app.Action = func(ctx *cli.Context) error {
_, err := config.InitConfig(ctx, nil, nil)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

newNodeSubnet := "10.1.1.0/24"
newNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "newNode",
Annotations: map[string]string{
"k8s.ovn.org/node-subnets": fmt.Sprintf("{\"default\":[\"%s\", \"fd02:0:0:2::2895/64\"]}", newNodeSubnet),
},
},
}

_, err = kubeFakeClient.CoreV1().Nodes().Create(context.TODO(), newNode, metav1.CreateOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())

startFakeController(oc, wg)

// check that a node event complaining about the mismatch between
// node subnets and cluster subnets was posted
gomega.Eventually(func() []string {
eventsLock.Lock()
defer eventsLock.Unlock()
eventsCopy := make([]string, 0, len(events))
for _, e := range events {
eventsCopy = append(eventsCopy, e)
}
return eventsCopy
}, 10).Should(gomega.ContainElement(gomega.ContainSubstring("failed to get expected host subnets for node newNode; expected v4 true have true, expected v6 false have true")))

// Simulate the ClusterManager reconciling the node annotations to single-stack
newNode, err = kubeFakeClient.CoreV1().Nodes().Get(context.TODO(), newNode.Name, metav1.GetOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
newNode.Annotations["k8s.ovn.org/node-subnets"] = fmt.Sprintf("{\"default\":[\"%s\"]}", newNodeSubnet)
_, err = kubeFakeClient.CoreV1().Nodes().Update(context.TODO(), newNode, metav1.UpdateOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())

// Ensure that the node's switch is eventually created once the annotations
// are reconciled by the network cluster controller
newNodeLS := &nbdb.LogicalSwitch{Name: newNode.Name}
gomega.Eventually(func() error {
_, err := libovsdbops.GetLogicalSwitch(nbClient, newNodeLS)
return err
}, 10).ShouldNot(gomega.HaveOccurred())

return nil
}

err := app.Run([]string{
app.Name,
"-cluster-subnets=" + clusterCIDR,
"--init-gateways",
"--nodeport",
})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
})
})

func nodeNoHostSubnetAnnotation() map[string]string {
Expand Down