[Feat] audio streaming input for async chunk #3614
hello, I’m opening a smaller related PR for The PR only enables a commit-then-generate compatibility bridge: realtime audio chunks are buffered until It does not implement early-start streaming input, prompt extension, or slice-by-slice async-chunk scheduling, so I believe it is complementary to this draft rather than a replacement. Leaving a note here since the PRs touch the same realtime/async-chunk area. Happy to adjust the scope if you think it conflicts with #3614. Related PR: #3654 |
pytest -sv qwen3/tests/e2e/online_serving/test_qwen3_omni_expansion.py
36 passed, 3 skipped, 19 warnings in 4288.87s (1:11:28)
Signed-off-by: CHEN <116010019@link.cuhk.edu.cn>
realtime_demo_new.mp4 |
hsliuustc0106 left a comment
Review Summary
This PR adds streaming audio input support for the async chunk path, enabling /v1/realtime to work with async_chunk enabled. The design is sound: a resumable flag marks streaming sessions, is_segment_finished separates segment-level from request-level finish, and prefill chunks are cached until the first decode chunk arrives.
Validated:
- CI gates passing (DCO, build, pre-commit, docs)
- PR provides accuracy test output and performance benchmarks (TTFP improvement 17-50%, RTF parity)
- New unit tests cover AR scheduler segment handling, adapter segment tracking, and streaming input prefill caching
- Realtime websocket test now runs with async_chunk enabled
Blocking issue: bare except Exception with incomplete fallback in construct_next_stage_streaming_input_prompt.
PR size note: 18 files changed (exceeds 10-file threshold). Please run L3 tests locally and paste results: https://docs.vllm.ai/projects/vllm-omni/en/latest/contributing/ci/test_guide/#l3-level--l4-level
Reviewed by Claude Code with glm-5.1
        request.prompt_token_ids.extend(new_prompt or ())
        request.update_block_hashes()
        request.num_prompt_tokens = len(request.prompt_token_ids)
    except Exception:
This bare except Exception silently swallows all errors. More critically, the fallback (line 249) only sets request.prompt_token_ids but does not update _all_token_ids, num_prompt_tokens, num_computed_tokens, or call update_block_hashes(). This can leave the request in an inconsistent state where the scheduler allocates the wrong number of KV blocks.
Suggested fix:
- Catch a narrower exception type (e.g., except (KeyError, AttributeError, IndexError)), or at minimum log the error.
- In the fallback, complete the state update:

    except Exception:
        if prompt_token_ids is not None:
            next_prompt_len = max(1, len(prompt_token_ids))
            request._all_token_ids.clear()
            request.prompt_token_ids = [0] * next_prompt_len
            request._all_token_ids.extend(request.prompt_token_ids)
            request.num_computed_tokens = 0
            request.update_block_hashes()
            request.num_prompt_tokens = len(request.prompt_token_ids)

    pending_prefills = getattr(transfer_manager, "_pending_streaming_prefills", None)
    if pending_prefills is None:
        pending_prefills = {}
        transfer_manager._pending_streaming_prefills = pending_prefills
_pending_streaming_prefills is dynamically attached to transfer_manager here rather than initialized in ChunkTransferAdapter.__init__. This is fragile:
- The attribute may not exist when cleanup_sender runs (it uses getattr with a None default, so cleanup is a no-op instead of cleaning a real dict).
- If this function is called after cleanup, it recreates the dict and adds an entry that is never cleaned up.
Please add self._pending_streaming_prefills: dict[str, dict] = {} to ChunkTransferAdapter.__init__ (line ~58 of chunk_transfer_adapter.py) and replace the getattr / monkey-patch pattern with a direct self._pending_streaming_prefills access.
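A minimal sketch of the pattern requested in this comment, using a hypothetical stand-in class rather than the real ChunkTransferAdapter. The method name `add_pending_prefill` and the body of `cleanup_sender` are assumptions made for illustration only:

```python
from typing import Any


class AdapterSketch:
    """Hypothetical stand-in for ChunkTransferAdapter; not the real class."""

    def __init__(self) -> None:
        # Declared up front, so every method can rely on the attribute existing.
        self._pending_streaming_prefills: dict[str, dict[str, Any]] = {}

    def add_pending_prefill(self, request_id: str, payload: dict[str, Any]) -> None:
        # Direct attribute access replaces the getattr / monkey-patch pattern.
        self._pending_streaming_prefills[request_id] = payload

    def cleanup_sender(self, request_id: str) -> None:
        # pop() with a default is safe even if the request was never cached.
        self._pending_streaming_prefills.pop(request_id, None)


adapter = AdapterSketch()
adapter.add_pending_prefill("req-0", {"ids": {"prompt": [1, 2, 3]}})
adapter.cleanup_sender("req-0")
adapter.cleanup_sender("req-0")  # a second cleanup is a harmless no-op
```

With the dict owned by `__init__`, cleanup always operates on the same object that later calls write to, so no entry can be recreated on a fresh dict after cleanup.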
can you help resolve the doc build failures? |
sure, resolved |
In

    def construct_next_stage_streaming_input_prompt(payload_data: dict[str, Any], request: Any) -> None:
        """Update a downstream streaming request prompt from connector payload ids.

        Async chunk downstream stages are prewarmed before the real Talker prompt is
        known. When a Thinker payload carries `ids.prompt`, this helper rebuilds the
        placeholder prompt length for the next stage, resets the computed-token
        cursor, and refreshes block hashes so the scheduler allocates KV slots that
        match the newly received streaming slice.
        """
        ids = payload_data.get("ids", {})
        prompt_token_ids = ids.get("prompt", None)
        if not prompt_token_ids:
            return
        num_computed_tokens = request.num_computed_tokens
        kept_output_tokens = request._all_token_ids[request.num_prompt_tokens : num_computed_tokens]
        del request._all_token_ids[num_computed_tokens:]
        request._output_token_ids.clear()
        assert request.prompt_token_ids is not None
        # Extend prompt with kept output tokens.
        request.prompt_token_ids.extend(kept_output_tokens)
        next_prompt_len = max(1, compute_talker_prompt_ids_length(prompt_token_ids))
        new_prompt = [0] * next_prompt_len
        request._all_token_ids.extend(new_prompt or ())
        request.prompt_token_ids.extend(new_prompt or ())
        request.update_block_hashes()
        request.num_prompt_tokens = len(request.prompt_token_ids)

This path appears to work in the current e2e flow, so I would not call it a correctness bug by itself. But the docstring should be clarified: this helper preserves
The rest of the content LGTM. Thanks for the amazing improvement.
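The state update performed by the quoted helper can be traced with a toy request. This is only a sketch: `ToyRequest` is a hypothetical stand-in with just the fields the helper touches, `update_prompt` is a condensed copy of the quoted logic, and `compute_talker_prompt_ids_length` is stubbed as `len` because its real behavior is not shown in this thread.

```python
from typing import Any


class ToyRequest:
    """Hypothetical stand-in with only the fields the helper touches."""

    def __init__(self) -> None:
        self.prompt_token_ids = [0, 0, 0]
        self._all_token_ids = [0, 0, 0, 7, 8, 9]
        self._output_token_ids = [7, 8, 9]
        self.num_prompt_tokens = 3
        self.num_computed_tokens = 5  # token 9 has not been computed yet
        self.hash_refreshes = 0

    def update_block_hashes(self) -> None:
        self.hash_refreshes += 1  # stub: only counts calls


def compute_talker_prompt_ids_length(prompt_ids: list[int]) -> int:
    return len(prompt_ids)  # stub: the real mapping is an unknown here


def update_prompt(payload_data: dict[str, Any], request: ToyRequest) -> None:
    prompt_token_ids = payload_data.get("ids", {}).get("prompt")
    if not prompt_token_ids:
        return
    computed = request.num_computed_tokens
    # Output tokens that were already computed are kept and folded into the prompt.
    kept = request._all_token_ids[request.num_prompt_tokens:computed]
    del request._all_token_ids[computed:]
    request._output_token_ids.clear()
    request.prompt_token_ids.extend(kept)
    # Placeholder ids sized for the newly received slice.
    new_prompt = [0] * max(1, compute_talker_prompt_ids_length(prompt_token_ids))
    request._all_token_ids.extend(new_prompt)
    request.prompt_token_ids.extend(new_prompt)
    request.update_block_hashes()
    request.num_prompt_tokens = len(request.prompt_token_ids)


req = ToyRequest()
update_prompt({"ids": {"prompt": [11, 12]}}, req)
```

In this trace the two already-computed output tokens (7 and 8) end up inside the prompt, the uncomputed token 9 is dropped, and two placeholder ids are appended. `num_computed_tokens` is left at 5, since the quoted code does not modify it.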
thx, updated |
Resolve docstring whitespace conflict in Qwen3-TTS prompt_embeds_builder; align with upstream/main style introduced by #3614. Signed-off-by: Yueqian Lin <linyueqian@outlook.com>
Purpose
Streaming audio input means the client does not wait for the full audio request to arrive before starting inference. Instead, the audio is split into multiple slices. Each slice (<|im_start|>user\n{audio_placeholder}<|im_end|>\n<|im_start|>assistant\n) is submitted to the engine as a sub-request of the same session. A sub-request waits until the previous round finishes, then continues with conversation memory from earlier slices.
Difference from Chunked Prefill
Streaming input changes request boundaries: audio arrives as multiple slices, and generation can run between slices. Its output may differ from submitting the whole audio at once.
Chunked prefill only changes scheduling granularity: the full input is already known, but prefill is scheduled in smaller token chunks. It should preserve the same request semantics and output.
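The contrast can be illustrated with a toy sketch. Nothing here is engine code: both function names are hypothetical, and the "events" are plain strings that only show ordering.

```python
def chunked_prefill(tokens: list[int], chunk_size: int) -> list[list[int]]:
    """Split one fully known prompt into scheduling chunks."""
    return [tokens[i:i + chunk_size] for i in range(0, len(tokens), chunk_size)]


def streaming_input(slices: list[list[int]]) -> list[str]:
    """Record the event order when generation may run between slices."""
    events = []
    for index, audio_slice in enumerate(slices):
        events.append(f"prefill slice {index} ({len(audio_slice)} tokens)")
        events.append(f"generate after slice {index}")
    return events


full_prompt = list(range(10))

# Chunked prefill: the chunks concatenate back to the original prompt,
# so the request semantics are unchanged.
chunks = chunked_prefill(full_prompt, chunk_size=4)
flattened = [tok for chunk in chunks for tok in chunk]

# Streaming input: generation is interleaved with the arrival of slices,
# so the request boundaries themselves differ.
events = streaming_input([full_prompt[:4], full_prompt[4:8], full_prompt[8:]])
```

In the chunked case only the scheduling granularity changes. In the streaming case a generation step sits between consecutive slices, which is why the output may differ from a single whole-audio request.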
Flow Comparison
![Without async_chunk](https://github.com/user-attachments/assets/61f6e87e-c86a-43d0-a0da-ff4b05c67bd1)
Without async_chunk, each streaming input segment is forwarded by the Orchestrator after the previous stage finishes. Downstream stages receive normal submit_update calls, and each stage replaces or extends its request state through the scheduler update path.
![With async_chunk](https://github.com/user-attachments/assets/0df07bc5-0dad-4f00-b9ca-ecf5a5f94c63)
With async_chunk, the Orchestrator only prewarms downstream stages. Stage-to-stage data is transferred by connector chunks: the upstream stage puts chunks, while the downstream scheduler polls and gets chunks before running the next segment.
Main Changes
Orchestrator
- add_request for the first audio slice and streaming_update for later slices. This reuses the same req_state and keeps all slices under one logical session.
- _prewarm_async_chunk_stages() passes resumable=req_state.streaming.enabled. This marks the downstream request as an unfinished streaming-input session.
Cross-stage data processing layer
Existing async-chunk logic treats the first transferred chunk as the prefill input.
For streaming input, a later slice can also produce a new sub-request prefill. The current implementation identifies this case with request.resumable and thinker_emb.shape[0] > 1.
When async scheduling has produced one extra placeholder embedding at the end of a slice, that unconfirmed embedding must not be transferred downstream.
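A minimal sketch of the two rules described above, using plain Python lists in place of tensors. `RequestSketch`, `is_streaming_prefill`, `drop_unconfirmed_tail`, and the `has_placeholder` flag are all hypothetical names. How the real code detects the extra placeholder is not stated here, so that signal is passed in as an assumption.

```python
from dataclasses import dataclass


@dataclass
class RequestSketch:
    """Hypothetical stand-in exposing only the flag the description mentions."""
    resumable: bool


def is_streaming_prefill(request: RequestSketch, num_embedding_rows: int) -> bool:
    # Mirrors the described check: request.resumable and thinker_emb.shape[0] > 1.
    return request.resumable and num_embedding_rows > 1


def drop_unconfirmed_tail(rows: list[list[float]], has_placeholder: bool) -> list[list[float]]:
    # Assumption: the unconfirmed placeholder, when present, is the last row.
    return rows[:-1] if has_placeholder and rows else rows


thinker_rows = [[0.1, 0.2], [0.3, 0.4], [0.0, 0.0]]  # last row is a placeholder
confirmed = drop_unconfirmed_tail(thinker_rows, has_placeholder=True)

# Two confirmed rows on a resumable request: treated as a new sub-request prefill.
new_prefill = is_streaming_prefill(RequestSketch(resumable=True), len(confirmed))
# A single row looks like an ordinary decode chunk, even on a resumable request.
decode_step = is_streaming_prefill(RequestSketch(resumable=True), 1)
```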
Data Transfer Layer
is_segment_finished beside the existing request-level finish marker:
Test Plan
accuracy:
input wave:
output.wav
performance:
Use speech requests of different lengths and split each into streaming input slices.
Expected behavior:
Reproduce script:
benchmark_realtime_vs_chat.py
Test Result
accuracy test output:
realtime_output.wav
perf:
output.wav output1.wav
TTFP:
1. Realtime is almost unaffected by input length: 0.45s → 0.40s, which matches the expectation of chunk-wise independent inference + concurrent uploading.
2. Chat latency increases significantly with longer input: 0.54s → 0.81s, because it requires uploading the entire audio as base64 at once and completing the full prefill before emitting the first packet.
3. The longer the input audio, the larger the advantage of realtime:
short input: only ~17% improvement
long input: nearly 50% reduction in TTFP
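The improvement percentages can be reproduced from the TTFP values quoted above. This is a small worked calculation, assuming the chat endpoint is the baseline; it uses the rounded figures, so results may differ slightly from numbers computed on unrounded measurements. The helper name `relative_reduction` is illustrative.

```python
def relative_reduction(baseline: float, candidate: float) -> float:
    """Fractional reduction of candidate relative to baseline."""
    return (baseline - candidate) / baseline


# TTFP in seconds, as quoted above: chat is the baseline, realtime the candidate.
short_input = relative_reduction(baseline=0.54, candidate=0.45)
long_input = relative_reduction(baseline=0.81, candidate=0.40)

print(f"short input: {short_input:.1%}")
print(f"long input: {long_input:.1%}")
```

On these rounded values the reductions come out at about 16.7% for the short input and about 50.6% for the long input.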
RTF:
1. The two endpoints are almost identical (difference ≤ 0.01), indicating that the throughput after the first packet is essentially the same.
2. RTF itself remains stable across different input lengths as well:
realtime: 0.149–0.155
chat: 0.147–0.155
Conclusion:
For long-audio scenarios, the primary value of the realtime endpoint lies in reducing TTFP, while the generation-phase throughput remains comparable between the two endpoints.