diff --git a/python/sglang/srt/managers/tokenizer_manager.py b/python/sglang/srt/managers/tokenizer_manager.py index 0e7e41a7948d..7aaaf37f9e75 100644 --- a/python/sglang/srt/managers/tokenizer_manager.py +++ b/python/sglang/srt/managers/tokenizer_manager.py @@ -1147,80 +1147,99 @@ async def _wait_one_response( ) continue - out = state.out_list[-1] - + # Drain all pending outputs atomically. For streaming, every + # chunk must be yielded to avoid dropping token deltas. For + # non-streaming only the latest cumulative output matters. + pending = state.out_list if is_stream else state.out_list[-1:] state.out_list = [] - if state.finished: - # For non-streaming cases, response has not been sent yet (`response_sent_to_client_time` has not been set yet). - # Record response sent time right before we log finished results and metrics. - if not state.time_stats.response_sent_to_client_time: - state.time_stats.set_response_sent_to_client_time() - out["meta_info"][ - "response_sent_to_client_ts" - ] = state.time_stats.get_response_sent_to_client_realtime() - self.request_logger.log_finished_request( - obj, - out, - is_multimodal_gen=self.model_config.is_multimodal_gen, - request=request, + finished = state.finished + state.event.clear() + + if is_stream and len(pending) > 1: + logger.warning( + "Streaming backlog: rid=%s, draining %d queued chunks. " + "This may inflate P99 TBT for affected requests.", + obj.rid, + len(pending), ) - if self.request_metrics_exporter_manager.exporter_enabled(): - # Asynchronously write metrics for this request using the exporter manager. - asyncio.create_task( - self.request_metrics_exporter_manager.write_record(obj, out) + for i, out in enumerate(pending): + is_last = i == len(pending) - 1 + + if finished and is_last: + # For non-streaming cases, response has not been sent yet (`response_sent_to_client_time` has not been set yet). + # Record response sent time right before we log finished results and metrics. + if not state.time_stats.response_sent_to_client_time: + state.time_stats.set_response_sent_to_client_time() + out["meta_info"][ + "response_sent_to_client_ts" + ] = state.time_stats.get_response_sent_to_client_realtime() + self.request_logger.log_finished_request( + obj, + out, + is_multimodal_gen=self.model_config.is_multimodal_gen, + request=request, ) - # Check if this was an abort/error created by scheduler - if isinstance(out["meta_info"].get("finish_reason"), dict): - finish_reason = out["meta_info"]["finish_reason"] - if ( - finish_reason.get("type") == "abort" - and finish_reason.get("status_code") == HTTPStatus.BAD_REQUEST - ): - if not is_stream: - raise ValueError(finish_reason["message"]) - else: - yield out - break - - if finish_reason.get("type") == "abort" and finish_reason.get( - "status_code" - ) in ( - HTTPStatus.SERVICE_UNAVAILABLE, - HTTPStatus.INTERNAL_SERVER_ERROR, - ): - # This is an abort request initiated by scheduler. - # Delete the key to prevent resending abort request to the scheduler and - # to ensure aborted request state is cleaned up. - if state.obj.rid in self.rid_to_state: - del self.rid_to_state[state.obj.rid] - - # Mark ongoing LoRA request as finished. - if self.server_args.enable_lora and state.obj.lora_path: - await self.lora_registry.release(state.obj.lora_id) - if not is_stream: - raise fastapi.HTTPException( - status_code=finish_reason["status_code"], - detail=finish_reason["message"], - ) - else: - yield out - break - yield out - break + if self.request_metrics_exporter_manager.exporter_enabled(): + # Asynchronously write metrics for this request using the exporter manager. + asyncio.create_task( + self.request_metrics_exporter_manager.write_record(obj, out) + ) - state.event.clear() + # Check if this was an abort/error created by scheduler + if isinstance(out["meta_info"].get("finish_reason"), dict): + finish_reason = out["meta_info"]["finish_reason"] + if ( + finish_reason.get("type") == "abort" + and finish_reason.get("status_code") + == HTTPStatus.BAD_REQUEST + ): + if not is_stream: + raise ValueError(finish_reason["message"]) + else: + yield out + break + + if finish_reason.get("type") == "abort" and finish_reason.get( + "status_code" + ) in ( + HTTPStatus.SERVICE_UNAVAILABLE, + HTTPStatus.INTERNAL_SERVER_ERROR, + ): + # This is an abort request initiated by scheduler. + # Delete the key to prevent resending abort request to the scheduler and + # to ensure aborted request state is cleaned up. + if state.obj.rid in self.rid_to_state: + del self.rid_to_state[state.obj.rid] + + # Mark ongoing LoRA request as finished. + if self.server_args.enable_lora and state.obj.lora_path: + await self.lora_registry.release(state.obj.lora_id) + if not is_stream: + raise fastapi.HTTPException( + status_code=finish_reason["status_code"], + detail=finish_reason["message"], + ) + else: + yield out + break + yield out + break + + if is_stream: + # Record response sent time right before we send response. + if not state.time_stats.response_sent_to_client_time: + state.time_stats.set_response_sent_to_client_time() + out["meta_info"][ + "response_sent_to_client_ts" + ] = state.time_stats.get_response_sent_to_client_realtime() + yield out + + if finished: + break - if is_stream: - # Record response sent time right before we send response. - if not state.time_stats.response_sent_to_client_time: - state.time_stats.set_response_sent_to_client_time() - out["meta_info"][ - "response_sent_to_client_ts" - ] = state.time_stats.get_response_sent_to_client_realtime() - yield out - else: + if not is_stream: if ( request is not None and not obj.background