
Commit 2d314c7

[fs_connector][feat]: Add Python backend implementation for fs backend of offloading connector (fs_connector)
Signed-off-by: Kfir Toledo <[email protected]>
1 parent f8bb304 commit 2d314c7

File tree

11 files changed (+892, −0 lines)


.gitignore

Lines changed: 19 additions & 0 deletions
```diff
@@ -22,6 +22,25 @@ __pycache__/
 *.pyd
 *.python-version
 
+# Python build artifacts
+*.egg-info/
+build/
+dist/
+
+# C++/CUDA build outputs
+*.o
+*.so
+*.d
+*.a
+
+# Ninja build files
+build.ninja
+.ninja_log
+.ninja_deps
+
+# Temporary pip build directories
+*.egg-info/
+
 # Go workspace file
 go.work
 go.work.sum
```
kv_connectors/llmd_fs_backend/README.md

Lines changed: 98 additions & 0 deletions
# llmd-fs-backend README

## Overview

The llmd-fs-backend extends the native [vLLM Offloading Connector](#offloading-connector-docs) to support a file system backend.
This backend provides a shared-storage offloading layer for vLLM. It moves KV-cache blocks between GPU and shared storage efficiently using:

- Async CUDA copies or GPU kernels
- Pinned memory pools
- Multi-threaded I/O workers
- NUMA-aware CPU affinity
- Atomic file writes and zero-copy reads (see the atomic-write sketch below)

The fs connector (llmd_fs_backend) is designed for shared storage, but it also works with local disk.
For architectural clarity, the fs connector is not responsible for cleanup; storage systems should manage this.
For simple setups, see the **Storage Cleanup** section.

<img src="./docs/images/fs_connector.png" width="400" />
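As an illustration of the "atomic file writes" item above: the standard approach is to write into a temporary file on the same filesystem and rename it into place, since a POSIX `rename()` is atomic. This is a minimal sketch of that pattern, not the extension's actual implementation:

```python
import os
import tempfile

def atomic_write(path: str, data: bytes) -> None:
    """Write `data` to `path` so readers never observe a partial file."""
    # Create the temp file in the destination directory so the final
    # rename stays within one filesystem (a requirement for atomicity).
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())  # make the bytes durable before publishing
        os.rename(tmp_path, path)  # atomic publish on POSIX filesystems
    except BaseException:
        os.unlink(tmp_path)
        raise
```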
## System Requirements

- vLLM version 0.11.0 or above, which includes the Offloading Connector

## Installation

```bash
apt-get update && apt-get install -y libnuma-dev
pip install "git+https://github.com/llm-d/llm-d-kv-cache-manager.git#subdirectory=kv_connectors/llmd_fs_backend"
```

This installs:

- the Python module `llmd_fs_backend`
- the CUDA extension `storage_offload.so`
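A quick post-install check (a sketch; the submodule-style import of the CUDA extension assumes the `.so` is packaged inside `llmd_fs_backend`, which the pyproject package-data below suggests but this README does not state):

```python
import llmd_fs_backend
print(llmd_fs_backend.__file__)  # should resolve into site-packages

# Assumed layout: the compiled storage_offload extension lives in the package.
from llmd_fs_backend import storage_offload
print(storage_offload.__name__)
```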
## Configuration Flags

### Connector parameters

- `shared_storage_path`: filesystem path used to store and load the KV files
- `block_size`: number of tokens grouped into each offloaded file (must be a multiple of the GPU block size)
- `threads_per_gpu`: number of I/O threads per GPU
- `max_pinned_memory_gb`: total pinned memory limit, in GB

### Environment variables

- `STORAGE_CONNECTOR_DEBUG`: enable debug logs
- `USE_KERNEL_COPY_WRITE`: enable GPU-kernel writes (default 0)
- `USE_KERNEL_COPY_READ`: enable GPU-kernel reads (default 1)
## Example vLLM YAML

To load the fs connector:

```bash
--kv-transfer-config '{
  "kv_connector": "OffloadingConnector",
  "kv_role": "kv_both",
  "kv_connector_extra_config": {
    "spec_name": "SharedStorageOffloadingSpec",
    "spec_module_path": "llmd_fs_backend.spec",
    "shared_storage_path": "/mnt/files-storage/kv-cache/",
    "block_size": 256,
    "threads_per_gpu": "64"
  }
}'
--distributed_executor_backend "mp"
```

A full deployment example can be found in the [`docs`](./docs/deployment) folder.

It is recommended to use multiprocess mode by setting:
`--distributed_executor_backend "mp"`

To configure environment variables:

```yaml
env:
  - name: STORAGE_CONNECTOR_DEBUG
    value: "1"
```
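The same connector configuration can also be passed programmatically for offline use (a sketch; `KVTransferConfig` is vLLM's public API rather than part of this commit, and the model name is an illustrative assumption):

```python
from vllm import LLM
from vllm.config import KVTransferConfig

llm = LLM(
    model="meta-llama/Llama-3.1-8B",  # illustrative model choice
    kv_transfer_config=KVTransferConfig(
        kv_connector="OffloadingConnector",
        kv_role="kv_both",
        kv_connector_extra_config={
            "spec_name": "SharedStorageOffloadingSpec",
            "spec_module_path": "llmd_fs_backend.spec",
            "shared_storage_path": "/mnt/files-storage/kv-cache/",
            "block_size": 256,
            "threads_per_gpu": "64",
        },
    ),
)
```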
## Storage Cleanup

TBD

## Troubleshooting

### Missing `numa.h`

Install the required package:

```bash
apt-get install -y libnuma-dev
```

---

## Link Aliases

- **Offloading Connector Docs**
  <a name="offloading-connector-docs"></a>
  https://docs.vllm.ai/en/stable/features/disagg_prefill/#usage-example:~:text=backends%22%3A%5B%22UCX%22%2C%20%22GDS%22%5D%7D%7D%27-,OffloadingConnector,-%3A%20enable%20offloading%20of
docs/images/fs_connector.png (84.7 KB, binary image)
kv_connectors/llmd_fs_backend/pyproject.toml

Lines changed: 39 additions & 0 deletions
```toml
[build-system]
requires = [
    "setuptools>=65",
    "wheel",
    "torch",
    "ninja"
]
build-backend = "setuptools.build_meta"

[project]
name = "llmd_fs_connector"
version = "0.1.0"
description = "Standalone llm-d fs storage connector"
readme = "README.md"
authors = [
    { name = "Kfir", email = "[email protected]" }
]
maintainers = [
    { name = "llm-d community" }
]
requires-python = ">=3.9"
dependencies = [
    "torch>=2.1",
]

[tool.setuptools]
packages = ["llmd_fs_backend"]
package-dir = {"" = "src"}

[tool.setuptools.package-data]
llmd_fs_backend = ["*.so"]

[project.optional-dependencies]
dev = [
    "vllm",
    "pytest",
    "black",
    "ruff",
]
```
kv_connectors/llmd_fs_backend/src/llmd_fs_backend/__init__.py

Whitespace-only changes.
Lines changed: 9 additions & 0 deletions

```python
from vllm.logger import init_logger
from vllm.v1.kv_offload.factory import OffloadingSpecFactory

logger = init_logger(__name__)

# Register SharedStorageOffloadingSpec to offloading connector
OffloadingSpecFactory.register_spec("SharedStorageOffloadingSpec",
                                    "vllm.v1.kv_offload.shared_storage",
                                    "SharedStorageOffloadingSpec")
```
Lines changed: 109 additions & 0 deletions

```python
import os
import torch
from pathlib import Path
from collections.abc import Iterable
from typing import Optional

from vllm.v1.core.kv_cache_utils import BlockHash
from llmd_fs_backend.mediums import SharedStorageLoadStoreSpec
from vllm.v1.kv_offload.abstract import (
    LoadStoreSpec,
    OffloadingManager,
    PrepareStoreOutput,
)
from llmd_fs_backend.worker import StorageOffloadingHandler
from vllm.logger import init_logger

logger = init_logger(__name__)


class SharedStorageOffloadingManager(OffloadingManager):
    """
    SharedStorageOffloadingManager manages KV offloading to a shared storage medium.
    """

    def __init__(
        self,
        model_name: str,
        tp_size: int,
        tp_rank: int,
        dtype: torch.dtype,
        root_dir: str = "/tmp/shared-kv",
    ) -> None:

        # Basic metadata about the model and tensor parallelism
        self.model_name = model_name
        self.tp_size = tp_size
        self.tp_rank = tp_rank
        self.dtype = dtype

        # Resolve base directory where KV files for this model and tp rank are stored
        self.base_path: Path = StorageOffloadingHandler.get_kv_cache_base_path(
            dtype=dtype,
            model_name=model_name,
            tp_size=tp_size,
            tp_rank=tp_rank,
            root_dir=root_dir,
        )

    # ----------------------------------------------------------------------
    # Lookup
    # ----------------------------------------------------------------------
    def lookup(self, block_hashes: Iterable[BlockHash]) -> int:
        """
        Return how many consecutive blocks from the start are already offloaded.
        """
        hit_count = 0
        for block_hash in block_hashes:
            file_path = StorageOffloadingHandler.get_file_name(self.base_path, block_hash)
            if not os.path.exists(file_path):
                break
            hit_count += 1
        return hit_count

    # ----------------------------------------------------------------------
    # Load
    # ----------------------------------------------------------------------
    def prepare_load(self, block_hashes: Iterable[BlockHash]) -> LoadStoreSpec:
        """
        For shared storage, loading is stateless - return specs that point to files.
        """
        return SharedStorageLoadStoreSpec(block_hashes)

    def touch(self, block_hashes: Iterable[BlockHash]):
        """
        Update access times if desired.
        Shared storage version does nothing here because updates are handled
        by the file thread for performance reasons.
        """
        pass

    def complete_load(self, block_hashes: Iterable[BlockHash]):
        """Stateless load - no post-load action needed."""
        pass

    # ----------------------------------------------------------------------
    # Store
    # ----------------------------------------------------------------------
    def prepare_store(self, block_hashes: Iterable[BlockHash]) -> Optional[PrepareStoreOutput]:
        """
        Prepare storing new blocks.
        Shared storage always accepts new blocks. Eviction is not needed.
        If a file already exists, the file thread handles it.
        """
        block_hashes_to_store = list(block_hashes)

        # Set up store spec
        store_spec = SharedStorageLoadStoreSpec(block_hashes_to_store)

        return PrepareStoreOutput(
            block_hashes_to_store=block_hashes_to_store,
            store_spec=store_spec,
            block_hashes_evicted=[],  # no eviction needed
        )

    def complete_store(self, block_hashes: Iterable[BlockHash], success: bool = True):
        """
        For shared storage, storing is stateless - no action needed.
        """
        pass
```
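For orientation, a minimal sketch of how a caller might drive this manager (the `llmd_fs_backend.manager` module path, the model name, and the 32-byte hash values are illustrative assumptions; real `BlockHash` values come from vLLM's KV-cache utilities):

```python
import torch
from llmd_fs_backend.manager import SharedStorageOffloadingManager  # assumed module path

manager = SharedStorageOffloadingManager(
    model_name="meta-llama/Llama-3.1-8B",  # illustrative
    tp_size=1,
    tp_rank=0,
    dtype=torch.bfloat16,
    root_dir="/mnt/files-storage/kv-cache/",
)

hashes = [bytes(32), b"\x01" * 32]            # stand-in BlockHash values
hit_count = manager.lookup(hashes)            # consecutive prefix already on disk
load_spec = manager.prepare_load(hashes[:hit_count])
store_out = manager.prepare_store(hashes)     # always accepted; no eviction
```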
kv_connectors/llmd_fs_backend/src/llmd_fs_backend/mediums.py

Lines changed: 27 additions & 0 deletions

```python
import numpy as np
from typing import Iterable
from vllm.v1.core.kv_cache_utils import BlockHash
from vllm.v1.kv_offload.abstract import LoadStoreSpec


class SharedStorageLoadStoreSpec(LoadStoreSpec):
    """
    Spec for loading and storing KV blocks on shared storage.
    Stores block hashes internally as a numpy array.
    """

    def __init__(self, block_hashes: Iterable[BlockHash]):
        # Validate all items are bytes (BlockHash)
        block_hashes = list(block_hashes)
        for h in block_hashes:
            if not isinstance(h, (bytes, bytearray)):
                raise TypeError(f"Expected BlockHash (bytes-like), got {type(h).__name__}")

        # Store directly as object array of bytes
        self.block_hashes = np.array(block_hashes, dtype=object)

    def __repr__(self) -> str:
        return repr(self.block_hashes)

    @staticmethod
    def medium() -> str:
        return "SHARED_STORAGE"
```
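A short usage sketch of the spec (the hash values are made up; `BlockHash` is simply bytes-like here):

```python
from llmd_fs_backend.mediums import SharedStorageLoadStoreSpec

spec = SharedStorageLoadStoreSpec([b"\x00" * 32, b"\x01" * 32])
print(spec.medium())            # -> SHARED_STORAGE
print(spec.block_hashes.shape)  # -> (2,)

# Non-bytes input is rejected up front:
try:
    SharedStorageLoadStoreSpec(["not-bytes"])
except TypeError as err:
    print(err)  # Expected BlockHash (bytes-like), got str
```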
