Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 200 additions & 0 deletions tests/perf/scripts/run_benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
import os

# NOTE(review): these are assigned before the remaining imports below,
# presumably so that vLLM / tests.conftest see them at import time — confirm.
os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
os.environ["VLLM_TEST_CLEAN_GPU_MEMORY"] = "0"

import json
import subprocess
import threading
from datetime import datetime
from pathlib import Path
from typing import Any

import pytest

from tests.conftest import (
OmniServer,
)


def load_configs(config_path: str) -> list[dict[str, Any]]:
    """Load the benchmark configuration from a JSON file.

    Args:
        config_path: Path to the JSON file. It is resolved to an absolute
            path before being opened as UTF-8 text.

    Returns:
        The parsed JSON document, returned as-is (no schema validation).

    Raises:
        ValueError: If the file does not exist or does not contain valid JSON.
        RuntimeError: If any other error occurs while reading the file.
    """
    try:
        abs_path = Path(config_path).resolve()
        with open(abs_path, encoding="utf-8") as f:
            return json.load(f)
    # Each re-raise chains the original exception explicitly so the root
    # cause stays attached as __cause__ in the traceback.
    except json.JSONDecodeError as e:
        raise ValueError(f"JSON parsing error: {e}") from e
    except FileNotFoundError as e:
        raise ValueError(f"Configuration file not found: {config_path}") from e
    except Exception as e:
        raise RuntimeError(f"Failed to load configuration file: {e}") from e


def create_unique_server_params(configs: list[dict[str, Any]]) -> list[tuple[str, str, str]]:
    """Collect the distinct server parametrizations described by *configs*.

    Args:
        configs: Benchmark config entries. Each must provide ``test_name`` and
            a ``server_params`` mapping with ``model`` and ``stage_config_name``.

    Returns:
        A list of unique ``(test_name, model, stage_config_path)`` tuples in
        first-seen order. ``stage_config_path`` is ``stage_config_name``
        joined onto the ``stage_configs`` directory located next to this
        script's directory.

    Raises:
        KeyError: If an entry lacks one of the required keys.
    """
    stage_config_dir = Path(__file__).parent.parent / "stage_configs"
    # dict.fromkeys de-duplicates while preserving insertion order. A set would
    # produce a hash-dependent order, so the pytest parameter order (and test
    # ids) could differ from one interpreter run to the next.
    unique_params = dict.fromkeys(
        (
            config["test_name"],
            config["server_params"]["model"],
            str(stage_config_dir / config["server_params"]["stage_config_name"]),
        )
        for config in configs
    )
    return list(unique_params)


def create_test_parameter_mapping(configs: list[dict[str, Any]]) -> dict[tuple[str, str, str], dict]:
    """Index the benchmark settings of every config entry by its server key.

    The key is the ``(test_name, model, stage_config_path)`` tuple, where the
    path is ``stage_config_name`` joined onto the ``stage_configs`` directory
    located next to this script's directory. Each value repeats those three
    fields and carries the entry's ``benchmark_params``. When several entries
    share a key, the last one wins.
    """
    stage_config_dir = Path(__file__).parent.parent / "stage_configs"
    by_server: dict[tuple[str, str, str], dict] = {}
    for entry in configs:
        name = entry["test_name"]
        server = entry["server_params"]
        model_name = server["model"]
        config_path = str(stage_config_dir / server["stage_config_name"])
        by_server[(name, model_name, config_path)] = dict(
            test_name=name,
            model=model_name,
            stage_config_path=config_path,
            benchmark_params=entry["benchmark_params"],
        )
    return by_server


# The benchmark definitions are read from tests/test.json under the directory
# that contains this script's directory (the same base the stage_configs
# lookup uses). Note that Path.parents is a sequence of ancestors and cannot
# be joined with "/", hence .parent.parent.
CONFIG_FILE_PATH = str(Path(__file__).parent.parent / "tests" / "test.json")
BENCHMARK_CONFIGS = load_configs(CONFIG_FILE_PATH)


# Computed once at import time because pytest needs them while collecting the
# parametrized test below.
test_params = create_unique_server_params(BENCHMARK_CONFIGS)
server_to_benchmark_mapping = create_test_parameter_mapping(BENCHMARK_CONFIGS)

# Held by the omni_server fixture from before server start-up until after the
# server context has exited.
_omni_server_lock = threading.Lock()


@pytest.fixture(scope="module")
def omni_server(request):
    """Start an OmniServer for the parametrized model and yield it to the tests.

    ``request.param`` is unpacked as a 3-tuple; its second and third items are
    used as the model and the stage config path. The fixture is module-scoped
    (see the decorator), and the module-level lock is held from before the
    server starts until after its context manager has exited.

    NOTE(review): the server is launched with ``--stage-init-timeout 120``;
    the unit of that value is not visible here — confirm it is long enough for
    multi-stage initialization.
    """
    with _omni_server_lock:
        _test_name, model_name, stage_configs = request.param
        print(f"Starting OmniServer with model: {model_name}")

        server_args = ["--stage-configs-path", stage_configs, "--stage-init-timeout", "120"]
        with OmniServer(model_name, server_args) as running_server:
            print("OmniServer started successfully")
            yield running_server
            print("OmniServer stopping...")

        print("OmniServer stopped")


@pytest.fixture
def benchmark_params(request, omni_server):
    """Return one benchmark parameter set for the server used by this test.

    The server key is taken from the ``omni_server`` parametrization of the
    current test item, and ``request.param`` (0 when absent) selects the entry
    of that server's ``benchmark_params`` list.

    Returns:
        A dict with ``test_name``, ``model`` and the selected ``params``.

    Raises:
        ValueError: If the server key is unknown or the index is out of range
            for that server's parameter list.
    """
    name, model_name, stage_configs = request.node.callspec.params["omni_server"]
    lookup_key = (name, model_name, stage_configs)
    if lookup_key not in server_to_benchmark_mapping:
        raise ValueError(f"No benchmark parameters found for server key: {lookup_key}")

    entry = server_to_benchmark_mapping[lookup_key]
    candidates = entry["benchmark_params"]

    index = getattr(request, "param", 0)
    if index >= len(candidates):
        raise ValueError(f"No benchmark parameters found for index {index}")

    return {"test_name": entry["test_name"], "model": entry["model"], "params": candidates[index]}


def run_benchmark(args: list, test_name: str, qps: float) -> Any:
    """Run ``vllm bench serve --omni`` with *args* and return its saved result.

    The command is asked to save its result under a generated file name of the
    form ``result_<test_name>_<qps>_<timestamp>.json``. The child's output is
    echoed line by line while it runs, and the result file is then read back
    from the directory given after ``--result-dir`` in *args* (``./`` when
    that flag is absent).

    Args:
        args: Extra command-line arguments inserted after
            ``vllm bench serve --omni``.
        test_name: Name embedded in the result file name.
        qps: Request rate embedded in the result file name.

    Returns:
        The parsed JSON content of the result file.

    Raises:
        subprocess.CalledProcessError: If the benchmark command exits with a
            non-zero status.
        FileNotFoundError: If the command or the expected result file is
            missing.
    """
    current_dt = datetime.now().strftime("%Y%m%d-%H%M%S")
    result_filename = f"result_{test_name}_{qps}_{current_dt}.json"
    if "--result-filename" in args:
        print(f"The result file will be overwritten by {result_filename}")
    command = [
        "vllm",
        "bench",
        "serve",
        "--omni",
        *args,
        "--backend",
        "openai-chat-omni",
        "--endpoint",
        "/v1/chat/completions",
        "--save-result",
        "--result-filename",
        result_filename,
    ]
    # stderr is merged into stdout so one loop drains everything. Reading the
    # two pipes one after the other can deadlock once the pipe that is not
    # being read fills up.
    with subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1
    ) as process:
        for line in process.stdout:
            # Lines already end with a newline; do not append anything.
            print(line, end="")
        return_code = process.wait()

    # Fail here with the real cause instead of a confusing missing-file error
    # when the benchmark itself did not succeed.
    if return_code != 0:
        raise subprocess.CalledProcessError(return_code, command)

    if "--result-dir" in args:
        result_dir = args[args.index("--result-dir") + 1]
    else:
        result_dir = "./"

    with open(os.path.join(result_dir, result_filename), encoding="utf-8") as f:
        return json.load(f)


def create_benchmark_indices():
    """Return the indices used to parametrize the ``benchmark_params`` fixture.

    Every index from 0 up to (but excluding) the length of the longest
    ``benchmark_params`` list in ``server_to_benchmark_mapping`` is returned
    exactly once. The test crosses these indices with every server, so
    concatenating each server's own index range would repeat indices and run
    the same benchmark several times.

    Returns:
        A list of consecutive ints starting at 0; empty when the mapping is
        empty.
    """
    longest = max(
        (len(config_data["benchmark_params"]) for config_data in server_to_benchmark_mapping.values()),
        default=0,
    )
    return list(range(longest))


# Indices handed to the benchmark_params fixture through indirect
# parametrization of test_performance_benchmark.
benchmark_indices = create_benchmark_indices()


@pytest.mark.parametrize("omni_server", test_params, indirect=True)
@pytest.mark.parametrize("benchmark_params", benchmark_indices, indirect=True)
def test_performance_benchmark(omni_server, benchmark_params):
    """Run the benchmark once per configured QPS and check every request completed.

    For each value in the parameter set's ``qps`` list, ``run_benchmark`` is
    invoked against the running server and the ``completed`` count of its
    result must equal the number of prompts that was requested.
    """
    test_name = benchmark_params["test_name"]
    model = benchmark_params["model"]
    params = benchmark_params["params"]

    host = omni_server.host
    port = omni_server.port

    print(f"Running benchmark for model: {model}")
    print(f"Benchmark parameters: {benchmark_params}")

    # Resolve once so the value sent to the CLI and the value asserted on
    # agree even when the config omits "num_prompts".
    num_prompts = params.get("num_prompts", 100)
    # Sibling keys use underscores; accept that spelling first and fall back to
    # the hyphenated key for existing configs.
    percentile_metrics = params.get(
        "percentile_metrics", params.get("percentile-metrics", "ttft,tpot,itl,e2el")
    )

    for qps in params.get("qps", []):
        args = [
            "--host",
            host,
            "--port",
            str(port),
            "--dataset-name",
            params.get("dataset_name", "random"),
            "--num-prompts",
            str(num_prompts),
            "--random-input-len",
            str(params.get("random_input_len", 10)),
            "--random-output-len",
            str(params.get("random_output_len", 10)),
            "--percentile-metrics",
            percentile_metrics,
            "--request-rate",
            str(qps),
        ]

        result = run_benchmark(args=args, test_name=test_name, qps=qps)
        assert result["completed"] == num_prompts, "Request failures exist"
101 changes: 101 additions & 0 deletions tests/perf/stage_configs/qwen3_omni_async_chunk.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Stage config for running Qwen3-Omni-MoE with a 3-stage architecture
# Stage 0: Thinker (multimodal understanding + text generation)
# Stage 1: Talker (text embeddings → 16-layer RVQ codec codes)
# Stage 2: Code2Wav (16-layer RVQ codes → audio waveform)

# The following config has been verified on 2x H100-80G GPUs.
async_chunk: true
stage_args:
- stage_id: 0
stage_type: llm # Use llm stage type to launch OmniLLM
runtime:
devices: "0"
max_batch_size: 64
engine_args:
model_stage: thinker
model_arch: Qwen3OmniMoeForConditionalGeneration
worker_type: ar
scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
gpu_memory_utilization: 0.9
enforce_eager: false
trust_remote_code: true
engine_output_type: latent # Output hidden states for talker
distributed_executor_backend: "mp"
enable_prefix_caching: false
max_num_batched_tokens: 32768
hf_config_name: thinker_config
tensor_parallel_size: 1
custom_process_next_stage_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.thinker2talker_async_chunk
final_output: true
final_output_type: text
is_comprehension: true
default_sampling_params:
temperature: 0.4
top_p: 0.9
top_k: 1
max_tokens: 2048
seed: 42
detokenize: True
repetition_penalty: 1.05

- stage_id: 1
stage_type: llm # Use llm stage type to launch OmniLLM
runtime:
devices: "1"
max_batch_size: 64
engine_args:
model_stage: talker
model_arch: Qwen3OmniMoeForConditionalGeneration
worker_type: ar
scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
gpu_memory_utilization: 0.6
enforce_eager: false
trust_remote_code: true
engine_output_type: latent # Output codec codes for code2wav
enable_prefix_caching: false
max_num_batched_tokens: 32768
distributed_executor_backend: "mp"
hf_config_name: talker_config
custom_process_next_stage_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.talker2code2wav_async_chunk
engine_input_source: [0]
# final_output: true
# final_output_type: text
default_sampling_params:
temperature: 0.9
top_k: 50
max_tokens: 4096
seed: 42
detokenize: False
repetition_penalty: 1.05
stop_token_ids: [2150]

- stage_id: 2
stage_type: llm # Use llm stage type to launch OmniLLM
runtime:
devices: "1"
max_batch_size: 1
engine_args:
model_stage: code2wav
model_arch: Qwen3OmniMoeForConditionalGeneration
worker_type: generation
scheduler_cls: vllm_omni.core.sched.omni_generation_scheduler.OmniGenerationScheduler
enforce_eager: true
trust_remote_code: true
async_scheduling: false
enable_prefix_caching: false
engine_output_type: audio # Final output: audio waveform
gpu_memory_utilization: 0.1
distributed_executor_backend: "mp"
max_num_batched_tokens: 10000
hf_config_name: thinker_config
engine_input_source: [1]
final_output: true
final_output_type: audio
default_sampling_params:
temperature: 0.0
top_p: 1.0
top_k: -1
max_tokens: 65536
seed: 42
detokenize: True
repetition_penalty: 1.1
Loading