Skip to content

Commit

Permalink
fix: more robustness in the face of a trampling-herd of threads loadi…
Browse files Browse the repository at this point in the history
…ng a single index.

The motivating example is here: praetorian-inc/noseyparker#179

Previously, it was possible for a trampling herd of threads to consolidate the
disk state. Most of them would be 'needs-init' threads which could notice that
the initialization already happened, and just use that.

But a thread might be late for the party and somehow manage to not get any
newly loaded index, and thus try to consolidate with what's on disk again.
Then it would again determine no change, and return nothing, causing the caller
to abort and not find objects it should find because it wouldn't see the index
that it should have seen.

The reason the thread got into this mess is that the 'is-load-ongoing' flagging
was racy itself, so it would not wait for ongoing loads and just conclude nothing
happened. An extra delay (by yielding) now assures it either sees the loading state
and waits for it, or sees the newly loaded indices.

Note that this issue can be reproduced with:

```
'./target/release/gix -r repo-with-one-pack -t10 --trace odb stats --extra-header-lookup'
```
  • Loading branch information
Byron committed May 12, 2024
1 parent 7b3dc92 commit addf446
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 63 deletions.
76 changes: 67 additions & 9 deletions gitoxide-core/src/repository/odb.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::io;
use std::sync::atomic::Ordering;

use anyhow::bail;

Expand Down Expand Up @@ -50,6 +51,8 @@ pub mod statistics {
pub struct Options {
pub format: OutputFormat,
pub thread_limit: Option<usize>,
/// A debug-flag that triggers looking up the headers of all objects again, but without indices preloaded
pub extra_header_lookup: bool,
}
}

Expand All @@ -59,7 +62,11 @@ pub fn statistics(
mut progress: impl gix::Progress,
out: impl io::Write,
mut err: impl io::Write,
statistics::Options { format, thread_limit }: statistics::Options,
statistics::Options {
format,
thread_limit,
extra_header_lookup,
}: statistics::Options,
) -> anyhow::Result<()> {
use bytesize::ByteSize;
use gix::odb::{find, HeaderExt};
Expand All @@ -76,6 +83,10 @@ pub fn statistics(
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[derive(Default)]
struct Statistics {
/// All objects that were used to produce these statistics.
/// Only `Some` if we are doing an extra round of header queries on a repository without loaded indices.
#[cfg_attr(feature = "serde", serde(skip_serializing))]
ids: Option<Vec<gix::ObjectId>>,
total_objects: usize,
loose_objects: usize,
packed_objects: usize,
Expand Down Expand Up @@ -135,14 +146,17 @@ pub fn statistics(
}

impl gix::parallel::Reduce for Reduce {
type Input = Result<Vec<gix::odb::find::Header>, anyhow::Error>;
type Input = Result<Vec<(gix::ObjectId, gix::odb::find::Header)>, anyhow::Error>;
type FeedProduce = ();
type Output = Statistics;
type Error = anyhow::Error;

fn feed(&mut self, items: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
for item in items? {
for (id, item) in items? {
self.stats.consume(item);
if let Some(ids) = self.stats.ids.as_mut() {
ids.push(id);
}
}
Ok(())
}
Expand All @@ -154,9 +168,9 @@ pub fn statistics(
}

let cancelled = || anyhow::anyhow!("Cancelled by user");
let object_ids = repo.objects.store_ref().iter()?.filter_map(Result::ok);
let object_ids = repo.objects.iter()?.filter_map(Result::ok);
let chunk_size = 1_000;
let stats = if gix::parallel::num_threads(thread_limit) > 1 {
let mut stats = if gix::parallel::num_threads(thread_limit) > 1 {
gix::parallel::in_parallel(
gix::interrupt::Iter::new(
gix::features::iter::Chunks {
Expand All @@ -166,19 +180,30 @@ pub fn statistics(
cancelled,
),
thread_limit,
move |_| (repo.objects.clone().into_inner(), counter),
{
let objects = repo.objects.clone();
move |_| (objects.clone().into_inner(), counter)
},
|ids, (handle, counter)| {
let ids = ids?;
counter.fetch_add(ids.len(), std::sync::atomic::Ordering::Relaxed);
counter.fetch_add(ids.len(), Ordering::Relaxed);
let out = ids
.into_iter()
.map(|id| handle.header(id))
.map(|id| handle.header(id).map(|hdr| (id, hdr)))
.collect::<Result<Vec<_>, _>>()?;
Ok(out)
},
Reduce::default(),
Reduce {
stats: Statistics {
ids: extra_header_lookup.then(Vec::new),
..Default::default()
},
},
)?
} else {
if extra_header_lookup {
bail!("extra-header-lookup is only meaningful in threaded mode");
}
let mut stats = Statistics::default();

for (count, id) in object_ids.enumerate() {
Expand All @@ -193,6 +218,39 @@ pub fn statistics(

progress.show_throughput(start);

if let Some(mut ids) = stats.ids.take() {
// Critical to re-open the repo to assure we don't have any ODB state and start fresh.
let start = std::time::Instant::now();
let repo = gix::open_opts(repo.git_dir(), repo.open_options().to_owned())?;
progress.set_name("re-counting".into());
progress.init(Some(ids.len()), gix::progress::count("objects"));
let counter = progress.counter();
counter.store(0, Ordering::Relaxed);
let errors = gix::parallel::in_parallel_with_slice(
&mut ids,
thread_limit,
{
let objects = repo.objects.clone();
move |_| (objects.clone().into_inner(), counter, false)
},
|id, (odb, counter, has_error), _threads_left, _stop_everything| -> anyhow::Result<()> {
counter.fetch_add(1, Ordering::Relaxed);
if let Err(_err) = odb.header(id) {
*has_error = true;
gix::trace::error!(err = ?_err, "Object that is known to be present wasn't found");
}
Ok(())
},
|| Some(std::time::Duration::from_millis(100)),
|(_, _, has_error)| has_error,
)?;

progress.show_throughput(start);
if errors.contains(&true) {
bail!("At least one object couldn't be looked up even though it must exist");
}
}

#[cfg(feature = "serde")]
{
serde_json::to_writer_pretty(out, &stats)?;
Expand Down
111 changes: 62 additions & 49 deletions gix-odb/src/store_impls/dynamic/load_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
ops::Deref,
path::{Path, PathBuf},
sync::{
atomic::{AtomicU16, AtomicUsize, Ordering},
atomic::{AtomicU16, Ordering},
Arc,
},
time::SystemTime,
Expand Down Expand Up @@ -86,7 +86,7 @@ impl super::Store {
Ok(Some(self.collect_snapshot()))
} else {
// always compare to the latest state
// Nothing changed in the mean time, try to load another index…
// Nothing changed in the meantime, try to load another index…
if self.load_next_index(index) {
Ok(Some(self.collect_snapshot()))
} else {
Expand Down Expand Up @@ -119,7 +119,7 @@ impl super::Store {
let slot = &self.files[index.slot_indices[slot_map_index]];
let _lock = slot.write.lock();
if slot.generation.load(Ordering::SeqCst) > index.generation {
// There is a disk consolidation in progress which just overwrote a slot that cold be disposed with some other
// There is a disk consolidation in progress which just overwrote a slot that could be disposed with some other
// index, one we didn't intend to load.
// Continue with the next slot index in the hope there is something else we can do…
continue 'retry_with_next_slot_index;
Expand All @@ -128,14 +128,18 @@ impl super::Store {
let bundle_mut = Arc::make_mut(&mut bundle);
if let Some(files) = bundle_mut.as_mut() {
// these are always expected to be set, unless somebody raced us. We handle this later by retrying.
let _loaded_count = IncOnDrop(&index.loaded_indices);
match files.load_index(self.object_hash) {
let res = {
let res = files.load_index(self.object_hash);
slot.files.store(bundle);
index.loaded_indices.fetch_add(1, Ordering::SeqCst);
res
};
match res {
Ok(_) => {
slot.files.store(bundle);
break 'retry_with_next_slot_index;
}
Err(_) => {
slot.files.store(bundle);
Err(_err) => {
gix_features::trace::error!(err=?_err, "Failed to load index file - some objects may seem to not exist");
continue 'retry_with_next_slot_index;
}
}
Expand All @@ -145,9 +149,14 @@ impl super::Store {
// There can be contention as many threads start working at the same time and take all the
// slots to load indices for. Some threads might just be left-over and have to wait for something
// to change.
let num_load_operations = index.num_indices_currently_being_loaded.deref();
// TODO: potentially hot loop - could this be a condition variable?
while num_load_operations.load(Ordering::Relaxed) != 0 {
// This is a timing-based fix for the case that the `num_indices_being_loaded` isn't yet incremented,
// and we might break out here without actually waiting for the loading operation. Then we'd fail to
// observe a change and the underlying handler would not have all the indices it needs at its disposal.
// Yielding means we will definitely loose enough time to observe the ongoing operation,
// or its effects.
std::thread::yield_now();
while index.num_indices_currently_being_loaded.load(Ordering::SeqCst) != 0 {
std::thread::yield_now()
}
break 'retry_with_next_slot_index;
Expand Down Expand Up @@ -197,7 +206,7 @@ impl super::Store {

// We might not be able to detect by pointer if the state changed, as this itself is racy. So we keep track of double-initialization
// using a flag, which means that if `needs_init` was true we saw the index uninitialized once, but now that we are here it's
// initialized meaning that somebody was faster and we couldn't detect it by comparisons to the index.
// initialized meaning that somebody was faster, and we couldn't detect it by comparisons to the index.
// If so, make sure we collect the snapshot instead of returning None in case nothing actually changed, which is likely with a
// race like this.
if !was_uninitialized && needs_init {
Expand Down Expand Up @@ -397,18 +406,19 @@ impl super::Store {
// generation stays the same, as it's the same value still but scheduled for eventual removal.
}
} else {
// set the generation before we actually change the value, otherwise readers of old generations could observe the new one.
// We rather want them to turn around here and update their index, which, by that time, might actually already be available.
// If not, they would fail unable to load a pack or index they need, but that's preferred over returning wrong objects.
// Safety: can't race as we hold the lock, have to set the generation beforehand to help avoid others to observe the value.
slot.generation.store(generation, Ordering::SeqCst);
*files_mut = None;
};
slot.files.store(files);
if !needs_stable_indices {
// Not racy due to lock, generation must be set after unsetting the slot value AND storing it.
slot.generation.store(generation, Ordering::SeqCst);
}
}

let new_index = self.index.load();
Ok(if index.state_id() == new_index.state_id() {
// there was no change, and nothing was loaded in the meantime, reflect that in the return value to not get into loops
// there was no change, and nothing was loaded in the meantime, reflect that in the return value to not get into loops.
None
} else {
if load_new_index {
Expand Down Expand Up @@ -619,34 +629,44 @@ impl super::Store {
}

pub(crate) fn collect_snapshot(&self) -> Snapshot {
// We don't observe changes-on-disk in our 'wait-for-load' loop.
// That loop is meant to help assure the marker (which includes the amount of loaded indices) matches
// the actual amount of indices we collect.
let index = self.index.load();
let indices = if index.is_initialized() {
index
.slot_indices
.iter()
.map(|idx| (*idx, &self.files[*idx]))
.filter_map(|(id, file)| {
let lookup = match (**file.files.load()).as_ref()? {
types::IndexAndPacks::Index(bundle) => handle::SingleOrMultiIndex::Single {
index: bundle.index.loaded()?.clone(),
data: bundle.data.loaded().cloned(),
},
types::IndexAndPacks::MultiIndex(multi) => handle::SingleOrMultiIndex::Multi {
index: multi.multi_index.loaded()?.clone(),
data: multi.data.iter().map(|f| f.loaded().cloned()).collect(),
},
};
handle::IndexLookup { file: lookup, id }.into()
})
.collect()
} else {
Vec::new()
};
loop {
if index.num_indices_currently_being_loaded.deref().load(Ordering::SeqCst) != 0 {
std::thread::yield_now();
continue;
}
let marker = index.marker();
let indices = if index.is_initialized() {
index
.slot_indices
.iter()
.map(|idx| (*idx, &self.files[*idx]))
.filter_map(|(id, file)| {
let lookup = match (**file.files.load()).as_ref()? {
types::IndexAndPacks::Index(bundle) => handle::SingleOrMultiIndex::Single {
index: bundle.index.loaded()?.clone(),
data: bundle.data.loaded().cloned(),
},
types::IndexAndPacks::MultiIndex(multi) => handle::SingleOrMultiIndex::Multi {
index: multi.multi_index.loaded()?.clone(),
data: multi.data.iter().map(|f| f.loaded().cloned()).collect(),
},
};
handle::IndexLookup { file: lookup, id }.into()
})
.collect()
} else {
Vec::new()
};

Snapshot {
indices,
loose_dbs: Arc::clone(&index.loose_dbs),
marker: index.marker(),
return Snapshot {
indices,
loose_dbs: Arc::clone(&index.loose_dbs),
marker,
};
}
}
}
Expand All @@ -669,13 +689,6 @@ impl<'a> Drop for IncOnNewAndDecOnDrop<'a> {
}
}

struct IncOnDrop<'a>(&'a AtomicUsize);
impl<'a> Drop for IncOnDrop<'a> {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}

pub(crate) enum Either {
IndexPath(PathBuf),
MultiIndexFile(Arc<gix_pack::multi_index::File>),
Expand Down
4 changes: 2 additions & 2 deletions gix-odb/src/store_impls/dynamic/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub(crate) type AtomicGeneration = AtomicU32;

/// A way to indicate which pack indices we have seen already and which of them are loaded, along with an idea
/// of whether stored `PackId`s are still usable.
#[derive(Default, Copy, Clone)]
#[derive(Default, Copy, Clone, Debug)]
pub struct SlotIndexMarker {
/// The generation the `loaded_until_index` belongs to. Indices of different generations are completely incompatible.
/// This value changes once the internal representation is compacted, something that may happen only if there is no handle
Expand Down Expand Up @@ -262,7 +262,7 @@ impl IndexAndPacks {
}
}

/// If we are garbage, put ourselves into the loaded state. Otherwise put ourselves back to unloaded.
/// If we are garbage, put ourselves into the loaded state. Otherwise, put ourselves back to unloaded.
pub(crate) fn put_back(&mut self) {
match self {
IndexAndPacks::Index(bundle) => {
Expand Down
8 changes: 6 additions & 2 deletions src/plumbing/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,7 @@ pub fn main() -> Result<()> {
),
},
Subcommands::Odb(cmd) => match cmd {
odb::Subcommands::Stats => prepare_and_run(
odb::Subcommands::Stats { extra_header_lookup } => prepare_and_run(
"odb-stats",
trace,
auto_verbose,
Expand All @@ -1166,7 +1166,11 @@ pub fn main() -> Result<()> {
progress,
out,
err,
core::repository::odb::statistics::Options { format, thread_limit },
core::repository::odb::statistics::Options {
format,
thread_limit,
extra_header_lookup,
},
)
},
),
Expand Down
6 changes: 5 additions & 1 deletion src/plumbing/options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,11 @@ pub mod odb {
Info,
/// Count and obtain information on all, possibly duplicate, objects in the database.
#[clap(visible_alias = "statistics")]
Stats,
Stats {
/// Lookup headers again, but without preloading indices.
#[clap(long)]
extra_header_lookup: bool,
},
}
}

Expand Down

0 comments on commit addf446

Please sign in to comment.