Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions pkg/epp/flowcontrol/contracts/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,21 @@ type FlowRegistryDataPlane interface {
WithConnection(key types.FlowKey, fn func(conn ActiveFlowConnection) error) error
}

// ActiveFlowConnection represents a handle to a temporary, leased session on a flow.
// It provides a safe, scoped entry point to the registry's sharded data plane.
// ActiveFlowConnection represents a handle to a scoped, leased session on a flow.
// It provides a safe entry point to the registry's sharded data plane.
//
// An `ActiveFlowConnection` instance is only valid for the duration of the `WithConnection` callback from which it was
// received. Callers MUST NOT store a reference to this object or use it after the callback returns.
// Its purpose is to ensure that any interaction with the flow's state (e.g., accessing its shards and queues) occurs
// safely while the flow is guaranteed to be protected from garbage collection.
//
// Lifecycle & Pinning:
// This interface represents an active "Lease" on the flow. As long as this object is valid (within the callback), the
// Flow Registry guarantees that the underlying Flow State is "Pinned" and protected from Garbage Collection.
type ActiveFlowConnection interface {
// ActiveShards returns a stable snapshot of accessors for all Active internal state shards.
// ActiveShards returns a current snapshot of accessors for all Active internal state shards.
ActiveShards() []RegistryShard

// FlowKey returns the immutable identity of the flow this connection is pinned to.
FlowKey() types.FlowKey
}

// RegistryShard defines the interface for a single slice (shard) of the `FlowRegistry`'s state.
Expand Down
130 changes: 72 additions & 58 deletions pkg/epp/flowcontrol/controller/controller.go
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The diff for EnqueueAndWait looks massive, but this is mostly due to indentation changes. I wrapped the existing distribution loop inside the WithConnection closure to extend the lease scope. Would recommend reviewing this with "Hide Whitespace" enabled.

Original file line number Diff line number Diff line change
Expand Up @@ -227,54 +227,74 @@ func (fc *FlowController) EnqueueAndWait(
reqCtx, cancel, enqueueTime := fc.createRequestContext(ctx, req)
defer cancel()

// 2. Enter the distribution loop to find a home for the request.
// This loop is responsible for retrying on ErrShardDraining.
for {
select { // Non-blocking check on controller lifecycle.
case <-fc.parentCtx.Done():
return types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, types.ErrFlowControllerNotRunning)
default:
}
var finalOutcome types.QueueOutcome
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we initialize this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It initializes by default (it's an iota) to the semantically correct value, so this should be safe.

// QueueOutcome represents the high-level final state of a request's lifecycle within the `controller.FlowController`.
//
// It is returned by `FlowController.EnqueueAndWait()` along with a corresponding error. This enum is designed to be a
// low-cardinality label ideal for metrics, while the error provides fine-grained details for non-dispatched outcomes.
type QueueOutcome int

const (
	// QueueOutcomeNotYetFinalized indicates the request has not yet been finalized by the `controller.FlowController`.
	// This is an internal default value and should never be returned by `FlowController.EnqueueAndWait()`.
	QueueOutcomeNotYetFinalized QueueOutcome = iota
	...
)
...


// 2. Acquire a lease for the Flow.
// We hold this lease for the entire duration of the request (Distribution + Queueing).
err := fc.registry.WithConnection(flowKey, func(conn contracts.ActiveFlowConnection) error {
// 3. Enter the distribution loop to find a home for the request.
// This loop is responsible for retrying on ErrShardDraining.
// We can safely retry within this loop using the same 'conn' object because conn.ActiveShards() provides a live
// view of the topology.
for {
select { // Non-blocking check on controller lifecycle.
case <-fc.parentCtx.Done():
finalOutcome = types.QueueOutcomeRejectedOther
return fmt.Errorf("%w: %w", types.ErrRejected, types.ErrFlowControllerNotRunning)
default:
}

// Attempt to distribute the request once.
item, err := fc.tryDistribution(reqCtx, req, enqueueTime)
if err != nil {
// Distribution failed terminally (e.g., no shards, context cancelled during blocking submit).
// The item has already been finalized by tryDistribution.
finalState := item.FinalState()
return finalState.Outcome, finalState.Err
}
// Attempt to distribute the request once, passing the active connection.
item, err := fc.tryDistribution(reqCtx, req, enqueueTime, conn)
if err != nil {
// Distribution failed terminally (e.g., no shards, context cancelled during blocking submit).
// The item has already been finalized by tryDistribution.
finalState := item.FinalState()
finalOutcome = finalState.Outcome
return finalState.Err
}

// Distribution was successful; ownership of the item has been transferred to a processor.
// Now, we block here in awaitFinalization until the request is finalized by either the processor (e.g., dispatched,
// rejected) or the controller itself (e.g., caller's context cancelled/TTL expired).
outcome, err := fc.awaitFinalization(reqCtx, item)
if errors.Is(err, contracts.ErrShardDraining) {
// This is a benign race condition where the chosen shard started draining after acceptance.
fc.logger.V(logutil.DEBUG).Info("Selected shard is Draining, retrying request distribution",
"flowKey", req.FlowKey(), "requestID", req.ID())
// Introduce a small, randomized delay (1-10ms) to prevent tight spinning loops and thundering herds during retry
// scenarios (e.g., shard draining)
// TODO: Replace this with a more sophisticated backoff strategy when our data parallelism story matures.
// For now, this is more than sufficient.
jitterMs := k8srand.Intn(10) + 1
fc.clock.Sleep(time.Duration(jitterMs) * time.Millisecond)
continue
// Distribution was successful; ownership of the item has been transferred to a processor.
// Now, we block here in awaitFinalization until the request is finalized by either the processor (e.g., dispatched,
// rejected) or the controller itself (e.g., caller's context cancelled/TTL expired).
outcome, err := fc.awaitFinalization(reqCtx, item)
if errors.Is(err, contracts.ErrShardDraining) {
// This is a benign race condition where the chosen shard started draining after acceptance.
fc.logger.V(logutil.DEBUG).Info("Selected shard is Draining, retrying request distribution",
"flowKey", req.FlowKey(), "requestID", req.ID())
// Introduce a small, randomized delay (1-10ms) to prevent tight spinning loops and thundering herds during retry
// scenarios (e.g., shard draining)
jitterMs := k8srand.Intn(10) + 1
fc.clock.Sleep(time.Duration(jitterMs) * time.Millisecond)
continue
}

// The outcome is terminal (Dispatched, Evicted, or a non-retriable rejection).
finalOutcome = outcome
return err
}
})

// The outcome is terminal (Dispatched, Evicted, or a non-retriable rejection).
return outcome, err
// If WithConnection returned an error (e.g. connection failure, context cancelled before lease), we must ensure we
// return a valid rejection outcome.
// In the success case (where the closure ran), finalOutcome is set inside the closure.
if err != nil && finalOutcome == types.QueueOutcomeNotYetFinalized {
return types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, err)
}

return finalOutcome, err
}

var errNoShards = errors.New("no viable active shards available")

// tryDistribution handles a single attempt to select a shard and submit a request.
// It uses the provided `conn` to identify candidate shards.
// If this function returns an error, it guarantees that the provided `item` has been finalized.
func (fc *FlowController) tryDistribution(
reqCtx context.Context,
req types.FlowControlRequest,
enqueueTime time.Time,
conn contracts.ActiveFlowConnection,
) (*internal.FlowItem, error) {
// Calculate effective TTL for item initialization (reqCtx is the enforcement mechanism).
effectiveTTL := fc.config.DefaultRequestTTL
Expand All @@ -287,7 +307,7 @@ func (fc *FlowController) tryDistribution(
// We must create a fresh FlowItem on each attempt as finalization is per-lifecycle.
item := internal.NewItem(req, effectiveTTL, enqueueTime)

candidates, err := fc.selectDistributionCandidates(item.OriginalRequest().FlowKey())
candidates, err := fc.selectDistributionCandidates(conn)
if err != nil {
outcome := types.QueueOutcomeRejectedOther
if errors.Is(err, errNoShards) {
Expand Down Expand Up @@ -367,35 +387,29 @@ type candidate struct {
byteSize uint64
}

// selectDistributionCandidates identifies all Active shards for the item's flow and ranks them by the current byte size
// selectDistributionCandidates identifies all Active shards for the leased flow and ranks them by the current byte size
// of that flow's queue, from least to most loaded.
func (fc *FlowController) selectDistributionCandidates(key types.FlowKey) ([]candidate, error) {
var candidates []candidate

// Acquire a connection to the registry for the flow key. This ensures a consistent view of the ActiveShards for the
// duration of the shard selection process, preventing races with concurrent shard topology changes.
err := fc.registry.WithConnection(key, func(conn contracts.ActiveFlowConnection) error {
shards := conn.ActiveShards()
candidates = make([]candidate, 0, len(shards))
for _, shard := range shards {
worker := fc.getOrStartWorker(shard)
mq, err := shard.ManagedQueue(key)
if err != nil {
fc.logger.Error(err,
"Invariant violation. Failed to get ManagedQueue for a leased flow on an Active shard. Skipping shard.",
"flowKey", key, "shardID", shard.ID())
continue
}
candidates = append(candidates, candidate{worker.processor, shard.ID(), mq.FlowQueueAccessor().ByteSize()})
func (fc *FlowController) selectDistributionCandidates(conn contracts.ActiveFlowConnection) ([]candidate, error) {
shards := conn.ActiveShards()
if len(shards) == 0 {
return nil, fmt.Errorf("%w for flow %s", errNoShards, conn.FlowKey())
}

candidates := make([]candidate, 0, len(shards))
for _, shard := range shards {
worker := fc.getOrStartWorker(shard)
mq, err := shard.ManagedQueue(conn.FlowKey())
if err != nil {
fc.logger.Error(err,
"Invariant violation. Failed to get ManagedQueue for a leased flow on an Active shard. Skipping shard.",
"flowKey", conn.FlowKey(), "shardID", shard.ID())
continue
}
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to acquire lease for flow %s: %w", key, err)
candidates = append(candidates, candidate{worker.processor, shard.ID(), mq.FlowQueueAccessor().ByteSize()})
}

if len(candidates) == 0 {
return nil, fmt.Errorf("%w for flow %s", errNoShards, key)
return nil, fmt.Errorf("%w for flow %s", errNoShards, conn.FlowKey())
}

slices.SortFunc(candidates, func(a, b candidate) int {
Expand Down
Loading