From a142d51f40c5e6b7dd6f727f21f521e6aee84127 Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Thu, 22 Jan 2026 21:27:29 +0000 Subject: [PATCH 1/2] fix(registry): fix JIT error scoping race Concurrent requests waiting on JIT provisioning could bypass error checks if the primary request failed. Store init error on struct to ensure all waiters see the failure. --- pkg/epp/flowcontrol/registry/registry.go | 10 ++- pkg/epp/flowcontrol/registry/registry_test.go | 77 +++++++++++++++++++ 2 files changed, 83 insertions(+), 4 deletions(-) diff --git a/pkg/epp/flowcontrol/registry/registry.go b/pkg/epp/flowcontrol/registry/registry.go index 5096c9f686..3c136c97d2 100644 --- a/pkg/epp/flowcontrol/registry/registry.go +++ b/pkg/epp/flowcontrol/registry/registry.go @@ -76,6 +76,9 @@ type flowState struct { // This prevents race conditions where multiple concurrent requests might attempt to provision the same flow // simultaneously. initialized sync.Once + // initErr stores the result of the strictly-once initialization. + // This allows concurrent waiters to see if the initialization failed. + initErr error } // priorityBandState tracks the lifecycle state for a dynamically provisioned priority band. @@ -249,12 +252,11 @@ func (fr *FlowRegistry) WithConnection(key types.FlowKey, fn func(conn contracts // 2. JIT provisioning: Ensure physical resources exist on shards. // We use sync.Once to ensure we only pay the initialization cost (building components, locking shards) exactly once // per flowState object. - var jitErr error state.initialized.Do(func() { - jitErr = fr.ensureFlowInfrastructure(key) + state.initErr = fr.ensureFlowInfrastructure(key) }) - if jitErr != nil { + if state.initErr != nil { // If provisioning failed, this state object is invalid. // We remove it from the map so that subsequent requests will attempt to create a fresh state object. fr.flowStates.Delete(key) @@ -268,7 +270,7 @@ func (fr *FlowRegistry) WithConnection(key types.FlowKey, fn func(conn contracts } } - return fmt.Errorf("failed to provision JIT flow resources: %w", jitErr) + return fmt.Errorf("failed to provision JIT flow resources: %w", state.initErr) } // 3. Execute callback. diff --git a/pkg/epp/flowcontrol/registry/registry_test.go b/pkg/epp/flowcontrol/registry/registry_test.go index 6fead03739..f48b760eb7 100644 --- a/pkg/epp/flowcontrol/registry/registry_test.go +++ b/pkg/epp/flowcontrol/registry/registry_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "testing" "time" @@ -1372,3 +1373,79 @@ func TestFlowRegistry_PriorityBandGarbageCollection(t *testing.T) { assert.True(t, exists, "Band must not be collected if leaseCount > 0, even if empty and idle timer expired") }) } + +// TestFlowRegistry_JITErrorScoping ensures that JIT provisioning errors are correctly propagated to all concurrent +// requests waiting on the same flow initialization. +func TestFlowRegistry_JITErrorScoping(t *testing.T) { + t.Parallel() + handle := newTestPluginsHandle(t) + + // Create a registry with a capability checker that passes validation but using a queue name that doesn't exist. + // This ensures NewConfig succeeds, but JIT (ensureFlowInfrastructure) fails when trying to instantiate the queue. + failQueueName := queue.RegisteredQueueName("NonExistentQueue") + mockChecker := &mockCapabilityChecker{ + checkCompatibilityFunc: func(p intraflow.RegisteredPolicyName, q queue.RegisteredQueueName) error { + return nil // Bypass validation. + }, + } + + // We create a custom band config that uses this failing queue. + // We set it as the default band so that dynamic provisioning is used. + failingBand, err := NewPriorityBandConfig(handle, 0, "FailingBand", WithQueue(failQueueName)) + require.NoError(t, err) + + cfg, err := NewConfig(handle, withCapabilityChecker(mockChecker), WithDefaultPriorityBand(failingBand)) + require.NoError(t, err) + + registry, err := NewFlowRegistry(cfg, logr.Discard()) + require.NoError(t, err) + + key := types.FlowKey{ + Priority: 100, // Dynamic, will trigger ensurePriorityBand + ID: "flow-should-fail", + } + + // Simulate contention: + // We acquire the registry RLock. + // JIT provisioning (dynamic band) requires registry Lock (Write Lock). + // So the first thread to reach ensurePriorityBand will block until we release this lock. + // All other threads will pile up behind it on sync.Once. + registry.mu.RLock() + + const concurrency = 10 + var successCount atomic.Int32 + var errorCount atomic.Int32 + wg := sync.WaitGroup{} + wg.Add(concurrency) + + start := make(chan struct{}) + + for range concurrency { + go func() { + defer wg.Done() + <-start + err := registry.WithConnection(key, func(conn contracts.ActiveFlowConnection) error { + return nil + }) + if err == nil { + successCount.Add(1) + } else { + errorCount.Add(1) + } + }() + } + + close(start) + + // Wait a bit to ensure everyone is stuck. + time.Sleep(100 * time.Millisecond) + + // Release the lock, letting the winner proceed (and fail). + registry.mu.RUnlock() + + wg.Wait() + + // Assertion: all requests should fail. + assert.Equal(t, int32(concurrency), errorCount.Load(), "All requests should fail JIT provisioning") + assert.Equal(t, int32(0), successCount.Load(), "No request should succeed if JIT failed") +} From 77e898361d30715e858291ef44dcc4694a7be297 Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Mon, 26 Jan 2026 17:09:15 +0000 Subject: [PATCH 2/2] rebase and resolve conflicts --- pkg/epp/flowcontrol/registry/registry_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/epp/flowcontrol/registry/registry_test.go b/pkg/epp/flowcontrol/registry/registry_test.go index f48b760eb7..24116803cf 100644 --- a/pkg/epp/flowcontrol/registry/registry_test.go +++ b/pkg/epp/flowcontrol/registry/registry_test.go @@ -1384,7 +1384,7 @@ func TestFlowRegistry_JITErrorScoping(t *testing.T) { // This ensures NewConfig succeeds, but JIT (ensureFlowInfrastructure) fails when trying to instantiate the queue. failQueueName := queue.RegisteredQueueName("NonExistentQueue") mockChecker := &mockCapabilityChecker{ - checkCompatibilityFunc: func(p intraflow.RegisteredPolicyName, q queue.RegisteredQueueName) error { + checkCompatibilityFunc: func(p framework.OrderingPolicy, q queue.RegisteredQueueName) error { return nil // Bypass validation. }, }