diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py b/vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py index b843c5b59308..1a743b35d1e9 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py @@ -347,17 +347,51 @@ def add_new_req( transfer_id = kv_transfer_params["transfer_id"] # Parse host/ports from the request_id. The router embeds both zmq_addresses - # in the request_id - peer_zmq = get_peer_zmq_from_request_id(request_id, is_producer=write_mode) - remote_host, remote_handshake_port, remote_notify_port = ( - parse_moriio_zmq_address(peer_zmq) - ) - + # in the request_id. Some cleanup-path requests (e.g. those that hit the + # external prefix cache and take the "aborted before scheduling" branch + # in `request_finished`) surface a short internal request_id without + # the proxy's `___prefill_addr_...` prefix; `get_peer_zmq_from_request_id` + # raises ValueError on those. Fall back to `kv_transfer_params.remote_hosts` + # (proxy already forwards it for multi-node TP) and MoRIIO's default + # well-known ports, which match the kv_connector_extra_config the runner + # scripts pass on every container. Only the request_id parse path is + # affected; once we have a valid host/port triple we proceed as before. + try: + peer_zmq = get_peer_zmq_from_request_id(request_id, is_producer=write_mode) + remote_host, remote_handshake_port, remote_notify_port = ( + parse_moriio_zmq_address(peer_zmq) + ) + except ValueError: + # Normalize remote_hosts: callers may pass a list (per-rank host + # vector from the proxy) or a single host string from older + # proxy versions. A bare string would silently slice into "1" + # for "172.30.0.1" if we just did remote_hosts[0]. 
+ remote_hosts_raw = kv_transfer_params.get("remote_hosts") or [] + if isinstance(remote_hosts_raw, str): + remote_hosts = [remote_hosts_raw] if remote_hosts_raw else [] + else: + remote_hosts = list(remote_hosts_raw) + if not remote_hosts: + raise ValueError( + f"MoRIIO add_new_req: could not resolve peer host/ports " + f"for {request_id!r}; neither request_id parse nor " + f"kv_transfer_params.remote_hosts provided them" + ) from None + remote_host = remote_hosts[0] + remote_handshake_port = int(MoRIIOConstants.DEFAULT_HANDSHAKE_PORT) + remote_notify_port = int(MoRIIOConstants.DEFAULT_NOTIFY_PORT) + + # Cleanup-path requests (the same "aborted before scheduling" branch + # that surfaces a short request_id) can also omit remote_block_ids + # and remote_engine_id. Use .get() defaults so we don't crash the + # same EngineCore the request_id fallback above is meant to keep + # alive — an empty block list is the correct no-op for an aborted + # request. _req = ReqMeta( transfer_id=transfer_id, local_block_ids=local_block_ids, - remote_block_ids=kv_transfer_params["remote_block_ids"], - remote_engine_id=kv_transfer_params["remote_engine_id"], + remote_block_ids=kv_transfer_params.get("remote_block_ids", []), + remote_engine_id=kv_transfer_params.get("remote_engine_id", ""), remote_host=remote_host, remote_port=remote_handshake_port, remote_handshake_port=remote_handshake_port,