From 5d0ca51b130a7fc3c6c0786eb9633f2122bc6502 Mon Sep 17 00:00:00 2001 From: Yingge He Date: Thu, 31 Jul 2025 14:05:09 -0700 Subject: [PATCH 1/6] Initial commit --- lib/runtime/src/config.rs | 53 +++++++++++++++++ lib/runtime/src/distributed.rs | 4 ++ lib/runtime/src/http_server.rs | 102 ++++++++++++++++++++++++++++----- lib/runtime/src/lib.rs | 6 ++ 4 files changed, 150 insertions(+), 15 deletions(-) diff --git a/lib/runtime/src/config.rs b/lib/runtime/src/config.rs index c501feb3ad6..b7e118cfa65 100644 --- a/lib/runtime/src/config.rs +++ b/lib/runtime/src/config.rs @@ -17,6 +17,11 @@ const DEFAULT_SYSTEM_HOST: &str = "0.0.0.0"; /// Default system port for health and metrics endpoints const DEFAULT_SYSTEM_PORT: u16 = 9090; +/// Default health endpoint paths +const DEFAULT_SYSTEM_HEALTH_PATH: &str = "/health"; +const DEFAULT_SYSTEM_LIVE_PATH: &str = "/live"; + + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkerConfig { /// Grace shutdown period for the system server. @@ -110,6 +115,16 @@ pub struct RuntimeConfig { #[builder(default = "vec![]")] #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] pub use_endpoint_health_status: Vec, + + /// Health endpoint paths + /// Set this at runtime with environment variable DYN_SYSTEM_HEALTH_PATH + #[builder(default = "DEFAULT_SYSTEM_HEALTH_PATH.to_string()")] + #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] + pub system_health_path: String, + /// Set this at runtime with environment variable DYN_SYSTEM_LIVE_PATH + #[builder(default = "DEFAULT_SYSTEM_LIVE_PATH.to_string()")] + #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] + pub system_live_path: String, } impl fmt::Display for RuntimeConfig { @@ -134,6 +149,8 @@ impl fmt::Display for RuntimeConfig { "starting_health_status={:?}", self.starting_health_status )?; + write!(f, ", system_health_path={}", self.system_health_path)?; + write!(f, ", system_live_path={}", self.system_live_path)?; Ok(()) } @@ -169,6 +186,8 @@ impl RuntimeConfig { "ENABLED" => "system_enabled", "USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status", "STARTING_HEALTH_STATUS" => "starting_health_status", + "HEALTH_PATH" => "system_health_path", + "LIVE_PATH" => "system_live_path", _ => k.as_str(), }; Some(mapped_key.into()) @@ -207,6 +226,8 @@ impl RuntimeConfig { system_enabled: false, starting_health_status: HealthStatus::NotReady, use_endpoint_health_status: vec![], + system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(), + system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(), } } @@ -234,6 +255,8 @@ impl Default for RuntimeConfig { system_enabled: false, starting_health_status: HealthStatus::NotReady, use_endpoint_health_status: vec![], + system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(), + system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(), } } } @@ -432,6 +455,36 @@ mod tests { ); } + #[test] + fn test_system_health_endpoint_path_default() { + temp_env::with_vars( + vec![("DYN_SYSTEM_HEALTH_PATH", None::<&str>)], || { + let config = RuntimeConfig::from_settings().unwrap(); + assert_eq!(config.system_health_path, DEFAULT_SYSTEM_HEALTH_PATH.to_string()); + }); + + temp_env::with_vars( + vec![("DYN_SYSTEM_LIVE_PATH", None::<&str>)], || { + let config = RuntimeConfig::from_settings().unwrap(); + assert_eq!(config.system_live_path, DEFAULT_SYSTEM_LIVE_PATH.to_string()); + }); + } + + #[test] + fn test_system_health_endpoint_path_custom() { + temp_env::with_vars( + vec![("DYN_SYSTEM_HEALTH_PATH", Some("/custom/health"))], || { + let config = RuntimeConfig::from_settings().unwrap(); + assert_eq!(config.system_health_path, "/custom/health"); + }); + + temp_env::with_vars( + vec![("DYN_SYSTEM_LIVE_PATH", Some("/custom/live"))], || { + let config = RuntimeConfig::from_settings().unwrap(); + assert_eq!(config.system_live_path, "/custom/live"); + }); + } + #[test] fn test_is_truthy_and_falsey() { // Test truthy values diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index f1b62eed9ee..85ffd84f9ce 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -88,9 +88,13 @@ impl DistributedRuntime { }; let starting_health_status = config.starting_health_status.clone(); let use_endpoint_health_status = config.use_endpoint_health_status.clone(); + let health_endpoint_path = config.system_health_path.clone(); + let live_endpoint_path = config.system_live_path.clone(); let system_health = Arc::new(Mutex::new(SystemHealth::new( starting_health_status, use_endpoint_health_status, + health_endpoint_path, + live_endpoint_path, ))); let distributed_runtime = Self { diff --git a/lib/runtime/src/http_server.rs b/lib/runtime/src/http_server.rs index 6c326d795bf..5c0536ec6c6 100644 --- a/lib/runtime/src/http_server.rs +++ b/lib/runtime/src/http_server.rs @@ -151,6 +151,8 @@ pub async fn spawn_http_server( ) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> { // Create HTTP server state with the provided metrics registry let server_state = Arc::new(HttpServerState::new(drt)?); + let state = Arc::clone(&server_state); + let system_health = state.drt().system_health.lock().await; // Initialize the start time server_state @@ -159,14 +161,14 @@ pub async fn spawn_http_server( let app = Router::new() .route( - "/health", + &system_health.health_path, get({ let state = Arc::clone(&server_state); move |tracing_ctx| health_handler(state, "health", tracing_ctx) }), ) .route( - "/live", + &system_health.live_path, get({ let state = Arc::clone(&server_state); move |tracing_ctx| health_handler(state, "live", tracing_ctx) @@ -368,9 +370,9 @@ mod tests { println!("Full metrics response:\n{}", response); let expected = "\ -# HELP dynamo_uptime_seconds Total uptime of the DistributedRuntime in seconds -# TYPE dynamo_uptime_seconds gauge -dynamo_uptime_seconds{namespace=\"http_server\"} 42 +# HELP http_server_dynamo_uptime_seconds Total uptime of the DistributedRuntime in seconds +# TYPE http_server_dynamo_uptime_seconds gauge +http_server_dynamo_uptime_seconds{namespace=\"http_server\"} 42 "; assert_eq!(response, expected); } @@ -398,7 +400,7 @@ dynamo_uptime_seconds{namespace=\"http_server\"} 42 #[case("notready", 503, "notready")] #[tokio::test] #[cfg(feature = "integration")] - async fn test_health_endpoints( + async fn test_default_health_endpoints( #[case] starting_health_status: &'static str, #[case] expected_status: u16, #[case] expected_body: &'static str, @@ -440,14 +442,84 @@ dynamo_uptime_seconds{namespace=\"http_server\"} 42 ("/someRandomPathNotFoundHere", 404, "Route not found"), ] { println!("[test] Sending request to {}", path); - let traceparent_value = - "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"; - let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2"; - let mut headers = reqwest::header::HeaderMap::new(); - headers.insert( - reqwest::header::HeaderName.from_static("traceparent"), - reqwest::header::HeaderValue.from_str(traceparent_value)?, + let url = format!("http://{}{}", addr, path); + let response = client.get(&url).send().await.unwrap(); + let status = response.status(); + let body = response.text().await.unwrap(); + println!( + "[test] Response for {}: status={}, body={:?}", + path, status, body + ); + assert_eq!( + status, expect_status, + "Response: status={}, body={:?}", + status, body ); + assert!( + body.contains(expect_body), + "Response: status={}, body={:?}", + status, + body + ); + } + })(), + ) + .await; + } + + #[rstest] + #[case("ready", 200, "ready")] + #[case("notready", 503, "notready")] + #[tokio::test] + #[cfg(feature = "integration")] + async fn test_custom_health_endpoint_paths( + #[case] starting_health_status: &'static str, + #[case] expected_status: u16, + #[case] expected_body: &'static str, + ) { + use std::sync::Arc; + use tokio::time::sleep; + use tokio_util::sync::CancellationToken; + // use tokio::io::{AsyncReadExt, AsyncWriteExt}; + // use reqwest for HTTP requests + + // Closure call is needed here to satisfy async_with_vars + + crate::logging::init(); + + #[allow(clippy::redundant_closure_call)] + temp_env::async_with_vars( + [ + ( + "DYN_SYSTEM_STARTING_HEALTH_STATUS", + Some(starting_health_status), + ), + ("DYN_SYSTEM_HEALTH_PATH",Some("/custom/health")), + ("DYN_SYSTEM_LIVE_PATH", Some("/custom/live")), + ], + (async || { + let runtime = crate::Runtime::from_settings().unwrap(); + let drt = Arc::new( + crate::DistributedRuntime::from_settings_without_discovery(runtime) + .await + .unwrap(), + ); + let cancel_token = CancellationToken::new(); + let (addr, _) = spawn_http_server("127.0.0.1", 0, cancel_token.clone(), drt) + .await + .unwrap(); + println!("[test] Waiting for server to start..."); + sleep(std::time::Duration::from_millis(1000)).await; + println!("[test] Server should be up, starting requests..."); + let client = reqwest::Client::new(); + for (path, expect_status, expect_body) in [ + ("/custom/health", expected_status, expected_body), + ("/custom/live", expected_status, expected_body), + ("/health", 404, "Route not found"), + ("/live", 404, "Route not found"), + ("/someRandomPathNotFoundHere", 404, "Route not found"), + ] { + println!("[test] Sending request to {}", path); let url = format!("http://{}{}", addr, path); let response = client.get(&url).send().await.unwrap(); let status = response.status(); @@ -514,11 +586,11 @@ dynamo_uptime_seconds{namespace=\"http_server\"} 42 let mut headers = reqwest::header::HeaderMap::new(); headers.insert( reqwest::header::HeaderName::from_static("traceparent"), - reqwest::header::HeaderValue::from_str(traceparent_value)?, + reqwest::header::HeaderValue::from_str(traceparent_value).unwrap(), ); headers.insert( reqwest::header::HeaderName::from_static("tracestate"), - reqwest::header::HeaderValue::from_str(tracestate_value)?, + reqwest::header::HeaderValue::from_str(tracestate_value).unwrap(), ); let url = format!("http://{}{}", addr, path); let response = client.get(&url).headers(headers).send().await.unwrap(); diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index 925c12b7ffb..144ad1ae933 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -87,12 +87,16 @@ pub struct SystemHealth { system_health: HealthStatus, endpoint_health: HashMap, use_endpoint_health_status: Vec, + health_path: String, + live_path: String, } impl SystemHealth { pub fn new( starting_health_status: HealthStatus, use_endpoint_health_status: Vec, + health_path: String, + live_path: String, ) -> Self { let mut endpoint_health = HashMap::new(); for endpoint in &use_endpoint_health_status { @@ -102,6 +106,8 @@ impl SystemHealth { system_health: starting_health_status, endpoint_health, use_endpoint_health_status, + health_path, + live_path, } } pub fn set_health_status(&mut self, status: HealthStatus) { From 12cebc76db1c9dc680b7b8c442870c0155d5a9ae Mon Sep 17 00:00:00 2001 From: Yingge He Date: Thu, 31 Jul 2025 15:13:19 -0700 Subject: [PATCH 2/6] Merge tests --- lib/runtime/src/http_server.rs | 118 +++++++++------------------------ 1 file changed, 30 insertions(+), 88 deletions(-) diff --git a/lib/runtime/src/http_server.rs b/lib/runtime/src/http_server.rs index 5c0536ec6c6..866f68e492b 100644 --- a/lib/runtime/src/http_server.rs +++ b/lib/runtime/src/http_server.rs @@ -396,14 +396,18 @@ http_server_dynamo_uptime_seconds{namespace=\"http_server\"} 42 } #[rstest] - #[case("ready", 200, "ready")] - #[case("notready", 503, "notready")] + #[case("ready", 200, "ready", None, None)] + #[case("notready", 503, "notready", None, None)] + #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"))] + #[case("notready", 503, "notready", Some("/custom/health"), Some("/custom/live"))] #[tokio::test] #[cfg(feature = "integration")] - async fn test_default_health_endpoints( + async fn test_health_endpoints( #[case] starting_health_status: &'static str, #[case] expected_status: u16, #[case] expected_body: &'static str, + #[case] custom_health_path: Option<&'static str>, + #[case] custom_live_path: Option<&'static str>, ) { use std::sync::Arc; use tokio::time::sleep; @@ -417,10 +421,9 @@ http_server_dynamo_uptime_seconds{namespace=\"http_server\"} 42 #[allow(clippy::redundant_closure_call)] temp_env::async_with_vars( - [( - "DYN_SYSTEM_STARTING_HEALTH_STATUS", - Some(starting_health_status), - )], + [("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some(starting_health_status)), + ("DYN_SYSTEM_HEALTH_PATH", custom_health_path), + ("DYN_SYSTEM_LIVE_PATH", custom_live_path)], (async || { let runtime = crate::Runtime::from_settings().unwrap(); let drt = Arc::new( @@ -436,89 +439,28 @@ http_server_dynamo_uptime_seconds{namespace=\"http_server\"} 42 sleep(std::time::Duration::from_millis(1000)).await; println!("[test] Server should be up, starting requests..."); let client = reqwest::Client::new(); - for (path, expect_status, expect_body) in [ - ("/health", expected_status, expected_body), - ("/live", expected_status, expected_body), - ("/someRandomPathNotFoundHere", 404, "Route not found"), - ] { - println!("[test] Sending request to {}", path); - let url = format!("http://{}{}", addr, path); - let response = client.get(&url).send().await.unwrap(); - let status = response.status(); - let body = response.text().await.unwrap(); - println!( - "[test] Response for {}: status={}, body={:?}", - path, status, body - ); - assert_eq!( - status, expect_status, - "Response: status={}, body={:?}", - status, body - ); - assert!( - body.contains(expect_body), - "Response: status={}, body={:?}", - status, - body - ); + + // Prepare test cases + let mut test_cases = vec![]; + if custom_health_path.is_none() { + // When using default paths, test the default paths + test_cases.push(("/health", expected_status, expected_body)); + } else { + // When using custom paths, default paths should not exist + test_cases.push(("/health", 404, "Route not found")); + test_cases.push((custom_health_path.unwrap(), expected_status, expected_body)); } - })(), - ) - .await; - } - - #[rstest] - #[case("ready", 200, "ready")] - #[case("notready", 503, "notready")] - #[tokio::test] - #[cfg(feature = "integration")] - async fn test_custom_health_endpoint_paths( - #[case] starting_health_status: &'static str, - #[case] expected_status: u16, - #[case] expected_body: &'static str, - ) { - use std::sync::Arc; - use tokio::time::sleep; - use tokio_util::sync::CancellationToken; - // use tokio::io::{AsyncReadExt, AsyncWriteExt}; - // use reqwest for HTTP requests - - // Closure call is needed here to satisfy async_with_vars - - crate::logging::init(); + if custom_live_path.is_none() { + // When using default paths, test the default paths + test_cases.push(("/live", expected_status, expected_body)); + } else { + // When using custom paths, default paths should not exist + test_cases.push(("/live", 404, "Route not found")); + test_cases.push((custom_live_path.unwrap(), expected_status, expected_body)); + } + test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found")); - #[allow(clippy::redundant_closure_call)] - temp_env::async_with_vars( - [ - ( - "DYN_SYSTEM_STARTING_HEALTH_STATUS", - Some(starting_health_status), - ), - ("DYN_SYSTEM_HEALTH_PATH",Some("/custom/health")), - ("DYN_SYSTEM_LIVE_PATH", Some("/custom/live")), - ], - (async || { - let runtime = crate::Runtime::from_settings().unwrap(); - let drt = Arc::new( - crate::DistributedRuntime::from_settings_without_discovery(runtime) - .await - .unwrap(), - ); - let cancel_token = CancellationToken::new(); - let (addr, _) = spawn_http_server("127.0.0.1", 0, cancel_token.clone(), drt) - .await - .unwrap(); - println!("[test] Waiting for server to start..."); - sleep(std::time::Duration::from_millis(1000)).await; - println!("[test] Server should be up, starting requests..."); - let client = reqwest::Client::new(); - for (path, expect_status, expect_body) in [ - ("/custom/health", expected_status, expected_body), - ("/custom/live", expected_status, expected_body), - ("/health", 404, "Route not found"), - ("/live", 404, "Route not found"), - ("/someRandomPathNotFoundHere", 404, "Route not found"), - ] { + for (path, expect_status, expect_body) in test_cases { println!("[test] Sending request to {}", path); let url = format!("http://{}{}", addr, path); let response = client.get(&url).send().await.unwrap(); From 510e57a17036482903479acb3a63c12f00811552 Mon Sep 17 00:00:00 2001 From: Yingge He Date: Thu, 31 Jul 2025 16:02:20 -0700 Subject: [PATCH 3/6] Fix format --- lib/runtime/src/config.rs | 30 +++++++++++++++++------------- lib/runtime/src/http_server.rs | 25 ++++++++++++++++++------- 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/lib/runtime/src/config.rs b/lib/runtime/src/config.rs index b7e118cfa65..3eaf0a453d5 100644 --- a/lib/runtime/src/config.rs +++ b/lib/runtime/src/config.rs @@ -21,7 +21,6 @@ const DEFAULT_SYSTEM_PORT: u16 = 9090; const DEFAULT_SYSTEM_HEALTH_PATH: &str = "/health"; const DEFAULT_SYSTEM_LIVE_PATH: &str = "/live"; - #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkerConfig { /// Grace shutdown period for the system server. @@ -457,29 +456,34 @@ mod tests { #[test] fn test_system_health_endpoint_path_default() { - temp_env::with_vars( - vec![("DYN_SYSTEM_HEALTH_PATH", None::<&str>)], || { + temp_env::with_vars(vec![("DYN_SYSTEM_HEALTH_PATH", None::<&str>)], || { let config = RuntimeConfig::from_settings().unwrap(); - assert_eq!(config.system_health_path, DEFAULT_SYSTEM_HEALTH_PATH.to_string()); + assert_eq!( + config.system_health_path, + DEFAULT_SYSTEM_HEALTH_PATH.to_string() + ); }); - temp_env::with_vars( - vec![("DYN_SYSTEM_LIVE_PATH", None::<&str>)], || { + temp_env::with_vars(vec![("DYN_SYSTEM_LIVE_PATH", None::<&str>)], || { let config = RuntimeConfig::from_settings().unwrap(); - assert_eq!(config.system_live_path, DEFAULT_SYSTEM_LIVE_PATH.to_string()); + assert_eq!( + config.system_live_path, + DEFAULT_SYSTEM_LIVE_PATH.to_string() + ); }); } #[test] fn test_system_health_endpoint_path_custom() { temp_env::with_vars( - vec![("DYN_SYSTEM_HEALTH_PATH", Some("/custom/health"))], || { - let config = RuntimeConfig::from_settings().unwrap(); - assert_eq!(config.system_health_path, "/custom/health"); - }); + vec![("DYN_SYSTEM_HEALTH_PATH", Some("/custom/health"))], + || { + let config = RuntimeConfig::from_settings().unwrap(); + assert_eq!(config.system_health_path, "/custom/health"); + }, + ); - temp_env::with_vars( - vec![("DYN_SYSTEM_LIVE_PATH", Some("/custom/live"))], || { + temp_env::with_vars(vec![("DYN_SYSTEM_LIVE_PATH", Some("/custom/live"))], || { let config = RuntimeConfig::from_settings().unwrap(); assert_eq!(config.system_live_path, "/custom/live"); }); diff --git a/lib/runtime/src/http_server.rs b/lib/runtime/src/http_server.rs index 8f6a03a9382..8e0399e9b68 100644 --- a/lib/runtime/src/http_server.rs +++ b/lib/runtime/src/http_server.rs @@ -399,7 +399,13 @@ dynamo_system_uptime_seconds{namespace=\"dynamo\"} 42 #[case("ready", 200, "ready", None, None)] #[case("notready", 503, "notready", None, None)] #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"))] - #[case("notready", 503, "notready", Some("/custom/health"), Some("/custom/live"))] + #[case( + "notready", + 503, + "notready", + Some("/custom/health"), + Some("/custom/live") + )] #[tokio::test] #[cfg(feature = "integration")] async fn test_health_endpoints( @@ -421,9 +427,14 @@ dynamo_system_uptime_seconds{namespace=\"dynamo\"} 42 #[allow(clippy::redundant_closure_call)] temp_env::async_with_vars( - [("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some(starting_health_status)), - ("DYN_SYSTEM_HEALTH_PATH", custom_health_path), - ("DYN_SYSTEM_LIVE_PATH", custom_live_path)], + [ + ( + "DYN_SYSTEM_STARTING_HEALTH_STATUS", + Some(starting_health_status), + ), + ("DYN_SYSTEM_HEALTH_PATH", custom_health_path), + ("DYN_SYSTEM_LIVE_PATH", custom_live_path), + ], (async || { let runtime = crate::Runtime::from_settings().unwrap(); let drt = Arc::new( @@ -439,7 +450,7 @@ dynamo_system_uptime_seconds{namespace=\"dynamo\"} 42 sleep(std::time::Duration::from_millis(1000)).await; println!("[test] Server should be up, starting requests..."); let client = reqwest::Client::new(); - + // Prepare test cases let mut test_cases = vec![]; if custom_health_path.is_none() { @@ -528,11 +539,11 @@ dynamo_system_uptime_seconds{namespace=\"dynamo\"} 42 let mut headers = reqwest::header::HeaderMap::new(); headers.insert( reqwest::header::HeaderName::from_static("traceparent"), - reqwest::header::HeaderValue::from_str(traceparent_value).unwrap(), + reqwest::header::HeaderValue::from_str(traceparent_value)?, ); headers.insert( reqwest::header::HeaderName::from_static("tracestate"), - reqwest::header::HeaderValue::from_str(tracestate_value).unwrap(), + reqwest::header::HeaderValue::from_str(tracestate_value)?, ); let url = format!("http://{}{}", addr, path); let response = client.get(&url).headers(headers).send().await.unwrap(); From 0ba4a84d80e6f05b95a39a31c653d6fc735c12f0 Mon Sep 17 00:00:00 2001 From: Yingge He Date: Fri, 1 Aug 2025 12:53:37 -0700 Subject: [PATCH 4/6] Check num of tests in test_health_endpoints --- lib/runtime/src/http_server.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/lib/runtime/src/http_server.rs b/lib/runtime/src/http_server.rs index 8e0399e9b68..f6b4043cee3 100644 --- a/lib/runtime/src/http_server.rs +++ b/lib/runtime/src/http_server.rs @@ -396,15 +396,16 @@ dynamo_system_uptime_seconds{namespace=\"dynamo\"} 42 } #[rstest] - #[case("ready", 200, "ready", None, None)] - #[case("notready", 503, "notready", None, None)] - #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"))] + #[case("ready", 200, "ready", None, None, 3)] + #[case("notready", 503, "notready", None, None, 3)] + #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)] #[case( "notready", 503, "notready", Some("/custom/health"), - Some("/custom/live") + Some("/custom/live"), + 5 )] #[tokio::test] #[cfg(feature = "integration")] @@ -414,6 +415,7 @@ dynamo_system_uptime_seconds{namespace=\"dynamo\"} 42 #[case] expected_body: &'static str, #[case] custom_health_path: Option<&'static str>, #[case] custom_live_path: Option<&'static str>, + #[case] expected_num_tests: usize, ) { use std::sync::Arc; use tokio::time::sleep; @@ -470,6 +472,7 @@ dynamo_system_uptime_seconds{namespace=\"dynamo\"} 42 test_cases.push((custom_live_path.unwrap(), expected_status, expected_body)); } test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found")); + assert_eq!(test_cases.len(), expected_num_tests); for (path, expect_status, expect_body) in test_cases { println!("[test] Sending request to {}", path); From 0ec2976fc77ec61a657317a131edd36612ed8037 Mon Sep 17 00:00:00 2001 From: Yingge He Date: Mon, 4 Aug 2025 16:42:55 -0700 Subject: [PATCH 5/6] change tokio mutex to std mutex SystemHealth for less overhead --- lib/runtime/src/distributed.rs | 2 +- lib/runtime/src/http_server.rs | 11 +++++------ lib/runtime/src/lib.rs | 2 +- .../src/pipeline/network/ingress/push_endpoint.rs | 6 +++--- 4 files changed, 10 insertions(+), 11 deletions(-) diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index 85ffd84f9ce..bc4c4e1bad8 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -90,7 +90,7 @@ impl DistributedRuntime { let use_endpoint_health_status = config.use_endpoint_health_status.clone(); let health_endpoint_path = config.system_health_path.clone(); let live_endpoint_path = config.system_live_path.clone(); - let system_health = Arc::new(Mutex::new(SystemHealth::new( + let system_health = Arc::new(std::sync::Mutex::new(SystemHealth::new( starting_health_status, use_endpoint_health_status, health_endpoint_path, diff --git a/lib/runtime/src/http_server.rs b/lib/runtime/src/http_server.rs index 94f7820e302..9426901e2a6 100644 --- a/lib/runtime/src/http_server.rs +++ b/lib/runtime/src/http_server.rs @@ -130,8 +130,8 @@ pub async fn spawn_http_server( ) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> { // Create HTTP server state with the provided metrics registry let server_state = Arc::new(HttpServerState::new(drt)?); - let state = Arc::clone(&server_state); - let system_health = state.drt().system_health.lock().await; + let health_path = server_state.drt().system_health.lock().unwrap().health_path.clone(); + let live_path = server_state.drt().system_health.lock().unwrap().live_path.clone(); // Initialize the start time server_state @@ -140,14 +140,14 @@ pub async fn spawn_http_server( let app = Router::new() .route( - &system_health.health_path, + &health_path, get({ let state = Arc::clone(&server_state); move |tracing_ctx| health_handler(state, "health", tracing_ctx) }), ) .route( - &system_health.live_path, + &live_path, get({ let state = Arc::clone(&server_state); move |tracing_ctx| health_handler(state, "live", tracing_ctx) @@ -218,8 +218,7 @@ async fn health_handler( route: &'static str, // Used for tracing only trace_parent: TraceParent, // Used for tracing only ) -> impl IntoResponse { - let system_health = state.drt().system_health.lock().await; - let (mut healthy, endpoints) = system_health.get_health_status(); + let (mut healthy, endpoints) = state.drt().system_health.lock().unwrap().get_health_status(); let uptime = match state.uptime() { Ok(uptime_state) => Some(uptime_state), Err(e) => { diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index 144ad1ae933..f22ce4ad052 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -173,7 +173,7 @@ pub struct DistributedRuntime { instance_sources: Arc>>>, // Health Status - system_health: Arc>, + system_health: Arc>, // This map associates metric prefixes with their corresponding Prometheus registries. prometheus_registries_by_prefix: Arc>>, diff --git a/lib/runtime/src/pipeline/network/ingress/push_endpoint.rs b/lib/runtime/src/pipeline/network/ingress/push_endpoint.rs index 538462103e5..50c2b9010a6 100644 --- a/lib/runtime/src/pipeline/network/ingress/push_endpoint.rs +++ b/lib/runtime/src/pipeline/network/ingress/push_endpoint.rs @@ -23,7 +23,7 @@ use anyhow::Result; use async_nats::service::endpoint::Endpoint; use derive_builder::Builder; use std::collections::HashMap; -use tokio::sync::Mutex; +use std::sync::Mutex; use tokio::sync::Notify; use tokio_util::sync::CancellationToken; @@ -54,7 +54,7 @@ impl PushEndpoint { system_health .lock() - .await + .unwrap() .set_endpoint_health_status(endpoint_name.clone(), HealthStatus::Ready); loop { @@ -113,7 +113,7 @@ impl PushEndpoint { system_health .lock() - .await + .unwrap() .set_endpoint_health_status(endpoint_name.clone(), HealthStatus::NotReady); // await for all inflight requests to complete From a3d01bc3f1fb846cec159a598e25a0a8cca6ecdd Mon Sep 17 00:00:00 2001 From: Yingge He Date: Mon, 4 Aug 2025 17:16:11 -0700 Subject: [PATCH 6/6] Fix format --- lib/runtime/src/http_server.rs | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/lib/runtime/src/http_server.rs b/lib/runtime/src/http_server.rs index 9426901e2a6..10c622f5330 100644 --- a/lib/runtime/src/http_server.rs +++ b/lib/runtime/src/http_server.rs @@ -130,8 +130,20 @@ pub async fn spawn_http_server( ) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> { // Create HTTP server state with the provided metrics registry let server_state = Arc::new(HttpServerState::new(drt)?); - let health_path = server_state.drt().system_health.lock().unwrap().health_path.clone(); - let live_path = server_state.drt().system_health.lock().unwrap().live_path.clone(); + let health_path = server_state + .drt() + .system_health + .lock() + .unwrap() + .health_path + .clone(); + let live_path = server_state + .drt() + .system_health + .lock() + .unwrap() + .live_path + .clone(); // Initialize the start time server_state @@ -218,7 +230,12 @@ async fn health_handler( route: &'static str, // Used for tracing only trace_parent: TraceParent, // Used for tracing only ) -> impl IntoResponse { - let (mut healthy, endpoints) = state.drt().system_health.lock().unwrap().get_health_status(); + let (mut healthy, endpoints) = state + .drt() + .system_health + .lock() + .unwrap() + .get_health_status(); let uptime = match state.uptime() { Ok(uptime_state) => Some(uptime_state), Err(e) => {