Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/features/disagg_prefill.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ For NixlConnector, you may also specify one or multiple NIXL_Backend. Such as:
--kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_both", "kv_buffer_device":"cuda", "kv_connector_extra_config":{"backends":["UCX", "GDS"]}}'
```

- **OffloadingConnector**: enable offloading of KV data to CPU memory, customizing the CPU block size (in tokens) and number of blocks to allocate (per worker):
- **OffloadingConnector**: enable offloading of KV data to CPU memory, customizing the CPU block size (in tokens) and total CPU memory bytes to allocate:

```bash
--kv-transfer-config '{"kv_connector":"OffloadingConnector","kv_role":"kv_both","kv_connector_extra_config":{"block_size": 64, "num_cpu_blocks": 1000}}'
--kv-transfer-config '{"kv_connector":"OffloadingConnector","kv_role":"kv_both","kv_connector_extra_config":{"block_size": 64, "cpu_bytes_to_use": 1000000000}}'
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we should probably follow up with a more human-readable way of expressing the value

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm open for suggestions :)

Copy link
Copy Markdown
Collaborator

@NickLucche NickLucche Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking about unifying with max-model-len format

```

## Benchmarks
Expand Down
5 changes: 2 additions & 3 deletions tests/v1/kv_connector/unit/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
[
("native", 4.0, 1, 1, "OffloadingConnector", 4.0 * (1 << 30)),
# bytes per rank: 8.0 GiB / (2 * 2) = 2.0 GiB
("native", 8.0, 2, 2, "OffloadingConnector", 8.0 * (1 << 30) / 4),
("native", 8.0, 2, 2, "OffloadingConnector", 8.0 * (1 << 30)),
("lmcache", 4.0, 1, 1, "LMCacheConnectorV1", 4.0),
# size per rank: 8.0 GiB / (2 * 2) = 2.0 GiB
("lmcache", 8.0, 2, 2, "LMCacheConnectorV1", 2.0),
Expand Down Expand Up @@ -54,8 +54,7 @@ def test_kv_connector(
assert kv_transfer_config.kv_role == "kv_both"

if kv_offloading_backend == "native":
assert kv_connector_extra_config["kv_bytes_per_rank"] == expected_bytes
assert kv_connector_extra_config["num_cpu_blocks"] == 0
assert kv_connector_extra_config["cpu_bytes_to_use"] == expected_bytes
# Existing config should be preserved
assert kv_connector_extra_config["existing_key"] == "existing_value"
elif kv_offloading_backend == "lmcache":
Expand Down
5 changes: 3 additions & 2 deletions tests/v1/kv_connector/unit/test_offloading_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
init_none_hash,
)
from vllm.v1.core.sched.scheduler import Scheduler
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.kv_offload.abstract import (
LoadStoreSpec,
OffloadingEvent,
Expand Down Expand Up @@ -79,8 +80,8 @@ def transfer_async(self, job_id: int, spec: TransferSpec) -> bool:


class MockOffloadingSpec(OffloadingSpec):
def __init__(self, vllm_config: VllmConfig):
super().__init__(vllm_config)
def __init__(self, vllm_config: VllmConfig, kv_cache_config: KVCacheConfig):
super().__init__(vllm_config, kv_cache_config)

self.manager = MagicMock(spec=OffloadingManager)
self.manager.lookup.return_value = 0
Expand Down
2 changes: 1 addition & 1 deletion tests/v1/kv_offload/test_cpu_offloading.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def test_cpu_offloading(cpu_block_size: int, attn_backend: str) -> None:
kv_connector="OffloadingConnector",
kv_role="kv_both",
kv_connector_extra_config={
"num_cpu_blocks": 1000,
"cpu_bytes_to_use": 500 << 20,
"block_size": cpu_block_size,
},
)
Expand Down
6 changes: 1 addition & 5 deletions vllm/config/vllm.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,12 +516,8 @@ def _post_init_kv_transfer_config(self) -> None:

if kv_offloading_backend == "native":
self.kv_transfer_config.kv_connector = "OffloadingConnector"
kv_bytes_per_rank = kv_offloading_size * (1 << 30) / num_kv_ranks

# NOTE(ApostaC): the actual calculation for num_cpu_blocks should be
# done after the model's KV cache is initialized
self.kv_transfer_config.kv_connector_extra_config.update(
{"kv_bytes_per_rank": kv_bytes_per_rank, "num_cpu_blocks": 0}
{"cpu_bytes_to_use": kv_offloading_size * (1 << 30)}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this no longer need to be scaled by the number of ranks?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It never needed scaling in the first place.

)
elif kv_offloading_backend == "lmcache":
self.kv_transfer_config.kv_connector = "LMCacheConnectorV1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def __init__(
):
super().__init__(vllm_config, role, kv_cache_config)

spec = OffloadingSpecFactory.create_spec(vllm_config)
spec = OffloadingSpecFactory.create_spec(vllm_config, kv_cache_config)

self.connector_scheduler: OffloadingConnectorScheduler | None = None
self.connector_worker: OffloadingConnectorWorker | None = None
Expand Down
39 changes: 31 additions & 8 deletions vllm/v1/kv_offload/cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from vllm.config import VllmConfig
from vllm.platforms import current_platform
from vllm.v1.attention.backend import AttentionBackend
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.kv_offload.abstract import LoadStoreSpec, OffloadingManager
from vllm.v1.kv_offload.arc_manager import ARCOffloadingManager
from vllm.v1.kv_offload.backends.cpu import CPUBackend
Expand All @@ -18,15 +19,37 @@


class CPUOffloadingSpec(OffloadingSpec):
def __init__(self, vllm_config: VllmConfig):
super().__init__(vllm_config)
def __init__(self, vllm_config: VllmConfig, kv_cache_config: KVCacheConfig):
super().__init__(vllm_config, kv_cache_config)

num_cpu_blocks = self.extra_config.get("num_cpu_blocks")
if not num_cpu_blocks:
cpu_bytes_to_use = self.extra_config.get("cpu_bytes_to_use")
if not cpu_bytes_to_use:
raise Exception(
"num_cpu_blocks must be specified in kv_connector_extra_config"
"cpu_bytes_to_use must be specified in kv_connector_extra_config"
)
self.num_cpu_blocks: int = num_cpu_blocks

# calculate kv_bytes_per_offloaded_block
assert kv_cache_config is not None
page_sizes = {
kv_cache_group.kv_cache_spec.page_size_bytes
for kv_cache_group in kv_cache_config.kv_cache_groups
}
assert len(page_sizes) == 1
page_size_bytes = page_sizes.pop()
kv_bytes_per_block = (
page_size_bytes
* len(kv_cache_config.kv_cache_tensors)
* vllm_config.parallel_config.world_size
)
kv_bytes_per_offloaded_block = kv_bytes_per_block * (
self.offloaded_block_size // self.gpu_block_size
)

self.num_blocks = (
int(cpu_bytes_to_use) // kv_bytes_per_offloaded_block
if kv_bytes_per_offloaded_block > 0
else 0
)

# scheduler-side
self._manager: OffloadingManager | None = None
Expand All @@ -44,7 +67,7 @@ def get_manager(self) -> OffloadingManager:
)

backend = CPUBackend(
block_size=self.offloaded_block_size, num_blocks=self.num_cpu_blocks
block_size=self.offloaded_block_size, num_blocks=self.num_blocks
)

if self.eviction_policy == "lru":
Expand Down Expand Up @@ -77,7 +100,7 @@ def get_handlers(
attn_backends=attn_backends,
gpu_block_size=self.gpu_block_size,
cpu_block_size=self.offloaded_block_size,
num_cpu_blocks=self.num_cpu_blocks,
num_cpu_blocks=self.num_blocks,
gpu_caches=kv_caches,
)

Expand Down
4 changes: 3 additions & 1 deletion vllm/v1/kv_offload/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

if TYPE_CHECKING:
from vllm.config import VllmConfig
from vllm.v1.kv_cache_interface import KVCacheConfig

logger = init_logger(__name__)

Expand All @@ -32,6 +33,7 @@ def loader() -> type[OffloadingSpec]:
def create_spec(
cls,
config: "VllmConfig",
kv_cache_config: "KVCacheConfig | None",
) -> OffloadingSpec:
kv_transfer_config = config.kv_transfer_config
assert kv_transfer_config is not None
Expand All @@ -47,7 +49,7 @@ def create_spec(
spec_cls = getattr(spec_module, spec_name)
assert issubclass(spec_cls, OffloadingSpec)
logger.info("Creating offloading spec with name: %s", spec_name)
return spec_cls(config)
return spec_cls(config, kv_cache_config)


# Register various specs here.
Expand Down
6 changes: 5 additions & 1 deletion vllm/v1/kv_offload/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,23 @@

if TYPE_CHECKING:
from vllm.config import VllmConfig
from vllm.v1.kv_cache_interface import KVCacheConfig

logger = init_logger(__name__)


class OffloadingSpec(ABC):
"""Spec for an offloading connector"""

def __init__(self, vllm_config: "VllmConfig"):
def __init__(
self, vllm_config: "VllmConfig", kv_cache_config: "KVCacheConfig | None"
):
logger.warning(
"Initializing OffloadingSpec. This API is experimental and "
"subject to change in the future as we iterate the design."
)
self.vllm_config = vllm_config
self.kv_cache_config = kv_cache_config

kv_transfer_config = vllm_config.kv_transfer_config
assert kv_transfer_config is not None
Expand Down