6 changes: 5 additions & 1 deletion python/sglang/srt/entrypoints/openai/serving_chat.py
@@ -736,7 +736,11 @@ async def _generate_chat_stream(
                     stream_started = True

                 stream_buffer = stream_buffers.get(index, "")
-                delta = content["text"][len(stream_buffer) :]
+                if self.tokenizer_manager.server_args.incremental_streaming_output:
+                    # content["text"] is already the incremental delta
+                    delta = content["text"]
+                else:
+                    delta = content["text"][len(stream_buffer) :]
                 stream_buffers[index] = stream_buffer + delta

                 # Handle reasoning content
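To make the failure mode concrete, here is a minimal standalone sketch (not SGLang code; the chunk texts are hypothetical) of what the old slicing does once each content["text"] is already a delta:

# Minimal repro of the bug fixed above: the old logic assumes `text` is the
# FULL accumulated output and strips the already-sent prefix. When the server
# emits incremental deltas instead, that slice drops characters.
chunks = ["I am", " a large", " language model", "."]  # hypothetical deltas

stream_buffer = ""
sent = []
for text in chunks:
    delta = text[len(stream_buffer):]  # old logic: slice off the sent prefix
    sent.append(delta)
    stream_buffer += delta

print("".join(sent))  # "I amargee model" -- garbled output
# With incremental_streaming_output enabled, the fix uses the text as-is
# (delta = content["text"]), so the joined output stays intact.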
88 changes: 88 additions & 0 deletions test/registered/openai_server/basic/test_serving_chat.py
@@ -784,6 +784,94 @@ async def run_stream():
         self.assertEqual(len(chunks), 2)
         self.assertIn("error", chunks[0])

+    # ------------- incremental streaming output tests -------------
+    def test_incremental_streaming_output_delta(self):
+        """Test that streaming with incremental_streaming_output produces correct deltas.
+
+        When incremental_streaming_output is enabled, content["text"] is already the
+        incremental delta (not the full accumulated text). The delta computation must
+        use content["text"] directly instead of slicing by the accumulated buffer length.
+
+        Regression test for https://github.com/sgl-project/sglang/issues/22510.
+        """
+        # Enable incremental_streaming_output on the mock
+        self.tm.server_args.incremental_streaming_output = True
+
+        # Simulate incremental streaming: each yield has ONLY the new text (delta),
+        # NOT the full accumulated text.
+        incremental_chunks = [
+            ("I am", None),
+            (" a large", None),
+            (" language model", None),
+            (".", {"type": "stop", "matched": None}),
+        ]
+
+        async def _mock_generate_incremental():
+            for text, finish_reason in incremental_chunks:
+                yield {
+                    "text": text,
+                    "meta_info": {
+                        "id": "chatcmpl-incr-test",
+                        "prompt_tokens": 10,
+                        "completion_tokens": 5,
+                        "cached_tokens": 0,
+                        "finish_reason": finish_reason,
+                        "output_token_logprobs": None,
+                        "output_top_logprobs": None,
+                    },
+                    "index": 0,
+                }
+
+        self.tm.generate_request.return_value = _mock_generate_incremental()
+
+        req = ChatCompletionRequest(
+            model="x",
+            messages=[{"role": "user", "content": "Hi?"}],
+            temperature=0.7,
+            max_tokens=100,
+            stream=True,
+        )
+
+        with patch(
+            "sglang.srt.entrypoints.openai.serving_chat.generate_chat_conv"
+        ) as conv_mock:
+            conv_ins = Mock()
+            conv_ins.get_prompt.return_value = "Test prompt"
+            conv_mock.return_value = conv_ins
+
+            adapted_request, _ = self.chat._convert_to_internal_request(
+                req, self.fastapi_request
+            )
+
+        async def run_stream():
+            chunks = []
+            async for chunk in self.chat._generate_chat_stream(
+                adapted_request, req, self.fastapi_request
+            ):
+                chunks.append(chunk)
+            return chunks
+
+        loop = get_or_create_event_loop()
+        chunks = loop.run_until_complete(run_stream())
+
+        # Extract content deltas from SSE chunks
+        deltas = []
+        for c in chunks:
+            if not c.startswith("data: ") or c.strip() == "data: [DONE]":
+                continue
+            data = json.loads(c[len("data: ") :])
+            if "choices" in data and data["choices"]:
+                content = data["choices"][0]["delta"].get("content")
+                if content:
+                    deltas.append(content)
+
+        joined = "".join(deltas)
+        self.assertEqual(
+            joined,
+            "I am a large language model.",
+            f"Streaming deltas produced broken text: {deltas!r}",
+        )
+
     # ------------- X-Data-Parallel-Rank header tests -------------
     def test_extract_routed_dp_rank_from_header_no_header(self):
         """Test that None is returned when no header is present."""