From f5d01eb224170ee0e43e221165796202bccfdbbc Mon Sep 17 00:00:00 2001 From: Chaemin Lim Date: Wed, 20 May 2026 02:55:22 +0000 Subject: [PATCH] [Bugfix] MoRIIO add_new_req: tolerate short request_ids via remote_hosts fallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cycle 3 unblocked the TP=16 sym 1P1D data-plane and a run progressed 141/1319 before crashing on a second, latent issue: add_new_req raised ValueError on requests whose request_id did not match the proxy's ___prefill_addr_..._UUID convention. These short, 16-hex internal IDs showed up after extensive external-prefix-cache hits when the engine took the 'aborted before scheduling' cleanup branch in request_finished; without a parseable request_id and without explicit remote_host / remote_handshake_port / remote_notify_port in kv_transfer_params, the Cycle 1 strict resolution raised and killed EngineCore mid-eval. Add fallbacks: when remote_host is unresolved, use remote_hosts[0] (the proxy forwards it for multi-node TP); when ports are unresolved, use MoRIIO's DEFAULT_HANDSHAKE_PORT / DEFAULT_NOTIFY_PORT (matching the kv_connector_extra_config the exp scripts pass on every container). Only raise if even remote_hosts is missing — preserves Cycle 1's intent of not silently leaking producer KV blocks. Signed-off-by: Chaemin Lim # Conflicts: # vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py --- .../kv_connector/v1/moriio/moriio_common.py | 50 ++++++++++++++++--- 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py b/vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py index b843c5b59308..1a743b35d1e9 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py @@ -347,17 +347,51 @@ def add_new_req( transfer_id = kv_transfer_params["transfer_id"] # Parse host/ports from the request_id. The router embeds both zmq_addresses - # in the request_id - peer_zmq = get_peer_zmq_from_request_id(request_id, is_producer=write_mode) - remote_host, remote_handshake_port, remote_notify_port = ( - parse_moriio_zmq_address(peer_zmq) - ) - + # in the request_id. Some cleanup-path requests (e.g. those that hit the + # external prefix cache and take the "aborted before scheduling" branch + # in `request_finished`) surface a short internal request_id without + # the proxy's `___prefill_addr_...` prefix; `get_peer_zmq_from_request_id` + # raises ValueError on those. Fall back to `kv_transfer_params.remote_hosts` + # (proxy already forwards it for multi-node TP) and MoRIIO's default + # well-known ports, which match the kv_connector_extra_config the runner + # scripts pass on every container. Only the request_id parse path is + # affected; once we have a valid host/port triple we proceed as before. + try: + peer_zmq = get_peer_zmq_from_request_id(request_id, is_producer=write_mode) + remote_host, remote_handshake_port, remote_notify_port = ( + parse_moriio_zmq_address(peer_zmq) + ) + except ValueError: + # Normalize remote_hosts: callers may pass a list (per-rank host + # vector from the proxy) or a single host string from older + # proxy versions. A bare string would silently slice into "1." + # for "172.30.0.1" if we just did remote_hosts[0]. + remote_hosts_raw = kv_transfer_params.get("remote_hosts") or [] + if isinstance(remote_hosts_raw, str): + remote_hosts = [remote_hosts_raw] if remote_hosts_raw else [] + else: + remote_hosts = list(remote_hosts_raw) + if not remote_hosts: + raise ValueError( + f"MoRIIO add_new_req: could not resolve peer host/ports " + f"for {request_id!r}; neither request_id parse nor " + f"kv_transfer_params.remote_hosts provided them" + ) from None + remote_host = remote_hosts[0] + remote_handshake_port = int(MoRIIOConstants.DEFAULT_HANDSHAKE_PORT) + remote_notify_port = int(MoRIIOConstants.DEFAULT_NOTIFY_PORT) + + # Cleanup-path requests (the same "aborted before scheduling" branch + # that surfaces a short request_id) can also omit remote_block_ids + # and remote_engine_id. Use .get() defaults so we don't crash the + # same EngineCore the request_id fallback above is meant to keep + # alive — an empty block list is the correct no-op for an aborted + # request. _req = ReqMeta( transfer_id=transfer_id, local_block_ids=local_block_ids, - remote_block_ids=kv_transfer_params["remote_block_ids"], - remote_engine_id=kv_transfer_params["remote_engine_id"], + remote_block_ids=kv_transfer_params.get("remote_block_ids", []), + remote_engine_id=kv_transfer_params.get("remote_engine_id", ""), remote_host=remote_host, remote_port=remote_handshake_port, remote_handshake_port=remote_handshake_port,