From 76e1acc742060871b3baf2a7d739ee43e8983c2d Mon Sep 17 00:00:00 2001 From: JuanPZuluaga Date: Mon, 16 Mar 2026 17:11:34 +0000 Subject: [PATCH 1/2] remove dynamic initial chunk and only compute on initial request Signed-off-by: JuanPZuluaga --- .../chunk_transfer_adapter.py | 3 +++ .../stage_input_processors/qwen3_tts.py | 26 ++++++++----------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/vllm_omni/distributed/omni_connectors/transfer_adapter/chunk_transfer_adapter.py b/vllm_omni/distributed/omni_connectors/transfer_adapter/chunk_transfer_adapter.py index d96ef74db09..e90a8e76682 100644 --- a/vllm_omni/distributed/omni_connectors/transfer_adapter/chunk_transfer_adapter.py +++ b/vllm_omni/distributed/omni_connectors/transfer_adapter/chunk_transfer_adapter.py @@ -238,6 +238,9 @@ def _send_single_request(self, task: dict): if is_finished: self.code_prompt_token_ids.pop(request_id, None) + cached_ic = getattr(self, "_cached_ic", None) + if cached_ic is not None: + cached_ic.pop(request_id, None) ######################################################################## # Cleanup diff --git a/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py b/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py index 435e45f0381..c01517066ec 100644 --- a/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py +++ b/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py @@ -115,22 +115,18 @@ def talker2code2wav_async_chunk( initial_chunk_size = int(entry.list_data[0]) per_request_override = True - # Dynamic IC: always derived from chunk_size, recomputed every call to adapt to load. + # Dynamic IC: cache per request so boundaries stay stable for its lifetime. if not per_request_override: - max_ic = max_ic_for_chunk_size(chunk_size) - active = sum(1 for v in transfer_manager.code_prompt_token_ids.values() if len(v) > 0) - capacity = getattr(transfer_manager, "scheduler_max_num_seqs", 1) - initial_chunk_size = compute_dynamic_initial_chunk_size(active, capacity, max_ic) - logger.debug( - "Dynamic IC: active=%d, capacity=%d, max_ic=%d, ic=%d, cs=%d, req=%s, keys=%d", - active, - capacity, - max_ic, - initial_chunk_size, - chunk_size, - request_id, - len(transfer_manager.code_prompt_token_ids), - ) + _ic_cache = getattr(transfer_manager, "_cached_ic", None) + if _ic_cache is None: + _ic_cache = {} + transfer_manager._cached_ic = _ic_cache + if request_id not in _ic_cache: + max_ic = max_ic_for_chunk_size(chunk_size) + active = sum(1 for v in transfer_manager.code_prompt_token_ids.values() if len(v) > 0) + capacity = getattr(transfer_manager, "scheduler_max_num_seqs", 1) + _ic_cache[request_id] = compute_dynamic_initial_chunk_size(active, capacity, max_ic) + initial_chunk_size = _ic_cache[request_id] if chunk_size <= 0 or left_context_size_config < 0 or initial_chunk_size < 0: raise ValueError( From 0213d6d723dbd94ea6f818007aad265b7d2e3f52 Mon Sep 17 00:00:00 2001 From: JuanPZuluaga Date: Mon, 16 Mar 2026 18:33:37 +0000 Subject: [PATCH 2/2] add cleanup cached_ic Signed-off-by: JuanPZuluaga --- .../transfer_adapter/chunk_transfer_adapter.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/vllm_omni/distributed/omni_connectors/transfer_adapter/chunk_transfer_adapter.py b/vllm_omni/distributed/omni_connectors/transfer_adapter/chunk_transfer_adapter.py index e90a8e76682..8e9939d38ae 100644 --- a/vllm_omni/distributed/omni_connectors/transfer_adapter/chunk_transfer_adapter.py +++ b/vllm_omni/distributed/omni_connectors/transfer_adapter/chunk_transfer_adapter.py @@ -276,6 +276,10 @@ def cleanup( self.request_payload.pop(external_req_id, None) self.code_prompt_token_ids.pop(external_req_id, None) + cached_ic = getattr(self, "_cached_ic", None) + if cached_ic is not None: + cached_ic.pop(external_req_id, None) + ######################################################################## # Schedule Helper ########################################################################