diff --git a/components/src/dynamo/sglang/args.py b/components/src/dynamo/sglang/args.py index 931227edd266..803248814d78 100644 --- a/components/src/dynamo/sglang/args.py +++ b/components/src/dynamo/sglang/args.py @@ -24,7 +24,6 @@ from dynamo.common.utils.runtime import parse_endpoint from dynamo.llm import fetch_model from dynamo.runtime.logging import configure_dynamo_logging -from dynamo.sglang._compat import enable_disjoint_streaming_output from dynamo.sglang.backend_args import DynamoSGLangArgGroup, DynamoSGLangConfig configure_dynamo_logging() @@ -374,11 +373,29 @@ async def parse_args(args: list[str]) -> Config: "values are always higher priority at the API layer." ) - # Dynamo's streaming handlers expect disjoint output_ids from SGLang (only new - # tokens since last output), not cumulative tokens. Modern SGLang gates this - # behavior behind incremental_streaming_output, while older releases used - # stream_output. - enable_disjoint_streaming_output(server_args) + # Do NOT force incremental_streaming_output (or the old stream_output). + # When enabled, SGLang's tokenizer_manager must yield every streaming chunk + # individually (no coalescing), which creates backpressure under high + # concurrency and causes ~2x throughput regression in disaggregated PD + # serving. Instead, leave SGLang in its default cumulative mode and slice + # new tokens in the decode handler. + # + # Tradeoff: cumulative slicing (output_ids[prev:]) has O(n) copy cost per + # chunk that grows with output length, which hurts at very long OSLs. The + # incremental mode avoids this copy but its tokenizer_manager backpressure + # under high concurrency is far more damaging to throughput. If SGLang + # upstream fixes the backpressure issue, we should revisit re-enabling + # incremental mode. + # See: https://github.com/sgl-project/sglang/issues/22095 + if getattr(server_args, "incremental_streaming_output", False) or getattr( + server_args, "stream_output", False + ): + raise ValueError( + "--incremental-streaming-output (or --stream-output) is not supported " + "by Dynamo's SGLang handlers, which expect cumulative output_ids. " + "Dynamo slices new tokens from cumulative output in the decode handler. " + "Please remove the --incremental-streaming-output flag." + ) if dynamo_config.use_sglang_tokenizer: warnings.warn( diff --git a/components/src/dynamo/sglang/request_handlers/llm/decode_handler.py b/components/src/dynamo/sglang/request_handlers/llm/decode_handler.py index 0582d2271dfc..ac79962cdf5a 100644 --- a/components/src/dynamo/sglang/request_handlers/llm/decode_handler.py +++ b/components/src/dynamo/sglang/request_handlers/llm/decode_handler.py @@ -201,8 +201,10 @@ async def _process_token_stream( ) -> AsyncGenerator[Dict[str, Any], None]: """Process token-based stream output. - With stream_output=True (enforced by Dynamo), SGLang sends disjoint segments - containing only new tokens since the last output. We pass these through directly. + SGLang sends cumulative output_ids (all tokens generated so far). We + slice off only the new tokens each iteration to yield disjoint deltas + to the frontend. This avoids forcing SGLang's incremental streaming + mode which causes tokenizer_manager backpressure at high concurrency. Args: stream_source: Async generator from engine.async_generate. @@ -211,6 +213,8 @@ async def _process_token_stream( Yields: Dict with token_ids and optional finish_reason. """ + num_output_tokens_so_far = 0 + # Use Future pattern for request ID - will be set when first response arrives request_id_future: asyncio.Future[str] = asyncio.Future() async with self._cancellation_monitor(request_id_future, context): @@ -234,17 +238,18 @@ async def _process_token_stream( finish_reason["type"] ) - # With stream_output=True, output_ids contains only new tokens (disjoint) output_ids = res.get("output_ids", []) - # Empty, non-final chunks can happen during scheduler idle ticks. - # Keep waiting for the next chunk unless cancellation was requested. + # Empty, non-final chunks can happen during scheduler idle + # ticks or KV cache retractions. Keep waiting for the next + # chunk unless cancellation was requested. if not output_ids and not finish_reason: if context.is_stopped(): break continue - # Pass through disjoint token segments directly - out["token_ids"] = output_ids + # Slice new tokens from cumulative output_ids + out["token_ids"] = output_ids[num_output_tokens_so_far:] + num_output_tokens_so_far = len(output_ids) routed_experts = res["meta_info"].get("routed_experts") if routed_experts is not None: # Base64-encode tensor bytes to match sglang's output format. diff --git a/components/src/dynamo/sglang/request_handlers/multimodal/worker_handler.py b/components/src/dynamo/sglang/request_handlers/multimodal/worker_handler.py index 828fdb1f32d9..bb41a874c1e0 100644 --- a/components/src/dynamo/sglang/request_handlers/multimodal/worker_handler.py +++ b/components/src/dynamo/sglang/request_handlers/multimodal/worker_handler.py @@ -142,23 +142,24 @@ class StreamProcessor: async def process_sglang_stream(stream_source) -> AsyncIterator[str]: """Process SGLang stream output. - With stream_output=True (enforced by Dynamo), SGLang sends disjoint segments - containing only new tokens since the last output. We pass these through directly. + SGLang sends cumulative output_ids. We slice off only the new tokens + each iteration to yield disjoint deltas. """ + num_output_tokens_so_far = 0 try: async for res in stream_source: try: - # With stream_output=True, output_ids contains only new tokens (disjoint) output_ids = res.get("output_ids", []) finish_reason = res.get("meta_info", {}).get("finish_reason") - # Empty, non-final chunks can happen during scheduler idle ticks. - # Keep waiting for the next chunk. if not output_ids and not finish_reason: continue + new_tokens = output_ids[num_output_tokens_so_far:] + num_output_tokens_so_far = len(output_ids) + output = { - "token_ids": output_ids, + "token_ids": new_tokens, "text": res.get("text", ""), "finished": False, }