Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions felix/bpf/conntrack/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -27,6 +28,7 @@ import (

"github.com/projectcalico/calico/felix/bpf/conntrack/cleanupv1"
"github.com/projectcalico/calico/felix/bpf/conntrack/timeouts"
v4 "github.com/projectcalico/calico/felix/bpf/conntrack/v4"
"github.com/projectcalico/calico/felix/bpf/maps"
"github.com/projectcalico/calico/felix/cachingmap"
"github.com/projectcalico/calico/felix/jitter"
Expand Down Expand Up @@ -54,6 +56,10 @@ var (
Name: "felix_bpf_conntrack_sweep_duration",
Help: "Conntrack sweep execution time (ns)",
})
// conntrackGaugeMaglevTotal is a gauge vector counting Maglev conntrack entries.
// It is partitioned by two labels: "destination" and "ip_family".
conntrackGaugeMaglevTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "felix_bpf_conntrack_maglev_entries_total",
Help: "Total number of Maglev entries in conntrack table broken down by IP version, and, whether destination backend is remote (we're acting as a frontend) or local (we're the backend node).",
}, []string{"destination", "ip_family"})
dummyKeyV6 = NewKeyV6(0, net.IPv6zero, 0, net.IPv6zero, 0)
dummyKey = NewKey(0, net.IPv4zero, 0, net.IPv4zero, 0)
)
Expand All @@ -64,6 +70,7 @@ func init() {
prometheus.MustRegister(conntrackGaugeCleaned)
prometheus.MustRegister(conntrackCounterCleaned)
prometheus.MustRegister(conntrackGaugeSweepDuration)
prometheus.MustRegister(conntrackGaugeMaglevTotal)
}

// ScanVerdict represents the set of values returned by EntryScan
Expand Down Expand Up @@ -116,6 +123,10 @@ type Scanner struct {
bpfCleaner Cleaner
versionHelper ipVersionHelper
revNATKeyToFwdNATInfo map[KeyInterface]cleanupv1.ValueInterface
ipFamily int

conntrackGaugeMaglevToLocalBackend prometheus.Gauge
conntrackGaugeMaglevToRemoteBackend prometheus.Gauge

wg sync.WaitGroup
stopCh chan struct{}
Expand All @@ -142,6 +153,7 @@ func NewScanner(ctMap maps.Map, kfb func([]byte) KeyInterface, vfb func([]byte)
autoScale: strings.ToLower(autoScalingMode) == "doubleiffull",
configChangedRestartCallback: configChangedRestartCallback,
bpfCleaner: bpfCleaner,
ipFamily: ipVersion,
// revNATKeyToFwdNATInfo stores the opposite direction of the mapping of the cleanup bpf map.
// <reverseNATKey> => <forwardNATKey>:<forwardEntryTimeStamp>:<reverseEntryTimestamp>
revNATKeyToFwdNATInfo: make(map[KeyInterface]cleanupv1.ValueInterface),
Expand All @@ -162,6 +174,19 @@ func NewScanner(ctMap maps.Map, kfb func([]byte) KeyInterface, vfb func([]byte)

}
}

var err error

s.conntrackGaugeMaglevToLocalBackend, err = conntrackGaugeMaglevTotal.GetMetricWithLabelValues("local", strconv.Itoa(s.ipFamily))
if err != nil {
log.WithError(err).Panic("Couldn't init (local) Maglev conntrack metric gauge")
}

s.conntrackGaugeMaglevToRemoteBackend, err = conntrackGaugeMaglevTotal.GetMetricWithLabelValues("remote", strconv.Itoa(s.ipFamily))
if err != nil {
log.WithError(err).Panic("Couldn't get (remote) Maglev conntrack metric gauge")
}

return s
}

Expand Down Expand Up @@ -223,6 +248,7 @@ func (s *Scanner) Scan() {
used := 0
cleaned := 0
numExpired := 0
maglevEntriesToLocal, maglevEntriesToRemote := 0, 0

if s.ctCleanupMap != nil {
s.ctCleanupMap.Desired().DeleteAll()
Expand All @@ -231,6 +257,7 @@ func (s *Scanner) Scan() {
err := s.ctMap.Iter(func(k, v []byte) maps.IteratorAction {
ctKey := s.keyFromBytes(k)
ctVal := s.valueFromBytes(v)
ctFlags := ctVal.Flags()

used++
conntrackCounterCleaned.Inc()
Expand All @@ -242,6 +269,16 @@ func (s *Scanner) Scan() {
}).Debug("Examining conntrack entry")
}

if ctFlags&v4.FlagMaglev != 0 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should move to case ScanVerdictOK, as you probably only care about active connections. As written, this would also count connections that are expired and are going to be removed, or that do not have a backend anymore, etc.

Maybe you want to do it in the cleaner since you may not want to count connections in some FIN-wait state, where they may be for quite a while, but for the purpose of "can I remove this node" they do not matter anymore.

When @sridhartigera makes the change to send RSTs when the backend is no more, this would perhaps need to move again 🤔

Obviously, the stats will be fixed up in follow-up iterations once the connection is deleted.

if ctFlags&v4.FlagExtLocal != 0 {
log.Debug("Conntrack is local maglev connection. Incrementing maglev entries counter")
maglevEntriesToLocal++
} else if ctFlags&v4.FlagNATNPFwd != 0 {
log.Debug("Conntrack is remote maglev connection. Incrementing maglev entries counter")
maglevEntriesToRemote++
}
}

for _, scanner := range s.scanners {
verdict, ts := scanner.Check(ctKey, ctVal, s.get)
switch verdict {
Expand Down Expand Up @@ -319,6 +356,11 @@ func (s *Scanner) Scan() {
// Run the bpf cleaner to process the remaining entries in the cleanup map.
cleaned += s.runBPFCleaner()

log.WithField("value", maglevEntriesToLocal).Debug("Setting local maglev conntrack entries gauge")
s.conntrackGaugeMaglevToLocalBackend.Set(float64(maglevEntriesToLocal))
log.WithField("value", maglevEntriesToRemote).Debug("Setting remote maglev conntrack entries gauge")
s.conntrackGaugeMaglevToRemoteBackend.Set(float64(maglevEntriesToRemote))

conntrackCounterSweeps.Inc()
conntrackGaugeUsed.Set(float64(used))
conntrackGaugeCleaned.Set(float64(cleaned))
Expand Down
62 changes: 59 additions & 3 deletions felix/fv/bpf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,8 @@ func describeBPFTests(opts ...bpfTestOpt) bool {
options.ExtraEnvVars["FELIX_BPFLogLevel"] = fmt.Sprint(testOpts.bpfLogLevel)
options.ExtraEnvVars["FELIX_BPFConntrackLogLevel"] = fmt.Sprint(testOpts.bpfLogLevel)
options.ExtraEnvVars["FELIX_BPFProfiling"] = "Enabled"
options.ExtraEnvVars["FELIX_PrometheusMetricsEnabled"] = "true"
options.ExtraEnvVars["FELIX_PrometheusMetricsHost"] = "0.0.0.0"
if testOpts.dsr {
options.ExtraEnvVars["FELIX_BPFExternalServiceMode"] = "dsr"
}
Expand Down Expand Up @@ -1906,6 +1908,10 @@ func describeBPFTests(opts ...bpfTestOpt) bool {
familyInt = 6
}

felixWithMaglevBackend := 0
initialIngressFelix := 1
failoverIngressFelix := 2

newConntrackKey := func(srcIP net.IP, srcPort uint16, dstIP net.IP, dstPort uint16, family string) conntrack.KeyInterface {
var key conntrack.KeyInterface
// cmp := bytes.Compare(srcIP, dstIP)
Expand Down Expand Up @@ -1982,6 +1988,19 @@ func describeBPFTests(opts ...bpfTestOpt) bool {
}
}

// probeMaglevConntrackMetric reads the Prometheus metric metricName from each of
// the given Felixes and returns the integer values in the order the Felixes were
// passed. A Felix whose metric cannot be read is logged and skipped, so the
// result may be shorter than the input; the result is never nil.
probeMaglevConntrackMetric := func(metricName string, felixes ...*infrastructure.Felix) []int {
	results := []int{}
	for _, felix := range felixes {
		value, err := felix.PromMetric(metricName).Int()
		if err == nil {
			results = append(results, value)
			continue
		}
		// Best-effort probe: callers poll this function, so a transient
		// failure only needs to be reported, not treated as fatal.
		log.WithError(err).WithField("felix", felix.Name).Warn("Error while probing Felix metric. Skipping this felix")
	}
	return results
}

BeforeEach(func() {
switch testOpts.protocol {
case "udp":
Expand All @@ -1995,11 +2014,24 @@ func describeBPFTests(opts ...bpfTestOpt) bool {
}
log.WithFields(log.Fields{"number": proto, "name": testOpts.protocol}).Info("parsed protocol")

pTCP := numorstring.ProtocolFromString("tcp")
promPinhole := api.Rule{
Action: "Allow",
Protocol: &pTCP,
Destination: api.EntityRule{
Ports: []numorstring.Port{
{MinPort: 9091, MaxPort: 9091},
},
Nets: []string{},
},
}

// Create policy allowing ingress from external client
allowIngressFromExtClient := api.NewGlobalNetworkPolicy()
allowIngressFromExtClient.Namespace = "fv"
allowIngressFromExtClient.Name = "policy-ext-client"
allowIngressFromExtClient.Spec.Ingress = []api.Rule{
promPinhole,
{
Action: "Allow",
Source: api.EntityRule{
Expand All @@ -2015,11 +2047,12 @@ func describeBPFTests(opts ...bpfTestOpt) bool {
allowIngressFromExtClient = createPolicy(allowIngressFromExtClient)

// Create service with maglev annotation
testSvc = k8sServiceWithExtIP(testSvcName, clusterIP, w[0][0], 80, tgtPort, 0,
testSvc = k8sServiceWithExtIP(testSvcName, clusterIP, w[felixWithMaglevBackend][0], 80, tgtPort, 0,
testOpts.protocol, []string{externalIP})
testSvc.Annotations = map[string]string{
"lb.projectcalico.org/external-traffic-strategy": "maglev",
}

testSvcNamespace = testSvc.Namespace
_, err := k8sClient.CoreV1().Services(testSvcNamespace).Create(context.Background(), testSvc, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -2071,16 +2104,39 @@ func describeBPFTests(opts ...bpfTestOpt) bool {
cmdCleanRt = append(ipRoute, "route", "del", externalIP)
_ = externalClient.ExecMayFail(strings.Join(cmdCleanRt, ""))

cmdCIP := append(ipRoute, "route", "add", clusterIP, "via", felixIP(1))
cmdCIP := append(ipRoute, "route", "add", clusterIP, "via", felixIP(initialIngressFelix))
externalClient.Exec(cmdCIP...)
cmdEIP := append(ipRoute, "route", "add", externalIP, "via", felixIP(1))
cmdEIP := append(ipRoute, "route", "add", externalIP, "via", felixIP(initialIngressFelix))
externalClient.Exec(cmdEIP...)
})

It("should have connectivity from external client to maglev backend via cluster IP and external IP", func() {
probeMaglevLocalConntrackMetricFunc := func(felixes ...*infrastructure.Felix) func() []int {
return func() []int {
return probeMaglevConntrackMetric(fmt.Sprintf("felix_bpf_conntrack_maglev_entries_total{destination=\"local\",ip_family=\"%d\"}", familyInt), felixes...)
}
}
probeMaglevRemoteConntrackMetricFunc := func(felixes ...*infrastructure.Felix) func() []int {
return func() []int {
return probeMaglevConntrackMetric(fmt.Sprintf("felix_bpf_conntrack_maglev_entries_total{destination=\"remote\",ip_family=\"%d\"}", familyInt), felixes...)
}
}

Eventually(probeMaglevLocalConntrackMetricFunc(tc.Felixes...), "10s", "1s").Should(Equal([]int{0, 0, 0}), "Expected maglev local-conntrack metric to start at 0 for all Felixes")
Eventually(probeMaglevRemoteConntrackMetricFunc(tc.Felixes...), "10s", "1s").Should(Equal([]int{0, 0, 0}), "Expected maglev remote-conntrack metric to start at 0 for all Felixes")

cc.ExpectSome(externalClient, TargetIP(clusterIP), port)
cc.ExpectSome(externalClient, TargetIP(externalIP), port)
cc.CheckConnectivity()

// There is a 10-second interval between iterations of Felix's conntrack scanner (where we export the maglev conntrack metrics).
// This means we must be very pessimistic about timeouts when searching for the prom values we're after.
Eventually(probeMaglevRemoteConntrackMetricFunc(tc.Felixes[initialIngressFelix]), "12s", "1s").Should(Equal([]int{2}), "Expected maglev-ingress felix to increment the remote-conntracks metric")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 2 guaranteed to be correct? I guess that, even if there was a retry, Maglev would choose the same backend and get the same conntrack entry?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe two is guaranteed, yep. One for cluster IP, one for external IP.

Eventually(probeMaglevLocalConntrackMetricFunc(tc.Felixes[felixWithMaglevBackend]), "12s", "1s").Should(Equal([]int{2}), "Expected felix with maglev backend to increment the local-conntracks metric")
Consistently(probeMaglevLocalConntrackMetricFunc(tc.Felixes[initialIngressFelix])).Should(Equal([]int{0}), "Expected ingress-felix to only have remote maglev conntracks, but saw metric for local maglev conntracks go up")
Consistently(probeMaglevRemoteConntrackMetricFunc(tc.Felixes[felixWithMaglevBackend])).Should(Equal([]int{0}), "Expected backing felix to only have local maglev conntracks, but saw metric for remote maglev conntracks go up")
Consistently(probeMaglevLocalConntrackMetricFunc(tc.Felixes[failoverIngressFelix])).Should(Equal([]int{0}), "No failover occurred, but an unrelated Felix's local maglev prom metrics went up")
Consistently(probeMaglevRemoteConntrackMetricFunc(tc.Felixes[failoverIngressFelix])).Should(Equal([]int{0}), "No failover occurred, but an unrelated Felix's remote maglev prom metrics went up")
})

testFailover := func(serviceIP string) {
Expand Down