Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 2 additions & 35 deletions docs/contributing/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,8 @@ With `--log-stats` enabled, the server will output detailed metrics logs after e
| rx_decode_time_ms | 111.865 | 31.706 |
| in_flight_time_ms | 2.015 | 2.819 |

### DiffusionStats

| Field | Value |
|---------------------------------|------------|
| batch_id | 1 |
| batch_size | 1 |
| diffusion_engine_exec_time_ms | 16,363.419 |
| diffusion_engine_total_time_ms | 16,234.849 |
| image_num | 1.000 |
| num_inference_steps | 50.000 |
| postprocess_time_ms | 67.685 |
| preprocess_time_ms | 60.106 |
| resolution | 640.000 |
| stage_gen_time_ms | 16,363.714 |

These logs include:

- **Overall summary**: total requests, wall time, average tokens/sec, etc.

Expand All @@ -92,9 +80,6 @@ With `--log-stats` enabled, the server will output detailed metrics logs after e

- **Transfer table**: data transfer and timing for each edge.

- **DiffusionStats**: preprocessing time, model execution time, postprocessing time, generated images, resolution, and inference steps.


You can use these logs to monitor system health, debug performance, and analyze request-level metrics as described above.


Expand Down Expand Up @@ -145,11 +130,6 @@ For **online inference** (serving mode), the summary is always per-request. `e2e
| `image_num` | Number of images generated (for diffusion/image stages). |
| `resolution` | Image resolution (for diffusion/image stages). |
| `postprocess_time_ms` | Diffusion/image: post-processing time in ms. |
| `preprocess_time_ms` | Diffusion/image: pre-processing time in ms (before model execution). |
| `diffusion_engine_exec_time_ms` | Diffusion/image: model execution time in ms, excluding pre-processing and post-processing. |
| `diffusion_engine_total_time_ms` | Diffusion/image: total diffusion engine time in ms, including pre-processing, model execution, and post-processing. |
| `num_inference_steps` | Number of inference steps for diffusion generation. |


---

Expand All @@ -163,8 +143,6 @@ For **online inference** (serving mode), the summary is always per-request. `e2e
| `in_flight_time_ms` | In-flight time in ms. |




### Expectation of the Numbers (Verification)

**Formulas:**
Expand Down Expand Up @@ -193,14 +171,3 @@ For each edge:
- 1->2: tx_time_ms (**18.790**) + rx_decode_time_ms (**31.706**) + in_flight_time_ms (**2.819**) = **53.315**

192.581 + 53.315 = **245.896** = transfers_total_time_ms, which matches the calculation (difference is due to rounding)

**diffusion_engine_total_time_ms**

- preprocess_time_ms: Time spent on input preprocessing
- diffusion_engine_exec_time_ms: Pure model execution time
- postprocess_time_ms: Time spent on output postprocessing
- diffusion_engine_total_time_ms: Total time spent in the diffusion engine

so `diffusion_engine_total_time_ms` = `diffusion_engine_exec_time_ms` + `preprocess_time_ms` + `postprocess_time_ms`
= **16,363.419** + **60.106** + **67.685**
= **16,491.210**
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--vae_use_slicing", action="store_true")
parser.add_argument("--vae_use_tiling", action="store_true")
parser.add_argument("--enable-cpu-offload", action="store_true")
parser.add_argument("--log-stats", "--log_stats", dest="log_stats", action="store_true", default=False)

return parser.parse_args()

Expand Down Expand Up @@ -156,7 +155,6 @@ async def main():
cache_config=cache_config,
parallel_config=parallel_config,
enforce_eager=args.enforce_eager,
log_stats=args.log_stats,
enable_cpu_offload=args.enable_cpu_offload,
diffusion_load_format="dummy",
custom_pipeline_args={"pipeline_class": "custom_pipeline.CustomPipeline"},
Expand Down
6 changes: 0 additions & 6 deletions examples/offline_inference/image_to_image/image_edit.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,6 @@ def parse_args() -> argparse.Namespace:
action="store_true",
help="Enable layerwise (blockwise) offloading on DiT modules.",
)
parser.add_argument(
"--log-stats",
action="store_true",
help="Enable vLLM-Omni statistics logging.",
)
return parser.parse_args()


Expand Down Expand Up @@ -388,7 +383,6 @@ def main():
parallel_config=parallel_config,
enforce_eager=args.enforce_eager,
enable_cpu_offload=args.enable_cpu_offload,
log_stats=args.log_stats,
)
print("Pipeline loaded")

Expand Down
6 changes: 0 additions & 6 deletions examples/offline_inference/image_to_video/image_to_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,6 @@ def parse_args() -> argparse.Namespace:
"Default 1 means pure sharding (no replication). "
),
)
parser.add_argument(
"--log-stats",
action="store_true",
help="Enable vLLM-Omni statistics logging.",
)
return parser.parse_args()


Expand Down Expand Up @@ -283,7 +278,6 @@ def main():
enable_cpu_offload=args.enable_cpu_offload,
parallel_config=parallel_config,
enforce_eager=args.enforce_eager,
log_stats=args.log_stats,
model_class_name=model_class_name,
cache_backend=args.cache_backend,
cache_config=cache_config,
Expand Down
6 changes: 0 additions & 6 deletions examples/offline_inference/text_to_image/text_to_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,6 @@ def parse_args() -> argparse.Namespace:
action="store_true",
help="Enable cache-dit summary logging after diffusion forward passes.",
)
parser.add_argument(
"--log-stats",
action="store_true",
help="Enable vLLM-Omni statistics logging.",
)
parser.add_argument(
"--ulysses-degree",
type=int,
Expand Down Expand Up @@ -304,7 +299,6 @@ def main():
"parallel_config": parallel_config,
"enforce_eager": args.enforce_eager,
"enable_cpu_offload": args.enable_cpu_offload,
"log_stats": args.log_stats,
**lora_args,
**quant_kwargs,
}
Expand Down
6 changes: 0 additions & 6 deletions examples/offline_inference/text_to_video/text_to_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,6 @@ def parse_args() -> argparse.Namespace:
default=1,
help="Number of GPUs used for VAE patch/tile parallelism (decode).",
)
parser.add_argument(
"--log-stats",
action="store_true",
help="Enable vLLM-Omni statistics logging.",
)
return parser.parse_args()


Expand Down Expand Up @@ -181,7 +176,6 @@ def main():
enable_cpu_offload=args.enable_cpu_offload,
parallel_config=parallel_config,
enforce_eager=args.enforce_eager,
log_stats=args.log_stats,
cache_backend=args.cache_backend,
cache_config=cache_config,
)
Expand Down
15 changes: 2 additions & 13 deletions vllm_omni/diffusion/diffusion_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,14 @@ def __init__(self, od_config: OmniDiffusionConfig):
raise e

def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]:
diffusion_engine_start_time = time.time()
# Apply pre-processing if available
preprocess_time = 0.0
if self.pre_process_func is not None:
preprocess_start_time = time.time()
request = self.pre_process_func(request)
preprocess_time = time.time() - preprocess_start_time
logger.info(f"Pre-processing completed in {preprocess_time:.4f} seconds")

exec_start_time = time.time()
output = self.add_req_and_wait_for_response(request)
exec_total_time = time.time() - exec_start_time

if output.error:
raise Exception(f"{output.error}")
logger.info("Generation completed successfully.")
Expand Down Expand Up @@ -112,20 +107,14 @@ def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]:
outputs = [outputs] if outputs is not None else []

metrics = {
"preprocess_time_ms": preprocess_time * 1000,
"diffusion_engine_exec_time_ms": (time.time() - diffusion_engine_start_time) * 1000,
"diffusion_engine_total_time_ms": exec_total_time * 1000,
"image_num": int(request.sampling_params.num_outputs_per_prompt),
"resolution": int(request.sampling_params.resolution),
"postprocess_time_ms": postprocess_time * 1000,
}

if self.pre_process_func is not None:
metrics["preprocessing_time_ms"] = preprocess_time * 1000

# Handle single request or multiple requests
metrics["postprocess_time_ms"] = postprocess_time * 1000
metrics["num_inference_steps"] = int(request.sampling_params.num_inference_steps)

if len(request.prompts) == 1:
# Single request: return single OmniRequestOutput
prompt = request.prompts[0]
Expand Down Expand Up @@ -212,7 +201,7 @@ def step(self, request: OmniDiffusionRequest) -> list[OmniRequestOutput]:
),
)

return results
return results

@staticmethod
def make_engine(config: OmniDiffusionConfig) -> "DiffusionEngine":
Expand Down
9 changes: 0 additions & 9 deletions vllm_omni/entrypoints/omni.py
Original file line number Diff line number Diff line change
Expand Up @@ -1135,15 +1135,6 @@ def _run_generation(
final_output_type=stage.final_output_type, # type: ignore[attr-defined]
request_output=engine_outputs,
)
try:
if stage.final_output_type == "text" or metrics.log_stats:
output_to_yield.metrics = metrics.build_output_metrics(stage_id, req_id)
except Exception as e:
# Make metrics contract explicit on failure.
output_to_yield.metrics = {}
logger.exception(
f"[{self._name}] Failed to attach output metrics for req {req_id} at stage {stage_id}: {e}",
)

# Record audio generated frames (only when finished)
try:
Expand Down
64 changes: 14 additions & 50 deletions vllm_omni/metrics/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class StageRequestStats:
final_output_type: str | None = None
request_id: str | None = None
postprocess_time_ms: float = 0.0
diffusion_metrics: dict[str, float] = None
diffusion_metrics: dict[str, int] = None
audio_generated_frames: int = 0

@property
Expand Down Expand Up @@ -293,15 +293,23 @@ def process_stage_metrics(
return

# 4. Finished with output: assign text metrics if available
output_to_yield.metrics = self.build_output_metrics(stage_id, req_id)
output_to_yield.metrics = {}
stage_event = next(
(evt for evt in reversed(self.stage_events.get(req_id, [])) if evt.stage_id == stage_id),
None,
)
if stage_event is not None and stage_event.final_output_type == "text":
output_to_yield.metrics = {
"num_tokens_in": stage_event.num_tokens_in,
"num_tokens_out": stage_event.num_tokens_out,
"stage_id": stage_event.stage_id,
"final_output_type": stage_event.final_output_type,
}

# 5. Finished: record audio generated frames
self.record_audio_generated_frames(output_to_yield, stage_id, req_id)

except Exception:
if output_to_yield is not None:
# Make metrics contract explicit on failure.
output_to_yield.metrics = {}
logger.exception(
"Failed to process metrics for stage %s, req %s",
stage_id,
Expand All @@ -321,56 +329,12 @@ def _as_stage_request_stats(
stats.request_id = req_id
stats.final_output_type = final_output_type
stats.diffusion_metrics = (
{k: float(v) for k, v in self.diffusion_metrics.pop(req_id, {}).items()}
{k: int(v) for k, v in self.diffusion_metrics.pop(req_id, {}).items()}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep diffusion metric values as floats

Casting self.diffusion_metrics to int here truncates timing metrics such as postprocess_time_ms/preprocessing_time_ms that are produced as floats in DiffusionEngine.step and accumulated in accumulate_diffusion_metrics, so the stage summary loses precision and can mislead profiling/experiment analysis when log_stats is enabled. This regression is user-visible in logged/request-level stats because build_and_log_summary expands evt.diffusion_metrics directly into the stage table.

Useful? React with 👍 / 👎.

if req_id in self.diffusion_metrics
else None
)
return stats

def _get_stage_event(self, stage_id: int, req_id: Any) -> StageRequestStats | None:
rid_key = str(req_id)
for evt in reversed(self.stage_events.get(rid_key, [])):
if evt.stage_id == stage_id:
return evt
return None

def _collect_diffusion_metrics(self, req_id: Any) -> dict[str, float]:
"""Aggregate diffusion metrics across all stages for a request."""
rid_key = str(req_id)
merged: dict[str, float] = {}
for evt in self.stage_events.get(rid_key, []):
if not evt.diffusion_metrics:
continue
for key, value in evt.diffusion_metrics.items():
merged[key] = merged.get(key, 0.0) + float(value)
return merged

def build_output_metrics(self, stage_id: int, req_id: Any) -> dict[str, Any]:
    """Build the metrics dict attached to the output of one stage of a request.

    Args:
        stage_id: Stage that produced the output.
        req_id: Request identifier, forwarded to the event lookups.

    Returns:
        An empty dict when no event exists for the stage or when there is
        nothing to report. Otherwise a dict holding:

        * ``num_tokens_in`` / ``num_tokens_out`` when the stage's
          ``final_output_type`` is ``"text"``;
        * the request's aggregated diffusion metrics when ``self.log_stats``
          is truthy and any were recorded;
        * ``stage_id`` and ``final_output_type`` identifying the stage.
    """
    stage_event = self._get_stage_event(stage_id, req_id)
    if stage_event is None:
        return {}

    merged: dict[str, Any] = {}

    if stage_event.final_output_type == "text":
        merged["num_tokens_in"] = stage_event.num_tokens_in
        merged["num_tokens_out"] = stage_event.num_tokens_out

    if self.log_stats:
        diffusion_metrics = self._collect_diffusion_metrics(req_id)
        if diffusion_metrics:
            merged.update(diffusion_metrics)

    # Identification keys are added only when there is something to report,
    # so an otherwise empty result stays an empty dict.
    if not merged:
        return merged

    merged["stage_id"] = stage_event.stage_id
    merged["final_output_type"] = stage_event.final_output_type
    return merged

def on_stage_metrics(
self,
stage_id: int,
Expand Down
1 change: 1 addition & 0 deletions vllm_omni/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ def __repr__(self) -> str:
f"images={images_repr}",
f"prompt={self.prompt!r}",
f"latents={self.latents}",
f"metrics={self.metrics}",
f"multimodal_output={self._multimodal_output}",
]

Expand Down