Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions components/src/dynamo/sglang/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,12 @@ async def parse_args(args: list[str]) -> Config:
# contain code to download a model, it should only parse the args.
server_args = ServerArgs.from_cli_args(parsed_args)

# Dynamo's streaming handlers expect disjoint output_ids from SGLang (only new
# tokens since last output), not cumulative tokens. When stream_output=True,
# SGLang sends disjoint segments which Dynamo passes through directly.
# Force stream_output=True so SGLang emits the disjoint segments Dynamo's handlers require.
server_args.stream_output = True

if parsed_args.use_sglang_tokenizer:
logging.info(
"Using SGLang's built in tokenizer. Setting skip_tokenizer_init to False"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,16 @@ async def _process_token_stream(
) -> AsyncGenerator[Dict[str, Any], None]:
"""Process token-based stream output.

With stream_output=True (enforced by Dynamo), SGLang sends disjoint segments
containing only new tokens since the last output. We pass these through directly.

Args:
stream_source: Async generator from engine.async_generate.
context: Context object for cancellation handling.

Yields:
Dict with token_ids and optional finish_reason.
"""
num_output_tokens_so_far = 0

# Use Future pattern for request ID - will be set when first response arrives
request_id_future = asyncio.Future()
async with self._cancellation_monitor(request_id_future, context):
Expand All @@ -196,16 +197,16 @@ async def _process_token_stream(
if finish_reason:
out["finish_reason"] = finish_reason["type"]

# With stream_output=True, output_ids contains only new tokens (disjoint)
output_ids = res.get("output_ids", [])
# If the request is not finished but there are no outputs, return an error.
if not output_ids and not finish_reason:
if not context.is_stopped():
yield {"finish_reason": "error", "token_ids": []}
break

next_total_toks = len(output_ids)
out["token_ids"] = output_ids[num_output_tokens_so_far:]
num_output_tokens_so_far = next_total_toks
# Pass through disjoint token segments directly
out["token_ids"] = output_ids
if finish_reason:
input_tokens = res["meta_info"]["prompt_tokens"]
completion_tokens = res["meta_info"]["completion_tokens"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,30 +133,26 @@ class StreamProcessor:

@staticmethod
async def process_sglang_stream(stream_source) -> AsyncIterator[str]:
"""Process SGLang stream output following backend pattern"""
num_output_tokens_so_far = 0
"""Process SGLang stream output.

With stream_output=True (enforced by Dynamo), SGLang sends disjoint segments
containing only new tokens since the last output. We pass these through directly.
"""
try:
async for res in stream_source:
try:
next_total_toks = len(res["output_ids"])

# Return incremental tokens
# With stream_output=True, output_ids contains only new tokens (disjoint)
output = {
"token_ids": res["output_ids"][num_output_tokens_so_far:],
"token_ids": res["output_ids"],
"text": res.get("text", ""),
"finished": False,
}
num_output_tokens_so_far = next_total_toks

# Check for finish reason
finish_reason = res.get("meta_info", {}).get("finish_reason")
if finish_reason:
output.update(
{
"token_ids": res["output_ids"][
num_output_tokens_so_far:
],
"finish_reason": finish_reason.get("type", "stop"),
"finished": True,
}
Expand Down
Loading