Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions tests/v1/core/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3780,6 +3780,55 @@ def test_abort_request_finished_recving():
assert not scheduler.finished_recving_kv_req_ids


def test_ignore_late_finished_recving_after_abort_cleanup():
    """A late ``finished_recving`` report for an aborted request is ignored.

    The request is marked as waiting for remote KVs, recorded as having
    finished receiving, and then aborted before the scheduler promotes it.
    A subsequent model-runner output that still reports the request in
    ``finished_recving`` must not crash the scheduler, must not resurrect
    the request, and must leave ``finished_recving_kv_req_ids`` empty.
    """
    scheduler = create_scheduler(use_kv_connector=True)

    # Register exactly one request with the scheduler.
    new_requests = create_requests(num_requests=1)
    request = new_requests[0]
    scheduler.add_request(request)
    req_id = request.request_id

    # Simulate: the recv has completed, but the abort arrives before the
    # scheduler gets a chance to promote the request.
    request.status = RequestStatus.WAITING_FOR_REMOTE_KVS
    scheduler.finished_recving_kv_req_ids.add(req_id)
    aborted_ids = (req_id,)
    scheduler.finish_requests(aborted_ids, RequestStatus.FINISHED_ABORTED)

    assert req_id not in scheduler.requests

    # The worker reports the recv completion late; the scheduler should
    # ignore it rather than crashing.
    scheduler_output = scheduler.schedule()
    late_kv_output = KVConnectorOutput(finished_recving={req_id})
    model_runner_output = ModelRunnerOutput(
        req_ids=[],
        req_id_to_index={},
        kv_connector_output=late_kv_output,
    )
    scheduler.update_from_output(scheduler_output, model_runner_output)

    assert req_id not in scheduler.requests
    assert not scheduler.finished_recving_kv_req_ids


def test_ignore_late_finished_sending_after_request_cleanup():
    """A stale ``finished_sending`` report for a removed request is ignored.

    The request is added and immediately aborted, after which it is no
    longer present in ``scheduler.requests``. A model-runner output that
    still lists the request in ``finished_sending`` must be processed
    without error and must not bring the request back.
    """
    scheduler = create_scheduler(use_kv_connector=True)

    # Add one request and finish it right away so it is fully removed.
    new_requests = create_requests(num_requests=1)
    request = new_requests[0]
    scheduler.add_request(request)
    req_id = request.request_id
    finished_ids = (req_id,)
    scheduler.finish_requests(finished_ids, RequestStatus.FINISHED_ABORTED)

    assert req_id not in scheduler.requests

    # A stale async send completion for that request should also be ignored.
    scheduler_output = scheduler.schedule()
    stale_kv_output = KVConnectorOutput(finished_sending={req_id})
    model_runner_output = ModelRunnerOutput(
        req_ids=[],
        req_id_to_index={},
        kv_connector_output=stale_kv_output,
    )
    scheduler.update_from_output(scheduler_output, model_runner_output)

    assert req_id not in scheduler.requests


# ==============================================================================
# Variable-length encoder cross-attention block allocation tests
# ==============================================================================
Expand Down
22 changes: 17 additions & 5 deletions vllm/v1/core/sched/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2100,17 +2100,29 @@ def _update_from_kv_xfer_finished(self, kv_connector_output: KVConnectorOutput):
# KV Connector:: update recv and send status from last step.
for req_id in kv_connector_output.finished_recving or ():
logger.debug("Finished recving KV transfer for request %s", req_id)
assert req_id in self.requests
req = self.requests[req_id]
req = self.requests.get(req_id)
if req is None:
logger.debug(
"Ignoring finished recving KV transfer for unknown request %s",
req_id,
)
self.finished_recving_kv_req_ids.discard(req_id)
continue
if req.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
self.finished_recving_kv_req_ids.add(req_id)
else:
assert RequestStatus.is_finished(req.status)
self._free_blocks(self.requests[req_id])
self._free_blocks(req)
for req_id in kv_connector_output.finished_sending or ():
logger.debug("Finished sending KV transfer for request %s", req_id)
assert req_id in self.requests
self._free_blocks(self.requests[req_id])
req = self.requests.get(req_id)
if req is None:
logger.debug(
"Ignoring finished sending KV transfer for unknown request %s",
req_id,
)
continue
self._free_blocks(req)

def _update_requests_with_invalid_blocks(
self,
Expand Down
Loading