Skip to content

Commit

Permalink
A mad attempt to use thread-local everywhere and avoid Sync… (#263)
Browse files Browse the repository at this point in the history
…which seems to work except for one caveat: Progress now demands to be
Sync even though there is no need for it, and it shouldn't have to be.

Have to go back and try to fix that, as it ultimately bubbles up to
every method that uses such methods generically, including trait
bounds which really shouldn't have to be that strict.
  • Loading branch information
Byron committed Nov 27, 2021
1 parent 82ea1b8 commit 0af5077
Show file tree
Hide file tree
Showing 23 changed files with 155 additions and 85 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ gitoxide-core-tools = ["gitoxide-core/organize", "gitoxide-core/estimate-hours"]
gitoxide-core-blocking-client = ["gitoxide-core/blocking-client"]
gitoxide-core-async-client = ["gitoxide-core/async-client", "futures-lite"]
http-client-curl = ["git-transport-for-configuration-only/http-client-curl"]
fast = ["git-features/threading", "git-features/parallel", "git-features/fast-sha1", "git-features/zlib-ng-compat"]
fast = ["git-features/parallel", "git-features/fast-sha1", "git-features/zlib-ng-compat"]

pretty-cli = ["clap",
"gitoxide-core/serde1",
Expand Down
5 changes: 2 additions & 3 deletions cargo-features.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,8 @@ All feature toggles are additive.
* Use scoped threads and channels to parallelize common workloads on multiple objects. If enabled, it is used everywhere
where it makes sense.
* As caches are likely to be used and instantiated per thread, more memory will be used on top of the costs for threads.
* **threading**
* If set, the `threading` module will contain thread-safe primitives for shared ownership and mutation, otherwise these will be their single threaded counterparts.
* This way, single-threaded applications don't have to pay for threaded primitives.
* The `threading` module will contain thread-safe primitives for shared ownership and mutation, otherwise these will be their single threaded counterparts.
* This way, single-threaded applications don't have to pay for threaded primitives.
* **crc32**
* provide a proven and fast `crc32` implementation.
* **io-pipe**
Expand Down
6 changes: 2 additions & 4 deletions git-features/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ test = false

[features]
default = []
threading = ["parking_lot"]
progress = ["prodash"]
parallel = ["crossbeam-utils", "crossbeam-channel", "num_cpus", "jwalk"]
parallel = ["crossbeam-utils", "crossbeam-channel", "num_cpus", "jwalk", "parking_lot"]
fast-sha1 = ["sha-1"]
io-pipe = ["bytes"]
crc32 = ["crc32fast"]
Expand Down Expand Up @@ -53,13 +52,12 @@ required-features = ["io-pipe"]
git-hash = { version ="^0.8.0", path = "../git-hash" }


# 'threading' feature
parking_lot = { version = "0.11.0", default-features = false, optional = true }

# 'parallel' feature
crossbeam-utils = { version = "0.8.5", optional = true }
crossbeam-channel = { version = "0.5.0", optional = true }
num_cpus = { version = "1.13.0", optional = true }
parking_lot = { version = "0.11.0", default-features = false, optional = true }

jwalk = { version = "0.6.0", optional = true }
walkdir = { version = "2.3.1", optional = true } # used when parallel is off
Expand Down
23 changes: 23 additions & 0 deletions git-features/src/parallel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ fn num_threads(thread_limit: Option<usize>) -> usize {
/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
///
/// For parameters, see the documentation of [`in_parallel()`]
#[cfg(feature = "parallel")]
pub fn in_parallel_if<I, S, O, R>(
condition: impl FnOnce() -> bool,
input: impl Iterator<Item = I> + Send,
Expand All @@ -149,6 +150,28 @@ where
}
}

/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
///
/// For parameters, see the documentation of [`in_parallel()`]
///
/// Note that the non-parallel version is equivalent to [`in_parallel()`].
// Serial fallback, compiled only when the `parallel` feature is disabled. It keeps the
// same signature as the parallel variant so callers build unchanged under either feature.
#[cfg(not(feature = "parallel"))]
pub fn in_parallel_if<I, S, O, R>(
    // Ignored in the serial build: with no threads there is no parallel/serial decision to make.
    _condition: impl FnOnce() -> bool,
    input: impl Iterator<Item = I>,
    thread_limit: Option<usize>,
    new_thread_state: impl Fn(usize) -> S,
    consume: impl Fn(I, &mut S) -> O,
    reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
    // NOTE(review): `Send` bounds appear kept for parity with the parallel variant's
    // requirements (its input is `Iterator + Send`) — confirm against the hidden where-clause.
    I: Send,
    O: Send,
{
    // Delegate directly to the single-threaded implementation.
    serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer)
}

///
pub mod reduce;
pub use reduce::Reduce;
4 changes: 2 additions & 2 deletions git-features/src/threading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
//! That way, single-threaded applications will not have to use thread-safe primitives, and simply do not specify the 'threading' feature.

#[cfg(feature = "threading")]
#[cfg(feature = "parallel")]
mod _impl {
use std::sync::Arc;

Expand Down Expand Up @@ -36,7 +36,7 @@ mod _impl {
}
}

#[cfg(not(feature = "threading"))]
#[cfg(not(feature = "parallel"))]
mod _impl {
use std::{
cell::{Ref, RefCell, RefMut},
Expand Down
3 changes: 2 additions & 1 deletion git-pack/src/bundle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ mod verify {
&self,
verify_mode: crate::index::verify::Mode,
traversal: crate::index::traverse::Algorithm,
make_pack_lookup_cache: impl Fn() -> C + Send + Sync,
make_pack_lookup_cache: impl Fn() -> C + Send + Clone,
thread_limit: Option<usize>,
progress: Option<P>,
should_interrupt: Arc<AtomicBool>,
Expand All @@ -51,6 +51,7 @@ mod verify {
>
where
P: Progress,
<P as Progress>::SubProgress: Sync,
C: crate::cache::DecodeEntry,
{
self.index.verify_integrity(
Expand Down
37 changes: 26 additions & 11 deletions git-pack/src/bundle/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,19 @@ impl crate::Bundle {
/// * the resulting pack may be empty, that is, contains zero objects in some situations. This is a valid reply by a server and should
/// be accounted for.
/// - Empty packs always have the same name and not handling this case will result in at most one superfluous pack.
pub fn write_to_directory(
pub fn write_to_directory<P>(
pack: impl io::BufRead,
directory: Option<impl AsRef<Path>>,
mut progress: impl Progress,
mut progress: P,
should_interrupt: &AtomicBool,
thin_pack_base_object_lookup_fn: Option<ThinPackLookupFn>,
options: Options,
) -> Result<Outcome, Error> {
) -> Result<Outcome, Error>
where
P: Progress + Sync,
<P as Progress>::SubProgress: Sync,
<<P as Progress>::SubProgress as Progress>::SubProgress: Sync,
{
let mut read_progress = progress.add_child("read pack");
read_progress.init(None, progress::bytes());
let pack = progress::Read {
Expand Down Expand Up @@ -131,15 +136,20 @@ impl crate::Bundle {
/// As it sends portions of the input to a thread it requires the 'static lifetime for the interrupt flags. This can only
/// be satisfied by a static AtomicBool which is only suitable for programs that only run one of these operations at a time
/// or don't mind that all of them abort when the flag is set.
pub fn write_to_directory_eagerly(
pub fn write_to_directory_eagerly<P>(
pack: impl io::Read + Send + 'static,
pack_size: Option<u64>,
directory: Option<impl AsRef<Path>>,
mut progress: impl Progress,
mut progress: P,
should_interrupt: &'static AtomicBool,
thin_pack_base_object_lookup_fn: Option<ThinPackLookupFnSend>,
options: Options,
) -> Result<Outcome, Error> {
) -> Result<Outcome, Error>
where
P: Progress + Sync,
<P as Progress>::SubProgress: Sync,
<<P as Progress>::SubProgress as Progress>::SubProgress: Sync,
{
let mut read_progress = progress.add_child("read pack");
read_progress.init(pack_size.map(|s| s as usize), progress::bytes());
let pack = progress::Read {
Expand Down Expand Up @@ -212,9 +222,9 @@ impl crate::Bundle {
})
}

fn inner_write(
fn inner_write<P>(
directory: Option<impl AsRef<Path>>,
mut progress: impl Progress,
mut progress: P,
Options {
thread_limit,
iteration_mode: _,
Expand All @@ -223,7 +233,12 @@ impl crate::Bundle {
data_file: Arc<parking_lot::Mutex<git_tempfile::Handle<Writable>>>,
pack_entries_iter: impl Iterator<Item = Result<data::input::Entry, data::input::Error>>,
should_interrupt: &AtomicBool,
) -> Result<(crate::index::write::Outcome, Option<PathBuf>, Option<PathBuf>), Error> {
) -> Result<(crate::index::write::Outcome, Option<PathBuf>, Option<PathBuf>), Error>
where
P: Progress + Sync,
<P as Progress>::SubProgress: Sync,
<<P as Progress>::SubProgress as Progress>::SubProgress: Sync,
{
let indexing_progress = progress.add_child("create index file");
Ok(match directory {
Some(directory) => {
Expand Down Expand Up @@ -280,8 +295,8 @@ impl crate::Bundle {

fn new_pack_file_resolver(
data_file: Arc<parking_lot::Mutex<git_tempfile::Handle<Writable>>>,
) -> io::Result<impl Fn(data::EntryRange, &mut Vec<u8>) -> Option<()> + Send + Sync> {
let mapped_file = FileBuffer::open(data_file.lock().with_mut(|f| f.path().to_owned())?)?;
) -> io::Result<impl Fn(data::EntryRange, &mut Vec<u8>) -> Option<()> + Send + Clone> {
let mapped_file = Arc::new(FileBuffer::open(data_file.lock().with_mut(|f| f.path().to_owned())?)?);
let pack_data_lookup = move |range: std::ops::Range<u64>, out: &mut Vec<u8>| -> Option<()> {
mapped_file
.get(range.start as usize..range.end as usize)
Expand Down
42 changes: 24 additions & 18 deletions git-pack/src/cache/delta/traverse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
sync::atomic::{AtomicBool, Ordering},
};

use git_features::threading::{get_mut, MutableOnDemand, OwnShared};
use git_features::{
parallel,
parallel::in_parallel_if,
Expand Down Expand Up @@ -82,41 +83,46 @@ where
thread_limit: Option<usize>,
should_interrupt: &AtomicBool,
pack_entries_end: u64,
new_thread_state: impl Fn() -> S + Send + Sync,
new_thread_state: impl Fn() -> S + Send + Clone,
inspect_object: MBFN,
) -> Result<VecDeque<Item<T>>, Error>
where
F: for<'r> Fn(EntryRange, &'r mut Vec<u8>) -> Option<()> + Send + Sync,
P: Progress + Send,
MBFN: Fn(&mut T, &mut <P as Progress>::SubProgress, Context<'_, S>) -> Result<(), E> + Send + Sync,
F: for<'r> Fn(EntryRange, &'r mut Vec<u8>) -> Option<()> + Send + Clone,
P: Progress + Send + Sync,
MBFN: Fn(&mut T, &mut <P as Progress>::SubProgress, Context<'_, S>) -> Result<(), E> + Send + Clone,
E: std::error::Error + Send + Sync + 'static,
{
self.set_pack_entries_end(pack_entries_end);
let (chunk_size, thread_limit, _) = parallel::optimize_chunk_size_and_thread_limit(1, None, thread_limit, None);
let object_progress = parking_lot::Mutex::new(object_progress);
let object_progress = OwnShared::new(MutableOnDemand::new(object_progress));

let num_objects = self.items.len();
in_parallel_if(
should_run_in_parallel,
self.iter_root_chunks(chunk_size),
thread_limit,
|thread_index| {
(
Vec::<u8>::with_capacity(4096),
object_progress.lock().add_child(format!("thread {}", thread_index)),
new_thread_state(),
)
{
let object_progress = object_progress.clone();
move |thread_index| {
(
Vec::<u8>::with_capacity(4096),
get_mut(&object_progress).add_child(format!("thread {}", thread_index)),
new_thread_state(),
resolve.clone(),
inspect_object.clone(),
)
}
},
|root_nodes, state| resolve::deltas(root_nodes, state, &resolve, &inspect_object),
Reducer::new(num_objects, &object_progress, size_progress, should_interrupt),
move |root_nodes, state| resolve::deltas(root_nodes, state),
Reducer::new(num_objects, object_progress, size_progress, should_interrupt),
)?;
Ok(self.into_items())
}
}

struct Reducer<'a, P> {
item_count: usize,
progress: &'a parking_lot::Mutex<P>,
progress: OwnShared<MutableOnDemand<P>>,
start: std::time::Instant,
size_progress: P,
should_interrupt: &'a AtomicBool,
Expand All @@ -128,11 +134,11 @@ where
{
pub fn new(
num_objects: usize,
progress: &'a parking_lot::Mutex<P>,
progress: OwnShared<MutableOnDemand<P>>,
mut size_progress: P,
should_interrupt: &'a AtomicBool,
) -> Self {
progress.lock().init(Some(num_objects), progress::count("objects"));
get_mut(&progress).init(Some(num_objects), progress::count("objects"));
size_progress.init(None, progress::bytes());
Reducer {
item_count: 0,
Expand All @@ -157,15 +163,15 @@ where
let (num_objects, decompressed_size) = input?;
self.item_count += num_objects;
self.size_progress.inc_by(decompressed_size as usize);
self.progress.lock().set(self.item_count);
get_mut(&self.progress).set(self.item_count);
if self.should_interrupt.load(Ordering::SeqCst) {
return Err(Error::Interrupted);
}
Ok(())
}

fn finalize(mut self) -> Result<Self::Output, Self::Error> {
self.progress.lock().show_throughput(self.start);
get_mut(&self.progress).show_throughput(self.start);
self.size_progress.show_throughput(self.start);
Ok(())
}
Expand Down
6 changes: 2 additions & 4 deletions git-pack/src/cache/delta/traverse/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@ use crate::{

pub(crate) fn deltas<T, F, P, MBFN, S, E>(
nodes: crate::cache::delta::Chunk<'_, T>,
(bytes_buf, ref mut progress, state): &mut (Vec<u8>, P, S),
resolve: F,
modify_base: MBFN,
(bytes_buf, ref mut progress, state, resolve, modify_base): &mut (Vec<u8>, P, S, F, MBFN),
) -> Result<(usize, u64), Error>
where
F: for<'r> Fn(EntryRange, &'r mut Vec<u8>) -> Option<()> + Send + Sync,
F: for<'r> Fn(EntryRange, &'r mut Vec<u8>) -> Option<()> + Send + Clone,
P: Progress,
MBFN: Fn(&mut T, &mut P, Context<'_, S>) -> Result<(), E>,
T: Default,
Expand Down
2 changes: 1 addition & 1 deletion git-pack/src/data/output/entry/iter_from_counts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::data::{output, output::ChunkId};
pub fn iter_from_counts<Find, Cache>(
mut counts: Vec<output::Count>,
db: Find,
make_cache: impl Fn() -> Cache + Send + Clone + Sync + 'static,
make_cache: impl Fn() -> Cache + Send + Clone + 'static,
mut progress: impl Progress,
Options {
version,
Expand Down
5 changes: 3 additions & 2 deletions git-pack/src/index/traverse/indexed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ impl index::File {
&self,
check: SafetyCheck,
thread_limit: Option<usize>,
new_processor: impl Fn() -> Processor + Send + Sync,
new_processor: impl Fn() -> Processor + Send + Clone,
mut progress: P,
pack: &crate::data::File,
should_interrupt: Arc<AtomicBool>,
) -> Result<(git_hash::ObjectId, index::traverse::Outcome, P), Error<E>>
where
P: Progress,
<P as Progress>::SubProgress: Sync,
Processor: FnMut(
git_object::Kind,
&[u8],
Expand Down Expand Up @@ -78,7 +79,7 @@ impl index::File {
thread_limit,
&should_interrupt,
pack.pack_end() as u64,
|| new_processor(),
move || new_processor(),
|data,
progress,
Context {
Expand Down
5 changes: 3 additions & 2 deletions git-pack/src/index/traverse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ impl index::File {
&self,
pack: &crate::data::File,
progress: Option<P>,
new_processor: impl Fn() -> Processor + Send + Sync,
new_cache: impl Fn() -> C + Send + Sync,
new_processor: impl Fn() -> Processor + Send + Clone,
new_cache: impl Fn() -> C + Send + Clone,
Options {
algorithm,
thread_limit,
Expand All @@ -90,6 +90,7 @@ impl index::File {
) -> Result<(git_hash::ObjectId, Outcome, Option<P>), Error<E>>
where
P: Progress,
<P as Progress>::SubProgress: Sync,
C: crate::cache::DecodeEntry,
E: std::error::Error + Send + Sync + 'static,
Processor: FnMut(
Expand Down
Loading

0 comments on commit 0af5077

Please sign in to comment.