[Optim][Qwen3TTS] Big boost to model throughput and latency at high concurrency #1852
Conversation
Signed-off-by: pablo <pablo@agigo.ai>
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 75930ff3e7
```diff
-            wav_list, sr = await self._resolve_ref_audio(request.ref_audio)
-            tts_params["ref_audio"] = [[wav_list, sr]]
+            wav_np, sr = await self._resolve_ref_audio(request.ref_audio)
+            tts_params["ref_audio"] = [(wav_np, sr)]
```
Keep ref_audio list_data msgspec-serializable
This now places a NumPy array inside additional_information["ref_audio"] ([(wav_np, sr)]), but OmniInputProcessor._build_engine_request forwards list values into AdditionalInformationEntry(list_data=...) without converting nested objects, and AdditionalInformationEntry is documented as requiring msgspec-serializable list content. In multiprocess/distributed engine paths that msgpack-encode OmniEngineCoreRequest, Base voice-cloning requests with ref_audio can fail serialization because nested np.ndarray is not encoded like tensor payloads.
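A minimal sketch of one way to keep the entry msgpack-friendly, converting the NumPy buffer to plain Python types before it reaches `additional_information` (illustrative only, not necessarily the fix adopted in the PR):

```python
import numpy as np

def make_ref_audio_serializable(wav_np: np.ndarray, sr: int) -> list:
    # msgpack/msgspec can encode nested lists, ints, and floats, but not a raw
    # np.ndarray, so flatten the waveform into a plain Python list before it is
    # stored in additional_information["ref_audio"].
    return [(wav_np.astype(np.float32).tolist(), int(sr))]
```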
```python
        self.stop_event.set()
        with self._recv_cond:
            self._recv_cond.notify_all()
        with self._save_cond:
            self._save_cond.notify_all()
```
Close connector resources in adapter shutdown
The new shutdown implementation wakes the worker threads but never closes the underlying connector, even though connector cleanup is defined via OmniConnectorBase.close(). When shutdown is used during stage teardown/restart, transports like shared memory or Mooncake can keep resources open (handles, pools, executors), causing leaks and cross-run interference; call self.connector.close() as part of shutdown after signaling threads.
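A rough sketch of the suggested ordering, assuming the adapter holds `self.connector`, `self.stop_event`, and the two conditions shown in the diff above; the actual shutdown in the PR may differ:

```python
def shutdown(self) -> None:
    # Signal both worker loops to stop, then wake anything waiting on the conditions.
    self.stop_event.set()
    with self._recv_cond:
        self._recv_cond.notify_all()
    with self._save_cond:
        self._save_cond.notify_all()
    # Release connector-held resources (shared memory, pools, executors) so that
    # stage teardown/restart does not leak them across runs.
    try:
        self.connector.close()
    except Exception:
        # Best-effort cleanup; errors during close should not break teardown.
        pass
```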
@linyueqian PTAL
Signed-off-by: JuanPZuluaga <juanz9312@gmail.com>
Addressed the Codex review comments.
hsliuustc0106 left a comment
Summary
This is a well-structured optimization PR with clear benchmark evidence. The three main changes are:
- GPU-resident buffer keys - Correctly follows the existing pattern from `qwen3_omni.py`. The `gpu_resident_buffer_keys` set is properly consumed by `gpu_model_runner.py` to keep `last_talker_hidden`, `tts_pad_embed`, and `tailing_text_hidden` on GPU, eliminating CPU round-trips.
- `threading.Condition` for recv/save loops - Good pattern change from busy-spin polling to proper synchronization. The logic correctly processes exactly `n` requests per pass (prevents starvation), waits 100 ms when idle, and backs off 1 ms on no progress.
- Pure Python transpose - Verified the list comprehension produces identical output to the original `torch.tensor().transpose().reshape(-1).tolist()` chain (see the sketch after this list). Avoids tensor allocation overhead.
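For reference, a small self-contained check of that equivalence; the toy `window_frames` below simply stands in for a rectangular list of per-frame rows:

```python
import torch

window_frames = [[1, 2, 3], [4, 5, 6]]  # 2 frames x 3 codebooks (toy data)

# Original chain: allocate a tensor, transpose, flatten back to a Python list.
reference = torch.tensor(window_frames).transpose(0, 1).reshape(-1).tolist()

# Pure Python column-major flatten: same ordering, no tensor allocation.
rows, cols = len(window_frames), len(window_frames[0])
pure_python = [window_frames[i][j] for j in range(cols) for i in range(rows)]

assert pure_python == reference  # [1, 4, 2, 5, 3, 6]
```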
Validated
- ✅ DCO signed
- ✅ Benchmark evidence with clear before/after comparison
- ✅ GPU-resident pattern matches the existing `qwen3_omni` implementation
- ✅ `finished: bool` type change is correct (matches `OmniRequestOutput.finished: bool`)
- ✅ Transpose optimization is semantically equivalent
- ✅ `shutdown()` implementation properly signals stop and wakes waiting threads
Minor Notes
- The `shutdown()` implementation silently swallows errors from `connector.close()` via `try/except pass`; this is acceptable for cleanup code but worth awareness.
Test Coverage
The existing tests in tests/model_executor/stage_input_processors/test_qwen3_tts_async_chunk.py and tests/distributed/omni_connectors/test_chunk_transfer_adapter.py should cover the modified paths. The benchmark results demonstrate functional correctness.
Nice performance wins - especially the GPU-resident buffers which eliminate 25k+ cudaMemcpy calls per generation at high concurrency.
Pull request overview
Improve Qwen3-TTS high-concurrency throughput/latency by reducing per-step device transfers and lowering connector polling CPU overhead.
Changes:
- Keep selected Talker intermediate tensors GPU-resident across decode steps to avoid repeated CPU↔GPU round-trips (see the sketch after this list).
- Replace tight polling sleeps in transfer adapter loops with condition-variable wakeups and backoff.
- Optimize chunk-streaming payload construction by avoiding heavy tensor/list conversions.
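As a rough illustration of the GPU-resident idea only: the helper and constant below are hypothetical, whereas in the PR the mechanism is the `gpu_resident_buffer_keys` set consumed by `gpu_model_runner.py`.

```python
import torch

# Hypothetical sketch: keys listed here stay on-device between decode steps
# instead of being staged through CPU, which is what gpu_resident_buffer_keys
# enables for last_talker_hidden / tts_pad_embed / tailing_text_hidden.
GPU_RESIDENT_BUFFER_KEYS = {"last_talker_hidden", "tts_pad_embed", "tailing_text_hidden"}

def stage_intermediates(info_dict: dict[str, torch.Tensor]) -> dict[str, torch.Tensor]:
    staged = {}
    for key, value in info_dict.items():
        if key in GPU_RESIDENT_BUFFER_KEYS:
            staged[key] = value                     # no cudaMemcpy, stays on GPU
        else:
            staged[key] = value.detach().to("cpu")  # everything else still goes to host
    return staged
```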
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| vllm_omni/model_executor/stage_input_processors/qwen3_tts.py | Stream payload now uses Python-native transforms and bool finished flags |
| vllm_omni/model_executor/models/qwen3_tts/qwen3_tts_talker.py | Introduces GPU-resident intermediate keys and removes CPU staging for select tensors |
| vllm_omni/distributed/omni_connectors/transfer_adapter/chunk_transfer_adapter.py | Notifies recv/save conditions when new pending work is queued |
| vllm_omni/distributed/omni_connectors/transfer_adapter/base.py | Refactors recv/save loops to use Conditions + backoff and implements shutdown |
```diff
             n = len(self._pending_load_reqs)
             any_success = False
             for _ in range(n):
                 if not self._pending_load_reqs:
                     break
                 request = self._pending_load_reqs.popleft()
                 request_id = request.request_id
                 self.request_ids_mapping[request_id] = request.external_req_id
                 try:
                     is_success = self._poll_single_request(request)
-                    if not is_success:
+                    if is_success:
+                        any_success = True
+                    else:
                         self._pending_load_reqs.append(request)
                 except Exception as e:
                     self._pending_load_reqs.append(request)
                     logger.warning(f"Error receiving data for {request_id}: {e}")

-            time.sleep(0.001)
+            with self._recv_cond:
+                if not self._pending_load_reqs and not self.stop_event.is_set():
+                    self._recv_cond.wait(timeout=0.1)
+                elif not any_success and not self.stop_event.is_set():
+                    self._recv_cond.wait(timeout=0.001)
```
The Condition is not used to protect access to _pending_load_reqs. Because producers can append without holding _recv_cond’s lock, recv_loop can observe the deque as empty and go into wait() while a request is appended/notify() happens just before it starts waiting (missed wakeup), delaying processing until timeout. Fix by using the condition lock as the mutex for all _pending_load_reqs mutations + checks (append/popleft/len/emptiness) or by switching to a thread-safe queue (queue.Queue) and blocking get() with timeout.
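A minimal sketch of the suggested fix, with the deque mutated and checked only under the condition's lock; the class and method names here are illustrative, not the adapter's real API:

```python
import threading
from collections import deque

class RecvQueue:
    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._pending = deque()

    def put(self, request) -> None:
        # Producer: append and notify while holding the same lock the consumer
        # uses to decide whether to wait, so wakeups cannot be missed.
        with self._cond:
            self._pending.append(request)
            self._cond.notify()

    def get_batch(self, timeout: float = 0.1) -> list:
        # Consumer: check emptiness and wait under the lock, then drain.
        with self._cond:
            if not self._pending:
                self._cond.wait(timeout=timeout)
            batch = list(self._pending)
            self._pending.clear()
        return batch
```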
```diff
                 if not hasattr(request, "additional_information"):
                     request.additional_information = None
                 self._pending_load_reqs.append(request)
+                with self._recv_cond:
+                    self._recv_cond.notify()
```
This notify() doesn’t prevent missed wakeups because the enqueue (_pending_load_reqs.append) is not done while holding the same _recv_cond lock that the consumer uses to decide whether to wait. Move the append inside the with self._recv_cond: block (and ensure the consumer also checks/consumes under that same lock), or use a blocking queue abstraction to avoid subtle races.
```python
        tail = info_dict.get("tailing_text_hidden")
        if isinstance(tail, torch.Tensor) and tail.ndim == 2 and tail.shape[0] > 0:
            text_step = tail[:1].to(device=input_ids.device, dtype=torch.bfloat16).reshape(1, -1)
            new_tail = tail[1:] if tail.shape[0] > 1 else tail[:0]
```
On GPU, new_tail = tail[1:] is a view into the original tensor storage. Even as the logical length shrinks, the full original allocation can remain live, and repeated slicing can keep large GPU buffers resident longer than intended. Consider tracking an integer offset into tailing_text_hidden (no slicing / no copies) or, if you do want to physically drop consumed elements, make new_tail a new tensor (e.g., contiguous()/clone()) before storing it back so the old storage can be released.
```diff
-            new_tail = tail[1:] if tail.shape[0] > 1 else tail[:0]
+            if tail.shape[0] > 1:
+                # Materialize a new tensor for the remaining queue to avoid keeping
+                # the original (potentially large) GPU storage alive via a view.
+                new_tail = tail[1:].contiguous()
+            else:
+                # Create a truly empty tensor with matching feature dimension.
+                new_tail = torch.empty(
+                    (0, tail.shape[1]),
+                    device=tail.device,
+                    dtype=tail.dtype,
+                )
```
```diff
             if int(take.shape[0]) < span_len:
                 pad_n = int(span_len - int(take.shape[0]))
-                pad_rows = tts_pad_embed.detach().to("cpu").contiguous().reshape(1, -1).expand(pad_n, -1)
+                pad_rows = tts_pad_embed.reshape(1, -1).expand(pad_n, -1).to("cpu")
```
This expands on GPU and then copies the expanded view to CPU, which can transfer pad_n * hidden_dim elements over the interconnect. If the intent is to keep prefill prompt building CPU-side, copy the 1×D row to CPU first and then expand/repeat on CPU (or construct the padding directly on CPU) to reduce GPU→CPU transfer volume. Same pattern appears again at line 596.
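A sketch of the lower-transfer variant this comment suggests, copying only the single 1×D pad row to host before expanding; the toy `tts_pad_embed` and `pad_n` values below are stand-ins for the names used in the diff:

```python
import torch

tts_pad_embed = torch.zeros(1024, device="cuda" if torch.cuda.is_available() else "cpu")  # toy stand-in
pad_n = 7  # number of pad rows needed, as computed in the surrounding span logic

# Copy just one D-element row over the interconnect, then expand host-side;
# expand() is a view, so no pad_n x D buffer is materialized anywhere.
pad_row_cpu = tts_pad_embed.detach().reshape(1, -1).to("cpu")
pad_rows = pad_row_cpu.expand(pad_n, -1)
```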
```diff
             return {}
-        last = hidden_states[-1, :].detach().to("cpu").contiguous()
-        return {"last_talker_hidden": last}
+        return {"last_talker_hidden": hidden_states[-1, :]}
```
Previously this value was .detach()’d before storing. Returning a view of hidden_states without detaching can keep autograd history alive if this code is ever run with gradients enabled (or under certain tracing setups), increasing memory usage. Keep the GPU-resident behavior but still detach (and optionally make it contiguous if downstream relies on contiguous reshapes).
| return {"last_talker_hidden": hidden_states[-1, :]} | |
| last_hidden = hidden_states[-1, :].detach() | |
| last_hidden = last_hidden.contiguous() | |
| return {"last_talker_hidden": last_hidden} |
```diff
@@ -189,10 +188,12 @@ def talker2code2wav_async_chunk(
             window_frames = ref_frames + window_frames
             left_context_size += len(ref_frames)
```
window_frames[0] will raise IndexError if window_frames is ever empty. The previous torch.tensor(window_frames) path would also fail on empty input, but if empty windows are possible in some streaming boundary conditions, this needs an explicit guard (e.g., handle empty by returning code_predictor_codes = [] and still emitting finished/left_context_size).
```python
    # Handle potential empty window to avoid IndexError and follow empty-window behavior.
    if not window_frames:
        return {
            "code_predictor_codes": [],
            "left_context_size": left_context_size,
            "finished": finished,
        }
```
Signed-off-by: JuanPZuluaga <juanz9312@gmail.com>
Nice work on the optimization. I ran an independent benchmark on an H200 (same config for both).
Results confirm the PR's claims. The gains scale with concurrency as expected from eliminating per-step GPU<->CPU round-trips. A couple of nits on the PR description:
I have updated the PR description. Let me know if there's something else to be done.
@natureofnature Does this align with your design?
Yes, I can rebase with this PR for the refactor.
…vllm-project#1852)
Signed-off-by: pablo <pablo@agigo.ai>
Signed-off-by: JuanPZuluaga <juanz9312@gmail.com>
Co-authored-by: pablo <pablo@agigo.ai>
Co-authored-by: Hongsheng Liu <liuhongsheng4@huawei.com>
Signed-off-by: yiliu30 <yi4.liu@intel.com>

Fix test assertions and mocks that fell out of sync with source code changes in qwen3_tts.py across PRs vllm-project#1930, vllm-project#1852, and vllm-project#2104.
- test_flush_on_finish: `finished` is now a plain bool, not a tensor; remove the `.item()` call
- test_ic_load_change_mid_request: IC is cached per request since vllm-project#1930; update expected emission frames to match current logic
- test_non_async_processor_prepends_ref_code_and_sets_trim_context: add missing `finished=True` and `token_ids` to the mock (required since vllm-project#2104)
- test_non_async_processor_filters_out_of_range_codec_values: same fix
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: linyueqian <linyueqian@outlook.com>

…vllm-project#1852)
Signed-off-by: pablo <pablo@agigo.ai>
Signed-off-by: JuanPZuluaga <juanz9312@gmail.com>
Co-authored-by: pablo <pablo@agigo.ai>
Co-authored-by: Hongsheng Liu <liuhongsheng4@huawei.com>
Purpose
In the Qwen3-TTS Talker we move three big tensors (`last_talker_hidden`, `tts_pad_embed`, `tailing_text_hidden`) from GPU to CPU after every decode step, then from CPU back to GPU at the start of the next step. At batch_size=32 with ~200 decode steps per utterance (i.e. high-concurrency settings), this creates 25k cudaMemcpy calls plus GPU pipeline stalls per generation. That's why, even at high concurrency, it is hard to see GPU utilization near 100%.

This PR eliminates that round-trip and fixes two additional inefficiencies:
- `gpu_resident_buffer_keys` (this already exists in `qwen3_omni`, it just wasn't used here). Tensors stay on GPU; the `.to(device, dtype)` calls become no-ops. Zero-copy per decode step.
- `threading.Condition()` replacing `time.sleep(0.001)` in both `recv`/`save` loops, for instant wakeup when data arrives.
- Pure Python transpose in the chunk-streaming payload construction, avoiding the per-chunk tensor allocation of the previous `torch.tensor().transpose().reshape(-1).tolist()` chain.

Test Plan
Run benchmark with this PR vs Main.
The YAML config I changed with:
Test Result
See the plot: the results are clear, and we improve in every setting.