Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 136 additions & 2 deletions benchmarks/diffusion/backends.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import base64
import mimetypes
import os
Expand Down Expand Up @@ -206,7 +207,140 @@ async def async_request_openai_images(
return output


async def async_request_v1_videos(
input: RequestFuncInput,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In diffusion_benchmark_serving.py, it says t2v benchmark can also use vllm-omni backends. Why defining another backend here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/v1/chat/complete backends actually not support t2v

session: aiohttp.ClientSession,
pbar: tqdm | None = None,
) -> RequestFuncOutput:
output = RequestFuncOutput()
output.start_time = time.perf_counter()

files = dict(input.extra_body)
if input.prompt:
files.setdefault("prompt", input.prompt)
if input.width and input.height:
files.setdefault("height", input.height)
files.setdefault("width", input.width)
if input.num_frames:
files.setdefault("num_frames", input.num_frames)
if input.num_inference_steps:
files.setdefault("num_inference_steps", input.num_inference_steps)
if input.seed is not None:
files.setdefault("seed", input.seed)
if input.fps:
files.setdefault("fps", input.fps)

form = aiohttp.FormData()
for k, v in files.items():
form.add_field(k, str(v))

image_file = None
if input.image_paths and len(input.image_paths) > 0:
image_path = input.image_paths[0]
image_file = open(image_path, "rb")
form.add_field(
"input_reference",
image_file,
filename=os.path.basename(image_path),
content_type="application/octet-stream",
)

job_id = None
job_status = None
poll_json = {}
resp_json = {}

try:
# invoke a post request (POST /v1/videos)
async with session.post(input.api_url, data=form) as response:
if response.status == 200:
resp_json = await response.json()
job_id = resp_json.get("id")
job_status = resp_json.get("status")
if not job_id or not job_status:
output.error = "API response missing job 'id' or 'status' field."
output.success = False
return output
else:
output.error = f"HTTP {response.status}: {await response.text()}"
output.success = False
return output

# invoke a poll request (GET /v1/videos/{video_id})
poll_interval = 2.0 # Unit(s)
timeout_seconds = 600.0
deadline = time.perf_counter() + timeout_seconds
job_url = f"{input.api_url}/{job_id}"

while job_status not in {"completed", "failed"}:
await asyncio.sleep(poll_interval)

async with session.get(job_url) as poll_response:
if poll_response.status != 200:
output.error = f"Polling failed HTTP {poll_response.status}: {await poll_response.text()}"
output.success = False
return output

poll_json = await poll_response.json()
job_status = poll_json.get("status")

if time.perf_counter() >= deadline:
output.error = f"Timed out waiting for video job {job_id} to complete."
output.success = False
return output

if job_status == "failed":
output.error = f"Video job failed: {poll_json}"
output.success = False
return output

# invoke a get request (GET /v1/videos/{video_id}/content)
content_url = f"{job_url}/content"
async with session.get(content_url) as content_response:
if content_response.status != 200:
output.error = (
f"Content retrieval failed HTTP {content_response.status}: {await content_response.text()}"
)
output.success = False
return output

video_bytes = await content_response.read()
output.response_body = video_bytes
output.success = True
if "peak_memory_mb" in poll_json:
output.peak_memory_mb = poll_json["peak_memory_mb"]
elif "peak_memory_mb" in resp_json:
output.peak_memory_mb = resp_json["peak_memory_mb"]
except Exception as e:
output.error = str(e)
output.success = False
finally:
if image_file is not None:
image_file.close()

if job_id is not None:
try:
async with session.delete(f"{input.api_url}/{job_id}") as _:
pass
except Exception as e:
print(f"Failed to clean up video job {job_id}: {e}")

output.latency = time.perf_counter() - output.start_time

if output.success and input.slo_ms is not None:
output.slo_achieved = (output.latency * 1000.0) <= float(input.slo_ms)

if pbar:
pbar.update(1)
return output


backends_function_mapping = {
"vllm-omni": (async_request_chat_completions, "/v1/chat/completions"),
"openai": (async_request_openai_images, "/v1/images/generations"),
"2i": {
"vllm-omni": (async_request_chat_completions, "/v1/chat/completions"),
"openai": (async_request_openai_images, "/v1/images/generations"),
},
"2v": {
"v1/videos": (async_request_v1_videos, "/v1/videos"),
},
}
136 changes: 124 additions & 12 deletions benchmarks/diffusion/diffusion_benchmark_serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
Supports multiple backends:
- vllm-omni: Uses /v1/chat/completions endpoint (default)
- openai: Uses /v1/images/generations endpoint
- v1/videos: Use /v1/videos endpoint

Usage:
# Video (vllm-omni backend)
Expand All @@ -28,6 +29,16 @@
--backend vllm-omni --dataset vbench --task t2i --num-prompts 10 \
--height 1024 --width 1024

python3 benchmarks/diffusion/diffusion_benchmark_serving.py \
--backend vllm-omni --dataset random --task t2i --num-prompts 1 \
Comment thread
wtomin marked this conversation as resolved.
--max-concurrency 1 --enable-negative-prompt \
--random-request-config '[
{"width":512,"height":512,"num_inference_steps":20,"weight":0.15},
{"width":768,"height":768,"num_inference_steps":20,"weight":0.25},
{"width":1024,"height":1024,"num_inference_steps":25,"weight":0.45},
{"width":1536,"height":1536,"num_inference_steps":35,"weight":0.15}
]'

i2i:
python3 benchmarks/diffusion/diffusion_benchmark_serving.py \
--backend vllm-omni --dataset vbench --task i2i --num-prompts 10
Expand All @@ -38,14 +49,27 @@
--backend openai --dataset vbench --task t2i --num-prompts 10 \
--height 1024 --width 1024 --port 3000

# Video (v1/vedeos)
t2v:
python3 benchmarks/diffusion/diffusion_benchmark_serving.py \
--backend v1/videos --dataset random --task t2v --num-prompts 1 \
--max-concurrency 1 --enable-negative-prompt \
--random-request-config '[
{"width":854,"height":480,"num_inference_steps":18,"num_frames":120,"fps":24,"weight":1}
]'


"""

import argparse
import ast
import asyncio
import glob
import json
import logging
import os
import random
import tempfile
import time
import uuid
from abc import ABC, abstractmethod
Expand All @@ -57,8 +81,11 @@
import numpy as np
import requests
from backends import RequestFuncInput, RequestFuncOutput, backends_function_mapping
from PIL import Image
from tqdm.asyncio import tqdm

logger = logging.getLogger(__name__)


class BaseDataset(ABC):
def __init__(self, args, api_url: str, model: str):
Expand Down Expand Up @@ -516,26 +543,66 @@ def get_requests(self) -> list[RequestFuncInput]:


class RandomDataset(BaseDataset):
def __init__(self, args, api_url: str, model: str):
self.args = args
self.api_url = api_url
self.model = model
def __init__(self, args, api_url: str, model: str, enable_negative_prompt: bool = False):
super().__init__(args, api_url, model)
self.num_prompts = args.num_prompts
self.enable_negative_prompt = enable_negative_prompt
self.random_request_config = getattr(args, "random_request_config", None)
if self.random_request_config:
self.random_request_config = json.loads(self.random_request_config)
self._weights = [p["weight"] for p in self.random_request_config]

self.random_request_config = [
{k: v for k, v in p.items() if k != "weight"} for p in self.random_request_config
]

seed = getattr(args, "random_request_seed", 42)
self._rng = random.Random(seed)

self._sampled_requests = self._rng.choices(
self.random_request_config,
weights=self._weights,
k=self.num_prompts,
)
else:
self._sampled_requests = None

# Random image generate
if self.args.task in ["i2v", "ti2v", "ti2i", "i2i"]:
img = Image.new("RGB", (512, 512), (255, 255, 255))

image_path = os.path.join(tempfile.gettempdir(), "diffusion_benchmark_random_image.png")
self._random_image_path = [image_path]
img.save(image_path)
else:
self._random_image_path = None

def __len__(self) -> int:
return self.num_prompts

def __getitem__(self, idx: int) -> RequestFuncInput:
extra_body = {}
if self.enable_negative_prompt:
extra_body["negative_prompt"] = f"Negative prompt {idx} for benchmarking diffusion models"

params = {
"width": self.args.width,
"height": self.args.height,
"num_frames": self.args.num_frames,
"num_inference_steps": self.args.num_inference_steps,
"fps": self.args.fps,
}
if self._sampled_requests:
profile = self._sampled_requests[idx]
params.update(profile)
return RequestFuncInput(
prompt=f"Random prompt {idx} for benchmarking diffusion models",
api_url=self.api_url,
model=self.model,
width=self.args.width,
height=self.args.height,
num_frames=self.args.num_frames,
num_inference_steps=self.args.num_inference_steps,
seed=self.args.seed,
fps=self.args.fps,
extra_body=extra_body,
image_paths=self._random_image_path,
**params,
)

def get_requests(self) -> list[RequestFuncInput]:
Expand Down Expand Up @@ -686,6 +753,7 @@ def calculate_metrics(
"latency_mean": np.mean(latencies) if latencies else 0,
"latency_median": np.median(latencies) if latencies else 0,
"latency_p99": np.percentile(latencies, 99) if latencies else 0,
"latency_p95": np.percentile(latencies, 95) if latencies else 0,
"latency_p50": np.percentile(latencies, 50) if latencies else 0,
"peak_memory_mb_max": max(peak_memories) if peak_memories else 0,
"peak_memory_mb_mean": np.mean(peak_memories) if peak_memories else 0,
Expand Down Expand Up @@ -742,16 +810,40 @@ async def benchmark(args):
if args.base_url is None:
args.base_url = f"http://{args.host}:{args.port}"

VIDEO_TASKS = {"t2v", "i2v", "ti2v"}
IMAGE_TASKS = {"t2i", "i2i", "ti2i"}

if args.task in VIDEO_TASKS:
task_type = "2v"
elif args.task in IMAGE_TASKS:
task_type = "2i"
else:
raise ValueError(
f"Unsupported task: '{args.task}'. "
f"Valid video tasks: {sorted(VIDEO_TASKS)}, "
f"Valid image tasks: {sorted(IMAGE_TASKS)}"
)

valid_backends = sorted(backends_function_mapping[task_type].keys())

if args.backend not in valid_backends:
logger.error(
f"Invalid backend '{args.backend}' for task '{args.task}' (task type: '{task_type}').\n"
f"Valid backends for this task type: {valid_backends}\n"
f"Example usage: --task {args.task} --backend {valid_backends[0]}"
)
raise ValueError("Backend validation failed. See log above for valid options.")

# Setup API URL and request function based on backend
request_func, api_url = backends_function_mapping[args.backend]
request_func, api_url = backends_function_mapping[task_type][args.backend]
api_url = f"{args.base_url}{api_url}"

if args.dataset == "vbench":
dataset = VBenchDataset(args, api_url, args.model)
elif args.dataset == "trace":
dataset = TraceDataset(args, api_url, args.model)
elif args.dataset == "random":
dataset = RandomDataset(args, api_url, args.model)
dataset = RandomDataset(args, api_url, args.model, args.enable_negative_prompt)
else:
raise ValueError(f"Unknown dataset: {args.dataset}")

Expand Down Expand Up @@ -847,6 +939,7 @@ async def limited_request_func(req, session, pbar):
print("{:<40} {:<15.4f}".format("Latency Mean (s):", metrics["latency_mean"]))
print("{:<40} {:<15.4f}".format("Latency Median (s):", metrics["latency_median"]))
print("{:<40} {:<15.4f}".format("Latency P99 (s):", metrics["latency_p99"]))
print("{:<40} {:<15.4f}".format("Latency P95 (s):", metrics["latency_p95"]))

if args.slo:
print(f"{'-' * 50}")
Expand Down Expand Up @@ -883,7 +976,7 @@ async def limited_request_func(req, session, pbar):
"--backend",
type=str,
default="vllm-omni",
choices=["vllm-omni", "openai"],
choices=["vllm-omni", "openai", "v1/videos"],
help="Backend to target the benchmark to.",
)
parser.add_argument(
Expand Down Expand Up @@ -972,6 +1065,25 @@ async def limited_request_func(req, session, pbar):
help="SLO target multiplier: slo_ms = estimated_exec_time_ms * slo_scale (default: 3).",
)
parser.add_argument("--disable-tqdm", action="store_true", help="Disable progress bar.")
parser.add_argument(
"--enable-negative-prompt",
action="store_true",
default=False,
help="Generate negative prompts when using the random dataset.",
)
parser.add_argument(
"--random-request-config",
type=str,
default=None,
help=(
"JSON string defining random request profiles. "
"Each profile may contain: width, height, num_inference_steps, etc. "
"The 'weight' field controls sampling probability (relative weight). "
"Example: "
'[{"width":512,"height":512,"num_inference_steps":20,"weight":0.15},'
'{"width":768,"height":768,"num_inference_steps":20,"weight":0.85}]'
),
)

args = parser.parse_args()

Expand Down
Loading
Loading