Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions vllm/v1/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,15 @@ def __init__(
# to eliminate pipeline bubbles.
self.batch_queue_size = self.model_executor.max_concurrent_batches
self.batch_queue: (
deque[tuple[Future[ModelRunnerOutput], SchedulerOutput, Future[Any]]] | None
deque[
tuple[
Future[ModelRunnerOutput],
SchedulerOutput,
Future[Any],
float,
]
]
| None
) = None
if self.batch_queue_size > 1:
logger.debug("Batch queue is enabled with size %d", self.batch_queue_size)
Expand Down Expand Up @@ -343,14 +351,25 @@ def log_error_detail(self, scheduler_output: SchedulerOutput):
raise err

@contextmanager
def log_iteration_details(self, scheduler_output: SchedulerOutput):
def log_iteration_details(
self,
scheduler_output: SchedulerOutput,
submit_time: float | None = None,
):
"""Log iteration stats. If submit_time is set (batch-queue path), elapsed
time is from submit to now; otherwise it's the time inside this with block.
"""
if not self.vllm_config.observability_config.enable_logging_iteration_details:
yield
return
self._iteration_index = getattr(self, "_iteration_index", 0)
iteration_details = compute_iteration_details(scheduler_output)
before = time.monotonic()
yield
if submit_time is not None:
elapsed_ms = (time.monotonic() - submit_time) * 1000
else:
elapsed_ms = (time.monotonic() - before) * 1000
logger.info(
"".join(
[
Expand All @@ -365,7 +384,7 @@ def log_iteration_details(self, scheduler_output: SchedulerOutput):
" generation requests, ",
str(iteration_details.num_generation_tokens),
" generation tokens, iteration elapsed time: ",
format((time.monotonic() - before) * 1000, ".2f"),
format(elapsed_ms, ".2f"),
" ms",
]
)
Expand Down Expand Up @@ -468,7 +487,9 @@ def step_with_batch_queue(

if not deferred_scheduler_output:
# Add this step's future to the queue.
batch_queue.appendleft((future, scheduler_output, exec_future))
batch_queue.appendleft(
(future, scheduler_output, exec_future, time.monotonic())
)
if (
model_executed
and len(batch_queue) < self.batch_queue_size
Expand All @@ -485,10 +506,10 @@ def step_with_batch_queue(
return None, False

# Block until the next result is available.
future, scheduler_output, exec_model_fut = batch_queue.pop()
future, scheduler_output, exec_model_fut, submit_time = batch_queue.pop()
with (
self.log_error_detail(scheduler_output),
self.log_iteration_details(scheduler_output),
self.log_iteration_details(scheduler_output, submit_time=submit_time),
):
model_output = future.result()
if model_output is None:
Expand Down Expand Up @@ -526,7 +547,9 @@ def step_with_batch_queue(
deferred_scheduler_output
)
future = self.model_executor.sample_tokens(grammar_output, non_block=True)
batch_queue.appendleft((future, deferred_scheduler_output, exec_future))
batch_queue.appendleft(
(future, deferred_scheduler_output, exec_future, time.monotonic())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In the deferred sampling path (used for structured outputs with speculative decoding), recording `time.monotonic()` at this point captures only the duration of the sampling phase plus the entry's time in the queue. It misses the initial model-execution time from the `execute_model` call at the beginning of `step_with_batch_queue`. As a result, the iteration-time metric is inconsistent with the non-deferred path, where the timestamp is recorded immediately after the model execution is submitted. To keep the metric consistent and accurate, capture the start time once at the beginning of the iteration and use that single timestamp for both paths.

)

return engine_core_outputs, model_executed

Expand Down