42 changes: 37 additions & 5 deletions tests/v1/kv_connector/unit/test_offloading_connector.py
@@ -26,8 +26,13 @@
get_request_block_hasher,
init_none_hash,
)
from vllm.v1.core.sched.async_scheduler import AsyncScheduler
from vllm.v1.core.sched.scheduler import Scheduler
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.kv_cache_interface import (
FullAttentionSpec,
KVCacheConfig,
KVCacheGroupSpec,
)
from vllm.v1.kv_offload.abstract import (
LoadStoreSpec,
OffloadingEvent,
@@ -43,11 +48,11 @@
)
from vllm.v1.outputs import EMPTY_MODEL_RUNNER_OUTPUT, KVConnectorOutput
from vllm.v1.request import Request, RequestStatus
from vllm.v1.structured_output import StructuredOutputManager

from .utils import (
EOS_TOKEN_ID,
create_model_runner_output,
create_scheduler,
create_vllm_config,
)

@@ -175,10 +180,37 @@ def __init__(
},
)

self.scheduler: Scheduler = create_scheduler(
vllm_config, num_blocks=num_gpu_blocks
block_size = vllm_config.cache_config.block_size
kv_cache_config = KVCacheConfig(
num_blocks=num_gpu_blocks,
kv_cache_tensors=[],
kv_cache_groups=[
KVCacheGroupSpec(
["layer"],
FullAttentionSpec(
block_size=block_size,
num_kv_heads=1,
head_size=1,
dtype=torch.float32,
),
)
],
)
vllm_config.cache_config.num_gpu_blocks = num_gpu_blocks
self.num_kv_groups = len(kv_cache_config.kv_cache_groups)

scheduler_cls = AsyncScheduler if async_scheduling else Scheduler
self.scheduler = scheduler_cls(
vllm_config=vllm_config,
kv_cache_config=kv_cache_config,
log_stats=True,
structured_output_manager=StructuredOutputManager(vllm_config),
block_size=block_size,
)

self.worker_connector = OffloadingConnector(
vllm_config, KVConnectorRole.WORKER, kv_cache_config
)
self.worker_connector = OffloadingConnector(vllm_config, KVConnectorRole.WORKER)

# register worker kv_caches to enable OffloadingWorker creations
self.worker_connector.register_cross_layers_kv_cache(
@@ -126,6 +126,7 @@ def __init__(
):
super().__init__(vllm_config, role, kv_cache_config)

assert kv_cache_config is not None
spec = OffloadingSpecFactory.create_spec(vllm_config, kv_cache_config)

self.connector_scheduler: OffloadingConnectorScheduler | None = None
@@ -245,9 +246,10 @@ class OffloadingConnectorScheduler:
"""Implementation of Scheduler side methods"""

def __init__(self, spec: OffloadingSpec):
self.gpu_block_size = spec.gpu_block_size
self.offloaded_block_size = spec.offloaded_block_size
self.block_size_factor = self.offloaded_block_size // self.gpu_block_size
assert len(spec.gpu_block_size) == 1
self.gpu_block_size = spec.gpu_block_size[0]
self.offloaded_block_size = self.gpu_block_size * spec.block_size_factor
self.block_size_factor = spec.block_size_factor
self.manager: OffloadingManager = spec.get_manager()

self._requests: dict[ReqId, Request] = {}
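For intuition, here is a minimal, self-contained sketch of the block-size relation the scheduler now relies on: gpu_block_size is a per-group tuple, and the offloaded block size is derived from the factor rather than read off the spec. The _SpecStub class and the values (16, 4) are illustrative assumptions, not code from this PR.

# Minimal sketch (illustrative only): how the connector scheduler derives its
# sizes from the spec after this change.
class _SpecStub:
    gpu_block_size = (16,)   # one entry per KV cache group
    block_size_factor = 4    # each offloaded block spans 4 GPU blocks

spec = _SpecStub()
assert len(spec.gpu_block_size) == 1
gpu_block_size = spec.gpu_block_size[0]
offloaded_block_size = gpu_block_size * spec.block_size_factor
print(gpu_block_size, offloaded_block_size)  # 16 64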
16 changes: 10 additions & 6 deletions vllm/v1/kv_offload/cpu.py
@@ -42,10 +42,8 @@ def __init__(self, vllm_config: VllmConfig, kv_cache_config: KVCacheConfig):
* len(kv_cache_config.kv_cache_tensors)
* vllm_config.parallel_config.world_size
)
kv_bytes_per_offloaded_block = kv_bytes_per_block * (
self.offloaded_block_size // self.gpu_block_size
)

kv_bytes_per_offloaded_block = kv_bytes_per_block * self.block_size_factor
self.num_blocks = (
int(cpu_bytes_to_use) // kv_bytes_per_offloaded_block
if kv_bytes_per_offloaded_block > 0
@@ -67,8 +65,11 @@ def get_manager(self) -> OffloadingManager:
kv_events_config is not None and kv_events_config.enable_kv_cache_events
)

assert len(self.gpu_block_size) == 1
gpu_block_size = self.gpu_block_size[0]
offloaded_block_size = gpu_block_size * self.block_size_factor
backend = CPUBackend(
block_size=self.offloaded_block_size, num_blocks=self.num_blocks
block_size=offloaded_block_size, num_blocks=self.num_blocks
)

if self.eviction_policy == "lru":
@@ -111,10 +112,13 @@ def get_handlers(
"CPU Offloading is currently only supported on CUDA-alike GPUs"
)

assert len(self.gpu_block_size) == 1
gpu_block_size = self.gpu_block_size[0]

self._handlers = CpuGpuOffloadingHandlers(
attn_backends=attn_backends,
gpu_block_size=self.gpu_block_size,
cpu_block_size=self.offloaded_block_size,
gpu_block_size=gpu_block_size,
cpu_block_size=gpu_block_size * self.block_size_factor,
num_cpu_blocks=self.num_blocks,
gpu_caches=kv_caches,
)
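A worked example of the CPU block-count math above, with purely illustrative numbers (fp16 cache, one KV cache tensor, world size 1); none of these values come from this PR.

# Worked example of the num_blocks computation in cpu.py; all numbers are
# illustrative assumptions.
kv_bytes_per_block = 2 * 16 * 8 * 128 * 2      # K+V, block_size=16, 8 KV heads,
                                               # head_size=128, fp16 -> 65536 bytes
block_size_factor = 4                          # offloaded block spans 4 GPU blocks
kv_bytes_per_offloaded_block = kv_bytes_per_block * block_size_factor  # 262144

cpu_bytes_to_use = 4 * 1024**3                 # 4 GiB of CPU memory for offloading
num_blocks = (
    cpu_bytes_to_use // kv_bytes_per_offloaded_block
    if kv_bytes_per_offloaded_block > 0
    else 0
)
print(num_blocks)  # 16384 offloaded CPU blocks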
2 changes: 1 addition & 1 deletion vllm/v1/kv_offload/factory.py
@@ -33,7 +33,7 @@ def loader() -> type[OffloadingSpec]:
def create_spec(
cls,
config: "VllmConfig",
kv_cache_config: "KVCacheConfig | None",
kv_cache_config: "KVCacheConfig",
) -> OffloadingSpec:
kv_transfer_config = config.kv_transfer_config
assert kv_transfer_config is not None
34 changes: 27 additions & 7 deletions vllm/v1/kv_offload/spec.py
@@ -21,9 +21,7 @@
class OffloadingSpec(ABC):
"""Spec for an offloading connector"""

def __init__(
self, vllm_config: "VllmConfig", kv_cache_config: "KVCacheConfig | None"
):
def __init__(self, vllm_config: "VllmConfig", kv_cache_config: "KVCacheConfig"):
logger.warning(
"Initializing OffloadingSpec. This API is experimental and "
"subject to change in the future as we iterate the design."
@@ -35,12 +33,34 @@ def __init__(
assert kv_transfer_config is not None
self.extra_config = kv_transfer_config.kv_connector_extra_config

self.gpu_block_size = vllm_config.cache_config.block_size
self.offloaded_block_size = int(
self.extra_config.get("block_size", self.gpu_block_size)
# block size used by vLLM for hashing request tokens for the sake
# of enabling prefix caching
self.hash_block_size = vllm_config.cache_config.block_size
# gpu block size per group
self.gpu_block_size: tuple[int, ...] = tuple(
kv_cache_group.kv_cache_spec.block_size
for kv_cache_group in kv_cache_config.kv_cache_groups
)

assert self.offloaded_block_size % self.gpu_block_size == 0
for block_size in self.gpu_block_size:
assert block_size % self.hash_block_size == 0

# offloaded_block_size / gpu_block_size
self.block_size_factor: int = 1

offloaded_block_size = self.extra_config.get("block_size")
if offloaded_block_size is not None:
offloaded_block_size_int = int(offloaded_block_size)
gpu_block_sizes = set(self.gpu_block_size)
assert len(gpu_block_sizes) == 1, (
"If 'block_size' is specified in kv_connector_extra_config, "
"there must be at least one KV cache group, "
"and all groups must have the same block size."
            )
            gpu_block_size = gpu_block_sizes.pop()

            assert offloaded_block_size_int % gpu_block_size == 0
            self.block_size_factor = offloaded_block_size_int // gpu_block_size

Comment on lines +56 to +58

Collaborator:
nit:
Suggested change
-                "If 'block_size' is specified in kv_connector_extra_config, "
-                "there must be at least one KV cache group, "
-                "and all groups must have the same block size."
+                "If 'block_size' is specified in kv_connector_extra_config "
+                "all groups must have the same block size."

Collaborator:
it should never be empty regardless

Collaborator (Author):
you're basically reverting gemini's suggestion :)

Collaborator (Author):
But in theory it can be empty if we're running a model without KV cache (encoder model?)

orozery marked this conversation as resolved.

@abstractmethod
def get_manager(self) -> OffloadingManager:
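To make the new derivation concrete, here is a small standalone rendering of the __init__ logic above with assumed values (hash block size 16, a single KV cache group with block size 16, and an offloaded block size of 64 from kv_connector_extra_config); this is a sketch, not the class itself.

# Worked example of the block_size_factor derivation in OffloadingSpec.__init__;
# the values below are assumptions for illustration only.
hash_block_size = 16                   # vllm_config.cache_config.block_size
gpu_block_size = (16,)                 # one block size per KV cache group
extra_config = {"block_size": 64}      # kv_connector_extra_config

for block_size in gpu_block_size:
    assert block_size % hash_block_size == 0

block_size_factor = 1
offloaded_block_size = extra_config.get("block_size")
if offloaded_block_size is not None:
    gpu_block_sizes = set(gpu_block_size)
    assert len(gpu_block_sizes) == 1
    gpu_bs = gpu_block_sizes.pop()
    assert int(offloaded_block_size) % gpu_bs == 0
    block_size_factor = int(offloaded_block_size) // gpu_bs

print(block_size_factor)  # 4: each offloaded block covers 4 GPU blocks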