diff --git a/docs/contributing/metrics.md b/docs/contributing/metrics.md index bc0f13efdb0..92dcd92ccac 100644 --- a/docs/contributing/metrics.md +++ b/docs/contributing/metrics.md @@ -69,20 +69,8 @@ With `--log-stats` enabled, the server will output detailed metrics logs after e | rx_decode_time_ms | 111.865 | 31.706 | | in_flight_time_ms | 2.015 | 2.819 | -### DiffusionStats - -| Field | Value | -|---------------------------------|------------| -| batch_id | 1 | -| batch_size | 1 | -| diffusion_engine_exec_time_ms | 16,363.419 | -| diffusion_engine_total_time_ms | 16,234.849 | -| image_num | 1.000 | -| num_inference_steps | 50.000 | -| postprocess_time_ms | 67.685 | -| preprocess_time_ms | 60.106 | -| resolution | 640.000 | -| stage_gen_time_ms | 16,363.714 | + +These logs include: - **Overall summary**: total requests, wall time, average tokens/sec, etc. @@ -92,9 +80,6 @@ With `--log-stats` enabled, the server will output detailed metrics logs after e - **Transfer table**: data transfer and timing for each edge. -- **DiffusionStats**:preprocessing time, model execution time, postprocessing time, generated images, resolution, and inference steps. - - You can use these logs to monitor system health, debug performance, and analyze request-level metrics as described above. @@ -145,11 +130,6 @@ For **online inference** (serving mode), the summary is always per-request. `e2e | `image_num` | Number of images generated (for diffusion/image stages). | | `resolution` | Image resolution (for diffusion/image stages). | | `postprocess_time_ms` | Diffusion/image: post-processing time in ms. | -| `preprocess_time_ms` | Diffusion/image: pre-processing time in ms (before model execution). | -| `diffusion_engine_exec_time_ms` | Diffusion/image: model execution time in ms, excluding pre-processing and post-processing. | -| `diffusion_engine_total_time_ms` | Diffusion/image: total diffusion engine time in ms, including pre-processing, model execution, and post-processing. | -| `num_inference_steps` | Number of inference steps for diffusion generation. | - --- @@ -163,8 +143,6 @@ For **online inference** (serving mode), the summary is always per-request. `e2e | `in_flight_time_ms` | In-flight time in ms. | - - ### Expectation of the Numbers (Verification) **Formulas:** @@ -193,14 +171,3 @@ For each edge: - 1->2: tx_time_ms (**18.790**) + rx_decode_time_ms (**31.706**) + in_flight_time_ms (**2.819**) = **53.315** 192.581 + 53.315 = **245.896** = transfers_total_time_ms, which matches the calculation (difference is due to rounding) - -**diffusion_engine_total_time_ms** - -- preprocess_time_ms: Time spent on input preprocessing -- diffusion_engine_exec_time_ms: Pure model execution time -- postprocess_time_ms: Time spent on output postprocessing -- diffusion_engine_total_time_ms: Total time spent in the diffusion engine - -so `diffusion_engine_total_time_ms` = `diffusion_engine_exec_time_ms` + `preprocess_time_ms` + `postprocess_time_ms` - = **16,363.419** + **60.106** + **67.685** - = **16,491.210** diff --git a/examples/offline_inference/custom_pipeline/image_to_image/image_edit.py b/examples/offline_inference/custom_pipeline/image_to_image/image_edit.py index 6af06fa19cf..8ad5cbf9bcb 100644 --- a/examples/offline_inference/custom_pipeline/image_to_image/image_edit.py +++ b/examples/offline_inference/custom_pipeline/image_to_image/image_edit.py @@ -99,7 +99,6 @@ def parse_args() -> argparse.Namespace: parser.add_argument("--vae_use_slicing", action="store_true") parser.add_argument("--vae_use_tiling", action="store_true") parser.add_argument("--enable-cpu-offload", action="store_true") - parser.add_argument("--log-stats", "--log_stats", dest="log_stats", action="store_true", default=False) return parser.parse_args() @@ -156,7 +155,6 @@ async def main(): cache_config=cache_config, parallel_config=parallel_config, enforce_eager=args.enforce_eager, - log_stats=args.log_stats, enable_cpu_offload=args.enable_cpu_offload, diffusion_load_format="dummy", custom_pipeline_args={"pipeline_class": "custom_pipeline.CustomPipeline"}, diff --git a/examples/offline_inference/image_to_image/image_edit.py b/examples/offline_inference/image_to_image/image_edit.py index 305a6d54ab1..81fe24e2189 100644 --- a/examples/offline_inference/image_to_image/image_edit.py +++ b/examples/offline_inference/image_to_image/image_edit.py @@ -320,11 +320,6 @@ def parse_args() -> argparse.Namespace: action="store_true", help="Enable layerwise (blockwise) offloading on DiT modules.", ) - parser.add_argument( - "--log-stats", - action="store_true", - help="Enable vLLM-Omni statistics logging.", - ) return parser.parse_args() @@ -388,7 +383,6 @@ def main(): parallel_config=parallel_config, enforce_eager=args.enforce_eager, enable_cpu_offload=args.enable_cpu_offload, - log_stats=args.log_stats, ) print("Pipeline loaded") diff --git a/examples/offline_inference/image_to_video/image_to_video.py b/examples/offline_inference/image_to_video/image_to_video.py index 58da640d93b..cdcabf48cfe 100644 --- a/examples/offline_inference/image_to_video/image_to_video.py +++ b/examples/offline_inference/image_to_video/image_to_video.py @@ -176,11 +176,6 @@ def parse_args() -> argparse.Namespace: "Default 1 means pure sharding (no replication). " ), ) - parser.add_argument( - "--log-stats", - action="store_true", - help="Enable vLLM-Omni statistics logging.", - ) return parser.parse_args() @@ -283,7 +278,6 @@ def main(): enable_cpu_offload=args.enable_cpu_offload, parallel_config=parallel_config, enforce_eager=args.enforce_eager, - log_stats=args.log_stats, model_class_name=model_class_name, cache_backend=args.cache_backend, cache_config=cache_config, diff --git a/examples/offline_inference/text_to_image/text_to_image.py b/examples/offline_inference/text_to_image/text_to_image.py index 15cbbbf72ae..38297160682 100644 --- a/examples/offline_inference/text_to_image/text_to_image.py +++ b/examples/offline_inference/text_to_image/text_to_image.py @@ -94,11 +94,6 @@ def parse_args() -> argparse.Namespace: action="store_true", help="Enable cache-dit summary logging after diffusion forward passes.", ) - parser.add_argument( - "--log-stats", - action="store_true", - help="Enable vLLM-Omni statistics logging.", - ) parser.add_argument( "--ulysses-degree", type=int, @@ -304,7 +299,6 @@ def main(): "parallel_config": parallel_config, "enforce_eager": args.enforce_eager, "enable_cpu_offload": args.enable_cpu_offload, - "log_stats": args.log_stats, **lora_args, **quant_kwargs, } diff --git a/examples/offline_inference/text_to_video/text_to_video.py b/examples/offline_inference/text_to_video/text_to_video.py index 857530045a7..c1efd342e45 100644 --- a/examples/offline_inference/text_to_video/text_to_video.py +++ b/examples/offline_inference/text_to_video/text_to_video.py @@ -127,11 +127,6 @@ def parse_args() -> argparse.Namespace: default=1, help="Number of GPUs used for VAE patch/tile parallelism (decode).", ) - parser.add_argument( - "--log-stats", - action="store_true", - help="Enable vLLM-Omni statistics logging.", - ) return parser.parse_args() @@ -181,7 +176,6 @@ def main(): enable_cpu_offload=args.enable_cpu_offload, parallel_config=parallel_config, enforce_eager=args.enforce_eager, - log_stats=args.log_stats, cache_backend=args.cache_backend, cache_config=cache_config, ) diff --git a/vllm_omni/diffusion/diffusion_engine.py b/vllm_omni/diffusion/diffusion_engine.py index e5ace3b28c8..066ba2bb8bd 100644 --- a/vllm_omni/diffusion/diffusion_engine.py +++ b/vllm_omni/diffusion/diffusion_engine.py @@ -67,19 +67,14 @@ def __init__(self, od_config: OmniDiffusionConfig): raise e def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: - diffusion_engine_start_time = time.time() # Apply pre-processing if available - preprocess_time = 0.0 if self.pre_process_func is not None: preprocess_start_time = time.time() request = self.pre_process_func(request) preprocess_time = time.time() - preprocess_start_time logger.info(f"Pre-processing completed in {preprocess_time:.4f} seconds") - exec_start_time = time.time() output = self.add_req_and_wait_for_response(request) - exec_total_time = time.time() - exec_start_time - if output.error: raise Exception(f"{output.error}") logger.info("Generation completed successfully.") @@ -112,20 +107,14 @@ def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: outputs = [outputs] if outputs is not None else [] metrics = { - "preprocess_time_ms": preprocess_time * 1000, - "diffusion_engine_exec_time_ms": (time.time() - diffusion_engine_start_time) * 1000, - "diffusion_engine_total_time_ms": exec_total_time * 1000, "image_num": int(request.sampling_params.num_outputs_per_prompt), "resolution": int(request.sampling_params.resolution), + "postprocess_time_ms": postprocess_time * 1000, } - if self.pre_process_func is not None: metrics["preprocessing_time_ms"] = preprocess_time * 1000 # Handle single request or multiple requests - metrics["postprocess_time_ms"] = postprocess_time * 1000 - metrics["num_inference_steps"] = int(request.sampling_params.num_inference_steps) - if len(request.prompts) == 1: # Single request: return single OmniRequestOutput prompt = request.prompts[0] @@ -212,7 +201,7 @@ def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]: ), ) - return results + return results @staticmethod def make_engine(config: OmniDiffusionConfig) -> "DiffusionEngine": diff --git a/vllm_omni/entrypoints/omni.py b/vllm_omni/entrypoints/omni.py index dddcb12b5ce..9ecc5f76236 100644 --- a/vllm_omni/entrypoints/omni.py +++ b/vllm_omni/entrypoints/omni.py @@ -1135,15 +1135,6 @@ def _run_generation( final_output_type=stage.final_output_type, # type: ignore[attr-defined] request_output=engine_outputs, ) - try: - if stage.final_output_type == "text" or metrics.log_stats: - output_to_yield.metrics = metrics.build_output_metrics(stage_id, req_id) - except Exception as e: - # Make metrics contract explicit on failure. - output_to_yield.metrics = {} - logger.exception( - f"[{self._name}] Failed to attach output metrics for req {req_id} at stage {stage_id}: {e}", - ) # Record audio generated frames (only when finished) try: diff --git a/vllm_omni/metrics/stats.py b/vllm_omni/metrics/stats.py index 068b4a30b8e..46fec6fcc1f 100644 --- a/vllm_omni/metrics/stats.py +++ b/vllm_omni/metrics/stats.py @@ -39,7 +39,7 @@ class StageRequestStats: final_output_type: str | None = None request_id: str | None = None postprocess_time_ms: float = 0.0 - diffusion_metrics: dict[str, float] = None + diffusion_metrics: dict[str, int] = None audio_generated_frames: int = 0 @property @@ -293,15 +293,23 @@ def process_stage_metrics( return # 4. Finished with output: assign text metrics if available - output_to_yield.metrics = self.build_output_metrics(stage_id, req_id) + output_to_yield.metrics = {} + stage_event = next( + (evt for evt in reversed(self.stage_events.get(req_id, [])) if evt.stage_id == stage_id), + None, + ) + if stage_event is not None and stage_event.final_output_type == "text": + output_to_yield.metrics = { + "num_tokens_in": stage_event.num_tokens_in, + "num_tokens_out": stage_event.num_tokens_out, + "stage_id": stage_event.stage_id, + "final_output_type": stage_event.final_output_type, + } # 5. Finished: record audio generated frames self.record_audio_generated_frames(output_to_yield, stage_id, req_id) except Exception: - if output_to_yield is not None: - # Make metrics contract explicit on failure. - output_to_yield.metrics = {} logger.exception( "Failed to process metrics for stage %s, req %s", stage_id, @@ -321,56 +329,12 @@ def _as_stage_request_stats( stats.request_id = req_id stats.final_output_type = final_output_type stats.diffusion_metrics = ( - {k: float(v) for k, v in self.diffusion_metrics.pop(req_id, {}).items()} + {k: int(v) for k, v in self.diffusion_metrics.pop(req_id, {}).items()} if req_id in self.diffusion_metrics else None ) return stats - def _get_stage_event(self, stage_id: int, req_id: Any) -> StageRequestStats | None: - rid_key = str(req_id) - for evt in reversed(self.stage_events.get(rid_key, [])): - if evt.stage_id == stage_id: - return evt - return None - - def _collect_diffusion_metrics(self, req_id: Any) -> dict[str, float]: - """Aggregate diffusion metrics across all stages for a request.""" - rid_key = str(req_id) - merged: dict[str, float] = {} - for evt in self.stage_events.get(rid_key, []): - if not evt.diffusion_metrics: - continue - for key, value in evt.diffusion_metrics.items(): - merged[key] = merged.get(key, 0.0) + float(value) - return merged - - def build_output_metrics(self, stage_id: int, req_id: Any) -> dict[str, Any]: - stage_event = self._get_stage_event(stage_id, req_id) - if stage_event is None: - return {} - - merged: dict[str, Any] = {} - - if stage_event.final_output_type == "text": - merged.update( - { - "num_tokens_in": stage_event.num_tokens_in, - "num_tokens_out": stage_event.num_tokens_out, - } - ) - - if self.log_stats: - diffusion_metrics = self._collect_diffusion_metrics(req_id) - if diffusion_metrics: - merged.update(diffusion_metrics) - - if merged: - merged["stage_id"] = stage_event.stage_id - merged["final_output_type"] = stage_event.final_output_type - - return merged - def on_stage_metrics( self, stage_id: int, diff --git a/vllm_omni/outputs.py b/vllm_omni/outputs.py index af14dd0e600..5bbd27a1dbd 100644 --- a/vllm_omni/outputs.py +++ b/vllm_omni/outputs.py @@ -248,6 +248,7 @@ def __repr__(self) -> str: f"images={images_repr}", f"prompt={self.prompt!r}", f"latents={self.latents}", + f"metrics={self.metrics}", f"multimodal_output={self._multimodal_output}", ]