Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
a88351d
first draft hostmetadata_cache sends host metadata to BPF KP
aaaaaaaalex Feb 10, 2026
9cd0f0c
appease linter
aaaaaaaalex Feb 11, 2026
31c3951
initialise updates chan
aaaaaaaalex Feb 11, 2026
911257d
throttle hostmetadata_cache updates
aaaaaaaalex Feb 11, 2026
96173ef
additional logging in hostmetadata_cache
aaaaaaaalex Feb 11, 2026
d9fb94f
dont return anything from sendAllUpdates
aaaaaaaalex Feb 12, 2026
db4f05e
makes the throttling interval configurable in hostmetadata_cache
aaaaaaaalex Feb 12, 2026
a0a4dcd
update HostMetadata without restarting Syncer
aaaaaaaalex Feb 12, 2026
8c3bc7a
rename a channel to be more descriptive
aaaaaaaalex Feb 12, 2026
4770c54
adds testing for hostmetadatacache
aaaaaaaalex Feb 13, 2026
f934bb4
remove host-metadata-cache; throttle isnt as necessary because syncer…
aaaaaaaalex Feb 16, 2026
6ef65d7
Make map before writing
aaaaaaaalex Feb 18, 2026
9283a6c
adds a nil check to concurrent proxy field checks
aaaaaaaalex Feb 18, 2026
84ad309
batch updates until CompleteDeferredWork is called
aaaaaaaalex Feb 20, 2026
807debe
simplify tests for new UTs
aaaaaaaalex Feb 20, 2026
535993a
adjust test updates check
aaaaaaaalex Feb 20, 2026
9e08163
clear pending updates after processing them, dont send an update if t…
aaaaaaaalex Feb 24, 2026
c51769b
fix shared map reference between kube-proxy and proxy
aaaaaaaalex Feb 24, 2026
3438016
kube proxy test used async checks unnecessarily
aaaaaaaalex Feb 24, 2026
513138f
fix dead code in mergeHostMetadata... func
aaaaaaaalex Feb 24, 2026
507e26f
rewrite a comment that looked like a to-do
aaaaaaaalex Feb 24, 2026
5830ba3
comment in int_dataplane explaining why KP uses both the Manager ifac…
aaaaaaaalex Feb 24, 2026
6a22591
change a func comment after changing the func behaviour
aaaaaaaalex Feb 24, 2026
234f5c8
block KP at start-of-day until hostmetadata updates come in
aaaaaaaalex Feb 24, 2026
6328346
go doc format
aaaaaaaalex Feb 24, 2026
a618e89
rename the non-blocking func that checks a channel can recv in KP
aaaaaaaalex Feb 24, 2026
e6dd427
allow a nil map to cause a panic (due to programming error)
aaaaaaaalex Feb 24, 2026
25bac63
updates a doc string
aaaaaaaalex Feb 24, 2026
c4ec5d2
fix inaccurate doc string
aaaaaaaalex Feb 24, 2026
784c1c5
allow sending an empty update *once*, to allow the KP loop to start
aaaaaaaalex Feb 24, 2026
562fec4
test KP sends an empty update once
aaaaaaaalex Feb 25, 2026
02f82d1
inSyncWithFelix -> inSyncWithIntDataplane
aaaaaaaalex Feb 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 131 additions & 5 deletions felix/bpf/proxy/kube-proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/projectcalico/calico/felix/bpf/maps"
"github.com/projectcalico/calico/felix/bpf/routes"
"github.com/projectcalico/calico/felix/ip"
"github.com/projectcalico/calico/felix/proto"
)

// KubeProxy is a wrapper of Proxy that deals with higher level issue like
Expand All @@ -34,6 +35,15 @@ type KubeProxy struct {
proxy ProxyFrontend
syncer DPSyncer

// pendingHostMetadataUpdates contains HostMetadataV4V6Update and HostMetadataV4V6Removes
// that we're batching up to send. Only accessed from the int-dataplane goroutine.
// Keyed by hostname (node name).
pendingHostMetadataUpdates map[string]any
// hostMetadataUpdates is a size-1 channel - allows for one non-blocking write,
// and repeated updates get merged into older unconsumed ones.
hostMetadataUpdates chan map[string]any
inSyncWithIntDataplane bool

ipFamily int
hostIPUpdates chan []net.IP
stopOnce sync.Once
Expand Down Expand Up @@ -73,6 +83,9 @@ func StartKubeProxy(k8s kubernetes.Interface, hostname string,
opts: opts,
rt: NewRTCache(),

hostMetadataUpdates: make(chan map[string]any, 1),
pendingHostMetadataUpdates: make(map[string]any),

hostIPUpdates: make(chan []net.IP, 1),
exiting: make(chan struct{}),
}
Expand Down Expand Up @@ -104,13 +117,21 @@ func (kp *KubeProxy) Stop() {
defer kp.lock.Unlock()

close(kp.exiting)
close(kp.hostMetadataUpdates)
close(kp.hostIPUpdates)
kp.proxy.Stop()
kp.wg.Wait()
})
}

func (kp *KubeProxy) run(hostIPs []net.IP) error {
// setProxyHostMetadata hands the given host metadata to the wrapped proxy
// while holding kp.lock.
func (kp *KubeProxy) setProxyHostMetadata(hostMetadata map[string]*proto.HostMetadataV4V6Update) {
	// Second argument to SetHostMetadata. Presumably it asks the proxy to
	// resync straight away (run() passes false because it syncs a fresh syncer
	// right after) - confirm against ProxyFrontend.
	const requestResync = true

	kp.lock.Lock()
	defer kp.lock.Unlock()
	kp.proxy.SetHostMetadata(hostMetadata, requestResync)
}

func (kp *KubeProxy) run(hostIPs []net.IP, hostMetadata map[string]*proto.HostMetadataV4V6Update) error {

ips := make([]net.IP, 0, len(hostIPs))
for _, ip := range hostIPs {
Expand Down Expand Up @@ -141,6 +162,8 @@ func (kp *KubeProxy) run(hostIPs []net.IP) error {
}

kp.proxy.SetHostIPs(hostIPs)
// Don't bother invoking a resync within SetHostMetadata; we will be syncing a fresh syncer right after.
kp.proxy.SetHostMetadata(hostMetadata, false)
kp.proxy.SetSyncer(syncer)

log.Infof("kube-proxy v%d node info updated, hostname=%q hostIPs=%+v", kp.ipFamily, kp.hostname, hostIPs)
Expand Down Expand Up @@ -173,26 +196,43 @@ func (kp *KubeProxy) start() error {
kp.syncer = syncer
kp.lock.Unlock()

// wait for the initial update
// Wait for the initial update.
hostIPs := <-kp.hostIPUpdates

err = kp.run(hostIPs)
hostMetadata := make(map[string]*proto.HostMetadataV4V6Update)
// Block until we go in-sync and get the first batch of hostmetadata
// updates, to avoid a flap after a Felix restart. In practice, this
// recv should happen very soon after receiving the host IPs above.
hostMetadataUpdates := <-kp.hostMetadataUpdates
mergeHostMetadataV4V6Updates(hostMetadata, hostMetadataUpdates)
Comment on lines +202 to +207
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like a void operation: merging into an empty map just yields whatever you received, so can't you simply assign it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of those structures can contain removes as well as updates, while the other only contains updates.


err = kp.run(hostIPs, hostMetadata)
if err != nil {
return err
}

kp.wg.Go(func() {
for {
var ok bool
select {
case hostIPs, ok := <-kp.hostIPUpdates:
case hostIPs, ok = <-kp.hostIPUpdates:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this now redundant, because it is also covered by the hostMetadata updates? (I'm not 100% sure.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll need to check on that - these host IP updates were ultimately coming from interface updates IIRC, and I'm not sure if the same word is being used differently between that and HostMetadata updates 🤔

if !ok {
log.Error("kube-proxy: hostIPUpdates closed")
return
}
err = kp.run(hostIPs)
err = kp.run(hostIPs, hostMetadata)
if err != nil {
log.Panic("kube-proxy failed to resync after host IPs update")
}

case hostMetadataUpdates, ok = <-kp.hostMetadataUpdates:
if !ok {
log.Error("kube-proxy: hostMetadataUpdates closed")
return
}
mergeHostMetadataV4V6Updates(hostMetadata, hostMetadataUpdates)
kp.setProxyHostMetadata(hostMetadata)

case <-kp.exiting:
log.Info("kube-proxy: exiting")
return
Expand Down Expand Up @@ -221,6 +261,92 @@ func (kp *KubeProxy) OnHostIPsUpdate(IPs []net.IP) {
log.Debugf("kube-proxy OnHostIPsUpdate: %+v", IPs)
}

// OnUpdate implements the manager interface.
// Host metadata updates and removes are staged in the pending updates map,
// keyed by hostname, so a later message for a host replaces an earlier one.
// Any other message type is ignored.
func (kp *KubeProxy) OnUpdate(msg any) {
	var nodeName string

	switch m := msg.(type) {
	case *proto.HostMetadataV4V6Update:
		nodeName = m.Hostname
		log.WithField("msg", m).Debugf("kube-proxy OnUpdate: host metadata update")
	case *proto.HostMetadataV4V6Remove:
		nodeName = m.Hostname
		log.WithField("msg", m).Debugf("kube-proxy OnUpdate: host metadata remove")
	default:
		// Not a host metadata message; nothing to stage.
		return
	}

	if nodeName != "" {
		kp.pendingHostMetadataUpdates[nodeName] = msg
		return
	}

	// Without a hostname there is no key to file the message under.
	log.WithField("msg", msg).Warn("kube-proxy OnUpdate: got host metadata update with empty hostname")
}

// CompleteDeferredWork implements the manager interface.
// Avoids blocking the thread by draining & merging older updates on the channel before sending.
func (kp *KubeProxy) CompleteDeferredWork() error {
// If not in-sync with felix, we allow sending an empty update
// to signal to the KP loop that it can start looping.
if len(kp.pendingHostMetadataUpdates) == 0 && kp.inSyncWithIntDataplane {
log.Debug("No pending host metadata updates to process")
return nil
}

// Drain any pre-existing msg first and merge.
updates := kp.pollHostMetadataV4V6UpdatesNonBlocking()
if updates == nil {
updates = make(map[string]any)
}

// Overwrite any pre-existing updates for a given key.
// Always send 'Removes' instead of just deleting updates of the same key (since downstream may need to see a remove).
for k, v := range kp.pendingHostMetadataUpdates {
updates[k] = v
log.WithField("nodeName", k).Debug("Queueing new host metadata update")
// ... And clear the pending updates after processing.
delete(kp.pendingHostMetadataUpdates, k)
}

// Send the merged updates back down the channel.
log.Debug("Queueing new hostmetadata for main loop")
kp.hostMetadataUpdates <- updates
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a potential race condition between Stop() and CompleteDeferredWork(). Stop() closes the hostMetadataUpdates channel at line 120, but CompleteDeferredWork() sends to this channel at line 314 without checking if it's closed. If Stop() is called concurrently with CompleteDeferredWork(), this could panic with "send on closed channel". Consider either: (1) checking if the channel is closed before sending, (2) using a context for cancellation, or (3) ensuring Stop() is only called after all CompleteDeferredWork() calls have completed.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This issue is also relevant for the pre-existing host IP updates.
Worth fixing both in a follow-up PR.

log.Debug("Successfully queued new hostmetadata")
kp.inSyncWithIntDataplane = true
return nil
}

// pollHostMetadataV4V6UpdatesNonBlocking performs a single non-blocking receive
// on the host metadata updates channel.
// It returns the received batch, or nil when nothing is ready to be read.
func (kp *KubeProxy) pollHostMetadataV4V6UpdatesNonBlocking() map[string]any {
	var batch map[string]any

	select {
	case batch = <-kp.hostMetadataUpdates:
	default:
		// Nothing queued; batch stays nil.
	}

	return batch
}

// mergeHostMetadataV4V6Updates folds a batch of host metadata messages into
// 'existing', in place:
//   - an 'update' in 'latest' replaces (or adds) the entry under the same key;
//   - a 'remove' in 'latest' deletes the entry under the same key;
//   - values of any other type are skipped.
//
// A nil 'latest' is a no-op. 'existing' must be non-nil.
func mergeHostMetadataV4V6Updates(existing map[string]*proto.HostMetadataV4V6Update, latest map[string]any) {
	// Ranging over a nil map runs zero iterations, so a nil 'latest' needs no
	// special case.
	for hostname, msg := range latest {
		if upd, isUpdate := msg.(*proto.HostMetadataV4V6Update); isUpdate {
			existing[hostname] = upd
			continue
		}
		if _, isRemove := msg.(*proto.HostMetadataV4V6Remove); isRemove {
			delete(existing, hostname)
		}
	}
}

// OnRouteUpdate should be used to update the internal state of routing tables
func (kp *KubeProxy) OnRouteUpdate(k routes.KeyInterface, v routes.ValueInterface) {
log.WithFields(log.Fields{"key": k, "value": v}).Debug("kube-proxy: OnRouteUpdate")
Expand Down
120 changes: 120 additions & 0 deletions felix/bpf/proxy/kube-proxy_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ package proxy
import (
"net"
"testing"
"time"

. "github.com/onsi/gomega"

"github.com/projectcalico/calico/felix/proto"
)

// The main suite of tests in kube-proxy_test.go use a real syncer, making it
Expand Down Expand Up @@ -71,3 +76,118 @@ func (s *mockSyncer) ConntrackFrontendHasBackend(ip net.IP, port uint16, backend
// ConntrackDestIsService is a stub on the mock syncer: it ignores its
// arguments and always reports true.
func (s *mockSyncer) ConntrackDestIsService(ip net.IP, port uint16, proto uint8) bool {
return true
}

// TestMergeHostMetadataV4V6Updates checks that one merge applies a remove, an
// overwrite and an addition to the existing host metadata map.
func TestMergeHostMetadataV4V6Updates(t *testing.T) {
	RegisterTestingT(t)

	current := map[string]*proto.HostMetadataV4V6Update{
		"host1": {Hostname: "host1", Ipv4Addr: "1.1.1.1"},
		"host2": {Hostname: "host2", Ipv4Addr: "2.2.2.2"},
	}
	batch := map[string]any{
		"host1": &proto.HostMetadataV4V6Remove{Hostname: "host1"},
		"host2": &proto.HostMetadataV4V6Update{Hostname: "host2", Ipv4Addr: "5.5.5.5"},
		"host3": &proto.HostMetadataV4V6Update{Hostname: "host3", Ipv4Addr: "3.3.3.3"},
	}

	mergeHostMetadataV4V6Updates(current, batch)

	Expect(current).To(HaveLen(2), "Expected host1 to be removed and host3 to be added")
	Expect(current).NotTo(HaveKey("host1"))

	// Hosts that must survive the merge, with the address each should carry.
	survivors := []struct{ host, ipv4 string }{
		{host: "host2", ipv4: "5.5.5.5"},
		{host: "host3", ipv4: "3.3.3.3"},
	}
	for _, want := range survivors {
		Expect(current).To(HaveKey(want.host))
	}
	for _, want := range survivors {
		Expect(current[want.host].Ipv4Addr).To(Equal(want.ipv4))
	}
}

// TestOnUpdateBatchesHostMetadataUpdates checks that OnUpdate only stages host
// metadata updates: nothing appears on the updates channel until
// CompleteDeferredWork is called, at which point the staged updates arrive
// together as a single map keyed by hostname.
func TestOnUpdateBatchesHostMetadataUpdates(t *testing.T) {
	RegisterTestingT(t)
	kp := KubeProxy{
		hostMetadataUpdates:        make(chan map[string]any, 1),
		pendingHostMetadataUpdates: make(map[string]any),
	}

	update := &proto.HostMetadataV4V6Update{
		Hostname: "hn1",
		Ipv4Addr: "1.2.3.4",
		Labels:   map[string]string{"label1": "label1val"},
	}
	kp.OnUpdate(update)

	update2 := &proto.HostMetadataV4V6Update{
		Hostname: "hn2",
		Ipv4Addr: "2.2.2.2",
		Labels:   map[string]string{"label2": "label2val"},
	}
	kp.OnUpdate(update2)

	// Pass the method itself rather than the result of calling it: given a
	// plain value, Consistently re-checks that one value for the whole
	// duration, so the channel would only ever be polled once.
	Consistently(kp.pollHostMetadataV4V6UpdatesNonBlocking, 100*time.Millisecond).Should(BeNil(), "No updates should have been sent before CompleteDeferredWork")

	Expect(kp.CompleteDeferredWork()).To(Succeed(), "CompleteDeferredWork should succeed")
	// Read the queued update from the channel.
	Expect(kp.pollHostMetadataV4V6UpdatesNonBlocking()).To(Equal(map[string]any{
		"hn1": update,
		"hn2": update2,
	}))
}

// TestCompleteDeferredWorkSendsEmptyUpdateOnce checks that the very first
// CompleteDeferredWork call sends an empty map even with nothing pending, and
// that later calls only send when there are pending updates.
func TestCompleteDeferredWorkSendsEmptyUpdateOnce(t *testing.T) {
	RegisterTestingT(t)
	kp := KubeProxy{
		pendingHostMetadataUpdates: make(map[string]any),
		hostMetadataUpdates:        make(chan map[string]any, 1),
	}

	// flush runs CompleteDeferredWork, then returns whatever it left on the
	// updates channel (nil if it sent nothing).
	flush := func() map[string]any {
		Expect(kp.CompleteDeferredWork()).To(Succeed())
		return kp.pollHostMetadataV4V6UpdatesNonBlocking()
	}

	// Nothing pending yet: the first flush must still emit an empty map so the
	// KP loop can start.
	initial := flush()
	Expect(initial).NotTo(BeNil(), "First call should send an empty update to unblock the KP loop")
	Expect(initial).To(BeEmpty(), "The update should be an empty map")

	// Still nothing pending: the second flush is a no-op.
	Expect(flush()).To(BeNil(),
		"Second call with no pending updates should not send anything")

	// A real update is still delivered once we're in sync.
	kp.OnUpdate(&proto.HostMetadataV4V6Update{Hostname: "host1", Ipv4Addr: "1.1.1.1"})
	withHost := flush()
	Expect(withHost).To(HaveLen(1))
	Expect(withHost).To(HaveKey("host1"))

	// The pending set was consumed, so we're back to a no-op.
	Expect(flush()).To(BeNil(),
		"Should not send when in sync and no pending updates")
}

// TestOnUpdateRemoveOverwritesPendingUpdate checks that a remove queued after
// an update for the same hostname replaces that update in the batch, while an
// update for a different hostname is left untouched.
func TestOnUpdateRemoveOverwritesPendingUpdate(t *testing.T) {
	RegisterTestingT(t)
	kp := KubeProxy{
		pendingHostMetadataUpdates: make(map[string]any),
		hostMetadataUpdates:        make(chan map[string]any, 1),
	}

	hn1Update := &proto.HostMetadataV4V6Update{Hostname: "hn1", Ipv4Addr: "1.2.3.4"}
	hn2Update := &proto.HostMetadataV4V6Update{Hostname: "hn2", Ipv4Addr: "1.2.3.4"}
	hn1Remove := &proto.HostMetadataV4V6Remove{Hostname: "hn1"}

	// Delivery order matters: the remove for hn1 arrives after its update.
	for _, msg := range []any{hn1Update, hn2Update, hn1Remove} {
		kp.OnUpdate(msg)
	}

	Expect(kp.CompleteDeferredWork()).To(Succeed(), "CompleteDeferredWork should succeed")

	sent := kp.pollHostMetadataV4V6UpdatesNonBlocking()
	Expect(sent).To(Equal(map[string]any{
		"hn1": hn1Remove,
		"hn2": hn2Update,
	}))
}
4 changes: 4 additions & 0 deletions felix/bpf/proxy/kube-proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/projectcalico/calico/felix/bpf/conntrack"
"github.com/projectcalico/calico/felix/bpf/mock"
proxy "github.com/projectcalico/calico/felix/bpf/proxy"
"github.com/projectcalico/calico/felix/proto"
)

var _ = Describe("BPF kube-proxy", func() {
Expand Down Expand Up @@ -87,6 +88,9 @@ var _ = Describe("BPF kube-proxy", func() {

k8s := fake.NewClientset(testSvc, testSvcEps)
p, _ = proxy.StartKubeProxy(k8s, "test-node", maps, proxy.WithImmediateSync(), proxy.WithMaglevLUTSize(maglevLUTSize))
// Unblock start(), which blocks on the initial host metadata update.
p.OnUpdate(&proto.HostMetadataV4V6Update{Hostname: "dummy"})
Expect(p.CompleteDeferredWork()).To(Succeed())
})

AfterEach(func() {
Expand Down
Loading