25 changes: 9 additions & 16 deletions vllm/v1/engine/core.py
@@ -178,7 +178,7 @@ def __init__(
# to eliminate pipeline bubbles.
self.batch_queue_size = self.model_executor.max_concurrent_batches
self.batch_queue: (
deque[tuple[Future[ModelRunnerOutput], SchedulerOutput]] | None
deque[tuple[Future[ModelRunnerOutput], SchedulerOutput, Future[Any]]] | None
) = None
if self.batch_queue_size > 1:
logger.info("Batch queue is enabled with size %d", self.batch_queue_size)
@@ -336,16 +336,6 @@ def log_error_detail(self, scheduler_output: SchedulerOutput):
)
raise err

def _log_err_callback(self, scheduler_output: SchedulerOutput):
"""Log error details of a future that's not expected to return a result."""

def callback(f, sched_output=scheduler_output):
with self.log_error_detail(sched_output):
result = f.result()
assert result is None

return callback

def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]:
"""Schedule, execute, and make output.

Expand Down Expand Up @@ -422,8 +412,6 @@ def step_with_batch_queue(
# No sampling required (no requests scheduled).
future = cast(Future[ModelRunnerOutput], exec_future)
else:
exec_future.add_done_callback(self._log_err_callback(scheduler_output))

if not scheduler_output.pending_structured_output_tokens:
# We aren't waiting for any tokens, get any grammar output
# and sample immediately.
@@ -440,7 +428,7 @@

if not deferred_scheduler_output:
# Add this step's future to the queue.
batch_queue.appendleft((future, scheduler_output))
batch_queue.appendleft((future, scheduler_output, exec_future))
if (
model_executed
and len(batch_queue) < self.batch_queue_size
@@ -457,9 +445,14 @@
return None, False

# Block until the next result is available.
future, scheduler_output = batch_queue.pop()
future, scheduler_output, exec_model_fut = batch_queue.pop()
with self.log_error_detail(scheduler_output):
model_output = future.result()
if model_output is None:
# None from sample_tokens() implies that the original execute_model()
# call failed - raise that exception.
exec_model_fut.result()
raise RuntimeError("unexpected error")

# Before processing the model output, process any aborts that happened
# during the model execution.
@@ -478,7 +471,7 @@
deferred_scheduler_output
)
future = self.model_executor.sample_tokens(grammar_output, non_block=True)
batch_queue.appendleft((future, deferred_scheduler_output))
batch_queue.appendleft((future, deferred_scheduler_output, exec_future))

return engine_core_outputs, model_executed
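
Taken together, the queue-pop path now behaves roughly as sketched below. This is an illustrative reconstruction under the assumptions visible in the diff, not the actual engine code: a None result from the sample_tokens() future signals that the paired execute_model() call failed, so the stored execute_model() future is resolved to re-raise the original exception, with a RuntimeError as a fallback if it somehow completed cleanly. This replaces the removed _log_err_callback()-based error reporting.

# Illustrative sketch of the new error-propagation path (simplified).
def drain_one(batch_queue):
    # Each entry now carries the execute_model() future as a third element.
    future, scheduler_output, exec_model_fut = batch_queue.pop()
    model_output = future.result()  # block on sample_tokens()
    if model_output is None:
        # None from sample_tokens() implies execute_model() failed; calling
        # .result() on its future re-raises the original exception.
        exec_model_fut.result()
        # If it somehow succeeded anyway, still fail loudly.
        raise RuntimeError("unexpected error")
    return model_output, scheduler_output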
