diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py
index 7fbce2da8587..e1efdbe39892 100644
--- a/vllm/v1/engine/core.py
+++ b/vllm/v1/engine/core.py
@@ -178,7 +178,7 @@ def __init__(
         # to eliminate pipeline bubbles.
         self.batch_queue_size = self.model_executor.max_concurrent_batches
         self.batch_queue: (
-            deque[tuple[Future[ModelRunnerOutput], SchedulerOutput]] | None
+            deque[tuple[Future[ModelRunnerOutput], SchedulerOutput, Future[Any]]] | None
         ) = None
         if self.batch_queue_size > 1:
             logger.info("Batch queue is enabled with size %d", self.batch_queue_size)
@@ -336,16 +336,6 @@ def log_error_detail(self, scheduler_output: SchedulerOutput):
             )
             raise err
 
-    def _log_err_callback(self, scheduler_output: SchedulerOutput):
-        """Log error details of a future that's not expected to return a result."""
-
-        def callback(f, sched_output=scheduler_output):
-            with self.log_error_detail(sched_output):
-                result = f.result()
-                assert result is None
-
-        return callback
-
     def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]:
         """Schedule, execute, and make output.
 
@@ -422,8 +412,6 @@ def step_with_batch_queue(
                 # No sampling required (no requests scheduled).
                 future = cast(Future[ModelRunnerOutput], exec_future)
             else:
-                exec_future.add_done_callback(self._log_err_callback(scheduler_output))
-
                 if not scheduler_output.pending_structured_output_tokens:
                     # We aren't waiting for any tokens, get any grammar output
                     # and sample immediately.
@@ -440,7 +428,7 @@ def step_with_batch_queue(
 
             if not deferred_scheduler_output:
                 # Add this step's future to the queue.
-                batch_queue.appendleft((future, scheduler_output))
+                batch_queue.appendleft((future, scheduler_output, exec_future))
                 if (
                     model_executed
                     and len(batch_queue) < self.batch_queue_size
@@ -457,9 +445,14 @@ def step_with_batch_queue(
             return None, False
 
         # Block until the next result is available.
-        future, scheduler_output = batch_queue.pop()
+        future, scheduler_output, exec_model_fut = batch_queue.pop()
         with self.log_error_detail(scheduler_output):
             model_output = future.result()
+            if model_output is None:
+                # None from sample_tokens() implies that the original execute_model()
+                # call failed - raise that exception.
+                exec_model_fut.result()
+                raise RuntimeError("unexpected error")
 
         # Before processing the model output, process any aborts that happened
         # during the model execution.
@@ -478,7 +471,7 @@ def step_with_batch_queue(
                 deferred_scheduler_output
             )
             future = self.model_executor.sample_tokens(grammar_output, non_block=True)
-            batch_queue.appendleft((future, deferred_scheduler_output))
+            batch_queue.appendleft((future, deferred_scheduler_output, exec_future))
 
         return engine_core_outputs, model_executed
 