[kv_offload] Add multi-tier KV cache offloading framework #40020
ronensc wants to merge 23 commits into vllm-project:main
Conversation
Code Review
This pull request introduces a multi-tier KV cache offloading system, featuring a TieringOffloadingManager that orchestrates data movement between a primary CPU tier and multiple secondary tiers. The implementation includes logic for cascading offloads to all tiers and staged promotion of blocks back to the primary tier for GPU access. Feedback identifies potential memory safety risks when using memoryview with PyTorch-backed NumPy arrays and suggests performance optimizations regarding the allocation of key lists during lookup operations.
```python
self._secondary_views: list[memoryview] = []
cpu_tensor = primary_tier.get_primary_kv_tensor()
for tier in self.secondary_tiers:
    view = memoryview(cpu_tensor.numpy())
```
Creating a memoryview from a NumPy array that is a view of a PyTorch tensor can lead to undefined behavior if the underlying PyTorch tensor is reallocated or if its storage is modified in a way that NumPy doesn't track. While _mmap_region._base is expected to be stable, it is safer to ensure the tensor is contiguous and its storage is explicitly kept alive. Additionally, verify that cpu_tensor.numpy() does not create a copy, which would defeat the zero-copy objective.
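As a minimal, hypothetical sketch (not code from this PR) of how to verify the zero-copy property and keep the storage alive:

```python
# Illustrative only: check that .numpy() shares storage with the tensor,
# and keep the tensor referenced for as long as the memoryview lives.
import torch

cpu_tensor = torch.zeros(4, 1024, dtype=torch.uint8)
assert cpu_tensor.is_contiguous()  # ensure a single flat buffer before exporting

arr = cpu_tensor.numpy()
assert arr.ctypes.data == cpu_tensor.data_ptr()  # same base address => no copy

view = memoryview(arr)
# Holding a reference to cpu_tensor alongside the view prevents the
# underlying storage from being freed while secondary tiers still use it.
keepalive = (cpu_tensor, view)
```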
```python
# are finalized and available in the primary tier
self._process_finished_jobs()

keys_list = list(keys)
```
orozery left a comment
Could you please rebase?
Can you extend test_cpu_offloading.py::test_cpu_offloading with a new pytest parameter enable_tiering: bool?
```python
Abstract interface for managing a single non-primary offloading tier.

Secondary tiers cannot directly access GPU memory. All data transfers
must go through the primary tier (implemented as CPU in current version):
```
I don't think we have plans to support a primary tier other than CPU.
Maybe we should adapt the comments accordingly?
```python
transfer job, but does NOT perform the actual data transfer on the
calling thread.

The caller (TieringOffloadingManager) must have already called
```
The caller-type documentation (added throughout these files) is helpful for understanding the E2E flow.
However, from the point of view of someone implementing a secondary tier, I'm not sure it is interesting, and perhaps even confusing.
I think we should document here the minimum necessary from the secondary tier implementor's POV.
| """Result of an async transfer job (successful or failed).""" | ||
|
|
||
| job_id: JobId | ||
| success: bool |
This is good for now.
Later on we will want to add stats as well.
| """Metadata for an in-flight async transfer job.""" | ||
|
|
||
| job_id: JobId | ||
| keys: list[OffloadKey] |
I think we want to change this to Sequence[OffloadKey], to make sure we avoid conversions between data types.
This means also changing OffloadingManager to use it instead of Iterable for lookup, prepare_load, and prepare_store.
Maybe it's better to do it in a prequel PR.
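A rough sketch of the suggested typing change (JobId and OffloadKey are the PR's own types; the class shape is assumed from the quoted diff):

```python
from collections.abc import Sequence
from dataclasses import dataclass

@dataclass
class JobMetadata:
    """Metadata for an in-flight async transfer job."""

    job_id: JobId
    # Sequence accepts lists and tuples as-is, so callers never need to
    # convert between container types before constructing the metadata.
    keys: Sequence[OffloadKey]
```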
Partly done in 675e056.
Will submit a prequel PR and continue once it's merged.
```python
job_id: JobId
keys: list[OffloadKey]
spec: LoadStoreSpec
```
can we replace this with block_ids: np.ndarray?
```python
# Process any completed async jobs first to ensure promoted blocks
# are finalized and available in the primary tier
self._process_finished_jobs()
```
We call the secondary tiers' get_finished on every lookup, as well as on every prepare_load and prepare_store.
I think we should call it once per engine step.
Currently, we can detect that an engine step has finished when OffloadingManager.take_events is called.
So maybe add a _processed_finished_jobs: bool flag that gets reset on take_events?
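A hypothetical sketch of that gating (method and attribute names are illustrative, not from the PR):

```python
class TieringOffloadingManager:
    def __init__(self) -> None:
        self._finished_jobs_processed = False

    def _maybe_process_finished_jobs(self) -> None:
        # Called from lookup(), prepare_load() and prepare_store(); polls
        # the secondary tiers at most once per engine step.
        if not self._finished_jobs_processed:
            self._process_finished_jobs()
            self._finished_jobs_processed = True

    def take_events(self):
        # take_events() marks the end of an engine step, so re-arm the
        # polling for the next step.
        self._finished_jobs_processed = False
        yield from ()

    def _process_finished_jobs(self) -> None:
        ...  # poll each secondary tier's get_finished(), as in the PR
```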
```python
# are finalized and available in the primary tier
self._process_finished_jobs()

keys_list = list(keys)
```
See my previous comment on moving to Sequence[OffloadKey], then we don't need this list conversion.
After rebasing on main (which now includes #36645), OffloadingManager.lookup() accepts a single key instead of a list.
Should we update SecondaryTierManager.lookup() accordingly to take a single key as well?
> Should we update SecondaryTierManager.lookup() accordingly to take a single key as well?
Yep!
Ok, done.
Worth noting that now submit_load() will always receive a JobMetadata with exactly one key, even though it allows a list of keys.
```python
secondary_hits = tier.lookup(remaining_keys)

# Skip if tier is busy (None) or has no hits (0)
if not secondary_hits:
```
If the tier is busy we should return None as well (not immediately though).
```python
if primary_store_result is None:
    # Cannot allocate space in primary tier (full)
    # The next lookup() will retry
    return
```
I think that in this case we want to allow the request to proceed (not return None in lookup).
```python
if primary_hits == len(keys_list):
    # All blocks in primary tier
    return primary_hits
```
I think it may be a good idea to still call lookup on the secondary tiers, to allow their indexes to warm up for the given keys.
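Something like this hypothetical variant of the quoted code (the surrounding names are taken from the quoted diff; the secondary results are deliberately ignored on a full primary hit):

```python
if primary_hits == len(keys_list):
    # All blocks are already in the primary tier, but still probe the
    # secondary tiers so their indexes can warm up for these keys.
    for tier in self.secondary_tiers:
        tier.lookup(keys_list)
    return primary_hits
```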
@orozery Thanks for the thorough review! I'll address all the points and follow up with fixes.
```python
# Track this load job
job_metadata = JobMetadata(
    job_id=job_id,
    keys=keys,
```
Since the keys that already exist on the disk are filtered out, we probably want to pass the filtered keys here instead of the original ones:
keys=primary_store_result.keys_to_store.
Thanks! Good catch. Fixed in the latest commit.
Force-pushed from d2a72d1 to d6e3154
Hi @ronensc, the pre-commit checks have failed. Please run:

```
uv pip install pre-commit>=4.5.1
pre-commit install
pre-commit run --all-files
```

Then, commit the changes and push to your branch.
ronensc left a comment
I've rebased the PR, resolved conflicts, and addressed part of the feedback. I'll follow up on the remaining comments later.
```python
decrementing ref_cnt."""
self.complete_load(keys)

def get_primary_kv_tensor(self) -> torch.Tensor:
```
No problem. Renamed it to create_kv_memoryview().
```python
self._secondary_views: list[memoryview] = []
cpu_tensor = primary_tier.get_primary_kv_tensor()
for tier in self.secondary_tiers:
    view = memoryview(cpu_tensor.numpy())
```
Changed to a single memoryview shared across all tiers.
I'll follow up on the numpy() issue.
```python
- store_threshold: (optional) How many times a block must appear in lookup()
  before it is eligible for CPU offloading. Values < 2 disable filtering
  (default: 0)
- max_tracker_size: (optional) Maximum number of blocks tracked for
  store_threshold filtering (default: 64000)
```
No problem, removed it.
| """ | ||
| return | ||
|
|
||
| @abstractmethod |
IIUC, the goal here is to remove the need for tier_name in the config: https://github.com/ronensc/vllm/blob/84c6059c55bf43854212609e740075c3717a7c68/vllm/v1/kv_offload/tiering/spec.py#L34
If so, this would prevent defining multiple secondary tiers of the same type.
Is that intentional?
@ronensc regarding file structure. With this, I think we should have everything under
Re-organized the file tree accordingly.
```python
assert isinstance(store_spec, CPULoadStoreSpec)
job_metadata = JobMetadata(
    job_id=job_id,
    keys=primary_store_result.keys_to_store,
```
The length of keys might be 0. In such a case, avoid calling submit_load().
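For example, a hypothetical guard using the names from the quoted diff:

```python
keys_to_store = primary_store_result.keys_to_store
if not keys_to_store:
    # All keys were filtered out (already stored); submitting a transfer
    # job with zero keys would be pointless, so bail out early.
    return
job_metadata = JobMetadata(job_id=job_id, keys=keys_to_store, spec=store_spec)
```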
```python
)
self._load_jobs[job_id] = job_metadata

tier.submit_load(job_metadata)
```
Looks like submit_load() will always be called with at most 1 block. This is suboptimal.
Can we try to batch multiple blocks before calling submit_load()?
That would reduce the number of transfer operations.
Correct. This stems from the change in https://github.com/vllm-project/vllm/pull/36645/changes#diff-d8f2304e5d54e4b60670dcea11b7d5e33d006ec71db6e6cd22526501e915e4a5L94-L98, where lookup() was updated to accept a single key instead of a batch.
Batching multiple blocks before calling submit_load() could indeed reduce the number of transfer operations. The tradeoff is that it would introduce some delay in submitting blocks to the secondary managers, which may add latency.
@orozery WDYT?
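For concreteness, one possible shape for such batching (all names hypothetical, including the submit_load_batch API): buffer keys as they arrive and flush once per engine step, bounding the added latency to a single step:

```python
class BatchingSubmitter:
    """Hypothetical helper that coalesces single-key submissions."""

    def __init__(self, tier, max_batch: int = 16) -> None:
        self.tier = tier
        self.max_batch = max_batch
        self.pending: list[OffloadKey] = []

    def enqueue(self, key: OffloadKey) -> None:
        self.pending.append(key)
        if len(self.pending) >= self.max_batch:
            self.flush()

    def flush(self) -> None:
        # Invoked at the end of each engine step (e.g. from take_events()),
        # so a key waits at most one step before being transferred.
        if self.pending:
            self.tier.submit_load_batch(self.pending)  # hypothetical API
            self.pending = []
```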
Purpose

Adds a hierarchical (tiered) KV cache offloading framework under vllm/v1/kv_offload/tiering/, extending the existing single-tier CPU offloading with support for chained secondary tiers (e.g., storage, network). Implements the design proposed in #38260 ([RFC]: Multi-tier KV offloading via the vLLM offloading connector).

Key additions:

- SecondaryTierManager ABC (abstract.py): interface for secondary tier backends, defining async store/load/lookup primitives and a JobResult protocol for polling completions
- CPUPrimaryTierOffloadingManager (tiering/manager.py): wraps CPUOffloadingManager and exposes a secondary-facing read/write alias API (prepare_read/complete_read, prepare_write/complete_write) to clarify directionality when called from the cascade/promotion paths
- TieringOffloadingManager (tiering/manager.py): orchestrates GPU↔CPU (primary) and CPU→secondary tier transfers; lookup() returns None while promotion is in flight to signal "retry later"; prepare_read() increments ref_cnt to protect blocks from eviction during async transfers
- TieringOffloadingSpec (tiering/spec.py): entry point for the tiered stack; a CPUOffloadingSpec subclass that reads secondary_tiers from kv_connector_extra_config and assembles the TieringOffloadingManager (see the example config after this list)
- DummySecondaryTier (secondary_tiers/dummy.py): in-memory secondary tier for testing, with optional async simulation
- SharedOffloadRegion integration: CPUPrimaryTierOffloadingManager accepts the existing SharedOffloadRegion so secondary tiers can memoryview primary tier buffers zero-copy
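For illustration, a hedged sketch of how the tiered stack might be wired up through the connector config. The kv_connector/kv_role/kv_connector_extra_config fields are vLLM's existing KVTransferConfig API; the extra-config keys (spec selection, num_cpu_blocks, and the secondary_tiers schema) are assumptions for illustration, not taken verbatim from this PR:

```python
from vllm import LLM
from vllm.config import KVTransferConfig

llm = LLM(
    model="meta-llama/Llama-3.1-8B-Instruct",
    kv_transfer_config=KVTransferConfig(
        kv_connector="OffloadingConnector",
        kv_role="kv_both",
        kv_connector_extra_config={
            # Hypothetical keys: select the tiered spec, size the CPU
            # (primary) tier, and declare one dummy secondary tier.
            "spec_name": "TieringOffloadingSpec",
            "num_cpu_blocks": 1024,
            "secondary_tiers": [{"type": "dummy"}],
        },
    ),
)
```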