diff --git a/python/sglang/srt/entrypoints/openai/serving_chat.py b/python/sglang/srt/entrypoints/openai/serving_chat.py
index 6bfd678f9629..bd71be560169 100644
--- a/python/sglang/srt/entrypoints/openai/serving_chat.py
+++ b/python/sglang/srt/entrypoints/openai/serving_chat.py
@@ -736,7 +736,11 @@ async def _generate_chat_stream(
                 stream_started = True
 
                 stream_buffer = stream_buffers.get(index, "")
-                delta = content["text"][len(stream_buffer) :]
+                if self.tokenizer_manager.server_args.incremental_streaming_output:
+                    # content["text"] is already the incremental delta
+                    delta = content["text"]
+                else:
+                    delta = content["text"][len(stream_buffer) :]
                 stream_buffers[index] = stream_buffer + delta
 
                 # Handle reasoning content
diff --git a/test/registered/openai_server/basic/test_serving_chat.py b/test/registered/openai_server/basic/test_serving_chat.py
index 2ca6135ac71f..2c2f5e04877c 100644
--- a/test/registered/openai_server/basic/test_serving_chat.py
+++ b/test/registered/openai_server/basic/test_serving_chat.py
@@ -784,6 +784,94 @@ async def run_stream():
         self.assertEqual(len(chunks), 2)
         self.assertIn("error", chunks[0])
 
+    # ------------- incremental streaming output tests -------------
+    def test_incremental_streaming_output_delta(self):
+        """Test that streaming with incremental_streaming_output produces correct deltas.
+
+        When incremental_streaming_output is enabled, content["text"] is already the
+        incremental delta (not the full accumulated text). The delta computation must
+        use content["text"] directly instead of slicing by the accumulated buffer length.
+
+        Regression test for https://github.com/sgl-project/sglang/issues/22510.
+        """
+        # Enable incremental_streaming_output on the mock
+        self.tm.server_args.incremental_streaming_output = True
+
+        # Simulate incremental streaming: each yield has ONLY the new text (delta),
+        # NOT the full accumulated text.
+        incremental_chunks = [
+            ("I am", None),
+            (" a large", None),
+            (" language model", None),
+            (".", {"type": "stop", "matched": None}),
+        ]
+
+        async def _mock_generate_incremental():
+            for text, finish_reason in incremental_chunks:
+                yield {
+                    "text": text,
+                    "meta_info": {
+                        "id": "chatcmpl-incr-test",
+                        "prompt_tokens": 10,
+                        "completion_tokens": 5,
+                        "cached_tokens": 0,
+                        "finish_reason": finish_reason,
+                        "output_token_logprobs": None,
+                        "output_top_logprobs": None,
+                    },
+                    "index": 0,
+                }
+
+        self.tm.generate_request.return_value = _mock_generate_incremental()
+
+        req = ChatCompletionRequest(
+            model="x",
+            messages=[{"role": "user", "content": "Hi?"}],
+            temperature=0.7,
+            max_tokens=100,
+            stream=True,
+        )
+
+        with patch(
+            "sglang.srt.entrypoints.openai.serving_chat.generate_chat_conv"
+        ) as conv_mock:
+            conv_ins = Mock()
+            conv_ins.get_prompt.return_value = "Test prompt"
+            conv_mock.return_value = conv_ins
+
+            adapted_request, _ = self.chat._convert_to_internal_request(
+                req, self.fastapi_request
+            )
+
+        async def run_stream():
+            chunks = []
+            async for chunk in self.chat._generate_chat_stream(
+                adapted_request, req, self.fastapi_request
+            ):
+                chunks.append(chunk)
+            return chunks
+
+        loop = get_or_create_event_loop()
+        chunks = loop.run_until_complete(run_stream())
+
+        # Extract content deltas from SSE chunks
+        deltas = []
+        for c in chunks:
+            if not c.startswith("data: ") or c.strip() == "data: [DONE]":
+                continue
+            data = json.loads(c[len("data: ") :])
+            if "choices" in data and data["choices"]:
+                content = data["choices"][0]["delta"].get("content")
+                if content:
+                    deltas.append(content)
+
+        joined = "".join(deltas)
+        self.assertEqual(
+            joined,
+            "I am a large language model.",
+            f"Streaming deltas produced broken text: {deltas!r}",
+        )
+
     # ------------- X-Data-Parallel-Rank header tests -------------
     def test_extract_routed_dp_rank_from_header_no_header(self):
         """Test that None is returned when no header is present."""