From f6b39fab8e1eb2c4d8e3a27decdcd9f56961468d Mon Sep 17 00:00:00 2001 From: NickLucche Date: Tue, 14 Oct 2025 13:49:17 +0000 Subject: [PATCH 1/4] init Signed-off-by: NickLucche --- vllm/v1/metrics/loggers.py | 127 +++++++++++++++++++++++++++++++++++++ 1 file changed, 127 insertions(+) diff --git a/vllm/v1/metrics/loggers.py b/vllm/v1/metrics/loggers.py index 1a8fefdd1ddf..99e0b68be50a 100644 --- a/vllm/v1/metrics/loggers.py +++ b/vllm/v1/metrics/loggers.py @@ -804,6 +804,104 @@ def __init__( ], ) + # + # KVConnector metrics + # + self._nixl_metrics_enabled = False + if ( + kv_transfer_config := vllm_config.kv_transfer_config + ) and kv_transfer_config.kv_connector == "NixlConnector": + self._nixl_metrics_enabled = True + buckets = [ + 0.001, + 0.005, + 0.01, + 0.025, + 0.05, + 0.075, + 0.1, + 0.2, + 0.3, + 0.5, + 0.75, + 1.0, + 5.0, + ] + nixl_histogram_xfer_time = self._histogram_cls( + name="vllm:nixl_xfer_time_seconds", + documentation="Histogram of transfer duration for NIXL KV" + " Cache transfers.", + buckets=buckets, + labelnames=labelnames, + ) + self.nixl_histogram_xfer_time = make_per_engine( + nixl_histogram_xfer_time, engine_indexes, model_name + ) + nixl_histogram_post_time = self._histogram_cls( + name="vllm:nixl_post_time_seconds", + documentation="Histogram of transfer post time for NIXL KV" + " Cache transfers.", + buckets=buckets[1:], + labelnames=labelnames, + ) + self.nixl_histogram_post_time = make_per_engine( + nixl_histogram_post_time, engine_indexes, model_name + ) + # uniform 2kb to 16gb range + buckets = [2**10 + i for i in range(1, 24, 2)] + nixl_histogram_bytes_transferred = self._histogram_cls( + name="vllm:nixl_bytes_transferred", + documentation="Histogram of bytes transferred per NIXL KV" + " Cache transfers.", + buckets=buckets, + labelnames=labelnames, + ) + self.nixl_histogram_bytes_transferred = make_per_engine( + nixl_histogram_bytes_transferred, engine_indexes, model_name + ) + buckets = [ + 10, + 20, + 30, + 50, + 75, + 100, + 200, + 400, + 1000, + 2000, + 4000, + 10000, + 20000, + 50000, + ] + nixl_histogram_num_descriptors = self._histogram_cls( + name="vllm:nixl_num_descriptors", + documentation="Histogram of number of descriptors per NIXL" + " KV Cache transfers.", + buckets=buckets, + labelnames=labelnames, + ) + self.nixl_histogram_num_descriptors = make_per_engine( + nixl_histogram_num_descriptors, engine_indexes, model_name + ) + counter_nixl_num_failed_transfers = self._counter_cls( + name="vllm:nixl_num_failed_transfers", + documentation="Number of failed NIXL KV Cache transfers.", + labelnames=labelnames, + ) + self.counter_nixl_num_failed_transfers = make_per_engine( + counter_nixl_num_failed_transfers, engine_indexes, model_name + ) + counter_nixl_num_failed_notifications = self._counter_cls( + name="vllm:nixl_num_failed_notifications", + documentation="Number of failed NIXL KV Cache notifications.", + labelnames=labelnames, + ) + self.counter_nixl_num_failed_notifications = make_per_engine( + counter_nixl_num_failed_notifications, engine_indexes, model_name + ) + def log_metrics_info(self, type: str, config_obj: SupportsMetricsInfo): metrics_info = config_obj.metrics_info() metrics_info["engine"] = "" @@ -869,6 +967,35 @@ def record( self.spec_decoding_prom.observe( scheduler_stats.spec_decoding_stats, engine_idx ) + # TODO factor this out into OOT metrics class + if self._nixl_metrics_enabled and ( + kv_stats := scheduler_stats.kv_connector_stats + ): + for prom_obj, list_item_key in zip( + [ + self.nixl_histogram_xfer_time, + self.nixl_histogram_post_time, + self.nixl_histogram_bytes_transferred, + self.nixl_histogram_num_descriptors, + ], + [ + "transfer_duration", + "post_duration", + "bytes_transferred", + "num_descriptors", + ], + ): + for list_item in kv_stats[list_item_key]: + prom_obj[engine_idx].observe(list_item) + for counter_obj, counter_item_key in zip( + [ + self.counter_nixl_num_failed_transfers, + self.counter_nixl_num_failed_notifications, + ], + ["num_failed_transfers", "num_failed_notifications"], + ): + for list_item in kv_stats[counter_item_key]: + counter_obj[engine_idx].inc(list_item) if mm_cache_stats is not None: self.counter_mm_cache_queries[engine_idx].inc(mm_cache_stats.queries) From 4e87197680b5a8b6a0ac09195b252f9f454c6f49 Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Fri, 24 Oct 2025 03:54:44 -0400 Subject: [PATCH 2/4] [NIXL][Metrics] Add abstraction for per-connector Prometheus metrics Signed-off-by: Mark McLoughlin --- .../kv_transfer/kv_connector/v1/base.py | 21 ++- .../kv_transfer/kv_connector/v1/metrics.py | 83 +++++++++- .../kv_connector/v1/nixl_connector.py | 137 ++++++++++++++++- vllm/v1/metrics/loggers.py | 145 ++---------------- vllm/v1/metrics/ray_wrappers.py | 14 ++ 5 files changed, 267 insertions(+), 133 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/base.py b/vllm/distributed/kv_transfer/kv_connector/v1/base.py index ab5d2ecdc71b..e32c9076630e 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/base.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/base.py @@ -50,7 +50,12 @@ from vllm.attention.backends.abstract import AttentionMetadata from vllm.config import VllmConfig from vllm.distributed.kv_events import KVCacheEvent - from vllm.distributed.kv_transfer.kv_connector.v1.metrics import KVConnectorStats + from vllm.distributed.kv_transfer.kv_connector.v1.metrics import ( + KVConnectorPromMetrics, + KVConnectorStats, + PromMetric, + PromMetricT, + ) from vllm.forward_context import ForwardContext from vllm.v1.core.kv_cache_manager import KVCacheBlocks from vllm.v1.request import Request @@ -431,3 +436,17 @@ def build_kv_connector_stats( which can implement custom aggregation logic on the data dict. """ return None + + @classmethod + def build_prom_metrics( + cls, + metric_types: dict[type["PromMetric"], type["PromMetricT"]], + labelnames: list[str], + per_engine_labelvalues: dict[int, list[str]], + ) -> Optional["KVConnectorPromMetrics"]: + """ + Create a KVConnectorPromMetrics subclass which should register + per-connector Prometheus metrics and implement observe() to + expose connector transfer stats via Prometheus. + """ + return None diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/metrics.py b/vllm/distributed/kv_transfer/kv_connector/v1/metrics.py index 21002fe572c5..5cb739c21a69 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/metrics.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/metrics.py @@ -1,13 +1,18 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project from dataclasses import dataclass, field -from typing import Any +from typing import Any, TypeAlias, TypeVar + +from prometheus_client import Counter, Gauge, Histogram from vllm.config.kv_transfer import KVTransferConfig from vllm.distributed.kv_transfer.kv_connector.factory import KVConnectorFactory from vllm.distributed.kv_transfer.kv_transfer_state import has_kv_transfer_group from vllm.logger import init_logger +PromMetric: TypeAlias = Gauge | Counter | Histogram +PromMetricT = TypeVar("PromMetricT", bound=PromMetric) + logger = init_logger(__name__) @@ -102,3 +107,79 @@ def log(self, log_fn=logger.info): # Reset metrics for next interval self.reset() + + +class KVConnectorPromMetrics: + """ + A base class for per-connector Prometheus metric registration + and recording. + """ + + def __init__( + self, + metric_types: dict[type[PromMetric], type[PromMetricT]], + labelnames: list[str], + per_engine_labelvalues: dict[int, list[str]], + ): + self._gauge_cls = metric_types[Gauge] + self._counter_cls = metric_types[Counter] + self._histogram_cls = metric_types[Histogram] + self._labelnames = labelnames + self._per_engine_labelvalues = per_engine_labelvalues + + def make_per_engine(self, metric: PromMetric) -> PromMetric: + """ + Create a per-engine child of a prometheus_client.Metric with + the appropriate labels set. The parent metric must be created + using the labelnames list. + """ + return { + idx: metric.labels(*labelvalues) + for idx, labelvalues in self._per_engine_labelvalues.items() + } + + def observe(self, transfer_stats_data: dict[str, Any], engine_idx: int = 0): + """ + Record the supplied transfer statistics to Prometheus metrics. These + statistics are engine-specific, and should be recorded to a metric + with the appropriate 'engine' label. These metric instances can be + created using the make_per_engine() helper method. + """ + raise NotImplementedError + + +class KVConnectorPrometheus: + """ + Support for registering per-connector Prometheus metrics, and + recording transfer statistics to those metrics. Uses + KVConnectorBase.build_prom_metrics(). + """ + + _gauge_cls = Gauge + _counter_cls = Counter + _histogram_cls = Histogram + + def __init__( + self, + kv_transfer_config: KVTransferConfig | None, + labelnames: list[str], + per_engine_labelvalues: dict[int, list[str]], + ): + self.prom_metrics: KVConnectorPromMetrics | None = None + if kv_transfer_config and kv_transfer_config.kv_connector: + connector_cls = KVConnectorFactory.get_connector_class(kv_transfer_config) + metric_types = { + Gauge: self._gauge_cls, + Counter: self._counter_cls, + Histogram: self._histogram_cls, + } + self.prom_metrics = connector_cls.build_prom_metrics( + metric_types, + labelnames, + per_engine_labelvalues, + ) + + def observe(self, transfer_stats_data: dict[str, Any], engine_idx: int = 0): + if self.prom_metrics is None: + return + self.prom_metrics.observe(transfer_stats_data, engine_idx) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 8c4c82f76ff2..1774834be8c5 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -30,7 +30,12 @@ KVConnectorMetadata, KVConnectorRole, ) -from vllm.distributed.kv_transfer.kv_connector.v1.metrics import KVConnectorStats +from vllm.distributed.kv_transfer.kv_connector.v1.metrics import ( + KVConnectorPromMetrics, + KVConnectorStats, + PromMetric, + PromMetricT, +) from vllm.distributed.parallel_state import ( get_tensor_model_parallel_rank, get_tensor_model_parallel_world_size, @@ -254,6 +259,15 @@ def build_kv_connector_stats( else NixlKVConnectorStats() ) + @classmethod + def build_prom_metrics( + cls, + metric_types: dict[type[PromMetric], type[PromMetricT]], + labelnames: list[str], + per_engine_labelvalues: dict[int, list[str]], + ) -> KVConnectorPromMetrics: + return NixlPromMetrics(metric_types, labelnames, per_engine_labelvalues) + def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None: assert self.connector_worker is not None assert isinstance(self._connector_metadata, NixlConnectorMetadata) @@ -1744,3 +1758,124 @@ def reduce(self) -> dict[str, int | float]: @property def num_successful_transfers(self) -> int: return len(self.data["transfer_duration"]) + + +class NixlPromMetrics(KVConnectorPromMetrics): + def __init__( + self, + metric_types: dict[type[PromMetric], type[PromMetricT]], + labelnames: list[str], + per_engine_labelvalues: dict[int, list[str]], + ): + super().__init__(metric_types, labelnames, per_engine_labelvalues) + + buckets = [ + 0.001, + 0.005, + 0.01, + 0.025, + 0.05, + 0.075, + 0.1, + 0.2, + 0.3, + 0.5, + 0.75, + 1.0, + 5.0, + ] + nixl_histogram_xfer_time = self._histogram_cls( + name="vllm:nixl_xfer_time_seconds", + documentation="Histogram of transfer duration for NIXL KV Cache transfers.", + buckets=buckets, + labelnames=labelnames, + ) + self.nixl_histogram_xfer_time = self.make_per_engine(nixl_histogram_xfer_time) + nixl_histogram_post_time = self._histogram_cls( + name="vllm:nixl_post_time_seconds", + documentation="Histogram of transfer post time for NIXL KV" + " Cache transfers.", + buckets=buckets[1:], + labelnames=labelnames, + ) + self.nixl_histogram_post_time = self.make_per_engine(nixl_histogram_post_time) + # uniform 2kb to 16gb range + buckets = [2**10 + i for i in range(1, 24, 2)] + nixl_histogram_bytes_transferred = self._histogram_cls( + name="vllm:nixl_bytes_transferred", + documentation="Histogram of bytes transferred per NIXL KV Cache transfers.", + buckets=buckets, + labelnames=labelnames, + ) + self.nixl_histogram_bytes_transferred = self.make_per_engine( + nixl_histogram_bytes_transferred + ) + buckets = [ + 10, + 20, + 30, + 50, + 75, + 100, + 200, + 400, + 1000, + 2000, + 4000, + 10000, + 20000, + 50000, + ] + nixl_histogram_num_descriptors = self._histogram_cls( + name="vllm:nixl_num_descriptors", + documentation="Histogram of number of descriptors per NIXL" + " KV Cache transfers.", + buckets=buckets, + labelnames=labelnames, + ) + self.nixl_histogram_num_descriptors = self.make_per_engine( + nixl_histogram_num_descriptors + ) + counter_nixl_num_failed_transfers = self._counter_cls( + name="vllm:nixl_num_failed_transfers", + documentation="Number of failed NIXL KV Cache transfers.", + labelnames=labelnames, + ) + self.counter_nixl_num_failed_transfers = self.make_per_engine( + counter_nixl_num_failed_transfers + ) + counter_nixl_num_failed_notifications = self._counter_cls( + name="vllm:nixl_num_failed_notifications", + documentation="Number of failed NIXL KV Cache notifications.", + labelnames=labelnames, + ) + self.counter_nixl_num_failed_notifications = self.make_per_engine( + counter_nixl_num_failed_notifications + ) + + def observe(self, transfer_stats_data: dict[str, Any], engine_idx: int = 0): + for prom_obj, list_item_key in zip( + [ + self.nixl_histogram_xfer_time, + self.nixl_histogram_post_time, + self.nixl_histogram_bytes_transferred, + self.nixl_histogram_num_descriptors, + ], + [ + "transfer_duration", + "post_duration", + "bytes_transferred", + "num_descriptors", + ], + ): + for list_item in transfer_stats_data[list_item_key]: + prom_obj[engine_idx].observe(list_item) + for counter_obj, counter_item_key in zip( + [ + self.counter_nixl_num_failed_transfers, + self.counter_nixl_num_failed_notifications, + ], + ["num_failed_transfers", "num_failed_notifications"], + ): + for list_item in transfer_stats_data[counter_item_key]: + counter_obj[engine_idx].inc(list_item) diff --git a/vllm/v1/metrics/loggers.py b/vllm/v1/metrics/loggers.py index 99e0b68be50a..7682c3401b39 100644 --- a/vllm/v1/metrics/loggers.py +++ b/vllm/v1/metrics/loggers.py @@ -10,7 +10,10 @@ from prometheus_client import Counter, Gauge, Histogram from vllm.config import SupportsMetricsInfo, VllmConfig -from vllm.distributed.kv_transfer.kv_connector.v1.metrics import KVConnectorLogging +from vllm.distributed.kv_transfer.kv_connector.v1.metrics import ( + KVConnectorLogging, + KVConnectorPrometheus, +) from vllm.logger import init_logger from vllm.v1.engine import FinishReason from vllm.v1.metrics.prometheus import unregister_vllm_metrics @@ -308,6 +311,7 @@ class PrometheusStatLogger(AggregateStatLoggerBase): _counter_cls = Counter _histogram_cls = Histogram _spec_decoding_cls = SpecDecodingProm + _kv_connector_cls = KVConnectorPrometheus def __init__( self, vllm_config: VllmConfig, engine_indexes: list[int] | None = None @@ -327,12 +331,15 @@ def __init__( model_name = vllm_config.model_config.served_model_name max_model_len = vllm_config.model_config.max_model_len - spec_decode_labelvalues: dict[int, list[str]] = { + per_engine_labelvalues: dict[int, list[str]] = { idx: [model_name, str(idx)] for idx in engine_indexes } self.spec_decoding_prom = self._spec_decoding_cls( - vllm_config.speculative_config, labelnames, spec_decode_labelvalues + vllm_config.speculative_config, labelnames, per_engine_labelvalues + ) + self.kv_connector_prom = self._kv_connector_cls( + vllm_config.kv_transfer_config, labelnames, per_engine_labelvalues ) # @@ -804,104 +811,6 @@ def __init__( ], ) - # - # KVConnector metrics - # - self._nixl_metrics_enabled = False - if ( - kv_transfer_config := vllm_config.kv_transfer_config - ) and kv_transfer_config.kv_connector == "NixlConnector": - self._nixl_metrics_enabled = True - buckets = [ - 0.001, - 0.005, - 0.01, - 0.025, - 0.05, - 0.075, - 0.1, - 0.2, - 0.3, - 0.5, - 0.75, - 1.0, - 5.0, - ] - nixl_histogram_xfer_time = self._histogram_cls( - name="vllm:nixl_xfer_time_seconds", - documentation="Histogram of transfer duration for NIXL KV" - " Cache transfers.", - buckets=buckets, - labelnames=labelnames, - ) - self.nixl_histogram_xfer_time = make_per_engine( - nixl_histogram_xfer_time, engine_indexes, model_name - ) - nixl_histogram_post_time = self._histogram_cls( - name="vllm:nixl_post_time_seconds", - documentation="Histogram of transfer post time for NIXL KV" - " Cache transfers.", - buckets=buckets[1:], - labelnames=labelnames, - ) - self.nixl_histogram_post_time = make_per_engine( - nixl_histogram_post_time, engine_indexes, model_name - ) - # uniform 2kb to 16gb range - buckets = [2**10 + i for i in range(1, 24, 2)] - nixl_histogram_bytes_transferred = self._histogram_cls( - name="vllm:nixl_bytes_transferred", - documentation="Histogram of bytes transferred per NIXL KV" - " Cache transfers.", - buckets=buckets, - labelnames=labelnames, - ) - self.nixl_histogram_bytes_transferred = make_per_engine( - nixl_histogram_bytes_transferred, engine_indexes, model_name - ) - buckets = [ - 10, - 20, - 30, - 50, - 75, - 100, - 200, - 400, - 1000, - 2000, - 4000, - 10000, - 20000, - 50000, - ] - nixl_histogram_num_descriptors = self._histogram_cls( - name="vllm:nixl_num_descriptors", - documentation="Histogram of number of descriptors per NIXL" - " KV Cache transfers.", - buckets=buckets, - labelnames=labelnames, - ) - self.nixl_histogram_num_descriptors = make_per_engine( - nixl_histogram_num_descriptors, engine_indexes, model_name - ) - counter_nixl_num_failed_transfers = self._counter_cls( - name="vllm:nixl_num_failed_transfers", - documentation="Number of failed NIXL KV Cache transfers.", - labelnames=labelnames, - ) - self.counter_nixl_num_failed_transfers = make_per_engine( - counter_nixl_num_failed_transfers, engine_indexes, model_name - ) - counter_nixl_num_failed_notifications = self._counter_cls( - name="vllm:nixl_num_failed_notifications", - documentation="Number of failed NIXL KV Cache notifications.", - labelnames=labelnames, - ) - self.counter_nixl_num_failed_notifications = make_per_engine( - counter_nixl_num_failed_notifications, engine_indexes, model_name - ) - def log_metrics_info(self, type: str, config_obj: SupportsMetricsInfo): metrics_info = config_obj.metrics_info() metrics_info["engine"] = "" @@ -967,35 +876,11 @@ def record( self.spec_decoding_prom.observe( scheduler_stats.spec_decoding_stats, engine_idx ) - # TODO factor this out into OOT metrics class - if self._nixl_metrics_enabled and ( - kv_stats := scheduler_stats.kv_connector_stats - ): - for prom_obj, list_item_key in zip( - [ - self.nixl_histogram_xfer_time, - self.nixl_histogram_post_time, - self.nixl_histogram_bytes_transferred, - self.nixl_histogram_num_descriptors, - ], - [ - "transfer_duration", - "post_duration", - "bytes_transferred", - "num_descriptors", - ], - ): - for list_item in kv_stats[list_item_key]: - prom_obj[engine_idx].observe(list_item) - for counter_obj, counter_item_key in zip( - [ - self.counter_nixl_num_failed_transfers, - self.counter_nixl_num_failed_notifications, - ], - ["num_failed_transfers", "num_failed_notifications"], - ): - for list_item in kv_stats[counter_item_key]: - counter_obj[engine_idx].inc(list_item) + + if scheduler_stats.kv_connector_stats is not None: + self.kv_connector_prom.observe( + scheduler_stats.kv_connector_stats, engine_idx + ) if mm_cache_stats is not None: self.counter_mm_cache_queries[engine_idx].inc(mm_cache_stats.queries) diff --git a/vllm/v1/metrics/ray_wrappers.py b/vllm/v1/metrics/ray_wrappers.py index b845852a0c0d..a319ffb1d257 100644 --- a/vllm/v1/metrics/ray_wrappers.py +++ b/vllm/v1/metrics/ray_wrappers.py @@ -2,6 +2,7 @@ # SPDX-FileCopyrightText: Copyright contributors to the vLLM project import time +from vllm.distributed.kv_transfer.kv_connector.v1.metrics import KVConnectorPrometheus from vllm.v1.metrics.loggers import PrometheusStatLogger from vllm.v1.spec_decode.metrics import SpecDecodingProm @@ -141,6 +142,18 @@ class RaySpecDecodingProm(SpecDecodingProm): _counter_cls = RayCounterWrapper +class RayKVConnectorPrometheus(KVConnectorPrometheus): + """ + RayKVConnectorPrometheus is used by RayMetrics to log Ray + metrics. Provides the same metrics as KV connectors but + uses Ray's util.metrics library. + """ + + _gauge_cls = RayGaugeWrapper + _counter_cls = RayCounterWrapper + _histogram_cls = RayHistogramWrapper + + class RayPrometheusStatLogger(PrometheusStatLogger): """RayPrometheusStatLogger uses Ray metrics instead.""" @@ -148,6 +161,7 @@ class RayPrometheusStatLogger(PrometheusStatLogger): _counter_cls = RayCounterWrapper _histogram_cls = RayHistogramWrapper _spec_decoding_cls = RaySpecDecodingProm + _kv_connector_cls = RayKVConnectorPrometheus @staticmethod def _unregister_vllm_metrics(): From 35f8ba1ed416ab154d387c5e73dd439360acadab Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Fri, 24 Oct 2025 10:28:43 -0400 Subject: [PATCH 3/4] [NIXL][Metrics] Fix NIXL buckets It's post times that need the smaller bucket size, not transfer duration. Uniform 2kb to 16gb range: ``` >>> def human_size(bytes, units=[' bytes','KB','MB','GB','TB', 'PB', 'EB']): ... """ Returns a human readable string representation of bytes """ ... return str(bytes) + units[0] if bytes < 1024 else human_size(bytes>>10, units[1:]) ... >>> [human_size(2**(10+i)) for i in range(1, 25, 2)] ['2KB', '8KB', '32KB', '128KB', '512KB', '2MB', '8MB', '32MB', '128MB', '512MB', '2GB', '8GB'] ``` Signed-off-by: Mark McLoughlin --- .../kv_transfer/kv_connector/v1/nixl_connector.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 1774834be8c5..5799b9ca53ec 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -1787,7 +1787,7 @@ def __init__( nixl_histogram_xfer_time = self._histogram_cls( name="vllm:nixl_xfer_time_seconds", documentation="Histogram of transfer duration for NIXL KV Cache transfers.", - buckets=buckets, + buckets=buckets[1:], labelnames=labelnames, ) self.nixl_histogram_xfer_time = self.make_per_engine(nixl_histogram_xfer_time) @@ -1795,12 +1795,12 @@ def __init__( name="vllm:nixl_post_time_seconds", documentation="Histogram of transfer post time for NIXL KV" " Cache transfers.", - buckets=buckets[1:], + buckets=buckets, labelnames=labelnames, ) self.nixl_histogram_post_time = self.make_per_engine(nixl_histogram_post_time) # uniform 2kb to 16gb range - buckets = [2**10 + i for i in range(1, 24, 2)] + buckets = [2 ** (10 + i) for i in range(1, 25, 2)] nixl_histogram_bytes_transferred = self._histogram_cls( name="vllm:nixl_bytes_transferred", documentation="Histogram of bytes transferred per NIXL KV Cache transfers.", From 3416c80394a6b5079bebf366a118be2659f9154f Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Fri, 24 Oct 2025 09:42:22 -0400 Subject: [PATCH 4/4] [KV Connector][Metrics] Add prometheus metrics support to multi-connector Signed-off-by: Mark McLoughlin --- .../kv_transfer/kv_connector/v1/base.py | 1 + .../kv_transfer/kv_connector/v1/metrics.py | 8 +- .../kv_connector/v1/multi_connector.py | 110 ++++++++++++++---- .../kv_connector/v1/nixl_connector.py | 8 +- vllm/v1/metrics/loggers.py | 2 +- 5 files changed, 102 insertions(+), 27 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/base.py b/vllm/distributed/kv_transfer/kv_connector/v1/base.py index e32c9076630e..e5a54e5360c9 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/base.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/base.py @@ -440,6 +440,7 @@ def build_kv_connector_stats( @classmethod def build_prom_metrics( cls, + vllm_config: "VllmConfig", metric_types: dict[type["PromMetric"], type["PromMetricT"]], labelnames: list[str], per_engine_labelvalues: dict[int, list[str]], diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/metrics.py b/vllm/distributed/kv_transfer/kv_connector/v1/metrics.py index 5cb739c21a69..d6ea4f1ab4cf 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/metrics.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/metrics.py @@ -5,7 +5,7 @@ from prometheus_client import Counter, Gauge, Histogram -from vllm.config.kv_transfer import KVTransferConfig +from vllm.config import KVTransferConfig, VllmConfig from vllm.distributed.kv_transfer.kv_connector.factory import KVConnectorFactory from vllm.distributed.kv_transfer.kv_transfer_state import has_kv_transfer_group from vllm.logger import init_logger @@ -117,10 +117,12 @@ class KVConnectorPromMetrics: def __init__( self, + vllm_config: VllmConfig, metric_types: dict[type[PromMetric], type[PromMetricT]], labelnames: list[str], per_engine_labelvalues: dict[int, list[str]], ): + self._kv_transfer_config = vllm_config.kv_transfer_config self._gauge_cls = metric_types[Gauge] self._counter_cls = metric_types[Counter] self._histogram_cls = metric_types[Histogram] @@ -161,11 +163,12 @@ class KVConnectorPrometheus: def __init__( self, - kv_transfer_config: KVTransferConfig | None, + vllm_config: VllmConfig, labelnames: list[str], per_engine_labelvalues: dict[int, list[str]], ): self.prom_metrics: KVConnectorPromMetrics | None = None + kv_transfer_config = vllm_config.kv_transfer_config if kv_transfer_config and kv_transfer_config.kv_connector: connector_cls = KVConnectorFactory.get_connector_class(kv_transfer_config) metric_types = { @@ -174,6 +177,7 @@ def __init__( Histogram: self._histogram_cls, } self.prom_metrics = connector_cls.build_prom_metrics( + vllm_config, metric_types, labelnames, per_engine_labelvalues, diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/multi_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/multi_connector.py index 845ce320837d..42a66993a2d2 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/multi_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/multi_connector.py @@ -9,13 +9,19 @@ from vllm.config import VllmConfig from vllm.config.kv_transfer import KVTransferConfig +from vllm.distributed.kv_transfer.kv_connector.base import KVConnectorBaseType from vllm.distributed.kv_transfer.kv_connector.factory import KVConnectorFactory from vllm.distributed.kv_transfer.kv_connector.v1.base import ( KVConnectorBase_V1, KVConnectorMetadata, KVConnectorRole, ) -from vllm.distributed.kv_transfer.kv_connector.v1.metrics import KVConnectorStats +from vllm.distributed.kv_transfer.kv_connector.v1.metrics import ( + KVConnectorPromMetrics, + KVConnectorStats, + PromMetric, + PromMetricT, +) from vllm.logger import init_logger from vllm.v1.core.sched.output import SchedulerOutput from vllm.v1.outputs import KVConnectorOutput @@ -72,6 +78,27 @@ def __setitem__(self, connector_id: str, stats: KVConnectorStats): self.data[connector_id] = stats +class MultiKVConnectorPromMetrics(KVConnectorPromMetrics): + def __init__( + self, + vllm_config: "VllmConfig", + metric_types: dict[type[PromMetric], type[PromMetricT]], + labelnames: list[str], + per_engine_labelvalues: dict[int, list[str]], + prom_metrics: dict[str, KVConnectorPromMetrics], + ): + super().__init__(vllm_config, metric_types, labelnames, per_engine_labelvalues) + self._prom_metrics = prom_metrics + + def observe(self, transfer_stats_data: dict[str, Any], engine_idx: int = 0): + for connector_id, stats_data in transfer_stats_data.items(): + assert connector_id in self._prom_metrics, ( + f"{connector_id} is not contained in the list of registered connectors " + f"with Prometheus metrics support: {self._prom_metrics.keys()}" + ) + self._prom_metrics[connector_id].observe(stats_data["data"], engine_idx) + + class MultiConnector(KVConnectorBase_V1): """ A wrapper for using multiple KVConnectors at the same time. @@ -84,19 +111,13 @@ class MultiConnector(KVConnectorBase_V1): def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole): super().__init__(vllm_config=vllm_config, role=role) + self._connectors: list[KVConnectorBase_V1] = [] self._ktc_kv_transfer_config = [] - ktcs = self._kv_transfer_config.kv_connector_extra_config.get("connectors") - assert ktcs is not None - for ktc in ktcs: - temp_config = copy.copy(vllm_config) - engine_id = ktc.get("engine_id", self._kv_transfer_config.engine_id) - temp_config.kv_transfer_config = KVTransferConfig( - **ktc, engine_id=engine_id - ) - self._connectors.append( - KVConnectorFactory.create_connector(temp_config, role) - ) + for connector_cls, temp_config in self._get_connector_classes_and_configs( + vllm_config + ): + self._connectors.append(connector_cls(temp_config, role)) self._ktc_kv_transfer_config.append(temp_config.kv_transfer_config) # A mapping from request id to the index of the connector chosen to @@ -109,6 +130,32 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole): # Propagated from scheduler to worker side via the connector metadata. self._extra_async_saves: dict[str, int] = {} + @classmethod + def _get_connector_classes_and_configs( + cls, vllm_config: "VllmConfig" + ) -> list[tuple[type[KVConnectorBaseType], "VllmConfig"]]: + assert vllm_config.kv_transfer_config is not None + ktcs = vllm_config.kv_transfer_config.kv_connector_extra_config.get( + "connectors" + ) + assert ktcs is not None + ret: list[tuple[type[KVConnectorBaseType], VllmConfig]] = [] + for ktc in ktcs: + temp_config = copy.copy(vllm_config) + engine_id = ktc.get("engine_id", vllm_config.kv_transfer_config.engine_id) + temp_config.kv_transfer_config = KVTransferConfig( + **ktc, engine_id=engine_id + ) + ret.append( + ( + KVConnectorFactory.get_connector_class( + temp_config.kv_transfer_config + ), + temp_config, + ) + ) + return ret + def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]): for c in self._connectors: c.register_kv_caches(kv_caches) @@ -295,18 +342,12 @@ def get_required_kvcache_layout(cls, vllm_config: "VllmConfig") -> str | None: None if the connector does not require a specific layout. """ assert vllm_config.kv_transfer_config is not None - ktcs = vllm_config.kv_transfer_config.kv_connector_extra_config.get( - "connectors" - ) - assert ktcs is not None layouts: set[str] = set() - temp_vllm_config = copy.copy(vllm_config) - for ktc in ktcs: - kv_transfer_config = KVTransferConfig(**ktc) - temp_vllm_config.kv_transfer_config = kv_transfer_config - connector_cls = KVConnectorFactory.get_connector_class(kv_transfer_config) + for connector_cls, temp_config in cls._get_connector_classes_and_configs( + vllm_config + ): required_kvcache_layout = connector_cls.get_required_kvcache_layout( - temp_vllm_config + temp_config ) if required_kvcache_layout is not None: layouts.add(required_kvcache_layout) @@ -342,3 +383,28 @@ def get_kv_connector_stats(self) -> MultiKVConnectorStats | None: stats_by_connector = MultiKVConnectorStats() stats_by_connector[c.__class__.__name__] = stats return stats_by_connector + + @classmethod + def build_prom_metrics( + cls, + vllm_config: "VllmConfig", + metric_types: dict[type["PromMetric"], type["PromMetricT"]], + labelnames: list[str], + per_engine_labelvalues: dict[int, list[str]], + ) -> KVConnectorPromMetrics: + prom_metrics: dict[str, KVConnectorPromMetrics] = {} + for connector_cls, temp_config in cls._get_connector_classes_and_configs( + vllm_config + ): + connector_prom = connector_cls.build_prom_metrics( + temp_config, metric_types, labelnames, per_engine_labelvalues + ) + if connector_prom is not None: + prom_metrics[connector_cls.__name__] = connector_prom + return MultiKVConnectorPromMetrics( + vllm_config, + metric_types, + labelnames, + per_engine_labelvalues, + prom_metrics, + ) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 1774834be8c5..8fbab69a894b 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -262,11 +262,14 @@ def build_kv_connector_stats( @classmethod def build_prom_metrics( cls, + vllm_config: VllmConfig, metric_types: dict[type[PromMetric], type[PromMetricT]], labelnames: list[str], per_engine_labelvalues: dict[int, list[str]], ) -> KVConnectorPromMetrics: - return NixlPromMetrics(metric_types, labelnames, per_engine_labelvalues) + return NixlPromMetrics( + vllm_config, metric_types, labelnames, per_engine_labelvalues + ) def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None: assert self.connector_worker is not None @@ -1763,11 +1766,12 @@ def num_successful_transfers(self) -> int: class NixlPromMetrics(KVConnectorPromMetrics): def __init__( self, + vllm_config: VllmConfig, metric_types: dict[type[PromMetric], type[PromMetricT]], labelnames: list[str], per_engine_labelvalues: dict[int, list[str]], ): - super().__init__(metric_types, labelnames, per_engine_labelvalues) + super().__init__(vllm_config, metric_types, labelnames, per_engine_labelvalues) buckets = [ 0.001, diff --git a/vllm/v1/metrics/loggers.py b/vllm/v1/metrics/loggers.py index 7682c3401b39..82c08f68a082 100644 --- a/vllm/v1/metrics/loggers.py +++ b/vllm/v1/metrics/loggers.py @@ -339,7 +339,7 @@ def __init__( vllm_config.speculative_config, labelnames, per_engine_labelvalues ) self.kv_connector_prom = self._kv_connector_cls( - vllm_config.kv_transfer_config, labelnames, per_engine_labelvalues + vllm_config, labelnames, per_engine_labelvalues ) #