Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tests/v1/kv_connector/unit/test_nixl_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ def test_multi_xfer_one_engine(
)
connector.bind_connector_metadata(metadata)

# Mimic maybe_setup_kv_connector in gpu_model_runner.
# Mimic logic in KVConnectorModelRunnerMixin._get_kv_connector_output.
dummy_ctx = ForwardContext(
no_compile_layers={},
attn_metadata={},
Expand All @@ -531,7 +531,7 @@ def test_multi_xfer_one_engine(
f"start_load_kv took {_after_load - _before_load} seconds"
)

# Mimic get_finished_kv_transfers in gpu_model_runner.
# Mimic logic in KVConnectorModelRunnerMixin._get_kv_connector_output.
_, done_recving = connector.get_finished(finished_req_ids=set())
if len(done_recving) > 0:
assert request_id in done_recving
Expand Down
30 changes: 0 additions & 30 deletions vllm/v1/worker/kv_connector_model_runner_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,42 +38,12 @@

# Defined as a kv connector functionality mixin for ModelRunner (GPU, TPU)
class KVConnectorModelRunnerMixin:
@staticmethod
def maybe_setup_kv_connector(scheduler_output: "SchedulerOutput"):
# Update KVConnector with the KVConnector metadata forward().
if has_kv_transfer_group():
kv_connector = get_kv_transfer_group()
assert isinstance(kv_connector, KVConnectorBase)
assert scheduler_output.kv_connector_metadata is not None
kv_connector.bind_connector_metadata(scheduler_output.kv_connector_metadata)

# Background KV cache transfers happen here.
# These transfers are designed to be async and the requests
# involved may be disjoint from the running requests.
# Do this here to save a collective_rpc.
kv_connector.start_load_kv(get_forward_context())

@staticmethod
def ensure_kv_transfer_shutdown() -> None:
# has_kv_transfer_group can be None during interpreter shutdown.
if has_kv_transfer_group and has_kv_transfer_group(): # type: ignore[truthy-function]
ensure_kv_transfer_shutdown()

@staticmethod
def maybe_wait_for_kv_save() -> None:
if has_kv_transfer_group():
get_kv_transfer_group().wait_for_save()

@staticmethod
def get_finished_kv_transfers(
scheduler_output: "SchedulerOutput",
) -> tuple[set[str] | None, set[str] | None]:
if has_kv_transfer_group():
return get_kv_transfer_group().get_finished(
scheduler_output.finished_req_ids
)
return None, None

@staticmethod
def kv_connector_no_forward(
scheduler_output: "SchedulerOutput", vllm_config: VllmConfig
Expand Down