diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index 27ea9df4775..becbf6b3846 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -119,7 +119,11 @@ func (kp *KubeProxy) Stop() { close(kp.exiting) close(kp.hostMetadataUpdates) close(kp.hostIPUpdates) - kp.proxy.Stop() + if kp.proxy != nil { + // kp.proxy is nil if Stop is called before start() has + // received its initial host IPs and host-metadata updates. + kp.proxy.Stop() + } kp.wg.Wait() }) } @@ -161,10 +165,29 @@ func (kp *KubeProxy) run(hostIPs []net.IP, hostMetadata map[string]*proto.HostMe return errors.WithMessage(err, "new bpf syncer") } - kp.proxy.SetHostIPs(hostIPs) - // Don't bother invoking a resync within SetHostMetadata; we will be syncing a fresh syncer right after. - kp.proxy.SetHostMetadata(hostMetadata, false) - kp.proxy.SetSyncer(syncer) + if kp.proxy == nil { + // First call from start(): construct the proxy with a syncer + // that already knows the real host IPs. proxy.New() spins up + // the k8s informer goroutines synchronously, so by the time + // they sync and trigger an Apply, the syncer's desired state + // will include all (realHostIP, nodePort) FE entries. + // Constructing the proxy any earlier risks an Apply against a + // syncer that lacks real host IPs, which would gut pre-existing + // (realHostIP, nodePort) NAT FE entries left by the previous + // Felix run and break external NodePort traffic. See #12192. + proxy, err := New(kp.k8s, syncer, kp.hostname, kp.opts...) + if err != nil { + return errors.WithMessage(err, "new proxy") + } + proxy.SetHostIPs(hostIPs) + proxy.SetHostMetadata(hostMetadata, false) + kp.proxy = proxy + } else { + kp.proxy.SetHostIPs(hostIPs) + // Don't bother invoking a resync within SetHostMetadata; we will be syncing a fresh syncer right after. 
+ kp.proxy.SetHostMetadata(hostMetadata, false) + kp.proxy.SetSyncer(syncer) + } log.Infof("kube-proxy v%d node info updated, hostname=%q hostIPs=%+v", kp.ipFamily, kp.hostname, hostIPs) @@ -174,58 +197,56 @@ func (kp *KubeProxy) run(hostIPs []net.IP, hostMetadata map[string]*proto.HostMe } func (kp *KubeProxy) start() error { - var withLocalNP []net.IP - if kp.ipFamily == 4 { - withLocalNP = append(withLocalNP, podNPIP) - } else { - withLocalNP = append(withLocalNP, podNPIPV6) - } - - syncer, err := NewSyncer(kp.ipFamily, withLocalNP, kp.frontendMap, kp.backendMap, kp.MaglevMap, kp.affinityMap, kp.rt, kp.excludedCIDRs, kp.maglevLUTSize) - if err != nil { - return errors.WithMessage(err, "new bpf syncer") - } - - proxy, err := New(kp.k8s, syncer, kp.hostname, kp.opts...) - if err != nil { - return errors.WithMessage(err, "new proxy") + // Block until we have the first batch of host IPs AND the + // host-metadata in-sync signal (sent by CompleteDeferredWork after + // the int_dataplane has finished its first datastore-in-sync + // apply). Only then construct the proxy, via run(). proxy.New() + // kicks off the k8s informer goroutines synchronously; once those + // sync, they trigger Apply on the syncer. Constructing the proxy + // before we have real host IPs lets that Apply run against a + // syncer whose desired state lacks every (realHostIP, nodePort) + // FE entry, which then erases pre-existing entries left by the + // previous Felix run and breaks external NodePort traffic during + // the kube-proxy bootstrap window. See projectcalico/calico#12192. + var hostIPs []net.IP + select { + case ips, ok := <-kp.hostIPUpdates: + if !ok { + return nil + } + hostIPs = ips + case <-kp.exiting: + return nil } - kp.lock.Lock() - kp.proxy = proxy - kp.syncer = syncer - kp.lock.Unlock() - - // Wait for the initial update. 
- hostIPs := <-kp.hostIPUpdates - hostMetadata := make(map[string]*proto.HostMetadataV4V6Update) - // Block until we go in-sync and get the first batch of hostmetadata - // updates, to avoid a flap after a Felix restart. In practice, this - // recv should happen very soon after receiving the host IPs above. - hostMetadataUpdates := <-kp.hostMetadataUpdates - mergeHostMetadataV4V6Updates(hostMetadata, hostMetadataUpdates) + select { + case updates, ok := <-kp.hostMetadataUpdates: + if !ok { + return nil + } + mergeHostMetadataV4V6Updates(hostMetadata, updates) + case <-kp.exiting: + return nil + } - err = kp.run(hostIPs, hostMetadata) - if err != nil { + if err := kp.run(hostIPs, hostMetadata); err != nil { return err } kp.wg.Go(func() { for { - var ok bool select { - case hostIPs, ok = <-kp.hostIPUpdates: + case hostIPs, ok := <-kp.hostIPUpdates: if !ok { log.Error("kube-proxy: hostIPUpdates closed") return } - err = kp.run(hostIPs, hostMetadata) - if err != nil { + if err := kp.run(hostIPs, hostMetadata); err != nil { log.Panic("kube-proxy failed to resync after host IPs update") } - case hostMetadataUpdates, ok = <-kp.hostMetadataUpdates: + case hostMetadataUpdates, ok := <-kp.hostMetadataUpdates: if !ok { log.Error("kube-proxy: hostMetadataUpdates closed") return diff --git a/felix/bpf/proxy/kube-proxy_test.go b/felix/bpf/proxy/kube-proxy_test.go index 9f778922856..eae95423785 100644 --- a/felix/bpf/proxy/kube-proxy_test.go +++ b/felix/bpf/proxy/kube-proxy_test.go @@ -27,6 +27,7 @@ import ( "github.com/projectcalico/calico/felix/bpf/bpfmap" "github.com/projectcalico/calico/felix/bpf/conntrack" "github.com/projectcalico/calico/felix/bpf/mock" + "github.com/projectcalico/calico/felix/bpf/nat" proxy "github.com/projectcalico/calico/felix/bpf/proxy" "github.com/projectcalico/calico/felix/proto" ) @@ -158,3 +159,127 @@ var _ = Describe("BPF kube-proxy", func() { }) }) }) + +// Regression for projectcalico/calico#12192. 
A Felix restart on a node +// receiving NodePort traffic was breaking new TCP connections for ~500ms +// — the bootstrap window between kube-proxy starting up and receiving +// the first hostIPs. In that window, kube-proxy.go:start() used to +// construct the proxy with a stub Syncer (only podNPIP, no real host +// IPs). proxy.New() spins up the k8s informer goroutines synchronously; +// once they sync, an Apply runs against the stub Syncer. The cachingmap +// computes desired state without any (realHostIP, nodePort) FE entry +// and erases pre-existing real-host-IP NodePort FE entries left by the +// previous Felix run. New external TCP connections to the NodePort +// during the gap miss the FE map, ascend to the host stack with no +// listener, and are reset (RST) by the kernel. +// +// The fix defers proxy construction until both the first hostIPUpdates +// and the first hostMetadataUpdates have arrived — i.e. until run() +// is called with real host IPs. This test pre-populates the frontend (FE) map +// with a real-host-IP NodePort FE entry, fires the host-metadata +// in-sync gate but withholds the host IPs (mimicking the bootstrap +// window), and asserts the entry survives. +var _ = Describe("BPF kube-proxy bootstrap window — regression #12192", func() { + var ( + bpfMaps *bpfmap.IPMaps + front *mockNATMap + p *proxy.KubeProxy + ) + + realHostIP := net.IPv4(10, 0, 0, 99) + const nodePort = uint16(30000) + + BeforeEach(func() { + bpfMaps = new(bpfmap.IPMaps) + bpfMaps.FrontendMap = newMockNATMap() + bpfMaps.BackendMap = newMockNATBackendMap() + bpfMaps.AffinityMap = newMockAffinityMap() + bpfMaps.MaglevMap = newMockMaglevMap() + bpfMaps.CtMap = mock.NewMockMap(conntrack.MapParams) + front = bpfMaps.FrontendMap.(*mockNATMap) + + // Marker entry simulating leftover state from the previous + // Felix run. Value is not meaningful — only presence matters. 
+ markerKey := nat.NewNATKey(realHostIP, nodePort, proxy.ProtoV1ToIntPanic(v1.ProtocolTCP)) + front.m[markerKey] = nat.NewNATValue(0xdeadbeef, 1, 0, 0) + }) + + AfterEach(func() { + if p != nil { + p.Stop() + } + }) + + It("does not erase pre-existing real-host-IP NodePort FE entries during the bootstrap window", func() { + testSvc := &v1.Service{ + TypeMeta: typeMetaV1("Service"), + ObjectMeta: objectMetaV1("regression-12192-svc"), + Spec: v1.ServiceSpec{ + ClusterIP: "10.1.0.1", + Type: v1.ServiceTypeNodePort, + Selector: map[string]string{"app": "test"}, + Ports: []v1.ServicePort{{ + Protocol: v1.ProtocolTCP, + Port: 1234, + NodePort: int32(nodePort), + }}, + }, + } + testSvcEps := &discoveryv1.EndpointSlice{ + TypeMeta: typeMetaV1("EndpointSlice"), + ObjectMeta: objectMetaV1("regression-12192-svc"), + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{Addresses: []string{"10.1.2.1"}}}, + Ports: []discoveryv1.EndpointPort{{ + Port: ptr.To(int32(1234)), + Name: ptr.To("http"), + Protocol: ptr.To(v1.ProtocolTCP), + }}, + } + k8s := fake.NewClientset(testSvc, testSvcEps) + + markerKey := nat.NewNATKey(realHostIP, nodePort, proxy.ProtoV1ToIntPanic(v1.ProtocolTCP)) + + var err error + p, err = proxy.StartKubeProxy(k8s, "test-node", bpfMaps, + proxy.WithImmediateSync(), + proxy.WithMaglevLUTSize(maglevLUTSize), + ) + Expect(err).NotTo(HaveOccurred()) + + // Fire the host-metadata in-sync gate but NOT the host IPs. + // In the bootstrap window with the buggy code, the proxy is + // already constructed and informers are syncing — they'll + // trigger an Apply on the stub Syncer that erases the marker. + p.OnUpdate(&proto.HostMetadataV4V6Update{Hostname: "test-node"}) + Expect(p.CompleteDeferredWork()).To(Succeed()) + + // The marker must survive the bootstrap window. With + // WithImmediateSync the fake-client informers sync and + // schedule an Apply within tens of ms, so 500ms is a + // generous bound. 
+ Consistently(func() bool { + front.Lock() + defer front.Unlock() + _, ok := front.m[markerKey] + return ok + }, "500ms", "20ms").Should(BeTrue(), + "pre-existing (realHostIP, nodePort) FE entry was erased during the kube-proxy bootstrap window") + + // Now release the host-IPs gate. The proxy is constructed + // with real host IPs and informers fire their first Apply + // against a Syncer whose desired state includes the + // (realHostIP, nodePort) FE entry, so the entry stays + // (cachingmap updates it in place to the proxy-computed + // value). + p.OnHostIPsUpdate([]net.IP{realHostIP}) + + Eventually(func() bool { + front.Lock() + defer front.Unlock() + _, ok := front.m[markerKey] + return ok + }, "5s", "50ms").Should(BeTrue(), + "after host IPs propagated, real (realHostIP, nodePort) FE entry should remain") + }) +})