From ee32a9669e10988d90939723a91459961a9e84fd Mon Sep 17 00:00:00 2001 From: PeaBrane Date: Tue, 7 Apr 2026 17:44:26 -0700 Subject: [PATCH 1/4] split remote indexer out of standalone limbo Signed-off-by: PeaBrane --- .../configuration/groups/kv_router_args.py | 21 +- .../src/dynamo/frontend/frontend_args.py | 15 + components/src/dynamo/router/args.py | 16 +- docs/components/router/router-guide.md | 61 +- docs/components/router/standalone-indexer.md | 127 +---- lib/bindings/python/Cargo.toml | 1 - lib/bindings/python/rust/llm/entrypoint.rs | 8 +- lib/bindings/python/rust/llm/kv.rs | 51 +- .../python/tests/replay/replay_utils.py | 2 +- lib/kv-router/Cargo.toml | 1 - lib/kv-router/src/indexer/kv_indexer.rs | 6 +- lib/kv-router/src/indexer/sharded.rs | 6 +- lib/kv-router/src/indexer/types.rs | 59 +- lib/kv-router/src/scheduling/config.rs | 27 +- lib/kv-router/src/standalone_indexer/mod.rs | 84 --- .../src/standalone_indexer/registry.rs | 187 +------ .../standalone_indexer/runtime/discovery.rs | 121 ---- .../runtime/query_engine.rs | 50 -- .../standalone_indexer/runtime/subscriber.rs | 89 --- lib/llm/src/kv_router.rs | 38 +- .../src/kv_router/{ => indexer}/jetstream.rs | 4 +- .../kv_router/{indexer.rs => indexer/mod.rs} | 134 +++-- lib/llm/src/kv_router/indexer/remote.rs | 520 ++++++++++++++++++ .../src/kv_router/{ => indexer}/subscriber.rs | 2 +- .../kv_router/{ => indexer}/worker_query.rs | 2 +- lib/llm/src/kv_router/publisher/mod.rs | 2 +- tests/router/common.py | 230 ++++++++ tests/router/router_process.py | 8 + tests/router/test_router_e2e_with_mockers.py | 49 +- 29 files changed, 1097 insertions(+), 824 deletions(-) delete mode 100644 lib/kv-router/src/standalone_indexer/runtime/discovery.rs delete mode 100644 lib/kv-router/src/standalone_indexer/runtime/query_engine.rs delete mode 100644 lib/kv-router/src/standalone_indexer/runtime/subscriber.rs rename lib/llm/src/kv_router/{ => indexer}/jetstream.rs (99%) rename lib/llm/src/kv_router/{indexer.rs => indexer/mod.rs} (66%) create mode 100644 lib/llm/src/kv_router/indexer/remote.rs rename lib/llm/src/kv_router/{ => indexer}/subscriber.rs (98%) rename lib/llm/src/kv_router/{ => indexer}/worker_query.rs (99%) diff --git a/components/src/dynamo/common/configuration/groups/kv_router_args.py b/components/src/dynamo/common/configuration/groups/kv_router_args.py index 2151d9c0635c..63a7bee8c479 100644 --- a/components/src/dynamo/common/configuration/groups/kv_router_args.py +++ b/components/src/dynamo/common/configuration/groups/kv_router_args.py @@ -36,7 +36,8 @@ "router_queue_threshold", "router_event_threads", "router_queue_policy", - "remote_indexer_component", + "use_remote_indexer", + "serve_indexer", ) @@ -61,7 +62,8 @@ class KvRouterConfigBase(ConfigBase): router_queue_threshold: Optional[float] router_event_threads: int router_queue_policy: str - remote_indexer_component: Optional[str] + use_remote_indexer: bool = False + serve_indexer: bool = False def kv_router_kwargs(self) -> dict: """Return a dict suitable for ``KvRouterConfig(**kwargs)``.""" @@ -286,15 +288,14 @@ def add_arguments(self, parser) -> None: arg_type=str, choices=["fcfs", "wspt"], ) - add_argument( + add_negatable_bool_argument( g, - flag_name="--remote-indexer-component", - env_var="DYN_REMOTE_INDEXER_COMPONENT", - default=None, + flag_name="--use-remote-indexer", + env_var="DYN_USE_REMOTE_INDEXER", + default=False, help=( - "[EXPERIMENTAL] KV Router: Component name of a standalone KV indexer to use for overlap scoring. " - "When set, the router queries the standalone indexer via the request plane instead " - "of maintaining a local radix tree (e.g. 'kv-indexer')." + "[EXPERIMENTAL] KV Router: Query a remote KV indexer served from the worker " + "component via the request plane instead of maintaining a local radix tree." ), - arg_type=str, + dest="use_remote_indexer", ) diff --git a/components/src/dynamo/frontend/frontend_args.py b/components/src/dynamo/frontend/frontend_args.py index c1d5bf0f9696..d0ecc70ed3bb 100644 --- a/components/src/dynamo/frontend/frontend_args.py +++ b/components/src/dynamo/frontend/frontend_args.py @@ -130,6 +130,13 @@ def validate(self) -> None: "--router-prefill-load-model=aic requires " "--router-track-prefill-tokens" ) + if self.serve_indexer: + if self.router_mode != "kv": + raise ValueError("--serve-indexer requires --router-mode=kv") + if self.use_remote_indexer: + raise ValueError( + "--serve-indexer and --use-remote-indexer are mutually exclusive" + ) @register_encoder(FrontendConfig) @@ -193,6 +200,14 @@ def add_arguments(self, parser) -> None: help="HTTP port for the engine (u16).", arg_type=int, ) + add_negatable_bool_argument( + g, + flag_name="--serve-indexer", + env_var="DYN_SERVE_INDEXER", + default=False, + help="Serve this frontend's local KV indexers over the request plane.", + dest="serve_indexer", + ) add_argument( g, flag_name="--tls-cert-path", diff --git a/components/src/dynamo/router/args.py b/components/src/dynamo/router/args.py index 5c6c4e48b05e..88f6f4c70f7d 100644 --- a/components/src/dynamo/router/args.py +++ b/components/src/dynamo/router/args.py @@ -15,7 +15,7 @@ KvRouterArgGroup, KvRouterConfigBase, ) -from dynamo.common.configuration.utils import add_argument +from dynamo.common.configuration.utils import add_argument, add_negatable_bool_argument from dynamo.llm import AicPerfConfig, KvRouterConfig @@ -25,6 +25,7 @@ class DynamoRouterConfig(KvRouterConfigBase, AicPerfConfigBase): namespace: str endpoint: str router_block_size: int + serve_indexer: bool = False def validate(self) -> None: """Validate config invariants (aligned with Rust KvRouterConfig where applicable).""" @@ -40,6 +41,10 @@ def validate(self) -> None: "Expected format: namespace.component.endpoint" ) self.namespace = parts[0] + if self.serve_indexer and self.use_remote_indexer: + raise ValueError( + "--serve-indexer and --use-remote-indexer are mutually exclusive" + ) if self.router_prefill_load_model == "aic": missing = [ flag @@ -89,6 +94,15 @@ def add_arguments(self, parser) -> None: obsolete_flag="--block-size", ) + add_negatable_bool_argument( + g, + flag_name="--serve-indexer", + env_var="DYN_SERVE_INDEXER", + default=False, + help="Serve this router's local KV indexer over the request plane.", + dest="serve_indexer", + ) + # KV router options (shared with dynamo.frontend) KvRouterArgGroup().add_arguments(parser) AicPerfArgGroup().add_arguments(parser) diff --git a/docs/components/router/router-guide.md b/docs/components/router/router-guide.md index 34fe2c3cf657..a697a8bd081f 100644 --- a/docs/components/router/router-guide.md +++ b/docs/components/router/router-guide.md @@ -42,7 +42,7 @@ When using KV routing, the router needs to know what each worker has cached. The |------------|---------------|-------------| | **NATS Core (local indexer)** | Default (no extra flags) | Workers maintain a local indexer; router queries workers on startup and receives events via NATS Core | | **JetStream (durable)** | `--router-durable-kv-events` | Events persisted in NATS JetStream; supports snapshots and durable consumers. *Deprecated.* | -| **ZMQ** | `--event-plane zmq` | Workers publish via ZMQ PUB sockets; standalone indexer aggregates events | +| **ZMQ** | `--event-plane zmq` | Workers publish via ZMQ PUB sockets; the standalone `dynamo.indexer` service aggregates events | | **Approximate (no events)** | `--no-router-kv-events` | No events consumed; router predicts cache state from its own routing decisions with TTL-based expiration | ### Aggregated vs. Disaggregated Topology @@ -93,6 +93,8 @@ Backend workers register themselves using the `register_model` API, after which | `--router-prefill-load-model ` | `none` | Prompt-side load model. `aic` decays only the oldest active prefill using an AIC-predicted duration | | `--router-queue-threshold ` | `4.0` | Queue threshold fraction; enables priority scheduling via `priority` | | `--router-queue-policy ` | `fcfs` | Scheduling policy for the queue: `fcfs` (tail TTFT), `wspt` (avg TTFT), or `lcfs` (comparison-only reverse ordering) | +| `--serve-indexer` | `false` | Serve the Dynamo-native remote indexer from this frontend/router on the worker component | +| `--use-remote-indexer` | `false` | Query the worker component's served remote indexer instead of maintaining a local overlap indexer | For all available options: `python -m dynamo.frontend --help` @@ -444,6 +446,63 @@ graph TD For improved fault tolerance, you can launch multiple frontend + router replicas. If multiple `dynamo.frontend` processes share the same host or network namespace, give each instance a different HTTP port. In Kubernetes or on separate hosts, replicas can usually reuse the same container port. Alternatively, you can deploy the router separately as the standalone `python -m dynamo.router` service; see the [Standalone Router README](https://github.com/ai-dynamo/dynamo/blob/main/components/src/dynamo/router/README.md). +### Dynamo-Native Remote Indexer + +For Dynamo-native deployments, the remote indexer is served by `dynamo.frontend` or `dynamo.router`, not by `dynamo.indexer`. + +- Use `--serve-indexer` on router/frontend replicas that should expose `kv_indexer_query` from the worker component. +- Use `--use-remote-indexer` on consumer routers/frontends that should query that served endpoint instead of maintaining a local overlap indexer. +- `dynamo.indexer` remains the standalone HTTP + ZMQ microservice for non-Dynamo / direct-ZMQ deployments. + +Frontend example: + +```bash +# Serving anchors +python -m dynamo.frontend --router-mode kv --serve-indexer + +# Consumer frontend +python -m dynamo.frontend --router-mode kv --use-remote-indexer +``` + +The served service is request-plane only. Each serving router/frontend keeps its normal local KV event ingestion, gap detection, and worker-query recovery path; remote consumers only issue hash-based overlap queries. + +Approximate mode (`--no-router-kv-events`) is singleton-only for remote serving: only one `--serve-indexer` replica may exist for a given worker component. Event-driven mode allows multiple serving replicas behind the same worker component. + +```mermaid +graph TD + subgraph "Workers" + W1["Worker 1"] + W2["Worker 2"] + end + + subgraph "Event Plane" + EP["KV Events"] + end + + subgraph "Serving Routers / Frontends" + S1["Router / Frontend A
--serve-indexer"] + S2["Router / Frontend B
--serve-indexer"] + I1["Local Indexer"] + I2["Local Indexer"] + end + + subgraph "Request Plane" + RP["backend.kv_indexer_query"] + end + + C["Consumer Router / Frontend
--use-remote-indexer"] + + W1 --> EP + W2 --> EP + EP --> S1 + EP --> S2 + S1 --> I1 + S2 --> I2 + C --> RP + RP --> S1 + RP --> S2 +``` + ### Router State Management The KV Router tracks two types of state (see [Router Design](../../design-docs/router-design.md) for details): diff --git a/docs/components/router/standalone-indexer.md b/docs/components/router/standalone-indexer.md index 34c74a0abe71..27629e26b0be 100644 --- a/docs/components/router/standalone-indexer.md +++ b/docs/components/router/standalone-indexer.md @@ -7,13 +7,16 @@ subtitle: Run the KV cache indexer as an independent HTTP service for querying b ## Overview -The standalone KV indexer (`python -m dynamo.indexer`) is a lightweight service that maintains a radix tree of cached blocks and exposes HTTP endpoints for querying and managing workers. It supports two operational modes: +The standalone KV indexer (`python -m dynamo.indexer`) is a lightweight service that maintains a radix tree of cached blocks and exposes HTTP endpoints for querying and managing workers. -- **Standalone mode** (default): subscribes to ZMQ KV event streams directly from workers. No Dynamo runtime discovery, registration, or event-plane integration required. -- **Dynamo runtime mode** (`--dynamo-runtime`): integrates with the Dynamo runtime for automatic worker discovery via MDC, KV event ingestion via the event plane (NATS or ZMQ), and overlap queries over the request plane for remote frontends. +- It subscribes to ZMQ KV event streams directly from workers. +- It exposes an HTTP API for registration, inspection, and overlap queries. +- It preserves P2P recovery and gap detection/replay for the standalone ZMQ path. This is distinct from the [Standalone Router](../../../components/src/dynamo/router/README.md), which is a full routing service. The standalone indexer provides only the indexing and query layer without routing logic. +For Dynamo-native remote indexing, use `--serve-indexer` on `dynamo.frontend` or `dynamo.router` and `--use-remote-indexer` on consumers instead. That request-plane service reuses the router's existing event ingestion and recovery machinery; it is not implemented by `dynamo.indexer`. + The HTTP API follows the [Mooncake KV Indexer RFC](https://github.com/kvcache-ai/Mooncake/issues/1403) conventions. `DYN_ROUTER_MIN_INITIAL_WORKERS` is also honored here. When set to a positive integer, the @@ -30,9 +33,7 @@ The indexer maintains one radix tree per `(model_name, tenant_id)` pair. Workers ## Compatibility -In standalone mode, the indexer works with any engine that publishes KV cache events over ZMQ in the expected msgpack format. This includes bare vLLM and SGLang engines, which emit ZMQ KV events natively — no Dynamo-specific wrapper is required. - -In Dynamo runtime mode, the indexer discovers workers automatically via MDC and receives KV events through the event plane. It also registers a query endpoint on the request plane, allowing frontends to query overlap scores remotely without needing direct HTTP access. +The standalone indexer works with any engine that publishes KV cache events over ZMQ in the expected msgpack format. This includes bare vLLM and SGLang engines, which emit ZMQ KV events natively — no Dynamo-specific wrapper is required. ## Use Cases @@ -40,7 +41,7 @@ In Dynamo runtime mode, the indexer discovers workers automatically via MDC and - **State verification**: Confirm that the indexer's view of KV cache state matches the router's internal state (used in integration tests). - **Custom routing**: Build external routing logic that queries the indexer for overlap scores and makes its own worker selection decisions. - **Monitoring**: Observe KV cache distribution across workers without running a full router. -- **Remote indexing**: In Dynamo runtime mode, frontends can offload KV cache indexing to a dedicated service and query it over the request plane. +- **Standalone microservice**: Run an indexer independently of the router/frontend when you want direct HTTP inspection and ZMQ-based ingestion. ## P2P Recovery @@ -91,7 +92,6 @@ The service is exposed through the Python bindings package and launched with `py |---------|-------------| | `kv-indexer` | Core standalone indexer service path (`python -m dynamo.indexer`: HTTP API, ZMQ listeners, P2P recovery) | | `kv-indexer-metrics` | Optional `/metrics` endpoint | -| `kv-indexer-runtime` | Dynamo runtime integration (`--dynamo-runtime`, discovery, event plane, request plane) | ### Standalone build @@ -109,30 +109,12 @@ cd lib/bindings/python && VIRTUAL_ENV=../../.venv ../../.venv/bin/maturin develo This keeps the default `kv-indexer` build lean while still allowing Prometheus metrics when needed. -### Runtime-enabled build - -```bash -cd lib/bindings/python && VIRTUAL_ENV=../../.venv ../../.venv/bin/maturin develop --uv --features kv-indexer,kv-indexer-runtime -``` - -This enables the `--dynamo-runtime` CLI flag for MDC discovery, event-plane subscription, and request-plane queries. It also includes the metrics endpoint. - ## CLI -### Standalone mode (default) - ```bash python -m dynamo.indexer --port 8090 [--threads 4] [--block-size 16 --model-name my-model --tenant-id default --workers "1=tcp://host:5557,2:1=tcp://host:5558"] [--peers "http://peer1:8090,http://peer2:8091"] ``` -### Dynamo runtime mode - -```bash -python -m dynamo.indexer --dynamo-runtime --namespace default --component-name kv-indexer --worker-component backend --port 8090 [--threads 4] -``` - -In runtime mode, workers are discovered automatically via MDC. The `--workers` flag can still be used to register additional static workers alongside discovered ones. - | Flag | Default | Description | |------|---------|-------------| | `--block-size` | (none) | KV cache block size for initial `--workers` (required when `--workers` is set) | @@ -142,10 +124,6 @@ In runtime mode, workers are discovered automatically via MDC. The `--workers` f | `--model-name` | `default` | Model name for initial `--workers` | | `--tenant-id` | `default` | Tenant ID for initial `--workers` | | `--peers` | (none) | Comma-separated peer indexer URLs for P2P recovery on startup | -| `--dynamo-runtime` | `false` | Enable Dynamo runtime integration (requires `kv-indexer-runtime`) | -| `--namespace` | `default` | Dynamo namespace to register the indexer component under | -| `--component-name` | `kv-indexer` | Component name for this indexer in the Dynamo runtime | -| `--worker-component` | `backend` | Component name that workers register under for event-plane subscription | ### Shared Startup Gate @@ -165,7 +143,7 @@ curl http://localhost:8090/health ### `GET /metrics` — Prometheus metrics -Returns metrics in Prometheus text exposition format. Available when the Python bindings are built with the `kv-indexer-metrics` or `kv-indexer-runtime` feature. +Returns metrics in Prometheus text exposition format. Available when the Python bindings are built with the `kv-indexer-metrics` feature. ```bash curl http://localhost:8090/metrics @@ -400,38 +378,9 @@ If no `replay_endpoint` is configured, gaps are logged as warnings but not recov The sequence counter (`last_seq`) persists across unregister/register cycles, so re-registering a worker after a gap will trigger replay on the first batch received by the new listener. -## Dynamo Runtime Mode - -When started with `--dynamo-runtime`, the indexer integrates with the Dynamo distributed runtime: - -### Worker Discovery - -The indexer watches MDC (Model Discovery Catalog) for worker additions and removals. When a worker registers with MDC, the indexer automatically creates an indexer for its model and block size. Workers discovered via MDC are tracked separately from those registered via `--workers` or the `/register` HTTP API; a worker cannot be registered through both paths simultaneously. - -### Event Plane Subscription - -Instead of connecting directly to ZMQ PUB sockets on each worker, the indexer subscribes to KV events through the Dynamo event plane. The transport (NATS or ZMQ) is determined by the `DYNAMO_EVENT_TRANSPORT` environment variable. Events are routed to the appropriate indexer based on the worker ID. - -### Request Plane Query Endpoint - -The indexer registers a query endpoint on the Dynamo request plane, allowing frontends to send `IndexerQueryRequest` messages containing a model name, namespace, and block hashes. The indexer looks up the appropriate radix tree and returns overlap scores. This enables frontends to use a remote indexer for KV-aware routing without direct HTTP access. - -### Example - -```bash -# Start the indexer with runtime integration -python -m dynamo.indexer --dynamo-runtime \ - --namespace my-namespace \ - --component-name kv-indexer \ - --worker-component backend \ - --port 8090 --threads 4 -``` - -The HTTP API remains fully available in runtime mode. Static workers can be added via `--workers` alongside discovered workers. - ## Limitations -- **Standalone mode is ZMQ only**: In standalone mode, workers must publish KV events via ZMQ PUB sockets. Build with `kv-indexer-runtime` and use `--dynamo-runtime` to receive events via the event plane (NATS or ZMQ). +- **Standalone mode is ZMQ only**: Workers must publish KV events via ZMQ PUB sockets. - **No routing logic**: The indexer only maintains the radix tree and answers queries. It does not track active blocks, manage request lifecycle, or perform worker selection. ## Architecture @@ -471,62 +420,6 @@ graph TD style CLIENT fill:#fff3e0,stroke:#333,color:#333 ``` -### Dynamo Runtime Mode - -```mermaid -graph TD - subgraph Workers - W1[Worker 1] - W2[Worker 2] - end - - subgraph "Dynamo Runtime" - MDC[MDC Discovery] - EP[Event Plane
NATS / ZMQ] - RP[Request Plane] - end - - subgraph "Standalone Indexer" - DISC[Discovery Watcher] - SUB[Event Subscriber] - REG[Worker Registry] - IDX["Indexer Map
(model, tenant) → Radix Tree"] - QE[Query Endpoint] - HTTP[HTTP API
/query /dump /register /metrics] - end - - FRONTEND[Frontend / Router] - CLIENT[External Client] - - W1 -->|register| MDC - W2 -->|register| MDC - MDC -->|added/removed| DISC - DISC -->|add/remove workers| REG - W1 -->|KV events| EP - W2 -->|KV events| EP - EP -->|RouterEvent| SUB - SUB -->|apply events| IDX - FRONTEND -->|IndexerQueryRequest| RP - RP --> QE - QE -->|query| IDX - CLIENT -->|POST /query, GET /dump| HTTP - HTTP -->|query| IDX - - style W1 fill:#f3e5f5,stroke:#333,color:#333 - style W2 fill:#f3e5f5,stroke:#333,color:#333 - style MDC fill:#e3f2fd,stroke:#333,color:#333 - style EP fill:#e3f2fd,stroke:#333,color:#333 - style RP fill:#e3f2fd,stroke:#333,color:#333 - style IDX fill:#2e8b57,stroke:#333,color:#fff - style SUB fill:#2e8b57,stroke:#333,color:#fff - style DISC fill:#2e8b57,stroke:#333,color:#fff - style REG fill:#2e8b57,stroke:#333,color:#fff - style QE fill:#2e8b57,stroke:#333,color:#fff - style HTTP fill:#2e8b57,stroke:#333,color:#fff - style FRONTEND fill:#fff3e0,stroke:#333,color:#333 - style CLIENT fill:#fff3e0,stroke:#333,color:#333 -``` - ### P2P Recovery Flow ```mermaid diff --git a/lib/bindings/python/Cargo.toml b/lib/bindings/python/Cargo.toml index 5f8afd6c2ca3..c2d07ef8092e 100644 --- a/lib/bindings/python/Cargo.toml +++ b/lib/bindings/python/Cargo.toml @@ -25,7 +25,6 @@ crate-type = ["cdylib", "rlib"] default = [] media-ffmpeg = ["dynamo-llm/media-ffmpeg"] kv-indexer = ["dep:clap", "dep:tracing-subscriber"] -kv-indexer-runtime = ["kv-indexer", "dynamo-kv-router/indexer-runtime"] kv-indexer-metrics = ["kv-indexer", "dynamo-kv-router/metrics"] nvtx = ["dynamo-runtime/nvtx"] diff --git a/lib/bindings/python/rust/llm/entrypoint.rs b/lib/bindings/python/rust/llm/entrypoint.rs index b2e2855ea109..5c9050531919 100644 --- a/lib/bindings/python/rust/llm/entrypoint.rs +++ b/lib/bindings/python/rust/llm/entrypoint.rs @@ -126,7 +126,7 @@ impl AicPerfConfig { #[pymethods] impl KvRouterConfig { #[new] - #[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, durable_kv_events=false, router_replica_sync=false, router_track_active_blocks=true, router_track_output_blocks=false, router_assume_kv_reuse=true, router_track_prefill_tokens=true, router_prefill_load_model="none", router_snapshot_threshold=1000000, router_reset_states=false, router_ttl_secs=120.0, router_max_tree_size=1048576, router_prune_target_ratio=0.8, router_queue_threshold=Some(4.0), router_event_threads=4, router_queue_policy="fcfs", remote_indexer_component=None))] + #[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, durable_kv_events=false, router_replica_sync=false, router_track_active_blocks=true, router_track_output_blocks=false, router_assume_kv_reuse=true, router_track_prefill_tokens=true, router_prefill_load_model="none", router_snapshot_threshold=1000000, router_reset_states=false, router_ttl_secs=120.0, router_max_tree_size=1048576, router_prune_target_ratio=0.8, router_queue_threshold=Some(4.0), router_event_threads=4, router_queue_policy="fcfs", use_remote_indexer=false, serve_indexer=false))] #[allow(clippy::too_many_arguments)] fn new( overlap_score_weight: f64, @@ -147,7 +147,8 @@ impl KvRouterConfig { router_queue_threshold: Option, router_event_threads: u32, router_queue_policy: &str, - remote_indexer_component: Option, + use_remote_indexer: bool, + serve_indexer: bool, ) -> Self { KvRouterConfig { inner: RsKvRouterConfig { @@ -176,7 +177,8 @@ impl KvRouterConfig { router_queue_policy: router_queue_policy.parse().unwrap_or_else(|_| { panic!("invalid router_queue_policy: {router_queue_policy:?}") }), - remote_indexer_component, + use_remote_indexer, + serve_indexer, }, } } diff --git a/lib/bindings/python/rust/llm/kv.rs b/lib/bindings/python/rust/llm/kv.rs index 74309df2f937..3f15ad23085c 100644 --- a/lib/bindings/python/rust/llm/kv.rs +++ b/lib/bindings/python/rust/llm/kv.rs @@ -16,8 +16,6 @@ use clap::Parser; use dynamo_kv_router::config::{KvRouterConfig, RouterConfigOverride}; use dynamo_kv_router::protocols::compute_block_hash_for_seq; use dynamo_kv_router::protocols::*; -#[cfg(feature = "kv-indexer-runtime")] -use dynamo_kv_router::standalone_indexer::RuntimeConfig; #[cfg(feature = "kv-indexer")] use dynamo_kv_router::standalone_indexer::{self, IndexerConfig}; use rs::pipeline::{AsyncEngine, SingleIn}; @@ -71,26 +69,6 @@ struct KvIndexerCli { /// Comma-separated peer URLs for P2P recovery (e.g. "http://host1:8090,http://host2:8091") #[arg(long)] peers: Option, - - /// Enable Dynamo runtime integration (discovery, event plane, request plane). - #[cfg(feature = "kv-indexer-runtime")] - #[arg(long)] - dynamo_runtime: bool, - - /// Dynamo namespace to register the indexer component under. - #[cfg(feature = "kv-indexer-runtime")] - #[arg(long, default_value = "default")] - namespace: String, - - /// Component name for this indexer in the Dynamo runtime. - #[cfg(feature = "kv-indexer-runtime")] - #[arg(long, default_value = "kv-indexer")] - component_name: String, - - /// Component name that workers register under. - #[cfg(feature = "kv-indexer-runtime")] - #[arg(long, default_value = "backend")] - worker_component: String, } pub fn run_kv_indexer_cli(args: I) -> anyhow::Result<()> @@ -105,31 +83,6 @@ where .chain(args.into_iter().map(Into::into)), )?; - #[cfg(feature = "kv-indexer-runtime")] - if cli.dynamo_runtime { - dynamo_runtime::logging::init(); - let worker = dynamo_runtime::Worker::from_settings()?; - return worker.execute(move |runtime| { - standalone_indexer::run_with_runtime( - runtime, - IndexerConfig { - block_size: cli.block_size, - port: cli.port, - threads: cli.threads, - workers: cli.workers, - model_name: cli.model_name, - tenant_id: cli.tenant_id, - peers: cli.peers, - }, - RuntimeConfig { - namespace: cli.namespace, - component_name: cli.component_name, - worker_component: cli.worker_component, - }, - ) - }); - } - init_standalone_logging(); let rt = tokio::runtime::Runtime::new()?; @@ -732,11 +685,11 @@ async fn create_kv_router_from_endpoint( llm_rs::discovery::WORKER_TYPE_DECODE }; - // Query discovery once so we can derive both model_name (for remote indexer) + // Query discovery once so we can derive both model_name (for remote/served indexer) // and Eagle routing semantics from the model card. let needs_model_name = kv_router_config .as_ref() - .map(|cfg| cfg.remote_indexer_component.is_some()) + .map(|cfg| cfg.use_remote_indexer || cfg.serve_indexer) .unwrap_or(false); let (model_name, enable_eagle) = { let discovery = endpoint.inner.component().drt().discovery(); diff --git a/lib/bindings/python/tests/replay/replay_utils.py b/lib/bindings/python/tests/replay/replay_utils.py index 9ef7687e49b8..c3f20858702a 100644 --- a/lib/bindings/python/tests/replay/replay_utils.py +++ b/lib/bindings/python/tests/replay/replay_utils.py @@ -86,7 +86,7 @@ def _router_config_payload(): "router_prune_target_ratio": 0.8, "router_enable_cache_control": False, "skip_initial_worker_wait": False, - "remote_indexer_component": None, + "use_remote_indexer": False, } diff --git a/lib/kv-router/Cargo.toml b/lib/kv-router/Cargo.toml index 6dbd9c686422..f9c643967ef8 100644 --- a/lib/kv-router/Cargo.toml +++ b/lib/kv-router/Cargo.toml @@ -18,7 +18,6 @@ metrics = ["dep:prometheus"] runtime-protocols = ["dep:dynamo-runtime"] bench = [] standalone-indexer = ["dep:axum", "dep:serde_json", "dep:reqwest", "dep:zmq"] -indexer-runtime = ["metrics", "runtime-protocols", "standalone-indexer"] [dependencies] # repo diff --git a/lib/kv-router/src/indexer/kv_indexer.rs b/lib/kv-router/src/indexer/kv_indexer.rs index 7ea72d3760be..0bc03c55940f 100644 --- a/lib/kv-router/src/indexer/kv_indexer.rs +++ b/lib/kv-router/src/indexer/kv_indexer.rs @@ -510,7 +510,7 @@ impl KvIndexerInterface for KvIndexer { let local_hashes = tokens_with_hashes.get_or_compute_block_hashes().to_vec(); let sequence_hashes = tokens_with_hashes.get_or_compute_seq_hashes().to_vec(); - self.process_routing_decision_internal(worker, local_hashes, sequence_hashes) + self.process_routing_decision_with_hashes(worker, local_hashes, sequence_hashes) .await } async fn flush(&self) -> usize { @@ -526,8 +526,8 @@ impl KvIndexerInterface for KvIndexer { } impl KvIndexer { - /// Internal method to process a routing decision with pre-computed hashes. - async fn process_routing_decision_internal( + /// Process a routing decision with pre-computed hashes. + pub async fn process_routing_decision_with_hashes( &self, worker: WorkerWithDpRank, local_hashes: Vec, diff --git a/lib/kv-router/src/indexer/sharded.rs b/lib/kv-router/src/indexer/sharded.rs index 72668aa0d962..dceae0645457 100644 --- a/lib/kv-router/src/indexer/sharded.rs +++ b/lib/kv-router/src/indexer/sharded.rs @@ -525,7 +525,7 @@ impl KvIndexerInterface for KvIndexerSharded { let local_hashes = tokens_with_hashes.get_or_compute_block_hashes().to_vec(); let sequence_hashes = tokens_with_hashes.get_or_compute_seq_hashes().to_vec(); - self.process_routing_decision_internal(worker, local_hashes, sequence_hashes) + self.process_routing_decision_with_hashes(worker, local_hashes, sequence_hashes) .await } @@ -550,8 +550,8 @@ impl KvIndexerInterface for KvIndexerSharded { } impl KvIndexerSharded { - /// Internal method to process a routing decision with pre-computed hashes. - async fn process_routing_decision_internal( + /// Process a routing decision with pre-computed hashes. + pub async fn process_routing_decision_with_hashes( &self, worker: WorkerWithDpRank, local_hashes: Vec, diff --git a/lib/kv-router/src/indexer/types.rs b/lib/kv-router/src/indexer/types.rs index cb538402bcbb..f56f2a988525 100644 --- a/lib/kv-router/src/indexer/types.rs +++ b/lib/kv-router/src/indexer/types.rs @@ -110,14 +110,14 @@ impl dynamo_runtime::protocols::maybe_error::MaybeError for WorkerKvQueryRespons /// Endpoint name for the standalone KV indexer query service. pub const KV_INDEXER_QUERY_ENDPOINT: &str = "kv_indexer_query"; +/// Endpoint name for recording approximate-mode routing decisions on a remote indexer. +pub const KV_INDEXER_RECORD_ROUTING_DECISION_ENDPOINT: &str = "kv_indexer_record_routing_decision"; -/// Request to query the standalone KV indexer for overlap scores. +/// Request to query a served KV indexer for overlap scores. #[derive(Serialize, Deserialize, Debug, Clone)] pub struct IndexerQueryRequest { /// Model name to query the indexer for. pub model_name: String, - /// Dynamo namespace (used as tenant_id for indexer lookup). - pub namespace: String, /// Block hashes to find matches for in the radix tree. pub block_hashes: Vec, } @@ -153,7 +153,7 @@ impl From for OverlapScores { } } -/// Response from the standalone KV indexer. +/// Response from a served KV indexer query. #[derive(Serialize, Deserialize, Debug, Clone)] pub enum IndexerQueryResponse { /// Overlap scores per worker. @@ -191,6 +191,57 @@ impl dynamo_runtime::protocols::maybe_error::MaybeError for IndexerQueryResponse } } +/// Request to record a routing decision on a served approximate-mode indexer. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct IndexerRecordRoutingDecisionRequest { + /// Model name to update. + pub model_name: String, + /// Selected worker for this routing decision. + pub worker: WorkerWithDpRank, + /// Locally-computed block hashes for the routed request. + pub local_hashes: Vec, + /// Locally-computed rolling sequence hashes for the routed request. + pub sequence_hashes: Vec, +} + +/// Response from a served approximate-mode routing-decision endpoint. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum IndexerRecordRoutingDecisionResponse { + Recorded, + Error(String), +} + +impl MaybeError for IndexerRecordRoutingDecisionResponse { + fn from_err(err: impl std::error::Error + 'static) -> Self { + IndexerRecordRoutingDecisionResponse::Error(err.to_string()) + } + + fn err(&self) -> Option> { + match self { + IndexerRecordRoutingDecisionResponse::Error(msg) => { + Some(Box::new(std::io::Error::other(msg.clone()))) + } + _ => None, + } + } +} + +#[cfg(feature = "runtime-protocols")] +impl dynamo_runtime::protocols::maybe_error::MaybeError for IndexerRecordRoutingDecisionResponse { + fn from_err(err: impl std::error::Error + 'static) -> Self { + IndexerRecordRoutingDecisionResponse::Error(err.to_string()) + } + + fn err(&self) -> Option { + match self { + IndexerRecordRoutingDecisionResponse::Error(msg) => { + Some(dynamo_runtime::error::DynamoError::msg(msg.clone())) + } + _ => None, + } + } +} + /// A request to find matches in the Radix Tree. pub struct MatchRequest { /// A vector of `LocalBlockHash` representing the sequence to match. diff --git a/lib/kv-router/src/scheduling/config.rs b/lib/kv-router/src/scheduling/config.rs index 23a897d9ab3b..ce400a9ac533 100644 --- a/lib/kv-router/src/scheduling/config.rs +++ b/lib/kv-router/src/scheduling/config.rs @@ -204,12 +204,16 @@ pub struct KvRouterConfig { /// "wspt": weighted shortest processing time (Smith's rule) — optimizes average TTFT. pub router_queue_policy: RouterQueuePolicy, - /// Component name of a standalone KV indexer to use for overlap scoring. - /// When set, the router creates a `Remote` indexer that queries the standalone - /// indexer via the request plane instead of maintaining a local radix tree. - /// The standalone indexer handles its own event subscription and discovery. + /// Whether to query a remote KV indexer served from the worker component + /// instead of maintaining a local radix tree for overlap scoring. #[serde(default)] - pub remote_indexer_component: Option, + pub use_remote_indexer: bool, + + /// Whether this router should serve its local indexer from the worker component. + /// This enables other routers/frontends in the same namespace to query + /// overlap scores remotely over the request plane by component + endpoint. + #[serde(default)] + pub serve_indexer: bool, } impl Default for KvRouterConfig { @@ -234,7 +238,8 @@ impl Default for KvRouterConfig { router_event_threads: 4, skip_initial_worker_wait: false, router_queue_policy: RouterQueuePolicy::default(), - remote_indexer_component: None, + use_remote_indexer: false, + serve_indexer: false, } } } @@ -268,6 +273,16 @@ fn validate_kv_router_config(config: &KvRouterConfig) -> Result<(), ValidationEr "router_prefill_load_model currently requires router_queue_policy='fcfs'", )); } + if config.use_remote_indexer && config.serve_indexer { + return Err(ValidationError::new( + "use_remote_indexer and serve_indexer are mutually exclusive", + )); + } + if config.serve_indexer && config.overlap_score_weight == 0.0 { + return Err(ValidationError::new( + "serve_indexer requires overlap_score_weight > 0", + )); + } Ok(()) } diff --git a/lib/kv-router/src/standalone_indexer/mod.rs b/lib/kv-router/src/standalone_indexer/mod.rs index 12794e8ff0f3..112b5b85c0a3 100644 --- a/lib/kv-router/src/standalone_indexer/mod.rs +++ b/lib/kv-router/src/standalone_indexer/mod.rs @@ -6,8 +6,6 @@ pub mod listener; pub mod metrics; pub mod recovery; pub mod registry; -#[cfg(feature = "indexer-runtime")] -pub mod runtime; pub mod server; mod zmq; @@ -31,13 +29,6 @@ pub struct IndexerConfig { pub peers: Option, } -#[cfg(feature = "indexer-runtime")] -pub struct RuntimeConfig { - pub namespace: String, - pub component_name: String, - pub worker_component: String, -} - pub(super) fn validate_zmq_endpoint(endpoint: &str) -> anyhow::Result<()> { let (scheme, address) = endpoint .split_once("://") @@ -155,81 +146,6 @@ pub async fn run_server(config: IndexerConfig) -> anyhow::Result<()> { run_common(&config, ®istry, cancel_token).await } -#[cfg(feature = "indexer-runtime")] -pub async fn run_with_runtime( - runtime: dynamo_runtime::Runtime, - config: IndexerConfig, - runtime_config: RuntimeConfig, -) -> anyhow::Result<()> { - use dynamo_runtime::{ - DistributedRuntime, - pipeline::{ManyOut, SingleIn, network::Ingress}, - }; - - use crate::indexer::{IndexerQueryRequest, IndexerQueryResponse, KV_INDEXER_QUERY_ENDPOINT}; - - let distributed_runtime = DistributedRuntime::from_settings(runtime).await?; - let cancel_token = distributed_runtime.primary_token(); - let component = distributed_runtime - .namespace(&runtime_config.namespace)? - .component(&runtime_config.component_name)?; - - tracing::info!( - namespace = %runtime_config.namespace, - component = %runtime_config.component_name, - block_size = ?config.block_size, - port = config.port, - threads = config.threads, - model_name = %config.model_name, - tenant_id = %config.tenant_id, - worker_component = %runtime_config.worker_component, - num_peers = config.peers.as_ref().map(|p| p.split(',').count()).unwrap_or(0), - "Starting standalone KV cache indexer (Dynamo runtime mode)" - ); - - let registry = Arc::new(WorkerRegistry::new(config.threads)); - let engine = Arc::new(runtime::query_engine::IndexerQueryEngine { - registry: registry.clone(), - }); - let ingress = - Ingress::, ManyOut>::for_engine( - engine, - )?; - let query_endpoint = component - .endpoint(KV_INDEXER_QUERY_ENDPOINT) - .endpoint_builder() - .handler(ingress) - .graceful_shutdown(true); - - distributed_runtime.runtime().secondary().spawn(async move { - if let Err(err) = query_endpoint.start().await { - tracing::error!(error = %err, "Query endpoint failed"); - } - }); - - tracing::info!( - endpoint = KV_INDEXER_QUERY_ENDPOINT, - "Query endpoint registered" - ); - - runtime::discovery::spawn_discovery_watcher( - &distributed_runtime, - registry.clone(), - cancel_token.clone(), - ) - .await?; - runtime::subscriber::spawn_event_subscriber( - &distributed_runtime, - &runtime_config.namespace, - &runtime_config.worker_component, - registry.clone(), - cancel_token.clone(), - ) - .await?; - - run_common(&config, ®istry, cancel_token).await -} - async fn wait_for_min_initial_workers( registry: &WorkerRegistry, cancel_token: &CancellationToken, diff --git a/lib/kv-router/src/standalone_indexer/registry.rs b/lib/kv-router/src/standalone_indexer/registry.rs index 6579e4d6b780..6bbf32d970c6 100644 --- a/lib/kv-router/src/standalone_indexer/registry.rs +++ b/lib/kv-router/src/standalone_indexer/registry.rs @@ -314,8 +314,6 @@ pub struct WorkerRegistry { indexers: DashMap, peers: DashMap, watermarks: DashMap<(WorkerId, u32), Arc>, - #[cfg(feature = "indexer-runtime")] - discovered_workers: DashMap, num_threads: usize, ready_tx: watch::Sender, ready_rx: watch::Receiver, @@ -329,8 +327,6 @@ impl WorkerRegistry { indexers: DashMap::new(), peers: DashMap::new(), watermarks: DashMap::new(), - #[cfg(feature = "indexer-runtime")] - discovered_workers: DashMap::new(), num_threads, ready_tx, ready_rx, @@ -360,16 +356,7 @@ impl WorkerRegistry { #[cfg(feature = "metrics")] pub fn refresh_metrics(&self) { let models = self.indexers.len(); - let workers = self.workers.len() + { - #[cfg(feature = "indexer-runtime")] - { - self.discovered_workers.len() - } - #[cfg(not(feature = "indexer-runtime"))] - { - 0 - } - }; + let workers = self.workers.len(); let mut listener_counts = [0_i64; 4]; for entry in self.workers.iter() { @@ -392,14 +379,6 @@ impl WorkerRegistry { block_size: u32, replay_endpoint: Option, ) -> Result<()> { - #[cfg(feature = "indexer-runtime")] - if self.discovered_workers.contains_key(&instance_id) { - bail!( - "instance {instance_id} is already registered via discovery; \ - use the Dynamo runtime to manage it" - ); - } - let key = IndexerKey { model_name, tenant_id, @@ -496,20 +475,6 @@ impl WorkerRegistry { ); } } else { - #[cfg(feature = "indexer-runtime")] - if let Some(discovered_key) = self.discovered_workers.get(&instance_id) { - if discovered_key.value() != &key { - bail!( - "instance {instance_id} is registered for model={} tenant={}", - discovered_key.value().model_name, - discovered_key.value().tenant_id - ); - } - } else { - bail!("instance {instance_id} not found"); - } - - #[cfg(not(feature = "indexer-runtime"))] bail!("instance {instance_id} not found"); } @@ -522,11 +487,6 @@ impl WorkerRegistry { for &dp_rank in entry.listeners.keys() { self.watermarks.remove(&(instance_id, dp_rank)); } - } else { - #[cfg(feature = "indexer-runtime")] - { - self.discovered_workers.remove(&instance_id); - } } if let Some(ie) = self.indexers.get(&key) { @@ -602,21 +562,6 @@ impl WorkerRegistry { } entry.key.clone() } else { - #[cfg(feature = "indexer-runtime")] - if let Some(discovered_key) = self.discovered_workers.get(&instance_id) { - if discovered_key.value().model_name != model_name { - bail!( - "instance {instance_id} is registered for model={} tenant={}", - discovered_key.value().model_name, - discovered_key.value().tenant_id - ); - } - discovered_key.value().clone() - } else { - bail!("instance {instance_id} not found"); - } - - #[cfg(not(feature = "indexer-runtime"))] bail!("instance {instance_id} not found"); }; @@ -629,11 +574,6 @@ impl WorkerRegistry { for &dp_rank in entry.listeners.keys() { self.watermarks.remove(&(instance_id, dp_rank)); } - } else { - #[cfg(feature = "indexer-runtime")] - { - self.discovered_workers.remove(&instance_id); - } } if let Some(ie) = self.indexers.get(&key) { @@ -656,11 +596,6 @@ impl WorkerRegistry { }, )? } else { - #[cfg(feature = "indexer-runtime")] - if self.discovered_workers.contains_key(&instance_id) { - return Err(ListenerControlError::DiscoveryManaged { instance_id }); - } - return Err(ListenerControlError::WorkerNotFound { instance_id }); }; @@ -683,11 +618,6 @@ impl WorkerRegistry { }, )? } else { - #[cfg(feature = "indexer-runtime")] - if self.discovered_workers.contains_key(&instance_id) { - return Err(ListenerControlError::DiscoveryManaged { instance_id }); - } - return Err(ListenerControlError::WorkerNotFound { instance_id }); }; @@ -724,21 +654,6 @@ impl WorkerRegistry { }) .collect(); - #[cfg(feature = "indexer-runtime")] - for entry in self.discovered_workers.iter() { - let worker_id = *entry.key(); - if self.workers.contains_key(&worker_id) { - continue; - } - result.push(WorkerInfo { - instance_id: worker_id, - source: WorkerSource::Discovery, - status: ListenerStatus::Active, - endpoints: HashMap::new(), - listeners: HashMap::new(), - }); - } - result } @@ -784,97 +699,6 @@ impl WorkerRegistry { .collect() } - #[cfg(feature = "indexer-runtime")] - pub fn add_worker_from_discovery( - &self, - instance_id: WorkerId, - model_name: String, - tenant_id: String, - block_size: u32, - ) -> Result<()> { - if self.workers.contains_key(&instance_id) { - bail!( - "instance {instance_id} is already manually registered; \ - cannot add via discovery" - ); - } - - let key = IndexerKey { - model_name, - tenant_id, - }; - - if let Some(existing) = self.discovered_workers.get(&instance_id) { - if existing.value() != &key { - bail!( - "instance {instance_id} is already registered for model={} tenant={}", - existing.value().model_name, - existing.value().tenant_id - ); - } - return Ok(()); - } - - let indexer_entry = self.indexers.entry(key.clone()).or_insert_with(|| { - tracing::info!( - model_name = %key.model_name, - tenant_id = %key.tenant_id, - block_size, - "Creating new indexer (discovery)" - ); - IndexerEntry { - indexer: create_indexer(block_size, self.num_threads), - block_size, - } - }); - - if indexer_entry.block_size != block_size { - bail!( - "block_size mismatch for model={} tenant={}: existing={}, requested={}", - key.model_name, - key.tenant_id, - indexer_entry.block_size, - block_size - ); - } - drop(indexer_entry); - - self.discovered_workers.insert(instance_id, key); - Ok(()) - } - - #[cfg(feature = "indexer-runtime")] - pub async fn remove_worker_from_discovery(&self, instance_id: WorkerId) { - if let Some((_, key)) = self.discovered_workers.remove(&instance_id) { - if let Some(ie) = self.indexers.get(&key) { - ie.indexer.remove_worker(instance_id).await; - } - self.maybe_remove_indexer(&key); - } else { - tracing::debug!( - instance_id, - "remove_worker_from_discovery: worker not in discovered_workers map" - ); - } - } - - #[cfg(feature = "indexer-runtime")] - pub fn get_indexer_for_worker(&self, worker_id: WorkerId) -> Option { - if let Some(key) = self.discovered_workers.get(&worker_id) - && let Some(ie) = self.indexers.get(key.value()) - { - return Some(ie.indexer.clone()); - } - - if let Some(entry) = self.workers.get(&worker_id) - && let Some(ie) = self.indexers.get(&entry.key) - { - return Some(ie.indexer.clone()); - } - - None - } - fn spawn_listener( &self, instance_id: WorkerId, @@ -897,15 +721,6 @@ impl WorkerRegistry { return; } - #[cfg(feature = "indexer-runtime")] - if self - .discovered_workers - .iter() - .any(|entry| entry.value() == key) - { - return; - } - self.indexers.remove(key); } } diff --git a/lib/kv-router/src/standalone_indexer/runtime/discovery.rs b/lib/kv-router/src/standalone_indexer/runtime/discovery.rs deleted file mode 100644 index f7693a720cdb..000000000000 --- a/lib/kv-router/src/standalone_indexer/runtime/discovery.rs +++ /dev/null @@ -1,121 +0,0 @@ -// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -use std::sync::Arc; - -use dynamo_runtime::stream::StreamExt; -use dynamo_runtime::{ - DistributedRuntime, - discovery::{ - DiscoveryEvent, DiscoveryInstance, DiscoveryInstanceId, DiscoveryQuery, DiscoveryStream, - }, -}; -use serde::Deserialize; -use tokio_util::sync::CancellationToken; - -use crate::standalone_indexer::registry::WorkerRegistry; - -#[derive(Deserialize, Debug)] -struct PartialModelCard { - pub display_name: String, - #[serde(default)] - pub kv_cache_block_size: u32, -} - -pub async fn spawn_discovery_watcher( - drt: &DistributedRuntime, - registry: Arc, - cancel_token: CancellationToken, -) -> anyhow::Result<()> { - let discovery = drt.discovery(); - let mut stream: DiscoveryStream = discovery - .list_and_watch(DiscoveryQuery::AllModels, Some(cancel_token.clone())) - .await?; - - tokio::spawn(async move { - tracing::info!("Discovery watcher started"); - - while let Some(result) = stream.next().await { - let event = match result { - Ok(event) => event, - Err(err) => { - tracing::error!(%err, "Error in discovery stream"); - continue; - } - }; - - match event { - DiscoveryEvent::Added(instance) => { - let (instance_id, namespace, card) = match &instance { - DiscoveryInstance::Model { - instance_id, - namespace, - .. - } => match instance.deserialize_model::() { - Ok(card) => (*instance_id, namespace.clone(), card), - Err(err) => { - tracing::error!(%err, instance_id, "Failed to deserialize model card"); - continue; - } - }, - _ => { - tracing::debug!("Ignoring non-model discovery instance"); - continue; - } - }; - - let model_name = card.display_name.clone(); - let block_size = card.kv_cache_block_size; - let tenant_id = namespace; - - if block_size == 0 { - tracing::warn!( - instance_id, - model_name, - "Skipping worker with kv_cache_block_size=0" - ); - continue; - } - - tracing::info!( - instance_id, - model_name, - tenant_id, - block_size, - "Discovery: adding worker" - ); - - if let Err(err) = registry.add_worker_from_discovery( - instance_id, - model_name.clone(), - tenant_id, - block_size, - ) { - tracing::error!( - instance_id, - model_name, - error = %err, - "Failed to add discovered worker" - ); - } - } - DiscoveryEvent::Removed(id) => { - let instance_id = match &id { - DiscoveryInstanceId::Model(mcid) => mcid.instance_id, - _ => { - tracing::debug!("Ignoring non-model discovery removal"); - continue; - } - }; - - tracing::info!(instance_id, "Discovery: removing worker"); - registry.remove_worker_from_discovery(instance_id).await; - } - } - } - - tracing::info!("Discovery watcher exiting"); - }); - - Ok(()) -} diff --git a/lib/kv-router/src/standalone_indexer/runtime/query_engine.rs b/lib/kv-router/src/standalone_indexer/runtime/query_engine.rs deleted file mode 100644 index 25026c9db06f..000000000000 --- a/lib/kv-router/src/standalone_indexer/runtime/query_engine.rs +++ /dev/null @@ -1,50 +0,0 @@ -// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -use std::sync::Arc; - -use anyhow::Result; -use dynamo_runtime::pipeline::{ - AsyncEngine, AsyncEngineContextProvider, ManyOut, ResponseStream, SingleIn, async_trait, -}; -use dynamo_runtime::stream; - -use crate::indexer::{IndexerQueryRequest, IndexerQueryResponse}; -use crate::standalone_indexer::registry::{IndexerKey, WorkerRegistry}; - -pub struct IndexerQueryEngine { - pub registry: Arc, -} - -#[async_trait] -impl AsyncEngine, ManyOut, anyhow::Error> - for IndexerQueryEngine -{ - async fn generate( - &self, - request: SingleIn, - ) -> Result> { - let (req, ctx) = request.into_parts(); - let key = IndexerKey { - model_name: req.model_name.clone(), - tenant_id: req.namespace.clone(), - }; - - let response = match self.registry.get_indexer(&key) { - Some(entry) => match entry.indexer.find_matches(req.block_hashes).await { - Ok(scores) => IndexerQueryResponse::Scores(scores.into()), - Err(err) => IndexerQueryResponse::Error(err.to_string()), - }, - None => IndexerQueryResponse::Error(format!( - "no indexer for model={} namespace={}", - req.model_name, req.namespace - )), - }; - - let response_stream = stream::iter(vec![response]); - Ok(ResponseStream::new( - Box::pin(response_stream), - ctx.context(), - )) - } -} diff --git a/lib/kv-router/src/standalone_indexer/runtime/subscriber.rs b/lib/kv-router/src/standalone_indexer/runtime/subscriber.rs deleted file mode 100644 index 07fba8c7217c..000000000000 --- a/lib/kv-router/src/standalone_indexer/runtime/subscriber.rs +++ /dev/null @@ -1,89 +0,0 @@ -// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -use std::sync::Arc; - -use anyhow::Result; -use tokio_util::sync::CancellationToken; - -use dynamo_runtime::{ - DistributedRuntime, discovery::EventTransportKind, transports::event_plane::EventSubscriber, -}; - -use crate::protocols::{KV_EVENT_SUBJECT, RouterEvent}; -use crate::standalone_indexer::registry::WorkerRegistry; - -pub async fn spawn_event_subscriber( - drt: &DistributedRuntime, - namespace: &str, - worker_component_name: &str, - registry: Arc, - cancel_token: CancellationToken, -) -> Result<()> { - let transport_kind = EventTransportKind::from_env_or_default(); - let worker_component = drt.namespace(namespace)?.component(worker_component_name)?; - let mut subscriber = EventSubscriber::for_component_with_transport( - &worker_component, - KV_EVENT_SUBJECT, - transport_kind, - ) - .await? - .typed::(); - - let kv_event_subject = format!( - "namespace.{}.component.{}.{}", - namespace, worker_component_name, KV_EVENT_SUBJECT - ); - - match transport_kind { - EventTransportKind::Nats => { - tracing::info!( - subject = %kv_event_subject, - "KV Indexer subscribing to NATS Core events" - ); - } - EventTransportKind::Zmq => { - tracing::info!( - subject = %kv_event_subject, - "KV Indexer subscribing to ZMQ event plane" - ); - } - } - - tokio::spawn(async move { - loop { - tokio::select! { - biased; - - _ = cancel_token.cancelled() => { - tracing::debug!("Event subscriber received cancellation signal"); - break; - } - - Some(result) = subscriber.next() => { - let (_envelope, event) = match result { - Ok((envelope, event)) => (envelope, event), - Err(err) => { - tracing::warn!("Failed to receive RouterEvent from event plane: {err:?}"); - continue; - } - }; - - let worker_id = event.worker_id; - if let Some(indexer) = registry.get_indexer_for_worker(worker_id) { - indexer.apply_event(event).await; - } else { - tracing::trace!( - worker_id, - "Received event for unknown worker (not yet discovered?)" - ); - } - } - } - } - - tracing::info!("Event subscriber exiting"); - }); - - Ok(()) -} diff --git a/lib/llm/src/kv_router.rs b/lib/llm/src/kv_router.rs index 1c6e826776c5..fcbb65530b31 100644 --- a/lib/llm/src/kv_router.rs +++ b/lib/llm/src/kv_router.rs @@ -31,17 +31,14 @@ use tracing::Instrument; use validator::Validate; pub mod indexer; -mod jetstream; pub mod metrics; pub mod prefill_router; pub mod publisher; pub mod push_router; pub mod scheduler; pub mod sequence; -pub mod subscriber; -pub mod worker_query; -pub use indexer::Indexer; +pub use indexer::{Indexer, ServedIndexerHandle, ServedIndexerMode, ensure_served_indexer_service}; pub use prefill_router::PrefillRouter; pub use push_router::{DirectRoutingRouter, KvPushRouter}; @@ -117,6 +114,7 @@ where cancellation_token: tokio_util::sync::CancellationToken, client: Client, is_eagle: bool, + _served_indexer_handle: Option, } impl KvRouter @@ -142,7 +140,29 @@ where let cancellation_token = component.drt().primary_token(); let min_initial_workers = min_initial_workers_from_env()?; - let indexer = Indexer::new(component, &kv_router_config, block_size, model_name).await?; + let indexer = Indexer::new( + component, + &kv_router_config, + block_size, + model_name.as_deref(), + ) + .await?; + let served_indexer_handle = if kv_router_config.serve_indexer { + let model_name = model_name.ok_or_else(|| { + anyhow::anyhow!("model_name is required when serve_indexer is configured") + })?; + Some( + ensure_served_indexer_service( + component.clone(), + ServedIndexerMode::from_use_kv_events(kv_router_config.use_kv_events), + model_name, + indexer.clone(), + ) + .await?, + ) + } else { + None + }; if min_initial_workers > 0 && !kv_router_config.skip_initial_worker_wait { let mut startup_watch = workers_with_configs.clone(); @@ -168,12 +188,11 @@ where ) .await?; - // Start KV event subscription if needed — skip when using a remote indexer - // (the standalone indexer handles its own event subscription). - if kv_router_config.remote_indexer_component.is_some() { + // Start KV event subscription if needed — skip when using a remote indexer. + if kv_router_config.use_remote_indexer { tracing::info!("Skipping KV event subscription (using remote indexer)"); } else if kv_router_config.should_subscribe_to_kv_events() { - subscriber::start_subscriber(component.clone(), &kv_router_config, indexer.clone()) + indexer::start_subscriber(component.clone(), &kv_router_config, indexer.clone()) .await?; } else { tracing::info!( @@ -193,6 +212,7 @@ where cancellation_token, client, is_eagle, + _served_indexer_handle: served_indexer_handle, }) } diff --git a/lib/llm/src/kv_router/jetstream.rs b/lib/llm/src/kv_router/indexer/jetstream.rs similarity index 99% rename from lib/llm/src/kv_router/jetstream.rs rename to lib/llm/src/kv_router/indexer/jetstream.rs index 7108db7b74cf..c5ee592d19ce 100644 --- a/lib/llm/src/kv_router/jetstream.rs +++ b/lib/llm/src/kv_router/indexer/jetstream.rs @@ -18,9 +18,11 @@ use rand::Rng; use tokio_util::sync::CancellationToken; use crate::kv_router::{ - Indexer, KV_EVENT_SUBJECT, RADIX_STATE_BUCKET, RADIX_STATE_FILE, router_discovery_query, + KV_EVENT_SUBJECT, RADIX_STATE_BUCKET, RADIX_STATE_FILE, router_discovery_query, }; +use super::Indexer; + /// Helper function to create a KV stream name from a component and subject. /// /// Generates a slugified stream name in the format: diff --git a/lib/llm/src/kv_router/indexer.rs b/lib/llm/src/kv_router/indexer/mod.rs similarity index 66% rename from lib/llm/src/kv_router/indexer.rs rename to lib/llm/src/kv_router/indexer/mod.rs index a90c14a242f3..e9a14fb6824d 100644 --- a/lib/llm/src/kv_router/indexer.rs +++ b/lib/llm/src/kv_router/indexer/mod.rs @@ -5,71 +5,28 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Result; -use futures::StreamExt; - use dynamo_kv_router::{ ConcurrentRadixTreeCompressed, ThreadPoolIndexer, approx::PruneConfig, config::KvRouterConfig, - indexer::{ - IndexerQueryRequest, IndexerQueryResponse, KV_INDEXER_QUERY_ENDPOINT, KvIndexer, - KvIndexerInterface, KvIndexerMetrics, KvRouterError, - }, + indexer::{KvIndexer, KvIndexerInterface, KvIndexerMetrics, KvRouterError}, protocols::{ LocalBlockHash, OverlapScores, RouterEvent, TokensWithHashes, WorkerId, WorkerWithDpRank, }, }; -use dynamo_runtime::{ - component::Component, - pipeline::{ManyOut, RouterMode, SingleIn, network::egress::push_router::PushRouter}, - traits::DistributedRuntimeProvider, -}; +use dynamo_runtime::{component::Component, traits::DistributedRuntimeProvider}; +use dynamo_tokens::SequenceHash; use tokio::sync::oneshot; -pub struct RemoteIndexer { - router: PushRouter, - model_name: String, - namespace: String, -} - -impl RemoteIndexer { - async fn new( - component: &Component, - indexer_component_name: &str, - model_name: String, - ) -> Result { - let namespace = component.namespace().name(); - let indexer_ns = component.namespace(); - let indexer_component = indexer_ns.component(indexer_component_name)?; - let endpoint = indexer_component.endpoint(KV_INDEXER_QUERY_ENDPOINT); - let client = endpoint.client().await?; - let router = - PushRouter::from_client_no_fault_detection(client, RouterMode::RoundRobin).await?; - Ok(Self { - router, - model_name, - namespace, - }) - } +mod jetstream; +pub mod remote; +mod subscriber; +mod worker_query; - async fn find_matches(&self, block_hashes: Vec) -> Result { - let request = IndexerQueryRequest { - model_name: self.model_name.clone(), - namespace: self.namespace.clone(), - block_hashes, - }; - let mut stream: ManyOut = - self.router.round_robin(SingleIn::new(request)).await?; - - match stream.next().await { - Some(IndexerQueryResponse::Scores(scores)) => Ok(scores.into()), - Some(IndexerQueryResponse::Error(msg)) => { - Err(anyhow::anyhow!("Remote indexer error: {}", msg)) - } - None => Err(anyhow::anyhow!("Remote indexer returned empty response")), - } - } -} +use self::remote::RemoteIndexer; +pub use self::remote::{ServedIndexerHandle, ServedIndexerMode, ensure_served_indexer_service}; +pub(crate) use subscriber::start_subscriber; +pub(crate) use worker_query::start_worker_kv_query_endpoint; #[derive(Clone)] pub enum Indexer { @@ -84,24 +41,26 @@ impl Indexer { component: &Component, kv_router_config: &KvRouterConfig, block_size: u32, - model_name: Option, + model_name: Option<&str>, ) -> Result { if kv_router_config.overlap_score_weight == 0.0 { return Ok(Self::None); } - if let Some(ref indexer_component_name) = kv_router_config.remote_indexer_component { - let model_name = model_name.ok_or_else(|| { - anyhow::anyhow!( - "model_name is required when remote_indexer_component is configured" - ) - })?; + if kv_router_config.use_remote_indexer { + let model_name = model_name + .ok_or_else(|| { + anyhow::anyhow!("model_name is required when use_remote_indexer is configured") + })? + .to_string(); + let indexer_component_name = component.name(); tracing::info!( - remote_indexer_component = %indexer_component_name, + indexer_component = %indexer_component_name, model_name, "Using remote KV indexer" ); - let remote = RemoteIndexer::new(component, indexer_component_name, model_name).await?; + let remote = + RemoteIndexer::new(component, model_name, kv_router_config.use_kv_events).await?; return Ok(Self::Remote(Arc::new(remote))); } @@ -149,14 +108,46 @@ impl Indexer { match self { Self::KvIndexer(indexer) => indexer.find_matches(sequence).await, Self::Concurrent(tpi) => tpi.find_matches(sequence).await, - Self::Remote(remote) => remote.find_matches(sequence).await.map_err(|e| { - tracing::warn!(error = %e, "Remote indexer query failed"); - KvRouterError::IndexerOffline - }), + Self::Remote(remote) => match remote.find_matches(sequence).await { + Ok(scores) => Ok(scores), + Err(error) => { + tracing::warn!(error = %error, "Remote indexer query failed"); + Ok(OverlapScores::new()) + } + }, Self::None => Ok(OverlapScores::new()), } } + pub(crate) async fn record_hashed_routing_decision( + &self, + worker: WorkerWithDpRank, + local_hashes: Vec, + sequence_hashes: Vec, + ) -> Result<(), KvRouterError> { + match self { + Self::KvIndexer(indexer) => { + indexer + .process_routing_decision_with_hashes(worker, local_hashes, sequence_hashes) + .await + } + Self::Concurrent(_) => { + tracing::warn!( + "Hashed routing-decision recording is unsupported for concurrent indexers" + ); + Err(KvRouterError::IndexerDroppedRequest) + } + Self::Remote(remote) => remote + .record_hashed_routing_decision(worker, local_hashes, sequence_hashes) + .await + .map_err(|error| { + tracing::warn!(error = %error, "Remote indexer write failed"); + KvRouterError::IndexerDroppedRequest + }), + Self::None => Ok(()), + } + } + pub(crate) async fn dump_events(&self) -> Result, KvRouterError> { match self { Self::KvIndexer(indexer) => indexer.dump_events().await, @@ -176,16 +167,17 @@ impl Indexer { worker: WorkerWithDpRank, ) -> Result<(), KvRouterError> { match self { - Self::KvIndexer(indexer) => { - indexer - .process_routing_decision_for_request(tokens_with_hashes, worker) + Self::KvIndexer(_) | Self::Remote(_) => { + let local_hashes = tokens_with_hashes.get_or_compute_block_hashes().to_vec(); + let sequence_hashes = tokens_with_hashes.get_or_compute_seq_hashes().to_vec(); + self.record_hashed_routing_decision(worker, local_hashes, sequence_hashes) .await } Self::Concurrent(tpi) => { tpi.process_routing_decision_for_request(tokens_with_hashes, worker) .await } - Self::Remote(_) | Self::None => Ok(()), + Self::None => Ok(()), } } diff --git a/lib/llm/src/kv_router/indexer/remote.rs b/lib/llm/src/kv_router/indexer/remote.rs new file mode 100644 index 000000000000..51e84f001de0 --- /dev/null +++ b/lib/llm/src/kv_router/indexer/remote.rs @@ -0,0 +1,520 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +use std::collections::HashSet; +use std::sync::{Arc, LazyLock}; + +use anyhow::Result; +use dashmap::DashMap; +use dynamo_kv_router::indexer::{ + IndexerQueryRequest, IndexerQueryResponse, IndexerRecordRoutingDecisionRequest, + IndexerRecordRoutingDecisionResponse, KV_INDEXER_QUERY_ENDPOINT, + KV_INDEXER_RECORD_ROUTING_DECISION_ENDPOINT, +}; +use dynamo_kv_router::protocols::{LocalBlockHash, OverlapScores, WorkerWithDpRank}; +use dynamo_runtime::component::Component; +use dynamo_runtime::discovery::{DiscoveryInstance, DiscoveryQuery}; +use dynamo_runtime::pipeline::{ + AsyncEngine, AsyncEngineContextProvider, ManyOut, ResponseStream, RouterMode, SingleIn, + async_trait, network::Ingress, network::egress::push_router::PushRouter, +}; +use dynamo_runtime::stream; +use dynamo_runtime::traits::DistributedRuntimeProvider; +use dynamo_tokens::SequenceHash; +use futures::StreamExt; +use parking_lot::RwLock; +use tokio::sync::Mutex; + +use super::Indexer; + +pub struct RemoteIndexer { + query_router: PushRouter, + record_router: Option< + PushRouter, + >, + component: Component, + model_name: String, + use_kv_events: bool, +} + +impl RemoteIndexer { + pub(super) async fn new( + component: &Component, + model_name: String, + use_kv_events: bool, + ) -> Result { + let query_client = component + .endpoint(KV_INDEXER_QUERY_ENDPOINT) + .client() + .await?; + let query_router = + PushRouter::from_client_no_fault_detection(query_client, RouterMode::RoundRobin) + .await?; + let record_router = if use_kv_events { + None + } else { + let record_client = component + .endpoint(KV_INDEXER_RECORD_ROUTING_DECISION_ENDPOINT) + .client() + .await?; + Some( + PushRouter::from_client_no_fault_detection(record_client, RouterMode::RoundRobin) + .await?, + ) + }; + Ok(Self { + query_router, + record_router, + component: component.clone(), + model_name, + use_kv_events, + }) + } + + pub(super) async fn find_matches( + &self, + block_hashes: Vec, + ) -> Result { + self.validate_topology_if_ready().await?; + + let request = IndexerQueryRequest { + model_name: self.model_name.clone(), + block_hashes, + }; + let mut stream: ManyOut = self + .query_router + .round_robin(SingleIn::new(request)) + .await?; + + match stream.next().await { + Some(IndexerQueryResponse::Scores(scores)) => Ok(scores.into()), + Some(IndexerQueryResponse::Error(msg)) => { + Err(anyhow::anyhow!("Remote indexer error: {}", msg)) + } + None => Err(anyhow::anyhow!("Remote indexer returned empty response")), + } + } + + pub(super) async fn record_hashed_routing_decision( + &self, + worker: WorkerWithDpRank, + local_hashes: Vec, + sequence_hashes: Vec, + ) -> Result<()> { + self.validate_topology_if_ready().await?; + + let record_router = self.record_router.as_ref().ok_or_else(|| { + anyhow::anyhow!("remote approximate indexer is not configured for writes") + })?; + let request = IndexerRecordRoutingDecisionRequest { + model_name: self.model_name.clone(), + worker, + local_hashes, + sequence_hashes, + }; + let mut stream: ManyOut = + record_router.round_robin(SingleIn::new(request)).await?; + + match stream.next().await { + Some(IndexerRecordRoutingDecisionResponse::Recorded) => Ok(()), + Some(IndexerRecordRoutingDecisionResponse::Error(msg)) => { + Err(anyhow::anyhow!("Remote indexer write error: {}", msg)) + } + None => Err(anyhow::anyhow!( + "Remote indexer returned empty write response" + )), + } + } + + async fn validate_topology_if_ready(&self) -> Result<()> { + let endpoints = self + .component + .drt() + .discovery() + .list(DiscoveryQuery::ComponentEndpoints { + namespace: self.component.namespace().name(), + component: self.component.name().to_string(), + }) + .await?; + + let mut query_instances = HashSet::new(); + let mut record_instances = HashSet::new(); + + for endpoint in endpoints { + let DiscoveryInstance::Endpoint(instance) = endpoint else { + continue; + }; + match instance.endpoint.as_str() { + KV_INDEXER_QUERY_ENDPOINT => { + query_instances.insert(instance.instance_id); + } + KV_INDEXER_RECORD_ROUTING_DECISION_ENDPOINT => { + record_instances.insert(instance.instance_id); + } + _ => {} + } + } + + if query_instances.is_empty() && record_instances.is_empty() { + return Ok(()); + } + + if self.use_kv_events { + if !record_instances.is_empty() { + anyhow::bail!( + "remote indexer component {}.{} mixes event-driven and approximate endpoints", + self.component.namespace().name(), + self.component.name() + ); + } + return Ok(()); + } + + if query_instances.len() != 1 || record_instances.len() != 1 { + anyhow::bail!( + "approximate remote indexer component {}.{} must expose exactly one query endpoint and one record endpoint", + self.component.namespace().name(), + self.component.name() + ); + } + if query_instances != record_instances { + anyhow::bail!( + "approximate remote indexer component {}.{} must expose query and record endpoints from the same singleton instance", + self.component.namespace().name(), + self.component.name() + ); + } + + Ok(()) + } +} + +type ServiceKey = (u64, String, String); + +static SERVED_INDEXER_SERVICES: LazyLock>> = + LazyLock::new(DashMap::new); +static SERVICE_CREATION_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ServedIndexerMode { + EventDriven, + Approximate, +} + +impl ServedIndexerMode { + pub fn from_use_kv_events(use_kv_events: bool) -> Self { + if use_kv_events { + Self::EventDriven + } else { + Self::Approximate + } + } + + fn topology_label(self) -> &'static str { + match self { + Self::EventDriven => "event-driven", + Self::Approximate => "approximate", + } + } +} + +#[derive(Clone)] +struct ServedIndexerBinding { + model_name: String, + indexer: Indexer, +} + +struct ServedIndexerService { + mode: ServedIndexerMode, + binding: Arc>>, +} + +impl ServedIndexerService { + async fn start(component: Component, mode: ServedIndexerMode) -> Result> { + verify_service_topology(&component, mode).await?; + + let binding = Arc::new(RwLock::new(None)); + start_query_endpoint(component.clone(), binding.clone())?; + if mode == ServedIndexerMode::Approximate { + start_record_endpoint(component.clone(), binding.clone())?; + } + + Ok(Arc::new(Self { mode, binding })) + } +} + +pub struct ServedIndexerHandle { + service_key: ServiceKey, + service: Arc, +} + +impl Drop for ServedIndexerHandle { + fn drop(&mut self) { + { + let mut binding = self.service.binding.write(); + *binding = None; + } + + if Arc::strong_count(&self.service) != 2 { + return; + } + + let should_remove = SERVED_INDEXER_SERVICES + .get(&self.service_key) + .is_some_and(|service| Arc::ptr_eq(service.value(), &self.service)); + if should_remove { + SERVED_INDEXER_SERVICES.remove(&self.service_key); + } + } +} + +pub async fn ensure_served_indexer_service( + component: Component, + mode: ServedIndexerMode, + model_name: String, + indexer: Indexer, +) -> Result { + let service_key = service_key(&component); + let service = get_or_start_service(component.clone(), mode).await?; + + if service.mode != mode { + anyhow::bail!( + "cannot mix {} and {} served indexers under {}.{}", + service.mode.topology_label(), + mode.topology_label(), + component.namespace().name(), + component.name() + ); + } + + { + let mut binding = service.binding.write(); + if binding.is_some() { + anyhow::bail!( + "served indexer is already registered under {}.{}", + component.namespace().name(), + component.name(), + ); + } + + *binding = Some(ServedIndexerBinding { + model_name: model_name.clone(), + indexer, + }); + } + + Ok(ServedIndexerHandle { + service_key, + service, + }) +} + +async fn get_or_start_service( + component: Component, + mode: ServedIndexerMode, +) -> Result> { + let key = service_key(&component); + if let Some(existing) = SERVED_INDEXER_SERVICES.get(&key) { + return Ok(existing.clone()); + } + + let _guard = SERVICE_CREATION_LOCK.lock().await; + if let Some(existing) = SERVED_INDEXER_SERVICES.get(&key) { + return Ok(existing.clone()); + } + + let service = ServedIndexerService::start(component, mode).await?; + SERVED_INDEXER_SERVICES.insert(key, service.clone()); + Ok(service) +} + +async fn verify_service_topology(component: &Component, mode: ServedIndexerMode) -> Result<()> { + let discovery = component.drt().discovery(); + let endpoints = discovery + .list(DiscoveryQuery::ComponentEndpoints { + namespace: component.namespace().name(), + component: component.name().to_string(), + }) + .await?; + + let mut query_instances = HashSet::new(); + let mut record_instances = HashSet::new(); + + for endpoint in endpoints { + let DiscoveryInstance::Endpoint(instance) = endpoint else { + continue; + }; + match instance.endpoint.as_str() { + KV_INDEXER_QUERY_ENDPOINT => { + query_instances.insert(instance.instance_id); + } + KV_INDEXER_RECORD_ROUTING_DECISION_ENDPOINT => { + record_instances.insert(instance.instance_id); + } + _ => {} + } + } + + match mode { + ServedIndexerMode::EventDriven => { + if !record_instances.is_empty() { + anyhow::bail!( + "cannot start event-driven served indexer on {}.{}: approximate endpoint already exists", + component.namespace().name(), + component.name() + ); + } + } + ServedIndexerMode::Approximate => { + if !query_instances.is_empty() || !record_instances.is_empty() { + anyhow::bail!( + "cannot start approximate served indexer on {}.{}: indexer endpoint already exists", + component.namespace().name(), + component.name() + ); + } + } + } + + Ok(()) +} + +fn start_query_endpoint( + component: Component, + binding: Arc>>, +) -> Result<()> { + let engine = Arc::new(ServedIndexerQueryEngine { binding }); + let ingress = + Ingress::, ManyOut>::for_engine( + engine, + )?; + tokio::spawn(async move { + if let Err(error) = component + .endpoint(KV_INDEXER_QUERY_ENDPOINT) + .endpoint_builder() + .handler(ingress) + .graceful_shutdown(true) + .start() + .await + { + tracing::error!(error = %error, "served indexer query endpoint failed"); + } + }); + Ok(()) +} + +fn start_record_endpoint( + component: Component, + binding: Arc>>, +) -> Result<()> { + let engine = Arc::new(ServedIndexerRecordEngine { binding }); + let ingress = Ingress::< + SingleIn, + ManyOut, + >::for_engine(engine)?; + tokio::spawn(async move { + if let Err(error) = component + .endpoint(KV_INDEXER_RECORD_ROUTING_DECISION_ENDPOINT) + .endpoint_builder() + .handler(ingress) + .graceful_shutdown(true) + .start() + .await + { + tracing::error!(error = %error, "served indexer record endpoint failed"); + } + }); + Ok(()) +} + +struct ServedIndexerQueryEngine { + binding: Arc>>, +} + +#[async_trait] +impl AsyncEngine, ManyOut, anyhow::Error> + for ServedIndexerQueryEngine +{ + async fn generate( + &self, + request: SingleIn, + ) -> Result> { + let (request, ctx) = request.into_parts(); + let binding = self.binding.read().clone(); + + let response = match binding { + Some(binding) if binding.model_name == request.model_name => { + match binding.indexer.find_matches(request.block_hashes).await { + Ok(scores) => IndexerQueryResponse::Scores(scores.into()), + Err(error) => IndexerQueryResponse::Error(error.to_string()), + } + } + Some(binding) => IndexerQueryResponse::Error(format!( + "served indexer model mismatch: requested={}, served={}", + request.model_name, binding.model_name + )), + None => IndexerQueryResponse::Error("served indexer is not registered".to_string()), + }; + + Ok(ResponseStream::new( + Box::pin(stream::iter(vec![response])), + ctx.context(), + )) + } +} + +struct ServedIndexerRecordEngine { + binding: Arc>>, +} + +#[async_trait] +impl + AsyncEngine< + SingleIn, + ManyOut, + anyhow::Error, + > for ServedIndexerRecordEngine +{ + async fn generate( + &self, + request: SingleIn, + ) -> Result> { + let (request, ctx) = request.into_parts(); + let binding = self.binding.read().clone(); + + let response = match binding { + Some(binding) if binding.model_name == request.model_name => match binding + .indexer + .record_hashed_routing_decision( + request.worker, + request.local_hashes, + request.sequence_hashes, + ) + .await + { + Ok(()) => IndexerRecordRoutingDecisionResponse::Recorded, + Err(error) => IndexerRecordRoutingDecisionResponse::Error(error.to_string()), + }, + Some(binding) => IndexerRecordRoutingDecisionResponse::Error(format!( + "served indexer model mismatch: requested={}, served={}", + request.model_name, binding.model_name + )), + None => IndexerRecordRoutingDecisionResponse::Error( + "served indexer is not registered".to_string(), + ), + }; + + Ok(ResponseStream::new( + Box::pin(stream::iter(vec![response])), + ctx.context(), + )) + } +} + +fn service_key(component: &Component) -> ServiceKey { + ( + component.drt().connection_id(), + component.namespace().name(), + component.name().to_string(), + ) +} diff --git a/lib/llm/src/kv_router/subscriber.rs b/lib/llm/src/kv_router/indexer/subscriber.rs similarity index 98% rename from lib/llm/src/kv_router/subscriber.rs rename to lib/llm/src/kv_router/indexer/subscriber.rs index 329fe98cc1f4..f9fdae9d8d08 100644 --- a/lib/llm/src/kv_router/subscriber.rs +++ b/lib/llm/src/kv_router/indexer/subscriber.rs @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 -use crate::kv_router::{Indexer, worker_query::WorkerQueryClient}; +use super::{Indexer, worker_query::WorkerQueryClient}; use anyhow::Result; use dynamo_kv_router::{ config::KvRouterConfig, diff --git a/lib/llm/src/kv_router/worker_query.rs b/lib/llm/src/kv_router/indexer/worker_query.rs similarity index 99% rename from lib/llm/src/kv_router/worker_query.rs rename to lib/llm/src/kv_router/indexer/worker_query.rs index cc0efeb30541..ae6cb9da3e27 100644 --- a/lib/llm/src/kv_router/worker_query.rs +++ b/lib/llm/src/kv_router/indexer/worker_query.rs @@ -20,7 +20,7 @@ use dynamo_runtime::traits::DistributedRuntimeProvider; use futures::StreamExt; use tokio::sync::{Mutex, Semaphore}; -use crate::kv_router::Indexer; +use super::Indexer; use crate::kv_router::worker_kv_indexer_query_endpoint; use dynamo_kv_router::{ indexer::{LocalKvIndexer, WorkerKvQueryRequest, WorkerKvQueryResponse}, diff --git a/lib/llm/src/kv_router/publisher/mod.rs b/lib/llm/src/kv_router/publisher/mod.rs index c97a69104361..5ef0d4a21c8c 100644 --- a/lib/llm/src/kv_router/publisher/mod.rs +++ b/lib/llm/src/kv_router/publisher/mod.rs @@ -24,7 +24,7 @@ use dynamo_runtime::{ }; use crate::kv_router::{ - KV_EVENT_SUBJECT, WORKER_KV_INDEXER_BUFFER_SIZE, worker_query::start_worker_kv_query_endpoint, + KV_EVENT_SUBJECT, WORKER_KV_INDEXER_BUFFER_SIZE, indexer::start_worker_kv_query_endpoint, }; mod event_processor; diff --git a/tests/router/common.py b/tests/router/common.py index bdbdb400b6b7..9fd60e5bdc13 100644 --- a/tests/router/common.py +++ b/tests/router/common.py @@ -320,6 +320,236 @@ async def verify_consumer_lifecycle(): kv_router.__exit__(None, None, None) +def _test_remote_indexer_decisions( + engine_workers, + model_name: str, + block_size: int = 8, + use_kv_events: bool = True, + test_dp_rank: bool = True, + request_plane: str = "nats", + store_backend: str = "etcd", +): + """Validate remote-indexer-backed routing decisions using direct KvRouter instances.""" + + async def wait_for_worker_ids(endpoint, expected_num_workers: int) -> list[int]: + client = await endpoint.client() + + for _ in range(120): + worker_ids = sorted(set(client.instance_ids())) + if len(worker_ids) >= expected_num_workers: + return worker_ids + await asyncio.sleep(1) + + raise TimeoutError("Timed out waiting for backend worker IDs") + + async def wait_for_served_indexer( + runtime, + expected_query_instances: int, + expected_record_instances: int, + ) -> None: + query_endpoint = runtime.endpoint( + f"{engine_workers.namespace}.{engine_workers.component_name}.kv_indexer_query" + ) + query_client = await query_endpoint.client() + record_client = None + if expected_record_instances > 0: + record_endpoint = runtime.endpoint( + f"{engine_workers.namespace}.{engine_workers.component_name}.kv_indexer_record_routing_decision" + ) + record_client = await record_endpoint.client() + + for _ in range(120): + query_ids = set(query_client.instance_ids()) + record_ids = set(record_client.instance_ids()) if record_client else set() + + if use_kv_events: + if len(query_ids) >= expected_query_instances and not record_ids: + return + elif ( + len(query_ids) == expected_query_instances + and len(record_ids) == expected_record_instances + and query_ids == record_ids + ): + return + + await asyncio.sleep(0.5) + + raise TimeoutError("Timed out waiting for served indexer endpoints to register") + + async def test_sync(): + endpoint_path = ( + f"{engine_workers.namespace}.{engine_workers.component_name}.generate" + ) + expected_num_instances = engine_workers.num_workers + + async def make_router(*, serve_indexer: bool, use_remote_indexer: bool): + kv_router_config = KvRouterConfig( + router_snapshot_threshold=20, + use_kv_events=use_kv_events, + router_track_prefill_tokens=True, + serve_indexer=serve_indexer, + use_remote_indexer=use_remote_indexer, + ) + last_error: Exception | None = None + for _ in range(60): + runtime = get_runtime( + store_backend=store_backend, request_plane=request_plane + ) + endpoint = runtime.endpoint(endpoint_path) + try: + with min_initial_workers_env(expected_num_instances): + kv_router = KvRouter( + endpoint=endpoint, + block_size=block_size, + kv_router_config=kv_router_config, + ) + return runtime, endpoint, kv_router + except Exception as error: + last_error = error + if not (serve_indexer or use_remote_indexer): + raise + del endpoint + del runtime + await asyncio.sleep(1.0) + + raise AssertionError( + "Timed out waiting for model discovery before creating remote-indexer router" + ) from last_error + + serving_runtimes = [] + serving_endpoints = [] + serving_routers = [] + + runtime_a, endpoint_a, router_a = await make_router( + serve_indexer=True, use_remote_indexer=False + ) + serving_runtimes.append(runtime_a) + serving_endpoints.append(endpoint_a) + serving_routers.append(router_a) + + if use_kv_events: + runtime_b, endpoint_b, router_b = await make_router( + serve_indexer=True, use_remote_indexer=False + ) + serving_runtimes.append(runtime_b) + serving_endpoints.append(endpoint_b) + serving_routers.append(router_b) + + await wait_for_served_indexer( + serving_runtimes[0], + expected_query_instances=len(serving_routers), + expected_record_instances=0 if use_kv_events else 1, + ) + + _, consumer_endpoint, consumer_router = await make_router( + serve_indexer=False, use_remote_indexer=True + ) + + worker_ids = await wait_for_worker_ids( + serving_endpoints[0], expected_num_instances + ) + if len(worker_ids) >= 2: + worker_a_id = worker_ids[0] + worker_b_id = worker_ids[1] + elif len(worker_ids) == 1 and test_dp_rank: + worker_a_id = worker_ids[0] + worker_b_id = worker_ids[0] + else: + raise AssertionError( + f"Need at least 2 routing targets but got {len(worker_ids)} worker(s) " + f"with test_dp_rank={test_dp_rank}" + ) + + dp_rank_a = 0 if test_dp_rank else None + dp_rank_b = 1 if test_dp_rank else None + logger.info( + "Remote-indexer routing targets: worker_a=%s/%s worker_b=%s/%s", + worker_a_id, + dp_rank_a, + worker_b_id, + dp_rank_b, + ) + + blocks = [ + [random.randint(1, 10000) for _ in range(block_size)] for _ in range(7) + ] + A, B, C, D, E, F, G = blocks + request_specs = [ + (serving_routers[0], A + B, worker_a_id, dp_rank_a, 0.1), + (serving_routers[0], A + C + D, worker_a_id, dp_rank_a, 0.1), + (serving_routers[-1], A + C + E, worker_b_id, dp_rank_b, 2.0), + (consumer_router, A + C + D + F, None, None, 2.0), + (consumer_router, A + C + G, None, None, 2.0), + ] + + responses: list[dict[str, Optional[int]]] = [] + for i, ( + kv_router, + token_ids, + forced_worker_id, + forced_dp_rank, + sleep_after, + ) in enumerate(request_specs, start=1): + logger.info( + "Sending remote-indexer request %s/5%s%s", + i, + ( + f" forced_worker_id={forced_worker_id}" + if forced_worker_id is not None + else "" + ), + ( + f" forced_dp_rank={forced_dp_rank}" + if forced_dp_rank is not None + else "" + ), + ) + result = await send_request_via_python_kv_router( + kv_python_router=kv_router, + model_name=model_name, + token_ids=token_ids, + initial_wait=1.0, + max_retries=8, + stop_conditions={ + "ignore_eos": True, + "max_tokens": 2, + }, + worker_id=forced_worker_id, + dp_rank=forced_dp_rank, + return_worker_ids=True, + ) + assert isinstance(result, dict), f"Expected dict result, got {type(result)}" + responses.append(result) + if sleep_after > 0: + await asyncio.sleep(sleep_after) + + req4 = responses[3] + assert req4["prefill_worker_id"] == worker_a_id, ( + f"Request 4: expected prefill_worker_id={worker_a_id} (longest prefix match), " + f"got {req4['prefill_worker_id']}" + ) + if test_dp_rank: + assert req4["prefill_dp_rank"] == dp_rank_a, ( + f"Request 4: expected prefill_dp_rank={dp_rank_a} " + f"(longest prefix match), got {req4['prefill_dp_rank']}" + ) + + req5 = responses[4] + assert req5["prefill_worker_id"] == worker_b_id, ( + f"Request 5: expected prefill_worker_id={worker_b_id} (tiebreak by smaller tree), " + f"got {req5['prefill_worker_id']}" + ) + if test_dp_rank: + assert req5["prefill_dp_rank"] == dp_rank_b, ( + f"Request 5: expected prefill_dp_rank={dp_rank_b} " + f"(tiebreak by smaller tree), got {req5['prefill_dp_rank']}" + ) + + await wait_for_worker_ids(consumer_endpoint, expected_num_instances) + + asyncio.run(test_sync()) + + def _test_python_router_bindings( engine_workers, endpoint, diff --git a/tests/router/router_process.py b/tests/router/router_process.py index 67f38f97e020..c52826fbd2ba 100644 --- a/tests/router/router_process.py +++ b/tests/router/router_process.py @@ -30,6 +30,8 @@ def __init__( router_mode: str = "kv", min_initial_workers: int | None = None, router_aic_config: dict[str, str | int] | None = None, + serve_indexer: bool = False, + use_remote_indexer: bool = False, ): command = [ "python3", @@ -65,6 +67,12 @@ def __init__( if durable_kv_events: command.append("--router-durable-kv-events") + if serve_indexer: + command.append("--serve-indexer") + + if use_remote_indexer: + command.append("--use-remote-indexer") + if router_aic_config is not None: command.extend( [ diff --git a/tests/router/test_router_e2e_with_mockers.py b/tests/router/test_router_e2e_with_mockers.py index e2e2862628b3..5f5e421a426d 100644 --- a/tests/router/test_router_e2e_with_mockers.py +++ b/tests/router/test_router_e2e_with_mockers.py @@ -21,6 +21,7 @@ _test_busy_threshold_endpoint, _test_disagg_direct_mode, _test_python_router_bindings, + _test_remote_indexer_decisions, _test_router_basic, _test_router_decisions, _test_router_decisions_disagg, @@ -1014,14 +1015,28 @@ def test_query_instance_id_returns_worker_and_tokens( @pytest.mark.timeout(300) # bumped for xdist contention (was 29s; ~9.55s serial avg) @pytest.mark.parametrize("request_plane", ["tcp"], indirect=True) @pytest.mark.parametrize( - "durable_kv_events,use_kv_events,zmq_kv_events", + "durable_kv_events,use_kv_events,zmq_kv_events,use_remote_indexer", [ - (True, True, False), # JetStream mode with KV events - (False, True, False), # NATS Core mode with local indexer (default) - (False, False, False), # Approximate mode (--no-kv-events) - no KV events - (False, True, True), # ZMQ mode: mocker → ZMQ PUB → relay → NATS + (True, True, False, False), # JetStream mode with KV events + (False, True, False, False), # NATS Core mode with local indexer (default) + (False, True, False, True), # NATS Core mode with a served remote indexer + (False, False, False, False), # Approximate mode (--no-kv-events) + ( + False, + False, + False, + True, + ), # Approximate mode with a singleton served remote indexer + (False, True, True, False), # ZMQ mode: mocker → ZMQ PUB → relay → NATS + ], + ids=[ + "jetstream", + "nats_core", + "nats_core_remote", + "no_kv_events", + "no_kv_events_remote", + "zmq", ], - ids=["jetstream", "nats_core", "no_kv_events", "zmq"], indirect=["durable_kv_events"], ) def test_router_decisions( @@ -1032,18 +1047,24 @@ def test_router_decisions( use_kv_events, request_plane, zmq_kv_events, + use_remote_indexer, ): """Validate KV cache prefix reuse and dp_rank routing by sending progressive requests with overlapping prefixes. Parameterized to test: - JetStream mode: KV events via NATS JetStream (durable) - NATS Core mode (default): KV events via NATS Core with local indexer on workers + - NATS Core mode with a served remote indexer - Approximate mode (--no-kv-events): No KV events, router predicts cache state based on routing decisions with TTL-based expiration and pruning + - Approximate mode with a singleton served remote indexer """ # runtime_services_dynamic_ports handles NATS and etcd startup logger.info( - f"Starting test router decisions: durable_kv_events={durable_kv_events}, use_kv_events={use_kv_events}" + "Starting test router decisions: durable_kv_events=%s, use_kv_events=%s, use_remote_indexer=%s", + durable_kv_events, + use_kv_events, + use_remote_indexer, ) # Create mocker args dictionary with dp_size=4 @@ -1066,10 +1087,18 @@ def test_router_decisions( ) as mockers: logger.info(f"All mockers using endpoint: {mockers.endpoint}") - # Initialize mockers - # Get runtime and create endpoint + if use_remote_indexer: + _test_remote_indexer_decisions( + mockers, + MODEL_NAME, + block_size=8, + use_kv_events=use_kv_events, + test_dp_rank=True, + request_plane=request_plane, + ) + return + runtime = get_runtime(request_plane=request_plane) - # Use the namespace from the mockers endpoint = runtime.endpoint(f"{mockers.namespace}.mocker.generate") _test_router_decisions( From 18db613e98410201cfcdce06b60c960840b37a0e Mon Sep 17 00:00:00 2001 From: PeaBrane Date: Tue, 7 Apr 2026 17:50:55 -0700 Subject: [PATCH 2/4] fix ci after indexer runtime eviction Signed-off-by: PeaBrane --- container/templates/wheel_builder.Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/container/templates/wheel_builder.Dockerfile b/container/templates/wheel_builder.Dockerfile index 65ecff820d7c..77672cea8b42 100644 --- a/container/templates/wheel_builder.Dockerfile +++ b/container/templates/wheel_builder.Dockerfile @@ -439,9 +439,9 @@ RUN --mount=type=secret,id=aws-key-id,env=AWS_ACCESS_KEY_ID \ uv build --wheel --out-dir /opt/dynamo/dist && \ cd /opt/dynamo/lib/bindings/python && \ if [ "$ENABLE_MEDIA_FFMPEG" = "true" ]; then \ - maturin build --release --features "media-ffmpeg,kv-indexer,kv-indexer-runtime" --out /opt/dynamo/dist; \ + maturin build --release --features "media-ffmpeg,kv-indexer" --out /opt/dynamo/dist; \ else \ - maturin build --release --features "kv-indexer,kv-indexer-runtime" --out /opt/dynamo/dist; \ + maturin build --release --features "kv-indexer" --out /opt/dynamo/dist; \ fi && \ /tmp/use-sccache.sh show-stats "Dynamo Runtime" From d8b1194ed02bc5d80967d736a57f89b0cbba0ccb Mon Sep 17 00:00:00 2001 From: PeaBrane Date: Tue, 7 Apr 2026 19:55:45 -0700 Subject: [PATCH 3/4] fix remote indexer startup and ghost workers Signed-off-by: PeaBrane --- lib/kv-router/src/indexer/sharded.rs | 44 +++--- lib/kv-router/src/indexer/tests.rs | 32 ++++ lib/llm/src/kv_router.rs | 33 +++-- lib/llm/src/kv_router/indexer/remote.rs | 189 +++++++++++------------- tests/router/common.py | 14 +- 5 files changed, 159 insertions(+), 153 deletions(-) diff --git a/lib/kv-router/src/indexer/sharded.rs b/lib/kv-router/src/indexer/sharded.rs index dceae0645457..0850637df493 100644 --- a/lib/kv-router/src/indexer/sharded.rs +++ b/lib/kv-router/src/indexer/sharded.rs @@ -353,6 +353,21 @@ impl KvIndexerSharded { ) -> Self { Self::new_with_frequency(token, num_shards, None, kv_block_size, metrics, None) } + + fn shard_for_worker(&self, worker_id: WorkerId) -> usize { + *self.worker_assignments.entry(worker_id).or_insert_with(|| { + let worker_counts = self.worker_counts.lock().unwrap(); + let selected_shard = worker_counts + .iter() + .enumerate() + .min_by_key(|&(_, value)| value) + .unwrap() + .0; + drop(worker_counts); + self.worker_counts.lock().unwrap()[selected_shard] += 1; + selected_shard + }) + } } #[async_trait] @@ -439,26 +454,8 @@ impl KvIndexerInterface for KvIndexerSharded { } async fn apply_event(&self, event: RouterEvent) { - let shard = self - .worker_assignments - .entry(event.worker_id) - .or_insert_with(|| { - // Get the shard with the smallest amount of workers. - let worker_counts = self.worker_counts.lock().unwrap(); - let selected_shard = worker_counts - .iter() - .enumerate() - .min_by_key(|&(_, value)| value) - .unwrap() - .0; - drop(worker_counts); - - // Increment the count for this shard - self.worker_counts.lock().unwrap()[selected_shard] += 1; - selected_shard - }); - - self.event_tx[*shard].send(event).await.unwrap(); + let shard = self.shard_for_worker(event.worker_id); + self.event_tx[shard].send(event).await.unwrap(); } async fn remove_worker(&self, worker: WorkerId) { @@ -557,12 +554,7 @@ impl KvIndexerSharded { local_hashes: Vec, sequence_hashes: Vec, ) -> Result<(), KvRouterError> { - // Route to the appropriate shard based on worker assignment - let shard_idx = self - .worker_assignments - .get(&worker.worker_id) - .map(|shard_idx| *shard_idx) - .unwrap_or_default(); + let shard_idx = self.shard_for_worker(worker.worker_id); self.routing_tx[shard_idx] .send(RoutingDecisionRequest { diff --git a/lib/kv-router/src/indexer/tests.rs b/lib/kv-router/src/indexer/tests.rs index 97bbbfcb10cd..0f8b5fc08b21 100644 --- a/lib/kv-router/src/indexer/tests.rs +++ b/lib/kv-router/src/indexer/tests.rs @@ -13,6 +13,7 @@ use super::concurrent_radix_tree::ConcurrentRadixTree; use super::concurrent_radix_tree_compressed::ConcurrentRadixTreeCompressed; use super::positional::PositionalIndexer; use super::*; +use crate::indexer::pruning::PruneConfig; use crate::protocols::*; use crate::test_utils::{remove_event, router_event, stored_blocks_with_sequence_hashes}; @@ -1889,6 +1890,37 @@ fn make_tree_indexer_with_frequency( } } +#[tokio::test] +async fn test_sharded_routing_decision_assigns_first_seen_worker() { + let token = CancellationToken::new(); + let metrics = Arc::new(KvIndexerMetrics::new_unregistered()); + let index = KvIndexerSharded::new_with_frequency( + token, + 4, + Some(Duration::from_secs(60)), + 32, + metrics, + Some(PruneConfig::default()), + ); + let worker = WorkerWithDpRank::new(42, 0); + let local_hashes = vec![LocalBlockHash(11), LocalBlockHash(22)]; + let sequence_hashes = compute_seq_hash_for_block(&local_hashes); + + index + .process_routing_decision_with_hashes(worker, local_hashes.clone(), sequence_hashes) + .await + .unwrap(); + flush_and_settle(&index).await; + + assert_score(&index, &[11, 22], worker, 2).await; + + index.remove_worker(worker.worker_id).await; + flush_and_settle(&index).await; + + let scores = query_scores(&index, &[11, 22]).await; + assert!(!scores.scores.contains_key(&worker)); +} + mod tree_specific_tests { use super::*; use rstest_reuse::apply; diff --git a/lib/llm/src/kv_router.rs b/lib/llm/src/kv_router.rs index fcbb65530b31..9649fbf7b549 100644 --- a/lib/llm/src/kv_router.rs +++ b/lib/llm/src/kv_router.rs @@ -147,22 +147,6 @@ where model_name.as_deref(), ) .await?; - let served_indexer_handle = if kv_router_config.serve_indexer { - let model_name = model_name.ok_or_else(|| { - anyhow::anyhow!("model_name is required when serve_indexer is configured") - })?; - Some( - ensure_served_indexer_service( - component.clone(), - ServedIndexerMode::from_use_kv_events(kv_router_config.use_kv_events), - model_name, - indexer.clone(), - ) - .await?, - ) - } else { - None - }; if min_initial_workers > 0 && !kv_router_config.skip_initial_worker_wait { let mut startup_watch = workers_with_configs.clone(); @@ -202,6 +186,23 @@ where ); } + let served_indexer_handle = if kv_router_config.serve_indexer { + let model_name = model_name.clone().ok_or_else(|| { + anyhow::anyhow!("model_name is required when serve_indexer is configured") + })?; + Some( + ensure_served_indexer_service( + component.clone(), + ServedIndexerMode::from_use_kv_events(kv_router_config.use_kv_events), + model_name, + indexer.clone(), + ) + .await?, + ) + } else { + None + }; + tracing::info!("KV Routing initialized"); Ok(Self { indexer, diff --git a/lib/llm/src/kv_router/indexer/remote.rs b/lib/llm/src/kv_router/indexer/remote.rs index 51e84f001de0..6ab74751017f 100644 --- a/lib/llm/src/kv_router/indexer/remote.rs +++ b/lib/llm/src/kv_router/indexer/remote.rs @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::{Arc, LazyLock}; use anyhow::Result; @@ -12,7 +12,7 @@ use dynamo_kv_router::indexer::{ KV_INDEXER_RECORD_ROUTING_DECISION_ENDPOINT, }; use dynamo_kv_router::protocols::{LocalBlockHash, OverlapScores, WorkerWithDpRank}; -use dynamo_runtime::component::Component; +use dynamo_runtime::component::{Client, Component}; use dynamo_runtime::discovery::{DiscoveryInstance, DiscoveryQuery}; use dynamo_runtime::pipeline::{ AsyncEngine, AsyncEngineContextProvider, ManyOut, ResponseStream, RouterMode, SingleIn, @@ -29,9 +29,11 @@ use super::Indexer; pub struct RemoteIndexer { query_router: PushRouter, + query_client: Client, record_router: Option< PushRouter, >, + record_client: Client, component: Component, model_name: String, use_kv_events: bool, @@ -47,24 +49,31 @@ impl RemoteIndexer { .endpoint(KV_INDEXER_QUERY_ENDPOINT) .client() .await?; - let query_router = - PushRouter::from_client_no_fault_detection(query_client, RouterMode::RoundRobin) - .await?; + let query_router = PushRouter::from_client_no_fault_detection( + query_client.clone(), + RouterMode::RoundRobin, + ) + .await?; + let record_client = component + .endpoint(KV_INDEXER_RECORD_ROUTING_DECISION_ENDPOINT) + .client() + .await?; let record_router = if use_kv_events { None } else { - let record_client = component - .endpoint(KV_INDEXER_RECORD_ROUTING_DECISION_ENDPOINT) - .client() - .await?; Some( - PushRouter::from_client_no_fault_detection(record_client, RouterMode::RoundRobin) - .await?, + PushRouter::from_client_no_fault_detection( + record_client.clone(), + RouterMode::RoundRobin, + ) + .await?, ) }; Ok(Self { query_router, + query_client, record_router, + record_client, component: component.clone(), model_name, use_kv_events, @@ -127,33 +136,8 @@ impl RemoteIndexer { } async fn validate_topology_if_ready(&self) -> Result<()> { - let endpoints = self - .component - .drt() - .discovery() - .list(DiscoveryQuery::ComponentEndpoints { - namespace: self.component.namespace().name(), - component: self.component.name().to_string(), - }) - .await?; - - let mut query_instances = HashSet::new(); - let mut record_instances = HashSet::new(); - - for endpoint in endpoints { - let DiscoveryInstance::Endpoint(instance) = endpoint else { - continue; - }; - match instance.endpoint.as_str() { - KV_INDEXER_QUERY_ENDPOINT => { - query_instances.insert(instance.instance_id); - } - KV_INDEXER_RECORD_ROUTING_DECISION_ENDPOINT => { - record_instances.insert(instance.instance_id); - } - _ => {} - } - } + let query_instances = cached_instance_ids(&self.query_client); + let record_instances = cached_instance_ids(&self.record_client); if query_instances.is_empty() && record_instances.is_empty() { return Ok(()); @@ -189,6 +173,10 @@ impl RemoteIndexer { } } +fn cached_instance_ids(client: &Client) -> HashSet { + client.instance_ids_avail().iter().copied().collect() +} + type ServiceKey = (u64, String, String); static SERVED_INDEXER_SERVICES: LazyLock>> = @@ -218,53 +206,33 @@ impl ServedIndexerMode { } } -#[derive(Clone)] -struct ServedIndexerBinding { - model_name: String, - indexer: Indexer, -} - struct ServedIndexerService { mode: ServedIndexerMode, - binding: Arc>>, + bindings: Arc>>, } impl ServedIndexerService { async fn start(component: Component, mode: ServedIndexerMode) -> Result> { verify_service_topology(&component, mode).await?; - let binding = Arc::new(RwLock::new(None)); - start_query_endpoint(component.clone(), binding.clone())?; + let bindings = Arc::new(RwLock::new(HashMap::new())); + start_query_endpoint(component.clone(), bindings.clone())?; if mode == ServedIndexerMode::Approximate { - start_record_endpoint(component.clone(), binding.clone())?; + start_record_endpoint(component.clone(), bindings.clone())?; } - Ok(Arc::new(Self { mode, binding })) + Ok(Arc::new(Self { mode, bindings })) } } pub struct ServedIndexerHandle { - service_key: ServiceKey, service: Arc, + model_name: String, } impl Drop for ServedIndexerHandle { fn drop(&mut self) { - { - let mut binding = self.service.binding.write(); - *binding = None; - } - - if Arc::strong_count(&self.service) != 2 { - return; - } - - let should_remove = SERVED_INDEXER_SERVICES - .get(&self.service_key) - .is_some_and(|service| Arc::ptr_eq(service.value(), &self.service)); - if should_remove { - SERVED_INDEXER_SERVICES.remove(&self.service_key); - } + self.service.bindings.write().remove(&self.model_name); } } @@ -274,7 +242,6 @@ pub async fn ensure_served_indexer_service( model_name: String, indexer: Indexer, ) -> Result { - let service_key = service_key(&component); let service = get_or_start_service(component.clone(), mode).await?; if service.mode != mode { @@ -288,24 +255,22 @@ pub async fn ensure_served_indexer_service( } { - let mut binding = service.binding.write(); - if binding.is_some() { + let mut bindings = service.bindings.write(); + if bindings.contains_key(&model_name) { anyhow::bail!( - "served indexer is already registered under {}.{}", + "served indexer for model {} is already registered under {}.{}", + model_name, component.namespace().name(), component.name(), ); } - *binding = Some(ServedIndexerBinding { - model_name: model_name.clone(), - indexer, - }); + bindings.insert(model_name.clone(), indexer); } Ok(ServedIndexerHandle { - service_key, service, + model_name, }) } @@ -381,9 +346,9 @@ async fn verify_service_topology(component: &Component, mode: ServedIndexerMode) fn start_query_endpoint( component: Component, - binding: Arc>>, + bindings: Arc>>, ) -> Result<()> { - let engine = Arc::new(ServedIndexerQueryEngine { binding }); + let engine = Arc::new(ServedIndexerQueryEngine { bindings }); let ingress = Ingress::, ManyOut>::for_engine( engine, @@ -405,9 +370,9 @@ fn start_query_endpoint( fn start_record_endpoint( component: Component, - binding: Arc>>, + bindings: Arc>>, ) -> Result<()> { - let engine = Arc::new(ServedIndexerRecordEngine { binding }); + let engine = Arc::new(ServedIndexerRecordEngine { bindings }); let ingress = Ingress::< SingleIn, ManyOut, @@ -428,7 +393,7 @@ fn start_record_endpoint( } struct ServedIndexerQueryEngine { - binding: Arc>>, + bindings: Arc>>, } #[async_trait] @@ -440,20 +405,17 @@ impl AsyncEngine, ManyOut, a request: SingleIn, ) -> Result> { let (request, ctx) = request.into_parts(); - let binding = self.binding.read().clone(); - - let response = match binding { - Some(binding) if binding.model_name == request.model_name => { - match binding.indexer.find_matches(request.block_hashes).await { - Ok(scores) => IndexerQueryResponse::Scores(scores.into()), - Err(error) => IndexerQueryResponse::Error(error.to_string()), - } - } - Some(binding) => IndexerQueryResponse::Error(format!( - "served indexer model mismatch: requested={}, served={}", - request.model_name, binding.model_name + let indexer = self.bindings.read().get(&request.model_name).cloned(); + + let response = match indexer { + Some(indexer) => match indexer.find_matches(request.block_hashes).await { + Ok(scores) => IndexerQueryResponse::Scores(scores.into()), + Err(error) => IndexerQueryResponse::Error(error.to_string()), + }, + None => IndexerQueryResponse::Error(format!( + "served indexer model {} is not registered", + request.model_name )), - None => IndexerQueryResponse::Error("served indexer is not registered".to_string()), }; Ok(ResponseStream::new( @@ -464,7 +426,7 @@ impl AsyncEngine, ManyOut, a } struct ServedIndexerRecordEngine { - binding: Arc>>, + bindings: Arc>>, } #[async_trait] @@ -480,11 +442,10 @@ impl request: SingleIn, ) -> Result> { let (request, ctx) = request.into_parts(); - let binding = self.binding.read().clone(); + let indexer = self.bindings.read().get(&request.model_name).cloned(); - let response = match binding { - Some(binding) if binding.model_name == request.model_name => match binding - .indexer + let response = match indexer { + Some(indexer) => match indexer .record_hashed_routing_decision( request.worker, request.local_hashes, @@ -495,13 +456,10 @@ impl Ok(()) => IndexerRecordRoutingDecisionResponse::Recorded, Err(error) => IndexerRecordRoutingDecisionResponse::Error(error.to_string()), }, - Some(binding) => IndexerRecordRoutingDecisionResponse::Error(format!( - "served indexer model mismatch: requested={}, served={}", - request.model_name, binding.model_name + None => IndexerRecordRoutingDecisionResponse::Error(format!( + "served indexer model {} is not registered", + request.model_name )), - None => IndexerRecordRoutingDecisionResponse::Error( - "served indexer is not registered".to_string(), - ), }; Ok(ResponseStream::new( @@ -518,3 +476,28 @@ fn service_key(component: &Component) -> ServiceKey { component.name().to_string(), ) } + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn query_engine_supports_multiple_model_bindings() { + let bindings = Arc::new(RwLock::new(HashMap::from([ + ("model-a".to_string(), Indexer::None), + ("model-b".to_string(), Indexer::None), + ]))); + let engine = ServedIndexerQueryEngine { bindings }; + let request = SingleIn::new(IndexerQueryRequest { + model_name: "model-b".to_string(), + block_hashes: vec![LocalBlockHash(1)], + }); + + let mut stream = engine.generate(request).await.unwrap(); + + assert!(matches!( + stream.next().await, + Some(IndexerQueryResponse::Scores(_)) + )); + } +} diff --git a/tests/router/common.py b/tests/router/common.py index 9fd60e5bdc13..cc0d83c6020c 100644 --- a/tests/router/common.py +++ b/tests/router/common.py @@ -351,19 +351,17 @@ async def wait_for_served_indexer( f"{engine_workers.namespace}.{engine_workers.component_name}.kv_indexer_query" ) query_client = await query_endpoint.client() - record_client = None - if expected_record_instances > 0: - record_endpoint = runtime.endpoint( - f"{engine_workers.namespace}.{engine_workers.component_name}.kv_indexer_record_routing_decision" - ) - record_client = await record_endpoint.client() + record_endpoint = runtime.endpoint( + f"{engine_workers.namespace}.{engine_workers.component_name}.kv_indexer_record_routing_decision" + ) + record_client = await record_endpoint.client() for _ in range(120): query_ids = set(query_client.instance_ids()) - record_ids = set(record_client.instance_ids()) if record_client else set() + record_ids = set(record_client.instance_ids()) if use_kv_events: - if len(query_ids) >= expected_query_instances and not record_ids: + if len(query_ids) >= expected_query_instances and len(record_ids) == 0: return elif ( len(query_ids) == expected_query_instances From 2da789b713d2243396ae44bad405899ceacbd3af Mon Sep 17 00:00:00 2001 From: PeaBrane Date: Tue, 7 Apr 2026 21:29:04 -0700 Subject: [PATCH 4/4] count remote indexer failures before they ghost us Signed-off-by: PeaBrane --- .../python/src/dynamo/prometheus_names.py | 4 ++ lib/llm/src/kv_router/indexer/remote.rs | 43 ++++++++++++---- lib/llm/src/kv_router/metrics.rs | 50 ++++++++++++++++++- lib/runtime/src/metrics/prometheus_names.rs | 8 +++ 4 files changed, 95 insertions(+), 10 deletions(-) diff --git a/lib/bindings/python/src/dynamo/prometheus_names.py b/lib/bindings/python/src/dynamo/prometheus_names.py index b2af11ec7f4c..095de5452409 100644 --- a/lib/bindings/python/src/dynamo/prometheus_names.py +++ b/lib/bindings/python/src/dynamo/prometheus_names.py @@ -287,6 +287,10 @@ class router: # Total number of requests processed by the router REQUESTS_TOTAL = "router_requests_total" + # Total number of remote indexer overlap queries that failed + REMOTE_INDEXER_QUERY_FAILURES_TOTAL = "router_remote_indexer_query_failures_total" + # Total number of remote indexer routing-decision writes that failed + REMOTE_INDEXER_WRITE_FAILURES_TOTAL = "router_remote_indexer_write_failures_total" # Time to first token observed at the router (seconds) TIME_TO_FIRST_TOKEN_SECONDS = "router_time_to_first_token_seconds" # Average inter-token latency observed at the router (seconds) diff --git a/lib/llm/src/kv_router/indexer/remote.rs b/lib/llm/src/kv_router/indexer/remote.rs index 6ab74751017f..9968ed15dbf5 100644 --- a/lib/llm/src/kv_router/indexer/remote.rs +++ b/lib/llm/src/kv_router/indexer/remote.rs @@ -25,6 +25,8 @@ use futures::StreamExt; use parking_lot::RwLock; use tokio::sync::Mutex; +use crate::kv_router::metrics::RemoteIndexerMetrics; + use super::Indexer; pub struct RemoteIndexer { @@ -36,6 +38,7 @@ pub struct RemoteIndexer { record_client: Client, component: Component, model_name: String, + metrics: Arc, use_kv_events: bool, } @@ -69,6 +72,7 @@ impl RemoteIndexer { .await?, ) }; + let metrics = RemoteIndexerMetrics::from_component(component); Ok(Self { query_router, query_client, @@ -76,6 +80,7 @@ impl RemoteIndexer { record_client, component: component.clone(), model_name, + metrics, use_kv_events, }) } @@ -84,7 +89,9 @@ impl RemoteIndexer { &self, block_hashes: Vec, ) -> Result { - self.validate_topology_if_ready().await?; + self.validate_topology_if_ready().await.inspect_err(|_| { + self.metrics.increment_query_failures(); + })?; let request = IndexerQueryRequest { model_name: self.model_name.clone(), @@ -93,14 +100,21 @@ impl RemoteIndexer { let mut stream: ManyOut = self .query_router .round_robin(SingleIn::new(request)) - .await?; + .await + .inspect_err(|_| { + self.metrics.increment_query_failures(); + })?; match stream.next().await { Some(IndexerQueryResponse::Scores(scores)) => Ok(scores.into()), Some(IndexerQueryResponse::Error(msg)) => { + self.metrics.increment_query_failures(); Err(anyhow::anyhow!("Remote indexer error: {}", msg)) } - None => Err(anyhow::anyhow!("Remote indexer returned empty response")), + None => { + self.metrics.increment_query_failures(); + Err(anyhow::anyhow!("Remote indexer returned empty response")) + } } } @@ -110,9 +124,12 @@ impl RemoteIndexer { local_hashes: Vec, sequence_hashes: Vec, ) -> Result<()> { - self.validate_topology_if_ready().await?; + self.validate_topology_if_ready().await.inspect_err(|_| { + self.metrics.increment_write_failures(); + })?; let record_router = self.record_router.as_ref().ok_or_else(|| { + self.metrics.increment_write_failures(); anyhow::anyhow!("remote approximate indexer is not configured for writes") })?; let request = IndexerRecordRoutingDecisionRequest { @@ -121,17 +138,25 @@ impl RemoteIndexer { local_hashes, sequence_hashes, }; - let mut stream: ManyOut = - record_router.round_robin(SingleIn::new(request)).await?; + let mut stream: ManyOut = record_router + .round_robin(SingleIn::new(request)) + .await + .inspect_err(|_| { + self.metrics.increment_write_failures(); + })?; match stream.next().await { Some(IndexerRecordRoutingDecisionResponse::Recorded) => Ok(()), Some(IndexerRecordRoutingDecisionResponse::Error(msg)) => { + self.metrics.increment_write_failures(); Err(anyhow::anyhow!("Remote indexer write error: {}", msg)) } - None => Err(anyhow::anyhow!( - "Remote indexer returned empty write response" - )), + None => { + self.metrics.increment_write_failures(); + Err(anyhow::anyhow!( + "Remote indexer returned empty write response" + )) + } } } diff --git a/lib/llm/src/kv_router/metrics.rs b/lib/llm/src/kv_router/metrics.rs index 4600276e2971..c1eeb2d6ec62 100644 --- a/lib/llm/src/kv_router/metrics.rs +++ b/lib/llm/src/kv_router/metrics.rs @@ -44,7 +44,7 @@ use std::time::Duration; use dynamo_runtime::component::Component; use dynamo_runtime::metrics::MetricsHierarchy; use dynamo_runtime::metrics::prometheus_names::{ - frontend_service, labels, name_prefix, router_request, routing_overhead, + frontend_service, labels, name_prefix, router, router_request, routing_overhead, }; /// Build a router metric name: `"router_" + frontend_service_suffix`. @@ -406,6 +406,54 @@ impl RouterRequestMetrics { } } +pub struct RemoteIndexerMetrics { + pub query_failures_total: prometheus::IntCounter, + pub write_failures_total: prometheus::IntCounter, +} + +static REMOTE_INDEXER_METRICS: OnceLock> = OnceLock::new(); + +impl RemoteIndexerMetrics { + pub fn from_component(component: &Component) -> Arc { + REMOTE_INDEXER_METRICS + .get_or_init(|| { + let instance_id = component.drt().discovery().instance_id(); + let router_id = instance_id.to_string(); + let extra_labels: &[(&str, &str)] = &[(labels::ROUTER_ID, &router_id)]; + + let metrics = component.metrics(); + let query_failures_total = metrics + .create_intcounter( + router::REMOTE_INDEXER_QUERY_FAILURES_TOTAL, + "Total number of remote indexer overlap queries that failed", + extra_labels, + ) + .expect("failed to create router_remote_indexer_query_failures_total"); + let write_failures_total = metrics + .create_intcounter( + router::REMOTE_INDEXER_WRITE_FAILURES_TOTAL, + "Total number of remote indexer routing-decision writes that failed", + extra_labels, + ) + .expect("failed to create router_remote_indexer_write_failures_total"); + + Arc::new(Self { + query_failures_total, + write_failures_total, + }) + }) + .clone() + } + + pub fn increment_query_failures(&self) { + self.query_failures_total.inc(); + } + + pub fn increment_write_failures(&self) { + self.write_failures_total.inc(); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/lib/runtime/src/metrics/prometheus_names.rs b/lib/runtime/src/metrics/prometheus_names.rs index 0819ef203dd7..c29b7902627e 100644 --- a/lib/runtime/src/metrics/prometheus_names.rs +++ b/lib/runtime/src/metrics/prometheus_names.rs @@ -506,6 +506,14 @@ pub mod router { /// Total number of requests processed by the router pub const REQUESTS_TOTAL: &str = "router_requests_total"; + /// Total number of remote indexer overlap queries that failed + pub const REMOTE_INDEXER_QUERY_FAILURES_TOTAL: &str = + "router_remote_indexer_query_failures_total"; + + /// Total number of remote indexer routing-decision writes that failed + pub const REMOTE_INDEXER_WRITE_FAILURES_TOTAL: &str = + "router_remote_indexer_write_failures_total"; + /// Time to first token observed at the router (seconds) pub const TIME_TO_FIRST_TOKEN_SECONDS: &str = "router_time_to_first_token_seconds";