Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e94e482
feat: add sticky session control for sglang sessions
ishandhanani Mar 27, 2026
f7bb91d
refactor(sglang): make agent controller session-only
ishandhanani Apr 1, 2026
429a47f
fix(router): registration and session control client fixes
ishandhanani Apr 2, 2026
f3e4aaa
fix: address review comments on session control PR
ishandhanani Apr 2, 2026
834fb44
docs: update agent docs for session control
ishandhanani Apr 2, 2026
d4f13c2
feat: add agg_agent.sh launch script and quickstart docs
ishandhanani Apr 2, 2026
291d4a4
docs: fix OpenCode launch command
ishandhanani Apr 2, 2026
e0173a3
docs: fix OpenCode fork URL
ishandhanani Apr 2, 2026
c96a519
docs: fix OpenCode fork URL to anomalyco/opencode
ishandhanani Apr 2, 2026
83cbff4
revert: restore agg_router.sh to match main
ishandhanani Apr 2, 2026
54d70a6
docs: remove python example and priority eviction section from agent …
ishandhanani Apr 2, 2026
68ab940
docs: expand OpenCode quickstart with provider config and subagent li…
ishandhanani Apr 2, 2026
0c88d13
docs: reference SGLang streaming session PR requirement
ishandhanani Apr 2, 2026
4291cce
refactor: simplify session control, remove dead cache_control code
ishandhanani Apr 2, 2026
59d1185
lint
ishandhanani Apr 2, 2026
665670e
lint
ishandhanani Apr 2, 2026
96caee8
fix: session close leak on drop and affinity bind on failed open
ishandhanani Apr 2, 2026
19ed3f5
go
ishandhanani Apr 2, 2026
aa3a6fd
fix: gate session_control endpoint on --enable-streaming-session
ishandhanani Apr 2, 2026
fd26501
Merge branch 'main' into idhanani/dyn-ephemeral-kv-sessions
ishandhanani Apr 2, 2026
7ed03b3
fix: use getattr for enable_streaming_session (upstream compat)
ishandhanani Apr 2, 2026
cd1da9f
Merge branch 'main' into idhanani/dyn-ephemeral-kv-sessions
ishandhanani Apr 10, 2026
55c9a94
fix(kv-router): correct drop ordering for deferred session close
ishandhanani Apr 10, 2026
31175a7
fix(kv-router): skip sticky resolution for non-session requests
ishandhanani Apr 10, 2026
0dddad4
Merge branch 'main' into idhanani/dyn-ephemeral-kv-sessions
ishandhanani Apr 13, 2026
11f9a2d
refactor(kv-router): make session control request-driven instead of s…
ishandhanani Apr 13, 2026
4d9ac52
fix(kv-router): graceful degradation when streaming sessions unavailable
ishandhanani Apr 13, 2026
77fbf83
Merge remote-tracking branch 'origin/main' into idhanani/dyn-ephemera…
ishandhanani Apr 13, 2026
ff8636d
test(sglang): improve streaming session smoke test
ishandhanani Apr 13, 2026
facdd23
go
ishandhanani Apr 13, 2026
8cb46c1
fix(clippy): collapse nested if in RequestGuard drop
ishandhanani Apr 13, 2026
a340913
style: cargo fmt
ishandhanani Apr 13, 2026
647c110
docs: note SGLang version requirement for streaming sessions
ishandhanani Apr 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions components/src/dynamo/frontend/sglang_prepost.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,22 @@ def create_parsers(
return tool_call_parser, reasoning_parser


def _normalize_prompt_token_ids(prompt_token_ids: Any) -> list[int]:
if isinstance(prompt_token_ids, list):
return prompt_token_ids

input_ids = getattr(prompt_token_ids, "input_ids", None)
if input_ids is not None and not isinstance(input_ids, str):
return list(input_ids)

if isinstance(prompt_token_ids, dict):
dict_input_ids = prompt_token_ids.get("input_ids")
if dict_input_ids is not None and not isinstance(dict_input_ids, str):
return list(dict_input_ids)

return list(prompt_token_ids)
Comment thread
ishandhanani marked this conversation as resolved.


def preprocess_chat_request(
request: dict[str, Any],
*,
Expand Down Expand Up @@ -124,9 +140,9 @@ def preprocess_chat_request(
):
template_kwargs["tools"] = [t.model_dump() for t in sglang_tools]

prompt_token_ids = tokenizer.apply_chat_template(messages, **template_kwargs)
if not isinstance(prompt_token_ids, list):
prompt_token_ids = list(prompt_token_ids)
prompt_token_ids = _normalize_prompt_token_ids(
tokenizer.apply_chat_template(messages, **template_kwargs)
)

tool_call_parser, reasoning_parser = create_parsers(
request,
Expand Down
21 changes: 19 additions & 2 deletions components/src/dynamo/frontend/sglang_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@
logger = logging.getLogger(__name__)


def _runtime_config_parser_name(
mdc: ModelDeploymentCard,
key: str,
) -> str | None:
runtime_config = mdc.runtime_config()
if not isinstance(runtime_config, dict):
return None
value = runtime_config.get(key)
return value if isinstance(value, str) and value else None


def _unsupported_n_error(n: int) -> dict[str, Any]:
return {
"error": {
Expand Down Expand Up @@ -553,8 +564,14 @@ async def chat_engine_factory(

eos_token_id = getattr(tokenizer, "eos_token_id", None)

tool_call_parser_name = self.tool_call_parser_name
reasoning_parser_name = self.reasoning_parser_name
tool_call_parser_name = (
self.tool_call_parser_name
or _runtime_config_parser_name(mdc, "tool_call_parser")
)
reasoning_parser_name = (
self.reasoning_parser_name
or _runtime_config_parser_name(mdc, "reasoning_parser")
)

if tool_call_parser_name:
logger.info("SGLang tool call parser: %s", tool_call_parser_name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from dynamo.frontend.sglang_prepost import (
SglangPreprocessResult,
SglangStreamingPostProcessor,
_normalize_prompt_token_ids,
convert_tools,
create_parsers,
preprocess_chat_request,
Expand All @@ -26,6 +27,7 @@
_build_dynamo_preproc,
_init_worker,
_map_finish_reason,
_runtime_config_parser_name,
)
from dynamo.frontend.utils import PreprocessError, random_call_id, random_uuid

Expand Down Expand Up @@ -436,6 +438,46 @@ def test_both_parsers(self):
assert rp is not None


class TestNormalizePromptTokenIds:
def test_batch_encoding_like_object_uses_input_ids(self):
class FakeBatchEncoding:
def __init__(self):
self.input_ids = [11, 22, 33]

def __iter__(self):
yield from ("input_ids", "attention_mask")

assert _normalize_prompt_token_ids(FakeBatchEncoding()) == [11, 22, 33]
Comment thread
ishandhanani marked this conversation as resolved.

def test_mapping_uses_input_ids(self):
assert _normalize_prompt_token_ids(
{"input_ids": [1, 2, 3], "attention_mask": [1, 1, 1]}
) == [1, 2, 3]


class TestRuntimeConfigParserName:
def test_missing_runtime_config_returns_none(self):
class FakeMdc:
def runtime_config(self):
return None

assert _runtime_config_parser_name(FakeMdc(), "tool_call_parser") is None

def test_missing_key_returns_none(self):
class FakeMdc:
def runtime_config(self):
return {"reasoning_parser": "qwen3"}

assert _runtime_config_parser_name(FakeMdc(), "tool_call_parser") is None

def test_reads_non_empty_string_value(self):
class FakeMdc:
def runtime_config(self):
return {"tool_call_parser": "hermes"}

assert _runtime_config_parser_name(FakeMdc(), "tool_call_parser") == "hermes"


# ---------------------------------------------------------------------------
# preprocess_chat_request
# ---------------------------------------------------------------------------
Expand Down
8 changes: 8 additions & 0 deletions components/src/dynamo/sglang/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,14 @@ text-to-video-diffusion.sh # 1-2 GPUs - Text-to-video (Wan2.1)
Always slice with an offset, don't assume per-chunk logprobs.
- **Zombie GPU processes**: `sgl_diffusion::scheduler` spawns a child process that
survives parent kill. Always check `nvidia-smi` after teardown.
- **Session control graceful degradation**: Session control is request-driven --
the router's `AgentController` and `StickySessionRouter` are always created but
activate lazily. If no worker has `--enable-streaming-session`, the router warns
once and ignores `session_control` in requests. On the handler side,
`_session_kwargs()` checks `enable_streaming_session` before injecting
`session_params` into SGLang calls. Both layers must agree: the router skips
lifecycle RPCs, and the handler skips session params. Without both guards,
SGLang errors with "session id does not exist".

For troubleshooting (CuDNN, config.json errors, OOM, disagg connectivity), see
`docs/backends/sglang/sglang-examples.md#troubleshooting`.
Expand Down
16 changes: 14 additions & 2 deletions components/src/dynamo/sglang/init_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,15 @@ async def init_decode(
"The chat template will be loaded but the /v1/chat/completions endpoint will not be available."
)

# Only serve session_control when streaming sessions are enabled.
if getattr(server_args, "enable_streaming_session", False):
session_control_endpoint = runtime.endpoint(
f"{dynamo_args.namespace}.{dynamo_args.component}.session_control"
)
shutdown_endpoints.append(session_control_endpoint)

try:
await asyncio.gather(
gather_tasks = [
generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
Expand All @@ -133,7 +140,12 @@ async def init_decode(
output_type=parse_endpoint_types(dynamo_args.endpoint_types),
readiness_gate=ready_event,
),
)
]
if getattr(server_args, "enable_streaming_session", False):
gather_tasks.append(
session_control_endpoint.serve_endpoint(handler.session_control)
)
await asyncio.gather(*gather_tasks)
except Exception as e:
logging.error(f"Failed to serve endpoints: {e}")
raise
Expand Down
1 change: 0 additions & 1 deletion components/src/dynamo/sglang/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ def _get_bootstrap_info_for_config(
f"Using auto-detected local IP: {local_ip} "
f"({'IPv6' if local_addr.is_ipv6 else 'IPv4'})"
)

return bootstrap_host, bootstrap_port
except Exception as e:
logging.warning(f"Failed to get bootstrap info: {e}")
Expand Down
87 changes: 87 additions & 0 deletions components/src/dynamo/sglang/request_handlers/handler_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,78 @@ async def update_weight_version(self, body: dict) -> dict:
"new_version": req.new_version,
}

async def open_session(self, body: dict) -> dict:
"""Open a streaming session for subagent KV isolation.

Args:
body: Dict with "session_id", optional "timeout" (default 120),
and optional "capacity_of_str_len" (default 65536).
"""
from sglang.srt.managers.io_struct import OpenSessionReqInput

session_id = body.get("session_id")
if not session_id:
return {"status": "error", "message": "session_id required"}
timeout = body.get("timeout", 120)
capacity = body.get("capacity_of_str_len", 65536)
try:
obj = OpenSessionReqInput(
capacity_of_str_len=capacity,
session_id=session_id,
streaming=True,
timeout=float(timeout),
)
result = await self.engine.tokenizer_manager.open_session(obj, None)
if result is None:
return {
"status": "ok",
"session_id": session_id,
"message": "Session already exists",
}
return {"status": "ok", "session_id": result}
except Exception as e:
logging.error(f"Failed to open session {session_id}: {e}")
return {"status": "error", "message": str(e)}

async def close_session(self, body: dict) -> dict:
"""Close a streaming session and release its KV resources.

Args:
body: Dict with "session_id".
"""
from sglang.srt.managers.io_struct import CloseSessionReqInput

session_id = body.get("session_id")
if not session_id:
return {"status": "error", "message": "session_id required"}
try:
obj = CloseSessionReqInput(session_id=session_id)
await self.engine.tokenizer_manager.close_session(obj, None)
return {"status": "ok", "session_id": session_id}
except Exception as e:
logging.error(f"Failed to close session {session_id}: {e}")
return {"status": "error", "message": str(e)}

async def session_control(self, request, context=None):
"""Service mesh endpoint for session lifecycle operations.

Args:
request: Dict with "action" key ("open_session" or "close_session")
and action-specific parameters.
context: Optional Dynamo context (unused but required by protocol).

Yields:
Single dict with operation result.
"""
action = request.get("action")
if action == "open_session":
result = await self.open_session(request)
elif action == "close_session":
result = await self.close_session(request)
else:
result = {"status": "error", "message": f"Unknown action: {action}"}
yield result

def register_engine_routes(self, runtime: DistributedRuntime) -> None:
"""Register all engine routes for this handler.

Expand Down Expand Up @@ -511,6 +583,9 @@ def register_engine_routes(self, runtime: DistributedRuntime) -> None:
self.config.dynamo_args, "enable_rl", False
):
self.register_rl_engine_routes(runtime)
# session_control is served as a discoverable service endpoint
# (not an engine route) so the router can find it via
# component.endpoint("session_control"). See init_llm.py.

@abstractmethod
def generate(self, request: RequestT, context: Context) -> AsyncIterator[ResponseT]:
Expand Down Expand Up @@ -539,6 +614,18 @@ def _get_input_param(self, request: Dict[str, Any]) -> Dict[str, Any]:
"prompt" if isinstance(request_input, str) else "input_ids": request_input
}

def _session_kwargs(self, request: Dict[str, Any]) -> Dict[str, Any]:
if not getattr(self.config.server_args, "enable_streaming_session", False):
return {}
routing = request.get("routing") or {}
session_control = routing.get("session_control") or {}
session_id = session_control.get("session_id")
if not session_id:
return {}

# Streaming sessions only need the session identifier on each turn.
return {"session_params": {"id": session_id}}

@staticmethod
def _get_guided_decoding_params(
guided_decoding: Optional[Dict[str, Any]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ async def generate(
external_trace_header=trace_header,
rid=trace_id,
data_parallel_rank=dp_rank,
**self._session_kwargs(request),
**logprob_kwargs,
**self._priority_kwargs(priority),
)
Expand Down Expand Up @@ -338,6 +339,7 @@ async def generate(
external_trace_header=trace_header,
rid=trace_id,
data_parallel_rank=dp_rank,
**self._session_kwargs(request),
**logprob_kwargs,
**self._priority_kwargs(priority),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ async def generate(
external_trace_header=trace_header,
rid=trace_id,
data_parallel_rank=dp_rank,
**self._session_kwargs(inner_request),
**self._priority_kwargs(priority),
)

Expand Down
Loading
Loading