246 changes: 233 additions & 13 deletions tests/v1/kv_offload/test_cpu_gpu.py
@@ -27,6 +27,7 @@
DEVICE_TYPE = current_platform.device_type
DEVICES = [f"{DEVICE_TYPE}:0"]
NUM_MAPPINGS = [3]
NUM_MAPPINGS_PER_GROUP = [2]


@pytest.mark.parametrize("gpu_to_cpu", [True, False])
@@ -58,9 +59,7 @@ def test_transfer(
# build CanonicalKVCacheTensor list: one per tensor
kv_cache_tensors: list[CanonicalKVCacheTensor] = []
for i in range(num_tensors):
gpu_tensor = torch.randint(
-128,
127,
gpu_tensor = torch.zeros(
(num_gpu_blocks, gpu_page_size_bytes),
dtype=torch.int8,
device=device,
@@ -119,34 +118,44 @@ def test_transfer(
for j in range(block_size_factor)
]

# maybe skip some GPU blocks to test reading from the middle of a CPU block
if not gpu_to_cpu:
blocks_to_skip = block_size_factor - 1
# maybe skip some GPU blocks to test reading/writing from the middle of a CPU block
blocks_to_skip = block_size_factor - 1
if blocks_to_skip > 0:
gpu_blocks = gpu_blocks[blocks_to_skip:]
cpu_blocks_expanded = cpu_blocks_expanded[blocks_to_skip:]

# set transfer direction
if gpu_to_cpu:
handler = handlers.gpu_to_cpu_handler
src_spec = GPULoadStoreSpec(gpu_blocks, group_sizes=(len(gpu_blocks),))
src_spec = GPULoadStoreSpec(
gpu_blocks, group_sizes=(len(gpu_blocks),), block_indices=(blocks_to_skip,)
)
dst_spec = CPULoadStoreSpec(cpu_blocks)
dst_to_src = dict(zip(cpu_blocks_expanded, gpu_blocks))
num_dst_sub_blocks = num_cpu_blocks * block_size_factor
num_dst_sub_blocks = num_gpu_blocks
else:
handler = handlers.cpu_to_gpu_handler
src_spec = CPULoadStoreSpec(cpu_blocks)
dst_spec = GPULoadStoreSpec(gpu_blocks, group_sizes=(len(gpu_blocks),))
dst_spec = GPULoadStoreSpec(
gpu_blocks, group_sizes=(len(gpu_blocks),), block_indices=(blocks_to_skip,)
)
dst_to_src = dict(zip(gpu_blocks, cpu_blocks_expanded))
num_dst_sub_blocks = num_gpu_blocks

# randomize src and dst tensors before transfer
for tensor in handler.src_tensors:
tensor.random_()
for tensor in handler.dst_tensors:
tensor.random_()

# clone src and dst tensors before transfer
orig_src_tensors = [x.clone() for x in handler.src_tensors]
orig_dst_tensors = [x.clone() for x in handler.dst_tensors]

# call transfer function
start_time = time.time()
assert handler.transfer_async(1, (src_spec, dst_spec))
assert set({x.job_id for x in handler._transfers}) == {1}
assert {x.job_id for x in handler._transfers} == {1}

# wait for transfer to complete
end_time = time.time() + 10
@@ -155,11 +164,14 @@
if finished:
assert finished[0].job_id == 1
assert finished[0].success
assert finished[0].transfer_type == (
("GPU", "CPU") if gpu_to_cpu else ("CPU", "GPU")
assert (
finished[0].transfer_type == ("GPU", "CPU")
if gpu_to_cpu
else ("CPU", "GPU")
)
assert finished[0].transfer_size == (
len(gpu_blocks) * handler.group_block_size_in_bytes[0]
len(gpu_blocks)
* sum([x.page_size_bytes for x in handler.kv_cache_groups_data_refs[0]])
)
assert finished[0].transfer_time > 0
assert finished[0].transfer_time < (time.time() - start_time)
@@ -196,3 +208,211 @@ def test_transfer(
handlers.gpu_to_cpu_handler.shutdown()
if mmap_region:
mmap_region.cleanup()


@pytest.mark.parametrize("gpu_to_cpu", [True, False])
@pytest.mark.parametrize("num_mappings_per_group", NUM_MAPPINGS_PER_GROUP)
@pytest.mark.parametrize("gpu_page_size_bytes", GPU_PAGE_SIZES)
@pytest.mark.parametrize("block_size_factor", BLOCK_SIZE_FACTORS)
@pytest.mark.parametrize("num_gpu_blocks", NUM_GPU_BLOCKS)
@pytest.mark.parametrize("num_cpu_blocks", NUM_CPU_BLOCKS)
@pytest.mark.parametrize("seed", SEEDS)
@pytest.mark.parametrize("device", DEVICES)
@torch.inference_mode()
def test_transfer_multi_group(
default_vllm_config,
gpu_to_cpu: bool,
num_mappings_per_group: int,
gpu_page_size_bytes: int,
block_size_factor: int,
num_gpu_blocks: int,
num_cpu_blocks: int,
seed: int,
device: str,
) -> None:
"""Test transfers with three KV cache groups:
- Group 0: aligned transfer with num_mappings_per_group blocks
- Group 1: zero blocks (empty group)
- Group 2: unaligned CPU->GPU transfer (logical_offset=block_size_factor-1,
causing the implementation to skip source sub-blocks) with
num_mappings_per_group blocks
"""
set_random_seed(seed)

# 3 groups, each with 2 tensors
num_groups = 3
tensors_per_group = 2
num_tensors = num_groups * tensors_per_group
kv_cache_tensors: list[CanonicalKVCacheTensor] = []
for _ in range(num_tensors):
gpu_tensor = torch.zeros(
(num_gpu_blocks, gpu_page_size_bytes),
dtype=torch.int8,
device=device,
)
kv_cache_tensors.append(
CanonicalKVCacheTensor(
tensor=gpu_tensor,
page_size_bytes=gpu_page_size_bytes,
)
)

kv_cache_groups_data_refs: list[list[CanonicalKVCacheRef]] = [
[
CanonicalKVCacheRef(
tensor_idx=g * tensors_per_group + i,
page_size_bytes=gpu_page_size_bytes,
)
for i in range(tensors_per_group)
]
for g in range(num_groups)
]

canonical_kv_caches = CanonicalKVCaches(
tensors=kv_cache_tensors, group_data_refs=kv_cache_groups_data_refs
)

handlers = CpuGpuOffloadingHandlers(
kv_caches=canonical_kv_caches,
block_size_factor=block_size_factor,
num_cpu_blocks=num_cpu_blocks,
)

# group 0: aligned, group 1: empty, group 2: unaligned on CPU->GPU
group_sizes_in_cpu_blocks = [num_mappings_per_group, 0, num_mappings_per_group]

total_cpu_blocks = sum(group_sizes_in_cpu_blocks)
total_gpu_blocks_needed = total_cpu_blocks * block_size_factor
gpu_blocks_all = random.sample(range(num_gpu_blocks), total_gpu_blocks_needed)
cpu_blocks_all = random.sample(range(num_cpu_blocks), total_cpu_blocks)

# split gpu/cpu blocks per group
gpu_blocks_per_group: list[list[int]] = []
cpu_blocks_per_group: list[list[int]] = []
gpu_offset = 0
cpu_offset = 0
for size in group_sizes_in_cpu_blocks:
gpu_count = size * block_size_factor
gpu_blocks_per_group.append(gpu_blocks_all[gpu_offset : gpu_offset + gpu_count])
cpu_blocks_per_group.append(cpu_blocks_all[cpu_offset : cpu_offset + size])
gpu_offset += gpu_count
cpu_offset += size

# expand cpu blocks to gpu-page granularity
cpu_blocks_expanded_per_group = [
[
cpu_block * block_size_factor + j
for cpu_block in cpu_blocks
for j in range(block_size_factor)
]
for cpu_blocks in cpu_blocks_per_group
]

# skip sub-blocks from group 2 to test unaligned transfers.
sub_blocks_to_skip = block_size_factor - 1 # e.g. 2 when block_size_factor=3
if sub_blocks_to_skip > 0:
gpu_blocks_per_group[2] = gpu_blocks_per_group[2][
sub_blocks_to_skip:-sub_blocks_to_skip
]
cpu_blocks_expanded_per_group[2] = cpu_blocks_expanded_per_group[2][
sub_blocks_to_skip:-sub_blocks_to_skip
]

# build flat gpu_blocks list and group_sizes in GPU blocks
gpu_blocks: list[int] = []
group_sizes: list[int] = []
for gpu_blks in gpu_blocks_per_group:
gpu_blocks.extend(gpu_blks)
group_sizes.append(len(gpu_blks))

# build flat cpu_blocks list
cpu_blocks = []
for cpu_blks in cpu_blocks_per_group:
cpu_blocks.extend(cpu_blks)

# block_indices: only relevant for unaligned transfers
block_indices: list[int] = [0, 0, sub_blocks_to_skip]

if gpu_to_cpu:
handler = handlers.gpu_to_cpu_handler
src_spec = GPULoadStoreSpec(
gpu_blocks, group_sizes=group_sizes, block_indices=block_indices
)
dst_spec = CPULoadStoreSpec(cpu_blocks)
# per-group mapping: cpu sub-block -> gpu sub-block
dst_to_src_per_group = [
dict(zip(expanded, gpu_blks))
for expanded, gpu_blks in zip(
cpu_blocks_expanded_per_group, gpu_blocks_per_group
)
]
num_dst_sub_blocks = num_cpu_blocks * block_size_factor
else:
handler = handlers.cpu_to_gpu_handler
src_spec = CPULoadStoreSpec(cpu_blocks)
dst_spec = GPULoadStoreSpec(
gpu_blocks, group_sizes=group_sizes, block_indices=block_indices
)
# per-group mapping: gpu sub-block -> cpu sub-block
dst_to_src_per_group = [
dict(zip(gpu_blks, expanded))
for gpu_blks, expanded in zip(
gpu_blocks_per_group, cpu_blocks_expanded_per_group
)
]
num_dst_sub_blocks = num_gpu_blocks

# randomize src and dst tensors before transfer
for tensor in handler.src_tensors:
tensor.random_()
for tensor in handler.dst_tensors:
tensor.random_()

orig_src_tensors = [x.clone() for x in handler.src_tensors]
orig_dst_tensors = [x.clone() for x in handler.dst_tensors]

assert handler.transfer_async(1, (src_spec, dst_spec))
assert {x.job_id for x in handler._transfers} == {1}

end_time = time.time() + 10
while time.time() < end_time:
finished = handler.get_finished()
if finished:
assert finished[0].job_id == 1
assert finished[0].success
expected_bytes = sum(
group_size * sum([x.page_size_bytes for x in data_refs])
for group_size, data_refs in zip(
group_sizes, handler.kv_cache_groups_data_refs
)
)
assert finished[0].transfer_size == expected_bytes
break
time.sleep(0.1)

# verify src tensors did not change
for orig_tensor, tensor in zip(orig_src_tensors, handler.src_tensors):
assert torch.equal(orig_tensor, tensor)

# verify dst tensors at gpu-page granularity
for group_idx, dst_to_src in enumerate(dst_to_src_per_group):
group_tensor_offset = group_idx * tensors_per_group
for tensor_idx in range(tensors_per_group):
src_tensor = handler.src_tensors[group_tensor_offset + tensor_idx]
dst_tensor = handler.dst_tensors[group_tensor_offset + tensor_idx]
orig_dst_tensor = orig_dst_tensors[group_tensor_offset + tensor_idx]
src_view = src_tensor.view(-1, gpu_page_size_bytes)
dst_view = dst_tensor.view(-1, gpu_page_size_bytes)
orig_dst_view = orig_dst_tensor.view(-1, gpu_page_size_bytes)
for dst_sub_block in range(num_dst_sub_blocks):
src_sub_block = dst_to_src.get(dst_sub_block)
if src_sub_block is not None:
expected = src_view[src_sub_block]
else:
expected = orig_dst_view[dst_sub_block]
torch.testing.assert_close(
dst_view[dst_sub_block].cpu(), expected.cpu()
)

handlers.cpu_to_gpu_handler.shutdown()
handlers.gpu_to_cpu_handler.shutdown()
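
As a side note on the index arithmetic these tests exercise: each offloaded CPU block spans block_size_factor GPU pages, so expanding CPU block ids to GPU-page granularity and dropping the first few sub-blocks is all it takes to model an unaligned transfer. The following is a minimal, self-contained sketch of that arithmetic (illustrative only, with made-up block ids; it mirrors the helper logic in the tests above rather than any vLLM API):

    # Minimal sketch of the CPU-block -> GPU-page expansion used by the tests above.
    # Block ids and block_size_factor are made up for illustration.

    block_size_factor = 4                    # GPU pages per offloaded CPU block
    cpu_blocks = [7, 2]                      # offloaded CPU block ids
    gpu_blocks = [11, 5, 9, 3, 8, 1, 6, 0]   # one GPU block per CPU sub-block

    # CPU block b covers sub-blocks b * factor .. b * factor + factor - 1.
    cpu_sub_blocks = [
        b * block_size_factor + j for b in cpu_blocks for j in range(block_size_factor)
    ]
    assert cpu_sub_blocks == [28, 29, 30, 31, 8, 9, 10, 11]

    # An unaligned transfer starts mid-way through the first CPU block: drop the
    # first (block_size_factor - 1) sub-blocks, as the first test does.
    blocks_to_skip = block_size_factor - 1
    gpu_trimmed = gpu_blocks[blocks_to_skip:]
    cpu_trimmed = cpu_sub_blocks[blocks_to_skip:]

    # Per-sub-block mapping used to verify the copy (dst -> src for CPU -> GPU).
    dst_to_src = dict(zip(gpu_trimmed, cpu_trimmed))
    assert dst_to_src == {3: 31, 8: 8, 1: 9, 6: 10, 0: 11}
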
@@ -381,7 +381,9 @@ def _get_reqs_to_store(self, scheduler_output: SchedulerOutput):
for i in range(self.config.block_size_factor):
src_block_ids.append(block_ids[gpu_block_idx + i])
src_spec = GPULoadStoreSpec(
src_block_ids, group_sizes=(len(src_block_ids),)
src_block_ids,
group_sizes=(len(src_block_ids),),
block_indices=(0,),
)

reqs_to_store[req_id] = (src_spec, dst_spec)
10 changes: 4 additions & 6 deletions vllm/v1/kv_offload/mediums.py
@@ -34,26 +34,24 @@ class GPULoadStoreSpec(BlockIDsLoadStoreSpec):
will correspond to logically contiguous blocks, e.g. blocks 5-10 of some request.
block_indices[i] will represent the block index of the first block in group #i.
Thus, len(block_indices) == len(group_sizes) = number of KV cache groups.
This information is required in order to support loading from offloaded blocks
This information is required in order to support off/loading from offloaded blocks
Collaborator:

nit: why is this off/loading? 😆

Collaborator (Author):

I think I meant offloading + onloading, but indeed it is even confusing to me :)

which are larger than GPU blocks.
In such cases, the first GPU block per each group may be unaligned to the offloaded
block size, and so knowing block_indices[i] allows the worker to correctly
skip part of the first matching offloaded block.
Offloading from GPU is always aligned to offloaded block size, and so
block_indices will only be set by the offloading connector when loading into GPU.
"""

def __init__(
self,
block_ids: list[int],
group_sizes: Sequence[int],
block_indices: Sequence[int] | None = None,
block_indices: Sequence[int],
):
super().__init__(block_ids)
assert sum(group_sizes) == len(block_ids)
assert block_indices is None or len(block_indices) == len(group_sizes)
assert len(block_indices) == len(group_sizes)
self.group_sizes: Sequence[int] = group_sizes
self.block_indices: Sequence[int] | None = block_indices
self.block_indices: Sequence[int] = block_indices

@staticmethod
def medium() -> str:
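
To make the new block_indices contract concrete, here is a small usage sketch modelled on the tests above. Only the constructor signature and the "first GPU block of a group may start mid-way through an offloaded block" semantics come from this diff; the import path, block ids, and block_size_factor are assumptions for illustration.

    # Usage sketch for block_indices (illustrative values; import path assumed
    # to match this PR's layout under vllm/v1/kv_offload/mediums.py).
    from vllm.v1.kv_offload.mediums import CPULoadStoreSpec, GPULoadStoreSpec

    block_size_factor = 4          # GPU blocks per offloaded CPU block (assumed)
    cpu_blocks = [7, 2]            # offloaded blocks to read from
    gpu_blocks = [3, 8, 1, 6, 0]   # destination GPU blocks, single KV cache group

    # The load starts at sub-block 3 of CPU block 7 (only its last GPU-sized page
    # is needed), so block_indices tells the worker to skip the first
    # block_size_factor - 1 sub-blocks of the first offloaded block.
    src_spec = CPULoadStoreSpec(cpu_blocks)
    dst_spec = GPULoadStoreSpec(
        gpu_blocks,
        group_sizes=(len(gpu_blocks),),          # one KV cache group
        block_indices=(block_size_factor - 1,),  # unaligned start within CPU block 7
    )

    # Offloading GPU -> CPU is always aligned, so the offloading connector passes
    # block_indices=(0,), as in the _get_reqs_to_store change above.
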