Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion container/deps/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ matplotlib
msgspec
mypy
numpy==1.26.4 # pmdarima is not compatible with numpy 2
nvidia-ml-py
opentelemetry-api
opentelemetry-sdk
pip
Expand All @@ -37,7 +38,6 @@ prometheus-api-client
prophet
protobuf==5.29.5
pydantic==2.7.1
pynvml
pyright
PyYAML
scikit-learn
Expand Down
47 changes: 46 additions & 1 deletion lib/runtime/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,30 @@ pub enum HealthStatus {
NotReady,
}

/// Health transition policy for automatic health status management
#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub enum HealthTransitionPolicy {
/// Never auto-transition (current behavior, requires manual set_health_status)
Manual,
/// Auto-ready after specified uptime seconds
TimeBasedReady { after_seconds: u64 },
/// Ready when service has registered at least one endpoint
EndpointBasedReady,
/// Custom readiness logic with configurable parameters
Custom {
auto_ready_after_seconds: Option<u64>,
require_endpoints_ready: bool,
},
}

impl Default for HealthTransitionPolicy {
fn default() -> Self {
// Better default: auto-ready after 30 seconds for simple services
Self::TimeBasedReady { after_seconds: 30 }
}
}

/// Runtime configuration
/// Defines the configuration for Tokio runtimes
#[derive(Serialize, Deserialize, Validate, Debug, Builder, Clone)]
Expand Down Expand Up @@ -114,6 +138,12 @@ pub struct RuntimeConfig {
#[builder(default = "vec![]")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub use_endpoint_health_status: Vec<String>,
/// Health Transition Policy
/// Controls when and how the system automatically transitions from "notready" to "ready"
/// Set this at runtime with environment variable DYN_SYSTEM_HEALTH_TRANSITION_POLICY
#[builder(default = "HealthTransitionPolicy::default()")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub health_transition_policy: HealthTransitionPolicy,

/// Health endpoint paths
/// Set this at runtime with environment variable DYN_SYSTEM_HEALTH_PATH
Expand Down Expand Up @@ -185,6 +215,7 @@ impl RuntimeConfig {
"ENABLED" => "system_enabled",
"USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status",
"STARTING_HEALTH_STATUS" => "starting_health_status",
"HEALTH_TRANSITION_POLICY" => "health_transition_policy",
"HEALTH_PATH" => "system_health_path",
"LIVE_PATH" => "system_live_path",
_ => k.as_str(),
Expand All @@ -205,7 +236,19 @@ impl RuntimeConfig {
///
/// Environment variables are prefixed with `DYN_RUNTIME_` and `DYN_SYSTEM`
pub fn from_settings() -> Result<RuntimeConfig> {
let config: RuntimeConfig = Self::figment().extract()?;
let mut config: RuntimeConfig = Self::figment().extract()?;
// Handle DYN_SYSTEM_AUTO_READY_AFTER_SECONDS environment variable
// This provides a convenient shortcut for time-based health transition
if let Ok(seconds_str) = std::env::var("DYN_SYSTEM_AUTO_READY_AFTER_SECONDS") {
if !seconds_str.is_empty() {
if let Ok(seconds) = seconds_str.parse::<u64>() {
tracing::info!("Using DYN_SYSTEM_AUTO_READY_AFTER_SECONDS={} for health transition policy", seconds);
config.health_transition_policy = HealthTransitionPolicy::TimeBasedReady { after_seconds: seconds };
} else {
tracing::warn!("Invalid value for DYN_SYSTEM_AUTO_READY_AFTER_SECONDS: '{}', expected a number", seconds_str);
}
}
}
config.validate()?;
Ok(config)
}
Expand All @@ -225,6 +268,7 @@ impl RuntimeConfig {
system_enabled: false,
starting_health_status: HealthStatus::NotReady,
use_endpoint_health_status: vec![],
health_transition_policy: HealthTransitionPolicy::default(),
system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
}
Expand Down Expand Up @@ -254,6 +298,7 @@ impl Default for RuntimeConfig {
system_enabled: false,
starting_health_status: HealthStatus::NotReady,
use_endpoint_health_status: vec![],
health_transition_policy: HealthTransitionPolicy::default(),
system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
}
Expand Down
2 changes: 2 additions & 0 deletions lib/runtime/src/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,13 @@ impl DistributedRuntime {
};
let starting_health_status = config.starting_health_status.clone();
let use_endpoint_health_status = config.use_endpoint_health_status.clone();
let health_transition_policy = config.health_transition_policy.clone();
let health_endpoint_path = config.system_health_path.clone();
let live_endpoint_path = config.system_live_path.clone();
let system_health = Arc::new(std::sync::Mutex::new(SystemHealth::new(
starting_health_status,
use_endpoint_health_status,
health_transition_policy,
health_endpoint_path,
live_endpoint_path,
)));
Expand Down
79 changes: 77 additions & 2 deletions lib/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use crate::metrics::prometheus_names::distributed_runtime;
use component::{Endpoint, InstanceSource};
use utils::GracefulShutdownTracker;

use config::HealthStatus;
use config::{HealthStatus, HealthTransitionPolicy};

/// Types of Tokio runtimes that can be used to construct a Dynamo [Runtime].
#[derive(Clone)]
Expand Down Expand Up @@ -94,6 +94,7 @@ pub struct SystemHealth {
system_health: HealthStatus,
endpoint_health: HashMap<String, HealthStatus>,
use_endpoint_health_status: Vec<String>,
health_transition_policy: HealthTransitionPolicy,
health_path: String,
live_path: String,
start_time: Instant,
Expand All @@ -104,6 +105,7 @@ impl SystemHealth {
pub fn new(
starting_health_status: HealthStatus,
use_endpoint_health_status: Vec<String>,
health_transition_policy: HealthTransitionPolicy,
health_path: String,
live_path: String,
) -> Self {
Expand All @@ -115,6 +117,7 @@ impl SystemHealth {
system_health: starting_health_status,
endpoint_health,
use_endpoint_health_status,
health_transition_policy,
health_path,
live_path,
start_time: Instant::now(),
Expand All @@ -129,7 +132,79 @@ impl SystemHealth {
self.endpoint_health.insert(endpoint.to_string(), status);
}

/// Returns the overall health status and endpoint health statuses
/// Check and automatically update health status based on transition policy
/// This is the core fix for the health transition issue
pub fn check_and_update_health_status(&mut self) {
if self.system_health == HealthStatus::Ready {
return; // Already ready, no need to transition
}

let uptime = self.start_time.elapsed();
match &self.health_transition_policy {
HealthTransitionPolicy::Manual => {
// No automatic transition (current behavior)
},
HealthTransitionPolicy::TimeBasedReady { after_seconds } => {
if uptime.as_secs() >= *after_seconds {
tracing::info!("Auto-transitioning to ready after {}s uptime (policy: time_based_ready)", uptime.as_secs());
self.system_health = HealthStatus::Ready;
}
},
HealthTransitionPolicy::EndpointBasedReady => {
// Ready when service has registered at least one endpoint
if !self.endpoint_health.is_empty() {
let all_endpoints_ready = self.endpoint_health.values()
.all(|status| *status == HealthStatus::Ready);

if all_endpoints_ready {
tracing::info!("Auto-transitioning to ready - all {} endpoints are ready (policy: endpoint_based_ready)",
self.endpoint_health.len());
self.system_health = HealthStatus::Ready;
}
}
},
HealthTransitionPolicy::Custom { auto_ready_after_seconds, require_endpoints_ready } => {
let mut ready_conditions_met = true;

// Check time-based condition if specified
if let Some(after_seconds) = auto_ready_after_seconds {
if uptime.as_secs() < *after_seconds {
ready_conditions_met = false;
}
}

// Check endpoint-based condition if required
if *require_endpoints_ready {
if self.endpoint_health.is_empty() {
ready_conditions_met = false;
} else {
let all_endpoints_ready = self.endpoint_health.values()
.all(|status| *status == HealthStatus::Ready);
if !all_endpoints_ready {
ready_conditions_met = false;
}
}
}

if ready_conditions_met {
tracing::info!("Auto-transitioning to ready after {}s uptime (policy: custom)", uptime.as_secs());
self.system_health = HealthStatus::Ready;
}
}
}
}

/// Check for automatic transitions and return the overall health status and endpoint health statuses
/// This is the main method that should be called by the health handler
pub fn get_health_status_with_transition_check(&mut self) -> (bool, HashMap<String, String>) {
// Check for automatic health transitions BEFORE reporting status
self.check_and_update_health_status();

// Now get the current status
self.get_health_status()
}

/// Returns the overall health status and endpoint health statuses (read-only)
pub fn get_health_status(&self) -> (bool, HashMap<String, String>) {
let mut endpoints: HashMap<String, String> = HashMap::new();
for (endpoint, ready) in &self.endpoint_health {
Expand Down
4 changes: 2 additions & 2 deletions lib/runtime/src/system_status_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ pub async fn spawn_system_status_server(
/// Health handler
#[tracing::instrument(skip_all, level = "trace")]
async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
let system_health = state.drt().system_health.lock().unwrap();
let (healthy, endpoints) = system_health.get_health_status();
let mut system_health = state.drt().system_health.lock().unwrap();
let (healthy, endpoints) = system_health.get_health_status_with_transition_check();
let uptime = Some(system_health.uptime());

let healthy_string = if healthy { "ready" } else { "notready" };
Expand Down
Loading