Merge branch 'integrate-filtering'
Byron committed Jul 9, 2023
2 parents c3ee57b + 8cc106a commit b19a56d
Showing 107 changed files with 3,408 additions and 547 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock


51 changes: 38 additions & 13 deletions gitoxide-core/src/index/checkout.rs
@@ -62,6 +62,10 @@ pub fn checkout_exclusive(
overwrite_existing: false,
keep_going,
thread_limit,
filters: repo
.as_ref()
.and_then(|repo| repo.filter_pipeline(None).ok().map(|t| t.0.into_parts().0))
.unwrap_or_default(),
..Default::default()
};

@@ -79,6 +83,8 @@ pub fn checkout_exclusive(
collisions,
files_updated,
bytes_written,
delayed_paths_unknown,
delayed_paths_unprocessed,
} = match repo {
Some(repo) => gix::worktree::checkout(
&mut index,
@@ -129,7 +135,8 @@ pub fn checkout_exclusive(
.then(|| {
format!(
" of {}",
entries_for_checkout.saturating_sub(errors.len() + collisions.len())
entries_for_checkout
.saturating_sub(errors.len() + collisions.len() + delayed_paths_unprocessed.len())
)
})
.unwrap_or_default(),
@@ -138,20 +145,38 @@
.display(bytes_written as usize, None, None)
));

if !(collisions.is_empty() && errors.is_empty()) {
let mut messages = Vec::new();
if !errors.is_empty() {
messages.push(format!("kept going through {} error(s)", errors.len()));
for record in errors {
writeln!(err, "{}: {}", record.path, record.error).ok();
}
let mut messages = Vec::new();
if !errors.is_empty() {
messages.push(format!("kept going through {} error(s)", errors.len()));
for record in errors {
writeln!(err, "{}: {}", record.path, record.error).ok();
}
}
if !collisions.is_empty() {
messages.push(format!("encountered {} collision(s)", collisions.len()));
for col in collisions {
writeln!(err, "{}: collision ({:?})", col.path, col.error_kind).ok();
}
if !collisions.is_empty() {
messages.push(format!("encountered {} collision(s)", collisions.len()));
for col in collisions {
writeln!(err, "{}: collision ({:?})", col.path, col.error_kind).ok();
}
}
if !delayed_paths_unknown.is_empty() {
messages.push(format!(
"A delayed process provided us with {} paths we never sent to it",
delayed_paths_unknown.len()
));
for unknown in delayed_paths_unknown {
writeln!(err, "{unknown}: unknown").ok();
}
}
if !delayed_paths_unprocessed.is_empty() {
messages.push(format!(
"A delayed process forgot to process {} paths",
delayed_paths_unprocessed.len()
));
for unprocessed in delayed_paths_unprocessed {
writeln!(err, "{unprocessed}: unprocessed and forgotten").ok();
}
}
if !messages.is_empty() {
bail!(
"One or more errors occurred - checkout is incomplete: {}",
messages.join(", ")
2 changes: 1 addition & 1 deletion gix-attributes/src/search/mod.rs
@@ -89,7 +89,7 @@ pub enum MatchKind {
}

/// The result of a search, containing all matching attributes.
#[derive(Default)]
#[derive(Default, Clone)]
pub struct Outcome {
/// The list of all available attributes, in ascending order. Each slot's index corresponds to an attribute with that order, i.e.
/// `arr[attr.id] = <attr info>`.
4 changes: 4 additions & 0 deletions gix-attributes/src/search/outcome.rs
@@ -28,6 +28,10 @@ impl Outcome {
}) {
self.matches_by_id[order].macro_attributes = macro_attributes.clone()
}

for (name, id) in self.selected.iter_mut().filter(|(_, id)| id.is_none()) {
*id = collection.name_to_meta.get(name.as_str()).map(|meta| meta.id)
}
}
self.reset();
}
1 change: 1 addition & 0 deletions gix-attributes/src/search/refmap.rs
@@ -8,6 +8,7 @@ use std::{
};

pub(crate) type RefMapKey = u64;
#[derive(Clone)]
pub(crate) struct RefMap<T>(BTreeMap<RefMapKey, T>);

impl<T> Default for RefMap<T> {
2 changes: 1 addition & 1 deletion gix-command/Cargo.toml
@@ -12,7 +12,7 @@ rust-version = "1.65"
doctest = false

[dependencies]
bstr = "1.3.0"
bstr = { version = "1.5.0", default-features = false, features = ["std"] }

[dev-dependencies]
gix-testtools = { path = "../tests/tools" }
2 changes: 1 addition & 1 deletion gix-config-value/Cargo.toml
@@ -19,7 +19,7 @@ serde = ["dep:serde", "bstr/serde"]
gix-path = { version = "^0.8.3", path = "../gix-path" }

thiserror = "1.0.32"
bstr = "1.0.1"
bstr = { version = "1.0.1", default-features = false, features = ["std"] }
serde = { version = "1.0.114", optional = true, default-features = false, features = ["derive"]}
bitflags = "2"

74 changes: 74 additions & 0 deletions gix-features/src/parallel/in_parallel.rs
@@ -102,6 +102,80 @@ where
})
}

/// Read items from `input` and `consume` them in multiple threads,
whose output is collected by a `reducer`. Its task is to
aggregate these outputs into the final result returned by this function, with the benefit of not having to be thread-safe.
Call `finalize` to finish the computation, once per thread, if there was no error sending results earlier.
///
* If `thread_limit` is `Some`, the given number of threads will be used. If `None`, all logical cores will be used.
* `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume`.
/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially
/// created by `new_thread_state(…)`.
/// * `finalize(State) -> Output` is called to potentially process remaining work that was placed in `State`.
/// * For `reducer`, see the [`Reduce`] trait
pub fn in_parallel_with_finalize<I, S, O, R>(
input: impl Iterator<Item = I> + Send,
thread_limit: Option<usize>,
new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
consume: impl FnMut(I, &mut S) -> O + Send + Clone,
finalize: impl FnOnce(S) -> O + Send + Clone,
mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
R: Reduce<Input = O>,
I: Send,
O: Send,
{
let num_threads = num_threads(thread_limit);
std::thread::scope(move |s| {
let receive_result = {
let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
let (send_result, receive_result) = crossbeam_channel::bounded::<O>(num_threads);
for thread_id in 0..num_threads {
std::thread::Builder::new()
.name(format!("gitoxide.in_parallel.produce.{thread_id}"))
.spawn_scoped(s, {
let send_result = send_result.clone();
let receive_input = receive_input.clone();
let new_thread_state = new_thread_state.clone();
let mut consume = consume.clone();
let finalize = finalize.clone();
move || {
let mut state = new_thread_state(thread_id);
let mut can_send = true;
for item in receive_input {
if send_result.send(consume(item, &mut state)).is_err() {
can_send = false;
break;
}
}
if can_send {
send_result.send(finalize(state)).ok();
}
}
})
.expect("valid name");
}
std::thread::Builder::new()
.name("gitoxide.in_parallel.feed".into())
.spawn_scoped(s, move || {
for item in input {
if send_input.send(item).is_err() {
break;
}
}
})
.expect("valid name");
receive_result
};

for item in receive_result {
drop(reducer.feed(item)?);
}
reducer.finalize()
})
}
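
To make the new API's shape concrete, here is a minimal, std-only sketch of the pattern `in_parallel_with_finalize` implements: per-thread state fed by `consume`, one trailing output produced by `finalize` per worker, and aggregation on a single thread. It deliberately avoids the gix-features `Reduce` trait and `crossbeam-channel`; the chunked input hand-off and all names are illustrative assumptions, not part of the crate.

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let inputs: Vec<String> = (0..100).map(|i| format!("item-{i}")).collect();
    let (send_output, receive_output) = mpsc::channel::<usize>();

    thread::scope(|s| {
        // Pre-split the input instead of sharing an input channel, to keep the sketch small.
        for chunk in inputs.chunks(25) {
            let send_output = send_output.clone();
            s.spawn(move || {
                let mut items_seen = 0usize; // plays the role of `new_thread_state(…)`
                for item in chunk {
                    items_seen += 1;
                    send_output.send(item.len()).ok(); // `consume(item, &mut state) -> Output`
                }
                send_output.send(items_seen).ok(); // `finalize(state) -> Output`, once per worker
            });
        }
        drop(send_output); // workers hold clones; drop ours so the receiving iterator can terminate

        // The "reducer" runs on this thread only, so it needs no synchronization at all.
        let total: usize = receive_output.iter().sum();
        println!("aggregated {total} units from all workers");
    });
}
```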

/// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
/// This is only good for operations where near-random access isn't detrimental, so it's not usually great
/// for file-io as it won't make use of sorted inputs well.
6 changes: 4 additions & 2 deletions gix-features/src/parallel/mod.rs
@@ -35,11 +35,13 @@
#[cfg(feature = "parallel")]
mod in_parallel;
#[cfg(feature = "parallel")]
pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads, Scope};
pub use in_parallel::{
build_thread, in_parallel, in_parallel_with_finalize, in_parallel_with_slice, join, threads, Scope,
};

mod serial;
#[cfg(not(feature = "parallel"))]
pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads, Scope};
pub use serial::{build_thread, in_parallel, in_parallel_with_finalize, in_parallel_with_slice, join, threads, Scope};

mod in_order;
pub use in_order::{InOrderIter, SequenceId};
31 changes: 31 additions & 0 deletions gix-features/src/parallel/serial.rs
@@ -141,3 +141,34 @@ where
}
reducer.finalize()
}

/// Read items from `input` and `consume` them in multiple threads,
whose output is collected by a `reducer`. Its task is to
aggregate these outputs into the final result returned by this function, with the benefit of not having to be thread-safe.
Call `finalize` to finish the computation, once per thread, if there was no error sending results earlier.
///
* If `thread_limit` is `Some`, the given number of threads will be used. If `None`, all logical cores will be used.
* `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume`.
/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially
/// created by `new_thread_state(…)`.
/// * `finalize(State) -> Output` is called to potentially process remaining work that was placed in `State`.
/// * For `reducer`, see the [`Reduce`] trait
#[cfg(not(feature = "parallel"))]
pub fn in_parallel_with_finalize<I, S, O, R>(
input: impl Iterator<Item = I>,
_thread_limit: Option<usize>,
new_thread_state: impl FnOnce(usize) -> S,
mut consume: impl FnMut(I, &mut S) -> O,
finalize: impl FnOnce(S) -> O + Send + Clone,
mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
R: Reduce<Input = O>,
{
let mut state = new_thread_state(0);
for item in input {
drop(reducer.feed(consume(item, &mut state))?);
}
reducer.feed(finalize(state))?;
reducer.finalize()
}
3 changes: 3 additions & 0 deletions gix-filter/Cargo.toml
@@ -19,12 +19,15 @@ gix-command = { version = "^0.2.6", path = "../gix-command" }
gix-quote = { version = "^0.4.5", path = "../gix-quote" }
gix-path = { version = "^0.8.3", path = "../gix-path" }
gix-packetline = { package = "gix-packetline-blocking", version = "^0.16.3", path = "../gix-packetline-blocking" }
gix-attributes = { version = "0.14.1", path = "../gix-attributes" }

encoding_rs = "0.8.32"
bstr = { version = "1.5.0", default-features = false, features = ["std"] }
thiserror = "1.0.38"
smallvec = "1.10.0"


[dev-dependencies]
once_cell = "1.18.0"
gix-testtools = { path = "../tests/tools" }
gix-worktree = { version = "0.21.1", path = "../gix-worktree" }
File renamed without changes.
25 changes: 15 additions & 10 deletions gix-filter/src/driver/apply.rs
@@ -5,14 +5,18 @@ use bstr::{BStr, BString};
use std::collections::HashMap;

/// What to do if delay is supported by a process filter.
#[derive(Debug, Copy, Clone)]
#[derive(Default, Debug, Copy, Clone)]
pub enum Delay {
/// Use delayed processing for this entry.
///
/// Note that it's up to the filter to determine whether or not the processing should be delayed.
#[default]
Allow,
/// Do not delay the processing, and force it to happen immediately. In this case, no delayed processing will occur
/// even if the filter supports it.
///
/// This is the default as it requires no special precautions to be taken by the caller as
/// outputs will be produced immediately.
Forbid,
}

@@ -40,11 +44,11 @@ pub enum Error {

/// Additional information for use in the [`State::apply()`] method.
#[derive(Debug, Copy, Clone)]
pub struct Context<'a> {
pub struct Context<'a, 'b> {
/// The repo-relative path of the entry currently being processed, using slashes as separators.
pub rela_path: &'a BStr,
/// The name of the reference that `HEAD` is pointing to. It's passed to `process` filters if present.
pub ref_name: Option<&'a BStr>,
pub ref_name: Option<&'b BStr>,
/// The root-level tree that contains the current entry directly or indirectly, or the commit owning the tree (if available).
///
/// This is passed to `process` filters if present.
@@ -58,6 +62,7 @@ pub struct Context<'a> {
impl State {
/// Apply `operation` of `driver` to the bytes read from `src` and return a reader to immediately consume the output
/// produced by the filter. `rela_path` is the repo-relative path of the entry to handle.
/// It's possible that the filter stays inactive, in which case the `src` isn't consumed and has to be used by the caller.
///
/// Each call to this method will cause the corresponding filter to be invoked unless `driver` indicates a `process` filter,
/// which is only launched once and maintained using this state.
@@ -74,9 +79,9 @@ impl State {
pub fn apply<'a>(
&'a mut self,
driver: &Driver,
src: impl std::io::Read,
src: &mut impl std::io::Read,
operation: Operation,
ctx: Context<'_>,
ctx: Context<'_, '_>,
) -> Result<Option<Box<dyn std::io::Read + 'a>>, Error> {
match self.apply_delayed(driver, src, operation, Delay::Forbid, ctx)? {
Some(MaybeDelayed::Delayed(_)) => {
@@ -94,14 +99,14 @@ impl State {
pub fn apply_delayed<'a>(
&'a mut self,
driver: &Driver,
mut src: impl std::io::Read,
src: &mut impl std::io::Read,
operation: Operation,
delay: Delay,
ctx: Context<'_>,
ctx: Context<'_, '_>,
) -> Result<Option<MaybeDelayed<'a>>, Error> {
match self.process(driver, operation, ctx.rela_path)? {
match self.maybe_launch_process(driver, operation, ctx.rela_path)? {
Some(Process::SingleFile { mut child, command }) => {
std::io::copy(&mut src, &mut child.stdin.take().expect("configured"))?;
std::io::copy(src, &mut child.stdin.take().expect("configured"))?;
Ok(Some(MaybeDelayed::Immediate(Box::new(ReadFilterOutput {
inner: child.stdout.take(),
child: driver.required.then_some((child, command)),
@@ -179,7 +184,7 @@ impl State {
}
}

/// A utility type to represent delayed or immediate apply-filter results.
/// A type to represent delayed or immediate apply-filter results.
pub enum MaybeDelayed<'a> {
/// Using the delayed protocol, this entry has been sent to a long-running process and needs to be
/// checked for again, later, using the [`driver::Key`] to refer to the filter who owes a response.
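
One of the new doc lines in `apply.rs` above points out that a filter may stay inactive, in which case `src` is not consumed and remains the caller's responsibility. The following self-contained sketch shows that caller-side pattern; `maybe_filter` and `read_possibly_filtered` are hypothetical stand-ins that only borrow the `Option<Box<dyn Read + '_>>` shape from the signature shown in the diff, not the real `State::apply()` (which additionally needs a `Driver` and a `Context`).

```rust
use std::io::Read;

// Hypothetical stand-in mirroring the `Option<Box<dyn Read + '_>>` return shape:
// `Some(reader)` if a filter produced output, `None` if it stayed inactive.
fn maybe_filter<'a>(src: &'a mut dyn Read, active: bool) -> std::io::Result<Option<Box<dyn Read + 'a>>> {
    if active {
        Ok(Some(Box::new(src))) // pretend this boxed reader is the filter's output stream
    } else {
        Ok(None) // inactive: `src` was not consumed and stays usable by the caller
    }
}

fn read_possibly_filtered(src: &mut dyn Read, active: bool, out: &mut Vec<u8>) -> std::io::Result<()> {
    if let Some(mut filtered) = maybe_filter(src, active)? {
        filtered.read_to_end(out)?; // the filter ran: consume its output
        return Ok(());
    }
    // The filter stayed inactive, so the caller reads the untouched source itself.
    src.read_to_end(out)?;
    Ok(())
}

fn main() -> std::io::Result<()> {
    let mut out = Vec::new();
    read_possibly_filtered(&mut std::io::Cursor::new(b"unfiltered".to_vec()), false, &mut out)?;
    assert_eq!(out.as_slice(), b"unfiltered");
    Ok(())
}
```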