diff --git a/dist/images/Dockerfile.fedora b/dist/images/Dockerfile.fedora index c9a2843dad..e43ea2cd6b 100644 --- a/dist/images/Dockerfile.fedora +++ b/dist/images/Dockerfile.fedora @@ -9,13 +9,18 @@ # are built locally and included in the image (instead of the rpm) # -FROM fedora:34 +FROM fedora:35 USER root ENV PYTHONDONTWRITEBYTECODE yes -ARG ovnver=ovn-21.09.0-4.fc34 +ARG ovnver=ovn-21.12.0-5.fc35 +# Automatically populated when using docker buildx +ARG TARGETPLATFORM +ARG BUILDPLATFORM + +RUN echo "Running on $BUILDPLATFORM, building for $TARGETPLATFORM" # install needed rpms - openvswitch must be 2.10.4 or higher RUN INSTALL_PKGS=" \ @@ -29,7 +34,9 @@ RUN INSTALL_PKGS=" \ RUN mkdir -p /var/run/openvswitch -RUN koji download-build $ovnver --arch=x86_64 +RUN if [ "$TARGETPLATFORM" = "linux/amd64" ] || [ -z "$TARGETPLATFORM"] ; then koji download-build $ovnver --arch=x86_64 ; \ + else koji download-build $ovnver --arch=aarch64 ; fi + RUN rpm -Uhv --nodeps --force *.rpm # Built in ../../go_controller, then the binaries are copied here. diff --git a/go-controller/pkg/kube/kube.go b/go-controller/pkg/kube/kube.go index 5616f10525..e976fd7808 100644 --- a/go-controller/pkg/kube/kube.go +++ b/go-controller/pkg/kube/kube.go @@ -235,7 +235,7 @@ func (k *Kube) UpdateEgressFirewall(egressfirewall *egressfirewall.EgressFirewal // UpdateEgressIP updates the EgressIP with the provided EgressIP data func (k *Kube) UpdateEgressIP(eIP *egressipv1.EgressIP) error { - klog.Infof("Updating status on EgressIP %s", eIP.Name) + klog.Infof("Updating status on EgressIP %s status %v", eIP.Name, eIP.Status) _, err := k.EIPClient.K8sV1().EgressIPs().Update(context.TODO(), eIP, metav1.UpdateOptions{}) return err } diff --git a/go-controller/pkg/libovsdbops/switch.go b/go-controller/pkg/libovsdbops/switch.go index 77bbca6fca..eaf7ec06bb 100644 --- a/go-controller/pkg/libovsdbops/switch.go +++ b/go-controller/pkg/libovsdbops/switch.go @@ -13,8 +13,8 @@ import ( "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types" ) -// findSwitch looks up the switch in the cache and sets the UUID -func findSwitch(nbClient libovsdbclient.Client, lswitch *nbdb.LogicalSwitch) error { +// findSwitchUUID looks up the switch in the cache and sets the UUID +func findSwitchUUID(nbClient libovsdbclient.Client, lswitch *nbdb.LogicalSwitch) error { if lswitch.UUID != "" && !IsNamedUUID(lswitch.UUID) { return nil } @@ -119,6 +119,24 @@ func FindAllNodeLocalSwitches(nbClient libovsdbclient.Client) ([]nbdb.LogicalSwi return switches, nil } +// FindSwitchByName finds switch with provided name. If more than one is found, it will error. +func FindSwitchByName(nbClient libovsdbclient.Client, name string) (*nbdb.LogicalSwitch, error) { + nameSearch := func(item *nbdb.LogicalSwitch) bool { + return item.Name == name + } + + switches, err := findSwitchesByPredicate(nbClient, nameSearch) + if err != nil { + return nil, err + } + + if len(switches) > 1 { + return nil, fmt.Errorf("unexpectedly found multiple switches with same name: %+v", switches) + } + + return &switches[0], nil +} + func AddLoadBalancersToSwitchOps(nbClient libovsdbclient.Client, ops []libovsdb.Operation, lswitch *nbdb.LogicalSwitch, lbs ...*nbdb.LoadBalancer) ([]libovsdb.Operation, error) { if ops == nil { ops = []libovsdb.Operation{} @@ -127,7 +145,7 @@ func AddLoadBalancersToSwitchOps(nbClient libovsdbclient.Client, ops []libovsdb. return ops, nil } - err := findSwitch(nbClient, lswitch) + err := findSwitchUUID(nbClient, lswitch) if err != nil { return nil, err } @@ -157,7 +175,7 @@ func RemoveLoadBalancersFromSwitchOps(nbClient libovsdbclient.Client, ops []libo return ops, nil } - err := findSwitch(nbClient, lswitch) + err := findSwitchUUID(nbClient, lswitch) if err != nil { return nil, err } diff --git a/go-controller/pkg/ovn/address_set/address_set.go b/go-controller/pkg/ovn/address_set/address_set.go index 00ec503f42..946137ba6a 100644 --- a/go-controller/pkg/ovn/address_set/address_set.go +++ b/go-controller/pkg/ovn/address_set/address_set.go @@ -19,6 +19,7 @@ import ( "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" utilnet "k8s.io/utils/net" @@ -29,7 +30,7 @@ const ( ipv6AddressSetSuffix = "_v6" ) -type AddressSetIterFunc func(hashedName, namespace, suffix string) +type AddressSetIterFunc func(hashedName, namespace, suffix string) error type AddressSetDoFunc func(as AddressSet) error // AddressSetFactory is an interface for managing address set objects @@ -175,7 +176,7 @@ func (asf *ovnAddressSetFactory) EnsureAddressSet(name string) (AddressSet, erro return &ovnAddressSets{nbClient: asf.nbClient, name: name, ipv4: v4set, ipv6: v6set}, nil } -func forEachAddressSet(nbClient libovsdbclient.Client, do func(string)) error { +func forEachAddressSet(nbClient libovsdbclient.Client, do func(string) error) error { addrSetList := &[]nbdb.AddressSet{} ctx, cancel := context.WithTimeout(context.Background(), types.OVSDBTimeout) defer cancel() @@ -188,9 +189,17 @@ func forEachAddressSet(nbClient libovsdbclient.Client, do func(string)) error { return fmt.Errorf("error reading address sets: %+v", err) } + var errors []error for _, addrSet := range *addrSetList { - do(addrSet.ExternalIDs["name"]) + if err := do(addrSet.ExternalIDs["name"]); err != nil { + errors = append(errors, err) + } + } + + if len(errors) > 0 { + return fmt.Errorf("failed to iterate address sets: %v", utilerrors.NewAggregate(errors)) } + return nil } @@ -199,14 +208,14 @@ func forEachAddressSet(nbClient libovsdbclient.Client, do func(string)) error { // OVN. (Unhashed address set names are of the form namespaceName[.suffix1.suffix2. .suffixN]) func (asf *ovnAddressSetFactory) ProcessEachAddressSet(iteratorFn AddressSetIterFunc) error { processedAddressSets := sets.String{} - err := forEachAddressSet(asf.nbClient, func(name string) { + return forEachAddressSet(asf.nbClient, func(name string) error { // Remove the suffix from the address set name and normalize addrSetName := truncateSuffixFromAddressSet(name) if processedAddressSets.Has(addrSetName) { // We have already processed the address set. In case of dual stack we will have _v4 and _v6 // suffixes for address sets. Since we are normalizing these two address sets through this API // we will process only one normalized address set name. - return + return nil } processedAddressSets.Insert(addrSetName) names := strings.Split(addrSetName, ".") @@ -215,10 +224,8 @@ func (asf *ovnAddressSetFactory) ProcessEachAddressSet(iteratorFn AddressSetIter if len(names) >= 2 { nameSuffix = names[1] } - iteratorFn(addrSetName, addrSetNamespace, nameSuffix) + return iteratorFn(addrSetName, addrSetNamespace, nameSuffix) }) - - return err } func truncateSuffixFromAddressSet(asName string) string { diff --git a/go-controller/pkg/ovn/address_set/address_set_cleanup.go b/go-controller/pkg/ovn/address_set/address_set_cleanup.go index 4180e69bfb..e45b9e9e95 100644 --- a/go-controller/pkg/ovn/address_set/address_set_cleanup.go +++ b/go-controller/pkg/ovn/address_set/address_set_cleanup.go @@ -16,7 +16,7 @@ func NonDualStackAddressSetCleanup(nbClient libovsdbclient.Client) error { const old = 0 const new = 1 addressSets := map[string][2]bool{} - err := forEachAddressSet(nbClient, func(name string) { + err := forEachAddressSet(nbClient, func(name string) error { shortName := truncateSuffixFromAddressSet(name) spec, found := addressSets[shortName] if !found { @@ -30,6 +30,7 @@ func NonDualStackAddressSetCleanup(nbClient libovsdbclient.Client) error { spec[new] = true } addressSets[shortName] = spec + return nil }) if err != nil { diff --git a/go-controller/pkg/ovn/address_set/address_set_test.go b/go-controller/pkg/ovn/address_set/address_set_test.go index 2cea6d035a..089ef9ee00 100644 --- a/go-controller/pkg/ovn/address_set/address_set_test.go +++ b/go-controller/pkg/ovn/address_set/address_set_test.go @@ -106,7 +106,7 @@ var _ = ginkgo.Describe("OVN Address Set operations", func() { }, } - err = asFactory.ProcessEachAddressSet(func(addrSetName, namespaceName, nameSuffix string) { + err = asFactory.ProcessEachAddressSet(func(addrSetName, namespaceName, nameSuffix string) error { found := false for _, n := range namespaces { name := n.makeNames() @@ -116,6 +116,7 @@ var _ = ginkgo.Describe("OVN Address Set operations", func() { } } gomega.Expect(found).To(gomega.BeTrue()) + return nil }) gomega.Expect(err).NotTo(gomega.HaveOccurred()) return nil diff --git a/go-controller/pkg/ovn/address_set/fake_address_set.go b/go-controller/pkg/ovn/address_set/fake_address_set.go index 6d58b1e48d..c03b377c8b 100644 --- a/go-controller/pkg/ovn/address_set/fake_address_set.go +++ b/go-controller/pkg/ovn/address_set/fake_address_set.go @@ -86,7 +86,9 @@ func (f *FakeAddressSetFactory) ProcessEachAddressSet(iteratorFn AddressSetIterF if len(parts) >= 2 { nameSuffix = parts[1] } - iteratorFn(asName, addrSetNamespace, nameSuffix) + if err := iteratorFn(asName, addrSetNamespace, nameSuffix); err != nil { + return err + } } return nil } diff --git a/go-controller/pkg/ovn/egressfirewall.go b/go-controller/pkg/ovn/egressfirewall.go index 46eb237644..18c177f22d 100644 --- a/go-controller/pkg/ovn/egressfirewall.go +++ b/go-controller/pkg/ovn/egressfirewall.go @@ -97,11 +97,16 @@ func newEgressFirewallRule(rawEgressFirewallRule egressfirewallapi.EgressFirewal // NOTE: Utilize the fact that we know that all egress firewall related setup must have a priority: types.MinimumReservedEgressFirewallPriority <= priority <= types.EgressFirewallStartPriority func (oc *Controller) syncEgressFirewall(egressFirewalls []interface{}) { + oc.syncWithRetry("syncEgressFirewall", func() error { return oc.syncEgressFirewallRetriable(egressFirewalls) }) +} + +// This function implements the main body of work of what is described by syncEgressFirewall. +// Upon failure, it may be invoked multiple times in order to avoid a pod restart. +func (oc *Controller) syncEgressFirewallRetriable(egressFirewalls []interface{}) error { // Lookup all ACLs used for egress Firewalls egressFirewallACLs, err := libovsdbops.FindACLsByPriorityRange(oc.nbClient, types.MinimumReservedEgressFirewallPriority, types.EgressFirewallStartPriority) if err != nil { - klog.Errorf("Unable to list egress firewall ACLs, cannot cleanup old stale data, err: %v", err) - return + return fmt.Errorf("unable to list egress firewall ACLs, cannot cleanup old stale data, err: %v", err) } if config.Gateway.Mode == config.GatewayModeShared { @@ -109,8 +114,7 @@ func (oc *Controller) syncEgressFirewall(egressFirewalls []interface{}) { if len(egressFirewallACLs) != 0 { err = libovsdbops.RemoveACLsFromNodeSwitches(oc.nbClient, egressFirewallACLs) if err != nil { - klog.Errorf("Failed to remove reject acl from node logical switches: %v", err) - return + return fmt.Errorf("failed to remove reject acl from node logical switches: %v", err) } } } else if config.Gateway.Mode == config.GatewayModeLocal { @@ -118,8 +122,7 @@ func (oc *Controller) syncEgressFirewall(egressFirewalls []interface{}) { if len(egressFirewallACLs) != 0 { err = libovsdbops.RemoveACLsFromJoinSwitch(oc.nbClient, egressFirewallACLs) if err != nil { - klog.Errorf("Failed to remove reject acl from node logical switches: %v", err) - return + return fmt.Errorf("failed to remove reject acl from node logical switches: %v", err) } } } @@ -143,7 +146,7 @@ func (oc *Controller) syncEgressFirewall(egressFirewalls []interface{}) { }) } if _, err := oc.modelClient.CreateOrUpdate(opModels...); err != nil { - klog.Errorf("Unable to set ACL direction on egress firewall acls, cannot convert old ACL data err: %v", err) + return fmt.Errorf("unable to set ACL direction on egress firewall acls, cannot convert old ACL data err: %v", err) } } // In any gateway mode, make sure to delete all LRPs on ovn_cluster_router. @@ -170,7 +173,7 @@ func (oc *Controller) syncEgressFirewall(egressFirewalls []interface{}) { }, } if err := oc.modelClient.Delete(opModels...); err != nil { - klog.Errorf("Unable to remove egress firewall policy, cannot cleanup old stale data, err: %v", err) + return fmt.Errorf("unable to remove egress firewall policy, cannot cleanup old stale data, err: %v", err) } // sync the ovn and k8s egressFirewall states @@ -189,7 +192,7 @@ func (oc *Controller) syncEgressFirewall(egressFirewalls []interface{}) { // get all the k8s EgressFirewall Objects egressFirewallList, err := oc.kube.GetEgressFirewalls() if err != nil { - klog.Errorf("Cannot reconcile the state of egressfirewalls in ovn database and k8s. err: %v", err) + return fmt.Errorf("cannot reconcile the state of egressfirewalls in ovn database and k8s. err: %v", err) } // delete entries from the map that exist in k8s and ovn for _, egressFirewall := range egressFirewallList.Items { @@ -199,10 +202,10 @@ func (oc *Controller) syncEgressFirewall(egressFirewalls []interface{}) { for spuriousEF := range ovnEgressFirewalls { err := oc.deleteEgressFirewallRules(spuriousEF) if err != nil { - klog.Errorf("Cannot fully reconcile the state of egressfirewalls ACLs for namespace %s still exist in ovn db: %v", spuriousEF, err) - return + return fmt.Errorf("cannot fully reconcile the state of egressfirewalls ACLs for namespace %s still exist in ovn db: %v", spuriousEF, err) } } + return nil } func (oc *Controller) addEgressFirewall(egressFirewall *egressfirewallapi.EgressFirewall) error { diff --git a/go-controller/pkg/ovn/egressgw_test.go b/go-controller/pkg/ovn/egressgw_test.go index 776e3aae1f..d4dd1b9170 100644 --- a/go-controller/pkg/ovn/egressgw_test.go +++ b/go-controller/pkg/ovn/egressgw_test.go @@ -2190,6 +2190,10 @@ var _ = ginkgo.Describe("OVN Egress Gateway Operations", func() { Name: types.GWRouterPrefix + nodeName, UUID: types.GWRouterPrefix + nodeName + "-UUID", }, + &nbdb.LogicalSwitch{ + UUID: "node1", + Name: "node1", + }, }, }, &v1.NamespaceList{ @@ -2220,6 +2224,10 @@ var _ = ginkgo.Describe("OVN Egress Gateway Operations", func() { Name: ovntypes.GWRouterToJoinSwitchPrefix + ovntypes.GWRouterPrefix + nodeName, Networks: []string{"100.64.0.4/32"}, }, + &nbdb.LogicalSwitch{ + UUID: "node1", + Name: "node1", + }, } injectNode(fakeOvn) fakeOvn.controller.WatchNamespaces() @@ -2241,6 +2249,10 @@ var _ = ginkgo.Describe("OVN Egress Gateway Operations", func() { Name: ovntypes.GWRouterToJoinSwitchPrefix + ovntypes.GWRouterPrefix + nodeName, Networks: []string{"100.64.0.4/32"}, }, + &nbdb.LogicalSwitch{ + UUID: "node1", + Name: "node1", + }, } err = deletePerPodGRSNAT(fakeOvn.controller.nbClient, nodeName, extIPs, []*net.IPNet{fullMaskPodNet}) gomega.Expect(err).NotTo(gomega.HaveOccurred()) diff --git a/go-controller/pkg/ovn/egressip.go b/go-controller/pkg/ovn/egressip.go index 409f6fe35c..bbae33c977 100644 --- a/go-controller/pkg/ovn/egressip.go +++ b/go-controller/pkg/ovn/egressip.go @@ -30,6 +30,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" @@ -991,6 +992,12 @@ func (oc *Controller) isEgressNodeReachable(egressNode *kapi.Node) bool { return false } +type egressIPCacheEntry struct { + podIPs sets.String + gatewayRouterIPs sets.String + egressIPs sets.String +} + func (oc *Controller) syncEgressIPs(eIPs []interface{}) { // This part will take of syncing stale data which we might have in OVN if // there's no ovnkube-master running for a while, while there are changes to @@ -999,15 +1006,29 @@ func (oc *Controller) syncEgressIPs(eIPs []interface{}) { // - Egress IPs which have been deleted while ovnkube-master was down // - pods/namespaces which have stopped matching on egress IPs while // ovnkube-master was down - if egressIPToPodIPCache, err := oc.generatePodIPCacheForEgressIP(eIPs); err == nil { - oc.syncStaleEgressReroutePolicy(egressIPToPodIPCache) - oc.syncStaleSNATRules(egressIPToPodIPCache) - } + oc.syncWithRetry("syncEgressIPs", func() error { + egressIPCache, err := oc.generateCacheForEgressIP(eIPs) + if err != nil { + return fmt.Errorf("syncEgressIPs unable to generate cache for egressip: %v", err) + } + if err = oc.syncStaleEgressReroutePolicy(egressIPCache); err != nil { + return fmt.Errorf("syncEgressIPs unable to remove stale reroute policies: %v", err) + } + if err = oc.syncStaleSNATRules(egressIPCache); err != nil { + return fmt.Errorf("syncEgressIPs unable to remove stale nats: %v", err) + } + return nil + }) } -func (oc *Controller) syncStaleEgressReroutePolicy(egressIPToPodIPCache map[string]sets.String) { +// This function implements a portion of syncEgressIPs. +// It removes OVN logical router policies used by EgressIPs deleted while ovnkube-master was down. +// It also removes stale nexthops from router policies used by EgressIPs. +// Upon failure, it may be invoked multiple times in order to avoid a pod restart. +func (oc *Controller) syncStaleEgressReroutePolicy(egressIPCache map[string]egressIPCacheEntry) error { logicalRouter := nbdb.LogicalRouter{} logicalRouterPolicyRes := []nbdb.LogicalRouterPolicy{} + logicalRouterPolicyStaleNexthops := make(map[string]nbdb.LogicalRouterPolicy) opModels := []libovsdbops.OperationModel{ { ModelPredicate: func(lrp *nbdb.LogicalRouterPolicy) bool { @@ -1015,14 +1036,36 @@ func (oc *Controller) syncStaleEgressReroutePolicy(egressIPToPodIPCache map[stri return false } egressIPName := lrp.ExternalIDs["name"] - podIPCache, exists := egressIPToPodIPCache[egressIPName] + cacheEntry, exists := egressIPCache[egressIPName] splitMatch := strings.Split(lrp.Match, " ") logicalIP := splitMatch[len(splitMatch)-1] parsedLogicalIP := net.ParseIP(logicalIP) - if !exists || !podIPCache.Has(parsedLogicalIP.String()) { - klog.Infof("syncStaleEgressReroutePolicy will delete %s: %v", egressIPName, lrp) + if !exists || cacheEntry.gatewayRouterIPs.Len() == 0 || !cacheEntry.podIPs.Has(parsedLogicalIP.String()) { + klog.Infof("syncStaleEgressReroutePolicy will delete %s due to no nexthop or stale logical ip: %v", egressIPName, lrp) return true } + // Check for stale nexthops that may exist in the logical router policy and store that in logicalRouterPolicyStaleNexthops. + // Note: adding missing nexthop(s) to the logical router policy is done outside the scope of this function. + onlyStaleNextHops := true + staleNextHops := sets.NewString() + for _, nexthop := range lrp.Nexthops { + if cacheEntry.gatewayRouterIPs.Has(nexthop) { + onlyStaleNextHops = false + } else { + staleNextHops.Insert(nexthop) + } + } + if staleNextHops.Len() > 0 { + // If all nexthops are stale, let's go ahead and remove the entire row + if onlyStaleNextHops { + klog.Infof("syncStaleEgressReroutePolicy will delete %s due to stale nexthops: %v", egressIPName, lrp) + return true + } + logicalRouterPolicyStaleNexthops[lrp.UUID] = nbdb.LogicalRouterPolicy{ + UUID: lrp.UUID, + Nexthops: staleNextHops.UnsortedList(), + } + } return false }, ExistingResult: &logicalRouterPolicyRes, @@ -1040,11 +1083,33 @@ func (oc *Controller) syncStaleEgressReroutePolicy(egressIPToPodIPCache map[stri }, } if err := oc.modelClient.Delete(opModels...); err != nil { - klog.Errorf("Unable to remove stale logical router policies, err: %v", err) + return fmt.Errorf("unable to remove stale logical router policies, err: %v", err) + } + + // Update Logical Router Policies that have stale nexthops. Notice that we must do this separately + // because 1) there is no model predicates, and 2) logicalRouterPolicyStaleNexthops must be populated + opModels2 := make([]libovsdbops.OperationModel, 0, len(logicalRouterPolicyStaleNexthops)) + for lrpUUID := range logicalRouterPolicyStaleNexthops { + lrp := logicalRouterPolicyStaleNexthops[lrpUUID] + klog.Infof("syncStaleEgressReroutePolicy will update %s to remove stale nexthops: %v", lrp.UUID, lrp.Nexthops) + opModels2 = append(opModels2, libovsdbops.OperationModel{ + Model: &lrp, + OnModelMutations: []interface{}{ + &lrp.Nexthops, + }, + }) } + if err := oc.modelClient.Delete(opModels2...); err != nil { + return fmt.Errorf("unable to remove stale next hops from logical router policies, err: %v", err) + } + + return nil } -func (oc *Controller) syncStaleSNATRules(egressIPToPodIPCache map[string]sets.String) { +// This function implements a portion of syncEgressIPs. +// It removes OVN NAT rules used by EgressIPs deleted while ovnkube-master was down. +// Upon failure, it may be invoked multiple times in order to avoid a pod restart. +func (oc *Controller) syncStaleSNATRules(egressIPCache map[string]egressIPCacheEntry) error { predicate := func(item *nbdb.NAT) bool { egressIPName, exists := item.ExternalIDs["name"] // Exclude rows that have no name or are not the right type @@ -1052,9 +1117,13 @@ func (oc *Controller) syncStaleSNATRules(egressIPToPodIPCache map[string]sets.St return false } parsedLogicalIP := net.ParseIP(item.LogicalIP).String() - podIPCache, exists := egressIPToPodIPCache[egressIPName] - if !exists || !podIPCache.Has(parsedLogicalIP) { - klog.Infof("syncStaleSNATRules will delete %s: %v", egressIPName, item) + cacheEntry, exists := egressIPCache[egressIPName] + if !exists || !cacheEntry.podIPs.Has(parsedLogicalIP) { + klog.Infof("syncStaleSNATRules will delete %s due to logical ip: %v", egressIPName, item) + return true + } + if !cacheEntry.egressIPs.Has(item.ExternalIP) { + klog.Infof("syncStaleSNATRules will delete %s due to external ip: %v", egressIPName, item) return true } return false @@ -1062,49 +1131,68 @@ func (oc *Controller) syncStaleSNATRules(egressIPToPodIPCache map[string]sets.St nats, err := libovsdbops.FindNATsUsingPredicate(oc.nbClient, predicate) if err != nil { - klog.Errorf("Unable to sync egress IPs err: %v", err) - return + return fmt.Errorf("unable to sync egress IPs err: %v", err) } if len(nats) == 0 { // No stale nat entries to deal with: noop. - return + return nil } routers, err := libovsdbops.FindRoutersUsingNAT(oc.nbClient, nats) if err != nil { - klog.Errorf("Unable to sync egress IPs, err: %v", err) - return + return fmt.Errorf("unable to sync egress IPs, err: %v", err) } + var errors []error ops := []ovsdb.Operation{} for _, router := range routers { ops, err = libovsdbops.DeleteNATsFromRouterOps(oc.nbClient, ops, &router, nats...) if err != nil { klog.Errorf("Error deleting stale NAT from router %s: %v", router.Name, err) + errors = append(errors, err) continue } } + if len(errors) > 0 { + return fmt.Errorf("failed deleting stale NAT: %v", utilerrors.NewAggregate(errors)) + } _, err = libovsdbops.TransactAndCheck(oc.nbClient, ops) if err != nil { - klog.Errorf("Error deleting stale NATs: %v", err) + return fmt.Errorf("error deleting stale NATs: %v", err) } + return nil } -// generatePodIPCacheForEgressIP builds a cache of egressIP name -> podIPs for fast +// generateCacheForEgressIP builds a cache of egressIP name -> podIPs for fast // access when syncing egress IPs. The Egress IP setup will return a lot of // atomic items with the same general information repeated across most (egressIP // name, logical IP defined for that name), hence use a cache to avoid round // trips to the API server per item. -func (oc *Controller) generatePodIPCacheForEgressIP(eIPs []interface{}) (map[string]sets.String, error) { - egressIPToPodIPCache := make(map[string]sets.String) +func (oc *Controller) generateCacheForEgressIP(eIPs []interface{}) (map[string]egressIPCacheEntry, error) { + egressIPCache := make(map[string]egressIPCacheEntry) for _, eIP := range eIPs { egressIP, ok := eIP.(*egressipv1.EgressIP) if !ok { continue } - egressIPToPodIPCache[egressIP.Name] = sets.NewString() + egressIPCache[egressIP.Name] = egressIPCacheEntry{ + podIPs: sets.NewString(), + gatewayRouterIPs: sets.NewString(), + egressIPs: sets.NewString(), + } + for _, status := range egressIP.Status.Items { + isEgressIPv6 := utilnet.IsIPv6String(status.EgressIP) + gatewayRouterIP, err := oc.eIPC.getGatewayRouterJoinIP(status.Node, isEgressIPv6) + if err != nil { + klog.Errorf("Unable to retrieve gateway IP for node: %s, protocol is IPv6: %v, err: %v", status.Node, isEgressIPv6, err) + continue + } + egressIPCache[egressIP.Name].gatewayRouterIPs.Insert(gatewayRouterIP.String()) + egressIPCache[egressIP.Name].egressIPs.Insert(status.EgressIP) + } + namespaces, err := oc.watchFactory.GetNamespacesBySelector(egressIP.Spec.NamespaceSelector) if err != nil { klog.Errorf("Error building egress IP sync cache, cannot retrieve namespaces for EgressIP: %s, err: %v", egressIP.Name, err) @@ -1117,14 +1205,18 @@ func (oc *Controller) generatePodIPCacheForEgressIP(eIPs []interface{}) (map[str continue } for _, pod := range pods { - for _, podIP := range pod.Status.PodIPs { - ip := net.ParseIP(podIP.IP) - egressIPToPodIPCache[egressIP.Name].Insert(ip.String()) + logicalPort, err := oc.logicalPortCache.get(util.GetLogicalPortName(pod.Namespace, pod.Name)) + if err != nil { + klog.Errorf("Error getting logical port %s, err: %v", util.GetLogicalPortName(pod.Namespace, pod.Name), err) + continue + } + for _, ipNet := range logicalPort.ips { + egressIPCache[egressIP.Name].podIPs.Insert(ipNet.IP.String()) } } } } - return egressIPToPodIPCache, nil + return egressIPCache, nil } // isAnyClusterNodeIP verifies that the IP is not any node IP. @@ -1258,7 +1350,7 @@ func (oc *Controller) assignEgressIPs(name string, egressIPs []string) []egressi Node: eNode.name, EgressIP: eIPC.String(), }) - klog.V(5).Infof("Successful assignment of egress IP: %s on node: %+v", egressIP, eNode) + klog.Infof("Successful assignment of egress IP: %s on node: %+v", egressIP, eNode) eNode.allocations[eIPC.String()] = name break } diff --git a/go-controller/pkg/ovn/master.go b/go-controller/pkg/ovn/master.go index 1ebff39e08..caed8cebc4 100644 --- a/go-controller/pkg/ovn/master.go +++ b/go-controller/pkg/ovn/master.go @@ -1360,17 +1360,36 @@ func (oc *Controller) syncNodesPeriodic() { } } +// syncWithRetry is a wrapper that calls a sync function and retries it in case of failures. +func (oc *Controller) syncWithRetry(syncName string, syncFunc func() error) { + err := utilwait.PollImmediate(500*time.Millisecond, 60*time.Second, func() (bool, error) { + if err := syncFunc(); err != nil { + klog.Errorf("Failed (will retry) in syncing %s: %v", syncName, err) + return false, nil + } + return true, nil + }) + if err != nil { + klog.Fatalf("Error in syncing %s: %v", syncName, err) + } +} + // We only deal with cleaning up nodes that shouldn't exist here, since // watchNodes() will be called for all existing nodes at startup anyway. // Note that this list will include the 'join' cluster switch, which we // do not want to delete. func (oc *Controller) syncNodes(nodes []interface{}) { + oc.syncWithRetry("syncNodes", func() error { return oc.syncNodesRetriable(nodes) }) +} + +// This function implements the main body of work of what is described by syncNodes. +// Upon failure, it may be invoked multiple times in order to avoid a pod restart. +func (oc *Controller) syncNodesRetriable(nodes []interface{}) error { foundNodes := sets.NewString() for _, tmp := range nodes { node, ok := tmp.(*kapi.Node) if !ok { - klog.Errorf("Spurious object in syncNodes: %v", tmp) - continue + return fmt.Errorf("spurious object in syncNodes: %v", tmp) } foundNodes.Insert(node.Name) @@ -1387,6 +1406,7 @@ func (oc *Controller) syncNodes(nodes []interface{}) { // For each existing node, reserve its joinSwitch LRP IPs if they already exist. _, err := oc.joinSwIPManager.EnsureJoinLRPIPs(node.Name) if err != nil { + // TODO (flaviof): keep going even if EnsureJoinLRPIPs returned an error. Maybe we should not. klog.Errorf("Failed to get join switch port IP address for node %s: %v", node.Name, err) } } @@ -1396,8 +1416,7 @@ func (oc *Controller) syncNodes(nodes []interface{}) { chassisList, err := libovsdbops.ListChassis(oc.sbClient) if err != nil { - klog.Errorf("Failed to get chassis list: error: %v", err) - return + return fmt.Errorf("failed to get chassis list: error: %v", err) } for _, chassis := range chassisList { @@ -1409,8 +1428,10 @@ func (oc *Controller) syncNodes(nodes []interface{}) { nodeSwitches, err := libovsdbops.FindSwitchesWithOtherConfig(oc.nbClient) if err != nil { - klog.Errorf("Failed to get node logical switches which have other-config set error: %v", err) - return + if err != libovsdbclient.ErrNotFound { + return fmt.Errorf("failed to get node logical switches which have other-config set error: %v", err) + } + klog.Warning("Did not find any logical switches with other-config") } for _, nodeSwitch := range nodeSwitches { @@ -1449,7 +1470,7 @@ func (oc *Controller) syncNodes(nodes []interface{}) { } if err := libovsdbops.DeleteNodeChassis(oc.sbClient, staleChassis.List()...); err != nil { - klog.Errorf("Failed Deleting chassis %v error: %v", staleChassis.List(), err) - return + return fmt.Errorf("failed deleting chassis %v error: %v", staleChassis.List(), err) } + return nil } diff --git a/go-controller/pkg/ovn/namespace.go b/go-controller/pkg/ovn/namespace.go index edfc00e718..d8c2eefb6f 100644 --- a/go-controller/pkg/ovn/namespace.go +++ b/go-controller/pkg/ovn/namespace.go @@ -31,26 +31,34 @@ const ( ) func (oc *Controller) syncNamespaces(namespaces []interface{}) { + oc.syncWithRetry("syncNamespaces", func() error { return oc.syncNamespacesRetriable(namespaces) }) +} + +// This function implements the main body of work of syncNamespaces. +// Upon failure, it may be invoked multiple times in order to avoid a pod restart. +func (oc *Controller) syncNamespacesRetriable(namespaces []interface{}) error { expectedNs := make(map[string]bool) for _, nsInterface := range namespaces { ns, ok := nsInterface.(*kapi.Namespace) if !ok { - klog.Errorf("Spurious object in syncNamespaces: %v", nsInterface) - continue + return fmt.Errorf("spurious object in syncNamespaces: %v", nsInterface) } expectedNs[ns.Name] = true } - err := oc.addressSetFactory.ProcessEachAddressSet(func(addrSetName, namespaceName, nameSuffix string) { + err := oc.addressSetFactory.ProcessEachAddressSet(func(addrSetName, namespaceName, nameSuffix string) error { if nameSuffix == "" && !expectedNs[namespaceName] { if err := oc.addressSetFactory.DestroyAddressSetInBackingStore(addrSetName); err != nil { klog.Errorf(err.Error()) + return err } } + return nil }) if err != nil { - klog.Errorf("Error in syncing namespaces: %v", err) + return fmt.Errorf("error in syncing namespaces: %v", err) } + return nil } func (oc *Controller) getRoutingExternalGWs(nsInfo *namespaceInfo) *gatewayInfo { diff --git a/go-controller/pkg/ovn/pods.go b/go-controller/pkg/ovn/pods.go index 9bb6f2c746..91ef4e4a8a 100644 --- a/go-controller/pkg/ovn/pods.go +++ b/go-controller/pkg/ovn/pods.go @@ -26,27 +26,43 @@ import ( ) func (oc *Controller) syncPods(pods []interface{}) { + oc.syncWithRetry("syncPods", func() error { return oc.syncPodsRetriable(pods) }) +} + +// This function implements the main body of work of syncPods. +// Upon failure, it may be invoked multiple times in order to avoid a pod restart. +func (oc *Controller) syncPodsRetriable(pods []interface{}) error { var allOps []ovsdb.Operation // get the list of logical switch ports (equivalent to pods) expectedLogicalPorts := make(map[string]bool) for _, podInterface := range pods { pod, ok := podInterface.(*kapi.Pod) if !ok { - klog.Errorf("Spurious object in syncPods: %v", podInterface) - continue + return fmt.Errorf("spurious object in syncPods: %v", podInterface) } annotations, err := util.UnmarshalPodAnnotation(pod.Annotations) if util.PodScheduled(pod) && util.PodWantsNetwork(pod) && err == nil { + // skip nodes that are not running ovnk (inferred from host subnets) + if oc.lsManager.IsNonHostSubnetSwitch(pod.Spec.NodeName) { + continue + } logicalPort := util.GetLogicalPortName(pod.Namespace, pod.Name) expectedLogicalPorts[logicalPort] = true if err = oc.waitForNodeLogicalSwitchInCache(pod.Spec.NodeName); err != nil { - klog.Errorf("Failed to wait for node %s to be added to cache. IP allocation may fail!", + return fmt.Errorf("failed to wait for node %s to be added to cache. IP allocation may fail!", pod.Spec.NodeName) } if err = oc.lsManager.AllocateIPs(pod.Spec.NodeName, annotations.IPs); err != nil { - klog.Errorf("couldn't allocate IPs: %s for pod: %s on node: %s"+ - " error: %v", util.JoinIPNetIPs(annotations.IPs, " "), logicalPort, - pod.Spec.NodeName, err) + if err == ipallocator.ErrAllocated { + // already allocated: log an error but not stop syncPod from continuing + klog.Errorf("Already allocated IPs: %s for pod: %s on node: %s", + util.JoinIPNetIPs(annotations.IPs, " "), logicalPort, + pod.Spec.NodeName) + } else { + return fmt.Errorf("Couldn't allocate IPs: %s for pod: %s on node: %s"+ + " error: %v", util.JoinIPNetIPs(annotations.IPs, " "), logicalPort, + pod.Spec.NodeName, err) + } } } } @@ -58,8 +74,7 @@ func (oc *Controller) syncPods(pods []interface{}) { defer cancel() err := oc.nbClient.List(ctx, &lspList) if err != nil { - klog.Errorf("Cannot sync pods, cannot retrieve list of logical switch ports (%+v)", err) - return + return fmt.Errorf("cannot sync pods, cannot retrieve list of logical switch ports (%+v)", err) } for _, lsp := range lspList { portCache[lsp.UUID] = lsp @@ -67,26 +82,31 @@ func (oc *Controller) syncPods(pods []interface{}) { // get all the nodes from the watchFactory nodes, err := oc.watchFactory.GetNodes() if err != nil { - klog.Errorf("Failed to get nodes: %v", err) - return + return fmt.Errorf("failed to get nodes: %v", err) } for _, n := range nodes { + // skip nodes that are not running ovnk (inferred from host subnets) + if oc.lsManager.IsNonHostSubnetSwitch(n.Name) { + continue + } stalePorts := []string{} // find the logical switch for the node ls := &nbdb.LogicalSwitch{} if lsUUID, ok := oc.lsManager.GetUUID(n.Name); !ok { klog.Errorf("Error getting logical switch for node %s: %s", n.Name, "Switch not in logical switch cache") - continue + + // Not in cache: Try getting the logical switch from ovn database (slower method) + if ls, err = libovsdbops.FindSwitchByName(oc.nbClient, n.Name); err != nil { + return fmt.Errorf("can't find switch for node %s: %v", n.Name, err) + } } else { ctx, cancel := context.WithTimeout(context.Background(), ovntypes.OVSDBTimeout) defer cancel() ls.UUID = lsUUID if err := oc.nbClient.Get(ctx, ls); err != nil { - klog.Errorf("Error getting logical switch for node %d (UUID: %d) from ovn database (%v)", n.Name, ls.UUID, err) - continue + return fmt.Errorf("error getting logical switch for node %s (UUID: %s) from ovn database (%v)", n.Name, ls.UUID, err) } - } for _, port := range ls.Ports { if portCache[port].ExternalIDs["pod"] == "true" { @@ -102,16 +122,16 @@ func (oc *Controller) syncPods(pods []interface{}) { Value: stalePorts, }) if err != nil { - klog.Errorf("Could not generate ops to delete stale ports from logical switch %s (%+v)", n.Name, err) - continue + return fmt.Errorf("could not generate ops to delete stale ports from logical switch %s (%+v)", n.Name, err) } allOps = append(allOps, ops...) } } _, err = libovsdbops.TransactAndCheck(oc.nbClient, allOps) if err != nil { - klog.Errorf("Could not remove stale logicalPorts from switches (%+v)", err) + return fmt.Errorf("could not remove stale logicalPorts from switches (%+v)", err) } + return nil } func (oc *Controller) deleteLogicalPort(pod *kapi.Pod) { diff --git a/go-controller/pkg/ovn/policy.go b/go-controller/pkg/ovn/policy.go index 0662714626..000bc2851f 100644 --- a/go-controller/pkg/ovn/policy.go +++ b/go-controller/pkg/ovn/policy.go @@ -112,13 +112,17 @@ func hashedPortGroup(s string) string { } func (oc *Controller) syncNetworkPolicies(networkPolicies []interface{}) { + oc.syncWithRetry("syncNetworkPolicies", func() error { return oc.syncNetworkPoliciesRetriable(networkPolicies) }) +} + +// This function implements the main body of work of syncNetworkPolicies. +// Upon failure, it may be invoked multiple times in order to avoid a pod restart. +func (oc *Controller) syncNetworkPoliciesRetriable(networkPolicies []interface{}) error { expectedPolicies := make(map[string]map[string]bool) for _, npInterface := range networkPolicies { policy, ok := npInterface.(*knet.NetworkPolicy) if !ok { - klog.Errorf("Spurious object in syncNetworkPolicies: %v", - npInterface) - continue + return fmt.Errorf("spurious object in syncNetworkPolicies: %v", npInterface) } if nsMap, ok := expectedPolicies[policy.Namespace]; ok { @@ -131,7 +135,7 @@ func (oc *Controller) syncNetworkPolicies(networkPolicies []interface{}) { } stalePGs := []string{} - err := oc.addressSetFactory.ProcessEachAddressSet(func(addrSetName, namespaceName, policyName string) { + err := oc.addressSetFactory.ProcessEachAddressSet(func(addrSetName, namespaceName, policyName string) error { if policyName != "" && !expectedPolicies[namespaceName][policyName] { // policy doesn't exist on k8s. Delete the port group portGroupName := fmt.Sprintf("%s_%s", namespaceName, policyName) @@ -140,17 +144,19 @@ func (oc *Controller) syncNetworkPolicies(networkPolicies []interface{}) { // delete the address sets for this old policy from OVN if err := oc.addressSetFactory.DestroyAddressSetInBackingStore(addrSetName); err != nil { klog.Errorf(err.Error()) + return err } } + return nil }) if err != nil { - klog.Errorf("Error in syncing network policies: %v", err) + return fmt.Errorf("error in syncing network policies: %v", err) } if len(stalePGs) > 0 { err = libovsdbops.DeletePortGroups(oc.nbClient, stalePGs...) if err != nil { - klog.Errorf("Error removing stale port groups %v: %v", stalePGs, err) + return fmt.Errorf("error removing stale port groups %v: %v", stalePGs, err) } } @@ -158,12 +164,12 @@ func (oc *Controller) syncNetworkPolicies(networkPolicies []interface{}) { var allEgressACLs []nbdb.ACL egressACLs, err := libovsdbops.FindACLsByExternalID(oc.nbClient, map[string]string{policyTypeACLExtIdKey: string(knet.PolicyTypeEgress)}) if err != nil { - klog.Errorf("error cannot sync NetworkPolicy Egress obj: %v", err) + return fmt.Errorf("error cannot sync NetworkPolicy Egress obj: %v", err) } allEgressACLs = append(allEgressACLs, egressACLs...) egressACLs, err = libovsdbops.FindACLsByExternalID(oc.nbClient, map[string]string{defaultDenyPolicyTypeACLExtIdKey: string(knet.PolicyTypeEgress)}) if err != nil { - klog.Errorf("error cannot sync NetworkPolicy Egress obj: %v", err) + return fmt.Errorf("error cannot sync NetworkPolicy Egress obj: %v", err) } allEgressACLs = append(allEgressACLs, egressACLs...) // if the first egress ACL is correct they should all be correct and not need to update @@ -177,15 +183,16 @@ func (oc *Controller) syncNetworkPolicies(networkPolicies []interface{}) { } ops, err := libovsdbops.CreateOrUpdateACLsOps(oc.nbClient, nil, egressACLsPTR...) if err != nil { - klog.Errorf("cannot create ops to update old Egress NetworkPolicy ACLs: %v", err) + return fmt.Errorf("cannot create ops to update old Egress NetworkPolicy ACLs: %v", err) } _, err = libovsdbops.TransactAndCheck(oc.nbClient, ops) if err != nil { - klog.Errorf("cannot update old Egress NetworkPolicy ACLs: %v", err) + return fmt.Errorf("cannot update old Egress NetworkPolicy ACLs: %v", err) } } + return nil } func addAllowACLFromNode(nodeName string, mgmtPortIP net.IP, nbClient libovsdbclient.Client) error {