Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/bindings/python/rust/llm/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ impl WorkerMetricsPublisher {
None
};

// Register Prometheus metrics first
rs_publisher
.register_prometheus_metrics(&rs_component)
.map_err(to_pyerr)?;

rs_publisher
.create_endpoint(rs_component, metrics_labels_ref.as_deref())
.await
Expand Down
2 changes: 1 addition & 1 deletion lib/llm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ testing-nixl = ["dep:nixl-sys"]
testing-etcd = []
block-manager = ["dep:nixl-sys", "dep:cudarc", "dep:ndarray", "dep:nix"]
cuda = ["dep:cudarc"]
integration = []
integration = ["dynamo-runtime/integration"]
# NOTE: This feature will be enabled once ModelExpress packages are published
# model-express = ["dep:model_express_client", "dep:model_express_common"]

Expand Down
189 changes: 178 additions & 11 deletions lib/llm/src/kv_router/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::kv_router::{
scoring::LoadEvent,
};
use async_trait::async_trait;
use dynamo_runtime::metrics::{MetricsRegistry, prometheus_names::kvstats};
use dynamo_runtime::traits::{DistributedRuntimeProvider, events::EventPublisher};
use dynamo_runtime::{
Error, Result,
Expand All @@ -31,7 +32,7 @@ use dynamo_runtime::{
protocols::annotated::Annotated,
};
use futures::stream;
use std::sync::Arc;
use std::sync::{Arc, OnceLock};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

Expand Down Expand Up @@ -482,22 +483,105 @@ enum RawKvEvent {
pub struct WorkerMetricsPublisher {
/// Sending half of the watch channel; `publish` pushes each new metrics snapshot through it.
tx: tokio::sync::watch::Sender<Arc<ForwardPassMetrics>>,
/// Receiving half of the same watch channel, created together with `tx` in `new`.
/// NOTE(review): its consumer is not visible here — presumably `create_endpoint`; confirm.
rx: tokio::sync::watch::Receiver<Arc<ForwardPassMetrics>>,
/// Prometheus gauges mirroring the `KvStats` part of each published snapshot.
/// Starts empty; `register_prometheus_metrics` fills it at most once, and `publish`
/// only reads it, skipping the gauge update while it is still unset.
prometheus_gauges: OnceLock<KvStatsPrometheusGauges>,
}

/// Handles to the four Prometheus gauges that mirror the fields of `KvStats`.
/// Each gauge is overwritten by `update_from_kvstats` with the same-named `KvStats` field.
struct KvStatsPrometheusGauges {
/// Mirrors `KvStats::kv_active_blocks`.
kv_active_blocks_gauge: prometheus::Gauge,
/// Mirrors `KvStats::kv_total_blocks`.
kv_total_blocks_gauge: prometheus::Gauge,
/// Mirrors `KvStats::gpu_cache_usage_perc`.
gpu_cache_usage_gauge: prometheus::Gauge,
/// Mirrors `KvStats::gpu_prefix_cache_hit_rate`.
gpu_prefix_cache_hit_rate_gauge: prometheus::Gauge,
}

impl KvStatsPrometheusGauges {
    /// Builds the full gauge set by calling `component.create_gauge` once per metric.
    ///
    /// Gauges are created in field order, each with an empty slice as the final
    /// argument. The first creation error is returned immediately, in which case
    /// the later gauges are never created and nothing is logged.
    fn new(component: &Component) -> Result<Self> {
        // Struct-literal fields are evaluated top to bottom, so creation order is
        // active blocks, total blocks, cache usage, prefix cache hit rate.
        let gauges = Self {
            kv_active_blocks_gauge: component.create_gauge(
                kvstats::ACTIVE_BLOCKS,
                "Number of active KV cache blocks currently in use",
                &[],
            )?,
            kv_total_blocks_gauge: component.create_gauge(
                kvstats::TOTAL_BLOCKS,
                "Total number of KV cache blocks available",
                &[],
            )?,
            gpu_cache_usage_gauge: component.create_gauge(
                kvstats::GPU_CACHE_USAGE_PERCENT,
                "GPU cache usage as a percentage (0.0-1.0)",
                &[],
            )?,
            gpu_prefix_cache_hit_rate_gauge: component.create_gauge(
                kvstats::GPU_PREFIX_CACHE_HIT_RATE,
                "GPU prefix cache hit rate as a percentage (0.0-1.0)",
                &[],
            )?,
        };

        tracing::info!("Registered KvStats Prometheus metrics");

        Ok(gauges)
    }

    /// Overwrites every gauge with the corresponding field of `kv_stats`,
    /// converting each value to `f64`.
    fn update_from_kvstats(&self, kv_stats: &KvStats) {
        let Self {
            kv_active_blocks_gauge: active_blocks,
            kv_total_blocks_gauge: total_blocks,
            gpu_cache_usage_gauge: cache_usage,
            gpu_prefix_cache_hit_rate_gauge: prefix_hit_rate,
        } = self;

        active_blocks.set(kv_stats.kv_active_blocks as f64);
        total_blocks.set(kv_stats.kv_total_blocks as f64);
        cache_usage.set(kv_stats.gpu_cache_usage_perc as f64);
        prefix_hit_rate.set(kv_stats.gpu_prefix_cache_hit_rate as f64);
    }
}

impl WorkerMetricsPublisher {
pub fn new() -> Result<Self> {
let (tx, rx) = tokio::sync::watch::channel(Arc::new(ForwardPassMetrics::default()));
Ok(WorkerMetricsPublisher { tx, rx })
Ok(WorkerMetricsPublisher {
tx,
rx,
prometheus_gauges: OnceLock::new(),
})
}

/// Publishes a new metrics snapshot on the watch channel.
///
/// When Prometheus gauges have been registered they are first updated from
/// `metrics.kv_stats`; before registration that step is skipped. The return
/// value is whatever the underlying `watch::Sender::send` reports.
pub fn publish(
    &self,
    metrics: Arc<ForwardPassMetrics>,
) -> Result<(), tokio::sync::watch::error::SendError<Arc<ForwardPassMetrics>>> {
    tracing::trace!("Publish metrics: {metrics:?}");

    // Hot path: this only reads the `OnceLock`; initialization happens solely in
    // `register_prometheus_metrics`, so an unset cell just means "no gauges yet".
    let registered_gauges = self.prometheus_gauges.get();
    if let Some(gauges) = registered_gauges {
        gauges.update_from_kvstats(&metrics.kv_stats);
    }

    self.tx.send(metrics)
}

/// Register KvStats Prometheus metrics with the component's registry.
///
/// The call is idempotent: once gauges have been stored, later calls return
/// `Ok(())` without touching `component` again.
///
/// # Errors
///
/// Returns the error produced while creating the gauges. Nothing is stored in
/// that case, so `publish` keeps skipping the gauge update and a later call
/// may retry the registration.
pub fn register_prometheus_metrics(&self, component: &Component) -> Result<()> {
    // Fast path: already registered, nothing to do.
    if self.prometheus_gauges.get().is_some() {
        return Ok(());
    }

    // Propagate creation failures to the caller instead of panicking inside
    // `get_or_init`; this function already advertises a `Result`.
    let gauges = KvStatsPrometheusGauges::new(component)?;

    // `set` only fails when a concurrent caller stored its gauges first. Their
    // gauges stay in place and ours are dropped, so the publisher is still
    // registered and the call counts as a success.
    let _ = self.prometheus_gauges.set(gauges);

    Ok(())
}

pub async fn create_endpoint(
&self,
component: Component,
Expand Down Expand Up @@ -981,21 +1065,20 @@ mod test_exponential_backoff {
}
}

#[cfg(test)]
mod test_worker_metrics_publisher {
#[cfg(all(test, feature = "integration"))]
mod test_integration_publisher {
use super::*;
use crate::kv_router::protocols::{ForwardPassMetrics, KvStats, WorkerStats};
use dynamo_runtime::traits::events::EventSubscriber; // Add this import
use dynamo_runtime::{DistributedRuntime, Runtime};
use dynamo_runtime::distributed_test_utils::create_test_drt_async;
use dynamo_runtime::traits::events::EventSubscriber;
use futures::StreamExt;

#[tokio::test]
#[ignore] // Mark as ignored as requested
#[ignore] // Mark as ignored as requested, because CI's integrations still don't have NATS
async fn test_metrics_publishing_behavior() -> Result<()> {
// Set up runtime and namespace
let rt = Runtime::from_current().unwrap();
let drt = DistributedRuntime::from_settings(rt.clone()).await?;
let namespace = drt.namespace("test".to_string())?;
let drt = create_test_drt_async().await;
let namespace = drt.namespace("ns2001".to_string())?;

// Create a subscriber for the metrics events using subscribe_with_type
let mut subscriber = namespace
Expand Down Expand Up @@ -1088,8 +1171,92 @@ mod test_worker_metrics_publisher {
"Expected no messages when load metrics don't change"
);

rt.shutdown();
drt.shutdown();

Ok(())
}

#[tokio::test]
#[ignore] // Mark as ignored as requested, because CI's integrations still don't have NATS
async fn test_kvstats_prometheus_gauge_updates() {
use crate::kv_router::publisher::kvstats;
use dynamo_runtime::metrics::MetricsRegistry;

// Test that publish() updates Prometheus gauges correctly using real Component
let publisher = WorkerMetricsPublisher::new().unwrap();

// Create a real DRT and component for integration testing
let drt = create_test_drt_async().await;
let namespace = drt.namespace("ns2002".to_string()).unwrap();
let component = namespace.component("comp2002".to_string()).unwrap();

// Register Prometheus metrics using the real constructor
publisher.register_prometheus_metrics(&component).unwrap();

// Get references to the gauges for testing
let gauges = publisher.prometheus_gauges.get().unwrap();
let active_blocks_gauge = gauges.kv_active_blocks_gauge.clone();
let total_blocks_gauge = gauges.kv_total_blocks_gauge.clone();
let cache_usage_gauge = gauges.gpu_cache_usage_gauge.clone();
let hit_rate_gauge = gauges.gpu_prefix_cache_hit_rate_gauge.clone();

// Create test metrics with specific values
let test_metrics = Arc::new(ForwardPassMetrics {
worker_stats: WorkerStats {
data_parallel_rank: None,
request_active_slots: 5,
request_total_slots: 100,
num_requests_waiting: 2,
},
kv_stats: KvStats {
kv_active_blocks: 42,
kv_total_blocks: 12894,
gpu_cache_usage_perc: 0.5,
gpu_prefix_cache_hit_rate: 0.75,
},
spec_decode_stats: None,
});

// Test 1: Initial gauge values should be 0
assert_eq!(active_blocks_gauge.get(), 0.0);
assert_eq!(total_blocks_gauge.get(), 0.0);
assert_eq!(cache_usage_gauge.get(), 0.0);
assert_eq!(hit_rate_gauge.get(), 0.0);

// Test 2: publish() should update all gauges with correct values
let result = publisher.publish(test_metrics);
assert!(result.is_ok());

// Test 3: Verify gauges were updated correctly
assert_eq!(active_blocks_gauge.get(), 42.0);
assert_eq!(total_blocks_gauge.get(), 12894.0);
assert_eq!(cache_usage_gauge.get(), 0.5);
assert_eq!(hit_rate_gauge.get(), 0.75);

// Test 4: Verify metrics are properly registered in the component's registry
// Component implements MetricsRegistry trait which provides prometheus_metrics_fmt()
let prometheus_output = component.prometheus_metrics_fmt().unwrap();

// Verify metric names are present
assert!(prometheus_output.contains(kvstats::ACTIVE_BLOCKS));
assert!(prometheus_output.contains(kvstats::TOTAL_BLOCKS));
assert!(prometheus_output.contains(kvstats::GPU_CACHE_USAGE_PERCENT));
assert!(prometheus_output.contains(kvstats::GPU_PREFIX_CACHE_HIT_RATE));

// Test 5: Verify the prometheus output contains the actual values
// Print the output to debug format issues
println!("Prometheus output:\n{}", prometheus_output);

// Check for metric values - the format includes labels so we need to be more flexible
assert!(prometheus_output.contains("kvstats_active_blocks"));
assert!(prometheus_output.contains("42")); // The value should be there
assert!(prometheus_output.contains("kvstats_total_blocks"));
assert!(prometheus_output.contains("12894")); // The value should be there
assert!(prometheus_output.contains("kvstats_gpu_cache_usage_percent"));
assert!(prometheus_output.contains("kvstats_gpu_prefix_cache_hit_rate"));

println!(
"✅ KvStatsPrometheusGauges constructor and publish() work correctly with real Component"
);
}
}
4 changes: 2 additions & 2 deletions lib/llm/src/mocker/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ mod tests {
// Manual debug ticker that prints forward pass metrics
_ = debug_interval.tick() => {
let _metrics = metrics_rx.borrow().clone();
println!("Forward Pass Metrics: {_metrics:#?}");
tracing::debug!("Forward Pass Metrics: {_metrics:#?}");
}

Some(_) = output_rx.recv() => {
Expand Down Expand Up @@ -891,7 +891,7 @@ mod tests {
// Manual debug ticker that prints forward pass metrics
_ = debug_interval.tick() => {
let _metrics = metrics_rx.borrow().clone();
println!("Forward Pass Metrics: {_metrics:#?}");
tracing::debug!("Forward Pass Metrics: {_metrics:#?}");
}

Some(_signal) = output_rx.recv() => {
Expand Down
6 changes: 2 additions & 4 deletions lib/runtime/src/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,11 @@ impl DistributedConfig {
}
}

#[cfg(test)]
pub mod distributed_test_utils {
//! Common test helper functions for DistributedRuntime tests
// TODO: Use in-memory DistributedRuntime for tests instead of full runtime when available.

/// Helper function to create a DRT instance for tests
/// Helper function to create a DRT instance for integration-only tests.
/// Uses from_current to leverage existing tokio runtime
/// Note: Settings are read from environment variables inside DistributedRuntime::from_settings_without_discovery
#[cfg(feature = "integration")]
Expand All @@ -362,8 +361,7 @@ pub mod distributed_test_utils {
}
}

#[cfg(feature = "integration")]
#[cfg(test)]
#[cfg(all(test, feature = "integration"))]
mod tests {
use super::distributed_test_utils::create_test_drt_async;

Expand Down
1 change: 1 addition & 0 deletions lib/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub mod utils;
pub mod worker;

pub mod distributed;
pub use distributed::distributed_test_utils;
pub use futures::stream;
pub use tokio_util::sync::CancellationToken;
pub use worker::Worker;
Expand Down
33 changes: 33 additions & 0 deletions lib/runtime/src/metrics/prometheus_names.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,39 @@ pub mod kvbm_connector {
pub const KVBM_CONNECTOR_WORKER: &str = "kvbm_connector_worker";
}

/// KvStats metrics from LLM workers
pub mod kvstats {
    /// Prepends the shared `kvstats_` prefix to a string-literal suffix at
    /// compile time, so the prefix is spelled in exactly one place.
    macro_rules! kvstats_name {
        ($suffix:literal) => {
            concat!("kvstats_", $suffix)
        };
    }

    /// Prefix shared by every KvStats metric name (the macro applied to an empty suffix)
    pub const PREFIX: &str = kvstats_name!("");

    /// Gauge name: number of active KV cache blocks currently in use
    pub const ACTIVE_BLOCKS: &str = kvstats_name!("active_blocks");

    /// Gauge name: total number of KV cache blocks available
    pub const TOTAL_BLOCKS: &str = kvstats_name!("total_blocks");

    /// Gauge name: GPU cache usage, documented as a percentage in the range 0.0-1.0
    pub const GPU_CACHE_USAGE_PERCENT: &str = kvstats_name!("gpu_cache_usage_percent");

    /// Gauge name: GPU prefix cache hit rate, documented as a percentage in the range 0.0-1.0
    pub const GPU_PREFIX_CACHE_HIT_RATE: &str = kvstats_name!("gpu_prefix_cache_hit_rate");
}

/// All KvStats Prometheus metric names, collected in one slice for iteration or validation.
/// Lists every metric-name constant of the `kvstats` module; `kvstats::PREFIX` is
/// deliberately absent because it is a prefix, not a metric name.
pub const KVSTATS_METRICS: &[&str] = &[
kvstats::ACTIVE_BLOCKS,
kvstats::TOTAL_BLOCKS,
kvstats::GPU_CACHE_USAGE_PERCENT,
kvstats::GPU_PREFIX_CACHE_HIT_RATE,
];

// Shared regex patterns for Prometheus sanitization
static METRIC_INVALID_CHARS_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new(r"[^a-zA-Z0-9_:]").unwrap());
Expand Down
Loading