Skip to content

Commit

Permalink
Introduce component based health
Browse files Browse the repository at this point in the history
Introduce a component-based health check system. Each type of component
should be able to opt into registering handlers that implement some mechanical
health checks. Health in this context means functionality that is expected to
work, but whose checks are nearly no-ops at runtime in terms of influencing the
underlying storage / rpc systems.

Opting in to the system requires a component to define a `HealthStatusIndicator`
and implement the `check_health()` function. Registration should happen automatically
through the existence of the implementation and a call to the `Store.register_health()`
function in the component implementation. At the moment only `Store`-based components
can register, and a single health check is defined for `FilesystemStore`.

The `/status` endpoint has been updated to return the string-serialized
instances of `HealthStatus`; smart serialization of such objects is not implemented
at this time.
  • Loading branch information
adam-singer committed Jan 27, 2024
1 parent ae92fb3 commit 01188ae
Show file tree
Hide file tree
Showing 16 changed files with 872 additions and 361 deletions.
731 changes: 389 additions & 342 deletions Cargo.Bazel.lock

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions nativelink-service/tests/ac_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
&nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()),
&store_manager,
Some(&mut <Registry>::default()),
None,
)
.await?,
);
Expand All @@ -64,6 +65,7 @@ async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
&nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()),
&store_manager,
Some(&mut <Registry>::default()),
None,
)
.await?,
);
Expand Down
1 change: 1 addition & 0 deletions nativelink-service/tests/bytestream_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
&nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()),
&store_manager,
Some(&mut <Registry>::default()),
None,
)
.await?,
);
Expand Down
1 change: 1 addition & 0 deletions nativelink-service/tests/cas_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
&nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()),
&store_manager,
Some(&mut <Registry>::default()),
None,
)
.await?,
);
Expand Down
1 change: 1 addition & 0 deletions nativelink-store/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ rust_library(
"@crate_index//:serde",
"@crate_index//:sha2",
"@crate_index//:shellexpand",
"@crate_index//:tempfile",
"@crate_index//:tokio",
"@crate_index//:tokio-stream",
"@crate_index//:tokio-util",
Expand Down
1 change: 1 addition & 0 deletions nativelink-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ rand = "0.8.5"
serde = "1.0.193"
sha2 = "0.10.8"
shellexpand = "3.1.0"
tempfile = "3.9.0"
tokio = { version = "1.35.1" }
tokio-stream = { version = "0.1.14", features = ["fs"] }
tokio-util = { version = "0.7.10" }
Expand Down
31 changes: 19 additions & 12 deletions nativelink-store/src/default_store_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use futures::stream::FuturesOrdered;
use futures::{Future, TryStreamExt};
use nativelink_config::stores::StoreConfig;
use nativelink_error::Error;
use nativelink_util::health_utils::HealthRegistry;
use nativelink_util::metrics_utils::Registry;
use nativelink_util::store_trait::Store;

Expand All @@ -44,51 +45,52 @@ pub fn store_factory<'a>(
backend: &'a StoreConfig,
store_manager: &'a Arc<StoreManager>,
maybe_store_metrics: Option<&'a mut Registry>,
maybe_health_registry: Option<&'a mut HealthRegistry>,
) -> Pin<FutureMaybeStore<'a>> {
Box::pin(async move {
let store: Arc<dyn Store> = match backend {
StoreConfig::memory(config) => Arc::new(MemoryStore::new(config)),
StoreConfig::experimental_s3_store(config) => Arc::new(S3Store::new(config).await?),
StoreConfig::verify(config) => Arc::new(VerifyStore::new(
config,
store_factory(&config.backend, store_manager, None).await?,
store_factory(&config.backend, store_manager, None, None).await?,
)),
StoreConfig::compression(config) => Arc::new(CompressionStore::new(
*config.clone(),
store_factory(&config.backend, store_manager, None).await?,
store_factory(&config.backend, store_manager, None, None).await?,
)?),
StoreConfig::dedup(config) => Arc::new(DedupStore::new(
config,
store_factory(&config.index_store, store_manager, None).await?,
store_factory(&config.content_store, store_manager, None).await?,
store_factory(&config.index_store, store_manager, None, None).await?,
store_factory(&config.content_store, store_manager, None, None).await?,
)),
StoreConfig::existence_cache(config) => Arc::new(ExistenceCacheStore::new(
config,
store_factory(&config.backend, store_manager, None).await?,
store_factory(&config.backend, store_manager, None, None).await?,
)),
StoreConfig::completeness_checking(config) => Arc::new(CompletenessCheckingStore::new(
store_factory(&config.backend, store_manager, None).await?,
store_factory(&config.cas_store, store_manager, None).await?,
store_factory(&config.backend, store_manager, None, None).await?,
store_factory(&config.cas_store, store_manager, None, None).await?,
)),
StoreConfig::fast_slow(config) => Arc::new(FastSlowStore::new(
config,
store_factory(&config.fast, store_manager, None).await?,
store_factory(&config.slow, store_manager, None).await?,
store_factory(&config.fast, store_manager, None, None).await?,
store_factory(&config.slow, store_manager, None, None).await?,
)),
StoreConfig::filesystem(config) => Arc::new(<FilesystemStore>::new(config).await?),
StoreConfig::ref_store(config) => Arc::new(RefStore::new(config, Arc::downgrade(store_manager))),
StoreConfig::size_partitioning(config) => Arc::new(SizePartitioningStore::new(
config,
store_factory(&config.lower_store, store_manager, None).await?,
store_factory(&config.upper_store, store_manager, None).await?,
store_factory(&config.lower_store, store_manager, None, None).await?,
store_factory(&config.upper_store, store_manager, None, None).await?,
)),
StoreConfig::grpc(config) => Arc::new(GrpcStore::new(config).await?),
StoreConfig::noop => Arc::new(NoopStore::new()),
StoreConfig::shard(config) => {
let stores = config
.stores
.iter()
.map(|store_config| store_factory(&store_config.store, store_manager, None))
.map(|store_config| store_factory(&store_config.store, store_manager, None, None))
.collect::<FuturesOrdered<_>>()
.try_collect::<Vec<_>>()
.await?;
Expand All @@ -98,6 +100,11 @@ pub fn store_factory<'a>(
if let Some(store_metrics) = maybe_store_metrics {
store.clone().register_metrics(store_metrics);
}

if let Some(health_registry) = maybe_health_registry {
store.clone().register_health(health_registry);
}

Ok(store)
})
}
69 changes: 68 additions & 1 deletion nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::ffi::OsString;
use std::fmt::{Debug, Formatter};
use std::io::Write;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
Expand All @@ -29,13 +30,16 @@ use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::common::{fs, DigestInfo};
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
use nativelink_util::health_utils::{HealthRegistry, HealthStatus, HealthStatusIndicator};
use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry};
use nativelink_util::store_trait::{Store, UploadSizeInfo};
use rand::RngCore;
use tempfile::NamedTempFile;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
use tokio::task::spawn_blocking;
use tokio::time::{sleep, timeout, Sleep};
use tokio_stream::wrappers::ReadDirStream;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};

use crate::cas_utils::is_zero_digest;

Expand Down Expand Up @@ -748,6 +752,69 @@ impl<Fe: FileEntry> Store for FilesystemStore<Fe> {
/// Registers a metrics collector for this store with `registry`.
///
/// The collector is built from a reference to this store and handed to the
/// registry as a boxed value.
fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
registry.register_collector(Box::new(Collector::new(&self)));
}

/// Registers this store with `registry` as a health indicator.
///
/// The `Arc` itself is moved into the registry.
fn register_health(self: Arc<Self>, registry: &mut HealthRegistry) {
registry.register_indicator(self);
}
}

#[async_trait]
impl<Fe: FileEntry> HealthStatusIndicator for FilesystemStore<Fe> {
    /// Probes the store's temp directory by creating, writing, flushing,
    /// persisting and finally removing a scratch file.
    ///
    /// Every step that fails is logged at `error` level and reported through
    /// `make_failed`; the function itself always returns `Ok(..)`, carrying
    /// either the failed status or the status produced by `make_ok`.
    async fn check_health(self: Arc<Self>) -> Result<HealthStatus, Error> {
        // Number of random bytes written to the scratch file.
        const PROBE_SIZE_BYTES: usize = 1_000_000;

        // Logs the failure and converts it into a failed health status.
        let failed_fn = |failure_action, error| {
            let message = format!(
                "Failed to {:?} temp file in filesystem store health check: {:?}",
                failure_action, error
            );
            error!(message);
            self.make_failed(message.into())
        };

        let temp_path = &self.shared_context.temp_path;
        let mut file = match NamedTempFile::with_prefix_in(".fs_hc_", temp_path) {
            Ok(file) => file,
            Err(err) => return Ok(failed_fn("create", err)),
        };

        // The payload lives on the heap: a 1 MB array local would be kept
        // alive across the `.await` below and therefore be embedded in the
        // future produced by this async fn.
        let mut data = vec![0u8; PROBE_SIZE_BYTES];
        rand::thread_rng().fill_bytes(&mut data);

        // NOTE(review): `write_all` / `flush` are blocking `std::io::Write`
        // calls executed on the async task — consider `spawn_blocking` if
        // this ever shows up as executor stalls.
        if let Err(err) = file.write_all(&data) {
            return Ok(failed_fn("write", err));
        }
        debug!(
            "Successfully wrote to temp file in filesystem store health check {:?}",
            file
        );

        if let Err(err) = file.flush() {
            return Ok(failed_fn("flush", err));
        }
        debug!("Successfully flushed temp file in filesystem store health check");

        // `keep()` disables the delete-on-drop behaviour so that removal can
        // be exercised explicitly through the store's own `fs` layer.
        let file_path: std::path::PathBuf = match file.keep() {
            Ok((_, path)) => {
                debug!("Successfully kept temp file in filesystem store health check {:?}", path);
                path
            }
            Err(err) => return Ok(failed_fn("keep", err.into())),
        };

        debug!("Removing temp file in filesystem store health check {:?}", file_path);
        if let Err(err) = fs::remove_file(file_path.as_path()).await {
            return Ok(failed_fn("remove", err.to_std_err()));
        }
        debug!(
            "Successfully removed temp file in filesystem store health check {:?}",
            file_path.as_path()
        );

        Ok(self.make_ok("Successfully filesystem store health check".into()))
    }
}

impl<Fe: FileEntry> MetricsComponent for FilesystemStore<Fe> {
Expand Down
2 changes: 2 additions & 0 deletions nativelink-util/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ rust_library(
"src/evicting_map.rs",
"src/fastcdc.rs",
"src/fs.rs",
"src/health_utils.rs",
"src/lib.rs",
"src/metrics_utils.rs",
"src/platform_properties.rs",
Expand All @@ -27,6 +28,7 @@ rust_library(
"src/write_request_stream_wrapper.rs",
],
proc_macro_deps = [
"@crate_index//:async-recursion",
"@crate_index//:async-trait",
],
visibility = ["//visibility:public"],
Expand Down
1 change: 1 addition & 0 deletions nativelink-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ nativelink-error = { path = "../nativelink-error" }
nativelink-proto = { path = "../nativelink-proto" }

async-lock = "3.2.0"
async-recursion = "1.0.5"
async-trait = "0.1.74"
blake3 = "1.5.0"
bytes = "1.5.0"
Expand Down
Loading

0 comments on commit 01188ae

Please sign in to comment.