diff --git a/.buildkite/test-nightly-diffusion.yml b/.buildkite/test-nightly-diffusion.yml index 04b99c0a837..a520ca4356d 100644 --- a/.buildkite/test-nightly-diffusion.yml +++ b/.buildkite/test-nightly-diffusion.yml @@ -325,10 +325,23 @@ steps: if: *nightly_or_pr_label commands: - export DIFFUSION_BENCHMARK_DIR=tests/dfx/perf/results + - export DIFFUSION_ATTENTION_BACKEND=FLASH_ATTN - export CACHE_DIT_VERSION=1.3.0 - - pytest -s -v tests/dfx/perf/scripts/run_diffusion_benchmark.py --config-file tests/dfx/perf/tests/test_qwen_image_vllm_omni.json - - buildkite-agent artifact upload "tests/dfx/perf/results/benchmark_results_*.json" - - buildkite-agent artifact upload "tests/dfx/perf/results/logs/*.log" + # [HACK]: run upload in the same command block as pytest. + # Because `exit` aborts the entire commands list. + - | + set +e + pytest -s -v tests/dfx/perf/scripts/run_diffusion_benchmark.py --config-file tests/dfx/perf/tests/test_qwen_image_vllm_omni.json + EXIT1=$$? + pytest -s -v tests/dfx/perf/scripts/run_diffusion_benchmark.py --config-file tests/dfx/perf/tests/test_qwen_image_edit_vllm_omni.json + EXIT2=$$? + pytest -s -v tests/dfx/perf/scripts/run_diffusion_benchmark.py --config-file tests/dfx/perf/tests/test_qwen_image_edit_2509_vllm_omni.json + EXIT3=$$? 
+ if [ $$EXIT1 -eq 0 ] || [ $$EXIT2 -eq 0 ] || [ $$EXIT3 -eq 0 ]; then + buildkite-agent artifact upload "tests/dfx/perf/results/diffusion_result_*.json" + buildkite-agent artifact upload "tests/dfx/perf/results/logs/*.log" + fi + exit $$((EXIT1 | EXIT2 | EXIT3)) agents: queue: "mithril-h100-pool" plugins: diff --git a/benchmarks/diffusion/diffusion_benchmark_serving.py b/benchmarks/diffusion/diffusion_benchmark_serving.py index aad955b0d1d..32ec48a698f 100644 --- a/benchmarks/diffusion/diffusion_benchmark_serving.py +++ b/benchmarks/diffusion/diffusion_benchmark_serving.py @@ -558,6 +558,7 @@ def __init__(self, args, api_url: str, model: str, enable_negative_prompt: bool super().__init__(args, api_url, model) self.num_prompts = args.num_prompts self.enable_negative_prompt = enable_negative_prompt + self.num_input_images = max(1, args.num_input_images) self.random_request_config = getattr(args, "random_request_config", None) if self.random_request_config: self.random_request_config = json.loads(self.random_request_config) @@ -580,11 +581,7 @@ def __init__(self, args, api_url: str, model: str, enable_negative_prompt: bool # Random image generate if self.args.task in ["i2v", "ti2v", "ti2i", "i2i"]: - img = Image.new("RGB", (512, 512), (255, 255, 255)) - - image_path = os.path.join(tempfile.gettempdir(), "diffusion_benchmark_random_image.png") - self._random_image_path = [image_path] - img.save(image_path) + self._random_image_path = self._generate_random_image_paths() else: self._random_image_path = None @@ -619,6 +616,18 @@ def __getitem__(self, idx: int) -> RequestFuncInput: def get_requests(self) -> list[RequestFuncInput]: return [self[i] for i in range(len(self))] + def _generate_random_image_paths(self) -> list[str]: + image_paths: list[str] = [] + for image_idx in range(self.num_input_images): + img = Image.new("RGB", (512, 512), (255, 255, 255)) + image_path = os.path.join( + tempfile.gettempdir(), + f"diffusion_benchmark_random_image_{image_idx}.png", + ) + 
img.save(image_path) + image_paths.append(image_path) + return image_paths + def _compute_expected_latency_ms_from_base(req: RequestFuncInput, args, base_time_ms: float | None) -> float | None: """Compute expected execution time (ms) based on a base per-step-per-frame unit time. @@ -1115,6 +1124,15 @@ async def limited_request_func(req, session, pbar): '{"width":768,"height":768,"num_inference_steps":20,"weight":0.85}]' ), ) + parser.add_argument( + "--num-input-images", + type=int, + default=1, + help=( + "Number of synthetic input images to attach for image-conditioned tasks " + "(i2v, ti2v, ti2i, i2i) when using random dataset." + ), + ) args = parser.parse_args() diff --git a/tests/dfx/perf/scripts/run_diffusion_benchmark.py b/tests/dfx/perf/scripts/run_diffusion_benchmark.py index 1bd9bf1a143..123f21405e8 100644 --- a/tests/dfx/perf/scripts/run_diffusion_benchmark.py +++ b/tests/dfx/perf/scripts/run_diffusion_benchmark.py @@ -27,13 +27,14 @@ import time from datetime import datetime from pathlib import Path -from typing import Any +from typing import Any, cast import psutil import pytest os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" os.environ["VLLM_TEST_CLEAN_GPU_MEMORY"] = "0" +os.environ.setdefault("DIFFUSION_ATTENTION_BACKEND", "FLASH_ATTN") # --------------------------------------------------------------------------- # Paths @@ -50,6 +51,7 @@ # Populated lazily after CONFIG_FILE_PATH is resolved. _SESSION_TIMESTAMP = datetime.now().strftime("%Y%m%d-%H%M%S") _RESULT_LOCK = threading.Lock() +_BRANCHPOINT_COMMIT_SHA: str | None = None def _get_config_file_from_argv() -> str | None: @@ -110,7 +112,7 @@ def load_configs(config_path: str) -> list[dict[str, Any]]: BENCHMARK_CONFIGS = load_configs(CONFIG_FILE_PATH) _config_stem = Path(CONFIG_FILE_PATH).stem # e.g. 
"test_qwen_image_vllm_omni" -AGGREGATED_RESULT_FILE = BENCHMARK_RESULT_DIR / f"benchmark_results_{_config_stem}_{_SESSION_TIMESTAMP}.json" +AGGREGATED_RESULT_FILE = BENCHMARK_RESULT_DIR / f"diffusion_result_{_config_stem}_{_SESSION_TIMESTAMP}.json" def _append_to_aggregated_file(record: dict[str, Any]) -> None: @@ -232,13 +234,13 @@ class DiffusionServer: def __init__( self, - model: str, - serve_args: list[str], + server_cfg: dict[str, Any], *, port: int | None = None, ) -> None: - self.model = model - self.serve_args = serve_args + self.server_cfg: dict[str, Any] = server_cfg + self.model = server_cfg["model"] + self.serve_args = server_cfg["serve_args"] self.host = "127.0.0.1" self.port = port if port is not None else _get_open_port() self.proc: subprocess.Popen | None = None @@ -299,6 +301,95 @@ def _build_serve_args(serve_args_dict: dict[str, Any]) -> list[str]: return args +def _get_branchpoint_commit_sha() -> str: + """Return the branch-point commit SHA against main. + + Uses git command: ``git merge-base HEAD origin/main``. 
+ """ + global _BRANCHPOINT_COMMIT_SHA + if _BRANCHPOINT_COMMIT_SHA is not None: + return _BRANCHPOINT_COMMIT_SHA + + repo_root = Path(__file__).parent.parent.parent.parent + try: + sha = ( + subprocess.check_output( + ["git", "merge-base", "HEAD", "origin/main"], + cwd=str(repo_root), + stderr=subprocess.STDOUT, + text=True, + ) + .strip() + .splitlines()[0] + ) + _BRANCHPOINT_COMMIT_SHA = sha + except Exception as e: + print(f"Warning: failed to get branch-point commit SHA: {e}") + _BRANCHPOINT_COMMIT_SHA = "" + return _BRANCHPOINT_COMMIT_SHA + + +def _to_resolution_string(params: dict[str, Any]) -> str: + width = params.get("width", "unknown width") + height = params.get("height", "unknown height") + return f"{width}x{height}" + + +def _to_parallelism_string(framework: str, serve_args_dict: dict[str, Any]) -> str: + parts: list[str] = [] + if framework == "vllm-omni": + keys = [ + "num-gpus", + "usp", + "ulysses-degree", + "ring", + "ring-degree", + "cfg-parallel-size", + "vae-patch-parallel-size", + "vae-use-tiling", + "tensor-parallel-size", + ] + for key in keys: + if key in serve_args_dict: + parts.append(f"{key}={serve_args_dict[key]}") + return ",".join(parts) if parts else "none" + + +def _to_cache_string(framework: str, serve_args_dict: dict[str, Any]) -> str: + if framework == "vllm-omni": + if "cache-backend" in serve_args_dict: + return str(serve_args_dict["cache-backend"]) + return "disabled" + + +def _to_offload_string(framework: str, serve_args_dict: dict[str, Any]) -> str: + selected: list[str] = [] + if framework == "vllm-omni": + offload_keys = [ + "enable-cpu-offload", + "enable-layerwise-offload", + ] + for key in offload_keys: + if key in serve_args_dict: + selected.append(key) + return f"enabled({';'.join(selected)})" if selected else "disabled" + + +def _to_compile_value(framework: str, serve_args_dict: dict[str, Any]) -> str: + if framework == "vllm-omni": + if "enforce-eager" in serve_args_dict: + return "disabled" + return "enabled" + 
return "disabled" + + +def _to_quantization_value(framework: str, serve_args_dict: dict[str, Any]) -> str: + if framework == "vllm-omni": + quant = serve_args_dict.get("quantization") + return str(quant) if quant else "disabled" + return "disabled" + + def _unique_server_params(configs: list[dict[str, Any]]) -> list[dict[str, Any]]: """Return one server-config dict per unique test_name.""" seen: set[str] = set() @@ -310,12 +401,14 @@ def _unique_server_params(configs: list[dict[str, Any]]) -> list[dict[str, Any]] seen.add(test_name) if cfg.get("server_type", "vllm-omni") != "vllm-omni": raise ValueError(f"Unsupported server_type in config: {cfg.get('server_type')}") + serve_args_dict = cfg["server_params"].get("serve_args", {}) result.append( { "test_name": test_name, "server_type": "vllm-omni", "model": cfg["server_params"]["model"], - "serve_args": _build_serve_args(cfg["server_params"].get("serve_args", {})), + "serve_args_dict": serve_args_dict, + "serve_args": _build_serve_args(serve_args_dict), "benchmark_backend": "vllm-omni", "server_params": cfg["server_params"], } @@ -334,9 +427,7 @@ def _test_param_mapping(configs: list[dict[str, Any]]) -> dict[str, list[dict]]: def _make_server(server_cfg: dict[str, Any]) -> DiffusionServer: """Factory: return a vLLM-Omni diffusion server instance for the config.""" - model = server_cfg["model"] - serve_args = server_cfg["serve_args"] - return DiffusionServer(model=model, serve_args=serve_args) + return DiffusionServer(server_cfg=server_cfg) # --------------------------------------------------------------------------- @@ -364,7 +455,6 @@ def diffusion_server(request): print(f"\nStarting {server_type} server for test: {test_name}") with _make_server(server_cfg) as server: server.test_name = test_name - server.server_params = server_cfg["server_params"] print(f"{server_type} server started successfully") yield server print(f"{server_type} server stopping…") @@ -402,16 +492,18 @@ def run_benchmark( params: dict[str, Any], 
test_name: str, backend: str = "vllm-omni", - server_params: dict[str, Any] | None = None, + server_cfg: dict[str, Any] | None = None, + source_file: str = "", ) -> dict[str, Any]: """Run diffusion_benchmark_serving.py as a subprocess and return parsed metrics. The raw metrics are written to a temporary file by the subprocess. After the run completes the metrics are merged with full metadata (test_name, - backend, benchmark_params, timestamp) and appended to the session-wide - aggregated JSON file (AGGREGATED_RESULT_FILE). The temporary file is - removed afterwards. Subprocess stdout/stderr are tee'd to a .log file - under BENCHMARK_RESULT_DIR/logs/; its path is stored in the record. + backend, benchmark_params, timestamp, flat reporting fields) and appended + to the session-wide aggregated JSON file (AGGREGATED_RESULT_FILE). The + temporary file is removed afterwards. Subprocess stdout/stderr are tee'd + to a .log file under BENCHMARK_RESULT_DIR/logs/; its path is stored in + the record. 
""" timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") @@ -495,14 +587,55 @@ def run_benchmark( finally: tmp_result_file.unlink(missing_ok=True) + server_cfg = server_cfg or {} + serve_args_dict = server_cfg.get("serve_args_dict", {}) + if not isinstance(serve_args_dict, dict): + serve_args_dict = {} + + completed = metrics.get("completed_requests", metrics.get("completed", 0)) + failed = metrics.get("failed_requests", metrics.get("failed", 0)) + record: dict[str, Any] = { "test_name": test_name, "backend": backend, "timestamp": timestamp, - "server_params": server_params, + "server_params": server_cfg.get("server_params"), "benchmark_params": params, "result": metrics, "log_file": str(log_file), + "Model": model, + "Framework": backend, + "Hardware": "", + "Deployment": "", + "Task": params.get("task", "t2i"), + "Dataset": params.get("dataset", "random"), + "resolution": _to_resolution_string(params), + "Parallelism": _to_parallelism_string(backend, serve_args_dict), + "max_concurrency": params.get("max-concurrency", ""), + "Cache": _to_cache_string(backend, serve_args_dict), + "Quantization": _to_quantization_value(backend, serve_args_dict), + "offload": _to_offload_string(backend, serve_args_dict), + "compile": _to_compile_value(backend, serve_args_dict), + "Attn_backend": os.environ.get("DIFFUSION_ATTENTION_BACKEND", ""), + "num_inference_steps": params.get("num-inference-steps", ""), + "completed": completed, + "failed": failed, + "throughput_qps": metrics.get("throughput_qps"), + "latency_mean": metrics.get("latency_mean"), + "latency_median": metrics.get("latency_median"), + "latency_p99": metrics.get("latency_p99"), + "latency_p95": metrics.get("latency_p95"), + "latency_p50": metrics.get("latency_p50"), + "peak_memory_mb_max": metrics.get("peak_memory_mb_max"), + "peak_memory_mb_mean": metrics.get("peak_memory_mb_mean"), + "peak_memory_mb_median": metrics.get("peak_memory_mb_median"), + "stage_durations_mean": metrics.get("stage_durations_mean"), + 
"stage_durations_p50": metrics.get("stage_durations_p50"), + "stage_durations_p99": metrics.get("stage_durations_p99"), + "commit_sha": _get_branchpoint_commit_sha(), + "build_id": os.environ.get("BUILDKITE_BUILD_ID", ""), + "build_url": os.environ.get("BUILDKITE_BUILD_URL", ""), + "source_file": source_file, } _append_to_aggregated_file(record) print(f"\n Result appended to: {AGGREGATED_RESULT_FILE}") @@ -565,7 +698,8 @@ def test_diffusion_performance_benchmark(diffusion_server, benchmark_params): params=params, test_name=test_name, backend=backend, - server_params=diffusion_server.server_params, + server_cfg=getattr(diffusion_server, "server_cfg", {}), + source_file=cast(str, CONFIG_FILE_PATH), ) print(f"\n{'=' * 60}") diff --git a/tests/dfx/perf/tests/test_qwen_image_edit_2509_vllm_omni.json b/tests/dfx/perf/tests/test_qwen_image_edit_2509_vllm_omni.json new file mode 100644 index 00000000000..7d1fbbfa704 --- /dev/null +++ b/tests/dfx/perf/tests/test_qwen_image_edit_2509_vllm_omni.json @@ -0,0 +1,167 @@ +[ + { + "test_name": "test_qwen_image_edit_2509_single_device", + "description": "Single-device baseline (two input images)", + "server_type": "vllm-omni", + "server_params": { + "model": "Qwen/Qwen-Image-Edit-2509", + "serve_args": { + "enable-diffusion-pipeline-profiler": true + } + }, + "benchmark_params": [ + { + "name": "512x512_steps20_i2i_2img", + "dataset": "random", + "task": "i2i", + "width": 512, + "height": 512, + "num-inference-steps": 20, + "num-prompts": 10, + "max-concurrency": 1, + "num-input-images": 2, + "enable-negative-prompt": true, + "baseline": { + "throughput_qps": 0.05, + "latency_mean": 18, + "peak_memory_mb_max": 78500, + "peak_memory_mb_mean": 78500 + } + }, + { + "name": "1536x1536_steps35_i2i_2img", + "dataset": "random", + "task": "i2i", + "width": 1536, + "height": 1536, + "num-inference-steps": 35, + "num-prompts": 10, + "max-concurrency": 1, + "num-input-images": 2, + "enable-negative-prompt": true, + "baseline": { + 
"throughput_qps": 0.01, + "latency_mean": 70, + "peak_memory_mb_max": 81000, + "peak_memory_mb_mean": 81000 + } + } + ] + }, + { + "test_name": "test_qwen_image_edit_2509_ulysses2_cfg2_vae_patch4", + "description": "Ulysses SP=2 + CFG=2 + VAE patch parallel=4", + "server_type": "vllm-omni", + "server_params": { + "model": "Qwen/Qwen-Image-Edit-2509", + "serve_args": { + "ulysses-degree": 2, + "cfg-parallel-size": 2, + "vae-patch-parallel-size": 4, + "vae-use-tiling": true, + "enable-diffusion-pipeline-profiler": true + } + }, + "benchmark_params": [ + { + "name": "512x512_steps20_i2i_2img", + "dataset": "random", + "task": "i2i", + "width": 512, + "height": 512, + "num-inference-steps": 20, + "num-prompts": 10, + "max-concurrency": 1, + "num-input-images": 2, + "enable-negative-prompt": true, + "baseline": { + "throughput_qps": 0.1, + "latency_mean": 12, + "peak_memory_mb_max": 69000, + "peak_memory_mb_mean": 69000 + } + }, + { + "name": "1536x1536_steps35_i2i_2img", + "dataset": "random", + "task": "i2i", + "width": 1536, + "height": 1536, + "num-inference-steps": 35, + "num-prompts": 10, + "max-concurrency": 1, + "num-input-images": 2, + "enable-negative-prompt": true, + "baseline": { + "throughput_qps": 0.03, + "latency_mean": 28, + "peak_memory_mb_max": 69000, + "peak_memory_mb_mean": 69000 + } + } + ] + }, + { + "test_name": "test_qwen_image_edit_2509_ulysses2_cfg2_cache_dit", + "description": "Ulysses SP=2 + CFG=2 + CacheDiT", + "server_type": "vllm-omni", + "server_params": { + "model": "Qwen/Qwen-Image-Edit-2509", + "serve_args": { + "ulysses-degree": 2, + "cfg-parallel-size": 2, + "cache-backend": "cache_dit", + "cache-config": { + "Fn_compute_blocks": 1, + "Bn_compute_blocks": 0, + "max_warmup_steps": 4, + "residual_diff_threshold": 0.24, + "max_continuous_cached_steps": 3, + "enable_taylorseer": false, + "taylorseer_order": 1, + "scm_steps_mask_policy": null, + "scm_steps_policy": "dynamic" + }, + "enable-diffusion-pipeline-profiler": true + } + }, + 
"benchmark_params": [ + { + "name": "512x512_steps20_i2i_2img", + "dataset": "random", + "task": "i2i", + "width": 512, + "height": 512, + "num-inference-steps": 20, + "num-prompts": 10, + "max-concurrency": 1, + "num-input-images": 2, + "enable-negative-prompt": true, + "baseline": { + "throughput_qps": 0.10, + "latency_mean": 12, + "peak_memory_mb_max": 73000, + "peak_memory_mb_mean": 73000 + } + }, + { + "name": "1536x1536_steps35_i2i_2img", + "dataset": "random", + "task": "i2i", + "width": 1536, + "height": 1536, + "num-inference-steps": 35, + "num-prompts": 10, + "max-concurrency": 1, + "num-input-images": 2, + "enable-negative-prompt": true, + "baseline": { + "throughput_qps": 0.05, + "latency_mean": 20, + "peak_memory_mb_max": 81000, + "peak_memory_mb_mean": 81000 + } + } + ] + } +] diff --git a/tests/dfx/perf/tests/test_qwen_image_edit_vllm_omni.json b/tests/dfx/perf/tests/test_qwen_image_edit_vllm_omni.json new file mode 100644 index 00000000000..f68201db5f5 --- /dev/null +++ b/tests/dfx/perf/tests/test_qwen_image_edit_vllm_omni.json @@ -0,0 +1,161 @@ +[ + { + "test_name": "test_qwen_image_edit_single_device", + "description": "Single-device baseline", + "server_type": "vllm-omni", + "server_params": { + "model": "Qwen/Qwen-Image-Edit", + "serve_args": { + "enable-diffusion-pipeline-profiler": true + } + }, + "benchmark_params": [ + { + "name": "512x512_steps20_i2i", + "dataset": "random", + "task": "i2i", + "width": 512, + "height": 512, + "num-inference-steps": 20, + "num-prompts": 10, + "max-concurrency": 1, + "enable-negative-prompt": true, + "baseline": { + "throughput_qps": 0.05, + "latency_mean": 15.0, + "peak_memory_mb_max": 72500, + "peak_memory_mb_mean": 72500 + } + }, + { + "name": "1536x1536_steps35_i2i", + "dataset": "random", + "task": "i2i", + "width": 1536, + "height": 1536, + "num-inference-steps": 35, + "num-prompts": 10, + "max-concurrency": 1, + "enable-negative-prompt": true, + "baseline": { + "throughput_qps": 0.01, + "latency_mean": 
65.6, + "peak_memory_mb_max": 80777, + "peak_memory_mb_mean": 80777 + } + } + ] + }, + { + "test_name": "test_qwen_image_edit_ulysses2_cfg2_vae_patch4", + "description": "Ulysses SP=2 + CFG=2 + VAE patch parallel=4", + "server_type": "vllm-omni", + "server_params": { + "model": "Qwen/Qwen-Image-Edit", + "serve_args": { + "ulysses-degree": 2, + "cfg-parallel-size": 2, + "vae-patch-parallel-size": 4, + "vae-use-tiling": true, + "enable-diffusion-pipeline-profiler": true + } + }, + "benchmark_params": [ + { + "name": "512x512_steps20_i2i", + "dataset": "random", + "task": "i2i", + "width": 512, + "height": 512, + "num-inference-steps": 20, + "num-prompts": 10, + "max-concurrency": 1, + "enable-negative-prompt": true, + "baseline": { + "throughput_qps": 0.10, + "latency_mean": 7.2, + "peak_memory_mb_max": 68100, + "peak_memory_mb_mean": 68100 + } + }, + { + "name": "1536x1536_steps35_i2i", + "dataset": "random", + "task": "i2i", + "width": 1536, + "height": 1536, + "num-inference-steps": 35, + "num-prompts": 10, + "max-concurrency": 1, + "enable-negative-prompt": true, + "baseline": { + "throughput_qps": 0.03, + "latency_mean": 24.0, + "peak_memory_mb_max": 68100, + "peak_memory_mb_mean": 68100 + } + } + ] + }, + { + "test_name": "test_qwen_image_edit_ulysses2_cfg2_cache_dit", + "description": "Ulysses SP=2 + CFG=2 + CacheDiT", + "server_type": "vllm-omni", + "server_params": { + "model": "Qwen/Qwen-Image-Edit", + "serve_args": { + "ulysses-degree": 2, + "cfg-parallel-size": 2, + "cache-backend": "cache_dit", + "cache-config": { + "Fn_compute_blocks": 1, + "Bn_compute_blocks": 0, + "max_warmup_steps": 4, + "residual_diff_threshold": 0.24, + "max_continuous_cached_steps": 3, + "enable_taylorseer": false, + "taylorseer_order": 1, + "scm_steps_mask_policy": null, + "scm_steps_policy": "dynamic" + }, + "enable-diffusion-pipeline-profiler": true + } + }, + "benchmark_params": [ + { + "name": "512x512_steps20_i2i", + "dataset": "random", + "task": "i2i", + "width": 512, + 
"height": 512, + "num-inference-steps": 20, + "num-prompts": 10, + "max-concurrency": 1, + "enable-negative-prompt": true, + "baseline": { + "throughput_qps": 0.1, + "latency_mean": 6.5, + "peak_memory_mb_max": 72600, + "peak_memory_mb_mean": 72600 + } + }, + { + "name": "1536x1536_steps35_i2i", + "dataset": "random", + "task": "i2i", + "width": 1536, + "height": 1536, + "num-inference-steps": 35, + "num-prompts": 10, + "max-concurrency": 1, + "enable-negative-prompt": true, + "baseline": { + "throughput_qps": 0.05, + "latency_mean": 16.0, + "peak_memory_mb_max": 81000, + "peak_memory_mb_mean": 81000 + } + } + ] + } +] diff --git a/tests/dfx/perf/tests/test_qwen_image_vllm_omni.json b/tests/dfx/perf/tests/test_qwen_image_vllm_omni.json index 387e874ad5f..0667ef523c8 100644 --- a/tests/dfx/perf/tests/test_qwen_image_vllm_omni.json +++ b/tests/dfx/perf/tests/test_qwen_image_vllm_omni.json @@ -44,7 +44,6 @@ } ] }, - { "test_name": "test_qwen_image_ulysses2_cfg2_vae_patch4", "description": "Ulysses SP=2 + CFG-parallel=2 + VAE Patch Parallel=4", @@ -94,7 +93,6 @@ } ] }, - { "test_name": "test_qwen_image_ulysses2_cfg2_cache_dit", "description": "Ulysses SP=2 + CFG-parallel=2 + CacheDiT acceleration", diff --git a/tools/nightly/generate_nightly_perf_excel.py b/tools/nightly/generate_nightly_perf_excel.py index 817f37f664e..5f9eb428bca 100644 --- a/tools/nightly/generate_nightly_perf_excel.py +++ b/tools/nightly/generate_nightly_perf_excel.py @@ -23,6 +23,22 @@ GREY_BLOCK_FILL = PatternFill(start_color="D3D3D3", fill_type="solid") # Diffusion sheet columns (Qwen-Image diffusion benchmark). +# Per-stage latency metrics. Unpack from stage_durations_mean/p50/p99 dicts +DIFFUSION_STAGE_LATENCY_COLUMNS: tuple[str, ...] 
= ( + # "vae.encode_mean", + # "vae.encode_p50", + # "vae.encode_p99", + "vae.decode_mean", + "vae.decode_p50", + "vae.decode_p99", + "diffuse_mean", + "diffuse_p50", + "diffuse_p99", + "text_encoder.forward_mean", + "text_encoder.forward_p50", + "text_encoder.forward_p99", +) + DIFFUSION_BENCHMARK_COLUMNS: tuple[str, ...] = ( "duration", "completed_requests", @@ -36,7 +52,7 @@ "peak_memory_mb_mean", "peak_memory_mb_median", "slo_attainment_rate", -) +) + DIFFUSION_STAGE_LATENCY_COLUMNS DIFFUSION_NUMERIC_FORMAT_COLUMNS: tuple[str, ...] = DIFFUSION_BENCHMARK_COLUMNS @@ -63,7 +79,7 @@ "build_id", "build_url", "source_file", -) +) + DIFFUSION_STAGE_LATENCY_COLUMNS # Benchmark metric columns: grey the latest row's cell when value changed vs previous date. BENCHMARK_COLUMNS: tuple[str, ...] = ( @@ -106,7 +122,7 @@ _COLUMNS_FILENAME = "nightly_perf_summary_columns.txt" _RESULT_JSON_PREFIX = "result_test_" -_DIFFUSION_JSON_PREFIX = "diffusion_perf_" +_DIFFUSION_RESULT_PREFIX = "diffusion_result_" DEFAULT_INPUT_DIR = os.getenv("DEFAULT_INPUT_DIR") if os.getenv("DEFAULT_INPUT_DIR") else "tests" DEFAULT_OUTPUT_DIR = os.getenv("DEFAULT_OUTPUT_DIR") if os.getenv("DEFAULT_OUTPUT_DIR") else "tests" DEFAULT_DIFFUSION_INPUT_DIR = os.getenv("DIFFUSION_BENCHMARK_DIR") @@ -252,7 +268,7 @@ def parse_args() -> argparse.Namespace: type=str, default=None, help=( - "Directory containing diffusion_perf_*.json files; default is " + "Directory containing diffusion_result_*.json files; default is " "DIFFUSION_BENCHMARK_DIR, fallback to --input-dir." 
), ) @@ -286,7 +302,7 @@ def parse_args() -> argparse.Namespace: return parser.parse_args() -def _load_json_file(path: str) -> dict[str, Any] | None: +def _load_json_file(path: str) -> dict[str, Any] | list[Any] | None: """Safely load a single JSON file; return None and log a warning on failure.""" try: with open(path, encoding="utf-8") as f: @@ -295,8 +311,8 @@ def _load_json_file(path: str) -> dict[str, Any] | None: LOGGER.warning("failed to load json '%s': %s", path, exc) return None - if not isinstance(data, dict): - LOGGER.warning("json root in '%s' is not an object, skip", path) + if not isinstance(data, (dict, list)): + LOGGER.warning("json root in '%s' is not a dict or list, skip", path) return None return data @@ -396,27 +412,29 @@ def _iter_omni_json_records(input_dir: str) -> Iterable[dict[str, Any]]: yield record -def _parse_diffusion_from_filename(filename: str) -> dict[str, Any]: - """Parse diffusion test_name/date from filename: diffusion_perf__.json""" +def _parse_diffusion_result_from_filename(filename: str) -> dict[str, Any]: + """Parse the date (timestamp suffix) from filename: diffusion_result_{config_stem}_{timestamp}.json""" name, ext = os.path.splitext(filename) - if ext != ".json" or not name.startswith(_DIFFUSION_JSON_PREFIX): + if ext != ".json" or not name.startswith(_DIFFUSION_RESULT_PREFIX): return {} - core = name[len(_DIFFUSION_JSON_PREFIX) :] + core = name[len(_DIFFUSION_RESULT_PREFIX) :] parts = core.split("_") if len(parts) < 2: return {} timestamp = parts[-1] - test_name = "_".join(parts[:-1]) if parts[:-1] else "" parsed: dict[str, Any] = {} if len(timestamp) >= 15: parsed["date"] = timestamp - if test_name: - parsed["test_name"] = test_name return parsed -def _iter_diffusion_json_records(input_dir: str) -> Iterable[dict[str, Any]]: - """Iterate over diffusion_perf_*.json files and yield normalized diffusion records.""" +def _iter_diffusion_records(input_dir: str) -> Iterable[dict[str, Any]]: + """Iterate over diffusion_result_*.json files and yield normalized
records. + + Unlike omni format where each JSON file contains one test case, diffusion format + produces a single JSON file containing a list of all test case records. + Test params (feature toggles) are NOT embedded in the filename. + """ if not os.path.isdir(input_dir): LOGGER.warning("diffusion input dir '%s' does not exist or is not a directory", input_dir) return @@ -424,7 +442,7 @@ def _iter_diffusion_json_records(input_dir: str) -> Iterable[dict[str, Any]]: for entry in sorted(os.listdir(input_dir)): if not entry.endswith(".json"): continue - if not entry.startswith(_DIFFUSION_JSON_PREFIX): + if not entry.startswith(_DIFFUSION_RESULT_PREFIX): continue full_path = os.path.join(input_dir, entry) if not os.path.isfile(full_path): @@ -434,23 +452,63 @@ def _iter_diffusion_json_records(input_dir: str) -> Iterable[dict[str, Any]]: if data is None: continue - record: dict[str, Any] = dict(data) - filename_meta = _parse_diffusion_from_filename(os.path.basename(full_path)) - if "date" not in record or not record.get("date"): - record["date"] = filename_meta.get("date") or datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S") - if "test_name" not in record or not record.get("test_name"): - if "test_name" in filename_meta: - record["test_name"] = filename_meta["test_name"] - record["source_file"] = os.path.basename(full_path) - yield record + filename_meta = _parse_diffusion_result_from_filename(os.path.basename(full_path)) + if not isinstance(data, list): + LOGGER.warning("diffusion result file '%s' root is not a list, skip", full_path) + continue -def _collect_records(input_dir: str) -> list[dict[str, Any]]: + for record in data: + if not isinstance(record, dict): + continue + record = dict(record) + if "date" not in record or not record.get("date"): + record["date"] = filename_meta.get("date") or datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S") + record["source_file"] = os.path.basename(full_path) + yield record + + +def _collect_omni_records(input_dir: str) -> 
list[dict[str, Any]]: return list(_iter_omni_json_records(input_dir)) def _collect_diffusion_records(diffusion_input_dir: str) -> list[dict[str, Any]]: - return list(_iter_diffusion_json_records(diffusion_input_dir)) + """Collect diffusion records from diffusion_result_*.json files. + Their format is different from omni JSON files. + """ + return [_process_diffusion_record(r) for r in _iter_diffusion_records(diffusion_input_dir)] + + +def _flatten_stage_durations(record: dict[str, Any]) -> dict[str, Any]: + """Flatten stage_durations dict into individual columns matching DIFFUSION_STAGE_LATENCY_COLUMNS.""" + result = dict(record) + + for prefix in ("stage_durations_mean", "stage_durations_p50", "stage_durations_p99"): + durations = result.pop(prefix, None) + if not isinstance(durations, dict): + continue + + suffix = prefix.replace("stage_durations_", "") # "mean", "p50", "p99" + + for stage_key, value in durations.items(): # e.g., "SomePipeline.vae.decode": 100.0 + stage_key = stage_key.split(".", 1)[-1] # drop the leading component -> "vae.decode" + col_name = f"{stage_key}_{suffix}" + if col_name not in DIFFUSION_STAGE_LATENCY_COLUMNS: + print(f"skipping stage_key: {col_name}") + continue + result[col_name] = value + + return result + + +def _process_diffusion_record(record: dict[str, Any]) -> dict[str, Any]: + """Normalize a diffusion record by merging `result` and flattening stage metrics.""" + flat = record.copy() + flat.update(flat.pop("result", {})) + flat = _flatten_stage_durations(flat) + flat.pop("benchmark_params", None) + flat.pop("server_params", None) + return flat def _apply_build_metadata_to_latest_only( @@ -493,7 +551,7 @@ def _apply_build_metadata_to_latest_only( def _sort_records_for_summary(records: list[dict[str, Any]]) -> list[dict[str, Any]]: """Sort so that same test configuration is grouped, newest date first within each group.""" - by_date_desc = sorted(records, key=lambda r: (r.get("date") or ""), reverse=True) + by_date_desc = sorted(records, key=lambda r:
r.get("date") or "", reverse=True) return sorted( by_date_desc, key=_omni_group_key, @@ -501,7 +559,7 @@ def _sort_records_for_summary(records: list[dict[str, Any]]) -> list[dict[str, A def _sort_diffusion_records_for_summary(records: list[dict[str, Any]]) -> list[dict[str, Any]]: - by_date_desc = sorted(records, key=lambda r: (r.get("date") or ""), reverse=True) + by_date_desc = sorted(records, key=lambda r: r.get("date") or "", reverse=True) return sorted(by_date_desc, key=_diffusion_group_key) @@ -678,7 +736,7 @@ def generate_excel_report( script_dir = os.path.dirname(os.path.abspath(__file__)) omni_summary_columns = _ensure_omni_summary_columns(_load_summary_columns(script_dir)) - omni_records = _collect_records(input_dir) + omni_records = _collect_omni_records(input_dir) diffusion_records = _collect_diffusion_records(diffusion_input_dir) if not omni_records: