Skip to content
57 changes: 57 additions & 0 deletions lib/runtime/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ const DEFAULT_SYSTEM_HOST: &str = "0.0.0.0";
/// Default system port for health and metrics endpoints
const DEFAULT_SYSTEM_PORT: u16 = 9090;

/// Default health endpoint paths
const DEFAULT_SYSTEM_HEALTH_PATH: &str = "/health";
const DEFAULT_SYSTEM_LIVE_PATH: &str = "/live";

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig {
/// Grace shutdown period for the system server.
Expand Down Expand Up @@ -110,6 +114,16 @@ pub struct RuntimeConfig {
#[builder(default = "vec![]")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub use_endpoint_health_status: Vec<String>,

/// Health endpoint paths
/// Set this at runtime with environment variable DYN_SYSTEM_HEALTH_PATH
#[builder(default = "DEFAULT_SYSTEM_HEALTH_PATH.to_string()")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub system_health_path: String,
/// Set this at runtime with environment variable DYN_SYSTEM_LIVE_PATH
#[builder(default = "DEFAULT_SYSTEM_LIVE_PATH.to_string()")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub system_live_path: String,
}

impl fmt::Display for RuntimeConfig {
Expand All @@ -134,6 +148,8 @@ impl fmt::Display for RuntimeConfig {
"starting_health_status={:?}",
self.starting_health_status
)?;
write!(f, ", system_health_path={}", self.system_health_path)?;
write!(f, ", system_live_path={}", self.system_live_path)?;

Ok(())
}
Expand Down Expand Up @@ -169,6 +185,8 @@ impl RuntimeConfig {
"ENABLED" => "system_enabled",
"USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status",
"STARTING_HEALTH_STATUS" => "starting_health_status",
"HEALTH_PATH" => "system_health_path",
"LIVE_PATH" => "system_live_path",
_ => k.as_str(),
};
Some(mapped_key.into())
Expand Down Expand Up @@ -207,6 +225,8 @@ impl RuntimeConfig {
system_enabled: false,
starting_health_status: HealthStatus::NotReady,
use_endpoint_health_status: vec![],
system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
}
}

Expand Down Expand Up @@ -234,6 +254,8 @@ impl Default for RuntimeConfig {
system_enabled: false,
starting_health_status: HealthStatus::NotReady,
use_endpoint_health_status: vec![],
system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
}
}
}
Expand Down Expand Up @@ -432,6 +454,41 @@ mod tests {
);
}

#[test]
fn test_system_health_endpoint_path_default() {
temp_env::with_vars(vec![("DYN_SYSTEM_HEALTH_PATH", None::<&str>)], || {
let config = RuntimeConfig::from_settings().unwrap();
assert_eq!(
config.system_health_path,
DEFAULT_SYSTEM_HEALTH_PATH.to_string()
);
});

temp_env::with_vars(vec![("DYN_SYSTEM_LIVE_PATH", None::<&str>)], || {
let config = RuntimeConfig::from_settings().unwrap();
assert_eq!(
config.system_live_path,
DEFAULT_SYSTEM_LIVE_PATH.to_string()
);
});
}

#[test]
fn test_system_health_endpoint_path_custom() {
temp_env::with_vars(
vec![("DYN_SYSTEM_HEALTH_PATH", Some("/custom/health"))],
|| {
let config = RuntimeConfig::from_settings().unwrap();
assert_eq!(config.system_health_path, "/custom/health");
},
);

temp_env::with_vars(vec![("DYN_SYSTEM_LIVE_PATH", Some("/custom/live"))], || {
let config = RuntimeConfig::from_settings().unwrap();
assert_eq!(config.system_live_path, "/custom/live");
});
}

#[test]
fn test_is_truthy_and_falsey() {
// Test truthy values
Expand Down
6 changes: 5 additions & 1 deletion lib/runtime/src/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,13 @@ impl DistributedRuntime {
};
let starting_health_status = config.starting_health_status.clone();
let use_endpoint_health_status = config.use_endpoint_health_status.clone();
let system_health = Arc::new(Mutex::new(SystemHealth::new(
let health_endpoint_path = config.system_health_path.clone();
let live_endpoint_path = config.system_live_path.clone();
let system_health = Arc::new(std::sync::Mutex::new(SystemHealth::new(
starting_health_status,
use_endpoint_health_status,
health_endpoint_path,
live_endpoint_path,
)));

let distributed_runtime = Self {
Expand Down
90 changes: 67 additions & 23 deletions lib/runtime/src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,20 @@ pub async fn spawn_http_server(
) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
// Create HTTP server state with the provided metrics registry
let server_state = Arc::new(HttpServerState::new(drt)?);
let health_path = server_state
.drt()
.system_health
.lock()
.unwrap()
.health_path
.clone();
let live_path = server_state
.drt()
.system_health
.lock()
.unwrap()
.live_path
.clone();

// Initialize the start time
server_state
Expand All @@ -138,14 +152,14 @@ pub async fn spawn_http_server(

let app = Router::new()
.route(
"/health",
&health_path,
get({
let state = Arc::clone(&server_state);
move |tracing_ctx| health_handler(state, "health", tracing_ctx)
}),
)
.route(
"/live",
&live_path,
get({
let state = Arc::clone(&server_state);
move |tracing_ctx| health_handler(state, "live", tracing_ctx)
Expand Down Expand Up @@ -216,8 +230,12 @@ async fn health_handler(
route: &'static str, // Used for tracing only
trace_parent: TraceParent, // Used for tracing only
) -> impl IntoResponse {
let system_health = state.drt().system_health.lock().await;
let (mut healthy, endpoints) = system_health.get_health_status();
let (mut healthy, endpoints) = state
.drt()
.system_health
.lock()
.unwrap()
.get_health_status();
let uptime = match state.uptime() {
Ok(uptime_state) => Some(uptime_state),
Err(e) => {
Expand Down Expand Up @@ -373,14 +391,26 @@ dynamo_component_dynamo_uptime_seconds 42
}

#[rstest]
#[case("ready", 200, "ready")]
#[case("notready", 503, "notready")]
#[case("ready", 200, "ready", None, None, 3)]
#[case("notready", 503, "notready", None, None, 3)]
#[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
#[case(
"notready",
503,
"notready",
Some("/custom/health"),
Some("/custom/live"),
5
)]
#[tokio::test]
#[cfg(feature = "integration")]
async fn test_health_endpoints(
#[case] starting_health_status: &'static str,
#[case] expected_status: u16,
#[case] expected_body: &'static str,
#[case] custom_health_path: Option<&'static str>,
#[case] custom_live_path: Option<&'static str>,
#[case] expected_num_tests: usize,
) {
use std::sync::Arc;
use tokio::time::sleep;
Expand All @@ -394,10 +424,14 @@ dynamo_component_dynamo_uptime_seconds 42

#[allow(clippy::redundant_closure_call)]
temp_env::async_with_vars(
[(
"DYN_SYSTEM_STARTING_HEALTH_STATUS",
Some(starting_health_status),
)],
[
(
"DYN_SYSTEM_STARTING_HEALTH_STATUS",
Some(starting_health_status),
),
("DYN_SYSTEM_HEALTH_PATH", custom_health_path),
("DYN_SYSTEM_LIVE_PATH", custom_live_path),
],
(async || {
let runtime = crate::Runtime::from_settings().unwrap();
let drt = Arc::new(
Expand All @@ -413,20 +447,30 @@ dynamo_component_dynamo_uptime_seconds 42
sleep(std::time::Duration::from_millis(1000)).await;
println!("[test] Server should be up, starting requests...");
let client = reqwest::Client::new();
for (path, expect_status, expect_body) in [
("/health", expected_status, expected_body),
("/live", expected_status, expected_body),
("/someRandomPathNotFoundHere", 404, "Route not found"),
] {

// Prepare test cases
let mut test_cases = vec![];
if custom_health_path.is_none() {
// When using default paths, test the default paths
test_cases.push(("/health", expected_status, expected_body));
} else {
// When using custom paths, default paths should not exist
test_cases.push(("/health", 404, "Route not found"));
test_cases.push((custom_health_path.unwrap(), expected_status, expected_body));
}
if custom_live_path.is_none() {
// When using default paths, test the default paths
test_cases.push(("/live", expected_status, expected_body));
} else {
// When using custom paths, default paths should not exist
test_cases.push(("/live", 404, "Route not found"));
test_cases.push((custom_live_path.unwrap(), expected_status, expected_body));
}
test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
assert_eq!(test_cases.len(), expected_num_tests);

for (path, expect_status, expect_body) in test_cases {
println!("[test] Sending request to {}", path);
let traceparent_value =
"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
reqwest::header::HeaderName::from_static("traceparent"),
reqwest::header::HeaderValue::from_str(traceparent_value).unwrap(),
);
let url = format!("http://{}{}", addr, path);
let response = client.get(&url).send().await.unwrap();
let status = response.status();
Expand Down
8 changes: 7 additions & 1 deletion lib/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,16 @@ pub struct SystemHealth {
system_health: HealthStatus,
endpoint_health: HashMap<String, HealthStatus>,
use_endpoint_health_status: Vec<String>,
health_path: String,
live_path: String,
}

impl SystemHealth {
pub fn new(
starting_health_status: HealthStatus,
use_endpoint_health_status: Vec<String>,
health_path: String,
live_path: String,
) -> Self {
let mut endpoint_health = HashMap::new();
for endpoint in &use_endpoint_health_status {
Expand All @@ -102,6 +106,8 @@ impl SystemHealth {
system_health: starting_health_status,
endpoint_health,
use_endpoint_health_status,
health_path,
live_path,
}
}
pub fn set_health_status(&mut self, status: HealthStatus) {
Expand Down Expand Up @@ -167,7 +173,7 @@ pub struct DistributedRuntime {
instance_sources: Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,

// Health Status
system_health: Arc<Mutex<SystemHealth>>,
system_health: Arc<std::sync::Mutex<SystemHealth>>,

// This map associates metric prefixes with their corresponding Prometheus registries.
prometheus_registries_by_prefix: Arc<std::sync::Mutex<HashMap<String, prometheus::Registry>>>,
Expand Down
6 changes: 3 additions & 3 deletions lib/runtime/src/pipeline/network/ingress/push_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use anyhow::Result;
use async_nats::service::endpoint::Endpoint;
use derive_builder::Builder;
use std::collections::HashMap;
use tokio::sync::Mutex;
use std::sync::Mutex;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;

Expand Down Expand Up @@ -54,7 +54,7 @@ impl PushEndpoint {

system_health
.lock()
.await
.unwrap()
.set_endpoint_health_status(endpoint_name.clone(), HealthStatus::Ready);

loop {
Expand Down Expand Up @@ -113,7 +113,7 @@ impl PushEndpoint {

system_health
.lock()
.await
.unwrap()
.set_endpoint_health_status(endpoint_name.clone(), HealthStatus::NotReady);

// await for all inflight requests to complete
Expand Down
Loading