From a88351d4a0be878fe980efa83afec16bc4ba810b Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Tue, 10 Feb 2026 18:20:54 +0000 Subject: [PATCH 01/32] first draft hostmetadata_cache sends host metadata to BPF KP --- felix/bpf/proxy/kube-proxy.go | 53 ++++++++++++++-- felix/bpf/proxy/proxy.go | 13 ++++ felix/dataplane/linux/hostmetadata_cache.go | 68 +++++++++++++++++++++ felix/dataplane/linux/int_dataplane.go | 4 ++ 4 files changed, 133 insertions(+), 5 deletions(-) create mode 100644 felix/dataplane/linux/hostmetadata_cache.go diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index 349be047161..4b94a42406d 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -26,6 +26,7 @@ import ( "github.com/projectcalico/calico/felix/bpf/maps" "github.com/projectcalico/calico/felix/bpf/routes" "github.com/projectcalico/calico/felix/ip" + "github.com/projectcalico/calico/felix/proto" ) // KubeProxy is a wrapper of Proxy that deals with higher level issue like @@ -34,6 +35,8 @@ type KubeProxy struct { proxy ProxyFrontend syncer DPSyncer + hostMetadataUpdates chan map[string]*proto.HostMetadataV4V6Update + ipFamily int hostIPUpdates chan []net.IP stopOnce sync.Once @@ -104,13 +107,14 @@ func (kp *KubeProxy) Stop() { defer kp.lock.Unlock() close(kp.exiting) + close(kp.hostMetadataUpdates) close(kp.hostIPUpdates) kp.proxy.Stop() kp.wg.Wait() }) } -func (kp *KubeProxy) run(hostIPs []net.IP) error { +func (kp *KubeProxy) run(hostIPs []net.IP, hostMetadata map[string]*proto.HostMetadataV4V6Update) error { ips := make([]net.IP, 0, len(hostIPs)) for _, ip := range hostIPs { @@ -141,6 +145,7 @@ func (kp *KubeProxy) run(hostIPs []net.IP) error { } kp.proxy.SetHostIPs(hostIPs) + kp.proxy.SetHostMetadata(hostMetadata) kp.proxy.SetSyncer(syncer) log.Infof("kube-proxy v%d node info updated, hostname=%q hostIPs=%+v", kp.ipFamily, kp.hostname, hostIPs) @@ -173,26 +178,38 @@ func (kp *KubeProxy) start() error { kp.syncer = syncer kp.lock.Unlock() - // wait for the initial update + // Wait for the initial update. hostIPs := <-kp.hostIPUpdates + // Could be nil, initially. + hostMetadata := kp.checkHostMetadataV4V6Updates() - err = kp.run(hostIPs) + err = kp.run(hostIPs, hostMetadata) if err != nil { return err } kp.wg.Go(func() { for { + var ok bool select { - case hostIPs, ok := <-kp.hostIPUpdates: + case hostIPs, ok = <-kp.hostIPUpdates: if !ok { log.Error("kube-proxy: hostIPUpdates closed") return } - err = kp.run(hostIPs) + err = kp.run(hostIPs, hostMetadata) if err != nil { log.Panic("kube-proxy failed to resync after host IPs update") } + case hostMetadata, ok = <-kp.hostMetadataUpdates: + if !ok { + log.Error("kube-proxy: hostMetadataUpdates closed") + return + } + err = kp.run(hostIPs, hostMetadata) + if err != nil { + log.Panic("kube-proxy failed to resync after host metadata update") + } case <-kp.exiting: log.Info("kube-proxy: exiting") return @@ -221,6 +238,32 @@ func (kp *KubeProxy) OnHostIPsUpdate(IPs []net.IP) { log.Debugf("kube-proxy OnHostIPsUpdate: %+v", IPs) } +// OnHostMetadataV4V6Update is called by host-metadata cache after +// it goes in-sync (completeDeferredWork called). +func (kp *KubeProxy) OnHostMetadataV4V6Update(updates map[string]*proto.HostMetadataV4V6Update) { + select { + case kp.hostMetadataUpdates <- updates: + default: + select { + case <-kp.hostMetadataUpdates: + default: + } + kp.hostMetadataUpdates <- updates + } + if log.GetLevel() == log.DebugLevel { + log.WithField("updates", updates).Debug("kube-proxy HostMetadataV4V6Update cb fired") + } +} + +func (kp *KubeProxy) checkHostMetadataV4V6Updates() map[string]*proto.HostMetadataV4V6Update { + select { + case upd := <-kp.hostMetadataUpdates: + return upd + default: + return nil + } +} + // OnRouteUpdate should be used to update the internal state of routing tables func (kp *KubeProxy) OnRouteUpdate(k routes.KeyInterface, v routes.ValueInterface) { log.WithFields(log.Fields{"key": k, "value": v}).Debug("kube-proxy: OnRouteUpdate") diff --git a/felix/bpf/proxy/proxy.go b/felix/bpf/proxy/proxy.go index 3bd2aebb427..7fa7bb6b763 100644 --- a/felix/bpf/proxy/proxy.go +++ b/felix/bpf/proxy/proxy.go @@ -26,6 +26,7 @@ import ( "time" "github.com/pkg/errors" + "github.com/projectcalico/calico/felix/proto" log "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" @@ -58,6 +59,7 @@ type ProxyFrontend interface { Proxy SetSyncer(DPSyncer) SetHostIPs([]net.IP) + SetHostMetadata(map[string]*proto.HostMetadataV4V6Update) } // DPSyncerState groups the information passed to the DPSyncer's Apply @@ -95,6 +97,8 @@ type proxy struct { svcMap k8sp.ServicePortMap epsMap k8sp.EndpointsMap + hostMetadataByHostname map[string]*proto.HostMetadataV4V6Update + dpSyncer DPSyncer syncerLck sync.Mutex // executes periodic the dataplane updates @@ -147,6 +151,8 @@ func New(k8s kubernetes.Interface, dp DPSyncer, hostname string, opts ...Option) svcMap: make(k8sp.ServicePortMap), epsMap: make(k8sp.EndpointsMap), + hostMetadataByHostname: make(map[string]*proto.HostMetadataV4V6Update), + recorder: new(loggerRecorder), // TODO: revisit these default values. @@ -391,6 +397,13 @@ func (p *proxy) SetHostIPs(hostIPs []net.IP) { npa, p.healthzServer) } +func (p *proxy) SetHostMetadata(updates map[string]*proto.HostMetadataV4V6Update) { + p.runnerLck.Lock() + defer p.runnerLck.Unlock() + + p.hostMetadataByHostname = updates +} + func (p *proxy) IPFamily() discovery.AddressType { if p.ipFamily == 4 { return discovery.AddressTypeIPv4 diff --git a/felix/dataplane/linux/hostmetadata_cache.go b/felix/dataplane/linux/hostmetadata_cache.go new file mode 100644 index 00000000000..b75c6127873 --- /dev/null +++ b/felix/dataplane/linux/hostmetadata_cache.go @@ -0,0 +1,68 @@ +package intdataplane + +import ( + "maps" + "sync" + + "github.com/projectcalico/calico/felix/proto" +) + +type HostMetadataCache struct { + updates map[string]*proto.HostMetadataV4V6Update + inSync bool + + onHostUpdateCB func(map[string]*proto.HostMetadataV4V6Update) + cbLock sync.Mutex +} + +func NewHostMetadataCache() *HostMetadataCache { + return &HostMetadataCache{ + updates: make(map[string]*proto.HostMetadataV4V6Update), + } +} + +func (c *HostMetadataCache) CompleteDeferredWork() error { + c.inSync = true + c.sendAllUpdates() + return nil +} + +func (c *HostMetadataCache) OnUpdate(u any) { + switch upd := u.(type) { + case *proto.HostMetadataV4V6Update: + c.onHostMetadataV4V6Update(upd) + case *proto.HostMetadataV4V6Remove: + c.onHostMetadataV4V6Remove(upd) + default: + return + } + + if c.inSync { + c.sendAllUpdates() + } +} + +func (c *HostMetadataCache) sendAllUpdates() { + c.cbLock.Lock() + defer c.cbLock.Unlock() + if c.onHostUpdateCB != nil { + upds := make(map[string]*proto.HostMetadataV4V6Update) + maps.Copy(upds, c.updates) + c.onHostUpdateCB(upds) + } +} + +func (c *HostMetadataCache) onHostMetadataV4V6Update(u *proto.HostMetadataV4V6Update) { + c.updates[u.Hostname] = u +} + +func (c *HostMetadataCache) onHostMetadataV4V6Remove(u *proto.HostMetadataV4V6Remove) { + delete(c.updates, u.Hostname) +} + +func (c *HostMetadataCache) SetOnHostUpdateCB(cb func(map[string]*proto.HostMetadataV4V6Update)) { + c.cbLock.Lock() + defer c.cbLock.Unlock() + + c.onHostUpdateCB = cb +} diff --git a/felix/dataplane/linux/int_dataplane.go b/felix/dataplane/linux/int_dataplane.go index fb92f1a194f..fbc5f81fd57 100644 --- a/felix/dataplane/linux/int_dataplane.go +++ b/felix/dataplane/linux/int_dataplane.go @@ -2985,6 +2985,10 @@ func startBPFDataplaneComponents( log.WithError(err).Panic("Failed to start kube-proxy.") } + hostMetadataCache := NewHostMetadataCache() + hostMetadataCache.SetOnHostUpdateCB(kp.OnHostMetadataV4V6Update) + dp.RegisterManager(hostMetadataCache) + bpfRTMgr.setHostIPUpdatesCallBack(kp.OnHostIPsUpdate) bpfRTMgr.setRoutesCallBacks(kp.OnRouteUpdate, kp.OnRouteDelete) conntrackScanner.AddUnlocked(bpfconntrack.NewStaleNATScanner(kp)) From 9cd0f0c7143f99965e1c94db99c175dcf0655539 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Wed, 11 Feb 2026 09:46:21 +0000 Subject: [PATCH 02/32] appease linter --- felix/bpf/proxy/proxy.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/felix/bpf/proxy/proxy.go b/felix/bpf/proxy/proxy.go index 7fa7bb6b763..1e9fac8352b 100644 --- a/felix/bpf/proxy/proxy.go +++ b/felix/bpf/proxy/proxy.go @@ -26,7 +26,6 @@ import ( "time" "github.com/pkg/errors" - "github.com/projectcalico/calico/felix/proto" log "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" @@ -44,6 +43,8 @@ import ( "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/runner" "k8s.io/kubernetes/pkg/proxy/util" + + "github.com/projectcalico/calico/felix/proto" ) // Proxy watches for updates of Services and Endpoints, maintains their mapping From 31c395146b33e1553f7e3949354d8b87291a1760 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Wed, 11 Feb 2026 10:05:47 +0000 Subject: [PATCH 03/32] initialise updates chan --- felix/bpf/proxy/kube-proxy.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index 4b94a42406d..954dd772d6e 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -76,6 +76,8 @@ func StartKubeProxy(k8s kubernetes.Interface, hostname string, opts: opts, rt: NewRTCache(), + hostMetadataUpdates: make(chan map[string]*proto.HostMetadataV4V6Update, 1), + hostIPUpdates: make(chan []net.IP, 1), exiting: make(chan struct{}), } From 911257dbeb95b5dfc5bbd960004e88fce663cec0 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Wed, 11 Feb 2026 15:24:04 +0000 Subject: [PATCH 04/32] throttle hostmetadata_cache updates --- felix/dataplane/linux/hostmetadata_cache.go | 65 +++++++++++++++++---- 1 file changed, 53 insertions(+), 12 deletions(-) diff --git a/felix/dataplane/linux/hostmetadata_cache.go b/felix/dataplane/linux/hostmetadata_cache.go index b75c6127873..c24fc085405 100644 --- a/felix/dataplane/linux/hostmetadata_cache.go +++ b/felix/dataplane/linux/hostmetadata_cache.go @@ -3,61 +3,102 @@ package intdataplane import ( "maps" "sync" + "time" "github.com/projectcalico/calico/felix/proto" + "github.com/sirupsen/logrus" ) type HostMetadataCache struct { - updates map[string]*proto.HostMetadataV4V6Update - inSync bool + updates map[string]*proto.HostMetadataV4V6Update + updatesLock sync.Mutex + inSync bool onHostUpdateCB func(map[string]*proto.HostMetadataV4V6Update) cbLock sync.Mutex + + queue chan signal } +type signal struct{} + func NewHostMetadataCache() *HostMetadataCache { - return &HostMetadataCache{ + c := &HostMetadataCache{ updates: make(map[string]*proto.HostMetadataV4V6Update), + queue: make(chan signal, 1), } + + go c.loopFlushingUpdates() + return c } func (c *HostMetadataCache) CompleteDeferredWork() error { c.inSync = true - c.sendAllUpdates() + logrus.Debug("Now in sync") + c.requestUpdate() return nil } func (c *HostMetadataCache) OnUpdate(u any) { switch upd := u.(type) { case *proto.HostMetadataV4V6Update: + logrus.WithField("update", upd).Debug("Received HostMetadataV4V6Update message") c.onHostMetadataV4V6Update(upd) case *proto.HostMetadataV4V6Remove: + logrus.WithField("update", upd).Debug("Received HostMetadataV4V6Remove message") c.onHostMetadataV4V6Remove(upd) default: return } if c.inSync { + c.requestUpdate() + } +} + +func (c *HostMetadataCache) onHostMetadataV4V6Update(u *proto.HostMetadataV4V6Update) { + c.updates[u.Hostname] = u +} + +func (c *HostMetadataCache) onHostMetadataV4V6Remove(u *proto.HostMetadataV4V6Remove) { + delete(c.updates, u.Hostname) +} + +func (c *HostMetadataCache) requestUpdate() { + select { + case c.queue <- signal{}: + default: + } +} + +func (c *HostMetadataCache) loopFlushingUpdates() { + var timer *time.Timer + for { + <-c.queue c.sendAllUpdates() + + if timer == nil { + timer = time.NewTimer(time.Second) + } else { + _ = timer.Reset(time.Second) + } + <-timer.C } } -func (c *HostMetadataCache) sendAllUpdates() { +func (c *HostMetadataCache) sendAllUpdates() error { c.cbLock.Lock() defer c.cbLock.Unlock() + c.updatesLock.Lock() + defer c.updatesLock.Lock() + if c.onHostUpdateCB != nil { upds := make(map[string]*proto.HostMetadataV4V6Update) maps.Copy(upds, c.updates) c.onHostUpdateCB(upds) } -} -func (c *HostMetadataCache) onHostMetadataV4V6Update(u *proto.HostMetadataV4V6Update) { - c.updates[u.Hostname] = u -} - -func (c *HostMetadataCache) onHostMetadataV4V6Remove(u *proto.HostMetadataV4V6Remove) { - delete(c.updates, u.Hostname) + return nil } func (c *HostMetadataCache) SetOnHostUpdateCB(cb func(map[string]*proto.HostMetadataV4V6Update)) { From 96173ef54f2631fcb56b421dff19f34ff754047b Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Wed, 11 Feb 2026 15:58:31 +0000 Subject: [PATCH 05/32] additional logging in hostmetadata_cache --- felix/dataplane/linux/hostmetadata_cache.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/felix/dataplane/linux/hostmetadata_cache.go b/felix/dataplane/linux/hostmetadata_cache.go index c24fc085405..861f9bbf97f 100644 --- a/felix/dataplane/linux/hostmetadata_cache.go +++ b/felix/dataplane/linux/hostmetadata_cache.go @@ -5,8 +5,9 @@ import ( "sync" "time" - "github.com/projectcalico/calico/felix/proto" "github.com/sirupsen/logrus" + + "github.com/projectcalico/calico/felix/proto" ) type HostMetadataCache struct { @@ -33,9 +34,11 @@ func NewHostMetadataCache() *HostMetadataCache { } func (c *HostMetadataCache) CompleteDeferredWork() error { + if !c.inSync { + logrus.Debug("Now in sync") + } c.inSync = true - logrus.Debug("Now in sync") - c.requestUpdate() + c.requestFlush() return nil } @@ -44,6 +47,7 @@ func (c *HostMetadataCache) OnUpdate(u any) { case *proto.HostMetadataV4V6Update: logrus.WithField("update", upd).Debug("Received HostMetadataV4V6Update message") c.onHostMetadataV4V6Update(upd) + case *proto.HostMetadataV4V6Remove: logrus.WithField("update", upd).Debug("Received HostMetadataV4V6Remove message") c.onHostMetadataV4V6Remove(upd) @@ -52,7 +56,7 @@ func (c *HostMetadataCache) OnUpdate(u any) { } if c.inSync { - c.requestUpdate() + c.requestFlush() } } @@ -64,7 +68,7 @@ func (c *HostMetadataCache) onHostMetadataV4V6Remove(u *proto.HostMetadataV4V6Re delete(c.updates, u.Hostname) } -func (c *HostMetadataCache) requestUpdate() { +func (c *HostMetadataCache) requestFlush() { select { case c.queue <- signal{}: default: @@ -75,6 +79,7 @@ func (c *HostMetadataCache) loopFlushingUpdates() { var timer *time.Timer for { <-c.queue + logrus.Debug("Flushing throttled updates") c.sendAllUpdates() if timer == nil { From d9fb94f134ddf4f8498ef5a255abc710bee1ec44 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Thu, 12 Feb 2026 08:55:21 +0000 Subject: [PATCH 06/32] dont return anything from sendAllUpdates --- felix/dataplane/linux/hostmetadata_cache.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/felix/dataplane/linux/hostmetadata_cache.go b/felix/dataplane/linux/hostmetadata_cache.go index 861f9bbf97f..c8815878cbc 100644 --- a/felix/dataplane/linux/hostmetadata_cache.go +++ b/felix/dataplane/linux/hostmetadata_cache.go @@ -91,7 +91,7 @@ func (c *HostMetadataCache) loopFlushingUpdates() { } } -func (c *HostMetadataCache) sendAllUpdates() error { +func (c *HostMetadataCache) sendAllUpdates() { c.cbLock.Lock() defer c.cbLock.Unlock() c.updatesLock.Lock() @@ -102,8 +102,6 @@ func (c *HostMetadataCache) sendAllUpdates() error { maps.Copy(upds, c.updates) c.onHostUpdateCB(upds) } - - return nil } func (c *HostMetadataCache) SetOnHostUpdateCB(cb func(map[string]*proto.HostMetadataV4V6Update)) { From db4f05e87c1180ad350cc6b2ace0032fd32f2ed6 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Thu, 12 Feb 2026 11:33:49 +0000 Subject: [PATCH 07/32] makes the throttling interval configurable in hostmetadata_cache --- felix/dataplane/linux/hostmetadata_cache.go | 37 +++++++++++++++++---- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/felix/dataplane/linux/hostmetadata_cache.go b/felix/dataplane/linux/hostmetadata_cache.go index c8815878cbc..052f9e717f8 100644 --- a/felix/dataplane/linux/hostmetadata_cache.go +++ b/felix/dataplane/linux/hostmetadata_cache.go @@ -18,15 +18,23 @@ type HostMetadataCache struct { onHostUpdateCB func(map[string]*proto.HostMetadataV4V6Update) cbLock sync.Mutex - queue chan signal + queue chan signal + throttleInterval time.Duration } type signal struct{} -func NewHostMetadataCache() *HostMetadataCache { +type HostMetadataCacheOption func(*HostMetadataCache) + +func NewHostMetadataCache(opts ...HostMetadataCacheOption) *HostMetadataCache { c := &HostMetadataCache{ - updates: make(map[string]*proto.HostMetadataV4V6Update), - queue: make(chan signal, 1), + updates: make(map[string]*proto.HostMetadataV4V6Update), + queue: make(chan signal, 1), + throttleInterval: time.Second, + } + + for _, o := range opts { + o(c) } go c.loopFlushingUpdates() @@ -83,9 +91,9 @@ func (c *HostMetadataCache) loopFlushingUpdates() { c.sendAllUpdates() if timer == nil { - timer = time.NewTimer(time.Second) + timer = time.NewTimer(c.throttleInterval) } else { - _ = timer.Reset(time.Second) + _ = timer.Reset(c.throttleInterval) } <-timer.C } @@ -110,3 +118,20 @@ func (c *HostMetadataCache) SetOnHostUpdateCB(cb func(map[string]*proto.HostMeta c.onHostUpdateCB = cb } + +// SetThrottle implements the Throttled interface. +func (c *HostMetadataCache) SetThrottle(d time.Duration) { + c.throttleInterval = d +} + +// Throttled allows any module to use the 'with throttle interval' option. +// This felt prudent since dataplane manager options share scope with the whole dataplane pkg. +type Throttled interface { + SetThrottle(time.Duration) +} + +// OptWithThrottleInterval sets the throttling interval for any module +// that must regulate the period of an operation. +func OptWithThrottleInterval(d time.Duration) func(t Throttled) { + return func(t Throttled) { t.SetThrottle(d) } +} From a0a4dcdbc2526e5aebea3397c71aae567ace978a Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Thu, 12 Feb 2026 12:11:23 +0000 Subject: [PATCH 08/32] update HostMetadata without restarting Syncer --- felix/bpf/proxy/kube-proxy.go | 16 +++++++++++----- felix/bpf/proxy/proxy.go | 8 ++++++-- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index 954dd772d6e..d5a07c0da5a 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -116,6 +116,13 @@ func (kp *KubeProxy) Stop() { }) } +func (kp *KubeProxy) setProxyHostMetadata(hostMetadata map[string]*proto.HostMetadataV4V6Update) { + kp.lock.Lock() + defer kp.lock.Unlock() + + kp.proxy.SetHostMetadata(hostMetadata, true) +} + func (kp *KubeProxy) run(hostIPs []net.IP, hostMetadata map[string]*proto.HostMetadataV4V6Update) error { ips := make([]net.IP, 0, len(hostIPs)) @@ -147,7 +154,8 @@ func (kp *KubeProxy) run(hostIPs []net.IP, hostMetadata map[string]*proto.HostMe } kp.proxy.SetHostIPs(hostIPs) - kp.proxy.SetHostMetadata(hostMetadata) + // Don't bother invoking a resync within SetHostMetadata; we will be syncing a fresh syncer right after. + kp.proxy.SetHostMetadata(hostMetadata, false) kp.proxy.SetSyncer(syncer) log.Infof("kube-proxy v%d node info updated, hostname=%q hostIPs=%+v", kp.ipFamily, kp.hostname, hostIPs) @@ -208,10 +216,8 @@ func (kp *KubeProxy) start() error { log.Error("kube-proxy: hostMetadataUpdates closed") return } - err = kp.run(hostIPs, hostMetadata) - if err != nil { - log.Panic("kube-proxy failed to resync after host metadata update") - } + kp.setProxyHostMetadata(hostMetadata) + case <-kp.exiting: log.Info("kube-proxy: exiting") return diff --git a/felix/bpf/proxy/proxy.go b/felix/bpf/proxy/proxy.go index 1e9fac8352b..3ddf5fe1254 100644 --- a/felix/bpf/proxy/proxy.go +++ b/felix/bpf/proxy/proxy.go @@ -60,7 +60,7 @@ type ProxyFrontend interface { Proxy SetSyncer(DPSyncer) SetHostIPs([]net.IP) - SetHostMetadata(map[string]*proto.HostMetadataV4V6Update) + SetHostMetadata(updates map[string]*proto.HostMetadataV4V6Update, requestResync bool) } // DPSyncerState groups the information passed to the DPSyncer's Apply @@ -398,11 +398,15 @@ func (p *proxy) SetHostIPs(hostIPs []net.IP) { npa, p.healthzServer) } -func (p *proxy) SetHostMetadata(updates map[string]*proto.HostMetadataV4V6Update) { +func (p *proxy) SetHostMetadata(updates map[string]*proto.HostMetadataV4V6Update, requestResync bool) { p.runnerLck.Lock() defer p.runnerLck.Unlock() p.hostMetadataByHostname = updates + if requestResync { + // Invoke a sync via the runner, so that we can release any locks in this goroutine. + p.syncDP() + } } func (p *proxy) IPFamily() discovery.AddressType { From 8c3bc7a4d70d9ba29602d9382518e9f2a2c65b98 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Thu, 12 Feb 2026 12:12:13 +0000 Subject: [PATCH 09/32] rename a channel to be more descriptive --- felix/dataplane/linux/hostmetadata_cache.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/felix/dataplane/linux/hostmetadata_cache.go b/felix/dataplane/linux/hostmetadata_cache.go index 052f9e717f8..9aed639964e 100644 --- a/felix/dataplane/linux/hostmetadata_cache.go +++ b/felix/dataplane/linux/hostmetadata_cache.go @@ -18,7 +18,7 @@ type HostMetadataCache struct { onHostUpdateCB func(map[string]*proto.HostMetadataV4V6Update) cbLock sync.Mutex - queue chan signal + updateRequest chan signal throttleInterval time.Duration } @@ -29,7 +29,7 @@ type HostMetadataCacheOption func(*HostMetadataCache) func NewHostMetadataCache(opts ...HostMetadataCacheOption) *HostMetadataCache { c := &HostMetadataCache{ updates: make(map[string]*proto.HostMetadataV4V6Update), - queue: make(chan signal, 1), + updateRequest: make(chan signal, 1), throttleInterval: time.Second, } @@ -78,7 +78,7 @@ func (c *HostMetadataCache) onHostMetadataV4V6Remove(u *proto.HostMetadataV4V6Re func (c *HostMetadataCache) requestFlush() { select { - case c.queue <- signal{}: + case c.updateRequest <- signal{}: default: } } @@ -86,7 +86,7 @@ func (c *HostMetadataCache) requestFlush() { func (c *HostMetadataCache) loopFlushingUpdates() { var timer *time.Timer for { - <-c.queue + <-c.updateRequest logrus.Debug("Flushing throttled updates") c.sendAllUpdates() From 4770c547b15c4471abdde037f39b3847f9bcc3ea Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Fri, 13 Feb 2026 17:07:05 +0000 Subject: [PATCH 10/32] adds testing for hostmetadatacache --- felix/dataplane/linux/hostmetadata_cache.go | 66 ++++++--- .../linux/hostmetadata_cache_test.go | 135 ++++++++++++++++++ felix/dataplane/linux/int_dataplane.go | 1 + 3 files changed, 181 insertions(+), 21 deletions(-) create mode 100644 felix/dataplane/linux/hostmetadata_cache_test.go diff --git a/felix/dataplane/linux/hostmetadata_cache.go b/felix/dataplane/linux/hostmetadata_cache.go index 9aed639964e..bcce1d6b589 100644 --- a/felix/dataplane/linux/hostmetadata_cache.go +++ b/felix/dataplane/linux/hostmetadata_cache.go @@ -20,24 +20,35 @@ type HostMetadataCache struct { updateRequest chan signal throttleInterval time.Duration + newTimerFn NewResettableTimerFunc } type signal struct{} type HostMetadataCacheOption func(*HostMetadataCache) +// OptWithThrottleInterval sets the throttling interval for update flushes. +func OptWithThrottleInterval(d time.Duration) func(t *HostMetadataCache) { + return func(t *HostMetadataCache) { t.throttleInterval = d } +} + +// OptWithNewTimerFn sets the ResettableTimer in the HostMetadataCache. +func OptWithNewTimerFn(f NewResettableTimerFunc) func(t *HostMetadataCache) { + return func(t *HostMetadataCache) { t.newTimerFn = f } +} + func NewHostMetadataCache(opts ...HostMetadataCacheOption) *HostMetadataCache { c := &HostMetadataCache{ updates: make(map[string]*proto.HostMetadataV4V6Update), updateRequest: make(chan signal, 1), throttleInterval: time.Second, + newTimerFn: NewRealTimer, } for _, o := range opts { o(c) } - go c.loopFlushingUpdates() return c } @@ -69,10 +80,14 @@ func (c *HostMetadataCache) OnUpdate(u any) { } func (c *HostMetadataCache) onHostMetadataV4V6Update(u *proto.HostMetadataV4V6Update) { + c.updatesLock.Lock() + defer c.updatesLock.Unlock() c.updates[u.Hostname] = u } func (c *HostMetadataCache) onHostMetadataV4V6Remove(u *proto.HostMetadataV4V6Remove) { + c.updatesLock.Lock() + defer c.updatesLock.Unlock() delete(c.updates, u.Hostname) } @@ -83,19 +98,22 @@ func (c *HostMetadataCache) requestFlush() { } } +// Start spawns a new goroutine for the cache to flush updates. +func (c *HostMetadataCache) Start() { + go c.loopFlushingUpdates() +} + +// loopFlushingUpdates flushes updates indefinitely at most once every c.throttleInterval. +// Intended to be run on its own goroutine. func (c *HostMetadataCache) loopFlushingUpdates() { - var timer *time.Timer + timer := c.newTimerFn(c.throttleInterval) for { + // One update should get through immediately before timers can delay things. <-c.updateRequest - logrus.Debug("Flushing throttled updates") + logrus.Debug("Flushing host metadata cached updates") c.sendAllUpdates() - - if timer == nil { - timer = time.NewTimer(c.throttleInterval) - } else { - _ = timer.Reset(c.throttleInterval) - } - <-timer.C + _ = timer.Reset(c.throttleInterval) + <-timer.Chan() } } @@ -119,19 +137,25 @@ func (c *HostMetadataCache) SetOnHostUpdateCB(cb func(map[string]*proto.HostMeta c.onHostUpdateCB = cb } -// SetThrottle implements the Throttled interface. -func (c *HostMetadataCache) SetThrottle(d time.Duration) { - c.throttleInterval = d +// wrappedRealTimer implements the ResettableTimer interface. +// time.Timer requires a method Chan to return timer.C when wrapped in an interface. +type wrappedRealTimer struct { + *time.Timer } -// Throttled allows any module to use the 'with throttle interval' option. -// This felt prudent since dataplane manager options share scope with the whole dataplane pkg. -type Throttled interface { - SetThrottle(time.Duration) +// Chan implements the ResettableTimer interface. +func (t wrappedRealTimer) Chan() <-chan time.Time { + return t.C } -// OptWithThrottleInterval sets the throttling interval for any module -// that must regulate the period of an operation. -func OptWithThrottleInterval(d time.Duration) func(t Throttled) { - return func(t Throttled) { t.SetThrottle(d) } +// NewRealTimer returns a time.Timer, wrapped to implement the ResettableTimer interface. +func NewRealTimer(d time.Duration) ResettableTimer { + return wrappedRealTimer{time.NewTimer(d)} } + +type ResettableTimer interface { + Reset(time.Duration) bool + Chan() <-chan time.Time +} + +type NewResettableTimerFunc func(time.Duration) ResettableTimer diff --git a/felix/dataplane/linux/hostmetadata_cache_test.go b/felix/dataplane/linux/hostmetadata_cache_test.go new file mode 100644 index 00000000000..29be9188a23 --- /dev/null +++ b/felix/dataplane/linux/hostmetadata_cache_test.go @@ -0,0 +1,135 @@ +package intdataplane + +import ( + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/projectcalico/calico/felix/proto" +) + +type fakeResettableTimer struct { + C chan time.Time +} + +func newFakeResettableTimer() *fakeResettableTimer { + t := &fakeResettableTimer{ + C: make(chan time.Time), + } + + return t +} + +func (t *fakeResettableTimer) Chan() <-chan time.Time { + return t.C +} + +func (t *fakeResettableTimer) Reset(time.Duration) bool { + // Maybe drain a pending alarm. + select { + case <-t.C: + return false + default: + return true + } +} + +// Pretend the given duration has passed and pop the timer. +func (t *fakeResettableTimer) Fire() { + t.C <- time.Now() +} + +var _ = Describe("HostMetadataCache UTs", func() { + It("should flush the first set of updates without waiting for the throttling timer, and then wait for the timer for subsequent flushes", func() { + Expect(0) + // Run the metadata cache with a fake timer. + timer := newFakeResettableTimer() + newFakeTimerFn := func(t time.Duration) ResettableTimer { + _ = timer.Reset(t) + return timer + } + + // Updates will be sent to us from another goroutine. + // We need thread-safety to receive them back on this thread. + updatesC := make(chan map[string]*proto.HostMetadataV4V6Update, 1) + checkForUpdates := func() map[string]*proto.HostMetadataV4V6Update { + select { + case u := <-updatesC: + return u + default: + return nil + } + } + + // The updates callback will write the running total of dropped + // updates since the channel was last consumed. + // I.e. if an old update was never consumed, it would be overwritten + // (dropped) by a fresher update, and the dropped counter would go up. + // If we receive droppedC to read the number of drops, the total resets to 0. + // droppedC will block any receives when updates haven't been sent on updatesC. + droppedC := make(chan int, 1) + // This callback is written with the assumption that only one goroutine call call it + // at any given time. + updatesCallback := func(u map[string]*proto.HostMetadataV4V6Update) { + droppedUpdate := false + select { + // Drain/drop old update if necessary. + case <-updatesC: + droppedUpdate = true + // Otherwise, send new update. + case updatesC <- u: + } + + // Update the number of dropped updates. + numDropped := 0 + if droppedUpdate { + numDropped = 1 + } + select { + case droppedC <- numDropped: + case lastNumDropped := <-droppedC: + droppedC <- lastNumDropped + numDropped + } + } + + cacheT := NewHostMetadataCache(OptWithThrottleInterval(1*time.Second), OptWithNewTimerFn(newFakeTimerFn)) + cacheT.SetOnHostUpdateCB(updatesCallback) + cacheT.Start() + + update := &proto.HostMetadataV4V6Update{ + Hostname: "hn1", + Ipv4Addr: "1.2.3.4", + Labels: map[string]string{"label1": "label1val"}, + } + + // Fill the cache with updates. + cacheT.OnUpdate(update) + + // Ensure it *doesn't* flush the updates. + Consistently(checkForUpdates).Should(BeNil(), "Cache prematurely fired updates") + + // Call CompleteDeferredWork. + err := cacheT.CompleteDeferredWork() + Expect(err).NotTo(HaveOccurred()) + + // Ensure it *does* flush the updates. + Eventually(checkForUpdates).Should(Equal(map[string]*proto.HostMetadataV4V6Update{update.Hostname: update})) + + // Fill up cache with more updates - remove existing update and add new ones. + cacheT.OnUpdate(&proto.HostMetadataV4V6Remove{Hostname: update.Hostname}) + update.Hostname = "hn2" + update.Ipv4Addr = "5.6.7.8" + update.Labels = map[string]string{"label2": "label2val"} + cacheT.OnUpdate(update) + + // Ensure it *doesn't* flush the updates. + Consistently(checkForUpdates).Should(BeNil(), "Cache did not wait for timer before flushing") + + // Pop the fake timer. + timer.Fire() + + // Ensure it *does* flush the updates. + Eventually(checkForUpdates).Should(Equal(map[string]*proto.HostMetadataV4V6Update{update.Hostname: update})) + Expect(droppedC).Should(Receive(Equal(0))) + }) +}) diff --git a/felix/dataplane/linux/int_dataplane.go b/felix/dataplane/linux/int_dataplane.go index fbc5f81fd57..c8e47aaf9ae 100644 --- a/felix/dataplane/linux/int_dataplane.go +++ b/felix/dataplane/linux/int_dataplane.go @@ -2988,6 +2988,7 @@ func startBPFDataplaneComponents( hostMetadataCache := NewHostMetadataCache() hostMetadataCache.SetOnHostUpdateCB(kp.OnHostMetadataV4V6Update) dp.RegisterManager(hostMetadataCache) + hostMetadataCache.Start() bpfRTMgr.setHostIPUpdatesCallBack(kp.OnHostIPsUpdate) bpfRTMgr.setRoutesCallBacks(kp.OnRouteUpdate, kp.OnRouteDelete) From f934bb417069685cca891e9640b979f4dd1aa8bd Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Mon, 16 Feb 2026 17:12:39 +0000 Subject: [PATCH 11/32] remove host-metadata-cache; throttle isnt as necessary because syncer no longer restarts on-update --- felix/bpf/proxy/kube-proxy.go | 74 ++++-- felix/bpf/proxy/kube-proxy_internal_test.go | 250 ++++++++++++++++++ felix/dataplane/linux/hostmetadata_cache.go | 161 ----------- .../linux/hostmetadata_cache_test.go | 135 ---------- felix/dataplane/linux/int_dataplane.go | 5 +- 5 files changed, 307 insertions(+), 318 deletions(-) delete mode 100644 felix/dataplane/linux/hostmetadata_cache.go delete mode 100644 felix/dataplane/linux/hostmetadata_cache_test.go diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index d5a07c0da5a..cbd2ae924c6 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -35,7 +35,10 @@ type KubeProxy struct { proxy ProxyFrontend syncer DPSyncer - hostMetadataUpdates chan map[string]*proto.HostMetadataV4V6Update + // Map is hostname to an update/remove msg. + // The size-1 channel allows events to enter the main loop, + // and allows repeated updates to be merged. + hostMetadataUpdates chan map[string]any ipFamily int hostIPUpdates chan []net.IP @@ -76,7 +79,7 @@ func StartKubeProxy(k8s kubernetes.Interface, hostname string, opts: opts, rt: NewRTCache(), - hostMetadataUpdates: make(chan map[string]*proto.HostMetadataV4V6Update, 1), + hostMetadataUpdates: make(chan map[string]any, 1), hostIPUpdates: make(chan []net.IP, 1), exiting: make(chan struct{}), @@ -190,8 +193,11 @@ func (kp *KubeProxy) start() error { // Wait for the initial update. hostIPs := <-kp.hostIPUpdates + // Could be nil, initially. - hostMetadata := kp.checkHostMetadataV4V6Updates() + hostMetadataUpdates := kp.recvHostMetadataV4V6Updates() + var hostMetadata map[string]*proto.HostMetadataV4V6Update + mergeHostMetadataV4V6Updates(hostMetadata, hostMetadataUpdates) err = kp.run(hostIPs, hostMetadata) if err != nil { @@ -211,11 +217,12 @@ func (kp *KubeProxy) start() error { if err != nil { log.Panic("kube-proxy failed to resync after host IPs update") } - case hostMetadata, ok = <-kp.hostMetadataUpdates: + case hostMetadataUpdates, ok = <-kp.hostMetadataUpdates: if !ok { log.Error("kube-proxy: hostMetadataUpdates closed") return } + mergeHostMetadataV4V6Updates(hostMetadata, hostMetadataUpdates) kp.setProxyHostMetadata(hostMetadata) case <-kp.exiting: @@ -246,24 +253,41 @@ func (kp *KubeProxy) OnHostIPsUpdate(IPs []net.IP) { log.Debugf("kube-proxy OnHostIPsUpdate: %+v", IPs) } -// OnHostMetadataV4V6Update is called by host-metadata cache after -// it goes in-sync (completeDeferredWork called). -func (kp *KubeProxy) OnHostMetadataV4V6Update(updates map[string]*proto.HostMetadataV4V6Update) { - select { - case kp.hostMetadataUpdates <- updates: - default: - select { - case <-kp.hostMetadataUpdates: - default: - } - kp.hostMetadataUpdates <- updates +// OnUpdate implements the manager interface. +func (kp *KubeProxy) OnUpdate(msg any) { + // Drain any pre-existing msg first and merge. + updates := kp.recvHostMetadataV4V6Updates() + if updates == nil { + updates = make(map[string]any) } - if log.GetLevel() == log.DebugLevel { - log.WithField("updates", updates).Debug("kube-proxy HostMetadataV4V6Update cb fired") + + switch update := msg.(type) { + case *proto.HostMetadataV4V6Update: + updates[update.Hostname] = update + log.Debugf("kube-proxy OnUpdate: host metadata update for %q: %+v", update.Hostname, update) + case *proto.HostMetadataV4V6Remove: + if u, ok := updates[update.Hostname]; ok { + delete(updates, update.Hostname) + log.Debugf("kube-proxy OnUpdate: coalesce host metadata deletion for queued update (deleted update) %q: %+v", update.Hostname, u) + } else { + log.WithField("remove", update).Debug("No existing update in queue, sending remove event") + updates[update.Hostname] = msg + } } + + // Send the now-merged updates back down the channel. + log.Debug("Queueing new hostmetadata for main loop") + kp.hostMetadataUpdates <- updates + log.Debug("Successfully queued new hostmetadata") +} + +// CompleteDeferredWork implements the manager interface. +func (kp *KubeProxy) CompleteDeferredWork() error { + return nil } -func (kp *KubeProxy) checkHostMetadataV4V6Updates() map[string]*proto.HostMetadataV4V6Update { +// recvHostMetadataV4V6Updates tries to read a pending host metadata update on the update channel. +func (kp *KubeProxy) recvHostMetadataV4V6Updates() map[string]any { select { case upd := <-kp.hostMetadataUpdates: return upd @@ -272,6 +296,20 @@ func (kp *KubeProxy) checkHostMetadataV4V6Updates() map[string]*proto.HostMetada } } +// mergeHostMetadataV4V6Updates merges the existing host metadata updates with the latest updates: +// - A 'remove' in latest delete the corresponding key in 'existing'. +// - An 'update' in latest overrides the corresponding key in 'existing'. +func mergeHostMetadataV4V6Updates(existing map[string]*proto.HostMetadataV4V6Update, latest map[string]any) { + for k, v := range latest { + switch update := v.(type) { + case *proto.HostMetadataV4V6Update: + existing[k] = update + case *proto.HostMetadataV4V6Remove: + delete(existing, k) + } + } +} + // OnRouteUpdate should be used to update the internal state of routing tables func (kp *KubeProxy) OnRouteUpdate(k routes.KeyInterface, v routes.ValueInterface) { log.WithFields(log.Fields{"key": k, "value": v}).Debug("kube-proxy: OnRouteUpdate") diff --git a/felix/bpf/proxy/kube-proxy_internal_test.go b/felix/bpf/proxy/kube-proxy_internal_test.go index 0e59e331ad0..be7e9a0197c 100644 --- a/felix/bpf/proxy/kube-proxy_internal_test.go +++ b/felix/bpf/proxy/kube-proxy_internal_test.go @@ -17,6 +17,16 @@ package proxy import ( "net" "testing" + "time" + + . "github.com/onsi/gomega" + "k8s.io/client-go/kubernetes/fake" + + "github.com/projectcalico/calico/felix/bpf/bpfmap" + "github.com/projectcalico/calico/felix/bpf/conntrack" + "github.com/projectcalico/calico/felix/bpf/mock" + "github.com/projectcalico/calico/felix/bpf/nat" + "github.com/projectcalico/calico/felix/proto" ) // The main suite of tests in kube-proxy_test.go use a real syncer, making it @@ -71,3 +81,243 @@ func (s *mockSyncer) ConntrackFrontendHasBackend(ip net.IP, port uint16, backend func (s *mockSyncer) ConntrackDestIsService(ip net.IP, port uint16, proto uint8) bool { return true } + +func TestMergeHostMetadataV4V6Updates_AppliesUpdates(t *testing.T) { + existing := map[string]*proto.HostMetadataV4V6Update{} + latest := map[string]any{ + "host1": &proto.HostMetadataV4V6Update{Hostname: "host1", Ipv4Addr: "1.1.1.1"}, + "host2": &proto.HostMetadataV4V6Update{Hostname: "host2", Ipv4Addr: "2.2.2.2"}, + } + mergeHostMetadataV4V6Updates(existing, latest) + + if len(existing) != 2 { + t.Fatalf("expected 2 entries, got %d", len(existing)) + } + if existing["host1"].Ipv4Addr != "1.1.1.1" { + t.Errorf("host1 Ipv4Addr = %q, want %q", existing["host1"].Ipv4Addr, "1.1.1.1") + } + if existing["host2"].Ipv4Addr != "2.2.2.2" { + t.Errorf("host2 Ipv4Addr = %q, want %q", existing["host2"].Ipv4Addr, "2.2.2.2") + } +} + +func TestMergeHostMetadataV4V6Updates_OverridesExisting(t *testing.T) { + existing := map[string]*proto.HostMetadataV4V6Update{ + "host1": {Hostname: "host1", Ipv4Addr: "1.1.1.1"}, + } + latest := map[string]any{ + "host1": &proto.HostMetadataV4V6Update{Hostname: "host1", Ipv4Addr: "9.9.9.9"}, + } + mergeHostMetadataV4V6Updates(existing, latest) + + if existing["host1"].Ipv4Addr != "9.9.9.9" { + t.Errorf("host1 Ipv4Addr = %q, want %q", existing["host1"].Ipv4Addr, "9.9.9.9") + } +} + +func TestMergeHostMetadataV4V6Updates_RemovesExisting(t *testing.T) { + existing := map[string]*proto.HostMetadataV4V6Update{ + "host1": {Hostname: "host1", Ipv4Addr: "1.1.1.1"}, + "host2": {Hostname: "host2", Ipv4Addr: "2.2.2.2"}, + } + latest := map[string]any{ + "host1": &proto.HostMetadataV4V6Remove{Hostname: "host1"}, + } + mergeHostMetadataV4V6Updates(existing, latest) + + if len(existing) != 1 { + t.Fatalf("expected 1 entry, got %d", len(existing)) + } + if _, ok := existing["host1"]; ok { + t.Error("host1 should have been removed") + } + if existing["host2"].Ipv4Addr != "2.2.2.2" { + t.Errorf("host2 should be unchanged, got %q", existing["host2"].Ipv4Addr) + } +} + +func TestMergeHostMetadataV4V6Updates_MixedUpdatesAndRemoves(t *testing.T) { + existing := map[string]*proto.HostMetadataV4V6Update{ + "host1": {Hostname: "host1", Ipv4Addr: "1.1.1.1"}, + "host2": {Hostname: "host2", Ipv4Addr: "2.2.2.2"}, + } + latest := map[string]any{ + "host1": &proto.HostMetadataV4V6Remove{Hostname: "host1"}, + "host2": &proto.HostMetadataV4V6Update{Hostname: "host2", Ipv4Addr: "5.5.5.5"}, + "host3": &proto.HostMetadataV4V6Update{Hostname: "host3", Ipv4Addr: "3.3.3.3"}, + } + mergeHostMetadataV4V6Updates(existing, latest) + + if len(existing) != 2 { + t.Fatalf("expected 2 entries, got %d", len(existing)) + } + if _, ok := existing["host1"]; ok { + t.Error("host1 should have been removed") + } + if existing["host2"].Ipv4Addr != "5.5.5.5" { + t.Errorf("host2 Ipv4Addr = %q, want %q", existing["host2"].Ipv4Addr, "5.5.5.5") + } + if existing["host3"].Ipv4Addr != "3.3.3.3" { + t.Errorf("host3 Ipv4Addr = %q, want %q", existing["host3"].Ipv4Addr, "3.3.3.3") + } +} + +func TestOnUpdateQueuesHostMetadataUpdates(t *testing.T) { + kp := KubeProxy{ + hostMetadataUpdates: make(chan map[string]any, 1), + } + + update := &proto.HostMetadataV4V6Update{ + Hostname: "hn1", + Ipv4Addr: "1.2.3.4", + Labels: map[string]string{"label1": "label1val"}, + } + kp.OnUpdate(update) + + // Read the queued update from the channel. + updates := kp.recvHostMetadataV4V6Updates() + if updates == nil { + t.Fatal("expected updates on channel, got nil") + } + if len(updates) != 1 { + t.Fatalf("expected 1 update, got %d", len(updates)) + } + if _, ok := updates["hn1"].(*proto.HostMetadataV4V6Update); !ok { + t.Fatalf("expected *proto.HostMetadataV4V6Update, got %T", updates["hn1"]) + } +} + +func TestOnUpdateCoalescesRemoveWithPendingUpdate(t *testing.T) { + kp := KubeProxy{ + hostMetadataUpdates: make(chan map[string]any, 1), + } + + // Queue an update, then an immediate remove for the same hostname. + kp.OnUpdate(&proto.HostMetadataV4V6Update{ + Hostname: "hn1", + Ipv4Addr: "1.2.3.4", + }) + kp.OnUpdate(&proto.HostMetadataV4V6Remove{Hostname: "hn1"}) + + updates := kp.recvHostMetadataV4V6Updates() + if updates == nil { + t.Fatal("expected updates on channel, got nil") + } + // The remove should have deleted the pending update for "hn1". + if len(updates) != 0 { + t.Fatalf("expected 0 entries after remove coalesced pending update, got %d: %+v", len(updates), updates) + } +} + +func TestOnUpdateQueuesRemoveWhenNoPendingUpdate(t *testing.T) { + kp := KubeProxy{ + hostMetadataUpdates: make(chan map[string]any, 1), + } + + // Queue a remove without a prior update. + kp.OnUpdate(&proto.HostMetadataV4V6Remove{Hostname: "hn1"}) + + updates := kp.recvHostMetadataV4V6Updates() + if updates == nil { + t.Fatal("expected updates on channel, got nil") + } + if len(updates) != 1 { + t.Fatalf("expected 1 entry, got %d", len(updates)) + } + if _, ok := updates["hn1"].(*proto.HostMetadataV4V6Remove); !ok { + t.Fatalf("expected *proto.HostMetadataV4V6Remove, got %T", updates["hn1"]) + } +} + +// startTestKubeProxy starts a real KubeProxy backed by mock maps and a fake +// k8s client, then sends an initial host IP to unblock the start loop. +// The caller must call kp.Stop() when done. +func startTestKubeProxy(t *testing.T) *KubeProxy { + t.Helper() + + maps := &bpfmap.IPMaps{ + FrontendMap: mock.NewMockMap(nat.FrontendMapParameters), + BackendMap: mock.NewMockMap(nat.BackendMapParameters), + AffinityMap: mock.NewMockMap(nat.AffinityMapParameters), + MaglevMap: mock.NewMockMap(nat.MaglevMapParameters), + CtMap: mock.NewMockMap(conntrack.MapParams), + } + + k8s := fake.NewClientset() + kp, err := StartKubeProxy(k8s, "test-node", maps, WithImmediateSync()) + if err != nil { + t.Fatalf("StartKubeProxy failed: %v", err) + } + + // Unblock the start loop by providing initial host IPs. + kp.OnHostIPsUpdate([]net.IP{net.IPv4(1, 1, 1, 1)}) + + return kp +} + +// getProxyHostMetadata reads the proxy's hostMetadataByHostname field +// under the runner lock. Safe to call from Eventually/Consistently. +func getProxyHostMetadata(kp *KubeProxy) func() map[string]*proto.HostMetadataV4V6Update { + return func() map[string]*proto.HostMetadataV4V6Update { + p := kp.proxy.(*proxy) + p.runnerLck.Lock() + defer p.runnerLck.Unlock() + + result := make(map[string]*proto.HostMetadataV4V6Update, len(p.hostMetadataByHostname)) + for k, v := range p.hostMetadataByHostname { + result[k] = v + } + return result + } +} + +func TestKubeProxyPropagatesHostMetadataUpdate(t *testing.T) { + RegisterTestingT(t) + + kp := startTestKubeProxy(t) + defer kp.Stop() + + update := &proto.HostMetadataV4V6Update{ + Hostname: "hn1", + Ipv4Addr: "1.2.3.4", + Labels: map[string]string{"label1": "label1val"}, + } + + kp.OnUpdate(update) + + Eventually(getProxyHostMetadata(kp), 2*time.Second, 10*time.Millisecond). + Should(HaveKeyWithValue("hn1", update)) +} + +func TestKubeProxyPropagatesHostMetadataRemove(t *testing.T) { + RegisterTestingT(t) + + kp := startTestKubeProxy(t) + defer kp.Stop() + + update := &proto.HostMetadataV4V6Update{ + Hostname: "hn1", + Ipv4Addr: "1.2.3.4", + Labels: map[string]string{"label1": "label1val"}, + } + + // First, apply an update. + kp.OnUpdate(update) + Eventually(getProxyHostMetadata(kp), 2*time.Second, 10*time.Millisecond). + Should(HaveKeyWithValue("hn1", update)) + + // Remove hn1 and add hn2. + kp.OnUpdate(&proto.HostMetadataV4V6Remove{Hostname: "hn1"}) + update2 := &proto.HostMetadataV4V6Update{ + Hostname: "hn2", + Ipv4Addr: "5.6.7.8", + Labels: map[string]string{"label2": "label2val"}, + } + kp.OnUpdate(update2) + + Eventually(getProxyHostMetadata(kp), 2*time.Second, 10*time.Millisecond). + Should(And( + Not(HaveKey("hn1")), + HaveKeyWithValue("hn2", update2), + )) +} diff --git a/felix/dataplane/linux/hostmetadata_cache.go b/felix/dataplane/linux/hostmetadata_cache.go deleted file mode 100644 index bcce1d6b589..00000000000 --- a/felix/dataplane/linux/hostmetadata_cache.go +++ /dev/null @@ -1,161 +0,0 @@ -package intdataplane - -import ( - "maps" - "sync" - "time" - - "github.com/sirupsen/logrus" - - "github.com/projectcalico/calico/felix/proto" -) - -type HostMetadataCache struct { - updates map[string]*proto.HostMetadataV4V6Update - updatesLock sync.Mutex - inSync bool - - onHostUpdateCB func(map[string]*proto.HostMetadataV4V6Update) - cbLock sync.Mutex - - updateRequest chan signal - throttleInterval time.Duration - newTimerFn NewResettableTimerFunc -} - -type signal struct{} - -type HostMetadataCacheOption func(*HostMetadataCache) - -// OptWithThrottleInterval sets the throttling interval for update flushes. -func OptWithThrottleInterval(d time.Duration) func(t *HostMetadataCache) { - return func(t *HostMetadataCache) { t.throttleInterval = d } -} - -// OptWithNewTimerFn sets the ResettableTimer in the HostMetadataCache. -func OptWithNewTimerFn(f NewResettableTimerFunc) func(t *HostMetadataCache) { - return func(t *HostMetadataCache) { t.newTimerFn = f } -} - -func NewHostMetadataCache(opts ...HostMetadataCacheOption) *HostMetadataCache { - c := &HostMetadataCache{ - updates: make(map[string]*proto.HostMetadataV4V6Update), - updateRequest: make(chan signal, 1), - throttleInterval: time.Second, - newTimerFn: NewRealTimer, - } - - for _, o := range opts { - o(c) - } - - return c -} - -func (c *HostMetadataCache) CompleteDeferredWork() error { - if !c.inSync { - logrus.Debug("Now in sync") - } - c.inSync = true - c.requestFlush() - return nil -} - -func (c *HostMetadataCache) OnUpdate(u any) { - switch upd := u.(type) { - case *proto.HostMetadataV4V6Update: - logrus.WithField("update", upd).Debug("Received HostMetadataV4V6Update message") - c.onHostMetadataV4V6Update(upd) - - case *proto.HostMetadataV4V6Remove: - logrus.WithField("update", upd).Debug("Received HostMetadataV4V6Remove message") - c.onHostMetadataV4V6Remove(upd) - default: - return - } - - if c.inSync { - c.requestFlush() - } -} - -func (c *HostMetadataCache) onHostMetadataV4V6Update(u *proto.HostMetadataV4V6Update) { - c.updatesLock.Lock() - defer c.updatesLock.Unlock() - c.updates[u.Hostname] = u -} - -func (c *HostMetadataCache) onHostMetadataV4V6Remove(u *proto.HostMetadataV4V6Remove) { - c.updatesLock.Lock() - defer c.updatesLock.Unlock() - delete(c.updates, u.Hostname) -} - -func (c *HostMetadataCache) requestFlush() { - select { - case c.updateRequest <- signal{}: - default: - } -} - -// Start spawns a new goroutine for the cache to flush updates. -func (c *HostMetadataCache) Start() { - go c.loopFlushingUpdates() -} - -// loopFlushingUpdates flushes updates indefinitely at most once every c.throttleInterval. -// Intended to be run on its own goroutine. -func (c *HostMetadataCache) loopFlushingUpdates() { - timer := c.newTimerFn(c.throttleInterval) - for { - // One update should get through immediately before timers can delay things. - <-c.updateRequest - logrus.Debug("Flushing host metadata cached updates") - c.sendAllUpdates() - _ = timer.Reset(c.throttleInterval) - <-timer.Chan() - } -} - -func (c *HostMetadataCache) sendAllUpdates() { - c.cbLock.Lock() - defer c.cbLock.Unlock() - c.updatesLock.Lock() - defer c.updatesLock.Lock() - - if c.onHostUpdateCB != nil { - upds := make(map[string]*proto.HostMetadataV4V6Update) - maps.Copy(upds, c.updates) - c.onHostUpdateCB(upds) - } -} - -func (c *HostMetadataCache) SetOnHostUpdateCB(cb func(map[string]*proto.HostMetadataV4V6Update)) { - c.cbLock.Lock() - defer c.cbLock.Unlock() - - c.onHostUpdateCB = cb -} - -// wrappedRealTimer implements the ResettableTimer interface. -// time.Timer requires a method Chan to return timer.C when wrapped in an interface. -type wrappedRealTimer struct { - *time.Timer -} - -// Chan implements the ResettableTimer interface. -func (t wrappedRealTimer) Chan() <-chan time.Time { - return t.C -} - -// NewRealTimer returns a time.Timer, wrapped to implement the ResettableTimer interface. -func NewRealTimer(d time.Duration) ResettableTimer { - return wrappedRealTimer{time.NewTimer(d)} -} - -type ResettableTimer interface { - Reset(time.Duration) bool - Chan() <-chan time.Time -} - -type NewResettableTimerFunc func(time.Duration) ResettableTimer diff --git a/felix/dataplane/linux/hostmetadata_cache_test.go b/felix/dataplane/linux/hostmetadata_cache_test.go deleted file mode 100644 index 29be9188a23..00000000000 --- a/felix/dataplane/linux/hostmetadata_cache_test.go +++ /dev/null @@ -1,135 +0,0 @@ -package intdataplane - -import ( - "time" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "github.com/projectcalico/calico/felix/proto" -) - -type fakeResettableTimer struct { - C chan time.Time -} - -func newFakeResettableTimer() *fakeResettableTimer { - t := &fakeResettableTimer{ - C: make(chan time.Time), - } - - return t -} - -func (t *fakeResettableTimer) Chan() <-chan time.Time { - return t.C -} - -func (t *fakeResettableTimer) Reset(time.Duration) bool { - // Maybe drain a pending alarm. - select { - case <-t.C: - return false - default: - return true - } -} - -// Pretend the given duration has passed and pop the timer. -func (t *fakeResettableTimer) Fire() { - t.C <- time.Now() -} - -var _ = Describe("HostMetadataCache UTs", func() { - It("should flush the first set of updates without waiting for the throttling timer, and then wait for the timer for subsequent flushes", func() { - Expect(0) - // Run the metadata cache with a fake timer. - timer := newFakeResettableTimer() - newFakeTimerFn := func(t time.Duration) ResettableTimer { - _ = timer.Reset(t) - return timer - } - - // Updates will be sent to us from another goroutine. - // We need thread-safety to receive them back on this thread. - updatesC := make(chan map[string]*proto.HostMetadataV4V6Update, 1) - checkForUpdates := func() map[string]*proto.HostMetadataV4V6Update { - select { - case u := <-updatesC: - return u - default: - return nil - } - } - - // The updates callback will write the running total of dropped - // updates since the channel was last consumed. - // I.e. if an old update was never consumed, it would be overwritten - // (dropped) by a fresher update, and the dropped counter would go up. - // If we receive droppedC to read the number of drops, the total resets to 0. - // droppedC will block any receives when updates haven't been sent on updatesC. - droppedC := make(chan int, 1) - // This callback is written with the assumption that only one goroutine call call it - // at any given time. - updatesCallback := func(u map[string]*proto.HostMetadataV4V6Update) { - droppedUpdate := false - select { - // Drain/drop old update if necessary. - case <-updatesC: - droppedUpdate = true - // Otherwise, send new update. - case updatesC <- u: - } - - // Update the number of dropped updates. - numDropped := 0 - if droppedUpdate { - numDropped = 1 - } - select { - case droppedC <- numDropped: - case lastNumDropped := <-droppedC: - droppedC <- lastNumDropped + numDropped - } - } - - cacheT := NewHostMetadataCache(OptWithThrottleInterval(1*time.Second), OptWithNewTimerFn(newFakeTimerFn)) - cacheT.SetOnHostUpdateCB(updatesCallback) - cacheT.Start() - - update := &proto.HostMetadataV4V6Update{ - Hostname: "hn1", - Ipv4Addr: "1.2.3.4", - Labels: map[string]string{"label1": "label1val"}, - } - - // Fill the cache with updates. - cacheT.OnUpdate(update) - - // Ensure it *doesn't* flush the updates. - Consistently(checkForUpdates).Should(BeNil(), "Cache prematurely fired updates") - - // Call CompleteDeferredWork. - err := cacheT.CompleteDeferredWork() - Expect(err).NotTo(HaveOccurred()) - - // Ensure it *does* flush the updates. - Eventually(checkForUpdates).Should(Equal(map[string]*proto.HostMetadataV4V6Update{update.Hostname: update})) - - // Fill up cache with more updates - remove existing update and add new ones. - cacheT.OnUpdate(&proto.HostMetadataV4V6Remove{Hostname: update.Hostname}) - update.Hostname = "hn2" - update.Ipv4Addr = "5.6.7.8" - update.Labels = map[string]string{"label2": "label2val"} - cacheT.OnUpdate(update) - - // Ensure it *doesn't* flush the updates. - Consistently(checkForUpdates).Should(BeNil(), "Cache did not wait for timer before flushing") - - // Pop the fake timer. - timer.Fire() - - // Ensure it *does* flush the updates. - Eventually(checkForUpdates).Should(Equal(map[string]*proto.HostMetadataV4V6Update{update.Hostname: update})) - Expect(droppedC).Should(Receive(Equal(0))) - }) -}) diff --git a/felix/dataplane/linux/int_dataplane.go b/felix/dataplane/linux/int_dataplane.go index c8e47aaf9ae..562bbda306e 100644 --- a/felix/dataplane/linux/int_dataplane.go +++ b/felix/dataplane/linux/int_dataplane.go @@ -2985,10 +2985,7 @@ func startBPFDataplaneComponents( log.WithError(err).Panic("Failed to start kube-proxy.") } - hostMetadataCache := NewHostMetadataCache() - hostMetadataCache.SetOnHostUpdateCB(kp.OnHostMetadataV4V6Update) - dp.RegisterManager(hostMetadataCache) - hostMetadataCache.Start() + dp.RegisterManager(kp) bpfRTMgr.setHostIPUpdatesCallBack(kp.OnHostIPsUpdate) bpfRTMgr.setRoutesCallBacks(kp.OnRouteUpdate, kp.OnRouteDelete) From 6ef65d791cc60d769e1118646e531faa217ed419 Mon Sep 17 00:00:00 2001 From: Alex O Regan Date: Wed, 18 Feb 2026 09:17:25 +0000 Subject: [PATCH 12/32] Make map before writing Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- felix/bpf/proxy/kube-proxy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index cbd2ae924c6..29643d2e716 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -196,7 +196,7 @@ func (kp *KubeProxy) start() error { // Could be nil, initially. hostMetadataUpdates := kp.recvHostMetadataV4V6Updates() - var hostMetadata map[string]*proto.HostMetadataV4V6Update + hostMetadata := make(map[string]*proto.HostMetadataV4V6Update) mergeHostMetadataV4V6Updates(hostMetadata, hostMetadataUpdates) err = kp.run(hostIPs, hostMetadata) From 9283a6cd917056c78ad59a0637fffd5ee8cf6066 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Wed, 18 Feb 2026 09:41:24 +0000 Subject: [PATCH 13/32] adds a nil check to concurrent proxy field checks --- felix/bpf/proxy/kube-proxy_internal_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/felix/bpf/proxy/kube-proxy_internal_test.go b/felix/bpf/proxy/kube-proxy_internal_test.go index be7e9a0197c..9abd4e1fa21 100644 --- a/felix/bpf/proxy/kube-proxy_internal_test.go +++ b/felix/bpf/proxy/kube-proxy_internal_test.go @@ -259,6 +259,10 @@ func startTestKubeProxy(t *testing.T) *KubeProxy { // under the runner lock. Safe to call from Eventually/Consistently. func getProxyHostMetadata(kp *KubeProxy) func() map[string]*proto.HostMetadataV4V6Update { return func() map[string]*proto.HostMetadataV4V6Update { + if kp.proxy == nil { + return nil + } + p := kp.proxy.(*proxy) p.runnerLck.Lock() defer p.runnerLck.Unlock() From 84ad3095ff48aabc83f45a5210e2c3f04d3ed6bd Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Fri, 20 Feb 2026 11:33:34 +0000 Subject: [PATCH 14/32] batch updates until CompleteDeferredWork is called --- felix/bpf/proxy/kube-proxy.go | 83 +++++++++++++++++++++++------------ 1 file changed, 55 insertions(+), 28 deletions(-) diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index 29643d2e716..ea2908c9306 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -35,9 +35,12 @@ type KubeProxy struct { proxy ProxyFrontend syncer DPSyncer - // Map is hostname to an update/remove msg. - // The size-1 channel allows events to enter the main loop, - // and allows repeated updates to be merged. + // Non-thread-safe map - only written/read by the Felix dataplane thread. + // Keyed by hostname, value is either a HostMetadataV4V6Update or HostMetadataV4V6Remove. + pendingHostMetadataUpdates map[string]any + // Map key is a hostname, value a host metadata update/remove. + // The size-1 channel allows for one non-blocking write, + // and repeated updates get merged into older unconsumed ones. hostMetadataUpdates chan map[string]any ipFamily int @@ -79,7 +82,8 @@ func StartKubeProxy(k8s kubernetes.Interface, hostname string, opts: opts, rt: NewRTCache(), - hostMetadataUpdates: make(chan map[string]any, 1), + hostMetadataUpdates: make(chan map[string]any, 1), + pendingHostMetadataUpdates: make(map[string]any), hostIPUpdates: make(chan []net.IP, 1), exiting: make(chan struct{}), @@ -194,8 +198,8 @@ func (kp *KubeProxy) start() error { // Wait for the initial update. hostIPs := <-kp.hostIPUpdates - // Could be nil, initially. - hostMetadataUpdates := kp.recvHostMetadataV4V6Updates() + // Could be nil, initially. Doesn't block. + hostMetadataUpdates := kp.checkHostMetadataV4V6Updates() hostMetadata := make(map[string]*proto.HostMetadataV4V6Update) mergeHostMetadataV4V6Updates(hostMetadata, hostMetadataUpdates) @@ -217,6 +221,7 @@ func (kp *KubeProxy) start() error { if err != nil { log.Panic("kube-proxy failed to resync after host IPs update") } + case hostMetadataUpdates, ok = <-kp.hostMetadataUpdates: if !ok { log.Error("kube-proxy: hostMetadataUpdates closed") @@ -254,40 +259,53 @@ func (kp *KubeProxy) OnHostIPsUpdate(IPs []net.IP) { } // OnUpdate implements the manager interface. +// Writes updates to pending updates map - overwrites repeated updates for the same key. func (kp *KubeProxy) OnUpdate(msg any) { + hostname := "" + switch update := msg.(type) { + case *proto.HostMetadataV4V6Update: + hostname = update.Hostname + log.WithField("msg", update).Debugf("kube-proxy OnUpdate: host metadata update") + case *proto.HostMetadataV4V6Remove: + hostname = update.Hostname + log.WithField("msg", update).Debugf("kube-proxy OnUpdate: host metadata remove") + default: + return + } + + if hostname == "" { + log.WithField("msg", msg).Warn("kube-proxy OnUpdate: got host metadata update with empty hostname") + return + } + + kp.pendingHostMetadataUpdates[hostname] = msg +} + +// CompleteDeferredWork implements the manager interface. +// Avoids blocking the thread by draining & merging older updates on the channel before sending. +func (kp *KubeProxy) CompleteDeferredWork() error { // Drain any pre-existing msg first and merge. - updates := kp.recvHostMetadataV4V6Updates() + updates := kp.checkHostMetadataV4V6Updates() if updates == nil { updates = make(map[string]any) } - switch update := msg.(type) { - case *proto.HostMetadataV4V6Update: - updates[update.Hostname] = update - log.Debugf("kube-proxy OnUpdate: host metadata update for %q: %+v", update.Hostname, update) - case *proto.HostMetadataV4V6Remove: - if u, ok := updates[update.Hostname]; ok { - delete(updates, update.Hostname) - log.Debugf("kube-proxy OnUpdate: coalesce host metadata deletion for queued update (deleted update) %q: %+v", update.Hostname, u) - } else { - log.WithField("remove", update).Debug("No existing update in queue, sending remove event") - updates[update.Hostname] = msg - } + // Overwrite any pre-existing updates for a given key. + // Always send 'Removes' instead of just deleting updates of the same key (since downstream may need to see a remove). + for k, v := range kp.pendingHostMetadataUpdates { + updates[k] = v } - // Send the now-merged updates back down the channel. + // Send the merged updates back down the channel. log.Debug("Queueing new hostmetadata for main loop") kp.hostMetadataUpdates <- updates log.Debug("Successfully queued new hostmetadata") -} - -// CompleteDeferredWork implements the manager interface. -func (kp *KubeProxy) CompleteDeferredWork() error { return nil } -// recvHostMetadataV4V6Updates tries to read a pending host metadata update on the update channel. -func (kp *KubeProxy) recvHostMetadataV4V6Updates() map[string]any { +// checkHostMetadataV4V6Updates tries to read a pending host metadata update on the update channel. +// Returns nil immediately, if nothing can be received from the updates channel. +func (kp *KubeProxy) checkHostMetadataV4V6Updates() map[string]any { select { case upd := <-kp.hostMetadataUpdates: return upd @@ -297,9 +315,18 @@ func (kp *KubeProxy) recvHostMetadataV4V6Updates() map[string]any { } // mergeHostMetadataV4V6Updates merges the existing host metadata updates with the latest updates: -// - A 'remove' in latest delete the corresponding key in 'existing'. -// - An 'update' in latest overrides the corresponding key in 'existing'. +// - A 'remove' in latest deletes the corresponding key in 'existing'. +// - An 'update' in latest overwrites the corresponding key in 'existing'. +// - If 'latest' is nil, does nothing. +// - If 'existing' is nil, initializes it and merges in 'latest'. func mergeHostMetadataV4V6Updates(existing map[string]*proto.HostMetadataV4V6Update, latest map[string]any) { + if latest == nil { + return + } + if existing == nil { + existing = make(map[string]*proto.HostMetadataV4V6Update) + } + for k, v := range latest { switch update := v.(type) { case *proto.HostMetadataV4V6Update: From 807debe905cfbbfa06bde6e4e774a8fd7f1fc2df Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Fri, 20 Feb 2026 15:39:13 +0000 Subject: [PATCH 15/32] simplify tests for new UTs --- felix/bpf/proxy/kube-proxy_internal_test.go | 253 ++++---------------- 1 file changed, 43 insertions(+), 210 deletions(-) diff --git a/felix/bpf/proxy/kube-proxy_internal_test.go b/felix/bpf/proxy/kube-proxy_internal_test.go index 9abd4e1fa21..9ea20386fa6 100644 --- a/felix/bpf/proxy/kube-proxy_internal_test.go +++ b/felix/bpf/proxy/kube-proxy_internal_test.go @@ -20,12 +20,7 @@ import ( "time" . "github.com/onsi/gomega" - "k8s.io/client-go/kubernetes/fake" - "github.com/projectcalico/calico/felix/bpf/bpfmap" - "github.com/projectcalico/calico/felix/bpf/conntrack" - "github.com/projectcalico/calico/felix/bpf/mock" - "github.com/projectcalico/calico/felix/bpf/nat" "github.com/projectcalico/calico/felix/proto" ) @@ -82,61 +77,8 @@ func (s *mockSyncer) ConntrackDestIsService(ip net.IP, port uint16, proto uint8) return true } -func TestMergeHostMetadataV4V6Updates_AppliesUpdates(t *testing.T) { - existing := map[string]*proto.HostMetadataV4V6Update{} - latest := map[string]any{ - "host1": &proto.HostMetadataV4V6Update{Hostname: "host1", Ipv4Addr: "1.1.1.1"}, - "host2": &proto.HostMetadataV4V6Update{Hostname: "host2", Ipv4Addr: "2.2.2.2"}, - } - mergeHostMetadataV4V6Updates(existing, latest) - - if len(existing) != 2 { - t.Fatalf("expected 2 entries, got %d", len(existing)) - } - if existing["host1"].Ipv4Addr != "1.1.1.1" { - t.Errorf("host1 Ipv4Addr = %q, want %q", existing["host1"].Ipv4Addr, "1.1.1.1") - } - if existing["host2"].Ipv4Addr != "2.2.2.2" { - t.Errorf("host2 Ipv4Addr = %q, want %q", existing["host2"].Ipv4Addr, "2.2.2.2") - } -} - -func TestMergeHostMetadataV4V6Updates_OverridesExisting(t *testing.T) { - existing := map[string]*proto.HostMetadataV4V6Update{ - "host1": {Hostname: "host1", Ipv4Addr: "1.1.1.1"}, - } - latest := map[string]any{ - "host1": &proto.HostMetadataV4V6Update{Hostname: "host1", Ipv4Addr: "9.9.9.9"}, - } - mergeHostMetadataV4V6Updates(existing, latest) - - if existing["host1"].Ipv4Addr != "9.9.9.9" { - t.Errorf("host1 Ipv4Addr = %q, want %q", existing["host1"].Ipv4Addr, "9.9.9.9") - } -} - -func TestMergeHostMetadataV4V6Updates_RemovesExisting(t *testing.T) { - existing := map[string]*proto.HostMetadataV4V6Update{ - "host1": {Hostname: "host1", Ipv4Addr: "1.1.1.1"}, - "host2": {Hostname: "host2", Ipv4Addr: "2.2.2.2"}, - } - latest := map[string]any{ - "host1": &proto.HostMetadataV4V6Remove{Hostname: "host1"}, - } - mergeHostMetadataV4V6Updates(existing, latest) - - if len(existing) != 1 { - t.Fatalf("expected 1 entry, got %d", len(existing)) - } - if _, ok := existing["host1"]; ok { - t.Error("host1 should have been removed") - } - if existing["host2"].Ipv4Addr != "2.2.2.2" { - t.Errorf("host2 should be unchanged, got %q", existing["host2"].Ipv4Addr) - } -} - -func TestMergeHostMetadataV4V6Updates_MixedUpdatesAndRemoves(t *testing.T) { +func TestMergeHostMetadataV4V6Updates(t *testing.T) { + RegisterTestingT(t) existing := map[string]*proto.HostMetadataV4V6Update{ "host1": {Hostname: "host1", Ipv4Addr: "1.1.1.1"}, "host2": {Hostname: "host2", Ipv4Addr: "2.2.2.2"}, @@ -148,23 +90,20 @@ func TestMergeHostMetadataV4V6Updates_MixedUpdatesAndRemoves(t *testing.T) { } mergeHostMetadataV4V6Updates(existing, latest) - if len(existing) != 2 { - t.Fatalf("expected 2 entries, got %d", len(existing)) - } - if _, ok := existing["host1"]; ok { - t.Error("host1 should have been removed") - } - if existing["host2"].Ipv4Addr != "5.5.5.5" { - t.Errorf("host2 Ipv4Addr = %q, want %q", existing["host2"].Ipv4Addr, "5.5.5.5") - } - if existing["host3"].Ipv4Addr != "3.3.3.3" { - t.Errorf("host3 Ipv4Addr = %q, want %q", existing["host3"].Ipv4Addr, "3.3.3.3") - } + Expect(existing).To(HaveLen(2), "Expected host1 to be removed and host3 to be added") + Expect(existing).NotTo(HaveKey("host1")) + Expect(existing).To(HaveKey("host2")) + Expect(existing).To(HaveKey("host3")) + + Expect(existing["host2"].Ipv4Addr).To(Equal("5.5.5.5")) + Expect(existing["host3"].Ipv4Addr).To(Equal("3.3.3.3")) } -func TestOnUpdateQueuesHostMetadataUpdates(t *testing.T) { +func TestOnUpdateBatchesHostMetadataUpdates(t *testing.T) { + RegisterTestingT(t) kp := KubeProxy{ - hostMetadataUpdates: make(chan map[string]any, 1), + hostMetadataUpdates: make(chan map[string]any, 1), + pendingHostMetadataUpdates: make(map[string]any), } update := &proto.HostMetadataV4V6Update{ @@ -174,22 +113,28 @@ func TestOnUpdateQueuesHostMetadataUpdates(t *testing.T) { } kp.OnUpdate(update) - // Read the queued update from the channel. - updates := kp.recvHostMetadataV4V6Updates() - if updates == nil { - t.Fatal("expected updates on channel, got nil") - } - if len(updates) != 1 { - t.Fatalf("expected 1 update, got %d", len(updates)) - } - if _, ok := updates["hn1"].(*proto.HostMetadataV4V6Update); !ok { - t.Fatalf("expected *proto.HostMetadataV4V6Update, got %T", updates["hn1"]) + update2 := &proto.HostMetadataV4V6Update{ + Hostname: "hn2", + Ipv4Addr: "2.2.2.2", + Labels: map[string]string{"label2": "label2val"}, } + kp.OnUpdate(update2) + + Consistently(kp.checkHostMetadataV4V6Updates, 100*time.Millisecond).Should(BeNil(), "No updates should have been sent before CompleteDeferredWork") + + Expect(kp.CompleteDeferredWork()).To(Succeed(), "CompleteDeferredWork should succeed") + // Read the queued update from the channel. + Eventually(kp.checkHostMetadataV4V6Updates).Should(Equal(map[string]any{ + "hn1": update, + "hn2": update2, + })) } -func TestOnUpdateCoalescesRemoveWithPendingUpdate(t *testing.T) { +func TestOnUpdateRemoveOverwritesPendingUpdate(t *testing.T) { + RegisterTestingT(t) kp := KubeProxy{ - hostMetadataUpdates: make(chan map[string]any, 1), + hostMetadataUpdates: make(chan map[string]any, 1), + pendingHostMetadataUpdates: make(map[string]any), } // Queue an update, then an immediate remove for the same hostname. @@ -197,131 +142,19 @@ func TestOnUpdateCoalescesRemoveWithPendingUpdate(t *testing.T) { Hostname: "hn1", Ipv4Addr: "1.2.3.4", }) - kp.OnUpdate(&proto.HostMetadataV4V6Remove{Hostname: "hn1"}) - - updates := kp.recvHostMetadataV4V6Updates() - if updates == nil { - t.Fatal("expected updates on channel, got nil") - } - // The remove should have deleted the pending update for "hn1". - if len(updates) != 0 { - t.Fatalf("expected 0 entries after remove coalesced pending update, got %d: %+v", len(updates), updates) - } -} - -func TestOnUpdateQueuesRemoveWhenNoPendingUpdate(t *testing.T) { - kp := KubeProxy{ - hostMetadataUpdates: make(chan map[string]any, 1), - } - - // Queue a remove without a prior update. - kp.OnUpdate(&proto.HostMetadataV4V6Remove{Hostname: "hn1"}) - - updates := kp.recvHostMetadataV4V6Updates() - if updates == nil { - t.Fatal("expected updates on channel, got nil") - } - if len(updates) != 1 { - t.Fatalf("expected 1 entry, got %d", len(updates)) - } - if _, ok := updates["hn1"].(*proto.HostMetadataV4V6Remove); !ok { - t.Fatalf("expected *proto.HostMetadataV4V6Remove, got %T", updates["hn1"]) - } -} - -// startTestKubeProxy starts a real KubeProxy backed by mock maps and a fake -// k8s client, then sends an initial host IP to unblock the start loop. -// The caller must call kp.Stop() when done. -func startTestKubeProxy(t *testing.T) *KubeProxy { - t.Helper() - - maps := &bpfmap.IPMaps{ - FrontendMap: mock.NewMockMap(nat.FrontendMapParameters), - BackendMap: mock.NewMockMap(nat.BackendMapParameters), - AffinityMap: mock.NewMockMap(nat.AffinityMapParameters), - MaglevMap: mock.NewMockMap(nat.MaglevMapParameters), - CtMap: mock.NewMockMap(conntrack.MapParams), - } - - k8s := fake.NewClientset() - kp, err := StartKubeProxy(k8s, "test-node", maps, WithImmediateSync()) - if err != nil { - t.Fatalf("StartKubeProxy failed: %v", err) - } - - // Unblock the start loop by providing initial host IPs. - kp.OnHostIPsUpdate([]net.IP{net.IPv4(1, 1, 1, 1)}) - - return kp -} - -// getProxyHostMetadata reads the proxy's hostMetadataByHostname field -// under the runner lock. Safe to call from Eventually/Consistently. -func getProxyHostMetadata(kp *KubeProxy) func() map[string]*proto.HostMetadataV4V6Update { - return func() map[string]*proto.HostMetadataV4V6Update { - if kp.proxy == nil { - return nil - } - - p := kp.proxy.(*proxy) - p.runnerLck.Lock() - defer p.runnerLck.Unlock() - - result := make(map[string]*proto.HostMetadataV4V6Update, len(p.hostMetadataByHostname)) - for k, v := range p.hostMetadataByHostname { - result[k] = v - } - return result - } -} - -func TestKubeProxyPropagatesHostMetadataUpdate(t *testing.T) { - RegisterTestingT(t) - - kp := startTestKubeProxy(t) - defer kp.Stop() - - update := &proto.HostMetadataV4V6Update{ - Hostname: "hn1", - Ipv4Addr: "1.2.3.4", - Labels: map[string]string{"label1": "label1val"}, - } - - kp.OnUpdate(update) - - Eventually(getProxyHostMetadata(kp), 2*time.Second, 10*time.Millisecond). - Should(HaveKeyWithValue("hn1", update)) -} - -func TestKubeProxyPropagatesHostMetadataRemove(t *testing.T) { - RegisterTestingT(t) - - kp := startTestKubeProxy(t) - defer kp.Stop() - - update := &proto.HostMetadataV4V6Update{ - Hostname: "hn1", + kp.OnUpdate(&proto.HostMetadataV4V6Update{ + Hostname: "hn2", Ipv4Addr: "1.2.3.4", - Labels: map[string]string{"label1": "label1val"}, - } - - // First, apply an update. - kp.OnUpdate(update) - Eventually(getProxyHostMetadata(kp), 2*time.Second, 10*time.Millisecond). - Should(HaveKeyWithValue("hn1", update)) - - // Remove hn1 and add hn2. + }) kp.OnUpdate(&proto.HostMetadataV4V6Remove{Hostname: "hn1"}) - update2 := &proto.HostMetadataV4V6Update{ - Hostname: "hn2", - Ipv4Addr: "5.6.7.8", - Labels: map[string]string{"label2": "label2val"}, - } - kp.OnUpdate(update2) - Eventually(getProxyHostMetadata(kp), 2*time.Second, 10*time.Millisecond). - Should(And( - Not(HaveKey("hn1")), - HaveKeyWithValue("hn2", update2), - )) + Expect(kp.CompleteDeferredWork()).To(Succeed(), "CompleteDeferredWork should succeed") + + Eventually(kp.checkHostMetadataV4V6Updates()).Should(Equal(map[string]any{ + "hn1": &proto.HostMetadataV4V6Remove{Hostname: "hn1"}, + "hn2": &proto.HostMetadataV4V6Update{ + Hostname: "hn2", + Ipv4Addr: "1.2.3.4", + }, + })) } From 535993a854c85fda0b10ea858f2f3132c864c112 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Fri, 20 Feb 2026 19:32:03 +0000 Subject: [PATCH 16/32] adjust test updates check --- felix/bpf/proxy/kube-proxy_internal_test.go | 23 +++++++++++---------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/felix/bpf/proxy/kube-proxy_internal_test.go b/felix/bpf/proxy/kube-proxy_internal_test.go index 9ea20386fa6..4fc82dc6265 100644 --- a/felix/bpf/proxy/kube-proxy_internal_test.go +++ b/felix/bpf/proxy/kube-proxy_internal_test.go @@ -137,24 +137,25 @@ func TestOnUpdateRemoveOverwritesPendingUpdate(t *testing.T) { pendingHostMetadataUpdates: make(map[string]any), } - // Queue an update, then an immediate remove for the same hostname. - kp.OnUpdate(&proto.HostMetadataV4V6Update{ + update1 := &proto.HostMetadataV4V6Update{ Hostname: "hn1", Ipv4Addr: "1.2.3.4", - }) - kp.OnUpdate(&proto.HostMetadataV4V6Update{ + } + update2 := &proto.HostMetadataV4V6Update{ Hostname: "hn2", Ipv4Addr: "1.2.3.4", - }) - kp.OnUpdate(&proto.HostMetadataV4V6Remove{Hostname: "hn1"}) + } + update1Remove := &proto.HostMetadataV4V6Remove{Hostname: "hn1"} + + // Queue an update, then an immediate remove for the same hostname. + kp.OnUpdate(update1) + kp.OnUpdate(update2) + kp.OnUpdate(update1Remove) Expect(kp.CompleteDeferredWork()).To(Succeed(), "CompleteDeferredWork should succeed") Eventually(kp.checkHostMetadataV4V6Updates()).Should(Equal(map[string]any{ - "hn1": &proto.HostMetadataV4V6Remove{Hostname: "hn1"}, - "hn2": &proto.HostMetadataV4V6Update{ - Hostname: "hn2", - Ipv4Addr: "1.2.3.4", - }, + "hn1": update1Remove, + "hn2": update2, })) } From 9e08163c844bd067eb524ca8fe274c634a27d597 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Tue, 24 Feb 2026 10:17:17 +0000 Subject: [PATCH 17/32] clear pending updates after processing them, dont send an update if there are none --- felix/bpf/proxy/kube-proxy.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index ea2908c9306..d02e3b55d52 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -284,6 +284,11 @@ func (kp *KubeProxy) OnUpdate(msg any) { // CompleteDeferredWork implements the manager interface. // Avoids blocking the thread by draining & merging older updates on the channel before sending. func (kp *KubeProxy) CompleteDeferredWork() error { + if len(kp.pendingHostMetadataUpdates) == 0 { + log.Debug("No pending host metadata updates to process") + return nil + } + // Drain any pre-existing msg first and merge. updates := kp.checkHostMetadataV4V6Updates() if updates == nil { @@ -294,6 +299,9 @@ func (kp *KubeProxy) CompleteDeferredWork() error { // Always send 'Removes' instead of just deleting updates of the same key (since downstream may need to see a remove). for k, v := range kp.pendingHostMetadataUpdates { updates[k] = v + log.WithField("nodeName", k).Debug("Queueing new host metadata update") + // ... And don't forget to clear the pending updates after processing! + delete(kp.pendingHostMetadataUpdates, k) } // Send the merged updates back down the channel. From c51769b482f07167c5b1c0bd2733b86b158e3907 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Tue, 24 Feb 2026 11:31:11 +0000 Subject: [PATCH 18/32] fix shared map reference between kube-proxy and proxy --- felix/bpf/proxy/proxy.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/felix/bpf/proxy/proxy.go b/felix/bpf/proxy/proxy.go index 3ddf5fe1254..5d8720ecc61 100644 --- a/felix/bpf/proxy/proxy.go +++ b/felix/bpf/proxy/proxy.go @@ -402,7 +402,15 @@ func (p *proxy) SetHostMetadata(updates map[string]*proto.HostMetadataV4V6Update p.runnerLck.Lock() defer p.runnerLck.Unlock() - p.hostMetadataByHostname = updates + // Clear the proxy's map and repopulate. + for k := range p.hostMetadataByHostname { + delete(p.hostMetadataByHostname, k) + } + + for k, v := range updates { + p.hostMetadataByHostname[k] = v + } + if requestResync { // Invoke a sync via the runner, so that we can release any locks in this goroutine. p.syncDP() From 3438016a3baa58ca31e7c14dbecb3163e134a803 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Tue, 24 Feb 2026 11:35:28 +0000 Subject: [PATCH 19/32] kube proxy test used async checks unnecessarily --- felix/bpf/proxy/kube-proxy_internal_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/felix/bpf/proxy/kube-proxy_internal_test.go b/felix/bpf/proxy/kube-proxy_internal_test.go index 4fc82dc6265..bc387c2e6dc 100644 --- a/felix/bpf/proxy/kube-proxy_internal_test.go +++ b/felix/bpf/proxy/kube-proxy_internal_test.go @@ -124,7 +124,7 @@ func TestOnUpdateBatchesHostMetadataUpdates(t *testing.T) { Expect(kp.CompleteDeferredWork()).To(Succeed(), "CompleteDeferredWork should succeed") // Read the queued update from the channel. - Eventually(kp.checkHostMetadataV4V6Updates).Should(Equal(map[string]any{ + Expect(kp.checkHostMetadataV4V6Updates()).To(Equal(map[string]any{ "hn1": update, "hn2": update2, })) @@ -154,7 +154,7 @@ func TestOnUpdateRemoveOverwritesPendingUpdate(t *testing.T) { Expect(kp.CompleteDeferredWork()).To(Succeed(), "CompleteDeferredWork should succeed") - Eventually(kp.checkHostMetadataV4V6Updates()).Should(Equal(map[string]any{ + Expect(kp.checkHostMetadataV4V6Updates()).To(Equal(map[string]any{ "hn1": update1Remove, "hn2": update2, })) From 513138f7b315118d52d80299a6dc7bcef6a80be0 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Tue, 24 Feb 2026 11:37:01 +0000 Subject: [PATCH 20/32] fix dead code in mergeHostMetadata... func --- felix/bpf/proxy/kube-proxy.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index d02e3b55d52..15c11ed0779 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -328,12 +328,9 @@ func (kp *KubeProxy) checkHostMetadataV4V6Updates() map[string]any { // - If 'latest' is nil, does nothing. // - If 'existing' is nil, initializes it and merges in 'latest'. func mergeHostMetadataV4V6Updates(existing map[string]*proto.HostMetadataV4V6Update, latest map[string]any) { - if latest == nil { + if latest == nil || existing == nil { return } - if existing == nil { - existing = make(map[string]*proto.HostMetadataV4V6Update) - } for k, v := range latest { switch update := v.(type) { From 507e26faa8ab3074e6f6b1b19ac2ea16aca8d916 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Tue, 24 Feb 2026 11:38:02 +0000 Subject: [PATCH 21/32] rewrite a comment that looked like a to-do --- felix/bpf/proxy/kube-proxy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index 15c11ed0779..c1e616d8304 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -300,7 +300,7 @@ func (kp *KubeProxy) CompleteDeferredWork() error { for k, v := range kp.pendingHostMetadataUpdates { updates[k] = v log.WithField("nodeName", k).Debug("Queueing new host metadata update") - // ... And don't forget to clear the pending updates after processing! + // ... And clear the pending updates after processing. delete(kp.pendingHostMetadataUpdates, k) } From 5830ba3537448a82826213e19ea0467b2197635f Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Tue, 24 Feb 2026 11:50:22 +0000 Subject: [PATCH 22/32] comment in int_dataplane explaining why KP uses both the Manager iface, and callbacks to receive information. --- felix/dataplane/linux/int_dataplane.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/felix/dataplane/linux/int_dataplane.go b/felix/dataplane/linux/int_dataplane.go index 562bbda306e..ef5edc80d1c 100644 --- a/felix/dataplane/linux/int_dataplane.go +++ b/felix/dataplane/linux/int_dataplane.go @@ -2985,6 +2985,10 @@ func startBPFDataplaneComponents( log.WithError(err).Panic("Failed to start kube-proxy.") } + // Register KP itself as a manager, in order to collect host metadata. + // Kube-proxy already interfaces with the dataplane via manager callbacks to receive information + // that is already at-hand in those managers. But, when it comes to batching raw host information, + // we might-as-well just funnel it directly from the calc-graph. dp.RegisterManager(kp) bpfRTMgr.setHostIPUpdatesCallBack(kp.OnHostIPsUpdate) From 6a22591555a1d95076a6250d9e8869c6820158d1 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Tue, 24 Feb 2026 11:53:29 +0000 Subject: [PATCH 23/32] change a func comment after changing the func behaviour --- felix/bpf/proxy/kube-proxy.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index c1e616d8304..597c2ddf3db 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -325,8 +325,7 @@ func (kp *KubeProxy) checkHostMetadataV4V6Updates() map[string]any { // mergeHostMetadataV4V6Updates merges the existing host metadata updates with the latest updates: // - A 'remove' in latest deletes the corresponding key in 'existing'. // - An 'update' in latest overwrites the corresponding key in 'existing'. -// - If 'latest' is nil, does nothing. -// - If 'existing' is nil, initializes it and merges in 'latest'. +// - If 'latest' or 'existing' is nil, does nothing. func mergeHostMetadataV4V6Updates(existing map[string]*proto.HostMetadataV4V6Update, latest map[string]any) { if latest == nil || existing == nil { return From 234f5c8f656ed2e9db2c8831c4e28786c40c3605 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Tue, 24 Feb 2026 16:01:30 +0000 Subject: [PATCH 24/32] block KP at start-of-day until hostmetadata updates come in --- felix/bpf/proxy/kube-proxy.go | 6 ++++-- felix/bpf/proxy/kube-proxy_test.go | 4 ++++ felix/bpf/proxy/service_type_change_test.go | 4 ++++ 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index 597c2ddf3db..07c1d9bea8a 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -198,9 +198,11 @@ func (kp *KubeProxy) start() error { // Wait for the initial update. hostIPs := <-kp.hostIPUpdates - // Could be nil, initially. Doesn't block. - hostMetadataUpdates := kp.checkHostMetadataV4V6Updates() hostMetadata := make(map[string]*proto.HostMetadataV4V6Update) + // Block until we go in-sync and get the first batch of hostmetadata + // updates, to avoid a flap after a Felix restart. In practice, this + // recv should happen very soon after receiving the host IPs above. + hostMetadataUpdates := <-kp.hostMetadataUpdates mergeHostMetadataV4V6Updates(hostMetadata, hostMetadataUpdates) err = kp.run(hostIPs, hostMetadata) diff --git a/felix/bpf/proxy/kube-proxy_test.go b/felix/bpf/proxy/kube-proxy_test.go index 6697ba16b19..9f778922856 100644 --- a/felix/bpf/proxy/kube-proxy_test.go +++ b/felix/bpf/proxy/kube-proxy_test.go @@ -28,6 +28,7 @@ import ( "github.com/projectcalico/calico/felix/bpf/conntrack" "github.com/projectcalico/calico/felix/bpf/mock" proxy "github.com/projectcalico/calico/felix/bpf/proxy" + "github.com/projectcalico/calico/felix/proto" ) var _ = Describe("BPF kube-proxy", func() { @@ -87,6 +88,9 @@ var _ = Describe("BPF kube-proxy", func() { k8s := fake.NewClientset(testSvc, testSvcEps) p, _ = proxy.StartKubeProxy(k8s, "test-node", maps, proxy.WithImmediateSync(), proxy.WithMaglevLUTSize(maglevLUTSize)) + // Unblock start(), which blocks on the initial host metadata update. + p.OnUpdate(&proto.HostMetadataV4V6Update{Hostname: "dummy"}) + Expect(p.CompleteDeferredWork()).To(Succeed()) }) AfterEach(func() { diff --git a/felix/bpf/proxy/service_type_change_test.go b/felix/bpf/proxy/service_type_change_test.go index 08fe1aebdc7..10db372c6fa 100644 --- a/felix/bpf/proxy/service_type_change_test.go +++ b/felix/bpf/proxy/service_type_change_test.go @@ -32,6 +32,7 @@ import ( "github.com/projectcalico/calico/felix/bpf/nat" proxy "github.com/projectcalico/calico/felix/bpf/proxy" "github.com/projectcalico/calico/felix/ip" + felixproto "github.com/projectcalico/calico/felix/proto" ) var _ = Describe("BPF service type change", func() { @@ -101,6 +102,9 @@ var _ = Describe("BPF service type change", func() { BeforeEach(func() { p, _ = proxy.StartKubeProxy(k8s, "test-node", bpfMaps, proxy.WithImmediateSync(), proxy.WithMaglevLUTSize(maglevLUTSize)) + // Unblock start(), which blocks on the initial host metadata update. + p.OnUpdate(&felixproto.HostMetadataV4V6Update{Hostname: "dummy"}) + Expect(p.CompleteDeferredWork()).To(Succeed()) p.OnHostIPsUpdate([]net.IP{initIP}) }) From 63283468425cbe92fcb913c71bb9215ace6d58d5 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Tue, 24 Feb 2026 16:05:26 +0000 Subject: [PATCH 25/32] go doc format Co-authored-by: Shaun Crampton --- felix/bpf/proxy/kube-proxy.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index 07c1d9bea8a..6588392b487 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -35,10 +35,10 @@ type KubeProxy struct { proxy ProxyFrontend syncer DPSyncer - // Non-thread-safe map - only written/read by the Felix dataplane thread. - // Keyed by hostname, value is either a HostMetadataV4V6Update or HostMetadataV4V6Remove. + // pendingHostMetadataUpdates contains HostMetadataV4V6Update and HostMetadataV4V6Removes + // that we're batching up to send. Only accessed from the int-dataplane goroutine. pendingHostMetadataUpdates map[string]any - // Map key is a hostname, value a host metadata update/remove. + // hostMetadataUpdates is keyed by hostname, value a host metadata update/remove. // The size-1 channel allows for one non-blocking write, // and repeated updates get merged into older unconsumed ones. hostMetadataUpdates chan map[string]any From a618e8942bb360ff6c450d6fe86c497525ca8cba Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Tue, 24 Feb 2026 16:07:27 +0000 Subject: [PATCH 26/32] rename the non-blocking func that checks a channel can recv in KP --- felix/bpf/proxy/kube-proxy.go | 6 +++--- felix/bpf/proxy/kube-proxy_internal_test.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index 6588392b487..ed42dd0d6c0 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -292,7 +292,7 @@ func (kp *KubeProxy) CompleteDeferredWork() error { } // Drain any pre-existing msg first and merge. - updates := kp.checkHostMetadataV4V6Updates() + updates := kp.pollHostMetadataV4V6UpdatesNonBlocking() if updates == nil { updates = make(map[string]any) } @@ -313,9 +313,9 @@ func (kp *KubeProxy) CompleteDeferredWork() error { return nil } -// checkHostMetadataV4V6Updates tries to read a pending host metadata update on the update channel. +// pollHostMetadataV4V6UpdatesNonBlocking tries to read a pending host metadata update on the update channel. // Returns nil immediately, if nothing can be received from the updates channel. -func (kp *KubeProxy) checkHostMetadataV4V6Updates() map[string]any { +func (kp *KubeProxy) pollHostMetadataV4V6UpdatesNonBlocking() map[string]any { select { case upd := <-kp.hostMetadataUpdates: return upd diff --git a/felix/bpf/proxy/kube-proxy_internal_test.go b/felix/bpf/proxy/kube-proxy_internal_test.go index bc387c2e6dc..6c91ef44be4 100644 --- a/felix/bpf/proxy/kube-proxy_internal_test.go +++ b/felix/bpf/proxy/kube-proxy_internal_test.go @@ -120,11 +120,11 @@ func TestOnUpdateBatchesHostMetadataUpdates(t *testing.T) { } kp.OnUpdate(update2) - Consistently(kp.checkHostMetadataV4V6Updates, 100*time.Millisecond).Should(BeNil(), "No updates should have been sent before CompleteDeferredWork") + Consistently(kp.pollHostMetadataV4V6UpdatesNonBlocking(), 100*time.Millisecond).Should(BeNil(), "No updates should have been sent before CompleteDeferredWork") Expect(kp.CompleteDeferredWork()).To(Succeed(), "CompleteDeferredWork should succeed") // Read the queued update from the channel. - Expect(kp.checkHostMetadataV4V6Updates()).To(Equal(map[string]any{ + Expect(kp.pollHostMetadataV4V6UpdatesNonBlocking()).To(Equal(map[string]any{ "hn1": update, "hn2": update2, })) @@ -154,7 +154,7 @@ func TestOnUpdateRemoveOverwritesPendingUpdate(t *testing.T) { Expect(kp.CompleteDeferredWork()).To(Succeed(), "CompleteDeferredWork should succeed") - Expect(kp.checkHostMetadataV4V6Updates()).To(Equal(map[string]any{ + Expect(kp.pollHostMetadataV4V6UpdatesNonBlocking()).To(Equal(map[string]any{ "hn1": update1Remove, "hn2": update2, })) From e6dd427989a8975069159069b3bead5c9c6fa233 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Tue, 24 Feb 2026 16:08:31 +0000 Subject: [PATCH 27/32] allow a nil map to cause a panic (due to programming error) --- felix/bpf/proxy/kube-proxy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index ed42dd0d6c0..395b61cf153 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -329,7 +329,7 @@ func (kp *KubeProxy) pollHostMetadataV4V6UpdatesNonBlocking() map[string]any { // - An 'update' in latest overwrites the corresponding key in 'existing'. // - If 'latest' or 'existing' is nil, does nothing. func mergeHostMetadataV4V6Updates(existing map[string]*proto.HostMetadataV4V6Update, latest map[string]any) { - if latest == nil || existing == nil { + if latest == nil { return } From 25bac6358f3bcf54962dab96e4becdb2583147d2 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Tue, 24 Feb 2026 16:19:03 +0000 Subject: [PATCH 28/32] updates a doc string --- felix/bpf/proxy/kube-proxy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index 395b61cf153..ee080a18158 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -327,7 +327,7 @@ func (kp *KubeProxy) pollHostMetadataV4V6UpdatesNonBlocking() map[string]any { // mergeHostMetadataV4V6Updates merges the existing host metadata updates with the latest updates: // - A 'remove' in latest deletes the corresponding key in 'existing'. // - An 'update' in latest overwrites the corresponding key in 'existing'. -// - If 'latest' or 'existing' is nil, does nothing. +// - If 'latest' is nil, does nothing. 'existing' must be non-nil. func mergeHostMetadataV4V6Updates(existing map[string]*proto.HostMetadataV4V6Update, latest map[string]any) { if latest == nil { return From c4ec5d2ab8a5378d03d181b341f19ea0cd8e339c Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Tue, 24 Feb 2026 16:49:40 +0000 Subject: [PATCH 29/32] fix inaccurate doc string --- felix/bpf/proxy/kube-proxy.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index ee080a18158..4732d5494d4 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -37,9 +37,9 @@ type KubeProxy struct { // pendingHostMetadataUpdates contains HostMetadataV4V6Update and HostMetadataV4V6Removes // that we're batching up to send. Only accessed from the int-dataplane goroutine. + // Keyed by hostname (node name). pendingHostMetadataUpdates map[string]any - // hostMetadataUpdates is keyed by hostname, value a host metadata update/remove. - // The size-1 channel allows for one non-blocking write, + // hostMetadataUpdates is a size-1 channel - allows for one non-blocking write, // and repeated updates get merged into older unconsumed ones. hostMetadataUpdates chan map[string]any From 784c1c5a25ff1ae418bf4e2415fc65858665b9b8 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Tue, 24 Feb 2026 16:54:51 +0000 Subject: [PATCH 30/32] allow sending an empty update *once*, to allow the KP loop to start --- felix/bpf/proxy/kube-proxy.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index 4732d5494d4..137eb53c768 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -42,6 +42,7 @@ type KubeProxy struct { // hostMetadataUpdates is a size-1 channel - allows for one non-blocking write, // and repeated updates get merged into older unconsumed ones. hostMetadataUpdates chan map[string]any + inSyncWithFelix bool ipFamily int hostIPUpdates chan []net.IP @@ -286,7 +287,9 @@ func (kp *KubeProxy) OnUpdate(msg any) { // CompleteDeferredWork implements the manager interface. // Avoids blocking the thread by draining & merging older updates on the channel before sending. func (kp *KubeProxy) CompleteDeferredWork() error { - if len(kp.pendingHostMetadataUpdates) == 0 { + // If not in-sync with felix, we allow sending an empty update + // to signal to the KP loop that it can start looping. + if len(kp.pendingHostMetadataUpdates) == 0 && kp.inSyncWithFelix { log.Debug("No pending host metadata updates to process") return nil } @@ -310,6 +313,7 @@ func (kp *KubeProxy) CompleteDeferredWork() error { log.Debug("Queueing new hostmetadata for main loop") kp.hostMetadataUpdates <- updates log.Debug("Successfully queued new hostmetadata") + kp.inSyncWithFelix = true return nil } From 562fec48635edcb6f12c71dca981669176d0b44b Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Wed, 25 Feb 2026 08:56:35 +0000 Subject: [PATCH 31/32] test KP sends an empty update once --- felix/bpf/proxy/kube-proxy_internal_test.go | 32 +++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/felix/bpf/proxy/kube-proxy_internal_test.go b/felix/bpf/proxy/kube-proxy_internal_test.go index 6c91ef44be4..ed291098896 100644 --- a/felix/bpf/proxy/kube-proxy_internal_test.go +++ b/felix/bpf/proxy/kube-proxy_internal_test.go @@ -130,6 +130,38 @@ func TestOnUpdateBatchesHostMetadataUpdates(t *testing.T) { })) } +func TestCompleteDeferredWorkSendsEmptyUpdateOnce(t *testing.T) { + RegisterTestingT(t) + kp := KubeProxy{ + hostMetadataUpdates: make(chan map[string]any, 1), + pendingHostMetadataUpdates: make(map[string]any), + } + + // First call with no pending updates should still send an empty map + // to signal the KP loop to start. + Expect(kp.CompleteDeferredWork()).To(Succeed()) + msg := kp.pollHostMetadataV4V6UpdatesNonBlocking() + Expect(msg).NotTo(BeNil(), "First call should send an empty update to unblock the KP loop") + Expect(msg).To(BeEmpty(), "The update should be an empty map") + + // Second call with no pending updates should be a no-op. + Expect(kp.CompleteDeferredWork()).To(Succeed()) + Expect(kp.pollHostMetadataV4V6UpdatesNonBlocking()).To(BeNil(), + "Second call with no pending updates should not send anything") + + // Sending a real update should still work after we're in sync. + kp.OnUpdate(&proto.HostMetadataV4V6Update{Hostname: "host1", Ipv4Addr: "1.1.1.1"}) + Expect(kp.CompleteDeferredWork()).To(Succeed()) + msg = kp.pollHostMetadataV4V6UpdatesNonBlocking() + Expect(msg).To(HaveLen(1)) + Expect(msg).To(HaveKey("host1")) + + // And after that, no-op again with no pending updates. + Expect(kp.CompleteDeferredWork()).To(Succeed()) + Expect(kp.pollHostMetadataV4V6UpdatesNonBlocking()).To(BeNil(), + "Should not send when in sync and no pending updates") +} + func TestOnUpdateRemoveOverwritesPendingUpdate(t *testing.T) { RegisterTestingT(t) kp := KubeProxy{ From 02f82d107d6125c8c986bfcacc661f360dd31fe7 Mon Sep 17 00:00:00 2001 From: Alex O'Regan Date: Wed, 25 Feb 2026 09:01:40 +0000 Subject: [PATCH 32/32] inSyncWithFelix -> inSyncWithIntDataplane --- felix/bpf/proxy/kube-proxy.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/felix/bpf/proxy/kube-proxy.go b/felix/bpf/proxy/kube-proxy.go index 137eb53c768..27ea9df4775 100644 --- a/felix/bpf/proxy/kube-proxy.go +++ b/felix/bpf/proxy/kube-proxy.go @@ -41,8 +41,8 @@ type KubeProxy struct { pendingHostMetadataUpdates map[string]any // hostMetadataUpdates is a size-1 channel - allows for one non-blocking write, // and repeated updates get merged into older unconsumed ones. - hostMetadataUpdates chan map[string]any - inSyncWithFelix bool + hostMetadataUpdates chan map[string]any + inSyncWithIntDataplane bool ipFamily int hostIPUpdates chan []net.IP @@ -289,7 +289,7 @@ func (kp *KubeProxy) OnUpdate(msg any) { func (kp *KubeProxy) CompleteDeferredWork() error { // If not in-sync with felix, we allow sending an empty update // to signal to the KP loop that it can start looping. - if len(kp.pendingHostMetadataUpdates) == 0 && kp.inSyncWithFelix { + if len(kp.pendingHostMetadataUpdates) == 0 && kp.inSyncWithIntDataplane { log.Debug("No pending host metadata updates to process") return nil } @@ -313,7 +313,7 @@ func (kp *KubeProxy) CompleteDeferredWork() error { log.Debug("Queueing new hostmetadata for main loop") kp.hostMetadataUpdates <- updates log.Debug("Successfully queued new hostmetadata") - kp.inSyncWithFelix = true + kp.inSyncWithIntDataplane = true return nil }