diff --git a/tests/v1/kv_connector/unit/offloading_connector/__init__.py b/tests/v1/kv_connector/unit/offloading_connector/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/v1/kv_connector/unit/offloading_connector/conftest.py b/tests/v1/kv_connector/unit/offloading_connector/conftest.py new file mode 100644 index 000000000000..f1066df285ee --- /dev/null +++ b/tests/v1/kv_connector/unit/offloading_connector/conftest.py @@ -0,0 +1,7 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +from tests.v1.kv_connector.unit.offloading_connector.utils import ( + request_runner, +) + +__all__ = ["request_runner"] diff --git a/tests/v1/kv_connector/unit/offloading_connector/test_metrics.py b/tests/v1/kv_connector/unit/offloading_connector/test_metrics.py new file mode 100644 index 000000000000..88ccb0aeb68d --- /dev/null +++ b/tests/v1/kv_connector/unit/offloading_connector/test_metrics.py @@ -0,0 +1,151 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +from vllm.distributed.kv_transfer.kv_connector.v1.offloading.metrics import ( + OffloadingConnectorStats, +) +from vllm.distributed.kv_transfer.kv_connector.v1.offloading_connector import ( + OffloadingConnector, +) + + +def test_build_kv_connector_stats_with_none(): + """Test that build_kv_connector_stats returns empty stats when given None.""" + stats = OffloadingConnector.build_kv_connector_stats(data=None) + + assert stats is not None + assert isinstance(stats, OffloadingConnectorStats) + assert len(stats.data) == 0 + assert stats.is_empty() + + +def test_build_kv_connector_stats_with_empty_dict(): + """Test that build_kv_connector_stats returns empty stats with empty dict.""" + stats = OffloadingConnector.build_kv_connector_stats(data={}) + + assert stats is not None + assert isinstance(stats, OffloadingConnectorStats) + assert len(stats.data) == 0 + assert stats.is_empty() + + +def test_build_kv_connector_stats_reconstructs_offload_stats(): + """Test that OffloadingConnector stats are properly reconstructed with + correct data.""" + serialized_data = { + "CPU_to_GPU": [ + {"op_size": 16, "op_time": 1.0}, + {"op_size": 8, "op_time": 0.5}, + ], + "GPU_to_CPU": [ + {"op_size": 1, "op_time": 0.1}, + {"op_size": 2, "op_time": 0.2}, + ], + } + + stats = OffloadingConnector.build_kv_connector_stats(data=serialized_data) + + offload_connector_stats = stats + assert isinstance(offload_connector_stats, OffloadingConnectorStats) + assert offload_connector_stats.data["CPU_to_GPU"] == [ + {"op_size": 16, "op_time": 1.0}, + {"op_size": 8, "op_time": 0.5}, + ] + assert offload_connector_stats.data["GPU_to_CPU"] == [ + {"op_size": 1, "op_time": 0.1}, + {"op_size": 2, "op_time": 0.2}, + ] + + +def test_aggregate_same_connector(): + """Test aggregating stats from the same connector type.""" + stats1 = OffloadingConnectorStats( + data={ + "CPU_to_GPU": [ + {"op_size": 16, "op_time": 1.0}, + {"op_size": 8, "op_time": 0.5}, + ], + "GPU_to_CPU": [ + {"op_size": 1, "op_time": 0.1}, + {"op_size": 2, "op_time": 0.2}, + ], + } + ) + + stats2 = OffloadingConnectorStats( + data={ + "CPU_to_GPU": [ + {"op_size": 3, "op_time": 0.2}, + {"op_size": 7, "op_time": 0.9}, + ], + "GPU_to_CPU": [{"op_size": 16, "op_time": 2}], + } + ) + + result = stats1.aggregate(stats2) + + assert result is stats1 # Should return self + offload_connector_stats = result + assert offload_connector_stats.data["CPU_to_GPU"] == [ + {"op_size": 16, 
"op_time": 1.0}, + {"op_size": 8, "op_time": 0.5}, + {"op_size": 3, "op_time": 0.2}, + {"op_size": 7, "op_time": 0.9}, + ] + assert offload_connector_stats.data["GPU_to_CPU"] == [ + {"op_size": 1, "op_time": 0.1}, + {"op_size": 2, "op_time": 0.2}, + {"op_size": 16, "op_time": 2}, + ] + + +def test_reduce(): + """Test that reduce() correctly reduces all nested connector stats.""" + stats = OffloadingConnectorStats( + data={ + "CPU_to_GPU": [ + {"op_size": 16, "op_time": 1.0}, + {"op_size": 8, "op_time": 0.5}, + {"op_size": 3, "op_time": 0.2}, + {"op_size": 7, "op_time": 0.9}, + ], + "GPU_to_CPU": [ + {"op_size": 1, "op_time": 0.1}, + {"op_size": 2, "op_time": 0.2}, + {"op_size": 16, "op_time": 2}, + ], + } + ) + + reduced = stats.reduce() + + assert isinstance(reduced, dict) + # Check that the stats were reduced (should have aggregated values) + assert "CPU_to_GPU_total_bytes" in reduced + assert "CPU_to_GPU_total_time" in reduced + assert "GPU_to_CPU_total_bytes" in reduced + assert "GPU_to_CPU_total_time" in reduced + assert reduced["CPU_to_GPU_total_bytes"] == 34 + assert reduced["CPU_to_GPU_total_time"] == 2.6 + assert reduced["GPU_to_CPU_total_time"] == 2.3 + assert reduced["GPU_to_CPU_total_bytes"] == 19 + + +def test_reset(): + """Test that reset() resets all nested connector stats.""" + offload_connector_stats = OffloadingConnectorStats( + data={ + "CPU_to_GPU": [ + {"op_size": 3, "op_time": 0.2}, + {"op_size": 7, "op_time": 0.9}, + ], + "GPU_to_CPU": [{"op_size": 16, "op_time": 2}], + } + ) + + assert not offload_connector_stats.is_empty() + + offload_connector_stats.reset() + + # After reset, stats should be empty + assert offload_connector_stats.is_empty() + assert len(offload_connector_stats.data) == 0 diff --git a/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py b/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py new file mode 100644 index 000000000000..e96ce29e95e6 --- /dev/null +++ b/tests/v1/kv_connector/unit/offloading_connector/test_scheduler.py @@ -0,0 +1,341 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +from collections.abc import Iterable + +import pytest + +from tests.v1.kv_connector.unit.offloading_connector.utils import ( + generate_store_output, +) +from tests.v1.kv_connector.unit.utils import EOS_TOKEN_ID +from vllm.distributed.kv_events import BlockRemoved, BlockStored +from vllm.v1.core.kv_cache_utils import BlockHash +from vllm.v1.kv_offload.abstract import OffloadingEvent +from vllm.v1.request import RequestStatus + + +@pytest.mark.parametrize("async_scheduling", [True, False]) +def test_offloading_connector(request_runner, async_scheduling: bool): + offloaded_block_size = 12 + gpu_block_size = 4 + num_gpu_blocks = 100 + block_size_factor = offloaded_block_size // gpu_block_size + + runner = request_runner( + offloaded_block_size=offloaded_block_size, + gpu_block_size=gpu_block_size, + num_gpu_blocks=num_gpu_blocks, + async_scheduling=async_scheduling, + ) + + # 3 blocks, store just the middle block (skip first and last) + # blocks = [0, 1, 2], [3, 4, 5], [6, 7, 8] + runner.new_request(token_ids=[0] * offloaded_block_size * 3) + runner.manager.prepare_store.side_effect = ( + lambda block_hashes: generate_store_output(list(block_hashes)[1:2]) + ) + runner.run(decoded_tokens=[0]) + + # add block missing 1 token -> no offload + runner.run( + decoded_tokens=[0] * (offloaded_block_size - 1), + expected_stored_gpu_block_indexes=(3, 4, 5), + ) + 
runner.manager.prepare_store.assert_not_called() + + # +1 token -> single block, fail prepare_store + runner.manager.prepare_store.side_effect = lambda block_hashes: None + runner.run(decoded_tokens=[0]) + runner.manager.prepare_store.assert_called() + + # 1 more block (+ token for async scheduling) + # now set block_hashes_to_store = [] + runner.manager.prepare_store.side_effect = ( + lambda block_hashes: generate_store_output([]) + ) + runner.run(decoded_tokens=[0] * (offloaded_block_size + 1)) + + # 1 more block (+ token for kicking off offloading) + # now check touch was called with all 6 blocks + runner.manager.prepare_store.side_effect = ( + lambda block_hashes: generate_store_output(block_hashes) + ) + runner.run( + decoded_tokens=[0] * (offloaded_block_size + 1), + expected_stored_gpu_block_indexes=(15, 16, 17), + ) + runner.manager.touch.assert_called() + block_hashes1 = list(runner.manager.touch.call_args.args[0]) + assert len(block_hashes1) == 6 + + # terminate request + runner.run(decoded_tokens=[EOS_TOKEN_ID]) + + # create a new request differing only on the last token + runner.new_request(token_ids=[0] * (offloaded_block_size * 6 - 1) + [1]) + runner.run(decoded_tokens=[0]) + runner.manager.touch.assert_called() + block_hashes2 = list(runner.manager.touch.call_args.args[0]) + assert len(block_hashes2) == 6 + + # verify hashes are the same, except for the last block + assert block_hashes1[:5] == block_hashes2[:5] + assert block_hashes1[5] != block_hashes2[5] + + # terminate request + runner.run( + decoded_tokens=[EOS_TOKEN_ID], + expected_stored_gpu_block_indexes=tuple(range(6 * block_size_factor)), + ) + + # full_block_tokens - num_computed_tokens < offloaded_block_size + runner.new_request( + token_ids=[0] * gpu_block_size + [1] * (offloaded_block_size - gpu_block_size) + ) + runner.manager.prepare_store.side_effect = ( + lambda block_hashes: generate_store_output([]) + ) + runner.run(decoded_tokens=[EOS_TOKEN_ID]) + runner.manager.lookup.assert_not_called() + + # single block lookup with no hits + runner.new_request(token_ids=[1] * offloaded_block_size) + runner.manager.prepare_store.side_effect = ( + lambda block_hashes: generate_store_output([]) + ) + runner.run(decoded_tokens=[EOS_TOKEN_ID]) + runner.manager.lookup.assert_called() + assert len(list(runner.manager.lookup.call_args.args[0])) == 1 + + # single block lookup with a hit + runner.scheduler.reset_prefix_cache() + runner.new_request(token_ids=[0] * offloaded_block_size) + runner.manager.prepare_store.side_effect = ( + lambda block_hashes: generate_store_output([]) + ) + runner.manager.lookup.return_value = 1 + runner.run( + decoded_tokens=[EOS_TOKEN_ID], expected_loaded_gpu_block_indexes=(0, 1, 2) + ) + + # single block lookup with a hit in a middle block + runner.new_request( + token_ids=[0] * offloaded_block_size * 2 + [1] * offloaded_block_size + ) + runner.manager.prepare_store.side_effect = ( + lambda block_hashes: generate_store_output([]) + ) + runner.manager.lookup.return_value = 1 + runner.run( + decoded_tokens=[EOS_TOKEN_ID], expected_loaded_gpu_block_indexes=(3, 4, 5) + ) + + # test take_events + def to_hashes(int_hashes: list[int]) -> list[BlockHash]: + return [BlockHash(str(i).encode()) for i in int_hashes] + + def take_events() -> Iterable[OffloadingEvent]: + yield OffloadingEvent( + block_hashes=to_hashes([1, 2, 3]), block_size=16, medium="A", removed=False + ) + yield OffloadingEvent( + block_hashes=to_hashes([4, 5, 6]), block_size=32, medium="B", removed=True + ) + + 
runner.manager.take_events.side_effect = take_events + events = list(runner.scheduler_connector.take_events()) + assert len(events) == 2 + event = events[0] + assert isinstance(event, BlockStored) + assert event.block_hashes == to_hashes([1, 2, 3]) + assert event.block_size == 16 + assert event.medium == "A" + assert event.token_ids == [] + assert event.parent_block_hash is None + assert event.lora_id is None + assert event.lora_name is None + event = events[1] + assert isinstance(event, BlockRemoved) + assert event.block_hashes == to_hashes([4, 5, 6]) + assert event.medium == "B" + + +@pytest.mark.parametrize("async_scheduling", [True, False]) +def test_request_preemption(request_runner, async_scheduling: bool): + offloaded_block_size = 12 + gpu_block_size = 4 + num_gpu_blocks = 100 + + runner = request_runner( + offloaded_block_size=offloaded_block_size, + gpu_block_size=gpu_block_size, + num_gpu_blocks=num_gpu_blocks, + async_scheduling=async_scheduling, + ) + + free_block_queue = runner.scheduler.kv_cache_manager.block_pool.free_block_queue + num_free_blocks_empty = free_block_queue.num_free_blocks + + # 2 blocks, store all, without flushing + # blocks = [0, 1, 2], [3, 4, 5] + runner.new_request(token_ids=[0] * offloaded_block_size * 2) + runner.manager.prepare_store.side_effect = ( + lambda block_hashes: generate_store_output(block_hashes) + ) + runner.run( + decoded_tokens=[0], + complete_transfers=False, + ) + + # decode 2 more blocks - 1 gpu block, storing [6, 7, 8] (no flush) + runner.manager.prepare_store.side_effect = ( + lambda block_hashes: generate_store_output(block_hashes) + ) + runner.run( + decoded_tokens=[0] * (2 * offloaded_block_size - gpu_block_size), + complete_transfers=False, + ) + + # simulate KV cache running out of space + free_block_queue.num_free_blocks = 0 + + # request should be preempted now + runner.run( + decoded_tokens=[], + complete_transfers=False, + expected_flushed_gpu_block_indexes=(0, 1, 2, 3, 4, 5, 6, 7, 8), + expected_stored_gpu_block_indexes=(0, 1, 2, 3, 4, 5, 6, 7, 8), + ) + + # restore KV cache space and reset GPU prefix cache + free_block_queue.num_free_blocks = num_free_blocks_empty + runner.scheduler.reset_prefix_cache() + + # request should now return from preemption + # re-load [0, ..., 8] from the CPU and store [9, 10, 11] + runner.manager.lookup.return_value = 3 + runner.manager.prepare_store.side_effect = ( + lambda block_hashes: generate_store_output(block_hashes) + ) + runner.run( + decoded_tokens=[0] * gpu_block_size, + expected_loaded_gpu_block_indexes=(0, 1, 2, 3, 4, 5, 6, 7, 8), + ) + + runner.run( + decoded_tokens=[EOS_TOKEN_ID], + expected_stored_gpu_block_indexes=(9, 10, 11), + ) + + +@pytest.mark.parametrize("async_scheduling", [True, False]) +def test_concurrent_lookups_of_the_same_prefix(request_runner, async_scheduling: bool): + offloaded_block_size = 12 + gpu_block_size = 4 + num_gpu_blocks = 100 + + runner = request_runner( + offloaded_block_size=offloaded_block_size, + gpu_block_size=gpu_block_size, + num_gpu_blocks=num_gpu_blocks, + async_scheduling=async_scheduling, + ) + + # store 1 blocks + runner.new_request(token_ids=[0] * offloaded_block_size) + runner.manager.prepare_store.side_effect = ( + lambda block_hashes: generate_store_output(block_hashes) + ) + runner.run( + decoded_tokens=[EOS_TOKEN_ID], + expected_stored_gpu_block_indexes=(0, 1, 2), + ) + + # start a request to load the first block, but don't complete + runner.scheduler.reset_prefix_cache() + runner.new_request(token_ids=[0] * offloaded_block_size) + 
runner.manager.lookup.return_value = 1 + runner.run( + decoded_tokens=[], + complete_transfers=False, + ) + + # request triggered a load + transfer_jobs = list(runner.offloading_spec.handler.transfer_specs) + assert transfer_jobs + + # start a new request to load the same first block + runner.new_request(token_ids=[0] * offloaded_block_size) + runner.manager.lookup.return_value = 1 + runner.run( + decoded_tokens=[], + complete_transfers=False, + ) + + # request did not trigger a load + assert transfer_jobs == list(runner.offloading_spec.handler.transfer_specs) + + # complete transfers + runner.manager.prepare_store.side_effect = ( + lambda block_hashes: generate_store_output([]) + ) + runner.run( + decoded_tokens=[EOS_TOKEN_ID], + expected_loaded_gpu_block_indexes=(0, 1, 2), + ) + + # second request will use the GPU prefix cache + assert transfer_jobs == list(runner.offloading_spec.handler.transfer_specs) + + +@pytest.mark.parametrize("async_scheduling", [True, False]) +def test_abort_loading_requests(request_runner, async_scheduling: bool): + offloaded_block_size = 12 + gpu_block_size = 4 + num_gpu_blocks = 100 + + runner = request_runner( + offloaded_block_size=offloaded_block_size, + gpu_block_size=gpu_block_size, + num_gpu_blocks=num_gpu_blocks, + async_scheduling=async_scheduling, + ) + + # store 1 blocks + runner.new_request(token_ids=[0] * offloaded_block_size) + runner.manager.prepare_store.side_effect = ( + lambda block_hashes: generate_store_output(block_hashes) + ) + runner.run( + decoded_tokens=[EOS_TOKEN_ID], + expected_stored_gpu_block_indexes=(0, 1, 2), + ) + + # start a request to load the first block, but don't complete + runner.scheduler.reset_prefix_cache() + runner.new_request(token_ids=[0] * offloaded_block_size) + runner.manager.lookup.return_value = 1 + runner.run( + decoded_tokens=[], + complete_transfers=False, + ) + + # request triggered a load + transfer_jobs = list(runner.offloading_spec.handler.transfer_specs) + assert transfer_jobs + + # abort request + req_id = str(runner.req_id) + runner.scheduler.finish_requests((req_id,), RequestStatus.FINISHED_ABORTED) + + # verify request is not deleted + assert req_id in runner.scheduler.requests + + # complete loading request + runner.run( + decoded_tokens=[], + expected_loaded_gpu_block_indexes=(0, 1, 2), + ) + + # assert request is deleted + assert req_id not in runner.scheduler.requests diff --git a/tests/v1/kv_connector/unit/offloading_connector/test_worker.py b/tests/v1/kv_connector/unit/offloading_connector/test_worker.py new file mode 100644 index 000000000000..3cfb32b3068f --- /dev/null +++ b/tests/v1/kv_connector/unit/offloading_connector/test_worker.py @@ -0,0 +1,504 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +from collections import defaultdict +from unittest.mock import MagicMock, patch + +import pytest +import torch + +from vllm.platforms import current_platform +from vllm.utils.torch_utils import get_dtype_size +from vllm.v1.attention.backend import AttentionBackend +from vllm.v1.attention.backends.registry import AttentionBackendEnum +from vllm.v1.attention.backends.utils import set_kv_cache_layout +from vllm.v1.kv_cache_interface import ( + FullAttentionSpec, + KVCacheConfig, + KVCacheGroupSpec, + KVCacheTensor, + MambaSpec, + MLAAttentionSpec, + UniformTypeKVCacheSpecs, +) +from vllm.v1.kv_offload.spec import ( + CanonicalKVCacheRef, + CanonicalKVCaches, + OffloadingSpec, +) + +NUM_BLOCKS = 10 +BLOCK_SIZE = 16 +NUM_KV_HEADS = 4 
+HEAD_SIZE = 64 +DTYPE = torch.float16 + +# Attention backends to test +ATTN_BACKENDS: list[str] = [] +if current_platform.is_cuda(): + ATTN_BACKENDS = [ + "FLASH_ATTN", + "FLEX_ATTENTION", + "FLASHINFER", + "TRITON_ATTN", + ] +elif current_platform.is_rocm(): + ATTN_BACKENDS = ["TRITON_ATTN"] + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _allocate_and_reshape_kv_caches( + kv_cache_config: KVCacheConfig, + attn_groups: list[list], + device: torch.device, +): + """ + Use the real GPUModelRunner allocation and reshape methods to produce + kv_caches, just like the model runner does during initialization. + """ + from vllm.v1.worker.gpu_model_runner import GPUModelRunner + + # Some backends (e.g. FlashAttention) query the KV cache layout during + # reshape, which ultimately calls get_current_vllm_config(). Setting + # the layout override avoids needing a full VllmConfig context. + set_kv_cache_layout("NHD") + try: + runner = object.__new__(GPUModelRunner) + runner.device = device + runner.runner_only_attn_layers = set() + runner.attn_groups = attn_groups + runner.kv_cache_config = kv_cache_config + runner.cache_config = MagicMock(cache_dtype="auto") + runner.shared_kv_cache_layers = {} + runner.model_config = MagicMock() + runner.model_config.hf_config.model_type = "" + runner.compilation_config = MagicMock( + static_forward_context=defaultdict(MagicMock) + ) + runner.kv_caches = [] + + kernel_block_sizes = [BLOCK_SIZE] * len(kv_cache_config.kv_cache_groups) + return runner.initialize_kv_cache_tensors(kv_cache_config, kernel_block_sizes) + finally: + set_kv_cache_layout(None) + + +def _make_mock_layer(backend_cls: type[AttentionBackend]): + """ + Create a mock AttentionLayerBase whose get_attn_backend returns backend_cls. + """ + layer = MagicMock() + layer.get_attn_backend.return_value = backend_cls + return layer + + +def _make_worker(kv_cache_config: KVCacheConfig): + """ + Create an OffloadingConnectorWorker with mocked dependencies. + """ + from vllm.distributed.kv_transfer.kv_connector.v1.offloading.worker import ( + OffloadingConnectorWorker, + ) + + spec = MagicMock(spec=OffloadingSpec) + spec.kv_cache_config = kv_cache_config + spec.vllm_config = MagicMock() + spec.get_handlers.return_value = iter([]) + + worker = OffloadingConnectorWorker(spec=spec) + worker.worker = MagicMock() + + return worker, spec + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("backend", ATTN_BACKENDS) +@patch( + "vllm.distributed.kv_transfer.kv_connector.v1.offloading" + ".worker.get_layers_from_vllm_config" +) +def test_register_kv_caches(mock_get_layers, backend): + """Test register_kv_caches with multiple groups covering all layer types. + + Creates one FullAttention group, one MLA group, one Mamba group, and + one Mamba-padded group. Each group has GROUP_SIZE layers. + + KVCacheTensors are shared across all groups mirroring the real allocation + in kv_cache_utils.py: tensor i is shared by layer i from every group. + The padded-mamba group has a different page size so its layers get their + own dedicated tensors. + + Uses the real GPUModelRunner.initialize_kv_cache_tensors to produce + kv_caches, which automatically applies + _update_hybrid_attention_mamba_layout for hybrid models. 
+ + Verifies that the canonicalized CanonicalKVCaches has the correct + block tensors, tensor_idx references, and page sizes across all groups. + """ + from vllm.v1.attention.backends.mla.indexer import ( + DeepseekV32IndexerBackend, + ) + from vllm.v1.worker.utils import AttentionGroup + + MLA_HEAD_SIZE = NUM_KV_HEADS * HEAD_SIZE * 2 + + # padded mamba (missing HEAD_SIZE) + CONV_STATE_SHAPE = (BLOCK_SIZE * NUM_KV_HEADS, HEAD_SIZE) + UNALIGNED_SSM_STATE_SHAPE = (BLOCK_SIZE * NUM_KV_HEADS - 1, HEAD_SIZE) + + PAGE_SIZE_BYTES = 2 * BLOCK_SIZE * NUM_KV_HEADS * HEAD_SIZE * get_dtype_size(DTYPE) + unaligned_mamba_page_size = PAGE_SIZE_BYTES - HEAD_SIZE * get_dtype_size(DTYPE) + + # unpadded mamba (fills page exactly) + ALIGNED_SSM_STATE_SHAPE = (BLOCK_SIZE * NUM_KV_HEADS, HEAD_SIZE) + + backend_cls = AttentionBackendEnum[backend].get_class() + + attn_spec = FullAttentionSpec( + block_size=BLOCK_SIZE, + num_kv_heads=NUM_KV_HEADS, + head_size=HEAD_SIZE, + dtype=DTYPE, + ) + mla_spec = MLAAttentionSpec( + block_size=BLOCK_SIZE, + num_kv_heads=1, + head_size=MLA_HEAD_SIZE, + dtype=DTYPE, + ) + unaligned_mamba_spec = MambaSpec( + block_size=BLOCK_SIZE, + shapes=(CONV_STATE_SHAPE, UNALIGNED_SSM_STATE_SHAPE), + dtypes=(DTYPE, DTYPE), + page_size_padded=PAGE_SIZE_BYTES, + ) + aligned_mamba_spec = MambaSpec( + block_size=BLOCK_SIZE, + shapes=(CONV_STATE_SHAPE, ALIGNED_SSM_STATE_SHAPE), + dtypes=(DTYPE, DTYPE), + page_size_padded=PAGE_SIZE_BYTES, + ) + + assert attn_spec.page_size_bytes == PAGE_SIZE_BYTES + assert mla_spec.page_size_bytes == PAGE_SIZE_BYTES + assert unaligned_mamba_spec.page_size_bytes == PAGE_SIZE_BYTES + assert aligned_mamba_spec.page_size_bytes == PAGE_SIZE_BYTES + + GROUP_SIZE = 3 + + # -- Build per-group layer info ---------------------------------------- + layer_idx = 0 + + attn_layer_names = [] + for _ in range(GROUP_SIZE): + attn_layer_names.append(f"model.layers.{layer_idx}.self_attn") + layer_idx += 1 + + mla_layer_names = [] + for _ in range(GROUP_SIZE): + mla_layer_names.append(f"model.layers.{layer_idx}.self_attn") + layer_idx += 1 + + unaligned_mamba_layer_names = [] + for _ in range(GROUP_SIZE): + unaligned_mamba_layer_names.append(f"model.layers.{layer_idx}.mamba_unpadded") + layer_idx += 1 + + aligned_mamba_layer_names = [] + for _ in range(GROUP_SIZE - 1): + aligned_mamba_layer_names.append(f"model.layers.{layer_idx}.mamba_padded") + layer_idx += 1 + + layer_groups = [ + attn_layer_names, + mla_layer_names, + unaligned_mamba_layer_names, + aligned_mamba_layer_names, + ] + + kv_cache_tensors: list[KVCacheTensor] = [] + for i in range(GROUP_SIZE): + shared_by: list[str] = [] + for group_layer_names in layer_groups: + if len(group_layer_names) > i: + shared_by.append(group_layer_names[i]) + kv_cache_tensors.append( + KVCacheTensor( + size=PAGE_SIZE_BYTES * NUM_BLOCKS, + shared_by=shared_by, + ) + ) + + kv_cache_groups = [ + KVCacheGroupSpec(layer_names=attn_layer_names, kv_cache_spec=attn_spec), + KVCacheGroupSpec(layer_names=mla_layer_names, kv_cache_spec=mla_spec), + KVCacheGroupSpec( + layer_names=unaligned_mamba_layer_names, kv_cache_spec=unaligned_mamba_spec + ), + KVCacheGroupSpec( + layer_names=aligned_mamba_layer_names, kv_cache_spec=aligned_mamba_spec + ), + ] + + attn_groups = [ + [ + AttentionGroup( + backend=backend_cls, + layer_names=attn_layer_names, + kv_cache_spec=attn_spec, + kv_cache_group_id=0, + ), + AttentionGroup( + backend=DeepseekV32IndexerBackend, + layer_names=mla_layer_names, + kv_cache_spec=mla_spec, + kv_cache_group_id=1, + ), + AttentionGroup( 
+ backend=DeepseekV32IndexerBackend, # unused for mamba + layer_names=unaligned_mamba_layer_names, + kv_cache_spec=unaligned_mamba_spec, + kv_cache_group_id=2, + ), + AttentionGroup( + backend=DeepseekV32IndexerBackend, # unused for mamba + layer_names=aligned_mamba_layer_names, + kv_cache_spec=aligned_mamba_spec, + kv_cache_group_id=3, + ), + ] + ] + + kv_cache_config = KVCacheConfig( + num_blocks=NUM_BLOCKS, + kv_cache_tensors=kv_cache_tensors, + kv_cache_groups=kv_cache_groups, + ) + + kv_caches = _allocate_and_reshape_kv_caches( + kv_cache_config, + attn_groups, + device=torch.device("cuda:0"), + ) + + mock_layers: dict[str, MagicMock] = {} + for layer_name in attn_layer_names: + mock_layers[layer_name] = _make_mock_layer(backend_cls) + for layer_name in mla_layer_names: + mock_layers[layer_name] = _make_mock_layer(DeepseekV32IndexerBackend) + mock_get_layers.return_value = mock_layers + + worker, spec = _make_worker(kv_cache_config) + worker.register_kv_caches(kv_caches) + + canonical = spec.get_handlers.call_args[0][0] + assert isinstance(canonical, CanonicalKVCaches) + + # -- Expected block tensors ---------------------------------------------- + # All tensors have the same padded page size (PAGE_SIZE_BYTES). + # Tensor 0: shared by attn[0], mla[0], mamba_unaligned[0], mamba_aligned[0] + # Tensor 1: shared by attn[1], mla[1], mamba_unaligned[1], mamba_aligned[1] + # Tensor 2: shared by attn[2], mla[2], mamba_unaligned[2] + # (mamba_aligned has only GROUP_SIZE-1 = 2 layers) + expected_tensors = [ + (NUM_BLOCKS, PAGE_SIZE_BYTES), + (NUM_BLOCKS, PAGE_SIZE_BYTES), + (NUM_BLOCKS, PAGE_SIZE_BYTES), + ] + + # -- Expected group data refs (order matches kv_cache_groups) ------------- + ref = CanonicalKVCacheRef + expected_group_refs = [ + # attn group: layers attn[0..2] → tensors 0,1,2 with full page size + [ + ref(tensor_idx=0, page_size_bytes=PAGE_SIZE_BYTES), + ref(tensor_idx=1, page_size_bytes=PAGE_SIZE_BYTES), + ref(tensor_idx=2, page_size_bytes=PAGE_SIZE_BYTES), + ], + # mla group: layers mla[0..2] → tensors 0,1,2 with full page size + [ + ref(tensor_idx=0, page_size_bytes=PAGE_SIZE_BYTES), + ref(tensor_idx=1, page_size_bytes=PAGE_SIZE_BYTES), + ref(tensor_idx=2, page_size_bytes=PAGE_SIZE_BYTES), + ], + # unaligned mamba group: layers [0..2] → tensors 0,1,2 with unaligned page + [ + ref(tensor_idx=0, page_size_bytes=unaligned_mamba_page_size), + ref(tensor_idx=1, page_size_bytes=unaligned_mamba_page_size), + ref(tensor_idx=2, page_size_bytes=unaligned_mamba_page_size), + ], + # aligned mamba group: layers [0..1] → tensors 0,1 with full page size + [ + ref(tensor_idx=0, page_size_bytes=PAGE_SIZE_BYTES), + ref(tensor_idx=1, page_size_bytes=PAGE_SIZE_BYTES), + ], + ] + + # Verify block tensors + assert len(canonical.tensors) == len(expected_tensors) + for block_tensor, (exp_num_blocks, exp_page_size) in zip( + canonical.tensors, expected_tensors + ): + tensor = block_tensor.tensor + assert tensor.dtype == torch.int8 + assert tensor.shape == (exp_num_blocks, exp_page_size) + assert block_tensor.page_size_bytes == exp_page_size + + # Verify group data refs + assert len(canonical.group_data_refs) == len(expected_group_refs) + for actual_refs, exp_refs in zip(canonical.group_data_refs, expected_group_refs): + assert len(actual_refs) == len(exp_refs) + for actual, expected in zip(actual_refs, exp_refs): + assert actual.tensor_idx == expected.tensor_idx + assert actual.page_size_bytes == expected.page_size_bytes + + +@pytest.mark.parametrize("backend", ATTN_BACKENDS) +@patch( + 
"vllm.distributed.kv_transfer.kv_connector.v1.offloading" + ".worker.get_layers_from_vllm_config" +) +def test_register_kv_caches_uniform_type(mock_get_layers, backend): + """Test register_kv_caches with UniformTypeKVCacheSpecs. + + Two attention layers use the same backend but different num_kv_heads, + giving them different per-layer page sizes. Each has its own + KVCacheTensor and are wrapped in a UniformTypeKVCacheSpecs group. + Verifies that each layer gets the correct tensor_idx and + page_size_bytes in its block data ref. + """ + from vllm.v1.worker.utils import AttentionGroup + + backend_cls = AttentionBackendEnum[backend].get_class() + + layer_a = "model.layers.0.self_attn" + layer_b = "model.layers.1.self_attn" + spec_a = FullAttentionSpec( + block_size=BLOCK_SIZE, + num_kv_heads=NUM_KV_HEADS, + head_size=HEAD_SIZE, + dtype=DTYPE, + ) + spec_b = FullAttentionSpec( + block_size=BLOCK_SIZE, + num_kv_heads=NUM_KV_HEADS * 2, + head_size=HEAD_SIZE, + dtype=DTYPE, + ) + assert spec_a.page_size_bytes != spec_b.page_size_bytes + + uniform_spec = UniformTypeKVCacheSpecs( + block_size=BLOCK_SIZE, + kv_cache_specs={layer_a: spec_a, layer_b: spec_b}, + ) + + kv_cache_config = KVCacheConfig( + num_blocks=NUM_BLOCKS, + kv_cache_tensors=[ + KVCacheTensor( + size=spec_a.page_size_bytes * NUM_BLOCKS, + shared_by=[layer_a], + ), + KVCacheTensor( + size=spec_b.page_size_bytes * NUM_BLOCKS, + shared_by=[layer_b], + ), + ], + kv_cache_groups=[ + KVCacheGroupSpec( + layer_names=[layer_a, layer_b], + kv_cache_spec=uniform_spec, + ) + ], + ) + + attn_groups = [ + [ + AttentionGroup( + backend=backend_cls, + layer_names=[layer_a], + kv_cache_spec=spec_a, + kv_cache_group_id=0, + ), + AttentionGroup( + backend=backend_cls, + layer_names=[layer_b], + kv_cache_spec=spec_b, + kv_cache_group_id=0, + ), + ] + ] + + kv_caches = _allocate_and_reshape_kv_caches( + kv_cache_config, + attn_groups, + device=torch.device("cuda:0"), + ) + + mock_get_layers.return_value = { + layer_a: _make_mock_layer(backend_cls), + layer_b: _make_mock_layer(backend_cls), + } + + worker, spec = _make_worker(kv_cache_config) + worker.register_kv_caches(kv_caches) + + canonical = spec.get_handlers.call_args[0][0] + assert isinstance(canonical, CanonicalKVCaches) + + unbinds = backend_cls.get_name() in ("FLASH_ATTN", "FLEX_ATTENTION") + tensors_per_layer = 2 if unbinds else 1 + + for block_tensor in canonical.tensors: + assert block_tensor.tensor.dtype == torch.int8 + + # Single group with refs from both layers + assert len(canonical.group_data_refs) == 1 + group_refs = canonical.group_data_refs[0] + assert len(group_refs) == 2 * tensors_per_layer + + if unbinds: + half_a = spec_a.page_size_bytes // 2 + half_b = spec_b.page_size_bytes // 2 + + assert len(canonical.tensors) == 4 + assert canonical.tensors[0].page_size_bytes == half_a + assert canonical.tensors[1].page_size_bytes == half_a + assert canonical.tensors[2].page_size_bytes == half_b + assert canonical.tensors[3].page_size_bytes == half_b + assert canonical.tensors[0].tensor.shape == (NUM_BLOCKS, half_a) + assert canonical.tensors[1].tensor.shape == (NUM_BLOCKS, half_a) + assert canonical.tensors[2].tensor.shape == (NUM_BLOCKS, half_b) + assert canonical.tensors[3].tensor.shape == (NUM_BLOCKS, half_b) + + assert group_refs[0] == CanonicalKVCacheRef( + tensor_idx=0, page_size_bytes=half_a + ) + assert group_refs[1] == CanonicalKVCacheRef( + tensor_idx=1, page_size_bytes=half_a + ) + assert group_refs[2] == CanonicalKVCacheRef( + tensor_idx=2, page_size_bytes=half_b + ) + assert 
group_refs[3] == CanonicalKVCacheRef( + tensor_idx=3, page_size_bytes=half_b + ) + else: + assert len(canonical.tensors) == 2 + assert canonical.tensors[0].page_size_bytes == spec_a.page_size_bytes + assert canonical.tensors[1].page_size_bytes == spec_b.page_size_bytes + assert canonical.tensors[0].tensor.shape == (NUM_BLOCKS, spec_a.page_size_bytes) + assert canonical.tensors[1].tensor.shape == (NUM_BLOCKS, spec_b.page_size_bytes) + + assert group_refs[0] == CanonicalKVCacheRef( + tensor_idx=0, page_size_bytes=spec_a.page_size_bytes + ) + assert group_refs[1] == CanonicalKVCacheRef( + tensor_idx=1, page_size_bytes=spec_b.page_size_bytes + ) diff --git a/tests/v1/kv_connector/unit/test_offloading_connector.py b/tests/v1/kv_connector/unit/offloading_connector/utils.py similarity index 52% rename from tests/v1/kv_connector/unit/test_offloading_connector.py rename to tests/v1/kv_connector/unit/offloading_connector/utils.py index ba65f5bad7ff..e051f239773c 100644 --- a/tests/v1/kv_connector/unit/test_offloading_connector.py +++ b/tests/v1/kv_connector/unit/offloading_connector/utils.py @@ -9,16 +9,17 @@ import pytest import torch +from tests.v1.kv_connector.unit.utils import ( + EOS_TOKEN_ID, + create_model_runner_output, + create_vllm_config, +) from vllm import SamplingParams -from vllm.config import KVTransferConfig, VllmConfig -from vllm.distributed.kv_events import BlockRemoved, BlockStored +from vllm.config import KVTransferConfig, VllmConfig, set_current_vllm_config from vllm.distributed.kv_transfer.kv_connector.v1 import KVConnectorRole from vllm.distributed.kv_transfer.kv_connector.v1.offloading.common import ( OffloadingConnectorMetadata, ) -from vllm.distributed.kv_transfer.kv_connector.v1.offloading.metrics import ( - OffloadingConnectorStats, -) from vllm.distributed.kv_transfer.kv_connector.v1.offloading_connector import ( OffloadingConnector, ) @@ -39,7 +40,6 @@ ) from vllm.v1.kv_offload.abstract import ( LoadStoreSpec, - OffloadingEvent, OffloadingManager, PrepareStoreOutput, ) @@ -51,15 +51,9 @@ TransferSpec, ) from vllm.v1.outputs import EMPTY_MODEL_RUNNER_OUTPUT, KVConnectorOutput -from vllm.v1.request import Request, RequestStatus +from vllm.v1.request import Request from vllm.v1.structured_output import StructuredOutputManager -from .utils import ( - EOS_TOKEN_ID, - create_model_runner_output, - create_vllm_config, -) - class MockLoadStoreSpec(LoadStoreSpec): def __init__(self, block_hashes: Iterable[BlockHash]): @@ -125,7 +119,7 @@ def get_manager(self) -> OffloadingManager: return self.manager def get_handlers( - self, _, __ + self, _ ) -> Iterator[tuple[type[LoadStoreSpec], type[LoadStoreSpec], OffloadingHandler]]: yield GPULoadStoreSpec, MockLoadStoreSpec, self.handler yield MockLoadStoreSpec, GPULoadStoreSpec, self.handler @@ -179,7 +173,7 @@ def __init__( kv_role="kv_both", kv_connector_extra_config={ "spec_name": "MockOffloadingSpec", - "spec_module_path": "tests.v1.kv_connector.unit.test_offloading_connector", # noqa: E501 + "spec_module_path": "tests.v1.kv_connector.unit.offloading_connector.utils", # noqa: E501 "block_size": offloaded_block_size, }, ) @@ -217,10 +211,12 @@ def __init__( ) # register worker kv_caches to enable OffloadingWorker creations - self.worker_connector.register_cross_layers_kv_cache( - kv_cache=torch.empty(0), - attn_backend=FlashAttentionBackend, - ) + # set_current_vllm_config is needed for get_kv_cache_layout() to work + with set_current_vllm_config(vllm_config): + self.worker_connector.register_cross_layers_kv_cache( + 
kv_cache=torch.empty(0), + attn_backend=FlashAttentionBackend, + ) # extract connector of scheduler scheduler_connector = self.scheduler.connector @@ -521,471 +517,3 @@ def generate_store_output(block_hashes: Iterable[BlockHash]): store_spec=MockLoadStoreSpec(block_hashes), block_hashes_evicted=[], ) - - -@pytest.mark.parametrize("async_scheduling", [True, False]) -def test_offloading_connector(request_runner, async_scheduling: bool): - offloaded_block_size = 12 - gpu_block_size = 4 - num_gpu_blocks = 100 - block_size_factor = offloaded_block_size // gpu_block_size - - runner = request_runner( - offloaded_block_size=offloaded_block_size, - gpu_block_size=gpu_block_size, - num_gpu_blocks=num_gpu_blocks, - async_scheduling=async_scheduling, - ) - - # 3 blocks, store just the middle block (skip first and last) - # blocks = [0, 1, 2], [3, 4, 5], [6, 7, 8] - runner.new_request(token_ids=[0] * offloaded_block_size * 3) - runner.manager.prepare_store.side_effect = ( - lambda block_hashes: generate_store_output(list(block_hashes)[1:2]) - ) - runner.run(decoded_tokens=[0]) - - # add block missing 1 token -> no offload - runner.run( - decoded_tokens=[0] * (offloaded_block_size - 1), - expected_stored_gpu_block_indexes=(3, 4, 5), - ) - runner.manager.prepare_store.assert_not_called() - - # +1 token -> single block, fail prepare_store - runner.manager.prepare_store.side_effect = lambda block_hashes: None - runner.run(decoded_tokens=[0]) - runner.manager.prepare_store.assert_called() - - # 1 more block (+ token for async scheduling) - # now set block_hashes_to_store = [] - runner.manager.prepare_store.side_effect = ( - lambda block_hashes: generate_store_output([]) - ) - runner.run(decoded_tokens=[0] * (offloaded_block_size + 1)) - - # 1 more block (+ token for kicking off offloading) - # now check touch was called with all 6 blocks - runner.manager.prepare_store.side_effect = ( - lambda block_hashes: generate_store_output(block_hashes) - ) - runner.run( - decoded_tokens=[0] * (offloaded_block_size + 1), - expected_stored_gpu_block_indexes=(15, 16, 17), - ) - runner.manager.touch.assert_called() - block_hashes1 = list(runner.manager.touch.call_args.args[0]) - assert len(block_hashes1) == 6 - - # terminate request - runner.run(decoded_tokens=[EOS_TOKEN_ID]) - - # create a new request differing only on the last token - runner.new_request(token_ids=[0] * (offloaded_block_size * 6 - 1) + [1]) - runner.run(decoded_tokens=[0]) - runner.manager.touch.assert_called() - block_hashes2 = list(runner.manager.touch.call_args.args[0]) - assert len(block_hashes2) == 6 - - # verify hashes are the same, except for the last block - assert block_hashes1[:5] == block_hashes2[:5] - assert block_hashes1[5] != block_hashes2[5] - - # terminate request - runner.run( - decoded_tokens=[EOS_TOKEN_ID], - expected_stored_gpu_block_indexes=tuple(range(6 * block_size_factor)), - ) - - # full_block_tokens - num_computed_tokens < offloaded_block_size - runner.new_request( - token_ids=[0] * gpu_block_size + [1] * (offloaded_block_size - gpu_block_size) - ) - runner.manager.prepare_store.side_effect = ( - lambda block_hashes: generate_store_output([]) - ) - runner.run(decoded_tokens=[EOS_TOKEN_ID]) - runner.manager.lookup.assert_not_called() - - # single block lookup with no hits - runner.new_request(token_ids=[1] * offloaded_block_size) - runner.manager.prepare_store.side_effect = ( - lambda block_hashes: generate_store_output([]) - ) - runner.run(decoded_tokens=[EOS_TOKEN_ID]) - runner.manager.lookup.assert_called() - assert 
len(list(runner.manager.lookup.call_args.args[0])) == 1 - - # single block lookup with a hit - runner.scheduler.reset_prefix_cache() - runner.new_request(token_ids=[0] * offloaded_block_size) - runner.manager.prepare_store.side_effect = ( - lambda block_hashes: generate_store_output([]) - ) - runner.manager.lookup.return_value = 1 - runner.run( - decoded_tokens=[EOS_TOKEN_ID], expected_loaded_gpu_block_indexes=(0, 1, 2) - ) - - # single block lookup with a hit in a middle block - runner.new_request( - token_ids=[0] * offloaded_block_size * 2 + [1] * offloaded_block_size - ) - runner.manager.prepare_store.side_effect = ( - lambda block_hashes: generate_store_output([]) - ) - runner.manager.lookup.return_value = 1 - runner.run( - decoded_tokens=[EOS_TOKEN_ID], expected_loaded_gpu_block_indexes=(3, 4, 5) - ) - - # test take_events - def to_hashes(int_hashes: list[int]) -> list[BlockHash]: - return [BlockHash(str(i).encode()) for i in int_hashes] - - def take_events() -> Iterable[OffloadingEvent]: - yield OffloadingEvent( - block_hashes=to_hashes([1, 2, 3]), block_size=16, medium="A", removed=False - ) - yield OffloadingEvent( - block_hashes=to_hashes([4, 5, 6]), block_size=32, medium="B", removed=True - ) - - runner.manager.take_events.side_effect = take_events - events = list(runner.scheduler_connector.take_events()) - assert len(events) == 2 - event = events[0] - assert isinstance(event, BlockStored) - assert event.block_hashes == to_hashes([1, 2, 3]) - assert event.block_size == 16 - assert event.medium == "A" - assert event.token_ids == [] - assert event.parent_block_hash is None - assert event.lora_id is None - assert event.lora_name is None - event = events[1] - assert isinstance(event, BlockRemoved) - assert event.block_hashes == to_hashes([4, 5, 6]) - assert event.medium == "B" - - -@pytest.mark.parametrize("async_scheduling", [True, False]) -def test_request_preemption(request_runner, async_scheduling: bool): - offloaded_block_size = 12 - gpu_block_size = 4 - num_gpu_blocks = 100 - - runner = request_runner( - offloaded_block_size=offloaded_block_size, - gpu_block_size=gpu_block_size, - num_gpu_blocks=num_gpu_blocks, - async_scheduling=async_scheduling, - ) - - free_block_queue = runner.scheduler.kv_cache_manager.block_pool.free_block_queue - num_free_blocks_empty = free_block_queue.num_free_blocks - - # 2 blocks, store all, without flushing - # blocks = [0, 1, 2], [3, 4, 5] - runner.new_request(token_ids=[0] * offloaded_block_size * 2) - runner.manager.prepare_store.side_effect = ( - lambda block_hashes: generate_store_output(block_hashes) - ) - runner.run( - decoded_tokens=[0], - complete_transfers=False, - ) - - # decode 2 more blocks - 1 gpu block, storing [6, 7, 8] (no flush) - runner.manager.prepare_store.side_effect = ( - lambda block_hashes: generate_store_output(block_hashes) - ) - runner.run( - decoded_tokens=[0] * (2 * offloaded_block_size - gpu_block_size), - complete_transfers=False, - ) - - # simulate KV cache running out of space - free_block_queue.num_free_blocks = 0 - - # request should be preempted now - runner.run( - decoded_tokens=[], - complete_transfers=False, - expected_flushed_gpu_block_indexes=(0, 1, 2, 3, 4, 5, 6, 7, 8), - expected_stored_gpu_block_indexes=(0, 1, 2, 3, 4, 5, 6, 7, 8), - ) - - # restore KV cache space and reset GPU prefix cache - free_block_queue.num_free_blocks = num_free_blocks_empty - runner.scheduler.reset_prefix_cache() - - # request should now return from preemption - # re-load [0, ..., 8] from the CPU and store [9, 10, 11] - 
runner.manager.lookup.return_value = 3 - runner.manager.prepare_store.side_effect = ( - lambda block_hashes: generate_store_output(block_hashes) - ) - runner.run( - decoded_tokens=[0] * gpu_block_size, - expected_loaded_gpu_block_indexes=(0, 1, 2, 3, 4, 5, 6, 7, 8), - ) - - runner.run( - decoded_tokens=[EOS_TOKEN_ID], - expected_stored_gpu_block_indexes=(9, 10, 11), - ) - - -@pytest.mark.parametrize("async_scheduling", [True, False]) -def test_concurrent_lookups_of_the_same_prefix(request_runner, async_scheduling: bool): - offloaded_block_size = 12 - gpu_block_size = 4 - num_gpu_blocks = 100 - - runner = request_runner( - offloaded_block_size=offloaded_block_size, - gpu_block_size=gpu_block_size, - num_gpu_blocks=num_gpu_blocks, - async_scheduling=async_scheduling, - ) - - # store 1 blocks - runner.new_request(token_ids=[0] * offloaded_block_size) - runner.manager.prepare_store.side_effect = ( - lambda block_hashes: generate_store_output(block_hashes) - ) - runner.run( - decoded_tokens=[EOS_TOKEN_ID], - expected_stored_gpu_block_indexes=(0, 1, 2), - ) - - # start a request to load the first block, but don't complete - runner.scheduler.reset_prefix_cache() - runner.new_request(token_ids=[0] * offloaded_block_size) - runner.manager.lookup.return_value = 1 - runner.run( - decoded_tokens=[], - complete_transfers=False, - ) - - # request triggered a load - transfer_jobs = list(runner.offloading_spec.handler.transfer_specs) - assert transfer_jobs - - # start a new request to load the same first block - runner.new_request(token_ids=[0] * offloaded_block_size) - runner.manager.lookup.return_value = 1 - runner.run( - decoded_tokens=[], - complete_transfers=False, - ) - - # request did not trigger a load - assert transfer_jobs == list(runner.offloading_spec.handler.transfer_specs) - - # complete transfers - runner.manager.prepare_store.side_effect = ( - lambda block_hashes: generate_store_output([]) - ) - runner.run( - decoded_tokens=[EOS_TOKEN_ID], - expected_loaded_gpu_block_indexes=(0, 1, 2), - ) - - # second request will use the GPU prefix cache - assert transfer_jobs == list(runner.offloading_spec.handler.transfer_specs) - - -@pytest.mark.parametrize("async_scheduling", [True, False]) -def test_abort_loading_requests(request_runner, async_scheduling: bool): - offloaded_block_size = 12 - gpu_block_size = 4 - num_gpu_blocks = 100 - - runner = request_runner( - offloaded_block_size=offloaded_block_size, - gpu_block_size=gpu_block_size, - num_gpu_blocks=num_gpu_blocks, - async_scheduling=async_scheduling, - ) - - # store 1 blocks - runner.new_request(token_ids=[0] * offloaded_block_size) - runner.manager.prepare_store.side_effect = ( - lambda block_hashes: generate_store_output(block_hashes) - ) - runner.run( - decoded_tokens=[EOS_TOKEN_ID], - expected_stored_gpu_block_indexes=(0, 1, 2), - ) - - # start a request to load the first block, but don't complete - runner.scheduler.reset_prefix_cache() - runner.new_request(token_ids=[0] * offloaded_block_size) - runner.manager.lookup.return_value = 1 - runner.run( - decoded_tokens=[], - complete_transfers=False, - ) - - # request triggered a load - transfer_jobs = list(runner.offloading_spec.handler.transfer_specs) - assert transfer_jobs - - # abort request - req_id = str(runner.req_id) - runner.scheduler.finish_requests((req_id,), RequestStatus.FINISHED_ABORTED) - - # verify request is not deleted - assert req_id in runner.scheduler.requests - - # complete loading request - runner.run( - decoded_tokens=[], - expected_loaded_gpu_block_indexes=(0, 1, 2), - 
) - - # assert request is deleted - assert req_id not in runner.scheduler.requests - - -class TestOffloadingConnectorStats: - """Tests for OffloadingConnector stats reconstruction and operations.""" - - def test_build_kv_connector_stats_with_none(self): - """Test that build_kv_connector_stats returns empty stats when given None.""" - stats = OffloadingConnector.build_kv_connector_stats(data=None) - - assert stats is not None - assert isinstance(stats, OffloadingConnectorStats) - assert len(stats.data) == 0 - assert stats.is_empty() - - def test_build_kv_connector_stats_with_empty_dict(self): - """Test that build_kv_connector_stats returns empty stats with empty dict.""" - stats = OffloadingConnector.build_kv_connector_stats(data={}) - - assert stats is not None - assert isinstance(stats, OffloadingConnectorStats) - assert len(stats.data) == 0 - assert stats.is_empty() - - def test_build_kv_connector_stats_reconstructs_offload_stats(self): - """Test that OffloadingConnector stats are properly reconstructed with - correct data.""" - serialized_data = { - "CPU_to_GPU": [ - {"op_size": 16, "op_time": 1.0}, - {"op_size": 8, "op_time": 0.5}, - ], - "GPU_to_CPU": [ - {"op_size": 1, "op_time": 0.1}, - {"op_size": 2, "op_time": 0.2}, - ], - } - - stats = OffloadingConnector.build_kv_connector_stats(data=serialized_data) - - offload_connector_stats = stats - assert isinstance(offload_connector_stats, OffloadingConnectorStats) - assert offload_connector_stats.data["CPU_to_GPU"] == [ - {"op_size": 16, "op_time": 1.0}, - {"op_size": 8, "op_time": 0.5}, - ] - assert offload_connector_stats.data["GPU_to_CPU"] == [ - {"op_size": 1, "op_time": 0.1}, - {"op_size": 2, "op_time": 0.2}, - ] - - def test_aggregate_same_connector(self): - """Test aggregating stats from the same connector type.""" - stats1 = OffloadingConnectorStats( - data={ - "CPU_to_GPU": [ - {"op_size": 16, "op_time": 1.0}, - {"op_size": 8, "op_time": 0.5}, - ], - "GPU_to_CPU": [ - {"op_size": 1, "op_time": 0.1}, - {"op_size": 2, "op_time": 0.2}, - ], - } - ) - - stats2 = OffloadingConnectorStats( - data={ - "CPU_to_GPU": [ - {"op_size": 3, "op_time": 0.2}, - {"op_size": 7, "op_time": 0.9}, - ], - "GPU_to_CPU": [{"op_size": 16, "op_time": 2}], - } - ) - - result = stats1.aggregate(stats2) - - assert result is stats1 # Should return self - offload_connector_stats = result - assert offload_connector_stats.data["CPU_to_GPU"] == [ - {"op_size": 16, "op_time": 1.0}, - {"op_size": 8, "op_time": 0.5}, - {"op_size": 3, "op_time": 0.2}, - {"op_size": 7, "op_time": 0.9}, - ] - assert offload_connector_stats.data["GPU_to_CPU"] == [ - {"op_size": 1, "op_time": 0.1}, - {"op_size": 2, "op_time": 0.2}, - {"op_size": 16, "op_time": 2}, - ] - - def test_reduce(self): - """Test that reduce() correctly reduces all nested connector stats.""" - stats = OffloadingConnectorStats( - data={ - "CPU_to_GPU": [ - {"op_size": 16, "op_time": 1.0}, - {"op_size": 8, "op_time": 0.5}, - {"op_size": 3, "op_time": 0.2}, - {"op_size": 7, "op_time": 0.9}, - ], - "GPU_to_CPU": [ - {"op_size": 1, "op_time": 0.1}, - {"op_size": 2, "op_time": 0.2}, - {"op_size": 16, "op_time": 2}, - ], - } - ) - - reduced = stats.reduce() - - assert isinstance(reduced, dict) - # Check that the stats were reduced (should have aggregated values) - assert "CPU_to_GPU_total_bytes" in reduced - assert "CPU_to_GPU_total_time" in reduced - assert "GPU_to_CPU_total_bytes" in reduced - assert "GPU_to_CPU_total_time" in reduced - assert reduced["CPU_to_GPU_total_bytes"] == 34 - assert 
reduced["CPU_to_GPU_total_time"] == 2.6 - assert reduced["GPU_to_CPU_total_time"] == 2.3 - assert reduced["GPU_to_CPU_total_bytes"] == 19 - - def test_reset(self): - """Test that reset() resets all nested connector stats.""" - offload_connector_stats = OffloadingConnectorStats( - data={ - "CPU_to_GPU": [ - {"op_size": 3, "op_time": 0.2}, - {"op_size": 7, "op_time": 0.9}, - ], - "GPU_to_CPU": [{"op_size": 16, "op_time": 2}], - } - ) - - assert not offload_connector_stats.is_empty() - - offload_connector_stats.reset() - - # After reset, stats should be empty - assert offload_connector_stats.is_empty() - assert len(offload_connector_stats.data) == 0 diff --git a/tests/v1/kv_connector/unit/test_nixl_connector.py b/tests/v1/kv_connector/unit/test_nixl_connector.py index 472599747087..b4ee97cd1d74 100644 --- a/tests/v1/kv_connector/unit/test_nixl_connector.py +++ b/tests/v1/kv_connector/unit/test_nixl_connector.py @@ -91,6 +91,9 @@ def clear_kv_transfer(): yield if has_kv_transfer_group(): ensure_kv_transfer_shutdown() + # Reset any KV cache layout override set during tests so it doesn't + # leak into tests in other modules. + set_kv_cache_layout(None) def get_default_xfer_telemetry( diff --git a/tests/v1/kv_offload/test_cpu_gpu.py b/tests/v1/kv_offload/test_cpu_gpu.py index 3f4ef7d07f98..1983cca22d8f 100644 --- a/tests/v1/kv_offload/test_cpu_gpu.py +++ b/tests/v1/kv_offload/test_cpu_gpu.py @@ -6,32 +6,20 @@ import pytest import torch -from vllm.platforms import current_platform from vllm.utils.torch_utils import set_random_seed -from vllm.v1.attention.backends.flash_attn import FlashAttentionBackend from vllm.v1.kv_offload.mediums import CPULoadStoreSpec, GPULoadStoreSpec +from vllm.v1.kv_offload.spec import ( + CanonicalKVCacheRef, + CanonicalKVCaches, + CanonicalKVCacheTensor, +) from vllm.v1.kv_offload.worker.cpu_gpu import CpuGpuOffloadingHandlers -BACKENDS_TO_TEST = [FlashAttentionBackend] - -if not current_platform.is_rocm(): - from vllm.v1.attention.backends.flashinfer import FlashInferBackend - - BACKENDS_TO_TEST.append(FlashInferBackend) - - from vllm.v1.attention.backends.mla.flashattn_mla import FlashAttnMLABackend - - BACKENDS_TO_TEST.append(FlashAttnMLABackend) - NUM_GPU_BLOCKS = [64] NUM_CPU_BLOCKS = [256] -KERNEL_BLOCK_SIZES = [16] -LOGICAL_BLOCK_SIZES = [16, 32] -LOGICAL_BLOCKS_PER_CPU_BLOCK = [1, 3] -HEAD_SIZES = [64] -NUM_HEADS = [8] -NUM_LAYERS = [4] -DTYPES = [torch.bfloat16] +GPU_PAGE_SIZES = [512, 1024] +BLOCK_SIZE_FACTORS = [1, 3] +NUM_TENSORS = [4] SEEDS = [0] CUDA_DEVICES = ["cuda:0"] NUM_MAPPINGS = [3] @@ -39,15 +27,11 @@ @pytest.mark.parametrize("gpu_to_cpu", [True, False]) @pytest.mark.parametrize("num_mappings", NUM_MAPPINGS) -@pytest.mark.parametrize("head_size", HEAD_SIZES) -@pytest.mark.parametrize("num_heads", NUM_HEADS) -@pytest.mark.parametrize("kernel_block_size", KERNEL_BLOCK_SIZES) -@pytest.mark.parametrize("logical_block_size", LOGICAL_BLOCK_SIZES) -@pytest.mark.parametrize("logical_blocks_per_cpu_block", LOGICAL_BLOCKS_PER_CPU_BLOCK) +@pytest.mark.parametrize("gpu_page_size_bytes", GPU_PAGE_SIZES) +@pytest.mark.parametrize("block_size_factor", BLOCK_SIZE_FACTORS) @pytest.mark.parametrize("num_gpu_blocks", NUM_GPU_BLOCKS) @pytest.mark.parametrize("num_cpu_blocks", NUM_CPU_BLOCKS) -@pytest.mark.parametrize("num_layers", NUM_LAYERS) -@pytest.mark.parametrize("dtype", DTYPES) +@pytest.mark.parametrize("num_tensors", NUM_TENSORS) @pytest.mark.parametrize("seed", SEEDS) @pytest.mark.parametrize("device", CUDA_DEVICES) @torch.inference_mode() @@ -55,113 +39,89 
@@ def test_transfer( default_vllm_config, gpu_to_cpu: bool, num_mappings: int, - head_size: int, - num_heads: int, - kernel_block_size: int, - logical_block_size: int, - logical_blocks_per_cpu_block: int, + gpu_page_size_bytes: int, + block_size_factor: int, num_gpu_blocks: int, num_cpu_blocks: int, - num_layers: int, - dtype: torch.dtype, + num_tensors: int, seed: int, device: str, ) -> None: set_random_seed(seed) - # create per-layer GPU KV caches based on available attn_backends - attn_backends_list = BACKENDS_TO_TEST - - assert logical_block_size % kernel_block_size == 0 - kernel_blocks_per_gpu_block = logical_block_size // kernel_block_size - num_gpu_kernel_blocks = num_gpu_blocks * kernel_blocks_per_gpu_block - - gpu_caches = {} - attn_backends = {} - for i in range(num_layers): - layer_name = f"layer {i}" - - attn_backend = attn_backends_list[i % len(attn_backends_list)] - attn_backends[layer_name] = attn_backend - - gpu_cache_shape = attn_backend.get_kv_cache_shape( - num_gpu_kernel_blocks, kernel_block_size, num_heads, head_size + # build CanonicalKVCacheTensor list: one per tensor + kv_cache_tensors: list[CanonicalKVCacheTensor] = [] + for i in range(num_tensors): + gpu_tensor = torch.randint( + -128, + 127, + (num_gpu_blocks, gpu_page_size_bytes), + dtype=torch.int8, + device=device, + ) + kv_cache_tensors.append( + CanonicalKVCacheTensor( + tensor=gpu_tensor, + page_size_bytes=gpu_page_size_bytes, + ) ) - gpu_caches[layer_name] = torch.rand(gpu_cache_shape, dtype=dtype, device=device) - # create handler - cpu_block_size = logical_blocks_per_cpu_block * logical_block_size - kernel_blocks_per_cpu_block = cpu_block_size // kernel_block_size + # one group containing all tensors, one data ref per tensor + kv_cache_groups_data_refs: list[list[CanonicalKVCacheRef]] = [ + [ + CanonicalKVCacheRef( + tensor_idx=i, + page_size_bytes=gpu_page_size_bytes, + ) + for i in range(num_tensors) + ] + ] + + kv_caches = CanonicalKVCaches( + tensors=kv_cache_tensors, + group_data_refs=kv_cache_groups_data_refs, + ) handlers = CpuGpuOffloadingHandlers( - attn_backends=attn_backends, - gpu_block_size=logical_block_size, - cpu_block_size=cpu_block_size, + kv_caches=kv_caches, + block_size_factor=block_size_factor, num_cpu_blocks=num_cpu_blocks, - gpu_caches=gpu_caches, ) # select block mappings - gpu_blocks = random.sample( - range(num_gpu_blocks), num_mappings * logical_blocks_per_cpu_block - ) + gpu_blocks = random.sample(range(num_gpu_blocks), num_mappings * block_size_factor) cpu_blocks = random.sample(range(num_cpu_blocks), num_mappings) - # convert gpu blocks to kernel block size - gpu_blocks_in_kernel_block_size = [] - for gpu_block in gpu_blocks: - base_block_id = gpu_block * kernel_blocks_per_gpu_block - for i in range(kernel_blocks_per_gpu_block): - gpu_blocks_in_kernel_block_size.append(i + base_block_id) - - # convert cpu blocks to gpu block size - cpu_blocks_in_kernel_block_size = [] - for cpu_block in cpu_blocks: - base_block_id = cpu_block * kernel_blocks_per_cpu_block - for i in range(kernel_blocks_per_cpu_block): - cpu_blocks_in_kernel_block_size.append(i + base_block_id) - - # maybe skip some GPU block to test reading from the middle of a CPU block + # expand cpu blocks to gpu-page granularity for uniform comparison: + # each cpu block maps to block_size_factor consecutive sub-blocks + cpu_blocks_expanded = [ + cpu_block * block_size_factor + j + for cpu_block in cpu_blocks + for j in range(block_size_factor) + ] + + # maybe skip some GPU blocks to test reading from the middle of a 
CPU block if not gpu_to_cpu: - gpu_blocks_to_skip = logical_blocks_per_cpu_block - 1 - gpu_blocks = gpu_blocks[gpu_blocks_to_skip:] - kernel_blocks_to_skip = gpu_blocks_to_skip * kernel_blocks_per_gpu_block - gpu_blocks_in_kernel_block_size = gpu_blocks_in_kernel_block_size[ - kernel_blocks_to_skip: - ] - cpu_blocks_in_kernel_block_size = cpu_blocks_in_kernel_block_size[ - kernel_blocks_to_skip: - ] + blocks_to_skip = block_size_factor - 1 + gpu_blocks = gpu_blocks[blocks_to_skip:] + cpu_blocks_expanded = cpu_blocks_expanded[blocks_to_skip:] # set transfer direction if gpu_to_cpu: handler = handlers.gpu_to_cpu_handler - src_blocks = gpu_blocks - dst_blocks = cpu_blocks - src_spec = GPULoadStoreSpec(src_blocks, group_sizes=(len(src_blocks),)) - dst_spec = CPULoadStoreSpec(dst_blocks) - src_blocks_in_kernel_block_size = gpu_blocks_in_kernel_block_size - dst_blocks_in_kernel_block_size = cpu_blocks_in_kernel_block_size - dst_size_in_kernel_blocks = num_cpu_blocks * kernel_blocks_per_cpu_block + src_spec = GPULoadStoreSpec(gpu_blocks, group_sizes=(len(gpu_blocks),)) + dst_spec = CPULoadStoreSpec(cpu_blocks) + dst_to_src = dict(zip(cpu_blocks_expanded, gpu_blocks)) + num_dst_sub_blocks = num_cpu_blocks * block_size_factor else: handler = handlers.cpu_to_gpu_handler - src_blocks = cpu_blocks - dst_blocks = gpu_blocks - src_spec = CPULoadStoreSpec(src_blocks) - dst_spec = GPULoadStoreSpec(dst_blocks, group_sizes=(len(dst_blocks),)) - src_blocks_in_kernel_block_size = cpu_blocks_in_kernel_block_size - dst_blocks_in_kernel_block_size = gpu_blocks_in_kernel_block_size - dst_size_in_kernel_blocks = num_gpu_blocks * kernel_blocks_per_gpu_block - - # build dst -> src mapping - dst_to_src = {} - for src_block, dst_block in zip( - src_blocks_in_kernel_block_size, dst_blocks_in_kernel_block_size - ): - dst_to_src[dst_block] = src_block + src_spec = CPULoadStoreSpec(cpu_blocks) + dst_spec = GPULoadStoreSpec(gpu_blocks, group_sizes=(len(gpu_blocks),)) + dst_to_src = dict(zip(gpu_blocks, cpu_blocks_expanded)) + num_dst_sub_blocks = num_gpu_blocks # clone src and dst tensors before transfer - orig_src_caches = [x.clone() for x in handler.src_tensors] - orig_dst_caches = [x.clone() for x in handler.dst_tensors] + orig_src_tensors = [x.clone() for x in handler.src_tensors] + orig_dst_tensors = [x.clone() for x in handler.dst_tensors] # call transfer function start_time = time.time() @@ -180,11 +140,8 @@ def test_transfer( if gpu_to_cpu else ("CPU", "GPU") ) - assert ( - finished[0].transfer_size - == handler.total_block_size_in_bytes - * handler.dst_block_size_factor - * len(dst_blocks) + assert finished[0].transfer_size == ( + len(gpu_blocks) * handler.group_block_size_in_bytes[0] ) assert finished[0].transfer_time > 0 assert finished[0].transfer_time < (time.time() - start_time) @@ -192,19 +149,23 @@ def test_transfer( time.sleep(0.1) # verify src tensors did not change - for orig_tensor, tensor in zip(orig_src_caches, handler.src_tensors): + for orig_tensor, tensor in zip(orig_src_tensors, handler.src_tensors): assert torch.equal(orig_tensor, tensor) - # verify dst tensors - for dst_block in range(dst_size_in_kernel_blocks): - src_block_candidate = dst_to_src.get(dst_block) - for src_cache, dst_cache, orig_dst_cache in zip( - handler.src_tensors, - handler.dst_tensors, - orig_dst_caches, - ): - if src_block_candidate is not None: - expected_value = src_cache[src_block_candidate] + # verify dst tensors at gpu-page granularity. 
+ for src_tensor, dst_tensor, orig_dst_tensor in zip( + handler.src_tensors, + handler.dst_tensors, + orig_dst_tensors, + ): + # view both GPU and CPU tensors as (n, gpu_page_size_bytes) for comparison. + src_view = src_tensor.view(-1, gpu_page_size_bytes) + dst_view = dst_tensor.view(-1, gpu_page_size_bytes) + orig_dst_view = orig_dst_tensor.view(-1, gpu_page_size_bytes) + for dst_sub_block in range(num_dst_sub_blocks): + src_sub_block = dst_to_src.get(dst_sub_block) + if src_sub_block is not None: + expected = src_view[src_sub_block] else: - expected_value = orig_dst_cache[dst_block] - torch.testing.assert_close(dst_cache[dst_block].cpu(), expected_value.cpu()) + expected = orig_dst_view[dst_sub_block] + torch.testing.assert_close(dst_view[dst_sub_block].cpu(), expected.cpu()) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/offloading/worker.py b/vllm/distributed/kv_transfer/kv_connector/v1/offloading/worker.py index 63f1d0133f3c..77398eee8885 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/offloading/worker.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/offloading/worker.py @@ -1,6 +1,7 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project from collections import defaultdict +from dataclasses import replace import torch @@ -18,7 +19,17 @@ from vllm.logger import init_logger from vllm.model_executor.layers.attention_layer_base import AttentionLayerBase from vllm.v1.attention.backend import AttentionBackend -from vllm.v1.kv_offload.spec import OffloadingSpec +from vllm.v1.kv_cache_interface import ( + AttentionSpec, + MambaSpec, + UniformTypeKVCacheSpecs, +) +from vllm.v1.kv_offload.spec import ( + CanonicalKVCacheRef, + CanonicalKVCaches, + CanonicalKVCacheTensor, + OffloadingSpec, +) from vllm.v1.kv_offload.worker.worker import ( OffloadingWorker, TransferSpec, @@ -53,17 +64,13 @@ def _generate_job_id(self) -> int: self._job_counter = job_id + 1 return job_id - def _register_handlers( - self, - kv_caches: dict[str, torch.Tensor], - attn_backends: dict[str, type[AttentionBackend]], - ): - for src_cls, dst_cls, handler in self.spec.get_handlers( - kv_caches, attn_backends - ): + def _register_handlers(self, kv_caches: CanonicalKVCaches): + for src_cls, dst_cls, handler in self.spec.get_handlers(kv_caches): self.worker.register_handler(src_cls, dst_cls, handler) - def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]): + def register_kv_caches( + self, kv_caches: dict[str, torch.Tensor | list[torch.Tensor]] + ): layer_names = list(kv_caches.keys()) layers = get_layers_from_vllm_config( self.spec.vllm_config, @@ -73,16 +80,221 @@ def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]): attn_backends = { layer_name: layers[layer_name].get_attn_backend() for layer_name in layer_names + if layer_name in layers } - self._register_handlers(kv_caches, attn_backends) + + # layer_name -> list of matching KV cache tensors + # such that each tensor starts with the num_blocks dimension. + # FlashAttention layers which use the (2, num_blocks, ...) layout + # will possibly map to 2 tensors, one per K and one per V. + # All other layers will probably map to a single tensor. 
+ tensors_per_block: dict[str, tuple[torch.Tensor, ...]] = {} + # layer_name -> size of (un-padded) page in bytes + unpadded_page_size_bytes: dict[str, int] = {} + # layer_name -> size of page in bytes + page_size_bytes: dict[str, int] = {} + for kv_cache_group in self.spec.kv_cache_config.kv_cache_groups: + group_layer_names = kv_cache_group.layer_names + group_kv_cache_spec = kv_cache_group.kv_cache_spec + if isinstance(group_kv_cache_spec, UniformTypeKVCacheSpecs): + per_layer_specs = group_kv_cache_spec.kv_cache_specs + else: + per_layer_specs = {} + for layer_name in group_layer_names: + layer_kv_cache_spec = per_layer_specs.get( + layer_name, group_kv_cache_spec + ) + if isinstance(layer_kv_cache_spec, AttentionSpec): + layer_kv_cache = kv_caches[layer_name] + assert isinstance(layer_kv_cache, torch.Tensor) + assert layer_kv_cache.storage_offset() == 0 + + # get the logical dimension for num_blocks + test_shape = attn_backends[layer_name].get_kv_cache_shape( + num_blocks=1234, + block_size=16, + num_kv_heads=1, + head_size=256, + ) + num_blocks_logical_dim = test_shape.index(1234) + + # sort the logical dimensions by stride (high to low) + # to get a physical-to-logical mapping: + # physical_to_logical[physical_pos] = logical_dim + logical_strides = layer_kv_cache.stride() + physical_to_logical = sorted( + range(len(logical_strides)), + key=lambda idx: logical_strides[idx], + reverse=True, + ) + + num_blocks_physical_dim = physical_to_logical.index( + num_blocks_logical_dim + ) + if num_blocks_physical_dim == 0: + num_blocks = layer_kv_cache.shape[num_blocks_logical_dim] + storage = layer_kv_cache.untyped_storage() + page = layer_kv_cache_spec.page_size_bytes + tensors_per_block[layer_name] = ( + torch.tensor( + [], + dtype=torch.int8, + device=layer_kv_cache.device, + ) + .set_(storage) + .view(num_blocks, page), + ) + page_size_bytes[layer_name] = ( + layer_kv_cache_spec.page_size_bytes + ) + else: + # Flash Attention case: (2, num_blocks, ...) 
+ assert test_shape[0] == 2 + assert physical_to_logical[0] == 0 + assert num_blocks_physical_dim == 1 + + # unbind the tensor to separate K and V tensors + num_blocks = layer_kv_cache.shape[num_blocks_logical_dim] + half_page_size = layer_kv_cache_spec.page_size_bytes // 2 + storage = layer_kv_cache.untyped_storage() + raw = ( + torch.tensor( + [], + dtype=torch.int8, + device=layer_kv_cache.device, + ) + .set_(storage) + .view(2, num_blocks, half_page_size) + ) + tensors_per_block[layer_name] = tuple(raw.unbind(0)) + + page_size_bytes[layer_name] = half_page_size + + unpadded_page_size_bytes[layer_name] = page_size_bytes[layer_name] + + elif isinstance(layer_kv_cache_spec, MambaSpec): + state_tensors = kv_caches[layer_name] + assert isinstance(state_tensors, list) + + # re-construct the raw (num_blocks, page_size) tensor + # from the first state tensor + assert len(state_tensors) > 0 + first_state_tensor = state_tensors[0] + assert first_state_tensor.storage_offset() == 0 + num_blocks = first_state_tensor.shape[0] + tensor = ( + torch.tensor( + [], + dtype=torch.int8, + device=first_state_tensor.device, + ) + .set_(first_state_tensor.untyped_storage()) + .view((num_blocks, layer_kv_cache_spec.page_size_bytes)) + ) + tensors_per_block[layer_name] = (tensor,) + + page_size_bytes[layer_name] = layer_kv_cache_spec.page_size_bytes + unpadded_page_size_bytes[layer_name] = replace( + layer_kv_cache_spec, page_size_padded=None + ).page_size_bytes + + else: + raise NotImplementedError + + block_tensors: list[CanonicalKVCacheTensor] = [] + block_data_refs: dict[str, list[CanonicalKVCacheRef]] = defaultdict(list) + for kv_cache_tensor in self.spec.kv_cache_config.kv_cache_tensors: + tensor_layer_names = kv_cache_tensor.shared_by + + # verify all layers in the group reference the exact same tensors + assert len({len(tensors_per_block[n]) for n in tensor_layer_names}) == 1 + assert ( + len({tensors_per_block[n][0].data_ptr() for n in tensor_layer_names}) + == 1 + ) + assert ( + len({tensors_per_block[n][0].stride() for n in tensor_layer_names}) == 1 + ) + + # pick the first layer to represent the group + first_layer_name = tensor_layer_names[0] + for tensor in tensors_per_block[first_layer_name]: + block_tensors.append( + CanonicalKVCacheTensor( + tensor=tensor, + page_size_bytes=page_size_bytes[first_layer_name], + ) + ) + + curr_tensor_idx = len(block_tensors) - 1 + for layer_name in tensor_layer_names: + block_data_refs[layer_name].append( + CanonicalKVCacheRef( + tensor_idx=curr_tensor_idx, + page_size_bytes=(unpadded_page_size_bytes[layer_name]), + ) + ) + + group_data_refs: list[list[CanonicalKVCacheRef]] = [] + for kv_cache_group in self.spec.kv_cache_config.kv_cache_groups: + group_refs: list[CanonicalKVCacheRef] = [] + for layer_name in kv_cache_group.layer_names: + group_refs += block_data_refs[layer_name] + group_data_refs.append(group_refs) + + canonical_kv_caches = CanonicalKVCaches( + tensors=block_tensors, + group_data_refs=group_data_refs, + ) + + self._register_handlers(canonical_kv_caches) def register_cross_layers_kv_cache( self, kv_cache: torch.Tensor, attn_backend: type[AttentionBackend] ): - cross_layer_name = "ALL_LAYERS" - kv_caches = {cross_layer_name: kv_cache} - attn_backends = {cross_layer_name: attn_backend} - self._register_handlers(kv_caches, attn_backends) + # verify that num_blocks is at physical position 0 in the cross-layers + # tensor layout. 
+ test_shape = attn_backend.get_kv_cache_shape( + num_blocks=1234, block_size=16, num_kv_heads=1, head_size=256 + ) + num_blocks_logical_dim = test_shape.index(1234) + 1 + physical_to_logical = attn_backend.get_kv_cache_stride_order( + include_num_layers_dimension=True + ) + num_blocks_physical_dim = physical_to_logical.index(num_blocks_logical_dim) + assert num_blocks_physical_dim == 0 + + kv_cache_groups = self.spec.kv_cache_config.kv_cache_groups + assert len(kv_cache_groups) == 1 + kv_cache_spec = kv_cache_groups[0].kv_cache_spec + num_layers = len(kv_cache_groups[0].layer_names) + page_size_bytes = kv_cache_spec.page_size_bytes * num_layers + + assert kv_cache.storage_offset() == 0 + storage = kv_cache.untyped_storage() + assert len(storage) % page_size_bytes == 0 + num_blocks = len(storage) // page_size_bytes + tensor = ( + torch.tensor( + [], + dtype=torch.int8, + device=kv_cache.device, + ) + .set_(storage) + .view(num_blocks, page_size_bytes) + ) + kv_cache_tensor = CanonicalKVCacheTensor( + tensor=tensor, page_size_bytes=page_size_bytes + ) + # in cross layers layout, there's currently only a single group + kv_cache_data_ref = CanonicalKVCacheRef( + tensor_idx=0, page_size_bytes=page_size_bytes + ) + canonical_kv_caches = CanonicalKVCaches( + tensors=[kv_cache_tensor], group_data_refs=[[kv_cache_data_ref]] + ) + + self._register_handlers(canonical_kv_caches) def handle_preemptions(self, kv_connector_metadata: OffloadingConnectorMetadata): for job_id, transfer_spec in self._unsubmitted_store_jobs: diff --git a/vllm/v1/attention/backends/utils.py b/vllm/v1/attention/backends/utils.py index c64355ff51c9..59f6ca9bf6e9 100644 --- a/vllm/v1/attention/backends/utils.py +++ b/vllm/v1/attention/backends/utils.py @@ -78,9 +78,10 @@ def get_kv_cache_layout(): return cache_layout -def set_kv_cache_layout(cache_layout: KVCacheLayoutType): +def set_kv_cache_layout(cache_layout: KVCacheLayoutType | None): global _KV_CACHE_LAYOUT_OVERRIDE _KV_CACHE_LAYOUT_OVERRIDE = cache_layout + get_kv_cache_layout.cache_clear() @dataclass diff --git a/vllm/v1/kv_offload/cpu/spec.py b/vllm/v1/kv_offload/cpu/spec.py index 810967077a40..4feae8cf7d5a 100644 --- a/vllm/v1/kv_offload/cpu/spec.py +++ b/vllm/v1/kv_offload/cpu/spec.py @@ -2,17 +2,14 @@ # SPDX-FileCopyrightText: Copyright contributors to the vLLM project from collections.abc import Iterator -import torch - from vllm.config import VllmConfig from vllm.platforms import current_platform -from vllm.v1.attention.backend import AttentionBackend from vllm.v1.kv_cache_interface import KVCacheConfig from vllm.v1.kv_offload.abstract import LoadStoreSpec, OffloadingManager from vllm.v1.kv_offload.cpu.manager import CPUOffloadingManager from vllm.v1.kv_offload.mediums import CPULoadStoreSpec, GPULoadStoreSpec from vllm.v1.kv_offload.reuse_manager import FilterReusedOffloadingManager -from vllm.v1.kv_offload.spec import OffloadingSpec +from vllm.v1.kv_offload.spec import CanonicalKVCaches, OffloadingSpec from vllm.v1.kv_offload.worker.cpu_gpu import CpuGpuOffloadingHandlers from vllm.v1.kv_offload.worker.worker import OffloadingHandler @@ -90,9 +87,7 @@ def get_manager(self) -> OffloadingManager: return self._manager def get_handlers( - self, - kv_caches: dict[str, torch.Tensor], - attn_backends: dict[str, type[AttentionBackend]], + self, kv_caches: CanonicalKVCaches ) -> Iterator[tuple[type[LoadStoreSpec], type[LoadStoreSpec], OffloadingHandler]]: if not self._handlers: if not current_platform.is_cuda_alike(): @@ -100,15 +95,10 @@ def get_handlers( "CPU Offloading 
is currently only supported on CUDA-alike GPUs" ) - assert len(self.gpu_block_size) == 1 - gpu_block_size = self.gpu_block_size[0] - self._handlers = CpuGpuOffloadingHandlers( - attn_backends=attn_backends, - gpu_block_size=gpu_block_size, - cpu_block_size=gpu_block_size * self.block_size_factor, + kv_caches=kv_caches, + block_size_factor=self.block_size_factor, num_cpu_blocks=self.num_blocks, - gpu_caches=kv_caches, ) assert self._handlers is not None diff --git a/vllm/v1/kv_offload/spec.py b/vllm/v1/kv_offload/spec.py index 6d5c74985ae1..1eb4fdb3e6ce 100644 --- a/vllm/v1/kv_offload/spec.py +++ b/vllm/v1/kv_offload/spec.py @@ -2,12 +2,12 @@ # SPDX-FileCopyrightText: Copyright contributors to the vLLM project from abc import ABC, abstractmethod from collections.abc import Iterator +from dataclasses import dataclass from typing import TYPE_CHECKING import torch from vllm.logger import init_logger -from vllm.v1.attention.backend import AttentionBackend from vllm.v1.kv_offload.abstract import LoadStoreSpec, OffloadingManager from vllm.v1.kv_offload.worker.worker import OffloadingHandler @@ -18,6 +18,56 @@ logger = init_logger(__name__) +@dataclass +class CanonicalKVCacheTensor: + """ + A canonicalized KV cache tensor whose first dimension is num_blocks. + + For attention backends where the raw tensor has num_blocks at a + non-leading physical dimension (e.g. FlashAttention's + (2, num_blocks, ...) layout), the tensor is split so that each + resulting CanonicalKVCacheTensor starts with (num_blocks, ...). + """ + + # The KV cache tensor with shape (num_blocks, ...) + tensor: torch.Tensor + # The (possibly padded) page size per block in bytes + page_size_bytes: int + + +@dataclass +class CanonicalKVCacheRef: + """ + Per-layer (or group of layers) reference to a specific (by index) + CanonicalKVCacheTensor and records the un-padded page size used by that layer. + """ + + # Index into the list of CanonicalKVCacheTensor objects + tensor_idx: int + # The un-padded page size per block in bytes + page_size_bytes: int + + +@dataclass +class CanonicalKVCaches: + """ + Canonicalized block-level representation of the KV caches. + + Composed of: + - Unique list of KV cache data tensors, + each with shape (num_blocks, page_size_in_bytes) and int8 dtype. + - Per-group data references of the tensors. + i.e. how each KV cache group maps to the tensors. + """ + + # Ordered list of unique block tensors, each with shape + # (num_blocks, ...). + tensors: list[CanonicalKVCacheTensor] + # Per-KV-cache-group list of data references that map each layer + # in the group to the appropriate entry in the tensors list. + group_data_refs: list[list[CanonicalKVCacheRef]] + + class OffloadingSpec(ABC): """Spec for an offloading connector""" @@ -73,16 +123,13 @@ def get_manager(self) -> OffloadingManager: @abstractmethod def get_handlers( - self, - kv_caches: dict[str, torch.Tensor], - attn_backends: dict[str, type[AttentionBackend]], + self, kv_caches: CanonicalKVCaches ) -> Iterator[tuple[type[LoadStoreSpec], type[LoadStoreSpec], OffloadingHandler]]: """ Get offloading handlers along with their respective src and dst types. Args: - kv_caches: A dictionary of layer_name -> gpu_kv_cache tensor. - attn_backends: A dictionary of layer_name -> AttentionBackend. + kv_caches: Canonicalized KV caches. Yields: Tuples of (src_type, dst_type, offloading_handler). 
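A minimal sketch of the canonical form introduced above may help: it shows how a single layer's typed KV cache could be reinterpreted as an int8 (num_blocks, page_size_bytes) tensor and wrapped in CanonicalKVCaches. The shapes, dtype, and the single-layer/single-group setup are illustrative assumptions, not the connector worker's actual registration logic.

```python
# Hedged sketch (assumed shapes/dtype): canonicalizing one layer's KV cache.
import torch

from vllm.v1.kv_offload.spec import (
    CanonicalKVCacheRef,
    CanonicalKVCaches,
    CanonicalKVCacheTensor,
)

num_blocks, block_size, num_kv_heads, head_size = 8, 16, 4, 64

# A raw per-layer cache whose leading dimension is already num_blocks.
raw_cache = torch.zeros(
    (num_blocks, block_size, num_kv_heads, head_size), dtype=torch.float16
)

# Everything past the num_blocks dimension forms one page, measured in bytes.
page_size_bytes = raw_cache[0].numel() * raw_cache.element_size()

# Reinterpret the same storage as int8 pages of shape (num_blocks, page_size_bytes).
canonical = raw_cache.view(torch.int8).view(num_blocks, page_size_bytes)

kv_caches = CanonicalKVCaches(
    tensors=[
        CanonicalKVCacheTensor(tensor=canonical, page_size_bytes=page_size_bytes)
    ],
    # One KV cache group whose single layer points at tensor index 0 and uses
    # the same (un-padded) page size.
    group_data_refs=[
        [CanonicalKVCacheRef(tensor_idx=0, page_size_bytes=page_size_bytes)]
    ],
)
```

This mirrors what the test fixture at the top of this diff builds directly with random int8 tensors; backends with a (2, num_blocks, ...) layout would instead contribute two canonical tensors, one for K and one for V.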
diff --git a/vllm/v1/kv_offload/worker/cpu_gpu.py b/vllm/v1/kv_offload/worker/cpu_gpu.py index 69a827a870b6..eeabf0cdadd7 100644 --- a/vllm/v1/kv_offload/worker/cpu_gpu.py +++ b/vllm/v1/kv_offload/worker/cpu_gpu.py @@ -9,8 +9,8 @@ from vllm import _custom_ops as ops from vllm.logger import init_logger from vllm.utils.platform_utils import is_pin_memory_available -from vllm.v1.attention.backend import AttentionBackend from vllm.v1.kv_offload.mediums import BlockIDsLoadStoreSpec +from vllm.v1.kv_offload.spec import CanonicalKVCacheRef, CanonicalKVCaches from vllm.v1.kv_offload.worker.worker import ( OffloadingHandler, TransferResult, @@ -73,39 +73,72 @@ class SingleDirectionOffloadingHandler(OffloadingHandler): def __init__( self, - src_tensors: list[torch.Tensor], - dst_tensors: list[torch.Tensor], - src_block_size_factor: int, - dst_block_size_factor: int, + gpu_tensors: list[torch.Tensor], + cpu_tensors: list[torch.Tensor], + block_size_factor: int, + kv_cache_groups_data_refs: list[list[CanonicalKVCacheRef]], + gpu_to_cpu: bool, ): """ Initialize a SingleDirectionOffloadingHandler. Args: - src_tensors: list of KV cache tensors to copy from. - dst_tensors: list of KV cache tensors to copy to. - Order should match src_tensors. - src_block_size_factor: The number of kernel blocks - per KV block in a source tensor. - dst_block_size_factor: The number of kernel blocks - per KV block in a destination tensor. + gpu_tensors: list of GPU KV cache tensors. + Each of shape (num_gpu_blocks, gpu_page_size_bytes) with dtype int8. + cpu_tensors: list of CPU KV cache tensors. + Each of shape (num_cpu_blocks, cpu_page_size_bytes) with dtype int8. + Order should match gpu_tensors. + kv_cache_groups_data_refs: list of CanonicalKVCacheRef per group. + gpu_to_cpu: if True, transfer from GPU to CPU; otherwise CPU to GPU. """ - assert len(src_tensors) == len(dst_tensors) + assert len(gpu_tensors) == len(cpu_tensors) + assert len(gpu_tensors) > 0 + + # assert a single KV group until transfer_async supports multiple groups + assert len(kv_cache_groups_data_refs) == 1 + + # assert input tensors are as expected + for gpu_tensor, cpu_tensor in zip(gpu_tensors, cpu_tensors): + assert gpu_tensor.dtype == torch.int8 + assert gpu_tensor.ndim == 2 + assert gpu_tensor.is_cuda + assert cpu_tensor.dtype == torch.int8 + assert cpu_tensor.ndim == 2 + assert cpu_tensor.device.type == "cpu" + _, gpu_page_size = gpu_tensor.shape + _, cpu_page_size = cpu_tensor.shape + assert cpu_page_size == gpu_page_size * block_size_factor + + self.src_tensors: list[torch.Tensor] = ( + gpu_tensors if gpu_to_cpu else cpu_tensors + ) + self.dst_tensors: list[torch.Tensor] = ( + cpu_tensors if gpu_to_cpu else gpu_tensors + ) + self.gpu_to_cpu: bool = gpu_to_cpu - self.src_tensors: list[torch.Tensor] = src_tensors - self.dst_tensors: list[torch.Tensor] = dst_tensors - min_block_size_factor = min(src_block_size_factor, dst_block_size_factor) - self.src_block_size_factor: int = src_block_size_factor // min_block_size_factor - self.dst_block_size_factor: int = dst_block_size_factor // min_block_size_factor + # GPU blocks may be smaller + # cpu_page_size = gpu_page_size * block_size_factor. 
+ self.src_block_size_factor = 1 if self.gpu_to_cpu else block_size_factor + self.dst_block_size_factor = block_size_factor if self.gpu_to_cpu else 1 - self.block_size_in_bytes = [ - tensor.element_size() * tensor.stride(0) * min_block_size_factor - for tensor in src_tensors + # per-tensor block size in byte + self.tensor_block_size_in_bytes = [ + gpu_tensor.shape[1] for gpu_tensor in gpu_tensors ] - self.total_block_size_in_bytes = sum(self.block_size_in_bytes) - assert len(src_tensors) > 0 - self.gpu_to_cpu: bool = self.src_tensors[0].is_cuda + # per-group block size in bytes + self.group_block_size_in_bytes = [] + for kv_cache_group_data_refs in kv_cache_groups_data_refs: + group_block_size_in_bytes = 0 + for kv_cache_data_ref in kv_cache_group_data_refs: + # TODO(orozery): use kv_cache_data_ref.page_size_bytes + # once swap_blocks support it + group_block_size_in_bytes += self.tensor_block_size_in_bytes[ + kv_cache_data_ref.tensor_idx + ] + self.group_block_size_in_bytes.append(group_block_size_in_bytes) + self.transfer_type = ("GPU", "CPU") if self.gpu_to_cpu else ("CPU", "GPU") # job_id -> event self._transfer_events: dict[int, torch.Event] = {} @@ -167,7 +200,7 @@ def transfer_async(self, job_id: int, transfer_spec: TransferSpec) -> bool: for src_tensor, dst_tensor, block_size_in_bytes in zip( self.src_tensors, self.dst_tensors, - self.block_size_in_bytes, + self.tensor_block_size_in_bytes, ): ops.swap_blocks( src_tensor, @@ -184,7 +217,7 @@ def transfer_async(self, job_id: int, transfer_spec: TransferSpec) -> bool: stream=stream, start_event=start_event, end_event=end_event, - num_bytes=dst_sub_block_count * self.total_block_size_in_bytes, + num_bytes=dst_sub_block_count * self.group_block_size_in_bytes[0], ) ) @@ -223,102 +256,42 @@ def wait(self, job_ids: set[int]): class CpuGpuOffloadingHandlers: def __init__( self, - gpu_block_size: int, - cpu_block_size: int, + kv_caches: CanonicalKVCaches, + block_size_factor: int, num_cpu_blocks: int, - gpu_caches: dict[str, torch.Tensor], - attn_backends: dict[str, type[AttentionBackend]], ): - assert gpu_caches - assert cpu_block_size % gpu_block_size == 0 - - # find kernel block size and determine layout per each gpu tensor - kernel_block_size: int | None = None - # list of (gpu_tensor, split_k_and_v) - parsed_gpu_tensors: list[tuple[torch.Tensor, bool]] = [] - for layer_name, gpu_tensor in gpu_caches.items(): - gpu_shape = gpu_tensor.shape - attn_backend = attn_backends[layer_name] - test_shape = attn_backend.get_kv_cache_shape( - num_blocks=1234, block_size=16, num_kv_heads=1, head_size=256 - ) - - has_layers_dim = False - split_k_and_v = False - if len(gpu_shape) != len(test_shape): - # cross-layers tensor - # shape is (num_blocks, ...) - assert len(gpu_shape) == len(test_shape) + 1 - has_layers_dim = True - # prepend a dummy num_layers=80 to test_shape - test_shape = (80,) + test_shape - elif test_shape[0] != 1234: - # shape should be (2, num_blocks, ...) - assert test_shape[0] == 2 - assert test_shape[1] == 1234 - assert gpu_shape[0] == 2 - split_k_and_v = True - - if has_layers_dim: - # in the cross layers case, the registered kv cache tensor - # shape matches the physical layout, whereas test_shape - # is the logical layout. 
- # To match them, we need to permute test_shape - try: - kv_cache_stride_order = attn_backend.get_kv_cache_stride_order( - include_num_layers_dimension=has_layers_dim - ) - assert len(kv_cache_stride_order) == len(gpu_shape) - except (AttributeError, NotImplementedError): - kv_cache_stride_order = tuple(range(len(gpu_shape))) - - test_shape = tuple(test_shape[i] for i in kv_cache_stride_order) - - # find block_size (16) dimension index - block_size_idx = test_shape.index(16) - if kernel_block_size is not None: - assert kernel_block_size == gpu_shape[block_size_idx] - else: - kernel_block_size = gpu_shape[block_size_idx] - assert gpu_block_size % kernel_block_size == 0 - - parsed_gpu_tensors.append((gpu_tensor, split_k_and_v)) - - assert kernel_block_size is not None - cpu_block_size_factor = cpu_block_size // kernel_block_size - gpu_block_size_factor = gpu_block_size // kernel_block_size - num_cpu_kernel_blocks = num_cpu_blocks * cpu_block_size_factor - - # allocate cpu tensors pin_memory = is_pin_memory_available() - logger.info("Allocating %d CPU tensors...", len(parsed_gpu_tensors)) + logger.info("Allocating %d CPU tensors...", len(kv_caches.tensors)) gpu_tensors: list[torch.Tensor] = [] cpu_tensors: list[torch.Tensor] = [] - for gpu_tensor, split_k_and_v in parsed_gpu_tensors: - cpu_shape = list(gpu_tensor.shape) - cpu_shape[1 if split_k_and_v else 0] = num_cpu_kernel_blocks - - logger.debug("Allocating CPU tensor of shape %r", cpu_shape) + for kv_cache_tensor in kv_caches.tensors: + gpu_page_size_bytes = kv_cache_tensor.page_size_bytes + gpu_tensor = kv_cache_tensor.tensor.view(torch.int8).view( + (-1, gpu_page_size_bytes) + ) + cpu_page_size_bytes = gpu_page_size_bytes * block_size_factor cpu_tensor = torch.zeros( - cpu_shape, - dtype=gpu_tensor.dtype, + (num_cpu_blocks, cpu_page_size_bytes), + dtype=torch.int8, device="cpu", pin_memory=pin_memory, ) - gpu_tensors.extend(gpu_tensor.unbind(0) if split_k_and_v else [gpu_tensor]) - cpu_tensors.extend(cpu_tensor.unbind(0) if split_k_and_v else [cpu_tensor]) + gpu_tensors.append(gpu_tensor) + cpu_tensors.append(cpu_tensor) self.gpu_to_cpu_handler = SingleDirectionOffloadingHandler( - src_tensors=gpu_tensors, - dst_tensors=cpu_tensors, - src_block_size_factor=gpu_block_size_factor, - dst_block_size_factor=cpu_block_size_factor, + gpu_tensors=gpu_tensors, + cpu_tensors=cpu_tensors, + block_size_factor=block_size_factor, + kv_cache_groups_data_refs=kv_caches.group_data_refs, + gpu_to_cpu=True, ) self.cpu_to_gpu_handler = SingleDirectionOffloadingHandler( - src_tensors=cpu_tensors, - dst_tensors=gpu_tensors, - src_block_size_factor=cpu_block_size_factor, - dst_block_size_factor=gpu_block_size_factor, + gpu_tensors=gpu_tensors, + cpu_tensors=cpu_tensors, + block_size_factor=block_size_factor, + kv_cache_groups_data_refs=kv_caches.group_data_refs, + gpu_to_cpu=False, )
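For orientation, here is a hedged sketch of how the rewritten CpuGpuOffloadingHandlers might be constructed from a canonical KV cache. The block counts, page size, block_size_factor, and the CUDA device are illustrative assumptions; a CUDA-alike GPU is required because the handler asserts the GPU tensor lives on a CUDA device.

```python
# Hedged sketch (assumed sizes, requires a CUDA device): building the handlers.
import torch

from vllm.v1.kv_offload.spec import (
    CanonicalKVCacheRef,
    CanonicalKVCaches,
    CanonicalKVCacheTensor,
)
from vllm.v1.kv_offload.worker.cpu_gpu import CpuGpuOffloadingHandlers

num_gpu_blocks, num_cpu_blocks = 32, 8
gpu_page_size_bytes = 4096
block_size_factor = 4  # each CPU block packs 4 GPU pages

gpu_tensor = torch.zeros(
    (num_gpu_blocks, gpu_page_size_bytes), dtype=torch.int8, device="cuda"
)
kv_caches = CanonicalKVCaches(
    tensors=[
        CanonicalKVCacheTensor(
            tensor=gpu_tensor, page_size_bytes=gpu_page_size_bytes
        )
    ],
    # A single group with a single reference, matching the single-group
    # assumption in SingleDirectionOffloadingHandler.
    group_data_refs=[
        [CanonicalKVCacheRef(tensor_idx=0, page_size_bytes=gpu_page_size_bytes)]
    ],
)

handlers = CpuGpuOffloadingHandlers(
    kv_caches=kv_caches,
    block_size_factor=block_size_factor,
    num_cpu_blocks=num_cpu_blocks,
)
# handlers.gpu_to_cpu_handler and handlers.cpu_to_gpu_handler are the objects
# that an OffloadingSpec.get_handlers() implementation yields together with
# their src/dst LoadStoreSpec types.
```

The allocated CPU tensors are pinned when pinned memory is available, and each CPU block spans block_size_factor GPU pages, so a single CPU block can back several GPU blocks.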