From 9caec5fd029ed1f06f040c8b0cf9a9c7545689a7 Mon Sep 17 00:00:00 2001 From: Adam Singer Date: Tue, 6 Feb 2024 16:50:10 -0800 Subject: [PATCH] [Breaking] Change in behavior of /status by introduction of component based health Introduce a component based health check system. Each type of component should be able to opt into registering handlers implement some mechanical checks of health. Health in this context is functionality expected to work but runtime wise are semi no-op in terms of influencing the underlying storage / rpc systems. Opting in to the system requires for component to define a `HealthStatusIndicator` and implement the `check_health()` function. Registration should automatically be done by the existence of implementation and calling the `Store.register_health()` function in component implementation. At the moment only `Store` based components can register and a single health check is defined for `FilesystemStore`. A global parameter `default_digest_size_health_check` has been introduced for configuring the static seeded random bytes to fill data payload. The default value is 1MB. The `/status` endpoint has been updated to return the resulting string serialized instances of `HealthStatus`, smart serialization of such objects is not implemented at this time. --- Cargo.lock | 1 + nativelink-config/src/cas_server.rs | 9 + nativelink-service/tests/ac_server_test.rs | 2 + .../tests/bytestream_server_test.rs | 1 + nativelink-service/tests/cas_server_test.rs | 1 + nativelink-store/BUILD.bazel | 1 + nativelink-store/Cargo.toml | 1 + .../src/completeness_checking_store.rs | 3 + nativelink-store/src/compression_store.rs | 3 + nativelink-store/src/dedup_store.rs | 3 + nativelink-store/src/default_store_factory.rs | 31 ++- nativelink-store/src/existence_cache_store.rs | 3 + nativelink-store/src/fast_slow_store.rs | 3 + nativelink-store/src/filesystem_store.rs | 17 ++ nativelink-store/src/grpc_store.rs | 5 +- nativelink-store/src/memory_store.rs | 3 + nativelink-store/src/noop_store.rs | 3 + nativelink-store/src/ref_store.rs | 3 + nativelink-store/src/s3_store.rs | 3 + nativelink-store/src/shard_store.rs | 3 + .../src/size_partitioning_store.rs | 3 + nativelink-store/src/verify_store.rs | 3 + .../tests/fast_slow_store_test.rs | 3 + nativelink-util/BUILD.bazel | 3 + nativelink-util/Cargo.toml | 1 + nativelink-util/src/health_utils.rs | 197 ++++++++++++++++ nativelink-util/src/lib.rs | 1 + nativelink-util/src/store_trait.rs | 102 +++++++- nativelink-util/tests/health_utils_test.rs | 223 ++++++++++++++++++ src/bin/nativelink.rs | 82 ++++++- 30 files changed, 695 insertions(+), 22 deletions(-) create mode 100644 nativelink-util/src/health_utils.rs create mode 100644 nativelink-util/tests/health_utils_test.rs diff --git a/Cargo.lock b/Cargo.lock index 73572e3f59..7e6e2a7e99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1792,6 +1792,7 @@ dependencies = [ "serde", "sha2", "shellexpand", + "tempfile", "tokio", "tokio-stream", "tokio-util", diff --git a/nativelink-config/src/cas_server.rs b/nativelink-config/src/cas_server.rs index 0e6889c1e6..f7c2b58d26 100644 --- a/nativelink-config/src/cas_server.rs +++ b/nativelink-config/src/cas_server.rs @@ -629,6 +629,15 @@ pub struct GlobalConfig { /// /// Default: ConfigDigestHashFunction::sha256 pub default_digest_hash_function: Option, + + /// Default digest size to use for health check when running + /// diagnostics checks. Health checks are expected to use this + /// size for filling a buffer that is used for creation of + /// digest. + /// + /// Default: 1024*1024 (1MiB) + #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")] + pub default_digest_size_health_check: usize, } #[derive(Deserialize, Debug)] diff --git a/nativelink-service/tests/ac_server_test.rs b/nativelink-service/tests/ac_server_test.rs index 6f8c458a45..3c5da9cbdd 100644 --- a/nativelink-service/tests/ac_server_test.rs +++ b/nativelink-service/tests/ac_server_test.rs @@ -55,6 +55,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); @@ -64,6 +65,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); diff --git a/nativelink-service/tests/bytestream_server_test.rs b/nativelink-service/tests/bytestream_server_test.rs index 370a6d6a44..20f39fffd7 100644 --- a/nativelink-service/tests/bytestream_server_test.rs +++ b/nativelink-service/tests/bytestream_server_test.rs @@ -41,6 +41,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); diff --git a/nativelink-service/tests/cas_server_test.rs b/nativelink-service/tests/cas_server_test.rs index 6f8c60c4ff..ffc3aaf6a0 100644 --- a/nativelink-service/tests/cas_server_test.rs +++ b/nativelink-service/tests/cas_server_test.rs @@ -41,6 +41,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); diff --git a/nativelink-store/BUILD.bazel b/nativelink-store/BUILD.bazel index 606a0d6a45..002a6e27c5 100644 --- a/nativelink-store/BUILD.bazel +++ b/nativelink-store/BUILD.bazel @@ -58,6 +58,7 @@ rust_library( "@crates//:serde", "@crates//:sha2", "@crates//:shellexpand", + "@crates//:tempfile", "@crates//:tokio", "@crates//:tokio-stream", "@crates//:tokio-util", diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml index a1c67bdb17..0ffd051c80 100644 --- a/nativelink-store/Cargo.toml +++ b/nativelink-store/Cargo.toml @@ -30,6 +30,7 @@ rand = "0.8.5" serde = "1.0.193" sha2 = "0.10.8" shellexpand = "3.1.0" +tempfile = "3.9.0" tokio = { version = "1.35.1" } tokio-stream = { version = "0.1.14", features = ["fs"] } tokio-util = { version = "0.7.10" } diff --git a/nativelink-store/src/completeness_checking_store.rs b/nativelink-store/src/completeness_checking_store.rs index 00a3d4ffca..98d9d13e33 100644 --- a/nativelink-store/src/completeness_checking_store.rs +++ b/nativelink-store/src/completeness_checking_store.rs @@ -25,6 +25,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{ }; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use parking_lot::Mutex; use tokio::sync::Notify; @@ -361,3 +362,5 @@ impl Store for CompletenessCheckingStore { Box::new(self) } } + +default_health_status_indicator!(CompletenessCheckingStore); diff --git a/nativelink-store/src/compression_store.rs b/nativelink-store/src/compression_store.rs index c2677669bd..f41014970c 100644 --- a/nativelink-store/src/compression_store.rs +++ b/nativelink-store/src/compression_store.rs @@ -26,6 +26,7 @@ use lz4_flex::block::{compress_into, decompress_into, get_maximum_output_size}; use nativelink_error::{error_if, make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::{DigestInfo, JoinHandleDropGuard}; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use serde::{Deserialize, Serialize}; @@ -590,3 +591,5 @@ impl Store for CompressionStore { self.inner_store.clone().register_metrics(inner_store_registry); } } + +default_health_status_indicator!(CompressionStore); diff --git a/nativelink-store/src/dedup_store.rs b/nativelink-store/src/dedup_store.rs index a9bbd9e04b..d0c29ffb53 100644 --- a/nativelink-store/src/dedup_store.rs +++ b/nativelink-store/src/dedup_store.rs @@ -24,6 +24,7 @@ use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf, StreamReader}; use nativelink_util::common::DigestInfo; use nativelink_util::fastcdc::FastCDC; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use serde::{Deserialize, Serialize}; use tokio_util::codec::FramedRead; @@ -349,3 +350,5 @@ impl Store for DedupStore { Box::new(self) } } + +default_health_status_indicator!(DedupStore); diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs index 4e54251c33..595c15e356 100644 --- a/nativelink-store/src/default_store_factory.rs +++ b/nativelink-store/src/default_store_factory.rs @@ -19,6 +19,7 @@ use futures::stream::FuturesOrdered; use futures::{Future, TryStreamExt}; use nativelink_config::stores::StoreConfig; use nativelink_error::Error; +use nativelink_util::health_utils::HealthRegistryBuilder; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::Store; @@ -44,6 +45,7 @@ pub fn store_factory<'a>( backend: &'a StoreConfig, store_manager: &'a Arc, maybe_store_metrics: Option<&'a mut Registry>, + maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, ) -> Pin> { Box::pin(async move { let store: Arc = match backend { @@ -51,36 +53,36 @@ pub fn store_factory<'a>( StoreConfig::experimental_s3_store(config) => Arc::new(S3Store::new(config).await?), StoreConfig::verify(config) => Arc::new(VerifyStore::new( config, - store_factory(&config.backend, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, )), StoreConfig::compression(config) => Arc::new(CompressionStore::new( *config.clone(), - store_factory(&config.backend, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, )?), StoreConfig::dedup(config) => Arc::new(DedupStore::new( config, - store_factory(&config.index_store, store_manager, None).await?, - store_factory(&config.content_store, store_manager, None).await?, + store_factory(&config.index_store, store_manager, None, None).await?, + store_factory(&config.content_store, store_manager, None, None).await?, )), StoreConfig::existence_cache(config) => Arc::new(ExistenceCacheStore::new( config, - store_factory(&config.backend, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, )), StoreConfig::completeness_checking(config) => Arc::new(CompletenessCheckingStore::new( - store_factory(&config.backend, store_manager, None).await?, - store_factory(&config.cas_store, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, + store_factory(&config.cas_store, store_manager, None, None).await?, )), StoreConfig::fast_slow(config) => Arc::new(FastSlowStore::new( config, - store_factory(&config.fast, store_manager, None).await?, - store_factory(&config.slow, store_manager, None).await?, + store_factory(&config.fast, store_manager, None, None).await?, + store_factory(&config.slow, store_manager, None, None).await?, )), StoreConfig::filesystem(config) => Arc::new(::new(config).await?), StoreConfig::ref_store(config) => Arc::new(RefStore::new(config, Arc::downgrade(store_manager))), StoreConfig::size_partitioning(config) => Arc::new(SizePartitioningStore::new( config, - store_factory(&config.lower_store, store_manager, None).await?, - store_factory(&config.upper_store, store_manager, None).await?, + store_factory(&config.lower_store, store_manager, None, None).await?, + store_factory(&config.upper_store, store_manager, None, None).await?, )), StoreConfig::grpc(config) => Arc::new(GrpcStore::new(config).await?), StoreConfig::noop => Arc::new(NoopStore::new()), @@ -88,7 +90,7 @@ pub fn store_factory<'a>( let stores = config .stores .iter() - .map(|store_config| store_factory(&store_config.store, store_manager, None)) + .map(|store_config| store_factory(&store_config.store, store_manager, None, None)) .collect::>() .try_collect::>() .await?; @@ -98,6 +100,11 @@ pub fn store_factory<'a>( if let Some(store_metrics) = maybe_store_metrics { store.clone().register_metrics(store_metrics); } + + if let Some(health_registry_builder) = maybe_health_registry_builder { + store.clone().register_health(health_registry_builder); + } + Ok(store) }) } diff --git a/nativelink-store/src/existence_cache_store.rs b/nativelink-store/src/existence_cache_store.rs index 9b6c60595a..6fafa9ce11 100644 --- a/nativelink-store/src/existence_cache_store.rs +++ b/nativelink-store/src/existence_cache_store.rs @@ -23,6 +23,7 @@ use nativelink_error::{error_if, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::{CollectorState, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -198,3 +199,5 @@ impl MetricsComponent for ExistenceCacheStore { self.existence_cache.gather_metrics(c) } } + +default_health_status_indicator!(ExistenceCacheStore); diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs index 4ba77d9c49..5eb5abaed5 100644 --- a/nativelink-store/src/fast_slow_store.rs +++ b/nativelink-store/src/fast_slow_store.rs @@ -22,6 +22,7 @@ use futures::{join, FutureExt}; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -268,3 +269,5 @@ impl Store for FastSlowStore { self.slow_store.clone().register_metrics(slow_store_registry); } } + +default_health_status_indicator!(FastSlowStore); diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 94a2c342fa..4f057ca6d6 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::borrow::Cow; use std::ffi::OsString; use std::fmt::{Debug, Formatter}; use std::pin::Pin; @@ -29,6 +30,7 @@ use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::{fs, DigestInfo}; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; +use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; @@ -750,6 +752,10 @@ impl Store for FilesystemStore { fn register_metrics(self: Arc, registry: &mut Registry) { registry.register_collector(Box::new(Collector::new(&self))); } + + fn register_health(self: Arc, registry: &mut HealthRegistryBuilder) { + registry.register_indicator(self); + } } impl MetricsComponent for FilesystemStore { @@ -777,3 +783,14 @@ impl MetricsComponent for FilesystemStore { c.publish("evicting_map", self.evicting_map.as_ref(), ""); } } + +#[async_trait] +impl HealthStatusIndicator for FilesystemStore { + fn get_name(&self) -> &'static str { + "FilesystemStore" + } + + async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus { + Store::check_health(Pin::new(self), namespace).await + } +} diff --git a/nativelink-store/src/grpc_store.rs b/nativelink-store/src/grpc_store.rs index de199b25ae..ac5c5466d1 100644 --- a/nativelink-store/src/grpc_store.rs +++ b/nativelink-store/src/grpc_store.rs @@ -37,11 +37,12 @@ use nativelink_proto::google::bytestream::{ use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; use nativelink_util::grpc_utils::ConnectionManager; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::resource_info::ResourceInfo; use nativelink_util::retry::{Retrier, RetryResult}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; -use nativelink_util::tls_utils; use nativelink_util::write_request_stream_wrapper::WriteRequestStreamWrapper; +use nativelink_util::{default_health_status_indicator, tls_utils}; use parking_lot::Mutex; use prost::Message; use rand::rngs::OsRng; @@ -844,3 +845,5 @@ impl Store for GrpcStore { Box::new(self) } } + +default_health_status_indicator!(GrpcStore); diff --git a/nativelink-store/src/memory_store.rs b/nativelink-store/src/memory_store.rs index ebf129987f..a4ca1990bd 100644 --- a/nativelink-store/src/memory_store.rs +++ b/nativelink-store/src/memory_store.rs @@ -23,6 +23,7 @@ use nativelink_error::{Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -170,3 +171,5 @@ impl MetricsComponent for MemoryStore { c.publish("evicting_map", &self.evicting_map, ""); } } + +default_health_status_indicator!(MemoryStore); diff --git a/nativelink-store/src/noop_store.rs b/nativelink-store/src/noop_store.rs index a3a3c6b51b..006c2cf374 100644 --- a/nativelink-store/src/noop_store.rs +++ b/nativelink-store/src/noop_store.rs @@ -19,6 +19,7 @@ use async_trait::async_trait; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; #[derive(Default)] @@ -71,3 +72,5 @@ impl Store for NoopStore { Box::new(self) } } + +default_health_status_indicator!(NoopStore); diff --git a/nativelink-store/src/ref_store.rs b/nativelink-store/src/ref_store.rs index ff2f11d31d..fd09f04f28 100644 --- a/nativelink-store/src/ref_store.rs +++ b/nativelink-store/src/ref_store.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use tracing::error; @@ -137,3 +138,5 @@ impl Store for RefStore { Box::new(self) } } + +default_health_status_indicator!(RefStore); diff --git a/nativelink-store/src/s3_store.rs b/nativelink-store/src/s3_store.rs index 2179846a4d..91d4a60ae3 100644 --- a/nativelink-store/src/s3_store.rs +++ b/nativelink-store/src/s3_store.rs @@ -40,6 +40,7 @@ use hyper_rustls::{HttpsConnector, MaybeHttpsStream}; use nativelink_error::{error_if, make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::retry::{Retrier, RetryResult}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use rand::rngs::OsRng; @@ -534,3 +535,5 @@ impl Store for S3Store { Box::new(self) } } + +default_health_status_indicator!(S3Store); diff --git a/nativelink-store/src/shard_store.rs b/nativelink-store/src/shard_store.rs index 5a858e862e..432abb8446 100644 --- a/nativelink-store/src/shard_store.rs +++ b/nativelink-store/src/shard_store.rs @@ -21,6 +21,7 @@ use futures::stream::{FuturesUnordered, TryStreamExt}; use nativelink_error::{error_if, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -191,3 +192,5 @@ impl Store for ShardStore { } } } + +default_health_status_indicator!(ShardStore); diff --git a/nativelink-store/src/size_partitioning_store.rs b/nativelink-store/src/size_partitioning_store.rs index 3d92148c68..df02d2306b 100644 --- a/nativelink-store/src/size_partitioning_store.rs +++ b/nativelink-store/src/size_partitioning_store.rs @@ -19,6 +19,7 @@ use async_trait::async_trait; use nativelink_error::{Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use tokio::join; @@ -128,3 +129,5 @@ impl Store for SizePartitioningStore { Box::new(self) } } + +default_health_status_indicator!(SizePartitioningStore); diff --git a/nativelink-store/src/verify_store.rs b/nativelink-store/src/verify_store.rs index 2cabf95a76..195720ffe9 100644 --- a/nativelink-store/src/verify_store.rs +++ b/nativelink-store/src/verify_store.rs @@ -22,6 +22,7 @@ use nativelink_error::{make_input_err, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc}; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::{Collector, CollectorState, CounterWithTime, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -199,3 +200,5 @@ impl MetricsComponent for VerifyStore { ); } } + +default_health_status_indicator!(VerifyStore); diff --git a/nativelink-store/tests/fast_slow_store_test.rs b/nativelink-store/tests/fast_slow_store_test.rs index d21dced017..6b52450dc1 100644 --- a/nativelink-store/tests/fast_slow_store_test.rs +++ b/nativelink-store/tests/fast_slow_store_test.rs @@ -76,6 +76,7 @@ mod fast_slow_store_tests { use bytes::Bytes; use nativelink_error::{make_err, Code, ResultExt}; use nativelink_util::buf_channel::make_buf_channel_pair; + use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use pretty_assertions::assert_eq; use super::*; // Must be declared in every module. @@ -301,6 +302,8 @@ mod fast_slow_store_tests { } } + default_health_status_indicator!(DropCheckStore); + let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap(); let (fast_store_read_tx, fast_store_read_rx) = tokio::sync::oneshot::channel(); let (fast_store_eof_tx, fast_store_eof_rx) = tokio::sync::oneshot::channel(); diff --git a/nativelink-util/BUILD.bazel b/nativelink-util/BUILD.bazel index 77d5302d0f..81e361e4a0 100644 --- a/nativelink-util/BUILD.bazel +++ b/nativelink-util/BUILD.bazel @@ -17,6 +17,7 @@ rust_library( "src/fastcdc.rs", "src/fs.rs", "src/grpc_utils.rs", + "src/health_utils.rs", "src/lib.rs", "src/metrics_utils.rs", "src/platform_properties.rs", @@ -47,6 +48,7 @@ rust_library( "@crates//:prometheus-client", "@crates//:prost", "@crates//:prost-types", + "@crates//:rand", "@crates//:serde", "@crates//:sha2", "@crates//:tokio", @@ -64,6 +66,7 @@ rust_test_suite( "tests/evicting_map_test.rs", "tests/fastcdc_test.rs", "tests/fs_test.rs", + "tests/health_utils_test.rs", "tests/resource_info_test.rs", "tests/retry_test.rs", ], diff --git a/nativelink-util/Cargo.toml b/nativelink-util/Cargo.toml index 3bcf20bd76..45738d0ed0 100644 --- a/nativelink-util/Cargo.toml +++ b/nativelink-util/Cargo.toml @@ -21,6 +21,7 @@ pin-project-lite = "0.2.13" prometheus-client = "0.21.2" prost = "0.12.3" prost-types = "0.12.3" +rand = "0.8.5" serde = { version = "1.0.193", features = ["derive"] } sha2 = "0.10.8" tokio = { version = "1.35.1", features = [ "sync", "fs", "rt", "time", "io-util", "macros" ] } diff --git a/nativelink-util/src/health_utils.rs b/nativelink-util/src/health_utils.rs new file mode 100644 index 0000000000..2f684b6131 --- /dev/null +++ b/nativelink-util/src/health_utils.rs @@ -0,0 +1,197 @@ +// Copyright 2024 The Native Link Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Cow; +use std::collections::HashMap; +use std::fmt::Debug; +use std::marker::Send; +use std::pin::Pin; +use std::sync::Arc; + +use async_trait::async_trait; +use futures::{Stream, StreamExt}; +use parking_lot::Mutex; +use serde::Serialize; + +/// Struct name health indicator component. +type StructName = str; +/// Readable message status of the health indicator. +type Message = str; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] +pub enum HealthStatus { + Ok { + struct_name: &'static StructName, + message: Cow<'static, Message>, + }, + Initializing { + struct_name: &'static StructName, + message: Cow<'static, Message>, + }, + /// This status is used to indicate a non-fatal issue with the component. + Warning { + struct_name: &'static StructName, + message: Cow<'static, Message>, + }, + Failed { + struct_name: &'static StructName, + message: Cow<'static, Message>, + }, +} + +impl HealthStatus { + pub fn new_ok(component: &(impl HealthStatusIndicator + ?Sized), message: Cow<'static, str>) -> Self { + Self::Ok { + struct_name: component.struct_name(), + message, + } + } + + pub fn new_initializing( + component: &(impl HealthStatusIndicator + ?Sized), + message: Cow<'static, str>, + ) -> HealthStatus { + Self::Initializing { + struct_name: component.struct_name(), + message, + } + } + + pub fn new_warning(component: &(impl HealthStatusIndicator + ?Sized), message: Cow<'static, str>) -> HealthStatus { + Self::Warning { + struct_name: component.struct_name(), + message, + } + } + + pub fn new_failed(component: &(impl HealthStatusIndicator + ?Sized), message: Cow<'static, str>) -> HealthStatus { + Self::Failed { + struct_name: component.struct_name(), + message, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize)] +pub struct HealthStatusDescription { + pub namespace: Cow<'static, str>, + pub status: HealthStatus, +} + +/// Health status indicator trait. This trait is used to define +/// a health status indicator by implementing the `check_health` function. +/// A default implementation is provided for the `check_health` function +/// that returns healthy component. +#[async_trait] +pub trait HealthStatusIndicator: Sync + Send + Unpin { + fn get_name(&self) -> &'static str; + + /// Returns the name of the struct implementing the trait. + fn struct_name(&self) -> &'static str { + std::any::type_name::() + } + + /// Check the health status of the component. This function should be + /// implemented by the component to check the health status of the component. + async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus; +} + +type HealthRegistryBuilderState = Arc, Arc>>>; +pub struct HealthRegistryBuilder { + namespace: Cow<'static, str>, + state: HealthRegistryBuilderState, +} + +/// Health registry builder that is used to build a health registry. +/// The builder provides creation, registering of health status indicators, +/// sub building scoped health registries and building the health registry. +/// `build()` should be called once for finalizing the production of a health registry. +impl HealthRegistryBuilder { + pub fn new(namespace: Cow<'static, str>) -> Self { + Self { + namespace: format!("/{}", namespace).into(), + state: Arc::new(Mutex::new(HashMap::new())), + } + } + + /// Register a health status indicator at current namespace. + pub fn register_indicator(&mut self, indicator: Arc) { + let name = format!("{}/{}", self.namespace, indicator.get_name()); + self.state.lock().insert(name.into(), indicator); + } + + /// Create a sub builder for a namespace. + pub fn sub_builder(&mut self, namespace: Cow<'static, str>) -> HealthRegistryBuilder { + HealthRegistryBuilder { + namespace: format!("{}/{}", self.namespace, namespace).into(), + state: self.state.clone(), + } + } + + /// Finalize the production of the health registry. + pub fn build(&mut self) -> HealthRegistry { + HealthRegistry { + indicators: self.state.lock().clone().into_iter().collect(), + } + } +} + +#[derive(Default, Clone)] +pub struct HealthRegistry { + indicators: Vec<(Cow<'static, str>, Arc)>, +} + +pub trait HealthStatusReporter { + fn health_status_report(&self) -> Pin + '_>>; +} + +/// Health status reporter implementation for the health registry that provides a stream +/// of health status descriptions. +impl HealthStatusReporter for HealthRegistry { + fn health_status_report(&self) -> Pin + '_>> { + Box::pin( + futures::stream::iter(self.indicators.iter()).then(|(namespace, indicator)| async move { + HealthStatusDescription { + namespace: namespace.clone(), + status: indicator.check_health(namespace.clone()).await, + } + }), + ) + } +} + +/// Default health status indicator implementation for a component. +/// Generally used for components that don't need custom implementations +/// of the `check_health` function. +#[macro_export] +macro_rules! default_health_status_indicator { + ($type:ty) => { + #[async_trait::async_trait] + impl HealthStatusIndicator for $type { + fn get_name(&self) -> &'static str { + stringify!($type) + } + + async fn check_health( + &self, + namespace: std::borrow::Cow<'static, str>, + ) -> nativelink_util::health_utils::HealthStatus { + Store::check_health(Pin::new(self), namespace).await + } + } + }; +} + +// Re-scoped for the health_utils module. +pub use crate::default_health_status_indicator; diff --git a/nativelink-util/src/lib.rs b/nativelink-util/src/lib.rs index 62c615c0ee..c493b9f70f 100644 --- a/nativelink-util/src/lib.rs +++ b/nativelink-util/src/lib.rs @@ -20,6 +20,7 @@ pub mod evicting_map; pub mod fastcdc; pub mod fs; pub mod grpc_utils; +pub mod health_utils; pub mod metrics_utils; pub mod platform_properties; pub mod resource_info; diff --git a/nativelink-util/src/store_trait.rs b/nativelink-util/src/store_trait.rs index 2c6a4b721e..982f7b1452 100644 --- a/nativelink-util/src/store_trait.rs +++ b/nativelink-util/src/store_trait.rs @@ -12,20 +12,46 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::borrow::Cow; +use std::collections::hash_map::DefaultHasher as StdHasher; +use std::hash::{Hash, Hasher}; use std::marker::Send; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use async_trait::async_trait; use bytes::Bytes; use futures::{join, try_join}; -use nativelink_error::{Error, ResultExt}; +// use lru::DefaultHasher; +use nativelink_error::{make_err, Code, Error, ResultExt}; +use rand::rngs::StdRng; +use rand::{RngCore, SeedableRng}; use serde::{Deserialize, Serialize}; use crate::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use crate::common::DigestInfo; +use crate::digest_hasher::{default_digest_hasher_func, DigestHasher}; +use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; use crate::metrics_utils::Registry; +static DEFAULT_DIGEST_SIZE_HEALTH_CHECK: OnceLock = OnceLock::new(); +/// Default digest size for health check data. Any change in this value +/// changes the default contract. `GlobalConfig` should be updated to reflect +/// changes in this value. +pub const DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG: usize = 1024 * 1024; + +// Get the default digest size for health check data, if value is unset a system wide default is used. +pub fn default_digest_size_health_check() -> usize { + *DEFAULT_DIGEST_SIZE_HEALTH_CHECK.get_or_init(|| DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG) +} + +/// Set the default digest size for health check data, this should be called once. +pub fn set_default_digest_size_health_check(size: usize) -> Result<(), Error> { + DEFAULT_DIGEST_SIZE_HEALTH_CHECK + .set(size) + .map_err(|_| make_err!(Code::Internal, "set_default_digest_size_health_check already set")) +} + #[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize)] pub enum UploadSizeInfo { /// When the data transfer amount is known to be exact size, this enum should be used. @@ -40,7 +66,7 @@ pub enum UploadSizeInfo { } #[async_trait] -pub trait Store: Sync + Send + Unpin { +pub trait Store: Sync + Send + Unpin + HealthStatusIndicator { /// Look up a digest in the store and return None if it does not exist in /// the store, or Some(size) if it does. /// Note: On an AC store the size will be incorrect and should not be used! @@ -168,6 +194,73 @@ pub trait Store: Sync + Send + Unpin { .merge(data_res.err_tip(|| "Failed to read stream to completion in get_part_unchunked")) } + // Default implementation of the health check. Some stores may want to override this + // in situations where the default implementation is not sufficient. + async fn check_health(self: Pin<&Self>, namespace: Cow<'static, str>) -> HealthStatus { + let digest_data_size = default_digest_size_health_check(); + let mut digest_data = vec![0u8; digest_data_size]; + + let mut namespace_hasher = StdHasher::new(); + namespace.hash(&mut namespace_hasher); + self.get_name().hash(&mut namespace_hasher); + let hash = namespace_hasher.finish(); + + // Fill the digest data with random data based on a stable + // hash of the namespace and store name. Intention is to + // have randomly filled data that is unique per store and + // does not change between health checks. This is to ensure + // we are not adding more data to store on each health check. + let mut rng: StdRng = StdRng::seed_from_u64(hash); + rng.fill_bytes(&mut digest_data); + + let mut hasher = DigestHasher::from(default_digest_hasher_func()); + hasher.update(&digest_data); + let digest_data_len = digest_data.len(); + let digest_info = hasher.finalize_digest(digest_data_len as i64); + + let digest_bytes = bytes::Bytes::copy_from_slice(&digest_data); + + if let Err(e) = self.update_oneshot(digest_info, digest_bytes.clone()).await { + return HealthStatus::new_failed(self.get_ref(), format!("Store.update_oneshot() failed: {}", e).into()); + } + + match self.has(digest_info).await { + Ok(Some(s)) => { + if s != digest_data_len { + return HealthStatus::new_failed( + self.get_ref(), + format!("Store.has() size mismatch {s} != {digest_data_len}").into(), + ); + } + } + Ok(None) => { + return HealthStatus::new_failed(self.get_ref(), "Store.has() size not found".into()); + } + Err(e) => { + return HealthStatus::new_failed(self.get_ref(), format!("Store.has() failed: {}", e).into()); + } + } + + match self + .get_part_unchunked(digest_info, 0, Some(digest_data_len), Some(digest_data_len)) + .await + { + Ok(b) => { + if b != digest_bytes { + return HealthStatus::new_failed(self.get_ref(), "Store.get_part_unchunked() data mismatch".into()); + } + } + Err(e) => { + return HealthStatus::new_failed( + self.get_ref(), + format!("Store.get_part_unchunked() failed: {}", e).into(), + ); + } + } + + HealthStatus::new_ok(self.get_ref(), "Successfully store health check".into()) + } + /// Gets the underlying store for the given digest. This can be used to find out /// what any underlying store is for a given digest will be and hand it to the caller. /// A caller might want to use this to obtain a reference to the "real" underlying store @@ -180,4 +273,7 @@ pub trait Store: Sync + Send + Unpin { /// Register any metrics that this store wants to expose to the Prometheus. fn register_metrics(self: Arc, _registry: &mut Registry) {} + + // Register health checks used to monitor the store. + fn register_health(self: Arc, _registry: &mut HealthRegistryBuilder) {} } diff --git a/nativelink-util/tests/health_utils_test.rs b/nativelink-util/tests/health_utils_test.rs new file mode 100644 index 0000000000..0ea59fed21 --- /dev/null +++ b/nativelink-util/tests/health_utils_test.rs @@ -0,0 +1,223 @@ +// Copyright 2024 The Native Link Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Cow; +use std::collections::HashSet; +use std::iter::FromIterator; +use std::sync::Arc; + +use futures::StreamExt; +use nativelink_error::Error; +use nativelink_util::health_utils::{ + HealthRegistryBuilder, HealthStatus, HealthStatusDescription, HealthStatusIndicator, HealthStatusReporter, +}; + +#[cfg(test)] +mod health_utils_tests { + use pretty_assertions::assert_eq; + + use super::*; + + #[tokio::test] + async fn create_empty_indicator() -> Result<(), Error> { + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + assert_eq!(health_status.len(), 0); + Ok(()) + } + + #[tokio::test] + async fn create_register_indicator() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl, Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + health_registry_builder.register_indicator(Arc::new(MockComponentImpl {})); + + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 1); + assert_eq!( + health_status, + vec![HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl", + message: "ok".into() + }, + }] + ); + + Ok(()) + } + + #[tokio::test] + async fn create_sub_registry() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl, Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + health_registry_builder.register_indicator(Arc::new(MockComponentImpl {})); + + let mut namespace1_registry = health_registry_builder.sub_builder("namespace1".into()); + + namespace1_registry.register_indicator(Arc::new(MockComponentImpl {})); + + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 2); + let expected_health_status = vec_to_set(vec![ + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/namespace1/MockComponentImpl".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl", + message: "ok".into(), + }, + }, + ]); + + assert_eq!(vec_to_set(health_status), expected_health_status); + + Ok(()) + } + + #[tokio::test] + async fn create_multiple_indicators_same_registry() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl1, Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl2, Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl3, Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + health_registry_builder.register_indicator(Arc::new(MockComponentImpl1 {})); + health_registry_builder.register_indicator(Arc::new(MockComponentImpl2 {})); + health_registry_builder.register_indicator(Arc::new(MockComponentImpl3 {})); + + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 3); + let expected_health_status = vec_to_set(vec![ + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl1".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl1", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl2".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl2", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl3".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl3", + message: "ok".into(), + }, + }, + ]); + + assert_eq!(vec_to_set(health_status), expected_health_status); + + Ok(()) + } + + #[tokio::test] + async fn create_multiple_indicators_with_sub_registry() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl1, Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl2, Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl3, Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + let mut sub_builder = health_registry_builder.sub_builder("namespace1".into()); + sub_builder.register_indicator(Arc::new(MockComponentImpl1 {})); + let mut sub_builder = health_registry_builder.sub_builder("namespace2".into()); + sub_builder.register_indicator(Arc::new(MockComponentImpl2 {})); + + health_registry_builder + .sub_builder("namespace3".into()) + .register_indicator(Arc::new(MockComponentImpl3 {})); + + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 3); + let expected_health_status = vec_to_set(vec![ + HealthStatusDescription { + namespace: "/nativelink/namespace1/MockComponentImpl1".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl1", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/namespace2/MockComponentImpl2".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl2", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/namespace3/MockComponentImpl3".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl3", + message: "ok".into(), + }, + }, + ]); + + assert_eq!(vec_to_set(health_status), expected_health_status); + + Ok(()) + } + + #[macro_export] + macro_rules! generate_health_status_indicator { + ($struct_name:ident, $health_status:ident, $status_msg:expr) => { + struct $struct_name; + + #[async_trait::async_trait] + impl HealthStatusIndicator for $struct_name { + fn get_name(&self) -> &'static str { + stringify!($struct_name).into() + } + async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus { + HealthStatus::$health_status { + struct_name: stringify!($struct_name).into(), + message: $status_msg.into(), + } + } + } + }; + } + + fn vec_to_set(vec: Vec) -> HashSet { + HashSet::from_iter(vec) + } +} diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index b8a222a7a4..47ca2a9bc9 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -21,7 +21,7 @@ use async_lock::Mutex as AsyncMutex; use axum::Router; use clap::Parser; use futures::future::{select_all, BoxFuture, OptionFuture, TryFutureExt}; -use futures::FutureExt; +use futures::{FutureExt, StreamExt}; use hyper::server::conn::Http; use hyper::{Response, StatusCode}; use nativelink_config::cas_server::{ @@ -41,9 +41,13 @@ use nativelink_store::default_store_factory::store_factory; use nativelink_store::store_manager::StoreManager; use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit}; use nativelink_util::digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc}; +use nativelink_util::health_utils::{ + HealthRegistryBuilder, HealthStatus, HealthStatusDescription, HealthStatusReporter, +}; use nativelink_util::metrics_utils::{ set_metrics_enabled_for_this_thread, Collector, CollectorState, Counter, MetricsComponent, Registry, }; +use nativelink_util::store_trait::{set_default_digest_size_health_check, DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG}; use nativelink_worker::local_worker::new_local_worker; use parking_lot::Mutex; use rustls_pemfile::{certs as extract_certs, crls as extract_crls}; @@ -71,6 +75,9 @@ const DEFAULT_ADMIN_API_PATH: &str = "/admin"; /// Name of environment variable to disable metrics. const METRICS_DISABLE_ENV: &str = "NATIVELINK_DISABLE_METRICS"; +/// Content type header value for JSON. +const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8"; + /// Backend for bazel remote execution / cache API. #[derive(Parser, Debug)] #[clap( @@ -87,17 +94,27 @@ struct Args { async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), Box> { let mut root_metrics_registry = ::with_prefix("nativelink"); + let health_registry_builder = Arc::new(AsyncMutex::new(HealthRegistryBuilder::new("nativelink".into()))); let store_manager = Arc::new(StoreManager::new()); { + let mut health_registry_lock = health_registry_builder.lock().await; let root_store_metrics = root_metrics_registry.sub_registry_with_prefix("stores"); + for (name, store_cfg) in cfg.stores { + let health_component_name = format!("stores/{name}"); + let mut health_register_store = health_registry_lock.sub_builder(health_component_name.into()); let store_metrics = root_store_metrics.sub_registry_with_prefix(&name); store_manager.add_store( &name, - store_factory(&store_cfg, &store_manager, Some(store_metrics)) - .await - .err_tip(|| format!("Failed to create store '{name}'"))?, + store_factory( + &store_cfg, + &store_manager, + Some(store_metrics), + Some(&mut health_register_store), + ) + .await + .err_tip(|| format!("Failed to create store '{name}'"))?, ); } } @@ -349,12 +366,59 @@ async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), B ); let root_metrics_registry = root_metrics_registry.clone(); + let health_registry_status = health_registry_builder.lock().await.build(); let mut svc = Router::new() // This is the default service that executes if no other endpoint matches. .fallback_service(tonic_services.into_service().map_err(|e| panic!("{e}"))) - // This is a generic endpoint used to check if the server is up. - .route_service("/status", axum::routing::get(move || async move { "Ok".to_string() })); + .route_service( + "/status", + axum::routing::get(move || async move { + fn error_to_response(e: E) -> Response { + let mut response = Response::new(format!("Error: {e:?}")); + *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + response + } + + spawn_blocking(move || { + futures::executor::block_on(async { + let health_status_descriptions: Vec = + health_registry_status.health_status_report().collect().await; + + match serde_json5::to_string(&health_status_descriptions) { + Ok(body) => { + let contains_failed_report = health_status_descriptions + .iter() + .any(|description| matches!(description.status, HealthStatus::Failed { .. })); + let status_code = if contains_failed_report { + StatusCode::SERVICE_UNAVAILABLE + } else { + StatusCode::OK + }; + Response::builder() + .status(status_code) + .header( + hyper::header::CONTENT_TYPE, + hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE), + ) + .body(body) + .unwrap() + } + Err(e) => Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .header( + hyper::header::CONTENT_TYPE, + hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE), + ) + .body(format!("Internal Failure: {e:?}")) + .unwrap(), + } + }) + }) + .await + .unwrap_or_else(error_to_response) + }), + ); if let Some(prometheus_cfg) = services.experimental_prometheus { fn error_to_response(e: E) -> Response { @@ -735,6 +799,10 @@ fn main() -> Result<(), Box> { if global_cfg.idle_file_descriptor_timeout_millis == 0 { global_cfg.idle_file_descriptor_timeout_millis = DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS; } + if global_cfg.default_digest_size_health_check == 0 { + global_cfg.default_digest_size_health_check = DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG; + } + *global_cfg } else { GlobalConfig { @@ -747,6 +815,7 @@ fn main() -> Result<(), Box> { service.experimental_prometheus.is_none() }), default_digest_hash_function: None, + default_digest_size_health_check: DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG, } }; set_open_file_limit(global_cfg.max_open_files); @@ -756,6 +825,7 @@ fn main() -> Result<(), Box> { .default_digest_hash_function .unwrap_or(ConfigDigestHashFunction::sha256), ))?; + set_default_digest_size_health_check(global_cfg.default_digest_size_health_check)?; // TODO (#513): prevent deadlocks by assigning max blocking threads number of open files * ten (!global_cfg.disable_metrics, global_cfg.max_open_files * 10) };