From 7b4d513ac542a0c0f3f292de49a98ce7517bcff2 Mon Sep 17 00:00:00 2001 From: Long Wu Date: Fri, 1 May 2026 09:17:26 +0000 Subject: [PATCH] feat: prefetch offloader weights using batched memcpy async --- vllm/model_executor/offloader/prefetch.py | 93 +++++++++++++++++++---- 1 file changed, 78 insertions(+), 15 deletions(-) diff --git a/vllm/model_executor/offloader/prefetch.py b/vllm/model_executor/offloader/prefetch.py index 466d8c13ce76..87425f7d8b56 100644 --- a/vllm/model_executor/offloader/prefetch.py +++ b/vllm/model_executor/offloader/prefetch.py @@ -22,6 +22,7 @@ from vllm.logger import init_logger from vllm.model_executor.offloader.base import BaseOffloader, should_pin_memory from vllm.utils.torch_utils import get_dtype_size +from vllm import _custom_ops as ops logger = init_logger(__name__) @@ -390,6 +391,10 @@ def __init__( # Used for per-layer synchronization (both eager and capture modes). self._copy_done_event = torch.cuda.Event() + # Fork: record event on compute stream, copy_stream waits on it. + # This joins copy_stream to any active CUDA graph capture. + self._fork_event = torch.cuda.Event() + # Track whether _copy_done_event is valid for eager-mode wait_event. # False when: (1) never recorded, or (2) last recorded during a # cudagraph capture (events become invalid after capture ends). @@ -409,6 +414,12 @@ def __init__( self._buffer_pool: StaticBufferPool | None = None self._buffer_slot_idx: int = 0 + # Buffer pointers + # Grouped pointers enable batch copy from cuMemcpyBatchAsync. + self._buffer_src_ptrs: torch.Tensor | None = None + self._buffer_dst_ptrs: torch.Tensor | None = None + self._buffer_sizes: torch.Tensor | None = None + param_dict = dict(self.module.named_parameters()) assert all(name in param_dict for name in whitelist_param_names), ( f"Whitelist params {whitelist_param_names} not found in module params " @@ -486,6 +497,12 @@ def assign_buffer_slot(self, pool: StaticBufferPool, slot_idx: int): self._buffer_pool = pool self._buffer_slot_idx = slot_idx + pin_memory = should_pin_memory() + + src_ptrs: list[int] = [] + dst_ptrs: list[int] = [] + sizes: list[int] = [] + # Assign static buffers to parameters # Use CPU storage shape/stride/dtype since param.data is now empty for name, offloader in self._param_offloaders.items(): @@ -500,6 +517,37 @@ def assign_buffer_slot(self, pool: StaticBufferPool, slot_idx: int): ) offloader.assign_static_buffer(buffer) + # IMPORTANT: Update pointer. + cpu_storage = offloader._cpu_storage + assert cpu_storage is not None, "CPU storage not initialized" + assert not pin_memory or cpu_storage.is_pinned(), ( + f"CPU storage for {name} is not pinned, but pin_memory is " + "enabled. The batched H2D prefetch path requires pinned " + "source memory; otherwise cuMemcpyBatchAsync degrades to a " + "synchronous copy and breaks event-based fork " + "synchronization with the compute stream." + ) + + src_ptrs.append(cpu_storage.data_ptr()) + dst_ptrs.append(buffer.data_ptr()) + sizes.append(cpu_storage.numel() * cpu_storage.element_size()) + + # Group buffer's pointer. + if not src_ptrs: + self._buffer_src_ptrs = None + self._buffer_dst_ptrs = None + self._buffer_sizes = None + else: + self._buffer_src_ptrs = torch.tensor( + src_ptrs, dtype=torch.int64, pin_memory=pin_memory + ) + self._buffer_dst_ptrs = torch.tensor( + dst_ptrs, dtype=torch.int64, pin_memory=pin_memory + ) + self._buffer_sizes = torch.tensor( + sizes, dtype=torch.int64, pin_memory=pin_memory + ) + def start_onload_to_static(self): """Start async copy from CPU storage to GPU buffer. @@ -517,24 +565,39 @@ def start_onload_to_static(self): self._prefetch_in_capture = torch.cuda.is_current_stream_capturing() # Fork: record event on compute stream, copy_stream waits on it - # This joins copy_stream to any active CUDA graph capture - fork_event = torch.cuda.Event() - torch.cuda.current_stream().record_event(fork_event) - self.copy_stream.wait_event(fork_event) + # This joins copy_stream to any active CUDA graph capture. + torch.cuda.current_stream().record_event(self._fork_event) + self.copy_stream.wait_event(self._fork_event) with torch.cuda.stream(self.copy_stream): - for name, offloader in self._param_offloaders.items(): - cpu_storage = offloader._cpu_storage - gpu_buffer = offloader._gpu_buffer - assert cpu_storage is not None, "CPU storage not initialized" - assert gpu_buffer is not None, "GPU buffer not assigned" - assert not should_pin_memory() or cpu_storage.is_pinned(), ( - f"CPU storage for {name} is not pinned! " - "non_blocking=True H2D copy from non-pinned memory " - "causes stream synchronization that breaks " - "event-based fork synchronization." + if not torch.cuda.is_current_stream_capturing() and ( + self._buffer_src_ptrs is not None + and self._buffer_dst_ptrs is not None + and self._buffer_sizes is not None + ): + # Batch API path: batched copy using custom op (single cuMemcpyBatchAsync call on CUDA 12.8+) + # cuMemcpyBatchAsync can have less driver-call overhead and better performance. + # swap_blocks_batch() will fallback to internal cudaMemcpyAsync loop if cuMemcpyBatchAsync is not available. + # IMPORTANT: cuMemcpyBatchAsync is not capture-safe. + ops.swap_blocks_batch( + src_ptrs=self._buffer_src_ptrs, + dst_ptrs=self._buffer_dst_ptrs, + sizes=self._buffer_sizes ) - gpu_buffer.copy_(cpu_storage, non_blocking=True) + else: + # Graph path: Fallbacks to per-param copy_() so they can get recorded into the graph. + for name, offloader in self._param_offloaders.items(): + cpu_storage = offloader._cpu_storage + gpu_buffer = offloader._gpu_buffer + assert cpu_storage is not None, "CPU storage not initialized" + assert gpu_buffer is not None, "GPU buffer not assigned" + assert not should_pin_memory() or cpu_storage.is_pinned(), ( + f"CPU storage for {name} is not pinned! " + "non_blocking=True H2D copy from non-pinned memory " + "causes stream synchronization that breaks " + "event-based fork synchronization." + ) + gpu_buffer.copy_(cpu_storage, non_blocking=True) # Record completion event for _wait_for_layer to use self._copy_done_event.record(self.copy_stream)