Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 61 additions & 40 deletions felix/bpf/proxy/kube-proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ func (kp *KubeProxy) Stop() {
close(kp.exiting)
close(kp.hostMetadataUpdates)
close(kp.hostIPUpdates)
kp.proxy.Stop()
if kp.proxy != nil {
// kp.proxy is nil if Stop is called before start() has
// received its initial host IPs and host-metadata updates.
kp.proxy.Stop()
}
Comment on lines +122 to +126

Copilot AI May 1, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new nil-check avoids a panic when Stop() is called before start() constructs kp.proxy, but it also makes it possible for Stop() to return without ever stopping a proxy that gets created concurrently shortly after this check (e.g., start() has already received initial updates and is about to assign kp.proxy). Consider preventing proxy construction once exiting is closed, or include the start() goroutine in kp.wg so Stop() waits for it to finish, ensuring a constructed proxy is always stopped.

Copilot uses AI. Check for mistakes.
kp.wg.Wait()
})
}
Expand Down Expand Up @@ -161,10 +165,29 @@ func (kp *KubeProxy) run(hostIPs []net.IP, hostMetadata map[string]*proto.HostMe
return errors.WithMessage(err, "new bpf syncer")
}

kp.proxy.SetHostIPs(hostIPs)
// Don't bother invoking a resync within SetHostMetadata; we will be syncing a fresh syncer right after.
kp.proxy.SetHostMetadata(hostMetadata, false)
kp.proxy.SetSyncer(syncer)
if kp.proxy == nil {
// First call from start(): construct the proxy with a syncer
// that already knows the real host IPs. proxy.New() spins up
// the k8s informer goroutines synchronously, so by the time
// they sync and trigger an Apply, the syncer's desired state
// will include all (realHostIP, nodePort) FE entries.
// Constructing the proxy any earlier risks an Apply against a
// syncer that lacks real host IPs, which would gut pre-existing
// (realHostIP, nodePort) NAT FE entries left by the previous
// Felix run and break external NodePort traffic. See #12192.
proxy, err := New(kp.k8s, syncer, kp.hostname, kp.opts...)
if err != nil {
return errors.WithMessage(err, "new proxy")
}
proxy.SetHostIPs(hostIPs)
proxy.SetHostMetadata(hostMetadata, false)
kp.proxy = proxy
} else {
kp.proxy.SetHostIPs(hostIPs)
// Don't bother invoking a resync within SetHostMetadata; we will be syncing a fresh syncer right after.
kp.proxy.SetHostMetadata(hostMetadata, false)
kp.proxy.SetSyncer(syncer)
}

log.Infof("kube-proxy v%d node info updated, hostname=%q hostIPs=%+v", kp.ipFamily, kp.hostname, hostIPs)

Expand All @@ -174,58 +197,56 @@ func (kp *KubeProxy) run(hostIPs []net.IP, hostMetadata map[string]*proto.HostMe
}

func (kp *KubeProxy) start() error {
var withLocalNP []net.IP
if kp.ipFamily == 4 {
withLocalNP = append(withLocalNP, podNPIP)
} else {
withLocalNP = append(withLocalNP, podNPIPV6)
}

syncer, err := NewSyncer(kp.ipFamily, withLocalNP, kp.frontendMap, kp.backendMap, kp.MaglevMap, kp.affinityMap, kp.rt, kp.excludedCIDRs, kp.maglevLUTSize)
if err != nil {
return errors.WithMessage(err, "new bpf syncer")
}

proxy, err := New(kp.k8s, syncer, kp.hostname, kp.opts...)
if err != nil {
return errors.WithMessage(err, "new proxy")
// Block until we have the first batch of host IPs AND the
// host-metadata in-sync signal (sent by CompleteDeferredWork after
// the int_dataplane has finished its first datastore-in-sync
// apply). Only then construct the proxy, via run(). proxy.New()
// kicks off the k8s informer goroutines synchronously; once those
// sync, they trigger Apply on the syncer. Constructing the proxy
// before we have real host IPs lets that Apply run against a
// syncer whose desired state lacks every (realHostIP, nodePort)
// FE entry, which then erases pre-existing entries left by the
// previous Felix run and breaks external NodePort traffic during
// the kube-proxy bootstrap window. See projectcalico/calico#12192.
var hostIPs []net.IP
select {
case ips, ok := <-kp.hostIPUpdates:
if !ok {
return nil
}
hostIPs = ips
case <-kp.exiting:
return nil
}

kp.lock.Lock()
kp.proxy = proxy
kp.syncer = syncer
kp.lock.Unlock()

// Wait for the initial update.
hostIPs := <-kp.hostIPUpdates

hostMetadata := make(map[string]*proto.HostMetadataV4V6Update)
// Block until we go in-sync and get the first batch of hostmetadata
// updates, to avoid a flap after a Felix restart. In practice, this
// recv should happen very soon after receiving the host IPs above.
hostMetadataUpdates := <-kp.hostMetadataUpdates
mergeHostMetadataV4V6Updates(hostMetadata, hostMetadataUpdates)
select {
case updates, ok := <-kp.hostMetadataUpdates:
if !ok {
return nil
}
mergeHostMetadataV4V6Updates(hostMetadata, updates)
case <-kp.exiting:
return nil
}

err = kp.run(hostIPs, hostMetadata)
if err != nil {
if err := kp.run(hostIPs, hostMetadata); err != nil {
return err
}

kp.wg.Go(func() {
for {
var ok bool
select {
case hostIPs, ok = <-kp.hostIPUpdates:
case hostIPs, ok := <-kp.hostIPUpdates:
if !ok {
log.Error("kube-proxy: hostIPUpdates closed")
return
}
err = kp.run(hostIPs, hostMetadata)
if err != nil {
if err := kp.run(hostIPs, hostMetadata); err != nil {
log.Panic("kube-proxy failed to resync after host IPs update")
}

case hostMetadataUpdates, ok = <-kp.hostMetadataUpdates:
case hostMetadataUpdates, ok := <-kp.hostMetadataUpdates:
if !ok {
log.Error("kube-proxy: hostMetadataUpdates closed")
return
Expand Down
125 changes: 125 additions & 0 deletions felix/bpf/proxy/kube-proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/projectcalico/calico/felix/bpf/bpfmap"
"github.com/projectcalico/calico/felix/bpf/conntrack"
"github.com/projectcalico/calico/felix/bpf/mock"
"github.com/projectcalico/calico/felix/bpf/nat"
proxy "github.com/projectcalico/calico/felix/bpf/proxy"
"github.com/projectcalico/calico/felix/proto"
)
Expand Down Expand Up @@ -158,3 +159,127 @@ var _ = Describe("BPF kube-proxy", func() {
})
})
})

// Regression for projectcalico/calico#12192. Felix restart on a node
// receiving NodePort traffic was breaking new TCP connections for ~500ms
// — the bootstrap window between kube-proxy starting up and receiving
// the first hostIPs. In that window, kube-proxy.go:start() used to
// construct the proxy with a stub Syncer (only podNPIP, no real host
// IPs). proxy.New() spins up the k8s informer goroutines synchronously;
// once they sync, an Apply runs against the stub Syncer. The cachingmap
// computes desired state without any (realHostIP, nodePort) FE entry
// and erases pre-existing real-host-IP NodePort FE entries left by the
// previous Felix run. New external TCP connections to the NodePort
// during the gap miss the FE map, ascend to the host stack with no
// listener, and get RST by the kernel.
//
// The fix defers proxy construction until both the first hostIPUpdates
// and the first hostMetadataUpdates have arrived — i.e. until run()
// is called with real host IPs. This test pre-populates the front map
// with a real-host-IP NodePort FE entry, fires the host-metadata
// in-sync gate but withholds the host IPs (mimicking the bootstrap
// window), and asserts the entry survives.
var _ = Describe("BPF kube-proxy bootstrap window — regression #12192", func() {
var (
bpfMaps *bpfmap.IPMaps
front *mockNATMap
p *proxy.KubeProxy
)

realHostIP := net.IPv4(10, 0, 0, 99)
const nodePort = uint16(30000)

BeforeEach(func() {
bpfMaps = new(bpfmap.IPMaps)
bpfMaps.FrontendMap = newMockNATMap()
bpfMaps.BackendMap = newMockNATBackendMap()
bpfMaps.AffinityMap = newMockAffinityMap()
bpfMaps.MaglevMap = newMockMaglevMap()
bpfMaps.CtMap = mock.NewMockMap(conntrack.MapParams)
front = bpfMaps.FrontendMap.(*mockNATMap)

// Marker entry simulating leftover state from the previous
// Felix run. Value is not meaningful — only presence matters.
markerKey := nat.NewNATKey(realHostIP, nodePort, proxy.ProtoV1ToIntPanic(v1.ProtocolTCP))
front.m[markerKey] = nat.NewNATValue(0xdeadbeef, 1, 0, 0)
})

AfterEach(func() {
if p != nil {
p.Stop()
}
})

It("does not erase pre-existing real-host-IP NodePort FE entries during the bootstrap window", func() {
testSvc := &v1.Service{
TypeMeta: typeMetaV1("Service"),
ObjectMeta: objectMetaV1("regression-12192-svc"),
Spec: v1.ServiceSpec{
ClusterIP: "10.1.0.1",
Type: v1.ServiceTypeNodePort,
Selector: map[string]string{"app": "test"},
Ports: []v1.ServicePort{{
Protocol: v1.ProtocolTCP,
Port: 1234,
NodePort: int32(nodePort),
}},
},
}
testSvcEps := &discoveryv1.EndpointSlice{
TypeMeta: typeMetaV1("EndpointSlice"),
ObjectMeta: objectMetaV1("regression-12192-svc"),
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{{Addresses: []string{"10.1.2.1"}}},
Ports: []discoveryv1.EndpointPort{{
Port: ptr.To(int32(1234)),
Name: ptr.To("http"),
Protocol: ptr.To(v1.ProtocolTCP),
}},
}
k8s := fake.NewClientset(testSvc, testSvcEps)

markerKey := nat.NewNATKey(realHostIP, nodePort, proxy.ProtoV1ToIntPanic(v1.ProtocolTCP))

var err error
p, err = proxy.StartKubeProxy(k8s, "test-node", bpfMaps,
proxy.WithImmediateSync(),
proxy.WithMaglevLUTSize(maglevLUTSize),
)
Expect(err).NotTo(HaveOccurred())

// Fire the host-metadata in-sync gate but NOT the host IPs.
// In the bootstrap window with the buggy code, the proxy is
// already constructed and informers are syncing — they'll
// trigger an Apply on the stub Syncer that erases the marker.
p.OnUpdate(&proto.HostMetadataV4V6Update{Hostname: "test-node"})
Expect(p.CompleteDeferredWork()).To(Succeed())

// The marker must survive the bootstrap window. With
// WithImmediateSync the fake-client informers sync and
// schedule an Apply within tens of ms, so 500ms is a
// generous bound.
Consistently(func() bool {
front.Lock()
defer front.Unlock()
_, ok := front.m[markerKey]
return ok
}, "500ms", "20ms").Should(BeTrue(),
"pre-existing (realHostIP, nodePort) FE entry was erased during the kube-proxy bootstrap window")

// Now release the host-IPs gate. The proxy is constructed
// with real host IPs and informers fire their first Apply
// against a Syncer whose desired state includes the
// (realHostIP, nodePort) FE entry, so the entry stays
// (cachingmap updates it in place to the proxy-computed
// value).
p.OnHostIPsUpdate([]net.IP{realHostIP})

Eventually(func() bool {
front.Lock()
defer front.Unlock()
_, ok := front.m[markerKey]
return ok
}, "5s", "50ms").Should(BeTrue(),
"after host IPs propagated, real (realHostIP, nodePort) FE entry should remain")
})
})
Loading