Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions pkg/epp/flowcontrol/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type flowState struct {
// This prevents race conditions where multiple concurrent requests might attempt to provision the same flow
// simultaneously.
initialized sync.Once
// initErr stores the result of the strictly-once initialization.
// This allows concurrent waiters to see if the initialization failed.
initErr error
}

// priorityBandState tracks the lifecycle state for a dynamically provisioned priority band.
Expand Down Expand Up @@ -249,12 +252,11 @@ func (fr *FlowRegistry) WithConnection(key types.FlowKey, fn func(conn contracts
// 2. JIT provisioning: Ensure physical resources exist on shards.
// We use sync.Once to ensure we only pay the initialization cost (building components, locking shards) exactly once
// per flowState object.
var jitErr error
state.initialized.Do(func() {
jitErr = fr.ensureFlowInfrastructure(key)
state.initErr = fr.ensureFlowInfrastructure(key)
})

if jitErr != nil {
if state.initErr != nil {
// If provisioning failed, this state object is invalid.
// We remove it from the map so that subsequent requests will attempt to create a fresh state object.
fr.flowStates.Delete(key)
Expand All @@ -268,7 +270,7 @@ func (fr *FlowRegistry) WithConnection(key types.FlowKey, fn func(conn contracts
}
}

return fmt.Errorf("failed to provision JIT flow resources: %w", jitErr)
return fmt.Errorf("failed to provision JIT flow resources: %w", state.initErr)
}

// 3. Execute callback.
Expand Down
77 changes: 77 additions & 0 deletions pkg/epp/flowcontrol/registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1372,3 +1373,79 @@ func TestFlowRegistry_PriorityBandGarbageCollection(t *testing.T) {
assert.True(t, exists, "Band must not be collected if leaseCount > 0, even if empty and idle timer expired")
})
}

// TestFlowRegistry_JITErrorScoping ensures that JIT provisioning errors are correctly propagated to all concurrent
// requests waiting on the same flow initialization.
func TestFlowRegistry_JITErrorScoping(t *testing.T) {
t.Parallel()
handle := newTestPluginsHandle(t)

// Create a registry with a capability checker that passes validation but using a queue name that doesn't exist.
// This ensures NewConfig succeeds, but JIT (ensureFlowInfrastructure) fails when trying to instantiate the queue.
failQueueName := queue.RegisteredQueueName("NonExistentQueue")
mockChecker := &mockCapabilityChecker{
checkCompatibilityFunc: func(p framework.OrderingPolicy, q queue.RegisteredQueueName) error {
return nil // Bypass validation.
},
}

// We create a custom band config that uses this failing queue.
// We set it as the default band so that dynamic provisioning is used.
failingBand, err := NewPriorityBandConfig(handle, 0, "FailingBand", WithQueue(failQueueName))
require.NoError(t, err)

cfg, err := NewConfig(handle, withCapabilityChecker(mockChecker), WithDefaultPriorityBand(failingBand))
require.NoError(t, err)

registry, err := NewFlowRegistry(cfg, logr.Discard())
require.NoError(t, err)

key := types.FlowKey{
Priority: 100, // Dynamic, will trigger ensurePriorityBand
ID: "flow-should-fail",
}

// Simulate contention:
// We acquire the registry RLock.
// JIT provisioning (dynamic band) requires registry Lock (Write Lock).
// So the first thread to reach ensurePriorityBand will block until we release this lock.
// All other threads will pile up behind it on sync.Once.
registry.mu.RLock()

const concurrency = 10
var successCount atomic.Int32
var errorCount atomic.Int32
wg := sync.WaitGroup{}
wg.Add(concurrency)

start := make(chan struct{})

for range concurrency {
go func() {
defer wg.Done()
<-start
err := registry.WithConnection(key, func(conn contracts.ActiveFlowConnection) error {
return nil
})
if err == nil {
successCount.Add(1)
} else {
errorCount.Add(1)
}
}()
}

close(start)

// Wait a bit to ensure everyone is stuck.
time.Sleep(100 * time.Millisecond)

// Release the lock, letting the winner proceed (and fail).
registry.mu.RUnlock()

wg.Wait()

// Assertion: all requests should fail.
assert.Equal(t, int32(concurrency), errorCount.Load(), "All requests should fail JIT provisioning")
assert.Equal(t, int32(0), successCount.Load(), "No request should succeed if JIT failed")
}