Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions go-controller/pkg/libovsdbops/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
)

// findSwitch looks up the switch in the cache and sets the UUID
func findSwitch(nbClient libovsdbclient.Client, lswitch *nbdb.LogicalSwitch) error {
// findSwitchUUID looks up the switch in the cache and sets the UUID
func findSwitchUUID(nbClient libovsdbclient.Client, lswitch *nbdb.LogicalSwitch) error {
if lswitch.UUID != "" && !IsNamedUUID(lswitch.UUID) {
return nil
}
Expand Down Expand Up @@ -119,6 +119,24 @@ func FindAllNodeLocalSwitches(nbClient libovsdbclient.Client) ([]nbdb.LogicalSwi
return switches, nil
}

// FindSwitchByName looks up the logical switch whose Name equals name.
//
// It returns a pointer to the single matching switch. If the lookup itself
// fails, that error is returned unchanged. If no switch matches, it returns
// libovsdbclient.ErrNotFound. If more than one switch matches, it returns an
// error describing the duplicates.
func FindSwitchByName(nbClient libovsdbclient.Client, name string) (*nbdb.LogicalSwitch, error) {
	nameSearch := func(item *nbdb.LogicalSwitch) bool {
		return item.Name == name
	}

	switches, err := findSwitchesByPredicate(nbClient, nameSearch)
	if err != nil {
		return nil, err
	}

	switch len(switches) {
	case 0:
		// Guard the empty result explicitly: indexing switches[0] on an
		// empty slice would panic instead of reporting a missing switch.
		return nil, libovsdbclient.ErrNotFound
	case 1:
		return &switches[0], nil
	default:
		return nil, fmt.Errorf("unexpectedly found multiple switches with same name: %+v", switches)
	}
}

func AddLoadBalancersToSwitchOps(nbClient libovsdbclient.Client, ops []libovsdb.Operation, lswitch *nbdb.LogicalSwitch, lbs ...*nbdb.LoadBalancer) ([]libovsdb.Operation, error) {
if ops == nil {
ops = []libovsdb.Operation{}
Expand All @@ -127,7 +145,7 @@ func AddLoadBalancersToSwitchOps(nbClient libovsdbclient.Client, ops []libovsdb.
return ops, nil
}

err := findSwitch(nbClient, lswitch)
err := findSwitchUUID(nbClient, lswitch)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -157,7 +175,7 @@ func RemoveLoadBalancersFromSwitchOps(nbClient libovsdbclient.Client, ops []libo
return ops, nil
}

err := findSwitch(nbClient, lswitch)
err := findSwitchUUID(nbClient, lswitch)
if err != nil {
return nil, err
}
Expand Down
23 changes: 15 additions & 8 deletions go-controller/pkg/ovn/address_set/address_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"

utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"
Expand All @@ -30,7 +31,7 @@ const (
ipv6AddressSetSuffix = "_v6"
)

type AddressSetIterFunc func(hashedName, namespace, suffix string)
type AddressSetIterFunc func(hashedName, namespace, suffix string) error
type AddressSetDoFunc func(as AddressSet) error

// AddressSetFactory is an interface for managing address set objects
Expand Down Expand Up @@ -176,7 +177,7 @@ func (asf *ovnAddressSetFactory) EnsureAddressSet(name string) (AddressSet, erro
return &ovnAddressSets{nbClient: asf.nbClient, name: name, ipv4: v4set, ipv6: v6set}, nil
}

func forEachAddressSet(nbClient libovsdbclient.Client, do func(string)) error {
func forEachAddressSet(nbClient libovsdbclient.Client, do func(string) error) error {
addrSetList := &[]nbdb.AddressSet{}
ctx, cancel := context.WithTimeout(context.Background(), types.OVSDBTimeout)
defer cancel()
Expand All @@ -189,9 +190,17 @@ func forEachAddressSet(nbClient libovsdbclient.Client, do func(string)) error {
return fmt.Errorf("error reading address sets: %+v", err)
}

var errors []error
for _, addrSet := range *addrSetList {
do(addrSet.ExternalIDs["name"])
if err := do(addrSet.ExternalIDs["name"]); err != nil {
errors = append(errors, err)
}
}

if len(errors) > 0 {
return fmt.Errorf("failed to iterate address sets: %v", utilerrors.NewAggregate(errors))
}

return nil
}

Expand All @@ -200,14 +209,14 @@ func forEachAddressSet(nbClient libovsdbclient.Client, do func(string)) error {
// OVN. (Unhashed address set names are of the form namespaceName[.suffix1.suffix2. ... .suffixN])
func (asf *ovnAddressSetFactory) ProcessEachAddressSet(iteratorFn AddressSetIterFunc) error {
processedAddressSets := sets.String{}
err := forEachAddressSet(asf.nbClient, func(name string) {
return forEachAddressSet(asf.nbClient, func(name string) error {
// Remove the suffix from the address set name and normalize
addrSetName := truncateSuffixFromAddressSet(name)
if processedAddressSets.Has(addrSetName) {
// We have already processed the address set. In case of dual stack we will have _v4 and _v6
// suffixes for address sets. Since we are normalizing these two address sets through this API
// we will process only one normalized address set name.
return
return nil
}
processedAddressSets.Insert(addrSetName)
names := strings.Split(addrSetName, ".")
Expand All @@ -216,10 +225,8 @@ func (asf *ovnAddressSetFactory) ProcessEachAddressSet(iteratorFn AddressSetIter
if len(names) >= 2 {
nameSuffix = names[1]
}
iteratorFn(addrSetName, addrSetNamespace, nameSuffix)
return iteratorFn(addrSetName, addrSetNamespace, nameSuffix)
})

return err
}

func truncateSuffixFromAddressSet(asName string) string {
Expand Down
3 changes: 2 additions & 1 deletion go-controller/pkg/ovn/address_set/address_set_cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func NonDualStackAddressSetCleanup(nbClient libovsdbclient.Client) error {
const old = 0
const new = 1
addressSets := map[string][2]bool{}
err := forEachAddressSet(nbClient, func(name string) {
err := forEachAddressSet(nbClient, func(name string) error {
shortName := truncateSuffixFromAddressSet(name)
spec, found := addressSets[shortName]
if !found {
Expand All @@ -30,6 +30,7 @@ func NonDualStackAddressSetCleanup(nbClient libovsdbclient.Client) error {
spec[new] = true
}
addressSets[shortName] = spec
return nil
})

if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion go-controller/pkg/ovn/address_set/address_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ var _ = ginkgo.Describe("OVN Address Set operations", func() {
},
}

err = asFactory.ProcessEachAddressSet(func(addrSetName, namespaceName, nameSuffix string) {
err = asFactory.ProcessEachAddressSet(func(addrSetName, namespaceName, nameSuffix string) error {
found := false
for _, n := range namespaces {
name := n.makeNames()
Expand All @@ -116,6 +116,7 @@ var _ = ginkgo.Describe("OVN Address Set operations", func() {
}
}
gomega.Expect(found).To(gomega.BeTrue())
return nil
})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
return nil
Expand Down
4 changes: 3 additions & 1 deletion go-controller/pkg/ovn/address_set/fake_address_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ func (f *FakeAddressSetFactory) ProcessEachAddressSet(iteratorFn AddressSetIterF
if len(parts) >= 2 {
nameSuffix = parts[1]
}
iteratorFn(asName, addrSetNamespace, nameSuffix)
if err := iteratorFn(asName, addrSetNamespace, nameSuffix); err != nil {
return err
}
}
return nil
}
Expand Down
25 changes: 14 additions & 11 deletions go-controller/pkg/ovn/egressfirewall.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,29 +97,32 @@ func newEgressFirewallRule(rawEgressFirewallRule egressfirewallapi.EgressFirewal

// syncEgressFirewall hands the egress firewall sync work to oc.syncWithRetry,
// wrapping syncEgressFirewallRetriable in a closure bound to egressFirewalls.
//
// NOTE: Utilize the fact that we know that all egress firewall related setup must have a priority: types.MinimumReservedEgressFirewallPriority <= priority <= types.EgressFirewallStartPriority
func (oc *Controller) syncEgressFirewall(egressFirewalls []interface{}) {
	retriable := func() error {
		return oc.syncEgressFirewallRetriable(egressFirewalls)
	}
	oc.syncWithRetry("syncEgressFirewall", retriable)
}

// This function implements the main body of work of what is described by syncEgressFirewall.
// Upon failure, it may be invoked multiple times in order to avoid a pod restart.
func (oc *Controller) syncEgressFirewallRetriable(egressFirewalls []interface{}) error {
// Lookup all ACLs used for egress Firewalls
egressFirewallACLs, err := libovsdbops.FindACLsByPriorityRange(oc.nbClient, types.MinimumReservedEgressFirewallPriority, types.EgressFirewallStartPriority)
if err != nil {
klog.Errorf("Unable to list egress firewall ACLs, cannot cleanup old stale data, err: %v", err)
return
return fmt.Errorf("unable to list egress firewall ACLs, cannot cleanup old stale data, err: %v", err)
}

if config.Gateway.Mode == config.GatewayModeShared {
// Mode is shared gateway mode, make sure to delete all egfw ACLs on the node switches
if len(egressFirewallACLs) != 0 {
err = libovsdbops.RemoveACLsFromNodeSwitches(oc.nbClient, egressFirewallACLs)
if err != nil {
klog.Errorf("Failed to remove reject acl from node logical switches: %v", err)
return
return fmt.Errorf("failed to remove reject acl from node logical switches: %v", err)
}
}
} else if config.Gateway.Mode == config.GatewayModeLocal {
// Mode is local gateway mode, make sure to delete all egfw ACLs on the join switches
if len(egressFirewallACLs) != 0 {
err = libovsdbops.RemoveACLsFromJoinSwitch(oc.nbClient, egressFirewallACLs)
if err != nil {
klog.Errorf("Failed to remove reject acl from node logical switches: %v", err)
return
return fmt.Errorf("failed to remove reject acl from node logical switches: %v", err)
}
}
}
Expand All @@ -143,7 +146,7 @@ func (oc *Controller) syncEgressFirewall(egressFirewalls []interface{}) {
})
}
if _, err := oc.modelClient.CreateOrUpdate(opModels...); err != nil {
klog.Errorf("Unable to set ACL direction on egress firewall acls, cannot convert old ACL data err: %v", err)
return fmt.Errorf("unable to set ACL direction on egress firewall acls, cannot convert old ACL data err: %v", err)
}
}
// In any gateway mode, make sure to delete all LRPs on ovn_cluster_router.
Expand All @@ -170,7 +173,7 @@ func (oc *Controller) syncEgressFirewall(egressFirewalls []interface{}) {
},
}
if err := oc.modelClient.Delete(opModels...); err != nil {
klog.Errorf("Unable to remove egress firewall policy, cannot cleanup old stale data, err: %v", err)
return fmt.Errorf("unable to remove egress firewall policy, cannot cleanup old stale data, err: %v", err)
}

// sync the ovn and k8s egressFirewall states
Expand All @@ -189,7 +192,7 @@ func (oc *Controller) syncEgressFirewall(egressFirewalls []interface{}) {
// get all the k8s EgressFirewall Objects
egressFirewallList, err := oc.kube.GetEgressFirewalls()
if err != nil {
klog.Errorf("Cannot reconcile the state of egressfirewalls in ovn database and k8s. err: %v", err)
return fmt.Errorf("cannot reconcile the state of egressfirewalls in ovn database and k8s. err: %v", err)
}
// delete entries from the map that exist in k8s and ovn
for _, egressFirewall := range egressFirewallList.Items {
Expand All @@ -199,10 +202,10 @@ func (oc *Controller) syncEgressFirewall(egressFirewalls []interface{}) {
for spuriousEF := range ovnEgressFirewalls {
err := oc.deleteEgressFirewallRules(spuriousEF)
if err != nil {
klog.Errorf("Cannot fully reconcile the state of egressfirewalls ACLs for namespace %s still exist in ovn db: %v", spuriousEF, err)
return
return fmt.Errorf("cannot fully reconcile the state of egressfirewalls ACLs for namespace %s still exist in ovn db: %v", spuriousEF, err)
}
}
return nil
}

func (oc *Controller) addEgressFirewall(egressFirewall *egressfirewallapi.EgressFirewall) error {
Expand Down
12 changes: 12 additions & 0 deletions go-controller/pkg/ovn/egressgw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2190,6 +2190,10 @@ var _ = ginkgo.Describe("OVN Egress Gateway Operations", func() {
Name: types.GWRouterPrefix + nodeName,
UUID: types.GWRouterPrefix + nodeName + "-UUID",
},
&nbdb.LogicalSwitch{
UUID: "node1",
Name: "node1",
},
},
},
&v1.NamespaceList{
Expand Down Expand Up @@ -2220,6 +2224,10 @@ var _ = ginkgo.Describe("OVN Egress Gateway Operations", func() {
Name: ovntypes.GWRouterToJoinSwitchPrefix + ovntypes.GWRouterPrefix + nodeName,
Networks: []string{"100.64.0.4/32"},
},
&nbdb.LogicalSwitch{
UUID: "node1",
Name: "node1",
},
}
injectNode(fakeOvn)
fakeOvn.controller.WatchNamespaces()
Expand All @@ -2241,6 +2249,10 @@ var _ = ginkgo.Describe("OVN Egress Gateway Operations", func() {
Name: ovntypes.GWRouterToJoinSwitchPrefix + ovntypes.GWRouterPrefix + nodeName,
Networks: []string{"100.64.0.4/32"},
},
&nbdb.LogicalSwitch{
UUID: "node1",
Name: "node1",
},
}
err = deletePerPodGRSNAT(fakeOvn.controller.nbClient, nodeName, extIPs, []*net.IPNet{fullMaskPodNet})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
Expand Down
47 changes: 34 additions & 13 deletions go-controller/pkg/ovn/egressip.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -999,13 +1000,25 @@ func (oc *Controller) syncEgressIPs(eIPs []interface{}) {
// - Egress IPs which have been deleted while ovnkube-master was down
// - pods/namespaces which have stopped matching on egress IPs while
// ovnkube-master was down
if egressIPToPodIPCache, err := oc.generatePodIPCacheForEgressIP(eIPs); err == nil {
oc.syncStaleEgressReroutePolicy(egressIPToPodIPCache)
oc.syncStaleSNATRules(egressIPToPodIPCache)
}
oc.syncWithRetry("syncEgressIPs", func() error {
egressIPToPodIPCache, err := oc.generatePodIPCacheForEgressIP(eIPs)
if err != nil {
return fmt.Errorf("syncEgressIPs unable to generate cache for egressip: %v", err)
}
if err = oc.syncStaleEgressReroutePolicy(egressIPToPodIPCache); err != nil {
return fmt.Errorf("syncEgressIPs unable to remove stale reroute policies: %v", err)
}
if err = oc.syncStaleSNATRules(egressIPToPodIPCache); err != nil {
return fmt.Errorf("syncEgressIPs unable to remove stale nats: %v", err)
}
return nil
})
}

func (oc *Controller) syncStaleEgressReroutePolicy(egressIPToPodIPCache map[string]sets.String) {
// This function implements a portion of syncEgressIPs.
// It removes OVN logical router policies used by EgressIPs deleted while ovnkube-master was down.
// Upon failure, it may be invoked multiple times in order to avoid a pod restart.
func (oc *Controller) syncStaleEgressReroutePolicy(egressIPToPodIPCache map[string]sets.String) error {
logicalRouter := nbdb.LogicalRouter{}
logicalRouterPolicyRes := []nbdb.LogicalRouterPolicy{}
opModels := []libovsdbops.OperationModel{
Expand Down Expand Up @@ -1040,11 +1053,15 @@ func (oc *Controller) syncStaleEgressReroutePolicy(egressIPToPodIPCache map[stri
},
}
if err := oc.modelClient.Delete(opModels...); err != nil {
klog.Errorf("Unable to remove stale logical router policies, err: %v", err)
return fmt.Errorf("unable to remove stale logical router policies, err: %v", err)
}
return nil
}

func (oc *Controller) syncStaleSNATRules(egressIPToPodIPCache map[string]sets.String) {
// This function implements a portion of syncEgressIPs.
// It removes OVN NAT rules used by EgressIPs deleted while ovnkube-master was down.
// Upon failure, it may be invoked multiple times in order to avoid a pod restart.
func (oc *Controller) syncStaleSNATRules(egressIPToPodIPCache map[string]sets.String) error {
predicate := func(item *nbdb.NAT) bool {
egressIPName, exists := item.ExternalIDs["name"]
// Exclude rows that have no name or are not the right type
Expand All @@ -1062,34 +1079,38 @@ func (oc *Controller) syncStaleSNATRules(egressIPToPodIPCache map[string]sets.St

nats, err := libovsdbops.FindNATsUsingPredicate(oc.nbClient, predicate)
if err != nil {
klog.Errorf("Unable to sync egress IPs err: %v", err)
return
return fmt.Errorf("unable to sync egress IPs err: %v", err)
}

if len(nats) == 0 {
// No stale nat entries to deal with: noop.
return
return nil
}

routers, err := libovsdbops.FindRoutersUsingNAT(oc.nbClient, nats)
if err != nil {
klog.Errorf("Unable to sync egress IPs, err: %v", err)
return
return fmt.Errorf("unable to sync egress IPs, err: %v", err)
}

var errors []error
ops := []ovsdb.Operation{}
for _, router := range routers {
ops, err = libovsdbops.DeleteNATsFromRouterOps(oc.nbClient, ops, &router, nats...)
if err != nil {
klog.Errorf("Error deleting stale NAT from router %s: %v", router.Name, err)
errors = append(errors, err)
continue
}
}
if len(errors) > 0 {
return fmt.Errorf("failed deleting stale NAT: %v", utilerrors.NewAggregate(errors))
}

_, err = libovsdbops.TransactAndCheck(oc.nbClient, ops)
if err != nil {
klog.Errorf("Error deleting stale NATs: %v", err)
return fmt.Errorf("error deleting stale NATs: %v", err)
}
return nil
}

// generatePodIPCacheForEgressIP builds a cache of egressIP name -> podIPs for fast
Expand Down
Loading