Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 67 additions & 12 deletions lib/runtime/src/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ impl DistributedRuntime {
});
distributed_runtime.register_metrics_callback(drt_hierarchies, nats_client_callback);

// Initialize the uptime gauge in SystemHealth
distributed_runtime
.system_health
.lock()
.unwrap()
.initialize_uptime_gauge(&distributed_runtime)?;

// Handle system status server initialization
if let Some(cancel_token) = cancel_token {
// System server is enabled - start both the state and HTTP server
Expand Down Expand Up @@ -153,17 +160,7 @@ impl DistributedRuntime {
}
}
} else {
// System server HTTP is disabled, but still create the state for metrics
// This ensures uptime_seconds metric is always registered
let system_status_state = crate::system_status_server::SystemStatusState::new(
Arc::new(distributed_runtime.clone()),
)?;

// Initialize the start time for uptime tracking
if let Err(e) = system_status_state.initialize_start_time() {
tracing::warn!("Failed to initialize system status start time: {}", e);
}

// System server HTTP is disabled, but uptime metrics are still being tracked via SystemHealth
tracing::debug!(
"System status server HTTP endpoints disabled, but uptime metrics are being tracked"
);
Expand Down Expand Up @@ -349,7 +346,7 @@ impl DistributedConfig {
}

#[cfg(test)]
pub mod test_helpers {
pub mod distributed_test_utils {
//! Common test helper functions for DistributedRuntime tests
// TODO: Use in-memory DistributedRuntime for tests instead of full runtime when available.

Expand All @@ -364,3 +361,61 @@ pub mod test_helpers {
.unwrap()
}
}

#[cfg(feature = "integration")]
#[cfg(test)]
mod tests {
    use super::distributed_test_utils::create_test_drt_async;

    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_disabled() {
        // Uptime must be tracked even when the system status server is off.
        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
            let drt = create_test_drt_async().await;
            let min_expected = std::time::Duration::from_millis(50);

            // Let some wall-clock time accumulate before sampling.
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

            // SystemHealth records its start at construction, which happened
            // before the sleep, so at least 50ms must have elapsed.
            let uptime = drt.system_health.lock().unwrap().uptime();
            assert!(
                uptime >= min_expected,
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!(
                "✓ DRT uptime test passed (system disabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }

    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_enabled() {
        // Same check as above, but with the system status server turned on.
        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("true"))], async {
            let drt = create_test_drt_async().await;
            let min_expected = std::time::Duration::from_millis(50);

            // Let some wall-clock time accumulate before sampling.
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

            let uptime = drt.system_health.lock().unwrap().uptime();
            assert!(
                uptime >= min_expected,
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!(
                "✓ DRT uptime test passed (system enabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }
}
33 changes: 33 additions & 0 deletions lib/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use std::{
collections::HashMap,
sync::{Arc, OnceLock, Weak},
time::Instant,
};
use tokio::sync::Mutex;

Expand Down Expand Up @@ -90,6 +91,8 @@ pub struct SystemHealth {
use_endpoint_health_status: Vec<String>,
health_path: String,
live_path: String,
start_time: Instant,
uptime_gauge: OnceLock<prometheus::Gauge>,
}

impl SystemHealth {
Expand All @@ -109,6 +112,8 @@ impl SystemHealth {
use_endpoint_health_status,
health_path,
live_path,
start_time: Instant::now(),
uptime_gauge: OnceLock::new(),
}
}
pub fn set_health_status(&mut self, status: HealthStatus) {
Expand Down Expand Up @@ -145,6 +150,34 @@ impl SystemHealth {

(healthy, endpoints)
}

/// Register the `uptime_seconds` gauge with the given metrics registry and
/// store it for later updates via [`SystemHealth::update_uptime_gauge`].
///
/// # Errors
/// Returns an error if the registry fails to create the gauge, or if the
/// gauge has already been initialized (this method may be called only once).
pub fn initialize_uptime_gauge<T: crate::metrics::MetricsRegistry>(
    &self,
    registry: &T,
) -> anyhow::Result<()> {
    // Create the gauge in the caller-supplied registry; no extra labels.
    let uptime_metric = registry.create_gauge(
        "uptime_seconds",
        "Total uptime of the DistributedRuntime in seconds",
        &[],
    )?;
    // OnceLock::set fails only when a value is already present.
    if self.uptime_gauge.set(uptime_metric).is_err() {
        anyhow::bail!("uptime_gauge already initialized");
    }
    Ok(())
}

/// Wall-clock time elapsed since this `SystemHealth` was constructed.
pub fn uptime(&self) -> std::time::Duration {
    // Equivalent to `self.start_time.elapsed()`: monotonic-clock delta
    // between now and the instant captured at construction.
    std::time::Instant::now().duration_since(self.start_time)
}

/// Push the current uptime (in fractional seconds) into the Prometheus gauge.
/// A silent no-op until `initialize_uptime_gauge` has been called.
pub fn update_uptime_gauge(&self) {
    // The gauge is absent before initialization; nothing to record then.
    let Some(uptime_metric) = self.uptime_gauge.get() else {
        return;
    };
    uptime_metric.set(self.uptime().as_secs_f64());
}
}

/// Type alias for runtime callback functions to reduce complexity
Expand Down
6 changes: 3 additions & 3 deletions lib/runtime/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ mod test_metricsregistry_units {
#[cfg(test)]
mod test_metricsregistry_prefixes {
use super::*;
use crate::distributed::test_helpers::create_test_drt_async;
use crate::distributed::distributed_test_utils::create_test_drt_async;
use prometheus::core::Collector;

#[tokio::test]
Expand Down Expand Up @@ -1047,7 +1047,7 @@ mod test_metricsregistry_prometheus_fmt_outputs {
use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
use super::prometheus_names::{nats_client, nats_service};
use super::*;
use crate::distributed::test_helpers::create_test_drt_async;
use crate::distributed::distributed_test_utils::create_test_drt_async;
use prometheus::Counter;
use std::sync::Arc;

Expand Down Expand Up @@ -1308,7 +1308,7 @@ mod test_metricsregistry_nats {
use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
use super::prometheus_names::{nats_client, nats_service};
use super::*;
use crate::distributed::test_helpers::create_test_drt_async;
use crate::distributed::distributed_test_utils::create_test_drt_async;
use crate::pipeline::PushRouter;
use crate::{DistributedRuntime, Runtime};
use tokio::time::{Duration, sleep};
Expand Down
Loading
Loading