Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion go-controller/pkg/libovsdbops/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"

"k8s.io/apimachinery/pkg/util/sets"
)

// LOGICAL_SWITCH OPs
Expand Down Expand Up @@ -315,9 +317,20 @@ func RemoveACLsFromJoinSwitch(nbClient libovsdbclient.Client, acls []nbdb.ACL) e

// RemoveACLFromSwitches removes the ACL uuid entry from Logical Switch acl's list.
func RemoveACLsFromAllSwitches(nbClient libovsdbclient.Client, acls []nbdb.ACL) error {
// optimize the predicate to exclude switches that don't reference deleting acls.
aclsToDelete := sets.String{}
for _, acl := range acls {
aclsToDelete.Insert(acl.UUID)
}
swWithACLsPred := func(sw *nbdb.LogicalSwitch) bool {
return aclsToDelete.HasAny(sw.ACLs...)
}
// Find all switches
switches, err := findSwitches(nbClient)
switches, err := findSwitchesByPredicate(nbClient, swWithACLsPred)
if err != nil {
if errors.Is(err, libovsdbclient.ErrNotFound) {
return nil
}
return err
}

Expand Down
21 changes: 12 additions & 9 deletions go-controller/pkg/ovn/egressfirewall.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ import (
"strings"
"sync"

"github.com/pkg/errors"

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
egressfirewallapi "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdbops"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb"
addressset "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/address_set"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util/batching"

kapi "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"
Expand All @@ -27,6 +27,7 @@ const (
egressFirewallUpdateError = "EgressFirewall Rules not correctly updated"
// egressFirewallACLExtIdKey external ID key for egress firewall ACLs
egressFirewallACLExtIdKey = "egressFirewall"
aclDeleteBatchSize = 1000
)

type egressFirewall struct {
Expand Down Expand Up @@ -164,9 +165,11 @@ func (oc *Controller) syncEgressFirewallRetriable(egressFirewalls []interface{})

// delete acls from all switches, they reside on the port group now
if len(egressFirewallACLs) != 0 {
err = libovsdbops.RemoveACLsFromAllSwitches(oc.nbClient, egressFirewallACLs)
err = batching.Batch(aclDeleteBatchSize, egressFirewallACLs, func(batchACLs []nbdb.ACL) error {
return libovsdbops.RemoveACLsFromAllSwitches(oc.nbClient, batchACLs)
})
if err != nil {
return fmt.Errorf("failed to remove reject acl from all logical switches: %v", err)
return fmt.Errorf("failed to remove egress firewall acls from all logical switches: %v", err)
}
}

Expand Down Expand Up @@ -214,7 +217,7 @@ func (oc *Controller) addEgressFirewall(egressFirewall *egressfirewallapi.Egress
egressFirewall.Name, egressFirewall.Namespace)
}

var addErrors error
var errorList []error
for i, egressFirewallRule := range egressFirewall.Spec.Egress {
// process Rules into egressFirewallRules for egressFirewall struct
if i > types.EgressFirewallStartPriority-types.MinimumReservedEgressFirewallPriority {
Expand All @@ -224,15 +227,15 @@ func (oc *Controller) addEgressFirewall(egressFirewall *egressfirewallapi.Egress
}
efr, err := newEgressFirewallRule(egressFirewallRule, i)
if err != nil {
addErrors = errors.Wrapf(addErrors, "error: cannot create EgressFirewall Rule to destination %s for namespace %s - %v",
egressFirewallRule.To.CIDRSelector, egressFirewall.Namespace, err)
errorList = append(errorList, fmt.Errorf("cannot create EgressFirewall Rule to destination %s for namespace %s: %w",
egressFirewallRule.To.CIDRSelector, egressFirewall.Namespace, err))
continue

}
ef.egressRules = append(ef.egressRules, efr)
}
if addErrors != nil {
return addErrors
if len(errorList) > 0 {
return errors.NewAggregate(errorList)
}

// EgressFirewall needs to make sure that the address_set for the namespace exists independently of the namespace object
Expand Down
26 changes: 26 additions & 0 deletions go-controller/pkg/util/batching/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package batching

import (
"fmt"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb"
)

func Batch(batchSize int, data []nbdb.ACL, eachFn func([]nbdb.ACL) error) error {
if batchSize < 1 {
return fmt.Errorf("batchSize should be > 0, got %d", batchSize)
}
start := 0
dataLen := len(data)
for start < dataLen {
end := start + batchSize
if end > dataLen {
end = dataLen
}
err := eachFn(data[start:end])
if err != nil {
return err
}
start = end
}
return nil
}
88 changes: 88 additions & 0 deletions go-controller/pkg/util/batching/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package batching

import (
"fmt"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb"

"strings"
"testing"
)

type batchTestData struct {
name string
batchSize int
data []nbdb.ACL
expectErr string
}

func TestBatch(t *testing.T) {
acl1 := nbdb.ACL{UUID: "1"}
acl2 := nbdb.ACL{UUID: "2"}
acl3 := nbdb.ACL{UUID: "3"}
acl4 := nbdb.ACL{UUID: "4"}
acl5 := nbdb.ACL{UUID: "5"}

tt := []batchTestData{
{
name: "batch size should be > 0",
batchSize: 0,
data: []nbdb.ACL{acl1, acl2, acl3},
expectErr: "batchSize should be > 0",
},
{
name: "batchSize = 1",
batchSize: 1,
data: []nbdb.ACL{acl1, acl2, acl3},
},
{
name: "batchSize > 1",
batchSize: 2,
data: []nbdb.ACL{acl1, acl2, acl3},
},
{
name: "number of batches = 0",
batchSize: 2,
data: nil,
},
{
name: "number of batches = 1",
batchSize: 2,
data: []nbdb.ACL{acl1, acl2},
},
{
name: "number of batches > 1",
batchSize: 2,
data: []nbdb.ACL{acl1, acl2, acl3, acl4},
},
{
name: "number of batches not int",
batchSize: 2,
data: []nbdb.ACL{acl1, acl2, acl3, acl4, acl5},
},
}

for _, tCase := range tt {
g := gomega.NewGomegaWithT(t)
ginkgo.By(tCase.name)
var result []nbdb.ACL
batchNum := 0
err := Batch(tCase.batchSize, tCase.data, func(l []nbdb.ACL) error {

batchNum += 1
result = append(result, l...)
return nil
})
if err != nil {
if tCase.expectErr != "" && strings.Contains(err.Error(), tCase.expectErr) {
continue
}
t.Fatal(fmt.Sprintf("test %s failed: %v", tCase.name, err))
}
// tCase.data/tCase.batchSize round up
expectedBatchNum := (len(tCase.data) + tCase.batchSize - 1) / tCase.batchSize
g.Expect(batchNum).To(gomega.Equal(expectedBatchNum))
g.Expect(result).To(gomega.Equal(tCase.data))
}
}