diff --git a/docker/Dockerfile.ci b/docker/Dockerfile.ci
index 5e1d00a5f88..aecc429454c 100644
--- a/docker/Dockerfile.ci
+++ b/docker/Dockerfile.ci
@@ -6,6 +6,12 @@ WORKDIR ${APP_DIR}
 
 COPY . .
 
+# Install system dependencies (ffmpeg is required by openai-whisper to decode audio)
+RUN apt-get update && \
+    apt-get install -y --no-install-recommends ffmpeg && \
+    apt-get clean && \
+    rm -rf /var/lib/apt/lists/*
+
 # Install vllm-omni into the same uv-managed Python environment used by the base image.
 RUN uv pip install --python "$(python3 -c 'import sys; print(sys.executable)')" --no-cache-dir ".[dev]"
 
diff --git a/pyproject.toml b/pyproject.toml
index 4833b117487..2e2cddc5b7e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -50,6 +50,8 @@ dev = [
     "pytest-cov>=4.0.0",
     "mypy==1.11.1",
     "pre-commit==4.0.1",
+    "openai-whisper>=20250625",
+    "psutil>=7.2.0",
 ]
 
 docs = [
diff --git a/tests/conftest.py b/tests/conftest.py
index 82c959f07ca..5b21f671bdb 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,8 +1,21 @@
+import base64
+import functools
 import os
+import socket
+import subprocess
+import sys
+import tempfile
+import time
+from pathlib import Path
+from typing import Any
 
+import psutil
 import pytest
 import torch
+import whisper
+import yaml
 from vllm.logger import init_logger
+from vllm.utils import get_open_port
 
 logger = init_logger(__name__)
 
@@ -34,3 +47,268 @@ def clean_gpu_memory_between_tests():
     if torch.cuda.is_available():
         torch.cuda.empty_cache()
         gc.collect()
+
+
+def dummy_messages_from_mix_data(
+    system_prompt: dict[str, Any] | None = None,
+    video_data_url: Any = None,
+    audio_data_url: Any = None,
+    image_data_url: Any = None,
+    content_text: str | None = None,
+):
+    """Create messages with video, image, audio data URL for OpenAI API.
+
+    Each ``*_data_url`` argument may be a single URL, a list of URLs, or None.
+    Media parts are emitted in video, image, audio order after the optional text part.
+    """
+    content = [] if content_text is None else [{"type": "text", "text": content_text}]
+
+    media_sources = (("video", video_data_url), ("image", image_data_url), ("audio", audio_data_url))
+    for media_type, data_url in media_sources:
+        urls = data_url if isinstance(data_url, list) else [data_url]
+        content.extend(
+            {"type": f"{media_type}_url", f"{media_type}_url": {"url": url}} for url in urls if url is not None
+        )
+
+    messages = [{"role": "user", "content": content}]
+    if system_prompt is not None:
+        messages = [system_prompt] + messages
+    return messages
+
+
+def cosine_similarity_text(s1, s2):
+    """
+    Calculate cosine similarity between two text strings.
+    Notes:
+    ------
+    - Higher score means more similar texts
+    - Score of 1.0 means identical word composition (bag-of-words)
+    - Score of 0.0 means completely different vocabulary
+    """
+    from sklearn.feature_extraction.text import CountVectorizer
+    from sklearn.metrics.pairwise import cosine_similarity
+
+    vectorizer = CountVectorizer().fit_transform([s1, s2])
+    vectors = vectorizer.toarray()
+    return cosine_similarity([vectors[0]], [vectors[1]])[0][0]
+
+
+@functools.lru_cache(maxsize=1)
+def _load_whisper_model(name: str = "base"):
+    """Load the Whisper model once and reuse it for every transcription."""
+    return whisper.load_model(name)
+
+
+def convert_audio_to_text(audio_data):
+    """
+    Convert base64 encoded audio data to text using speech recognition.
+
+    The decoded audio is written to a uniquely named temporary file, so calls made
+    within the same second cannot overwrite each other, and the file is removed
+    once transcription finishes.
+    """
+    audio_bytes = base64.b64decode(audio_data)
+    with tempfile.NamedTemporaryFile(prefix="test_audio_", delete=False) as audio_file:
+        audio_file.write(audio_bytes)
+        output_path = audio_file.name
+
+    try:
+        print(f"audio data is saved: {output_path}")
+        text = _load_whisper_model().transcribe(output_path)["text"]
+    finally:
+        os.remove(output_path)
+    return text or ""
+
+
+def modify_stage_config(
+    yaml_path: str,
+    stage_updates: dict[int, dict[str, Any]],
+) -> str:
+    """
+    Batch modify configurations for multiple stages in a YAML file.
+
+    Args:
+        yaml_path: Path to the YAML configuration file.
+        stage_updates: Dictionary where keys are stage IDs and values are dictionaries of
+                      modifications for that stage. Each modification dictionary uses
+                      dot-separated paths as keys and new configuration values as values.
+                      Example: {
+                          0: {'engine_args.max_model_len': 5800},
+                          1: {'runtime.max_batch_size': 2}
+                      }
+
+    Returns:
+        str: Path to the newly created modified YAML file with timestamp suffix.
+
+    Raises:
+        FileNotFoundError: If ``yaml_path`` does not exist.
+        KeyError: If a stage ID or an intermediate key of a dotted path is missing.
+        ValueError: If the YAML cannot be parsed, has no ``stage_args``, or a dotted
+            path descends into a value that is not a mapping.
+
+    Example:
+        >>> output_file = modify_stage_config(
+        ...     'config.yaml',
+        ...     {
+        ...         0: {'engine_args.max_model_len': 5800},
+        ...         1: {'runtime.max_batch_size': 2}
+        ...     }
+        ... )
+        >>> print(f"Modified configuration saved to: {output_file}")
+        Modified configuration saved to: config_1698765432.yaml
+    """
+    path = Path(yaml_path)
+    if not path.exists():
+        raise FileNotFoundError(f"yaml does not exist: {path}")
+    try:
+        with open(path, encoding="utf-8") as f:
+            config = yaml.safe_load(f) or {}
+    except Exception as e:
+        raise ValueError(f"Cannot parse YAML file: {e}") from e
+
+    stage_args = config.get("stage_args", [])
+    if not stage_args:
+        raise ValueError("the stage_args does not exist")
+
+    for stage_id, config_dict in stage_updates.items():
+        target_stage = next((stage for stage in stage_args if stage.get("stage_id") == stage_id), None)
+        if target_stage is None:
+            available_ids = [s.get("stage_id") for s in stage_args if "stage_id" in s]
+            raise KeyError(f"Stage ID {stage_id} is not exist, available IDs: {available_ids}")
+
+        for key_path, value in config_dict.items():
+            *parent_keys, leaf_key = key_path.split(".")
+            current = target_stage
+            for depth, key in enumerate(parent_keys, start=1):
+                visited = ".".join(parent_keys[:depth])
+                if key not in current:
+                    raise KeyError(f"the {visited} does not exist")
+                # Every intermediate node, including the last one, must be a mapping;
+                # otherwise the leaf assignment below would fail with a TypeError.
+                if not isinstance(current[key], dict):
+                    raise ValueError(f"'{visited}' cannot continue deeper because it's not a dict")
+                current = current[key]
+            current[leaf_key] = value
+
+    # Derive the output name from the path components so that dots in directory
+    # names (e.g. "./configs/a.yaml") cannot truncate or corrupt the path.
+    output_path = str(path.with_name(f"{path.stem}_{int(time.time())}.yaml"))
+    with open(output_path, "w", encoding="utf-8") as f:
+        yaml.dump(config, f, default_flow_style=False, sort_keys=False, allow_unicode=True, indent=2)
+
+    return output_path
+
+
+class OmniServer:
+    """Omniserver for vLLM-Omni tests."""
+
+    def __init__(
+        self,
+        model: str,
+        serve_args: list[str],
+        *,
+        env_dict: dict[str, str] | None = None,
+    ) -> None:
+        self.model = model
+        self.serve_args = serve_args
+        self.env_dict = env_dict
+        self.proc: subprocess.Popen | None = None
+        self.host = "127.0.0.1"
+        self.port = get_open_port()
+
+    def _start_server(self) -> None:
+        """Start the vLLM-Omni server subprocess and wait until its port accepts connections."""
+        env = os.environ.copy()
+        env["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
+        if self.env_dict is not None:
+            env.update(self.env_dict)
+
+        cmd = [
+            sys.executable,
+            "-m",
+            "vllm_omni.entrypoints.cli.main",
+            "serve",
+            self.model,
+            "--omni",
+            "--host",
+            self.host,
+            "--port",
+            str(self.port),
+        ] + self.serve_args
+
+        print(f"Launching OmniServer with: {' '.join(cmd)}")
+        self.proc = subprocess.Popen(
+            cmd,
+            env=env,
+            cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__))),  # Set working directory to vllm-omni root
+        )
+
+        # Wait for server to be ready
+        max_wait = 600  # 10 minutes
+        start_time = time.time()
+        while time.time() - start_time < max_wait:
+            # Fail fast when the server process has already died instead of polling until the timeout.
+            exit_code = self.proc.poll()
+            if exit_code is not None:
+                raise RuntimeError(f"Server process exited during startup with code {exit_code}")
+            try:
+                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+                    sock.settimeout(1)
+                    if sock.connect_ex((self.host, self.port)) == 0:
+                        print(f"Server ready on {self.host}:{self.port}")
+                        return
+            except OSError:
+                pass
+            time.sleep(2)
+
+        raise RuntimeError(f"Server failed to start within {max_wait} seconds")
+
+    def _kill_process_tree(self, pid):
+        """Terminate the process ``pid`` and all its descendants, escalating to kill() after 10s."""
+        try:
+            parent = psutil.Process(pid)
+            children = parent.children(recursive=True)
+        except psutil.NoSuchProcess:
+            return
+
+        for child in children:
+            try:
+                child.terminate()
+            except psutil.NoSuchProcess:
+                pass
+
+        _, still_alive = psutil.wait_procs(children, timeout=10)
+        for child in still_alive:
+            try:
+                child.kill()
+            except psutil.NoSuchProcess:
+                pass
+
+        try:
+            parent.terminate()
+            parent.wait(timeout=10)
+        except (psutil.NoSuchProcess, psutil.TimeoutExpired):
+            try:
+                parent.kill()
+            except psutil.NoSuchProcess:
+                pass
+
+    def __enter__(self):
+        try:
+            self._start_server()
+        except BaseException:
+            # __exit__ is not called when __enter__ raises, so stop the subprocess here
+            # to avoid leaking a server that still holds GPU memory.
+            self.__exit__(None, None, None)
+            raise
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if self.proc is None:
+            return
+        self._kill_process_tree(self.proc.pid)
+        try:
+            # Reap the child so it does not linger as a zombie for the rest of the session.
+            self.proc.wait(timeout=10)
+        except subprocess.TimeoutExpired:
+            pass
diff --git a/tests/e2e/online_serving/test_qwen3_omni_expansion.py b/tests/e2e/online_serving/test_qwen3_omni_expansion.py
new file mode 100644
index 00000000000..6a47e96f866
--- /dev/null
+++ b/tests/e2e/online_serving/test_qwen3_omni_expansion.py
@@ -0,0 +1,163 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+"""
+E2E Online tests for Qwen3-Omni model.
+"""
+
+import concurrent.futures
+import os
+import time
+from pathlib import Path
+
+import openai
+import pytest
+
+from tests.conftest import (
+    OmniServer,
+    convert_audio_to_text,
+    cosine_similarity_text,
+    dummy_messages_from_mix_data,
+    modify_stage_config,
+)
+
+os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
+
+models = ["Qwen/Qwen3-Omni-30B-A3B-Instruct"]
+
+# CI stage config for 2*H100-80G GPUs
+stage_configs = [str(Path(__file__).parent.parent / "stage_configs" / "qwen3_omni_ci.yaml")]
+
+# Create parameter combinations for model and stage config
+test_params = [(model, stage_config) for model in models for stage_config in stage_configs]
+
+
+def client(omni_server):
+    """OpenAI client for the running vLLM-Omni server."""
+    return openai.OpenAI(
+        base_url=f"http://{omni_server.host}:{omni_server.port}/v1",
+        api_key="EMPTY",
+    )
+
+
+def get_system_prompt():
+    return {
+        "role": "system",
+        "content": [
+            {
+                "type": "text",
+                "text": (
+                    "You are Qwen, a virtual human developed by the Qwen Team, "
+                    "Alibaba Group, capable of perceiving auditory and visual inputs, "
+                    "as well as generating text and speech."
+                ),
+            }
+        ],
+    }
+
+
+def get_prompt(prompt_type="text_only"):
+    prompts = {
+        "text_only": "What is the capital of China?",
+        "mix": "What is recited in the audio? What is in this image? Describe the video briefly.",
+    }
+    return prompts.get(prompt_type, prompts["text_only"])
+
+
+def get_max_batch_size(size_type="few"):
+    batch_sizes = {"few": 5, "medium": 100, "large": 256}
+    return batch_sizes.get(size_type, 5)
+
+
+@pytest.mark.parametrize("test_config", test_params)
+def test_text_to_text_001(test_config: tuple[str, str]) -> None:
+    """Test processing text, generating text output via OpenAI API."""
+    model, stage_config_path = test_config
+    with OmniServer(model, ["--stage-configs-path", stage_config_path, "--stage-init-timeout", "90"]) as server:
+        messages = dummy_messages_from_mix_data(system_prompt=get_system_prompt(), content_text=get_prompt())
+
+        # Test single completion
+        api_client = client(server)
+        start_time = time.perf_counter()
+        chat_completion = api_client.chat.completions.create(
+            model=server.model, messages=messages, max_tokens=20, modalities=["text"]
+        )
+        # Verify E2E
+        print(f"the request e2e is: {time.perf_counter() - start_time}")
+        # TODO: Verify the E2E latency after confirmation baseline.
+
+        # Verify only output text
+        assert len(chat_completion.choices) == 1, "The generated content includes more than just text."
+
+        # Verify text output success
+        text_choice = chat_completion.choices[0]
+        assert text_choice.message.content is not None, "No text output is generated"
+        assert chat_completion.usage.completion_tokens <= 20, "The output length more than the requested max_tokens."
+        assert "beijing" in text_choice.message.content.lower(), "The output do not contain keywords."
+
+
+@pytest.mark.parametrize("test_config", test_params)
+def test_text_to_text_audio_001(test_config: tuple[str, str]) -> None:
+    """Test processing text, generating text and audio output via OpenAI API."""
+
+    model, stage_config_path = test_config
+    num_concurrent_requests = get_max_batch_size()
+    stage_config_path = modify_stage_config(
+        stage_config_path,
+        {
+            0: {"runtime.max_batch_size": num_concurrent_requests},
+            1: {"runtime.max_batch_size": num_concurrent_requests},
+        },
+    )
+    try:
+        with OmniServer(model, ["--stage-configs-path", stage_config_path, "--stage-init-timeout", "90"]) as server:
+            messages = dummy_messages_from_mix_data(
+                system_prompt=get_system_prompt(), content_text="What is the capital of China?"
+            )
+
+            # Test concurrent completions
+            api_client = client(server)
+            e2e_list = []
+            chat_completions = []
+            with concurrent.futures.ThreadPoolExecutor(max_workers=num_concurrent_requests) as executor:
+                # Start the clock before submitting: requests begin executing as soon as they are submitted.
+                start_time = time.perf_counter()
+                # Submit multiple completion requests concurrently
+                futures = [
+                    executor.submit(api_client.chat.completions.create, model=server.model, messages=messages)
+                    for _ in range(num_concurrent_requests)
+                ]
+                # Wait for all requests to complete and collect results
+                for future in concurrent.futures.as_completed(futures):
+                    chat_completions.append(future.result())
+                    # Verify E2E
+                    current_e2e = time.perf_counter() - start_time
+                    print(f"the request e2e is: {current_e2e}")
+                    # TODO: Verify the E2E latency after confirmation baseline.
+                    e2e_list.append(current_e2e)
+
+            print(f"the avg e2e is: {sum(e2e_list) / len(e2e_list)}")
+            # Verify all completions succeeded
+            assert len(chat_completions) == num_concurrent_requests, "Not all requests succeeded."
+            for chat_completion in chat_completions:
+                # Verify audio output success
+                audio_message = chat_completion.choices[1].message
+                audio_data = audio_message.audio.data
+                assert audio_data is not None, "No audio output is generated"
+                assert audio_message.audio.expires_at > time.time(), "The generated audio has expired."
+
+                # Verify text output success
+                text_choice = chat_completion.choices[0]
+                text_content = text_choice.message.content
+                assert text_choice.message.content is not None, "No text output is generated"
+                assert "beijing" in text_choice.message.content.lower(), "The output do not contain keywords."
+
+                # Verify text output same as audio output
+                audio_content = convert_audio_to_text(audio_data)
+                print(f"text content is: {text_content}")
+                print(f"audio content is: {audio_content}")
+                assert cosine_similarity_text(audio_content.lower(), text_content.lower()) > 0.9, (
+                    "The audio content is not same as the text"
+                )
+    finally:
+        # modify_stage_config writes a timestamped copy next to the original config; do not leave it behind.
+        Path(stage_config_path).unlink(missing_ok=True)
diff --git a/tests/e2e/stage_configs/qwen3_omni_ci.yaml b/tests/e2e/stage_configs/qwen3_omni_ci.yaml
new file mode 100644
index 00000000000..5106b185419
--- /dev/null
+++ b/tests/e2e/stage_configs/qwen3_omni_ci.yaml
@@ -0,0 +1,95 @@
+# Stage config for running Qwen3-Omni-MoE with 3-stage architecture
+# Stage 0: Thinker (multimodal understanding + text generation)
+# Stage 1: Talker (text embeddings → 16-layer RVQ codec codes)
+# Stage 2: Code2Wav (8-layer RVQ codes → audio waveform)
+
+# The following config has been verified on 2x H100-80G GPUs.
+stage_args:
+- stage_id: 0
+  runtime:
+    devices: "0,1"
+    max_batch_size: 1
+  engine_args:
+    model_stage: thinker
+    model_arch: Qwen3OmniMoeForConditionalGeneration
+    worker_cls: vllm_omni.worker.gpu_ar_worker.GPUARWorker
+    scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
+    gpu_memory_utilization: 0.6
+    enforce_eager: true
+    trust_remote_code: true
+    engine_output_type: latent  # Output hidden states for talker
+    distributed_executor_backend: "mp"
+    max_num_batched_tokens: 32768
+    enable_prefix_caching: false
+    hf_config_name: thinker_config
+    tensor_parallel_size: 2
+  final_output: true
+  final_output_type: text
+  is_comprehension: true
+  default_sampling_params:
+    temperature: 0.4
+    top_p: 0.9
+    top_k: 1
+    max_tokens: 100
+    seed: 42
+    ignore_eos: False
+    detokenize: True
+    repetition_penalty: 1.05
+
+- stage_id: 1
+  runtime:
+    devices: "1"
+    max_batch_size: 1
+  engine_args:
+    model_stage: talker
+    model_arch: Qwen3OmniMoeForConditionalGeneration
+    worker_cls: vllm_omni.worker.gpu_ar_worker.GPUARWorker
+    scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
+    gpu_memory_utilization: 0.3
+    enforce_eager: true
+    trust_remote_code: true
+    engine_output_type: latent  # Output codec codes for code2wav
+    enable_prefix_caching: false
+    max_num_batched_tokens: 32768
+    distributed_executor_backend: "mp"
+    hf_config_name: talker_config
+  engine_input_source: [0]
+  custom_process_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.thinker2talker
+  default_sampling_params:
+    temperature: 0.9
+    top_k: 50
+    max_tokens: 100
+    seed: 42
+    detokenize: False
+    repetition_penalty: 1.05
+    stop_token_ids: [2150]
+
+- stage_id: 2
+  runtime:
+    devices: "0"
+    max_batch_size: 1
+  engine_args:
+    model_stage: code2wav
+    model_arch: Qwen3OmniMoeForConditionalGeneration
+    worker_cls: vllm_omni.worker.gpu_generation_worker.GPUGenerationWorker
+    scheduler_cls: vllm_omni.core.sched.omni_generation_scheduler.OmniGenerationScheduler
+    enforce_eager: true
+    trust_remote_code: true
+    enable_prefix_caching: false
+    engine_output_type: audio  # Final output: audio waveform
+    gpu_memory_utilization: 0.1
+    distributed_executor_backend: "mp"
+    max_num_batched_tokens: 1000000
+    hf_config_name: thinker_config
+  engine_input_source: [1]
+  custom_process_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.talker2code2wav
+  final_output: true
+  final_output_type: audio
+  default_sampling_params:
+    temperature: 0.0
+    top_p: 1.0
+    top_k: -1
+    max_tokens: 200
+    seed: 42
+    detokenize: True
+    repetition_penalty: 1.1