diff --git a/Cargo.lock b/Cargo.lock index 73572e3f5..7802c22ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1792,6 +1792,7 @@ dependencies = [ "serde", "sha2", "shellexpand", + "tempfile", "tokio", "tokio-stream", "tokio-util", @@ -1810,6 +1811,7 @@ dependencies = [ "bytes", "futures", "hex", + "lazy_static", "log", "lru", "mock_instant", diff --git a/nativelink-config/src/cas_server.rs b/nativelink-config/src/cas_server.rs index 0e6889c1e..4c91c7488 100644 --- a/nativelink-config/src/cas_server.rs +++ b/nativelink-config/src/cas_server.rs @@ -629,6 +629,13 @@ pub struct GlobalConfig { /// /// Default: ConfigDigestHashFunction::sha256 pub default_digest_hash_function: Option, + + /// Default digest size to use for health check when running + /// diagnostics checks. + /// + /// Default: 1024*1024 (1MiB) + #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")] + pub default_digest_size_health_check: usize, } #[derive(Deserialize, Debug)] diff --git a/nativelink-service/tests/ac_server_test.rs b/nativelink-service/tests/ac_server_test.rs index 6f8c458a4..3c5da9cbd 100644 --- a/nativelink-service/tests/ac_server_test.rs +++ b/nativelink-service/tests/ac_server_test.rs @@ -55,6 +55,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); @@ -64,6 +65,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); diff --git a/nativelink-service/tests/bytestream_server_test.rs b/nativelink-service/tests/bytestream_server_test.rs index 370a6d6a4..20f39fffd 100644 --- a/nativelink-service/tests/bytestream_server_test.rs +++ b/nativelink-service/tests/bytestream_server_test.rs @@ -41,6 +41,7 @@ async fn make_store_manager() -> Result, Error> { 
&nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); diff --git a/nativelink-service/tests/cas_server_test.rs b/nativelink-service/tests/cas_server_test.rs index 6f8c60c4f..ffc3aaf6a 100644 --- a/nativelink-service/tests/cas_server_test.rs +++ b/nativelink-service/tests/cas_server_test.rs @@ -41,6 +41,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); diff --git a/nativelink-store/BUILD.bazel b/nativelink-store/BUILD.bazel index 606a0d6a4..002a6e27c 100644 --- a/nativelink-store/BUILD.bazel +++ b/nativelink-store/BUILD.bazel @@ -58,6 +58,7 @@ rust_library( "@crates//:serde", "@crates//:sha2", "@crates//:shellexpand", + "@crates//:tempfile", "@crates//:tokio", "@crates//:tokio-stream", "@crates//:tokio-util", diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml index a1c67bdb1..0ffd051c8 100644 --- a/nativelink-store/Cargo.toml +++ b/nativelink-store/Cargo.toml @@ -30,6 +30,7 @@ rand = "0.8.5" serde = "1.0.193" sha2 = "0.10.8" shellexpand = "3.1.0" +tempfile = "3.9.0" tokio = { version = "1.35.1" } tokio-stream = { version = "0.1.14", features = ["fs"] } tokio-util = { version = "0.7.10" } diff --git a/nativelink-store/src/completeness_checking_store.rs b/nativelink-store/src/completeness_checking_store.rs index 00a3d4ffc..98d9d13e3 100644 --- a/nativelink-store/src/completeness_checking_store.rs +++ b/nativelink-store/src/completeness_checking_store.rs @@ -25,6 +25,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{ }; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use 
nativelink_util::store_trait::{Store, UploadSizeInfo}; use parking_lot::Mutex; use tokio::sync::Notify; @@ -361,3 +362,5 @@ impl Store for CompletenessCheckingStore { Box::new(self) } } + +default_health_status_indicator!(CompletenessCheckingStore); diff --git a/nativelink-store/src/compression_store.rs b/nativelink-store/src/compression_store.rs index c2677669b..f41014970 100644 --- a/nativelink-store/src/compression_store.rs +++ b/nativelink-store/src/compression_store.rs @@ -26,6 +26,7 @@ use lz4_flex::block::{compress_into, decompress_into, get_maximum_output_size}; use nativelink_error::{error_if, make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::{DigestInfo, JoinHandleDropGuard}; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use serde::{Deserialize, Serialize}; @@ -590,3 +591,5 @@ impl Store for CompressionStore { self.inner_store.clone().register_metrics(inner_store_registry); } } + +default_health_status_indicator!(CompressionStore); diff --git a/nativelink-store/src/dedup_store.rs b/nativelink-store/src/dedup_store.rs index a9bbd9e04..d0c29ffb5 100644 --- a/nativelink-store/src/dedup_store.rs +++ b/nativelink-store/src/dedup_store.rs @@ -24,6 +24,7 @@ use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf, StreamReader}; use nativelink_util::common::DigestInfo; use nativelink_util::fastcdc::FastCDC; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use serde::{Deserialize, Serialize}; use tokio_util::codec::FramedRead; @@ -349,3 +350,5 @@ impl Store for DedupStore { Box::new(self) } } + 
+default_health_status_indicator!(DedupStore); diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs index 4e54251c3..595c15e35 100644 --- a/nativelink-store/src/default_store_factory.rs +++ b/nativelink-store/src/default_store_factory.rs @@ -19,6 +19,7 @@ use futures::stream::FuturesOrdered; use futures::{Future, TryStreamExt}; use nativelink_config::stores::StoreConfig; use nativelink_error::Error; +use nativelink_util::health_utils::HealthRegistryBuilder; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::Store; @@ -44,6 +45,7 @@ pub fn store_factory<'a>( backend: &'a StoreConfig, store_manager: &'a Arc, maybe_store_metrics: Option<&'a mut Registry>, + maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, ) -> Pin> { Box::pin(async move { let store: Arc = match backend { @@ -51,36 +53,36 @@ pub fn store_factory<'a>( StoreConfig::experimental_s3_store(config) => Arc::new(S3Store::new(config).await?), StoreConfig::verify(config) => Arc::new(VerifyStore::new( config, - store_factory(&config.backend, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, )), StoreConfig::compression(config) => Arc::new(CompressionStore::new( *config.clone(), - store_factory(&config.backend, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, )?), StoreConfig::dedup(config) => Arc::new(DedupStore::new( config, - store_factory(&config.index_store, store_manager, None).await?, - store_factory(&config.content_store, store_manager, None).await?, + store_factory(&config.index_store, store_manager, None, None).await?, + store_factory(&config.content_store, store_manager, None, None).await?, )), StoreConfig::existence_cache(config) => Arc::new(ExistenceCacheStore::new( config, - store_factory(&config.backend, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, )), 
StoreConfig::completeness_checking(config) => Arc::new(CompletenessCheckingStore::new( - store_factory(&config.backend, store_manager, None).await?, - store_factory(&config.cas_store, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, + store_factory(&config.cas_store, store_manager, None, None).await?, )), StoreConfig::fast_slow(config) => Arc::new(FastSlowStore::new( config, - store_factory(&config.fast, store_manager, None).await?, - store_factory(&config.slow, store_manager, None).await?, + store_factory(&config.fast, store_manager, None, None).await?, + store_factory(&config.slow, store_manager, None, None).await?, )), StoreConfig::filesystem(config) => Arc::new(::new(config).await?), StoreConfig::ref_store(config) => Arc::new(RefStore::new(config, Arc::downgrade(store_manager))), StoreConfig::size_partitioning(config) => Arc::new(SizePartitioningStore::new( config, - store_factory(&config.lower_store, store_manager, None).await?, - store_factory(&config.upper_store, store_manager, None).await?, + store_factory(&config.lower_store, store_manager, None, None).await?, + store_factory(&config.upper_store, store_manager, None, None).await?, )), StoreConfig::grpc(config) => Arc::new(GrpcStore::new(config).await?), StoreConfig::noop => Arc::new(NoopStore::new()), @@ -88,7 +90,7 @@ pub fn store_factory<'a>( let stores = config .stores .iter() - .map(|store_config| store_factory(&store_config.store, store_manager, None)) + .map(|store_config| store_factory(&store_config.store, store_manager, None, None)) .collect::>() .try_collect::>() .await?; @@ -98,6 +100,11 @@ pub fn store_factory<'a>( if let Some(store_metrics) = maybe_store_metrics { store.clone().register_metrics(store_metrics); } + + if let Some(health_registry_builder) = maybe_health_registry_builder { + store.clone().register_health(health_registry_builder); + } + Ok(store) }) } diff --git a/nativelink-store/src/existence_cache_store.rs 
b/nativelink-store/src/existence_cache_store.rs index 9b6c60595..6fafa9ce1 100644 --- a/nativelink-store/src/existence_cache_store.rs +++ b/nativelink-store/src/existence_cache_store.rs @@ -23,6 +23,7 @@ use nativelink_error::{error_if, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::{CollectorState, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -198,3 +199,5 @@ impl MetricsComponent for ExistenceCacheStore { self.existence_cache.gather_metrics(c) } } + +default_health_status_indicator!(ExistenceCacheStore); diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs index 4ba77d9c4..5eb5abaed 100644 --- a/nativelink-store/src/fast_slow_store.rs +++ b/nativelink-store/src/fast_slow_store.rs @@ -22,6 +22,7 @@ use futures::{join, FutureExt}; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -268,3 +269,5 @@ impl Store for FastSlowStore { self.slow_store.clone().register_metrics(slow_store_registry); } } + +default_health_status_indicator!(FastSlowStore); diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 94a2c342f..4f057ca6d 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // 
limitations under the License. +use std::borrow::Cow; use std::ffi::OsString; use std::fmt::{Debug, Formatter}; use std::pin::Pin; @@ -29,6 +30,7 @@ use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::{fs, DigestInfo}; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; +use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; @@ -750,6 +752,10 @@ impl Store for FilesystemStore { fn register_metrics(self: Arc, registry: &mut Registry) { registry.register_collector(Box::new(Collector::new(&self))); } + + fn register_health(self: Arc, registry: &mut HealthRegistryBuilder) { + registry.register_indicator(self); + } } impl MetricsComponent for FilesystemStore { @@ -777,3 +783,14 @@ impl MetricsComponent for FilesystemStore { c.publish("evicting_map", self.evicting_map.as_ref(), ""); } } + +#[async_trait] +impl HealthStatusIndicator for FilesystemStore { + fn get_name(&self) -> &'static str { + "FilesystemStore" + } + + async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus { + Store::check_health(Pin::new(self), namespace).await + } +} diff --git a/nativelink-store/src/grpc_store.rs b/nativelink-store/src/grpc_store.rs index de199b25a..ac5c5466d 100644 --- a/nativelink-store/src/grpc_store.rs +++ b/nativelink-store/src/grpc_store.rs @@ -37,11 +37,12 @@ use nativelink_proto::google::bytestream::{ use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; use nativelink_util::grpc_utils::ConnectionManager; +use nativelink_util::health_utils::HealthStatusIndicator; use 
nativelink_util::resource_info::ResourceInfo; use nativelink_util::retry::{Retrier, RetryResult}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; -use nativelink_util::tls_utils; use nativelink_util::write_request_stream_wrapper::WriteRequestStreamWrapper; +use nativelink_util::{default_health_status_indicator, tls_utils}; use parking_lot::Mutex; use prost::Message; use rand::rngs::OsRng; @@ -844,3 +845,5 @@ impl Store for GrpcStore { Box::new(self) } } + +default_health_status_indicator!(GrpcStore); diff --git a/nativelink-store/src/memory_store.rs b/nativelink-store/src/memory_store.rs index ebf129987..a4ca1990b 100644 --- a/nativelink-store/src/memory_store.rs +++ b/nativelink-store/src/memory_store.rs @@ -23,6 +23,7 @@ use nativelink_error::{Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -170,3 +171,5 @@ impl MetricsComponent for MemoryStore { c.publish("evicting_map", &self.evicting_map, ""); } } + +default_health_status_indicator!(MemoryStore); diff --git a/nativelink-store/src/noop_store.rs b/nativelink-store/src/noop_store.rs index a3a3c6b51..006c2cf37 100644 --- a/nativelink-store/src/noop_store.rs +++ b/nativelink-store/src/noop_store.rs @@ -19,6 +19,7 @@ use async_trait::async_trait; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; #[derive(Default)] @@ -71,3 +72,5 @@ 
impl Store for NoopStore { Box::new(self) } } + +default_health_status_indicator!(NoopStore); diff --git a/nativelink-store/src/ref_store.rs b/nativelink-store/src/ref_store.rs index ff2f11d31..fd09f04f2 100644 --- a/nativelink-store/src/ref_store.rs +++ b/nativelink-store/src/ref_store.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use tracing::error; @@ -137,3 +138,5 @@ impl Store for RefStore { Box::new(self) } } + +default_health_status_indicator!(RefStore); diff --git a/nativelink-store/src/s3_store.rs b/nativelink-store/src/s3_store.rs index 2179846a4..91d4a60ae 100644 --- a/nativelink-store/src/s3_store.rs +++ b/nativelink-store/src/s3_store.rs @@ -40,6 +40,7 @@ use hyper_rustls::{HttpsConnector, MaybeHttpsStream}; use nativelink_error::{error_if, make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::retry::{Retrier, RetryResult}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use rand::rngs::OsRng; @@ -534,3 +535,5 @@ impl Store for S3Store { Box::new(self) } } + +default_health_status_indicator!(S3Store); diff --git a/nativelink-store/src/shard_store.rs b/nativelink-store/src/shard_store.rs index 5a858e862..432abb844 100644 --- a/nativelink-store/src/shard_store.rs +++ b/nativelink-store/src/shard_store.rs @@ -21,6 +21,7 @@ use futures::stream::{FuturesUnordered, TryStreamExt}; use nativelink_error::{error_if, Error, ResultExt}; use 
nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -191,3 +192,5 @@ impl Store for ShardStore { } } } + +default_health_status_indicator!(ShardStore); diff --git a/nativelink-store/src/size_partitioning_store.rs b/nativelink-store/src/size_partitioning_store.rs index 3d92148c6..df02d2306 100644 --- a/nativelink-store/src/size_partitioning_store.rs +++ b/nativelink-store/src/size_partitioning_store.rs @@ -19,6 +19,7 @@ use async_trait::async_trait; use nativelink_error::{Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use tokio::join; @@ -128,3 +129,5 @@ impl Store for SizePartitioningStore { Box::new(self) } } + +default_health_status_indicator!(SizePartitioningStore); diff --git a/nativelink-store/src/verify_store.rs b/nativelink-store/src/verify_store.rs index 2cabf95a7..195720ffe 100644 --- a/nativelink-store/src/verify_store.rs +++ b/nativelink-store/src/verify_store.rs @@ -22,6 +22,7 @@ use nativelink_error::{make_input_err, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc}; +use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::metrics_utils::{Collector, CollectorState, CounterWithTime, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -199,3 +200,5 @@ impl MetricsComponent for 
VerifyStore { ); } } + +default_health_status_indicator!(VerifyStore); diff --git a/nativelink-store/tests/fast_slow_store_test.rs b/nativelink-store/tests/fast_slow_store_test.rs index d21dced01..6b52450dc 100644 --- a/nativelink-store/tests/fast_slow_store_test.rs +++ b/nativelink-store/tests/fast_slow_store_test.rs @@ -76,6 +76,7 @@ mod fast_slow_store_tests { use bytes::Bytes; use nativelink_error::{make_err, Code, ResultExt}; use nativelink_util::buf_channel::make_buf_channel_pair; + use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use pretty_assertions::assert_eq; use super::*; // Must be declared in every module. @@ -301,6 +302,8 @@ mod fast_slow_store_tests { } } + default_health_status_indicator!(DropCheckStore); + let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap(); let (fast_store_read_tx, fast_store_read_rx) = tokio::sync::oneshot::channel(); let (fast_store_eof_tx, fast_store_eof_rx) = tokio::sync::oneshot::channel(); diff --git a/nativelink-util/BUILD.bazel b/nativelink-util/BUILD.bazel index 77d5302d0..7aa0b736e 100644 --- a/nativelink-util/BUILD.bazel +++ b/nativelink-util/BUILD.bazel @@ -17,6 +17,7 @@ rust_library( "src/fastcdc.rs", "src/fs.rs", "src/grpc_utils.rs", + "src/health_utils.rs", "src/lib.rs", "src/metrics_utils.rs", "src/platform_properties.rs", @@ -40,6 +41,7 @@ rust_library( "@crates//:bytes", "@crates//:futures", "@crates//:hex", + "@crates//:lazy_static", "@crates//:log", "@crates//:lru", "@crates//:parking_lot", @@ -47,6 +49,7 @@ rust_library( "@crates//:prometheus-client", "@crates//:prost", "@crates//:prost-types", + "@crates//:rand", "@crates//:serde", "@crates//:sha2", "@crates//:tokio", @@ -64,6 +67,7 @@ rust_test_suite( "tests/evicting_map_test.rs", "tests/fastcdc_test.rs", "tests/fs_test.rs", + "tests/health_utils_test.rs", "tests/resource_info_test.rs", "tests/retry_test.rs", ], diff --git a/nativelink-util/Cargo.toml b/nativelink-util/Cargo.toml index 3bcf20bd7..85c2606f2 
100644 --- a/nativelink-util/Cargo.toml +++ b/nativelink-util/Cargo.toml @@ -14,6 +14,7 @@ blake3 = "1.5.0" bytes = "1.5.0" futures = "0.3.29" hex = "0.4.3" +lazy_static = "1.4.0" log = "0.4.20" lru = "0.12.1" parking_lot = "0.12.1" @@ -21,6 +22,7 @@ pin-project-lite = "0.2.13" prometheus-client = "0.21.2" prost = "0.12.3" prost-types = "0.12.3" +rand = "0.8.5" serde = { version = "1.0.193", features = ["derive"] } sha2 = "0.10.8" tokio = { version = "1.35.1", features = [ "sync", "fs", "rt", "time", "io-util", "macros" ] } diff --git a/nativelink-util/src/health_utils.rs b/nativelink-util/src/health_utils.rs new file mode 100644 index 000000000..2f684b613 --- /dev/null +++ b/nativelink-util/src/health_utils.rs @@ -0,0 +1,197 @@ +// Copyright 2024 The Native Link Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Cow; +use std::collections::HashMap; +use std::fmt::Debug; +use std::marker::Send; +use std::pin::Pin; +use std::sync::Arc; + +use async_trait::async_trait; +use futures::{Stream, StreamExt}; +use parking_lot::Mutex; +use serde::Serialize; + +/// Name of the struct that implements the health indicator component. +type StructName = str; +/// Human-readable status message of the health indicator. 
+type Message = str; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] +pub enum HealthStatus { + Ok { + struct_name: &'static StructName, + message: Cow<'static, Message>, + }, + Initializing { + struct_name: &'static StructName, + message: Cow<'static, Message>, + }, + /// This status is used to indicate a non-fatal issue with the component. + Warning { + struct_name: &'static StructName, + message: Cow<'static, Message>, + }, + Failed { + struct_name: &'static StructName, + message: Cow<'static, Message>, + }, +} + +impl HealthStatus { + pub fn new_ok(component: &(impl HealthStatusIndicator + ?Sized), message: Cow<'static, str>) -> Self { + Self::Ok { + struct_name: component.struct_name(), + message, + } + } + + pub fn new_initializing( + component: &(impl HealthStatusIndicator + ?Sized), + message: Cow<'static, str>, + ) -> HealthStatus { + Self::Initializing { + struct_name: component.struct_name(), + message, + } + } + + pub fn new_warning(component: &(impl HealthStatusIndicator + ?Sized), message: Cow<'static, str>) -> HealthStatus { + Self::Warning { + struct_name: component.struct_name(), + message, + } + } + + pub fn new_failed(component: &(impl HealthStatusIndicator + ?Sized), message: Cow<'static, str>) -> HealthStatus { + Self::Failed { + struct_name: component.struct_name(), + message, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize)] +pub struct HealthStatusDescription { + pub namespace: Cow<'static, str>, + pub status: HealthStatus, +} + +/// Health status indicator trait. This trait is used to define +/// a health status indicator by implementing the `check_health` function. +/// `check_health` has no default body here; stores that need no custom logic +/// can use `default_health_status_indicator!`, which calls `Store::check_health`. +#[async_trait] +pub trait HealthStatusIndicator: Sync + Send + Unpin { + fn get_name(&self) -> &'static str; + + /// Returns the name of the struct implementing the trait. 
+ fn struct_name(&self) -> &'static str { + std::any::type_name::() + } + + /// Check the health status of the component. This function should be + /// implemented by the component to check the health status of the component. + async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus; +} + +type HealthRegistryBuilderState = Arc, Arc>>>; +pub struct HealthRegistryBuilder { + namespace: Cow<'static, str>, + state: HealthRegistryBuilderState, +} + +/// Health registry builder that is used to build a health registry. +/// The builder provides creation, registering of health status indicators, +/// sub building scoped health registries and building the health registry. +/// `build()` should be called once for finalizing the production of a health registry. +impl HealthRegistryBuilder { + pub fn new(namespace: Cow<'static, str>) -> Self { + Self { + namespace: format!("/{}", namespace).into(), + state: Arc::new(Mutex::new(HashMap::new())), + } + } + + /// Register a health status indicator at current namespace. + pub fn register_indicator(&mut self, indicator: Arc) { + let name = format!("{}/{}", self.namespace, indicator.get_name()); + self.state.lock().insert(name.into(), indicator); + } + + /// Create a sub builder for a namespace. + pub fn sub_builder(&mut self, namespace: Cow<'static, str>) -> HealthRegistryBuilder { + HealthRegistryBuilder { + namespace: format!("{}/{}", self.namespace, namespace).into(), + state: self.state.clone(), + } + } + + /// Finalize the production of the health registry. 
+ pub fn build(&mut self) -> HealthRegistry { + HealthRegistry { + indicators: self.state.lock().clone().into_iter().collect(), + } + } +} + +#[derive(Default, Clone)] +pub struct HealthRegistry { + indicators: Vec<(Cow<'static, str>, Arc)>, +} + +pub trait HealthStatusReporter { + fn health_status_report(&self) -> Pin + '_>>; +} + +/// Health status reporter implementation for the health registry that provides a stream +/// of health status descriptions. +impl HealthStatusReporter for HealthRegistry { + fn health_status_report(&self) -> Pin + '_>> { + Box::pin( + futures::stream::iter(self.indicators.iter()).then(|(namespace, indicator)| async move { + HealthStatusDescription { + namespace: namespace.clone(), + status: indicator.check_health(namespace.clone()).await, + } + }), + ) + } +} + +/// Default health status indicator implementation for a component. +/// Generally used for components that don't need custom implementations +/// of the `check_health` function. +#[macro_export] +macro_rules! default_health_status_indicator { + ($type:ty) => { + #[async_trait::async_trait] + impl HealthStatusIndicator for $type { + fn get_name(&self) -> &'static str { + stringify!($type) + } + + async fn check_health( + &self, + namespace: std::borrow::Cow<'static, str>, + ) -> nativelink_util::health_utils::HealthStatus { + Store::check_health(Pin::new(self), namespace).await + } + } + }; +} + +// Re-scoped for the health_utils module. 
+pub use crate::default_health_status_indicator; diff --git a/nativelink-util/src/lib.rs b/nativelink-util/src/lib.rs index 62c615c0e..c493b9f70 100644 --- a/nativelink-util/src/lib.rs +++ b/nativelink-util/src/lib.rs @@ -20,6 +20,7 @@ pub mod evicting_map; pub mod fastcdc; pub mod fs; pub mod grpc_utils; +pub mod health_utils; pub mod metrics_utils; pub mod platform_properties; pub mod resource_info; diff --git a/nativelink-util/src/store_trait.rs b/nativelink-util/src/store_trait.rs index 2c6a4b721..fcc61f07e 100644 --- a/nativelink-util/src/store_trait.rs +++ b/nativelink-util/src/store_trait.rs @@ -12,20 +12,43 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::borrow::Cow; use std::marker::Send; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use async_trait::async_trait; use bytes::Bytes; use futures::{join, try_join}; -use nativelink_error::{Error, ResultExt}; +use lazy_static::lazy_static; +use nativelink_error::{make_err, Code, Error, ResultExt}; +use parking_lot::Mutex; +use rand::rngs::StdRng; +use rand::{RngCore, SeedableRng}; use serde::{Deserialize, Serialize}; use crate::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use crate::common::DigestInfo; +use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; use crate::metrics_utils::Registry; +static DEFAULT_DIGEST_SIZE_HEALTH_CHECK: OnceLock = OnceLock::new(); +const DEFAULT_DATA_SEED_HEALTH_CHECK: u64 = 0xdeadbeef; + +pub fn default_digest_size_health_check() -> usize { + *DEFAULT_DIGEST_SIZE_HEALTH_CHECK.get_or_init(|| 1024 * 1024) +} + +pub fn set_default_digest_size_health_check(size: usize) -> Result<(), Error> { + DEFAULT_DIGEST_SIZE_HEALTH_CHECK + .set(size) + .map_err(|_| make_err!(Code::Internal, "set_default_digest_size_health_check already set")) +} + +lazy_static! 
{ + static ref HEALTH_CHECK_RNG: Mutex = Mutex::new(StdRng::seed_from_u64(DEFAULT_DATA_SEED_HEALTH_CHECK)); +} + #[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize)] pub enum UploadSizeInfo { /// When the data transfer amount is known to be exact size, this enum should be used. @@ -40,7 +63,7 @@ pub enum UploadSizeInfo { } #[async_trait] -pub trait Store: Sync + Send + Unpin { +pub trait Store: Sync + Send + Unpin + HealthStatusIndicator { /// Look up a digest in the store and return None if it does not exist in /// the store, or Some(size) if it does. /// Note: On an AC store the size will be incorrect and should not be used! @@ -168,6 +191,80 @@ pub trait Store: Sync + Send + Unpin { .merge(data_res.err_tip(|| "Failed to read stream to completion in get_part_unchunked")) } + // Default implementation of the health check. Some stores may want to override this + // in situations where the default implementation is not sufficient. + async fn check_health(self: Pin<&Self>, namespace: Cow<'static, str>) -> HealthStatus { + let digest_data_size = default_digest_size_health_check(); + let namespace_bytes = namespace.as_bytes(); + let store_name_bytes = self.get_name().as_bytes(); + + let mut digest_data = vec![0u8; digest_data_size]; + + // Fill data with all random bytes. 
+ HEALTH_CHECK_RNG.lock().fill_bytes(&mut digest_data); + + // Copy namespace bytes into data + let namespace_copy_length = usize::min(digest_data.len(), namespace_bytes.len()); + digest_data[..namespace_copy_length].copy_from_slice(&namespace_bytes[..namespace_copy_length]); + + // Calculate remaining space after copying namespace bytes + let remaining_space = digest_data_size.saturating_sub(namespace_copy_length); + + // Determine how many store_name bytes can be copied + let store_name_copy_length = usize::min(remaining_space, store_name_bytes.len()); + + // Copy store_name bytes into data, starting where namespace bytes end + if store_name_copy_length > 0 { + digest_data[namespace_copy_length..namespace_copy_length + store_name_copy_length] + .copy_from_slice(&store_name_bytes[..store_name_copy_length]); + } + + let digest_data_hash = blake3::hash(&digest_data).into(); + let digest_data_len = digest_data.len(); + let digest_info = DigestInfo::new(digest_data_hash, digest_data_len as i64); + let digest_data_copy = bytes::Bytes::copy_from_slice(&digest_data); + + if let Err(e) = self.update_oneshot(digest_info, digest_data_copy.clone()).await { + return HealthStatus::new_failed(self.get_ref(), format!("Store.update_oneshot() failed: {}", e).into()); + } + + match self.has(digest_info).await { + Ok(Some(s)) => { + if s != digest_data_len { + return HealthStatus::new_failed( + self.get_ref(), + format!("Store.has() size mismatch {s} != {digest_data_len}").into(), + ); + } + } + Ok(None) => { + return HealthStatus::new_failed(self.get_ref(), "Store.has() size not found".into()); + } + Err(e) => { + return HealthStatus::new_failed(self.get_ref(), format!("Store.has() failed: {}", e).into()); + } + } + + match self + .get_part_unchunked(digest_info, 0, Some(digest_data_len), Some(digest_data_len)) + .await + { + Ok(b) => { + if b != digest_data_copy { + return HealthStatus::new_failed(self.get_ref(), "Store.get_part_unchunked() data mismatch".into()); + } + } + Err(e) 
=> { + return HealthStatus::new_failed( + self.get_ref(), + format!("Store.get_part_unchunked() failed: {}", e).into(), + ); + } + } + + HealthStatus::new_ok(self.get_ref(), "Successfully store health check".into()) + } + /// Gets the underlying store for the given digest. This can be used to find out /// what any underlying store is for a given digest will be and hand it to the caller. /// A caller might want to use this to obtain a reference to the "real" underlying store @@ -180,4 +277,7 @@ pub trait Store: Sync + Send + Unpin { /// Register any metrics that this store wants to expose to the Prometheus. fn register_metrics(self: Arc, _registry: &mut Registry) {} + + // Register health checks used to monitor the store. + fn register_health(self: Arc, _registry: &mut HealthRegistryBuilder) {} } diff --git a/nativelink-util/tests/health_utils_test.rs b/nativelink-util/tests/health_utils_test.rs new file mode 100644 index 000000000..0ea59fed2 --- /dev/null +++ b/nativelink-util/tests/health_utils_test.rs @@ -0,0 +1,223 @@ +// Copyright 2024 The Native Link Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::borrow::Cow; +use std::collections::HashSet; +use std::iter::FromIterator; +use std::sync::Arc; + +use futures::StreamExt; +use nativelink_error::Error; +use nativelink_util::health_utils::{ + HealthRegistryBuilder, HealthStatus, HealthStatusDescription, HealthStatusIndicator, HealthStatusReporter, +}; + +#[cfg(test)] +mod health_utils_tests { + use pretty_assertions::assert_eq; + + use super::*; + + #[tokio::test] + async fn create_empty_indicator() -> Result<(), Error> { + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + assert_eq!(health_status.len(), 0); + Ok(()) + } + + #[tokio::test] + async fn create_register_indicator() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl, Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + health_registry_builder.register_indicator(Arc::new(MockComponentImpl {})); + + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 1); + assert_eq!( + health_status, + vec![HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl", + message: "ok".into() + }, + }] + ); + + Ok(()) + } + + #[tokio::test] + async fn create_sub_registry() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl, Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + health_registry_builder.register_indicator(Arc::new(MockComponentImpl {})); + + let mut namespace1_registry = health_registry_builder.sub_builder("namespace1".into()); + + namespace1_registry.register_indicator(Arc::new(MockComponentImpl {})); + + let health_registry = 
health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 2); + let expected_health_status = vec_to_set(vec![ + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/namespace1/MockComponentImpl".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl", + message: "ok".into(), + }, + }, + ]); + + assert_eq!(vec_to_set(health_status), expected_health_status); + + Ok(()) + } + + #[tokio::test] + async fn create_multiple_indicators_same_registry() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl1, Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl2, Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl3, Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + health_registry_builder.register_indicator(Arc::new(MockComponentImpl1 {})); + health_registry_builder.register_indicator(Arc::new(MockComponentImpl2 {})); + health_registry_builder.register_indicator(Arc::new(MockComponentImpl3 {})); + + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 3); + let expected_health_status = vec_to_set(vec![ + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl1".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl1", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl2".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl2", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl3".into(), + status: HealthStatus::Ok { + 
struct_name: "MockComponentImpl3", + message: "ok".into(), + }, + }, + ]); + + assert_eq!(vec_to_set(health_status), expected_health_status); + + Ok(()) + } + + #[tokio::test] + async fn create_multiple_indicators_with_sub_registry() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl1, Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl2, Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl3, Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + let mut sub_builder = health_registry_builder.sub_builder("namespace1".into()); + sub_builder.register_indicator(Arc::new(MockComponentImpl1 {})); + let mut sub_builder = health_registry_builder.sub_builder("namespace2".into()); + sub_builder.register_indicator(Arc::new(MockComponentImpl2 {})); + + health_registry_builder + .sub_builder("namespace3".into()) + .register_indicator(Arc::new(MockComponentImpl3 {})); + + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 3); + let expected_health_status = vec_to_set(vec![ + HealthStatusDescription { + namespace: "/nativelink/namespace1/MockComponentImpl1".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl1", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/namespace2/MockComponentImpl2".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl2", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/namespace3/MockComponentImpl3".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl3", + message: "ok".into(), + }, + }, + ]); + + assert_eq!(vec_to_set(health_status), expected_health_status); + + Ok(()) + } + + #[macro_export] + macro_rules! 
generate_health_status_indicator { + ($struct_name:ident, $health_status:ident, $status_msg:expr) => { + struct $struct_name; + + #[async_trait::async_trait] + impl HealthStatusIndicator for $struct_name { + fn get_name(&self) -> &'static str { + stringify!($struct_name).into() + } + async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus { + HealthStatus::$health_status { + struct_name: stringify!($struct_name).into(), + message: $status_msg.into(), + } + } + } + }; + } + + fn vec_to_set(vec: Vec) -> HashSet { + HashSet::from_iter(vec) + } +} diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index b8a222a7a..54cb3f7bd 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -21,7 +21,7 @@ use async_lock::Mutex as AsyncMutex; use axum::Router; use clap::Parser; use futures::future::{select_all, BoxFuture, OptionFuture, TryFutureExt}; -use futures::FutureExt; +use futures::{FutureExt, StreamExt}; use hyper::server::conn::Http; use hyper::{Response, StatusCode}; use nativelink_config::cas_server::{ @@ -41,9 +41,13 @@ use nativelink_store::default_store_factory::store_factory; use nativelink_store::store_manager::StoreManager; use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit}; use nativelink_util::digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc}; +use nativelink_util::health_utils::{ + HealthRegistryBuilder, HealthStatus, HealthStatusDescription, HealthStatusReporter, +}; use nativelink_util::metrics_utils::{ set_metrics_enabled_for_this_thread, Collector, CollectorState, Counter, MetricsComponent, Registry, }; +use nativelink_util::store_trait::set_default_digest_size_health_check; use nativelink_worker::local_worker::new_local_worker; use parking_lot::Mutex; use rustls_pemfile::{certs as extract_certs, crls as extract_crls}; @@ -71,6 +75,9 @@ const DEFAULT_ADMIN_API_PATH: &str = "/admin"; /// Name of environment variable to disable metrics. 
const METRICS_DISABLE_ENV: &str = "NATIVELINK_DISABLE_METRICS"; +/// Content type header value for JSON. +const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8"; + /// Backend for bazel remote execution / cache API. #[derive(Parser, Debug)] #[clap( @@ -87,17 +94,27 @@ struct Args { async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), Box> { let mut root_metrics_registry = ::with_prefix("nativelink"); + let health_registry_builder = Arc::new(AsyncMutex::new(HealthRegistryBuilder::new("nativelink".into()))); let store_manager = Arc::new(StoreManager::new()); { + let mut health_registry_lock = health_registry_builder.lock().await; let root_store_metrics = root_metrics_registry.sub_registry_with_prefix("stores"); + for (name, store_cfg) in cfg.stores { + let health_component_name = format!("stores/{name}"); + let mut health_register_store = health_registry_lock.sub_builder(health_component_name.into()); let store_metrics = root_store_metrics.sub_registry_with_prefix(&name); store_manager.add_store( &name, - store_factory(&store_cfg, &store_manager, Some(store_metrics)) - .await - .err_tip(|| format!("Failed to create store '{name}'"))?, + store_factory( + &store_cfg, + &store_manager, + Some(store_metrics), + Some(&mut health_register_store), + ) + .await + .err_tip(|| format!("Failed to create store '{name}'"))?, ); } } @@ -349,12 +366,59 @@ async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), B ); let root_metrics_registry = root_metrics_registry.clone(); + let health_registry_status = health_registry_builder.lock().await.build(); let mut svc = Router::new() // This is the default service that executes if no other endpoint matches. .fallback_service(tonic_services.into_service().map_err(|e| panic!("{e}"))) - // This is a generic endpoint used to check if the server is up. 
- .route_service("/status", axum::routing::get(move || async move { "Ok".to_string() })); + .route_service( + "/status", + axum::routing::get(move || async move { + fn error_to_response(e: E) -> Response { + let mut response = Response::new(format!("Error: {e:?}")); + *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + response + } + + spawn_blocking(move || { + futures::executor::block_on(async { + let health_status_descriptions: Vec = + health_registry_status.health_status_report().collect().await; + + match serde_json5::to_string(&health_status_descriptions) { + Ok(body) => { + let contains_failed_report = health_status_descriptions + .iter() + .any(|description| matches!(description.status, HealthStatus::Failed { .. })); + let status_code = if contains_failed_report { + StatusCode::SERVICE_UNAVAILABLE + } else { + StatusCode::OK + }; + Response::builder() + .status(status_code) + .header( + hyper::header::CONTENT_TYPE, + hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE), + ) + .body(body) + .unwrap() + } + Err(e) => Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .header( + hyper::header::CONTENT_TYPE, + hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE), + ) + .body(format!("Internal Failure: {e:?}")) + .unwrap(), + } + }) + }) + .await + .unwrap_or_else(error_to_response) + }), + ); if let Some(prometheus_cfg) = services.experimental_prometheus { fn error_to_response(e: E) -> Response { @@ -728,6 +792,7 @@ fn main() -> Result<(), Box> { // Note: If the default changes make sure you update the documentation in // `config/cas_server.rs`. 
const DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS: u64 = 1000; + const DEFAULT_DIGEST_SIZE_HEALTH_CHECK: usize = 1024 * 1024; let global_cfg = if let Some(global_cfg) = &mut cfg.global { if global_cfg.max_open_files == 0 { global_cfg.max_open_files = DEFAULT_MAX_OPEN_FILES; @@ -735,6 +800,10 @@ fn main() -> Result<(), Box> { if global_cfg.idle_file_descriptor_timeout_millis == 0 { global_cfg.idle_file_descriptor_timeout_millis = DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS; } + if global_cfg.default_digest_size_health_check == 0 { + global_cfg.default_digest_size_health_check = DEFAULT_DIGEST_SIZE_HEALTH_CHECK; + } + *global_cfg } else { GlobalConfig { @@ -747,6 +816,7 @@ fn main() -> Result<(), Box> { service.experimental_prometheus.is_none() }), default_digest_hash_function: None, + default_digest_size_health_check: DEFAULT_DIGEST_SIZE_HEALTH_CHECK, } }; set_open_file_limit(global_cfg.max_open_files); @@ -756,6 +826,7 @@ fn main() -> Result<(), Box> { .default_digest_hash_function .unwrap_or(ConfigDigestHashFunction::sha256), ))?; + set_default_digest_size_health_check(global_cfg.default_digest_size_health_check)?; // TODO (#513): prevent deadlocks by assigning max blocking threads number of open files * ten (!global_cfg.disable_metrics, global_cfg.max_open_files * 10) };