diff --git a/felix/bpf/conntrack/scanner.go b/felix/bpf/conntrack/scanner.go index 2e73fbef2f9..c0d518eaac9 100644 --- a/felix/bpf/conntrack/scanner.go +++ b/felix/bpf/conntrack/scanner.go @@ -18,6 +18,7 @@ import ( "fmt" "net" "os" + "strconv" "strings" "sync" "time" @@ -27,6 +28,7 @@ import ( "github.com/projectcalico/calico/felix/bpf/conntrack/cleanupv1" "github.com/projectcalico/calico/felix/bpf/conntrack/timeouts" + v4 "github.com/projectcalico/calico/felix/bpf/conntrack/v4" "github.com/projectcalico/calico/felix/bpf/maps" "github.com/projectcalico/calico/felix/cachingmap" "github.com/projectcalico/calico/felix/jitter" @@ -54,6 +56,10 @@ var ( Name: "felix_bpf_conntrack_sweep_duration", Help: "Conntrack sweep execution time (ns)", }) + conntrackGaugeMaglevTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "felix_bpf_conntrack_maglev_entries_total", + Help: "Total number of Maglev entries in conntrack table broken down by IP version, and, whether destination backend is remote (we're acting as a frontend) or local (we're the backend node).", + }, []string{"destination", "ip_family"}) dummyKeyV6 = NewKeyV6(0, net.IPv6zero, 0, net.IPv6zero, 0) dummyKey = NewKey(0, net.IPv4zero, 0, net.IPv4zero, 0) ) @@ -64,6 +70,7 @@ func init() { prometheus.MustRegister(conntrackGaugeCleaned) prometheus.MustRegister(conntrackCounterCleaned) prometheus.MustRegister(conntrackGaugeSweepDuration) + prometheus.MustRegister(conntrackGaugeMaglevTotal) } // ScanVerdict represents the set of values returned by EntryScan @@ -116,6 +123,10 @@ type Scanner struct { bpfCleaner Cleaner versionHelper ipVersionHelper revNATKeyToFwdNATInfo map[KeyInterface]cleanupv1.ValueInterface + ipFamily int + + conntrackGaugeMaglevToLocalBackend prometheus.Gauge + conntrackGaugeMaglevToRemoteBackend prometheus.Gauge wg sync.WaitGroup stopCh chan struct{} @@ -142,6 +153,7 @@ func NewScanner(ctMap maps.Map, kfb func([]byte) KeyInterface, vfb func([]byte) autoScale: strings.ToLower(autoScalingMode) == "doubleiffull", configChangedRestartCallback: configChangedRestartCallback, bpfCleaner: bpfCleaner, + ipFamily: ipVersion, // revNATKeyToFwdNATInfo stores the opposite direction of the mapping of the cleanup bpf map. // => :: revNATKeyToFwdNATInfo: make(map[KeyInterface]cleanupv1.ValueInterface), @@ -162,6 +174,19 @@ func NewScanner(ctMap maps.Map, kfb func([]byte) KeyInterface, vfb func([]byte) } } + + var err error + + s.conntrackGaugeMaglevToLocalBackend, err = conntrackGaugeMaglevTotal.GetMetricWithLabelValues("local", strconv.Itoa(s.ipFamily)) + if err != nil { + log.WithError(err).Panic("Couldn't init (local) Maglev conntrack metric gauge") + } + + s.conntrackGaugeMaglevToRemoteBackend, err = conntrackGaugeMaglevTotal.GetMetricWithLabelValues("remote", strconv.Itoa(s.ipFamily)) + if err != nil { + log.WithError(err).Panic("Couldn't get (remote) Maglev conntrack metric gauge") + } + return s } @@ -223,6 +248,7 @@ func (s *Scanner) Scan() { used := 0 cleaned := 0 numExpired := 0 + maglevEntriesToLocal, maglevEntriesToRemote := 0, 0 if s.ctCleanupMap != nil { s.ctCleanupMap.Desired().DeleteAll() @@ -231,6 +257,7 @@ func (s *Scanner) Scan() { err := s.ctMap.Iter(func(k, v []byte) maps.IteratorAction { ctKey := s.keyFromBytes(k) ctVal := s.valueFromBytes(v) + ctFlags := ctVal.Flags() used++ conntrackCounterCleaned.Inc() @@ -242,6 +269,16 @@ func (s *Scanner) Scan() { }).Debug("Examining conntrack entry") } + if ctFlags&v4.FlagMaglev != 0 { + if ctFlags&v4.FlagExtLocal != 0 { + log.Debug("Conntrack is local maglev connection. Incrementing maglev entries counter") + maglevEntriesToLocal++ + } else if ctFlags&v4.FlagNATNPFwd != 0 { + log.Debug("Conntrack is remote maglev connection. Incrementing maglev entries counter") + maglevEntriesToRemote++ + } + } + for _, scanner := range s.scanners { verdict, ts := scanner.Check(ctKey, ctVal, s.get) switch verdict { @@ -319,6 +356,11 @@ func (s *Scanner) Scan() { // Run the bpf cleaner to process the remaining entries in the cleanup map. cleaned += s.runBPFCleaner() + log.WithField("value", maglevEntriesToLocal).Debug("Setting local maglev conntrack entries gauge") + s.conntrackGaugeMaglevToLocalBackend.Set(float64(maglevEntriesToLocal)) + log.WithField("value", maglevEntriesToRemote).Debug("Setting remote maglev conntrack entries gauge") + s.conntrackGaugeMaglevToRemoteBackend.Set(float64(maglevEntriesToRemote)) + conntrackCounterSweeps.Inc() conntrackGaugeUsed.Set(float64(used)) conntrackGaugeCleaned.Set(float64(cleaned)) diff --git a/felix/fv/bpf_test.go b/felix/fv/bpf_test.go index 2920c008a14..440c7824f8d 100644 --- a/felix/fv/bpf_test.go +++ b/felix/fv/bpf_test.go @@ -399,6 +399,8 @@ func describeBPFTests(opts ...bpfTestOpt) bool { options.ExtraEnvVars["FELIX_BPFLogLevel"] = fmt.Sprint(testOpts.bpfLogLevel) options.ExtraEnvVars["FELIX_BPFConntrackLogLevel"] = fmt.Sprint(testOpts.bpfLogLevel) options.ExtraEnvVars["FELIX_BPFProfiling"] = "Enabled" + options.ExtraEnvVars["FELIX_PrometheusMetricsEnabled"] = "true" + options.ExtraEnvVars["FELIX_PrometheusMetricsHost"] = "0.0.0.0" if testOpts.dsr { options.ExtraEnvVars["FELIX_BPFExternalServiceMode"] = "dsr" } @@ -1906,6 +1908,10 @@ func describeBPFTests(opts ...bpfTestOpt) bool { familyInt = 6 } + felixWithMaglevBackend := 0 + initialIngressFelix := 1 + failoverIngressFelix := 2 + newConntrackKey := func(srcIP net.IP, srcPort uint16, dstIP net.IP, dstPort uint16, family string) conntrack.KeyInterface { var key conntrack.KeyInterface // cmp := bytes.Compare(srcIP, dstIP) @@ -1982,6 +1988,19 @@ func describeBPFTests(opts ...bpfTestOpt) bool { } } + probeMaglevConntrackMetric := func(metricName string, felixes ...*infrastructure.Felix) []int { + counts := make([]int, 0) + for _, f := range felixes { + ctCount, err := f.PromMetric(metricName).Int() + if err != nil { + log.WithError(err).WithField("felix", f.Name).Warn("Error while probing Felix metric. Skipping this felix") + continue + } + counts = append(counts, ctCount) + } + return counts + } + BeforeEach(func() { switch testOpts.protocol { case "udp": @@ -1995,11 +2014,24 @@ func describeBPFTests(opts ...bpfTestOpt) bool { } log.WithFields(log.Fields{"number": proto, "name": testOpts.protocol}).Info("parsed protocol") + pTCP := numorstring.ProtocolFromString("tcp") + promPinhole := api.Rule{ + Action: "Allow", + Protocol: &pTCP, + Destination: api.EntityRule{ + Ports: []numorstring.Port{ + {MinPort: 9091, MaxPort: 9091}, + }, + Nets: []string{}, + }, + } + // Create policy allowing ingress from external client allowIngressFromExtClient := api.NewGlobalNetworkPolicy() allowIngressFromExtClient.Namespace = "fv" allowIngressFromExtClient.Name = "policy-ext-client" allowIngressFromExtClient.Spec.Ingress = []api.Rule{ + promPinhole, { Action: "Allow", Source: api.EntityRule{ @@ -2015,11 +2047,12 @@ func describeBPFTests(opts ...bpfTestOpt) bool { allowIngressFromExtClient = createPolicy(allowIngressFromExtClient) // Create service with maglev annotation - testSvc = k8sServiceWithExtIP(testSvcName, clusterIP, w[0][0], 80, tgtPort, 0, + testSvc = k8sServiceWithExtIP(testSvcName, clusterIP, w[felixWithMaglevBackend][0], 80, tgtPort, 0, testOpts.protocol, []string{externalIP}) testSvc.Annotations = map[string]string{ "lb.projectcalico.org/external-traffic-strategy": "maglev", } + testSvcNamespace = testSvc.Namespace _, err := k8sClient.CoreV1().Services(testSvcNamespace).Create(context.Background(), testSvc, metav1.CreateOptions{}) Expect(err).NotTo(HaveOccurred()) @@ -2071,16 +2104,39 @@ func describeBPFTests(opts ...bpfTestOpt) bool { cmdCleanRt = append(ipRoute, "route", "del", externalIP) _ = externalClient.ExecMayFail(strings.Join(cmdCleanRt, "")) - cmdCIP := append(ipRoute, "route", "add", clusterIP, "via", felixIP(1)) + cmdCIP := append(ipRoute, "route", "add", clusterIP, "via", felixIP(initialIngressFelix)) externalClient.Exec(cmdCIP...) - cmdEIP := append(ipRoute, "route", "add", externalIP, "via", felixIP(1)) + cmdEIP := append(ipRoute, "route", "add", externalIP, "via", felixIP(initialIngressFelix)) externalClient.Exec(cmdEIP...) }) It("should have connectivity from external client to maglev backend via cluster IP and external IP", func() { + probeMaglevLocalConntrackMetricFunc := func(felixes ...*infrastructure.Felix) func() []int { + return func() []int { + return probeMaglevConntrackMetric(fmt.Sprintf("felix_bpf_conntrack_maglev_entries_total{destination=\"local\",ip_family=\"%d\"}", familyInt), felixes...) + } + } + probeMaglevRemoteConntrackMetricFunc := func(felixes ...*infrastructure.Felix) func() []int { + return func() []int { + return probeMaglevConntrackMetric(fmt.Sprintf("felix_bpf_conntrack_maglev_entries_total{destination=\"remote\",ip_family=\"%d\"}", familyInt), felixes...) + } + } + + Eventually(probeMaglevLocalConntrackMetricFunc(tc.Felixes...), "10s", "1s").Should(Equal([]int{0, 0, 0}), "Expected maglev local-conntrack metric to start at 0 for all Felixes") + Eventually(probeMaglevRemoteConntrackMetricFunc(tc.Felixes...), "10s", "1s").Should(Equal([]int{0, 0, 0}), "Expected maglev remote-conntrack metric to start at 0 for all Felixes") + cc.ExpectSome(externalClient, TargetIP(clusterIP), port) cc.ExpectSome(externalClient, TargetIP(externalIP), port) cc.CheckConnectivity() + + // There is a 10-second interval between iterations of Felix's conntrack scanner (where we export the maglev conntrack metrics). + // This means we must be very pessimistic about timeouts when searching for the prom values we're after. + Eventually(probeMaglevRemoteConntrackMetricFunc(tc.Felixes[initialIngressFelix]), "12s", "1s").Should(Equal([]int{2}), "Expected maglev-ingress felix to increment the remote-conntracks metric") + Eventually(probeMaglevLocalConntrackMetricFunc(tc.Felixes[felixWithMaglevBackend]), "12s", "1s").Should(Equal([]int{2}), "Expected felix with maglev backend to increment the local-conntracks metric") + Consistently(probeMaglevLocalConntrackMetricFunc(tc.Felixes[initialIngressFelix])).Should(Equal([]int{0}), "Expected ingress-felix to only have remote maglev conntracks, but saw metric for local maglev conntracks go up") + Consistently(probeMaglevRemoteConntrackMetricFunc(tc.Felixes[felixWithMaglevBackend])).Should(Equal([]int{0}), "Expected backing felix to only have local maglev conntracks, but saw metric for remote maglev conntracks go up") + Consistently(probeMaglevLocalConntrackMetricFunc(tc.Felixes[failoverIngressFelix])).Should(Equal([]int{0}), "No failover occurred, but an unrelated Felix's local maglev prom metrics went up") + Consistently(probeMaglevRemoteConntrackMetricFunc(tc.Felixes[failoverIngressFelix])).Should(Equal([]int{0}), "No failover occurred, but an unrelated Felix's remote maglev prom metrics went up") }) testFailover := func(serviceIP string) {