From f245b84cb303ea8c6989791668343f5c42beefd3 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 12 Jan 2026 11:33:22 +0000 Subject: [PATCH 1/7] Initial plan From f2f83b21647a9ac15ca540a6575ced4d3e34e086 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 12 Jan 2026 11:49:50 +0000 Subject: [PATCH 2/7] feat: add omni metrics aggregator and loggers Co-authored-by: LJH-LBJ <98734602+LJH-LBJ@users.noreply.github.com> --- tests/metrics/test_stats.py | 80 +++ vllm_omni/entrypoints/async_omni.py | 13 + vllm_omni/entrypoints/log_utils.py | 647 +++---------------------- vllm_omni/entrypoints/omni.py | 13 + vllm_omni/metrics/__init__.py | 49 ++ vllm_omni/metrics/loggers.py | 79 +++ vllm_omni/metrics/prometheus.py | 75 +++ vllm_omni/metrics/stats.py | 723 ++++++++++++++++++++++++++++ 8 files changed, 1090 insertions(+), 589 deletions(-) create mode 100644 tests/metrics/test_stats.py create mode 100644 vllm_omni/metrics/__init__.py create mode 100644 vllm_omni/metrics/loggers.py create mode 100644 vllm_omni/metrics/prometheus.py create mode 100644 vllm_omni/metrics/stats.py diff --git a/tests/metrics/test_stats.py b/tests/metrics/test_stats.py new file mode 100644 index 00000000000..e589e08dcd3 --- /dev/null +++ b/tests/metrics/test_stats.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +from vllm_omni.metrics.loggers import OmniStatLoggerBase, OmniStatLoggerManager +from vllm_omni.metrics.stats import OrchestratorAggregator + + +def test_orchestrator_aggregator_builds_summary() -> None: + agg = OrchestratorAggregator(num_stages=2, enable_debug_events=False, wall_start_ts=0.0) + agg.set_final_stage_map({"r1": 1}) + agg.stage_first_ts[0] = 0.0 + agg.stage_last_ts[0] = 0.03 + agg.stage_first_ts[1] = 0.05 + agg.stage_last_ts[1] = 0.07 + + agg.on_forward(0, 1, "r1", size_bytes=1024, tx_ms=5.0, used_shm=False) + agg.on_stage_metrics( + 0, + "r1", + { + "num_tokens_in": 3, + "num_tokens_out": 3, + "stage_gen_time_ms": 30.0, + "batch_id": 1, + "batch_size": 1, + "rx_transfer_bytes": 0, + "rx_decode_time_ms": 0.0, + }, + ) + agg.on_stage_metrics( + 1, + "r1", + { + "num_tokens_out": 4, + "stage_gen_time_ms": 20.0, + "batch_id": 1, + "batch_size": 1, + "rx_transfer_bytes": 1024, + "rx_decode_time_ms": 5.0, + "rx_in_flight_time_ms": 2.0, + }, + ) + agg.on_finalize_request(1, "r1", req_start_ts=0.0) + + summary = agg.build_run_summary() + data = summary.to_dict() + assert data["e2e_requests"] == 1 + assert len(data["stages"]) == 2 + assert data["stages"][0]["requests"] == 1 + assert data["transfers"][0]["samples"] == 1 + assert data["transfers"][0]["total_mbps"] >= 0.0 + + +class _DummyLogger(OmniStatLoggerBase): + def __init__(self, interval_s: float = 0.0) -> None: + super().__init__(interval_s=interval_s) + self.logged: list[dict] = [] + + def log(self, summary) -> None: # type: ignore[override] + self.logged.append(summary.to_dict()) + + +def test_logger_manager_triggers_logging_on_interval() -> None: + agg = OrchestratorAggregator(num_stages=1, enable_debug_events=False, wall_start_ts=0.0) + agg.set_final_stage_map({"r": 0}) + dummy_logger = _DummyLogger(interval_s=0.0) + mgr = OmniStatLoggerManager( + aggregator=agg, + loggers=[dummy_logger], + final_stage_map_provider=lambda: agg.final_stage_map, + ) + agg.set_logger_manager(mgr) + agg.stage_first_ts[0] = 0.0 + agg.stage_last_ts[0] = 0.01 + agg.on_stage_metrics( + 0, + "r", + {"num_tokens_out": 1, "stage_gen_time_ms": 1.0, "batch_id": 1, "batch_size": 1, "rx_transfer_bytes": 0}, + ) + agg.on_finalize_request(0, "r", req_start_ts=0.0) + assert dummy_logger.logged, "logger manager should emit summary when interval is reached" diff --git a/vllm_omni/entrypoints/async_omni.py b/vllm_omni/entrypoints/async_omni.py index b3c54736ef9..d8f31d44bf1 100644 --- a/vllm_omni/entrypoints/async_omni.py +++ b/vllm_omni/entrypoints/async_omni.py @@ -35,6 +35,8 @@ get_final_stage_id_for_e2e, ) from vllm_omni.outputs import OmniRequestOutput +from vllm_omni.metrics.loggers import OmniLoggingStatLogger, OmniStatLoggerManager +from vllm_omni.metrics.prometheus import OmniPrometheusStatLogger logger = init_logger(__name__) @@ -321,6 +323,15 @@ async def generate(self, *args: Any, **kwargs: dict[str, Any]) -> AsyncGenerator self._enable_stats, _wall_start_ts, ) + metrics.set_final_stage_map({str(request_id): final_stage_id_for_e2e}) + stat_logger_manager = OmniStatLoggerManager( + aggregator=metrics, + loggers=[ + OmniLoggingStatLogger(interval_s=10.0, enabled=True), + OmniPrometheusStatLogger(interval_s=10.0, enabled=True), + ], + final_stage_map_provider=lambda: metrics.final_stage_map, + ) # Seed stage-0 queue with all requests logger.debug(f"[{self._name}] Seeding request into stage-0") req_state = ClientRequestState(request_id) @@ -465,6 +476,8 @@ async def generate(self, *args: Any, **kwargs: dict[str, Any]) -> AsyncGenerator # Summarize and print stats try: + if stat_logger_manager: + stat_logger_manager.force_log() summary = metrics.build_and_log_summary(final_stage_id_for_e2e) logger.info("[Summary] %s", pformat(summary, sort_dicts=False)) except Exception as e: diff --git a/vllm_omni/entrypoints/log_utils.py b/vllm_omni/entrypoints/log_utils.py index 2031038da38..3c6915c3b9d 100644 --- a/vllm_omni/entrypoints/log_utils.py +++ b/vllm_omni/entrypoints/log_utils.py @@ -1,591 +1,60 @@ from __future__ import annotations -import time -from dataclasses import dataclass -from pprint import pformat -from typing import Any - -from vllm.logger import init_logger - -logger = init_logger(__name__) - - -def log_transfer_tx( - from_stage: int, - to_stage: int, - request_id: Any, - size_bytes: int, - tx_time_ms: float, - used_shm: bool, -) -> None: - logger.info( - pformat( - { - "type": "transfer_stats", - "from_stage": from_stage, - "to_stage": to_stage, - "request_id": request_id, - "size_bytes": int(size_bytes), - "tx_time_ms": float(tx_time_ms), - "tx_mbps": (float(size_bytes) * 8.0) / (max(tx_time_ms, 1e-6) * 1000.0), - "used_shm": bool(used_shm), - }, - sort_dicts=False, - ) - ) - - -def log_transfer_rx( - from_stage: int, - to_stage: int, - request_id: Any, - rx_bytes: int, - rx_decode_time_ms: float, - in_flight_time_ms: float, -) -> None: - logger.info( - pformat( - { - "type": "transfer_rx_stats", - "from_stage": from_stage, - "to_stage": to_stage, - "request_id": request_id, - "rx_bytes": int(rx_bytes), - "rx_decode_time_ms": float(rx_decode_time_ms), - "in_flight_time_ms": float(in_flight_time_ms), - "rx_time_per_kb_ms": ( - (float(rx_decode_time_ms) / max(float(rx_bytes) / 1024.0, 1e-6)) if rx_bytes > 0 else 0.0 - ), - }, - sort_dicts=False, - ) - ) - - -def log_transfer_total( - from_stage: int, - to_stage: int, - request_id: Any, - size_bytes: int, - tx_time_ms: float, - in_flight_time_ms: float, - rx_decode_time_ms: float, - total_time_ms: float, -) -> None: - logger.info( - pformat( - { - "type": "transfer_total_stats", - "from_stage": from_stage, - "to_stage": to_stage, - "request_id": request_id, - "size_bytes": int(size_bytes), - "tx_time_ms": float(tx_time_ms), - "in_flight_time_ms": float(in_flight_time_ms), - "rx_decode_time_ms": float(rx_decode_time_ms), - "total_time_ms": float(total_time_ms), - "total_time_per_kb_ms": ( - float(total_time_ms) / max(float(size_bytes) / 1024.0, 1e-6) if size_bytes > 0 else 0.0 - ), - }, - sort_dicts=False, - ) - ) - - -def log_stage_request_stats( - stage_id: int, - request_id: Any, - batch_size: int, - num_tokens_out: int, - stage_gen_time_ms: float, - tokens_per_s: float, - rx_transfer_bytes: int, - rx_decode_time_ms: float, - rx_mbps: float, -) -> None: - logger.info( - pformat( - { - "type": "Request_stage_stats", - "stage_id": stage_id, - "request_id": request_id, - "batch_size": int(batch_size), - "num_tokens_out": int(num_tokens_out), - "stage_gen_time_ms": float(stage_gen_time_ms), - "tokens_per_s": float(tokens_per_s), - "rx_transfer_bytes": int(rx_transfer_bytes), - "rx_decode_time_ms": float(rx_decode_time_ms), - "rx_mbps": float(rx_mbps), - }, - sort_dicts=False, - ) - ) - - -def compute_and_log_stage_request_stats( - stage_id: int, - request_id: Any, - batch_size: int, - num_engine_outputs: int, - stage_gen_time_ms: float, - rx_transfer_bytes: int, - rx_decode_time_ms: float, -) -> None: - """Compute per-request metrics and log them in one call.""" - tokens_per_s = (num_engine_outputs * 1000.0 / stage_gen_time_ms) if stage_gen_time_ms > 0 else 0.0 - rx_mbps = ( - (float(rx_transfer_bytes) * 8.0) / (max(float(rx_decode_time_ms), 1e-6) * 1000.0) - if rx_transfer_bytes > 0 - else 0.0 - ) - log_stage_request_stats( - stage_id, - request_id, - int(batch_size), - int(num_engine_outputs), - float(stage_gen_time_ms), - float(tokens_per_s), - int(rx_transfer_bytes), - float(rx_decode_time_ms), - float(rx_mbps), - ) - - -# ----------------- Aggregation helpers for orchestrator ----------------- - - -def record_stage_metrics( - per_request: dict[str, dict[str, Any]], - stage_req_counts: list[int], - stage_total_time_ms: list[float], - stage_total_tokens: list[int], - stage_id: int, - req_id: Any, - metrics: dict[str, Any], -) -> None: - try: - stage_req_counts[stage_id] += 1 - stage_total_tokens[stage_id] += int(metrics.get("num_tokens_out", 0)) - rid_key = str(req_id) - pr = per_request.setdefault(rid_key, {"stages": {}, "transfers_ms": 0.0, "transfers_bytes": 0}) - pr_stages = pr["stages"] # type: ignore[index] - stage_data: dict[str, Any] = { - "stage_gen_time_ms": float(metrics.get("stage_gen_time_ms", 0.0)), - "num_tokens_out": int(metrics.get("num_tokens_out", 0)), - } - # Only record num_tokens_in for stage 0 (initial prompt) - if stage_id == 0: - stage_data["num_tokens_in"] = int(metrics.get("num_tokens_in", 0)) - stage_total_tokens[stage_id] += int(metrics.get("num_tokens_in", 0)) - pr_stages[stage_id] = stage_data - except Exception: - pass - - -def aggregate_rx_and_maybe_total( - transfer_edge_req: dict[tuple[int, int, str], dict[str, float]], - transfer_agg: dict[tuple[int, int], dict[str, float]], - per_request: dict[str, dict[str, Any]], - stage_id: int, - req_id: Any, - rx_bytes: float, - rx_ms: float, - in_flight_ms: float, -) -> tuple[int, float, float] | None: - try: - # Update RX aggregates for (stage_id-1 -> stage_id) - if stage_id > 0: - key = (stage_id - 1, stage_id) - agg = transfer_agg.get(key) - if agg is None: - agg = { - "sum_bytes": 0.0, - "sum_ms": 0.0, - "count": 0.0, - "sum_rx_bytes": 0.0, - "sum_rx_ms": 0.0, - "rx_count": 0.0, - "sum_total_ms": 0.0, - "total_count": 0.0, - } - transfer_agg[key] = agg - agg["sum_rx_bytes"] += float(rx_bytes) - agg["sum_rx_ms"] += float(rx_ms) - agg["rx_count"] += 1.0 - - # Try combine with sender-side timing if present - rid_key = str(req_id) - s = transfer_edge_req.get((stage_id - 1, stage_id, rid_key)) - if s is None: - return None - tx_ms = float(s.get("tx_ms", 0.0)) - size_b = float(s.get("size_bytes", rx_bytes)) - total_ms = tx_ms + float(in_flight_ms) + float(rx_ms) - agg["sum_total_ms"] += total_ms - agg["total_count"] += 1.0 - # accumulate per-request transfer totals - try: - pr = per_request.setdefault(rid_key, {"stages": {}, "transfers_ms": 0.0, "transfers_bytes": 0}) - pr["transfers_ms"] = float(pr.get("transfers_ms", 0.0)) + total_ms # type: ignore[index] - pr["transfers_bytes"] = int(pr.get("transfers_bytes", 0)) + int(rx_bytes) # type: ignore[index] - except Exception: - pass - return int(size_b), float(tx_ms), float(total_ms) - return None - except Exception: - return None - - -def record_sender_transfer_agg( - transfer_agg: dict[tuple[int, int], dict[str, float]], - transfer_edge_req: dict[tuple[int, int, str], dict[str, float]], - from_stage: int, - to_stage: int, - req_id: Any, - size_bytes: int, - tx_ms: float, -) -> None: - try: - key = (from_stage, to_stage) - agg = transfer_agg.get(key) - if agg is None: - agg = { - "sum_bytes": 0.0, - "sum_ms": 0.0, - "count": 0.0, - "sum_rx_bytes": 0.0, - "sum_rx_ms": 0.0, - "rx_count": 0.0, - "sum_total_ms": 0.0, - "total_count": 0.0, - } - transfer_agg[key] = agg - agg["sum_bytes"] += float(size_bytes) - agg["sum_ms"] += float(tx_ms) - agg["count"] += 1.0 - # Store sender-side timing for per-request combination - rid_key = str(req_id) - transfer_edge_req[(from_stage, to_stage, rid_key)] = { - "tx_ms": float(tx_ms), - "size_bytes": float(size_bytes), - } - except Exception: - pass - - -def count_tokens_from_outputs(engine_outputs: list[Any]) -> int: - total = 0 - for _ro in engine_outputs: - try: - outs = getattr(_ro, "outputs", None) - if outs and len(outs) > 0: - tokens = getattr(outs[0], "token_ids", None) - if tokens is not None: - total += len(tokens) - except Exception: - pass - return total - - -def build_stage_summary( - stage_req_counts: list[int], - stage_total_tokens: list[int], - stage_total_time_ms: list[float], -) -> list[dict[str, Any]]: - summary: list[dict[str, Any]] = [] - for sid in range(len(stage_req_counts)): - reqs = stage_req_counts[sid] - tokens = stage_total_tokens[sid] - total_ms = float(stage_total_time_ms[sid]) - avg_req = (total_ms / reqs) if reqs > 0 else 0.0 - avg_tok = (tokens * 1000.0 / total_ms) if total_ms > 0 else 0.0 - summary.append( - { - "stage_id": sid, - "requests": int(reqs), - "tokens": int(tokens), - "total_time_ms": total_ms, - "avg_time_per_request_ms": avg_req, - "avg_tokens_per_s": avg_tok, - } - ) - return summary - - -def build_transfer_summary( - transfer_agg: dict[tuple[int, int], dict[str, float]], -) -> list[dict[str, Any]]: - summary: list[dict[str, Any]] = [] - for (src, dst), agg in transfer_agg.items(): - sum_bytes = float(agg.get("sum_bytes", 0.0)) - sum_ms = float(agg.get("sum_ms", 0.0)) - samples = int(agg.get("count", 0.0)) - tx_mbps = (sum_bytes * 8.0) / (max(sum_ms, 1e-6) * 1000.0) if sum_bytes > 0 else 0.0 - sum_rx_bytes = float(agg.get("sum_rx_bytes", 0.0)) - sum_rx_ms = float(agg.get("sum_rx_ms", 0.0)) - samples_rx = int(agg.get("rx_count", 0.0)) - rx_mbps = (sum_rx_bytes * 8.0) / (max(sum_rx_ms, 1e-6) * 1000.0) if sum_rx_bytes > 0 else 0.0 - sum_total_ms = float(agg.get("sum_total_ms", 0.0)) - samples_total = int(agg.get("total_count", 0.0)) - total_mbps = (sum_bytes * 8.0) / (max(sum_total_ms, 1e-6) * 1000.0) if sum_bytes > 0 else 0.0 - summary.append( - { - "from_stage": src, - "to_stage": dst, - "samples": samples, - "total_bytes": int(sum_bytes), - "total_time_ms": sum_ms, - "tx_mbps": tx_mbps, - "rx_samples": samples_rx, - "rx_total_bytes": int(sum_rx_bytes), - "rx_total_time_ms": sum_rx_ms, - "rx_mbps": rx_mbps, - "total_samples": samples_total, - "total_transfer_time_ms": sum_total_ms, - "total_mbps": total_mbps, - } - ) - return summary - - -@dataclass -class StageStats: - total_token: int - total_gen_time: float - - -@dataclass -class StageRequestMetrics: - num_tokens_in: int - num_tokens_out: int - stage_gen_time_ms: float - batch_id: int - batch_size: int - rx_decode_time_ms: float - rx_transfer_bytes: int - rx_in_flight_time_ms: float - - stage_stats: StageStats - - -class OrchestratorMetrics: - def __init__( - self, - num_stages: int, - enable_stats: bool, - wall_start_ts: float, - ) -> None: - self.num_stages = int(num_stages) - self.enable_stats = bool(enable_stats) - self.stage_total_time_ms: list[float] = [0.0 for _ in range(self.num_stages)] - self.stage_total_tokens: list[int] = [0 for _ in range(self.num_stages)] - self.stage_req_counts: list[int] = [0 for _ in range(self.num_stages)] - self.transfer_agg: dict[tuple[int, int], dict[str, float]] = {} - self.transfer_edge_req: dict[tuple[int, int, str], dict[str, float]] = {} - self.e2e_total_ms: float = 0.0 - self.e2e_total_tokens: int = 0 - self.e2e_count: int = 0 - self.e2e_done: set[str] = set() - self.per_request: dict[str, dict[str, Any]] = {} - self.sum_per_request_transfer_ms: float = 0.0 - self.wall_start_ts: float = float(wall_start_ts) - self.last_finish_ts: float = float(wall_start_ts) - self.stage_seen_batches: dict[int, set] = {sid: set() for sid in range(self.num_stages)} - self.stage_first_ts: list[float | None] = [None for _ in range(self.num_stages)] - self.stage_last_ts: list[float | None] = [None for _ in range(self.num_stages)] - - def on_stage_metrics(self, stage_id: int, req_id: Any, metrics: dict[str, Any]) -> None: - record_stage_metrics( - self.per_request, - self.stage_req_counts, - self.stage_total_time_ms, - self.stage_total_tokens, - stage_id, - req_id, - metrics, - ) - if self.enable_stats: - compute_and_log_stage_request_stats( - stage_id=stage_id, - request_id=req_id, - batch_size=metrics.get("batch_size"), - num_engine_outputs=metrics.get("num_tokens_out"), - stage_gen_time_ms=metrics.get("stage_gen_time_ms"), - rx_decode_time_ms=metrics.get("rx_decode_time_ms"), - rx_transfer_bytes=metrics.get("rx_transfer_bytes"), - ) - if stage_stats := metrics.get("stage_stats", None): - total_token = int(stage_stats.get("total_token")) - total_gen_time = float(stage_stats.get("total_gen_time")) - _avg_tokens_per_s = (total_token * 1000.0 / total_gen_time) if total_gen_time > 0 else 0.0 - logger.info( - pformat( - { - "type": "Stage_running_avg", - "stage_id": stage_id, - "total_tokens": total_token, - "total_gen_time_ms": total_gen_time, - "avg_tokens_per_s": _avg_tokens_per_s, - }, - sort_dicts=False, - ) - ) - try: - batch_id_raw = metrics.get("batch_id", None) - if batch_id_raw is not None: - batch_id = int(batch_id_raw) - if batch_id not in self.stage_seen_batches[stage_id]: - self.stage_total_time_ms[stage_id] += float(metrics.get("stage_gen_time_ms", 0.0)) - self.stage_seen_batches[stage_id].add(batch_id) - except Exception: - pass - rx_b = float(metrics.get("rx_transfer_bytes", 0.0)) - rx_ms = float(metrics.get("rx_decode_time_ms", 0.0)) - in_flight_ms = float(metrics.get("rx_in_flight_time_ms", 0.0)) - combined = aggregate_rx_and_maybe_total( - self.transfer_edge_req, - self.transfer_agg, - self.per_request, - stage_id, - req_id, - rx_b, - rx_ms, - in_flight_ms, - ) - if self.enable_stats and stage_id > 0: - log_transfer_rx( - stage_id - 1, - stage_id, - req_id, - int(rx_b), - float(rx_ms), - float(in_flight_ms), - ) - if combined is not None: - size_b_c, tx_ms_c, total_ms_c = combined - log_transfer_total( - stage_id - 1, - stage_id, - req_id, - int(size_b_c), - float(tx_ms_c), - float(in_flight_ms), - float(rx_ms), - float(total_ms_c), - ) - - def on_forward( - self, - from_stage: int, - to_stage: int, - req_id: Any, - size_bytes: int, - tx_ms: float, - used_shm: bool, - ) -> None: - # Mark first input time for the destination stage if not set - if self.stage_first_ts[to_stage] is None: - self.stage_first_ts[to_stage] = time.time() - if self.enable_stats: - log_transfer_tx( - from_stage, - to_stage, - req_id, - int(size_bytes), - float(tx_ms), - bool(used_shm), - ) - record_sender_transfer_agg( - self.transfer_agg, - self.transfer_edge_req, - from_stage, - to_stage, - req_id, - int(size_bytes), - float(tx_ms), - ) - - def on_finalize_request( - self, - stage_id: int, - req_id: Any, - req_start_ts: float, - ) -> None: - rid_key = str(req_id) - _t0 = float(req_start_ts) - _t1 = time.time() - # Update last output time for this stage - prev_last = self.stage_last_ts[stage_id] - self.stage_last_ts[stage_id] = _t1 if prev_last is None else max(prev_last, _t1) - self.last_finish_ts = max(self.last_finish_ts, _t1) - e2e_ms = (_t1 - _t0) * 1000.0 - - # Sum tokens from all stages for this request - # Include input tokens from stage 0 + output tokens from all stages - pr = self.per_request.setdefault(rid_key, {"stages": {}, "transfers_ms": 0.0, "transfers_bytes": 0}) - total_tokens = 0 - stages_info = pr.get("stages", {}) - for sid, stage_data in stages_info.items(): - # Add input tokens only from stage 0 (initial prompt) - if sid == 0: - total_tokens += int(stage_data.get("num_tokens_in", 0)) - total_tokens += int(stage_data.get("num_tokens_out", 0)) - - self.e2e_total_ms += e2e_ms - self.e2e_total_tokens += total_tokens - self.e2e_count += 1 - self.e2e_done.add(rid_key) - per_req_record = { - "type": "request_level_metrics", - "request_id": rid_key, - "e2e_time_ms": e2e_ms, - "e2e_tpt": (e2e_ms / total_tokens) if total_tokens > 0 else 0.0, - "e2e_total_tokens": total_tokens, - "transfers_total_time_ms": float(pr.get("transfers_ms", 0.0)), - "transfers_total_bytes": int(pr.get("transfers_bytes", 0)), - "stages": stages_info, - } - self.sum_per_request_transfer_ms += float(pr.get("transfers_ms", 0.0)) - logger.info(pformat(per_req_record, sort_dicts=False)) - - def build_and_log_summary(self, final_stage_id_to_prompt: dict[str, int]) -> dict[str, Any]: - # Compute stage summary using wall time between first input and last output per stage - stage_summary: list[dict[str, Any]] = [] - for sid in range(self.num_stages): - first_ts = self.stage_first_ts[sid] - last_ts = self.stage_last_ts[sid] - total_ms = ( - (max(0.0, (last_ts - first_ts)) * 1000.0) if (first_ts is not None and last_ts is not None) else 0.0 - ) - reqs = self.stage_req_counts[sid] - tokens = self.stage_total_tokens[sid] - avg_req = (total_ms / reqs) if reqs > 0 else 0.0 - avg_tok = (tokens * 1000.0 / total_ms) if total_ms > 0 else 0.0 - stage_summary.append( - { - "stage_id": sid, - "requests": int(reqs), - "tokens": int(tokens), - "total_time_ms": float(total_ms), - "avg_time_per_request_ms": float(avg_req), - "avg_tokens_per_s": float(avg_tok), - } - ) - transfer_summary = build_transfer_summary(self.transfer_agg) - e2e_avg_req = (self.e2e_total_ms / self.e2e_count) if self.e2e_count > 0 else 0.0 - e2e_avg_tok = (self.e2e_total_tokens * 1000.0 / self.e2e_total_ms) if self.e2e_total_ms > 0 else 0.0 - wall_time_ms = max(0.0, (self.last_finish_ts - self.wall_start_ts) * 1000.0) - summary: dict[str, Any] = { - "e2e_requests": int(self.e2e_count), - "e2e_total_time_ms": float(wall_time_ms), - "e2e_sum_time_ms": float(self.e2e_total_ms), - "e2e_total_tokens": int(self.e2e_total_tokens), - "e2e_avg_time_per_request_ms": e2e_avg_req, - "e2e_avg_tokens_per_s": e2e_avg_tok, - "wall_time_ms": wall_time_ms, - "final_stage_id": final_stage_id_to_prompt, - "stages": stage_summary, - "transfers": transfer_summary, - } - return summary +"""Compatibility wrappers for orchestrator statistics utilities. + +The new metrics architecture lives under vllm_omni.metrics.* following the +Stats -> Aggregator -> Logger/Exporter layering. This module re-exports the +core types so existing imports continue to work. +""" + +from vllm_omni.metrics import ( # noqa: F401 + OmniLoggingStatLogger, + OmniPrometheusStatLogger, + OmniStatLoggerBase, + OmniStatLoggerManager, + OrchestratorAggregator, + OrchestratorMetrics, + RequestE2EStats, + RunSummary, + StageRequestMetrics, + StageRequestStats, + StageStats, + TransferEdgeStats, + aggregate_rx_and_maybe_total, + build_stage_summary, + build_transfer_summary, + compute_and_log_stage_request_stats, + count_tokens_from_outputs, + log_stage_request_stats, + log_transfer_rx, + log_transfer_total, + log_transfer_tx, + record_sender_transfer_agg, + record_stage_metrics, +) + +__all__ = [ + "OmniLoggingStatLogger", + "OmniPrometheusStatLogger", + "OmniStatLoggerBase", + "OmniStatLoggerManager", + "OrchestratorAggregator", + "OrchestratorMetrics", + "RequestE2EStats", + "RunSummary", + "StageRequestMetrics", + "StageRequestStats", + "StageStats", + "TransferEdgeStats", + "aggregate_rx_and_maybe_total", + "build_stage_summary", + "build_transfer_summary", + "compute_and_log_stage_request_stats", + "count_tokens_from_outputs", + "log_stage_request_stats", + "log_transfer_rx", + "log_transfer_total", + "log_transfer_tx", + "record_sender_transfer_agg", + "record_stage_metrics", +] diff --git a/vllm_omni/entrypoints/omni.py b/vllm_omni/entrypoints/omni.py index 109d5f85473..8d8809e3140 100644 --- a/vllm_omni/entrypoints/omni.py +++ b/vllm_omni/entrypoints/omni.py @@ -38,6 +38,8 @@ load_stage_configs_from_yaml, resolve_model_config_path, ) +from vllm_omni.metrics.loggers import OmniLoggingStatLogger, OmniStatLoggerManager +from vllm_omni.metrics.prometheus import OmniPrometheusStatLogger from vllm_omni.outputs import OmniRequestOutput logger = init_logger(__name__) @@ -597,6 +599,15 @@ def _run_generation( self._enable_stats, _wall_start_ts, ) + metrics.set_final_stage_map(final_stage_id_to_prompt) + stat_logger_manager = OmniStatLoggerManager( + aggregator=metrics, + loggers=[ + OmniLoggingStatLogger(interval_s=10.0, enabled=True), + OmniPrometheusStatLogger(interval_s=10.0, enabled=True), + ], + final_stage_map_provider=lambda: metrics.final_stage_map, + ) it = request_id_to_prompt.items() if use_tqdm: @@ -778,6 +789,8 @@ def _run_generation( # Summarize and print stats try: + if stat_logger_manager: + stat_logger_manager.force_log() summary = metrics.build_and_log_summary(final_stage_id_to_prompt) logger.info("[Summary] %s", pformat(summary, sort_dicts=False)) except Exception as e: diff --git a/vllm_omni/metrics/__init__.py b/vllm_omni/metrics/__init__.py new file mode 100644 index 00000000000..5e790faefc2 --- /dev/null +++ b/vllm_omni/metrics/__init__.py @@ -0,0 +1,49 @@ +from .loggers import OmniLoggingStatLogger, OmniStatLoggerBase, OmniStatLoggerManager +from .prometheus import OmniPrometheusStatLogger +from .stats import ( + OrchestratorAggregator, + OrchestratorMetrics, + RequestE2EStats, + RunSummary, + StageRequestMetrics, + StageRequestStats, + StageStats, + TransferEdgeStats, + aggregate_rx_and_maybe_total, + build_stage_summary, + build_transfer_summary, + compute_and_log_stage_request_stats, + count_tokens_from_outputs, + log_stage_request_stats, + log_transfer_rx, + log_transfer_total, + log_transfer_tx, + record_sender_transfer_agg, + record_stage_metrics, +) + +__all__ = [ + "OmniLoggingStatLogger", + "OmniStatLoggerBase", + "OmniStatLoggerManager", + "OmniPrometheusStatLogger", + "OrchestratorAggregator", + "OrchestratorMetrics", + "RequestE2EStats", + "RunSummary", + "StageRequestMetrics", + "StageRequestStats", + "StageStats", + "TransferEdgeStats", + "aggregate_rx_and_maybe_total", + "build_stage_summary", + "build_transfer_summary", + "compute_and_log_stage_request_stats", + "count_tokens_from_outputs", + "log_stage_request_stats", + "log_transfer_rx", + "log_transfer_total", + "log_transfer_tx", + "record_sender_transfer_agg", + "record_stage_metrics", +] diff --git a/vllm_omni/metrics/loggers.py b/vllm_omni/metrics/loggers.py new file mode 100644 index 00000000000..ba49600e782 --- /dev/null +++ b/vllm_omni/metrics/loggers.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import time +from abc import ABC, abstractmethod +from pprint import pformat +from typing import Any, Callable + +from vllm.logger import init_logger + +from .stats import RunSummary + +logger = init_logger(__name__) + + +class OmniStatLoggerBase(ABC): + """Abstract logger for Omni stats.""" + + def __init__(self, interval_s: float = 10.0, enabled: bool = True) -> None: + self.interval_s = float(interval_s) + self.enabled = enabled + self._last_log_ts: float = 0.0 + + def should_log(self, now: float) -> bool: + if not self.enabled: + return False + return (now - self._last_log_ts) >= self.interval_s + + def update_ts(self, now: float) -> None: + self._last_log_ts = now + + @abstractmethod + def log(self, summary: RunSummary) -> None: + raise NotImplementedError + + +class OmniLoggingStatLogger(OmniStatLoggerBase): + """Console/file logger for aggregated Omni stats.""" + + def __init__(self, interval_s: float = 10.0, enabled: bool = True) -> None: + super().__init__(interval_s=interval_s, enabled=enabled) + + def log(self, summary: RunSummary) -> None: + logger.info("[OmniStats] %s", pformat(summary.to_dict(), sort_dicts=False)) + + +class OmniStatLoggerManager: + """Manages multiple stat loggers over a shared aggregator.""" + + def __init__( + self, + aggregator: Any, + loggers: list[OmniStatLoggerBase] | None = None, + final_stage_map_provider: Callable[[], dict[str, int] | int | None] | None = None, + ) -> None: + self.aggregator = aggregator + self.loggers = loggers or [] + self.final_stage_map_provider = final_stage_map_provider + if hasattr(self.aggregator, "set_logger_manager"): + self.aggregator.set_logger_manager(self) + + def maybe_log(self) -> None: + now = time.time() + ready_loggers = [l for l in self.loggers if l.should_log(now)] + if not ready_loggers: + return + final_stage = self.final_stage_map_provider() if self.final_stage_map_provider else None + summary = self.aggregator.build_run_summary(final_stage) + for lg in ready_loggers: + lg.log(summary) + lg.update_ts(now) + + def force_log(self) -> None: + final_stage = self.final_stage_map_provider() if self.final_stage_map_provider else None + summary = self.aggregator.build_run_summary(final_stage) + for lg in self.loggers: + if not lg.enabled: + continue + lg.log(summary) + lg.update_ts(time.time()) diff --git a/vllm_omni/metrics/prometheus.py b/vllm_omni/metrics/prometheus.py new file mode 100644 index 00000000000..a76c7116a32 --- /dev/null +++ b/vllm_omni/metrics/prometheus.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +from typing import Any + +from vllm.logger import init_logger + +from .loggers import OmniStatLoggerBase +from .stats import RunSummary + +logger = init_logger(__name__) + + +class OmniPrometheusStatLogger(OmniStatLoggerBase): + """Prometheus exporter for Omni stats (best-effort, optional dependency).""" + + def __init__(self, interval_s: float = 10.0, enabled: bool = True, registry: Any = None, prefix: str = "omni"): + super().__init__(interval_s=interval_s, enabled=enabled) + try: + from prometheus_client import CollectorRegistry, Gauge + except Exception: + logger.debug("prometheus_client not available; disabling Prometheus stat logger") + self.enabled = False + self._registry = None + self._Gauge = None + return + self._registry = registry or CollectorRegistry(auto_describe=True) + self._Gauge = Gauge + self.prefix = prefix + self.stage_requests = self._Gauge( + f"{prefix}_stage_requests", + "Number of requests per stage", + ["stage_id"], + registry=self._registry, + ) + self.stage_tokens_per_s = self._Gauge( + f"{prefix}_stage_avg_tokens_per_s", + "Average tokens per second per stage", + ["stage_id"], + registry=self._registry, + ) + self.transfer_mbps = self._Gauge( + f"{prefix}_transfer_total_mbps", + "Average transfer Mbps per edge", + ["from_stage", "to_stage"], + registry=self._registry, + ) + self.e2e_avg_latency = self._Gauge( + f"{prefix}_e2e_avg_latency_ms", + "Average end-to-end latency per request (ms)", + registry=self._registry, + ) + self.e2e_throughput = self._Gauge( + f"{prefix}_e2e_avg_tokens_per_s", + "Average end-to-end tokens per second", + registry=self._registry, + ) + + @property + def registry(self) -> Any | None: + return getattr(self, "_registry", None) + + def log(self, summary: RunSummary) -> None: + if not self.enabled or self._Gauge is None or self._registry is None: + return + for stage in summary.stages: + sid = str(stage.get("stage_id")) + self.stage_requests.labels(stage_id=sid).set(stage.get("requests", 0)) + self.stage_tokens_per_s.labels(stage_id=sid).set(stage.get("avg_tokens_per_s", 0.0)) + for transfer in summary.transfers: + self.transfer_mbps.labels( + from_stage=str(transfer.get("from_stage")), + to_stage=str(transfer.get("to_stage")), + ).set(transfer.get("total_mbps", 0.0)) + self.e2e_avg_latency.set(summary.e2e_avg_time_per_request_ms) + self.e2e_throughput.set(summary.e2e_avg_tokens_per_s) diff --git a/vllm_omni/metrics/stats.py b/vllm_omni/metrics/stats.py new file mode 100644 index 00000000000..c0e7709858f --- /dev/null +++ b/vllm_omni/metrics/stats.py @@ -0,0 +1,723 @@ +from __future__ import annotations + +import time +from dataclasses import dataclass, field +from pprint import pformat +from typing import Any, Iterable + +from vllm.logger import init_logger + +logger = init_logger(__name__) + + +def log_transfer_tx( + from_stage: int, + to_stage: int, + request_id: Any, + size_bytes: int, + tx_time_ms: float, + used_shm: bool, +) -> None: + logger.info( + pformat( + { + "type": "transfer_stats", + "from_stage": from_stage, + "to_stage": to_stage, + "request_id": request_id, + "size_bytes": int(size_bytes), + "tx_time_ms": float(tx_time_ms), + "tx_mbps": (float(size_bytes) * 8.0) / (max(tx_time_ms, 1e-6) * 1000.0), + "used_shm": bool(used_shm), + }, + sort_dicts=False, + ) + ) + + +def log_transfer_rx( + from_stage: int, + to_stage: int, + request_id: Any, + rx_bytes: int, + rx_decode_time_ms: float, + in_flight_time_ms: float, +) -> None: + logger.info( + pformat( + { + "type": "transfer_rx_stats", + "from_stage": from_stage, + "to_stage": to_stage, + "request_id": request_id, + "rx_bytes": int(rx_bytes), + "rx_decode_time_ms": float(rx_decode_time_ms), + "in_flight_time_ms": float(in_flight_time_ms), + "rx_time_per_kb_ms": ( + (float(rx_decode_time_ms) / max(float(rx_bytes) / 1024.0, 1e-6)) if rx_bytes > 0 else 0.0 + ), + }, + sort_dicts=False, + ) + ) + + +def log_transfer_total( + from_stage: int, + to_stage: int, + request_id: Any, + size_bytes: int, + tx_time_ms: float, + in_flight_time_ms: float, + rx_decode_time_ms: float, + total_time_ms: float, +) -> None: + logger.info( + pformat( + { + "type": "transfer_total_stats", + "from_stage": from_stage, + "to_stage": to_stage, + "request_id": request_id, + "size_bytes": int(size_bytes), + "tx_time_ms": float(tx_time_ms), + "in_flight_time_ms": float(in_flight_time_ms), + "rx_decode_time_ms": float(rx_decode_time_ms), + "total_time_ms": float(total_time_ms), + "total_time_per_kb_ms": ( + float(total_time_ms) / max(float(size_bytes) / 1024.0, 1e-6) if size_bytes > 0 else 0.0 + ), + }, + sort_dicts=False, + ) + ) + + +def log_stage_request_stats( + stage_id: int, + request_id: Any, + batch_size: int, + num_tokens_out: int, + stage_gen_time_ms: float, + tokens_per_s: float, + rx_transfer_bytes: int, + rx_decode_time_ms: float, + rx_mbps: float, +) -> None: + logger.info( + pformat( + { + "type": "Request_stage_stats", + "stage_id": stage_id, + "request_id": request_id, + "batch_size": int(batch_size), + "num_tokens_out": int(num_tokens_out), + "stage_gen_time_ms": float(stage_gen_time_ms), + "tokens_per_s": float(tokens_per_s), + "rx_transfer_bytes": int(rx_transfer_bytes), + "rx_decode_time_ms": float(rx_decode_time_ms), + "rx_mbps": float(rx_mbps), + }, + sort_dicts=False, + ) + ) + + +def compute_and_log_stage_request_stats(stats: "StageRequestStats") -> None: + tokens_per_s = (stats.num_tokens_out * 1000.0 / stats.stage_gen_time_ms) if stats.stage_gen_time_ms > 0 else 0.0 + rx_mbps = ( + (float(stats.rx_transfer_bytes) * 8.0) / (max(float(stats.rx_decode_time_ms), 1e-6) * 1000.0) + if stats.rx_transfer_bytes > 0 + else 0.0 + ) + log_stage_request_stats( + stats.stage_id, + stats.request_id, + int(stats.batch_size or 0), + int(stats.num_tokens_out), + float(stats.stage_gen_time_ms), + float(tokens_per_s), + int(stats.rx_transfer_bytes), + float(stats.rx_decode_time_ms), + float(rx_mbps), + ) + + +@dataclass +class StageStats: + total_token: int + total_gen_time: float + + +@dataclass +class StageRequestStats: + stage_id: int = 0 + request_id: Any = None + num_tokens_in: int = 0 + num_tokens_out: int = 0 + stage_gen_time_ms: float = 0.0 + batch_id: int | None = None + batch_size: int | None = None + rx_decode_time_ms: float = 0.0 + rx_transfer_bytes: int = 0 + rx_in_flight_time_ms: float = 0.0 + stage_stats: StageStats | None = None + + @property + def tokens_per_s(self) -> float: + return (self.num_tokens_out * 1000.0 / self.stage_gen_time_ms) if self.stage_gen_time_ms > 0 else 0.0 + + @property + def rx_mbps(self) -> float: + if self.rx_transfer_bytes <= 0: + return 0.0 + return (float(self.rx_transfer_bytes) * 8.0) / (max(float(self.rx_decode_time_ms), 1e-6) * 1000.0) + + +@dataclass +class TransferEdgeStats: + from_stage: int + to_stage: int + request_id: Any + size_bytes: int + tx_time_ms: float + rx_decode_time_ms: float + in_flight_time_ms: float + + @property + def total_time_ms(self) -> float: + return float(self.tx_time_ms) + float(self.rx_decode_time_ms) + float(self.in_flight_time_ms) + + +@dataclass +class RequestE2EStats: + request_id: Any + e2e_time_ms: float + e2e_total_tokens: int + transfers_total_time_ms: float + transfers_total_bytes: int + + +@dataclass +class RunSummary: + e2e_requests: int + e2e_total_time_ms: float + e2e_sum_time_ms: float + e2e_total_tokens: int + e2e_avg_time_per_request_ms: float + e2e_avg_tokens_per_s: float + wall_time_ms: float + stages: list[dict[str, Any]] = field(default_factory=list) + transfers: list[dict[str, Any]] = field(default_factory=list) + final_stage_id: dict[str, int] | int | None = None + + def to_dict(self) -> dict[str, Any]: + return { + "e2e_requests": self.e2e_requests, + "e2e_total_time_ms": self.e2e_total_time_ms, + "e2e_sum_time_ms": self.e2e_sum_time_ms, + "e2e_total_tokens": self.e2e_total_tokens, + "e2e_avg_time_per_request_ms": self.e2e_avg_time_per_request_ms, + "e2e_avg_tokens_per_s": self.e2e_avg_tokens_per_s, + "wall_time_ms": self.wall_time_ms, + "final_stage_id": self.final_stage_id, + "stages": self.stages, + "transfers": self.transfers, + } + + +def record_stage_metrics( + per_request: dict[str, dict[str, Any]], + stage_req_counts: list[int], + stage_total_tokens: list[int], + stage_id: int, + stats: StageRequestStats, +) -> None: + stage_req_counts[stage_id] += 1 + stage_total_tokens[stage_id] += int(stats.num_tokens_out) + rid_key = str(stats.request_id) + pr = per_request.setdefault(rid_key, {"stages": {}, "transfers_ms": 0.0, "transfers_bytes": 0}) + pr_stages = pr["stages"] # type: ignore[index] + stage_data: dict[str, Any] = { + "stage_gen_time_ms": float(stats.stage_gen_time_ms), + "num_tokens_out": int(stats.num_tokens_out), + } + if stats.stage_stats is not None: + stage_data["stage_stats"] = { + "total_token": int(stats.stage_stats.total_token), + "total_gen_time": float(stats.stage_stats.total_gen_time), + } + # Only record num_tokens_in for stage 0 (initial prompt) + if stage_id == 0: + stage_data["num_tokens_in"] = int(stats.num_tokens_in) + stage_total_tokens[stage_id] += int(stats.num_tokens_in) + pr_stages[stage_id] = stage_data + + +def aggregate_rx_and_maybe_total( + transfer_edge_req: dict[tuple[int, int, str], dict[str, float]], + transfer_agg: dict[tuple[int, int], dict[str, float]], + per_request: dict[str, dict[str, Any]], + stage_id: int, + req_id: Any, + rx_bytes: float, + rx_ms: float, + in_flight_ms: float, +) -> tuple[int, float, float] | None: + try: + if stage_id > 0: + key = (stage_id - 1, stage_id) + agg = transfer_agg.get(key) + if agg is None: + agg = { + "sum_bytes": 0.0, + "sum_ms": 0.0, + "count": 0.0, + "sum_rx_bytes": 0.0, + "sum_rx_ms": 0.0, + "rx_count": 0.0, + "sum_total_ms": 0.0, + "total_count": 0.0, + } + transfer_agg[key] = agg + agg["sum_rx_bytes"] += float(rx_bytes) + agg["sum_rx_ms"] += float(rx_ms) + agg["rx_count"] += 1.0 + + rid_key = str(req_id) + s = transfer_edge_req.get((stage_id - 1, stage_id, rid_key)) + if s is None: + return None + tx_ms = float(s.get("tx_ms", 0.0)) + size_b = float(s.get("size_bytes", rx_bytes)) + total_ms = tx_ms + float(in_flight_ms) + float(rx_ms) + agg["sum_total_ms"] += total_ms + agg["total_count"] += 1.0 + try: + pr = per_request.setdefault(rid_key, {"stages": {}, "transfers_ms": 0.0, "transfers_bytes": 0}) + pr["transfers_ms"] = float(pr.get("transfers_ms", 0.0)) + total_ms # type: ignore[index] + pr["transfers_bytes"] = int(pr.get("transfers_bytes", 0)) + int(rx_bytes) # type: ignore[index] + except Exception: + pass + return int(size_b), float(tx_ms), float(total_ms) + return None + except Exception: + return None + + +def record_sender_transfer_agg( + transfer_agg: dict[tuple[int, int], dict[str, float]], + transfer_edge_req: dict[tuple[int, int, str], dict[str, float]], + from_stage: int, + to_stage: int, + req_id: Any, + size_bytes: int, + tx_ms: float, +) -> None: + try: + key = (from_stage, to_stage) + agg = transfer_agg.get(key) + if agg is None: + agg = { + "sum_bytes": 0.0, + "sum_ms": 0.0, + "count": 0.0, + "sum_rx_bytes": 0.0, + "sum_rx_ms": 0.0, + "rx_count": 0.0, + "sum_total_ms": 0.0, + "total_count": 0.0, + } + transfer_agg[key] = agg + agg["sum_bytes"] += float(size_bytes) + agg["sum_ms"] += float(tx_ms) + agg["count"] += 1.0 + rid_key = str(req_id) + transfer_edge_req[(from_stage, to_stage, rid_key)] = { + "tx_ms": float(tx_ms), + "size_bytes": float(size_bytes), + } + except Exception: + pass + + +def count_tokens_from_outputs(engine_outputs: list[Any]) -> int: + total = 0 + for _ro in engine_outputs: + try: + outs = getattr(_ro, "outputs", None) + if outs and len(outs) > 0: + tokens = getattr(outs[0], "token_ids", None) + if tokens is not None: + total += len(tokens) + except Exception: + pass + return total + + +def build_stage_summary( + stage_req_counts: list[int], + stage_total_tokens: list[int], + stage_total_time_ms: list[float], +) -> list[dict[str, Any]]: + summary: list[dict[str, Any]] = [] + for sid in range(len(stage_req_counts)): + reqs = stage_req_counts[sid] + tokens = stage_total_tokens[sid] + total_ms = float(stage_total_time_ms[sid]) + avg_req = (total_ms / reqs) if reqs > 0 else 0.0 + avg_tok = (tokens * 1000.0 / total_ms) if total_ms > 0 else 0.0 + summary.append( + { + "stage_id": sid, + "requests": int(reqs), + "tokens": int(tokens), + "total_time_ms": total_ms, + "avg_time_per_request_ms": avg_req, + "avg_tokens_per_s": avg_tok, + } + ) + return summary + + +def build_transfer_summary( + transfer_agg: dict[tuple[int, int], dict[str, float]], +) -> list[dict[str, Any]]: + summary: list[dict[str, Any]] = [] + for (src, dst), agg in transfer_agg.items(): + sum_bytes = float(agg.get("sum_bytes", 0.0)) + sum_ms = float(agg.get("sum_ms", 0.0)) + samples = int(agg.get("count", 0.0)) + tx_mbps = (sum_bytes * 8.0) / (max(sum_ms, 1e-6) * 1000.0) if sum_bytes > 0 else 0.0 + sum_rx_bytes = float(agg.get("sum_rx_bytes", 0.0)) + sum_rx_ms = float(agg.get("sum_rx_ms", 0.0)) + samples_rx = int(agg.get("rx_count", 0.0)) + rx_mbps = (sum_rx_bytes * 8.0) / (max(sum_rx_ms, 1e-6) * 1000.0) if sum_rx_bytes > 0 else 0.0 + sum_total_ms = float(agg.get("sum_total_ms", 0.0)) + samples_total = int(agg.get("total_count", 0.0)) + total_mbps = (sum_bytes * 8.0) / (max(sum_total_ms, 1e-6) * 1000.0) if sum_bytes > 0 else 0.0 + summary.append( + { + "from_stage": src, + "to_stage": dst, + "samples": samples, + "total_bytes": int(sum_bytes), + "total_time_ms": sum_ms, + "tx_mbps": tx_mbps, + "rx_samples": samples_rx, + "rx_total_bytes": int(sum_rx_bytes), + "rx_total_time_ms": sum_rx_ms, + "rx_mbps": rx_mbps, + "total_samples": samples_total, + "total_transfer_time_ms": sum_total_ms, + "total_mbps": total_mbps, + } + ) + return summary + + +class OrchestratorAggregator: + """Aggregator for orchestrator-level metrics. + + Acts as the single interface for recording per-stage request stats, + transfer edge stats, and end-to-end request completions. Downstream + loggers (text or Prometheus) can subscribe via the logger manager. + """ + + def __init__( + self, + num_stages: int, + enable_debug_events: bool, + wall_start_ts: float | None, + ) -> None: + self.num_stages = int(num_stages) + self.enable_debug_events = bool(enable_debug_events) + # Backward compatible attribute name + self.enable_stats = self.enable_debug_events + self.stage_total_time_ms: list[float] = [0.0 for _ in range(self.num_stages)] + self.stage_total_tokens: list[int] = [0 for _ in range(self.num_stages)] + self.stage_req_counts: list[int] = [0 for _ in range(self.num_stages)] + self.transfer_agg: dict[tuple[int, int], dict[str, float]] = {} + self.transfer_edge_req: dict[tuple[int, int, str], dict[str, float]] = {} + self.e2e_total_ms: float = 0.0 + self.e2e_total_tokens: int = 0 + self.e2e_count: int = 0 + self.e2e_done: set[str] = set() + self.per_request: dict[str, dict[str, Any]] = {} + self.sum_per_request_transfer_ms: float = 0.0 + self.wall_start_ts: float = float(wall_start_ts if wall_start_ts is not None else time.time()) + self.last_finish_ts: float = float(self.wall_start_ts) + self.stage_seen_batches: dict[int, set] = {sid: set() for sid in range(self.num_stages)} + self.stage_first_ts: list[float | None] = [None for _ in range(self.num_stages)] + self.stage_last_ts: list[float | None] = [None for _ in range(self.num_stages)] + self.final_stage_map: dict[str, int] | int | None = None + self.stage_events: list[StageRequestStats] = [] + self.transfer_events: list[TransferEdgeStats] = [] + self.request_events: list[RequestE2EStats] = [] + self.logger_manager: Any = None + + def set_final_stage_map(self, final_stage_map: dict[str, int] | int | None) -> None: + self.final_stage_map = final_stage_map + + def set_logger_manager(self, manager: Any) -> None: + self.logger_manager = manager + + def _maybe_log_interval(self) -> None: + if self.logger_manager is not None: + try: + self.logger_manager.maybe_log() + except Exception: + logger.debug("Failed to emit periodic stats", exc_info=True) + + def _as_stage_request_stats( + self, + stage_id: int, + req_id: Any, + metrics: StageRequestStats | dict[str, Any], + ) -> StageRequestStats: + if isinstance(metrics, StageRequestStats): + stats = metrics + stats.stage_id = stage_id + stats.request_id = req_id + return stats + m = metrics or {} + stage_stats_raw = m.get("stage_stats", None) + stage_stats = None + if stage_stats_raw is not None: + try: + stage_stats = StageStats( + total_token=int(stage_stats_raw.get("total_token", 0)), + total_gen_time=float(stage_stats_raw.get("total_gen_time", 0.0)), + ) + except Exception: + stage_stats = None + return StageRequestStats( + stage_id=stage_id, + request_id=req_id, + num_tokens_in=int(m.get("num_tokens_in", 0) or 0), + num_tokens_out=int(m.get("num_tokens_out", 0) or 0), + stage_gen_time_ms=float(m.get("stage_gen_time_ms", 0.0) or 0.0), + batch_id=m.get("batch_id"), + batch_size=m.get("batch_size"), + rx_decode_time_ms=float(m.get("rx_decode_time_ms", 0.0) or 0.0), + rx_transfer_bytes=int(m.get("rx_transfer_bytes", 0) or 0), + rx_in_flight_time_ms=float(m.get("rx_in_flight_time_ms", 0.0) or 0.0), + stage_stats=stage_stats, + ) + + def on_stage_metrics(self, stage_id: int, req_id: Any, metrics: StageRequestStats | dict[str, Any]) -> None: + stats = self._as_stage_request_stats(stage_id, req_id, metrics) + record_stage_metrics( + self.per_request, + self.stage_req_counts, + self.stage_total_tokens, + stage_id, + stats, + ) + if self.enable_debug_events: + compute_and_log_stage_request_stats(stats) + self.stage_events.append(stats) + if stats.stage_stats is not None: + total_token = int(stats.stage_stats.total_token) + total_gen_time = float(stats.stage_stats.total_gen_time) + _avg_tokens_per_s = (total_token * 1000.0 / total_gen_time) if total_gen_time > 0 else 0.0 + logger.info( + pformat( + { + "type": "Stage_running_avg", + "stage_id": stage_id, + "total_tokens": total_token, + "total_gen_time_ms": total_gen_time, + "avg_tokens_per_s": _avg_tokens_per_s, + }, + sort_dicts=False, + ) + ) + try: + batch_id_raw = stats.batch_id + if batch_id_raw is not None: + batch_id = int(batch_id_raw) + if batch_id not in self.stage_seen_batches[stage_id]: + self.stage_total_time_ms[stage_id] += float(stats.stage_gen_time_ms) + self.stage_seen_batches[stage_id].add(batch_id) + except Exception: + pass + rx_b = float(stats.rx_transfer_bytes) + rx_ms = float(stats.rx_decode_time_ms) + in_flight_ms = float(stats.rx_in_flight_time_ms) + combined = aggregate_rx_and_maybe_total( + self.transfer_edge_req, + self.transfer_agg, + self.per_request, + stage_id, + req_id, + rx_b, + rx_ms, + in_flight_ms, + ) + if self.enable_debug_events and stage_id > 0: + log_transfer_rx( + stage_id - 1, + stage_id, + req_id, + int(rx_b), + float(rx_ms), + float(in_flight_ms), + ) + if combined is not None: + size_b_c, tx_ms_c, total_ms_c = combined + tx_event = TransferEdgeStats( + from_stage=stage_id - 1, + to_stage=stage_id, + request_id=req_id, + size_bytes=int(size_b_c), + tx_time_ms=float(tx_ms_c), + rx_decode_time_ms=float(rx_ms), + in_flight_time_ms=float(in_flight_ms), + ) + self.transfer_events.append(tx_event) + log_transfer_total( + stage_id - 1, + stage_id, + req_id, + int(size_b_c), + float(tx_ms_c), + float(in_flight_ms), + float(rx_ms), + float(total_ms_c), + ) + self._maybe_log_interval() + + def on_forward( + self, + from_stage: int, + to_stage: int, + req_id: Any, + size_bytes: int, + tx_ms: float, + used_shm: bool, + ) -> None: + if self.stage_first_ts[to_stage] is None: + self.stage_first_ts[to_stage] = time.time() + if self.enable_debug_events: + log_transfer_tx( + from_stage, + to_stage, + req_id, + int(size_bytes), + float(tx_ms), + bool(used_shm), + ) + record_sender_transfer_agg( + self.transfer_agg, + self.transfer_edge_req, + from_stage, + to_stage, + req_id, + int(size_bytes), + float(tx_ms), + ) + self._maybe_log_interval() + + def on_finalize_request( + self, + stage_id: int, + req_id: Any, + req_start_ts: float, + ) -> None: + rid_key = str(req_id) + _t0 = float(req_start_ts) + _t1 = time.time() + prev_last = self.stage_last_ts[stage_id] + self.stage_last_ts[stage_id] = _t1 if prev_last is None else max(prev_last, _t1) + self.last_finish_ts = max(self.last_finish_ts, _t1) + e2e_ms = (_t1 - _t0) * 1000.0 + + pr = self.per_request.setdefault(rid_key, {"stages": {}, "transfers_ms": 0.0, "transfers_bytes": 0}) + total_tokens = 0 + stages_info = pr.get("stages", {}) + for sid, stage_data in stages_info.items(): + if sid == 0: + total_tokens += int(stage_data.get("num_tokens_in", 0)) + total_tokens += int(stage_data.get("num_tokens_out", 0)) + + self.e2e_total_ms += e2e_ms + self.e2e_total_tokens += total_tokens + self.e2e_count += 1 + self.e2e_done.add(rid_key) + per_req_record = RequestE2EStats( + request_id=rid_key, + e2e_time_ms=e2e_ms, + e2e_total_tokens=total_tokens, + transfers_total_time_ms=float(pr.get("transfers_ms", 0.0)), + transfers_total_bytes=int(pr.get("transfers_bytes", 0)), + ) + self.sum_per_request_transfer_ms += float(pr.get("transfers_ms", 0.0)) + self.request_events.append(per_req_record) + if self.enable_debug_events: + logger.info( + pformat( + { + "type": "request_level_metrics", + "request_id": rid_key, + "e2e_time_ms": e2e_ms, + "e2e_tpt": (e2e_ms / total_tokens) if total_tokens > 0 else 0.0, + "e2e_total_tokens": total_tokens, + "transfers_total_time_ms": float(pr.get("transfers_ms", 0.0)), + "transfers_total_bytes": int(pr.get("transfers_bytes", 0)), + "stages": stages_info, + }, + sort_dicts=False, + ) + ) + self._maybe_log_interval() + + def build_run_summary(self, final_stage_id_to_prompt: dict[str, int] | int | None = None) -> RunSummary: + if final_stage_id_to_prompt is not None: + self.set_final_stage_map(final_stage_id_to_prompt) + stage_summary: list[dict[str, Any]] = [] + for sid in range(self.num_stages): + first_ts = self.stage_first_ts[sid] + last_ts = self.stage_last_ts[sid] + total_ms = ( + (max(0.0, (last_ts - first_ts)) * 1000.0) if (first_ts is not None and last_ts is not None) else 0.0 + ) + reqs = self.stage_req_counts[sid] + tokens = self.stage_total_tokens[sid] + avg_req = (total_ms / reqs) if reqs > 0 else 0.0 + avg_tok = (tokens * 1000.0 / total_ms) if total_ms > 0 else 0.0 + stage_summary.append( + { + "stage_id": sid, + "requests": int(reqs), + "tokens": int(tokens), + "total_time_ms": float(total_ms), + "avg_time_per_request_ms": float(avg_req), + "avg_tokens_per_s": float(avg_tok), + } + ) + transfer_summary = build_transfer_summary(self.transfer_agg) + e2e_avg_req = (self.e2e_total_ms / self.e2e_count) if self.e2e_count > 0 else 0.0 + e2e_avg_tok = (self.e2e_total_tokens * 1000.0 / self.e2e_total_ms) if self.e2e_total_ms > 0 else 0.0 + wall_time_ms = max(0.0, (self.last_finish_ts - self.wall_start_ts) * 1000.0) + return RunSummary( + e2e_requests=int(self.e2e_count), + e2e_total_time_ms=float(wall_time_ms), + e2e_sum_time_ms=float(self.e2e_total_ms), + e2e_total_tokens=int(self.e2e_total_tokens), + e2e_avg_time_per_request_ms=e2e_avg_req, + e2e_avg_tokens_per_s=e2e_avg_tok, + wall_time_ms=wall_time_ms, + final_stage_id=self.final_stage_map, + stages=stage_summary, + transfers=transfer_summary, + ) + + def build_and_log_summary(self, final_stage_id_to_prompt: dict[str, int] | int | None = None) -> dict[str, Any]: + summary = self.build_run_summary(final_stage_id_to_prompt) + return summary.to_dict() + + +# Backward compatible alias +OrchestratorMetrics = OrchestratorAggregator +StageRequestMetrics = StageRequestStats From 5a08eb34583dbac92e9533fcdf70285be7c387f9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 12 Jan 2026 11:53:42 +0000 Subject: [PATCH 3/7] chore: address metrics review feedback Co-authored-by: LJH-LBJ <98734602+LJH-LBJ@users.noreply.github.com> --- vllm_omni/entrypoints/async_omni.py | 1 + vllm_omni/entrypoints/omni.py | 1 + vllm_omni/metrics/stats.py | 21 ++++++++++++--------- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/vllm_omni/entrypoints/async_omni.py b/vllm_omni/entrypoints/async_omni.py index d8f31d44bf1..6fb48c0fb92 100644 --- a/vllm_omni/entrypoints/async_omni.py +++ b/vllm_omni/entrypoints/async_omni.py @@ -310,6 +310,7 @@ async def generate(self, *args: Any, **kwargs: dict[str, Any]) -> AsyncGenerator _req_start_ts: dict[int, float] = {} _wall_start_ts: float = time.time() # _last_finish_ts: float = _wall_start_ts + stat_logger_manager: OmniStatLoggerManager | None = None # Determine the final stage for E2E stats (highest stage_id with # final_output=True; fallback to last stage) diff --git a/vllm_omni/entrypoints/omni.py b/vllm_omni/entrypoints/omni.py index 8d8809e3140..132729e8aa3 100644 --- a/vllm_omni/entrypoints/omni.py +++ b/vllm_omni/entrypoints/omni.py @@ -572,6 +572,7 @@ def _run_generation( # Orchestrator keeps stage objects for input derivation num_stages = len(self.stage_list) + stat_logger_manager: OmniStatLoggerManager | None = None # Generate globally unique request IDs and map them to original prompts request_ids: list[str] = [f"{i}_{uuid.uuid4()}" for i in range(len(request_prompts))] diff --git a/vllm_omni/metrics/stats.py b/vllm_omni/metrics/stats.py index c0e7709858f..d0ee9b81c20 100644 --- a/vllm_omni/metrics/stats.py +++ b/vllm_omni/metrics/stats.py @@ -8,6 +8,7 @@ from vllm.logger import init_logger logger = init_logger(__name__) +MIN_TIME_MS = 1e-6 def log_transfer_tx( @@ -27,7 +28,7 @@ def log_transfer_tx( "request_id": request_id, "size_bytes": int(size_bytes), "tx_time_ms": float(tx_time_ms), - "tx_mbps": (float(size_bytes) * 8.0) / (max(tx_time_ms, 1e-6) * 1000.0), + "tx_mbps": (float(size_bytes) * 8.0) / (max(tx_time_ms, MIN_TIME_MS) * 1000.0), "used_shm": bool(used_shm), }, sort_dicts=False, @@ -54,7 +55,9 @@ def log_transfer_rx( "rx_decode_time_ms": float(rx_decode_time_ms), "in_flight_time_ms": float(in_flight_time_ms), "rx_time_per_kb_ms": ( - (float(rx_decode_time_ms) / max(float(rx_bytes) / 1024.0, 1e-6)) if rx_bytes > 0 else 0.0 + (float(rx_decode_time_ms) / max(float(rx_bytes) / 1024.0, MIN_TIME_MS)) + if rx_bytes > 0 + else 0.0 ), }, sort_dicts=False, @@ -85,7 +88,7 @@ def log_transfer_total( "rx_decode_time_ms": float(rx_decode_time_ms), "total_time_ms": float(total_time_ms), "total_time_per_kb_ms": ( - float(total_time_ms) / max(float(size_bytes) / 1024.0, 1e-6) if size_bytes > 0 else 0.0 + float(total_time_ms) / max(float(size_bytes) / 1024.0, MIN_TIME_MS) if size_bytes > 0 else 0.0 ), }, sort_dicts=False, @@ -126,7 +129,7 @@ def log_stage_request_stats( def compute_and_log_stage_request_stats(stats: "StageRequestStats") -> None: tokens_per_s = (stats.num_tokens_out * 1000.0 / stats.stage_gen_time_ms) if stats.stage_gen_time_ms > 0 else 0.0 rx_mbps = ( - (float(stats.rx_transfer_bytes) * 8.0) / (max(float(stats.rx_decode_time_ms), 1e-6) * 1000.0) + (float(stats.rx_transfer_bytes) * 8.0) / (max(float(stats.rx_decode_time_ms), MIN_TIME_MS) * 1000.0) if stats.rx_transfer_bytes > 0 else 0.0 ) @@ -171,7 +174,7 @@ def tokens_per_s(self) -> float: def rx_mbps(self) -> float: if self.rx_transfer_bytes <= 0: return 0.0 - return (float(self.rx_transfer_bytes) * 8.0) / (max(float(self.rx_decode_time_ms), 1e-6) * 1000.0) + return (float(self.rx_transfer_bytes) * 8.0) / (max(float(self.rx_decode_time_ms), MIN_TIME_MS) * 1000.0) @dataclass @@ -388,14 +391,14 @@ def build_transfer_summary( sum_bytes = float(agg.get("sum_bytes", 0.0)) sum_ms = float(agg.get("sum_ms", 0.0)) samples = int(agg.get("count", 0.0)) - tx_mbps = (sum_bytes * 8.0) / (max(sum_ms, 1e-6) * 1000.0) if sum_bytes > 0 else 0.0 + tx_mbps = (sum_bytes * 8.0) / (max(sum_ms, MIN_TIME_MS) * 1000.0) if sum_bytes > 0 else 0.0 sum_rx_bytes = float(agg.get("sum_rx_bytes", 0.0)) sum_rx_ms = float(agg.get("sum_rx_ms", 0.0)) samples_rx = int(agg.get("rx_count", 0.0)) - rx_mbps = (sum_rx_bytes * 8.0) / (max(sum_rx_ms, 1e-6) * 1000.0) if sum_rx_bytes > 0 else 0.0 + rx_mbps = (sum_rx_bytes * 8.0) / (max(sum_rx_ms, MIN_TIME_MS) * 1000.0) if sum_rx_bytes > 0 else 0.0 sum_total_ms = float(agg.get("sum_total_ms", 0.0)) samples_total = int(agg.get("total_count", 0.0)) - total_mbps = (sum_bytes * 8.0) / (max(sum_total_ms, 1e-6) * 1000.0) if sum_bytes > 0 else 0.0 + total_mbps = (sum_bytes * 8.0) / (max(sum_total_ms, MIN_TIME_MS) * 1000.0) if sum_bytes > 0 else 0.0 summary.append( { "from_stage": src, @@ -432,7 +435,7 @@ def __init__( ) -> None: self.num_stages = int(num_stages) self.enable_debug_events = bool(enable_debug_events) - # Backward compatible attribute name + # Backward compatible alias for legacy callers that still check enable_stats. self.enable_stats = self.enable_debug_events self.stage_total_time_ms: list[float] = [0.0 for _ in range(self.num_stages)] self.stage_total_tokens: list[int] = [0 for _ in range(self.num_stages)] From 397c4d8320d341a068c329b193203cdcea753a53 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 12 Jan 2026 11:55:02 +0000 Subject: [PATCH 4/7] fix: use final stage mapping in async metrics Co-authored-by: LJH-LBJ <98734602+LJH-LBJ@users.noreply.github.com> --- vllm_omni/entrypoints/async_omni.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/vllm_omni/entrypoints/async_omni.py b/vllm_omni/entrypoints/async_omni.py index 6fb48c0fb92..86b52a693e7 100644 --- a/vllm_omni/entrypoints/async_omni.py +++ b/vllm_omni/entrypoints/async_omni.py @@ -317,6 +317,7 @@ async def generate(self, *args: Any, **kwargs: dict[str, Any]) -> AsyncGenerator final_stage_id_for_e2e = get_final_stage_id_for_e2e( output_modalities, self.output_modalities, self.stage_list ) + final_stage_id_to_prompt = {str(request_id): final_stage_id_for_e2e} # Metrics/aggregation helper metrics = OrchestratorMetrics( @@ -324,7 +325,7 @@ async def generate(self, *args: Any, **kwargs: dict[str, Any]) -> AsyncGenerator self._enable_stats, _wall_start_ts, ) - metrics.set_final_stage_map({str(request_id): final_stage_id_for_e2e}) + metrics.set_final_stage_map(final_stage_id_to_prompt) stat_logger_manager = OmniStatLoggerManager( aggregator=metrics, loggers=[ @@ -479,7 +480,7 @@ async def generate(self, *args: Any, **kwargs: dict[str, Any]) -> AsyncGenerator try: if stat_logger_manager: stat_logger_manager.force_log() - summary = metrics.build_and_log_summary(final_stage_id_for_e2e) + summary = metrics.build_and_log_summary(final_stage_id_to_prompt) logger.info("[Summary] %s", pformat(summary, sort_dicts=False)) except Exception as e: logger.exception(f"[{self._name}] Failed to build/log summary: {e}") From d14f6b4bd7027f2cbafad2262d58a0f845c88b94 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 12 Jan 2026 11:57:39 +0000 Subject: [PATCH 5/7] chore: tidy metrics typing and imports Co-authored-by: LJH-LBJ <98734602+LJH-LBJ@users.noreply.github.com> --- vllm_omni/metrics/loggers.py | 2 +- vllm_omni/metrics/prometheus.py | 2 +- vllm_omni/metrics/stats.py | 12 ++++++------ 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/vllm_omni/metrics/loggers.py b/vllm_omni/metrics/loggers.py index ba49600e782..e2a023f4312 100644 --- a/vllm_omni/metrics/loggers.py +++ b/vllm_omni/metrics/loggers.py @@ -50,7 +50,7 @@ def __init__( self, aggregator: Any, loggers: list[OmniStatLoggerBase] | None = None, - final_stage_map_provider: Callable[[], dict[str, int] | int | None] | None = None, + final_stage_map_provider: Callable[[], dict[str, int] | None] | None = None, ) -> None: self.aggregator = aggregator self.loggers = loggers or [] diff --git a/vllm_omni/metrics/prometheus.py b/vllm_omni/metrics/prometheus.py index a76c7116a32..b1a3201a51d 100644 --- a/vllm_omni/metrics/prometheus.py +++ b/vllm_omni/metrics/prometheus.py @@ -17,7 +17,7 @@ def __init__(self, interval_s: float = 10.0, enabled: bool = True, registry: Any super().__init__(interval_s=interval_s, enabled=enabled) try: from prometheus_client import CollectorRegistry, Gauge - except Exception: + except ImportError: logger.debug("prometheus_client not available; disabling Prometheus stat logger") self.enabled = False self._registry = None diff --git a/vllm_omni/metrics/stats.py b/vllm_omni/metrics/stats.py index d0ee9b81c20..8bea086d179 100644 --- a/vllm_omni/metrics/stats.py +++ b/vllm_omni/metrics/stats.py @@ -126,7 +126,7 @@ def log_stage_request_stats( ) -def compute_and_log_stage_request_stats(stats: "StageRequestStats") -> None: +def compute_and_log_stage_request_stats(stats: StageRequestStats) -> None: tokens_per_s = (stats.num_tokens_out * 1000.0 / stats.stage_gen_time_ms) if stats.stage_gen_time_ms > 0 else 0.0 rx_mbps = ( (float(stats.rx_transfer_bytes) * 8.0) / (max(float(stats.rx_decode_time_ms), MIN_TIME_MS) * 1000.0) @@ -212,7 +212,7 @@ class RunSummary: wall_time_ms: float stages: list[dict[str, Any]] = field(default_factory=list) transfers: list[dict[str, Any]] = field(default_factory=list) - final_stage_id: dict[str, int] | int | None = None + final_stage_id: dict[str, int] | None = None def to_dict(self) -> dict[str, Any]: return { @@ -453,13 +453,13 @@ def __init__( self.stage_seen_batches: dict[int, set] = {sid: set() for sid in range(self.num_stages)} self.stage_first_ts: list[float | None] = [None for _ in range(self.num_stages)] self.stage_last_ts: list[float | None] = [None for _ in range(self.num_stages)] - self.final_stage_map: dict[str, int] | int | None = None + self.final_stage_map: dict[str, int] | None = None self.stage_events: list[StageRequestStats] = [] self.transfer_events: list[TransferEdgeStats] = [] self.request_events: list[RequestE2EStats] = [] self.logger_manager: Any = None - def set_final_stage_map(self, final_stage_map: dict[str, int] | int | None) -> None: + def set_final_stage_map(self, final_stage_map: dict[str, int] | None) -> None: self.final_stage_map = final_stage_map def set_logger_manager(self, manager: Any) -> None: @@ -675,7 +675,7 @@ def on_finalize_request( ) self._maybe_log_interval() - def build_run_summary(self, final_stage_id_to_prompt: dict[str, int] | int | None = None) -> RunSummary: + def build_run_summary(self, final_stage_id_to_prompt: dict[str, int] | None = None) -> RunSummary: if final_stage_id_to_prompt is not None: self.set_final_stage_map(final_stage_id_to_prompt) stage_summary: list[dict[str, Any]] = [] @@ -716,7 +716,7 @@ def build_run_summary(self, final_stage_id_to_prompt: dict[str, int] | int | Non transfers=transfer_summary, ) - def build_and_log_summary(self, final_stage_id_to_prompt: dict[str, int] | int | None = None) -> dict[str, Any]: + def build_and_log_summary(self, final_stage_id_to_prompt: dict[str, int] | None = None) -> dict[str, Any]: summary = self.build_run_summary(final_stage_id_to_prompt) return summary.to_dict() From a9d01cc686d2b7d53533f25c420a50e390b8d15a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 12 Jan 2026 11:59:37 +0000 Subject: [PATCH 6/7] chore: add build_summary alias Co-authored-by: LJH-LBJ <98734602+LJH-LBJ@users.noreply.github.com> --- vllm_omni/entrypoints/async_omni.py | 2 +- vllm_omni/entrypoints/omni.py | 2 +- vllm_omni/metrics/stats.py | 4 ++++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/vllm_omni/entrypoints/async_omni.py b/vllm_omni/entrypoints/async_omni.py index 86b52a693e7..832eeb9e59d 100644 --- a/vllm_omni/entrypoints/async_omni.py +++ b/vllm_omni/entrypoints/async_omni.py @@ -480,7 +480,7 @@ async def generate(self, *args: Any, **kwargs: dict[str, Any]) -> AsyncGenerator try: if stat_logger_manager: stat_logger_manager.force_log() - summary = metrics.build_and_log_summary(final_stage_id_to_prompt) + summary = metrics.build_summary(final_stage_id_to_prompt) logger.info("[Summary] %s", pformat(summary, sort_dicts=False)) except Exception as e: logger.exception(f"[{self._name}] Failed to build/log summary: {e}") diff --git a/vllm_omni/entrypoints/omni.py b/vllm_omni/entrypoints/omni.py index 132729e8aa3..c311e16984d 100644 --- a/vllm_omni/entrypoints/omni.py +++ b/vllm_omni/entrypoints/omni.py @@ -792,7 +792,7 @@ def _run_generation( try: if stat_logger_manager: stat_logger_manager.force_log() - summary = metrics.build_and_log_summary(final_stage_id_to_prompt) + summary = metrics.build_summary(final_stage_id_to_prompt) logger.info("[Summary] %s", pformat(summary, sort_dicts=False)) except Exception as e: logger.exception(f"[{self._name}] Failed to build/log summary: {e}") diff --git a/vllm_omni/metrics/stats.py b/vllm_omni/metrics/stats.py index 8bea086d179..c4fde13e65f 100644 --- a/vllm_omni/metrics/stats.py +++ b/vllm_omni/metrics/stats.py @@ -720,6 +720,10 @@ def build_and_log_summary(self, final_stage_id_to_prompt: dict[str, int] | None summary = self.build_run_summary(final_stage_id_to_prompt) return summary.to_dict() + def build_summary(self, final_stage_id_to_prompt: dict[str, int] | None = None) -> dict[str, Any]: + """Alias for clarity; retained build_and_log_summary for backward compatibility.""" + return self.build_and_log_summary(final_stage_id_to_prompt) + # Backward compatible alias OrchestratorMetrics = OrchestratorAggregator From 2576d516afad3f072a75b504360910ec0c0a9b32 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 12 Jan 2026 12:02:02 +0000 Subject: [PATCH 7/7] chore: streamline summary logging Co-authored-by: LJH-LBJ <98734602+LJH-LBJ@users.noreply.github.com> --- vllm_omni/entrypoints/async_omni.py | 19 +++++++++++-------- vllm_omni/entrypoints/omni.py | 9 ++++++--- vllm_omni/metrics/loggers.py | 3 ++- vllm_omni/metrics/stats.py | 2 +- 4 files changed, 20 insertions(+), 13 deletions(-) diff --git a/vllm_omni/entrypoints/async_omni.py b/vllm_omni/entrypoints/async_omni.py index 832eeb9e59d..324a3252fb3 100644 --- a/vllm_omni/entrypoints/async_omni.py +++ b/vllm_omni/entrypoints/async_omni.py @@ -314,10 +314,10 @@ async def generate(self, *args: Any, **kwargs: dict[str, Any]) -> AsyncGenerator # Determine the final stage for E2E stats (highest stage_id with # final_output=True; fallback to last stage) - final_stage_id_for_e2e = get_final_stage_id_for_e2e( + final_stage_id = get_final_stage_id_for_e2e( output_modalities, self.output_modalities, self.stage_list ) - final_stage_id_to_prompt = {str(request_id): final_stage_id_for_e2e} + final_stage_id_to_prompt = {str(request_id): final_stage_id} # Metrics/aggregation helper metrics = OrchestratorMetrics( @@ -353,7 +353,7 @@ async def generate(self, *args: Any, **kwargs: dict[str, Any]) -> AsyncGenerator logger.debug(f"[{self._name}] Enqueued request {request_id} to stage-0") logger.debug(f"[{self._name}] Entering scheduling loop: stages={num_stages}") - for stage_id, stage in enumerate(self.stage_list[: final_stage_id_for_e2e + 1]): + for stage_id, stage in enumerate(self.stage_list[: final_stage_id + 1]): finished = False while not finished: result = await req_state.queue.get() @@ -400,7 +400,7 @@ async def generate(self, *args: Any, **kwargs: dict[str, Any]) -> AsyncGenerator # (only once per request at the designated final stage) try: rid_key = str(req_id) - if stage_id == final_stage_id_for_e2e and rid_key not in metrics.e2e_done and finished: + if stage_id == final_stage_id and rid_key not in metrics.e2e_done and finished: metrics.on_finalize_request( stage_id, req_id, @@ -436,7 +436,7 @@ async def generate(self, *args: Any, **kwargs: dict[str, Any]) -> AsyncGenerator stage.set_engine_outputs(engine_outputs) # Forward to next stage if there is one next_stage_id = stage_id + 1 - if next_stage_id <= final_stage_id_for_e2e and finished: + if next_stage_id <= final_stage_id and finished: next_stage: OmniStage = self.stage_list[next_stage_id] next_inputs = next_stage.process_engine_inputs(self.stage_list, prompt) sp_next: SamplingParams = sampling_params_list[next_stage_id] @@ -478,10 +478,13 @@ async def generate(self, *args: Any, **kwargs: dict[str, Any]) -> AsyncGenerator # Summarize and print stats try: + summary_dict = None if stat_logger_manager: - stat_logger_manager.force_log() - summary = metrics.build_summary(final_stage_id_to_prompt) - logger.info("[Summary] %s", pformat(summary, sort_dicts=False)) + summary_obj = stat_logger_manager.force_log() + summary_dict = summary_obj.to_dict() + if summary_dict is None: + summary_dict = metrics.build_summary(final_stage_id_to_prompt) + logger.info("[Summary] %s", pformat(summary_dict, sort_dicts=False)) except Exception as e: logger.exception(f"[{self._name}] Failed to build/log summary: {e}") finally: diff --git a/vllm_omni/entrypoints/omni.py b/vllm_omni/entrypoints/omni.py index c311e16984d..bf0888087d4 100644 --- a/vllm_omni/entrypoints/omni.py +++ b/vllm_omni/entrypoints/omni.py @@ -790,10 +790,13 @@ def _run_generation( # Summarize and print stats try: + summary_dict = None if stat_logger_manager: - stat_logger_manager.force_log() - summary = metrics.build_summary(final_stage_id_to_prompt) - logger.info("[Summary] %s", pformat(summary, sort_dicts=False)) + summary_obj = stat_logger_manager.force_log() + summary_dict = summary_obj.to_dict() + if summary_dict is None: + summary_dict = metrics.build_summary(final_stage_id_to_prompt) + logger.info("[Summary] %s", pformat(summary_dict, sort_dicts=False)) except Exception as e: logger.exception(f"[{self._name}] Failed to build/log summary: {e}") diff --git a/vllm_omni/metrics/loggers.py b/vllm_omni/metrics/loggers.py index e2a023f4312..cc4c102beff 100644 --- a/vllm_omni/metrics/loggers.py +++ b/vllm_omni/metrics/loggers.py @@ -69,7 +69,7 @@ def maybe_log(self) -> None: lg.log(summary) lg.update_ts(now) - def force_log(self) -> None: + def force_log(self) -> RunSummary: final_stage = self.final_stage_map_provider() if self.final_stage_map_provider else None summary = self.aggregator.build_run_summary(final_stage) for lg in self.loggers: @@ -77,3 +77,4 @@ def force_log(self) -> None: continue lg.log(summary) lg.update_ts(time.time()) + return summary diff --git a/vllm_omni/metrics/stats.py b/vllm_omni/metrics/stats.py index c4fde13e65f..e7c605017ae 100644 --- a/vllm_omni/metrics/stats.py +++ b/vllm_omni/metrics/stats.py @@ -435,7 +435,7 @@ def __init__( ) -> None: self.num_stages = int(num_stages) self.enable_debug_events = bool(enable_debug_events) - # Backward compatible alias for legacy callers that still check enable_stats. + # Backward compatible alias for legacy callers that still check enable_stats; prefer enable_debug_events. self.enable_stats = self.enable_debug_events self.stage_total_time_ms: list[float] = [0.0 for _ in range(self.num_stages)] self.stage_total_tokens: list[int] = [0 for _ in range(self.num_stages)]