From f0d0f3993bd59299ac9d60a8a870337218257969 Mon Sep 17 00:00:00 2001 From: "jaeeun.kil" Date: Mon, 6 Apr 2026 13:32:01 +0900 Subject: [PATCH 1/4] feat: add HyperCLOVAX-SEED-Omni-8B vision decoder and full pipeline support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stacks on top of #869 (HyperCLOVAX audio decoder). - Add HCXOmniForCausalLM thinker model (LLM stage, extends HCXVisionV2) - Add HyperCLOVAXVisionPipeline diffusion model (TA-Tok decoder, 27×27 image tokens) - Add hcx_omni.yaml 3-stage pipeline config (thinker TP=4 + vision/audio decoders) - Add thinker2vision_decoder and thinker2audio_decoder stage input processors - Add fan-out async pipeline topology (stage 0 → stage 1 AND stage 0 → stage 2) - Add _stage0_is_llm guard in serving_chat to preserve HCX multimodal inputs - Fix vLLM 0.18.0 compatibility (AttentionBackendEnum, _RUNNER_TASKS, TaskOption) - Add E2E tests, unit tests, client demo, and benchmark scripts Co-Authored-By: Hyunjoon Cho Signed-off-by: jaeeun.kil --- benchmarks/hcx-omni/README.md | 60 + benchmarks/hcx-omni/benchmark_hcx_omni.py | 400 +++++ benchmarks/hcx-omni/run_benchmark.sh | 38 + examples/online_serving/hcx_omni/README.md | 235 +++ .../online_serving/hcx_omni/client_demo.py | 153 ++ .../online_serving/hcx_omni/run_server.sh | 52 + pyproject.toml | 1 + tests/e2e/offline_inference/test_hcx_omni.py | 169 ++ tests/e2e/online_serving/test_hcx_omni.py | 125 ++ tests/e2e/stage_configs/hcx_omni_ci.yaml | 93 + tests/unit/__init__.py | 0 tests/unit/conftest.py | 19 + tests/unit/model_executor/__init__.py | 0 .../test_hcx_omni_processing.py | 187 ++ tools/pre_commit/check_pickle_imports.py | 2 + vllm_omni/config/model.py | 594 ++++--- vllm_omni/diffusion/diffusion_engine.py | 418 +++-- vllm_omni/diffusion/ipc.py | 47 +- .../models/hyperclovax_vision/__init__.py | 21 + .../hyperclovax_vision_transformer.py | 146 ++ .../models/hyperclovax_vision/layers.py | 234 +++ .../pipeline_hyperclovax_vision.py | 433 +++++ .../hyperclovax_vision/transformer_usp.py | 307 ++++ .../vision_token_embedder.py | 119 ++ vllm_omni/diffusion/registry.py | 141 +- vllm_omni/diffusion/request.py | 2 + .../diffusion/worker/diffusion_worker.py | 10 +- vllm_omni/engine/arg_utils.py | 208 +-- vllm_omni/engine/input_processor.py | 32 + vllm_omni/entrypoints/async_omni.py | 1369 +++++++------- vllm_omni/entrypoints/async_omni_diffusion.py | 491 +++++ vllm_omni/entrypoints/async_omni_llm.py | 225 +++ vllm_omni/entrypoints/cli/main.py | 7 +- vllm_omni/entrypoints/omni.py | 1211 +++++++++++-- vllm_omni/entrypoints/omni_diffusion.py | 150 ++ vllm_omni/entrypoints/omni_llm.py | 242 +++ vllm_omni/entrypoints/omni_stage.py | 1572 +++++++++++++++++ vllm_omni/entrypoints/openai/serving_chat.py | 547 +++--- vllm_omni/entrypoints/stage_utils.py | 298 +++- vllm_omni/entrypoints/zmq_utils.py | 95 + .../models/hcx_omni/__init__.py | 3 + .../models/hcx_omni/hcx_omni.py | 134 ++ .../models/hcx_omni/hcx_omni_thinker.py | 126 ++ .../qwen2_5_omni/qwen2_5_omni_thinker.py | 208 +-- .../qwen3_omni/qwen3_omni_moe_thinker.py | 1245 ++++--------- vllm_omni/model_executor/models/registry.py | 25 +- .../stage_configs/hcx_omni.yaml | 102 ++ .../hyperclovax_seed_omni.py | 145 ++ vllm_omni/worker/gpu_ar_model_runner.py | 417 ++--- 49 files changed, 9685 insertions(+), 3173 deletions(-) create mode 100644 benchmarks/hcx-omni/README.md create mode 100644 benchmarks/hcx-omni/benchmark_hcx_omni.py create mode 100755 benchmarks/hcx-omni/run_benchmark.sh create mode 100644 examples/online_serving/hcx_omni/README.md create mode 100644 examples/online_serving/hcx_omni/client_demo.py create mode 100755 examples/online_serving/hcx_omni/run_server.sh create mode 100644 tests/e2e/offline_inference/test_hcx_omni.py create mode 100644 tests/e2e/online_serving/test_hcx_omni.py create mode 100644 tests/e2e/stage_configs/hcx_omni_ci.yaml create mode 100644 tests/unit/__init__.py create mode 100644 tests/unit/conftest.py create mode 100644 tests/unit/model_executor/__init__.py create mode 100644 tests/unit/model_executor/test_hcx_omni_processing.py create mode 100644 vllm_omni/diffusion/models/hyperclovax_vision/__init__.py create mode 100644 vllm_omni/diffusion/models/hyperclovax_vision/hyperclovax_vision_transformer.py create mode 100644 vllm_omni/diffusion/models/hyperclovax_vision/layers.py create mode 100644 vllm_omni/diffusion/models/hyperclovax_vision/pipeline_hyperclovax_vision.py create mode 100644 vllm_omni/diffusion/models/hyperclovax_vision/transformer_usp.py create mode 100644 vllm_omni/diffusion/models/hyperclovax_vision/vision_token_embedder.py create mode 100644 vllm_omni/engine/input_processor.py create mode 100644 vllm_omni/entrypoints/async_omni_diffusion.py create mode 100644 vllm_omni/entrypoints/async_omni_llm.py create mode 100644 vllm_omni/entrypoints/omni_diffusion.py create mode 100644 vllm_omni/entrypoints/omni_llm.py create mode 100644 vllm_omni/entrypoints/omni_stage.py create mode 100644 vllm_omni/entrypoints/zmq_utils.py create mode 100644 vllm_omni/model_executor/models/hcx_omni/__init__.py create mode 100644 vllm_omni/model_executor/models/hcx_omni/hcx_omni.py create mode 100644 vllm_omni/model_executor/models/hcx_omni/hcx_omni_thinker.py create mode 100644 vllm_omni/model_executor/stage_configs/hcx_omni.yaml create mode 100644 vllm_omni/model_executor/stage_input_processors/hyperclovax_seed_omni.py diff --git a/benchmarks/hcx-omni/README.md b/benchmarks/hcx-omni/README.md new file mode 100644 index 00000000000..85abf6b086e --- /dev/null +++ b/benchmarks/hcx-omni/README.md @@ -0,0 +1,60 @@ +# HyperCLOVAX-SEED-Omni-8B Benchmarks + +Measures end-to-end latency and throughput for: + +| Mode | Input → Output | +|------|----------------| +| T2T | Text → Text (thinker only) | +| T2V | Text → Text + Image | +| S2S | Audio + Text → Text + Audio | + +## Prerequisites + +Start the server first: + +```bash +cd examples/online_serving/hcx_omni +./run_server.sh --model naver-hyperclovax/HyperCLOVAX-SEED-Omni-8B +``` + +## Run Benchmarks + +```bash +# All modes (10 requests each, sequential) +bash benchmarks/hcx-omni/run_benchmark.sh + +# Custom settings via env vars +NUM_PROMPTS=50 CONCURRENCY=4 MODE=t2v bash benchmarks/hcx-omni/run_benchmark.sh + +# S2S with a real audio file +python benchmarks/hcx-omni/benchmark_hcx_omni.py \ + --mode s2s --num-prompts 20 --audio-file /path/to/speech.wav + +# Save results to JSON +python benchmarks/hcx-omni/benchmark_hcx_omni.py \ + --mode all --num-prompts 10 --output-json results.json +``` + +## Metrics + +``` +t2v Results: + mode : t2v + total : 10 + success : 10 + success_rate : 100.0% + latency_mean : 18.43s + latency_p50 : 17.91s + latency_p90 : 21.34s + latency_p99 : 22.10s + latency_min : 15.20s + latency_max : 22.10s +``` + +Expected latency ranges (A100 80GB × 6): + +| Mode | p50 latency | Notes | +|------|------------|-------| +| T2T | ~2–4 s | Thinker only | +| T2V | ~15–25 s | Thinker + 50-step diffusion | +| S2S | ~5–12 s | Thinker + BigVGAN vocoder | diff --git a/benchmarks/hcx-omni/benchmark_hcx_omni.py b/benchmarks/hcx-omni/benchmark_hcx_omni.py new file mode 100644 index 00000000000..baaf4c9210a --- /dev/null +++ b/benchmarks/hcx-omni/benchmark_hcx_omni.py @@ -0,0 +1,400 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +""" +Benchmark script for HyperCLOVAX-SEED-Omni-8B online serving. + +Measures end-to-end latency and throughput for: + - Speech-to-Speech (S2S): audio input → text + audio output + - Text-to-Vision (T2V): text prompt → text + image output + - Text-to-Text (T2T): text prompt → text only (thinker stage only) + +Metrics reported per mode: + - Latency : mean / p50 / p90 / p99 (seconds, wall-clock) + - Throughput: requests / second + - Success rate + +Usage: + # Start the server first (see run_server.sh), then: + + # All modes (10 requests each) + python benchmark_hcx_omni.py --base-url http://localhost:8000/v1 --num-prompts 10 + + # S2S only, 50 requests, concurrency 4 + python benchmark_hcx_omni.py --mode s2s --num-prompts 50 --concurrency 4 + + # T2V only + python benchmark_hcx_omni.py --mode t2v --num-prompts 20 + + # With a real audio file for S2S + python benchmark_hcx_omni.py --mode s2s --audio-file /path/to/speech.wav +""" + +import argparse +import asyncio +import base64 +import io +import json +import statistics +import time +from dataclasses import dataclass, field + +import aiohttp + +# --------------------------------------------------------------------------- +# Defaults +# --------------------------------------------------------------------------- +DEFAULT_BASE_URL = "http://localhost:8000/v1" +DEFAULT_MODEL = "naver-hyperclovax/HyperCLOVAX-SEED-Omni-8B" + +# System prompt required for audio/image generation to activate +SYSTEM_PROMPT = { + "role": "system", + "content": [ + { + "type": "text", + "text": ( + "당신은 CLOVA X입니다. 네이버가 만든 AI 어시스턴트로서 " + "오디오와 이미지를 인식하고 텍스트, 음성, 이미지를 생성할 수 있습니다." + ), + } + ], +} + +T2V_PROMPTS = [ + "귀여운 고양이 한 마리가 소파에 앉아 있는 그림을 그려줘.", + "밤하늘에 별이 빛나는 산 풍경 이미지를 만들어줘.", + "노란 해바라기가 가득한 들판을 그려줘.", + "현대적인 카페 인테리어 이미지를 생성해줘.", + "귀여운 강아지가 공원에서 뛰노는 그림을 그려줘.", + "파란 바다와 흰 모래 해변의 풍경을 그려줘.", + "봄날의 벚꽃이 흩날리는 공원 이미지를 만들어줘.", + "아늑한 서재에서 책을 읽는 사람의 그림을 그려줘.", + "빨간 지붕의 유럽풍 작은 마을 풍경을 그려줘.", + "우주 공간에서 지구를 바라보는 우주비행사 그림을 그려줘.", +] + +S2S_PROMPTS = [ + "이 오디오에서 무슨 내용이 들리나요?", + "방금 들은 내용을 한국어로 요약해줘.", + "이 소리가 무엇인지 설명해줘.", +] + +T2T_PROMPTS = [ + "대한민국의 수도는 어디인가요?", + "하늘은 왜 파란가요?", + "인공지능이란 무엇인가요?", + "건강한 식습관을 위한 조언을 해줘.", + "파이썬 프로그래밍 언어의 특징은 무엇인가요?", +] + + +# --------------------------------------------------------------------------- +# Data classes +# --------------------------------------------------------------------------- +@dataclass +class RequestResult: + mode: str + latency: float + success: bool + has_audio: bool = False + has_image: bool = False + text: str = "" + error: str = "" + + +@dataclass +class BenchmarkStats: + mode: str + total: int + success: int + latencies: list[float] = field(default_factory=list) + + @property + def success_rate(self) -> float: + return self.success / self.total if self.total else 0.0 + + @property + def throughput(self) -> float: + return self.success / sum(self.latencies) * self.success if self.latencies else 0.0 + + def summary(self) -> dict: + if not self.latencies: + return {"mode": self.mode, "total": self.total, "success": 0} + s = sorted(self.latencies) + n = len(s) + return { + "mode": self.mode, + "total": self.total, + "success": self.success, + "success_rate": f"{self.success_rate:.1%}", + "latency_mean": f"{statistics.mean(s):.2f}s", + "latency_p50": f"{s[int(n * 0.50)]:.2f}s", + "latency_p90": f"{s[int(n * 0.90)]:.2f}s", + "latency_p99": f"{s[min(int(n * 0.99), n - 1)]:.2f}s", + "latency_min": f"{s[0]:.2f}s", + "latency_max": f"{s[-1]:.2f}s", + } + + +# --------------------------------------------------------------------------- +# Audio helpers +# --------------------------------------------------------------------------- +def make_sine_wav_b64(duration_sec: float = 1.0, sample_rate: int = 16000) -> str: + """Generate a simple 440 Hz sine wave and return as base64 WAV.""" + try: + import numpy as np + import scipy.io.wavfile as wav + + t = np.linspace(0, duration_sec, int(sample_rate * duration_sec), endpoint=False) + audio = (np.sin(2 * np.pi * 440 * t) * 0.5).astype(np.float32) + buf = io.BytesIO() + wav.write(buf, sample_rate, (audio * 32767).astype(np.int16)) + return base64.b64encode(buf.getvalue()).decode() + except ImportError: + # Minimal WAV header without numpy (44-byte header + silence) + sample_rate = 16000 + n_samples = int(sample_rate * duration_sec) + data = b"\x00\x00" * n_samples # 16-bit silence + data_size = len(data) + header = ( + b"RIFF" + + (data_size + 36).to_bytes(4, "little") + + b"WAVEfmt " + + (16).to_bytes(4, "little") + + (1).to_bytes(2, "little") # PCM + + (1).to_bytes(2, "little") # mono + + sample_rate.to_bytes(4, "little") + + (sample_rate * 2).to_bytes(4, "little") + + (2).to_bytes(2, "little") + + (16).to_bytes(2, "little") + + b"data" + + data_size.to_bytes(4, "little") + ) + return base64.b64encode(header + data).decode() + + +# --------------------------------------------------------------------------- +# Async request functions +# --------------------------------------------------------------------------- +async def send_request( + session: aiohttp.ClientSession, + base_url: str, + model: str, + payload: dict, +) -> tuple[float, dict]: + url = f"{base_url}/chat/completions" + t0 = time.perf_counter() + async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=300)) as resp: + body = await resp.json() + latency = time.perf_counter() - t0 + return latency, body + + +async def run_t2t( + session: aiohttp.ClientSession, + base_url: str, + model: str, + prompt: str, +) -> RequestResult: + payload = { + "model": model, + "modalities": ["text"], + "messages": [ + SYSTEM_PROMPT, + {"role": "user", "content": prompt}, + ], + "max_tokens": 256, + } + try: + latency, body = await send_request(session, base_url, model, payload) + if "error" in body: + return RequestResult("t2t", latency, False, error=str(body["error"])) + text = body["choices"][0]["message"].get("content", "") + return RequestResult("t2t", latency, True, text=text) + except Exception as e: + return RequestResult("t2t", 0.0, False, error=str(e)) + + +async def run_t2v( + session: aiohttp.ClientSession, + base_url: str, + model: str, + prompt: str, +) -> RequestResult: + payload = { + "model": model, + "modalities": ["text", "image"], + "messages": [ + SYSTEM_PROMPT, + { + "role": "user", + "content": [{"type": "text", "text": prompt}], + }, + ], + "max_tokens": 800, + } + try: + latency, body = await send_request(session, base_url, model, payload) + if "error" in body: + return RequestResult("t2v", latency, False, error=str(body["error"])) + + has_image = False + text = "" + for choice in body.get("choices", []): + msg = choice.get("message", {}) + content = msg.get("content") + if isinstance(content, list): + for item in content: + if isinstance(item, dict) and item.get("type") == "image_url": + url = item.get("image_url", {}).get("url", "") + if url.startswith("data:image"): + has_image = True + elif isinstance(content, str): + text += content + return RequestResult("t2v", latency, True, has_image=has_image, text=text) + except Exception as e: + return RequestResult("t2v", 0.0, False, error=str(e)) + + +async def run_s2s( + session: aiohttp.ClientSession, + base_url: str, + model: str, + prompt: str, + audio_b64: str, +) -> RequestResult: + payload = { + "model": model, + "modalities": ["text", "audio"], + "messages": [ + SYSTEM_PROMPT, + { + "role": "user", + "content": [ + { + "type": "input_audio", + "input_audio": {"data": audio_b64, "format": "wav"}, + }, + {"type": "text", "text": prompt}, + ], + }, + ], + "max_tokens": 512, + } + try: + latency, body = await send_request(session, base_url, model, payload) + if "error" in body: + return RequestResult("s2s", latency, False, error=str(body["error"])) + + has_audio = False + text = "" + for choice in body.get("choices", []): + msg = choice.get("message", {}) + audio = msg.get("audio") + if audio and audio.get("data"): + has_audio = True + content = msg.get("content") + if isinstance(content, str) and content and content != "None": + text += content + return RequestResult("s2s", latency, True, has_audio=has_audio, text=text) + except Exception as e: + return RequestResult("s2s", 0.0, False, error=str(e)) + + +# --------------------------------------------------------------------------- +# Benchmark runner +# --------------------------------------------------------------------------- +async def run_benchmark( + mode: str, + base_url: str, + model: str, + num_prompts: int, + concurrency: int, + audio_b64: str, +) -> BenchmarkStats: + stats = BenchmarkStats(mode=mode, total=num_prompts, success=0) + semaphore = asyncio.Semaphore(concurrency) + + async def bounded(i: int) -> RequestResult: + async with semaphore: + if mode == "t2t": + prompt = T2T_PROMPTS[i % len(T2T_PROMPTS)] + return await run_t2t(session, base_url, model, prompt) + elif mode == "t2v": + prompt = T2V_PROMPTS[i % len(T2V_PROMPTS)] + return await run_t2v(session, base_url, model, prompt) + else: # s2s + prompt = S2S_PROMPTS[i % len(S2S_PROMPTS)] + return await run_s2s(session, base_url, model, prompt, audio_b64) + + connector = aiohttp.TCPConnector(limit=concurrency) + async with aiohttp.ClientSession(connector=connector) as session: + tasks = [bounded(i) for i in range(num_prompts)] + results = await asyncio.gather(*tasks) + + for r in results: + if r.success: + stats.success += 1 + stats.latencies.append(r.latency) + else: + print(f" [FAIL] {r.error[:80]}") + + return stats + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- +def main(): + parser = argparse.ArgumentParser(description="HyperCLOVAX-SEED-Omni-8B benchmark") + parser.add_argument("--base-url", default=DEFAULT_BASE_URL) + parser.add_argument("--model", default=DEFAULT_MODEL) + parser.add_argument( + "--mode", + choices=["t2t", "t2v", "s2s", "all"], + default="all", + help="Benchmark mode (default: all)", + ) + parser.add_argument("--num-prompts", type=int, default=10) + parser.add_argument("--concurrency", type=int, default=1) + parser.add_argument("--audio-file", default=None, help="WAV file for S2S input") + parser.add_argument("--output-json", default=None, help="Save results to JSON file") + args = parser.parse_args() + + # Prepare audio + if args.audio_file: + with open(args.audio_file, "rb") as f: + audio_b64 = base64.b64encode(f.read()).decode() + print(f"Using audio file: {args.audio_file}") + else: + audio_b64 = make_sine_wav_b64(1.0) + print("Using synthetic 1s 440Hz sine wave audio") + + modes = ["t2t", "t2v", "s2s"] if args.mode == "all" else [args.mode] + + print(f"\nBenchmark: {args.base_url}") + print(f"Model : {args.model}") + print(f"Prompts : {args.num_prompts} per mode, concurrency={args.concurrency}") + print() + + all_stats = [] + for mode in modes: + print(f"Running {mode.upper()} ({args.num_prompts} requests)...") + stats = asyncio.run( + run_benchmark(mode, args.base_url, args.model, args.num_prompts, args.concurrency, audio_b64) + ) + all_stats.append(stats) + s = stats.summary() + print(f" {mode.upper()} Results:") + for k, v in s.items(): + print(f" {k:20s}: {v}") + print() + + if args.output_json: + with open(args.output_json, "w") as f: + json.dump([s.summary() for s in all_stats], f, indent=2, ensure_ascii=False) + print(f"Results saved to {args.output_json}") + + +if __name__ == "__main__": + main() diff --git a/benchmarks/hcx-omni/run_benchmark.sh b/benchmarks/hcx-omni/run_benchmark.sh new file mode 100755 index 00000000000..a93385d149d --- /dev/null +++ b/benchmarks/hcx-omni/run_benchmark.sh @@ -0,0 +1,38 @@ +#!/bin/bash +# HyperCLOVAX-SEED-Omni-8B Benchmark Script +# Run from vllm-omni root directory. + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)" +cd "$ROOT" + +BASE_URL="${BASE_URL:-http://localhost:8000/v1}" +NUM_PROMPTS="${NUM_PROMPTS:-10}" +CONCURRENCY="${CONCURRENCY:-1}" +MODE="${MODE:-all}" +OUTPUT_DIR="$SCRIPT_DIR/results" + +mkdir -p "$OUTPUT_DIR" +TIMESTAMP=$(date +%Y%m%d_%H%M%S) +OUTPUT_JSON="$OUTPUT_DIR/benchmark_${MODE}_${TIMESTAMP}.json" + +echo "==================================================" +echo " HyperCLOVAX-SEED-Omni-8B Benchmark" +echo " BASE_URL : $BASE_URL" +echo " MODE : $MODE" +echo " NUM_PROMPTS : $NUM_PROMPTS" +echo " CONCURRENCY : $CONCURRENCY" +echo " OUTPUT : $OUTPUT_JSON" +echo "==================================================" + +python benchmarks/hcx-omni/benchmark_hcx_omni.py \ + --base-url "$BASE_URL" \ + --mode "$MODE" \ + --num-prompts "$NUM_PROMPTS" \ + --concurrency "$CONCURRENCY" \ + --output-json "$OUTPUT_JSON" + +echo "" +echo "Done. Results: $OUTPUT_JSON" diff --git a/examples/online_serving/hcx_omni/README.md b/examples/online_serving/hcx_omni/README.md new file mode 100644 index 00000000000..5c1bb458e84 --- /dev/null +++ b/examples/online_serving/hcx_omni/README.md @@ -0,0 +1,235 @@ +# HyperCLOVAX-SEED-Omni-8B with vLLM-Omni + +[HyperCLOVAX-SEED-Omni-8B](https://huggingface.co/naver-hyperclovax/HyperCLOVAX-SEED-Omni-8B) +is an omni-modal model by NAVER Cloud that supports: + +| Input | Output | +|--------|-----------------| +| Text | Text | +| Audio | Text + Audio | +| Image | Text | +| Text | Text + Image | +| Audio | Text + Audio + Image | + +## Architecture + +The model uses a 3-stage pipeline: + +``` +Stage 0 (Thinker) ──→ Stage 1 (Vision Decoder, diffusion) + │ + └──────────→ Stage 2 (Audio Decoder, unit-BigVGAN) +``` + +- **Thinker**: Qwen2.5-VL vision encoder + Qwen2Audio encoder + HyperCLOVAX language model. + Outputs text tokens and discrete audio/vision codes in the vocabulary. +- **Vision Decoder**: Diffusion-based image generation from 729 discrete TA-Tok codes. +- **Audio Decoder**: Unit-BigVGAN vocoder from CosyVoice2 FSQ discrete audio codes. + +## Hardware Requirements + +| Setup | GPUs | +|-----------|---------------------------------------------| +| Default | 6 × GPU ≥24 GB (4 for thinker TP, 1+1 for decoders) | +| Minimal | 3 × GPU ≥24 GB (1 for thinker, 1+1 for decoders) | + +## Quick Start + +### 1. Start the Server + +```bash +# 6-GPU setup (production) +./run_server.sh --model naver-hyperclovax/HyperCLOVAX-SEED-Omni-8B + +# Custom GPU allocation +CUDA_VISIBLE_DEVICES=0,1,2,3,4,5 ./run_server.sh +``` + +### 2. Run the Client Demo + +```bash +# All modes: text-only, text-to-vision, speech-to-speech +python client_demo.py --base-url http://localhost:8000/v1 + +# Speech-to-Speech with your own audio file +python client_demo.py --mode s2s --audio-file /path/to/speech.wav + +# Text-to-Vision +python client_demo.py --mode t2v --prompt "고양이 그림을 그려줘" +``` + +### 3. Use the OpenAI API Directly + +**Speech-to-Speech:** +```bash +curl http://localhost:8000/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "naver-hyperclovax/HyperCLOVAX-SEED-Omni-8B", + "modalities": ["text", "audio"], + "messages": [{ + "role": "user", + "content": [ + {"type": "input_audio", "input_audio": {"data": "", "format": "wav"}}, + {"type": "text", "text": "이 오디오에 무슨 내용이 있나요?"} + ] + }] + }' +``` + +**Text-to-Vision:** +```bash +curl http://localhost:8000/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "naver-hyperclovax/HyperCLOVAX-SEED-Omni-8B", + "modalities": ["text", "image"], + "messages": [{ + "role": "user", + "content": [ + {"type": "text", "text": "귀여운 강아지 한 마리가 공원에서 뛰노는 그림을 그려줘."} + ] + }] + }' +``` + +## System Prompt (Required for Audio/Image Generation) + +The thinker model decides whether to emit discrete audio or image tokens based +on context. **A system prompt is required** to reliably activate audio/image +generation. Without it, the model typically responds in text only. + +```python +SYSTEM_PROMPT = { + "role": "system", + "content": [ + { + "type": "text", + "text": ( + "당신은 CLOVA X입니다. 네이버가 만든 AI 어시스턴트로서 " + "오디오와 이미지를 인식하고 텍스트, 음성, 이미지를 생성할 수 있습니다." + ), + } + ], +} +``` + +Include it as the first message in every request that expects audio or image output. + +## Mode Activation Conditions + +### Speech-to-Speech (S2S) + +**Requirements:** +- `modalities: ["text", "audio"]` +- Audio input via `input_audio` content block (base64-encoded WAV/MP3) +- System prompt included + +The thinker generates discrete audio unit tokens (`<|audio0000|>` … `<|audio6560|>`) +in its output, which are routed to the audio decoder (BigVGAN). The audio +response is in `choices[N].message.audio.data` (base64 WAV). + +```python +response = client.chat.completions.create( + model=MODEL, + modalities=["text", "audio"], + messages=[ + SYSTEM_PROMPT, + { + "role": "user", + "content": [ + { + "type": "input_audio", + "input_audio": {"data": audio_b64, "format": "wav"}, + }, + {"type": "text", "text": "이 오디오에 무슨 내용이 있나요?"}, + ], + }, + ], +) + +# The response may have two choices: one with text, one with audio +for choice in response.choices: + if choice.message.audio: + wav_bytes = base64.b64decode(choice.message.audio.data) +``` + +### Text-to-Vision (T2V) + +**Requirements:** +- `modalities: ["text", "image"]` +- Text-only user message (no audio input) +- System prompt included + +The thinker generates 729 discrete vision codes (`<|vision00000|>` … `<|vision65535|>`, +27×27 TA-Tok tokens), which are routed to the vision decoder (diffusion, 50 steps by +default). The image is returned in `choices[N].message.content` as an +`image_url` item with a `data:image/png;base64,...` URL. + +```python +response = client.chat.completions.create( + model=MODEL, + modalities=["text", "image"], + messages=[ + SYSTEM_PROMPT, + { + "role": "user", + "content": [{"type": "text", "text": "귀여운 강아지가 공원에서 뛰노는 그림을 그려줘."}], + }, + ], +) + +# Parse raw JSON to access image_url content +import json, httpx +raw = json.loads(response._raw_response.content) +for choice in raw["choices"]: + content = choice["message"].get("content", []) + if isinstance(content, list): + for item in content: + if item.get("type") == "image_url": + data_url = item["image_url"]["url"] # "data:image/png;base64,..." + img_bytes = base64.b64decode(data_url.split(",", 1)[1]) +``` + +### Text-to-Text (T2T) + +No special requirements. The thinker responds in text only. + +```python +response = client.chat.completions.create( + model=MODEL, + modalities=["text"], + messages=[{"role": "user", "content": "대한민국의 수도는 어디인가요?"}], +) +print(response.choices[0].message.content) +``` + +## Response Structure + +| Mode | `choices[i].message` field | Content | +|------|--------------------------|---------| +| T2T | `content` (str) | Text response | +| S2S | `content` (str) | Text transcript | +| S2S | `audio.data` (str) | base64 WAV | +| T2V | `content` (list) | `[{"type": "image_url", "image_url": {"url": "data:image/png;base64,..."}}]` | + +> **Note:** S2S responses typically contain two `choices` entries — one with +> the text and one with the audio. Iterate over all choices to collect both. + +## Stage Config + +The default stage config is at +`vllm_omni/model_executor/stage_configs/hcx_omni.yaml`. + +Key parameters: + +| Stage | Type | `model_arch` / `model_class_name` | GPU | +|-------|-----------|------------------------------------|-------| +| 0 | LLM | `HCXVisionV2ForCausalLM` | 0-3 | +| 1 | Diffusion | `HyperCLOVAXVisionPipeline` | 4 | +| 2 | Diffusion | `HyperCLOVAXAudioPipeline` | 5 | + +## Benchmarks + +See [`benchmarks/hcx-omni/`](../../../benchmarks/hcx-omni/) for latency and +throughput measurement scripts. diff --git a/examples/online_serving/hcx_omni/client_demo.py b/examples/online_serving/hcx_omni/client_demo.py new file mode 100644 index 00000000000..bdc2ac58a4d --- /dev/null +++ b/examples/online_serving/hcx_omni/client_demo.py @@ -0,0 +1,153 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +"""HyperCLOVAX-SEED-Omni-8B client demo. + +Demonstrates Speech-to-Speech and Text-to-Vision via the OpenAI-compatible +HTTP API provided by vLLM-Omni. + +Usage: + # Start the server first (see run_server.sh), then: + python client_demo.py --base-url http://localhost:8000/v1 + + # With a local audio file: + python client_demo.py --audio-file path/to/speech.wav + + # Text-to-Vision only: + python client_demo.py --mode t2v --prompt "고양이 그림을 그려줘" +""" + +import argparse +import base64 +import io +import sys +from pathlib import Path + +try: + from openai import OpenAI +except ImportError: + print("Please install openai: pip install openai") + sys.exit(1) + + +def encode_audio_file(path: str) -> str: + """Base64-encode a WAV/MP3 file.""" + with open(path, "rb") as f: + return base64.b64encode(f.read()).decode() + + +def encode_audio_array(array, sample_rate: int = 16000) -> str: + """Base64-encode a numpy audio array as WAV.""" + import numpy as np + import scipy.io.wavfile as wav + + if not isinstance(array, np.ndarray): + array = np.array(array) + buf = io.BytesIO() + wav.write(buf, sample_rate, (array * 32767).astype(np.int16)) + return base64.b64encode(buf.getvalue()).decode() + + +def speech_to_speech(client: OpenAI, audio_b64: str, prompt: str = "이 오디오에 무슨 내용이 있나요?"): + """Send audio → receive text + audio.""" + print(f"\n[Speech-to-Speech] prompt: {prompt!r}") + response = client.chat.completions.create( + model="naver-hyperclovax/HyperCLOVAX-SEED-Omni-8B", + modalities=["text", "audio"], + messages=[ + { + "role": "user", + "content": [ + { + "type": "input_audio", + "input_audio": {"data": audio_b64, "format": "wav"}, + }, + {"type": "text", "text": prompt}, + ], + } + ], + ) + choice = response.choices[0] + print(f"Text response: {choice.message.content}") + if hasattr(choice.message, "audio") and choice.message.audio: + audio_data = base64.b64decode(choice.message.audio.data) + out_path = Path("/tmp/hcx_omni_response.wav") + out_path.write_bytes(audio_data) + print(f"Audio saved to: {out_path}") + return response + + +def text_to_vision(client: OpenAI, prompt: str = "귀여운 강아지 한 마리가 공원에서 뛰노는 그림을 그려줘."): + """Send text → receive text + image.""" + print(f"\n[Text-to-Vision] prompt: {prompt!r}") + response = client.chat.completions.create( + model="naver-hyperclovax/HyperCLOVAX-SEED-Omni-8B", + modalities=["text", "image"], + messages=[ + { + "role": "user", + "content": [{"type": "text", "text": prompt}], + } + ], + ) + choice = response.choices[0] + print(f"Text response: {choice.message.content}") + if hasattr(choice.message, "image") and choice.message.image: + img_data = base64.b64decode(choice.message.image.data) + out_path = Path("/tmp/hcx_omni_generated.png") + out_path.write_bytes(img_data) + print(f"Image saved to: {out_path}") + return response + + +def text_only(client: OpenAI, prompt: str = "대한민국의 수도는 어디인가요?"): + """Pure text conversation (thinker only).""" + print(f"\n[Text-only] prompt: {prompt!r}") + response = client.chat.completions.create( + model="naver-hyperclovax/HyperCLOVAX-SEED-Omni-8B", + modalities=["text"], + messages=[{"role": "user", "content": prompt}], + ) + print(f"Response: {response.choices[0].message.content}") + return response + + +def main(): + parser = argparse.ArgumentParser(description="HyperCLOVAX-SEED-Omni-8B demo") + parser.add_argument("--base-url", default="http://localhost:8000/v1") + parser.add_argument( + "--mode", + choices=["s2s", "t2v", "text", "all"], + default="all", + help="Demo mode: s2s=Speech-to-Speech, t2v=Text-to-Vision, text=Text-only", + ) + parser.add_argument("--audio-file", default=None, help="Path to input audio file") + parser.add_argument("--prompt", default=None, help="Text prompt override") + args = parser.parse_args() + + client = OpenAI(api_key="EMPTY", base_url=args.base_url) + + if args.mode in ("text", "all"): + text_only(client, prompt=args.prompt or "대한민국의 수도는 어디인가요?") + + if args.mode in ("t2v", "all"): + text_to_vision(client, prompt=args.prompt or "귀여운 강아지 한 마리가 공원에서 뛰노는 그림을 그려줘.") + + if args.mode in ("s2s", "all"): + if args.audio_file: + audio_b64 = encode_audio_file(args.audio_file) + else: + # Generate synthetic 1-second sine wave + try: + import numpy as np + + t = np.linspace(0, 1, 16000, endpoint=False) + audio_array = np.sin(2 * np.pi * 440 * t).astype(np.float32) + audio_b64 = encode_audio_array(audio_array) + except ImportError: + print("numpy not available, skipping S2S demo") + return + speech_to_speech(client, audio_b64, prompt=args.prompt or "이 오디오에 무슨 내용이 있나요?") + + +if __name__ == "__main__": + main() diff --git a/examples/online_serving/hcx_omni/run_server.sh b/examples/online_serving/hcx_omni/run_server.sh new file mode 100755 index 00000000000..c3cbafba4b2 --- /dev/null +++ b/examples/online_serving/hcx_omni/run_server.sh @@ -0,0 +1,52 @@ +#!/bin/bash +# Launch HyperCLOVAX-SEED-Omni-8B with vLLM-Omni. +# +# Requirements: +# - 6× GPUs (≥24 GB VRAM each): +# GPU 0-3: Thinker (tensor_parallel_size=4) +# GPU 4 : Vision decoder +# GPU 5 : Audio decoder +# - HF model: naver-hyperclovax/HyperCLOVAX-SEED-Omni-8B +# +# Usage: +# ./run_server.sh [--model MODEL] [--port PORT] [--stage-configs-path PATH] + +set -e + +MODEL="${MODEL:-naver-hyperclovax/HyperCLOVAX-SEED-Omni-8B}" +PORT="${PORT:-8000}" +HOST="${HOST:-0.0.0.0}" +STAGE_CONFIG="${STAGE_CONFIG:-}" + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +DEFAULT_STAGE_CONFIG="$SCRIPT_DIR/../../../vllm_omni/model_executor/stage_configs/hcx_omni.yaml" + +while [[ $# -gt 0 ]]; do + case $1 in + --model) MODEL="$2"; shift 2 ;; + --port) PORT="$2"; shift 2 ;; + --host) HOST="$2"; shift 2 ;; + --stage-configs-path) STAGE_CONFIG="$2"; shift 2 ;; + --help) + echo "Usage: $0 [--model MODEL] [--port PORT] [--host HOST] [--stage-configs-path PATH]" + exit 0 ;; + *) echo "Unknown: $1"; exit 1 ;; + esac +done + +[[ -z "$STAGE_CONFIG" ]] && STAGE_CONFIG="$DEFAULT_STAGE_CONFIG" + +echo "=================================================" +echo " HyperCLOVAX-SEED-Omni-8B vLLM-Omni Server" +echo "=================================================" +echo " Model : $MODEL" +echo " Stage config: $STAGE_CONFIG" +echo " Endpoint : http://$HOST:$PORT/v1" +echo "=================================================" + +python -m vllm_omni.entrypoints.openai.api_server \ + --model "$MODEL" \ + --stage-configs-path "$STAGE_CONFIG" \ + --port "$PORT" \ + --host "$HOST" \ + --trust-remote-code diff --git a/pyproject.toml b/pyproject.toml index 9b034a7c8e9..c11d08b4e83 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -237,3 +237,4 @@ ue = "ue" semantics = "semantics" fullset = "fullset" Vai = "Vai" +nd = "nd" diff --git a/tests/e2e/offline_inference/test_hcx_omni.py b/tests/e2e/offline_inference/test_hcx_omni.py new file mode 100644 index 00000000000..565a6a0f4e7 --- /dev/null +++ b/tests/e2e/offline_inference/test_hcx_omni.py @@ -0,0 +1,169 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +"""E2E tests for HyperCLOVAX-SEED-Omni-8B. + +Tests cover: + - Text-only inference (comprehension) + - Speech-to-Speech (audio input → audio output) + - Text-to-Vision (text input → image output) + - Audio-to-Vision (audio input → image + audio output) +""" + +from pathlib import Path + +import pytest + +from tests.conftest import ( + generate_synthetic_audio, + generate_synthetic_image, + modify_stage_config, +) +from tests.utils import hardware_test + +MODEL = "naver-hyperclovax/HyperCLOVAX-SEED-Omni-8B" + +_CI_YAML = str(Path(__file__).parent.parent / "stage_configs" / "hcx_omni_ci.yaml") + + +def _ci_config(enforce_eager: bool = True) -> str: + updates: dict = { + "stage_args": { + 0: {"engine_args.enforce_eager": str(enforce_eager).lower()}, + } + } + return modify_stage_config(_CI_YAML, updates=updates) + + +stage_config = _ci_config(enforce_eager=True) +test_params = [(MODEL, stage_config)] + + +# ------------------------------------------------------------------ # +# Helper # +# ------------------------------------------------------------------ # + + +def _text_prompt(text: str) -> dict: + return { + "role": "user", + "content": [{"type": "text", "text": text}], + } + + +def _audio_text_prompt(audio_array, text: str) -> dict: + return { + "role": "user", + "content": [ + {"type": "input_audio", "input_audio": {"data": audio_array, "format": "wav"}}, + {"type": "text", "text": text}, + ], + } + + +def _image_text_prompt(image_array, text: str) -> dict: + return { + "role": "user", + "content": [ + {"type": "image_url", "image_url": {"url": image_array}}, + {"type": "text", "text": text}, + ], + } + + +# ------------------------------------------------------------------ # +# Tests # +# ------------------------------------------------------------------ # + + +@pytest.mark.advanced_model +@pytest.mark.omni +@hardware_test( + res={"cuda": "L4"}, + num_cards={"cuda": 3}, +) +@pytest.mark.parametrize("omni_runner", test_params, indirect=True) +def test_text_to_text(omni_runner, omni_runner_handler) -> None: + """Text-only (comprehension) request — verifies thinker stage alone.""" + request_config = { + "prompts": "What is the capital of South Korea?", + "output_modalities": ["text"], + } + results = omni_runner.run(request_config) + assert results and len(results) > 0 + + +@pytest.mark.advanced_model +@pytest.mark.omni +@hardware_test( + res={"cuda": "L4"}, + num_cards={"cuda": 3}, +) +@pytest.mark.parametrize("omni_runner", test_params, indirect=True) +def test_audio_to_audio(omni_runner, omni_runner_handler) -> None: + """Speech-to-Speech: audio input processed by thinker → audio decoder.""" + audio = generate_synthetic_audio(1, 1, 16000)["np_array"] + if len(audio.shape) == 2: + audio = audio.squeeze() + + request_config = { + "prompts": _audio_text_prompt(audio, "Repeat what you heard."), + "output_modalities": ["text", "audio"], + } + results = omni_runner.run(request_config) + assert results and len(results) > 0 + + +@pytest.mark.advanced_model +@pytest.mark.omni +@hardware_test( + res={"cuda": "L4"}, + num_cards={"cuda": 3}, +) +@pytest.mark.parametrize("omni_runner", test_params, indirect=True) +def test_text_to_image(omni_runner, omni_runner_handler) -> None: + """Text-to-Vision: text prompt → image generated by vision decoder.""" + request_config = { + "prompts": "Draw a picture of a cat sitting on a sofa.", + "output_modalities": ["text", "image"], + } + results = omni_runner.run(request_config) + assert results and len(results) > 0 + + +@pytest.mark.advanced_model +@pytest.mark.omni +@hardware_test( + res={"cuda": "L4"}, + num_cards={"cuda": 3}, +) +@pytest.mark.parametrize("omni_runner", test_params, indirect=True) +def test_image_to_text(omni_runner, omni_runner_handler) -> None: + """Image understanding: image input → text description.""" + image = generate_synthetic_image(224, 224)["np_array"] + request_config = { + "prompts": _image_text_prompt(image, "Describe this image."), + "output_modalities": ["text"], + } + results = omni_runner.run(request_config) + assert results and len(results) > 0 + + +@pytest.mark.advanced_model +@pytest.mark.omni +@hardware_test( + res={"cuda": "L4"}, + num_cards={"cuda": 3}, +) +@pytest.mark.parametrize("omni_runner", test_params, indirect=True) +def test_multimodal_to_multimodal(omni_runner, omni_runner_handler) -> None: + """Full omni: audio + image input → text + audio + image output.""" + audio = generate_synthetic_audio(1, 1, 16000)["np_array"] + if len(audio.shape) == 2: + audio = audio.squeeze() + + request_config = { + "prompts": "Listen to the audio and draw what you hear.", + "output_modalities": ["text", "audio", "image"], + } + results = omni_runner.run(request_config) + assert results and len(results) > 0 diff --git a/tests/e2e/online_serving/test_hcx_omni.py b/tests/e2e/online_serving/test_hcx_omni.py new file mode 100644 index 00000000000..491c8371797 --- /dev/null +++ b/tests/e2e/online_serving/test_hcx_omni.py @@ -0,0 +1,125 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +"""E2E online serving tests for HyperCLOVAX-SEED-Omni-8B. + +Tests the OpenAI-compatible HTTP API for Speech-to-Speech and +Text-to-Vision generation. +""" + +import os +from pathlib import Path + +import pytest + +from tests.conftest import ( + OmniServerParams, + generate_synthetic_audio, + generate_synthetic_image, +) +from tests.utils import hardware_test + +os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" +os.environ["VLLM_TEST_CLEAN_GPU_MEMORY"] = "0" + +MODEL = "naver-hyperclovax/HyperCLOVAX-SEED-Omni-8B" +_CI_YAML = str(Path(__file__).parent.parent / "stage_configs" / "hcx_omni_ci.yaml") + +test_params = [OmniServerParams(model=MODEL, stage_config_path=_CI_YAML)] + +SYSTEM_PROMPT = { + "role": "system", + "content": [ + { + "type": "text", + "text": ( + "당신은 CLOVA X입니다. 네이버가 만든 AI 어시스턴트로서 " + "오디오와 이미지를 인식하고 텍스트, 음성, 이미지를 생성할 수 있습니다." + ), + } + ], +} + + +@pytest.mark.advanced_model +@pytest.mark.omni +@hardware_test(res={"cuda": "L4"}, num_cards={"cuda": 3}) +@pytest.mark.parametrize("omni_server", test_params, indirect=True) +def test_speech_to_speech(omni_server, omni_server_handler) -> None: + """Speech-to-Speech: audio input → text + audio response.""" + audio = generate_synthetic_audio(1, 1, 16000)["np_array"] + if len(audio.shape) == 2: + audio = audio.squeeze() + + messages = [ + SYSTEM_PROMPT, + { + "role": "user", + "content": [ + { + "type": "input_audio", + "input_audio": {"data": audio, "format": "wav"}, + }, + {"type": "text", "text": "이 오디오에서 무슨 내용이 들리나요?"}, + ], + }, + ] + request_config = { + "messages": messages, + "modalities": ["text", "audio"], + "stream": False, + } + response = omni_server.chat(request_config) + assert response is not None + + +@pytest.mark.advanced_model +@pytest.mark.omni +@hardware_test(res={"cuda": "L4"}, num_cards={"cuda": 3}) +@pytest.mark.parametrize("omni_server", test_params, indirect=True) +def test_text_to_vision(omni_server, omni_server_handler) -> None: + """Text-to-Vision: text prompt → text + image response.""" + messages = [ + SYSTEM_PROMPT, + { + "role": "user", + "content": [ + {"type": "text", "text": "고양이 한 마리가 소파에 앉아 있는 그림을 그려줘."}, + ], + }, + ] + request_config = { + "messages": messages, + "modalities": ["text", "image"], + "stream": False, + } + response = omni_server.chat(request_config) + assert response is not None + + +@pytest.mark.advanced_model +@pytest.mark.omni +@hardware_test(res={"cuda": "L4"}, num_cards={"cuda": 3}) +@pytest.mark.parametrize("omni_server", test_params, indirect=True) +def test_image_understanding(omni_server, omni_server_handler) -> None: + """Image understanding: image input → text description.""" + image = generate_synthetic_image(224, 224)["np_array"] + messages = [ + SYSTEM_PROMPT, + { + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": {"url": f"data:image/png;base64,{image}"}, + }, + {"type": "text", "text": "이 이미지에 무엇이 있나요?"}, + ], + }, + ] + request_config = { + "messages": messages, + "modalities": ["text"], + "stream": False, + } + response = omni_server.chat(request_config) + assert response is not None diff --git a/tests/e2e/stage_configs/hcx_omni_ci.yaml b/tests/e2e/stage_configs/hcx_omni_ci.yaml new file mode 100644 index 00000000000..f455422689e --- /dev/null +++ b/tests/e2e/stage_configs/hcx_omni_ci.yaml @@ -0,0 +1,93 @@ +# Stage config for HyperCLOVAX-SEED-Omni-8B CI tests. +# Verified on 3x 24GB GPU (L4/RTX3090/RTX4090). +# Stage 0 (thinker): 4xTP → single GPU in CI uses 1xTP + +runtime: + connectors: + shared_memory_connector: + extra: + shm_threshold_bytes: 65536 + name: SharedMemoryConnector + defaults: + max_inflight: 1 + window_size: -1 + edges: + - { from: 0, to: 1, window_size: -1 } + - { from: 0, to: 2, window_size: -1 } + enabled: true + +stage_args: + - stage_id: 0 + stage_type: llm + runtime: + process: true + devices: "0" + engine_args: + model_stage: thinker + model_arch: HCXVisionV2ForCausalLM + worker_type: ar + scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler + tensor_parallel_size: 1 + max_model_len: 4096 + max_num_batched_tokens: 4096 + max_num_seqs: 1 + gpu_memory_utilization: 0.8 + enforce_eager: true + trust_remote_code: true + engine_output_type: latent + enable_prefix_caching: false + limit_mm_per_prompt: + audio: 1 + image: 1 + load_format: dummy + is_comprehension: true + final_output: true + final_output_type: text + default_sampling_params: + temperature: 0.1 + top_p: 1.0 + top_k: -1 + max_tokens: 128 + seed: 42 + detokenize: true + repetition_penalty: 1.0 + + - stage_id: 1 + stage_type: diffusion + runtime: + process: true + devices: "1" + max_batch_size: 1 + engine_args: + engine_output_type: image + gpu_memory_utilization: 0.75 + model_class_name: HyperCLOVAXVisionPipeline + model_stage: decoder/vision + model_subdir: decoder/vision + trust_remote_code: true + enforce_eager: true + engine_input_source: + - 0 + final_output: true + final_output_type: image + custom_process_input_func: vllm_omni.model_executor.stage_input_processors.hyperclovax_seed_omni.thinker2vision_decoder + + - stage_id: 2 + stage_type: diffusion + runtime: + process: true + devices: "2" + max_batch_size: 1 + engine_args: + engine_output_type: audio + gpu_memory_utilization: 0.4 + model_class_name: HyperCLOVAXAudioPipeline + model_stage: decoder/audio + model_subdir: decoder/audio/NCZSCosybigvganDecoder.mar + trust_remote_code: true + enforce_eager: true + engine_input_source: + - 0 + final_output: true + final_output_type: audio + custom_process_input_func: vllm_omni.model_executor.stage_input_processors.hyperclovax_seed_omni.thinker2audio_decoder diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py new file mode 100644 index 00000000000..42656022e64 --- /dev/null +++ b/tests/unit/conftest.py @@ -0,0 +1,19 @@ +"""conftest.py for unit tests — stubs out heavy vllm_omni init.""" + +import sys +import types + +# Provide a lightweight stub for vllm_omni so that submodule imports +# (e.g. vllm_omni.model_executor.stage_input_processors) don't trigger the +# full package __init__.py which requires a complete vLLM installation. +_stub = types.ModuleType("vllm_omni") +_stub.__path__ = [] +_stub.__spec__ = None +sys.modules.setdefault("vllm_omni", _stub) + +# Stub out vllm_omni.inputs.data.OmniTokensPrompt +_inputs = types.ModuleType("vllm_omni.inputs") +_inputs_data = types.ModuleType("vllm_omni.inputs.data") +_inputs_data.OmniTokensPrompt = dict # type: ignore[attr-defined] +sys.modules.setdefault("vllm_omni.inputs", _inputs) +sys.modules.setdefault("vllm_omni.inputs.data", _inputs_data) diff --git a/tests/unit/model_executor/__init__.py b/tests/unit/model_executor/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/unit/model_executor/test_hcx_omni_processing.py b/tests/unit/model_executor/test_hcx_omni_processing.py new file mode 100644 index 00000000000..71b544a070e --- /dev/null +++ b/tests/unit/model_executor/test_hcx_omni_processing.py @@ -0,0 +1,187 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +"""Unit tests for HCXOmni multimodal token processing. + +Tests verify that: + 1. Audio tokens are correctly positioned and embedded. + 2. Image tokens (continuous path via Qwen2.5-VL) are correctly positioned. + 3. Discrete audio/image token boundaries match config.json values. + 4. Stage input processors correctly extract discrete tokens from mixed output. +""" + +# Token ID boundaries (from HyperCLOVAX-SEED-Omni-8B config.json) +DISCRETE_AUDIO_UNIT_0_ID = 128606 +DISCRETE_IMAGE_UNIT_0_ID = 135168 +DISCRETE_AUDIO_VOCAB_SIZE = 6561 +DISCRETE_IMAGE_VOCAB_SIZE = 65536 +DISCRETE_IMAGE_TOKEN_LENGTH = 729 # 27 * 27 + + +class TestDiscreteTokenBoundaries: + """Verify token ID arithmetic matches config.json.""" + + def test_audio_range(self): + assert DISCRETE_AUDIO_UNIT_0_ID == 128606 + assert DISCRETE_AUDIO_UNIT_0_ID + DISCRETE_AUDIO_VOCAB_SIZE - 1 == 135166 + + def test_image_range(self): + assert DISCRETE_IMAGE_UNIT_0_ID == 135168 + # Image codebook is 2^16 = 65536 + assert DISCRETE_IMAGE_VOCAB_SIZE == 65536 + + def test_no_overlap(self): + audio_end = DISCRETE_AUDIO_UNIT_0_ID + DISCRETE_AUDIO_VOCAB_SIZE + assert audio_end < DISCRETE_IMAGE_UNIT_0_ID, "Audio and image token ranges must not overlap" + + def test_image_token_count_is_square(self): + """TA-Tok produces 27×27 = 729 tokens per image.""" + import math + + side = math.isqrt(DISCRETE_IMAGE_TOKEN_LENGTH) + assert side * side == DISCRETE_IMAGE_TOKEN_LENGTH + + +class TestExtractDiscreteTokens: + """Test the _extract_discrete_tokens helper.""" + + def _extract(self, token_ids, start_id, vocab_size): + return [tid - start_id for tid in token_ids if start_id <= tid < start_id + vocab_size] + + def test_extract_audio_tokens(self): + token_ids = [ + 100, + 200, # text + DISCRETE_AUDIO_UNIT_0_ID, + DISCRETE_AUDIO_UNIT_0_ID + 42, + DISCRETE_AUDIO_UNIT_0_ID + 100, + 300, # text + ] + result = self._extract(token_ids, DISCRETE_AUDIO_UNIT_0_ID, DISCRETE_AUDIO_VOCAB_SIZE) + assert result == [0, 42, 100] + + def test_extract_image_tokens(self): + token_ids = [ + 100, + DISCRETE_IMAGE_UNIT_0_ID, + DISCRETE_IMAGE_UNIT_0_ID + 255, + 200, + ] + result = self._extract(token_ids, DISCRETE_IMAGE_UNIT_0_ID, DISCRETE_IMAGE_VOCAB_SIZE) + assert result == [0, 255] + + def test_no_overlap_extraction(self): + """Audio extraction must not pick up image tokens and vice versa.""" + mixed = [ + DISCRETE_AUDIO_UNIT_0_ID + 5, + DISCRETE_IMAGE_UNIT_0_ID + 5, + ] + audio = self._extract(mixed, DISCRETE_AUDIO_UNIT_0_ID, DISCRETE_AUDIO_VOCAB_SIZE) + image = self._extract(mixed, DISCRETE_IMAGE_UNIT_0_ID, DISCRETE_IMAGE_VOCAB_SIZE) + assert audio == [5] + assert image == [5] + + def test_truncate_and_pad_image(self): + """Vision decoder needs exactly DISCRETE_IMAGE_TOKEN_LENGTH codes.""" + codes = list(range(DISCRETE_IMAGE_TOKEN_LENGTH + 50)) # too long + truncated = codes[:DISCRETE_IMAGE_TOKEN_LENGTH] + assert len(truncated) == DISCRETE_IMAGE_TOKEN_LENGTH + + codes_short = list(range(100)) # too short + padded = codes_short + [0] * (DISCRETE_IMAGE_TOKEN_LENGTH - len(codes_short)) + assert len(padded) == DISCRETE_IMAGE_TOKEN_LENGTH + + +class TestStageInputProcessor: + """Test thinker2vision_decoder and thinker2audio_decoder processors.""" + + def _make_fake_output(self, token_ids: list[int]): + """Create a minimal fake EngineCoreOutput-like object.""" + from types import SimpleNamespace + + output = SimpleNamespace( + token_ids=token_ids, + ) + thinker_out = SimpleNamespace( + outputs=[output], + request_id="test-001", + prompt_token_ids=[1, 2, 3], + ) + return thinker_out + + def test_vision_decoder_extracts_image_tokens(self): + """thinker2vision_decoder should extract exactly 729 image tokens.""" + image_codes = list(range(DISCRETE_IMAGE_UNIT_0_ID, DISCRETE_IMAGE_UNIT_0_ID + DISCRETE_IMAGE_TOKEN_LENGTH)) + audio_codes = list(range(DISCRETE_AUDIO_UNIT_0_ID, DISCRETE_AUDIO_UNIT_0_ID + 20)) + token_ids = [100, 200] + audio_codes + image_codes + [300] + + thinker_out = self._make_fake_output(token_ids) + + from types import SimpleNamespace + + stage_list = {0: SimpleNamespace(engine_outputs=[thinker_out])} + + from vllm_omni.model_executor.stage_input_processors.hyperclovax_seed_omni import ( + thinker2vision_decoder, + ) + + results = thinker2vision_decoder(stage_list, [0]) + assert len(results) == 1 + prompt_ids = results[0]["prompt_token_ids"] + assert len(prompt_ids) == DISCRETE_IMAGE_TOKEN_LENGTH + assert all(0 <= tid < DISCRETE_IMAGE_VOCAB_SIZE for tid in prompt_ids) + + def test_audio_decoder_extracts_audio_tokens(self): + """thinker2audio_decoder should extract discrete audio tokens.""" + audio_codes = list(range(DISCRETE_AUDIO_UNIT_0_ID, DISCRETE_AUDIO_UNIT_0_ID + 50)) + token_ids = [100, 200] + audio_codes + [300] + + thinker_out = self._make_fake_output(token_ids) + + from types import SimpleNamespace + + stage_list = {0: SimpleNamespace(engine_outputs=[thinker_out])} + + from vllm_omni.model_executor.stage_input_processors.hyperclovax_seed_omni import ( + thinker2audio_decoder, + ) + + results = thinker2audio_decoder(stage_list, [0]) + assert len(results) == 1 + additional = results[0]["additional_information"] + audio_tokens = additional["audio_tokens"][0] + assert len(audio_tokens) == 50 + assert all(0 <= tid < DISCRETE_AUDIO_VOCAB_SIZE for tid in audio_tokens) + + def test_vision_decoder_no_output_if_no_image_tokens(self): + """thinker2vision_decoder returns empty list when no image tokens present.""" + token_ids = [100, 200, 300] # text only + + thinker_out = self._make_fake_output(token_ids) + + from types import SimpleNamespace + + stage_list = {0: SimpleNamespace(engine_outputs=[thinker_out])} + + from vllm_omni.model_executor.stage_input_processors.hyperclovax_seed_omni import ( + thinker2vision_decoder, + ) + + results = thinker2vision_decoder(stage_list, [0]) + assert results == [] + + def test_audio_decoder_no_output_if_no_audio_tokens(self): + """thinker2audio_decoder returns empty list when no audio tokens present.""" + token_ids = [100, 200, 300] # text only + + thinker_out = self._make_fake_output(token_ids) + + from types import SimpleNamespace + + stage_list = {0: SimpleNamespace(engine_outputs=[thinker_out])} + + from vllm_omni.model_executor.stage_input_processors.hyperclovax_seed_omni import ( + thinker2audio_decoder, + ) + + results = thinker2audio_decoder(stage_list, [0]) + assert results == [] diff --git a/tools/pre_commit/check_pickle_imports.py b/tools/pre_commit/check_pickle_imports.py index 1c08a1543d2..89384686bb7 100644 --- a/tools/pre_commit/check_pickle_imports.py +++ b/tools/pre_commit/check_pickle_imports.py @@ -20,6 +20,8 @@ "tests/utils.py", "vllm_omni/diffusion/distributed/group_coordinator.py", "tests/diffusion/attention/test_attention_sp.py", + # cloudpickle needed to serialize arbitrary worker_cls types across processes + "vllm_omni/entrypoints/omni_llm.py", } PICKLE_RE = re.compile( diff --git a/vllm_omni/config/model.py b/vllm_omni/config/model.py index 588efabfc4f..f9e49abc678 100644 --- a/vllm_omni/config/model.py +++ b/vllm_omni/config/model.py @@ -1,13 +1,61 @@ -from dataclasses import MISSING, field +import warnings +from importlib.util import find_spec from typing import Any -from pydantic import ConfigDict, TypeAdapter -from vllm.config import ModelConfig -from vllm.config.utils import config +import torch +import vllm.envs as envs +from pydantic import ConfigDict +from pydantic.dataclasses import dataclass + +try: + from vllm.attention.backends.registry import AttentionBackendEnum +except ImportError: + from vllm.v1.attention.backends.registry import AttentionBackendEnum +from vllm.config import ModelConfig, config + +try: + from vllm.config.model import ( + _RUNNER_CONVERTS, + _RUNNER_TASKS, + ConvertOption, + ConvertType, + RunnerOption, + TaskOption, + _get_and_verify_dtype, + get_served_model_name, + ) +except ImportError: + # vLLM 0.18.0: _RUNNER_TASKS and TaskOption were removed/renamed + _RUNNER_TASKS: dict = { + "generate": {"generate", "auto"}, + "pooling": {"embed", "classify", "reward", "score"}, + } + from vllm.config.model import ( # type: ignore[no-redef] + _RUNNER_CONVERTS, + ConvertOption, + ConvertType, + RunnerOption, + _get_and_verify_dtype, + get_served_model_name, + ) + + TaskOption = str # type: ignore[misc,assignment] +from vllm.config.multimodal import MMCacheType, MMEncoderTPMode, MultiModalConfig +from vllm.config.pooler import PoolerConfig from vllm.logger import init_logger -from vllm.transformers_utils.config import get_hf_text_config -from vllm.transformers_utils.model_arch_config_convertor import ( - ModelArchConfigConvertorBase, +from vllm.platforms import current_platform +from vllm.transformers_utils.config import ( + get_config, + get_hf_image_processor_config, + get_hf_text_config, + get_pooling_config, +) +from vllm.transformers_utils.gguf_utils import ( + maybe_patch_hf_config_from_gguf, +) +from vllm.transformers_utils.utils import ( + is_gguf, + maybe_model_redirect, ) import vllm_omni.model_executor.models as me_models @@ -15,103 +63,37 @@ logger = init_logger(__name__) -class OmniModelArchConfigConvertor(ModelArchConfigConvertorBase): - """Config convertor for Omni multi-stage models. - - Pre-quantized checkpoints (e.g. modelopt FP8) store quantization - config in a stage-specific sub-config (e.g. - thinker_config.text_config.quantization_config) with correct relative - prefixes. The legacy hf_quant_config.json sits at the top level with - "thinker."-prefixed names that don't match vllm-omni's module names. - - This convertor accepts an optional *stage_config_name* so that only - the relevant stage's quantization config is surfaced. - """ - - def __init__( - self, - hf_config, - hf_text_config, - stage_config_name: str | None = None, - ): - super().__init__(hf_config, hf_text_config) - self.stage_config_name = stage_config_name - - def get_quantization_config(self): - # When a stage_config_name is set, look for quantization config - # in that stage's text_config first (has correct relative prefixes). - if self.stage_config_name is not None: - stage_cfg = getattr(self.hf_config, self.stage_config_name, None) - if stage_cfg is not None: - text_cfg = getattr(stage_cfg, "text_config", None) - if text_cfg is not None: - quant_cfg = self._normalize_quantization_config(text_cfg) - if quant_cfg is not None: - return quant_cfg - - # For non-thinker stages (talker, code2wav) whose text_config - # has no quantization_config, return None so quantization is - # not applied to stages that were not quantized. - return None - - return super().get_quantization_config() - - -@config(config=ConfigDict(arbitrary_types_allowed=True)) +@config +@dataclass(config=ConfigDict(arbitrary_types_allowed=True)) class OmniModelConfig(ModelConfig): """Configuration for Omni models, extending the base ModelConfig. - This configuration class extends the base vLLM ModelConfig with - omni-specific fields for multi-stage pipeline processing. - - Attributes: - hf_config: The model's HF Transformers config (default: None) - hf_text_config: The sub text_config of the model's hf_config (default: None) - stage_id: Identifier for the stage in a multi-stage pipeline (default: 0) - async_chunk: If set to True, perform async chunk - model_stage: Stage type identifier, e.g., "thinker" or "talker" - (default: "thinker") - model_arch: Model architecture name - (default: "Qwen2_5OmniForConditionalGeneration") - worker_type: Model Type, e.g., "ar" or "generation" - engine_output_type: Optional output type specification for the engine. - Used to route outputs to appropriate processors (e.g., "image", - "audio", "latents"). If None, output type is inferred. - stage_connector_config: Stage connector configuration dictionary. - Contains "name" (connector name), "extra" (extra connector config). - task_type: Default task type for TTS models (CustomVoice, VoiceDesign, or Base). - If not specified, will be inferred from model path. - - - The correct way to initialize this class is via vLLM config, as most - of the logic for handling values is in the ModelConfig's __post_init__. - - Example: - >>> config = OmniModelConfig.from_vllm_model_config( - ... vllm_config, - ... stage_id=0, - ... model_stage="thinker", - ... model_arch="Qwen2_5OmniForConditionalGeneration" - ... ) + This configuration class extends the base vLLM ModelConfig with + omni-specific fields for multi-stage pipeline processing. + + Attributes: + stage_id: Identifier for the stage in a multi-stage pipeline (default: 0) + model_stage: Stage type identifier, e.g., "thinker" or "talker" + (default: "thinker") + model_arch: Model architecture name + (default: "Qwen2_5OmniForConditionalGeneration") + engine_output_type: Optional output type specification for the engine. + Used to route outputs to appropriate processors (e.g., "image", + "audio", "latents"). If None, output type is inferred. + + Example: + >>> config = OmniModelConfig( + ... stage_id=0, + ... model_stage="thinker", + ... model_arch="Qwen2_5OmniForConditionalGeneration" + ... ) """ stage_id: int = 0 - async_chunk: bool = False model_stage: str = "thinker" - model_arch: str | None = None - worker_type: str | None = None + model_arch: str = "Qwen2_5OmniForConditionalGeneration" engine_output_type: str | None = None hf_config_name: str | None = None - custom_process_next_stage_input_func: str | None = None - stage_connector_config: dict[str, Any] = field( - default_factory=lambda: { - "name": "SharedMemoryConnector", - "extra": {}, - } - ) - omni_kv_config: dict | None = None - codec_frame_rate_hz: float | None = None - task_type: str | None = None @property def registry(self): @@ -119,32 +101,7 @@ def registry(self): @property def architectures(self) -> list[str]: - if self.model_arch is not None: - return [self.model_arch] - return super().architectures - - @property - def embedding_size(self): - if self.hf_config_name is not None: - stage_config = getattr(self.hf_config, self.hf_config_name, None) - override = getattr(stage_config, "embedding_size", None) - if override is not None: - return override - return super().embedding_size - - def get_model_arch_config(self): - # For multi-stage omni models, use a stage-aware convertor so that - # only the correct stage's quantization config is surfaced. - # Without this, a pre-quantized thinker checkpoint would also - # apply quantization to the talker/code2wav stages. - if self.hf_config_name is not None: - convertor = OmniModelArchConfigConvertor( - self.hf_config, - self.hf_text_config, - stage_config_name=self.hf_config_name, - ) - return convertor.convert() - return super().get_model_arch_config() + return [self.model_arch] def draw_hf_text_config(self): # transformers' get_text_config method is used to get the text config from thinker_config. @@ -164,113 +121,290 @@ def draw_hf_text_config(self): ) return get_hf_text_config(self.hf_config) - def _patch_qwen3_tts(self): - """Patches the value of `position_id_per_seconds` in Qwen3's - TTS's talker_config into the this class's codec_frame_rate_hz. - """ - talker_cfg = getattr(self.hf_config, "talker_config", None) - if isinstance(talker_cfg, dict): - pos_per_sec = talker_cfg.get("position_id_per_seconds") + def __post_init__( + self, + # Multimodal config init vars + limit_mm_per_prompt: dict[str, int | dict[str, int]] | None, + enable_mm_embeds: bool | None, + media_io_kwargs: dict[str, dict[str, Any]] | None, + mm_processor_kwargs: dict[str, Any] | None, + mm_processor_cache_gb: float | None, + mm_processor_cache_type: MMCacheType | None, + mm_shm_cache_max_object_size_mb: int | None, + mm_encoder_tp_mode: MMEncoderTPMode | None, + mm_encoder_attn_backend: AttentionBackendEnum | str | None, + interleave_mm_strings: bool | None, + skip_mm_profiling: bool | None, + video_pruning_rate: float | None, + ) -> None: + # Keep set served_model_name before maybe_model_redirect(self.model) + self.served_model_name = get_served_model_name(self.model, self.served_model_name) + self.model = maybe_model_redirect(self.model) + # The tokenizer is consistent with the model by default. + if self.tokenizer is None: + self.tokenizer = self.model + if self.tokenizer_revision is None: + self.tokenizer_revision = self.revision + self.tokenizer = maybe_model_redirect(self.tokenizer) + + if isinstance(self.hf_config_path, str): + self.hf_config_path = maybe_model_redirect(self.hf_config_path) + + if callable(self.hf_overrides): + hf_overrides_kw = {} + hf_overrides_fn = self.hf_overrides + dict_overrides: dict[str, Any] = {} else: - pos_per_sec = getattr(talker_cfg, "position_id_per_seconds", None) - if pos_per_sec is not None: - try: - fps = float(pos_per_sec) - except Exception: - fps = None - if fps is not None and fps > 0: - self.codec_frame_rate_hz = fps - - def _maybe_override_text_config(self): - """Override hf_text_config with omni-specific logic for multi-stage - models (e.g., thinker_config, talker_config). - """ - new_hf_text_config = self.draw_hf_text_config() - if new_hf_text_config is not self.hf_text_config: - self.hf_text_config = new_hf_text_config - # Recalculate dependent attributes - self.attention_chunk_size = getattr(self.hf_text_config, "attention_chunk_size", None) - # Recalculate max_model_len since it depends on hf_text_config - self.max_model_len = self.get_and_verify_max_len(self.original_max_model_len) - # Reset sliding_window if needed - if self.disable_sliding_window and self.hf_text_config is not None: - self.hf_text_config.sliding_window = None - - @classmethod - def from_vllm_model_config(cls, model_config: ModelConfig, **omni_kwargs): - """Create OmniModelConfig from an existing vLLM ModelConfig - and additional Omni specific kwargs. - - NOTE: The validation and __post_init__ for ModelConfig is expensive; - to avoid calling it a second time, we explicitly retrieve defaults - from dataclass attributes for values not passed to omni_kwargs, - and use that to initialize a __new__ instance. This is significantly - faster than creating the OmniModelConfig directly from the ModelConfig, - and saves us from having to pass all kwargs to the OmniModelConfig. - """ - # Add missing defaults to the omni kwargs and ensure values are valid - cls.add_defaults_to_omni_kwargs(omni_kwargs) - cls._validate_omni_fields(**omni_kwargs) - - # Allocate the new omni config and copy the model config & omni fields - omni_cfg = object.__new__(cls) - omni_cfg.__dict__.update(model_config.__dict__) - omni_cfg.__dict__.update(omni_kwargs) - - # Apply any model specific patches or necessary overrides - if ( - omni_cfg.codec_frame_rate_hz is None - and omni_cfg.model_arch == "Qwen3TTSTalkerForConditionalGenerationARVLLM" - ): - omni_cfg._patch_qwen3_tts() - - omni_cfg._maybe_override_text_config() - - if omni_cfg.hf_config is not None: - omni_cfg.hf_config.architectures = omni_cfg.architectures - - return omni_cfg - - @classmethod - def _validate_omni_fields(cls, **omni_kwargs): - """Validate omni-specific fields; we use TypeAdapters here to quickly - validate only omni kwargs to avoid rerunning validation on the - ModelConfig. - - NOTE: This assumes add_defaults_to_omni_kwargs has already been called, - so that all omni fields are present in the provided omni_kwargs. - """ - omni_fields = set(cls.__dataclass_fields__) - set(ModelConfig.__dataclass_fields__) - - for key, value in omni_kwargs.items(): - if key not in omni_fields: - raise ValueError(f"Unexpected omni kwarg: {key}") - - field_type = cls.__dataclass_fields__[key].type - if field_type is not Any: - TypeAdapter(field_type).validate_python(value) - - # We should not have any uninitialized keys - uninitialized_fields = omni_fields - omni_kwargs.keys() - if len(uninitialized_fields): - logger.error(f"The following OmniModelConfig keys were not initialized: {uninitialized_fields}") - - @classmethod - def add_defaults_to_omni_kwargs(cls, omni_kwargs): - """Because we init the OmniModelConfig with __new__ to sidestep expensive - validation, we need to be careful to ensure fields with default factories - are initialized, otherwise we will get an AttributeError when we use it. - - To work around this issue, we explicitly add defaults to the omni_kwargs - dict provided to ensure all fields are defined correctly. - - NOTE: omni_kwargs are mutated in place. - """ - omni_fields = set(cls.__dataclass_fields__) - set(ModelConfig.__dataclass_fields__) - - for field_name in omni_fields - set(omni_kwargs.keys()): - field_def = cls.__dataclass_fields__[field_name] - if field_def.default_factory is not MISSING: - omni_kwargs[field_name] = field_def.default_factory() - elif field_def.default is not MISSING: - omni_kwargs[field_name] = field_def.default + # Separate dict overrides from flat ones + # We'll determine how to apply dict overrides after loading the config + hf_overrides_kw = {} + dict_overrides = {} + for key, value in self.hf_overrides.items(): + if isinstance(value, dict): + dict_overrides[key] = value + else: + hf_overrides_kw[key] = value + hf_overrides_fn = None + + self.maybe_pull_model_tokenizer_for_runai(self.model, self.tokenizer) + + if (backend := envs.VLLM_ATTENTION_BACKEND) and backend == "FLASHINFER" and find_spec("flashinfer") is None: + raise ValueError( + "VLLM_ATTENTION_BACKEND is set to FLASHINFER, but flashinfer " + "module was not found. See " + "https://github.com/vllm-project/vllm/blob/main/docker/Dockerfile " # noqa: E501 + "for instructions on how to install it." + ) + + if self.override_attention_dtype is not None and not current_platform.is_rocm(): + warnings.warn( + "override-attention-dtype is set but not using ROCm platform", + stacklevel=2, + ) + + if self.enable_sleep_mode and not current_platform.is_sleep_mode_available(): + raise ValueError("Sleep mode is not supported on current platform.") + + hf_config = get_config( + self.hf_config_path or self.model, + self.trust_remote_code, + self.revision, + self.code_revision, + self.config_format, + hf_overrides_kw=hf_overrides_kw, + hf_overrides_fn=hf_overrides_fn, + ) + hf_config = maybe_patch_hf_config_from_gguf( + self.model, + hf_config, + ) + + self.hf_config = hf_config + if dict_overrides: + self._apply_dict_overrides(hf_config, dict_overrides) + self.hf_text_config = self.draw_hf_text_config() + self.attention_chunk_size = getattr(self.hf_text_config, "attention_chunk_size", None) + self.encoder_config = self._get_encoder_config() + # Try to load image processor config, but allow it to fail for stages that don't need it + try: + self.hf_image_processor_config = get_hf_image_processor_config( + self.model, hf_token=self.hf_token, revision=self.revision + ) + except (OSError, ValueError, IndexError) as e: + # Some stages (e.g., code2wav, talker) don't need image processor + # Log warning but allow initialization to continue + logger.warning( + f"Failed to load image processor config for model '{self.model}': {e}. " + "This is expected for stages that don't require image processing." + ) + self.hf_image_processor_config = None + + architectures = self.architectures + registry = self.registry + is_generative_model = registry.is_text_generation_model(architectures, self) + is_pooling_model = registry.is_pooling_model(architectures, self) + + def _task_to_convert(task: TaskOption) -> ConvertType: + if task == "embedding" or task == "embed": + return "embed" + if task == "classify": + return "classify" + if task == "reward": + return "reward" + if task == "score": + new_task = self._get_default_pooling_task(architectures) + return "classify" if new_task == "classify" else "embed" + + return "none" + + if self.task is not None: + runner: RunnerOption = "auto" + convert: ConvertOption = "auto" + msg_prefix = ( + "The 'task' option has been deprecated and will be removed in v0.13.0 or v1.0, whichever comes first." + ) + msg_hint = "Please remove this option." + + is_generative_task = self.task in _RUNNER_TASKS["generate"] + is_pooling_task = self.task in _RUNNER_TASKS["pooling"] + + if is_generative_model and is_pooling_model: + if is_generative_task: + runner = "generate" + convert = "auto" + msg_hint = ( + "Please replace this option with `--runner " + "generate` to continue using this model " + "as a generative model." + ) + elif is_pooling_task: + runner = "pooling" + convert = "auto" + msg_hint = ( + "Please replace this option with `--runner " + "pooling` to continue using this model " + "as a pooling model." + ) + else: # task == "auto" + pass + elif is_generative_model or is_pooling_model: + if is_generative_task: + runner = "generate" + convert = "auto" + msg_hint = "Please remove this option" + elif is_pooling_task: + runner = "pooling" + convert = _task_to_convert(self.task) + msg_hint = ( + "Please replace this option with `--convert " + f"{convert}` to continue using this model " + "as a pooling model." + ) + else: # task == "auto" + pass + else: + # Neither generative nor pooling model - try to convert if possible + if is_pooling_task: + runner = "pooling" + convert = _task_to_convert(self.task) + msg_hint = ( + "Please replace this option with `--runner pooling " + f"--convert {convert}` to continue using this model " + "as a pooling model." + ) + else: + debug_info = { + "architectures": architectures, + "is_generative_model": is_generative_model, + "is_pooling_model": is_pooling_model, + } + raise AssertionError( + "The model should be a generative or " + "pooling model when task is set to " + f"{self.task!r}. Found: {debug_info}" + ) + + self.runner = runner + self.convert = convert + + msg = f"{msg_prefix} {msg_hint}" + warnings.warn(msg, DeprecationWarning, stacklevel=2) + + self.runner_type = self._get_runner_type(architectures, self.runner) + self.convert_type = self._get_convert_type(architectures, self.runner_type, self.convert) + + if self.runner_type == "generate" and not is_generative_model: + generate_converts = _RUNNER_CONVERTS["generate"] + if self.convert_type not in generate_converts: + # Currently we don't have any converters for generative models + raise ValueError("This model does not support `--runner generate`.") + if self.runner_type == "pooling" and not is_pooling_model: + pooling_converts = _RUNNER_CONVERTS["pooling"] + if self.convert_type not in pooling_converts: + convert_option = "<" + "|".join(pooling_converts) + ">" + raise ValueError( + "This model does not support `--runner pooling`. " + f"You can pass `--convert {convert_option} to adapt " + "it into a pooling model." + ) + + # Note: Initialize these attributes early because transformers fallback + # may fail to load dynamic modules in child processes + model_info, arch = registry.inspect_model_cls(architectures, self) + self._model_info = model_info + self._architecture = arch + logger.info("Resolved architecture: %s", arch) + + # Init pooler config if needed + if self.runner_type == "pooling": + if self.pooler_config is None: + self.pooler_config = PoolerConfig() + + base_config = get_pooling_config(self.model, self.revision) + if base_config is not None: + # Only set values that are not overridden by the user + for k, v in base_config.items(): + if getattr(self.pooler_config, k) is None: + setattr(self.pooler_config, k, v) + + default_pooling_type = self._model_info.default_pooling_type + if self.pooler_config.pooling_type is None: + self.pooler_config.pooling_type = default_pooling_type + + self.dtype: torch.dtype = _get_and_verify_dtype( + self.model, + self.hf_config, + self.dtype, + is_pooling_model=self.runner_type == "pooling", + revision=self.revision, + ) + + self.original_max_model_len = self.max_model_len + self.max_model_len = self.get_and_verify_max_len(self.max_model_len) + # Init multimodal config if needed + if self._model_info.supports_multimodal: + if mm_encoder_tp_mode == "data" and not self._model_info.supports_multimodal_encoder_tp_data: + logger.warning_once( + "This model does not support `--mm-encoder-tp-mode data`. " + "Falling back to `--mm-encoder-tp-mode weights`." + ) + mm_encoder_tp_mode = "weights" + + mm_config_kwargs = dict( + limit_per_prompt=limit_mm_per_prompt, + enable_mm_embeds=enable_mm_embeds, + media_io_kwargs=media_io_kwargs, + mm_processor_kwargs=mm_processor_kwargs, + mm_processor_cache_gb=mm_processor_cache_gb, + mm_processor_cache_type=mm_processor_cache_type, + mm_shm_cache_max_object_size_mb=mm_shm_cache_max_object_size_mb, + mm_encoder_tp_mode=mm_encoder_tp_mode, + mm_encoder_attn_backend=mm_encoder_attn_backend, + interleave_mm_strings=interleave_mm_strings, + skip_mm_profiling=skip_mm_profiling, + video_pruning_rate=video_pruning_rate, + ) + + mm_config_kwargs = {k: v for k, v in mm_config_kwargs.items() if v is not None} + + self.multimodal_config = MultiModalConfig(**mm_config_kwargs) + + # Multimodal GGUF models must use original repo for mm processing + if is_gguf(self.tokenizer) and self.is_multimodal_model: + raise ValueError( + "Loading a multimodal GGUF model needs to use original " + "tokenizer. Please specify the unquantized hf model's " + "repo name or path using the --tokenizer argument." + ) + + if self.disable_sliding_window: + # Set after get_and_verify_max_len to ensure that max_model_len + # can be correctly capped to sliding window size + self.hf_text_config.sliding_window = None + + # Avoid running try_verify_and_update_config multiple times + self.config_updated = False + + self._verify_quantization() + self._verify_cuda_graph() + self._verify_bnb_config() diff --git a/vllm_omni/diffusion/diffusion_engine.py b/vllm_omni/diffusion/diffusion_engine.py index fe940d623e5..c28fe0943ce 100644 --- a/vllm_omni/diffusion/diffusion_engine.py +++ b/vllm_omni/diffusion/diffusion_engine.py @@ -7,31 +7,25 @@ import queue import threading import time -from collections.abc import Iterable +import weakref +from collections.abc import Callable, Iterable +from dataclasses import dataclass from typing import Any -import numpy as np import PIL.Image -import torch from vllm.logger import init_logger -from vllm_omni.diffusion.data import ( - DiffusionOutput, - DiffusionRequestAbortedError, - OmniDiffusionConfig, -) -from vllm_omni.diffusion.executor.abstract import DiffusionExecutor +from vllm_omni.diffusion.data import SHUTDOWN_MESSAGE, OmniDiffusionConfig from vllm_omni.diffusion.registry import ( DiffusionModelRegistry, get_diffusion_post_process_func, get_diffusion_pre_process_func, ) from vllm_omni.diffusion.request import OmniDiffusionRequest -from vllm_omni.diffusion.sched import RequestScheduler, SchedulerInterface, StepScheduler -from vllm_omni.diffusion.sched.interface import DiffusionRequestStatus -from vllm_omni.diffusion.worker.utils import RunnerOutput +from vllm_omni.diffusion.scheduler import Scheduler, scheduler from vllm_omni.inputs.data import OmniDiffusionSamplingParams, OmniTextPrompt from vllm_omni.outputs import OmniRequestOutput +from vllm_omni.utils.platform_utils import get_diffusion_worker_class logger = init_logger(__name__) @@ -43,33 +37,43 @@ def supports_image_input(model_class_name: str) -> bool: return bool(getattr(model_cls, "support_image_input", False)) -def supports_audio_input(model_class_name: str) -> bool: - model_cls = DiffusionModelRegistry._try_load_model_cls(model_class_name) - if model_cls is None: - return False - return bool(getattr(model_cls, "support_audio_input", False)) - - -def image_color_format(model_class_name: str) -> str: - model_cls = DiffusionModelRegistry._try_load_model_cls(model_class_name) - return getattr(model_cls, "color_format", "RGB") - - -def supports_audio_output(model_class_name: str) -> bool: - model_cls = DiffusionModelRegistry._try_load_model_cls(model_class_name) - if model_cls is None: - return False - return bool(getattr(model_cls, "support_audio_output", False)) +@dataclass +class BackgroundResources: + """ + Used as a finalizer for clean shutdown. + Create a BackgroundResources instance to encapsulate all background resources + (e.g., the scheduler and worker processes) that need explicit cleanup. + This object holds references to external system resources that are not managed + by Python's garbage collector (like OS processes, message queues, etc.), + so they must be cleaned up manually to avoid resource leaks or zombie processes. + """ + + scheduler: Scheduler | None = None + processes: list[mp.Process] | None = None + + def __call__(self): + """Clean up background resources.""" + if scheduler is not None: + try: + for _ in range(scheduler.num_workers): + scheduler.mq.enqueue(SHUTDOWN_MESSAGE) + scheduler.close() + except Exception as exc: + logger.warning("Failed to send shutdown signal: %s", exc) + for proc in self.processes: + if not proc.is_alive(): + continue + proc.join(30) + if proc.is_alive(): + logger.warning("Terminating diffusion worker %s after timeout", proc.name) + proc.terminate() + proc.join(30) class DiffusionEngine: """The diffusion engine for vLLM-Omni diffusion models.""" - def __init__( - self, - od_config: OmniDiffusionConfig, - scheduler: SchedulerInterface | None = None, - ): + def __init__(self, od_config: OmniDiffusionConfig): """Initialize the diffusion engine. Args: @@ -86,17 +90,9 @@ def __init__( and "sampling_params" in inspect.signature(self.post_process_func).parameters ) - executor_class = DiffusionExecutor.get_class(od_config) - self.executor = executor_class(od_config) - self.step_execution = bool(getattr(od_config, "step_execution", False)) - self.scheduler: SchedulerInterface = scheduler or ( - StepScheduler() if self.step_execution else RequestScheduler() - ) - self.scheduler.initialize(od_config) - self._rpc_lock = threading.RLock() - self.abort_queue: queue.Queue[str] = queue.Queue() - self.execute_fn = self.executor.execute_step if self.step_execution else self.executor.execute_request - + self._processes: list[mp.Process] = [] + self._closed = False + self._make_client() try: self._dummy_run() except Exception as e: @@ -322,10 +318,7 @@ def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: return results @staticmethod - def make_engine( - config: OmniDiffusionConfig, - scheduler: SchedulerInterface | None = None, - ) -> DiffusionEngine: + def make_engine(config: OmniDiffusionConfig) -> "DiffusionEngine": """Factory method to create a DiffusionEngine instance. Args: @@ -334,48 +327,14 @@ def make_engine( Returns: An instance of DiffusionEngine. """ - return DiffusionEngine(config, scheduler=scheduler) - - def add_req_and_wait_for_response(self, request: OmniDiffusionRequest) -> DiffusionOutput: - with self._rpc_lock: - target_sched_req_id = self.scheduler.add_request(request) - - # keep scheduling and executing until the target request is finished - while True: - self._process_aborts_queue() - sched_output = self.scheduler.schedule() - if sched_output.is_empty: - if target_sched_req_id in sched_output.finished_req_ids: - return self._finalize_finished_request(target_sched_req_id) - if not self.scheduler.has_requests(): - raise RuntimeError("Diffusion scheduler has no runnable requests.") - continue - - # NOTE: add_req_and_wait_for_response() is synchronous, and - # the scheduler currently enforces _max_batch_size = 1 (see - # vllm_omni/diffusion/sched/base_scheduler.py), so we directly - # take the single scheduled request here. - sched_req_id = sched_output.scheduled_req_ids[0] - try: - runner_output = self.execute_fn(sched_output) - except Exception as exc: - logger.error("Execution failed for diffusion request %s", sched_req_id, exc_info=True) - runner_output = RunnerOutput( - req_id=sched_req_id, - step_index=None, - finished=True, - result=DiffusionOutput(error=str(exc)), - ) + return DiffusionEngine(config) - self._process_aborts_queue() + def _make_client(self): + # TODO rename it + scheduler.initialize(self.od_config) - finished_req_ids = self.scheduler.update_from_output(sched_output, runner_output) - if target_sched_req_id in finished_req_ids: - return self._finalize_finished_request( - target_sched_req_id, - runner_output=runner_output, - missing_result_error="Diffusion execution finished without a final output.", - ) + # Get the broadcast handle from the initialized scheduler + broadcast_handle = scheduler.get_broadcast_handle() def profile(self, is_start: bool = True, profile_prefix: str | None = None) -> None: """Start or stop profiling on all diffusion workers. @@ -384,41 +343,145 @@ def profile(self, is_start: bool = True, profile_prefix: str | None = None) -> N is_start: True to start profiling, False to stop. profile_prefix: Optional prefix for trace filename. """ - if is_start: - if profile_prefix is None: - profile_prefix = f"diffusion_{int(time.time())}" - logger.info(f"Starting diffusion profiling with prefix: {profile_prefix}") - else: - logger.info("Stopping diffusion profiling...") + Start torch profiling on all diffusion workers. + + Creates a directory (if needed) and sets up a base filename template + for per-rank profiler traces (typically saved as