diff --git a/.buildkite/test-nightly.yml b/.buildkite/test-nightly.yml index 1e311c3512f..2e963236714 100644 --- a/.buildkite/test-nightly.yml +++ b/.buildkite/test-nightly.yml @@ -111,6 +111,47 @@ steps: path: /mnt/hf-cache type: DirectoryOrCreate + - label: ":full_moon: Diffusion Model Wan22 completed Test with H100" + timeout_in_minutes: 45 + depends_on: upload-nightly-pipeline + if: build.env("NIGHTLY") == "1" || build.pull_request.labels includes "nightly-test" + commands: + - export VLLM_WORKER_MULTIPROC_METHOD=spawn + - pytest -s -v tests/e2e/online_serving/test_wan22_expansion.py -m "advanced_model" --run-level "advanced_model" + agents: + queue: "mithril-h100-pool" + plugins: + - kubernetes: + podSpec: + containers: + - image: 936637512419.dkr.ecr.us-west-2.amazonaws.com/vllm-ci-pull-through-cache/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + resources: + limits: + nvidia.com/gpu: 2 + volumeMounts: + - name: devshm + mountPath: /dev/shm + - name: hf-cache + mountPath: /root/.cache/huggingface + env: + - name: HF_HOME + value: /root/.cache/huggingface + - name: HF_TOKEN + valueFrom: + secretKeyRef: + name: hf-token-secret + key: token + nodeSelector: + node.kubernetes.io/instance-type: gpu-h100-sxm + volumes: + - name: devshm + emptyDir: + medium: Memory + - name: hf-cache + hostPath: + path: /mnt/hf-cache + type: DirectoryOrCreate + - label: ":full_moon: Diffusion Model Test with L4" timeout_in_minutes: 60 depends_on: upload-nightly-pipeline diff --git a/tests/conftest.py b/tests/conftest.py index 0d90f639d3d..a17548afb2c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,9 +1,13 @@ import base64 import datetime import io +import json import math import os import random +import tempfile + +import requests os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" # Set CPU device for CI environments without GPU @@ -24,7 +28,7 @@ from pathlib import Path from typing import Any, NamedTuple -import imageio.v3 as iio +import cv2 import numpy as np import 
psutil import pytest @@ -56,6 +60,102 @@ class OmniServerParams(NamedTuple): server_args: list[str] | None = None +def assert_image_diffusion_response( + response, + request_config: dict[str, Any], + run_level: str = None, +) -> None: + """ + Validate image diffusion response. + + Expected request_config schema: + { + "request_type": "image", + "extra_body": { + "num_outputs_per_prompt": 1, + "width": ..., + "height": ..., + ... + } + } + """ + extra_body = request_config.get("extra_body", {}) + + num_outputs_per_prompt = extra_body.get("num_outputs_per_prompt", 1) + + assert response.images is not None, "Image response is None" + assert len(response.images) > 0, "No images in response" + assert len(response.images) == num_outputs_per_prompt, ( + f"Expected {num_outputs_per_prompt} images, got {len(response.images)}" + ) + + if run_level == "advanced_model": + expected_width = extra_body["width"] # intentionally raise KeyError if missing + expected_height = extra_body["height"] # intentionally raise KeyError if missing + + for img in response.images: + assert_image_valid(img, width=expected_width, height=expected_height) + + +def assert_video_diffusion_response( + response, + request_config: dict[str, Any], + run_level: str = None, +) -> None: + """ + Validate video diffusion response. + + Expected request_config schema: + { + "request_type": "video", + "form_data": { + "prompt": "...", + "num_frames": ..., + "width": ..., + "height": ..., + "fps": ..., + ... 
+ } + } + """ + form_data = request_config.get("form_data", {}) + + assert response.videos is not None, "Video response is None" + assert len(response.videos) > 0, "No videos in response" + + expected_frames = _maybe_int(form_data.get("num_frames")) + expected_width = _maybe_int(form_data.get("width")) + expected_height = _maybe_int(form_data.get("height")) + expected_fps = _maybe_int(form_data.get("fps")) + + for vid_bytes in response.videos: + assert_video_valid( + vid_bytes, + num_frames=expected_frames, + width=expected_width, + height=expected_height, + fps=expected_fps, + ) + + +def assert_audio_diffusion_response( + response, + request_config: dict[str, Any], + run_level: str = None, +) -> None: + """ + Validate audio diffusion response. + """ + raise NotImplementedError("Audio validation is not implemented yet") + # consider using assert_audio_valid defined above + + +def _maybe_int(value: Any) -> int | None: + if value is None: + return None + return int(value) + + def assert_image_valid(image: Path | Image.Image, *, width: int | None = None, height: int | None = None): """Assert the file is a loadable image with optional exact dimensions.""" if isinstance(image, Path): @@ -70,14 +170,91 @@ def assert_image_valid(image: Path | Image.Image, *, width: int | None = None, h return image -def assert_video_valid(frames: Path | np.ndarray, *, width: int, height: int, num_frames: int) -> None: +def assert_video_valid( + video: Path | bytes | BytesIO, + *, + num_frames: int | None = None, + width: int | None = None, + height: int | None = None, + fps: float | None = None, +) -> dict[str, int | float]: """Assert the MP4 has the expected resolution and exact frame count.""" - if isinstance(frames, Path): - assert frames.exists(), f"Video not found: {frames}" - frames = iio.imread(str(frames), plugin="pyav", index=None) - assert frames.shape[0] == num_frames, f"Expected {num_frames} frames, got {frames.shape[0]}" - assert frames.shape[1] == height, f"Expected 
height={height}, got {frames.shape[1]}" - assert frames.shape[2] == width, f"Expected width={width}, got {frames.shape[2]}" + temp_path = None + cap = None + try: + # Normalize input to file path + if isinstance(video, Path): + if not video.exists(): + raise AssertionError(f"Video file not found: {video}") + video_path = str(video) + else: + # Create temp file for bytes/BytesIO + suffix = ".mp4" + with tempfile.NamedTemporaryFile(delete=False, suffix=suffix, mode="wb") as tmp: + if isinstance(video, bytes): + tmp.write(video) + elif isinstance(video, BytesIO): + tmp.write(video.getvalue()) + else: + raise TypeError(f"Unsupported video type: {type(video)}") + temp_path = Path(tmp.name) + video_path = str(temp_path) + + # Open video capture + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + raise AssertionError(f"Failed to open video: {video_path}") + + # Extract properties + actual_num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + actual_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + actual_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + actual_fps = cap.get(cv2.CAP_PROP_FPS) + + actual_num_frames = 0 + while True: + ok, _frame = cap.read() + if not ok: + break + actual_num_frames += 1 + + # Basic validity checks + if actual_num_frames <= 0: + raise AssertionError(f"Invalid frame count: {actual_num_frames} (must be > 0)") + if actual_width <= 0 or actual_height <= 0: + raise AssertionError(f"Invalid dimensions: {actual_width}x{actual_height} (must be > 0)") + if actual_fps <= 0: + raise AssertionError(f"Invalid FPS: {actual_fps} (must be > 0)") + + # Validate against expectations + if num_frames is not None: + expected_num_frames = (num_frames // 4) * 4 + 1 + assert actual_num_frames == expected_num_frames, ( + f"Frame count mismatch: expected {num_frames}, got {actual_num_frames}" + ) + if width is not None: + assert actual_width == width, f"Width mismatch: expected {width}px, got {actual_width}px" + if height is not None: + assert 
actual_height == height, f"Height mismatch: expected {height}px, got {actual_height}px" + if fps is not None: + # Use tolerance for float comparison (codec rounding) + assert abs(actual_fps - fps) < 0.5, f"FPS mismatch: expected {fps}, got {actual_fps:.2f}" + + return {"num_frames": actual_num_frames, "width": actual_width, "height": actual_height, "fps": actual_fps} + + except Exception as e: + print(f"ERROR: {type(e).__name__}: {e}", flush=True) + raise + + finally: + # Cleanup resources + if cap is not None: + cap.release() + if temp_path and temp_path.exists(): + try: + temp_path.unlink() + except OSError: + pass def assert_audio_valid(path: Path, *, sample_rate: int, channels: int, duration_s: float) -> None: @@ -1331,12 +1508,12 @@ def assert_diffusion_response(response: DiffusionResponse, request_config: dict[ """ Validate diffusion response results. + Dispatcher that routes validation to modality-specific assert functions. + Args: - response: DiffusionResponse object. Any not-None content will be validated based on the request_config. - request_config: Request configuration dictionary containing parameters like model, messages, extra_body. - When validating a certain modality, the corresponding params in request_config['extra_body'] must present. - It will be used to check against the multimedia file in the response. - run_level: Test run level (e.g., "core_model", "advanced_model") + response: DiffusionResponse object. + request_config: Request configuration dictionary. + run_level: Test run level (e.g. 
"core_model", "advanced_model") Raises: AssertionError: When the response does not meet validation criteria @@ -1348,28 +1525,29 @@ def assert_diffusion_response(response: DiffusionResponse, request_config: dict[ if e2e_latency is not None: print(f"the avg e2e is: {e2e_latency}") - extra_body = request_config.get("extra_body", {}) - - num_outputs_per_prompt = extra_body.get("num_outputs_per_prompt", 1) + has_any_content = any(content is not None for content in (response.images, response.videos, response.audios)) + assert has_any_content, "Response contains no images, videos, or audios" if response.images is not None: - assert len(response.images) > 0, "No images in response" - assert len(response.images) == num_outputs_per_prompt, ( - f"Expected {num_outputs_per_prompt} images, got {len(response.images)}" + assert_image_diffusion_response( + response=response, + request_config=request_config, + run_level=run_level, ) - if run_level == "advanced_model": - expected_width = extra_body["width"] # intend to raise KeyError - expected_height = extra_body["height"] # intend to raise KeyError - for img in response.images: - assert_image_valid(img, width=expected_width, height=expected_height) + if response.videos is not None: - raise NotImplementedError( - "Video validation is not implemented yet" - ) # consider using assert_video_valid defined above + assert_video_diffusion_response( + response=response, + request_config=request_config, + run_level=run_level, + ) + if response.audios is not None: - raise NotImplementedError( - "Audio validation is not implemented yet" - ) # consider using assert_audio_valid defined above + assert_audio_diffusion_response( + response=response, + request_config=request_config, + run_level=run_level, + ) class OpenAIClientHandler: @@ -1391,6 +1569,7 @@ def __init__( port: vLLM-Omni server port api_key: API key (defaults to "EMPTY") """ + self.base_url = f"http://{host}:{port}" self.client = OpenAI(base_url=f"http://{host}:{port}/v1", 
# NOTE: these are methods of OpenAIClientHandler; the class header lies outside
# the visible hunk, so indent them to the class body when applying.
def send_video_diffusion_request(
    self, request_config: dict[str, Any], request_num: int = 1
) -> "list[DiffusionResponse]":
    """Create a video job via ``/v1/videos``, wait for it, download and validate it.

    Args:
        request_config: Must contain ``form_data`` (a dict with a non-empty
            ``prompt``); ``None`` values in it are dropped and the rest are
            sent as strings. Optional ``image_reference`` is either a
            ``data:image...`` URL (uploaded as the ``input_reference`` file)
            or any other string (sent as a JSON-encoded ``image_reference``
            form field). ``stream`` must be falsy.
        request_num: Number of requests; only ``1`` is supported.

    Returns:
        A single-element list holding the populated ``DiffusionResponse``.

    Raises:
        NotImplementedError: For ``request_num != 1`` or streaming requests.
        ValueError: If ``form_data`` or its ``prompt`` is missing.
        AssertionError: If the request, polling, download or validation
            fails; the original exception is attached as ``__cause__``.
    """
    if request_num != 1:
        raise NotImplementedError("Concurrent video diffusion requests are not currently implemented")

    if request_config.get("stream", False):
        raise NotImplementedError("Streaming is not currently implemented for video diffusion e2e test")

    form_data = request_config.get("form_data")
    if not isinstance(form_data, dict):
        raise ValueError("Video request_config must contain 'form_data'")

    if not form_data.get("prompt"):
        raise ValueError("Video request_config['form_data'] must contain 'prompt'")

    # Form fields travel as text; unset (None) values are simply omitted.
    normalized_form_data = {key: str(value) for key, value in form_data.items() if value is not None}

    files: dict[str, tuple[str, io.BytesIO, str]] = {}
    image_reference = request_config.get("image_reference")
    if image_reference:
        if image_reference.startswith("data:image"):
            # "data:image/png;base64,<payload>" -> content type + decoded bytes.
            header, encoded = image_reference.split(",", 1)
            content_type = header.split(";")[0].removeprefix("data:")
            extension = content_type.split("/")[-1]
            files["input_reference"] = (
                f"reference.{extension}",
                io.BytesIO(base64.b64decode(encoded)),
                content_type,
            )
        else:
            normalized_form_data["image_reference"] = json.dumps({"image_url": image_reference})

    result = DiffusionResponse()
    start_time = time.perf_counter()

    try:
        response = requests.post(
            self._build_url("/v1/videos"),
            data=normalized_form_data,
            files=files,
            headers={"Accept": "application/json"},
            timeout=60,
        )
        response.raise_for_status()

        video_id = response.json()["id"]
        self._wait_until_video_completed(video_id)
        video_content = self._download_video_content(video_id)

        result.success = True
        result.videos = [video_content]
        result.e2e_latency = time.perf_counter() - start_time

        assert_diffusion_response(result, request_config, run_level=self.run_level)

    except Exception as e:
        result.success = False
        result.error_message = f"Diffusion response processing error: {e}"
        # Raise explicitly: a bare `assert False` is stripped under -O and
        # would hide the underlying cause from the traceback.
        raise AssertionError(result.error_message) from e

    return [result]


def _wait_until_video_completed(
    self,
    video_id: str,
    poll_interval_seconds: int = 2,
    timeout_seconds: int = 300,
) -> None:
    """Poll ``/v1/videos/{video_id}`` until the job reports ``completed``.

    Raises:
        RuntimeError: If the job reports status ``failed``.
        TimeoutError: If the job is not completed within ``timeout_seconds``.
    """
    status_url = self._build_url(f"/v1/videos/{video_id}")
    deadline = time.monotonic() + timeout_seconds

    while time.monotonic() < deadline:
        status_resp = requests.get(
            status_url,
            headers={"Accept": "application/json"},
            timeout=30,
        )
        status_resp.raise_for_status()

        status_data = status_resp.json()
        current_status = status_data["status"]

        if current_status == "completed":
            return

        if current_status == "failed":
            error_msg = status_data.get("last_error", "Unknown error")
            raise RuntimeError(f"Job failed: {error_msg}")

        time.sleep(poll_interval_seconds)

    raise TimeoutError(f"Video job {video_id} did not complete within {timeout_seconds}s")


def _download_video_content(self, video_id: str) -> bytes:
    """Download ``/v1/videos/{video_id}/content`` and return the raw bytes."""
    download_url = self._build_url(f"/v1/videos/{video_id}/content")
    # The context manager releases the streamed connection once the body is read.
    with requests.get(download_url, stream=True, timeout=60) as video_resp:
        video_resp.raise_for_status()
        return b"".join(chunk for chunk in video_resp.iter_content(chunk_size=8192) if chunk)


def _build_url(self, path: str) -> str:
    """Join ``self.base_url`` and ``path`` with exactly one slash between them."""
    return f"{self.base_url.rstrip('/')}/{path.lstrip('/')}"
coverage for HunyuanVideo-1.5-T2V on H100.""" - messages = dummy_messages_from_mix_data(content_text=PROMPT) + form_data = { + "prompt": PROMPT, + "negative_prompt": NEGATIVE_PROMPT, + "height": 480, + "width": 640, + "num_frames": 5, + "num_inference_steps": 2, + "guidance_scale": 6.0, + "seed": 42, + } request_config = { "model": omni_server.model, - "messages": messages, - "extra_body": { - "height": 480, - "width": 640, - "num_frames": 5, - "num_inference_steps": 2, - "guidance_scale": 6.0, - "negative_prompt": NEGATIVE_PROMPT, - "seed": 42, - }, + "form_data": form_data, } - openai_client.send_diffusion_request(request_config) + openai_client.send_video_diffusion_request(request_config) diff --git a/tests/e2e/online_serving/test_wan22_expansion.py b/tests/e2e/online_serving/test_wan22_expansion.py new file mode 100644 index 00000000000..f7c01c748e2 --- /dev/null +++ b/tests/e2e/online_serving/test_wan22_expansion.py @@ -0,0 +1,130 @@ +""" +Comprehensive tests of diffusion features that are available in online serving mode +and are supported by the following models: +- Wan-AI/Wan2.2-T2V-A14B-Diffusers +- Wan-AI/Wan2.2-I2V-A14B-Diffusers +- Wan-AI/Wan2.2-TI2V-5B-Diffusers + +Coverage: +- Cache-DiT +- CFG-Parallel +- Ulysses-SP +- Tensor-Parallel +- VAE-Patch-Parallel +- HSDP +- Ring-Attn + +assert_diffusion_response validates successful generation +""" + +import pytest + +from tests.conftest import ( + OmniServer, + OmniServerParams, + OpenAIClientHandler, + generate_synthetic_image, +) +from tests.utils import hardware_marks + +PROMPT = "Two anthropomorphic cats in comfy boxing gear and bright gloves fight intensely on a spotlighted stage." 
NEGATIVE_PROMPT = "low quality, blurry, distorted face, extra limbs, bad anatomy, watermark, logo, text, ugly, deformed, mutated, jpeg artifacts"
SINGLE_CARD_FEATURE_MARKS = hardware_marks(res={"cuda": "H100"})
PARALLEL_FEATURE_MARKS = hardware_marks(res={"cuda": "H100"}, num_cards=2)

# (model path, short key used in the pytest case id)
WAN22_MODELS = [
    ("Wan-AI/Wan2.2-T2V-A14B-Diffusers", "t2v"),
    ("Wan-AI/Wan2.2-I2V-A14B-Diffusers", "i2v"),
    ("Wan-AI/Wan2.2-TI2V-5B-Diffusers", "ti2v"),
]

# (case id suffix, extra server CLI args) for each two-card feature
PARALLEL_CONFIGS = [
    ("cfg_parallel", ["--cfg-parallel-size", "2"]),
    ("ulysses_sp", ["--usp", "2"]),
    ("tp_vae_patch", ["--tensor-parallel-size", "2", "--vae-patch-parallel-size", "2"]),
    ("hsdp", ["--use-hsdp", "--hsdp-shard-size", "2"]),  # replicate_size=1 (default)
    ("ring_atten", ["--ring", "2"]),
]


def _get_wan22_feature_cases():
    """
    Build the parametrized server configurations for the Wan2.2 tests.

    Order of the returned cases:
    - one single-card Cache-DiT case per model in WAN22_MODELS
    - then, per model, one two-card case for each of the 5 PARALLEL_CONFIGS
    """
    single_card_cases = [
        pytest.param(
            OmniServerParams(
                model=model_path,
                server_args=["--cache-backend", "cache_dit", "--enable-layerwise-offload"],
            ),
            id=f"{model_key}_cache_dit",
            marks=SINGLE_CARD_FEATURE_MARKS,
        )
        for model_path, model_key in WAN22_MODELS
    ]

    multi_card_cases = [
        pytest.param(
            OmniServerParams(model=model_path, server_args=server_args),
            id=f"{model_key}_{feat_id}",
            marks=PARALLEL_FEATURE_MARKS,
        )
        for model_path, model_key in WAN22_MODELS
        for feat_id, server_args in PARALLEL_CONFIGS
    ]

    return single_card_cases + multi_card_cases


@pytest.mark.advanced_model
@pytest.mark.parametrize(
    "omni_server",
    _get_wan22_feature_cases(),
    indirect=True,
)
def test_wan22_diffusion_features(
    omni_server: OmniServer,
    openai_client: OpenAIClientHandler,
):
    """Send one short video request to the configured Wan2.2 server.

    Response validation happens inside ``send_video_diffusion_request``.
    """
    model_path = omni_server.model
    # "I2V" is a substring of "TI2V", so this single test covers both variants.
    needs_reference_image = "I2V" in model_path
    # NOTE(review): the original author states only I2V-A14B uses MoE "per spec";
    # not verifiable from this file — confirm against the model documentation.
    is_moe_model = "I2V-A14B" in model_path

    # flow_shift and vae_use_slicing/tiling are intentionally not sent; the
    # original author notes they are handled on the service side.
    form_data = {
        "prompt": PROMPT,
        "negative_prompt": NEGATIVE_PROMPT,
        "height": 512,
        "width": 512,
        "num_frames": 8,
        "fps": 8,
        "num_inference_steps": 2,
        "guidance_scale": 4.0,
        "seed": 42,
    }

    if is_moe_model:
        form_data["guidance_scale_2"] = 1.0
        form_data["boundary_ratio"] = 0.5

    request_config = {
        "model": model_path,
        "form_data": form_data,
    }

    if needs_reference_image:
        # NOTE(review): assumes generate_synthetic_image returns JPEG data under
        # the 'base64' key, matching the declared MIME type — confirm.
        synthetic_b64 = generate_synthetic_image(512, 512)['base64']
        request_config["image_reference"] = f"data:image/jpeg;base64,{synthetic_b64}"

    openai_client.send_video_diffusion_request(request_config)