Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 56 additions & 4 deletions tests/v1/kv_connector/unit/test_offloading_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,22 @@ def close(self):
self.sub.close()


def _wait_for_prefix_cache_reset(llm: LLM) -> None:
def _wait_for_prefix_cache_reset(llm: LLM, reset_connector: bool = False) -> None:
"""Wait for async offload transfers to finish so prefix cache can reset.

The GPU-to-CPU offload runs on a CUDA stream asynchronously. While blocks
are still held by the offload worker, ``reset_prefix_cache`` returns
``False``. Between retries we send a dummy single-token prefill to force
the engine to step, which polls the worker for completed transfers and
frees GPU blocks.

Args:
llm: The LLM instance to reset.
reset_connector: If True, also reset the KV connector state.
"""
_dummy_params = SamplingParams(max_tokens=1)
deadline = time.monotonic() + _RESET_CACHE_TIMEOUT
while not llm.reset_prefix_cache():
while not llm.reset_prefix_cache(reset_connector=reset_connector):
if time.monotonic() > deadline:
raise TimeoutError(
"reset_prefix_cache did not succeed within "
Expand All @@ -133,7 +137,9 @@ def _wait_for_prefix_cache_reset(llm: LLM) -> None:
)


def _latency_test(llm: LLM, subscriber: MockSubscriber | None):
def _latency_test(
llm: LLM, subscriber: MockSubscriber | None, reset_connector: bool = False
):
sampling_params = SamplingParams(max_tokens=1)

num_times_cpu_better_than_cold = 0
Expand Down Expand Up @@ -163,7 +169,7 @@ def _latency_test(llm: LLM, subscriber: MockSubscriber | None):

# Wait for the async CPU offload to finish, then reset prefix cache
# so the next generate() must reload from CPU rather than GPU.
_wait_for_prefix_cache_reset(llm)
_wait_for_prefix_cache_reset(llm, reset_connector=reset_connector)

# Verify CPU stored events arrived (offload is done before we
# attempt to load from CPU).
Expand Down Expand Up @@ -337,3 +343,49 @@ def test_tiering_offloading() -> None:
finally:
subscriber.close()
del llm


def test_fs_tiering_offloading(tmp_path) -> None:
    """Exercise OffloadingConnector with a filesystem-backed secondary tier.

    The connector is configured with ``TieringOffloadingSpec`` and a single
    ``fs_python`` secondary tier rooted at pytest's ``tmp_path``. The latency
    check is run with ``reset_connector=True``, followed by the accuracy
    check; KV cache events are observed through a ZMQ subscriber.
    """
    connector_options: dict = {
        "cpu_bytes_to_use": 1 << 30,
        "block_size": 48,
        "spec_name": "TieringOffloadingSpec",
        "secondary_tiers": [{"type": "fs_python", "root_dir": str(tmp_path)}],
    }
    transfer_cfg = KVTransferConfig(
        kv_connector="OffloadingConnector",
        kv_role="kv_both",
        kv_connector_extra_config=connector_options,
    )

    # Bind to port 0 so the OS hands out a currently-free port, then release
    # the probe socket and reuse that port number for the event publisher.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.bind(("0.0.0.0", 0))
        _, free_port = probe.getsockname()
    publish_endpoint = f"tcp://*:{free_port}"
    events_cfg = KVEventsConfig(
        enable_kv_cache_events=True,
        publisher="zmq",
        endpoint=publish_endpoint,
        topic="test",
    )

    llm = LLM(
        model="meta-llama/Llama-3.2-1B-Instruct",
        max_model_len=512,
        gpu_memory_utilization=0.5,
        kv_events_config=events_cfg,
        kv_transfer_config=transfer_cfg,
    )
    # The publisher binds on the wildcard address; the subscriber connects
    # to the same port over loopback.
    subscribe_endpoint = publish_endpoint.replace("*", "127.0.0.1")
    subscriber = MockSubscriber(subscribe_endpoint, topic=events_cfg.topic)
    try:
        _latency_test(llm, subscriber, reset_connector=True)
        _accuracy_test(llm, subscriber)
    finally:
        subscriber.close()
        del llm
127 changes: 127 additions & 0 deletions tests/v1/kv_offload/test_file_mapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Unit tests for FileMapper."""

from unittest.mock import MagicMock

from vllm.v1.kv_offload.base import (
OffloadingSpec,
make_offload_key,
)
from vllm.v1.kv_offload.file_mapper import FileMapper

# ---------------------------------------------------------------------------
# Shared mocks (mirrors test_fs_tier.py pattern)
# ---------------------------------------------------------------------------

# Engine configuration stub. Dotted keys passed to ``configure_mock`` set
# attributes on the corresponding child mocks.
_MOCK_VLLM_CONFIG = MagicMock()
_MOCK_VLLM_CONFIG.configure_mock(
    **{
        "model_config.model": "test-model",
        "cache_config.block_size": 16,
        "cache_config.cache_dtype": "torch.float32",
        "parallel_config.tensor_parallel_size": 1,
        "parallel_config.pipeline_parallel_size": 1,
        "parallel_config.prefill_context_parallel_size": 1,
        "parallel_config.decode_context_parallel_size": 1,
        "parallel_config.rank": 0,
    }
)

# KV cache configuration stub with no cache groups.
_MOCK_KV_CACHE_CONFIG = MagicMock()
_MOCK_KV_CACHE_CONFIG.configure_mock(kv_cache_groups=[])

# Offloading spec stub tying the two configuration stubs together.
_MOCK_OFFLOADING_SPEC = MagicMock(spec=OffloadingSpec)
_MOCK_OFFLOADING_SPEC.configure_mock(
    vllm_config=_MOCK_VLLM_CONFIG,
    kv_cache_config=_MOCK_KV_CACHE_CONFIG,
    block_size_factor=1,
)


# ---------------------------------------------------------------------------
# Helper
# ---------------------------------------------------------------------------


def make_mapper_from_offloading_spec(**kwargs) -> FileMapper:
    """Build a FileMapper from freshly created mock configuration objects.

    Recognised keyword overrides (defaults in parentheses):
    ``model_name`` ("test-model"), ``hash_block_size`` (16),
    ``dtype`` ("float16"), ``tp_size`` (1), ``pp_size`` (1),
    ``pcp_size`` (1), ``dcp_size`` (1), ``rank`` (0),
    ``block_size_factor`` (1) and ``root_dir`` ("/tmp/cache").
    """
    option = kwargs.get
    block_size_factor = option("block_size_factor", 1)

    # New mocks are built on every call, so the module-level shared mocks
    # are never touched.
    vllm_cfg = MagicMock()
    vllm_cfg.model_config.model = option("model_name", "test-model")

    cache_cfg = vllm_cfg.cache_config
    cache_cfg.block_size = option("hash_block_size", 16)
    cache_cfg.cache_dtype = f"torch.{option('dtype', 'float16')}"

    parallel_cfg = vllm_cfg.parallel_config
    parallel_cfg.tensor_parallel_size = option("tp_size", 1)
    parallel_cfg.pipeline_parallel_size = option("pp_size", 1)
    parallel_cfg.prefill_context_parallel_size = option("pcp_size", 1)
    parallel_cfg.decode_context_parallel_size = option("dcp_size", 1)
    parallel_cfg.rank = option("rank", 0)

    kv_cache_cfg = MagicMock()
    kv_cache_cfg.kv_cache_groups = []

    spec = MagicMock(spec=OffloadingSpec)
    spec.vllm_config = vllm_cfg
    spec.kv_cache_config = kv_cache_cfg
    spec.block_size_factor = block_size_factor

    return FileMapper.from_offloading_spec(
        root_dir=option("root_dir", "/tmp/cache"),
        offloading_spec=spec,
        gpu_blocks_per_file=block_size_factor,
    )


# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------


def test_get_file_name_full_structure():
    """Check the complete file path FileMapper produces for one offload key.

    Path must match: <base_path>_r<rank>/<hhh>/<hh>_g<group_idx>/<hash_hex>.bin

    Concretely, for rank 3, group index 2 and block hash 0001020304050607:
    - The base directory segment must end with `_r3` (the configured rank)
    - The next segment is the first 3 hex chars of the block hash (`000`)
    - The next segment is the following 2 hex chars plus `_g<group_idx>`
      (`10_g2`)
    - The final segment is <full hash hex>.bin
    """
    rank = 3
    group_idx = 2
    block_hash = bytes(range(8))  # deterministic bytes 0x00..0x07
    fm = make_mapper_from_offloading_spec(rank=rank)
    key = make_offload_key(block_hash, group_idx)
    path = fm.get_file_name(key)

    # Golden value. NOTE(review): the `588656ebcc66` component is presumably
    # derived from the run configuration built by the helper -- confirm, and
    # regenerate this literal if the helper's defaults change.
    expected_path = (
        "/tmp/cache/test-model_588656ebcc66_r3/000/10_g2/0001020304050607.bin"
    )
    assert path == expected_path


def test_get_run_config_fields():
    """Run config must echo the overrides and the helper's defaults."""
    mapper = make_mapper_from_offloading_spec(
        model_name="my-model",
        dtype="bfloat16",
        tp_size=2,
    )
    run_config = mapper.get_run_config()

    # Overridden fields: model_name, dtype, tp_size. Everything else is the
    # default supplied by the helper.
    expected = {
        "model_name": "my-model",
        "hash_block_size": 16,
        "gpu_blocks_per_file": 1,
        "tp_size": 2,
        "pp_size": 1,
        "pcp_size": 1,
        "dcp_size": 1,
        "dtype": "bfloat16",
        "kv_cache_groups": [],
        "inference_engine": "vllm",
    }
    assert run_config == expected


def test_get_config_file_path():
    """The config file must be ``config.json`` directly under base_path."""
    mapper = make_mapper_from_offloading_spec()
    actual = mapper.get_config_file_path()
    expected = f"{mapper.base_path}/config.json"
    assert actual == expected
Loading
Loading