Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -347,17 +347,51 @@ def add_new_req(
transfer_id = kv_transfer_params["transfer_id"]

# Parse host/ports from the request_id. The router embeds both zmq_addresses
# in the request_id
peer_zmq = get_peer_zmq_from_request_id(request_id, is_producer=write_mode)
remote_host, remote_handshake_port, remote_notify_port = (
parse_moriio_zmq_address(peer_zmq)
)

# in the request_id. Some cleanup-path requests (e.g. those that hit the
# external prefix cache and take the "aborted before scheduling" branch
# in `request_finished`) surface a short internal request_id without
# the proxy's `___prefill_addr_...` prefix; `get_peer_zmq_from_request_id`
# raises ValueError on those. Fall back to `kv_transfer_params.remote_hosts`
# (proxy already forwards it for multi-node TP) and MoRIIO's default
# well-known ports, which match the kv_connector_extra_config the runner
# scripts pass on every container. Only the request_id parse path is
# affected; once we have a valid host/port triple we proceed as before.
try:
peer_zmq = get_peer_zmq_from_request_id(request_id, is_producer=write_mode)
remote_host, remote_handshake_port, remote_notify_port = (
parse_moriio_zmq_address(peer_zmq)
)
except ValueError:
# Normalize remote_hosts: callers may pass a list (per-rank host
# vector from the proxy) or a single host string from older
# proxy versions. Indexing a bare string would silently yield just "1"
# (the first character) for "172.30.0.1" if we just did remote_hosts[0].
remote_hosts_raw = kv_transfer_params.get("remote_hosts") or []
if isinstance(remote_hosts_raw, str):
remote_hosts = [remote_hosts_raw] if remote_hosts_raw else []
else:
remote_hosts = list(remote_hosts_raw)
if not remote_hosts:
raise ValueError(
f"MoRIIO add_new_req: could not resolve peer host/ports "
f"for {request_id!r}; neither request_id parse nor "
f"kv_transfer_params.remote_hosts provided them"
) from None
remote_host = remote_hosts[0]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

If kv_transfer_params.get("remote_hosts") returns a string instead of a list (which can happen depending on the upstream proxy's serialization or configuration), remote_hosts[0] will only extract the first character of the IP address string. This will lead to a connection failure that is difficult to diagnose. It is safer to handle both types explicitly.

Suggested change
remote_host = remote_hosts[0]
remote_host = remote_hosts[0] if isinstance(remote_hosts, list) else remote_hosts

remote_handshake_port = int(MoRIIOConstants.DEFAULT_HANDSHAKE_PORT)
remote_notify_port = int(MoRIIOConstants.DEFAULT_NOTIFY_PORT)

# Cleanup-path requests (the same "aborted before scheduling" branch
# that surfaces a short request_id) can also omit remote_block_ids
# and remote_engine_id. Use .get() defaults so we don't crash the
# same EngineCore the request_id fallback above is meant to keep
# alive — an empty block list is the correct no-op for an aborted
# request.
_req = ReqMeta(
transfer_id=transfer_id,
Comment on lines 390 to 391
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

For requests that are aborted before scheduling (handled in request_finished lines 567-576), kv_transfer_params may not contain remote_block_ids or remote_engine_id. Accessing them with [] in the subsequent lines (381-382) will cause a KeyError, leading to the same EngineCore crash this PR aims to fix. Consider using .get() with appropriate defaults (e.g., [] for block IDs and "" for engine ID) to ensure robustness in the cleanup path.

local_block_ids=local_block_ids,
remote_block_ids=kv_transfer_params["remote_block_ids"],
remote_engine_id=kv_transfer_params["remote_engine_id"],
remote_block_ids=kv_transfer_params.get("remote_block_ids", []),
remote_engine_id=kv_transfer_params.get("remote_engine_id", ""),
remote_host=remote_host,
remote_port=remote_handshake_port,
remote_handshake_port=remote_handshake_port,
Expand Down