diff --git a/go-controller/cmd/ovn-kube-util/app/ovs-exporter.go b/go-controller/cmd/ovn-kube-util/app/ovs-exporter.go index 346563ea49..5d4e1f65a4 100644 --- a/go-controller/cmd/ovn-kube-util/app/ovs-exporter.go +++ b/go-controller/cmd/ovn-kube-util/app/ovs-exporter.go @@ -1,8 +1,10 @@ package app import ( + "context" "k8s.io/klog/v2" "net/http" + "time" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/metrics" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util" @@ -21,6 +23,7 @@ var OvsExporterCommand = cli.Command{ }, }, Action: func(ctx *cli.Context) error { + stopChan := make(chan struct{}) bindAddress := ctx.String("metrics-bind-address") if bindAddress == "" { bindAddress = "0.0.0.0:9310" @@ -34,12 +37,24 @@ var OvsExporterCommand = cli.Command{ mux.Handle("/metrics", promhttp.Handler()) // register ovs metrics that will be served off of /metrics path - metrics.RegisterStandaloneOvsMetrics() + metrics.RegisterStandaloneOvsMetrics(stopChan) - err := http.ListenAndServe(bindAddress, mux) - if err != nil { - klog.Exitf("Starting metrics server failed: %v", err) + server := &http.Server{Addr: bindAddress, Handler: mux} + go func() { + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + klog.Exitf("Metrics server exited with error: %v", err) + } + }() + + // run until cancelled + <-ctx.Context.Done() + close(stopChan) + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := server.Shutdown(shutdownCtx); err != nil { + klog.Errorf("Error stopping metrics server: %v", err) } + return nil }, } diff --git a/go-controller/cmd/ovnkube/ovnkube.go b/go-controller/cmd/ovnkube/ovnkube.go index 4df5e884b9..95b8cff664 100644 --- a/go-controller/cmd/ovnkube/ovnkube.go +++ b/go-controller/cmd/ovnkube/ovnkube.go @@ -280,18 +280,18 @@ func runOvnKube(ctx *cli.Context) error { // start the prometheus server to serve OVN K8s Metrics (default master port: 9409, node port: 9410) if config.Metrics.BindAddress != "" { metrics.StartMetricsServer(config.Metrics.BindAddress, config.Metrics.EnablePprof, - config.Metrics.NodeServerCert, config.Metrics.NodeServerPrivKey) + config.Metrics.NodeServerCert, config.Metrics.NodeServerPrivKey, stopChan, wg) } // start the prometheus server to serve OVS and OVN Metrics (default port: 9476) // Note: for ovnkube node mode dpu-host no metrics are required as ovs/ovn is not running on the node.
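The same teardown pattern recurs through the rest of this patch: every metrics listener and updater goroutine now takes a stop channel, and the long-lived servers additionally take a sync.WaitGroup so runOvnKube can block until they have drained. A minimal, self-contained sketch of that shape; the function and its name are illustrative, not part of the patch:

package metricsutil

import (
	"context"
	"log"
	"net/http"
	"sync"
	"time"
)

// serveUntilStopped runs an HTTP server until stopChan closes, then shuts it
// down gracefully; wg lets the caller wait for the shutdown to complete.
func serveUntilStopped(server *http.Server, stopChan <-chan struct{}, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		go func() {
			// http.ErrServerClosed is the expected result of Shutdown, not a failure.
			if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
				log.Printf("metrics server failed: %v", err)
			}
		}()
		<-stopChan
		// Give in-flight scrapes up to 5s to finish, mirroring the patch's timeout.
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := server.Shutdown(ctx); err != nil {
			log.Printf("metrics server shutdown: %v", err)
		}
	}()
}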
if config.OvnKubeNode.Mode != types.NodeModeDPUHost && config.Metrics.OVNMetricsBindAddress != "" { if config.Metrics.ExportOVSMetrics { - metrics.RegisterOvsMetricsWithOvnMetrics() + metrics.RegisterOvsMetricsWithOvnMetrics(stopChan) } - metrics.RegisterOvnMetrics(ovnClientset.KubeClient, node) + metrics.RegisterOvnMetrics(ovnClientset.KubeClient, node, stopChan) metrics.StartOVNMetricsServer(config.Metrics.OVNMetricsBindAddress, - config.Metrics.NodeServerCert, config.Metrics.NodeServerPrivKey) + config.Metrics.NodeServerCert, config.Metrics.NodeServerPrivKey, stopChan, wg) } // run until cancelled diff --git a/go-controller/go.mod b/go-controller/go.mod index 93745f473a..63243f299d 100644 --- a/go-controller/go.mod +++ b/go-controller/go.mod @@ -21,9 +21,10 @@ require ( github.com/onsi/gomega v1.14.0 github.com/openshift/api v0.0.0-20211201215911-5a82bae32e46 github.com/openshift/client-go v0.0.0-20211202194848-d3f186f2d366 - github.com/ovn-org/libovsdb v0.6.1-0.20220427123326-d7b273399db4 + github.com/ovn-org/libovsdb v0.6.1-0.20220513144310-50ec17900991 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.0 + github.com/prometheus/client_model v0.2.0 github.com/spf13/afero v1.4.1 github.com/stretchr/testify v1.7.0 github.com/urfave/cli/v2 v2.2.0 @@ -71,7 +72,6 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/nxadm/tail v1.4.8 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.26.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect github.com/russross/blackfriday/v2 v2.0.1 // indirect diff --git a/go-controller/go.sum b/go-controller/go.sum index a09be0408f..3df51379bc 100644 --- a/go-controller/go.sum +++ b/go-controller/go.sum @@ -402,6 +402,8 @@ github.com/openshift/client-go v0.0.0-20211202194848-d3f186f2d366/go.mod h1:HJeH github.com/ory/dockertest/v3 v3.8.0/go.mod h1:9zPATATlWQru+ynXP+DytBQrsXV7Tmlx7K86H6fQaDo= github.com/ovn-org/libovsdb v0.6.1-0.20220427123326-d7b273399db4 h1:atUIOA34Cg9GVFn/8rmIsmnOIbi0D82x+xfhV2UuF1Q= github.com/ovn-org/libovsdb v0.6.1-0.20220427123326-d7b273399db4/go.mod h1:BQPdnSM2QOKxPwxl7wHJDSPP4B/CDKq3+vzgFW3J5gE= +github.com/ovn-org/libovsdb v0.6.1-0.20220513144310-50ec17900991 h1:EsMLPWOIRgB9WFTt5+L99LaX4H1Nm6yL2S6zsx4TSzY= +github.com/ovn-org/libovsdb v0.6.1-0.20220513144310-50ec17900991/go.mod h1:BQPdnSM2QOKxPwxl7wHJDSPP4B/CDKq3+vzgFW3J5gE= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/go-controller/hybrid-overlay/pkg/controller/node_linux.go b/go-controller/hybrid-overlay/pkg/controller/node_linux.go index c07fff5be9..af5c0beae9 100644 --- a/go-controller/hybrid-overlay/pkg/controller/node_linux.go +++ b/go-controller/hybrid-overlay/pkg/controller/node_linux.go @@ -604,13 +604,16 @@ func (n *NodeController) RunFlowSync(stopCh <-chan struct{}) { klog.Info("Starting hybrid overlay OpenFlow sync thread") klog.Info("Running initial OpenFlow sync") n.syncFlows() - + syncPeriod := 30 * time.Second + timer := time.NewTicker(syncPeriod) + defer timer.Stop() for { select { - case <-time.After(30 * time.Second): + case <-timer.C: n.syncFlows() case <-n.flowChan: n.syncFlows() + timer.Reset(syncPeriod) case <-stopCh: klog.Info("Shutting down OpenFlow sync thread") return diff 
--git a/go-controller/pkg/libovsdbops/acl.go b/go-controller/pkg/libovsdbops/acl.go index 8e0931182b..bc0bab8f8d 100644 --- a/go-controller/pkg/libovsdbops/acl.go +++ b/go-controller/pkg/libovsdbops/acl.go @@ -138,27 +138,6 @@ func UpdateACLsLoggingOps(nbClient libovsdbclient.Client, ops []libovsdb.Operati return modelClient.CreateOrUpdateOps(ops, opModels...) } -// UpdateACLsDirection updates the direction on the provided ACLs -func UpdateACLsDirection(nbClient libovsdbclient.Client, acls ...*nbdb.ACL) error { - opModels := make([]operationModel, 0, len(acls)) - for i := range acls { - // can't use i in the predicate, for loop replaces it in-memory - acl := acls[i] - opModel := operationModel{ - Model: acl, - ModelPredicate: func(item *nbdb.ACL) bool { return isEquivalentACL(item, acl) }, - OnModelUpdates: []interface{}{&acl.Direction}, - ErrNotFound: true, - BulkOp: false, - } - opModels = append(opModels, opModel) - } - - modelClient := newModelClient(nbClient) - _, err := modelClient.CreateOrUpdate(opModels...) - return err -} - // DeleteACLs deletes the provided ACLs func DeleteACLs(nbClient libovsdbclient.Client, acls ...*nbdb.ACL) error { opModels := make([]operationModel, 0, len(acls)) diff --git a/go-controller/pkg/libovsdbops/loadbalancer.go b/go-controller/pkg/libovsdbops/loadbalancer.go index 68dc7dceb6..d5a88c6845 100644 --- a/go-controller/pkg/libovsdbops/loadbalancer.go +++ b/go-controller/pkg/libovsdbops/loadbalancer.go @@ -58,6 +58,25 @@ func BuildLoadBalancer(name string, protocol nbdb.LoadBalancerProtocol, selectio } } +// CreateLoadBalancersOps creates the provided load balancers returning the +// corresponding ops +func CreateLoadBalancersOps(nbClient libovsdbclient.Client, ops []libovsdb.Operation, lbs ...*nbdb.LoadBalancer) ([]libovsdb.Operation, error) { + opModels := make([]operationModel, 0, len(lbs)) + for i := range lbs { + lb := lbs[i] + opModel := operationModel{ + Model: lb, + OnModelUpdates: onModelUpdatesNone(), + ErrNotFound: false, + BulkOp: false, + } + opModels = append(opModels, opModel) + } + + modelClient := newModelClient(nbClient) + return modelClient.CreateOrUpdateOps(ops, opModels...) +} + // CreateOrUpdateLoadBalancersOps creates or updates the provided load balancers // returning the corresponding ops func CreateOrUpdateLoadBalancersOps(nbClient libovsdbclient.Client, ops []libovsdb.Operation, lbs ...*nbdb.LoadBalancer) ([]libovsdb.Operation, error) { diff --git a/go-controller/pkg/libovsdbops/model.go b/go-controller/pkg/libovsdbops/model.go index a2c7585064..35831ce5ba 100644 --- a/go-controller/pkg/libovsdbops/model.go +++ b/go-controller/pkg/libovsdbops/model.go @@ -342,12 +342,13 @@ func onModels(models interface{}, do func(interface{}) error) error { // When no specific operation is required for the provided model, returns an empty // array for convenience. func buildFailOnDuplicateOps(c client.Client, m model.Model) ([]ovsdb.Operation, error) { - // Right now we only consider models with a "Name" field that is not an + // Right now we mostly consider models with a "Name" field that is not an // index for which we don't expect duplicate names. // A duplicate Name field that is an index will fail without the // need of this wait operation. - // Models that require a complex condition to detect duplicates are not - // considered for the time being due to the performance hit (i.e ACLs). + // Some models that require a complex condition to detect duplicates are not + // considered for the time being due to the performance hit (e.g. ACLs).
+ timeout := types.OVSDBWaitTimeout var field interface{} var value string switch t := m.(type) { @@ -360,11 +361,28 @@ func buildFailOnDuplicateOps(c client.Client, m model.Model) ([]ovsdb.Operation, case *nbdb.LogicalSwitch: field = &t.Name value = t.Name + case *nbdb.LogicalRouterPolicy: + condPriority := model.Condition{ + Field: &t.Priority, + Function: ovsdb.ConditionEqual, + Value: t.Priority, + } + condMatch := model.Condition{ + Field: &t.Match, + Function: ovsdb.ConditionEqual, + Value: t.Match, + } + return c.WhereAll(t, condPriority, condMatch).Wait( + ovsdb.WaitConditionNotEqual, + &timeout, + t, + &t.Priority, + &t.Match, + ) default: return []ovsdb.Operation{}, nil } - timeout := types.OVSDBWaitTimeout cond := model.Condition{ Field: field, Function: ovsdb.ConditionEqual, diff --git a/go-controller/pkg/libovsdbops/model_client.go b/go-controller/pkg/libovsdbops/model_client.go index 4d0ae65847..e176940bba 100644 --- a/go-controller/pkg/libovsdbops/model_client.go +++ b/go-controller/pkg/libovsdbops/model_client.go @@ -195,6 +195,8 @@ func (m *modelClient) CreateOrUpdateOps(ops []ovsdb.Operation, opModels ...opera } func (m *modelClient) createOrUpdateOps(ops []ovsdb.Operation, opModels ...operationModel) (interface{}, []ovsdb.Operation, error) { + hasGuardOp := len(ops) > 0 && isGuardOp(&ops[0]) + guardOp := []ovsdb.Operation{} doWhenFound := func(model interface{}, opModel *operationModel) ([]ovsdb.Operation, error) { if opModel.OnModelUpdates != nil { return m.update(model, opModel) @@ -204,9 +206,25 @@ func (m *modelClient) createOrUpdateOps(ops []ovsdb.Operation, opModels ...opera return nil, nil } doWhenNotFound := func(model interface{}, opModel *operationModel) ([]ovsdb.Operation, error) { + if !hasGuardOp { + // for the first insert of certain models, build a wait operation + // that checks for duplicates as a guard op to protect against + // concurrent transactions creating duplicates + var err error + guardOp, err = buildFailOnDuplicateOps(m.client, opModel.Model) + if err != nil { + return nil, err + } + hasGuardOp = len(guardOp) > 0 + } return m.create(opModel) } - return m.buildOps(ops, doWhenFound, doWhenNotFound, opModels...) + created, ops, err := m.buildOps(ops, doWhenFound, doWhenNotFound, opModels...) + if len(guardOp) > 0 { + // set the guard op as the first of the list + ops = append(guardOp, ops...) + } + return created, ops, err } /* @@ -318,17 +336,11 @@ func (m *modelClient) create(opModel *operationModel) ([]ovsdb.Operation, error) setUUID(opModel.Model, buildNamedUUID()) } - ops, err := buildFailOnDuplicateOps(m.client, opModel.Model) + ops, err := m.client.Create(opModel.Model) if err != nil { return nil, fmt.Errorf("unable to create model, err: %v", err) } - op, err := m.client.Create(opModel.Model) - if err != nil { - return nil, fmt.Errorf("unable to create model, err: %v", err) - } - ops = append(ops, op...)
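The guard op itself is an OVSDB "wait" operation: before the insert commits, the transaction waits (bounded by types.OVSDBWaitTimeout) until no equivalent row exists, so if a concurrent client already created one, ovsdb-server fails the whole transaction instead of storing a duplicate. For a Name-keyed model the generated guard is roughly equivalent to this sketch (assuming a connected libovsdb client c and the packages model.go already imports; the LogicalRouterPolicy case above does the same with a Priority+Match condition):

func buildNameGuard(c client.Client, name string) ([]ovsdb.Operation, error) {
	timeout := types.OVSDBWaitTimeout
	sw := &nbdb.LogicalSwitch{Name: name}
	cond := model.Condition{Field: &sw.Name, Function: ovsdb.ConditionEqual, Value: sw.Name}
	// Wait until no row with this Name matches; if one exists at commit time,
	// the wait times out and aborts the transaction, so the create never lands.
	return c.Where(sw, cond).Wait(ovsdb.WaitConditionNotEqual, &timeout, sw, &sw.Name)
}

createOrUpdateOps prepends at most one such guard per transaction, and isGuardOp lets chained calls recognize a guard already sitting at the head of the ops list. TestWaitForDuplicates further down exercises exactly this path: it generates the ops, lets an interleaved CreateOrUpdate win the race, and then expects the stale transaction to fail (expectedTxnErr) rather than insert a second LogicalSwitch.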
- klog.V(5).Infof("Create operations generated as: %+v", ops) return ops, nil } @@ -468,3 +480,7 @@ func addToExistingResult(model interface{}, existingResult interface{}) error { return nil } + +func isGuardOp(op *ovsdb.Operation) bool { + return op != nil && op.Op == ovsdb.OperationWait && op.Timeout != nil && *op.Timeout == types.OVSDBWaitTimeout +} diff --git a/go-controller/pkg/libovsdbops/model_client_test.go b/go-controller/pkg/libovsdbops/model_client_test.go index 5127ca9992..4393754740 100644 --- a/go-controller/pkg/libovsdbops/model_client_test.go +++ b/go-controller/pkg/libovsdbops/model_client_test.go @@ -28,13 +28,15 @@ var ( ) type OperationModelTestCase struct { - name string - op string - generateOp func() []operationModel - initialDB []libovsdbtest.TestData - expectedDB []libovsdbtest.TestData - expectedRes [][]libovsdbtest.TestData - expectedErr error + name string + op string + generateOp func() []operationModel + interleaveOp bool + initialDB []libovsdbtest.TestData + expectedDB []libovsdbtest.TestData + expectedRes [][]libovsdbtest.TestData + expectedOpsErr error + expectedTxnErr bool } func runTestCase(t *testing.T, tCase OperationModelTestCase) error { @@ -52,19 +54,32 @@ func runTestCase(t *testing.T, tCase OperationModelTestCase) error { opModels := tCase.generateOp() + ops := []ovsdb.Operation{} switch tCase.op { case "Lookup": err = modelClient.Lookup(opModels...) case "CreateOrUpdate": - _, err = modelClient.CreateOrUpdate(opModels...) + ops, err = modelClient.CreateOrUpdateOps(ops, opModels...) case "Delete": - err = modelClient.Delete(opModels...) + ops, err = modelClient.DeleteOps(ops, opModels...) default: return fmt.Errorf("test \"%s\": unknown op %s", tCase.name, tCase.op) } - if err != tCase.expectedErr { - return fmt.Errorf("test \"%s\": unexpected error generating %s operations, got %v, expected %v", tCase.name, tCase.op, err, tCase.expectedErr) + if err != tCase.expectedOpsErr { + return fmt.Errorf("test \"%s\": unexpected error generating %s operations, got %v, expected %v", tCase.name, tCase.op, err, tCase.expectedOpsErr) + } + + if tCase.interleaveOp { + _, err = modelClient.CreateOrUpdate(opModels...) 
+ if err != nil { + return fmt.Errorf("test \"%s\": unexpected error executing interleave operations: %v", tCase.name, err) + } + } + + _, err = TransactAndCheck(nbClient, ops) + if err != nil && !tCase.expectedTxnErr { + return fmt.Errorf("test \"%s\": unexpected error transacting operations: %v", tCase.name, err) } var matcher types.GomegaMatcher @@ -1152,7 +1167,7 @@ func TestLookup(t *testing.T) { Addresses: []string{adressSetTestAdress}, }, }, - expectedErr: client.ErrNotFound, + expectedOpsErr: client.ErrNotFound, }, { name: "Test lookup by index not found no error", @@ -1257,7 +1272,7 @@ func TestLookup(t *testing.T) { Addresses: []string{adressSetTestAdress}, }, }, - expectedErr: client.ErrNotFound, + expectedOpsErr: client.ErrNotFound, }, { name: "Test lookup by predicate not found no error", @@ -1303,7 +1318,7 @@ func TestLookup(t *testing.T) { Addresses: []string{adressSetTestAdress}, }, }, - expectedErr: client.ErrNotFound, + expectedOpsErr: client.ErrNotFound, }, { name: "Test lookup by predicate bulk op multiple results", @@ -1367,7 +1382,7 @@ func TestLookup(t *testing.T) { Addresses: []string{adressSetTestAdress + "-2"}, }, }, - expectedErr: errMultipleResults, + expectedOpsErr: errMultipleResults, }, } @@ -1490,3 +1505,37 @@ func TestBuildMutationsFromFields(t *testing.T) { } } } + +func TestWaitForDuplicates(t *testing.T) { + tt := []OperationModelTestCase{ + { + name: "Test non-root model transaction fails when duplicate", + op: "CreateOrUpdate", + generateOp: func() []operationModel { + return []operationModel{ + { + Model: &nbdb.LogicalSwitch{ + Name: logicalSwitchTestName, + }, + }, + } + }, + interleaveOp: true, + initialDB: []libovsdbtest.TestData{}, + expectedDB: []libovsdbtest.TestData{ + &nbdb.LogicalSwitch{ + UUID: logicalSwitchTestUUID, + Name: logicalSwitchTestName, + }, + }, + expectedTxnErr: true, + }, + } + + for _, tCase := range tt { + if err := runTestCase(t, tCase); err != nil { + t.Fatal(err) + } + } + +} diff --git a/go-controller/pkg/metrics/metrics.go b/go-controller/pkg/metrics/metrics.go index a940d2cd42..4eab2e5c8a 100644 --- a/go-controller/pkg/metrics/metrics.go +++ b/go-controller/pkg/metrics/metrics.go @@ -9,6 +9,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config" @@ -146,31 +147,38 @@ func getCoverageShowOutputMap(component string) (map[string]string, error) { // coverageShowMetricsUpdater updates the metric // by obtaining values from getCoverageShowOutputMap for specified component. 
-func coverageShowMetricsUpdater(component string) { - for range time.Tick(metricsUpdateInterval) { - coverageShowOutputMap, err := getCoverageShowOutputMap(component) - if err != nil { - klog.Errorf("%s", err.Error()) - continue - } - coverageShowMetricsMap := componentCoverageShowMetricsMap[component] - for metricName, metricInfo := range coverageShowMetricsMap { - var metricValue float64 - if metricInfo.srcName != "" { - metricName = metricInfo.srcName +func coverageShowMetricsUpdater(component string, stopChan <-chan struct{}) { + ticker := time.NewTicker(metricsUpdateInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + coverageShowOutputMap, err := getCoverageShowOutputMap(component) + if err != nil { + klog.Errorf("Getting coverage/show metrics for %s failed: %s", component, err.Error()) + continue } - if metricInfo.aggregateFrom != nil { - for _, aggregateMetricName := range metricInfo.aggregateFrom { - if value, ok := coverageShowOutputMap[aggregateMetricName]; ok { - metricValue += parseMetricToFloat(component, aggregateMetricName, value) - } + coverageShowMetricsMap := componentCoverageShowMetricsMap[component] + for metricName, metricInfo := range coverageShowMetricsMap { + var metricValue float64 + if metricInfo.srcName != "" { + metricName = metricInfo.srcName } - } else { - if value, ok := coverageShowOutputMap[metricName]; ok { - metricValue = parseMetricToFloat(component, metricName, value) + if metricInfo.aggregateFrom != nil { + for _, aggregateMetricName := range metricInfo.aggregateFrom { + if value, ok := coverageShowOutputMap[aggregateMetricName]; ok { + metricValue += parseMetricToFloat(component, aggregateMetricName, value) + } + } + } else { + if value, ok := coverageShowOutputMap[metricName]; ok { + metricValue = parseMetricToFloat(component, metricName, value) + } } + metricInfo.metric.Set(metricValue) } - metricInfo.metric.Set(metricValue) + case <-stopChan: + return } } } @@ -304,42 +312,49 @@ func parseStopwatchShowOutput(output string) map[string]stopwatchStatistics { // stopwatchShowMetricsUpdater updates the metric by obtaining the stopwatch/show // metrics for the specified component. 
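Both of these updaters, and the OVS and OVN ones later in the patch, are converted from "for range time.Tick(...)" to the same stoppable shape: time.Tick leaks its ticker and gives the loop no exit path, whereas a named ticker can be stopped and the select can observe the stop channel. A sketch of the loop shape the patch converges on (the helper name is illustrative; metricsUpdateInterval is the interval these updaters already use):

func runUpdater(update func(), stopChan <-chan struct{}) {
	ticker := time.NewTicker(metricsUpdateInterval)
	defer ticker.Stop() // unlike time.Tick, the ticker is released on return
	for {
		select {
		case <-ticker.C:
			update()
		case <-stopChan:
			return
		}
	}
}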
-func stopwatchShowMetricsUpdater(component string) { - for range time.Tick(metricsUpdateInterval) { - stopwatchShowOutputMap, err := getStopwatchShowOutputMap(component) - if err != nil { - klog.Error(err) - continue - } +func stopwatchShowMetricsUpdater(component string, stopChan <-chan struct{}) { + ticker := time.NewTicker(metricsUpdateInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + stopwatchShowOutputMap, err := getStopwatchShowOutputMap(component) + if err != nil { + klog.Errorf("Getting stopwatch/show metrics for %s failed: %s", component, err.Error()) + continue + } - if len(stopwatchShowOutputMap) == 0 { - klog.Warningf("No stopwatch/show metrics for component %s", component) - continue - } + if len(stopwatchShowOutputMap) == 0 { + klog.Warningf("No stopwatch/show metrics for component %s", component) + continue + } - stopwatchShowInterestingMetrics := componentStopwatchShowMetricsMap[component] - for metricName, metricInfo := range stopwatchShowInterestingMetrics { - var totalSamplesMetricValue, maxMetricValue, minMetricValue, percentile95thMetricValue, shortTermAvgMetricValue, longTermAvgMetricValue float64 + stopwatchShowInterestingMetrics := componentStopwatchShowMetricsMap[component] + for metricName, metricInfo := range stopwatchShowInterestingMetrics { + var totalSamplesMetricValue, maxMetricValue, minMetricValue, percentile95thMetricValue, shortTermAvgMetricValue, longTermAvgMetricValue float64 - if metricInfo.srcName != "" { - metricName = metricInfo.srcName - } + if metricInfo.srcName != "" { + metricName = metricInfo.srcName + } - if value, ok := stopwatchShowOutputMap[metricName]; ok { - totalSamplesMetricValue = parseMetricToFloat(component, metricName, value.totalSamples) - minMetricValue = parseMetricToFloat(component, metricName, value.min) - maxMetricValue = parseMetricToFloat(component, metricName, value.max) - percentile95thMetricValue = parseMetricToFloat(component, metricName, value.percentile95th) - shortTermAvgMetricValue = parseMetricToFloat(component, metricName, value.shortTermAvg) - longTermAvgMetricValue = parseMetricToFloat(component, metricName, value.longTermAvg) - } + if value, ok := stopwatchShowOutputMap[metricName]; ok { + totalSamplesMetricValue = parseMetricToFloat(component, metricName, value.totalSamples) + minMetricValue = parseMetricToFloat(component, metricName, value.min) + maxMetricValue = parseMetricToFloat(component, metricName, value.max) + percentile95thMetricValue = parseMetricToFloat(component, metricName, value.percentile95th) + shortTermAvgMetricValue = parseMetricToFloat(component, metricName, value.shortTermAvg) + longTermAvgMetricValue = parseMetricToFloat(component, metricName, value.longTermAvg) + } - metricInfo.metrics.totalSamples.Set(totalSamplesMetricValue) - metricInfo.metrics.min.Set(minMetricValue) - metricInfo.metrics.max.Set(maxMetricValue) - metricInfo.metrics.percentile95th.Set(percentile95thMetricValue) - metricInfo.metrics.shortTermAvg.Set(shortTermAvgMetricValue) - metricInfo.metrics.longTermAvg.Set(longTermAvgMetricValue) + metricInfo.metrics.totalSamples.Set(totalSamplesMetricValue) + metricInfo.metrics.min.Set(minMetricValue) + metricInfo.metrics.max.Set(maxMetricValue) + metricInfo.metrics.percentile95th.Set(percentile95thMetricValue) + metricInfo.metrics.shortTermAvg.Set(shortTermAvgMetricValue) + metricInfo.metrics.longTermAvg.Set(longTermAvgMetricValue) + } + case <-stopChan: + return } } } @@ -371,7 +386,7 @@ func checkPodRunsOnGivenNode(clientset kubernetes.Interface, labels 
[]string, k8 // using the crypto/tls module's GetCertificate() callback function helps in picking up // the latest certificate (due to cert rotation on cert expiry) -func listenAndServeTLS(addr, certFile, privKeyFile string, handler http.Handler) error { +func getTLSServer(addr, certFile, privKeyFile string, handler http.Handler) *http.Server { tlsConfig := &tls.Config{ GetCertificate: func(info *tls.ClientHelloInfo) (*tls.Certificate, error) { cert, err := tls.LoadX509KeyPair(certFile, privKeyFile) @@ -386,12 +401,13 @@ func listenAndServeTLS(addr, certFile, privKeyFile string, handler http.Handler) Handler: handler, TLSConfig: tlsConfig, } - return server.ListenAndServeTLS("", "") + return server } -// StartMetricsServerTLS runs the prometheus listener so that OVN K8s metrics can be collected +// StartMetricsServer runs the prometheus listener so that OVN K8s metrics can be collected // It puts the endpoint behind TLS if certFile and keyFile are defined. -func StartMetricsServer(bindAddress string, enablePprof bool, certFile string, keyFile string) { +func StartMetricsServer(bindAddress string, enablePprof bool, certFile string, keyFile string, + stopChan <-chan struct{}, wg *sync.WaitGroup) { mux := http.NewServeMux() mux.Handle("/metrics", promhttp.Handler()) @@ -402,44 +418,82 @@ func StartMetricsServer(bindAddress string, enablePprof bool, certFile string, k mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) mux.HandleFunc("/debug/pprof/trace", pprof.Trace) } - - go utilwait.Until(func() { - var err error - if certFile != "" && keyFile != "" { - err = listenAndServeTLS(bindAddress, certFile, keyFile, mux) - } else { - err = http.ListenAndServe(bindAddress, mux) - } - if err != nil { - utilruntime.HandleError(fmt.Errorf("starting metrics server failed: %v", err)) + wg.Add(1) + + go func() { + defer wg.Done() + var server *http.Server + go utilwait.Until(func() { + var err error + if certFile != "" && keyFile != "" { + server = getTLSServer(bindAddress, certFile, keyFile, mux) + err = server.ListenAndServeTLS("", "") + } else { + server = &http.Server{ + Addr: bindAddress, + Handler: mux, + } + err = server.ListenAndServe() + } + if err != nil && err != http.ErrServerClosed { + utilruntime.HandleError(fmt.Errorf("starting metrics server failed: %v", err)) + } + }, 5*time.Second, stopChan) + + <-stopChan + klog.Infof("Stopping metrics server %s", server.Addr) + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := server.Shutdown(shutdownCtx); err != nil { + klog.Errorf("Error stopping metrics server: %v", err) } - }, 5*time.Second, utilwait.NeverStop) + }() } var ovnRegistry = prometheus.NewRegistry() // StartOVNMetricsServer runs the prometheus listener so that OVN metrics can be collected -func StartOVNMetricsServer(bindAddress, certFile, keyFile string) { +func StartOVNMetricsServer(bindAddress, certFile, keyFile string, + stopChan <-chan struct{}, wg *sync.WaitGroup) { handler := promhttp.InstrumentMetricHandler(ovnRegistry, promhttp.HandlerFor(ovnRegistry, promhttp.HandlerOpts{})) mux := http.NewServeMux() mux.Handle("/metrics", handler) - go utilwait.Until(func() { - var err error - if certFile != "" && keyFile != "" { - err = listenAndServeTLS(bindAddress, certFile, keyFile, mux) - } else { - err = http.ListenAndServe(bindAddress, mux) - } - if err != nil { - utilruntime.HandleError(fmt.Errorf("starting OVN metrics server failed: %v", err)) + var server *http.Server + wg.Add(1) + + go func() { + defer wg.Done() + go
utilwait.Until(func() { + var err error + if certFile != "" && keyFile != "" { + server = getTLSServer(bindAddress, certFile, keyFile, mux) + err = server.ListenAndServeTLS("", "") + } else { + server = &http.Server{ + Addr: bindAddress, + Handler: mux, + } + err = server.ListenAndServe() + } + if err != nil && err != http.ErrServerClosed { + utilruntime.HandleError(fmt.Errorf("starting OVN metrics server failed: %v", err)) + } + }, 5*time.Second, stopChan) + + <-stopChan + klog.Infof("Stopping OVN metrics server %s", server.Addr) + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := server.Shutdown(shutdownCtx); err != nil { + klog.Errorf("Error stopping OVN metrics server: %v", err) } - }, 5*time.Second, utilwait.NeverStop) + }() } -func RegisterOvnMetrics(clientset kubernetes.Interface, k8sNodeName string) { - go RegisterOvnDBMetrics(clientset, k8sNodeName) - go RegisterOvnControllerMetrics() - go RegisterOvnNorthdMetrics(clientset, k8sNodeName) +func RegisterOvnMetrics(clientset kubernetes.Interface, k8sNodeName string, stopChan <-chan struct{}) { + go RegisterOvnDBMetrics(clientset, k8sNodeName, stopChan) + go RegisterOvnControllerMetrics(stopChan) + go RegisterOvnNorthdMetrics(clientset, k8sNodeName, stopChan) } diff --git a/go-controller/pkg/metrics/ovn.go b/go-controller/pkg/metrics/ovn.go index d1cc2b1ee6..066c63c87c 100644 --- a/go-controller/pkg/metrics/ovn.go +++ b/go-controller/pkg/metrics/ovn.go @@ -304,13 +304,18 @@ func setOvnControllerConfigurationMetrics() (err error) { return nil } -func ovnControllerConfigurationMetricsUpdater() { +func ovnControllerConfigurationMetricsUpdater(stopChan <-chan struct{}) { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() for { - err := setOvnControllerConfigurationMetrics() - if err != nil { - klog.Errorf("%s", err.Error()) + select { + case <-ticker.C: + if err := setOvnControllerConfigurationMetrics(); err != nil { + klog.Errorf("Setting ovn controller config metrics failed: %s", err.Error()) + } + case <-stopChan: + return } - time.Sleep(30 * time.Second) } } @@ -337,7 +342,7 @@ func getPortCount(portType string) float64 { return portCount } -func RegisterOvnControllerMetrics() { +func RegisterOvnControllerMetrics(stopChan <-chan struct{}) { getOvnControllerVersionInfo() ovnRegistry.MustRegister(prometheus.NewGaugeFunc( prometheus.GaugeOpts{ @@ -416,9 +421,9 @@ func RegisterOvnControllerMetrics() { registerStopwatchShowMetrics(ovnController, MetricOvnNamespace, MetricOvnSubsystemController) // ovn-controller configuration metrics updater - go ovnControllerConfigurationMetricsUpdater() + go ovnControllerConfigurationMetricsUpdater(stopChan) // ovn-controller coverage show metrics updater - go coverageShowMetricsUpdater(ovnController) + go coverageShowMetricsUpdater(ovnController, stopChan) // ovn-controller stopwatch show metrics updater - go stopwatchShowMetricsUpdater(ovnController) + go stopwatchShowMetricsUpdater(ovnController, stopChan) } diff --git a/go-controller/pkg/metrics/ovn_db.go b/go-controller/pkg/metrics/ovn_db.go index 528489bea6..a9b5be7df4 100644 --- a/go-controller/pkg/metrics/ovn_db.go +++ b/go-controller/pkg/metrics/ovn_db.go @@ -322,6 +322,7 @@ func getNBDBSockPath() (string, error) { paths := []string{"/var/run/openvswitch/", "/var/run/ovn/"} for _, basePath := range paths { if _, err := os.Stat(basePath + "ovnnb_db.sock"); err == nil { + klog.Infof("ovnnb_db.sock found at %s", basePath) return basePath, nil } else { klog.Infof("%sovnnb_db.sock
getting info failed: %s", basePath, err) @@ -356,7 +357,7 @@ func getOvnDbVersionInfo() { } } -func RegisterOvnDBMetrics(clientset kubernetes.Interface, k8sNodeName string) { +func RegisterOvnDBMetrics(clientset kubernetes.Interface, k8sNodeName string, stopChan <-chan struct{}) { err := wait.PollImmediate(1*time.Second, 300*time.Second, func() (bool, error) { return checkPodRunsOnGivenNode(clientset, []string{"ovn-db-pod=true"}, k8sNodeName, false) }) @@ -419,6 +420,7 @@ func RegisterOvnDBMetrics(clientset kubernetes.Interface, k8sNodeName string) { klog.Info("Found db is standalone, don't register db_cluster metrics") } if dbIsClustered { + klog.Info("Found db is clustered, register db_cluster metrics") ovnRegistry.MustRegister(metricDBClusterCID) ovnRegistry.MustRegister(metricDBClusterSID) ovnRegistry.MustRegister(metricDBClusterServerStatus) @@ -446,25 +448,31 @@ func RegisterOvnDBMetrics(clientset kubernetes.Interface, k8sNodeName string) { // functions responsible for collecting the values and updating the prometheus metrics go func() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() for { - for _, dbProperty := range dbProperties { + select { + case <-ticker.C: + // To update not only values but also labels for metrics, we use Reset() to delete previous labels+value if dbIsClustered { - ovnDBClusterStatusMetricsUpdater(dbProperty) + resetOvnDbClusterMetrics() } if dbFoundViaPath { - ovnDBSizeMetricsUpdater(dbProperty) + resetOvnDbSizeMetric() } - ovnDBMemoryMetricsUpdater(dbProperty) - } - time.Sleep(30 * time.Second) - // To update not only values but also labels for metrics, we use Reset() to delete previous labels+value - if dbIsClustered { - resetOvnDbClusterMetrics() - } - if dbFoundViaPath { - resetOvnDbSizeMetric() + resetOvnDbMemoryMetrics() + for _, dbProperty := range dbProperties { + if dbIsClustered { + ovnDBClusterStatusMetricsUpdater(dbProperty) + } + if dbFoundViaPath { + ovnDBSizeMetricsUpdater(dbProperty) + } + ovnDBMemoryMetricsUpdater(dbProperty) + } + case <-stopChan: + return } - resetOvnDbMemoryMetrics() } }() } diff --git a/go-controller/pkg/metrics/ovn_northd.go b/go-controller/pkg/metrics/ovn_northd.go index 75bf7b7882..1a8e850c71 100644 --- a/go-controller/pkg/metrics/ovn_northd.go +++ b/go-controller/pkg/metrics/ovn_northd.go @@ -89,7 +89,7 @@ var ovnNorthdStopwatchShowMetricsMap = map[string]*stopwatchMetricDetails{ "ovnsb_db_run": {}, } -func RegisterOvnNorthdMetrics(clientset kubernetes.Interface, k8sNodeName string) { +func RegisterOvnNorthdMetrics(clientset kubernetes.Interface, k8sNodeName string, stopChan <-chan struct{}) { err := wait.PollImmediate(1*time.Second, 300*time.Second, func() (bool, error) { return checkPodRunsOnGivenNode(clientset, []string{"app=ovnkube-master", "name=ovnkube-master"}, k8sNodeName, true) }) @@ -147,10 +147,10 @@ func RegisterOvnNorthdMetrics(clientset kubernetes.Interface, k8sNodeName string // Register the ovn-northd coverage/show metrics with prometheus componentCoverageShowMetricsMap[ovnNorthd] = ovnNorthdCoverageShowMetricsMap registerCoverageShowMetrics(ovnNorthd, MetricOvnNamespace, MetricOvnSubsystemNorthd) - go coverageShowMetricsUpdater(ovnNorthd) + go coverageShowMetricsUpdater(ovnNorthd, stopChan) // Register the ovn-northd stopwatch/show metrics with prometheus componentStopwatchShowMetricsMap[ovnNorthd] = ovnNorthdStopwatchShowMetricsMap registerStopwatchShowMetrics(ovnNorthd, MetricOvnNamespace, MetricOvnSubsystemNorthd) - go stopwatchShowMetricsUpdater(ovnNorthd) + go 
stopwatchShowMetricsUpdater(ovnNorthd, stopChan) } diff --git a/go-controller/pkg/metrics/ovs.go b/go-controller/pkg/metrics/ovs.go index 8447e301c2..4b583556ec 100644 --- a/go-controller/pkg/metrics/ovs.go +++ b/go-controller/pkg/metrics/ovs.go @@ -344,13 +344,13 @@ func getOvsDatapaths() (datapathsList []string, err error) { defer func() { if r := recover(); r != nil { err = fmt.Errorf("recovering from a panic while parsing the "+ - "ovs-dpctl dump-dps output : %v", r) + "ovs-appctl dpctl/dump-dps output : %v", r) } }() - stdout, stderr, err = util.RunOVSDpctl("dump-dps") + stdout, stderr, err = util.RunOVSAppctl("dpctl/dump-dps") if err != nil { - return nil, fmt.Errorf("failed to get output of ovs-dpctl dump-dps "+ + return nil, fmt.Errorf("failed to get output of ovs-appctl dpctl/dump-dps "+ "stderr(%s) :(%v)", stderr, err) } for _, kvPair := range strings.Split(stdout, "\n") { @@ -374,13 +374,13 @@ func setOvsDatapathMetrics(datapaths []string) (err error) { defer func() { if r := recover(); r != nil { - err = fmt.Errorf("recovering from a panic while parsing the ovs-dpctl "+ + err = fmt.Errorf("recovering from a panic while parsing the ovs-appctl dpctl/"+ "show %s output : %v", datapathName, r) } }() for _, datapathName = range datapaths { - stdout, stderr, err = util.RunOVSDpctl("show", datapathName) + stdout, stderr, err = util.RunOVSAppctl("dpctl/show", datapathName) if err != nil { return fmt.Errorf("failed to get datapath stats for %s "+ "stderr(%s) :(%v)", datapathName, stderr, err) @@ -411,18 +411,22 @@ func setOvsDatapathMetrics(datapaths []string) (err error) { } // ovsDatapathMetricsUpdate updates the ovs datapath metrics for every 30 sec -func ovsDatapathMetricsUpdate() { +func ovsDatapathMetricsUpdate(stopChan <-chan struct{}) { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() for { - time.Sleep(30 * time.Second) - datapaths, err := getOvsDatapaths() - if err != nil { - klog.Errorf("%s", err.Error()) - continue - } - - err = setOvsDatapathMetrics(datapaths) - if err != nil { - klog.Errorf("%s", err.Error()) + select { + case <-ticker.C: + datapaths, err := getOvsDatapaths() + if err != nil { + klog.Errorf("Getting ovs datapath list failed: %s", err.Error()) + continue + } + if err = setOvsDatapathMetrics(datapaths); err != nil { + klog.Errorf("Setting ovs datapath metrics failed: %s", err.Error()) + } + case <-stopChan: + return } } } @@ -536,37 +540,41 @@ func getOvsBridgeInfo() (bridgePortCount map[string]float64, portToBridgeMap map // ovsBridgeMetricsUpdate updates bridgeMetrics & // ovsInterface metrics & geneveInterface metrics for every 30sec -func ovsBridgeMetricsUpdate() { +func ovsBridgeMetricsUpdate(stopChan <-chan struct{}) { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() for { - time.Sleep(30 * time.Second) - // set geneve interface metrics - err := geneveInterfaceMetricsUpdate() - if err != nil { - klog.Errorf("%s", err.Error()) - } - // update ovs bridge metrics - bridgePortCountMapping, portBridgeMapping, err := getOvsBridgeInfo() - if err != nil { - klog.Errorf("%s", err.Error()) - continue - } - for brName, nPorts := range bridgePortCountMapping { - metricOvsBridge.WithLabelValues(brName).Set(1) - metricOvsBridgePortsTotal.WithLabelValues(brName).Set(nPorts) - flowsCount := getOvsBridgeOpenFlowsCount(brName) - metricOvsBridgeFlowsTotal.WithLabelValues(brName).Set(flowsCount) - } - metricOvsBridgeTotal.Set(float64(len(bridgePortCountMapping))) + select { + case <-ticker.C: + // set geneve interface metrics + if err 
:= geneveInterfaceMetricsUpdate(); err != nil { + klog.Errorf("Updating geneve interface metrics failed: %s", err.Error()) + } + // update ovs bridge metrics + bridgePortCountMapping, portBridgeMapping, err := getOvsBridgeInfo() + if err != nil { + klog.Errorf("Getting ovs bridge info failed: %s", err.Error()) + continue + } + for brName, nPorts := range bridgePortCountMapping { + metricOvsBridge.WithLabelValues(brName).Set(1) + metricOvsBridgePortsTotal.WithLabelValues(brName).Set(nPorts) + flowsCount := getOvsBridgeOpenFlowsCount(brName) + metricOvsBridgeFlowsTotal.WithLabelValues(brName).Set(flowsCount) + } + metricOvsBridgeTotal.Set(float64(len(bridgePortCountMapping))) - interfaceToPortToBridgeMap, err := getInterfaceToPortToBridgeMapping(portBridgeMapping) - if err != nil { - klog.Errorf("%s", err.Error()) - continue - } - // set ovs interface metrics. - err = ovsInterfaceMetricsUpdate(interfaceToPortToBridgeMap) - if err != nil { - klog.Errorf("%s", err.Error()) + interfaceToPortToBridgeMap, err := getInterfaceToPortToBridgeMapping(portBridgeMapping) + if err != nil { + klog.Errorf("Getting interface to port bridge mapping failed: %s", err.Error()) + continue + } + // set ovs interface metrics. + if err = ovsInterfaceMetricsUpdate(interfaceToPortToBridgeMap); err != nil { + klog.Errorf("Updating ovs interface metrics failed: %s", err.Error()) + } + case <-stopChan: + return } } } @@ -866,13 +874,18 @@ func setOvsMemoryMetrics() (err error) { return nil } -func ovsMemoryMetricsUpdate() { +func ovsMemoryMetricsUpdate(stopChan <-chan struct{}) { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() for { - err := setOvsMemoryMetrics() - if err != nil { - klog.Errorf("%s", err.Error()) + select { + case <-ticker.C: + if err := setOvsMemoryMetrics(); err != nil { + klog.Errorf("Setting ovs memory metrics failed: %s", err.Error()) + } + case <-stopChan: + return } - time.Sleep(30 * time.Second) } } @@ -919,13 +932,18 @@ func setOvsHwOffloadMetrics() (err error) { return nil } -func ovsHwOffloadMetricsUpdate() { +func ovsHwOffloadMetricsUpdate(stopChan <-chan struct{}) { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() for { - err := setOvsHwOffloadMetrics() - if err != nil { - klog.Errorf("%s", err.Error()) + select { + case <-ticker.C: + if err := setOvsHwOffloadMetrics(); err != nil { + klog.Errorf("Setting ovs hardware offload metrics failed: %s", err.Error()) + } + case <-stopChan: + return } - time.Sleep(30 * time.Second) } } @@ -1184,15 +1202,15 @@ var ovsVswitchdCoverageShowMetricsMap = map[string]*metricDetails{ } var registerOvsMetricsOnce sync.Once -func RegisterStandaloneOvsMetrics() { - registerOvsMetrics(prometheus.DefaultRegisterer) +func RegisterStandaloneOvsMetrics(stopChan <-chan struct{}) { + registerOvsMetrics(prometheus.DefaultRegisterer, stopChan) } -func RegisterOvsMetricsWithOvnMetrics() { - registerOvsMetrics(ovnRegistry) +func RegisterOvsMetricsWithOvnMetrics(stopChan <-chan struct{}) { + registerOvsMetrics(ovnRegistry, stopChan) } -func registerOvsMetrics(registry prometheus.Registerer) { +func registerOvsMetrics(registry prometheus.Registerer, stopChan <-chan struct{}) { registerOvsMetricsOnce.Do(func() { getOvsVersionInfo() registry.MustRegister(prometheus.NewGaugeFunc( @@ -1250,14 +1268,14 @@ func registerOvsMetrics(registry prometheus.Registerer) { })) // OVS datapath metrics updater - go ovsDatapathMetricsUpdate() + go ovsDatapathMetricsUpdate(stopChan) // OVS bridge metrics updater - go ovsBridgeMetricsUpdate() + go 
ovsBridgeMetricsUpdate(stopChan) // OVS memory metrics updater - go ovsMemoryMetricsUpdate() + go ovsMemoryMetricsUpdate(stopChan) // OVS hw Offload metrics updater - go ovsHwOffloadMetricsUpdate() + go ovsHwOffloadMetricsUpdate(stopChan) // OVS coverage/show metrics updater. - go coverageShowMetricsUpdater(ovsVswitchd) + go coverageShowMetricsUpdater(ovsVswitchd, stopChan) }) } diff --git a/go-controller/pkg/node/healthcheck.go b/go-controller/pkg/node/healthcheck.go index 5335bf95a9..796e63c858 100644 --- a/go-controller/pkg/node/healthcheck.go +++ b/go-controller/pkg/node/healthcheck.go @@ -334,9 +334,12 @@ func (c *openflowManager) Run(stopChan <-chan struct{}, doneWg *sync.WaitGroup) doneWg.Add(1) go func() { defer doneWg.Done() + syncPeriod := 15 * time.Second + timer := time.NewTicker(syncPeriod) + defer timer.Stop() for { select { - case <-time.After(15 * time.Second): + case <-timer.C: if err := checkPorts(c.defaultBridge.patchPort, c.defaultBridge.ofPortPatch, c.defaultBridge.uplinkName, c.defaultBridge.ofPortPhys); err != nil { klog.Errorf("Checkports failed %v", err) @@ -353,6 +356,7 @@ func (c *openflowManager) Run(stopChan <-chan struct{}, doneWg *sync.WaitGroup) c.syncFlows() case <-c.flowChan: c.syncFlows() + timer.Reset(syncPeriod) case <-stopChan: return } diff --git a/go-controller/pkg/node/node.go b/go-controller/pkg/node/node.go index 5c976368e4..32185bee07 100644 --- a/go-controller/pkg/node/node.go +++ b/go-controller/pkg/node/node.go @@ -210,7 +210,7 @@ func setupOVNNode(node *kapi.Node) error { if config.Default.LFlowCacheLimitKb > 0 { setExternalIdsCmd = append(setExternalIdsCmd, - fmt.Sprintf("external_ids:ovn-limit-lflow-cache-kb=%d", config.Default.LFlowCacheLimitKb), + fmt.Sprintf("external_ids:ovn-memlimit-lflow-cache-kb=%d", config.Default.LFlowCacheLimitKb), ) } diff --git a/go-controller/pkg/node/node_ip_handler_linux.go b/go-controller/pkg/node/node_ip_handler_linux.go index dd2e5db7a8..eb0854c160 100644 --- a/go-controller/pkg/node/node_ip_handler_linux.go +++ b/go-controller/pkg/node/node_ip_handler_linux.go @@ -111,6 +111,7 @@ func (c *addressManager) Run(stopChan <-chan struct{}, doneWg *sync.WaitGroup) { defer doneWg.Done() addressSyncTimer := time.NewTicker(30 * time.Second) + defer addressSyncTimer.Stop() subscribed, err := subScribeFcn() if err != nil { diff --git a/go-controller/pkg/node/node_test.go b/go-controller/pkg/node/node_test.go index 13a8adfbab..9ae75f1bc3 100644 --- a/go-controller/pkg/node/node_test.go +++ b/go-controller/pkg/node/node_test.go @@ -366,7 +366,7 @@ var _ = Describe("Node", func() { "external_ids:ovn-ofctrl-wait-before-clear=0 "+ "external_ids:ovn-enable-lflow-cache=false "+ "external_ids:ovn-limit-lflow-cache=1000 "+ - "external_ids:ovn-limit-lflow-cache-kb=100000", + "external_ids:ovn-memlimit-lflow-cache-kb=100000", nodeIP, interval, ofintval, nodeName), }) fexec.AddFakeCmd(&ovntest.ExpectedCmd{ diff --git a/go-controller/pkg/ovn/egressfirewall.go b/go-controller/pkg/ovn/egressfirewall.go index f8f0c4d390..0943e862e6 100644 --- a/go-controller/pkg/ovn/egressfirewall.go +++ b/go-controller/pkg/ovn/egressfirewall.go @@ -58,6 +58,8 @@ func cloneEgressFirewall(originalEgressfirewall *egressfirewallapi.EgressFirewal return ef } +// newEgressFirewallRule creates a new egressFirewallRule. For the logging level, it will pick either +// aclLoggingAllow or aclLoggingDeny depending on whether this is an allow or deny rule.
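From here down, egress firewall ACLs gain a stable name, a meter, and per-namespace logging driven by the namespace's ACL-logging annotation (the tests reference it as aclLoggingAnnotation; upstream the key is k8s.ovn.org/acl-logging, which is an assumption here since the patch never spells it out). A sketch of the mapping, with values borrowed from the tests at the end of this patch:

// Assuming namespace "ns1" carries the annotation
//   k8s.ovn.org/acl-logging: '{ "deny": "alert", "allow": "notice" }'
// an allow rule at priority 10000 gets:
aclName := fmt.Sprintf("egressFirewall_%s_%d", "ns1", 10000) // buildEgressFwAclName output: "egressFirewall_ns1_10000"
// together with Log=true, Severity="notice" (the "allow" entry) and
// Meter=types.OvnACLLoggingMeter on the generated nbdb.ACL.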
func newEgressFirewallRule(rawEgressFirewallRule egressfirewallapi.EgressFirewallRule, id int) (*egressFirewallRule, error) { efr := &egressFirewallRule{ id: id, @@ -125,14 +127,26 @@ func (oc *Controller) syncEgressFirewall(egressFirewalls []interface{}) error { } } - // update the direction of each egressFirewallACL if needed + // Update the direction of each egressFirewallACL if needed. + // Update the egressFirewallACL name if needed. + // Update logging related information if needed. for i := range egressFirewallACLs { egressFirewallACLs[i].Direction = types.DirectionToLPort - err = libovsdbops.UpdateACLsDirection(oc.nbClient, egressFirewallACLs[i]) - if err != nil { - return fmt.Errorf("unable to set ACL direction on egress firewall acls, cannot convert old ACL data err: %v", err) + if namespace, ok := egressFirewallACLs[i].ExternalIDs["egressFirewall"]; ok && namespace != "" { + aclName := buildEgressFwAclName(namespace, egressFirewallACLs[i].Priority) + log, meterName, logSeverity := oc.getLogMeterSeverity(namespace, egressFirewallACLs[i].Action) + egressFirewallACLs[i].Meter = &meterName + egressFirewallACLs[i].Name = &aclName + egressFirewallACLs[i].Log = log + egressFirewallACLs[i].Severity = &logSeverity + } else { + klog.Warningf("Could not find namespace for egress firewall ACL during refresh operation: %v", egressFirewallACLs[i]) } } + err = libovsdbops.CreateOrUpdateACLs(oc.nbClient, egressFirewallACLs...) + if err != nil { + return fmt.Errorf("unable to update ACL information (direction and logging) during resync operation, err: %v", err) + } // In any gateway mode, make sure to delete all LRPs on ovn_cluster_router. // This covers migration from LGW mode that used LRPs for EFW to using ACLs in SGW/LGW modes @@ -331,14 +345,23 @@ func (oc *Controller) createEgressFirewallRules(priority int, match, action, ext logicalSwitches = append(logicalSwitches, types.OVNJoinSwitch) } - egressFirewallACL := &nbdb.ACL{ - Priority: priority, - Direction: types.DirectionToLPort, - Match: match, - Action: action, - ExternalIDs: map[string]string{"egressFirewall": externalID}, - } - + // a name is needed for logging purposes - the name must be unique, so make it + // egressFirewall_<namespace>_<priority> + aclName := buildEgressFwAclName(externalID, priority) + log, meter, severity := oc.getLogMeterSeverity(externalID, action) + + egressFirewallACL := libovsdbops.BuildACL( + aclName, + types.DirectionToLPort, + priority, + match, + action, + meter, + severity, + log, + map[string]string{"egressFirewall": externalID}, + nil, + ) ops, err := libovsdbops.CreateOrUpdateACLsOps(oc.nbClient, nil, egressFirewallACL) if err != nil { return fmt.Errorf("failed to create egressFirewall ACL %v: %v", egressFirewallACL, err) @@ -556,3 +579,80 @@ func getClusterSubnetsExclusion() string { func getEgressFirewallNamespacedName(egressFirewall *egressfirewallapi.EgressFirewall) string { return fmt.Sprintf("%v/%v", egressFirewall.Namespace, egressFirewall.Name) } + +// getLogMeterSeverity determines if logging shall be enabled for a given ACL, what the name of the meter is and the +// severity level for logging. +func (oc *Controller) getLogMeterSeverity(namespaceName, action string) (log bool, meter string, severity string) { + aclLogging := "" + // ok should always be true. However, to avoid nil pointer dereference panics here, it's + best to add this verification and in the worst case to resort to sane default values.
+ if nsInfo, ok := oc.namespaces[namespaceName]; ok { + if action == "allow" || action == "allow-related" { + aclLogging = nsInfo.aclLogging.Allow + } else if action == "drop" || action == "drop-related" { + aclLogging = nsInfo.aclLogging.Deny + } + } else { + klog.Warningf("No nsInfo object found for namespace %s", namespaceName) + } + log = aclLogging != "" + severity = getACLLoggingSeverity(aclLogging) + meter = types.OvnACLLoggingMeter + + return +} + +// refreshEgressFirewallLogging updates logging related configuration for all rules of this specific firewall in OVN. +// This method can be called for example from the Namespaces Watcher methods to reload firewall rules' logging when +// namespace annotations change. +// Return values are: bool - whether the egressFirewall's ACLs were updated, error in case of errors. If a namespace +// does not contain an egress firewall ACL, then this returns false, nil instead of a NotFound error. +func (oc *Controller) refreshEgressFirewallLogging(egressFirewallNamespace string) (bool, error) { + // Retrieve the egress firewall object from cache and lock it. + obj, loaded := oc.egressFirewalls.Load(egressFirewallNamespace) + if !loaded { + return false, nil + } + + ef, ok := obj.(*egressFirewall) + if !ok { + return false, fmt.Errorf("refreshEgressFirewallLogging failed: type assertion to *egressFirewall"+ + " failed for EgressFirewall of type %T in namespace %s", + obj, egressFirewallNamespace) + } + + ef.Lock() + defer ef.Unlock() + + // Find ACLs for a given egressFirewall + p := func(item *nbdb.ACL) bool { + return item.ExternalIDs["egressFirewall"] == ef.namespace + } + egressFirewallACLs, err := libovsdbops.FindACLsWithPredicate(oc.nbClient, p) + if err != nil { + return false, fmt.Errorf("unable to list egress firewall ACLs, err: %v", err) + } + if len(egressFirewallACLs) == 0 { + klog.Warningf("No egressFirewall ACLs to update in ns: %s", ef.namespace) + return false, nil + } + + for i := range egressFirewallACLs { + // Set logging and severity + log, meterName, logSeverity := oc.getLogMeterSeverity(ef.namespace, egressFirewallACLs[i].Action) + egressFirewallACLs[i].Log = log + egressFirewallACLs[i].Severity = &logSeverity + egressFirewallACLs[i].Meter = &meterName + } + // CreateOrUpdateACLs will update all provided (non zero value) fields + err = libovsdbops.CreateOrUpdateACLs(oc.nbClient, egressFirewallACLs...) + if err != nil { + return false, fmt.Errorf("unable to update ACL logging in ns %s, err: %v", ef.namespace, err) + } + + return true, nil +} + +func buildEgressFwAclName(namespace string, priority int) string { + return fmt.Sprintf("egressFirewall_%s_%d", namespace, priority) +} diff --git a/go-controller/pkg/ovn/egressfirewall_dns.go b/go-controller/pkg/ovn/egressfirewall_dns.go index a91ee27bc3..766d0d18ab 100644 --- a/go-controller/pkg/ovn/egressfirewall_dns.go +++ b/go-controller/pkg/ovn/egressfirewall_dns.go @@ -154,19 +154,21 @@ func (e *EgressDNS) addToDNS(dnsName string) { } } -// Run spawns a goroutine that handles updates to the dns entries for dnsNames used in +// Run spawns a goroutine that handles updates to the dns entries for domain names used in // EgressFirewalls. The loop runs after receiving one of three signals: -// 1. time.After(durationTillNextQuery) times out and the dnsName with the lowest ttl is checked +// 1. time.NewTicker(durationTillNextQuery) fires and the dnsName with the lowest ttl is checked // and the durationTillNextQuery is updated // 2. e.added is received and durationTillNextQuery is recomputed // 3.
e.deleted is received and coincides with dnsName func (e *EgressDNS) Run(defaultInterval time.Duration) { - var dnsName, dnsNameDeleted string + var domainNameExpiringNext, domainNameDeleted string var ttl time.Time var timeSet bool // initially the next DNS Query happens at the default interval durationTillNextQuery := defaultInterval go func() { + timer := time.NewTicker(durationTillNextQuery) + defer timer.Stop() for { // perform periodic updates on dnsNames as each ttl runs out, checking for updates at // least every defaultInterval. Update durationTillNextQuery every time a new DNS name gets @@ -174,19 +176,20 @@ select { case <-e.added: //on update need to check if the GetNextQueryTime has changed - case <-time.After(durationTillNextQuery): - if len(dnsName) > 0 { - if _, err := e.Update(dnsName); err != nil { + case <-timer.C: + if len(domainNameExpiringNext) > 0 { + if _, err := e.Update(domainNameExpiringNext); err != nil { utilruntime.HandleError(err) } - if err := e.updateEntryForName(dnsName); err != nil { + if err := e.updateEntryForName(domainNameExpiringNext); err != nil { utilruntime.HandleError(err) } } - case dnsNameDeleted = <-e.deleted: - // Break from the select and update dnsName only if dnsName - // appears in an EgressFirewall that has just been deleted. - if dnsName != dnsNameDeleted { + case domainNameDeleted = <-e.deleted: + // If domainNameExpiringNext, the name we are waiting to update, was deleted, + // recalculate durationTillNextQuery and domainNameExpiringNext. + // Otherwise, ignore this event. + if domainNameExpiringNext != domainNameDeleted { continue } case <-e.stopChan: @@ -194,14 +197,15 @@ case <-e.controllerStop: return } - - // before waiting on the signals get the next time this thread needs to wake up - ttl, dnsName, timeSet = e.dns.GetNextQueryTime() + // find the domain name whose DNS entry will expire first and calculate when it will expire, + // set timer to what's sooner: default update interval or next expiration time + ttl, domainNameExpiringNext, timeSet = e.dns.GetNextQueryTime() if time.Until(ttl) > defaultInterval || !timeSet { durationTillNextQuery = defaultInterval } else { durationTillNextQuery = time.Until(ttl) } + timer.Reset(durationTillNextQuery) } }() diff --git a/go-controller/pkg/ovn/egressfirewall_test.go b/go-controller/pkg/ovn/egressfirewall_test.go index 2b21c8f726..da24b59701 100644 --- a/go-controller/pkg/ovn/egressfirewall_test.go +++ b/go-controller/pkg/ovn/egressfirewall_test.go @@ -84,13 +84,13 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for local gateway mode", ) purgeACL := libovsdbops.BuildACL( - "", + buildEgressFwAclName("namespace1", t.EgressFirewallStartPriority), t.DirectionFromLPort, t.EgressFirewallStartPriority, "", nbdb.ACLActionDrop, - "", - "", + t.OvnACLLoggingMeter, + defaultACLLoggingSeverity, false, map[string]string{"egressFirewall": "none"}, nil, @@ -113,13 +113,13 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for local gateway mode", // this ACL is not in the egress firewall priority range and should be untouched otherACL := libovsdbops.BuildACL( - "", + buildEgressFwAclName("namespace1", t.EgressFirewallStartPriority-1), t.DirectionFromLPort, t.MinimumReservedEgressFirewallPriority-1, "", nbdb.ACLActionDrop, - "", - "", + t.OvnACLLoggingMeter, + defaultACLLoggingSeverity, false, map[string]string{"egressFirewall": "default"}, nil, @@ -181,6 +181,14 @@ var _ =
ginkgo.Describe("OVN EgressFirewall Operations for local gateway mode", // Direction of both ACLs will be converted to keepACL.Direction = t.DirectionToLPort + newName := buildEgressFwAclName("default", t.EgressFirewallStartPriority-1) + meter := t.OvnACLLoggingMeter + severity := defaultACLLoggingSeverity + keepACL.Name = &newName + keepACL.Direction = t.DirectionToLPort + keepACL.Meter = &meter + keepACL.Severity = &severity + keepACL.Log = false expectedDatabaseState := []libovsdb.TestData{ otherACL, @@ -233,7 +241,11 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for local gateway mode", *egressFirewall, }, }, - &v1.NodeList{ + &v1.NamespaceList{ + Items: []v1.Namespace{ + namespace1, + }, + }, &v1.NodeList{ Items: []v1.Node{ { Status: v1.NodeStatus{ @@ -244,19 +256,20 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for local gateway mode", }, }) + fakeOVN.controller.WatchNamespaces() fakeOVN.controller.WatchEgressFirewall() _, err := fakeOVN.fakeClient.EgressFirewallClient.K8sV1().EgressFirewalls(egressFirewall.Namespace).Get(context.TODO(), egressFirewall.Name, metav1.GetOptions{}) gomega.Expect(err).NotTo(gomega.HaveOccurred()) ipv4ACL := libovsdbops.BuildACL( - "", + buildEgressFwAclName("namespace1", t.EgressFirewallStartPriority), t.DirectionToLPort, t.EgressFirewallStartPriority, "(ip4.dst == 1.2.3.4/23) && ip4.src == $a10481622940199974102 && ip4.dst != 10.128.0.0/14", nbdb.ACLActionAllow, - "", - "", + t.OvnACLLoggingMeter, + defaultACLLoggingSeverity, false, map[string]string{"egressFirewall": "namespace1"}, nil, @@ -342,13 +355,13 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for local gateway mode", gomega.Expect(err).NotTo(gomega.HaveOccurred()) ipv6ACL := libovsdbops.BuildACL( - "", + buildEgressFwAclName("namespace1", t.EgressFirewallStartPriority), t.DirectionToLPort, t.EgressFirewallStartPriority, "(ip6.dst == 2002::1234:abcd:ffff:c0a8:101/64) && (ip4.src == $a10481622940199974102 || ip6.src == $a10481620741176717680) && ip4.dst != 10.128.0.0/14", nbdb.ACLActionAllow, - "", - "", + t.OvnACLLoggingMeter, + defaultACLLoggingSeverity, false, map[string]string{"egressFirewall": "namespace1"}, nil, @@ -442,13 +455,13 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for local gateway mode", fakeOVN.controller.WatchEgressFirewall() udpACL := libovsdbops.BuildACL( - "", + buildEgressFwAclName("namespace1", t.EgressFirewallStartPriority), t.DirectionToLPort, t.EgressFirewallStartPriority, "(ip4.dst == 1.2.3.4/23) && ip4.src == $a10481622940199974102 && ((udp && ( udp.dst == 100 ))) && ip4.dst != 10.128.0.0/14", nbdb.ACLActionDrop, - "", - "", + t.OvnACLLoggingMeter, + defaultACLLoggingSeverity, false, map[string]string{"egressFirewall": "namespace1"}, nil, @@ -521,7 +534,11 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for local gateway mode", *egressFirewall, }, }, - &v1.NodeList{ + &v1.NamespaceList{ + Items: []v1.Namespace{ + namespace1, + }, + }, &v1.NodeList{ Items: []v1.Node{ { Status: v1.NodeStatus{ @@ -538,16 +555,17 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for local gateway mode", }, }) + fakeOVN.controller.WatchNamespaces() fakeOVN.controller.WatchEgressFirewall() ipv4ACL := libovsdbops.BuildACL( - "", + buildEgressFwAclName("namespace1", t.EgressFirewallStartPriority), t.DirectionToLPort, t.EgressFirewallStartPriority, "(ip4.dst == 1.2.3.5/23) && ip4.src == $a10481622940199974102 && ((tcp && ( tcp.dst == 100 ))) && ip4.dst != 10.128.0.0/14", nbdb.ACLActionAllow, - "", - "", + t.OvnACLLoggingMeter, 
+ defaultACLLoggingSeverity, false, map[string]string{"egressFirewall": "namespace1"}, nil, @@ -650,13 +668,13 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for local gateway mode", fakeOVN.controller.WatchEgressFirewall() ipv4ACL := libovsdbops.BuildACL( - "", + buildEgressFwAclName("namespace1", t.EgressFirewallStartPriority), t.DirectionToLPort, t.EgressFirewallStartPriority, "(ip4.dst == 1.2.3.4/23) && ip4.src == $a10481622940199974102 && ip4.dst != 10.128.0.0/14", nbdb.ACLActionAllow, - "", - "", + t.OvnACLLoggingMeter, + defaultACLLoggingSeverity, false, map[string]string{"egressFirewall": "namespace1"}, nil, @@ -761,13 +779,13 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for local gateway mode", fakeOVN.controller.WatchEgressFirewall() ipv4ACL := libovsdbops.BuildACL( - "", + buildEgressFwAclName("namespace1", t.EgressFirewallStartPriority), t.DirectionToLPort, t.EgressFirewallStartPriority, "(ip4.dst == 1.2.3.5/23) && ip4.src == $a10481622940199974102 && ((tcp && ( tcp.dst == 100 ))) && ip4.dst != 10.128.0.0/14", nbdb.ACLActionAllow, - "", - "", + t.OvnACLLoggingMeter, + defaultACLLoggingSeverity, false, map[string]string{"egressFirewall": "namespace1"}, nil, @@ -897,13 +915,13 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for local gateway mode", fakeOVN.controller.WatchEgressFirewall() ipv4ACL := libovsdbops.BuildACL( - "", + buildEgressFwAclName("namespace1", t.EgressFirewallStartPriority), t.DirectionToLPort, t.EgressFirewallStartPriority, "(ip4.dst == 1.2.3.4/23) && ip4.src == $a10481622940199974102 && ip4.dst != 10.128.0.0/14", nbdb.ACLActionAllow, - "", - "", + t.OvnACLLoggingMeter, + defaultACLLoggingSeverity, false, map[string]string{"egressFirewall": "namespace1"}, nil, @@ -965,6 +983,112 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for local gateway mode", gomega.Expect(err).NotTo(gomega.HaveOccurred()) }) + + ginkgo.It("correctly updates an egressfirewall's ACL logging", func() { + app.Action = func(ctx *cli.Context) error { + const ( + node1Name string = "node1" + ) + + InitialNodeSwitch := &nbdb.LogicalSwitch{ + UUID: node1Name + "-UUID", + Name: node1Name, + } + + dbSetup := libovsdbtest.TestSetup{ + NBData: []libovsdbtest.TestData{ + InitialNodeSwitch, + clusterRouter, + }, + } + + namespace1 := *newNamespace("namespace1") + egressFirewall := newEgressFirewallObject("default", namespace1.Name, []egressfirewallapi.EgressFirewallRule{ + { + Type: "Allow", + To: egressfirewallapi.EgressFirewallDestination{ + CIDRSelector: "1.2.3.4/23", + }, + }, + }) + + fakeOVN.startWithDBSetup(dbSetup, + &egressfirewallapi.EgressFirewallList{ + Items: []egressfirewallapi.EgressFirewall{ + *egressFirewall, + }, + }, + &v1.NamespaceList{ + Items: []v1.Namespace{ + namespace1, + }, + }, + &v1.NodeList{ + Items: []v1.Node{ + { + Status: v1.NodeStatus{ + Phase: v1.NodeRunning, + }, + ObjectMeta: newObjectMeta(node1Name, ""), + }, + }, + }) + + fakeOVN.controller.WatchNamespaces() + fakeOVN.controller.WatchEgressFirewall() + + ipv4ACL := libovsdbops.BuildACL( + buildEgressFwAclName("namespace1", t.EgressFirewallStartPriority), + t.DirectionToLPort, + t.EgressFirewallStartPriority, + "(ip4.dst == 1.2.3.4/23) && ip4.src == $a10481622940199974102 && ip4.dst != 10.128.0.0/14", + nbdb.ACLActionAllow, + t.OvnACLLoggingMeter, + defaultACLLoggingSeverity, + false, + map[string]string{"egressFirewall": "namespace1"}, + nil, + ) + ipv4ACL.UUID = "ipv4ACL-UUID" + + // new ACL will be added to the switch + finalNodeSwitch := &nbdb.LogicalSwitch{ + 
UUID: InitialNodeSwitch.UUID, + Name: InitialNodeSwitch.Name, + ACLs: []string{ipv4ACL.UUID}, + } + + // new ACL will be added to the switch + expectedDatabaseState := []libovsdb.TestData{ + ipv4ACL, + finalNodeSwitch, + clusterRouter, + } + + gomega.Expect(fakeOVN.nbClient).To(libovsdbtest.HaveData(expectedDatabaseState)) + + // get the current namespace + namespace, err := fakeOVN.fakeClient.KubeClient.CoreV1().Namespaces().Get(context.TODO(), namespace1.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // enable ACL logging with severity alert, alert + logSeverity := "alert" + updatedLogSeverity := fmt.Sprintf(`{ "deny": "%s", "allow": "%s" }`, logSeverity, logSeverity) + namespace.Annotations[aclLoggingAnnotation] = updatedLogSeverity + _, err = fakeOVN.fakeClient.KubeClient.CoreV1().Namespaces().Update(context.TODO(), namespace, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // eventually, we should see the changes in the namespace reflected in the database + ipv4ACL.Log = true + ipv4ACL.Severity = &logSeverity + gomega.Eventually(fakeOVN.nbClient).Should(libovsdbtest.HaveData(expectedDatabaseState)) + + return nil + } + + err := app.Run([]string{app.Name}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }) }) }) @@ -1004,13 +1128,13 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for shared gateway mode", ginkgo.It("reconciles existing and non-existing egressfirewalls", func() { app.Action = func(ctx *cli.Context) error { purgeACL := libovsdbops.BuildACL( - "", + "purgeACL", t.DirectionFromLPort, t.EgressFirewallStartPriority, "", nbdb.ACLActionDrop, - "", - "", + t.OvnACLLoggingMeter, + defaultACLLoggingSeverity, false, map[string]string{"egressFirewall": "none"}, nil, @@ -1033,13 +1157,13 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for shared gateway mode", // this ACL is not in the egress firewall priority range and should be untouched otherACL := libovsdbops.BuildACL( - "", + "otherACL", t.DirectionFromLPort, t.MinimumReservedEgressFirewallPriority-1, "", nbdb.ACLActionDrop, - "", - "", + t.OvnACLLoggingMeter, + defaultACLLoggingSeverity, false, map[string]string{"egressFirewall": "default"}, nil, @@ -1101,6 +1225,14 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for shared gateway mode", // Direction of both ACLs will be converted to keepACL.Direction = t.DirectionToLPort + newName := buildEgressFwAclName("default", t.EgressFirewallStartPriority-1) + meter := t.OvnACLLoggingMeter + severity := defaultACLLoggingSeverity + keepACL.Name = &newName + keepACL.Direction = t.DirectionToLPort + keepACL.Meter = &meter + keepACL.Severity = &severity + keepACL.Log = false expectedDatabaseState := []libovsdb.TestData{ otherACL, @@ -1148,7 +1280,11 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for shared gateway mode", *egressFirewall, }, }, - &v1.NodeList{ + &v1.NamespaceList{ + Items: []v1.Namespace{ + namespace1, + }, + }, &v1.NodeList{ Items: []v1.Node{ { Status: v1.NodeStatus{ @@ -1159,19 +1295,20 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for shared gateway mode", }, }) + fakeOVN.controller.WatchNamespaces() fakeOVN.controller.WatchEgressFirewall() _, err := fakeOVN.fakeClient.EgressFirewallClient.K8sV1().EgressFirewalls(egressFirewall.Namespace).Get(context.TODO(), egressFirewall.Name, metav1.GetOptions{}) gomega.Expect(err).NotTo(gomega.HaveOccurred()) ipv4ACL := libovsdbops.BuildACL( - "", + buildEgressFwAclName("namespace1", t.EgressFirewallStartPriority), 
t.DirectionToLPort, t.EgressFirewallStartPriority, "(ip4.dst == 1.2.3.4/23) && ip4.src == $a10481622940199974102 && inport == \""+t.JoinSwitchToGWRouterPrefix+t.OVNClusterRouter+"\"", nbdb.ACLActionAllow, - "", - "", + t.OvnACLLoggingMeter, + defaultACLLoggingSeverity, false, map[string]string{"egressFirewall": "namespace1"}, nil, @@ -1252,13 +1389,13 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for shared gateway mode", gomega.Expect(err).NotTo(gomega.HaveOccurred()) ipv6ACL := libovsdbops.BuildACL( - "", + buildEgressFwAclName("namespace1", t.EgressFirewallStartPriority), t.DirectionToLPort, t.EgressFirewallStartPriority, "(ip6.dst == 2002::1234:abcd:ffff:c0a8:101/64) && (ip4.src == $a10481622940199974102 || ip6.src == $a10481620741176717680) && inport == \""+t.JoinSwitchToGWRouterPrefix+t.OVNClusterRouter+"\"", nbdb.ACLActionAllow, - "", - "", + t.OvnACLLoggingMeter, + defaultACLLoggingSeverity, false, map[string]string{"egressFirewall": "namespace1"}, nil, @@ -1348,14 +1485,14 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for shared gateway mode", fakeOVN.controller.WatchEgressFirewall() udpACL := libovsdbops.BuildACL( - "", + buildEgressFwAclName("namespace1", t.EgressFirewallStartPriority), t.DirectionToLPort, t.EgressFirewallStartPriority, "(ip4.dst == 1.2.3.4/23) && ip4.src == $a10481622940199974102 && ((udp && ( udp.dst == 100 ))) && inport == \""+ t.JoinSwitchToGWRouterPrefix+t.OVNClusterRouter+"\"", nbdb.ACLActionDrop, - "", - "", + t.OvnACLLoggingMeter, + defaultACLLoggingSeverity, false, map[string]string{"egressFirewall": "namespace1"}, nil, @@ -1418,7 +1555,11 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for shared gateway mode", *egressFirewall, }, }, - &v1.NodeList{ + &v1.NamespaceList{ + Items: []v1.Namespace{ + namespace1, + }, + }, &v1.NodeList{ Items: []v1.Node{ { Status: v1.NodeStatus{ @@ -1429,17 +1570,18 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for shared gateway mode", }, }) + fakeOVN.controller.WatchNamespaces() fakeOVN.controller.WatchEgressFirewall() ipv4ACL := libovsdbops.BuildACL( - "", + buildEgressFwAclName("namespace1", t.EgressFirewallStartPriority), t.DirectionToLPort, t.EgressFirewallStartPriority, "(ip4.dst == 1.2.3.5/23) && "+ "ip4.src == $a10481622940199974102 && ((tcp && ( tcp.dst == 100 ))) && inport == \""+t.JoinSwitchToGWRouterPrefix+t.OVNClusterRouter+"\"", nbdb.ACLActionAllow, - "", - "", + t.OvnACLLoggingMeter, + defaultACLLoggingSeverity, false, map[string]string{"egressFirewall": "namespace1"}, nil, @@ -1510,7 +1652,11 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for shared gateway mode", *egressFirewall, }, }, - &v1.NodeList{ + &v1.NamespaceList{ + Items: []v1.Namespace{ + namespace1, + }, + }, &v1.NodeList{ Items: []v1.Node{ { Status: v1.NodeStatus{ @@ -1521,16 +1667,17 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for shared gateway mode", }, }) + fakeOVN.controller.WatchNamespaces() fakeOVN.controller.WatchEgressFirewall() ipv4ACL := libovsdbops.BuildACL( - "", + buildEgressFwAclName("namespace1", t.EgressFirewallStartPriority), t.DirectionToLPort, t.EgressFirewallStartPriority, "(ip4.dst == 1.2.3.4/23) && ip4.src == $a10481622940199974102 && inport == \""+t.JoinSwitchToGWRouterPrefix+t.OVNClusterRouter+"\"", nbdb.ACLActionAllow, - "", - "", + t.OvnACLLoggingMeter, + defaultACLLoggingSeverity, false, map[string]string{"egressFirewall": "namespace1"}, nil, @@ -1568,6 +1715,105 @@ var _ = ginkgo.Describe("OVN EgressFirewall Operations for shared gateway mode", 
gomega.Expect(err).NotTo(gomega.HaveOccurred()) }) + + ginkgo.It("correctly updates an egressfirewall's ACL logging", func() { + app.Action = func(ctx *cli.Context) error { + initialJoinSwitch := &nbdb.LogicalSwitch{ + UUID: "join-UUID", + Name: "join", + } + + namespace1 := *newNamespace("namespace1") + egressFirewall := newEgressFirewallObject("default", namespace1.Name, []egressfirewallapi.EgressFirewallRule{ + { + Type: "Allow", + To: egressfirewallapi.EgressFirewallDestination{ + CIDRSelector: "1.2.3.4/23", + }, + }, + }) + + dbSetup := libovsdbtest.TestSetup{ + NBData: []libovsdbtest.TestData{ + initialJoinSwitch, + clusterRouter, + }, + } + fakeOVN.startWithDBSetup(dbSetup, + &egressfirewallapi.EgressFirewallList{ + Items: []egressfirewallapi.EgressFirewall{ + *egressFirewall, + }, + }, + &v1.NamespaceList{ + Items: []v1.Namespace{ + namespace1, + }, + }, &v1.NodeList{ + Items: []v1.Node{ + { + Status: v1.NodeStatus{ + Phase: v1.NodeRunning, + }, + ObjectMeta: newObjectMeta(node1Name, ""), + }, + }, + }) + + fakeOVN.controller.WatchNamespaces() + fakeOVN.controller.WatchEgressFirewall() + + ipv4ACL := libovsdbops.BuildACL( + buildEgressFwAclName("namespace1", t.EgressFirewallStartPriority), + t.DirectionToLPort, + t.EgressFirewallStartPriority, + "(ip4.dst == 1.2.3.4/23) && ip4.src == $a10481622940199974102 && inport == \""+t.JoinSwitchToGWRouterPrefix+t.OVNClusterRouter+"\"", + nbdb.ACLActionAllow, + t.OvnACLLoggingMeter, + defaultACLLoggingSeverity, + false, + map[string]string{"egressFirewall": "namespace1"}, + nil, + ) + ipv4ACL.UUID = "ipv4ACL-UUID" + + // new ACL will be added to the switch + finalJoinSwitch := &nbdb.LogicalSwitch{ + UUID: initialJoinSwitch.UUID, + Name: initialJoinSwitch.Name, + ACLs: []string{ipv4ACL.UUID}, + } + + expectedDatabaseState := []libovsdb.TestData{ + ipv4ACL, + finalJoinSwitch, + clusterRouter, + } + + gomega.Expect(fakeOVN.nbClient).To(libovsdbtest.HaveData(expectedDatabaseState)) + + // get the current namespace + namespace, err := fakeOVN.fakeClient.KubeClient.CoreV1().Namespaces().Get(context.TODO(), namespace1.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // enable ACL logging with severity alert, alert + logSeverity := "alert" + updatedLogSeverity := fmt.Sprintf(`{ "deny": "%s", "allow": "%s" }`, logSeverity, logSeverity) + namespace.Annotations[aclLoggingAnnotation] = updatedLogSeverity + _, err = fakeOVN.fakeClient.KubeClient.CoreV1().Namespaces().Update(context.TODO(), namespace, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // eventually, we should see the changes in the namespace reflected in the database + ipv4ACL.Log = true + ipv4ACL.Severity = &logSeverity + gomega.Eventually(fakeOVN.nbClient).Should(libovsdbtest.HaveData(expectedDatabaseState)) + + return nil + } + + err := app.Run([]string{app.Name}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }) }) }) diff --git a/go-controller/pkg/ovn/egressgw_test.go b/go-controller/pkg/ovn/egressgw_test.go index c3fe5434eb..21dd92edc3 100644 --- a/go-controller/pkg/ovn/egressgw_test.go +++ b/go-controller/pkg/ovn/egressgw_test.go @@ -3,7 +3,9 @@ package ovn import ( "context" "encoding/json" + "fmt" "net" + "sync" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb" @@ -2107,6 +2109,71 @@ var _ = ginkgo.Describe("OVN Egress Gateway Operations", func() { err := app.Run([]string{app.Name}) gomega.Expect(err).NotTo(gomega.HaveOccurred()) }) + 
ginkgo.It("should create a single policy for concurrent addHybridRoutePolicy for the same node", func() { + app.Action = func(ctx *cli.Context) error { + config.Gateway.Mode = config.GatewayModeLocal + + fakeOvn.startWithDBSetup( + libovsdbtest.TestSetup{ + NBData: []libovsdbtest.TestData{ + &nbdb.LogicalRouterPort{ + UUID: ovntypes.GWRouterToJoinSwitchPrefix + ovntypes.GWRouterPrefix + "node1" + "-UUID", + Name: ovntypes.GWRouterToJoinSwitchPrefix + ovntypes.GWRouterPrefix + "node1", + Networks: []string{"100.64.0.4/32"}, + }, + &nbdb.LogicalRouter{ + Name: ovntypes.OVNClusterRouter, + UUID: ovntypes.OVNClusterRouter + "-UUID", + }, + }, + }, + ) + finalNB := []libovsdbtest.TestData{ + &nbdb.LogicalRouterPolicy{ + UUID: "lrp1", + Priority: types.HybridOverlayReroutePriority, + Action: nbdb.LogicalRouterPolicyActionReroute, + Nexthops: []string{"100.64.0.4"}, + Match: "inport == \"rtos-node1\" && ip4.src == $a17568862106095406051 && ip4.dst != 10.128.0.0/14", + }, + &nbdb.LogicalRouter{ + Name: ovntypes.OVNClusterRouter, + UUID: ovntypes.OVNClusterRouter + "-UUID", + Policies: []string{"lrp1"}, + }, + &nbdb.LogicalRouterPort{ + UUID: ovntypes.GWRouterToJoinSwitchPrefix + ovntypes.GWRouterPrefix + "node1" + "-UUID", + Name: ovntypes.GWRouterToJoinSwitchPrefix + ovntypes.GWRouterPrefix + "node1", + Networks: []string{"100.64.0.4/32"}, + }, + } + + wg := &sync.WaitGroup{} + c := make(chan int) + for i := 1; i <= 5; i++ { + podIndex := i + wg.Add(1) + go func() { + defer wg.Done() + <-c + fakeOvn.controller.addHybridRoutePolicyForPod(net.ParseIP(fmt.Sprintf("10.128.1.%d", podIndex)), "node1") + }() + } + close(c) + wg.Wait() + gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(finalNB)) + + err := fakeOvn.controller.addHybridRoutePolicyForPod(net.ParseIP(fmt.Sprintf("10.128.1.%d", 6)), "node1") + // adding another pod after the initial burst should not trigger an error or change db + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(finalNB)) + + return nil + } + + err := app.Run([]string{app.Name}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }) ginkgo.It("delete hybrid route policy for pods", func() { app.Action = func(ctx *cli.Context) error { config.Gateway.Mode = config.GatewayModeLocal diff --git a/go-controller/pkg/ovn/egressip.go b/go-controller/pkg/ovn/egressip.go index 743ec9e16f..da4a5e5328 100644 --- a/go-controller/pkg/ovn/egressip.go +++ b/go-controller/pkg/ovn/egressip.go @@ -241,9 +241,9 @@ func (oc *Controller) reconcileEgressIP(old, new *egressipv1.EgressIP) (err erro staleEgressIPs.Delete(toRemove.EgressIP) } for staleEgressIP := range staleEgressIPs { - if oc.deleteAllocatorEgressIPAssignmentIfExists(name, staleEgressIP) { + if nodeName := oc.deleteAllocatorEgressIPAssignmentIfExists(name, staleEgressIP); nodeName != "" { statusToRemove = append(statusToRemove, - egressipv1.EgressIPStatusItem{EgressIP: staleEgressIP}) + egressipv1.EgressIPStatusItem{EgressIP: staleEgressIP, Node: nodeName}) } } @@ -1029,17 +1029,17 @@ func (oc *Controller) addPodEgressIPAssignments(name string, statusAssignments [ } // deleteAllocatorEgressIPAssignmentIfExists deletes egressIP config from node allocations map -// if the entry is available and returns true, otherwise returns false. -func (oc *Controller) deleteAllocatorEgressIPAssignmentIfExists(name, egressIP string) bool { +// if the entry is available and returns assigned node name, otherwise returns empty string. 
+func (oc *Controller) deleteAllocatorEgressIPAssignmentIfExists(name, egressIP string) string { oc.eIPC.allocator.Lock() defer oc.eIPC.allocator.Unlock() - for _, eNode := range oc.eIPC.allocator.cache { + for nodeName, eNode := range oc.eIPC.allocator.cache { if egressIPName, exists := eNode.allocations[egressIP]; exists && egressIPName == name { delete(eNode.allocations, egressIP) - return true + return nodeName } } - return false + return "" } // deleteAllocatorEgressIPAssignments deletes the allocation as to keep the diff --git a/go-controller/pkg/ovn/loadbalancer/loadbalancer.go b/go-controller/pkg/ovn/loadbalancer/loadbalancer.go index 6c2780ebb6..ed5b1fc3b2 100644 --- a/go-controller/pkg/ovn/loadbalancer/loadbalancer.go +++ b/go-controller/pkg/ovn/loadbalancer/loadbalancer.go @@ -55,6 +55,8 @@ func EnsureLBs(nbClient libovsdbclient.Client, service *corev1.Service, LBs []LB } lbs := make([]*nbdb.LoadBalancer, 0, len(LBs)) + existinglbs := make([]*nbdb.LoadBalancer, 0, len(LBs)) + newlbs := make([]*nbdb.LoadBalancer, 0, len(LBs)) addLBsToSwitch := map[string][]*nbdb.LoadBalancer{} removeLBsFromSwitch := map[string][]*nbdb.LoadBalancer{} addLBsToRouter := map[string][]*nbdb.LoadBalancer{} @@ -71,10 +73,14 @@ func EnsureLBs(nbClient libovsdbclient.Client, service *corev1.Service, LBs []LB existingSwitches := sets.String{} existingGroups := sets.String{} if existingLB != nil { + blb.UUID = existingLB.UUID + existinglbs = append(existinglbs, blb) toDelete.Delete(existingLB.UUID) existingRouters = existingLB.Routers existingSwitches = existingLB.Switches existingGroups = existingLB.Groups + } else { + newlbs = append(newlbs, blb) } wantRouters := sets.NewString(lb.Routers...) wantSwitches := sets.NewString(lb.Switches...) @@ -87,7 +93,12 @@ func EnsureLBs(nbClient libovsdbclient.Client, service *corev1.Service, LBs []LB mapLBDifferenceByKey(removeLBsFromGroups, existingGroups, wantGroups, blb) } - ops, err := libovsdbops.CreateOrUpdateLoadBalancersOps(nbClient, nil, lbs...) + ops, err := libovsdbops.CreateOrUpdateLoadBalancersOps(nbClient, nil, existinglbs...) + if err != nil { + return err + } + + ops, err = libovsdbops.CreateLoadBalancersOps(nbClient, ops, newlbs...) 
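// Editor's note (inferred from the change above, not stated in the patch):
// load balancers that already exist keep their UUID and are routed through
// CreateOrUpdateLoadBalancersOps, which can emit plain update operations,
// while genuinely new ones go through CreateLoadBalancersOps and get insert
// operations; previously every LB took the create-or-update path regardless.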
if err != nil { return err } diff --git a/go-controller/pkg/ovn/namespace.go b/go-controller/pkg/ovn/namespace.go index a5827c889f..681fd14dce 100644 --- a/go-controller/pkg/ovn/namespace.go +++ b/go-controller/pkg/ovn/namespace.go @@ -367,13 +367,23 @@ func (oc *Controller) updateNamespace(old, newer *kapi.Namespace) { aclAnnotation := newer.Annotations[aclLoggingAnnotation] oldACLAnnotation := old.Annotations[aclLoggingAnnotation] // support for ACL logging update, if new annotation is empty, make sure we propagate new setting - if aclAnnotation != oldACLAnnotation && (oc.aclLoggingCanEnable(aclAnnotation, nsInfo) || aclAnnotation == "") && - len(nsInfo.networkPolicies) > 0 { - // deny rules are all one per namespace - if err := oc.setACLLoggingForNamespace(old.Name, nsInfo); err != nil { + if aclAnnotation != oldACLAnnotation && (oc.aclLoggingCanEnable(aclAnnotation, nsInfo) || aclAnnotation == "") { + if len(nsInfo.networkPolicies) > 0 { + // deny rules are all one per namespace + if err := oc.setNetworkPolicyACLLoggingForNamespace(old.Name, nsInfo); err != nil { + klog.Warningf(err.Error()) + } else { + klog.Infof("Namespace %s: NetworkPolicy ACL logging setting updated to deny=%s allow=%s", + old.Name, nsInfo.aclLogging.Deny, nsInfo.aclLogging.Allow) + } + } + // Trigger an egress fw logging update - this will only happen if an egress firewall exists for the NS, otherwise + // this will not do anything. + updated, err := oc.refreshEgressFirewallLogging(old.Name) + if err != nil { klog.Warningf(err.Error()) - } else { - klog.Infof("Namespace %s: ACL logging setting updated to deny=%s allow=%s", + } else if updated { + klog.Infof("Namespace %s: EgressFirewall ACL logging setting updated to deny=%s allow=%s", old.Name, nsInfo.aclLogging.Deny, nsInfo.aclLogging.Allow) } } diff --git a/go-controller/pkg/ovn/obj_retry.go b/go-controller/pkg/ovn/obj_retry.go index 905609058c..394706ad99 100644 --- a/go-controller/pkg/ovn/obj_retry.go +++ b/go-controller/pkg/ovn/obj_retry.go @@ -1151,15 +1151,18 @@ func (oc *Controller) iterateRetryResources(r *retryObjs, updateAll bool) { // periodicallyRetryResources tracks retryObjs and checks if any object needs to be retried for add or delete every // retryObjInterval seconds or when requested through retryChan. 
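// Editor's aside on the hunk below: time.After allocates a fresh timer on each
// select iteration, and such a timer is only reclaimed once it fires, so a
// long-lived loop can accumulate timers; a single Ticker that is Reset after
// out-of-band work keeps exactly one timer alive. A minimal, self-contained
// sketch of the pattern (retryLoop and its parameters are illustrative names,
// not part of this patch):
//
//	func retryLoop(interval time.Duration, trigger, stop <-chan struct{}, work func()) {
//		ticker := time.NewTicker(interval)
//		defer ticker.Stop()
//		for {
//			select {
//			case <-ticker.C:
//				work()
//			case <-trigger:
//				work()
//				ticker.Reset(interval) // push the next periodic run a full interval out
//			case <-stop:
//				return
//			}
//		}
//	}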
func (oc *Controller) periodicallyRetryResources(r *retryObjs) {
+ timer := time.NewTicker(retryObjInterval)
+ defer timer.Stop()
 for {
 select {
- case <-time.After(retryObjInterval):
+ case <-timer.C:
 klog.V(5).Infof("%s s have elapsed, retrying failed objects of type %v", retryObjInterval, r.oType)
 oc.iterateRetryResources(r, false)
 case <-r.retryChan:
 klog.V(5).Infof("Retry channel got triggered: retrying failed objects of type %v", r.oType)
 oc.iterateRetryResources(r, true)
+ timer.Reset(retryObjInterval)
 case <-oc.stopChan:
 klog.V(5).Infof("Stop channel got triggered: will stop retrying failed objects of type %v", r.oType)
diff --git a/go-controller/pkg/ovn/ovn.go b/go-controller/pkg/ovn/ovn.go
index 6e2e1928ce..ed522c4172 100644
--- a/go-controller/pkg/ovn/ovn.go
+++ b/go-controller/pkg/ovn/ovn.go
@@ -474,6 +474,7 @@ func (oc *Controller) Run(ctx context.Context, wg *sync.WaitGroup) error {
 func (oc *Controller) syncPeriodic() {
 go func() {
 nodeSyncTicker := time.NewTicker(5 * time.Minute)
+ defer nodeSyncTicker.Stop()
 for {
 select {
 case <-nodeSyncTicker.C:
@@ -702,17 +703,26 @@ func (oc *Controller) aclLoggingCanEnable(annotation string, nsInfo *namespaceIn
 if err != nil {
 return false
 }
+
+ // Using newDenyLoggingLevel and newAllowLoggingLevel allows us to reset the nsInfo state.
+ // This matters when a user sets the allow or the deny level to an invalid value, or removes
+ // the allow or the deny annotation altogether.
+ // Return true if at least one of the two levels (allow or deny) is set to a valid value.
+ newDenyLoggingLevel := ""
+ newAllowLoggingLevel := ""
 okCnt := 0
 for _, s := range []string{"alert", "warning", "notice", "info", "debug"} {
- if aclLevels.Deny != "" && s == aclLevels.Deny {
- nsInfo.aclLogging.Deny = aclLevels.Deny
+ if s == aclLevels.Deny {
+ newDenyLoggingLevel = aclLevels.Deny
 okCnt++
 }
- if aclLevels.Allow != "" && s == aclLevels.Allow {
- nsInfo.aclLogging.Allow = aclLevels.Allow
+ if s == aclLevels.Allow {
+ newAllowLoggingLevel = aclLevels.Allow
 okCnt++
 }
 }
+ nsInfo.aclLogging.Deny = newDenyLoggingLevel
+ nsInfo.aclLogging.Allow = newAllowLoggingLevel
 return okCnt > 0
 }
diff --git a/go-controller/pkg/ovn/pods.go b/go-controller/pkg/ovn/pods.go
index 8545f7f762..de9bcce4da 100644
--- a/go-controller/pkg/ovn/pods.go
+++ b/go-controller/pkg/ovn/pods.go
@@ -115,7 +115,12 @@ func (oc *Controller) deleteLogicalPort(pod *kapi.Pod, portInfo *lpInfo) (err er
 // is not re-added into the cache. Delete logical switch port anyway.
 annotation, err := util.UnmarshalPodAnnotation(pod.Annotations)
 if err != nil {
- return fmt.Errorf("unable to unmarshal pod annocations for pod %s/%s: %w", pod.Namespace, pod.Name, err)
+ if util.IsAnnotationNotSetError(err) {
+ // if the annotation doesn't exist, that's not an error: the logical port does not need to be deleted.
+ klog.V(5).Infof("No annotations on pod %s/%s, no need to delete its logical port: %s", pod.Namespace, pod.Name, logicalPort) + return nil + } + return fmt.Errorf("unable to unmarshal pod annotations for pod %s/%s: %w", pod.Namespace, pod.Name, err) } podIfAddrs = annotation.IPs } else { diff --git a/go-controller/pkg/ovn/pods_test.go b/go-controller/pkg/ovn/pods_test.go index 3fc9da1ab3..c6a761b4fa 100644 --- a/go-controller/pkg/ovn/pods_test.go +++ b/go-controller/pkg/ovn/pods_test.go @@ -899,6 +899,46 @@ var _ = ginkgo.Describe("OVN Pod Operations", func() { gomega.Expect(err).NotTo(gomega.HaveOccurred()) }) + ginkgo.It("remove a LSP from a pod that has no OVN annotations", func() { + app.Action = func(ctx *cli.Context) error { + namespaceT := *newNamespace("namespace1") + t := newTPod( + "node1", + "10.128.1.0/24", + "10.128.1.2", + "10.128.1.1", + "myPod", + "10.128.1.3", + "0a:58:0a:80:01:03", + namespaceT.Name, + ) + pod := newPod(t.namespace, t.podName, t.nodeName, t.podIP) + fakeOvn.startWithDBSetup(initialDB, + &v1.NamespaceList{ + Items: []v1.Namespace{ + namespaceT, + }, + }, + &v1.PodList{ + Items: []v1.Pod{ + *pod, + }, + }, + ) + annotations := getPodAnnotations(fakeOvn.fakeClient.KubeClient, t.namespace, t.podName) + gomega.Expect(annotations).To(gomega.Equal("")) + + // Deleting port from a pod that has no annotations should be okay + err := fakeOvn.controller.deleteLogicalPort(pod, nil) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + return nil + } + + err := app.Run([]string{app.Name}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }) + ginkgo.It("reconciles a deleted pod", func() { app.Action = func(ctx *cli.Context) error { @@ -937,6 +977,8 @@ var _ = ginkgo.Describe("OVN Pod Operations", func() { return getPodAnnotations(fakeOvn.fakeClient.KubeClient, t.namespace, t.podName) }, 2).Should(gomega.MatchJSON(t.getAnnotationsJson())) + gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(getExpectedDataPodsAndSwitches([]testPod{t}, []string{"node1"}))) + err = fakeOvn.fakeClient.KubeClient.CoreV1().Pods(t.namespace).Delete(context.TODO(), t.podName, *metav1.NewDeleteOptions(0)) gomega.Expect(err).NotTo(gomega.HaveOccurred()) @@ -944,7 +986,7 @@ var _ = ginkgo.Describe("OVN Pod Operations", func() { gomega.Expect(err).To(gomega.HaveOccurred()) gomega.Expect(pod).To(gomega.BeNil()) - gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(getExpectedDataPodsAndSwitches([]testPod{t}, []string{"node1"}))) + gomega.Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(getExpectedDataPodsAndSwitches([]testPod{}, []string{"node1"}))) return nil } diff --git a/go-controller/pkg/ovn/policy.go b/go-controller/pkg/ovn/policy.go index 90d9e52468..f7fc5caee1 100644 --- a/go-controller/pkg/ovn/policy.go +++ b/go-controller/pkg/ovn/policy.go @@ -155,18 +155,21 @@ func (oc *Controller) syncNetworkPolicies(networkPolicies []interface{}) error { } } - // update existing egress network policies to use the updated ACLs + // Update existing egress network policies to use the updated ACLs + // Note that the default multicast egress acls were created with the correct direction, but + // we'd still need to update its apply-after-lb=true option, so that the ACL priorities can apply properly; + // If acl's option["apply-after-lb"] is already set to true, then its direction should be also correct. 
p := func(item *nbdb.ACL) bool { - return item.ExternalIDs[policyTypeACLExtIdKey] == string(knet.PolicyTypeEgress) || - item.ExternalIDs[defaultDenyPolicyTypeACLExtIdKey] == string(knet.PolicyTypeEgress) + return (item.ExternalIDs[policyTypeACLExtIdKey] == string(knet.PolicyTypeEgress) || + item.ExternalIDs[defaultDenyPolicyTypeACLExtIdKey] == string(knet.PolicyTypeEgress)) && + item.Options["apply-after-lb"] != "true" } egressACLs, err := libovsdbops.FindACLsWithPredicate(oc.nbClient, p) if err != nil { return fmt.Errorf("cannot find NetworkPolicy Egress ACLs: %v", err) } - // if the first egress ACL is correct they should all be correct and not need to update - if len(egressACLs) > 0 && egressACLs[0].Direction != nbdb.ACLDirectionFromLport { + if len(egressACLs) > 0 { for _, acl := range egressACLs { acl.Direction = nbdb.ACLDirectionFromLport if acl.Options == nil { @@ -339,7 +342,7 @@ func (oc *Controller) updateACLLoggingForPolicy(np *networkPolicy, logLevel stri return err } -func (oc *Controller) setACLLoggingForNamespace(ns string, nsInfo *namespaceInfo) error { +func (oc *Controller) setNetworkPolicyACLLoggingForNamespace(ns string, nsInfo *namespaceInfo) error { var ovsDBOps []ovsdb.Operation for _, policyType := range []knet.PolicyType{knet.PolicyTypeIngress, knet.PolicyTypeEgress} { denyACL, _ := buildDenyACLs(ns, "", targetPortGroupName(nsInfo.portGroupIngressDenyName, nsInfo.portGroupEgressDenyName, policyType), nsInfo.aclLogging.Deny, policyType) @@ -1204,7 +1207,7 @@ func (oc *Controller) addNetworkPolicy(policy *knet.NetworkPolicy) error { nsInfo.networkPolicies[policy.Name] = np // there may have been a namespace update for ACL logging while we were creating the NP // update it - if err := oc.setACLLoggingForNamespace(policy.Namespace, nsInfo); err != nil { + if err := oc.setNetworkPolicyACLLoggingForNamespace(policy.Namespace, nsInfo); err != nil { klog.Warningf(err.Error()) } else { klog.Infof("Namespace %s: ACL logging setting updated to deny=%s allow=%s", diff --git a/go-controller/pkg/util/ovs.go b/go-controller/pkg/util/ovs.go index 5b0748a82e..496dfda023 100644 --- a/go-controller/pkg/util/ovs.go +++ b/go-controller/pkg/util/ovs.go @@ -30,7 +30,6 @@ const ( ovsCommandTimeout = 15 ovsVsctlCommand = "ovs-vsctl" ovsOfctlCommand = "ovs-ofctl" - ovsDpctlCommand = "ovs-dpctl" ovsAppctlCommand = "ovs-appctl" ovnNbctlCommand = "ovn-nbctl" ovnSbctlCommand = "ovn-sbctl" @@ -156,7 +155,6 @@ type execHelper struct { exec kexec.Interface ofctlPath string vsctlPath string - dpctlPath string appctlPath string ovnappctlPath string nbctlPath string @@ -228,10 +226,6 @@ func SetExec(exec kexec.Interface) error { if err != nil { return err } - runner.dpctlPath, err = exec.LookPath(ovsDpctlCommand) - if err != nil { - return err - } runner.appctlPath, err = exec.LookPath(ovsAppctlCommand) if err != nil { return err @@ -358,12 +352,6 @@ func RunOVSOfctl(args ...string) (string, string, error) { return strings.Trim(stdout.String(), "\" \n"), stderr.String(), err } -// RunOVSDpctl runs a command via ovs-dpctl. -func RunOVSDpctl(args ...string) (string, string, error) { - stdout, stderr, err := run(runner.dpctlPath, args...) - return strings.Trim(strings.TrimSpace(stdout.String()), "\""), stderr.String(), err -} - // RunOVSVsctl runs a command via ovs-vsctl. 
func RunOVSVsctl(args ...string) (string, string, error) { cmdArgs := []string{fmt.Sprintf("--timeout=%d", ovsCommandTimeout)} diff --git a/go-controller/pkg/util/ovs_unit_test.go b/go-controller/pkg/util/ovs_unit_test.go index 21161cf1fe..2863b2935f 100644 --- a/go-controller/pkg/util/ovs_unit_test.go +++ b/go-controller/pkg/util/ovs_unit_test.go @@ -527,13 +527,13 @@ func TestSetExec(t *testing.T) { { desc: "positive, test when 'runner' is nil", expectedErr: nil, - onRetArgs: &ovntest.TestifyMockHelper{OnCallMethodName: "LookPath", OnCallMethodArgType: []string{"string"}, RetArgList: []interface{}{"ip", nil, "arping", nil}, CallTimes: 11}, + onRetArgs: &ovntest.TestifyMockHelper{OnCallMethodName: "LookPath", OnCallMethodArgType: []string{"string"}, RetArgList: []interface{}{"ip", nil, "arping", nil}, CallTimes: 10}, setRunnerNil: true, }, { desc: "positive, test when 'runner' is not nil", expectedErr: nil, - onRetArgs: &ovntest.TestifyMockHelper{OnCallMethodName: "LookPath", OnCallMethodArgType: []string{"string"}, RetArgList: []interface{}{"", nil, "", nil}, CallTimes: 11}, + onRetArgs: &ovntest.TestifyMockHelper{OnCallMethodName: "LookPath", OnCallMethodArgType: []string{"string"}, RetArgList: []interface{}{"", nil, "", nil}, CallTimes: 10}, setRunnerNil: false, }, } @@ -950,47 +950,6 @@ OFPT_GET_CONFIG_REPLY (xid=0x4): frags=normal miss_send_len=0 } } -func TestRunOVSDpctl(t *testing.T) { - mockKexecIface := new(mock_k8s_io_utils_exec.Interface) - mockExecRunner := new(mocks.ExecRunner) - mockCmd := new(mock_k8s_io_utils_exec.Cmd) - // below is defined in ovs.go - runCmdExecRunner = mockExecRunner - // note runner is defined in ovs.go file - runner = &execHelper{exec: mockKexecIface} - tests := []struct { - desc string - expectedErr error - onRetArgsExecUtilsIface *ovntest.TestifyMockHelper - onRetArgsKexecIface *ovntest.TestifyMockHelper - }{ - { - desc: "negative: run `ovs-dpctl` command", - expectedErr: fmt.Errorf("failed to execute ovs-dpctl command"), - onRetArgsExecUtilsIface: &ovntest.TestifyMockHelper{OnCallMethodName: "RunCmd", OnCallMethodArgType: []string{"*mocks.Cmd", "string", "[]string"}, RetArgList: []interface{}{nil, nil, fmt.Errorf("failed to execute ovs-dpctl command")}}, - onRetArgsKexecIface: &ovntest.TestifyMockHelper{OnCallMethodName: "Command", OnCallMethodArgType: []string{"string"}, RetArgList: []interface{}{mockCmd}}, - }, - { - desc: "positive: run `ovs-dpctl` ", - expectedErr: nil, - onRetArgsExecUtilsIface: &ovntest.TestifyMockHelper{OnCallMethodName: "RunCmd", OnCallMethodArgType: []string{"*mocks.Cmd", "string", "[]string"}, RetArgList: []interface{}{bytes.NewBuffer([]byte("testblah")), bytes.NewBuffer([]byte("")), nil}}, - onRetArgsKexecIface: &ovntest.TestifyMockHelper{OnCallMethodName: "Command", OnCallMethodArgType: []string{"string"}, RetArgList: []interface{}{mockCmd}}, - }, - } - for i, tc := range tests { - t.Run(fmt.Sprintf("%d:%s", i, tc.desc), func(t *testing.T) { - ovntest.ProcessMockFn(&mockExecRunner.Mock, *tc.onRetArgsExecUtilsIface) - ovntest.ProcessMockFn(&mockKexecIface.Mock, *tc.onRetArgsKexecIface) - - _, _, e := RunOVSDpctl() - - assert.Equal(t, e, tc.expectedErr) - mockExecRunner.AssertExpectations(t) - mockKexecIface.AssertExpectations(t) - }) - } -} - func TestRunOVSVsctl(t *testing.T) { mockKexecIface := new(mock_k8s_io_utils_exec.Interface) mockExecRunner := new(mocks.ExecRunner) diff --git a/go-controller/vendor/github.com/ovn-org/libovsdb/cache/cache.go b/go-controller/vendor/github.com/ovn-org/libovsdb/cache/cache.go index 
bbb158fc25..762eefcbc2 100644 --- a/go-controller/vendor/github.com/ovn-org/libovsdb/cache/cache.go +++ b/go-controller/vendor/github.com/ovn-org/libovsdb/cache/cache.go @@ -957,7 +957,10 @@ func (t *TableCache) ApplyModifications(tableName string, base model.Model, upda // if NativeToOVS was successful, then simply assign if nv.Type() == reflect.ValueOf(current).Type() { err = info.SetField(k, nv.Interface()) - return err + if err != nil { + return err + } + break } // With a pointer type, an update value could be a set with 2 elements [old, new] if nv.Len() != 2 { diff --git a/go-controller/vendor/modules.txt b/go-controller/vendor/modules.txt index 51ce43d0e4..e5da3a0841 100644 --- a/go-controller/vendor/modules.txt +++ b/go-controller/vendor/modules.txt @@ -232,7 +232,7 @@ github.com/openshift/client-go/cloudnetwork/informers/externalversions/cloudnetw github.com/openshift/client-go/cloudnetwork/informers/externalversions/cloudnetwork/v1 github.com/openshift/client-go/cloudnetwork/informers/externalversions/internalinterfaces github.com/openshift/client-go/cloudnetwork/listers/cloudnetwork/v1 -# github.com/ovn-org/libovsdb v0.6.1-0.20220427123326-d7b273399db4 +# github.com/ovn-org/libovsdb v0.6.1-0.20220513144310-50ec17900991 ## explicit; go 1.16 github.com/ovn-org/libovsdb/cache github.com/ovn-org/libovsdb/client diff --git a/test/e2e/acl_logging.go b/test/e2e/acl_logging.go index f8a6640b3d..a23c7b4326 100644 --- a/test/e2e/acl_logging.go +++ b/test/e2e/acl_logging.go @@ -3,10 +3,13 @@ package e2e import ( "context" "fmt" + "io/ioutil" + "os" + "time" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" - "time" v1 "k8s.io/api/core/v1" knet "k8s.io/api/networking/v1" @@ -21,12 +24,13 @@ const ( pokeInterval = 1 * time.Second ) -var _ = Describe("ACL Logging", func() { +var _ = Describe("ACL Logging for NetworkPolicy", func() { const ( denyAllPolicyName = "default-deny-all" initialDenyACLSeverity = "alert" initialAllowACLSeverity = "notice" - namespacePrefix = "acl-logging" + denyACLVerdict = "drop" + namespacePrefix = "acl-logging-netpol" pokerPodIndex = 0 pokedPodIndex = 1 ) @@ -38,24 +42,12 @@ var _ = Describe("ACL Logging", func() { pods []v1.Pod ) - setNamespaceACLLogSeverity := func(namespaceToUpdate *v1.Namespace, desiredDenyLogLevel string, desiredAllowLogLevel string) error { - if namespaceToUpdate.ObjectMeta.Annotations == nil { - namespaceToUpdate.ObjectMeta.Annotations = map[string]string{} - } - By("updating the namespace's ACL logging severity") - updatedLogSeverity := fmt.Sprintf(`{ "deny": "%s", "allow": "%s" }`, desiredDenyLogLevel, desiredAllowLogLevel) - namespaceToUpdate.Annotations[logSeverityNamespaceAnnotation] = updatedLogSeverity - - _, err := fr.ClientSet.CoreV1().Namespaces().Update(context.TODO(), namespaceToUpdate, metav1.UpdateOptions{}) - return err - } - BeforeEach(func() { By("configuring the ACL logging level within the namespace") nsName = fr.Namespace.Name namespace, err := fr.ClientSet.CoreV1().Namespaces().Get(context.Background(), nsName, metav1.GetOptions{}) Expect(err).NotTo(HaveOccurred(), "failed to retrieve the namespace") - Expect(setNamespaceACLLogSeverity(namespace, initialDenyACLSeverity, initialAllowACLSeverity)).To(Succeed()) + Expect(setNamespaceACLLogSeverity(fr, namespace, initialDenyACLSeverity, initialAllowACLSeverity)).To(Succeed()) By("creating a \"default deny\" network policy") _, err = makeDenyAllPolicy(fr, nsName, denyAllPolicyName) @@ -64,7 +56,7 @@ var _ = Describe("ACL 
Logging", func() { By("creating pods") cmd := []string{"/bin/bash", "-c", "/agnhost netexec --http-port 8000"} for i := 0; i < 2; i++ { - pod := newAgnhostPod(fmt.Sprintf("pod%d", i+1), cmd...) + pod := newAgnhostPod(nsName, fmt.Sprintf("pod%d", i+1), cmd...) pod = fr.PodClient().CreateSync(pod) Expect(waitForACLLoggingPod(fr, nsName, pod.GetName())).To(Succeed()) pods = append(pods, *pod) @@ -91,11 +83,12 @@ var _ = Describe("ACL Logging", func() { It("the logs have the expected log level", func() { clientPodScheduledPodName := pods[pokerPodIndex].Spec.NodeName // Retry here in the case where OVN acls have not been programmed yet + composedPolicyNameRegex := fmt.Sprintf("%s_%s", nsName, denyAllPolicyName) Eventually(func() (bool, error) { - return assertDenyLogs( + return assertAclLogs( clientPodScheduledPodName, - nsName, - denyAllPolicyName, + composedPolicyNameRegex, + denyACLVerdict, initialDenyACLSeverity) }, maxPokeRetries*pokeInterval, pokeInterval).Should(BeTrue()) }) @@ -108,7 +101,7 @@ var _ = Describe("ACL Logging", func() { namespace, err := fr.ClientSet.CoreV1().Namespaces().Get(context.Background(), nsName, metav1.GetOptions{}) Expect(err).NotTo(HaveOccurred(), "failed to retrieve the namespace") - Expect(setNamespaceACLLogSeverity(namespace, updatedAllowACLLogSeverity, updatedAllowACLLogSeverity)).To(Succeed()) + Expect(setNamespaceACLLogSeverity(fr, namespace, updatedAllowACLLogSeverity, updatedAllowACLLogSeverity)).To(Succeed()) namespace, err = fr.ClientSet.CoreV1().Namespaces().Get(context.Background(), nsName, metav1.GetOptions{}) }) @@ -130,17 +123,222 @@ var _ = Describe("ACL Logging", func() { It("the ACL logs are updated accordingly", func() { clientPodScheduledPodName := pods[pokerPodIndex].Spec.NodeName + composedPolicyNameRegex := fmt.Sprintf("%s_%s", nsName, denyAllPolicyName) Eventually(func() (bool, error) { - return assertDenyLogs( + return assertAclLogs( clientPodScheduledPodName, - nsName, - denyAllPolicyName, + composedPolicyNameRegex, + denyACLVerdict, updatedAllowACLLogSeverity) }, maxPokeRetries*pokeInterval, pokeInterval).Should(BeTrue()) }) }) }) +var _ = Describe("ACL Logging for EgressFirewall", func() { + const ( + denyAllPolicyName = "default-deny-all" + initialDenyACLSeverity = "alert" + initialAllowACLSeverity = "notice" + updatedDenyACLSeverity = "debug" + updatedAllowACLSeverity = "debug" + denyACLVerdict = "drop" + allowACLVerdict = "allow" + namespacePrefix = "acl-log-egressfw" + secondaryNamespacePrefix = "acl-log-egressfw-sec" + + // These targets must be off cluster - traffic to the cluster should always be + // allowed: https://docs.openshift.com/container-platform/4.10/networking/openshift_sdn/configuring-egress-firewall.html + // "As a cluster administrator, you can create an egress firewall for a project that restricts egress traffic leaving + // your OpenShift Container Platform cluster." + // Because the egress firewall feature only affects traffic leaving the cluster, we will not log for on-cluster targets. 
+ allowedDstIp = "172.18.0.1" + deniedDstIp = "172.19.0.10" + dstPort = 8080 + ) + + fr := framework.NewDefaultFramework(namespacePrefix) + + var ( + nsName string + nsNameSecondary string + pokePod *v1.Pod + pokePodSecondary *v1.Pod + ) + + BeforeEach(func() { + By("configuring the ACL logging level within the namespace") + nsName = fr.Namespace.Name + namespace, err := fr.ClientSet.CoreV1().Namespaces().Get(context.Background(), nsName, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred(), "failed to retrieve the namespace") + Expect(setNamespaceACLLogSeverity(fr, namespace, initialDenyACLSeverity, initialAllowACLSeverity)).To(Succeed()) + + By("creating a \"default deny\" Egress Firewall") + err = makeEgressFirewall(nsName) + Expect(err).NotTo(HaveOccurred()) + + By("creating a pod running agnhost netexec") + cmd := []string{"/bin/bash", "-c", "/agnhost netexec --http-port 8000"} + pod := newAgnhostPod(nsName, "pod", cmd...) + pokePod = fr.PodClient().CreateSync(pod) + Expect(waitForACLLoggingPod(fr, nsName, pokePod.GetName())).To(Succeed()) + + // The secondary Namespace is required to make sure that 2 namespaces with different logging + // settings can coexist and that updates to a specific namespace only affect that namespace and + // not other namespaces. + By("creating a secondary namespace") + ns2, err := fr.CreateNamespace(secondaryNamespacePrefix, map[string]string{}) + Expect(err).NotTo(HaveOccurred(), "failed to create secondary namespace") + + By("configuring the ACL logging level within the secondary namespace") + nsNameSecondary = ns2.Name + Expect(setNamespaceACLLogSeverity(fr, ns2, initialDenyACLSeverity, initialAllowACLSeverity)).To(Succeed()) + + By("creating a \"default deny\" Egress Firewall inside the secondary namespace") + err = makeEgressFirewall(nsNameSecondary) + Expect(err).NotTo(HaveOccurred()) + + By("creating a pod running agnhost netexec inside the secondary namespace") + cmdSecondary := []string{"/bin/bash", "-c", "/agnhost netexec --http-port 8000"} + podSecondary := newAgnhostPod(nsNameSecondary, "pod-secondary", cmdSecondary...) + // There seems to be a bug in CreateSync for secondary pod. 
Need to do this here instead: + pps := fr.PodClientNS(nsNameSecondary).Create(podSecondary) + Eventually(func() (bool, error) { + time.Sleep(15 * time.Second) + pokePodSecondary, err = fr.ClientSet.CoreV1().Pods(nsNameSecondary).Get(context.TODO(), pps.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + return pokePodSecondary.Status.Phase == v1.PodRunning, nil + }, 60, 5).Should(BeTrue()) + Expect(waitForACLLoggingPod(fr, nsNameSecondary, pokePodSecondary.GetName())).To(Succeed()) + }) + + AfterEach(func() { + pokePod = nil + }) + + When("the namespace is brought up with the initial ACL log severity", func() { + When("the denied destination is poked", func() { + It("the logs should have the expected log level", func() { + // Retry here in the case where OVN acls have not been programmed yet + // Make sure that we see an increment in count + By("testing the primary namespace") + Eventually(func() (bool, error) { + return isCountUpdatedAfterPokeExternalHost(fr, pokePod, nsName, deniedDstIp, dstPort, denyACLVerdict, initialDenyACLSeverity) + }, maxPokeRetries*pokeInterval, pokeInterval).Should(BeTrue()) + + By("making sure that the secondary namespace logs as expected") + Eventually(func() (bool, error) { + return isCountUpdatedAfterPokeExternalHost(fr, pokePodSecondary, nsNameSecondary, deniedDstIp, dstPort, denyACLVerdict, initialDenyACLSeverity) + }, maxPokeRetries*pokeInterval, pokeInterval).Should(BeTrue()) + }) + }) + + When("the allowed destination is poked", func() { + It("the logs should have the expected log level", func() { + // Retry here in the case where OVN acls have not been programmed yet + // Make sure that we see an increment in count + By("testing the primary namespace") + Eventually(func() (bool, error) { + return isCountUpdatedAfterPokeExternalHost(fr, pokePod, nsName, allowedDstIp, dstPort, allowACLVerdict, initialAllowACLSeverity) + }, maxPokeRetries*pokeInterval, pokeInterval).Should(BeTrue()) + + By("making sure that the secondary namespace logs as expected") + Eventually(func() (bool, error) { + return isCountUpdatedAfterPokeExternalHost(fr, pokePodSecondary, nsNameSecondary, allowedDstIp, dstPort, allowACLVerdict, initialAllowACLSeverity) + }, maxPokeRetries*pokeInterval, pokeInterval).Should(BeTrue()) + }) + }) + }) + + When("the namespace's ACL logging annotation is updated", func() { + BeforeEach(func() { + By(fmt.Sprintf("updating the namespace's ACL logging level to %s for deny and %s for allow", updatedDenyACLSeverity, updatedAllowACLSeverity)) + + namespace, err := fr.ClientSet.CoreV1().Namespaces().Get(context.Background(), nsName, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred(), "failed to retrieve the namespace") + Expect(setNamespaceACLLogSeverity(fr, namespace, updatedDenyACLSeverity, updatedAllowACLSeverity)).To(Succeed()) + namespace, err = fr.ClientSet.CoreV1().Namespaces().Get(context.Background(), nsName, metav1.GetOptions{}) + }) + + When("the denied destination is poked", func() { + It("the logs should have the expected log level", func() { + // Retry here in the case where OVN acls have not been programmed yet + // Make sure that we see an increment in count + By("testing the primary namespace") + Eventually(func() (bool, error) { + return isCountUpdatedAfterPokeExternalHost(fr, pokePod, nsName, deniedDstIp, dstPort, denyACLVerdict, updatedDenyACLSeverity) + }, maxPokeRetries*pokeInterval, pokeInterval).Should(BeTrue()) + + By("making sure that the secondary namespace logs as expected") + Eventually(func() (bool, 
error) { + return isCountUpdatedAfterPokeExternalHost(fr, pokePodSecondary, nsNameSecondary, deniedDstIp, dstPort, denyACLVerdict, initialDenyACLSeverity) + }, maxPokeRetries*pokeInterval, pokeInterval).Should(BeTrue()) + }) + }) + + When("the allowed destination is poked", func() { + It("the logs should have the expected log level", func() { + // Retry here in the case where OVN acls have not been programmed yet + // Make sure that we see an increment in count + By("testing the primary namespace") + Eventually(func() (bool, error) { + return isCountUpdatedAfterPokeExternalHost(fr, pokePod, nsName, allowedDstIp, dstPort, allowACLVerdict, updatedAllowACLSeverity) + }, maxPokeRetries*pokeInterval, pokeInterval).Should(BeTrue()) + + By("making sure that the secondary namespace logs as expected") + Eventually(func() (bool, error) { + return isCountUpdatedAfterPokeExternalHost(fr, pokePodSecondary, nsNameSecondary, allowedDstIp, dstPort, allowACLVerdict, initialAllowACLSeverity) + }, maxPokeRetries*pokeInterval, pokeInterval).Should(BeTrue()) + }) + }) + }) + + When("the namespace's ACL logging allow annotation is removed", func() { + BeforeEach(func() { + By("removing the namespace's ACL logging allow configuration") + + namespace, err := fr.ClientSet.CoreV1().Namespaces().Get(context.Background(), nsName, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred(), "failed to retrieve the namespace") + Expect(setNamespaceACLLogSeverity(fr, namespace, initialDenyACLSeverity, "")).To(Succeed()) + namespace, err = fr.ClientSet.CoreV1().Namespaces().Get(context.Background(), nsName, metav1.GetOptions{}) + }) + + When("the denied destination is poked", func() { + It("the logs should have the expected log level", func() { + // Retry here in the case where OVN acls have not been programmed yet + // Make sure that we see an increment in count + By("testing the primary namespace") + Eventually(func() (bool, error) { + return isCountUpdatedAfterPokeExternalHost(fr, pokePod, nsName, deniedDstIp, dstPort, denyACLVerdict, initialDenyACLSeverity) + }, maxPokeRetries*pokeInterval, pokeInterval).Should(BeTrue()) + + By("making sure that the secondary namespace logs as expected") + Eventually(func() (bool, error) { + return isCountUpdatedAfterPokeExternalHost(fr, pokePodSecondary, nsNameSecondary, deniedDstIp, dstPort, denyACLVerdict, initialDenyACLSeverity) + }, maxPokeRetries*pokeInterval, pokeInterval).Should(BeTrue()) + }) + }) + + When("the allowed destination is poked", func() { + It("there should be no trace in the ACL logs", func() { + // Retry here until timeout is reached + // Make sure that we see no increment in count + By("testing the primary namespace") + Consistently(func() (bool, error) { + return isCountUpdatedAfterPokeExternalHost(fr, pokePod, nsName, allowedDstIp, dstPort, allowACLVerdict, initialAllowACLSeverity) + }, maxPokeRetries*pokeInterval, pokeInterval).Should(BeFalse()) + + By("making sure that the secondary namespace logs as expected") + Eventually(func() (bool, error) { + return isCountUpdatedAfterPokeExternalHost(fr, pokePodSecondary, nsNameSecondary, allowedDstIp, dstPort, allowACLVerdict, initialAllowACLSeverity) + }, maxPokeRetries*pokeInterval, pokeInterval).Should(BeTrue()) + }) + }) + }) +}) + func makeDenyAllPolicy(f *framework.Framework, ns string, policyName string) (*knet.NetworkPolicy, error) { policy := &knet.NetworkPolicy{ ObjectMeta: metav1.ObjectMeta{ @@ -156,9 +354,106 @@ func makeDenyAllPolicy(f *framework.Framework, ns string, policyName string) (*k return 
f.ClientSet.NetworkingV1().NetworkPolicies(ns).Create(context.TODO(), policy, metav1.CreateOptions{})
 }
+func makeEgressFirewall(ns string) error {
+ egressFirewallYaml := "egressfirewall.yaml"
+ var egressFirewallConfig = fmt.Sprintf(`apiVersion: k8s.ovn.org/v1
+kind: EgressFirewall
+metadata:
+  name: default
+  namespace: ` + ns + `
+spec:
+  egress:
+  - type: Allow
+    to:
+      cidrSelector: 172.18.0.1/32
+  - type: Deny
+    to:
+      cidrSelector: 0.0.0.0/0
+`)
+
+ if err := ioutil.WriteFile(egressFirewallYaml, []byte(egressFirewallConfig), 0644); err != nil {
+ framework.Failf("Unable to write CRD config to disk: %v", err)
+ }
+
+ defer func() {
+ if err := os.Remove(egressFirewallYaml); err != nil {
+ framework.Logf("Unable to remove the CRD config from disk: %v", err)
+ }
+ }()
+
+ _, err := framework.RunKubectl(ns, "create", "-f", egressFirewallYaml)
+ return err
+}
+
 func waitForACLLoggingPod(f *framework.Framework, namespace string, podName string) error {
 return e2epod.WaitForPodCondition(f.ClientSet, namespace, podName, "running", 5*time.Second, func(pod *v1.Pod) (bool, error) {
 podIP := pod.Status.PodIP
 return podIP != "" && pod.Status.Phase != v1.PodPending, nil
 })
 }
+
+func isCountUpdatedAfterPokeExternalHost(fr *framework.Framework, pokePod *v1.Pod, nsName, dstIp string, dstPort int, aclVerdict, aclSeverity string) (bool, error) {
+ startCount, err := countAclLogs(
+ pokePod.Spec.NodeName,
+ generateEgressFwRegex(pokePod.Namespace),
+ aclVerdict,
+ aclSeverity)
+ if err != nil {
+ return false, err
+ }
+ pokeExternalHost(fr, pokePod, dstIp, dstPort)
+ endCount, err := countAclLogs(
+ pokePod.Spec.NodeName,
+ generateEgressFwRegex(pokePod.Namespace),
+ aclVerdict,
+ aclSeverity)
+ if err != nil {
+ return false, err
+ }
+ return startCount < endCount, nil
+}
+
+func generateEgressFwRegex(nsName string) string {
+ return fmt.Sprintf("egressFirewall_%s_.*", nsName)
+}
+
+func pokeExternalHost(fr *framework.Framework, pokePod *v1.Pod, dstIp string, dstPort int) {
+ framework.Logf("sending traffic outside to test triggering ACL logging")
+ framework.Logf(
+ "Poke destination %s:%d from pod %s/%s (on node %s)",
+ dstIp,
+ dstPort,
+ pokePod.Namespace,
+ pokePod.GetName(),
+ pokePod.Spec.NodeName,
+ )
+ pokeExternalHostFromPod(fr, pokePod.Namespace, pokePod.GetName(), dstIp, dstPort)
+}
+
+// setNamespaceACLLogSeverity updates namespaceToUpdate with the deny and allow annotations, e.g. k8s.ovn.org/acl-logging={ "deny": "%s", "allow": "%s" }.
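// Illustrative usage (editor's sketch; the three annotation forms follow
// directly from the function body below):
//
//	_ = setNamespaceACLLogSeverity(fr, ns, "alert", "notice") // k8s.ovn.org/acl-logging={ "deny": "alert", "allow": "notice" }
//	_ = setNamespaceACLLogSeverity(fr, ns, "alert", "")       // k8s.ovn.org/acl-logging={ "deny": "alert" }
//	_ = setNamespaceACLLogSeverity(fr, ns, "", "")            // removes the annotation entirely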
+func setNamespaceACLLogSeverity(fr *framework.Framework, namespaceToUpdate *v1.Namespace, desiredDenyLogLevel string, desiredAllowLogLevel string) error { + if namespaceToUpdate.ObjectMeta.Annotations == nil { + namespaceToUpdate.ObjectMeta.Annotations = map[string]string{} + } + + aclLogSeverity := "" + if desiredDenyLogLevel != "" && desiredAllowLogLevel != "" { + aclLogSeverity = fmt.Sprintf(`{ "deny": "%s", "allow": "%s" }`, desiredDenyLogLevel, desiredAllowLogLevel) + By(fmt.Sprintf("updating the namespace's ACL logging severity to %s", aclLogSeverity)) + namespaceToUpdate.Annotations[logSeverityNamespaceAnnotation] = aclLogSeverity + } else if desiredDenyLogLevel != "" { + aclLogSeverity = fmt.Sprintf(`{ "deny": "%s" }`, desiredDenyLogLevel) + By(fmt.Sprintf("updating the namespace's ACL logging severity to %s", aclLogSeverity)) + namespaceToUpdate.Annotations[logSeverityNamespaceAnnotation] = aclLogSeverity + } else if desiredAllowLogLevel != "" { + aclLogSeverity = fmt.Sprintf(`{ "allow": "%s" }`, desiredAllowLogLevel) + By(fmt.Sprintf("updating the namespace's ACL logging severity to %s", aclLogSeverity)) + namespaceToUpdate.Annotations[logSeverityNamespaceAnnotation] = aclLogSeverity + } else { + By("removing the namespace's ACL logging severity annotation if it exists") + delete(namespaceToUpdate.Annotations, logSeverityNamespaceAnnotation) + } + + _, err := fr.ClientSet.CoreV1().Namespaces().Update(context.TODO(), namespaceToUpdate, metav1.UpdateOptions{}) + return err +} diff --git a/test/e2e/multicast.go b/test/e2e/multicast.go index 266383f0e6..bf3ff560a8 100644 --- a/test/e2e/multicast.go +++ b/test/e2e/multicast.go @@ -88,7 +88,7 @@ var _ = ginkgo.Describe("Multicast", func() { iperf = iperf + " -V" } cmd := []string{"/bin/sh", "-c", iperf} - clientPod := newAgnhostPod(mcastSource, cmd...) + clientPod := newAgnhostPod(fr.Namespace.Name, mcastSource, cmd...) clientPod.Spec.NodeName = clientNodeInfo.name fr.PodClient().CreateSync(clientPod) @@ -100,7 +100,7 @@ var _ = ginkgo.Describe("Multicast", func() { iperf = iperf + " -V" } cmd = []string{"/bin/sh", "-c", iperf} - mcastServerPod1 := newAgnhostPod(mcastServer1, cmd...) + mcastServerPod1 := newAgnhostPod(fr.Namespace.Name, mcastServer1, cmd...) mcastServerPod1.Spec.NodeName = serverNodeInfo.name fr.PodClient().CreateSync(mcastServerPod1) @@ -112,7 +112,7 @@ var _ = ginkgo.Describe("Multicast", func() { iperf = iperf + " -V" } cmd = []string{"/bin/sh", "-c", iperf} - mcastServerPod2 := newAgnhostPod(mcastServer2, cmd...) + mcastServerPod2 := newAgnhostPod(fr.Namespace.Name, mcastServer2, cmd...) mcastServerPod2.Spec.NodeName = serverNodeInfo.name fr.PodClient().CreateSync(mcastServerPod2) diff --git a/test/e2e/util.go b/test/e2e/util.go index d2c7c3736a..ceba488c69 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net" + "regexp" "strings" "time" @@ -80,10 +81,11 @@ type annotationNotSetError struct { // newAgnhostPod returns a pod that uses the agnhost image. The image's binary supports various subcommands // that behave the same, no matter the underlying OS. 
-func newAgnhostPod(name string, command ...string) *v1.Pod { +func newAgnhostPod(namespace, name string, command ...string) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: name, + Name: name, + Namespace: namespace, }, Spec: v1.PodSpec{ Containers: []v1.Container{ @@ -656,17 +658,61 @@ func pokePod(fr *framework.Framework, srcPodName string, dstPodIP string) error return fmt.Errorf("http request failed; stdout: %s, err: %v", stdout+stderr, err) } -func assertDenyLogs(targetNodeName string, namespace string, policyName string, expectedAclSeverity string) (bool, error) { +func pokeExternalHostFromPod(fr *framework.Framework, namespace string, srcPodName, dstIp string, dstPort int) error { + stdout, stderr, err := ExecShellInPodWithFullOutput( + fr, + namespace, + srcPodName, + fmt.Sprintf("curl --output /dev/stdout -m 1 -I %s:%d | head -n1", dstIp, dstPort)) + if err == nil && stdout == "HTTP/1.1 200 OK" { + return nil + } + return fmt.Errorf("http request failed; stdout: %s, err: %v", stdout+stderr, err) +} + +// ExecShellInPodWithFullOutput is a shameless copy/paste from the framework methods so that we can specify the pod namespace. +func ExecShellInPodWithFullOutput(f *framework.Framework, namespace, podName string, cmd string) (string, string, error) { + return execCommandInPodWithFullOutput(f, namespace, podName, "/bin/sh", "-c", cmd) +} + +// execCommandInPodWithFullOutput is a shameless copy/paste from the framework methods so that we can specify the pod namespace. +func execCommandInPodWithFullOutput(f *framework.Framework, namespace, podName string, cmd ...string) (string, string, error) { + pod, err := f.PodClientNS(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) + framework.ExpectNoError(err, "failed to get pod %v", podName) + gomega.Expect(pod.Spec.Containers).NotTo(gomega.BeEmpty()) + return ExecCommandInContainerWithFullOutput(f, namespace, podName, pod.Spec.Containers[0].Name, cmd...) +} + +// ExecCommandInContainerWithFullOutput is a shameless copy/paste from the framework methods so that we can specify the pod namespace. +func ExecCommandInContainerWithFullOutput(f *framework.Framework, namespace, podName, containerName string, cmd ...string) (string, string, error) { + options := framework.ExecOptions{ + Command: cmd, + Namespace: namespace, + PodName: podName, + ContainerName: containerName, + Stdin: nil, + CaptureStdout: true, + CaptureStderr: true, + PreserveWhitespace: false, + } + return f.ExecWithOptions(options) +} + +func assertAclLogs(targetNodeName string, policyNameRegex string, expectedAclVerdict string, expectedAclSeverity string) (bool, error) { framework.Logf("collecting the ovn-controller logs for node: %s", targetNodeName) targetNodeLog, err := runCommand([]string{"docker", "exec", targetNodeName, "grep", "acl_log", ovnControllerLogPath}...) 
 	if err != nil {
 		return false, fmt.Errorf("error accessing logs in node %s: %v", targetNodeName, err)
 	}
-	composedPolicyName := fmt.Sprintf("%s_%s", namespace, policyName)
-	framework.Logf("Ensuring the *deny* audit log contains: '%s\", verdict=drop' AND 'severity=%s'", composedPolicyName, expectedAclSeverity)
+	framework.Logf("Ensuring the audit log contains: 'name=\"%s\"', 'verdict=%s' AND 'severity=%s'", policyNameRegex, expectedAclVerdict, expectedAclSeverity)
 	for _, logLine := range strings.Split(targetNodeLog, "\n") {
-		if strings.Contains(logLine, fmt.Sprintf("%s\", verdict=drop", composedPolicyName)) &&
+		matched, err := regexp.MatchString(fmt.Sprintf("name=\"%s\"", policyNameRegex), logLine)
+		if err != nil {
+			return false, err
+		}
+		if matched &&
+			strings.Contains(logLine, fmt.Sprintf("verdict=%s", expectedAclVerdict)) &&
 			strings.Contains(logLine, fmt.Sprintf("severity=%s", expectedAclSeverity)) {
 			return true, nil
 		}
@@ -718,11 +764,11 @@ func isDualStackCluster(nodes *v1.NodeList) bool {
 }
 // used to inject OVN specific test actions
-func wrappedTestFramework(basename string) *framework.Framework{
+func wrappedTestFramework(basename string) *framework.Framework {
 	f := framework.NewDefaultFramework(basename)
 	// inject dumping dbs on failure
 	ginkgo.JustAfterEach(func() {
-		if ! ginkgo.CurrentGinkgoTestDescription().Failed {
+		if !ginkgo.CurrentGinkgoTestDescription().Failed {
 			return
 		}
@@ -734,7 +780,7 @@ func wrappedTestFramework(basename string) *framework.Framework{
 		dbs := []string{"ovnnb_db.db", "ovnsb_db.db"}
 		ovsdb := "conf.db"
-		testName := strings.Replace(ginkgo.CurrentGinkgoTestDescription().TestText, " ", "_", -1)
+		testName := strings.Replace(ginkgo.CurrentGinkgoTestDescription().TestText, " ", "_", -1)
 		logDir := fmt.Sprintf("%s/e2e-dbs/%s-%s", logLocation, testName, f.UniqueName)
 		var args []string
@@ -770,3 +816,35 @@ func wrappedTestFramework(basename string) *framework.Framework{
 	return f
 }
+
+// countAclLogs reads the full ovn-controller log from one node (ovn-control-plane, ovn-worker or
+// ovn-worker2 in kind environments) via docker exec, then counts the log lines that contain an
+// acl_log entry whose name matches policyNameRegex and whose verdict and severity both match the
+// expected values.
+func countAclLogs(targetNodeName string, policyNameRegex string, expectedAclVerdict string, expectedAclSeverity string) (int, error) {
+	count := 0
+
+	framework.Logf("collecting the ovn-controller logs for node: %s", targetNodeName)
+	targetNodeLog, err := runCommand([]string{"docker", "exec", targetNodeName, "cat", ovnControllerLogPath}...)
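// [editor note, hedged rationale -- not stated in the patch] unlike assertAclLogs above, this
// helper cats the whole log instead of grepping for "acl_log": grep exits non-zero when nothing
// matches, which runCommand would surface as an error, whereas zero matching lines is a
// perfectly valid result for a counting helper.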
+	if err != nil {
+		return 0, fmt.Errorf("error accessing logs in node %s: %v", targetNodeName, err)
+	}
+
+	stringToMatch := fmt.Sprintf(
+		".*acl_log.*name=\"%s\".*verdict=%s.*severity=%s.*",
+		policyNameRegex,
+		expectedAclVerdict,
+		expectedAclSeverity)
+
+	for _, logLine := range strings.Split(targetNodeLog, "\n") {
+		matched, err := regexp.MatchString(stringToMatch, logLine)
+		if err != nil {
+			return 0, err
+		}
+		if matched {
+			count++
+		}
+	}
+
+	framework.Logf("The audit log contains %d occurrences of: '%s'", count, stringToMatch)
+	return count, nil
+}
diff --git a/test/scripts/install-kind.sh b/test/scripts/install-kind.sh
index 21d4b923db..c8ced50616 100755
--- a/test/scripts/install-kind.sh
+++ b/test/scripts/install-kind.sh
@@ -2,8 +2,8 @@
 set -ex
-KIND_URL=https://kind.sigs.k8s.io/dl/v0.11.1/kind-linux-amd64
-KIND_SHA=949f81b3c30ca03a3d4effdecda04f100fa3edc07a28b19400f72ede7c5f0491
+KIND_URL=https://kind.sigs.k8s.io/dl/v0.14.0/kind-linux-amd64
+KIND_SHA=af5e8331f2165feab52ec2ae07c427c7b66f4ad044d09f253004a20252524c8b
 KIND_DOWNLOAD_RETRIES=5
 SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
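Taken together, the new helpers compose naturally in an ACL-logging test. A minimal sketch,
assuming a framework `fr` and the usual e2e imports are in scope; the pod name, external IP,
node name, and policy name below are made-up placeholders, not part of this patch:

	// fetch the namespace and enable ACL logging ("alert" for deny, "notice" for allow)
	ns, err := fr.ClientSet.CoreV1().Namespaces().Get(context.TODO(), fr.Namespace.Name, metav1.GetOptions{})
	framework.ExpectNoError(err)
	framework.ExpectNoError(setNamespaceACLLogSeverity(fr, ns, "alert", "notice"))

	// generate traffic that the network policy under test should allow
	framework.ExpectNoError(pokeExternalHostFromPod(fr, fr.Namespace.Name, "client-pod", "172.18.0.1", 8080))

	// the name regex follows the namespace_policy convention the old assertDenyLogs helper used;
	// eventually at least one matching allow entry should show up in the audit log
	gomega.Eventually(func() (int, error) {
		return countAclLogs("ovn-worker", fmt.Sprintf("%s_allow-policy", fr.Namespace.Name), "allow", "notice")
	}, 10*time.Second, time.Second).Should(gomega.BeNumerically(">", 0))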