Skip to content

Commit

Permalink
Add Completeness Checking Store
Browse files Browse the repository at this point in the history
  • Loading branch information
blakehatch committed Dec 1, 2023
1 parent 3ec203b commit d668561
Show file tree
Hide file tree
Showing 8 changed files with 1,165 additions and 22 deletions.
26 changes: 6 additions & 20 deletions Cargo.Bazel.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"checksum": "b6ac671d2578b85d845eb82ff6476b66191872c127cd21141c325f53caf48152",
"checksum": "1cddc72c7bc33d5a78a2a93dae73e91a7ac809a5a43492db15db43c5fb80bee9",
"crates": {
"addr2line 0.21.0": {
"name": "addr2line",
Expand Down Expand Up @@ -7359,25 +7359,11 @@
"**"
],
"crate_features": {
"common": [],
"selects": {
"aarch64-unknown-linux-gnu": [
"default",
"std"
],
"arm-unknown-linux-gnueabi": [
"default",
"std"
],
"armv7-unknown-linux-gnueabi": [
"default",
"std"
],
"x86_64-unknown-linux-gnu": [
"default",
"std"
]
}
"common": [
"default",
"std"
],
"selects": {}
},
"deps": {
"common": [
Expand Down
37 changes: 37 additions & 0 deletions native-link-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ pub enum StoreConfig {
/// hash and size and the AC validate nothing.
verify(Box<VerifyStore>),

/// Completeness checking store verifies that every item referenced by
/// an action result exists in the CAS, and otherwise returns not found.
///
/// The action cache API requires the outputs to exist if you return
/// an action result. This store is only valid for Action Cache stores
/// and will verify that all outputs of a previously run result still exist
/// in the CAS.
completeness_checking(Box<CompletenessCheckingStore>),

/// A compression store that will compress the data inbound and
/// outbound. There will be a non-trivial cost to compress and
/// decompress the data, but in many cases if the final store is
Expand Down Expand Up @@ -331,6 +340,34 @@ pub struct VerifyStore {
pub verify_hash: bool,
}

/// Wrapper around a `StoreConfig` that is meant to hold only the
/// `StoreConfig::ref_store` variant.
///
/// `RefStoreConfig::new` performs that check. The field is public and the
/// struct derives `Deserialize`, so values built any other way are not
/// checked by `new`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RefStoreConfig {
/// The wrapped store configuration.
pub config: StoreConfig,
}

impl RefStoreConfig {
    /// Wraps `config` when it is the `StoreConfig::ref_store` variant.
    ///
    /// Returns `None` for every other variant; the rejected configuration
    /// is dropped.
    pub fn new(config: StoreConfig) -> Option<Self> {
        if matches!(config, StoreConfig::ref_store(_)) {
            Some(Self { config })
        } else {
            None
        }
    }
}

/// Configuration for a completeness checking store: the store being
/// wrapped plus the CAS in which item existence is validated.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CompletenessCheckingStore {
/// The underlying store to wrap around. All content will first flow
/// through self before forwarding to backend. In the event there
/// is an error detected in self, the connection to the backend
/// will be terminated, and early termination should always cause
/// updates to fail on the backend.
///
/// NOTE(review): presumably this is the action cache store being
/// guarded; confirm against the store factory that consumes this config.
pub backend: StoreConfig,

/// The CAS in which each item's existence will be validated.
/// This store should always be a RefStore.
pub cas_store: RefStoreConfig,
}

#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone, Copy)]
pub struct Lz4Config {
/// Size of the blocks to compress.
Expand Down
5 changes: 5 additions & 0 deletions native-link-store/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ rust_library(
name = "native-link-store",
srcs = [
"src/ac_utils.rs",
"src/completeness_checking_store.rs",
"src/compression_store.rs",
"src/dedup_store.rs",
"src/default_store_factory.rs",
Expand Down Expand Up @@ -46,6 +47,7 @@ rust_library(
"@crate_index//:bytes",
"@crate_index//:filetime",
"@crate_index//:futures",
"@crate_index//:hashbrown",
"@crate_index//:hex",
"@crate_index//:hyper",
"@crate_index//:hyper-rustls",
Expand All @@ -68,6 +70,7 @@ rust_test_suite(
name = "integration",
srcs = [
"tests/ac_utils_test.rs",
"tests/completeness_checking_store_test.rs",
"tests/compression_store_test.rs",
"tests/dedup_store_test.rs",
"tests/existence_store_test.rs",
Expand All @@ -88,6 +91,7 @@ rust_test_suite(
"//error",
"//native-link-config",
"//native-link-util",
"//proto",
"@crate_index//:async-lock",
"@crate_index//:aws-sdk-s3",
"@crate_index//:aws-smithy-runtime",
Expand All @@ -101,6 +105,7 @@ rust_test_suite(
"@crate_index//:memory-stats",
"@crate_index//:once_cell",
"@crate_index//:pretty_assertions",
"@crate_index//:prost",
"@crate_index//:rand",
"@crate_index//:tokio",
"@crate_index//:tokio-stream",
Expand Down
15 changes: 13 additions & 2 deletions native-link-store/src/ac_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::pin::Pin;
use bytes::{Bytes, BytesMut};
use error::{Code, Error, ResultExt};
use futures::future::join;
use futures::{Future, FutureExt};
use futures::{Future, FutureExt, TryFutureExt};
use native_link_util::buf_channel::{make_buf_channel_pair, DropCloserWriteHalf};
use native_link_util::common::{fs, DigestInfo};
use native_link_util::digest_hasher::DigestHasher;
Expand All @@ -48,6 +48,14 @@ pub async fn get_and_decode_digest<T: Message + Default>(
store: Pin<&dyn Store>,
digest: &DigestInfo,
) -> Result<T, Error> {
get_size_and_decode_digest(store, digest).map_ok(|(v, _)| v).await
}

/// Attempts to fetch the digest contents from a store into the associated proto.
pub async fn get_size_and_decode_digest<T: Message + Default>(
store: Pin<&dyn Store>,
digest: &DigestInfo,
) -> Result<(T, usize), Error> {
let mut store_data_resp = store
.get_part_unchunked(*digest, 0, Some(MAX_ACTION_MSG_SIZE), Some(ESTIMATED_DIGEST_SIZE))
.await;
Expand All @@ -60,8 +68,11 @@ pub async fn get_and_decode_digest<T: Message + Default>(
}
}
let store_data = store_data_resp?;
let store_data_len = store_data.len();

T::decode(store_data).err_tip_with_code(|e| (Code::NotFound, format!("Stored value appears to be corrupt: {}", e)))
T::decode(store_data)
.err_tip_with_code(|e| (Code::NotFound, format!("Stored value appears to be corrupt: {}", e)))
.map(|v| (v, store_data_len))
}

/// Computes the digest of a message.
Expand Down
222 changes: 222 additions & 0 deletions native-link-store/src/completeness_checking_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
// Copyright 2023 The Native Link Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use async_trait::async_trait;
use error::{make_input_err, Error, ResultExt};
use futures::future::BoxFuture;
use futures::stream::{FuturesUnordered, StreamExt};
use futures::{FutureExt, TryFutureExt};
use hashbrown::HashSet;
use native_link_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use native_link_util::common::DigestInfo;
use native_link_util::store_trait::{Store, UploadSizeInfo};
use proto::build::bazel::remote::execution::v2::{ActionResult as ProtoActionResult, Tree as ProtoTree};

use crate::ac_utils::{get_and_decode_digest, get_size_and_decode_digest};

/// Store wrapper that pairs an action cache store with a CAS so that
/// `has_with_results` can check that the blobs referenced by an action
/// result exist.
pub struct CompletenessCheckingStore {
// Store whose contents `has_with_results` checks for the digests
// referenced by an action result.
cas_store: Arc<dyn Store>,
// Store that `update` and `get_part_ref` are forwarded to.
ac_store: Arc<dyn Store>,
}

impl CompletenessCheckingStore {
    /// Builds the wrapper from the action cache store (first argument) and
    /// the CAS (second argument).
    pub fn new(ac_store: Arc<dyn Store>, cas_store: Arc<dyn Store>) -> Self {
        Self { ac_store, cas_store }
    }

    /// Pinned borrow of the CAS.
    fn pin_cas(&self) -> Pin<&dyn Store> {
        let cas: &dyn Store = &*self.cas_store;
        Pin::new(cas)
    }

    /// Pinned borrow of the action cache store.
    fn pin_ac(&self) -> Pin<&dyn Store> {
        let ac: &dyn Store = &*self.ac_store;
        Pin::new(ac)
    }
}

#[async_trait]
impl Store for CompletenessCheckingStore {
async fn has_with_results(
self: Pin<&Self>,
action_result_digests: &[DigestInfo],
results: &mut [Option<usize>],
) -> Result<(), Error> {
enum FutureResult<'a> {
AddFuturesAndDigests((Vec<BoxFuture<'a, FutureResult<'a>>>, Vec<(DigestInfo, usize)>)),
Err(usize),
}

let mut futures = FuturesUnordered::new();

for (i, digest) in action_result_digests.iter().enumerate() {
futures.push(
get_size_and_decode_digest::<ProtoActionResult>(self.pin_cas(), digest)
.and_then(move |(action_result, _size)| async move {
// We need to add 2 because stderr and stdout digests.
const NUM_ADDTIONAL_DIGESTS: usize = 2;
let mut digest_infos =
Vec::with_capacity(action_result.output_files.len() + NUM_ADDTIONAL_DIGESTS);
let maybe_digests = action_result
.stderr_digest
.map(DigestInfo::try_from)
.into_iter()
.chain(action_result.stdout_digest.map(DigestInfo::try_from).into_iter())
.chain(
action_result
.output_files
.into_iter()
.filter_map(|file| file.digest.map(DigestInfo::try_from)),
);
for maybe_digest in maybe_digests {
match maybe_digest {
Ok(digest) => digest_infos.push((digest, i)),
Err(_) => return Err(make_input_err!("")),
}
}

let v = action_result
.output_directories
.into_iter()
.map(move |output_directory| {
{
async move {
let Ok(tree_digest) = output_directory
.tree_digest
.err_tip(|| "Could not decode tree digest completeness_checking_store::has_with_results")
.and_then(DigestInfo::try_from)
else {
return FutureResult::Err(i);
};
get_and_decode_digest::<ProtoTree>(self.pin_cas(), &tree_digest)
.map_ok(|tree| {
let digest_count =
tree.children.iter().chain(&tree.root).fold(0, |acc, directory| {
acc + directory.files.len() + directory.directories.len()
});
let mut digest_infos = Vec::with_capacity(digest_count);
for directory in tree.children.into_iter().chain(tree.root) {
let maybe_digests = directory
.files
.into_iter()
.filter_map(|file| file.digest.map(DigestInfo::try_from))
.chain(directory.directories.into_iter().filter_map(
|directory| directory.digest.map(DigestInfo::try_from),
))
.collect::<Vec<_>>();
for maybe_digest in maybe_digests {
match maybe_digest {
Ok(digest) => digest_infos.push((digest, i)),
Err(_) => return FutureResult::Err(i),
}
}
}
FutureResult::AddFuturesAndDigests((vec![], digest_infos))
})
.map(move |result| match result {
Ok(v) => v,
Err(_) => FutureResult::Err(i),
})
.await
}
}
.boxed()
})
.collect::<Vec<_>>();

Ok(FutureResult::AddFuturesAndDigests((v, digest_infos)))
})
.map(move |v| v.unwrap_or_else(|_| FutureResult::Err(i)))
.boxed(),
);
}

let mut digest_deque = VecDeque::new();
let has_request_outstanding = Arc::new(AtomicBool::new(false));
let mut none_found: HashSet<usize> = HashSet::new();
const INDEX_ZERO: usize = 0;

while let Some(future_result) = futures.next().await {
match future_result {
FutureResult::Err(i) => {
results[i] = None;
}
FutureResult::AddFuturesAndDigests((futures_to_add, digest_infos)) => {
futures.extend(futures_to_add.into_iter());
digest_deque.extend(digest_infos.into_iter());
if !digest_deque.is_empty() && !has_request_outstanding.load(Ordering::Acquire) {
let has_request_outstanding = has_request_outstanding.clone();
has_request_outstanding.store(true, Ordering::Release);

let (digests, indexes): (Vec<_>, Vec<_>) = digest_deque.drain(..).unzip();

// Optimization: every batch of digests above comes in with the
// same indices so by cross-referencing the first index in indexes for a previous missing digest found
// this becomes far more efficient.
if !none_found.contains(&indexes[INDEX_ZERO]) {
let res_list = self.pin_cas().has_many(&digests).await?;

for (i, result) in res_list.iter().enumerate() {
if result.is_none() {
results[indexes[i]] = None;
none_found.insert(indexes[i]);
break;
} else {
results[indexes[i]] = *result;
}
}

futures.push(
async move {
has_request_outstanding.store(false, Ordering::Release);
FutureResult::AddFuturesAndDigests((vec![], vec![]))
}
.boxed(),
);
}
}
}
}
}

Ok(())
}

async fn update(
self: Pin<&Self>,
digest: DigestInfo,
reader: DropCloserReadHalf,
size_info: UploadSizeInfo,
) -> Result<(), Error> {
self.pin_ac().update(digest, reader, size_info).await
}

async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
writer: &mut DropCloserWriteHalf,
offset: usize,
length: Option<usize>,
) -> Result<(), Error> {
let pin_ac = self.pin_ac();
pin_ac.get_part_ref(digest, writer, offset, length).await
}

fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
}
}
Loading

0 comments on commit d668561

Please sign in to comment.