Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"sync"

log "sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -39,6 +40,8 @@ const (

WorkerIDHeader = "x-worker-instance-id"
PrefillWorkerIDHeader = "x-prefill-instance-id"
DpRankHeader = "x-dynamo-dp-rank"
PrefillDpRankHeader = "x-dynamo-prefill-dp-rank"
RoutingModeHeader = "x-dynamo-routing-mode"

// decodeStateKey is the key used to store routing state in PluginState
Expand All @@ -55,6 +58,7 @@ var _ rc.ResponseComplete = &DynDecodeScorer{}
// DecodeRoutingState holds routing information passed from Score() to PreRequest().
type DecodeRoutingState struct {
WorkerID string
DpRank uint32
PrefillWorkerID string
TokenData []int64
}
Expand All @@ -66,6 +70,7 @@ func (s *DecodeRoutingState) Clone() plugins.StateData {
}
clone := &DecodeRoutingState{
WorkerID: s.WorkerID,
DpRank: s.DpRank,
PrefillWorkerID: s.PrefillWorkerID,
}
if s.TokenData != nil {
Expand Down Expand Up @@ -157,8 +162,10 @@ func (s *DynDecodeScorer) Score(ctx context.Context, cycleState *schedtypes.Cycl
}

workerIDStr := fmt.Sprintf("%d", result.WorkerID)
dpRankStr := strconv.FormatUint(uint64(result.DpRank), 10)
logger.V(logutil.DEFAULT).Info("DynDecodeScorer: decode worker selected",
"decodeWorkerID", workerIDStr,
"decodeDpRank", result.DpRank,
"isDisaggregated", isDisaggregated,
"tokenCount", len(result.TokenData))

Expand All @@ -167,6 +174,7 @@ func (s *DynDecodeScorer) Score(ctx context.Context, cycleState *schedtypes.Cycl
req.Headers = map[string]string{}
}
req.Headers[WorkerIDHeader] = workerIDStr
req.Headers[DpRankHeader] = dpRankStr

if isDisaggregated {
req.Headers[RoutingModeHeader] = "disaggregated"
Expand All @@ -188,6 +196,7 @@ func (s *DynDecodeScorer) Score(ctx context.Context, cycleState *schedtypes.Cycl
if req.RequestId != "" {
routingState := &DecodeRoutingState{
WorkerID: workerIDStr,
DpRank: result.DpRank,
TokenData: result.TokenData,
}
s.pluginState.Write(req.RequestId, plugins.StateKey(decodeStateKey), routingState)
Expand Down Expand Up @@ -226,7 +235,7 @@ func (s *DynDecodeScorer) PreRequest(ctx context.Context, request *schedtypes.LL
return
}

if addErr := dynscorer.CallAddRequest(request.RequestId, state.TokenData, workerIDUint, 0); addErr != nil {
if addErr := dynscorer.CallAddRequest(request.RequestId, state.TokenData, workerIDUint, state.DpRank); addErr != nil {
logger.V(logutil.DEFAULT).Error(addErr, "DynDecodeScorer PreRequest: failed to add request",
"requestID", request.RequestId)
return
Expand All @@ -235,6 +244,7 @@ func (s *DynDecodeScorer) PreRequest(ctx context.Context, request *schedtypes.LL
logger.V(logutil.VERBOSE).Info("DynDecodeScorer PreRequest: registered request",
"requestID", request.RequestId,
"workerID", state.WorkerID,
"dpRank", state.DpRank,
"tokenCount", len(state.TokenData))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,21 @@ func (s *DynPrefillScorer) Score(ctx context.Context, cycleState *schedtypes.Cyc
}

prefillWorkerID := strconv.FormatUint(result.WorkerID, 10)
prefillDpRank := strconv.FormatUint(uint64(result.DpRank), 10)
logger.V(logutil.DEFAULT).Info("DynPrefillScorer: prefill worker selected",
"prefillWorkerID", prefillWorkerID,
"prefillDpRank", result.DpRank,
"tokenCount", len(result.TokenData))

// Set the prefill worker ID header directly on the request.
// Set the prefill worker ID and DP rank headers directly on the request.
// The request object is shared across all profile runs in the scheduling
// cycle, so the decode scorer (which runs in the next profile) will see it.
// This is more reliable than CycleState which may be scoped per profile.
if req.Headers == nil {
req.Headers = map[string]string{}
}
req.Headers[PrefillWorkerIDHeader] = prefillWorkerID
req.Headers[PrefillDpRankHeader] = prefillDpRank
Comment thread
atchernych marked this conversation as resolved.

// Score: 1.0 for all pods. The label-filter has already restricted to prefill workers,
// and the FFI router's internal selection is authoritative.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ typedef struct {
bool is_disaggregated;
uint64_t prefill_worker_id;
uint64_t decode_worker_id;
uint32_t prefill_dp_rank;
uint32_t decode_dp_rank;
uint32_t *token_ids;
size_t token_count;
} CRoutingResult;
Expand Down Expand Up @@ -411,6 +413,7 @@ func CallFreeRequest(requestID string) error {
// RoutingResult holds the result of a prefill or decode routing call.
type RoutingResult struct {
WorkerID uint64
DpRank uint32
TokenData []int64
}

Expand Down Expand Up @@ -455,9 +458,10 @@ func CallRoutePrefillRequest(requestJSON string, podsJSON string) (*RoutingResul
}

workerID := uint64(result.prefill_worker_id)
dpRank := uint32(result.prefill_dp_rank)
C.free_routing_result(&result)

return &RoutingResult{WorkerID: workerID, TokenData: tokens64}, nil
return &RoutingResult{WorkerID: workerID, DpRank: dpRank, TokenData: tokens64}, nil
}

// CallRouteDecodeRequest routes a request to the best decode worker.
Expand Down Expand Up @@ -501,7 +505,8 @@ func CallRouteDecodeRequest(requestJSON string, podsJSON string, isDisaggregated
}

workerID := uint64(result.decode_worker_id)
dpRank := uint32(result.decode_dp_rank)
C.free_routing_result(&result)

return &RoutingResult{WorkerID: workerID, TokenData: tokens64}, nil
return &RoutingResult{WorkerID: workerID, DpRank: dpRank, TokenData: tokens64}, nil
}
13 changes: 9 additions & 4 deletions docs/kubernetes/inference-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,17 @@ title: Inference Gateway (GAIE)

Integrate Dynamo with the Gateway API Inference Extension for intelligent KV-aware request routing at the gateway layer.

EPP's default kv-routing approach is not token-aware because the prompt is not tokenized. But the Dynamo plugin uses a token-aware KV algorithm. It employs the dynamo router which implements kv routing by running your model's tokenizer inline. The EPP plugin configuration lives in [`helm/dynamo-gaie/epp-config-dynamo.yaml`](https://github.com/ai-dynamo/dynamo/blob/main/deploy/inference-gateway/standalone/helm/dynamo-gaie/epp-config-dynamo.yaml), following the checked-in GAIE/EPP configuration layout used by this repository.
## Features

Dynamo Integration with the Inference Gateway supports Aggregated and Disaggregated Serving. A request only exercises disaggregated routing when the EPP config defines a `prefill` profile and prefill workers are available. The standalone [`epp-config-dynamo.yaml`](https://github.com/ai-dynamo/dynamo/blob/main/deploy/inference-gateway/standalone/helm/dynamo-gaie/epp-config-dynamo.yaml) currently only defines a `decode` profile, while the recipe examples use separate aggregated and disaggregated configs under `recipes/llama-3-70b/vllm/agg/gaie/` and `recipes/llama-3-70b/vllm/disagg-single-node/gaie/`. Unless `DYN_ENFORCE_DISAGG=true`, deployments without a `prefill` profile or prefill workers fall back to aggregated serving.
If you want to use LoRA deploy Dynamo without the Inference Gateway.
- EPP's default kv-routing approach is not token-aware because the prompt is not tokenized. But the Dynamo plugin uses a token-aware KV algorithm. It employs the dynamo router which implements kv routing by running your model's tokenizer inline. The EPP plugin configuration lives in [`helm/dynamo-gaie/epp-config-dynamo.yaml`](https://github.com/ai-dynamo/dynamo/blob/main/deploy/inference-gateway/standalone/helm/dynamo-gaie/epp-config-dynamo.yaml), following the checked-in GAIE/EPP configuration layout used by this repository.

Currently, these setups are only supported with the kGateway based Inference Gateway.
- Dynamo Integration with the Inference Gateway supports Aggregated and Disaggregated Serving. A request only exercises disaggregated routing when the EPP config defines a `prefill` profile and prefill workers are available. The standalone [`epp-config-dynamo.yaml`](https://github.com/ai-dynamo/dynamo/blob/main/deploy/inference-gateway/standalone/helm/dynamo-gaie/epp-config-dynamo.yaml) currently only defines a `decode` profile, while the recipe examples use separate aggregated and disaggregated configs under `recipes/llama-3-70b/vllm/agg/gaie/` and `recipes/llama-3-70b/vllm/disagg-single-node/gaie/`. Unless `DYN_ENFORCE_DISAGG=true`, deployments without a `prefill` profile or prefill workers fall back to aggregated serving.

- GAIE integration supports Data Parallelism.

- If you want to use LoRA deploy Dynamo without the Inference Gateway.

- Currently, these setups are only tested with the kGateways Inference Gateway.
Comment thread
atchernych marked this conversation as resolved.
Outdated

## Prerequisites

Expand Down
18 changes: 13 additions & 5 deletions lib/bindings/c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,10 @@ pub struct CRoutingResult {
pub prefill_worker_id: u64,
/// Decode worker ID
pub decode_worker_id: u64,
/// Data parallel rank selected for the prefill worker
pub prefill_dp_rank: u32,
/// Data parallel rank selected for the decode worker
pub decode_dp_rank: u32,
/// Token IDs (needed for add_request callback)
pub token_ids: *mut u32,
/// Number of tokens in the request
Expand All @@ -416,6 +420,8 @@ impl Default for CRoutingResult {
is_disaggregated: false,
prefill_worker_id: 0,
decode_worker_id: 0,
prefill_dp_rank: 0,
decode_dp_rank: 0,
token_ids: ptr::null_mut(),
token_count: 0,
}
Expand Down Expand Up @@ -449,7 +455,7 @@ impl RouterHandles {
lora_name: Option<String>,
priority_jump: f64,
allowed_worker_ids: Option<HashSet<WorkerId>>,
) -> Result<u64, QueryRouterResult> {
) -> Result<(u64, u32), QueryRouterResult> {
if let Some(ref ids) = allowed_worker_ids {
self.prefill_router.register_workers(ids);
}
Expand All @@ -464,7 +470,6 @@ impl RouterHandles {
allowed_worker_ids,
)
.await
.map(|(worker_id, _dp_rank)| worker_id)
.map_err(|e| {
tracing::error!(error = ?e, "Prefill query failed");
QueryRouterResult::ErrQueryFailed
Expand Down Expand Up @@ -1203,25 +1208,27 @@ pub unsafe extern "C" fn route_prefill_request(
let allowed_worker_ids = unsafe { parse_pods_filter(pods_json) };

let result = handles.runtime.secondary().block_on(async {
let prefill_worker_id = handles
let (prefill_worker_id, prefill_dp_rank) = handles
.query_prefill_worker(&tokens, None, false, None, 0.0, allowed_worker_ids)
.await?;

tracing::info!(
prefill_worker_id = prefill_worker_id,
prefill_dp_rank = prefill_dp_rank,
token_count = tokens.len(),
"Routed prefill request"
);

Ok(prefill_worker_id)
Ok((prefill_worker_id, prefill_dp_rank))
});

match result {
Ok(prefill_worker_id) => {
Ok((prefill_worker_id, prefill_dp_rank)) => {
let out = unsafe { &mut *out_result };
*out = CRoutingResult::default();
out.is_disaggregated = true;
out.prefill_worker_id = prefill_worker_id;
out.prefill_dp_rank = prefill_dp_rank;
write_tokens_to_result(&tokens, out);
QueryRouterResult::Ok
}
Expand Down Expand Up @@ -1290,6 +1297,7 @@ pub unsafe extern "C" fn route_decode_request(
*out = CRoutingResult::default();
out.is_disaggregated = is_disaggregated;
out.decode_worker_id = decode_worker.worker_id;
out.decode_dp_rank = decode_worker.dp_rank;
write_tokens_to_result(&tokens, out);
QueryRouterResult::Ok
}
Expand Down
Loading