Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
8655ca0
add mori transfer engine
inkcherry Mar 17, 2026
8171308
format
Mar 17, 2026
ed7bde1
update
inkcherry Mar 17, 2026
075ee48
Merge branch 'main' into moriio
inkcherry Mar 24, 2026
e65275a
fix(mori): fix MoriTransferEngineConnector for vllm-omni 0.18.0 intra…
junkang1991 Mar 31, 2026
02d9b69
Merge remote-tracking branch 'upstream/main' into mori_0408
inkcherry Apr 8, 2026
3515f2d
format
inkcherry Apr 8, 2026
375a1e4
remove unused config
inkcherry Apr 10, 2026
7d3d4bc
update doc
inkcherry Apr 10, 2026
58a74bc
update doc
inkcherry Apr 10, 2026
221c187
Merge branch 'main' of https://github.com/vllm-project/vllm-omni into…
inkcherry Apr 10, 2026
b538a3a
update
inkcherry Apr 10, 2026
90dded5
Merge branch 'main' into moriio
inkcherry Apr 10, 2026
b2331a1
Merge upstream/main into moriio-fix
knitcapcat Apr 20, 2026
87f0518
Merge pull request #2 from knitcapcat-amd/moriio-fix
inkcherry Apr 20, 2026
9ad895b
Merge branch 'main' into moriio
inkcherry Apr 20, 2026
a4c2083
migrate mori intranode config to deploy schema
knitcapcat Apr 28, 2026
dab541a
[bugfix] fix the hard-coded backend type
knitcapcat Apr 28, 2026
9cc7672
revert changes on public files- like 'engine'
knitcapcat Apr 28, 2026
c749f4d
add qwen3_omni_moe mori schema
knitcapcat Apr 29, 2026
7d7aeb0
fix(mori): read stage_id from config like other connectors
knitcapcat Apr 29, 2026
e7e5e76
chore(mori): remove non-functional qwen2_5_omni_mori_intranode deploy…
knitcapcat Apr 29, 2026
6f3ecf4
feat(connectors): add role="dual" to Mori and Mooncake connectors
knitcapcat Apr 29, 2026
dcf836c
feat(framework): per-stage role inference and chunk-path endpoint inj…
knitcapcat Apr 29, 2026
b40f323
fix(mooncake): read stage_id from config on chunk path
knitcapcat Apr 29, 2026
3d6e668
add qwen3_omni_moe mooncake intranode deploy yaml
knitcapcat Apr 29, 2026
aa1a563
docs(mooncake): document xgmi vs rdma protocol choice in deploy yaml
knitcapcat Apr 29, 2026
750d137
Merge pull request #4 from knitcapcat-amd/moriio-stage-cfg-fix
knitcapcat-amd Apr 29, 2026
c7c7c1f
style: apply ruff format to fix pre-commit CI
knitcapcat May 5, 2026
92a5ff1
fix(mori): address Codex review (length, ZMQ fail-fast, TTL TODO)
knitcapcat May 5, 2026
2fa4220
Merge branch 'main' into moriio
knitcapcat-amd May 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/design/feature/disaggregated_inference.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Backend-specific setup lives in separate docs:
- [SharedMemoryConnector](omni_connectors/shared_memory_connector.md)
- [MooncakeStoreConnector](omni_connectors/mooncake_store_connector.md)
- [MooncakeTransferEngineConnector](omni_connectors/mooncake_transfer_engine_connector.md)
- [MoriTransferEngineConnector](omni_connectors/mori_transfer_engine_connector.md)
- [YuanrongConnector](omni_connectors/yuanrong_connector.md)

## Overview
Expand All @@ -22,6 +23,7 @@ Current connectors operate in D2H2D (device to host to device) mode.
| Single node | SharedMemoryConnector | Auto-configured if no connector is specified. |
| Multi node (Mooncake Store) | MooncakeStoreConnector | TCP-based, requires Mooncake Master + metadata server. |
| Multi node (Mooncake RDMA) | MooncakeTransferEngineConnector | RDMA/TCP direct transfer with managed memory pool. Fastest. |
| Multi node (Mori RDMA) | MoriTransferEngineConnector | RDMA direct transfer via Mori IOEngine. |
| Multi node (Yuanrong) | YuanrongConnector | Requires Yuanrong Datasystem + etcd. |

## Core API
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# MoriTransferEngineConnector

## When to Use

Currently supports intra-node deployment with Mori.

As noted in #1742, inter-node support will be added back in a future
refactor.

## Mechanism

Uses Mori's `IOEngine` / `MemoryDesc` API for zero-copy RDMA transfers.

- Data Plane: RDMA (InfiniBand/RoCE) with managed memory pool.
- Control Plane: ZMQ for pull-request handshake and async completion.

## Installation

See the [Mori repository](https://github.com/ROCm/mori) for installation instructions.

## Configuration

Mori is configured through the new deploy-config schema (see
[`docs/configuration/stage_configs.md`](../../../configuration/stage_configs.md)).
Define the connector at the top level of the deploy YAML and reference it
by name from each stage's `input_connectors` / `output_connectors`:

```yaml
connectors:
mori_connector:
name: MoriTransferEngineConnector
extra:
host: "auto"
zmq_port: 50051
device_name: ""
memory_pool_size: 536870912
memory_pool_device: "cuda"

stages:
- stage_id: 0
output_connectors:
to_stage_1: mori_connector

- stage_id: 1
input_connectors:
from_stage_0: mori_connector
```

A ready-to-run intra-node example for Qwen3-Omni-MoE on AMD MI300X lives
at
[`vllm_omni/deploy/qwen3_omni_moe_mori_intranode.yaml`](../../../../vllm_omni/deploy/qwen3_omni_moe_mori_intranode.yaml)
and can be loaded with:

```bash
vllm-omni serve Qwen/Qwen3-Omni-30B-A3B-Instruct --omni --log-stats \
--deploy-config vllm_omni/deploy/qwen3_omni_moe_mori_intranode.yaml
```

The yaml wires `MoriTransferEngineConnector` (with `backend_type: xgmi`) to
the chunk_transfer_adapter path (`async_chunk: true`) so stage-to-stage
hidden-state and codec-frame streams ship GPU-to-GPU over AMD Infinity
Fabric instead of SHM. Qwen2.5-Omni + Mori is not yet functional on
the chunk path: it needs `thinker2talker_async_chunk` /
`talker2code2wav_async_chunk` input processors that do not exist yet
(the orchestrator-level path the upstream PR originally targeted was
lost during the entrypoints → engine refactor and removed in #1742).

Parameters:

- host: local RDMA IP (`"auto"` for auto-detect).
- zmq_port: ZMQ base port for control-plane communication.
- device_name: RDMA device (e.g., `"mlx5_0"`), empty for auto-detect.
- memory_pool_size: RDMA memory pool size in bytes.
- memory_pool_device: `"cpu"` (pinned) or `"cuda"` (GPUDirect / XGMI RDMA).

For more details, refer to the
[Mori repository](https://github.com/ROCm/mori).
201 changes: 200 additions & 1 deletion tests/distributed/omni_connectors/test_omni_connector_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
import pytest

# Use the new import path for initialization utilities
from vllm_omni.distributed.omni_connectors.utils.initialization import load_omni_transfer_config
from vllm_omni.distributed.omni_connectors.utils.config import ConnectorSpec, OmniTransferConfig
from vllm_omni.distributed.omni_connectors.utils.initialization import (
_inject_chunk_path_endpoints,
get_connectors_config_for_stage,
load_omni_transfer_config,
)

pytestmark = [pytest.mark.core_model, pytest.mark.cpu]

Expand Down Expand Up @@ -64,3 +69,197 @@ def test_load_qwen_yaml_configs(yaml_file):

except Exception as e:
pytest.fail(f"Failed to load config {yaml_file.name}: {e}")


# ---------------------------------------------------------------------------
# Framework-level per-stage role + endpoint derivation for the chunk
# transfer adapter path.
#
# ``get_connectors_config_for_stage`` is responsible for turning a
# role-neutral edge-level ConnectorSpec into a per-stage view where each
# stage carries:
#
# * ``role=sender`` if the stage only has outgoing edges;
# * ``role=receiver`` if the stage only has incoming edges;
# * ``role=dual`` if the stage has both (middle stage in a 3+ stage
# pipeline; e.g. the Qwen3-Omni-MoE talker stage). Dual stages
# emit ``from_stage_*`` and ``to_stage_*`` entries that share the
# same composite extra so downstream flattening (engine-side
# ``get_stage_connector_spec`` returning the first spec) always
# recovers a self-consistent config.
#
# For role-bound ZMQ connectors (Mori / Mooncake) the function also
# pre-computes ``zmq_port`` / ``sender_host`` / ``sender_zmq_port`` so
# an intranode pipeline can come up without an external handshake.
#
# The orchestrator-level path (``create_connectors_from_config``) has
# its own Mooncake-specific port adjustment and is intentionally NOT
# exercised here.
# ---------------------------------------------------------------------------


def _linear_pipeline_config(
connector_name: str,
extra: dict | None = None,
edges: tuple[tuple[str, str], ...] = (("0", "1"), ("1", "2")),
) -> OmniTransferConfig:
shared_extra = dict(extra or {})
specs = {edge: ConnectorSpec(name=connector_name, extra=dict(shared_extra)) for edge in edges}
return OmniTransferConfig(connectors=specs)


@pytest.fixture
def _stable_local_ip(monkeypatch: pytest.MonkeyPatch) -> str:
"""Pin framework-level local-IP detection so endpoint assertions are deterministic."""
import vllm_omni.distributed.omni_connectors.utils.initialization as init_mod

monkeypatch.setattr(init_mod, "_detect_local_ip", lambda: "10.20.30.40")
return "10.20.30.40"


@pytest.mark.parametrize(
"connector_name",
["MoriTransferEngineConnector", "MooncakeTransferEngineConnector"],
)
def test_stage_0_is_sender_only(connector_name, _stable_local_ip):
"""Stage 0 has only outgoing edges → role=sender, listener port = base + 0."""
cfg = _linear_pipeline_config(connector_name, extra={"zmq_port": 50051, "host": "auto"})

stage0 = get_connectors_config_for_stage(cfg, 0)

assert list(stage0.keys()) == ["to_stage_1"], "Sender-only stage should emit only to_stage_*"
extra = stage0["to_stage_1"]["spec"]["extra"]
assert extra["role"] == "sender"
assert extra["zmq_port"] == 50051
assert "sender_host" not in extra
assert "sender_zmq_port" not in extra


@pytest.mark.parametrize(
"connector_name",
["MoriTransferEngineConnector", "MooncakeTransferEngineConnector"],
)
def test_final_stage_is_receiver_only(connector_name, _stable_local_ip):
"""Stage 2 has only incoming edges → role=receiver, points at upstream sender."""
cfg = _linear_pipeline_config(connector_name, extra={"zmq_port": 50051, "host": "auto"})

stage2 = get_connectors_config_for_stage(cfg, 2)

assert list(stage2.keys()) == ["from_stage_1"], "Receiver-only stage should emit only from_stage_*"
extra = stage2["from_stage_1"]["spec"]["extra"]
assert extra["role"] == "receiver"
assert extra["sender_zmq_port"] == 50052 # base + upstream stage id (1)
assert extra["sender_host"] == "10.20.30.40"


@pytest.mark.parametrize(
"connector_name",
["MoriTransferEngineConnector", "MooncakeTransferEngineConnector"],
)
def test_middle_stage_is_dual(connector_name, _stable_local_ip):
"""Middle stage has both → role=dual, both entries share composite spec."""
cfg = _linear_pipeline_config(connector_name, extra={"zmq_port": 50051, "host": "auto"})

stage1 = get_connectors_config_for_stage(cfg, 1)

# Both directions exposed, so whichever one get_stage_connector_spec
# (which does "return first") picks, it recovers the same dual spec.
assert set(stage1.keys()) == {"from_stage_0", "to_stage_2"}
incoming = stage1["from_stage_0"]["spec"]["extra"]
outgoing = stage1["to_stage_2"]["spec"]["extra"]

for extra in (incoming, outgoing):
assert extra["role"] == "dual"
assert extra["zmq_port"] == 50052 # this stage's listener, base + own stage id (1)
assert extra["sender_host"] == "10.20.30.40"
assert extra["sender_zmq_port"] == 50051 # upstream sender at base + 0

# Composite must be identical so order of iteration does not matter.
assert incoming == outgoing


def test_shm_connector_is_untouched_by_endpoint_injection(_stable_local_ip):
"""SharedMemoryConnector is not role-bound -> no port/host fields appear."""
cfg = _linear_pipeline_config(
"SharedMemoryConnector",
extra={"shm_threshold_bytes": 65536},
)

for sid in (0, 1, 2):
stage_cfg = get_connectors_config_for_stage(cfg, sid)
for entry in stage_cfg.values():
extra = entry["spec"]["extra"]
assert "zmq_port" not in extra
assert "sender_host" not in extra
assert "sender_zmq_port" not in extra
assert extra["shm_threshold_bytes"] == 65536
# Role is still injected (passive connectors ignore it, which
# is fine -- the string is just metadata to them).
assert extra["role"] in {"sender", "receiver", "dual"}


def test_explicit_sender_host_and_port_override_win(_stable_local_ip):
"""User-provided ``sender_host`` / ``sender_zmq_port`` beat framework derivation."""
cfg = _linear_pipeline_config(
"MoriTransferEngineConnector",
extra={
"zmq_port": 50051,
"host": "auto",
"sender_host": "192.168.1.10",
"sender_zmq_port": 60000,
},
)

stage1 = get_connectors_config_for_stage(cfg, 1)
recv_extra = stage1["from_stage_0"]["spec"]["extra"]
assert recv_extra["sender_host"] == "192.168.1.10"
assert recv_extra["sender_zmq_port"] == 60000

# Sender-side zmq_port still offsets so co-located stage listeners do
# not collide; the override is the upstream peer's address, not this
# stage's own bind port.
assert recv_extra["zmq_port"] == 50052


def test_explicit_non_auto_host_cascades_to_sender_host(_stable_local_ip):
"""Non-auto ``host`` is reused as ``sender_host`` for the receiver side."""
cfg = _linear_pipeline_config(
"MoriTransferEngineConnector",
extra={"zmq_port": 50051, "host": "172.16.0.5"},
)

stage1 = get_connectors_config_for_stage(cfg, 1)
recv_extra = stage1["from_stage_0"]["spec"]["extra"]
assert recv_extra["sender_host"] == "172.16.0.5"
assert recv_extra["sender_zmq_port"] == 50051


def test_inject_helper_is_noop_for_unknown_connector(_stable_local_ip):
"""Non-role-bound connectors are left untouched by the helper."""
extra: dict = {"zmq_port": 50051, "host": "auto"}
_inject_chunk_path_endpoints(
extra,
connector_name="SomeFutureConnector",
role="dual",
own_stage="1",
upstream_stage="0",
)
assert extra == {"zmq_port": 50051, "host": "auto"}


def test_inject_helper_is_noop_for_non_integer_stage(_stable_local_ip):
"""Non-integer pipeline keys (e.g. ``"prefill"``) short-circuit safely."""
extra: dict = {"zmq_port": 50051, "host": "auto"}
_inject_chunk_path_endpoints(
extra,
connector_name="MoriTransferEngineConnector",
role="sender",
own_stage="prefill",
upstream_stage=None,
)
assert extra == {"zmq_port": 50051, "host": "auto"}


def test_get_connectors_config_for_stage_none_transfer_config():
"""Callers pass ``None`` when no yaml was loaded; return an empty dict."""
assert get_connectors_config_for_stage(None, 0) == {}
Loading
Loading