From 0af5077e1f028c1c69bbdc098bb567e486282c37 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 27 Nov 2021 16:23:26 +0800 Subject: [PATCH] =?UTF-8?q?A=20mad=20attempt=20to=20use=20thread-local=20e?= =?UTF-8?q?verywhere=20and=20avoid=20Sync=E2=80=A6=20(#263)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …which seems to work except for one caveat: Progress now demands to be Sync even though there is no need and it shouldn't have to think that. Have to go back and try to fix that, as it ultimately bubbles up to every method that uses such method generically, including trait bounds which really shouldn't have to be that strict. --- Cargo.toml | 2 +- cargo-features.md | 5 +-- git-features/Cargo.toml | 6 +-- git-features/src/parallel/mod.rs | 23 ++++++++++ git-features/src/threading.rs | 4 +- git-pack/src/bundle/mod.rs | 3 +- git-pack/src/bundle/write/mod.rs | 37 +++++++++++----- git-pack/src/cache/delta/traverse/mod.rs | 42 +++++++++++-------- git-pack/src/cache/delta/traverse/resolve.rs | 6 +-- .../src/data/output/entry/iter_from_counts.rs | 2 +- git-pack/src/index/traverse/indexed.rs | 5 ++- git-pack/src/index/traverse/mod.rs | 5 ++- git-pack/src/index/traverse/reduce.rs | 11 ++--- git-pack/src/index/traverse/with_lookup.rs | 29 +++++++------ git-pack/src/index/verify.rs | 3 +- git-pack/src/index/write/mod.rs | 8 ++-- git-pack/tests/pack/index.rs | 7 ++-- git-ref/Cargo.toml | 2 +- gitoxide-core/src/pack/explode.rs | 10 +++-- gitoxide-core/src/pack/index.rs | 11 +++-- gitoxide-core/src/pack/receive.rs | 12 ++++-- gitoxide-core/src/pack/verify.rs | 6 ++- src/plumbing/pretty/options.rs | 1 + 23 files changed, 155 insertions(+), 85 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 463ae6c57b0..1a0b3786619 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ gitoxide-core-tools = ["gitoxide-core/organize", "gitoxide-core/estimate-hours"] gitoxide-core-blocking-client = ["gitoxide-core/blocking-client"] gitoxide-core-async-client = ["gitoxide-core/async-client", "futures-lite"] http-client-curl = ["git-transport-for-configuration-only/http-client-curl"] -fast = ["git-features/threading", "git-features/parallel", "git-features/fast-sha1", "git-features/zlib-ng-compat"] +fast = ["git-features/parallel", "git-features/fast-sha1", "git-features/zlib-ng-compat"] pretty-cli = ["clap", "gitoxide-core/serde1", diff --git a/cargo-features.md b/cargo-features.md index beabae0d81f..07ac5460a73 100644 --- a/cargo-features.md +++ b/cargo-features.md @@ -106,9 +106,8 @@ All feature toggles are additive. * Use scoped threads and channels to parallelize common workloads on multiple objects. If enabled, it is used everywhere where it makes sense. * As caches are likely to be used and instantiated per thread, more memory will be used on top of the costs for threads. -* **threading** - * If set, the `threading` module will contain thread-safe primitives for shared ownership and mutation, otherwise these will be their single threaded counterparts. - * This way, single-threaded applications don't have to pay for threaded primitives. + * The `threading` module will contain thread-safe primitives for shared ownership and mutation, otherwise these will be their single threaded counterparts. + * This way, single-threaded applications don't have to pay for threaded primitives. * **crc32** * provide a proven and fast `crc32` implementation. 
* **io-pipe** diff --git a/git-features/Cargo.toml b/git-features/Cargo.toml index e1c4ebddd6d..d07ee447e47 100644 --- a/git-features/Cargo.toml +++ b/git-features/Cargo.toml @@ -13,9 +13,8 @@ test = false [features] default = [] -threading = ["parking_lot"] progress = ["prodash"] -parallel = ["crossbeam-utils", "crossbeam-channel", "num_cpus", "jwalk"] +parallel = ["crossbeam-utils", "crossbeam-channel", "num_cpus", "jwalk", "parking_lot"] fast-sha1 = ["sha-1"] io-pipe = ["bytes"] crc32 = ["crc32fast"] @@ -53,13 +52,12 @@ required-features = ["io-pipe"] git-hash = { version ="^0.8.0", path = "../git-hash" } -# 'threading' feature -parking_lot = { version = "0.11.0", default-features = false, optional = true } # 'parallel' feature crossbeam-utils = { version = "0.8.5", optional = true } crossbeam-channel = { version = "0.5.0", optional = true } num_cpus = { version = "1.13.0", optional = true } +parking_lot = { version = "0.11.0", default-features = false, optional = true } jwalk = { version = "0.6.0", optional = true } walkdir = { version = "2.3.1", optional = true } # used when parallel is off diff --git a/git-features/src/parallel/mod.rs b/git-features/src/parallel/mod.rs index 9ad5af7decc..c191e4227ec 100644 --- a/git-features/src/parallel/mod.rs +++ b/git-features/src/parallel/mod.rs @@ -129,6 +129,7 @@ fn num_threads(thread_limit: Option) -> usize { /// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated. /// /// For parameters, see the documentation of [`in_parallel()`] +#[cfg(feature = "parallel")] pub fn in_parallel_if( condition: impl FnOnce() -> bool, input: impl Iterator + Send, @@ -149,6 +150,28 @@ where } } +/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated. +/// +/// For parameters, see the documentation of [`in_parallel()`] +/// +/// Note that the non-parallel version is equivalent to [`in_parallel()`]. +#[cfg(not(feature = "parallel"))] +pub fn in_parallel_if( + _condition: impl FnOnce() -> bool, + input: impl Iterator, + thread_limit: Option, + new_thread_state: impl Fn(usize) -> S, + consume: impl Fn(I, &mut S) -> O, + reducer: R, +) -> Result<::Output, ::Error> +where + R: Reduce, + I: Send, + O: Send, +{ + serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer) +} + /// pub mod reduce; pub use reduce::Reduce; diff --git a/git-features/src/threading.rs b/git-features/src/threading.rs index 1300647cb55..10c7a1605d4 100644 --- a/git-features/src/threading.rs +++ b/git-features/src/threading.rs @@ -2,7 +2,7 @@ //! //! That way, single-threaded applications will not have to use thread-safe primitives, and simply do not specify the 'threading' feature. -#[cfg(feature = "threading")] +#[cfg(feature = "parallel")] mod _impl { use std::sync::Arc; @@ -36,7 +36,7 @@ mod _impl { } } -#[cfg(not(feature = "threading"))] +#[cfg(not(feature = "parallel"))] mod _impl { use std::{ cell::{Ref, RefCell, RefMut}, diff --git a/git-pack/src/bundle/mod.rs b/git-pack/src/bundle/mod.rs index 2c169ddfa69..504607d5f69 100644 --- a/git-pack/src/bundle/mod.rs +++ b/git-pack/src/bundle/mod.rs @@ -41,7 +41,7 @@ mod verify { &self, verify_mode: crate::index::verify::Mode, traversal: crate::index::traverse::Algorithm, - make_pack_lookup_cache: impl Fn() -> C + Send + Sync, + make_pack_lookup_cache: impl Fn() -> C + Send + Clone, thread_limit: Option, progress: Option
<P>
, should_interrupt: Arc, @@ -51,6 +51,7 @@ mod verify { > where P: Progress, +
<P as Progress>
::SubProgress: Sync, C: crate::cache::DecodeEntry, { self.index.verify_integrity( diff --git a/git-pack/src/bundle/write/mod.rs b/git-pack/src/bundle/write/mod.rs index fc506065d65..1b424aa5f8e 100644 --- a/git-pack/src/bundle/write/mod.rs +++ b/git-pack/src/bundle/write/mod.rs @@ -37,14 +37,19 @@ impl crate::Bundle { /// * the resulting pack may be empty, that is, contains zero objects in some situations. This is a valid reply by a server and should /// be accounted for. /// - Empty packs always have the same name and not handling this case will result in at most one superfluous pack. - pub fn write_to_directory( + pub fn write_to_directory
<P>
( pack: impl io::BufRead, directory: Option>, - mut progress: impl Progress, + mut progress: P, should_interrupt: &AtomicBool, thin_pack_base_object_lookup_fn: Option, options: Options, - ) -> Result { + ) -> Result + where + P: Progress + Sync, +
<P as Progress>
::SubProgress: Sync, + <
<P as Progress>
::SubProgress as Progress>::SubProgress: Sync, + { let mut read_progress = progress.add_child("read pack"); read_progress.init(None, progress::bytes()); let pack = progress::Read { @@ -131,15 +136,20 @@ impl crate::Bundle { /// As it sends portions of the input to a thread it requires the 'static lifetime for the interrupt flags. This can only /// be satisfied by a static AtomicBool which is only suitable for programs that only run one of these operations at a time /// or don't mind that all of them abort when the flag is set. - pub fn write_to_directory_eagerly( + pub fn write_to_directory_eagerly
<P>
( pack: impl io::Read + Send + 'static, pack_size: Option, directory: Option>, - mut progress: impl Progress, + mut progress: P, should_interrupt: &'static AtomicBool, thin_pack_base_object_lookup_fn: Option, options: Options, - ) -> Result { + ) -> Result + where + P: Progress + Sync, +
<P as Progress>
::SubProgress: Sync, + <
<P as Progress>
::SubProgress as Progress>::SubProgress: Sync, + { let mut read_progress = progress.add_child("read pack"); read_progress.init(pack_size.map(|s| s as usize), progress::bytes()); let pack = progress::Read { @@ -212,9 +222,9 @@ impl crate::Bundle { }) } - fn inner_write( + fn inner_write
<P>
( directory: Option>, - mut progress: impl Progress, + mut progress: P, Options { thread_limit, iteration_mode: _, @@ -223,7 +233,12 @@ impl crate::Bundle { data_file: Arc>>, pack_entries_iter: impl Iterator>, should_interrupt: &AtomicBool, - ) -> Result<(crate::index::write::Outcome, Option, Option), Error> { + ) -> Result<(crate::index::write::Outcome, Option, Option), Error> + where + P: Progress + Sync, +
<P as Progress>
::SubProgress: Sync, + <
<P as Progress>
::SubProgress as Progress>::SubProgress: Sync, + { let indexing_progress = progress.add_child("create index file"); Ok(match directory { Some(directory) => { @@ -280,8 +295,8 @@ impl crate::Bundle { fn new_pack_file_resolver( data_file: Arc>>, -) -> io::Result) -> Option<()> + Send + Sync> { - let mapped_file = FileBuffer::open(data_file.lock().with_mut(|f| f.path().to_owned())?)?; +) -> io::Result) -> Option<()> + Send + Clone> { + let mapped_file = Arc::new(FileBuffer::open(data_file.lock().with_mut(|f| f.path().to_owned())?)?); let pack_data_lookup = move |range: std::ops::Range, out: &mut Vec| -> Option<()> { mapped_file .get(range.start as usize..range.end as usize) diff --git a/git-pack/src/cache/delta/traverse/mod.rs b/git-pack/src/cache/delta/traverse/mod.rs index d46e1809c67..7fbaa9afde1 100644 --- a/git-pack/src/cache/delta/traverse/mod.rs +++ b/git-pack/src/cache/delta/traverse/mod.rs @@ -3,6 +3,7 @@ use std::{ sync::atomic::{AtomicBool, Ordering}, }; +use git_features::threading::{get_mut, MutableOnDemand, OwnShared}; use git_features::{ parallel, parallel::in_parallel_if, @@ -82,33 +83,38 @@ where thread_limit: Option, should_interrupt: &AtomicBool, pack_entries_end: u64, - new_thread_state: impl Fn() -> S + Send + Sync, + new_thread_state: impl Fn() -> S + Send + Clone, inspect_object: MBFN, ) -> Result>, Error> where - F: for<'r> Fn(EntryRange, &'r mut Vec) -> Option<()> + Send + Sync, - P: Progress + Send, - MBFN: Fn(&mut T, &mut
<P as Progress>
::SubProgress, Context<'_, S>) -> Result<(), E> + Send + Sync, + F: for<'r> Fn(EntryRange, &'r mut Vec) -> Option<()> + Send + Clone, + P: Progress + Send + Sync, + MBFN: Fn(&mut T, &mut
<P as Progress>
::SubProgress, Context<'_, S>) -> Result<(), E> + Send + Clone, E: std::error::Error + Send + Sync + 'static, { self.set_pack_entries_end(pack_entries_end); let (chunk_size, thread_limit, _) = parallel::optimize_chunk_size_and_thread_limit(1, None, thread_limit, None); - let object_progress = parking_lot::Mutex::new(object_progress); + let object_progress = OwnShared::new(MutableOnDemand::new(object_progress)); let num_objects = self.items.len(); in_parallel_if( should_run_in_parallel, self.iter_root_chunks(chunk_size), thread_limit, - |thread_index| { - ( - Vec::::with_capacity(4096), - object_progress.lock().add_child(format!("thread {}", thread_index)), - new_thread_state(), - ) + { + let object_progress = object_progress.clone(); + move |thread_index| { + ( + Vec::::with_capacity(4096), + get_mut(&object_progress).add_child(format!("thread {}", thread_index)), + new_thread_state(), + resolve.clone(), + inspect_object.clone(), + ) + } }, - |root_nodes, state| resolve::deltas(root_nodes, state, &resolve, &inspect_object), - Reducer::new(num_objects, &object_progress, size_progress, should_interrupt), + move |root_nodes, state| resolve::deltas(root_nodes, state), + Reducer::new(num_objects, object_progress, size_progress, should_interrupt), )?; Ok(self.into_items()) } @@ -116,7 +122,7 @@ where struct Reducer<'a, P> { item_count: usize, - progress: &'a parking_lot::Mutex
<P>
, + progress: OwnShared>, start: std::time::Instant, size_progress: P, should_interrupt: &'a AtomicBool, @@ -128,11 +134,11 @@ where { pub fn new( num_objects: usize, - progress: &'a parking_lot::Mutex
<P>
, + progress: OwnShared>, mut size_progress: P, should_interrupt: &'a AtomicBool, ) -> Self { - progress.lock().init(Some(num_objects), progress::count("objects")); + get_mut(&progress).init(Some(num_objects), progress::count("objects")); size_progress.init(None, progress::bytes()); Reducer { item_count: 0, @@ -157,7 +163,7 @@ where let (num_objects, decompressed_size) = input?; self.item_count += num_objects; self.size_progress.inc_by(decompressed_size as usize); - self.progress.lock().set(self.item_count); + get_mut(&self.progress).set(self.item_count); if self.should_interrupt.load(Ordering::SeqCst) { return Err(Error::Interrupted); } @@ -165,7 +171,7 @@ where } fn finalize(mut self) -> Result { - self.progress.lock().show_throughput(self.start); + get_mut(&self.progress).show_throughput(self.start); self.size_progress.show_throughput(self.start); Ok(()) } diff --git a/git-pack/src/cache/delta/traverse/resolve.rs b/git-pack/src/cache/delta/traverse/resolve.rs index 4e5eadfd422..e4de792b899 100644 --- a/git-pack/src/cache/delta/traverse/resolve.rs +++ b/git-pack/src/cache/delta/traverse/resolve.rs @@ -12,12 +12,10 @@ use crate::{ pub(crate) fn deltas( nodes: crate::cache::delta::Chunk<'_, T>, - (bytes_buf, ref mut progress, state): &mut (Vec, P, S), - resolve: F, - modify_base: MBFN, + (bytes_buf, ref mut progress, state, resolve, modify_base): &mut (Vec, P, S, F, MBFN), ) -> Result<(usize, u64), Error> where - F: for<'r> Fn(EntryRange, &'r mut Vec) -> Option<()> + Send + Sync, + F: for<'r> Fn(EntryRange, &'r mut Vec) -> Option<()> + Send + Clone, P: Progress, MBFN: Fn(&mut T, &mut P, Context<'_, S>) -> Result<(), E>, T: Default, diff --git a/git-pack/src/data/output/entry/iter_from_counts.rs b/git-pack/src/data/output/entry/iter_from_counts.rs index 06b9660799d..428526c1162 100644 --- a/git-pack/src/data/output/entry/iter_from_counts.rs +++ b/git-pack/src/data/output/entry/iter_from_counts.rs @@ -42,7 +42,7 @@ use crate::data::{output, output::ChunkId}; pub fn iter_from_counts( mut counts: Vec, db: Find, - make_cache: impl Fn() -> Cache + Send + Clone + Sync + 'static, + make_cache: impl Fn() -> Cache + Send + Clone + 'static, mut progress: impl Progress, Options { version, diff --git a/git-pack/src/index/traverse/indexed.rs b/git-pack/src/index/traverse/indexed.rs index 8c6a58a871a..53dd350adca 100644 --- a/git-pack/src/index/traverse/indexed.rs +++ b/git-pack/src/index/traverse/indexed.rs @@ -24,13 +24,14 @@ impl index::File { &self, check: SafetyCheck, thread_limit: Option, - new_processor: impl Fn() -> Processor + Send + Sync, + new_processor: impl Fn() -> Processor + Send + Clone, mut progress: P, pack: &crate::data::File, should_interrupt: Arc, ) -> Result<(git_hash::ObjectId, index::traverse::Outcome, P), Error> where P: Progress, +
<P as Progress>
::SubProgress: Sync, Processor: FnMut( git_object::Kind, &[u8], @@ -78,7 +79,7 @@ impl index::File { thread_limit, &should_interrupt, pack.pack_end() as u64, - || new_processor(), + move || new_processor(), |data, progress, Context { diff --git a/git-pack/src/index/traverse/mod.rs b/git-pack/src/index/traverse/mod.rs index cd7d89ecb1a..2c282f9978b 100644 --- a/git-pack/src/index/traverse/mod.rs +++ b/git-pack/src/index/traverse/mod.rs @@ -79,8 +79,8 @@ impl index::File { &self, pack: &crate::data::File, progress: Option
<P>
, - new_processor: impl Fn() -> Processor + Send + Sync, - new_cache: impl Fn() -> C + Send + Sync, + new_processor: impl Fn() -> Processor + Send + Clone, + new_cache: impl Fn() -> C + Send + Clone, Options { algorithm, thread_limit, @@ -90,6 +90,7 @@ impl index::File { ) -> Result<(git_hash::ObjectId, Outcome, Option
<P>
), Error> where P: Progress, +
<P as Progress>
::SubProgress: Sync, C: crate::cache::DecodeEntry, E: std::error::Error + Send + Sync + 'static, Processor: FnMut( diff --git a/git-pack/src/index/traverse/reduce.rs b/git-pack/src/index/traverse/reduce.rs index db7c183063a..c7a15f8740a 100644 --- a/git-pack/src/index/traverse/reduce.rs +++ b/git-pack/src/index/traverse/reduce.rs @@ -3,6 +3,7 @@ use std::{ time::Instant, }; +use git_features::threading::{get_mut, MutableOnDemand, OwnShared}; use git_features::{parallel, progress::Progress}; use crate::{data, index::traverse}; @@ -24,7 +25,7 @@ fn div_decode_result(lhs: &mut data::decode_entry::Outcome, div: usize) { } pub struct Reducer<'a, P, E> { - progress: &'a parking_lot::Mutex
<P>
, + progress: OwnShared>, check: traverse::SafetyCheck, then: Instant, entries_seen: usize, @@ -38,7 +39,7 @@ where P: Progress, { pub fn from_progress( - progress: &'a parking_lot::Mutex
<P>
, + progress: OwnShared>, pack_data_len_in_bytes: usize, check: traverse::SafetyCheck, should_interrupt: &'a AtomicBool, @@ -72,7 +73,7 @@ where fn feed(&mut self, input: Self::Input) -> Result<(), Self::Error> { let chunk_stats: Vec<_> = match input { Err(err @ traverse::Error::PackDecode { .. }) if !self.check.fatal_decode_error() => { - self.progress.lock().info(format!("Ignoring decode error: {}", err)); + get_mut(&self.progress).info(format!("Ignoring decode error: {}", err)); return Ok(()); } res => res, @@ -99,7 +100,7 @@ where ); add_decode_result(&mut self.stats.average, chunk_total); - self.progress.lock().set(self.entries_seen); + get_mut(&self.progress).set(self.entries_seen); if self.should_interrupt.load(Ordering::SeqCst) { return Err(Self::Error::Interrupted); @@ -113,7 +114,7 @@ where let elapsed_s = self.then.elapsed().as_secs_f32(); let objects_per_second = (self.entries_seen as f32 / elapsed_s) as u32; - self.progress.lock().info(format!( + get_mut(&self.progress).info(format!( "of {} objects done in {:.2}s ({} objects/s, ~{}/s)", self.entries_seen, elapsed_s, diff --git a/git-pack/src/index/traverse/with_lookup.rs b/git-pack/src/index/traverse/with_lookup.rs index 7c30c030d7b..e2d149bd4ae 100644 --- a/git-pack/src/index/traverse/with_lookup.rs +++ b/git-pack/src/index/traverse/with_lookup.rs @@ -36,6 +36,7 @@ mod options { } } } +use git_features::threading::{get_mut, MutableOnDemand, OwnShared}; use std::sync::atomic::Ordering; pub use options::Options; @@ -48,8 +49,8 @@ impl index::File { /// For more details, see the documentation on the [`traverse()`][index::File::traverse()] method. pub fn traverse_with_lookup( &self, - new_processor: impl Fn() -> Processor + Send + Sync, - new_cache: impl Fn() -> C + Send + Sync, + new_processor: impl Fn() -> Processor + Send + Clone, + new_cache: impl Fn() -> C + Send + Clone, mut progress: P, pack: &crate::data::File, Options { @@ -62,6 +63,7 @@ impl index::File { P: Progress, C: crate::cache::DecodeEntry, E: std::error::Error + Send + Sync + 'static, +
<P as Progress>
::SubProgress: Send + Sync, Processor: FnMut( git_object::Kind, &[u8], @@ -96,18 +98,21 @@ impl index::File { parallel::optimize_chunk_size_and_thread_limit(1000, Some(index_entries.len()), thread_limit, None); let there_are_enough_entries_to_process = || index_entries.len() > chunk_size * available_cores; let input_chunks = index_entries.chunks(chunk_size.max(chunk_size)); - let reduce_progress = parking_lot::Mutex::new({ + let reduce_progress = OwnShared::new(MutableOnDemand::new({ let mut p = progress.add_child("Traversing"); p.init(Some(self.num_objects() as usize), progress::count("objects")); p - }); - let state_per_thread = |index| { - ( - new_cache(), - new_processor(), - Vec::with_capacity(2048), // decode buffer - reduce_progress.lock().add_child(format!("thread {}", index)), // per thread progress - ) + })); + let state_per_thread = { + let reduce_progress = reduce_progress.clone(); + move |index| { + ( + new_cache(), + new_processor(), + Vec::with_capacity(2048), // decode buffer + get_mut(&reduce_progress).add_child(format!("thread {}", index)), // per thread progress + ) + } }; in_parallel_if( @@ -148,7 +153,7 @@ impl index::File { } Ok(stats) }, - Reducer::from_progress(&reduce_progress, pack.data_len(), check, &should_interrupt), + Reducer::from_progress(reduce_progress, pack.data_len(), check, &should_interrupt), ) }, ); diff --git a/git-pack/src/index/verify.rs b/git-pack/src/index/verify.rs index b847de69dbb..ce41670aa56 100644 --- a/git-pack/src/index/verify.rs +++ b/git-pack/src/index/verify.rs @@ -122,7 +122,7 @@ impl index::File { &crate::data::File, Mode, index::traverse::Algorithm, - impl Fn() -> C + Send + Sync, + impl Fn() -> C + Send + Clone, )>, thread_limit: Option, progress: Option
<P>
, @@ -133,6 +133,7 @@ impl index::File { > where P: Progress, +
<P as Progress>
::SubProgress: Sync, C: crate::cache::DecodeEntry, { let mut root = progress::DoOrDiscard::from(progress); diff --git a/git-pack/src/index/write/mod.rs b/git-pack/src/index/write/mod.rs index 4b0ee94bbcd..380cde6e0c0 100644 --- a/git-pack/src/index/write/mod.rs +++ b/git-pack/src/index/write/mod.rs @@ -55,18 +55,20 @@ impl crate::index::File { /// provides all bytes belonging to a pack entry writing them to the given mutable output `Vec`. /// It should return `None` if the entry cannot be resolved from the pack that produced the `entries` iterator, causing /// the write operation to fail. - pub fn write_data_iter_to_stream( + pub fn write_data_iter_to_stream( kind: crate::index::Version, make_resolver: F, entries: impl Iterator>, thread_limit: Option, - mut root_progress: impl Progress, + mut root_progress: P, out: impl io::Write, should_interrupt: &AtomicBool, ) -> Result where F: FnOnce() -> io::Result, - F2: for<'r> Fn(crate::data::EntryRange, &'r mut Vec) -> Option<()> + Send + Sync, + P: Progress + Sync, +
<P as Progress>
::SubProgress: Sync, + F2: for<'r> Fn(crate::data::EntryRange, &'r mut Vec) -> Option<()> + Send + Clone, { if kind != crate::index::Version::default() { return Err(Error::Unsupported(kind)); diff --git a/git-pack/tests/pack/index.rs b/git-pack/tests/pack/index.rs index b4ba6711932..c6ee1600e0f 100644 --- a/git-pack/tests/pack/index.rs +++ b/git-pack/tests/pack/index.rs @@ -85,7 +85,8 @@ mod file { for compressed in &[input::EntryDataMode::Crc32, input::EntryDataMode::KeepAndCrc32] { for (index_path, data_path) in V2_PACKS_AND_INDICES { let resolve = { - let buf = FileBuffer::open(fixture_path(data_path))?; + let buf = + git_features::threading::OwnShared::new(FileBuffer::open(fixture_path(data_path))?); move |entry: EntryRange, out: &mut Vec| { buf.get(entry.start as usize..entry.end as usize) .map(|slice| out.copy_from_slice(slice)) @@ -106,7 +107,7 @@ mod file { resolve: F, ) -> Result<(), Box> where - F: Fn(pack::data::EntryRange, &mut Vec) -> Option<()> + Send + Sync, + F: Fn(pack::data::EntryRange, &mut Vec) -> Option<()> + Send + Clone, { let pack_iter = pack::data::input::BytesToEntriesIter::new_from_header( io::BufReader::new(fs::File::open(fixture_path(data_path))?), @@ -119,7 +120,7 @@ mod file { let num_objects = pack_iter.len() as u32; let outcome = pack::index::File::write_data_iter_to_stream( desired_kind, - || Ok(resolve), + move || Ok(resolve), pack_iter, None, progress::Discard, diff --git a/git-ref/Cargo.toml b/git-ref/Cargo.toml index 272c8cf6edd..a4fb849f2ac 100644 --- a/git-ref/Cargo.toml +++ b/git-ref/Cargo.toml @@ -14,7 +14,7 @@ test = true [features] serde1 = ["serde", "git-hash/serde1", "git-actor/serde1", "git-object/serde1"] -internal-testing-git-features-parallel = ["git-features/parallel", "git-features/threading"] # test sorted parallel loose file traversal +internal-testing-git-features-parallel = ["git-features/parallel"] # test sorted parallel loose file traversal [[test]] name = "refs-parallel-fs-traversal" diff --git a/gitoxide-core/src/pack/explode.rs b/gitoxide-core/src/pack/explode.rs index c342a060ffb..7b5223edb94 100644 --- a/gitoxide-core/src/pack/explode.rs +++ b/gitoxide-core/src/pack/explode.rs @@ -154,11 +154,11 @@ pub struct Context { pub should_interrupt: Arc, } -pub fn pack_or_pack_index( +pub fn pack_or_pack_index
<P>
( pack_path: impl AsRef, object_path: Option>, check: SafetyCheck, - progress: Option, + progress: Option
<P>
, Context { thread_limit, delete_pack, @@ -166,7 +166,11 @@ pub fn pack_or_pack_index( verify, should_interrupt, }: Context, -) -> Result<()> { +) -> Result<()> +where + P: Progress + Sync, +
<P as Progress>
::SubProgress: Sync, +{ use anyhow::Context; let path = pack_path.as_ref(); diff --git a/gitoxide-core/src/pack/index.rs b/gitoxide-core/src/pack/index.rs index fb560759f04..1cce4f0e361 100644 --- a/gitoxide-core/src/pack/index.rs +++ b/gitoxide-core/src/pack/index.rs @@ -74,12 +74,17 @@ pub enum PathOrRead { Read(Box), } -pub fn from_pack( +pub fn from_pack
<P>
( pack: PathOrRead, directory: Option, - progress: impl Progress, + progress: P, ctx: Context<'static, impl io::Write>, -) -> anyhow::Result<()> { +) -> anyhow::Result<()> +where + P: Progress + Sync, +
<P as Progress>
::SubProgress: Sync, + <
<P as Progress>
::SubProgress as Progress>::SubProgress: Sync, +{ use anyhow::Context; let options = pack::bundle::write::Options { thread_limit: ctx.thread_limit, diff --git a/gitoxide-core/src/pack/receive.rs b/gitoxide-core/src/pack/receive.rs index a13f67f4f7e..cfc3df98fe4 100644 --- a/gitoxide-core/src/pack/receive.rs +++ b/gitoxide-core/src/pack/receive.rs @@ -14,6 +14,7 @@ use git_repository::{ transport, transport::client::Capabilities, }, + Progress, }; use crate::{remote::refs::JsonRef, OutputFormat}; @@ -317,14 +318,19 @@ fn write_raw_refs(refs: &[Ref], directory: PathBuf) -> std::io::Result<()> { Ok(()) } -fn receive_pack_blocking( +fn receive_pack_blocking( mut directory: Option, mut refs_directory: Option, ctx: &mut Context, input: impl io::BufRead, - progress: impl git_repository::Progress, + progress: P, refs: &[Ref], -) -> io::Result<()> { +) -> io::Result<()> +where + P: Progress + Sync, +
<P as Progress>
::SubProgress: Sync, + <
<P as Progress>
::SubProgress as Progress>::SubProgress: Sync, +{ let options = pack::bundle::write::Options { thread_limit: ctx.thread_limit, index_kind: pack::index::Version::V2, diff --git a/gitoxide-core/src/pack/verify.rs b/gitoxide-core/src/pack/verify.rs index b30b1f33561..995c32063e9 100644 --- a/gitoxide-core/src/pack/verify.rs +++ b/gitoxide-core/src/pack/verify.rs @@ -104,9 +104,9 @@ impl pack::cache::DecodeEntry for EitherCache { } } -pub fn pack_or_pack_index( +pub fn pack_or_pack_index( path: impl AsRef, - progress: Option, + progress: Option
<P>
, Context { mut out, mut err, @@ -120,6 +120,8 @@ pub fn pack_or_pack_index( where W1: io::Write, W2: io::Write, + P: Progress + Sync, +
<P as Progress>
::SubProgress: Sync, { let path = path.as_ref(); let ext = path.extension().and_then(|ext| ext.to_str()).ok_or_else(|| { diff --git a/src/plumbing/pretty/options.rs b/src/plumbing/pretty/options.rs index 9968fa0797a..cf3977ec517 100644 --- a/src/plumbing/pretty/options.rs +++ b/src/plumbing/pretty/options.rs @@ -1,6 +1,7 @@ use std::{ffi::OsString, path::PathBuf}; use clap::AppSettings; + use gitoxide_core as core; #[derive(Debug, clap::Parser)]
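The core of this change is twofold: the aliases in `git_features::threading` (`OwnShared`, `MutableOnDemand`, `get_mut`) are now gated on the `parallel` feature instead of a separate `threading` feature, and per-thread state such as progress handles, caches and pack-lookup closures is shared by cloning (`Send + Clone`) rather than by borrowing across threads (`Send + Sync`). The sketch below is only an illustration of that pattern: the alias names and the `new_thread_state` idiom are taken from the diff above, while the alias bodies use `std::sync`/`std::rc` for brevity (the crate wires `parallel` to `parking_lot` instead), and the `main` function with its `Vec<String>` stand-in for a progress sink is hypothetical. It assumes a crate that declares a `parallel` cargo feature.

// Sketch, not the crate's actual implementation.
#[cfg(feature = "parallel")]
mod threading {
    /// Shared ownership that may cross thread boundaries.
    pub type OwnShared<T> = std::sync::Arc<T>;
    /// Mutability on demand, guarded by a lock.
    pub type MutableOnDemand<T> = std::sync::Mutex<T>;

    /// Obtain exclusive access to the value behind the lock.
    pub fn get_mut<T>(v: &MutableOnDemand<T>) -> std::sync::MutexGuard<'_, T> {
        v.lock().expect("lock poisoned")
    }
}

#[cfg(not(feature = "parallel"))]
mod threading {
    /// Shared ownership without atomic reference counting.
    pub type OwnShared<T> = std::rc::Rc<T>;
    /// Mutability on demand without a lock.
    pub type MutableOnDemand<T> = std::cell::RefCell<T>;

    /// Obtain exclusive access to the value behind the cell.
    pub fn get_mut<T>(v: &MutableOnDemand<T>) -> std::cell::RefMut<'_, T> {
        v.borrow_mut()
    }
}

fn main() {
    use crate::threading::{get_mut, MutableOnDemand, OwnShared};

    // Share one mutable sink (standing in for the root progress instance) by
    // cloning the smart pointer into the per-thread state factory, instead of
    // borrowing it and thereby forcing a `Sync` bound on the closure.
    let progress = OwnShared::new(MutableOnDemand::new(Vec::<String>::new()));
    let new_thread_state = {
        let progress = progress.clone();
        move |thread_index: usize| get_mut(&progress).push(format!("thread {}", thread_index))
    };
    new_thread_state(0);
    assert_eq!(get_mut(&progress).len(), 1);
}

Single-threaded builds therefore never pay for atomic reference counts or locks, which is why the separate `threading` cargo feature could be folded into `parallel` above.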