Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func newNodeController(kube kube.Interface,
"UDP port. Please make sure you install all the KB updates on your system.")
}

node, err := kube.GetNode(nodeName)
node, err := kube.GetNodeForWindows(nodeName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -345,7 +345,7 @@ func (n *NodeController) initSelf(node *corev1.Node, nodeSubnet *net.IPNet) erro
}

// Add existing nodes
nodes, err := n.kube.GetNodes()
nodes, err := n.kube.GetNodesForWindows()
if err != nil {
return fmt.Errorf("error in initializing/fetching nodes: %v", err)
}
Expand All @@ -370,7 +370,7 @@ func (n *NodeController) uninitSelf(node *corev1.Node) error {
networkName, n.networkID, node.Name)

// Remove existing nodes
nodes, err := n.kube.GetNodes()
nodes, err := n.kube.GetNodesForWindows()
if err != nil {
return fmt.Errorf("failed to get nodes: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (n *NodeController) AddNode(node *corev1.Node) error {
} else {
// Make sure the local node has been initialized before adding a hybridOverlay remote node
if atomic.LoadUint32(n.initState) < hotypes.DistributedRouterInitialized {
localNode, err := n.kube.GetNode(n.nodeName)
localNode, err := n.nodeLister.Get(n.nodeName)
if err != nil {
return fmt.Errorf("cannot get local node: %s: %w", n.nodeName, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func (ncc *networkClusterController) Reconcile(netInfo util.NetInfo) error {
klog.Errorf("Failed to reconcile network %s: %v", ncc.GetNetworkName(), err)
}
if reconcilePendingPods && ncc.retryPods != nil {
if err := objretry.RequeuePendingPods(ncc.kube, ncc.GetNetInfo(), ncc.retryPods); err != nil {
if err := objretry.RequeuePendingPods(ncc.watchFactory, ncc.GetNetInfo(), ncc.retryPods); err != nil {
klog.Errorf("Failed to requeue pending pods for network %s: %v", ncc.GetNetworkName(), err)
}
}
Expand Down Expand Up @@ -576,7 +576,7 @@ func (h *networkClusterControllerEventHandler) UpdateResource(oldObj, newObj int
// 1. we missed an add event (bug in kapi informer code)
// 2. a user removed the annotation on the node
// Either way to play it safe for now do a partial json unmarshal check
if !nodeFailed && util.NoHostSubnet(oldNode) != util.NoHostSubnet(newNode) && !h.ncc.nodeAllocator.NeedsNodeAllocation(newNode) {
if !nodeFailed && util.NoHostSubnet(oldNode) == util.NoHostSubnet(newNode) && !h.ncc.nodeAllocator.NeedsNodeAllocation(newNode) {
// no other node updates would require us to reconcile again
return nil
}
Expand Down
17 changes: 7 additions & 10 deletions go-controller/pkg/clustermanager/node/node_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,27 +195,24 @@ func (na *NodeAllocator) NeedsNodeAllocation(node *corev1.Node) bool {
}

// ovn node check
// allocation is all or nothing, so if one field was allocated from:
// nodeSubnets, joinSubnet, layer 2 tunnel id, then all of them were
if na.hasNodeSubnetAllocation() {
if util.HasNodeHostSubnetAnnotation(node, na.netInfo.GetNetworkName()) {
return false
if !util.HasNodeHostSubnetAnnotation(node, na.netInfo.GetNetworkName()) {
return true
}
}

if na.hasJoinSubnetAllocation() {
if util.HasNodeGatewayRouterJoinNetwork(node, na.netInfo.GetNetworkName()) {
return false
if !util.HasNodeGatewayRouterJoinNetwork(node, na.netInfo.GetNetworkName()) {
return true
}
}

if util.IsNetworkSegmentationSupportEnabled() && na.netInfo.IsPrimaryNetwork() && util.DoesNetworkRequireTunnelIDs(na.netInfo) {
if util.HasUDNLayer2NodeGRLRPTunnelID(node, na.netInfo.GetNetworkName()) {
return false
if !util.HasUDNLayer2NodeGRLRPTunnelID(node, na.netInfo.GetNetworkName()) {
return true
}
}

return true
return false

}

Expand Down
34 changes: 34 additions & 0 deletions go-controller/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ const DefaultVXLANPort = 4789

const DefaultDBTxnTimeout = time.Second * 100

// DefaultEphemeralPortRange is used for unit testing only
const DefaultEphemeralPortRange = "32768-60999"

// The following are global config parameters that other modules may access directly
var (
// Build information. Populated at build-time.
Expand Down Expand Up @@ -494,6 +497,10 @@ type GatewayConfig struct {
DisableForwarding bool `gcfg:"disable-forwarding"`
// AllowNoUplink (disabled by default) controls if the external gateway bridge without an uplink port is allowed in local gateway mode.
AllowNoUplink bool `gcfg:"allow-no-uplink"`
// EphemeralPortRange is the range of ports used by egress SNAT operations in OVN. Specifically for NAT where
// the source IP of the NAT will be a shared Node IP address. If unset, the value will be determined by sysctl lookup
// for the kernel's ephemeral range: net.ipv4.ip_local_port_range. Format is "<min port>-<max port>".
EphemeralPortRange string `gfcg:"ephemeral-port-range"`
}

// OvnAuthConfig holds client authentication and location details for
Expand Down Expand Up @@ -664,6 +671,9 @@ func PrepareTestConfig() error {
Kubernetes.DisableRequestedChassis = false
EnableMulticast = false
Default.OVSDBTxnTimeout = 5 * time.Second
if Gateway.Mode != GatewayModeDisabled {
Gateway.EphemeralPortRange = DefaultEphemeralPortRange
}

if err := completeConfig(); err != nil {
return err
Expand Down Expand Up @@ -1509,6 +1519,14 @@ var OVNGatewayFlags = []cli.Flag{
Usage: "Allow the external gateway bridge without an uplink port in local gateway mode",
Destination: &cliConfig.Gateway.AllowNoUplink,
},
&cli.StringFlag{
Name: "ephemeral-port-range",
Usage: "The port range in '<min port>-<max port>' format for OVN to use when SNAT'ing to a node IP. " +
"This range should not collide with the node port range being used in Kubernetes. If not provided, " +
"the default value will be derived from checking the sysctl value of net.ipv4.ip_local_port_range on the node.",
Destination: &cliConfig.Gateway.EphemeralPortRange,
Value: Gateway.EphemeralPortRange,
},
// Deprecated CLI options
&cli.BoolFlag{
Name: "init-gateways",
Expand Down Expand Up @@ -1917,6 +1935,19 @@ func buildGatewayConfig(ctx *cli.Context, cli, file *config) error {
if !found {
return fmt.Errorf("invalid gateway mode %q: expect one of %s", string(Gateway.Mode), strings.Join(validModes, ","))
}

if len(Gateway.EphemeralPortRange) > 0 {
if !isValidEphemeralPortRange(Gateway.EphemeralPortRange) {
return fmt.Errorf("invalid ephemeral-port-range, should be in the format <min port>-<max port>")
}
} else {
// auto-detect ephermal range
portRange, err := getKernelEphemeralPortRange()
if err != nil {
return fmt.Errorf("unable to auto-detect ephemeral port range to use with OVN")
}
Gateway.EphemeralPortRange = portRange
}
}

// Options are only valid if Mode is not disabled
Expand All @@ -1927,6 +1958,9 @@ func buildGatewayConfig(ctx *cli.Context, cli, file *config) error {
if Gateway.NextHop != "" {
return fmt.Errorf("gateway next-hop option %q not allowed when gateway is disabled", Gateway.NextHop)
}
if len(Gateway.EphemeralPortRange) > 0 {
return fmt.Errorf("gateway ephemeral port range option not allowed when gateway is disabled")
}
}

if Gateway.Mode != GatewayModeShared && Gateway.VLANID != 0 {
Expand Down
48 changes: 48 additions & 0 deletions go-controller/pkg/config/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package config
import (
"fmt"
"net"
"os"
"reflect"
"regexp"
"strconv"
"strings"

Expand Down Expand Up @@ -328,3 +330,49 @@ func AllocateV6MasqueradeIPs(masqueradeSubnetNetworkAddress net.IP, masqueradeIP
}
return nil
}

func isValidEphemeralPortRange(s string) bool {
// Regex to match "<number>-<number>" with no extra characters
re := regexp.MustCompile(`^(\d{1,5})-(\d{1,5})$`)
matches := re.FindStringSubmatch(s)
if matches == nil {
return false
}

minPort, err1 := strconv.Atoi(matches[1])
maxPort, err2 := strconv.Atoi(matches[2])
if err1 != nil || err2 != nil {
return false
}

// Port numbers must be in the 1-65535 range
if minPort < 1 || minPort > 65535 || maxPort < 0 || maxPort > 65535 {
return false
}

return maxPort > minPort
}

func getKernelEphemeralPortRange() (string, error) {
data, err := os.ReadFile("/proc/sys/net/ipv4/ip_local_port_range")
if err != nil {
return "", fmt.Errorf("failed to read port range: %w", err)
}

parts := strings.Fields(string(data))
if len(parts) != 2 {
return "", fmt.Errorf("unexpected format: %q", string(data))
}

minPort, err := strconv.Atoi(parts[0])
if err != nil {
return "", fmt.Errorf("invalid min port: %w", err)
}

maxPort, err := strconv.Atoi(parts[1])
if err != nil {
return "", fmt.Errorf("invalid max port: %w", err)
}

return fmt.Sprintf("%d-%d", minPort, maxPort), nil
}
3 changes: 2 additions & 1 deletion go-controller/pkg/controllermanager/controller_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
libovsdbops "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdb/ops"
libovsdbutil "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdb/util"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/metrics"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/metrics/recorders"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/networkmanager"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/observability"
Expand Down Expand Up @@ -438,7 +439,7 @@ func (cm *ControllerManager) Start(ctx context.Context) error {
// with k=10,
// for a cluster with 10 nodes, measurement of 1 in every 100 requests
// for a cluster with 100 nodes, measurement of 1 in every 1000 requests
metrics.GetConfigDurationRecorder().Run(cm.nbClient, cm.kube, 10, time.Second*5, cm.stopChan)
recorders.GetConfigDurationRecorder().Run(cm.nbClient, cm.watchFactory, 10, time.Second*5, cm.stopChan)
}
cm.podRecorder.Run(cm.sbClient, cm.stopChan)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ var _ = Describe("Healthcheck tests", func() {
},
}
nodeList := []*corev1.Node{node}
factoryMock.On("GetNode", nodeName).Return(nodeList[0], nil)
factoryMock.On("GetNodeForWindows", nodeName).Return(nodeList[0], nil)
factoryMock.On("GetNodes").Return(nodeList, nil)
factoryMock.On("UserDefinedNetworkInformer").Return(nil)
factoryMock.On("ClusterUserDefinedNetworkInformer").Return(nil)
Expand Down
2 changes: 1 addition & 1 deletion go-controller/pkg/kube/annotator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var _ = Describe("Annotator", func() {
err := nodeAnnot.Run()
Expect(err).ToNot(HaveOccurred())

node, err := kube.GetNode(nodeName)
node, err := kube.GetNodeForWindows(nodeName)
Expect(err).ToNot(HaveOccurred())

// should contain initial annotations
Expand Down
57 changes: 13 additions & 44 deletions go-controller/pkg/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
Expand Down Expand Up @@ -62,12 +61,11 @@ type Interface interface {
PatchNode(old, new *corev1.Node) error
UpdateNodeStatus(node *corev1.Node) error
UpdatePodStatus(pod *corev1.Pod) error
GetAnnotationsOnPod(namespace, name string) (map[string]string, error)
GetNodes() ([]*corev1.Node, error)
GetNamespaces(labelSelector metav1.LabelSelector) ([]*corev1.Namespace, error)
GetPods(namespace string, opts metav1.ListOptions) ([]*corev1.Pod, error)
GetPod(namespace, name string) (*corev1.Pod, error)
GetNode(name string) (*corev1.Node, error)
// GetPodsForDBChecker should only be used by legacy DB checker. Use watchFactory instead to get pods.
GetPodsForDBChecker(namespace string, opts metav1.ListOptions) ([]*corev1.Pod, error)
// GetNodeForWindows should only be used for windows hybrid overlay binary and never in linux code
GetNodeForWindows(name string) (*corev1.Node, error)
GetNodesForWindows() ([]*corev1.Node, error)
Events() kv1core.EventInterface
}

Expand Down Expand Up @@ -201,7 +199,7 @@ func (k *Kube) SetAnnotationsOnService(namespace, name string, annotations map[s

// SetTaintOnNode tries to add a new taint to the node. If the taint already exists, it doesn't do anything.
func (k *Kube) SetTaintOnNode(nodeName string, taint *corev1.Taint) error {
node, err := k.GetNode(nodeName)
node, err := k.GetNodeForWindows(nodeName)
if err != nil {
klog.Errorf("Unable to retrieve node %s for tainting %s: %v", nodeName, taint.ToString(), err)
return err
Expand Down Expand Up @@ -234,7 +232,7 @@ func (k *Kube) SetTaintOnNode(nodeName string, taint *corev1.Taint) error {
// RemoveTaintFromNode removes all the taints that have the same key and effect from the node.
// If the taint doesn't exist, it doesn't do anything.
func (k *Kube) RemoveTaintFromNode(nodeName string, taint *corev1.Taint) error {
node, err := k.GetNode(nodeName)
node, err := k.GetNodeForWindows(nodeName)
if err != nil {
klog.Errorf("Unable to retrieve node %s for tainting %s: %v", nodeName, taint.ToString(), err)
return err
Expand Down Expand Up @@ -324,32 +322,8 @@ func (k *Kube) UpdatePodStatus(pod *corev1.Pod) error {
return err
}

// GetAnnotationsOnPod obtains the pod annotations from kubernetes apiserver, given the name and namespace
func (k *Kube) GetAnnotationsOnPod(namespace, name string) (map[string]string, error) {
pod, err := k.KClient.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return nil, err
}
return pod.ObjectMeta.Annotations, nil
}

// GetNamespaces returns the list of all Namespace objects matching the labelSelector
func (k *Kube) GetNamespaces(labelSelector metav1.LabelSelector) ([]*corev1.Namespace, error) {
list := []*corev1.Namespace{}
err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
return k.KClient.CoreV1().Namespaces().List(ctx, opts)
}).EachListItem(context.TODO(), metav1.ListOptions{
LabelSelector: labels.Set(labelSelector.MatchLabels).String(),
ResourceVersion: "0",
}, func(obj runtime.Object) error {
list = append(list, obj.(*corev1.Namespace))
return nil
})
return list, err
}

// GetPods returns the list of all Pod objects in a namespace matching the options
func (k *Kube) GetPods(namespace string, opts metav1.ListOptions) ([]*corev1.Pod, error) {
// GetPodsForDBChecker returns the list of all Pod objects in a namespace matching the options. Only used by the legacy db checker.
func (k *Kube) GetPodsForDBChecker(namespace string, opts metav1.ListOptions) ([]*corev1.Pod, error) {
list := []*corev1.Pod{}
opts.ResourceVersion = "0"
err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
Expand All @@ -361,13 +335,8 @@ func (k *Kube) GetPods(namespace string, opts metav1.ListOptions) ([]*corev1.Pod
return list, err
}

// GetPod obtains the pod from kubernetes apiserver, given the name and namespace
func (k *Kube) GetPod(namespace, name string) (*corev1.Pod, error) {
return k.KClient.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
}

// GetNodes returns the list of all Node objects from kubernetes
func (k *Kube) GetNodes() ([]*corev1.Node, error) {
// GetNodesForWindows returns the list of all Node objects from kubernetes. Only used by windows binary.
func (k *Kube) GetNodesForWindows() ([]*corev1.Node, error) {
list := []*corev1.Node{}
err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
return k.KClient.CoreV1().Nodes().List(ctx, opts)
Expand All @@ -380,8 +349,8 @@ func (k *Kube) GetNodes() ([]*corev1.Node, error) {
return list, err
}

// GetNode returns the Node resource from kubernetes apiserver, given its name
func (k *Kube) GetNode(name string) (*corev1.Node, error) {
// GetNodeForWindows returns the Node resource from kubernetes apiserver, given its name. Only used by windows binary.
func (k *Kube) GetNodeForWindows(name string) (*corev1.Node, error) {
return k.KClient.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
}

Expand Down
2 changes: 1 addition & 1 deletion go-controller/pkg/kube/kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ var _ = Describe("Kube", func() {
err := kube.SetTaintOnNode(node.Name, &taint)
Expect(err).ToNot(HaveOccurred())

updatedNode, err := kube.GetNode(node.Name)
updatedNode, err := kube.GetNodeForWindows(node.Name)
Expect(err).ToNot(HaveOccurred())
Expect(updatedNode.Spec.Taints).To(Equal([]corev1.Taint{taint}))
})
Expand Down
Loading