[3/5] [WIP][core]refactor communication layer: PR 3 of 5, all other models in non async mode #3719

Open
natureofnature wants to merge 15 commits into
vllm-project:mainfrom
natureofnature:pr/refactor/pr3

@natureofnature natureofnature commented May 19, 2026

Purpose

The communication-layer refactor is split into 5 PRs in total.
PR #1555 is the first PR.
PR #2677 is the second PR.
Three PRs remain (3/5 through 5/5).

  • 1/5: PR 1555, merged, basic infra
  • 2/5: Refactor on Qwen3 Omni non async mode
  • [~] 3/5: Refactor of other models using non async mode (in progress)
  • 4/5: Refactor async execution workflow
  • 5/5: Remove redundant code and add docs.

Scope

The PR3 closure migrated these 10 (arch, stage) transitions. Payload shape is the actual dict layout shipped over the connector (verified against 81208065); per-builder details live in §8.

| Arch | Stage | Group | Transition | Payload shape (top-level keys) |
| --- | --- | --- | --- | --- |
| Qwen3OmniMoeForConditionalGeneration | talker | (PR1) | thinker → talker | OmniPayload-nested (hidden_states.*, embed.prefill, ids.*) |
| Qwen3OmniMoeForConditionalGeneration | code2wav | (PR1) | talker → code2wav | {codes.audio, meta.finished} |
| Qwen2_5OmniForConditionalGeneration | talker | B | thinker → talker | OmniPayload-nested: hidden_states.{output, output_shape}, embed.{prefill, prefill_shape}, ids.{prompt, output} |
| Qwen2_5OmniForConditionalGeneration | code2wav | B | talker → code2wav | {codes.audio (boundary-stripped tokens), meta.finished} |
| CovoAudioForConditionalGeneration | code2wav | B | fused → code2wav | {codes.audio (filtered tokens), meta.finished} |
| MiMoAudioModel | code2wav | B | fused → code2wav | {codes.audio (col-major flat tokens), meta.finished} |
| Qwen3TTSCode2Wav | code2wav | C | talker → code2wav | {codes.audio (ref-prefixed, flattened), meta.finished}; codes.ref / ref_code_len are producer-side intermediates only, folded into codes.audio before send |
| CosyVoice3Model | cosyvoice3_code2wav | D | text → code2wav | {embed.{speech_token, speech_feat, embedding}, meta.finished} |
| DyninOmniForConditionalGeneration | token2image | B | token2text → token2image | Hybrid: normalized prompt-metadata fields flattened at top level (speaker, language, detok_id, …) + codes.audio (token_ids) + meta.finished |
| DyninOmniForConditionalGeneration | token2audio | B | token2image → token2audio | same shape, second hop |

Group A: protect-only (no migration). Archs that have a multi-stage pipeline but stay on the legacy additional_information data plane. PR3 doesn't migrate them; it only generalizes the new gate plumbing so their absence from _OMNI_CONNECTOR_INIT_ARCHS and _FULL_PAYLOAD_INPUT_STAGES doesn't break init. Verification = topology guard tests that prevent accidental future migration.
Members: MingFlash.

Group B: straightforward migration (1-D codec or hidden-state tensor). The producer emits per-step pooler output, the accumulator either keeps the latest (hidden-state edges) or CONCATs along dim 0 (codec edges), and the builder packages the result into a nested OmniPayload-style dict. No producer-side splicing of multiple accumulator sources, no out-of-band metadata channels. Verification = e2e on each member individually.
Members: Qwen2_5Omni (both transitions), CovoAudio, MiMoAudio, DyninOmni (both hops).

Group C: splice + reshape. The producer accumulates two independent streams (generated codec + reference codec) plus an integer metadata field, then fuses them producer-side before shipping. The fused payload is still single-keyed (codes.audio), but the splicing logic (transpose, codebook-major flatten, ref prepend) means correctness depends on accumulator ordering, length contracts, and a per-codebook filter, all of which are PR3-new code. Verification = e2e plus a dedicated _filter_audio_codes_qwen3_tts unit test on edge cases (negative, all-zero, out-of-range).
Members: Qwen3TTS.

Group D: partial migration with explicit exception. The arch has multiple sub-paths and only the prompt-conditioning sub-path moves to the connector. The sync codec sub-path stays on legacy as a deliberate scoped exception. The connector edge here carries embed.* prompt tensors only, not codec tokens. Verification = e2e on both zh and en prompts to catch the producer/consumer divergence under realistic content.
Members: CosyVoice3.
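
To make the Group C splice concrete, here is a minimal sketch using plain Python lists instead of tensors. It assumes frames are stored as `[frame][codebook]`, that invalid frames are dropped before splicing, and that the reference stream is prepended before the codebook-major flatten. The helper names `filter_frames` and `splice_ref_and_flatten` are hypothetical, and the real `_filter_audio_codes_qwen3_tts` may order or define these steps differently.

```python
from typing import List

Frame = List[int]  # one codec id per codebook


def filter_frames(frames: List[Frame], codebook_size: int) -> List[Frame]:
    """Keep only frames whose ids all fall in [0, codebook_size)."""
    return [f for f in frames if all(0 <= t < codebook_size for t in f)]


def splice_ref_and_flatten(
    ref_frames: List[Frame], gen_frames: List[Frame], codebook_size: int
) -> List[int]:
    """Prepend reference frames, then flatten codebook-major."""
    frames = filter_frames(ref_frames, codebook_size) + filter_frames(
        gen_frames, codebook_size
    )
    if not frames:
        return []
    num_codebooks = len(frames[0])
    # Transpose [frame][codebook] -> [codebook][frame], then flatten row by row.
    return [frame[q] for q in range(num_codebooks) for frame in frames]
```

With one reference frame `[1, 2]` and generated frames `[3, 4]`, `[-1, 5]`, `[6, 99]` at `codebook_size=10`, only `[3, 4]` survives the filter, so the consumer sees all codebook-0 ids first and then all codebook-1 ids.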

Architecture

sequenceDiagram
    autonumber
    participant Sched as Stage N+1 Scheduler
    participant Coord as Coordinator
    participant RunP as Stage N Runner
    participant Acc as accumulator
    participant Conn as Connector (shm)
    participant RunC as Stage N+1 Runner
    participant Model as Stage N+1 model.forward

    Note over Sched,Coord: A fresh request lands on Stage N+1.
    Sched->>Coord: process_pending_full_payload_inputs(...)
    Coord->>Coord: WAITING -> WAITING_FOR_INPUT
    Coord-->>Sched: pending_input_registrations=[OmniChunkRecvHandle(...)]
    Sched->>RunC: SchedulerOutput (with handles)
    RunC->>Conn: register_chunk_recv(req_id, external_req_id)

    Note over RunP,Acc: Stage N executes its forward steps.
    loop per token/step
        RunP->>Acc: accumulate_full_payload_output(req_id, pooler_output)
    end
    RunP->>Conn: flush_full_payload_outputs({req_id_finished})
    Conn->>Conn: SIP._full_payload(pooling_output, request) -> payload dict
    Conn-->>RunC: bg thread delivers payload to Stage N+1 connector

    Sched->>RunC: next schedule() cycle
    RunC->>RunC: recv_full_payload_inputs(scheduler_output)<br/> -> stage_recv_req_ids
    RunC-->>Sched: OmniConnectorOutput(request_metadata, recv_ids)
    Sched->>Coord: _consume_pending_connector_output(...)
    Coord->>Coord: WAITING_FOR_INPUT -> WAITING (req now schedulable)

    Note over Sched,Model: Subsequent cycle schedules the req for Stage N+1 forward.
    Sched->>RunC: SchedulerOutput (req now runnable)
    RunC->>Model: forward(model_intermediate_buffer={req_id: payload})
    Model-->>RunC: outputs
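
The request lifecycle in the sequence diagram above can be sketched as a small state machine. This is a toy model only: `ToyCoordinator`, `park_pending`, and `on_received` are hypothetical names, and the real coordinator tracks more state than the two statuses shown in the diagram.

```python
from enum import Enum, auto


class ReqState(Enum):
    WAITING = auto()
    WAITING_FOR_INPUT = auto()


class ToyCoordinator:
    def __init__(self) -> None:
        self.states: dict[str, ReqState] = {}
        self.received: set[str] = set()

    def add_request(self, req_id: str) -> None:
        self.states[req_id] = ReqState.WAITING

    def park_pending(self) -> list[str]:
        """WAITING -> WAITING_FOR_INPUT; returns ids needing a recv registration."""
        parked = []
        for req_id, state in self.states.items():
            if state is ReqState.WAITING and req_id not in self.received:
                self.states[req_id] = ReqState.WAITING_FOR_INPUT
                parked.append(req_id)
        return parked

    def on_received(self, recv_ids: list[str]) -> None:
        """WAITING_FOR_INPUT -> WAITING once the upstream payload has arrived."""
        for req_id in recv_ids:
            if self.states.get(req_id) is ReqState.WAITING_FOR_INPUT:
                self.states[req_id] = ReqState.WAITING
                self.received.add(req_id)

    def schedulable(self) -> list[str]:
        return [
            r for r, s in self.states.items()
            if s is ReqState.WAITING and r in self.received
        ]
```

A request only becomes schedulable after its payload has been reported as received, which is the property the WAITING_FOR_INPUT parking is meant to guarantee.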

Change of lines

Source: git diff --numstat upstream/main..HEAD on pr/refactor/pr3 branch
(rebased on top of upstream/main 89f88195, 10 commits, 36 files).

Top-level buckets

| Bucket | +adds | -dels | files |
| --- | --- | --- | --- |
| Production (vllm_omni/) | +1718 | -224 | 28 |
| Tests (tests/) | +1202 | -58 | 8 |
| Total | +2920 | -282 | 36 |

Production breakdown

| Sub-bucket | +adds | -dels | files | % of prod adds |
| --- | --- | --- | --- | --- |
| stage_input_processors/*.py | +1210 | -43 | 8 | 70% |
| Other production (scheduler / worker / mixin / output / pipeline) | +504 | -181 | 17 | 29% |
| yaml configs (stage_configs/, deploy/) | +4 | 0 | 3 | 0.2% |

Stage input processors (8 files, +1210 / -43)

| File | +adds | -dels | Scope |
| --- | --- | --- | --- |
| qwen2_5_omni.py | +331 | 0 | real producer + token-only |
| qwen3_tts.py | +203 | -3 | per-arch producer + negative-codec filter |
| cosyvoice3.py | +148 | -1 | legacy text2flow_token_only + SIP (stage input processor) prompt-prefix strip |
| mimo_audio.py | +132 | 0 | per-arch producer + all-zero codec rows return empty |
| dynin_omni.py | +125 | 0 | _build_full_payload + nested codes.audio / meta.finished |
| qwen3_omni.py | +100 | -32 | REPLACE keys + thinker_emb/hid trim refinement |
| ming_flash_omni.py | +95 | 0 | per-arch SIP builders + arch-not-in-allowlist note |
| covo_audio.py | +76 | -7 | per-arch SIP builders |

Test Plan

test-ready, test-merge, test-nightly

Test Result

To be added.


Essential Elements of an Effective PR Description Checklist
  • The purpose of the PR, such as "Fix some issue (link existing issues this PR will resolve)".
  • The test plan. Please provide the test scripts & test commands. Please state the reasons if your codes don't require additional test scripts. For test file guidelines, please check the test style doc
  • The test results. Please paste the results comparison before and after, or the e2e results.
  • (Optional) The necessary documentation update, such as updating supported_models.md and examples for a new model. Please run mkdocs serve to sync the documentation editions to ./docs.
  • (Optional) Release notes update. If your change is user-facing, please update the release notes draft.

BEFORE SUBMITTING, PLEASE READ https://github.com/vllm-project/vllm-omni/blob/main/CONTRIBUTING.md (anything written below this line will be removed by GitHub Actions)

…giene, IPC type safety, DRY.

Five reviewer items bundled (file:line per item in the original review):

   + standard-freeing block in try/finally so the input_coordinator
   entry is pruned along every return path. Mirrors the pattern
   already in omni_generation_scheduler._free_request. Four inline
   "if self.input_coordinator is not None: self._free_input_coordinator_request(...)"
   calls collapsed into a single finally clause.

   self._omni_connector_initialized = True at the end of init.
   OmniGPUModelRunner._update_states gates cleanup_finished_request on
   this explicit flag instead of probing the private
   "_request_ids_mapping" attribute name. Removes the implicit
   "is the mixin done initialising" contract.

   list[OmniInputRegistration] (new minimal dataclass in
   vllm_omni/core/sched/output.py carrying request_id + external_req_id
   only - the two fields register_chunk_recv actually consumes).
   Replaces the prior list[Any], which msgspec falls back to JSON-ish
   serialisation for under PD-disagg / multi-node executor IPC.
   Wire payload also drops by ~one Request struct per pending
   registration. Tests stay green via duck-typed attribute access.

   capture boilerplate duplicated between omni_ar_scheduler and
   omni_generation_scheduler:
   - _consume_pending_connector_output(model_mode) -- drains
     _latest_omni_connector_output at top of schedule()
   - _capture_omni_connector_output(model_runner_output, model_mode) --
     stashes omni_connector_output at tail of update_from_output()
   - _wrap_omni_scheduler_output(base, **extras) -- builds
     OmniSchedulerOutput from a base SchedulerOutput
   AR + generation schedulers each lose 3 copy-pasted blocks.
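
As an illustration of the try/finally item above, the sketch below shows how a single `finally` clause prunes the coordinator entry on every return path. `ToyScheduler` and its `free_request` method are hypothetical stand-ins; only the names `input_coordinator` and `_free_input_coordinator_request` come from the commit message.

```python
from typing import Optional


class ToyScheduler:
    def __init__(self) -> None:
        # Stand-in for the real coordinator: a dict keyed by request id.
        self.input_coordinator: Optional[dict] = {"r1": "pending", "r2": "pending"}
        self.freed: list[str] = []

    def _free_input_coordinator_request(self, req_id: str) -> None:
        self.input_coordinator.pop(req_id, None)

    def free_request(self, req_id: str, already_freed: bool = False) -> Optional[str]:
        try:
            if already_freed:
                return None  # early return path
            self.freed.append(req_id)
            return req_id  # normal return path
        finally:
            # Runs on both return paths (and on exceptions), replacing the
            # per-branch inline cleanup calls.
            if self.input_coordinator is not None:
                self._free_input_coordinator_request(req_id)
```

The design point is that adding a new early return later cannot reintroduce a leak, because the cleanup no longer has to be repeated per branch.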

Verified on H800 dev environment with --run-level full_model -m
"full_model and H800 and omni": test_one_word_prompt_001 +
test_speaker_002 ([default] and [async_chunk]) 4 passed in 6:32.

Signed-off-by: natureofnature <wzliu@connect.hku.hk>

Three squashed bugfix commits all targeting qwen3_omni
thinker2talker_full_payload's row-counting logic for the worker
connector data plane:

1. Length-aware trim (orig c805e487): switch from unconditional [:-1]
   trim to a target_rows = len(all_token_ids) computation, so max-token
   finishes (which do not append a stop-emission row) are not
   over-trimmed.  Fixes long-output regression on test_mix_to_text_audio
   from BK 9702 main build.

2. Drop output_token_ids hoist (orig 0f14863a): surgical revert of an
   unused hoist left behind by the prior trim commit; no functional
   change.

3. Finish-reason-aware trim (orig 6d95f8bd): add a stop_emission_drop
   subtraction so FINISHED_STOPPED requests still drop their extra
   accumulated hidden-state row.  Codex P1 review on the prior commit
   identified this as a regression on short stop-finished outputs
   (spurious-phoneme on test_speaker_002).  Detection: primary via
   request.status; fallback heuristic via last-token-in-stop-set
   when worker-side CachedRequestState has no .status field.
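
One way to read the three trim steps above is the following sketch. It assumes one accumulated hidden-state row per token in `all_token_ids`, and that a stop-finished request carries exactly one extra row to drop. The string status value and both function signatures are hypothetical simplifications, not the actual `thinker2talker_full_payload` code.

```python
from typing import Iterable, List, Optional

FINISHED_STOPPED = "FINISHED_STOPPED"  # stand-in for the real status enum


def stop_emission_drop(
    status: Optional[str], last_token_id: Optional[int], stop_token_ids: Iterable[int]
) -> int:
    """Return 1 when the request finished on a stop token, else 0."""
    if status is not None:
        return 1 if status == FINISHED_STOPPED else 0
    # Fallback heuristic when the request state exposes no status field.
    return 1 if last_token_id in set(stop_token_ids) else 0


def trim_hidden_rows(
    rows: List, all_token_ids: List[int], status: Optional[str],
    stop_token_ids: Iterable[int],
) -> List:
    """Length-aware trim: keep len(all_token_ids) rows minus the stop drop."""
    last = all_token_ids[-1] if all_token_ids else None
    target_rows = len(all_token_ids) - stop_emission_drop(status, last, stop_token_ids)
    return rows[: max(target_rows, 0)]
```

Under these assumptions a max-token finish keeps every row, while a stop finish loses exactly one, which is the distinction the unconditional `[:-1]` trim could not make.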

Signed-off-by: natureofnature <wzliu@connect.hku.hk>

Consolidates the two infrastructure commits that prepared PR3 for per-model
sync-data-plane migration:

- 6d4b4890 [Phase 2a] Structural gate via `_is_sync_input` marker
            (drop hard-coded `model_arch == Qwen3OmniMoeForConditionalGeneration`
             in `omni_scheduling_coordinator.uses_full_payload_input_coordinator`).
            Marker is set on the consumer-side `*_token_only` builder in
            each model's SIP module; the consumer-side scheduler gate
            reads it via the resolved `custom_process_input_func` callable.
- d7bc85fa [Phase 2d] REPLACE-keys accumulator hook + arch-gate cleanup.
            Per-model `_FULL_PAYLOAD_REPLACE_KEYS: frozenset[str]` declared
            in the SIP module; the worker accumulator
            (`omni_connector_model_runner_mixin.accumulate_full_payload_output`)
            looks it up via `proc.__module__` and uses REPLACE semantics for
            those keys (default is CONCAT).  Used by models where a key
            carries the full result so far rather than per-step deltas.

Net effect:
- `core/sched/omni_scheduling_coordinator.py`: marker-based structural gate
- `worker/omni_connector_model_runner_mixin.py`:
    `accumulate_full_payload_output`, `_resolve_full_payload_replace_keys`,
    `should_accumulate_full_payload_output`, related arch-gate cleanup
- `model_executor/stage_input_processors/qwen3_omni.py`: declares
    `_FULL_PAYLOAD_REPLACE_KEYS` for qwen3_omni's `talker2code2wav` keys
- Per-stage `custom_process_input_func` and `sync_process_input_func`
  selection plumbing remains in `config/stage_config.py:_select_processor_funcs`.

After this commit, per-arch SIP modules can declare:
  - a `*_token_only` builder (sync_process_input_func, marked `_is_sync_input = True`)
  - a `*_full_payload` builder (custom_process_next_stage_input_func)
  - optional `_FULL_PAYLOAD_REPLACE_KEYS` for REPLACE semantics
and the worker connector + scheduler coordinator handle the rest uniformly.

Per-arch migrations follow in 9 subsequent commits (covo_audio, dynin_omni × 2,
mimo_audio, qwen3_tts, cosyvoice3, ming_flash_omni, qwen2_5_omni × 2).

Signed-off-by: natureofnature <wzliu@connect.hku.hk>

Squash of 9 commits that wire per-arch stage_input_processor (SIP)
builders for the worker-connector data plane, plus the coordinator
gate generalization that makes them reachable:

- [Pilot] covo_audio (Group B llm->code2wav): pilot impl exercising
  the new structural gate.
- dynin_omni Group B (both transitions): SIP builders + yaml wires
  for stage_configs.
- qwen2_5_omni Group B half (talker->code2wav).
- mimo_audio Group B (llm->code2wav).
- qwen3_tts Group C (talker->code2wav).
- cosyvoice3 Group D-ish (text->flow).
- ming_flash_omni Group D (thinker->talker).
- qwen2_5_omni Group A reduced to D-minimal (thinker->talker
  structural sync builder).
- Coordinator gate generalization (drops hard-coded Qwen3-only check
  in uses_full_payload_input_coordinator and replaces it with the
  _FULL_PAYLOAD_INPUT_STAGES (arch, stage) whitelist).  Tests in
  tests/core/sched/test_omni_scheduling_coordinator.py +
  tests/worker/test_omni_gpu_model_runner.py adjust to the new
  whitelist contract.

Each per-arch SIP commit adds a builder pair (`*_token_only` and
`*_full_payload`) and a pipeline.py wire; tests in
test_qwen3_omni_streaming_helpers.py cover the structural
expectations.

Signed-off-by: natureofnature <wzliu@connect.hku.hk>

Squash of 5 [PR3 Block A] Enable commits that wire each migrated arch
into both the worker init allowlist (_BLOCK_A_INIT_ALLOWLIST in
gpu_ar_model_runner.py + gpu_generation_model_runner.py) and the
scheduler coordinator allowlist (_FULL_PAYLOAD_INPUT_STAGES in
omni_scheduling_coordinator.py), keeping the two in lockstep so a
gate-enabled stage always has a wired-up worker connector.

Archs activated (in commit order):

- qwen2_5_omni talker -> code2wav (Block A pilot for q25;
  thinker -> talker stays orchestrator-routed at this commit because
  the producer builder is still a no-op).  Also widens
  init_omni_connectors arch allowlist from Qwen3-only to a 7-arch
  frozenset.
- covo_audio fused_thinker_talker -> code2wav.
- mimo_audio fused_thinker_talker -> code2wav.
- qwen3_tts talker -> code2wav (Qwen3TTSTalkerForConditionalGeneration
  -> Qwen3TTSCode2Wav).
- cosyvoice3 cosyvoice3_talker -> cosyvoice3_code2wav.

After this commit each arch's Stage-1 receives the full-payload
delivery via the worker connector instead of via the orchestrator-
side additional_information path.  The producer builders themselves
were added in the previous "Per-arch SIP builders" commit.

Signed-off-by: natureofnature <wzliu@connect.hku.hk>

…ation + Codex review

Consolidates the PR3 closure work: prefix_caching state-leak fix,
qwen2_5_omni real producer plumbing, dynin connector data-plane
migration, and Codex review feedback.

`omni_connector_model_runner_mixin.py`: `flush_full_payload_outputs`
is invoked at the start of `cleanup_finished_request` to drain any
pending full_payload entry for the finishing request before the rest
of the cleanup runs.  An earlier unconditional pop of
`_pending_full_payload_send` raced with flush and broke audio
consumers; flush-then-cleanup is idempotent and safe for paths
without a downstream consumer.  Fixes
`test_thinker_prefix_caching[omni_server0]` state-leak regression.

`stage_input_processors/qwen2_5_omni.py`: `thinker2talker_full_payload`
is no longer a no-op.  The real producer-side packer reads
`pooling_output["hidden"]`, splits prefill/decode, applies a
stop-emission trim aligned with the legacy contract (mirrors
qwen3_omni; covers max-token finishes without losing a hidden row),
and builds an `OmniPayloadStruct` matching the field set that
`thinker2talker_token_only` writes into `additional_information`.

`deploy/dynin_omni_ci.yaml`: adds `custom_process_next_stage_input_func`
on Stage 0 (token2text) and Stage 1 (token2image) pointing at the
`_full_payload` builders.  Without this entry, `_load_custom_func`
finds no builder, `_should_accumulate_full_payload_output()` returns
False, and the producer never enqueues a connector message.

End-to-end connector flow observed at runtime:
  Stage-0 flush_full_payload_outputs(req_id) -> to_send=[req_id]
  Stage-0 send_full_payload_outputs payload_keys=[...
      'code_predictor_codes', ...]
  Stage-1 full_payload recv complete: payload_type=dict
  Stage-1 flush_full_payload_outputs ... -> to_send=[req_id]
  (Stage-2 recv likewise)

dynin's legacy consumer-side `custom_process_input_func` on Stage 1
and Stage 2 is retained (see DECISIONS.md D-017): the connector path
is primary, but `_bridge_tokens` is still wired to propagate
`additional_information` to the downstream `OmniTokensPrompt` -- a
propagation the scheduler-side rewrite (`metadata` ->
`request.prompt_token_ids`) does not yet do.  Offline t2s passes
through the pure connector pipeline; the online server test relies
on `request.additional_information` for response assembly, so the
legacy SIP stays until the scheduler is extended in a follow-up
(see PR4 direction in D-017).

`core/sched/omni_scheduling_coordinator.py` adds three (arch, stage)
pairs to `_FULL_PAYLOAD_INPUT_STAGES`:
- `(Qwen2_5OmniForConditionalGeneration, talker)` -- consumer gate
  for the newly-active qwen2_5_omni connector path.
- `(DyninOmniForConditionalGeneration, token2image)` and
  `(..., token2audio)` -- consumer gates that park the dynin Stage 1
  and Stage 2 requests in WAITING_FOR_INPUT until the upstream
  payload arrives.
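
A minimal sketch of the (arch, stage) whitelist gate follows, populated only with the three pairs named above. The function signature is an assumption (the real `uses_full_payload_input_coordinator` may take a config object rather than two strings), and the self-check helper is hypothetical.

```python
# Illustrative copy of the allowlist; the real set contains more entries.
_FULL_PAYLOAD_INPUT_STAGES = frozenset({
    ("Qwen2_5OmniForConditionalGeneration", "talker"),
    ("DyninOmniForConditionalGeneration", "token2image"),
    ("DyninOmniForConditionalGeneration", "token2audio"),
})


def uses_full_payload_input_coordinator(model_arch: str, stage: str) -> bool:
    """True when the consumer stage should wait for a connector payload."""
    return (model_arch, stage) in _FULL_PAYLOAD_INPUT_STAGES


def check_gate_fires_for_every_whitelisted_pair() -> int:
    """Iterate the whitelist itself so new pairs are covered by construction."""
    checked = 0
    for arch, stage in _FULL_PAYLOAD_INPUT_STAGES:
        assert uses_full_payload_input_coordinator(arch, stage)
        checked += 1
    return checked
```

Iterating the set in the positive test means a newly whitelisted pair is exercised without editing the test, while negative cases still have to be listed by hand.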

`worker/gpu_ar_model_runner.py` and
`worker/gpu_generation_model_runner.py` add
`DyninOmniForConditionalGeneration` to `_BLOCK_A_INIT_ALLOWLIST` so
`init_omni_connectors` runs for dynin workers.

Codex review fixes:

- Issue 1 (q25 SIP trim heuristic): replace unconditional
  `output_token_ids[:-1] + h[:-1]` trim with a `stop_emission_drop`-
  based trim that mirrors qwen3_omni's contract.  Folded into the
  qwen2_5_omni SIP rewrite above.
- Issue 3 (external_req_id None fallback): `register_chunk_recv` and
  `_resolve_external_req_id` now treat an explicit `None` on the
  request struct as a fallback to the internal `request_id`,
  preventing recv-key collisions like `None_<stage>_<chunk>` across
  requests.
- Issue 4 (coordinator test): rewrite the positive case in
  `tests/core/sched/test_omni_scheduling_coordinator.py` to iterate
  `_FULL_PAYLOAD_INPUT_STAGES` so newly-whitelisted (arch, stage)
  pairs fire the gate by construction.  Remove the q2.5/talker entry
  from the negative cases (now whitelisted).
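
The `external_req_id` fallback from Issue 3 can be sketched as below. The dataclass mirrors the two fields described for `OmniInputRegistration` earlier in this PR (the class may since have been renamed), and the `<id>_<stage>_<chunk>` key format is inferred from the `None_<stage>_<chunk>` collision example, so treat `recv_key` as a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class OmniInputRegistration:
    request_id: str
    external_req_id: Optional[str] = None


def resolve_external_req_id(reg: OmniInputRegistration) -> str:
    """Fall back to the internal request_id when external_req_id is None."""
    if reg.external_req_id is not None:
        return reg.external_req_id
    return reg.request_id


def recv_key(reg: OmniInputRegistration, stage: int, chunk: int) -> str:
    """Build a per-request receive key (format assumed for illustration)."""
    return f"{resolve_external_req_id(reg)}_{stage}_{chunk}"
```

Without the fallback, two requests that both carry `external_req_id=None` would map to the same key and could receive each other's payloads.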

`stage_configs/dynin_omni.yaml` and
`stage_configs/dynin_omni_multiconnector.yaml`: remove a transiently-
added `sync_process_input_func` line.  The active deploy path is the
`deploy/dynin_omni_ci.yaml` rewrite above; the stage_configs siblings
are kept in sync to avoid drift.

Verified on H800 dev environment:

- dynin e2e (offline + online, full): 11 passed / 1 skipped / 0 fail
  in 19:12.  Connector flow verified by stage-0/1/2 flush+send+recv
  log triplet.
- qwen3_omni online_serving (3 tests, dynin diff applied):
  3 passed / 0 failed.
- Canonical CI sweep (test-ready.yml + test-merge.yml, 21 steps):
  21 pass / 0 fail in 77.7 min.

Signed-off-by: natureofnature <wzliu@connect.hku.hk>

@hsliuustc0106 hsliuustc0106 left a comment

Since this is a [WIP] PR and part of a multi-PR refactor series (3/5), I'll hold off on a full blocker scan. Ready for full review when the WIP label is removed and test results are added to the PR description.

Layered fix for cosyvoice3 sync voice cloning: producer emits real
codec via the worker connector only when the talker reaches a stop
token; consumer overlays it into the placeholder input ids and trims
the prompt-conditioning mel only when that connector path is active.
Max-token fallbacks keep the legacy `additional_information` path
intact so non-cosyvoice3 archs and other terminal conditions are
unaffected.

Producer (model side, `models/cosyvoice3/cosyvoice3.py`)
-------------------------------------------------------
- New `build_pooler_payload(req_id, req_index, input_batch,
  sampled_token_ids, invalid_req_indices)` hook discovered via
  duck-typed `getattr` on the runner.  Returns
  `{"codes.audio": tensor}` only when the per-request codec history
  is non-empty AND the talker has emitted at least one
  stop-class token (id >= speech_token_size).  Mid-step or
  pre-finish polling returns `None` so the connector message is
  shipped exactly once at finish.
- `_pooler_codec_rows` keeps cumulative codec ids in
  `self._pooler_codec_history_by_req` and tracks two per-req
  sentinels:
    - `_pooler_codec_sampled_seen_by_req`: any in-vocab token
      observed for the request.
    - `_pooler_codec_sampled_finished_by_req`: a stop-class id was
      observed.
  Sampled path is preferred; on cold-start it falls back to
  `_pooler_output_history_from_input_batch` (vllm leaves the
  decoded slots at -1 under `prefer_model_sampler=True`, so this
  fallback is normally inert here but kept for resume paths).

SIP (`stage_input_processors/cosyvoice3.py`)
--------------------------------------------
- `text2flow_full_payload` reads `pooling_output["codes.audio"]`
  (flat dotted + nested fallback), tensor-wraps into
  `codes.audio`, and sets `meta.next_stage_prompt_len = len(token_ids)`
  for the overlay length contract.
- `_FULL_PAYLOAD_REPLACE_KEYS` adds `codes.audio` (per-step
  payload carries cumulative codec, not delta).
- `text2flow_token_only` keeps the legacy `additional_information`
  packing (multimodal_output + `ids.prompt`) so the orchestrator
  still has a usable fallback when `codes.audio` is not shipped.

Runner dispatch (`worker/gpu_ar_model_runner.py`)
-------------------------------------------------
- `_attach_model_pooler_payload` invokes the model hook and merges
  returned keys via `_pooler_payload_has_key` (handles both flat
  dotted and nested layouts).  Non-cosyvoice3 archs fall through
  unchanged.
- `_output_token_ids_for_model_sampler` now trims at the first -1
  per request so the model sampler never sees placeholder slots.

Consumer overlay (`worker/gpu_generation_model_runner.py`)
----------------------------------------------------------
- `_overlay_full_payload_input_ids` runs before each generation
  step (non-async-chunk path).  For each scheduled request, looks
  up the connector payload via `model_intermediate_buffer`, reads
  `_payload_audio_codes`, flattens to a 1-D tensor, and copies it
  into the placeholder slots in `input_ids`.  Length mismatch is a
  loud RuntimeError (drift catches misaligned producer + scheduler
  next_stage_prompt_len contract).

code2wav trim (`models/cosyvoice3/cosyvoice3_code2wav.py`)
---------------------------------------------------------
- Sync `forward()` now accepts `token_offset_tokens: int = 0` and
  threads it through to `_forward_mel`; the model-side caller in
  `cosyvoice3.py` passes `speech_token.shape[1]` only when
  `payload.codes is not None and payload.codes.audio is not None`,
  i.e., the request actually traveled the connector path.  Legacy
  fallback continues to call `forward()` with the default (0) so
  pre-existing en_001 behavior is preserved.

Tests
-----
- `tests/model_executor/models/cosyvoice3/test_cosyvoice3_model_helpers.py`
  extends with hook contract cases (sampled-vs-history priority,
  finish gating, cache reuse).
- `tests/model_executor/models/cosyvoice3/test_cosyvoice3_components.py`
  covers the new sentinel sets.
- `tests/worker/test_omni_gpu_model_runner.py` adds dispatch +
  dotted-key resolution coverage for the new hook.

Verified on H800 dev environment with `--run-level full_model -m
"full_model and tts"`:

  * voice_clone_zh_001 (sync, connector path active):
      payload_keys=["codes", "embed", "meta"] code_len=292
      similarity=0.903 PASS
  * voice_clone_en_001 (sync, legacy fallback when talker hits
    max-tokens without stop):
      payload_keys=["embed", "meta"] code_len=None
      similarity=0.963 PASS

Follow-up (not gating): `_pooler_codec_history_by_req`,
`_pooler_codec_sampled_seen_by_req`, and
`_pooler_codec_sampled_finished_by_req` are not yet pruned in
`cleanup_finished_request`; long-running multi-request servers may
accumulate per-req entries.  CI is unaffected (per-test server).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Signed-off-by: natureofnature <wzliu@connect.hku.hk>

Address issues on 809f6e1 (v5):
- High #1: build_pooler_payload received `out_idx`, but sampler_output/invalid_req_indices are indexed by input_batch row.
- High #2: gpu_generation_model_runner._overlay_full_payload_input_ids was CosyVoice3-specific in the common runner.
- Medium #3: cosyvoice3._pooler_output_history_from_input_batch didn't stop at the -1 placeholder.

Resolution: drop the connector codec path for CosyVoice3 sync and deliver codec via legacy `additional_information`, strip prompt/reference prefix at the SIP layer, and gate code2wav mel-trim on the talker-prefill offset only when a speech-stop token was seen.

Source changes:
- gpu_ar_model_runner.py: remove build_pooler_payload hook + _attach_model_pooler_payload + _pooler_payload_has_key.
- gpu_generation_model_runner.py: remove _flatten_audio_codes_to_tensor + _overlay_full_payload_input_ids + its call site.
- cosyvoice3.py (model): remove build_pooler_payload + _pooler_codec_rows + _pooler_output_history_from_input_batch + _pooler_sampled_token_ids (and three per-req caches: _pooler_codec_history_by_req, _pooler_codec_sampled_seen_by_req, _pooler_codec_sampled_finished_by_req). code2wav.forward token_offset_tokens now reads `meta.talker_prefill_offset` (already a struct field used by qwen3_tts).
- SIP cosyvoice3.py: text2flow + text2flow_token_only strip the prompt token prefix and the prompt speech_token prefix from cumulative_token_ids; set meta.talker_prefill_offset only when raw output contains a speech-stop token. text2flow_full_payload no longer ships codes.audio (embed/meta only). Drop `codes.audio` from _FULL_PAYLOAD_REPLACE_KEYS.
- _to_token_id_list no longer filters negative ids (needed for stop-token detection on raw cumulative ids).

Side effects:
- v5's cosyvoice3 per-req cache leak is gone (no pooler hook → no accumulator).
- The pre-existing baseline `voice_clone_zh_001[cosyvoice3]` sim=0.00 (transcript "先") failure is fixed.

Verification on H800 GPU with `--run-level full_model -m "full_model and tts"`:
- test_voice_clone_zh_001[cosyvoice3]: PASS sim=1.000 (baseline FAIL sim=0.00; v5 PASS sim=0.903)
- test_voice_clone_en_001[cosyvoice3]: PASS sim=0.963 (baseline PASS sim=0.946; v5 PASS sim=0.963)

Trade-off vs project_pr3_scope: CosyVoice3 sync codec stays on legacy additional_information; embed/prompt conditioning still ships via connector. Other PR3-migrated archs are unaffected (none consumed codes.audio via the removed overlay).

Signed-off-by: natureofnature <wzliu@connect.hku.hk>

…ix docstring accuracy

Comment-and-naming cleanup across PR3-touched files.

Signed-off-by: natureofnature <wzliu@connect.hku.hk>
Signed-off-by: natureofnature <wzliu@connect.hku.hk>
@natureofnature natureofnature marked this pull request as ready for review May 21, 2026 09:48
@chatgpt-codex-connector

Codex usage limits have been reached for code reviews. Please check with the admins of this repo to increase the limits by adding credits.
Credits must be used to enable repository wide code reviews.

@natureofnature

@hsliuustc0106 PTAL

Comment thread vllm_omni/core/sched/output.py Outdated


@dataclass
class OmniInputRegistration:

this name is not accurate

@hsliuustc0106 hsliuustc0106 added the merge-test label (label to trigger buildkite merge test CI) May 21, 2026
Signed-off-by: natureofnature <wzliu@connect.hku.hk>
        # path.
        try:
            self.flush_full_payload_outputs({req_id})
        except Exception:

This bare except Exception: pass swallows all connector errors. cleanup_finished_request already checks _omni_connector_initialized before reaching this point, so the comment's reasoning ("connector may not be initialised for archs outside the connector init allowlist") doesn't apply here — those archs won't have _omni_connector_initialized == True. If flush_full_payload_outputs fails due to a real connector error (shared memory corruption, background thread crash), this silently discards it. At minimum, log a warning so connector failures don't go completely dark.
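
The reviewer's suggestion could look roughly like the sketch below: the flush stays best-effort, but a failure is logged with its traceback instead of being dropped. `flush_on_cleanup` is a hypothetical free function taking the flush callable as an argument; in the PR the call lives inside `cleanup_finished_request`.

```python
import logging
from typing import Callable, Set

logger = logging.getLogger("omni_connector_sketch")


def flush_on_cleanup(flush: Callable[[Set[str]], None], req_id: str) -> bool:
    """Flush pending payloads for req_id; log instead of silently swallowing."""
    try:
        flush({req_id})
        return True
    except Exception:
        # exc_info=True keeps the traceback so connector failures stay visible.
        logger.warning(
            "flush_full_payload_outputs failed for req %s", req_id, exc_info=True
        )
        return False
```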

``prompt_token_ids`` / ``_output_token_ids`` / ``num_computed_tokens``
and would clobber any progress between the two calls.
"""
    del model_mode  # only used by the (removed) double-apply branch

The model_mode parameter is accepted and immediately deleted. The comment references "(removed) double-apply branch" — if that branch is gone, the parameter should be removed from the signature too. Keeping it and del-ing it is confusing for future readers.

    """
    del transfer_manager
    if not isinstance(pooling_output, dict):
        return None

This silently returns None when pooling_output is not a dict or hidden is missing. Same pattern in qwen3_tts.talker2code2wav_full_payload, cosyvoice3.text2flow_full_payload, mimo_audio.llm2code2wav_full_payload, dynin_omni._build_full_payload, and covo_audio.llm2code2wav_full_payload. If the _FULL_PAYLOAD_INPUT_STAGES whitelist fires the consumer-wait gate but the producer sends nothing (config mismatch, upstream bug), the consumer hangs indefinitely with no diagnostic. A logger.warning before the return None would make this debuggable.
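
A sketch of the warning-before-return pattern the reviewer asks for is shown below. `build_full_payload_sketch` is a hypothetical builder, and the returned `hidden_states.output` layout is borrowed from the payload table in the PR description rather than from the actual builder code.

```python
import logging
from typing import Any, Optional

logger = logging.getLogger("sip_sketch")


def build_full_payload_sketch(pooling_output: Any, req_id: str) -> Optional[dict]:
    """Return a payload dict, or None with a diagnostic when input is unusable."""
    if not isinstance(pooling_output, dict):
        logger.warning(
            "req %s: pooling_output is %s, not dict; no payload will be sent",
            req_id, type(pooling_output).__name__,
        )
        return None
    hidden = pooling_output.get("hidden")
    if hidden is None:
        logger.warning(
            "req %s: pooling_output has no 'hidden' entry; no payload will be sent",
            req_id,
        )
        return None
    return {"hidden_states": {"output": hidden}}
```

With the warning in place, a consumer parked in WAITING_FOR_INPUT can at least be correlated with a producer-side log line naming the request.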

Signed-off-by: natureofnature <wzliu@connect.hku.hk>
@Gaohan123 Gaohan123 added this to the v0.22.0 milestone May 22, 2026
@hsliuustc0106 hsliuustc0106 added the ready (label to trigger buildkite CI) and nightly-test (label to trigger buildkite nightly test CI) labels May 23, 2026
@npuichigo

Any updates here

@natureofnature

> Any updates here

The servers I'm using have been down. Need more time than expected to do the verification.

Labels

merge-test (label to trigger buildkite merge test CI), nightly-test (label to trigger buildkite nightly test CI), ready (label to trigger buildkite CI)


4 participants