[WIP] Integrate OmniCoordinator into stage engine pipeline #3569
chickeyton wants to merge 10 commits into
Conversation
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: e751bcee5e
```python
info = self._replicas.get(input_addr)
if info is not None:
    info.last_heartbeat = time()
```
Update queue length from heartbeat events
When using LeastQueueLengthBalancer, the only periodic source of live load is the new _on_heartbeat hook in the stage/diffusion procs, which sends the refreshed queue_length on heartbeat messages. This heartbeat path updates only last_heartbeat, so the coordinator keeps publishing the initial queue length (usually 0) and the least-queue policy routes with stale load information, allowing busy replicas to be selected as if they were idle. Please copy event.queue_length into info.queue_length for heartbeat events and schedule a broadcast when it changes.
resolve conflicts please
As a follow-up, I raised chickeyton#4 against this PR's code. It is an ad-hoc fix; a more elegant way to assign port numbers would be future work to discuss.
A similar deploy config for testing HunyuanImage-3.0-Instruct with multiple replicas:
Signed-off-by: chickeyton <ngton2014@gmail.com>
LeastQueueLengthBalancer relies on heartbeats as the only periodic source of live load, because StageEngineCoreProc/StageDiffusionProc refresh ``queue_length`` just-in-time via the ``_on_heartbeat`` hook before each heartbeat send. The coordinator's heartbeat handler was only updating ``last_heartbeat`` though, so it kept publishing the initial queue_length (usually 0) and the least-queue policy could pick busy replicas as if they were idle. Copy ``event.queue_length`` into ``info.queue_length`` on heartbeat events and request a broadcast when it changes so subscribers see fresh load promptly. Coalescing in the periodic loop keeps the wire traffic bounded. Also corrects the now-outdated docstring on ``_send_event`` that claimed heartbeats sent ``queue_length=null``. Signed-off-by: chickeyton <ngton2014@gmail.com>
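A minimal sketch of the change this commit describes; the `ReplicaInfo` fields and the `_broadcast_needed` flag are stand-ins for whatever the real `OmniCoordinator` internals use:

```python
from time import time

class CoordinatorHeartbeatSketch:
    """Illustrative only: mirrors the described fix, not the real class."""

    def __init__(self):
        self._replicas = {}            # input_addr -> replica info object
        self._broadcast_needed = False

    def _on_heartbeat(self, input_addr, event):
        info = self._replicas.get(input_addr)
        if info is None:
            return                     # unknown replica; registration handles it
        info.last_heartbeat = time()
        # The fix: copy the just-in-time queue_length so the
        # least-queue-length policy stops routing on stale initial values.
        if event.queue_length is not None and event.queue_length != info.queue_length:
            info.queue_length = event.queue_length
            self._broadcast_needed = True  # coalesced by the periodic loop
```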
- Drop unused ``vllm_config`` local in ``StageEngineCoreProc.run_stage_core`` (F841); the comment about the removed hardcoded data_parallel_size is retained.
- Wrap the long ``[Headless] Launching ... OmniMasterServer`` log line in serve.py to keep it under the 120-char limit (E501).
- Reflow multi-line ``raise`` / ``logger`` calls that fit on one line per ``ruff format`` rules in stage_diffusion_proc, async_omni_engine, omni_coord_client_for_hub, omni_core_engine_proc_manager, orchestrator, stage_engine_core_proc and serve.

Signed-off-by: chickeyton <ngton2014@gmail.com>
Signed-off-by: chickeyton <ngton2014@gmail.com>
Signed-off-by: chickeyton <ngton2014@gmail.com>
Signed-off-by: herotai214 <herotai214@gmail.com>
Based on commit ba30955, I verified the multi-replica case: the replicas are working well. I added some logging code; see whether it benefits this PR.
Test setup:
- model: tencent/HunyuanImage-3.0-Instruct
- server2: headless CLI with stage 0 (AR), 1 x replicas
- connector: RDMA Mooncake
This PR adds useful functionality, but the implementation feels too tightly coupled. Right now topology planning, headless registration/handshake, replica lifecycle, routing, and backend-specific behavior are spread across serve.py, async_omni_engine.py, stage_engine_startup.py, orchestrator.py, and stage_pool.py. That makes the control flow hard to follow and will make future changes risky.

Upstream vLLM handles this more cleanly by separating concerns at the boundaries: config/parallel arguments are normalized first, then a small number of factory/launcher paths choose the execution mode, and launch/handshake logic is concentrated in dedicated utilities. The frontend mostly talks to abstract client/executor interfaces, and the DP coordinator stays a control-plane component rather than being mixed into request orchestration.

I think vLLM-Omni should move in the same direction: introduce a clear topology/plan layer, isolate registration + bootstrap + handshake into dedicated runtime helpers, keep LLM/diffusion differences behind small runtime/client abstractions, let Orchestrator focus on request flow only, and let StagePool focus on routing rather than membership/lifecycle management. This would improve readability, reduce cross-module coupling, and make the multi-node path much easier to maintain.
@fake0fan @Gaohan123 plz review this PR asap |
I will refactor it after it is merged




Integrate OmniCoordinator into the stage engine pipeline
Closes / relates to: #984
Motivation
`OmniCoordinator`, `OmniCoordClientForStage`, and `OmniCoordClientForHub` already existed in `vllm_omni/distributed/omni_coordinator/` but were not wired into the running system. This PR is the integration work: it makes the running pipeline actually use them so that:

- The head (`vllm serve <model> --omni --stage-id N` without `--headless`) hosts an `OmniCoordinator` alongside the existing `OmniMasterServer`.
- Every replica (externally-launched headless replicas, AR or DiT) reports liveness to the coordinator from inside the engine subprocess via `OmniCoordClientForStage`.
- The `Orchestrator` discovers replicas and load-balances across them via a single `OmniCoordClientForHub` plus a per-stage `LoadBalancer` injected into each `StagePool`.
- Remote replicas register with the head; the head attaches them dynamically through `Orchestrator._attach_remote_replica` / `_detach_remote_replica`.
Design constraints honoured

- New CLI flags only: `--omni-dp-size-local`, `--omni-lb-policy`, `--omni-heartbeat-timeout`, `--omni-replica-address`. No new environment variables.
- The `OmniCoordinator` runs unconditionally when `--stage-id` is set and `--headless` is not (the user-stated invariant).
- The coordinator's address derives from `--omni-master-address`; its ROUTER/PUB ports are auto-picked and published to registrants through `OmniMasterServer`'s registration reply.
- `--omni-dp-size-local` is per-runtime, not per-cluster: each head and each headless invocation reads its own copy and launches that many replicas locally for its own `--stage-id`. Values may differ across processes; the master server's auto-assignment keeps replica ids globally unique within a stage.
Architecture
```mermaid
flowchart LR
    classDef new stroke-width:6px
    subgraph HEAD["Head: <code>vllm serve --omni --stage-id 0 --omni-dp-size-local 2</code>"]
        direction TB
        AsyncOmni["AsyncOmni<br/>(EngineClient API)"]
        Orch["Orchestrator<br/>(asyncio loop)<br/>+ remote replica attach/detach<br/>+ OmniCoordClientForHub"]
        Pools["StagePools<br/>(addr-keyed clients)<br/>+ LoadBalancer + affinity<br/>+ pick()"]
        OMS["OmniMasterServer<br/>tcp://H:9000 ROUTER"]
        subgraph OCR["<b>OmniCoordinatorRuntime</b>"]
            direction TB
            Coord["OmniCoordinator<br/>ROUTER + PUB<br/>(auto ports)"]
        end
        LocalMgr["<b>OmniCoreEngineProcManager</b>"]
        LocalProc0["StageEngineCoreProc<br/>stage 0, replica 0<br/>OmniCoordClientForStage"]
        LocalProc1["StageEngineCoreProc<br/>stage 0, replica 1<br/>OmniCoordClientForStage"]
        AsyncOmni --> Orch
        Orch --> Pools
        Coord -.PUB.-> Orch
        LocalMgr --> LocalProc0
        LocalMgr --> LocalProc1
        LocalProc0 -.DEALER.-> Coord
        LocalProc1 -.DEALER.-> Coord
        Pools <-->|ZMQ ROUTER/PULL| LocalProc0
        Pools <-->|ZMQ ROUTER/PULL| LocalProc1
        OMS --on_register--> Orch
    end
    subgraph HL1["Headless A: <code>--headless --stage-id 1 --omni-dp-size-local 4</code>"]
        direction TB
        HL1Main["run_headless<br/>main process"]
        HL1Mgr["<b>OmniCoreEngineProcManager</b>"]
        HL1P0["StageEngineCoreProc<br/>stage 1, replica 0<br/>OmniCoordClientForStage"]
        HL1P1["StageEngineCoreProc<br/>stage 1, replica 1<br/>OmniCoordClientForStage"]
        HL1P2["StageEngineCoreProc<br/>stage 1, replica 2<br/>OmniCoordClientForStage"]
        HL1P3["StageEngineCoreProc<br/>stage 1, replica 3<br/>OmniCoordClientForStage"]
        HL1Main --> HL1Mgr
        HL1Mgr --> HL1P0
        HL1Mgr --> HL1P1
        HL1Mgr --> HL1P2
        HL1Mgr --> HL1P3
    end
    subgraph HL2["Headless B: <code>--headless --stage-id 1 --omni-dp-size-local 1</code>"]
        direction TB
        HL2Main["run_headless"]
        HL2Mgr["<b>OmniCoreEngineProcManager</b>"]
        HL2P0["StageEngineCoreProc<br/>stage 1, replica 4<br/>OmniCoordClientForStage"]
        HL2Main --> HL2Mgr
        HL2Mgr --> HL2P0
    end
    subgraph HL3["Headless C: <code>--headless --stage-id 2 --omni-dp-size-local 3</code>"]
        direction TB
        HL3Main["run_headless"]
        HL3P0["StageDiffusionProc<br/>stage 2, replica 0<br/>OmniCoordClientForStage"]
        HL3P1["StageDiffusionProc<br/>stage 2, replica 1<br/>OmniCoordClientForStage"]
        HL3P2["StageDiffusionProc<br/>stage 2, replica 2<br/>OmniCoordClientForStage"]
        HL3Main --> HL3P0
        HL3Main --> HL3P1
        HL3Main --> HL3P2
    end
    HL1Main -.register DEALER.-> OMS
    HL2Main -.register DEALER.-> OMS
    HL3Main -.register DEALER.-> OMS
    HL1P0 -.DEALER.-> Coord
    HL1P1 -.DEALER.-> Coord
    HL1P2 -.DEALER.-> Coord
    HL1P3 -.DEALER.-> Coord
    HL2P0 -.DEALER.-> Coord
    HL3P0 -.DEALER.-> Coord
    HL3P1 -.DEALER.-> Coord
    HL3P2 -.DEALER.-> Coord
    Pools <-->|ZMQ| HL1P0
    Pools <-->|ZMQ| HL1P1
    Pools <-->|ZMQ| HL1P2
    Pools <-->|ZMQ| HL1P3
    Pools <-->|ZMQ| HL2P0
    Pools <-->|ZMQ| HL3P0
    Pools <-->|ZMQ| HL3P1
    Pools <-->|ZMQ| HL3P2
    %% Thicken borders of newly added classes (no fill change).
    class LocalMgr,HL1Mgr,HL2Mgr new
    style OCR stroke-width:6px
```

Legend
- Dashed arrows are coordinator control-plane ZMQ links, labelled with the socket pattern.
- `Pools <--> proc` is the head-side `StageEngineCoreClient` / `StageDiffusionClient` ROUTER+PULL bound to ports allocated by `OmniMasterServer`; the engine subprocesses connect via DEALER+PUSH.
- Bold-bordered boxes are the newly added classes: `OmniCoordinatorRuntime` and `OmniCoreEngineProcManager`.
- `Orchestrator` and `StagePool` are existing classes that absorb new responsibilities (dispatch and remote-attach), so they keep the default thin border.
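For readers unfamiliar with that socket pairing, a self-contained pyzmq sketch of ROUTER+PULL (head side) talking to DEALER+PUSH (engine side); addresses and identities here are illustrative, not the PR's actual wiring:

```python
import time
import zmq

ctx = zmq.Context.instance()

# Head side: ROUTER addresses a specific engine by identity, PULL fans in outputs.
router = ctx.socket(zmq.ROUTER)
router.bind("tcp://127.0.0.1:5601")
pull = ctx.socket(zmq.PULL)
pull.bind("tcp://127.0.0.1:5602")

# Engine side: DEALER receives requests, PUSH returns outputs.
dealer = ctx.socket(zmq.DEALER)
dealer.setsockopt(zmq.IDENTITY, b"stage0-replica0")
dealer.connect("tcp://127.0.0.1:5601")
push = ctx.socket(zmq.PUSH)
push.connect("tcp://127.0.0.1:5602")
time.sleep(0.2)  # let the TCP handshakes finish before routing by identity

router.send_multipart([b"stage0-replica0", b"request-1"])
print(dealer.recv())  # b"request-1"
push.send(b"output-1")
print(pull.recv())    # b"output-1"
```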
Where dispatch and attach logic live

To keep the class graph small, the routing and attach concerns are folded into the two classes that already own those neighbourhoods (a toy model of the `StagePool` contract follows the table):

| Concern | Owner | Entry points |
| --- | --- | --- |
| Subscribe to `OmniCoordinator`'s PUB; cache the cluster `ReplicaList` | `Orchestrator` | `OmniCoordClientForHub` |
| Turn `OmniMasterServer` register events into head-side stage clients | `Orchestrator` | `_attach_remote_replica`, `_detach_remote_replica` |
| Route requests across UP replicas | `StagePool` | `pick(request_id, task)`, `bind(request_id, addr)`, `invalidate_addr(addr)` |
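The sketch below models only the pick/bind/invalidate contract; method bodies are assumptions, and the real `StagePool` also handles tasks, stage config, and client teardown:

```python
class StagePoolSketch:
    """Toy model of pick/bind/invalidate_addr; not the real StagePool."""

    def __init__(self, balancer):
        self._balancer = balancer          # e.g. a LoadBalancer with pick(addrs)
        self._up_addrs: set[str] = set()   # maintained from ReplicaList updates
        self._bound: dict[str, str] = {}   # request_id -> replica addr (affinity)

    def pick(self, request_id: str, task) -> str:
        addr = self._bound.get(request_id)
        if addr in self._up_addrs:
            return addr                    # affinity wins over the policy
        # Assumes at least one UP replica; the real pool raises/queues otherwise.
        addr = self._balancer.pick(sorted(self._up_addrs))
        self.bind(request_id, addr)
        return addr

    def bind(self, request_id: str, addr: str) -> None:
        self._bound[request_id] = addr

    def invalidate_addr(self, addr: str) -> None:
        self._up_addrs.discard(addr)
        # Drop bindings pinned to the dead replica; the orchestrator
        # aborts those requests with a clear error.
        self._bound = {r: a for r, a in self._bound.items() if a != addr}
```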
Per-runtime `--omni-dp-size-local`

Every box that carries a `--stage-id` carries its own `--omni-dp-size-local`. The flag is process-local: each invocation reads its own copy and launches that many replicas locally. Values are independent.
In the diagram above:

| Runtime | `--stage-id` | `--omni-dp-size-local` |
| --- | --- | --- |
| Head | 0 | 2 |
| Headless A | 1 | 4 |
| Headless B | 1 | 1 |
| Headless C | 2 | 3 |

Cluster total: 2 + 4 + 1 + 3 = 10 replicas across 3 stages. The head's `OmniCoordClientForHub` (owned by `Orchestrator`) sees them all uniformly; each `StagePool` picks among its own stage's UP replicas. Replica IDs within a stage are auto-assigned by `OmniMasterServer` so they stay unique even when contributing runtimes use different local sizes.
Head and headless startup order
There is no hard ordering requirement between head and headless processes: ZMQ `DEALER` queues registration messages client-side until the head's `OmniMasterServer` `ROUTER` binds, and the headless waits up to 300 s (`_DEFAULT_STARTUP_TIMEOUT_S`) for the reply. Two soft constraints apply:

- The head must be up and listening within 300 s of the headless's `register_stage_with_omni_master` call.
- When the deploy YAML declares `num_replicas: N` for a remote stage, the head's `connect_remote_engine_cores` blocks at bring-up until N concrete `--replica-id` registrations arrive (bounded by `--stage-init-timeout`).

One additional inter-headless rule applies only when two headless processes target the same stage with mixed concrete + auto-assign ids (one uses `--replica-id 0`, another relies on `--omni-dp-size-local`): the concrete registrant must finish registration before the auto-assigner sends its first request, otherwise auto-assign can steal slot 0 (bug #9). Pure-concrete and pure-auto-assign clusters are unaffected.
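The queue-until-bind behaviour is plain ZMQ; a hedged sketch of the registration wait (function name and framing are illustrative, not the real `register_stage_with_omni_master` signature):

```python
import zmq

STARTUP_TIMEOUT_MS = 300_000  # mirrors _DEFAULT_STARTUP_TIMEOUT_S

def register_with_master(master_addr: str, payload: bytes) -> bytes:
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.DEALER)
    sock.connect(master_addr)   # queues client-side until the ROUTER binds
    sock.send(payload)          # therefore safe to send before the head is up
    if sock.poll(STARTUP_TIMEOUT_MS) == 0:
        raise TimeoutError(f"no registration reply from {master_addr}")
    return sock.recv()
```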
Recommended pattern. Start the head first; wait for `[OmniMasterServer] Listening on tcp` in its log; then launch headless processes in any order (subject to the mixed-mode rule above if applicable). This is what every smoke-test scenario does and it is the only pattern that has been validated end-to-end.
New CLI flags

Four flags are added to the `OmniConfig` argument group of `vllm serve`. All four are consumed by head and/or headless invocations (where applicable); none introduces a new environment variable.
`--omni-dp-size-local <int>` (default `1`)

- Number of stage replicas this runtime launches locally for its own `--stage-id`. Mapped onto that stage's `runtime_cfg.num_replicas` for this process only.
- Per-runtime, not per-cluster: each invocation reads its own copy. Values may differ across invocations; e.g. one headless may run `--omni-dp-size-local 4` while another runs `--omni-dp-size-local 1` on the same stage, and the master server's auto-assigned replica ids keep the cluster's per-stage namespace globally unique.
- Requires `--stage-id` when the value is `!= 1` (validated in `OmniServeCommand.validate`; bare `vllm serve --omni` without a stage id is unaffected).
- When `> 1` on a headless invocation, the runtime narrows `CUDA_VISIBLE_DEVICES` per spawned replica (so two replicas do not stack on `cuda:0`) and gives each DiT replica a unique `torch.distributed` `MASTER_PORT` (bugs #6 and #7 above).
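A sketch of the per-replica environment narrowing described in the last bullet; `replica_env` is a hypothetical helper, not code from this PR:

```python
def replica_env(base_env: dict, devices: list[int], replica_idx: int,
                base_master_port: int) -> dict:
    env = dict(base_env)
    # Narrow visibility so each replica sees only its own card as cuda:0.
    env["CUDA_VISIBLE_DEVICES"] = str(devices[replica_idx])
    # Give each DiT replica a distinct torch.distributed rendezvous port.
    env["MASTER_PORT"] = str(base_master_port + replica_idx)
    return env

# Spawning 2 replicas on GPUs 2 and 3 would yield
# CUDA_VISIBLE_DEVICES=2 / MASTER_PORT=29500 and
# CUDA_VISIBLE_DEVICES=3 / MASTER_PORT=29501.
```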
`--omni-lb-policy <random|round-robin|least-queue-length>` (default `random`)

- Per-stage load-balancing policy used by the head's `StagePool` to route incoming requests across UP replicas. Validated against the `LoadBalancingPolicy` enum.
- Only consulted on the head runtime: the orchestrator wires a fresh `LoadBalancer` of this kind into each `StagePool` it owns; pools on headless processes never see the flag.
- `random`: uniform random pick over UP replicas.
- `round-robin`: cycle through UP replicas in registration order.
- `least-queue-length`: pick the UP replica with the smallest `queue_length` reported via heartbeat (`get_num_unfinished_requests()` for LLM stages, in-flight task count for DiT stages).
Request affinity (CFG companions, multi-step requests) takes
precedence over the policy: once a request is bound to a replica, the
same replica is reused as long as it stays UP.
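Minimal reference implementations of the three policies, operating over `(addr, queue_length)` pairs; the real `LoadBalancer` classes may be shaped differently:

```python
import itertools
import random

class RandomBalancer:
    def pick(self, replicas):            # replicas: non-empty [(addr, queue_len)]
        return random.choice(replicas)[0]

class RoundRobinBalancer:
    def __init__(self):
        self._counter = itertools.count()

    def pick(self, replicas):            # assumes stable registration order
        return replicas[next(self._counter) % len(replicas)][0]

class LeastQueueLengthBalancer:
    def pick(self, replicas):            # queue_len refreshed via heartbeats
        return min(replicas, key=lambda r: r[1])[0]
```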
`--omni-heartbeat-timeout <float>` (default `30.0`)

- Seconds before an unreporting replica is marked `ERROR` in the `OmniCoordinator`. Only consulted on the head runtime; it is the parameter the coordinator's periodic loop uses to decide a replica is stale. Headless processes always heartbeat on a fixed interval (`heartbeat_interval`, ~5 s for the 30 s default) regardless of this flag.
- When a replica is flipped to `ERROR`, the orchestrator's `_watch_replica_list` task enqueues an `unregister_remote_replica` control message; pinned requests are aborted with a clear error and the head-side client is torn down.
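The staleness decision reduces to a periodic sweep; a sketch under the assumption that each replica info object carries `state` and `last_heartbeat` fields:

```python
from time import time

def sweep_stale_replicas(replicas: dict, heartbeat_timeout: float) -> list[str]:
    """Return addrs flipped to ERROR because their heartbeat went stale."""
    now, flipped = time(), []
    for addr, info in replicas.items():
        if info.state == "UP" and now - info.last_heartbeat > heartbeat_timeout:
            info.state = "ERROR"   # subscribers see this on the next PUB
            flipped.append(addr)
    return flipped
```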
`--omni-replica-address <ip>` / `-ora` (default auto-detect)

- Local bind address (this host's IP) that the headless stage advertises to the Omni master for its per-stage handshake / input / output ZMQ sockets. Only consulted on headless runtimes; validated to require `--headless` in `OmniServeCommand.validate`. The head ignores the flag (its own stages are co-located with the master and already use the master-rooted addresses).
- When left to auto-detect, `register_stage_with_omni_master` runs a UDP-connect routing probe against `--omni-master-address`:`--omni-master-port`: it opens a `SOCK_DGRAM` socket, calls `connect()` (no packets actually sent; this just forces a route lookup), and reads `getsockname()[0]` to learn the source IP the kernel would use to reach the master. That IP is what the headless's ZMQ sockets must bind on. On a single-host setup this returns the loopback / eth0 IP and the resulting registration is a behaviour-preserving no-op; on a cross-host pod it returns the headless's own routable interface IP. (See the probe sketch after this list.)
- Hosts where the master is reachable on the wrong interface, or environments where the source-route lookup picks an address that the head cannot reach back, are the cases that need the explicit override.
- The headless allocates its per-stage ports via `get_open_ports_list(count=3)` and includes `replica_bind_address` + `replica_{handshake,input,output}_port` in the registration payload.
- `OmniMasterServer._handle_registration` recognises the fields and rewrites `_stage_routes[(stage_id, replica_id)]` so subsequent head-side lookups via `get_engine_zmq_addresses(stage_id)` return the new, headless-rooted addresses. Without this rewrite the head would hand back the master's own IP, which the headless cannot bind on a different host (`zmq.error.ZMQError: Cannot assign requested address`).

This flag is what makes the cross-server topology (head on one pod, headless DiT on another) work without operator-supplied per-host configuration in the common case, while still leaving an escape hatch for unusual NIC layouts.
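The UDP-connect routing probe is a standard-library one-liner; a sketch (helper name is illustrative):

```python
import socket

def detect_source_ip(master_addr: str, master_port: int) -> str:
    # UDP "connect" performs only a route lookup; no packets are sent.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect((master_addr, master_port))
        return s.getsockname()[0]
```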
`--replica-id <int>` (deprecated, ignored)

The pre-existing `--replica-id` flag is now deprecated and ignored. Replica ids are auto-assigned by `OmniMasterServer`, so headless processes carry no knowledge of their per-replica id at launch time; the master is the sole authority on the per-stage namespace and a launching headless cannot race or collide with any other registrant (bug #9 is now structurally impossible). When `--replica-id` is supplied on the CLI, `run_headless` emits a single warning log line identifying the supplied value and continues with auto-assignment; the flag itself stays in the parser only so existing launchers (scripts, CI configs) keep working without modification.
CLI usage examples: BAGEL scenarios A, B, C

The bundled `vllm_omni/deploy/bagel.yaml` puts both stages on GPU 0. The scenarios below use the test harness's `bagel_two_gpu.yaml` / `bagel_dp_local_2.yaml` deploy configs to spread the AR thinker and the DiT onto separate cards; you can use any deploy YAML that declares `runtime.devices` consistently with the GPU mapping passed via `CUDA_VISIBLE_DEVICES`.

Common variables used across the examples: `$HOST` (the head node's address) and `$API_PORT` (the API server port).
Scenario A: single in-process runtime (AR + DiT)

One `vllm serve` process; no coordinator, no headless, no `--stage-id`. The AR thinker and the DiT both run inside one engine, on GPUs 0 and 1.
Then issue a request against `http://$HOST:$API_PORT/v1/chat/completions` with `modalities=["image"]`.
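For example, with Python `requests` (the endpoint shape follows the OpenAI-compatible API; the exact payload fields vLLM-Omni accepts may differ):

```python
import requests

HOST, API_PORT = "localhost", 8000   # adjust to your deployment
MODEL = "<served-model-name>"        # as listed by GET /v1/models

resp = requests.post(
    f"http://{HOST}:{API_PORT}/v1/chat/completions",
    json={
        "model": MODEL,
        "messages": [{"role": "user", "content": "Draw a corgi surfing."}],
        "modalities": ["image"],
    },
    timeout=600,
)
resp.raise_for_status()
print(resp.json()["choices"][0])
```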
Scenario B: head `dp_local=2` (AR) + single headless `dp_local=2` (DiT)

Two `vllm serve` processes, four replicas total. The head hosts the AR thinker with two local replicas on GPUs 0,1 and runs the coordinator + master server. A single headless hosts the DiT stage with two replicas, exercising the single-headless multi-replica DiT path (bugs #6 and #7 above).
Wait for two `Diffusion replica id=N for stage 1 is up` lines in the headless log, then curl `http://$HOST:$API_PORT/health` before issuing requests (a polling helper is sketched below).
Scenario C: head + headless AR `dp_local=2` + headless DiT

Three `vllm serve` processes, four replicas total; the AR analogue of scenario B. The head runs 1 AR replica, a second headless runs 2 more AR replicas on the same stage (exercising single-headless multi-replica AR at 7B scale, bugs #6 + #8), and a third headless runs 1 DiT replica.

Wait for two `Stage 0 replica id=N up` lines in headless A's log, one `Diffusion replica id=0 for stage 1 is up` line in headless B's log, and `/health` 200 on the head before issuing requests.