Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
638b377
[CosyVoice3][async_chunk] Enable async pipeline and batch-safe code2w…
indevn Mar 5, 2026
7e42da8
[CosyVoice3][async_chunk] Fix AR cleanup path and add code2wav trim d…
indevn Mar 5, 2026
b83e8af
[CosyVoice3][async_chunk] Fix cleanup-save race by moving reclaim to …
indevn Mar 5, 2026
5e4fe0f
[CosyVoice3][async_chunk] Fix duplicate terminal emission and clamp c…
indevn Mar 6, 2026
9c39558
[CosyVoice3][async_chunk] Honor single-request seq_token_counts in co…
indevn Mar 6, 2026
3cb6849
refactor: clarify multimodal output fallback chain
indevn Mar 23, 2026
2e3b6d1
fix: align cosyvoice3 async streaming and sampler behavior
indevn Mar 27, 2026
9caa81b
test: add cosyvoice3 unit coverage for streaming helpers
indevn Mar 27, 2026
e6c7b55
test: add cosyvoice3 offline e2e coverage
indevn Mar 27, 2026
836b116
fix: clarify chunk transfer external request id handling
indevn Mar 28, 2026
0d7fe6c
fix: align cosyvoice3 async_chunk e2e with current runtime
indevn Mar 29, 2026
931da94
test: rename cosyvoice3 e2e around reference zero-shot semantics
indevn Mar 29, 2026
e58e2ba
fix: align async_chunk config with vLLM 0.19.0 and enable streaming test
linyueqian Apr 5, 2026
338155c
fix: correct CosyVoice3 stop token handling and enforce min_tokens
linyueqian Apr 5, 2026
f5f9f29
test: disable CUDA graphs in async_chunk e2e test for faster CI
linyueqian Apr 5, 2026
e6f379f
fix: update talker mock model_stage and config in unit tests
linyueqian Apr 5, 2026
77fa20b
fix: set enforce_eager=true in async_chunk YAML to avoid CI timeout
linyueqian Apr 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,22 @@ def test_save_async(build_adapter):
assert task["is_finished"] is False


def test_send_single_request_cleans_up_after_finished_payload(build_adapter, monkeypatch):
    """Sending a finished payload must invoke adapter cleanup exactly once.

    The recorded cleanup call is expected to carry the internal request id
    first and the external request id second.
    """
    adapter, _unused = build_adapter(stage_id=1)
    finished_request = _req("req-finished", RequestStatus.FINISHED_STOPPED, external_req_id="ext-finished")

    def _finished_payload(**kwargs):
        # Stand-in for the stage-input hook: always reports a finished payload.
        return {"x": [1], "finished": True}

    recorded_cleanups = []

    def _record_cleanup(*a, **kw):
        recorded_cleanups.append((a, kw))

    adapter.custom_process_next_stage_input_func = _finished_payload
    monkeypatch.setattr(adapter, "cleanup", _record_cleanup)

    task = {"pooling_output": None, "request": finished_request, "is_finished": True}
    adapter._send_single_request(task)

    assert len(recorded_cleanups) == 1
    positional, _kwargs = recorded_cleanups[0]
    assert positional[0] == "req-finished"
    assert positional[1] == "ext-finished"


def test_update_request_payload(build_adapter):
adapter, _ = build_adapter()

Expand Down Expand Up @@ -409,3 +425,86 @@ def test_generation_scheduler_calls_cleanup_on_finished(monkeypatch, mocker: Moc
args, _ = cleanup_calls[0]
assert args[0] == "req-s1"
assert args[1] == "ext-s1"


def test_ar_scheduler_defers_cleanup_and_queues_save_on_finished(mocker: MockerFixture):
    """OmniARScheduler should enqueue save; adapter cleanup is handled in save thread.

    Runs ``OmniARScheduler.update_from_output`` against a fully mocked
    scheduler that holds a single request reported as stopped, then checks the
    chunk transfer adapter saw one ``save_async`` call and no ``cleanup`` call.
    """
    from vllm_omni.core.sched.omni_ar_scheduler import OmniARScheduler

    cleanup_log = []
    save_log = []

    def _record_cleanup(*a, **kw):
        cleanup_log.append((a, kw))

    def _record_save(*a, **kw):
        save_log.append((a, kw))

    adapter = mocker.MagicMock()
    adapter.cleanup = _record_cleanup
    adapter.save_async = _record_save

    scheduler = mocker.MagicMock()

    # Plain (non-mock) attributes read by update_from_output.
    plain_attrs = {
        "chunk_transfer_adapter": adapter,
        "connector": None,
        "perf_metrics": None,
        "log_stats": False,
        "recompute_kv_load_failures": False,
        "finished_req_ids_dict": {},
        "waiting_for_transfer_free": set(),
        "transfer_triggered_requests": set(),
        "active_kv_transfers": set(),
    }
    for attr_name, attr_value in plain_attrs.items():
        setattr(scheduler, attr_name, attr_value)

    structured_output_manager = mocker.MagicMock()
    structured_output_manager.should_advance.return_value = False
    scheduler.structured_output_manager = structured_output_manager

    kv_cache_manager = mocker.MagicMock()
    kv_cache_manager.take_events.return_value = None
    scheduler.kv_cache_manager = kv_cache_manager
    scheduler.kv_event_publisher = mocker.MagicMock()

    request_fields = {
        "request_id": "req-ar",
        "external_req_id": "ext-ar",
        "status": RequestStatus.RUNNING,
        "is_finished": lambda: False,
        "num_computed_tokens": 1,
        "num_prompt_tokens": 1,
        "prompt_token_ids": [1],
        "num_output_placeholders": 0,
        "sampling_params": None,
        "pooling_params": None,
        "stop_reason": None,
        "client_index": 0,
        "take_events": lambda: [],
        "trace_headers": None,
        "num_cached_tokens": 0,
        "num_external_computed_tokens": 0,
        "num_nans_in_logits": 0,
        "get_finished_reason": lambda: "stop",
    }
    request = _HashableRequest(**request_fields)
    scheduler.requests = {"req-ar": request}
    scheduler.running = [request]

    # Scheduler helpers are replaced by mocks with fixed return values;
    # ``_update_request_with_output`` returns ``([], True)`` and
    # ``_handle_stopped_request`` returns ``True``.
    stubbed_returns = {
        "_update_request_with_output": ([], True),
        "_process_kv_transfer_trigger": False,
        "_handle_stopped_request": True,
        "_free_request": None,
        "_get_routed_experts": None,
        "make_spec_decoding_stats": None,
        "make_stats": None,
    }
    for method_name, result in stubbed_returns.items():
        setattr(scheduler, method_name, mocker.MagicMock(return_value=result))

    scheduler.waiting = mocker.MagicMock()
    scheduler.waiting.remove_requests = mocker.MagicMock()

    scheduler_output = SimpleNamespace(
        num_scheduled_tokens={"req-ar": 1},
        scheduled_spec_decode_tokens={},
        num_invalid_spec_tokens=0,
    )
    model_runner_output = SimpleNamespace(
        sampled_token_ids=[[123]],
        logprobs=None,
        prompt_logprobs_dict={},
        pooler_output=None,
        num_nans_in_logits=None,
        kv_connector_output=None,
        cudagraph_stats=None,
        req_id_to_index={"req-ar": 0},
        kv_extracted_req_ids=None,
    )

    OmniARScheduler.update_from_output(scheduler, scheduler_output, model_runner_output)

    assert len(cleanup_log) == 0
    assert len(save_log) == 1
218 changes: 218 additions & 0 deletions tests/e2e/offline_inference/test_cosyvoice3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
Offline E2E smoke test for CosyVoice3 zero-shot reference inference.

This test uses the official upstream zero-shot prompt text/audio pair and
verifies a stable reference recipe:
- config-derived top_p/top_k and token-length ratios
- model EOS token as the stop token
- a conservative repetition penalty to avoid degenerate loops
"""

from __future__ import annotations

import functools
import io
import os
import tempfile
from pathlib import Path
from urllib.request import urlopen

import numpy as np
import pytest
import soundfile as sf
import yaml
from huggingface_hub import snapshot_download
from vllm.sampling_params import SamplingParams

from tests.conftest import OmniRunner
from tests.utils import hardware_test
from vllm_omni.model_executor.models.cosyvoice3.config import CosyVoice3Config
from vllm_omni.model_executor.models.cosyvoice3.tokenizer import get_qwen_tokenizer

MODEL = "FunAudioLLM/Fun-CosyVoice3-0.5B-2512"
MODEL_DIR_ENV = "VLLM_OMNI_COSYVOICE3_MODEL_DIR"

REFERENCE_PROMPT_WAV_URL = "https://raw.githubusercontent.com/FunAudioLLM/CosyVoice/main/asset/zero_shot_prompt.wav"
REFERENCE_PROMPT_TEXT = "You are a helpful assistant.<|endofprompt|>希望你以后能够做的比我还好呦。"
REFERENCE_SYNTH_TEXT = (
"CosyVoice is undergoing a comprehensive upgrade, providing more accurate, "
"stable, faster, and better voice generation capabilities."
)
REFERENCE_STAGE0_TEMPERATURE = 1.0
REFERENCE_STAGE0_REPETITION_PENALTY = 2.0


def _stage_config(name: str) -> str:
return str(Path(__file__).parent.parent.parent.parent / "vllm_omni" / "model_executor" / "stage_configs" / name)


# Stage config paths the e2e test is parametrized over: cosyvoice3.yaml and
# its async_chunk counterpart.
STAGE_CONFIGS = [
    _stage_config(config_name)
    for config_name in ("cosyvoice3.yaml", "cosyvoice3_async_chunk.yaml")
]


@functools.lru_cache(maxsize=1)
def _load_reference_prompt_wav() -> tuple[np.ndarray, int]:
    """Download and decode the reference prompt WAV (result is cached).

    Returns:
        ``(samples, sample_rate)`` where ``samples`` is a float32 array;
        audio decoded with more than one dimension is averaged over its last
        axis.
    """
    with urlopen(REFERENCE_PROMPT_WAV_URL, timeout=30) as response:
        wav_bytes = response.read()

    samples, sample_rate = sf.read(io.BytesIO(wav_bytes), dtype="float32", always_2d=False)
    has_extra_dims = isinstance(samples, np.ndarray) and samples.ndim > 1
    if has_extra_dims:
        samples = np.mean(samples, axis=-1)
    return np.asarray(samples, dtype=np.float32), int(sample_rate)


@functools.lru_cache(maxsize=1)
def _resolve_model_dir() -> Path:
    """Return the model directory to test against (result is cached).

    A non-empty ``MODEL_DIR_ENV`` environment variable wins and is expanded
    and resolved; otherwise ``MODEL`` is fetched via ``snapshot_download``.
    """
    local_override = os.environ.get(MODEL_DIR_ENV)
    if not local_override:
        return Path(snapshot_download(MODEL, allow_patterns=["*"]))
    return Path(local_override).expanduser().resolve()


def _reference_zero_shot_stage0_sampling(*, text: str) -> SamplingParams:
    """Build the stage-0 sampling params for the reference zero-shot recipe.

    ``top_p`` / ``top_k`` are read from the config's ``llm["sampling"]``
    mapping (defaulting to 0.8 / 25), the stop token is the config's
    ``eos_token_id``, and ``min_tokens`` / ``max_tokens`` are the tokenized
    length of ``text`` (at least 1) scaled by the config's token/text ratios.

    Args:
        text: Text to synthesize; only its token count is used here.
    """
    cfg = CosyVoice3Config()
    llm_sampling = cfg.llm.get("sampling", {})
    stop_token = int(cfg.llm["eos_token_id"])

    qwen_tokenizer = get_qwen_tokenizer(
        token_path=str(_resolve_model_dir() / cfg.qwen_pretrain_path),
        skip_special_tokens=cfg.skip_special_tokens,
        version=cfg.version,
    )
    token_ids = qwen_tokenizer.encode(text, allowed_special=cfg.allowed_special)
    num_text_tokens = max(1, len(token_ids))

    sampling_kwargs = {
        "temperature": REFERENCE_STAGE0_TEMPERATURE,
        "top_p": float(llm_sampling.get("top_p", 0.8)),
        "top_k": int(llm_sampling.get("top_k", 25)),
        "repetition_penalty": REFERENCE_STAGE0_REPETITION_PENALTY,
        "stop_token_ids": [stop_token],
        "min_tokens": int(num_text_tokens * cfg.min_token_text_ratio),
        "max_tokens": int(num_text_tokens * cfg.max_token_text_ratio),
    }
    return SamplingParams(**sampling_kwargs)


def _concat_audio(audio_val) -> np.ndarray:
import torch

if isinstance(audio_val, list):
tensors = []
for t in audio_val:
if t is None:
continue
if hasattr(t, "detach"):
t = t.detach()
if hasattr(t, "cpu"):
t = t.cpu()
if hasattr(t, "float"):
t = t.float()
if isinstance(t, torch.Tensor):
tensors.append(t.reshape(-1))
if not tensors:
return np.zeros((0,), dtype=np.float32)
return torch.cat(tensors, dim=-1).numpy().astype(np.float32, copy=False)

if hasattr(audio_val, "detach"):
audio_val = audio_val.detach()
if hasattr(audio_val, "cpu"):
audio_val = audio_val.cpu()
if hasattr(audio_val, "float"):
audio_val = audio_val.float()
if hasattr(audio_val, "numpy"):
audio_val = audio_val.numpy()
audio_np = np.asarray(audio_val, dtype=np.float32)
return audio_np.reshape(-1)


def _get_stage_engine_outputs(omni_runner: OmniRunner, stage_id: int):
stage_list = getattr(omni_runner.omni, "stage_list", None)
if stage_list is not None:
return getattr(stage_list[stage_id], "engine_outputs", None) or []

stage_clients = getattr(getattr(omni_runner.omni, "engine", None), "stage_clients", None)
if stage_clients is not None:
return getattr(stage_clients[stage_id], "engine_outputs", None) or []

raise AttributeError("Unable to locate stage outputs on Omni runner")


def _patched_stage_config(base_stage_config: str, model_dir: Path, tmp_dir: Path) -> str:
    """Write a test-adjusted copy of a stage config and return its path.

    For every entry under ``stage_args`` the ``engine_args`` mapping (created
    when absent) gets ``tokenizer`` pointed at ``model_dir / "CosyVoice-BlankEN"``,
    ``enforce_eager`` set to ``True`` and ``hf_overrides`` replaced with the
    ``CosyVoice3Model`` architecture.

    Args:
        base_stage_config: Path of the YAML stage config to start from.
        model_dir: Directory containing the model files.
        tmp_dir: Directory the patched YAML is written into.

    Returns:
        String path of the written file; it keeps the base config's file name.
    """
    source_path = Path(base_stage_config)
    config = yaml.safe_load(source_path.read_text(encoding="utf-8"))
    blank_en_tokenizer = str(model_dir / "CosyVoice-BlankEN")

    for stage_entry in config.get("stage_args", []):
        # The override values are built inside the loop so each stage owns its
        # own ``hf_overrides`` dict (a shared object would be dumped as a YAML alias).
        stage_entry.setdefault("engine_args", {}).update(
            tokenizer=blank_en_tokenizer,
            enforce_eager=True,
            hf_overrides={"architectures": ["CosyVoice3Model"]},
        )

    patched_path = tmp_dir / source_path.name
    patched_path.write_text(yaml.safe_dump(config, sort_keys=False), encoding="utf-8")
    return str(patched_path)


def _build_reference_inputs(prompt_audio: tuple[np.ndarray, int]) -> list[dict[str, object]]:
    """Build the one-element request list for the reference zero-shot run.

    Args:
        prompt_audio: Reference prompt audio, passed through unchanged as the
            ``audio`` entry of ``multi_modal_data``.

    Returns:
        A list holding a single request dict that asks for audio output of
        ``REFERENCE_SYNTH_TEXT`` with ``REFERENCE_PROMPT_TEXT`` as prompt text.
    """
    request = dict(
        prompt=REFERENCE_SYNTH_TEXT,
        multi_modal_data={"audio": prompt_audio},
        modalities=["audio"],
        mm_processor_kwargs={"prompt_text": REFERENCE_PROMPT_TEXT},
    )
    return [request]


@pytest.mark.core_model
@pytest.mark.omni
@hardware_test(res={"cuda": "L4"}, num_cards=1)
@pytest.mark.parametrize("base_stage_config", STAGE_CONFIGS)
def test_cosyvoice3_offline_reference_zero_shot(base_stage_config: str) -> None:
"""CosyVoice3 zero-shot reference inference should stop cleanly and produce sane audio."""
# Load the reference prompt audio and locate the model directory.
prompt_audio, prompt_sr = _load_reference_prompt_wav()
model_dir = _resolve_model_dir()
expected_stop_token = int(CosyVoice3Config().llm["eos_token_id"])

# The patched copy of the stage config is written into a temporary directory.
with tempfile.TemporaryDirectory(prefix="cv3-e2e-") as tmp:
stage_config = _patched_stage_config(base_stage_config, model_dir, Path(tmp))
with OmniRunner(
str(model_dir), seed=42, stage_configs_path=stage_config, stage_init_timeout=300
) as omni_runner:
# Only the stage-0 entry of the default sampling params is replaced.
sampling_params_list = omni_runner.get_default_sampling_params_list()
sampling_params_list[0] = _reference_zero_shot_stage0_sampling(text=REFERENCE_SYNTH_TEXT)

outputs = omni_runner.omni.generate(
_build_reference_inputs((prompt_audio, prompt_sr)), sampling_params_list
)

assert outputs, "No outputs returned"
audio_mm = outputs[0].multimodal_output
assert "audio" in audio_mm, "No audio output found"

audio = _concat_audio(audio_mm["audio"])
assert audio.size > 0, "Generated audio is empty"

# "sr" may be a list (last entry is used) and/or expose .item(); reduce it to an int.
sr_val = audio_mm.get("sr", 24000)
if isinstance(sr_val, list) and sr_val:
sr_val = sr_val[-1]
if hasattr(sr_val, "item"):
sr_val = sr_val.item()
sr = int(sr_val)
assert sr == 24000, f"Unexpected sample_rate={sr}"

# Duration in seconds is derived from the flattened sample count.
duration_s = audio.size / sr
assert 2.8 <= duration_s <= 8.8, f"Unexpected duration={duration_s:.3f}s (samples={audio.size}, sr={sr})"

# When stage-0 engine outputs are available, check how the first completion ended.
stage0_outputs = _get_stage_engine_outputs(omni_runner, 0)
if stage0_outputs:
completion = stage0_outputs[0].outputs[0]
finish_reason = getattr(completion, "finish_reason", None)
stop_reason = getattr(completion, "stop_reason", None)
num_tokens = len(getattr(completion, "token_ids", []) or [])

assert finish_reason == "stop", f"Stage-0 finish_reason={finish_reason}, expected 'stop'"
assert int(stop_reason) == expected_stop_token, (
f"Stage-0 stop_reason={stop_reason}, expected {expected_stop_token}"
)
assert 80 <= num_tokens <= 220, f"Stage-0 num_tokens={num_tokens}, expected sane stop-bound range"
else:
# Missing stage-0 outputs are tolerated only for config files whose name contains "async_chunk".
assert "async_chunk" in Path(base_stage_config).name, "Stage-0 produced no engine outputs"
25 changes: 17 additions & 8 deletions tests/e2e/online_serving/test_cosyvoice3_tts.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,18 @@ def get_prompt(prompt_type="zh"):
)
]

# Server parametrization that deploys MODEL with the cosyvoice3_async_chunk.yaml
# stage config; the single case carries the pytest id "cosyvoice3_async_chunk".
tts_async_chunk_server_params = [
pytest.param(
OmniServerParams(
model=MODEL,
stage_config_path=get_stage_config("cosyvoice3_async_chunk.yaml"),
server_args=["--trust-remote-code", "--disable-log-stats"],
),
id="cosyvoice3_async_chunk",
)
]


@pytest.mark.advanced_model
@pytest.mark.core_model
@pytest.mark.omni
@hardware_test(res={"cuda": "H100"}, num_cards=1)
Expand All @@ -76,17 +86,16 @@ def test_voice_clone_zh_001(omni_server, openai_client) -> None:
openai_client.send_audio_speech_request(request_config)


@pytest.mark.advanced_model
@pytest.mark.core_model
@pytest.mark.omni
@hardware_test(res={"cuda": "H100"}, num_cards=1)
@pytest.mark.parametrize("omni_server", tts_server_params, indirect=True)
@pytest.mark.skip(reason="CosyVoice3 does not support async_chunk streaming yet")
@pytest.mark.parametrize("omni_server", tts_async_chunk_server_params, indirect=True)
def test_voice_clone_zh_002(omni_server, openai_client) -> None:
"""
Test voice cloning TTS with Chinese text via OpenAI API.
Deploy Setting: default yaml
Test voice cloning TTS with Chinese text via async_chunk streaming.
Deploy Setting: cosyvoice3_async_chunk.yaml
Input Modal: text + ref_audio + ref_text
Output Modal: audio
Output Modal: audio (streamed)
Input Setting: stream=True
Datasets: single request
"""
Expand All @@ -101,7 +110,7 @@ def test_voice_clone_zh_002(omni_server, openai_client) -> None:
openai_client.send_audio_speech_request(request_config)


@pytest.mark.advanced_model
@pytest.mark.core_model
@pytest.mark.omni
@hardware_test(res={"cuda": "H100"}, num_cards=1)
@pytest.mark.parametrize("omni_server", tts_server_params, indirect=True)
Expand Down
Loading
Loading