diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index e7e30b3cdc3..c05c52732d7 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -17,14 +17,6 @@ steps: agents: queue: "cpu_queue_premerge" - # - label: "Test on NPU" - # depends_on: ~ - # key: npu-test - # commands: - # - ".buildkite/scripts/hardware_ci/run_npu_test.sh" - # agents: - # queue: "ascend" - - label: "Simple Unit Test" depends_on: image-build commands: @@ -182,12 +174,13 @@ steps: - "/fsx/hf_cache:/fsx/hf_cache" - - label: "Benchmark Test" + - label: "Benchmark&Engine Test" timeout_in_minutes: 15 depends_on: image-build commands: - export VLLM_WORKER_MULTIPROC_METHOD=spawn - pytest -s -v tests/benchmarks/test_serve_cli.py + - pytest -s -v tests/engine/test_async_omni_engine_abort.py agents: queue: "mithril-h100-pool" plugins: @@ -236,6 +229,43 @@ steps: volumes: - "/fsx/hf_cache:/fsx/hf_cache" + - label: "Omni Model Test with H100" + timeout_in_minutes: 20 + depends_on: image-build + commands: + - export VLLM_WORKER_MULTIPROC_METHOD=spawn + - export VLLM_TEST_CLEAN_GPU_MEMORY="1" + - pytest -s -v tests/e2e/offline_inference/test_qwen3_omni.py + - pytest -s -v tests/e2e/online_serving/test_qwen3_omni.py -m "core_model" --run-level "core_model" + agents: + queue: "mithril-h100-pool" + plugins: + - kubernetes: + podSpec: + containers: + - image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + resources: + limits: + nvidia.com/gpu: 2 + volumeMounts: + - name: devshm + mountPath: /dev/shm + - name: hf-cache + mountPath: /root/.cache/huggingface + env: + - name: HF_HOME + value: /root/.cache/huggingface + nodeSelector: + node.kubernetes.io/instance-type: gpu-h100-sxm + volumes: + - name: devshm + emptyDir: + medium: Memory + - name: hf-cache + hostPath: + path: /mnt/hf-cache + type: DirectoryOrCreate + - label: "Qwen3-TTS E2E Test" timeout_in_minutes: 10 depends_on: image-build @@ -256,44 +286,6 @@ steps: volumes: - "/fsx/hf_cache:/fsx/hf_cache" - # - label: "Omni Model Test with H100" - # timeout_in_minutes: 30 - # depends_on: image-build - # commands: - # - export VLLM_WORKER_MULTIPROC_METHOD=spawn - # - export VLLM_TEST_CLEAN_GPU_MEMORY="1" - # - pytest -s -v tests/e2e/offline_inference/test_qwen3_omni.py - # - pytest -s -v tests/e2e/online_serving/test_qwen3_omni.py - # - pytest -s -v tests/e2e/online_serving/test_async_omni.py - # agents: - # queue: "mithril-h100-pool" - # plugins: - # - kubernetes: - # podSpec: - # containers: - # - image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT - # resources: - # limits: - # nvidia.com/gpu: 2 - # volumeMounts: - # - name: devshm - # mountPath: /dev/shm - # - name: hf-cache - # mountPath: /root/.cache/huggingface - # env: - # - name: HF_HOME - # value: /root/.cache/huggingface - # nodeSelector: - # node.kubernetes.io/instance-type: gpu-h100-sxm - # volumes: - # - name: devshm - # emptyDir: - # medium: Memory - # - name: hf-cache - # hostPath: - # path: /mnt/hf-cache - # type: DirectoryOrCreate - - label: "Diffusion Image Edit Test with H100 (1 GPU)" timeout_in_minutes: 20 depends_on: image-build diff --git a/.buildkite/test-amd.yaml b/.buildkite/test-amd.yaml index 5b4cedd3fd9..07faba874ed 100644 --- a/.buildkite/test-amd.yaml +++ b/.buildkite/test-amd.yaml @@ -101,7 +101,7 @@ steps: - export VLLM_TEST_CLEAN_GPU_MEMORY="1" - pytest -s -v tests/e2e/offline_inference/test_qwen3_omni.py - pytest -s -v tests/e2e/online_serving/test_qwen3_omni.py - - pytest -s -v tests/e2e/online_serving/test_async_omni.py + - pytest -s -v tests/engine/test_async_omni_engine_abort.py - label: "Diffusion Image Edit Test" timeout_in_minutes: 15 diff --git a/.buildkite/test-merge.yml b/.buildkite/test-merge.yml new file mode 100644 index 00000000000..8e380ad6df6 --- /dev/null +++ b/.buildkite/test-merge.yml @@ -0,0 +1,329 @@ +steps: + - label: ":docker: Build image" + key: image-build + commands: + - "aws ecr-public get-login-password --region us-east-1 | docker login --username AWS --password-stdin public.ecr.aws/q9t5s3a7" + - "docker build --file docker/Dockerfile.ci -t vllm-omni-ci ." + - "docker tag vllm-omni-ci public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT" + - "docker push public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT" + agents: + queue: "cpu_queue_premerge" + + - label: "Simple Unit Test" + depends_on: image-build + commands: + - "pytest -v -s -m 'core_model and cpu' --cov=vllm_omni --cov-branch --cov-report=term-missing --cov-report=html --cov-report=xml" + agents: + queue: "gpu_1_queue" + plugins: + - docker#v5.2.0: + image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + always-pull: true + propagate-environment: true + environment: + - "HF_HOME=/fsx/hf_cache" + volumes: + - "/fsx/hf_cache:/fsx/hf_cache" + + - label: "Diffusion Model Test" + timeout_in_minutes: 20 + depends_on: image-build + commands: + - pytest -s -v tests/e2e/offline_inference/test_t2i_model.py + agents: + queue: "gpu_1_queue" # g6.4xlarge instance on AWS, has 1 L4 GPU + plugins: + - docker#v5.2.0: + image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + always-pull: true + propagate-environment: true + environment: + - "HF_HOME=/fsx/hf_cache" + volumes: + - "/fsx/hf_cache:/fsx/hf_cache" + + - label: "Diffusion Images API LoRA E2E" + timeout_in_minutes: 20 + depends_on: image-build + commands: + - pytest -s -v tests/e2e/online_serving/test_images_generations_lora.py + agents: + queue: "gpu_1_queue" # g6.4xlarge instance on AWS, has 1 L4 GPU + plugins: + - docker#v5.2.0: + image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + always-pull: true + propagate-environment: true + environment: + - "HF_HOME=/fsx/hf_cache" + volumes: + - "/fsx/hf_cache:/fsx/hf_cache" + + - label: "Diffusion Model CPU offloading Test" + timeout_in_minutes: 20 + depends_on: image-build + commands: + - pytest -s -v tests/e2e/offline_inference/test_diffusion_cpu_offload.py + - pytest -s -v tests/e2e/offline_inference/test_diffusion_layerwise_offload.py + agents: + queue: "gpu_1_queue" # g6.4xlarge instance on AWS, has 1 L4 GPU + plugins: + - docker#v5.2.0: + image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + always-pull: true + propagate-environment: true + environment: + - "HF_HOME=/fsx/hf_cache" + volumes: + - "/fsx/hf_cache:/fsx/hf_cache" + + - label: "Audio Generation Model Test" + timeout_in_minutes: 20 + depends_on: image-build + commands: + - pytest -s -v tests/e2e/offline_inference/test_stable_audio_model.py + agents: + queue: "gpu_1_queue" # g6.4xlarge instance on AWS, has 1 L4 GPU + plugins: + - docker#v5.2.0: + image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + always-pull: true + propagate-environment: true + environment: + - "HF_HOME=/fsx/hf_cache" + volumes: + - "/fsx/hf_cache:/fsx/hf_cache" + + - label: "Diffusion Cache Backend Test" + timeout_in_minutes: 15 + depends_on: image-build + commands: + - pytest -s -v tests/e2e/offline_inference/test_cache_dit.py tests/e2e/offline_inference/test_teacache.py + agents: + queue: "gpu_1_queue" # g6.4xlarge instance on AWS, has 1 L4 GPU + plugins: + - docker#v5.2.0: + image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + always-pull: true + propagate-environment: true + environment: + - "HF_HOME=/fsx/hf_cache" + volumes: + - "/fsx/hf_cache:/fsx/hf_cache" + + - label: "Diffusion Sequence Parallelism Test" + timeout_in_minutes: 20 + depends_on: image-build + commands: + - pytest -s -v tests/e2e/offline_inference/test_sequence_parallel.py + agents: + queue: "gpu_4_queue" # g6.12xlarge instance on AWS, has 4 L4 GPU + plugins: + - docker#v5.2.0: + image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + always-pull: true + propagate-environment: true + shm-size: "8gb" + environment: + - "HF_HOME=/fsx/hf_cache" + volumes: + - "/fsx/hf_cache:/fsx/hf_cache" + + - label: "Diffusion Tensor Parallelism Test" + timeout_in_minutes: 20 + depends_on: image-build + commands: + - pytest -s -v tests/e2e/offline_inference/test_zimage_tensor_parallel.py + agents: + queue: "gpu_4_queue" # g6.12xlarge instance on AWS, has 4 L4 GPU + plugins: + - docker#v5.2.0: + image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + always-pull: true + propagate-environment: true + shm-size: "8gb" + environment: + - "HF_HOME=/fsx/hf_cache" + volumes: + - "/fsx/hf_cache:/fsx/hf_cache" + + - label: "Diffusion GPU Worker Test" + timeout_in_minutes: 20 + depends_on: image-build + commands: + - pytest -s -v tests/diffusion/test_diffusion_worker.py + agents: + queue: "gpu_4_queue" # g6.12xlarge instance on AWS, has 4 L4 GPU + plugins: + - docker#v5.2.0: + image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + always-pull: true + propagate-environment: true + shm-size: "8gb" + environment: + - "HF_HOME=/fsx/hf_cache" + volumes: + - "/fsx/hf_cache:/fsx/hf_cache" + + + - label: "Benchmark&Engine Test" + timeout_in_minutes: 15 + depends_on: image-build + commands: + - export VLLM_WORKER_MULTIPROC_METHOD=spawn + - pytest -s -v tests/benchmarks/test_serve_cli.py + - pytest -s -v tests/engine/test_async_omni_engine_abort.py + agents: + queue: "mithril-h100-pool" + plugins: + - kubernetes: + podSpec: + containers: + - image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + resources: + limits: + nvidia.com/gpu: 2 + volumeMounts: + - name: devshm + mountPath: /dev/shm + - name: hf-cache + mountPath: /root/.cache/huggingface + env: + - name: HF_HOME + value: /root/.cache/huggingface + nodeSelector: + node.kubernetes.io/instance-type: gpu-h100-sxm + volumes: + - name: devshm + emptyDir: + medium: Memory + - name: hf-cache + hostPath: + path: /mnt/hf-cache + type: DirectoryOrCreate + + - label: "Omni Model Test" + timeout_in_minutes: 15 + depends_on: image-build + commands: + - export VLLM_LOGGING_LEVEL=DEBUG + - export VLLM_WORKER_MULTIPROC_METHOD=spawn + - pytest -s -v tests/e2e/offline_inference/test_qwen2_5_omni.py + agents: + queue: "gpu_4_queue" # g6.12xlarge instance on AWS, has 4 L4 GPU + plugins: + - docker#v5.2.0: + image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + always-pull: true + propagate-environment: true + environment: + - "HF_HOME=/fsx/hf_cache" + volumes: + - "/fsx/hf_cache:/fsx/hf_cache" + + - label: "Omni Model Test with H100" + timeout_in_minutes: 30 + depends_on: image-build + commands: + - export VLLM_WORKER_MULTIPROC_METHOD=spawn + - export VLLM_TEST_CLEAN_GPU_MEMORY="1" + - pytest -s -v tests/e2e/offline_inference/test_qwen3_omni.py + - pytest -s -v tests/e2e/online_serving/test_qwen3_omni.py -m "advanced_model" --run-level "advanced_model" + agents: + queue: "mithril-h100-pool" + plugins: + - kubernetes: + podSpec: + containers: + - image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + resources: + limits: + nvidia.com/gpu: 2 + volumeMounts: + - name: devshm + mountPath: /dev/shm + - name: hf-cache + mountPath: /root/.cache/huggingface + env: + - name: HF_HOME + value: /root/.cache/huggingface + nodeSelector: + node.kubernetes.io/instance-type: gpu-h100-sxm + volumes: + - name: devshm + emptyDir: + medium: Memory + - name: hf-cache + hostPath: + path: /mnt/hf-cache + type: DirectoryOrCreate + + - label: "Diffusion Image Edit Test with H100 (1 GPU)" + timeout_in_minutes: 20 + depends_on: image-build + commands: + - export VLLM_WORKER_MULTIPROC_METHOD=spawn + - pytest -s -v tests/e2e/online_serving/test_image_gen_edit.py + agents: + queue: "mithril-h100-pool" + plugins: + - kubernetes: + podSpec: + containers: + - image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + resources: + limits: + nvidia.com/gpu: 1 + volumeMounts: + - name: devshm + mountPath: /dev/shm + - name: hf-cache + mountPath: /root/.cache/huggingface + env: + - name: HF_HOME + value: /root/.cache/huggingface + nodeSelector: + node.kubernetes.io/instance-type: gpu-h100-sxm + volumes: + - name: devshm + emptyDir: + medium: Memory + - name: hf-cache + hostPath: + path: /mnt/hf-cache + type: DirectoryOrCreate + + # - label: "Bagel Text2Img Model Test with H100" + # timeout_in_minutes: 30 + # depends_on: image-build + # commands: + # - export VLLM_WORKER_MULTIPROC_METHOD=spawn + # - pytest -s -v tests/e2e/offline_inference/test_bagel_text2img.py + # agents: + # queue: "mithril-h100-pool" + # plugins: + # - kubernetes: + # podSpec: + # containers: + # - image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + # resources: + # limits: + # nvidia.com/gpu: 1 + # volumeMounts: + # - name: devshm + # mountPath: /dev/shm + # - name: hf-cache + # mountPath: /root/.cache/huggingface + # env: + # - name: HF_HOME + # value: /root/.cache/huggingface + # nodeSelector: + # node.kubernetes.io/instance-type: gpu-h100-sxm + # volumes: + # - name: devshm + # emptyDir: + # medium: Memory + # - name: hf-cache + # hostPath: + # path: /mnt/hf-cache + # type: DirectoryOrCreate diff --git a/pyproject.toml b/pyproject.toml index 6af1939536c..b2293da7cf9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -147,7 +147,8 @@ addopts = [ ] markers = [ # ci/cd required - "core_model: Core model tests (run in each PR)", + "core_model: L1&L2 tests (run in each PR)", + "advanced_model: L3&L4 level tests (run in each merge or nightly)", # function module markers "diffusion: Diffusion model tests", "omni: Omni model tests", diff --git a/tests/conftest.py b/tests/conftest.py index 98a21d43cf6..840494d0894 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,11 +10,14 @@ if "VLLM_TARGET_DEVICE" not in os.environ: os.environ["VLLM_TARGET_DEVICE"] = "cpu" +import concurrent.futures import gc import socket import subprocess import sys +import threading import time +from dataclasses import dataclass from pathlib import Path from typing import Any @@ -23,12 +26,22 @@ import pytest import torch import yaml +from openai import OpenAI +from vllm import TextPrompt from vllm.distributed.parallel_state import cleanup_dist_env_and_memory from vllm.logger import init_logger from vllm.utils.network_utils import get_open_port +from vllm_omni.entrypoints.omni import Omni +from vllm_omni.inputs.data import OmniSamplingParams +from vllm_omni.outputs import OmniRequestOutput + logger = init_logger(__name__) +PromptAudioInput = list[tuple[Any, int]] | tuple[Any, int] | None +PromptImageInput = list[Any] | Any | None +PromptVideoInput = list[Any] | Any | None + @pytest.fixture(autouse=True) def default_vllm_config(): @@ -292,6 +305,11 @@ def generate_synthetic_audio( if max_amp > 0: audio_data = audio_data / max_amp * 0.8 + audio_array = audio_data.copy() + result = { + "np_array": audio_array, + } + # Handle file saving audio_bytes = None @@ -318,16 +336,14 @@ def generate_synthetic_audio( # Return result base64_audio = base64.b64encode(audio_bytes).decode("utf-8") - result = { - "base64": base64_audio, - } + result["base64"] = base64_audio if save_to_file and output_path: result["file_path"] = output_path return result -def generate_synthetic_video(width: int, height: int, num_frames: int, save_to_file: bool = False) -> str: +def generate_synthetic_video(width: int, height: int, num_frames: int, save_to_file: bool = False) -> dict[str, Any]: """Generate synthetic video with bouncing balls and return base64 string.""" import cv2 @@ -397,6 +413,10 @@ def generate_synthetic_video(width: int, height: int, num_frames: int, save_to_f frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB) video_frames.append(frame_rgb) + video_array = np.array(video_frames) + result = { + "np_array": video_array, + } video_bytes = None saved_file_path = None @@ -451,16 +471,14 @@ def generate_synthetic_video(width: int, height: int, num_frames: int, save_to_f base64_video = base64.b64encode(video_bytes).decode("utf-8") - result = { - "base64": base64_video, - } + result["base64"] = base64_video if save_to_file and saved_file_path: result["file_path"] = saved_file_path return result -def generate_synthetic_image(width: int, height: int, save_to_file: bool = False) -> Any: +def generate_synthetic_image(width: int, height: int, save_to_file: bool = False) -> dict[str, Any]: """Generate synthetic image with randomly colored squares and return base64 string.""" from PIL import Image, ImageDraw @@ -488,6 +506,9 @@ def generate_synthetic_image(width: int, height: int, save_to_file: bool = False # Draw square draw.rectangle([x, y, x + square_size, y + square_size], fill=color, outline=(0, 0, 0), width=border_width) + image_array = np.array(image) + result = {"np_array": image_array.copy()} + # Handle file saving image_bytes = None saved_file_path = None @@ -521,9 +542,7 @@ def generate_synthetic_image(width: int, height: int, save_to_file: bool = False base64_image = base64.b64encode(image_bytes).decode("utf-8") # Return result - result = { - "base64": base64_image, - } + result["base64"] = base64_image if save_to_file and saved_file_path: result["file_path"] = saved_file_path @@ -594,7 +613,6 @@ def convert_audio_to_text(audio_data): audio_file.write(audio_data) print(f"audio data is saved: {output_path}") - text = convert_audio_file_to_text(output_path=output_path) return text @@ -602,7 +620,7 @@ def convert_audio_to_text(audio_data): def convert_audio_file_to_text(output_path): import whisper - model = whisper.load_model("base") + model = whisper.load_model("small") text = model.transcribe( output_path, temperature=0.0, @@ -641,7 +659,7 @@ def merge_base64_and_convert_to_text(base64_list): def modify_stage_config( yaml_path: str, - updates: dict[str, Any], + updates: dict[str, Any] = None, deletes: dict[str, Any] = None, ) -> str: """ @@ -792,7 +810,7 @@ def delete_by_path(config_dict: dict, path: str) -> None: elif isinstance(current, dict) and last_key in current: del current[last_key] else: - raise KeyError(f"Path {path} does not exist") + print(f"Path {path} does not exist") # Apply deletions first if deletes: @@ -829,42 +847,43 @@ def delete_by_path(config_dict: dict, path: str) -> None: # Delete entire key del config[key] - # Apply updates - for key, value in updates.items(): - if key == "stage_args": - if value and isinstance(value, dict): - stage_args = config.get("stage_args", []) - if not stage_args: - raise ValueError("stage_args does not exist in config") - - for stage_id, stage_updates in value.items(): - # Find stage by ID - target_stage = None - for stage in stage_args: - if stage.get("stage_id") == stage_id: - target_stage = stage - break - - if target_stage is None: - available_ids = [s.get("stage_id") for s in stage_args if "stage_id" in s] - raise KeyError(f"Stage ID {stage_id} not found, available: {available_ids}") - - # Apply updates to this stage - for path, val in stage_updates.items(): - # Check if this is a simple key (not dot-separated) - # Example: 'engine_input_source' vs 'engine_args.max_model_len' - if "." not in path: - # Direct key assignment (e.g., updating a list value) - target_stage[path] = val - else: - # Dot-separated path (e.g., nested dict access) - apply_update(target_stage, path, val) - elif "." in key: - # Apply using dot-separated path - apply_update(config, key, value) - else: - # Direct top-level key - config[key] = value + if updates: + # Apply updates + for key, value in updates.items(): + if key == "stage_args": + if value and isinstance(value, dict): + stage_args = config.get("stage_args", []) + if not stage_args: + raise ValueError("stage_args does not exist in config") + + for stage_id, stage_updates in value.items(): + # Find stage by ID + target_stage = None + for stage in stage_args: + if stage.get("stage_id") == stage_id: + target_stage = stage + break + + if target_stage is None: + available_ids = [s.get("stage_id") for s in stage_args if "stage_id" in s] + raise KeyError(f"Stage ID {stage_id} not found, available: {available_ids}") + + # Apply updates to this stage + for path, val in stage_updates.items(): + # Check if this is a simple key (not dot-separated) + # Example: 'engine_input_source' vs 'engine_args.max_model_len' + if "." not in path: + # Direct key assignment (e.g., updating a list value) + target_stage[path] = val + else: + # Dot-separated path (e.g., nested dict access) + apply_update(target_stage, path, val) + elif "." in key: + # Apply using dot-separated path + apply_update(config, key, value) + else: + # Direct top-level key + config[key] = value # Save to new file with timestamp timestamp = int(time.time()) @@ -1012,3 +1031,641 @@ def __exit__(self, exc_type, exc_val, exc_tb): _run_pre_test_cleanup(enable_force=True) _run_post_test_cleanup(enable_force=True) cleanup_dist_env_and_memory() + + +def pytest_addoption(parser): + parser.addoption( + "--run-level", + action="store", + default="core_model", + choices=["core_model", "advanced_model"], + help="Test level to run: L2, L3", + ) + + +@pytest.fixture(scope="session") +def run_level(request): + return request.config.getoption("--run-level") + + +_omni_server_lock = threading.Lock() + + +@pytest.fixture(scope="module") +def omni_server(request, run_level): + """Start vLLM-Omni server as a subprocess with actual model weights. + Uses session scope so the server starts only once for the entire test session. + Multi-stage initialization can take 10-20+ minutes. + """ + with _omni_server_lock: + model, stage_config_path = request.param + if run_level == "advanced_model": + stage_config_path = modify_stage_config( + stage_config_path, + deletes={ + "stage_args": { + 0: ["engine_args.load_format"], + 1: ["engine_args.load_format"], + 2: ["engine_args.load_format"], + } + }, + ) + + with OmniServer(model, ["--stage-configs-path", stage_config_path, "--stage-init-timeout", "120"]) as server: + print("OmniServer started successfully") + yield server + print("OmniServer stopping...") + + print("OmniServer stopped") + + +@dataclass +class OmniResponse: + text_content: str | None = None + audio_data: list[str] | None = None + audio_content: str | None = None + similarity: float | None = None + e2e_latency: float | None = None + success: bool = False + error_message: str | None = None + + +def assert_omni_response(response: OmniResponse, request_config: dict[str, Any], run_level): + """ + Validate response results. + + Args: + response: OmniResponse object + + Raises: + AssertionError: When the response does not meet validation criteria + """ + assert response.success, "The request failed." + e2e_latency = response.e2e_latency + if e2e_latency is not None: + print(f"the avg e2e is: {e2e_latency}") + + modalities = request_config.get("modalities", ["text", "audio"]) + + if "audio" in modalities: + assert response.audio_content is not None, "No audio output is generated" + print(f"audio content is: {response.audio_content}") + + if "text" in modalities: + assert response.text_content is not None, "No text output is generated" + print(f"text content is: {response.text_content}") + + if run_level == "advanced_model": + # Verify image description + word_types = ["text", "image", "audio", "video"] + keywords_dict = request_config.get("key_words", {}) + for word_type in word_types: + keywords = keywords_dict.get(word_type) + if "text" in modalities: + if keywords: + assert any(keyword in response.text_content.lower() for keyword in keywords), ( + "The output does not contain any of the keywords." + ) + else: + if keywords: + assert any(keyword in response.audio_content.lower() for keyword in keywords), ( + "The output does not contain any of the keywords." + ) + + # Verify similarity + if "text" in modalities and "audio" in modalities: + assert response.similarity > 0.9, "The audio content is not same as the text" + print(f"similarity is: {response.similarity}") + + +class OpenAIClientHandler: + """ + OpenAI client handler class, encapsulating both streaming and non-streaming response processing logic. + + This class integrates OpenAI API request sending, response handling, and validation functionality, + supporting both single request and concurrent request modes. + """ + + def __init__( + self, host: str = "127.0.0.1", port: int = get_open_port(), api_key: str = "EMPTY", run_level: str = None + ): + """ + Initialize the OpenAI client. + + Args: + host: vLLM-Omni server host address + port: vLLM-Omni server port + api_key: API key (defaults to "EMPTY") + """ + self.client = OpenAI(base_url=f"http://{host}:{port}/v1", api_key=api_key) + self.run_level = run_level + + def _process_stream_response(self, chat_completion) -> OmniResponse: + """ + Process streaming responses. + + Args: + chat_completion: OpenAI streaming response object + request_config: Request configuration dictionary + + Returns: + OmniResponse: Processed response object + """ + result = OmniResponse() + start_time = time.perf_counter() + + try: + text_content = "" + audio_data = [] + + for chunk in chat_completion: + for choice in chunk.choices: + # Get content data + if hasattr(choice, "delta"): + content = getattr(choice.delta, "content", None) + else: + content = None + + # Get modality type + modality = getattr(chunk, "modality", None) + + # Process content based on modality type + if modality == "audio" and content: + audio_data.append(content) + elif modality == "text" and content: + text_content += content if content else "" + + # Calculate end-to-end latency + result.e2e_latency = time.perf_counter() - start_time + + # Process audio and text content + audio_content = None + similarity = None + + if audio_data or text_content: + if audio_data: + audio_content = merge_base64_and_convert_to_text(audio_data) + if audio_content and text_content: + similarity = cosine_similarity_text(audio_content.lower(), text_content.lower()) + + # Populate result object + result.text_content = text_content + result.audio_data = audio_data + result.audio_content = audio_content + result.similarity = similarity + result.success = True + + except Exception as e: + result.error_message = f"Stream processing error: {str(e)}" + print(f"Error: {result.error_message}") + + return result + + def _process_non_stream_response(self, chat_completion) -> OmniResponse: + """ + Process non-streaming responses. + + Args: + chat_completion: OpenAI non-streaming response object + request_config: Request configuration dictionary + + Returns: + OmniResponse: Processed response object + """ + result = OmniResponse() + start_time = time.perf_counter() + + try: + audio_data = None + text_content = None + + # Iterate through all choices + for choice in chat_completion.choices: + # Process audio data + if hasattr(choice.message, "audio") and choice.message.audio is not None: + audio_message = choice.message + audio_data = audio_message.audio.data + + # Process text content + if hasattr(choice.message, "content") and choice.message.content is not None: + text_content = choice.message.content + + # Calculate end-to-end latency + result.e2e_latency = time.perf_counter() - start_time + + # Process audio and text content + audio_content = None + similarity = None + + if audio_data or text_content: + if audio_data: + audio_content = convert_audio_to_text(audio_data) + if audio_content and text_content: + similarity = cosine_similarity_text(audio_content.lower(), text_content.lower()) + + # Populate result object + result.text_content = text_content + result.audio_content = audio_content + result.similarity = similarity + result.success = True + + except Exception as e: + result.error_message = f"Non-stream processing error: {str(e)}" + print(f"Error: {result.error_message}") + + return result + + def send_request(self, request_config: dict[str, Any], request_num: int = 1) -> list[OmniResponse]: + """ + Send OpenAI requests. + + Args: + request_config: Request configuration dictionary containing parameters like model, messages, stream + request_num: Number of requests, defaults to 1 (single request) + + Returns: + List[OmniResponse]: List of response objects + """ + + responses = [] + stream = request_config.get("stream", False) + + if request_num == 1: + # Send single request + chat_completion = self.client.chat.completions.create( + model=request_config.get("model"), messages=request_config.get("messages"), stream=stream + ) + + if stream: + response = self._process_stream_response(chat_completion) + else: + response = self._process_non_stream_response(chat_completion) + + assert_omni_response(response, request_config, run_level=self.run_level) + responses.append(response) + + else: + # Send concurrent requests + with concurrent.futures.ThreadPoolExecutor(max_workers=request_num) as executor: + futures = [] + + # Submit all request tasks + for _ in range(request_num): + future = executor.submit( + self.client.chat.completions.create, + model=request_config.get("model"), + messages=request_config.get("messages"), + stream=stream, + ) + futures.append(future) + + # Process completed tasks + for future in concurrent.futures.as_completed(futures): + chat_completion = future.result() + + if stream: + response = self._process_stream_response(chat_completion) + else: + response = self._process_non_stream_response(chat_completion) + + assert_omni_response(response, request_config, run_level=self.run_level) + responses.append(response) + + return responses + + +@pytest.fixture +def openai_client(omni_server, run_level): + """Create OpenAIClientHandler fixture""" + return OpenAIClientHandler(host=omni_server.host, port=omni_server.port, api_key="EMPTY", run_level=run_level) + + +class OmniRunner: + """ + Test runner for Omni models. + """ + + def __init__( + self, + model_name: str, + seed: int = 42, + stage_init_timeout: int = 300, + batch_timeout: int = 10, + init_timeout: int = 300, + shm_threshold_bytes: int = 65536, + log_stats: bool = False, + stage_configs_path: str | None = None, + **kwargs, + ) -> None: + """ + Initialize an OmniRunner for testing. + + Args: + model_name: The model name or path + seed: Random seed for reproducibility + stage_init_timeout: Timeout for initializing a single stage in seconds + batch_timeout: Timeout for batching in seconds + init_timeout: Timeout for initializing stages in seconds + shm_threshold_bytes: Threshold for using shared memory + log_stats: Enable detailed statistics logging + stage_configs_path: Optional path to YAML stage config file + **kwargs: Additional arguments passed to Omni + """ + cleanup_dist_env_and_memory() + _run_pre_test_cleanup(enable_force=True) + _run_post_test_cleanup(enable_force=True) + self.model_name = model_name + self.seed = seed + + self.omni = Omni( + model=model_name, + log_stats=log_stats, + stage_init_timeout=stage_init_timeout, + batch_timeout=batch_timeout, + init_timeout=init_timeout, + shm_threshold_bytes=shm_threshold_bytes, + stage_configs_path=stage_configs_path, + **kwargs, + ) + + def get_default_sampling_params_list(self) -> list[OmniSamplingParams]: + """ + Get a list of default sampling parameters for all stages. + + Returns: + List of SamplingParams with default decoding for each stage + """ + return [st.default_sampling_params for st in self.omni.stage_list] + + def get_omni_inputs( + self, + prompts: list[str] | str, + system_prompt: str | None = None, + audios: PromptAudioInput = None, + images: PromptImageInput = None, + videos: PromptVideoInput = None, + mm_processor_kwargs: dict[str, Any] | None = None, + modalities: list[str] | None = None, + ) -> list[TextPrompt]: + """ + Construct Omni input format from prompts and multimodal data. + + Args: + prompts: Text prompt(s) - either a single string or list of strings + system_prompt: Optional system prompt (defaults to Qwen system prompt) + audios: Audio input(s) - tuple of (audio_array, sample_rate) or list of tuples + images: Image input(s) - PIL Image or list of PIL Images + videos: Video input(s) - numpy array or list of numpy arrays + mm_processor_kwargs: Optional processor kwargs (e.g., use_audio_in_video) + + Returns: + List of prompt dictionaries suitable for Omni.generate() + """ + if system_prompt is None: + system_prompt = ( + "You are Qwen, a virtual human developed by the Qwen Team, Alibaba " + "Group, capable of perceiving auditory and visual inputs, as well as " + "generating text and speech." + ) + + video_padding_token = "<|VIDEO|>" + image_padding_token = "<|IMAGE|>" + audio_padding_token = "<|AUDIO|>" + + if "Qwen3-Omni-30B-A3B-Instruct" in self.model_name: + video_padding_token = "<|video_pad|>" + image_padding_token = "<|image_pad|>" + audio_padding_token = "<|audio_pad|>" + + if isinstance(prompts, str): + prompts = [prompts] + + def _normalize_mm_input(mm_input, num_prompts): + if mm_input is None: + return [None] * num_prompts + if isinstance(mm_input, list): + if len(mm_input) != num_prompts: + raise ValueError( + f"Multimodal input list length ({len(mm_input)}) must match prompts length ({num_prompts})" + ) + return mm_input + return [mm_input] * num_prompts + + num_prompts = len(prompts) + audios_list = _normalize_mm_input(audios, num_prompts) + images_list = _normalize_mm_input(images, num_prompts) + videos_list = _normalize_mm_input(videos, num_prompts) + + omni_inputs = [] + for i, prompt_text in enumerate(prompts): + user_content = "" + multi_modal_data = {} + + audio = audios_list[i] + if audio is not None: + if isinstance(audio, list): + for _ in audio: + user_content += f"<|audio_bos|>{audio_padding_token}<|audio_eos|>" + multi_modal_data["audio"] = audio + else: + user_content += f"<|audio_bos|>{audio_padding_token}<|audio_eos|>" + multi_modal_data["audio"] = audio + + image = images_list[i] + if image is not None: + if isinstance(image, list): + for _ in image: + user_content += f"<|vision_bos|>{image_padding_token}<|vision_eos|>" + multi_modal_data["image"] = image + else: + user_content += f"<|vision_bos|>{image_padding_token}<|vision_eos|>" + multi_modal_data["image"] = image + + video = videos_list[i] + if video is not None: + if isinstance(video, list): + for _ in video: + user_content += f"<|vision_bos|>{video_padding_token}<|vision_eos|>" + multi_modal_data["video"] = video + else: + user_content += f"<|vision_bos|>{video_padding_token}<|vision_eos|>" + multi_modal_data["video"] = video + + user_content += prompt_text + + full_prompt = ( + f"<|im_start|>system\n{system_prompt}<|im_end|>\n" + f"<|im_start|>user\n{user_content}<|im_end|>\n" + f"<|im_start|>assistant\n" + ) + + input_dict: TextPrompt = {"prompt": full_prompt} + if multi_modal_data: + input_dict["multi_modal_data"] = multi_modal_data + if modalities: + input_dict["modalities"] = modalities + if mm_processor_kwargs: + input_dict["mm_processor_kwargs"] = mm_processor_kwargs + + omni_inputs.append(input_dict) + + return omni_inputs + + def generate( + self, + prompts: list[TextPrompt], + sampling_params_list: list[OmniSamplingParams] | None = None, + ) -> list[OmniRequestOutput]: + """ + Generate outputs for the given prompts. + + Args: + prompts: List of prompt dictionaries with 'prompt' and optionally + 'multi_modal_data' keys + sampling_params_list: List of sampling parameters for each stage. + If None, uses default parameters. + + Returns: + List of OmniRequestOutput objects from stages with final_output=True + """ + if sampling_params_list is None: + sampling_params_list = self.get_default_sampling_params_list() + + return self.omni.generate(prompts, sampling_params_list) + + def generate_multimodal( + self, + prompts: list[str] | str, + sampling_params_list: list[OmniSamplingParams] | None = None, + system_prompt: str | None = None, + audios: PromptAudioInput = None, + images: PromptImageInput = None, + videos: PromptVideoInput = None, + mm_processor_kwargs: dict[str, Any] | None = None, + modalities: list[str] | None = None, + ) -> list[OmniRequestOutput]: + """ + Convenience method to generate with multimodal inputs. + + Args: + prompts: Text prompt(s) + sampling_params_list: List of sampling parameters for each stage + system_prompt: Optional system prompt + audios: Audio input(s) + images: Image input(s) + videos: Video input(s) + mm_processor_kwargs: Optional processor kwargs + + Returns: + List of OmniRequestOutput objects from stages with final_output=True + """ + omni_inputs = self.get_omni_inputs( + prompts=prompts, + system_prompt=system_prompt, + audios=audios, + images=images, + videos=videos, + mm_processor_kwargs=mm_processor_kwargs, + modalities=modalities, + ) + return self.generate(omni_inputs, sampling_params_list) + + def _cleanup_process(self): + try: + keywords = ["enginecore"] + + for proc in psutil.process_iter(["pid", "name", "cmdline", "username"]): + try: + cmdline = " ".join(proc.cmdline()).lower() if proc.cmdline() else "" + name = proc.name().lower() + + is_process = any(keyword in cmdline for keyword in keywords) or any( + keyword in name for keyword in keywords + ) + + if is_process: + print(f"Found vllm process: PID={proc.pid}, cmd={cmdline[:100]}") + + try: + proc.terminate() + time.sleep(2) + except Exception: + proc.kill() + + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + + except Exception as e: + print(f"Error in psutil vllm cleanup: {e}") + + def __enter__(self): + """Context manager entry.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit - cleanup resources.""" + if hasattr(self.omni, "close"): + self.omni.close() + self._cleanup_process() + _run_pre_test_cleanup(enable_force=True) + _run_post_test_cleanup(enable_force=True) + cleanup_dist_env_and_memory() + + +@pytest.fixture(scope="module") +def omni_runner(request): + with _omni_server_lock: + model, stage_config_path = request.param + with OmniRunner(model, seed=42, stage_configs_path=stage_config_path, stage_init_timeout=300) as runner: + print("OmniRunner started successfully") + yield runner + print("OmniRunner stopping...") + + print("OmniRunner stopped") + + +class OmniRunnerHandler: + def __init__(self, omni_runner): + self.runner = omni_runner + + def _process_output(self, outputs: list[Any]) -> OmniResponse: + result = OmniResponse() + try: + text_content = None + audio_content = None + for stage_output in outputs: + if getattr(stage_output, "final_output_type", None) == "text": + text_content = stage_output.request_output[0].outputs[0].text + if getattr(stage_output, "final_output_type", None) == "audio": + audio_content = stage_output.request_output[0].outputs[0].multimodal_output["audio"] + + result.audio_content = audio_content + result.text_content = text_content + result.success = True + + except Exception as e: + result.error_message = f"Output processing error: {str(e)}" + result.success = False + print(f"Error: {result.error_message}") + + return result + + def send_request(self, request_config: dict[str, Any] | None = None) -> OmniResponse: + if request_config is None: + request_config = {} + prompts = request_config.get("prompts") + videos = request_config.get("videos") + images = request_config.get("images") + audios = request_config.get("audios") + modalities = request_config.get("modalities", ["text", "audio"]) + outputs = self.runner.generate_multimodal( + prompts=prompts, videos=videos, images=images, audios=audios, modalities=modalities + ) + response = self._process_output(outputs) + assert_omni_response(response, request_config, run_level="L2") + return response + + +@pytest.fixture +def omni_runner_handler(omni_runner): + return OmniRunnerHandler(omni_runner) diff --git a/tests/e2e/offline_inference/conftest.py b/tests/e2e/offline_inference/conftest.py deleted file mode 100644 index 89170983767..00000000000 --- a/tests/e2e/offline_inference/conftest.py +++ /dev/null @@ -1,353 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project -""" -Pytest configuration and fixtures for vllm-omni tests. -""" - -from typing import Any - -import pytest -from vllm import TextPrompt -from vllm.distributed.parallel_state import cleanup_dist_env_and_memory - -from tests.conftest import _run_post_test_cleanup, _run_pre_test_cleanup -from vllm_omni.entrypoints.omni import Omni -from vllm_omni.inputs.data import OmniSamplingParams -from vllm_omni.outputs import OmniRequestOutput - -PromptAudioInput = list[tuple[Any, int]] | tuple[Any, int] | None -PromptImageInput = list[Any] | Any | None -PromptVideoInput = list[Any] | Any | None - - -class OmniRunner: - """ - Test runner for Omni models. - """ - - def __init__( - self, - model_name: str, - seed: int = 42, - stage_init_timeout: int = 300, - batch_timeout: int = 10, - init_timeout: int = 300, - shm_threshold_bytes: int = 65536, - log_stats: bool = False, - stage_configs_path: str | None = None, - **kwargs, - ) -> None: - """ - Initialize an OmniRunner for testing. - - Args: - model_name: The model name or path - seed: Random seed for reproducibility - stage_init_timeout: Timeout for initializing a single stage in seconds - batch_timeout: Timeout for batching in seconds - init_timeout: Timeout for initializing stages in seconds - shm_threshold_bytes: Threshold for using shared memory - log_stats: Enable detailed statistics logging - stage_configs_path: Optional path to YAML stage config file - **kwargs: Additional arguments passed to Omni - """ - cleanup_dist_env_and_memory() - _run_pre_test_cleanup(enable_force=True) - _run_post_test_cleanup(enable_force=True) - self.model_name = model_name - self.seed = seed - - self.omni = Omni( - model=model_name, - log_stats=log_stats, - stage_init_timeout=stage_init_timeout, - batch_timeout=batch_timeout, - init_timeout=init_timeout, - shm_threshold_bytes=shm_threshold_bytes, - stage_configs_path=stage_configs_path, - **kwargs, - ) - - def get_default_sampling_params_list(self) -> list[OmniSamplingParams]: - """ - Get a list of default sampling parameters for all stages. - - Returns: - List of SamplingParams with default decoding for each stage - """ - return [st.default_sampling_params for st in self.omni.stage_list] - - def get_omni_inputs( - self, - prompts: list[str] | str, - system_prompt: str | None = None, - audios: PromptAudioInput = None, - images: PromptImageInput = None, - videos: PromptVideoInput = None, - mm_processor_kwargs: dict[str, Any] | None = None, - modalities: list[str] | None = None, - ) -> list[TextPrompt]: - """ - Construct Omni input format from prompts and multimodal data. - - Args: - prompts: Text prompt(s) - either a single string or list of strings - system_prompt: Optional system prompt (defaults to Qwen system prompt) - audios: Audio input(s) - tuple of (audio_array, sample_rate) or list of tuples - images: Image input(s) - PIL Image or list of PIL Images - videos: Video input(s) - numpy array or list of numpy arrays - mm_processor_kwargs: Optional processor kwargs (e.g., use_audio_in_video) - - Returns: - List of prompt dictionaries suitable for Omni.generate() - """ - if system_prompt is None: - system_prompt = ( - "You are Qwen, a virtual human developed by the Qwen Team, Alibaba " - "Group, capable of perceiving auditory and visual inputs, as well as " - "generating text and speech." - ) - - video_padding_token = "<|VIDEO|>" - image_padding_token = "<|IMAGE|>" - audio_padding_token = "<|AUDIO|>" - - if self.model_name == "Qwen/Qwen3-Omni-30B-A3B-Instruct": - video_padding_token = "<|video_pad|>" - image_padding_token = "<|image_pad|>" - audio_padding_token = "<|audio_pad|>" - - if isinstance(prompts, str): - prompts = [prompts] - - def _normalize_mm_input(mm_input, num_prompts): - if mm_input is None: - return [None] * num_prompts - if isinstance(mm_input, list): - if len(mm_input) != num_prompts: - raise ValueError( - f"Multimodal input list length ({len(mm_input)}) must match prompts length ({num_prompts})" - ) - return mm_input - return [mm_input] * num_prompts - - num_prompts = len(prompts) - audios_list = _normalize_mm_input(audios, num_prompts) - images_list = _normalize_mm_input(images, num_prompts) - videos_list = _normalize_mm_input(videos, num_prompts) - - omni_inputs = [] - for i, prompt_text in enumerate(prompts): - user_content = "" - multi_modal_data = {} - - audio = audios_list[i] - if audio is not None: - if isinstance(audio, list): - for _ in audio: - user_content += f"<|audio_bos|>{audio_padding_token}<|audio_eos|>" - multi_modal_data["audio"] = audio - else: - user_content += f"<|audio_bos|>{audio_padding_token}<|audio_eos|>" - multi_modal_data["audio"] = audio - - image = images_list[i] - if image is not None: - if isinstance(image, list): - for _ in image: - user_content += f"<|vision_bos|>{image_padding_token}<|vision_eos|>" - multi_modal_data["image"] = image - else: - user_content += f"<|vision_bos|>{image_padding_token}<|vision_eos|>" - multi_modal_data["image"] = image - - video = videos_list[i] - if video is not None: - if isinstance(video, list): - for _ in video: - user_content += f"<|vision_bos|>{video_padding_token}<|vision_eos|>" - multi_modal_data["video"] = video - else: - user_content += f"<|vision_bos|>{video_padding_token}<|vision_eos|>" - multi_modal_data["video"] = video - - user_content += prompt_text - - full_prompt = ( - f"<|im_start|>system\n{system_prompt}<|im_end|>\n" - f"<|im_start|>user\n{user_content}<|im_end|>\n" - f"<|im_start|>assistant\n" - ) - - input_dict: TextPrompt = {"prompt": full_prompt} - if multi_modal_data: - input_dict["multi_modal_data"] = multi_modal_data - if modalities: - input_dict["modalities"] = modalities - if mm_processor_kwargs: - input_dict["mm_processor_kwargs"] = mm_processor_kwargs - - omni_inputs.append(input_dict) - - return omni_inputs - - def generate( - self, - prompts: list[TextPrompt], - sampling_params_list: list[OmniSamplingParams] | None = None, - ) -> list[OmniRequestOutput]: - """ - Generate outputs for the given prompts. - - Args: - prompts: List of prompt dictionaries with 'prompt' and optionally - 'multi_modal_data' keys - sampling_params_list: List of sampling parameters for each stage. - If None, uses default parameters. - - Returns: - List of OmniRequestOutput objects from stages with final_output=True - """ - if sampling_params_list is None: - sampling_params_list = self.get_default_sampling_params_list() - - return self.omni.generate(prompts, sampling_params_list) - - def generate_multimodal( - self, - prompts: list[str] | str, - sampling_params_list: list[OmniSamplingParams] | None = None, - system_prompt: str | None = None, - audios: PromptAudioInput = None, - images: PromptImageInput = None, - videos: PromptVideoInput = None, - mm_processor_kwargs: dict[str, Any] | None = None, - modalities: list[str] | None = None, - ) -> list[OmniRequestOutput]: - """ - Convenience method to generate with multimodal inputs. - - Args: - prompts: Text prompt(s) - sampling_params_list: List of sampling parameters for each stage - system_prompt: Optional system prompt - audios: Audio input(s) - images: Image input(s) - videos: Video input(s) - mm_processor_kwargs: Optional processor kwargs - - Returns: - List of OmniRequestOutput objects from stages with final_output=True - """ - omni_inputs = self.get_omni_inputs( - prompts=prompts, - system_prompt=system_prompt, - audios=audios, - images=images, - videos=videos, - mm_processor_kwargs=mm_processor_kwargs, - modalities=modalities, - ) - return self.generate(omni_inputs, sampling_params_list) - - def generate_audio( - self, - prompts: list[str] | str, - sampling_params_list: list[OmniSamplingParams] | None = None, - system_prompt: str | None = None, - audios: PromptAudioInput = None, - mm_processor_kwargs: dict[str, Any] | None = None, - ) -> list[OmniRequestOutput]: - """ - Convenience method to generate with multimodal inputs. - Args: - prompts: Text prompt(s) - sampling_params_list: List of sampling parameters for each stage - system_prompt: Optional system prompt - audios: Audio input(s) - mm_processor_kwargs: Optional processor kwargs - Returns: - List of OmniRequestOutput objects from stages with final_output=True - """ - omni_inputs = self.get_omni_inputs( - prompts=prompts, - system_prompt=system_prompt, - audios=audios, - mm_processor_kwargs=mm_processor_kwargs, - ) - return self.generate(omni_inputs, sampling_params_list) - - def generate_video( - self, - prompts: list[str] | str, - sampling_params_list: list[OmniSamplingParams] | None = None, - system_prompt: str | None = None, - videos: PromptVideoInput = None, - mm_processor_kwargs: dict[str, Any] | None = None, - ) -> list[OmniRequestOutput]: - """ - Convenience method to generate with multimodal inputs. - Args: - prompts: Text prompt(s) - sampling_params_list: List of sampling parameters for each stage - system_prompt: Optional system prompt - videos: Video input(s) - mm_processor_kwargs: Optional processor kwargs - Returns: - List of OmniRequestOutput objects from stages with final_output=True - """ - omni_inputs = self.get_omni_inputs( - prompts=prompts, - system_prompt=system_prompt, - videos=videos, - mm_processor_kwargs=mm_processor_kwargs, - ) - return self.generate(omni_inputs, sampling_params_list) - - def generate_image( - self, - prompts: list[str] | str, - sampling_params_list: list[OmniSamplingParams] | None = None, - system_prompt: str | None = None, - images: PromptImageInput = None, - mm_processor_kwargs: dict[str, Any] | None = None, - ) -> list[OmniRequestOutput]: - """ - Convenience method to generate with multimodal inputs. - Args: - prompts: Text prompt(s) - sampling_params_list: List of sampling parameters for each stage - system_prompt: Optional system prompt - images: Image input(s) - mm_processor_kwargs: Optional processor kwargs - Returns: - List of OmniRequestOutput objects from stages with final_output=True - """ - omni_inputs = self.get_omni_inputs( - prompts=prompts, - system_prompt=system_prompt, - images=images, - mm_processor_kwargs=mm_processor_kwargs, - ) - return self.generate(omni_inputs, sampling_params_list) - - def __enter__(self): - """Context manager entry.""" - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit - cleanup resources.""" - self.close() - del self.omni - cleanup_dist_env_and_memory() - _run_post_test_cleanup(enable_force=True) - - def close(self): - """Close and cleanup the Omni instance.""" - if hasattr(self.omni, "close"): - self.omni.close() - - -@pytest.fixture(scope="session") -def omni_runner(): - return OmniRunner diff --git a/tests/e2e/offline_inference/stage_configs/qwen2_5_omni_ci.yaml b/tests/e2e/offline_inference/stage_configs/qwen2_5_omni_ci.yaml deleted file mode 100644 index e093ec51b99..00000000000 --- a/tests/e2e/offline_inference/stage_configs/qwen2_5_omni_ci.yaml +++ /dev/null @@ -1,106 +0,0 @@ -# stage config for running qwen2.5-omni with architecture of OmniLLM. - -# The following config has been verified on 2x 24GB GPU (L4/RTX3090/RTX4090). -# This config is optimized for CI e2e tests. -stage_args: - - stage_id: 0 - runtime: - process: true # Run this stage in a separate process - devices: "0" # Visible devices for this stage (CUDA_VISIBLE_DEVICES/torch.cuda.set_device) - max_batch_size: 1 - engine_args: - model_stage: thinker - model_arch: Qwen2_5OmniForConditionalGeneration - worker_type: ar - scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler - max_model_len: 896 - max_num_batched_tokens: 896 - max_num_seqs: 1 - gpu_memory_utilization: 0.8 - skip_mm_profiling: true - enforce_eager: true # Now we only support eager mode - trust_remote_code: true - engine_output_type: latent - enable_prefix_caching: false - is_comprehension: true - final_output: true - final_output_type: text - default_sampling_params: - temperature: 0.0 - top_p: 1.0 - top_k: -1 - max_tokens: 128 - seed: 42 - detokenize: True - repetition_penalty: 1.1 - - stage_id: 1 - runtime: - process: true - devices: "1" - max_batch_size: 1 - engine_args: - model_stage: talker - model_arch: Qwen2_5OmniForConditionalGeneration - worker_type: ar - scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler - max_model_len: 896 - max_num_batched_tokens: 896 - max_num_seqs: 1 - gpu_memory_utilization: 0.8 - skip_mm_profiling: true - enforce_eager: true - trust_remote_code: true - enable_prefix_caching: false - engine_output_type: latent - engine_input_source: [0] - custom_process_input_func: vllm_omni.model_executor.stage_input_processors.qwen2_5_omni.thinker2talker - default_sampling_params: - temperature: 0.9 - top_p: 0.8 - top_k: 40 - max_tokens: 128 - seed: 42 - detokenize: True - repetition_penalty: 1.05 - stop_token_ids: [8294] - - stage_id: 2 - runtime: - process: true - devices: "0" # Example: use a different GPU than the previous stage; use "0" if single GPU - max_batch_size: 1 - engine_args: - model_stage: code2wav - model_arch: Qwen2_5OmniForConditionalGeneration - worker_type: generation - scheduler_cls: vllm_omni.core.sched.omni_generation_scheduler.OmniGenerationScheduler - gpu_memory_utilization: 0.15 - enforce_eager: true - trust_remote_code: true - enable_prefix_caching: false - engine_output_type: audio - max_num_batched_tokens: 4069 - engine_input_source: [1] - final_output: true - final_output_type: audio - default_sampling_params: - temperature: 0.0 - top_p: 1.0 - top_k: -1 - max_tokens: 128 - seed: 42 - detokenize: True - repetition_penalty: 1.1 - -# Top-level runtime config (concise): default windows and stage edges -runtime: - enabled: true - defaults: - window_size: -1 # Simplified: trigger downstream only after full upstream completion - max_inflight: 1 # Simplified: process serially within each stage - edges: - - from: 0 # thinker → talker: trigger only after receiving full input (-1) - to: 1 - window_size: -1 - - from: 1 # talker → code2wav: trigger only after receiving full input (-1) - to: 2 - window_size: -1 diff --git a/tests/e2e/offline_inference/stage_configs/rocm/qwen3_omni_ci.yaml b/tests/e2e/offline_inference/stage_configs/rocm/qwen3_omni_ci.yaml deleted file mode 100644 index 477e6e59f29..00000000000 --- a/tests/e2e/offline_inference/stage_configs/rocm/qwen3_omni_ci.yaml +++ /dev/null @@ -1,99 +0,0 @@ -# Stage config for running Qwen3-Omni-MoE with 3-stage architecture -# Stage 0: Thinker (multimodal understanding + text generation) -# Stage 1: Talker (text embeddings → 16-layer RVQ codec codes) -# Stage 2: Code2Wav (8-layer RVQ codes → audio waveform) - -# The following config has been verified on 2x H100-80G GPUs. -stage_args: - - stage_id: 0 - runtime: - devices: "0" - max_batch_size: 1 - engine_args: - model_stage: thinker - model_arch: Qwen3OmniMoeForConditionalGeneration - worker_type: ar - scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler - gpu_memory_utilization: 0.9 - enforce_eager: false - trust_remote_code: true - engine_output_type: latent # Output hidden states for talker - distributed_executor_backend: "mp" - enable_prefix_caching: false - hf_config_name: thinker_config - tensor_parallel_size: 1 - load_format: dummy - final_output: true - final_output_type: text - is_comprehension: true - default_sampling_params: - temperature: 0.4 - top_p: 0.9 - top_k: 1 - max_tokens: 100 - seed: 42 - detokenize: True - repetition_penalty: 1.05 - - - stage_id: 1 - runtime: - devices: "1" - max_batch_size: 1 - engine_args: - model_stage: talker - model_arch: Qwen3OmniMoeForConditionalGeneration - worker_type: ar - scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler - gpu_memory_utilization: 0.6 - enforce_eager: true - trust_remote_code: true - engine_output_type: latent # Output codec codes for code2wav - # tensor_parallel_size: 2 - enable_prefix_caching: false - distributed_executor_backend: "mp" - hf_config_name: talker_config - load_format: dummy - engine_input_source: [0] - custom_process_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.thinker2talker - # final_output: true - # final_output_type: text - default_sampling_params: - temperature: 0.9 - top_k: 50 - max_tokens: 100 - seed: 42 - detokenize: False - repetition_penalty: 1.05 - stop_token_ids: [2150] - - - stage_id: 2 - runtime: - devices: "1" - max_batch_size: 1 - engine_args: - model_stage: code2wav - model_arch: Qwen3OmniMoeForConditionalGeneration - worker_type: generation - scheduler_cls: vllm_omni.core.sched.omni_generation_scheduler.OmniGenerationScheduler - enforce_eager: true - trust_remote_code: true - enable_prefix_caching: false - engine_output_type: audio # Final output: audio waveform - gpu_memory_utilization: 0.1 - distributed_executor_backend: "mp" - max_num_batched_tokens: 1000000 - hf_config_name: thinker_config - load_format: dummy - async_scheduling: false - engine_input_source: [1] - custom_process_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.talker2code2wav - final_output: true - final_output_type: audio - default_sampling_params: - temperature: 0.0 - top_p: 1.0 - top_k: -1 - max_tokens: 200 - seed: 42 - detokenize: True - repetition_penalty: 1.1 diff --git a/tests/e2e/offline_inference/test_qwen2_5_omni.py b/tests/e2e/offline_inference/test_qwen2_5_omni.py index eda2f28b55b..c5bb658427e 100644 --- a/tests/e2e/offline_inference/test_qwen2_5_omni.py +++ b/tests/e2e/offline_inference/test_qwen2_5_omni.py @@ -1,138 +1,88 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project """ -E2E tests for Qwen2.5-Omni model with mixed modality inputs and audio output. +E2E tests for Qwen2.5-Omni model with mixed modality inputs, audio and text output. """ from pathlib import Path import pytest -from vllm.assets.audio import AudioAsset -from vllm.assets.image import ImageAsset -from vllm.assets.video import VideoAsset -from vllm.envs import VLLM_USE_MODELSCOPE -from vllm.multimodal.image import convert_image_mode -from tests.utils import create_new_process_for_each_test, hardware_test +from tests.conftest import ( + generate_synthetic_audio, + generate_synthetic_image, + generate_synthetic_video, +) +from tests.utils import hardware_test from vllm_omni.platforms import current_omni_platform -from .conftest import OmniRunner - -models = ["Qwen/Qwen2.5-Omni-3B"] +models = ["Qwen/Qwen2.5-Omni-7B"] # CI stage config optimized for 24GB GPU (L4/RTX3090) or NPU if current_omni_platform.is_npu(): stage_config = str(Path(__file__).parent / "stage_configs" / "npu" / "qwen2_5_omni_ci.yaml") elif current_omni_platform.is_rocm(): # ROCm stage config optimized for MI325 GPU - stage_config = str(Path(__file__).parent / "stage_configs" / "rocm" / "qwen2_5_omni_ci.yaml") + stage_config = str(Path(__file__).parent.parent / "stage_configs" / "rocm" / "qwen2_5_omni_ci.yaml") else: - stage_config = str(Path(__file__).parent / "stage_configs" / "qwen2_5_omni_ci.yaml") + stage_config = str(Path(__file__).parent.parent / "stage_configs" / "qwen2_5_omni_ci.yaml") # Create parameter combinations for model and stage config test_params = [(model, stage_config) for model in models] +def get_question(prompt_type="mix"): + prompts = { + "mix": "What is recited in the audio? What is in this image? Describe the video briefly.", + "text_only": "What is the capital of China?", + } + return prompts.get(prompt_type, prompts["mix"]) + + @pytest.mark.core_model @pytest.mark.omni @hardware_test(res={"cuda": "L4", "rocm": "MI325"}, num_cards={"cuda": 4, "rocm": 2}) -@create_new_process_for_each_test("spawn") -@pytest.mark.parametrize("test_config", test_params) -def test_mixed_modalities_to_audio(omni_runner: type[OmniRunner], test_config: tuple[str, str]) -> None: - """Test processing audio, image, and video together, generating audio output.""" - model, stage_config_path = test_config - with omni_runner(model, seed=42, stage_configs_path=stage_config_path) as runner: - # Prepare multimodal inputs - question = "What is recited in the audio? What is in this image? Describe the video briefly." - audio = AudioAsset("mary_had_lamb").audio_and_sample_rate - audio = (audio[0][: 16000 * 5], audio[1]) # Trim to first 5 seconds - image = convert_image_mode(ImageAsset("cherry_blossom").pil_image.resize((128, 128)), "RGB") - if not VLLM_USE_MODELSCOPE: - video = VideoAsset(name="baby_reading", num_frames=4).np_ndarrays - else: - # modelscope can't access raushan-testing-hf/videos-test, skip video input temporarily - video = None - - outputs = runner.generate_multimodal( - prompts=question, - audios=audio, - images=image, - videos=video, - ) - - # Find and verify text output (thinker stage) - text_output = None - output_count = 0 - for stage_output in outputs: - if stage_output.final_output_type == "text": - text_output = stage_output - output_count += 1 - break - assert output_count > 0 - - assert text_output is not None - assert len(text_output.request_output) > 0 - text_content = text_output.request_output[0].outputs[0].text - assert text_content is not None - assert len(text_content.strip()) > 0 - - # Find and verify audio output (code2wav stage) - audio_output = None - output_count = 0 - for stage_output in outputs: - if stage_output.final_output_type == "audio": - audio_output = stage_output - output_count += 1 - break - assert output_count > 0 - - assert audio_output is not None - assert len(audio_output.request_output) > 0 - - # Verify audio tensor exists and has content - audio_tensor = audio_output.request_output[0].outputs[0].multimodal_output["audio"] - assert audio_tensor is not None - assert audio_tensor.numel() > 0 +@pytest.mark.parametrize("omni_runner", test_params, indirect=True) +def test_mix_to_audio(omni_runner, omni_runner_handler) -> None: + """ + Test multi-modal input processing and text/audio output generation via OpenAI API. + Deploy Setting: default yaml + Input Modal: text + audio + video + image + Output Modal: audio + Input Setting: stream=False + Datasets: single request + """ + video = generate_synthetic_video(16, 16, 30)["np_array"] + image = generate_synthetic_image(16, 16)["np_array"] + audio = generate_synthetic_audio(1, 1, 16000)["np_array"] + if len(audio.shape) == 2: + audio = audio.squeeze() + + request_config = { + "prompts": get_question(), + "videos": video, + "images": image, + "audios": (audio, 16000), + "modalities": ["audio"], + } + + # Test single completion + omni_runner_handler.send_request(request_config) @pytest.mark.core_model @pytest.mark.omni @hardware_test(res={"cuda": "L4", "rocm": "MI325"}, num_cards={"cuda": 4, "rocm": 2}) -@create_new_process_for_each_test("spawn") -@pytest.mark.parametrize("test_config", test_params) -def test_mixed_modalities_to_text_only(omni_runner: type[OmniRunner], test_config: tuple[str, str]) -> None: - """Test processing audio, image, and video together, generating audio output.""" - model, stage_config_path = test_config - with omni_runner(model, seed=42, stage_configs_path=stage_config_path) as runner: - # Prepare multimodal inputs - question = "What is recited in the audio? What is in this image? Describe the video briefly." - audio = AudioAsset("mary_had_lamb").audio_and_sample_rate - audio = (audio[0][: 16000 * 5], audio[1]) # Trim to first 5 seconds - image = convert_image_mode(ImageAsset("cherry_blossom").pil_image.resize((128, 128)), "RGB") - video = VideoAsset(name="baby_reading", num_frames=4).np_ndarrays - modalities = ["text"] - - outputs = runner.generate_multimodal( - prompts=question, - audios=audio, - images=image, - videos=video, - modalities=modalities, - ) - - # Find and verify text output (thinker stage) - text_output = None - output_count = 0 - for stage_output in outputs: - assert stage_output.final_output_type != "audio" - if stage_output.final_output_type == "text": - text_output = stage_output - output_count += 1 - break - assert output_count > 0 - - assert text_output is not None - assert len(text_output.request_output) > 0 - text_content = text_output.request_output[0].outputs[0].text - assert text_content is not None - assert len(text_content.strip()) > 0 +@pytest.mark.parametrize("omni_runner", test_params, indirect=True) +def test_text_to_text(omni_runner, omni_runner_handler) -> None: + """ + Test text input processing and text output generation via OpenAI API. + Deploy Setting: default yaml + Input Modal: text + Output Modal: text + Input Setting: stream=False + Datasets: single request + """ + + request_config = {"prompts": get_question("text_only"), "modalities": ["text"]} + + # Test single completion + omni_runner_handler.send_request(request_config) diff --git a/tests/e2e/offline_inference/test_qwen3_omni.py b/tests/e2e/offline_inference/test_qwen3_omni.py index ecd09fdd322..9f76f911db0 100644 --- a/tests/e2e/offline_inference/test_qwen3_omni.py +++ b/tests/e2e/offline_inference/test_qwen3_omni.py @@ -1,5 +1,3 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project """ E2E offline tests for Omni model with video input and audio output. """ @@ -12,73 +10,42 @@ from pathlib import Path import pytest -from vllm.assets.video import VideoAsset +from tests.conftest import ( + generate_synthetic_video, +) from tests.utils import hardware_test from vllm_omni.platforms import current_omni_platform -from .conftest import OmniRunner - models = ["Qwen/Qwen3-Omni-30B-A3B-Instruct"] # CI stage config for 2xH100-80G GPUs or AMD GPU MI325 if current_omni_platform.is_rocm(): # ROCm stage config optimized for MI325 GPU - stage_configs = [str(Path(__file__).parent / "stage_configs" / "rocm" / "qwen3_omni_ci.yaml")] + stage_configs = [str(Path(__file__).parent.parent / "stage_configs" / "rocm" / "qwen3_omni_ci.yaml")] else: - stage_configs = [str(Path(__file__).parent / "stage_configs" / "qwen3_omni_ci.yaml")] + stage_configs = [str(Path(__file__).parent.parent / "stage_configs" / "qwen3_omni_ci.yaml")] # Create parameter combinations for model and stage config test_params = [(model, stage_config) for model in models for stage_config in stage_configs] +def get_question(prompt_type="video"): + prompts = { + "video": "Describe the video briefly.", + } + return prompts.get(prompt_type, prompts["video"]) + + @pytest.mark.core_model @pytest.mark.omni @hardware_test(res={"cuda": "H100", "rocm": "MI325"}, num_cards=2) -@pytest.mark.parametrize("test_config", test_params) -def test_video_to_audio(omni_runner: type[OmniRunner], test_config) -> None: +@pytest.mark.parametrize("omni_runner", test_params, indirect=True) +def test_video_to_audio(omni_runner, omni_runner_handler) -> None: """Test processing video, generating audio output.""" - model, stage_config_path = test_config - with omni_runner(model, seed=42, stage_configs_path=stage_config_path, stage_init_timeout=300) as runner: - # Prepare inputs - question = "Describe the video briefly." - video = VideoAsset(name="baby_reading", num_frames=4).np_ndarrays - - outputs = runner.generate_multimodal( - prompts=question, - videos=video, - ) - - # Find and verify text output (thinker stage) - text_output = None - output_count = 0 - for stage_output in outputs: - if stage_output.final_output_type == "text": - text_output = stage_output - output_count += 1 - break - - assert output_count > 0 - assert text_output is not None - assert len(text_output.request_output) > 0 - text_content = text_output.request_output[0].outputs[0].text - assert text_content is not None - assert len(text_content.strip()) > 0 - - # Find and verify audio output (code2wav stage) - audio_output = None - output_count = 0 - for stage_output in outputs: - if stage_output.final_output_type == "audio": - audio_output = stage_output - output_count += 1 - break + video = generate_synthetic_video(224, 224, 300)["np_array"] - assert output_count > 0 - assert audio_output is not None - assert len(audio_output.request_output) > 0 + request_config = {"prompts": get_question(), "videos": video, "modalities": ["audio"]} - # Verify audio tensor exists and has content - audio_tensor = audio_output.request_output[0].outputs[0].multimodal_output["audio"] - assert audio_tensor is not None - assert audio_tensor.numel() > 0 + # Test single completion + omni_runner_handler.send_request(request_config) diff --git a/tests/e2e/online_serving/stage_configs/qwen3_omni_ci.yaml b/tests/e2e/online_serving/stage_configs/qwen3_omni_ci.yaml deleted file mode 100644 index 8f0161edd2d..00000000000 --- a/tests/e2e/online_serving/stage_configs/qwen3_omni_ci.yaml +++ /dev/null @@ -1,103 +0,0 @@ -# Stage config for running Qwen3-Omni-MoE with 3-stage architecture -# Stage 0: Thinker (multimodal understanding + text generation) -# Stage 1: Talker (text embeddings → 16-layer RVQ codec codes) -# Stage 2: Code2Wav (8-layer RVQ codes → audio waveform) - -# The following config has been verified on 2x H100-80G GPUs. -stage_args: - - stage_id: 0 - stage_type: llm # Use llm stage type to launch OmniLLM - runtime: - devices: "0" - max_batch_size: 5 - engine_args: - model_stage: thinker - model_arch: Qwen3OmniMoeForConditionalGeneration - worker_type: ar - scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler - gpu_memory_utilization: 0.9 - enforce_eager: false - trust_remote_code: true - engine_output_type: latent # Output hidden states for talker - distributed_executor_backend: "mp" - enable_prefix_caching: false - max_num_batched_tokens: 32768 - hf_config_name: thinker_config - tensor_parallel_size: 1 - load_format: dummy - final_output: true - final_output_type: text - is_comprehension: true - default_sampling_params: - temperature: 0.4 - top_p: 0.9 - top_k: 1 - max_tokens: 100 - seed: 42 - detokenize: True - repetition_penalty: 1.05 - - - stage_id: 1 - stage_type: llm # Use llm stage type to launch OmniLLM - runtime: - devices: "1" - max_batch_size: 5 - engine_args: - model_stage: talker - model_arch: Qwen3OmniMoeForConditionalGeneration - worker_type: ar - scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler - gpu_memory_utilization: 0.6 - enforce_eager: false - trust_remote_code: true - engine_output_type: latent # Output codec codes for code2wav - # tensor_parallel_size: 2 - enable_prefix_caching: false - distributed_executor_backend: "mp" - hf_config_name: talker_config - load_format: dummy - engine_input_source: [0] - custom_process_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.thinker2talker - # final_output: true - # final_output_type: text - default_sampling_params: - temperature: 0.9 - top_k: 50 - max_tokens: 100 - seed: 42 - detokenize: False - repetition_penalty: 1.05 - stop_token_ids: [2150] - - - stage_id: 2 - stage_type: llm # Use llm stage type to launch OmniLLM - runtime: - devices: "1" - max_batch_size: 1 - engine_args: - model_stage: code2wav - model_arch: Qwen3OmniMoeForConditionalGeneration - worker_type: generation - scheduler_cls: vllm_omni.core.sched.omni_generation_scheduler.OmniGenerationScheduler - enforce_eager: true - trust_remote_code: true - async_scheduling: false - enable_prefix_caching: false - engine_output_type: audio # Final output: audio waveform - gpu_memory_utilization: 0.1 - distributed_executor_backend: "mp" - max_num_batched_tokens: 1000000 - hf_config_name: thinker_config - load_format: dummy - engine_input_source: [1] - custom_process_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.talker2code2wav - final_output: true - final_output_type: audio - default_sampling_params: - temperature: 0.0 - top_p: 1.0 - top_k: -1 - max_tokens: 200 - seed: 42 - detokenize: True - repetition_penalty: 1.1 diff --git a/tests/e2e/online_serving/stage_configs/rocm/qwen3_omni_ci.yaml b/tests/e2e/online_serving/stage_configs/rocm/qwen3_omni_ci.yaml deleted file mode 100644 index 59642a77b6f..00000000000 --- a/tests/e2e/online_serving/stage_configs/rocm/qwen3_omni_ci.yaml +++ /dev/null @@ -1,95 +0,0 @@ -# Stage config for running Qwen3-Omni-MoE with 3-stage architecture -# Stage 0: Thinker (multimodal understanding + text generation) -# Stage 1: Talker (text embeddings → 16-layer RVQ codec codes) -# Stage 2: Code2Wav (8-layer RVQ codes → audio waveform) -# The following config has been verified on 2x H100-80G GPUs. -stage_args: - - stage_id: 0 - runtime: - devices: "0" - max_batch_size: 5 - engine_args: - model_stage: thinker - model_arch: Qwen3OmniMoeForConditionalGeneration - worker_type: ar - scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler - gpu_memory_utilization: 0.9 - enforce_eager: false - trust_remote_code: true - engine_output_type: latent # Output hidden states for talker - distributed_executor_backend: "mp" - enable_prefix_caching: false - hf_config_name: thinker_config - tensor_parallel_size: 1 - final_output: true - final_output_type: text - is_comprehension: true - default_sampling_params: - temperature: 0.4 - top_p: 0.9 - top_k: 1 - max_tokens: 100 - seed: 42 - detokenize: True - repetition_penalty: 1.05 - - - stage_id: 1 - runtime: - devices: "1" - max_batch_size: 5 - engine_args: - model_stage: talker - model_arch: Qwen3OmniMoeForConditionalGeneration - worker_type: ar - scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler - gpu_memory_utilization: 0.6 - enforce_eager: true - trust_remote_code: true - engine_output_type: latent # Output codec codes for code2wav - # tensor_parallel_size: 2 - enable_prefix_caching: false - distributed_executor_backend: "mp" - hf_config_name: talker_config - engine_input_source: [0] - custom_process_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.thinker2talker - # final_output: true - # final_output_type: text - default_sampling_params: - temperature: 0.9 - top_k: 50 - max_tokens: 1000 - seed: 42 - detokenize: False - repetition_penalty: 1.05 - stop_token_ids: [2150] - - - stage_id: 2 - runtime: - devices: "1" - max_batch_size: 1 - engine_args: - model_stage: code2wav - model_arch: Qwen3OmniMoeForConditionalGeneration - worker_type: generation - scheduler_cls: vllm_omni.core.sched.omni_generation_scheduler.OmniGenerationScheduler - enforce_eager: true - trust_remote_code: true - enable_prefix_caching: false - engine_output_type: audio # Final output: audio waveform - gpu_memory_utilization: 0.1 - distributed_executor_backend: "mp" - max_num_batched_tokens: 1000000 - hf_config_name: thinker_config - async_scheduling: false - engine_input_source: [1] - custom_process_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.talker2code2wav - final_output: true - final_output_type: audio - default_sampling_params: - temperature: 0.0 - top_p: 1.0 - top_k: -1 - max_tokens: 2000 - seed: 42 - detokenize: True - repetition_penalty: 1.1 diff --git a/tests/e2e/online_serving/test_qwen3_omni.py b/tests/e2e/online_serving/test_qwen3_omni.py index 7f879939f5c..61927b1a89d 100644 --- a/tests/e2e/online_serving/test_qwen3_omni.py +++ b/tests/e2e/online_serving/test_qwen3_omni.py @@ -1,5 +1,3 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project """ E2E Online tests for Qwen3-Omni model with video input and audio output. """ @@ -9,23 +7,15 @@ os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" os.environ["VLLM_TEST_CLEAN_GPU_MEMORY"] = "0" -import concurrent.futures -import threading -import time from pathlib import Path -import openai import pytest from tests.conftest import ( - OmniServer, - convert_audio_to_text, - cosine_similarity_text, dummy_messages_from_mix_data, generate_synthetic_audio, generate_synthetic_image, generate_synthetic_video, - merge_base64_and_convert_to_text, modify_stage_config, ) from tests.utils import hardware_test @@ -34,13 +24,9 @@ models = ["Qwen/Qwen3-Omni-30B-A3B-Instruct"] -def get_default_config(): - return str(Path(__file__).parent.parent / "stage_configs" / "qwen3_omni_ci.yaml") - - def get_chunk_config(): path = modify_stage_config( - get_default_config(), + str(Path(__file__).parent.parent / "stage_configs" / "qwen3_omni_ci.yaml"), updates={ "async_chunk": True, "stage_args": { @@ -57,49 +43,17 @@ def get_chunk_config(): return path -CHUNK_CONFIG_PATH = get_chunk_config() # CI stage config for 2xH100-80G GPUs or AMD GPU MI325 if current_omni_platform.is_rocm(): # ROCm stage config optimized for MI325 GPU - stage_configs = [str(Path(__file__).parent / "stage_configs" / "rocm" / "qwen3_omni_ci.yaml")] + stage_configs = [str(Path(__file__).parent.parent / "stage_configs" / "rocm" / "qwen3_omni_ci.yaml")] else: - stage_configs = [get_default_config(), CHUNK_CONFIG_PATH] + stage_configs = [get_chunk_config()] # Create parameter combinations for model and stage config test_params = [(model, stage_config) for model in models for stage_config in stage_configs] -_omni_server_lock = threading.Lock() - - -@pytest.fixture(scope="module") -def omni_server(request): - """Start vLLM-Omni server as a subprocess with actual model weights. - Uses session scope so the server starts only once for the entire test session. - Multi-stage initialization can take 10-20+ minutes. - """ - with _omni_server_lock: - model, stage_config_path = request.param - - print(f"Starting OmniServer with model: {model}") - - with OmniServer(model, ["--stage-configs-path", stage_config_path, "--stage-init-timeout", "120"]) as server: - print("OmniServer started successfully") - yield server - print("OmniServer stopping...") - - print("OmniServer stopped") - - -@pytest.fixture -def client(omni_server): - """OpenAI client for the running vLLM-Omni server.""" - return openai.OpenAI( - base_url=f"http://{omni_server.host}:{omni_server.port}/v1", - api_key="EMPTY", - ) - - def get_system_prompt(): return { "role": "system", @@ -116,23 +70,6 @@ def get_system_prompt(): } -def dummy_messages_from_video_data( - video_data_url: str, - content_text: str = "Describe the video briefly.", -): - """Create messages with video data URL for OpenAI API.""" - return [ - get_system_prompt(), - { - "role": "user", - "content": [ - {"type": "video_url", "video_url": {"url": video_data_url}}, - {"type": "text", "text": content_text}, - ], - }, - ] - - def get_prompt(prompt_type="text_only"): prompts = { "text_only": "What is the capital of China? Answer in 20 words.", @@ -146,11 +83,12 @@ def get_max_batch_size(size_type="few"): return batch_sizes.get(size_type, 5) +@pytest.mark.advanced_model @pytest.mark.core_model @pytest.mark.omni @hardware_test(res={"cuda": "H100", "rocm": "MI325"}, num_cards=2) @pytest.mark.parametrize("omni_server", test_params, indirect=True) -def test_mix_to_text_audio_001(client: openai.OpenAI, omni_server, request) -> None: +def test_mix_to_text_audio_001(omni_server, openai_client) -> None: """ Test multi-modal input processing and text/audio output generation via OpenAI API. Deploy Setting: default yaml @@ -160,8 +98,6 @@ def test_mix_to_text_audio_001(client: openai.OpenAI, omni_server, request) -> N Datasets: single request """ - # Test single completion - e2e_list = list() video_data_url = f"data:video/mp4;base64,{generate_synthetic_video(224, 224, 300)['base64']}" image_data_url = f"data:image/jpeg;base64,{generate_synthetic_image(224, 224)['base64']}" audio_data_url = f"data:audio/wav;base64,{generate_synthetic_audio(5, 1)['base64']}" @@ -173,57 +109,25 @@ def test_mix_to_text_audio_001(client: openai.OpenAI, omni_server, request) -> N content_text=get_prompt("mix"), ) - # Test single completion - start_time = time.perf_counter() - chat_completion = client.chat.completions.create(model=omni_server.model, messages=messages, stream=True) - - text_content = "" - audio_data = [] - for chunk in chat_completion: - for choice in chunk.choices: - if hasattr(choice, "delta"): - content = getattr(choice.delta, "content", None) - else: - content = None - - modality = getattr(chunk, "modality", None) - - if modality == "audio" and content: - audio_data.append(content) - elif modality == "text" and content: - # Text chunk - accumulate text content - text_content += content if content else "" - - # Verify E2E - current_e2e = time.perf_counter() - start_time - print(f"the request e2e is: {current_e2e}") - # TODO: Verify the E2E latency after confirmation baseline. - e2e_list.append(current_e2e) - - print(f"the avg e2e is: {sum(e2e_list) / len(e2e_list)}") - # Verify all completions succeeded - assert audio_data is not None, "No audio output is generated" - - # Verify text output success - assert text_content is not None and len(text_content) >= 2, "No text output is generated" - assert any( - keyword in text_content.lower() for keyword in ["square", "quadrate", "sphere", "globe", "circle", "round"] - ), "The output does not contain any of the keywords." + request_config = { + "model": omni_server.model, + "messages": messages, + "stream": True, + "key_words": { + "audio": ["water", "chirping"], + "image": ["square", "quadrate"], + }, + } - # Verify text output same as audio output - audio_content = merge_base64_and_convert_to_text(audio_data) - print(f"text content is: {text_content}") - print(f"audio content is: {audio_content}") - similarity = cosine_similarity_text(audio_content.lower(), text_content.lower()) - print(f"similarity is: {similarity}") - assert similarity > 0.9, "The audio content is not same as the text" + # Test single completion + openai_client.send_request(request_config) +@pytest.mark.advanced_model @pytest.mark.core_model @pytest.mark.omni -@hardware_test(res={"cuda": "H100", "rocm": "MI325"}, num_cards=2) @pytest.mark.parametrize("omni_server", test_params, indirect=True) -def test_text_to_text_audio_001(client: openai.OpenAI, omni_server) -> None: +def test_text_to_text_001(omni_server, openai_client) -> None: """ Test text input processing and text/audio output generation via OpenAI API. Deploy Setting: default yaml @@ -231,50 +135,14 @@ def test_text_to_text_audio_001(client: openai.OpenAI, omni_server) -> None: Output Modal: text + audio Datasets: few requests """ - - num_concurrent_requests = get_max_batch_size() messages = dummy_messages_from_mix_data(system_prompt=get_system_prompt(), content_text=get_prompt()) - e2e_list = list() - with concurrent.futures.ThreadPoolExecutor(max_workers=num_concurrent_requests) as executor: - # Submit multiple completion requests concurrently - futures = [ - executor.submit(client.chat.completions.create, model=omni_server.model, messages=messages) - for _ in range(num_concurrent_requests) - ] - start_time = time.perf_counter() - # Wait for all requests to complete and collect results - chat_completions = list() - for future in concurrent.futures.as_completed(futures): - chat_completions.append(future.result()) - # Verify E2E - current_e2e = time.perf_counter() - start_time - print(f"the request e2e is: {current_e2e}") - # TODO: Verify the E2E latency after confirmation baseline. - e2e_list.append(current_e2e) - - print(f"the avg e2e is: {sum(e2e_list) / len(e2e_list)}") - # Verify all completions succeeded - assert len(chat_completions) == num_concurrent_requests, "Not all requests succeeded." - for chat_completion in chat_completions: - # Verify audio output success - audio_data = None - text_content = None - for choice in chat_completion.choices: - if choice.message.audio is not None: - audio_message = choice.message - audio_data = audio_message.audio.data - assert audio_message.audio.expires_at > time.time(), "The generated audio has expired." - - if choice.message.content is not None: - # Verify text output success - text_content = choice.message.content - assert "beijing" in text_content.lower(), "The output do not contain keywords." + request_config = { + "model": omni_server.model, + "messages": messages, + "stream": False, + "modalities": ["text"], + "key_words": {"text": ["beijing"]}, + } - # Verify text output same as audio output - audio_content = convert_audio_to_text(audio_data) - print(f"text content is: {text_content}") - print(f"audio content is: {audio_content}") - similarity = cosine_similarity_text(audio_content.lower(), text_content.lower()) - print(f"similarity is: {similarity}") - assert similarity > 0.9, "The audio content is not same as the text" + openai_client.send_request(request_config, request_num=get_max_batch_size()) diff --git a/tests/e2e/stage_configs/qwen2_5_omni_ci.yaml b/tests/e2e/stage_configs/qwen2_5_omni_ci.yaml index aaed4de92cc..f8655f77ea5 100644 --- a/tests/e2e/stage_configs/qwen2_5_omni_ci.yaml +++ b/tests/e2e/stage_configs/qwen2_5_omni_ci.yaml @@ -11,10 +11,10 @@ stage_args: engine_args: model_stage: thinker model_arch: Qwen2_5OmniForConditionalGeneration - worker_cls: vllm_omni.worker.gpu_ar_worker.GPUARWorker + worker_type: ar scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler - max_model_len: 32768 - max_num_batched_tokens: 32768 + max_model_len: 2400 + max_num_batched_tokens: 2400 max_num_seqs: 1 gpu_memory_utilization: 0.8 skip_mm_profiling: true @@ -41,10 +41,10 @@ stage_args: engine_args: model_stage: talker model_arch: Qwen2_5OmniForConditionalGeneration - worker_cls: vllm_omni.worker.gpu_ar_worker.GPUARWorker + worker_type: ar scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler - max_model_len: 32768 - max_num_batched_tokens: 32768 + max_model_len: 2400 + max_num_batched_tokens: 2400 max_num_seqs: 1 gpu_memory_utilization: 0.8 skip_mm_profiling: true @@ -71,7 +71,7 @@ stage_args: engine_args: model_stage: code2wav model_arch: Qwen2_5OmniForConditionalGeneration - worker_cls: vllm_omni.worker.gpu_generation_worker.GPUGenerationWorker + worker_type: generation scheduler_cls: vllm_omni.core.sched.omni_generation_scheduler.OmniGenerationScheduler gpu_memory_utilization: 0.15 enforce_eager: true diff --git a/tests/e2e/stage_configs/qwen3_omni_ci.yaml b/tests/e2e/stage_configs/qwen3_omni_ci.yaml index 479e4d6e99d..5a16c340d31 100644 --- a/tests/e2e/stage_configs/qwen3_omni_ci.yaml +++ b/tests/e2e/stage_configs/qwen3_omni_ci.yaml @@ -24,6 +24,7 @@ stage_args: enable_prefix_caching: false hf_config_name: thinker_config tensor_parallel_size: 1 + load_format: dummy final_output: true final_output_type: text is_comprehension: true @@ -55,6 +56,7 @@ stage_args: max_model_len: 32768 distributed_executor_backend: "mp" hf_config_name: talker_config + load_format: dummy engine_input_source: [0] custom_process_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.thinker2talker default_sampling_params: diff --git a/tests/e2e/online_serving/stage_configs/qwen3_omni_thinker_ci.yaml b/tests/e2e/stage_configs/qwen3_omni_thinker_ci.yaml similarity index 100% rename from tests/e2e/online_serving/stage_configs/qwen3_omni_thinker_ci.yaml rename to tests/e2e/stage_configs/qwen3_omni_thinker_ci.yaml diff --git a/tests/e2e/offline_inference/stage_configs/rocm/qwen2_5_omni_ci.yaml b/tests/e2e/stage_configs/rocm/qwen2_5_omni_ci.yaml similarity index 96% rename from tests/e2e/offline_inference/stage_configs/rocm/qwen2_5_omni_ci.yaml rename to tests/e2e/stage_configs/rocm/qwen2_5_omni_ci.yaml index 474df5e7968..ec815ad1f5f 100644 --- a/tests/e2e/offline_inference/stage_configs/rocm/qwen2_5_omni_ci.yaml +++ b/tests/e2e/stage_configs/rocm/qwen2_5_omni_ci.yaml @@ -13,8 +13,8 @@ stage_args: model_arch: Qwen2_5OmniForConditionalGeneration worker_type: ar scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler - max_model_len: 896 - max_num_batched_tokens: 896 + max_model_len: 2400 + max_num_batched_tokens: 2400 max_num_seqs: 1 gpu_memory_utilization: 0.8 skip_mm_profiling: true @@ -43,8 +43,8 @@ stage_args: model_arch: Qwen2_5OmniForConditionalGeneration worker_type: ar scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler - max_model_len: 896 - max_num_batched_tokens: 896 + max_model_len: 2400 + max_num_batched_tokens: 2400 max_num_seqs: 1 gpu_memory_utilization: 0.8 skip_mm_profiling: true diff --git a/tests/e2e/offline_inference/stage_configs/qwen3_omni_ci.yaml b/tests/e2e/stage_configs/rocm/qwen3_omni_ci.yaml similarity index 100% rename from tests/e2e/offline_inference/stage_configs/qwen3_omni_ci.yaml rename to tests/e2e/stage_configs/rocm/qwen3_omni_ci.yaml diff --git a/tests/engine/__init__.py b/tests/engine/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/e2e/online_serving/test_async_omni.py b/tests/engine/test_async_omni_engine_abort.py similarity index 97% rename from tests/e2e/online_serving/test_async_omni.py rename to tests/engine/test_async_omni_engine_abort.py index cab3e6e2286..b5f9bac9914 100644 --- a/tests/e2e/online_serving/test_async_omni.py +++ b/tests/engine/test_async_omni_engine_abort.py @@ -15,7 +15,7 @@ SEED = 42 -stage_config = str(Path(__file__).parent / "stage_configs" / "qwen3_omni_thinker_ci.yaml") +stage_config = str(Path(__file__).parent.parent / "e2e" / "stage_configs" / "qwen3_omni_thinker_ci.yaml") model = "Qwen/Qwen3-Omni-30B-A3B-Instruct"