Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ def _send_single_request(self, task: dict):

if is_finished:
self.code_prompt_token_ids.pop(request_id, None)
cached_ic = getattr(self, "_cached_ic", None)
if cached_ic is not None:
cached_ic.pop(request_id, None)
Comment on lines +241 to +243
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Evict cached initial chunk size during adapter cleanup

The new `_cached_ic` state is only cleared in the `is_finished` branch here, but requests can also terminate through `OmniChunkTransferAdapter.cleanup()` (for example, the error/failed-load paths in `omni_generation_scheduler`) without necessarily executing this send-loop branch. That leaves per-request cache entries resident indefinitely and can apply stale initial-chunk-size (IC) values if request IDs are reused; please also remove `_cached_ic[external_req_id]` in `cleanup()`, alongside `code_prompt_token_ids` and `request_payload`.

Useful? React with 👍 / 👎.


########################################################################
# Cleanup
Expand Down Expand Up @@ -273,6 +276,10 @@ def cleanup(
self.request_payload.pop(external_req_id, None)
self.code_prompt_token_ids.pop(external_req_id, None)

cached_ic = getattr(self, "_cached_ic", None)
if cached_ic is not None:
cached_ic.pop(external_req_id, None)

########################################################################
# Schedule Helper
########################################################################
Expand Down
26 changes: 11 additions & 15 deletions vllm_omni/model_executor/stage_input_processors/qwen3_tts.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,18 @@ def talker2code2wav_async_chunk(
initial_chunk_size = int(entry.list_data[0])
per_request_override = True

# Dynamic IC: always derived from chunk_size, recomputed every call to adapt to load.
# Dynamic IC: cache per request so boundaries stay stable for its lifetime.
if not per_request_override:
max_ic = max_ic_for_chunk_size(chunk_size)
active = sum(1 for v in transfer_manager.code_prompt_token_ids.values() if len(v) > 0)
capacity = getattr(transfer_manager, "scheduler_max_num_seqs", 1)
initial_chunk_size = compute_dynamic_initial_chunk_size(active, capacity, max_ic)
logger.debug(
"Dynamic IC: active=%d, capacity=%d, max_ic=%d, ic=%d, cs=%d, req=%s, keys=%d",
active,
capacity,
max_ic,
initial_chunk_size,
chunk_size,
request_id,
len(transfer_manager.code_prompt_token_ids),
)
_ic_cache = getattr(transfer_manager, "_cached_ic", None)
if _ic_cache is None:
_ic_cache = {}
transfer_manager._cached_ic = _ic_cache
if request_id not in _ic_cache:
max_ic = max_ic_for_chunk_size(chunk_size)
active = sum(1 for v in transfer_manager.code_prompt_token_ids.values() if len(v) > 0)
capacity = getattr(transfer_manager, "scheduler_max_num_seqs", 1)
_ic_cache[request_id] = compute_dynamic_initial_chunk_size(active, capacity, max_ic)
initial_chunk_size = _ic_cache[request_id]

if chunk_size <= 0 or left_context_size_config < 0 or initial_chunk_size < 0:
raise ValueError(
Expand Down
Loading