Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,11 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str):
"backends", ["UCX"])
# Agent.
non_ucx_backends = [b for b in self.nixl_backends if b != "UCX"]
config = nixl_agent_config(backends=self.nixl_backends) if len(
non_ucx_backends) > 0 and nixl_agent_config is not None else None
if nixl_agent_config is None:
config = None
else:
config = nixl_agent_config(backends=self.nixl_backends) if len(
non_ucx_backends) > 0 else nixl_agent_config(num_threads=8)

self.nixl_wrapper = NixlWrapper(str(uuid.uuid4()), config)
# Map of engine_id -> {rank0: agent_name0, rank1: agent_name1..}.
Expand Down
7 changes: 6 additions & 1 deletion vllm/v1/core/sched/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1290,4 +1290,9 @@ def _update_from_kv_xfer_finished(self,
self.finished_recving_kv_req_ids.add(req_id)
for req_id in (kv_connector_output.finished_sending or ()):
logger.debug("Finished sending KV transfer for request %s", req_id)
self._free_blocks(self.requests[req_id])
if req_id not in self.requests:
logger.warning(
"Got finished sending KV transfer for request %s,"
"but the request is already freed.", req_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would you unit test this scenario?

In #25067 the case I tested was an abort after the prefill request had finished, but @NickLucche rightly asked (AIUI):

If P is done with the request (finished, capped length), then how is the request being aborted in P ?

Whatever the scenario is ... if it is supposed to happen, we shouldn't have a warning that is not actionable by the user. But a "why this is supposed to happen" comment would be important for maintainability. Something similar to

request = self.requests.get(req_id)
if request is None:
# The request is already finished. This can happen if the
# request is aborted while the model is executing it (e.g.,
# in pipeline parallelism).
continue

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this should be a debug log.

I will post up a diff later today that:

  • adds commentary about what scenarios this can be caused by
  • adds a unit test to the scheduler tests we have for this

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm working on this in #25067 again

else:
self._free_blocks(self.requests[req_id])