add moriio transfer engine #1742
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 30142a6a0a
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| [[src_offset]], | ||
| [remote_mem], | ||
| [[pull.dst_offset]], | ||
| [[pull.length]], |
Use source buffer size for RDMA write length
The sender already reads src_size from its own buffer table, but the transfer length is taken from pull.length (receiver-provided metadata). If that metadata is stale or malformed, the sender can truncate a valid payload (when pull.length < src_size) or attempt to read beyond the stored buffer (when pull.length > src_size) while still treating the operation as a normal transfer path. The write length should be validated against src_size (or derived solely from src_size) before calling batch_write.
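The check this comment asks for could look roughly like the sketch below. It is only an illustration: `checked_write_length` is a hypothetical helper, not a function in this PR, and it assumes the sender wants to reject any mismatch rather than clamp the length.

```python
def checked_write_length(src_size: int, requested_length: int) -> int:
    """Return a write length that is safe for a source buffer of src_size bytes.

    Hypothetical helper: requested_length stands in for the receiver-provided
    pull.length, src_size for the size the sender recorded for its own buffer.
    """
    if requested_length < 0:
        raise ValueError(f"negative transfer length: {requested_length}")
    if requested_length != src_size:
        # A shorter length would silently truncate the payload; a longer one
        # would read past the end of the stored buffer.
        raise ValueError(
            f"length mismatch: receiver asked for {requested_length} bytes, "
            f"source buffer holds {src_size} bytes"
        )
    return src_size
```

The sender would call this before `batch_write` and treat a `ValueError` as a failed pull instead of a normal transfer.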
Reviewed commit: ed7bde1f46
| if not bound: | ||
| try: | ||
| sock.bind(f"tcp://{self.host}:*") | ||
| bound = True | ||
| except zmq.ZMQError as exc: |
Fail fast instead of rebinding sender ZMQ to random port
When bind on self.zmq_port fails with EADDRINUSE, this code silently rebinds to an OS-assigned port, but the metadata-less receive path still resolves the sender using configured sender_host/sender_zmq_port (see get() -> _query_metadata_from_sender(), and call sites like kv_transfer_manager.receive_kv_cache_for_request and chunk_transfer_adapter._poll_single_request that call connector.get(..., metadata=None)). In that context, receivers keep querying the old configured port and time out indefinitely even though the sender is running, so this should fail fast or explicitly propagate the new port to receivers.
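A fail-fast bind can be sketched as follows. To keep the example self-contained it uses a plain TCP socket from the standard library as a stand-in for the ZMQ socket in the PR; `bind_or_fail` is a hypothetical name, and the real connector would apply the same pattern around `sock.bind(...)` on its ZMQ socket.

```python
import errno
import socket


def bind_or_fail(host: str, port: int) -> socket.socket:
    """Bind to the configured port, or raise instead of picking another one.

    Stand-in sketch: a plain TCP socket replaces the connector's ZMQ socket.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind((host, port))
    except OSError as exc:
        sock.close()
        if exc.errno == errno.EADDRINUSE:
            # Receivers resolve the sender through the configured port, so a
            # silent rebind elsewhere would leave them querying a dead endpoint.
            raise RuntimeError(
                f"port {port} on {host} is already in use; "
                "refusing to fall back to a random port"
            ) from exc
        raise
    return sock
```

Raising here surfaces the misconfiguration at startup rather than as receiver-side timeouts later.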
| stale = [k for k, v in self._local_buffers.items() if now - v[5] > _BUFFER_TTL_SECONDS] | ||
| for k in stale: |
Exclude in-flight buffers from TTL reclamation
TTL cleanup reclaims entries purely by age, but _handle_pull_request() only reads _local_buffers under lock and then performs batch_write(...); st.Wait() without marking that entry in-flight; if a receiver pulls a buffer near/after the 300s TTL, the listener thread can reclaim and release that allocator region concurrently while the RDMA write is still pending. This creates a race that can corrupt transfers or cause intermittent failures under delayed pulls, so stale-buffer GC needs an in-flight guard (or timestamp refresh) before transfer starts.
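One possible shape for the in-flight guard is sketched below. The class and method names are hypothetical and the real `_local_buffers` entries carry more fields; the only point is that the TTL sweep and the transfer path agree, under the same lock, on which keys are busy.

```python
import threading


class BufferTable:
    """Toy buffer registry with TTL reclamation that skips in-flight entries."""

    def __init__(self, ttl_seconds: float) -> None:
        self._ttl = ttl_seconds
        self._lock = threading.Lock()
        self._created: dict[str, float] = {}   # key -> creation timestamp
        self._in_flight: dict[str, int] = {}   # key -> active transfer count

    def add(self, key: str, now: float) -> None:
        with self._lock:
            self._created[key] = now

    def begin_transfer(self, key: str) -> bool:
        """Mark key as in-flight; return False if it was already reclaimed."""
        with self._lock:
            if key not in self._created:
                return False
            self._in_flight[key] = self._in_flight.get(key, 0) + 1
            return True

    def end_transfer(self, key: str) -> None:
        with self._lock:
            remaining = self._in_flight.get(key, 0) - 1
            if remaining > 0:
                self._in_flight[key] = remaining
            else:
                self._in_flight.pop(key, None)

    def reclaim_stale(self, now: float) -> list[str]:
        """Drop entries older than the TTL unless a transfer is still pending."""
        with self._lock:
            stale = [
                k for k, created in self._created.items()
                if now - created > self._ttl and k not in self._in_flight
            ]
            for k in stale:
                del self._created[k]
            return stale
```

In the pull handler, `begin_transfer` would be called before `batch_write(...)` and `end_transfer` after the wait completes, so the listener thread cannot release a region that an RDMA write is still reading.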
|
thanks @inkcherry for the PR. |
|
@inkcherry Is Mori beneficial for intra-node communication? Will you enable it for intra-node communication? vLLM-Omni also has many use cases for single-node deployment. |
Hi, @tjtanaa, I've added intra-node benchmark results. Mori delivers correct outputs with competitive performance. |
|
Hi @inkcherry, thanks for the PR. We tried to test with
There's no mention of Would you be able to share the reproduction steps you used to verify this? |
|
Hi @junkang1991, thanks for trying it out. During the initialization phase, I can see such a log. The log shows that XGMI is automatically selected as the mori backend. I have added logs wrapping If you still encounter issues, could you try updating Mori to a newer commit? |
…node XGMI Signed-off-by: junkang1991 <junkangchow@gmail.com>
|
@inkcherry @junkang1991 please fix the readthedocs and pre-commit error. |
|
@tjtanaa , thanks for the reminder, fixed, cc @junkang1991 |
|
The refactor in #1908 removes multi-node deployment from the vllm-omni codebase. The current single-node deployment still works. I think we can move forward with the PR at this stage; I've removed the inter-node configuration script for now. Once the vllm-omni codebase supports inter-node deployment, we will add it back after re-testing. |
Signed-off-by: inkcherry <mingzhi.liu@amd.com>
|
|
||
| ## When to Use | ||
|
|
||
| Best for high-performance multi-node data transfer using Mori RDMA transfer engine. |
Can you update the description for now, citing the reason given in #1742 (comment), and mention that intra-node is currently supported?
Signed-off-by: inkcherry <mingzhi.liu@amd.com>
Signed-off-by: inkcherry <mingzhi.liu@amd.com>
Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
Upstream merge conflicts fix
|
@hsliuustc0106 could you please take a look? many thanks! |
| @@ -0,0 +1,138 @@ | |||
| # Qwen2.5-Omni single-node config using Mori XGMI transfer (TP2). | |||
After #2383, we no longer introduce new stage configs. Please follow the new approach.
|
@inkcherry Could you submit a topic about this PR in the sync meeting? If you're at UTC+8 Timezone, you could submit it here: tinyurl.com/vllm-omni-meeting. |
Thanks @gcanlin for the invitation, happy to join! Let me first address the pending review comments. Once the PR is ready, I'll submit the topic via tinyurl.com/vllm-omni-meeting and join an upcoming sync — hopefully next week or soon after. Appreciate it! |
After vllm-project#2383, new per-model stage configs are not introduced. Replace vllm_omni/model_executor/stage_configs/qwen2_5_omni_mori_intranode.yaml with vllm_omni/deploy/qwen2_5_omni_mori_intranode.yaml using the new top-level connectors/stages schema, and update the design doc to use --deploy-config.
Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
Align MoriTransferEngineConnector with SharedMemoryConnector and every other OmniConnector implementation by reading ``stage_id`` from the config dict in __init__. The value is injected by arg_utils.py into the connector config on the chunk_transfer_adapter path and is later read back by OmniChunkTransferAdapter (process_pending_chunks / _send_single_request / _poll_single_request) to decide whether the connector sits at the sender or receiver end of a stage edge.
Without this attribute Mori cannot be used as a chunk-mode connector at all: the first decode step crashes in chunk_transfer_adapter.py with ``AttributeError: 'MoriTransferEngineConnector' object has no attribute 'stage_id'`` the moment the scheduler inspects self.connector.stage_id.
Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
Made-with: Cursor
… yaml
After junkang1991's entrypoints -> engine hook (e65275a) was reverted earlier in this branch (9cc7672), the only live transfer path for MoriTransferEngineConnector is the async_chunk=true chunk_transfer_adapter route. qwen2_5_omni_mori_intranode.yaml has async_chunk=false and Qwen2.5-Omni's pipeline does not yet define thinker2talker_async_chunk / talker2code2wav_async_chunk input processors, so its Mori connector entries are silently ignored by the engine: verified by loading the yaml via load_omni_transfer_config and observing that get_stage_connector_spec returns {} for every stage when async_chunk=false (stage_init_utils.py:407 gate), which means no MoriTransferEngineConnector is ever instantiated and stage-to-stage transfer falls back to the default in-memory set_engine_outputs / process_engine_inputs path.
Shipping a deploy yaml that claims Mori/XGMI but in practice uses none of it is user-confusing, so drop it rather than leave it as a dead pointer. Qwen3-Omni-MoE (qwen3_omni_moe_mori_intranode.yaml, added in c749f4d) is the supported Mori deploy target in this PR and exercises the full chunk_transfer_adapter + Mori path end-to-end.
Also updates the Mori connector design doc to point at the Qwen3-Omni-MoE yaml and documents the Qwen2.5-Omni follow-up prerequisite (writing the async_chunk input processors) inline.
Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
Made-with: Cursor
A middle stage of a chunk_transfer_adapter pipeline (e.g. the talker
stage of Qwen3-Omni-MoE) is simultaneously the receiver of its
incoming edge and the sender of its outgoing edge. The adapter keeps
its historical "one connector per stage" model (same shape that
SharedMemoryConnector has always used -- single instance, put()+get()
from the same object), so middle-stage support for role-bound RDMA
connectors has to come from the connector itself: a single instance
capable of put and get simultaneously.
This commit adds a first-class ``role="dual"`` to both
MoriTransferEngineConnector and MooncakeTransferEngineConnector. A
dual connector:
* binds the ZMQ listener (as a sender would) so a downstream receiver
can pull data from it via the existing MoriPullRequest /
Mooncake ``put(...)``/``trans_done`` handshake;
* simultaneously populates ``sender_host`` / ``sender_zmq_port`` so
that its own ``get()`` can query an upstream sender, exactly as a
receiver-only instance would.
Both fields already existed -- dual-role simply flips ``can_put=True``
while keeping the receiver-side handshake path live. ``self.role`` is
recorded so boot-time log lines can distinguish sender / receiver /
dual ("... DUAL ready (ZMQ listening on ...; upstream sender at ...)").
The contract with the framework is unchanged: connectors still only
read their own role from ``config["role"]`` and never reverse-infer
it from side-channels like ``stage_id``. The framework decides
whether a stage should instantiate sender / receiver / dual based on
topology (see companion commit updating
``get_connectors_config_for_stage``).
Existing sender-only / receiver-only behaviour is bit-for-bit
unchanged; the only accepted-role set widens from {sender, receiver}
to {sender, receiver, dual}.
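The widened role set can be pictured with a small sketch. It is an assumption-laden illustration, not the connector's code: `can_put` is named in the commit message, while `can_get`, `RoleCapabilities`, and `capabilities_for` are hypothetical names introduced here.

```python
from dataclasses import dataclass

VALID_ROLES = frozenset({"sender", "receiver", "dual"})


@dataclass(frozen=True)
class RoleCapabilities:
    can_put: bool  # binds a ZMQ listener so a downstream receiver can pull
    can_get: bool  # holds an upstream sender endpoint to query (hypothetical flag)


def capabilities_for(config: dict) -> RoleCapabilities:
    """Read the role verbatim from config["role"]; never infer it from stage_id."""
    role = config.get("role")
    if role not in VALID_ROLES:
        raise ValueError(f"unsupported role {role!r}; expected one of {sorted(VALID_ROLES)}")
    return RoleCapabilities(
        can_put=role in ("sender", "dual"),
        can_get=role in ("receiver", "dual"),
    )
```

Sender-only and receiver-only configs map to the same capabilities as before; only `"dual"` enables both at once.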
Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
Made-with: Cursor
…ection
Extends ``get_connectors_config_for_stage`` in
``distributed/omni_connectors/utils/initialization.py`` to:
1. Infer per-stage role from pipeline topology:
* stage with only outgoing edges -> ``role="sender"``
* stage with only incoming edges -> ``role="receiver"``
* stage with both incoming and outgoing edges -> ``role="dual"``
(the new role added to Mori / Mooncake in the companion commit)
Dual stages emit matching ``from_stage_*`` and ``to_stage_*``
entries that share the *same* composite extra (identical role,
identical ``zmq_port`` / ``sender_host`` / ``sender_zmq_port``),
so engine-side flattening (``get_stage_connector_spec`` in
``engine/stage_init_utils.py`` returning the first spec it sees --
untouched) always recovers a self-consistent per-stage config.
That lets ``OmniChunkTransferAdapter`` keep its historical
one-connector-per-stage model (same shape SharedMemoryConnector
always used) while still serving middle-stage put+get through a
single role-bound RDMA connector.
2. Inject per-edge ZMQ endpoints for role-bound connectors listed in
the new ``_ROLE_BOUND_ZMQ_CONNECTORS`` frozenset
(``MoriTransferEngineConnector`` and
``MooncakeTransferEngineConnector``):
* ``zmq_port = base + own_stage_id`` (sender / dual): offsetting
the listener by stage id prevents three co-located stage
listeners on a single intranode node from all binding the
same base port.
* ``sender_zmq_port = base + upstream_stage_id`` (receiver / dual):
points at the upstream sender's listener.
* ``sender_host``: framework-detected local IP (via new
``_detect_local_ip``, mirroring the connector-side
``_get_local_ip``) when yaml uses ``host: "auto"``; the explicit
yaml ``host`` value otherwise.
Non-role-bound connectors (SharedMemory, Yuanrong, ...) are not
touched by the endpoint injection. Explicit yaml ``sender_host``
/ ``sender_zmq_port`` always win, so cross-node deployments that
know their peer endpoint are unaffected.
3. Orchestrator-level ``create_connectors_from_config`` is untouched:
it has its own Mooncake-specific port adjustment for KV transfer
+ PD disaggregation, and its callers
(``build_stage_connectors`` etc.) only filter ``from_stage_*``
keys, so orchestrator instantiation behaviour is bit-for-bit
unchanged.
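The role inference and port derivation described in items 1 and 2 can be sketched as below. This is a simplified model, not `get_connectors_config_for_stage` itself: `plan_stage` and `infer_role` are hypothetical helpers, edges are plain `(src, dst)` stage-id pairs, each stage is assumed to have at most one upstream stage, and `sender_host` detection is left out.

```python
from typing import Dict, List, Optional, Tuple


def infer_role(has_incoming: bool, has_outgoing: bool) -> str:
    """Map a stage's position in the pipeline topology to a connector role."""
    if has_incoming and has_outgoing:
        return "dual"
    if has_outgoing:
        return "sender"
    if has_incoming:
        return "receiver"
    raise ValueError("stage has no edges")


def plan_stage(
    stage_id: int,
    edges: List[Tuple[int, int]],
    base_port: int,
    explicit: Optional[Dict[str, object]] = None,
) -> Dict[str, object]:
    """Build the connector extra for one stage (assumes a single upstream stage)."""
    upstream = [src for src, dst in edges if dst == stage_id]
    downstream = [dst for src, dst in edges if src == stage_id]
    role = infer_role(bool(upstream), bool(downstream))
    extra: Dict[str, object] = {"role": role}
    if role in ("sender", "dual"):
        # Offset the listener by stage id so co-located stages do not collide.
        extra["zmq_port"] = base_port + stage_id
    if role in ("receiver", "dual"):
        # Point at the upstream sender's listener.
        extra["sender_zmq_port"] = base_port + upstream[0]
    # Explicit yaml values always win over derived ones.
    extra.update(explicit or {})
    return extra
```

For a three-stage pipeline `[(0, 1), (1, 2)]`, stage 0 comes out as a sender, stage 1 as dual, and stage 2 as a receiver, each with ports offset from the same base.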
Design rationale
An earlier iteration tried to own the endpoint derivation inside
``MoriTransferEngineConnector.__init__`` using a
``stage_id >= 0`` side-channel, and later tried to fix the middle-
stage put+get problem by splitting each stage into two connector
instances (``sender_connector`` + ``receiver_connector``) with a
corresponding ``{name, extra, input, output}`` schema that had to
travel through ``engine/stage_init_utils.get_stage_connector_spec``
and ``engine/arg_utils.OmniEngineArgs.create_model_config``. Both
approaches imposed ripple-effect changes outside the
``distributed/omni_connectors`` tree -- either the connector had to
reverse-infer its deployment mode, or public engine files had to
learn a new connector-config schema. Moving role inference and
endpoint computation to the framework's topology layer avoids both:
connectors stay mechanical and read their role verbatim from
config, and engine/ needs zero changes -- the schema
``get_stage_connector_spec`` returns is exactly the same ``{name,
extra}`` it returned before this PR, with the ``extra`` content
pre-populated.
Tests (``test_omni_connector_configs.py``)
Twelve new parametrized-over-{Mori,Mooncake} cases:
* stage-0 sender-only: only ``to_stage_*``, role=sender,
zmq_port = base + 0;
* final-stage receiver-only: only ``from_stage_*``, role=receiver,
sender endpoints resolved;
* middle-stage dual: both directions present, both entries share
identical composite extra with role=dual, both own-stage
listener port and upstream sender endpoint populated;
* SharedMemoryConnector is not touched by endpoint injection;
* explicit yaml ``sender_host`` / ``sender_zmq_port`` override the
derivation;
* non-auto ``host`` cascades into ``sender_host``;
* ``_inject_chunk_path_endpoints`` is a no-op for unknown
connector types and non-integer stage ids.
Full 301-test suite (``tests/engine/``,
``tests/distributed/omni_connectors/``, ``tests/core/sched/``,
``tests/config/``) passes, including all four Mooncake orchestrator
path tests that exercise ``create_connectors_from_config``.
Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
Made-with: Cursor
Parallel to 7d7aeb0 for MoriTransferEngineConnector.
OmniChunkTransferAdapter.process_pending_chunks and the AR / generation schedulers branch on ``self.connector.stage_id == 0`` to gate "is this stage 0 of the pipeline". SharedMemoryConnector and MoriTransferEngineConnector both already expose ``self.stage_id``; MooncakeTransferEngineConnector did not, so deploying Mooncake on the chunk_transfer_adapter path (e.g. ``qwen3_omni_moe_mooncake_intranode.yaml``) aborted the scheduler with an AttributeError on the first schedule() tick.
The ``stage_id`` key is already injected into connector extras by ``OmniEngineArgs.create_model_config`` (via ``get_connectors_config_for_stage``); Mooncake now reads it like its siblings.
Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
Made-with: Cursor
Sibling of qwen3_omni_moe_mori_intranode.yaml for Mori-vs-Mooncake benchmark comparison on a single AMD Instinct MI300X node. Pipeline topology, sampling params, and chunk-transfer knobs are kept identical to the Mori yaml; only the connector definition changes.
Verified end-to-end on the chunk_transfer_adapter path with the dual-role Mooncake connector from commit 6f3ecf4: stage boot log shows Stage 0 SENDER / Stage 1 DUAL / Stage 2 RECEIVER identically to the Mori topology, and three consecutive text-mode chat completions through the full thinker -> talker -> code2wav pipeline produce valid audio in 18.7 / 16.8 / 16.4 s warm-state with 93 RDMA GETs across three runs (per-chunk p50 = 0.7 ms on edge 0 -> 1, 0.9 ms on edge 1 -> 2; peak 398 MB/s on the thinker hidden-state edge).
Transport note: MooncakeTransferEngineConnector currently forwards ``protocol`` to mooncake.engine.TransferEngine.initialize, and the mooncake build shipped in the current container image still installs ``type=rdma`` transport regardless of the string passed. Intranode MI300X transfers therefore flow over Mellanox RoCE/IB with memory_pool_device: cuda + GPUDirect RDMA rather than true XGMI Infinity Fabric -- whenever Mooncake lands a first-class XGMI transport this yaml just has to flip ``protocol`` to ``"xgmi"``.
Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
Made-with: Cursor
Clarifies how to opt this deploy yaml into true AMD Infinity Fabric XGMI via the HIP transport landed in Mooncake PRs vllm-project#1742 and vllm-project#1550, and pins the default value to ``protocol: "rdma"``, the path that is actually end-to-end validated on this branch (three consecutive text completions at 18.7 / 16.8 / 16.4 s with 93 per-chunk RDMA GETs).
XGMI opt-in requires three things in lock step and is therefore not the default:
1. A mooncake wheel rebuilt with ``-DUSE_HIP=ON`` and reinstalled into the container's Python env (stock wheels don't ship HIP transport).
2. ``memory_pool_device: "cuda"`` so Mooncake picks the HIP allocator.
3. ``MC_FORCE_MNNVL=1`` in the launch environment so ``TransferEngineImpl::init()`` installs the HIP transport instead of defaulting back to RDMA whenever the node has HCAs (the current auto-topology path treats RDMA as the preferred fabric, even when HIP is compiled).
Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
Made-with: Cursor
remove changes on public files
Pure cosmetic reformatting requested by ruff-format:
- mooncake_transfer_engine_connector.py: collapse a 2-line f-string into one line.
- mori_transfer_engine_connector.py: same.
- utils/initialization.py: drop redundant indentation in the `_ROLE_BOUND_ZMQ_CONNECTORS` frozenset literal and collapse two list comprehensions in `get_connectors_config_for_stage` to a single line.
Verified locally with `pre-commit run --all-files` (all hooks pass).
Signed-off-by: Zejian Wang <zejianwang@sjtu.edu.cn>
- P1 length: validate pull.length against src_size before batch_write (prevents silent truncation / cross-allocation OOB read).
- P2 port: drop EADDRINUSE silent fallback; match Mooncake's fail-fast.
- P2 TTL: TODO mirroring wzliu's note; same race deferred to follow-up.
Co-authored-by: Cursor <cursoragent@cursor.com>
Hi @gcanlin, we plan to share this PR this Wednesday. Does this meeting require any registration before the sharing session? |

Purpose
Add MoriTransferEngineConnector, a new OmniConnector backend using the Mori RDMA transfer engine for zero-copy data transfers between disaggregated pipeline stages. The implementation follows a similar architecture to MooncakeTransferEngineConnector, adapted to Mori's IOEngine / MemoryDesc / EngineDesc API.
- IOEngine.batch_write() with async TransferStatus tracking
- torch.Tensor / bytes fast path bypassing serialization
- mori is not installed
Changes include the connector implementation, factory registration, module exports, design doc, and example stage config YAML.
Test Plan
Hardware: 3x AMD Instinct MI300X nodes (8 GPUs each), Mellanox ConnectX-7 400Gbps RoCE NIC (mlx5_0).
3-node disaggregated serving with Qwen2.5-Omni-7B, each stage on a separate node with TP=2:
Benchmark client:
Test Result
0→1 (Thinker → Talker)
1→2 (Talker → Code2Wav)
0→1 (Thinker → Talker)(intranode communication)
1→2 (Talker → Code2Wav)(intranode communication)
Mori shows lower in-flight latency compared to Mooncake, with comparable tx/rx performance. The correctness of the generated audio output was also manually verified by listening to the results. It provides a viable RDMA backend option for AMD devices.