diff --git a/docs/contributing/metrics.md b/docs/contributing/metrics.md index 92dcd92ccac..c854a088603 100644 --- a/docs/contributing/metrics.md +++ b/docs/contributing/metrics.md @@ -3,7 +3,7 @@ You can use these metrics in production to monitor the health and performance of the vLLM-omni system. Typical scenarios include: -- **Performance Monitoring**: Track throughput (e.g., `e2e_avg_tokens_per_s`), latency (e.g., `e2e_total_ms`), and resource utilization to verify that the system meets expected standards. +- **Performance Monitoring**: Track throughput (e.g., `avg_tokens_per_s`), latency (e.g., `request_wall_time_ms` / `engine_pipeline_time_ms`), and resource utilization to verify that the system meets expected standards. - **Debugging and Troubleshooting**: Use detailed per-request metrics to diagnose issues, such as high transfer times or unexpected token counts. @@ -25,69 +25,75 @@ python openai_chat_completion_client_for_multimodal_generation.py --query-type u With `--log-stats` enabled, the server will output detailed metrics logs after each request. 
Example output: +For multi-stage pipelines, vLLM-Omni also emits a concise per-request timing line: -#### Overall Summary - -| Field | Value | -|-----------------------------|--------------| -| e2e_requests | 1 | -| e2e_wall_time_ms | 41,299.190 | -| e2e_total_tokens | 5,202 | -| e2e_avg_time_per_request_ms | 41,299.190 | -| e2e_avg_tokens_per_s | 125.959 | -| e2e_stage_0_wall_time_ms | 10,192.289 | -| e2e_stage_1_wall_time_ms | 30,541.409 | -| e2e_stage_2_wall_time_ms | 207.496 | - -#### RequestE2EStats - -| Field | Value | -|-------------------------|------------| -| e2e_total_ms | 41,299.133 | -| e2e_total_tokens | 5,202 | -| transfers_total_time_ms | 245.895 | -| transfers_total_kbytes | 138,089.939| - -#### StageRequestStats - -| Field | 0 | 1 | 2 | -|------------------------|--------|--------|--------| -| audio_generated_frames | 0 | 0 | 525,525| -| batch_id | 38 | 274 | 0 | -| batch_size | 1 | 1 | 1 | -| num_tokens_in | 4,860 | 4,826 | 4,384 | -| num_tokens_out | 67 | 275 | 0 | -| postprocess_time_ms | 256.158| 0.491 | 0.000 | -| stage_gen_time_ms | 9,910.007|30,379.198|160.745| - -#### TransferEdgeStats - -| Field | 0->1 | 1->2 | -|---------------------|-------------|------------| -| size_kbytes | 109,277.349 | 28,812.591 | -| tx_time_ms | 78.701 | 18.790 | -| rx_decode_time_ms | 111.865 | 31.706 | -| in_flight_time_ms | 2.015 | 2.819 | - - -These logs include: - -- **Overall summary**: total requests, wall time, average tokens/sec, etc. +```text +[OmniTiming] req=chatcmpl-a0edd05 total=32.99s preprocess=2.80s engine=30.19s stages=[0:ar=16.23s,1:diffusion=13.96s] transfers=[0->1=0.780ms] +``` -- **E2E table**: per-request latency and token counts. -- **Stage table**: per-stage batch and timing details. 
+#### Omni Metrics Summary + +```text +============ Omni Metrics Summary ============ +Successful requests: 1 +Total E2E time (ms): 41,356.133 +Input preprocess time (ms): 57.000 +Engine pipeline time (ms): 41,299.133 +Sum check (ms): 41,356.133 + +------------ Overall Time Breakdown ------------ +Input preprocess time (ms): 57.000 +Stage 0 generation time (ms): 9,910.007 +Stage 0 output processor time (ms): 7.000 +Stage 0 -> Stage 1 handoff time (ms): 245.895 +Stage 1 generation time (ms): 30,379.198 +Stage 1 output processor time (ms): 10.000 +Final output time (ms): 747.033 +Component sum (ms): 41,356.133 +E2E - component sum (ms): 0.000 + +------------ Stage 0 Breakdown ------------ +Stage generation time (ms): 9,910.007 +Output processor time (ms): 7.000 +Stage sum check (ms): 9,917.007 + +Stage id: 0 +Stage name: ar +Stage type: llm +Final output type: text +Batch id: 38 +Batch size: 1 + +Input tokens: 4,860 +Output tokens: 67 +Output token throughput (tok/s): 6.761 +Postprocess time (ms): 256.158 + +------------ Stage 0 -> Stage 1 Handoff ------------ +Handoff total time (ms): 245.895 +AR to diffusion time (ms): 53.314 +Other handoff processing time (ms): 192.581 + +------------ Final Output Breakdown ------------ +Final output wrapping time (ms): 160.745 +Final orchestration overhead time (ms): 586.288 +Final output total time (ms): 747.033 +Final output sum check (ms): 747.033 +``` -- **Transfer table**: data transfer and timing for each edge. +These logs include a high-level end-to-end summary, a non-overlapping stage/time breakdown, and per-stage details from `StageRequestStats`. The final output layer includes output wrapping plus the remaining orchestration overhead so the top-level component sum can be checked directly against E2E time. You can use these logs to monitor system health, debug performance, and analyze request-level metrics as described above. 
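
The sum checks in the example summary can be reproduced by hand. The snippet below is a minimal sketch, not part of vLLM-Omni: the variable names are hypothetical, and the numbers are copied manually from the example log above. It recomputes the top-level component sum, the two nested splits (handoff and final output), and the stage 0 output token throughput.

```python
import math

# All values are in ms and are copied by hand from the example
# "Omni Metrics Summary" above; nothing here calls vLLM-Omni itself.
total_e2e_ms = 41_356.133
components_ms = {
    "input_preprocess": 57.000,
    "stage_0_generation": 9_910.007,
    "stage_0_output_processor": 7.000,
    "stage_0_to_1_handoff": 245.895,
    "stage_1_generation": 30_379.198,
    "stage_1_output_processor": 10.000,
    "final_output": 747.033,
}

# Top-level check: the non-overlapping components should add up to E2E time.
component_sum_ms = sum(components_ms.values())
assert math.isclose(component_sum_ms, total_e2e_ms, abs_tol=1e-6)

# Nested check 1: handoff = AR-to-diffusion time + other handoff processing.
handoff_parts_ms = 53.314 + 192.581
assert math.isclose(handoff_parts_ms, components_ms["stage_0_to_1_handoff"], abs_tol=1e-6)

# Nested check 2: final output = output wrapping + orchestration overhead.
final_output_parts_ms = 160.745 + 586.288
assert math.isclose(final_output_parts_ms, components_ms["final_output"], abs_tol=1e-6)

# Stage 0 throughput: output tokens divided by generation time in seconds.
stage_0_tok_per_s = 67 * 1000 / components_ms["stage_0_generation"]
print(f"component sum: {component_sum_ms:.3f} ms")
print(f"stage 0 throughput: {stage_0_tok_per_s:.3f} tok/s")
```

A comparison with a small tolerance is used instead of `==` because the components are floating-point values; an exact equality test could fail on rounding alone.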
## Metrics Scope: Offline vs Online Inference -For **offline inference** (batch mode), the summary includes both system-level metrics (aggregated across all requests) and per-request metrics. In this case, `e2e_requests` can be greater than 1, reflecting multiple completed requests in a batch. +For **offline inference** (batch mode), the summary includes both system-level metrics (aggregated across all requests) and per-request metrics. In this case, `num_of_requests` can be greater than 1, reflecting multiple completed requests in a batch. + +For **online inference** (serving mode), the summary is always per-request. `num_of_requests` is always 1, and only request-level metrics are reported for each completion. -For **online inference** (serving mode), the summary is always per-request. `e2e_requests` is always 1, and only request-level metrics are reported for each completion. +When `num_of_requests` is 1, average fields are omitted from the returned overall summary because they are identical to the total/request-level values. They remain meaningful for offline batches with multiple completed requests. --- @@ -97,12 +103,48 @@ For **online inference** (serving mode), the summary is always per-request. `e2e | Field | Meaning | |---------------------------|----------------------------------------------------------------------------------------------| -| `e2e_requests` | Number of completed requests. | -| `e2e_wall_time_ms` | Wall-clock time span from run start to last completion, in ms. | -| `e2e_total_tokens` | Total tokens counted across all completed requests (stage0 input + all stage outputs). | -| `e2e_avg_time_per_request_ms` | Average wall time per request: `e2e_wall_time_ms / e2e_requests`. | -| `e2e_avg_tokens_per_s` | Average token throughput over wall time: `e2e_total_tokens * 1000 / e2e_wall_time_ms`. | -| `e2e_stage_{i}_wall_time_ms` | Wall-clock time span for stage i, in ms. 
Each stage's wall time is reported as a separate field, e.g., `e2e_stage_0_wall_time_ms`, `e2e_stage_1_wall_time_ms`, etc. | +| `num_of_requests` | Number of completed requests. | +| `request_wall_time_ms` | Wall-clock time span from request preparation start to final completion, in ms. | +| `input_preprocess_time_ms` | Time spent preparing and submitting requests before the engine pipeline starts. | +| `engine_pipeline_time_ms` | Time from engine request submission to final completion. | +| `stage_gen_total_time_ms` | Sum of all stage `stage_gen_time_ms` values. | +| `output_processor_total_time_ms` | Sum of time spent in per-stage output processors after raw engine outputs are returned. | +| `stage_handoff_total_time_ms` | Sum of inter-stage handoff time, measured after an upstream stage finishes and before the downstream stage is submitted. | +| `ar2diffusion_total_time_ms` | Subset of `stage_handoff_total_time_ms` spent converting AR outputs into diffusion inputs. | +| `final_output_total_time_ms` | Measured time spent wrapping final stage outputs into `OmniRequestOutput` objects before yielding them. | +| `breakdown_delta_time_ms` | Difference between E2E wall time and the printed component sum; useful for detecting missing timers or overlap. | +| `stage_{i}_to_{j}_handoff_time_ms` | Handoff time for a specific stage edge, e.g. `stage_0_to_1_handoff_time_ms`. | +| `stage_{i}_to_{j}_ar2diffusion_time_ms` | AR-to-diffusion conversion time included in that edge's handoff time. | +| `total_tokens` | Total tokens counted across all completed requests (stage0 input + all stage outputs). | +| `avg_request_wall_time_ms` | Average wall time per request: `request_wall_time_ms / num_of_requests`. | +| `avg_input_preprocess_time_ms` | Average pre-submit request preparation time per completed request. | +| `avg_engine_pipeline_time_ms` | Average engine pipeline time per completed request. 
| +| `avg_stage_gen_total_time_ms` | Average summed stage generation time per completed request. | +| `avg_output_processor_time_ms` | Average output processor time per completed request. | +| `avg_stage_handoff_total_time_ms` | Average summed inter-stage handoff time per completed request. | +| `avg_ar2diffusion_time_ms` | Average AR-to-diffusion conversion time per completed request. | +| `avg_final_output_time_ms` | Average final output wrapping time per completed request. | +| `avg_breakdown_delta_time_ms` | Average difference between E2E wall time and the printed component sum per completed request. | +| `avg_tokens_per_s` | Average token throughput over wall time: `total_tokens * 1000 / request_wall_time_ms`. | +| `stage_{i}_wall_time_ms` | Wall-clock time span for stage i, in ms. Each stage's wall time is reported as a separate field, e.g., `stage_0_wall_time_ms`, `stage_1_wall_time_ms`, etc. | + +### Timing Breakdown + +The printed summary includes sum checks that show the containment relationship: + +`request_wall_time_ms = input_preprocess_time_ms + engine_pipeline_time_ms` + +`request_wall_time_ms ~= input_preprocess_time_ms + sum(stage_gen_time_ms) + sum(output_processor_time_ms) + sum(stage_handoff_time_ms) + final_output_total_time_ms` + +Any remaining difference is reported as `breakdown_delta_time_ms`. + +`ar2diffusion_total_time_ms` is included in `stage_handoff_total_time_ms`. For a concrete AR-to-diffusion edge, `stage_0_to_1_ar2diffusion_time_ms` is included in `stage_0_to_1_handoff_time_ms`. + +For offline batches, average component fields such as `avg_stage_gen_total_time_ms`, +`avg_output_processor_time_ms`, `avg_stage_handoff_total_time_ms`, and +`avg_ar2diffusion_time_ms` are computed by dividing the batch aggregate by +`num_of_requests`. They are not individually timed per request and then averaged. +Per-request timers remain available in the E2E table where they are measured directly. 
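
The relationships in this section can be sketched as a small consistency check. This is an illustration under stated assumptions: `check_breakdown` is a hypothetical helper, not a vLLM-Omni function, and the input dictionary is hand-built with made-up numbers that merely reuse the field names documented above. The averages follow the rule described for offline batches (batch aggregate divided by `num_of_requests`).

```python
def check_breakdown(summary: dict) -> dict:
    """Hypothetical helper: recompute the documented timing relationships."""
    wall_ms = summary["request_wall_time_ms"]
    n = summary["num_of_requests"]

    # request_wall_time_ms = input_preprocess_time_ms + engine_pipeline_time_ms
    containment_gap_ms = wall_ms - (
        summary["input_preprocess_time_ms"] + summary["engine_pipeline_time_ms"]
    )

    # Component sum used for the approximate (~=) relationship.
    component_sum_ms = (
        summary["input_preprocess_time_ms"]
        + summary["stage_gen_total_time_ms"]
        + summary["output_processor_total_time_ms"]
        + summary["stage_handoff_total_time_ms"]
        + summary["final_output_total_time_ms"]
    )

    return {
        "containment_gap_ms": containment_gap_ms,
        # Whatever the components do not explain is the breakdown delta.
        "breakdown_delta_time_ms": wall_ms - component_sum_ms,
        # AR-to-diffusion time is a subset of handoff time, never larger.
        "ar2diffusion_within_handoff": (
            summary["ar2diffusion_total_time_ms"]
            <= summary["stage_handoff_total_time_ms"]
        ),
        # Averages: batch aggregate divided by the number of requests.
        "avg_stage_gen_total_time_ms": summary["stage_gen_total_time_ms"] / n,
        "avg_tokens_per_s": summary["total_tokens"] * 1000 / wall_ms,
    }


# Made-up values for illustration only; they are not taken from a real run.
example = {
    "num_of_requests": 2,
    "request_wall_time_ms": 100.0,
    "input_preprocess_time_ms": 4.0,
    "engine_pipeline_time_ms": 96.0,
    "stage_gen_total_time_ms": 70.0,
    "output_processor_total_time_ms": 6.0,
    "stage_handoff_total_time_ms": 12.0,
    "ar2diffusion_total_time_ms": 5.0,
    "final_output_total_time_ms": 5.0,
    "total_tokens": 300,
}
result = check_breakdown(example)
print(result)
```

A positive `breakdown_delta_time_ms` suggests time that no timer covered, while a negative value suggests overlapping timers, which matches the "missing timers or overlap" reading given in the field table.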
--- @@ -110,10 +152,13 @@ For **online inference** (serving mode), the summary is always per-request. `e2e | Field | Meaning | |---------------------------|-----------------------------------------------------------------------| -| `e2e_total_ms` | End-to-end latency in ms. | -| `e2e_total_tokens` | Total tokens for the request (stage0 input + all stage outputs). | +| `request_wall_time_ms` | End-to-end latency in ms, including input preprocessing and engine pipeline time. | +| `input_preprocess_time_ms` | Time spent preparing and submitting the request before `engine_pipeline_time_ms` starts. | +| `engine_pipeline_time_ms` | Time from engine request submission to final completion. | +| `total_tokens` | Total tokens for the request (stage0 input + all stage outputs). | | `transfers_total_time_ms` | Sum of transfer edge `total_time_ms` for this request. | | `transfers_total_kbytes` | Sum of transfer kbytes for this request. | +| `final_output_time_ms` | Time spent wrapping the final engine output into an `OmniRequestOutput` before yielding it. | --- @@ -126,7 +171,8 @@ For **online inference** (serving mode), the summary is always per-request. `e2e | `batch_size` | Batch size. | | `num_tokens_in` | Input tokens to the stage. | | `num_tokens_out` | Output tokens from the stage. | -| `stage_gen_time_ms` | Stage compute time in ms, excluding postprocessing time (reported separately as `postprocess_time_ms`). | +| `stage_gen_time_ms` | Stage compute time in ms, excluding output processor time and postprocessing time. | +| `output_processor_time_ms` | Time spent in the stage output processor after raw engine outputs are returned. | | `image_num` | Number of images generated (for diffusion/image stages). | | `resolution` | Image resolution (for diffusion/image stages). | | `postprocess_time_ms` | Diffusion/image: post-processing time in ms. | @@ -147,20 +193,20 @@ For **online inference** (serving mode), the summary is always per-request. 
`e2e **Formulas:** -- `e2e_total_tokens = Stage0's num_tokens_in + sum(all stages' num_tokens_out)` +- `total_tokens = Stage0's num_tokens_in + sum(all stages' num_tokens_out)` - `transfers_total_time_ms = sum(tx_time_ms + rx_decode_time_ms + in_flight_time_ms)` for every edge **Using the example above:** -**e2e_total_tokens** +**total_tokens** - Stage0's `num_tokens_in`: **4,860** - Stage0's `num_tokens_out`: **67** - Stage1's `num_tokens_out`: **275** - Stage2's `num_tokens_out`: **0** -so `e2e_total_tokens = 4,860 + 67 + 275 + 0 = 5,202`, which matches the table value `e2e_total_tokens`. +so `total_tokens = 4,860 + 67 + 275 + 0 = 5,202`, which matches the table value `total_tokens`. **transfers_total_time_ms** diff --git a/tests/entrypoints/test_async_omni.py b/tests/entrypoints/test_async_omni.py index c8ef5eddf20..c71dd10c94d 100644 --- a/tests/entrypoints/test_async_omni.py +++ b/tests/entrypoints/test_async_omni.py @@ -28,8 +28,15 @@ async def fake_abort_async(request_ids): return fake_abort_async -async def fake_process_results(request_id, metrics, final_stage_id_for_e2e, req_start_ts, wall_start_ts): - del metrics, final_stage_id_for_e2e, req_start_ts, wall_start_ts +async def fake_process_results( + request_id, + metrics, + final_stage_id_for_e2e, + req_start_ts, + input_preprocess_time_ms, + wall_start_ts, +): + del metrics, final_stage_id_for_e2e, req_start_ts, input_preprocess_time_ms, wall_start_ts if request_id.startswith("cancel-"): await asyncio.Future() return diff --git a/tests/entrypoints/test_omni_entrypoints.py b/tests/entrypoints/test_omni_entrypoints.py index 251e54ab83d..ce7ced549e0 100644 --- a/tests/entrypoints/test_omni_entrypoints.py +++ b/tests/entrypoints/test_omni_entrypoints.py @@ -698,6 +698,84 @@ def test_omni_generate_returns_list_when_not_using_generator(monkeypatch: pytest assert [output.stage_id for output in outputs] == [0, 2, 0, 2] +def test_omni_generate_batches_offline_metrics_summary(monkeypatch: pytest.MonkeyPatch): + 
class FakeMetrics: + instances: list[FakeMetrics] = [] + + def __init__( + self, + num_stages: int, + log_stats: bool, + wall_start_ts: float, + final_stage_id_for_e2e: dict[str, int], + **_: Any, + ) -> None: + self.num_stages = num_stages + self.log_stats = log_stats + self.wall_start_ts = wall_start_ts + self.final_stage_id_for_e2e = final_stage_id_for_e2e + self.stage_first_ts = [None] * num_stages + self.stage_last_ts = [None] * num_stages + self.e2e_done: set[str] = set() + self.final_output_time_ms: dict[str, float] = {} + self.build_count = 0 + FakeMetrics.instances.append(self) + + def on_stage_metrics(self, *_: Any, **__: Any) -> None: + pass + + def record_final_output_time( + self, + req_id: str, + final_output_time_ms: float, + start_ts: float | None = None, + end_ts: float | None = None, + ) -> None: + del start_ts, end_ts + self.final_output_time_ms[req_id] = final_output_time_ms + + def on_finalize_request( + self, + stage_id: int, + req_id: str, + req_start_ts: float, + input_preprocess_time_ms: float = 0.0, + ) -> None: + del stage_id, req_start_ts, input_preprocess_time_ms + self.e2e_done.add(req_id) + + def build_and_log_summary(self) -> dict[str, Any]: + self.build_count += 1 + return {"num_requests": len(self.e2e_done)} + + sampling_params = [SamplingParams(max_tokens=8) for _ in range(2)] + engine = FakeAsyncOmniEngine( + stage_metadata=LLM_DIFFUSION_META, + default_sampling_params_list=sampling_params, + on_add_request=_enqueue_omni_llm_diffusion_outputs, + ) + _patch_engine(monkeypatch, engine) + monkeypatch.setattr("vllm_omni.entrypoints.omni.OrchestratorMetrics", FakeMetrics) + + app = Omni("dummy-model", log_stats=True) + try: + outputs = list(app.generate(["p1", "p2"], py_generator=True, use_tqdm=False)) + finally: + if not engine.shutdown_called: + app.shutdown() + + assert len(outputs) == 4 + assert len(FakeMetrics.instances) == 1 + metrics = FakeMetrics.instances[0] + assert metrics.final_stage_id_for_e2e == { + 
engine.submitted[0]["request_id"]: 1, + engine.submitted[1]["request_id"]: 1, + } + assert metrics.e2e_done == {engine.submitted[0]["request_id"], engine.submitted[1]["request_id"]} + assert metrics.build_count == 1 + assert app.request_states == {} + + def test_omni_generate_diffusion_only_yields_single_image_per_request(monkeypatch: pytest.MonkeyPatch): sampling_params = [SamplingParams(max_tokens=8)] engine = FakeAsyncOmniEngine( diff --git a/tests/entrypoints/test_pd_disaggregation.py b/tests/entrypoints/test_pd_disaggregation.py index 5ffabfbf2af..b03f425084e 100644 --- a/tests/entrypoints/test_pd_disaggregation.py +++ b/tests/entrypoints/test_pd_disaggregation.py @@ -246,7 +246,8 @@ def _setup_ipc_mocks(monkeypatch): def _setup_log_mocks(monkeypatch): class _FakeOrchestratorAggregator: - def __init__(self, num_stages, enable_stats, wall_start_ts, final_stage_id_for_e2e=None): + def __init__(self, num_stages, enable_stats, wall_start_ts, final_stage_id_for_e2e=None, **kwargs): + del kwargs self.num_stages = num_stages self.enable_stats = enable_stats self.stage_first_ts = [None] * num_stages @@ -255,12 +256,14 @@ def __init__(self, num_stages, enable_stats, wall_start_ts, final_stage_id_for_e self.accumulated_gen_time_ms = {} self.e2e_done = set() self.e2e_count = 0 - self.e2e_total_ms = 0.0 + self.engine_pipeline_total_ms = 0.0 + self.input_preprocess_total_ms = 0.0 def on_stage_metrics(self, stage_id, req_id, metrics, final_output_type=None): pass - def on_finalize_request(self, stage_id, req_id, start_ts): + def on_finalize_request(self, stage_id, req_id, start_ts, input_preprocess_time_ms=0.0): + del start_ts, input_preprocess_time_ms self.e2e_done.add(req_id) def on_forward(self, from_stage, to_stage, req_id, size_bytes, tx_ms, use_shm): diff --git a/tests/entrypoints/test_serve.py b/tests/entrypoints/test_serve.py index 1267a189a32..e02b79d9b58 100644 --- a/tests/entrypoints/test_serve.py +++ b/tests/entrypoints/test_serve.py @@ -31,6 +31,23 @@ def 
test_serve_parser_accepts_no_async_chunk() -> None: assert args.async_chunk is False +def test_serve_parser_accepts_log_stats_request_breakdown_limit() -> None: + try: + from vllm.utils.argparse_utils import FlexibleArgumentParser + except Exception as exc: + pytest.skip(f"Cannot build parser in this environment: {exc}") + + root = FlexibleArgumentParser() + subparsers = root.add_subparsers(dest="subcommand") + cmd = OmniServeCommand() + cmd.subparser_init(subparsers) + + argv = ["serve", "fake-model", "--omni", "--log-stats-request-breakdown-limit", "5"] + args = root.parse_args(argv) + + assert args.log_stats_request_breakdown_limit == 5 + + def _make_headless_args() -> argparse.Namespace: return argparse.Namespace( model="fake-model", diff --git a/tests/metrics/test_stats.py b/tests/metrics/test_stats.py index 8b18c5dffa1..b273184cef8 100644 --- a/tests/metrics/test_stats.py +++ b/tests/metrics/test_stats.py @@ -15,7 +15,7 @@ def _get_request_entry(table: list[dict], request_id: str) -> dict: raise AssertionError(f"request_id={request_id} not found") -def test_orchestrator_aggregator_builds_summary() -> None: +def test_orchestrator_aggregator_builds_summary(monkeypatch: pytest.MonkeyPatch) -> None: agg = OrchestratorAggregator(num_stages=2, log_stats=True, wall_start_ts=0.0, final_stage_id_for_e2e=1) agg.stage_first_ts[0] = 0.0 agg.stage_last_ts[0] = 0.03 @@ -37,6 +37,10 @@ def test_orchestrator_aggregator_builds_summary() -> None: rx_decode_time_ms=0.0, rx_in_flight_time_ms=0.0, stage_stats=StageStats(), + handoff_to_stage_id=1, + stage_handoff_time_ms=20.0, + ar2diffusion_time_ms=12.5, + output_processor_time_ms=7.0, ), ) agg.on_stage_metrics( @@ -54,22 +58,36 @@ def test_orchestrator_aggregator_builds_summary() -> None: stage_stats=StageStats(), ), ) + agg.record_final_output_time("r1", 3.0) + monkeypatch.setattr("vllm_omni.metrics.stats.time.time", lambda: 0.07) agg.on_finalize_request(1, "r1", req_start_ts=0.0) summary = agg.build_and_log_summary() overall 
= summary["overall_summary"] - assert overall["e2e_requests"] == 1 + assert overall["num_of_requests"] == 1 + assert overall["request_wall_time_ms"] == 70.0 + assert overall["input_preprocess_time_ms"] == 0.0 + assert overall["stage_gen_total_time_ms"] == 50.0 + assert overall["output_processor_total_time_ms"] == 7.0 + assert overall["stage_handoff_total_time_ms"] == 20.0 + assert overall["final_output_total_time_ms"] == 3.0 + assert overall["stage_0_to_1_handoff_time_ms"] == 20.0 + assert overall["ar2diffusion_total_time_ms"] == 12.5 + assert overall["stage_0_to_1_ar2diffusion_time_ms"] == 12.5 stage_entry = _get_request_entry(summary["stage_table"], "r1") stage_ids = [row["stage_id"] for row in stage_entry["stages"]] assert stage_ids == [0, 1] + assert "stage_handoff_time_ms" not in stage_entry["stages"][0] + assert "ar2diffusion_time_ms" not in stage_entry["stages"][0] transfer_entry = _get_request_entry(summary["trans_table"], "r1") assert transfer_entry["transfers"][0]["edge"] == "0->1" assert transfer_entry["transfers"][0]["size_kbytes"] == 1.0 e2e_entry = _get_request_entry(summary["e2e_table"], "r1") - assert e2e_entry["e2e_total_tokens"] == 10 + assert e2e_entry["total_tokens"] == 10 + assert e2e_entry["final_output_time_ms"] == 3.0 def test_build_and_log_summary_e2e_only() -> None: @@ -77,8 +95,10 @@ def test_build_and_log_summary_e2e_only() -> None: agg.e2e_events.append( RequestE2EStats( request_id="r", - e2e_total_ms=10.0, - e2e_total_tokens=5, + request_wall_time_ms=10.0, + input_preprocess_time_ms=0.0, + engine_pipeline_time_ms=10.0, + total_tokens=5, transfers_total_time_ms=0.0, transfers_total_bytes=0, ) @@ -86,11 +106,59 @@ def test_build_and_log_summary_e2e_only() -> None: summary = agg.build_and_log_summary() e2e_entry = _get_request_entry(summary["e2e_table"], "r") - assert e2e_entry["e2e_total_tokens"] == 5 + assert e2e_entry["request_wall_time_ms"] == 10.0 + assert e2e_entry["engine_pipeline_time_ms"] == 10.0 + assert 
e2e_entry["total_tokens"] == 5 stage_entry = _get_request_entry(summary["stage_table"], "r") assert stage_entry["stages"] == [] +def test_request_breakdown_log_limit_does_not_truncate_summary_tables() -> None: + agg = OrchestratorAggregator( + num_stages=1, + log_stats=True, + wall_start_ts=0.0, + final_stage_id_for_e2e=0, + request_breakdown_limit=2, + ) + for idx in range(4): + agg.e2e_events.append( + RequestE2EStats( + request_id=f"r{idx}", + request_wall_time_ms=10.0, + input_preprocess_time_ms=1.0, + engine_pipeline_time_ms=9.0, + total_tokens=1, + transfers_total_time_ms=0.0, + transfers_total_bytes=0, + ) + ) + agg.e2e_count = 4 + + lines = agg._build_omni_metrics_summary_lines( + { + "num_of_requests": 4, + "request_wall_time_ms": 40.0, + "input_preprocess_wall_time_ms": 4.0, + "engine_pipeline_time_ms": 36.0, + "final_output_total_time_ms": 0.0, + "avg_input_preprocess_time_ms": 1.0, + "avg_final_output_time_ms": 0.0, + } + ) + rendered = "\n".join(lines) + + assert "Request r0 Breakdown" in rendered + assert "Request r1 Breakdown" in rendered + assert "Request r2 Breakdown" not in rendered + assert "Request r3 Breakdown" not in rendered + assert "Request breakdowns shown:" in rendered + assert "Request breakdowns omitted:" in rendered + + summary = agg.build_and_log_summary() + assert {entry["request_id"] for entry in summary["e2e_table"]} == {"r0", "r1", "r2", "r3"} + + def test_build_and_log_summary_multiple_requests() -> None: agg = OrchestratorAggregator( num_stages=2, log_stats=True, wall_start_ts=0.0, final_stage_id_for_e2e={"r1": 1, "r2": 0} @@ -150,8 +218,107 @@ def test_build_and_log_summary_multiple_requests() -> None: summary = agg.build_and_log_summary() assert len(summary["stage_table"]) == 2 assert {entry["request_id"] for entry in summary["e2e_table"]} == {"r1", "r2"} + assert summary["overall_summary"]["engine_pipeline_time_ms"] > 0.0 # Check that r1 has two stages and r2 has one r1_stage_entry = next(e for e in summary["stage_table"] if 
e["request_id"] == "r1") r2_stage_entry = next(e for e in summary["stage_table"] if e["request_id"] == "r2") assert len(r1_stage_entry["stages"]) == 2 assert len(r2_stage_entry["stages"]) == 1 + + +def test_overall_engine_pipeline_time_uses_batch_wall_time() -> None: + agg = OrchestratorAggregator(num_stages=1, log_stats=True, wall_start_ts=0.0, final_stage_id_for_e2e=0) + agg.stage_first_ts[0] = 1.0 + agg.last_finish_ts = 251.0 + agg.engine_pipeline_total_ms = 1_336_711.0 + agg.e2e_count = 10 + agg.e2e_events = [ + RequestE2EStats( + request_id=f"r{i}", + request_wall_time_ms=250_000.0, + input_preprocess_time_ms=1.0, + engine_pipeline_time_ms=249_999.0, + total_tokens=0, + transfers_total_time_ms=0.0, + transfers_total_bytes=0, + ) + for i in range(10) + ] + + summary = agg.build_and_log_summary() + + assert summary["overall_summary"]["request_wall_time_ms"] == 251_000.0 + assert summary["overall_summary"]["input_preprocess_wall_time_ms"] == 1_000.0 + assert summary["overall_summary"]["engine_pipeline_time_ms"] == 250_000.0 + assert summary["overall_summary"]["avg_engine_pipeline_time_ms"] == 249_999.0 + + +def test_multi_request_summary_includes_averages_and_request_input_preprocess() -> None: + agg = OrchestratorAggregator( + num_stages=1, + log_stats=True, + wall_start_ts=0.0, + final_stage_id_for_e2e={"r1": 0, "r2": 0}, + ) + for req_id in ("r1", "r2"): + agg.on_stage_metrics( + 0, + req_id, + StageRequestStats( + batch_id=1, + batch_size=2, + num_tokens_in=1, + num_tokens_out=2, + stage_gen_time_ms=10.0, + rx_transfer_bytes=0, + rx_decode_time_ms=0.0, + rx_in_flight_time_ms=0.0, + stage_stats=StageStats(), + ), + ) + agg.e2e_events.append( + RequestE2EStats( + request_id=req_id, + request_wall_time_ms=20.0, + input_preprocess_time_ms=2.0, + engine_pipeline_time_ms=18.0, + total_tokens=3, + transfers_total_time_ms=0.0, + transfers_total_bytes=0, + ) + ) + agg.e2e_count = 2 + agg.input_preprocess_total_ms = 4.0 + + lines = 
agg._build_omni_metrics_summary_lines( + { + "num_of_requests": 2, + "request_wall_time_ms": 20.0, + "input_preprocess_time_ms": 4.0, + "input_preprocess_wall_time_ms": 2.0, + "engine_pipeline_time_ms": 18.0, + "final_output_total_time_ms": 0.0, + "breakdown_delta_time_ms": 12.0, + "avg_request_wall_time_ms": 20.0, + "avg_engine_pipeline_time_ms": 18.0, + "avg_input_preprocess_time_ms": 2.0, + "avg_final_output_time_ms": 0.0, + "avg_breakdown_delta_time_ms": 6.0, + "stage_0_wall_time_ms": 12.0, + } + ) + rendered = "\n".join(lines) + + assert "Sum check (ms):" in rendered + assert "Component total work time (ms):" not in rendered + assert "Component sum (ms):" not in rendered + assert "Stage sum check (ms):" not in rendered + assert "Average E2E time" not in rendered + assert "Average engine pipeline time" not in rendered + assert "Average Time Breakdown" in rendered + assert "Average Stage 0 latency time" in rendered + assert "Average Stage 0 queue wait time" in rendered + assert "Average Stage 0 execution time" in rendered + assert "Request r1 Breakdown" in rendered + assert "Request r2 Breakdown" in rendered + assert "Input preprocess time (ms):" in rendered diff --git a/vllm_omni/engine/async_omni_engine.py b/vllm_omni/engine/async_omni_engine.py index b2dc839c976..c049679a7be 100644 --- a/vllm_omni/engine/async_omni_engine.py +++ b/vllm_omni/engine/async_omni_engine.py @@ -1699,6 +1699,7 @@ def add_request( ) if self.request_queue is None: raise RuntimeError("request_queue is not initialized") + msg["request_dispatch_start_ts"] = time.time() self.request_queue.sync_q.put_nowait(msg) # CFG companion expansion: create and enqueue companion requests diff --git a/vllm_omni/engine/cfg_companion_tracker.py b/vllm_omni/engine/cfg_companion_tracker.py index 1105e887c68..151b40c4e84 100644 --- a/vllm_omni/engine/cfg_companion_tracker.py +++ b/vllm_omni/engine/cfg_companion_tracker.py @@ -67,7 +67,13 @@ def on_companion_completed(self, companion_id: str) -> str | None: 
return parent_id return None - def defer_parent(self, parent_id: str, engine_outputs: Any, stage_id: int) -> None: + def defer_parent( + self, + parent_id: str, + engine_outputs: Any, + stage_id: int, + stage_metrics: Any | None = None, + ) -> None: """Hold parent result while waiting for companions to finish.""" # TODO: Add timeout/error recovery when the orchestrator grows a # companion-failure path. Today deferred parents are released only when @@ -75,6 +81,7 @@ def defer_parent(self, parent_id: str, engine_outputs: Any, stage_id: int) -> No self._pending_parents[parent_id] = { "engine_outputs": engine_outputs, "stage_id": stage_id, + "stage_metrics": stage_metrics, } logger.debug("Parent %s deferred, waiting for CFG companions", parent_id) diff --git a/vllm_omni/engine/orchestrator.py b/vllm_omni/engine/orchestrator.py index 2d2ac47cbb3..e87a5a41ec6 100644 --- a/vllm_omni/engine/orchestrator.py +++ b/vllm_omni/engine/orchestrator.py @@ -88,9 +88,11 @@ class OrchestratorRequestState: # Metrics: timestamp when request was submitted to each stage. 
stage_submit_ts: dict[int, float] = field(default_factory=dict) + output_processor_time_ms: dict[int, float] = field(default_factory=dict) mm_processor_kwargs: dict | None = None mm_features: list | None = None pd_prefill_multimodal_output: dict[str, Any] | None = None + request_dispatch_wait_time_ms: float = 0.0 streaming: StreamingInputState = field(default_factory=lambda: StreamingInputState()) @@ -249,6 +251,12 @@ async def _handle_add_request(self, msg: dict[str, Any]) -> None: self.request_states[request_id] = req_state req_state.streaming.enabled = bool(getattr(prompt, "resumable", False)) req_state.stage_submit_ts[stage_id] = _time.time() + request_dispatch_start_ts = msg.get("request_dispatch_start_ts") + if request_dispatch_start_ts is not None: + req_state.request_dispatch_wait_time_ms = max( + 0.0, + (req_state.stage_submit_ts[stage_id] - request_dispatch_start_ts) * 1000.0, + ) enqueue_ts = msg.get("enqueue_ts", 0.0) if enqueue_ts > 0: req_state.pipeline_timings["queue_wait_ms"] = (_time.perf_counter() - enqueue_ts) * 1000.0 @@ -446,7 +454,18 @@ async def _orchestration_loop(self) -> None: "new_prompt_len_snapshot", None, ) + output_processor_start = _time.perf_counter() raw_output = await pool.process_llm_raw_outputs(replica_id, raw_outputs) + output_processor_time_ms = (_time.perf_counter() - output_processor_start) * 1000.0 + processed_request_ids = {output.request_id for output in raw_output} + if processed_request_ids: + per_request_time_ms = output_processor_time_ms / len(processed_request_ids) + for request_id in processed_request_ids: + req_state = self.request_states.get(request_id) + if req_state is not None: + req_state.output_processor_time_ms[stage_id] = ( + req_state.output_processor_time_ms.get(stage_id, 0.0) + per_request_time_ms + ) except asyncio.CancelledError: raise except EngineDeadError as e: @@ -518,11 +537,21 @@ async def _handle_processed_outputs(self, stage_id: int, replica_id: int, output stage_metrics = None if 
output.finished: + submit_ts = req_state.stage_submit_ts.get(stage_id, _time.time()) stage_metrics = pool.build_stage_metrics( [output], - submit_ts=req_state.stage_submit_ts.get(stage_id, _time.time()), + submit_ts=submit_ts, replica_id=replica_id, ) + stage_end_ts = _time.time() + output_processor_time_ms = float(req_state.output_processor_time_ms.get(stage_id, 0.0)) + stage_latency_time_ms = max(0.0, (stage_end_ts - submit_ts) * 1000.0) + stage_metrics.output_processor_time_ms = output_processor_time_ms + stage_metrics.stage_submit_ts = submit_ts + stage_metrics.stage_end_ts = stage_end_ts + stage_metrics.stage_latency_time_ms = stage_latency_time_ms + stage_metrics.stage_gen_time_ms = max(0.0, stage_latency_time_ms - output_processor_time_ms) + stage_metrics.request_dispatch_wait_time_ms = req_state.request_dispatch_wait_time_ms stage_metrics.pipeline_timings = dict(req_state.pipeline_timings) await self._route_output(stage_id, output, req_state, stage_metrics) @@ -634,13 +663,14 @@ async def _route_output( and self._cfg_tracker.has_companions(req_id) and not self._cfg_tracker.all_companions_done(req_id) ): - self._cfg_tracker.defer_parent(req_id, output, stage_id) + self._cfg_tracker.defer_parent(req_id, output, stage_id, stage_metrics) else: await self._forward_to_next_stage( req_id, stage_id, output, req_state, + stage_metrics=stage_metrics, is_streaming_session=req_state.streaming.enabled, is_final_update=False, ) @@ -651,6 +681,7 @@ async def _route_output( stage_id, output, req_state, + stage_metrics=stage_metrics, is_streaming_session=True, is_final_update=True, ) @@ -684,6 +715,7 @@ async def _handle_cfg_companion_ready(self, req_id: str) -> None: stage_id, deferred["engine_outputs"], parent_state, + stage_metrics=deferred.get("stage_metrics"), ) async def _handle_kv_ready_raw_outputs( @@ -768,6 +800,7 @@ async def _forward_to_next_stage( src_stage_id: int, output: Any, req_state: OrchestratorRequestState, + stage_metrics: Any | None = None, *, 
is_streaming_session: bool = False, is_final_update: bool = False, @@ -783,6 +816,8 @@ async def _forward_to_next_stage( requires_multimodal_data = getattr(next_client, "requires_multimodal_data", False) if next_pool.stage_type == "diffusion": + handoff_start_ts = getattr(stage_metrics, "stage_end_ts", None) or _time.time() + ar2diffusion_ms = 0.0 if next_client.custom_process_input_func is not None: _t_ar2d = _time.perf_counter() diffusion_prompt = next_client.custom_process_input_func( @@ -791,6 +826,9 @@ async def _forward_to_next_stage( requires_multimodal_data, ) _dt_ar2d = (_time.perf_counter() - _t_ar2d) * 1000 + ar2diffusion_ms = _dt_ar2d + if stage_metrics is not None: + stage_metrics.ar2diffusion_time_ms = _dt_ar2d req_state.pipeline_timings["ar2diffusion_ms"] = _dt_ar2d logger.info( "[Orchestrator] ar2diffusion req=%s wall_time=%.3fms stage=%d->%d", @@ -846,7 +884,13 @@ async def _forward_to_next_stage( }, params_override=self._maybe_clone_diffusion_params_for_cfg(req_id, params), ) - req_state.stage_submit_ts[next_logical] = _time.time() + next_submit_ts = _time.time() + req_state.stage_submit_ts[next_logical] = next_submit_ts + if stage_metrics is not None: + stage_metrics.handoff_to_stage_id = next_logical + stage_metrics.stage_handoff_time_ms = max(0.0, (next_submit_ts - handoff_start_ts) * 1000.0) + if stage_metrics.ar2diffusion_time_ms == 0.0: + stage_metrics.ar2diffusion_time_ms = ar2diffusion_ms return # PD disaggregation: prefill → decode routing uses original prompt + KV transfer params diff --git a/vllm_omni/engine/stage_init_utils.py b/vllm_omni/engine/stage_init_utils.py index ce68a23daa4..d8ac2b1caad 100644 --- a/vllm_omni/engine/stage_init_utils.py +++ b/vllm_omni/engine/stage_init_utils.py @@ -929,3 +929,98 @@ def initialize_diffusion_stage( od_config = build_diffusion_config(model, stage_cfg, metadata) return create_diffusion_client(model, od_config, metadata, stage_init_timeout, batch_size, use_inline) + + +def 
_shutdown_or_close_resource(resource: Any, resource_name: str, stage_id: int) -> None:
+    """vLLM CoreEngineProcManager / coordinators use ``shutdown()``, not ``close()``."""
+    if resource is None:
+        return
+    shutdown = getattr(resource, "shutdown", None)
+    if callable(shutdown):
+        try:
+            shutdown()
+        except Exception as cleanup_error:
+            logger.warning(
+                "[stage_init] Failed to shutdown launched %s for stage %s: %s",
+                resource_name,
+                stage_id,
+                cleanup_error,
+            )
+        return
+    close = getattr(resource, "close", None)
+    if callable(close):
+        try:
+            close()
+        except Exception as cleanup_error:
+            logger.warning(
+                "[stage_init] Failed to close launched %s for stage %s: %s",
+                resource_name,
+                stage_id,
+                cleanup_error,
+            )
+
+
+def close_started_llm_stage(started: Any) -> None:
+    """Release resources owned by a launched stage that never attached."""
+    if started.proc is not None:
+        try:
+            terminate_alive_proc(started.proc)
+        except Exception as cleanup_error:
+            logger.warning(
+                "[stage_init] Failed to terminate process for stage %s: %s",
+                started.stage_id,
+                cleanup_error,
+            )
+    _shutdown_or_close_resource(started.engine_manager, "engine manager", started.stage_id)
+    _shutdown_or_close_resource(started.coordinator, "coordinator", started.stage_id)
+
+
+def finalize_initialized_stages(
+    stage_clients: list[Any | None],
+    input_processor: InputProcessor | None,
+) -> tuple[list[Any], list[Any], list[dict[str, Any]]]:
+    """Validate successful init and build runtime metadata lists."""
+    if any(stage_client is None for stage_client in stage_clients):
+        raise RuntimeError("Stage initialization completed with missing stage clients")
+
+    initialized_stage_clients = [stage_client for stage_client in stage_clients if stage_client is not None]
+    default_sampling_params_list = [stage_client.default_sampling_params for stage_client in initialized_stage_clients]
+    stage_metadata = [
+        {
+            "final_output": stage_client.final_output,
+            "final_output_type":
stage_client.final_output_type, + "stage_type": stage_client.stage_type, + "model_stage": getattr(stage_client, "model_stage", None), + } + for stage_client in initialized_stage_clients + ] + + if not isinstance(input_processor, InputProcessor): + has_llm_stage = any(metadata.get("stage_type") != "diffusion" for metadata in stage_metadata) + if has_llm_stage: + raise RuntimeError("Failed to initialize stage-0 InputProcessor for LLM pipeline") + + return initialized_stage_clients, default_sampling_params_list, stage_metadata + + +def cleanup_failed_stage_initialization( + stage_clients: list[Any | None], + started_llm_stages: list[Any], +) -> None: + """Shutdown attached stages and close any launched-but-unattached engines.""" + for cleanup_stage_id, stage_client in reversed(list(enumerate(stage_clients))): + if stage_client is None: + continue + try: + stage_client.shutdown() + except Exception as cleanup_error: + logger.warning( + "[stage_init] Failed to shutdown initialized stage %s after init failure: %s", + cleanup_stage_id, + cleanup_error, + ) + + for started in reversed(started_llm_stages): + if stage_clients[started.stage_id] is not None: + continue + close_started_llm_stage(started) diff --git a/vllm_omni/entrypoints/async_omni.py b/vllm_omni/entrypoints/async_omni.py index 056f56c003b..747a77f3717 100644 --- a/vllm_omni/entrypoints/async_omni.py +++ b/vllm_omni/entrypoints/async_omni.py @@ -289,7 +289,9 @@ async def generate( # Track per-request metrics wall_start_ts = time.time() + request_prep_start_ts = wall_start_ts req_start_ts: dict[str, float] = {} + input_preprocess_time_ms: dict[str, float] = {} # Determine the final stage for E2E stats final_stage_id_for_e2e = self._compute_final_stage_id(output_modalities) @@ -299,6 +301,8 @@ async def generate( self.log_stats, wall_start_ts, final_stage_id_for_e2e, + stage_metadata=getattr(self.engine, "stage_metadata", None), + request_breakdown_limit=self.log_stats_request_breakdown_limit, ) req_state = 
ClientRequestState(request_id) req_state.metrics = metrics @@ -328,6 +332,7 @@ async def generate( final_stage_id=final_stage_id_for_e2e, ) submit_ts = time.time() + input_preprocess_time_ms[request_id] = (submit_ts - request_prep_start_ts) * 1000.0 req_state.metrics.stage_first_ts[0] = submit_ts req_start_ts[request_id] = submit_ts @@ -340,6 +345,7 @@ async def generate( metrics, final_stage_id_for_e2e, req_start_ts, + input_preprocess_time_ms, wall_start_ts, ): yield output @@ -486,6 +492,7 @@ async def _process_orchestrator_results( metrics: OrchestratorMetrics, final_stage_id_for_e2e: int, req_start_ts: dict[str, float], + input_preprocess_time_ms: dict[str, float], wall_start_ts: float, ) -> AsyncGenerator[OmniRequestOutput, None]: """Read results from the Orchestrator (via the request's asyncio.Queue) @@ -527,6 +534,7 @@ async def _process_orchestrator_results( stage_id, metrics, req_start_ts, + input_preprocess_time_ms, wall_start_ts, final_stage_id_for_e2e, ) diff --git a/vllm_omni/entrypoints/cli/serve.py b/vllm_omni/entrypoints/cli/serve.py index b4293d59fd7..9a28ff8076e 100644 --- a/vllm_omni/entrypoints/cli/serve.py +++ b/vllm_omni/entrypoints/cli/serve.py @@ -216,6 +216,13 @@ def subparser_init(self, subparsers: argparse._SubParsersAction) -> FlexibleArgu action="store_true", help="Enable logging the stats.", ) + omni_config_group.add_argument( + "--log-stats-request-breakdown-limit", + type=int, + default=5, + help="Maximum number of per-request breakdowns to print in the final stats summary. 
" + "Use a negative value to print all request breakdowns.", + ) omni_config_group.add_argument( "--log-file", type=str, diff --git a/vllm_omni/entrypoints/omni.py b/vllm_omni/entrypoints/omni.py index 223c208af98..8485d852eec 100644 --- a/vllm_omni/entrypoints/omni.py +++ b/vllm_omni/entrypoints/omni.py @@ -114,22 +114,27 @@ def _run_generation( request_ids = [f"{i}_{uuid.uuid4()}" for i in range(len(request_prompts))] req_start_ts: dict[str, float] = {} + input_preprocess_time_ms: dict[str, float] = {} wall_start_ts = time.time() req_final_stage_ids: dict[str, int] = {} - for req_id, prompt in zip(request_ids, request_prompts): prompt_modalities = prompt.get("modalities", None) if isinstance(prompt, dict) else None - final_stage_id = self._compute_final_stage_id(prompt_modalities) - req_final_stage_ids[req_id] = final_stage_id - - metrics = OrchestratorMetrics( - self.num_stages, - self.log_stats, - wall_start_ts, - final_stage_id, - ) + req_final_stage_ids[req_id] = self._compute_final_stage_id(prompt_modalities) + batch_metrics = OrchestratorMetrics( + self.num_stages, + self.log_stats, + wall_start_ts, + req_final_stage_ids, + stage_metadata=getattr(self.engine, "stage_metadata", None), + request_breakdown_limit=self.log_stats_request_breakdown_limit, + ) + + for req_id, prompt in zip(request_ids, request_prompts): + request_prep_start_ts = time.time() + final_stage_id = req_final_stage_ids[req_id] + req_state = ClientRequestState(req_id) - req_state.metrics = metrics + req_state.metrics = batch_metrics self.request_states[req_id] = req_state # PD disaggregation: modify stage-0 (prefill) sampling params per request @@ -144,9 +149,14 @@ def _run_generation( prompt=prompt, sampling_params_list=req_sp_list, final_stage_id=final_stage_id, + arrival_time=request_prep_start_ts, ) submit_ts = time.time() - req_state.metrics.stage_first_ts[0] = submit_ts + input_preprocess_time_ms[req_id] = (submit_ts - request_prep_start_ts) * 1000.0 + if 
batch_metrics.stage_first_ts[0] is None: + batch_metrics.stage_first_ts[0] = submit_ts + else: + batch_metrics.stage_first_ts[0] = min(batch_metrics.stage_first_ts[0], submit_ts) req_start_ts[req_id] = submit_ts active_reqs = set(request_ids) @@ -175,6 +185,7 @@ def _run_generation( stage_id=stage_id, metrics=req_state.metrics, req_start_ts=req_start_ts, + input_preprocess_time_ms=input_preprocess_time_ms, wall_start_ts=wall_start_ts, final_stage_id_for_e2e=req_final_stage_ids[req_id], ) @@ -185,7 +196,11 @@ def _run_generation( active_reqs.discard(req_id) if pbar is not None: pbar.update(1) - self._log_summary_and_cleanup(req_id) + self.request_states.pop(req_id, None) + + summary = batch_metrics.build_and_log_summary() + if summary: + logger.debug("[Summary] %s", summary) except Exception: if "active_reqs" in locals() and active_reqs: self.abort(list(active_reqs)) diff --git a/vllm_omni/entrypoints/omni_base.py b/vllm_omni/entrypoints/omni_base.py index 4e84d620026..0d0b230b188 100644 --- a/vllm_omni/entrypoints/omni_base.py +++ b/vllm_omni/entrypoints/omni_base.py @@ -8,6 +8,7 @@ import warnings import weakref from collections.abc import Sequence +from pprint import pformat from typing import TYPE_CHECKING, Any, Literal import huggingface_hub @@ -142,6 +143,7 @@ def __init__( stage_init_timeout = kwargs.pop("stage_init_timeout", 300) init_timeout = kwargs.pop("init_timeout", 600) log_stats = kwargs.pop("log_stats", False) + self.log_stats_request_breakdown_limit = kwargs.pop("log_stats_request_breakdown_limit", 5) self._enable_ar_profiler = kwargs.pop("enable_ar_profiler", False) # NOTE: read-only lookup — must NOT pop. 
Popping here drops the key # before it reaches ``StageConfigFactory._create_from_registry``, so @@ -273,6 +275,9 @@ def _log_summary_and_cleanup(self, request_id: str) -> None: try: if req_state is None or req_state.metrics is None: return + summary = req_state.metrics.build_and_log_summary() + if summary: + logger.debug("[Summary] %s", pformat(summary, sort_dicts=False)) except Exception: logger.exception( "[%s] Failed to build/log summary for req=%s", @@ -392,6 +397,7 @@ def _process_single_result( stage_id: int, metrics: OrchestratorMetrics, req_start_ts: dict[str, float], + input_preprocess_time_ms: dict[str, float], wall_start_ts: float, final_stage_id_for_e2e: int, ) -> OmniRequestOutput | None: @@ -401,8 +407,9 @@ def _process_single_result( peak_memory_mb = getattr(result["engine_outputs"], "peak_memory_mb", 0.0) # Merge AR stage timing from OrchestratorAggregator.stage_events + stage_events = getattr(metrics, "stage_events", {}) if self._enable_ar_profiler: - ar_events = metrics.stage_events.get(str(req_id), []) + ar_events = stage_events.get(str(req_id), []) for evt in ar_events: if evt.stage_id != stage_id: stage_durations[f"ar_stage_{evt.stage_id}"] = evt.stage_gen_time_ms / 1000.0 @@ -415,7 +422,7 @@ def _process_single_result( stage_durations[key] = value # Merge per-stage gen times into stage_durations - for evt in metrics.stage_events.get(str(req_id), []): + for evt in stage_events.get(str(req_id), []): key = f"stage_{evt.stage_id}_gen_ms" if key not in stage_durations: stage_durations[key] = evt.stage_gen_time_ms @@ -439,20 +446,11 @@ def _process_single_result( if not stage_meta["final_output"]: return None - try: - rid_key = str(req_id) - if stage_id == final_stage_id_for_e2e and rid_key not in metrics.e2e_done and finished: - metrics.on_finalize_request( - stage_id, - req_id, - req_start_ts.get(req_id, wall_start_ts), - ) - except Exception: - logger.exception("[%s] Finalize request handling error", self.__class__.__name__) - + 
final_output_start_ts = time.time() + final_output_start = time.perf_counter() output_type = getattr(engine_outputs, "final_output_type", stage_meta["final_output_type"]) images = getattr(engine_outputs, "images", []) if output_type == "image" else [] - return OmniRequestOutput( + output_to_yield = OmniRequestOutput( request_id=req_id or "", stage_id=stage_id, final_output_type=output_type, @@ -466,6 +464,22 @@ def _process_single_result( stage_durations=stage_durations, peak_memory_mb=peak_memory_mb, ) + final_output_time_ms = (time.perf_counter() - final_output_start) * 1000.0 + metrics.record_final_output_time(req_id, final_output_time_ms, final_output_start_ts, time.time()) + + try: + rid_key = str(req_id) + if stage_id == final_stage_id_for_e2e and rid_key not in metrics.e2e_done and finished: + metrics.on_finalize_request( + stage_id, + req_id, + req_start_ts.get(req_id, wall_start_ts), + input_preprocess_time_ms=input_preprocess_time_ms.get(req_id, 0.0), + ) + except Exception: + logger.exception("[%s] Finalize request handling error", self.__class__.__name__) + + return output_to_yield def shutdown(self) -> None: logger.info("[%s] Shutting down", self.__class__.__name__) diff --git a/vllm_omni/metrics/stats.py b/vllm_omni/metrics/stats.py index 4245deb5453..f3f7d5c9074 100644 --- a/vllm_omni/metrics/stats.py +++ b/vllm_omni/metrics/stats.py @@ -9,7 +9,7 @@ from vllm.logger import init_logger -from vllm_omni.metrics.utils import _build_field_defs, _build_row, _format_table +from vllm_omni.metrics.utils import _build_field_defs, _build_row logger = init_logger(__name__) @@ -36,11 +36,23 @@ class StageRequestStats: rx_in_flight_time_ms: float stage_stats: StageStats stage_id: int | None = None + stage_name: str | None = None + stage_type: str | None = None final_output_type: str | None = None request_id: str | None = None postprocess_time_ms: float = 0.0 + output_processor_time_ms: float = 0.0 diffusion_metrics: dict[str, int] = None audio_generated_frames: 
int = 0 + stage_submit_ts: float | None = None + stage_end_ts: float | None = None + request_dispatch_wait_time_ms: float = 0.0 + stage_latency_time_ms: float | None = None + stage_queue_wait_time_ms: float | None = None + stage_execution_time_ms: float | None = None + handoff_to_stage_id: int | None = None + stage_handoff_time_ms: float = 0.0 + ar2diffusion_time_ms: float = 0.0 pipeline_timings: dict[str, float] | None = None @property @@ -75,14 +87,17 @@ def total_time_ms(self) -> float: @dataclass class RequestE2EStats: request_id: str - e2e_total_ms: float - e2e_total_tokens: int + request_wall_time_ms: float + input_preprocess_time_ms: float + engine_pipeline_time_ms: float + total_tokens: int transfers_total_time_ms: float transfers_total_bytes: int + final_output_time_ms: float = 0.0 @property def e2e_tpt(self) -> float: - return (self.e2e_total_ms / self.e2e_total_tokens) if self.e2e_total_tokens > 0 else 0.0 + return (self.engine_pipeline_time_ms / self.total_tokens) if self.total_tokens > 0 else 0.0 # === Field Configuration === @@ -102,16 +117,51 @@ def e2e_tpt(self) -> float: "rx_decode_time_ms", "rx_in_flight_time_ms", "final_output_type", + "stage_submit_ts", + "stage_end_ts", + "request_dispatch_wait_time_ms", + "stage_latency_time_ms", + "stage_queue_wait_time_ms", + "stage_execution_time_ms", + "handoff_to_stage_id", + "stage_handoff_time_ms", + "ar2diffusion_time_ms", "pipeline_timings", } TRANSFER_EXCLUDE = {"from_stage", "to_stage", "request_id", "used_shm"} E2E_EXCLUDE = {"request_id"} # Decide the order of overall summary fields, or None for auto -OVERALL_FIELDS: list[str] | None = None +OVERALL_FIELDS: list[str] | None = [ + "num_of_requests", + "request_wall_time_ms", + "input_preprocess_time_ms", + "engine_pipeline_time_ms", + "stage_gen_total_time_ms", + "stage_latency_total_time_ms", + "stage_queue_wait_total_time_ms", + "stage_execution_total_time_ms", + "output_processor_total_time_ms", + "stage_handoff_total_time_ms", + 
"ar2diffusion_total_time_ms", + "final_output_total_time_ms", + "breakdown_delta_time_ms", + "total_tokens", + "avg_request_wall_time_ms", + "avg_input_preprocess_time_ms", + "avg_engine_pipeline_time_ms", + "avg_stage_gen_total_time_ms", + "avg_output_processor_time_ms", + "avg_stage_handoff_total_time_ms", + "avg_ar2diffusion_time_ms", + "avg_final_output_time_ms", + "avg_breakdown_delta_time_ms", + "avg_tokens_per_s", +] STAGE_FIELDS = _build_field_defs(StageRequestStats, STAGE_EXCLUDE, FIELD_TRANSFORMS) TRANSFER_FIELDS = _build_field_defs(TransferEdgeStats, TRANSFER_EXCLUDE, FIELD_TRANSFORMS) E2E_FIELDS = _build_field_defs(RequestE2EStats, E2E_EXCLUDE, FIELD_TRANSFORMS) +DEFAULT_REQUEST_BREAKDOWN_LIMIT = 5 class OrchestratorAggregator: @@ -121,10 +171,14 @@ def __init__( log_stats: bool, wall_start_ts: float, final_stage_id_for_e2e: dict[str, int] | int, + stage_metadata: list[dict[str, Any]] | None = None, + request_breakdown_limit: int | None = DEFAULT_REQUEST_BREAKDOWN_LIMIT, ) -> None: self.num_stages = int(num_stages) self.log_stats = bool(log_stats) self.final_stage_id_for_e2e = final_stage_id_for_e2e + self.stage_metadata = list(stage_metadata or []) + self.request_breakdown_limit = self._normalize_request_breakdown_limit(request_breakdown_limit) self.init_run_state(wall_start_ts) self.stage_events: dict[str, list[StageRequestStats]] = {} self.transfer_events: dict[ @@ -135,8 +189,10 @@ def __init__( def init_run_state(self, wall_start_ts: float) -> None: # Per-run aggregates and timing state self.stage_total_tokens = [0 for _ in range(self.num_stages)] - self.e2e_total_ms = 0.0 - self.e2e_total_tokens = 0 + self.engine_pipeline_total_ms = 0.0 + self.input_preprocess_total_ms = 0.0 + self.final_output_total_ms = 0.0 + self.total_tokens = 0 self.e2e_count = 0 self.e2e_done = set() self.wall_start_ts = float(wall_start_ts) @@ -149,6 +205,466 @@ def init_run_state(self, wall_start_ts: float) -> None: self.diffusion_metrics: defaultdict[str, defaultdict[str, 
float]] = defaultdict( lambda: defaultdict(float) ) # {request_id: {diffusion_metrics_key: accumulated_metrics_data}} + self.final_output_time_ms: defaultdict[str, float] = defaultdict(float) + + def _get_stage_metadata(self, stage_id: int | None) -> dict[str, Any]: + if stage_id is None or stage_id < 0 or stage_id >= len(self.stage_metadata): + return {} + meta = self.stage_metadata[stage_id] + return dict(meta) if isinstance(meta, dict) else {} + + @staticmethod + def _get_stage_type(stage_meta: dict[str, Any]) -> str: + return str(stage_meta.get("stage_type") or "unknown") + + @classmethod + def _get_stage_name(cls, stage_id: int, stage_meta: dict[str, Any]) -> str: + stage_type = cls._get_stage_type(stage_meta) + model_stage = stage_meta.get("model_stage") + if model_stage: + return str(model_stage) + if stage_type == "diffusion": + return "diffusion" + return f"stage_{stage_id}" + + def _stage_label(self, stage_id: int | None) -> str: + if stage_id is None: + return "unknown" + stage_meta = self._get_stage_metadata(stage_id) + return f"{stage_id}:{self._get_stage_name(stage_id, stage_meta)}" + + def _format_seconds(self, ms: float) -> str: + return f"{ms / 1000.0:.2f}s" + + def _format_ms(self, ms: float) -> str: + return f"{ms:,.3f} ms" + + @staticmethod + def _normalize_request_breakdown_limit(limit: int | None) -> int | None: + if limit is None: + return None + limit = int(limit) + return None if limit < 0 else limit + + def _log_omni_timing(self, evt: RequestE2EStats) -> None: + rid = evt.request_id + stages = [] + handoffs = [] + ar2diffusion_ms = 0.0 + for stage_evt in sorted( + self.stage_events.get(rid, []), + key=lambda e: e.stage_id if e.stage_id is not None else -1, + ): + stages.append( + f"{self._stage_label(stage_evt.stage_id)}={self._format_seconds(stage_evt.stage_gen_time_ms)}" + ) + ar2diffusion_ms += float(stage_evt.ar2diffusion_time_ms or 0.0) + if stage_evt.handoff_to_stage_id is not None and stage_evt.stage_handoff_time_ms > 0.0: + handoff 
= ( + f"{stage_evt.stage_id}->{stage_evt.handoff_to_stage_id}={stage_evt.stage_handoff_time_ms:.3f}ms" + ) + if stage_evt.ar2diffusion_time_ms > 0.0: + handoff += f"(ar2diffusion={stage_evt.ar2diffusion_time_ms:.3f}ms)" + handoffs.append(handoff) + transfers = [] + for transfer_evt in sorted( + [e for e in self.transfer_events.values() if e.request_id == rid], + key=lambda e: (e.from_stage, e.to_stage), + ): + transfers.append(f"{transfer_evt.from_stage}->{transfer_evt.to_stage}={transfer_evt.total_time_ms:.3f}ms") + + ar2diffusion = f" ar2diffusion={self._format_seconds(ar2diffusion_ms)}" if ar2diffusion_ms > 0.0 else "" + logger.info( + "[OmniTiming] req=%s total=%s preprocess=%s engine=%s%s stages=[%s] handoffs=[%s] transfers=[%s]", + rid, + self._format_seconds(evt.request_wall_time_ms), + self._format_seconds(evt.input_preprocess_time_ms), + self._format_seconds(evt.engine_pipeline_time_ms), + ar2diffusion, + ",".join(stages), + ",".join(handoffs), + ",".join(transfers), + ) + + @staticmethod + def _summary_line(label: str, value: Any) -> str: + if isinstance(value, bool): + text = str(value).lower() + elif isinstance(value, int): + text = f"{value:d}" + elif isinstance(value, float): + text = f"{value:,.3f}" + else: + text = str(value) + label_width = max(42, len(label) + 1) + return f"{label:<{label_width}}{text:>16}" + + @staticmethod + def _summary_header(title: str, char: str = "=") -> str: + return f"{char * 12} {title} {char * 12}" + + def _append_stage_info_lines(self, lines: list[str], evt: StageRequestStats) -> None: + lines.extend( + [ + self._summary_line("Stage id:", evt.stage_id if evt.stage_id is not None else "unknown"), + self._summary_line("Stage name:", evt.stage_name or ""), + self._summary_line("Stage type:", evt.stage_type or ""), + self._summary_line("Final output type:", evt.final_output_type or ""), + self._summary_line("Batch id:", evt.batch_id), + self._summary_line("Batch size:", evt.batch_size), + ] + ) + if evt.num_tokens_in > 0 
or evt.num_tokens_out > 0: + lines.extend( + [ + "", + self._summary_line("Input tokens:", evt.num_tokens_in), + self._summary_line("Output tokens:", evt.num_tokens_out), + self._summary_line("Output token throughput (tok/s):", evt.tokens_per_s), + ] + ) + if evt.postprocess_time_ms > 0.0: + lines.extend( + [ + "", + self._summary_line("Postprocess time (ms):", evt.postprocess_time_ms), + ] + ) + if evt.diffusion_metrics: + lines.append("") + for key, value in sorted(evt.diffusion_metrics.items()): + lines.append(self._summary_line(f"{key}:", value)) + + def _request_stage_component_time_ms(self, request_id: str) -> float: + return sum( + self._optional_ms(evt.stage_queue_wait_time_ms) + + self._stage_execution_ms(evt) + + float(evt.output_processor_time_ms or 0.0) + + float(evt.stage_handoff_time_ms or 0.0) + for evt in self.stage_events.get(request_id, []) + ) + + def _request_final_orchestration_time_ms(self, evt: RequestE2EStats) -> float: + return ( + float(evt.request_wall_time_ms) + - float(evt.input_preprocess_time_ms) + - self._request_stage_component_time_ms(evt.request_id) + - float(self.final_output_time_ms.get(evt.request_id, 0.0)) + ) + + def _request_dispatch_wait_time_ms(self, request_id: str) -> float: + return sum( + float(evt.request_dispatch_wait_time_ms or 0.0) + for evt in self.stage_events.get(request_id, []) + if evt.stage_id == 0 + ) + + def _request_remaining_orchestration_time_ms(self, evt: RequestE2EStats) -> float: + return self._request_final_orchestration_time_ms(evt) - self._request_dispatch_wait_time_ms(evt.request_id) + + def _get_overall_summary_time_ms(self, overall_summary: dict[str, Any], key: str) -> float: + return float(overall_summary.get(key, overall_summary.get(key.replace("_wall_", "_"), 0.0))) + + @staticmethod + def _optional_ms(value: float | None, default: float = 0.0) -> float: + return float(default if value is None else value) + + @classmethod + def _stage_latency_ms(cls, evt: StageRequestStats) -> float: + return 
cls._optional_ms(evt.stage_latency_time_ms, float(evt.stage_gen_time_ms or 0.0)) + + @classmethod + def _stage_execution_ms(cls, evt: StageRequestStats) -> float: + return cls._optional_ms(evt.stage_execution_time_ms, float(evt.stage_gen_time_ms or 0.0)) + + @staticmethod + def _average(total: float, count: int) -> float: + return float(total / count) if count > 0 else 0.0 + + def _average_e2e_field(self, field_name: str) -> float: + if self.e2e_count <= 0: + return 0.0 + return float(sum(float(getattr(evt, field_name)) for evt in self.e2e_events) / self.e2e_count) + + def _estimate_stage_wait_and_execution_times(self) -> None: + """Populate derived latency, queue wait, and execution times on stage events.""" + for stage_id in range(self.num_stages): + stage_evts = sorted( + [ + evt + for evts in self.stage_events.values() + for evt in evts + if evt.stage_id == stage_id and evt.stage_submit_ts is not None and evt.stage_end_ts is not None + ], + key=lambda evt: float(evt.stage_end_ts or 0.0), + ) + previous_finish_ts: float | None = None + for evt in stage_evts: + submit_ts = float(evt.stage_submit_ts or 0.0) + end_ts = float(evt.stage_end_ts or submit_ts) + latency_ms = max(0.0, (end_ts - submit_ts) * 1000.0) + service_start_ts = submit_ts + if previous_finish_ts is not None: + service_start_ts = max(submit_ts, previous_finish_ts) + queue_wait_ms = max(0.0, (service_start_ts - submit_ts) * 1000.0) + service_time_ms = max(0.0, (end_ts - service_start_ts) * 1000.0) + execution_ms = max(0.0, service_time_ms - float(evt.output_processor_time_ms or 0.0)) + evt.stage_latency_time_ms = latency_ms + evt.stage_queue_wait_time_ms = queue_wait_ms + evt.stage_execution_time_ms = execution_ms + previous_finish_ts = end_ts if previous_finish_ts is None else max(previous_finish_ts, end_ts) + + def _build_omni_metrics_summary_lines(self, overall_summary: dict[str, Any]) -> list[str]: + lines = [self._summary_header("Omni Metrics Summary")] + lines.extend( + [ + 
self._summary_line("Successful requests:", int(overall_summary.get("num_of_requests", 0))), + self._summary_line("Total E2E time (ms):", float(overall_summary.get("request_wall_time_ms", 0.0))), + self._summary_line( + "Input preprocess time (ms):", + self._get_overall_summary_time_ms(overall_summary, "input_preprocess_wall_time_ms"), + ), + self._summary_line( + "Engine pipeline time (ms):", + float(overall_summary.get("engine_pipeline_time_ms", 0.0)), + ), + self._summary_line( + "Sum check (ms):", + self._get_overall_summary_time_ms(overall_summary, "input_preprocess_wall_time_ms") + + float(overall_summary.get("engine_pipeline_time_ms", 0.0)), + ), + "", + self._summary_header("Overall Time Breakdown", "-"), + self._summary_line( + "Input preprocess time (ms):", + self._get_overall_summary_time_ms(overall_summary, "input_preprocess_wall_time_ms"), + ), + ] + ) + + component_sum = self._get_overall_summary_time_ms(overall_summary, "input_preprocess_wall_time_ms") + stage_average_lines: list[str] = [] + stage_ids = sorted( + { + evt.stage_id + for stage_evts in self.stage_events.values() + for evt in stage_evts + if evt.stage_id is not None + } + ) + for stage_id in stage_ids: + stage_latency_ms = sum( + self._stage_latency_ms(evt) + for stage_evts in self.stage_events.values() + for evt in stage_evts + if evt.stage_id == stage_id + ) + stage_queue_wait_ms = sum( + self._optional_ms(evt.stage_queue_wait_time_ms) + for stage_evts in self.stage_events.values() + for evt in stage_evts + if evt.stage_id == stage_id + ) + stage_execution_ms = sum( + self._stage_execution_ms(evt) + for stage_evts in self.stage_events.values() + for evt in stage_evts + if evt.stage_id == stage_id + ) + output_processor_total_ms = sum( + float(evt.output_processor_time_ms or 0.0) + for stage_evts in self.stage_events.values() + for evt in stage_evts + if evt.stage_id == stage_id + ) + stage_event_count = sum( + 1 for stage_evts in self.stage_events.values() for evt in stage_evts if 
evt.stage_id == stage_id + ) + lines.append(self._summary_line(f"Stage {stage_id} total latency time (ms):", stage_latency_ms)) + lines.append(self._summary_line(f"Stage {stage_id} queue wait time (ms):", stage_queue_wait_ms)) + lines.append(self._summary_line(f"Stage {stage_id} execution time (ms):", stage_execution_ms)) + lines.append(self._summary_line(f"Stage {stage_id} output processor time (ms):", output_processor_total_ms)) + component_sum += stage_execution_ms + output_processor_total_ms + + handoff_ms = sum( + float(evt.stage_handoff_time_ms or 0.0) + for stage_evts in self.stage_events.values() + for evt in stage_evts + if evt.stage_id == stage_id + ) + if handoff_ms > 0.0: + to_stage_ids = sorted( + { + evt.handoff_to_stage_id + for stage_evts in self.stage_events.values() + for evt in stage_evts + if evt.stage_id == stage_id and evt.handoff_to_stage_id is not None + } + ) + to_stage = to_stage_ids[0] if len(to_stage_ids) == 1 else "next" + lines.append(self._summary_line(f"Stage {stage_id} -> Stage {to_stage} handoff time (ms):", handoff_ms)) + component_sum += handoff_ms + if int(overall_summary.get("num_of_requests", 0)) > 1 and stage_event_count > 0: + stage_average_lines.extend( + [ + self._summary_line( + f"Average Stage {stage_id} latency time (ms):", + stage_latency_ms / stage_event_count, + ), + self._summary_line( + f"Average Stage {stage_id} queue wait time (ms):", + stage_queue_wait_ms / stage_event_count, + ), + self._summary_line( + f"Average Stage {stage_id} execution time (ms):", + stage_execution_ms / stage_event_count, + ), + self._summary_line( + f"Average Stage {stage_id} output processor time (ms):", + output_processor_total_ms / stage_event_count, + ), + ] + ) + if handoff_ms > 0.0: + stage_average_lines.append( + self._summary_line( + f"Average Stage {stage_id} handoff time (ms):", + handoff_ms / stage_event_count, + ) + ) + + final_output_bucket_ms = float(overall_summary.get("final_output_total_time_ms", 0.0)) + lines.extend( + [ 
+ self._summary_line("Final output time (ms):", final_output_bucket_ms), + ] + ) + if int(overall_summary.get("num_of_requests", 0)) == 1: + lines.extend( + [ + self._summary_line("Component sum (ms):", component_sum + final_output_bucket_ms), + self._summary_line( + "E2E - component sum (ms):", + float(overall_summary.get("request_wall_time_ms", 0.0)) + - component_sum + - final_output_bucket_ms, + ), + ] + ) + else: + lines.extend( + [ + "", + self._summary_header("Average Time Breakdown", "-"), + self._summary_line( + "Average input preprocess time (ms):", + float(overall_summary.get("avg_input_preprocess_time_ms", 0.0)), + ), + *stage_average_lines, + self._summary_line( + "Average final output time (ms):", + float(overall_summary.get("avg_final_output_time_ms", 0.0)), + ), + ] + ) + + all_breakdown_request_ids = sorted(set(self.stage_events.keys()) | {evt.request_id for evt in self.e2e_events}) + if self.request_breakdown_limit is None: + breakdown_request_ids = all_breakdown_request_ids + else: + breakdown_request_ids = all_breakdown_request_ids[: self.request_breakdown_limit] + omitted_breakdowns = len(all_breakdown_request_ids) - len(breakdown_request_ids) + if omitted_breakdowns > 0: + lines.extend( + [ + "", + self._summary_line("Request breakdowns shown:", len(breakdown_request_ids)), + self._summary_line("Request breakdowns omitted:", omitted_breakdowns), + ] + ) + + for rid in breakdown_request_ids: + stage_evts = sorted( + self.stage_events.get(rid, []), + key=lambda evt: evt.stage_id if evt.stage_id is not None else -1, + ) + e2e_evt = next((evt for evt in self.e2e_events if evt.request_id == rid), None) + if e2e_evt is not None: + request_dispatch_wait_ms = self._request_dispatch_wait_time_ms(rid) + lines.extend( + [ + "", + self._summary_header(f"Request {rid} Breakdown", "-"), + self._summary_line("Input preprocess time (ms):", e2e_evt.input_preprocess_time_ms), + self._summary_line("Input preprocess sum check (ms):", 
e2e_evt.input_preprocess_time_ms), + self._summary_line("Request dispatch wait time (ms):", request_dispatch_wait_ms), + ] + ) + for evt in stage_evts: + stage_id = evt.stage_id if evt.stage_id is not None else "unknown" + lines.extend( + [ + "", + self._summary_header(f"Stage {stage_id} Breakdown", "-"), + self._summary_line( + "Stage latency time (ms):", + self._stage_latency_ms(evt), + ), + self._summary_line("Queue wait time (ms):", self._optional_ms(evt.stage_queue_wait_time_ms)), + self._summary_line( + "Execution time (ms):", + self._stage_execution_ms(evt), + ), + self._summary_line( + "Output processor time (ms):", + float(evt.output_processor_time_ms or 0.0), + ), + "", + ] + ) + self._append_stage_info_lines(lines, evt) + + if evt.handoff_to_stage_id is not None and evt.stage_handoff_time_ms > 0.0: + ar2diffusion_ms = float(evt.ar2diffusion_time_ms or 0.0) + lines.extend( + [ + "", + self._summary_header( + f"Stage {stage_id} -> Stage {evt.handoff_to_stage_id} Handoff", + "-", + ), + self._summary_line("Handoff total time (ms):", float(evt.stage_handoff_time_ms)), + self._summary_line("AR to diffusion time (ms):", ar2diffusion_ms), + self._summary_line( + "Other handoff processing time (ms):", + float(evt.stage_handoff_time_ms) - ar2diffusion_ms, + ), + ] + ) + + final_output_ms = float(self.final_output_time_ms.get(rid, 0.0)) + remaining_orchestration_ms = ( + self._request_remaining_orchestration_time_ms(e2e_evt) if e2e_evt is not None else 0.0 + ) + if final_output_ms != 0.0: + lines.extend( + [ + "", + self._summary_header("Final Output Breakdown", "-"), + self._summary_line("Final output wrapping time (ms):", final_output_ms), + self._summary_line("Final output total time (ms):", final_output_ms), + self._summary_line("Final output sum check (ms):", final_output_ms), + ] + ) + if remaining_orchestration_ms != 0.0: + lines.append( + self._summary_line( + "Remaining orchestration overhead time (ms):", + remaining_orchestration_ms, + ) + ) + + return 
lines def _get_or_create_transfer_event( self, @@ -328,8 +844,17 @@ def _as_stage_request_stats( "Convert dict to StageRequestStats if needed." stats = metrics stats.stage_id = stage_id + stage_meta = self._get_stage_metadata(stage_id) + stats.stage_type = self._get_stage_type(stage_meta) + stats.stage_name = self._get_stage_name(stage_id, stage_meta) stats.request_id = req_id stats.final_output_type = final_output_type + if stats.stage_latency_time_ms is None: + stats.stage_latency_time_ms = ( + float(stats.stage_end_ts - stats.stage_submit_ts) * 1000.0 + if stats.stage_submit_ts is not None and stats.stage_end_ts is not None + else float(stats.stage_gen_time_ms or 0.0) + float(stats.output_processor_time_ms or 0.0) + ) stats.diffusion_metrics = ( {k: int(v) for k, v in self.diffusion_metrics.pop(req_id, {}).items()} if req_id in self.diffusion_metrics @@ -352,19 +877,47 @@ def on_stage_metrics( self.record_transfer_rx(stats) - def record_stage_postprocess_time(self, stage_id: int, req_id: Any, postproc_time_ms: float) -> None: - if req_id in self.stage_events: - for stats in self.stage_events[req_id]: + def _update_stage_event_field(self, stage_id: int, req_id: Any, field_name: str, value: float) -> None: + rid_key = str(req_id) + if rid_key in self.stage_events: + for stats in self.stage_events[rid_key]: if stats.stage_id == stage_id: - stats.postprocess_time_ms = float(postproc_time_ms) + setattr(stats, field_name, value if field_name == "handoff_to_stage_id" else float(value)) break else: logger.warning( - "Failed to record postprocess time for request %s at stage %s: no stage event found", + "Failed to record %s for request %s at stage %s: no stage event found", + field_name, req_id, stage_id, ) + def record_stage_postprocess_time(self, stage_id: int, req_id: Any, postproc_time_ms: float) -> None: + self._update_stage_event_field(stage_id, req_id, "postprocess_time_ms", postproc_time_ms) + + def record_ar2diffusion_time(self, stage_id: int, req_id: Any, 
ar2diffusion_time_ms: float) -> None: + self._update_stage_event_field(stage_id, req_id, "ar2diffusion_time_ms", ar2diffusion_time_ms) + + def record_final_output_time( + self, + req_id: Any, + final_output_time_ms: float, + start_ts: float | None = None, + end_ts: float | None = None, + ) -> None: + del start_ts, end_ts + self.final_output_time_ms[str(req_id)] += float(final_output_time_ms) + + def record_stage_handoff_time( + self, + from_stage: int, + to_stage: int, + req_id: Any, + handoff_time_ms: float, + ) -> None: + self._update_stage_event_field(from_stage, req_id, "handoff_to_stage_id", to_stage) + self._update_stage_event_field(from_stage, req_id, "stage_handoff_time_ms", handoff_time_ms) + @contextmanager def stage_postprocess_timer(self, stage_id: int, req_id: Any): """Context manager for measuring and recording stage postprocessing time. @@ -425,6 +978,7 @@ def on_finalize_request( stage_id: int, req_id: Any, req_start_ts: float, + input_preprocess_time_ms: float = 0.0, ) -> None: rid_key = str(req_id) if rid_key in self.e2e_done: @@ -435,7 +989,10 @@ def on_finalize_request( prev_last = self.stage_last_ts[stage_id] self.stage_last_ts[stage_id] = _t1 if prev_last is None else max(prev_last, _t1) self.last_finish_ts = max(self.last_finish_ts, _t1) - e2e_ms = (_t1 - _t0) * 1000.0 + engine_pipeline_ms = (_t1 - _t0) * 1000.0 + input_preprocess_ms = float(input_preprocess_time_ms) + request_wall_ms = engine_pipeline_ms + input_preprocess_ms + final_output_ms = float(self.final_output_time_ms.get(rid_key, 0.0)) # Sum tokens from all stages for this request # Include input tokens from stage 0 + output tokens from all stages @@ -446,29 +1003,50 @@ def on_finalize_request( total_tokens += int(evt.num_tokens_in) total_tokens += int(evt.num_tokens_out) - self.e2e_total_ms += e2e_ms - self.e2e_total_tokens += total_tokens + self.engine_pipeline_total_ms += engine_pipeline_ms + self.input_preprocess_total_ms += input_preprocess_ms + self.final_output_total_ms += 
final_output_ms + self.total_tokens += total_tokens self.e2e_count += 1 self.e2e_done.add(rid_key) per_req_record = RequestE2EStats( request_id=rid_key, - e2e_total_ms=e2e_ms, - e2e_total_tokens=total_tokens, + request_wall_time_ms=request_wall_ms, + input_preprocess_time_ms=input_preprocess_ms, + engine_pipeline_time_ms=engine_pipeline_ms, + total_tokens=total_tokens, transfers_total_time_ms=float( sum(evt.total_time_ms for evt in self.transfer_events.values() if evt.request_id == rid_key) ), transfers_total_bytes=int( sum(evt.size_bytes for evt in self.transfer_events.values() if evt.request_id == rid_key) ), + final_output_time_ms=final_output_ms, ) self.e2e_events.append(per_req_record) + if self.num_stages > 1: + self._log_omni_timing(per_req_record) + def build_and_log_summary(self) -> dict[str, Any]: if not self.log_stats: return {} + self._estimate_stage_wait_and_execution_times() wall_time_ms = max(0.0, (self.last_finish_ts - self.wall_start_ts) * 1000.0) - e2e_avg_req = (wall_time_ms / self.e2e_count) if self.e2e_count > 0 else 0.0 - e2e_avg_tok = (self.e2e_total_tokens * 1000.0 / wall_time_ms) if wall_time_ms > 0 else 0.0 + e2e_avg_req = self._average_e2e_field("request_wall_time_ms") + avg_engine_pipeline_ms = self._average_e2e_field("engine_pipeline_time_ms") + avg_tok = (self.total_tokens * 1000.0 / wall_time_ms) if wall_time_ms > 0 else 0.0 + first_engine_ts = min((ts for ts in self.stage_first_ts if ts is not None), default=None) + engine_pipeline_wall_ms = ( + max(0.0, (self.last_finish_ts - first_engine_ts) * 1000.0) + if first_engine_ts is not None + else float(self.engine_pipeline_total_ms) + ) + input_preprocess_wall_ms = ( + max(0.0, (first_engine_ts - self.wall_start_ts) * 1000.0) + if first_engine_ts is not None + else float(self.input_preprocess_total_ms) + ) if isinstance(self.final_stage_id_for_e2e, int): final_stage_id_map: dict[str, int] = {"*": int(self.final_stage_id_for_e2e)} @@ -481,39 +1059,101 @@ def build_and_log_summary(self) -> 
dict[str, Any]: else 0.0 for i in range(self.num_stages) ] + stage_gen_total_ms = 0.0 + stage_latency_total_ms = 0.0 + stage_queue_wait_total_ms = 0.0 + stage_execution_total_ms = 0.0 + output_processor_total_ms = 0.0 + stage_handoff_total_ms = 0.0 + ar2diffusion_total_ms = 0.0 + handoff_edge_totals: defaultdict[str, float] = defaultdict(float) + handoff_edge_ar2diffusion: defaultdict[str, float] = defaultdict(float) + for stage_evts in self.stage_events.values(): + for evt in stage_evts: + stage_gen_total_ms += float(evt.stage_gen_time_ms or 0.0) + stage_latency_total_ms += self._stage_latency_ms(evt) + stage_queue_wait_total_ms += self._optional_ms(evt.stage_queue_wait_time_ms) + stage_execution_total_ms += self._stage_execution_ms(evt) + output_processor_total_ms += float(evt.output_processor_time_ms or 0.0) + handoff_ms = float(evt.stage_handoff_time_ms or 0.0) + ar2d_ms = float(evt.ar2diffusion_time_ms or 0.0) + stage_handoff_total_ms += handoff_ms + ar2diffusion_total_ms += ar2d_ms + if evt.stage_id is not None and evt.handoff_to_stage_id is not None: + edge = f"stage_{evt.stage_id}_to_{evt.handoff_to_stage_id}" + handoff_edge_totals[edge] += handoff_ms + handoff_edge_ar2diffusion[edge] += ar2d_ms + breakdown_delta_ms = sum(self._request_final_orchestration_time_ms(evt) for evt in self.e2e_events) overall_summary = { - "e2e_requests": int(self.e2e_count), - "e2e_wall_time_ms": float(wall_time_ms), - "e2e_total_tokens": int(self.e2e_total_tokens), - "e2e_avg_time_per_request_ms": float(e2e_avg_req), - "e2e_avg_tokens_per_s": float(e2e_avg_tok), + "num_of_requests": int(self.e2e_count), + "request_wall_time_ms": float(wall_time_ms), + "input_preprocess_time_ms": float(self.input_preprocess_total_ms), + "input_preprocess_wall_time_ms": float(input_preprocess_wall_ms), + "engine_pipeline_time_ms": float(engine_pipeline_wall_ms), + "stage_gen_total_time_ms": float(stage_gen_total_ms), + "stage_latency_total_time_ms": float(stage_latency_total_ms), + 
"stage_queue_wait_total_time_ms": float(stage_queue_wait_total_ms), + "stage_execution_total_time_ms": float(stage_execution_total_ms), + "output_processor_total_time_ms": float(output_processor_total_ms), + "stage_handoff_total_time_ms": float(stage_handoff_total_ms), + "ar2diffusion_total_time_ms": float(ar2diffusion_total_ms), + "final_output_total_time_ms": float(self.final_output_total_ms), + "breakdown_delta_time_ms": float(breakdown_delta_ms), + "total_tokens": int(self.total_tokens), + "avg_request_wall_time_ms": float(e2e_avg_req), + "avg_input_preprocess_time_ms": self._average(self.input_preprocess_total_ms, self.e2e_count), + "avg_engine_pipeline_time_ms": float(avg_engine_pipeline_ms), + "avg_stage_gen_total_time_ms": self._average(stage_gen_total_ms, self.e2e_count), + "avg_output_processor_time_ms": self._average(output_processor_total_ms, self.e2e_count), + "avg_stage_handoff_total_time_ms": self._average(stage_handoff_total_ms, self.e2e_count), + "avg_ar2diffusion_time_ms": self._average(ar2diffusion_total_ms, self.e2e_count), + "avg_final_output_time_ms": self._average(self.final_output_total_ms, self.e2e_count), + "avg_breakdown_delta_time_ms": self._average(breakdown_delta_ms, self.e2e_count), + "avg_tokens_per_s": float(avg_tok), } - # Add average input preprocess time across all requests - preprocess_times = [] - for _rid, evts in self.stage_events.items(): - for evt in evts: - if evt.pipeline_timings and "preprocess_ms" in evt.pipeline_timings: - preprocess_times.append(evt.pipeline_timings["preprocess_ms"]) - break # only once per request - if preprocess_times: - overall_summary["input_preprocess_time_ms"] = sum(preprocess_times) / len(preprocess_times) + for edge, handoff_time in sorted(handoff_edge_totals.items()): + overall_summary[f"{edge}_handoff_time_ms"] = float(handoff_time) + ar2d_time = handoff_edge_ar2diffusion.get(edge, 0.0) + if ar2d_time > 0.0: + overall_summary[f"{edge}_ar2diffusion_time_ms"] = float(ar2d_time) # Add 
stage_wall_time_ms as separate fields for each stage for idx, wall_time in enumerate(stage_wall_time_ms): - overall_summary[f"e2e_stage_{idx}_wall_time_ms"] = wall_time - - # Print overall summary - # filter out all-zero fields for logging - overall_fields = [] - for k in OVERALL_FIELDS or list(overall_summary.keys()): - v = overall_summary.get(k, None) - if v not in (0, 0.0, 0.000, None, ""): - overall_fields.append(k) - if overall_fields: - logger.info( - "\n%s", - _format_table("Overall Summary", overall_summary, overall_fields), - ) + overall_summary[f"stage_{idx}_wall_time_ms"] = wall_time + + avg_fields = { + "avg_request_wall_time_ms", + "avg_input_preprocess_time_ms", + "avg_engine_pipeline_time_ms", + "avg_stage_gen_total_time_ms", + "avg_output_processor_time_ms", + "avg_stage_handoff_total_time_ms", + "avg_ar2diffusion_time_ms", + "avg_final_output_time_ms", + "avg_breakdown_delta_time_ms", + } + dynamic_overall_fields = [ + k + for k in overall_summary + if k.startswith("stage_") + and "_to_" in k + and (k.endswith("_handoff_time_ms") or k.endswith("_ar2diffusion_time_ms")) + ] + stage_wall_fields = [ + k for k in overall_summary if k.startswith("stage_") and k.endswith("_wall_time_ms") and "_to_" not in k + ] + ordered_overall_fields = list(OVERALL_FIELDS or []) + insert_at = ordered_overall_fields.index("total_tokens") + ordered_overall_fields[insert_at:insert_at] = sorted( + dynamic_overall_fields, + key=lambda name: (name.replace("_ar2diffusion_time_ms", "_zz_ar2diffusion_time_ms")), + ) + sorted(stage_wall_fields) + overall_fields = [ + k + for k in ordered_overall_fields or list(overall_summary.keys()) + if not (self.e2e_count <= 1 and k in avg_fields) + and overall_summary.get(k, None) not in (0, 0.0, 0.000, None, "") + ] all_request_ids = sorted(set(self.stage_events.keys()) | {e.request_id for e in self.e2e_events}) @@ -528,56 +1168,11 @@ def build_and_log_summary(self) -> dict[str, Any]: e2e_data = _build_row(e2e_evt, E2E_FIELDS) 
result_e2e_table.append({"request_id": rid, **e2e_data}) - # filter out all-zero fields for logging - nonzero_e2e_fields = set() - for k, v in e2e_data.items(): - if v not in (0, 0.000, None, ""): - nonzero_e2e_fields.add(k) - value_fields_e2e = sorted(nonzero_e2e_fields) - - if value_fields_e2e: - logger.info( - "\n%s", - _format_table( - f"RequestE2EStats [request_id={rid}]", - e2e_data, - value_fields=value_fields_e2e, - ), - ) - - # === [OmniTiming] concise per-request summary === + # === Stage table (columns = stage_id) === stage_evts = sorted( self.stage_events.get(rid, []), key=lambda e: e.stage_id if e.stage_id is not None else -1, ) - pt = {} - if stage_evts: - pt = stage_evts[-1].pipeline_timings or {} - if pt or e2e_evt: - parts = [f"req={rid}"] - if e2e_evt: - parts.append(f"total={e2e_evt.e2e_total_ms / 1000.0:.2f}s") - if "preprocess_ms" in pt: - parts.append(f"preprocess={pt['preprocess_ms'] / 1000.0:.2f}s") - if e2e_evt: - engine_ms = e2e_evt.e2e_total_ms - pt.get("preprocess_ms", 0.0) - parts.append(f"engine={engine_ms / 1000.0:.2f}s") - stage_parts = [] - for evt in stage_evts: - sid = evt.stage_id if evt.stage_id is not None else "?" 
- t = evt.stage_gen_time_ms / 1000.0 - stage_parts.append(f"{sid}:{t:.2f}s") - if stage_parts: - parts.append(f"stages=[{','.join(stage_parts)}]") - transfer_parts = [] - for te in self.transfer_events.values(): - if te.request_id == rid: - transfer_parts.append(f"{te.from_stage}->{te.to_stage}={te.tx_time_ms:.2f}ms") - if transfer_parts: - parts.append(f"transfers=[{','.join(transfer_parts)}]") - if "ar2diffusion_ms" in pt: - parts.append(f"ar2diffusion={pt['ar2diffusion_ms']:.2f}ms") - logger.info("[OmniTiming] %s", " ".join(parts)) # === Stage table (columns = stage_id) === # if any stage has diffusion_metrics, remove postprocess_time_ms field @@ -592,7 +1187,11 @@ def build_and_log_summary(self) -> dict[str, Any]: # then remove diffusion_metrics from the table stage_rows = [] for evt in stage_evts: - row = {"stage_id": evt.stage_id, **_build_row(evt, local_stage_fields)} + row = { + "stage": self._stage_label(evt.stage_id), + "stage_id": evt.stage_id, + **_build_row(evt, local_stage_fields), + } if evt.diffusion_metrics: row.update(evt.diffusion_metrics) row.pop("diffusion_metrics", None) # Remove the dict itself @@ -600,35 +1199,6 @@ def build_and_log_summary(self) -> dict[str, Any]: result_stage_table.append({"request_id": rid, "stages": stage_rows}) - if stage_rows: - # filter out all-zero fields for logging - all_value_fields = set() - for row in stage_rows: - for k in row.keys(): - if k != "stage_id": - all_value_fields.add(k) - value_fields_list = [] - for field in sorted(all_value_fields): - all_zero = True - for row in stage_rows: - v = row.get(field, None) - if v not in (0, 0.0, 0.000, None, ""): - all_zero = False - break - if not all_zero: - value_fields_list.append(field) - - if value_fields_list: - logger.info( - "\n%s", - _format_table( - f"StageRequestStats [request_id={rid}]", - stage_rows, - column_key="stage_id", - value_fields=value_fields_list, - ), - ) - # === Transfer table (columns = edge) === transfer_evts = sorted( [e for e in 
self.transfer_events.values() if e.request_id == rid], @@ -640,34 +1210,8 @@ def build_and_log_summary(self) -> dict[str, Any]: ] result_trans_table.append({"request_id": rid, "transfers": transfer_rows}) - if transfer_rows: - # filter out all-zero fields for logging - all_value_fields = set() - for row in transfer_rows: - for k in row.keys(): - if k != "edge": - all_value_fields.add(k) - value_fields_list = [] - for field in sorted(all_value_fields): - all_zero = True - for row in transfer_rows: - v = row.get(field, None) - if v not in (0, 0.0, 0.000, None, ""): - all_zero = False - break - if not all_zero: - value_fields_list.append(field) - - if value_fields_list: - logger.info( - "\n%s", - _format_table( - f"TransferEdgeStats [request_id={rid}]", - transfer_rows, - column_key="edge", - value_fields=value_fields_list, - ), - ) + if overall_fields: + logger.info("\n%s", "\n".join(self._build_omni_metrics_summary_lines(overall_summary))) return { "final_stage_id": final_stage_id_map,