diff --git a/container/deps/requirements.txt b/container/deps/requirements.txt index b67fb4e079b..3cb80d52394 100644 --- a/container/deps/requirements.txt +++ b/container/deps/requirements.txt @@ -28,6 +28,7 @@ matplotlib msgspec mypy numpy==1.26.4 # pmdarima is not compatible with numpy 2 +nvidia-ml-py opentelemetry-api opentelemetry-sdk pip @@ -37,7 +38,6 @@ prometheus-api-client prophet protobuf==5.29.5 pydantic==2.7.1 -pynvml pyright PyYAML scikit-learn diff --git a/lib/runtime/src/config.rs b/lib/runtime/src/config.rs index 33ebb676dd1..cd555bd1763 100644 --- a/lib/runtime/src/config.rs +++ b/lib/runtime/src/config.rs @@ -59,6 +59,30 @@ pub enum HealthStatus { NotReady, } +/// Health transition policy for automatic health status management +#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)] +#[serde(rename_all = "snake_case")] +pub enum HealthTransitionPolicy { + /// Never auto-transition (current behavior, requires manual set_health_status) + Manual, + /// Auto-ready after specified uptime seconds + TimeBasedReady { after_seconds: u64 }, + /// Ready when service has registered at least one endpoint + EndpointBasedReady, + /// Custom readiness logic with configurable parameters + Custom { + auto_ready_after_seconds: Option, + require_endpoints_ready: bool, + }, +} + +impl Default for HealthTransitionPolicy { + fn default() -> Self { + // Better default: auto-ready after 30 seconds for simple services + Self::TimeBasedReady { after_seconds: 30 } + } +} + /// Runtime configuration /// Defines the configuration for Tokio runtimes #[derive(Serialize, Deserialize, Validate, Debug, Builder, Clone)] @@ -114,6 +138,12 @@ pub struct RuntimeConfig { #[builder(default = "vec![]")] #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] pub use_endpoint_health_status: Vec, + /// Health Transition Policy + /// Controls when and how the system automatically transitions from "notready" to "ready" + /// Set this at runtime with environment variable DYN_SYSTEM_HEALTH_TRANSITION_POLICY + #[builder(default = "HealthTransitionPolicy::default()")] + #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] + pub health_transition_policy: HealthTransitionPolicy, /// Health endpoint paths /// Set this at runtime with environment variable DYN_SYSTEM_HEALTH_PATH @@ -185,6 +215,7 @@ impl RuntimeConfig { "ENABLED" => "system_enabled", "USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status", "STARTING_HEALTH_STATUS" => "starting_health_status", + "HEALTH_TRANSITION_POLICY" => "health_transition_policy", "HEALTH_PATH" => "system_health_path", "LIVE_PATH" => "system_live_path", _ => k.as_str(), @@ -205,7 +236,19 @@ impl RuntimeConfig { /// /// Environment variables are prefixed with `DYN_RUNTIME_` and `DYN_SYSTEM` pub fn from_settings() -> Result { - let config: RuntimeConfig = Self::figment().extract()?; + let mut config: RuntimeConfig = Self::figment().extract()?; + // Handle DYN_SYSTEM_AUTO_READY_AFTER_SECONDS environment variable + // This provides a convenient shortcut for time-based health transition + if let Ok(seconds_str) = std::env::var("DYN_SYSTEM_AUTO_READY_AFTER_SECONDS") { + if !seconds_str.is_empty() { + if let Ok(seconds) = seconds_str.parse::() { + tracing::info!("Using DYN_SYSTEM_AUTO_READY_AFTER_SECONDS={} for health transition policy", seconds); + config.health_transition_policy = HealthTransitionPolicy::TimeBasedReady { after_seconds: seconds }; + } else { + tracing::warn!("Invalid value for DYN_SYSTEM_AUTO_READY_AFTER_SECONDS: '{}', expected a number", seconds_str); + } + } + } config.validate()?; Ok(config) } @@ -225,6 +268,7 @@ impl RuntimeConfig { system_enabled: false, starting_health_status: HealthStatus::NotReady, use_endpoint_health_status: vec![], + health_transition_policy: HealthTransitionPolicy::default(), system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(), system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(), } @@ -254,6 +298,7 @@ impl Default for RuntimeConfig { system_enabled: false, starting_health_status: HealthStatus::NotReady, use_endpoint_health_status: vec![], + health_transition_policy: HealthTransitionPolicy::default(), system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(), system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(), } diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index 8397b89d910..7c866f52808 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -75,11 +75,13 @@ impl DistributedRuntime { }; let starting_health_status = config.starting_health_status.clone(); let use_endpoint_health_status = config.use_endpoint_health_status.clone(); + let health_transition_policy = config.health_transition_policy.clone(); let health_endpoint_path = config.system_health_path.clone(); let live_endpoint_path = config.system_live_path.clone(); let system_health = Arc::new(std::sync::Mutex::new(SystemHealth::new( starting_health_status, use_endpoint_health_status, + health_transition_policy, health_endpoint_path, live_endpoint_path, ))); diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index 263ef357153..697165be8d6 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -65,7 +65,7 @@ use crate::metrics::prometheus_names::distributed_runtime; use component::{Endpoint, InstanceSource}; use utils::GracefulShutdownTracker; -use config::HealthStatus; +use config::{HealthStatus, HealthTransitionPolicy}; /// Types of Tokio runtimes that can be used to construct a Dynamo [Runtime]. #[derive(Clone)] @@ -94,6 +94,7 @@ pub struct SystemHealth { system_health: HealthStatus, endpoint_health: HashMap, use_endpoint_health_status: Vec, + health_transition_policy: HealthTransitionPolicy, health_path: String, live_path: String, start_time: Instant, @@ -104,6 +105,7 @@ impl SystemHealth { pub fn new( starting_health_status: HealthStatus, use_endpoint_health_status: Vec, + health_transition_policy: HealthTransitionPolicy, health_path: String, live_path: String, ) -> Self { @@ -115,6 +117,7 @@ impl SystemHealth { system_health: starting_health_status, endpoint_health, use_endpoint_health_status, + health_transition_policy, health_path, live_path, start_time: Instant::now(), @@ -129,7 +132,79 @@ impl SystemHealth { self.endpoint_health.insert(endpoint.to_string(), status); } - /// Returns the overall health status and endpoint health statuses + /// Check and automatically update health status based on transition policy + /// This is the core fix for the health transition issue + pub fn check_and_update_health_status(&mut self) { + if self.system_health == HealthStatus::Ready { + return; // Already ready, no need to transition + } + + let uptime = self.start_time.elapsed(); + match &self.health_transition_policy { + HealthTransitionPolicy::Manual => { + // No automatic transition (current behavior) + }, + HealthTransitionPolicy::TimeBasedReady { after_seconds } => { + if uptime.as_secs() >= *after_seconds { + tracing::info!("Auto-transitioning to ready after {}s uptime (policy: time_based_ready)", uptime.as_secs()); + self.system_health = HealthStatus::Ready; + } + }, + HealthTransitionPolicy::EndpointBasedReady => { + // Ready when service has registered at least one endpoint + if !self.endpoint_health.is_empty() { + let all_endpoints_ready = self.endpoint_health.values() + .all(|status| *status == HealthStatus::Ready); + + if all_endpoints_ready { + tracing::info!("Auto-transitioning to ready - all {} endpoints are ready (policy: endpoint_based_ready)", + self.endpoint_health.len()); + self.system_health = HealthStatus::Ready; + } + } + }, + HealthTransitionPolicy::Custom { auto_ready_after_seconds, require_endpoints_ready } => { + let mut ready_conditions_met = true; + + // Check time-based condition if specified + if let Some(after_seconds) = auto_ready_after_seconds { + if uptime.as_secs() < *after_seconds { + ready_conditions_met = false; + } + } + + // Check endpoint-based condition if required + if *require_endpoints_ready { + if self.endpoint_health.is_empty() { + ready_conditions_met = false; + } else { + let all_endpoints_ready = self.endpoint_health.values() + .all(|status| *status == HealthStatus::Ready); + if !all_endpoints_ready { + ready_conditions_met = false; + } + } + } + + if ready_conditions_met { + tracing::info!("Auto-transitioning to ready after {}s uptime (policy: custom)", uptime.as_secs()); + self.system_health = HealthStatus::Ready; + } + } + } + } + + /// Check for automatic transitions and return the overall health status and endpoint health statuses + /// This is the main method that should be called by the health handler + pub fn get_health_status_with_transition_check(&mut self) -> (bool, HashMap) { + // Check for automatic health transitions BEFORE reporting status + self.check_and_update_health_status(); + + // Now get the current status + self.get_health_status() + } + + /// Returns the overall health status and endpoint health statuses (read-only) pub fn get_health_status(&self) -> (bool, HashMap) { let mut endpoints: HashMap = HashMap::new(); for (endpoint, ready) in &self.endpoint_health { diff --git a/lib/runtime/src/system_status_server.rs b/lib/runtime/src/system_status_server.rs index d8ab9a8c97f..9c785c85ba7 100644 --- a/lib/runtime/src/system_status_server.rs +++ b/lib/runtime/src/system_status_server.rs @@ -171,8 +171,8 @@ pub async fn spawn_system_status_server( /// Health handler #[tracing::instrument(skip_all, level = "trace")] async fn health_handler(state: Arc) -> impl IntoResponse { - let system_health = state.drt().system_health.lock().unwrap(); - let (healthy, endpoints) = system_health.get_health_status(); + let mut system_health = state.drt().system_health.lock().unwrap(); + let (healthy, endpoints) = system_health.get_health_status_with_transition_check(); let uptime = Some(system_health.uptime()); let healthy_string = if healthy { "ready" } else { "notready" };