diff --git a/lib/runtime/src/config.rs b/lib/runtime/src/config.rs index c501feb3ad6..3eaf0a453d5 100644 --- a/lib/runtime/src/config.rs +++ b/lib/runtime/src/config.rs @@ -17,6 +17,10 @@ const DEFAULT_SYSTEM_HOST: &str = "0.0.0.0"; /// Default system port for health and metrics endpoints const DEFAULT_SYSTEM_PORT: u16 = 9090; +/// Default health endpoint paths +const DEFAULT_SYSTEM_HEALTH_PATH: &str = "/health"; +const DEFAULT_SYSTEM_LIVE_PATH: &str = "/live"; + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkerConfig { /// Grace shutdown period for the system server. @@ -110,6 +114,16 @@ pub struct RuntimeConfig { #[builder(default = "vec![]")] #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] pub use_endpoint_health_status: Vec, + + /// Health endpoint paths + /// Set this at runtime with environment variable DYN_SYSTEM_HEALTH_PATH + #[builder(default = "DEFAULT_SYSTEM_HEALTH_PATH.to_string()")] + #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] + pub system_health_path: String, + /// Set this at runtime with environment variable DYN_SYSTEM_LIVE_PATH + #[builder(default = "DEFAULT_SYSTEM_LIVE_PATH.to_string()")] + #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] + pub system_live_path: String, } impl fmt::Display for RuntimeConfig { @@ -134,6 +148,8 @@ impl fmt::Display for RuntimeConfig { "starting_health_status={:?}", self.starting_health_status )?; + write!(f, ", system_health_path={}", self.system_health_path)?; + write!(f, ", system_live_path={}", self.system_live_path)?; Ok(()) } @@ -169,6 +185,8 @@ impl RuntimeConfig { "ENABLED" => "system_enabled", "USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status", "STARTING_HEALTH_STATUS" => "starting_health_status", + "HEALTH_PATH" => "system_health_path", + "LIVE_PATH" => "system_live_path", _ => k.as_str(), }; Some(mapped_key.into()) @@ -207,6 +225,8 @@ impl RuntimeConfig { system_enabled: false, starting_health_status: HealthStatus::NotReady, use_endpoint_health_status: vec![], + system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(), + system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(), } } @@ -234,6 +254,8 @@ impl Default for RuntimeConfig { system_enabled: false, starting_health_status: HealthStatus::NotReady, use_endpoint_health_status: vec![], + system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(), + system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(), } } } @@ -432,6 +454,41 @@ mod tests { ); } + #[test] + fn test_system_health_endpoint_path_default() { + temp_env::with_vars(vec![("DYN_SYSTEM_HEALTH_PATH", None::<&str>)], || { + let config = RuntimeConfig::from_settings().unwrap(); + assert_eq!( + config.system_health_path, + DEFAULT_SYSTEM_HEALTH_PATH.to_string() + ); + }); + + temp_env::with_vars(vec![("DYN_SYSTEM_LIVE_PATH", None::<&str>)], || { + let config = RuntimeConfig::from_settings().unwrap(); + assert_eq!( + config.system_live_path, + DEFAULT_SYSTEM_LIVE_PATH.to_string() + ); + }); + } + + #[test] + fn test_system_health_endpoint_path_custom() { + temp_env::with_vars( + vec![("DYN_SYSTEM_HEALTH_PATH", Some("/custom/health"))], + || { + let config = RuntimeConfig::from_settings().unwrap(); + assert_eq!(config.system_health_path, "/custom/health"); + }, + ); + + temp_env::with_vars(vec![("DYN_SYSTEM_LIVE_PATH", Some("/custom/live"))], || { + let config = RuntimeConfig::from_settings().unwrap(); + assert_eq!(config.system_live_path, "/custom/live"); + }); + } + #[test] fn test_is_truthy_and_falsey() { // Test truthy values diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index f1b62eed9ee..bc4c4e1bad8 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -88,9 +88,13 @@ impl DistributedRuntime { }; let starting_health_status = config.starting_health_status.clone(); let use_endpoint_health_status = config.use_endpoint_health_status.clone(); - let system_health = Arc::new(Mutex::new(SystemHealth::new( + let health_endpoint_path = config.system_health_path.clone(); + let live_endpoint_path = config.system_live_path.clone(); + let system_health = Arc::new(std::sync::Mutex::new(SystemHealth::new( starting_health_status, use_endpoint_health_status, + health_endpoint_path, + live_endpoint_path, ))); let distributed_runtime = Self { diff --git a/lib/runtime/src/http_server.rs b/lib/runtime/src/http_server.rs index 1c2eda18d91..10c622f5330 100644 --- a/lib/runtime/src/http_server.rs +++ b/lib/runtime/src/http_server.rs @@ -130,6 +130,20 @@ pub async fn spawn_http_server( ) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> { // Create HTTP server state with the provided metrics registry let server_state = Arc::new(HttpServerState::new(drt)?); + let health_path = server_state + .drt() + .system_health + .lock() + .unwrap() + .health_path + .clone(); + let live_path = server_state + .drt() + .system_health + .lock() + .unwrap() + .live_path + .clone(); // Initialize the start time server_state @@ -138,14 +152,14 @@ pub async fn spawn_http_server( let app = Router::new() .route( - "/health", + &health_path, get({ let state = Arc::clone(&server_state); move |tracing_ctx| health_handler(state, "health", tracing_ctx) }), ) .route( - "/live", + &live_path, get({ let state = Arc::clone(&server_state); move |tracing_ctx| health_handler(state, "live", tracing_ctx) @@ -216,8 +230,12 @@ async fn health_handler( route: &'static str, // Used for tracing only trace_parent: TraceParent, // Used for tracing only ) -> impl IntoResponse { - let system_health = state.drt().system_health.lock().await; - let (mut healthy, endpoints) = system_health.get_health_status(); + let (mut healthy, endpoints) = state + .drt() + .system_health + .lock() + .unwrap() + .get_health_status(); let uptime = match state.uptime() { Ok(uptime_state) => Some(uptime_state), Err(e) => { @@ -373,14 +391,26 @@ dynamo_component_dynamo_uptime_seconds 42 } #[rstest] - #[case("ready", 200, "ready")] - #[case("notready", 503, "notready")] + #[case("ready", 200, "ready", None, None, 3)] + #[case("notready", 503, "notready", None, None, 3)] + #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)] + #[case( + "notready", + 503, + "notready", + Some("/custom/health"), + Some("/custom/live"), + 5 + )] #[tokio::test] #[cfg(feature = "integration")] async fn test_health_endpoints( #[case] starting_health_status: &'static str, #[case] expected_status: u16, #[case] expected_body: &'static str, + #[case] custom_health_path: Option<&'static str>, + #[case] custom_live_path: Option<&'static str>, + #[case] expected_num_tests: usize, ) { use std::sync::Arc; use tokio::time::sleep; @@ -394,10 +424,14 @@ dynamo_component_dynamo_uptime_seconds 42 #[allow(clippy::redundant_closure_call)] temp_env::async_with_vars( - [( - "DYN_SYSTEM_STARTING_HEALTH_STATUS", - Some(starting_health_status), - )], + [ + ( + "DYN_SYSTEM_STARTING_HEALTH_STATUS", + Some(starting_health_status), + ), + ("DYN_SYSTEM_HEALTH_PATH", custom_health_path), + ("DYN_SYSTEM_LIVE_PATH", custom_live_path), + ], (async || { let runtime = crate::Runtime::from_settings().unwrap(); let drt = Arc::new( @@ -413,20 +447,30 @@ dynamo_component_dynamo_uptime_seconds 42 sleep(std::time::Duration::from_millis(1000)).await; println!("[test] Server should be up, starting requests..."); let client = reqwest::Client::new(); - for (path, expect_status, expect_body) in [ - ("/health", expected_status, expected_body), - ("/live", expected_status, expected_body), - ("/someRandomPathNotFoundHere", 404, "Route not found"), - ] { + + // Prepare test cases + let mut test_cases = vec![]; + if custom_health_path.is_none() { + // When using default paths, test the default paths + test_cases.push(("/health", expected_status, expected_body)); + } else { + // When using custom paths, default paths should not exist + test_cases.push(("/health", 404, "Route not found")); + test_cases.push((custom_health_path.unwrap(), expected_status, expected_body)); + } + if custom_live_path.is_none() { + // When using default paths, test the default paths + test_cases.push(("/live", expected_status, expected_body)); + } else { + // When using custom paths, default paths should not exist + test_cases.push(("/live", 404, "Route not found")); + test_cases.push((custom_live_path.unwrap(), expected_status, expected_body)); + } + test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found")); + assert_eq!(test_cases.len(), expected_num_tests); + + for (path, expect_status, expect_body) in test_cases { println!("[test] Sending request to {}", path); - let traceparent_value = - "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"; - let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2"; - let mut headers = reqwest::header::HeaderMap::new(); - headers.insert( - reqwest::header::HeaderName::from_static("traceparent"), - reqwest::header::HeaderValue::from_str(traceparent_value).unwrap(), - ); let url = format!("http://{}{}", addr, path); let response = client.get(&url).send().await.unwrap(); let status = response.status(); diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index 925c12b7ffb..f22ce4ad052 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -87,12 +87,16 @@ pub struct SystemHealth { system_health: HealthStatus, endpoint_health: HashMap, use_endpoint_health_status: Vec, + health_path: String, + live_path: String, } impl SystemHealth { pub fn new( starting_health_status: HealthStatus, use_endpoint_health_status: Vec, + health_path: String, + live_path: String, ) -> Self { let mut endpoint_health = HashMap::new(); for endpoint in &use_endpoint_health_status { @@ -102,6 +106,8 @@ impl SystemHealth { system_health: starting_health_status, endpoint_health, use_endpoint_health_status, + health_path, + live_path, } } pub fn set_health_status(&mut self, status: HealthStatus) { @@ -167,7 +173,7 @@ pub struct DistributedRuntime { instance_sources: Arc>>>, // Health Status - system_health: Arc>, + system_health: Arc>, // This map associates metric prefixes with their corresponding Prometheus registries. prometheus_registries_by_prefix: Arc>>, diff --git a/lib/runtime/src/pipeline/network/ingress/push_endpoint.rs b/lib/runtime/src/pipeline/network/ingress/push_endpoint.rs index 538462103e5..50c2b9010a6 100644 --- a/lib/runtime/src/pipeline/network/ingress/push_endpoint.rs +++ b/lib/runtime/src/pipeline/network/ingress/push_endpoint.rs @@ -23,7 +23,7 @@ use anyhow::Result; use async_nats::service::endpoint::Endpoint; use derive_builder::Builder; use std::collections::HashMap; -use tokio::sync::Mutex; +use std::sync::Mutex; use tokio::sync::Notify; use tokio_util::sync::CancellationToken; @@ -54,7 +54,7 @@ impl PushEndpoint { system_health .lock() - .await + .unwrap() .set_endpoint_health_status(endpoint_name.clone(), HealthStatus::Ready); loop { @@ -113,7 +113,7 @@ impl PushEndpoint { system_health .lock() - .await + .unwrap() .set_endpoint_health_status(endpoint_name.clone(), HealthStatus::NotReady); // await for all inflight requests to complete