Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go-controller/pkg/node/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func (g *gateway) GetDefaultPodNetworkAdvertised() bool {
// Reconcile handles triggering updates to different components of a gateway, like OFM, Services
func (g *gateway) Reconcile() error {
klog.Info("Reconciling gateway with updates")
if err := g.openflowManager.updateBridgeFlowCache(g.nodeIPManager.ListAddresses()); err != nil {
if err := g.openflowManager.updateBridgeFlowCache(g.nodeIPManager.ListNetworkAddresses(), g.nodeIPManager.ListAddresses()); err != nil {
return err
}
err := g.updateSNATRules()
Expand Down
68 changes: 65 additions & 3 deletions go-controller/pkg/node/gateway_shared_intf.go
Original file line number Diff line number Diff line change
Expand Up @@ -1762,7 +1762,7 @@ func flowsForDefaultBridge(bridge *bridgeConfiguration, extraIPs []net.IP) ([]st
return dftFlows, nil
}

func commonFlows(bridge *bridgeConfiguration) ([]string, error) {
func commonFlows(hostSubnets []*net.IPNet, bridge *bridgeConfiguration) ([]string, error) {
// CAUTION: when adding new flows where the in_port is ofPortPatch and the out_port is ofPortPhys, ensure
// that dl_src is included in match criteria!
ofPortPhys := bridge.ofPortPhys
Expand Down Expand Up @@ -1845,6 +1845,28 @@ func commonFlows(bridge *bridgeConfiguration) ([]string, error) {
"actions=ct(commit, zone=%d, exec(set_field:%s->ct_mark)), output:%s",
defaultOpenFlowCookie, netConfig.ofPortPatch, bridgeMacAddress, config.Default.ConntrackZone,
netConfig.masqCTMark, ofPortPhys))
// IP traffic from the OVN network to the host network should be handled normally by the bridge instead of
// being forwarded directly to the NIC (prio=100 flow above). This allows pods in the default network to reach
// localnet pods on the same node.
for _, hostSubnet := range hostSubnets {
if hostSubnet.IP.To4() == nil {
continue
}
newFlow := fmt.Sprintf("cookie=%s, priority=102, in_port=%s, dl_src=%s, ip, nw_dst=%s, "+
"actions=ct(commit, zone=%d, exec(set_field:%s->ct_mark)), output:NORMAL",
defaultOpenFlowCookie, netConfig.ofPortPatch, bridgeMacAddress, hostSubnet,
config.Default.ConntrackZone, netConfig.masqCTMark)

if config.Gateway.Mode == config.GatewayModeLocal {
newFlow = fmt.Sprintf("cookie=%s, priority=102, in_port=LOCAL, dl_src=%s, ip, nw_dst=%s, "+
"actions=ct(commit, zone=%d, exec(set_field:%s->ct_mark)), output:NORMAL",
defaultOpenFlowCookie, bridgeMacAddress, hostSubnet,
config.Default.ConntrackZone, ctMarkHost)
}

dftFlows = append(dftFlows, newFlow)
}

} else {
// for UDN we additionally SNAT the packet from masquerade IP -> node IP
dftFlows = append(dftFlows,
Expand Down Expand Up @@ -1936,6 +1958,45 @@ func commonFlows(bridge *bridgeConfiguration) ([]string, error) {
fmt.Sprintf("cookie=%s, priority=100, in_port=%s, dl_src=%s, ipv6, "+
"actions=ct(commit, zone=%d, exec(set_field:%s->ct_mark)), output:%s",
defaultOpenFlowCookie, netConfig.ofPortPatch, bridgeMacAddress, config.Default.ConntrackZone, netConfig.masqCTMark, ofPortPhys))

// IP traffic from the OVN network to the host network should be handled normally by the bridge instead of
// being forwarded directly to the NIC (prio=100 flow above). This allows pods in the default network to reach
// localnet pods on the same node.
for _, hostSubnet := range hostSubnets {
if hostSubnet.IP.To4() != nil {
continue
}
newFlow := fmt.Sprintf("cookie=%s, priority=102, in_port=%s, dl_src=%s, ipv6, ipv6_dst=%s, "+
"actions=ct(commit, zone=%d, exec(set_field:%s->ct_mark)), output:NORMAL",
defaultOpenFlowCookie, netConfig.ofPortPatch, bridgeMacAddress, hostSubnet,
config.Default.ConntrackZone, netConfig.masqCTMark)

if config.Gateway.Mode == config.GatewayModeLocal {
newFlow = fmt.Sprintf("cookie=%s, priority=102, in_port=LOCAL, dl_src=%s, ipv6, ipv6_dst=%s, "+
"actions=ct(commit, zone=%d, exec(set_field:%s->ct_mark)), output:NORMAL",
defaultOpenFlowCookie, bridgeMacAddress, hostSubnet,
config.Default.ConntrackZone, ctMarkHost)
}

dftFlows = append(dftFlows, newFlow)
}

for _, icmpType := range []int{types.NeighborSolicitationICMPType, types.NeighborAdvertisementICMPType} {
newFlow := fmt.Sprintf("cookie=%s, priority=102, in_port=%s, dl_src=%s, icmp6, icmpv6_type=%d, "+
"actions=ct(commit, zone=%d, exec(set_field:%s->ct_mark)), output:NORMAL",
defaultOpenFlowCookie, netConfig.ofPortPatch, bridgeMacAddress, icmpType,
config.Default.ConntrackZone, netConfig.masqCTMark)

if config.Gateway.Mode == config.GatewayModeLocal {
newFlow = fmt.Sprintf("cookie=%s, priority=102, in_port=LOCAL, dl_src=%s, icmp6, icmpv6_type=%d, "+
"actions=ct(commit, zone=%d, exec(set_field:%s->ct_mark)), output:NORMAL",
defaultOpenFlowCookie, bridgeMacAddress, icmpType,
config.Default.ConntrackZone, ctMarkHost)
}

dftFlows = append(dftFlows, newFlow)
}

} else {
// for UDN we additionally SNAT the packet from masquerade IP -> node IP
dftFlows = append(dftFlows,
Expand Down Expand Up @@ -2304,6 +2365,7 @@ func newGateway(
}
gw.nodeIPManager = newAddressManager(nodeName, kube, cfg, watchFactory, gwBridge)
nodeIPs := gw.nodeIPManager.ListAddresses()
hostSubnets := gw.nodeIPManager.ListNetworkAddresses()

if config.OvnKubeNode.Mode == types.NodeModeFull {
// Delete stale masquerade resources if there are any. This is to make sure that there
Expand All @@ -2327,15 +2389,15 @@ func newGateway(
}
}

gw.openflowManager, err = newGatewayOpenFlowManager(gwBridge, exGwBridge, nodeIPs)
gw.openflowManager, err = newGatewayOpenFlowManager(gwBridge, exGwBridge, hostSubnets, nodeIPs)
if err != nil {
return err
}

// resync flows on IP change
gw.nodeIPManager.OnChanged = func() {
klog.V(5).Info("Node addresses changed, re-syncing bridge flows")
if err := gw.openflowManager.updateBridgeFlowCache(gw.nodeIPManager.ListAddresses()); err != nil {
if err := gw.openflowManager.updateBridgeFlowCache(gw.nodeIPManager.ListNetworkAddresses(), gw.nodeIPManager.ListAddresses()); err != nil {
// very unlikely - somehow node has lost its IP address
klog.Errorf("Failed to re-generate gateway flows after address change: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go-controller/pkg/node/gateway_udn.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ func (udng *UserDefinedNetworkGateway) doReconcile() error {
// add below OpenFlows based on the gateway mode and whether the network is advertised or not:
// table=1, n_packets=0, n_bytes=0, priority=16,ip,nw_dst=128.192.0.2 actions=LOCAL (Both gateway modes)
// table=1, n_packets=0, n_bytes=0, priority=15,ip,nw_dst=128.192.0.0/14 actions=output:3 (shared gateway mode)
if err := udng.openflowManager.updateBridgeFlowCache(udng.nodeIPManager.ListAddresses()); err != nil {
if err := udng.openflowManager.updateBridgeFlowCache(udng.nodeIPManager.ListNetworkAddresses(), udng.nodeIPManager.ListAddresses()); err != nil {
return fmt.Errorf("error while updating logical flow for UDN %s: %s", udng.GetNetworkName(), err)
}

Expand Down
12 changes: 6 additions & 6 deletions go-controller/pkg/node/gateway_udn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ var _ = Describe("UserDefinedNetworkGateway", func() {
localGw.openflowManager.syncFlows()
flowMap := udnGateway.gateway.openflowManager.flowCache

Expect(flowMap["DEFAULT"]).To(HaveLen(46))
Expect(flowMap["DEFAULT"]).To(HaveLen(50))
Expect(udnGateway.masqCTMark).To(Equal(udnGateway.masqCTMark))
var udnFlows int
for _, flows := range flowMap {
Expand All @@ -685,7 +685,7 @@ var _ = Describe("UserDefinedNetworkGateway", func() {

Expect(udnGateway.AddNetwork()).To(Succeed())
flowMap = udnGateway.gateway.openflowManager.flowCache
Expect(flowMap["DEFAULT"]).To(HaveLen(64)) // 18 UDN Flows are added by default
Expect(flowMap["DEFAULT"]).To(HaveLen(68)) // 18 UDN Flows are added by default
Expect(udnGateway.openflowManager.defaultBridge.netConfig).To(HaveLen(2)) // default network + UDN network
defaultUdnConfig := udnGateway.openflowManager.defaultBridge.netConfig["default"]
bridgeUdnConfig := udnGateway.openflowManager.defaultBridge.netConfig["bluenet"]
Expand Down Expand Up @@ -721,7 +721,7 @@ var _ = Describe("UserDefinedNetworkGateway", func() {
kubeMock.On("UpdateNodeStatus", cnode).Return(nil) // check if network key gets deleted from annotation
Expect(udnGateway.DelNetwork()).To(Succeed())
flowMap = udnGateway.gateway.openflowManager.flowCache
Expect(flowMap["DEFAULT"]).To(HaveLen(46)) // only default network flows are present
Expect(flowMap["DEFAULT"]).To(HaveLen(50)) // only default network flows are present
Expect(udnGateway.openflowManager.defaultBridge.netConfig).To(HaveLen(1)) // default network only
udnFlows = 0
for _, flows := range flowMap {
Expand Down Expand Up @@ -892,7 +892,7 @@ var _ = Describe("UserDefinedNetworkGateway", func() {
// FIXME: extract openflow manager func from the spawning of a go routine so it can be called directly below.
localGw.openflowManager.syncFlows()
flowMap := udnGateway.gateway.openflowManager.flowCache
Expect(flowMap["DEFAULT"]).To(HaveLen(46))
Expect(flowMap["DEFAULT"]).To(HaveLen(50))
Expect(udnGateway.masqCTMark).To(Equal(udnGateway.masqCTMark))
var udnFlows int
for _, flows := range flowMap {
Expand All @@ -909,7 +909,7 @@ var _ = Describe("UserDefinedNetworkGateway", func() {

Expect(udnGateway.AddNetwork()).To(Succeed())
flowMap = udnGateway.gateway.openflowManager.flowCache
Expect(flowMap["DEFAULT"]).To(HaveLen(64)) // 18 UDN Flows are added by default
Expect(flowMap["DEFAULT"]).To(HaveLen(68)) // 18 UDN Flows are added by default
Expect(udnGateway.openflowManager.defaultBridge.netConfig).To(HaveLen(2)) // default network + UDN network
defaultUdnConfig := udnGateway.openflowManager.defaultBridge.netConfig["default"]
bridgeUdnConfig := udnGateway.openflowManager.defaultBridge.netConfig["bluenet"]
Expand Down Expand Up @@ -945,7 +945,7 @@ var _ = Describe("UserDefinedNetworkGateway", func() {
kubeMock.On("UpdateNodeStatus", cnode).Return(nil) // check if network key gets deleted from annotation
Expect(udnGateway.DelNetwork()).To(Succeed())
flowMap = udnGateway.gateway.openflowManager.flowCache
Expect(flowMap["DEFAULT"]).To(HaveLen(46)) // only default network flows are present
Expect(flowMap["DEFAULT"]).To(HaveLen(50)) // only default network flows are present
Expect(udnGateway.openflowManager.defaultBridge.netConfig).To(HaveLen(1)) // default network only
udnFlows = 0
for _, flows := range flowMap {
Expand Down
16 changes: 16 additions & 0 deletions go-controller/pkg/node/node_ip_handler_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,22 @@ func (c *addressManager) ListAddresses() []net.IP {
return out
}

func (c *addressManager) ListNetworkAddresses() []*net.IPNet {
c.Lock()
defer c.Unlock()
addrs := sets.List(c.cidrs)
out := make([]*net.IPNet, 0, len(addrs))
for _, addr := range addrs {
_, cidr, err := net.ParseCIDR(addr)
if err != nil {
klog.Errorf("Failed to parse %s: %v", addr, err)
continue
}
out = append(out, cidr)
}
return out
}

type subscribeFn func() (bool, chan netlink.AddrUpdate, error)

func (c *addressManager) Run(stopChan <-chan struct{}, doneWg *sync.WaitGroup) {
Expand Down
10 changes: 5 additions & 5 deletions go-controller/pkg/node/openflow_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (c *openflowManager) syncFlows() {
// -- to handle host -> service access, via masquerading from the host to OVN GR
// -- to handle external -> service(ExternalTrafficPolicy: Local) -> host access without SNAT
func newGatewayOpenFlowManager(gwBridge, exGWBridge *bridgeConfiguration,
extraIPs []net.IP) (*openflowManager, error) {
hostSubnets []*net.IPNet, extraIPs []net.IP) (*openflowManager, error) {
// add health check function to check default OpenFlow flows are on the shared gateway bridge
ofm := &openflowManager{
defaultBridge: gwBridge,
Expand All @@ -173,7 +173,7 @@ func newGatewayOpenFlowManager(gwBridge, exGWBridge *bridgeConfiguration,
flowChan: make(chan struct{}, 1),
}

if err := ofm.updateBridgeFlowCache(extraIPs); err != nil {
if err := ofm.updateBridgeFlowCache(hostSubnets, extraIPs); err != nil {
return nil, err
}

Expand Down Expand Up @@ -217,7 +217,7 @@ func (c *openflowManager) Run(stopChan <-chan struct{}, doneWg *sync.WaitGroup)

// updateBridgeFlowCache generates the "static" per-bridge flows
// note: this is shared between shared and local gateway modes
func (c *openflowManager) updateBridgeFlowCache(extraIPs []net.IP) error {
func (c *openflowManager) updateBridgeFlowCache(hostSubnets []*net.IPNet, extraIPs []net.IP) error {
// protect defaultBridge config from being updated by gw.nodeIPManager
c.defaultBridge.Lock()
defer c.defaultBridge.Unlock()
Expand All @@ -229,7 +229,7 @@ func (c *openflowManager) updateBridgeFlowCache(extraIPs []net.IP) error {
if err != nil {
return err
}
dftCommonFlows, err := commonFlows(c.defaultBridge)
dftCommonFlows, err := commonFlows(hostSubnets, c.defaultBridge)
if err != nil {
return err
}
Expand All @@ -243,7 +243,7 @@ func (c *openflowManager) updateBridgeFlowCache(extraIPs []net.IP) error {
c.externalGatewayBridge.Lock()
defer c.externalGatewayBridge.Unlock()
c.updateExBridgeFlowCacheEntry("NORMAL", []string{fmt.Sprintf("table=0,priority=0,actions=%s\n", util.NormalAction)})
exGWBridgeDftFlows, err := commonFlows(c.externalGatewayBridge)
exGWBridgeDftFlows, err := commonFlows(hostSubnets, c.externalGatewayBridge)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions go-controller/pkg/types/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ const (

// OpenFlow and Networking constants
RouteAdvertisementICMPType = 134
NeighborSolicitationICMPType = 135
NeighborAdvertisementICMPType = 136

// Meter constants
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/kubevirt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1536,12 +1536,12 @@ runcmd:
DeferCleanup(func() {
if e2eframework.TestContext.DeleteNamespace && (e2eframework.TestContext.DeleteNamespaceOnFailure || !CurrentSpecReport().Failed()) {
By("tearing down the localnet underlay")
Expect(teardownUnderlay(nodes)).To(Succeed())
Expect(teardownUnderlayOnSecondaryBridge(nodes)).To(Succeed())
}
})

const secondaryInterfaceName = "eth1"
Expect(setupUnderlay(nodes, secondaryInterfaceName, netConfig)).To(Succeed())
Expect(setupUnderlayOnSecondaryBridge(nodes, secondaryInterfaceName, netConfig)).To(Succeed())
}

By("Creating NetworkAttachmentDefinition")
Expand Down
47 changes: 30 additions & 17 deletions test/e2e/localnet-underlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,41 +13,56 @@ import (
)

const (
bridgeName = "ovsbr1"
add = "add-br"
del = "del-br"
defaultOvsBridge = "breth0"
secondaryBridge = "ovsbr1"
add = "add-br"
del = "del-br"
)

func setupUnderlay(ovsPods []v1.Pod, portName string, nadConfig networkAttachmentConfig) error {
func configureBridgeMappingsForNode(ovsPod v1.Pod, networkName, bridgeName string) error {
return configureBridgeMappings(
ovsPod.Name,
defaultNetworkBridgeMapping(),
bridgeMapping(networkName, bridgeName),
)
}

func setupUnderlayOnSecondaryBridge(ovsPods []v1.Pod, portName string, nadConfig networkAttachmentConfig) error {
for _, ovsPod := range ovsPods {
if err := addOVSBridge(ovsPod.Name, bridgeName); err != nil {

if err := addOVSBridge(ovsPod.Name, secondaryBridge); err != nil {
return err
}

if nadConfig.vlanID > 0 {
if err := ovsEnableVLANAccessPort(ovsPod.Name, bridgeName, portName, nadConfig.vlanID); err != nil {
if err := ovsEnableVLANAccessPort(ovsPod.Name, secondaryBridge, portName, nadConfig.vlanID); err != nil {
return err
}
} else {
if err := ovsAttachPortToBridge(ovsPod.Name, bridgeName, portName); err != nil {
if err := ovsAttachPortToBridge(ovsPod.Name, secondaryBridge, portName); err != nil {
return err
}
}

if err := configureBridgeMappings(
ovsPod.Name,
defaultNetworkBridgeMapping(),
bridgeMapping(nadConfig.networkName, bridgeName),
); err != nil {
if err := configureBridgeMappingsForNode(ovsPod, nadConfig.networkName, secondaryBridge); err != nil {
return err
}
}
return nil
}

func teardownUnderlay(ovsPods []v1.Pod) error {
func setupUnderlayOnDefaultBridge(ovsPods []v1.Pod, nadConfig networkAttachmentConfig) error {
for _, ovsPod := range ovsPods {
if err := removeOVSBridge(ovsPod.Name, bridgeName); err != nil {
if err := configureBridgeMappingsForNode(ovsPod, nadConfig.networkName, defaultOvsBridge); err != nil {
return err
}
}
return nil
}

func teardownUnderlayOnSecondaryBridge(ovsPods []v1.Pod) error {
for _, ovsPod := range ovsPods {
if err := removeOVSBridge(ovsPod.Name, secondaryBridge); err != nil {
return err
}
}
Expand Down Expand Up @@ -96,7 +111,6 @@ func ovsAttachPortToBridge(ovsNodeName string, bridgeName string, portName strin
"kubectl", "-n", ovnNamespace, "exec", ovsNodeName, "--",
"ovs-vsctl", "add-port", bridgeName, portName,
}

if _, err := runCommand(cmd...); err != nil {
return fmt.Errorf("failed to add port %s to OVS bridge %s: %v", portName, bridgeName, err)
}
Expand All @@ -107,9 +121,8 @@ func ovsAttachPortToBridge(ovsNodeName string, bridgeName string, portName strin
func ovsEnableVLANAccessPort(ovsNodeName string, bridgeName string, portName string, vlanID int) error {
cmd := []string{
"kubectl", "-n", ovnNamespace, "exec", ovsNodeName, "--",
"ovs-vsctl", "add-port", bridgeName, portName, fmt.Sprintf("tag=%d", vlanID), "vlan_mode=access",
"ovs-vsctl", "--may-exist", "add-port", bridgeName, portName, fmt.Sprintf("tag=%d", vlanID), "vlan_mode=access",
}

if _, err := runCommand(cmd...); err != nil {
return fmt.Errorf("failed to add port %s to OVS bridge %s: %v", portName, bridgeName, err)
}
Expand Down
Loading