diff --git a/felix/bpf-gpl/Makefile b/felix/bpf-gpl/Makefile index d6c0df1c4e8..1ec6a0c0462 100644 --- a/felix/bpf-gpl/Makefile +++ b/felix/bpf-gpl/Makefile @@ -1,5 +1,5 @@ # Project Calico BPF dataplane build scripts. -# Copyright (c) 2020-2022 Tigera, Inc. All rights reserved. +# Copyright (c) 2020-2026 Tigera, Inc. All rights reserved. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later # Disable implicit rules. @@ -58,6 +58,7 @@ LD := llc UT_C_FILES:=$(shell find ut -name '*.c') UT_OBJS:=$(UT_C_FILES:.c=.o) $(shell ./list-ut-objs) UT_OBJS+=ut/ip_parse_test_v6.o +UT_OBJS+=ut/tcp_rst_v6.o XDP_MAP_HEADERS := jump.h COMMON_MAP_HEADERS := counters.h ifstate.h perf_types.h profiling.h rule_counters.h qos.h ctlb_map.h $(XDP_MAP_HEADERS) @@ -220,6 +221,10 @@ ut/ip_parse_test_v6.ll: ut/ip_parse_test.c $(CC) $(UT_CFLAGS) $(CFLAGS) -DIPVER6 -c $< -o $@ ut/ip_parse_test_v6.o: ut/ip_parse_test_v6.ll $(LINK) +ut/tcp_rst_v6.ll: ut/tcp_rst.c + $(CC) $(UT_CFLAGS) $(CFLAGS) -DIPVER6 -c $< -o $@ +ut/tcp_rst_v6.o: ut/tcp_rst_v6.ll + $(LINK) %_v4.ll: %.c %.d calculate-flags $(COMPILE) diff --git a/felix/bpf-gpl/conntrack_types.h b/felix/bpf-gpl/conntrack_types.h index b82924d2896..13027b66860 100644 --- a/felix/bpf-gpl/conntrack_types.h +++ b/felix/bpf-gpl/conntrack_types.h @@ -41,6 +41,7 @@ enum cali_ct_type { #define CALI_CT_FLAG_SKIP_REDIR_PEER 0x4000 /* marks connections from a client which is excluded from redir */ #define CALI_CT_FLAG_SET_DSCP 0x8000 /* marks connections that needs to set DSCP */ #define CALI_CT_FLAG_MAGLEV 0X10000 /* marks Maglev connections. Allows packets of an existing to arrive via a different tunnel after failover. */ +#define CALI_CT_FLAG_SEND_RESET 0x20000 /* marks connections where we should send a TCP RST on behalf of the workload */ struct calico_ct_leg { __u64 bytes; diff --git a/felix/bpf-gpl/jump.h b/felix/bpf-gpl/jump.h index c6926f7edfc..ac7ffb2c44b 100644 --- a/felix/bpf-gpl/jump.h +++ b/felix/bpf-gpl/jump.h @@ -96,6 +96,7 @@ enum cali_jump_index { PROG_INDEX_NEW_FLOW, PROG_INDEX_IP_FRAG, PROG_INDEX_MAGLEV, + PROG_INDEX_TCP_RST, PROG_INDEX_MAIN_DEBUG, PROG_INDEX_POLICY_DEBUG, @@ -107,6 +108,7 @@ enum cali_jump_index { PROG_INDEX_NEW_FLOW_DEBUG, PROG_INDEX_IP_FRAG_DEBUG, PROG_INDEX_MAGLEV_DEBUG, + PROG_INDEX_TCP_RST_DEBUG, }; #if CALI_F_XDP diff --git a/felix/bpf-gpl/tc.c b/felix/bpf-gpl/tc.c index e43ddfb0d95..d4d8ef74cf7 100644 --- a/felix/bpf-gpl/tc.c +++ b/felix/bpf-gpl/tc.c @@ -1,5 +1,5 @@ // Project Calico BPF dataplane programs. -// Copyright (c) 2020-2025 Tigera, Inc. All rights reserved. +// Copyright (c) 2020-2026 Tigera, Inc. All rights reserved. // SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later #include @@ -60,6 +60,9 @@ #ifndef IPVER6 #include "ip_v4_fragment.h" +#include "tcp4.h" +#else +#include "tcp6.h" #endif #define HAS_HOST_CONFLICT_PROG CALI_F_TO_HEP @@ -446,6 +449,13 @@ static CALI_BPF_INLINE void calico_tc_process_ct_lookup(struct cali_tc_ctx *ctx) ctx->state->flags |= CALI_ST_SKIP_FIB; } CALI_DEBUG("CT Hit"); + /* Check for TCP RST injection */ + if (CALI_F_TO_HOST && ctx->state->ct_result.flags & CALI_CT_FLAG_SEND_RESET) { + CALI_DEBUG("Sending TCP RST due to CT state"); + ctx->state->ct_result.ifindex_fwd = CT_INVALID_IFINDEX; + CALI_JUMP_TO(ctx, PROG_INDEX_TCP_RST); + goto deny; + } if (ctx->state->ip_proto == IPPROTO_TCP && ct_result_is_syn(ctx->state->ct_result.rc)) { CALI_DEBUG("Forcing policy on SYN"); @@ -1937,6 +1947,39 @@ int calico_tc_skb_icmp_inner_nat(struct __sk_buff *skb) return TC_ACT_SHOT; } +SEC("tc") +int calico_tc_skb_send_tcp_rst(struct __sk_buff *skb) +{ + /* Initialise the context, which is stored on the stack, and the state, which + * we use to pass data from one program to the next via tail calls. */ + DECLARE_TC_CTX(_ctx, + .skb = skb, + ); + struct cali_tc_ctx *ctx = &_ctx; + int ret = 0; +#ifndef IPVER6 + ret = tcp_v4_rst(ctx); +#else + ret = tcp_v6_rst(ctx); +#endif + + CALI_DEBUG("Entering calico_tc_skb_send_tcp_rst"); + if (ret) { + ctx->state->fwd.res = TC_ACT_SHOT; + } else { + fwd_fib_set(&ctx->state->fwd, true); + } + + if (skb_refresh_validate_ptrs(ctx, TCP_SIZE)) { + deny_reason(ctx, CALI_REASON_SHORT); + CALI_DEBUG("Too short"); + return TC_ACT_SHOT; + } + + tc_state_fill_from_iphdr(ctx); + return forward_or_drop(ctx); +} + SEC("tc") int calico_tc_skb_send_icmp_replies(struct __sk_buff *skb) diff --git a/felix/bpf-gpl/tcp4.h b/felix/bpf-gpl/tcp4.h new file mode 100644 index 00000000000..6b954c54762 --- /dev/null +++ b/felix/bpf-gpl/tcp4.h @@ -0,0 +1,84 @@ +// Project Calico BPF dataplane programs. +// Copyright (c) 2020-2026 Tigera, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later + +#ifndef __CALI_TCP4_H__ +#define __CALI_TCP4_H__ + +#include +#include + +#include "bpf.h" +#include "log.h" +#include "skb.h" + +static CALI_BPF_INLINE int tcp_v4_rst(struct cali_tc_ctx *ctx) { + if (skb_refresh_validate_ptrs(ctx, TCP_SIZE)) { + deny_reason(ctx, CALI_REASON_SHORT); + CALI_DEBUG("TCP reset : too short"); + return -1; + } + struct iphdr ip_orig = *ip_hdr(ctx); + struct tcphdr th_orig = *tcp_hdr(ctx); + int original_len = ctx->skb->len; + + /* Trim to minimum size */ + __u32 len = skb_iphdr_offset(ctx) + IP_SIZE + TCP_SIZE /* max IP len */; + int err = bpf_skb_change_tail(ctx->skb, len, 0); + if (err) { + CALI_DEBUG("tcp reset reply: bpf_skb_change_tail (len=%d) failed (err=%d)", len, err); + return -1; + } + + /* Revalidate all pointers */ + if (skb_refresh_validate_ptrs(ctx, TCP_SIZE)) { + deny_reason(ctx, CALI_REASON_SHORT); + CALI_DEBUG("TCP reset : too short"); + return -1; + } + ip_hdr(ctx)->version = 4; + ip_hdr(ctx)->ihl = 5; + ip_hdr(ctx)->tos = 0; + ip_hdr(ctx)->ttl = 64; + ip_hdr(ctx)->protocol = IPPROTO_TCP; + ip_hdr(ctx)->saddr = ip_orig.daddr; + ip_hdr(ctx)->daddr = ip_orig.saddr; + ip_hdr(ctx)->check = 0; + ip_hdr(ctx)->tot_len = bpf_htons(len - (CALI_F_L3_DEV ? 0 : ETH_SIZE)); + ctx->ipheader_len = 20; + + struct tcphdr *th = ((void *)ip_hdr(ctx)) + IP_SIZE; + __builtin_memset(th, 0, sizeof(struct tcphdr)); + th->source = th_orig.dest; + th->dest = th_orig.source; + th->rst = 1; + th->doff = sizeof(struct tcphdr) / 4; + th->seq = 0; + + if (th_orig.ack) { + th->seq = th_orig.ack_seq; + } else { + th->ack_seq = bpf_htonl(bpf_ntohl(th_orig.seq) + th_orig.syn + th_orig.fin + + original_len - (th_orig.doff << 2)); + th->ack = 1; + } + th->check = 0; + + __wsum ip_csum = bpf_csum_diff(0, 0, ctx->ip_header, sizeof(struct iphdr), 0); + __wsum tcp_csum = bpf_csum_diff(0, 0, (__u32 *)th, len - sizeof(struct iphdr) - skb_iphdr_offset(ctx), 0); + if (bpf_l3_csum_replace(ctx->skb, + skb_iphdr_offset(ctx) + offsetof(struct iphdr, check), 0, ip_csum, 0)) { + CALI_DEBUG("TCP reset v4 reply: set ip csum failed"); + return -1; + } + + err = bpf_l4_csum_replace(ctx->skb, skb_l4hdr_offset(ctx) + + offsetof(struct tcphdr, check), 0, tcp_csum, BPF_F_PSEUDO_HDR); + if (err) { + CALI_DEBUG("TCP reset v4 reply: set tcp csum failed %d", err); + return -1; + } + return 0; +} + +#endif /* __CALI_TCP4_H__ */ diff --git a/felix/bpf-gpl/tcp6.h b/felix/bpf-gpl/tcp6.h new file mode 100644 index 00000000000..9c41e65fd61 --- /dev/null +++ b/felix/bpf-gpl/tcp6.h @@ -0,0 +1,76 @@ +// Project Calico BPF dataplane programs. +// Copyright (c) 2020-2026 Tigera, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later + +#ifndef __CALI_TCP6_H__ +#define __CALI_TCP6_H__ + +#include +#include + +#include "bpf.h" +#include "log.h" +#include "skb.h" + +static CALI_BPF_INLINE int tcp_v6_rst(struct cali_tc_ctx *ctx) { + if (skb_refresh_validate_ptrs(ctx, TCP_SIZE)) { + deny_reason(ctx, CALI_REASON_SHORT); + CALI_DEBUG("TCP reset : too short"); + return -1; + } + + ipv6_addr_t orig_src, orig_dst; + ipv6hdr_ip_to_ipv6_addr_t(&orig_src, &ip_hdr(ctx)->saddr); + ipv6hdr_ip_to_ipv6_addr_t(&orig_dst, &ip_hdr(ctx)->daddr); + struct tcphdr th_orig = *tcp_hdr(ctx); + int original_len = ctx->skb->len; + + /* Trim to minimum size */ + __u32 len = skb_iphdr_offset(ctx) + IP_SIZE + TCP_SIZE /* max IP len */; + int err = bpf_skb_change_tail(ctx->skb, len, 0); + if (err) { + CALI_DEBUG("tcp reset reply: bpf_skb_change_tail (len=%d) failed (err=%d)", len, err); + return -1; + } + + /* Revalidate all pointers */ + if (skb_refresh_validate_ptrs(ctx, TCP_SIZE)) { + deny_reason(ctx, CALI_REASON_SHORT); + CALI_DEBUG("TCP reset : too short"); + return -1; + } + ip_hdr(ctx)->version = 6; + ip_hdr(ctx)->hop_limit = 255; + ip_hdr(ctx)->nexthdr = IPPROTO_TCP; + ipv6_addr_t_to_ipv6hdr_ip(&ip_hdr(ctx)->daddr, &orig_src); + ipv6_addr_t_to_ipv6hdr_ip(&ip_hdr(ctx)->saddr, &orig_dst); + ip_hdr(ctx)->payload_len = bpf_htons(TCP_SIZE); + ctx->ipheader_len = IP_SIZE; + + struct tcphdr *th = ((void *)ip_hdr(ctx)) + IP_SIZE; + __builtin_memset(th, 0, TCP_SIZE); + th->source = th_orig.dest; + th->dest = th_orig.source; + th->rst = 1; + th->doff = sizeof(struct tcphdr) / 4; + th->seq = 0; + + if (th_orig.ack) { + th->seq = th_orig.ack_seq; + } else { + th->ack_seq = bpf_htonl(bpf_ntohl(th_orig.seq) + th_orig.syn + th_orig.fin + + original_len - (th_orig.doff << 2)); + th->ack = 1; + } + th->check = 0; + + __wsum tcp_csum = bpf_csum_diff(0, 0, (__u32 *)th, len - sizeof(struct ipv6hdr) - skb_iphdr_offset(ctx), 0); + if (bpf_l4_csum_replace(ctx->skb, skb_l4hdr_offset(ctx) + + offsetof(struct tcphdr, check), 0, tcp_csum, BPF_F_PSEUDO_HDR)) { + CALI_DEBUG("TCP reset v6 reply: set tcp csum failed"); + return -1; + } + return 0; +} + +#endif /* __CALI_TCP6_H__ */ diff --git a/felix/bpf-gpl/ut/tcp_rst.c b/felix/bpf-gpl/ut/tcp_rst.c new file mode 100644 index 00000000000..b4ef08c1496 --- /dev/null +++ b/felix/bpf-gpl/ut/tcp_rst.c @@ -0,0 +1,69 @@ +// Project Calico BPF dataplane programs. +// Copyright (c) 2020-2026 Tigera, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later + +#include "ut.h" +#include "bpf.h" +#include "nat.h" +#ifndef IPVER6 +#include "tcp4.h" +#else +#include "tcp6.h" +#endif +#include "parsing.h" +#include "jump.h" + +const volatile struct cali_tc_preamble_globals __globals; + +static CALI_BPF_INLINE int calico_unittest_entry (struct __sk_buff *skb) +{ + volatile struct cali_tc_globals *globals = state_get_globals_tc(); + + if (!globals) { + return TC_ACT_SHOT; + } + + /* Set the globals for the rest of the prog chain. */ +#ifndef IPVER6 + globals->data = __globals.v4; +#else + globals->data = __globals.v6; +#endif + DECLARE_TC_CTX(_ctx, + .skb = skb, + .ipheader_len = IP_SIZE, + ); + struct cali_tc_ctx *ctx = &_ctx; + if (!ctx->counters) { + CALI_DEBUG("Counters map lookup failed: DROP\n"); + return TC_ACT_SHOT; + } + int ret = PARSING_OK; +#ifdef IPVER6 + ret = PARSING_OK_V6; +#endif + if (parse_packet_ip(ctx) != ret) { + return TC_ACT_UNSPEC; + } + + tc_state_fill_from_iphdr(ctx); + + switch (tc_state_fill_from_nexthdr(ctx, true)) { + case PARSING_ERROR: + goto deny; + case PARSING_ALLOW_WITHOUT_ENFORCING_POLICY: + goto allow; + } +#ifndef IPVER6 + return tcp_v4_rst(ctx); +#else + return tcp_v6_rst(ctx); +#endif + +allow: + return TC_ACT_UNSPEC; + +deny: + return TC_ACT_SHOT; +} + diff --git a/felix/bpf/conntrack/cleanup.go b/felix/bpf/conntrack/cleanup.go index 28d6b0913de..2bab5934793 100644 --- a/felix/bpf/conntrack/cleanup.go +++ b/felix/bpf/conntrack/cleanup.go @@ -16,6 +16,7 @@ package conntrack import ( "net" + "sync" "time" "github.com/prometheus/client_golang/prometheus" @@ -26,6 +27,7 @@ import ( v4 "github.com/projectcalico/calico/felix/bpf/conntrack/v4" "github.com/projectcalico/calico/felix/bpf/maps" "github.com/projectcalico/calico/felix/timeshim" + "github.com/projectcalico/calico/libcalico-go/lib/set" ) var ( @@ -54,6 +56,77 @@ func init() { prometheus.MustRegister(conntrackCounterStaleNAT) } +type WorkloadRemoveScannerTCP struct { + mutex sync.Mutex + ips set.Set[string] + removedIPs set.Set[string] + ipCh chan string + done chan struct{} +} + +func NewWorkloadRemoveScannerTCP(ipCh chan string) *WorkloadRemoveScannerTCP { + wrs := &WorkloadRemoveScannerTCP{ + ips: set.New[string](), + removedIPs: set.New[string](), + ipCh: ipCh, + done: make(chan struct{}), + } + go wrs.run() + return wrs +} + +func (w *WorkloadRemoveScannerTCP) run() { + defer close(w.done) + for { + ip, ok := <-w.ipCh + if !ok { + return + } + w.mutex.Lock() + w.ips.Add(ip) + log.Debugf("WorkloadRemoveScanner added %s to IPs to check", ip) + w.mutex.Unlock() + } +} + +func (w *WorkloadRemoveScannerTCP) IterationStart() { + w.mutex.Lock() + defer w.mutex.Unlock() + w.removedIPs = w.ips.Copy() + w.ips = set.New[string]() +} + +// IterationEnd satisfies EntryScannerSynced +func (w *WorkloadRemoveScannerTCP) IterationEnd() { + w.mutex.Lock() + w.removedIPs = set.New[string]() + w.mutex.Unlock() +} + +func (w *WorkloadRemoveScannerTCP) Stop() { + close(w.ipCh) // This breaks the 'for range' loop in run() + <-w.done // Wait for the goroutine to actually finish +} + +func (w *WorkloadRemoveScannerTCP) Check(ctKey KeyInterface, ctVal ValueInterface, get EntryGet) (ScanVerdict, int64) { + srcIP := ctKey.AddrA().String() + dstIP := ctKey.AddrB().String() + if ctKey.Proto() == ProtoTCP && + (w.removedIPs.Contains(srcIP) || w.removedIPs.Contains(dstIP)) { + log.WithField("key", ctKey).Debug("Marking conntrack entry for sending RST due to workload removal") + // We found a conntrack entry that has the workload IP as source or destination. + // Mark it for RST sending. + return ScanVerdictSendRST, ctVal.LastSeen() + } + return ScanVerdictOK, ctVal.LastSeen() +} + +func (w *WorkloadRemoveScannerTCP) NumIPsPending() int { + w.mutex.Lock() + defer w.mutex.Unlock() + return w.ips.Len() +} + type LivenessScanner struct { timeouts timeouts.Timeouts dsr bool @@ -215,17 +288,21 @@ func NewStaleNATScanner(frontendHasBackend NATChecker) *StaleNATScanner { func (sns *StaleNATScanner) Check(k KeyInterface, v ValueInterface, get EntryGet) (ScanVerdict, int64) { debug := log.GetLevel() >= log.DebugLevel + lastSeen := v.LastSeen() + if k.Proto() == ProtoTCP { + // we do not handle TCP for the below reasons. + // sns.natChecker.ConntrackFrontendHasBackend returns false if the service gets deleted + // or there are no backends. For TCP, we can still let the connection flow even if the service + // gets deleted as long as the backend is alive. When the backend is removed, We add a flag to the + // conntrack entry to send a RST, so the connection will be closed properly. + return ScanVerdictOK, lastSeen + } + again: - lastSeen := v.LastSeen() switch v.Type() { case TypeNormal: proto := k.Proto() - if proto != ProtoUDP { - // skip non-NAT entry - break - } - // Check if we have an entry to a service IP:port without it being // NATed. Remove such entry as it was created when the service wasn't // programmed yet and there was a NAT miss. diff --git a/felix/bpf/conntrack/conntrack_test.go b/felix/bpf/conntrack/conntrack_test.go index 7f4f33b50e4..90aae9ea0ef 100644 --- a/felix/bpf/conntrack/conntrack_test.go +++ b/felix/bpf/conntrack/conntrack_test.go @@ -28,6 +28,7 @@ import ( "github.com/projectcalico/calico/felix/bpf/conntrack/timeouts" v2 "github.com/projectcalico/calico/felix/bpf/conntrack/v2" v3 "github.com/projectcalico/calico/felix/bpf/conntrack/v3" + v4 "github.com/projectcalico/calico/felix/bpf/conntrack/v4" "github.com/projectcalico/calico/felix/bpf/maps" "github.com/projectcalico/calico/felix/bpf/mock" "github.com/projectcalico/calico/felix/timeshim/mocktime" @@ -97,6 +98,61 @@ var _ = Describe("BPF Conntrack LivenessCalculator", func() { ) }) +var _ = Describe("BPF workload remove conntrack scanner", func() { + var wrs *conntrack.WorkloadRemoveScannerTCP + var scanner *conntrack.Scanner + var ctMap, ctCleanupMap *mock.Map + var ipCh chan string + BeforeEach(func() { + ctMap = mock.NewMockMap(conntrack.MapParams) + ctCleanupMap = mock.NewMockMap(conntrack.MapParamsCleanup) + ipCh = make(chan string, 10) + wrs = conntrack.NewWorkloadRemoveScannerTCP(ipCh) + scanner = conntrack.NewScanner(ctMap, conntrack.KeyFromBytes, conntrack.ValueFromBytes, nil, "Disabled", + ctCleanupMap, 4, mock.NewMockBPFCleaner(ctMap, ctCleanupMap), wrs) + }) + It("should mark conntrack entries for removed workloads", func() { + // Insert an entry for a workload IP. + for i := 0; i < 15; i++ { + octetA := byte(1 + (i * 2)) + octetB := byte(2 + (i * 2)) + ipA := net.IPv4(10, 0, 0, octetA) + ipB := net.IPv4(10, 0, 0, octetB) + k := conntrack.NewKey(6, ipA, 1234, ipB, 80) + v := conntrack.NewValueNormal(mocktime.StartKTime, 0, conntrack.Leg{SynSeen: true, AckSeen: true}, conntrack.Leg{SynSeen: true, AckSeen: true}) + err := ctMap.Update(k.AsBytes(), v.AsBytes()) + Expect(err).NotTo(HaveOccurred()) + } + for i := 0; i < 6; i++ { + octetA := byte(1 + (i * 2)) + octetB := byte(14 + (i * 2)) + ipA := net.IPv4(10, 0, 0, octetA) + ipB := net.IPv4(10, 0, 0, octetB) + ipCh <- ipA.String() + ipCh <- ipB.String() + } + Eventually(func() int { return wrs.NumIPsPending() }, "2s", "50ms").Should(Equal(12)) + scanner.Scan() // No IPs removed yet, so no deletions. + for i := 0; i < 15; i++ { + octetA := byte(1 + (i * 2)) + octetB := byte(2 + (i * 2)) + ipA := net.IPv4(10, 0, 0, octetA) + ipB := net.IPv4(10, 0, 0, octetB) + k := conntrack.NewKey(6, ipA, 1234, ipB, 80) + val, err := ctMap.Get(k.AsBytes()) + v := conntrack.ValueFromBytes(val) + Expect(err).NotTo(HaveOccurred(), "expected entry for workload IP to still exist") + if i < 12 { + // These workload IPs were removed, so the entry should be marked for RST + Expect(v.Flags()&v4.FlagSendRST).To(Equal(v4.FlagSendRST), "expected entry for removed workload IP to be marked for RST") + } else { + // These workload IPs were not removed, so the entry should be unmodified. + Expect(v.Flags()&v4.FlagSendRST).To(Equal(uint32(0)), "expected entry for existing workload IP to be unmodified") + } + } + }) +}) + type dummyNATChecker struct { check func(fIP net.IP, fPort uint16, bIP net.IP, bPort uint16, proto uint8) bool } diff --git a/felix/bpf/conntrack/scanner.go b/felix/bpf/conntrack/scanner.go index c0d518eaac9..bddda2c3273 100644 --- a/felix/bpf/conntrack/scanner.go +++ b/felix/bpf/conntrack/scanner.go @@ -82,9 +82,11 @@ const ( // ScanVerdictDelete means entry should be deleted ScanVerdictDelete ScanVerdictDeleteImmediate // Delete without adding to cleanup map + ScanVerdictSendRST // Send RST for TCP connections. ) const cleanupBatchSize int = 1000 +const rstBatchSize int = 10 // EntryGet is a function prototype provided to EntryScanner in case it needs to // evaluate other entries to make a verdict @@ -159,16 +161,23 @@ func NewScanner(ctMap maps.Map, kfb func([]byte) KeyInterface, vfb func([]byte) revNATKeyToFwdNATInfo: make(map[KeyInterface]cleanupv1.ValueInterface), } + switch ipVersion { + case 4: + s.versionHelper = ipv4Helper{} + case 6: + s.versionHelper = ipv6Helper{} + default: + return nil + } + if bpfCleaner != nil { switch ipVersion { case 4: s.ctCleanupMap = cachingmap.New[KeyInterface, cleanupv1.ValueInterface](ctCleanupMap.GetName(), maps.NewTypedMap[KeyInterface, cleanupv1.ValueInterface](ctCleanupMap, kfb, CleanupValueFromBytes)) - s.versionHelper = ipv4Helper{} case 6: s.ctCleanupMap = cachingmap.New[KeyInterface, cleanupv1.ValueInterface](ctCleanupMap.GetName(), maps.NewTypedMap[KeyInterface, cleanupv1.ValueInterface](ctCleanupMap, kfb, CleanupValueV6FromBytes)) - s.versionHelper = ipv6Helper{} default: return nil @@ -250,6 +259,10 @@ func (s *Scanner) Scan() { numExpired := 0 maglevEntriesToLocal, maglevEntriesToRemote := 0, 0 + batchK := make([][]byte, 0, rstBatchSize) + batchV := make([][]byte, 0, rstBatchSize) + rstCount := 0 + if s.ctCleanupMap != nil { s.ctCleanupMap.Desired().DeleteAll() } @@ -288,6 +301,35 @@ func (s *Scanner) Scan() { case ScanVerdictDelete, ScanVerdictDeleteImmediate: // Entry should be deleted. numExpired++ + case ScanVerdictSendRST: + if ctVal.Flags()&v4.FlagSendRST != 0 { + // RST already set, no need to update. + continue + } + updatedVal := ctVal.SetFlags(ctFlags | v4.FlagSendRST) + batchK = append(batchK, ctKey.AsBytes()) + batchV = append(batchV, updatedVal.AsBytes()) + rstCount++ + if rstCount == rstBatchSize { + if debug { + log.Debugf("Updating RST flag on %d conntrack entries in batch.", len(batchK)) + } + applied, err := s.ctMap.BatchUpdate(batchK, batchV, 0) + if err != nil { + applied++ + } + rstCount = rstBatchSize - applied + if rstCount == 0 { + batchK = batchK[:0] + batchV = batchV[:0] + } else { + // Some entries in the batch failed to update. Remove the successfully updated entries from the batch and keep the rest for the next iteration. + batchK = batchK[applied:] + batchV = batchV[applied:] + } + + } + continue } if debug { log.Debug("Deleting conntrack entry.") @@ -356,6 +398,19 @@ func (s *Scanner) Scan() { // Run the bpf cleaner to process the remaining entries in the cleanup map. cleaned += s.runBPFCleaner() + for rstCount > 0 { + applied, err := s.ctMap.BatchUpdate(batchK, batchV, 0) + if err != nil { + applied++ + } + batchK = batchK[applied:] + batchV = batchV[applied:] + rstCount -= applied + } + + batchK = nil + batchV = nil + log.WithField("value", maglevEntriesToLocal).Debug("Setting local maglev conntrack entries gauge") s.conntrackGaugeMaglevToLocalBackend.Set(float64(maglevEntriesToLocal)) log.WithField("value", maglevEntriesToRemote).Debug("Setting remote maglev conntrack entries gauge") diff --git a/felix/bpf/conntrack/v4/map.go b/felix/bpf/conntrack/v4/map.go index 2fbf6c9396c..cdba9697f46 100644 --- a/felix/bpf/conntrack/v4/map.go +++ b/felix/bpf/conntrack/v4/map.go @@ -159,6 +159,7 @@ type ValueInterface interface { Data() EntryData IsForwardDSR() bool String() string + SetFlags(flags uint32) ValueInterface } func (e Value) RSTSeen() int64 { @@ -203,6 +204,15 @@ func (e Value) OrigSrcIP() net.IP { return e[VoOrigSIP : VoOrigSIP+4] } +// SetFlags sets the flags in the value, replacing any existing flags +func (e Value) SetFlags(flags uint32) ValueInterface { + e[VoFlags] = byte(flags & 0xff) + e[VoFlags2] = byte((flags >> 8) & 0xff) + e[VoFlags3] = byte((flags >> 16) & 0xff) + e[VoFlags4] = byte((flags >> 24) & 0xff) + return e +} + const ( TypeNormal uint8 = iota TypeNATForward @@ -225,6 +235,7 @@ const ( FlagNoRedirPeer uint32 = (1 << 14) FlagSetDSCP uint32 = (1 << 15) FlagMaglev uint32 = (1 << 16) + FlagSendRST uint32 = (1 << 17) ) func (e Value) ReverseNATKey() KeyInterface { diff --git a/felix/bpf/conntrack/v4/map6.go b/felix/bpf/conntrack/v4/map6.go index 2cae9d42867..3b415867f67 100644 --- a/felix/bpf/conntrack/v4/map6.go +++ b/felix/bpf/conntrack/v4/map6.go @@ -175,6 +175,15 @@ func (e ValueV6) OrigSrcIP() net.IP { return e[VoOrigSIPV6 : VoOrigSIPV6+16] } +// SetFlags sets the flags in the value, replacing any existing flags +func (e ValueV6) SetFlags(flags uint32) ValueInterface { + e[VoFlags] = byte(flags & 0xff) + e[VoFlags2] = byte((flags >> 8) & 0xff) + e[VoFlags3] = byte((flags >> 16) & 0xff) + e[VoFlags4] = byte((flags >> 24) & 0xff) + return e +} + func (e ValueV6) ReverseNATKey() KeyInterface { var ret KeyV6 @@ -209,7 +218,9 @@ func initValueV6(v *ValueV6, lastSeen time.Duration, typ uint8, flags uint32) { binary.LittleEndian.PutUint64(v[VoLastSeenV6:VoLastSeenV6+8], uint64(lastSeen)) v[VoTypeV6] = typ v[VoFlagsV6] = byte(flags & 0xff) - v[VoFlags2] = byte((flags >> 8) & 0xff) + v[VoFlags2V6] = byte((flags >> 8) & 0xff) + v[VoFlags3V6] = byte((flags >> 16) & 0xff) + v[VoFlags4V6] = byte((flags >> 24) & 0xff) } // NewValueV6Normal creates a new ValueV6 of type TypeNormal based on the given parameters @@ -231,7 +242,7 @@ func NewValueV6NATForward(lastSeen time.Duration, flags uint32, revKey KeyV6) Va initValueV6(&v, lastSeen, TypeNATForward, flags) - copy(v[VoRevKeyV6:VoRevKeyV6+KeySize], revKey.AsBytes()) + copy(v[VoRevKeyV6:VoRevKeyV6+KeyV6Size], revKey.AsBytes()) return v } diff --git a/felix/bpf/hook/map.go b/felix/bpf/hook/map.go index 62db273e7cf..002ff39ff04 100644 --- a/felix/bpf/hook/map.go +++ b/felix/bpf/hook/map.go @@ -1,4 +1,4 @@ -// Copyright (c) 2023 Tigera, Inc. All rights reserved. +// Copyright (c) 2023-2026 Tigera, Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -44,6 +44,7 @@ const ( SubProgNewFlow SubProgIPFrag SubProgMaglev + SubProgTCPRst SubProgTCMainDebug SubProgXDPMain = SubProgTCMain @@ -63,6 +64,7 @@ var tcSubProgNames = []string{ "calico_tc_skb_new_flow_entrypoint", "calico_tc_skb_ipv4_frag", "calico_tc_maglev", + "calico_tc_skb_send_tcp_rst", } var xdpSubProgNames = []string{ diff --git a/felix/bpf/maps/maps.go b/felix/bpf/maps/maps.go index a254443051d..5254dd6b417 100644 --- a/felix/bpf/maps/maps.go +++ b/felix/bpf/maps/maps.go @@ -106,6 +106,7 @@ type Map interface { // Size returns the maximun number of entries the table can hold. Size() int + BatchUpdate(ks, vs [][]byte, flags uint64) (int, error) } type MapWithExistsCheck interface { diff --git a/felix/bpf/mock/map.go b/felix/bpf/mock/map.go index 2423df837e4..2b96abc85d1 100644 --- a/felix/bpf/mock/map.go +++ b/felix/bpf/mock/map.go @@ -139,6 +139,19 @@ func (m *Map) UpdateWithFlags(k, v []byte, flags int) error { return m.updateUnlocked(k, v) } +func (m *Map) BatchUpdate(k, v [][]byte, flags uint64) (int, error) { + m.Lock() + defer m.Unlock() + count := 0 + for i := range k { + if err := m.updateUnlocked(k[i], v[i]); err != nil { + return count, err + } + count++ + } + return count, nil +} + func (m *Map) Get(k []byte) ([]byte, error) { m.Lock() defer m.Unlock() @@ -262,6 +275,10 @@ func (*DummyMap) Update(k, v []byte) error { return nil } +func (*DummyMap) BatchUpdate(k, v [][]byte, flags uint64) (int, error) { + return 0, nil +} + func (*DummyMap) Get(k []byte) ([]byte, error) { return nil, unix.ENOENT } diff --git a/felix/bpf/proxy/syncer_test.go b/felix/bpf/proxy/syncer_test.go index d907228d4d6..da24deff3f6 100644 --- a/felix/bpf/proxy/syncer_test.go +++ b/felix/bpf/proxy/syncer_test.go @@ -245,7 +245,7 @@ var _ = Describe("BPF Syncer", func() { }) Expect(err).NotTo(HaveOccurred()) - Expect(cnt).To(Equal(5)) + Expect(cnt).To(Equal(7)) })) udpSvcKey := k8sp.ServicePortName{ @@ -286,7 +286,7 @@ var _ = Describe("BPF Syncer", func() { }) Expect(err).NotTo(HaveOccurred()) - Expect(cnt).To(Equal(4)) + Expect(cnt).To(Equal(6)) })) By("deleting the udp-service backend", makestep(func() { @@ -365,7 +365,7 @@ var _ = Describe("BPF Syncer", func() { }) Expect(err).NotTo(HaveOccurred()) - Expect(cnt).To(Equal(2)) + Expect(cnt).To(Equal(6)) })) By("not programming eps without a service - non reachables", makestep(func() { @@ -1124,7 +1124,7 @@ var _ = Describe("BPF Syncer", func() { }) Expect(err).NotTo(HaveOccurred()) - Expect(cnt).To(Equal(0)) + Expect(cnt).To(Equal(6)) })) By("checking endpointslice terminating status should be included in endpointslice collection for processing", makestep(func() { @@ -1180,7 +1180,7 @@ var _ = Describe("BPF Syncer", func() { // Expect 6x new conntrack entries from 3x pods NAT forward and 3x pods NAT reverse total. Expect(err).NotTo(HaveOccurred()) - Expect(cnt).To(Equal(6)) + Expect(cnt).To(Equal(12)) })) }) diff --git a/felix/bpf/tc/defs/defs.go b/felix/bpf/tc/defs/defs.go index 5801fef34ea..758cc87a75f 100644 --- a/felix/bpf/tc/defs/defs.go +++ b/felix/bpf/tc/defs/defs.go @@ -1,4 +1,4 @@ -// Copyright (c) 2021 Tigera, Inc. All rights reserved. +// Copyright (c) 2021-2026 Tigera, Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -57,6 +57,7 @@ const ( ProgIndexNewFlow ProgIndexIPFrag ProgIndexMaglev + ProgIndexTCPRst ProgIndexMainDebug ProgIndexPolicyDebug ProgIndexAllowedDebug @@ -67,6 +68,7 @@ const ( ProgIndexNewFlowDebug ProgIndexIPFragDebug ProgIndexMaglevDebug + ProgIndexTCPRstDebug ProgIndexEndDebug ProgIndexEnd @@ -92,6 +94,7 @@ var ProgramNames = []string{ "calico_tc_skb_new_flow_entrypoint", "calico_tc_skb_ipv4_frag", "calico_tc_maglev", + "calico_tc_skb_send_tcp_rst", /* ipv4 - debug */ "calico_tc_main", "calico_tc_norm_pol_tail", @@ -103,6 +106,7 @@ var ProgramNames = []string{ "calico_tc_skb_new_flow_entrypoint", "calico_tc_skb_ipv4_frag", "calico_tc_maglev", + "calico_tc_skb_send_tcp_rst", /* ipv6 */ "calico_tc_main", "calico_tc_norm_pol_tail", @@ -114,6 +118,7 @@ var ProgramNames = []string{ "calico_tc_skb_new_flow_entrypoint", "", "calico_tc_maglev", + "calico_tc_skb_send_tcp_rst", /* ipv6 - debug */ "calico_tc_main", "calico_tc_norm_pol_tail", @@ -125,6 +130,7 @@ var ProgramNames = []string{ "calico_tc_skb_new_flow_entrypoint", "", "calico_tc_maglev", + "calico_tc_skb_send_tcp_rst", } type ToOrFromEp string diff --git a/felix/bpf/ut/attach_test.go b/felix/bpf/ut/attach_test.go index 49aa2546bdf..e7ff7e1e385 100644 --- a/felix/bpf/ut/attach_test.go +++ b/felix/bpf/ut/attach_test.go @@ -90,6 +90,8 @@ func newBPFTestEpMgr( nil, nil, 1500, + nil, + nil, ) } @@ -141,11 +143,11 @@ func runAttachTest(t *testing.T, ipv6Enabled bool) { err = bpfEpMgr.CompleteDeferredWork() Expect(err).NotTo(HaveOccurred()) - programsIngCount := 8 - programsEgCount := 7 + programsIngCount := 9 + programsEgCount := 8 if ipv6Enabled { - programsIngCount = 15 - programsEgCount = 13 + programsIngCount = 17 + programsEgCount = 15 } Expect(programsIng.Count()).To(Equal(programsIngCount)) Expect(programsEg.Count()).To(Equal(programsEgCount)) @@ -201,7 +203,7 @@ func runAttachTest(t *testing.T, ipv6Enabled bool) { bpfEpMgr.OnUpdate(&proto.HostMetadataV6Update{Hostname: "uthost", Ipv6Addr: "1::4"}) err = bpfEpMgr.CompleteDeferredWork() Expect(err).NotTo(HaveOccurred()) - Expect(programsIng.Count()).To(Equal(28)) + Expect(programsIng.Count()).To(Equal(32)) atIng := programsIng.Programs() atEg := programsEg.Programs() @@ -343,12 +345,12 @@ func runAttachTest(t *testing.T, ipv6Enabled bool) { err := bpfEpMgr.CompleteDeferredWork() Expect(err).NotTo(HaveOccurred()) - programIngCount := 8 - programEgCount := 7 + programIngCount := 9 + programEgCount := 8 jumpMapLen := 1 if ipv6Enabled { - programIngCount = 28 - programEgCount = 26 + programIngCount = 32 + programEgCount = 30 jumpMapLen = 4 } Expect(programsIng.Count()).To(Equal(programIngCount)) @@ -371,11 +373,11 @@ func runAttachTest(t *testing.T, ipv6Enabled bool) { err = bpfEpMgr.CompleteDeferredWork() Expect(err).NotTo(HaveOccurred()) - programsIngCount = programsIngCount + 7 - programsEgCount = programsEgCount + 6 + programsIngCount = programsIngCount + 8 + programsEgCount = programsEgCount + 7 if ipv6Enabled { - programsIngCount = 28 - programsEgCount = 26 + programsIngCount = 32 + programsEgCount = 30 } Expect(programsIng.Count()).To(Equal(programsIngCount)) Expect(programsEg.Count()).To(Equal(programsEgCount)) @@ -619,10 +621,10 @@ func runAttachTest(t *testing.T, ipv6Enabled bool) { err = oldProgs.Open() Expect(err).NotTo(HaveOccurred()) pm := jumpMapDump(oldProgs) - programsCount := 15 + programsCount := 17 oldPoliciesCount := 2 if ipv6Enabled { - programsCount = 28 + programsCount = 32 oldPoliciesCount = 6 } Expect(pm).To(HaveLen(programsCount)) @@ -662,9 +664,9 @@ func runAttachTest(t *testing.T, ipv6Enabled bool) { err = bpfEpMgr.CompleteDeferredWork() Expect(err).NotTo(HaveOccurred()) - Expect(programsIng.Count()).To(Equal(15)) + Expect(programsIng.Count()).To(Equal(17)) pm = jumpMapDump(commonMaps.ProgramsMaps[hook.Ingress]) - Expect(pm).To(HaveLen(15)) + Expect(pm).To(HaveLen(17)) pmIng := jumpMapDump(commonMaps.JumpMaps[hook.Ingress]) pmEgr := jumpMapDump(commonMaps.JumpMaps[hook.Egress]) diff --git a/felix/bpf/ut/bpf_prog_test.go b/felix/bpf/ut/bpf_prog_test.go index f31ac7c8ee8..61e29be9672 100644 --- a/felix/bpf/ut/bpf_prog_test.go +++ b/felix/bpf/ut/bpf_prog_test.go @@ -228,6 +228,7 @@ var tcJumpMapIndexes = map[string][]int{ tcdefs.ProgIndexNewFlow, tcdefs.ProgIndexIPFrag, tcdefs.ProgIndexMaglev, + tcdefs.ProgIndexTCPRst, }, "IPv4 debug": []int{ tcdefs.ProgIndexMainDebug, @@ -240,6 +241,7 @@ var tcJumpMapIndexes = map[string][]int{ tcdefs.ProgIndexNewFlowDebug, tcdefs.ProgIndexIPFragDebug, tcdefs.ProgIndexMaglevDebug, + tcdefs.ProgIndexTCPRstDebug, }, "IPv6": []int{ tcdefs.ProgIndexMain, @@ -251,6 +253,7 @@ var tcJumpMapIndexes = map[string][]int{ tcdefs.ProgIndexIcmpInnerNat, tcdefs.ProgIndexNewFlow, tcdefs.ProgIndexMaglev, + tcdefs.ProgIndexTCPRst, }, "IPv6 debug": []int{ tcdefs.ProgIndexMainDebug, @@ -262,6 +265,7 @@ var tcJumpMapIndexes = map[string][]int{ tcdefs.ProgIndexIcmpInnerNatDebug, tcdefs.ProgIndexNewFlowDebug, tcdefs.ProgIndexMaglevDebug, + tcdefs.ProgIndexTCPRstDebug, }, } diff --git a/felix/bpf/ut/tcp_rst_test.go b/felix/bpf/ut/tcp_rst_test.go new file mode 100644 index 00000000000..ec3d14b0d54 --- /dev/null +++ b/felix/bpf/ut/tcp_rst_test.go @@ -0,0 +1,139 @@ +// Copyright (c) 2026 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ut_test + +import ( + "fmt" + "testing" + + "github.com/gopacket/gopacket" + "github.com/gopacket/gopacket/layers" + . "github.com/onsi/gomega" +) + +func TestTCPReset(t *testing.T) { + RegisterTestingT(t) + cleanUpMaps() + defer cleanUpMaps() + + tcpSyn := &layers.TCP{ + SrcPort: 54321, + DstPort: 7890, + SYN: false, + Seq: 1000, + DataOffset: 5, + } + + _, ipv4, l4, _, pktBytes, err := testPacketV4(nil, nil, tcpSyn, nil) + Expect(err).NotTo(HaveOccurred()) + tcp, ok := l4.(*layers.TCP) + Expect(ok).To(BeTrue()) + runBpfUnitTest(t, "tcp_rst.c", func(bpfrun bpfProgRunFn) { + res, err := bpfrun(pktBytes) + Expect(err).NotTo(HaveOccurred()) + Expect(res.Retval).To(Equal(0)) + + Expect(res.dataOut).To(HaveLen(54)) // eth(14) + ip(20) + tcp(20) + + pktR := gopacket.NewPacket(res.dataOut, layers.LayerTypeEthernet, gopacket.Default) + fmt.Printf("pktR = %+v\n", pktR) + + checkTcpRst(pktR, ipv4, tcp, false) + }) +} + +func TestTCPResetIPv6(t *testing.T) { + RegisterTestingT(t) + cleanUpMaps() + defer cleanUpMaps() + + tcpSyn := &layers.TCP{ + SrcPort: 54321, + DstPort: 7890, + SYN: false, + Seq: 1000, + DataOffset: 5, + } + + hop := &layers.IPv6HopByHop{} + hop.NextHeader = layers.IPProtocolTCP + + /* from gopacket ip6_test.go */ + tlv := &layers.IPv6HopByHopOption{} + tlv.OptionType = 0x01 //PadN + tlv.OptionData = []byte{0x00, 0x00, 0x00, 0x00} + hop.Options = append(hop.Options, tlv) + + _, ipv6, l4, _, pktBytes, err := testPacketV6(nil, nil, tcpSyn, nil, hop) + Expect(err).NotTo(HaveOccurred()) + tcp, ok := l4.(*layers.TCP) + Expect(ok).To(BeTrue()) + runBpfUnitTest(t, "tcp_rst.c", func(bpfrun bpfProgRunFn) { + res, err := bpfrun(pktBytes) + Expect(err).NotTo(HaveOccurred()) + Expect(res.Retval).To(Equal(0)) + + Expect(res.dataOut).To(HaveLen(74)) // Ethernet (14) + IPv6 (40) + TCP (20) + + pktR := gopacket.NewPacket(res.dataOut, layers.LayerTypeEthernet, gopacket.Default) + fmt.Printf("pktR = %+v\n", pktR) + + ipv6L := pktR.Layer(layers.LayerTypeIPv6) + Expect(ipv6L).NotTo(BeNil()) + ipv6R, ok := ipv6L.(*layers.IPv6) + Expect(ok).To(BeTrue()) + Expect(ipv6R.NextHeader).To(Equal(layers.IPProtocolTCP)) + Expect(ipv6R.SrcIP).To(Equal(ipv6.DstIP)) + Expect(ipv6R.DstIP).To(Equal(ipv6.SrcIP)) + + tcpL := pktR.Layer(layers.LayerTypeTCP) + Expect(tcpL).NotTo(BeNil()) + tcpR, ok := tcpL.(*layers.TCP) + Expect(ok).To(BeTrue()) + Expect(tcpR.RST).To(BeTrue()) + Expect(tcpR.SrcPort).To(Equal(tcp.DstPort)) + Expect(tcpR.DstPort).To(Equal(tcp.SrcPort)) + Expect(tcpR.Seq).To(Equal(uint32(0))) + }, withIPv6()) + +} + +func checkTcpRst(pktR gopacket.Packet, ipv4 *layers.IPv4, tcp *layers.TCP, ack bool) { + ipv4L := pktR.Layer(layers.LayerTypeIPv4) + Expect(ipv4L).NotTo(BeNil()) + ipv4R, ok := ipv4L.(*layers.IPv4) + Expect(ok).To(BeTrue()) + Expect(ipv4R.SrcIP).To(Equal(ipv4.DstIP)) + Expect(ipv4R.DstIP).To(Equal(ipv4.SrcIP)) + Expect(ipv4R.Protocol).To(Equal(layers.IPProtocolTCP)) + Expect(ipv4R.TTL).To(Equal(uint8(64))) + + tcpL := pktR.Layer(layers.LayerTypeTCP) + Expect(tcpL).NotTo(BeNil()) + tcpR, ok := tcpL.(*layers.TCP) + Expect(ok).To(BeTrue()) + Expect(tcpR.RST).To(BeTrue()) + Expect(tcpR.FIN).To(BeFalse()) + Expect(tcpR.SYN).To(BeFalse()) + Expect(tcpR.URG).To(BeFalse()) + Expect(tcpR.ECE).To(BeFalse()) + Expect(tcpR.CWR).To(BeFalse()) + Expect(tcpR.NS).To(BeFalse()) + Expect(tcpR.PSH).To(BeFalse()) + + Expect(tcpR.SrcPort).To(Equal(tcp.DstPort)) + Expect(tcpR.DstPort).To(Equal(tcp.SrcPort)) + Expect(tcpR.Seq).To(Equal(uint32(0))) +} diff --git a/felix/dataplane/linux/bpf_ep_mgr.go b/felix/dataplane/linux/bpf_ep_mgr.go index a1c8eff637c..02315e25310 100644 --- a/felix/dataplane/linux/bpf_ep_mgr.go +++ b/felix/dataplane/linux/bpf_ep_mgr.go @@ -416,6 +416,7 @@ type bpfEndpointManagerDataplane struct { tunnelIP net.IP iptablesFilterTable Table ipSetIDAlloc *idalloc.IDAllocator + workloadRemoveChan chan string } type serviceKey struct { @@ -450,6 +451,7 @@ func NewBPFEndpointManager( healthAggregator *health.HealthAggregator, dataplanefeatures *environment.Features, bpfIfaceMTU int, + workloadRemoveChanV4, workloadRemoveChanV6 chan string, ) (*bpfEndpointManager, error) { if livenessCallback == nil { livenessCallback = func() {} @@ -600,10 +602,10 @@ func NewBPFEndpointManager( m.bpfAttachType = apiv3.BPFAttachOptionTC } } - m.v4 = newBPFEndpointManagerDataplane(proto.IPVersion_IPV4, bpfmaps.V4, iptablesFilterTableV4, ipSetIDAllocV4, m) + m.v4 = newBPFEndpointManagerDataplane(proto.IPVersion_IPV4, bpfmaps.V4, iptablesFilterTableV4, ipSetIDAllocV4, workloadRemoveChanV4, m) if m.ipv6Enabled { - m.v6 = newBPFEndpointManagerDataplane(proto.IPVersion_IPV6, bpfmaps.V6, iptablesFilterTableV6, ipSetIDAllocV6, m) + m.v6 = newBPFEndpointManagerDataplane(proto.IPVersion_IPV6, bpfmaps.V6, iptablesFilterTableV6, ipSetIDAllocV6, workloadRemoveChanV6, m) } if m.hostNetworkedNATMode != hostNetworkedNATDisabled { @@ -696,6 +698,7 @@ func newBPFEndpointManagerDataplane( ipMaps *bpfmap.IPMaps, iptablesFilterTable Table, ipSetIDAlloc *idalloc.IDAllocator, + workloadRemoveChan chan string, epMgr *bpfEndpointManager, ) *bpfEndpointManagerDataplane { return &bpfEndpointManagerDataplane{ @@ -705,6 +708,7 @@ func newBPFEndpointManagerDataplane( IPMaps: ipMaps, iptablesFilterTable: iptablesFilterTable, ipSetIDAlloc: ipSetIDAlloc, + workloadRemoveChan: workloadRemoveChan, } } @@ -1300,6 +1304,18 @@ func (m *bpfEndpointManager) onWorkloadEndpointRemove(msg *proto.WorkloadEndpoin }) // Remove policy debug info if any m.removeIfaceAllPolicyDebugInfo(oldWEP.Name) + if m.v4 != nil && m.v4.workloadRemoveChan != nil { + for _, addr := range oldWEP.GetIpv4Nets() { + addr = strings.SplitN(addr, "/", 2)[0] + m.v4.workloadRemoveChan <- addr + } + } + if m.v6 != nil && m.v6.workloadRemoveChan != nil { + for _, addr := range oldWEP.GetIpv6Nets() { + addr = strings.SplitN(addr, "/", 2)[0] + m.v6.workloadRemoveChan <- addr + } + } } // onPolicyUpdate stores the policy in the cache and marks any endpoints using it dirty. diff --git a/felix/dataplane/linux/bpf_ep_mgr_test.go b/felix/dataplane/linux/bpf_ep_mgr_test.go index 895cd85b688..92564a89a8a 100644 --- a/felix/dataplane/linux/bpf_ep_mgr_test.go +++ b/felix/dataplane/linux/bpf_ep_mgr_test.go @@ -515,6 +515,8 @@ var _ = Describe("BPF Endpoint Manager", func() { nil, environment.NewFeatureDetector(nil).GetFeatures(), 1250, + nil, + nil, ) Expect(err).NotTo(HaveOccurred()) bpfEpMgr.v4.hostIP = net.ParseIP("1.2.3.4") diff --git a/felix/dataplane/linux/int_dataplane.go b/felix/dataplane/linux/int_dataplane.go index c8f75ab469d..b0c856a9f5b 100644 --- a/felix/dataplane/linux/int_dataplane.go +++ b/felix/dataplane/linux/int_dataplane.go @@ -937,15 +937,16 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane { // Important that we create the maps before we load a BPF program with TC since we make sure the map // metadata name is set whereas TC doesn't set that field. var conntrackScannerV4, conntrackScannerV6 *bpfconntrack.Scanner + var workloadRemoveChanV4, workloadRemoveChanV6 chan string var ipSetIDAllocatorV4, ipSetIDAllocatorV6 *idalloc.IDAllocator ipSetIDAllocatorV4 = idalloc.New() // Start IPv4 BPF dataplane components - conntrackScannerV4 = startBPFDataplaneComponents(proto.IPVersion_IPV4, bpfMaps.V4, ipSetIDAllocatorV4, &config, ipsetsManager, dp) + conntrackScannerV4, workloadRemoveChanV4 = startBPFDataplaneComponents(proto.IPVersion_IPV4, bpfMaps.V4, ipSetIDAllocatorV4, &config, ipsetsManager, dp) if config.BPFIpv6Enabled { // Start IPv6 BPF dataplane components ipSetIDAllocatorV6 = idalloc.New() - conntrackScannerV6 = startBPFDataplaneComponents(proto.IPVersion_IPV6, bpfMaps.V6, ipSetIDAllocatorV6, &config, ipsetsManagerV6, dp) + conntrackScannerV6, workloadRemoveChanV6 = startBPFDataplaneComponents(proto.IPVersion_IPV6, bpfMaps.V6, ipSetIDAllocatorV6, &config, ipsetsManagerV6, dp) } workloadIfaceRegex := regexp.MustCompile(strings.Join(interfaceRegexes, "|")) @@ -980,6 +981,8 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane { config.HealthAggregator, dataplaneFeatures, podMTU, + workloadRemoveChanV4, + workloadRemoveChanV6, ) if err != nil { log.WithError(err).Panic("Failed to create BPF endpoint manager.") @@ -2868,7 +2871,7 @@ func startBPFDataplaneComponents( config *Config, ipSetsMgr *dpsets.IPSetsManager, dp *InternalDataplane, -) *bpfconntrack.Scanner { +) (*bpfconntrack.Scanner, chan string) { ipSetConfig := config.RulesConfig.IPSetConfigV4 ipSetEntry := bpfipsets.IPSetEntryFromBytes ipSetProtoEntry := bpfipsets.ProtoIPSetMemberToBPFEntry @@ -2958,12 +2961,13 @@ func startBPFDataplaneComponents( log.Errorf("error creating the bpf cleaner %v", err) } + workloadRemoveChan := make(chan string, 1000) conntrackScanner := bpfconntrack.NewScanner(maps.CtMap, ctKey, ctVal, config.ConfigChangedRestartCallback, config.BPFMapSizeConntrackScaling, maps.CtCleanupMap.(bpfmaps.MapWithExistsCheck), int(ipFamily), bpfCleaner, - livenessScanner) + livenessScanner, bpfconntrack.NewWorkloadRemoveScannerTCP(workloadRemoveChan)) // Before we start, scan for all finished / timed out connections to // free up the conntrack table asap as it may take time to sync up the @@ -2987,7 +2991,7 @@ func startBPFDataplaneComponents( } else { log.Info("BPF enabled but no Kubernetes client available, unable to run kube-proxy module.") } - return conntrackScanner + return conntrackScanner, workloadRemoveChan } func conntrackMapSizeFromFile() (int, error) { diff --git a/felix/fv/bpf_test.go b/felix/fv/bpf_test.go index b77333af4df..6dc764d4c5f 100644 --- a/felix/fv/bpf_test.go +++ b/felix/fv/bpf_test.go @@ -1893,7 +1893,7 @@ func describeBPFTests(opts ...bpfTestOpt) bool { // Test doesn't use services so ignore the runs with those turned on. if testOpts.protocol == "tcp" && !testOpts.connTimeEnabled && !testOpts.dsr { - It("should not be able to spoof TCP", func() { + spoofSetup := func() { if testOpts.ipv6 { // XXX the routing needs to be different and may not // apply to ipv6 @@ -1916,10 +1916,16 @@ func describeBPFTests(opts ...bpfTestOpt) bool { // Check that the route manipulation succeeded. cc.CheckConnectivity() cc.ResetExpectations() + } - // PHASE 1: basic single-shot connectivity checks to check that the test infra - // is basically doing what we want. I.e. if felix and the workload disagree on - // interface then new connections get dropped. + // Basic single-shot connectivity checks to check that the test infra + // is basically doing what we want. I.e. if felix and the workload disagree on + // interface then new connections get dropped. + It("should not be able to spoof new TCP connections", func() { + spoofSetup() + if testOpts.ipv6 { + return + } // Switch routes to use the spoofed interface, should fail. By("Workload using spoof0, felix expecting eth0, should fail") @@ -1942,9 +1948,16 @@ func describeBPFTests(opts ...bpfTestOpt) bool { cc.Expect(Some, w[0][0], w[1][0]) cc.CheckConnectivity() cc.ResetExpectations() + }) + + // Keep a connection up and move it from one interface to the other using the pod's + // routes. To the host this looks like one workload is spoofing the other. + It("should not be able to spoof existing TCP connections", func() { + spoofSetup() + if testOpts.ipv6 { + return + } - // PHASE 2: keep a connection up and move it from one interface to the other using the pod's - // routes. To the host this looks like one workload is spoofing the other. By("Starting permanent connection") pc := w[0][0].StartPersistentConnection(w[1][0].IP, 8055, workload.PersistentConnectionOpts{ MonitorConnectivity: true, @@ -3506,14 +3519,10 @@ func describeBPFTests(opts ...bpfTestOpt) bool { cc.CheckConnectivity() }) - ifUDPnoCTLB := func(desc string, body func()) { - if testOpts.protocol != "udp" || testOpts.connTimeEnabled { + It("should have connectivity after a backend is replaced by a new one", func() { + if testOpts.protocol == "udp" && testOpts.connTimeEnabled { return } - It(desc, body) - } - - ifUDPnoCTLB("should have connectivity after a backend is replaced by a new one", func() { var ( testSvc *v1.Service testSvcNamespace string @@ -3575,7 +3584,9 @@ func describeBPFTests(opts ...bpfTestOpt) bool { Timeout: 60 * time.Second, }, ) - defer pc.Stop() + if testOpts.protocol != "tcp" { + defer pc.Stop() + } By("Testing connectivity") prevCount := pc.PongCount() @@ -3586,15 +3597,54 @@ func describeBPFTests(opts ...bpfTestOpt) bool { testSvc2 := k8sService(testSvcName, clusterIP, w[1][0], 80, 8055, 0, testOpts.protocol) k8sUpdateService(k8sClient, testSvcNamespace, testSvcName, testSvc, testSvc2) - By("Stoping the original backend to make sure it is not reachable") + var tcpd *tcpdump.TCPDump + if testOpts.protocol == "tcp" { + iface := w[1][1].InterfaceName + srcIP := clusterIP + tcpdHost := tc.Felixes[1] + if testOpts.connTimeEnabled { + iface = "eth0" + switch testOpts.tunnel { + case "vxlan": + iface = "vxlan.calico" + case "wireguard": + iface = "wireguard.cali" + if testOpts.ipv6 { + iface = "wireguard.cali-v6" + } + case "ipip": + iface = "tunl0" + } + srcIP = w[0][0].IP + tcpdHost = tc.Felixes[0] + } + tcpd = tcpdHost.AttachTCPDump(iface) + tcpd.SetLogEnabled(true) + + ipRegex := "IP" + if testOpts.ipv6 { + ipRegex = "IP6" + } + tcpd.AddMatcher("tcp-rst", + regexp.MustCompile(fmt.Sprintf(`%s %s\.\d+ > %s\.\d+: Flags \[[^\]]*R[^\]]*\]`, ipRegex, srcIP, w[1][1].IP))) + tcpd.Start(infra) + } + + By("Stopping the original backend to make sure it is not reachable") w[0][0].Stop() By("removing the old workload from infra") w[0][0].RemoveFromInfra(infra) By("Testing connectivity continues") - prevCount = pc.PongCount() - Eventually(pc.PongCount, "15s").Should(BeNumerically(">", prevCount), - "Expected to see pong responses on the connection but didn't receive any") + if testOpts.protocol == "tcp" { + Eventually(func() int { return tcpd.MatchCount("tcp-rst") }, "25s").ShouldNot(BeZero(), + "Expected to see TCP RSTs on the connection after backend change") + Expect(pc.IsConnectionReset()).To(BeTrue()) + } else { + prevCount = pc.PongCount() + Eventually(pc.PongCount, "15s").Should(BeNumerically(">", prevCount), + "Expected to see pong responses on the connection but didn't receive any") + } }) }) diff --git a/felix/fv/connectivity/conncheck.go b/felix/fv/connectivity/conncheck.go index 78112580911..1b126e14074 100644 --- a/felix/fv/connectivity/conncheck.go +++ b/felix/fv/connectivity/conncheck.go @@ -957,6 +957,7 @@ type PersistentConnection struct { Timeout time.Duration Sleep time.Duration ProbeLoopFileTimeout time.Duration + connectionReset bool loopFile string runCmd *exec.Cmd @@ -1044,6 +1045,7 @@ func (pc *PersistentConnection) Start() error { line, err := stdoutReader.ReadString('\n') if err != nil { log.WithError(err).Info("End of permanent connection stdout") + pc.connectionReset = true return } line = strings.TrimSpace(string(line)) @@ -1064,6 +1066,7 @@ func (pc *PersistentConnection) Start() error { line, err := stderrReader.ReadString('\n') if err != nil { log.WithError(err).Info("End of permanent connection stderr") + pc.connectionReset = true return } line = strings.TrimSpace(string(line)) @@ -1115,3 +1118,7 @@ func (pc *PersistentConnection) PongCount() int { log.WithField("name", pc.Name).Infof("pong count %d", pc.pongCount) return pc.pongCount } + +func (pc *PersistentConnection) IsConnectionReset() bool { + return pc.connectionReset +}