Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions components/src/dynamo/sglang/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from dynamo.common.utils.runtime import parse_endpoint
from dynamo.llm import fetch_model
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.sglang._compat import enable_disjoint_streaming_output
from dynamo.sglang.backend_args import DynamoSGLangArgGroup, DynamoSGLangConfig

configure_dynamo_logging()
Expand Down Expand Up @@ -374,11 +373,29 @@ async def parse_args(args: list[str]) -> Config:
"values are always higher priority at the API layer."
)

# Dynamo's streaming handlers expect disjoint output_ids from SGLang (only new
# tokens since last output), not cumulative tokens. Modern SGLang gates this
# behavior behind incremental_streaming_output, while older releases used
# stream_output.
enable_disjoint_streaming_output(server_args)
# Do NOT force incremental_streaming_output (or the old stream_output).
# When enabled, SGLang's tokenizer_manager must yield every streaming chunk
# individually (no coalescing), which creates backpressure under high
# concurrency and causes ~2x throughput regression in disaggregated PD
# serving. Instead, leave SGLang in its default cumulative mode and slice
# new tokens in the decode handler.
#
# Tradeoff: cumulative slicing (output_ids[prev:]) has O(n) copy cost per
# chunk that grows with output length, which hurts at very long OSLs. The
# incremental mode avoids this copy but its tokenizer_manager backpressure
# under high concurrency is far more damaging to throughput. If SGLang
# upstream fixes the backpressure issue, we should revisit re-enabling
# incremental mode.
# See: https://github.com/sgl-project/sglang/issues/22095
if getattr(server_args, "incremental_streaming_output", False) or getattr(
server_args, "stream_output", False
):
raise ValueError(
"--incremental-streaming-output (or --stream-output) is not supported "
"by Dynamo's SGLang handlers, which expect cumulative output_ids. "
"Dynamo slices new tokens from cumulative output in the decode handler. "
"Please remove the --incremental-streaming-output flag."
)

if dynamo_config.use_sglang_tokenizer:
warnings.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,10 @@ async def _process_token_stream(
) -> AsyncGenerator[Dict[str, Any], None]:
"""Process token-based stream output.

With stream_output=True (enforced by Dynamo), SGLang sends disjoint segments
containing only new tokens since the last output. We pass these through directly.
SGLang sends cumulative output_ids (all tokens generated so far). We
slice off only the new tokens each iteration to yield disjoint deltas
to the frontend. This avoids forcing SGLang's incremental streaming
mode which causes tokenizer_manager backpressure at high concurrency.

Args:
stream_source: Async generator from engine.async_generate.
Expand All @@ -211,6 +213,8 @@ async def _process_token_stream(
Yields:
Dict with token_ids and optional finish_reason.
"""
num_output_tokens_so_far = 0

# Use Future pattern for request ID - will be set when first response arrives
request_id_future: asyncio.Future[str] = asyncio.Future()
async with self._cancellation_monitor(request_id_future, context):
Expand All @@ -234,17 +238,18 @@ async def _process_token_stream(
finish_reason["type"]
)

# With stream_output=True, output_ids contains only new tokens (disjoint)
output_ids = res.get("output_ids", [])
# Empty, non-final chunks can happen during scheduler idle ticks.
# Keep waiting for the next chunk unless cancellation was requested.
# Empty, non-final chunks can happen during scheduler idle
# ticks or KV cache retractions. Keep waiting for the next
# chunk unless cancellation was requested.
if not output_ids and not finish_reason:
if context.is_stopped():
break
continue

# Pass through disjoint token segments directly
out["token_ids"] = output_ids
# Slice new tokens from cumulative output_ids
out["token_ids"] = output_ids[num_output_tokens_so_far:]
num_output_tokens_so_far = len(output_ids)
routed_experts = res["meta_info"].get("routed_experts")
if routed_experts is not None:
# Base64-encode tensor bytes to match sglang's output format.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,23 +142,24 @@ class StreamProcessor:
async def process_sglang_stream(stream_source) -> AsyncIterator[str]:
"""Process SGLang stream output.

With stream_output=True (enforced by Dynamo), SGLang sends disjoint segments
containing only new tokens since the last output. We pass these through directly.
SGLang sends cumulative output_ids. We slice off only the new tokens
each iteration to yield disjoint deltas.
"""
num_output_tokens_so_far = 0
try:
async for res in stream_source:
try:
# With stream_output=True, output_ids contains only new tokens (disjoint)
output_ids = res.get("output_ids", [])
finish_reason = res.get("meta_info", {}).get("finish_reason")

# Empty, non-final chunks can happen during scheduler idle ticks.
# Keep waiting for the next chunk.
if not output_ids and not finish_reason:
continue

new_tokens = output_ids[num_output_tokens_so_far:]
num_output_tokens_so_far = len(output_ids)

output = {
"token_ids": output_ids,
"token_ids": new_tokens,
"text": res.get("text", ""),
"finished": False,
}
Expand Down
Loading