From b23db3ca1f3ab48eb39f73fe9cc38d04f872c051 Mon Sep 17 00:00:00 2001 From: Keiven Chang Date: Tue, 26 Aug 2025 02:44:13 +0000 Subject: [PATCH 01/14] feat: add Prometheus metrics integration for KvStats - Add KvStatsPrometheusGauges struct to manage Prometheus gauges for KV cache metrics - Integrate gauge updates into WorkerMetricsPublisher.publish() method - Register metrics with Component's MetricsRegistry during initialization - Add kvstats metric name constants in prometheus_names.rs - Use RwLock for performance optimization in hot path - Add integration test for KvStats Prometheus metrics - Move test utilities to common.rs for reusability Signed-off-by: Keiven Chang --- lib/bindings/python/rust/llm/kv.rs | 5 + lib/llm/src/common.rs | 15 ++ lib/llm/src/kv_router/publisher.rs | 197 +++++++++++++++++++- lib/runtime/src/metrics/prometheus_names.rs | 33 ++++ 4 files changed, 249 insertions(+), 1 deletion(-) diff --git a/lib/bindings/python/rust/llm/kv.rs b/lib/bindings/python/rust/llm/kv.rs index 8b503b2dd7..fdd235a7b9 100644 --- a/lib/bindings/python/rust/llm/kv.rs +++ b/lib/bindings/python/rust/llm/kv.rs @@ -85,6 +85,11 @@ impl WorkerMetricsPublisher { None }; + // Register Prometheus metrics first + rs_publisher + .register_prometheus_metrics(&rs_component) + .map_err(to_pyerr)?; + rs_publisher .create_endpoint(rs_component, metrics_labels_ref.as_deref()) .await diff --git a/lib/llm/src/common.rs b/lib/llm/src/common.rs index 2d97064bd1..5296dd283e 100644 --- a/lib/llm/src/common.rs +++ b/lib/llm/src/common.rs @@ -15,3 +15,18 @@ pub mod dtype; pub mod versioned; + +/// Test utilities for integration tests +#[cfg(all(test, feature = "integration"))] +pub mod test_utils { + use dynamo_runtime::{DistributedRuntime, Runtime}; + + /// Creates a test DistributedRuntime for integration testing. + /// Uses from_current to leverage the existing tokio runtime in tests. + pub async fn create_test_drt_async() -> DistributedRuntime { + let runtime = Runtime::from_current().expect("Failed to get current runtime"); + DistributedRuntime::from_settings_without_discovery(runtime) + .await + .expect("Failed to create test DRT") + } +} diff --git a/lib/llm/src/kv_router/publisher.rs b/lib/llm/src/kv_router/publisher.rs index 073372f7a6..1189fccc8e 100644 --- a/lib/llm/src/kv_router/publisher.rs +++ b/lib/llm/src/kv_router/publisher.rs @@ -20,6 +20,7 @@ use crate::kv_router::{ scoring::LoadEvent, }; use async_trait::async_trait; +use dynamo_runtime::metrics::{MetricsRegistry, prometheus_names::kvstats}; use dynamo_runtime::traits::{DistributedRuntimeProvider, events::EventPublisher}; use dynamo_runtime::{ Error, Result, @@ -482,12 +483,83 @@ enum RawKvEvent { pub struct WorkerMetricsPublisher { tx: tokio::sync::watch::Sender>, rx: tokio::sync::watch::Receiver>, + // Prometheus metrics for KvStats (wrapped in Mutex for thread-safe access) + /// Prometheus gauges for KvStats metrics + /// We use Option> for lock-free reads after initialization + /// The Arc is set once during register_prometheus_metrics and then only read + prometheus_gauges: std::sync::RwLock>>, +} + +#[derive(Default)] +struct KvStatsPrometheusGauges { + kv_active_blocks_gauge: Option, + kv_total_blocks_gauge: Option, + gpu_cache_usage_gauge: Option, + gpu_prefix_cache_hit_rate_gauge: Option, +} + +impl KvStatsPrometheusGauges { + /// Create a new KvStatsPrometheusGauges instance with all metrics registered + fn new(component: &Component) -> Result { + let kv_active_blocks_gauge = Some(component.create_gauge( + kvstats::ACTIVE_BLOCKS, + "Number of active KV cache blocks currently in use", + &[], + )?); + + let kv_total_blocks_gauge = Some(component.create_gauge( + kvstats::TOTAL_BLOCKS, + "Total number of KV cache blocks available", + &[], + )?); + + let gpu_cache_usage_gauge = Some(component.create_gauge( + kvstats::GPU_CACHE_USAGE_PERCENT, + "GPU cache usage as a percentage (0.0-1.0)", + &[], + )?); + + let gpu_prefix_cache_hit_rate_gauge = Some(component.create_gauge( + kvstats::GPU_PREFIX_CACHE_HIT_RATE, + "GPU prefix cache hit rate as a percentage (0.0-1.0)", + &[], + )?); + + tracing::info!("Registered KvStats Prometheus metrics"); + + Ok(KvStatsPrometheusGauges { + kv_active_blocks_gauge, + kv_total_blocks_gauge, + gpu_cache_usage_gauge, + gpu_prefix_cache_hit_rate_gauge, + }) + } + + /// Update all gauges with values from KvStats + fn update_from_kvstats(&self, kv_stats: &KvStats) { + if let Some(gauge) = &self.kv_active_blocks_gauge { + gauge.set(kv_stats.kv_active_blocks as f64); + } + if let Some(gauge) = &self.kv_total_blocks_gauge { + gauge.set(kv_stats.kv_total_blocks as f64); + } + if let Some(gauge) = &self.gpu_cache_usage_gauge { + gauge.set(kv_stats.gpu_cache_usage_perc as f64); + } + if let Some(gauge) = &self.gpu_prefix_cache_hit_rate_gauge { + gauge.set(kv_stats.gpu_prefix_cache_hit_rate as f64); + } + } } impl WorkerMetricsPublisher { pub fn new() -> Result { let (tx, rx) = tokio::sync::watch::channel(Arc::new(ForwardPassMetrics::default())); - Ok(WorkerMetricsPublisher { tx, rx }) + Ok(WorkerMetricsPublisher { + tx, + rx, + prometheus_gauges: std::sync::RwLock::new(None), + }) } pub fn publish( @@ -495,9 +567,35 @@ impl WorkerMetricsPublisher { metrics: Arc, ) -> Result<(), tokio::sync::watch::error::SendError>> { tracing::trace!("Publish metrics: {metrics:?}"); + + // Update Prometheus gauges using read lock for better performance + // This is the hot path - we only read the Arc, no writes needed + #[allow(clippy::collapsible_if)] // Necessary to keep guard alive + if let Ok(guard) = self.prometheus_gauges.read() { + if let Some(gauges) = guard.as_ref() { + gauges.update_from_kvstats(&metrics.kv_stats); + } + } + self.tx.send(metrics) } + /// Register KvStats Prometheus metrics with the component's registry + pub fn register_prometheus_metrics(&self, component: &Component) -> Result<()> { + // Create new KvStatsPrometheusGauges with registered metrics + let new_gauges = Arc::new(KvStatsPrometheusGauges::new(component)?); + + // Use write lock only for initialization (happens once) + if let Ok(mut gauges) = self.prometheus_gauges.write() { + *gauges = Some(new_gauges); + Ok(()) + } else { + Err(anyhow::anyhow!( + "Failed to acquire write lock on prometheus_gauges" + )) + } + } + pub async fn create_endpoint( &self, component: Component, @@ -1093,3 +1191,100 @@ mod test_worker_metrics_publisher { Ok(()) } } + +#[cfg(all(test, feature = "integration"))] +mod integration_tests { + use { + super::{ForwardPassMetrics, KvStats, WorkerMetricsPublisher, WorkerStats, kvstats}, + dynamo_runtime::metrics::MetricsRegistry, + std::sync::Arc, + }; + + #[tokio::test] + async fn test_kvstats_prometheus_gauge_updates() { + use crate::common::test_utils::create_test_drt_async; + + // Test that publish() updates Prometheus gauges correctly using real Component + let publisher = WorkerMetricsPublisher::new().unwrap(); + + // Create a real DRT and component for integration testing + let drt = std::sync::Arc::new(create_test_drt_async().await); + let namespace = drt.namespace("test_kvstats").unwrap(); + let component = namespace.component("test_publisher").unwrap(); + + // Register Prometheus metrics using the real constructor + publisher.register_prometheus_metrics(&component).unwrap(); + + // Get references to the gauges for testing + let gauges_guard = publisher.prometheus_gauges.read().unwrap(); + let gauges = gauges_guard.as_ref().unwrap(); + let active_blocks_gauge = gauges.kv_active_blocks_gauge.as_ref().unwrap().clone(); + let total_blocks_gauge = gauges.kv_total_blocks_gauge.as_ref().unwrap().clone(); + let cache_usage_gauge = gauges.gpu_cache_usage_gauge.as_ref().unwrap().clone(); + let hit_rate_gauge = gauges + .gpu_prefix_cache_hit_rate_gauge + .as_ref() + .unwrap() + .clone(); + drop(gauges_guard); // Release the read lock + + // Create test metrics with specific values + let test_metrics = Arc::new(ForwardPassMetrics { + worker_stats: WorkerStats { + data_parallel_rank: None, + request_active_slots: 5, + request_total_slots: 100, + num_requests_waiting: 2, + }, + kv_stats: KvStats { + kv_active_blocks: 42, + kv_total_blocks: 12894, + gpu_cache_usage_perc: 0.5, + gpu_prefix_cache_hit_rate: 0.75, + }, + spec_decode_stats: None, + }); + + // Test 1: Initial gauge values should be 0 + assert_eq!(active_blocks_gauge.get(), 0.0); + assert_eq!(total_blocks_gauge.get(), 0.0); + assert_eq!(cache_usage_gauge.get(), 0.0); + assert_eq!(hit_rate_gauge.get(), 0.0); + + // Test 2: publish() should update all gauges with correct values + let result = publisher.publish(test_metrics); + assert!(result.is_ok()); + + // Test 3: Verify gauges were updated correctly + assert_eq!(active_blocks_gauge.get(), 42.0); + assert_eq!(total_blocks_gauge.get(), 12894.0); + assert_eq!(cache_usage_gauge.get(), 0.5); + assert_eq!(hit_rate_gauge.get(), 0.75); + + // Test 4: Verify metrics are properly registered in the component's registry + // Component implements MetricsRegistry trait which provides prometheus_metrics_fmt() + let prometheus_output = component.prometheus_metrics_fmt().unwrap(); + + // Verify metric names are present + assert!(prometheus_output.contains(kvstats::ACTIVE_BLOCKS)); + assert!(prometheus_output.contains(kvstats::TOTAL_BLOCKS)); + assert!(prometheus_output.contains(kvstats::GPU_CACHE_USAGE_PERCENT)); + assert!(prometheus_output.contains(kvstats::GPU_PREFIX_CACHE_HIT_RATE)); + + // Test 5: Verify the prometheus output contains the actual values + // Print the output to debug format issues + println!("Prometheus output:\n{}", prometheus_output); + + // Check for metric values - the format includes labels so we need to be more flexible + assert!(prometheus_output.contains("kvstats_active_blocks")); + assert!(prometheus_output.contains("42")); // The value should be there + assert!(prometheus_output.contains("kvstats_total_blocks")); + assert!(prometheus_output.contains("12894")); // The value should be there + assert!(prometheus_output.contains("kvstats_gpu_cache_usage_percent")); + assert!(prometheus_output.contains("kvstats_gpu_prefix_cache_hit_rate")); + + println!( + "✅ KvStatsPrometheusGauges constructor and publish() work correctly with real Component" + ); + } +} diff --git a/lib/runtime/src/metrics/prometheus_names.rs b/lib/runtime/src/metrics/prometheus_names.rs index 87bbac924f..d5bcc1feec 100644 --- a/lib/runtime/src/metrics/prometheus_names.rs +++ b/lib/runtime/src/metrics/prometheus_names.rs @@ -141,3 +141,36 @@ pub mod kvbm_connector { /// KVBM connector worker pub const KVBM_CONNECTOR_WORKER: &str = "kvbm_connector_worker"; } + +/// KvStats metrics from LLM workers +pub mod kvstats { + /// Macro to generate KvStats metric names with the prefix + macro_rules! kvstats_name { + ($name:expr) => { + concat!("kvstats_", $name) + }; + } + + /// Prefix for all KvStats metrics + pub const PREFIX: &str = kvstats_name!(""); + + /// Number of active KV cache blocks currently in use + pub const ACTIVE_BLOCKS: &str = kvstats_name!("active_blocks"); + + /// Total number of KV cache blocks available + pub const TOTAL_BLOCKS: &str = kvstats_name!("total_blocks"); + + /// GPU cache usage as a percentage (0.0-1.0) + pub const GPU_CACHE_USAGE_PERCENT: &str = kvstats_name!("gpu_cache_usage_percent"); + + /// GPU prefix cache hit rate as a percentage (0.0-1.0) + pub const GPU_PREFIX_CACHE_HIT_RATE: &str = kvstats_name!("gpu_prefix_cache_hit_rate"); +} + +/// All KvStats Prometheus metric names as an array for iteration/validation +pub const KVSTATS_METRICS: &[&str] = &[ + kvstats::ACTIVE_BLOCKS, + kvstats::TOTAL_BLOCKS, + kvstats::GPU_CACHE_USAGE_PERCENT, + kvstats::GPU_PREFIX_CACHE_HIT_RATE, +]; From 9f1f4299c027ef468d2df1f16fb0ff5f21ee9c0c Mon Sep 17 00:00:00 2001 From: Keiven Chang Date: Tue, 26 Aug 2025 19:26:04 +0000 Subject: [PATCH 02/14] refactor: make register_prometheus_metrics idempotent Signed-off-by: Keiven Chang --- lib/llm/src/kv_router/publisher.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/lib/llm/src/kv_router/publisher.rs b/lib/llm/src/kv_router/publisher.rs index 1189fccc8e..1bed6a9515 100644 --- a/lib/llm/src/kv_router/publisher.rs +++ b/lib/llm/src/kv_router/publisher.rs @@ -582,18 +582,17 @@ impl WorkerMetricsPublisher { /// Register KvStats Prometheus metrics with the component's registry pub fn register_prometheus_metrics(&self, component: &Component) -> Result<()> { - // Create new KvStatsPrometheusGauges with registered metrics - let new_gauges = Arc::new(KvStatsPrometheusGauges::new(component)?); - - // Use write lock only for initialization (happens once) - if let Ok(mut gauges) = self.prometheus_gauges.write() { - *gauges = Some(new_gauges); - Ok(()) - } else { - Err(anyhow::anyhow!( - "Failed to acquire write lock on prometheus_gauges" - )) + let mut gauges = self + .prometheus_gauges + .write() + .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on prometheus_gauges"))?; + + // Only initialize if not already done + if gauges.is_none() { + *gauges = Some(Arc::new(KvStatsPrometheusGauges::new(component)?)); } + + Ok(()) } pub async fn create_endpoint( From 2d93dcb27bca693ad85f31cbad6518446bba5b48 Mon Sep 17 00:00:00 2001 From: Keiven Chang Date: Wed, 27 Aug 2025 00:07:12 +0000 Subject: [PATCH 03/14] add integration feature so it does not run on CI Signed-off-by: Keiven Chang --- lib/llm/src/kv_router/publisher.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/llm/src/kv_router/publisher.rs b/lib/llm/src/kv_router/publisher.rs index 1bed6a9515..97c5abbf39 100644 --- a/lib/llm/src/kv_router/publisher.rs +++ b/lib/llm/src/kv_router/publisher.rs @@ -1199,6 +1199,7 @@ mod integration_tests { std::sync::Arc, }; + #[cfg(feature = "integration")] #[tokio::test] async fn test_kvstats_prometheus_gauge_updates() { use crate::common::test_utils::create_test_drt_async; From 34599ece029dcc82217342cfa657082d16b2e8ae Mon Sep 17 00:00:00 2001 From: Keiven Chang Date: Wed, 27 Aug 2025 01:12:35 +0000 Subject: [PATCH 04/14] refactor: replace println with tracing::debug for metrics output Signed-off-by: Keiven Chang --- lib/llm/src/mocker/scheduler.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/llm/src/mocker/scheduler.rs b/lib/llm/src/mocker/scheduler.rs index 72bab3366f..a9094a4272 100644 --- a/lib/llm/src/mocker/scheduler.rs +++ b/lib/llm/src/mocker/scheduler.rs @@ -787,7 +787,7 @@ mod tests { // Manual debug ticker that prints forward pass metrics _ = debug_interval.tick() => { let _metrics = metrics_rx.borrow().clone(); - println!("Forward Pass Metrics: {_metrics:#?}"); + tracing::debug!("Forward Pass Metrics: {_metrics:#?}"); } Some(_) = output_rx.recv() => { @@ -891,7 +891,7 @@ mod tests { // Manual debug ticker that prints forward pass metrics _ = debug_interval.tick() => { let _metrics = metrics_rx.borrow().clone(); - println!("Forward Pass Metrics: {_metrics:#?}"); + tracing::debug!("Forward Pass Metrics: {_metrics:#?}"); } Some(_signal) = output_rx.recv() => { From 9496bd54c6f8479880a37d6163a4009e32fa0d56 Mon Sep 17 00:00:00 2001 From: Keiven Chang Date: Wed, 27 Aug 2025 15:20:18 +0000 Subject: [PATCH 05/14] ignore test_kvstats_prometheus_gauge_updates Signed-off-by: Keiven Chang --- lib/llm/src/kv_router/publisher.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/llm/src/kv_router/publisher.rs b/lib/llm/src/kv_router/publisher.rs index 97c5abbf39..59a2733a9b 100644 --- a/lib/llm/src/kv_router/publisher.rs +++ b/lib/llm/src/kv_router/publisher.rs @@ -1191,7 +1191,8 @@ mod test_worker_metrics_publisher { } } -#[cfg(all(test, feature = "integration"))] +#[cfg(feature = "integration")] +#[cfg(test)] mod integration_tests { use { super::{ForwardPassMetrics, KvStats, WorkerMetricsPublisher, WorkerStats, kvstats}, @@ -1199,7 +1200,7 @@ mod integration_tests { std::sync::Arc, }; - #[cfg(feature = "integration")] + #[ignore] // Requires NATS server to be running but keeps getting invoked on CI #[tokio::test] async fn test_kvstats_prometheus_gauge_updates() { use crate::common::test_utils::create_test_drt_async; From 28281a746a9ded3fd404609ecbf7d0fae0c20504 Mon Sep 17 00:00:00 2001 From: Keiven Chang Date: Wed, 27 Aug 2025 15:43:46 +0000 Subject: [PATCH 06/14] minor re-org of test groups Signed-off-by: Keiven Chang --- lib/llm/src/kv_router/publisher.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/llm/src/kv_router/publisher.rs b/lib/llm/src/kv_router/publisher.rs index 59a2733a9b..80a3bfcd65 100644 --- a/lib/llm/src/kv_router/publisher.rs +++ b/lib/llm/src/kv_router/publisher.rs @@ -1087,12 +1087,11 @@ mod test_worker_metrics_publisher { use futures::StreamExt; #[tokio::test] - #[ignore] // Mark as ignored as requested async fn test_metrics_publishing_behavior() -> Result<()> { // Set up runtime and namespace let rt = Runtime::from_current().unwrap(); let drt = DistributedRuntime::from_settings(rt.clone()).await?; - let namespace = drt.namespace("test".to_string())?; + let namespace = drt.namespace("ns2001".to_string())?; // Create a subscriber for the metrics events using subscribe_with_type let mut subscriber = namespace @@ -1193,14 +1192,15 @@ mod test_worker_metrics_publisher { #[cfg(feature = "integration")] #[cfg(test)] -mod integration_tests { +mod test_kvstats_prometheus_gauge_updates { use { - super::{ForwardPassMetrics, KvStats, WorkerMetricsPublisher, WorkerStats, kvstats}, + crate::kv_router::publisher::{ + ForwardPassMetrics, KvStats, WorkerMetricsPublisher, WorkerStats, kvstats, + }, dynamo_runtime::metrics::MetricsRegistry, std::sync::Arc, }; - #[ignore] // Requires NATS server to be running but keeps getting invoked on CI #[tokio::test] async fn test_kvstats_prometheus_gauge_updates() { use crate::common::test_utils::create_test_drt_async; @@ -1210,8 +1210,8 @@ mod integration_tests { // Create a real DRT and component for integration testing let drt = std::sync::Arc::new(create_test_drt_async().await); - let namespace = drt.namespace("test_kvstats").unwrap(); - let component = namespace.component("test_publisher").unwrap(); + let namespace = drt.namespace("ns2002").unwrap(); + let component = namespace.component("comp2002").unwrap(); // Register Prometheus metrics using the real constructor publisher.register_prometheus_metrics(&component).unwrap(); From ae56f652f9fe07624f640a34e8a49d491ff5126a Mon Sep 17 00:00:00 2001 From: Keiven Chang Date: Wed, 27 Aug 2025 23:19:03 +0000 Subject: [PATCH 07/14] consolidate create_test_drt_async Signed-off-by: Keiven Chang --- lib/llm/src/common.rs | 15 --------------- lib/llm/src/kv_router/publisher.rs | 2 +- lib/runtime/src/distributed.rs | 3 +-- lib/runtime/src/lib.rs | 3 +++ 4 files changed, 5 insertions(+), 18 deletions(-) diff --git a/lib/llm/src/common.rs b/lib/llm/src/common.rs index 5296dd283e..2d97064bd1 100644 --- a/lib/llm/src/common.rs +++ b/lib/llm/src/common.rs @@ -15,18 +15,3 @@ pub mod dtype; pub mod versioned; - -/// Test utilities for integration tests -#[cfg(all(test, feature = "integration"))] -pub mod test_utils { - use dynamo_runtime::{DistributedRuntime, Runtime}; - - /// Creates a test DistributedRuntime for integration testing. - /// Uses from_current to leverage the existing tokio runtime in tests. - pub async fn create_test_drt_async() -> DistributedRuntime { - let runtime = Runtime::from_current().expect("Failed to get current runtime"); - DistributedRuntime::from_settings_without_discovery(runtime) - .await - .expect("Failed to create test DRT") - } -} diff --git a/lib/llm/src/kv_router/publisher.rs b/lib/llm/src/kv_router/publisher.rs index 80a3bfcd65..1a6a4293da 100644 --- a/lib/llm/src/kv_router/publisher.rs +++ b/lib/llm/src/kv_router/publisher.rs @@ -1203,7 +1203,7 @@ mod test_kvstats_prometheus_gauge_updates { #[tokio::test] async fn test_kvstats_prometheus_gauge_updates() { - use crate::common::test_utils::create_test_drt_async; + use dynamo_runtime::distributed_test_utils::create_test_drt_async; // Test that publish() updates Prometheus gauges correctly using real Component let publisher = WorkerMetricsPublisher::new().unwrap(); diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index 27e9c49a1f..692b268c47 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -345,7 +345,7 @@ impl DistributedConfig { } } -#[cfg(test)] +#[cfg(any(test, feature = "integration"))] pub mod distributed_test_utils { //! Common test helper functions for DistributedRuntime tests // TODO: Use in-memory DistributedRuntime for tests instead of full runtime when available. @@ -353,7 +353,6 @@ pub mod distributed_test_utils { /// Helper function to create a DRT instance for tests /// Uses from_current to leverage existing tokio runtime /// Note: Settings are read from environment variables inside DistributedRuntime::from_settings_without_discovery - #[cfg(feature = "integration")] pub async fn create_test_drt_async() -> crate::DistributedRuntime { let rt = crate::Runtime::from_current().unwrap(); crate::DistributedRuntime::from_settings_without_discovery(rt) diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index 6b3fb200ed..4d96220d02 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -56,6 +56,9 @@ pub mod utils; pub mod worker; pub mod distributed; +// Re-export test utilities at crate root so external crates can use them with the integration feature +#[cfg(any(test, feature = "integration"))] +pub use distributed::distributed_test_utils; pub use futures::stream; pub use tokio_util::sync::CancellationToken; pub use worker::Worker; From 3a8acad85a53ef08acfe5e4adfa075f165da3401 Mon Sep 17 00:00:00 2001 From: Keiven Chang Date: Thu, 28 Aug 2025 04:24:25 +0000 Subject: [PATCH 08/14] reorganize the tests Signed-off-by: Keiven Chang --- lib/llm/src/kv_router/publisher.rs | 28 ++++++++-------------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/lib/llm/src/kv_router/publisher.rs b/lib/llm/src/kv_router/publisher.rs index 1a6a4293da..6c5d07ffe3 100644 --- a/lib/llm/src/kv_router/publisher.rs +++ b/lib/llm/src/kv_router/publisher.rs @@ -1078,19 +1078,18 @@ mod test_exponential_backoff { } } -#[cfg(test)] -mod test_worker_metrics_publisher { +#[cfg(all(test, feature = "integration"))] +mod test_integration_publisher { use super::*; use crate::kv_router::protocols::{ForwardPassMetrics, KvStats, WorkerStats}; - use dynamo_runtime::traits::events::EventSubscriber; // Add this import - use dynamo_runtime::{DistributedRuntime, Runtime}; + use dynamo_runtime::distributed_test_utils::create_test_drt_async; + use dynamo_runtime::traits::events::EventSubscriber; use futures::StreamExt; #[tokio::test] async fn test_metrics_publishing_behavior() -> Result<()> { // Set up runtime and namespace - let rt = Runtime::from_current().unwrap(); - let drt = DistributedRuntime::from_settings(rt.clone()).await?; + let drt = create_test_drt_async().await; let namespace = drt.namespace("ns2001".to_string())?; // Create a subscriber for the metrics events using subscribe_with_type @@ -1184,26 +1183,15 @@ mod test_worker_metrics_publisher { "Expected no messages when load metrics don't change" ); - rt.shutdown(); + drt.shutdown(); Ok(()) } -} - -#[cfg(feature = "integration")] -#[cfg(test)] -mod test_kvstats_prometheus_gauge_updates { - use { - crate::kv_router::publisher::{ - ForwardPassMetrics, KvStats, WorkerMetricsPublisher, WorkerStats, kvstats, - }, - dynamo_runtime::metrics::MetricsRegistry, - std::sync::Arc, - }; #[tokio::test] async fn test_kvstats_prometheus_gauge_updates() { - use dynamo_runtime::distributed_test_utils::create_test_drt_async; + use crate::kv_router::publisher::kvstats; + use dynamo_runtime::metrics::MetricsRegistry; // Test that publish() updates Prometheus gauges correctly using real Component let publisher = WorkerMetricsPublisher::new().unwrap(); From 911b9e0c62405734446ce12f2b297c1feaafc494 Mon Sep 17 00:00:00 2001 From: Keiven Chang Date: Thu, 28 Aug 2025 15:45:19 +0000 Subject: [PATCH 09/14] fix the create_test_drt_async feature gating Signed-off-by: Keiven Chang --- lib/llm/src/kv_router/publisher.rs | 6 +++--- lib/runtime/src/distributed.rs | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/llm/src/kv_router/publisher.rs b/lib/llm/src/kv_router/publisher.rs index 6c5d07ffe3..58ca707c42 100644 --- a/lib/llm/src/kv_router/publisher.rs +++ b/lib/llm/src/kv_router/publisher.rs @@ -1197,9 +1197,9 @@ mod test_integration_publisher { let publisher = WorkerMetricsPublisher::new().unwrap(); // Create a real DRT and component for integration testing - let drt = std::sync::Arc::new(create_test_drt_async().await); - let namespace = drt.namespace("ns2002").unwrap(); - let component = namespace.component("comp2002").unwrap(); + let drt = create_test_drt_async().await; + let namespace = drt.namespace("ns2002".to_string()).unwrap(); + let component = namespace.component("comp2002".to_string()).unwrap(); // Register Prometheus metrics using the real constructor publisher.register_prometheus_metrics(&component).unwrap(); diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index 692b268c47..904dafe0b3 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -345,7 +345,7 @@ impl DistributedConfig { } } -#[cfg(any(test, feature = "integration"))] +#[cfg(test)] pub mod distributed_test_utils { //! Common test helper functions for DistributedRuntime tests // TODO: Use in-memory DistributedRuntime for tests instead of full runtime when available. @@ -353,6 +353,7 @@ pub mod distributed_test_utils { /// Helper function to create a DRT instance for tests /// Uses from_current to leverage existing tokio runtime /// Note: Settings are read from environment variables inside DistributedRuntime::from_settings_without_discovery + #[cfg(feature = "integration")] pub async fn create_test_drt_async() -> crate::DistributedRuntime { let rt = crate::Runtime::from_current().unwrap(); crate::DistributedRuntime::from_settings_without_discovery(rt) @@ -361,8 +362,7 @@ pub mod distributed_test_utils { } } -#[cfg(feature = "integration")] -#[cfg(test)] +#[cfg(all(test, feature = "integration"))] mod tests { use super::distributed_test_utils::create_test_drt_async; From 829039cbd3a90040832251982b101b54bcfbf500 Mon Sep 17 00:00:00 2001 From: Keiven Chang Date: Thu, 28 Aug 2025 17:06:38 +0000 Subject: [PATCH 10/14] fix the create_test_drt_async imports Signed-off-by: Keiven Chang --- lib/llm/Cargo.toml | 2 +- lib/runtime/src/distributed.rs | 3 +-- lib/runtime/src/lib.rs | 2 -- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/lib/llm/Cargo.toml b/lib/llm/Cargo.toml index 0f9a8c08b4..9c236060d1 100644 --- a/lib/llm/Cargo.toml +++ b/lib/llm/Cargo.toml @@ -35,7 +35,7 @@ testing-nixl = ["dep:nixl-sys"] testing-etcd = [] block-manager = ["dep:nixl-sys", "dep:cudarc", "dep:ndarray", "dep:nix"] cuda = ["dep:cudarc"] -integration = [] +integration = ["dynamo-runtime/integration"] [[bench]] name = "tokenizer" diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index 904dafe0b3..d617eda380 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -345,12 +345,11 @@ impl DistributedConfig { } } -#[cfg(test)] pub mod distributed_test_utils { //! Common test helper functions for DistributedRuntime tests // TODO: Use in-memory DistributedRuntime for tests instead of full runtime when available. - /// Helper function to create a DRT instance for tests + /// Helper function to create a DRT instance for integration-only tests. /// Uses from_current to leverage existing tokio runtime /// Note: Settings are read from environment variables inside DistributedRuntime::from_settings_without_discovery #[cfg(feature = "integration")] diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index 4d96220d02..fdec8e1ddf 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -56,8 +56,6 @@ pub mod utils; pub mod worker; pub mod distributed; -// Re-export test utilities at crate root so external crates can use them with the integration feature -#[cfg(any(test, feature = "integration"))] pub use distributed::distributed_test_utils; pub use futures::stream; pub use tokio_util::sync::CancellationToken; From 98c77fd2ace9b2def5c70f235fc7ae7fea20f9ba Mon Sep 17 00:00:00 2001 From: Keiven Chang Date: Thu, 28 Aug 2025 17:38:38 +0000 Subject: [PATCH 11/14] add ignore to tests Signed-off-by: Keiven Chang --- lib/llm/src/kv_router/publisher.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/llm/src/kv_router/publisher.rs b/lib/llm/src/kv_router/publisher.rs index 58ca707c42..175718d3cd 100644 --- a/lib/llm/src/kv_router/publisher.rs +++ b/lib/llm/src/kv_router/publisher.rs @@ -1087,6 +1087,7 @@ mod test_integration_publisher { use futures::StreamExt; #[tokio::test] + #[ignore] // Mark as ignored as requested, because CI's integrations still don't have NATS async fn test_metrics_publishing_behavior() -> Result<()> { // Set up runtime and namespace let drt = create_test_drt_async().await; @@ -1189,6 +1190,7 @@ mod test_integration_publisher { } #[tokio::test] + #[ignore] // Mark as ignored as requested, because CI's integrations still don't have NATS async fn test_kvstats_prometheus_gauge_updates() { use crate::kv_router::publisher::kvstats; use dynamo_runtime::metrics::MetricsRegistry; From 26c932deeff28aaebd7c5e48025484f2f41eb06a Mon Sep 17 00:00:00 2001 From: Keiven Chang Date: Thu, 28 Aug 2025 18:54:31 +0000 Subject: [PATCH 12/14] fix: resolve merge conflict in prometheus_names.rs Signed-off-by: Keiven Chang --- lib/runtime/src/metrics/prometheus_names.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/runtime/src/metrics/prometheus_names.rs b/lib/runtime/src/metrics/prometheus_names.rs index 2a7d290ee8..084b65341a 100644 --- a/lib/runtime/src/metrics/prometheus_names.rs +++ b/lib/runtime/src/metrics/prometheus_names.rs @@ -278,7 +278,7 @@ pub const KVSTATS_METRICS: &[&str] = &[ kvstats::GPU_CACHE_USAGE_PERCENT, kvstats::GPU_PREFIX_CACHE_HIT_RATE, ]; -======= + // Shared regex patterns for Prometheus sanitization static METRIC_INVALID_CHARS_PATTERN: Lazy = Lazy::new(|| Regex::new(r"[^a-zA-Z0-9_:]").unwrap()); From c7d5396ada1eec4a5e0d0061403f416549140f1d Mon Sep 17 00:00:00 2001 From: Keiven Chang Date: Fri, 29 Aug 2025 00:11:30 +0000 Subject: [PATCH 13/14] refactor: use OnceLock for prometheus_gauges instead of RwLock RwLock is not necessary since gauges are initialized once and then only read. OnceLock simplifies the code and improves performance on the hot path. Signed-off-by: Keiven Chang --- lib/llm/src/kv_router/publisher.rs | 41 +++++++++++++----------------- 1 file changed, 17 insertions(+), 24 deletions(-) diff --git a/lib/llm/src/kv_router/publisher.rs b/lib/llm/src/kv_router/publisher.rs index 175718d3cd..7d59da27e7 100644 --- a/lib/llm/src/kv_router/publisher.rs +++ b/lib/llm/src/kv_router/publisher.rs @@ -32,7 +32,7 @@ use dynamo_runtime::{ protocols::annotated::Annotated, }; use futures::stream; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; @@ -483,11 +483,10 @@ enum RawKvEvent { pub struct WorkerMetricsPublisher { tx: tokio::sync::watch::Sender>, rx: tokio::sync::watch::Receiver>, - // Prometheus metrics for KvStats (wrapped in Mutex for thread-safe access) /// Prometheus gauges for KvStats metrics - /// We use Option> for lock-free reads after initialization + /// We use OnceLock for efficient one-time initialization and lock-free reads /// The Arc is set once during register_prometheus_metrics and then only read - prometheus_gauges: std::sync::RwLock>>, + prometheus_gauges: OnceLock>, } #[derive(Default)] @@ -558,7 +557,7 @@ impl WorkerMetricsPublisher { Ok(WorkerMetricsPublisher { tx, rx, - prometheus_gauges: std::sync::RwLock::new(None), + prometheus_gauges: OnceLock::new(), }) } @@ -568,13 +567,10 @@ impl WorkerMetricsPublisher { ) -> Result<(), tokio::sync::watch::error::SendError>> { tracing::trace!("Publish metrics: {metrics:?}"); - // Update Prometheus gauges using read lock for better performance - // This is the hot path - we only read the Arc, no writes needed - #[allow(clippy::collapsible_if)] // Necessary to keep guard alive - if let Ok(guard) = self.prometheus_gauges.read() { - if let Some(gauges) = guard.as_ref() { - gauges.update_from_kvstats(&metrics.kv_stats); - } + // Update Prometheus gauges - OnceLock provides lock-free reads after initialization + // This is the hot path - we only read the Arc, no locking overhead + if let Some(gauges) = self.prometheus_gauges.get() { + gauges.update_from_kvstats(&metrics.kv_stats); } self.tx.send(metrics) @@ -582,15 +578,14 @@ impl WorkerMetricsPublisher { /// Register KvStats Prometheus metrics with the component's registry pub fn register_prometheus_metrics(&self, component: &Component) -> Result<()> { - let mut gauges = self - .prometheus_gauges - .write() - .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on prometheus_gauges"))?; - - // Only initialize if not already done - if gauges.is_none() { - *gauges = Some(Arc::new(KvStatsPrometheusGauges::new(component)?)); - } + // Use get_or_init for thread-safe one-time initialization + // This will only initialize once, subsequent calls will return immediately + self.prometheus_gauges.get_or_init(|| { + Arc::new( + KvStatsPrometheusGauges::new(component) + .expect("Failed to create Prometheus gauges"), + ) + }); Ok(()) } @@ -1207,8 +1202,7 @@ mod test_integration_publisher { publisher.register_prometheus_metrics(&component).unwrap(); // Get references to the gauges for testing - let gauges_guard = publisher.prometheus_gauges.read().unwrap(); - let gauges = gauges_guard.as_ref().unwrap(); + let gauges = publisher.prometheus_gauges.get().unwrap(); let active_blocks_gauge = gauges.kv_active_blocks_gauge.as_ref().unwrap().clone(); let total_blocks_gauge = gauges.kv_total_blocks_gauge.as_ref().unwrap().clone(); let cache_usage_gauge = gauges.gpu_cache_usage_gauge.as_ref().unwrap().clone(); @@ -1217,7 +1211,6 @@ mod test_integration_publisher { .as_ref() .unwrap() .clone(); - drop(gauges_guard); // Release the read lock // Create test metrics with specific values let test_metrics = Arc::new(ForwardPassMetrics { From fd55dda47358e7222c0d3e6d5e5df9287d6752a7 Mon Sep 17 00:00:00 2001 From: Keiven Chang Date: Fri, 29 Aug 2025 02:24:40 +0000 Subject: [PATCH 14/14] refactor: simplify KvStatsPrometheusGauges Removed unnecessary Arc and Option wrappers since gauges are never shared independently and are always initialized. Signed-off-by: Keiven Chang --- lib/llm/src/kv_router/publisher.rs | 66 ++++++++++++------------------ 1 file changed, 27 insertions(+), 39 deletions(-) diff --git a/lib/llm/src/kv_router/publisher.rs b/lib/llm/src/kv_router/publisher.rs index 7d59da27e7..3a3f165ce7 100644 --- a/lib/llm/src/kv_router/publisher.rs +++ b/lib/llm/src/kv_router/publisher.rs @@ -485,44 +485,43 @@ pub struct WorkerMetricsPublisher { rx: tokio::sync::watch::Receiver>, /// Prometheus gauges for KvStats metrics /// We use OnceLock for efficient one-time initialization and lock-free reads - /// The Arc is set once during register_prometheus_metrics and then only read - prometheus_gauges: OnceLock>, + /// The gauges are set once during register_prometheus_metrics and then only read + prometheus_gauges: OnceLock, } -#[derive(Default)] struct KvStatsPrometheusGauges { - kv_active_blocks_gauge: Option, - kv_total_blocks_gauge: Option, - gpu_cache_usage_gauge: Option, - gpu_prefix_cache_hit_rate_gauge: Option, + kv_active_blocks_gauge: prometheus::Gauge, + kv_total_blocks_gauge: prometheus::Gauge, + gpu_cache_usage_gauge: prometheus::Gauge, + gpu_prefix_cache_hit_rate_gauge: prometheus::Gauge, } impl KvStatsPrometheusGauges { /// Create a new KvStatsPrometheusGauges instance with all metrics registered fn new(component: &Component) -> Result { - let kv_active_blocks_gauge = Some(component.create_gauge( + let kv_active_blocks_gauge = component.create_gauge( kvstats::ACTIVE_BLOCKS, "Number of active KV cache blocks currently in use", &[], - )?); + )?; - let kv_total_blocks_gauge = Some(component.create_gauge( + let kv_total_blocks_gauge = component.create_gauge( kvstats::TOTAL_BLOCKS, "Total number of KV cache blocks available", &[], - )?); + )?; - let gpu_cache_usage_gauge = Some(component.create_gauge( + let gpu_cache_usage_gauge = component.create_gauge( kvstats::GPU_CACHE_USAGE_PERCENT, "GPU cache usage as a percentage (0.0-1.0)", &[], - )?); + )?; - let gpu_prefix_cache_hit_rate_gauge = Some(component.create_gauge( + let gpu_prefix_cache_hit_rate_gauge = component.create_gauge( kvstats::GPU_PREFIX_CACHE_HIT_RATE, "GPU prefix cache hit rate as a percentage (0.0-1.0)", &[], - )?); + )?; tracing::info!("Registered KvStats Prometheus metrics"); @@ -536,18 +535,14 @@ impl KvStatsPrometheusGauges { /// Update all gauges with values from KvStats fn update_from_kvstats(&self, kv_stats: &KvStats) { - if let Some(gauge) = &self.kv_active_blocks_gauge { - gauge.set(kv_stats.kv_active_blocks as f64); - } - if let Some(gauge) = &self.kv_total_blocks_gauge { - gauge.set(kv_stats.kv_total_blocks as f64); - } - if let Some(gauge) = &self.gpu_cache_usage_gauge { - gauge.set(kv_stats.gpu_cache_usage_perc as f64); - } - if let Some(gauge) = &self.gpu_prefix_cache_hit_rate_gauge { - gauge.set(kv_stats.gpu_prefix_cache_hit_rate as f64); - } + self.kv_active_blocks_gauge + .set(kv_stats.kv_active_blocks as f64); + self.kv_total_blocks_gauge + .set(kv_stats.kv_total_blocks as f64); + self.gpu_cache_usage_gauge + .set(kv_stats.gpu_cache_usage_perc as f64); + self.gpu_prefix_cache_hit_rate_gauge + .set(kv_stats.gpu_prefix_cache_hit_rate as f64); } } @@ -581,10 +576,7 @@ impl WorkerMetricsPublisher { // Use get_or_init for thread-safe one-time initialization // This will only initialize once, subsequent calls will return immediately self.prometheus_gauges.get_or_init(|| { - Arc::new( - KvStatsPrometheusGauges::new(component) - .expect("Failed to create Prometheus gauges"), - ) + KvStatsPrometheusGauges::new(component).expect("Failed to create Prometheus gauges") }); Ok(()) @@ -1203,14 +1195,10 @@ mod test_integration_publisher { // Get references to the gauges for testing let gauges = publisher.prometheus_gauges.get().unwrap(); - let active_blocks_gauge = gauges.kv_active_blocks_gauge.as_ref().unwrap().clone(); - let total_blocks_gauge = gauges.kv_total_blocks_gauge.as_ref().unwrap().clone(); - let cache_usage_gauge = gauges.gpu_cache_usage_gauge.as_ref().unwrap().clone(); - let hit_rate_gauge = gauges - .gpu_prefix_cache_hit_rate_gauge - .as_ref() - .unwrap() - .clone(); + let active_blocks_gauge = gauges.kv_active_blocks_gauge.clone(); + let total_blocks_gauge = gauges.kv_total_blocks_gauge.clone(); + let cache_usage_gauge = gauges.gpu_cache_usage_gauge.clone(); + let hit_rate_gauge = gauges.gpu_prefix_cache_hit_rate_gauge.clone(); // Create test metrics with specific values let test_metrics = Arc::new(ForwardPassMetrics {