diff --git a/.buildkite/test-nightly.yml b/.buildkite/test-nightly.yml index 8b21acfa7c5..8f141576d38 100644 --- a/.buildkite/test-nightly.yml +++ b/.buildkite/test-nightly.yml @@ -62,6 +62,27 @@ steps: volumes: - "/fsx/hf_cache:/fsx/hf_cache" + - label: ":full_moon: Qwen3-TTS Non-Async-Chunk E2E Test" + timeout_in_minutes: 30 + depends_on: upload-nightly-pipeline + if: build.env("NIGHTLY") == "1" + commands: + - | + export VLLM_WORKER_MULTIPROC_METHOD=spawn + pytest -s -v tests/e2e/online_serving/test_qwen3_tts.py::TestQwen3TTSNoAsyncChunk + agents: + queue: "gpu_4_queue" + plugins: + - docker#v5.2.0: + image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + always-pull: true + propagate-environment: true + shm-size: "8gb" + environment: + - "HF_HOME=/fsx/hf_cache" + volumes: + - "/fsx/hf_cache:/fsx/hf_cache" + - label: ":full_moon: Omni Model Perf Test with H100" key: nightly-performance timeout_in_minutes: 180 diff --git a/benchmarks/qwen3-tts/vllm_omni/bench_async_chunk.py b/benchmarks/qwen3-tts/vllm_omni/bench_async_chunk.py new file mode 100644 index 00000000000..3497ae82152 --- /dev/null +++ b/benchmarks/qwen3-tts/vllm_omni/bench_async_chunk.py @@ -0,0 +1,301 @@ +"""Benchmark comparing async_chunk on vs off for Qwen3-TTS. + +Measures TTFP (Time-to-First-Packet), E2E latency, and RTF across +concurrency levels for both async_chunk modes. Saves results as JSON. + +Usage: + # Run against a server already serving with a given config: + python bench_async_chunk.py \ + --host 127.0.0.1 --port 8000 \ + --config-name async_chunk_on \ + --num-prompts 50 \ + --max-concurrency 1 10 \ + --result-dir results/ +""" + +import argparse +import asyncio +import json +import time +from dataclasses import asdict, dataclass, field +from datetime import datetime +from pathlib import Path + +import aiohttp +import numpy as np +from tqdm.asyncio import tqdm + +PROMPTS = [ + "Hello, welcome to the voice synthesis benchmark test.", + "She said she would be here by noon, but nobody showed up.", + "The quick brown fox jumps over the lazy dog near the riverbank.", + "I can't believe how beautiful the sunset looks from up here on the mountain.", + "Please remember to bring your identification documents to the appointment tomorrow morning.", + "Have you ever wondered what it would be like to travel through time and visit ancient civilizations?", + "The restaurant on the corner serves the best pasta I have ever tasted in my entire life.", + "After the meeting, we should discuss the quarterly results and plan for the next phase.", + "Learning a new language takes patience, practice, and a genuine curiosity about other cultures.", + "The train leaves at half past seven, so we need to arrive at the station before then.", + "Could you please turn down the music a little bit, I'm trying to concentrate on my work.", + "It was a dark and stormy night when the old lighthouse keeper heard a knock at the door.", +] + + +@dataclass +class RequestResult: + success: bool = False + ttfp: float = 0.0 + e2e: float = 0.0 + audio_bytes: int = 0 + audio_duration: float = 0.0 + rtf: float = 0.0 + prompt: str = "" + error: str = "" + + +@dataclass +class BenchmarkResult: + config_name: str = "" + concurrency: int = 0 + num_prompts: int = 0 + completed: int = 0 + failed: int = 0 + duration_s: float = 0.0 + mean_ttfp_ms: float = 0.0 + median_ttfp_ms: float = 0.0 + std_ttfp_ms: float = 0.0 + p90_ttfp_ms: float = 0.0 + p95_ttfp_ms: float = 0.0 + p99_ttfp_ms: float = 0.0 + mean_e2e_ms: float = 0.0 + median_e2e_ms: float = 0.0 + std_e2e_ms: float = 0.0 + p90_e2e_ms: float = 0.0 + p95_e2e_ms: float = 0.0 + p99_e2e_ms: float = 0.0 + mean_rtf: float = 0.0 + median_rtf: float = 0.0 + std_rtf: float = 0.0 + mean_audio_duration_s: float = 0.0 + total_audio_duration_s: float = 0.0 + audio_throughput: float = 0.0 + request_throughput: float = 0.0 + per_request: list = field(default_factory=list) + + +def pcm_bytes_to_duration(num_bytes: int, sample_rate: int = 24000, sample_width: int = 2) -> float: + return num_bytes / sample_width / sample_rate + + +async def send_tts_request( + session: aiohttp.ClientSession, + api_url: str, + prompt: str, + voice: str = "vivian", + language: str = "English", + stream: bool = True, + pbar: tqdm | None = None, +) -> RequestResult: + payload = { + "input": prompt, + "voice": voice, + "language": language, + "stream": stream, + "response_format": "pcm", + } + + result = RequestResult(prompt=prompt) + st = time.perf_counter() + + try: + async with session.post(api_url, json=payload) as response: + if response.status != 200: + result.error = f"HTTP {response.status}: {await response.text()}" + return result + + first_chunk = True + total_bytes = 0 + + async for chunk in response.content.iter_any(): + if first_chunk and len(chunk) > 0: + result.ttfp = time.perf_counter() - st + first_chunk = False + total_bytes += len(chunk) + + result.e2e = time.perf_counter() - st + result.audio_bytes = total_bytes + result.audio_duration = pcm_bytes_to_duration(total_bytes) + if result.audio_duration > 0: + result.rtf = result.e2e / result.audio_duration + result.success = True + + except Exception as e: + result.error = str(e) + result.e2e = time.perf_counter() - st + + if pbar: + pbar.update(1) + return result + + +async def run_benchmark( + host: str, + port: int, + num_prompts: int, + max_concurrency: int, + num_warmups: int = 3, + voice: str = "vivian", + language: str = "English", + stream: bool = True, +) -> BenchmarkResult: + api_url = f"http://{host}:{port}/v1/audio/speech" + + connector = aiohttp.TCPConnector(limit=max_concurrency, limit_per_host=max_concurrency, keepalive_timeout=60) + session = aiohttp.ClientSession(connector=connector, timeout=aiohttp.ClientTimeout(total=600)) + + if num_warmups > 0: + print(f" Warming up with {num_warmups} requests...") + warmup_tasks = [ + send_tts_request(session, api_url, PROMPTS[i % len(PROMPTS)], voice, language, stream) + for i in range(num_warmups) + ] + await asyncio.gather(*warmup_tasks) + print(" Warmup done.") + + request_prompts = [PROMPTS[i % len(PROMPTS)] for i in range(num_prompts)] + + print(f" Running {num_prompts} requests with concurrency={max_concurrency}...") + semaphore = asyncio.Semaphore(max_concurrency) + pbar = tqdm(total=num_prompts, desc=f" concurrency={max_concurrency}") + + async def limited_request(prompt): + async with semaphore: + return await send_tts_request(session, api_url, prompt, voice, language, stream, pbar) + + start_time = time.perf_counter() + tasks = [asyncio.create_task(limited_request(p)) for p in request_prompts] + results: list[RequestResult] = await asyncio.gather(*tasks) + duration = time.perf_counter() - start_time + pbar.close() + + await session.close() + + successful = [r for r in results if r.success] + failed = [r for r in results if not r.success] + + bench = BenchmarkResult( + concurrency=max_concurrency, + num_prompts=num_prompts, + completed=len(successful), + failed=len(failed), + duration_s=duration, + ) + + if successful: + ttfps = [r.ttfp * 1000 for r in successful] + e2es = [r.e2e * 1000 for r in successful] + rtfs = [r.rtf for r in successful] + audio_durs = [r.audio_duration for r in successful] + + bench.mean_ttfp_ms = float(np.mean(ttfps)) + bench.median_ttfp_ms = float(np.median(ttfps)) + bench.std_ttfp_ms = float(np.std(ttfps)) + bench.p90_ttfp_ms = float(np.percentile(ttfps, 90)) + bench.p95_ttfp_ms = float(np.percentile(ttfps, 95)) + bench.p99_ttfp_ms = float(np.percentile(ttfps, 99)) + + bench.mean_e2e_ms = float(np.mean(e2es)) + bench.median_e2e_ms = float(np.median(e2es)) + bench.std_e2e_ms = float(np.std(e2es)) + bench.p90_e2e_ms = float(np.percentile(e2es, 90)) + bench.p95_e2e_ms = float(np.percentile(e2es, 95)) + bench.p99_e2e_ms = float(np.percentile(e2es, 99)) + + bench.mean_rtf = float(np.mean(rtfs)) + bench.median_rtf = float(np.median(rtfs)) + bench.std_rtf = float(np.std(rtfs)) + + bench.mean_audio_duration_s = float(np.mean(audio_durs)) + bench.total_audio_duration_s = float(np.sum(audio_durs)) + bench.audio_throughput = bench.total_audio_duration_s / duration + bench.request_throughput = len(successful) / duration + + bench.per_request = [ + { + "ttfp_ms": r.ttfp * 1000, + "e2e_ms": r.e2e * 1000, + "rtf": r.rtf, + "audio_duration_s": r.audio_duration, + "prompt": r.prompt, + } + for r in successful + ] + + print(f"\n{'=' * 60}") + print(f" Concurrency: {max_concurrency} | Completed: {bench.completed} | Failed: {bench.failed}") + print(f" Duration: {duration:.2f}s | Throughput: {bench.request_throughput:.2f} req/s") + print( + f" TTFP (ms): mean={bench.mean_ttfp_ms:.1f} median={bench.median_ttfp_ms:.1f}" + f" p90={bench.p90_ttfp_ms:.1f} p99={bench.p99_ttfp_ms:.1f}" + ) + print( + f" E2E (ms): mean={bench.mean_e2e_ms:.1f} median={bench.median_e2e_ms:.1f}" + f" p90={bench.p90_e2e_ms:.1f} p99={bench.p99_e2e_ms:.1f}" + ) + print(f" RTF: mean={bench.mean_rtf:.3f} median={bench.median_rtf:.3f}") + print(f" Throughput: {bench.audio_throughput:.2f} audio-sec/wall-sec") + print(f"{'=' * 60}\n") + + if failed: + for r in failed[:3]: + print(f" [ERROR] {r.error[:200]}") + + return bench + + +async def main(args): + all_results = [] + + for concurrency in args.max_concurrency: + result = await run_benchmark( + host=args.host, + port=args.port, + num_prompts=args.num_prompts, + max_concurrency=concurrency, + num_warmups=args.num_warmups, + voice=args.voice, + language=args.language, + stream=args.stream, + ) + result.config_name = args.config_name + all_results.append(asdict(result)) + + result_dir = Path(args.result_dir) + result_dir.mkdir(parents=True, exist_ok=True) + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + result_file = result_dir / f"bench_{args.config_name}_{timestamp}.json" + + with open(result_file, "w") as f: + json.dump(all_results, f, indent=2) + print(f"Results saved to {result_file}") + + +def parse_args(): + parser = argparse.ArgumentParser(description="Qwen3-TTS async_chunk benchmark client") + parser.add_argument("--host", type=str, default="127.0.0.1") + parser.add_argument("--port", type=int, default=8000) + parser.add_argument("--num-prompts", type=int, default=50) + parser.add_argument("--max-concurrency", type=int, nargs="+", default=[1, 10]) + parser.add_argument("--num-warmups", type=int, default=3) + parser.add_argument("--voice", type=str, default="vivian") + parser.add_argument("--language", type=str, default="English") + parser.add_argument("--stream", action="store_true", default=True) + parser.add_argument("--no-stream", dest="stream", action="store_false") + parser.add_argument("--config-name", type=str, default="async_chunk_on") + parser.add_argument("--result-dir", type=str, default="results") + return parser.parse_args() + + +if __name__ == "__main__": + args = parse_args() + asyncio.run(main(args)) diff --git a/benchmarks/qwen3-tts/vllm_omni/plot_async_chunk.py b/benchmarks/qwen3-tts/vllm_omni/plot_async_chunk.py new file mode 100644 index 00000000000..dd03d9626d9 --- /dev/null +++ b/benchmarks/qwen3-tts/vllm_omni/plot_async_chunk.py @@ -0,0 +1,249 @@ +"""Plot TTFP comparison: async_chunk off vs on. + +Generates a bar chart with improvement arrows, matching the Qwen3-Omni +async_chunk benchmark figure style. + +Usage: + python plot_async_chunk.py \ + --off results/bench_async_chunk_off_*.json \ + --on results/bench_async_chunk_on_*.json \ + --output results/qwen3_tts_async_chunk_ttfp.png + + # Also supports E2E and RTF metrics: + python plot_async_chunk.py \ + --off results/bench_async_chunk_off_*.json \ + --on results/bench_async_chunk_on_*.json \ + --metric e2e \ + --output results/qwen3_tts_async_chunk_e2e.png +""" + +import argparse +import json +from pathlib import Path + +import matplotlib.pyplot as plt +import numpy as np + +METRIC_CONFIG = { + "ttfp": { + "key": "mean_ttfp_ms", + "ylabel": "TTFP (s)", + "title": "TTFP (Time to First Audio Packet) - Qwen3-TTS, by concurrency", + "to_seconds": True, + }, + "e2e": { + "key": "mean_e2e_ms", + "ylabel": "E2E (s)", + "title": "E2E Latency - Qwen3-TTS, by concurrency", + "to_seconds": True, + }, + "rtf": { + "key": "mean_rtf", + "ylabel": "RTF", + "title": "Real-Time Factor - Qwen3-TTS, by concurrency", + "to_seconds": False, + }, +} + + +def load_results(path: str) -> list[dict]: + with open(path) as f: + return json.load(f) + + +def plot_ttfp_comparison( + off_results: list[dict], + on_results: list[dict], + metric: str, + output_path: str, + title_override: str | None = None, +): + cfg = METRIC_CONFIG[metric] + key = cfg["key"] + to_seconds = cfg["to_seconds"] + + off_map = {r["concurrency"]: r for r in off_results} + on_map = {r["concurrency"]: r for r in on_results} + concurrencies = sorted(set(off_map.keys()) & set(on_map.keys())) + + off_vals = [] + on_vals = [] + for c in concurrencies: + v_off = off_map[c][key] + v_on = on_map[c][key] + if to_seconds: + v_off /= 1000.0 + v_on /= 1000.0 + off_vals.append(v_off) + on_vals.append(v_on) + + fig, ax = plt.subplots(figsize=(8, 6)) + + x = np.arange(len(concurrencies)) + width = 0.3 + + ax.bar(x - width / 2, off_vals, width, label="async_chunk off", color="#87CEEB", edgecolor="none") + ax.bar(x + width / 2, on_vals, width, label="async_chunk on", color="#FFF8DC", edgecolor="#DDD8B8") + + # Draw improvement arrows and labels + for i in range(len(concurrencies)): + v_off = off_vals[i] + v_on = on_vals[i] + if v_on > 0: + improvement = v_off / v_on + else: + improvement = float("inf") + + # Arrow from top of off-bar to top of on-bar + arrow_start_x = x[i] - width / 2 + arrow_start_y = v_off * 0.95 + arrow_end_x = x[i] + width / 2 + arrow_end_y = v_on * 1.05 + + ax.annotate( + "", + xy=(arrow_end_x, arrow_end_y), + xytext=(arrow_start_x, arrow_start_y), + arrowprops=dict(arrowstyle="->", color="red", lw=1.5), + ) + + # Improvement label + label_x = (arrow_start_x + arrow_end_x) / 2 + label_y = arrow_start_y + (v_off - v_on) * 0.15 + ax.text( + label_x, + label_y, + f"{improvement:.1f}x improvement", + ha="center", + va="bottom", + fontsize=10, + color="red", + fontweight="bold", + ) + + title = title_override or cfg["title"] + ax.set_title(title, fontsize=13, fontweight="bold") + ax.set_ylabel(cfg["ylabel"], fontsize=12) + ax.set_xlabel("Max concurrency", fontsize=12) + ax.set_xticks(x) + ax.set_xticklabels([str(c) for c in concurrencies]) + ax.set_yscale("log") + ax.legend(loc="upper left", fontsize=11) + ax.grid(axis="y", alpha=0.3, linestyle="--") + ax.set_axisbelow(True) + + plt.tight_layout() + Path(output_path).parent.mkdir(parents=True, exist_ok=True) + plt.savefig(output_path, dpi=150, bbox_inches="tight") + print(f"Plot saved to {output_path}") + plt.close() + + +def plot_all_metrics(off_results: list[dict], on_results: list[dict], output_path: str): + """Generate a 1x3 subplot with TTFP, E2E, and RTF comparisons.""" + off_map = {r["concurrency"]: r for r in off_results} + on_map = {r["concurrency"]: r for r in on_results} + concurrencies = sorted(set(off_map.keys()) & set(on_map.keys())) + + fig, axes = plt.subplots(1, 3, figsize=(18, 6)) + fig.suptitle("Qwen3-TTS: async_chunk on vs off", fontsize=15, fontweight="bold") + + for ax, metric in zip(axes, ["ttfp", "e2e", "rtf"]): + cfg = METRIC_CONFIG[metric] + key = cfg["key"] + to_seconds = cfg["to_seconds"] + + off_vals = [] + on_vals = [] + for c in concurrencies: + v_off = off_map[c][key] + v_on = on_map[c][key] + if to_seconds: + v_off /= 1000.0 + v_on /= 1000.0 + off_vals.append(v_off) + on_vals.append(v_on) + + x = np.arange(len(concurrencies)) + width = 0.3 + ax.bar(x - width / 2, off_vals, width, label="async_chunk off", color="#87CEEB") + ax.bar(x + width / 2, on_vals, width, label="async_chunk on", color="#FFF8DC", edgecolor="#DDD8B8") + + for i in range(len(concurrencies)): + if on_vals[i] > 0: + improvement = off_vals[i] / on_vals[i] + ax.annotate( + "", + xy=(x[i] + width / 2, on_vals[i] * 1.05), + xytext=(x[i] - width / 2, off_vals[i] * 0.95), + arrowprops=dict(arrowstyle="->", color="red", lw=1.5), + ) + label_y = off_vals[i] * 0.85 + ax.text(x[i], label_y, f"{improvement:.1f}x", ha="center", fontsize=10, color="red", fontweight="bold") + + ax.set_title(cfg["title"].split(" - ")[0], fontsize=12, fontweight="bold") + ax.set_ylabel(cfg["ylabel"], fontsize=11) + ax.set_xlabel("Max concurrency", fontsize=11) + ax.set_xticks(x) + ax.set_xticklabels([str(c) for c in concurrencies]) + if metric != "rtf": + ax.set_yscale("log") + ax.legend(fontsize=9) + ax.grid(axis="y", alpha=0.3, linestyle="--") + ax.set_axisbelow(True) + + plt.tight_layout() + Path(output_path).parent.mkdir(parents=True, exist_ok=True) + plt.savefig(output_path, dpi=150, bbox_inches="tight") + print(f"Plot saved to {output_path}") + plt.close() + + +def print_table(off_results: list[dict], on_results: list[dict]): + off_map = {r["concurrency"]: r for r in off_results} + on_map = {r["concurrency"]: r for r in on_results} + concurrencies = sorted(set(off_map.keys()) & set(on_map.keys())) + + print("\n## Benchmark Results: async_chunk off vs on\n") + print("| Metric | Concurrency | async_chunk off | async_chunk on | Improvement |") + print("| --- | --- | --- | --- | --- |") + + for name, key, fmt in [ + ("TTFP (ms)", "mean_ttfp_ms", ".1f"), + ("E2E (ms)", "mean_e2e_ms", ".1f"), + ("RTF", "mean_rtf", ".3f"), + ("Throughput", "audio_throughput", ".2f"), + ]: + for c in concurrencies: + v_off = off_map[c].get(key, 0) + v_on = on_map[c].get(key, 0) + if v_on > 0 and key != "audio_throughput": + ratio = f"{v_off / v_on:.1f}x" + elif v_off > 0 and key == "audio_throughput": + ratio = f"{v_on / v_off:.1f}x" + else: + ratio = "N/A" + print(f"| {name} | {c} | {v_off:{fmt}} | {v_on:{fmt}} | {ratio} |") + + +def parse_args(): + parser = argparse.ArgumentParser(description="Plot async_chunk comparison for Qwen3-TTS") + parser.add_argument("--off", type=str, required=True, help="JSON results for async_chunk off") + parser.add_argument("--on", type=str, required=True, help="JSON results for async_chunk on") + parser.add_argument("--metric", type=str, default="ttfp", choices=["ttfp", "e2e", "rtf", "all"]) + parser.add_argument("--output", type=str, default="results/qwen3_tts_async_chunk.png") + parser.add_argument("--title", type=str, default=None, help="Custom title override") + return parser.parse_args() + + +if __name__ == "__main__": + args = parse_args() + off_results = load_results(args.off) + on_results = load_results(args.on) + + print_table(off_results, on_results) + + if args.metric == "all": + plot_all_metrics(off_results, on_results, args.output) + else: + plot_ttfp_comparison(off_results, on_results, args.metric, args.output, args.title) diff --git a/benchmarks/qwen3-tts/vllm_omni/run_async_chunk_benchmark.sh b/benchmarks/qwen3-tts/vllm_omni/run_async_chunk_benchmark.sh new file mode 100755 index 00000000000..61cf7757a9b --- /dev/null +++ b/benchmarks/qwen3-tts/vllm_omni/run_async_chunk_benchmark.sh @@ -0,0 +1,167 @@ +#!/bin/bash +# Qwen3-TTS async_chunk on vs off Benchmark +# +# Starts two servers (async_chunk on and off), benchmarks both, +# and generates comparison plots. +# +# Usage: +# bash run_async_chunk_benchmark.sh +# +# Environment variables: +# GPU_DEVICE - GPU index (default: 0) +# MODEL - Model path (default: Qwen/Qwen3-TTS-12Hz-0.6B-CustomVoice) +# NUM_PROMPTS - Prompts per concurrency level (default: 50) +# CONCURRENCY - Space-separated concurrency levels (default: "1 10") +# PORT_ON - Port for async_chunk on server (default: 8000) +# PORT_OFF - Port for async_chunk off server (default: 8001) + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_ROOT="$(cd "${SCRIPT_DIR}/../../.." && pwd)" +cd "$PROJECT_ROOT" + +GPU_DEVICE="${GPU_DEVICE:-0}" +MODEL="${MODEL:-Qwen/Qwen3-TTS-12Hz-0.6B-CustomVoice}" +NUM_PROMPTS="${NUM_PROMPTS:-50}" +CONCURRENCY="${CONCURRENCY:-1 10}" +NUM_WARMUPS="${NUM_WARMUPS:-3}" +PORT_ON="${PORT_ON:-8000}" +PORT_OFF="${PORT_OFF:-8001}" +RESULT_DIR="${SCRIPT_DIR}/results" +TIMESTAMP="$(date +%Y%m%d_%H%M%S)" + +STAGE_CONFIG_ON="vllm_omni/model_executor/stage_configs/qwen3_tts.yaml" +STAGE_CONFIG_OFF="vllm_omni/model_executor/stage_configs/qwen3_tts_no_async_chunk.yaml" + +mkdir -p "${RESULT_DIR}" + +echo "============================================================" +echo " Qwen3-TTS async_chunk Benchmark" +echo "============================================================" +echo " GPU: ${GPU_DEVICE}" +echo " Model: ${MODEL}" +echo " Prompts: ${NUM_PROMPTS}" +echo " Concurrency: ${CONCURRENCY}" +echo " Port (on/off): ${PORT_ON} / ${PORT_OFF}" +echo " Results: ${RESULT_DIR}" +echo "============================================================" + +cleanup() { + echo "Cleaning up servers..." + kill "$PID_ON" 2>/dev/null || true + kill "$PID_OFF" 2>/dev/null || true + wait "$PID_ON" 2>/dev/null || true + wait "$PID_OFF" 2>/dev/null || true +} +trap cleanup EXIT + +wait_for_server() { + local port=$1 + local name=$2 + local max_wait=300 + local elapsed=0 + echo "Waiting for ${name} server on port ${port}..." + while ! curl -s "http://localhost:${port}/health" >/dev/null 2>&1; do + sleep 5 + elapsed=$((elapsed + 5)) + if [ $elapsed -ge $max_wait ]; then + echo "ERROR: ${name} server failed to start within ${max_wait}s" + exit 1 + fi + done + echo "${name} server ready (${elapsed}s)" +} + +# ---- Phase 1: Start async_chunk ON server ---- +echo "" +echo "[Phase 1] Starting async_chunk ON server on port ${PORT_ON}..." +CUDA_VISIBLE_DEVICES=${GPU_DEVICE} vllm-omni serve "${MODEL}" \ + --stage-configs-path "${STAGE_CONFIG_ON}" \ + --host 0.0.0.0 --port "${PORT_ON}" \ + --trust-remote-code --enforce-eager --omni \ + > "${RESULT_DIR}/server_on_${TIMESTAMP}.log" 2>&1 & +PID_ON=$! + +wait_for_server "${PORT_ON}" "async_chunk_on" + +echo "[Phase 1] Benchmarking async_chunk ON..." +# shellcheck disable=SC2086 +python "${SCRIPT_DIR}/bench_async_chunk.py" \ + --host 127.0.0.1 --port "${PORT_ON}" \ + --config-name "async_chunk_on" \ + --num-prompts "${NUM_PROMPTS}" \ + --max-concurrency ${CONCURRENCY} \ + --num-warmups "${NUM_WARMUPS}" \ + --result-dir "${RESULT_DIR}" + +echo "[Phase 1] Stopping async_chunk ON server..." +kill "$PID_ON" 2>/dev/null || true +wait "$PID_ON" 2>/dev/null || true +sleep 5 + +# ---- Phase 2: Start async_chunk OFF server ---- +echo "" +echo "[Phase 2] Starting async_chunk OFF server on port ${PORT_OFF}..." +CUDA_VISIBLE_DEVICES=${GPU_DEVICE} vllm-omni serve "${MODEL}" \ + --stage-configs-path "${STAGE_CONFIG_OFF}" \ + --host 0.0.0.0 --port "${PORT_OFF}" \ + --trust-remote-code --enforce-eager --omni \ + > "${RESULT_DIR}/server_off_${TIMESTAMP}.log" 2>&1 & +PID_OFF=$! + +wait_for_server "${PORT_OFF}" "async_chunk_off" + +echo "[Phase 2] Benchmarking async_chunk OFF (non-streaming)..." +# shellcheck disable=SC2086 +python "${SCRIPT_DIR}/bench_async_chunk.py" \ + --host 127.0.0.1 --port "${PORT_OFF}" \ + --config-name "async_chunk_off" \ + --num-prompts "${NUM_PROMPTS}" \ + --max-concurrency ${CONCURRENCY} \ + --num-warmups "${NUM_WARMUPS}" \ + --no-stream \ + --result-dir "${RESULT_DIR}" + +echo "[Phase 2] Stopping async_chunk OFF server..." +kill "$PID_OFF" 2>/dev/null || true +wait "$PID_OFF" 2>/dev/null || true + +# ---- Phase 3: Plot results ---- +echo "" +echo "[Phase 3] Generating plots..." + +# Find the latest result files +RESULT_ON=$(ls -t "${RESULT_DIR}"/bench_async_chunk_on_*.json 2>/dev/null | head -1) +RESULT_OFF=$(ls -t "${RESULT_DIR}"/bench_async_chunk_off_*.json 2>/dev/null | head -1) + +if [ -z "$RESULT_ON" ] || [ -z "$RESULT_OFF" ]; then + echo "ERROR: Could not find result files. Check logs in ${RESULT_DIR}/" + exit 1 +fi + +echo " ON results: ${RESULT_ON}" +echo " OFF results: ${RESULT_OFF}" + +# TTFP comparison (main figure) +python "${SCRIPT_DIR}/plot_async_chunk.py" \ + --off "${RESULT_OFF}" \ + --on "${RESULT_ON}" \ + --metric ttfp \ + --output "${RESULT_DIR}/qwen3_tts_async_chunk_ttfp.png" + +# All metrics comparison +python "${SCRIPT_DIR}/plot_async_chunk.py" \ + --off "${RESULT_OFF}" \ + --on "${RESULT_ON}" \ + --metric all \ + --output "${RESULT_DIR}/qwen3_tts_async_chunk_all.png" + +echo "" +echo "============================================================" +echo " Benchmark complete!" +echo " Results: ${RESULT_DIR}/" +echo " Plots:" +echo " - ${RESULT_DIR}/qwen3_tts_async_chunk_ttfp.png" +echo " - ${RESULT_DIR}/qwen3_tts_async_chunk_all.png" +echo "============================================================" diff --git a/tests/e2e/online_serving/test_qwen3_tts.py b/tests/e2e/online_serving/test_qwen3_tts.py index b4ca57835db..2ab5cdd4b6e 100644 --- a/tests/e2e/online_serving/test_qwen3_tts.py +++ b/tests/e2e/online_serving/test_qwen3_tts.py @@ -23,11 +23,9 @@ MODEL = "Qwen/Qwen3-TTS-12Hz-0.6B-CustomVoice" -def get_stage_config(): +def get_stage_config(name: str = "qwen3_tts.yaml"): """Get the stage config path for Qwen3-TTS.""" - return str( - Path(__file__).parent.parent.parent.parent / "vllm_omni" / "model_executor" / "stage_configs" / "qwen3_tts.yaml" - ) + return str(Path(__file__).parent.parent.parent.parent / "vllm_omni" / "model_executor" / "stage_configs" / name) @pytest.fixture(scope="module") @@ -225,3 +223,63 @@ def test_models_endpoint(self, omni_server) -> None: data = response.json() assert "data" in data assert len(data["data"]) > 0 + + +@pytest.fixture(scope="module") +def omni_server_no_async_chunk(): + """Start vLLM-Omni server with non-async-chunk config.""" + stage_config_path = get_stage_config("qwen3_tts_no_async_chunk.yaml") + + with OmniServer( + MODEL, + [ + "--stage-configs-path", + stage_config_path, + "--stage-init-timeout", + "120", + "--trust-remote-code", + "--enforce-eager", + "--disable-log-stats", + ], + ) as server: + yield server + + +class TestQwen3TTSNoAsyncChunk: + """E2E tests for Qwen3-TTS in non-async-chunk (full decode) mode.""" + + @pytest.mark.core_model + @pytest.mark.omni + @hardware_test(res={"cuda": "L4"}, num_cards=4) + def test_speech_english(self, omni_server_no_async_chunk) -> None: + """Test English TTS with non-async-chunk pipeline.""" + response = make_speech_request( + host=omni_server_no_async_chunk.host, + port=omni_server_no_async_chunk.port, + text="Hello, how are you?", + voice="vivian", + language="English", + ) + + assert response.status_code == 200, f"Request failed: {response.text}" + assert response.headers.get("content-type") == "audio/wav" + assert verify_wav_audio(response.content), "Response is not valid WAV audio" + assert len(response.content) > MIN_AUDIO_BYTES + + @pytest.mark.core_model + @pytest.mark.omni + @hardware_test(res={"cuda": "L4"}, num_cards=4) + def test_speech_chinese(self, omni_server_no_async_chunk) -> None: + """Test Chinese TTS with non-async-chunk pipeline.""" + response = make_speech_request( + host=omni_server_no_async_chunk.host, + port=omni_server_no_async_chunk.port, + text="你好,我是通义千问", + voice="vivian", + language="Chinese", + ) + + assert response.status_code == 200, f"Request failed: {response.text}" + assert response.headers.get("content-type") == "audio/wav" + assert verify_wav_audio(response.content), "Response is not valid WAV audio" + assert len(response.content) > MIN_AUDIO_BYTES diff --git a/vllm_omni/model_executor/stage_configs/qwen3_tts_no_async_chunk.yaml b/vllm_omni/model_executor/stage_configs/qwen3_tts_no_async_chunk.yaml new file mode 100644 index 00000000000..da22953bbac --- /dev/null +++ b/vllm_omni/model_executor/stage_configs/qwen3_tts_no_async_chunk.yaml @@ -0,0 +1,67 @@ +async_chunk: false +stage_args: + - stage_id: 0 + stage_type: llm + runtime: + devices: "0" + max_batch_size: 1 + engine_args: + model_stage: qwen3_tts + model_arch: Qwen3TTSTalkerForConditionalGeneration + hf_overrides: + architectures: [Qwen3TTSTalkerForConditionalGeneration] + worker_type: ar + scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler + enforce_eager: false + trust_remote_code: true + async_scheduling: false + enable_prefix_caching: false + engine_output_type: latent + gpu_memory_utilization: 0.3 + distributed_executor_backend: "mp" + max_num_batched_tokens: 512 + max_model_len: 4096 + default_sampling_params: + temperature: 0.9 + top_k: 50 + max_tokens: 4096 + seed: 42 + detokenize: false + repetition_penalty: 1.05 + stop_token_ids: [2150] + + - stage_id: 1 + stage_type: llm + runtime: + devices: "0" + max_batch_size: 1 + engine_args: + model_stage: code2wav + model_arch: Qwen3TTSCode2Wav + hf_overrides: + architectures: [Qwen3TTSCode2Wav] + worker_type: generation + scheduler_cls: vllm_omni.core.sched.omni_generation_scheduler.OmniGenerationScheduler + enforce_eager: true + trust_remote_code: true + async_scheduling: false + enable_prefix_caching: false + engine_output_type: audio + gpu_memory_utilization: 0.2 + distributed_executor_backend: "mp" + max_num_batched_tokens: 8192 + max_model_len: 32768 + engine_input_source: [0] + custom_process_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_tts.talker2code2wav + final_output: true + final_output_type: audio + tts_args: + max_instructions_length: 500 + default_sampling_params: + temperature: 0.0 + top_p: 1.0 + top_k: -1 + max_tokens: 65536 + seed: 42 + detokenize: true + repetition_penalty: 1.0 diff --git a/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py b/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py index 8c21052e9ee..a806d6b146e 100644 --- a/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py +++ b/vllm_omni/model_executor/stage_input_processors/qwen3_tts.py @@ -5,6 +5,37 @@ import torch +def talker2code2wav( + stage_list: list[Any], + engine_input_source: list[int], + prompt: Any = None, + requires_multimodal_data: bool = False, +) -> list[Any]: + """Non-async processor: wait for talker to finish, then pass all codes to code2wav at once.""" + from vllm_omni.inputs.data import OmniTokensPrompt + from vllm_omni.model_executor.stage_input_processors.qwen3_omni import _validate_stage_inputs + + talker_outputs = _validate_stage_inputs(stage_list, engine_input_source) + code2wav_inputs: list[OmniTokensPrompt] = [] + for talker_output in talker_outputs: + output = talker_output.outputs[0] + # audio_codes shape: [num_frames, Q] where Q=num_quantizers (16) + audio_codes = output.multimodal_output["audio_codes"].to(torch.long) + # Filter zero-padded frames (EOS/invalid steps), matching _extract_last_frame behavior + valid_mask = audio_codes.any(dim=1) + audio_codes = audio_codes[valid_mask] + # Code2Wav expects codebook-major flat: [Q*num_frames] + codec_codes = audio_codes.transpose(0, 1).cpu().reshape(-1).tolist() + code2wav_inputs.append( + OmniTokensPrompt( + prompt_token_ids=codec_codes, + multi_modal_data=None, + mm_processor_kwargs=None, + ) + ) + return code2wav_inputs + + def _extract_last_frame(pooling_output: dict[str, Any]) -> torch.Tensor | None: audio_codes = pooling_output.get("audio_codes") if not isinstance(audio_codes, torch.Tensor) or audio_codes.numel() == 0: