From 7d9c9695377af1151bc2beda971cd2dbf3207d9b Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Fri, 28 Nov 2025 04:50:57 -0500 Subject: [PATCH 1/2] [NIXL] Add remote_request_id to kv_transfer_params Include the internal request ID that the prefill instance is expecting the decode instance to send it in the NIXL notification. Right now, we rely on the proxy supplying the ID via X-Request-ID and that prefill and decode will mangle this ID in identical ways. This is obviously quite brittle, and P should be explicit about what ID it expects from D. Relates to #27987 - adding a random prefix to client-provided request IDs. Signed-off-by: Mark McLoughlin --- tests/v1/kv_connector/unit/test_nixl_connector.py | 6 ++++++ tests/v1/kv_connector/unit/utils.py | 1 + .../kv_transfer/kv_connector/v1/nixl_connector.py | 14 ++++++++++++-- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/tests/v1/kv_connector/unit/test_nixl_connector.py b/tests/v1/kv_connector/unit/test_nixl_connector.py index ae4125d54190..6aed9235f0c1 100644 --- a/tests/v1/kv_connector/unit/test_nixl_connector.py +++ b/tests/v1/kv_connector/unit/test_nixl_connector.py @@ -470,6 +470,7 @@ def test_multi_xfer_one_engine( num_xfers + 6, ], "remote_engine_id": FakeNixlConnectorWorker.REMOTE_ENGINE_ID, + "remote_request_id": f"prefill-{request_id}", "remote_host": "localhost", "remote_port": 1234, "remote_tp_size": 1, @@ -536,6 +537,7 @@ def test_async_load_kv( kv_transfer_params={ "remote_block_ids": [4, 5, 6], "remote_engine_id": FakeNixlConnectorWorker.REMOTE_ENGINE_ID, + "remote_request_id": "prefill-id", "remote_host": "localhost", "remote_port": 1234, "remote_tp_size": prefill_tp_size, @@ -591,6 +593,7 @@ def test_concurrent_load_kv( kv_transfer_params={ "remote_block_ids": [4, 5, 6], "remote_engine_id": FakeNixlConnectorWorker.REMOTE_ENGINE_ID, + "remote_request_id": f"prefill-id-{i}", "remote_host": "localhost", "remote_port": 1234, "remote_tp_size": 1, @@ -754,6 +757,7 @@ def test_kv_connector_stats(dist_init): kv_transfer_params={ "remote_block_ids": [4, 5, 6], "remote_engine_id": FakeNixlConnectorWorker.REMOTE_ENGINE_ID, + "remote_request_id": f"prefill-{request_id}", "remote_host": "localhost", "remote_port": 1234, "remote_tp_size": 1, @@ -1470,6 +1474,7 @@ def test_handshake_failure_returns_finished(dist_init): kv_transfer_params={ "remote_block_ids": [4, 5, 6], "remote_engine_id": FakeNixlConnectorWorker.REMOTE_ENGINE_ID, + "remote_request_id": f"prefill-{request_id}", "remote_host": "localhost", "remote_port": 1234, "remote_tp_size": 1, @@ -1519,6 +1524,7 @@ def test_transfer_setup_failure_returns_finished(dist_init): kv_transfer_params={ "remote_block_ids": [10, 11, 12], "remote_engine_id": FakeNixlConnectorWorker.REMOTE_ENGINE_ID, + "remote_request_id": f"prefill-{request_id}", "remote_host": "localhost", "remote_port": 1234, "remote_tp_size": 1, diff --git a/tests/v1/kv_connector/unit/utils.py b/tests/v1/kv_connector/unit/utils.py index cea41c3ab18a..58f1a7282352 100644 --- a/tests/v1/kv_connector/unit/utils.py +++ b/tests/v1/kv_connector/unit/utils.py @@ -194,6 +194,7 @@ def create_request( do_remote_prefill=True, do_remote_decode=False, remote_engine_id="my-engine-id", + remote_request_id=f"prefill-{request_id}", remote_block_ids=list(range(num_remote_blocks)), remote_host="my-host", remote_port=1234, diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 49330abcec32..fd162593f2d5 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -210,6 +210,7 @@ class ReqMeta: remote_host: str remote_port: int remote_engine_id: str + remote_request_id: str tp_size: int @@ -236,6 +237,7 @@ def add_new_req( local_physical_block_ids=local_block_ids, remote_block_ids=kv_transfer_params["remote_block_ids"], remote_engine_id=kv_transfer_params["remote_engine_id"], + remote_request_id=kv_transfer_params["remote_request_id"], remote_host=kv_transfer_params["remote_host"], remote_port=kv_transfer_params["remote_port"], # P workers don't need to receive tp_size from proxy here. @@ -622,7 +624,12 @@ def update_state_after_alloc( if params.get("remote_block_ids"): if all( p in params - for p in ("remote_engine_id", "remote_host", "remote_port") + for p in ( + "remote_engine_id", + "remote_request_id", + "remote_host", + "remote_port", + ) ): # If remote_blocks and num_external_tokens = 0, we have # a full prefix cache hit on the D worker. We need to call @@ -751,6 +758,7 @@ def request_finished( do_remote_decode=False, remote_block_ids=block_ids, remote_engine_id=self.engine_id, + remote_request_id=request.request_id, remote_host=self.side_channel_host, remote_port=self.side_channel_port, tp_size=self.vllm_config.parallel_config.tensor_parallel_size, @@ -1970,6 +1978,7 @@ def _read_blocks_for_req(self, req_id: str, meta: ReqMeta): self._read_blocks( request_id=req_id, dst_engine_id=meta.remote_engine_id, + remote_request_id=meta.remote_request_id, local_block_ids=meta.local_physical_block_ids, remote_block_ids=meta.remote_block_ids, ) @@ -1980,6 +1989,7 @@ def _read_blocks( remote_block_ids: list[int], dst_engine_id: str, request_id: str, + remote_request_id: str, ): block_size_ratio = self.kv_topo.block_size_ratio_from_engine_id(dst_engine_id) if block_size_ratio > 1: @@ -2012,7 +2022,7 @@ def _read_blocks( # Number of D TP workers that will read from dst P. Propagate tp_ratio # on notification so that dst worker can wait before freeing blocks. tp_ratio = self.kv_topo.tp_ratio_from_engine_id(dst_engine_id) - notif_id = f"{request_id}:{tp_ratio}".encode() + notif_id = f"{remote_request_id}:{tp_ratio}".encode() # Full prefix cache hit: do not need to read remote blocks, # just notify P worker that we have the blocks we need. From 442a5077571be69d5c79487c1995523a61033e0a Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Fri, 5 Dec 2025 10:20:43 -0500 Subject: [PATCH 2/2] [NIXL] Bump NIXL_CONNECTOR_VERSION Signed-off-by: Mark McLoughlin --- vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index fd162593f2d5..5bcdfd55e3d2 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -71,8 +71,9 @@ # # Version History: # 1: Initial version with compatibility checking +# 2: Add remote_request_id to kv_transfer_params # -NIXL_CONNECTOR_VERSION: int = 1 +NIXL_CONNECTOR_VERSION: int = 2 GET_META_MSG = b"get_meta_msg"