diff --git a/.buildkite/test-nightly.yaml b/.buildkite/test-nightly.yaml index 92534e15cc..f665de2706 100644 --- a/.buildkite/test-nightly.yaml +++ b/.buildkite/test-nightly.yaml @@ -55,3 +55,40 @@ steps: - "HF_HOME=/fsx/hf_cache" volumes: - "/fsx/hf_cache:/fsx/hf_cache" + + + - label: "Omni Model Perf Test" + timeout_in_minutes: 120 + depends_on: image-build + if: build.env("NIGHTLY") == "1" + commands: + - export VLLM_WORKER_MULTIPROC_METHOD=spawn + - pytest -s -v tests/perf/scripts/run_benchmark.py + agents: + queue: "mithril-h100-pool" + plugins: + - kubernetes: + podSpec: + containers: + - image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + resources: + limits: + nvidia.com/gpu: 2 + volumeMounts: + - name: devshm + mountPath: /dev/shm + - name: hf-cache + mountPath: /root/.cache/huggingface + env: + - name: HF_HOME + value: /root/.cache/huggingface + nodeSelector: + node.kubernetes.io/instance-type: gpu-h100-sxm + volumes: + - name: devshm + emptyDir: + medium: Memory + - name: hf-cache + hostPath: + path: /mnt/hf-cache + type: DirectoryOrCreate diff --git a/tests/conftest.py b/tests/conftest.py index 840494d089..267386aa38 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -828,7 +828,7 @@ def delete_by_path(config_dict: dict, path: str) -> None: # Find stage by ID target_stage = None for stage in stage_args: - if stage.get("stage_id") == stage_id: + if stage.get("stage_id") == int(stage_id): target_stage = stage break @@ -847,43 +847,42 @@ def delete_by_path(config_dict: dict, path: str) -> None: # Delete entire key del config[key] - if updates: - # Apply updates - for key, value in updates.items(): - if key == "stage_args": - if value and isinstance(value, dict): - stage_args = config.get("stage_args", []) - if not stage_args: - raise ValueError("stage_args does not exist in config") - - for stage_id, stage_updates in value.items(): - # Find stage by ID - target_stage = None - for stage in stage_args: - if stage.get("stage_id") == 
stage_id: - target_stage = stage - break - - if target_stage is None: - available_ids = [s.get("stage_id") for s in stage_args if "stage_id" in s] - raise KeyError(f"Stage ID {stage_id} not found, available: {available_ids}") - - # Apply updates to this stage - for path, val in stage_updates.items(): - # Check if this is a simple key (not dot-separated) - # Example: 'engine_input_source' vs 'engine_args.max_model_len' - if "." not in path: - # Direct key assignment (e.g., updating a list value) - target_stage[path] = val - else: - # Dot-separated path (e.g., nested dict access) - apply_update(target_stage, path, val) - elif "." in key: - # Apply using dot-separated path - apply_update(config, key, value) - else: - # Direct top-level key - config[key] = value + # Apply updates + for key, value in updates.items(): + if key == "stage_args": + if value and isinstance(value, dict): + stage_args = config.get("stage_args", []) + if not stage_args: + raise ValueError("stage_args does not exist in config") + + for stage_id, stage_updates in value.items(): + # Find stage by ID + target_stage = None + for stage in stage_args: + if stage.get("stage_id") == int(stage_id): + target_stage = stage + break + + if target_stage is None: + available_ids = [s.get("stage_id") for s in stage_args if "stage_id" in s] + raise KeyError(f"Stage ID {stage_id} not found, available: {available_ids}") + + # Apply updates to this stage + for path, val in stage_updates.items(): + # Check if this is a simple key (not dot-separated) + # Example: 'engine_input_source' vs 'engine_args.max_model_len' + if "." not in path: + # Direct key assignment (e.g., updating a list value) + target_stage[path] = val + else: + # Dot-separated path (e.g., nested dict access) + apply_update(target_stage, path, val) + elif "." 
in key: + # Apply using dot-separated path + apply_update(config, key, value) + else: + # Direct top-level key + config[key] = value # Save to new file with timestamp timestamp = int(time.time())
diff --git a/tests/perf/scripts/run_benchmark.py b/tests/perf/scripts/run_benchmark.py
new file mode 100644
index 0000000000..5895fe2a15
--- /dev/null
+++ b/tests/perf/scripts/run_benchmark.py
@@ -0,0 +1,302 @@
+import os
+
+os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
+os.environ["VLLM_TEST_CLEAN_GPU_MEMORY"] = "0"
+
+import json
+import subprocess
+import threading
+from datetime import datetime
+from pathlib import Path
+from typing import Any
+
+import pytest
+
+from tests.conftest import OmniServer, modify_stage_config
+
+
+def load_configs(config_path: str) -> list[dict[str, Any]]:
+    """Load the list of benchmark configurations from a JSON file.
+
+    Raises:
+        ValueError: If the file does not exist or is not valid JSON.
+        RuntimeError: If loading fails for any other reason.
+    """
+    try:
+        abs_path = Path(config_path).resolve()
+        with open(abs_path, encoding="utf-8") as f:
+            return json.load(f)
+    except json.JSONDecodeError as e:
+        raise ValueError(f"JSON parsing error: {e}") from e
+    except FileNotFoundError as e:
+        raise ValueError(f"Configuration file not found: {config_path}") from e
+    except Exception as e:
+        raise RuntimeError(f"Failed to load configuration file: {e}") from e
+
+
+def modify_stage(default_path, updates, deletes):
+    """Return the stage-config path to use for one server configuration.
+
+    ``modify_stage_config`` is called only when ``updates`` and/or ``deletes``
+    is given; otherwise ``default_path`` is returned unchanged.
+    """
+    kwargs = {}
+    if updates is not None:
+        kwargs["updates"] = updates
+    if deletes is not None:
+        kwargs["deletes"] = deletes
+    if not kwargs:
+        return default_path
+    return modify_stage_config(default_path, **kwargs)
+
+
+def create_unique_server_params(configs: list[dict[str, Any]]) -> list[tuple[str, str, str]]:
+    """Build the de-duplicated ``(test_name, model, stage_config_path)`` tuples.
+
+    The result keeps the order of ``configs`` so that the pytest
+    parametrization built from it is the same on every run.
+    """
+    stage_config_dir = Path(__file__).parent.parent / "stage_configs"
+    # A dict is used as an insertion-ordered set; iterating a plain set can
+    # give a different order from one interpreter run to the next.
+    unique_params: dict[tuple[str, str, str], None] = {}
+    for config in configs:
+        test_name = config["test_name"]
+        server_params = config["server_params"]
+        model = server_params["model"]
+        stage_config_path = modify_stage(
+            str(stage_config_dir / server_params["stage_config_name"]),
+            server_params.get("update"),
+            server_params.get("delete"),
+        )
+        unique_params[(test_name, model, stage_config_path)] = None
+
+    return list(unique_params)
+
+
+def create_test_parameter_mapping(configs: list[dict[str, Any]]) -> dict[str, dict]:
+    """Group the ``benchmark_params`` entries of ``configs`` by ``test_name``."""
+    mapping: dict[str, dict] = {}
+    for config in configs:
+        test_name = config["test_name"]
+        entry = mapping.setdefault(test_name, {"test_name": test_name, "benchmark_params": []})
+        entry["benchmark_params"].extend(config["benchmark_params"])
+    return mapping
+
+
+CONFIG_FILE_PATH = str(Path(__file__).parent.parent / "tests" / "test.json")
+BENCHMARK_CONFIGS = load_configs(CONFIG_FILE_PATH)
+
+
+test_params = create_unique_server_params(BENCHMARK_CONFIGS)
+server_to_benchmark_mapping = create_test_parameter_mapping(BENCHMARK_CONFIGS)
+
+_omni_server_lock = threading.Lock()
+
+
+@pytest.fixture(scope="module")
+def omni_server(request):
+    """Start a vLLM-Omni server as a subprocess with actual model weights.
+
+    The fixture is module-scoped and parametrized, so one server is started
+    per ``(test_name, model, stage_config_path)`` parameter and shared by the
+    tests that use it. Multi-stage initialization can take 10-20+ minutes.
+    """
+    with _omni_server_lock:
+        test_name, model, stage_config_path = request.param
+
+        print(f"Starting OmniServer with test: {test_name}, model: {model}")
+
+        with OmniServer(model, ["--stage-configs-path", stage_config_path, "--stage-init-timeout", "120"]) as server:
+            print("OmniServer started successfully")
+            yield server
+            print("OmniServer stopping...")
+
+        print("OmniServer stopped")
+
+
+def run_benchmark(args: list, test_name: str, flow, dataset_name: str, num_prompt) -> Any:
+    """Run ``vllm bench serve --omni`` and return its parsed JSON result.
+
+    Args:
+        args: Extra command-line arguments for the benchmark command.
+        test_name: Test name; part of the result file name.
+        flow: Request rate or max concurrency of this run; part of the result file name.
+        dataset_name: Dataset name; part of the result file name.
+        num_prompt: Number of prompts of this run; part of the result file name.
+
+    Raises:
+        subprocess.CalledProcessError: If the command exits with a non-zero status.
+    """
+    current_dt = datetime.now().strftime("%Y%m%d-%H%M%S")
+    result_filename = f"result_{test_name}_{dataset_name}_{flow}_{num_prompt}_{current_dt}.json"
+    if "--result-filename" in args:
+        print(f"The result file will be overwritten by {result_filename}")
+    command = [
+        "vllm",
+        "bench",
+        "serve",
+        "--omni",
+        *args,
+        "--backend",
+        "openai-chat-omni",
+        "--endpoint",
+        "/v1/chat/completions",
+        "--save-result",
+        "--result-filename",
+        result_filename,
+    ]
+    # stderr is merged into stdout: reading one pipe to the end before the
+    # other can deadlock once the pipe that is not being read fills up.
+    with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1) as process:
+        for line in process.stdout:
+            print(line, end="")
+    if process.returncode != 0:
+        raise subprocess.CalledProcessError(process.returncode, command)
+
+    result_dir = args[args.index("--result-dir") + 1] if "--result-dir" in args else "./"
+    with open(os.path.join(result_dir, result_filename), encoding="utf-8") as f:
+        return json.load(f)
+
+
+def get_benchmark_params_for_server(test_name: str) -> list:
+    """Return the benchmark parameters of ``test_name``, or ``[]`` if it is unknown."""
+    if test_name not in server_to_benchmark_mapping:
+        return []
+    return server_to_benchmark_mapping[test_name]["benchmark_params"]
+
+
+def create_benchmark_indices():
+    """Return a ``(test_name, index)`` pair for every configured benchmark entry."""
+    return [
+        (test_name, idx)
+        for test_name, config_data in server_to_benchmark_mapping.items()
+        for idx in range(len(config_data["benchmark_params"]))
+    ]
+
+
+benchmark_indices = create_benchmark_indices()
+
+
+@pytest.fixture(params=benchmark_indices)
+def benchmark_params(request, omni_server):
+    """Resolve a ``(test_name, index)`` parameter to its benchmark parameters.
+
+    The test is skipped when the benchmark entry belongs to a different
+    ``test_name`` than the ``omni_server`` parameter of the same test item.
+    """
+    test_name, param_index = request.param
+
+    # Server and benchmark parameters are combined as a cross product, so
+    # entries configured for another server also end up here.
+    callspec = getattr(request.node, "callspec", None)
+    server_param = callspec.params.get("omni_server") if callspec is not None else None
+    if server_param is not None and server_param[0] != test_name:
+        pytest.skip(f"Benchmark entry of {test_name} is not configured for server {server_param[0]}")
+
+    all_params = get_benchmark_params_for_server(test_name)
+
+    if not all_params:
+        raise ValueError(f"No benchmark parameters found for test: {test_name}")
+
+    if param_index >= len(all_params):
+        raise ValueError(f"No benchmark parameters found for index {param_index} in test: {test_name}")
+
+    return {"test_name": test_name, "params": all_params[param_index]}
+
+
+def assert_result(result, params, num_prompt):
+    """Check that all requests completed and every baseline metric is met.
+
+    Metrics whose name contains "throughput" must be at least the baseline;
+    every other metric must be at most the baseline.
+    """
+    assert result["completed"] == num_prompt, "Request failures exist"
+    baseline_data = params.get("baseline", {})
+    for metric_name, baseline_value in baseline_data.items():
+        assert metric_name in result, f"{metric_name}: missing from the benchmark result"
+        current_value = result[metric_name]
+        if "throughput" in metric_name:
+            assert current_value >= baseline_value, f"{metric_name}: {current_value} < {baseline_value}"
+        else:
+            assert current_value <= baseline_value, f"{metric_name}: {current_value} > {baseline_value}"
+
+
+@pytest.mark.parametrize("omni_server", test_params, indirect=True)
+@pytest.mark.parametrize("benchmark_params", benchmark_indices, indirect=True)
+def test_performance_benchmark(omni_server, benchmark_params):
+    """Run the configured request-rate and concurrency sweeps and check the baselines."""
+    test_name = benchmark_params["test_name"]
+    params = benchmark_params["params"]
+    dataset_name = params.get("dataset_name", "")
+
+    host = omni_server.host
+    port = omni_server.port
+    model = omni_server.model
+
+    print(f"Running benchmark for model: {model}")
+    print(f"Benchmark parameters: {benchmark_params}")
+
+    def to_list(value, default=None):
+        if value is None:
+            return [] if default is None else [default]
+        return [value] if not isinstance(value, (list, tuple)) else list(value)
+
+    qps_list = to_list(params.get("request_rate"))
+    num_prompt_list = to_list(params.get("num_prompts"))
+    max_concurrency_list = to_list(params.get("max_concurrency"))
+
+    max_len = max(len(qps_list), len(max_concurrency_list))
+    if len(num_prompt_list) == 1 and max_len > 1:
+        num_prompt_list = num_prompt_list * max_len
+    elif max_len == 1 and len(num_prompt_list) > 1:
+        if len(qps_list) == 1:
+            qps_list = qps_list * len(num_prompt_list)
+        if len(max_concurrency_list) == 1:
+            max_concurrency_list = max_concurrency_list * len(num_prompt_list)
+        max_len = max(len(qps_list), len(max_concurrency_list))
+    elif len(num_prompt_list) != max_len and max_len > 0:
+        raise ValueError("The number of prompts does not match the QPS or max_concurrency")
+
+    base_args = ["--host", host, "--port", str(port)]
+    exclude_keys = {"request_rate", "baseline", "num_prompts", "max_concurrency"}
+
+    for key, value in params.items():
+        if key in exclude_keys or value is None:
+            continue
+
+        arg_name = f"--{key.replace('_', '-')}"
+
+        if isinstance(value, bool):
+            # Booleans are flags: present when true, omitted when false.
+            if value:
+                base_args.append(arg_name)
+        elif isinstance(value, dict):
+            json_str = json.dumps(value, ensure_ascii=False, separators=(",", ":"))
+            base_args.extend([arg_name, json_str])
+        else:
+            base_args.extend([arg_name, str(value)])
+
+    # QPS test. Each run starts from base_args; extending one shared list would
+    # repeat the per-run flags of earlier runs on every later command line.
+    for qps, num_prompt in zip(qps_list, num_prompt_list):
+        run_args = base_args + ["--request-rate", str(qps), "--num-prompts", str(num_prompt)]
+        result = run_benchmark(
+            args=run_args, test_name=test_name, flow=qps, dataset_name=dataset_name, num_prompt=num_prompt
+        )
+        assert_result(result, params, num_prompt=num_prompt)
+
+    # concurrency test
+    for concurrency, num_prompt in zip(max_concurrency_list, num_prompt_list):
+        run_args = base_args + [
+            "--max-concurrency",
+            str(concurrency),
+            "--num-prompts",
+            str(num_prompt),
+            "--request-rate",
+            "inf",
+        ]
+        result = run_benchmark(
+            args=run_args, test_name=test_name, flow=concurrency, dataset_name=dataset_name, num_prompt=num_prompt
+        )
+        assert_result(result, params, num_prompt=num_prompt)
diff --git a/tests/perf/stage_configs/qwen3_omni.yaml b/tests/perf/stage_configs/qwen3_omni.yaml new file mode 100644 index
0000000000..802f8dd249 --- /dev/null +++ b/tests/perf/stage_configs/qwen3_omni.yaml @@ -0,0 +1,101 @@ +# Stage config for running Qwen3-Omni-MoE with 3-stage architecture +# Stage 0: Thinker (multimodal understanding + text generation) +# Stage 1: Talker (text embeddings → 8-layer RVQ codec codes) +# Stage 2: Code2Wav (8-layer RVQ codes → audio waveform) + +# The following config has been verified on 2x H100-80G GPUs. +async_chunk: false +stage_args: + - stage_id: 0 + stage_type: llm # Use llm stage type to launch OmniLLM + runtime: + devices: "0" + max_batch_size: 64 + engine_args: + model_stage: thinker + model_arch: Qwen3OmniMoeForConditionalGeneration + worker_type: ar + scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler + gpu_memory_utilization: 0.9 + enforce_eager: false + trust_remote_code: true + engine_output_type: latent # Output hidden states for talker + distributed_executor_backend: "mp" + enable_prefix_caching: false + max_num_batched_tokens: 32768 + hf_config_name: thinker_config + tensor_parallel_size: 1 + final_output: true + final_output_type: text + is_comprehension: true + default_sampling_params: + temperature: 0.4 + top_p: 0.9 + top_k: 1 + max_tokens: 2048 + seed: 42 + detokenize: True + repetition_penalty: 1.05 + + - stage_id: 1 + stage_type: llm # Use llm stage type to launch OmniLLM + runtime: + devices: "1" + max_batch_size: 64 + engine_args: + model_stage: talker + model_arch: Qwen3OmniMoeForConditionalGeneration + worker_type: ar + scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler + gpu_memory_utilization: 0.6 + enforce_eager: false + trust_remote_code: true + engine_output_type: latent # Output codec codes for code2wav + enable_prefix_caching: false + max_num_batched_tokens: 32768 + distributed_executor_backend: "mp" + hf_config_name: talker_config + engine_input_source: [0] + custom_process_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.thinker2talker + # final_output: true 
+ # final_output_type: text + default_sampling_params: + temperature: 0.9 + top_k: 50 + max_tokens: 4096 + seed: 42 + detokenize: False + repetition_penalty: 1.05 + stop_token_ids: [2150] + + - stage_id: 2 + stage_type: llm # Use llm stage type to launch OmniLLM + runtime: + devices: "1" + max_batch_size: 64 + engine_args: + model_stage: code2wav + model_arch: Qwen3OmniMoeForConditionalGeneration + worker_type: generation + scheduler_cls: vllm_omni.core.sched.omni_generation_scheduler.OmniGenerationScheduler + enforce_eager: true + trust_remote_code: true + async_scheduling: false + enable_prefix_caching: false + engine_output_type: audio # Final output: audio waveform + gpu_memory_utilization: 0.1 + distributed_executor_backend: "mp" + max_num_batched_tokens: 1000000 + hf_config_name: thinker_config + engine_input_source: [1] + custom_process_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.talker2code2wav + final_output: true + final_output_type: audio + default_sampling_params: + temperature: 0.0 + top_p: 1.0 + top_k: -1 + max_tokens: 65536 + seed: 42 + detokenize: True + repetition_penalty: 1.1 diff --git a/tests/perf/tests/test.json b/tests/perf/tests/test.json new file mode 100644 index 0000000000..da965a5180 --- /dev/null +++ b/tests/perf/tests/test.json @@ -0,0 +1,266 @@ +[ + { + "test_name": "test_qwen3_omni", + "server_params": { + "model": "Qwen/Qwen3-Omni-30B-A3B-Instruct", + "stage_config_name": "qwen3_omni.yaml" + }, + "benchmark_params": [ + { + "dataset_name": "random", + "num_prompts": [ + 10, + 40, + 100 + ], + "request_rate": [ + 0.1, + 0.5, + 1 + ], + "random_input_len": 2500, + "random_output_len": 900, + "ignore_eos": true, + "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", + "baseline": { + "mean_ttft_ms": 100000, + "mean_audio_ttfp_ms": 100000, + "mean_audio_rtf": 100000 + } + }, + { + "dataset_name": "random", + "num_prompts": [ + 10, + 40, + 100 + ], + "max_concurrency": [ + 1, + 4, + 10 
+ ], + "random_input_len": 2500, + "random_output_len": 900, + "ignore_eos": true, + "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", + "baseline": { + "mean_ttft_ms": 100000, + "mean_audio_ttfp_ms": 100000, + "mean_audio_rtf": 100000 + } + }, + { + "dataset_name": "random-mm", + "num_prompts": [ + 10, + 40, + 100 + ], + "max_concurrency": [ + 1, + 4, + 10 + ], + "random_input_len": 100, + "random_output_len": 100, + "random_range_ratio": 0.0, + "ignore_eos": true, + "random_mm_base_items_per_request": 3, + "random_mm_num_mm_items_range_ratio": 0, + "random_mm_limit_mm_per_prompt": { + "image": 1, + "video": 1, + "audio": 1 + }, + "random_mm_bucket_config": { + "(32, 32, 1)": 0.5, + "(0, 1, 1)": 0.1, + "(32, 32, 2)": 0.4 + }, + "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", + "baseline": { + "mean_ttft_ms": 100000, + "mean_audio_ttfp_ms": 100000, + "mean_audio_rtf": 100000 + } + }, + { + "dataset_name": "random-mm", + "num_prompts": [ + 10, + 40, + 100 + ], + "request_rate": [ + 0.1, + 0.5, + 1 + ], + "random_input_len": 100, + "random_output_len": 100, + "random_range_ratio": 0.0, + "ignore_eos": true, + "random_mm_base_items_per_request": 3, + "random_mm_num_mm_items_range_ratio": 0, + "random_mm_limit_mm_per_prompt": { + "image": 1, + "video": 1, + "audio": 1 + }, + "random_mm_bucket_config": { + "(32, 32, 1)": 0.5, + "(0, 1, 1)": 0.1, + "(32, 32, 2)": 0.4 + }, + "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", + "baseline": { + "mean_ttft_ms": 100000, + "mean_audio_ttfp_ms": 100000, + "mean_audio_rtf": 100000 + } + } + ] + }, + { + "test_name": "test_qwen3_omni_chunk", + "server_params": { + "model": "Qwen/Qwen3-Omni-30B-A3B-Instruct", + "stage_config_name": "qwen3_omni.yaml", + "update": { + "async_chunk": true, + "stage_args": { + "0": { + "engine_args.custom_process_next_stage_input_func": 
"vllm_omni.model_executor.stage_input_processors.qwen3_omni.thinker2talker_async_chunk" + }, + "1": { + "engine_args.custom_process_next_stage_input_func": "vllm_omni.model_executor.stage_input_processors.qwen3_omni.talker2code2wav_async_chunk" + } + } + }, + "delete": { + "stage_args": { + "2": [ + "custom_process_input_func" + ] + } + } + }, + "benchmark_params": [ + { + "dataset_name": "random", + "num_prompts": [ + 10, + 40, + 100 + ], + "request_rate": [ + 0.1, + 0.5, + 1 + ], + "random_input_len": 2500, + "random_output_len": 900, + "ignore_eos": true, + "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", + "baseline": { + "mean_ttft_ms": 100000, + "mean_audio_ttfp_ms": 100000, + "mean_audio_rtf": 100000 + } + }, + { + "dataset_name": "random", + "num_prompts": [ + 10, + 40, + 100 + ], + "max_concurrency": [ + 1, + 4, + 10 + ], + "random_input_len": 2500, + "random_output_len": 900, + "ignore_eos": true, + "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", + "baseline": { + "mean_ttft_ms": 100000, + "mean_audio_ttfp_ms": 100000, + "mean_audio_rtf": 100000 + } + }, + { + "dataset_name": "random-mm", + "num_prompts": [ + 10, + 40, + 100 + ], + "max_concurrency": [ + 1, + 4, + 10 + ], + "random_input_len": 100, + "random_output_len": 100, + "random_range_ratio": 0.0, + "ignore_eos": true, + "random_mm_base_items_per_request": 3, + "random_mm_num_mm_items_range_ratio": 0, + "random_mm_limit_mm_per_prompt": { + "image": 1, + "video": 1, + "audio": 1 + }, + "random_mm_bucket_config": { + "(32, 32, 1)": 0.5, + "(0, 1, 1)": 0.1, + "(32, 32, 2)": 0.4 + }, + "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", + "baseline": { + "mean_ttft_ms": 100000, + "mean_audio_ttfp_ms": 100000, + "mean_audio_rtf": 100000 + } + }, + { + "dataset_name": "random-mm", + "num_prompts": [ + 10, + 40, + 100 + ], + "request_rate": [ + 0.1, + 0.5, + 1 + ], + "random_input_len": 100, + "random_output_len": 
100, + "random_range_ratio": 0.0, + "ignore_eos": true, + "random_mm_base_items_per_request": 3, + "random_mm_num_mm_items_range_ratio": 0, + "random_mm_limit_mm_per_prompt": { + "image": 1, + "video": 1, + "audio": 1 + }, + "random_mm_bucket_config": { + "(32, 32, 1)": 0.5, + "(0, 1, 1)": 0.1, + "(32, 32, 2)": 0.4 + }, + "percentile-metrics": "ttft,tpot,itl,e2el,audio_rtf,audio_ttfp,audio_duration", + "baseline": { + "mean_ttft_ms": 100000, + "mean_audio_ttfp_ms": 100000, + "mean_audio_rtf": 100000 + } + } + ] + } +]