From 68ef691e403d5581ba2cc27d350cde0eb6725d93 Mon Sep 17 00:00:00 2001 From: Tomas Hruby Date: Thu, 30 Apr 2026 14:15:37 -0700 Subject: [PATCH] Defer kube-proxy construction until first hostIPs and in-sync MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Felix restart on a node receiving NodePort traffic was breaking new TCP connections for ~500ms — the bootstrap window between kube-proxy starting up and receiving the first hostIPs. In that window, kube-proxy.go:start() constructed the proxy with a stub Syncer (only podNPIP, no real host IPs). proxy.New() spins up the k8s informer goroutines synchronously; once they sync, an Apply runs against the stub Syncer. The cachingmap computes desired state without any (realHostIP, nodePort) FE entry and erases pre-existing real-host-IP NodePort FE entries left by the previous Felix run. New external TCP connections to the NodePort during the gap miss the FE map, ascend to the host stack with no listener, and get RST by the kernel. The wildcard FE entry does not cover external→HEP NodePort traffic (nat_lookup.h:60-71) so it provides no safety net. Defer proxy construction until both the first hostIPUpdates and the first hostMetadataUpdates have arrived, then construct the Syncer (with real host IPs) and the proxy in one shot via run(). The KubeProxy.Conntrack* callbacks already nil-check kp.syncer, so in-flight conntrack scans during the wait remain safe. Add a regression test in kube-proxy_test.go that pre-populates the front map with a real-host-IP NodePort FE entry, fires only the host-metadata gate (mimicking the bootstrap window), and asserts via Consistently that the entry survives. Closes #12192 --- felix/bpf/proxy/kube-proxy.go | 101 ++++++++++++++--------- felix/bpf/proxy/kube-proxy_test.go | 125 +++++++++++++++++++++++++++++ 2 files changed, 186 insertions(+), 40 deletions(-) diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index 27ea9df4775..becbf6b3846 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -119,7 +119,11 @@ func (kp *KubeProxy) Stop() { close(kp.exiting) close(kp.hostMetadataUpdates) close(kp.hostIPUpdates) - kp.proxy.Stop() + if kp.proxy != nil { + // kp.proxy is nil if Stop is called before start() has + // received its initial host IPs and host-metadata updates. + kp.proxy.Stop() + } kp.wg.Wait() }) } @@ -161,10 +165,29 @@ func (kp *KubeProxy) run(hostIPs []net.IP, hostMetadata map[string]*proto.HostMe return errors.WithMessage(err, "new bpf syncer") } - kp.proxy.SetHostIPs(hostIPs) - // Don't bother invoking a resync within SetHostMetadata; we will be syncing a fresh syncer right after. - kp.proxy.SetHostMetadata(hostMetadata, false) - kp.proxy.SetSyncer(syncer) + if kp.proxy == nil { + // First call from start(): construct the proxy with a syncer + // that already knows the real host IPs. proxy.New() spins up + // the k8s informer goroutines synchronously, so by the time + // they sync and trigger an Apply, the syncer's desired state + // will include all (realHostIP, nodePort) FE entries. + // Constructing the proxy any earlier risks an Apply against a + // syncer that lacks real host IPs, which would gut pre-existing + // (realHostIP, nodePort) NAT FE entries left by the previous + // Felix run and break external NodePort traffic. See #12192. + proxy, err := New(kp.k8s, syncer, kp.hostname, kp.opts...) + if err != nil { + return errors.WithMessage(err, "new proxy") + } + proxy.SetHostIPs(hostIPs) + proxy.SetHostMetadata(hostMetadata, false) + kp.proxy = proxy + } else { + kp.proxy.SetHostIPs(hostIPs) + // Don't bother invoking a resync within SetHostMetadata; we will be syncing a fresh syncer right after. + kp.proxy.SetHostMetadata(hostMetadata, false) + kp.proxy.SetSyncer(syncer) + } log.Infof("kube-proxy v%d node info updated, hostname=%q hostIPs=%+v", kp.ipFamily, kp.hostname, hostIPs) @@ -174,58 +197,56 @@ func (kp *KubeProxy) run(hostIPs []net.IP, hostMetadata map[string]*proto.HostMe } func (kp *KubeProxy) start() error { - var withLocalNP []net.IP - if kp.ipFamily == 4 { - withLocalNP = append(withLocalNP, podNPIP) - } else { - withLocalNP = append(withLocalNP, podNPIPV6) - } - - syncer, err := NewSyncer(kp.ipFamily, withLocalNP, kp.frontendMap, kp.backendMap, kp.MaglevMap, kp.affinityMap, kp.rt, kp.excludedCIDRs, kp.maglevLUTSize) - if err != nil { - return errors.WithMessage(err, "new bpf syncer") - } - - proxy, err := New(kp.k8s, syncer, kp.hostname, kp.opts...) - if err != nil { - return errors.WithMessage(err, "new proxy") + // Block until we have the first batch of host IPs AND the + // host-metadata in-sync signal (sent by CompleteDeferredWork after + // the int_dataplane has finished its first datastore-in-sync + // apply). Only then construct the proxy, via run(). proxy.New() + // kicks off the k8s informer goroutines synchronously; once those + // sync, they trigger Apply on the syncer. Constructing the proxy + // before we have real host IPs lets that Apply run against a + // syncer whose desired state lacks every (realHostIP, nodePort) + // FE entry, which then erases pre-existing entries left by the + // previous Felix run and breaks external NodePort traffic during + // the kube-proxy bootstrap window. See projectcalico/calico#12192. + var hostIPs []net.IP + select { + case ips, ok := <-kp.hostIPUpdates: + if !ok { + return nil + } + hostIPs = ips + case <-kp.exiting: + return nil } - kp.lock.Lock() - kp.proxy = proxy - kp.syncer = syncer - kp.lock.Unlock() - - // Wait for the initial update. - hostIPs := <-kp.hostIPUpdates - hostMetadata := make(map[string]*proto.HostMetadataV4V6Update) - // Block until we go in-sync and get the first batch of hostmetadata - // updates, to avoid a flap after a Felix restart. In practice, this - // recv should happen very soon after receiving the host IPs above. - hostMetadataUpdates := <-kp.hostMetadataUpdates - mergeHostMetadataV4V6Updates(hostMetadata, hostMetadataUpdates) + select { + case updates, ok := <-kp.hostMetadataUpdates: + if !ok { + return nil + } + mergeHostMetadataV4V6Updates(hostMetadata, updates) + case <-kp.exiting: + return nil + } - err = kp.run(hostIPs, hostMetadata) - if err != nil { + if err := kp.run(hostIPs, hostMetadata); err != nil { return err } kp.wg.Go(func() { for { - var ok bool select { - case hostIPs, ok = <-kp.hostIPUpdates: + case hostIPs, ok := <-kp.hostIPUpdates: if !ok { log.Error("kube-proxy: hostIPUpdates closed") return } - err = kp.run(hostIPs, hostMetadata) - if err != nil { + if err := kp.run(hostIPs, hostMetadata); err != nil { log.Panic("kube-proxy failed to resync after host IPs update") } - case hostMetadataUpdates, ok = <-kp.hostMetadataUpdates: + case hostMetadataUpdates, ok := <-kp.hostMetadataUpdates: if !ok { log.Error("kube-proxy: hostMetadataUpdates closed") return diff --git a/felix/bpf/proxy/kube-proxy_test.go b/felix/bpf/proxy/kube-proxy_test.go index 9f778922856..eae95423785 100644 --- a/felix/bpf/proxy/kube-proxy_test.go +++ b/felix/bpf/proxy/kube-proxy_test.go @@ -27,6 +27,7 @@ import ( "github.com/projectcalico/calico/felix/bpf/bpfmap" "github.com/projectcalico/calico/felix/bpf/conntrack" "github.com/projectcalico/calico/felix/bpf/mock" + "github.com/projectcalico/calico/felix/bpf/nat" proxy "github.com/projectcalico/calico/felix/bpf/proxy" "github.com/projectcalico/calico/felix/proto" ) @@ -158,3 +159,127 @@ var _ = Describe("BPF kube-proxy", func() { }) }) }) + +// Regression for projectcalico/calico#12192. Felix restart on a node +// receiving NodePort traffic was breaking new TCP connections for ~500ms +// — the bootstrap window between kube-proxy starting up and receiving +// the first hostIPs. In that window, kube-proxy.go:start() used to +// construct the proxy with a stub Syncer (only podNPIP, no real host +// IPs). proxy.New() spins up the k8s informer goroutines synchronously; +// once they sync, an Apply runs against the stub Syncer. The cachingmap +// computes desired state without any (realHostIP, nodePort) FE entry +// and erases pre-existing real-host-IP NodePort FE entries left by the +// previous Felix run. New external TCP connections to the NodePort +// during the gap miss the FE map, ascend to the host stack with no +// listener, and get RST by the kernel. +// +// The fix defers proxy construction until both the first hostIPUpdates +// and the first hostMetadataUpdates have arrived — i.e. until run() +// is called with real host IPs. This test pre-populates the front map +// with a real-host-IP NodePort FE entry, fires the host-metadata +// in-sync gate but withholds the host IPs (mimicking the bootstrap +// window), and asserts the entry survives. +var _ = Describe("BPF kube-proxy bootstrap window — regression #12192", func() { + var ( + bpfMaps *bpfmap.IPMaps + front *mockNATMap + p *proxy.KubeProxy + ) + + realHostIP := net.IPv4(10, 0, 0, 99) + const nodePort = uint16(30000) + + BeforeEach(func() { + bpfMaps = new(bpfmap.IPMaps) + bpfMaps.FrontendMap = newMockNATMap() + bpfMaps.BackendMap = newMockNATBackendMap() + bpfMaps.AffinityMap = newMockAffinityMap() + bpfMaps.MaglevMap = newMockMaglevMap() + bpfMaps.CtMap = mock.NewMockMap(conntrack.MapParams) + front = bpfMaps.FrontendMap.(*mockNATMap) + + // Marker entry simulating leftover state from the previous + // Felix run. Value is not meaningful — only presence matters. + markerKey := nat.NewNATKey(realHostIP, nodePort, proxy.ProtoV1ToIntPanic(v1.ProtocolTCP)) + front.m[markerKey] = nat.NewNATValue(0xdeadbeef, 1, 0, 0) + }) + + AfterEach(func() { + if p != nil { + p.Stop() + } + }) + + It("does not erase pre-existing real-host-IP NodePort FE entries during the bootstrap window", func() { + testSvc := &v1.Service{ + TypeMeta: typeMetaV1("Service"), + ObjectMeta: objectMetaV1("regression-12192-svc"), + Spec: v1.ServiceSpec{ + ClusterIP: "10.1.0.1", + Type: v1.ServiceTypeNodePort, + Selector: map[string]string{"app": "test"}, + Ports: []v1.ServicePort{{ + Protocol: v1.ProtocolTCP, + Port: 1234, + NodePort: int32(nodePort), + }}, + }, + } + testSvcEps := &discoveryv1.EndpointSlice{ + TypeMeta: typeMetaV1("EndpointSlice"), + ObjectMeta: objectMetaV1("regression-12192-svc"), + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{Addresses: []string{"10.1.2.1"}}}, + Ports: []discoveryv1.EndpointPort{{ + Port: ptr.To(int32(1234)), + Name: ptr.To("http"), + Protocol: ptr.To(v1.ProtocolTCP), + }}, + } + k8s := fake.NewClientset(testSvc, testSvcEps) + + markerKey := nat.NewNATKey(realHostIP, nodePort, proxy.ProtoV1ToIntPanic(v1.ProtocolTCP)) + + var err error + p, err = proxy.StartKubeProxy(k8s, "test-node", bpfMaps, + proxy.WithImmediateSync(), + proxy.WithMaglevLUTSize(maglevLUTSize), + ) + Expect(err).NotTo(HaveOccurred()) + + // Fire the host-metadata in-sync gate but NOT the host IPs. + // In the bootstrap window with the buggy code, the proxy is + // already constructed and informers are syncing — they'll + // trigger an Apply on the stub Syncer that erases the marker. + p.OnUpdate(&proto.HostMetadataV4V6Update{Hostname: "test-node"}) + Expect(p.CompleteDeferredWork()).To(Succeed()) + + // The marker must survive the bootstrap window. With + // WithImmediateSync the fake-client informers sync and + // schedule an Apply within tens of ms, so 500ms is a + // generous bound. + Consistently(func() bool { + front.Lock() + defer front.Unlock() + _, ok := front.m[markerKey] + return ok + }, "500ms", "20ms").Should(BeTrue(), + "pre-existing (realHostIP, nodePort) FE entry was erased during the kube-proxy bootstrap window") + + // Now release the host-IPs gate. The proxy is constructed + // with real host IPs and informers fire their first Apply + // against a Syncer whose desired state includes the + // (realHostIP, nodePort) FE entry, so the entry stays + // (cachingmap updates it in place to the proxy-computed + // value). + p.OnHostIPsUpdate([]net.IP{realHostIP}) + + Eventually(func() bool { + front.Lock() + defer front.Unlock() + _, ok := front.m[markerKey] + return ok + }, "5s", "50ms").Should(BeTrue(), + "after host IPs propagated, real (realHostIP, nodePort) FE entry should remain") + }) +})