Skip to content

Commit

Permalink
Add Completeness Checking Store
Browse files Browse the repository at this point in the history
This verifies all file and folder digests from action results are in the CAS.
  • Loading branch information
blakehatch committed Dec 8, 2023
1 parent 5acfa25 commit d30d832
Show file tree
Hide file tree
Showing 9 changed files with 620 additions and 4 deletions.
16 changes: 16 additions & 0 deletions native-link-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ pub enum StoreConfig {
/// hash and size and the AC validate nothing.
verify(Box<VerifyStore>),

/// Completeness checking store verifies if the
/// output files & folders exist in the CAS before forwarding
/// the request to the underlying store.
/// Note: This store should only be used on AC stores.
completeness_checking(Box<CompletenessCheckingStore>),

/// A compression store that will compress the data inbound and
/// outbound. There will be a non-trivial cost to compress and
/// decompress the data, but in many cases if the final store is
Expand Down Expand Up @@ -331,6 +337,16 @@ pub struct VerifyStore {
pub verify_hash: bool,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CompletenessCheckingStore {
/// The underlying store that will have it's results validated before sending to client.
pub backend: StoreConfig,

/// When a request is made, the results are decoded and all output digests/files are verified
/// to exist in this CAS store before returning success.
pub cas_store: StoreConfig,
}

#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone, Copy)]
pub struct Lz4Config {
/// Size of the blocks to compress.
Expand Down
3 changes: 3 additions & 0 deletions native-link-store/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ rust_library(
name = "native-link-store",
srcs = [
"src/ac_utils.rs",
"src/completeness_checking_store.rs",
"src/compression_store.rs",
"src/dedup_store.rs",
"src/default_store_factory.rs",
Expand Down Expand Up @@ -69,6 +70,7 @@ rust_test_suite(
name = "integration",
srcs = [
"tests/ac_utils_test.rs",
"tests/completeness_checking_store_test.rs",
"tests/compression_store_test.rs",
"tests/dedup_store_test.rs",
"tests/existence_store_test.rs",
Expand All @@ -89,6 +91,7 @@ rust_test_suite(
"//error",
"//native-link-config",
"//native-link-util",
"//proto",
"@crate_index//:async-lock",
"@crate_index//:aws-sdk-s3",
"@crate_index//:aws-smithy-runtime",
Expand Down
2 changes: 1 addition & 1 deletion native-link-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ futures = "0.3.28"
hex = "0.4.3"
hyper = { version = "0.14.27" }
hyper-rustls = { version = "0.24.2", features = ["webpki-tokio"] }
tracing = "0.1.40"
lz4_flex = "0.11.1"
parking_lot = "0.12.1"
prost = "0.11.9"
Expand All @@ -34,7 +35,6 @@ tokio = { version = "1.29.1" }
tokio-stream = { version = "0.1.14", features = ["fs"] }
tokio-util = { version = "0.7.8" }
tonic = { version = "0.9.2", features = ["gzip"] }
tracing = "0.1.40"
uuid = { version = "1.4.0", features = ["v4"] }

[dev-dependencies]
Expand Down
15 changes: 13 additions & 2 deletions native-link-store/src/ac_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::pin::Pin;
use bytes::{Bytes, BytesMut};
use error::{Code, Error, ResultExt};
use futures::future::join;
use futures::{Future, FutureExt};
use futures::{Future, FutureExt, TryFutureExt};
use native_link_util::buf_channel::{make_buf_channel_pair, DropCloserWriteHalf};
use native_link_util::common::{fs, DigestInfo};
use native_link_util::digest_hasher::DigestHasher;
Expand All @@ -48,6 +48,14 @@ pub async fn get_and_decode_digest<T: Message + Default>(
store: Pin<&dyn Store>,
digest: &DigestInfo,
) -> Result<T, Error> {
get_size_and_decode_digest(store, digest).map_ok(|(v, _)| v).await
}

/// Attempts to fetch the digest contents from a store into the associated proto.
pub async fn get_size_and_decode_digest<T: Message + Default>(
store: Pin<&dyn Store>,
digest: &DigestInfo,
) -> Result<(T, usize), Error> {
let mut store_data_resp = store
.get_part_unchunked(*digest, 0, Some(MAX_ACTION_MSG_SIZE), Some(ESTIMATED_DIGEST_SIZE))
.await;
Expand All @@ -60,8 +68,11 @@ pub async fn get_and_decode_digest<T: Message + Default>(
}
}
let store_data = store_data_resp?;
let store_data_len = store_data.len();

T::decode(store_data).err_tip_with_code(|e| (Code::NotFound, format!("Stored value appears to be corrupt: {}", e)))
T::decode(store_data)
.err_tip_with_code(|e| (Code::NotFound, format!("Stored value appears to be corrupt: {}", e)))
.map(|v| (v, store_data_len))
}

/// Computes the digest of a message.
Expand Down
Loading

0 comments on commit d30d832

Please sign in to comment.