From 26a4e8d8d6a6f2724ab23b3a4b58460640227c25 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Fri, 6 Mar 2026 10:14:45 +0000 Subject: [PATCH 01/15] add time cost log for different stages Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/diffusion_engine.py | 20 ++++++-- .../models/wan2_2/pipeline_wan2_2.py | 36 +++++++++++++-- .../models/wan2_2/pipeline_wan2_2_i2v.py | 46 ++++++++++++++++--- vllm_omni/diffusion/scheduler.py | 14 +++++- .../diffusion/worker/diffusion_worker.py | 7 +++ vllm_omni/entrypoints/async_omni.py | 19 ++++++++ vllm_omni/entrypoints/omni_diffusion.py | 10 +++- vllm_omni/entrypoints/omni_stage.py | 22 +++++++-- vllm_omni/entrypoints/openai/serving_video.py | 3 ++ 9 files changed, 155 insertions(+), 22 deletions(-) diff --git a/vllm_omni/diffusion/diffusion_engine.py b/vllm_omni/diffusion/diffusion_engine.py index 066ba2bb8bd..78c80ad9796 100644 --- a/vllm_omni/diffusion/diffusion_engine.py +++ b/vllm_omni/diffusion/diffusion_engine.py @@ -67,14 +67,19 @@ def __init__(self, od_config: OmniDiffusionConfig): raise e def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: + _step_t0 = time.perf_counter() + # Apply pre-processing if available + preprocess_time = 0.0 if self.pre_process_func is not None: - preprocess_start_time = time.time() + preprocess_start_time = time.perf_counter() request = self.pre_process_func(request) - preprocess_time = time.time() - preprocess_start_time + preprocess_time = time.perf_counter() - preprocess_start_time logger.info(f"Pre-processing completed in {preprocess_time:.4f} seconds") + _generate_t0 = time.perf_counter() output = self.add_req_and_wait_for_response(request) + _generate_ms = (time.perf_counter() - _generate_t0) * 1000 if output.error: raise Exception(f"{output.error}") logger.info("Generation completed successfully.") @@ -92,15 +97,22 @@ def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: for i, prompt in 
enumerate(request.prompts) ] - postprocess_start_time = time.time() + postprocess_start_time = time.perf_counter() outputs = self.post_process_func(output.output) if self.post_process_func is not None else output.output audio_payload = None if isinstance(outputs, dict): audio_payload = outputs.get("audio") outputs = outputs.get("video", outputs) - postprocess_time = time.time() - postprocess_start_time + postprocess_time = time.perf_counter() - postprocess_start_time logger.info(f"Post-processing completed in {postprocess_time:.4f} seconds") + _step_total_ms = (time.perf_counter() - _step_t0) * 1000 + logger.info( + "DiffusionEngine.step breakdown: preprocess=%.2f ms, " + "add_req_and_wait=%.2f ms, postprocess=%.2f ms, total=%.2f ms", + preprocess_time * 1000, _generate_ms, postprocess_time * 1000, _step_total_ms, + ) + # Convert to OmniRequestOutput format # Ensure outputs is a list if not isinstance(outputs, list): diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py index 1de801c10de..9dd01136070 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py @@ -6,6 +6,7 @@ import json import logging import os +import time from collections.abc import Iterable from typing import Any, cast @@ -21,7 +22,7 @@ from vllm_omni.diffusion.distributed.cfg_parallel import CFGParallelMixin from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader -from vllm_omni.diffusion.models.progress_bar import ProgressBarMixin +from vllm_omni.diffusion.models.progress_bar import ProgressBarMixin, _is_rank_zero from vllm_omni.diffusion.models.schedulers import FlowUniPCMultistepScheduler from vllm_omni.diffusion.models.wan2_2.wan2_2_transformer import WanTransformer3DModel from vllm_omni.diffusion.request import OmniDiffusionRequest @@ -424,7 +425,10 @@ def forward( 
if generator is None and req.sampling_params.seed is not None: generator = torch.Generator(device=device).manual_seed(req.sampling_params.seed) - # Encode prompts + # Sync GPU before timing to ensure accurate measurements + current_omni_platform.synchronize() + _t_pipeline_start = time.perf_counter() + _t_text_enc_start = _t_pipeline_start if prompt_embeds is None: prompt_embeds, negative_prompt_embeds = self.encode_prompt( prompt=prompt, @@ -443,6 +447,8 @@ def forward( raise ValueError( "negative_prompt_embeds must be provided when prompt_embeds are given and guidance > 1." ) + current_omni_platform.synchronize() + _t_text_enc_ms = (time.perf_counter() - _t_text_enc_start) * 1000 # Timesteps self.scheduler.set_timesteps(num_steps, device=device) @@ -452,7 +458,7 @@ def forward( if boundary_ratio is not None: boundary_timestep = boundary_ratio * self.scheduler.config.num_train_timesteps - # Handle I2V mode when expand_timesteps=True and image is provided + _t_latent_prep_start = time.perf_counter() multi_modal_data = req.prompts[0].get("multi_modal_data", {}) if not isinstance(req.prompts[0], str) else None raw_image = multi_modal_data.get("image", None) if multi_modal_data is not None else None if isinstance(raw_image, list): @@ -543,11 +549,13 @@ def forward( generator=generator, latents=req.sampling_params.latents, ) + current_omni_platform.synchronize() + _t_latent_prep_ms = (time.perf_counter() - _t_latent_prep_start) * 1000 if attention_kwargs is None: attention_kwargs = {} - # Denoising + _t_denoise_start = time.perf_counter() with self.progress_bar(total=len(timesteps)) as pbar: for t in timesteps: self._current_timestep = t @@ -638,12 +646,14 @@ def forward( if current_omni_platform.is_available(): current_omni_platform.empty_cache() self._current_timestep = None + current_omni_platform.synchronize() + _t_denoise_ms = (time.perf_counter() - _t_denoise_start) * 1000 # For I2V mode: blend final latents with condition if self.expand_timesteps and 
latent_condition is not None: latents = (1 - first_frame_mask) * latent_condition + first_frame_mask * latents - # Decode + _t_decode_start = time.perf_counter() if output_type == "latent": output = latents else: @@ -658,6 +668,22 @@ def forward( ) latents = latents / latents_std + latents_mean output = self.vae.decode(latents, return_dict=False)[0] + current_omni_platform.synchronize() + _t_decode_ms = (time.perf_counter() - _t_decode_start) * 1000 + _t_pipeline_wall_ms = (time.perf_counter() - _t_pipeline_start) * 1000 + _t_stages_sum = _t_text_enc_ms + _t_latent_prep_ms + _t_denoise_ms + _t_decode_ms + + if _is_rank_zero(): + logger.info( + "Pipeline stage timing summary: " + "TextEncoding=%.2f ms, LatentPreparation=%.2f ms, " + "Denoising=%.2f ms (%d steps), Decoding=%.2f ms, " + "StagesSum=%.2f ms, PipelineWall=%.2f ms, Unaccounted=%.2f ms", + _t_text_enc_ms, _t_latent_prep_ms, + _t_denoise_ms, len(timesteps), _t_decode_ms, + _t_stages_sum, _t_pipeline_wall_ms, + _t_pipeline_wall_ms - _t_stages_sum, + ) return DiffusionOutput(output=output) diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py index b673cd0fae4..98ebff63e15 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py @@ -5,6 +5,7 @@ import logging import os +import time from collections.abc import Iterable from typing import Any, cast @@ -22,7 +23,7 @@ from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader from vllm_omni.diffusion.models.interface import SupportImageInput -from vllm_omni.diffusion.models.progress_bar import ProgressBarMixin +from vllm_omni.diffusion.models.progress_bar import ProgressBarMixin, _is_rank_zero from vllm_omni.diffusion.models.schedulers import FlowUniPCMultistepScheduler from 
vllm_omni.diffusion.models.wan2_2.pipeline_wan2_2 import ( create_transformer_from_config, @@ -336,6 +337,9 @@ def forward( num_frames = req.sampling_params.num_frames or frame_num num_steps = req.sampling_params.num_inference_steps or num_inference_steps + # print("D--: num_frames", num_frames) + # print("D--: height x width =", height, "x", width) + # Respect per-request guidance_scale when explicitly provided. if req.sampling_params.guidance_scale_provided: guidance_scale = req.sampling_params.guidance_scale @@ -388,7 +392,10 @@ def forward( if generator is None and req.sampling_params.seed is not None: generator = torch.Generator(device=device).manual_seed(req.sampling_params.seed) - # Encode prompts + # Sync GPU before timing to ensure accurate measurements + current_omni_platform.synchronize() + _t_pipeline_start = time.perf_counter() + _t_text_enc_start = _t_pipeline_start if prompt_embeds is None: prompt_embeds, negative_prompt_embeds = self.encode_prompt( prompt=prompt, @@ -403,10 +410,12 @@ def forward( prompt_embeds = prompt_embeds.to(device=device, dtype=dtype) if negative_prompt_embeds is not None: negative_prompt_embeds = negative_prompt_embeds.to(device=device, dtype=dtype) + current_omni_platform.synchronize() + _t_text_enc_ms = (time.perf_counter() - _t_text_enc_start) * 1000 batch_size = prompt_embeds.shape[0] - # Encode image embeddings (for Wan2.1-style with CLIP) + _t_img_enc_start = time.perf_counter() if self.has_image_encoder and self.transformer.config.image_dim is not None: if image_embeds is None: if last_image is None: @@ -417,6 +426,8 @@ def forward( image_embeds = image_embeds.to(dtype) else: image_embeds = None + current_omni_platform.synchronize() + _t_img_enc_ms = (time.perf_counter() - _t_img_enc_start) * 1000 # Timesteps self.scheduler.set_timesteps(num_steps, device=device) @@ -430,7 +441,7 @@ def forward( # Prepare latents (use out_channels=16 for VAE latent, not in_channels=36) num_channels_latents = 
self.transformer.config.out_channels - # Preprocess image for VAE + _t_latent_prep_start = time.perf_counter() from diffusers.video_processor import VideoProcessor video_processor = VideoProcessor(vae_scale_factor=self.vae_scale_factor_spatial) @@ -464,11 +475,13 @@ def forward( latents=req.sampling_params.latents, last_image=last_image_tensor, ) + current_omni_platform.synchronize() + _t_latent_prep_ms = (time.perf_counter() - _t_latent_prep_start) * 1000 if attention_kwargs is None: attention_kwargs = {} - # Denoising loop + _t_denoise_start = time.perf_counter() with self.progress_bar(total=len(timesteps)) as pbar: for t in timesteps: self._current_timestep = t @@ -527,6 +540,8 @@ def forward( cfg_normalize=False, ) + # print("D--: latent shape", latents.shape) + # Compute the previous noisy sample x_t -> x_t-1 with automatic CFG sync latents = self.scheduler_step_maybe_with_cfg(noise_pred, t, latents, do_true_cfg) @@ -537,12 +552,14 @@ def forward( if current_omni_platform.is_available(): current_omni_platform.empty_cache() self._current_timestep = None + current_omni_platform.synchronize() + _t_denoise_ms = (time.perf_counter() - _t_denoise_start) * 1000 # For expand_timesteps mode, blend final latents with condition if self.expand_timesteps: latents = (1 - first_frame_mask) * condition + first_frame_mask * latents - # Decode + _t_decode_start = time.perf_counter() if output_type == "latent": output = latents else: @@ -557,6 +574,23 @@ def forward( ) latents = latents / latents_std + latents_mean output = self.vae.decode(latents, return_dict=False)[0] + current_omni_platform.synchronize() + _t_decode_ms = (time.perf_counter() - _t_decode_start) * 1000 + _t_pipeline_wall_ms = (time.perf_counter() - _t_pipeline_start) * 1000 + _t_stages_sum = _t_text_enc_ms + _t_img_enc_ms + _t_latent_prep_ms + _t_denoise_ms + _t_decode_ms + + if _is_rank_zero(): + logger.info( + "Pipeline stage timing summary: " + "TextEncoding=%.2f ms, ImageEncoding=%.2f ms, " + 
"LatentPreparation=%.2f ms, Denoising=%.2f ms (%d steps), " + "Decoding=%.2f ms, StagesSum=%.2f ms, PipelineWall=%.2f ms, " + "Unaccounted=%.2f ms", + _t_text_enc_ms, _t_img_enc_ms, + _t_latent_prep_ms, _t_denoise_ms, len(timesteps), + _t_decode_ms, _t_stages_sum, _t_pipeline_wall_ms, + _t_pipeline_wall_ms - _t_stages_sum, + ) return DiffusionOutput(output=output) diff --git a/vllm_omni/diffusion/scheduler.py b/vllm_omni/diffusion/scheduler.py index c30d907aa3b..97693782e39 100644 --- a/vllm_omni/diffusion/scheduler.py +++ b/vllm_omni/diffusion/scheduler.py @@ -2,6 +2,7 @@ # SPDX-FileCopyrightText: Copyright contributors to the vLLM project import threading +import time as _time import zmq from vllm.distributed.device_communicators.shm_broadcast import MessageQueue @@ -58,13 +59,24 @@ def add_req(self, request: OmniDiffusionRequest) -> DiffusionOutput: } # Broadcast RPC request to all workers + _t_broadcast = _time.perf_counter() self.mq.enqueue(rpc_request) - # Wait for result from Rank 0 (or whoever sends it) + _t_broadcast_ms = (_time.perf_counter() - _t_broadcast) * 1000 + logger.info("Hop1 scheduler→workers: mq.enqueue (broadcast request) took %.2f ms", _t_broadcast_ms) + # Wait for result from Rank 0 (or whoever sends it) if self.result_mq is None: raise RuntimeError("Result queue not initialized") + _t_dequeue = _time.perf_counter() output = self.result_mq.dequeue() + _t_dequeue_ms = (_time.perf_counter() - _t_dequeue) * 1000 + logger.info( + "Hop1 scheduler←worker: result_mq.dequeue took %.2f ms " + "(includes waiting for generation + enqueue serialization)", + _t_dequeue_ms, + ) + # {"status": "error", "error": str(e)} if isinstance(output, dict) and output.get("status") == "error": raise RuntimeError("worker error") diff --git a/vllm_omni/diffusion/worker/diffusion_worker.py b/vllm_omni/diffusion/worker/diffusion_worker.py index d4389f4d2bf..8118890bd7f 100644 --- a/vllm_omni/diffusion/worker/diffusion_worker.py +++ 
b/vllm_omni/diffusion/worker/diffusion_worker.py @@ -373,7 +373,14 @@ def _create_worker( def return_result(self, output: DiffusionOutput): """Reply to client, only on rank 0.""" if self.result_mq is not None: + import time as _time + _t0 = _time.perf_counter() self.result_mq.enqueue(output) + _t_ms = (_time.perf_counter() - _t0) * 1000 + logger.info( + "Hop1 worker→scheduler: result_mq.enqueue took %.2f ms (rank %s)", + _t_ms, self.gpu_id, + ) def recv_message(self): """Receive messages from broadcast queue.""" diff --git a/vllm_omni/entrypoints/async_omni.py b/vllm_omni/entrypoints/async_omni.py index 6939987b04e..a86791fc8de 100644 --- a/vllm_omni/entrypoints/async_omni.py +++ b/vllm_omni/entrypoints/async_omni.py @@ -522,7 +522,26 @@ def _process_single_result( ) raise RuntimeError(result) + _t_load = time.perf_counter() engine_outputs = _load(result, obj_key="engine_outputs", shm_key="engine_outputs_shm") + _t_load_ms = (time.perf_counter() - _t_load) * 1000 + + _hop3 = result.get("_hop3_timing") + if _hop3: + logger.info( + "Hop3 stageWorker→orchestrator: " + "generate=%.2f ms, serialize(shm=%s)=%.2f ms, " + "orchestrator_deserialize=%.2f ms", + _hop3.get("generate_ms", 0), + _hop3.get("use_shm", "?"), + _hop3.get("serialize_ms", 0), + _t_load_ms, + ) + elif _t_load_ms > 10: + logger.info( + "Hop3 orchestrator←stageWorker: deserialize took %.2f ms (req %s, stage %s)", + _t_load_ms, req_id, stage_id, + ) if isinstance(engine_outputs, list): engine_outputs = engine_outputs[0] diff --git a/vllm_omni/entrypoints/omni_diffusion.py b/vllm_omni/entrypoints/omni_diffusion.py index 9937726e51f..59ddf2eeb17 100644 --- a/vllm_omni/entrypoints/omni_diffusion.py +++ b/vllm_omni/entrypoints/omni_diffusion.py @@ -1,9 +1,11 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import time import uuid from collections.abc import Sequence +from vllm.logger import init_logger from vllm.transformers_utils.config import 
get_hf_file_to_dict from vllm_omni.diffusion.data import OmniDiffusionConfig, TransformerConfig @@ -12,6 +14,8 @@ from vllm_omni.inputs.data import OmniDiffusionSamplingParams, OmniPromptType from vllm_omni.outputs import OmniRequestOutput +logger = init_logger(__name__) + class OmniDiffusion: """ @@ -108,6 +112,7 @@ def generate( sampling_params: OmniDiffusionSamplingParams, request_ids: list[str] = [], ) -> list[OmniRequestOutput]: + _t0 = time.perf_counter() if isinstance(prompts, (str, dict)): prompts = [prompts] else: @@ -118,7 +123,10 @@ def generate( request_ids.extend(f"{i + len(request_ids)}_{uuid.uuid4()}" for i in range(len(prompts) - len(request_ids))) request = OmniDiffusionRequest(prompts, sampling_params, request_ids) - return self._run_engine(request) + result = self._run_engine(request) + _t_ms = (time.perf_counter() - _t0) * 1000 + logger.info("OmniDiffusion.generate total: %.2f ms", _t_ms) + return result def _run_engine(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: return self.engine.step(request) diff --git a/vllm_omni/entrypoints/omni_stage.py b/vllm_omni/entrypoints/omni_stage.py index 098cfa15d88..de99bff714d 100644 --- a/vllm_omni/entrypoints/omni_stage.py +++ b/vllm_omni/entrypoints/omni_stage.py @@ -982,6 +982,7 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: try: _batch_seq += 1 gen_outputs: list[OmniRequestOutput | RequestOutput] = [] + _pre_gen_ms = (_time.time() - _recv_dequeue_ts) * 1000.0 _gen_t0 = _time.time() if stage_type == "diffusion": stage_engine = cast(OmniDiffusion, stage_engine) @@ -1007,7 +1008,6 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: gen_outputs.extend(results) _gen_t1 = _time.time() _gen_ms = (_gen_t1 - _gen_t0) * 1000.0 - logger.debug(f"Generate done: batch={len(batch_tasks)}, req_ids={batch_request_ids}, gen_ms={_gen_ms:.1f}") # Group outputs per request id with fallback req_to_outputs: dict[Any, list[Any]] = {rid: [] for rid in 
batch_request_ids} @@ -1029,6 +1029,7 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: _agg_total_gen_time_ms += _gen_ms # Emit per-request results + _post_gen_t0 = _time.time() for i, rid in enumerate(batch_request_ids): r_outputs = req_to_outputs.get(rid, []) _metrics = make_request_stats( @@ -1045,8 +1046,21 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: _metrics.stage_stats = make_stage_stats(_agg_total_tokens, _agg_total_gen_time_ms) else: _metrics.stage_stats = None + _t_serialize = _time.time() try: use_shm, payload = maybe_dump_to_shm(r_outputs, shm_threshold_bytes) + except Exception: + use_shm, payload = False, r_outputs + _t_serialize_ms = (_time.time() - _t_serialize) * 1000.0 + + _hop3_timing = { + "generate_ms": _gen_ms, + "serialize_ms": _t_serialize_ms, + "use_shm": use_shm, + } + + _t_enqueue = _time.time() + try: if use_shm: out_q.put( { @@ -1054,6 +1068,7 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: "stage_id": stage_id, "engine_outputs_shm": payload, "metrics": _metrics, + "_hop3_timing": _hop3_timing, } ) else: @@ -1063,6 +1078,7 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: "stage_id": stage_id, "engine_outputs": payload, "metrics": _metrics, + "_hop3_timing": _hop3_timing, } ) except Exception: @@ -1074,10 +1090,6 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: "metrics": _metrics, } ) - logger.debug( - "Enqueued result for request %s to downstream", - rid, - ) except Exception as e: logger.exception("Failed on batch %s: %s", batch_request_ids, e) _tb = traceback.format_exc() diff --git a/vllm_omni/entrypoints/openai/serving_video.py b/vllm_omni/entrypoints/openai/serving_video.py index f73df5ea498..b663eb52f03 100644 --- a/vllm_omni/entrypoints/openai/serving_video.py +++ b/vllm_omni/entrypoints/openai/serving_video.py @@ -141,6 +141,7 @@ async def generate_videos( ) result = await self._run_generation(prompt, 
gen_params, request_id, raw_request) + _t_encode_start = time.perf_counter() videos = self._extract_video_outputs(result) audios = self._extract_audio_outputs(result, expected_count=len(videos)) output_fps = fps or 24 @@ -161,6 +162,8 @@ async def generate_videos( ) for idx, video in enumerate(videos) ] + _t_encode_ms = (time.perf_counter() - _t_encode_start) * 1000 + logger.info("Video response encoding (MP4+base64): %.2f ms", _t_encode_ms) return VideoGenerationResponse(created=int(time.time()), data=video_data) def _resolve_model_name(self, raw_request: Request | None) -> str | None: From b3b70a8be4cd02a27706fc4da4845924a439c38d Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Fri, 6 Mar 2026 10:55:23 +0000 Subject: [PATCH 02/15] reduce hop3 overhead Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/diffusion_engine.py | 5 +- .../models/wan2_2/pipeline_wan2_2.py | 10 +- .../models/wan2_2/pipeline_wan2_2_i2v.py | 11 +- .../diffusion/worker/diffusion_worker.py | 4 +- vllm_omni/entrypoints/async_omni.py | 105 +++++++++++++++++- vllm_omni/entrypoints/omni.py | 80 +++++++++++++ 6 files changed, 204 insertions(+), 11 deletions(-) diff --git a/vllm_omni/diffusion/diffusion_engine.py b/vllm_omni/diffusion/diffusion_engine.py index 78c80ad9796..327f79e91e4 100644 --- a/vllm_omni/diffusion/diffusion_engine.py +++ b/vllm_omni/diffusion/diffusion_engine.py @@ -110,7 +110,10 @@ def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: logger.info( "DiffusionEngine.step breakdown: preprocess=%.2f ms, " "add_req_and_wait=%.2f ms, postprocess=%.2f ms, total=%.2f ms", - preprocess_time * 1000, _generate_ms, postprocess_time * 1000, _step_total_ms, + preprocess_time * 1000, + _generate_ms, + postprocess_time * 1000, + _step_total_ms, ) # Convert to OmniRequestOutput format diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py index 9dd01136070..89af9cae09c 
100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py @@ -679,9 +679,13 @@ def forward( "TextEncoding=%.2f ms, LatentPreparation=%.2f ms, " "Denoising=%.2f ms (%d steps), Decoding=%.2f ms, " "StagesSum=%.2f ms, PipelineWall=%.2f ms, Unaccounted=%.2f ms", - _t_text_enc_ms, _t_latent_prep_ms, - _t_denoise_ms, len(timesteps), _t_decode_ms, - _t_stages_sum, _t_pipeline_wall_ms, + _t_text_enc_ms, + _t_latent_prep_ms, + _t_denoise_ms, + len(timesteps), + _t_decode_ms, + _t_stages_sum, + _t_pipeline_wall_ms, _t_pipeline_wall_ms - _t_stages_sum, ) diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py index 98ebff63e15..8f9a7010c03 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py @@ -586,9 +586,14 @@ def forward( "LatentPreparation=%.2f ms, Denoising=%.2f ms (%d steps), " "Decoding=%.2f ms, StagesSum=%.2f ms, PipelineWall=%.2f ms, " "Unaccounted=%.2f ms", - _t_text_enc_ms, _t_img_enc_ms, - _t_latent_prep_ms, _t_denoise_ms, len(timesteps), - _t_decode_ms, _t_stages_sum, _t_pipeline_wall_ms, + _t_text_enc_ms, + _t_img_enc_ms, + _t_latent_prep_ms, + _t_denoise_ms, + len(timesteps), + _t_decode_ms, + _t_stages_sum, + _t_pipeline_wall_ms, _t_pipeline_wall_ms - _t_stages_sum, ) diff --git a/vllm_omni/diffusion/worker/diffusion_worker.py b/vllm_omni/diffusion/worker/diffusion_worker.py index 8118890bd7f..f2dfccacf63 100644 --- a/vllm_omni/diffusion/worker/diffusion_worker.py +++ b/vllm_omni/diffusion/worker/diffusion_worker.py @@ -374,12 +374,14 @@ def return_result(self, output: DiffusionOutput): """Reply to client, only on rank 0.""" if self.result_mq is not None: import time as _time + _t0 = _time.perf_counter() self.result_mq.enqueue(output) _t_ms = (_time.perf_counter() - _t0) * 1000 logger.info( "Hop1 worker→scheduler: result_mq.enqueue took %.2f ms 
(rank %s)", - _t_ms, self.gpu_id, + _t_ms, + self.gpu_id, ) def recv_message(self): diff --git a/vllm_omni/entrypoints/async_omni.py b/vllm_omni/entrypoints/async_omni.py index a86791fc8de..3b0e1f61fbb 100644 --- a/vllm_omni/entrypoints/async_omni.py +++ b/vllm_omni/entrypoints/async_omni.py @@ -38,8 +38,13 @@ logger = init_logger(__name__) -def _weak_close_cleanup_async(stage_list, stage_in_queues, stage_out_queues, ray_pg, output_handler, zmq_ctx=None): +def _weak_close_cleanup_async(stage_list, stage_in_queues, stage_out_queues, ray_pg, output_handler, zmq_ctx=None, inline_engine=None): """Weak reference cleanup function for AsyncOmni instances.""" + if inline_engine is not None: + try: + inline_engine.close() + except Exception as e: + logger.warning("Failed to close inline diffusion engine: %s", e) if stage_list: for q in stage_in_queues: try: @@ -121,6 +126,7 @@ def __init__(self, model: str, **kwargs: dict[str, Any]) -> None: self._ray_pg, self.output_handler, self._zmq_ctx, + getattr(self, "_inline_engine", None), ) def _create_default_diffusion_stage_cfg(self, kwargs: dict[str, Any]) -> dict[str, Any]: @@ -297,6 +303,13 @@ async def generate( async with self._pause_cond: await self._pause_cond.wait_for(lambda: not self._paused) + if self._inline_diffusion: + async for output in self._generate_inline( + prompt, request_id, sampling_params_list, output_modalities + ): + yield output + return + logger.debug(f"[{self._name}] generate() called") try: # Start output handler on the first call to generate() @@ -388,6 +401,89 @@ async def generate( logger.info("[AsyncOrchestrator] Request %s aborted.", request_id) raise + async def _generate_inline( + self, + prompt: OmniPromptType, + request_id: str, + sampling_params_list: Sequence[OmniSamplingParams] | None = None, + output_modalities: list[str] | None = None, + ) -> AsyncGenerator[OmniRequestOutput, None]: + """Generate using inline diffusion engine (no stage worker subprocess). 
+ + Eliminates Hop3 IPC overhead by running OmniDiffusion directly in the + orchestrator process. The blocking generate() call is offloaded to a + thread executor so the asyncio event loop remains responsive. + """ + _wall_start_ts = time.time() + + if sampling_params_list is None: + sampling_params_list = self.default_sampling_params_list + sp0 = sampling_params_list[0] + + stage = self.stage_list[0] + final_stage_id_for_e2e = 0 + + metrics = OrchestratorAggregator( + num_stages=1, + log_stats=self.log_stats, + wall_start_ts=_wall_start_ts, + final_stage_id_for_e2e=final_stage_id_for_e2e, + ) + metrics.stage_first_ts[0] = time.time() + + logger.info( + "[%s] Inline diffusion generate for request %s", + self._name, request_id, + ) + + try: + loop = asyncio.get_running_loop() + results = await loop.run_in_executor( + None, + self._inline_engine.generate, + prompt, + sp0, + [request_id], + ) + + for result in results: + images = getattr(result, "images", None) or [] + finished = getattr(result, "finished", True) + + output_to_yield = OmniRequestOutput( + stage_id=0, + final_output_type=stage.final_output_type, + request_output=result, + images=images, + finished=finished, + ) + + metrics.stage_last_ts[0] = time.time() + yield output_to_yield + + try: + metrics.on_finalize_request( + final_stage_id_for_e2e, request_id, _wall_start_ts, + ) + metrics.build_and_log_summary() + except Exception as e: + logger.exception( + "[%s] Failed to finalize inline metrics: %s", + self._name, e, + ) + + except (asyncio.CancelledError, GeneratorExit): + logger.info( + "[%s] Inline request %s cancelled.", self._name, request_id, + ) + raise + except Exception as e: + logger.exception( + "[%s] Inline diffusion failed for request %s: %s", + self._name, request_id, e, + ) + raise + async def _process_async_results( self, request_id: str, @@ -540,7 +636,9 @@ def _process_single_result( elif _t_load_ms > 10: logger.info( "Hop3 orchestrator←stageWorker: deserialize took %.2f ms (req %s, 
stage %s)", - _t_load_ms, req_id, stage_id, + _t_load_ms, + req_id, + stage_id, ) if isinstance(engine_outputs, list): engine_outputs = engine_outputs[0] @@ -649,7 +747,8 @@ async def output_handler(): @property def is_running(self) -> bool: - # Is None before the loop is started. + if self._inline_diffusion: + return self._inline_engine is not None return len(self._stage_in_queues) > 0 @property diff --git a/vllm_omni/entrypoints/omni.py b/vllm_omni/entrypoints/omni.py index 9ecc5f76236..543c48ac844 100644 --- a/vllm_omni/entrypoints/omni.py +++ b/vllm_omni/entrypoints/omni.py @@ -40,6 +40,7 @@ from vllm_omni.entrypoints.stage_utils import SHUTDOWN_TASK, OmniStageTaskType from vllm_omni.entrypoints.stage_utils import maybe_load_from_ipc as _load from vllm_omni.entrypoints.utils import ( + filter_dataclass_kwargs, get_final_stage_id_for_e2e, inject_omni_kv_config, load_and_resolve_stage_configs, @@ -304,6 +305,9 @@ def _resolve_stage_configs(self, model: str, kwargs: dict[str, Any]) -> tuple[st def _initialize_stages(self, model: str, kwargs: dict[str, Any]) -> None: """Initialize stage list management.""" + self._inline_diffusion = False + self._inline_engine = None + stage_init_timeout = kwargs.get("stage_init_timeout", 20) shm_threshold_bytes = kwargs.get("shm_threshold_bytes", 65536) init_timeout = kwargs.get("init_timeout", 300) @@ -351,6 +355,17 @@ def _build_stage(idx_cfg: tuple[int, Any]) -> tuple[int, OmniStage]: self.output_modalities = [st.final_output_type for st in self.stage_list] logger.info(f"[{self._name}] Loaded {len(self.stage_list)} stages") + # Phase 1 optimization: for a single diffusion stage in async mode, + # run the engine directly in the orchestrator process to eliminate + # the stage worker subprocess and its IPC serialization overhead. 
+ if ( + len(self.stage_list) == 1 + and self.stage_list[0].stage_type == "diffusion" + and self.is_async + ): + self._init_inline_diffusion_engine(model, self.stage_configs[0], kwargs) + return + if self.worker_backend == "ray": self._queue_cls = get_ray_queue_class() else: @@ -363,6 +378,71 @@ def _build_stage(idx_cfg: tuple[int, Any]) -> tuple[int, OmniStage]: # Wait for all stages to report readiness before seeding self._wait_for_stages_ready(timeout=init_timeout) + def _init_inline_diffusion_engine( + self, + model: str, + stage_config: Any, + kwargs: dict[str, Any], + ) -> None: + """Initialize diffusion engine directly in the orchestrator process. + + For single-stage diffusion pipelines, this eliminates the stage worker + subprocess and the associated Hop3 IPC serialization overhead. + GPU workers for tensor parallelism are still spawned by the + DiffusionExecutor as separate processes. + """ + from vllm_omni.diffusion.data import OmniDiffusionConfig + from vllm_omni.entrypoints.omni_diffusion import OmniDiffusion + from vllm_omni.entrypoints.stage_utils import ( + _to_dict, + load_func_from_config, + set_stage_devices, + ) + + stage_id = stage_config.stage_id + engine_args = _to_dict(stage_config.engine_args) + runtime_cfg = _to_dict(getattr(stage_config, "runtime", {})) + + if os.environ.get("VLLM_WORKER_MULTIPROC_METHOD") != "spawn": + os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" + + try: + from vllm_omni.platforms import current_omni_platform + + device_type = current_omni_platform.device_type + set_stage_devices(stage_id, runtime_cfg.get("devices"), device_type=device_type) + except Exception as e: + logger.warning("Device setup for inline diffusion failed: %s", e) + + engine_args = filter_dataclass_kwargs(OmniDiffusionConfig, engine_args) + engine_args.pop("model_stage", None) + engine_args.pop("model", None) + + cfg_kv_collect_func = load_func_from_config( + getattr(stage_config, "cfg_kv_collect_func", None) + ) + + self._inline_engine = 
OmniDiffusion( + model=model, + stage_id=stage_id, + engine_input_source=getattr(stage_config, "engine_input_source", []), + cfg_kv_collect_func=cfg_kv_collect_func, + **engine_args, + ) + self._inline_diffusion = True + + # These attributes are normally set by AsyncOmni._wait_for_stages_ready + # but we skip that for inline mode. Set them to None since there is no + # LLM stage to provide them. + self.input_processor = None + self.io_processor = None + self.model_config = None + + logger.info( + "[%s] Inline diffusion mode active – stage worker subprocess bypassed", + self._name, + ) + def _is_async_chunk_enable(self, stage_args: list) -> bool: """get async chunk flag""" engine_args = getattr(stage_args[0], "engine_args", None) From 104d71c4cf42a1c3ada2f7ab218a2f7463fa1413 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Fri, 6 Mar 2026 11:13:23 +0000 Subject: [PATCH 03/15] perf: reduce IPC overhead for single-stage diffusion serving MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two optimizations that eliminate ~6.5s of IPC serialization overhead for single-stage diffusion pipelines (e.g. Wan2.2 I2V/T2V) in online serving mode: Phase 1 – Inline diffusion (eliminate Hop3): When there is exactly one diffusion stage in async mode, initialize OmniDiffusion directly in the orchestrator process instead of spawning a stage worker subprocess. This removes the entire Hop3 serialization path (pickle + mp.Queue/SHM) between the stage worker and orchestrator. GPU workers for tensor parallelism are still spawned by DiffusionExecutor. Phase 2 – SHM tensor transfer (optimize Hop1): Replace pickle-based serialization of large tensors through MessageQueue with POSIX shared memory. The worker copies tensor data into a named SHM segment and enqueues only lightweight metadata; the scheduler reconstructs the tensor from SHM. This reduces Hop1 overhead from ~3.4s to ~1.5s. 
Measured on Wan2.2-I2V-A14B (TP=2, 1280x720, 5s@16fps, 1 step): Before: e2e = 37.5s Phase 1: e2e = 33.1s (−4.4s) Phase 2: e2e = 31.0s (−2.1s) Total: e2e = 31.0s (−6.5s, −17.5%) Made-with: Cursor Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/data.py | 72 +++++++++++++++++++ vllm_omni/diffusion/scheduler.py | 14 +++- .../diffusion/worker/diffusion_worker.py | 17 ++++- vllm_omni/entrypoints/async_omni.py | 26 ++++--- vllm_omni/entrypoints/omni.py | 10 +-- 5 files changed, 116 insertions(+), 23 deletions(-) diff --git a/vllm_omni/diffusion/data.py b/vllm_omni/diffusion/data.py index da68adbdb17..30f761cfb62 100644 --- a/vllm_omni/diffusion/data.py +++ b/vllm_omni/diffusion/data.py @@ -622,6 +622,78 @@ class DiffusionOutput: # timings: Optional["RequestTimings"] = None +_SHM_TENSOR_THRESHOLD = 1_000_000 # 1 MB + + +def _tensor_to_shm(tensor: torch.Tensor) -> dict[str, Any]: + """Copy a tensor into POSIX shared memory and return a metadata handle. + + The shared memory segment remains alive after this call (the local fd is + closed, but the segment persists until ``_tensor_from_shm`` unlinks it). 
+ """ + from multiprocessing import shared_memory + + import numpy as np + + tensor = tensor.detach().cpu().contiguous() + arr = tensor.numpy() + nbytes = arr.nbytes + shm = shared_memory.SharedMemory(create=True, size=nbytes) + shm_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf[:nbytes]) + np.copyto(shm_arr, arr) + handle = { + "__tensor_shm__": True, + "name": shm.name, + "shape": list(tensor.shape), + "torch_dtype": str(tensor.dtype), + "numpy_dtype": str(arr.dtype), + "nbytes": nbytes, + } + shm.close() + return handle + + +def _tensor_from_shm(handle: dict[str, Any]) -> torch.Tensor: + """Reconstruct a tensor from a shared-memory handle and free the segment.""" + from multiprocessing import shared_memory + + import numpy as np + + shm = shared_memory.SharedMemory(name=handle["name"]) + try: + np_dtype = np.dtype(handle["numpy_dtype"]) + arr = np.ndarray(handle["shape"], dtype=np_dtype, buffer=shm.buf[: handle["nbytes"]]) + tensor = torch.from_numpy(arr.copy()) + finally: + shm.close() + shm.unlink() + return tensor + + +def pack_diffusion_output_shm(output: "DiffusionOutput") -> "DiffusionOutput": + """Replace large tensors in *output* with shared-memory handles. + + The DiffusionOutput is modified **in-place** so that the (now lightweight) + object can be serialised cheaply through a MessageQueue. 
+ """ + if output.output is not None and isinstance(output.output, torch.Tensor): + if output.output.nelement() * output.output.element_size() > _SHM_TENSOR_THRESHOLD: + output.output = _tensor_to_shm(output.output) + if output.trajectory_latents is not None and isinstance(output.trajectory_latents, torch.Tensor): + if output.trajectory_latents.nelement() * output.trajectory_latents.element_size() > _SHM_TENSOR_THRESHOLD: + output.trajectory_latents = _tensor_to_shm(output.trajectory_latents) + return output + + +def unpack_diffusion_output_shm(output: "DiffusionOutput") -> "DiffusionOutput": + """Reconstruct tensors from shared-memory handles produced by ``pack_diffusion_output_shm``.""" + if isinstance(output.output, dict) and output.output.get("__tensor_shm__"): + output.output = _tensor_from_shm(output.output) + if isinstance(output.trajectory_latents, dict) and output.trajectory_latents.get("__tensor_shm__"): + output.trajectory_latents = _tensor_from_shm(output.trajectory_latents) + return output + + class AttentionBackendEnum(enum.Enum): FA = enum.auto() SLIDING_TILE_ATTN = enum.auto() diff --git a/vllm_omni/diffusion/scheduler.py b/vllm_omni/diffusion/scheduler.py index 97693782e39..c4891d06e00 100644 --- a/vllm_omni/diffusion/scheduler.py +++ b/vllm_omni/diffusion/scheduler.py @@ -8,7 +8,7 @@ from vllm.distributed.device_communicators.shm_broadcast import MessageQueue from vllm.logger import init_logger -from vllm_omni.diffusion.data import DiffusionOutput, OmniDiffusionConfig +from vllm_omni.diffusion.data import DiffusionOutput, OmniDiffusionConfig, unpack_diffusion_output_shm from vllm_omni.diffusion.request import OmniDiffusionRequest logger = init_logger(__name__) @@ -71,10 +71,18 @@ def add_req(self, request: OmniDiffusionRequest) -> DiffusionOutput: _t_dequeue = _time.perf_counter() output = self.result_mq.dequeue() _t_dequeue_ms = (_time.perf_counter() - _t_dequeue) * 1000 + + _t_unpack = _time.perf_counter() + try: + 
unpack_diffusion_output_shm(output) + except Exception as e: + logger.warning("SHM unpack failed (data may already be inline): %s", e) + _t_unpack_ms = (_time.perf_counter() - _t_unpack) * 1000 + logger.info( - "Hop1 scheduler←worker: result_mq.dequeue took %.2f ms " - "(includes waiting for generation + enqueue serialization)", + "Hop1 scheduler←worker: mq.dequeue=%.2f ms, shm_unpack=%.2f ms (dequeue includes generation wait)", _t_dequeue_ms, + _t_unpack_ms, ) # {"status": "error", "error": str(e)} diff --git a/vllm_omni/diffusion/worker/diffusion_worker.py b/vllm_omni/diffusion/worker/diffusion_worker.py index f2dfccacf63..1ae974cf5bd 100644 --- a/vllm_omni/diffusion/worker/diffusion_worker.py +++ b/vllm_omni/diffusion/worker/diffusion_worker.py @@ -27,6 +27,7 @@ from vllm_omni.diffusion.data import ( DiffusionOutput, OmniDiffusionConfig, + pack_diffusion_output_shm, ) from vllm_omni.diffusion.distributed.parallel_state import ( destroy_distributed_env, @@ -376,11 +377,21 @@ def return_result(self, output: DiffusionOutput): import time as _time _t0 = _time.perf_counter() + try: + pack_diffusion_output_shm(output) + _t_pack = (_time.perf_counter() - _t0) * 1000 + except Exception as e: + logger.warning("SHM pack failed, falling back to raw enqueue: %s", e) + _t_pack = 0.0 + _t1 = _time.perf_counter() self.result_mq.enqueue(output) - _t_ms = (_time.perf_counter() - _t0) * 1000 + _t_enqueue = (_time.perf_counter() - _t1) * 1000 + _t_total = (_time.perf_counter() - _t0) * 1000 logger.info( - "Hop1 worker→scheduler: result_mq.enqueue took %.2f ms (rank %s)", - _t_ms, + "Hop1 worker→scheduler: shm_pack=%.2f ms, mq.enqueue=%.2f ms, total=%.2f ms (rank %s)", + _t_pack, + _t_enqueue, + _t_total, self.gpu_id, ) diff --git a/vllm_omni/entrypoints/async_omni.py b/vllm_omni/entrypoints/async_omni.py index 3b0e1f61fbb..b3175d48946 100644 --- a/vllm_omni/entrypoints/async_omni.py +++ b/vllm_omni/entrypoints/async_omni.py @@ -38,7 +38,9 @@ logger = init_logger(__name__) -def 
_weak_close_cleanup_async(stage_list, stage_in_queues, stage_out_queues, ray_pg, output_handler, zmq_ctx=None, inline_engine=None): +def _weak_close_cleanup_async( + stage_list, stage_in_queues, stage_out_queues, ray_pg, output_handler, zmq_ctx=None, inline_engine=None +): """Weak reference cleanup function for AsyncOmni instances.""" if inline_engine is not None: try: @@ -304,9 +306,7 @@ async def generate( await self._pause_cond.wait_for(lambda: not self._paused) if self._inline_diffusion: - async for output in self._generate_inline( - prompt, request_id, sampling_params_list, output_modalities - ): + async for output in self._generate_inline(prompt, request_id, sampling_params_list, output_modalities): yield output return @@ -433,7 +433,8 @@ async def _generate_inline( logger.info( "[%s] Inline diffusion generate for request %s", - self._name, request_id, + self._name, + request_id, ) try: @@ -463,24 +464,31 @@ async def _generate_inline( try: metrics.on_finalize_request( - final_stage_id_for_e2e, request_id, _wall_start_ts, + final_stage_id_for_e2e, + request_id, + _wall_start_ts, ) metrics.build_and_log_summary() except Exception as e: logger.exception( "[%s] Failed to finalize inline metrics: %s", - self._name, e, + self._name, + e, ) except (asyncio.CancelledError, GeneratorExit): logger.info( - "[%s] Inline request %s cancelled.", self._name, request_id, + "[%s] Inline request %s cancelled.", + self._name, + request_id, ) raise except Exception as e: logger.exception( "[%s] Inline diffusion failed for request %s: %s", - self._name, request_id, e, + self._name, + request_id, + e, ) raise diff --git a/vllm_omni/entrypoints/omni.py b/vllm_omni/entrypoints/omni.py index 543c48ac844..db91cece323 100644 --- a/vllm_omni/entrypoints/omni.py +++ b/vllm_omni/entrypoints/omni.py @@ -358,11 +358,7 @@ def _build_stage(idx_cfg: tuple[int, Any]) -> tuple[int, OmniStage]: # Phase 1 optimization: for a single diffusion stage in async mode, # run the engine directly in the 
orchestrator process to eliminate # the stage worker subprocess and its IPC serialization overhead. - if ( - len(self.stage_list) == 1 - and self.stage_list[0].stage_type == "diffusion" - and self.is_async - ): + if len(self.stage_list) == 1 and self.stage_list[0].stage_type == "diffusion" and self.is_async: self._init_inline_diffusion_engine(model, self.stage_configs[0], kwargs) return @@ -418,9 +414,7 @@ def _init_inline_diffusion_engine( engine_args.pop("model_stage", None) engine_args.pop("model", None) - cfg_kv_collect_func = load_func_from_config( - getattr(stage_config, "cfg_kv_collect_func", None) - ) + cfg_kv_collect_func = load_func_from_config(getattr(stage_config, "cfg_kv_collect_func", None)) self._inline_engine = OmniDiffusion( model=model, From dd4468cbb298194682885b800fdf2902b19d02fa Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Fri, 6 Mar 2026 16:33:12 +0000 Subject: [PATCH 04/15] fix conflicts Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/diffusion_engine.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/vllm_omni/diffusion/diffusion_engine.py b/vllm_omni/diffusion/diffusion_engine.py index 906d0a37667..b0bd92b56fb 100644 --- a/vllm_omni/diffusion/diffusion_engine.py +++ b/vllm_omni/diffusion/diffusion_engine.py @@ -106,14 +106,14 @@ def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: postprocess_time = time.perf_counter() - postprocess_start_time logger.info(f"Post-processing completed in {postprocess_time:.4f} seconds") - _step_total_ms = (time.perf_counter() - _step_t0) * 1000 + step_total_ms = (time.perf_counter() - diffusion_engine_start_time) * 1000 logger.info( "DiffusionEngine.step breakdown: preprocess=%.2f ms, " "add_req_and_wait=%.2f ms, postprocess=%.2f ms, total=%.2f ms", preprocess_time * 1000, - _generate_ms, + exec_total_time * 1000, postprocess_time * 1000, - _step_total_ms, + step_total_ms, ) # Convert to OmniRequestOutput format From 
ff62a1eba3be5fb9598014e50d5ee0b3350ce392 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Mon, 9 Mar 2026 03:25:32 +0000 Subject: [PATCH 05/15] rm redundancy Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/diffusion_engine.py | 8 ++++---- vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py | 5 ----- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/vllm_omni/diffusion/diffusion_engine.py b/vllm_omni/diffusion/diffusion_engine.py index b0bd92b56fb..ded29ca6418 100644 --- a/vllm_omni/diffusion/diffusion_engine.py +++ b/vllm_omni/diffusion/diffusion_engine.py @@ -67,7 +67,7 @@ def __init__(self, od_config: OmniDiffusionConfig): raise e def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: - diffusion_engine_start_time = time.time() + diffusion_engine_start_time = time.perf_counter() # Apply pre-processing if available preprocess_time = 0.0 if self.pre_process_func is not None: @@ -76,9 +76,9 @@ def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: preprocess_time = time.perf_counter() - preprocess_start_time logger.info(f"Pre-processing completed in {preprocess_time:.4f} seconds") - exec_start_time = time.time() + exec_start_time = time.perf_counter() output = self.add_req_and_wait_for_response(request) - exec_total_time = time.time() - exec_start_time + exec_total_time = time.perf_counter() - exec_start_time if output.error: raise Exception(f"{output.error}") @@ -123,7 +123,7 @@ def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: metrics = { "preprocess_time_ms": preprocess_time * 1000, - "diffusion_engine_exec_time_ms": (time.time() - diffusion_engine_start_time) * 1000, + "diffusion_engine_exec_time_ms": (time.perf_counter() - diffusion_engine_start_time) * 1000, "diffusion_engine_total_time_ms": exec_total_time * 1000, "image_num": int(request.sampling_params.num_outputs_per_prompt), "resolution": int(request.sampling_params.resolution), diff --git 
a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py index 8f9a7010c03..e7b526adb69 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py @@ -337,9 +337,6 @@ def forward( num_frames = req.sampling_params.num_frames or frame_num num_steps = req.sampling_params.num_inference_steps or num_inference_steps - # print("D--: num_frames", num_frames) - # print("D--: height x width =", height, "x", width) - # Respect per-request guidance_scale when explicitly provided. if req.sampling_params.guidance_scale_provided: guidance_scale = req.sampling_params.guidance_scale @@ -540,8 +537,6 @@ def forward( cfg_normalize=False, ) - # print("D--: latent shape", latents.shape) - # Compute the previous noisy sample x_t -> x_t-1 with automatic CFG sync latents = self.scheduler_step_maybe_with_cfg(noise_pred, t, latents, do_true_cfg) From 5414a428a677d9ce75e5d6a590a5b7a512b8b3a2 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Mon, 9 Mar 2026 04:20:55 +0000 Subject: [PATCH 06/15] rm logs Signed-off-by: samithuang <285365963@qq.com> --- .../models/wan2_2/pipeline_wan2_2.py | 68 ++++++++------ .../models/wan2_2/pipeline_wan2_2_i2v.py | 92 +++++++++++-------- vllm_omni/diffusion/scheduler.py | 14 --- .../diffusion/worker/diffusion_worker.py | 15 --- vllm_omni/entrypoints/async_omni.py | 20 ---- vllm_omni/entrypoints/omni_stage.py | 13 --- 6 files changed, 93 insertions(+), 129 deletions(-) diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py index 89af9cae09c..9bbedc32859 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py @@ -30,6 +30,7 @@ from vllm_omni.platforms import current_omni_platform logger = logging.getLogger(__name__) +DEBUG_PERF = False def retrieve_latents( @@ -425,10 +426,11 @@ 
def forward( if generator is None and req.sampling_params.seed is not None: generator = torch.Generator(device=device).manual_seed(req.sampling_params.seed) - # Sync GPU before timing to ensure accurate measurements - current_omni_platform.synchronize() - _t_pipeline_start = time.perf_counter() - _t_text_enc_start = _t_pipeline_start + if DEBUG_PERF: + # Sync GPU before timing to ensure accurate measurements + current_omni_platform.synchronize() + _t_pipeline_start = time.perf_counter() + _t_text_enc_start = _t_pipeline_start if prompt_embeds is None: prompt_embeds, negative_prompt_embeds = self.encode_prompt( prompt=prompt, @@ -447,8 +449,9 @@ def forward( raise ValueError( "negative_prompt_embeds must be provided when prompt_embeds are given and guidance > 1." ) - current_omni_platform.synchronize() - _t_text_enc_ms = (time.perf_counter() - _t_text_enc_start) * 1000 + if DEBUG_PERF: + current_omni_platform.synchronize() + _t_text_enc_ms = (time.perf_counter() - _t_text_enc_start) * 1000 # Timesteps self.scheduler.set_timesteps(num_steps, device=device) @@ -458,7 +461,8 @@ def forward( if boundary_ratio is not None: boundary_timestep = boundary_ratio * self.scheduler.config.num_train_timesteps - _t_latent_prep_start = time.perf_counter() + if DEBUG_PERF: + _t_latent_prep_start = time.perf_counter() multi_modal_data = req.prompts[0].get("multi_modal_data", {}) if not isinstance(req.prompts[0], str) else None raw_image = multi_modal_data.get("image", None) if multi_modal_data is not None else None if isinstance(raw_image, list): @@ -549,13 +553,15 @@ def forward( generator=generator, latents=req.sampling_params.latents, ) - current_omni_platform.synchronize() - _t_latent_prep_ms = (time.perf_counter() - _t_latent_prep_start) * 1000 + if DEBUG_PERF: + current_omni_platform.synchronize() + _t_latent_prep_ms = (time.perf_counter() - _t_latent_prep_start) * 1000 if attention_kwargs is None: attention_kwargs = {} - _t_denoise_start = time.perf_counter() + if DEBUG_PERF: 
+ _t_denoise_start = time.perf_counter() with self.progress_bar(total=len(timesteps)) as pbar: for t in timesteps: self._current_timestep = t @@ -668,26 +674,28 @@ def forward( ) latents = latents / latents_std + latents_mean output = self.vae.decode(latents, return_dict=False)[0] - current_omni_platform.synchronize() - _t_decode_ms = (time.perf_counter() - _t_decode_start) * 1000 - _t_pipeline_wall_ms = (time.perf_counter() - _t_pipeline_start) * 1000 - _t_stages_sum = _t_text_enc_ms + _t_latent_prep_ms + _t_denoise_ms + _t_decode_ms - - if _is_rank_zero(): - logger.info( - "Pipeline stage timing summary: " - "TextEncoding=%.2f ms, LatentPreparation=%.2f ms, " - "Denoising=%.2f ms (%d steps), Decoding=%.2f ms, " - "StagesSum=%.2f ms, PipelineWall=%.2f ms, Unaccounted=%.2f ms", - _t_text_enc_ms, - _t_latent_prep_ms, - _t_denoise_ms, - len(timesteps), - _t_decode_ms, - _t_stages_sum, - _t_pipeline_wall_ms, - _t_pipeline_wall_ms - _t_stages_sum, - ) + + if DEBUG_PERF: + current_omni_platform.synchronize() + _t_decode_ms = (time.perf_counter() - _t_decode_start) * 1000 + _t_pipeline_wall_ms = (time.perf_counter() - _t_pipeline_start) * 1000 + _t_stages_sum = _t_text_enc_ms + _t_latent_prep_ms + _t_denoise_ms + _t_decode_ms + + if _is_rank_zero(): + logger.info( + "Pipeline stage timing summary: " + "TextEncoding=%.2f ms, LatentPreparation=%.2f ms, " + "Denoising=%.2f ms (%d steps), Decoding=%.2f ms, " + "StagesSum=%.2f ms, PipelineWall=%.2f ms, Unaccounted=%.2f ms", + _t_text_enc_ms, + _t_latent_prep_ms, + _t_denoise_ms, + len(timesteps), + _t_decode_ms, + _t_stages_sum, + _t_pipeline_wall_ms, + _t_pipeline_wall_ms - _t_stages_sum, + ) return DiffusionOutput(output=output) diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py index e7b526adb69..33292987ff5 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py @@ 
-36,6 +36,8 @@ logger = logging.getLogger(__name__) +DEBUG_PERF = False + def _load_model_index(model: str, local_files_only: bool) -> dict: """Load model_index.json from local path or HF Hub.""" @@ -389,10 +391,12 @@ def forward( if generator is None and req.sampling_params.seed is not None: generator = torch.Generator(device=device).manual_seed(req.sampling_params.seed) - # Sync GPU before timing to ensure accurate measurements - current_omni_platform.synchronize() - _t_pipeline_start = time.perf_counter() - _t_text_enc_start = _t_pipeline_start + if DEBUG_PERF: + # Sync GPU before timing to ensure accurate measurements + current_omni_platform.synchronize() + _t_pipeline_start = time.perf_counter() + _t_text_enc_start = _t_pipeline_start + if prompt_embeds is None: prompt_embeds, negative_prompt_embeds = self.encode_prompt( prompt=prompt, @@ -407,8 +411,10 @@ def forward( prompt_embeds = prompt_embeds.to(device=device, dtype=dtype) if negative_prompt_embeds is not None: negative_prompt_embeds = negative_prompt_embeds.to(device=device, dtype=dtype) - current_omni_platform.synchronize() - _t_text_enc_ms = (time.perf_counter() - _t_text_enc_start) * 1000 + + if DEBUG_PERF: + current_omni_platform.synchronize() + _t_text_enc_ms = (time.perf_counter() - _t_text_enc_start) * 1000 batch_size = prompt_embeds.shape[0] @@ -423,8 +429,10 @@ def forward( image_embeds = image_embeds.to(dtype) else: image_embeds = None - current_omni_platform.synchronize() - _t_img_enc_ms = (time.perf_counter() - _t_img_enc_start) * 1000 + + if DEBUG_PERF: + current_omni_platform.synchronize() + _t_img_enc_ms = (time.perf_counter() - _t_img_enc_start) * 1000 # Timesteps self.scheduler.set_timesteps(num_steps, device=device) @@ -438,7 +446,8 @@ def forward( # Prepare latents (use out_channels=16 for VAE latent, not in_channels=36) num_channels_latents = self.transformer.config.out_channels - _t_latent_prep_start = time.perf_counter() + if DEBUG_PERF: + _t_latent_prep_start = time.perf_counter() 
from diffusers.video_processor import VideoProcessor video_processor = VideoProcessor(vae_scale_factor=self.vae_scale_factor_spatial) @@ -472,13 +481,16 @@ def forward( latents=req.sampling_params.latents, last_image=last_image_tensor, ) - current_omni_platform.synchronize() - _t_latent_prep_ms = (time.perf_counter() - _t_latent_prep_start) * 1000 + + if DEBUG_PERF: + current_omni_platform.synchronize() + _t_latent_prep_ms = (time.perf_counter() - _t_latent_prep_start) * 1000 if attention_kwargs is None: attention_kwargs = {} - _t_denoise_start = time.perf_counter() + if DEBUG_PERF: + _t_denoise_start = time.perf_counter() with self.progress_bar(total=len(timesteps)) as pbar: for t in timesteps: self._current_timestep = t @@ -547,14 +559,18 @@ def forward( if current_omni_platform.is_available(): current_omni_platform.empty_cache() self._current_timestep = None - current_omni_platform.synchronize() - _t_denoise_ms = (time.perf_counter() - _t_denoise_start) * 1000 + + if DEBUG_PERF: + current_omni_platform.synchronize() + _t_denoise_ms = (time.perf_counter() - _t_denoise_start) * 1000 # For expand_timesteps mode, blend final latents with condition if self.expand_timesteps: latents = (1 - first_frame_mask) * condition + first_frame_mask * latents - _t_decode_start = time.perf_counter() + if DEBUG_PERF: + _t_decode_start = time.perf_counter() + if output_type == "latent": output = latents else: @@ -569,28 +585,30 @@ def forward( ) latents = latents / latents_std + latents_mean output = self.vae.decode(latents, return_dict=False)[0] - current_omni_platform.synchronize() - _t_decode_ms = (time.perf_counter() - _t_decode_start) * 1000 - _t_pipeline_wall_ms = (time.perf_counter() - _t_pipeline_start) * 1000 - _t_stages_sum = _t_text_enc_ms + _t_img_enc_ms + _t_latent_prep_ms + _t_denoise_ms + _t_decode_ms - - if _is_rank_zero(): - logger.info( - "Pipeline stage timing summary: " - "TextEncoding=%.2f ms, ImageEncoding=%.2f ms, " - "LatentPreparation=%.2f ms, Denoising=%.2f 
ms (%d steps), " - "Decoding=%.2f ms, StagesSum=%.2f ms, PipelineWall=%.2f ms, " - "Unaccounted=%.2f ms", - _t_text_enc_ms, - _t_img_enc_ms, - _t_latent_prep_ms, - _t_denoise_ms, - len(timesteps), - _t_decode_ms, - _t_stages_sum, - _t_pipeline_wall_ms, - _t_pipeline_wall_ms - _t_stages_sum, - ) + + if DEBUG_PERF: + current_omni_platform.synchronize() + _t_decode_ms = (time.perf_counter() - _t_decode_start) * 1000 + _t_pipeline_wall_ms = (time.perf_counter() - _t_pipeline_start) * 1000 + _t_stages_sum = _t_text_enc_ms + _t_img_enc_ms + _t_latent_prep_ms + _t_denoise_ms + _t_decode_ms + + if _is_rank_zero(): + logger.info( + "Pipeline stage timing summary: " + "TextEncoding=%.2f ms, ImageEncoding=%.2f ms, " + "LatentPreparation=%.2f ms, Denoising=%.2f ms (%d steps), " + "Decoding=%.2f ms, StagesSum=%.2f ms, PipelineWall=%.2f ms, " + "Unaccounted=%.2f ms", + _t_text_enc_ms, + _t_img_enc_ms, + _t_latent_prep_ms, + _t_denoise_ms, + len(timesteps), + _t_decode_ms, + _t_stages_sum, + _t_pipeline_wall_ms, + _t_pipeline_wall_ms - _t_stages_sum, + ) return DiffusionOutput(output=output) diff --git a/vllm_omni/diffusion/scheduler.py b/vllm_omni/diffusion/scheduler.py index c4891d06e00..4c9ba3de6a5 100644 --- a/vllm_omni/diffusion/scheduler.py +++ b/vllm_omni/diffusion/scheduler.py @@ -2,7 +2,6 @@ # SPDX-FileCopyrightText: Copyright contributors to the vLLM project import threading -import time as _time import zmq from vllm.distributed.device_communicators.shm_broadcast import MessageQueue @@ -59,31 +58,18 @@ def add_req(self, request: OmniDiffusionRequest) -> DiffusionOutput: } # Broadcast RPC request to all workers - _t_broadcast = _time.perf_counter() self.mq.enqueue(rpc_request) - _t_broadcast_ms = (_time.perf_counter() - _t_broadcast) * 1000 - logger.info("Hop1 scheduler→workers: mq.enqueue (broadcast request) took %.2f ms", _t_broadcast_ms) # Wait for result from Rank 0 (or whoever sends it) if self.result_mq is None: raise RuntimeError("Result queue not initialized") - 
_t_dequeue = _time.perf_counter() output = self.result_mq.dequeue() - _t_dequeue_ms = (_time.perf_counter() - _t_dequeue) * 1000 - _t_unpack = _time.perf_counter() try: unpack_diffusion_output_shm(output) except Exception as e: logger.warning("SHM unpack failed (data may already be inline): %s", e) - _t_unpack_ms = (_time.perf_counter() - _t_unpack) * 1000 - - logger.info( - "Hop1 scheduler←worker: mq.dequeue=%.2f ms, shm_unpack=%.2f ms (dequeue includes generation wait)", - _t_dequeue_ms, - _t_unpack_ms, - ) # {"status": "error", "error": str(e)} if isinstance(output, dict) and output.get("status") == "error": diff --git a/vllm_omni/diffusion/worker/diffusion_worker.py b/vllm_omni/diffusion/worker/diffusion_worker.py index a69acca2424..651c0fe33c0 100644 --- a/vllm_omni/diffusion/worker/diffusion_worker.py +++ b/vllm_omni/diffusion/worker/diffusion_worker.py @@ -376,26 +376,11 @@ def _create_worker( def return_result(self, output: DiffusionOutput): """Reply to client, only on rank 0.""" if self.result_mq is not None: - import time as _time - - _t0 = _time.perf_counter() try: pack_diffusion_output_shm(output) - _t_pack = (_time.perf_counter() - _t0) * 1000 except Exception as e: logger.warning("SHM pack failed, falling back to raw enqueue: %s", e) - _t_pack = 0.0 - _t1 = _time.perf_counter() self.result_mq.enqueue(output) - _t_enqueue = (_time.perf_counter() - _t1) * 1000 - _t_total = (_time.perf_counter() - _t0) * 1000 - logger.info( - "Hop1 worker→scheduler: shm_pack=%.2f ms, mq.enqueue=%.2f ms, total=%.2f ms (rank %s)", - _t_pack, - _t_enqueue, - _t_total, - self.gpu_id, - ) def recv_message(self): """Receive messages from broadcast queue.""" diff --git a/vllm_omni/entrypoints/async_omni.py b/vllm_omni/entrypoints/async_omni.py index 0a4ddd4ff71..a6757e71723 100644 --- a/vllm_omni/entrypoints/async_omni.py +++ b/vllm_omni/entrypoints/async_omni.py @@ -659,28 +659,8 @@ def _process_single_result( ) raise RuntimeError(result) - _t_load = time.perf_counter() 
engine_outputs = _load(result, obj_key="engine_outputs", shm_key="engine_outputs_shm") - _t_load_ms = (time.perf_counter() - _t_load) * 1000 - _hop3 = result.get("_hop3_timing") - if _hop3: - logger.info( - "Hop3 stageWorker→orchestrator: " - "generate=%.2f ms, serialize(shm=%s)=%.2f ms, " - "orchestrator_deserialize=%.2f ms", - _hop3.get("generate_ms", 0), - _hop3.get("use_shm", "?"), - _hop3.get("serialize_ms", 0), - _t_load_ms, - ) - elif _t_load_ms > 10: - logger.info( - "Hop3 orchestrator←stageWorker: deserialize took %.2f ms (req %s, stage %s)", - _t_load_ms, - req_id, - stage_id, - ) if isinstance(engine_outputs, list): engine_outputs = engine_outputs[0] diff --git a/vllm_omni/entrypoints/omni_stage.py b/vllm_omni/entrypoints/omni_stage.py index b50a31127be..fb7cefb4ad6 100644 --- a/vllm_omni/entrypoints/omni_stage.py +++ b/vllm_omni/entrypoints/omni_stage.py @@ -1098,7 +1098,6 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: try: _batch_seq += 1 gen_outputs: list[OmniRequestOutput | RequestOutput] = [] - _pre_gen_ms = (_time.time() - _recv_dequeue_ts) * 1000.0 _gen_t0 = _time.time() if stage_type == "diffusion": stage_engine = cast(OmniDiffusion, stage_engine) @@ -1145,7 +1144,6 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: _agg_total_gen_time_ms += _gen_ms # Emit per-request results - _post_gen_t0 = _time.time() for i, rid in enumerate(batch_request_ids): r_outputs = req_to_outputs.get(rid, []) _metrics = make_request_stats( @@ -1162,20 +1160,11 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: _metrics.stage_stats = make_stage_stats(_agg_total_tokens, _agg_total_gen_time_ms) else: _metrics.stage_stats = None - _t_serialize = _time.time() try: use_shm, payload = maybe_dump_to_shm(r_outputs, shm_threshold_bytes) except Exception: use_shm, payload = False, r_outputs - _t_serialize_ms = (_time.time() - _t_serialize) * 1000.0 - _hop3_timing = { - "generate_ms": _gen_ms, - "serialize_ms": 
_t_serialize_ms, - "use_shm": use_shm, - } - - _t_enqueue = _time.time() try: if use_shm: out_q.put( @@ -1184,7 +1173,6 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: "stage_id": stage_id, "engine_outputs_shm": payload, "metrics": _metrics, - "_hop3_timing": _hop3_timing, } ) else: @@ -1194,7 +1182,6 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: "stage_id": stage_id, "engine_outputs": payload, "metrics": _metrics, - "_hop3_timing": _hop3_timing, } ) except Exception: From e3dec547e16616e6460d6d531bca782777514013 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Mon, 9 Mar 2026 04:27:28 +0000 Subject: [PATCH 07/15] fix inline Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py | 1 - vllm_omni/entrypoints/async_omni.py | 4 ++++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py index 33292987ff5..4bdd83f9d1a 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py @@ -35,7 +35,6 @@ from vllm_omni.platforms import current_omni_platform logger = logging.getLogger(__name__) - DEBUG_PERF = False diff --git a/vllm_omni/entrypoints/async_omni.py b/vllm_omni/entrypoints/async_omni.py index a6757e71723..95e626e74e7 100644 --- a/vllm_omni/entrypoints/async_omni.py +++ b/vllm_omni/entrypoints/async_omni.py @@ -807,6 +807,10 @@ def dead_error(self) -> BaseException: return EngineDeadError() async def abort(self, request_id: str | Iterable[str]) -> None: + if self._inline_diffusion: + if self._inline_engine is not None: + self._inline_engine.engine.abort(request_id) + return None abort_task = {"type": OmniStageTaskType.ABORT, "request_id": request_id} for stage in self.stage_list: stage.submit(abort_task) From 2cd9f9f768013794856cc0fda3a97958528543c6 Mon Sep 
17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Mon, 9 Mar 2026 06:30:30 +0000 Subject: [PATCH 08/15] fix ci Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/diffusion_engine.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vllm_omni/diffusion/diffusion_engine.py b/vllm_omni/diffusion/diffusion_engine.py index 77cdd0afb1f..abd668466de 100644 --- a/vllm_omni/diffusion/diffusion_engine.py +++ b/vllm_omni/diffusion/diffusion_engine.py @@ -70,6 +70,7 @@ def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: diffusion_engine_start_time = time.perf_counter() # Apply pre-processing if available + preprocess_time = 0.0 if self.pre_process_func is not None: preprocess_start_time = time.perf_counter() request = self.pre_process_func(request) From 172040a8e9cf98dcfec44bbb5a0b7380e90d14d5 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Mon, 9 Mar 2026 07:39:43 +0000 Subject: [PATCH 09/15] fix ci Signed-off-by: samithuang <285365963@qq.com> --- tests/entrypoints/test_async_omni_diffusion_config.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/entrypoints/test_async_omni_diffusion_config.py b/tests/entrypoints/test_async_omni_diffusion_config.py index ce443d5a5c7..ed205032ee2 100644 --- a/tests/entrypoints/test_async_omni_diffusion_config.py +++ b/tests/entrypoints/test_async_omni_diffusion_config.py @@ -11,12 +11,18 @@ MODEL = "riverclouds/qwen_image_random" +def _noop_inline_engine(self, model, stage_config, kwargs): + self._inline_diffusion = False + self._inline_engine = None + + def test_default_stage_config_includes_cache_backend(monkeypatch): """Ensure cache_backend/cache_config are preserved in default diffusion stage.""" monkeypatch.setattr(utils_module, "load_stage_configs_from_model", lambda model, base_engine_args=None: []) monkeypatch.setattr(utils_module, "resolve_model_config_path", lambda model: None) monkeypatch.setattr(AsyncOmni, "_start_stages", lambda self, model: None) 
monkeypatch.setattr(AsyncOmni, "_wait_for_stages_ready", lambda self, timeout=0: None) + monkeypatch.setattr(AsyncOmni, "_init_inline_diffusion_engine", _noop_inline_engine) omni = AsyncOmni( model=MODEL, @@ -47,6 +53,7 @@ def test_default_cache_config_used_when_missing(monkeypatch): monkeypatch.setattr(utils_module, "resolve_model_config_path", lambda model: None) monkeypatch.setattr(AsyncOmni, "_start_stages", lambda self, model: None) monkeypatch.setattr(AsyncOmni, "_wait_for_stages_ready", lambda self, timeout=0: None) + monkeypatch.setattr(AsyncOmni, "_init_inline_diffusion_engine", _noop_inline_engine) omni = AsyncOmni( model=MODEL, @@ -65,6 +72,7 @@ def test_default_stage_devices_from_sequence_parallel(monkeypatch): monkeypatch.setattr(utils_module, "resolve_model_config_path", lambda model: None) monkeypatch.setattr(AsyncOmni, "_start_stages", lambda self, model: None) monkeypatch.setattr(AsyncOmni, "_wait_for_stages_ready", lambda self, timeout=0: None) + monkeypatch.setattr(AsyncOmni, "_init_inline_diffusion_engine", _noop_inline_engine) omni = AsyncOmni( model=MODEL, From 0a86fc5f76e034aed50b9f17add0b24eac71123f Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Mon, 9 Mar 2026 08:14:11 +0000 Subject: [PATCH 10/15] fix log Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py index 9bbedc32859..30a3c3cf8d6 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py @@ -652,8 +652,9 @@ def forward( if current_omni_platform.is_available(): current_omni_platform.empty_cache() self._current_timestep = None - current_omni_platform.synchronize() - _t_denoise_ms = (time.perf_counter() - _t_denoise_start) * 1000 + if DEBUG_PERF: + 
current_omni_platform.synchronize() + _t_denoise_ms = (time.perf_counter() - _t_denoise_start) * 1000 # For I2V mode: blend final latents with condition if self.expand_timesteps and latent_condition is not None: From 9b9c5974baf1a296b4ef939e55a51ae465cb1f03 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Mon, 9 Mar 2026 08:32:26 +0000 Subject: [PATCH 11/15] fix Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/data.py | 72 --------------- vllm_omni/diffusion/ipc.py | 89 +++++++++++++++++++ vllm_omni/diffusion/scheduler.py | 3 +- .../diffusion/worker/diffusion_worker.py | 2 +- 4 files changed, 92 insertions(+), 74 deletions(-) create mode 100644 vllm_omni/diffusion/ipc.py diff --git a/vllm_omni/diffusion/data.py b/vllm_omni/diffusion/data.py index 8f55e529cd6..0a68c2e707a 100644 --- a/vllm_omni/diffusion/data.py +++ b/vllm_omni/diffusion/data.py @@ -626,78 +626,6 @@ class DiffusionOutput: # timings: Optional["RequestTimings"] = None -_SHM_TENSOR_THRESHOLD = 1_000_000 # 1 MB - - -def _tensor_to_shm(tensor: torch.Tensor) -> dict[str, Any]: - """Copy a tensor into POSIX shared memory and return a metadata handle. - - The shared memory segment remains alive after this call (the local fd is - closed, but the segment persists until ``_tensor_from_shm`` unlinks it). 
- """ - from multiprocessing import shared_memory - - import numpy as np - - tensor = tensor.detach().cpu().contiguous() - arr = tensor.numpy() - nbytes = arr.nbytes - shm = shared_memory.SharedMemory(create=True, size=nbytes) - shm_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf[:nbytes]) - np.copyto(shm_arr, arr) - handle = { - "__tensor_shm__": True, - "name": shm.name, - "shape": list(tensor.shape), - "torch_dtype": str(tensor.dtype), - "numpy_dtype": str(arr.dtype), - "nbytes": nbytes, - } - shm.close() - return handle - - -def _tensor_from_shm(handle: dict[str, Any]) -> torch.Tensor: - """Reconstruct a tensor from a shared-memory handle and free the segment.""" - from multiprocessing import shared_memory - - import numpy as np - - shm = shared_memory.SharedMemory(name=handle["name"]) - try: - np_dtype = np.dtype(handle["numpy_dtype"]) - arr = np.ndarray(handle["shape"], dtype=np_dtype, buffer=shm.buf[: handle["nbytes"]]) - tensor = torch.from_numpy(arr.copy()) - finally: - shm.close() - shm.unlink() - return tensor - - -def pack_diffusion_output_shm(output: "DiffusionOutput") -> "DiffusionOutput": - """Replace large tensors in *output* with shared-memory handles. - - The DiffusionOutput is modified **in-place** so that the (now lightweight) - object can be serialised cheaply through a MessageQueue. 
- """ - if output.output is not None and isinstance(output.output, torch.Tensor): - if output.output.nelement() * output.output.element_size() > _SHM_TENSOR_THRESHOLD: - output.output = _tensor_to_shm(output.output) - if output.trajectory_latents is not None and isinstance(output.trajectory_latents, torch.Tensor): - if output.trajectory_latents.nelement() * output.trajectory_latents.element_size() > _SHM_TENSOR_THRESHOLD: - output.trajectory_latents = _tensor_to_shm(output.trajectory_latents) - return output - - -def unpack_diffusion_output_shm(output: "DiffusionOutput") -> "DiffusionOutput": - """Reconstruct tensors from shared-memory handles produced by ``pack_diffusion_output_shm``.""" - if isinstance(output.output, dict) and output.output.get("__tensor_shm__"): - output.output = _tensor_from_shm(output.output) - if isinstance(output.trajectory_latents, dict) and output.trajectory_latents.get("__tensor_shm__"): - output.trajectory_latents = _tensor_from_shm(output.trajectory_latents) - return output - - class AttentionBackendEnum(enum.Enum): FA = enum.auto() SLIDING_TILE_ATTN = enum.auto() diff --git a/vllm_omni/diffusion/ipc.py b/vllm_omni/diffusion/ipc.py new file mode 100644 index 00000000000..cac406ef4c0 --- /dev/null +++ b/vllm_omni/diffusion/ipc.py @@ -0,0 +1,89 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +"""IPC utilities for transferring large tensors via POSIX shared memory. + +Used by Hop1 (GPU worker <-> scheduler) to avoid pickling large video tensors +through the MessageQueue. Tensors above ``_SHM_TENSOR_THRESHOLD`` are copied +into a named shared-memory segment; only a lightweight metadata dict is +serialised through the queue. 
+""" + +from __future__ import annotations + +from typing import Any + +import torch + +from vllm_omni.diffusion.data import DiffusionOutput + +_SHM_TENSOR_THRESHOLD = 1_000_000 # 1 MB + + +def _tensor_to_shm(tensor: torch.Tensor) -> dict[str, Any]: + """Copy a tensor into POSIX shared memory and return a metadata handle. + + The shared memory segment remains alive after this call (the local fd is + closed, but the segment persists until ``_tensor_from_shm`` unlinks it). + """ + from multiprocessing import shared_memory + + import numpy as np + + tensor = tensor.detach().cpu().contiguous() + arr = tensor.numpy() + nbytes = arr.nbytes + shm = shared_memory.SharedMemory(create=True, size=nbytes) + shm_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf[:nbytes]) + np.copyto(shm_arr, arr) + handle = { + "__tensor_shm__": True, + "name": shm.name, + "shape": list(tensor.shape), + "torch_dtype": str(tensor.dtype), + "numpy_dtype": str(arr.dtype), + "nbytes": nbytes, + } + shm.close() + return handle + + +def _tensor_from_shm(handle: dict[str, Any]) -> torch.Tensor: + """Reconstruct a tensor from a shared-memory handle and free the segment.""" + from multiprocessing import shared_memory + + import numpy as np + + shm = shared_memory.SharedMemory(name=handle["name"]) + try: + np_dtype = np.dtype(handle["numpy_dtype"]) + arr = np.ndarray(handle["shape"], dtype=np_dtype, buffer=shm.buf[: handle["nbytes"]]) + tensor = torch.from_numpy(arr.copy()) + finally: + shm.close() + shm.unlink() + return tensor + + +def pack_diffusion_output_shm(output: DiffusionOutput) -> DiffusionOutput: + """Replace large tensors in *output* with shared-memory handles. + + The DiffusionOutput is modified **in-place** so that the (now lightweight) + object can be serialised cheaply through a MessageQueue. 
+ """ + if output.output is not None and isinstance(output.output, torch.Tensor): + if output.output.nelement() * output.output.element_size() > _SHM_TENSOR_THRESHOLD: + output.output = _tensor_to_shm(output.output) + if output.trajectory_latents is not None and isinstance(output.trajectory_latents, torch.Tensor): + if output.trajectory_latents.nelement() * output.trajectory_latents.element_size() > _SHM_TENSOR_THRESHOLD: + output.trajectory_latents = _tensor_to_shm(output.trajectory_latents) + return output + + +def unpack_diffusion_output_shm(output: DiffusionOutput) -> DiffusionOutput: + """Reconstruct tensors from shared-memory handles produced by ``pack_diffusion_output_shm``.""" + if isinstance(output.output, dict) and output.output.get("__tensor_shm__"): + output.output = _tensor_from_shm(output.output) + if isinstance(output.trajectory_latents, dict) and output.trajectory_latents.get("__tensor_shm__"): + output.trajectory_latents = _tensor_from_shm(output.trajectory_latents) + return output diff --git a/vllm_omni/diffusion/scheduler.py b/vllm_omni/diffusion/scheduler.py index 4c9ba3de6a5..5f1d01c1282 100644 --- a/vllm_omni/diffusion/scheduler.py +++ b/vllm_omni/diffusion/scheduler.py @@ -7,7 +7,8 @@ from vllm.distributed.device_communicators.shm_broadcast import MessageQueue from vllm.logger import init_logger -from vllm_omni.diffusion.data import DiffusionOutput, OmniDiffusionConfig, unpack_diffusion_output_shm +from vllm_omni.diffusion.data import DiffusionOutput, OmniDiffusionConfig +from vllm_omni.diffusion.ipc import unpack_diffusion_output_shm from vllm_omni.diffusion.request import OmniDiffusionRequest logger = init_logger(__name__) diff --git a/vllm_omni/diffusion/worker/diffusion_worker.py b/vllm_omni/diffusion/worker/diffusion_worker.py index 651c0fe33c0..799957d4a02 100644 --- a/vllm_omni/diffusion/worker/diffusion_worker.py +++ b/vllm_omni/diffusion/worker/diffusion_worker.py @@ -27,7 +27,6 @@ from vllm_omni.diffusion.data import ( 
DiffusionOutput, OmniDiffusionConfig, - pack_diffusion_output_shm, ) from vllm_omni.diffusion.distributed.parallel_state import ( destroy_distributed_env, @@ -35,6 +34,7 @@ initialize_model_parallel, ) from vllm_omni.diffusion.forward_context import set_forward_context +from vllm_omni.diffusion.ipc import pack_diffusion_output_shm from vllm_omni.diffusion.lora.manager import DiffusionLoRAManager from vllm_omni.diffusion.profiler import CurrentProfiler from vllm_omni.diffusion.request import OmniDiffusionRequest From bda0f2d317033597be7f537b8202f6f6098dfa77 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Mon, 9 Mar 2026 08:45:41 +0000 Subject: [PATCH 12/15] fix log Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py | 3 ++- vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py index 30a3c3cf8d6..9a0037a6a1b 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py @@ -660,7 +660,8 @@ def forward( if self.expand_timesteps and latent_condition is not None: latents = (1 - first_frame_mask) * latent_condition + first_frame_mask * latents - _t_decode_start = time.perf_counter() + if DEBUG_PERF: + _t_decode_start = time.perf_counter() if output_type == "latent": output = latents else: diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py index 4bdd83f9d1a..fe15a24f587 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py @@ -417,7 +417,8 @@ def forward( batch_size = prompt_embeds.shape[0] - _t_img_enc_start = time.perf_counter() + if DEBUG_PERF: + _t_img_enc_start = time.perf_counter() if 
self.has_image_encoder and self.transformer.config.image_dim is not None: if image_embeds is None: if last_image is None: From 140ef4afc3909ac15b94cb8085faaae4e3736ddf Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Wed, 18 Mar 2026 10:31:11 +0000 Subject: [PATCH 13/15] [Bugfix] Fix config misalignment between offline and online diffusion inference MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three root causes led to different generation results depending on whether offline (Omni.generate) or online (serving_chat / API) paths were used for the same model: 1. **guidance_scale_2 leaked the sentinel value (0.0)** — OmniDiffusionRequest.__post_init__ auto-filled guidance_scale_2 from guidance_scale *before* resolving the 0.0 sentinel back to 1.0. Online requests that omitted guidance_scale ended up with guidance_scale_2 = 0.0, disabling CFG on the low-noise stage in Wan2.2 (and silently altering quality for Qwen-Image models). Fix: resolve the sentinel first, then auto-fill guidance_scale_2. 2. **num_inference_steps hardcoded to 50 in serving_chat** — The chat endpoint forced 50 steps regardless of the pipeline's own default (Wan2.2 uses 40). Change the dataclass default to None (sentinel) so each pipeline's forward() applies its own default when the caller does not specify a value. 3. **Redundant guidance_scale_provided flag in AsyncOmniDiffusion** — AsyncOmniDiffusion.generate() manually set guidance_scale_provided before OmniDiffusionRequest.__post_init__ ran, which then overwrote it. Remove the redundant pre-set; __post_init__ now handles it correctly and consistently for both paths. Affected models: Wan2.2 (T2V/I2V), Qwen-Image, Qwen-Image-Edit. 
Made-with: Cursor Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/request.py | 20 ++++++++++++------- .../worker/diffusion_model_runner.py | 1 + vllm_omni/entrypoints/async_omni_diffusion.py | 3 --- vllm_omni/entrypoints/openai/serving_chat.py | 20 +++++++++---------- vllm_omni/inputs/data.py | 5 +++-- 5 files changed, 27 insertions(+), 22 deletions(-) diff --git a/vllm_omni/diffusion/request.py b/vllm_omni/diffusion/request.py index a6005290cdc..56a770461b0 100644 --- a/vllm_omni/diffusion/request.py +++ b/vllm_omni/diffusion/request.py @@ -28,17 +28,23 @@ class OmniDiffusionRequest: def __post_init__(self): """Initialize dependent fields after dataclass initialization.""" + # Detect whether user explicitly provided guidance_scale. + # The sentinel default is 0.0 (false-like); any truthy value means + # the caller set it intentionally. We must resolve this BEFORE + # auto-filling guidance_scale_2, otherwise the sentinel leaks into + # guidance_scale_2. + if self.sampling_params.guidance_scale: + self.sampling_params.guidance_scale_provided = True + else: + self.sampling_params.guidance_scale = 1.0 + # Set do_classifier_free_guidance based on guidance scale and negative prompt if self.sampling_params.guidance_scale > 1.0 and any( (not isinstance(p, str) and p.get("negative_prompt")) for p in self.prompts ): self.sampling_params.do_classifier_free_guidance = True + + # Auto-fill guidance_scale_2 from the (now-resolved) guidance_scale + # so downstream code always has a valid value. 
if self.sampling_params.guidance_scale_2 is None: self.sampling_params.guidance_scale_2 = self.sampling_params.guidance_scale - - # The dataclass default value is 0 (false-like), used to detect whether user explicitly provides this value - # After this check is done, reset this value to old default 1 - if self.sampling_params.guidance_scale: - self.sampling_params.guidance_scale_provided = True - else: - self.sampling_params.guidance_scale = 1.0 diff --git a/vllm_omni/diffusion/worker/diffusion_model_runner.py b/vllm_omni/diffusion/worker/diffusion_model_runner.py index accb173e1a0..972c95c292c 100644 --- a/vllm_omni/diffusion/worker/diffusion_model_runner.py +++ b/vllm_omni/diffusion/worker/diffusion_model_runner.py @@ -221,6 +221,7 @@ def execute_model(self, req: OmniDiffusionRequest) -> DiffusionOutput: not getattr(req, "skip_cache_refresh", False) and self.cache_backend is not None and self.cache_backend.is_enabled() + and req.sampling_params.num_inference_steps is not None ): self.cache_backend.refresh(self.pipeline, req.sampling_params.num_inference_steps) diff --git a/vllm_omni/entrypoints/async_omni_diffusion.py b/vllm_omni/entrypoints/async_omni_diffusion.py index 08812223db5..52bd6031c64 100644 --- a/vllm_omni/entrypoints/async_omni_diffusion.py +++ b/vllm_omni/entrypoints/async_omni_diffusion.py @@ -172,9 +172,6 @@ async def generate( if request_id is None: request_id = f"diff-{uuid.uuid4().hex[:16]}" - if sampling_params.guidance_scale: - sampling_params.guidance_scale_provided = True - if lora_request is not None: sampling_params.lora_request = lora_request diff --git a/vllm_omni/entrypoints/openai/serving_chat.py b/vllm_omni/entrypoints/openai/serving_chat.py index 1fc3cdb3621..3234c7e6473 100644 --- a/vllm_omni/entrypoints/openai/serving_chat.py +++ b/vllm_omni/entrypoints/openai/serving_chat.py @@ -2046,18 +2046,20 @@ async def _create_diffusion_chat_completion( except ValueError: logger.warning("Invalid size format: %s", extra_body.get("size")) - # 
Get request parameters from extra_body - # Text-to-image parameters (ref: text_to_image.py) - num_inference_steps = extra_body.get("num_inference_steps", 50) + # Get request parameters from extra_body. + # Avoid hardcoded defaults here — let each pipeline's forward() + # method apply its own model-specific default when the user does + # not provide a value. + num_inference_steps = extra_body.get("num_inference_steps") guidance_scale = extra_body.get("guidance_scale") - true_cfg_scale = extra_body.get("true_cfg_scale") # Qwen-Image specific + true_cfg_scale = extra_body.get("true_cfg_scale") seed = extra_body.get("seed") negative_prompt = extra_body.get("negative_prompt") num_outputs_per_prompt = extra_body.get("num_outputs_per_prompt", 1) # Text-to-video parameters (ref: text_to_video.py) num_frames = extra_body.get("num_frames") - guidance_scale_2 = extra_body.get("guidance_scale_2") # For video high-noise CFG + guidance_scale_2 = extra_body.get("guidance_scale_2") lora_body = extra_body.get("lora") logger.info( @@ -2083,21 +2085,19 @@ async def _create_diffusion_chat_completion( "negative_prompt": negative_prompt, } gen_params = OmniDiffusionSamplingParams( - num_inference_steps=num_inference_steps, height=height, width=width, num_outputs_per_prompt=num_outputs_per_prompt, seed=seed, ) + # Only override defaults when the user explicitly provides values + if num_inference_steps is not None: + gen_params.num_inference_steps = num_inference_steps if guidance_scale is not None: gen_params.guidance_scale = guidance_scale - - # Add Qwen-Image specific parameter if true_cfg_scale is not None: gen_params.true_cfg_scale = true_cfg_scale - - # Add video generation parameters if set if num_frames is not None: gen_params.num_frames = num_frames if guidance_scale_2 is not None: diff --git a/vllm_omni/inputs/data.py b/vllm_omni/inputs/data.py index c0f10af2b42..5768c3b6d99 100644 --- a/vllm_omni/inputs/data.py +++ b/vllm_omni/inputs/data.py @@ -234,8 +234,9 @@ class 
OmniDiffusionSamplingParams: step_index: int | None = None boundary_ratio: float | None = None - # Scheduler parameters - num_inference_steps: int = 50 + # Scheduler parameters – ``None`` means "not explicitly set by the caller"; + # each pipeline's ``forward()`` decides its own model-specific default. + num_inference_steps: int | None = None guidance_scale: float = 0.0 guidance_scale_provided: bool = False guidance_scale_2: float | None = None From 63b17151f79405544cf14da05b2a282e41665647 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Thu, 19 Mar 2026 04:40:17 +0000 Subject: [PATCH 14/15] fix true cfg scale parsing Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/entrypoints/openai/serving_chat.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/vllm_omni/entrypoints/openai/serving_chat.py b/vllm_omni/entrypoints/openai/serving_chat.py index 3234c7e6473..01038c2fd11 100644 --- a/vllm_omni/entrypoints/openai/serving_chat.py +++ b/vllm_omni/entrypoints/openai/serving_chat.py @@ -2052,7 +2052,7 @@ async def _create_diffusion_chat_completion( # not provide a value.
num_inference_steps = extra_body.get("num_inference_steps") guidance_scale = extra_body.get("guidance_scale") - true_cfg_scale = extra_body.get("true_cfg_scale") + true_cfg_scale = extra_body.get("true_cfg_scale") or extra_body.get("cfg_scale") seed = extra_body.get("seed") negative_prompt = extra_body.get("negative_prompt") num_outputs_per_prompt = extra_body.get("num_outputs_per_prompt", 1) @@ -2062,6 +2062,10 @@ async def _create_diffusion_chat_completion( guidance_scale_2 = extra_body.get("guidance_scale_2") lora_body = extra_body.get("lora") + # Qwen-Image-Layered parameters + layers = extra_body.get("layers") + resolution = extra_body.get("resolution") + logger.info( "Diffusion chat request %s: prompt=%r, ref_images=%d, params=%s", request_id, @@ -2102,6 +2106,10 @@ async def _create_diffusion_chat_completion( gen_params.num_frames = num_frames if guidance_scale_2 is not None: gen_params.guidance_scale_2 = guidance_scale_2 + if layers is not None: + gen_params.layers = layers + if resolution is not None: + gen_params.resolution = resolution # Parse per-request LoRA (works for both AsyncOmniDiffusion and AsyncOmni). if lora_body and isinstance(lora_body, dict): From f3c21ab9db7f8606d20f1b3ac7dbdde99f8951d2 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Thu, 19 Mar 2026 07:36:34 +0000 Subject: [PATCH 15/15] [Bugfix] Convert RGB input to RGBA for Qwen-Image-Layered pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Qwen-Image-Layered model VAE expects 4-channel (RGBA) input but online serving sends RGB images decoded from base64. Add automatic RGB→RGBA conversion in both the preprocessing function and the fallback path in forward() to prevent channel mismatch errors. 
Signed-off-by: samithuang <285365963@qq.com> Made-with: Cursor --- .../models/qwen_image/pipeline_qwen_image_layered.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image_layered.py b/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image_layered.py index dbe0bfe4f85..e06768880bb 100644 --- a/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image_layered.py +++ b/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image_layered.py @@ -94,6 +94,9 @@ def pre_process_func( else: image = cast(PIL.Image.Image | torch.Tensor | np.ndarray, raw_image) + if isinstance(image, PIL.Image.Image) and image.mode != "RGBA": + image = image.convert("RGBA") + # 1. calculate dimensions image_size = image.size assert request.sampling_params.resolution in [640, 1024], ( @@ -648,6 +651,8 @@ def forward( width = req.sampling_params.width else: # fallback to run pre-processing in pipeline (debug only) + if isinstance(image, PIL.Image.Image) and image.mode != "RGBA": + image = image.convert("RGBA") image_size = image[0].size if isinstance(image, list) else image.size assert resolution in [640, 1024], f"resolution must be either 640 or 1024, but got {resolution}" calculated_width, calculated_height = calculate_dimensions(