# Patch summary (free text ahead of the first "diff --git" header).
#
# NOTE(review): this patch was received with its line breaks collapsed. The
# line structure and indentation below are reconstructed from the hunk
# headers (6 -> 55 lines and 17 -> 29 lines both add up) and from Python
# syntax; confirm leading whitespace against the target files before applying.
#
# vllm/v1/core/sched/scheduler.py, _update_from_kv_xfer_finished:
#   * The two `assert req_id in self.requests` checks are replaced with
#     `self.requests.get(req_id)`. When the lookup returns None the id is
#     logged at debug level and the loop moves on to the next id.
#   * On the finished_recving path the unknown id is additionally discarded
#     from `self.finished_recving_kv_req_ids` before continuing.
#   * `_free_blocks` now receives the already-fetched `req` instead of
#     indexing `self.requests` a second time.
#
# tests/v1/core/test_scheduler.py:
#   * test_ignore_late_finished_recving_after_abort_cleanup: marks a request
#     WAITING_FOR_REMOTE_KVS, records its id in finished_recving_kv_req_ids,
#     aborts it via finish_requests, then passes update_from_output a
#     KVConnectorOutput whose finished_recving contains that id. It asserts
#     the request is still absent from scheduler.requests and that
#     finished_recving_kv_req_ids is empty.
#   * test_ignore_late_finished_sending_after_request_cleanup: aborts a
#     request, then reports its id through finished_sending and asserts the
#     request is still absent from scheduler.requests.
diff --git a/tests/v1/core/test_scheduler.py b/tests/v1/core/test_scheduler.py
index 2fe45242153c..094a9eb65907 100644
--- a/tests/v1/core/test_scheduler.py
+++ b/tests/v1/core/test_scheduler.py
@@ -3780,6 +3780,55 @@ def test_abort_request_finished_recving():
     assert not scheduler.finished_recving_kv_req_ids
 
 
+def test_ignore_late_finished_recving_after_abort_cleanup():
+    scheduler = create_scheduler(use_kv_connector=True)
+
+    # add a single request
+    request = create_requests(num_requests=1)[0]
+    scheduler.add_request(request)
+
+    # abort after recv completed but before the scheduler promotes the request
+    request.status = RequestStatus.WAITING_FOR_REMOTE_KVS
+    scheduler.finished_recving_kv_req_ids.add(request.request_id)
+    scheduler.finish_requests((request.request_id,), RequestStatus.FINISHED_ABORTED)
+
+    assert request.request_id not in scheduler.requests
+
+    # a late worker callback should be ignored rather than crashing
+    scheduler_output = scheduler.schedule()
+    model_runner_output = ModelRunnerOutput(
+        req_ids=[],
+        req_id_to_index={},
+        kv_connector_output=KVConnectorOutput(finished_recving={request.request_id}),
+    )
+    scheduler.update_from_output(scheduler_output, model_runner_output)
+
+    assert request.request_id not in scheduler.requests
+    assert not scheduler.finished_recving_kv_req_ids
+
+
+def test_ignore_late_finished_sending_after_request_cleanup():
+    scheduler = create_scheduler(use_kv_connector=True)
+
+    # add and finish a single request so it is fully removed
+    request = create_requests(num_requests=1)[0]
+    scheduler.add_request(request)
+    scheduler.finish_requests((request.request_id,), RequestStatus.FINISHED_ABORTED)
+
+    assert request.request_id not in scheduler.requests
+
+    # a stale async send completion should also be ignored
+    scheduler_output = scheduler.schedule()
+    model_runner_output = ModelRunnerOutput(
+        req_ids=[],
+        req_id_to_index={},
+        kv_connector_output=KVConnectorOutput(finished_sending={request.request_id}),
+    )
+    scheduler.update_from_output(scheduler_output, model_runner_output)
+
+    assert request.request_id not in scheduler.requests
+
+
 # ==============================================================================
 # Variable-length encoder cross-attention block allocation tests
 # ==============================================================================
diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py
index 486ce8debc88..4297f183ebd6 100644
--- a/vllm/v1/core/sched/scheduler.py
+++ b/vllm/v1/core/sched/scheduler.py
@@ -2100,17 +2100,29 @@ def _update_from_kv_xfer_finished(self, kv_connector_output: KVConnectorOutput):
         # KV Connector:: update recv and send status from last step.
         for req_id in kv_connector_output.finished_recving or ():
             logger.debug("Finished recving KV transfer for request %s", req_id)
-            assert req_id in self.requests
-            req = self.requests[req_id]
+            req = self.requests.get(req_id)
+            if req is None:
+                logger.debug(
+                    "Ignoring finished recving KV transfer for unknown request %s",
+                    req_id,
+                )
+                self.finished_recving_kv_req_ids.discard(req_id)
+                continue
             if req.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
                 self.finished_recving_kv_req_ids.add(req_id)
             else:
                 assert RequestStatus.is_finished(req.status)
-                self._free_blocks(self.requests[req_id])
+                self._free_blocks(req)
         for req_id in kv_connector_output.finished_sending or ():
             logger.debug("Finished sending KV transfer for request %s", req_id)
-            assert req_id in self.requests
-            self._free_blocks(self.requests[req_id])
+            req = self.requests.get(req_id)
+            if req is None:
+                logger.debug(
+                    "Ignoring finished sending KV transfer for unknown request %s",
+                    req_id,
+                )
+                continue
+            self._free_blocks(req)
 
     def _update_requests_with_invalid_blocks(
         self,