diff --git a/Cargo.lock b/Cargo.lock index 7b81664116..9634bb5478 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -132,6 +132,17 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-recursion" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -1791,6 +1802,7 @@ dependencies = [ "serde", "sha2", "shellexpand", + "tempfile", "tokio", "tokio-stream", "tokio-util", @@ -1804,6 +1816,7 @@ name = "nativelink-util" version = "0.2.0" dependencies = [ "async-lock", + "async-recursion", "async-trait", "blake3", "bytes", diff --git a/nativelink-service/tests/ac_server_test.rs b/nativelink-service/tests/ac_server_test.rs index 04a10234e7..08dddd1dcc 100644 --- a/nativelink-service/tests/ac_server_test.rs +++ b/nativelink-service/tests/ac_server_test.rs @@ -55,6 +55,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); @@ -64,6 +65,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); diff --git a/nativelink-service/tests/bytestream_server_test.rs b/nativelink-service/tests/bytestream_server_test.rs index f2573c1042..1a7137778f 100644 --- a/nativelink-service/tests/bytestream_server_test.rs +++ b/nativelink-service/tests/bytestream_server_test.rs @@ -41,6 +41,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); diff --git a/nativelink-service/tests/cas_server_test.rs b/nativelink-service/tests/cas_server_test.rs index f6f636899a..ac41380612 100644 --- a/nativelink-service/tests/cas_server_test.rs +++ b/nativelink-service/tests/cas_server_test.rs @@ -41,6 +41,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); diff --git a/nativelink-store/BUILD.bazel b/nativelink-store/BUILD.bazel index 94a4abb453..106bf93480 100644 --- a/nativelink-store/BUILD.bazel +++ b/nativelink-store/BUILD.bazel @@ -58,6 +58,7 @@ rust_library( "@crate_index//:serde", "@crate_index//:sha2", "@crate_index//:shellexpand", + "@crate_index//:tempfile", "@crate_index//:tokio", "@crate_index//:tokio-stream", "@crate_index//:tokio-util", diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml index a1c67bdb17..0ffd051c80 100644 --- a/nativelink-store/Cargo.toml +++ b/nativelink-store/Cargo.toml @@ -30,6 +30,7 @@ rand = "0.8.5" serde = "1.0.193" sha2 = "0.10.8" shellexpand = "3.1.0" +tempfile = "3.9.0" tokio = { version = "1.35.1" } tokio-stream = { version = "0.1.14", features = ["fs"] } tokio-util = { version = "0.7.10" } diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs index 6ce68cbf6a..580fc69746 100644 --- a/nativelink-store/src/default_store_factory.rs +++ b/nativelink-store/src/default_store_factory.rs @@ -19,6 +19,7 @@ use futures::stream::FuturesOrdered; use futures::{Future, TryStreamExt}; use nativelink_config::stores::StoreConfig; use nativelink_error::Error; +use nativelink_util::health_utils::HealthRegistry; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::Store; @@ -44,6 +45,7 @@ pub fn store_factory<'a>( backend: &'a StoreConfig, store_manager: &'a Arc, maybe_store_metrics: Option<&'a mut Registry>, + maybe_health_registry: Option<&'a mut HealthRegistry>, ) -> Pin> { Box::pin(async move { let store: Arc = match backend { @@ -51,36 +53,38 @@ pub fn store_factory<'a>( StoreConfig::experimental_s3_store(config) => Arc::new(S3Store::new(config).await?), StoreConfig::verify(config) => Arc::new(VerifyStore::new( config, - store_factory(&config.backend, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, )), StoreConfig::compression(config) => Arc::new(CompressionStore::new( *config.clone(), - store_factory(&config.backend, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, )?), - StoreConfig::dedup(config) => Arc::new(DedupStore::new( - config, - store_factory(&config.index_store, store_manager, None).await?, - store_factory(&config.content_store, store_manager, None).await?, - )), + StoreConfig::dedup(config) => { + Arc::new(DedupStore::new( + config, + store_factory(&config.index_store, store_manager, None, None).await?, + store_factory(&config.content_store, store_manager, None, None).await?, + )) + }, StoreConfig::existence_cache(config) => Arc::new(ExistenceCacheStore::new( config, - store_factory(&config.backend, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, )), StoreConfig::completeness_checking(config) => Arc::new(CompletenessCheckingStore::new( - store_factory(&config.backend, store_manager, None).await?, - store_factory(&config.cas_store, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, + store_factory(&config.cas_store, store_manager, None, None).await?, )), StoreConfig::fast_slow(config) => Arc::new(FastSlowStore::new( config, - store_factory(&config.fast, store_manager, None).await?, - store_factory(&config.slow, store_manager, None).await?, + store_factory(&config.fast, store_manager, None, None).await?, + store_factory(&config.slow, store_manager, None, None).await?, )), StoreConfig::filesystem(config) => Arc::new(::new(config).await?), StoreConfig::ref_store(config) => Arc::new(RefStore::new(config, Arc::downgrade(store_manager))), StoreConfig::size_partitioning(config) => Arc::new(SizePartitioningStore::new( config, - store_factory(&config.lower_store, store_manager, None).await?, - store_factory(&config.upper_store, store_manager, None).await?, + store_factory(&config.lower_store, store_manager, None, None).await?, + store_factory(&config.upper_store, store_manager, None, None).await?, )), StoreConfig::grpc(config) => Arc::new(GrpcStore::new(config).await?), StoreConfig::noop => Arc::new(NoopStore::new()), @@ -88,7 +92,7 @@ pub fn store_factory<'a>( let stores = config .stores .iter() - .map(|store_config| store_factory(&store_config.store, store_manager, None)) + .map(|store_config| store_factory(&store_config.store, store_manager, None, None)) .collect::>() .try_collect::>() .await?; @@ -98,6 +102,11 @@ pub fn store_factory<'a>( if let Some(store_metrics) = maybe_store_metrics { store.clone().register_metrics(store_metrics); } + + if let Some(health_registry) = maybe_health_registry { + store.clone().register_health(health_registry); + } + Ok(store) }) } diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 5d312e92f8..b6e6e65d33 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -14,6 +14,7 @@ use std::ffi::OsString; use std::fmt::{Debug, Formatter}; +use std::io::Write; use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -29,13 +30,16 @@ use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::{fs, DigestInfo}; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; +use nativelink_util::health_utils::{HealthRegistry, HealthStatus, HealthStatusIndicator}; use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; +use rand::RngCore; +use tempfile::NamedTempFile; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; use tokio::task::spawn_blocking; use tokio::time::{sleep, timeout, Sleep}; use tokio_stream::wrappers::ReadDirStream; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; use crate::cas_utils::is_zero_digest; @@ -748,6 +752,78 @@ impl Store for FilesystemStore { fn register_metrics(self: Arc, registry: &mut Registry) { registry.register_collector(Box::new(Collector::new(&self))); } + + fn register_health(self: Arc, registry: &mut HealthRegistry) { + registry.register_indicator(self); + } +} + +#[async_trait] +impl HealthStatusIndicator for FilesystemStore { + async fn check_health(self: Arc) -> Result { + let temp_path = &self.shared_context.temp_path; + let temp_file_with_prefix = NamedTempFile::with_prefix_in(&".fs_hc_", temp_path); + + let failed_fn = |failure_action, error| { + let message = format!( + "Failed to {:?} temp file in filesystem store health check: {:?}", + failure_action, error + ); + error!(message); + return self.make_failed(message.into()); + }; + + match temp_file_with_prefix { + Ok(mut file) => { + let mut data = [0u8; 1_000_000]; + + rand::thread_rng().fill_bytes(&mut data); + + let write_result = file.write_all(&data); + match write_result { + Ok(()) => debug!( + "Successfully wrote to temp file in filesystem store health check {:?}", + file + ), + Err(err) => return Ok(failed_fn("write", err)) + } + + let flush_result = file.flush(); + match flush_result { + Ok(_) => debug!("Successfully flushed temp file in filesystem store health check"), + Err(err) => return Ok(failed_fn("flush", err)) + } + + let keep_result = file.keep(); + let file_path: std::path::PathBuf; + match keep_result { + Ok((f, p)) => { + debug!( + "Successfully kept temp file in filesystem store health check {:?} {:?}", + f, p + ); + file_path = p.to_path_buf(); // Some(p.as_path().clone()); + } + Err(err) => return Ok(failed_fn("keep", err.into())), + } + + debug!("Removing temp file in filesystem store health check {:?}", file_path); + let remove_result = fs::remove_file(file_path.as_path()).await; + match remove_result { + Ok(_) => debug!( + "Successfully removed temp file in filesystem store health check {:?}", + file_path.as_path() + ), + Err(err) => return Ok(failed_fn("remove", err.to_std_err())), + } + + let message = format!("Successfully filesystem store health check"); + Ok(self.make_ok(message.into())) + } + + Err(err) => return Ok(failed_fn("create", err)), + } + } } impl MetricsComponent for FilesystemStore { diff --git a/nativelink-util/BUILD.bazel b/nativelink-util/BUILD.bazel index 9d37be3ab2..86f6c7fa68 100644 --- a/nativelink-util/BUILD.bazel +++ b/nativelink-util/BUILD.bazel @@ -16,6 +16,7 @@ rust_library( "src/evicting_map.rs", "src/fastcdc.rs", "src/fs.rs", + "src/health_utils.rs", "src/lib.rs", "src/metrics_utils.rs", "src/platform_properties.rs", @@ -35,6 +36,7 @@ rust_library( "//nativelink-error", "//nativelink-proto", "@crate_index//:async-lock", + "@crate_index//:async-recursion", "@crate_index//:blake3", "@crate_index//:bytes", "@crate_index//:futures", diff --git a/nativelink-util/Cargo.toml b/nativelink-util/Cargo.toml index 3bcf20bd76..d6dc93f115 100644 --- a/nativelink-util/Cargo.toml +++ b/nativelink-util/Cargo.toml @@ -9,6 +9,7 @@ nativelink-error = { path = "../nativelink-error" } nativelink-proto = { path = "../nativelink-proto" } async-lock = "3.2.0" +async-recursion = "1.0.5" async-trait = "0.1.74" blake3 = "1.5.0" bytes = "1.5.0" diff --git a/nativelink-util/src/health_utils.rs b/nativelink-util/src/health_utils.rs new file mode 100644 index 0000000000..802170739c --- /dev/null +++ b/nativelink-util/src/health_utils.rs @@ -0,0 +1,134 @@ +use std::borrow::Cow; +use std::marker::Send; +use std::sync::Arc; + +use async_recursion::async_recursion; +use async_trait::async_trait; +use nativelink_error::Error; + +use std::fmt::Debug; + +type HealthComponent = Cow<'static, str>; +type TypeName = Cow<'static, str>; +pub type Message = Cow<'static, str>; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum HealthStatus { + Ok(TypeName, Message), + Initializing(TypeName, Message), + Warning(TypeName, Message), + Failed(TypeName, Message), +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct HealthStatusDescription { + pub component: HealthComponent, + pub status: HealthStatus +} + +#[async_trait] +pub trait HealthStatusIndicator: Sync + Send + Unpin { + fn type_name(&self) -> TypeName { + Cow::Borrowed(std::any::type_name::()) + } + + async fn check_health(self: Arc) -> Result { + Ok(self.make_ok("ok".into())) + } + + fn make_ok(&self, message: Message) -> HealthStatus { + HealthStatus::Ok(self.type_name(), message) + } + + fn make_initializing(&self, message: Message) -> HealthStatus { + HealthStatus::Initializing(self.type_name(), message) + } + + fn make_warning(&self, message: Message) -> HealthStatus { + HealthStatus::Warning(self.type_name(), message) + } + + fn make_failed(&self, message: Message) -> HealthStatus { + HealthStatus::Failed(self.type_name(), message) + } +} + + +#[derive(Default, Clone)] +pub struct HealthRegistry { + component: HealthComponent, + indicators: Vec>, + registries: Vec, +} + +impl HealthRegistry { + pub fn new(component: HealthComponent) -> Self { + Self { + component, + ..Default::default() + } + } + + pub fn register_indicator(&mut self, indicator: Arc) { + self.indicators.push(indicator); + } + + pub fn add_dependency(&mut self, component: HealthComponent) -> &mut Self { + let dependency = HealthRegistry::new(component); + + self.registries.push(dependency); + self.registries + .last_mut() + .expect("dependencies should not to be empty.") + } + + #[async_recursion] + async fn flatten( + &mut self, + results: &mut Vec, + parent_component: &HealthComponent, + component: &HealthComponent, + indicators: &Vec>, + registries: &Vec, + ) -> Result<(), Error> { + let component_name: Cow<'static, str> = Cow::Owned(format!("{parent_component}/{component}")); + for indicator in indicators { + let result = indicator.clone().check_health().await; + + let health_status = match result { + Ok(health_status) => HealthStatusDescription { + component: component_name.clone(), + status: health_status + }, + Err(error) => HealthStatusDescription { + component: component_name.clone(), + status: indicator.make_failed(format!("health check failed: {error}").into()) + }, + }; + + results.push(health_status); + } + + for registry in registries { + let _ = self + .clone() + .flatten(results, &component_name, ®istry.component, ®istry.indicators, ®istry.registries) + .await; + } + + Ok(()) + } + + pub async fn flatten_indicators(&mut self) -> Vec { + let mut health_status_results = Vec::new(); + let parent_component: HealthComponent = "".into(); + let component = &self.component; + let indicators = &self.indicators; + let registries = &self.registries; + let _ = self + .clone() + .flatten(&mut health_status_results, &parent_component, &component, indicators, registries) + .await; + health_status_results + } +} diff --git a/nativelink-util/src/lib.rs b/nativelink-util/src/lib.rs index 3dad898e82..04bf0ebf58 100644 --- a/nativelink-util/src/lib.rs +++ b/nativelink-util/src/lib.rs @@ -19,6 +19,7 @@ pub mod digest_hasher; pub mod evicting_map; pub mod fastcdc; pub mod fs; +pub mod health_utils; pub mod metrics_utils; pub mod platform_properties; pub mod resource_info; diff --git a/nativelink-util/src/store_trait.rs b/nativelink-util/src/store_trait.rs index b918391068..87ff456b8f 100644 --- a/nativelink-util/src/store_trait.rs +++ b/nativelink-util/src/store_trait.rs @@ -24,6 +24,7 @@ use serde::{Deserialize, Serialize}; use crate::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use crate::common::DigestInfo; +use crate::health_utils::HealthRegistry; use crate::metrics_utils::Registry; #[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize)] @@ -180,4 +181,6 @@ pub trait Store: Sync + Send + Unpin { /// Register any metrics that this store wants to expose to the Prometheus. fn register_metrics(self: Arc, _registry: &mut Registry) {} + + fn register_health(self: Arc, _registry: &mut HealthRegistry) {} } diff --git a/nativelink-util/tests/health_utils_test.rs b/nativelink-util/tests/health_utils_test.rs new file mode 100644 index 0000000000..885d9b3592 --- /dev/null +++ b/nativelink-util/tests/health_utils_test.rs @@ -0,0 +1,179 @@ +use std::collections::HashSet; +use std::iter::FromIterator; +use std::sync::{Arc, Mutex}; + +use nativelink_error::Error; + +#[cfg(test)] +mod health_utils_tests { + + use nativelink_util::health_utils::*; + use pretty_assertions::assert_eq; + + use super::*; + + #[tokio::test] + async fn create_empty_indicator() -> Result<(), Error> { + let health_registery = Arc::new(Mutex::new(HealthRegistry::new("nativelink".into()))); + + let health_status = health_registery.lock().unwrap().flatten_indicators().await; + assert_eq!(health_status.len(), 0); + + Ok(()) + } + + #[tokio::test] + async fn create_register_indicator() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl, HealthStatus::Ok, "ok"); + + let health_registery = Arc::new(Mutex::new(HealthRegistry::new("nativelink".into()))); + let mut health_registery = health_registery.lock().unwrap(); + + health_registery.register_indicator(Arc::new(MockComponentImpl {})); + + let health_status = health_registery.flatten_indicators().await; + assert_eq!(health_status.len(), 1); + assert_eq!( + health_status, + vec![HealthStatusDescription { + component: "/nativelink".into(), + status: HealthStatus::Ok("MockComponentImpl".into(), "ok".into()), + }] + ); + + Ok(()) + } + + #[tokio::test] + async fn create_add_dependency() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl, HealthStatus::Ok, "ok"); + + let health_registery = Arc::new(Mutex::new(HealthRegistry::new("nativelink".into()))); + let mut health_registery = health_registery.lock().unwrap(); + + health_registery.register_indicator(Arc::new(MockComponentImpl {})); + + let dependency1_registry = health_registery.add_dependency("dependency1".into()); + + dependency1_registry.register_indicator(Arc::new(MockComponentImpl {})); + + let health_status = health_registery.flatten_indicators().await; + // println!("health_status: {:?}", health_status); + assert_eq!(health_status.len(), 2); + let expected_health_status = vec![ + HealthStatusDescription { + component: "/nativelink".into(), + status: HealthStatus::Ok("MockComponentImpl".into(), "ok".into()), + }, + HealthStatusDescription { + component: "/nativelink/dependency1".into(), + status: HealthStatus::Ok("MockComponentImpl".into(), "ok".into()), + }, + ]; + + assert_eq!(health_status, expected_health_status); + + Ok(()) + } + + #[tokio::test] + async fn create_multiple_indicators_same_level() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl1, HealthStatus::Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl2, HealthStatus::Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl3, HealthStatus::Ok, "ok"); + + let health_registery = Arc::new(Mutex::new(HealthRegistry::new("nativelink".into()))); + let mut health_registery = health_registery.lock().unwrap(); + + health_registery.register_indicator(Arc::new(MockComponentImpl1 {})); + health_registery.register_indicator(Arc::new(MockComponentImpl2 {})); + health_registery.register_indicator(Arc::new(MockComponentImpl3 {})); + + let health_status = health_registery.flatten_indicators().await; + + assert_eq!(health_status.len(), 3); + let expected_health_status = vec_to_set(vec![ + HealthStatusDescription { + component: "/nativelink".into(), + status: HealthStatus::Ok("MockComponentImpl1".into(), "ok".into()), + }, + HealthStatusDescription { + component: "/nativelink".into(), + status: HealthStatus::Ok("MockComponentImpl2".into(), "ok".into()), + }, + HealthStatusDescription { + component: "/nativelink".into(), + status: HealthStatus::Ok("MockComponentImpl3".into(), "ok".into()), + }, + ]); + + assert_eq!(vec_to_set(health_status), expected_health_status); + + Ok(()) + } + + #[tokio::test] + async fn create_multiple_indicators_nested_levels() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl1, HealthStatus::Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl2, HealthStatus::Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl3, HealthStatus::Ok, "ok"); + + let health_registery = Arc::new(Mutex::new(HealthRegistry::new("nativelink".into()))); + let mut health_registery = health_registery.lock().unwrap(); + + health_registery + .add_dependency("dependency1".into()) + .register_indicator(Arc::new(MockComponentImpl1 {})); + + health_registery + .add_dependency("dependency2".into()) + .register_indicator(Arc::new(MockComponentImpl2 {})); + + health_registery + .add_dependency("dependency3".into()) + .register_indicator(Arc::new(MockComponentImpl3 {})); + + let health_status = health_registery.flatten_indicators().await; + + assert_eq!(health_status.len(), 3); + let expected_health_status = vec_to_set(vec![ + HealthStatusDescription { + component: "/nativelink/dependency1".into(), + status: HealthStatus::Ok("MockComponentImpl1".into(), "ok".into()), + }, + HealthStatusDescription { + component: "/nativelink/dependency2".into(), + status: HealthStatus::Ok("MockComponentImpl2".into(), "ok".into()), + }, + HealthStatusDescription { + component: "/nativelink/dependency3".into(), + status: HealthStatus::Ok("MockComponentImpl3".into(), "ok".into()), + }, + ]); + + assert_eq!(vec_to_set(health_status), expected_health_status); + + Ok(()) + } + + #[macro_export] + macro_rules! generate_health_status_indicator { + ($struct_name:ident, $health_status:expr, $status_msg:expr) => { + struct $struct_name; + + #[async_trait::async_trait] + impl HealthStatusIndicator for $struct_name { + async fn check_health(self: Arc) -> Result { + Ok($health_status( + stringify!($struct_name).into(), + $status_msg.into(), + )) + } + } + }; + } + + fn vec_to_set(vec: Vec) -> HashSet { + HashSet::from_iter(vec) + } +} diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index 9c01d330b6..4504346990 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -41,6 +41,7 @@ use nativelink_store::default_store_factory::store_factory; use nativelink_store::store_manager::StoreManager; use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit}; use nativelink_util::digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc}; +use nativelink_util::health_utils::HealthRegistry; use nativelink_util::metrics_utils::{ set_metrics_enabled_for_this_thread, Collector, CollectorState, Counter, MetricsComponent, Registry, }; @@ -87,18 +88,30 @@ struct Args { async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), Box> { let mut root_metrics_registry = ::with_prefix("nativelink"); + let health_registery = Arc::new(Mutex::new(HealthRegistry::new("nativelink".into()))); let store_manager = Arc::new(StoreManager::new()); { - let root_store_metrics = root_metrics_registry.sub_registry_with_prefix("stores"); - for (name, store_cfg) in cfg.stores { - let store_metrics = root_store_metrics.sub_registry_with_prefix(&name); - store_manager.add_store( - &name, - store_factory(&store_cfg, &store_manager, Some(store_metrics)) + let mut health_registery_lock = health_registery.lock(); + { + let root_store_metrics = root_metrics_registry.sub_registry_with_prefix("stores"); + + for (name, store_cfg) in cfg.stores { + let health_component_name = format!("stores/{name}"); + let health_register_store = health_registery_lock.add_dependency(health_component_name.into()); + let store_metrics = root_store_metrics.sub_registry_with_prefix(&name); + store_manager.add_store( + &name, + store_factory( + &store_cfg, + &store_manager, + Some(store_metrics), + Some(health_register_store), + ) .await .err_tip(|| format!("Failed to create store '{name}'"))?, - ); + ); + } } } @@ -349,12 +362,41 @@ async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), B ); let root_metrics_registry = root_metrics_registry.clone(); - + let health_registery_status = health_registery.clone(); let mut svc = Router::new() // This is the default service that executes if no other endpoint matches. .fallback_service(tonic_services.into_service().map_err(|e| panic!("{e}"))) - // This is a generic endpoint used to check if the server is up. - .route_service("/status", axum::routing::get(move || async move { "Ok".to_string() })); + .route_service( + "/status", + axum::routing::get(move || async move { + fn error_to_response(e: E) -> Response { + let mut response = Response::new(format!("Error: {e:?}")); + *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + response + } + + spawn_blocking(move || { + let mut buf = String::new(); + let mut health_registery_status_guard = health_registery_status.lock(); + + let indicators = + futures::executor::block_on(health_registery_status_guard.flatten_indicators()); + + for indicator in indicators { + buf.push_str(&format!("{:?}\n", indicator)); + } + + let mut response = Response::new(buf); + response.headers_mut().insert( + hyper::header::CONTENT_TYPE, + hyper::header::HeaderValue::from_static("text/plain; version=0.0.4; charset=utf-8"), + ); + response + }) + .await + .unwrap_or_else(error_to_response) + }), + ); if let Some(prometheus_cfg) = services.experimental_prometheus { fn error_to_response(e: E) -> Response {