diff --git a/.buildkite/test-merge.yml b/.buildkite/test-merge.yml index 3a083f0b0bc..4d431ff8217 100644 --- a/.buildkite/test-merge.yml +++ b/.buildkite/test-merge.yml @@ -208,6 +208,50 @@ steps: volumes: - "/fsx/hf_cache:/fsx/hf_cache" + - label: "Qwen3-TTS CustomVoice E2E Test" + depends_on: upload-merge-pipeline + commands: + - | + timeout 20m bash -c ' + export VLLM_LOGGING_LEVEL=DEBUG + export VLLM_WORKER_MULTIPROC_METHOD=spawn + pytest -s -v tests/e2e/online_serving/test_qwen3_tts_customvoice.py -m "advanced_model" --run-level "advanced_model" && pytest -s -v tests/e2e/offline/test_qwen3_tts_customvoice.py + ' + agents: + queue: "gpu_1_queue" + plugins: + - docker#v5.2.0: + image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + always-pull: true + propagate-environment: true + shm-size: "8gb" + environment: + - "HF_HOME=/fsx/hf_cache" + volumes: + - "/fsx/hf_cache:/fsx/hf_cache" + + - label: "Qwen3-TTS Base E2E Test" + depends_on: upload-merge-pipeline + commands: + - | + timeout 20m bash -c ' + export VLLM_LOGGING_LEVEL=DEBUG + export VLLM_WORKER_MULTIPROC_METHOD=spawn + pytest -s -v tests/e2e/online_serving/test_qwen3_tts_base.py -m "advanced_model" --run-level "advanced_model" && pytest -s -v tests/e2e/offline/test_qwen3_tts_base.py + ' + agents: + queue: "gpu_1_queue" + plugins: + - docker#v5.2.0: + image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + always-pull: true + propagate-environment: true + shm-size: "8gb" + environment: + - "HF_HOME=/fsx/hf_cache" + volumes: + - "/fsx/hf_cache:/fsx/hf_cache" + - label: "Omni Model Test with H100" timeout_in_minutes: 30 depends_on: upload-merge-pipeline diff --git a/.buildkite/test-nightly.yml b/.buildkite/test-nightly.yml index 4a5837d502d..ffbd74ba380 100644 --- a/.buildkite/test-nightly.yml +++ b/.buildkite/test-nightly.yml @@ -55,7 +55,8 @@ steps: if: build.env("NIGHTLY") == "1" commands: - export VLLM_WORKER_MULTIPROC_METHOD=spawn - - pytest -s -v tests/examples/online_serving/test_qwen2_5_omni.py -m "advanced_model" --run-level "advanced_model" + - pytest -s -v examples/ -m "advanced_model and L4" --run-level "advanced_model" + - pytest -s -v e2e/online_serving/test_*_expansion.py -m "advanced_model and L4" --run-level "advanced_model" agents: queue: "gpu_4_queue" # g6.12xlarge instance on AWS, has 4 L4 GPU plugins: diff --git a/.buildkite/test-ready.yml b/.buildkite/test-ready.yml index af0d285442e..166dc51802e 100644 --- a/.buildkite/test-ready.yml +++ b/.buildkite/test-ready.yml @@ -260,13 +260,12 @@ steps: commands: - | timeout 20m bash -c ' - huggingface-cli download Qwen/Qwen3-TTS-12Hz-0.6B-CustomVoice export VLLM_LOGGING_LEVEL=DEBUG export VLLM_WORKER_MULTIPROC_METHOD=spawn - pytest -s -v tests/e2e/online_serving/test_qwen3_tts.py -k "not NoAsyncChunk" + pytest -s -v tests/e2e/online_serving/test_qwen3_tts_customvoice.py -m "core_model" --run-level "core_model" ' agents: - queue: "gpu_4_queue" + queue: "gpu_1_queue" plugins: - docker#v5.2.0: image: public.ecr.aws/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT diff --git a/pyproject.toml b/pyproject.toml index 3fd987d99e5..f4b8ecf021a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,7 @@ dev = [ "av", # for ComfyUI tests "openpyxl>=3.0.0", # for nightly CI "pyttsx3>=2.99", + "opencc>=1.2.0", "mistune>=3.2.0", # for example tests ] diff --git a/tests/conftest.py b/tests/conftest.py index 0d90f639d3d..96f6a4c9299 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,6 +4,7 @@ import math import os import random +import re os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" # Set CPU device for CI environments without GPU @@ -16,8 +17,10 @@ import socket import subprocess import sys +import tempfile import threading import time +import uuid from collections.abc import Generator from dataclasses import dataclass from io import BytesIO @@ -33,6 +36,7 @@ import yaml from openai import OpenAI, omit from PIL import Image +from transformers import pipeline from vllm import TextPrompt from vllm.distributed.parallel_state import cleanup_dist_env_and_memory from vllm.logger import init_logger @@ -48,6 +52,11 @@ PromptImageInput = list[Any] | Any | None PromptVideoInput = list[Any] | Any | None +_GENDER_PIPELINE = None + +# int16 mono PCM from /v1/audio/speech when response_format=pcm (Qwen3-TTS code2wav output rate). +_PCM_SPEECH_SAMPLE_RATE_HZ = 24_000 + class OmniServerParams(NamedTuple): model: str @@ -82,7 +91,6 @@ def assert_video_valid(frames: Path | np.ndarray, *, width: int, height: int, nu def assert_audio_valid(path: Path, *, sample_rate: int, channels: int, duration_s: float) -> None: """Assert the WAV has the expected sample rate, channel count, and duration.""" - assert path.exists(), f"Audio not found: {path}" info = sf.info(str(path)) assert info.samplerate == sample_rate, f"Expected sample_rate={sample_rate}, got {info.samplerate}" @@ -285,7 +293,6 @@ def generate_synthetic_audio( """ Generate TTS speech with pyttsx3 and return base64 string. """ - import tempfile import pyttsx3 import soundfile as sf @@ -493,8 +500,8 @@ def _enhance_speech(audio: np.ndarray) -> np.ndarray: # Return result base64_audio = base64.b64encode(audio_bytes).decode("utf-8") result["base64"] = base64_audio - if save_to_file and output_path: - result["file_path"] = output_path + # Always include file_path to avoid KeyError in callers. + result["file_path"] = output_path if save_to_file and output_path else None return result @@ -706,7 +713,7 @@ def generate_synthetic_image(width: int, height: int, save_to_file: bool = False def preprocess_text(text): - import re + import opencc word_to_num = { "zero": "0", @@ -728,6 +735,14 @@ def preprocess_text(text): text = re.sub(r"[^\w\s]", "", text) text = re.sub(r"\s+", " ", text) + cc = opencc.OpenCC("t2s") + text = cc.convert(text) + + # Special handling for spaces between Chinese characters: + # - Keep single spaces between English words/numbers + # - Remove spaces only when surrounded by Chinese characters on both sides to prevent incorrect word segmentation + text = re.sub(r"(?<=[\u4e00-\u9fff])\s+(?=[\u4e00-\u9fff])", "", text) + return text.lower().strip() @@ -739,6 +754,7 @@ def cosine_similarity_text(text1, text2, n: int = 3): text1 = preprocess_text(text1) text2 = preprocess_text(text2) + print(f"cosine similarity text1 is: {text1}, text2 is: {text2}") ngrams1 = [text1[i : i + n] for i in range(len(text1) - n + 1)] ngrams2 = [text2[i : i + n] for i in range(len(text2) - n + 1)] @@ -764,7 +780,7 @@ def convert_audio_to_text(audio_data): Convert base64 encoded audio data to text using speech recognition. """ audio_data = base64.b64decode(audio_data) - output_path = f"./test_{int(time.time())}" + output_path = f"./test_{int(time.time())}.wav" with open(output_path, "wb") as audio_file: audio_file.write(audio_data) @@ -773,11 +789,23 @@ def convert_audio_to_text(audio_data): return text +def _merge_base64_audio_to_segment(base64_list: list[str]): + """Merge a list of base64-encoded audio chunks into one pydub AudioSegment.""" + from pydub import AudioSegment + + merged = None + for b64 in base64_list: + raw = base64.b64decode(b64.split(",", 1)[-1]) + seg = AudioSegment.from_file(io.BytesIO(raw)) + merged = seg if merged is None else merged + seg + return merged + + def _whisper_transcribe_in_current_process(output_path: str) -> str: import whisper - device = "cuda" if torch.cuda.is_available() else "cpu" - model = whisper.load_model("small", device=device) + # Keep Whisper on CPU to avoid consuming GPU memory in tests. + model = whisper.load_model("small", device="cpu") try: text = model.transcribe( output_path, @@ -786,14 +814,8 @@ def _whisper_transcribe_in_current_process(output_path: str) -> str: condition_on_previous_text=False, )["text"] finally: - # Sync GPU so in-flight ops finish before we free the model; otherwise - # freed memory may not show up until those ops complete. - if torch is not None and torch.cuda.is_available(): - torch.cuda.synchronize() del model gc.collect() - if torch is not None and torch.cuda.is_available(): - torch.cuda.empty_cache() return text or "" @@ -807,22 +829,26 @@ def convert_audio_file_to_text(output_path: str) -> str: return future.result() +def convert_audio_bytes_to_text(raw_bytes: bytes) -> str: + """ + Write container audio bytes (WAV, etc.) to a temp WAV file suitable for Whisper/ffmpeg. + Normalizes with soundfile to PCM_16 WAV when possible to avoid codec issues. + """ + output_path = f"./test_{uuid.uuid4().hex}.wav" + data, samplerate = sf.read(io.BytesIO(raw_bytes)) + sf.write(output_path, data, samplerate, format="WAV", subtype="PCM_16") + text = convert_audio_file_to_text(output_path) + return text + + def merge_base64_and_convert_to_text(base64_list): """ Merge a list of base64 encoded audio data and convert to text. """ - from pydub import AudioSegment - - merged_audio = None - for base64_data in base64_list: - audio_data = base64.b64decode(base64_data.split(",", 1)[-1]) - seg = AudioSegment.from_file(io.BytesIO(audio_data)) - if merged_audio is None: - merged_audio = seg - else: - merged_audio += seg - output_path = f"./test_{int(time.time())}" + merged_audio = _merge_base64_audio_to_segment(base64_list) + output_path = f"./test_{uuid.uuid4().hex}.wav" merged_audio.export(output_path, format="wav") + print(f"audio data is saved: {output_path}") text = convert_audio_file_to_text(output_path) return text @@ -1003,8 +1029,7 @@ def delete_by_path(config_dict: dict, path: str) -> None: break if target_stage is None: - available_ids = [s.get("stage_id") for s in stage_args if "stage_id" in s] - raise KeyError(f"Stage ID {stage_id} not found, available: {available_ids}") + continue # Delete specified paths in this stage for path in delete_paths: @@ -1262,6 +1287,8 @@ class OmniResponse: text_content: str | None = None audio_data: list[str] | None = None audio_content: str | None = None + audio_format: str | None = None + audio_bytes: bytes | None = None similarity: float | None = None e2e_latency: float | None = None success: bool = False @@ -1279,6 +1306,132 @@ class DiffusionResponse: error_message: str | None = None +def _load_gender_pipeline(): + """ + Lazy-load a cached audio-classification pipeline for gender. + + We prefer the pipeline wrapper because it encapsulates processor/model loading + and avoids direct AutoProcessor.from_pretrained call sites in this file. + """ + global _GENDER_PIPELINE + if _GENDER_PIPELINE is not None: + return _GENDER_PIPELINE + + model_name = "7wolf/wav2vec2-base-gender-classification" + try: + # device=-1 forces CPU for pipeline. + _GENDER_PIPELINE = pipeline( + task="audio-classification", + model=model_name, + device=-1, + ) + return _GENDER_PIPELINE + except Exception as exc: # pragma: no cover - best-effort fallback + print(f"Warning: failed to create gender pipeline '{model_name}': {exc}") + _GENDER_PIPELINE = None + return None + + +def _estimate_voice_gender_from_audio(audio_bytes: bytes) -> str: + """ + Estimate voice gender from audio using a small pre-trained classification model. + + Uses a cached `audio-classification` pipeline to classify the clip. + Returns 'male' / 'female' when the model confidence is >= 0.9 and the label + maps to one of these; otherwise returns 'unknown'. If the model is unavailable + or inference fails, returns 'unknown' to keep tests stable. + """ + data, sr = sf.read(io.BytesIO(audio_bytes), dtype="float32", always_2d=True) + if data.size == 0: + raise ValueError("Empty audio") + mono = np.mean(data, axis=1) + + try: + target_sr = 16000 + if int(sr) != target_sr and mono.size > 1: + src_len = int(mono.shape[0]) + dst_len = max(1, int(round(src_len * float(target_sr) / float(sr)))) + src_idx = np.arange(src_len, dtype=np.float32) + dst_idx = np.linspace(0, src_len - 1, dst_len, dtype=np.float32) + mono = np.interp(dst_idx, src_idx, mono.astype(np.float32, copy=False)).astype(np.float32) + sr = target_sr + + clf = _load_gender_pipeline() + if clf is None: + print("gender model not available, returning 'unknown'") + return "unknown" + + # transformers pipeline returns a list of {label, score} (highest score first). + outputs = clf(mono, sampling_rate=sr) + if not outputs: + return "unknown" + + top = outputs[0] + label = str(top.get("label", "")).lower() + conf = float(top.get("score", 0.0)) + + if conf < 0.6: + gender = "unknown" + # Some models use non-English labels (e.g., Russian). Normalize to 'male'/'female'. + elif ("female" in label) or ("жен" in label): + gender = "female" + elif ("male" in label) or ("муж" in label): + gender = "male" + else: + gender = "unknown" + + print(f"gender classifier: label={label}, conf={conf:.3f}, gender={gender}") + return gender + except Exception as exc: # pragma: no cover - best-effort fallback + print(f"Warning: gender classification failed, returning 'unknown': {exc}") + return "unknown" + + +# Threshold aligned with _compute_pcm_hnr_db docstring (clean clone vs distorted). +_MIN_PCM_SPEECH_HNR_DB = 1.2 + + +def _compute_pcm_hnr_db(pcm_samples: np.ndarray, sr: int = _PCM_SPEECH_SAMPLE_RATE_HZ) -> float: + """Compute mean Harmonic-to-Noise Ratio (dB) for speech quality. + + Clean cloned speech has HNR > 1.2 dB; distorted speech (e.g. lost + ref_code decoder context) drops below 1.0 dB. + """ + frame_len = int(0.03 * sr) # 30ms frames + hop = frame_len // 2 + hnr_values: list[float] = [] + + for start in range(0, len(pcm_samples) - frame_len, hop): + frame = pcm_samples[start : start + frame_len].astype(np.float32, copy=False) + frame = frame - np.mean(frame) + if np.max(np.abs(frame)) < 0.01: + continue + ac = np.correlate(frame, frame, mode="full")[len(frame) - 1 :] + ac = ac / (ac[0] + 1e-10) + min_lag = int(sr / 400) + max_lag = min(int(sr / 80), len(ac)) + if min_lag >= max_lag: + continue + peak = float(np.max(ac[min_lag:max_lag])) + if 0 < peak < 1: + hnr_values.append(10 * np.log10(peak / (1 - peak + 1e-10))) + + return float(np.mean(hnr_values)) if hnr_values else 0.0 + + +def _assert_pcm_int16_speech_hnr(audio_bytes: bytes) -> None: + """Validate harmonic-to-noise ratio on raw int16 PCM from /v1/audio/speech.""" + assert audio_bytes is not None and len(audio_bytes) >= 2, "missing PCM bytes" + assert len(audio_bytes) % 2 == 0, "PCM byte length must be aligned to int16" + pcm_samples = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32) / 32768.0 + hnr = _compute_pcm_hnr_db(pcm_samples) + print(f"PCM speech HNR: {hnr:.2f} dB (threshold: {_MIN_PCM_SPEECH_HNR_DB} dB)") + assert hnr >= _MIN_PCM_SPEECH_HNR_DB, ( + f"Audio distortion detected: HNR={hnr:.2f} dB < {_MIN_PCM_SPEECH_HNR_DB} dB. " + "Voice clone decoder may be losing ref_code speaker context on later chunks." + ) + + def assert_omni_response(response: OmniResponse, request_config: dict[str, Any], run_level): """ Validate response results. @@ -1292,7 +1445,7 @@ def assert_omni_response(response: OmniResponse, request_config: dict[str, Any], assert response.success, "The request failed." e2e_latency = response.e2e_latency if e2e_latency is not None: - print(f"the avg e2e latency is: {e2e_latency}") + print(f"the e2e latency is: {e2e_latency}") modalities = request_config.get("modalities", ["text", "audio"]) @@ -1327,6 +1480,68 @@ def assert_omni_response(response: OmniResponse, request_config: dict[str, Any], print(f"similarity is: {response.similarity}") +def assert_audio_speech_response( + response: OmniResponse, + request_config: dict[str, Any], + run_level: str, +) -> None: + """ + Validate /v1/audio/speech response: success, optional format check, transcription similarity + and gender (non-PCM only for advanced_model), and int16 PCM HNR when response_format is pcm. + """ + assert response.success, "The request failed." + + req_fmt = request_config.get("response_format") + + if req_fmt == "pcm" and response.audio_bytes: + _assert_pcm_int16_speech_hnr(response.audio_bytes) + if response.audio_format: + assert "pcm" in response.audio_format.lower(), ( + f"Expected audio/pcm content-type, got {response.audio_format!r}" + ) + + elif req_fmt == "wav" and response.audio_format: + assert req_fmt in response.audio_format, ( + f"The response audio format {response.audio_format} don't match the request audio format {req_fmt}" + ) + + e2e_latency = response.e2e_latency + if e2e_latency is not None: + print(f"the avg e2e latency is: {e2e_latency}") + + if run_level == "advanced_model" and req_fmt != "pcm": + # Text–audio semantic similarity check (skipped for raw PCM: no Whisper transcript). + expected_text = request_config.get("input") + if expected_text: + transcript = (response.audio_content or "").strip() + print(f"audio content is: {transcript}") + print(f"input text is: {expected_text}") + similarity = cosine_similarity_text(transcript.lower(), expected_text.lower()) + print(f"Cosine similarity: {similarity:.3f}") + assert similarity > 0.9, ( + f"Transcript doesn't match input: similarity={similarity:.2f}, transcript='{transcript}'" + ) + + # Voice gender consistency check: + # When the estimator returns 'unknown', we treat it as inconclusive and do NOT fail the test. + voice = (request_config.get("voice") or "").lower() + if voice and response.audio_bytes: + estimated_gender = _estimate_voice_gender_from_audio(response.audio_bytes) + voice_gender_map = { + # adjust this mapping to your actual voice names + "serena": "female", + "eric": "male", + "clone": "female", + } + expected_gender = voice_gender_map.get(voice) + if expected_gender is not None: + print(f"Estimated voice gender from audio: {estimated_gender} (voice='{voice}')") + if estimated_gender != "unknown": + assert estimated_gender == expected_gender, ( + f"Voice '{voice}' is expected {expected_gender}, but estimated gender is '{estimated_gender}'" + ) + + def assert_diffusion_response(response: DiffusionResponse, request_config: dict[str, Any], run_level: str = None): """ Validate diffusion response results. @@ -1548,6 +1763,109 @@ def _process_diffusion_response(self, chat_completion) -> DiffusionResponse: return result + def _process_stream_audio_speech_response(self, response, *, response_format: str | None = None) -> OmniResponse: + """ + Process streaming /v1/audio/speech responses into an OmniResponse. + + This mirrors _process_stream_omni_response but operates on low-level + audio bytes and produces an OmniResponse with audio_content filled + from Whisper transcription. + """ + result = OmniResponse() + start_time = time.perf_counter() + + try: + # Aggregate all audio bytes from the streaming response. + data = bytearray() + + # Preferred OpenAI helper. + if hasattr(response, "iter_bytes") and callable(getattr(response, "iter_bytes")): + for chunk in response.iter_bytes(): + if chunk: + data.extend(chunk) + else: + # Generic iterable-of-bytes fallback (e.g., generator or list of chunks). + try: + iterator = iter(response) + except TypeError: + iterator = None + + if iterator is not None: + for chunk in iterator: + if not chunk: + continue + if isinstance(chunk, (bytes, bytearray)): + data.extend(chunk) + elif hasattr(chunk, "data"): + data.extend(chunk.data) # type: ignore[arg-type] + elif hasattr(chunk, "content"): + data.extend(chunk.content) # type: ignore[arg-type] + else: + raise TypeError(f"Unsupported stream chunk type: {type(chunk)}") + else: + raise TypeError(f"Unsupported audio speech streaming response type: {type(response)}") + + raw_bytes = bytes(data) + if response_format == "pcm": + transcript = None + else: + transcript = convert_audio_bytes_to_text(raw_bytes) + + # Populate OmniResponse. + result.audio_bytes = raw_bytes + result.audio_content = transcript + result.e2e_latency = time.perf_counter() - start_time + result.success = True + result.audio_format = getattr(response, "response", None) + if result.audio_format is not None: + result.audio_format = result.audio_format.headers.get("content-type", "") + + except Exception as e: + result.error_message = f"Audio speech stream processing error: {str(e)}" + print(f"Error: {result.error_message}") + + return result + + def _process_non_stream_audio_speech_response( + self, response, *, response_format: str | None = None + ) -> OmniResponse: + """ + Process non-streaming /v1/audio/speech responses into an OmniResponse. + + This mirrors _process_non_stream_omni_response but for the binary + audio payload returned by audio.speech.create. + """ + result = OmniResponse() + start_time = time.perf_counter() + + try: + # OpenAI non-streaming audio.speech.create returns HttpxBinaryResponseContent (.read() or .content) + if hasattr(response, "read") and callable(getattr(response, "read")): + raw_bytes = response.read() + elif hasattr(response, "content"): + raw_bytes = response.content # type: ignore[assignment] + else: + raise TypeError(f"Unsupported audio speech response type: {type(response)}") + + if response_format == "pcm": + transcript = None + else: + transcript = convert_audio_bytes_to_text(raw_bytes) + + result.audio_bytes = raw_bytes + result.audio_content = transcript + result.e2e_latency = time.perf_counter() - start_time + result.success = True + result.audio_format = getattr(response, "response", None) + if result.audio_format is not None: + result.audio_format = result.audio_format.headers.get("content-type", "") + + except Exception as e: + result.error_message = f"Audio speech non-stream processing error: {str(e)}" + print(f"Error: {result.error_message}") + + return result + def send_omni_request(self, request_config: dict[str, Any], request_num: int = 1) -> list[OmniResponse]: """ Send OpenAI requests. @@ -1582,32 +1900,135 @@ def send_omni_request(self, request_config: dict[str, Any], request_num: int = 1 responses.append(response) else: - # Send concurrent requests + # Send concurrent requests: run create + process in worker so e2e_latency includes full round-trip. + def _one_omni_request(): + start = time.perf_counter() + chat_completion = self.client.chat.completions.create( + model=request_config.get("model"), + messages=request_config.get("messages"), + modalities=modalities, + stream=stream, + ) + if stream: + response = self._process_stream_omni_response(chat_completion) + else: + response = self._process_non_stream_omni_response(chat_completion) + response.e2e_latency = time.perf_counter() - start + return response + with concurrent.futures.ThreadPoolExecutor(max_workers=request_num) as executor: - futures = [] + futures = [executor.submit(_one_omni_request) for _ in range(request_num)] + for future in concurrent.futures.as_completed(futures): + response = future.result() + assert_omni_response(response, request_config, run_level=self.run_level) + responses.append(response) - # Submit all request tasks - for _ in range(request_num): - future = executor.submit( - self.client.chat.completions.create, - model=request_config.get("model"), - messages=request_config.get("messages"), - modalities=modalities, - stream=stream, - ) - futures.append(future) + return responses - # Process completed tasks - for future in concurrent.futures.as_completed(futures): - chat_completion = future.result() + def send_audio_speech_request(self, request_config: dict[str, Any], request_num: int = 1) -> list[OmniResponse]: + """ + Call the /v1/audio/speech endpoint using the same configuration-dict + style as send_omni_request, but via the OpenAI Python client's + audio.speech APIs. + + Expected keys in request_config: + - model: model name/path (required) + - input: text to synthesize (required) + - response_format: audio format such as "wav" or "pcm" (optional) + - task_type, ref_text, ref_audio: TTS-specific extras (optional, passed via extra_body) + - timeout: request timeout in seconds (float, optional, default 120.0) + - stream: whether to use streaming API (bool, optional, default False) + """ + timeout = float(request_config.get("timeout", 120.0)) - if stream: - response = self._process_stream_omni_response(chat_completion) - else: - response = self._process_non_stream_omni_response(chat_completion) + model = request_config["model"] + text_input = request_config["input"] + stream = bool(request_config.get("stream", False)) + voice = request_config.get("voice", None) - assert_omni_response(response, request_config, run_level=self.run_level) - responses.append(response) + # Standard OpenAI param: use omit when not provided to keep default behavior. + response_format = request_config.get("response_format", omit) + + # Qwen3-TTS custom fields, forwarded via extra_body. + extra_body: dict[str, Any] = {} + # Keep this list aligned with vllm_omni.entrypoints.openai.protocol.audio params. + for key in ("task_type", "ref_text", "ref_audio", "language", "max_new_tokens"): + if key in request_config: + extra_body[key] = request_config[key] + + responses: list[OmniResponse] = [] + + speech_fmt: str | None = None if response_format is omit else str(response_format).lower() + + if request_num == 1: + if stream: + # Use streaming response helper. + with self.client.audio.speech.with_streaming_response.create( + model=model, + input=text_input, + response_format=response_format, + extra_body=extra_body or None, + timeout=timeout, + voice=voice, + ) as resp: + omni_resp = self._process_stream_audio_speech_response(resp, response_format=speech_fmt) + else: + # Non-streaming response. + resp = self.client.audio.speech.create( + model=model, + input=text_input, + response_format=response_format, + extra_body=extra_body or None, + timeout=timeout, + voice=voice, + ) + omni_resp = self._process_non_stream_audio_speech_response(resp, response_format=speech_fmt) + + assert_audio_speech_response(omni_resp, request_config, run_level=self.run_level) + responses.append(omni_resp) + return responses + else: + # request_num > 1: concurrent requests (use same params as single-request path) + + if stream: + + def _stream_task(): + with self.client.audio.speech.with_streaming_response.create( + model=model, + input=text_input, + response_format=response_format, + extra_body=extra_body or None, + timeout=timeout, + voice=voice, + ) as resp: + return self._process_stream_audio_speech_response(resp, response_format=speech_fmt) + + with concurrent.futures.ThreadPoolExecutor(max_workers=request_num) as executor: + futures = [executor.submit(_stream_task) for _ in range(request_num)] + for future in concurrent.futures.as_completed(futures): + omni_resp = future.result() + assert_audio_speech_response(omni_resp, request_config, run_level=self.run_level) + responses.append(omni_resp) + else: + with concurrent.futures.ThreadPoolExecutor(max_workers=request_num) as executor: + futures = [] + for _ in range(request_num): + future = executor.submit( + self.client.audio.speech.create, + model=model, + input=text_input, + response_format=response_format, + extra_body=extra_body or None, + timeout=timeout, + voice=voice, + ) + futures.append(future) + + for future in concurrent.futures.as_completed(futures): + resp = future.result() + omni_resp = self._process_non_stream_audio_speech_response(resp, response_format=speech_fmt) + assert_audio_speech_response(omni_resp, request_config, run_level=self.run_level) + responses.append(omni_resp) return responses @@ -1723,6 +2144,44 @@ def __init__( **kwargs, ) + def _estimate_prompt_len( + self, + additional_information: dict[str, Any], + model_name: str, + _cache: dict[str, Any] = {}, + ) -> int: + """Estimate prompt_token_ids placeholder length for the Talker stage. + + The AR Talker replaces all input embeddings via ``preprocess``, so the + placeholder values are irrelevant but the **length** must match the + embeddings that ``preprocess`` will produce. + """ + try: + from vllm_omni.model_executor.models.qwen3_tts.configuration_qwen3_tts import Qwen3TTSConfig + from vllm_omni.model_executor.models.qwen3_tts.qwen3_tts_talker import ( + Qwen3TTSTalkerForConditionalGeneration, + ) + + if model_name not in _cache: + from transformers import AutoTokenizer + + tok = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True, padding_side="left") + cfg = Qwen3TTSConfig.from_pretrained(model_name, trust_remote_code=True) + _cache[model_name] = (tok, getattr(cfg, "talker_config", None)) + + tok, tcfg = _cache[model_name] + task_type = (additional_information.get("task_type") or ["CustomVoice"])[0] + return Qwen3TTSTalkerForConditionalGeneration.estimate_prompt_len_from_additional_information( + additional_information=additional_information, + task_type=task_type, + tokenize_prompt=lambda t: tok(t, padding=False)["input_ids"], + codec_language_id=getattr(tcfg, "codec_language_id", None), + spk_is_dialect=getattr(tcfg, "spk_is_dialect", None), + ) + except Exception as exc: + logger.warning("Failed to estimate prompt length, using fallback 2048: %s", exc) + return 2048 + def get_default_sampling_params_list(self) -> list[OmniSamplingParams]: """ Get a list of default sampling parameters for all stages. @@ -1777,6 +2236,42 @@ def get_omni_inputs( if isinstance(prompts, str): prompts = [prompts] + # Qwen-TTS: follow examples/offline_inference/qwen3_tts/end2end.py style. + # Stage 0 expects token placeholders + additional_information (text/speaker/task_type/...), + # and Talker replaces embeddings in preprocess based on additional_information only. + is_tts_model = "Qwen3-TTS" in self.model_name or "qwen3_tts" in self.model_name.lower() + if is_tts_model and modalities == ["audio"]: + tts_kw = mm_processor_kwargs or {} + task_type = tts_kw.get("task_type", "CustomVoice") + speaker = tts_kw.get("speaker", "Vivian") + language = tts_kw.get("language", "Auto") + max_new_tokens = int(tts_kw.get("max_new_tokens", 2048)) + ref_audio = tts_kw.get("ref_audio", None) + ref_text = tts_kw.get("ref_text", None) + + omni_inputs: list[TextPrompt] = [] + for prompt_text in prompts: + text_str = str(prompt_text).strip() or " " + additional_information: dict[str, Any] = { + "task_type": [task_type], + "text": [text_str], + "language": [language], + "speaker": [speaker], + "max_new_tokens": [max_new_tokens], + } + if ref_audio is not None: + additional_information["ref_audio"] = [ref_audio] + if ref_text is not None: + additional_information["ref_text"] = [ref_text] + # Use official helper to get correct placeholder length + plen = self._estimate_prompt_len(additional_information, self.model_name) + input_dict: TextPrompt = { + "prompt_token_ids": [0] * plen, + "additional_information": additional_information, + } + omni_inputs.append(input_dict) + return omni_inputs + def _normalize_mm_input(mm_input, num_prompts): if mm_input is None: return [None] * num_prompts @@ -2044,9 +2539,67 @@ def send_request(self, request_config: dict[str, Any] | None = None) -> OmniResp prompts=prompts, videos=videos, images=images, audios=audios, modalities=modalities ) response = self._process_output(outputs) - assert_omni_response(response, request_config, run_level="L2") + assert_omni_response(response, request_config, run_level="core_model") return response + def send_audio_speech_request( + self, + request_config: dict[str, Any], + ) -> OmniResponse: + """ + Offline TTS: text -> audio via generate_multimodal, then validate with assert_audio_speech_response. + + request_config must contain: + - 'input' or 'prompts': text to synthesize. + Optional keys: + - 'voice' -> speaker (CustomVoice) + - 'task_type' -> task_type in additional_information (default: "CustomVoice") + - 'language' -> language in additional_information (default: "Auto") + - 'max_new_tokens' -> max_new_tokens in additional_information (default: 2048) + - 'response_format' -> desired audio format (used only for assertion) + """ + input_text = request_config.get("input") or request_config.get("prompts") + if input_text is None: + raise ValueError("request_config must contain 'input' or 'prompts' for TTS") + if isinstance(input_text, list): + input_text = input_text[0] if input_text else "" + + # Build TTS-specific kwargs passed through to get_omni_inputs for Qwen3-TTS, + # matching examples/offline_inference/qwen3_tts/end2end.py. + mm_processor_kwargs: dict[str, Any] = {} + if "voice" in request_config: + mm_processor_kwargs["speaker"] = request_config["voice"] + if "task_type" in request_config: + mm_processor_kwargs["task_type"] = request_config["task_type"] + if "ref_audio" in request_config: + mm_processor_kwargs["ref_audio"] = request_config["ref_audio"] + if "ref_text" in request_config: + mm_processor_kwargs["ref_text"] = request_config["ref_text"] + if "language" in request_config: + mm_processor_kwargs["language"] = request_config["language"] + if "max_new_tokens" in request_config: + mm_processor_kwargs["max_new_tokens"] = request_config["max_new_tokens"] + + outputs = self.runner.generate_multimodal( + prompts=input_text, + modalities=["audio"], + mm_processor_kwargs=mm_processor_kwargs or None, + ) + audio_tensor = None + for stage_out in outputs: + if getattr(stage_out, "final_output_type", None) == "audio": + audio_tensor = stage_out.request_output[0].outputs[0].multimodal_output["audio"] + break + if audio_tensor is None: + result = OmniResponse(success=False, error_message="No audio output from pipeline") + assert result.success, result.error_message + return result + + # Keep the offline path lightweight: downstream validation can be done elsewhere if needed. + result = OmniResponse(success=True) + assert_audio_speech_response(result, request_config, run_level="core_model") + return result + def start_profile( self, profile_prefix: str | None = None, diff --git a/tests/e2e/offline_inference/test_qwen3_tts_base.py b/tests/e2e/offline_inference/test_qwen3_tts_base.py new file mode 100644 index 00000000000..6ce1d345105 --- /dev/null +++ b/tests/e2e/offline_inference/test_qwen3_tts_base.py @@ -0,0 +1,84 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +""" +E2E offline tests for Qwen3-TTS Base model with text input and audio output. + +Async_chunk disable, cuda_graph disabled (no_async_chunk stage config). +CUDA graph is disabled by setting engine_args.enforce_eager=true via modify_stage_config(). +Same structure as test_qwen3_omni (models, stage_configs, test_params, parametrize omni_runner). +""" + +import os + +os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" +os.environ["VLLM_TEST_CLEAN_GPU_MEMORY"] = "0" + +from pathlib import Path + +import pytest + +from tests.conftest import modify_stage_config +from tests.utils import hardware_test + +MODEL = "Qwen/Qwen3-TTS-12Hz-0.6B-Base" +REF_AUDIO_URL = "https://qianwen-res.oss-cn-beijing.aliyuncs.com/Qwen3-TTS-Repo/clone_2.wav" +REF_TEXT = "Okay. Yeah. I resent you. I love you. I respect you. But you know what? You blew it! And thanks to you." + + +def get_cuda_graph_config(): + path = modify_stage_config( + get_stage_config(), + updates={ + "stage_args": { + 0: { + "engine_args.enforce_eager": "true", + }, + 1: {"engine_args.enforce_eager": "true"}, + }, + }, + ) + return path + + +def get_stage_config(name: str = "qwen3_tts_no_async_chunk.yaml"): + """Get the no_async_chunk stage config path (async_chunk disable, cuda_graph disabled).""" + return str(Path(__file__).parent.parent.parent.parent / "vllm_omni" / "model_executor" / "stage_configs" / name) + + +# Same structure as test_qwen3_omni: models, stage_configs, test_params +tts_server_params = [ + pytest.param( + (MODEL, get_cuda_graph_config()), + id="no_cuda_graph", + ) +] + + +def get_prompt(): + """Text prompt for text-to-audio (same role as get_question in test_qwen3_omni).""" + return "Hello, this is a test for text to audio." + + +@pytest.mark.skip(reason="Known issue(2030): qwen3_tts_no_async_chunk path temporarily disabled.") +@pytest.mark.advanced_model +@pytest.mark.omni +@hardware_test(res={"cuda": "L4"}, num_cards=1) +@pytest.mark.parametrize("omni_runner", tts_server_params, indirect=True) +def test_text_to_audio_001(omni_runner, omni_runner_handler) -> None: + """ + Test text input processing and audio output via offline Omni runner. + Deploy Setting: qwen3_tts_no_async_chunk.yaml + enforce_eager=true + Input Modal: text + Output Modal: audio + Input Setting: stream=False + Extra Setting: task_type=Base, voice=clone, ref_audio/ref_text provided + Datasets: few requests + """ + request_config = { + "input": get_prompt(), + "task_type": "Base", + "voice": "clone", + "ref_audio": REF_AUDIO_URL, + "ref_text": REF_TEXT, + } + omni_runner_handler.send_audio_speech_request(request_config) diff --git a/tests/e2e/offline_inference/test_qwen3_tts_customvoice.py b/tests/e2e/offline_inference/test_qwen3_tts_customvoice.py new file mode 100644 index 00000000000..67d72df908c --- /dev/null +++ b/tests/e2e/offline_inference/test_qwen3_tts_customvoice.py @@ -0,0 +1,74 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +""" +E2E offline tests for Qwen3-TTS CustomVoice model with text input and audio output. + +Async_chunk disable, cuda_graph disabled (no_async_chunk stage config). +CUDA graph is disabled by setting engine_args.enforce_eager=true via modify_stage_config(). +Same structure as test_qwen3_omni (models, stage_configs, test_params, parametrize omni_runner). +""" + +import os + +os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" +os.environ["VLLM_TEST_CLEAN_GPU_MEMORY"] = "0" + +from pathlib import Path + +import pytest + +from tests.conftest import modify_stage_config +from tests.utils import hardware_test + +MODEL = "Qwen/Qwen3-TTS-12Hz-1.7B-CustomVoice" + + +def get_cuda_graph_config(): + path = modify_stage_config( + get_stage_config(), + updates={ + "stage_args": { + 0: { + "engine_args.enforce_eager": "true", + }, + 1: {"engine_args.enforce_eager": "true"}, + }, + }, + ) + return path + + +def get_stage_config(name: str = "qwen3_tts_no_async_chunk.yaml"): + """Get the no_async_chunk stage config path (async_chunk disable, cuda_graph disabled).""" + return str(Path(__file__).parent.parent.parent.parent / "vllm_omni" / "model_executor" / "stage_configs" / name) + + +# Same structure as test_qwen3_omni: models, stage_configs, test_params +tts_server_params = [ + pytest.param( + (MODEL, get_cuda_graph_config()), + id="no_cuda_graph", + ) +] + + +def get_prompt(): + """Text prompt for text-to-audio (same role as get_question in test_qwen3_omni).""" + return "Hello, this is a test for text to audio." + + +@pytest.mark.advanced_model +@pytest.mark.omni +@hardware_test(res={"cuda": "L4"}, num_cards=1) +@pytest.mark.parametrize("omni_runner", tts_server_params, indirect=True) +def test_text_to_audio_001(omni_runner, omni_runner_handler) -> None: + """ + Test text input processing and audio output via offline Omni runner. + Deploy Setting: qwen3_tts_no_async_chunk.yaml + enforce_eager=true + Input Modal: text + Output Modal: audio + Input Setting: stream=False + Datasets: few requests + """ + request_config = {"input": get_prompt(), "voice": "vivian"} + omni_runner_handler.send_audio_speech_request(request_config) diff --git a/tests/e2e/online_serving/test_qwen3_tts.py b/tests/e2e/online_serving/test_qwen3_tts.py deleted file mode 100644 index fd0ef766283..00000000000 --- a/tests/e2e/online_serving/test_qwen3_tts.py +++ /dev/null @@ -1,285 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project -""" -E2E Online tests for Qwen3-TTS model with text input and audio output. - -These tests verify the /v1/audio/speech endpoint works correctly with -actual model inference, not mocks. -""" - -import os - -os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" -os.environ["VLLM_TEST_CLEAN_GPU_MEMORY"] = "0" - -from pathlib import Path - -import httpx -import pytest - -from tests.conftest import OmniServer -from tests.utils import hardware_test - -MODEL = "Qwen/Qwen3-TTS-12Hz-0.6B-CustomVoice" - - -def get_stage_config(name: str = "qwen3_tts.yaml"): - """Get the stage config path for Qwen3-TTS.""" - return str(Path(__file__).parent.parent.parent.parent / "vllm_omni" / "model_executor" / "stage_configs" / name) - - -@pytest.fixture(scope="class") -def omni_server(): - """Start vLLM-Omni server with CustomVoice model.""" - stage_config_path = get_stage_config() - - print(f"Starting OmniServer with model: {MODEL}") - - with OmniServer( - MODEL, - [ - "--stage-configs-path", - stage_config_path, - "--stage-init-timeout", - "120", - "--trust-remote-code", - "--enforce-eager", - "--disable-log-stats", - ], - ) as server: - print("OmniServer started successfully") - yield server - print("OmniServer stopping...") - - print("OmniServer stopped") - - -def make_speech_request( - host: str, - port: int, - text: str, - voice: str = "vivian", - language: str = "English", - task_type: str | None = None, - instructions: str | None = None, - timeout: float = 120.0, -) -> httpx.Response: - """Make a request to the /v1/audio/speech endpoint.""" - url = f"http://{host}:{port}/v1/audio/speech" - payload = { - "input": text, - "voice": voice, - "language": language, - } - if task_type: - payload["task_type"] = task_type - if instructions: - payload["instructions"] = instructions - - with httpx.Client(timeout=timeout) as client: - return client.post(url, json=payload) - - -def verify_wav_audio(content: bytes) -> bool: - """Verify that content is valid WAV audio data.""" - # WAV files start with "RIFF" header - if len(content) < 44: # Minimum WAV header size - return False - return content[:4] == b"RIFF" and content[8:12] == b"WAVE" - - -# Minimum expected audio size for a short sentence (~1 second of 24kHz 16-bit mono WAV) -MIN_AUDIO_BYTES = 10000 - - -class TestQwen3TTSCustomVoice: - """E2E tests for Qwen3-TTS CustomVoice model.""" - - @pytest.mark.core_model - @pytest.mark.omni - @hardware_test(res={"cuda": "L4"}, num_cards=4) - def test_speech_english_basic(self, omni_server) -> None: - """Test basic English TTS generation.""" - response = make_speech_request( - host=omni_server.host, - port=omni_server.port, - text="Hello, how are you?", - voice="vivian", - language="English", - ) - - assert response.status_code == 200, f"Request failed: {response.text}" - assert response.headers.get("content-type") == "audio/wav" - assert verify_wav_audio(response.content), "Response is not valid WAV audio" - assert len(response.content) > MIN_AUDIO_BYTES, ( - f"Audio content too small ({len(response.content)} bytes), expected at least {MIN_AUDIO_BYTES} bytes" - ) - - @pytest.mark.core_model - @pytest.mark.omni - @hardware_test(res={"cuda": "L4"}, num_cards=4) - def test_speech_chinese_basic(self, omni_server) -> None: - """Test basic Chinese TTS generation.""" - response = make_speech_request( - host=omni_server.host, - port=omni_server.port, - text="你好,我是通义千问", - voice="vivian", - language="Chinese", - ) - - assert response.status_code == 200, f"Request failed: {response.text}" - assert response.headers.get("content-type") == "audio/wav" - assert verify_wav_audio(response.content), "Response is not valid WAV audio" - assert len(response.content) > MIN_AUDIO_BYTES, ( - f"Audio content too small ({len(response.content)} bytes), expected at least {MIN_AUDIO_BYTES} bytes" - ) - - @pytest.mark.core_model - @pytest.mark.omni - @hardware_test(res={"cuda": "L4"}, num_cards=4) - def test_speech_different_voices(self, omni_server) -> None: - """Test TTS with different voice options.""" - voices = ["vivian", "ryan"] - for voice in voices: - response = make_speech_request( - host=omni_server.host, - port=omni_server.port, - text="Testing voice selection.", - voice=voice, - language="English", - ) - - assert response.status_code == 200, f"Request failed for voice {voice}: {response.text}" - assert verify_wav_audio(response.content), f"Invalid WAV for voice {voice}" - - @pytest.mark.core_model - @pytest.mark.omni - @hardware_test(res={"cuda": "L4"}, num_cards=4) - def test_speech_binary_response_not_utf8_error(self, omni_server) -> None: - """ - Regression test: Verify binary audio is returned, not UTF-8 error. - - This test ensures the multimodal_output property correctly retrieves - audio from completion outputs, preventing the "TTS model did not - produce audio output" error. - """ - response = make_speech_request( - host=omni_server.host, - port=omni_server.port, - text="This should return binary audio, not a JSON error.", - voice="vivian", - language="English", - ) - - # Should NOT be a JSON error response - assert response.status_code == 200, f"Request failed: {response.text}" - - # Verify it's binary audio, not JSON - try: - # If this succeeds and starts with {"error", it's a bug - text = response.content.decode("utf-8") - assert not text.startswith('{"error"'), f"Got error response instead of audio: {text}" - except UnicodeDecodeError: - # This is expected - binary audio can't be decoded as UTF-8 - pass - - assert verify_wav_audio(response.content), "Response is not valid WAV audio" - - -class TestQwen3TTSAPIEndpoints: - """Test API endpoint functionality.""" - - @pytest.mark.core_model - @pytest.mark.omni - @hardware_test(res={"cuda": "L4"}, num_cards=4) - def test_list_voices_endpoint(self, omni_server) -> None: - """Test the /v1/audio/voices endpoint returns available voices.""" - url = f"http://{omni_server.host}:{omni_server.port}/v1/audio/voices" - - with httpx.Client(timeout=30.0) as client: - response = client.get(url) - - assert response.status_code == 200 - data = response.json() - assert "voices" in data - assert isinstance(data["voices"], list) - assert len(data["voices"]) > 0 - # Check some expected voices are present - voices_lower = [v.lower() for v in data["voices"]] - assert "vivian" in voices_lower or "ryan" in voices_lower - - @pytest.mark.core_model - @pytest.mark.omni - @hardware_test(res={"cuda": "L4"}, num_cards=4) - def test_models_endpoint(self, omni_server) -> None: - """Test the /v1/models endpoint returns loaded model.""" - url = f"http://{omni_server.host}:{omni_server.port}/v1/models" - - with httpx.Client(timeout=30.0) as client: - response = client.get(url) - - assert response.status_code == 200 - data = response.json() - assert "data" in data - assert len(data["data"]) > 0 - - -@pytest.fixture(scope="class") -def omni_server_no_async_chunk(): - """Start vLLM-Omni server with non-async-chunk config.""" - stage_config_path = get_stage_config("qwen3_tts_no_async_chunk.yaml") - - with OmniServer( - MODEL, - [ - "--stage-configs-path", - stage_config_path, - "--stage-init-timeout", - "120", - "--trust-remote-code", - "--enforce-eager", - "--disable-log-stats", - ], - ) as server: - yield server - - -class TestQwen3TTSNoAsyncChunk: - """E2E tests for Qwen3-TTS in non-async-chunk (full decode) mode.""" - - @pytest.mark.core_model - @pytest.mark.omni - @hardware_test(res={"cuda": "L4"}, num_cards=4) - def test_speech_english(self, omni_server_no_async_chunk) -> None: - """Test English TTS with non-async-chunk pipeline.""" - response = make_speech_request( - host=omni_server_no_async_chunk.host, - port=omni_server_no_async_chunk.port, - text="Hello, how are you?", - voice="vivian", - language="English", - ) - - assert response.status_code == 200, f"Request failed: {response.text}" - assert response.headers.get("content-type") == "audio/wav" - assert verify_wav_audio(response.content), "Response is not valid WAV audio" - assert len(response.content) > MIN_AUDIO_BYTES - - @pytest.mark.core_model - @pytest.mark.omni - @hardware_test(res={"cuda": "L4"}, num_cards=4) - def test_speech_chinese(self, omni_server_no_async_chunk) -> None: - """Test Chinese TTS with non-async-chunk pipeline.""" - response = make_speech_request( - host=omni_server_no_async_chunk.host, - port=omni_server_no_async_chunk.port, - text="你好,我是通义千问", - voice="vivian", - language="Chinese", - ) - - assert response.status_code == 200, f"Request failed: {response.text}" - assert response.headers.get("content-type") == "audio/wav" - assert verify_wav_audio(response.content), "Response is not valid WAV audio" - assert len(response.content) > MIN_AUDIO_BYTES diff --git a/tests/e2e/online_serving/test_qwen3_tts_base.py b/tests/e2e/online_serving/test_qwen3_tts_base.py index c09ee592caa..002f9d99724 100644 --- a/tests/e2e/online_serving/test_qwen3_tts_base.py +++ b/tests/e2e/online_serving/test_qwen3_tts_base.py @@ -1,10 +1,10 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project """ -E2E tests for Qwen3-TTS Base voice-clone model. +E2E Online tests for Qwen3-TTS model with text input and audio output. -Regression test for #1663: speech tokenizer loaded in bfloat16 produced -all-silence PCM due to NaN overflow in SnakeBeta activation. +These tests verify the /v1/audio/speech endpoint works correctly with +actual model inference, not mocks. """ import os @@ -12,212 +12,99 @@ os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" os.environ["VLLM_TEST_CLEAN_GPU_MEMORY"] = "0" -import struct -import tempfile from pathlib import Path -import httpx -import numpy as np import pytest -from tests.conftest import ( - OmniServer, - convert_audio_file_to_text, - cosine_similarity_text, -) +from tests.conftest import OmniServerParams from tests.utils import hardware_test -MODEL = "Qwen/Qwen3-TTS-12Hz-1.7B-Base" +MODEL = "Qwen/Qwen3-TTS-12Hz-0.6B-Base" -# Official Qwen3-TTS reference audio/text from examples/offline_inference/qwen3_tts/end2end.py REF_AUDIO_URL = "https://qianwen-res.oss-cn-beijing.aliyuncs.com/Qwen3-TTS-Repo/clone_2.wav" REF_TEXT = "Okay. Yeah. I resent you. I love you. I respect you. But you know what? You blew it! And thanks to you." -SYN_TEXT = "Good one. Okay, fine, I'm just gonna leave this sock monkey here. Goodbye." -def get_stage_config(): - return str( - Path(__file__).parent.parent.parent.parent / "vllm_omni" / "model_executor" / "stage_configs" / "qwen3_tts.yaml" - ) +def get_stage_config(name: str = "qwen3_tts.yaml"): + """Get the stage config path from vllm_omni model_executor stage_configs.""" + return str(Path(__file__).parent.parent.parent.parent / "vllm_omni" / "model_executor" / "stage_configs" / name) -@pytest.fixture(scope="module") -def omni_server(): - """Start vLLM-Omni server with Base voice-clone model.""" - stage_config_path = get_stage_config() - - with OmniServer( - MODEL, - [ - "--stage-configs-path", - stage_config_path, - "--stage-init-timeout", - "120", - "--trust-remote-code", - "--enforce-eager", - "--disable-log-stats", - ], - ) as server: - yield server - - -def make_base_speech_request( - host: str, - port: int, - text: str = SYN_TEXT, - ref_text: str = REF_TEXT, - ref_audio: str = REF_AUDIO_URL, - response_format: str = "wav", - timeout: float = 120.0, -) -> httpx.Response: - url = f"http://{host}:{port}/v1/audio/speech" - payload = { - "model": MODEL, - "input": text, - "task_type": "Base", - "ref_text": ref_text, - "ref_audio": ref_audio, - "response_format": response_format, +def get_prompt(prompt_type="text"): + """Text prompt for text-to-audio tests (same as test_qwen3_omni - beijing test case).""" + prompts = { + "text": "The weather is nice today, perfect for a walk in the park.", } - with httpx.Client(timeout=timeout) as client: - return client.post(url, json=payload) + return prompts.get(prompt_type, prompts["text"]) -def verify_wav_audio(content: bytes) -> bool: - if len(content) < 44: - return False - return content[:4] == b"RIFF" and content[8:12] == b"WAVE" +def get_max_batch_size(size_type="few"): + """Batch size for concurrent requests (same as test_qwen3_omni).""" + batch_sizes = {"few": 5, "medium": 100, "large": 256} + return batch_sizes.get(size_type, 5) -def assert_not_silence(pcm_bytes: bytes): - """Assert PCM16 samples are not all identical (e.g. all-silence).""" - samples = struct.unpack(f"<{len(pcm_bytes) // 2}h", pcm_bytes) - unique = set(samples) - assert len(unique) > 1, ( - f"All-silence detected: {len(samples)} samples, unique values: {unique}. " - "See https://github.com/vllm-project/vllm-omni/issues/1663" +tts_server_params = [ + pytest.param( + OmniServerParams( + model=MODEL, + stage_config_path=get_stage_config("qwen3_tts.yaml"), + server_args=["--trust-remote-code", "--disable-log-stats"], + ), + id="async_chunk", ) +] -MIN_AUDIO_BYTES = 10000 -MIN_HNR_DB = 1.2 # Clean voice clone > 1.2 dB; distorted < 1.0 dB +@pytest.mark.advanced_model +@pytest.mark.core_model +@pytest.mark.omni +@hardware_test(res={"cuda": "L4"}, num_cards=1) +@pytest.mark.parametrize("omni_server", tts_server_params, indirect=True) +def test_text_to_audio_001(omni_server, openai_client) -> None: + """ + Test text input processing and audio output via OpenAI API. + Deploy Setting: default yaml + Input Modal: text + Output Modal: audio + Input Setting: stream=False + Datasets: few requests + """ + request_config = { + "model": omni_server.model, + "input": get_prompt(), + "stream": False, + "response_format": "wav", + "task_type": "Base", + "voice": "clone", + "ref_audio": REF_AUDIO_URL, + "ref_text": REF_TEXT, + } + openai_client.send_audio_speech_request(request_config, request_num=get_max_batch_size("few")) -def compute_hnr_db(pcm_samples: np.ndarray, sr: int = 24000) -> float: - """Compute mean Harmonic-to-Noise Ratio (dB) for speech quality. - Clean cloned speech has HNR > 1.2 dB; distorted speech (e.g. lost - ref_code decoder context) drops below 1.0 dB. +@pytest.mark.advanced_model +@pytest.mark.omni +@hardware_test(res={"cuda": "L4"}, num_cards=1) +@pytest.mark.parametrize("omni_server", tts_server_params, indirect=True) +def test_text_to_audio_002(omni_server, openai_client) -> None: + """ + Test text input processing and audio output via OpenAI API. + Deploy Setting: default yaml + Input Modal: text + Output Modal: audio + Input Setting: stream=True + Datasets: single request """ - frame_len = int(0.03 * sr) # 30ms frames - hop = frame_len // 2 - hnr_values: list[float] = [] - - for start in range(0, len(pcm_samples) - frame_len, hop): - frame = pcm_samples[start : start + frame_len] - if np.max(np.abs(frame)) < 0.01: - continue - ac = np.correlate(frame, frame, mode="full")[len(frame) - 1 :] - ac = ac / (ac[0] + 1e-10) - min_lag = int(sr / 400) - max_lag = min(int(sr / 80), len(ac)) - if min_lag >= max_lag: - continue - peak = float(np.max(ac[min_lag:max_lag])) - if 0 < peak < 1: - hnr_values.append(10 * np.log10(peak / (1 - peak + 1e-10))) - - return float(np.mean(hnr_values)) if hnr_values else 0.0 - - -class TestQwen3TTSBaseVoiceClone: - """Regression tests for Base voice-clone (fix #1663).""" - - @pytest.mark.core_model - @pytest.mark.omni - @hardware_test(res={"cuda": "L4"}, num_cards=4) - def test_base_voice_clone_not_silence(self, omni_server) -> None: - """PCM output must contain real audio, not all-silence.""" - response = make_base_speech_request( - host=omni_server.host, - port=omni_server.port, - response_format="pcm", - ) - - assert response.status_code == 200, f"Request failed: {response.text}" - assert len(response.content) > MIN_AUDIO_BYTES, f"Audio too small: {len(response.content)} bytes" - assert_not_silence(response.content) - - @pytest.mark.core_model - @pytest.mark.omni - @hardware_test(res={"cuda": "L4"}, num_cards=4) - def test_base_voice_clone_whisper_transcription(self, omni_server) -> None: - """Whisper must transcribe the output as intelligible speech.""" - response = make_base_speech_request( - host=omni_server.host, - port=omni_server.port, - response_format="wav", - ) - - assert response.status_code == 200, f"Request failed: {response.text}" - assert verify_wav_audio(response.content), "Response is not valid WAV" - assert len(response.content) > MIN_AUDIO_BYTES - - with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f: - f.write(response.content) - wav_path = f.name - - try: - transcript = convert_audio_file_to_text(wav_path) - print(f"Whisper transcript: {transcript}") - assert len(transcript.strip()) > 0, "Whisper returned empty transcript — audio is likely silence" - similarity = cosine_similarity_text(transcript.lower(), SYN_TEXT.lower()) - print(f"Cosine similarity: {similarity:.3f}") - assert similarity > 0.9, ( - f"Transcript doesn't match input: similarity={similarity:.2f}, transcript='{transcript}'" - ) - finally: - os.unlink(wav_path) - - @pytest.mark.core_model - @pytest.mark.omni - @hardware_test(res={"cuda": "L4"}, num_cards=4) - def test_base_voice_clone_wav_format(self, omni_server) -> None: - """WAV response must have valid headers and sufficient size.""" - response = make_base_speech_request( - host=omni_server.host, - port=omni_server.port, - response_format="wav", - ) - - assert response.status_code == 200, f"Request failed: {response.text}" - assert response.headers.get("content-type") == "audio/wav" - assert verify_wav_audio(response.content), "Response is not valid WAV" - assert len(response.content) > MIN_AUDIO_BYTES - - @pytest.mark.core_model - @pytest.mark.omni - @hardware_test(res={"cuda": "L4"}, num_cards=4) - def test_base_voice_clone_no_distortion(self, omni_server) -> None: - """Audio must not be distorted (regression for ref_code context loss). - - When the decoder loses ref_code speaker context on later streaming - chunks, HNR drops below 1.0 dB. Clean voice clone should be > 1.2 dB. - See https://github.com/vllm-project/vllm-omni/issues/1944 - """ - response = make_base_speech_request( - host=omni_server.host, - port=omni_server.port, - response_format="pcm", - ) - - assert response.status_code == 200, f"Request failed: {response.text}" - assert len(response.content) > MIN_AUDIO_BYTES - - pcm_samples = np.frombuffer(response.content, dtype=np.int16).astype(np.float32) / 32768.0 - hnr = compute_hnr_db(pcm_samples) - print(f"Voice clone HNR: {hnr:.2f} dB (threshold: {MIN_HNR_DB} dB)") - assert hnr >= MIN_HNR_DB, ( - f"Audio distortion detected: HNR={hnr:.2f} dB < {MIN_HNR_DB} dB. " - "Voice clone decoder may be losing ref_code speaker context on later chunks." - ) + request_config = { + "model": omni_server.model, + "input": get_prompt(), + "stream": True, + "response_format": "wav", + "task_type": "Base", + "voice": "clone", + "ref_audio": REF_AUDIO_URL, + "ref_text": REF_TEXT, + } + openai_client.send_audio_speech_request(request_config) diff --git a/tests/e2e/online_serving/test_qwen3_tts_base_expansion.py b/tests/e2e/online_serving/test_qwen3_tts_base_expansion.py new file mode 100644 index 00000000000..3c33485e4f4 --- /dev/null +++ b/tests/e2e/online_serving/test_qwen3_tts_base_expansion.py @@ -0,0 +1,119 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +""" +E2E Online tests for Qwen3-TTS model with text input and audio output. + +These tests verify the /v1/audio/speech endpoint works correctly with +actual model inference, not mocks. +""" + +import os + +os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" +os.environ["VLLM_TEST_CLEAN_GPU_MEMORY"] = "0" + +from pathlib import Path + +import pytest + +from tests.conftest import OmniServerParams +from tests.utils import hardware_test + +MODEL = "Qwen/Qwen3-TTS-12Hz-0.6B-Base" + +REF_AUDIO_URL = "https://qianwen-res.oss-cn-beijing.aliyuncs.com/Qwen3-TTS-Repo/clone_2.wav" +REF_TEXT = "Okay. Yeah. I resent you. I love you. I respect you. But you know what? You blew it! And thanks to you." + + +def get_stage_config(name: str = "qwen3_tts.yaml"): + """Get the stage config path from vllm_omni model_executor stage_configs.""" + return str(Path(__file__).parent.parent.parent.parent / "vllm_omni" / "model_executor" / "stage_configs" / name) + + +def get_prompt(prompt_type="text"): + """Text prompt for text-to-audio tests (same as test_qwen3_omni - beijing test case).""" + prompts = { + "text": "The weather is nice today, perfect for a walk in the park.", + } + return prompts.get(prompt_type, prompts["text"]) + + +def get_max_batch_size(size_type="few"): + """Batch size for concurrent requests (same as test_qwen3_omni).""" + batch_sizes = {"few": 5, "medium": 100, "large": 256} + return batch_sizes.get(size_type, 5) + + +tts_server_params = [ + pytest.param( + OmniServerParams( + model=MODEL, + stage_config_path=get_stage_config("qwen3_tts.yaml"), + server_args=["--trust-remote-code", "--disable-log-stats"], + ), + id="async_chunk", + ), + pytest.param( + OmniServerParams( + model=MODEL, + stage_config_path=get_stage_config("qwen3_tts_no_async_chunk.yaml"), + server_args=["--trust-remote-code", "--disable-log-stats"], + ), + id="no_async_chunk", + ), +] + + +@pytest.mark.advanced_model +@pytest.mark.core_model +@pytest.mark.omni +@hardware_test(res={"cuda": "L4"}, num_cards=1) +@pytest.mark.parametrize("omni_server", tts_server_params, indirect=True) +def test_voice_clone_streaming_001(omni_server, openai_client) -> None: + """ + Test text input processing and audio output via OpenAI API. + Deploy Setting: default yaml + Input Modal: text + Output Modal: audio + Input Setting: stream=True + Datasets: few requests + """ + + request_config = { + "model": omni_server.model, + "input": get_prompt(), + "stream": True, + "response_format": "wav", + "task_type": "Base", + "voice": "clone", + "ref_audio": REF_AUDIO_URL, + "ref_text": REF_TEXT, + } + openai_client.send_audio_speech_request(request_config, request_num=get_max_batch_size("few")) + + +@pytest.mark.advanced_model +@pytest.mark.core_model +@pytest.mark.omni +@hardware_test(res={"cuda": "L4"}, num_cards=1) +@pytest.mark.parametrize("omni_server", tts_server_params, indirect=True) +def test_response_format_001(omni_server, openai_client) -> None: + """ + Test text input processing and audio output via OpenAI API. + Deploy Setting: default yaml + Input Modal: text + Output Modal: audio + Input Setting: non-stream PCM + Datasets: few requests + """ + + request_config = { + "model": omni_server.model, + "input": get_prompt(), + "response_format": "pcm", + "task_type": "Base", + "voice": "clone", + "ref_audio": REF_AUDIO_URL, + "ref_text": REF_TEXT, + } + openai_client.send_audio_speech_request(request_config) diff --git a/tests/e2e/online_serving/test_qwen3_tts_customvoice.py b/tests/e2e/online_serving/test_qwen3_tts_customvoice.py new file mode 100644 index 00000000000..fb60df725ba --- /dev/null +++ b/tests/e2e/online_serving/test_qwen3_tts_customvoice.py @@ -0,0 +1,105 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +""" +E2E Online tests for Qwen3-TTS model with text input and audio output. + +These tests verify the /v1/audio/speech endpoint works correctly with +actual model inference, not mocks. +""" + +import os + +os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" +os.environ["VLLM_TEST_CLEAN_GPU_MEMORY"] = "0" + +from pathlib import Path + +import pytest + +from tests.conftest import OmniServerParams +from tests.utils import hardware_test + +MODEL = "Qwen/Qwen3-TTS-12Hz-1.7B-CustomVoice" + + +def get_stage_config(name: str = "qwen3_tts.yaml"): + """Get the stage config path from vllm_omni model_executor stage_configs.""" + return str(Path(__file__).parent.parent.parent.parent / "vllm_omni" / "model_executor" / "stage_configs" / name) + + +def get_prompt(prompt_type="text"): + """Text prompt for text-to-audio tests (same as test_qwen3_omni - beijing test case).""" + prompts = { + "text": "Beijing, China's capital, blends ancient wonders like the Great Wall with modern marvels. This vibrant metropolis offers rich culture, delicious Peking duck, and endless exploration opportunities.", + } + return prompts.get(prompt_type, prompts["text"]) + + +def get_max_batch_size(size_type="few"): + """Batch size for concurrent requests (same as test_qwen3_omni).""" + batch_sizes = {"few": 5, "medium": 100, "large": 256} + return batch_sizes.get(size_type, 5) + + +tts_server_params = [ + pytest.param( + OmniServerParams( + model=MODEL, + stage_config_path=get_stage_config("qwen3_tts.yaml"), + server_args=["--trust-remote-code", "--disable-log-stats"], + ), + id="async_chunk", + ) +] + + +@pytest.mark.core_model +@pytest.mark.advanced_model +@pytest.mark.omni +@hardware_test(res={"cuda": "L4"}, num_cards=1) +@pytest.mark.parametrize("omni_server", tts_server_params, indirect=True) +def test_text_to_audio_001(omni_server, openai_client) -> None: + """ + Test text input processing and audio output via OpenAI API. + Deploy Setting: default yaml + Input Modal: text + Output Modal: audio + Input Setting: stream=False + Datasets: few requests + """ + request_config = { + "model": omni_server.model, + "input": get_prompt(), + "stream": False, + "response_format": "wav", + "task_type": "CustomVoice", + "voice": "vivian", + } + + openai_client.send_audio_speech_request(request_config, request_num=get_max_batch_size()) + + +@pytest.mark.core_model +@pytest.mark.advanced_model +@pytest.mark.omni +@hardware_test(res={"cuda": "L4"}, num_cards=1) +@pytest.mark.parametrize("omni_server", tts_server_params, indirect=True) +def test_text_to_audio_002(omni_server, openai_client) -> None: + """ + Test text input processing and audio output via OpenAI API. + Deploy Setting: default yaml + Input Modal: text + Output Modal: audio + Input Setting: stream=True + Datasets: single request + """ + request_config = { + "model": omni_server.model, + "input": get_prompt(), + "stream": True, + "response_format": "wav", + "task_type": "CustomVoice", + "voice": "vivian", + } + + openai_client.send_audio_speech_request(request_config) diff --git a/tests/e2e/online_serving/test_qwen3_tts_customvoice_expansion.py b/tests/e2e/online_serving/test_qwen3_tts_customvoice_expansion.py new file mode 100644 index 00000000000..0a3764c8a7d --- /dev/null +++ b/tests/e2e/online_serving/test_qwen3_tts_customvoice_expansion.py @@ -0,0 +1,137 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +""" +E2E Online tests for Qwen3-TTS model with text input and audio output. + +These tests verify the /v1/audio/speech endpoint works correctly with +actual model inference, not mocks. +""" + +import os + +os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" +os.environ["VLLM_TEST_CLEAN_GPU_MEMORY"] = "0" + +from pathlib import Path + +import pytest + +from tests.conftest import OmniServerParams +from tests.utils import hardware_test + +MODEL = "Qwen/Qwen3-TTS-12Hz-1.7B-CustomVoice" + + +def get_stage_config(name: str = "qwen3_tts.yaml"): + """Get the stage config path from vllm_omni model_executor stage_configs.""" + return str(Path(__file__).parent.parent.parent.parent / "vllm_omni" / "model_executor" / "stage_configs" / name) + + +def get_prompt(prompt_type="english"): + """Text prompt for text-to-audio tests (same as test_qwen3_omni - beijing test case).""" + prompts = { + "english": "Beijing, China's capital, blends ancient wonders like the Great Wall with modern marvels. This vibrant metropolis offers rich culture, delicious Peking duck, and endless exploration opportunities.", + "chinese": "北京,中国的首都,是一座融合了紫禁城、长城等历史遗迹与现代建筑的国际化大都市,充满了独特的文化与活力", + } + return prompts.get(prompt_type, prompts["english"]) + + +def get_max_batch_size(size_type="few"): + """Batch size for concurrent requests (same as test_qwen3_omni).""" + batch_sizes = {"few": 5, "medium": 100, "large": 256} + return batch_sizes.get(size_type, 5) + + +tts_server_params = [ + pytest.param( + OmniServerParams( + model=MODEL, + stage_config_path=get_stage_config("qwen3_tts.yaml"), + server_args=["--trust-remote-code", "--disable-log-stats"], + ), + id="async_chunk", + ), + pytest.param( + OmniServerParams( + model=MODEL, + stage_config_path=get_stage_config("qwen3_tts_no_async_chunk.yaml"), + server_args=["--trust-remote-code", "--disable-log-stats"], + ), + id="no_async_chunk", + ), +] + + +@pytest.mark.advanced_model +@pytest.mark.omni +@hardware_test(res={"cuda": "L4"}, num_cards=1) +@pytest.mark.parametrize("omni_server", tts_server_params, indirect=True) +def test_language_001(omni_server, openai_client) -> None: + """ + Test text input processing and audio output via OpenAI API. + Deploy Setting: default yaml + Input Modal: text + Output Modal: audio + Input Setting: stream=False, language=chinese + Datasets: few requests + """ + request_config = { + "model": omni_server.model, + "input": get_prompt("chinese"), + "stream": False, + "response_format": "wav", + "task_type": "CustomVoice", + "voice": "vivian", + } + + openai_client.send_audio_speech_request(request_config) + + +@pytest.mark.advanced_model +@pytest.mark.omni +@hardware_test(res={"cuda": "L4"}, num_cards=1) +@pytest.mark.parametrize("omni_server", tts_server_params, indirect=True) +def test_voice_001(omni_server, openai_client) -> None: + """ + Test text input processing and audio output via OpenAI API. + Deploy Setting: default yaml + Input Modal: text + Output Modal: audio + Input Setting: stream=False, voice=eric + Datasets: few requests + """ + request_config = { + "model": omni_server.model, + "input": get_prompt(), + "stream": True, + "response_format": "wav", + "task_type": "CustomVoice", + "voice": "eric", + } + + openai_client.send_audio_speech_request(request_config, request_num=get_max_batch_size()) + + +@pytest.mark.advanced_model +@pytest.mark.omni +@hardware_test(res={"cuda": "L4"}, num_cards=1) +@pytest.mark.parametrize("omni_server", tts_server_params, indirect=True) +def test_voice_002(omni_server, openai_client) -> None: + """ + Test text input processing and audio output via OpenAI API. + Deploy Setting: default yaml + Input Modal: text + Output Modal: audio + Input Setting: stream=False, language=chinese + Datasets: few requests + """ + request_config = { + "model": omni_server.model, + "input": get_prompt(), + "stream": False, + "response_format": "wav", + "task_type": "CustomVoice", + "voice": "Serena", + } + + openai_client.send_audio_speech_request(request_config)