support qwen3 tts streaming output #1189
Conversation
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 22d078aa39
Force-pushed from 0edb11a to b8aa200
hsliuustc0106 left a comment
could this work with online serving?
# Convert result to OmniOutput format
return self.make_omni_output(result, **kwargs)

def forward_streaming(
does every tts model need this function?
Externally we only expose forward. If a model needs to support streaming output, it should additionally implement forward_streaming; otherwise forward alone is sufficient.
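One way that contract could be expressed, as a minimal sketch (OmniTTSModelBase and supports_streaming are illustrative names, not the actual vllm-omni base class):

```python
from typing import Any, Iterator


class OmniTTSModelBase:
    """Illustrative base contract; the real base class may differ."""

    def forward(self, **kwargs: Any) -> dict:
        # Required: every TTS model implements the one-shot path.
        raise NotImplementedError

    def forward_streaming(self, **kwargs: Any) -> Iterator[dict]:
        # Optional: only models with chunked output override this.
        raise NotImplementedError(
            f"{type(self).__name__} does not implement streaming output"
        )


def supports_streaming(model: OmniTTSModelBase) -> bool:
    # A caller (e.g. the serving layer) can probe for an override.
    return type(model).forward_streaming is not OmniTTSModelBase.forward_streaming
```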
# ==================== Streaming Generation Methods ====================

@torch.inference_mode()
def generate_custom_voice_streaming(
I think we need to discuss a general abstraction of streaming output such that every tts model can inherit from it. @Gaohan123 @linyueqian @amy-why-3459
Pull request overview
Adds streaming audio output support for Qwen3-TTS to enable lower-latency, chunked waveform generation.
Changes:
- Introduces token-level streaming generation in the talker and an async background decoding pipeline to turn codec chunks into audio.
- Adds high-level streaming APIs and per-request streaming state handling in the Qwen3-TTS vLLM model wrapper.
- Extends the offline example and adds a streaming-vs-nonstreaming consistency script/test.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 14 comments.
| File | Description |
|---|---|
| vllm_omni/model_executor/models/qwen3_tts/tokenizer_12hz/modeling_qwen3_tts_tokenizer_v2.py | Disables cache during decoder transformer forward; adds invalid-token truncation for chunked decoding. |
| vllm_omni/model_executor/models/qwen3_tts/qwen3_tts.py | Adds streaming request path (forward_streaming) and public streaming generation helpers. |
| vllm_omni/model_executor/models/qwen3_tts/modeling_qwen3_tts.py | Implements streaming token iterator, async decoding thread pipeline, and streaming audio generator. |
| tests/model_executor/models/qwen3_tts/test_streaming_vs_nonstreaming.py | Adds a codec-token consistency check script (currently placed as a pytest-discoverable test). |
| tests/model_executor/models/qwen3_tts/__init__.py | Initializes the new Qwen3-TTS test package. |
| examples/offline_inference/qwen3_tts/end2end.py | Adds a streaming test query and a streaming test runner that writes chunk WAVs + timing JSON. |
audio_tensor = output.multimodal_output["audio"]
audio_samplerate = output.multimodal_output["sr"].item()
In streaming mode, multimodal_output["audio"] (and potentially sr) may become a list due to multimodal accumulation across iterations. This loop assumes a tensor and will break after the first chunk (audio_tensor.float(), sr.item()). Update the example to handle list values (e.g., take the last element) so it works with accumulated streaming outputs.
Suggested change:
- audio_tensor = output.multimodal_output["audio"]
- audio_samplerate = output.multimodal_output["sr"].item()
+ # In streaming mode, multimodal_output["audio"] / ["sr"] may be lists
+ audio_value = output.multimodal_output["audio"]
+ if isinstance(audio_value, list) and len(audio_value) > 0:
+     audio_tensor = audio_value[-1]
+ else:
+     audio_tensor = audio_value
+ sr_value = output.multimodal_output.get("sr", audio_samplerate)
+ if isinstance(sr_value, list) and len(sr_value) > 0:
+     sr_item = sr_value[-1]
+ else:
+     sr_item = sr_value
+ if hasattr(sr_item, "item"):
+     audio_samplerate = int(sr_item.item())
+ else:
+     audio_samplerate = int(sr_item)
model = Qwen3TTSModel.from_pretrained(
    MODEL_PATH,
    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
    device_map=device,
device_map is forwarded to transformers.AutoModel.from_pretrained(...), where it expects a string like "auto"/"cuda:0" or a device mapping dict. Passing a torch.device object here is likely to error. Use device_map="auto" (or a proper dict), or move the model to the desired device via .to(device) after loading.
Suggested change:
- device_map=device,
+ device_map="auto",
Isn't device_map Union[torch.device, dict, str]? I think this is wrong.
request_id = kwargs.get("request_id", "default")
Using request_id = kwargs.get("request_id", "default") risks mixing state across concurrent requests whenever request_id is missing/empty. Please require request_id (raise) or generate a unique id (e.g., uuid4) instead of defaulting to a constant.
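For example, a uuid4 fallback could look like this (a sketch; only the kwargs lookup is from the PR):

```python
import uuid

request_id = kwargs.get("request_id")
if not request_id:
    # Never fall back to a shared constant; a missing id must not alias
    # another request's streaming state.
    request_id = f"stream-{uuid.uuid4().hex}"
```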
)

self._validate_languages(languages)
generate_voice_design_streaming accepts text: str | list[str], but the underlying streaming implementation only supports batch size 1 (assert in _prepare_talker_inputs). Please validate len(texts)==1 and raise a clear error, or add batching support.
Suggested change:
+ if len(texts) != 1:
+     raise ValueError(
+         "generate_voice_design_streaming currently supports only batch size 1; "
+         f"got batch size {len(texts)}. For batched synthesis, use the non-streaming "
+         "generate_voice_design API instead."
+     )
Nice work on the streaming primitives, @gerayking. The model-level token streaming iterator and async decode pipeline are well structured. We've been running a streaming TTS setup in production on our fork, so I wanted to share some context on how our approaches compare and flag a few things.
How our fork approaches streaming vs this PR
We took a slightly different architecture. Rather than a threaded AsyncDecodingPipeline, we do synchronous inline decoding — the codec-to-audio decode happens in the same thread as the generator, without queues. The tradeoff is less potential parallelism between token generation and audio decoding, but simpler concurrency and no queue-related edge cases (like the bounded stop() put).
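In sketch form, the inline variant is just a generator that decodes as it goes (_token_iter, _decode_chunk, and chunk_frames are illustrative names from our fork, not this PR):

```python
def generate_streaming(self, talker_inputs, chunk_frames: int = 24):
    """Yield audio chunks, decoding inline in the caller's thread (no queues)."""
    pending_codes = []
    for codes in self._token_iter(talker_inputs):    # autoregressive codec-token loop
        pending_codes.append(codes)
        if len(pending_codes) >= chunk_frames:
            yield self._decode_chunk(pending_codes)  # codec -> waveform, same thread
            pending_codes = []
    if pending_codes:
        yield self._decode_chunk(pending_codes)
```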
At the token generation level, we have a single generate_streaming() generator on the ForConditionalGeneration class rather than a separate talker-level iterator. We also factored sampling and repetition penalty into standalone helper functions (_sample_token(), _apply_repetition_penalty() using vectorized torch.where) instead of inlining them in the loop.
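For reference, the vectorized penalty helper is roughly this shape (a sketch; shapes and the exact signature are from our fork, not this PR):

```python
import torch

def _apply_repetition_penalty(logits: torch.Tensor,
                              generated_ids: torch.Tensor,
                              penalty: float) -> torch.Tensor:
    # logits: (vocab,); generated_ids: 1-D LongTensor of already-emitted token ids.
    penalized = logits.clone()
    scores = penalized[generated_ids]
    # Standard convention: shrink positive logits, push negative ones further down.
    penalized[generated_ids] = torch.where(scores > 0, scores / penalty, scores * penalty)
    return penalized
```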
For context stripping when decoding chunks, we use a proportional ratio (context_frames / total_decoded_frames * total_decoded_samples) rather than a fixed upsample_rate. The transformer-based decoder doesn't always produce exactly frames * upsample_rate samples, so the proportional approach adapts to the actual decoder output and avoids subtle audio glitches at chunk boundaries.
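Concretely, the proportional stripping amounts to the following (a sketch with illustrative variable names):

```python
# `audio` was decoded from context_frames + new_frames codec frames.
total_decoded_frames = context_frames + new_frames
total_decoded_samples = audio.shape[-1]

# Fixed-rate stripping would assume: strip = context_frames * upsample_rate.
# Proportional stripping scales by what the decoder actually produced:
strip = int(round(context_frames / total_decoded_frames * total_decoded_samples))
new_audio = audio[..., strip:]
```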
The bigger architectural difference is in the integration layers — which brings me to the main gaps I see in this PR:
Scheduler integration
forward_streaming() returns OmniOutput with "finished": torch.tensor(is_finished), but the generation scheduler (omni_generation_scheduler.py) doesn't know to look for this field. Without a corresponding scheduler change, the request will be marked FINISHED_STOPPED after the first forward() call, so subsequent chunks will never be generated in the online serving path.
In our fork we added an is_final flag to OmniOutput.multimodal_outputs and patched the scheduler to keep requests in RUNNING status until is_final=True. The scheduler reads is_final from the pooler output and only transitions to FINISHED_STOPPED when it's true. This required handling deserialization edge cases too — is_final can arrive as a bool, list, tensor, or scalar after ZMQ transport.
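The normalization ended up being small but fiddly; roughly (a sketch from our fork, helper name is ours):

```python
import torch

def _read_is_final(value) -> bool:
    # After ZMQ transport, is_final may arrive as a bool, list, tensor, or scalar.
    if isinstance(value, list):
        value = value[-1] if value else False
    if isinstance(value, torch.Tensor):
        return bool(value.reshape(-1)[-1].item())
    return bool(value)
```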
Is this PR intentionally scoped to offline inference only, or is the scheduler/serving integration planned as a follow-up?
Serving layer
Related — there are no changes to serving_speech.py or any entrypoint. For online serving you'd need a StreamingResponse path that yields chunked audio. In our fork we added _stream_progressive_audio() as a FastAPI StreamingResponse that yields PCM bytes or WAV (with a placeholder max-size WAV header). One non-obvious thing: the output processor accumulates all tensors across iterations (not just new ones), so you need a cursor to track which chunks have already been yielded.
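The cursor is the non-obvious part; a rough sketch of the handler (names and the exact output fields are from our fork / assumptions, not this PR):

```python
import torch
from fastapi.responses import StreamingResponse

async def _stream_progressive_audio(result_generator):
    """Sketch: yield only the audio chunks that have not been sent yet."""
    async def pcm_chunks():
        sent = 0  # cursor into the accumulated chunk list
        async for output in result_generator:
            chunks = output.multimodal_output["audio"]
            chunks = chunks if isinstance(chunks, list) else [chunks]
            for chunk in chunks[sent:]:
                yield chunk.to(torch.float32).cpu().numpy().tobytes()
            sent = len(chunks)
    return StreamingResponse(pcm_chunks(), media_type="audio/pcm")
```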
Stride-0 tensor serialization
One production bug we hit: when chunks cross certain boundaries, the audio tensor from the decoder can have stride-0 dimensions (from expand() or numpy views). Calling .contiguous() alone doesn't fix stride-0 — you need an explicit .clone(). This crashes during ZMQ serialization in the connector layer. Something to watch for when wiring this into online serving.
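The guard we ended up adding before handing tensors to the connector (a sketch; the helper name is ours):

```python
import torch

def _make_serializable(t: torch.Tensor) -> torch.Tensor:
    # Tensors built via expand() or numpy views can carry stride-0 dimensions;
    # clone() forces real storage before the connector serializes them.
    if 0 in t.stride():
        return t.clone()
    return t.contiguous()
```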
Streaming state cleanup
In forward_streaming(), the streaming state is only cleaned up on a subsequent call after is_finished=True. If the caller stops iterating once it sees finished=True (as the docstring suggests), the generator and accumulated audio chunks will leak in self._streaming_state. Cleaning up immediately when the generator signals completion would prevent this.
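Something along these lines would avoid the leak (a sketch; the state and field names are assumptions about this PR's internals):

```python
# Immediate cleanup once the per-request generator is exhausted or signals completion.
chunk = next(state.generator, None)
is_finished = chunk is None or chunk.is_final
if is_finished:
    # Drop per-request state as soon as the generator completes, instead of
    # waiting for a follow-up forward_streaming() call that may never come.
    self._streaming_state.pop(request_id, None)
```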
Minor
print(f"chunk_size : {chunk_size}, {runtime_additional_information}")on line 142 ofqwen3_tts.pylooks like a debug print that should be removed or converted tologger.debug().
What this PR does well
The decoder-side fixes in this PR are solid and independently valuable:
- codec_valid_max truncation in chunked_decode — prevents out-of-bounds embedding lookups
- use_cache=False in the tokenizer 12Hz decoder — prevents KV cache leaks
- suppress_tokens on the code predictor — prevents generating tokens >= 2048 that would crash the decoder
These are fixes we don't have in our fork yet and plan to adopt.
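As I read it, the truncation fix amounts to something like this (a sketch; I'm assuming codes has shape (num_codebooks, frames) and codec_valid_max is the largest embeddable token id):

```python
# Truncate the chunk at the first frame containing a token the codec
# embedding cannot look up (e.g. EOS or anything beyond the codebook).
invalid = (codes > codec_valid_max).any(dim=0)
if invalid.any():
    first_bad = int(invalid.nonzero()[0].item())
    codes = codes[:, :first_bad]
```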
Thanks for the detailed review and sharing your production experience!

Scope clarification: This PR is intentionally scoped to offline inference only. Scheduler integration and online serving changes are planned as follow-up work.

Will fix in this PR:
Hi, is this going to be online anytime soon? Thank you!
Signed-off-by: gerayking <399geray@gmail.com>
Qwen3-TTS online serving streaming output will be supported in another PR. I’ll push it soon.
Force-pushed from 0894b02 to 9719de5
…ress_tokens guard

Three production-ready fixes for the 12Hz tokenizer decoder:
1. codec_valid_max truncation: prevents out-of-bounds embedding lookups when tokens >= codebook_size (including EOS tokens) reach the decoder
2. use_cache=False: prevents KV cache accumulation in the ConvTranspose1d decoder's pre_transformer, which caused memory leaks during chunked decode
3. suppress_tokens: added via the streaming pipeline code to block tokens >= 2048 that crash the codec decoder

Cherry-picked from vllm-project/vllm-omni PR vllm-project#1189 (gerayking).
Co-Authored-By: Claude <noreply@anthropic.com>
Adds StreamingChunkOutput, AsyncDecodingPipeline (background thread + bounded queue), generate_streaming_iter(), and forward_streaming() to the Qwen3-TTS model layer. Includes streaming variants for all three voice paths: custom voice, voice design, and voice clone.

Key components:
- StreamingChunkOutput: dataclass for incremental codec chunks
- AsyncDecodingPipeline: background thread decodes codec→audio with context overlap, communicates via bounded queues
- generate_streaming_iter(): yields StreamingChunkOutput per chunk
- forward_streaming(): top-level entry that returns OmniOutput with intermediate chunks and a "finished" flag
- generate_custom_voice_streaming / voice_design_streaming / voice_clone_streaming

Includes test (test_streaming_vs_nonstreaming.py) for token consistency (88 tokens across 16 codebooks) and updated end2end example.

Cherry-picked from vllm-project/vllm-omni PR vllm-project#1189 (gerayking).
Co-Authored-By: Claude <noreply@anthropic.com>
Extends OmniRequestOutput with a finished flag and streaming_audio_chunk for progressive audio delivery. Updates the scheduler to check the "finished" key in multimodal_outputs — keeping requests in RUNNING state when finished=False instead of prematurely transitioning to FINISHED_STOPPED.

Modified files:
- omni_generation_scheduler.py: streaming-aware update_from_output()
- output_processor.py: handle intermediate streaming outputs
- outputs.py: OmniRequestOutput.finished + streaming_audio_chunk fields
- async_omni.py: yield intermediate OmniRequestOutput in generate()

Cherry-picked from vllm-project/vllm-omni PR vllm-project#1189 (gerayking).
Co-Authored-By: Claude <noreply@anthropic.com>
1. Replace debug print() with logger.warning() in mel_spectrogram()
2. Fix request_id collision: use uuid4 instead of "default" fallback
3. Remove dead deferred-cleanup branch in forward_streaming() — the immediate cleanup at is_finished already handles state deletion
4. Add batch-size-1 guards to all three streaming generation methods (generate_custom_voice_streaming, generate_voice_design_streaming, generate_voice_clone_streaming) — the underlying model asserts batch=1 but the public API accepted lists silently
5. Remove no-op assignment in scheduler update_from_output()

Co-Authored-By: Claude <noreply@anthropic.com>
I think there are some bugs in this PR; we should not have changed so many files :)
🤖 Code Review: PR #1189 — Streaming Audio Output

🔴 Verdict: Request Changes

Summary: Adds streaming audio output for TTS. However, the PR is ~132KB and bundles unrelated changes: CI infrastructure (pytest markers, nightly tests), ComfyUI integration app, GitHub workflows, and CFG parallel mixin docs. The PR must be split before meaningful review of the streaming core is possible. The CI changes and ComfyUI integration are entirely separate concerns that increase merge conflict risk and make bisecting regressions impossible.

Key Concerns:
Detailed Feedback 📄
@vllm-omni-reviewer
The size of the PR makes it difficult to review and test; can we downsize the scope? Edit: most of the changes are not from this PR.
Looks like the branch was forked from an older main and carries ~28 already-merged commits (CI, ComfyUI, docs, etc.) — that's where the 149 files come from. The actual streaming work is only ~6 files.
Sorry, I will fix it tonight.
Force-pushed from 9719de5 to b021f2e
@vllm-omni-reviewer
Check #1438 for details.
Purpose
Add streaming output support for Qwen3-TTS model to enable real-time audio generation with lower latency. #938
Key Changes
Streaming Generation Interface (modeling_qwen3_tts.py):
- StreamingChunkOutput dataclass for streaming chunk output
- AsyncDecodingPipeline class for asynchronous audio decoding in a background thread
- generate_streaming_iter() in Qwen3TTSTalkerForConditionalGeneration for token-level streaming

High-Level Streaming API (qwen3_tts.py):
- forward_streaming() method in Qwen3TTSModelForGeneration
- generate_custom_voice_streaming(), generate_voice_design_streaming(), generate_voice_clone_streaming()

Examples and Tests:
- examples/offline_inference/qwen3_tts/end2end.py
- test_streaming_vs_nonstreaming.py to verify streaming and non-streaming outputs produce identical tokens

Test Plan
Test Result