From c8fdf9fec11818017f442d8749cd7b364f9e15ed Mon Sep 17 00:00:00 2001
From: Jade Zheng
Date: Fri, 27 Feb 2026 19:44:28 +0800
Subject: [PATCH 1/2] [Core] Proactively free KV cache blocks when aborting
 finished requests

This is an enhancement to PR #25067, which ignored aborts on finished
requests and relied on timeout-based cleanup.

Instead of waiting for the connector timeout to free blocks, free them
immediately when receiving FINISHED_ABORTED for an already-finished
request. This enables earlier KV cache memory reclamation, which is
especially important under heavy load in multi-node scenarios where
memory pressure is high.

Signed-off-by: Jade Zheng
---
 .../unit/test_remote_decode_lifecycle.py      |  2 +-
 vllm/v1/core/sched/scheduler.py               | 23 ++++++++++++++++---
 2 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/tests/v1/kv_connector/unit/test_remote_decode_lifecycle.py b/tests/v1/kv_connector/unit/test_remote_decode_lifecycle.py
index b2ec2ddfb64d..3a3155903c42 100644
--- a/tests/v1/kv_connector/unit/test_remote_decode_lifecycle.py
+++ b/tests/v1/kv_connector/unit/test_remote_decode_lifecycle.py
@@ -250,7 +250,7 @@ def test_abort_during_kv_transfer():
 
     # Abort the request, and check the blocks are still not freed
     scheduler.finish_requests([request.request_id], RequestStatus.FINISHED_ABORTED)
-    assert len(scheduler.requests) == 1
+    assert_scheduler_empty(scheduler)
 
     # Simulate a finished sending notification
     scheduler_output = scheduler.schedule()
diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py
index bf397ad681ca..1e8100e5711d 100644
--- a/vllm/v1/core/sched/scheduler.py
+++ b/vllm/v1/core/sched/scheduler.py
@@ -1709,10 +1709,21 @@ def finish_requests(
         # First pass: collect requests to remove from queues
         for req_id in request_ids:
             request = self.requests.get(req_id)
-            if request is None or request.is_finished():
+            if request is None:
                 # Invalid request ID.
                 continue
 
+            if request.is_finished():
+                # If the request is already finished, only FINISHED_ABORTED is
+                # allowed, which is used to force resource cleanup.
+                assert finished_status == RequestStatus.FINISHED_ABORTED, (
+                    "Only FINISHED_ABORTED is allowed for requests that are "
+                    "already finished."
+                )
+                logger.info("Aborting finished request %s, freeing blocks.", req_id)
+                self._free_blocks(request)
+                continue
+
             valid_requests.append(request)
             if request.status == RequestStatus.RUNNING:
                 running_requests_to_remove.add(request)
@@ -2032,8 +2043,14 @@ def _update_from_kv_xfer_finished(self, kv_connector_output: KVConnectorOutput):
             self._free_blocks(self.requests[req_id])
         for req_id in kv_connector_output.finished_sending or ():
             logger.debug("Finished sending KV transfer for request %s", req_id)
-            assert req_id in self.requests
-            self._free_blocks(self.requests[req_id])
+            if req_id not in self.requests:
+                logger.warning(
+                    "Got finished sending KV transfer for request %s, "
+                    "but the request is already freed.",
+                    req_id,
+                )
+            else:
+                self._free_blocks(self.requests[req_id])
 
     def _update_requests_with_invalid_blocks(
         self,

From 753add1be1db05e5c41fca495c09787969604539 Mon Sep 17 00:00:00 2001
From: Shoujian Zheng
Date: Thu, 5 Mar 2026 11:24:15 +0800
Subject: [PATCH 2/2] [Core] Fix abort after finished to maintain
 scheduler-connector contract

This addresses review feedback for PR #35506. The original
implementation broke the contract between scheduler and connector by
directly freeing blocks without notifying the connector.
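
Concretely, the restored contract works like this: the scheduler never
frees blocks on its own; it hands the request to the connector, and
blocks are freed only once the request comes back through
finished_sending. A minimal, self-contained sketch of that flow,
assuming simplified stand-in classes (ToyConnector and ToyScheduler are
illustrative only, not vLLM's real API):

    from dataclasses import dataclass, field


    @dataclass
    class ToyConnector:
        # Requests the worker will report as finished_sending next step.
        abort_done: set[str] = field(default_factory=set)

        def request_finished(self, req_id: str, already_finished: bool) -> None:
            if already_finished:
                # Abort after finished: queue the request for an immediate
                # finished_sending report instead of having the scheduler
                # free its blocks behind the connector's back.
                self.abort_done.add(req_id)


    @dataclass
    class ToyScheduler:
        connector: ToyConnector
        blocks: dict[str, list[int]] = field(default_factory=dict)

        def abort_finished(self, req_id: str) -> None:
            # Do NOT free blocks here; defer to the connector.
            self.connector.request_finished(req_id, already_finished=True)

        def on_finished_sending(self, req_ids: set[str]) -> None:
            # The only place blocks are freed in this flow.
            for req_id in req_ids:
                self.blocks.pop(req_id, None)


    sched = ToyScheduler(ToyConnector(), blocks={"req-0": [1, 2, 3]})
    sched.abort_finished("req-0")
    assert "req-0" in sched.blocks  # still held right after the abort
    sched.on_finished_sending(sched.connector.abort_done)
    assert "req-0" not in sched.blocks  # freed via finished_sending
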
Changes:
- Scheduler: Set request.status to FINISHED_ABORTED and call the
  connector for cleanup instead of immediately freeing blocks
- NixlConnectorScheduler: Detect the FINISHED_ABORTED status and mark
  the request for cleanup via the finished_sending mechanism
- NixlConnectorMetadata: Add a reqs_abort_done field for worker
  communication
- NixlConnectorWorker: Track aborted requests and report them via
  finished_sending

This ensures:
- The scheduler-connector contract is maintained
- The connector participates in the cleanup process
- Blocks are freed through the established finished_sending mechanism

Signed-off-by: Shoujian Zheng
---
 .../unit/test_remote_decode_lifecycle.py      | 15 ++++++---
 .../kv_connector/v1/nixl_connector.py         | 33 +++++++++++++++++++
 vllm/v1/core/sched/scheduler.py               | 22 +++++++------
 3 files changed, 56 insertions(+), 14 deletions(-)

diff --git a/tests/v1/kv_connector/unit/test_remote_decode_lifecycle.py b/tests/v1/kv_connector/unit/test_remote_decode_lifecycle.py
index 3a3155903c42..b8cc1a83678f 100644
--- a/tests/v1/kv_connector/unit/test_remote_decode_lifecycle.py
+++ b/tests/v1/kv_connector/unit/test_remote_decode_lifecycle.py
@@ -248,15 +248,22 @@ def test_abort_during_kv_transfer():
     # Request removed from PB but blocks should not be freed.
     assert len(scheduler.requests) == 1
 
-    # Abort the request, and check the blocks are still not freed
+    # Abort the request. Since the request is already finished
+    # (FINISHED_LENGTH_CAPPED), this becomes an "abort after finished" scenario.
+    # Blocks will NOT be freed immediately; instead they wait for the connector
+    # to report finished_sending.
     scheduler.finish_requests([request.request_id], RequestStatus.FINISHED_ABORTED)
-    assert_scheduler_empty(scheduler)
 
-    # Simulate a finished sending notification
+    # After abort, the request should still exist (waiting for finished_sending).
+    # This is the new behavior for the "abort after finished" scenario.
+    assert len(scheduler.requests) == 1
+    assert request.status == RequestStatus.FINISHED_ABORTED
+
+    # Simulate a finished sending notification; now the blocks will be freed
     scheduler_output = scheduler.schedule()
     model_runner_output = copy.deepcopy(EMPTY_MODEL_RUNNER_OUTPUT)
     model_runner_output.kv_connector_output = KVConnectorOutput(
-        finished_sending=[request.request_id]
+        finished_sending=set([request.request_id])
     )
     scheduler.update_from_output(scheduler_output, model_runner_output)
     assert_scheduler_empty(scheduler)
diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
index c5a5b0450fbb..cd6b1086ecf9 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
@@ -261,6 +261,7 @@ def __init__(self):
         self.reqs_to_send: dict[ReqId, float] = {}
         self.reqs_in_batch: set[ReqId] = set()
         self.reqs_not_processed: set[ReqId] = set()
+        self.reqs_abort_done: set[ReqId] = set()
 
     def _add_new_req(
         self,
@@ -553,6 +554,8 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str):
         # Reqs to remove from processed set because they're not to send after
         # remote prefill or aborted.
         self._reqs_not_processed: set[ReqId] = set()
+        # Reqs that were aborted after finished and need cleanup.
+        self._reqs_abort_done: set[ReqId] = set()
 
     def shutdown(self):
         self._stop_event.set()
@@ -774,12 +777,14 @@ def build_connector_meta(
         meta.reqs_to_send = self._reqs_need_send
         meta.reqs_in_batch = self._reqs_in_batch
         meta.reqs_not_processed = self._reqs_not_processed
+        meta.reqs_abort_done = self._reqs_abort_done
 
         # Clear the list once workers start the transfers
         self._reqs_need_recv.clear()
         self._reqs_in_batch = set()
         self._reqs_not_processed = set()
         self._reqs_need_send = {}
+        self._reqs_abort_done = set()
 
         return meta
 
@@ -794,6 +799,24 @@ def request_finished(
         """
         from vllm.v1.request import RequestStatus
 
+        # Check if this is an abort-after-finished case.
+        if request.status == RequestStatus.FINISHED_ABORTED:
+            # The request was already finished and is now being aborted.
+            # Clean up state and mark it for immediate reporting via
+            # finished_sending to unblock block freeing.
+            req_id = request.request_id
+            logger.debug(
+                "NIXLConnector request_finished(%s): abort after finished, "
+                "marking for cleanup via finished_sending",
+                req_id,
+            )
+            self._reqs_not_processed.add(req_id)
+            self._reqs_need_send.pop(req_id, None)
+            self._reqs_abort_done.add(req_id)
+            # Don't delay freeing the blocks; they will be freed once the
+            # worker reports finished_sending.
+            return False, None
+
         params = request.kv_transfer_params
         logger.debug(
             "NIXLConnector request_finished(%s), request_status=%s, "
@@ -999,6 +1022,8 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str):
         self._invalid_block_ids: set[int] = set()
         # requests that skipped transfer (handshake or transfer failures)
         self._failed_recv_reqs: set[ReqId] = set()
+        # requests that were aborted after finished
+        self._aborted_reqs: set[ReqId] = set()
 
         # Handshake metadata of this worker for NIXL transfers.
         self.xfer_handshake_metadata: NixlHandshakePayload | None = None
@@ -2002,6 +2027,10 @@ def get_finished(self) -> tuple[set[str], set[str]]:
             del self._reqs_to_send[req_id]
             done_sending.add(req_id)
 
+        # Add aborted requests (abort after finished) to done_sending.
+        done_sending.update(self._aborted_reqs)
+        self._aborted_reqs.clear()
+
         return done_sending, done_recving
 
     def _get_new_notifs(self) -> set[str]:
@@ -2169,6 +2198,10 @@ def start_load_kv(self, metadata: NixlConnectorMetadata):
             if req_id in self._reqs_to_process:
                 self._reqs_to_send[req_id] = expiration_time
 
+        # Handle aborted requests (abort after finished).
+        # These will be reported as done_sending immediately.
+        self._aborted_reqs.update(metadata.reqs_abort_done)
+
     def _read_blocks_for_req(self, req_id: str, meta: ReqMeta):
         assert meta.remote is not None and self.kv_topo is not None
         remote_ranks = self.kv_topo.get_target_remote_ranks_from_engine_id(
diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py
index 1e8100e5711d..c7df0e9cd8e0 100644
--- a/vllm/v1/core/sched/scheduler.py
+++ b/vllm/v1/core/sched/scheduler.py
@@ -1720,8 +1720,16 @@ def finish_requests(
                     "Only FINISHED_ABORTED is allowed for requests that are "
                     "already finished."
                 )
-                logger.info("Aborting finished request %s, freeing blocks.", req_id)
-                self._free_blocks(request)
+                logger.info("Aborting finished request %s.", req_id)
+                # Set the status to FINISHED_ABORTED so the connector can
+                # detect this case and participate in cleanup.
+                request.status = RequestStatus.FINISHED_ABORTED
+                # Notify the connector so it can participate in cleanup.
+                # Blocks will be freed when it reports finished_sending.
+                # A finished request can only exist in self.requests when the
+                # connector delays block freeing (P/D scenario).
+                assert self.connector is not None
+                self._connector_finished(request)
                 continue
 
             valid_requests.append(request)
@@ -2043,14 +2051,8 @@ def _update_from_kv_xfer_finished(self, kv_connector_output: KVConnectorOutput):
             self._free_blocks(self.requests[req_id])
         for req_id in kv_connector_output.finished_sending or ():
             logger.debug("Finished sending KV transfer for request %s", req_id)
-            if req_id not in self.requests:
-                logger.warning(
-                    "Got finished sending KV transfer for request %s, "
-                    "but the request is already freed.",
-                    req_id,
-                )
-            else:
-                self._free_blocks(self.requests[req_id])
+            assert req_id in self.requests
+            self._free_blocks(self.requests[req_id])
 
     def _update_requests_with_invalid_blocks(
         self,
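
End to end, the two patches route an abort-after-finished through three
hops: the scheduler marks the request, the connector carries it to the
worker in its metadata, and the worker reports it back through
finished_sending, at which point the scheduler frees the blocks. A
self-contained sketch of that round trip, assuming simplified stand-in
classes (ToyMeta, ToySchedulerSide, and ToyWorkerSide are illustrative
only; the real types are NixlConnectorMetadata, NixlConnectorScheduler,
and NixlConnectorWorker in nixl_connector.py):

    from dataclasses import dataclass, field


    @dataclass
    class ToyMeta:
        # Mirrors the new NixlConnectorMetadata.reqs_abort_done field.
        reqs_abort_done: set[str] = field(default_factory=set)


    @dataclass
    class ToySchedulerSide:
        # Mirrors NixlConnectorScheduler._reqs_abort_done.
        reqs_abort_done: set[str] = field(default_factory=set)

        def request_finished_aborted(self, req_id: str) -> None:
            # Abort after finished: mark the request for worker-side
            # cleanup instead of freeing its blocks directly.
            self.reqs_abort_done.add(req_id)

        def build_connector_meta(self) -> ToyMeta:
            # Hand the batch of aborted requests to the worker, then reset.
            meta = ToyMeta(reqs_abort_done=self.reqs_abort_done)
            self.reqs_abort_done = set()
            return meta


    @dataclass
    class ToyWorkerSide:
        # Mirrors NixlConnectorWorker._aborted_reqs.
        aborted_reqs: set[str] = field(default_factory=set)

        def start_load_kv(self, meta: ToyMeta) -> None:
            self.aborted_reqs.update(meta.reqs_abort_done)

        def get_finished(self) -> set[str]:
            # Aborted requests are reported as done_sending immediately,
            # which is the signal that lets the scheduler free blocks.
            done_sending = set(self.aborted_reqs)
            self.aborted_reqs.clear()
            return done_sending


    scheduler_side, worker_side = ToySchedulerSide(), ToyWorkerSide()
    scheduler_side.request_finished_aborted("req-0")
    worker_side.start_load_kv(scheduler_side.build_connector_meta())
    assert worker_side.get_finished() == {"req-0"}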