From a393004f814ee6fd0c6e4b7bbcfe8545fd7226bf Mon Sep 17 00:00:00 2001 From: amy-why-3459 Date: Sat, 25 Apr 2026 17:47:27 +0800 Subject: [PATCH 1/2] Fix the chunked prefill issue in thinker Signed-off-by: amy-why-3459 --- tests/dfx/perf/tests/test_qwen_omni.json | 450 ++++++++++++++++++ .../chunk_transfer_adapter.py | 53 +-- .../models/qwen3_omni/qwen3_omni.py | 20 +- .../stage_input_processors/qwen3_omni.py | 39 +- 4 files changed, 495 insertions(+), 67 deletions(-) create mode 100644 tests/dfx/perf/tests/test_qwen_omni.json diff --git a/tests/dfx/perf/tests/test_qwen_omni.json b/tests/dfx/perf/tests/test_qwen_omni.json new file mode 100644 index 00000000000..d46d9043197 --- /dev/null +++ b/tests/dfx/perf/tests/test_qwen_omni.json @@ -0,0 +1,450 @@ +[ + { + "test_name": "test_qwen3_omni", + "server_params": { + "model": "Qwen/Qwen3-Omni-30B-A3B-Instruct", + "extra_cli_args": ["--no-async-chunk"] + }, + "benchmark_params": [ + { + "dataset_name": "random", + "backend": "openai-chat-omni", + "endpoint": "/v1/chat/completions", + "num_prompts": [4, 16, 32, 64, 128], + "max_concurrency": [1, 4, 8, 16, 32], + "random_input_len": 2500, + "random_output_len": 900, + "ignore_eos": true, + "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", + "baseline": { + "mean_ttft_ms": [1000, 3000, 5000, 7000, 9000], + "mean_audio_ttfp_ms": [30000, 60000, 90000, 120000, 150000], + "mean_audio_rtf": [0.35, 0.45, 0.55, 0.65, 0.75] + } + }, + { + "dataset_name": "random-mm", + "backend": "openai-chat-omni", + "endpoint": "/v1/chat/completions", + "num_prompts": [10], + "request_rate": [0.1], + "random_input_len": 100, + "random_output_len": 100, + "random_range_ratio": 0.0, + "ignore_eos": true, + "random_mm_base_items_per_request": 1, + "random_mm_num_mm_items_range_ratio": 0.5, + "random_mm_limit_mm_per_prompt": { + "audio": 1 + }, + "random_mm_bucket_config": { + "(0, 60, 3)": 1.0 + }, + "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", + "baseline": { + "mean_ttft_ms": [2000], + "mean_audio_ttfp_ms": [10000], + "mean_audio_rtf": [0.25] + } + }, + { + "dataset_name": "random-mm", + "backend": "openai-chat-omni", + "endpoint": "/v1/chat/completions", + "num_prompts": [40], + "request_rate": [0.5], + "random_input_len": 100, + "random_output_len": 100, + "random_range_ratio": 0.0, + "ignore_eos": true, + "random_mm_base_items_per_request": 2, + "random_mm_num_mm_items_range_ratio": 0.5, + "random_mm_limit_mm_per_prompt": { + "image": 1, + "video": 1 + }, + "random_mm_bucket_config": { + "(256, 256, 1)": 0.5, + "(720, 1280, 2)": 0.5 + }, + "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", + "baseline": { + "mean_ttft_ms": [6000], + "mean_audio_ttfp_ms": [15000], + "mean_audio_rtf": [0.45] + } + }, + { + "dataset_name": "random-mm", + "backend": "openai-chat-omni", + "endpoint": "/v1/chat/completions", + "num_prompts": [100], + "request_rate": [1.0], + "random_input_len": 100, + "random_output_len": 100, + "random_range_ratio": 0.0, + "ignore_eos": true, + "random_mm_base_items_per_request": 3, + "random_mm_num_mm_items_range_ratio": 0.5, + "random_mm_limit_mm_per_prompt": { + "image": 1, + "video": 1, + "audio": 1 + }, + "random_mm_bucket_config": { + "(256, 256, 1)": 0.34, + "(720, 1280, 2)": 0.33, + "(0, 60, 3)": 0.33 + }, + "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", + "baseline": { + "mean_ttft_ms": [12000], + "mean_audio_ttfp_ms": [18000], + "mean_audio_rtf": [0.9] + } + } + ] + }, + { + "test_name": "test_qwen3_omni_chunk", + "server_params": { + "model": "Qwen/Qwen3-Omni-30B-A3B-Instruct", + "extra_cli_args": ["--async-chunk"] + }, + "benchmark_params": [ + { + "dataset_name": "random", + "backend": "openai-chat-omni", + "endpoint": "/v1/chat/completions", + "num_prompts": [4, 16, 32, 64, 128], + "max_concurrency": [1, 4, 8, 16, 32], + "random_input_len": 2500, + "random_output_len": 900, + "ignore_eos": true, + "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", + "baseline": { + "mean_ttft_ms": [1000, 3000, 5000, 7000, 9000], + "mean_audio_ttfp_ms": [1000, 3000, 5000, 7000, 9000], + "mean_audio_rtf": [0.2, 0.35, 0.6, 0.85, 0.9] + } + }, + { + "dataset_name": "random-mm", + "backend": "openai-chat-omni", + "endpoint": "/v1/chat/completions", + "num_prompts": [10], + "request_rate": [0.1], + "random_input_len": 100, + "random_output_len": 100, + "random_range_ratio": 0.0, + "ignore_eos": true, + "random_mm_base_items_per_request": 1, + "random_mm_num_mm_items_range_ratio": 0.5, + "random_mm_limit_mm_per_prompt": { + "audio": 1 + }, + "random_mm_bucket_config": { + "(0, 60, 3)": 1.0 + }, + "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", + "baseline": { + "mean_ttft_ms": [2000], + "mean_audio_ttfp_ms": [2000], + "mean_audio_rtf": [0.25] + } + }, + { + "dataset_name": "random-mm", + "backend": "openai-chat-omni", + "endpoint": "/v1/chat/completions", + "num_prompts": [40], + "request_rate": [0.5], + "random_input_len": 100, + "random_output_len": 100, + "random_range_ratio": 0.0, + "ignore_eos": true, + "random_mm_base_items_per_request": 2, + "random_mm_num_mm_items_range_ratio": 0.5, + "random_mm_limit_mm_per_prompt": { + "image": 1, + "video": 1 + }, + "random_mm_bucket_config": { + "(256, 256, 1)": 0.5, + "(720, 1280, 2)": 0.5 + }, + "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", + "baseline": { + "mean_ttft_ms": [6000], + "mean_audio_ttfp_ms": [6000], + "mean_audio_rtf": [0.7] + } + }, + { + "dataset_name": "random-mm", + "backend": "openai-chat-omni", + "endpoint": "/v1/chat/completions", + "num_prompts": [100], + "request_rate": [1.0], + "random_input_len": 100, + "random_output_len": 100, + "random_range_ratio": 0.0, + "ignore_eos": true, + "random_mm_base_items_per_request": 3, + "random_mm_num_mm_items_range_ratio": 0.5, + "random_mm_limit_mm_per_prompt": { + "image": 1, + "video": 1, + "audio": 1 + }, + "random_mm_bucket_config": { + "(256, 256, 1)": 0.34, + "(720, 1280, 2)": 0.33, + "(0, 60, 3)": 0.33 + }, + "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", + "baseline": { + "mean_ttft_ms": [12000], + "mean_audio_ttfp_ms": [12000], + "mean_audio_rtf": [1.0] + } + }, + { + "dataset_name": "random", + "backend": "openai-chat-omni", + "endpoint": "/v1/chat/completions", + "num_prompts": [4, 16, 32, 64, 128], + "max_concurrency": [1, 4, 8, 16, 32], + "random_input_len": 2500, + "random_output_len": 900, + "ignore_eos": true, + "extra_body": { + "modalities": ["text"] + }, + "percentile-metrics": "ttft,tpot,itl,e2el", + "baseline": { + "mean_ttft_ms": [1000, 3000, 5000, 7000, 9000] + } + }, + { + "dataset_name": "random-mm", + "backend": "openai-chat-omni", + "endpoint": "/v1/chat/completions", + "num_prompts": [10], + "request_rate": [0.1], + "random_input_len": 100, + "random_output_len": 100, + "random_range_ratio": 0.0, + "ignore_eos": true, + "extra_body": { + "modalities": ["text"] + }, + "random_mm_base_items_per_request": 1, + "random_mm_num_mm_items_range_ratio": 0.5, + "random_mm_limit_mm_per_prompt": { + "audio": 1 + }, + "random_mm_bucket_config": { + "(0, 60, 3)": 1.0 + }, + "percentile-metrics": "ttft,tpot,itl,e2el", + "baseline": { + "mean_ttft_ms": [2000] + } + }, + { + "dataset_name": "random-mm", + "backend": "openai-chat-omni", + "endpoint": "/v1/chat/completions", + "num_prompts": [40], + "request_rate": [0.5], + "random_input_len": 100, + "random_output_len": 100, + "random_range_ratio": 0.0, + "ignore_eos": true, + "extra_body": { + "modalities": ["text"] + }, + "random_mm_base_items_per_request": 2, + "random_mm_num_mm_items_range_ratio": 0.5, + "random_mm_limit_mm_per_prompt": { + "image": 1, + "video": 1 + }, + "random_mm_bucket_config": { + "(256, 256, 1)": 0.5, + "(720, 1280, 2)": 0.5 + }, + "percentile-metrics": "ttft,tpot,itl,e2el", + "baseline": { + "mean_ttft_ms": [6000] + } + }, + { + "dataset_name": "random-mm", + "backend": "openai-chat-omni", + "endpoint": "/v1/chat/completions", + "num_prompts": [100], + "request_rate": [1.0], + "random_input_len": 100, + "random_output_len": 100, + "random_range_ratio": 0.0, + "ignore_eos": true, + "extra_body": { + "modalities": ["text"] + }, + "random_mm_base_items_per_request": 3, + "random_mm_num_mm_items_range_ratio": 0.5, + "random_mm_limit_mm_per_prompt": { + "image": 1, + "video": 1, + "audio": 1 + }, + "random_mm_bucket_config": { + "(256, 256, 1)": 0.34, + "(720, 1280, 2)": 0.33, + "(0, 60, 3)": 0.33 + }, + "percentile-metrics": "ttft,tpot,itl,e2el", + "baseline": { + "mean_ttft_ms": [6000] + } + } + ] + }, + { + "test_name": "test_qwen3_omni_vllm_text", + "server_params": { + "model": "Qwen/Qwen3-Omni-30B-A3B-Instruct", + "use_omni": false, + "extra_cli_args": ["--no-enable-prefix-caching"] + }, + "benchmark_params": [ + { + "dataset_name": "random", + "backend": "vllm", + "model": "Qwen/Qwen3-Omni-30B-A3B-Instruct", + "endpoint": "/v1/completions", + "num_prompts": [4, 16, 32, 64, 128], + "max_concurrency": [1, 4, 8, 16, 32], + "random_input_len": 2500, + "random_output_len": 900, + "temperature": 0.4, + "top_p": 0.9, + "top_k": 1, + "seed": 42, + "repetition_penalty": 1.05, + "ignore_eos": true, + "extra_body": { + "modalities": ["text"] + }, + "percentile-metrics": "ttft,tpot,itl,e2el", + "baseline": { + "mean_ttft_ms": [1000, 3000, 5000, 7000, 9000] + } + }, + { + "dataset_name": "random-mm", + "backend": "vllm", + "model": "Qwen/Qwen3-Omni-30B-A3B-Instruct", + "endpoint": "/v1/completions", + "num_prompts": [10], + "request_rate": [0.1], + "random_input_len": 100, + "random_output_len": 100, + "random_range_ratio": 0.0, + "temperature": 0.4, + "top_p": 0.9, + "top_k": 1, + "seed": 42, + "repetition_penalty": 1.05, + "ignore_eos": true, + "extra_body": { + "modalities": ["text"] + }, + "random_mm_base_items_per_request": 1, + "random_mm_num_mm_items_range_ratio": 0.5, + "random_mm_limit_mm_per_prompt": { + "audio": 1 + }, + "random_mm_bucket_config": { + "(0, 60, 3)": 1.0 + }, + "percentile-metrics": "ttft,tpot,itl,e2el", + "baseline": { + "mean_ttft_ms": [2000] + } + }, + { + "dataset_name": "random-mm", + "backend": "vllm", + "model": "Qwen/Qwen3-Omni-30B-A3B-Instruct", + "endpoint": "/v1/completions", + "num_prompts": [40], + "request_rate": [0.5], + "random_input_len": 100, + "random_output_len": 100, + "random_range_ratio": 0.0, + "temperature": 0.4, + "top_p": 0.9, + "top_k": 1, + "seed": 42, + "repetition_penalty": 1.05, + "ignore_eos": true, + "extra_body": { + "modalities": ["text"] + }, + "random_mm_base_items_per_request": 2, + "random_mm_num_mm_items_range_ratio": 0.5, + "random_mm_limit_mm_per_prompt": { + "image": 1, + "video": 1 + }, + "random_mm_bucket_config": { + "(256, 256, 1)": 0.5, + "(720, 1280, 2)": 0.5 + }, + "percentile-metrics": "ttft,tpot,itl,e2el", + "baseline": { + "mean_ttft_ms": [6000] + } + }, + { + "dataset_name": "random-mm", + "backend": "vllm", + "model": "Qwen/Qwen3-Omni-30B-A3B-Instruct", + "endpoint": "/v1/completions", + "num_prompts": [100], + "request_rate": [1.0], + "random_input_len": 100, + "random_output_len": 100, + "random_range_ratio": 0.0, + "temperature": 0.4, + "top_p": 0.9, + "top_k": 1, + "seed": 42, + "repetition_penalty": 1.05, + "ignore_eos": true, + "extra_body": { + "modalities": ["text"] + }, + "random_mm_base_items_per_request": 3, + "random_mm_num_mm_items_range_ratio": 0.5, + "random_mm_limit_mm_per_prompt": { + "image": 1, + "video": 1, + "audio": 1 + }, + "random_mm_bucket_config": { + "(256, 256, 1)": 0.34, + "(720, 1280, 2)": 0.33, + "(0, 60, 3)": 0.33 + }, + "percentile-metrics": "ttft,tpot,itl,e2el", + "baseline": { + "mean_ttft_ms": [6000] + } + } + ] + } +] diff --git a/vllm_omni/distributed/omni_connectors/transfer_adapter/chunk_transfer_adapter.py b/vllm_omni/distributed/omni_connectors/transfer_adapter/chunk_transfer_adapter.py index 2bdb1136976..0cecc79f4d2 100644 --- a/vllm_omni/distributed/omni_connectors/transfer_adapter/chunk_transfer_adapter.py +++ b/vllm_omni/distributed/omni_connectors/transfer_adapter/chunk_transfer_adapter.py @@ -62,6 +62,7 @@ def __init__(self, vllm_config: Any): self.waiting_for_chunk_running_requests: deque[Any] = deque() self.requests_with_ready_chunks = set() self.requests_origin_status = {} + self.requests_num_chunks_sent: dict[str, int] = defaultdict(int) @classmethod def create_connector(cls, model_config: Any): @@ -117,6 +118,17 @@ def save_async( pooling_output: Partial pooling output dictionary request: Request object """ + + # If the request is preempted, skip the already saved chunks. + if request.num_computed_tokens < self.requests_num_chunks_sent.get(request.external_req_id, 0): + logger.error( + f"Enqueue save_async for request {request.external_req_id}, " + f"request.num_computed_tokens={request.num_computed_tokens}, " + f"previous_chunks_sent={self.requests_num_chunks_sent.get(request.external_req_id, 0)}" + ) + return + + self.requests_num_chunks_sent[request.external_req_id] = request.num_computed_tokens task = { "pooling_output": pooling_output, "request": request, @@ -155,8 +167,7 @@ def _poll_single_request(self, request: Request): meta = payload_data.get("meta", {}) if self.model_mode == "ar": - merged_payload = self._update_request_payload(external_req_id, payload_data) - request.additional_information = merged_payload + request.additional_information = payload_data if meta.get("finished"): self.finished_requests.add(req_id) else: @@ -198,42 +209,6 @@ def _poll_single_request(self, request: Request): return False - def _update_request_payload(self, req_id: str, payload_data: dict[str, Any]) -> dict[str, Any]: - """Update the stored payload for *req_id* with the latest chunk.""" - if req_id not in self.request_payload: - self.request_payload[req_id] = payload_data - return payload_data - origin = self.request_payload[req_id] - raw_ok = payload_data.get("meta", {}).pop("override_keys", []) - override_keys = {tuple(k) if isinstance(k, list) else k for k in raw_ok} - - for key, value in payload_data.items(): - if isinstance(value, dict): - origin_sub = origin.get(key) - if not isinstance(origin_sub, dict): - continue - for qual, qval in value.items(): - if key == "meta" and qual == "finished": - continue - if (key, qual) in override_keys: - continue - osv = origin_sub.get(qual) - if isinstance(qval, torch.Tensor) and isinstance(osv, torch.Tensor): - value[qual] = torch.cat([osv, qval], dim=0) - elif isinstance(qval, list) and isinstance(osv, list): - value[qual] = osv + qval - else: - if key in override_keys: - continue - ov = origin.get(key) - if isinstance(value, torch.Tensor) and isinstance(ov, torch.Tensor): - payload_data[key] = torch.cat([ov, value], dim=0) - elif isinstance(value, list) and isinstance(ov, list): - payload_data[key] = ov + value - - self.request_payload[req_id] = payload_data - return payload_data - def _send_single_request(self, task: dict): raw_po = task["pooling_output"] pooling_output = unflatten_payload(raw_po) if isinstance(raw_po, dict) else raw_po @@ -290,6 +265,7 @@ def _send_single_request(self, task: dict): if is_finished: self.code_prompt_token_ids.pop(external_req_id, None) + self.requests_num_chunks_sent.pop(external_req_id, None) cached_ic = getattr(self, "_cached_ic", None) if cached_ic is not None: cached_ic.pop(external_req_id, None) @@ -327,6 +303,7 @@ def cleanup_sender(self, external_req_id: str) -> None: self.put_req_chunk.pop(external_req_id, None) self.request_payload.pop(external_req_id, None) self.code_prompt_token_ids.pop(external_req_id, None) + self.requests_num_chunks_sent.pop(external_req_id, None) cached_ic = getattr(self, "_cached_ic", None) if cached_ic is not None: diff --git a/vllm_omni/model_executor/models/qwen3_omni/qwen3_omni.py b/vllm_omni/model_executor/models/qwen3_omni/qwen3_omni.py index d7765026524..bdf50d69001 100644 --- a/vllm_omni/model_executor/models/qwen3_omni/qwen3_omni.py +++ b/vllm_omni/model_executor/models/qwen3_omni/qwen3_omni.py @@ -982,19 +982,11 @@ def _thinker_decode_to_talker_decode( """ embed = payload.get("embed", {}) meta = payload.get("meta", {}) - ids = payload.get("ids", {}) cached_thinker_decode_embeds = embed.get("cached_decode", None) thinker_decode_embed = embed.get("decode", None) start_index = meta.get("num_processed_tokens", 0) thinker_output_token_ids = ids.get("output", []) - if start_index >= len(thinker_output_token_ids) - 1: - # When the tokens output by the thinker are exhausted, an EOS token needs to be appended. - # Use the finished_flag to mark that all tokens output by thinker have been consumed. - if meta.get("eos_emitted", False): - return self.tts_pad_embed.to(device) - update_dict.setdefault("meta", {})["eos_emitted"] = True - return self.tts_eos_embed.to(device) if cached_thinker_decode_embeds is not None and start_index < cached_thinker_decode_embeds.shape[0]: cached_thinker_decode_embeds = cached_thinker_decode_embeds.to(device) @@ -1003,10 +995,20 @@ def _thinker_decode_to_talker_decode( thinker_decode_embed = thinker_decode_embed.to(device) cached_thinker_decode_embeds = torch.cat([cached_thinker_decode_embeds, thinker_decode_embed], dim=0) update_dict.setdefault("embed", {})["cached_decode"] = cached_thinker_decode_embeds - else: + + elif thinker_decode_embed is not None: thinker_embed = thinker_decode_embed if thinker_embed.device != device: thinker_embed = thinker_embed.to(device) + + else: + # When the tokens output by the thinker are exhausted, an EOS token needs to be appended. + # Use the finished_flag to mark that all tokens output by thinker have been consumed. + if meta.get("eos_emitted", False): + return self.tts_pad_embed.to(device) + update_dict.setdefault("meta", {})["eos_emitted"] = True + return self.tts_eos_embed.to(device) + update_dict.setdefault("embed", {})["decode"] = None return self.talker.text_projection(thinker_embed).to(device) diff --git a/vllm_omni/model_executor/stage_input_processors/qwen3_omni.py b/vllm_omni/model_executor/stage_input_processors/qwen3_omni.py index 63403619e9b..80974ad3b3d 100644 --- a/vllm_omni/model_executor/stage_input_processors/qwen3_omni.py +++ b/vllm_omni/model_executor/stage_input_processors/qwen3_omni.py @@ -349,28 +349,27 @@ def _maybe_cpu(t: Any) -> torch.Tensor | None: payload.hidden_states.output = torch.cat( (save_payload.get("hidden_states", {}).get("output"), payload.hidden_states.output), dim=0 ) + prefill_shape = talker_additional_info["embed"]["prefill"].shape[0] + if not is_finished and prefill_shape <= len(prompt_token_ids): + transfer_manager.request_payload[request_id] = talker_additional_info + return None else: - output_token_ids = _ensure_list(request.output_token_ids) - meta = MetaStruct(finished=torch.tensor(is_finished, dtype=torch.bool)) - if output_token_ids: - meta.override_keys = [("embed", "decode"), ("ids", "output")] - payload = OmniPayloadStruct( - meta=meta, - embed=EmbeddingsStruct(decode=thinker_emb.detach().cpu()), - ids=IdsStruct(output=output_token_ids), - speaker=speaker, - language=language, - ) - else: - # When prefilling a chunked thinker, thinker_hidden_states needs to be updated. - payload = OmniPayloadStruct( - meta=meta, - embed=EmbeddingsStruct(prefill=thinker_emb.detach().cpu()), - hidden_states=HiddenStatesStruct(output=thinker_hid.detach().cpu()), - speaker=speaker, - language=language, + talker_additional_info: OmniPayload = { + "meta": {"finished": torch.tensor(is_finished, dtype=torch.bool)}, + } + talker_additional_info["meta"]["override_keys"] = [("embed", "decode"), ("ids", "output")] + talker_additional_info["embed"] = {"decode": thinker_layers[int(_EMBED_LAYER_KEY)].detach().cpu()} + if talker_additional_info["embed"]["decode"].shape[0] > 1: + logger.warning( + "Unexpected multiple embeddings in thinker2talker_async_chunk for chunk_id %d: " + "request_id %s, num_computed_tokens%d %s. Expected shape [1, D].", + chunk_id, + request_id, + request.num_computed_tokens, + talker_additional_info["embed"]["decode"].shape, ) - return payload + return None + return talker_additional_info def thinker2talker( From df11178f1bc1683a71aef893c56b950a8df5af4a Mon Sep 17 00:00:00 2001 From: amy-why-3459 Date: Sat, 25 Apr 2026 17:47:27 +0800 Subject: [PATCH 2/2] Fix the issue of thinker requests being preempted, causing shape mismatch Signed-off-by: amy-why-3459 --- .../tests/test_qwen3_omni_async_chunk.json | 10 +- tests/dfx/perf/tests/test_qwen_omni.json | 450 ------------------ .../test_qwen3_omni_expansion.py | 9 +- .../chunk_transfer_adapter.py | 9 +- .../models/qwen3_omni/qwen3_omni.py | 1 - .../stage_input_processors/qwen3_omni.py | 22 +- 6 files changed, 32 insertions(+), 469 deletions(-) delete mode 100644 tests/dfx/perf/tests/test_qwen_omni.json diff --git a/tests/dfx/perf/tests/test_qwen3_omni_async_chunk.json b/tests/dfx/perf/tests/test_qwen3_omni_async_chunk.json index 4f73f2b6a96..98e31174817 100644 --- a/tests/dfx/perf/tests/test_qwen3_omni_async_chunk.json +++ b/tests/dfx/perf/tests/test_qwen3_omni_async_chunk.json @@ -10,16 +10,16 @@ "dataset_name": "random", "backend": "openai-chat-omni", "endpoint": "/v1/chat/completions", - "num_prompts": [4, 16, 32, 64], - "max_concurrency": [1, 4, 8, 16], + "num_prompts": [4, 16, 32, 64, 128], + "max_concurrency": [1, 4, 8, 16, 32], "random_input_len": 2500, "random_output_len": 900, "ignore_eos": true, "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", "baseline": { - "mean_ttft_ms": [1000, 3000, 5000, 7000], - "mean_audio_ttfp_ms": [1000, 3000, 5000, 7000], - "mean_audio_rtf": [0.2, 0.35, 0.6, 0.85] + "mean_ttft_ms": [1000, 3000, 5000, 7000, 9000], + "mean_audio_ttfp_ms": [1000, 3000, 5000, 7000, 9000], + "mean_audio_rtf": [0.2, 0.35, 0.6, 0.85, 0.9] } }, { diff --git a/tests/dfx/perf/tests/test_qwen_omni.json b/tests/dfx/perf/tests/test_qwen_omni.json deleted file mode 100644 index d46d9043197..00000000000 --- a/tests/dfx/perf/tests/test_qwen_omni.json +++ /dev/null @@ -1,450 +0,0 @@ -[ - { - "test_name": "test_qwen3_omni", - "server_params": { - "model": "Qwen/Qwen3-Omni-30B-A3B-Instruct", - "extra_cli_args": ["--no-async-chunk"] - }, - "benchmark_params": [ - { - "dataset_name": "random", - "backend": "openai-chat-omni", - "endpoint": "/v1/chat/completions", - "num_prompts": [4, 16, 32, 64, 128], - "max_concurrency": [1, 4, 8, 16, 32], - "random_input_len": 2500, - "random_output_len": 900, - "ignore_eos": true, - "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", - "baseline": { - "mean_ttft_ms": [1000, 3000, 5000, 7000, 9000], - "mean_audio_ttfp_ms": [30000, 60000, 90000, 120000, 150000], - "mean_audio_rtf": [0.35, 0.45, 0.55, 0.65, 0.75] - } - }, - { - "dataset_name": "random-mm", - "backend": "openai-chat-omni", - "endpoint": "/v1/chat/completions", - "num_prompts": [10], - "request_rate": [0.1], - "random_input_len": 100, - "random_output_len": 100, - "random_range_ratio": 0.0, - "ignore_eos": true, - "random_mm_base_items_per_request": 1, - "random_mm_num_mm_items_range_ratio": 0.5, - "random_mm_limit_mm_per_prompt": { - "audio": 1 - }, - "random_mm_bucket_config": { - "(0, 60, 3)": 1.0 - }, - "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", - "baseline": { - "mean_ttft_ms": [2000], - "mean_audio_ttfp_ms": [10000], - "mean_audio_rtf": [0.25] - } - }, - { - "dataset_name": "random-mm", - "backend": "openai-chat-omni", - "endpoint": "/v1/chat/completions", - "num_prompts": [40], - "request_rate": [0.5], - "random_input_len": 100, - "random_output_len": 100, - "random_range_ratio": 0.0, - "ignore_eos": true, - "random_mm_base_items_per_request": 2, - "random_mm_num_mm_items_range_ratio": 0.5, - "random_mm_limit_mm_per_prompt": { - "image": 1, - "video": 1 - }, - "random_mm_bucket_config": { - "(256, 256, 1)": 0.5, - "(720, 1280, 2)": 0.5 - }, - "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", - "baseline": { - "mean_ttft_ms": [6000], - "mean_audio_ttfp_ms": [15000], - "mean_audio_rtf": [0.45] - } - }, - { - "dataset_name": "random-mm", - "backend": "openai-chat-omni", - "endpoint": "/v1/chat/completions", - "num_prompts": [100], - "request_rate": [1.0], - "random_input_len": 100, - "random_output_len": 100, - "random_range_ratio": 0.0, - "ignore_eos": true, - "random_mm_base_items_per_request": 3, - "random_mm_num_mm_items_range_ratio": 0.5, - "random_mm_limit_mm_per_prompt": { - "image": 1, - "video": 1, - "audio": 1 - }, - "random_mm_bucket_config": { - "(256, 256, 1)": 0.34, - "(720, 1280, 2)": 0.33, - "(0, 60, 3)": 0.33 - }, - "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", - "baseline": { - "mean_ttft_ms": [12000], - "mean_audio_ttfp_ms": [18000], - "mean_audio_rtf": [0.9] - } - } - ] - }, - { - "test_name": "test_qwen3_omni_chunk", - "server_params": { - "model": "Qwen/Qwen3-Omni-30B-A3B-Instruct", - "extra_cli_args": ["--async-chunk"] - }, - "benchmark_params": [ - { - "dataset_name": "random", - "backend": "openai-chat-omni", - "endpoint": "/v1/chat/completions", - "num_prompts": [4, 16, 32, 64, 128], - "max_concurrency": [1, 4, 8, 16, 32], - "random_input_len": 2500, - "random_output_len": 900, - "ignore_eos": true, - "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", - "baseline": { - "mean_ttft_ms": [1000, 3000, 5000, 7000, 9000], - "mean_audio_ttfp_ms": [1000, 3000, 5000, 7000, 9000], - "mean_audio_rtf": [0.2, 0.35, 0.6, 0.85, 0.9] - } - }, - { - "dataset_name": "random-mm", - "backend": "openai-chat-omni", - "endpoint": "/v1/chat/completions", - "num_prompts": [10], - "request_rate": [0.1], - "random_input_len": 100, - "random_output_len": 100, - "random_range_ratio": 0.0, - "ignore_eos": true, - "random_mm_base_items_per_request": 1, - "random_mm_num_mm_items_range_ratio": 0.5, - "random_mm_limit_mm_per_prompt": { - "audio": 1 - }, - "random_mm_bucket_config": { - "(0, 60, 3)": 1.0 - }, - "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", - "baseline": { - "mean_ttft_ms": [2000], - "mean_audio_ttfp_ms": [2000], - "mean_audio_rtf": [0.25] - } - }, - { - "dataset_name": "random-mm", - "backend": "openai-chat-omni", - "endpoint": "/v1/chat/completions", - "num_prompts": [40], - "request_rate": [0.5], - "random_input_len": 100, - "random_output_len": 100, - "random_range_ratio": 0.0, - "ignore_eos": true, - "random_mm_base_items_per_request": 2, - "random_mm_num_mm_items_range_ratio": 0.5, - "random_mm_limit_mm_per_prompt": { - "image": 1, - "video": 1 - }, - "random_mm_bucket_config": { - "(256, 256, 1)": 0.5, - "(720, 1280, 2)": 0.5 - }, - "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", - "baseline": { - "mean_ttft_ms": [6000], - "mean_audio_ttfp_ms": [6000], - "mean_audio_rtf": [0.7] - } - }, - { - "dataset_name": "random-mm", - "backend": "openai-chat-omni", - "endpoint": "/v1/chat/completions", - "num_prompts": [100], - "request_rate": [1.0], - "random_input_len": 100, - "random_output_len": 100, - "random_range_ratio": 0.0, - "ignore_eos": true, - "random_mm_base_items_per_request": 3, - "random_mm_num_mm_items_range_ratio": 0.5, - "random_mm_limit_mm_per_prompt": { - "image": 1, - "video": 1, - "audio": 1 - }, - "random_mm_bucket_config": { - "(256, 256, 1)": 0.34, - "(720, 1280, 2)": 0.33, - "(0, 60, 3)": 0.33 - }, - "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", - "baseline": { - "mean_ttft_ms": [12000], - "mean_audio_ttfp_ms": [12000], - "mean_audio_rtf": [1.0] - } - }, - { - "dataset_name": "random", - "backend": "openai-chat-omni", - "endpoint": "/v1/chat/completions", - "num_prompts": [4, 16, 32, 64, 128], - "max_concurrency": [1, 4, 8, 16, 32], - "random_input_len": 2500, - "random_output_len": 900, - "ignore_eos": true, - "extra_body": { - "modalities": ["text"] - }, - "percentile-metrics": "ttft,tpot,itl,e2el", - "baseline": { - "mean_ttft_ms": [1000, 3000, 5000, 7000, 9000] - } - }, - { - "dataset_name": "random-mm", - "backend": "openai-chat-omni", - "endpoint": "/v1/chat/completions", - "num_prompts": [10], - "request_rate": [0.1], - "random_input_len": 100, - "random_output_len": 100, - "random_range_ratio": 0.0, - "ignore_eos": true, - "extra_body": { - "modalities": ["text"] - }, - "random_mm_base_items_per_request": 1, - "random_mm_num_mm_items_range_ratio": 0.5, - "random_mm_limit_mm_per_prompt": { - "audio": 1 - }, - "random_mm_bucket_config": { - "(0, 60, 3)": 1.0 - }, - "percentile-metrics": "ttft,tpot,itl,e2el", - "baseline": { - "mean_ttft_ms": [2000] - } - }, - { - "dataset_name": "random-mm", - "backend": "openai-chat-omni", - "endpoint": "/v1/chat/completions", - "num_prompts": [40], - "request_rate": [0.5], - "random_input_len": 100, - "random_output_len": 100, - "random_range_ratio": 0.0, - "ignore_eos": true, - "extra_body": { - "modalities": ["text"] - }, - "random_mm_base_items_per_request": 2, - "random_mm_num_mm_items_range_ratio": 0.5, - "random_mm_limit_mm_per_prompt": { - "image": 1, - "video": 1 - }, - "random_mm_bucket_config": { - "(256, 256, 1)": 0.5, - "(720, 1280, 2)": 0.5 - }, - "percentile-metrics": "ttft,tpot,itl,e2el", - "baseline": { - "mean_ttft_ms": [6000] - } - }, - { - "dataset_name": "random-mm", - "backend": "openai-chat-omni", - "endpoint": "/v1/chat/completions", - "num_prompts": [100], - "request_rate": [1.0], - "random_input_len": 100, - "random_output_len": 100, - "random_range_ratio": 0.0, - "ignore_eos": true, - "extra_body": { - "modalities": ["text"] - }, - "random_mm_base_items_per_request": 3, - "random_mm_num_mm_items_range_ratio": 0.5, - "random_mm_limit_mm_per_prompt": { - "image": 1, - "video": 1, - "audio": 1 - }, - "random_mm_bucket_config": { - "(256, 256, 1)": 0.34, - "(720, 1280, 2)": 0.33, - "(0, 60, 3)": 0.33 - }, - "percentile-metrics": "ttft,tpot,itl,e2el", - "baseline": { - "mean_ttft_ms": [6000] - } - } - ] - }, - { - "test_name": "test_qwen3_omni_vllm_text", - "server_params": { - "model": "Qwen/Qwen3-Omni-30B-A3B-Instruct", - "use_omni": false, - "extra_cli_args": ["--no-enable-prefix-caching"] - }, - "benchmark_params": [ - { - "dataset_name": "random", - "backend": "vllm", - "model": "Qwen/Qwen3-Omni-30B-A3B-Instruct", - "endpoint": "/v1/completions", - "num_prompts": [4, 16, 32, 64, 128], - "max_concurrency": [1, 4, 8, 16, 32], - "random_input_len": 2500, - "random_output_len": 900, - "temperature": 0.4, - "top_p": 0.9, - "top_k": 1, - "seed": 42, - "repetition_penalty": 1.05, - "ignore_eos": true, - "extra_body": { - "modalities": ["text"] - }, - "percentile-metrics": "ttft,tpot,itl,e2el", - "baseline": { - "mean_ttft_ms": [1000, 3000, 5000, 7000, 9000] - } - }, - { - "dataset_name": "random-mm", - "backend": "vllm", - "model": "Qwen/Qwen3-Omni-30B-A3B-Instruct", - "endpoint": "/v1/completions", - "num_prompts": [10], - "request_rate": [0.1], - "random_input_len": 100, - "random_output_len": 100, - "random_range_ratio": 0.0, - "temperature": 0.4, - "top_p": 0.9, - "top_k": 1, - "seed": 42, - "repetition_penalty": 1.05, - "ignore_eos": true, - "extra_body": { - "modalities": ["text"] - }, - "random_mm_base_items_per_request": 1, - "random_mm_num_mm_items_range_ratio": 0.5, - "random_mm_limit_mm_per_prompt": { - "audio": 1 - }, - "random_mm_bucket_config": { - "(0, 60, 3)": 1.0 - }, - "percentile-metrics": "ttft,tpot,itl,e2el", - "baseline": { - "mean_ttft_ms": [2000] - } - }, - { - "dataset_name": "random-mm", - "backend": "vllm", - "model": "Qwen/Qwen3-Omni-30B-A3B-Instruct", - "endpoint": "/v1/completions", - "num_prompts": [40], - "request_rate": [0.5], - "random_input_len": 100, - "random_output_len": 100, - "random_range_ratio": 0.0, - "temperature": 0.4, - "top_p": 0.9, - "top_k": 1, - "seed": 42, - "repetition_penalty": 1.05, - "ignore_eos": true, - "extra_body": { - "modalities": ["text"] - }, - "random_mm_base_items_per_request": 2, - "random_mm_num_mm_items_range_ratio": 0.5, - "random_mm_limit_mm_per_prompt": { - "image": 1, - "video": 1 - }, - "random_mm_bucket_config": { - "(256, 256, 1)": 0.5, - "(720, 1280, 2)": 0.5 - }, - "percentile-metrics": "ttft,tpot,itl,e2el", - "baseline": { - "mean_ttft_ms": [6000] - } - }, - { - "dataset_name": "random-mm", - "backend": "vllm", - "model": "Qwen/Qwen3-Omni-30B-A3B-Instruct", - "endpoint": "/v1/completions", - "num_prompts": [100], - "request_rate": [1.0], - "random_input_len": 100, - "random_output_len": 100, - "random_range_ratio": 0.0, - "temperature": 0.4, - "top_p": 0.9, - "top_k": 1, - "seed": 42, - "repetition_penalty": 1.05, - "ignore_eos": true, - "extra_body": { - "modalities": ["text"] - }, - "random_mm_base_items_per_request": 3, - "random_mm_num_mm_items_range_ratio": 0.5, - "random_mm_limit_mm_per_prompt": { - "image": 1, - "video": 1, - "audio": 1 - }, - "random_mm_bucket_config": { - "(256, 256, 1)": 0.34, - "(720, 1280, 2)": 0.33, - "(0, 60, 3)": 0.33 - }, - "percentile-metrics": "ttft,tpot,itl,e2el", - "baseline": { - "mean_ttft_ms": [6000] - } - } - ] - } -] diff --git a/tests/e2e/online_serving/test_qwen3_omni_expansion.py b/tests/e2e/online_serving/test_qwen3_omni_expansion.py index aeaf27b31df..bf022dd306e 100644 --- a/tests/e2e/online_serving/test_qwen3_omni_expansion.py +++ b/tests/e2e/online_serving/test_qwen3_omni_expansion.py @@ -44,7 +44,7 @@ def get_batch_token_config(default_path): return modify_stage_config( default_path, updates={ - "stages": {1: {"max_num_batched_tokens": 64}}, + "stages": {0: {"max_num_batched_tokens": 64}, 1: {"max_num_batched_tokens": 64}}, }, ) @@ -95,7 +95,12 @@ def get_default_config(default_path): test_token_params = [ pytest.param( - OmniServerParams(model=model, stage_config_path=get_batch_token_config(default_path), use_stage_cli=True), + OmniServerParams( + model=model, + stage_config_path=get_batch_token_config(default_path), + use_stage_cli=True, + server_args=["--async-chunk"], + ), id="batch_token_64", ) ] diff --git a/vllm_omni/distributed/omni_connectors/transfer_adapter/chunk_transfer_adapter.py b/vllm_omni/distributed/omni_connectors/transfer_adapter/chunk_transfer_adapter.py index 0cecc79f4d2..bc840c739bf 100644 --- a/vllm_omni/distributed/omni_connectors/transfer_adapter/chunk_transfer_adapter.py +++ b/vllm_omni/distributed/omni_connectors/transfer_adapter/chunk_transfer_adapter.py @@ -121,7 +121,7 @@ def save_async( # If the request is preempted, skip the already saved chunks. if request.num_computed_tokens < self.requests_num_chunks_sent.get(request.external_req_id, 0): - logger.error( + logger.warning( f"Enqueue save_async for request {request.external_req_id}, " f"request.num_computed_tokens={request.num_computed_tokens}, " f"previous_chunks_sent={self.requests_num_chunks_sent.get(request.external_req_id, 0)}" @@ -376,6 +376,11 @@ def postprocess_scheduler_output( Add additional info for cached requests and clean up ready chunks from scheduler output. """ + stage_id = self.connector.stage_id + + if stage_id == 0: + return + if requests is not None: self.attach_cached_additional_information(scheduler_output, requests) self._clear_chunk_ready(scheduler_output) @@ -391,6 +396,8 @@ def attach_cached_additional_information(scheduler_output: Any, requests: dict[s request = requests.get(req_id) if req_id else None additional_info = getattr(request, "additional_information", None) if request else None cached_reqs.additional_information[req_id] = additional_info + if request and additional_info: + request.additional_information = None def _process_chunk_queue( self, diff --git a/vllm_omni/model_executor/models/qwen3_omni/qwen3_omni.py b/vllm_omni/model_executor/models/qwen3_omni/qwen3_omni.py index bdf50d69001..c0c6cdfbddb 100644 --- a/vllm_omni/model_executor/models/qwen3_omni/qwen3_omni.py +++ b/vllm_omni/model_executor/models/qwen3_omni/qwen3_omni.py @@ -986,7 +986,6 @@ def _thinker_decode_to_talker_decode( cached_thinker_decode_embeds = embed.get("cached_decode", None) thinker_decode_embed = embed.get("decode", None) start_index = meta.get("num_processed_tokens", 0) - thinker_output_token_ids = ids.get("output", []) if cached_thinker_decode_embeds is not None and start_index < cached_thinker_decode_embeds.shape[0]: cached_thinker_decode_embeds = cached_thinker_decode_embeds.to(device) diff --git a/vllm_omni/model_executor/stage_input_processors/qwen3_omni.py b/vllm_omni/model_executor/stage_input_processors/qwen3_omni.py index 80974ad3b3d..1b1dc0f7740 100644 --- a/vllm_omni/model_executor/stage_input_processors/qwen3_omni.py +++ b/vllm_omni/model_executor/stage_input_processors/qwen3_omni.py @@ -349,27 +349,29 @@ def _maybe_cpu(t: Any) -> torch.Tensor | None: payload.hidden_states.output = torch.cat( (save_payload.get("hidden_states", {}).get("output"), payload.hidden_states.output), dim=0 ) - prefill_shape = talker_additional_info["embed"]["prefill"].shape[0] + prefill_shape = payload.embed.prefill.shape[0] if not is_finished and prefill_shape <= len(prompt_token_ids): - transfer_manager.request_payload[request_id] = talker_additional_info + transfer_manager.request_payload[request_id] = to_dict(payload) return None else: - talker_additional_info: OmniPayload = { - "meta": {"finished": torch.tensor(is_finished, dtype=torch.bool)}, - } - talker_additional_info["meta"]["override_keys"] = [("embed", "decode"), ("ids", "output")] - talker_additional_info["embed"] = {"decode": thinker_layers[int(_EMBED_LAYER_KEY)].detach().cpu()} - if talker_additional_info["embed"]["decode"].shape[0] > 1: + if thinker_emb.shape[0] > 1: logger.warning( "Unexpected multiple embeddings in thinker2talker_async_chunk for chunk_id %d: " "request_id %s, num_computed_tokens%d %s. Expected shape [1, D].", chunk_id, request_id, request.num_computed_tokens, - talker_additional_info["embed"]["decode"].shape, + thinker_emb.shape, ) return None - return talker_additional_info + meta = MetaStruct(finished=torch.tensor(is_finished, dtype=torch.bool)) + payload = OmniPayloadStruct( + meta=meta, + embed=EmbeddingsStruct(decode=thinker_emb.detach().cpu()), + speaker=speaker, + language=language, + ) + return payload def thinker2talker(