diff --git a/docs/design/feature/async_chunk_design.md b/docs/design/feature/async_chunk_design.md index b6a673a9068..40b8f215a76 100644 --- a/docs/design/feature/async_chunk_design.md +++ b/docs/design/feature/async_chunk_design.md @@ -19,7 +19,7 @@ The `async_chunk` feature enables asynchronous, chunked processing of data acros For qwen3-omni: - **Thinker → Talker**: Per decode step (typically chunk_size=1) -- **Talker → Code2Wav**: Accumulated to code2wav chunk_size(default=25, currently only support default, will support chunk_size soon) before sending +- **Talker → Code2Wav**: Accumulated to `codec_chunk_frames` (default=25) before sending. Set `initial_codec_chunk_frames` to emit smaller chunks during the initial phase, reducing time to first audio (TTFA) - **Code2Wav**: Streaming decode with code2wav chunk_size With `async_chunk`: diff --git a/docs/serving/speech_api.md b/docs/serving/speech_api.md index 86b7c73f8f6..b22aee84cdd 100644 --- a/docs/serving/speech_api.md +++ b/docs/serving/speech_api.md @@ -96,6 +96,7 @@ Content-Type: application/json | `language` | string | "Auto" | Language (see supported languages below) | | `instructions` | string | "" | Voice style/emotion instructions | | `max_new_tokens` | integer | 2048 | Maximum tokens to generate | +| `initial_codec_chunk_frames` | integer | null | Initial chunk size in codec frames, used to reduce time to first audio (TTFA); overrides the stage config | **Supported languages:** Auto, Chinese, English, Japanese, Korean, German, French, Russian, Portuguese, Spanish, Italian diff --git a/docs/user_guide/examples/offline_inference/qwen3_tts.md b/docs/user_guide/examples/offline_inference/qwen3_tts.md index a5acf32d67a..5c3b044d789 100644 --- a/docs/user_guide/examples/offline_inference/qwen3_tts.md +++ b/docs/user_guide/examples/offline_inference/qwen3_tts.md @@ -98,7 +98,8 @@ Add `--streaming` to stream audio chunks progressively via `AsyncOmni` (requires python end2end.py --query-type CustomVoice --streaming --output-dir /tmp/out_stream ``` -Each 25-frame Code2Wav 
chunk is logged as it arrives. The final WAV file is written once generation +Each Code2Wav chunk is logged as it arrives (default 25 frames; configurable via `codec_chunk_frames` +and `initial_codec_chunk_frames` in the stage config). The final WAV file is written once generation completes. This demonstrates that audio data is available progressively rather than only at the end. > **Note:** Streaming uses `AsyncOmni` internally. The non-streaming path (`Omni`) is unchanged. diff --git a/docs/user_guide/examples/online_serving/qwen3_tts.md b/docs/user_guide/examples/online_serving/qwen3_tts.md index 82f187b25e0..77a3947b56e 100644 --- a/docs/user_guide/examples/online_serving/qwen3_tts.md +++ b/docs/user_guide/examples/online_serving/qwen3_tts.md @@ -268,7 +268,7 @@ Returns binary audio data with appropriate `Content-Type` header (e.g., `audio/w ## Streaming Set `stream=true` with `response_format="pcm"` to receive raw PCM audio chunks as they are decoded -(one chunk per 25-frame Code2Wav window): +(one chunk per Code2Wav window, default 25 frames; configurable in the stage config): ```bash curl -X POST http://localhost:8091/v1/audio/speech \ diff --git a/examples/offline_inference/qwen3_tts/README.md b/examples/offline_inference/qwen3_tts/README.md index f4ce92f4dad..c198f2c6eb2 100644 --- a/examples/offline_inference/qwen3_tts/README.md +++ b/examples/offline_inference/qwen3_tts/README.md @@ -95,7 +95,8 @@ Add `--streaming` to stream audio chunks progressively via `AsyncOmni` (requires python end2end.py --query-type CustomVoice --streaming --output-dir /tmp/out_stream ``` -Each 25-frame Code2Wav chunk is logged as it arrives. The final WAV file is written once generation +Each Code2Wav chunk is logged as it arrives (default 25 frames; configurable via `codec_chunk_frames` +and `initial_codec_chunk_frames` in the stage config). The final WAV file is written once generation completes. 
This demonstrates that audio data is available progressively rather than only at the end. > **Note:** Streaming uses `AsyncOmni` internally. The non-streaming path (`Omni`) is unchanged. diff --git a/examples/offline_inference/qwen3_tts/end2end.py b/examples/offline_inference/qwen3_tts/end2end.py index 2839a69586f..98303533a3c 100644 --- a/examples/offline_inference/qwen3_tts/end2end.py +++ b/examples/offline_inference/qwen3_tts/end2end.py @@ -7,6 +7,7 @@ import asyncio import logging import os +import time from typing import Any, NamedTuple import soundfile as sf @@ -337,13 +338,27 @@ async def main_streaming(args): for i, prompt in enumerate(inputs): request_id = str(i) + t_start = time.perf_counter() + t_prev = t_start + chunk_idx = 0 async for stage_output in omni.generate(prompt, request_id=request_id): mm = stage_output.request_output.outputs[0].multimodal_output if not stage_output.finished: + t_now = time.perf_counter() audio = mm.get("audio") n = len(audio) if isinstance(audio, list) else (0 if audio is None else 1) - logger.info(f"Request {request_id}: received chunk {n}") + dt_ms = (t_now - t_prev) * 1000 + ttfa_ms = (t_now - t_start) * 1000 + if chunk_idx == 0: + logger.info(f"Request {request_id}: chunk {chunk_idx} samples={n} TTFA={ttfa_ms:.1f}ms") + else: + logger.info(f"Request {request_id}: chunk {chunk_idx} samples={n} inter_chunk={dt_ms:.1f}ms") + t_prev = t_now + chunk_idx += 1 else: + t_end = time.perf_counter() + total_ms = (t_end - t_start) * 1000 + logger.info(f"Request {request_id}: done total={total_ms:.1f}ms chunks={chunk_idx}") _save_wav(output_dir, request_id, mm) diff --git a/examples/online_serving/qwen3_tts/README.md b/examples/online_serving/qwen3_tts/README.md index d50baa28972..2046c650866 100644 --- a/examples/online_serving/qwen3_tts/README.md +++ b/examples/online_serving/qwen3_tts/README.md @@ -250,6 +250,7 @@ Returns binary audio data with appropriate `Content-Type` header (e.g., `audio/w | `language` | string | "Auto" | 
Language (see supported languages below) | | `instructions` | string | "" | Voice style/emotion instructions | | `max_new_tokens` | int | 2048 | Maximum tokens to generate | +| `initial_codec_chunk_frames` | int | null | Initial chunk size in codec frames, used to reduce time to first audio (TTFA); overrides the stage config | | `stream` | bool | false | Stream raw PCM chunks as they are decoded (requires `response_format="pcm"`) | **Supported languages:** Auto, Chinese, English, Japanese, Korean, German, French, Russian, Portuguese, Spanish, Italian @@ -265,7 +266,7 @@ Returns binary audio data with appropriate `Content-Type` header (e.g., `audio/w ## Streaming Set `stream=true` with `response_format="pcm"` to receive raw PCM audio chunks as they are decoded -(one chunk per 25-frame Code2Wav window): +(one chunk per Code2Wav window, default 25 frames; configurable in the stage config): ```bash curl -X POST http://localhost:8091/v1/audio/speech \ diff --git a/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py b/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py index e29b6af82a7..15464032b37 100644 --- a/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py +++ b/tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py @@ -4,27 +4,60 @@ from collections import defaultdict from types import SimpleNamespace +import pytest import torch from vllm_omni.model_executor.stage_input_processors.qwen3_tts import talker2code2wav_async_chunk +_FRAME = [1, 2, 3, 4] # 4-codebook frame +_Q = len(_FRAME) # num quantizers -def _req(external_req_id: str, *, finished: bool): + +def _req(rid: str, *, finished: bool, initial_codec_chunk_frames: int | None = None): + ai = None + if initial_codec_chunk_frames is not None: + entry = SimpleNamespace(list_data=[initial_codec_chunk_frames]) + ai = SimpleNamespace(entries={"initial_codec_chunk_frames": entry}) return SimpleNamespace( - external_req_id=external_req_id, + external_req_id=rid, 
is_finished=lambda: finished, + additional_information=ai, ) -def test_talker2code2wav_async_chunk_does_not_emit_empty_chunk_when_not_finished(): - transfer_manager = SimpleNamespace( +def _tm(*, chunk_frames=25, left_context=25, initial_chunk=0): + return SimpleNamespace( code_prompt_token_ids=defaultdict(list), - connector=SimpleNamespace(config={"extra": {"codec_chunk_frames": 25, "codec_left_context_frames": 25}}), + put_req_chunk=defaultdict(int), + connector=SimpleNamespace( + config={ + "extra": { + "codec_chunk_frames": chunk_frames, + "codec_left_context_frames": left_context, + "initial_codec_chunk_frames": initial_chunk, + } + } + ), + ) + + +def _call(tm, rid, *, n_frames, put_req=0, finished=False, req_ic=None): + tm.code_prompt_token_ids[rid] = [_FRAME[:] for _ in range(n_frames)] + tm.put_req_chunk[rid] = put_req + return talker2code2wav_async_chunk( + transfer_manager=tm, + pooling_output={"audio_codes": torch.zeros((0,))}, + request=_req(rid, finished=finished, initial_codec_chunk_frames=req_ic), + is_finished=finished, ) + + +def test_does_not_emit_empty_chunk_when_not_finished(): + tm = _tm() request = _req("rid-empty", finished=False) payload = talker2code2wav_async_chunk( - transfer_manager=transfer_manager, + transfer_manager=tm, pooling_output={"audio_codes": torch.zeros((0,))}, request=request, ) @@ -32,36 +65,29 @@ def test_talker2code2wav_async_chunk_does_not_emit_empty_chunk_when_not_finished assert payload is None -def test_talker2code2wav_async_chunk_flushes_tail_when_finished_without_pooler_output(): - transfer_manager = SimpleNamespace( - code_prompt_token_ids=defaultdict(list), - connector=SimpleNamespace(config={"extra": {"codec_chunk_frames": 25, "codec_left_context_frames": 25}}), - ) - request_id = "rid-tail" - transfer_manager.code_prompt_token_ids[request_id] = [[1, 2, 3, 4] for _ in range(24)] - request = _req(request_id, finished=True) +def test_flushes_tail_when_finished_without_pooler_output(): + tm = _tm() + rid = 
"rid-tail" + tm.code_prompt_token_ids[rid] = [_FRAME[:] for _ in range(24)] + request = _req(rid, finished=True) payload = talker2code2wav_async_chunk( - transfer_manager=transfer_manager, - pooling_output=None, # e.g. EOS step with no audio_codes + transfer_manager=tm, + pooling_output=None, request=request, ) assert payload is not None assert payload["finished"].item() is True - # ctx_frames header + flat codes - assert len(payload["code_predictor_codes"]) == 1 + 4 * 24 + assert len(payload["code_predictor_codes"]) == _Q * 24 -def test_talker2code2wav_async_chunk_emits_eof_marker_when_finished_with_no_frames(): - transfer_manager = SimpleNamespace( - code_prompt_token_ids=defaultdict(list), - connector=SimpleNamespace(config={"extra": {"codec_chunk_frames": 25, "codec_left_context_frames": 25}}), - ) +def test_emits_eof_marker_when_finished_with_no_frames(): + tm = _tm() request = _req("rid-eof", finished=True) payload = talker2code2wav_async_chunk( - transfer_manager=transfer_manager, + transfer_manager=tm, pooling_output=None, request=request, ) @@ -70,3 +96,59 @@ def test_talker2code2wav_async_chunk_emits_eof_marker_when_finished_with_no_fram "code_predictor_codes": [], "finished": torch.tensor(True, dtype=torch.bool), } + + +_CASES = [ + # Normal path (initial=0): emit at chunk_size boundaries + ((25, 25, 0), (24, 0, False), None), + ((25, 25, 0), (25, 0, False), (0, 25)), + # Initial-chunk phase: hold, first emit, second emit + ((25, 25, 10), (9, 0, False), None), + ((25, 25, 10), (10, 0, False), (0, 10)), + ((25, 25, 10), (20, 1, False), (10, 20)), + # Non-divisible: holds at chunk boundary + ((25, 25, 12), (25, 2, False), None), + # Normal phase: offset by initial_coverage (chunk//initial * initial) + ((25, 25, 10), (45, 2, False), (20, 45)), + # Second normal emit (offset must stay stable) + ((25, 25, 10), (70, 3, False), (25, 50)), + # initial >= chunk clamps to chunk_size (behaves as normal) + ((25, 25, 30), (25, 0, False), (0, 25)), + # finished=True 
flushes IC tail + ((25, 25, 10), (5, 0, True), (0, 5)), + # finished=True flushes non-divisible IC residual + ((25, 25, 12), (25, 2, True), (24, 25)), + # finished=True flushes normal phase tail + ((25, 25, 10), (30, 2, True), (20, 30)), +] + + +@pytest.mark.parametrize("config, state, expected", _CASES) +def test_streaming_decoding_with_variable_initial(config, state, expected): + chunk_frames, left_context, initial_chunk = config + n_frames, put_req, finished = state + + tm = _tm(chunk_frames=chunk_frames, left_context=left_context, initial_chunk=initial_chunk) + payload = _call(tm, "r", n_frames=n_frames, put_req=put_req, finished=finished) + + if expected is None: + assert payload is None + else: + exp_ctx, exp_window = expected + assert payload is not None + assert payload["left_context_size"] == exp_ctx + assert len(payload["code_predictor_codes"]) == _Q * exp_window + + +def test_per_request_override_activates_initial_phase(): + tm = _tm(initial_chunk=0) + payload = _call(tm, "r-override", n_frames=10, req_ic=10) + assert payload is not None + assert payload["left_context_size"] == 0 + assert len(payload["code_predictor_codes"]) == _Q * 10 + + +def test_per_request_override_wins_over_stage_config(): + tm = _tm(initial_chunk=5) + payload = _call(tm, "r-override2", n_frames=10, put_req=0, req_ic=15) + assert payload is None diff --git a/vllm_omni/entrypoints/openai/protocol/audio.py b/vllm_omni/entrypoints/openai/protocol/audio.py index 8915e544b85..1efab8ebec2 100644 --- a/vllm_omni/entrypoints/openai/protocol/audio.py +++ b/vllm_omni/entrypoints/openai/protocol/audio.py @@ -55,6 +55,11 @@ class OpenAICreateSpeechRequest(BaseModel): default=None, description="Maximum tokens to generate", ) + initial_codec_chunk_frames: int | None = Field( + default=None, + ge=0, + description="Initial chunk size for reduced TTFA. 
Overrides stage config for this request.", + ) @field_validator("stream_format") @classmethod diff --git a/vllm_omni/entrypoints/openai/serving_speech.py b/vllm_omni/entrypoints/openai/serving_speech.py index 986e06ace95..9ff7bc76737 100644 --- a/vllm_omni/entrypoints/openai/serving_speech.py +++ b/vllm_omni/entrypoints/openai/serving_speech.py @@ -468,6 +468,9 @@ def _build_tts_params(self, request: OpenAICreateSpeechRequest) -> dict[str, Any else: params["max_new_tokens"] = [2048] + if request.initial_codec_chunk_frames is not None: + params["initial_codec_chunk_frames"] = [request.initial_codec_chunk_frames] + # VoiceDesign requires non_streaming_mode (match offline script behaviour). # CustomVoice and Base rely on the model default (True and False respectively). if params["task_type"][0] == "VoiceDesign": diff --git a/vllm_omni/model_executor/stage_configs/qwen3_tts.yaml b/vllm_omni/model_executor/stage_configs/qwen3_tts.yaml index 90053852acc..564f1160c3c 100644 --- a/vllm_omni/model_executor/stage_configs/qwen3_tts.yaml +++ b/vllm_omni/model_executor/stage_configs/qwen3_tts.yaml @@ -96,6 +96,10 @@ runtime: # Align with Omni: small chunks with sufficient context overlap. codec_chunk_frames: 25 codec_left_context_frames: 25 + # First chunk size for reduced TTFA (0 = disabled). + # When > 0, emits small chunks every N frames during the initial phase, + # then switches to codec_chunk_frames cadence. + initial_codec_chunk_frames: 0 edges: - from: 0 diff --git a/vllm_omni/model_executor/stage_configs/qwen3_tts_batch.yaml b/vllm_omni/model_executor/stage_configs/qwen3_tts_batch.yaml index d737391095b..43df70fc275 100644 --- a/vllm_omni/model_executor/stage_configs/qwen3_tts_batch.yaml +++ b/vllm_omni/model_executor/stage_configs/qwen3_tts_batch.yaml @@ -98,6 +98,10 @@ runtime: # Align with Omni: small chunks with sufficient context overlap. codec_chunk_frames: 25 codec_left_context_frames: 25 + # First chunk size for reduced TTFA (0 = disabled). 
+ # When > 0, emits small chunks every N frames during the initial phase, + # then switches to codec_chunk_frames cadence. + initial_codec_chunk_frames: 0 edges: - from: 0 diff --git a/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py b/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py index 8c21052e9ee..c0bcfa612df 100644 --- a/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py +++ b/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py @@ -3,6 +3,9 @@ from typing import Any import torch +from vllm.logger import init_logger + +logger = init_logger(__name__) def _extract_last_frame(pooling_output: dict[str, Any]) -> torch.Tensor | None: @@ -40,13 +43,32 @@ def talker2code2wav_async_chunk( connector = getattr(transfer_manager, "connector", None) raw_cfg = getattr(connector, "config", {}) or {} cfg = raw_cfg.get("extra", raw_cfg) if isinstance(raw_cfg, dict) else {} - chunk_size_config = int(cfg.get("codec_chunk_frames", 25)) + chunk_size = int(cfg.get("codec_chunk_frames", 25)) left_context_size_config = int(cfg.get("codec_left_context_frames", 25)) - if chunk_size_config <= 0 or left_context_size_config < 0: + initial_chunk_size = int(cfg.get("initial_codec_chunk_frames", 0)) + # Per-request override (takes priority over stage config) + additional_information = getattr(request, "additional_information", None) + if ( + additional_information is not None + and hasattr(additional_information, "entries") + and "initial_codec_chunk_frames" in additional_information.entries + ): + entry = additional_information.entries["initial_codec_chunk_frames"] + if entry.list_data is not None and len(entry.list_data) == 1: + initial_chunk_size = int(entry.list_data[0]) + if chunk_size <= 0 or left_context_size_config < 0 or initial_chunk_size < 0: raise ValueError( - f"Invalid codec chunk config: codec_chunk_frames={chunk_size_config}, " - f"codec_left_context_frames={left_context_size_config}" + f"Invalid codec chunk config: 
codec_chunk_frames={chunk_size}, " + f"codec_left_context_frames={left_context_size_config}, " + f"initial_codec_chunk_frames={initial_chunk_size}" + ) + if initial_chunk_size > chunk_size: + logger.warning( + "initial_codec_chunk_frames=%d > codec_chunk_frames=%d, clamping to codec_chunk_frames.", + initial_chunk_size, + chunk_size, ) + initial_chunk_size = chunk_size length = len(transfer_manager.code_prompt_token_ids[request_id]) # Avoid emitting empty chunks during normal streaming. If the request is @@ -59,21 +81,36 @@ def talker2code2wav_async_chunk( } return None - chunk_length = length % chunk_size_config - - if chunk_length != 0 and not finished: - return None + in_initial_phase = initial_chunk_size > 0 and length <= chunk_size - context_length = chunk_length if chunk_length != 0 else chunk_size_config - end_index = min(length, left_context_size_config + context_length) - left_context_size = max(0, int(end_index - context_length)) - window_frames = transfer_manager.code_prompt_token_ids[request_id][-end_index:] + if in_initial_phase: + # Initial-chunk phase: emit every initial_chunk_size frames with full accumulated context. + already_sent = transfer_manager.put_req_chunk[request_id] * initial_chunk_size + pending = length - already_sent + if pending <= 0: + return None + if pending < initial_chunk_size and not finished: + return None + context_length = min(pending, initial_chunk_size) + end_index = length + left_context_size = max(0, length - context_length) + window_frames = transfer_manager.code_prompt_token_ids[request_id][:length] + else: + # Normal phase: standard chunk_size cadence with left_context sliding window. + # Offset by initial_coverage so normal starts from where the initial-chunk phase left off. 
+ initial_coverage = (chunk_size // initial_chunk_size) * initial_chunk_size if initial_chunk_size > 0 else 0 + adjusted = length - initial_coverage + chunk_length = adjusted % chunk_size + if chunk_length != 0 and not finished: + return None + context_length = chunk_length if chunk_length != 0 else chunk_size + end_index = min(length, left_context_size_config + context_length) + left_context_size = max(0, int(end_index - context_length)) + window_frames = transfer_manager.code_prompt_token_ids[request_id][-end_index:] # Pack context + chunk into codebook-major flat codes for adapter. code_predictor_codes = torch.tensor(window_frames).transpose(0, 1).reshape(-1).tolist() - # Build final prompt_token_ids and left_context_size header for Qwen3-TTS Code2Wav. - # The model expects input_ids layout: [*flat_codes]. return { "code_predictor_codes": code_predictor_codes, "left_context_size": left_context_size, diff --git a/vllm_omni/platforms/npu/stage_configs/qwen3_tts.yaml b/vllm_omni/platforms/npu/stage_configs/qwen3_tts.yaml index 60659a9768b..5ad0cc90e37 100644 --- a/vllm_omni/platforms/npu/stage_configs/qwen3_tts.yaml +++ b/vllm_omni/platforms/npu/stage_configs/qwen3_tts.yaml @@ -96,6 +96,10 @@ runtime: # Align with Omni: small chunks with sufficient context overlap. codec_chunk_frames: 25 codec_left_context_frames: 25 + # First chunk size for reduced TTFA (0 = disabled). + # When > 0, emits small chunks every N frames during the initial phase, + # then switches to codec_chunk_frames cadence. + initial_codec_chunk_frames: 0 edges: - from: 0