Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 0 additions & 252 deletions benchmarks/diffusion/backends.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import base64
import json
import mimetypes
import os
import time
Expand Down Expand Up @@ -335,263 +334,12 @@ async def async_request_v1_videos(
return output


async def async_request_image_sglang(
input: RequestFuncInput,
session: aiohttp.ClientSession,
pbar: tqdm | None = None,
) -> RequestFuncOutput:
output = RequestFuncOutput()
output.start_time = time.perf_counter()

# Check if we need to use multipart (for image edits with input images)
if input.image_paths and len(input.image_paths) > 0:
# Use multipart/form-data for image edits
data = aiohttp.FormData()
data.add_field("model", input.model)
data.add_field("prompt", input.prompt)
data.add_field("response_format", "b64_json")

if input.width and input.height:
data.add_field("size", f"{input.width}x{input.height}")

# Merge extra parameters
for key, value in input.extra_body.items():
data.add_field(key, str(value))

# Add image file(s)
for idx, img_path in enumerate(input.image_paths):
if os.path.exists(img_path):
data.add_field(
"image",
open(img_path, "rb"),
filename=os.path.basename(img_path),
content_type="application/octet-stream",
)
else:
output.error = f"Image file not found: {img_path}"
output.success = False
if pbar:
pbar.update(1)
return output

try:
async with session.post(input.api_url, data=data) as response:
if response.status == 200:
resp_json = await response.json()
output.response_body = resp_json
output.success = True
if "peak_memory_mb" in resp_json:
output.peak_memory_mb = resp_json["peak_memory_mb"]
else:
output.error = f"HTTP {response.status}: {await response.text()}"
output.success = False
except Exception as e:
output.error = str(e)
output.success = False
else:
# Use JSON for text-to-image generation
payload = {
"model": input.model,
"prompt": input.prompt,
"n": 1,
"response_format": "b64_json",
}

if input.width and input.height:
payload["size"] = f"{input.width}x{input.height}"

if input.num_inference_steps:
payload["num_inference_steps"] = input.num_inference_steps

payload.update(input.extra_body)

try:
async with session.post(input.api_url, json=payload) as response:
if response.status == 200:
resp_json = await response.json()
output.response_body = resp_json
output.success = True
if "peak_memory_mb" in resp_json:
output.peak_memory_mb = resp_json["peak_memory_mb"]
else:
output.error = f"HTTP {response.status}: {await response.text()}"
output.success = False
except Exception as e:
output.error = str(e)
output.success = False

output.latency = time.perf_counter() - output.start_time

# Check SLO if defined
if input.slo_ms is not None and output.success:
output.slo_achieved = (output.latency * 1000.0) <= input.slo_ms

if pbar:
pbar.update(1)
return output


async def async_request_video_sglang(
input: RequestFuncInput,
session: aiohttp.ClientSession,
pbar: tqdm | None = None,
) -> RequestFuncOutput:
output = RequestFuncOutput()
output.start_time = time.perf_counter()

# 1. Submit Job
job_id = None
# Check if we need to upload images (Multipart) or just send JSON
if input.image_paths and len(input.image_paths) > 0:
# Use multipart/form-data
data = aiohttp.FormData()
data.add_field("model", input.model)
data.add_field("prompt", input.prompt)

if input.width and input.height:
data.add_field("size", f"{input.width}x{input.height}")

# Add extra body fields to form data if possible, or assume simple key-values
# Note: Nested dicts in extra_body might need JSON serialization if API expects it stringified
if input.extra_body:
data.add_field("extra_body", json.dumps(input.extra_body))

# Explicitly add fps/num_frames if they are not in extra_body (bench_serving logic overrides)
if input.num_frames:
data.add_field("num_frames", str(input.num_frames))
if input.fps:
data.add_field("fps", str(input.fps))

# Add image file
# Currently only support single image upload as 'input_reference' per API spec
img_path = input.image_paths[0]
if os.path.exists(img_path):
data.add_field(
"input_reference",
open(img_path, "rb"),
filename=os.path.basename(img_path),
content_type="application/octet-stream",
)
else:
output.error = f"Image file not found: {img_path}"
output.success = False
if pbar:
pbar.update(1)
return output

try:
async with session.post(input.api_url, data=data) as response:
if response.status == 200:
resp_json = await response.json()
job_id = resp_json.get("id")
else:
output.error = f"Submit failed HTTP {response.status}: {await response.text()}"
output.success = False
if pbar:
pbar.update(1)
return output
except Exception as e:
output.error = f"Submit exception: {str(e)}"
output.success = False
if pbar:
pbar.update(1)
return output

else:
# Use JSON
payload: dict[str, Any] = {
"model": input.model,
"prompt": input.prompt,
}
if input.width and input.height:
payload["size"] = f"{input.width}x{input.height}"
if input.num_frames:
payload["num_frames"] = input.num_frames
if input.fps:
payload["fps"] = input.fps
if input.num_inference_steps:
payload["num_inference_steps"] = input.num_inference_steps

payload.update(input.extra_body)

try:
async with session.post(input.api_url, json=payload) as response:
if response.status == 200:
resp_json = await response.json()
job_id = resp_json.get("id")
else:
output.error = f"Submit failed HTTP {response.status}: {await response.text()}"
output.success = False
if pbar:
pbar.update(1)
return output
except Exception as e:
output.error = f"Submit exception: {str(e)}"
output.success = False
if pbar:
pbar.update(1)
return output

if not job_id:
output.error = "No job_id returned"
output.success = False
if pbar:
pbar.update(1)
return output

# 2. Poll for completion
# Assuming the API returns a 'status' field.
# We construct the check URL. Assuming api_url is like .../v1/videos
# The check url should be .../v1/videos/{id}
check_url = f"{input.api_url}/{job_id}"

while True:
try:
async with session.get(check_url) as response:
if response.status == 200:
status_data = await response.json()
status = status_data.get("status")
if status == "completed":
output.success = True
output.response_body = status_data
if "peak_memory_mb" in status_data:
output.peak_memory_mb = status_data["peak_memory_mb"]
break
elif status == "failed":
output.success = False
output.error = f"Job failed: {status_data.get('error')}"
break
else:
# queued or processing
await asyncio.sleep(1.0)
else:
output.success = False
output.error = f"Poll failed HTTP {response.status}: {await response.text()}"
break
except Exception as e:
output.success = False
output.error = f"Poll exception: {str(e)}"
break

output.latency = time.perf_counter() - output.start_time

# Check SLO if defined
if input.slo_ms is not None and output.success:
output.slo_achieved = (output.latency * 1000.0) <= input.slo_ms

if pbar:
pbar.update(1)
return output


backends_function_mapping = {
"2i": {
"vllm-omni": (async_request_chat_completions, "/v1/chat/completions"),
"openai": (async_request_openai_images, "/v1/images/generations"),
"sglang": (async_request_image_sglang, "/v1/images/generations"),
},
"2v": {
"v1/videos": (async_request_v1_videos, "/v1/videos"),
"sglang": (async_request_video_sglang, "/v1/videos"),
},
}
4 changes: 2 additions & 2 deletions benchmarks/diffusion/diffusion_benchmark_serving.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# adapted from sglang and fastvideo
# adapted from fastvideo
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

Expand Down Expand Up @@ -1007,7 +1007,7 @@ async def limited_request_func(req, session, pbar):
"--backend",
type=str,
default="vllm-omni",
choices=["vllm-omni", "openai", "sglang", "v1/videos"],
choices=["vllm-omni", "openai", "v1/videos"],
help="Backend to target the benchmark to.",
)
parser.add_argument(
Expand Down
Loading
Loading