Expose KvStats as Prometheus gauges from WorkerMetricsPublisher.

Summary of the change set:
  * kv.rs (Python bindings): register the gauges before creating the endpoint.
  * publisher.rs: add KvStatsPrometheusGauges, update the gauges on publish(),
    and gate the NATS-backed tests behind the `integration` feature.
  * scheduler.rs: route test debug output through tracing instead of println!.
  * distributed.rs / lib.rs: export distributed_test_utils for downstream tests.
  * prometheus_names.rs: add the kvstats_* metric name constants.

Review amendments relative to the submitted patch:
  * register_prometheus_metrics() propagates gauge-creation errors with `?`
    instead of panicking via expect() inside OnceLock::get_or_init.
  * The publish() comment no longer refers to an Arc; the cell holds the
    gauges directly.
  * test_kvstats_prometheus_gauge_updates matches values on the sample line
    of each metric rather than searching the whole output for "42", drops
    the println! output, and shuts the runtime down like its sibling test.
  * Generic parameters lost in extraction (for example
    OnceLock<KvStatsPrometheusGauges>) are restored.

The `index` lines are carried over from the submitted patch.

diff --git a/lib/bindings/python/rust/llm/kv.rs b/lib/bindings/python/rust/llm/kv.rs
index 8b503b2dd7..fdd235a7b9 100644
--- a/lib/bindings/python/rust/llm/kv.rs
+++ b/lib/bindings/python/rust/llm/kv.rs
@@ -85,6 +85,11 @@ impl WorkerMetricsPublisher {
                 None
             };
 
+            // Register Prometheus metrics first
+            rs_publisher
+                .register_prometheus_metrics(&rs_component)
+                .map_err(to_pyerr)?;
+
             rs_publisher
                 .create_endpoint(rs_component, metrics_labels_ref.as_deref())
                 .await
diff --git a/lib/llm/Cargo.toml b/lib/llm/Cargo.toml
index 4b5bad5a49..3a640ae7f2 100644
--- a/lib/llm/Cargo.toml
+++ b/lib/llm/Cargo.toml
@@ -35,7 +35,7 @@ testing-nixl = ["dep:nixl-sys"]
 testing-etcd = []
 block-manager = ["dep:nixl-sys", "dep:cudarc", "dep:ndarray", "dep:nix"]
 cuda = ["dep:cudarc"]
-integration = []
+integration = ["dynamo-runtime/integration"]
 
 # NOTE: This feature will be enabled once ModelExpress packages are published
 # model-express = ["dep:model_express_client", "dep:model_express_common"]
diff --git a/lib/llm/src/kv_router/publisher.rs b/lib/llm/src/kv_router/publisher.rs
index 073372f7a6..3a3f165ce7 100644
--- a/lib/llm/src/kv_router/publisher.rs
+++ b/lib/llm/src/kv_router/publisher.rs
@@ -20,6 +20,7 @@ use crate::kv_router::{
     scoring::LoadEvent,
 };
 use async_trait::async_trait;
+use dynamo_runtime::metrics::{MetricsRegistry, prometheus_names::kvstats};
 use dynamo_runtime::traits::{DistributedRuntimeProvider, events::EventPublisher};
 use dynamo_runtime::{
     Error, Result,
@@ -31,7 +32,7 @@ use dynamo_runtime::{
     protocols::annotated::Annotated,
 };
 use futures::stream;
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
 use tokio::sync::mpsc;
 use tokio_util::sync::CancellationToken;
 
@@ -482,12 +483,77 @@ enum RawKvEvent {
 pub struct WorkerMetricsPublisher {
     tx: tokio::sync::watch::Sender<Arc<ForwardPassMetrics>>,
     rx: tokio::sync::watch::Receiver<Arc<ForwardPassMetrics>>,
+    /// Prometheus gauges for KvStats metrics.
+    /// Empty until register_prometheus_metrics succeeds; publish() skips the
+    /// gauge update while the cell is empty and only reads it afterwards.
+    prometheus_gauges: OnceLock<KvStatsPrometheusGauges>,
+}
+
+struct KvStatsPrometheusGauges {
+    kv_active_blocks_gauge: prometheus::Gauge,
+    kv_total_blocks_gauge: prometheus::Gauge,
+    gpu_cache_usage_gauge: prometheus::Gauge,
+    gpu_prefix_cache_hit_rate_gauge: prometheus::Gauge,
+}
+
+impl KvStatsPrometheusGauges {
+    /// Create the four KvStats gauges through the component; fails on the first gauge error
+    fn new(component: &Component) -> Result<Self> {
+        let kv_active_blocks_gauge = component.create_gauge(
+            kvstats::ACTIVE_BLOCKS,
+            "Number of active KV cache blocks currently in use",
+            &[],
+        )?;
+
+        let kv_total_blocks_gauge = component.create_gauge(
+            kvstats::TOTAL_BLOCKS,
+            "Total number of KV cache blocks available",
+            &[],
+        )?;
+
+        let gpu_cache_usage_gauge = component.create_gauge(
+            kvstats::GPU_CACHE_USAGE_PERCENT,
+            "GPU cache usage as a percentage (0.0-1.0)",
+            &[],
+        )?;
+
+        let gpu_prefix_cache_hit_rate_gauge = component.create_gauge(
+            kvstats::GPU_PREFIX_CACHE_HIT_RATE,
+            "GPU prefix cache hit rate as a percentage (0.0-1.0)",
+            &[],
+        )?;
+
+        tracing::info!("Registered KvStats Prometheus metrics");
+
+        Ok(KvStatsPrometheusGauges {
+            kv_active_blocks_gauge,
+            kv_total_blocks_gauge,
+            gpu_cache_usage_gauge,
+            gpu_prefix_cache_hit_rate_gauge,
+        })
+    }
+
+    /// Copy every KvStats field into its matching gauge
+    fn update_from_kvstats(&self, kv_stats: &KvStats) {
+        self.kv_active_blocks_gauge
+            .set(kv_stats.kv_active_blocks as f64);
+        self.kv_total_blocks_gauge
+            .set(kv_stats.kv_total_blocks as f64);
+        self.gpu_cache_usage_gauge
+            .set(kv_stats.gpu_cache_usage_perc as f64);
+        self.gpu_prefix_cache_hit_rate_gauge
+            .set(kv_stats.gpu_prefix_cache_hit_rate as f64);
+    }
 }
 
 impl WorkerMetricsPublisher {
     pub fn new() -> Result<Self> {
         let (tx, rx) = tokio::sync::watch::channel(Arc::new(ForwardPassMetrics::default()));
-        Ok(WorkerMetricsPublisher { tx, rx })
+        Ok(WorkerMetricsPublisher {
+            tx,
+            rx,
+            prometheus_gauges: OnceLock::new(),
+        })
     }
 
     pub fn publish(
@@ -495,9 +561,27 @@ impl WorkerMetricsPublisher {
         metrics: Arc<ForwardPassMetrics>,
     ) -> Result<(), tokio::sync::watch::error::SendError<Arc<ForwardPassMetrics>>> {
         tracing::trace!("Publish metrics: {metrics:?}");
+
+        // Mirror KvStats into the Prometheus gauges when they have been registered.
+        // Until register_prometheus_metrics has run, the cell is empty and this is skipped.
+        if let Some(gauges) = self.prometheus_gauges.get() {
+            gauges.update_from_kvstats(&metrics.kv_stats);
+        }
+
         self.tx.send(metrics)
     }
 
+    /// Register KvStats Prometheus metrics with the component's registry.
+    /// Idempotent; gauge creation errors are returned instead of panicking.
+    pub fn register_prometheus_metrics(&self, component: &Component) -> Result<()> {
+        if self.prometheus_gauges.get().is_none() {
+            let gauges = KvStatsPrometheusGauges::new(component)?;
+            // If a concurrent caller filled the cell first, keep its gauges.
+            let _ = self.prometheus_gauges.set(gauges);
+        }
+        Ok(())
+    }
+
     pub async fn create_endpoint(
         &self,
         component: Component,
@@ -981,21 +1065,20 @@ mod test_exponential_backoff {
     }
 }
 
-#[cfg(test)]
-mod test_worker_metrics_publisher {
+#[cfg(all(test, feature = "integration"))]
+mod test_integration_publisher {
     use super::*;
     use crate::kv_router::protocols::{ForwardPassMetrics, KvStats, WorkerStats};
-    use dynamo_runtime::traits::events::EventSubscriber; // Add this import
-    use dynamo_runtime::{DistributedRuntime, Runtime};
+    use dynamo_runtime::distributed_test_utils::create_test_drt_async;
+    use dynamo_runtime::traits::events::EventSubscriber;
     use futures::StreamExt;
 
     #[tokio::test]
-    #[ignore] // Mark as ignored as requested
+    #[ignore] // Mark as ignored as requested, because CI's integrations still don't have NATS
     async fn test_metrics_publishing_behavior() -> Result<()> {
         // Set up runtime and namespace
-        let rt = Runtime::from_current().unwrap();
-        let drt = DistributedRuntime::from_settings(rt.clone()).await?;
-        let namespace = drt.namespace("test".to_string())?;
+        let drt = create_test_drt_async().await;
+        let namespace = drt.namespace("ns2001".to_string())?;
 
         // Create a subscriber for the metrics events using subscribe_with_type
         let mut subscriber = namespace
@@ -1088,8 +1171,90 @@ mod test_worker_metrics_publisher {
             "Expected no messages when load metrics don't change"
         );
 
-        rt.shutdown();
+        drt.shutdown();
 
         Ok(())
     }
+
+    #[tokio::test]
+    #[ignore] // Mark as ignored as requested, because CI's integrations still don't have NATS
+    async fn test_kvstats_prometheus_gauge_updates() {
+        use crate::kv_router::publisher::kvstats;
+        use dynamo_runtime::metrics::MetricsRegistry;
+
+        // Test that publish() updates Prometheus gauges correctly using real Component
+        let publisher = WorkerMetricsPublisher::new().unwrap();
+
+        // Create a real DRT and component for integration testing
+        let drt = create_test_drt_async().await;
+        let namespace = drt.namespace("ns2002".to_string()).unwrap();
+        let component = namespace.component("comp2002".to_string()).unwrap();
+
+        // Register Prometheus metrics using the real constructor
+        publisher.register_prometheus_metrics(&component).unwrap();
+
+        // Get references to the gauges for testing
+        let gauges = publisher.prometheus_gauges.get().unwrap();
+        let active_blocks_gauge = gauges.kv_active_blocks_gauge.clone();
+        let total_blocks_gauge = gauges.kv_total_blocks_gauge.clone();
+        let cache_usage_gauge = gauges.gpu_cache_usage_gauge.clone();
+        let hit_rate_gauge = gauges.gpu_prefix_cache_hit_rate_gauge.clone();
+
+        // Create test metrics with specific values
+        let test_metrics = Arc::new(ForwardPassMetrics {
+            worker_stats: WorkerStats {
+                data_parallel_rank: None,
+                request_active_slots: 5,
+                request_total_slots: 100,
+                num_requests_waiting: 2,
+            },
+            kv_stats: KvStats {
+                kv_active_blocks: 42,
+                kv_total_blocks: 12894,
+                gpu_cache_usage_perc: 0.5,
+                gpu_prefix_cache_hit_rate: 0.75,
+            },
+            spec_decode_stats: None,
+        });
+
+        // Test 1: Initial gauge values should be 0
+        assert_eq!(active_blocks_gauge.get(), 0.0);
+        assert_eq!(total_blocks_gauge.get(), 0.0);
+        assert_eq!(cache_usage_gauge.get(), 0.0);
+        assert_eq!(hit_rate_gauge.get(), 0.0);
+
+        // Test 2: publish() should update all gauges with correct values
+        let result = publisher.publish(test_metrics);
+        assert!(result.is_ok());
+
+        // Test 3: Verify gauges were updated correctly
+        assert_eq!(active_blocks_gauge.get(), 42.0);
+        assert_eq!(total_blocks_gauge.get(), 12894.0);
+        assert_eq!(cache_usage_gauge.get(), 0.5);
+        assert_eq!(hit_rate_gauge.get(), 0.75);
+
+        // Test 4: Verify metrics are properly registered in the component's registry
+        // Component implements MetricsRegistry trait which provides prometheus_metrics_fmt()
+        let prometheus_output = component.prometheus_metrics_fmt().unwrap();
+
+        // Verify metric names are present
+        assert!(prometheus_output.contains(kvstats::ACTIVE_BLOCKS));
+        assert!(prometheus_output.contains(kvstats::TOTAL_BLOCKS));
+        assert!(prometheus_output.contains(kvstats::GPU_CACHE_USAGE_PERCENT));
+        assert!(prometheus_output.contains(kvstats::GPU_PREFIX_CACHE_HIT_RATE));
+
+        // Test 5: Verify the exported samples carry the published values.
+        // Sample lines include labels, so match on metric name plus trailing value.
+        let has_sample = |name: &str, value: &str| {
+            prometheus_output
+                .lines()
+                .any(|line| !line.starts_with('#') && line.contains(name) && line.ends_with(value))
+        };
+        assert!(has_sample(kvstats::ACTIVE_BLOCKS, " 42"));
+        assert!(has_sample(kvstats::TOTAL_BLOCKS, " 12894"));
+        assert!(has_sample(kvstats::GPU_CACHE_USAGE_PERCENT, " 0.5"));
+        assert!(has_sample(kvstats::GPU_PREFIX_CACHE_HIT_RATE, " 0.75"));
+
+        drt.shutdown();
+    }
 }
diff --git a/lib/llm/src/mocker/scheduler.rs b/lib/llm/src/mocker/scheduler.rs
index 72bab3366f..a9094a4272 100644
--- a/lib/llm/src/mocker/scheduler.rs
+++ b/lib/llm/src/mocker/scheduler.rs
@@ -787,7 +787,7 @@ mod tests {
                 // Manual debug ticker that prints forward pass metrics
                 _ = debug_interval.tick() => {
                     let _metrics = metrics_rx.borrow().clone();
-                    println!("Forward Pass Metrics: {_metrics:#?}");
+                    tracing::debug!("Forward Pass Metrics: {_metrics:#?}");
                 }
 
                 Some(_) = output_rx.recv() => {
@@ -891,7 +891,7 @@ mod tests {
                 // Manual debug ticker that prints forward pass metrics
                 _ = debug_interval.tick() => {
                     let _metrics = metrics_rx.borrow().clone();
-                    println!("Forward Pass Metrics: {_metrics:#?}");
+                    tracing::debug!("Forward Pass Metrics: {_metrics:#?}");
                 }
 
                 Some(_signal) = output_rx.recv() => {
diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs
index 27e9c49a1f..d617eda380 100644
--- a/lib/runtime/src/distributed.rs
+++ b/lib/runtime/src/distributed.rs
@@ -345,12 +345,11 @@ impl DistributedConfig {
     }
 }
 
-#[cfg(test)]
 pub mod distributed_test_utils {
     //! Common test helper functions for DistributedRuntime tests
     // TODO: Use in-memory DistributedRuntime for tests instead of full runtime when available.
 
-    /// Helper function to create a DRT instance for tests
+    /// Helper function to create a DRT instance for integration-only tests.
     /// Uses from_current to leverage existing tokio runtime
     /// Note: Settings are read from environment variables inside DistributedRuntime::from_settings_without_discovery
     #[cfg(feature = "integration")]
@@ -362,8 +361,7 @@ pub mod distributed_test_utils {
     }
 }
 
-#[cfg(feature = "integration")]
-#[cfg(test)]
+#[cfg(all(test, feature = "integration"))]
 mod tests {
     use super::distributed_test_utils::create_test_drt_async;
 
diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs
index 58b6299a68..096d4d4615 100644
--- a/lib/runtime/src/lib.rs
+++ b/lib/runtime/src/lib.rs
@@ -56,6 +56,7 @@ pub mod utils;
 pub mod worker;
 
 pub mod distributed;
+pub use distributed::distributed_test_utils;
 pub use futures::stream;
 pub use tokio_util::sync::CancellationToken;
 pub use worker::Worker;
diff --git a/lib/runtime/src/metrics/prometheus_names.rs b/lib/runtime/src/metrics/prometheus_names.rs
index ec069523d0..084b65341a 100644
--- a/lib/runtime/src/metrics/prometheus_names.rs
+++ b/lib/runtime/src/metrics/prometheus_names.rs
@@ -246,6 +246,39 @@ pub mod kvbm_connector {
     pub const KVBM_CONNECTOR_WORKER: &str = "kvbm_connector_worker";
 }
 
+/// KvStats metrics from LLM workers
+pub mod kvstats {
+    /// Macro to generate KvStats metric names with the prefix
+    macro_rules! kvstats_name {
+        ($name:expr) => {
+            concat!("kvstats_", $name)
+        };
+    }
+
+    /// Prefix for all KvStats metrics
+    pub const PREFIX: &str = kvstats_name!("");
+
+    /// Number of active KV cache blocks currently in use
+    pub const ACTIVE_BLOCKS: &str = kvstats_name!("active_blocks");
+
+    /// Total number of KV cache blocks available
+    pub const TOTAL_BLOCKS: &str = kvstats_name!("total_blocks");
+
+    /// GPU cache usage as a percentage (0.0-1.0)
+    pub const GPU_CACHE_USAGE_PERCENT: &str = kvstats_name!("gpu_cache_usage_percent");
+
+    /// GPU prefix cache hit rate as a percentage (0.0-1.0)
+    pub const GPU_PREFIX_CACHE_HIT_RATE: &str = kvstats_name!("gpu_prefix_cache_hit_rate");
+}
+
+/// All KvStats Prometheus metric names as an array for iteration/validation
+pub const KVSTATS_METRICS: &[&str] = &[
+    kvstats::ACTIVE_BLOCKS,
+    kvstats::TOTAL_BLOCKS,
+    kvstats::GPU_CACHE_USAGE_PERCENT,
+    kvstats::GPU_PREFIX_CACHE_HIT_RATE,
+];
+
 // Shared regex patterns for Prometheus sanitization
 static METRIC_INVALID_CHARS_PATTERN: Lazy<Regex> =
     Lazy::new(|| Regex::new(r"[^a-zA-Z0-9_:]").unwrap());