From d5a92861053b7eceb868731be385833670b96f4b Mon Sep 17 00:00:00 2001 From: vraiti Date: Mon, 18 May 2026 10:23:26 -0400 Subject: [PATCH 1/9] Add Prometheus metrics for multi-stage pipelines Signed-off-by: vraiti --- docs/design/index.md | 4 + docs/design/metrics.md | 198 +++++++++++++++++++ docs/usage/metrics.md | 79 ++++++++ tests/metrics/test_prometheus.py | 152 ++++++++++++++ vllm_omni/core/sched/omni_scheduler_mixin.py | 15 ++ vllm_omni/engine/async_omni_engine.py | 7 +- vllm_omni/engine/messages.py | 4 + vllm_omni/engine/orchestrator.py | 60 +++++- vllm_omni/engine/stage_init_utils.py | 2 +- vllm_omni/engine/stage_pool.py | 4 +- vllm_omni/entrypoints/omni_base.py | 28 ++- vllm_omni/metrics/__init__.py | 3 + vllm_omni/metrics/prometheus.py | 128 ++++++++++++ vllm_omni/patch.py | 25 +++ 14 files changed, 700 insertions(+), 9 deletions(-) create mode 100644 docs/design/metrics.md create mode 100644 docs/usage/metrics.md create mode 100644 tests/metrics/test_prometheus.py create mode 100644 vllm_omni/metrics/prometheus.py diff --git a/docs/design/index.md b/docs/design/index.md index 61aa5048368..8789f2e1b22 100644 --- a/docs/design/index.md +++ b/docs/design/index.md @@ -13,6 +13,10 @@ This section contains design documents and architecture specifications for vLLM- - [Adding Step Execution Support for Diffusion Pipelines](feature/diffusion_step_execution.md) - [Continuous Batching for Step-Wise Diffusion](feature/diffusion_continuous_batching.md) +## Infrastructure Design Documents + +- [Prometheus Metrics](metrics.md) + ## Module Design Documents - [AR Module](module/ar_module.md) diff --git a/docs/design/metrics.md b/docs/design/metrics.md new file mode 100644 index 00000000000..71ceb5c38ae --- /dev/null +++ b/docs/design/metrics.md @@ -0,0 +1,198 @@ +# Prometheus Metrics Design + +This document describes how vLLM-Omni exposes Prometheus metrics for +multi-stage pipelines, the constraints that shaped the design, and how +the pipeline-level metrics coexist 
with upstream vLLM per-engine +metrics. + +## Objectives + +- Expose pipeline-level request and latency metrics that span the full + multi-stage execution (orchestrator scope). +- Preserve all upstream vLLM per-engine metrics (`vllm:*`) for stages + backed by an AR LLM engine. +- Expose per-stage diffusion timing breakdowns for pipelines that + include a diffusion engine. +- Keep the metrics collection overhead low enough that it does not + regress TTFA or throughput. + +## Background + +### Upstream vLLM Metrics + +Upstream vLLM defines 44 Prometheus metrics under the `vllm:` prefix. +These are registered by `PrometheusStatLogger` and cover engine-level +state: KV cache usage, running/waiting request counts, token +throughput, TTFT, inter-token latency, e2e latency, and so on. They +are served via the `/metrics` HTTP endpoint provided by +`prometheus_fastapi_instrumentator` and the default +`prometheus_client` WSGI handler. + +vLLM's `unregister_vllm_metrics()` function strips every +`prometheus_client` collector whose `_name` attribute contains the +substring `"vllm"`. This runs during engine initialization to clean up +stale collectors from prior instantiations within the same process. + +### The Problem + +vLLM-Omni runs multiple engine instances (stages) within a single +process, coordinated by an Orchestrator. The pipeline needs its own +metrics — aggregate request counts, end-to-end latency across all +stages, and diffusion timing breakdowns — that do not exist in upstream +vLLM. All pipeline-level metrics use the `vllm_omni:` prefix to +distinguish them from upstream per-engine metrics. The +`unregister_vllm_metrics()` function is monkey-patched to a no-op at +import time (see `vllm_omni/patch.py`) so that these metrics are not +destroyed during engine initialization (this is a temporary fix until +vLLM patches this behavior). 
+ +Upstream per-engine metrics retain the `vllm:` prefix and are +registered by a `PrometheusStatLogger` instance that the Orchestrator +creates and feeds directly. + +## Architecture + +### Component Overview + +``` + +-----------------------+ + | API Server (FastAPI)| + | GET /metrics | + +----------+------------+ + | + prometheus_client default handler + | + +-------------+-------------+ + | | + vllm_omni:* collectors vllm:* collectors + | | + +-----------+-----------+ +--------+---------+ + | OmniPrometheusMetrics | | PrometheusStatLogger | + +-----------+-----------+ +--------+---------+ + | | + OmniBase Orchestrator + (request lifecycle, (feeds SchedulerStats + diffusion timing) + IterationStats + per engine step) +``` + +### Data Flow + +There are two independent paths for metric collection. + +**Path 1: Pipeline-level metrics (`vllm_omni:*`)** + +`OmniPrometheusMetrics` registers Gauge, Counter, and Histogram +collectors at init time. It is instantiated once per entrypoint, +labeled with the model name. The entrypoint calls its methods as +requests progress: + +- `set_running(n)` / `set_waiting(n)` — updated after each request + completes. The running count comes from `OmniRequestCounter`, a + simple counter incremented/decremented by the Orchestrator as it + tracks requests. Waiting is derived as `total - running`. + +- `request_succeeded(e2e_seconds, queue_seconds)` — recorded when a + request finishes at the final stage. + +- `request_failed()` — recorded when a request errors. + +- `observe_diffusion_metrics(stage_id, metrics)` — recorded when a + diffusion stage finishes. The metrics dict contains timing + breakdowns (preprocess, exec, postprocess, total step time) + accumulated from engine output. + +**Path 2: Per-engine metrics (`vllm:*`)** + +The Orchestrator instantiates upstream vLLM's `PrometheusStatLogger` +and feeds it scheduler stats and iteration stats after processing +each batch of engine outputs. 
This populates the standard vLLM +metrics (TTFT, token throughput, cache usage, etc.) using the same +code path as standalone vLLM. For diffusion-only pipelines that have +no AR engine, `SchedulerStats` is never produced and `vllm:*` metrics +are absent. + +### Shared State Between Threads + +The Orchestrator runs in a background thread. The API server +(OmniBase) runs in the asyncio event loop thread. +`OmniRequestCounter` bridges them — a plain Python object with an +`int` field. The Orchestrator increments/decrements it; the +entrypoint reads it for gauge updates. No lock is needed because the +counter is advisory (a stale read by one Prometheus scrape interval +is acceptable). It is created by `AsyncOmniEngine.__init__()` and +passed to the Orchestrator at construction time. + +### Metric Registration and Lifecycle + +All `vllm_omni:*` collectors are registered once when +`OmniPrometheusMetrics.__init__()` runs. Per-stage labels +(`model_name`, `engine`) are bound lazily on first observation to +avoid registering labels for stages that never produce data (e.g., a +diffusion pipeline has no AR stage stats). + +The `prometheus_client` default registry holds all collectors. +FastAPI's `/metrics` endpoint serves the default registry, so both +`vllm_omni:*` and `vllm:*` metrics appear in the same scrape +response alongside `http_*` and `process_*` metrics from the +instrumentator and the Python client runtime. + +## Throttling: `make_stats()` Override + +Upstream vLLM's `Scheduler.make_stats()` runs on every AR generation step, +returning a SchedulerStats object for the orchestrator. +Under vLLM's architecture, this is fine. But since vLLM-Omni requires that the +object be serialized and transferred over ZMQ, receiving a SchedulerStats object on +every step can introduce unacceptable overhead to the system. + +`OmniSchedulerMixin.make_stats()` (in +`vllm_omni/core/sched/omni_scheduler_mixin.py`) throttles stats +emission to at most once per second. 
Between intervals it returns +`None`, which the engine core skips serializing. This keeps gauges +fresh enough for Prometheus scrapes (typically 15-30s intervals) while +eliminating the per-step overhead. + +## Metric Definitions + +### Pipeline-Level + +| Metric | Type | Labels | Description | +|--------|------|--------|-------------| +| `vllm_omni:num_requests_running` | Gauge | `model_name` | Requests currently executing across all stages | +| `vllm_omni:num_requests_waiting` | Gauge | `model_name` | Requests queued but not yet scheduled | +| `vllm_omni:num_requests_success` | Counter | `model_name` | Requests completed without error | +| `vllm_omni:num_requests_fail` | Counter | `model_name` | Requests that returned an error | +| `vllm_omni:e2e_request_latency_seconds` | Histogram | `model_name` | End-to-end request latency across all stages | +| `vllm_omni:request_queue_time_seconds` | Histogram | `model_name` | Time spent waiting in the request queue | + +### Diffusion Stage-Level + +| Metric | Type | Labels | Description | +|--------|------|--------|-------------| +| `vllm_omni:diffusion_preprocess_time_ms` | Histogram | `model_name`, `engine` | Diffusion input preprocessing time | +| `vllm_omni:diffusion_exec_time_ms` | Histogram | `model_name`, `engine` | Diffusion model forward pass time | +| `vllm_omni:diffusion_postprocess_time_ms` | Histogram | `model_name`, `engine` | Diffusion output postprocessing time | +| `vllm_omni:diffusion_step_time_ms` | Histogram | `model_name`, `engine` | Total diffusion step time | + +### LLM Stage-Level + +See the [vLLM docs](https://github.com/vllm-project/vllm/blob/main/docs/usage/metrics.md) for the full list of upstream `vllm:*` metrics. + +Note that metrics which depend on features not supported in vLLM-Omni (e.g. speculative decoding, LoRA) are not available. + +## Logging vs. Prometheus + +`OrchestratorAggregator` (in `vllm_omni/metrics/stats.py`) is the +logging-oriented metrics path. 
It collects detailed per-request, +per-stage, and per-transfer statistics and prints formatted tables to +the `INFO` log. This is designed for development and debugging — +individual request traces, transfer bandwidth, inter-stage timing. + +`OmniPrometheusMetrics` is the Prometheus-oriented path. It records +aggregate counters, gauges, and histograms suitable for time-series +monitoring and alerting. The two paths are independent; both can run +simultaneously. + +The separation follows upstream vLLM's pattern of `LoggingStatLogger` +vs. `PrometheusStatLogger` — same underlying data, different +consumption models. diff --git a/docs/usage/metrics.md b/docs/usage/metrics.md new file mode 100644 index 00000000000..0d8e023a6af --- /dev/null +++ b/docs/usage/metrics.md @@ -0,0 +1,79 @@ +# Production Metrics + +vLLM-Omni exposes Prometheus metrics via the `/metrics` endpoint on the +OpenAI-compatible API server. The metrics fall into three categories depending +on the pipeline type. + +```bash +vllm-omni serve Qwen/Qwen3-Omni-30B-A3B-Instruct --port 8000 +curl http://localhost:8000/metrics +``` + +## Metric Namespaces + +| Prefix | Source | Present when | +|--------|--------|--------------| +| `vllm_omni:` | vLLM-Omni orchestrator / diffusion stages | Always / Pipeline includes a diffusion stage | +| `vllm:` | Upstream vLLM engine | Pipeline includes an LLM (AR) stage | +| `http_` / `process_` | Uvicorn / Python runtime | Always | + +## Pipeline-Level Metrics (`vllm_omni:`) + +These metrics are defined in `vllm_omni/metrics/prometheus.py` and track +request lifecycle across the full multi-stage pipeline. 
+ +### Request Tracking + +| Metric | Type | Labels | Description | +|--------|------|--------|-------------| +| `vllm_omni:num_requests_running` | Gauge | `model_name` | Requests currently running across all pipeline stages | +| `vllm_omni:num_requests_waiting` | Gauge | `model_name` | Requests waiting to be scheduled | +| `vllm_omni:num_requests_success` | Counter | `model_name` | Requests that completed without error | +| `vllm_omni:num_requests_fail` | Counter | `model_name` | Requests that returned an error | + +### Latency + +| Metric | Type | Labels | Description | +|--------|------|--------|-------------| +| `vllm_omni:e2e_request_latency_seconds` | Histogram | `model_name` | End-to-end request latency in seconds | +| `vllm_omni:request_queue_time_seconds` | Histogram | `model_name` | Time spent waiting in the request queue | + +## Diffusion Engine Metrics (`vllm_omni:`) + +These histograms are populated only when the pipeline includes a diffusion +stage (e.g. image or video generation models). + +| Metric | Type | Labels | Description | +|--------|------|--------|-------------| +| `vllm_omni:diffusion_preprocess_time_ms` | Histogram | `model_name`, `engine` | Input preprocessing time per request | +| `vllm_omni:diffusion_exec_time_ms` | Histogram | `model_name`, `engine` | DiT forward pass execution time per request | +| `vllm_omni:diffusion_postprocess_time_ms` | Histogram | `model_name`, `engine` | Output postprocessing time (VAE decode) per request | +| `vllm_omni:diffusion_step_time_ms` | Histogram | `model_name`, `engine` | Total diffusion step time per request | + +## vLLM Engine Metrics (`vllm:`) + +When the pipeline includes an LLM stage, the upstream vLLM engine exposes its +full set of metrics under the `vllm:` prefix. These are registered by +`vllm.v1.metrics.loggers.PrometheusStatLogger` and cover scheduler state, +token throughput, cache utilization, and request latencies. 
+ +For a full overview of vLLM metrics, consult [the vLLM docs](https://github.com/vllm-project/vllm/blob/main/docs/usage/metrics.md) + +## Metric Availability by Pipeline Type + +| Metric group | Multi-stage LLM (Qwen3-Omni) | Diffusion-only (Z-Image-Turbo) | +|---|---|---| +| `vllm_omni:` request tracking | Yes | Yes | +| `vllm_omni:` latency | Yes | Yes | +| `vllm_omni:` KV cache | Yes | No | +| `vllm_omni:` diffusion timing | Only if pipeline has a diffusion stage | Yes | +| `vllm:` engine metrics | Yes | No | +| `vllm:` MFU metrics | With `--enable-mfu-metrics` | No | + +## Naming Convention + +vLLM-Omni pipeline metrics use the `vllm_omni:` prefix to distinguish +them from upstream per-engine `vllm:` metrics. The upstream +`unregister_vllm_metrics()` function is monkey-patched to a no-op (see +`vllm_omni/patch.py`) so that these metrics are not destroyed during +engine initialization. diff --git a/tests/metrics/test_prometheus.py b/tests/metrics/test_prometheus.py new file mode 100644 index 00000000000..b5302b80715 --- /dev/null +++ b/tests/metrics/test_prometheus.py @@ -0,0 +1,152 @@ +from __future__ import annotations + +import re + +import pytest +from prometheus_client import REGISTRY, CollectorRegistry, generate_latest + +from vllm_omni.metrics import OmniPrometheusMetrics + +pytestmark = [pytest.mark.core_model, pytest.mark.cpu] + +_MODEL = "test-model" + +_PIPELINE_METRICS = [ + "vllm_omni:num_requests_running", + "vllm_omni:num_requests_waiting", + "vllm_omni:num_requests_success", + "vllm_omni:num_requests_fail", + "vllm_omni:e2e_request_latency_seconds", + "vllm_omni:request_queue_time_seconds", +] + +_DIFFUSION_METRICS = [ + "vllm_omni:diffusion_preprocess_time_ms", + "vllm_omni:diffusion_exec_time_ms", + "vllm_omni:diffusion_postprocess_time_ms", + "vllm_omni:diffusion_step_time_ms", +] + + +@pytest.fixture(scope="module") +def registry() -> CollectorRegistry: + return REGISTRY + + +@pytest.fixture(scope="module") +def prom() -> 
OmniPrometheusMetrics: + return OmniPrometheusMetrics(model_name=_MODEL) + + +@pytest.fixture(scope="module") +def scrape_output(prom: OmniPrometheusMetrics, registry: CollectorRegistry) -> str: + prom.request_succeeded(e2e_seconds=1.5, queue_seconds=0.3) + prom.request_succeeded(e2e_seconds=2.0, queue_seconds=0.5) + prom.request_failed() + prom.set_running(5) + prom.set_waiting(2) + prom.observe_diffusion_metrics( + engine_idx=2, + stage_id=1, + replica_id=0, + metrics={ + "preprocess_time_ms": 10.0, + "diffusion_engine_exec_time_ms": 200.0, + "postprocess_time_ms": 15.0, + "diffusion_engine_total_time_ms": 225.0, + }, + ) + return generate_latest(registry).decode() + + +def _sample_value(output: str, metric_line: str) -> float | None: + for line in output.splitlines(): + if line.startswith(metric_line): + return float(line.split()[-1]) + return None + + +class TestMetricObservation: + def test_all_metric_families_present(self, scrape_output: str) -> None: + for name in _PIPELINE_METRICS + _DIFFUSION_METRICS: + assert f"# HELP {name}" in scrape_output, f"missing metric family: {name}" + + def test_counter_values(self, scrape_output: str) -> None: + success = _sample_value( + scrape_output, + f'vllm_omni:num_requests_success_total{{model_name="{_MODEL}"}}', + ) + assert success == 2.0 + + fail = _sample_value( + scrape_output, + f'vllm_omni:num_requests_fail_total{{model_name="{_MODEL}"}}', + ) + assert fail == 1.0 + + def test_gauge_values(self, scrape_output: str) -> None: + running = _sample_value( + scrape_output, + f'vllm_omni:num_requests_running{{model_name="{_MODEL}"}}', + ) + assert running == 5.0 + + waiting = _sample_value( + scrape_output, + f'vllm_omni:num_requests_waiting{{model_name="{_MODEL}"}}', + ) + assert waiting == 2.0 + + def test_histogram_counts(self, scrape_output: str) -> None: + e2e_count = _sample_value( + scrape_output, + f'vllm_omni:e2e_request_latency_seconds_count{{model_name="{_MODEL}"}}', + ) + assert e2e_count == 2.0 + + 
queue_count = _sample_value( + scrape_output, + f'vllm_omni:request_queue_time_seconds_count{{model_name="{_MODEL}"}}', + ) + assert queue_count == 2.0 + + def test_diffusion_histogram_counts(self, scrape_output: str) -> None: + for name in _DIFFUSION_METRICS: + count = _sample_value( + scrape_output, + f'{name}_count{{engine="2",model_name="{_MODEL}",replica_id="0",stage_id="1"}}', + ) + assert count == 1.0, f"{name}_count expected 1.0, got {count}" + + +class TestLabelCorrectness: + def test_pipeline_metrics_carry_model_name(self, scrape_output: str) -> None: + for name in _PIPELINE_METRICS: + pattern = rf'^{re.escape(name)}.*model_name="{re.escape(_MODEL)}"' + assert re.search(pattern, scrape_output, re.MULTILINE), f"{name} missing model_name label" + + def test_diffusion_metrics_carry_engine_label(self, scrape_output: str) -> None: + for name in _DIFFUSION_METRICS: + pattern = rf'^{re.escape(name)}.*engine="2".*model_name="{re.escape(_MODEL)}"' + assert re.search(pattern, scrape_output, re.MULTILINE), f"{name} missing engine label" + + def test_diffusion_metrics_carry_stage_and_replica_labels(self, scrape_output: str) -> None: + for name in _DIFFUSION_METRICS: + pattern = rf'^{re.escape(name)}.*replica_id="0".*stage_id="1"' + assert re.search(pattern, scrape_output, re.MULTILINE), ( + f"{name} missing stage_id/replica_id labels" + ) + + +class TestScrapeOutput: + def test_omni_metrics_in_default_registry(self, scrape_output: str) -> None: + for name in _PIPELINE_METRICS + _DIFFUSION_METRICS: + assert name in scrape_output + + def test_process_metrics_in_default_registry(self, scrape_output: str) -> None: + # vllm:* metrics require a full PrometheusStatLogger with VllmConfig + # and are registered by the Orchestrator at server startup. Verifying + # their presence is covered by integration tests. Here we confirm the + # default registry is being scraped by checking for process_* metrics + # from the Python prometheus_client runtime. 
+ assert "process_" in scrape_output diff --git a/vllm_omni/core/sched/omni_scheduler_mixin.py b/vllm_omni/core/sched/omni_scheduler_mixin.py index 570fa554545..4ee49961f31 100644 --- a/vllm_omni/core/sched/omni_scheduler_mixin.py +++ b/vllm_omni/core/sched/omni_scheduler_mixin.py @@ -1,11 +1,13 @@ from __future__ import annotations import os +import time from typing import Any from vllm.logger import init_logger from vllm.v1.core.sched.output import SchedulerOutput from vllm.v1.engine import EngineCoreEventType +from vllm.v1.metrics.stats import SchedulerStats from vllm.v1.request import Request, RequestStatus, StreamingUpdate from vllm_omni.core.sched.output import OmniChunkRecvHandle, OmniSchedulerOutput @@ -32,6 +34,8 @@ ) DEFAULT_INPUT_WAIT_TIMEOUT_S = 300.0 +_STATS_INTERVAL_S = 1.0 + class OmniSchedulerMixin: """Shared scheduler helpers for omni-specific request handling.""" @@ -171,3 +175,14 @@ def _replace_session_with_streaming_update( if self.log_stats: session.record_event(EngineCoreEventType.QUEUED) + + def make_stats(self, *args, **kwargs) -> SchedulerStats | None: + now = time.monotonic() + if now - getattr(self, "_last_stats_time", 0.0) < _STATS_INTERVAL_S: + return None + self._last_stats_time = now + return SchedulerStats( + kv_cache_usage=self.kv_cache_manager.usage, + num_running_reqs=len(self.running), + num_waiting_reqs=len(self.waiting), + ) diff --git a/vllm_omni/engine/async_omni_engine.py b/vllm_omni/engine/async_omni_engine.py index 4986eae63c9..ed1ab90532e 100644 --- a/vllm_omni/engine/async_omni_engine.py +++ b/vllm_omni/engine/async_omni_engine.py @@ -110,6 +110,7 @@ load_and_resolve_stage_configs, ) from vllm_omni.inputs.data import OmniSamplingParams +from vllm_omni.metrics.prometheus import OmniRequestCounter from vllm_omni.platforms import current_omni_platform if TYPE_CHECKING: @@ -357,6 +358,7 @@ def __init__( self._stage_init_executor: concurrent.futures.ThreadPoolExecutor | None = None self._weak_finalizer: weakref.finalize | 
None = None self._rpc_lock = threading.Lock() + self._running_counter = OmniRequestCounter() logger.info(f"[AsyncOmniEngine] Launching Orchestrator thread with {self.num_stages} stages") @@ -905,7 +907,7 @@ def _initialize_llm_replica( launch_omni_core_engines( vllm_config=vllm_config, executor_class=executor_class, - log_stats=False, + log_stats=True, omni_master_server=self._omni_master_server, stage_id=plan.metadata.stage_id, stage_config=stage_cfg, @@ -917,7 +919,7 @@ def _initialize_llm_replica( addresses, proc, handshake_address = spawn_stage_core( vllm_config=vllm_config, executor_class=executor_class, - log_stats=False, + log_stats=True, ) logger.info( "[AsyncOmniEngine] Stage %s engine launch started", @@ -1377,6 +1379,7 @@ async def _run_orchestrator() -> None: coordinator_pub_address=coordinator_pub_address, load_balancer_factory=load_balancer_factory, remote_replica_factory=remote_replica_factory, + running_counter=self._running_counter, ) if not startup_future.done(): startup_future.set_result(asyncio.get_running_loop()) diff --git a/vllm_omni/engine/messages.py b/vllm_omni/engine/messages.py index 0e55105f13f..389f7f93a0c 100644 --- a/vllm_omni/engine/messages.py +++ b/vllm_omni/engine/messages.py @@ -80,6 +80,8 @@ class OutputMessage(EngineQueueMessage, kw_only=True): type: Literal["output"] = "output" request_id: str stage_id: int + replica_id: int = 0 + engine_idx: int = 0 engine_outputs: OmniRequestOutput metrics: StageRequestMetrics | None = None finished: bool @@ -90,6 +92,8 @@ class StageMetricsMessage(EngineQueueMessage, kw_only=True): type: Literal["stage_metrics"] = "stage_metrics" request_id: str stage_id: int + replica_id: int = 0 + engine_idx: int = 0 metrics: StageRequestMetrics stage_submit_ts: float | None = None diff --git a/vllm_omni/engine/orchestrator.py b/vllm_omni/engine/orchestrator.py index 48026ababd7..151114a88a8 100644 --- a/vllm_omni/engine/orchestrator.py +++ b/vllm_omni/engine/orchestrator.py @@ -29,6 +29,8 @@ from 
vllm.sampling_params import SamplingParams from vllm.v1.engine import EngineCoreOutputs from vllm.v1.engine.exceptions import EngineDeadError +from vllm.v1.metrics.loggers import PrometheusStatLogger +from vllm.v1.metrics.stats import IterationStats from vllm_omni.distributed.omni_coordinator import ( LoadBalancer, @@ -54,6 +56,7 @@ ) from vllm_omni.engine.serialization import serialize_additional_information from vllm_omni.engine.stage_pool import StagePool +from vllm_omni.metrics.prometheus import OmniRequestCounter from vllm_omni.outputs import OmniRequestOutput # Factory signature for building a head-side stage client for a @@ -167,6 +170,7 @@ def __init__( coordinator_pub_address: str | None = None, load_balancer_factory: Callable[[], LoadBalancer] | None = None, remote_replica_factory: RemoteReplicaFactory | None = None, + running_counter: OmniRequestCounter | None = None, ) -> None: self.request_async_queue = request_async_queue self.output_async_queue = output_async_queue @@ -176,6 +180,14 @@ def __init__( self.num_stages = len(stage_pools) self.stage_pools: list[StagePool] = stage_pools + # Build a flat engine index for each (stage_id, replica_id) pair. 
+ self._engine_idx: dict[tuple[int, int], int] = {} + flat = 0 + for sid, pool in enumerate(stage_pools): + for rid in range(pool.num_replicas): + self._engine_idx[(sid, rid)] = flat + flat += 1 + # PD disaggregation state self._pd_pair: tuple[int, int] | None = None self._pd_bootstrap_addr: str | None = None @@ -186,6 +198,23 @@ def __init__( self._pd_bootstrap_addr = pd_config.get("bootstrap_addr") self._pd_prefill_engine_id = pd_config.get("prefill_engine_id") self.request_states: dict[str, OrchestratorRequestState] = {} + self._running_counter = running_counter + + engine_indexes = list(range(flat)) + vllm_config_for_stats = next( + (p.stage_vllm_config for p in stage_pools if p.stage_vllm_config is not None), + None, + ) + if vllm_config_for_stats is not None: + self._stat_logger: PrometheusStatLogger | None = PrometheusStatLogger( + vllm_config=vllm_config_for_stats, + engine_indexes=engine_indexes, + ) + else: + self._stat_logger = None + self._last_stats_ts: float = 0.0 + self._stats_interval_s: float = 1.0 + self._cfg_tracker = CfgCompanionTracker() self._shutdown_event = asyncio.Event() @@ -393,6 +422,8 @@ async def _handle_add_request(self, msg: StageSubmissionMessage) -> None: mm_features=getattr(prompt, "mm_features", None), ) self.request_states[request_id] = req_state + if self._running_counter is not None: + self._running_counter.increment() req_state.streaming.enabled = bool(getattr(prompt, "resumable", False)) req_state.stage_submit_ts[stage_id] = _time.time() enqueue_ts = msg.enqueue_ts @@ -600,7 +631,23 @@ async def _orchestration_loop(self) -> None: "new_prompt_len_snapshot", None, ) - raw_output = await pool.process_llm_raw_outputs(replica_id, raw_outputs) + now = _time.monotonic() + record_stats = ( + self._stat_logger is not None and now - self._last_stats_ts >= self._stats_interval_s + ) + iteration_stats = IterationStats() if record_stats else None + raw_output = await pool.process_llm_raw_outputs( + replica_id, + raw_outputs, + 
iteration_stats=iteration_stats, + ) + if record_stats: + self._last_stats_ts = now + self._stat_logger.record( + raw_outputs.scheduler_stats, + iteration_stats, + engine_idx=self._engine_idx[(stage_id, replica_id)], + ) except asyncio.CancelledError: raise except EngineDeadError as e: @@ -678,7 +725,7 @@ async def _handle_processed_outputs(self, stage_id: int, replica_id: int, output ) stage_metrics.pipeline_timings = dict(req_state.pipeline_timings) - await self._route_output(stage_id, output, req_state, stage_metrics) + await self._route_output(stage_id, replica_id, output, req_state, stage_metrics) async def _handle_stage_error(self, stage_id: int, output: Any) -> None: """Emit a frontend-visible error and clean up request state.""" @@ -710,7 +757,8 @@ async def _cleanup_request_ids(self, request_ids: list[str], *, abort: bool = Fa self._release_request_bindings(request_ids) for request_id in request_ids: self._pd_kv_params.pop(request_id, None) - self.request_states.pop(request_id, None) + if self.request_states.pop(request_id, None) is not None and self._running_counter is not None: + self._running_counter.decrement() def _maybe_clone_diffusion_params_for_cfg(self, request_id: str, params: Any) -> Any: """Attach CFG companion ids to diffusion sampling params when needed.""" @@ -732,6 +780,7 @@ def _maybe_clone_diffusion_params_for_cfg(self, request_id: str, params: Any) -> async def _route_output( self, stage_id: int, + replica_id: int, output: Any, req_state: OrchestratorRequestState, stage_metrics: Any, @@ -749,11 +798,14 @@ async def _route_output( await self._cleanup_request_ids([req_id]) return + engine_idx = self._engine_idx[(stage_id, replica_id)] if self.stage_pools[stage_id].final_output: await self.output_async_queue.put( OutputMessage( request_id=req_id, stage_id=stage_id, + replica_id=replica_id, + engine_idx=engine_idx, engine_outputs=output, metrics=stage_metrics, finished=finished and stage_id == req_state.final_stage_id, @@ -765,6 +817,8 @@ 
async def _route_output( StageMetricsMessage( request_id=req_id, stage_id=stage_id, + replica_id=replica_id, + engine_idx=engine_idx, metrics=stage_metrics, stage_submit_ts=submit_ts, ) diff --git a/vllm_omni/engine/stage_init_utils.py b/vllm_omni/engine/stage_init_utils.py index e650373a942..a227c5a5955 100644 --- a/vllm_omni/engine/stage_init_utils.py +++ b/vllm_omni/engine/stage_init_utils.py @@ -839,7 +839,7 @@ def build_llm_stage_output_processor(plan: LogicalStageInitPlan, stage_vllm_conf ) return MultimodalOutputProcessor( tokenizer=tokenizer, - log_stats=False, + log_stats=True, engine_core_output_type=metadata.engine_output_type, ) diff --git a/vllm_omni/engine/stage_pool.py b/vllm_omni/engine/stage_pool.py index effe6470536..a2155677ffb 100644 --- a/vllm_omni/engine/stage_pool.py +++ b/vllm_omni/engine/stage_pool.py @@ -9,6 +9,7 @@ from vllm.logger import init_logger from vllm.v1.engine import EngineCoreOutputs +from vllm.v1.metrics.stats import IterationStats from vllm_omni.distributed.omni_coordinator import ( LoadBalancer, @@ -641,6 +642,7 @@ async def process_llm_raw_outputs( self, replica_id: int, raw_outputs: EngineCoreOutputs, + iteration_stats: IterationStats | None = None, ) -> list[Any]: """Run the shared LLM output processor on one raw poll result.""" raw_client = self.clients[replica_id] @@ -651,7 +653,7 @@ async def process_llm_raw_outputs( processed = processor.process_outputs( raw_outputs.outputs, raw_outputs.timestamp, - None, + iteration_stats, ) if processed.reqs_to_abort: diff --git a/vllm_omni/entrypoints/omni_base.py b/vllm_omni/entrypoints/omni_base.py index de4ef327cbd..bfd4debe640 100644 --- a/vllm_omni/entrypoints/omni_base.py +++ b/vllm_omni/entrypoints/omni_base.py @@ -23,6 +23,7 @@ from vllm_omni.entrypoints.client_request_state import ClientRequestState from vllm_omni.entrypoints.pd_utils import PDDisaggregationMixin from vllm_omni.entrypoints.utils import coerce_param_message_types, get_final_stage_id_for_e2e +from 
vllm_omni.metrics.prometheus import OmniPrometheusMetrics from vllm_omni.metrics.stats import OrchestratorAggregator as OrchestratorMetrics from vllm_omni.model_executor.model_loader.weight_utils import download_weights_from_hf_specific from vllm_omni.outputs import OmniRequestOutput @@ -190,6 +191,7 @@ def __init__( self.async_chunk = bool(getattr(self.engine, "async_chunk", False)) self.request_states: dict[str, ClientRequestState] = {} + self.prom_metrics = OmniPrometheusMetrics(model_name=model) self.default_sampling_params_list = self.engine.default_sampling_params_list if not self.output_modalities: @@ -278,9 +280,9 @@ def _log_summary_and_cleanup(self, request_id: str) -> None: if req_state is None or req_state.metrics is None: return if self.log_stats: - # Emit per-request orchestrator timing (including e2e_total_ms) - # before dropping request state. req_state.metrics.build_and_log_summary() + if str(request_id) not in req_state.metrics.e2e_done: + self.prom_metrics.request_failed() except Exception: logger.exception( "[%s] Failed to build/log summary for req=%s", @@ -348,6 +350,8 @@ def _handle_output_message( return True, None, None, None req_state.stage_id = stage_id + req_state.replica_id = msg.get("replica_id", 0) + req_state.engine_idx = msg.get("engine_idx", stage_id) return False, req_id, stage_id, req_state @@ -443,9 +447,29 @@ def _process_single_result( req_id, req_start_ts.get(req_id, wall_start_ts), ) + e2e_seconds = now - req_start_ts.get(req_id, wall_start_ts) + _fin_m = result.get("metrics") + _pt = getattr(_fin_m, "pipeline_timings", None) or {} + queue_ms = _pt.get("queue_wait_ms") + queue_seconds = queue_ms / 1000.0 if queue_ms is not None else None + self.prom_metrics.request_succeeded(e2e_seconds, queue_seconds=queue_seconds) except Exception: logger.exception("[%s] Finalize request handling error", self.__class__.__name__) + running = self.engine._running_counter.value + total = len(self.request_states) + 
self.prom_metrics.set_running(running) + self.prom_metrics.set_waiting(max(0, total - running)) + + diffusion_metrics = getattr(engine_outputs, "metrics", None) + if finished and isinstance(diffusion_metrics, dict) and diffusion_metrics: + self.prom_metrics.observe_diffusion_metrics( + engine_idx=result.engine_idx, + stage_id=stage_id, + replica_id=result.replica_id, + metrics=diffusion_metrics, + ) + output_type = getattr(engine_outputs, "final_output_type", stage_meta.final_output_type) images = getattr(engine_outputs, "images", []) if output_type == "image" else [] return OmniRequestOutput( diff --git a/vllm_omni/metrics/__init__.py b/vllm_omni/metrics/__init__.py index deceb23333a..6814a589181 100644 --- a/vllm_omni/metrics/__init__.py +++ b/vllm_omni/metrics/__init__.py @@ -1,7 +1,10 @@ +from .prometheus import OmniPrometheusMetrics, OmniRequestCounter from .stats import OrchestratorAggregator, StageRequestStats, StageStats from .utils import count_tokens_from_outputs __all__ = [ + "OmniPrometheusMetrics", + "OmniRequestCounter", "OrchestratorAggregator", "StageStats", "StageRequestStats", diff --git a/vllm_omni/metrics/prometheus.py b/vllm_omni/metrics/prometheus.py new file mode 100644 index 00000000000..2b3ec566ae3 --- /dev/null +++ b/vllm_omni/metrics/prometheus.py @@ -0,0 +1,128 @@ +from prometheus_client import Counter, Gauge, Histogram + +_labelnames = ["model_name"] +_diffusion_labelnames = ["model_name", "engine", "stage_id", "replica_id"] + +_DIFFUSION_METRIC_DEFS: dict[str, tuple[str, str]] = { + "preprocess_time_ms": ( + "vllm_omni:diffusion_preprocess_time_ms", + "Diffusion preprocess time per request in milliseconds.", + ), + "diffusion_engine_exec_time_ms": ( + "vllm_omni:diffusion_exec_time_ms", + "Diffusion model execution time per request in milliseconds.", + ), + "postprocess_time_ms": ( + "vllm_omni:diffusion_postprocess_time_ms", + "Diffusion postprocess time per request in milliseconds.", + ), + "diffusion_engine_total_time_ms": ( + 
"vllm_omni:diffusion_step_time_ms", + "Total diffusion step time per request in milliseconds.", + ), +} + +_running_family = Gauge( + "vllm_omni:num_requests_running", + "Number of requests currently running across all pipeline stages.", + labelnames=_labelnames, +) +_waiting_family = Gauge( + "vllm_omni:num_requests_waiting", + "Number of requests waiting to be scheduled.", + labelnames=_labelnames, +) +_success_family = Counter( + "vllm_omni:num_requests_success", + "Number of requests that completed without error.", + labelnames=_labelnames, +) +_fail_family = Counter( + "vllm_omni:num_requests_fail", + "Number of requests that returned an error.", + labelnames=_labelnames, +) +_e2e_latency_family = Histogram( + "vllm_omni:e2e_request_latency_seconds", + "Histogram of end-to-end request latency in seconds.", + labelnames=_labelnames, +) +_queue_time_family = Histogram( + "vllm_omni:request_queue_time_seconds", + "Histogram of request queue wait time in seconds.", + labelnames=_labelnames, +) +_diffusion_families: dict[str, Histogram] = { + key: Histogram(metric_name, desc, labelnames=_diffusion_labelnames) + for key, (metric_name, desc) in _DIFFUSION_METRIC_DEFS.items() +} + + +class OmniPrometheusMetrics: + """Label-bound wrapper around the raw Prometheus metrics. + + Metric collectors use the ``vllm_omni:`` prefix to avoid being + removed by upstream vLLM's ``unregister_vllm_metrics()``, which + strips every collector whose ``_name`` contains ``"vllm"``. 
+ """ + + def __init__(self, model_name: str) -> None: + self._model_name = model_name + self._running = _running_family.labels(model_name=model_name) + self._waiting = _waiting_family.labels(model_name=model_name) + self._success = _success_family.labels(model_name=model_name) + self._fail = _fail_family.labels(model_name=model_name) + self._e2e_latency = _e2e_latency_family.labels(model_name=model_name) + self._queue_time = _queue_time_family.labels(model_name=model_name) + self._diffusion_by_replica: dict[tuple[str, int], Histogram] = {} + + def set_running(self, n: int) -> None: + self._running.set(n) + + def set_waiting(self, n: int) -> None: + self._waiting.set(n) + + def request_succeeded(self, e2e_seconds: float, queue_seconds: float | None = None) -> None: + self._success.inc() + self._e2e_latency.observe(e2e_seconds) + if queue_seconds is not None: + self._queue_time.observe(queue_seconds) + + def request_failed(self) -> None: + self._fail.inc() + + def observe_diffusion_metrics( + self, + engine_idx: int, + stage_id: int, + replica_id: int, + metrics: dict[str, float], + ) -> None: + for key, parent in _diffusion_families.items(): + value = metrics.get(key) + if value is None: + continue + cache_key = (key, engine_idx) + bound = self._diffusion_by_replica.get(cache_key) + if bound is None: + bound = parent.labels( + model_name=self._model_name, + engine=str(engine_idx), + stage_id=str(stage_id), + replica_id=str(replica_id), + ) + self._diffusion_by_replica[cache_key] = bound + bound.observe(value) + + +class OmniRequestCounter: + """Running-request counter written by the orchestrator thread, read by the client thread.""" + + def __init__(self) -> None: + self.value = 0 + + def increment(self) -> None: + self.value += 1 + + def decrement(self) -> None: + self.value = max(0, self.value - 1) diff --git a/vllm_omni/patch.py b/vllm_omni/patch.py index b78323c2182..7c41c698651 100644 --- a/vllm_omni/patch.py +++ b/vllm_omni/patch.py @@ -1,6 +1,8 @@ +import 
logging import sys from functools import cached_property +import vllm.v1.metrics.prometheus as _vllm_prometheus from aenum import extend_enum from vllm.config import ModelConfig as _OriginalModelConfig from vllm.inputs import TokensPrompt as _OriginalTokensPrompt @@ -128,3 +130,26 @@ def _patched_glm_image_text_config_init(self, *args, **kwargs): module.StreamingUpdate = OmniStreamingUpdate if hasattr(module, "EngineCoreRequest") and module.EngineCoreRequest == _OriginalEngineCoreRequest: module.EngineCoreRequest = OmniEngineCoreRequest + +# ============================================================================= +# Patch unregister_vllm_metrics to a no-op +# ============================================================================= +# WHY: unregister_vllm_metrics() uses `"vllm" in collector._name` to strip +# collectors from the Prometheus registry. This destroys any vllm-omni +# metrics that use the vllm: namespace. +# +# REMOVAL: Remove this patch once upstream vLLM adds +# _STAT_LOGGER_METRIC_NAMES to vllm.v1.metrics.prometheus and scopes +# unregister_vllm_metrics() to that set. Track: +# https://github.com/vllm-project/vllm/pull/42331 +_logger = logging.getLogger(__name__) + + +def _noop_unregister_vllm_metrics(): + pass + + +_vllm_prometheus.unregister_vllm_metrics = _noop_unregister_vllm_metrics +_logger.warning( + "Monkey-patched unregister_vllm_metrics() to a no-op. Remove this patch once vLLM adds _STAT_LOGGER_METRIC_NAMES." 
+) From 00f4d8e43e4c3e2178d98403272cc982c9b963e5 Mon Sep 17 00:00:00 2001 From: vraiti Date: Mon, 18 May 2026 11:51:44 -0400 Subject: [PATCH 2/9] Add E2E test for Prometheus metrics under multi-replica config Co-Authored-By: Claude Opus 4.6 Signed-off-by: vraiti --- .../online_serving/test_qwen3_omni_metrics.py | 136 ++++++++++++++++++ tests/helpers/stage_config.py | 37 +++++ 2 files changed, 173 insertions(+) create mode 100644 tests/e2e/online_serving/test_qwen3_omni_metrics.py diff --git a/tests/e2e/online_serving/test_qwen3_omni_metrics.py b/tests/e2e/online_serving/test_qwen3_omni_metrics.py new file mode 100644 index 00000000000..99552a740b9 --- /dev/null +++ b/tests/e2e/online_serving/test_qwen3_omni_metrics.py @@ -0,0 +1,136 @@ +""" +E2E test: Prometheus metrics under multi-replica Qwen3-Omni configuration. + +Verifies that the /metrics endpoint exposes the expected vllm_omni:* and +vllm:* metric families with correct labels when serving a 3-stage pipeline +with 2 talker replicas on 2 GPUs. 
+""" + +from __future__ import annotations + +import os +import re + +import pytest +import requests + +from tests.helpers.mark import hardware_test +from tests.helpers.runtime import OmniServerParams +from tests.helpers.stage_config import get_deploy_config_path + +os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" + +MODEL = "Qwen/Qwen3-Omni-30B-A3B-Instruct" +DEPLOY = get_deploy_config_path("ci/qwen3_omni_moe_metrics_2gpu.yaml") +NUM_ENGINES = 4 # 1 thinker + 2 talker replicas + 1 code2wav +NUM_REQUESTS = 2 + +_PIPELINE_METRICS = [ + "vllm_omni:num_requests_running", + "vllm_omni:num_requests_waiting", + "vllm_omni:num_requests_success", + "vllm_omni:num_requests_fail", + "vllm_omni:e2e_request_latency_seconds", + "vllm_omni:request_queue_time_seconds", +] + +test_params = [ + OmniServerParams( + model=MODEL, + stage_config_path=DEPLOY, + server_args=[], + ) +] + + +def _scrape(omni_server) -> str: + url = f"http://{omni_server.host}:{omni_server.port}/metrics" + resp = requests.get(url, timeout=10) + resp.raise_for_status() + return resp.text + + +def _sample_value(output: str, metric_line: str) -> float | None: + for line in output.splitlines(): + if line.startswith(metric_line): + return float(line.split()[-1]) + return None + + +def _count_data_lines(output: str, metric_name: str) -> int: + count = 0 + for line in output.splitlines(): + if line.startswith(metric_name + "{") or line.startswith(metric_name + " "): + if not line.startswith("# "): + count += 1 + return count + + +def _send_text_requests(omni_server, n: int) -> None: + url = f"http://{omni_server.host}:{omni_server.port}/v1/chat/completions" + payload = { + "model": omni_server.model, + "messages": [{"role": "user", "content": "What is 2+2? 
Answer in one word."}], + "stream": False, + "modalities": ["text"], + "max_tokens": 32, + } + for _ in range(n): + resp = requests.post(url, json=payload, timeout=120) + resp.raise_for_status() + + +@pytest.fixture(scope="module") +def metrics_before(omni_server) -> str: + return _scrape(omni_server) + + +@pytest.fixture(scope="module") +def metrics_after(omni_server, metrics_before) -> str: + _send_text_requests(omni_server, NUM_REQUESTS) + return _scrape(omni_server) + + +@pytest.mark.full_model +@pytest.mark.omni +@hardware_test(res={"cuda": "H100"}, num_cards=2) +@pytest.mark.parametrize("omni_server", test_params, indirect=True) +class TestMetricFamilies: + def test_omni_pipeline_metrics_present(self, metrics_before: str) -> None: + for name in _PIPELINE_METRICS: + assert f"# HELP {name}" in metrics_before, ( + f"missing metric family: {name}" + ) + + def test_omni_pipeline_metrics_carry_model_name(self, metrics_before: str) -> None: + for name in _PIPELINE_METRICS: + pattern = rf'^{re.escape(name)}.*model_name="{re.escape(MODEL)}"' + assert re.search(pattern, metrics_before, re.MULTILINE), ( + f"{name} missing model_name label" + ) + + def test_vllm_per_engine_metrics_present(self, metrics_before: str) -> None: + assert "# HELP vllm:num_requests_running" in metrics_before + + def test_vllm_metrics_have_correct_engine_count(self, metrics_before: str) -> None: + count = _count_data_lines(metrics_before, "vllm:num_requests_running") + assert count == NUM_ENGINES, ( + f"expected {NUM_ENGINES} engine label sets, got {count}" + ) + + +@pytest.mark.full_model +@pytest.mark.omni +@hardware_test(res={"cuda": "H100"}, num_cards=2) +@pytest.mark.parametrize("omni_server", test_params, indirect=True) +class TestMetricValues: + def test_success_counter_increments( + self, metrics_before: str, metrics_after: str + ) -> None: + prefix = f'vllm_omni:num_requests_success_total{{model_name="{MODEL}"}}' + before = _sample_value(metrics_before, prefix) or 0.0 + after = 
_sample_value(metrics_after, prefix) + assert after is not None, "success counter not found after requests" + assert after >= before + NUM_REQUESTS, ( + f"expected success count >= {before + NUM_REQUESTS}, got {after}" + ) diff --git a/tests/helpers/stage_config.py b/tests/helpers/stage_config.py index e34680be079..b36108f64c6 100644 --- a/tests/helpers/stage_config.py +++ b/tests/helpers/stage_config.py @@ -420,6 +420,43 @@ def delete_by_path(config_dict: dict, path: str) -> None: }, ], }, + "qwen3_omni_moe_metrics_2gpu": { + "base_config": "qwen3_omni_moe.yaml", + "async_chunk": True, + "stages": [ + { + "stage_id": 0, + "devices": "0", + "gpu_memory_utilization": 0.9, + "max_num_seqs": 4, + "max_model_len": 4096, + "mm_processor_cache_gb": 0, + "load_format": "dummy", + "default_sampling_params": {"max_tokens": 64, "ignore_eos": False}, + }, + { + "stage_id": 1, + "devices": "1,1", + "num_replicas": 2, + "gpu_memory_utilization": 0.3, + "max_num_seqs": 2, + "max_model_len": 4096, + "load_format": "dummy", + "default_sampling_params": {"max_tokens": 64}, + }, + { + "stage_id": 2, + "devices": "1", + "gpu_memory_utilization": 0.1, + "max_num_seqs": 2, + "max_num_batched_tokens": 65536, + "enforce_eager": True, + "async_scheduling": False, + "load_format": "dummy", + "default_sampling_params": {"max_tokens": 2000}, + }, + ], + }, "bagel_multi_replicas_4gpu": { "base_config": "bagel.yaml", "async_chunk": False, From 9a1ef63d4374c48ccddd722b62f5e07fb59b610e Mon Sep 17 00:00:00 2001 From: vraiti Date: Mon, 18 May 2026 14:01:59 -0400 Subject: [PATCH 3/9] Fix dict-style .get() on OutputMessage dataclass attributes Co-Authored-By: Claude Opus 4.6 Signed-off-by: vraiti --- .../online_serving/test_qwen3_omni_metrics.py | 20 +++++-------------- tests/metrics/test_prometheus.py | 4 +--- vllm_omni/entrypoints/omni_base.py | 6 +++--- 3 files changed, 9 insertions(+), 21 deletions(-) diff --git a/tests/e2e/online_serving/test_qwen3_omni_metrics.py 
b/tests/e2e/online_serving/test_qwen3_omni_metrics.py index 99552a740b9..31cc13ed5ff 100644 --- a/tests/e2e/online_serving/test_qwen3_omni_metrics.py +++ b/tests/e2e/online_serving/test_qwen3_omni_metrics.py @@ -98,25 +98,19 @@ def metrics_after(omni_server, metrics_before) -> str: class TestMetricFamilies: def test_omni_pipeline_metrics_present(self, metrics_before: str) -> None: for name in _PIPELINE_METRICS: - assert f"# HELP {name}" in metrics_before, ( - f"missing metric family: {name}" - ) + assert f"# HELP {name}" in metrics_before, f"missing metric family: {name}" def test_omni_pipeline_metrics_carry_model_name(self, metrics_before: str) -> None: for name in _PIPELINE_METRICS: pattern = rf'^{re.escape(name)}.*model_name="{re.escape(MODEL)}"' - assert re.search(pattern, metrics_before, re.MULTILINE), ( - f"{name} missing model_name label" - ) + assert re.search(pattern, metrics_before, re.MULTILINE), f"{name} missing model_name label" def test_vllm_per_engine_metrics_present(self, metrics_before: str) -> None: assert "# HELP vllm:num_requests_running" in metrics_before def test_vllm_metrics_have_correct_engine_count(self, metrics_before: str) -> None: count = _count_data_lines(metrics_before, "vllm:num_requests_running") - assert count == NUM_ENGINES, ( - f"expected {NUM_ENGINES} engine label sets, got {count}" - ) + assert count == NUM_ENGINES, f"expected {NUM_ENGINES} engine label sets, got {count}" @pytest.mark.full_model @@ -124,13 +118,9 @@ def test_vllm_metrics_have_correct_engine_count(self, metrics_before: str) -> No @hardware_test(res={"cuda": "H100"}, num_cards=2) @pytest.mark.parametrize("omni_server", test_params, indirect=True) class TestMetricValues: - def test_success_counter_increments( - self, metrics_before: str, metrics_after: str - ) -> None: + def test_success_counter_increments(self, metrics_before: str, metrics_after: str) -> None: prefix = f'vllm_omni:num_requests_success_total{{model_name="{MODEL}"}}' before = 
_sample_value(metrics_before, prefix) or 0.0 after = _sample_value(metrics_after, prefix) assert after is not None, "success counter not found after requests" - assert after >= before + NUM_REQUESTS, ( - f"expected success count >= {before + NUM_REQUESTS}, got {after}" - ) + assert after >= before + NUM_REQUESTS, f"expected success count >= {before + NUM_REQUESTS}, got {after}" diff --git a/tests/metrics/test_prometheus.py b/tests/metrics/test_prometheus.py index b5302b80715..60d6c5bbfbf 100644 --- a/tests/metrics/test_prometheus.py +++ b/tests/metrics/test_prometheus.py @@ -133,9 +133,7 @@ def test_diffusion_metrics_carry_engine_label(self, scrape_output: str) -> None: def test_diffusion_metrics_carry_stage_and_replica_labels(self, scrape_output: str) -> None: for name in _DIFFUSION_METRICS: pattern = rf'^{re.escape(name)}.*stage_id="1".*replica_id="0"' - assert re.search(pattern, scrape_output, re.MULTILINE), ( - f"{name} missing stage_id/replica_id labels" - ) + assert re.search(pattern, scrape_output, re.MULTILINE), f"{name} missing stage_id/replica_id labels" class TestScrapeOutput: diff --git a/vllm_omni/entrypoints/omni_base.py b/vllm_omni/entrypoints/omni_base.py index bfd4debe640..d95b6ecdcdf 100644 --- a/vllm_omni/entrypoints/omni_base.py +++ b/vllm_omni/entrypoints/omni_base.py @@ -350,8 +350,8 @@ def _handle_output_message( return True, None, None, None req_state.stage_id = stage_id - req_state.replica_id = msg.get("replica_id", 0) - req_state.engine_idx = msg.get("engine_idx", stage_id) + req_state.replica_id = msg.replica_id + req_state.engine_idx = msg.engine_idx return False, req_id, stage_id, req_state @@ -448,7 +448,7 @@ def _process_single_result( req_start_ts.get(req_id, wall_start_ts), ) e2e_seconds = now - req_start_ts.get(req_id, wall_start_ts) - _fin_m = result.get("metrics") + _fin_m = result.metrics _pt = getattr(_fin_m, "pipeline_timings", None) or {} queue_ms = _pt.get("queue_wait_ms") queue_seconds = queue_ms / 1000.0 if queue_ms is not 
None else None From 1dd335ba2a174adf0e28a50e3ec3094c7cf98de9 Mon Sep 17 00:00:00 2001 From: vraiti Date: Tue, 26 May 2026 15:00:02 -0400 Subject: [PATCH 4/9] Fix per-engine stats dropping finished requests due to shared timer Co-Authored-By: Claude Opus 4.6 Signed-off-by: vraiti --- vllm_omni/engine/orchestrator.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/vllm_omni/engine/orchestrator.py b/vllm_omni/engine/orchestrator.py index 151114a88a8..01153a819c0 100644 --- a/vllm_omni/engine/orchestrator.py +++ b/vllm_omni/engine/orchestrator.py @@ -212,8 +212,9 @@ def __init__( ) else: self._stat_logger = None - self._last_stats_ts: float = 0.0 self._stats_interval_s: float = 1.0 + self._per_engine_stats_ts: dict[int, float] = {idx: 0.0 for idx in engine_indexes} + self._pending_iteration_stats: dict[int, IterationStats] = {idx: IterationStats() for idx in engine_indexes} self._cfg_tracker = CfgCompanionTracker() @@ -631,23 +632,27 @@ async def _orchestration_loop(self) -> None: "new_prompt_len_snapshot", None, ) - now = _time.monotonic() - record_stats = ( - self._stat_logger is not None and now - self._last_stats_ts >= self._stats_interval_s - ) - iteration_stats = IterationStats() if record_stats else None + engine_idx = self._engine_idx[(stage_id, replica_id)] + iteration_stats = self._pending_iteration_stats.get(engine_idx) + if iteration_stats is not None: + iteration_stats.iteration_timestamp = _time.time() raw_output = await pool.process_llm_raw_outputs( replica_id, raw_outputs, iteration_stats=iteration_stats, ) - if record_stats: - self._last_stats_ts = now + now = _time.monotonic() + if ( + self._stat_logger is not None + and now - self._per_engine_stats_ts.get(engine_idx, 0.0) >= self._stats_interval_s + ): + self._per_engine_stats_ts[engine_idx] = now self._stat_logger.record( raw_outputs.scheduler_stats, iteration_stats, - engine_idx=self._engine_idx[(stage_id, replica_id)], + engine_idx=engine_idx, ) + 
self._pending_iteration_stats[engine_idx] = IterationStats() except asyncio.CancelledError: raise except EngineDeadError as e: From 8cb1cd88f6302bc1f404fd7f73bf2a56446de4dd Mon Sep 17 00:00:00 2001 From: vraiti Date: Wed, 27 May 2026 15:10:53 -0400 Subject: [PATCH 5/9] Downgrade per-request metric logs to DEBUG Signed-off-by: vraiti --- vllm_omni/metrics/stats.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/vllm_omni/metrics/stats.py b/vllm_omni/metrics/stats.py index 4245deb5453..bcb0cfca36a 100644 --- a/vllm_omni/metrics/stats.py +++ b/vllm_omni/metrics/stats.py @@ -536,7 +536,7 @@ def build_and_log_summary(self) -> dict[str, Any]: value_fields_e2e = sorted(nonzero_e2e_fields) if value_fields_e2e: - logger.info( + logger.debug( "\n%s", _format_table( f"RequestE2EStats [request_id={rid}]", @@ -577,7 +577,7 @@ def build_and_log_summary(self) -> dict[str, Any]: parts.append(f"transfers=[{','.join(transfer_parts)}]") if "ar2diffusion_ms" in pt: parts.append(f"ar2diffusion={pt['ar2diffusion_ms']:.2f}ms") - logger.info("[OmniTiming] %s", " ".join(parts)) + logger.debug("[OmniTiming] %s", " ".join(parts)) # === Stage table (columns = stage_id) === # if any stage has diffusion_metrics, remove postprocess_time_ms field @@ -619,7 +619,7 @@ def build_and_log_summary(self) -> dict[str, Any]: value_fields_list.append(field) if value_fields_list: - logger.info( + logger.debug( "\n%s", _format_table( f"StageRequestStats [request_id={rid}]", @@ -659,7 +659,7 @@ def build_and_log_summary(self) -> dict[str, Any]: value_fields_list.append(field) if value_fields_list: - logger.info( + logger.debug( "\n%s", _format_table( f"TransferEdgeStats [request_id={rid}]", From 5d13fbc48aa5c2ae373a025f7ecbd40e00c3cac9 Mon Sep 17 00:00:00 2001 From: vraiti Date: Thu, 28 May 2026 10:33:52 -0400 Subject: [PATCH 6/9] Delegate make_stats() to upstream Scheduler via super() Co-Authored-By: Claude Opus 4.6 Signed-off-by: vraiti --- 
vllm_omni/core/sched/omni_scheduler_mixin.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/vllm_omni/core/sched/omni_scheduler_mixin.py b/vllm_omni/core/sched/omni_scheduler_mixin.py index 4ee49961f31..faa56c4c17c 100644 --- a/vllm_omni/core/sched/omni_scheduler_mixin.py +++ b/vllm_omni/core/sched/omni_scheduler_mixin.py @@ -181,8 +181,4 @@ def make_stats(self, *args, **kwargs) -> SchedulerStats | None: if now - getattr(self, "_last_stats_time", 0.0) < _STATS_INTERVAL_S: return None self._last_stats_time = now - return SchedulerStats( - kv_cache_usage=self.kv_cache_manager.usage, - num_running_reqs=len(self.running), - num_waiting_reqs=len(self.waiting), - ) + return super().make_stats(*args, **kwargs) From 280c127de0a9676dc2fc2a2eea5da925ffd184a8 Mon Sep 17 00:00:00 2001 From: vraiti Date: Thu, 28 May 2026 10:43:25 -0400 Subject: [PATCH 7/9] Rename OmniPrometheusMetrics to OmniPrometheusStatLogger, alias PrometheusStatLogger as VllmPrometheusStatLogger Co-Authored-By: Claude Opus 4.6 Signed-off-by: vraiti --- docs/design/metrics.md | 12 ++++++------ tests/metrics/test_prometheus.py | 8 ++++---- vllm_omni/engine/orchestrator.py | 4 ++-- vllm_omni/entrypoints/omni_base.py | 4 ++-- vllm_omni/metrics/__init__.py | 4 ++-- vllm_omni/metrics/prometheus.py | 2 +- 6 files changed, 17 insertions(+), 17 deletions(-) diff --git a/docs/design/metrics.md b/docs/design/metrics.md index 71ceb5c38ae..9ca805201e8 100644 --- a/docs/design/metrics.md +++ b/docs/design/metrics.md @@ -66,9 +66,9 @@ creates and feeds directly. 
| | vllm_omni:* collectors vllm:* collectors | | - +-----------+-----------+ +--------+---------+ - | OmniPrometheusMetrics | | PrometheusStatLogger | - +-----------+-----------+ +--------+---------+ + +----------------------------+ +--------------------------+ + | OmniPrometheusStatLogger | | VllmPrometheusStatLogger | + +----------------------------+ +--------------------------+ | | OmniBase Orchestrator (request lifecycle, (feeds SchedulerStats @@ -82,7 +82,7 @@ There are two independent paths for metric collection. **Path 1: Pipeline-level metrics (`vllm_omni:*`)** -`OmniPrometheusMetrics` registers Gauge, Counter, and Histogram +`OmniPrometheusStatLogger` registers Gauge, Counter, and Histogram collectors at init time. It is instantiated once per entrypoint, labeled with the model name. The entrypoint calls its methods as requests progress: @@ -126,7 +126,7 @@ passed to the Orchestrator at construction time. ### Metric Registration and Lifecycle All `vllm_omni:*` collectors are registered once when -`OmniPrometheusMetrics.__init__()` runs. Per-stage labels +`OmniPrometheusStatLogger.__init__()` runs. Per-stage labels (`model_name`, `engine`) are bound lazily on first observation to avoid registering labels for stages that never produce data (e.g., a diffusion pipeline has no AR stage stats). @@ -188,7 +188,7 @@ per-stage, and per-transfer statistics and prints formatted tables to the `INFO` log. This is designed for development and debugging — individual request traces, transfer bandwidth, inter-stage timing. -`OmniPrometheusMetrics` is the Prometheus-oriented path. It records +`OmniPrometheusStatLogger` is the Prometheus-oriented path. It records aggregate counters, gauges, and histograms suitable for time-series monitoring and alerting. The two paths are independent; both can run simultaneously. 
diff --git a/tests/metrics/test_prometheus.py b/tests/metrics/test_prometheus.py index 60d6c5bbfbf..9ed487bce4c 100644 --- a/tests/metrics/test_prometheus.py +++ b/tests/metrics/test_prometheus.py @@ -5,7 +5,7 @@ import pytest from prometheus_client import REGISTRY, CollectorRegistry, generate_latest -from vllm_omni.metrics import OmniPrometheusMetrics +from vllm_omni.metrics import OmniPrometheusStatLogger pytestmark = [pytest.mark.core_model, pytest.mark.cpu] @@ -34,12 +34,12 @@ def registry() -> CollectorRegistry: @pytest.fixture(scope="module") -def prom() -> OmniPrometheusMetrics: - return OmniPrometheusMetrics(model_name=_MODEL) +def prom() -> OmniPrometheusStatLogger: + return OmniPrometheusStatLogger(model_name=_MODEL) @pytest.fixture(scope="module") -def scrape_output(prom: OmniPrometheusMetrics, registry: CollectorRegistry) -> str: +def scrape_output(prom: OmniPrometheusStatLogger, registry: CollectorRegistry) -> str: prom.request_succeeded(e2e_seconds=1.5, queue_seconds=0.3) prom.request_succeeded(e2e_seconds=2.0, queue_seconds=0.5) prom.request_failed() diff --git a/vllm_omni/engine/orchestrator.py b/vllm_omni/engine/orchestrator.py index 01153a819c0..db88bd52480 100644 --- a/vllm_omni/engine/orchestrator.py +++ b/vllm_omni/engine/orchestrator.py @@ -29,7 +29,7 @@ from vllm.sampling_params import SamplingParams from vllm.v1.engine import EngineCoreOutputs from vllm.v1.engine.exceptions import EngineDeadError -from vllm.v1.metrics.loggers import PrometheusStatLogger +from vllm.v1.metrics.loggers import VllmPrometheusStatLogger as VllmVllmPrometheusStatLogger from vllm.v1.metrics.stats import IterationStats from vllm_omni.distributed.omni_coordinator import ( @@ -206,7 +206,7 @@ def __init__( None, ) if vllm_config_for_stats is not None: - self._stat_logger: PrometheusStatLogger | None = PrometheusStatLogger( + self._stat_logger: VllmPrometheusStatLogger | None = VllmVllmPrometheusStatLogger( vllm_config=vllm_config_for_stats, 
engine_indexes=engine_indexes, ) diff --git a/vllm_omni/entrypoints/omni_base.py b/vllm_omni/entrypoints/omni_base.py index d95b6ecdcdf..14a4c451e16 100644 --- a/vllm_omni/entrypoints/omni_base.py +++ b/vllm_omni/entrypoints/omni_base.py @@ -23,7 +23,7 @@ from vllm_omni.entrypoints.client_request_state import ClientRequestState from vllm_omni.entrypoints.pd_utils import PDDisaggregationMixin from vllm_omni.entrypoints.utils import coerce_param_message_types, get_final_stage_id_for_e2e -from vllm_omni.metrics.prometheus import OmniPrometheusMetrics +from vllm_omni.metrics.prometheus import OmniPrometheusStatLogger from vllm_omni.metrics.stats import OrchestratorAggregator as OrchestratorMetrics from vllm_omni.model_executor.model_loader.weight_utils import download_weights_from_hf_specific from vllm_omni.outputs import OmniRequestOutput @@ -191,7 +191,7 @@ def __init__( self.async_chunk = bool(getattr(self.engine, "async_chunk", False)) self.request_states: dict[str, ClientRequestState] = {} - self.prom_metrics = OmniPrometheusMetrics(model_name=model) + self.prom_metrics = OmniPrometheusStatLogger(model_name=model) self.default_sampling_params_list = self.engine.default_sampling_params_list if not self.output_modalities: diff --git a/vllm_omni/metrics/__init__.py b/vllm_omni/metrics/__init__.py index 6814a589181..a213e6e9dbb 100644 --- a/vllm_omni/metrics/__init__.py +++ b/vllm_omni/metrics/__init__.py @@ -1,9 +1,9 @@ -from .prometheus import OmniPrometheusMetrics, OmniRequestCounter +from .prometheus import OmniPrometheusStatLogger, OmniRequestCounter from .stats import OrchestratorAggregator, StageRequestStats, StageStats from .utils import count_tokens_from_outputs __all__ = [ - "OmniPrometheusMetrics", + "OmniPrometheusStatLogger", "OmniRequestCounter", "OrchestratorAggregator", "StageStats", diff --git a/vllm_omni/metrics/prometheus.py b/vllm_omni/metrics/prometheus.py index 2b3ec566ae3..e4c7d6d6fbf 100644 --- a/vllm_omni/metrics/prometheus.py +++ 
b/vllm_omni/metrics/prometheus.py @@ -58,7 +58,7 @@ } -class OmniPrometheusMetrics: +class OmniPrometheusStatLogger: """Label-bound wrapper around the raw Prometheus metrics. Metric collectors use the ``vllm_omni:`` prefix to avoid being From 3d1beede594de9124f26d687fc743345fb96f043 Mon Sep 17 00:00:00 2001 From: vraiti Date: Thu, 28 May 2026 10:45:49 -0400 Subject: [PATCH 8/9] Exclude diffusion stages from engine_indexes to avoid stale zero-valued vllm:* series Co-Authored-By: Claude Opus 4.6 Signed-off-by: vraiti --- vllm_omni/engine/orchestrator.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/vllm_omni/engine/orchestrator.py b/vllm_omni/engine/orchestrator.py index db88bd52480..f88150402f5 100644 --- a/vllm_omni/engine/orchestrator.py +++ b/vllm_omni/engine/orchestrator.py @@ -29,7 +29,7 @@ from vllm.sampling_params import SamplingParams from vllm.v1.engine import EngineCoreOutputs from vllm.v1.engine.exceptions import EngineDeadError -from vllm.v1.metrics.loggers import VllmPrometheusStatLogger as VllmVllmPrometheusStatLogger +from vllm.v1.metrics.loggers import PrometheusStatLogger as VllmPrometheusStatLogger from vllm.v1.metrics.stats import IterationStats from vllm_omni.distributed.omni_coordinator import ( @@ -200,21 +200,26 @@ def __init__( self.request_states: dict[str, OrchestratorRequestState] = {} self._running_counter = running_counter - engine_indexes = list(range(flat)) + llm_engine_indexes = [ + self._engine_idx[(sid, rid)] + for sid, pool in enumerate(stage_pools) + if pool.stage_vllm_config is not None + for rid in range(pool.num_replicas) + ] vllm_config_for_stats = next( (p.stage_vllm_config for p in stage_pools if p.stage_vllm_config is not None), None, ) if vllm_config_for_stats is not None: - self._stat_logger: VllmPrometheusStatLogger | None = VllmVllmPrometheusStatLogger( + self._stat_logger: VllmPrometheusStatLogger | None = VllmPrometheusStatLogger( vllm_config=vllm_config_for_stats, - 
engine_indexes=engine_indexes, + engine_indexes=llm_engine_indexes, ) else: self._stat_logger = None self._stats_interval_s: float = 1.0 - self._per_engine_stats_ts: dict[int, float] = {idx: 0.0 for idx in engine_indexes} - self._pending_iteration_stats: dict[int, IterationStats] = {idx: IterationStats() for idx in engine_indexes} + self._per_engine_stats_ts: dict[int, float] = {idx: 0.0 for idx in llm_engine_indexes} + self._pending_iteration_stats: dict[int, IterationStats] = {idx: IterationStats() for idx in llm_engine_indexes} self._cfg_tracker = CfgCompanionTracker() From 40ec127c203c9aee1b603ac69beea1337b8afd36 Mon Sep 17 00:00:00 2001 From: vraiti Date: Thu, 28 May 2026 10:52:15 -0400 Subject: [PATCH 9/9] Fix typo Signed-off-by: vraiti --- docs/usage/metrics.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/usage/metrics.md b/docs/usage/metrics.md index 0d8e023a6af..88eb2a2d65f 100644 --- a/docs/usage/metrics.md +++ b/docs/usage/metrics.md @@ -13,7 +13,7 @@ curl http://localhost:8000/metrics | Prefix | Source | Present when | |--------|--------|--------------| -| `vllm_omni:` | vLLM-Omni orchestrato / diffusion stages | Always / Pipeline includes a diffusion stage | +| `vllm_omni:` | vLLM-Omni orchestrator / diffusion stages | Always / Pipeline includes a diffusion stage | | `vllm:` | Upstream vLLM engine | Pipeline includes an LLM (AR) stage | | `http_` / `process_` | Uvicorn / Python runtime | Always |