From d4f4ed83b12c2f689a7707e71f2d8b5726a7881d Mon Sep 17 00:00:00 2001 From: bjf-frz Date: Mon, 23 Mar 2026 15:35:22 +0800 Subject: [PATCH 1/6] add wan22 l4 test Signed-off-by: bjf-frz --- tests/conftest.py | 314 +++++++++++++++--- .../online_serving/test_wan22_expansion.py | 134 ++++++++ 2 files changed, 406 insertions(+), 42 deletions(-) create mode 100644 tests/e2e/online_serving/test_wan22_expansion.py diff --git a/tests/conftest.py b/tests/conftest.py index f2d866a5894..fbc2342277c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,9 +1,13 @@ import base64 import datetime import io +import json import math import os import random +import tempfile + +import requests os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" # Set CPU device for CI environments without GPU @@ -24,7 +28,7 @@ from pathlib import Path from typing import Any, NamedTuple -import imageio.v3 as iio +import cv2 import numpy as np import psutil import pytest @@ -70,14 +74,91 @@ def assert_image_valid(image: Path | Image.Image, *, width: int | None = None, h return image -def assert_video_valid(frames: Path | np.ndarray, *, width: int, height: int, num_frames: int) -> None: +def assert_video_valid( + video: Path | bytes | BytesIO, + *, + num_frames: int | None = None, + width: int | None = None, + height: int | None = None, + fps: float | None = None, +) -> dict[str, int | float]: """Assert the MP4 has the expected resolution and exact frame count.""" - if isinstance(frames, Path): - assert frames.exists(), f"Video not found: {frames}" - frames = iio.imread(str(frames), plugin="pyav", index=None) - assert frames.shape[0] == num_frames, f"Expected {num_frames} frames, got {frames.shape[0]}" - assert frames.shape[1] == height, f"Expected height={height}, got {frames.shape[1]}" - assert frames.shape[2] == width, f"Expected width={width}, got {frames.shape[2]}" + temp_path = None + cap = None + try: + # Normalize input to file path + if isinstance(video, Path): + if not video.exists(): + 
raise AssertionError(f"Video file not found: {video}") + video_path = str(video) + else: + # Create temp file for bytes/BytesIO + suffix = ".mp4" + with tempfile.NamedTemporaryFile(delete=False, suffix=suffix, mode="wb") as tmp: + if isinstance(video, bytes): + tmp.write(video) + elif isinstance(video, BytesIO): + tmp.write(video.getvalue()) + else: + raise TypeError(f"Unsupported video type: {type(video)}") + temp_path = Path(tmp.name) + video_path = str(temp_path) + + # Open video capture + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + raise AssertionError(f"Failed to open video: {video_path}") + + # Extract properties + actual_num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + actual_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + actual_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + actual_fps = cap.get(cv2.CAP_PROP_FPS) + + actual_num_frames = 0 + while True: + ok, _frame = cap.read() + if not ok: + break + actual_num_frames += 1 + + # Basic validity checks + if actual_num_frames <= 0: + raise AssertionError(f"Invalid frame count: {actual_num_frames} (must be > 0)") + if actual_width <= 0 or actual_height <= 0: + raise AssertionError(f"Invalid dimensions: {actual_width}x{actual_height} (must be > 0)") + if actual_fps <= 0: + raise AssertionError(f"Invalid FPS: {actual_fps} (must be > 0)") + + # Validate against expectations + if num_frames is not None: + expected_num_frames = ((num_frames + 3) // 4) * 4 + 1 + assert actual_num_frames == expected_num_frames, ( + f"Frame count mismatch: expected {num_frames}, got {actual_num_frames}" + ) + if width is not None: + assert actual_width == width, f"Width mismatch: expected {width}px, got {actual_width}px" + if height is not None: + assert actual_height == height, f"Height mismatch: expected {height}px, got {actual_height}px" + if fps is not None: + # Use tolerance for float comparison (codec rounding) + assert abs(actual_fps - fps) < 0.5, f"FPS mismatch: expected {fps}, got {actual_fps:.2f}" 
+ + return {"num_frames": actual_num_frames, "width": actual_width, "height": actual_height, "fps": actual_fps} + + except Exception as e: + print(f"ERROR: {type(e).__name__}: {e}", flush=True) + raise + + finally: + # Cleanup resources + if cap is not None: + cap.release() + if temp_path and temp_path.exists(): + try: + temp_path.unlink() + except OSError: + pass def assert_audio_valid(path: Path, *, sample_rate: int, channels: int, duration_s: float) -> None: @@ -1363,9 +1444,14 @@ def assert_diffusion_response(response: DiffusionResponse, request_config: dict[ for img in response.images: assert_image_valid(img, width=expected_width, height=expected_height) if response.videos is not None: - raise NotImplementedError( - "Video validation is not implemented yet" - ) # consider using assert_video_valid defined above + expected_frames = extra_body.get("num_frames") + expected_width = extra_body.get("width") + expected_height = extra_body.get("height") + expected_fps = extra_body.get("fps") + for vid_bytes in response.videos: + assert_video_valid( + vid_bytes, num_frames=expected_frames, width=expected_width, height=expected_height, fps=expected_fps + ) if response.audios is not None: raise NotImplementedError( "Audio validation is not implemented yet" @@ -1391,6 +1477,7 @@ def __init__( port: vLLM-Omni server port api_key: API key (defaults to "EMPTY") """ + self.base_url = f"http://{host}:{port}" self.client = OpenAI(base_url=f"http://{host}:{port}/v1", api_key=api_key) self.run_level = run_level @@ -1625,49 +1712,192 @@ def send_diffusion_request(self, request_config: dict[str, Any], request_num: in stream = request_config.get("stream", False) modalities = request_config.get("modalities", omit) # Most diffusion models don't require modalities param extra_body = request_config.get("extra_body", None) + messages = request_config.get("messages") if stream: raise NotImplementedError("Streaming is not currently implemented for diffusion model e2e test") - if request_num 
== 1: - # Send single request - chat_completion = self.client.chat.completions.create( - model=request_config.get("model"), - messages=request_config.get("messages"), - extra_body=extra_body, - modalities=modalities, - ) + if extra_body.get("num_frames", None): # videos + sys_prompt, user_prompt, vids, imgs, auds = extract_params_from_messages(messages) + + form_data = { + "prompt": user_prompt, + "negative_prompt": extra_body.get("negative_prompt", ""), + "width": str(extra_body.get("width", 512)), + "height": str(extra_body.get("height", 512)), + "num_frames": str(extra_body.get("num_frames", 8)), + "fps": str(extra_body.get("fps", 8)), + "num_inference_steps": str(extra_body.get("num_inference_steps", 2)), + "guidance_scale": str(extra_body.get("guidance_scale", 4.0)), + "seed": str(extra_body.get("seed", 42)), + } + + files = {} + if imgs: + img_url = imgs[-1] + if img_url.startswith("data:image"): + _, encoded = img_url.split(",", 1) + file_data = base64.b64decode(encoded) + files["input_reference"] = ("reference.jpg", BytesIO(file_data), "image/jpeg") + else: + form_data["image_reference"] = json.dumps({"image_url": img_url}) - response = self._process_diffusion_response(chat_completion) - assert_diffusion_response(response, request_config, run_level=self.run_level) - responses.append(response) + if "boundary_ratio" in extra_body: + form_data["boundary_ratio"] = str(extra_body["boundary_ratio"]) + if "flow_shift" in extra_body: + form_data["flow_shift"] = str(extra_body["flow_shift"]) - else: - # Send concurrent requests - with concurrent.futures.ThreadPoolExecutor(max_workers=request_num) as executor: - futures = [] + result = DiffusionResponse() + start_time = time.perf_counter() - # Submit all request tasks - for _ in range(request_num): - future = executor.submit( - self.client.chat.completions.create, - model=request_config.get("model"), - messages=request_config.get("messages"), - modalities=modalities, - extra_body=extra_body, - ) - 
futures.append(future) + try: + # create_and_poll includes (POST /v1/videos) and poll req (GET /v1/videos/{video_id}) + create_url = f"{self.base_url}//v1/videos" + response = requests.post( + create_url, data=form_data, files=files, headers={"Accept": "application/json"} + ) + response.raise_for_status() + job_data = response.json() + video_id = job_data["id"] + + while True: + status_url = f"{self.base_url}/v1/videos/{video_id}" + status_resp = requests.get(status_url) + status_data = status_resp.json() + current_status = status_data["status"] + + if current_status == "completed": + break + elif current_status == "failed": + error_msg = status_data.get("last_error", "Unknown error") + raise RuntimeError(f"Job failed: {error_msg}") + + time.sleep(2) + + download_url = f"{self.base_url}/v1/videos/{video_id}/content" + video_resp = requests.get(download_url, stream=True) + video_resp.raise_for_status() + + video_bytes = BytesIO() + for chunk in video_resp.iter_content(chunk_size=8192): + video_bytes.write(chunk) + video_bytes.seek(0) + + result.success = True + result.videos = [video_bytes.getvalue()] + result.e2e_latency = time.perf_counter() - start_time + + assert_diffusion_response(result, request_config, run_level=self.run_level) + + except Exception as e: + result.success = False + result.error_message = f"Diffusion response processing error: {str(e)}" + + responses.append(result) + + else: # images + if request_num == 1: + # Send single request + chat_completion = self.client.chat.completions.create( + model=request_config.get("model"), + messages=messages, + extra_body=extra_body, + modalities=modalities, + ) - # Process completed tasks - for future in concurrent.futures.as_completed(futures): - chat_completion = future.result() - response = self._process_diffusion_response(chat_completion) - assert_diffusion_response(response, request_config, run_level=self.run_level) - responses.append(response) + response = 
self._process_diffusion_response(chat_completion) + assert_diffusion_response(response, request_config, run_level=self.run_level) + responses.append(response) + + else: + # Send concurrent requests + with concurrent.futures.ThreadPoolExecutor(max_workers=request_num) as executor: + futures = [] + + # Submit all request tasks + for _ in range(request_num): + future = executor.submit( + self.client.chat.completions.create, + model=request_config.get("model"), + messages=messages, + modalities=modalities, + extra_body=extra_body, + ) + futures.append(future) + + # Process completed tasks + for future in concurrent.futures.as_completed(futures): + chat_completion = future.result() + response = self._process_diffusion_response(chat_completion) + assert_diffusion_response(response, request_config, run_level=self.run_level) + responses.append(response) return responses +def extract_params_from_messages( + messages: list[dict[str, Any]], +) -> tuple[ + str | None, # system_prompt_content + str, # user_prompt_text + list[str], # video_urls + list[str], # image_urls + list[str], # audio_urls +]: + system_prompt = None + user_prompt_parts = [] + video_urls, image_urls, audio_urls = [], [], [] + + # extract system_prompt + for msg in messages: + if msg.get("role") == "system": + content = msg.get("content", "") + if isinstance(content, str): + system_prompt = content + elif isinstance(content, list): + system_prompt = " ".join( + item["text"] for item in content if isinstance(item, dict) and item.get("type") == "text" + ) + break + + # extract latest user message + user_msg = next((msg for msg in reversed(messages) if msg.get("role") == "user"), None) + if not user_msg: + return system_prompt, "", video_urls, image_urls, audio_urls + + content = user_msg.get("content", "") + + if isinstance(content, str): + return system_prompt, content, video_urls, image_urls, audio_urls + + if not isinstance(content, list): + raise ValueError(f"Unexpected content type: {type(content)}") + + 
for item in content: + if not isinstance(item, dict): + continue + + item_type = item.get("type", "") + + if item_type == "text" and "text" in item: + user_prompt_parts.append(item["text"]) + + elif item_type in ("video_url", "image_url", "audio_url"): + media_key = item_type + media_obj = item.get(media_key, {}) + if isinstance(media_obj, dict) and "url" in media_obj: + url = media_obj["url"] + if url: + if item_type == "video_url": + video_urls.append(url) + elif item_type == "image_url": + image_urls.append(url) + elif item_type == "audio_url": + audio_urls.append(url) + + return (system_prompt, " ".join(user_prompt_parts).strip(), video_urls, image_urls, audio_urls) + + @pytest.fixture def openai_client(omni_server: OmniServer, run_level: str): """Create OpenAIClientHandler fixture to facilitate communication with OmniServer diff --git a/tests/e2e/online_serving/test_wan22_expansion.py b/tests/e2e/online_serving/test_wan22_expansion.py new file mode 100644 index 00000000000..ea59d8bfd5f --- /dev/null +++ b/tests/e2e/online_serving/test_wan22_expansion.py @@ -0,0 +1,134 @@ +""" +Comprehensive tests of diffusion features that are available in online serving mode +and are supported by the following models: +- Wan-AI/Wan2.2-T2V-A14B-Diffusers +- Wan-AI/Wan2.2-I2V-A14B-Diffusers +- Wan-AI/Wan2.2-TI2V-5B-Diffusers + +Coverage: +- Cache-DiT +- CFG-Parallel +- Ulysses-SP +- Tensor-Parallel +- VAE-Patch-Parallel +- HSDP + +assert_diffusion_response validates successful generation +""" + +import pytest + +from tests.conftest import ( + OmniServer, + OmniServerParams, + OpenAIClientHandler, + dummy_messages_from_mix_data, + generate_synthetic_image, +) +from tests.utils import hardware_marks + +PROMPT = "Two anthropomorphic cats in comfy boxing gear and bright gloves fight intensely on a spotlighted stage." 
+NEGATIVE_PROMPT = "low quality, blurry, distorted face, extra limbs, bad anatomy, watermark, logo, text, ugly, deformed, mutated, jpeg artifacts" +SINGLE_CARD_FEATURE_MARKS = hardware_marks(res={"cuda": "H100"}) +PARALLEL_FEATURE_MARKS = hardware_marks(res={"cuda": "H100"}, num_cards=2) + +WAN22_MODELS = [ + ("Wan-AI/Wan2.2-T2V-A14B-Diffusers", "t2v"), + ("Wan-AI/Wan2.2-I2V-A14B-Diffusers", "i2v"), + ("Wan-AI/Wan2.2-TI2V-5B-Diffusers", "ti2v"), +] + +PARALLEL_CONFIGS = [ + ("cfg_parallel", ["--cfg-parallel-size", "2"]), + ("ulysses_sp", ["--usp", "2"]), + ("tensor_parallel", ["--tensor-parallel-size", "2"]), + ("vae_patch", ["--vae-patch-parallel-size", "2"]), + ("hsdp", ["--use-hsdp", "--hsdp-shard-size", "2"]), # replicate_size=1 (default) +] + + +def _get_wan22_feature_cases(): + """ + Generate parameterized test cases covering: + - All 3 Wan2.2 model variants with architecture awareness + - 1 single-card feature (Cache-DiT) + - 6 multi-card parallelism features with CORRECT PARAMETER NAMES per spec + """ + cases = [] + + # Single-card: Cache-DiT (applies to all models) + for model_path, model_key in WAN22_MODELS: + cases.append( + pytest.param( + OmniServerParams( + model=model_path, + server_args=["--cache-backend", "cache_dit"], + ), + id=f"{model_key}_cache_dit", + marks=SINGLE_CARD_FEATURE_MARKS, + ) + ) + + # Multi-card features + for model_path, model_key in WAN22_MODELS: + for feat_id, server_args in PARALLEL_CONFIGS: + cases.append( + pytest.param( + OmniServerParams(model=model_path, server_args=server_args), + id=f"{model_key}_{feat_id}", + marks=PARALLEL_FEATURE_MARKS, + ) + ) + + return cases + + +@pytest.mark.advanced_model +@pytest.mark.diffusion +@pytest.mark.parametrize( + "omni_server", + _get_wan22_feature_cases(), + indirect=True, +) +def test_wan22_diffusion_features( + omni_server: OmniServer, + openai_client: OpenAIClientHandler, +): + model_path = omni_server.model + is_i2v_or_ti2v = any(kw in model_path for kw in ["I2V", "TI2V"]) + 
is_moe_model = "I2V-A14B" in model_path # Only I2V-A14B uses MoE per spec + + if is_i2v_or_ti2v: + image_data_url = f"data:image/jpeg;base64,{generate_synthetic_image(512, 512)['base64']}" + messages = dummy_messages_from_mix_data(image_data_url=image_data_url, content_text=PROMPT) + else: + messages = dummy_messages_from_mix_data(content_text=PROMPT) + + extra_body = { + "height": 512, + "width": 512, + "num_frames": 8, + "fps": 8, + "num_inference_steps": 2, + "guidance_scale": 4.0, + "negative_prompt": NEGATIVE_PROMPT, + "seed": 42, + # flow_shift omitted: Service uses resolution-based defaults (12.0 for 512px) + # vae_use_slicing/tiling omitted: Service-side optimization, not request param + } + + if is_moe_model: + extra_body.update( + { + "guidance_scale_high": 1.0, + "boundary_ratio": 0.5, + } + ) + + request_config = { + "model": model_path, + "messages": messages, + "extra_body": extra_body, + } + + openai_client.send_diffusion_request(request_config) From 3c56f678277a04b5f33ff8b68b0168418c6dfda9 Mon Sep 17 00:00:00 2001 From: bjf-frz Date: Mon, 23 Mar 2026 20:58:22 +0800 Subject: [PATCH 2/6] modify req logic Signed-off-by: bjf-frz --- tests/conftest.py | 443 ++++++++++-------- .../online_serving/test_wan22_expansion.py | 31 +- 2 files changed, 268 insertions(+), 206 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index fbc2342277c..31d708e5a7e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -60,6 +60,102 @@ class OmniServerParams(NamedTuple): server_args: list[str] | None = None +def assert_image_diffusion_response( + response, + request_config: dict[str, Any], + run_level: str = None, +) -> None: + """ + Validate image diffusion response. + + Expected request_config schema: + { + "request_type": "image", + "extra_body": { + "num_outputs_per_prompt": 1, + "width": ..., + "height": ..., + ... 
+ } + } + """ + extra_body = request_config.get("extra_body", {}) + + num_outputs_per_prompt = extra_body.get("num_outputs_per_prompt", 1) + + assert response.images is not None, "Image response is None" + assert len(response.images) > 0, "No images in response" + assert len(response.images) == num_outputs_per_prompt, ( + f"Expected {num_outputs_per_prompt} images, got {len(response.images)}" + ) + + if run_level == "advanced_model": + expected_width = extra_body["width"] # intentionally raise KeyError if missing + expected_height = extra_body["height"] # intentionally raise KeyError if missing + + for img in response.images: + assert_image_valid(img, width=expected_width, height=expected_height) + + +def assert_video_diffusion_response( + response, + request_config: dict[str, Any], + run_level: str = None, +) -> None: + """ + Validate video diffusion response. + + Expected request_config schema: + { + "request_type": "video", + "form_data": { + "prompt": "...", + "num_frames": ..., + "width": ..., + "height": ..., + "fps": ..., + ... + } + } + """ + form_data = request_config.get("form_data", {}) + + assert response.videos is not None, "Video response is None" + assert len(response.videos) > 0, "No videos in response" + + expected_frames = _maybe_int(form_data.get("num_frames")) + expected_width = _maybe_int(form_data.get("width")) + expected_height = _maybe_int(form_data.get("height")) + expected_fps = _maybe_int(form_data.get("fps")) + + for vid_bytes in response.videos: + assert_video_valid( + vid_bytes, + num_frames=expected_frames, + width=expected_width, + height=expected_height, + fps=expected_fps, + ) + + +def assert_audio_diffusion_response( + response, + request_config: dict[str, Any], + run_level: str = None, +) -> None: + """ + Validate audio diffusion response. 
+ """ + raise NotImplementedError("Audio validation is not implemented yet") + # consider using assert_audio_valid defined above + + +def _maybe_int(value: Any) -> int | None: + if value is None: + return None + return int(value) + + def assert_image_valid(image: Path | Image.Image, *, width: int | None = None, height: int | None = None): """Assert the file is a loadable image with optional exact dimensions.""" if isinstance(image, Path): @@ -132,7 +228,7 @@ def assert_video_valid( # Validate against expectations if num_frames is not None: - expected_num_frames = ((num_frames + 3) // 4) * 4 + 1 + expected_num_frames = (num_frames // 4) * 4 + 1 assert actual_num_frames == expected_num_frames, ( f"Frame count mismatch: expected {num_frames}, got {actual_num_frames}" ) @@ -1412,12 +1508,12 @@ def assert_diffusion_response(response: DiffusionResponse, request_config: dict[ """ Validate diffusion response results. + Dispatcher that routes validation to modality-specific assert functions. + Args: - response: DiffusionResponse object. Any not-None content will be validated based on the request_config. - request_config: Request configuration dictionary containing parameters like model, messages, extra_body. - When validating a certain modality, the corresponding params in request_config['extra_body'] must present. - It will be used to check against the multimedia file in the response. - run_level: Test run level (e.g., "core_model", "advanced_model") + response: DiffusionResponse object. + request_config: Request configuration dictionary. + run_level: Test run level (e.g. 
"core_model", "advanced_model") Raises: AssertionError: When the response does not meet validation criteria @@ -1429,33 +1525,29 @@ def assert_diffusion_response(response: DiffusionResponse, request_config: dict[ if e2e_latency is not None: print(f"the avg e2e is: {e2e_latency}") - extra_body = request_config.get("extra_body", {}) - - num_outputs_per_prompt = extra_body.get("num_outputs_per_prompt", 1) + has_any_content = any(content is not None for content in (response.images, response.videos, response.audios)) + assert has_any_content, "Response contains no images, videos, or audios" if response.images is not None: - assert len(response.images) > 0, "No images in response" - assert len(response.images) == num_outputs_per_prompt, ( - f"Expected {num_outputs_per_prompt} images, got {len(response.images)}" + assert_image_diffusion_response( + response=response, + request_config=request_config, + run_level=run_level, ) - if run_level == "advanced_model": - expected_width = extra_body["width"] # intend to raise KeyError - expected_height = extra_body["height"] # intend to raise KeyError - for img in response.images: - assert_image_valid(img, width=expected_width, height=expected_height) + if response.videos is not None: - expected_frames = extra_body.get("num_frames") - expected_width = extra_body.get("width") - expected_height = extra_body.get("height") - expected_fps = extra_body.get("fps") - for vid_bytes in response.videos: - assert_video_valid( - vid_bytes, num_frames=expected_frames, width=expected_width, height=expected_height, fps=expected_fps - ) + assert_video_diffusion_response( + response=response, + request_config=request_config, + run_level=run_level, + ) + if response.audios is not None: - raise NotImplementedError( - "Audio validation is not implemented yet" - ) # consider using assert_audio_valid defined above + assert_audio_diffusion_response( + response=response, + request_config=request_config, + run_level=run_level, + ) class OpenAIClientHandler: 
@@ -1712,190 +1804,163 @@ def send_diffusion_request(self, request_config: dict[str, Any], request_num: in stream = request_config.get("stream", False) modalities = request_config.get("modalities", omit) # Most diffusion models don't require modalities param extra_body = request_config.get("extra_body", None) - messages = request_config.get("messages") if stream: raise NotImplementedError("Streaming is not currently implemented for diffusion model e2e test") - if extra_body.get("num_frames", None): # videos - sys_prompt, user_prompt, vids, imgs, auds = extract_params_from_messages(messages) - - form_data = { - "prompt": user_prompt, - "negative_prompt": extra_body.get("negative_prompt", ""), - "width": str(extra_body.get("width", 512)), - "height": str(extra_body.get("height", 512)), - "num_frames": str(extra_body.get("num_frames", 8)), - "fps": str(extra_body.get("fps", 8)), - "num_inference_steps": str(extra_body.get("num_inference_steps", 2)), - "guidance_scale": str(extra_body.get("guidance_scale", 4.0)), - "seed": str(extra_body.get("seed", 42)), - } + if request_num == 1: + # Send single request + chat_completion = self.client.chat.completions.create( + model=request_config.get("model"), + messages=request_config.get("messages"), + extra_body=extra_body, + modalities=modalities, + ) - files = {} - if imgs: - img_url = imgs[-1] - if img_url.startswith("data:image"): - _, encoded = img_url.split(",", 1) - file_data = base64.b64decode(encoded) - files["input_reference"] = ("reference.jpg", BytesIO(file_data), "image/jpeg") - else: - form_data["image_reference"] = json.dumps({"image_url": img_url}) + response = self._process_diffusion_response(chat_completion) + assert_diffusion_response(response, request_config, run_level=self.run_level) + responses.append(response) - if "boundary_ratio" in extra_body: - form_data["boundary_ratio"] = str(extra_body["boundary_ratio"]) - if "flow_shift" in extra_body: - form_data["flow_shift"] = str(extra_body["flow_shift"]) + 
else: + # Send concurrent requests + with concurrent.futures.ThreadPoolExecutor(max_workers=request_num) as executor: + futures = [] - result = DiffusionResponse() - start_time = time.perf_counter() + # Submit all request tasks + for _ in range(request_num): + future = executor.submit( + self.client.chat.completions.create, + model=request_config.get("model"), + messages=request_config.get("messages"), + modalities=modalities, + extra_body=extra_body, + ) + futures.append(future) - try: - # create_and_poll includes (POST /v1/videos) and poll req (GET /v1/videos/{video_id}) - create_url = f"{self.base_url}//v1/videos" - response = requests.post( - create_url, data=form_data, files=files, headers={"Accept": "application/json"} - ) - response.raise_for_status() - job_data = response.json() - video_id = job_data["id"] - - while True: - status_url = f"{self.base_url}/v1/videos/{video_id}" - status_resp = requests.get(status_url) - status_data = status_resp.json() - current_status = status_data["status"] - - if current_status == "completed": - break - elif current_status == "failed": - error_msg = status_data.get("last_error", "Unknown error") - raise RuntimeError(f"Job failed: {error_msg}") - - time.sleep(2) - - download_url = f"{self.base_url}/v1/videos/{video_id}/content" - video_resp = requests.get(download_url, stream=True) - video_resp.raise_for_status() - - video_bytes = BytesIO() - for chunk in video_resp.iter_content(chunk_size=8192): - video_bytes.write(chunk) - video_bytes.seek(0) - - result.success = True - result.videos = [video_bytes.getvalue()] - result.e2e_latency = time.perf_counter() - start_time - - assert_diffusion_response(result, request_config, run_level=self.run_level) - - except Exception as e: - result.success = False - result.error_message = f"Diffusion response processing error: {str(e)}" - - responses.append(result) - - else: # images - if request_num == 1: - # Send single request - chat_completion = self.client.chat.completions.create( - 
model=request_config.get("model"), - messages=messages, - extra_body=extra_body, - modalities=modalities, - ) + # Process completed tasks + for future in concurrent.futures.as_completed(futures): + chat_completion = future.result() + response = self._process_diffusion_response(chat_completion) + assert_diffusion_response(response, request_config, run_level=self.run_level) + responses.append(response) - response = self._process_diffusion_response(chat_completion) - assert_diffusion_response(response, request_config, run_level=self.run_level) - responses.append(response) + return responses + def send_video_diffusion_request(self, request_config: dict[str, Any], request_num: int = 1) -> list[OmniResponse]: + """ + Send native /v1/videos requests. + """ + if request_num != 1: + raise NotImplementedError("Concurrent video diffusion requests are not currently implemented") + + if request_config.get("stream", False): + raise NotImplementedError("Streaming is not currently implemented for video diffusion e2e test") + + form_data = request_config.get("form_data") + if not isinstance(form_data, dict): + raise ValueError("Video request_config must contain 'form_data'") + + if not form_data.get("prompt"): + raise ValueError("Video request_config['form_data'] must contain 'prompt'") + + normalized_form_data = {key: str(value) for key, value in form_data.items() if value is not None} + + files: dict[str, tuple[str, BytesIO, str]] = {} + image_reference = request_config.get("image_reference") + if image_reference: + if image_reference.startswith("data:image"): + header, encoded = image_reference.split(",", 1) + content_type = header.split(";")[0].removeprefix("data:") + extension = content_type.split("/")[-1] + file_data = base64.b64decode(encoded) + + files["input_reference"] = ( + f"reference.{extension}", + BytesIO(file_data), + content_type, + ) else: - # Send concurrent requests - with concurrent.futures.ThreadPoolExecutor(max_workers=request_num) as executor: - futures = [] 
- - # Submit all request tasks - for _ in range(request_num): - future = executor.submit( - self.client.chat.completions.create, - model=request_config.get("model"), - messages=messages, - modalities=modalities, - extra_body=extra_body, - ) - futures.append(future) - - # Process completed tasks - for future in concurrent.futures.as_completed(futures): - chat_completion = future.result() - response = self._process_diffusion_response(chat_completion) - assert_diffusion_response(response, request_config, run_level=self.run_level) - responses.append(response) + normalized_form_data["image_reference"] = json.dumps({"image_url": image_reference}) - return responses + result = DiffusionResponse() + start_time = time.perf_counter() + try: + create_url = self._build_url("/v1/videos") + response = requests.post( + create_url, + data=normalized_form_data, + files=files, + headers={"Accept": "application/json"}, + timeout=60, + ) + response.raise_for_status() -def extract_params_from_messages( - messages: list[dict[str, Any]], -) -> tuple[ - str | None, # system_prompt_content - str, # user_prompt_text - list[str], # video_urls - list[str], # image_urls - list[str], # audio_urls -]: - system_prompt = None - user_prompt_parts = [] - video_urls, image_urls, audio_urls = [], [], [] - - # extract system_prompt - for msg in messages: - if msg.get("role") == "system": - content = msg.get("content", "") - if isinstance(content, str): - system_prompt = content - elif isinstance(content, list): - system_prompt = " ".join( - item["text"] for item in content if isinstance(item, dict) and item.get("type") == "text" - ) - break + job_data = response.json() + video_id = job_data["id"] + + self._wait_until_video_completed(video_id) + + video_content = self._download_video_content(video_id) + + result.success = True + result.videos = [video_content] + result.e2e_latency = time.perf_counter() - start_time + + assert_diffusion_response(result, request_config, run_level=self.run_level) + + 
except Exception as e: + result.success = False + result.error_message = f"Diffusion response processing error: {e}" + assert False, result.error_message + + return [result] + + def _wait_until_video_completed( + self, + video_id: str, + poll_interval_seconds: int = 2, + timeout_seconds: int = 300, + ) -> None: + status_url = self._build_url(f"/v1/videos/{video_id}") + deadline = time.monotonic() + timeout_seconds + + while time.monotonic() < deadline: + status_resp = requests.get( + status_url, + headers={"Accept": "application/json"}, + timeout=30, + ) + status_resp.raise_for_status() - # extract latest user message - user_msg = next((msg for msg in reversed(messages) if msg.get("role") == "user"), None) - if not user_msg: - return system_prompt, "", video_urls, image_urls, audio_urls + status_data = status_resp.json() + current_status = status_data["status"] - content = user_msg.get("content", "") + if current_status == "completed": + return - if isinstance(content, str): - return system_prompt, content, video_urls, image_urls, audio_urls + if current_status == "failed": + error_msg = status_data.get("last_error", "Unknown error") + raise RuntimeError(f"Job failed: {error_msg}") - if not isinstance(content, list): - raise ValueError(f"Unexpected content type: {type(content)}") + time.sleep(poll_interval_seconds) - for item in content: - if not isinstance(item, dict): - continue + raise TimeoutError(f"Video job {video_id} did not complete within {timeout_seconds}s") - item_type = item.get("type", "") + def _download_video_content(self, video_id: str) -> bytes: + download_url = self._build_url(f"/v1/videos/{video_id}/content") + video_resp = requests.get(download_url, stream=True, timeout=60) + video_resp.raise_for_status() - if item_type == "text" and "text" in item: - user_prompt_parts.append(item["text"]) + video_bytes = BytesIO() + for chunk in video_resp.iter_content(chunk_size=8192): + if chunk: + video_bytes.write(chunk) - elif item_type in ("video_url", 
"image_url", "audio_url"): - media_key = item_type - media_obj = item.get(media_key, {}) - if isinstance(media_obj, dict) and "url" in media_obj: - url = media_obj["url"] - if url: - if item_type == "video_url": - video_urls.append(url) - elif item_type == "image_url": - image_urls.append(url) - elif item_type == "audio_url": - audio_urls.append(url) + return video_bytes.getvalue() - return (system_prompt, " ".join(user_prompt_parts).strip(), video_urls, image_urls, audio_urls) + def _build_url(self, path: str) -> str: + return f"{self.base_url.rstrip('/')}/{path.lstrip('/')}" @pytest.fixture diff --git a/tests/e2e/online_serving/test_wan22_expansion.py b/tests/e2e/online_serving/test_wan22_expansion.py index ea59d8bfd5f..e5e2d748d58 100644 --- a/tests/e2e/online_serving/test_wan22_expansion.py +++ b/tests/e2e/online_serving/test_wan22_expansion.py @@ -12,6 +12,7 @@ - Tensor-Parallel - VAE-Patch-Parallel - HSDP +- Ring-Attn assert_diffusion_response validates successful generation """ @@ -22,7 +23,6 @@ OmniServer, OmniServerParams, OpenAIClientHandler, - dummy_messages_from_mix_data, generate_synthetic_image, ) from tests.utils import hardware_marks @@ -41,9 +41,9 @@ PARALLEL_CONFIGS = [ ("cfg_parallel", ["--cfg-parallel-size", "2"]), ("ulysses_sp", ["--usp", "2"]), - ("tensor_parallel", ["--tensor-parallel-size", "2"]), - ("vae_patch", ["--vae-patch-parallel-size", "2"]), + ("tp_vae_patch", ["--tensor-parallel-size", "2", "--vae-patch-parallel-size", "2"]), ("hsdp", ["--use-hsdp", "--hsdp-shard-size", "2"]), # replicate_size=1 (default) + ("ring_atten", ["--ring", "2"]), ] @@ -62,7 +62,7 @@ def _get_wan22_feature_cases(): pytest.param( OmniServerParams( model=model_path, - server_args=["--cache-backend", "cache_dit"], + server_args=["--cache-backend", "cache_dit", "--enable-layerwise-offload"], ), id=f"{model_key}_cache_dit", marks=SINGLE_CARD_FEATURE_MARKS, @@ -98,37 +98,34 @@ def test_wan22_diffusion_features( is_i2v_or_ti2v = any(kw in model_path for kw in 
["I2V", "TI2V"]) is_moe_model = "I2V-A14B" in model_path # Only I2V-A14B uses MoE per spec - if is_i2v_or_ti2v: - image_data_url = f"data:image/jpeg;base64,{generate_synthetic_image(512, 512)['base64']}" - messages = dummy_messages_from_mix_data(image_data_url=image_data_url, content_text=PROMPT) - else: - messages = dummy_messages_from_mix_data(content_text=PROMPT) - - extra_body = { + form_data = { + "prompt": PROMPT, + "negative_prompt": NEGATIVE_PROMPT, "height": 512, "width": 512, "num_frames": 8, "fps": 8, "num_inference_steps": 2, "guidance_scale": 4.0, - "negative_prompt": NEGATIVE_PROMPT, "seed": 42, # flow_shift omitted: Service uses resolution-based defaults (12.0 for 512px) # vae_use_slicing/tiling omitted: Service-side optimization, not request param } if is_moe_model: - extra_body.update( + form_data.update( { - "guidance_scale_high": 1.0, + "guidance_scale_2": 1.0, "boundary_ratio": 0.5, } ) request_config = { "model": model_path, - "messages": messages, - "extra_body": extra_body, + "form_data": form_data, } - openai_client.send_diffusion_request(request_config) + if is_i2v_or_ti2v: + request_config["image_reference"] = f"data:image/jpeg;base64,{generate_synthetic_image(512, 512)['base64']}" + + openai_client.send_video_diffusion_request(request_config) From f8ad245ca728c06947f99fa82e2f8f14cbd9e6b6 Mon Sep 17 00:00:00 2001 From: bjf-frz Date: Tue, 24 Mar 2026 10:44:09 +0800 Subject: [PATCH 3/6] fix process image diffusion Signed-off-by: bjf-frz --- tests/conftest.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 31d708e5a7e..f0ce070adf2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1710,12 +1710,15 @@ def _process_diffusion_response(self, chat_completion) -> DiffusionResponse: content = choice.message.content if isinstance(content, list): for item in content: - if hasattr(item, "image_url") and item.image_url is not None: - image_url = item.image_url.url - if 
image_url.startswith("data:image"): - b64_data = image_url.split(",", 1)[1] - img = decode_b64_image(b64_data) - images.append(img) + if isinstance(item, dict): + image_url = item.get("image_url", {}).get("url") + else: + image_url_obj = getattr(item, "image_url", None) + image_url = getattr(image_url_obj, "url", None) if image_url_obj else None + if image_url and image_url.startswith("data:image"): + b64_data = image_url.split(",", 1)[1] + img = decode_b64_image(b64_data) + images.append(img) result.e2e_latency = time.perf_counter() - start_time result.images = images if images else None From a7a359e2a6dcec38cbdfd460a5ea6d0cf8b83dec Mon Sep 17 00:00:00 2001 From: bjf-frz Date: Tue, 24 Mar 2026 15:46:10 +0800 Subject: [PATCH 4/6] prolong duration of nightly test with h100 Signed-off-by: bjf-frz --- .buildkite/test-nightly.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.buildkite/test-nightly.yml b/.buildkite/test-nightly.yml index 2329800d69a..df935877068 100644 --- a/.buildkite/test-nightly.yml +++ b/.buildkite/test-nightly.yml @@ -71,7 +71,7 @@ steps: - "/fsx/hf_cache:/fsx/hf_cache" - label: ":full_moon: Diffusion Model Test with H100" - timeout_in_minutes: 60 + timeout_in_minutes: 90 depends_on: upload-nightly-pipeline if: build.env("NIGHTLY") == "1" commands: From 4aa563d3d931f393f5cdb00aa1e1922d6b71dcec Mon Sep 17 00:00:00 2001 From: bjf-frz Date: Wed, 25 Mar 2026 11:26:48 +0800 Subject: [PATCH 5/6] modify hunyuan-video l4 model test Signed-off-by: bjf-frz --- .../test_hunyuan_video_15_expansion.py | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/tests/e2e/online_serving/test_hunyuan_video_15_expansion.py b/tests/e2e/online_serving/test_hunyuan_video_15_expansion.py index 62109fbbc5b..de950edb900 100644 --- a/tests/e2e/online_serving/test_hunyuan_video_15_expansion.py +++ b/tests/e2e/online_serving/test_hunyuan_video_15_expansion.py @@ -15,7 +15,6 @@ OmniServer, OmniServerParams, 
OpenAIClientHandler, - dummy_messages_from_mix_data, ) from tests.utils import hardware_marks @@ -79,20 +78,20 @@ def test_hunyuan_video_15_t2v( openai_client: OpenAIClientHandler, ): """L4 diffusion feature coverage for HunyuanVideo-1.5-T2V on H100.""" - messages = dummy_messages_from_mix_data(content_text=PROMPT) + form_data = { + "prompt": PROMPT, + "negative_prompt": NEGATIVE_PROMPT, + "height": 480, + "width": 640, + "num_frames": 5, + "num_inference_steps": 2, + "guidance_scale": 6.0, + "seed": 42, + } request_config = { "model": omni_server.model, - "messages": messages, - "extra_body": { - "height": 480, - "width": 640, - "num_frames": 5, - "num_inference_steps": 2, - "guidance_scale": 6.0, - "negative_prompt": NEGATIVE_PROMPT, - "seed": 42, - }, + "form_data": form_data, } - openai_client.send_diffusion_request(request_config) + openai_client.send_video_diffusion_request(request_config) From 8d25f4ace18f06b77d40f5ee5dffde944d8db4e2 Mon Sep 17 00:00:00 2001 From: bjf-frz Date: Thu, 26 Mar 2026 09:22:46 +0800 Subject: [PATCH 6/6] add l4 label for wan22 Signed-off-by: bjf-frz --- .buildkite/test-nightly.yml | 43 ++++++++++++++++++- .../online_serving/test_wan22_expansion.py | 1 - 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/.buildkite/test-nightly.yml b/.buildkite/test-nightly.yml index 26c26ef23e9..f9dadfd3127 100644 --- a/.buildkite/test-nightly.yml +++ b/.buildkite/test-nightly.yml @@ -71,7 +71,7 @@ steps: - "/fsx/hf_cache:/fsx/hf_cache" - label: ":full_moon: Diffusion Model Test with H100" - timeout_in_minutes: 90 + timeout_in_minutes: 60 depends_on: upload-nightly-pipeline if: build.env("NIGHTLY") == "1" commands: @@ -111,6 +111,47 @@ steps: path: /mnt/hf-cache type: DirectoryOrCreate + - label: ":full_moon: Diffusion Model Wan22 completed Test with H100" + timeout_in_minutes: 45 + depends_on: upload-nightly-pipeline + if: build.env("NIGHTLY") == "1" || build.pull_request.labels includes "nightly-test" + commands: + - export 
VLLM_WORKER_MULTIPROC_METHOD=spawn + - pytest -s -v tests/e2e/online_serving/test_wan22_expansion.py -m "advanced_model" --run-level "advanced_model" + agents: + queue: "mithril-h100-pool" + plugins: + - kubernetes: + podSpec: + containers: + - image: 936637512419.dkr.ecr.us-west-2.amazonaws.com/vllm-ci-pull-through-cache/q9t5s3a7/vllm-ci-test-repo:$BUILDKITE_COMMIT + resources: + limits: + nvidia.com/gpu: 2 + volumeMounts: + - name: devshm + mountPath: /dev/shm + - name: hf-cache + mountPath: /root/.cache/huggingface + env: + - name: HF_HOME + value: /root/.cache/huggingface + - name: HF_TOKEN + valueFrom: + secretKeyRef: + name: hf-token-secret + key: token + nodeSelector: + node.kubernetes.io/instance-type: gpu-h100-sxm + volumes: + - name: devshm + emptyDir: + medium: Memory + - name: hf-cache + hostPath: + path: /mnt/hf-cache + type: DirectoryOrCreate + - label: ":full_moon: Diffusion Model Test with L4" timeout_in_minutes: 60 depends_on: upload-nightly-pipeline diff --git a/tests/e2e/online_serving/test_wan22_expansion.py b/tests/e2e/online_serving/test_wan22_expansion.py index e5e2d748d58..f7c01c748e2 100644 --- a/tests/e2e/online_serving/test_wan22_expansion.py +++ b/tests/e2e/online_serving/test_wan22_expansion.py @@ -84,7 +84,6 @@ def _get_wan22_feature_cases(): @pytest.mark.advanced_model -@pytest.mark.diffusion @pytest.mark.parametrize( "omni_server", _get_wan22_feature_cases(),