Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 86 additions & 57 deletions tests/v1/kv_offload/test_cpu_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
OffloadingEvent,
PrepareStoreOutput,
)
from vllm.v1.kv_offload.arc_manager import ARCOffloadingManager
from vllm.v1.kv_offload.backends.cpu import CPUBackend
from vllm.v1.kv_offload.lru_manager import LRUOffloadingManager
from vllm.v1.kv_offload.cpu.manager import CPUOffloadingManager
from vllm.v1.kv_offload.cpu.policies.arc import ARCCachePolicy
from vllm.v1.kv_offload.mediums import CPULoadStoreSpec


Expand Down Expand Up @@ -79,12 +78,12 @@ def to_hash_sets(int_sets: tuple[set[int], ...]) -> tuple[set[BlockHash], ...]:
assert tuple(stores) == to_hash_sets(expected_stores)


@pytest.mark.parametrize("manager_class", [LRUOffloadingManager, ARCOffloadingManager])
def test_already_stored_block_not_evicted_during_prepare_store(manager_class):
@pytest.mark.parametrize("eviction_policy", ["lru", "arc"])
def test_already_stored_block_not_evicted_during_prepare_store(eviction_policy):
"""
Regression test: a block that is already stored must not be evicted
by prepare_store() when it needs to make room for new blocks.
Applies to both LRUOffloadingManager and ARCOffloadingManager.
Applies to both lru and arc policies.

Scenario:
- Store blocks [1, 2] and complete.
Expand All @@ -96,8 +95,12 @@ def test_already_stored_block_not_evicted_during_prepare_store(manager_class):
- After complete_store([2, 3, 4, 5]), block 2 must still be present.
"""
block_size = 256
cpu_backend = CPUBackend(block_size=block_size, num_blocks=4)
manager = manager_class(cpu_backend, enable_events=True)
manager = CPUOffloadingManager(
block_size=block_size,
num_blocks=4,
cache_policy=eviction_policy,
enable_events=True,
)

# store [1, 2] and complete
manager.prepare_store(to_hashes([1, 2]))
Expand Down Expand Up @@ -129,12 +132,13 @@ def test_already_stored_block_not_evicted_during_prepare_store(manager_class):

def test_cpu_manager():
"""
Tests LRUOffloadingManager with a CPUBackend.
Tests CPUOffloadingManager with lru policy.
"""
# initialize a CPU backend with a capacity of 4 blocks
block_size = 256
cpu_backend = CPUBackend(block_size=block_size, num_blocks=4)
cpu_manager = LRUOffloadingManager(cpu_backend, enable_events=True)
cpu_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=4, cache_policy="lru", enable_events=True
)

# prepare store [1, 2]
prepare_store_output = cpu_manager.prepare_store(to_hashes([1, 2]))
Expand Down Expand Up @@ -241,13 +245,15 @@ def test_cpu_manager():

def test_arc_manager_basic():
"""
Tests ARCOffloadingManager basic operations with a CPUBackend.
Tests CPUOffloadingManager with arc policy.
Verifies that ARC handles store, load, and lookup operations correctly.
"""
# initialize a CPU backend with a capacity of 4 blocks
block_size = 256
cpu_backend = CPUBackend(block_size=block_size, num_blocks=4)
arc_manager = ARCOffloadingManager(cpu_backend, enable_events=True)
arc_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True
)
arc_policy = arc_manager._policy
assert isinstance(arc_policy, ARCCachePolicy)

# prepare store [1, 2]
prepare_store_output = arc_manager.prepare_store(to_hashes([1, 2]))
Expand Down Expand Up @@ -278,8 +284,8 @@ def test_arc_manager_basic():
assert arc_manager.lookup(to_hashes([1, 2, 3])) == 2

# blocks should be in T1 (recent)
assert len(arc_manager.t1) == 2
assert len(arc_manager.t2) == 0
assert len(arc_policy.t1) == 2
assert len(arc_policy.t2) == 0


def test_arc_manager_t1_to_t2_promotion():
Expand All @@ -288,23 +294,26 @@ def test_arc_manager_t1_to_t2_promotion():
This is a key feature of ARC's adaptive behavior.
"""
block_size = 256
cpu_backend = CPUBackend(block_size=block_size, num_blocks=4)
arc_manager = ARCOffloadingManager(cpu_backend, enable_events=False)
arc_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=False
)
arc_policy = arc_manager._policy
assert isinstance(arc_policy, ARCCachePolicy)

# store and complete block 1
arc_manager.prepare_store(to_hashes([1]))
arc_manager.complete_store(to_hashes([1]))

# block 1 starts in T1 (recent)
assert to_hashes([1])[0] in arc_manager.t1
assert to_hashes([1])[0] not in arc_manager.t2
assert to_hashes([1])[0] in arc_policy.t1
assert to_hashes([1])[0] not in arc_policy.t2

# touch block 1 (simulate second access)
arc_manager.touch(to_hashes([1]))

# block 1 should now be in T2 (frequent)
assert to_hashes([1])[0] not in arc_manager.t1
assert to_hashes([1])[0] in arc_manager.t2
assert to_hashes([1])[0] not in arc_policy.t1
assert to_hashes([1])[0] in arc_policy.t2


def test_arc_manager_eviction_with_load():
Expand All @@ -313,8 +322,9 @@ def test_arc_manager_eviction_with_load():
Verifies that blocks being loaded (ref_cnt > 0) cannot be evicted.
"""
block_size = 256
cpu_backend = CPUBackend(block_size=block_size, num_blocks=4)
arc_manager = ARCOffloadingManager(cpu_backend, enable_events=True)
arc_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True
)

# prepare and complete store [1, 2, 3, 4]
prepare_store_output = arc_manager.prepare_store(to_hashes([1, 2, 3, 4]))
Expand Down Expand Up @@ -354,28 +364,31 @@ def test_arc_manager_adaptive_target():
When a block in B2 is accessed, target_t1_size decreases.
"""
block_size = 256
cpu_backend = CPUBackend(block_size=block_size, num_blocks=2)
arc_manager = ARCOffloadingManager(cpu_backend, enable_events=False)
arc_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=2, cache_policy="arc", enable_events=False
)
arc_policy = arc_manager._policy
assert isinstance(arc_policy, ARCCachePolicy)

# store blocks 1, 2 (fills cache)
arc_manager.prepare_store(to_hashes([1, 2]))
arc_manager.complete_store(to_hashes([1, 2]))

initial_target = arc_manager.target_t1_size
initial_target = arc_policy.target_t1_size

# store block 3, evicting block 1 (moves to B1 ghost list)
arc_manager.prepare_store(to_hashes([3]))
arc_manager.complete_store(to_hashes([3]))

# block 1 should be in B1 (ghost list)
assert to_hashes([1])[0] in arc_manager.b1
assert to_hashes([1])[0] in arc_policy.b1

# touch block 1 (cache miss, but in B1)
# this should increase target_t1_size (favor recency)
arc_manager.touch(to_hashes([1]))

# target should have increased
assert arc_manager.target_t1_size > initial_target
assert arc_policy.target_t1_size > initial_target


def test_arc_manager_t1_t2_eviction_policy():
Expand All @@ -384,8 +397,11 @@ def test_arc_manager_t1_t2_eviction_policy():
If |T1| >= target_t1_size, evict from T1, otherwise from T2.
"""
block_size = 256
cpu_backend = CPUBackend(block_size=block_size, num_blocks=4)
arc_manager = ARCOffloadingManager(cpu_backend, enable_events=False)
arc_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=False
)
arc_policy = arc_manager._policy
assert isinstance(arc_policy, ARCCachePolicy)

# store blocks 1, 2, 3, 4
arc_manager.prepare_store(to_hashes([1, 2, 3, 4]))
Expand All @@ -395,12 +411,12 @@ def test_arc_manager_t1_t2_eviction_policy():
arc_manager.touch(to_hashes([3, 4]))

# now: T1 = {1, 2}, T2 = {3, 4}
assert len(arc_manager.t1) == 2
assert len(arc_manager.t2) == 2
assert len(arc_policy.t1) == 2
assert len(arc_policy.t2) == 2

# set target_t1_size to prefer evicting from T1
# (when |T1| >= target, evict from T1)
arc_manager.target_t1_size = 1
arc_policy.target_t1_size = 1

# store block 5, should evict from T1 (block 1, LRU in T1)
output = arc_manager.prepare_store(to_hashes([5]))
Expand All @@ -410,9 +426,9 @@ def test_arc_manager_t1_t2_eviction_policy():
arc_manager.complete_store(to_hashes([5]))

# block 1 should be in B1 (ghost list)
assert to_hashes([1])[0] in arc_manager.b1
assert to_hashes([1])[0] in arc_policy.b1
# block 5 should be in T1
assert to_hashes([5])[0] in arc_manager.t1
assert to_hashes([5])[0] in arc_policy.t1


def test_arc_manager_ghost_list_bounds():
Expand All @@ -421,8 +437,11 @@ def test_arc_manager_ghost_list_bounds():
They should be capped at cache_capacity.
"""
block_size = 256
cpu_backend = CPUBackend(block_size=block_size, num_blocks=2)
arc_manager = ARCOffloadingManager(cpu_backend, enable_events=False)
arc_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=2, cache_policy="arc", enable_events=False
)
arc_policy = arc_manager._policy
assert isinstance(arc_policy, ARCCachePolicy)

# fill cache with blocks 1, 2
arc_manager.prepare_store(to_hashes([1, 2]))
Expand All @@ -434,8 +453,8 @@ def test_arc_manager_ghost_list_bounds():
arc_manager.complete_store(to_hashes([i]))

# ghost lists should not exceed cache_capacity
assert len(arc_manager.b1) <= arc_manager.cache_capacity
assert len(arc_manager.b2) <= arc_manager.cache_capacity
assert len(arc_policy.b1) <= arc_policy.cache_capacity
assert len(arc_policy.b2) <= arc_policy.cache_capacity


def test_arc_manager_touch_ordering():
Expand All @@ -444,8 +463,11 @@ def test_arc_manager_touch_ordering():
Similar to LRU test but verifies T1/T2 ordering.
"""
block_size = 256
cpu_backend = CPUBackend(block_size=block_size, num_blocks=4)
arc_manager = ARCOffloadingManager(cpu_backend, enable_events=True)
arc_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True
)
arc_policy = arc_manager._policy
assert isinstance(arc_policy, ARCCachePolicy)

# store blocks 1, 2, 3, 4
arc_manager.prepare_store(to_hashes([1, 2, 3, 4]))
Expand All @@ -459,8 +481,8 @@ def test_arc_manager_touch_ordering():
arc_manager.touch(to_hashes([1, 3, 4]))

# T1 = {2}, T2 = {1, 3, 4} (in that order, with 4 most recent)
assert len(arc_manager.t1) == 1
assert len(arc_manager.t2) == 3
assert len(arc_policy.t1) == 1
assert len(arc_policy.t2) == 3

# store block 5, should evict from T1 (block 2, only one in T1)
prepare_store_output = arc_manager.prepare_store(to_hashes([5]))
Expand All @@ -480,8 +502,11 @@ def test_arc_manager_failed_store():
Similar to LRU test but for ARC.
"""
block_size = 256
cpu_backend = CPUBackend(block_size=block_size, num_blocks=4)
arc_manager = ARCOffloadingManager(cpu_backend, enable_events=True)
arc_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True
)
arc_policy = arc_manager._policy
assert isinstance(arc_policy, ARCCachePolicy)

# store blocks 1, 2, 3, 4
arc_manager.prepare_store(to_hashes([1, 2, 3, 4]))
Expand All @@ -498,12 +523,12 @@ def test_arc_manager_failed_store():
# block 5 should not be in cache
assert arc_manager.lookup(to_hashes([5])) == 0
# block 5 should not be in T1 or T2
assert to_hashes([5])[0] not in arc_manager.t1
assert to_hashes([5])[0] not in arc_manager.t2
assert to_hashes([5])[0] not in arc_policy.t1
assert to_hashes([5])[0] not in arc_policy.t2

# evicted block should still be gone (in B1 ghost list)
evicted_hash = prepare_store_output.block_hashes_evicted[0]
assert evicted_hash in arc_manager.b1
assert evicted_hash in arc_policy.b1


def test_arc_manager_full_scenario():
Expand All @@ -512,8 +537,11 @@ def test_arc_manager_full_scenario():
Similar to the full LRU test but adapted for ARC behavior.
"""
block_size = 256
cpu_backend = CPUBackend(block_size=block_size, num_blocks=4)
arc_manager = ARCOffloadingManager(cpu_backend, enable_events=True)
arc_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True
)
arc_policy = arc_manager._policy
assert isinstance(arc_policy, ARCCachePolicy)

# store [1, 2]
arc_manager.prepare_store(to_hashes([1, 2]))
Expand All @@ -529,8 +557,8 @@ def test_arc_manager_full_scenario():
arc_manager.touch(to_hashes([2, 3]))

# T1 has {4, 5}, T2 has {2, 3}
assert len(arc_manager.t1) == 2
assert len(arc_manager.t2) == 2
assert len(arc_policy.t1) == 2
assert len(arc_policy.t2) == 2

# store [6] -> should evict from T1 (4 is oldest in T1)
prepare_store_output = arc_manager.prepare_store(to_hashes([6]))
Expand All @@ -548,11 +576,12 @@ def test_arc_manager_full_scenario():

def test_filter_reused_manager():
"""
Tests FilterReusedOffloadingManager with a CPUBackend.
Tests FilterReusedOffloadingManager with a CPUOffloadingManager.
"""
block_size = 256
cpu_backend = CPUBackend(block_size=block_size, num_blocks=4)
lru_manager = LRUOffloadingManager(cpu_backend, enable_events=True)
lru_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=4, cache_policy="lru", enable_events=True
)

from vllm.v1.kv_offload.reuse_manager import FilterReusedOffloadingManager

Expand Down
Loading
Loading