[Feat][NIXL] Add KV lease refresh mechanism for disaggregated prefill #35764
robertgshaw2-redhat wants to merge 5 commits into main
Replace the static P-side KV block timeout with an active lease mechanism. D workers periodically POST /internal/nixl/lease_refresh to extend the hold window while requests sit in the D queue, preventing premature block expiry on bursty workloads without requiring a large static timeout.

- D scheduler tracks pending remote-prefill requests in `_requires_lease_dict`; a background thread POSTs refreshes every `timeout // 3` seconds
- P scheduler receives refreshes via new EngineCore utility method `nixl_lease_refresh`, stores updated expiry in `_lease_refreshes`, and passes them to the P worker through `NixlConnectorMetadata`
- P worker applies refreshes in `start_load_kv` and does a full scan (not early-break) in `get_finished` since expiry order may change
- New `VLLM_NIXL_HTTP_PORT` env var (default 8000) lets D locate P's HTTP server; P includes it in `kv_transfer_params`
- New FastAPI route registered unconditionally in `build_app`; no-op on non-NIXL instances

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Robert Shaw <robshaw@redhat.com>
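To illustrate why the P worker needs a full scan once leases can be refreshed, here is a minimal sketch of the expiry bookkeeping. `LeaseTable`, `FakeClock`, and their methods are hypothetical names for illustration only, not the PR's classes; the sketch assumes expiries are kept in an insertion-ordered dict of absolute deadlines.

```python
import time


class LeaseTable:
    """Hypothetical stand-in for the P worker's request-expiry bookkeeping."""

    def __init__(self, timeout_s, clock=time.perf_counter):
        self.timeout_s = timeout_s
        self.clock = clock
        self._expiry = {}  # req_id -> absolute expiry time

    def add(self, req_id):
        self._expiry[req_id] = self.clock() + self.timeout_s

    def refresh(self, req_ids):
        """Extend the lease of known requests; unknown IDs are ignored."""
        now = self.clock()
        for req_id in req_ids:
            if req_id in self._expiry:
                self._expiry[req_id] = now + self.timeout_s

    def pop_expired(self):
        """Full scan: after a refresh, insertion order is not expiry order."""
        now = self.clock()
        expired = [r for r, t in self._expiry.items() if now >= t]
        for r in expired:
            del self._expiry[r]
        return expired


class FakeClock:
    """Manually advanced clock so the example is deterministic."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


clock = FakeClock()
table = LeaseTable(timeout_s=30, clock=clock)
table.add("a")            # "a" expires at t=30
clock.now = 10
table.add("b")            # "b" expires at t=40
clock.now = 25
table.refresh(["a"])      # "a" now expires at t=55, later than "b"
clock.now = 45
expired = table.pop_expired()
print(expired)            # ['b']
```

An early-break loop over the same dict would inspect "a" first, see that it has not expired, and stop without ever releasing "b".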
Code Review
This pull request introduces a KV lease refresh mechanism for disaggregated prefill in vLLM, addressing the issue of premature KV block expiry due to static timeout values. The changes include modifications to the D scheduler, P API server, P EngineCore, P scheduler, and P worker, along with a new environment variable for configuring the P's HTTP port. The implementation involves tracking requests requiring lease refresh, periodically refreshing leases from D workers, and updating lease expiry on the P side. The changes also include a full scan in get_finished to handle lease refreshes.

    with self._requires_lease_lock:
        if request.request_id not in self._requires_lease_dict:
            remote_host = params.get("remote_host", "")
            remote_http_port = params.get(
                "remote_http_port", envs.VLLM_NIXL_HTTP_PORT
            )
            if remote_host:
                self._requires_lease_dict[request.request_id] = (
                    remote_host,
                    remote_http_port,
                )

The code adds the remote_host and remote_http_port to _requires_lease_dict if remote_host exists. However, it does not handle the case where remote_host is empty. This could lead to issues if a request is added to _requires_lease_dict without a valid remote_host, potentially causing the lease refresh mechanism to fail. It's critical to ensure that only requests with valid remote_host values are added to _requires_lease_dict to prevent unexpected behavior.
Suggested change:

    if remote_host:
        self._requires_lease_dict[request.request_id] = (
            remote_host,
            remote_http_port,
        )


    with urlreq.urlopen(req, timeout=5) as resp:
        if resp.status != 200:
            raise RuntimeError(f"HTTP {resp.status}")

The code checks the HTTP status code but raises a generic RuntimeError without providing specific details about the error. This makes it difficult to debug issues related to lease refresh failures. It's critical to include the URL in the error message to provide more context for debugging.
Suggested change:

    if resp.status != 200:
        raise RuntimeError(f"HTTP {resp.status} at {url}")


    expired = [
        req_id for req_id, expires in self._reqs_to_send.items() if now >= expires
    ]

The code iterates through self._reqs_to_send to identify expired requests. However, it doesn't handle potential exceptions that might occur during the iteration or within the list comprehension. This could lead to the loop terminating prematurely and not releasing all expired KV blocks. It's critical to add error handling to ensure that all expired requests are processed and their KV blocks are released, even if some requests encounter issues.
Suggested change:

    now = time.perf_counter()
    expired = []
    for req_id, expires in self._reqs_to_send.items():
        try:
            if now >= expires:
                expired.append(req_id)
        except Exception as e:
            logger.warning(f"Error checking expiry for request {req_id}: {e}")

Signed-off-by: Robert Shaw <robshaw@redhat.com>
This pull request has merge conflicts that must be resolved before it can be

xref #38027
Summary
`VLLM_NIXL_ABORT_REQUEST_TIMEOUT` on the P side causes premature KV block expiry when the D queue is bursty, and wastes memory when set large enough to be safe.

Key changes

- D scheduler (`NixlConnectorScheduler`): tracks all `do_remote_prefill=True` requests in `_requires_lease_dict` (req_id → P host/http_port) until they are scheduled. A daemon thread (`nixl-d-lease-refresh`) POSTs to P's `/internal/nixl/lease_refresh` every `timeout // 3` seconds.
- P API server: `POST /internal/nixl/lease_refresh` (registered unconditionally; no-op on non-NIXL instances) calls `engine_client.call_utility_async("nixl_lease_refresh", request_ids)`.
- P EngineCore: the `nixl_lease_refresh()` utility method delegates to `connector.refresh_lease()`, which runs in the engine loop thread so no locking is needed.
- P scheduler (`NixlConnectorScheduler.refresh_lease`): updates `_lease_refreshes[req_id] = now + timeout`; passed to the worker via `NixlConnectorMetadata.reqs_to_refresh` each step.
- P worker (`NixlConnectorWorker`): applies refreshes in `start_load_kv`; `get_finished` now does a full scan instead of early-break (expiry order is no longer guaranteed monotone after refreshes).
- `VLLM_NIXL_HTTP_PORT` (default `8000`): tells D where P's HTTP server is; P includes `remote_http_port` in `kv_transfer_params`.

Thread safety

- `_requires_lease_dict`: guarded by `_requires_lease_lock`.
- `_lease_refreshes`: accessed in `refresh_lease` and `build_connector_meta`; `refresh_lease` runs in the engine loop thread, so no locking is needed.

Test plan

- `test_abort_timeout_on_prefiller`: unchanged behaviour. D never adds the request to `_requires_lease_dict` (no `remote_host`), so P expires after `VLLM_NIXL_ABORT_REQUEST_TIMEOUT` as before.
- `test_disagg_accuracy.py`: D refreshes leases while requests queue; transfer completes normally.

🤖 Generated with Claude Code
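The D-side refresh loop described in the key changes could be sketched roughly as follows. This is an illustration under assumptions, not the PR's code: `LeaseRefresher`, `post_refresh`, the JSON body shape `{"request_ids": [...]}`, and the batching of request IDs per P endpoint are all hypothetical. Only the route path, the thread name, the `timeout // 3` interval, and the lock-guarded dict come from the description above.

```python
import json
import threading
import urllib.request
from collections import defaultdict


def post_refresh(host, port, request_ids, timeout_s=5):
    """POST request IDs to one P instance (body shape is an assumption)."""
    url = f"http://{host}:{port}/internal/nixl/lease_refresh"
    body = json.dumps({"request_ids": request_ids}).encode()
    req = urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )
    with urllib.request.urlopen(req, timeout=timeout_s) as resp:
        if resp.status != 200:
            raise RuntimeError(f"HTTP {resp.status} at {url}")


class LeaseRefresher:
    """Hypothetical D-side tracker that periodically refreshes P-side leases."""

    def __init__(self, lease_timeout_s, post=post_refresh):
        self.interval_s = max(1, lease_timeout_s // 3)
        self._post = post
        self._pending = {}  # req_id -> (host, port)
        self._lock = threading.Lock()
        self._stop = threading.Event()

    def track(self, req_id, host, port):
        if not host:  # without a remote host there is nothing to refresh
            return
        with self._lock:
            self._pending.setdefault(req_id, (host, port))

    def untrack(self, req_id):
        with self._lock:
            self._pending.pop(req_id, None)

    def refresh_once(self):
        with self._lock:  # copy under the lock, do network I/O outside it
            snapshot = dict(self._pending)
        by_endpoint = defaultdict(list)
        for req_id, endpoint in snapshot.items():
            by_endpoint[endpoint].append(req_id)
        for (host, port), req_ids in by_endpoint.items():
            try:
                self._post(host, port, req_ids)
            except Exception as exc:  # a failed refresh must not kill the thread
                print(f"lease refresh to {host}:{port} failed: {exc}")

    def run(self):
        # Event.wait returns False on timeout, True once stop() is called.
        while not self._stop.wait(self.interval_s):
            self.refresh_once()

    def start(self):
        thread = threading.Thread(
            target=self.run, name="nixl-d-lease-refresh", daemon=True
        )
        thread.start()
        return thread

    def stop(self):
        self._stop.set()


# Usage with a fake transport, so no HTTP server is needed.
calls = []
refresher = LeaseRefresher(30, post=lambda h, p, ids: calls.append((h, p, ids)))
refresher.track("r1", "10.0.0.1", 8000)
refresher.track("r2", "10.0.0.1", 8000)
refresher.track("r3", "", 8000)  # no remote_host: never tracked
refresher.refresh_once()
print(calls)
```

Snapshotting the dict under the lock and posting outside it keeps the scheduler from blocking on a slow P instance; whether the PR makes the same choice is not stated in the description.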