diff --git a/go-controller/pkg/libovsdbops/switch.go b/go-controller/pkg/libovsdbops/switch.go index 126c6c04aa..817265aca1 100644 --- a/go-controller/pkg/libovsdbops/switch.go +++ b/go-controller/pkg/libovsdbops/switch.go @@ -12,6 +12,8 @@ import ( "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types" + + "k8s.io/apimachinery/pkg/util/sets" ) // LOGICAL_SWITCH OPs @@ -315,9 +317,20 @@ func RemoveACLsFromJoinSwitch(nbClient libovsdbclient.Client, acls []nbdb.ACL) e // RemoveACLFromSwitches removes the ACL uuid entry from Logical Switch acl's list. func RemoveACLsFromAllSwitches(nbClient libovsdbclient.Client, acls []nbdb.ACL) error { + // optimize the predicate to exclude switches that don't reference deleting acls. + aclsToDelete := sets.String{} + for _, acl := range acls { + aclsToDelete.Insert(acl.UUID) + } + swWithACLsPred := func(sw *nbdb.LogicalSwitch) bool { + return aclsToDelete.HasAny(sw.ACLs...) + } // Find all switches - switches, err := findSwitches(nbClient) + switches, err := findSwitchesByPredicate(nbClient, swWithACLsPred) if err != nil { + if errors.Is(err, libovsdbclient.ErrNotFound) { + return nil + } return err } diff --git a/go-controller/pkg/ovn/egressfirewall.go b/go-controller/pkg/ovn/egressfirewall.go index 86483188da..23a6c309e0 100644 --- a/go-controller/pkg/ovn/egressfirewall.go +++ b/go-controller/pkg/ovn/egressfirewall.go @@ -6,16 +6,16 @@ import ( "strings" "sync" - "github.com/pkg/errors" - "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config" egressfirewallapi "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdbops" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb" addressset "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/address_set" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util/batching" kapi "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" utilnet "k8s.io/utils/net" @@ -27,6 +27,7 @@ const ( egressFirewallUpdateError = "EgressFirewall Rules not correctly updated" // egressFirewallACLExtIdKey external ID key for egress firewall ACLs egressFirewallACLExtIdKey = "egressFirewall" + aclDeleteBatchSize = 1000 ) type egressFirewall struct { @@ -164,9 +165,11 @@ func (oc *Controller) syncEgressFirewallRetriable(egressFirewalls []interface{}) // delete acls from all switches, they reside on the port group now if len(egressFirewallACLs) != 0 { - err = libovsdbops.RemoveACLsFromAllSwitches(oc.nbClient, egressFirewallACLs) + err = batching.Batch(aclDeleteBatchSize, egressFirewallACLs, func(batchACLs []nbdb.ACL) error { + return libovsdbops.RemoveACLsFromAllSwitches(oc.nbClient, batchACLs) + }) if err != nil { - return fmt.Errorf("failed to remove reject acl from all logical switches: %v", err) + return fmt.Errorf("failed to remove egress firewall acls from all logical switches: %v", err) } } @@ -214,7 +217,7 @@ func (oc *Controller) addEgressFirewall(egressFirewall *egressfirewallapi.Egress egressFirewall.Name, egressFirewall.Namespace) } - var addErrors error + var errorList []error for i, egressFirewallRule := range egressFirewall.Spec.Egress { // process Rules into egressFirewallRules for egressFirewall struct if i > types.EgressFirewallStartPriority-types.MinimumReservedEgressFirewallPriority { @@ -224,15 +227,15 @@ func (oc *Controller) addEgressFirewall(egressFirewall *egressfirewallapi.Egress } efr, err := newEgressFirewallRule(egressFirewallRule, i) if err != nil { - addErrors = errors.Wrapf(addErrors, "error: cannot create EgressFirewall Rule to destination %s for namespace %s - %v", - egressFirewallRule.To.CIDRSelector, egressFirewall.Namespace, err) + errorList = append(errorList, fmt.Errorf("cannot create EgressFirewall Rule to destination %s for namespace %s: %w", + egressFirewallRule.To.CIDRSelector, egressFirewall.Namespace, err)) continue } ef.egressRules = append(ef.egressRules, efr) } - if addErrors != nil { - return addErrors + if len(errorList) > 0 { + return errors.NewAggregate(errorList) } // EgressFirewall needs to make sure that the address_set for the namespace exists independently of the namespace object diff --git a/go-controller/pkg/util/batching/batch.go b/go-controller/pkg/util/batching/batch.go new file mode 100644 index 0000000000..1b80ffb6d0 --- /dev/null +++ b/go-controller/pkg/util/batching/batch.go @@ -0,0 +1,26 @@ +package batching + +import ( + "fmt" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb" +) + +func Batch(batchSize int, data []nbdb.ACL, eachFn func([]nbdb.ACL) error) error { + if batchSize < 1 { + return fmt.Errorf("batchSize should be > 0, got %d", batchSize) + } + start := 0 + dataLen := len(data) + for start < dataLen { + end := start + batchSize + if end > dataLen { + end = dataLen + } + err := eachFn(data[start:end]) + if err != nil { + return err + } + start = end + } + return nil +} diff --git a/go-controller/pkg/util/batching/batch_test.go b/go-controller/pkg/util/batching/batch_test.go new file mode 100644 index 0000000000..57584c4567 --- /dev/null +++ b/go-controller/pkg/util/batching/batch_test.go @@ -0,0 +1,88 @@ +package batching + +import ( + "fmt" + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb" + + "strings" + "testing" +) + +type batchTestData struct { + name string + batchSize int + data []nbdb.ACL + expectErr string +} + +func TestBatch(t *testing.T) { + acl1 := nbdb.ACL{UUID: "1"} + acl2 := nbdb.ACL{UUID: "2"} + acl3 := nbdb.ACL{UUID: "3"} + acl4 := nbdb.ACL{UUID: "4"} + acl5 := nbdb.ACL{UUID: "5"} + + tt := []batchTestData{ + { + name: "batch size should be > 0", + batchSize: 0, + data: []nbdb.ACL{acl1, acl2, acl3}, + expectErr: "batchSize should be > 0", + }, + { + name: "batchSize = 1", + batchSize: 1, + data: []nbdb.ACL{acl1, acl2, acl3}, + }, + { + name: "batchSize > 1", + batchSize: 2, + data: []nbdb.ACL{acl1, acl2, acl3}, + }, + { + name: "number of batches = 0", + batchSize: 2, + data: nil, + }, + { + name: "number of batches = 1", + batchSize: 2, + data: []nbdb.ACL{acl1, acl2}, + }, + { + name: "number of batches > 1", + batchSize: 2, + data: []nbdb.ACL{acl1, acl2, acl3, acl4}, + }, + { + name: "number of batches not int", + batchSize: 2, + data: []nbdb.ACL{acl1, acl2, acl3, acl4, acl5}, + }, + } + + for _, tCase := range tt { + g := gomega.NewGomegaWithT(t) + ginkgo.By(tCase.name) + var result []nbdb.ACL + batchNum := 0 + err := Batch(tCase.batchSize, tCase.data, func(l []nbdb.ACL) error { + + batchNum += 1 + result = append(result, l...) + return nil + }) + if err != nil { + if tCase.expectErr != "" && strings.Contains(err.Error(), tCase.expectErr) { + continue + } + t.Fatal(fmt.Sprintf("test %s failed: %v", tCase.name, err)) + } + // tCase.data/tCase.batchSize round up + expectedBatchNum := (len(tCase.data) + tCase.batchSize - 1) / tCase.batchSize + g.Expect(batchNum).To(gomega.Equal(expectedBatchNum)) + g.Expect(result).To(gomega.Equal(tCase.data)) + } +}