From f243c53a41691b958cb9374a4302c6ed84f89012 Mon Sep 17 00:00:00 2001 From: Adam Singer Date: Tue, 6 Feb 2024 16:50:10 -0800 Subject: [PATCH] [Breaking] Change in behavior of /status by introduction of component based health Introduce a component based health check system. Each type of component should be able to opt into registering handlers implement some mechanical checks of health. Health in this context is functionality expected to work but runtime wise are semi no-op in terms of influencing the underlying storage / rpc systems. Opting in to the system requires for component to define a `HealthStatusIndicator` and implement the `check_health()` function. Registration should automatically be done by the existence of implementation and calling the `Store.register_health()` function in component implementation. At the moment only `Store` based components can register and a single health check is defined for `FilesystemStore`. A global parameter `default_digest_size_health_check` has been introduced for configuring the static seeded random bytes to fill data payload. The default value is 1MB. The `/status` endpoint has been updated to return the resulting string serialized instances of `HealthStatus`, smart serialization of such objects is not implemented at this time. --- Cargo.lock | 2 + nativelink-config/src/cas_server.rs | 7 + nativelink-service/tests/ac_server_test.rs | 2 + .../tests/bytestream_server_test.rs | 1 + nativelink-service/tests/cas_server_test.rs | 1 + nativelink-store/BUILD.bazel | 1 + nativelink-store/Cargo.toml | 1 + .../src/completeness_checking_store.rs | 3 + nativelink-store/src/compression_store.rs | 3 + nativelink-store/src/dedup_store.rs | 3 + nativelink-store/src/default_store_factory.rs | 31 ++- nativelink-store/src/existence_cache_store.rs | 3 + nativelink-store/src/fast_slow_store.rs | 3 + nativelink-store/src/filesystem_store.rs | 17 ++ nativelink-store/src/grpc_store.rs | 5 +- nativelink-store/src/memory_store.rs | 3 + nativelink-store/src/noop_store.rs | 3 + nativelink-store/src/ref_store.rs | 3 + nativelink-store/src/s3_store.rs | 3 + nativelink-store/src/shard_store.rs | 3 + .../src/size_partitioning_store.rs | 3 + nativelink-store/src/verify_store.rs | 3 + .../tests/fast_slow_store_test.rs | 3 + nativelink-util/BUILD.bazel | 4 + nativelink-util/Cargo.toml | 2 + nativelink-util/src/health_utils.rs | 197 ++++++++++++++++ nativelink-util/src/lib.rs | 1 + nativelink-util/src/store_trait.rs | 106 ++++++++- nativelink-util/tests/health_utils_test.rs | 223 ++++++++++++++++++ src/bin/nativelink.rs | 83 ++++++- 30 files changed, 701 insertions(+), 22 deletions(-) create mode 100644 nativelink-util/src/health_utils.rs create mode 100644 nativelink-util/tests/health_utils_test.rs diff --git a/Cargo.lock b/Cargo.lock index 73572e3f5..7802c22ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1792,6 +1792,7 @@ dependencies = [ "serde", "sha2", "shellexpand", + "tempfile", "tokio", "tokio-stream", "tokio-util", @@ -1810,6 +1811,7 @@ dependencies = [ "bytes", "futures", "hex", + "lazy_static", "log", "lru", "mock_instant", diff --git a/nativelink-config/src/cas_server.rs b/nativelink-config/src/cas_server.rs index 0e6889c1e..4c91c7488 100644 --- a/nativelink-config/src/cas_server.rs +++ b/nativelink-config/src/cas_server.rs @@ -629,6 +629,13 @@ pub struct GlobalConfig { /// /// Default: ConfigDigestHashFunction::sha256 pub default_digest_hash_function: Option, + + /// Default digest size to use for health check when running + /// diagnostics checks. + /// + /// Default: 1024*1024 (1MiB) + #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")] + pub default_digest_size_health_check: usize, } #[derive(Deserialize, Debug)] diff --git a/nativelink-service/tests/ac_server_test.rs b/nativelink-service/tests/ac_server_test.rs index 6f8c458a4..3c5da9cbd 100644 --- a/nativelink-service/tests/ac_server_test.rs +++ b/nativelink-service/tests/ac_server_test.rs @@ -55,6 +55,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); @@ -64,6 +65,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); diff --git a/nativelink-service/tests/bytestream_server_test.rs b/nativelink-service/tests/bytestream_server_test.rs index 370a6d6a4..20f39fffd 100644 --- a/nativelink-service/tests/bytestream_server_test.rs +++ b/nativelink-service/tests/bytestream_server_test.rs @@ -41,6 +41,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); diff --git a/nativelink-service/tests/cas_server_test.rs b/nativelink-service/tests/cas_server_test.rs index 6f8c60c4f..ffc3aaf6a 100644 --- a/nativelink-service/tests/cas_server_test.rs +++ b/nativelink-service/tests/cas_server_test.rs @@ -41,6 +41,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); diff --git a/nativelink-store/BUILD.bazel b/nativelink-store/BUILD.bazel index 606a0d6a4..002a6e27c 100644 --- a/nativelink-store/BUILD.bazel +++ b/nativelink-store/BUILD.bazel @@ -58,6 +58,7 @@ rust_library( "@crates//:serde", "@crates//:sha2", "@crates//:shellexpand", + "@crates//:tempfile", "@crates//:tokio", "@crates//:tokio-stream", "@crates//:tokio-util", diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml index a1c67bdb1..0ffd051c8 100644 --- a/nativelink-store/Cargo.toml +++ b/nativelink-store/Cargo.toml @@ -30,6 +30,7 @@ rand = "0.8.5" serde = "1.0.193" sha2 = "0.10.8" shellexpand = "3.1.0" +tempfile = "3.9.0" tokio = { version = "1.35.1" } tokio-stream = { version = "0.1.14", features = ["fs"] } tokio-util = { version = "0.7.10" } diff --git a/nativelink-store/src/completeness_checking_store.rs b/nativelink-store/src/completeness_checking_store.rs index 00a3d4ffc..98d9d13e3 100644 --- a/nativelink-store/src/completeness_checking_store.rs +++ b/nativelink-store/src/completeness_checking_store.rs @@ -25,6 +25,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{ }; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use parking_lot::Mutex; use tokio::sync::Notify; @@ -361,3 +362,5 @@ impl Store for CompletenessCheckingStore { Box::new(self) } } + +default_health_status_indicator!(CompletenessCheckingStore); diff --git a/nativelink-store/src/compression_store.rs b/nativelink-store/src/compression_store.rs index c2677669b..f41014970 100644 --- a/nativelink-store/src/compression_store.rs +++ b/nativelink-store/src/compression_store.rs @@ -26,6 +26,7 @@ use lz4_flex::block::{compress_into, decompress_into, get_maximum_output_size}; use nativelink_error::{error_if, make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::{DigestInfo, JoinHandleDropGuard}; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use serde::{Deserialize, Serialize}; @@ -590,3 +591,5 @@ impl Store for CompressionStore { self.inner_store.clone().register_metrics(inner_store_registry); } } + +default_health_status_indicator!(CompressionStore); diff --git a/nativelink-store/src/dedup_store.rs b/nativelink-store/src/dedup_store.rs index a9bbd9e04..d0c29ffb5 100644 --- a/nativelink-store/src/dedup_store.rs +++ b/nativelink-store/src/dedup_store.rs @@ -24,6 +24,7 @@ use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf, StreamReader}; use nativelink_util::common::DigestInfo; use nativelink_util::fastcdc::FastCDC; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use serde::{Deserialize, Serialize}; use tokio_util::codec::FramedRead; @@ -349,3 +350,5 @@ impl Store for DedupStore { Box::new(self) } } + +default_health_status_indicator!(DedupStore); diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs index 4e54251c3..595c15e35 100644 --- a/nativelink-store/src/default_store_factory.rs +++ b/nativelink-store/src/default_store_factory.rs @@ -19,6 +19,7 @@ use futures::stream::FuturesOrdered; use futures::{Future, TryStreamExt}; use nativelink_config::stores::StoreConfig; use nativelink_error::Error; +use nativelink_util::health_utils::HealthRegistryBuilder; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::Store; @@ -44,6 +45,7 @@ pub fn store_factory<'a>( backend: &'a StoreConfig, store_manager: &'a Arc, maybe_store_metrics: Option<&'a mut Registry>, + maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, ) -> Pin> { Box::pin(async move { let store: Arc = match backend { @@ -51,36 +53,36 @@ pub fn store_factory<'a>( StoreConfig::experimental_s3_store(config) => Arc::new(S3Store::new(config).await?), StoreConfig::verify(config) => Arc::new(VerifyStore::new( config, - store_factory(&config.backend, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, )), StoreConfig::compression(config) => Arc::new(CompressionStore::new( *config.clone(), - store_factory(&config.backend, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, )?), StoreConfig::dedup(config) => Arc::new(DedupStore::new( config, - store_factory(&config.index_store, store_manager, None).await?, - store_factory(&config.content_store, store_manager, None).await?, + store_factory(&config.index_store, store_manager, None, None).await?, + store_factory(&config.content_store, store_manager, None, None).await?, )), StoreConfig::existence_cache(config) => Arc::new(ExistenceCacheStore::new( config, - store_factory(&config.backend, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, )), StoreConfig::completeness_checking(config) => Arc::new(CompletenessCheckingStore::new( - store_factory(&config.backend, store_manager, None).await?, - store_factory(&config.cas_store, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, + store_factory(&config.cas_store, store_manager, None, None).await?, )), StoreConfig::fast_slow(config) => Arc::new(FastSlowStore::new( config, - store_factory(&config.fast, store_manager, None).await?, - store_factory(&config.slow, store_manager, None).await?, + store_factory(&config.fast, store_manager, None, None).await?, + store_factory(&config.slow, store_manager, None, None).await?, )), StoreConfig::filesystem(config) => Arc::new(::new(config).await?), StoreConfig::ref_store(config) => Arc::new(RefStore::new(config, Arc::downgrade(store_manager))), StoreConfig::size_partitioning(config) => Arc::new(SizePartitioningStore::new( config, - store_factory(&config.lower_store, store_manager, None).await?, - store_factory(&config.upper_store, store_manager, None).await?, + store_factory(&config.lower_store, store_manager, None, None).await?, + store_factory(&config.upper_store, store_manager, None, None).await?, )), StoreConfig::grpc(config) => Arc::new(GrpcStore::new(config).await?), StoreConfig::noop => Arc::new(NoopStore::new()), @@ -88,7 +90,7 @@ pub fn store_factory<'a>( let stores = config .stores .iter() - .map(|store_config| store_factory(&store_config.store, store_manager, None)) + .map(|store_config| store_factory(&store_config.store, store_manager, None, None)) .collect::>() .try_collect::>() .await?; @@ -98,6 +100,11 @@ pub fn store_factory<'a>( if let Some(store_metrics) = maybe_store_metrics { store.clone().register_metrics(store_metrics); } + + if let Some(health_registry_builder) = maybe_health_registry_builder { + store.clone().register_health(health_registry_builder); + } + Ok(store) }) } diff --git a/nativelink-store/src/existence_cache_store.rs b/nativelink-store/src/existence_cache_store.rs index 9b6c60595..6fafa9ce1 100644 --- a/nativelink-store/src/existence_cache_store.rs +++ b/nativelink-store/src/existence_cache_store.rs @@ -23,6 +23,7 @@ use nativelink_error::{error_if, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::{CollectorState, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -198,3 +199,5 @@ impl MetricsComponent for ExistenceCacheStore { self.existence_cache.gather_metrics(c) } } + +default_health_status_indicator!(ExistenceCacheStore); diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs index 4ba77d9c4..5eb5abaed 100644 --- a/nativelink-store/src/fast_slow_store.rs +++ b/nativelink-store/src/fast_slow_store.rs @@ -22,6 +22,7 @@ use futures::{join, FutureExt}; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -268,3 +269,5 @@ impl Store for FastSlowStore { self.slow_store.clone().register_metrics(slow_store_registry); } } + +default_health_status_indicator!(FastSlowStore); diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 94a2c342f..4f057ca6d 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::borrow::Cow; use std::ffi::OsString; use std::fmt::{Debug, Formatter}; use std::pin::Pin; @@ -29,6 +30,7 @@ use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::{fs, DigestInfo}; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; +use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; @@ -750,6 +752,10 @@ impl Store for FilesystemStore { fn register_metrics(self: Arc, registry: &mut Registry) { registry.register_collector(Box::new(Collector::new(&self))); } + + fn register_health(self: Arc, registry: &mut HealthRegistryBuilder) { + registry.register_indicator(self); + } } impl MetricsComponent for FilesystemStore { @@ -777,3 +783,14 @@ impl MetricsComponent for FilesystemStore { c.publish("evicting_map", self.evicting_map.as_ref(), ""); } } + +#[async_trait] +impl HealthStatusIndicator for FilesystemStore { + fn get_name(&self) -> &'static str { + "FilesystemStore" + } + + async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus { + Store::check_health(Pin::new(self), namespace).await + } +} diff --git a/nativelink-store/src/grpc_store.rs b/nativelink-store/src/grpc_store.rs index de199b25a..ac5c5466d 100644 --- a/nativelink-store/src/grpc_store.rs +++ b/nativelink-store/src/grpc_store.rs @@ -37,11 +37,12 @@ use nativelink_proto::google::bytestream::{ use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; use nativelink_util::grpc_utils::ConnectionManager; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::resource_info::ResourceInfo; use nativelink_util::retry::{Retrier, RetryResult}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; -use nativelink_util::tls_utils; use nativelink_util::write_request_stream_wrapper::WriteRequestStreamWrapper; +use nativelink_util::{default_health_status_indicator, tls_utils}; use parking_lot::Mutex; use prost::Message; use rand::rngs::OsRng; @@ -844,3 +845,5 @@ impl Store for GrpcStore { Box::new(self) } } + +default_health_status_indicator!(GrpcStore); diff --git a/nativelink-store/src/memory_store.rs b/nativelink-store/src/memory_store.rs index ebf129987..a4ca1990b 100644 --- a/nativelink-store/src/memory_store.rs +++ b/nativelink-store/src/memory_store.rs @@ -23,6 +23,7 @@ use nativelink_error::{Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -170,3 +171,5 @@ impl MetricsComponent for MemoryStore { c.publish("evicting_map", &self.evicting_map, ""); } } + +default_health_status_indicator!(MemoryStore); diff --git a/nativelink-store/src/noop_store.rs b/nativelink-store/src/noop_store.rs index a3a3c6b51..006c2cf37 100644 --- a/nativelink-store/src/noop_store.rs +++ b/nativelink-store/src/noop_store.rs @@ -19,6 +19,7 @@ use async_trait::async_trait; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; #[derive(Default)] @@ -71,3 +72,5 @@ impl Store for NoopStore { Box::new(self) } } + +default_health_status_indicator!(NoopStore); diff --git a/nativelink-store/src/ref_store.rs b/nativelink-store/src/ref_store.rs index ff2f11d31..fd09f04f2 100644 --- a/nativelink-store/src/ref_store.rs +++ b/nativelink-store/src/ref_store.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use tracing::error; @@ -137,3 +138,5 @@ impl Store for RefStore { Box::new(self) } } + +default_health_status_indicator!(RefStore); diff --git a/nativelink-store/src/s3_store.rs b/nativelink-store/src/s3_store.rs index 2179846a4..91d4a60ae 100644 --- a/nativelink-store/src/s3_store.rs +++ b/nativelink-store/src/s3_store.rs @@ -40,6 +40,7 @@ use hyper_rustls::{HttpsConnector, MaybeHttpsStream}; use nativelink_error::{error_if, make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::retry::{Retrier, RetryResult}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use rand::rngs::OsRng; @@ -534,3 +535,5 @@ impl Store for S3Store { Box::new(self) } } + +default_health_status_indicator!(S3Store); diff --git a/nativelink-store/src/shard_store.rs b/nativelink-store/src/shard_store.rs index 5a858e862..432abb844 100644 --- a/nativelink-store/src/shard_store.rs +++ b/nativelink-store/src/shard_store.rs @@ -21,6 +21,7 @@ use futures::stream::{FuturesUnordered, TryStreamExt}; use nativelink_error::{error_if, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -191,3 +192,5 @@ impl Store for ShardStore { } } } + +default_health_status_indicator!(ShardStore); diff --git a/nativelink-store/src/size_partitioning_store.rs b/nativelink-store/src/size_partitioning_store.rs index 3d92148c6..df02d2306 100644 --- a/nativelink-store/src/size_partitioning_store.rs +++ b/nativelink-store/src/size_partitioning_store.rs @@ -19,6 +19,7 @@ use async_trait::async_trait; use nativelink_error::{Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use tokio::join; @@ -128,3 +129,5 @@ impl Store for SizePartitioningStore { Box::new(self) } } + +default_health_status_indicator!(SizePartitioningStore); diff --git a/nativelink-store/src/verify_store.rs b/nativelink-store/src/verify_store.rs index 2cabf95a7..195720ffe 100644 --- a/nativelink-store/src/verify_store.rs +++ b/nativelink-store/src/verify_store.rs @@ -22,6 +22,7 @@ use nativelink_error::{make_input_err, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc}; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::{Collector, CollectorState, CounterWithTime, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -199,3 +200,5 @@ impl MetricsComponent for VerifyStore { ); } } + +default_health_status_indicator!(VerifyStore); diff --git a/nativelink-store/tests/fast_slow_store_test.rs b/nativelink-store/tests/fast_slow_store_test.rs index d21dced01..6b52450dc 100644 --- a/nativelink-store/tests/fast_slow_store_test.rs +++ b/nativelink-store/tests/fast_slow_store_test.rs @@ -76,6 +76,7 @@ mod fast_slow_store_tests { use bytes::Bytes; use nativelink_error::{make_err, Code, ResultExt}; use nativelink_util::buf_channel::make_buf_channel_pair; + use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use pretty_assertions::assert_eq; use super::*; // Must be declared in every module. @@ -301,6 +302,8 @@ mod fast_slow_store_tests { } } + default_health_status_indicator!(DropCheckStore); + let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap(); let (fast_store_read_tx, fast_store_read_rx) = tokio::sync::oneshot::channel(); let (fast_store_eof_tx, fast_store_eof_rx) = tokio::sync::oneshot::channel(); diff --git a/nativelink-util/BUILD.bazel b/nativelink-util/BUILD.bazel index 77d5302d0..7aa0b736e 100644 --- a/nativelink-util/BUILD.bazel +++ b/nativelink-util/BUILD.bazel @@ -17,6 +17,7 @@ rust_library( "src/fastcdc.rs", "src/fs.rs", "src/grpc_utils.rs", + "src/health_utils.rs", "src/lib.rs", "src/metrics_utils.rs", "src/platform_properties.rs", @@ -40,6 +41,7 @@ rust_library( "@crates//:bytes", "@crates//:futures", "@crates//:hex", + "@crates//:lazy_static", "@crates//:log", "@crates//:lru", "@crates//:parking_lot", @@ -47,6 +49,7 @@ rust_library( "@crates//:prometheus-client", "@crates//:prost", "@crates//:prost-types", + "@crates//:rand", "@crates//:serde", "@crates//:sha2", "@crates//:tokio", @@ -64,6 +67,7 @@ rust_test_suite( "tests/evicting_map_test.rs", "tests/fastcdc_test.rs", "tests/fs_test.rs", + "tests/health_utils_test.rs", "tests/resource_info_test.rs", "tests/retry_test.rs", ], diff --git a/nativelink-util/Cargo.toml b/nativelink-util/Cargo.toml index 3bcf20bd7..85c2606f2 100644 --- a/nativelink-util/Cargo.toml +++ b/nativelink-util/Cargo.toml @@ -14,6 +14,7 @@ blake3 = "1.5.0" bytes = "1.5.0" futures = "0.3.29" hex = "0.4.3" +lazy_static = "1.4.0" log = "0.4.20" lru = "0.12.1" parking_lot = "0.12.1" @@ -21,6 +22,7 @@ pin-project-lite = "0.2.13" prometheus-client = "0.21.2" prost = "0.12.3" prost-types = "0.12.3" +rand = "0.8.5" serde = { version = "1.0.193", features = ["derive"] } sha2 = "0.10.8" tokio = { version = "1.35.1", features = [ "sync", "fs", "rt", "time", "io-util", "macros" ] } diff --git a/nativelink-util/src/health_utils.rs b/nativelink-util/src/health_utils.rs new file mode 100644 index 000000000..2f684b613 --- /dev/null +++ b/nativelink-util/src/health_utils.rs @@ -0,0 +1,197 @@ +// Copyright 2024 The Native Link Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Cow; +use std::collections::HashMap; +use std::fmt::Debug; +use std::marker::Send; +use std::pin::Pin; +use std::sync::Arc; + +use async_trait::async_trait; +use futures::{Stream, StreamExt}; +use parking_lot::Mutex; +use serde::Serialize; + +/// Struct name health indicator component. +type StructName = str; +/// Readable message status of the health indicator. +type Message = str; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] +pub enum HealthStatus { + Ok { + struct_name: &'static StructName, + message: Cow<'static, Message>, + }, + Initializing { + struct_name: &'static StructName, + message: Cow<'static, Message>, + }, + /// This status is used to indicate a non-fatal issue with the component. + Warning { + struct_name: &'static StructName, + message: Cow<'static, Message>, + }, + Failed { + struct_name: &'static StructName, + message: Cow<'static, Message>, + }, +} + +impl HealthStatus { + pub fn new_ok(component: &(impl HealthStatusIndicator + ?Sized), message: Cow<'static, str>) -> Self { + Self::Ok { + struct_name: component.struct_name(), + message, + } + } + + pub fn new_initializing( + component: &(impl HealthStatusIndicator + ?Sized), + message: Cow<'static, str>, + ) -> HealthStatus { + Self::Initializing { + struct_name: component.struct_name(), + message, + } + } + + pub fn new_warning(component: &(impl HealthStatusIndicator + ?Sized), message: Cow<'static, str>) -> HealthStatus { + Self::Warning { + struct_name: component.struct_name(), + message, + } + } + + pub fn new_failed(component: &(impl HealthStatusIndicator + ?Sized), message: Cow<'static, str>) -> HealthStatus { + Self::Failed { + struct_name: component.struct_name(), + message, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize)] +pub struct HealthStatusDescription { + pub namespace: Cow<'static, str>, + pub status: HealthStatus, +} + +/// Health status indicator trait. This trait is used to define +/// a health status indicator by implementing the `check_health` function. +/// A default implementation is provided for the `check_health` function +/// that returns healthy component. +#[async_trait] +pub trait HealthStatusIndicator: Sync + Send + Unpin { + fn get_name(&self) -> &'static str; + + /// Returns the name of the struct implementing the trait. + fn struct_name(&self) -> &'static str { + std::any::type_name::() + } + + /// Check the health status of the component. This function should be + /// implemented by the component to check the health status of the component. + async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus; +} + +type HealthRegistryBuilderState = Arc, Arc>>>; +pub struct HealthRegistryBuilder { + namespace: Cow<'static, str>, + state: HealthRegistryBuilderState, +} + +/// Health registry builder that is used to build a health registry. +/// The builder provides creation, registering of health status indicators, +/// sub building scoped health registries and building the health registry. +/// `build()` should be called once for finalizing the production of a health registry. +impl HealthRegistryBuilder { + pub fn new(namespace: Cow<'static, str>) -> Self { + Self { + namespace: format!("/{}", namespace).into(), + state: Arc::new(Mutex::new(HashMap::new())), + } + } + + /// Register a health status indicator at current namespace. + pub fn register_indicator(&mut self, indicator: Arc) { + let name = format!("{}/{}", self.namespace, indicator.get_name()); + self.state.lock().insert(name.into(), indicator); + } + + /// Create a sub builder for a namespace. + pub fn sub_builder(&mut self, namespace: Cow<'static, str>) -> HealthRegistryBuilder { + HealthRegistryBuilder { + namespace: format!("{}/{}", self.namespace, namespace).into(), + state: self.state.clone(), + } + } + + /// Finalize the production of the health registry. + pub fn build(&mut self) -> HealthRegistry { + HealthRegistry { + indicators: self.state.lock().clone().into_iter().collect(), + } + } +} + +#[derive(Default, Clone)] +pub struct HealthRegistry { + indicators: Vec<(Cow<'static, str>, Arc)>, +} + +pub trait HealthStatusReporter { + fn health_status_report(&self) -> Pin + '_>>; +} + +/// Health status reporter implementation for the health registry that provides a stream +/// of health status descriptions. +impl HealthStatusReporter for HealthRegistry { + fn health_status_report(&self) -> Pin + '_>> { + Box::pin( + futures::stream::iter(self.indicators.iter()).then(|(namespace, indicator)| async move { + HealthStatusDescription { + namespace: namespace.clone(), + status: indicator.check_health(namespace.clone()).await, + } + }), + ) + } +} + +/// Default health status indicator implementation for a component. +/// Generally used for components that don't need custom implementations +/// of the `check_health` function. +#[macro_export] +macro_rules! default_health_status_indicator { + ($type:ty) => { + #[async_trait::async_trait] + impl HealthStatusIndicator for $type { + fn get_name(&self) -> &'static str { + stringify!($type) + } + + async fn check_health( + &self, + namespace: std::borrow::Cow<'static, str>, + ) -> nativelink_util::health_utils::HealthStatus { + Store::check_health(Pin::new(self), namespace).await + } + } + }; +} + +// Re-scoped for the health_utils module. +pub use crate::default_health_status_indicator; diff --git a/nativelink-util/src/lib.rs b/nativelink-util/src/lib.rs index 62c615c0e..c493b9f70 100644 --- a/nativelink-util/src/lib.rs +++ b/nativelink-util/src/lib.rs @@ -20,6 +20,7 @@ pub mod evicting_map; pub mod fastcdc; pub mod fs; pub mod grpc_utils; +pub mod health_utils; pub mod metrics_utils; pub mod platform_properties; pub mod resource_info; diff --git a/nativelink-util/src/store_trait.rs b/nativelink-util/src/store_trait.rs index 2c6a4b721..fcc61f07e 100644 --- a/nativelink-util/src/store_trait.rs +++ b/nativelink-util/src/store_trait.rs @@ -12,20 +12,43 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::borrow::Cow; use std::marker::Send; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use async_trait::async_trait; use bytes::Bytes; use futures::{join, try_join}; -use nativelink_error::{Error, ResultExt}; +use lazy_static::lazy_static; +use nativelink_error::{make_err, Code, Error, ResultExt}; +use parking_lot::Mutex; +use rand::rngs::StdRng; +use rand::{RngCore, SeedableRng}; use serde::{Deserialize, Serialize}; use crate::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use crate::common::DigestInfo; +use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; use crate::metrics_utils::Registry; +static DEFAULT_DIGEST_SIZE_HEALTH_CHECK: OnceLock = OnceLock::new(); +const DEFAULT_DATA_SEED_HEALTH_CHECK: u64 = 0xdeadbeef; + +pub fn default_digest_size_health_check() -> usize { + *DEFAULT_DIGEST_SIZE_HEALTH_CHECK.get_or_init(|| 1024 * 1024) +} + +pub fn set_default_digest_size_health_check(size: usize) -> Result<(), Error> { + DEFAULT_DIGEST_SIZE_HEALTH_CHECK + .set(size) + .map_err(|_| make_err!(Code::Internal, "set_default_digest_size_health_check already set")) +} + +lazy_static! { + static ref HEALTH_CHECK_RNG: Mutex = Mutex::new(StdRng::seed_from_u64(DEFAULT_DATA_SEED_HEALTH_CHECK)); +} + #[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize)] pub enum UploadSizeInfo { /// When the data transfer amount is known to be exact size, this enum should be used. @@ -40,7 +63,7 @@ pub enum UploadSizeInfo { } #[async_trait] -pub trait Store: Sync + Send + Unpin { +pub trait Store: Sync + Send + Unpin + HealthStatusIndicator { /// Look up a digest in the store and return None if it does not exist in /// the store, or Some(size) if it does. /// Note: On an AC store the size will be incorrect and should not be used! @@ -168,6 +191,80 @@ pub trait Store: Sync + Send + Unpin { .merge(data_res.err_tip(|| "Failed to read stream to completion in get_part_unchunked")) } + // Default implementation of the health check. Some stores may want to override this + // in situations where the default implementation is not sufficient. + async fn check_health(self: Pin<&Self>, namespace: Cow<'static, str>) -> HealthStatus { + let digest_data_size = default_digest_size_health_check(); + let namespace_bytes = namespace.as_bytes(); + let store_name_bytes = self.get_name().as_bytes(); + + let mut digest_data = vec![0u8; digest_data_size]; + + // Fill data with all random bytes. + HEALTH_CHECK_RNG.lock().fill_bytes(&mut digest_data); + + // Copy namespace bytes into data + let namespace_copy_length = usize::min(digest_data.len(), namespace_bytes.len()); + digest_data[..namespace_copy_length].copy_from_slice(&namespace_bytes[..namespace_copy_length]); + + // Calculate remaining space after copying namespace bytes + let remaining_space = digest_data_size.saturating_sub(namespace_copy_length); + + // Determine how many store_name bytes can be copied + let store_name_copy_length = usize::min(remaining_space, store_name_bytes.len()); + + // Copy store_name bytes into data, starting where namespace bytes end + if store_name_copy_length > 0 { + digest_data[namespace_copy_length..namespace_copy_length + store_name_copy_length] + .copy_from_slice(&store_name_bytes[..store_name_copy_length]); + } + + let digest_data_hash = blake3::hash(&digest_data).into(); + let digest_data_len = digest_data.len(); + let digest_info = DigestInfo::new(digest_data_hash, digest_data_len as i64); + let digest_data_copy = bytes::Bytes::copy_from_slice(&digest_data); + + if let Err(e) = self.update_oneshot(digest_info, digest_data_copy.clone()).await { + return HealthStatus::new_failed(self.get_ref(), format!("Store.update_oneshot() failed: {}", e).into()); + } + + match self.has(digest_info).await { + Ok(Some(s)) => { + if s != digest_data_len { + return HealthStatus::new_failed( + self.get_ref(), + format!("Store.has() size mismatch {s} != {digest_data_len}").into(), + ); + } + } + Ok(None) => { + return HealthStatus::new_failed(self.get_ref(), "Store.has() size not found".into()); + } + Err(e) => { + return HealthStatus::new_failed(self.get_ref(), format!("Store.has() failed: {}", e).into()); + } + } + + match self + .get_part_unchunked(digest_info, 0, Some(digest_data_len), Some(digest_data_len)) + .await + { + Ok(b) => { + if b != digest_data_copy { + return HealthStatus::new_failed(self.get_ref(), "Store.get_part_unchunked() data mismatch".into()); + } + } + Err(e) => { + return HealthStatus::new_failed( + self.get_ref(), + format!("Store.get_part_unchunked() failed: {}", e).into(), + ); + } + } + + HealthStatus::new_ok(self.get_ref(), "Successfully store health check".into()) + } + /// Gets the underlying store for the given digest. This can be used to find out /// what any underlying store is for a given digest will be and hand it to the caller. /// A caller might want to use this to obtain a reference to the "real" underlying store @@ -180,4 +277,7 @@ pub trait Store: Sync + Send + Unpin { /// Register any metrics that this store wants to expose to the Prometheus. fn register_metrics(self: Arc, _registry: &mut Registry) {} + + // Register health checks used to monitor the store. + fn register_health(self: Arc, _registry: &mut HealthRegistryBuilder) {} } diff --git a/nativelink-util/tests/health_utils_test.rs b/nativelink-util/tests/health_utils_test.rs new file mode 100644 index 000000000..0ea59fed2 --- /dev/null +++ b/nativelink-util/tests/health_utils_test.rs @@ -0,0 +1,223 @@ +// Copyright 2024 The Native Link Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Cow; +use std::collections::HashSet; +use std::iter::FromIterator; +use std::sync::Arc; + +use futures::StreamExt; +use nativelink_error::Error; +use nativelink_util::health_utils::{ + HealthRegistryBuilder, HealthStatus, HealthStatusDescription, HealthStatusIndicator, HealthStatusReporter, +}; + +#[cfg(test)] +mod health_utils_tests { + use pretty_assertions::assert_eq; + + use super::*; + + #[tokio::test] + async fn create_empty_indicator() -> Result<(), Error> { + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + assert_eq!(health_status.len(), 0); + Ok(()) + } + + #[tokio::test] + async fn create_register_indicator() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl, Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + health_registry_builder.register_indicator(Arc::new(MockComponentImpl {})); + + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 1); + assert_eq!( + health_status, + vec![HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl", + message: "ok".into() + }, + }] + ); + + Ok(()) + } + + #[tokio::test] + async fn create_sub_registry() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl, Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + health_registry_builder.register_indicator(Arc::new(MockComponentImpl {})); + + let mut namespace1_registry = health_registry_builder.sub_builder("namespace1".into()); + + namespace1_registry.register_indicator(Arc::new(MockComponentImpl {})); + + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 2); + let expected_health_status = vec_to_set(vec![ + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/namespace1/MockComponentImpl".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl", + message: "ok".into(), + }, + }, + ]); + + assert_eq!(vec_to_set(health_status), expected_health_status); + + Ok(()) + } + + #[tokio::test] + async fn create_multiple_indicators_same_registry() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl1, Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl2, Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl3, Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + health_registry_builder.register_indicator(Arc::new(MockComponentImpl1 {})); + health_registry_builder.register_indicator(Arc::new(MockComponentImpl2 {})); + health_registry_builder.register_indicator(Arc::new(MockComponentImpl3 {})); + + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 3); + let expected_health_status = vec_to_set(vec![ + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl1".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl1", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl2".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl2", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl3".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl3", + message: "ok".into(), + }, + }, + ]); + + assert_eq!(vec_to_set(health_status), expected_health_status); + + Ok(()) + } + + #[tokio::test] + async fn create_multiple_indicators_with_sub_registry() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl1, Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl2, Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl3, Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + let mut sub_builder = health_registry_builder.sub_builder("namespace1".into()); + sub_builder.register_indicator(Arc::new(MockComponentImpl1 {})); + let mut sub_builder = health_registry_builder.sub_builder("namespace2".into()); + sub_builder.register_indicator(Arc::new(MockComponentImpl2 {})); + + health_registry_builder + .sub_builder("namespace3".into()) + .register_indicator(Arc::new(MockComponentImpl3 {})); + + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 3); + let expected_health_status = vec_to_set(vec![ + HealthStatusDescription { + namespace: "/nativelink/namespace1/MockComponentImpl1".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl1", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/namespace2/MockComponentImpl2".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl2", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/namespace3/MockComponentImpl3".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl3", + message: "ok".into(), + }, + }, + ]); + + assert_eq!(vec_to_set(health_status), expected_health_status); + + Ok(()) + } + + #[macro_export] + macro_rules! generate_health_status_indicator { + ($struct_name:ident, $health_status:ident, $status_msg:expr) => { + struct $struct_name; + + #[async_trait::async_trait] + impl HealthStatusIndicator for $struct_name { + fn get_name(&self) -> &'static str { + stringify!($struct_name).into() + } + async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus { + HealthStatus::$health_status { + struct_name: stringify!($struct_name).into(), + message: $status_msg.into(), + } + } + } + }; + } + + fn vec_to_set(vec: Vec) -> HashSet { + HashSet::from_iter(vec) + } +} diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index b8a222a7a..54cb3f7bd 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -21,7 +21,7 @@ use async_lock::Mutex as AsyncMutex; use axum::Router; use clap::Parser; use futures::future::{select_all, BoxFuture, OptionFuture, TryFutureExt}; -use futures::FutureExt; +use futures::{FutureExt, StreamExt}; use hyper::server::conn::Http; use hyper::{Response, StatusCode}; use nativelink_config::cas_server::{ @@ -41,9 +41,13 @@ use nativelink_store::default_store_factory::store_factory; use nativelink_store::store_manager::StoreManager; use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit}; use nativelink_util::digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc}; +use nativelink_util::health_utils::{ + HealthRegistryBuilder, HealthStatus, HealthStatusDescription, HealthStatusReporter, +}; use nativelink_util::metrics_utils::{ set_metrics_enabled_for_this_thread, Collector, CollectorState, Counter, MetricsComponent, Registry, }; +use nativelink_util::store_trait::set_default_digest_size_health_check; use nativelink_worker::local_worker::new_local_worker; use parking_lot::Mutex; use rustls_pemfile::{certs as extract_certs, crls as extract_crls}; @@ -71,6 +75,9 @@ const DEFAULT_ADMIN_API_PATH: &str = "/admin"; /// Name of environment variable to disable metrics. const METRICS_DISABLE_ENV: &str = "NATIVELINK_DISABLE_METRICS"; +/// Content type header value for JSON. +const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8"; + /// Backend for bazel remote execution / cache API. #[derive(Parser, Debug)] #[clap( @@ -87,17 +94,27 @@ struct Args { async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), Box> { let mut root_metrics_registry = ::with_prefix("nativelink"); + let health_registry_builder = Arc::new(AsyncMutex::new(HealthRegistryBuilder::new("nativelink".into()))); let store_manager = Arc::new(StoreManager::new()); { + let mut health_registry_lock = health_registry_builder.lock().await; let root_store_metrics = root_metrics_registry.sub_registry_with_prefix("stores"); + for (name, store_cfg) in cfg.stores { + let health_component_name = format!("stores/{name}"); + let mut health_register_store = health_registry_lock.sub_builder(health_component_name.into()); let store_metrics = root_store_metrics.sub_registry_with_prefix(&name); store_manager.add_store( &name, - store_factory(&store_cfg, &store_manager, Some(store_metrics)) - .await - .err_tip(|| format!("Failed to create store '{name}'"))?, + store_factory( + &store_cfg, + &store_manager, + Some(store_metrics), + Some(&mut health_register_store), + ) + .await + .err_tip(|| format!("Failed to create store '{name}'"))?, ); } } @@ -349,12 +366,59 @@ async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), B ); let root_metrics_registry = root_metrics_registry.clone(); + let health_registry_status = health_registry_builder.lock().await.build(); let mut svc = Router::new() // This is the default service that executes if no other endpoint matches. .fallback_service(tonic_services.into_service().map_err(|e| panic!("{e}"))) - // This is a generic endpoint used to check if the server is up. - .route_service("/status", axum::routing::get(move || async move { "Ok".to_string() })); + .route_service( + "/status", + axum::routing::get(move || async move { + fn error_to_response(e: E) -> Response { + let mut response = Response::new(format!("Error: {e:?}")); + *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + response + } + + spawn_blocking(move || { + futures::executor::block_on(async { + let health_status_descriptions: Vec = + health_registry_status.health_status_report().collect().await; + + match serde_json5::to_string(&health_status_descriptions) { + Ok(body) => { + let contains_failed_report = health_status_descriptions + .iter() + .any(|description| matches!(description.status, HealthStatus::Failed { .. })); + let status_code = if contains_failed_report { + StatusCode::SERVICE_UNAVAILABLE + } else { + StatusCode::OK + }; + Response::builder() + .status(status_code) + .header( + hyper::header::CONTENT_TYPE, + hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE), + ) + .body(body) + .unwrap() + } + Err(e) => Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .header( + hyper::header::CONTENT_TYPE, + hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE), + ) + .body(format!("Internal Failure: {e:?}")) + .unwrap(), + } + }) + }) + .await + .unwrap_or_else(error_to_response) + }), + ); if let Some(prometheus_cfg) = services.experimental_prometheus { fn error_to_response(e: E) -> Response { @@ -728,6 +792,7 @@ fn main() -> Result<(), Box> { // Note: If the default changes make sure you update the documentation in // `config/cas_server.rs`. const DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS: u64 = 1000; + const DEFAULT_DIGEST_SIZE_HEALTH_CHECK: usize = 1024 * 1024; let global_cfg = if let Some(global_cfg) = &mut cfg.global { if global_cfg.max_open_files == 0 { global_cfg.max_open_files = DEFAULT_MAX_OPEN_FILES; @@ -735,6 +800,10 @@ fn main() -> Result<(), Box> { if global_cfg.idle_file_descriptor_timeout_millis == 0 { global_cfg.idle_file_descriptor_timeout_millis = DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS; } + if global_cfg.default_digest_size_health_check == 0 { + global_cfg.default_digest_size_health_check = DEFAULT_DIGEST_SIZE_HEALTH_CHECK; + } + *global_cfg } else { GlobalConfig { @@ -747,6 +816,7 @@ fn main() -> Result<(), Box> { service.experimental_prometheus.is_none() }), default_digest_hash_function: None, + default_digest_size_health_check: DEFAULT_DIGEST_SIZE_HEALTH_CHECK, } }; set_open_file_limit(global_cfg.max_open_files); @@ -756,6 +826,7 @@ fn main() -> Result<(), Box> { .default_digest_hash_function .unwrap_or(ConfigDigestHashFunction::sha256), ))?; + set_default_digest_size_health_check(global_cfg.default_digest_size_health_check)?; // TODO (#513): prevent deadlocks by assigning max blocking threads number of open files * ten (!global_cfg.disable_metrics, global_cfg.max_open_files * 10) };