From 0bb2c8d92c31fa6f34b8ed26f785ca0429e0a737 Mon Sep 17 00:00:00 2001 From: "Gavin.Zhu" Date: Sun, 7 Sep 2025 05:06:50 +0000 Subject: [PATCH 1/6] fix: fixed failing health probes to enable state transition between states Signed-off-by: Gavin.Zhu --- lib/runtime/src/config.rs | 34 +++++++++++ lib/runtime/src/distributed.rs | 2 + lib/runtime/src/lib.rs | 80 ++++++++++++++++++++++++- lib/runtime/src/system_status_server.rs | 4 +- 4 files changed, 116 insertions(+), 4 deletions(-) diff --git a/lib/runtime/src/config.rs b/lib/runtime/src/config.rs index 33ebb676dd1..dc4904477f2 100644 --- a/lib/runtime/src/config.rs +++ b/lib/runtime/src/config.rs @@ -59,6 +59,30 @@ pub enum HealthStatus { NotReady, } +/// Health transition policy for automatic health status management +#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)] +#[serde(rename_all = "snake_case")] +pub enum HealthTransitionPolicy { + /// Never auto-transition (current behavior, requires manual set_health_status) + Manual, + /// Auto-ready after specified uptime seconds + TimeBasedReady { after_seconds: u64 }, + /// Ready when service has registered at least one endpoint + EndpointBasedReady, + /// Custom readiness logic with configurable parameters + Custom { + auto_ready_after_seconds: Option, + require_endpoints_ready: bool, + }, +} + +impl Default for HealthTransitionPolicy { + fn default() -> Self { + // Better default: auto-ready after 30 seconds for simple services + Self::TimeBasedReady { after_seconds: 30 } + } +} + /// Runtime configuration /// Defines the configuration for Tokio runtimes #[derive(Serialize, Deserialize, Validate, Debug, Builder, Clone)] @@ -114,6 +138,12 @@ pub struct RuntimeConfig { #[builder(default = "vec![]")] #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] pub use_endpoint_health_status: Vec, + /// Health Transition Policy + /// Controls when and how the system automatically transitions from "notready" to "ready" + /// Set this at runtime with environment variable DYN_SYSTEM_HEALTH_TRANSITION_POLICY + #[builder(default = "HealthTransitionPolicy::default()")] + #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] + pub health_transition_policy: HealthTransitionPolicy, /// Health endpoint paths /// Set this at runtime with environment variable DYN_SYSTEM_HEALTH_PATH @@ -185,6 +215,8 @@ impl RuntimeConfig { "ENABLED" => "system_enabled", "USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status", "STARTING_HEALTH_STATUS" => "starting_health_status", + "HEALTH_TRANSITION_POLICY" => "health_transition_policy", + "AUTO_READY_AFTER_SECONDS" => "health_transition_policy", "HEALTH_PATH" => "system_health_path", "LIVE_PATH" => "system_live_path", _ => k.as_str(), @@ -225,6 +257,7 @@ impl RuntimeConfig { system_enabled: false, starting_health_status: HealthStatus::NotReady, use_endpoint_health_status: vec![], + health_transition_policy: HealthTransitionPolicy::default(), system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(), system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(), } @@ -254,6 +287,7 @@ impl Default for RuntimeConfig { system_enabled: false, starting_health_status: HealthStatus::NotReady, use_endpoint_health_status: vec![], + health_transition_policy: HealthTransitionPolicy::default(), system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(), system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(), } diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index 8397b89d910..7c866f52808 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -75,11 +75,13 @@ impl DistributedRuntime { }; let starting_health_status = config.starting_health_status.clone(); let use_endpoint_health_status = config.use_endpoint_health_status.clone(); + let health_transition_policy = config.health_transition_policy.clone(); let health_endpoint_path = config.system_health_path.clone(); let live_endpoint_path = config.system_live_path.clone(); let system_health = Arc::new(std::sync::Mutex::new(SystemHealth::new( starting_health_status, use_endpoint_health_status, + health_transition_policy, health_endpoint_path, live_endpoint_path, ))); diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index 263ef357153..d2e1277a892 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -65,7 +65,7 @@ use crate::metrics::prometheus_names::distributed_runtime; use component::{Endpoint, InstanceSource}; use utils::GracefulShutdownTracker; -use config::HealthStatus; +use config::{HealthStatus, HealthTransitionPolicy}; /// Types of Tokio runtimes that can be used to construct a Dynamo [Runtime]. #[derive(Clone)] @@ -94,6 +94,7 @@ pub struct SystemHealth { system_health: HealthStatus, endpoint_health: HashMap, use_endpoint_health_status: Vec, + health_transition_policy: HealthTransitionPolicy, health_path: String, live_path: String, start_time: Instant, @@ -104,6 +105,7 @@ impl SystemHealth { pub fn new( starting_health_status: HealthStatus, use_endpoint_health_status: Vec, + health_transition_policy: HealthTransitionPolicy, health_path: String, live_path: String, ) -> Self { @@ -115,6 +117,7 @@ impl SystemHealth { system_health: starting_health_status, endpoint_health, use_endpoint_health_status, + health_transition_policy, health_path, live_path, start_time: Instant::now(), @@ -129,7 +132,80 @@ impl SystemHealth { self.endpoint_health.insert(endpoint.to_string(), status); } - /// Returns the overall health status and endpoint health statuses + /// Check and automatically update health status based on transition policy + /// This is the core fix for the health transition issue + pub fn check_and_update_health_status(&mut self) { + if self.system_health == HealthStatus::Ready { + return; // Already ready, no need to transition + } + + let uptime = self.start_time.elapsed(); + + match &self.health_transition_policy { + HealthTransitionPolicy::Manual => { + // No automatic transition (current behavior) + }, + HealthTransitionPolicy::TimeBasedReady { after_seconds } => { + if uptime.as_secs() >= *after_seconds { + tracing::info!("Auto-transitioning to ready after {}s uptime (policy: time_based_ready)", uptime.as_secs()); + self.system_health = HealthStatus::Ready; + } + }, + HealthTransitionPolicy::EndpointBasedReady => { + // Ready when service has registered at least one endpoint + if !self.endpoint_health.is_empty() { + let all_endpoints_ready = self.endpoint_health.values() + .all(|status| *status == HealthStatus::Ready); + + if all_endpoints_ready { + tracing::info!("Auto-transitioning to ready - all {} endpoints are ready (policy: endpoint_based_ready)", + self.endpoint_health.len()); + self.system_health = HealthStatus::Ready; + } + } + }, + HealthTransitionPolicy::Custom { auto_ready_after_seconds, require_endpoints_ready } => { + let mut ready_conditions_met = true; + + // Check time-based condition if specified + if let Some(after_seconds) = auto_ready_after_seconds { + if uptime.as_secs() < *after_seconds { + ready_conditions_met = false; + } + } + + // Check endpoint-based condition if required + if *require_endpoints_ready { + if self.endpoint_health.is_empty() { + ready_conditions_met = false; + } else { + let all_endpoints_ready = self.endpoint_health.values() + .all(|status| *status == HealthStatus::Ready); + if !all_endpoints_ready { + ready_conditions_met = false; + } + } + } + + if ready_conditions_met { + tracing::info!("Auto-transitioning to ready after {}s uptime (policy: custom)", uptime.as_secs()); + self.system_health = HealthStatus::Ready; + } + } + } + } + + /// Check for automatic transitions and return the overall health status and endpoint health statuses + /// This is the main method that should be called by the health handler + pub fn get_health_status_with_transition_check(&mut self) -> (bool, HashMap) { + // Check for automatic health transitions BEFORE reporting status + self.check_and_update_health_status(); + + // Now get the current status + self.get_health_status() + } + + /// Returns the overall health status and endpoint health statuses (read-only) pub fn get_health_status(&self) -> (bool, HashMap) { let mut endpoints: HashMap = HashMap::new(); for (endpoint, ready) in &self.endpoint_health { diff --git a/lib/runtime/src/system_status_server.rs b/lib/runtime/src/system_status_server.rs index d8ab9a8c97f..9c785c85ba7 100644 --- a/lib/runtime/src/system_status_server.rs +++ b/lib/runtime/src/system_status_server.rs @@ -171,8 +171,8 @@ pub async fn spawn_system_status_server( /// Health handler #[tracing::instrument(skip_all, level = "trace")] async fn health_handler(state: Arc) -> impl IntoResponse { - let system_health = state.drt().system_health.lock().unwrap(); - let (healthy, endpoints) = system_health.get_health_status(); + let mut system_health = state.drt().system_health.lock().unwrap(); + let (healthy, endpoints) = system_health.get_health_status_with_transition_check(); let uptime = Some(system_health.uptime()); let healthy_string = if healthy { "ready" } else { "notready" }; From 90e7373a4811456b6ffb90bf6cd1b6652b79943f Mon Sep 17 00:00:00 2001 From: "Gavin.Zhu" Date: Sun, 7 Sep 2025 05:27:06 +0000 Subject: [PATCH 2/6] fix: fixed format related to prefommit Signed-off-by: Gavin.Zhu --- lib/runtime/src/config.rs | 2 +- lib/runtime/src/lib.rs | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/lib/runtime/src/config.rs b/lib/runtime/src/config.rs index dc4904477f2..bc1832fd91b 100644 --- a/lib/runtime/src/config.rs +++ b/lib/runtime/src/config.rs @@ -70,7 +70,7 @@ pub enum HealthTransitionPolicy { /// Ready when service has registered at least one endpoint EndpointBasedReady, /// Custom readiness logic with configurable parameters - Custom { + Custom { auto_ready_after_seconds: Option, require_endpoints_ready: bool, }, diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index d2e1277a892..697165be8d6 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -140,7 +140,6 @@ impl SystemHealth { } let uptime = self.start_time.elapsed(); - match &self.health_transition_policy { HealthTransitionPolicy::Manual => { // No automatic transition (current behavior) @@ -156,9 +155,9 @@ impl SystemHealth { if !self.endpoint_health.is_empty() { let all_endpoints_ready = self.endpoint_health.values() .all(|status| *status == HealthStatus::Ready); - + if all_endpoints_ready { - tracing::info!("Auto-transitioning to ready - all {} endpoints are ready (policy: endpoint_based_ready)", + tracing::info!("Auto-transitioning to ready - all {} endpoints are ready (policy: endpoint_based_ready)", self.endpoint_health.len()); self.system_health = HealthStatus::Ready; } @@ -166,14 +165,14 @@ impl SystemHealth { }, HealthTransitionPolicy::Custom { auto_ready_after_seconds, require_endpoints_ready } => { let mut ready_conditions_met = true; - + // Check time-based condition if specified if let Some(after_seconds) = auto_ready_after_seconds { if uptime.as_secs() < *after_seconds { ready_conditions_met = false; } } - + // Check endpoint-based condition if required if *require_endpoints_ready { if self.endpoint_health.is_empty() { @@ -186,7 +185,7 @@ impl SystemHealth { } } } - + if ready_conditions_met { tracing::info!("Auto-transitioning to ready after {}s uptime (policy: custom)", uptime.as_secs()); self.system_health = HealthStatus::Ready; @@ -200,7 +199,7 @@ impl SystemHealth { pub fn get_health_status_with_transition_check(&mut self) -> (bool, HashMap) { // Check for automatic health transitions BEFORE reporting status self.check_and_update_health_status(); - + // Now get the current status self.get_health_status() } From 9e19c495cbb1144b92af941bd618745a45c06d2a Mon Sep 17 00:00:00 2001 From: "Gavin.Zhu" Date: Sun, 7 Sep 2025 05:34:58 +0000 Subject: [PATCH 3/6] fix: removed previouse mapping to avoid deserialize issue Signed-off-by: Gavin.Zhu --- lib/runtime/src/config.rs | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/lib/runtime/src/config.rs b/lib/runtime/src/config.rs index bc1832fd91b..bf8b6f9066c 100644 --- a/lib/runtime/src/config.rs +++ b/lib/runtime/src/config.rs @@ -216,7 +216,6 @@ impl RuntimeConfig { "USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status", "STARTING_HEALTH_STATUS" => "starting_health_status", "HEALTH_TRANSITION_POLICY" => "health_transition_policy", - "AUTO_READY_AFTER_SECONDS" => "health_transition_policy", "HEALTH_PATH" => "system_health_path", "LIVE_PATH" => "system_live_path", _ => k.as_str(), @@ -237,7 +236,21 @@ impl RuntimeConfig { /// /// Environment variables are prefixed with `DYN_RUNTIME_` and `DYN_SYSTEM` pub fn from_settings() -> Result { - let config: RuntimeConfig = Self::figment().extract()?; + let mut config: RuntimeConfig = Self::figment().extract()?; + + // Handle DYN_SYSTEM_AUTO_READY_AFTER_SECONDS environment variable + // This provides a convenient shortcut for time-based health transition + if let Ok(seconds_str) = std::env::var("DYN_SYSTEM_AUTO_READY_AFTER_SECONDS") { + if !seconds_str.is_empty() { + if let Ok(seconds) = seconds_str.parse::() { + tracing::info!("Using DYN_SYSTEM_AUTO_READY_AFTER_SECONDS={} for health transition policy", seconds); + config.health_transition_policy = HealthTransitionPolicy::TimeBasedReady { after_seconds: seconds }; + } else { + tracing::warn!("Invalid value for DYN_SYSTEM_AUTO_READY_AFTER_SECONDS: '{}', expected a number", seconds_str); + } + } + } + config.validate()?; Ok(config) } From 2dff50f409b262bf69ece2d1c9becbf68bd5c486 Mon Sep 17 00:00:00 2001 From: "Gavin.Zhu" Date: Sun, 7 Sep 2025 05:37:57 +0000 Subject: [PATCH 4/6] fix: fixed space .. again Signed-off-by: Gavin.Zhu --- lib/runtime/src/config.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/runtime/src/config.rs b/lib/runtime/src/config.rs index bf8b6f9066c..cd555bd1763 100644 --- a/lib/runtime/src/config.rs +++ b/lib/runtime/src/config.rs @@ -237,7 +237,6 @@ impl RuntimeConfig { /// Environment variables are prefixed with `DYN_RUNTIME_` and `DYN_SYSTEM` pub fn from_settings() -> Result { let mut config: RuntimeConfig = Self::figment().extract()?; - // Handle DYN_SYSTEM_AUTO_READY_AFTER_SECONDS environment variable // This provides a convenient shortcut for time-based health transition if let Ok(seconds_str) = std::env::var("DYN_SYSTEM_AUTO_READY_AFTER_SECONDS") { @@ -250,7 +249,6 @@ impl RuntimeConfig { } } } - config.validate()?; Ok(config) } From 8f1ac9a01c5c11df3fbe3dca4a2c91cbb0abdd8a Mon Sep 17 00:00:00 2001 From: "Gavin.Zhu" Date: Mon, 8 Sep 2025 02:39:43 +0000 Subject: [PATCH 5/6] chore: replace pynvml with nvidia-ml-py as suggested Signed-off-by: Gavin.Zhu --- container/deps/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/container/deps/requirements.txt b/container/deps/requirements.txt index b67fb4e079b..4f62908efd1 100644 --- a/container/deps/requirements.txt +++ b/container/deps/requirements.txt @@ -37,7 +37,7 @@ prometheus-api-client prophet protobuf==5.29.5 pydantic==2.7.1 -pynvml +nvidia-ml-py pyright PyYAML scikit-learn From b3f80d6968721881a244d8e14267df694b621026 Mon Sep 17 00:00:00 2001 From: "Gavin.Zhu" Date: Mon, 8 Sep 2025 02:47:03 +0000 Subject: [PATCH 6/6] fix: fix dep seq as pre commit Signed-off-by: Gavin.Zhu --- container/deps/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/container/deps/requirements.txt b/container/deps/requirements.txt index 4f62908efd1..3cb80d52394 100644 --- a/container/deps/requirements.txt +++ b/container/deps/requirements.txt @@ -28,6 +28,7 @@ matplotlib msgspec mypy numpy==1.26.4 # pmdarima is not compatible with numpy 2 +nvidia-ml-py opentelemetry-api opentelemetry-sdk pip @@ -37,7 +38,6 @@ prometheus-api-client prophet protobuf==5.29.5 pydantic==2.7.1 -nvidia-ml-py pyright PyYAML scikit-learn