Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions felix/bpf/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,12 @@ func (p *proxy) SetSyncer(s DPSyncer) {
p.syncerLck.Lock()
p.dpSyncer.Stop()
p.dpSyncer = s
// Re-wire the trigger so that the new syncer's expand-NodePort fixup
// goroutine can schedule a dataplane Apply when previously-missed
// route lookups resolve. proxy.New() does this for the initial syncer;
// without it here, swapping in a fresh syncer (e.g. on a host-IP
// change) would silently lose that wakeup path.
s.SetTriggerFn(p.runner.Run)
p.syncerLck.Unlock()

p.forceSyncDP()
Expand Down
52 changes: 49 additions & 3 deletions felix/bpf/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,50 @@ var _ = Describe("BPF Proxy", func() {
})
})

It("should re-arm the NodePort externalTrafficPolicy=Local route-fixup trigger after a syncer swap", func() {
// expand-NodePort fixup (used for ExternalTrafficPolicy=Local
// NodePort services with remote endpoints) parks unresolved
// route lookups on a goroutine that calls back into the proxy
// via syncer.triggerFn() once the missing route arrives. proxy.New()
// wires that callback on the initial syncer; SetSyncer must do the
// same when swapping a fresh syncer in (e.g. on a host-IP change),
// otherwise local-policy NodePort traffic to backends that landed
// after the swap is silently never applied to the dataplane.
k8s := fake.NewClientset()
syncStop = make(chan struct{})

first := newMockSyncer(syncStop)
p, err := proxy.New(k8s, first, "testnode", proxy.WithImmediateSync())
Expect(err).NotTo(HaveOccurred())

defer func() {
close(syncStop)
p.Stop()
}()

// Drain the initial sync.
first.checkState(func(proxy.DPSyncerState) {})

// SetSyncer calls forceSyncDP synchronously, which calls Apply on
// the new syncer; that blocks until checkState drains it. Run
// SetSyncer in a goroutine so the test can drain in parallel.
second := newMockSyncer(syncStop)
go p.SetSyncer(second)

// Drain the forceSyncDP triggered by SetSyncer.
second.checkState(func(proxy.DPSyncerState) {})

// SetSyncer must have wired the fixup callback on the new syncer
// before any of its background goroutines could fire.
Expect(second.triggerFn).NotTo(BeNil(), "SetSyncer must wire the fixup callback on the new syncer")

// Simulate the fixup goroutine resolving a missed route by
// invoking the callback the syncer was handed. The proxy must
// schedule another Apply.
second.triggerFn()
second.checkState(func(proxy.DPSyncerState) {})
})

testSvc := &v1.Service{
TypeMeta: typeMetaV1("Service"),
ObjectMeta: objectMetaV1("testService"),
Expand Down Expand Up @@ -645,12 +689,14 @@ var _ = Describe("BPF Proxy", func() {

type mockSyncer struct {
syncerConntrackAPIDummy
out chan proxy.DPSyncerState
in chan error
stop chan struct{}
out chan proxy.DPSyncerState
in chan error
stop chan struct{}
triggerFn func()
}

func (s *mockSyncer) SetTriggerFn(f func()) {
s.triggerFn = f
}

func newMockSyncer(stop chan struct{}) *mockSyncer {
Expand Down
Loading