Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 78 additions & 15 deletions vllm/model_executor/offloader/prefetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from vllm.logger import init_logger
from vllm.model_executor.offloader.base import BaseOffloader, should_pin_memory
from vllm.utils.torch_utils import get_dtype_size
from vllm import _custom_ops as ops

logger = init_logger(__name__)

Expand Down Expand Up @@ -390,6 +391,10 @@ def __init__(
# Used for per-layer synchronization (both eager and capture modes).
self._copy_done_event = torch.cuda.Event()

# Fork: record event on compute stream, copy_stream waits on it.
# This joins copy_stream to any active CUDA graph capture.
self._fork_event = torch.cuda.Event()
Comment thread
xiaobao520123 marked this conversation as resolved.

# Track whether _copy_done_event is valid for eager-mode wait_event.
# False when: (1) never recorded, or (2) last recorded during a
# cudagraph capture (events become invalid after capture ends).
Expand All @@ -409,6 +414,12 @@ def __init__(
self._buffer_pool: StaticBufferPool | None = None
self._buffer_slot_idx: int = 0

# Buffer pointers
# Grouped pointers enable batch copy from cuMemcpyBatchAsync.
self._buffer_src_ptrs: torch.Tensor | None = None
self._buffer_dst_ptrs: torch.Tensor | None = None
self._buffer_sizes: torch.Tensor | None = None

param_dict = dict(self.module.named_parameters())
assert all(name in param_dict for name in whitelist_param_names), (
f"Whitelist params {whitelist_param_names} not found in module params "
Expand Down Expand Up @@ -486,6 +497,12 @@ def assign_buffer_slot(self, pool: StaticBufferPool, slot_idx: int):
self._buffer_pool = pool
self._buffer_slot_idx = slot_idx

pin_memory = should_pin_memory()

src_ptrs: list[int] = []
dst_ptrs: list[int] = []
sizes: list[int] = []

# Assign static buffers to parameters
# Use CPU storage shape/stride/dtype since param.data is now empty
for name, offloader in self._param_offloaders.items():
Expand All @@ -500,6 +517,37 @@ def assign_buffer_slot(self, pool: StaticBufferPool, slot_idx: int):
)
offloader.assign_static_buffer(buffer)

# IMPORTANT: Update pointer.
cpu_storage = offloader._cpu_storage
assert cpu_storage is not None, "CPU storage not initialized"
assert not pin_memory or cpu_storage.is_pinned(), (
f"CPU storage for {name} is not pinned, but pin_memory is "
"enabled. The batched H2D prefetch path requires pinned "
"source memory; otherwise cuMemcpyBatchAsync degrades to a "
"synchronous copy and breaks event-based fork "
"synchronization with the compute stream."
)

src_ptrs.append(cpu_storage.data_ptr())
dst_ptrs.append(buffer.data_ptr())
sizes.append(cpu_storage.numel() * cpu_storage.element_size())

# Group buffer's pointer.
if not src_ptrs:
self._buffer_src_ptrs = None
self._buffer_dst_ptrs = None
self._buffer_sizes = None
else:
self._buffer_src_ptrs = torch.tensor(
src_ptrs, dtype=torch.int64, pin_memory=pin_memory
)
self._buffer_dst_ptrs = torch.tensor(
dst_ptrs, dtype=torch.int64, pin_memory=pin_memory
)
self._buffer_sizes = torch.tensor(
sizes, dtype=torch.int64, pin_memory=pin_memory
)

def start_onload_to_static(self):
"""Start async copy from CPU storage to GPU buffer.

Expand All @@ -517,24 +565,39 @@ def start_onload_to_static(self):
self._prefetch_in_capture = torch.cuda.is_current_stream_capturing()

# Fork: record event on compute stream, copy_stream waits on it
# This joins copy_stream to any active CUDA graph capture
fork_event = torch.cuda.Event()
torch.cuda.current_stream().record_event(fork_event)
self.copy_stream.wait_event(fork_event)
# This joins copy_stream to any active CUDA graph capture.
torch.cuda.current_stream().record_event(self._fork_event)
self.copy_stream.wait_event(self._fork_event)

with torch.cuda.stream(self.copy_stream):
for name, offloader in self._param_offloaders.items():
cpu_storage = offloader._cpu_storage
gpu_buffer = offloader._gpu_buffer
assert cpu_storage is not None, "CPU storage not initialized"
assert gpu_buffer is not None, "GPU buffer not assigned"
assert not should_pin_memory() or cpu_storage.is_pinned(), (
f"CPU storage for {name} is not pinned! "
"non_blocking=True H2D copy from non-pinned memory "
"causes stream synchronization that breaks "
"event-based fork synchronization."
if not torch.cuda.is_current_stream_capturing() and (
self._buffer_src_ptrs is not None
and self._buffer_dst_ptrs is not None
and self._buffer_sizes is not None
):
# Batch API path: batched copy using custom op (single cuMemcpyBatchAsync call on CUDA 12.8+)
# cuMemcpyBatchAsync can have less driver-call overhead and better performance.
# swap_blocks_batch() will fallback to internal cudaMemcpyAsync loop if cuMemcpyBatchAsync is not available.
# IMPORTANT: cuMemcpyBatchAsync is not capture-safe.
ops.swap_blocks_batch(
src_ptrs=self._buffer_src_ptrs,
dst_ptrs=self._buffer_dst_ptrs,
sizes=self._buffer_sizes
)
gpu_buffer.copy_(cpu_storage, non_blocking=True)
else:
# Graph path: Fallbacks to per-param copy_() so they can get recorded into the graph.
for name, offloader in self._param_offloaders.items():
cpu_storage = offloader._cpu_storage
gpu_buffer = offloader._gpu_buffer
assert cpu_storage is not None, "CPU storage not initialized"
assert gpu_buffer is not None, "GPU buffer not assigned"
assert not should_pin_memory() or cpu_storage.is_pinned(), (
f"CPU storage for {name} is not pinned! "
"non_blocking=True H2D copy from non-pinned memory "
"causes stream synchronization that breaks "
"event-based fork synchronization."
)
gpu_buffer.copy_(cpu_storage, non_blocking=True)

# Record completion event for _wait_for_layer to use
self._copy_done_event.record(self.copy_stream)
Expand Down
Loading