[Feat] Support async_chunk additional_information delivery to V2 model runner#2607
Conversation
…nner
- Add update_requests() to OmniGPUModelRunner to propagate additional_information from scheduler to intermediate_buffer
- Use _resolve_additional_information for AdditionalInformationPayload deserialization in both AR and generation runners
- Revert cleanup() to cleanup_receiver() for concurrent safety
- Fix _safe_get_rope control flow (remove exception-as-goto pattern)
- Add Talker M-RoPE fallback returning 3D sequential positions

Signed-off-by: Sy03 <1370724210@qq.com>
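The "3D sequential positions" fallback mentioned in the commit can be sketched roughly as below; the function name and the pure-Python shape are illustrative stand-ins (the real code would build torch tensors), not the actual vLLM-Omni implementation:

```python
# Illustrative sketch only; names here are hypothetical. "3D sequential
# positions" is taken to mean the same 0..seq_len-1 range replicated across
# the three M-RoPE axes (temporal, height, width).
def mrope_sequential_fallback(seq_len):
    pos = list(range(seq_len))
    # one copy per axis -> shape (3, seq_len)
    return [pos[:], pos[:], pos[:]]

positions = mrope_sequential_fallback(4)
print(positions)  # [[0, 1, 2, 3], [0, 1, 2, 3], [0, 1, 2, 3]]
```

Such a fallback gives text-only requests well-defined M-RoPE positions when no multimodal grid is available.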
4e80cc4 to a6ef196
    and isinstance(getattr(add_info, "entries"), dict)
):
    request.additional_information = deserialize_additional_information(add_info)
from vllm_omni.worker_v2.model_states.intermediate_buffer import (
Does this change keep MR V1 runnable?
lishunyang12 left a comment
Review Summary
The core goal — propagating additional_information from OmniCachedRequestData to the V2 model runner's intermediate_buffer — is correct and clearly needed. The in-place update approach in OmniGenerationModelRunner is a good simplification over the previous remove+re-add cycle. The M-RoPE dimensionality check in omni_model_state.py is a sensible hardening.
However, there are two issues that should be addressed before merging:
1. Double intermediate-buffer update in OmniGenerationModelRunner (correctness/perf)
OmniGenerationModelRunner.execute_model calls:
1. `_handle_async_chunk_updates(scheduler_output)` — which resolves and merges `additional_information` into `intermediate_buffer`
2. `self.update_requests(scheduler_output)` — which now (via the new `OmniGPUModelRunner.update_requests`) also resolves and merges the exact same `additional_information` into `intermediate_buffer`
Since OmniGenerationModelRunner inherits OmniGPUModelRunner.update_requests without overriding it, every async_chunk cached request gets _resolve_additional_information + intermediate_buffer.update called twice with the same data. While the merge is idempotent for dict values, the tensor .detach().cpu().contiguous() clone path in intermediate_buffer.update() runs twice per tensor per request per step, which is needless GPU-to-CPU traffic.
Suggested fix: Either (a) override update_requests in OmniGenerationModelRunner to skip the additional_information merge (since _handle_async_chunk_updates already handles it), or (b) remove the intermediate-buffer update from _handle_async_chunk_updates and let the inherited update_requests be the single source of truth.
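Option (a) can be sketched with toy stand-ins — only the class and method names mirror the PR; all bodies below are hypothetical, and a counter on the buffer makes the one-merge-per-step invariant explicit:

```python
# Toy model of the double-update problem and fix (a). Real classes live in
# vllm_omni; everything here is a simplified stand-in.
class IntermediateBuffer:
    def __init__(self):
        self.updates = 0  # counts merges, to expose a double update
        self.data = {}

    def update(self, req_id, info):
        self.updates += 1
        self.data.setdefault(req_id, {}).update(info)


class OmniGPUModelRunner:
    def __init__(self):
        self.intermediate_buffer = IntermediateBuffer()

    def update_requests(self, cached_reqs):
        # Base behavior: merge additional_information for every cached request.
        for req_id, info in cached_reqs:
            if info:
                self.intermediate_buffer.update(req_id, info)


class OmniGenerationModelRunner(OmniGPUModelRunner):
    def execute_model(self, cached_reqs):
        self._handle_async_chunk_updates(cached_reqs)
        self.update_requests(cached_reqs)

    def _handle_async_chunk_updates(self, cached_reqs):
        for req_id, info in cached_reqs:
            if info:
                self.intermediate_buffer.update(req_id, info)

    def update_requests(self, cached_reqs):
        # Option (a): skip the additional_information merge here, since
        # _handle_async_chunk_updates already performed it this step.
        pass


runner = OmniGenerationModelRunner()
runner.execute_model([("r0", {"k": 1})])
print(runner.intermediate_buffer.updates)  # 1, not 2
```

Without the override, the same data would be merged (and, in the real code, `.detach().cpu().contiguous()`-cloned) twice per step.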
2. _resolve_additional_information drops scalar_data entries — potential regression in scheduler
The PR replaces deserialize_additional_information (from serialization.py) with _resolve_additional_information (from intermediate_buffer.py) in omni_ar_scheduler.py:_free_request. However, these two functions are not equivalent:
- `deserialize_additional_information` handles `tensor_data`, `list_data`, and `scalar_data` entries.
- `_resolve_additional_information` only handles `tensor_data` and falls through to `getattr(entry, "list_data", None)` for everything else — `scalar_data` entries become `None`.
This is a regression for the scheduler path. If any additional_information entry uses scalar_data, it will be silently dropped after this change. The PR description itself lists "silently dropping tensor_data and scalar_data entries" as a bug being fixed (issue #2), but the function being switched to has the same gap.
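The gap is easy to reproduce with a toy stand-in (`SimpleNamespace` in place of the real payload entry class; the resolver below mirrors only the branching the review describes):

```python
# Toy reproduction of the fallthrough: an entry carrying only scalar_data
# resolves to None, because the resolver checks tensor_data and then falls
# back unconditionally to list_data.
from types import SimpleNamespace

def resolve_entry(entry):
    tensor_data = getattr(entry, "tensor_data", None)
    if tensor_data is not None:
        return tensor_data
    return getattr(entry, "list_data", None)  # scalar_data never consulted

entry = SimpleNamespace(tensor_data=None, list_data=None, scalar_data=3.14)
print(resolve_entry(entry))  # None -> the scalar is silently dropped
```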
Suggested fix: Add scalar_data handling to _resolve_additional_information in intermediate_buffer.py, matching what deserialize_additional_information does:
    tensor_data = getattr(entry, "tensor_data", None)
    if tensor_data is not None:
        ...
        info[k] = torch.from_numpy(arr.copy())
    elif getattr(entry, "list_data", None) is not None:
        info[k] = entry.list_data
    elif getattr(entry, "scalar_data", None) is not None:
        info[k] = entry.scalar_data
    else:
        info[k] = None
Minor / style
- The inline `from ... import _resolve_additional_information` inside `_handle_async_chunk_updates` and `update_requests` is fine for avoiding circular imports, but since both methods are in files that already import from the same package at module level, consider hoisting the import to the top of each file if there is no actual circular dependency. This would be a minor readability improvement.
- The `cleanup_receiver` change (issue #3 in the PR description) is sound — only cleaning the receiver side in `update_from_output` avoids race conditions with background sender threads.
Overall this is a well-motivated fix with good test evidence. Requesting changes only for the two functional issues above.
- Add scalar_data branch to _resolve_additional_information to match deserialize_additional_information (scheduler path was silently dropping scalar entries)
- Remove duplicate intermediate_buffer.update in _handle_async_chunk_updates; inherited update_requests is the single source of truth (avoids double CPU clone per tensor per step)
- Hoist inline imports of _resolve_additional_information to module top

Signed-off-by: Sy03 <1370724210@qq.com>
I have fixed all the issues. Please merge it, and we can check the model V2 migration progress later. @tzhouam
Purpose
Fix async_chunk mode producing garbage/short audio in V2 model runner.
Root cause:
`additional_information` (containing `thinker_decode_embeddings` and `thinker_output_token_ids`) was never propagated from the scheduler's `CachedRequestData` to the runner's `intermediate_buffer` during decode steps. The chunk_transfer_adapter correctly polled data from SharedMemoryConnector and attached it to `scheduled_cached_reqs.additional_information`, but `GPUModelRunner.update_requests()` does not handle this field — so the data was silently dropped.

Additionally fixes three correctness issues found during review:
1. `_handle_async_chunk_updates` passed raw `AdditionalInformationPayload` objects to `intermediate_buffer.update()`, which expects `dict` — causing `AttributeError` when the payload is not pre-resolved
2. `_resolve_additional_information` fell through to `list_data`, silently dropping `tensor_data` and `scalar_data` entries
3. `cleanup()` (sender+receiver) replaced `cleanup_receiver()` in the failed-KV-load path, risking race conditions with background save threads

Test Plan
Test Result
Before fix: Talker sees `thinker_output_token_ids=[]`, `thinker_decode_embeddings=None` -> early EOS after ~33 decode steps -> 4.57s noise audio

After fix: Talker correctly receives incremental thinker data -> 329+ decode steps -> 22.04s audio, ASR output: