Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions vllm/v1/worker/gpu_model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ def __init__(
async_output_copy_stream: torch.cuda.Stream,
):
self._model_runner_output = model_runner_output
self._finished_mask = finished_mask

# Event on the copy stream so we can synchronize the non-blocking copy.
self.async_copy_ready_event = torch.Event()
Expand All @@ -276,11 +275,15 @@ def __init__(
default_stream = torch.cuda.current_stream()
with torch.cuda.stream(async_output_copy_stream):
async_output_copy_stream.wait_stream(default_stream)
self._raw_pooler_output_cpu = json_map_leaves(
raw_pooler_output_cpu = json_map_leaves(
lambda x: None if x is None else x.to("cpu", non_blocking=True),
self._raw_pooler_output,
)
self.async_copy_ready_event.record()
self._model_runner_output.pooler_output = [
out if include else None
for out, include in zip(raw_pooler_output_cpu, finished_mask)
]

def get_output(self) -> ModelRunnerOutput:
"""Copy the device tensors to the host and return a ModelRunnerOutput.
Expand All @@ -290,11 +293,6 @@ def get_output(self) -> ModelRunnerOutput:

# Release the device tensors once the copy has completed.
del self._raw_pooler_output

self._model_runner_output.pooler_output = [
out if include else None
for out, include in zip(self._raw_pooler_output_cpu, self._finished_mask)
]
return self._model_runner_output


Expand Down Expand Up @@ -2537,8 +2535,7 @@ def _pool(

model = cast(VllmModelForPooling, self.model)
raw_pooler_output: PoolerOutput = model.pooler(
hidden_states=hidden_states,
pooling_metadata=pooling_metadata,
hidden_states=hidden_states, pooling_metadata=pooling_metadata
)

finished_mask = [
Expand Down Expand Up @@ -2568,12 +2565,12 @@ def _pool(
lambda x: None if x is None else x.to("cpu", non_blocking=True),
raw_pooler_output,
)
self._sync_device()

model_runner_output.pooler_output = [
out if include else None
for out, include in zip(raw_pooler_output, finished_mask)
]
self._sync_device()

return model_runner_output

def _pad_for_sequence_parallelism(self, num_scheduled_tokens: int) -> int:
Expand Down
Loading