From cf7c02a5162ecb2c5a07b64bdbf2a14757def7db Mon Sep 17 00:00:00 2001 From: pablo Date: Sun, 1 Mar 2026 19:17:27 +0000 Subject: [PATCH 01/13] reduce TTFA by lower initial codec frames required at start of decoding Signed-off-by: pablo --- .../stage_configs/qwen3_tts.yaml | 4 ++ .../stage_configs/qwen3_tts_batch.yaml | 4 ++ .../stage_input_processors/qwen3_tts.py | 47 +++++++++++++++---- .../npu/stage_configs/qwen3_tts.yaml | 4 ++ 4 files changed, 51 insertions(+), 8 deletions(-) diff --git a/vllm_omni/model_executor/stage_configs/qwen3_tts.yaml b/vllm_omni/model_executor/stage_configs/qwen3_tts.yaml index 90053852acc..27250ef2bf7 100644 --- a/vllm_omni/model_executor/stage_configs/qwen3_tts.yaml +++ b/vllm_omni/model_executor/stage_configs/qwen3_tts.yaml @@ -96,6 +96,10 @@ runtime: # Align with Omni: small chunks with sufficient context overlap. codec_chunk_frames: 25 codec_left_context_frames: 25 + # First chunk size for reduced TTFA (0 = disabled). + # When > 0, emits small chunks every N frames during warmup, + # then switches to codec_chunk_frames cadence. + initial_codec_chunk_frames: 0 edges: - from: 0 diff --git a/vllm_omni/model_executor/stage_configs/qwen3_tts_batch.yaml b/vllm_omni/model_executor/stage_configs/qwen3_tts_batch.yaml index d737391095b..2fc4b55078d 100644 --- a/vllm_omni/model_executor/stage_configs/qwen3_tts_batch.yaml +++ b/vllm_omni/model_executor/stage_configs/qwen3_tts_batch.yaml @@ -98,6 +98,10 @@ runtime: # Align with Omni: small chunks with sufficient context overlap. codec_chunk_frames: 25 codec_left_context_frames: 25 + # First chunk size for reduced TTFA (0 = disabled). + # When > 0, emits small chunks every N frames during warmup, + # then switches to codec_chunk_frames cadence. + initial_codec_chunk_frames: 0 edges: - from: 0 diff --git a/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py b/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py index 72e17bf4f3e..0c921e26a94 100644 --- a/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py +++ b/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py @@ -3,6 +3,9 @@ from typing import Any import torch +from vllm.logger import init_logger + +logger = init_logger(__name__) def _extract_last_frame(pooling_output: dict[str, Any]) -> torch.Tensor | None: @@ -42,11 +45,20 @@ def talker2code2wav_async_chunk( cfg = raw_cfg.get("extra", raw_cfg) if isinstance(raw_cfg, dict) else {} chunk_size = int(cfg.get("codec_chunk_frames", 25)) left_context_size = int(cfg.get("codec_left_context_frames", 25)) + initial_chunk_size = int(cfg.get("initial_codec_chunk_frames", 0)) if chunk_size <= 0 or left_context_size < 0: raise ValueError( f"Invalid codec chunk config: codec_chunk_frames={chunk_size}, " f"codec_left_context_frames={left_context_size}" ) + if initial_chunk_size >= chunk_size: + if initial_chunk_size > 0: + logger.warning( + "initial_codec_chunk_frames=%d >= codec_chunk_frames=%d, clamping to codec_chunk_frames.", + initial_chunk_size, + chunk_size, + ) + initial_chunk_size = chunk_size length = len(transfer_manager.code_prompt_token_ids[request_id]) # Avoid emitting empty chunks during normal streaming. If the request is @@ -59,15 +71,34 @@ def talker2code2wav_async_chunk( } return None - chunk_length = length % chunk_size - - if chunk_length != 0 and not finished: - return None + in_warmup = initial_chunk_size > 0 and length <= chunk_size - context_length = chunk_length if chunk_length != 0 else chunk_size - end_index = min(length, left_context_size + context_length) - ctx_frames = max(0, int(end_index - context_length)) - window_frames = transfer_manager.code_prompt_token_ids[request_id][-end_index:] + if in_warmup: + # Warmup phase: emit every initial_chunk_size frames with full context. + # Track frames already delivered using put_req_chunk counter. + already_sent = transfer_manager.put_req_chunk[request_id] * initial_chunk_size + pending = length - already_sent + at_initial_boundary = pending >= initial_chunk_size + at_chunk_boundary = length >= chunk_size + if not at_initial_boundary and not at_chunk_boundary and not finished: + return None + # At chunk_size boundary, flush remaining even if < initial_chunk_size. + context_length = min(pending, initial_chunk_size) + if at_chunk_boundary and not at_initial_boundary: + context_length = pending + end_index = length + ctx_frames = max(0, length - context_length) + window_frames = transfer_manager.code_prompt_token_ids[request_id][:length] + else: + # Normal phase: standard chunk_size cadence with left_context sliding window. + adjusted = (length - chunk_size) if initial_chunk_size > 0 else length + chunk_length = adjusted % chunk_size + if chunk_length != 0 and not finished: + return None + context_length = chunk_length if chunk_length != 0 else chunk_size + end_index = min(length, left_context_size + context_length) + ctx_frames = max(0, int(end_index - context_length)) + window_frames = transfer_manager.code_prompt_token_ids[request_id][-end_index:] # Pack context + chunk into codebook-major flat codes for adapter. code_predictor_codes = torch.tensor(window_frames).transpose(0, 1).reshape(-1).tolist() diff --git a/vllm_omni/platforms/npu/stage_configs/qwen3_tts.yaml b/vllm_omni/platforms/npu/stage_configs/qwen3_tts.yaml index 60659a9768b..2518cdfec91 100644 --- a/vllm_omni/platforms/npu/stage_configs/qwen3_tts.yaml +++ b/vllm_omni/platforms/npu/stage_configs/qwen3_tts.yaml @@ -96,6 +96,10 @@ runtime: # Align with Omni: small chunks with sufficient context overlap. codec_chunk_frames: 25 codec_left_context_frames: 25 + # First chunk size for reduced TTFA (0 = disabled). + # When > 0, emits small chunks every N frames during warmup, + # then switches to codec_chunk_frames cadence. + initial_codec_chunk_frames: 0 edges: - from: 0 From 88ed0211a952a29ff19b0102d2ea0d4e36ee1e4e Mon Sep 17 00:00:00 2001 From: pablo Date: Sun, 1 Mar 2026 19:30:39 +0000 Subject: [PATCH 02/13] update docs Signed-off-by: pablo --- docs/design/feature/async_chunk_design.md | 2 +- docs/user_guide/examples/offline_inference/qwen3_tts.md | 3 ++- docs/user_guide/examples/online_serving/qwen3_tts.md | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/design/feature/async_chunk_design.md b/docs/design/feature/async_chunk_design.md index b6a673a9068..79af439614d 100644 --- a/docs/design/feature/async_chunk_design.md +++ b/docs/design/feature/async_chunk_design.md @@ -19,7 +19,7 @@ The `async_chunk` feature enables asynchronous, chunked processing of data acros For qwen3-omni: - **Thinker → Talker**: Per decode step (typically chunk_size=1) -- **Talker → Code2Wav**: Accumulated to code2wav chunk_size(default=25, currently only support default, will support chunk_size soon) before sending +- **Talker → Code2Wav**: Accumulated to `codec_chunk_frames` (default=25) before sending. Set `initial_codec_chunk_frames` to emit smaller chunks during warmup for reduced TTFA - **Code2Wav**: Streaming decode with code2wav chunk_size With `async_chunk`: diff --git a/docs/user_guide/examples/offline_inference/qwen3_tts.md b/docs/user_guide/examples/offline_inference/qwen3_tts.md index a5acf32d67a..5c3b044d789 100644 --- a/docs/user_guide/examples/offline_inference/qwen3_tts.md +++ b/docs/user_guide/examples/offline_inference/qwen3_tts.md @@ -98,7 +98,8 @@ Add `--streaming` to stream audio chunks progressively via `AsyncOmni` (requires python end2end.py --query-type CustomVoice --streaming --output-dir /tmp/out_stream ``` -Each 25-frame Code2Wav chunk is logged as it arrives. The final WAV file is written once generation +Each Code2Wav chunk is logged as it arrives (default 25 frames; configurable via `codec_chunk_frames` +and `initial_codec_chunk_frames` in the stage config). The final WAV file is written once generation completes. This demonstrates that audio data is available progressively rather than only at the end. > **Note:** Streaming uses `AsyncOmni` internally. The non-streaming path (`Omni`) is unchanged. diff --git a/docs/user_guide/examples/online_serving/qwen3_tts.md b/docs/user_guide/examples/online_serving/qwen3_tts.md index 82f187b25e0..77a3947b56e 100644 --- a/docs/user_guide/examples/online_serving/qwen3_tts.md +++ b/docs/user_guide/examples/online_serving/qwen3_tts.md @@ -268,7 +268,7 @@ Returns binary audio data with appropriate `Content-Type` header (e.g., `audio/w ## Streaming Set `stream=true` with `response_format="pcm"` to receive raw PCM audio chunks as they are decoded -(one chunk per 25-frame Code2Wav window): +(one chunk per Code2Wav window, default 25 frames; configurable in the stage config): ```bash curl -X POST http://localhost:8091/v1/audio/speech \ From c2f45509db7fca39251c689cd497ebbeba77f511 Mon Sep 17 00:00:00 2001 From: pablo Date: Sun, 1 Mar 2026 19:31:03 +0000 Subject: [PATCH 03/13] update examples Signed-off-by: pablo --- examples/offline_inference/qwen3_tts/README.md | 3 ++- examples/online_serving/qwen3_tts/README.md | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/offline_inference/qwen3_tts/README.md b/examples/offline_inference/qwen3_tts/README.md index f4ce92f4dad..c198f2c6eb2 100644 --- a/examples/offline_inference/qwen3_tts/README.md +++ b/examples/offline_inference/qwen3_tts/README.md @@ -95,7 +95,8 @@ Add `--streaming` to stream audio chunks progressively via `AsyncOmni` (requires python end2end.py --query-type CustomVoice --streaming --output-dir /tmp/out_stream ``` -Each 25-frame Code2Wav chunk is logged as it arrives. The final WAV file is written once generation +Each Code2Wav chunk is logged as it arrives (default 25 frames; configurable via `codec_chunk_frames` +and `initial_codec_chunk_frames` in the stage config). The final WAV file is written once generation completes. This demonstrates that audio data is available progressively rather than only at the end. > **Note:** Streaming uses `AsyncOmni` internally. The non-streaming path (`Omni`) is unchanged. diff --git a/examples/online_serving/qwen3_tts/README.md b/examples/online_serving/qwen3_tts/README.md index d50baa28972..aaf8e63c900 100644 --- a/examples/online_serving/qwen3_tts/README.md +++ b/examples/online_serving/qwen3_tts/README.md @@ -265,7 +265,7 @@ Returns binary audio data with appropriate `Content-Type` header (e.g., `audio/w ## Streaming Set `stream=true` with `response_format="pcm"` to receive raw PCM audio chunks as they are decoded -(one chunk per 25-frame Code2Wav window): +(one chunk per Code2Wav window, default 25 frames; configurable in the stage config): ```bash curl -X POST http://localhost:8091/v1/audio/speech \ From 19f5f8058370a35bc46014fff7b18f477127cd16 Mon Sep 17 00:00:00 2001 From: pablo Date: Mon, 2 Mar 2026 08:16:32 +0000 Subject: [PATCH 04/13] add time to e2e script to compute TTFC Signed-off-by: pablo --- examples/offline_inference/qwen3_tts/end2end.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/examples/offline_inference/qwen3_tts/end2end.py b/examples/offline_inference/qwen3_tts/end2end.py index 2839a69586f..98303533a3c 100644 --- a/examples/offline_inference/qwen3_tts/end2end.py +++ b/examples/offline_inference/qwen3_tts/end2end.py @@ -7,6 +7,7 @@ import asyncio import logging import os +import time from typing import Any, NamedTuple import soundfile as sf @@ -337,13 +338,27 @@ async def main_streaming(args): for i, prompt in enumerate(inputs): request_id = str(i) + t_start = time.perf_counter() + t_prev = t_start + chunk_idx = 0 async for stage_output in omni.generate(prompt, request_id=request_id): mm = stage_output.request_output.outputs[0].multimodal_output if not stage_output.finished: + t_now = time.perf_counter() audio = mm.get("audio") n = len(audio) if isinstance(audio, list) else (0 if audio is None else 1) - logger.info(f"Request {request_id}: received chunk {n}") + dt_ms = (t_now - t_prev) * 1000 + ttfa_ms = (t_now - t_start) * 1000 + if chunk_idx == 0: + logger.info(f"Request {request_id}: chunk {chunk_idx} samples={n} TTFA={ttfa_ms:.1f}ms") + else: + logger.info(f"Request {request_id}: chunk {chunk_idx} samples={n} inter_chunk={dt_ms:.1f}ms") + t_prev = t_now + chunk_idx += 1 else: + t_end = time.perf_counter() + total_ms = (t_end - t_start) * 1000 + logger.info(f"Request {request_id}: done total={total_ms:.1f}ms chunks={chunk_idx}") _save_wav(output_dir, request_id, mm) From 29e3416ff6f935da9ab1d20a559cc5f108d7a987 Mon Sep 17 00:00:00 2001 From: pablo Date: Mon, 2 Mar 2026 09:45:26 +0000 Subject: [PATCH 05/13] add a simple test for streaming decoding with variable initial chunk size Signed-off-by: pablo --- .../test_qwen3_tts_async_chunk.py | 104 +++++++++++++----- 1 file changed, 78 insertions(+), 26 deletions(-) diff --git a/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py b/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py index e29b6af82a7..db3afa73ce3 100644 --- a/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py +++ b/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py @@ -4,27 +4,53 @@ from collections import defaultdict from types import SimpleNamespace +import pytest import torch from vllm_omni.model_executor.stage_input_processors.qwen3_tts import talker2code2wav_async_chunk +_FRAME = [1, 2, 3, 4] # 4-codebook frame +_Q = len(_FRAME) # num quantizers -def _req(external_req_id: str, *, finished: bool): + +def _req(rid: str, *, finished: bool): + return SimpleNamespace(external_req_id=rid, is_finished=lambda: finished) + + +def _tm(*, chunk_frames=25, left_context=25, initial_chunk=0): return SimpleNamespace( - external_req_id=external_req_id, - is_finished=lambda: finished, + code_prompt_token_ids=defaultdict(list), + put_req_chunk=defaultdict(int), + connector=SimpleNamespace( + config={ + "extra": { + "codec_chunk_frames": chunk_frames, + "codec_left_context_frames": left_context, + "initial_codec_chunk_frames": initial_chunk, + } + } + ), ) -def test_talker2code2wav_async_chunk_does_not_emit_empty_chunk_when_not_finished(): - transfer_manager = SimpleNamespace( - code_prompt_token_ids=defaultdict(list), - connector=SimpleNamespace(config={"extra": {"codec_chunk_frames": 25, "codec_left_context_frames": 25}}), +def _call(tm, rid, *, n_frames, put_req=0, finished=False): + """Feed n_frames into transfer_manager and call the gate function.""" + tm.code_prompt_token_ids[rid] = [_FRAME[:] for _ in range(n_frames)] + tm.put_req_chunk[rid] = put_req + return talker2code2wav_async_chunk( + transfer_manager=tm, + pooling_output={"audio_codes": torch.zeros((0,))}, + request=_req(rid, finished=finished), + is_finished=finished, ) + + +def test_does_not_emit_empty_chunk_when_not_finished(): + tm = _tm() request = _req("rid-empty", finished=False) payload = talker2code2wav_async_chunk( - transfer_manager=transfer_manager, + transfer_manager=tm, pooling_output={"audio_codes": torch.zeros((0,))}, request=request, ) @@ -32,36 +58,29 @@ def test_talker2code2wav_async_chunk_does_not_emit_empty_chunk_when_not_finished assert payload is None -def test_talker2code2wav_async_chunk_flushes_tail_when_finished_without_pooler_output(): - transfer_manager = SimpleNamespace( - code_prompt_token_ids=defaultdict(list), - connector=SimpleNamespace(config={"extra": {"codec_chunk_frames": 25, "codec_left_context_frames": 25}}), - ) - request_id = "rid-tail" - transfer_manager.code_prompt_token_ids[request_id] = [[1, 2, 3, 4] for _ in range(24)] - request = _req(request_id, finished=True) +def test_flushes_tail_when_finished_without_pooler_output(): + tm = _tm() + rid = "rid-tail" + tm.code_prompt_token_ids[rid] = [_FRAME[:] for _ in range(24)] + request = _req(rid, finished=True) payload = talker2code2wav_async_chunk( - transfer_manager=transfer_manager, - pooling_output=None, # e.g. EOS step with no audio_codes + transfer_manager=tm, + pooling_output=None, request=request, ) assert payload is not None assert payload["finished"].item() is True - # ctx_frames header + flat codes - assert len(payload["code_predictor_codes"]) == 1 + 4 * 24 + assert len(payload["code_predictor_codes"]) == 1 + _Q * 24 -def test_talker2code2wav_async_chunk_emits_eof_marker_when_finished_with_no_frames(): - transfer_manager = SimpleNamespace( - code_prompt_token_ids=defaultdict(list), - connector=SimpleNamespace(config={"extra": {"codec_chunk_frames": 25, "codec_left_context_frames": 25}}), - ) +def test_emits_eof_marker_when_finished_with_no_frames(): + tm = _tm() request = _req("rid-eof", finished=True) payload = talker2code2wav_async_chunk( - transfer_manager=transfer_manager, + transfer_manager=tm, pooling_output=None, request=request, ) @@ -70,3 +89,36 @@ def test_talker2code2wav_async_chunk_emits_eof_marker_when_finished_with_no_fram "code_predictor_codes": [], "finished": torch.tensor(True, dtype=torch.bool), } + + +_CASES = [ + # Normal path (initial=0): emit at chunk_size boundaries + ((25, 25, 0), (24, 0, False), None), + ((25, 25, 0), (25, 0, False), (0, 25)), + # Warmup: first emit, hold, second emit, non-divisible boundary + ((25, 25, 10), (9, 0, False), None), + ((25, 25, 10), (10, 0, False), (0, 10)), + ((25, 25, 10), (20, 1, False), (10, 20)), + ((25, 25, 10), (25, 2, False), (20, 25)), + # Normal phase after warmup + ((25, 25, 10), (50, 3, False), (25, 50)), + # initial >= chunk clamps to chunk_size (behaves as normal) + ((25, 25, 30), (25, 0, False), (0, 25)), +] + + +@pytest.mark.parametrize("config, state, expected", _CASES) +def test_streaming_decoding_with_variable_initial(config, state, expected): + chunk_frames, left_context, initial_chunk = config + n_frames, put_req, finished = state + + tm = _tm(chunk_frames=chunk_frames, left_context=left_context, initial_chunk=initial_chunk) + payload = _call(tm, "r", n_frames=n_frames, put_req=put_req, finished=finished) + + if expected is None: + assert payload is None + else: + exp_ctx, exp_window = expected + assert payload is not None + assert payload["code_predictor_codes"][0] == exp_ctx + assert len(payload["code_predictor_codes"]) == 1 + _Q * exp_window From ff6d6c4124324add5fa63c8ab608f8f347b5227e Mon Sep 17 00:00:00 2001 From: pablo Date: Mon, 2 Mar 2026 12:37:07 +0000 Subject: [PATCH 06/13] last warmup chunk must overlap with the normal path Signed-off-by: pablo --- .../test_qwen3_tts_async_chunk.py | 9 +++++---- .../stage_input_processors/qwen3_tts.py | 11 ++++------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py b/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py index db3afa73ce3..84766b3761a 100644 --- a/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py +++ b/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py @@ -95,13 +95,14 @@ def test_emits_eof_marker_when_finished_with_no_frames(): # Normal path (initial=0): emit at chunk_size boundaries ((25, 25, 0), (24, 0, False), None), ((25, 25, 0), (25, 0, False), (0, 25)), - # Warmup: first emit, hold, second emit, non-divisible boundary + # Warmup: hold, first emit, second emit ((25, 25, 10), (9, 0, False), None), ((25, 25, 10), (10, 0, False), (0, 10)), ((25, 25, 10), (20, 1, False), (10, 20)), - ((25, 25, 10), (25, 2, False), (20, 25)), - # Normal phase after warmup - ((25, 25, 10), (50, 3, False), (25, 50)), + # Non-divisible: holds at chunk boundary + ((25, 25, 12), (25, 2, False), None), + # Normal phase: offset by warmup coverage (put_req * initial) + ((25, 25, 10), (45, 2, False), (20, 45)), # initial >= chunk clamps to chunk_size (behaves as normal) ((25, 25, 30), (25, 0, False), (0, 25)), ] diff --git a/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py b/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py index 0c921e26a94..c085511d930 100644 --- a/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py +++ b/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py @@ -78,20 +78,17 @@ def talker2code2wav_async_chunk( # Track frames already delivered using put_req_chunk counter. already_sent = transfer_manager.put_req_chunk[request_id] * initial_chunk_size pending = length - already_sent - at_initial_boundary = pending >= initial_chunk_size - at_chunk_boundary = length >= chunk_size - if not at_initial_boundary and not at_chunk_boundary and not finished: + if pending < initial_chunk_size and not finished: return None - # At chunk_size boundary, flush remaining even if < initial_chunk_size. context_length = min(pending, initial_chunk_size) - if at_chunk_boundary and not at_initial_boundary: - context_length = pending end_index = length ctx_frames = max(0, length - context_length) window_frames = transfer_manager.code_prompt_token_ids[request_id][:length] else: # Normal phase: standard chunk_size cadence with left_context sliding window. - adjusted = (length - chunk_size) if initial_chunk_size > 0 else length + # Offset by warmup coverage so normal starts from where warmup left off. + warmup_sent = transfer_manager.put_req_chunk[request_id] * initial_chunk_size if initial_chunk_size > 0 else 0 + adjusted = length - warmup_sent chunk_length = adjusted % chunk_size if chunk_length != 0 and not finished: return None From d54d2bf3fb2a36f1c8c6a22eee6335ce21dc20f5 Mon Sep 17 00:00:00 2001 From: pablo Date: Mon, 2 Mar 2026 12:54:02 +0000 Subject: [PATCH 07/13] fix Signed-off-by: pablo --- .../test_qwen3_tts_async_chunk.py | 10 ++++++- .../stage_input_processors/qwen3_tts.py | 27 ++++++++++--------- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py b/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py index 84766b3761a..7b7944ef35b 100644 --- a/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py +++ b/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py @@ -101,10 +101,18 @@ def test_emits_eof_marker_when_finished_with_no_frames(): ((25, 25, 10), (20, 1, False), (10, 20)), # Non-divisible: holds at chunk boundary ((25, 25, 12), (25, 2, False), None), - # Normal phase: offset by warmup coverage (put_req * initial) + # Normal phase: offset by warmup coverage (chunk//initial * initial) ((25, 25, 10), (45, 2, False), (20, 45)), + # Second normal emit (put_req includes normal emissions, offset must stay stable) + ((25, 25, 10), (70, 3, False), (25, 50)), # initial >= chunk clamps to chunk_size (behaves as normal) ((25, 25, 30), (25, 0, False), (0, 25)), + # finished=True flushes warmup tail + ((25, 25, 10), (5, 0, True), (0, 5)), + # finished=True flushes non-divisible warmup residual + ((25, 25, 12), (25, 2, True), (24, 25)), + # finished=True flushes normal phase tail + ((25, 25, 10), (30, 2, True), (20, 30)), ] diff --git a/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py b/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py index c085511d930..76ee9beb2fe 100644 --- a/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py +++ b/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py @@ -46,18 +46,18 @@ def talker2code2wav_async_chunk( chunk_size = int(cfg.get("codec_chunk_frames", 25)) left_context_size = int(cfg.get("codec_left_context_frames", 25)) initial_chunk_size = int(cfg.get("initial_codec_chunk_frames", 0)) - if chunk_size <= 0 or left_context_size < 0: + if chunk_size <= 0 or left_context_size < 0 or initial_chunk_size < 0: raise ValueError( f"Invalid codec chunk config: codec_chunk_frames={chunk_size}, " - f"codec_left_context_frames={left_context_size}" + f"codec_left_context_frames={left_context_size}, " + f"initial_codec_chunk_frames={initial_chunk_size}" + ) + if initial_chunk_size > chunk_size: + logger.warning( + "initial_codec_chunk_frames=%d > codec_chunk_frames=%d, clamping to codec_chunk_frames.", + initial_chunk_size, + chunk_size, ) - if initial_chunk_size >= chunk_size: - if initial_chunk_size > 0: - logger.warning( - "initial_codec_chunk_frames=%d >= codec_chunk_frames=%d, clamping to codec_chunk_frames.", - initial_chunk_size, - chunk_size, - ) initial_chunk_size = chunk_size length = len(transfer_manager.code_prompt_token_ids[request_id]) @@ -78,6 +78,8 @@ def talker2code2wav_async_chunk( # Track frames already delivered using put_req_chunk counter. already_sent = transfer_manager.put_req_chunk[request_id] * initial_chunk_size pending = length - already_sent + if pending <= 0: + return None if pending < initial_chunk_size and not finished: return None context_length = min(pending, initial_chunk_size) @@ -86,9 +88,10 @@ def talker2code2wav_async_chunk( window_frames = transfer_manager.code_prompt_token_ids[request_id][:length] else: # Normal phase: standard chunk_size cadence with left_context sliding window. - # Offset by warmup coverage so normal starts from where warmup left off. - warmup_sent = transfer_manager.put_req_chunk[request_id] * initial_chunk_size if initial_chunk_size > 0 else 0 - adjusted = length - warmup_sent + # Offset by warmup coverage (static from config) so normal starts + # from where warmup left off. + warmup_coverage = (chunk_size // initial_chunk_size) * initial_chunk_size if initial_chunk_size > 0 else 0 + adjusted = length - warmup_coverage chunk_length = adjusted % chunk_size if chunk_length != 0 and not finished: return None From 912e536d486456db8a0fc1daeda3b3d60f6bc659 Mon Sep 17 00:00:00 2001 From: pablo Date: Mon, 2 Mar 2026 22:18:58 +0000 Subject: [PATCH 08/13] from warmup to initial phase Signed-off-by: pablo --- .../stage_input_processors/qwen3_tts.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py b/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py index 76ee9beb2fe..70c65eff556 100644 --- a/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py +++ b/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py @@ -71,10 +71,10 @@ def talker2code2wav_async_chunk( } return None - in_warmup = initial_chunk_size > 0 and length <= chunk_size + in_initial_phase = initial_chunk_size > 0 and length <= chunk_size - if in_warmup: - # Warmup phase: emit every initial_chunk_size frames with full context. + if in_initial_phase: + # Initial-chunk phase: emit every initial_chunk_size frames with full context. # Track frames already delivered using put_req_chunk counter. already_sent = transfer_manager.put_req_chunk[request_id] * initial_chunk_size pending = length - already_sent @@ -88,10 +88,10 @@ def talker2code2wav_async_chunk( window_frames = transfer_manager.code_prompt_token_ids[request_id][:length] else: # Normal phase: standard chunk_size cadence with left_context sliding window. - # Offset by warmup coverage (static from config) so normal starts - # from where warmup left off. - warmup_coverage = (chunk_size // initial_chunk_size) * initial_chunk_size if initial_chunk_size > 0 else 0 - adjusted = length - warmup_coverage + # Offset by initial-chunk coverage (static from config) so normal starts + # from where the initial-chunk phase left off. + initial_coverage = (chunk_size // initial_chunk_size) * initial_chunk_size if initial_chunk_size > 0 else 0 + adjusted = length - initial_coverage chunk_length = adjusted % chunk_size if chunk_length != 0 and not finished: return None From b78a1467cd05fa46b93aafee5799e51de89c9d86 Mon Sep 17 00:00:00 2001 From: pablo Date: Tue, 3 Mar 2026 10:57:59 +0000 Subject: [PATCH 09/13] remove warmup in docs Signed-off-by: pablo --- docs/design/feature/async_chunk_design.md | 2 +- .../test_qwen3_tts_async_chunk.py | 10 +++++----- vllm_omni/model_executor/stage_configs/qwen3_tts.yaml | 2 +- .../model_executor/stage_configs/qwen3_tts_batch.yaml | 2 +- .../model_executor/stage_input_processors/qwen3_tts.py | 6 ++---- 5 files changed, 10 insertions(+), 12 deletions(-) diff --git a/docs/design/feature/async_chunk_design.md b/docs/design/feature/async_chunk_design.md index 79af439614d..40b8f215a76 100644 --- a/docs/design/feature/async_chunk_design.md +++ b/docs/design/feature/async_chunk_design.md @@ -19,7 +19,7 @@ The `async_chunk` feature enables asynchronous, chunked processing of data acros For qwen3-omni: - **Thinker → Talker**: Per decode step (typically chunk_size=1) -- **Talker → Code2Wav**: Accumulated to `codec_chunk_frames` (default=25) before sending. Set `initial_codec_chunk_frames` to emit smaller chunks during warmup for reduced TTFA +- **Talker → Code2Wav**: Accumulated to `codec_chunk_frames` (default=25) before sending. Set `initial_codec_chunk_frames` to emit smaller chunks during the initial phase for reduced TTFA - **Code2Wav**: Streaming decode with code2wav chunk_size With `async_chunk`: diff --git a/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py b/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py index 7b7944ef35b..e5e85185e57 100644 --- a/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py +++ b/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py @@ -95,21 +95,21 @@ def test_emits_eof_marker_when_finished_with_no_frames(): # Normal path (initial=0): emit at chunk_size boundaries ((25, 25, 0), (24, 0, False), None), ((25, 25, 0), (25, 0, False), (0, 25)), - # Warmup: hold, first emit, second emit + # Initial-chunk phase: hold, first emit, second emit ((25, 25, 10), (9, 0, False), None), ((25, 25, 10), (10, 0, False), (0, 10)), ((25, 25, 10), (20, 1, False), (10, 20)), # Non-divisible: holds at chunk boundary ((25, 25, 12), (25, 2, False), None), - # Normal phase: offset by warmup coverage (chunk//initial * initial) + # Normal phase: offset by initial_coverage (chunk//initial * initial) ((25, 25, 10), (45, 2, False), (20, 45)), - # Second normal emit (put_req includes normal emissions, offset must stay stable) + # Second normal emit (offset must stay stable) ((25, 25, 10), (70, 3, False), (25, 50)), # initial >= chunk clamps to chunk_size (behaves as normal) ((25, 25, 30), (25, 0, False), (0, 25)), - # finished=True flushes warmup tail + # finished=True flushes IC tail ((25, 25, 10), (5, 0, True), (0, 5)), - # finished=True flushes non-divisible warmup residual + # finished=True flushes non-divisible IC residual ((25, 25, 12), (25, 2, True), (24, 25)), # finished=True flushes normal phase tail ((25, 25, 10), (30, 2, True), (20, 30)), diff --git a/vllm_omni/model_executor/stage_configs/qwen3_tts.yaml b/vllm_omni/model_executor/stage_configs/qwen3_tts.yaml index 27250ef2bf7..564f1160c3c 100644 --- a/vllm_omni/model_executor/stage_configs/qwen3_tts.yaml +++ b/vllm_omni/model_executor/stage_configs/qwen3_tts.yaml @@ -97,7 +97,7 @@ runtime: codec_chunk_frames: 25 codec_left_context_frames: 25 # First chunk size for reduced TTFA (0 = disabled). - # When > 0, emits small chunks every N frames during warmup, + # When > 0, emits small chunks every N frames during the initial phase, # then switches to codec_chunk_frames cadence. initial_codec_chunk_frames: 0 diff --git a/vllm_omni/model_executor/stage_configs/qwen3_tts_batch.yaml b/vllm_omni/model_executor/stage_configs/qwen3_tts_batch.yaml index 2fc4b55078d..43df70fc275 100644 --- a/vllm_omni/model_executor/stage_configs/qwen3_tts_batch.yaml +++ b/vllm_omni/model_executor/stage_configs/qwen3_tts_batch.yaml @@ -99,7 +99,7 @@ runtime: codec_chunk_frames: 25 codec_left_context_frames: 25 # First chunk size for reduced TTFA (0 = disabled). - # When > 0, emits small chunks every N frames during warmup, + # When > 0, emits small chunks every N frames during the initial phase, # then switches to codec_chunk_frames cadence. initial_codec_chunk_frames: 0 diff --git a/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py b/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py index 70c65eff556..a9bd9533c9d 100644 --- a/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py +++ b/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py @@ -74,8 +74,7 @@ def talker2code2wav_async_chunk( in_initial_phase = initial_chunk_size > 0 and length <= chunk_size if in_initial_phase: - # Initial-chunk phase: emit every initial_chunk_size frames with full context. - # Track frames already delivered using put_req_chunk counter. + # Initial-chunk phase: emit every initial_chunk_size frames with full accumulated context. already_sent = transfer_manager.put_req_chunk[request_id] * initial_chunk_size pending = length - already_sent if pending <= 0: @@ -88,8 +87,7 @@ def talker2code2wav_async_chunk( window_frames = transfer_manager.code_prompt_token_ids[request_id][:length] else: # Normal phase: standard chunk_size cadence with left_context sliding window. - # Offset by initial-chunk coverage (static from config) so normal starts - # from where the initial-chunk phase left off. + # Offset by initial_coverage so normal starts from where the initial-chunk phase left off. initial_coverage = (chunk_size // initial_chunk_size) * initial_chunk_size if initial_chunk_size > 0 else 0 adjusted = length - initial_coverage chunk_length = adjusted % chunk_size From f9977b77f7eb8639db32eca40c1b5621d15930d1 Mon Sep 17 00:00:00 2001 From: pablo Date: Wed, 4 Mar 2026 07:59:41 +0000 Subject: [PATCH 10/13] update docs and examples Signed-off-by: pablo --- docs/serving/speech_api.md | 1 + examples/online_serving/qwen3_tts/README.md | 1 + 2 files changed, 2 insertions(+) diff --git a/docs/serving/speech_api.md b/docs/serving/speech_api.md index 86b7c73f8f6..b22aee84cdd 100644 --- a/docs/serving/speech_api.md +++ b/docs/serving/speech_api.md @@ -96,6 +96,7 @@ Content-Type: application/json | `language` | string | "Auto" | Language (see supported languages below) | | `instructions` | string | "" | Voice style/emotion instructions | | `max_new_tokens` | integer | 2048 | Maximum tokens to generate | +| `initial_codec_chunk_frames` | integer | null | Initial chunk size for reduced TTFA (overrides stage config) | **Supported languages:** Auto, Chinese, English, Japanese, Korean, German, French, Russian, Portuguese, Spanish, Italian diff --git a/examples/online_serving/qwen3_tts/README.md b/examples/online_serving/qwen3_tts/README.md index aaf8e63c900..2046c650866 100644 --- a/examples/online_serving/qwen3_tts/README.md +++ b/examples/online_serving/qwen3_tts/README.md @@ -250,6 +250,7 @@ Returns binary audio data with appropriate `Content-Type` header (e.g., `audio/w | `language` | string | "Auto" | Language (see supported languages below) | | `instructions` | string | "" | Voice style/emotion instructions | | `max_new_tokens` | int | 2048 | Maximum tokens to generate | +| `initial_codec_chunk_frames` | int | null | Initial chunk size for reduced TTFA (overrides stage config) | | `stream` | bool | false | Stream raw PCM chunks as they are decoded (requires `response_format="pcm"`) | **Supported languages:** Auto, Chinese, English, Japanese, Korean, German, French, Russian, Portuguese, Spanish, Italian From 439abba67de22a9386b10e77e138a8efa06d4030 Mon Sep 17 00:00:00 2001 From: pablo Date: Wed, 4 Mar 2026 08:02:01 +0000 Subject: [PATCH 11/13] add per-request configurable initial_codec_chunk_frames Signed-off-by: pablo --- vllm_omni/entrypoints/openai/protocol/audio.py | 5 +++++ vllm_omni/entrypoints/openai/serving_speech.py | 3 +++ .../model_executor/stage_input_processors/qwen3_tts.py | 10 ++++++++++ 3 files changed, 18 insertions(+) diff --git a/vllm_omni/entrypoints/openai/protocol/audio.py b/vllm_omni/entrypoints/openai/protocol/audio.py index 8915e544b85..1efab8ebec2 100644 --- a/vllm_omni/entrypoints/openai/protocol/audio.py +++ b/vllm_omni/entrypoints/openai/protocol/audio.py @@ -55,6 +55,11 @@ class OpenAICreateSpeechRequest(BaseModel): default=None, description="Maximum tokens to generate", ) + initial_codec_chunk_frames: int | None = Field( + default=None, + ge=0, + description="Initial chunk size for reduced TTFA. Overrides stage config for this request.", + ) @field_validator("stream_format") @classmethod diff --git a/vllm_omni/entrypoints/openai/serving_speech.py b/vllm_omni/entrypoints/openai/serving_speech.py index 986e06ace95..9ff7bc76737 100644 --- a/vllm_omni/entrypoints/openai/serving_speech.py +++ b/vllm_omni/entrypoints/openai/serving_speech.py @@ -468,6 +468,9 @@ def _build_tts_params(self, request: OpenAICreateSpeechRequest) -> dict[str, Any else: params["max_new_tokens"] = [2048] + if request.initial_codec_chunk_frames is not None: + params["initial_codec_chunk_frames"] = [request.initial_codec_chunk_frames] + # VoiceDesign requires non_streaming_mode (match offline script behaviour). # CustomVoice and Base rely on the model default (True and False respectively). if params["task_type"][0] == "VoiceDesign": diff --git a/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py b/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py index c5c2f640433..c0bcfa612df 100644 --- a/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py +++ b/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py @@ -46,6 +46,16 @@ def talker2code2wav_async_chunk( chunk_size = int(cfg.get("codec_chunk_frames", 25)) left_context_size_config = int(cfg.get("codec_left_context_frames", 25)) initial_chunk_size = int(cfg.get("initial_codec_chunk_frames", 0)) + # Per-request override (takes priority over stage config) + additional_information = getattr(request, "additional_information", None) + if ( + additional_information is not None + and hasattr(additional_information, "entries") + and "initial_codec_chunk_frames" in additional_information.entries + ): + entry = additional_information.entries["initial_codec_chunk_frames"] + if entry.list_data is not None and len(entry.list_data) == 1: + initial_chunk_size = int(entry.list_data[0]) if chunk_size <= 0 or left_context_size_config < 0 or initial_chunk_size < 0: raise ValueError( f"Invalid codec chunk config: codec_chunk_frames={chunk_size}, " From e6798d5f22570b2ad3dd27e801cdea09accba079 Mon Sep 17 00:00:00 2001 From: pablo Date: Wed, 4 Mar 2026 08:02:23 +0000 Subject: [PATCH 12/13] add test for configurable initial_codec_chunk_frames Signed-off-by: pablo --- .../test_qwen3_tts_async_chunk.py | 31 ++++++++++++++++--- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py b/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py index 941c2a3906b..15464032b37 100644 --- a/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py +++ b/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py @@ -13,8 +13,16 @@ _Q = len(_FRAME) # num quantizers -def _req(rid: str, *, finished: bool): - return SimpleNamespace(external_req_id=rid, is_finished=lambda: finished) +def _req(rid: str, *, finished: bool, initial_codec_chunk_frames: int | None = None): + ai = None + if initial_codec_chunk_frames is not None: + entry = SimpleNamespace(list_data=[initial_codec_chunk_frames]) + ai = SimpleNamespace(entries={"initial_codec_chunk_frames": entry}) + return SimpleNamespace( + external_req_id=rid, + is_finished=lambda: finished, + additional_information=ai, + ) def _tm(*, chunk_frames=25, left_context=25, initial_chunk=0): @@ -33,14 +41,13 @@ def _tm(*, chunk_frames=25, left_context=25, initial_chunk=0): ) -def _call(tm, rid, *, n_frames, put_req=0, finished=False): - """Feed n_frames into transfer_manager and call the gate function.""" +def _call(tm, rid, *, n_frames, put_req=0, finished=False, req_ic=None): tm.code_prompt_token_ids[rid] = [_FRAME[:] for _ in range(n_frames)] tm.put_req_chunk[rid] = put_req return talker2code2wav_async_chunk( transfer_manager=tm, pooling_output={"audio_codes": torch.zeros((0,))}, - request=_req(rid, finished=finished), + request=_req(rid, finished=finished, initial_codec_chunk_frames=req_ic), is_finished=finished, ) @@ -131,3 +138,17 @@ def test_streaming_decoding_with_variable_initial(config, state, expected): assert payload is not None assert payload["left_context_size"] == exp_ctx assert len(payload["code_predictor_codes"]) == _Q * exp_window + + +def test_per_request_override_activates_initial_phase(): + tm = _tm(initial_chunk=0) + payload = _call(tm, "r-override", n_frames=10, req_ic=10) + assert payload is not None + assert payload["left_context_size"] == 0 + assert len(payload["code_predictor_codes"]) == _Q * 10 + + +def test_per_request_override_wins_over_stage_config(): + tm = _tm(initial_chunk=5) + payload = _call(tm, "r-override2", n_frames=10, put_req=0, req_ic=15) + assert payload is None From 4b0bb24a6a0b2df6558d76440f5cafea56efdd83 Mon Sep 17 00:00:00 2001 From: pablo Date: Wed, 4 Mar 2026 09:12:58 +0000 Subject: [PATCH 13/13] update comment Signed-off-by: pablo --- vllm_omni/platforms/npu/stage_configs/qwen3_tts.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm_omni/platforms/npu/stage_configs/qwen3_tts.yaml b/vllm_omni/platforms/npu/stage_configs/qwen3_tts.yaml index 2518cdfec91..5ad0cc90e37 100644 --- a/vllm_omni/platforms/npu/stage_configs/qwen3_tts.yaml +++ b/vllm_omni/platforms/npu/stage_configs/qwen3_tts.yaml @@ -97,7 +97,7 @@ runtime: codec_chunk_frames: 25 codec_left_context_frames: 25 # First chunk size for reduced TTFA (0 = disabled). - # When > 0, emits small chunks every N frames during warmup, + # When > 0, emits small chunks every N frames during the initial phase, # then switches to codec_chunk_frames cadence. initial_codec_chunk_frames: 0