diff --git a/components/src/dynamo/common/configuration/groups/kv_router_args.py b/components/src/dynamo/common/configuration/groups/kv_router_args.py index a8d9285bab9..5d409b6523c 100644 --- a/components/src/dynamo/common/configuration/groups/kv_router_args.py +++ b/components/src/dynamo/common/configuration/groups/kv_router_args.py @@ -34,7 +34,6 @@ "router_prune_target_ratio", "router_queue_threshold", "router_event_threads", - "router_enable_cache_control", "router_queue_policy", "remote_indexer_component", ) @@ -59,7 +58,6 @@ class KvRouterConfigBase(ConfigBase): router_prune_target_ratio: float router_queue_threshold: Optional[float] router_event_threads: int - router_enable_cache_control: bool router_queue_policy: str remote_indexer_component: Optional[str] @@ -260,18 +258,6 @@ def add_arguments(self, parser) -> None: ), arg_type=int, ) - add_negatable_bool_argument( - g, - flag_name="--enable-cache-control", - env_var="DYN_ENABLE_CACHE_CONTROL", - default=False, - dest="router_enable_cache_control", - help=( - "KV Router: Enable cache control (PIN with TTL). When set, the router creates " - "a cache_control service mesh client and fires pin_prefix after generation for " - "requests with nvext.cache_control." - ), - ) add_argument( g, flag_name="--router-queue-policy", diff --git a/components/src/dynamo/frontend/frontend_args.py b/components/src/dynamo/frontend/frontend_args.py index 1b19b3687a3..3890781b0bf 100644 --- a/components/src/dynamo/frontend/frontend_args.py +++ b/components/src/dynamo/frontend/frontend_args.py @@ -93,8 +93,6 @@ def validate(self) -> None: ) if self.min_initial_workers < 0: raise ValueError("--router-min-initial-workers must be >= 0") - if self.router_enable_cache_control and self.router_mode != "kv": - raise ValueError("--enable-cache-control requires --router-mode=kv") if self.tokenizer_backend not in self._VALID_TOKENIZER_BACKENDS: raise ValueError( f"--tokenizer: invalid value '{self.tokenizer_backend}' " diff --git a/components/src/dynamo/sglang/request_handlers/handler_base.py b/components/src/dynamo/sglang/request_handlers/handler_base.py index 814dd4421a0..399cb25ebf2 100644 --- a/components/src/dynamo/sglang/request_handlers/handler_base.py +++ b/components/src/dynamo/sglang/request_handlers/handler_base.py @@ -380,47 +380,6 @@ async def update_weight_version(self, body: dict) -> dict: "new_version": req.new_version, } - async def pin_prefix(self, body: dict) -> dict: - """Pin a prefix by token_ids to resist eviction. - - Args: - body: Dict with "token_ids" list of token IDs and optional - "ttl_seconds" (default 300). - """ - token_ids = body.get("token_ids", []) - ttl_seconds = body.get("ttl_seconds", 300) - if not token_ids: - return {"status": "error", "message": "token_ids required"} - try: - result = await self.engine.tokenizer_manager.pin_prefix( - token_ids, ttl_seconds - ) - return { - "status": "ok" if result.success else "error", - "nodes_pinned": result.nodes_pinned, - "message": result.message, - } - except Exception as e: - logging.error(f"Failed to pin prefix: {e}") - return {"status": "error", "message": str(e)} - - async def cache_control(self, request, context=None): - """Service mesh endpoint for cache control operations. - - Args: - request: Dict with "action" key and action-specific parameters. - context: Optional Dynamo context (unused but required by protocol). - - Yields: - Single dict with operation result. - """ - action = request.get("action") - if action == "pin_prefix": - result = await self.pin_prefix(request) - else: - result = {"status": "error", "message": f"Unknown action: {action}"} - yield result - def register_engine_routes(self, runtime: DistributedRuntime) -> None: """Register all engine routes for this handler. @@ -435,7 +394,6 @@ def register_engine_routes(self, runtime: DistributedRuntime) -> None: runtime.register_engine_route( "resume_memory_occupation", self.resume_memory_occupation ) - runtime.register_engine_route("pin_prefix", self.pin_prefix) runtime.register_engine_route( "update_weights_from_disk", self.update_weights_from_disk ) diff --git a/docs/backends/sglang/agents.md b/docs/backends/sglang/agents.md index 0e74a6c9923..f53e497d953 100644 --- a/docs/backends/sglang/agents.md +++ b/docs/backends/sglang/agents.md @@ -2,12 +2,12 @@ # SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 title: SGLang for Agentic Workloads -subtitle: Priority scheduling, KV cache eviction policies, and cache pinning for multi-turn agentic serving +subtitle: Priority scheduling and KV cache eviction policies for multi-turn agentic serving --- # SGLang for Agentic Workloads -This guide covers SGLang-specific configuration for agentic serving with Dynamo. It explains which SGLang engine flags to enable, how Dynamo's [agent hints](../../components/frontend/nvext.md#agent-hints) map to SGLang behavior, and how to use experimental cache pinning to protect KV cache for high-value conversations. +This guide covers SGLang-specific configuration for agentic serving with Dynamo. It explains which SGLang engine flags to enable and how Dynamo's [agent hints](../../components/frontend/nvext.md#agent-hints) map to SGLang behavior. ## Overview @@ -109,192 +109,6 @@ for chunk in response: print(chunk.choices[0].delta.content, end="") ``` -## Cache Pinning (Experimental) - -> [!WARNING] -> Cache pinning is experimental and available on development branches only. The API may change. - -**Required PRs:** -- SGLang: [feat: TTL-based prefix pinning with refresh-on-hit for HiRadixCache](https://github.com/sgl-project/sglang/pull/18941) -- Dynamo: [feat: wire nvext.cache_control TTL-based pinning through Dynamo router](https://github.com/ai-dynamo/dynamo/pull/6213) - -Cache pinning lets you explicitly protect KV cache for high-value conversation prefixes. When a request includes `nvext.cache_control`, the router fires a `pin_prefix` call to the SGLang worker after generation completes. Pinned nodes resist eviction for the specified TTL -- even under memory pressure, they are retained (demoted to host memory with HiCache rather than deleted). - -### How It Works - -```mermaid -sequenceDiagram - participant Client - participant Preprocessor - participant Router - participant Worker as SGLang Worker - participant Cache as Radix Cache - - Client->>Preprocessor: chat/completions + nvext.cache_control{ttl} - Preprocessor->>Preprocessor: Extract TTL, attach to RoutingHints - Preprocessor->>Router: PreprocessedRequest (cache_control_ttl=N) - Router->>Router: Select worker, record token_ids + TTL in PinState - Router->>Worker: Generate request - Worker-->>Router: Stream response tokens - Router-->>Client: Stream response tokens - - Note over Router,Worker: On stream completion - - Router-)Worker: pin_prefix(token_ids, ttl) [fire-and-forget] - Worker->>Cache: Walk radix tree along token sequence - Cache->>Cache: Set pin_expiry, acquire host_ref_counter hold - Worker--)Router: {status: ok, nodes_pinned: N} - - Note over Cache: TTL expires - - Cache->>Cache: Clear pin_expiry, release host_ref_counter - Note over Cache: Node now eligible for normal eviction -``` - -1. The client includes `nvext.cache_control` with a TTL in the request. -2. The Dynamo preprocessor extracts the TTL and attaches it to routing hints. -3. The router routes the request normally and records the token IDs in a `PinState`. -4. After the response stream completes, the router spawns a fire-and-forget `pin_prefix` RPC to the worker that served the request. -5. The worker walks the radix tree along the token sequence and pins each node, setting `pin_expiry` and acquiring a `host_ref_counter` hold that prevents eviction. -6. When TTL expires, the pin is cleared and the node becomes eligible for normal eviction. - -### Enabling Cache Pinning - -**Frontend flag:** - -```bash -python -m dynamo.frontend \ - --router-mode kv \ - --enable-cache-control \ - ... -``` - -| Flag | Description | -|------|-------------| -| `--enable-cache-control` | Enables cache control (PIN with TTL). Creates a `cache_control` service mesh client and fires `pin_prefix` after generation for requests with `nvext.cache_control`. Requires `--router-mode=kv`. | - -**SGLang worker:** The worker receives PIN requests via its `cache_control` service mesh endpoint. You **must** set the `SGLANG_HICACHE_MAX_PINNED_RATIO` environment variable to a non-zero value -- pinning is disabled by default. - -| Environment Variable | Type | Default | Description | -|---------------------|------|---------|-------------| -| `SGLANG_HICACHE_MAX_PINNED_RATIO` | `float` | `0.0` | Max fraction of cache tokens that can be pinned. Must be in `[0, 1)`. `0` disables pinning entirely. | - -HiCache is required (`--enable-hierarchical-cache`). Without it, the scheduler rejects PIN requests. For best results, use `write_through` so that pinned nodes demote to host memory instead of being deleted when GPU memory fills: - -```bash -SGLANG_HICACHE_MAX_PINNED_RATIO=0.1 python -m dynamo.sglang \ - --model-path Qwen/Qwen3-14B-FP8 \ - --enable-hierarchical-cache \ - --hicache-ratio 2.0 \ - --hicache-write-policy write_through \ - ... -``` - -### Request Format - -Include `cache_control` as a top-level field in `nvext`: - -```json -{ - "model": "Qwen/Qwen3-14B-FP8", - "messages": [ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": "Explain quantum computing."} - ], - "nvext": { - "cache_control": { - "type": "ephemeral", - "ttl": "1h" - } - } -} -``` - -| Field | Type | Description | -|-------|------|-------------| -| `cache_control.type` | `string` | Currently only `"ephemeral"` is supported. | -| `cache_control.ttl` | `string` | TTL as integer seconds (`"600"`) or shorthand (`"5m"`, `"1h"`). Clamped to [300, 3600] seconds. Unrecognized strings default to 300s. | - -### Python Example - -```python -from openai import OpenAI - -client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy") - -# First turn -- pin the conversation prefix for 1 hour -response = client.chat.completions.create( - model="Qwen/Qwen3-14B-FP8", - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": "Analyze this codebase and suggest improvements."}, - ], - stream=True, - extra_body={ - "nvext": { - "cache_control": { - "type": "ephemeral", - "ttl": "1h" - } - } - } -) - -# Collect the assistant reply -assistant_response = "" -for chunk in response: - if chunk.choices[0].delta.content: - assistant_response += chunk.choices[0].delta.content - -# Later turns reuse the pinned prefix -- even after heavy load from -# other requests, the KV cache for this conversation is preserved. -response = client.chat.completions.create( - model="Qwen/Qwen3-14B-FP8", - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": "Analyze this codebase and suggest improvements."}, - {"role": "assistant", "content": assistant_response}, - {"role": "user", "content": "Now focus on the database layer."}, - ], - stream=True, - extra_body={ - "nvext": { - "cache_control": { - "type": "ephemeral", - "ttl": "1h" - } - } - } -) -``` - -### Verifying Cache Hits - -The response includes `prompt_tokens_details.cached_tokens` in the `usage` object when `--enable-cache-report` is set on the SGLang worker: - -```json -{ - "usage": { - "prompt_tokens": 2048, - "completion_tokens": 150, - "prompt_tokens_details": { - "cached_tokens": 1920 - } - } -} -``` - -A high `cached_tokens / prompt_tokens` ratio on subsequent turns confirms that the pinned prefix was preserved. - -### Limitations - -- **Pinning disabled by default**: `SGLANG_HICACHE_MAX_PINNED_RATIO` defaults to `0.0`. You must set it to a non-zero value (e.g., `0.1`) or all PIN requests will be rejected. -- **HiCache required**: The scheduler rejects PIN requests unless `--enable-hierarchical-cache` is set. -- **TTL clamping**: Values are clamped to [300, 3600] seconds. You cannot pin for less than 5 minutes or more than 1 hour. -- **Pin budget**: Pinned tokens consume a budget controlled by `SGLANG_HICACHE_MAX_PINNED_RATIO` (fraction of host pool capacity). Requests exceeding this budget are rejected. -- **No priority on pinned nodes**: `pin_prefix` does not set a priority on the radix tree nodes. All pinned nodes have equal eviction priority and fall back to LRU ordering among themselves when host memory fills. -- **Requires stack restart for A/B testing**: Pins persist in cache across benchmark runs. When comparing pinned vs. unpinned performance, restart the full stack between phases to avoid false cache hits. - ## See Also - **[NVIDIA Request Extensions (nvext)](../../components/frontend/nvext.md)**: Full `nvext` field reference including agent hints diff --git a/docs/components/frontend/configuration.md b/docs/components/frontend/configuration.md index f21d3e9381d..385e88e124d 100644 --- a/docs/components/frontend/configuration.md +++ b/docs/components/frontend/configuration.md @@ -45,7 +45,6 @@ The Rust HTTP server also reads these environment variables (not exposed as CLI | `--router-event-threads` | `DYN_ROUTER_EVENT_THREADS` | `4` | Event processing threads. >1 enables concurrent radix tree | | `--router-queue-threshold` | `DYN_ROUTER_QUEUE_THRESHOLD` | `4.0` | Queue threshold fraction of prefill capacity. Enables priority scheduling | | `--router-queue-policy` | `DYN_ROUTER_QUEUE_POLICY` | `fcfs` | Queue scheduling policy: `fcfs` (tail TTFT), `wspt` (avg TTFT), or `lcfs` (comparison-only reverse ordering) | -| `--enable-cache-control` / `--no-enable-cache-control` | `DYN_ENABLE_CACHE_CONTROL` | `false` | Enable TTL-based cache pinning (requires `--router-mode=kv`) | | `--decode-fallback` / `--no-decode-fallback` | `DYN_DECODE_FALLBACK` | `false` | Fall back to aggregated mode when prefill workers unavailable | ## Fault Tolerance diff --git a/docs/components/frontend/nvext.md b/docs/components/frontend/nvext.md index 00c31bbd379..fead8076adf 100644 --- a/docs/components/frontend/nvext.md +++ b/docs/components/frontend/nvext.md @@ -39,7 +39,6 @@ Include `nvext` as a top-level field alongside standard OpenAI-compatible fields | `prefill_worker_id` | `u64` | `None` | Router | Routes the request to a specific prefill worker (disaggregated serving). | | `decode_worker_id` | `u64` | `None` | Router | Routes the request to a specific decode worker (disaggregated serving). | | `agent_hints` | object | `None` | Router | Per-request hints for scheduling and load balancing. See [Agent Hints](#agent-hints). | -| `cache_control` | object | `None` | Router | KV cache pinning hint with TTL. See [Cache Control](#cache-control). | ### Header Overrides @@ -130,31 +129,6 @@ Backend details: } ``` -## Cache Control - -> [!WARNING] -> Cache control is experimental and available on development branches only. The API may change. - -The `cache_control` object enables explicit KV cache pinning with a TTL. When set, the router fires a `pin_prefix` call to the backend worker after generation completes, protecting the conversation's KV cache from eviction for the specified duration. - -| Field | Type | Default | Description | -|-------|------|---------|-------------| -| `cache_control.type` | `string` | — | Cache control type. Currently only `"ephemeral"` is supported. | -| `cache_control.ttl` | `string` | `"300"` | TTL as integer seconds (`"600"`) or shorthand (`"5m"`, `"1h"`). Clamped to [300, 3600] seconds. | - -```json -{ - "nvext": { - "cache_control": { - "type": "ephemeral", - "ttl": "1h" - } - } -} -``` - -Requires `--enable-cache-control` and `--router-mode=kv` on the frontend. See [SGLang for Agentic Workloads](../../backends/sglang/agents.md#cache-pinning-experimental) for full setup and usage details. - ## Response Extensions When the client requests response metadata via `extra_fields`, the response includes an `nvext` object with the requested fields: @@ -190,4 +164,4 @@ When the client requests response metadata via `extra_fields`, the response incl |----------|-------------| | [Frontend Guide](frontend-guide.md) | KServe gRPC configuration and integration | | [Router Guide](../router/router-guide.md) | Full router configuration and CLI arguments | -| [SGLang for Agentic Workloads](../../backends/sglang/agents.md) | SGLang engine flags for priority scheduling, eviction policies, and cache pinning | +| [SGLang for Agentic Workloads](../../backends/sglang/agents.md) | SGLang engine flags for priority scheduling and eviction policies | diff --git a/docs/features/agentic_workloads.md b/docs/features/agentic_workloads.md index ac669068b94..1ebd6beb77f 100644 --- a/docs/features/agentic_workloads.md +++ b/docs/features/agentic_workloads.md @@ -20,17 +20,17 @@ Three gaps stand out with current workflows: ## Dynamo as an Agentic Runtime -Dynamo exposes **agentic hints** and uses them at three layers: frontend API, router, and KV cache management. Together, these enable workload-aware inference instead of generic, state-of-the-moment optimization. +Dynamo exposes **agentic hints** and uses them at the frontend API, router, and backend scheduling layers. Together, these enable workload-aware inference instead of generic, state-of-the-moment optimization. ### Agentic Hints -Agentic hints are per-request metadata that the agent client (e.g. Claude Code, Codex, [NeMo Agent Toolkit](https://github.com/NVIDIA/NeMo-Agent-Toolkit)) sends to Dynamo's frontend. They are carried in the request body under [**nvext**](../components/frontend/nvext.md#agent-hints) on chat completions. The frontend parses them and passes them to the KV router and, where applicable, to the KV cache manager and backends. +Agentic hints are per-request metadata that the agent client (e.g. Claude Code, Codex, [NeMo Agent Toolkit](https://github.com/NVIDIA/NeMo-Agent-Toolkit)) sends to Dynamo's frontend. They are carried in the request body under [**nvext**](../components/frontend/nvext.md#agent-hints) on chat completions. The frontend parses them and passes them to the KV router and, where applicable, to backends. - **Flow:** Harness sets hints in the request → Dynamo frontend parses `nvext` into routing hints → KV router uses them for queue ordering and worker selection → backends use them for priority scheduling and cache eviction. ![Agentic workflow: Harness → hints in request → Dynamo frontend → routing hints → KV router (queue order, worker choice) → backend](../assets/img/agentic-hints-workflow.svg) -The request body includes `nvext.agent_hints` (routing, scheduling) and `nvext.cache_control` (TTL-based pinning); the frontend passes the former to the KV router and the latter to the KV block manager for cache pinning, prefetching, and eviction. +The request body includes `nvext.agent_hints` for routing and scheduling metadata; the frontend passes those hints to the KV router for queue ordering and worker selection. | Hint | Description | |------|-------------| @@ -40,15 +40,11 @@ The request body includes `nvext.agent_hints` (routing, scheduling) and `nvext.c | `program_id` | (Planned) Identifies the agentic program for program-level metrics and cache affinity. | | `context_type` | (Planned) Semantic type (e.g. system prompt, tool definition, reasoning branch) for context-aware eviction. | -**`nvext.cache_control`** (sibling of `agent_hints`, not inside it) provides TTL-based KV cache pinning. Pinned prefixes resist eviction for the specified duration. See [SGLang for Agentic Workloads — Cache Pinning](../backends/sglang/agents.md#cache-pinning-experimental). - - ## Feature matrix | Feature | vLLM | SGLang | TensorRT-LLM | |---------|:----:|:------:|:-------------:| | Priority-based cache eviction | 🚧 | ✅ | 🚧 | -| Cache pinning | | ✅ | 🚧 | | Cache prefetching | | 🚧 | | | Subagent / thinking-aware cache eviction | | 🚧 | | | Speculative prefill | ✅ | ✅ | ✅ | @@ -68,11 +64,6 @@ Dynamo is now supported directly in LangChain using the [NVIDIA AI Endpoints int - **Priority-based KV cache eviction:** Instead of evicting by LRU alone, the backend can evict **low-priority** cache entries first when the GPU (and, with HiCache, host) cache is full. The `priority` value in `nvext.agent_hints` is forwarded to the engine; with SGLang, enable `--enable-priority-scheduling` and `--radix-eviction-policy priority`. -- **Cache pinning (experimental):** [Anthropic's v1/messages](https://docs.anthropic.com/en/docs/build-with-claude/caching) includes a `cache_control` field that tells servers how long to keep KV cache for specific blocks. Dynamo implements an OSS version with SGLang's HiCache: users can set `cache_control` via the same API as Anthropic or as an `nvext` field on chat completions. When set, the Dynamo router calls a hook in HiCache after the request completes to **pin** the blocks created by those tokens for the user-specified TTL. Pinned nodes resist eviction (demoting to host memory rather than being deleted). - In the Nemo Agentic toolkit and Dynamo integration, TTL is dynamically computed as the product of how many times a block is expected to be reused and the time between those requests; the NAT profiler pre-computes these expectations during agent evaluations and stores them in a data structure per agent, then injects `nvext.cache_control` with the derived TTL (see [dynamo_llm.py](https://github.com/NVIDIA/NeMo-Agent-Toolkit/blob/develop/packages/nvidia_nat_core/src/nat/llm/dynamo_llm.py)). - - **Future work:** TTL could be determined dynamically by context type—e.g. think tokens or scratchpad content could use a lower TTL than system prompt or tool definitions, so high-value static context is retained longer while ephemeral context expires sooner. - - **Cache prefetching (future work):** Using the predictable agentic lifecycle (e.g. parent-child subagents, known next turn), Dynamo could proactively prefetch or move KV cache to a different worker so that the next request hits warm cache. ### Speculative prefill diff --git a/lib/bindings/python/rust/llm/entrypoint.rs b/lib/bindings/python/rust/llm/entrypoint.rs index 0d5329ab1db..c65eb82a89a 100644 --- a/lib/bindings/python/rust/llm/entrypoint.rs +++ b/lib/bindings/python/rust/llm/entrypoint.rs @@ -58,7 +58,7 @@ impl KvRouterConfig { #[pymethods] impl KvRouterConfig { #[new] - #[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, durable_kv_events=false, router_replica_sync=false, router_track_active_blocks=true, router_track_output_blocks=false, router_assume_kv_reuse=true, router_track_prefill_tokens=true, router_snapshot_threshold=1000000, router_reset_states=false, router_ttl_secs=120.0, router_max_tree_size=1048576, router_prune_target_ratio=0.8, router_queue_threshold=Some(4.0), router_event_threads=4, router_enable_cache_control=false, router_queue_policy="fcfs", remote_indexer_component=None))] + #[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, durable_kv_events=false, router_replica_sync=false, router_track_active_blocks=true, router_track_output_blocks=false, router_assume_kv_reuse=true, router_track_prefill_tokens=true, router_snapshot_threshold=1000000, router_reset_states=false, router_ttl_secs=120.0, router_max_tree_size=1048576, router_prune_target_ratio=0.8, router_queue_threshold=Some(4.0), router_event_threads=4, router_queue_policy="fcfs", remote_indexer_component=None))] #[allow(clippy::too_many_arguments)] fn new( overlap_score_weight: f64, @@ -77,7 +77,6 @@ impl KvRouterConfig { router_prune_target_ratio: f64, router_queue_threshold: Option, router_event_threads: u32, - router_enable_cache_control: bool, router_queue_policy: &str, remote_indexer_component: Option, ) -> Self { @@ -99,7 +98,6 @@ impl KvRouterConfig { router_prune_target_ratio, router_queue_threshold, router_event_threads, - router_enable_cache_control, skip_initial_worker_wait: false, router_queue_policy: router_queue_policy.parse().unwrap_or_else(|_| { panic!("invalid router_queue_policy: {router_queue_policy:?}") diff --git a/lib/bindings/python/src/dynamo/_core.pyi b/lib/bindings/python/src/dynamo/_core.pyi index 055bb75f4b5..8ff54663f46 100644 --- a/lib/bindings/python/src/dynamo/_core.pyi +++ b/lib/bindings/python/src/dynamo/_core.pyi @@ -1179,7 +1179,6 @@ class KvRouterConfig: router_prune_target_ratio: float = 0.8, router_queue_threshold: Optional[float] = 4.0, router_event_threads: int = 4, - router_enable_cache_control: bool = False, router_queue_policy: str = "fcfs", ) -> None: """ @@ -1211,8 +1210,6 @@ class KvRouterConfig: Set to None to disable queueing (all requests go directly to the scheduler). router_event_threads: Number of event processing threads (default: 4). When > 1, uses a concurrent radix tree with a thread pool. - router_enable_cache_control: Enable cache control (PIN with TTL) via the worker's - cache_control service mesh endpoint (default: False). router_queue_policy: Scheduling policy for the router queue (default: "fcfs"). "fcfs": first-come first-served with priority bumps — optimizes tail TTFT. "lcfs": last-come first-served with priority bumps — intentionally worsens tail behavior for policy comparisons. @@ -2035,4 +2032,3 @@ class StreamIncomplete(DynamoException): """The response stream was terminated before completion.""" ... - diff --git a/lib/kv-router/src/scheduling/config.rs b/lib/kv-router/src/scheduling/config.rs index 3f56bd03b8e..4b3c2e50386 100644 --- a/lib/kv-router/src/scheduling/config.rs +++ b/lib/kv-router/src/scheduling/config.rs @@ -156,12 +156,6 @@ pub struct KvRouterConfig { #[validate(range(min = 1))] pub router_event_threads: u32, - /// Enable cache control (PIN with TTL) via the worker's cache_control service mesh endpoint. - /// When true, the router creates a cache_control client and honors nvext.cache_control on - /// requests, firing a pin_prefix call (with TTL) to the worker after generation completes. - /// When false (default), cache_control is ignored and no cache_control client is created. - pub router_enable_cache_control: bool, - pub skip_initial_worker_wait: bool, /// Scheduling policy for the router queue. @@ -196,7 +190,6 @@ impl Default for KvRouterConfig { router_prune_target_ratio: 0.8, router_queue_threshold: Some(4.0), router_event_threads: 4, - router_enable_cache_control: false, skip_initial_worker_wait: false, router_queue_policy: RouterQueuePolicy::default(), remote_indexer_component: None, diff --git a/lib/llm/src/kv_router.rs b/lib/llm/src/kv_router.rs index 0a24a022754..5c953640116 100644 --- a/lib/llm/src/kv_router.rs +++ b/lib/llm/src/kv_router.rs @@ -28,7 +28,6 @@ use futures::stream; use tracing::Instrument; use validator::Validate; -pub mod cache_control; pub mod indexer; mod jetstream; pub mod metrics; @@ -40,7 +39,6 @@ pub mod sequence; pub mod subscriber; pub mod worker_query; -pub use cache_control::{CacheControlClient, spawn_pin_prefix}; pub use indexer::Indexer; pub use prefill_router::PrefillRouter; pub use push_router::{DirectRoutingRouter, KvPushRouter}; diff --git a/lib/llm/src/kv_router/cache_control.rs b/lib/llm/src/kv_router/cache_control.rs deleted file mode 100644 index ac079b38ac5..00000000000 --- a/lib/llm/src/kv_router/cache_control.rs +++ /dev/null @@ -1,93 +0,0 @@ -// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -use anyhow::Result; -use dynamo_runtime::{ - component::Component, - pipeline::{PushRouter, RouterMode, SingleIn}, - protocols::annotated::Annotated, -}; -use futures::StreamExt; - -use crate::protocols::TokenIdType; - -/// State captured at routing time for a deferred PIN after generation completes. -pub(crate) struct PinState { - pub token_ids: Vec, - pub cc_client: CacheControlClient, - pub instance_id: u64, - pub ttl_seconds: u64, -} - -/// A PushRouter client typed for cache_control requests/responses. -/// -/// Both request and response are untyped JSON. The worker's cache_control -/// endpoint returns {"status": "ok"/"error", ...} but the router treats -/// PIN as fire-and-forget and only logs the response at debug level. -pub type CacheControlClient = PushRouter>; - -/// Create a cache_control client from a component. -/// -/// Connects to the "cache_control" endpoint on the given component and returns -/// a PushRouter client for sending cache control operations (pin_prefix, -/// unpin_prefix) to workers. -pub(crate) async fn create_cache_control_client( - component: &Component, -) -> Result { - let client = component.endpoint("cache_control").client().await?; - CacheControlClient::from_client(client, RouterMode::KV).await -} - -/// Fire-and-forget pin_prefix to the worker that served this request. -/// -/// Spawns a detached task that sends the pin request and logs the outcome. -/// Does nothing if `client` is `None` (logs a warning). -pub fn spawn_pin_prefix( - client: Option<&CacheControlClient>, - token_ids: &[TokenIdType], - instance_id: u64, - context_id: &str, - ttl_seconds: u64, -) { - let Some(cc) = client else { - tracing::warn!( - request_id = %context_id, - "cache_control set but no cache_control_client configured" - ); - return; - }; - - let cc = cc.clone(); - let token_ids = token_ids.to_vec(); - let context_id = context_id.to_owned(); - - tokio::spawn(async move { - let pin_request = serde_json::json!({ - "action": "pin_prefix", - "token_ids": token_ids, - "ttl_seconds": ttl_seconds, - }); - match cc.direct(SingleIn::new(pin_request), instance_id).await { - Ok(mut stream) => { - if let Some(resp) = stream.next().await { - tracing::info!( - request_id = %context_id, - worker_id = instance_id, - ?resp, - "pin_prefix response" - ); - } - // Drain remaining stream to avoid "Failed to publish - // complete final" errors from the push handler. - while stream.next().await.is_some() {} - } - Err(e) => { - tracing::warn!( - request_id = %context_id, - worker_id = instance_id, - "Failed to pin prefix: {e}" - ); - } - } - }); -} diff --git a/lib/llm/src/kv_router/push_router.rs b/lib/llm/src/kv_router/push_router.rs index 543564cfc5c..af37feb3024 100644 --- a/lib/llm/src/kv_router/push_router.rs +++ b/lib/llm/src/kv_router/push_router.rs @@ -15,15 +15,10 @@ use dynamo_runtime::{ }; use futures::stream::{self, StreamExt}; use serde_json::json; -use tokio::sync::OnceCell; use tracing::Instrument; use crate::{ - kv_router::{ - CacheControlClient, KvRouter, - cache_control::{PinState, create_cache_control_client, spawn_pin_prefix}, - metrics::RouterRequestMetrics, - }, + kv_router::{KvRouter, metrics::RouterRequestMetrics}, preprocessor::PreprocessedRequest, protocols::common::{ llm_backend::LLMEngineOutput, @@ -34,8 +29,6 @@ use crate::{ pub struct KvPushRouter { inner: PushRouter>, pub chooser: Arc, - /// Lazily initialized on first PIN request. `None` when cache_control is disabled. - cache_control_cell: Option>, } /// Result of worker selection containing instance ID, dp_rank, and overlap amount. @@ -66,8 +59,6 @@ struct RequestGuard { isl_tokens: usize, block_size: usize, expected_output_tokens: Option, - // PIN state: set when cache_control TTL is present and a cc_client exists - pin_state: Option, } impl RequestGuard { @@ -143,16 +134,6 @@ impl RequestGuard { tracing::warn!("Failed to free request {}: {e}", self.context_id); } self.freed = true; - - if let Some(ref pin) = self.pin_state { - spawn_pin_prefix( - Some(&pin.cc_client), - &pin.token_ids, - pin.instance_id, - &self.context_id, - pin.ttl_seconds, - ); - } } fn record_metrics(&mut self) { @@ -200,17 +181,7 @@ impl KvPushRouter { // and the standalone router create KvPushRouter, so this covers both. RouterRequestMetrics::from_component(chooser.client().endpoint.component()); - let cache_control_cell = if chooser.kv_router_config().router_enable_cache_control { - tracing::info!("Cache control enabled for PIN operations (lazy init)"); - Some(OnceCell::new()) - } else { - None - }; - KvPushRouter { - inner, - chooser, - cache_control_cell, - } + KvPushRouter { inner, chooser } } /// Select a worker for the request, either using a preselected worker or finding the best match. @@ -472,26 +443,6 @@ impl AsyncEngine, ManyOut = async { - let ttl = request.routing.as_ref().and_then(|r| r.cache_control_ttl)?; - let cell = self.cache_control_cell.as_ref()?; - let component = self.chooser.client().endpoint.component().clone(); - let client = cell - .get_or_try_init(|| create_cache_control_client(&component)) - .await - .inspect_err(|e| tracing::warn!("Failed to create cache_control client: {e}")) - .ok()? - .clone(); - Some(PinState { - token_ids: request.token_ids.clone(), - cc_client: client, - instance_id, - ttl_seconds: ttl, - }) - } - .await; - let (mut backend_input, context) = request.into_parts(); backend_input.routing_mut().dp_rank = Some(dp_rank); let updated_request = context.map(|_| backend_input); @@ -533,7 +484,6 @@ impl AsyncEngine, ManyOut for NvCreateChatCompletionRequest { type Error = anyhow::Error; @@ -119,44 +119,7 @@ impl TryFrom for NvCreateChatCompletionRequest { top_k: req.top_k.map(|k| k as i32), ..Default::default() }, - nvext: { - // Lossy: collapse all per-block cache_control into a single - // last-one-wins value. Sufficient for backends with a single - // prefix cache boundary. Full per-block breakpoints are - // preserved in AnthropicContext::cache_breakpoints via UnifiedRequest. - let mut last_block_cc: Option = None; - for msg in &req.messages { - if let AnthropicMessageContent::Blocks { content } = &msg.content { - for block in content { - let block_cc = match block { - AnthropicContentBlock::Text { cache_control, .. } => { - cache_control.as_ref() - } - AnthropicContentBlock::ToolUse { cache_control, .. } => { - cache_control.as_ref() - } - AnthropicContentBlock::ToolResult { cache_control, .. } => { - cache_control.as_ref() - } - AnthropicContentBlock::Thinking { cache_control, .. } => { - cache_control.as_ref() - } - _ => None, - }; - if let Some(cc) = block_cc { - last_block_cc = Some(cc.clone()); - } - } - } - } - // Merge: top-level > per-block > system block cache_control - let system_cc = req.system.as_ref().and_then(|s| s.cache_control.clone()); - let effective_cc = req.cache_control.clone().or(last_block_cc).or(system_cc); - effective_cc.map(|cc| NvExt { - cache_control: Some(cc), - ..Default::default() - }) - }, + nvext: None, // chat_template_args may be augmented by the Anthropic handler // (anthropic.rs) after conversion — e.g., setting enable_thinking=true // when a reasoning parser is configured. The conversion layer only @@ -1419,7 +1382,7 @@ mod tests { #[test] fn test_cache_control_passthrough() { - use crate::protocols::openai::nvext::{CacheControl, CacheControlType}; + use dynamo_protocols::types::anthropic::{CacheControl, CacheControlType}; let req = AnthropicCreateMessageRequest { model: "test-model".into(), @@ -1450,18 +1413,11 @@ mod tests { }; let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap(); - let nvext = chat_req.nvext.expect("nvext should be set"); - let cc = nvext - .cache_control - .expect("nvext.cache_control should be set"); - assert_eq!(cc.control_type, CacheControlType::Ephemeral); - assert_eq!(cc.ttl_seconds(), 300); + assert!(chat_req.nvext.is_none()); } #[test] fn test_cache_control_1h_ttl_passthrough() { - use crate::protocols::openai::nvext::CacheControlType; - let json = r#"{ "model": "test", "max_tokens": 100, @@ -1472,12 +1428,7 @@ mod tests { assert!(req.cache_control.is_some()); let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap(); - let nvext = chat_req.nvext.expect("nvext should be set"); - let cc = nvext - .cache_control - .expect("nvext.cache_control should be set"); - assert_eq!(cc.control_type, CacheControlType::Ephemeral); - assert_eq!(cc.ttl_seconds(), 3600); + assert!(chat_req.nvext.is_none()); } #[test] @@ -1546,8 +1497,6 @@ mod tests { #[test] fn test_per_block_cache_control_last_wins() { - use crate::protocols::openai::nvext::CacheControlType; - let json = r#"{ "model": "test", "max_tokens": 100, @@ -1563,10 +1512,7 @@ mod tests { }"#; let req: AnthropicCreateMessageRequest = serde_json::from_str(json).unwrap(); let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap(); - let nvext = chat_req.nvext.expect("nvext should be set"); - let cc = nvext.cache_control.expect("cache_control should be set"); - assert_eq!(cc.control_type, CacheControlType::Ephemeral); - assert_eq!(cc.ttl_seconds(), 3600); // Last block's 1h TTL wins + assert!(chat_req.nvext.is_none()); } #[test] @@ -1586,16 +1532,11 @@ mod tests { }"#; let req: AnthropicCreateMessageRequest = serde_json::from_str(json).unwrap(); let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap(); - let nvext = chat_req.nvext.expect("nvext should be set"); - let cc = nvext.cache_control.expect("cache_control should be set"); - // Top-level (no TTL = 300s default) takes precedence over per-block (1h) - assert_eq!(cc.ttl_seconds(), 300); + assert!(chat_req.nvext.is_none()); } #[test] fn test_system_block_array_with_cache_control() { - use crate::protocols::openai::nvext::CacheControlType; - let json = r#"{ "model": "test", "max_tokens": 100, @@ -1612,11 +1553,7 @@ mod tests { assert!(system.cache_control.is_some()); let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap(); - let nvext = chat_req - .nvext - .expect("nvext should be set from system cache_control"); - let cc = nvext.cache_control.expect("cache_control should be set"); - assert_eq!(cc.control_type, CacheControlType::Ephemeral); + assert!(chat_req.nvext.is_none()); } #[test] diff --git a/lib/llm/src/protocols/common/preprocessor.rs b/lib/llm/src/protocols/common/preprocessor.rs index fb4c9f2df9e..292205a186d 100644 --- a/lib/llm/src/protocols/common/preprocessor.rs +++ b/lib/llm/src/protocols/common/preprocessor.rs @@ -58,10 +58,6 @@ pub struct RoutingHints { #[serde(default, skip_serializing_if = "Option::is_none")] pub priority: Option, - /// TTL in seconds for cache control pinning. None = no pinning. - #[serde(default, skip_serializing_if = "Option::is_none")] - pub cache_control_ttl: Option, - /// Worker IDs provided externally and not discovered by the router. /// When set, only workers in this set are considered during scoring. #[serde(default, skip_serializing_if = "Option::is_none")] diff --git a/lib/llm/src/protocols/openai/chat_completions.rs b/lib/llm/src/protocols/openai/chat_completions.rs index 461334a5d05..f927ef1f52a 100644 --- a/lib/llm/src/protocols/openai/chat_completions.rs +++ b/lib/llm/src/protocols/openai/chat_completions.rs @@ -95,10 +95,6 @@ impl NvExtProvider for NvCreateChatCompletionRequest { fn raw_prompt(&self) -> Option { None } - - fn effective_cache_control(&self) -> Option<&crate::protocols::openai::nvext::CacheControl> { - NvExtProvider::nvext(self).and_then(|ext| ext.cache_control.as_ref()) - } } /// Implements `AnnotationsProvider` for `NvCreateChatCompletionRequest`, diff --git a/lib/llm/src/protocols/openai/nvext.rs b/lib/llm/src/protocols/openai/nvext.rs index 0d4e146b32a..0d50a63ed55 100644 --- a/lib/llm/src/protocols/openai/nvext.rs +++ b/lib/llm/src/protocols/openai/nvext.rs @@ -49,13 +49,6 @@ pub fn apply_header_routing_overrides(nvext: Option, headers: &HeaderMap) pub trait NvExtProvider { fn nvext(&self) -> Option<&NvExt>; fn raw_prompt(&self) -> Option; - - /// Return the effective cache control for this request. - /// Default: delegates to `nvext.cache_control`. Implementations may override - /// to also check a top-level `cache_control` field (see `NvCreateChatCompletionRequest`). - fn effective_cache_control(&self) -> Option<&CacheControl> { - self.nvext().and_then(|ext| ext.cache_control.as_ref()) - } } /// Worker ID information for disaggregated serving @@ -169,12 +162,6 @@ pub struct NvExt { #[serde(default, skip_serializing_if = "Option::is_none")] pub agent_hints: Option, - /// Cache control hint (Anthropic-style). When present, the router pins - /// the prefix on the selected worker with the given TTL. - #[builder(default, setter(strip_option))] - #[serde(default, skip_serializing_if = "Option::is_none")] - pub cache_control: Option, - /// Optional request timestamp in milliseconds for trace replay / virtual-time simulation. #[builder(default, setter(strip_option))] #[serde(default, skip_serializing_if = "Option::is_none")] @@ -214,10 +201,6 @@ pub struct AgentHints { pub latency_sensitivity: Option, } -// Re-export CacheControl types from dynamo-async-openai where they are canonically defined -// alongside the Anthropic protocol types they originate from. -pub use dynamo_protocols::types::anthropic::{CacheControl, CacheControlType}; - impl Default for NvExt { fn default() -> Self { NvExt::builder().build().unwrap() @@ -265,74 +248,7 @@ mod tests { assert_eq!(nv_ext.prefill_worker_id, None); assert_eq!(nv_ext.decode_worker_id, None); assert_eq!(nv_ext.agent_hints, None); - assert_eq!(nv_ext.cache_control, None); - } - - // Test CacheControl serde roundtrip and TTL parsing - #[test] - fn test_cache_control_serde_and_ttl() { - // Default (ephemeral, no TTL) - let cc = CacheControl::default(); - assert_eq!(cc.control_type, CacheControlType::Ephemeral); - assert_eq!(cc.ttl, None); - assert_eq!(cc.ttl_seconds(), 300); - - // Shorthand values - let cc_5m = CacheControl { - control_type: CacheControlType::Ephemeral, - ttl: Some("5m".to_string()), - }; - assert_eq!(cc_5m.ttl_seconds(), 300); - - let cc_1h = CacheControl { - control_type: CacheControlType::Ephemeral, - ttl: Some("1h".to_string()), - }; - assert_eq!(cc_1h.ttl_seconds(), 3600); - - // Integer seconds -- within range - let cc_600 = CacheControl { - control_type: CacheControlType::Ephemeral, - ttl: Some("600".to_string()), - }; - assert_eq!(cc_600.ttl_seconds(), 600); - - // Integer seconds -- clamped to min (300) - let cc_low = CacheControl { - control_type: CacheControlType::Ephemeral, - ttl: Some("10".to_string()), - }; - assert_eq!(cc_low.ttl_seconds(), 300); - - // Integer seconds -- clamped to max (3600) - let cc_high = CacheControl { - control_type: CacheControlType::Ephemeral, - ttl: Some("7200".to_string()), - }; - assert_eq!(cc_high.ttl_seconds(), 3600); - - // Unrecognized string defaults to 300 - let cc_bad = CacheControl { - control_type: CacheControlType::Ephemeral, - ttl: Some("forever".to_string()), - }; - assert_eq!(cc_bad.ttl_seconds(), 300); - - // Serde roundtrip - let json = serde_json::to_string(&cc_5m).unwrap(); - let deser: CacheControl = serde_json::from_str(&json).unwrap(); - assert_eq!(deser, cc_5m); - - // Deserialize from API-style JSON - let api_json = r#"{"type": "ephemeral", "ttl": "1h"}"#; - let from_api: CacheControl = serde_json::from_str(api_json).unwrap(); - assert_eq!(from_api.ttl_seconds(), 3600); - - // NvExt with cache_control - let nvext_json = r#"{"cache_control": {"type": "ephemeral", "ttl": "5m"}}"#; - let nvext: NvExt = serde_json::from_str(nvext_json).unwrap(); - assert!(nvext.cache_control.is_some()); - assert_eq!(nvext.cache_control.unwrap().ttl_seconds(), 300); + assert_eq!(nv_ext.request_timestamp_ms, None); } // Test valid builder configurations diff --git a/lib/llm/src/protocols/unified.rs b/lib/llm/src/protocols/unified.rs index 444f387e79a..e2f8b97355e 100644 --- a/lib/llm/src/protocols/unified.rs +++ b/lib/llm/src/protocols/unified.rs @@ -33,6 +33,7 @@ use std::collections::HashMap; +use dynamo_protocols::types::anthropic::CacheControl; use dynamo_runtime::protocols::annotated::AnnotationsProvider; use serde::{Deserialize, Serialize}; @@ -41,7 +42,7 @@ use crate::preprocessor::prompt::{OAIChatLikeRequest, TextInput}; use crate::protocols::openai::chat_completions::NvCreateChatCompletionRequest; use crate::protocols::openai::common_ext::{CommonExt, CommonExtProvider}; -use crate::protocols::openai::nvext::{CacheControl, NvExt, NvExtProvider}; +use crate::protocols::openai::nvext::{NvExt, NvExtProvider}; use crate::protocols::openai::{ OpenAIOutputOptionsProvider, OpenAISamplingOptionsProvider, OpenAIStopConditionsProvider, }; @@ -77,12 +78,8 @@ pub struct AnthropicContext { pub thinking: Option, /// Per-block cache control breakpoints with their position in the - /// message array. The existing Anthropic→Chat Completions conversion - /// collapses all per-block `cache_control` annotations into a single - /// last-one-wins `nvext.cache_control` field. This preserves the full - /// per-block granularity for future use (e.g., multi-breakpoint prefix - /// caching, or faithfully reporting per-breakpoint `cache_creation_input_tokens` - /// / `cache_read_input_tokens` in the response). + /// message array. These remain available in the API sidecar even when + /// the request conversion does not forward cache control into `nvext`. #[serde(default, skip_serializing_if = "Vec::is_empty")] pub cache_breakpoints: Vec, @@ -287,15 +284,6 @@ impl NvExtProvider for UnifiedRequest { fn raw_prompt(&self) -> Option { None } - - /// Returns the single collapsed cache control from `nvext`. This is the - /// last-one-wins value produced by the Anthropic→Chat Completions conversion - /// and is sufficient for backends that support a single prefix cache boundary - /// (SGLang, vLLM). For per-block granularity, consult - /// `AnthropicContext::cache_breakpoints` via the `ApiContext` sidecar. - fn effective_cache_control(&self) -> Option<&CacheControl> { - NvExtProvider::nvext(self).and_then(|ext| ext.cache_control.as_ref()) - } } impl AnnotationsProvider for UnifiedRequest {