Skip to content

Commit

Permalink
Introduce component based health
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-singer committed Jan 27, 2024
1 parent ae92fb3 commit 1e4cf06
Show file tree
Hide file tree
Showing 16 changed files with 877 additions and 365 deletions.
731 changes: 389 additions & 342 deletions Cargo.Bazel.lock

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions nativelink-service/tests/ac_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
&nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()),
&store_manager,
Some(&mut <Registry>::default()),
None,
)
.await?,
);
Expand All @@ -64,6 +65,7 @@ async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
&nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()),
&store_manager,
Some(&mut <Registry>::default()),
None,
)
.await?,
);
Expand Down
1 change: 1 addition & 0 deletions nativelink-service/tests/bytestream_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
&nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()),
&store_manager,
Some(&mut <Registry>::default()),
None,
)
.await?,
);
Expand Down
1 change: 1 addition & 0 deletions nativelink-service/tests/cas_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
&nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()),
&store_manager,
Some(&mut <Registry>::default()),
None,
)
.await?,
);
Expand Down
1 change: 1 addition & 0 deletions nativelink-store/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ rust_library(
"@crate_index//:serde",
"@crate_index//:sha2",
"@crate_index//:shellexpand",
"@crate_index//:tempfile",
"@crate_index//:tokio",
"@crate_index//:tokio-stream",
"@crate_index//:tokio-util",
Expand Down
1 change: 1 addition & 0 deletions nativelink-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ rand = "0.8.5"
serde = "1.0.193"
sha2 = "0.10.8"
shellexpand = "3.1.0"
tempfile = "3.9.0"
tokio = { version = "1.35.1" }
tokio-stream = { version = "0.1.14", features = ["fs"] }
tokio-util = { version = "0.7.10" }
Expand Down
31 changes: 19 additions & 12 deletions nativelink-store/src/default_store_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use futures::stream::FuturesOrdered;
use futures::{Future, TryStreamExt};
use nativelink_config::stores::StoreConfig;
use nativelink_error::Error;
use nativelink_util::health_utils::HealthRegistry;
use nativelink_util::metrics_utils::Registry;
use nativelink_util::store_trait::Store;

Expand All @@ -44,51 +45,52 @@ pub fn store_factory<'a>(
backend: &'a StoreConfig,
store_manager: &'a Arc<StoreManager>,
maybe_store_metrics: Option<&'a mut Registry>,
maybe_health_registry: Option<&'a mut HealthRegistry>,
) -> Pin<FutureMaybeStore<'a>> {
Box::pin(async move {
let store: Arc<dyn Store> = match backend {
StoreConfig::memory(config) => Arc::new(MemoryStore::new(config)),
StoreConfig::experimental_s3_store(config) => Arc::new(S3Store::new(config).await?),
StoreConfig::verify(config) => Arc::new(VerifyStore::new(
config,
store_factory(&config.backend, store_manager, None).await?,
store_factory(&config.backend, store_manager, None, None).await?,
)),
StoreConfig::compression(config) => Arc::new(CompressionStore::new(
*config.clone(),
store_factory(&config.backend, store_manager, None).await?,
store_factory(&config.backend, store_manager, None, None).await?,
)?),
StoreConfig::dedup(config) => Arc::new(DedupStore::new(
config,
store_factory(&config.index_store, store_manager, None).await?,
store_factory(&config.content_store, store_manager, None).await?,
store_factory(&config.index_store, store_manager, None, None).await?,
store_factory(&config.content_store, store_manager, None, None).await?,
)),
StoreConfig::existence_cache(config) => Arc::new(ExistenceCacheStore::new(
config,
store_factory(&config.backend, store_manager, None).await?,
store_factory(&config.backend, store_manager, None, None).await?,
)),
StoreConfig::completeness_checking(config) => Arc::new(CompletenessCheckingStore::new(
store_factory(&config.backend, store_manager, None).await?,
store_factory(&config.cas_store, store_manager, None).await?,
store_factory(&config.backend, store_manager, None, None).await?,
store_factory(&config.cas_store, store_manager, None, None).await?,
)),
StoreConfig::fast_slow(config) => Arc::new(FastSlowStore::new(
config,
store_factory(&config.fast, store_manager, None).await?,
store_factory(&config.slow, store_manager, None).await?,
store_factory(&config.fast, store_manager, None, None).await?,
store_factory(&config.slow, store_manager, None, None).await?,
)),
StoreConfig::filesystem(config) => Arc::new(<FilesystemStore>::new(config).await?),
StoreConfig::ref_store(config) => Arc::new(RefStore::new(config, Arc::downgrade(store_manager))),
StoreConfig::size_partitioning(config) => Arc::new(SizePartitioningStore::new(
config,
store_factory(&config.lower_store, store_manager, None).await?,
store_factory(&config.upper_store, store_manager, None).await?,
store_factory(&config.lower_store, store_manager, None, None).await?,
store_factory(&config.upper_store, store_manager, None, None).await?,
)),
StoreConfig::grpc(config) => Arc::new(GrpcStore::new(config).await?),
StoreConfig::noop => Arc::new(NoopStore::new()),
StoreConfig::shard(config) => {
let stores = config
.stores
.iter()
.map(|store_config| store_factory(&store_config.store, store_manager, None))
.map(|store_config| store_factory(&store_config.store, store_manager, None, None))
.collect::<FuturesOrdered<_>>()
.try_collect::<Vec<_>>()
.await?;
Expand All @@ -98,6 +100,11 @@ pub fn store_factory<'a>(
if let Some(store_metrics) = maybe_store_metrics {
store.clone().register_metrics(store_metrics);
}

if let Some(health_registry) = maybe_health_registry {
store.clone().register_health(health_registry);
}

Ok(store)
})
}
69 changes: 68 additions & 1 deletion nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::ffi::OsString;
use std::fmt::{Debug, Formatter};
use std::io::Write;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
Expand All @@ -29,13 +30,16 @@ use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::common::{fs, DigestInfo};
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
use nativelink_util::health_utils::{HealthRegistry, HealthStatus, HealthStatusIndicator};
use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry};
use nativelink_util::store_trait::{Store, UploadSizeInfo};
use rand::RngCore;
use tempfile::NamedTempFile;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
use tokio::task::spawn_blocking;
use tokio::time::{sleep, timeout, Sleep};
use tokio_stream::wrappers::ReadDirStream;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};

use crate::cas_utils::is_zero_digest;

Expand Down Expand Up @@ -748,6 +752,69 @@ impl<Fe: FileEntry> Store for FilesystemStore<Fe> {
fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
registry.register_collector(Box::new(Collector::new(&self)));
}

fn register_health(self: Arc<Self>, registry: &mut HealthRegistry) {
registry.register_indicator(self);
}
}

#[async_trait]
impl<Fe: FileEntry> HealthStatusIndicator for FilesystemStore<Fe> {
async fn check_health(self: Arc<Self>) -> Result<HealthStatus, Error> {
let temp_path = &self.shared_context.temp_path;
let temp_file_with_prefix = NamedTempFile::with_prefix_in(".fs_hc_", temp_path);

let failed_fn = |failure_action, error| {
let message = format!(
"Failed to {:?} temp file in filesystem store health check: {:?}",
failure_action, error
);
error!(message);
self.make_failed(message.into())
};

match temp_file_with_prefix {
Ok(mut file) => {
let mut data = [0u8; 1_000_000];

rand::thread_rng().fill_bytes(&mut data);

match file.write_all(&data) {
Ok(()) => debug!(
"Successfully wrote to temp file in filesystem store health check {:?}",
file
),
Err(err) => return Ok(failed_fn("write", err)),
}

match file.flush() {
Ok(_) => debug!("Successfully flushed temp file in filesystem store health check"),
Err(err) => return Ok(failed_fn("flush", err)),
}

let file_path: std::path::PathBuf = match file.keep() {
Ok((_, p)) => {
debug!("Successfully kept temp file in filesystem store health check {:?}", p);
p.to_path_buf()
}
Err(err) => return Ok(failed_fn("keep", err.into())),
};

debug!("Removing temp file in filesystem store health check {:?}", file_path);
match fs::remove_file(file_path.as_path()).await {
Ok(_) => debug!(
"Successfully removed temp file in filesystem store health check {:?}",
file_path.as_path()
),
Err(err) => return Ok(failed_fn("remove", err.to_std_err())),
}

Ok(self.make_ok("Successfully filesystem store health check".into()))
}

Err(err) => return Ok(failed_fn("create", err)),
}
}
}

impl<Fe: FileEntry> MetricsComponent for FilesystemStore<Fe> {
Expand Down
2 changes: 2 additions & 0 deletions nativelink-util/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ rust_library(
"src/evicting_map.rs",
"src/fastcdc.rs",
"src/fs.rs",
"src/health_utils.rs",
"src/lib.rs",
"src/metrics_utils.rs",
"src/platform_properties.rs",
Expand All @@ -27,6 +28,7 @@ rust_library(
"src/write_request_stream_wrapper.rs",
],
proc_macro_deps = [
"@crate_index//:async-recursion",
"@crate_index//:async-trait",
],
visibility = ["//visibility:public"],
Expand Down
1 change: 1 addition & 0 deletions nativelink-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ nativelink-error = { path = "../nativelink-error" }
nativelink-proto = { path = "../nativelink-proto" }

async-lock = "3.2.0"
async-recursion = "1.0.5"
async-trait = "0.1.74"
blake3 = "1.5.0"
bytes = "1.5.0"
Expand Down
Loading

0 comments on commit 1e4cf06

Please sign in to comment.