diff --git a/Cargo.lock b/Cargo.lock index 026015829d..f20177722c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,6 +11,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + [[package]] name = "base64" version = "0.20.0" @@ -78,6 +84,30 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae211234986c545741a7dc064309f67ee1e5ad243d0e48335adc0484d960bcc7" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", + "scopeguard", +] + [[package]] name = "crossbeam-utils" version = "0.8.16" @@ -224,6 +254,7 @@ name = "ignore" version = "0.4.20" dependencies = [ "crossbeam-channel", + "crossbeam-deque", "globset", "lazy_static", "log", @@ -309,6 +340,15 @@ dependencies = [ "libc", ] +[[package]] +name = "memoffset" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" +dependencies = [ + "autocfg", +] + [[package]] name = "once_cell" version = "1.18.0" @@ -434,6 +474,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "serde" version = "1.0.183" diff --git a/crates/ignore/Cargo.toml b/crates/ignore/Cargo.toml index a9495aa334..c659bb2d42 100644 --- a/crates/ignore/Cargo.toml +++ b/crates/ignore/Cargo.toml @@ -19,6 +19,7 @@ name = "ignore" bench = false [dependencies] +crossbeam-deque = "0.8.3" globset = { version = "0.4.10", path = "../globset" } lazy_static = "1.1" log = "0.4.5" diff --git a/crates/ignore/src/walk.rs b/crates/ignore/src/walk.rs index 1f7d06e55c..450d674407 100644 --- a/crates/ignore/src/walk.rs +++ b/crates/ignore/src/walk.rs @@ -3,13 +3,15 @@ use std::ffi::OsStr; use std::fmt; use std::fs::{self, FileType, Metadata}; use std::io; +use std::iter; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::thread; use std::time::Duration; use std::vec; +use crossbeam_deque::{Stealer, Worker as Deque}; use same_file::Handle; use walkdir::{self, WalkDir}; @@ -1228,9 +1230,8 @@ impl WalkParallel { /// can be merged together into a single data structure. pub fn visit(mut self, builder: &mut dyn ParallelVisitorBuilder<'_>) { let threads = self.threads(); - let stack = Arc::new(Mutex::new(vec![])); + let mut stack = vec![]; { - let mut stack = stack.lock().unwrap(); let mut visitor = builder.build(); let mut paths = Vec::new().into_iter(); std::mem::swap(&mut paths, &mut self.paths); @@ -1280,14 +1281,14 @@ impl WalkParallel { } // Create the workers and then wait for them to finish. let quit_now = Arc::new(AtomicBool::new(false)); - let num_pending = - Arc::new(AtomicUsize::new(stack.lock().unwrap().len())); + let num_pending = Arc::new(AtomicUsize::new(stack.len())); + let stacks = Stack::make_stacks(threads, stack); std::thread::scope(|s| { - let mut handles = vec![]; - for _ in 0..threads { - let worker = Worker { + let handles: Vec<_> = stacks + .into_iter() + .map(|stack| Worker { visitor: builder.build(), - stack: stack.clone(), + stack, quit_now: quit_now.clone(), num_pending: num_pending.clone(), max_depth: self.max_depth, @@ -1295,9 +1296,9 @@ impl WalkParallel { follow_links: self.follow_links, skip: self.skip.clone(), filter: self.filter.clone(), - }; - handles.push(s.spawn(|| worker.run())); - } + }) + .map(|worker| s.spawn(|| worker.run())) + .collect(); for handle in handles { handle.join().unwrap(); } @@ -1387,6 +1388,70 @@ impl Work { } } +/// A work-stealing stack. +struct Stack { + /// This thread's index. + index: usize, + /// The thread-local stack. + deque: Deque, + /// The work stealers. + stealers: Arc<[Stealer]>, +} + +impl Stack { + /// Create a stack for each thread. + fn make_stacks(threads: usize, msgs: Vec) -> Vec { + // Use new_lifo() to ensure each worker operates depth-first, not breadth-first + let deques: Vec<_> = + iter::repeat_with(Deque::new_lifo).take(threads).collect(); + + let stealers: Vec<_> = deques.iter().map(Deque::stealer).collect(); + let stealers: Arc<[_]> = stealers.into(); + + let stacks: Vec<_> = deques + .into_iter() + .enumerate() + .map(|(index, deque)| Self { + index, + deque, + stealers: stealers.clone(), + }) + .collect(); + + // Distribute the initial messages round-robin amongst the stacks + msgs.into_iter() + .zip(stacks.iter().cycle()) + .for_each(|(m, s)| s.push(m)); + + stacks + } + + /// Push a message. + fn push(&self, msg: Message) { + self.deque.push(msg); + } + + /// Pop a message. + fn pop(&self) -> Option { + self.deque.pop().or_else(|| self.steal()) + } + + /// Steal a message from another queue. + fn steal(&self) -> Option { + // For fairness, try to steal from index - 1, then index - 2, ... 0, + // then wrap around to len - 1, len - 2, ... index + 1. + let (left, right) = self.stealers.split_at(self.index); + // Don't steal from ourselves + let right = &right[1..]; + + left.iter() + .rev() + .chain(right.iter().rev()) + .map(|s| s.steal_batch_and_pop(&self.deque)) + .find_map(|s| s.success()) + } +} + /// A worker is responsible for descending into directories, updating the /// ignore matchers, producing new work and invoking the caller's callback. /// @@ -1400,7 +1465,7 @@ struct Worker<'s> { /// directories in depth first order. This can substantially reduce peak /// memory usage by keeping both the number of files path and gitignore /// matchers in memory lower. - stack: Arc>>, + stack: Stack, /// Whether all workers should terminate at the next opportunity. Note /// that we need this because we don't want other `Work` to be done after /// we quit. We wouldn't need this if have a priority channel. @@ -1665,20 +1730,17 @@ impl<'s> Worker<'s> { /// Send work. fn send(&self, work: Work) { self.num_pending.fetch_add(1, Ordering::SeqCst); - let mut stack = self.stack.lock().unwrap(); - stack.push(Message::Work(work)); + self.stack.push(Message::Work(work)); } /// Send a quit message. fn send_quit(&self) { - let mut stack = self.stack.lock().unwrap(); - stack.push(Message::Quit); + self.stack.push(Message::Quit); } /// Receive work. fn recv(&self) -> Option { - let mut stack = self.stack.lock().unwrap(); - stack.pop() + self.stack.pop() } /// Signal that work has been finished.