From 0741ad24d3e1cff779bd278763972d1846d56cac Mon Sep 17 00:00:00 2001 From: Keiven Chang Date: Mon, 18 Aug 2025 22:56:32 +0000 Subject: [PATCH 1/2] refactor: consolidate system metrics and uptime tracking architecture --- lib/runtime/src/distributed.rs | 23 ++-- lib/runtime/src/lib.rs | 41 +++++++ lib/runtime/src/system_status_server.rs | 145 +++++++----------------- 3 files changed, 94 insertions(+), 115 deletions(-) diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index 15559ec7ea2..a12c965a145 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -117,6 +117,13 @@ impl DistributedRuntime { }); distributed_runtime.register_metrics_callback(drt_hierarchies, nats_client_callback); + // Initialize the uptime gauge in SystemHealth + distributed_runtime + .system_health + .lock() + .unwrap() + .initialize_uptime_gauge(&distributed_runtime)?; + // Handle system status server initialization if let Some(cancel_token) = cancel_token { // System server is enabled - start both the state and HTTP server @@ -153,20 +160,8 @@ impl DistributedRuntime { } } } else { - // System server HTTP is disabled, but still create the state for metrics - // This ensures uptime_seconds metric is always registered - let system_status_state = crate::system_status_server::SystemStatusState::new( - Arc::new(distributed_runtime.clone()), - )?; - - // Initialize the start time for uptime tracking - if let Err(e) = system_status_state.initialize_start_time() { - tracing::warn!("Failed to initialize system status start time: {}", e); - } - - tracing::debug!( - "System status server HTTP endpoints disabled, but uptime metrics are being tracked" - ); + // System server HTTP is disabled, but uptime metrics are still being tracked via SystemHealth + tracing::debug!("System status server HTTP endpoints disabled, but uptime metrics are being tracked"); } Ok(distributed_runtime) diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index 5dc01ee7e12..f67ea9f1de5 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -21,6 +21,7 @@ use std::{ collections::HashMap, sync::{Arc, OnceLock, Weak}, + time::Instant, }; use tokio::sync::Mutex; @@ -90,6 +91,8 @@ pub struct SystemHealth { use_endpoint_health_status: Vec, health_path: String, live_path: String, + start_time: Instant, + uptime_gauge: Option, } impl SystemHealth { @@ -109,6 +112,8 @@ impl SystemHealth { use_endpoint_health_status, health_path, live_path, + start_time: Instant::now(), + uptime_gauge: None, } } pub fn set_health_status(&mut self, status: HealthStatus) { @@ -145,6 +150,42 @@ impl SystemHealth { (healthy, endpoints) } + + /// Initialize the uptime gauge using the provided metrics registry + pub fn initialize_uptime_gauge( + &mut self, + registry: &T, + ) -> anyhow::Result<()> { + match registry.create_gauge( + "uptime_seconds", + "Total uptime of the DistributedRuntime in seconds", + &[], + ) { + Ok(gauge) => { + self.uptime_gauge = Some(gauge); + Ok(()) + } + Err(e) if e.to_string().contains("Duplicate metrics") => { + // Log warning for duplicate metrics (common in tests) but don't fail + tracing::warn!("uptime_seconds metric already registered: {}", e); + // Leave uptime_gauge as None - updates will be no-ops + Ok(()) + } + Err(e) => Err(e), + } + } + + /// Get the current uptime as a Duration + pub fn uptime(&self) -> std::time::Duration { + self.start_time.elapsed() + } + + /// Update the uptime gauge with the current uptime value + pub fn update_uptime_gauge(&self) { + if let Some(ref gauge) = self.uptime_gauge { + gauge.set(self.uptime().as_secs_f64()); + } + } } /// Type alias for runtime callback functions to reduce complexity diff --git a/lib/runtime/src/system_status_server.rs b/lib/runtime/src/system_status_server.rs index b6eca4f7850..36690ae07d9 100644 --- a/lib/runtime/src/system_status_server.rs +++ b/lib/runtime/src/system_status_server.rs @@ -21,6 +21,7 @@ use axum::{Router, http::StatusCode, response::IntoResponse, routing::get}; use serde_json::json; use std::sync::{Arc, OnceLock}; use std::time::Instant; +use std::collections::HashMap; use tokio::{net::TcpListener, task::JoinHandle}; use tokio_util::sync::CancellationToken; use tower_http::trace::TraceLayer; @@ -62,78 +63,22 @@ impl Clone for SystemStatusServerInfo { } } -/// System status server state containing metrics and uptime tracking +/// System status server state containing the distributed runtime reference pub struct SystemStatusState { // global drt registry is for printing out the entire Prometheus format output root_drt: Arc, - start_time: OnceLock, - uptime_gauge: prometheus::Gauge, } impl SystemStatusState { - /// Create new system status server state with the provided metrics registry + /// Create new system status server state with the provided distributed runtime pub fn new(drt: Arc) -> anyhow::Result { - // Note: This metric is created at the DRT level (no namespace), so it will be prefixed with "dynamo_component_" - // TODO(keiven): this is part of another upcoming refactor, where we will no longer - // have this duplicate DRT (and Duplicate metrics error). - let uptime_gauge = match drt.as_ref().create_gauge( - "uptime_seconds", - "Total uptime of the DistributedRuntime in seconds", - &[], - ) { - Ok(gauge) => gauge, - Err(e) if e.to_string().contains("Duplicate metrics") => { - // If the metric already exists, get it from the registry - // This can happen when SystemStatusState is created multiple times in tests - tracing::debug!( - "uptime_seconds metric already registered, retrieving existing metric" - ); - // Create a non-http gauge since we can't retrieve the existing one easily - // The important thing is that the metric is registered in the registry - prometheus::Gauge::new( - "uptime_seconds", - "Total uptime of the DistributedRuntime in seconds", - ) - .map_err(|e| anyhow::anyhow!("Failed to create dummy gauge: {}", e))? - } - Err(e) => return Err(e), - }; - let state = Self { - root_drt: drt, - start_time: OnceLock::new(), - uptime_gauge, - }; - Ok(state) - } - - /// Initialize the start time (can only be called once) - pub fn initialize_start_time(&self) -> Result<(), &'static str> { - self.start_time - .set(Instant::now()) - .map_err(|_| "Start time already initialized") - } - - pub fn uptime(&self) -> Result { - self.start_time - .get() - .ok_or("Start time not initialized") - .map(|start_time| start_time.elapsed()) + Ok(Self { root_drt: drt }) } /// Get a reference to the distributed runtime pub fn drt(&self) -> &crate::DistributedRuntime { &self.root_drt } - - /// Update the uptime gauge with current value - pub fn update_uptime_gauge(&self) { - if let Ok(uptime) = self.uptime() { - let uptime_seconds = uptime.as_secs_f64(); - self.uptime_gauge.set(uptime_seconds); - } else { - tracing::warn!("Failed to update uptime gauge: start time not initialized"); - } - } } /// Start system status server with metrics support @@ -143,7 +88,7 @@ pub async fn spawn_system_status_server( cancel_token: CancellationToken, drt: Arc, ) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> { - // Create system status server state with the provided metrics registry + // Create system status server state with the provided distributed runtime let server_state = Arc::new(SystemStatusState::new(drt)?); let health_path = server_state .drt() @@ -160,11 +105,6 @@ pub async fn spawn_system_status_server( .live_path .clone(); - // Initialize the start time - server_state - .initialize_start_time() - .map_err(|e| anyhow::anyhow!("Failed to initialize start time: {}", e))?; - let app = Router::new() .route( &health_path, @@ -230,20 +170,9 @@ pub async fn spawn_system_status_server( /// Health handler #[tracing::instrument(skip_all, level = "trace")] async fn health_handler(state: Arc) -> impl IntoResponse { - let (mut healthy, endpoints) = state - .drt() - .system_health - .lock() - .unwrap() - .get_health_status(); - let uptime = match state.uptime() { - Ok(uptime_state) => Some(uptime_state), - Err(e) => { - tracing::error!("Failed to get uptime: {}", e); - healthy = false; - None - } - }; + let system_health = state.drt().system_health.lock().unwrap(); + let (healthy, endpoints) = system_health.get_health_status(); + let uptime = Some(system_health.uptime()); let healthy_string = if healthy { "ready" } else { "notready" }; let status_code = if healthy { @@ -267,7 +196,12 @@ async fn health_handler(state: Arc) -> impl IntoResponse { #[tracing::instrument(skip_all, level = "trace")] async fn metrics_handler(state: Arc) -> impl IntoResponse { // Update the uptime gauge with current value - state.update_uptime_gauge(); + state + .drt() + .system_health + .lock() + .unwrap() + .update_uptime_gauge(); // Execute all the callbacks starting at the DistributedRuntime level assert!(state.drt().basename() == ""); @@ -342,16 +276,20 @@ mod integration_tests { use tokio::time::Duration; #[tokio::test] - async fn test_uptime_without_initialization() { - // Test that uptime returns an error if start time is not initialized + async fn test_uptime_from_system_health() { + // Test that uptime is available from SystemHealth temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async { let drt = create_test_drt_async().await; - let system_status = SystemStatusState::new(Arc::new(drt)).unwrap(); - // This should return an error because start time is not initialized - let result = system_status.uptime(); - assert!(result.is_err()); - assert_eq!(result.unwrap_err(), "Start time not initialized"); + // Get uptime from SystemHealth + let uptime = drt.system_health.lock().unwrap().uptime(); + // Uptime should exist (even if close to zero) + assert!(uptime.as_nanos() > 0 || uptime.is_zero()); + + // Sleep briefly and check uptime increases + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + let uptime_after = drt.system_health.lock().unwrap().uptime(); + assert!(uptime_after > uptime); }) .await; } @@ -397,28 +335,33 @@ mod integration_tests { } #[tokio::test] - async fn test_start_time_initialization() { - // Test that start time can only be initialized once + async fn test_uptime_gauge_updates() { + // Test that the uptime gauge is properly updated and increases over time temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async { let drt = create_test_drt_async().await; - let system_status = SystemStatusState::new(Arc::new(drt)).unwrap(); - // First initialization should succeed - assert!(system_status.initialize_start_time().is_ok()); + // Get initial uptime + let initial_uptime = drt.system_health.lock().unwrap().uptime(); - // Second initialization should fail - assert!(system_status.initialize_start_time().is_err()); + // Update the gauge with initial value + drt.system_health.lock().unwrap().update_uptime_gauge(); - // Sleep for 100ms and verify uptime increases + // Sleep for 100ms tokio::time::sleep(std::time::Duration::from_millis(100)).await; - let uptime_after_sleep = system_status.uptime().unwrap(); + + // Get uptime after sleep + let uptime_after_sleep = drt.system_health.lock().unwrap().uptime(); + + // Update the gauge again + drt.system_health.lock().unwrap().update_uptime_gauge(); + + // Verify uptime increased by at least 100ms + let elapsed = uptime_after_sleep - initial_uptime; assert!( - uptime_after_sleep >= std::time::Duration::from_millis(100), - "Uptime should be at least 100ms after sleep, got: {:?}", - uptime_after_sleep + elapsed >= std::time::Duration::from_millis(100), + "Uptime should have increased by at least 100ms after sleep, but only increased by {:?}", + elapsed ); - - // If we get here, uptime calculation works correctly }) .await; } From b973a2d05145078f14c7242e29359174a4bda375 Mon Sep 17 00:00:00 2001 From: Keiven Chang Date: Sat, 23 Aug 2025 00:37:15 +0000 Subject: [PATCH 2/2] refactor: consolidate uptime tracking in SystemHealth and improve thread safety - Move uptime tracking from SystemStatusState to SystemHealth - Replace Option with OnceLock for better thread safety - Add tests for uptime functionality with system enabled/disabled - Fix clippy warning by removing unnecessary ref in pattern matching --- lib/runtime/src/distributed.rs | 64 ++++++++++++++++++++++++- lib/runtime/src/lib.rs | 28 ++++------- lib/runtime/src/metrics.rs | 6 +-- lib/runtime/src/system_status_server.rs | 22 ++++++++- 4 files changed, 95 insertions(+), 25 deletions(-) diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index a12c965a145..27e9c49a1f0 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -161,7 +161,9 @@ impl DistributedRuntime { } } else { // System server HTTP is disabled, but uptime metrics are still being tracked via SystemHealth - tracing::debug!("System status server HTTP endpoints disabled, but uptime metrics are being tracked"); + tracing::debug!( + "System status server HTTP endpoints disabled, but uptime metrics are being tracked" + ); } Ok(distributed_runtime) @@ -344,7 +346,7 @@ impl DistributedConfig { } #[cfg(test)] -pub mod test_helpers { +pub mod distributed_test_utils { //! Common test helper functions for DistributedRuntime tests // TODO: Use in-memory DistributedRuntime for tests instead of full runtime when available. @@ -359,3 +361,61 @@ pub mod test_helpers { .unwrap() } } + +#[cfg(feature = "integration")] +#[cfg(test)] +mod tests { + use super::distributed_test_utils::create_test_drt_async; + + #[tokio::test] + async fn test_drt_uptime_after_delay_system_disabled() { + // Test uptime with system status server disabled + temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async { + // Start a DRT + let drt = create_test_drt_async().await; + + // Wait 50ms + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + // Check that uptime is 50+ ms + let uptime = drt.system_health.lock().unwrap().uptime(); + assert!( + uptime >= std::time::Duration::from_millis(50), + "Expected uptime to be at least 50ms, but got {:?}", + uptime + ); + + println!( + "✓ DRT uptime test passed (system disabled): uptime = {:?}", + uptime + ); + }) + .await; + } + + #[tokio::test] + async fn test_drt_uptime_after_delay_system_enabled() { + // Test uptime with system status server enabled + temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("true"))], async { + // Start a DRT + let drt = create_test_drt_async().await; + + // Wait 50ms + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + // Check that uptime is 50+ ms + let uptime = drt.system_health.lock().unwrap().uptime(); + assert!( + uptime >= std::time::Duration::from_millis(50), + "Expected uptime to be at least 50ms, but got {:?}", + uptime + ); + + println!( + "✓ DRT uptime test passed (system enabled): uptime = {:?}", + uptime + ); + }) + .await; + } +} diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index f67ea9f1de5..6b3fb200ed2 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -92,7 +92,7 @@ pub struct SystemHealth { health_path: String, live_path: String, start_time: Instant, - uptime_gauge: Option, + uptime_gauge: OnceLock, } impl SystemHealth { @@ -113,7 +113,7 @@ impl SystemHealth { health_path, live_path, start_time: Instant::now(), - uptime_gauge: None, + uptime_gauge: OnceLock::new(), } } pub fn set_health_status(&mut self, status: HealthStatus) { @@ -153,26 +153,18 @@ impl SystemHealth { /// Initialize the uptime gauge using the provided metrics registry pub fn initialize_uptime_gauge( - &mut self, + &self, registry: &T, ) -> anyhow::Result<()> { - match registry.create_gauge( + let gauge = registry.create_gauge( "uptime_seconds", "Total uptime of the DistributedRuntime in seconds", &[], - ) { - Ok(gauge) => { - self.uptime_gauge = Some(gauge); - Ok(()) - } - Err(e) if e.to_string().contains("Duplicate metrics") => { - // Log warning for duplicate metrics (common in tests) but don't fail - tracing::warn!("uptime_seconds metric already registered: {}", e); - // Leave uptime_gauge as None - updates will be no-ops - Ok(()) - } - Err(e) => Err(e), - } + )?; + self.uptime_gauge + .set(gauge) + .map_err(|_| anyhow::anyhow!("uptime_gauge already initialized"))?; + Ok(()) } /// Get the current uptime as a Duration @@ -182,7 +174,7 @@ impl SystemHealth { /// Update the uptime gauge with the current uptime value pub fn update_uptime_gauge(&self) { - if let Some(ref gauge) = self.uptime_gauge { + if let Some(gauge) = self.uptime_gauge.get() { gauge.set(self.uptime().as_secs_f64()); } } diff --git a/lib/runtime/src/metrics.rs b/lib/runtime/src/metrics.rs index 58a690f8543..55af79508f0 100644 --- a/lib/runtime/src/metrics.rs +++ b/lib/runtime/src/metrics.rs @@ -913,7 +913,7 @@ mod test_metricsregistry_units { #[cfg(test)] mod test_metricsregistry_prefixes { use super::*; - use crate::distributed::test_helpers::create_test_drt_async; + use crate::distributed::distributed_test_utils::create_test_drt_async; use prometheus::core::Collector; #[tokio::test] @@ -1047,7 +1047,7 @@ mod test_metricsregistry_prometheus_fmt_outputs { use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS}; use super::prometheus_names::{nats_client, nats_service}; use super::*; - use crate::distributed::test_helpers::create_test_drt_async; + use crate::distributed::distributed_test_utils::create_test_drt_async; use prometheus::Counter; use std::sync::Arc; @@ -1308,7 +1308,7 @@ mod test_metricsregistry_nats { use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS}; use super::prometheus_names::{nats_client, nats_service}; use super::*; - use crate::distributed::test_helpers::create_test_drt_async; + use crate::distributed::distributed_test_utils::create_test_drt_async; use crate::pipeline::PushRouter; use crate::{DistributedRuntime, Runtime}; use tokio::time::{Duration, sleep}; diff --git a/lib/runtime/src/system_status_server.rs b/lib/runtime/src/system_status_server.rs index 36690ae07d9..6bd0fa715b8 100644 --- a/lib/runtime/src/system_status_server.rs +++ b/lib/runtime/src/system_status_server.rs @@ -19,9 +19,9 @@ use crate::metrics::MetricsRegistry; use crate::traits::DistributedRuntimeProvider; use axum::{Router, http::StatusCode, response::IntoResponse, routing::get}; use serde_json::json; +use std::collections::HashMap; use std::sync::{Arc, OnceLock}; use std::time::Instant; -use std::collections::HashMap; use tokio::{net::TcpListener, task::JoinHandle}; use tokio_util::sync::CancellationToken; use tower_http::trace::TraceLayer; @@ -268,7 +268,7 @@ mod tests { #[cfg(all(test, feature = "integration"))] mod integration_tests { use super::*; - use crate::distributed::test_helpers::create_test_drt_async; + use crate::distributed::distributed_test_utils::create_test_drt_async; use crate::metrics::MetricsRegistry; use anyhow::Result; use rstest::rstest; @@ -366,6 +366,24 @@ mod integration_tests { .await; } + #[tokio::test] + async fn test_http_requests_fail_when_system_disabled() { + // Test that system status server is not running when disabled + temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async { + let drt = create_test_drt_async().await; + + // Verify that system status server info is None when disabled + let system_info = drt.system_status_server_info(); + assert!( + system_info.is_none(), + "System status server should not be running when DYN_SYSTEM_ENABLED=false" + ); + + println!("✓ System status server correctly disabled when DYN_SYSTEM_ENABLED=false"); + }) + .await; + } + /// This test verifies the health and liveness endpoints of the system status server. /// It checks that the endpoints respond with the correct HTTP status codes and bodies /// based on the initial health status and any custom endpoint paths provided via environment variables.