[FEAT] support multi-stage deployment #2396
Conversation
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 660e3c64d8
```python
if 0 <= lid < self.num_logical_stages:
    stage_ids.extend(self.logical_stage_to_clients[lid])
else:
    stage_ids.append(lid)  # keep invalid id for error reporting
Reject invalid logical stage IDs before RPC expansion
The API comment here states that stage_ids are logical stage IDs, but the fallback path appends an out-of-range logical ID directly into the client target list. In multi-replica deployments (num_clients > num_logical_stages), an invalid logical ID (for example lid=2 when only logical stages 0-1 exist) can still pass the later stage_id < self.num_clients check and execute the RPC on an unintended client, so control operations like sleep/wake_up/profile may affect the wrong stage instead of returning an invalid-stage error.
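A minimal sketch of such a guard, reusing the names from the diff above (`num_logical_stages`, `logical_stage_to_clients`); the helper name itself is hypothetical:

```python
def _expand_logical_stage_ids(self, stage_ids: list[int]) -> list[int]:
    """Map logical stage IDs to client indices, rejecting out-of-range
    IDs up front instead of letting them leak into the RPC target list."""
    client_ids: list[int] = []
    for lid in stage_ids:
        if not (0 <= lid < self.num_logical_stages):
            raise ValueError(
                f"Invalid logical stage id {lid}; "
                f"valid range is [0, {self.num_logical_stages})."
            )
        client_ids.extend(self.logical_stage_to_clients[lid])
    return client_ids
```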
Thanks for the PR! This feature is very critical.
cc list: @fake0fan @chickeyton @wuhang2014 @Gaohan123 @tzhouam
Thanks for the feedback! The suggestions on DP naming, routing separation, and aligning with vLLM DP design all make sense; we're thinking along similar lines.
Yeah great! We can have a sync for how to collaborate then!
```python
self.stage_clients = flat_clients
self.output_processors = flat_output_processors
self.stage_vllm_configs = flat_vllm_configs
self.logical_stage_to_clients = logical_stage_to_clients
```
Here I suggest adding a lightweight StageEnginePool / StagePool class that serves as a list of EngineCoreClient instances from the same stage. The output processors could also be placed in the StagePool.
After init, the AsyncOmniEngine would hold:
```python
self.stage_pool_list = [pool1, pool2]  # List[StagePool]
```
The routing logic could be put inside the stage pool. For example, we could call something like `self.stage_pool_list[stage_id].select` to choose a logical engine core client.
vllm_configs could also be placed inside the StagePool
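A minimal sketch of this idea, with the vllm_configs folded in as suggested; everything here is illustrative rather than the final API:

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class StagePool:
    """Holds all replicas (engine core clients) of one logical stage,
    together with their output processors and vLLM configs."""

    clients: list[Any]            # EngineCoreClient replicas of this stage
    output_processors: list[Any]  # one output processor per replica
    vllm_configs: list[Any]       # per-replica vLLM configs
    _next: int = field(default=0, repr=False)

    def select(self) -> int:
        """Choose a replica index via simple round-robin."""
        idx = self._next % len(self.clients)
        self._next += 1
        return idx
```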
```python
all_output_processors[stage_id] = stage_output_procs
all_vllm_configs[stage_id] = stage_vllm_cfgs
# Use first replica for finalize_initialized_stages
logical_stage_clients_for_finalize[stage_id] = stage_clients_list[0]
```
The init function becomes more complex. We could launch the stages first and then register them with the StagePool.
```diff
 if any(getattr(stage_client, "is_comprehension", False) for stage_client in flat_clients):
     supported_tasks.add("generate")
-if any(metadata.get("final_output_type") == "audio" for metadata in stage_metadata):
+if any(metadata.get("final_output_type") == "audio" for metadata in logical_stage_metadata):
```
Wrap them into the stage pool?
```python
for logical_id, client_indices in enumerate(self.logical_stage_to_clients):
    for ri, ci in enumerate(client_indices):
        self._client_to_logical[ci] = logical_id
        self._client_to_replica[ci] = ri
```
If we pass List[StagePool] in here, do we still need these mappings?
```python
# Multi-replica: maps logical_stage_id -> client_index chosen for this
# request. Ensures the same request always hits the same replica within
# a given logical stage (KV / intermediate-state affinity).
chosen_client_index: dict[int, int] = field(default_factory=dict)
```
We could use the Router here to record the chosen client index, using a map inside the router. This could be considered later.
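A tiny sketch of that idea; the `Router` class and its method names here are hypothetical:

```python
class Router:
    """Records which replica was chosen for each request at each stage."""

    def __init__(self) -> None:
        # request_id -> {logical_stage_id -> replica_id}
        self._choices: dict[str, dict[int, int]] = {}

    def record(self, request_id: str, stage_id: int, replica_id: int) -> None:
        self._choices.setdefault(request_id, {})[stage_id] = replica_id

    def lookup(self, request_id: str, stage_id: int) -> int | None:
        return self._choices.get(request_id, {}).get(stage_id)
```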
```python
    raise
except Exception:
    for stage_id in range(self.num_logical_stages):
        for replica_index in range(len(self.logical_stage_to_clients[stage_id])):
```
I am afraid this could become a bottleneck for the orchestrator thread at larger scale, but we can keep it for now.
```diff
-async def _poll_stage_raw(self, stage_id: int) -> EngineCoreOutputs | None:
-    """Pull raw EngineCoreOutputs from a stage client without processing.
+async def _poll_stage_raw(
```
I found that all these functions take both a stage_id and a replica_index; what about offloading them to StagePool? If so, the orchestrator would become lighter. For example:
```python
class StagePool:
    async def _poll_stage_raw(..., replica_index):
        ...

# in orchestrator:
pool = self.stage_pool_list[stage_id]
pool.poll_stage_raw(replica_id)
```
BTW, rename all `replica_index` to `replica_id` to align the parameter naming.
```diff
 return outputs

-async def _process_stage_outputs(self, stage_id: int, raw_outputs: EngineCoreOutputs) -> list[RequestOutput]:
+async def _process_stage_outputs(
```
These functions could also be moved to StagePool? We can discuss this further.
```python
companion_state.stage_submit_ts[0] = _time.time()
self.request_states[companion_id] = companion_state

# Use same replica as the parent for affinity, or choose one
```
Do we support CFG in this PR?
CFG is partially supported in this PR.
The companion flow is implemented, and it works for the common setup with single-replica stage-0. For multi-replica stage-0, there is still a processor/client alignment risk. We’ll add follow-up hardening + tests for that case.
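For context, the alignment risk comes from selecting a replica and registering the request with an output processor as two separate steps. A sketch of how the two could be coupled atomically, in the spirit of the `admit` method described in the PR summary (signatures are assumptions):

```python
def admit(self, request_id: str, request, affinity_from: str | None = None) -> int:
    """On the stage pool: pick a replica and register the request with
    that replica's output processor in one step, so raw outputs are
    never delivered to a processor that has not seen the request."""
    replica_id = self.select_replica(request_id, affinity_from=affinity_from)
    self.output_processors[replica_id].add_request(request)
    return replica_id
```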
```python
chosen_client_index: dict[int, int] = field(default_factory=dict)


class Orchestrator:
```
We need to add tests for multi-stage scenarios
Sure. I will add it.
@yinpeiqi I understand that your main concern is having a standalone StagePool design. I’ll take a deeper look and start with a baseline StagePool implementation.
**Updated Benchmark Results**

Re-ran the full benchmark on the latest branch with the same parameters as before:
Complete Results (2gpu_base c=24,32 and 3gpu_single c=32 added)
Peak per-GPU efficiency: 3gpu_replica2 reaches 0.138 req/s/GPU (+14% vs 2gpu_base, +47% vs 3gpu_single).

**On the Latency/Throughput Tradeoff**

The TTFT/TPOT degradation at higher concurrency is not specific to multi-replica; it applies to all configurations equally. A fairer comparison is at equivalent QPS:
At the same throughput level, multi-replica achieves equal or lower latency; this is a Pareto improvement, not a tradeoff.

**On Stage-Level Attribution**

Agreed that TTFT/TPOT are dominated by the thinker stage (1 replica in all configs). Two signals directly attribute the gains to talker/code2wav:

1. TTFT is nearly identical across configs at the same concurrency (e.g., c=8: 123/131/131ms), confirming the thinker bottleneck is unchanged and multi-replica adds no overhead.
2. Audio throughput directly measures talker+code2wav capacity (thinker does not produce audio):
Additionally, 3gpu_single plateaus at c=16–24 (0.248→0.249 req/s), showing its talker/code2wav pipeline is saturated. Multi-replica breaks through this ceiling to 0.414 req/s at c=32. Per-step profiling confirms framework overhead is <0.3ms/step (<0.5% of compute), and the gains come from parallel talker/code2wav execution across replicas.
A replica consumes the devices required by that diffusion stage. For single-device diffusion pipelines, set `runtime.devices` to one device per replica:

```yaml
stages:
```
We deprecate these YAMLs in v0.22.0; please remember to remove them in the follow-up PRs.
Given the issues that still exist in the Bagel model itself in the current version of the code, I think we should leave out the diffusion part for now. You can go ahead and remove the corresponding CI, YAML, README content, and any other related materials.
**Purpose**
Implement Multi-Instance Stage Deployment for vLLM-Omni, enabling horizontal scaling of individual logical stages (e.g., talker) with multiple replicas.
**Problem:** Each YAML `stage_id` maps 1:1 to a single `StageEngineCoreClient`. The same logical role (e.g., talker) cannot be horizontally scaled; compute and throughput cannot be scaled independently per stage.
**Solution:** Introduce a `StagePool` layer between the Orchestrator and individual engine clients. Each logical stage owns a pool of replicas; the Orchestrator operates exclusively on `StageReplica` handles, never on flat indices. Replica selection (round-robin + per-request affinity + CFG companion binding) is encapsulated inside the pool. Related to #2634. cc @yinpeiqi @fake0fan

**Main Design:**

- `StagePool` (`vllm_omni/engine/stage_pool.py`)
  - `select_replica`: three-phase resolution: (1) `req_state` cache hit, reuse the same replica as before; (2) `affinity_from` cross-request binding (CFG companion → parent); (3) round-robin.
  - `admit`: atomically couples `select_replica` + `output_processor.add_request`, ensuring the processor that receives raw outputs is always the one that registered the request. This fixes a silent output-loss bug when `num_replicas > 1` on stage 0.
- All orchestrator methods (`_orchestration_loop`, `_route_output`, `_forward_to_next_stage`, `_poll_stage_raw`, `_process_stage_outputs`, `_build_stage_metrics`, `_handle_abort`, `_handle_collective_rpc`, `_shutdown_stages`, etc.) operate on `StageReplica` handles directly. No flat-index resolution anywhere.
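A condensed sketch of the three-phase resolution (attribute and parameter names approximate the described behavior, not the exact code):

```python
def select_replica(self, request_id: str, affinity_from: str | None = None) -> int:
    # (1) Cache hit: the request was already pinned to a replica.
    if request_id in self._req_to_replica:
        return self._req_to_replica[request_id]
    # (2) Cross-request affinity: e.g. a CFG companion follows its parent.
    if affinity_from is not None and affinity_from in self._req_to_replica:
        replica_id = self._req_to_replica[affinity_from]
    else:
        # (3) Fall back to round-robin across the pool.
        replica_id = self._next % self.num_replicas
        self._next += 1
    self._req_to_replica[request_id] = replica_id
    return replica_id
```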
**Usage:**

TP=2 with 2 replicas:
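The referenced example YAML below is the authoritative source; as a rough illustration only, a stage entry might look like this (all keys except `num_replicas` are assumptions):

```yaml
stages:
  - stage_id: 1                 # logical stage, e.g. talker
    num_replicas: 2             # run two replicas of this stage
    runtime:
      tensor_parallel_size: 2   # TP=2 per replica (assumed key placement)
```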
**Backward compatible:** When `num_replicas` is omitted (defaults to 1), behavior is identical to the original code. No existing YAML configs need modification. More details in `vllm_omni/model_executor/stage_configs/qwen3_omni_moe_async_chunk_multi_replicas.yaml`.

**Scope / Known Limitations:**
- In `single_stage_mode`, remote stages are pinned to 1 replica. Extending to N replicas requires `OmniMasterServer` protocol changes (`(stage_id, replica_index)` addressing). See TODO in `_initialize_stages`.

**Test Plan**
Unit tests: `tests/engine/test_orchestrator.py` (9 cases):
**Test Result**
```text
========================= test session starts =========================
platform linux -- Python 3.10.16, pytest-9.0.3, pluggy-1.5.0 -- /opt/conda/bin/python
cachedir: .pytest_cache
rootdir: /home/nas/pengyu.zwg/vllm-omni-dev
configfile: pyproject.toml
plugins: asyncio-1.3.0, anyio-4.13.0
asyncio: mode=auto, debug=False, asyncio_default_fixture_loop_scope=None, asyncio_default_test_loop_scope=function
collected 9 items

tests/engine/test_orchestrator.py::test_run_two_stage_llm PASSED                              [ 11%]
tests/engine/test_orchestrator.py::test_run_single_stage_diffusion PASSED                     [ 22%]
tests/engine/test_orchestrator.py::test_run_llm_to_diffusion PASSED                           [ 33%]
tests/engine/test_orchestrator.py::test_run_async_chunk PASSED                                [ 44%]
tests/engine/test_orchestrator.py::test_run_shutdown PASSED                                   [ 55%]
tests/engine/test_orchestrator.py::test_run_abort PASSED                                      [ 66%]
tests/engine/test_orchestrator.py::test_multi_replica_round_robin_distribution PASSED         [ 77%]
tests/engine/test_orchestrator.py::test_multi_replica_abort_broadcasts_to_all_replicas PASSED [ 88%]
tests/engine/test_orchestrator.py::test_multi_replica_shutdown_all_replicas PASSED            [100%]
```

**Essential Elements of an Effective PR Description Checklist**
- Update `supported_models.md` and `examples` for a new model. Please run `mkdocs serve` to sync the documentation editions to `./docs`.