From f4d5e3211fcb89f680ea9abab48c226b040178c1 Mon Sep 17 00:00:00 2001
From: Max Hu
Date: Sun, 22 Feb 2026 17:14:51 -0800
Subject: [PATCH] add impl

Signed-off-by: Max Hu
---
 vllm/v1/engine/core.py | 37 ++++++++++++++++++++++++++++++-------
 1 file changed, 30 insertions(+), 7 deletions(-)

diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py
index a258fe295068..1095d61dede6
--- a/vllm/v1/engine/core.py
+++ b/vllm/v1/engine/core.py
@@ -181,7 +181,15 @@ def __init__(
         # to eliminate pipeline bubbles.
         self.batch_queue_size = self.model_executor.max_concurrent_batches
         self.batch_queue: (
-            deque[tuple[Future[ModelRunnerOutput], SchedulerOutput, Future[Any]]] | None
+            deque[
+                tuple[
+                    Future[ModelRunnerOutput],
+                    SchedulerOutput,
+                    Future[Any],
+                    float,
+                ]
+            ]
+            | None
         ) = None
         if self.batch_queue_size > 1:
             logger.debug("Batch queue is enabled with size %d", self.batch_queue_size)
@@ -343,7 +351,14 @@ def log_error_detail(self, scheduler_output: SchedulerOutput):
             raise err
 
     @contextmanager
-    def log_iteration_details(self, scheduler_output: SchedulerOutput):
+    def log_iteration_details(
+        self,
+        scheduler_output: SchedulerOutput,
+        submit_time: float | None = None,
+    ):
+        """Log iteration stats. If submit_time is set (batch-queue path), elapsed
+        time is from submit to now; otherwise it's the time inside this with block.
+        """
         if not self.vllm_config.observability_config.enable_logging_iteration_details:
             yield
             return
@@ -351,6 +366,10 @@ def log_iteration_details(self, scheduler_output: SchedulerOutput):
         iteration_details = compute_iteration_details(scheduler_output)
         before = time.monotonic()
         yield
+        if submit_time is not None:
+            elapsed_ms = (time.monotonic() - submit_time) * 1000
+        else:
+            elapsed_ms = (time.monotonic() - before) * 1000
         logger.info(
             "".join(
                 [
@@ -365,7 +384,7 @@ def log_iteration_details(self, scheduler_output: SchedulerOutput):
                     " generation requests, ",
                     str(iteration_details.num_generation_tokens),
                     " generation tokens, iteration elapsed time: ",
-                    format((time.monotonic() - before) * 1000, ".2f"),
+                    format(elapsed_ms, ".2f"),
                     " ms",
                 ]
             )
@@ -468,7 +487,9 @@ def step_with_batch_queue(
 
         if not deferred_scheduler_output:
             # Add this step's future to the queue.
-            batch_queue.appendleft((future, scheduler_output, exec_future))
+            batch_queue.appendleft(
+                (future, scheduler_output, exec_future, time.monotonic())
+            )
             if (
                 model_executed
                 and len(batch_queue) < self.batch_queue_size
@@ -485,10 +506,10 @@ def step_with_batch_queue(
             return None, False
 
         # Block until the next result is available.
-        future, scheduler_output, exec_model_fut = batch_queue.pop()
+        future, scheduler_output, exec_model_fut, submit_time = batch_queue.pop()
         with (
             self.log_error_detail(scheduler_output),
-            self.log_iteration_details(scheduler_output),
+            self.log_iteration_details(scheduler_output, submit_time=submit_time),
         ):
             model_output = future.result()
             if model_output is None:
@@ -526,7 +547,9 @@ def step_with_batch_queue(
                 deferred_scheduler_output
             )
             future = self.model_executor.sample_tokens(grammar_output, non_block=True)
-            batch_queue.appendleft((future, deferred_scheduler_output, exec_future))
+            batch_queue.appendleft(
+                (future, deferred_scheduler_output, exec_future, time.monotonic())
+            )
 
         return engine_core_outputs, model_executed