Skip to content

Commit

Permalink
add an example program to try to reproduce odb errors due to parallelism.
Browse files Browse the repository at this point in the history
A motivating example is here: praetorian-inc/noseyparker#179
  • Loading branch information
Byron committed May 11, 2024
1 parent 7b3dc92 commit 95c2ffd
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 14 deletions.
76 changes: 67 additions & 9 deletions gitoxide-core/src/repository/odb.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::io;
use std::sync::atomic::Ordering;

use anyhow::bail;

Expand Down Expand Up @@ -50,6 +51,8 @@ pub mod statistics {
pub struct Options {
pub format: OutputFormat,
pub thread_limit: Option<usize>,
/// A debug-flag that triggers looking up the headers of all objects again, but without indices preloaded
pub extra_header_lookup: bool,
}
}

Expand All @@ -59,7 +62,11 @@ pub fn statistics(
mut progress: impl gix::Progress,
out: impl io::Write,
mut err: impl io::Write,
statistics::Options { format, thread_limit }: statistics::Options,
statistics::Options {
format,
thread_limit,
extra_header_lookup,
}: statistics::Options,
) -> anyhow::Result<()> {
use bytesize::ByteSize;
use gix::odb::{find, HeaderExt};
Expand All @@ -76,6 +83,10 @@ pub fn statistics(
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[derive(Default)]
struct Statistics {
/// All objects that were used to produce these statistics.
/// Only `Some` if we are doing an extra round of header queries on a repository without loaded indices.
#[cfg_attr(feature = "serde", serde(skip_serializing))]
ids: Option<Vec<gix::ObjectId>>,
total_objects: usize,
loose_objects: usize,
packed_objects: usize,
Expand Down Expand Up @@ -135,14 +146,17 @@ pub fn statistics(
}

impl gix::parallel::Reduce for Reduce {
type Input = Result<Vec<gix::odb::find::Header>, anyhow::Error>;
type Input = Result<Vec<(gix::ObjectId, gix::odb::find::Header)>, anyhow::Error>;
type FeedProduce = ();
type Output = Statistics;
type Error = anyhow::Error;

fn feed(&mut self, items: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
for item in items? {
for (id, item) in items? {
self.stats.consume(item);
if let Some(ids) = self.stats.ids.as_mut() {
ids.push(id);
}
}
Ok(())
}
Expand All @@ -154,9 +168,9 @@ pub fn statistics(
}

let cancelled = || anyhow::anyhow!("Cancelled by user");
let object_ids = repo.objects.store_ref().iter()?.filter_map(Result::ok);
let object_ids = repo.objects.iter()?.filter_map(Result::ok);
let chunk_size = 1_000;
let stats = if gix::parallel::num_threads(thread_limit) > 1 {
let mut stats = if gix::parallel::num_threads(thread_limit) > 1 {
gix::parallel::in_parallel(
gix::interrupt::Iter::new(
gix::features::iter::Chunks {
Expand All @@ -166,19 +180,30 @@ pub fn statistics(
cancelled,
),
thread_limit,
move |_| (repo.objects.clone().into_inner(), counter),
{
let objects = repo.objects.clone();
move |_| (objects.clone().into_inner(), counter)
},
|ids, (handle, counter)| {
let ids = ids?;
counter.fetch_add(ids.len(), std::sync::atomic::Ordering::Relaxed);
counter.fetch_add(ids.len(), Ordering::Relaxed);
let out = ids
.into_iter()
.map(|id| handle.header(id))
.map(|id| handle.header(id).map(|hdr| (id, hdr)))
.collect::<Result<Vec<_>, _>>()?;
Ok(out)
},
Reduce::default(),
Reduce {
stats: Statistics {
ids: extra_header_lookup.then(Vec::new),
..Default::default()
},
},
)?
} else {
if extra_header_lookup {
bail!("extra-header-lookup is only meaningful in threaded mode");
}
let mut stats = Statistics::default();

for (count, id) in object_ids.enumerate() {
Expand All @@ -193,6 +218,39 @@ pub fn statistics(

progress.show_throughput(start);

if let Some(mut ids) = stats.ids.take() {
// Critical to re-open the repo to assure we don't have any ODB state and start fresh.
let start = std::time::Instant::now();
let repo = gix::open_opts(repo.git_dir(), repo.open_options().to_owned())?;
progress.set_name("re-counting".into());
progress.init(Some(ids.len()), gix::progress::count("objects"));
let counter = progress.counter();
counter.store(0, Ordering::Relaxed);
let errors = gix::parallel::in_parallel_with_slice(
&mut ids,
thread_limit,
{
let objects = repo.objects.clone();
move |_| (objects.clone().into_inner(), counter, false)
},
|id, (odb, counter, has_error), _threads_left, _stop_everything| -> anyhow::Result<()> {
counter.fetch_add(1, Ordering::Relaxed);
if let Err(err) = odb.header(id) {
*has_error = true;
gix::trace::error!(err = ?err, "Object that is known to be present wasn't found");
}
Ok(())
},
|| Some(std::time::Duration::from_millis(100)),
|(_, _, has_error)| has_error,
)?;

progress.show_throughput(start);
if errors.contains(&true) {
bail!("At least one object couldn't be looked up even though it must exist");
}
}

#[cfg(feature = "serde")]
{
serde_json::to_writer_pretty(out, &stats)?;
Expand Down
6 changes: 4 additions & 2 deletions gix-odb/src/store_impls/dynamic/load_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl super::Store {
let slot = &self.files[index.slot_indices[slot_map_index]];
let _lock = slot.write.lock();
if slot.generation.load(Ordering::SeqCst) > index.generation {
// There is a disk consolidation in progress which just overwrote a slot that cold be disposed with some other
// There is a disk consolidation in progress which just overwrote a slot that could be disposed with some other
// index, one we didn't intend to load.
// Continue with the next slot index in the hope there is something else we can do…
continue 'retry_with_next_slot_index;
Expand All @@ -134,7 +134,9 @@ impl super::Store {
slot.files.store(bundle);
break 'retry_with_next_slot_index;
}
Err(_) => {
#[allow(unused_variables)]
Err(err) => {
gix_features::trace::error!(err=?err, "Failed to load index file - some objects may seem to not exist");
slot.files.store(bundle);
continue 'retry_with_next_slot_index;
}
Expand Down
8 changes: 6 additions & 2 deletions src/plumbing/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,7 @@ pub fn main() -> Result<()> {
),
},
Subcommands::Odb(cmd) => match cmd {
odb::Subcommands::Stats => prepare_and_run(
odb::Subcommands::Stats { extra_header_lookup } => prepare_and_run(
"odb-stats",
trace,
auto_verbose,
Expand All @@ -1166,7 +1166,11 @@ pub fn main() -> Result<()> {
progress,
out,
err,
core::repository::odb::statistics::Options { format, thread_limit },
core::repository::odb::statistics::Options {
format,
thread_limit,
extra_header_lookup,
},
)
},
),
Expand Down
6 changes: 5 additions & 1 deletion src/plumbing/options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,11 @@ pub mod odb {
Info,
/// Count and obtain information on all, possibly duplicate, objects in the database.
#[clap(visible_alias = "statistics")]
Stats,
Stats {
/// Lookup headers again, but without preloading indices.
#[clap(long)]
extra_header_lookup: bool,
},
}
}

Expand Down

0 comments on commit 95c2ffd

Please sign in to comment.