diff --git a/felix/bpf/proxy/proxy.go b/felix/bpf/proxy/proxy.go index 5d8720ecc61..ef43bee2e8f 100644 --- a/felix/bpf/proxy/proxy.go +++ b/felix/bpf/proxy/proxy.go @@ -365,6 +365,12 @@ func (p *proxy) SetSyncer(s DPSyncer) { p.syncerLck.Lock() p.dpSyncer.Stop() p.dpSyncer = s + // Re-wire the trigger so that the new syncer's expand-NodePort fixup + // goroutine can schedule a dataplane Apply when previously-missed + // route lookups resolve. proxy.New() does this for the initial syncer; + // without it here, swapping in a fresh syncer (e.g. on a host-IP + // change) would silently lose that wakeup path. + s.SetTriggerFn(p.runner.Run) p.syncerLck.Unlock() p.forceSyncDP() diff --git a/felix/bpf/proxy/proxy_test.go b/felix/bpf/proxy/proxy_test.go index cf426f8d0d7..46e4481e545 100644 --- a/felix/bpf/proxy/proxy_test.go +++ b/felix/bpf/proxy/proxy_test.go @@ -69,6 +69,50 @@ var _ = Describe("BPF Proxy", func() { }) }) + It("should re-arm the NodePort externalTrafficPolicy=Local route-fixup trigger after a syncer swap", func() { + // expand-NodePort fixup (used for ExternalTrafficPolicy=Local + // NodePort services with remote endpoints) parks unresolved + // route lookups on a goroutine that calls back into the proxy + // via syncer.triggerFn() once the missing route arrives. proxy.New() + // wires that callback on the initial syncer; SetSyncer must do the + // same when swapping a fresh syncer in (e.g. on a host-IP change), + // otherwise local-policy NodePort traffic to backends that landed + // after the swap is silently never applied to the dataplane. + k8s := fake.NewClientset() + syncStop = make(chan struct{}) + + first := newMockSyncer(syncStop) + p, err := proxy.New(k8s, first, "testnode", proxy.WithImmediateSync()) + Expect(err).NotTo(HaveOccurred()) + + defer func() { + close(syncStop) + p.Stop() + }() + + // Drain the initial sync. + first.checkState(func(proxy.DPSyncerState) {}) + + // SetSyncer calls forceSyncDP synchronously, which calls Apply on + // the new syncer; that blocks until checkState drains it. Run + // SetSyncer in a goroutine so the test can drain in parallel. + second := newMockSyncer(syncStop) + go p.SetSyncer(second) + + // Drain the forceSyncDP triggered by SetSyncer. + second.checkState(func(proxy.DPSyncerState) {}) + + // SetSyncer must have wired the fixup callback on the new syncer + // before any of its background goroutines could fire. + Expect(second.triggerFn).NotTo(BeNil(), "SetSyncer must wire the fixup callback on the new syncer") + + // Simulate the fixup goroutine resolving a missed route by + // invoking the callback the syncer was handed. The proxy must + // schedule another Apply. + second.triggerFn() + second.checkState(func(proxy.DPSyncerState) {}) + }) + testSvc := &v1.Service{ TypeMeta: typeMetaV1("Service"), ObjectMeta: objectMetaV1("testService"), @@ -645,12 +689,14 @@ var _ = Describe("BPF Proxy", func() { type mockSyncer struct { syncerConntrackAPIDummy - out chan proxy.DPSyncerState - in chan error - stop chan struct{} + out chan proxy.DPSyncerState + in chan error + stop chan struct{} + triggerFn func() } func (s *mockSyncer) SetTriggerFn(f func()) { + s.triggerFn = f } func newMockSyncer(stop chan struct{}) *mockSyncer {