From addf446f052ff74edcdb083f2b2968b313daa940 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Sat, 11 May 2024 15:40:41 +0200
Subject: [PATCH] fix: more robustness in the face of a trampling-herd of
 threads loading a single index.

The motivating example is here: https://github.com/praetorian-inc/noseyparker/issues/179

Previously, it was possible for a trampling herd of threads to consolidate the
disk state. Most of them would be 'needs-init' threads which could notice that
the initialization already happened, and just use that.

But a thread might be late for the party and somehow manage to not get any
newly loaded index, and thus try to consolidate with what's on disk again.
Then it would again determine no change, and return nothing, causing the
caller to abort and not find objects it should find because it wouldn't see
the index that it should have seen.

The reason the thread got into this mess is that the 'is-load-ongoing' flagging
was racy itself, so it would not wait for ongoing loads and just conclude
nothing happened. An extra delay (by yielding) now assures it either sees the
loading state and waits for it, or sees the newly loaded indices.

Note that this issue can be reproduced with:

```
'./target/release/gix -r repo-with-one-pack -t10 --trace odb stats --extra-header-lookup'
```
---
 gitoxide-core/src/repository/odb.rs           |  76 ++++++++++--
 gix-odb/src/store_impls/dynamic/load_index.rs | 111 ++++++++++--------
 gix-odb/src/store_impls/dynamic/types.rs      |   4 +-
 src/plumbing/main.rs                          |   8 +-
 src/plumbing/options/mod.rs                   |   6 +-
 5 files changed, 142 insertions(+), 63 deletions(-)

diff --git a/gitoxide-core/src/repository/odb.rs b/gitoxide-core/src/repository/odb.rs
index 85c35618db2..e8cb1f7a0b1 100644
--- a/gitoxide-core/src/repository/odb.rs
+++ b/gitoxide-core/src/repository/odb.rs
@@ -1,4 +1,5 @@
 use std::io;
+use std::sync::atomic::Ordering;
 
 use anyhow::bail;
 
@@ -50,6 +51,8 @@ pub mod statistics {
     pub struct Options {
         pub format: OutputFormat,
         pub thread_limit: Option<usize>,
+        /// A debug-flag that triggers looking up the headers of all objects again, but without indices preloaded
+        pub extra_header_lookup: bool,
     }
 }
 
@@ -59,7 +62,11 @@ pub fn statistics(
     mut progress: impl gix::Progress,
     out: impl io::Write,
     mut err: impl io::Write,
-    statistics::Options { format, thread_limit }: statistics::Options,
+    statistics::Options {
+        format,
+        thread_limit,
+        extra_header_lookup,
+    }: statistics::Options,
 ) -> anyhow::Result<()> {
     use bytesize::ByteSize;
     use gix::odb::{find, HeaderExt};
@@ -76,6 +83,10 @@ pub fn statistics(
     #[cfg_attr(feature = "serde", derive(serde::Serialize))]
     #[derive(Default)]
     struct Statistics {
+        /// All objects that were used to produce these statistics.
+        /// Only `Some` if we are doing an extra round of header queries on a repository without loaded indices.
+        #[cfg_attr(feature = "serde", serde(skip_serializing))]
+        ids: Option<Vec<gix::ObjectId>>,
         total_objects: usize,
         loose_objects: usize,
         packed_objects: usize,
@@ -135,14 +146,17 @@ pub fn statistics(
     }
 
     impl gix::parallel::Reduce for Reduce {
-        type Input = Result<Vec<gix::odb::find::Header>, anyhow::Error>;
+        type Input = Result<Vec<(gix::ObjectId, gix::odb::find::Header)>, anyhow::Error>;
         type FeedProduce = ();
        type Output = Statistics;
        type Error = anyhow::Error;
 
        fn feed(&mut self, items: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
-            for item in items? {
+            for (id, item) in items? {
                 self.stats.consume(item);
+                if let Some(ids) = self.stats.ids.as_mut() {
+                    ids.push(id);
+                }
             }
             Ok(())
         }
@@ -154,9 +168,9 @@ pub fn statistics(
     }
 
     let cancelled = || anyhow::anyhow!("Cancelled by user");
-    let object_ids = repo.objects.store_ref().iter()?.filter_map(Result::ok);
+    let object_ids = repo.objects.iter()?.filter_map(Result::ok);
     let chunk_size = 1_000;
-    let stats = if gix::parallel::num_threads(thread_limit) > 1 {
+    let mut stats = if gix::parallel::num_threads(thread_limit) > 1 {
         gix::parallel::in_parallel(
             gix::interrupt::Iter::new(
                 gix::features::iter::Chunks {
@@ -166,19 +180,30 @@ pub fn statistics(
                 cancelled,
             ),
             thread_limit,
-            move |_| (repo.objects.clone().into_inner(), counter),
+            {
+                let objects = repo.objects.clone();
+                move |_| (objects.clone().into_inner(), counter)
+            },
             |ids, (handle, counter)| {
                 let ids = ids?;
-                counter.fetch_add(ids.len(), std::sync::atomic::Ordering::Relaxed);
+                counter.fetch_add(ids.len(), Ordering::Relaxed);
                 let out = ids
                     .into_iter()
-                    .map(|id| handle.header(id))
+                    .map(|id| handle.header(id).map(|hdr| (id, hdr)))
                     .collect::<Result<Vec<_>, _>>()?;
                 Ok(out)
             },
-            Reduce::default(),
+            Reduce {
+                stats: Statistics {
+                    ids: extra_header_lookup.then(Vec::new),
+                    ..Default::default()
+                },
+            },
         )?
     } else {
+        if extra_header_lookup {
+            bail!("extra-header-lookup is only meaningful in threaded mode");
+        }
         let mut stats = Statistics::default();
 
         for (count, id) in object_ids.enumerate() {
@@ -193,6 +218,39 @@ pub fn statistics(
 
     progress.show_throughput(start);
 
+    if let Some(mut ids) = stats.ids.take() {
+        // Critical to re-open the repo to assure we don't have any ODB state and start fresh.
+        let start = std::time::Instant::now();
+        let repo = gix::open_opts(repo.git_dir(), repo.open_options().to_owned())?;
+        progress.set_name("re-counting".into());
+        progress.init(Some(ids.len()), gix::progress::count("objects"));
+        let counter = progress.counter();
+        counter.store(0, Ordering::Relaxed);
+        let errors = gix::parallel::in_parallel_with_slice(
+            &mut ids,
+            thread_limit,
+            {
+                let objects = repo.objects.clone();
+                move |_| (objects.clone().into_inner(), counter, false)
+            },
+            |id, (odb, counter, has_error), _threads_left, _stop_everything| -> anyhow::Result<()> {
+                counter.fetch_add(1, Ordering::Relaxed);
+                if let Err(_err) = odb.header(id) {
+                    *has_error = true;
+                    gix::trace::error!(err = ?_err, "Object that is known to be present wasn't found");
+                }
+                Ok(())
+            },
+            || Some(std::time::Duration::from_millis(100)),
+            |(_, _, has_error)| has_error,
+        )?;
+
+        progress.show_throughput(start);
+        if errors.contains(&true) {
+            bail!("At least one object couldn't be looked up even though it must exist");
+        }
+    }
+
     #[cfg(feature = "serde")]
     {
         serde_json::to_writer_pretty(out, &stats)?;
diff --git a/gix-odb/src/store_impls/dynamic/load_index.rs b/gix-odb/src/store_impls/dynamic/load_index.rs
index a255f6c4b58..18afbc86958 100644
--- a/gix-odb/src/store_impls/dynamic/load_index.rs
+++ b/gix-odb/src/store_impls/dynamic/load_index.rs
@@ -4,7 +4,7 @@ use std::{
     ops::Deref,
     path::{Path, PathBuf},
     sync::{
-        atomic::{AtomicU16, AtomicUsize, Ordering},
+        atomic::{AtomicU16, Ordering},
         Arc,
     },
     time::SystemTime,
@@ -86,7 +86,7 @@ impl super::Store {
             Ok(Some(self.collect_snapshot()))
         } else {
             // always compare to the latest state
-            // Nothing changed in the mean time, try to load another index…
+            // Nothing changed in the meantime, try to load another index…
             if self.load_next_index(index) {
                 Ok(Some(self.collect_snapshot()))
             } else {
@@ -119,7 +119,7 @@ impl super::Store {
             let slot = &self.files[index.slot_indices[slot_map_index]];
             let _lock = slot.write.lock();
             if slot.generation.load(Ordering::SeqCst) > index.generation {
-                // There is a disk consolidation in progress which just overwrote a slot that cold be disposed with some other
+                // There is a disk consolidation in progress which just overwrote a slot that could be disposed with some other
                 // index, one we didn't intend to load.
                 // Continue with the next slot index in the hope there is something else we can do…
                 continue 'retry_with_next_slot_index;
             }
@@ -128,14 +128,18 @@ impl super::Store {
             let bundle_mut = Arc::make_mut(&mut bundle);
             if let Some(files) = bundle_mut.as_mut() {
                 // these are always expected to be set, unless somebody raced us. We handle this later by retrying.
-                let _loaded_count = IncOnDrop(&index.loaded_indices);
-                match files.load_index(self.object_hash) {
+                let res = {
+                    let res = files.load_index(self.object_hash);
+                    slot.files.store(bundle);
+                    index.loaded_indices.fetch_add(1, Ordering::SeqCst);
+                    res
+                };
+                match res {
                     Ok(_) => {
-                        slot.files.store(bundle);
                         break 'retry_with_next_slot_index;
                     }
-                    Err(_) => {
-                        slot.files.store(bundle);
+                    Err(_err) => {
+                        gix_features::trace::error!(err=?_err, "Failed to load index file - some objects may seem to not exist");
                         continue 'retry_with_next_slot_index;
                     }
                 }
@@ -145,9 +149,14 @@ impl super::Store {
                 // There can be contention as many threads start working at the same time and take all the
                 // slots to load indices for. Some threads might just be left-over and have to wait for something
                 // to change.
-                let num_load_operations = index.num_indices_currently_being_loaded.deref();
                 // TODO: potentially hot loop - could this be a condition variable?
-                while num_load_operations.load(Ordering::Relaxed) != 0 {
+                // This is a timing-based fix for the case that `num_indices_currently_being_loaded` isn't yet incremented,
+                // and we might break out here without actually waiting for the loading operation. Then we'd fail to
+                // observe a change and the underlying handler would not have all the indices it needs at its disposal.
+                // Yielding means we will definitely lose enough time to observe the ongoing operation,
+                // or its effects.
+                std::thread::yield_now();
+                while index.num_indices_currently_being_loaded.load(Ordering::SeqCst) != 0 {
                     std::thread::yield_now()
                 }
                 break 'retry_with_next_slot_index;
@@ -197,7 +206,7 @@ impl super::Store {
         // We might not be able to detect by pointer if the state changed, as this itself is racy. So we keep track of double-initialization
         // using a flag, which means that if `needs_init` was true we saw the index uninitialized once, but now that we are here it's
-        // initialized meaning that somebody was faster and we couldn't detect it by comparisons to the index.
+        // initialized meaning that somebody was faster, and we couldn't detect it by comparisons to the index.
         // If so, make sure we collect the snapshot instead of returning None in case nothing actually changed, which is likely with a
         // race like this.
         if !was_uninitialized && needs_init {
@@ -397,18 +406,19 @@ impl super::Store {
                     // generation stays the same, as it's the same value still but scheduled for eventual removal.
                 }
             } else {
+                // set the generation before we actually change the value, otherwise readers of old generations could observe the new one.
+                // We'd rather have them turn around here and update their index, which, by that time, might actually already be available.
+                // If not, they would fail, unable to load a pack or index they need, but that's preferred over returning wrong objects.
+                // Safety: can't race as we hold the lock; the generation has to be set beforehand to help prevent others from observing the value.
+                slot.generation.store(generation, Ordering::SeqCst);
                 *files_mut = None;
             };
             slot.files.store(files);
-            if !needs_stable_indices {
-                // Not racy due to lock, generation must be set after unsetting the slot value AND storing it.
-                slot.generation.store(generation, Ordering::SeqCst);
-            }
         }
 
         let new_index = self.index.load();
         Ok(if index.state_id() == new_index.state_id() {
-            // there was no change, and nothing was loaded in the meantime, reflect that in the return value to not get into loops
+            // there was no change, and nothing was loaded in the meantime, reflect that in the return value to not get into loops.
             None
         } else {
            if load_new_index {
@@ -619,34 +629,44 @@ impl super::Store {
     }
 
     pub(crate) fn collect_snapshot(&self) -> Snapshot {
+        // We don't observe changes-on-disk in our 'wait-for-load' loop.
+        // That loop is meant to help assure the marker (which includes the number of loaded indices) matches
+        // the actual number of indices we collect.
         let index = self.index.load();
-        let indices = if index.is_initialized() {
-            index
-                .slot_indices
-                .iter()
-                .map(|idx| (*idx, &self.files[*idx]))
-                .filter_map(|(id, file)| {
-                    let lookup = match (**file.files.load()).as_ref()? {
-                        types::IndexAndPacks::Index(bundle) => handle::SingleOrMultiIndex::Single {
-                            index: bundle.index.loaded()?.clone(),
-                            data: bundle.data.loaded().cloned(),
-                        },
-                        types::IndexAndPacks::MultiIndex(multi) => handle::SingleOrMultiIndex::Multi {
-                            index: multi.multi_index.loaded()?.clone(),
-                            data: multi.data.iter().map(|f| f.loaded().cloned()).collect(),
-                        },
-                    };
-                    handle::IndexLookup { file: lookup, id }.into()
-                })
-                .collect()
-        } else {
-            Vec::new()
-        };
+        loop {
+            if index.num_indices_currently_being_loaded.deref().load(Ordering::SeqCst) != 0 {
+                std::thread::yield_now();
+                continue;
+            }
+            let marker = index.marker();
+            let indices = if index.is_initialized() {
+                index
+                    .slot_indices
+                    .iter()
+                    .map(|idx| (*idx, &self.files[*idx]))
+                    .filter_map(|(id, file)| {
+                        let lookup = match (**file.files.load()).as_ref()? {
+                            types::IndexAndPacks::Index(bundle) => handle::SingleOrMultiIndex::Single {
+                                index: bundle.index.loaded()?.clone(),
+                                data: bundle.data.loaded().cloned(),
+                            },
+                            types::IndexAndPacks::MultiIndex(multi) => handle::SingleOrMultiIndex::Multi {
+                                index: multi.multi_index.loaded()?.clone(),
+                                data: multi.data.iter().map(|f| f.loaded().cloned()).collect(),
+                            },
+                        };
+                        handle::IndexLookup { file: lookup, id }.into()
+                    })
+                    .collect()
+            } else {
+                Vec::new()
+            };
 
-        Snapshot {
-            indices,
-            loose_dbs: Arc::clone(&index.loose_dbs),
-            marker: index.marker(),
+            return Snapshot {
+                indices,
+                loose_dbs: Arc::clone(&index.loose_dbs),
+                marker,
+            };
         }
     }
 }
@@ -669,13 +689,6 @@ impl<'a> Drop for IncOnNewAndDecOnDrop<'a> {
     }
 }
 
-struct IncOnDrop<'a>(&'a AtomicUsize);
-impl<'a> Drop for IncOnDrop<'a> {
-    fn drop(&mut self) {
-        self.0.fetch_add(1, Ordering::SeqCst);
-    }
-}
-
 pub(crate) enum Either {
     IndexPath(PathBuf),
     MultiIndexFile(Arc<gix_pack::multi_index::File>),
diff --git a/gix-odb/src/store_impls/dynamic/types.rs b/gix-odb/src/store_impls/dynamic/types.rs
index 473c587bbea..35257ee7ee8 100644
--- a/gix-odb/src/store_impls/dynamic/types.rs
+++ b/gix-odb/src/store_impls/dynamic/types.rs
@@ -18,7 +18,7 @@ pub(crate) type AtomicGeneration = AtomicU32;
 
 /// A way to indicate which pack indices we have seen already and which of them are loaded, along with an idea
 /// of whether stored `PackId`s are still usable.
-#[derive(Default, Copy, Clone)]
+#[derive(Default, Copy, Clone, Debug)]
 pub struct SlotIndexMarker {
     /// The generation the `loaded_until_index` belongs to. Indices of different generations are completely incompatible.
     /// This value changes once the internal representation is compacted, something that may happen only if there is no handle
@@ -262,7 +262,7 @@ impl IndexAndPacks {
         }
     }
 
-    /// If we are garbage, put ourselves into the loaded state. Otherwise put ourselves back to unloaded.
+    /// If we are garbage, put ourselves into the loaded state. Otherwise, put ourselves back to unloaded.
     pub(crate) fn put_back(&mut self) {
         match self {
             IndexAndPacks::Index(bundle) => {
diff --git a/src/plumbing/main.rs b/src/plumbing/main.rs
index 0f5a146d0ed..49b2c50fe9e 100644
--- a/src/plumbing/main.rs
+++ b/src/plumbing/main.rs
@@ -1153,7 +1153,7 @@ pub fn main() -> Result<()> {
                 ),
             },
             Subcommands::Odb(cmd) => match cmd {
-                odb::Subcommands::Stats => prepare_and_run(
+                odb::Subcommands::Stats { extra_header_lookup } => prepare_and_run(
                     "odb-stats",
                     trace,
                     auto_verbose,
@@ -1166,7 +1166,11 @@ pub fn main() -> Result<()> {
                             progress,
                             out,
                             err,
-                            core::repository::odb::statistics::Options { format, thread_limit },
+                            core::repository::odb::statistics::Options {
+                                format,
+                                thread_limit,
+                                extra_header_lookup,
+                            },
                         )
                     },
                 ),
diff --git a/src/plumbing/options/mod.rs b/src/plumbing/options/mod.rs
index b5fa3649a68..10d0312bc7b 100644
--- a/src/plumbing/options/mod.rs
+++ b/src/plumbing/options/mod.rs
@@ -586,7 +586,11 @@ pub mod odb {
         Info,
         /// Count and obtain information on all, possibly duplicate, objects in the database.
         #[clap(visible_alias = "statistics")]
-        Stats,
+        Stats {
+            /// Lookup headers again, but without preloading indices.
+            #[clap(long)]
+            extra_header_lookup: bool,
+        },
     }
 }
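
Note (editorial, not part of the patch): the essence of the 'wait for in-flight loads' change in `load_next_index()` and `collect_snapshot()` above, reduced to a self-contained sketch. `loads_in_flight` and `loaded_indices` below are hypothetical stand-ins for the store's `num_indices_currently_being_loaded` and `loaded_indices` counters, and - as the added comment says - the initial yield is a timing-based mitigation rather than a hard guarantee.

```
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use std::thread;

fn main() {
    let loads_in_flight = Arc::new(AtomicUsize::new(0));
    let loaded_indices = Arc::new(AtomicUsize::new(0));

    // Loader: announce the load, do the work, publish the result, then clear the
    // in-flight marker. Publishing before clearing lets waiters observe either the
    // ongoing load or its effects.
    let loader = {
        let (in_flight, loaded) = (loads_in_flight.clone(), loaded_indices.clone());
        thread::spawn(move || {
            in_flight.fetch_add(1, Ordering::SeqCst);
            // ... read an index from disk here ...
            loaded.fetch_add(1, Ordering::SeqCst); // publish first
            in_flight.fetch_sub(1, Ordering::SeqCst); // then clear the marker
        })
    };

    // Late thread: yield once so a racing loader gets a chance to set the in-flight
    // marker, then wait until no load is ongoing before trusting what it sees.
    let late = {
        let (in_flight, loaded) = (loads_in_flight.clone(), loaded_indices.clone());
        thread::spawn(move || {
            thread::yield_now();
            while in_flight.load(Ordering::SeqCst) != 0 {
                thread::yield_now();
            }
            loaded.load(Ordering::SeqCst)
        })
    };

    loader.join().unwrap();
    println!("late thread observed {} loaded indices", late.join().unwrap());
}
```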
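
Note (editorial, not part of the patch): why the consolidation path now stores `slot.generation` before clearing the slot's contents. `Slot` below is a hypothetical reduction of the store's slot type; a reader that compares generations turns around and refreshes its view instead of acting on a slot that was emptied underneath it.

```
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Mutex;

struct Slot {
    generation: AtomicU32,
    files: Mutex<Option<String>>, // stand-in for the loaded index/pack bundle
}

impl Slot {
    /// Writer (holds the store-wide lock): publish the new generation first,
    /// then drop the contents.
    fn consolidate(&self, new_generation: u32) {
        self.generation.store(new_generation, Ordering::SeqCst);
        *self.files.lock().unwrap() = None;
    }

    /// Reader: a newer generation than the one our snapshot was taken for means
    /// "refresh", never "use possibly-unset contents".
    fn read(&self, expected_generation: u32) -> Option<String> {
        if self.generation.load(Ordering::SeqCst) > expected_generation {
            return None; // caller goes back and updates its index
        }
        self.files.lock().unwrap().clone()
    }
}

fn main() {
    let slot = Slot {
        generation: AtomicU32::new(1),
        files: Mutex::new(Some("pack-1234.idx".into())),
    };
    assert_eq!(slot.read(1).as_deref(), Some("pack-1234.idx"));
    slot.consolidate(2);
    assert_eq!(slot.read(1), None); // stale readers are told to refresh
}
```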