diff --git a/.buildkite/test-ready.yml b/.buildkite/test-ready.yml index 2f7441ad867..89839a2d1ed 100644 --- a/.buildkite/test-ready.yml +++ b/.buildkite/test-ready.yml @@ -216,44 +216,42 @@ steps: volumes: - "/fsx/hf_cache:/fsx/hf_cache" - # - label: "Omni Model Test with H100" - # depends_on: upload-ready-pipeline - # commands: - # - | - # timeout 20m bash -c ' - # export VLLM_WORKER_MULTIPROC_METHOD=spawn - # export VLLM_TEST_CLEAN_GPU_MEMORY="1" - # # - pytest -s -v tests/e2e/offline_inference/test_qwen3_omni.py - # pytest -s -v tests/e2e/online_serving/test_qwen3_omni.py -m "core_model" --run-level "core_model" - # ' - # agents: - # queue: "mithril-h100-pool" - # plugins: - # - kubernetes: - # podSpec: - # containers: - # - image: 936637512419.dkr.ecr.us-west-2.amazonaws.com/vllm-ci-pull-through-cache/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT - # resources: - # limits: - # nvidia.com/gpu: 2 - # volumeMounts: - # - name: devshm - # mountPath: /dev/shm - # - name: hf-cache - # mountPath: /root/.cache/huggingface - # env: - # - name: HF_HOME - # value: /root/.cache/huggingface - # nodeSelector: - # node.kubernetes.io/instance-type: gpu-h100-sxm - # volumes: - # - name: devshm - # emptyDir: - # medium: Memory - # - name: hf-cache - # hostPath: - # path: /mnt/hf-cache - # type: DirectoryOrCreate + - label: "Omni Model Test with H100" + depends_on: upload-ready-pipeline + commands: + - | + timeout 20m bash -c ' + export VLLM_WORKER_MULTIPROC_METHOD=spawn + pytest -s -v tests/e2e/online_serving/test_qwen3_omni.py -m "core_model" --run-level "core_model" + ' + agents: + queue: "mithril-h100-pool" + plugins: + - kubernetes: + podSpec: + containers: + - image: 936637512419.dkr.ecr.us-west-2.amazonaws.com/vllm-ci-pull-through-cache/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + resources: + limits: + nvidia.com/gpu: 2 + volumeMounts: + - name: devshm + mountPath: /dev/shm + - name: hf-cache + mountPath: /root/.cache/huggingface + env: + - name: HF_HOME + value: /root/.cache/huggingface + nodeSelector: + node.kubernetes.io/instance-type: gpu-h100-sxm + volumes: + - name: devshm + emptyDir: + medium: Memory + - name: hf-cache + hostPath: + path: /mnt/hf-cache + type: DirectoryOrCreate - label: "MiMo-Audio E2E Test with H100" depends_on: upload-ready-pipeline diff --git a/tests/conftest.py b/tests/conftest.py index 4539fec2722..adc048e8473 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -688,8 +688,112 @@ def _enhance_speech(audio: np.ndarray) -> np.ndarray: return result -def generate_synthetic_video(width: int, height: int, num_frames: int, save_to_file: bool = False) -> dict[str, Any]: - """Generate synthetic video with bouncing balls and return base64 string.""" +def _mux_mp4_bytes_with_synthetic_audio( + video_mp4_bytes: bytes, + *, + num_frames: int, + fps: float = 30.0, + sample_rate: int = 48000, +) -> bytes: + """ + Mux a video-only MP4 with mono TTS audio from :func:`generate_synthetic_audio` (AAC). + + Audio length is at least the video duration in whole seconds (rounded up); ffmpeg + ``-shortest`` trims to the video when the WAV is longer. + + Uses ffmpeg from ``imageio_ffmpeg`` when available, else ``ffmpeg`` on PATH. + If TTS or mux fails, returns ``video_mp4_bytes`` unchanged. + + Mux subprocess does **not** use ``capture_output=True``: ffmpeg can block writing + to a full stderr pipe while :func:`subprocess.run` waits for exit (classic deadlock). + """ + duration_sec = num_frames / fps if fps > 0 else 0.0 + # generate_synthetic_audio(duration=int) uses at least 1s of buffer internally + duration_int = max(1, int(math.ceil(duration_sec))) + + try: + audio_result = generate_synthetic_audio( + duration=duration_int, + num_channels=1, + sample_rate=sample_rate, + save_to_file=False, + ) + audio_pcm = audio_result["np_array"] + except Exception as e: + logger.warning("Synthetic video: generate_synthetic_audio failed (%s); using video-only MP4.", e) + return video_mp4_bytes + + try: + import imageio_ffmpeg + + ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe() + except Exception: + ffmpeg_exe = "ffmpeg" + + import tempfile + + try: + with tempfile.TemporaryDirectory(prefix="syn_vid_mux_") as tmp: + vid_path = os.path.join(tmp, "video.mp4") + wav_path = os.path.join(tmp, "audio.wav") + out_path = os.path.join(tmp, "out.mp4") + with open(vid_path, "wb") as f: + f.write(video_mp4_bytes) + sf.write(wav_path, audio_pcm, sample_rate, format="WAV", subtype="PCM_16") + cmd = [ + ffmpeg_exe, + "-y", + "-nostdin", + "-hide_banner", + "-loglevel", + "error", + "-i", + vid_path, + "-i", + wav_path, + "-c:v", + "copy", + "-c:a", + "aac", + "-b:a", + "128k", + "-shortest", + "-movflags", + "+faststart", + out_path, + ] + subprocess.run( + cmd, + check=True, + stdin=subprocess.DEVNULL, + timeout=300, + ) + with open(out_path, "rb") as f: + return f.read() + except ( + FileNotFoundError, + subprocess.CalledProcessError, + subprocess.TimeoutExpired, + OSError, + ) as e: + logger.warning("Synthetic video: audio mux failed (%s); using video-only MP4.", e) + return video_mp4_bytes + + +def generate_synthetic_video( + width: int, + height: int, + num_frames: int, + save_to_file: bool = False, + *, + embed_audio: bool = False, +) -> dict[str, Any]: + """Generate synthetic video with bouncing balls and base64 MP4. + + When ``embed_audio`` is True, muxes mono AAC from :func:`generate_synthetic_audio` + (TTS + ffmpeg) into the MP4; otherwise returns video-only MP4 (faster when tests do + not need an audio track). + """ import cv2 import imageio @@ -762,13 +866,13 @@ def generate_synthetic_video(width: int, height: int, num_frames: int, save_to_f result = { "np_array": video_array, } - video_bytes = None saved_file_path = None + fps = 30 buffer = io.BytesIO() writer_kwargs = { "format": "mp4", - "fps": 30, + "fps": fps, "codec": "libx264", "quality": 7, "pixelformat": "yuv420p", @@ -787,32 +891,31 @@ def generate_synthetic_video(width: int, height: int, num_frames: int, save_to_f ], } - if save_to_file: - import datetime + try: + with imageio.get_writer(buffer, **writer_kwargs) as writer: + for frame in video_frames: + writer.append_data(frame) + buffer.seek(0) + video_only_bytes = buffer.read() + except Exception as e: + print(f"Warning: Failed to encode synthetic video: {e}") + raise + if embed_audio: + video_bytes = _mux_mp4_bytes_with_synthetic_audio(video_only_bytes, num_frames=num_frames, fps=float(fps)) + else: + video_bytes = video_only_bytes + + if save_to_file: timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") output_path = f"video_{width}x{height}_{timestamp}.mp4" try: - with imageio.get_writer(output_path, **writer_kwargs) as writer: - for frame in video_frames: - writer.append_data(frame) - + with open(output_path, "wb") as f: + f.write(video_bytes) saved_file_path = output_path print(f"Video saved to: {saved_file_path}") - with open(output_path, "rb") as f: - video_bytes = f.read() - except Exception as e: print(f"Warning: Failed to save video to file {output_path}: {e}") - save_to_file = False - - if not save_to_file or video_bytes is None: - with imageio.get_writer(buffer, **writer_kwargs) as writer: - for frame in video_frames: - writer.append_data(frame) - - buffer.seek(0) - video_bytes = buffer.read() base64_video = base64.b64encode(video_bytes).decode("utf-8") @@ -1262,10 +1365,12 @@ def delete_by_path(config_dict: dict, path: str) -> None: # Direct top-level key config[key] = value - # Save to new file with timestamp - timestamp = int(time.time()) + # Unique suffix: multiple modify_stage_config calls in one process often run + # within the same second (e.g. test_qwen3_omni_expansion imports both + # get_chunk_config and get_batch_token_config). int(time.time()) would collide + # and the later write would overwrite the earlier YAML on disk. base_name = yaml_path.rsplit(".", 1)[0] if "." in yaml_path else yaml_path - output_path = f"{base_name}_{timestamp}.yaml" + output_path = f"{base_name}_{time.time_ns()}.yaml" with open(output_path, "w", encoding="utf-8") as f: yaml.dump(config, f, default_flow_style=None, sort_keys=False, allow_unicode=True, indent=2) @@ -1733,9 +1838,11 @@ def assert_omni_response(response: OmniResponse, request_config: dict[str, Any], "The output does not contain any of the keywords." ) - # Verify similarity + # Verify similarity (Whisper transcript vs streamed/detokenized text) if "text" in modalities and "audio" in modalities: - assert response.similarity > 0.9, "The audio content is not same as the text" + assert response.similarity is not None and response.similarity > 0.9, ( + "The audio content is not same as the text" + ) print(f"similarity is: {response.similarity}") @@ -2135,7 +2242,11 @@ def send_omni_request(self, request_config: dict[str, Any], request_num: int = 1 Send OpenAI requests. Args: - request_config: Request configuration dictionary containing parameters like model, messages, stream + request_config: Request configuration dictionary containing parameters like model, messages, stream. + Optional ``use_audio_in_video`` (bool): when true, sets + ``extra_body["mm_processor_kwargs"] = {"use_audio_in_video": True}`` for Qwen-Omni video+audio + extraction (merged with any existing ``extra_body`` / ``mm_processor_kwargs``). + Optional ``extra_body`` (dict): passed through to ``chat.completions.create`` after merge. request_num: Number of requests, defaults to 1 (single request) Returns: @@ -2146,14 +2257,28 @@ def send_omni_request(self, request_config: dict[str, Any], request_num: int = 1 stream = request_config.get("stream", False) modalities = request_config.get("modalities", ["text", "audio"]) + extra_body: dict[str, Any] = {} + raw_extra = request_config.get("extra_body") + if raw_extra: + extra_body.update(raw_extra) + if request_config.get("use_audio_in_video"): + mm = dict(extra_body.get("mm_processor_kwargs") or {}) + mm["use_audio_in_video"] = True + extra_body["mm_processor_kwargs"] = mm + extra_body_arg: dict[str, Any] | None = extra_body if extra_body else None + + create_kwargs: dict[str, Any] = { + "model": request_config.get("model"), + "messages": request_config.get("messages"), + "stream": stream, + "modalities": modalities, + } + if extra_body_arg is not None: + create_kwargs["extra_body"] = extra_body_arg + if request_num == 1: # Send single request - chat_completion = self.client.chat.completions.create( - model=request_config.get("model"), - messages=request_config.get("messages"), - stream=stream, - modalities=modalities, - ) + chat_completion = self.client.chat.completions.create(**create_kwargs) if stream: response = self._process_stream_omni_response(chat_completion) diff --git a/tests/e2e/online_serving/test_qwen3_omni_expansion.py b/tests/e2e/online_serving/test_qwen3_omni_expansion.py index 6fb6a069ea4..4055ad42670 100644 --- a/tests/e2e/online_serving/test_qwen3_omni_expansion.py +++ b/tests/e2e/online_serving/test_qwen3_omni_expansion.py @@ -23,7 +23,7 @@ ) from tests.utils import hardware_test -models = ["Qwen/Qwen3-Omni-30B-A3B-Instruct"] +model = "Qwen/Qwen3-Omni-30B-A3B-Instruct" AUDIO_KEY = ["test"] IMAGE_KEY = ["square", "quadrate"] @@ -49,16 +49,32 @@ def get_chunk_config(default_path): return path +def get_batch_token_config(default_path): + path = modify_stage_config( + default_path, + updates={ + "stage_args": {1: {"engine_args.max_num_batched_tokens": 64}}, + }, + ) + return path + + # CI stage config for 2*H100-80G GPUs default_path = str(Path(__file__).parent.parent / "stage_configs" / "qwen3_omni_ci.yaml") -stage_configs = [default_path, get_chunk_config(default_path)] if current_omni_platform.is_xpu(): - stage_configs = [str(Path(__file__).parent.parent / "stage_configs" / "xpu" / "qwen3_omni_ci.yaml")] + default_path = str(Path(__file__).parent.parent / "stage_configs" / "xpu" / "qwen3_omni_ci.yaml") # Create parameter combinations for model and stage config test_params = [ - OmniServerParams(model=model, stage_config_path=stage_config) for model in models for stage_config in stage_configs + pytest.param(OmniServerParams(model=model, stage_config_path=default_path), id="default"), + pytest.param(OmniServerParams(model=model, stage_config_path=get_chunk_config(default_path)), id="async_chunk"), +] + +test_token_params = [ + pytest.param( + OmniServerParams(model=model, stage_config_path=get_batch_token_config(default_path)), id="batch_token_64" + ) ] @@ -85,6 +101,8 @@ def get_prompt(prompt_type="text_only"): "text_video": "What is in this video? ", "text_image": "What is in this image? ", "text_audio": "What is in this audio? ", + "text_audio_video": "First, what is in this audio? Then, what is in this video? ", + "one_word": "What is the capital of UK? Answer in one word", } return prompts.get(prompt_type, prompts["text_only"]) @@ -121,7 +139,7 @@ def test_text_to_audio_001(omni_server, openai_client) -> None: @pytest.mark.advanced_model @pytest.mark.omni @hardware_test(res={"cuda": "H100", "rocm": "MI325"}, num_cards=2) -@pytest.mark.parametrize("omni_server", test_params, indirect=True) +@pytest.mark.parametrize("omni_server", test_params + test_token_params, indirect=True) def test_text_to_text_audio_001(omni_server, openai_client) -> None: """ Input Modal: text @@ -288,7 +306,7 @@ def test_video_to_text_audio_001(omni_server, openai_client) -> None: @pytest.mark.advanced_model @pytest.mark.omni @hardware_test(res={"cuda": "H100", "rocm": "MI325"}, num_cards=2) -@pytest.mark.parametrize("omni_server", test_params, indirect=True) +@pytest.mark.parametrize("omni_server", test_params + test_token_params, indirect=True) def test_text_audio_to_text_audio_001(omni_server, openai_client) -> None: """ Input Modal: text, audio @@ -313,7 +331,7 @@ def test_text_audio_to_text_audio_001(omni_server, openai_client) -> None: @pytest.mark.advanced_model @pytest.mark.omni @hardware_test(res={"cuda": "H100", "rocm": "MI325"}, num_cards=2) -@pytest.mark.parametrize("omni_server", test_params, indirect=True) +@pytest.mark.parametrize("omni_server", test_params + test_token_params, indirect=True) def test_text_image_to_text_audio_001(omni_server, openai_client) -> None: """ Input Modal: text, image @@ -339,7 +357,7 @@ def test_text_image_to_text_audio_001(omni_server, openai_client) -> None: @pytest.mark.advanced_model @pytest.mark.omni @hardware_test(res={"cuda": "H100", "rocm": "MI325"}, num_cards=2) -@pytest.mark.parametrize("omni_server", test_params, indirect=True) +@pytest.mark.parametrize("omni_server", test_params + test_token_params, indirect=True) def test_text_video_to_text_audio_001(omni_server, openai_client) -> None: """ Input Modal: text, video @@ -367,7 +385,7 @@ def test_text_video_to_text_audio_001(omni_server, openai_client) -> None: @pytest.mark.advanced_model @pytest.mark.omni @hardware_test(res={"cuda": "H100", "rocm": "MI325"}, num_cards=2) -@pytest.mark.parametrize("omni_server", test_params, indirect=True) +@pytest.mark.parametrize("omni_server", test_params + test_token_params, indirect=True) def test_mix_to_text_audio_001(omni_server, openai_client) -> None: """ Input Modal: text, audio, image, video @@ -393,3 +411,106 @@ def test_mix_to_text_audio_001(omni_server, openai_client) -> None: "key_words": {"audio": AUDIO_KEY, "image": IMAGE_KEY, "video": VIDEO_KEY}, } openai_client.send_omni_request(request_config, request_num=get_max_batch_size()) + + +@pytest.mark.advanced_model +@pytest.mark.omni +@hardware_test(res={"cuda": "H100", "rocm": "MI325"}, num_cards=2) +@pytest.mark.parametrize("omni_server", test_params, indirect=True) +def test_audio_in_video_001(omni_server, openai_client) -> None: + """ + Input Modal: text + video (synthetic MP4 with embedded audio; ``use_audio_in_video`` uses audio from the video). + Output Modal: text, audio + Input Setting: stream=False + Datasets: single request + """ + video_data_url = f"data:video/mp4;base64,{generate_synthetic_video(224, 224, 300, embed_audio=True)['base64']}" + messages = dummy_messages_from_mix_data( + system_prompt=get_system_prompt(), + video_data_url=video_data_url, + content_text=get_prompt("text_audio_video"), + ) + + request_config = { + "model": omni_server.model, + "messages": messages, + "stream": False, + "use_audio_in_video": True, + "key_words": {"video": VIDEO_KEY, "audio": AUDIO_KEY + ["beep", "electronic"]}, + } + openai_client.send_omni_request(request_config) + + +@pytest.mark.advanced_model +@pytest.mark.omni +@hardware_test(res={"cuda": "H100", "rocm": "MI325"}, num_cards=2) +@pytest.mark.parametrize("omni_server", test_params, indirect=True) +def test_audio_in_video_002(omni_server, openai_client) -> None: + """ + Input Modal: text + video (synthetic MP4 with embedded audio; ``use_audio_in_video`` uses audio from the video). + Output Modal: text, audio + Input Setting: stream=True + Datasets: few requests + """ + video_data_url = f"data:video/mp4;base64,{generate_synthetic_video(224, 224, 300, embed_audio=True)['base64']}" + messages = dummy_messages_from_mix_data( + system_prompt=get_system_prompt(), + video_data_url=video_data_url, + content_text=get_prompt("text_audio_video"), + ) + + request_config = { + "model": omni_server.model, + "messages": messages, + "stream": True, + "use_audio_in_video": True, + "key_words": {"video": VIDEO_KEY, "audio": AUDIO_KEY + ["beep", "electronic"]}, + } + + # Retry when assert_omni_response fails on key_words (see tests/conftest.py). + _keyword_assert_msg = "The output does not contain any of the keywords." + _max_retries = 3 + for attempt in range(_max_retries): + try: + openai_client.send_omni_request(request_config, request_num=get_max_batch_size()) + break + except AssertionError as e: + if _keyword_assert_msg not in str(e) or attempt == _max_retries - 1: + raise + print(f"Keyword assertion failed, retrying {attempt + 2}/{_max_retries}: {e!r}") + + +@pytest.mark.advanced_model +@pytest.mark.omni +@hardware_test(res={"cuda": "H100", "rocm": "MI325"}, num_cards=2) +@pytest.mark.parametrize("omni_server", test_params, indirect=True) +def test_one_word_prompt_001(omni_server, openai_client) -> None: + """ + Input Modal: text only (one-word answer constraint). + Output Modal: text, audio (default ``modalities``); ``key_words`` only assert on text. + Input Setting: stream=True + Datasets: single request + """ + messages = dummy_messages_from_mix_data( + system_prompt=get_system_prompt(), + content_text=get_prompt("one_word"), + ) + + request_config = { + "model": omni_server.model, + "messages": messages, + "stream": True, + "key_words": {"text": ["london"]}, + } + + # Retry only when assert_omni_response fails on text/audio cosine similarity (see tests/conftest.py). + _similarity_assert_msg = "The audio content is not same as the text" + _max_retries = 3 + for attempt in range(_max_retries): + try: + openai_client.send_omni_request(request_config, request_num=get_max_batch_size()) + break + except AssertionError as e: + if _similarity_assert_msg not in str(e) or attempt == _max_retries - 1: + raise + print(f"Similarity assertion failed, retrying {attempt + 2}/{_max_retries}: {e!r}") diff --git a/tests/e2e/stage_configs/qwen3_omni_ci.yaml b/tests/e2e/stage_configs/qwen3_omni_ci.yaml index 8b08bbb5e7f..08dd49de953 100644 --- a/tests/e2e/stage_configs/qwen3_omni_ci.yaml +++ b/tests/e2e/stage_configs/qwen3_omni_ci.yaml @@ -33,7 +33,7 @@ stage_args: temperature: 0.4 top_p: 0.9 top_k: 1 - max_tokens: 100 + max_tokens: 150 seed: 42 ignore_eos: False detokenize: True @@ -74,7 +74,7 @@ stage_args: devices: "1" engine_args: model_stage: code2wav - max_num_seqs: 1 + max_num_seqs: 5 model_arch: Qwen3OmniMoeForConditionalGeneration worker_type: generation scheduler_cls: vllm_omni.core.sched.omni_generation_scheduler.OmniGenerationScheduler