Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions felix/bpf/conntrack/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -27,6 +28,7 @@ import (

"github.com/projectcalico/calico/felix/bpf/conntrack/cleanupv1"
"github.com/projectcalico/calico/felix/bpf/conntrack/timeouts"
v4 "github.com/projectcalico/calico/felix/bpf/conntrack/v4"
"github.com/projectcalico/calico/felix/bpf/maps"
"github.com/projectcalico/calico/felix/cachingmap"
"github.com/projectcalico/calico/felix/jitter"
Expand Down Expand Up @@ -54,6 +56,10 @@ var (
Name: "felix_bpf_conntrack_sweep_duration",
Help: "Conntrack sweep execution time (ns)",
})
conntrackGaugeMaglevTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "felix_bpf_conntrack_maglev_entries_total",
Help: "Total number of Maglev entries in conntrack table.",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Help: "Total number of Maglev entries in conntrack table.",
Help: "Total number of Maglev entries in conntrack table broken down by IP version, and, whether destination backend is remote (we're acting as a frontend) or local (we're the backend node).",

}, []string{"destination", "ip_family"})
dummyKeyV6 = NewKeyV6(0, net.IPv6zero, 0, net.IPv6zero, 0)
dummyKey = NewKey(0, net.IPv4zero, 0, net.IPv4zero, 0)
)
Expand All @@ -64,6 +70,7 @@ func init() {
prometheus.MustRegister(conntrackGaugeCleaned)
prometheus.MustRegister(conntrackCounterCleaned)
prometheus.MustRegister(conntrackGaugeSweepDuration)
prometheus.MustRegister(conntrackGaugeMaglevTotal)
}

// ScanVerdict represents the set of values returned by EntryScan
Expand Down Expand Up @@ -116,6 +123,7 @@ type Scanner struct {
bpfCleaner Cleaner
versionHelper ipVersionHelper
revNATKeyToFwdNATInfo map[KeyInterface]cleanupv1.ValueInterface
ipFamily int

wg sync.WaitGroup
stopCh chan struct{}
Expand All @@ -142,6 +150,7 @@ func NewScanner(ctMap maps.Map, kfb func([]byte) KeyInterface, vfb func([]byte)
autoScale: strings.ToLower(autoScalingMode) == "doubleiffull",
configChangedRestartCallback: configChangedRestartCallback,
bpfCleaner: bpfCleaner,
ipFamily: ipVersion,
// revNATKeyToFwdNATInfo stores the opposite direction of the mapping of the cleanup bpf map.
// <reverseNATKey> => <forwardNATKey>:<forwardEntryTimeStamp>:<reverseEntryTimestamp>
revNATKeyToFwdNATInfo: make(map[KeyInterface]cleanupv1.ValueInterface),
Expand Down Expand Up @@ -223,6 +232,7 @@ func (s *Scanner) Scan() {
used := 0
cleaned := 0
numExpired := 0
maglevEntriesToLocal, maglevEntriesToRemote := 0, 0

if s.ctCleanupMap != nil {
s.ctCleanupMap.Desired().DeleteAll()
Expand All @@ -231,10 +241,19 @@ func (s *Scanner) Scan() {
err := s.ctMap.Iter(func(k, v []byte) maps.IteratorAction {
ctKey := s.keyFromBytes(k)
ctVal := s.valueFromBytes(v)
ctFlags := ctVal.Flags()

used++
conntrackCounterCleaned.Inc()

if ctFlags&v4.FlagMaglev != 0 {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should move to case ScanVerdictOK as you probably only care about active connections. This would also count connections that are expired and are going to be removed or do not have backed anymore etc.

Maybe you want to do it in the cleaner since you may not want to count connections in some FIN-wait state, where they may be for quite a while, but for the purpose of "can I remove this node" they do not matter anymore.

When @sridhartigera makes the change to send RSTs when the backend is no more, this would perhaps need to move again 🤔

Obviously, the stats will be fixed up follow up iterations when connection is deleted.

if ctFlags&v4.FlagExtLocal != 0 {
maglevEntriesToLocal++
} else if ctFlags&v4.FlagNATNPFwd != 0 {
maglevEntriesToRemote++
}
}

if debug {
log.WithFields(log.Fields{
"key": ctKey,
Expand Down Expand Up @@ -319,6 +338,20 @@ func (s *Scanner) Scan() {
// Run the bpf cleaner to process the remaining entries in the cleanup map.
cleaned += s.runBPFCleaner()

maglevConntracksToLocalBackend, err := conntrackGaugeMaglevTotal.GetMetricWithLabelValues("local", strconv.Itoa(s.ipFamily))
if err != nil {
log.WithError(err).Warn("Couldn't get (local) Maglev conntracks metric, will not update it on this iteration")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't really something we should handle; it generally indicates a coding bug (e.g. passing two labels when only one is defined). It's a minor thing, but I'd do the GetMetricWithLabelValues at init time and store off the sub-gauge. Then panic if it fails. That'll fail fast in UT/FV if we break the label set-up.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly I don't think it's viable to initialise the gauges in the init() method along with the others, because we need to discern between IP versions, which I'm getting from the Scanner state.

I can init them when the Scanner is initialised, but will also need to add a mutex to sync access, since many places call Scan(), where the gauges are set.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, this Scan method gets called very early on in Felix's lifecycle, so I could just change these logs to a Panic?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you init them when a particular version of scanner is created ans store in fields? That's a common pattern

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep - but will need locks I think? Because a few different goroutines call Scan

} else {
maglevConntracksToLocalBackend.Set(float64(maglevEntriesToLocal))
}

maglevConntracksToRemoteBackend, err := conntrackGaugeMaglevTotal.GetMetricWithLabelValues("remote", strconv.Itoa(s.ipFamily))
if err != nil {
log.WithError(err).Warn("Couldn't get (remote) Maglev conntracks metric, will not update it on this iteration")
} else {
maglevConntracksToRemoteBackend.Set(float64(maglevEntriesToRemote))
}

conntrackCounterSweeps.Inc()
conntrackGaugeUsed.Set(float64(used))
conntrackGaugeCleaned.Set(float64(cleaned))
Expand Down
42 changes: 42 additions & 0 deletions felix/fv/bpf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,8 @@ func describeBPFTests(opts ...bpfTestOpt) bool {
options.ExtraEnvVars["FELIX_BPFLogLevel"] = fmt.Sprint(testOpts.bpfLogLevel)
options.ExtraEnvVars["FELIX_BPFConntrackLogLevel"] = fmt.Sprint(testOpts.bpfLogLevel)
options.ExtraEnvVars["FELIX_BPFProfiling"] = "Enabled"
options.ExtraEnvVars["FELIX_PrometheusMetricsEnabled"] = "true"
options.ExtraEnvVars["FELIX_PrometheusMetricsHost"] = "0.0.0.0"
if testOpts.dsr {
options.ExtraEnvVars["FELIX_BPFExternalServiceMode"] = "dsr"
}
Expand Down Expand Up @@ -1982,6 +1984,19 @@ func describeBPFTests(opts ...bpfTestOpt) bool {
}
}

probeMaglevConntrackMetric := func(felixes []*infrastructure.Felix, metricName string) []int {
counts := make([]int, 0)
for _, f := range felixes {
ctCount, err := f.PromMetric(metricName).Int()
if err != nil {
log.WithError(err).WithField("felix", f.Name).Warn("Error while probing Felix metric. Skipping this felix")
continue
}
counts = append(counts, ctCount)
}
return counts
}

BeforeEach(func() {
switch testOpts.protocol {
case "udp":
Expand All @@ -1995,11 +2010,24 @@ func describeBPFTests(opts ...bpfTestOpt) bool {
}
log.WithFields(log.Fields{"number": proto, "name": testOpts.protocol}).Info("parsed protocol")

pTCP := numorstring.ProtocolFromString("tcp")
promPinhole := api.Rule{
Action: "Allow",
Protocol: &pTCP,
Destination: api.EntityRule{
Ports: []numorstring.Port{
{MinPort: 9091, MaxPort: 9091},
},
Nets: []string{},
},
}

// Create policy allowing ingress from external client
allowIngressFromExtClient := api.NewGlobalNetworkPolicy()
allowIngressFromExtClient.Namespace = "fv"
allowIngressFromExtClient.Name = "policy-ext-client"
allowIngressFromExtClient.Spec.Ingress = []api.Rule{
promPinhole,
{
Action: "Allow",
Source: api.EntityRule{
Expand Down Expand Up @@ -2078,9 +2106,23 @@ func describeBPFTests(opts ...bpfTestOpt) bool {
})

It("should have connectivity from external client to maglev backend via cluster IP and external IP", func() {
Eventually(func() []int {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think testing this in the scanner/cleaner UT would be enough and more practical.

return probeMaglevConntrackMetric(tc.Felixes, fmt.Sprintf("felix_bpf_conntrack_maglev_entries_total{destination=\"local\",ip_family=\"%d\"}", familyInt))
}, "10s", "1s").Should(BeEquivalentTo([]int{0, 0, 0}), "Expected maglev local-conntrack metric to start at 0")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}, "10s", "1s").Should(BeEquivalentTo([]int{0, 0, 0}), "Expected maglev local-conntrack metric to start at 0")
}, "10s", "1s").Should(Equal([]int{0, 0, 0}), "Expected maglev local-conntrack metric to start at 0")

I don't think you need the extra power of BeEquivalentTo here, so best to use the less surprising matcher.

Eventually(func() []int {
return probeMaglevConntrackMetric(tc.Felixes, fmt.Sprintf("felix_bpf_conntrack_maglev_entries_total{destination=\"remote\",ip_family=\"%d\"}", familyInt))
}, "10s", "1s").Should(BeEquivalentTo([]int{0, 0, 0}), "Expected maglev remote-conntrack metric to start at 0")

cc.ExpectSome(externalClient, TargetIP(clusterIP), port)
cc.ExpectSome(externalClient, TargetIP(externalIP), port)
cc.CheckConnectivity()

Eventually(func() []int {
return probeMaglevConntrackMetric(tc.Felixes, fmt.Sprintf("felix_bpf_conntrack_maglev_entries_total{destination=\"local\",ip_family=\"%d\"}", familyInt))
}, "10s").ShouldNot(BeEquivalentTo([]int{0, 0, 0}), "Expected maglev local-conntrack metric to have increased")
Eventually(func() []int {
return probeMaglevConntrackMetric(tc.Felixes, fmt.Sprintf("felix_bpf_conntrack_maglev_entries_total{destination=\"remote\",ip_family=\"%d\"}", familyInt))
}, "10s", "1s").ShouldNot(BeEquivalentTo([]int{0, 0, 0}), "Expected maglev remote-conntrack metric to have increased")
})

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, it'd be good to test:

  • That the counters increase instead of going negative or something
  • That the ingress node showed a positive remote value
  • That the backend node showed a positive local value.
  • Rest show zero.


testFailover := func(serviceIP string) {
Expand Down