From 26a4e8d8d6a6f2724ab23b3a4b58460640227c25 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Fri, 6 Mar 2026 10:14:45 +0000 Subject: [PATCH 01/13] add time cost log for different stages Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/diffusion_engine.py | 20 ++++++-- .../models/wan2_2/pipeline_wan2_2.py | 36 +++++++++++++-- .../models/wan2_2/pipeline_wan2_2_i2v.py | 46 ++++++++++++++++--- vllm_omni/diffusion/scheduler.py | 14 +++++- .../diffusion/worker/diffusion_worker.py | 7 +++ vllm_omni/entrypoints/async_omni.py | 19 ++++++++ vllm_omni/entrypoints/omni_diffusion.py | 10 +++- vllm_omni/entrypoints/omni_stage.py | 22 +++++++-- vllm_omni/entrypoints/openai/serving_video.py | 3 ++ 9 files changed, 155 insertions(+), 22 deletions(-) diff --git a/vllm_omni/diffusion/diffusion_engine.py b/vllm_omni/diffusion/diffusion_engine.py index 066ba2bb8bd..78c80ad9796 100644 --- a/vllm_omni/diffusion/diffusion_engine.py +++ b/vllm_omni/diffusion/diffusion_engine.py @@ -67,14 +67,19 @@ def __init__(self, od_config: OmniDiffusionConfig): raise e def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: + _step_t0 = time.perf_counter() + # Apply pre-processing if available + preprocess_time = 0.0 if self.pre_process_func is not None: - preprocess_start_time = time.time() + preprocess_start_time = time.perf_counter() request = self.pre_process_func(request) - preprocess_time = time.time() - preprocess_start_time + preprocess_time = time.perf_counter() - preprocess_start_time logger.info(f"Pre-processing completed in {preprocess_time:.4f} seconds") + _generate_t0 = time.perf_counter() output = self.add_req_and_wait_for_response(request) + _generate_ms = (time.perf_counter() - _generate_t0) * 1000 if output.error: raise Exception(f"{output.error}") logger.info("Generation completed successfully.") @@ -92,15 +97,22 @@ def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: for i, prompt in enumerate(request.prompts) ] - postprocess_start_time = time.time() + postprocess_start_time = time.perf_counter() outputs = self.post_process_func(output.output) if self.post_process_func is not None else output.output audio_payload = None if isinstance(outputs, dict): audio_payload = outputs.get("audio") outputs = outputs.get("video", outputs) - postprocess_time = time.time() - postprocess_start_time + postprocess_time = time.perf_counter() - postprocess_start_time logger.info(f"Post-processing completed in {postprocess_time:.4f} seconds") + _step_total_ms = (time.perf_counter() - _step_t0) * 1000 + logger.info( + "DiffusionEngine.step breakdown: preprocess=%.2f ms, " + "add_req_and_wait=%.2f ms, postprocess=%.2f ms, total=%.2f ms", + preprocess_time * 1000, _generate_ms, postprocess_time * 1000, _step_total_ms, + ) + # Convert to OmniRequestOutput format # Ensure outputs is a list if not isinstance(outputs, list): diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py index 1de801c10de..9dd01136070 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py @@ -6,6 +6,7 @@ import json import logging import os +import time from collections.abc import Iterable from typing import Any, cast @@ -21,7 +22,7 @@ from vllm_omni.diffusion.distributed.cfg_parallel import CFGParallelMixin from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader -from vllm_omni.diffusion.models.progress_bar import ProgressBarMixin +from vllm_omni.diffusion.models.progress_bar import ProgressBarMixin, _is_rank_zero from vllm_omni.diffusion.models.schedulers import FlowUniPCMultistepScheduler from vllm_omni.diffusion.models.wan2_2.wan2_2_transformer import WanTransformer3DModel from vllm_omni.diffusion.request import OmniDiffusionRequest @@ -424,7 +425,10 @@ def forward( if generator is None and req.sampling_params.seed is not None: generator = torch.Generator(device=device).manual_seed(req.sampling_params.seed) - # Encode prompts + # Sync GPU before timing to ensure accurate measurements + current_omni_platform.synchronize() + _t_pipeline_start = time.perf_counter() + _t_text_enc_start = _t_pipeline_start if prompt_embeds is None: prompt_embeds, negative_prompt_embeds = self.encode_prompt( prompt=prompt, @@ -443,6 +447,8 @@ def forward( raise ValueError( "negative_prompt_embeds must be provided when prompt_embeds are given and guidance > 1." ) + current_omni_platform.synchronize() + _t_text_enc_ms = (time.perf_counter() - _t_text_enc_start) * 1000 # Timesteps self.scheduler.set_timesteps(num_steps, device=device) @@ -452,7 +458,7 @@ def forward( if boundary_ratio is not None: boundary_timestep = boundary_ratio * self.scheduler.config.num_train_timesteps - # Handle I2V mode when expand_timesteps=True and image is provided + _t_latent_prep_start = time.perf_counter() multi_modal_data = req.prompts[0].get("multi_modal_data", {}) if not isinstance(req.prompts[0], str) else None raw_image = multi_modal_data.get("image", None) if multi_modal_data is not None else None if isinstance(raw_image, list): @@ -543,11 +549,13 @@ def forward( generator=generator, latents=req.sampling_params.latents, ) + current_omni_platform.synchronize() + _t_latent_prep_ms = (time.perf_counter() - _t_latent_prep_start) * 1000 if attention_kwargs is None: attention_kwargs = {} - # Denoising + _t_denoise_start = time.perf_counter() with self.progress_bar(total=len(timesteps)) as pbar: for t in timesteps: self._current_timestep = t @@ -638,12 +646,14 @@ def forward( if current_omni_platform.is_available(): current_omni_platform.empty_cache() self._current_timestep = None + current_omni_platform.synchronize() + _t_denoise_ms = (time.perf_counter() - _t_denoise_start) * 1000 # For I2V mode: blend final latents with condition if self.expand_timesteps and latent_condition is not None: latents = (1 - first_frame_mask) * latent_condition + first_frame_mask * latents - # Decode + _t_decode_start = time.perf_counter() if output_type == "latent": output = latents else: @@ -658,6 +668,22 @@ def forward( ) latents = latents / latents_std + latents_mean output = self.vae.decode(latents, return_dict=False)[0] + current_omni_platform.synchronize() + _t_decode_ms = (time.perf_counter() - _t_decode_start) * 1000 + _t_pipeline_wall_ms = (time.perf_counter() - _t_pipeline_start) * 1000 + _t_stages_sum = _t_text_enc_ms + _t_latent_prep_ms + _t_denoise_ms + _t_decode_ms + + if _is_rank_zero(): + logger.info( + "Pipeline stage timing summary: " + "TextEncoding=%.2f ms, LatentPreparation=%.2f ms, " + "Denoising=%.2f ms (%d steps), Decoding=%.2f ms, " + "StagesSum=%.2f ms, PipelineWall=%.2f ms, Unaccounted=%.2f ms", + _t_text_enc_ms, _t_latent_prep_ms, + _t_denoise_ms, len(timesteps), _t_decode_ms, + _t_stages_sum, _t_pipeline_wall_ms, + _t_pipeline_wall_ms - _t_stages_sum, + ) return DiffusionOutput(output=output) diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py index b673cd0fae4..98ebff63e15 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py @@ -5,6 +5,7 @@ import logging import os +import time from collections.abc import Iterable from typing import Any, cast @@ -22,7 +23,7 @@ from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader from vllm_omni.diffusion.models.interface import SupportImageInput -from vllm_omni.diffusion.models.progress_bar import ProgressBarMixin +from vllm_omni.diffusion.models.progress_bar import ProgressBarMixin, _is_rank_zero from vllm_omni.diffusion.models.schedulers import FlowUniPCMultistepScheduler from vllm_omni.diffusion.models.wan2_2.pipeline_wan2_2 import ( create_transformer_from_config, @@ -336,6 +337,9 @@ def forward( num_frames = req.sampling_params.num_frames or frame_num num_steps = req.sampling_params.num_inference_steps or num_inference_steps + # print("D--: num_frames", num_frames) + # print("D--: height x width =", height, "x", width) + # Respect per-request guidance_scale when explicitly provided. if req.sampling_params.guidance_scale_provided: guidance_scale = req.sampling_params.guidance_scale @@ -388,7 +392,10 @@ def forward( if generator is None and req.sampling_params.seed is not None: generator = torch.Generator(device=device).manual_seed(req.sampling_params.seed) - # Encode prompts + # Sync GPU before timing to ensure accurate measurements + current_omni_platform.synchronize() + _t_pipeline_start = time.perf_counter() + _t_text_enc_start = _t_pipeline_start if prompt_embeds is None: prompt_embeds, negative_prompt_embeds = self.encode_prompt( prompt=prompt, @@ -403,10 +410,12 @@ def forward( prompt_embeds = prompt_embeds.to(device=device, dtype=dtype) if negative_prompt_embeds is not None: negative_prompt_embeds = negative_prompt_embeds.to(device=device, dtype=dtype) + current_omni_platform.synchronize() + _t_text_enc_ms = (time.perf_counter() - _t_text_enc_start) * 1000 batch_size = prompt_embeds.shape[0] - # Encode image embeddings (for Wan2.1-style with CLIP) + _t_img_enc_start = time.perf_counter() if self.has_image_encoder and self.transformer.config.image_dim is not None: if image_embeds is None: if last_image is None: @@ -417,6 +426,8 @@ def forward( image_embeds = image_embeds.to(dtype) else: image_embeds = None + current_omni_platform.synchronize() + _t_img_enc_ms = (time.perf_counter() - _t_img_enc_start) * 1000 # Timesteps self.scheduler.set_timesteps(num_steps, device=device) @@ -430,7 +441,7 @@ def forward( # Prepare latents (use out_channels=16 for VAE latent, not in_channels=36) num_channels_latents = self.transformer.config.out_channels - # Preprocess image for VAE + _t_latent_prep_start = time.perf_counter() from diffusers.video_processor import VideoProcessor video_processor = VideoProcessor(vae_scale_factor=self.vae_scale_factor_spatial) @@ -464,11 +475,13 @@ def forward( latents=req.sampling_params.latents, last_image=last_image_tensor, ) + current_omni_platform.synchronize() + _t_latent_prep_ms = (time.perf_counter() - _t_latent_prep_start) * 1000 if attention_kwargs is None: attention_kwargs = {} - # Denoising loop + _t_denoise_start = time.perf_counter() with self.progress_bar(total=len(timesteps)) as pbar: for t in timesteps: self._current_timestep = t @@ -527,6 +540,8 @@ def forward( cfg_normalize=False, ) + # print("D--: latent shape", latents.shape) + # Compute the previous noisy sample x_t -> x_t-1 with automatic CFG sync latents = self.scheduler_step_maybe_with_cfg(noise_pred, t, latents, do_true_cfg) @@ -537,12 +552,14 @@ def forward( if current_omni_platform.is_available(): current_omni_platform.empty_cache() self._current_timestep = None + current_omni_platform.synchronize() + _t_denoise_ms = (time.perf_counter() - _t_denoise_start) * 1000 # For expand_timesteps mode, blend final latents with condition if self.expand_timesteps: latents = (1 - first_frame_mask) * condition + first_frame_mask * latents - # Decode + _t_decode_start = time.perf_counter() if output_type == "latent": output = latents else: @@ -557,6 +574,23 @@ def forward( ) latents = latents / latents_std + latents_mean output = self.vae.decode(latents, return_dict=False)[0] + current_omni_platform.synchronize() + _t_decode_ms = (time.perf_counter() - _t_decode_start) * 1000 + _t_pipeline_wall_ms = (time.perf_counter() - _t_pipeline_start) * 1000 + _t_stages_sum = _t_text_enc_ms + _t_img_enc_ms + _t_latent_prep_ms + _t_denoise_ms + _t_decode_ms + + if _is_rank_zero(): + logger.info( + "Pipeline stage timing summary: " + "TextEncoding=%.2f ms, ImageEncoding=%.2f ms, " + "LatentPreparation=%.2f ms, Denoising=%.2f ms (%d steps), " + "Decoding=%.2f ms, StagesSum=%.2f ms, PipelineWall=%.2f ms, " + "Unaccounted=%.2f ms", + _t_text_enc_ms, _t_img_enc_ms, + _t_latent_prep_ms, _t_denoise_ms, len(timesteps), + _t_decode_ms, _t_stages_sum, _t_pipeline_wall_ms, + _t_pipeline_wall_ms - _t_stages_sum, + ) return DiffusionOutput(output=output) diff --git a/vllm_omni/diffusion/scheduler.py b/vllm_omni/diffusion/scheduler.py index c30d907aa3b..97693782e39 100644 --- a/vllm_omni/diffusion/scheduler.py +++ b/vllm_omni/diffusion/scheduler.py @@ -2,6 +2,7 @@ # SPDX-FileCopyrightText: Copyright contributors to the vLLM project import threading +import time as _time import zmq from vllm.distributed.device_communicators.shm_broadcast import MessageQueue @@ -58,13 +59,24 @@ def add_req(self, request: OmniDiffusionRequest) -> DiffusionOutput: } # Broadcast RPC request to all workers + _t_broadcast = _time.perf_counter() self.mq.enqueue(rpc_request) - # Wait for result from Rank 0 (or whoever sends it) + _t_broadcast_ms = (_time.perf_counter() - _t_broadcast) * 1000 + logger.info("Hop1 scheduler→workers: mq.enqueue (broadcast request) took %.2f ms", _t_broadcast_ms) + # Wait for result from Rank 0 (or whoever sends it) if self.result_mq is None: raise RuntimeError("Result queue not initialized") + _t_dequeue = _time.perf_counter() output = self.result_mq.dequeue() + _t_dequeue_ms = (_time.perf_counter() - _t_dequeue) * 1000 + logger.info( + "Hop1 scheduler←worker: result_mq.dequeue took %.2f ms " + "(includes waiting for generation + enqueue serialization)", + _t_dequeue_ms, + ) + # {"status": "error", "error": str(e)} if isinstance(output, dict) and output.get("status") == "error": raise RuntimeError("worker error") diff --git a/vllm_omni/diffusion/worker/diffusion_worker.py b/vllm_omni/diffusion/worker/diffusion_worker.py index d4389f4d2bf..8118890bd7f 100644 --- a/vllm_omni/diffusion/worker/diffusion_worker.py +++ b/vllm_omni/diffusion/worker/diffusion_worker.py @@ -373,7 +373,14 @@ def _create_worker( def return_result(self, output: DiffusionOutput): """Reply to client, only on rank 0.""" if self.result_mq is not None: + import time as _time + _t0 = _time.perf_counter() self.result_mq.enqueue(output) + _t_ms = (_time.perf_counter() - _t0) * 1000 + logger.info( + "Hop1 worker→scheduler: result_mq.enqueue took %.2f ms (rank %s)", + _t_ms, self.gpu_id, + ) def recv_message(self): """Receive messages from broadcast queue.""" diff --git a/vllm_omni/entrypoints/async_omni.py b/vllm_omni/entrypoints/async_omni.py index 6939987b04e..a86791fc8de 100644 --- a/vllm_omni/entrypoints/async_omni.py +++ b/vllm_omni/entrypoints/async_omni.py @@ -522,7 +522,26 @@ def _process_single_result( ) raise RuntimeError(result) + _t_load = time.perf_counter() engine_outputs = _load(result, obj_key="engine_outputs", shm_key="engine_outputs_shm") + _t_load_ms = (time.perf_counter() - _t_load) * 1000 + + _hop3 = result.get("_hop3_timing") + if _hop3: + logger.info( + "Hop3 stageWorker→orchestrator: " + "generate=%.2f ms, serialize(shm=%s)=%.2f ms, " + "orchestrator_deserialize=%.2f ms", + _hop3.get("generate_ms", 0), + _hop3.get("use_shm", "?"), + _hop3.get("serialize_ms", 0), + _t_load_ms, + ) + elif _t_load_ms > 10: + logger.info( + "Hop3 orchestrator←stageWorker: deserialize took %.2f ms (req %s, stage %s)", + _t_load_ms, req_id, stage_id, + ) if isinstance(engine_outputs, list): engine_outputs = engine_outputs[0] diff --git a/vllm_omni/entrypoints/omni_diffusion.py b/vllm_omni/entrypoints/omni_diffusion.py index 9937726e51f..59ddf2eeb17 100644 --- a/vllm_omni/entrypoints/omni_diffusion.py +++ b/vllm_omni/entrypoints/omni_diffusion.py @@ -1,9 +1,11 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import time import uuid from collections.abc import Sequence +from vllm.logger import init_logger from vllm.transformers_utils.config import get_hf_file_to_dict from vllm_omni.diffusion.data import OmniDiffusionConfig, TransformerConfig @@ -12,6 +14,8 @@ from vllm_omni.inputs.data import OmniDiffusionSamplingParams, OmniPromptType from vllm_omni.outputs import OmniRequestOutput +logger = init_logger(__name__) + class OmniDiffusion: """ @@ -108,6 +112,7 @@ def generate( sampling_params: OmniDiffusionSamplingParams, request_ids: list[str] = [], ) -> list[OmniRequestOutput]: + _t0 = time.perf_counter() if isinstance(prompts, (str, dict)): prompts = [prompts] else: @@ -118,7 +123,10 @@ def generate( request_ids.extend(f"{i + len(request_ids)}_{uuid.uuid4()}" for i in range(len(prompts) - len(request_ids))) request = OmniDiffusionRequest(prompts, sampling_params, request_ids) - return self._run_engine(request) + result = self._run_engine(request) + _t_ms = (time.perf_counter() - _t0) * 1000 + logger.info("OmniDiffusion.generate total: %.2f ms", _t_ms) + return result def _run_engine(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: return self.engine.step(request) diff --git a/vllm_omni/entrypoints/omni_stage.py b/vllm_omni/entrypoints/omni_stage.py index 098cfa15d88..de99bff714d 100644 --- a/vllm_omni/entrypoints/omni_stage.py +++ b/vllm_omni/entrypoints/omni_stage.py @@ -982,6 +982,7 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: try: _batch_seq += 1 gen_outputs: list[OmniRequestOutput | RequestOutput] = [] + _pre_gen_ms = (_time.time() - _recv_dequeue_ts) * 1000.0 _gen_t0 = _time.time() if stage_type == "diffusion": stage_engine = cast(OmniDiffusion, stage_engine) @@ -1007,7 +1008,6 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: gen_outputs.extend(results) _gen_t1 = _time.time() _gen_ms = (_gen_t1 - _gen_t0) * 1000.0 - logger.debug(f"Generate done: batch={len(batch_tasks)}, req_ids={batch_request_ids}, gen_ms={_gen_ms:.1f}") # Group outputs per request id with fallback req_to_outputs: dict[Any, list[Any]] = {rid: [] for rid in batch_request_ids} @@ -1029,6 +1029,7 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: _agg_total_gen_time_ms += _gen_ms # Emit per-request results + _post_gen_t0 = _time.time() for i, rid in enumerate(batch_request_ids): r_outputs = req_to_outputs.get(rid, []) _metrics = make_request_stats( @@ -1045,8 +1046,21 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: _metrics.stage_stats = make_stage_stats(_agg_total_tokens, _agg_total_gen_time_ms) else: _metrics.stage_stats = None + _t_serialize = _time.time() try: use_shm, payload = maybe_dump_to_shm(r_outputs, shm_threshold_bytes) + except Exception: + use_shm, payload = False, r_outputs + _t_serialize_ms = (_time.time() - _t_serialize) * 1000.0 + + _hop3_timing = { + "generate_ms": _gen_ms, + "serialize_ms": _t_serialize_ms, + "use_shm": use_shm, + } + + _t_enqueue = _time.time() + try: if use_shm: out_q.put( { @@ -1054,6 +1068,7 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: "stage_id": stage_id, "engine_outputs_shm": payload, "metrics": _metrics, + "_hop3_timing": _hop3_timing, } ) else: @@ -1063,6 +1078,7 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: "stage_id": stage_id, "engine_outputs": payload, "metrics": _metrics, + "_hop3_timing": _hop3_timing, } ) except Exception: @@ -1074,10 +1090,6 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: "metrics": _metrics, } ) - logger.debug( - "Enqueued result for request %s to downstream", - rid, - ) except Exception as e: logger.exception("Failed on batch %s: %s", batch_request_ids, e) _tb = traceback.format_exc() diff --git a/vllm_omni/entrypoints/openai/serving_video.py b/vllm_omni/entrypoints/openai/serving_video.py index f73df5ea498..b663eb52f03 100644 --- a/vllm_omni/entrypoints/openai/serving_video.py +++ b/vllm_omni/entrypoints/openai/serving_video.py @@ -141,6 +141,7 @@ async def generate_videos( ) result = await self._run_generation(prompt, gen_params, request_id, raw_request) + _t_encode_start = time.perf_counter() videos = self._extract_video_outputs(result) audios = self._extract_audio_outputs(result, expected_count=len(videos)) output_fps = fps or 24 @@ -161,6 +162,8 @@ async def generate_videos( ) for idx, video in enumerate(videos) ] + _t_encode_ms = (time.perf_counter() - _t_encode_start) * 1000 + logger.info("Video response encoding (MP4+base64): %.2f ms", _t_encode_ms) return VideoGenerationResponse(created=int(time.time()), data=video_data) def _resolve_model_name(self, raw_request: Request | None) -> str | None: From b3b70a8be4cd02a27706fc4da4845924a439c38d Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Fri, 6 Mar 2026 10:55:23 +0000 Subject: [PATCH 02/13] reduce hop3 overhead Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/diffusion_engine.py | 5 +- .../models/wan2_2/pipeline_wan2_2.py | 10 +- .../models/wan2_2/pipeline_wan2_2_i2v.py | 11 +- .../diffusion/worker/diffusion_worker.py | 4 +- vllm_omni/entrypoints/async_omni.py | 105 +++++++++++++++++- vllm_omni/entrypoints/omni.py | 80 +++++++++++++ 6 files changed, 204 insertions(+), 11 deletions(-) diff --git a/vllm_omni/diffusion/diffusion_engine.py b/vllm_omni/diffusion/diffusion_engine.py index 78c80ad9796..327f79e91e4 100644 --- a/vllm_omni/diffusion/diffusion_engine.py +++ b/vllm_omni/diffusion/diffusion_engine.py @@ -110,7 +110,10 @@ def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: logger.info( "DiffusionEngine.step breakdown: preprocess=%.2f ms, " "add_req_and_wait=%.2f ms, postprocess=%.2f ms, total=%.2f ms", - preprocess_time * 1000, _generate_ms, postprocess_time * 1000, _step_total_ms, + preprocess_time * 1000, + _generate_ms, + postprocess_time * 1000, + _step_total_ms, ) # Convert to OmniRequestOutput format diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py index 9dd01136070..89af9cae09c 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py @@ -679,9 +679,13 @@ def forward( "TextEncoding=%.2f ms, LatentPreparation=%.2f ms, " "Denoising=%.2f ms (%d steps), Decoding=%.2f ms, " "StagesSum=%.2f ms, PipelineWall=%.2f ms, Unaccounted=%.2f ms", - _t_text_enc_ms, _t_latent_prep_ms, - _t_denoise_ms, len(timesteps), _t_decode_ms, - _t_stages_sum, _t_pipeline_wall_ms, + _t_text_enc_ms, + _t_latent_prep_ms, + _t_denoise_ms, + len(timesteps), + _t_decode_ms, + _t_stages_sum, + _t_pipeline_wall_ms, _t_pipeline_wall_ms - _t_stages_sum, ) diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py index 98ebff63e15..8f9a7010c03 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py @@ -586,9 +586,14 @@ def forward( "LatentPreparation=%.2f ms, Denoising=%.2f ms (%d steps), " "Decoding=%.2f ms, StagesSum=%.2f ms, PipelineWall=%.2f ms, " "Unaccounted=%.2f ms", - _t_text_enc_ms, _t_img_enc_ms, - _t_latent_prep_ms, _t_denoise_ms, len(timesteps), - _t_decode_ms, _t_stages_sum, _t_pipeline_wall_ms, + _t_text_enc_ms, + _t_img_enc_ms, + _t_latent_prep_ms, + _t_denoise_ms, + len(timesteps), + _t_decode_ms, + _t_stages_sum, + _t_pipeline_wall_ms, _t_pipeline_wall_ms - _t_stages_sum, ) diff --git a/vllm_omni/diffusion/worker/diffusion_worker.py b/vllm_omni/diffusion/worker/diffusion_worker.py index 8118890bd7f..f2dfccacf63 100644 --- a/vllm_omni/diffusion/worker/diffusion_worker.py +++ b/vllm_omni/diffusion/worker/diffusion_worker.py @@ -374,12 +374,14 @@ def return_result(self, output: DiffusionOutput): """Reply to client, only on rank 0.""" if self.result_mq is not None: import time as _time + _t0 = _time.perf_counter() self.result_mq.enqueue(output) _t_ms = (_time.perf_counter() - _t0) * 1000 logger.info( "Hop1 worker→scheduler: result_mq.enqueue took %.2f ms (rank %s)", - _t_ms, self.gpu_id, + _t_ms, + self.gpu_id, ) def recv_message(self): diff --git a/vllm_omni/entrypoints/async_omni.py b/vllm_omni/entrypoints/async_omni.py index a86791fc8de..3b0e1f61fbb 100644 --- a/vllm_omni/entrypoints/async_omni.py +++ b/vllm_omni/entrypoints/async_omni.py @@ -38,8 +38,13 @@ logger = init_logger(__name__) -def _weak_close_cleanup_async(stage_list, stage_in_queues, stage_out_queues, ray_pg, output_handler, zmq_ctx=None): +def _weak_close_cleanup_async(stage_list, stage_in_queues, stage_out_queues, ray_pg, output_handler, zmq_ctx=None, inline_engine=None): """Weak reference cleanup function for AsyncOmni instances.""" + if inline_engine is not None: + try: + inline_engine.close() + except Exception as e: + logger.warning("Failed to close inline diffusion engine: %s", e) if stage_list: for q in stage_in_queues: try: @@ -121,6 +126,7 @@ def __init__(self, model: str, **kwargs: dict[str, Any]) -> None: self._ray_pg, self.output_handler, self._zmq_ctx, + getattr(self, "_inline_engine", None), ) def _create_default_diffusion_stage_cfg(self, kwargs: dict[str, Any]) -> dict[str, Any]: @@ -297,6 +303,13 @@ async def generate( async with self._pause_cond: await self._pause_cond.wait_for(lambda: not self._paused) + if self._inline_diffusion: + async for output in self._generate_inline( + prompt, request_id, sampling_params_list, output_modalities + ): + yield output + return + logger.debug(f"[{self._name}] generate() called") try: # Start output handler on the first call to generate() @@ -388,6 +401,89 @@ async def generate( logger.info("[AsyncOrchestrator] Request %s aborted.", request_id) raise + async def _generate_inline( + self, + prompt: OmniPromptType, + request_id: str, + sampling_params_list: Sequence[OmniSamplingParams] | None = None, + output_modalities: list[str] | None = None, + ) -> AsyncGenerator[OmniRequestOutput, None]: + """Generate using inline diffusion engine (no stage worker subprocess). + + Eliminates Hop3 IPC overhead by running OmniDiffusion directly in the + orchestrator process. The blocking generate() call is offloaded to a + thread executor so the asyncio event loop remains responsive. + """ + _wall_start_ts = time.time() + + if sampling_params_list is None: + sampling_params_list = self.default_sampling_params_list + sp0 = sampling_params_list[0] + + stage = self.stage_list[0] + final_stage_id_for_e2e = 0 + + metrics = OrchestratorAggregator( + num_stages=1, + log_stats=self.log_stats, + wall_start_ts=_wall_start_ts, + final_stage_id_for_e2e=final_stage_id_for_e2e, + ) + metrics.stage_first_ts[0] = time.time() + + logger.info( + "[%s] Inline diffusion generate for request %s", + self._name, request_id, + ) + + try: + loop = asyncio.get_running_loop() + results = await loop.run_in_executor( + None, + self._inline_engine.generate, + prompt, + sp0, + [request_id], + ) + + for result in results: + images = getattr(result, "images", None) or [] + finished = getattr(result, "finished", True) + + output_to_yield = OmniRequestOutput( + stage_id=0, + final_output_type=stage.final_output_type, + request_output=result, + images=images, + finished=finished, + ) + + metrics.stage_last_ts[0] = time.time() + yield output_to_yield + + try: + metrics.on_finalize_request( + final_stage_id_for_e2e, request_id, _wall_start_ts, + ) + metrics.build_and_log_summary() + except Exception as e: + logger.exception( + "[%s] Failed to finalize inline metrics: %s", + self._name, e, + ) + + except (asyncio.CancelledError, GeneratorExit): + logger.info( + "[%s] Inline request %s cancelled.", self._name, request_id, + ) + raise + except Exception as e: + logger.exception( + "[%s] Inline diffusion failed for request %s: %s", + self._name, request_id, e, + ) + raise + async def _process_async_results( self, request_id: str, @@ -540,7 +636,9 @@ def _process_single_result( elif _t_load_ms > 10: logger.info( "Hop3 orchestrator←stageWorker: deserialize took %.2f ms (req %s, stage %s)", - _t_load_ms, req_id, stage_id, + _t_load_ms, + req_id, + stage_id, ) if isinstance(engine_outputs, list): engine_outputs = engine_outputs[0] @@ -649,7 +747,8 @@ async def output_handler(): @property def is_running(self) -> bool: - # Is None before the loop is started. + if self._inline_diffusion: + return self._inline_engine is not None return len(self._stage_in_queues) > 0 @property diff --git a/vllm_omni/entrypoints/omni.py b/vllm_omni/entrypoints/omni.py index 9ecc5f76236..543c48ac844 100644 --- a/vllm_omni/entrypoints/omni.py +++ b/vllm_omni/entrypoints/omni.py @@ -40,6 +40,7 @@ from vllm_omni.entrypoints.stage_utils import SHUTDOWN_TASK, OmniStageTaskType from vllm_omni.entrypoints.stage_utils import maybe_load_from_ipc as _load from vllm_omni.entrypoints.utils import ( + filter_dataclass_kwargs, get_final_stage_id_for_e2e, inject_omni_kv_config, load_and_resolve_stage_configs, @@ -304,6 +305,9 @@ def _resolve_stage_configs(self, model: str, kwargs: dict[str, Any]) -> tuple[st def _initialize_stages(self, model: str, kwargs: dict[str, Any]) -> None: """Initialize stage list management.""" + self._inline_diffusion = False + self._inline_engine = None + stage_init_timeout = kwargs.get("stage_init_timeout", 20) shm_threshold_bytes = kwargs.get("shm_threshold_bytes", 65536) init_timeout = kwargs.get("init_timeout", 300) @@ -351,6 +355,17 @@ def _build_stage(idx_cfg: tuple[int, Any]) -> tuple[int, OmniStage]: self.output_modalities = [st.final_output_type for st in self.stage_list] logger.info(f"[{self._name}] Loaded {len(self.stage_list)} stages") + # Phase 1 optimization: for a single diffusion stage in async mode, + # run the engine directly in the orchestrator process to eliminate + # the stage worker subprocess and its IPC serialization overhead. + if ( + len(self.stage_list) == 1 + and self.stage_list[0].stage_type == "diffusion" + and self.is_async + ): + self._init_inline_diffusion_engine(model, self.stage_configs[0], kwargs) + return + if self.worker_backend == "ray": self._queue_cls = get_ray_queue_class() else: @@ -363,6 +378,71 @@ def _build_stage(idx_cfg: tuple[int, Any]) -> tuple[int, OmniStage]: # Wait for all stages to report readiness before seeding self._wait_for_stages_ready(timeout=init_timeout) + def _init_inline_diffusion_engine( + self, + model: str, + stage_config: Any, + kwargs: dict[str, Any], + ) -> None: + """Initialize diffusion engine directly in the orchestrator process. + + For single-stage diffusion pipelines, this eliminates the stage worker + subprocess and the associated Hop3 IPC serialization overhead. + GPU workers for tensor parallelism are still spawned by the + DiffusionExecutor as separate processes. + """ + from vllm_omni.diffusion.data import OmniDiffusionConfig + from vllm_omni.entrypoints.omni_diffusion import OmniDiffusion + from vllm_omni.entrypoints.stage_utils import ( + _to_dict, + load_func_from_config, + set_stage_devices, + ) + + stage_id = stage_config.stage_id + engine_args = _to_dict(stage_config.engine_args) + runtime_cfg = _to_dict(getattr(stage_config, "runtime", {})) + + if os.environ.get("VLLM_WORKER_MULTIPROC_METHOD") != "spawn": + os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" + + try: + from vllm_omni.platforms import current_omni_platform + + device_type = current_omni_platform.device_type + set_stage_devices(stage_id, runtime_cfg.get("devices"), device_type=device_type) + except Exception as e: + logger.warning("Device setup for inline diffusion failed: %s", e) + + engine_args = filter_dataclass_kwargs(OmniDiffusionConfig, engine_args) + engine_args.pop("model_stage", None) + engine_args.pop("model", None) + + cfg_kv_collect_func = load_func_from_config( + getattr(stage_config, "cfg_kv_collect_func", None) + ) + + self._inline_engine = OmniDiffusion( + model=model, + stage_id=stage_id, + engine_input_source=getattr(stage_config, "engine_input_source", []), + cfg_kv_collect_func=cfg_kv_collect_func, + **engine_args, + ) + self._inline_diffusion = True + + # These attributes are normally set by AsyncOmni._wait_for_stages_ready + # but we skip that for inline mode. Set them to None since there is no + # LLM stage to provide them. + self.input_processor = None + self.io_processor = None + self.model_config = None + + logger.info( + "[%s] Inline diffusion mode active – stage worker subprocess bypassed", + self._name, + ) + def _is_async_chunk_enable(self, stage_args: list) -> bool: """get async chunk flag""" engine_args = getattr(stage_args[0], "engine_args", None) From 104d71c4cf42a1c3ada2f7ab218a2f7463fa1413 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Fri, 6 Mar 2026 11:13:23 +0000 Subject: [PATCH 03/13] perf: reduce IPC overhead for single-stage diffusion serving MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two optimizations that eliminate ~6.5s of IPC serialization overhead for single-stage diffusion pipelines (e.g. Wan2.2 I2V/T2V) in online serving mode: Phase 1 – Inline diffusion (eliminate Hop3): When there is exactly one diffusion stage in async mode, initialize OmniDiffusion directly in the orchestrator process instead of spawning a stage worker subprocess. This removes the entire Hop3 serialization path (pickle + mp.Queue/SHM) between the stage worker and orchestrator. GPU workers for tensor parallelism are still spawned by DiffusionExecutor. Phase 2 – SHM tensor transfer (optimize Hop1): Replace pickle-based serialization of large tensors through MessageQueue with POSIX shared memory. The worker copies tensor data into a named SHM segment and enqueues only lightweight metadata; the scheduler reconstructs the tensor from SHM. This reduces Hop1 overhead from ~3.4s to ~1.5s. Measured on Wan2.2-I2V-A14B (TP=2, 1280x720, 5s@16fps, 1 step): Before: e2e = 37.5s Phase 1: e2e = 33.1s (−4.4s) Phase 2: e2e = 31.0s (−2.1s) Total: e2e = 31.0s (−6.5s, −17.5%) Made-with: Cursor Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/data.py | 72 +++++++++++++++++++ vllm_omni/diffusion/scheduler.py | 14 +++- .../diffusion/worker/diffusion_worker.py | 17 ++++- vllm_omni/entrypoints/async_omni.py | 26 ++++--- vllm_omni/entrypoints/omni.py | 10 +-- 5 files changed, 116 insertions(+), 23 deletions(-) diff --git a/vllm_omni/diffusion/data.py b/vllm_omni/diffusion/data.py index da68adbdb17..30f761cfb62 100644 --- a/vllm_omni/diffusion/data.py +++ b/vllm_omni/diffusion/data.py @@ -622,6 +622,78 @@ class DiffusionOutput: # timings: Optional["RequestTimings"] = None +_SHM_TENSOR_THRESHOLD = 1_000_000 # 1 MB + + +def _tensor_to_shm(tensor: torch.Tensor) -> dict[str, Any]: + """Copy a tensor into POSIX shared memory and return a metadata handle. + + The shared memory segment remains alive after this call (the local fd is + closed, but the segment persists until ``_tensor_from_shm`` unlinks it). + """ + from multiprocessing import shared_memory + + import numpy as np + + tensor = tensor.detach().cpu().contiguous() + arr = tensor.numpy() + nbytes = arr.nbytes + shm = shared_memory.SharedMemory(create=True, size=nbytes) + shm_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf[:nbytes]) + np.copyto(shm_arr, arr) + handle = { + "__tensor_shm__": True, + "name": shm.name, + "shape": list(tensor.shape), + "torch_dtype": str(tensor.dtype), + "numpy_dtype": str(arr.dtype), + "nbytes": nbytes, + } + shm.close() + return handle + + +def _tensor_from_shm(handle: dict[str, Any]) -> torch.Tensor: + """Reconstruct a tensor from a shared-memory handle and free the segment.""" + from multiprocessing import shared_memory + + import numpy as np + + shm = shared_memory.SharedMemory(name=handle["name"]) + try: + np_dtype = np.dtype(handle["numpy_dtype"]) + arr = np.ndarray(handle["shape"], dtype=np_dtype, buffer=shm.buf[: handle["nbytes"]]) + tensor = torch.from_numpy(arr.copy()) + finally: + shm.close() + shm.unlink() + return tensor + + +def pack_diffusion_output_shm(output: "DiffusionOutput") -> "DiffusionOutput": + """Replace large tensors in *output* with shared-memory handles. + + The DiffusionOutput is modified **in-place** so that the (now lightweight) + object can be serialised cheaply through a MessageQueue. + """ + if output.output is not None and isinstance(output.output, torch.Tensor): + if output.output.nelement() * output.output.element_size() > _SHM_TENSOR_THRESHOLD: + output.output = _tensor_to_shm(output.output) + if output.trajectory_latents is not None and isinstance(output.trajectory_latents, torch.Tensor): + if output.trajectory_latents.nelement() * output.trajectory_latents.element_size() > _SHM_TENSOR_THRESHOLD: + output.trajectory_latents = _tensor_to_shm(output.trajectory_latents) + return output + + +def unpack_diffusion_output_shm(output: "DiffusionOutput") -> "DiffusionOutput": + """Reconstruct tensors from shared-memory handles produced by ``pack_diffusion_output_shm``.""" + if isinstance(output.output, dict) and output.output.get("__tensor_shm__"): + output.output = _tensor_from_shm(output.output) + if isinstance(output.trajectory_latents, dict) and output.trajectory_latents.get("__tensor_shm__"): + output.trajectory_latents = _tensor_from_shm(output.trajectory_latents) + return output + + class AttentionBackendEnum(enum.Enum): FA = enum.auto() SLIDING_TILE_ATTN = enum.auto() diff --git a/vllm_omni/diffusion/scheduler.py b/vllm_omni/diffusion/scheduler.py index 97693782e39..c4891d06e00 100644 --- a/vllm_omni/diffusion/scheduler.py +++ b/vllm_omni/diffusion/scheduler.py @@ -8,7 +8,7 @@ from vllm.distributed.device_communicators.shm_broadcast import MessageQueue from vllm.logger import init_logger -from vllm_omni.diffusion.data import DiffusionOutput, OmniDiffusionConfig +from vllm_omni.diffusion.data import DiffusionOutput, OmniDiffusionConfig, unpack_diffusion_output_shm from vllm_omni.diffusion.request import OmniDiffusionRequest logger = init_logger(__name__) @@ -71,10 +71,18 @@ def add_req(self, request: OmniDiffusionRequest) -> DiffusionOutput: _t_dequeue = _time.perf_counter() output = self.result_mq.dequeue() _t_dequeue_ms = (_time.perf_counter() - _t_dequeue) * 1000 + + _t_unpack = _time.perf_counter() + try: + unpack_diffusion_output_shm(output) + except Exception as e: + logger.warning("SHM unpack failed (data may already be inline): %s", e) + _t_unpack_ms = (_time.perf_counter() - _t_unpack) * 1000 + logger.info( - "Hop1 scheduler←worker: result_mq.dequeue took %.2f ms " - "(includes waiting for generation + enqueue serialization)", + "Hop1 scheduler←worker: mq.dequeue=%.2f ms, shm_unpack=%.2f ms (dequeue includes generation wait)", _t_dequeue_ms, + _t_unpack_ms, ) # {"status": "error", "error": str(e)} diff --git a/vllm_omni/diffusion/worker/diffusion_worker.py b/vllm_omni/diffusion/worker/diffusion_worker.py index f2dfccacf63..1ae974cf5bd 100644 --- a/vllm_omni/diffusion/worker/diffusion_worker.py +++ b/vllm_omni/diffusion/worker/diffusion_worker.py @@ -27,6 +27,7 @@ from vllm_omni.diffusion.data import ( DiffusionOutput, OmniDiffusionConfig, + pack_diffusion_output_shm, ) from vllm_omni.diffusion.distributed.parallel_state import ( destroy_distributed_env, @@ -376,11 +377,21 @@ def return_result(self, output: DiffusionOutput): import time as _time _t0 = _time.perf_counter() + try: + pack_diffusion_output_shm(output) + _t_pack = (_time.perf_counter() - _t0) * 1000 + except Exception as e: + logger.warning("SHM pack failed, falling back to raw enqueue: %s", e) + _t_pack = 0.0 + _t1 = _time.perf_counter() self.result_mq.enqueue(output) - _t_ms = (_time.perf_counter() - _t0) * 1000 + _t_enqueue = (_time.perf_counter() - _t1) * 1000 + _t_total = (_time.perf_counter() - _t0) * 1000 logger.info( - "Hop1 worker→scheduler: result_mq.enqueue took %.2f ms (rank %s)", - _t_ms, + "Hop1 worker→scheduler: shm_pack=%.2f ms, mq.enqueue=%.2f ms, total=%.2f ms (rank %s)", + _t_pack, + _t_enqueue, + _t_total, self.gpu_id, ) diff --git a/vllm_omni/entrypoints/async_omni.py b/vllm_omni/entrypoints/async_omni.py index 3b0e1f61fbb..b3175d48946 100644 --- a/vllm_omni/entrypoints/async_omni.py +++ b/vllm_omni/entrypoints/async_omni.py @@ -38,7 +38,9 @@ logger = init_logger(__name__) -def _weak_close_cleanup_async(stage_list, stage_in_queues, stage_out_queues, ray_pg, output_handler, zmq_ctx=None, inline_engine=None): +def _weak_close_cleanup_async( + stage_list, stage_in_queues, stage_out_queues, ray_pg, output_handler, zmq_ctx=None, inline_engine=None +): """Weak reference cleanup function for AsyncOmni instances.""" if inline_engine is not None: try: @@ -304,9 +306,7 @@ async def generate( await self._pause_cond.wait_for(lambda: not self._paused) if self._inline_diffusion: - async for output in self._generate_inline( - prompt, request_id, sampling_params_list, output_modalities - ): + async for output in self._generate_inline(prompt, request_id, sampling_params_list, output_modalities): yield output return @@ -433,7 +433,8 @@ async def _generate_inline( logger.info( "[%s] Inline diffusion generate for request %s", - self._name, request_id, + self._name, + request_id, ) try: @@ -463,24 +464,31 @@ async def _generate_inline( try: metrics.on_finalize_request( - final_stage_id_for_e2e, request_id, _wall_start_ts, + final_stage_id_for_e2e, + request_id, + _wall_start_ts, ) metrics.build_and_log_summary() except Exception as e: logger.exception( "[%s] Failed to finalize inline metrics: %s", - self._name, e, + self._name, + e, ) except (asyncio.CancelledError, GeneratorExit): logger.info( - "[%s] Inline request %s cancelled.", self._name, request_id, + "[%s] Inline request %s cancelled.", + self._name, + request_id, ) raise except Exception as e: logger.exception( "[%s] Inline diffusion failed for request %s: %s", - self._name, request_id, e, + self._name, + request_id, + e, ) raise diff --git a/vllm_omni/entrypoints/omni.py b/vllm_omni/entrypoints/omni.py index 543c48ac844..db91cece323 100644 --- a/vllm_omni/entrypoints/omni.py +++ b/vllm_omni/entrypoints/omni.py @@ -358,11 +358,7 @@ def _build_stage(idx_cfg: tuple[int, Any]) -> tuple[int, OmniStage]: # Phase 1 optimization: for a single diffusion stage in async mode, # run the engine directly in the orchestrator process to eliminate # the stage worker subprocess and its IPC serialization overhead. - if ( - len(self.stage_list) == 1 - and self.stage_list[0].stage_type == "diffusion" - and self.is_async - ): + if len(self.stage_list) == 1 and self.stage_list[0].stage_type == "diffusion" and self.is_async: self._init_inline_diffusion_engine(model, self.stage_configs[0], kwargs) return @@ -418,9 +414,7 @@ def _init_inline_diffusion_engine( engine_args.pop("model_stage", None) engine_args.pop("model", None) - cfg_kv_collect_func = load_func_from_config( - getattr(stage_config, "cfg_kv_collect_func", None) - ) + cfg_kv_collect_func = load_func_from_config(getattr(stage_config, "cfg_kv_collect_func", None)) self._inline_engine = OmniDiffusion( model=model, From dd4468cbb298194682885b800fdf2902b19d02fa Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Fri, 6 Mar 2026 16:33:12 +0000 Subject: [PATCH 04/13] fix conflicts Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/diffusion_engine.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/vllm_omni/diffusion/diffusion_engine.py b/vllm_omni/diffusion/diffusion_engine.py index 906d0a37667..b0bd92b56fb 100644 --- a/vllm_omni/diffusion/diffusion_engine.py +++ b/vllm_omni/diffusion/diffusion_engine.py @@ -106,14 +106,14 @@ def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: postprocess_time = time.perf_counter() - postprocess_start_time logger.info(f"Post-processing completed in {postprocess_time:.4f} seconds") - _step_total_ms = (time.perf_counter() - _step_t0) * 1000 + step_total_ms = (time.perf_counter() - diffusion_engine_start_time) * 1000 logger.info( "DiffusionEngine.step breakdown: preprocess=%.2f ms, " "add_req_and_wait=%.2f ms, postprocess=%.2f ms, total=%.2f ms", preprocess_time * 1000, - _generate_ms, + exec_total_time * 1000, postprocess_time * 1000, - _step_total_ms, + step_total_ms, ) # Convert to OmniRequestOutput format From ff62a1eba3be5fb9598014e50d5ee0b3350ce392 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Mon, 9 Mar 2026 03:25:32 +0000 Subject: [PATCH 05/13] rm redundancy Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/diffusion_engine.py | 8 ++++---- vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py | 5 ----- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/vllm_omni/diffusion/diffusion_engine.py b/vllm_omni/diffusion/diffusion_engine.py index b0bd92b56fb..ded29ca6418 100644 --- a/vllm_omni/diffusion/diffusion_engine.py +++ b/vllm_omni/diffusion/diffusion_engine.py @@ -67,7 +67,7 @@ def __init__(self, od_config: OmniDiffusionConfig): raise e def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: - diffusion_engine_start_time = time.time() + diffusion_engine_start_time = time.perf_counter() # Apply pre-processing if available preprocess_time = 0.0 if self.pre_process_func is not None: @@ -76,9 +76,9 @@ def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: preprocess_time = time.perf_counter() - preprocess_start_time logger.info(f"Pre-processing completed in {preprocess_time:.4f} seconds") - exec_start_time = time.time() + exec_start_time = time.perf_counter() output = self.add_req_and_wait_for_response(request) - exec_total_time = time.time() - exec_start_time + exec_total_time = time.perf_counter() - exec_start_time if output.error: raise Exception(f"{output.error}") @@ -123,7 +123,7 @@ def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: metrics = { "preprocess_time_ms": preprocess_time * 1000, - "diffusion_engine_exec_time_ms": (time.time() - diffusion_engine_start_time) * 1000, + "diffusion_engine_exec_time_ms": (time.perf_counter() - diffusion_engine_start_time) * 1000, "diffusion_engine_total_time_ms": exec_total_time * 1000, "image_num": int(request.sampling_params.num_outputs_per_prompt), "resolution": int(request.sampling_params.resolution), diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py index 8f9a7010c03..e7b526adb69 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py @@ -337,9 +337,6 @@ def forward( num_frames = req.sampling_params.num_frames or frame_num num_steps = req.sampling_params.num_inference_steps or num_inference_steps - # print("D--: num_frames", num_frames) - # print("D--: height x width =", height, "x", width) - # Respect per-request guidance_scale when explicitly provided. if req.sampling_params.guidance_scale_provided: guidance_scale = req.sampling_params.guidance_scale @@ -540,8 +537,6 @@ def forward( cfg_normalize=False, ) - # print("D--: latent shape", latents.shape) - # Compute the previous noisy sample x_t -> x_t-1 with automatic CFG sync latents = self.scheduler_step_maybe_with_cfg(noise_pred, t, latents, do_true_cfg) From 5414a428a677d9ce75e5d6a590a5b7a512b8b3a2 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Mon, 9 Mar 2026 04:20:55 +0000 Subject: [PATCH 06/13] rm logs Signed-off-by: samithuang <285365963@qq.com> --- .../models/wan2_2/pipeline_wan2_2.py | 68 ++++++++------ .../models/wan2_2/pipeline_wan2_2_i2v.py | 92 +++++++++++-------- vllm_omni/diffusion/scheduler.py | 14 --- .../diffusion/worker/diffusion_worker.py | 15 --- vllm_omni/entrypoints/async_omni.py | 20 ---- vllm_omni/entrypoints/omni_stage.py | 13 --- 6 files changed, 93 insertions(+), 129 deletions(-) diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py index 89af9cae09c..9bbedc32859 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py @@ -30,6 +30,7 @@ from vllm_omni.platforms import current_omni_platform logger = logging.getLogger(__name__) +DEBUG_PERF = False def retrieve_latents( @@ -425,10 +426,11 @@ def forward( if generator is None and req.sampling_params.seed is not None: generator = torch.Generator(device=device).manual_seed(req.sampling_params.seed) - # Sync GPU before timing to ensure accurate measurements - current_omni_platform.synchronize() - _t_pipeline_start = time.perf_counter() - _t_text_enc_start = _t_pipeline_start + if DEBUG_PERF: + # Sync GPU before timing to ensure accurate measurements + current_omni_platform.synchronize() + _t_pipeline_start = time.perf_counter() + _t_text_enc_start = _t_pipeline_start if prompt_embeds is None: prompt_embeds, negative_prompt_embeds = self.encode_prompt( prompt=prompt, @@ -447,8 +449,9 @@ def forward( raise ValueError( "negative_prompt_embeds must be provided when prompt_embeds are given and guidance > 1." ) - current_omni_platform.synchronize() - _t_text_enc_ms = (time.perf_counter() - _t_text_enc_start) * 1000 + if DEBUG_PERF: + current_omni_platform.synchronize() + _t_text_enc_ms = (time.perf_counter() - _t_text_enc_start) * 1000 # Timesteps self.scheduler.set_timesteps(num_steps, device=device) @@ -458,7 +461,8 @@ def forward( if boundary_ratio is not None: boundary_timestep = boundary_ratio * self.scheduler.config.num_train_timesteps - _t_latent_prep_start = time.perf_counter() + if DEBUG_PERF: + _t_latent_prep_start = time.perf_counter() multi_modal_data = req.prompts[0].get("multi_modal_data", {}) if not isinstance(req.prompts[0], str) else None raw_image = multi_modal_data.get("image", None) if multi_modal_data is not None else None if isinstance(raw_image, list): @@ -549,13 +553,15 @@ def forward( generator=generator, latents=req.sampling_params.latents, ) - current_omni_platform.synchronize() - _t_latent_prep_ms = (time.perf_counter() - _t_latent_prep_start) * 1000 + if DEBUG_PERF: + current_omni_platform.synchronize() + _t_latent_prep_ms = (time.perf_counter() - _t_latent_prep_start) * 1000 if attention_kwargs is None: attention_kwargs = {} - _t_denoise_start = time.perf_counter() + if DEBUG_PERF: + _t_denoise_start = time.perf_counter() with self.progress_bar(total=len(timesteps)) as pbar: for t in timesteps: self._current_timestep = t @@ -668,26 +674,28 @@ def forward( ) latents = latents / latents_std + latents_mean output = self.vae.decode(latents, return_dict=False)[0] - current_omni_platform.synchronize() - _t_decode_ms = (time.perf_counter() - _t_decode_start) * 1000 - _t_pipeline_wall_ms = (time.perf_counter() - _t_pipeline_start) * 1000 - _t_stages_sum = _t_text_enc_ms + _t_latent_prep_ms + _t_denoise_ms + _t_decode_ms - - if _is_rank_zero(): - logger.info( - "Pipeline stage timing summary: " - "TextEncoding=%.2f ms, LatentPreparation=%.2f ms, " - "Denoising=%.2f ms (%d steps), Decoding=%.2f ms, " - "StagesSum=%.2f ms, PipelineWall=%.2f ms, Unaccounted=%.2f ms", - _t_text_enc_ms, - _t_latent_prep_ms, - _t_denoise_ms, - len(timesteps), - _t_decode_ms, - _t_stages_sum, - _t_pipeline_wall_ms, - _t_pipeline_wall_ms - _t_stages_sum, - ) + + if DEBUG_PERF: + current_omni_platform.synchronize() + _t_decode_ms = (time.perf_counter() - _t_decode_start) * 1000 + _t_pipeline_wall_ms = (time.perf_counter() - _t_pipeline_start) * 1000 + _t_stages_sum = _t_text_enc_ms + _t_latent_prep_ms + _t_denoise_ms + _t_decode_ms + + if _is_rank_zero(): + logger.info( + "Pipeline stage timing summary: " + "TextEncoding=%.2f ms, LatentPreparation=%.2f ms, " + "Denoising=%.2f ms (%d steps), Decoding=%.2f ms, " + "StagesSum=%.2f ms, PipelineWall=%.2f ms, Unaccounted=%.2f ms", + _t_text_enc_ms, + _t_latent_prep_ms, + _t_denoise_ms, + len(timesteps), + _t_decode_ms, + _t_stages_sum, + _t_pipeline_wall_ms, + _t_pipeline_wall_ms - _t_stages_sum, + ) return DiffusionOutput(output=output) diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py index e7b526adb69..33292987ff5 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py @@ -36,6 +36,8 @@ logger = logging.getLogger(__name__) +DEBUG_PERF = False + def _load_model_index(model: str, local_files_only: bool) -> dict: """Load model_index.json from local path or HF Hub.""" @@ -389,10 +391,12 @@ def forward( if generator is None and req.sampling_params.seed is not None: generator = torch.Generator(device=device).manual_seed(req.sampling_params.seed) - # Sync GPU before timing to ensure accurate measurements - current_omni_platform.synchronize() - _t_pipeline_start = time.perf_counter() - _t_text_enc_start = _t_pipeline_start + if DEBUG_PERF: + # Sync GPU before timing to ensure accurate measurements + current_omni_platform.synchronize() + _t_pipeline_start = time.perf_counter() + _t_text_enc_start = _t_pipeline_start + if prompt_embeds is None: prompt_embeds, negative_prompt_embeds = self.encode_prompt( prompt=prompt, @@ -407,8 +411,10 @@ def forward( prompt_embeds = prompt_embeds.to(device=device, dtype=dtype) if negative_prompt_embeds is not None: negative_prompt_embeds = negative_prompt_embeds.to(device=device, dtype=dtype) - current_omni_platform.synchronize() - _t_text_enc_ms = (time.perf_counter() - _t_text_enc_start) * 1000 + + if DEBUG_PERF: + current_omni_platform.synchronize() + _t_text_enc_ms = (time.perf_counter() - _t_text_enc_start) * 1000 batch_size = prompt_embeds.shape[0] @@ -423,8 +429,10 @@ def forward( image_embeds = image_embeds.to(dtype) else: image_embeds = None - current_omni_platform.synchronize() - _t_img_enc_ms = (time.perf_counter() - _t_img_enc_start) * 1000 + + if DEBUG_PERF: + current_omni_platform.synchronize() + _t_img_enc_ms = (time.perf_counter() - _t_img_enc_start) * 1000 # Timesteps self.scheduler.set_timesteps(num_steps, device=device) @@ -438,7 +446,8 @@ def forward( # Prepare latents (use out_channels=16 for VAE latent, not in_channels=36) num_channels_latents = self.transformer.config.out_channels - _t_latent_prep_start = time.perf_counter() + if DEBUG_PERF: + _t_latent_prep_start = time.perf_counter() from diffusers.video_processor import VideoProcessor video_processor = VideoProcessor(vae_scale_factor=self.vae_scale_factor_spatial) @@ -472,13 +481,16 @@ def forward( latents=req.sampling_params.latents, last_image=last_image_tensor, ) - current_omni_platform.synchronize() - _t_latent_prep_ms = (time.perf_counter() - _t_latent_prep_start) * 1000 + + if DEBUG_PERF: + current_omni_platform.synchronize() + _t_latent_prep_ms = (time.perf_counter() - _t_latent_prep_start) * 1000 if attention_kwargs is None: attention_kwargs = {} - _t_denoise_start = time.perf_counter() + if DEBUG_PERF: + _t_denoise_start = time.perf_counter() with self.progress_bar(total=len(timesteps)) as pbar: for t in timesteps: self._current_timestep = t @@ -547,14 +559,18 @@ def forward( if current_omni_platform.is_available(): current_omni_platform.empty_cache() self._current_timestep = None - current_omni_platform.synchronize() - _t_denoise_ms = (time.perf_counter() - _t_denoise_start) * 1000 + + if DEBUG_PERF: + current_omni_platform.synchronize() + _t_denoise_ms = (time.perf_counter() - _t_denoise_start) * 1000 # For expand_timesteps mode, blend final latents with condition if self.expand_timesteps: latents = (1 - first_frame_mask) * condition + first_frame_mask * latents - _t_decode_start = time.perf_counter() + if DEBUG_PERF: + _t_decode_start = time.perf_counter() + if output_type == "latent": output = latents else: @@ -569,28 +585,30 @@ def forward( ) latents = latents / latents_std + latents_mean output = self.vae.decode(latents, return_dict=False)[0] - current_omni_platform.synchronize() - _t_decode_ms = (time.perf_counter() - _t_decode_start) * 1000 - _t_pipeline_wall_ms = (time.perf_counter() - _t_pipeline_start) * 1000 - _t_stages_sum = _t_text_enc_ms + _t_img_enc_ms + _t_latent_prep_ms + _t_denoise_ms + _t_decode_ms - - if _is_rank_zero(): - logger.info( - "Pipeline stage timing summary: " - "TextEncoding=%.2f ms, ImageEncoding=%.2f ms, " - "LatentPreparation=%.2f ms, Denoising=%.2f ms (%d steps), " - "Decoding=%.2f ms, StagesSum=%.2f ms, PipelineWall=%.2f ms, " - "Unaccounted=%.2f ms", - _t_text_enc_ms, - _t_img_enc_ms, - _t_latent_prep_ms, - _t_denoise_ms, - len(timesteps), - _t_decode_ms, - _t_stages_sum, - _t_pipeline_wall_ms, - _t_pipeline_wall_ms - _t_stages_sum, - ) + + if DEBUG_PERF: + current_omni_platform.synchronize() + _t_decode_ms = (time.perf_counter() - _t_decode_start) * 1000 + _t_pipeline_wall_ms = (time.perf_counter() - _t_pipeline_start) * 1000 + _t_stages_sum = _t_text_enc_ms + _t_img_enc_ms + _t_latent_prep_ms + _t_denoise_ms + _t_decode_ms + + if _is_rank_zero(): + logger.info( + "Pipeline stage timing summary: " + "TextEncoding=%.2f ms, ImageEncoding=%.2f ms, " + "LatentPreparation=%.2f ms, Denoising=%.2f ms (%d steps), " + "Decoding=%.2f ms, StagesSum=%.2f ms, PipelineWall=%.2f ms, " + "Unaccounted=%.2f ms", + _t_text_enc_ms, + _t_img_enc_ms, + _t_latent_prep_ms, + _t_denoise_ms, + len(timesteps), + _t_decode_ms, + _t_stages_sum, + _t_pipeline_wall_ms, + _t_pipeline_wall_ms - _t_stages_sum, + ) return DiffusionOutput(output=output) diff --git a/vllm_omni/diffusion/scheduler.py b/vllm_omni/diffusion/scheduler.py index c4891d06e00..4c9ba3de6a5 100644 --- a/vllm_omni/diffusion/scheduler.py +++ b/vllm_omni/diffusion/scheduler.py @@ -2,7 +2,6 @@ # SPDX-FileCopyrightText: Copyright contributors to the vLLM project import threading -import time as _time import zmq from vllm.distributed.device_communicators.shm_broadcast import MessageQueue @@ -59,31 +58,18 @@ def add_req(self, request: OmniDiffusionRequest) -> DiffusionOutput: } # Broadcast RPC request to all workers - _t_broadcast = _time.perf_counter() self.mq.enqueue(rpc_request) - _t_broadcast_ms = (_time.perf_counter() - _t_broadcast) * 1000 - logger.info("Hop1 scheduler→workers: mq.enqueue (broadcast request) took %.2f ms", _t_broadcast_ms) # Wait for result from Rank 0 (or whoever sends it) if self.result_mq is None: raise RuntimeError("Result queue not initialized") - _t_dequeue = _time.perf_counter() output = self.result_mq.dequeue() - _t_dequeue_ms = (_time.perf_counter() - _t_dequeue) * 1000 - _t_unpack = _time.perf_counter() try: unpack_diffusion_output_shm(output) except Exception as e: logger.warning("SHM unpack failed (data may already be inline): %s", e) - _t_unpack_ms = (_time.perf_counter() - _t_unpack) * 1000 - - logger.info( - "Hop1 scheduler←worker: mq.dequeue=%.2f ms, shm_unpack=%.2f ms (dequeue includes generation wait)", - _t_dequeue_ms, - _t_unpack_ms, - ) # {"status": "error", "error": str(e)} if isinstance(output, dict) and output.get("status") == "error": diff --git a/vllm_omni/diffusion/worker/diffusion_worker.py b/vllm_omni/diffusion/worker/diffusion_worker.py index a69acca2424..651c0fe33c0 100644 --- a/vllm_omni/diffusion/worker/diffusion_worker.py +++ b/vllm_omni/diffusion/worker/diffusion_worker.py @@ -376,26 +376,11 @@ def _create_worker( def return_result(self, output: DiffusionOutput): """Reply to client, only on rank 0.""" if self.result_mq is not None: - import time as _time - - _t0 = _time.perf_counter() try: pack_diffusion_output_shm(output) - _t_pack = (_time.perf_counter() - _t0) * 1000 except Exception as e: logger.warning("SHM pack failed, falling back to raw enqueue: %s", e) - _t_pack = 0.0 - _t1 = _time.perf_counter() self.result_mq.enqueue(output) - _t_enqueue = (_time.perf_counter() - _t1) * 1000 - _t_total = (_time.perf_counter() - _t0) * 1000 - logger.info( - "Hop1 worker→scheduler: shm_pack=%.2f ms, mq.enqueue=%.2f ms, total=%.2f ms (rank %s)", - _t_pack, - _t_enqueue, - _t_total, - self.gpu_id, - ) def recv_message(self): """Receive messages from broadcast queue.""" diff --git a/vllm_omni/entrypoints/async_omni.py b/vllm_omni/entrypoints/async_omni.py index 0a4ddd4ff71..a6757e71723 100644 --- a/vllm_omni/entrypoints/async_omni.py +++ b/vllm_omni/entrypoints/async_omni.py @@ -659,28 +659,8 @@ def _process_single_result( ) raise RuntimeError(result) - _t_load = time.perf_counter() engine_outputs = _load(result, obj_key="engine_outputs", shm_key="engine_outputs_shm") - _t_load_ms = (time.perf_counter() - _t_load) * 1000 - _hop3 = result.get("_hop3_timing") - if _hop3: - logger.info( - "Hop3 stageWorker→orchestrator: " - "generate=%.2f ms, serialize(shm=%s)=%.2f ms, " - "orchestrator_deserialize=%.2f ms", - _hop3.get("generate_ms", 0), - _hop3.get("use_shm", "?"), - _hop3.get("serialize_ms", 0), - _t_load_ms, - ) - elif _t_load_ms > 10: - logger.info( - "Hop3 orchestrator←stageWorker: deserialize took %.2f ms (req %s, stage %s)", - _t_load_ms, - req_id, - stage_id, - ) if isinstance(engine_outputs, list): engine_outputs = engine_outputs[0] diff --git a/vllm_omni/entrypoints/omni_stage.py b/vllm_omni/entrypoints/omni_stage.py index b50a31127be..fb7cefb4ad6 100644 --- a/vllm_omni/entrypoints/omni_stage.py +++ b/vllm_omni/entrypoints/omni_stage.py @@ -1098,7 +1098,6 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: try: _batch_seq += 1 gen_outputs: list[OmniRequestOutput | RequestOutput] = [] - _pre_gen_ms = (_time.time() - _recv_dequeue_ts) * 1000.0 _gen_t0 = _time.time() if stage_type == "diffusion": stage_engine = cast(OmniDiffusion, stage_engine) @@ -1145,7 +1144,6 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: _agg_total_gen_time_ms += _gen_ms # Emit per-request results - _post_gen_t0 = _time.time() for i, rid in enumerate(batch_request_ids): r_outputs = req_to_outputs.get(rid, []) _metrics = make_request_stats( @@ -1162,20 +1160,11 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: _metrics.stage_stats = make_stage_stats(_agg_total_tokens, _agg_total_gen_time_ms) else: _metrics.stage_stats = None - _t_serialize = _time.time() try: use_shm, payload = maybe_dump_to_shm(r_outputs, shm_threshold_bytes) except Exception: use_shm, payload = False, r_outputs - _t_serialize_ms = (_time.time() - _t_serialize) * 1000.0 - _hop3_timing = { - "generate_ms": _gen_ms, - "serialize_ms": _t_serialize_ms, - "use_shm": use_shm, - } - - _t_enqueue = _time.time() try: if use_shm: out_q.put( @@ -1184,7 +1173,6 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: "stage_id": stage_id, "engine_outputs_shm": payload, "metrics": _metrics, - "_hop3_timing": _hop3_timing, } ) else: @@ -1194,7 +1182,6 @@ def handle_profiler_task_local(task_type: OmniStageTaskType) -> dict: "stage_id": stage_id, "engine_outputs": payload, "metrics": _metrics, - "_hop3_timing": _hop3_timing, } ) except Exception: From e3dec547e16616e6460d6d531bca782777514013 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Mon, 9 Mar 2026 04:27:28 +0000 Subject: [PATCH 07/13] fix inline Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py | 1 - vllm_omni/entrypoints/async_omni.py | 4 ++++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py index 33292987ff5..4bdd83f9d1a 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py @@ -35,7 +35,6 @@ from vllm_omni.platforms import current_omni_platform logger = logging.getLogger(__name__) - DEBUG_PERF = False diff --git a/vllm_omni/entrypoints/async_omni.py b/vllm_omni/entrypoints/async_omni.py index a6757e71723..95e626e74e7 100644 --- a/vllm_omni/entrypoints/async_omni.py +++ b/vllm_omni/entrypoints/async_omni.py @@ -807,6 +807,10 @@ def dead_error(self) -> BaseException: return EngineDeadError() async def abort(self, request_id: str | Iterable[str]) -> None: + if self._inline_diffusion: + if self._inline_engine is not None: + self._inline_engine.engine.abort(request_id) + return None abort_task = {"type": OmniStageTaskType.ABORT, "request_id": request_id} for stage in self.stage_list: stage.submit(abort_task) From 2cd9f9f768013794856cc0fda3a97958528543c6 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Mon, 9 Mar 2026 06:30:30 +0000 Subject: [PATCH 08/13] fix ci Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/diffusion_engine.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vllm_omni/diffusion/diffusion_engine.py b/vllm_omni/diffusion/diffusion_engine.py index 77cdd0afb1f..abd668466de 100644 --- a/vllm_omni/diffusion/diffusion_engine.py +++ b/vllm_omni/diffusion/diffusion_engine.py @@ -70,6 +70,7 @@ def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: diffusion_engine_start_time = time.perf_counter() # Apply pre-processing if available + preprocess_time = 0.0 if self.pre_process_func is not None: preprocess_start_time = time.perf_counter() request = self.pre_process_func(request) From 172040a8e9cf98dcfec44bbb5a0b7380e90d14d5 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Mon, 9 Mar 2026 07:39:43 +0000 Subject: [PATCH 09/13] fix ci Signed-off-by: samithuang <285365963@qq.com> --- tests/entrypoints/test_async_omni_diffusion_config.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/entrypoints/test_async_omni_diffusion_config.py b/tests/entrypoints/test_async_omni_diffusion_config.py index ce443d5a5c7..ed205032ee2 100644 --- a/tests/entrypoints/test_async_omni_diffusion_config.py +++ b/tests/entrypoints/test_async_omni_diffusion_config.py @@ -11,12 +11,18 @@ MODEL = "riverclouds/qwen_image_random" +def _noop_inline_engine(self, model, stage_config, kwargs): + self._inline_diffusion = False + self._inline_engine = None + + def test_default_stage_config_includes_cache_backend(monkeypatch): """Ensure cache_backend/cache_config are preserved in default diffusion stage.""" monkeypatch.setattr(utils_module, "load_stage_configs_from_model", lambda model, base_engine_args=None: []) monkeypatch.setattr(utils_module, "resolve_model_config_path", lambda model: None) monkeypatch.setattr(AsyncOmni, "_start_stages", lambda self, model: None) monkeypatch.setattr(AsyncOmni, "_wait_for_stages_ready", lambda self, timeout=0: None) + monkeypatch.setattr(AsyncOmni, "_init_inline_diffusion_engine", _noop_inline_engine) omni = AsyncOmni( model=MODEL, @@ -47,6 +53,7 @@ def test_default_cache_config_used_when_missing(monkeypatch): monkeypatch.setattr(utils_module, "resolve_model_config_path", lambda model: None) monkeypatch.setattr(AsyncOmni, "_start_stages", lambda self, model: None) monkeypatch.setattr(AsyncOmni, "_wait_for_stages_ready", lambda self, timeout=0: None) + monkeypatch.setattr(AsyncOmni, "_init_inline_diffusion_engine", _noop_inline_engine) omni = AsyncOmni( model=MODEL, @@ -65,6 +72,7 @@ def test_default_stage_devices_from_sequence_parallel(monkeypatch): monkeypatch.setattr(utils_module, "resolve_model_config_path", lambda model: None) monkeypatch.setattr(AsyncOmni, "_start_stages", lambda self, model: None) monkeypatch.setattr(AsyncOmni, "_wait_for_stages_ready", lambda self, timeout=0: None) + monkeypatch.setattr(AsyncOmni, "_init_inline_diffusion_engine", _noop_inline_engine) omni = AsyncOmni( model=MODEL, From 0a86fc5f76e034aed50b9f17add0b24eac71123f Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Mon, 9 Mar 2026 08:14:11 +0000 Subject: [PATCH 10/13] fix log Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py index 9bbedc32859..30a3c3cf8d6 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py @@ -652,8 +652,9 @@ def forward( if current_omni_platform.is_available(): current_omni_platform.empty_cache() self._current_timestep = None - current_omni_platform.synchronize() - _t_denoise_ms = (time.perf_counter() - _t_denoise_start) * 1000 + if DEBUG_PERF: + current_omni_platform.synchronize() + _t_denoise_ms = (time.perf_counter() - _t_denoise_start) * 1000 # For I2V mode: blend final latents with condition if self.expand_timesteps and latent_condition is not None: From 9b9c5974baf1a296b4ef939e55a51ae465cb1f03 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Mon, 9 Mar 2026 08:32:26 +0000 Subject: [PATCH 11/13] fix Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/data.py | 72 --------------- vllm_omni/diffusion/ipc.py | 89 +++++++++++++++++++ vllm_omni/diffusion/scheduler.py | 3 +- .../diffusion/worker/diffusion_worker.py | 2 +- 4 files changed, 92 insertions(+), 74 deletions(-) create mode 100644 vllm_omni/diffusion/ipc.py diff --git a/vllm_omni/diffusion/data.py b/vllm_omni/diffusion/data.py index 8f55e529cd6..0a68c2e707a 100644 --- a/vllm_omni/diffusion/data.py +++ b/vllm_omni/diffusion/data.py @@ -626,78 +626,6 @@ class DiffusionOutput: # timings: Optional["RequestTimings"] = None -_SHM_TENSOR_THRESHOLD = 1_000_000 # 1 MB - - -def _tensor_to_shm(tensor: torch.Tensor) -> dict[str, Any]: - """Copy a tensor into POSIX shared memory and return a metadata handle. - - The shared memory segment remains alive after this call (the local fd is - closed, but the segment persists until ``_tensor_from_shm`` unlinks it). - """ - from multiprocessing import shared_memory - - import numpy as np - - tensor = tensor.detach().cpu().contiguous() - arr = tensor.numpy() - nbytes = arr.nbytes - shm = shared_memory.SharedMemory(create=True, size=nbytes) - shm_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf[:nbytes]) - np.copyto(shm_arr, arr) - handle = { - "__tensor_shm__": True, - "name": shm.name, - "shape": list(tensor.shape), - "torch_dtype": str(tensor.dtype), - "numpy_dtype": str(arr.dtype), - "nbytes": nbytes, - } - shm.close() - return handle - - -def _tensor_from_shm(handle: dict[str, Any]) -> torch.Tensor: - """Reconstruct a tensor from a shared-memory handle and free the segment.""" - from multiprocessing import shared_memory - - import numpy as np - - shm = shared_memory.SharedMemory(name=handle["name"]) - try: - np_dtype = np.dtype(handle["numpy_dtype"]) - arr = np.ndarray(handle["shape"], dtype=np_dtype, buffer=shm.buf[: handle["nbytes"]]) - tensor = torch.from_numpy(arr.copy()) - finally: - shm.close() - shm.unlink() - return tensor - - -def pack_diffusion_output_shm(output: "DiffusionOutput") -> "DiffusionOutput": - """Replace large tensors in *output* with shared-memory handles. - - The DiffusionOutput is modified **in-place** so that the (now lightweight) - object can be serialised cheaply through a MessageQueue. - """ - if output.output is not None and isinstance(output.output, torch.Tensor): - if output.output.nelement() * output.output.element_size() > _SHM_TENSOR_THRESHOLD: - output.output = _tensor_to_shm(output.output) - if output.trajectory_latents is not None and isinstance(output.trajectory_latents, torch.Tensor): - if output.trajectory_latents.nelement() * output.trajectory_latents.element_size() > _SHM_TENSOR_THRESHOLD: - output.trajectory_latents = _tensor_to_shm(output.trajectory_latents) - return output - - -def unpack_diffusion_output_shm(output: "DiffusionOutput") -> "DiffusionOutput": - """Reconstruct tensors from shared-memory handles produced by ``pack_diffusion_output_shm``.""" - if isinstance(output.output, dict) and output.output.get("__tensor_shm__"): - output.output = _tensor_from_shm(output.output) - if isinstance(output.trajectory_latents, dict) and output.trajectory_latents.get("__tensor_shm__"): - output.trajectory_latents = _tensor_from_shm(output.trajectory_latents) - return output - - class AttentionBackendEnum(enum.Enum): FA = enum.auto() SLIDING_TILE_ATTN = enum.auto() diff --git a/vllm_omni/diffusion/ipc.py b/vllm_omni/diffusion/ipc.py new file mode 100644 index 00000000000..cac406ef4c0 --- /dev/null +++ b/vllm_omni/diffusion/ipc.py @@ -0,0 +1,89 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +"""IPC utilities for transferring large tensors via POSIX shared memory. + +Used by Hop1 (GPU worker <-> scheduler) to avoid pickling large video tensors +through the MessageQueue. Tensors above ``_SHM_TENSOR_THRESHOLD`` are copied +into a named shared-memory segment; only a lightweight metadata dict is +serialised through the queue. +""" + +from __future__ import annotations + +from typing import Any + +import torch + +from vllm_omni.diffusion.data import DiffusionOutput + +_SHM_TENSOR_THRESHOLD = 1_000_000 # 1 MB + + +def _tensor_to_shm(tensor: torch.Tensor) -> dict[str, Any]: + """Copy a tensor into POSIX shared memory and return a metadata handle. + + The shared memory segment remains alive after this call (the local fd is + closed, but the segment persists until ``_tensor_from_shm`` unlinks it). + """ + from multiprocessing import shared_memory + + import numpy as np + + tensor = tensor.detach().cpu().contiguous() + arr = tensor.numpy() + nbytes = arr.nbytes + shm = shared_memory.SharedMemory(create=True, size=nbytes) + shm_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf[:nbytes]) + np.copyto(shm_arr, arr) + handle = { + "__tensor_shm__": True, + "name": shm.name, + "shape": list(tensor.shape), + "torch_dtype": str(tensor.dtype), + "numpy_dtype": str(arr.dtype), + "nbytes": nbytes, + } + shm.close() + return handle + + +def _tensor_from_shm(handle: dict[str, Any]) -> torch.Tensor: + """Reconstruct a tensor from a shared-memory handle and free the segment.""" + from multiprocessing import shared_memory + + import numpy as np + + shm = shared_memory.SharedMemory(name=handle["name"]) + try: + np_dtype = np.dtype(handle["numpy_dtype"]) + arr = np.ndarray(handle["shape"], dtype=np_dtype, buffer=shm.buf[: handle["nbytes"]]) + tensor = torch.from_numpy(arr.copy()) + finally: + shm.close() + shm.unlink() + return tensor + + +def pack_diffusion_output_shm(output: DiffusionOutput) -> DiffusionOutput: + """Replace large tensors in *output* with shared-memory handles. + + The DiffusionOutput is modified **in-place** so that the (now lightweight) + object can be serialised cheaply through a MessageQueue. + """ + if output.output is not None and isinstance(output.output, torch.Tensor): + if output.output.nelement() * output.output.element_size() > _SHM_TENSOR_THRESHOLD: + output.output = _tensor_to_shm(output.output) + if output.trajectory_latents is not None and isinstance(output.trajectory_latents, torch.Tensor): + if output.trajectory_latents.nelement() * output.trajectory_latents.element_size() > _SHM_TENSOR_THRESHOLD: + output.trajectory_latents = _tensor_to_shm(output.trajectory_latents) + return output + + +def unpack_diffusion_output_shm(output: DiffusionOutput) -> DiffusionOutput: + """Reconstruct tensors from shared-memory handles produced by ``pack_diffusion_output_shm``.""" + if isinstance(output.output, dict) and output.output.get("__tensor_shm__"): + output.output = _tensor_from_shm(output.output) + if isinstance(output.trajectory_latents, dict) and output.trajectory_latents.get("__tensor_shm__"): + output.trajectory_latents = _tensor_from_shm(output.trajectory_latents) + return output diff --git a/vllm_omni/diffusion/scheduler.py b/vllm_omni/diffusion/scheduler.py index 4c9ba3de6a5..5f1d01c1282 100644 --- a/vllm_omni/diffusion/scheduler.py +++ b/vllm_omni/diffusion/scheduler.py @@ -7,7 +7,8 @@ from vllm.distributed.device_communicators.shm_broadcast import MessageQueue from vllm.logger import init_logger -from vllm_omni.diffusion.data import DiffusionOutput, OmniDiffusionConfig, unpack_diffusion_output_shm +from vllm_omni.diffusion.data import DiffusionOutput, OmniDiffusionConfig +from vllm_omni.diffusion.ipc import unpack_diffusion_output_shm from vllm_omni.diffusion.request import OmniDiffusionRequest logger = init_logger(__name__) diff --git a/vllm_omni/diffusion/worker/diffusion_worker.py b/vllm_omni/diffusion/worker/diffusion_worker.py index 651c0fe33c0..799957d4a02 100644 --- a/vllm_omni/diffusion/worker/diffusion_worker.py +++ b/vllm_omni/diffusion/worker/diffusion_worker.py @@ -27,7 +27,6 @@ from vllm_omni.diffusion.data import ( DiffusionOutput, OmniDiffusionConfig, - pack_diffusion_output_shm, ) from vllm_omni.diffusion.distributed.parallel_state import ( destroy_distributed_env, @@ -35,6 +34,7 @@ initialize_model_parallel, ) from vllm_omni.diffusion.forward_context import set_forward_context +from vllm_omni.diffusion.ipc import pack_diffusion_output_shm from vllm_omni.diffusion.lora.manager import DiffusionLoRAManager from vllm_omni.diffusion.profiler import CurrentProfiler from vllm_omni.diffusion.request import OmniDiffusionRequest From bda0f2d317033597be7f537b8202f6f6098dfa77 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Mon, 9 Mar 2026 08:45:41 +0000 Subject: [PATCH 12/13] fix log Signed-off-by: samithuang <285365963@qq.com> --- vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py | 3 ++- vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py index 30a3c3cf8d6..9a0037a6a1b 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py @@ -660,7 +660,8 @@ def forward( if self.expand_timesteps and latent_condition is not None: latents = (1 - first_frame_mask) * latent_condition + first_frame_mask * latents - _t_decode_start = time.perf_counter() + if DEBUG_PERF: + _t_decode_start = time.perf_counter() if output_type == "latent": output = latents else: diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py index 4bdd83f9d1a..fe15a24f587 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py @@ -417,7 +417,8 @@ def forward( batch_size = prompt_embeds.shape[0] - _t_img_enc_start = time.perf_counter() + if DEBUG_PERF: + _t_img_enc_start = time.perf_counter() if self.has_image_encoder and self.transformer.config.image_dim is not None: if image_embeds is None: if last_image is None: From 5fcf3022a5413eede322c051c2275574ee885030 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Thu, 12 Mar 2026 06:49:30 +0000 Subject: [PATCH 13/13] [Enhancement] Upgrade cache-dit from 1.2.0 to 1.3.0 Upgrade cache-dit dependency to the latest release (1.3.0). All existing imports and APIs remain compatible. Verified with Qwen-Image offline inference showing ~2x speedup with cache-dit acceleration. Signed-off-by: yx Made-with: Cursor Signed-off-by: samithuang <285365963@qq.com> --- requirements/common.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/common.txt b/requirements/common.txt index 548f8f0bf95..fc39c6c14f0 100644 --- a/requirements/common.txt +++ b/requirements/common.txt @@ -6,7 +6,7 @@ diffusers>=0.36.0 accelerate==1.12.0 gradio==5.50 soundfile>=0.13.1 -cache-dit==1.2.0 +cache-dit==1.3.0 tqdm>=4.66.0 torchsde>=0.2.6 openai-whisper>=20250625