diff --git a/pkg/epp/flowcontrol/contracts/registry.go b/pkg/epp/flowcontrol/contracts/registry.go index addc2c1f00..16d3b71c24 100644 --- a/pkg/epp/flowcontrol/contracts/registry.go +++ b/pkg/epp/flowcontrol/contracts/registry.go @@ -81,16 +81,21 @@ type FlowRegistryDataPlane interface { WithConnection(key types.FlowKey, fn func(conn ActiveFlowConnection) error) error } -// ActiveFlowConnection represents a handle to a temporary, leased session on a flow. -// It provides a safe, scoped entry point to the registry's sharded data plane. +// ActiveFlowConnection represents a handle to a scoped, leased session on a flow. +// It provides a safe entry point to the registry's sharded data plane. // // An `ActiveFlowConnection` instance is only valid for the duration of the `WithConnection` callback from which it was // received. Callers MUST NOT store a reference to this object or use it after the callback returns. -// Its purpose is to ensure that any interaction with the flow's state (e.g., accessing its shards and queues) occurs -// safely while the flow is guaranteed to be protected from garbage collection. +// +// Lifecycle & Pinning: +// This interface represents an active "Lease" on the flow. As long as this object is valid (within the callback), the +// Flow Registry guarantees that the underlying Flow State is "Pinned" and protected from Garbage Collection. type ActiveFlowConnection interface { - // ActiveShards returns a stable snapshot of accessors for all Active internal state shards. + // ActiveShards returns a current snapshot of accessors for all Active internal state shards. ActiveShards() []RegistryShard + + // FlowKey returns the immutable identity of the flow this connection is pinned to. + FlowKey() types.FlowKey } // RegistryShard defines the interface for a single slice (shard) of the `FlowRegistry`'s state. 
diff --git a/pkg/epp/flowcontrol/controller/controller.go b/pkg/epp/flowcontrol/controller/controller.go index ac33660f31..73ab903a3c 100644 --- a/pkg/epp/flowcontrol/controller/controller.go +++ b/pkg/epp/flowcontrol/controller/controller.go @@ -227,54 +227,74 @@ func (fc *FlowController) EnqueueAndWait( reqCtx, cancel, enqueueTime := fc.createRequestContext(ctx, req) defer cancel() - // 2. Enter the distribution loop to find a home for the request. - // This loop is responsible for retrying on ErrShardDraining. - for { - select { // Non-blocking check on controller lifecycle. - case <-fc.parentCtx.Done(): - return types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, types.ErrFlowControllerNotRunning) - default: - } + var finalOutcome types.QueueOutcome + + // 2. Acquire a lease for the Flow. + // We hold this lease for the entire duration of the request (Distribution + Queueing). + err := fc.registry.WithConnection(flowKey, func(conn contracts.ActiveFlowConnection) error { + // 3. Enter the distribution loop to find a home for the request. + // This loop is responsible for retrying on ErrShardDraining. + // We can safely retry within this loop using the same 'conn' object because every call to conn.ActiveShards() + // returns a fresh snapshot of the current topology. + for { + select { // Non-blocking check on controller lifecycle. + case <-fc.parentCtx.Done(): + finalOutcome = types.QueueOutcomeRejectedOther + return fmt.Errorf("%w: %w", types.ErrRejected, types.ErrFlowControllerNotRunning) + default: + } - // Attempt to distribute the request once. - item, err := fc.tryDistribution(reqCtx, req, enqueueTime) - if err != nil { - // Distribution failed terminally (e.g., no shards, context cancelled during blocking submit). - // The item has already been finalized by tryDistribution. - finalState := item.FinalState() - return finalState.Outcome, finalState.Err - } + // Attempt to distribute the request once, passing the active connection. 
+ item, err := fc.tryDistribution(reqCtx, req, enqueueTime, conn) + if err != nil { + // Distribution failed terminally (e.g., no shards, context cancelled during blocking submit). + // The item has already been finalized by tryDistribution. + finalState := item.FinalState() + finalOutcome = finalState.Outcome + return finalState.Err + } - // Distribution was successful; ownership of the item has been transferred to a processor. - // Now, we block here in awaitFinalization until the request is finalized by either the processor (e.g., dispatched, - // rejected) or the controller itself (e.g., caller's context cancelled/TTL expired). - outcome, err := fc.awaitFinalization(reqCtx, item) - if errors.Is(err, contracts.ErrShardDraining) { - // This is a benign race condition where the chosen shard started draining after acceptance. - fc.logger.V(logutil.DEBUG).Info("Selected shard is Draining, retrying request distribution", - "flowKey", req.FlowKey(), "requestID", req.ID()) - // Introduce a small, randomized delay (1-10ms) to prevent tight spinning loops and thundering herds during retry - // scenarios (e.g., shard draining) - // TODO: Replace this with a more sophisticated backoff strategy when our data parallelism story matures. - // For now, this is more than sufficient. - jitterMs := k8srand.Intn(10) + 1 - fc.clock.Sleep(time.Duration(jitterMs) * time.Millisecond) - continue + // Distribution was successful; ownership of the item has been transferred to a processor. + // Now, we block here in awaitFinalization until the request is finalized by either the processor (e.g., dispatched, + // rejected) or the controller itself (e.g., caller's context cancelled/TTL expired). + outcome, err := fc.awaitFinalization(reqCtx, item) + if errors.Is(err, contracts.ErrShardDraining) { + // This is a benign race condition where the chosen shard started draining after acceptance. 
+ fc.logger.V(logutil.DEBUG).Info("Selected shard is Draining, retrying request distribution", + "flowKey", req.FlowKey(), "requestID", req.ID()) + // Introduce a small, randomized delay (1-10ms) to prevent tight spinning loops and thundering herds during retry + // scenarios (e.g., shard draining) + jitterMs := k8srand.Intn(10) + 1 + fc.clock.Sleep(time.Duration(jitterMs) * time.Millisecond) + continue + } + + // The outcome is terminal (Dispatched, Evicted, or a non-retriable rejection). + finalOutcome = outcome + return err } + }) - // The outcome is terminal (Dispatched, Evicted, or a non-retriable rejection). - return outcome, err + // If WithConnection returned an error (e.g. connection failure, context cancelled before lease), we must ensure we + // return a valid rejection outcome. + // Whenever the closure ran, finalOutcome was set inside it before returning, even if it also returned an error. + if err != nil && finalOutcome == types.QueueOutcomeNotYetFinalized { + return types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, err) } + + return finalOutcome, err } var errNoShards = errors.New("no viable active shards available") // tryDistribution handles a single attempt to select a shard and submit a request. +// It uses the provided `conn` to identify candidate shards. // If this function returns an error, it guarantees that the provided `item` has been finalized. func (fc *FlowController) tryDistribution( reqCtx context.Context, req types.FlowControlRequest, enqueueTime time.Time, + conn contracts.ActiveFlowConnection, ) (*internal.FlowItem, error) { // Calculate effective TTL for item initialization (reqCtx is the enforcement mechanism). effectiveTTL := fc.config.DefaultRequestTTL @@ -287,7 +307,7 @@ func (fc *FlowController) tryDistribution( // We must create a fresh FlowItem on each attempt as finalization is per-lifecycle. 
item := internal.NewItem(req, effectiveTTL, enqueueTime) - candidates, err := fc.selectDistributionCandidates(item.OriginalRequest().FlowKey()) + candidates, err := fc.selectDistributionCandidates(conn) if err != nil { outcome := types.QueueOutcomeRejectedOther if errors.Is(err, errNoShards) { @@ -367,35 +387,29 @@ type candidate struct { byteSize uint64 } -// selectDistributionCandidates identifies all Active shards for the item's flow and ranks them by the current byte size +// selectDistributionCandidates identifies all Active shards for the leased flow and ranks them by the current byte size // of that flow's queue, from least to most loaded. -func (fc *FlowController) selectDistributionCandidates(key types.FlowKey) ([]candidate, error) { - var candidates []candidate - - // Acquire a connection to the registry for the flow key. This ensures a consistent view of the ActiveShards for the - // duration of the shard selection process, preventing races with concurrent shard topology changes. - err := fc.registry.WithConnection(key, func(conn contracts.ActiveFlowConnection) error { - shards := conn.ActiveShards() - candidates = make([]candidate, 0, len(shards)) - for _, shard := range shards { - worker := fc.getOrStartWorker(shard) - mq, err := shard.ManagedQueue(key) - if err != nil { - fc.logger.Error(err, - "Invariant violation. Failed to get ManagedQueue for a leased flow on an Active shard. 
Skipping shard.", - "flowKey", key, "shardID", shard.ID()) - continue - } - candidates = append(candidates, candidate{worker.processor, shard.ID(), mq.FlowQueueAccessor().ByteSize()}) +func (fc *FlowController) selectDistributionCandidates(conn contracts.ActiveFlowConnection) ([]candidate, error) { + shards := conn.ActiveShards() + if len(shards) == 0 { + return nil, fmt.Errorf("%w for flow %s", errNoShards, conn.FlowKey()) + } + + candidates := make([]candidate, 0, len(shards)) + for _, shard := range shards { + worker := fc.getOrStartWorker(shard) + mq, err := shard.ManagedQueue(conn.FlowKey()) + if err != nil { + fc.logger.Error(err, + "Invariant violation. Failed to get ManagedQueue for a leased flow on an Active shard. Skipping shard.", + "flowKey", conn.FlowKey(), "shardID", shard.ID()) + continue } - return nil - }) - if err != nil { - return nil, fmt.Errorf("failed to acquire lease for flow %s: %w", key, err) + candidates = append(candidates, candidate{worker.processor, shard.ID(), mq.FlowQueueAccessor().ByteSize()}) } if len(candidates) == 0 { - return nil, fmt.Errorf("%w for flow %s", errNoShards, key) + return nil, fmt.Errorf("%w for flow %s", errNoShards, conn.FlowKey()) } slices.SortFunc(candidates, func(a, b candidate) int { diff --git a/pkg/epp/flowcontrol/controller/controller_test.go b/pkg/epp/flowcontrol/controller/controller_test.go index 33aa2b34ad..391c09cb84 100644 --- a/pkg/epp/flowcontrol/controller/controller_test.go +++ b/pkg/epp/flowcontrol/controller/controller_test.go @@ -159,14 +159,22 @@ func newIntegrationHarness(t *testing.T, ctx context.Context, cfg Config, regist // mockActiveFlowConnection is a local mock for the `contracts.ActiveFlowConnection` interface. 
type mockActiveFlowConnection struct { - contracts.ActiveFlowConnection - ActiveShardsV []contracts.RegistryShard + ActiveShardsV []contracts.RegistryShard + ActiveShardsFunc func() []contracts.RegistryShard + FlowKeyV types.FlowKey } func (m *mockActiveFlowConnection) ActiveShards() []contracts.RegistryShard { + if m.ActiveShardsFunc != nil { + return m.ActiveShardsFunc() + } return m.ActiveShardsV } +func (m *mockActiveFlowConnection) FlowKey() types.FlowKey { + return m.FlowKeyV +} + // mockRegistryClient is a mock for the private `registryClient` interface. type mockRegistryClient struct { contracts.FlowRegistryObserver @@ -324,8 +332,11 @@ func TestFlowController_EnqueueAndWait(t *testing.T) { // Configure registry to return a shard. shardA := newMockShard("shard-A").build() - h.mockRegistry.WithConnectionFunc = func(_ types.FlowKey, fn func(_ contracts.ActiveFlowConnection) error) error { - return fn(&mockActiveFlowConnection{ActiveShardsV: []contracts.RegistryShard{shardA}}) + h.mockRegistry.WithConnectionFunc = func(key types.FlowKey, fn func(_ contracts.ActiveFlowConnection) error) error { + return fn(&mockActiveFlowConnection{ + ActiveShardsV: []contracts.RegistryShard{shardA}, + FlowKeyV: key, + }) } // Configure processor to block until context expiry. h.mockProcessorFactory.processors["shard-A"] = &mockShardProcessor{ @@ -421,10 +432,13 @@ func TestFlowController_EnqueueAndWait(t *testing.T) { }, } mockRegistry.WithConnectionFunc = func( - _ types.FlowKey, + key types.FlowKey, fn func(conn contracts.ActiveFlowConnection) error, ) error { - return fn(&mockActiveFlowConnection{ActiveShardsV: []contracts.RegistryShard{faultyShard}}) + return fn(&mockActiveFlowConnection{ + ActiveShardsV: []contracts.RegistryShard{faultyShard}, + FlowKeyV: key, + }) } req := newTestRequest(defaultFlowKey) @@ -588,10 +602,13 @@ func TestFlowController_EnqueueAndWait(t *testing.T) { // Configure the registry to return the specified shards. 
mockRegistry.WithConnectionFunc = func( - _ types.FlowKey, + key types.FlowKey, fn func(conn contracts.ActiveFlowConnection) error, ) error { - return fn(&mockActiveFlowConnection{ActiveShardsV: tc.shards}) + return fn(&mockActiveFlowConnection{ + ActiveShardsV: tc.shards, + FlowKeyV: key, + }) } tc.setupProcessors(t, h) @@ -636,11 +653,12 @@ func TestFlowController_EnqueueAndWait(t *testing.T) { t.Parallel() mockRegistry := &mockRegistryClient{ WithConnectionFunc: func( - _ types.FlowKey, + key types.FlowKey, fn func(conn contracts.ActiveFlowConnection, ) error) error { return fn(&mockActiveFlowConnection{ ActiveShardsV: []contracts.RegistryShard{newMockShard("shard-A").build()}, + FlowKeyV: key, }) }, } @@ -677,19 +695,26 @@ func TestFlowController_EnqueueAndWait(t *testing.T) { var callCount atomic.Int32 mockRegistry := &mockRegistryClient{ WithConnectionFunc: func( - _ types.FlowKey, + key types.FlowKey, fn func(conn contracts.ActiveFlowConnection) error, ) error { - attempt := callCount.Add(1) shardA := newMockShard("shard-A").withByteSize(100).build() shardB := newMockShard("shard-B").withByteSize(1000).build() - if attempt == 1 { - // Attempt 1: Shard A is the least loaded and is selected. - return fn(&mockActiveFlowConnection{ActiveShardsV: []contracts.RegistryShard{shardA, shardB}}) + // We construct the connection once, using a hook for dynamic topology. + conn := &mockActiveFlowConnection{ + FlowKeyV: key, + ActiveShardsFunc: func() []contracts.RegistryShard { + attempt := callCount.Add(1) + if attempt == 1 { + // Attempt 1: Shard A is present and will be selected (least loaded). + return []contracts.RegistryShard{shardA, shardB} + } + // Attempt 2 (Retry): Assume Shard A is now draining and removed from the active set by the registry. + return []contracts.RegistryShard{shardB} + }, } - // Attempt 2 (Retry): Assume Shard A is now draining and removed from the active set by the registry. 
- return fn(&mockActiveFlowConnection{ActiveShardsV: []contracts.RegistryShard{shardB}}) + return fn(conn) }, } // Use a long TTL to ensure retries don't time out. @@ -735,8 +760,11 @@ func TestFlowController_EnqueueAndWait(t *testing.T) { h := newUnitHarness(t, t.Context(), Config{DefaultRequestTTL: 10 * time.Second}, nil) shardA := newMockShard("shard-A").build() - h.mockRegistry.WithConnectionFunc = func(_ types.FlowKey, fn func(_ contracts.ActiveFlowConnection) error) error { - return fn(&mockActiveFlowConnection{ActiveShardsV: []contracts.RegistryShard{shardA}}) + h.mockRegistry.WithConnectionFunc = func(key types.FlowKey, fn func(_ contracts.ActiveFlowConnection) error) error { + return fn(&mockActiveFlowConnection{ + ActiveShardsV: []contracts.RegistryShard{shardA}, + FlowKeyV: key, + }) } // Channel for synchronization. @@ -806,8 +834,11 @@ func TestFlowController_EnqueueAndWait(t *testing.T) { h := newUnitHarness(t, t.Context(), Config{DefaultRequestTTL: requestTTL}, nil) shardA := newMockShard("shard-A").build() - h.mockRegistry.WithConnectionFunc = func(_ types.FlowKey, fn func(_ contracts.ActiveFlowConnection) error) error { - return fn(&mockActiveFlowConnection{ActiveShardsV: []contracts.RegistryShard{shardA}}) + h.mockRegistry.WithConnectionFunc = func(key types.FlowKey, fn func(_ contracts.ActiveFlowConnection) error) error { + return fn(&mockActiveFlowConnection{ + ActiveShardsV: []contracts.RegistryShard{shardA}, + FlowKeyV: key, + }) } itemSubmitted := make(chan *internal.FlowItem, 1) @@ -868,6 +899,88 @@ func TestFlowController_EnqueueAndWait(t *testing.T) { assert.Equal(t, types.QueueOutcomeEvictedTTL, finalState.Outcome, "Item's internal outcome must match the returned outcome") }) + + // Validates that the Flow Registry lease is held (pinned) for the entire duration of the request, including the + // time spent blocking in the processor's queue. 
If the lease is released early, the Garbage Collector could delete + // the flow while requests are queued. + t.Run("LeaseHeldDuringQueueing", func(t *testing.T) { + t.Parallel() + + // Synchronization channels + leaseReleased := make(chan struct{}) + processorEntered := make(chan struct{}) + unblockProcessor := make(chan struct{}) + + // 1. Setup Registry: Trace when the lease is released. + mockRegistry := &mockRegistryClient{ + WithConnectionFunc: func( + key types.FlowKey, + fn func(conn contracts.ActiveFlowConnection) error, + ) error { + // Execute the controller's logic. + err := fn(&mockActiveFlowConnection{ + ActiveShardsV: []contracts.RegistryShard{newMockShard("shard-A").build()}, + FlowKeyV: key, + }) + // Signal that the closure has finished and the lease is about to be released. + close(leaseReleased) + return err + }, + } + + h := newUnitHarness(t, t.Context(), Config{}, mockRegistry) + + // 2. Setup Processor: Simulate a long wait in the queue. + h.mockProcessorFactory.processors["shard-A"] = &mockShardProcessor{ + SubmitFunc: func(_ *internal.FlowItem) error { return internal.ErrProcessorBusy }, + SubmitOrBlockFunc: func(ctx context.Context, item *internal.FlowItem) error { + close(processorEntered) // Signal that we are now "queued" + + // Block until the test allows us to proceed. + select { + case <-unblockProcessor: + item.FinalizeWithOutcome(types.QueueOutcomeDispatched, nil) + return nil + case <-ctx.Done(): + return ctx.Err() + } + }, + } + + // 3. Run EnqueueAndWait in the background. + go func() { + _, _ = h.fc.EnqueueAndWait(context.Background(), newTestRequest(defaultFlowKey)) + }() + + // 4. Wait for the request to enter the queue (Blocking phase). + select { + case <-processorEntered: + // Success: The request is now blocked inside the processor. + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for request to enter processor") + } + + // 5. Verify the lease is still held. 
+ // If leaseReleased is closed, it means the controller returned from WithConnection while the request was still + // inside SubmitOrBlock. + select { + case <-leaseReleased: + t.Fatal("registry lease was released while the request was still queued.") + default: + // Success: The lease is still held. + } + + // 6. Cleanup: Unblock the processor and allow the lease to release. + close(unblockProcessor) + + // Verify that the lease is eventually released after processing finishes. + select { + case <-leaseReleased: + // Success + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for lease to release after processing finished") + } + }) }) } @@ -1159,8 +1272,11 @@ func setupRegistryForConcurrency(t *testing.T, numShards int, flowKey types.Flow } // Configure the registry connection. - mockRegistry.WithConnectionFunc = func(_ types.FlowKey, fn func(conn contracts.ActiveFlowConnection) error) error { - return fn(&mockActiveFlowConnection{ActiveShardsV: shards}) + mockRegistry.WithConnectionFunc = func(key types.FlowKey, fn func(conn contracts.ActiveFlowConnection) error) error { + return fn(&mockActiveFlowConnection{ + ActiveShardsV: shards, + FlowKeyV: key, + }) } mockRegistry.ShardStatsFunc = func() []contracts.ShardStats { stats := make([]contracts.ShardStats, len(shards)) diff --git a/pkg/epp/flowcontrol/registry/connection.go b/pkg/epp/flowcontrol/registry/connection.go index cb98316556..9a3caa1eb9 100644 --- a/pkg/epp/flowcontrol/registry/connection.go +++ b/pkg/epp/flowcontrol/registry/connection.go @@ -21,8 +21,9 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" ) -// connection is the concrete, un-exported implementation of the `contracts.ActiveFlowConnection` interface. -// It is a temporary handle created for the duration of a single `WithConnection` call. +// connection is the concrete, un-exported implementation of the contracts.ActiveFlowConnection interface. 
+// It represents a scoped lease that pins the flow state in memory, preventing garbage collection for the duration of +// the session. type connection struct { registry *FlowRegistry key types.FlowKey @@ -42,3 +43,8 @@ func (c *connection) ActiveShards() []contracts.RegistryShard { } return shardsCopy } + +// FlowKey returns the immutable identity of the flow this connection is pinned to. +func (c *connection) FlowKey() types.FlowKey { + return c.key +}