Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions go-controller/pkg/ovn/egressfirewall.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ import (
"strings"
"sync"

"github.com/pkg/errors"

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
egressfirewallapi "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdbops"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb"
addressset "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/address_set"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util/batching"

kapi "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"
Expand All @@ -26,6 +27,7 @@ const (
egressFirewallAddError = "EgressFirewall Rules not correctly added"
// egressFirewallACLExtIdKey external ID key for egress firewall ACLs
egressFirewallACLExtIdKey = "egressFirewall"
aclDeleteBatchSize = 1000
)

type egressFirewall struct {
Expand Down Expand Up @@ -134,10 +136,19 @@ func (oc *Controller) syncEgressFirewall(egressFirewalls []interface{}) error {

// delete acls from all switches, they reside on the port group now
if len(egressFirewallACLs) != 0 {
err = libovsdbops.RemoveACLsFromLogicalSwitchesWithPredicate(oc.nbClient, func(item *nbdb.LogicalSwitch) bool { return true },
egressFirewallACLs...)
err = batching.Batch[*nbdb.ACL](aclDeleteBatchSize, egressFirewallACLs, func(batchACLs []*nbdb.ACL) error {
// optimize the predicate to exclude switches that don't reference deleting acls.
aclsToDelete := sets.String{}
for _, acl := range batchACLs {
aclsToDelete.Insert(acl.UUID)
}
swWithACLsPred := func(sw *nbdb.LogicalSwitch) bool {
return aclsToDelete.HasAny(sw.ACLs...)
}
return libovsdbops.RemoveACLsFromLogicalSwitchesWithPredicate(oc.nbClient, swWithACLsPred, batchACLs...)
})
if err != nil {
return fmt.Errorf("failed to remove reject acl from node logical switches: %v", err)
return fmt.Errorf("failed to remove egress firewall acls from node logical switches: %v", err)
}
}

Expand Down Expand Up @@ -185,7 +196,7 @@ func (oc *Controller) addEgressFirewall(egressFirewall *egressfirewallapi.Egress
egressFirewall.Name, egressFirewall.Namespace)
}

var addErrors error
var errorList []error
for i, egressFirewallRule := range egressFirewall.Spec.Egress {
// process Rules into egressFirewallRules for egressFirewall struct
if i > types.EgressFirewallStartPriority-types.MinimumReservedEgressFirewallPriority {
Expand All @@ -195,15 +206,15 @@ func (oc *Controller) addEgressFirewall(egressFirewall *egressfirewallapi.Egress
}
efr, err := newEgressFirewallRule(egressFirewallRule, i)
if err != nil {
addErrors = errors.Wrapf(addErrors, "error: cannot create EgressFirewall Rule to destination %s for namespace %s - %v",
egressFirewallRule.To.CIDRSelector, egressFirewall.Namespace, err)
errorList = append(errorList, fmt.Errorf("cannot create EgressFirewall Rule to destination %s for namespace %s: %w",
egressFirewallRule.To.CIDRSelector, egressFirewall.Namespace, err))
continue

}
ef.egressRules = append(ef.egressRules, efr)
}
if addErrors != nil {
return addErrors
if len(errorList) > 0 {
return errors.NewAggregate(errorList)
}

// EgressFirewall needs to make sure that the address_set for the namespace exists independently of the namespace object
Expand Down
23 changes: 23 additions & 0 deletions go-controller/pkg/util/batching/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package batching

import "fmt"

func Batch[T any](batchSize int, data []T, eachFn func([]T) error) error {
if batchSize < 1 {
return fmt.Errorf("batchSize should be > 0, got %d", batchSize)
}
start := 0
dataLen := len(data)
for start < dataLen {
end := start + batchSize
if end > dataLen {
end = dataLen
}
err := eachFn(data[start:end])
if err != nil {
return err
}
start = end
}
return nil
}
81 changes: 81 additions & 0 deletions go-controller/pkg/util/batching/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package batching

import (
"fmt"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"

"strings"
"testing"
)

type batchTestData struct {
name string
batchSize int
data []int
result []int
expectErr string
}

func TestBatch(t *testing.T) {
tt := []batchTestData{
{
name: "batch size should be > 0",
batchSize: 0,
data: []int{1, 2, 3},
expectErr: "batchSize should be > 0",
},
{
name: "batchSize = 1",
batchSize: 1,
data: []int{1, 2, 3},
},
{
name: "batchSize > 1",
batchSize: 2,
data: []int{1, 2, 3},
},
{
name: "number of batches = 0",
batchSize: 2,
data: nil,
},
{
name: "number of batches = 1",
batchSize: 2,
data: []int{1, 2},
},
{
name: "number of batches > 1",
batchSize: 2,
data: []int{1, 2, 3, 4},
},
{
name: "number of batches not int",
batchSize: 2,
data: []int{1, 2, 3, 4, 5},
},
}

for _, tCase := range tt {
g := gomega.NewGomegaWithT(t)
ginkgo.By(tCase.name)
var result []int
batchNum := 0
err := Batch[int](tCase.batchSize, tCase.data, func(l []int) error {
batchNum += 1
result = append(result, l...)
return nil
})
if err != nil {
if tCase.expectErr != "" && strings.Contains(err.Error(), tCase.expectErr) {
continue
}
t.Fatal(fmt.Sprintf("test %s failed: %v", tCase.name, err))
}
// tCase.data/tCase.batchSize round up
expectedBatchNum := (len(tCase.data) + tCase.batchSize - 1) / tCase.batchSize
g.Expect(batchNum).To(gomega.Equal(expectedBatchNum))
g.Expect(result).To(gomega.Equal(tCase.data))
}
}