From 38c00afbc2fad9ac3dfa2edd1577920cf22ebc6c Mon Sep 17 00:00:00 2001 From: Robert Shaw Date: Mon, 2 Mar 2026 16:11:12 +0000 Subject: [PATCH 1/5] [Feat][NIXL] Add KV lease refresh mechanism for disaggregated prefill Replace the static P-side KV block timeout with an active lease mechanism. D workers periodically POST /internal/nixl/lease_refresh to extend the hold window while requests sit in the D queue, preventing premature block expiry on bursty workloads without requiring a large static timeout. - D scheduler tracks pending remote-prefill requests in `_requires_lease_dict`; a background thread POSTs refreshes every `timeout // 3` seconds - P scheduler receives refreshes via new EngineCore utility method `nixl_lease_refresh`, stores updated expiry in `_lease_refreshes`, and passes them to the P worker through `NixlConnectorMetadata` - P worker applies refreshes in `start_load_kv` and does a full scan (not early-break) in `get_finished` since expiry order may change - New `VLLM_NIXL_HTTP_PORT` env var (default 8000) lets D locate P's HTTP server; P includes it in `kv_transfer_params` - New FastAPI route registered unconditionally in `build_app`; no-op on non-NIXL instances Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Robert Shaw --- .../kv_transfer/kv_connector/v1/base.py | 8 ++ .../kv_connector/v1/nixl_connector.py | 126 +++++++++++++++++- vllm/entrypoints/openai/api_server.py | 6 + vllm/entrypoints/openai/nixl_router.py | 37 +++++ vllm/envs.py | 4 + vllm/v1/engine/core.py | 11 ++ 6 files changed, 185 insertions(+), 7 deletions(-) create mode 100644 vllm/entrypoints/openai/nixl_router.py diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/base.py b/vllm/distributed/kv_transfer/kv_connector/v1/base.py index a0e03b002b34..623e8cf1fa76 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/base.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/base.py @@ -592,6 +592,14 @@ def build_prom_metrics( """ return None + def refresh_lease(self, request_ids: list[str]) -> None: + """Refresh the KV lease for the given request IDs. No-op by default. + + Called from the P-side API server when D workers POST + /internal/nixl/lease_refresh to extend the KV block hold time. + """ + return + def reset_cache(self) -> bool | None: """ Reset the connector's internal cache. diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 87091d650b17..4ffce2641583 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -256,6 +256,7 @@ def __init__(self): self.reqs_to_recv: dict[ReqId, ReqMeta] = {} self.reqs_to_save: dict[ReqId, ReqMeta] = {} self.reqs_to_send: dict[ReqId, float] = {} + self.reqs_to_refresh: dict[ReqId, float] = {} # lease refresh expiry updates self.reqs_in_batch: set[ReqId] = set() self.reqs_not_processed: set[ReqId] = set() @@ -409,6 +410,15 @@ def set_xfer_handshake_metadata( assert self.connector_scheduler is not None self.connector_scheduler.set_xfer_handshake_metadata(metadata) + def refresh_lease(self, request_ids: list[str]) -> None: + """Refresh the KV block lease for the given requests. + + Called from the P-side EngineCore when D workers POST lease refreshes + via /internal/nixl/lease_refresh. + """ + assert self.connector_scheduler is not None + self.connector_scheduler.refresh_lease(request_ids) + ############################################################ # Worker Side Methods ############################################################ @@ -539,6 +549,9 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): self._encoded_xfer_handshake_metadata: dict[int, Any] = {} self._stop_event = threading.Event() + # kv_transfer_params fields for D workers to find this P's HTTP server + self.http_port = envs.VLLM_NIXL_HTTP_PORT + # Requests that need to start recv/send. # New requests are added by update_state_after_alloc in # the scheduler. Used to make metadata passed to Worker. @@ -551,11 +564,28 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): # remote prefill or aborted. self._reqs_not_processed: set[ReqId] = set() + # P-side: pending lease refresh updates to pass to worker next step + self._lease_refreshes: dict[ReqId, float] = {} + + # D-side: requests waiting to be scheduled that need P KV lease refresh + # req_id -> (P_host, P_http_port) + self._requires_lease_dict: dict[ReqId, tuple[str, int]] = {} + self._requires_lease_lock = threading.Lock() + self._stop_lease_event = threading.Event() + self._lease_refresh_thread = threading.Thread( + target=self._lease_refresh_loop, + daemon=True, + name="nixl-d-lease-refresh", + ) + self._lease_refresh_thread.start() + def shutdown(self): self._stop_event.set() + self._stop_lease_event.set() if self._nixl_handshake_listener_t is not None: self._nixl_handshake_listener_t.join() self._nixl_handshake_listener_t = None + self._lease_refresh_thread.join(timeout=3) def set_xfer_handshake_metadata( self, metadata: dict[int, KVConnectorHandshakeMetadata] @@ -665,6 +695,20 @@ def get_num_new_matched_tokens( token_ids = request.prompt_token_ids or [] count = len(token_ids) - num_computed_tokens if count > 0: + # Track this request for D-side lease refresh while it is + # waiting to be scheduled. Only add once (this may be called + # multiple times before scheduling). + with self._requires_lease_lock: + if request.request_id not in self._requires_lease_dict: + remote_host = params.get("remote_host", "") + remote_http_port = params.get( + "remote_http_port", envs.VLLM_NIXL_HTTP_PORT + ) + if remote_host: + self._requires_lease_dict[request.request_id] = ( + remote_host, + remote_http_port, + ) return count, True # No remote prefill for this request. @@ -725,6 +769,9 @@ def update_state_after_alloc( assert num_external_tokens == 0 # Only trigger 1 KV transfer per request. params["do_remote_prefill"] = False + # Request is being scheduled — no longer needs D-side lease refresh. + with self._requires_lease_lock: + self._requires_lease_dict.pop(request.request_id, None) def build_connector_meta( self, @@ -769,6 +816,7 @@ def build_connector_meta( self._reqs_need_save.pop(req_id) meta.reqs_to_send = self._reqs_need_send + meta.reqs_to_refresh = self._lease_refreshes meta.reqs_in_batch = self._reqs_in_batch meta.reqs_not_processed = self._reqs_not_processed @@ -777,6 +825,7 @@ def build_connector_meta( self._reqs_in_batch = set() self._reqs_not_processed = set() self._reqs_need_send = {} + self._lease_refreshes = {} return meta @@ -847,9 +896,61 @@ def request_finished( remote_request_id=request.request_id, remote_host=self.side_channel_host, remote_port=self.side_channel_port, + remote_http_port=self.http_port, tp_size=self.vllm_config.parallel_config.tensor_parallel_size, ) + def refresh_lease(self, request_ids: list[str]) -> None: + """Update lease expiry for the given requests. + + Called from P's EngineCore utility handler when D workers POST + /internal/nixl/lease_refresh. Runs in the engine loop thread so no + locking is required for _lease_refreshes. + """ + new_expiry = time.perf_counter() + envs.VLLM_NIXL_ABORT_REQUEST_TIMEOUT + for req_id in request_ids: + self._lease_refreshes[req_id] = new_expiry + + def _lease_refresh_loop(self) -> None: + """D-side background thread: periodically POST lease refresh to P.""" + import json + import urllib.request as urlreq + + refresh_interval = max(envs.VLLM_NIXL_ABORT_REQUEST_TIMEOUT // 3, 1) + while not self._stop_lease_event.wait(timeout=refresh_interval): + with self._requires_lease_lock: + if not self._requires_lease_dict: + continue + snapshot = dict(self._requires_lease_dict) + + # Group by (P_host, P_http_port) + groups: dict[tuple[str, int], list[str]] = defaultdict(list) + for req_id, endpoint in snapshot.items(): + groups[endpoint].append(req_id) + + for (host, http_port), req_ids in groups.items(): + try: + body = json.dumps({"request_ids": req_ids}).encode() + url = f"http://{host}:{http_port}/internal/nixl/lease_refresh" + req = urlreq.Request(url, data=body, method="POST") + req.add_header("Content-Type", "application/json") + with urlreq.urlopen(req, timeout=5) as resp: + if resp.status != 200: + raise RuntimeError(f"HTTP {resp.status}") + logger.debug( + "Refreshed lease for %d request(s) at %s:%d", + len(req_ids), + host, + http_port, + ) + except Exception as e: + logger.warning( + "Failed to refresh KV lease with P at %s:%d: %s", + host, + http_port, + e, + ) + class NixlConnectorWorker: """Implementation of Worker side methods""" @@ -1979,17 +2080,18 @@ def get_finished(self) -> tuple[set[str], set[str]]: ) in block_ids_for_blocksize_post_process.items(): self.post_process_device_kv_on_receive(block_size_ratio, block_ids_list) - # Handle timeout to avoid stranding blocks on remote. + # Handle timeout: free P-side KV blocks for requests whose lease expired. + # Full scan (not early-break) since lease refreshes may change expiry + # order arbitrarily. now = time.perf_counter() - while self._reqs_to_send: - req_id, expires = next(iter(self._reqs_to_send.items())) - # Sorted dict, oldest requests are put first so we can exit early. - if now < expires: - break + expired = [ + req_id for req_id, expires in self._reqs_to_send.items() if now >= expires + ] + for req_id in expired: count = self.consumer_notification_counts_by_req.pop(req_id, 0) self.xfer_stats.record_kv_expired_req() logger.warning( - "Releasing expired KV blocks for request %s which were " + "Releasing expired KV blocks for request %s which were not " "retrieved by %d decode worker(s) within %d seconds.", req_id, count, @@ -2166,6 +2268,16 @@ def start_load_kv(self, metadata: NixlConnectorMetadata): if req_id in self._reqs_to_process: self._reqs_to_send[req_id] = expiration_time + # Apply lease refresh expiry updates for requests already tracked. + for req_id, new_expiry in metadata.reqs_to_refresh.items(): + if req_id in self._reqs_to_send: + self._reqs_to_send[req_id] = new_expiry + logger.debug( + "Lease refreshed for request %s, expires in %ds", + req_id, + envs.VLLM_NIXL_ABORT_REQUEST_TIMEOUT, + ) + def _read_blocks_for_req(self, req_id: str, meta: ReqMeta): assert meta.remote is not None and self.kv_topo is not None remote_ranks = self.kv_topo.get_target_remote_ranks_from_engine_id( diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index d76a7446d2a9..94763881118b 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -188,6 +188,12 @@ def build_app( register_models_api_router(app) + from vllm.entrypoints.openai.nixl_router import ( + attach_router as attach_nixl_router, + ) + + attach_nixl_router(app) + from vllm.entrypoints.sagemaker.api_router import ( attach_router as register_sagemaker_api_router, ) diff --git a/vllm/entrypoints/openai/nixl_router.py b/vllm/entrypoints/openai/nixl_router.py new file mode 100644 index 000000000000..650eb6b81948 --- /dev/null +++ b/vllm/entrypoints/openai/nixl_router.py @@ -0,0 +1,37 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +import json + +from fastapi import APIRouter, FastAPI, Request +from fastapi.responses import Response + +from vllm.logger import init_logger + +logger = init_logger(__name__) + +router = APIRouter() + + +@router.post("/internal/nixl/lease_refresh") +async def nixl_lease_refresh(raw_request: Request) -> Response: + """Receive KV lease refresh requests from D workers. + + D workers POST here periodically while requests are queued, before the + NIXL transfer begins, to prevent P from expiring and freeing KV blocks + prematurely. + """ + try: + body = await raw_request.json() + request_ids: list[str] = body.get("request_ids", []) + except (json.JSONDecodeError, Exception) as e: + logger.warning("nixl_lease_refresh: failed to parse request body: %s", e) + return Response(status_code=400) + + engine_client = raw_request.app.state.engine_client + await engine_client.call_utility_async("nixl_lease_refresh", request_ids) + return Response(status_code=200) + + +def attach_router(app: FastAPI) -> None: + app.include_router(router) diff --git a/vllm/envs.py b/vllm/envs.py index 864ea6649a49..e5904d0d079a 100755 --- a/vllm/envs.py +++ b/vllm/envs.py @@ -191,6 +191,7 @@ VLLM_ROCM_QUICK_REDUCE_CAST_BF16_TO_FP16: bool = True VLLM_ROCM_QUICK_REDUCE_MAX_SIZE_BYTES_MB: int | None = None VLLM_NIXL_ABORT_REQUEST_TIMEOUT: int = 480 + VLLM_NIXL_HTTP_PORT: int = 8000 VLLM_MORIIO_CONNECTOR_READ_MODE: bool = False VLLM_MORIIO_QP_PER_TRANSFER: int = 1 VLLM_MORIIO_POST_BATCH_SIZE: int = -1 @@ -1378,6 +1379,9 @@ def _get_or_set_default() -> str: "VLLM_NIXL_ABORT_REQUEST_TIMEOUT": lambda: int( os.getenv("VLLM_NIXL_ABORT_REQUEST_TIMEOUT", "480") ), + # HTTP port of the P-side vLLM OpenAI server, used by D to send lease + # refresh requests to keep KV blocks alive while requests are queued. + "VLLM_NIXL_HTTP_PORT": lambda: int(os.getenv("VLLM_NIXL_HTTP_PORT", "8000")), # Controls the read mode for the Mori-IO connector "VLLM_MORIIO_CONNECTOR_READ_MODE": lambda: ( os.getenv("VLLM_MORIIO_CONNECTOR_READ_MODE", "False").lower() in ("true", "1") diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index 4de3e4ea7d3a..07f2c6e9bf9f 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -576,6 +576,17 @@ def reset_prefix_cache( reset_running_requests, reset_connector ) + def nixl_lease_refresh(self, request_ids: list[str]) -> None: + """Refresh the KV block lease for pending remote-prefill requests. + + Called from the P-side OpenAI API server when D workers POST + /internal/nixl/lease_refresh. Handled synchronously within the + engine loop so no locking is required. + """ + connector = self.scheduler.get_kv_connector() + if connector is not None: + connector.refresh_lease(request_ids) + def reset_encoder_cache(self) -> None: """Reset the encoder cache to invalidate all cached encoder outputs. From 934224a2154cc79df2abc7bfed1a6a0bb59efb37 Mon Sep 17 00:00:00 2001 From: Robert Shaw Date: Mon, 2 Mar 2026 21:39:53 -0500 Subject: [PATCH 2/5] humans are still needed to write code Signed-off-by: Robert Shaw --- .../kv_transfer/kv_connector/v1/base.py | 83 +++++++++++-- .../kv_connector/v1/nixl_connector.py | 112 +----------------- .../kv_connector/v1/p2p/p2p_nccl_connector.py | 1 - vllm/envs.py | 4 - vllm/v1/core/sched/scheduler.py | 5 + 5 files changed, 82 insertions(+), 123 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/base.py b/vllm/distributed/kv_transfer/kv_connector/v1/base.py index 623e8cf1fa76..03c449ef206c 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/base.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/base.py @@ -38,11 +38,15 @@ ids of requests that have completed async sending/recving. """ +import asyncio import enum +import threading +import time from abc import ABC, abstractmethod from collections.abc import Callable, Iterable from typing import TYPE_CHECKING, Any, Literal +import aiohttp import torch from vllm.logger import init_logger @@ -167,6 +171,7 @@ def __init__( "Initializing KVConnectorBase_V1. This API is experimental and " "subject to change in the future as we iterate the design." ) + self._reqs_need_lease: set[Request] = set() self._connector_metadata: KVConnectorMetadata | None = None self._vllm_config = vllm_config if vllm_config.kv_transfer_config is not None: @@ -183,6 +188,22 @@ def __init__( ) self._role = role + if role == KVConnectorRole.SCHEDULER: + self._last_lease_refresh_time = time.perf_counter() + self._lease_refresh_loop = asyncio.new_event_loop() + + def background_loop(loop: asyncio.AbstractEventLoop): + asyncio.set_event_loop(loop) + loop.run_forever() + + self._lease_refresh_thread = threading.Thread( + target=background_loop, + args=(self._lease_refresh_loop,), + daemon=True, + name="kv-lease-refresh-thread", + ) + self._lease_refresh_thread.start() + @property def role(self) -> KVConnectorRole: return self._role @@ -413,6 +434,57 @@ def get_handshake_metadata(self) -> KVConnectorHandshakeMetadata | None: # Scheduler-side methods # ============================== + def add_request(self, request: "Request"): + """ + Add a request to the connector's state. This is called when a new + request is added to the scheduler, and can be used by the connector + to track the requests it needs to handle. + + Args: + request (Request): the request object. + """ + if request.kv_transfer_params is not None and hasattr( + request.kv_transfer_params, "remote_engine_id" + ): + self._reqs_need_lease.add(request.request_id) + + def refresh_leases(self): + """ + Refresh the leases for requests that need it. This is called periodically + by the scheduler to ensure that the connector can maintain any necessary + leases for the requests it is handling. + """ + + LEASE_REFRESH_TIME_S = 5 + if time.perf_counter() - self._last_lease_refresh_time < LEASE_REFRESH_TIME_S: + return + + async def _http_lease_refresh(request_id: str): + async with aiohttp.ClientSession() as session: + url = "http://localhost:7000/refresh_kv_lease" + async with session.post( + url, json={"request_id": request_id} + ) as response: + print(f"[BG] [{request_id}] {url} -> {response.status}") + + for request_id in self._reqs_need_lease: + # TODO: get result of the future and check if the remote engine + # is still running so we can avoid KV transfer failure. + _ = asyncio.run_coroutine_threadsafe( + _http_lease_refresh(request_id), self._lease_refresh_loop + ) + self._last_lease_refresh_time = time.perf_counter() + + def finish_lease_refresh(self, request_id: str): + """ + Stop lease refresh for a request. This is called when a request is finished + to stop refreshing its lease. + + Args: + request_id (str): the ID of the request. + """ + self._reqs_need_lease.discard(request_id) + @abstractmethod def get_num_new_matched_tokens( self, @@ -513,6 +585,9 @@ def request_finished( Optional KVTransferParams to be included in the request outputs returned by the engine. """ + # NOTE(rob): we need to ensure all subclasses call this super() method, + # else we will get a leak from not cleaning up _reqs_need_lease. + self._reqs_need_lease.discard(request.request_id) return False, None def take_events(self) -> Iterable["KVCacheEvent"]: @@ -592,14 +667,6 @@ def build_prom_metrics( """ return None - def refresh_lease(self, request_ids: list[str]) -> None: - """Refresh the KV lease for the given request IDs. No-op by default. - - Called from the P-side API server when D workers POST - /internal/nixl/lease_refresh to extend the KV block hold time. - """ - return - def reset_cache(self) -> bool | None: """ Reset the connector's internal cache. diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 4ffce2641583..995386752371 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -395,6 +395,8 @@ def request_finished( request: "Request", block_ids: list[int], ) -> tuple[bool, dict[str, Any] | None]: + # NOTE(rob): we need to ensure the subclasses all call this. + super().request_finished(request, block_ids) assert self.connector_scheduler is not None return self.connector_scheduler.request_finished(request, block_ids) @@ -410,15 +412,6 @@ def set_xfer_handshake_metadata( assert self.connector_scheduler is not None self.connector_scheduler.set_xfer_handshake_metadata(metadata) - def refresh_lease(self, request_ids: list[str]) -> None: - """Refresh the KV block lease for the given requests. - - Called from the P-side EngineCore when D workers POST lease refreshes - via /internal/nixl/lease_refresh. - """ - assert self.connector_scheduler is not None - self.connector_scheduler.refresh_lease(request_ids) - ############################################################ # Worker Side Methods ############################################################ @@ -549,9 +542,6 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): self._encoded_xfer_handshake_metadata: dict[int, Any] = {} self._stop_event = threading.Event() - # kv_transfer_params fields for D workers to find this P's HTTP server - self.http_port = envs.VLLM_NIXL_HTTP_PORT - # Requests that need to start recv/send. # New requests are added by update_state_after_alloc in # the scheduler. Used to make metadata passed to Worker. @@ -564,28 +554,11 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): # remote prefill or aborted. self._reqs_not_processed: set[ReqId] = set() - # P-side: pending lease refresh updates to pass to worker next step - self._lease_refreshes: dict[ReqId, float] = {} - - # D-side: requests waiting to be scheduled that need P KV lease refresh - # req_id -> (P_host, P_http_port) - self._requires_lease_dict: dict[ReqId, tuple[str, int]] = {} - self._requires_lease_lock = threading.Lock() - self._stop_lease_event = threading.Event() - self._lease_refresh_thread = threading.Thread( - target=self._lease_refresh_loop, - daemon=True, - name="nixl-d-lease-refresh", - ) - self._lease_refresh_thread.start() - def shutdown(self): self._stop_event.set() - self._stop_lease_event.set() if self._nixl_handshake_listener_t is not None: self._nixl_handshake_listener_t.join() self._nixl_handshake_listener_t = None - self._lease_refresh_thread.join(timeout=3) def set_xfer_handshake_metadata( self, metadata: dict[int, KVConnectorHandshakeMetadata] @@ -695,20 +668,6 @@ def get_num_new_matched_tokens( token_ids = request.prompt_token_ids or [] count = len(token_ids) - num_computed_tokens if count > 0: - # Track this request for D-side lease refresh while it is - # waiting to be scheduled. Only add once (this may be called - # multiple times before scheduling). - with self._requires_lease_lock: - if request.request_id not in self._requires_lease_dict: - remote_host = params.get("remote_host", "") - remote_http_port = params.get( - "remote_http_port", envs.VLLM_NIXL_HTTP_PORT - ) - if remote_host: - self._requires_lease_dict[request.request_id] = ( - remote_host, - remote_http_port, - ) return count, True # No remote prefill for this request. @@ -769,9 +728,6 @@ def update_state_after_alloc( assert num_external_tokens == 0 # Only trigger 1 KV transfer per request. params["do_remote_prefill"] = False - # Request is being scheduled — no longer needs D-side lease refresh. - with self._requires_lease_lock: - self._requires_lease_dict.pop(request.request_id, None) def build_connector_meta( self, @@ -816,7 +772,6 @@ def build_connector_meta( self._reqs_need_save.pop(req_id) meta.reqs_to_send = self._reqs_need_send - meta.reqs_to_refresh = self._lease_refreshes meta.reqs_in_batch = self._reqs_in_batch meta.reqs_not_processed = self._reqs_not_processed @@ -825,7 +780,6 @@ def build_connector_meta( self._reqs_in_batch = set() self._reqs_not_processed = set() self._reqs_need_send = {} - self._lease_refreshes = {} return meta @@ -896,61 +850,9 @@ def request_finished( remote_request_id=request.request_id, remote_host=self.side_channel_host, remote_port=self.side_channel_port, - remote_http_port=self.http_port, tp_size=self.vllm_config.parallel_config.tensor_parallel_size, ) - def refresh_lease(self, request_ids: list[str]) -> None: - """Update lease expiry for the given requests. - - Called from P's EngineCore utility handler when D workers POST - /internal/nixl/lease_refresh. Runs in the engine loop thread so no - locking is required for _lease_refreshes. - """ - new_expiry = time.perf_counter() + envs.VLLM_NIXL_ABORT_REQUEST_TIMEOUT - for req_id in request_ids: - self._lease_refreshes[req_id] = new_expiry - - def _lease_refresh_loop(self) -> None: - """D-side background thread: periodically POST lease refresh to P.""" - import json - import urllib.request as urlreq - - refresh_interval = max(envs.VLLM_NIXL_ABORT_REQUEST_TIMEOUT // 3, 1) - while not self._stop_lease_event.wait(timeout=refresh_interval): - with self._requires_lease_lock: - if not self._requires_lease_dict: - continue - snapshot = dict(self._requires_lease_dict) - - # Group by (P_host, P_http_port) - groups: dict[tuple[str, int], list[str]] = defaultdict(list) - for req_id, endpoint in snapshot.items(): - groups[endpoint].append(req_id) - - for (host, http_port), req_ids in groups.items(): - try: - body = json.dumps({"request_ids": req_ids}).encode() - url = f"http://{host}:{http_port}/internal/nixl/lease_refresh" - req = urlreq.Request(url, data=body, method="POST") - req.add_header("Content-Type", "application/json") - with urlreq.urlopen(req, timeout=5) as resp: - if resp.status != 200: - raise RuntimeError(f"HTTP {resp.status}") - logger.debug( - "Refreshed lease for %d request(s) at %s:%d", - len(req_ids), - host, - http_port, - ) - except Exception as e: - logger.warning( - "Failed to refresh KV lease with P at %s:%d: %s", - host, - http_port, - e, - ) - class NixlConnectorWorker: """Implementation of Worker side methods""" @@ -2268,16 +2170,6 @@ def start_load_kv(self, metadata: NixlConnectorMetadata): if req_id in self._reqs_to_process: self._reqs_to_send[req_id] = expiration_time - # Apply lease refresh expiry updates for requests already tracked. - for req_id, new_expiry in metadata.reqs_to_refresh.items(): - if req_id in self._reqs_to_send: - self._reqs_to_send[req_id] = new_expiry - logger.debug( - "Lease refreshed for request %s, expires in %ds", - req_id, - envs.VLLM_NIXL_ABORT_REQUEST_TIMEOUT, - ) - def _read_blocks_for_req(self, req_id: str, meta: ReqMeta): assert meta.remote is not None and self.kv_topo is not None remote_ranks = self.kv_topo.get_target_remote_ranks_from_engine_id( diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_connector.py index 3be1be18e534..f1786c60cba2 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_connector.py @@ -490,7 +490,6 @@ def request_finished( Optional KVTransferParams to be included in the request outputs returned by the engine. """ - self.chunked_prefill.pop(request.request_id, None) return False, None diff --git a/vllm/envs.py b/vllm/envs.py index e5904d0d079a..864ea6649a49 100755 --- a/vllm/envs.py +++ b/vllm/envs.py @@ -191,7 +191,6 @@ VLLM_ROCM_QUICK_REDUCE_CAST_BF16_TO_FP16: bool = True VLLM_ROCM_QUICK_REDUCE_MAX_SIZE_BYTES_MB: int | None = None VLLM_NIXL_ABORT_REQUEST_TIMEOUT: int = 480 - VLLM_NIXL_HTTP_PORT: int = 8000 VLLM_MORIIO_CONNECTOR_READ_MODE: bool = False VLLM_MORIIO_QP_PER_TRANSFER: int = 1 VLLM_MORIIO_POST_BATCH_SIZE: int = -1 @@ -1379,9 +1378,6 @@ def _get_or_set_default() -> str: "VLLM_NIXL_ABORT_REQUEST_TIMEOUT": lambda: int( os.getenv("VLLM_NIXL_ABORT_REQUEST_TIMEOUT", "480") ), - # HTTP port of the P-side vLLM OpenAI server, used by D to send lease - # refresh requests to keep KV blocks alive while requests are queued. - "VLLM_NIXL_HTTP_PORT": lambda: int(os.getenv("VLLM_NIXL_HTTP_PORT", "8000")), # Controls the read mode for the Mori-IO connector "VLLM_MORIIO_CONNECTOR_READ_MODE": lambda: ( os.getenv("VLLM_MORIIO_CONNECTOR_READ_MODE", "False").lower() in ("true", "1") diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py index bf397ad681ca..fe7b745af341 100644 --- a/vllm/v1/core/sched/scheduler.py +++ b/vllm/v1/core/sched/scheduler.py @@ -1676,6 +1676,8 @@ def add_request(self, request: Request) -> None: if request.resumable: request.streaming_queue = deque() self.waiting.add_request(request) + if self.connector is not None: + self.connector.add_request(request) self.requests[request.request_id] = request if self.log_stats: request.record_event(EngineCoreEventType.QUEUED) @@ -1976,6 +1978,9 @@ def _update_waiting_for_remote_kv(self, request: Request) -> bool: if request.request_id not in self.finished_recving_kv_req_ids: return False + # Stop KV lease refresh for this request as we are done. + self.connector.finish_lease_refresh(request.request_id) + if request.request_id in self.failed_recving_kv_req_ids: # Request had KV load failures; num_computed_tokens was already # updated in _update_requests_with_invalid_blocks From 4b554d19ab145b0231b3e2af77eceffd1fd338b9 Mon Sep 17 00:00:00 2001 From: Robert Shaw Date: Mon, 2 Mar 2026 21:45:22 -0500 Subject: [PATCH 3/5] update from nixl to internal Signed-off-by: Robert Shaw --- vllm/entrypoints/openai/api_server.py | 6 +++--- .../openai/{nixl_router.py => internal.py} | 14 ++++++++------ 2 files changed, 11 insertions(+), 9 deletions(-) rename vllm/entrypoints/openai/{nixl_router.py => internal.py} (63%) diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 94763881118b..a9bdad47c326 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -188,11 +188,11 @@ def build_app( register_models_api_router(app) - from vllm.entrypoints.openai.nixl_router import ( - attach_router as attach_nixl_router, + from vllm.entrypoints.openai.internal import ( + attach_router as attach_internal_router, ) - attach_nixl_router(app) + attach_internal_router(app) from vllm.entrypoints.sagemaker.api_router import ( attach_router as register_sagemaker_api_router, diff --git a/vllm/entrypoints/openai/nixl_router.py b/vllm/entrypoints/openai/internal.py similarity index 63% rename from vllm/entrypoints/openai/nixl_router.py rename to vllm/entrypoints/openai/internal.py index 650eb6b81948..7ea049a76da9 100644 --- a/vllm/entrypoints/openai/nixl_router.py +++ b/vllm/entrypoints/openai/internal.py @@ -13,23 +13,25 @@ router = APIRouter() -@router.post("/internal/nixl/lease_refresh") -async def nixl_lease_refresh(raw_request: Request) -> Response: +@router.post("/internal/kv_connector_refresh_lease") +async def kv_connector_refresh_lease(raw_request: Request) -> Response: """Receive KV lease refresh requests from D workers. D workers POST here periodically while requests are queued, before the - NIXL transfer begins, to prevent P from expiring and freeing KV blocks + KV transfer begins, to prevent P from expiring and freeing KV blocks prematurely. """ try: body = await raw_request.json() - request_ids: list[str] = body.get("request_ids", []) + request_id: str = body.get("request_id") except (json.JSONDecodeError, Exception) as e: - logger.warning("nixl_lease_refresh: failed to parse request body: %s", e) + logger.warning( + "kv_connector_refresh_lease: failed to parse request body: %s", e + ) return Response(status_code=400) engine_client = raw_request.app.state.engine_client - await engine_client.call_utility_async("nixl_lease_refresh", request_ids) + await engine_client.call_utility_async("kv_connector_refresh_lease", request_id) return Response(status_code=200) From a250ae3345f78358f7c148cafe3b2d3ec560a42b Mon Sep 17 00:00:00 2001 From: Robert Shaw Date: Mon, 2 Mar 2026 23:03:12 -0500 Subject: [PATCH 4/5] refactor a bit Signed-off-by: Robert Shaw --- .../kv_transfer/kv_connector/v1/base.py | 9 +++- .../kv_connector/v1/nixl_connector.py | 44 ++++++++++++++++--- vllm/v1/engine/core.py | 11 +++-- 3 files changed, 49 insertions(+), 15 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/base.py b/vllm/distributed/kv_transfer/kv_connector/v1/base.py index 03c449ef206c..305ab1d2ed16 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/base.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/base.py @@ -443,8 +443,10 @@ def add_request(self, request: "Request"): Args: request (Request): the request object. """ - if request.kv_transfer_params is not None and hasattr( - request.kv_transfer_params, "remote_engine_id" + if ( + request.kv_transfer_params is not None + and hasattr(request.kv_transfer_params, "remote_engine_id") + and getattr(request.kv_transfer_params, "do_remote_prefill", False) ): self._reqs_need_lease.add(request.request_id) @@ -485,6 +487,9 @@ def finish_lease_refresh(self, request_id: str): """ self._reqs_need_lease.discard(request_id) + def handle_refresh_lease(self, request_id: str): + return + @abstractmethod def get_num_new_matched_tokens( self, diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 995386752371..76fdffb56749 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -256,7 +256,7 @@ def __init__(self): self.reqs_to_recv: dict[ReqId, ReqMeta] = {} self.reqs_to_save: dict[ReqId, ReqMeta] = {} self.reqs_to_send: dict[ReqId, float] = {} - self.reqs_to_refresh: dict[ReqId, float] = {} # lease refresh expiry updates + self.reqs_to_refresh: set[ReqId] = set() self.reqs_in_batch: set[ReqId] = set() self.reqs_not_processed: set[ReqId] = set() @@ -282,6 +282,12 @@ def add_new_req_to_save( local_block_ids, kv_transfer_params ) + def add_new_req_to_refresh( + self, + request_id: ReqId, + ): + self.reqs_to_refresh.add(request_id) + def add_new_req_to_recv( self, request_id: ReqId, @@ -367,6 +373,12 @@ def get_required_kvcache_layout(cls, vllm_config: VllmConfig): # Scheduler Side Methods ############################################################ + def handle_refresh_lease(self, request_id: str): + assert self.connector_scheduler is not None + return self.connector_scheduler.handle_refresh_lease( + request_id, + ) + def get_num_new_matched_tokens( self, request: "Request", num_computed_tokens: int ) -> tuple[int | None, bool]: @@ -436,7 +448,8 @@ def set_host_xfer_buffer_ops(self, copy_operation: CopyBlocksOp): def get_finished(self, finished_req_ids: set[str]) -> tuple[set[str], set[str]]: """Get the finished recving and sending requests.""" assert self.connector_worker is not None - return self.connector_worker.get_finished() + assert isinstance(self._connector_metadata, NixlConnectorMetadata) + return self.connector_worker.get_finished(self._connector_metadata) def get_block_ids_with_load_errors(self) -> set[int]: """Get block IDs that failed to load via NIXL.""" @@ -549,6 +562,7 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): self._reqs_need_save: dict[ReqId, Request] = {} # Reqs to send and their expiration time self._reqs_need_send: dict[ReqId, float] = {} + self._reqs_need_refresh: set[ReqId] = set() self._reqs_in_batch: set[ReqId] = set() # Reqs to remove from processed set because they're not to send after # remote prefill or aborted. @@ -560,6 +574,11 @@ def shutdown(self): self._nixl_handshake_listener_t.join() self._nixl_handshake_listener_t = None + def handle_refresh_lease(self, request_id: str): + # We will refresh the lease by extending the expiration time. + # TODO: should check if in reqs_need_send? + self._reqs_need_refresh.add(request_id) + def set_xfer_handshake_metadata( self, metadata: dict[int, KVConnectorHandshakeMetadata] ) -> None: @@ -780,6 +799,7 @@ def build_connector_meta( self._reqs_in_batch = set() self._reqs_not_processed = set() self._reqs_need_send = {} + self._reqs_need_refresh = set() return meta @@ -992,6 +1012,7 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): self._recving_transfers = defaultdict[ReqId, list[TransferHandle]](list) # Track the expiration time of requests that are waiting to be sent. self._reqs_to_send: dict[ReqId, float] = {} + self._reqs_to_refresh: set[ReqId] = set() # Set of requests that have been part of a batch, regardless of status. self._reqs_to_process: set[ReqId] = set() @@ -1934,7 +1955,9 @@ def post_process_device_kv_on_receive( cache, indices, block_size_ratio ) - def get_finished(self) -> tuple[set[str], set[str]]: + def get_finished( + self, connector_metadata: NixlConnectorMetadata + ) -> tuple[set[str], set[str]]: """ Get requests that are done sending or recving on this specific worker. The scheduler process (via the MultiprocExecutor) will use this output @@ -1986,10 +2009,17 @@ def get_finished(self) -> tuple[set[str], set[str]]: # Full scan (not early-break) since lease refreshes may change expiry # order arbitrarily. now = time.perf_counter() - expired = [ - req_id for req_id, expires in self._reqs_to_send.items() if now >= expires - ] - for req_id in expired: + for req_id in self._reqs_to_send: + # If we have a lease refresh, update it. + if req_id in connector_metadata.reqs_to_refresh: + self._reqs_to_send[req_id] = now + envs.VLLM_NIXL_ABORT_REQUEST_TIMEOUT + + # Lease not yet expired, continue. + expires_t = self._reqs_to_send[req_id] + if expires_t > now: + continue + + # Lease expired, free it and push back to scheduler. count = self.consumer_notification_counts_by_req.pop(req_id, 0) self.xfer_stats.record_kv_expired_req() logger.warning( diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index 07f2c6e9bf9f..9551b3b1c9ec 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -576,16 +576,15 @@ def reset_prefix_cache( reset_running_requests, reset_connector ) - def nixl_lease_refresh(self, request_ids: list[str]) -> None: - """Refresh the KV block lease for pending remote-prefill requests. + def kv_connector_refresh_lease(self, request_id: str) -> None: + """Refresh the KV block lease for pending remote-decode requests. - Called from the P-side OpenAI API server when D workers POST - /internal/nixl/lease_refresh. Handled synchronously within the - engine loop so no locking is required. + Called on P-side OpenAI API server when D workers POST + /internal/kv_connector_refresh_lease. """ connector = self.scheduler.get_kv_connector() if connector is not None: - connector.refresh_lease(request_ids) + connector.handle_refresh_lease(request_id) def reset_encoder_cache(self) -> None: """Reset the encoder cache to invalidate all cached encoder outputs. From 275da3ca98df914e64ae400ecb3f5596cc486f92 Mon Sep 17 00:00:00 2001 From: Robert Shaw Date: Mon, 2 Mar 2026 23:03:59 -0500 Subject: [PATCH 5/5] revert spurious change Signed-off-by: Robert Shaw --- .../kv_transfer/kv_connector/v1/p2p/p2p_nccl_connector.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_connector.py index f1786c60cba2..3be1be18e534 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_connector.py @@ -490,6 +490,7 @@ def request_finished( Optional KVTransferParams to be included in the request outputs returned by the engine. """ + self.chunked_prefill.pop(request.request_id, None) return False, None