From b40fff241b945bc098e964d61fb29685468f44f4 Mon Sep 17 00:00:00 2001 From: gcanlin Date: Sun, 1 Feb 2026 15:23:20 +0000 Subject: [PATCH 01/10] [Profiler] Support online profiling for diffusion Signed-off-by: gcanlin --- vllm_omni/entrypoints/async_omni_diffusion.py | 26 +++++++++++++++++++ vllm_omni/entrypoints/omni_stage.py | 17 ++++++++---- vllm_omni/entrypoints/openai/api_server.py | 23 +++++++++++++++- 3 files changed, 60 insertions(+), 6 deletions(-) diff --git a/vllm_omni/entrypoints/async_omni_diffusion.py b/vllm_omni/entrypoints/async_omni_diffusion.py index 535f04f7d2e..0cb36fafc18 100644 --- a/vllm_omni/entrypoints/async_omni_diffusion.py +++ b/vllm_omni/entrypoints/async_omni_diffusion.py @@ -294,3 +294,29 @@ async def pin_lora(self, lora_id: int) -> bool: None, ) return all(results) if isinstance(results, list) else results + + async def start_profile(self, trace_filename: str | None = None) -> None: + """Start profiling for the diffusion model. + + Args: + trace_filename: Optional base filename for trace files. + If None, a timestamp-based name will be generated. + """ + loop = asyncio.get_event_loop() + await loop.run_in_executor( + self._executor, + self.engine.start_profile, + trace_filename, + ) + + async def stop_profile(self) -> dict: + """Stop profiling and return profiling results. + + Returns: + Dictionary containing paths to trace and table files. + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + self._executor, + self.engine.stop_profile, + ) diff --git a/vllm_omni/entrypoints/omni_stage.py b/vllm_omni/entrypoints/omni_stage.py index fc01c147cd6..fa177e6cfb7 100644 --- a/vllm_omni/entrypoints/omni_stage.py +++ b/vllm_omni/entrypoints/omni_stage.py @@ -1283,16 +1283,15 @@ async def _force_log(): await stage_engine.reset_mm_cache() logger.debug("[Stage-%s] Engine initialized", stage_id) - async def handle_profiler_task_async(task_type: OmniStageTaskType) -> None: + async def handle_profiler_task_async(task_type: OmniStageTaskType) -> dict: """Handle profiler task asynchronously for both LLM and diffusion stages.""" if task_type == OmniStageTaskType.PROFILER_START: if stage_type == "diffusion": try: - # Sync call is safe here — diffusion profiling is lightweight profile_dir = os.environ.get("VLLM_TORCH_PROFILER_DIR", "./profiles") os.makedirs(profile_dir, exist_ok=True) trace_filename = f"stage_{stage_id}_diffusion_{int(time.time())}" - stage_engine.start_profile(trace_filename=trace_filename) + await stage_engine.start_profile(trace_filename=trace_filename) logger.info("[Stage-%s] Diffusion Torch profiler started", stage_id) except Exception as e: logger.warning("[Stage-%s] Failed to start diffusion profiler: %s", stage_id, e) @@ -1302,14 +1301,17 @@ async def handle_profiler_task_async(task_type: OmniStageTaskType) -> None: logger.info("[Stage-%s] vLLM profiler started", stage_id) except Exception as e: logger.warning("[Stage-%s] Failed to start vLLM profiler: %s", stage_id, e) + return {} elif task_type == OmniStageTaskType.PROFILER_STOP: + result_data: dict = {} if stage_type == "diffusion": try: - trace_files = stage_engine.stop_profile() + trace_files = await stage_engine.stop_profile() logger.info("[Stage-%s] Diffusion Torch profiler stopped", stage_id) if trace_files: logger.info("Diffusion trace files: %s", trace_files) + result_data = trace_files except Exception as e: logger.warning("[Stage-%s] Failed to stop diffusion profiler: %s", stage_id, e) else: @@ -1318,6 +1320,8 @@ async def handle_profiler_task_async(task_type: OmniStageTaskType) -> None: logger.info("[Stage-%s] vLLM profiler stopped", stage_id) except Exception as e: logger.warning("[Stage-%s] Failed to stop vLLM profiler: %s", stage_id, e) + return result_data + return {} # Signal readiness to orchestrator and send vllm_config back to main process try: @@ -1419,7 +1423,10 @@ async def generation_single_request(task: dict[str, Any]): rid = task["request_id"] asyncio.create_task(stage_engine.abort(rid)) elif is_profiler_task(task_type): - await handle_profiler_task_async(task_type) + profiler_data = await handle_profiler_task_async(task_type) + # Send result back to orchestrator for STOP command + if task_type == OmniStageTaskType.PROFILER_STOP: + out_q.put({"type": "profiler_result", "data": profiler_data}) else: asyncio.create_task(generation_single_request(task)) diff --git a/vllm_omni/entrypoints/openai/api_server.py b/vllm_omni/entrypoints/openai/api_server.py index 2e057901a5b..ac0ea058e28 100644 --- a/vllm_omni/entrypoints/openai/api_server.py +++ b/vllm_omni/entrypoints/openai/api_server.py @@ -20,7 +20,7 @@ import httpx import vllm.envs as envs from fastapi import Depends, File, Form, HTTPException, Request, UploadFile -from fastapi.responses import JSONResponse, StreamingResponse +from fastapi.responses import JSONResponse, Response, StreamingResponse from PIL import Image from starlette.datastructures import State from starlette.routing import Route @@ -1418,3 +1418,24 @@ def apply_stage_default_sampling_params( for param_name, param_value in stage_defaults.items(): if hasattr(sampling_params, param_name): setattr(sampling_params, param_name, param_value) + + +# Profiling endpoints - always available for vLLM-Omni + + +@router.post("/start_profile") +async def start_profile(raw_request: Request): + """Start profiling for the engine.""" + logger.info("Starting profiler...") + await raw_request.app.state.engine_client.start_profile() + logger.info("Profiler started.") + return Response(status_code=200) + + +@router.post("/stop_profile") +async def stop_profile(raw_request: Request): + """Stop profiling for the engine.""" + logger.info("Stopping profiler...") + await raw_request.app.state.engine_client.stop_profile() + logger.info("Profiler stopped.") + return Response(status_code=200) From 3194760070ecfce2555ef93c855c40179bf8746f Mon Sep 17 00:00:00 2001 From: gcanlin Date: Sat, 14 Feb 2026 08:23:57 +0000 Subject: [PATCH 02/10] try .. except .. Signed-off-by: gcanlin --- vllm_omni/entrypoints/openai/api_server.py | 30 ++++++++++++++++------ 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/vllm_omni/entrypoints/openai/api_server.py b/vllm_omni/entrypoints/openai/api_server.py index 9a72e0a13c4..d97c4c4fe39 100644 --- a/vllm_omni/entrypoints/openai/api_server.py +++ b/vllm_omni/entrypoints/openai/api_server.py @@ -1491,19 +1491,33 @@ def apply_stage_default_sampling_params( @router.post("/start_profile") async def start_profile(raw_request: Request): """Start profiling for the engine.""" - logger.info("Starting profiler...") - await raw_request.app.state.engine_client.start_profile() - logger.info("Profiler started.") - return Response(status_code=200) + try: + logger.info("Starting profiler...") + result = await raw_request.app.state.engine_client.start_profile() + logger.info("Profiler started.") + return JSONResponse(content=result) + except Exception as e: + logger.exception("Failed to start profiler: %s", e) + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, + detail=f"Failed to start profiler: {str(e)}" + ) @router.post("/stop_profile") async def stop_profile(raw_request: Request): """Stop profiling for the engine.""" - logger.info("Stopping profiler...") - await raw_request.app.state.engine_client.stop_profile() - logger.info("Profiler stopped.") - return Response(status_code=200) + try: + logger.info("Stopping profiler...") + result = await raw_request.app.state.engine_client.stop_profile() + logger.info("Profiler stopped.") + return JSONResponse(content=result) + except Exception as e: + logger.exception("Failed to stop profiler: %s", e) + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, + detail=f"Failed to stop profiler: {str(e)}" + ) async def _run_video_generation( From 77807d9dd262032707eaff5f2c30c2502dde2b15 Mon Sep 17 00:00:00 2001 From: gcanlin Date: Sat, 14 Feb 2026 09:43:53 +0000 Subject: [PATCH 03/10] lint Signed-off-by: gcanlin --- vllm_omni/entrypoints/openai/api_server.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/vllm_omni/entrypoints/openai/api_server.py b/vllm_omni/entrypoints/openai/api_server.py index d97c4c4fe39..82721e4ce88 100644 --- a/vllm_omni/entrypoints/openai/api_server.py +++ b/vllm_omni/entrypoints/openai/api_server.py @@ -20,7 +20,7 @@ import httpx import vllm.envs as envs from fastapi import APIRouter, Depends, File, Form, HTTPException, Request, UploadFile -from fastapi.responses import JSONResponse, Response, StreamingResponse +from fastapi.responses import JSONResponse, StreamingResponse from PIL import Image from starlette.datastructures import State from starlette.routing import Route @@ -1499,8 +1499,7 @@ async def start_profile(raw_request: Request): except Exception as e: logger.exception("Failed to start profiler: %s", e) raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, - detail=f"Failed to start profiler: {str(e)}" + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=f"Failed to start profiler: {str(e)}" ) @@ -1515,8 +1514,7 @@ async def stop_profile(raw_request: Request): except Exception as e: logger.exception("Failed to stop profiler: %s", e) raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, - detail=f"Failed to stop profiler: {str(e)}" + status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value, detail=f"Failed to stop profiler: {str(e)}" ) From 6e6207e9b853eb820d5165e73c1ad76078b7f6c1 Mon Sep 17 00:00:00 2001 From: gcanlin Date: Mon, 16 Feb 2026 18:18:43 +0000 Subject: [PATCH 04/10] add stages parameter Signed-off-by: gcanlin --- vllm_omni/entrypoints/openai/api_server.py | 56 ++++++++++++++++++---- 1 file changed, 48 insertions(+), 8 deletions(-) diff --git a/vllm_omni/entrypoints/openai/api_server.py b/vllm_omni/entrypoints/openai/api_server.py index 82721e4ce88..1d038cde4d5 100644 --- a/vllm_omni/entrypoints/openai/api_server.py +++ b/vllm_omni/entrypoints/openai/api_server.py @@ -22,6 +22,7 @@ from fastapi import APIRouter, Depends, File, Form, HTTPException, Request, UploadFile from fastapi.responses import JSONResponse, StreamingResponse from PIL import Image +from pydantic import BaseModel, Field from starlette.datastructures import State from starlette.routing import Route from vllm import SamplingParams @@ -107,6 +108,15 @@ router = APIRouter() +class ProfileRequest(BaseModel): + """Request model for profiling endpoints.""" + + stages: list[int] | None = Field( + default=None, + description="List of stage IDs to profile. If None, profiles all stages.", + ) + + def _remove_route_from_router( router: APIRouter, path: str, @@ -1489,11 +1499,26 @@ def apply_stage_default_sampling_params( @router.post("/start_profile") -async def start_profile(raw_request: Request): - """Start profiling for the engine.""" +async def start_profile(raw_request: Request, request: ProfileRequest | None = None): + """Start profiling for the engine. + + Args: + request: Optional request body with stages to profile. + - stages: List of stage IDs to profile. If None, profiles all stages. + + Example: + POST /start_profile + {"stages": [0, 1]} # Profile only stages 0 and 1 + """ try: - logger.info("Starting profiler...") - result = await raw_request.app.state.engine_client.start_profile() + stages = request.stages if request else None + logger.info("Starting profiler for stages: %s", stages if stages else "all") + engine_client = raw_request.app.state.engine_client + # Only pass stages if specified (for backward compatibility with non-Omni engines) + if stages is not None: + result = await engine_client.start_profile(stages=stages) + else: + result = await engine_client.start_profile() logger.info("Profiler started.") return JSONResponse(content=result) except Exception as e: @@ -1504,11 +1529,26 @@ async def start_profile(raw_request: Request): @router.post("/stop_profile") -async def stop_profile(raw_request: Request): - """Stop profiling for the engine.""" +async def stop_profile(raw_request: Request, request: ProfileRequest | None = None): + """Stop profiling for the engine. + + Args: + request: Optional request body with stages to stop profiling. + - stages: List of stage IDs to stop profiling. If None, stops all stages. + + Example: + POST /stop_profile + {"stages": [0, 1]} # Stop profiling only stages 0 and 1 + """ try: - logger.info("Stopping profiler...") - result = await raw_request.app.state.engine_client.stop_profile() + stages = request.stages if request else None + logger.info("Stopping profiler for stages: %s", stages if stages else "all") + engine_client = raw_request.app.state.engine_client + # Only pass stages if specified (for backward compatibility with non-Omni engines) + if stages is not None: + result = await engine_client.stop_profile(stages=stages) + else: + result = await engine_client.stop_profile() logger.info("Profiler stopped.") return JSONResponse(content=result) except Exception as e: From 9d2f1d1dd799b5b6214f5d83c504ffaf302def1d Mon Sep 17 00:00:00 2001 From: gcanlin Date: Mon, 23 Feb 2026 12:13:56 +0000 Subject: [PATCH 05/10] simply Signed-off-by: gcanlin --- vllm_omni/entrypoints/openai/api_server.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/vllm_omni/entrypoints/openai/api_server.py b/vllm_omni/entrypoints/openai/api_server.py index 1d038cde4d5..34bc659c33d 100644 --- a/vllm_omni/entrypoints/openai/api_server.py +++ b/vllm_omni/entrypoints/openai/api_server.py @@ -1514,11 +1514,7 @@ async def start_profile(raw_request: Request, request: ProfileRequest | None = N stages = request.stages if request else None logger.info("Starting profiler for stages: %s", stages if stages else "all") engine_client = raw_request.app.state.engine_client - # Only pass stages if specified (for backward compatibility with non-Omni engines) - if stages is not None: - result = await engine_client.start_profile(stages=stages) - else: - result = await engine_client.start_profile() + result = await engine_client.start_profile(stages=stages) logger.info("Profiler started.") return JSONResponse(content=result) except Exception as e: @@ -1544,11 +1540,7 @@ async def stop_profile(raw_request: Request, request: ProfileRequest | None = No stages = request.stages if request else None logger.info("Stopping profiler for stages: %s", stages if stages else "all") engine_client = raw_request.app.state.engine_client - # Only pass stages if specified (for backward compatibility with non-Omni engines) - if stages is not None: - result = await engine_client.stop_profile(stages=stages) - else: - result = await engine_client.stop_profile() + result = await engine_client.stop_profile(stages=stages) logger.info("Profiler stopped.") return JSONResponse(content=result) except Exception as e: From 012ef73ad1eb96c8df6d80f195a4108125360104 Mon Sep 17 00:00:00 2001 From: gcanlin Date: Tue, 24 Feb 2026 08:31:52 +0000 Subject: [PATCH 06/10] remove note Signed-off-by: gcanlin --- docs/contributing/profiling.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/docs/contributing/profiling.md b/docs/contributing/profiling.md index 99b07cb9869..3e54aed8f9a 100644 --- a/docs/contributing/profiling.md +++ b/docs/contributing/profiling.md @@ -131,9 +131,6 @@ python image_to_video.py \ 2. **Wan-AI/Wan2.2-I2V-A14B-Diffusers**: [https://github.com/vllm-project/vllm-omni/tree/main/examples/offline_inference/image_to_video](https://github.com/vllm-project/vllm-omni/tree/main/examples/offline_inference/image_to_video) -> **Note:** -As of now, asynchronous (online) profiling is not fully supported in vLLM-Omni. While start_profile() and stop_profile() methods exist, they are only reliable in offline inference scripts (e.g., the provided end2end.py examples). Do not use them in server-mode or streaming scenarios—traces may be incomplete or fail to flush. - ### 4. Analyzing Omni Traces Output files are saved to your configured ```VLLM_TORCH_PROFILER_DIR```. From 2d24711001a578539352490961b688852d13d440 Mon Sep 17 00:00:00 2001 From: gcanlin Date: Tue, 24 Feb 2026 15:14:22 +0000 Subject: [PATCH 07/10] add profiler router Signed-off-by: gcanlin --- vllm_omni/entrypoints/openai/api_server.py | 30 +++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/vllm_omni/entrypoints/openai/api_server.py b/vllm_omni/entrypoints/openai/api_server.py index 34bc659c33d..3570566d9c6 100644 --- a/vllm_omni/entrypoints/openai/api_server.py +++ b/vllm_omni/entrypoints/openai/api_server.py @@ -106,6 +106,23 @@ logger = init_logger(__name__) router = APIRouter() +profiler_router = APIRouter() + + +def _should_enable_profiler_endpoints(args: Namespace) -> bool: + # Check upstream vLLM's profiler_config + profiler_config = getattr(args, "profiler_config", None) + if profiler_config is not None: + # profiler_config exists, check if profiler is set + profiler = getattr(profiler_config, "profiler", None) + if profiler is not None: + return True + + # TODO: remove this env after refactoring torch profiler to CLI args + env_value = os.environ.get("VLLM_TORCH_PROFILER_DIR") + if env_value is not None: + return True + return False class ProfileRequest(BaseModel): @@ -237,6 +254,14 @@ async def omni_run_server_worker(listen_address, sock, args, client_config=None, await omni_init_app_state(engine_client, app.state, args) + # Conditionally register profiler endpoints based on config or env var + if _should_enable_profiler_endpoints(args): + logger.warning( + "Profiler endpoints are enabled. " + "This should ONLY be used for local development!" + ) + app.include_router(profiler_router) + vllm_config = await engine_client.get_vllm_config() # Check if pure diffusion mode (vllm_config will be None) @@ -1497,8 +1522,7 @@ def apply_stage_default_sampling_params( if hasattr(sampling_params, param_name): setattr(sampling_params, param_name, param_value) - -@router.post("/start_profile") +@profiler_router.post("/start_profile") async def start_profile(raw_request: Request, request: ProfileRequest | None = None): """Start profiling for the engine. @@ -1524,7 +1548,7 @@ async def start_profile(raw_request: Request, request: ProfileRequest | None = N ) -@router.post("/stop_profile") +@profiler_router.post("/stop_profile") async def stop_profile(raw_request: Request, request: ProfileRequest | None = None): """Stop profiling for the engine. From bf9a5e934587c02a5b855e95c0e29758c976e6f3 Mon Sep 17 00:00:00 2001 From: gcanlin Date: Wed, 25 Feb 2026 02:08:55 +0000 Subject: [PATCH 08/10] pre-commit Signed-off-by: gcanlin --- vllm_omni/entrypoints/openai/api_server.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/vllm_omni/entrypoints/openai/api_server.py b/vllm_omni/entrypoints/openai/api_server.py index 3570566d9c6..bc6b279bfa6 100644 --- a/vllm_omni/entrypoints/openai/api_server.py +++ b/vllm_omni/entrypoints/openai/api_server.py @@ -256,10 +256,7 @@ async def omni_run_server_worker(listen_address, sock, args, client_config=None, # Conditionally register profiler endpoints based on config or env var if _should_enable_profiler_endpoints(args): - logger.warning( - "Profiler endpoints are enabled. " - "This should ONLY be used for local development!" - ) + logger.warning("Profiler endpoints are enabled. This should ONLY be used for local development!") app.include_router(profiler_router) vllm_config = await engine_client.get_vllm_config() @@ -1522,6 +1519,7 @@ def apply_stage_default_sampling_params( if hasattr(sampling_params, param_name): setattr(sampling_params, param_name, param_value) + @profiler_router.post("/start_profile") async def start_profile(raw_request: Request, request: ProfileRequest | None = None): """Start profiling for the engine. From 0be1acb9a69e23b5fc03f1032a8da3650eef90d1 Mon Sep 17 00:00:00 2001 From: Canlin Guo <961750412@qq.com> Date: Wed, 25 Feb 2026 17:55:14 +0800 Subject: [PATCH 09/10] Update vllm_omni/entrypoints/openai/api_server.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Nicolò Lucchesi Signed-off-by: Canlin Guo <961750412@qq.com> --- vllm_omni/entrypoints/openai/api_server.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/vllm_omni/entrypoints/openai/api_server.py b/vllm_omni/entrypoints/openai/api_server.py index bc6b279bfa6..84a81a86139 100644 --- a/vllm_omni/entrypoints/openai/api_server.py +++ b/vllm_omni/entrypoints/openai/api_server.py @@ -120,9 +120,7 @@ def _should_enable_profiler_endpoints(args: Namespace) -> bool: # TODO: remove this env after refactoring torch profiler to CLI args env_value = os.environ.get("VLLM_TORCH_PROFILER_DIR") - if env_value is not None: - return True - return False + return env_value is not None class ProfileRequest(BaseModel): From 718a43e39701c2d4b138294b7a2875d100fdcf6b Mon Sep 17 00:00:00 2001 From: gcanlin Date: Thu, 26 Feb 2026 12:14:24 +0000 Subject: [PATCH 10/10] skipped stage that doesn't enable profiler Signed-off-by: gcanlin --- vllm_omni/entrypoints/omni.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/vllm_omni/entrypoints/omni.py b/vllm_omni/entrypoints/omni.py index f30cd7d368e..7dccae6c08e 100644 --- a/vllm_omni/entrypoints/omni.py +++ b/vllm_omni/entrypoints/omni.py @@ -395,6 +395,22 @@ def _wait_for_stages_ready(self, timeout: int = 120) -> None: logger.warning(f"[{self._name}] Stage initialization timeout. Troubleshooting Steps:\n{formatted_suggestions}") + def _is_profiler_enabled(self, stage_id: int) -> bool: + """Check if profiler config is set for a given stage.""" + stage = self.stage_list[stage_id] + # For diffusion stages, profiling is controlled by VLLM_TORCH_PROFILER_DIR env var + if stage.stage_type == "diffusion": + return True + # For LLM stages, check if profiler_config is set in engine_args + engine_args = getattr(stage.stage_config, "engine_args", None) + if engine_args is None: + return False + profiler_config = getattr(engine_args, "profiler_config", None) + if profiler_config is None: + return False + profiler = getattr(profiler_config, "profiler", None) + return profiler is not None + def start_profile(self, stages: list[int] | None = None) -> None: """Start profiling for specified stages. @@ -419,6 +435,13 @@ def start_profile(self, stages: list[int] | None = None) -> None: for stage_id in stages: if stage_id < len(self.stage_list): + if not self._is_profiler_enabled(stage_id): + logger.info( + "[%s] Skipping start_profile for stage-%s: profiler config not set", + self._name, + stage_id, + ) + continue try: self.stage_list[stage_id].submit({"type": OmniStageTaskType.PROFILER_START}) logger.info("[%s] Sent start_profile to stage-%s", self._name, stage_id) @@ -442,6 +465,13 @@ def stop_profile(self, stages: list[int] | None = None) -> dict: for stage_id in stages: if stage_id < len(self.stage_list): + if not self._is_profiler_enabled(stage_id): + logger.info( + "[%s] Skipping stop_profile for stage-%s: profiler config not set", + self._name, + stage_id, + ) + continue stage = self.stage_list[stage_id] # Check if the stage object has our new bridge method