Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions go-controller/pkg/libovsdb/ops/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,18 @@ func DeleteLogicalRouter(nbClient libovsdbclient.Client, router *nbdb.LogicalRou

// LOGICAL ROUTER PORT OPs

type logicalRouterPortPredicate func(*nbdb.LogicalRouterPort) bool

// FindLogicalRouterPortWithPredicate looks up logical router port from
// the cache based on a given predicate
func FindLogicalRouterPortWithPredicate(nbClient libovsdbclient.Client, p logicalRouterPortPredicate) ([]*nbdb.LogicalRouterPort, error) {
ctx, cancel := context.WithTimeout(context.Background(), types.OVSDBTimeout)
defer cancel()
found := []*nbdb.LogicalRouterPort{}
err := nbClient.WhereCache(p).List(ctx, &found)
return found, err
}

// GetLogicalRouterPort looks up a logical router port from the cache
func GetLogicalRouterPort(nbClient libovsdbclient.Client, lrp *nbdb.LogicalRouterPort) (*nbdb.LogicalRouterPort, error) {
found := []*nbdb.LogicalRouterPort{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,14 +317,21 @@ func (nadController *NetAttachDefinitionController) onNetworkAttachDefinitionUpd
}

func (nadController *NetAttachDefinitionController) onNetworkAttachDefinitionDelete(obj interface{}) {
nad := obj.(*nettypes.NetworkAttachmentDefinition)
if nad == nil {
utilruntime.HandleError(fmt.Errorf("invalid net-attach-def provided to onNetworkAttachDefinitionDelete()"))
return
nad, ok := obj.(*nettypes.NetworkAttachmentDefinition)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
return
}
nad, ok = tombstone.Obj.(*nettypes.NetworkAttachmentDefinition)
if !ok {
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a NetworkAttachmentDefinition object %#v", obj))
return
}
}

klog.V(4).Infof("%s: Deleting net-attach-def %s/%s", nadController.name, nad.Namespace, nad.Name)
nadController.queueNetworkAttachDefinition(obj)
nadController.queueNetworkAttachDefinition(nad)
}

// getAllNetworkControllers returns a snapshot of all managed NAD associated network controllers.
Expand Down
91 changes: 46 additions & 45 deletions go-controller/pkg/node/node_ip_handler_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,16 @@ func (c *addressManager) runInternal(stopChan <-chan struct{}, doneWg *sync.Wait

// updates OVN's EncapIP if the node IP changed
func (c *addressManager) handleNodePrimaryAddrChange() {
c.Lock()
defer c.Unlock()
nodePrimaryAddrChanged, err := c.nodePrimaryAddrChanged()
if err != nil {
klog.Errorf("Address Manager failed to check node primary address change: %v", err)
return
}
if nodePrimaryAddrChanged {
klog.Infof("Node primary address changed to %v. Updating OVN encap IP.", c.nodePrimaryAddr)
c.updateOVNEncapIPAndReconnect()
updateOVNEncapIPAndReconnect(c.nodePrimaryAddr)
}
}

Expand Down Expand Up @@ -334,7 +336,7 @@ func (c *addressManager) nodePrimaryAddrChanged() (bool, error) {
if nodePrimaryAddr == nil {
return false, fmt.Errorf("failed to parse the primary IP address string from kubernetes node status")
}
c.Lock()

var exists bool
for _, hostCIDR := range c.cidrs.UnsortedList() {
ip, _, err := net.ParseCIDR(hostCIDR)
Expand All @@ -348,7 +350,6 @@ func (c *addressManager) nodePrimaryAddrChanged() (bool, error) {
break
}
}
c.Unlock()

if !exists || c.nodePrimaryAddr.Equal(nodePrimaryAddr) {
return false, nil
Expand All @@ -358,48 +359,6 @@ func (c *addressManager) nodePrimaryAddrChanged() (bool, error) {
return true, nil
}

// updateOVNEncapIP updates encap IP to OVS when the node primary IP changed.
func (c *addressManager) updateOVNEncapIPAndReconnect() {
checkCmd := []string{
"get",
"Open_vSwitch",
".",
"external_ids:ovn-encap-ip",
}
encapIP, stderr, err := util.RunOVSVsctl(checkCmd...)
if err != nil {
klog.Warningf("Unable to retrieve configured ovn-encap-ip from OVS: %v, %q", err, stderr)
} else {
encapIP = strings.TrimSuffix(encapIP, "\n")
if len(encapIP) > 0 && c.nodePrimaryAddr.String() == encapIP {
klog.V(4).Infof("Will not update encap IP, value: %s is the already configured", c.nodePrimaryAddr)
return
}
}

confCmd := []string{
"set",
"Open_vSwitch",
".",
fmt.Sprintf("external_ids:ovn-encap-ip=%s", c.nodePrimaryAddr),
}

_, stderr, err = util.RunOVSVsctl(confCmd...)
if err != nil {
klog.Errorf("Error setting OVS encap IP: %v %q", err, stderr)
return
}

// force ovn-controller to reconnect SB with new encap IP immediately.
// otherwise there will be a max delay of 200s due to the 100s
// ovn-controller inactivity probe.
_, stderr, err = util.RunOVNAppctlWithTimeout(5, "-t", "ovn-controller", "exit", "--restart")
if err != nil {
klog.Errorf("Failed to exit ovn-controller %v %q", err, stderr)
return
}
}

// detects if the IP is valid for a node
// excludes things like local IPs, mgmt port ip, special masquerade IP
func (c *addressManager) isValidNodeIP(addr net.IP) bool {
Expand Down Expand Up @@ -470,3 +429,45 @@ func (c *addressManager) sync() {
c.OnChanged()
}
}

// updateOVNEncapIPAndReconnect updates encap IP to OVS when the node primary IP changed.
func updateOVNEncapIPAndReconnect(newIP net.IP) {
checkCmd := []string{
"get",
"Open_vSwitch",
".",
"external_ids:ovn-encap-ip",
}
encapIP, stderr, err := util.RunOVSVsctl(checkCmd...)
if err != nil {
klog.Warningf("Unable to retrieve configured ovn-encap-ip from OVS: %v, %q", err, stderr)
} else {
encapIP = strings.TrimSuffix(encapIP, "\n")
if len(encapIP) > 0 && newIP.String() == encapIP {
klog.V(4).Infof("Will not update encap IP %s - it is already configured", newIP.String())
return
}
}

confCmd := []string{
"set",
"Open_vSwitch",
".",
fmt.Sprintf("external_ids:ovn-encap-ip=%s", newIP),
}

_, stderr, err = util.RunOVSVsctl(confCmd...)
if err != nil {
klog.Errorf("Error setting OVS encap IP %s: %v %q", newIP.String(), err, stderr)
return
}

// force ovn-controller to reconnect SB with new encap IP immediately.
// otherwise there will be a max delay of 200s due to the 100s
// ovn-controller inactivity probe.
_, stderr, err = util.RunOVNAppctlWithTimeout(5, "-t", "ovn-controller", "exit", "--restart")
if err != nil {
klog.Errorf("Failed to exit ovn-controller %v %q", err, stderr)
return
}
}
126 changes: 69 additions & 57 deletions go-controller/pkg/ovn/controller/apbroute/network_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ type conntrackClient struct {
podLister corev1listers.PodLister
}

func (nb *northBoundClient) findLogicalRouterPortWithPredicate(p func(item *nbdb.LogicalRouterPort) bool) ([]*nbdb.LogicalRouterPort, error) {
return libovsdbops.FindLogicalRouterPortWithPredicate(nb.nbClient, p)
}

func (nb *northBoundClient) findLogicalRouterPoliciesWithPredicate(p func(item *nbdb.LogicalRouterPolicy) bool) ([]*nbdb.LogicalRouterPolicy, error) {
return libovsdbops.FindLogicalRouterPoliciesWithPredicate(nb.nbClient, p)
}

func (nb *northBoundClient) findLogicalRouterStaticRoutesWithPredicate(p func(item *nbdb.LogicalRouterStaticRoute) bool) ([]*nbdb.LogicalRouterStaticRoute, error) {
return libovsdbops.FindLogicalRouterStaticRoutesWithPredicate(nb.nbClient, p)
}
Expand Down Expand Up @@ -464,9 +472,10 @@ func (nb *northBoundClient) deletePodGWRoute(routeInfo *RouteInfo, podIP, gw, gr
}

node := util.GetWorkerFromGatewayRouter(gr)

// The gw is deleted from the routes cache after this func is called, length 1
// means it is the last gw for the pod and the hybrid route policy should be deleted.
if entry := routeInfo.PodExternalRoutes[podIP]; len(entry) == 1 {
if entry := routeInfo.PodExternalRoutes[podIP]; len(entry) <= 1 {
if err := nb.delHybridRoutePolicyForPod(net.ParseIP(podIP), node); err != nil {
return fmt.Errorf("unable to delete hybrid route policy for pod %s: err: %v", routeInfo.PodName, err)
}
Expand Down Expand Up @@ -528,72 +537,75 @@ func (nb *northBoundClient) deleteLogicalRouterStaticRoute(podIP, mask, gw, gr s
// DelHybridRoutePolicyForPod handles deleting a logical route policy that
// forces pod egress traffic to be rerouted to a gateway router for local gateway mode.
func (nb *northBoundClient) delHybridRoutePolicyForPod(podIP net.IP, node string) error {
if config.Gateway.Mode == config.GatewayModeLocal {
// Delete podIP from the node's address_set.
asIndex := GetHybridRouteAddrSetDbIDs(node, nb.controllerName)
as, err := nb.addressSetFactory.EnsureAddressSet(asIndex)
if err != nil {
return fmt.Errorf("cannot Ensure that addressSet for node %s exists %v", node, err)
if config.Gateway.Mode != config.GatewayModeLocal {
return nil
}

// Delete podIP from the node's address_set.
asIndex := GetHybridRouteAddrSetDbIDs(node, nb.controllerName)
as, err := nb.addressSetFactory.EnsureAddressSet(asIndex)
if err != nil {
return fmt.Errorf("cannot Ensure that addressSet for node %s exists %v", node, err)
}
err = as.DeleteIPs([]net.IP{podIP})
if err != nil {
return fmt.Errorf("unable to remove PodIP %s: to the address set %s, err: %v", podIP.String(), node, err)
}

// delete hybrid policy to bypass lr-policy in GR, only if there are zero pods on this node.
ipv4HashedAS, ipv6HashedAS := as.GetASHashNames()
ipv4PodIPs, ipv6PodIPs := as.GetIPs()
deletePolicy := false
var l3Prefix string
var matchSrcAS string
if utilnet.IsIPv6(podIP) {
l3Prefix = "ip6"
if len(ipv6PodIPs) == 0 {
deletePolicy = true
}
err = as.DeleteIPs([]net.IP{(podIP)})
if err != nil {
return fmt.Errorf("unable to remove PodIP %s: to the address set %s, err: %v", podIP.String(), node, err)
matchSrcAS = ipv6HashedAS
} else {
l3Prefix = "ip4"
if len(ipv4PodIPs) == 0 {
deletePolicy = true
}

// delete hybrid policy to bypass lr-policy in GR, only if there are zero pods on this node.
ipv4HashedAS, ipv6HashedAS := as.GetASHashNames()
ipv4PodIPs, ipv6PodIPs := as.GetIPs()
deletePolicy := false
var l3Prefix string
var matchSrcAS string
if utilnet.IsIPv6(podIP) {
l3Prefix = "ip6"
if len(ipv6PodIPs) == 0 {
deletePolicy = true
matchSrcAS = ipv4HashedAS
}
if deletePolicy {
var matchDst string
var clusterL3Prefix string
for _, clusterSubnet := range config.Default.ClusterSubnets {
if utilnet.IsIPv6CIDR(clusterSubnet.CIDR) {
clusterL3Prefix = "ip6"
} else {
clusterL3Prefix = "ip4"
}
matchSrcAS = ipv6HashedAS
} else {
l3Prefix = "ip4"
if len(ipv4PodIPs) == 0 {
deletePolicy = true
if l3Prefix != clusterL3Prefix {
continue
}
matchSrcAS = ipv4HashedAS
matchDst += fmt.Sprintf(" && %s.dst != %s", l3Prefix, clusterSubnet.CIDR)
}
if deletePolicy {
var matchDst string
var clusterL3Prefix string
for _, clusterSubnet := range config.Default.ClusterSubnets {
if utilnet.IsIPv6CIDR(clusterSubnet.CIDR) {
clusterL3Prefix = "ip6"
} else {
clusterL3Prefix = "ip4"
}
if l3Prefix != clusterL3Prefix {
continue
}
matchDst += fmt.Sprintf(" && %s.dst != %s", l3Prefix, clusterSubnet.CIDR)
}
matchStr := fmt.Sprintf(`inport == "%s%s" && %s.src == $%s`, types.RouterToSwitchPrefix, node, l3Prefix, matchSrcAS)
matchStr += matchDst
matchStr := fmt.Sprintf(`inport == "%s%s" && %s.src == $%s`, types.RouterToSwitchPrefix, node, l3Prefix, matchSrcAS)
matchStr += matchDst

p := func(item *nbdb.LogicalRouterPolicy) bool {
return item.Priority == types.HybridOverlayReroutePriority && item.Match == matchStr
}
err := libovsdbops.DeleteLogicalRouterPoliciesWithPredicate(nb.nbClient, types.OVNClusterRouter, p)
if err != nil {
return fmt.Errorf("error deleting policy %s on router %s: %v", matchStr, types.OVNClusterRouter, err)
}
p := func(item *nbdb.LogicalRouterPolicy) bool {
return item.Priority == types.HybridOverlayReroutePriority && item.Match == matchStr
}
if len(ipv4PodIPs) == 0 && len(ipv6PodIPs) == 0 {
// delete address set.
err := as.Destroy()
if err != nil {
return fmt.Errorf("failed to remove address set: %s, on: %s, err: %v",
as.GetName(), node, err)
}
err := libovsdbops.DeleteLogicalRouterPoliciesWithPredicate(nb.nbClient, types.OVNClusterRouter, p)
if err != nil {
return fmt.Errorf("error deleting policy %s on router %s: %v", matchStr, types.OVNClusterRouter, err)
}
}
if len(ipv4PodIPs) == 0 && len(ipv6PodIPs) == 0 {
// delete address set.
err := as.Destroy()
if err != nil {
return fmt.Errorf("failed to remove address set: %s, on: %s, err: %v",
as.GetName(), node, err)
}
}
return nil

}

// extSwitchPrefix returns the prefix of the external switch to use for
Expand Down
Loading