Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 86 additions & 67 deletions python/sglang/srt/managers/tokenizer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1147,80 +1147,99 @@ async def _wait_one_response(
)
continue

out = state.out_list[-1]

# Drain all pending outputs atomically. For streaming, every
# chunk must be yielded to avoid dropping token deltas. For
# non-streaming only the latest cumulative output matters.
pending = state.out_list if is_stream else state.out_list[-1:]
state.out_list = []
if state.finished:
# For non-streaming requests the response has not been sent yet, so `response_sent_to_client_time` is still unset.
# Record the response-sent time right before we log the finished request and its metrics.
if not state.time_stats.response_sent_to_client_time:
state.time_stats.set_response_sent_to_client_time()
out["meta_info"][
"response_sent_to_client_ts"
] = state.time_stats.get_response_sent_to_client_realtime()
self.request_logger.log_finished_request(
obj,
out,
is_multimodal_gen=self.model_config.is_multimodal_gen,
request=request,
finished = state.finished
state.event.clear()

if is_stream and len(pending) > 1:
logger.warning(
"Streaming backlog: rid=%s, draining %d queued chunks. "
"This may inflate P99 TBT for affected requests.",
obj.rid,
len(pending),
)

if self.request_metrics_exporter_manager.exporter_enabled():
# Asynchronously write metrics for this request using the exporter manager.
asyncio.create_task(
self.request_metrics_exporter_manager.write_record(obj, out)
for i, out in enumerate(pending):
is_last = i == len(pending) - 1

if finished and is_last:
# For non-streaming requests the response has not been sent yet, so `response_sent_to_client_time` is still unset.
# Record the response-sent time right before we log the finished request and its metrics.
if not state.time_stats.response_sent_to_client_time:
state.time_stats.set_response_sent_to_client_time()
out["meta_info"][
"response_sent_to_client_ts"
] = state.time_stats.get_response_sent_to_client_realtime()
self.request_logger.log_finished_request(
obj,
out,
is_multimodal_gen=self.model_config.is_multimodal_gen,
request=request,
)

# Check if this was an abort/error created by scheduler
if isinstance(out["meta_info"].get("finish_reason"), dict):
finish_reason = out["meta_info"]["finish_reason"]
if (
finish_reason.get("type") == "abort"
and finish_reason.get("status_code") == HTTPStatus.BAD_REQUEST
):
if not is_stream:
raise ValueError(finish_reason["message"])
else:
yield out
break

if finish_reason.get("type") == "abort" and finish_reason.get(
"status_code"
) in (
HTTPStatus.SERVICE_UNAVAILABLE,
HTTPStatus.INTERNAL_SERVER_ERROR,
):
# This is an abort request initiated by the scheduler.
# Delete the key to prevent resending the abort request to the scheduler and
# to ensure the aborted request's state is cleaned up.
if state.obj.rid in self.rid_to_state:
del self.rid_to_state[state.obj.rid]

# Mark ongoing LoRA request as finished.
if self.server_args.enable_lora and state.obj.lora_path:
await self.lora_registry.release(state.obj.lora_id)
if not is_stream:
raise fastapi.HTTPException(
status_code=finish_reason["status_code"],
detail=finish_reason["message"],
)
else:
yield out
break
yield out
break
if self.request_metrics_exporter_manager.exporter_enabled():
# Asynchronously write metrics for this request using the exporter manager.
asyncio.create_task(
self.request_metrics_exporter_manager.write_record(obj, out)
)

state.event.clear()
# Check if this was an abort/error created by scheduler
if isinstance(out["meta_info"].get("finish_reason"), dict):
finish_reason = out["meta_info"]["finish_reason"]
if (
finish_reason.get("type") == "abort"
and finish_reason.get("status_code")
== HTTPStatus.BAD_REQUEST
):
if not is_stream:
raise ValueError(finish_reason["message"])
else:
yield out
break

if finish_reason.get("type") == "abort" and finish_reason.get(
"status_code"
) in (
HTTPStatus.SERVICE_UNAVAILABLE,
HTTPStatus.INTERNAL_SERVER_ERROR,
):
# This is an abort request initiated by the scheduler.
# Delete the key to prevent resending the abort request to the scheduler and
# to ensure the aborted request's state is cleaned up.
if state.obj.rid in self.rid_to_state:
del self.rid_to_state[state.obj.rid]

# Mark ongoing LoRA request as finished.
if self.server_args.enable_lora and state.obj.lora_path:
await self.lora_registry.release(state.obj.lora_id)
if not is_stream:
raise fastapi.HTTPException(
status_code=finish_reason["status_code"],
detail=finish_reason["message"],
)
else:
yield out
break
yield out
break

if is_stream:
# Record response sent time right before we send response.
if not state.time_stats.response_sent_to_client_time:
state.time_stats.set_response_sent_to_client_time()
out["meta_info"][
"response_sent_to_client_ts"
] = state.time_stats.get_response_sent_to_client_realtime()
yield out

if finished:
break

if is_stream:
# Record response sent time right before we send response.
if not state.time_stats.response_sent_to_client_time:
state.time_stats.set_response_sent_to_client_time()
out["meta_info"][
"response_sent_to_client_ts"
] = state.time_stats.get_response_sent_to_client_realtime()
yield out
else:
if not is_stream:
if (
request is not None
and not obj.background
Expand Down
Loading