feat(kv-router): split Dynamo-native remote indexer [DYN-2593]#7973
Conversation
Signed-off-by: PeaBrane <yanrpei@gmail.com>
Signed-off-by: PeaBrane <yanrpei@gmail.com>
WalkthroughConfiguration and indexer selection mechanism refactored from string-based Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
lib/kv-router/src/indexer/sharded.rs (1)
553-575:⚠️ Potential issue | 🟠 MajorAssign first-seen workers to a shard instead of defaulting to shard 0.
This method is now a public entrypoint for precomputed-hash routing decisions, but it still falls back to shard 0 without inserting
worker_assignments. If a worker reaches the served approximate-mode path before anyapply_event, every first-seen worker gets pinned to shard 0 and laterremove_worker/remove_worker_dp_rankcalls will miss it entirely.Suggested fix
- let shard_idx = self - .worker_assignments - .get(&worker.worker_id) - .map(|shard_idx| *shard_idx) - .unwrap_or_default(); + let shard_idx = *self + .worker_assignments + .entry(worker.worker_id) + .or_insert_with(|| { + let worker_counts = self.worker_counts.lock().unwrap(); + let selected_shard = worker_counts + .iter() + .enumerate() + .min_by_key(|&(_, value)| value) + .unwrap() + .0; + drop(worker_counts); + self.worker_counts.lock().unwrap()[selected_shard] += 1; + selected_shard + });Based on learnings:
process_routing_decision_internalpreviously relied on a priorapply_eventto assign the worker to a shard, anduse_kv_events=Falsepopulates the tree viaprocess_routing_decision_for_requestinstead of KV event subscription.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@lib/kv-router/src/indexer/sharded.rs` around lines 553 - 575, process_routing_decision_with_hashes currently falls back to shard 0 for first-seen workers causing them to be permanently pinned; change it to choose a consistent shard and record the assignment in worker_assignments when missing. Specifically, look up the shard with entry/or_insert semantics (e.g. self.worker_assignments.entry(worker.worker_id).or_insert(computed_shard)) so you both compute a shard (for example use (worker.worker_id as usize) % self.routing_tx.len() or another existing sharding function) and store it in worker_assignments before calling self.routing_tx[shard_idx].send(...), ensuring future remove_worker/remove_worker_dp_rank calls find the worker.lib/bindings/python/rust/llm/entrypoint.rs (1)
129-181:⚠️ Potential issue | 🟠 MajorValidate the new cross-field invariants before returning the Python config.
This constructor still exposes unchecked
RsKvRouterConfigs, so Python can createuse_remote_indexer=True, serve_indexer=Trueorserve_indexer=True, overlap_score_weight=0.0and only fail later when the router is built.🐛 Minimal fix sketch
- ) -> Self { - KvRouterConfig { + ) -> PyResult<Self> { + if use_remote_indexer && serve_indexer { + return Err(PyValueError::new_err( + "use_remote_indexer and serve_indexer are mutually exclusive", + )); + } + if serve_indexer && overlap_score_weight == 0.0 { + return Err(PyValueError::new_err( + "serve_indexer requires overlap_score_weight > 0", + )); + } + + Ok(KvRouterConfig { inner: RsKvRouterConfig {- }, - } + }, + }) }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@lib/bindings/python/rust/llm/entrypoint.rs` around lines 129 - 181, The constructor KvRouterConfig::new must enforce cross-field invariants before returning the Python-exposed config: validate combinations like use_remote_indexer && serve_indexer (disallowed) and serve_indexer && overlap_score_weight == 0.0 (disallowed) and any other cross-field constraints; perform these checks either before constructing RsKvRouterConfig or immediately after building it and return a clear error (panic with descriptive message or convert to a PyErr) rather than letting invalid configs be created. Locate the new function and RsKvRouterConfig usage and add explicit conditional checks that produce informative errors referencing the offending fields (e.g., use_remote_indexer, serve_indexer, overlap_score_weight, router_* fields) so invalid combinations fail fast during new().lib/kv-router/src/indexer/kv_indexer.rs (1)
529-544:⚠️ Potential issue | 🟠 MajorReject mismatched hash vectors in the new public API.
Now that this method is callable outside the original request path, the 1:1
local_hashes/sequence_hashesinvariant needs to be enforced here. Lines 252-259 truncate to the shorter vector withzip(), but Lines 275-282 still enqueue prune metadata for everysequence_hash, which can desynchronize the trie and prune manager.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@lib/kv-router/src/indexer/kv_indexer.rs` around lines 529 - 544, The public API process_routing_decision_with_hashes must enforce the 1:1 invariant between local_hashes and sequence_hashes: at the start of the function check that local_hashes.len() == sequence_hashes.len() and if not return an Err(KvRouterError::...) (add a specific KvRouterError variant like MismatchedHashVectors if none exists); only when lengths match continue to bundle and send the RoutingDecisionRequest so downstream code that zips the vectors and enqueues prune metadata stays synchronized.
🧹 Nitpick comments (2)
lib/kv-router/src/standalone_indexer/registry.rs (1)
124-125: Remove unusedDiscoveryManagedvariant andWorkerSource::Discoverydead code.The
ListenerControlError::DiscoveryManagedvariant (lines 124–125) is never returned, andWorkerSource::Discoveryis never constructed anywhere in the codebase. The corresponding match arm inserver.rs:297is therefore unreachable.Additionally, the
#[allow(unused_mut)]andmutat lines 631–632 can be removed sinceresultis not mutated.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@lib/kv-router/src/standalone_indexer/registry.rs` around lines 124 - 125, Remove the dead discovery-related code: delete the ListenerControlError::DiscoveryManaged variant in registry.rs and remove the unused WorkerSource::Discovery enum variant so the discovery path is no longer present; also remove the unreachable match arm in server.rs that handles WorkerSource::Discovery. While doing this, drop the unnecessary #[allow(unused_mut)] and the `mut` on `result` (the variable referenced around the prior lines 631–632) since `result` is not mutated. Ensure all references to DiscoveryManaged and WorkerSource::Discovery are removed or updated to compile cleanly.tests/router/router_process.py (1)
33-74: Mirror the frontend flag validation in this helper.Invalid
serve_indexercombinations currently fail only after the child process starts. Raising here makes test failures immediate and easier to diagnose.As per coding guidelines, ensure any test/config validation uses explicit `if/raise` (avoid `assert` for runtime checks).♻️ Suggested preflight checks
def __init__( self, request, block_size: int, frontend_port: int, @@ router_aic_config: dict[str, str | int] | None = None, serve_indexer: bool = False, use_remote_indexer: bool = False, ): + if serve_indexer and router_mode != "kv": + raise ValueError("serve_indexer requires router_mode='kv'") + if serve_indexer and use_remote_indexer: + raise ValueError( + "serve_indexer and use_remote_indexer are mutually exclusive" + ) + command = [ "python3", "-m", "dynamo.frontend",🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/router/router_process.py` around lines 33 - 74, The helper that builds the router command should preflight-validate incompatible flag combinations (so invalid serve_indexer/use_remote_indexer combos fail fast) by adding explicit checks and raising a ValueError rather than relying on the child process to error; locate the command-building function (the code that accepts serve_indexer and use_remote_indexer in tests/router/router_process.py) and add if/raise checks (e.g., if serve_indexer and use_remote_indexer: raise ValueError("...") and any other invalid combinations you mirror from the frontend validation) before the command list is extended.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/components/router/router-guide.md`:
- Around line 467-469: The sentence claiming "Each serving router/frontend keeps
its normal local KV event ingestion, gap detection, and worker-query recovery
path" should be narrowed to only apply to event-driven deployments: update that
sentence to explicitly state it describes event-driven mode (when
use_kv_events=True / router KV events enabled). For approximate mode
(--no-router-kv-events or use_kv_events=False) document that routers do not
subscribe to KV events but instead rely on KvIndexer populated via
process_routing_decision_for_request and TTL/pruning state for serving and
recovery; also note the singleton constraint for --serve-indexer remains
applicable for remote serving in approximate mode.
In `@docs/components/router/standalone-indexer.md`:
- Line 18: Update the sentence about `--serve-indexer` to also mention the
approximate-mode serving path: when `--no-router-kv-events` (or
`use_kv_events=False`) the served indexer is populated via request-plane
routing-decision recording rather than KV subscription — the router still uses
`KvIndexer` and fills the tree through `process_routing_decision_for_request`;
keep the note that `--serve-indexer` is used on
`dynamo.frontend`/`dynamo.router` and consumers use `--use-remote-indexer`, and
clarify this alternative population mode so readers know it is supported even
when `dynamo.indexer` is not used.
In `@lib/kv-router/src/indexer/types.rs`:
- Around line 194-205: The request struct IndexerRecordRoutingDecisionRequest
currently has parallel vectors local_hashes and sequence_hashes which can drift;
change the wire format to a single paired vector (e.g.,
Vec<PairLocalAndSequence> or Vec<(LocalBlockHash, SequenceHash)>) so each
LocalBlockHash is tied to its SequenceHash and update all consumers (the indexer
code that zips them when building stored blocks and the pruning logic that
iterates sequence hashes) to use the paired field; if you cannot change the wire
format, add a strict runtime validation in the constructor/deserializer for
IndexerRecordRoutingDecisionRequest that rejects the request when
local_hashes.len() != sequence_hashes.len() with a clear error instead.
In `@lib/llm/src/kv_router.rs`:
- Around line 150-165: The served indexer is being published too early when
kv_router_config.serve_indexer is true—call to ensure_served_indexer_service
(creating served_indexer_handle) can make the endpoint discoverable before the
local indexer is ready and before start_subscriber() runs; to fix, defer
creating/publishing the served indexer until after the local indexer
startup/ready path completes (i.e., move or postpone the
ensure_served_indexer_service call so it runs after start_subscriber() or after
an explicit readiness check on the indexer), or add a readiness gating mechanism
in ensure_served_indexer_service driven by ServedIndexerMode::from_use_kv_events
to only publish when the indexer is initialized; update the code that sets
served_indexer_handle accordingly so the service is only registered once the
local indexer is confirmed ready.
In `@lib/llm/src/kv_router/indexer/remote.rs`:
- Around line 74-78: The topology validation currently runs a full
ComponentEndpoints discovery and rebuilds HashSet membership on every call via
validate_topology_if_ready(), causing duplicate scans from find_matches() and
record_hashed_routing_decision(); change validate_topology_if_ready() to cache
the validated topology (e.g., a struct containing the discovered
ComponentEndpoints and HashSets) and return the cached value when topology is
unchanged, and update/invalidate that cache only on detected membership changes
(watch for component list/version changes or expose an on_change hook); update
callers (find_matches and record_hashed_routing_decision) to use the cached
validated topology instead of forcing a rediscovery, and ensure concurrency
safety (Mutex/RwLock or atomic swap) around the cached topology to avoid races
during updates.
In `@tests/router/common.py`:
- Around line 393-417: The retry loop currently catches all exceptions which
masks unrelated failures; change the blanket "except Exception" to catch only
the specific discovery/model-readiness error(s) you expect (for example the
runtime/model discovery exception type thrown by get_runtime or
runtime.endpoint/KvRouter initialization) and re-raise any other exceptions
immediately; keep the same retry/sleep and last_error handling for the targeted
exception(s) and ensure variables like runtime, endpoint, and kv_router are
cleaned up before sleeping when retrying.
- Around line 345-376: The loop in wait_for_served_indexer incorrectly skips
creating/checking the record endpoint when expected_record_instances == 0, so in
use_kv_events mode record_ids is always an empty set and we never fail if the
record endpoint is accidentally registered; fix by always resolving the record
endpoint for "kv_indexer_record_routing_decision" (or checking runtime for its
presence) and creating record_client when present, then in the use_kv_events
branch explicitly verify that record_client is absent or that
record_client.instance_ids() is empty (i.e., ensure record_ids is truly empty
only if the endpoint is not registered), updating the logic around
record_client, record_endpoint, and the use_kv_events check in
wait_for_served_indexer to fail if any record instances appear.
---
Outside diff comments:
In `@lib/bindings/python/rust/llm/entrypoint.rs`:
- Around line 129-181: The constructor KvRouterConfig::new must enforce
cross-field invariants before returning the Python-exposed config: validate
combinations like use_remote_indexer && serve_indexer (disallowed) and
serve_indexer && overlap_score_weight == 0.0 (disallowed) and any other
cross-field constraints; perform these checks either before constructing
RsKvRouterConfig or immediately after building it and return a clear error
(panic with descriptive message or convert to a PyErr) rather than letting
invalid configs be created. Locate the new function and RsKvRouterConfig usage
and add explicit conditional checks that produce informative errors referencing
the offending fields (e.g., use_remote_indexer, serve_indexer,
overlap_score_weight, router_* fields) so invalid combinations fail fast during
new().
In `@lib/kv-router/src/indexer/kv_indexer.rs`:
- Around line 529-544: The public API process_routing_decision_with_hashes must
enforce the 1:1 invariant between local_hashes and sequence_hashes: at the start
of the function check that local_hashes.len() == sequence_hashes.len() and if
not return an Err(KvRouterError::...) (add a specific KvRouterError variant like
MismatchedHashVectors if none exists); only when lengths match continue to
bundle and send the RoutingDecisionRequest so downstream code that zips the
vectors and enqueues prune metadata stays synchronized.
In `@lib/kv-router/src/indexer/sharded.rs`:
- Around line 553-575: process_routing_decision_with_hashes currently falls back
to shard 0 for first-seen workers causing them to be permanently pinned; change
it to choose a consistent shard and record the assignment in worker_assignments
when missing. Specifically, look up the shard with entry/or_insert semantics
(e.g. self.worker_assignments.entry(worker.worker_id).or_insert(computed_shard))
so you both compute a shard (for example use (worker.worker_id as usize) %
self.routing_tx.len() or another existing sharding function) and store it in
worker_assignments before calling self.routing_tx[shard_idx].send(...), ensuring
future remove_worker/remove_worker_dp_rank calls find the worker.
---
Nitpick comments:
In `@lib/kv-router/src/standalone_indexer/registry.rs`:
- Around line 124-125: Remove the dead discovery-related code: delete the
ListenerControlError::DiscoveryManaged variant in registry.rs and remove the
unused WorkerSource::Discovery enum variant so the discovery path is no longer
present; also remove the unreachable match arm in server.rs that handles
WorkerSource::Discovery. While doing this, drop the unnecessary
#[allow(unused_mut)] and the `mut` on `result` (the variable referenced around
the prior lines 631–632) since `result` is not mutated. Ensure all references to
DiscoveryManaged and WorkerSource::Discovery are removed or updated to compile
cleanly.
In `@tests/router/router_process.py`:
- Around line 33-74: The helper that builds the router command should
preflight-validate incompatible flag combinations (so invalid
serve_indexer/use_remote_indexer combos fail fast) by adding explicit checks and
raising a ValueError rather than relying on the child process to error; locate
the command-building function (the code that accepts serve_indexer and
use_remote_indexer in tests/router/router_process.py) and add if/raise checks
(e.g., if serve_indexer and use_remote_indexer: raise ValueError("...") and any
other invalid combinations you mirror from the frontend validation) before the
command list is extended.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 4310cd73-2fc7-42de-b69a-80cbab6a1db2
📒 Files selected for processing (29)
components/src/dynamo/common/configuration/groups/kv_router_args.pycomponents/src/dynamo/frontend/frontend_args.pycomponents/src/dynamo/router/args.pydocs/components/router/router-guide.mddocs/components/router/standalone-indexer.mdlib/bindings/python/Cargo.tomllib/bindings/python/rust/llm/entrypoint.rslib/bindings/python/rust/llm/kv.rslib/bindings/python/tests/replay/replay_utils.pylib/kv-router/Cargo.tomllib/kv-router/src/indexer/kv_indexer.rslib/kv-router/src/indexer/sharded.rslib/kv-router/src/indexer/types.rslib/kv-router/src/scheduling/config.rslib/kv-router/src/standalone_indexer/mod.rslib/kv-router/src/standalone_indexer/registry.rslib/kv-router/src/standalone_indexer/runtime/discovery.rslib/kv-router/src/standalone_indexer/runtime/query_engine.rslib/kv-router/src/standalone_indexer/runtime/subscriber.rslib/llm/src/kv_router.rslib/llm/src/kv_router/indexer/jetstream.rslib/llm/src/kv_router/indexer/mod.rslib/llm/src/kv_router/indexer/remote.rslib/llm/src/kv_router/indexer/subscriber.rslib/llm/src/kv_router/indexer/worker_query.rslib/llm/src/kv_router/publisher/mod.rstests/router/common.pytests/router/router_process.pytests/router/test_router_e2e_with_mockers.py
💤 Files with no reviewable changes (6)
- lib/kv-router/Cargo.toml
- lib/bindings/python/Cargo.toml
- lib/kv-router/src/standalone_indexer/runtime/query_engine.rs
- lib/kv-router/src/standalone_indexer/mod.rs
- lib/kv-router/src/standalone_indexer/runtime/subscriber.rs
- lib/kv-router/src/standalone_indexer/runtime/discovery.rs
Signed-off-by: PeaBrane <yanrpei@gmail.com>
|
Also addressed the approximate-mode first-seen worker path in the sharded indexer: routing-decision writes now assign and persist a shard for previously unseen workers, and there is a regression test covering write-before-event followed by worker removal. |
Signed-off-by: PeaBrane <yanrpei@gmail.com>
|
We ran GPU validation on this PR (commit Setup: H100 80GB on computelab, Qwen/Qwen3-0.6B with Results:
SGLang issue: SGLang reports vLLM Still running TRT-LLM tests — will update with those results. |
|
Update: TRT-LLM results are in. Full 3-framework test matrix complete.
Summary: The new block_size values by framework:
|
Mirror finish() in the Drop impl: free the scheduler slot before firing the deferred session close, so the worker's KV is not released while generation teardown is still in progress. Also guard the close with try_current() to prevent panics outside a tokio runtime. Remove stale pub mod declarations for subscriber/worker_query (moved to kv_router/indexer/ in #7973).
Summary
This changes the Dynamo-native remote-indexer path so it is served from the llm-side KV-router stack instead of the standalone-indexer runtime shim.
The standalone
dynamo.indexerremains a supported product and keeps its intended HTTP + ZMQ microservice behavior. The change here is only that Dynamo-runtime integration is removed from that binary. For the Dynamo-native path, routers/frontends can now serve their own in-process indexer over the request plane, and other KV routers can consume that remote indexer from the same worker component.What Changed
--serve-indexerand--use-remote-indexerto the Dynamo-native KV-router pathserve_indexeranduse_remote_indexerare mutually exclusivelib/kv-routerdynamo.indexerfocused on the real standalone HTTP + ZMQ microservice caselib/llm/src/kv_router/indexer/so the local indexer wrapper, remote indexer client/server, KV-event subscriber, worker-query restore, and JetStream recovery live togethernats_core_remoteandno_kv_events_remotevariantsBehavior
After this change the KV-router modes are:
--serve-indexer:--use-remote-indexer:Remote read failures degrade to zero-overlap scores at the remote wrapper layer. Local/in-process indexer failures still remain hard errors.
Example Usage
Serve a Dynamo-native remote indexer from a router replica:
Consume that remote indexer from another router replica on the same worker component:
Serve from a frontend-owned KV router instead:
The standalone indexer usage remains the direct HTTP + ZMQ microservice path:
Validation
cargo clippy --no-default-features -- -D warnings && cargo fmtinlib/llmcargo clippy --no-deps --all-targets -- -D warnings && cargo fmtinlib/kv-routercargo clippy --no-default-features -- -D warnings && cargo fmtinlib/bindings/pythoncargo test -p dynamo-kv-router --libmaturin develop --uvanduv pip install -e . --python .venv/bin/pythonpython -m pytest tests/router/test_router_e2e_with_mockers.py -k 'nats_core_remote or no_kv_events_remote' -qNote:
cargo test -p dynamo-llm --no-default-features --libstill hits the existing local linker issue in this environment (ld: library 'stdc++' not found).