diff --git a/components/src/dynamo/sglang/args.py b/components/src/dynamo/sglang/args.py index 0cf64245e4aa..f5d99a65243c 100644 --- a/components/src/dynamo/sglang/args.py +++ b/components/src/dynamo/sglang/args.py @@ -491,6 +491,12 @@ async def parse_args(args: list[str]) -> Config: # contain code to download a model, it should only parse the args. server_args = ServerArgs.from_cli_args(parsed_args) + # Dynamo's streaming handlers expect disjoint output_ids from SGLang (only new + # tokens since last output), not cumulative tokens. When stream_output=True, + # SGLang sends disjoint segments which Dynamo passes through directly. + # Force stream_output=True for optimal streaming performance. + server_args.stream_output = True + if parsed_args.use_sglang_tokenizer: logging.info( "Using SGLang's built in tokenizer. Setting skip_tokenizer_init to False" diff --git a/components/src/dynamo/sglang/request_handlers/llm/decode_handler.py b/components/src/dynamo/sglang/request_handlers/llm/decode_handler.py index ad13e0610534..c932c96502a5 100644 --- a/components/src/dynamo/sglang/request_handlers/llm/decode_handler.py +++ b/components/src/dynamo/sglang/request_handlers/llm/decode_handler.py @@ -166,6 +166,9 @@ async def _process_token_stream( ) -> AsyncGenerator[Dict[str, Any], None]: """Process token-based stream output. + With stream_output=True (enforced by Dynamo), SGLang sends disjoint segments + containing only new tokens since the last output. We pass these through directly. + Args: stream_source: Async generator from engine.async_generate. context: Context object for cancellation handling. @@ -173,8 +176,6 @@ async def _process_token_stream( Yields: Dict with token_ids and optional finish_reason. """ - num_output_tokens_so_far = 0 - # Use Future pattern for request ID - will be set when first response arrives request_id_future = asyncio.Future() async with self._cancellation_monitor(request_id_future, context): @@ -196,6 +197,7 @@ async def _process_token_stream( if finish_reason: out["finish_reason"] = finish_reason["type"] + # With stream_output=True, output_ids contains only new tokens (disjoint) output_ids = res.get("output_ids", []) # If request is not finished yet, but there are no outputs, return an error. if not output_ids and not finish_reason: @@ -203,9 +205,8 @@ async def _process_token_stream( yield {"finish_reason": "error", "token_ids": []} break - next_total_toks = len(output_ids) - out["token_ids"] = output_ids[num_output_tokens_so_far:] - num_output_tokens_so_far = next_total_toks + # Pass through disjoint token segments directly + out["token_ids"] = output_ids if finish_reason: input_tokens = res["meta_info"]["prompt_tokens"] completion_tokens = res["meta_info"]["completion_tokens"] diff --git a/components/src/dynamo/sglang/request_handlers/multimodal/worker_handler.py b/components/src/dynamo/sglang/request_handlers/multimodal/worker_handler.py index 3d700a5da175..5a87b95cbefe 100644 --- a/components/src/dynamo/sglang/request_handlers/multimodal/worker_handler.py +++ b/components/src/dynamo/sglang/request_handlers/multimodal/worker_handler.py @@ -133,30 +133,26 @@ class StreamProcessor: @staticmethod async def process_sglang_stream(stream_source) -> AsyncIterator[str]: - """Process SGLang stream output following backend pattern""" - num_output_tokens_so_far = 0 + """Process SGLang stream output. + With stream_output=True (enforced by Dynamo), SGLang sends disjoint segments + containing only new tokens since the last output. We pass these through directly. + """ try: async for res in stream_source: try: - next_total_toks = len(res["output_ids"]) - - # Return incremental tokens + # With stream_output=True, output_ids contains only new tokens (disjoint) output = { - "token_ids": res["output_ids"][num_output_tokens_so_far:], + "token_ids": res["output_ids"], "text": res.get("text", ""), "finished": False, } - num_output_tokens_so_far = next_total_toks # Check for finish reason finish_reason = res.get("meta_info", {}).get("finish_reason") if finish_reason: output.update( { - "token_ids": res["output_ids"][ - num_output_tokens_so_far: - ], "finish_reason": finish_reason.get("type", "stop"), "finished": True, }