feat: enable ephemeral kv cache sessions via sglang #7384
Walkthrough
The changes introduce streaming KV session lifecycle management across the request pipeline. New protocol types define session control actions, runtime infrastructure provides client connectivity and session state management, and request handlers expose endpoints for opening and closing sessions.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~28 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@lib/llm/src/kv_router/push_router.rs`:
- Around line 520-539: The current session-close logic in the async block that
builds session_close_state (symbols: session_close_state, SessionCloseState,
session_control, SessionAction::Close, instance_id) can schedule Close against
whatever instance_id was selected earlier and may hit the wrong worker; change
the code to verify worker affinity before creating sc_client: if the incoming
request already includes backend_instance_id or a phase-specific pinned worker,
allow the close; otherwise perform a session-to-worker lookup (or consult
session metadata) to resolve the correct worker for sc.session_id and compare it
with instance_id, and if they differ return None / early-fail so the close is
not sent to a wrong worker; use the same component/client creation flow
(chooser.client().endpoint.component(),
session_control_cell.get_or_try_init(...), create_session_control_client) only
after confirming affinity matches, and ensure SessionCloseState contains the
resolved instance id and sc_client for the pinned worker.
- Around line 501-517: The branch that handles session_control ==
Some(SessionAction::Open) must fail fast instead of logging and continuing when
the router flag is off (self.session_control_cell is None) or when
create_session_control_client(...) fails; update the session open handling in
push_router.rs so that if sc.action == SessionAction::Open and either
self.session_control_cell.is_none() or cell.get_or_try_init(...).await returns
Err, the function returns an Err (propagate an appropriate error) rather than
falling through; keep use of create_session_control_client,
session_control_cell, SessionAction::Open, and spawn_open_session, but change
the error path to return an error immediately with a clear message indicating
session open failed.
- Around line 168-170: The drop path must mirror finish() by sending the
explicit close before freeing scheduler state: in RequestGuard::drop() check
self.session_close_state and, if present, call
spawn_close_session(&state.sc_client, &state.session_id, state.instance_id,
&self.context_id) (or otherwise invoke the same close routine used by finish()),
but guard against double-closing by atomically taking or marking the
session_close_state as consumed (e.g., swap Option to None or use an AtomicBool)
so finish() and Drop cannot both send the close; ensure existing scheduler
cleanup still runs after the close call.
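The double-close guard suggested in the last item can be sketched with `Option::take`. This is a minimal, std-only illustration, not the PR's code: the struct fields, the `close_once` helper, and the `sent` log are hypothetical stand-ins, and the real `spawn_close_session` call is replaced by a push onto a vector.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the PR's SessionCloseState; fields are illustrative.
struct SessionCloseState {
    session_id: String,
    instance_id: u64,
}

struct RequestGuard {
    session_close_state: Option<SessionCloseState>,
    // Records every close that was sent, so the behaviour can be checked.
    sent: Arc<Mutex<Vec<String>>>,
}

impl RequestGuard {
    // Sends the close at most once: `take()` leaves `None` behind, so a
    // later call from `finish` or `Drop` finds nothing left to send.
    fn close_once(&mut self) {
        if let Some(state) = self.session_close_state.take() {
            // Stand-in for spawn_close_session(...) in the real code.
            self.sent
                .lock()
                .unwrap()
                .push(format!("{}@{}", state.session_id, state.instance_id));
        }
        // Scheduler cleanup would run here, after the close has been issued.
    }

    fn finish(&mut self) {
        self.close_once();
    }
}

impl Drop for RequestGuard {
    fn drop(&mut self) {
        self.close_once();
    }
}

// Builds a guard, optionally calls `finish`, drops it, and returns the closes sent.
fn closes_sent(call_finish: bool) -> Vec<String> {
    let sent = Arc::new(Mutex::new(Vec::new()));
    {
        let mut guard = RequestGuard {
            session_close_state: Some(SessionCloseState {
                session_id: "sub-1".to_string(),
                instance_id: 42,
            }),
            sent: Arc::clone(&sent),
        };
        if call_finish {
            guard.finish();
        }
    } // guard is dropped here
    let out = sent.lock().unwrap().clone();
    out
}
```

Calling `closes_sent(true)` and `closes_sent(false)` both yield exactly one close, which is the property the review comment asks for: the drop path mirrors `finish()` without sending the close twice.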
In `@lib/llm/src/preprocessor.rs`:
- Around line 251-267: preprocess_request() currently forwards
nvext().session_params into preprocessed.extra_args but the
NvCreateCompletionRequest path builds common_request directly and drops
session_params; update the NvCreateCompletionRequest handling to reuse the same
propagation logic: after creating common_request (or the result of
builder.build()/preprocessed), detect request.nvext().and_then(|ext|
ext.session_params.clone()) and insert it into common_request.extra_args as a
JSON object field "session_params" (matching the existing pattern that checks
serde_json::Value::Object and calls map.insert), or alternatively explicitly
reject nvext.session_params for completions; modify the code around where
common_request is constructed in the NvCreateCompletionRequest flow to apply
this same insertion using the same symbols (session_params, extra_args,
common_request/NvCreateCompletionRequest handling).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: ed29e2d0-290d-473e-a148-ce42108bb358
📒 Files selected for processing (8)
- components/src/dynamo/sglang/request_handlers/handler_base.py
- components/src/dynamo/sglang/request_handlers/llm/decode_handler.py
- lib/llm/src/kv_router.rs
- lib/llm/src/kv_router/push_router.rs
- lib/llm/src/kv_router/session_control.rs
- lib/llm/src/preprocessor.rs
- lib/llm/src/protocols/common/preprocessor.rs
- lib/llm/src/protocols/openai/nvext.rs
0ad0e98 to fd0c25c
fd0c25c to 1504f07
1504f07 to 93bd5cb
93bd5cb to 03b74cc
03b74cc to 80b5d67
- Add StickySessionRouter with trait-based AffinityStore for session affinity (in-memory default, pluggable for multi-router deployments)
- Add AgentController for session lifecycle RPCs (open/close) with synchronous open to ensure session exists before first request
- Replace fire-and-forget pin_prefix RPC with inline retention_seconds injection, enabling SGLang priority-based eviction with time decay
- Register session_control as a discoverable service endpoint
- Forward session_params through preprocessor to backend
- Add SessionControl, SessionParams, SessionAction types to nvext
- Update SGLang imports to use sglang.srt.utils.network
- Update docs: cache pinning -> cache retention
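The trait-based affinity store in the first bullet, with the sliding-window TTL the PR summary describes, might look roughly like the following. This is a sketch under stated assumptions, not the PR's implementation: the trait methods, `InMemoryStore`, and `StickyRouter` are hypothetical names; the store is single-threaded and expires entries lazily instead of using a `DashMap` with a background reaper; and `now` is passed explicitly so the behaviour is deterministic.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Hypothetical trait shape; the PR's actual AffinityStore may differ.
trait AffinityStore {
    fn get(&mut self, session_id: &str, now: Instant) -> Option<u64>;
    fn put(&mut self, session_id: &str, worker_id: u64, ttl: Duration, now: Instant);
    fn remove(&mut self, session_id: &str);
}

struct Entry {
    worker_id: u64,
    ttl: Duration,
    expires_at: Instant,
}

// Single-threaded stand-in for the in-memory default.
#[derive(Default)]
struct InMemoryStore {
    entries: HashMap<String, Entry>,
}

impl AffinityStore for InMemoryStore {
    fn get(&mut self, session_id: &str, now: Instant) -> Option<u64> {
        if let Some(entry) = self.entries.get_mut(session_id) {
            if entry.expires_at > now {
                // Sliding window: every hit pushes the expiry forward.
                entry.expires_at = now + entry.ttl;
                return Some(entry.worker_id);
            }
        }
        // Missing or expired: drop any stale entry lazily (no reaper here).
        self.entries.remove(session_id);
        None
    }

    fn put(&mut self, session_id: &str, worker_id: u64, ttl: Duration, now: Instant) {
        let entry = Entry { worker_id, ttl, expires_at: now + ttl };
        self.entries.insert(session_id.to_string(), entry);
    }

    fn remove(&mut self, session_id: &str) {
        self.entries.remove(session_id);
    }
}

// The router only delegates to the store, so a shared backend could be
// substituted by implementing the same trait.
struct StickyRouter<S: AffinityStore> {
    store: S,
}

impl<S: AffinityStore> StickyRouter<S> {
    fn resolve(&mut self, session_id: &str, now: Instant) -> Option<u64> {
        self.store.get(session_id, now)
    }

    fn bind(&mut self, session_id: &str, worker_id: u64, ttl: Duration, now: Instant) {
        self.store.put(session_id, worker_id, ttl, now);
    }

    fn unbind(&mut self, session_id: &str) {
        self.store.remove(session_id);
    }
}
```

With a 300-second TTL bound at time `t0`, a resolve at `t0 + 200s` extends the expiry to `t0 + 500s`, so a second resolve at `t0 + 400s` still hits. Keeping the store behind a trait is what lets a multi-router deployment share affinity without changing the router.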
80b5d67 to 0b8d705
Superseded by the split PRs:
Closing this combined PR so review stays on the smaller scoped branches.
Summary
- Add `StickySessionRouter` with a trait-based `AffinityStore` (in-memory default, pluggable for Redis/etcd in multi-router deployments)
- Narrow `AgentController` to session lifecycle RPCs only (open/close) -- removed cache_control client, PinAction, and the affinity DashMap
- Replace the `pin_prefix` RPC with inline `retention_seconds` injection -- cache control TTL now flows as a field on the generate request, enabling SGLang's priority-based eviction with time decay
- Remove `pin_prefix`/`cache_control` Python handler code

Data flow
sequenceDiagram
    participant Client
    participant Preprocessor
    participant StickyRouter as StickySessionRouter
    participant KVRouter as KV Router
    participant AgentCtrl as AgentController
    participant Worker as SGLang Worker
    participant Cache as Radix Cache

    Client->>Preprocessor: nvext.cache_control{ttl: "5m"}<br/>nvext.agent_hints{priority: 50}<br/>nvext.session_params{id: "sub-1", rid}
    Preprocessor->>Preprocessor: Extract routing hints<br/>cache_control_ttl=300<br/>priority=50
    Preprocessor->>StickyRouter: resolve(session_params.id="sub-1")
    StickyRouter-->>KVRouter: worker_42 (from affinity table)
    Note over StickyRouter: Refreshes TTL on hit<br/>(sliding window)
    KVRouter->>KVRouter: select_worker() -> worker_42<br/>(pinned by sticky affinity)
    KVRouter->>AgentCtrl: on_routed(request, worker_42)
    Note over AgentCtrl: Open: fire open_session RPC +<br/>sticky.bind("sub-1", 42, ttl)<br/>Close: sticky.unbind + defer close
    KVRouter->>KVRouter: Inject extra_args.retention_seconds=300<br/>from cache_control_ttl
    KVRouter->>Worker: async_generate(<br/> retention_seconds=300,<br/> priority=50,<br/> session_params={id, rid})
    Worker->>Cache: Insert with priority=50,<br/>retention_duration=300s
    Note over Cache: Survives over priority=0 blocks<br/>Decays to 0 after 5min idle
    Worker-->>Client: Stream response tokens
    Note over KVRouter,Worker: On stream end (RequestGuard)
    KVRouter-)Worker: close_session("sub-1")<br/>[fire-and-forget, if deferred]

Sticky session routing
The `StickySessionRouter` is a pure routing-layer abstraction -- no event plane, no I/O. It maintains a `session_id -> worker_id` mapping with sliding-window TTL (refreshed on every resolve).

The default `InMemoryAffinityStore` uses a `DashMap` with a background reaper. The trait is designed so that Redis/etcd/NATS KV backends can be swapped in for multi-router deployments where affinity needs to be shared across router instances.

Key changes
| File | Change |
| --- | --- |
| `lib/llm/src/kv_router/sticky_sessions.rs` | `AffinityStore` trait, `InMemoryAffinityStore`, `StickySessionRouter`, 6 unit tests |
| `lib/llm/src/kv_router/agent_controller.rs` | |
| `lib/llm/src/kv_router/push_router.rs` | `StickySessionRouter` + `AgentController` + `retention_seconds` injection |
| `components/.../handler_base.py` | Removed `pin_prefix`, `cache_control` methods and route registration. Added `_retention_kwargs` |
| `components/.../decode_handler.py` | Pass `retention_seconds` from `extra_args` to `async_generate()` |
| `docs/backends/sglang/agents.md` | |

Test plan
- `cargo test -p dynamo-llm --lib` -- 768 passed (6 new sticky_sessions tests)
- `ruff check components/` -- clean
- With `--enable-cache-control --radix-eviction-policy priority`, send multi-turn requests with `session_params`, verify sticky routing in logs
- With `cache_control: {type: "ephemeral", ttl: "5m"}`, verify `retention_seconds=300` in SGLang generate call
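
The last check relies on a `ttl` of `"5m"` arriving at the worker as `retention_seconds=300`. A minimal sketch of that conversion follows; `ttl_to_seconds` is a hypothetical helper rather than a function from this PR, and the accepted suffixes (`s`, `m`, `h`) are an assumption made for illustration.

```rust
// Hypothetical helper: converts a TTL string such as "5m" into whole seconds.
// Returns None for an empty string, a missing or unknown unit, a non-numeric
// count, or a value that would overflow u64.
fn ttl_to_seconds(ttl: &str) -> Option<u64> {
    let ttl = ttl.trim();
    // The last character is the unit; everything before it is the count.
    let unit = ttl.chars().last()?;
    let digits = &ttl[..ttl.len() - unit.len_utf8()];
    let multiplier: u64 = match unit {
        's' => 1,
        'm' => 60,
        'h' => 3600,
        _ => return None,
    };
    digits.parse::<u64>().ok()?.checked_mul(multiplier)
}
```

Under these assumptions `ttl_to_seconds("5m")` returns `Some(300)`, matching the value the test plan expects to see in the generate call, while a bare `"5"` is rejected because it carries no unit.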