Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .buildkite/scripts/hardware_ci/run-xpu-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ docker run \
pytest -v -s v1/worker --ignore=v1/worker/test_gpu_model_runner.py
pytest -v -s v1/structured_output
pytest -v -s v1/spec_decode --ignore=v1/spec_decode/test_max_len.py --ignore=v1/spec_decode/test_tree_attention.py --ignore=v1/spec_decode/test_speculators_eagle3.py
pytest -v -s v1/kv_connector/unit --ignore=v1/kv_connector/unit/test_multi_connector.py --ignore=v1/kv_connector/unit/test_nixl_connector.py --ignore=v1/kv_connector/unit/test_shared_storage_connector.py --ignore=v1/kv_connector/unit/test_lmcache_integration.py
pytest -v -s v1/kv_connector/unit --ignore=v1/kv_connector/unit/test_multi_connector.py --ignore=v1/kv_connector/unit/test_nixl_connector.py --ignore=v1/kv_connector/unit/test_example_connector.py --ignore=v1/kv_connector/unit/test_lmcache_integration.py
pytest -v -s v1/test_serial_utils.py
'
6 changes: 3 additions & 3 deletions docs/features/disagg_encoder.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ Design doc: <https://docs.google.com/document/d/1aed8KtC6XkXtdoV87pWT0a8OJlZ-Cpn

## 2 Usage Example

The current reference pathway is **SharedStorageConnector**.
The current reference pathway is **ExampleConnector**.
Below ready-to-run scripts shows the workflow:

1 Encoder instance + 1 PD instance:
`examples/online_serving/disaggregated_encoder/shared_storage_connector/disagg_encoder_example.sh`
`examples/online_serving/disaggregated_encoder/disagg_1e1pd_example.sh`

1 Encoder instance + 1 Prefill instance + 1 Decode instance:
`examples/online_serving/disaggregated_encoder/shared_storage_connector/disagg_epd_example.sh`
`examples/online_serving/disaggregated_encoder/disagg_1e1p1d_example.sh`

---

Expand Down
4 changes: 2 additions & 2 deletions docs/features/disagg_prefill.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ Please refer to [examples/online_serving/disaggregated_prefill.sh](../../example

Now supports 5 types of connectors:

- **SharedStorageConnector**: refer to [examples/offline_inference/disaggregated-prefill-v1/run.sh](../../examples/offline_inference/disaggregated-prefill-v1/run.sh) for the example usage of SharedStorageConnector disaggregated prefilling.
- **ExampleConnector**: refer to [examples/offline_inference/disaggregated-prefill-v1/run.sh](../../examples/offline_inference/disaggregated-prefill-v1/run.sh) for the example usage of ExampleConnector disaggregated prefilling.
- **LMCacheConnectorV1**: refer to [examples/others/lmcache/disagg_prefill_lmcache_v1/disagg_example_nixl.sh](../../examples/others/lmcache/disagg_prefill_lmcache_v1/disagg_example_nixl.sh) for the example usage of LMCacheConnectorV1 disaggregated prefilling which uses NIXL as the underlying KV transmission.
- **NixlConnector**: refer to [tests/v1/kv_connector/nixl_integration/run_accuracy_test.sh](../../tests/v1/kv_connector/nixl_integration/run_accuracy_test.sh) for the example usage of NixlConnector disaggregated prefilling which support fully async send/recv. For detailed usage guide, see [NixlConnector Usage Guide](nixl_connector_usage.md).
- **P2pNcclConnector**: refer to [examples/online_serving/disaggregated_serving_p2p_nccl_xpyd/disagg_example_p2p_nccl_xpyd.sh](../../examples/online_serving/disaggregated_serving_p2p_nccl_xpyd/disagg_example_p2p_nccl_xpyd.sh) for the example usage of P2pNcclConnector disaggregated prefilling.
- **MultiConnector**: take advantage of the kv_connector_extra_config: dict[str, Any] already present in KVTransferConfig to stash all the connectors we want in an ordered list of kwargs.such as:

```bash
--kv-transfer-config '{"kv_connector":"MultiConnector","kv_role":"kv_both","kv_connector_extra_config":{"connectors":[{"kv_connector":"NixlConnector","kv_role":"kv_both"},{"kv_connector":"SharedStorageConnector","kv_role":"kv_both","kv_connector_extra_config":{"shared_storage_path":"local_storage"}}]}}'
--kv-transfer-config '{"kv_connector":"MultiConnector","kv_role":"kv_both","kv_connector_extra_config":{"connectors":[{"kv_connector":"NixlConnector","kv_role":"kv_both"},{"kv_connector":"ExampleConnector","kv_role":"kv_both","kv_connector_extra_config":{"shared_storage_path":"local_storage"}}]}}'
```

For NixlConnector, you may also specify one or multiple NIXL_Backend. Such as:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def main():
max_num_batched_tokens=64,
max_num_seqs=16,
kv_transfer_config=KVTransferConfig(
kv_connector="SharedStorageConnector",
kv_connector="ExampleConnector",
kv_role="kv_both",
kv_connector_extra_config={"shared_storage_path": "local_storage"},
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def main():
enforce_eager=True,
gpu_memory_utilization=0.8,
kv_transfer_config=KVTransferConfig(
kv_connector="SharedStorageConnector",
kv_connector="ExampleConnector",
kv_role="kv_both",
kv_connector_extra_config={"shared_storage_path": "local_storage"},
),
Expand Down
4 changes: 2 additions & 2 deletions examples/offline_inference/kv_load_failure_recovery/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ It demonstrates vLLM's ability to recover from KV load failures in both synchron
- `decode_example.py` – performs the decode stage. Accepts:
- `--simulate-failure`: simulates KV load failure using a custom connector.
- `--async-load`: enables asynchronous KV loading mode.
- `rogue_shared_storage_connector.py` – defines `RogueSharedStorageConnector`, a subclass of `SharedStorageConnector`, that simulates missing or corrupted external KV blocks by failing to load blocks for the first decode request.
- `load_recovery_example_connector.py` – defines `LoadRecoveryExampleConnector`, a subclass of `ExampleConnector`, that simulates missing or corrupted external KV blocks by failing to load blocks for the first decode request.
- `run.sh` – orchestrates the test: runs the prefill stage, then three decode stages:
1. Normal decode (baseline).
2. Decode with simulated sync KV load failure.
Expand All @@ -20,7 +20,7 @@ It demonstrates vLLM's ability to recover from KV load failures in both synchron

## How It Works

- The test dynamically loads `RogueSharedStorageConnector` via `KVTransferConfig.kv_connector_module_path`, enabling controlled simulation of load failures without modifying the original connector.
- The test dynamically loads `LoadRecoveryExampleConnector` via `KVTransferConfig.kv_connector_module_path`, enabling controlled simulation of load failures without modifying the original connector.
- The decode stages that simulate failure are expected to trigger recovery logic in vLLM, resulting in the same output as the baseline decode.
- If recovery fails, the script prints a unified diff of the output mismatch and exits with error.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ def main():

if args.simulate_failure:
ktc = KVTransferConfig(
kv_connector="RogueSharedStorageConnector",
kv_connector="LoadRecoveryExampleConnector",
kv_role="kv_both",
kv_connector_extra_config={
"shared_storage_path": "local_storage",
"async_load": args.async_load,
},
kv_connector_module_path="rogue_shared_storage_connector",
kv_connector_module_path="load_recovery_example_connector",
)
out_file = (
"async_decode_recovered_output.txt"
Expand All @@ -50,7 +50,7 @@ def main():
)
else:
ktc = KVTransferConfig(
kv_connector="SharedStorageConnector",
kv_connector="ExampleConnector",
kv_role="kv_both",
kv_connector_extra_config={
"shared_storage_path": "local_storage",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
KVConnectorMetadata,
KVConnectorRole,
)
from vllm.distributed.kv_transfer.kv_connector.v1.shared_storage_connector import (
SharedStorageConnector,
SharedStorageConnectorMetadata,
from vllm.distributed.kv_transfer.kv_connector.v1.example_connector import (
ExampleConnector,
ExampleConnectorMetadata,
)
from vllm.forward_context import ForwardContext
from vllm.v1.core.kv_cache_manager import KVCacheBlocks
Expand All @@ -26,15 +26,15 @@


@dataclass
class RogueSharedStorageConnectorMetadata(SharedStorageConnectorMetadata):
class LoadRecoveryExampleConnectorMetadata(ExampleConnectorMetadata):
req_to_block_ids: dict[str, set[int]] = field(default_factory=dict)

@classmethod
def from_base(cls, base: SharedStorageConnectorMetadata):
def from_base(cls, base: ExampleConnectorMetadata):
return cls(requests=base.requests)


class RogueSharedStorageConnector(SharedStorageConnector):
class LoadRecoveryExampleConnector(ExampleConnector):
def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole):
super().__init__(vllm_config=vllm_config, role=role)
self._async_load = vllm_config.kv_transfer_config.get_from_extra_config(
Expand All @@ -45,7 +45,7 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole):
self._req_to_block_ids: dict[str, list[int]] = dict()

def bind_connector_metadata(self, connector_metadata: KVConnectorMetadata) -> None:
assert isinstance(connector_metadata, RogueSharedStorageConnectorMetadata)
assert isinstance(connector_metadata, LoadRecoveryExampleConnectorMetadata)
index, failed_request = next(
(
(i, x)
Expand Down Expand Up @@ -84,7 +84,7 @@ def get_finished(
) -> tuple[set[str] | None, set[str] | None]:
if self._async_load:
meta = self._get_connector_metadata()
assert isinstance(meta, RogueSharedStorageConnectorMetadata)
assert isinstance(meta, LoadRecoveryExampleConnectorMetadata)
if meta.req_to_block_ids:
return None, set(meta.req_to_block_ids)

Expand Down Expand Up @@ -126,9 +126,9 @@ def build_connector_meta(
) -> KVConnectorMetadata:
if not self._async_load:
base = super().build_connector_meta(scheduler_output)
meta = RogueSharedStorageConnectorMetadata.from_base(base)
meta = LoadRecoveryExampleConnectorMetadata.from_base(base)
else:
meta = RogueSharedStorageConnectorMetadata()
meta = LoadRecoveryExampleConnectorMetadata()
if self._requests_need_load:
for req_id, request in self._requests_need_load.items():
meta.add_request(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def main():
enforce_eager=True,
gpu_memory_utilization=0.8,
kv_transfer_config=KVTransferConfig(
kv_connector="SharedStorageConnector",
kv_connector="ExampleConnector",
kv_role="kv_both",
kv_connector_extra_config={"shared_storage_path": "local_storage"},
),
Expand Down
6 changes: 3 additions & 3 deletions examples/online_serving/disaggregated_encoder/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ The vllm instances and `disagg_encoder_proxy` supports local URIs with ```{"url"

## EC connector and KV transfer

The `ECSharedStorageConnector` is used to store the encoder cache on local disk and facilitate transfer. To enable the encoder disaggregation feature, add the following configuration:
The `ECExampleonnector` is used to store the encoder cache on local disk and facilitate transfer. To enable the encoder disaggregation feature, add the following configuration:

```bash
# Add to encoder instance:
--ec-transfer-config '{
"ec_connector": "ECSharedStorageConnector",
"ec_connector": "ECExampleConnector",
"ec_role": "ec_producer",
"ec_connector_extra_config": {
"shared_storage_path": "'"$EC_SHARED_STORAGE_PATH"'"
Expand All @@ -64,7 +64,7 @@ The `ECSharedStorageConnector` is used to store the encoder cache on local disk

# Add to prefill/prefill+decode instance:
--ec-transfer-config '{
"ec_connector": "ECSharedStorageConnector",
"ec_connector": "ECExampleConnector",
"ec_role": "ec_consumer",
"ec_connector_extra_config": {
"shared_storage_path": "'"$EC_SHARED_STORAGE_PATH"'"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ CUDA_VISIBLE_DEVICES="$GPU_E" vllm serve "$MODEL" \
--max-num-seqs 128 \
--allowed-local-media-path ${GIT_ROOT}/tests/v1/ec_connector/integration \
--ec-transfer-config '{
"ec_connector": "ECSharedStorageConnector",
"ec_connector": "ECExampleConnector",
"ec_role": "ec_producer",
"ec_connector_extra_config": {
"shared_storage_path": "'"$EC_SHARED_STORAGE_PATH"'"
Expand All @@ -126,7 +126,7 @@ vllm serve "$MODEL" \
--max-num-seqs 128 \
--allowed-local-media-path ${GIT_ROOT}/tests/v1/ec_connector/integration \
--ec-transfer-config '{
"ec_connector": "ECSharedStorageConnector",
"ec_connector": "ECExampleConnector",
"ec_role": "ec_consumer",
"ec_connector_extra_config": {
"shared_storage_path": "'"$EC_SHARED_STORAGE_PATH"'"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ CUDA_VISIBLE_DEVICES="$GPU_E" vllm serve "$MODEL" \
--max-num-seqs 128 \
--allowed-local-media-path ${GIT_ROOT}/tests/v1/ec_connector/integration \
--ec-transfer-config '{
"ec_connector": "ECSharedStorageConnector",
"ec_connector": "ECExampleConnector",
"ec_role": "ec_producer",
"ec_connector_extra_config": {
"shared_storage_path": "'"$EC_SHARED_STORAGE_PATH"'"
Expand All @@ -117,7 +117,7 @@ CUDA_VISIBLE_DEVICES="$GPU_PD" vllm serve "$MODEL" \
--max-num-seqs 128 \
--allowed-local-media-path ${GIT_ROOT}/tests/v1/ec_connector/integration \
--ec-transfer-config '{
"ec_connector": "ECSharedStorageConnector",
"ec_connector": "ECExampleConnector",
"ec_role": "ec_consumer",
"ec_connector_extra_config": {
"shared_storage_path": "'"$EC_SHARED_STORAGE_PATH"'"
Expand Down
2 changes: 1 addition & 1 deletion tests/distributed/test_kvlayout.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def test_get_kv_connector_cache_layout_with_multi_connector():
kv_role="kv_both",
kv_connector_extra_config={
"connectors": [
{"kv_connector": "SharedStorageConnector", "kv_role": "kv_both"},
{"kv_connector": "ExampleConnector", "kv_role": "kv_both"},
{"kv_connector": "NixlConnector", "kv_role": "kv_both"},
]
},
Expand Down
6 changes: 3 additions & 3 deletions tests/v1/core/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,7 @@ def create_scheduler_with_priority(
)
kv_transfer_config = (
KVTransferConfig(
kv_connector="SharedStorageConnector",
kv_connector="ExampleConnector",
kv_role="kv_both",
kv_connector_extra_config={"shared_storage_path": "local_storage"},
)
Expand All @@ -1552,7 +1552,7 @@ def create_scheduler_with_priority(

ec_transfer_config = (
ECTransferConfig(
ec_connector="ECSharedStorageConnector",
ec_connector="ECExampleConnector",
ec_role=ec_role,
ec_connector_extra_config={"shared_storage_path": "/tmp/ec_test"},
)
Expand Down Expand Up @@ -2413,7 +2413,7 @@ def _assert_right_ec_connector_metadata(
metadata_dict = {mm_data.mm_hash: mm_data for mm_data in metadata.mm_datas}

# Check all required identifiers exist in metadata; and no extra
# In ECSharedStorageConnector format
# In ECExampleConnector format
# NOTE: even having same identifier, the mm_features can be different
# since their mm_position can be in different offsets, etc
identifiers_dict = {f.identifier for f in mm_features_list}
Expand Down
4 changes: 2 additions & 2 deletions tests/v1/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def create_scheduler(
)
elif use_kv_connector:
kv_transfer_config = KVTransferConfig(
kv_connector="SharedStorageConnector",
kv_connector="ExampleConnector",
kv_role="kv_both",
kv_connector_extra_config={"shared_storage_path": "local_storage"},
)
Expand All @@ -121,7 +121,7 @@ def create_scheduler(

ec_transfer_config = (
ECTransferConfig(
ec_connector="ECSharedStorageConnector",
ec_connector="ECExampleConnector",
ec_role=ec_role,
ec_connector_extra_config={"shared_storage_path": "/tmp/ec_test"},
)
Expand Down
8 changes: 4 additions & 4 deletions tests/v1/ec_connector/integration/run_epd_correctness_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ run_epd_1e_1pd() {
--max-num-seqs 128 \
--allowed-local-media-path ${GIT_ROOT}/tests/v1/ec_connector/integration \
--ec-transfer-config '{
"ec_connector": "ECSharedStorageConnector",
"ec_connector": "ECExampleConnector",
"ec_role": "ec_producer",
"ec_connector_extra_config": {
"shared_storage_path": "'"$EC_SHARED_STORAGE_PATH"'"
Expand All @@ -167,7 +167,7 @@ run_epd_1e_1pd() {
--max-num-seqs 128 \
--allowed-local-media-path ${GIT_ROOT}/tests/v1/ec_connector/integration \
--ec-transfer-config '{
"ec_connector": "ECSharedStorageConnector",
"ec_connector": "ECExampleConnector",
"ec_role": "ec_consumer",
"ec_connector_extra_config": {
"shared_storage_path": "'"$EC_SHARED_STORAGE_PATH"'"
Expand Down Expand Up @@ -348,7 +348,7 @@ run_epd_1e_1p_1d() {
--max-num-seqs 128 \
--allowed-local-media-path ${GIT_ROOT}/tests/v1/ec_connector/integration \
--ec-transfer-config '{
"ec_connector": "ECSharedStorageConnector",
"ec_connector": "ECExampleConnector",
"ec_role": "ec_producer",
"ec_connector_extra_config": {
"shared_storage_path": "'"$EC_SHARED_STORAGE_PATH"'"
Expand All @@ -369,7 +369,7 @@ run_epd_1e_1p_1d() {
--max-num-seqs 128 \
--allowed-local-media-path ${GIT_ROOT}/tests/v1/ec_connector/integration \
--ec-transfer-config '{
"ec_connector": "ECSharedStorageConnector",
"ec_connector": "ECExampleConnector",
"ec_role": "ec_consumer",
"ec_connector_extra_config": {
"shared_storage_path": "'"$EC_SHARED_STORAGE_PATH"'"
Expand Down
Loading