diff --git a/rayon-demo/src/find/mod.rs b/rayon-demo/src/find/mod.rs
index 12e257f08..b3a07e4b2 100644
--- a/rayon-demo/src/find/mod.rs
+++ b/rayon-demo/src/find/mod.rs
@@ -1,4 +1,4 @@
-/// Simple benchmarks of `find_any()` performance
+/// Simple benchmarks of `find_any()` and `find_first()` performance

 macro_rules! make_tests {
     ($n:expr, $m:ident) => {
@@ -21,48 +21,229 @@ macro_rules! make_tests {
                 .collect()
         });

+        // this is a very dumb find_first algorithm:
+        // no early abort, so even the best case has linear cost.
+        fn find_dumb<I: ParallelIterator, P: Fn(&I::Item) -> bool + Send + Sync>(
+            iter: I,
+            cond: P,
+        ) -> Option<I::Item> {
+            iter.map(|e| if cond(&e) { Some(e) } else { None })
+                .reduce(|| None, |left, right| left.or(right))
+        }
+
+        #[bench]
+        fn parallel_find_any_start(b: &mut Bencher) {
+            let needle = HAYSTACK[0][0];
+            b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_some()));
+        }
+
         #[bench]
-        fn parallel_find_first(b: &mut Bencher) {
+        fn parallel_find_first_start(b: &mut Bencher) {
             let needle = HAYSTACK[0][0];
-            b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_some()));
+            b.iter(|| {
+                assert!(HAYSTACK
+                    .par_iter()
+                    .find_first(|&&x| x[0] == needle)
+                    .is_some())
+            });
         }

         #[bench]
-        fn serial_find_first(b: &mut Bencher) {
+        fn parallel_find_first_blocks_start(b: &mut Bencher) {
+            let needle = HAYSTACK[0][0];
+            b.iter(|| {
+                assert!(HAYSTACK
+                    .par_iter()
+                    .by_exponential_blocks()
+                    .find_first(|&&x| x[0] == needle)
+                    .is_some())
+            });
+        }
+
+        #[bench]
+        fn serial_find_start(b: &mut Bencher) {
             let needle = HAYSTACK[0][0];
             b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x[0] == needle).is_some()));
         }

         #[bench]
-        fn parallel_find_last(b: &mut Bencher) {
+        fn parallel_find_any_end(b: &mut Bencher) {
             let needle = HAYSTACK[HAYSTACK.len() - 1][0];
             b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_some()));
         }

+        #[bench]
+        fn parallel_find_first_end(b: &mut Bencher) {
+            let needle = HAYSTACK[HAYSTACK.len() - 1][0];
+            b.iter(|| {
+                assert!(HAYSTACK
+                    .par_iter()
+                    .find_first(|&&x| x[0] == needle)
+                    .is_some())
+            });
+        }
+
+        #[bench]
+        fn parallel_find_first_blocks_end(b: &mut Bencher) {
+            let needle = HAYSTACK[HAYSTACK.len() - 1][0];
+            b.iter(|| {
+                assert!(HAYSTACK
+                    .par_iter()
+                    .by_exponential_blocks()
+                    .find_first(|&&x| x[0] == needle)
+                    .is_some())
+            });
+        }
+
         #[bench]
-        fn serial_find_last(b: &mut Bencher) {
+        fn serial_find_end(b: &mut Bencher) {
             let needle = HAYSTACK[HAYSTACK.len() - 1][0];
             b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x[0] == needle).is_some()));
         }

         #[bench]
-        fn parallel_find_middle(b: &mut Bencher) {
-            let needle = HAYSTACK[HAYSTACK.len() / 3 * 2][0];
+        fn parallel_find_any_third(b: &mut Bencher) {
+            let needle = HAYSTACK[HAYSTACK.len() / 3][0];
             b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_some()));
         }

+        #[bench]
+        fn parallel_find_first_third(b: &mut Bencher) {
+            let needle = HAYSTACK[HAYSTACK.len() / 3][0];
+            b.iter(|| {
+                assert!(HAYSTACK
+                    .par_iter()
+                    .find_first(|&&x| x[0] == needle)
+                    .is_some())
+            });
+        }
+
+        #[bench]
+        fn parallel_find_dumb_third(b: &mut Bencher) {
+            let needle = HAYSTACK[HAYSTACK.len() / 3][0];
+            b.iter(|| assert!(find_dumb(HAYSTACK.par_iter(), |&&x| x[0] == needle).is_some()));
+        }
+
+        #[bench]
+        fn parallel_find_first_blocks_third(b: &mut Bencher) {
+            let needle = HAYSTACK[HAYSTACK.len() / 3][0];
+            b.iter(|| {
+                assert!(HAYSTACK
+                    .par_iter()
+                    .by_exponential_blocks()
+                    .find_first(|&&x| x[0] == needle)
+                    .is_some())
+            });
+        }
+
+        #[bench]
+        fn serial_find_third(b: &mut Bencher) {
+            let needle = HAYSTACK[HAYSTACK.len() / 3][0];
+            b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x[0] == needle).is_some()));
+        }
+
+        #[bench]
+        fn parallel_find_any_middle(b: &mut Bencher) {
+            let needle = HAYSTACK[(HAYSTACK.len() / 2).saturating_sub(1)][0];
+            b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_some()));
+        }
+
+        #[bench]
+        fn parallel_find_first_middle(b: &mut Bencher) {
+            let needle = HAYSTACK[(HAYSTACK.len() / 2).saturating_sub(1)][0];
+            b.iter(|| {
+                assert!(HAYSTACK
+                    .par_iter()
+                    .find_first(|&&x| x[0] == needle)
+                    .is_some())
+            });
+        }
+
+        #[bench]
+        fn parallel_find_dumb_middle(b: &mut Bencher) {
+            let needle = HAYSTACK[(HAYSTACK.len() / 2).saturating_sub(1)][0];
+            b.iter(|| assert!(find_dumb(HAYSTACK.par_iter(), |&&x| x[0] == needle).is_some()));
+        }
+
+        #[bench]
+        fn parallel_find_first_blocks_middle(b: &mut Bencher) {
+            let needle = HAYSTACK[(HAYSTACK.len() / 2).saturating_sub(1)][0];
+            b.iter(|| {
+                assert!(HAYSTACK
+                    .par_iter()
+                    .by_exponential_blocks()
+                    .find_first(|&&x| x[0] == needle)
+                    .is_some())
+            });
+        }
+
         #[bench]
         fn serial_find_middle(b: &mut Bencher) {
+            let needle = HAYSTACK[(HAYSTACK.len() / 2).saturating_sub(1)][0];
+            b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x[0] == needle).is_some()));
+        }
+
+        #[bench]
+        fn parallel_find_any_two_thirds(b: &mut Bencher) {
+            let needle = HAYSTACK[HAYSTACK.len() / 3 * 2][0];
+            b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_some()));
+        }
+
+        #[bench]
+        fn parallel_find_first_two_thirds(b: &mut Bencher) {
+            let needle = HAYSTACK[HAYSTACK.len() / 3 * 2][0];
+            b.iter(|| {
+                assert!(HAYSTACK
+                    .par_iter()
+                    .find_first(|&&x| x[0] == needle)
+                    .is_some())
+            });
+        }
+
+        #[bench]
+        fn parallel_find_first_blocks_two_thirds(b: &mut Bencher) {
+            let needle = HAYSTACK[HAYSTACK.len() / 3 * 2][0];
+            b.iter(|| {
+                assert!(HAYSTACK
+                    .par_iter()
+                    .by_exponential_blocks()
+                    .find_first(|&&x| x[0] == needle)
+                    .is_some())
+            });
+        }
+
+        #[bench]
+        fn serial_find_two_thirds(b: &mut Bencher) {
             let needle = HAYSTACK[HAYSTACK.len() / 3 * 2][0];
             b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x[0] == needle).is_some()));
         }

         #[bench]
-        fn parallel_find_missing(b: &mut Bencher) {
+        fn parallel_find_any_missing(b: &mut Bencher) {
             let needle = HAYSTACK.iter().map(|v| v[0]).max().unwrap() + 1;
             b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_none()));
         }

+        #[bench]
+        fn parallel_find_first_missing(b: &mut Bencher) {
+            let needle = HAYSTACK.iter().map(|v| v[0]).max().unwrap() + 1;
+            b.iter(|| {
+                assert!(HAYSTACK
+                    .par_iter()
+                    .find_first(|&&x| x[0] == needle)
+                    .is_none())
+            });
+        }
+
+        #[bench]
+        fn parallel_find_first_blocks_missing(b: &mut Bencher) {
+            let needle = HAYSTACK.iter().map(|v| v[0]).max().unwrap() + 1;
+            b.iter(|| {
+                assert!(HAYSTACK
+                    .par_iter()
+                    .by_exponential_blocks()
+                    .find_first(|&&x| x[0] == needle)
+                    .is_none())
+            });
+        }
+
         #[bench]
         fn serial_find_missing(b: &mut Bencher) {
             let needle = HAYSTACK.iter().map(|v| v[0]).max().unwrap() + 1;
@@ -70,7 +251,7 @@ macro_rules! make_tests {
         }

         #[bench]
-        fn parallel_find_common(b: &mut Bencher) {
+        fn parallel_find_any_common(b: &mut Bencher) {
             b.iter(|| {
                 assert!(HAYSTACK
                     .par_iter()
diff --git a/src/compile_fail/must_use.rs b/src/compile_fail/must_use.rs
index b4b3d771c..da03b8f61 100644
--- a/src/compile_fail/must_use.rs
+++ b/src/compile_fail/must_use.rs
@@ -30,6 +30,8 @@ macro_rules! must_use {
 }

 must_use! {
+    by_exponential_blocks /** v.par_iter().by_exponential_blocks(); */
+    by_uniform_blocks     /** v.par_iter().by_uniform_blocks(2); */
     step_by               /** v.par_iter().step_by(2); */
     chain                 /** v.par_iter().chain(&v); */
     chunks                /** v.par_iter().chunks(2); */
diff --git a/src/iter/blocks.rs b/src/iter/blocks.rs
new file mode 100644
index 000000000..0d4e06c15
--- /dev/null
+++ b/src/iter/blocks.rs
@@ -0,0 +1,131 @@
+use super::plumbing::*;
+use super::*;
+
+struct BlocksCallback<S, C> {
+    sizes: S,
+    consumer: C,
+    len: usize,
+}
+
+impl<T, S, C> ProducerCallback<T> for BlocksCallback<S, C>
+where
+    C: UnindexedConsumer<T>,
+    S: Iterator<Item = usize>,
+{
+    type Output = C::Result;
+
+    fn callback<P: Producer<Item = T>>(mut self, mut producer: P) -> Self::Output {
+        let mut remaining_len = self.len;
+        let mut consumer = self.consumer;
+
+        // we need a local variable for the accumulated results;
+        // we obtain the reducer's identity by splitting at 0
+        let (left_consumer, right_consumer, _) = consumer.split_at(0);
+        let mut leftmost_res = left_consumer.into_folder().complete();
+        consumer = right_consumer;
+
+        // now we loop on each block size
+        while remaining_len > 0 && !consumer.full() {
+            // we compute the next block's size
+            let size = self.sizes.next().unwrap_or(std::usize::MAX);
+            let capped_size = remaining_len.min(size);
+            remaining_len -= capped_size;
+
+            // split the producer
+            let (left_producer, right_producer) = producer.split_at(capped_size);
+            producer = right_producer;
+
+            // split the consumer
+            let (left_consumer, right_consumer, _) = consumer.split_at(capped_size);
+            consumer = right_consumer;
+
+            leftmost_res = consumer.to_reducer().reduce(
+                leftmost_res,
+                bridge_producer_consumer(capped_size, left_producer, left_consumer),
+            );
+        }
+        leftmost_res
+    }
+}
+
+/// `ExponentialBlocks` is a parallel iterator that consumes itself as a sequence
+/// of parallel blocks of exponentially increasing sizes.
+///
+/// This struct is created by the [`by_exponential_blocks()`] method on [`IndexedParallelIterator`].
+///
+/// [`by_exponential_blocks()`]: trait.IndexedParallelIterator.html#method.by_exponential_blocks
+/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
+#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
+#[derive(Debug, Clone)]
+pub struct ExponentialBlocks<I> {
+    base: I,
+}
+
+impl<I> ExponentialBlocks<I> {
+    pub(super) fn new(base: I) -> Self {
+        Self { base }
+    }
+}
+
+impl<I> ParallelIterator for ExponentialBlocks<I>
+where
+    I: IndexedParallelIterator,
+{
+    type Item = I::Item;
+
+    fn drive_unindexed<C>(self, consumer: C) -> C::Result
+    where
+        C: UnindexedConsumer<Self::Item>,
+    {
+        // the first block gets one element per thread; every later block doubles
+        let first = crate::current_num_threads();
+        let callback = BlocksCallback {
+            consumer,
+            sizes: std::iter::successors(Some(first), exponential_size),
+            len: self.base.len(),
+        };
+        self.base.with_producer(callback)
+    }
+}
+
+fn exponential_size(size: &usize) -> Option<usize> {
+    Some(size.saturating_mul(2))
+}
+
+/// `UniformBlocks` is a parallel iterator that consumes itself as a sequence
+/// of parallel blocks of constant sizes.
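+/// All blocks contain exactly `block_size` elements, except possibly the last
+/// one, which covers whatever remains.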
+///
+/// This struct is created by the [`by_uniform_blocks()`] method on [`IndexedParallelIterator`].
+///
+/// [`by_uniform_blocks()`]: trait.IndexedParallelIterator.html#method.by_uniform_blocks
+/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
+#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
+#[derive(Debug, Clone)]
+pub struct UniformBlocks<I> {
+    base: I,
+    block_size: usize,
+}
+
+impl<I> UniformBlocks<I> {
+    pub(super) fn new(base: I, block_size: usize) -> Self {
+        Self { base, block_size }
+    }
+}
+
+impl<I> ParallelIterator for UniformBlocks<I>
+where
+    I: IndexedParallelIterator,
+{
+    type Item = I::Item;
+
+    fn drive_unindexed<C>(self, consumer: C) -> C::Result
+    where
+        C: UnindexedConsumer<Self::Item>,
+    {
+        let callback = BlocksCallback {
+            consumer,
+            sizes: std::iter::repeat(self.block_size),
+            len: self.base.len(),
+        };
+        self.base.with_producer(callback)
+    }
+}
diff --git a/src/iter/mod.rs b/src/iter/mod.rs
index 8ddc9bab4..d2890aab0 100644
--- a/src/iter/mod.rs
+++ b/src/iter/mod.rs
@@ -103,6 +103,7 @@ mod test;
 // e.g. `find::find()`, are always used **prefixed**, so that they
 // can be readily distinguished.

+mod blocks;
 mod chain;
 mod chunks;
 mod cloned;
@@ -161,6 +162,7 @@ mod zip;
 mod zip_eq;

 pub use self::{
+    blocks::{ExponentialBlocks, UniformBlocks},
     chain::Chain,
     chunks::Chunks,
     cloned::Cloned,
@@ -1678,6 +1680,9 @@ pub trait ParallelIterator: Sized + Send {
     /// will be stopped, while attempts to the left must continue in case
     /// an earlier match is found.
     ///
+    /// For added performance, you might consider using `find_first` in conjunction with
+    /// [`by_exponential_blocks()`][IndexedParallelIterator::by_exponential_blocks].
+    ///
     /// Note that not all parallel iterators have a useful order, much like
     /// sequential `HashMap` iteration, so "first" may be nebulous. If you
     /// just want the first match discovered anywhere in the iterator,
@@ -2444,6 +2449,70 @@ impl<T: ParallelIterator> IntoParallelIterator for T {
 // Waiting for `ExactSizeIterator::is_empty` to be stabilized. See rust-lang/rust#35428
 #[allow(clippy::len_without_is_empty)]
 pub trait IndexedParallelIterator: ParallelIterator {
+    /// Divides an iterator into sequential blocks of exponentially increasing size.
+    ///
+    /// Normally, parallel iterators are recursively divided into tasks in parallel.
+    /// This adaptor changes the default behavior by splitting the iterator into a **sequence**
+    /// of parallel iterators of increasing sizes.
+    /// Sizes grow exponentially in order to avoid creating
+    /// too many blocks. This also allows balancing the current block with all previous ones.
+    ///
+    /// This can have many applications but the most notable ones are:
+    /// - better performance with [`find_first()`][ParallelIterator::find_first]
+    /// - more predictable performance with [`find_any()`][ParallelIterator::find_any]
+    ///   or any interruptible computation
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use rayon::prelude::*;
+    /// assert_eq!((0..10_000).into_par_iter()
+    ///                       .by_exponential_blocks()
+    ///                       .find_first(|&e| e == 4_999), Some(4_999))
+    /// ```
+    ///
+    /// In this example, without blocks, rayon will split the initial range in two,
+    /// but all work on the right half (from 5,000 onwards) is **useless** since the
+    /// sequential algorithm never goes there. This means that if two threads are used,
+    /// there will be **no** speedup **at all**.
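+    /// (The same waste appears with more threads: every thread scheduled to the
+    /// right of 4,999 performs work a sequential search would never reach.)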
+    ///
+    /// `by_exponential_blocks`, on the other hand, will start with the leftmost range
+    /// from 0 to `p` (where `p` is the number of threads), continue with `p` to `3p`,
+    /// then `3p` to `7p`, and so on.
+    ///
+    /// Each subrange is treated in parallel, while all subranges are treated sequentially.
+    /// We therefore ensure a logarithmic number of blocks (and overhead) while guaranteeing
+    /// we stop at the first block containing the searched data.
+    fn by_exponential_blocks(self) -> ExponentialBlocks<Self> {
+        ExponentialBlocks::new(self)
+    }
+
+    /// Divides an iterator into sequential blocks of the given size.
+    ///
+    /// Normally, parallel iterators are recursively divided into tasks in parallel.
+    /// This adaptor changes the default behavior by splitting the iterator into a **sequence**
+    /// of parallel iterators of the given `block_size`.
+    /// The main application is to obtain better
+    /// memory locality (especially if the reduce operation re-uses folded data).
+    ///
+    /// **Panics** if `block_size` is 0.
+    ///
+    /// # Example
+    /// ```
+    /// use rayon::prelude::*;
+    /// // during most reductions v1 and v2 fit the cache
+    /// let v = (0u32..10_000_000)
+    ///     .into_par_iter()
+    ///     .by_uniform_blocks(1_000_000)
+    ///     .fold(Vec::new, |mut v, e| { v.push(e); v })
+    ///     .reduce(Vec::new, |mut v1, mut v2| { v1.append(&mut v2); v1 });
+    /// assert_eq!(v, (0u32..10_000_000).collect::<Vec<u32>>());
+    /// ```
+    #[track_caller]
+    fn by_uniform_blocks(self, block_size: usize) -> UniformBlocks<Self> {
+        assert!(block_size != 0, "block_size must not be zero");
+        UniformBlocks::new(self, block_size)
+    }
+
     /// Collects the results of the iterator into the specified
     /// vector. The vector is always cleared before execution
     /// begins. If possible, reusing the vector across calls can lead
@@ -2609,6 +2678,8 @@ pub trait IndexedParallelIterator: ParallelIterator {
     /// [`par_chunks()`]: ../slice/trait.ParallelSlice.html#method.par_chunks
     /// [`par_chunks_mut()`]: ../slice/trait.ParallelSliceMut.html#method.par_chunks_mut
     ///
+    /// **Panics** if `chunk_size` is 0.
+    ///
     /// # Examples
     ///
     /// ```
diff --git a/src/iter/test.rs b/src/iter/test.rs
index abdd68aa7..1e6c34f8f 100644
--- a/src/iter/test.rs
+++ b/src/iter/test.rs
@@ -2316,3 +2316,19 @@ fn walk_tree_postfix_degree5() {
         .collect();
     assert_eq!(v, nodes)
 }
+
+#[test]
+fn blocks() {
+    let count = AtomicUsize::new(0);
+    let v: Vec<usize> = (0..1000)
+        .into_par_iter()
+        .map(|_| count.fetch_add(1, Ordering::Relaxed))
+        .by_uniform_blocks(100)
+        .collect();
+    // blocks run one after another, so each chunk's maximum
+    // must be strictly smaller than the next chunk's maximum
+    let m = v
+        .chunks(100)
+        .map(|c| c.iter().max().copied().unwrap())
+        .collect::<Vec<usize>>();
+    assert!(m.windows(2).all(|w| w[0] < w[1]));
+    assert_eq!(v.len(), 1000);
+}
diff --git a/tests/clones.rs b/tests/clones.rs
index aaa153913..1306147f5 100644
--- a/tests/clones.rs
+++ b/tests/clones.rs
@@ -128,6 +128,8 @@ fn clone_array() {
 #[test]
 fn clone_adaptors() {
     let v: Vec<_> = (0..1000).map(Some).collect();
+    check(v.par_iter().by_exponential_blocks());
+    check(v.par_iter().by_uniform_blocks(100));
     check(v.par_iter().chain(&v));
     check(v.par_iter().cloned());
     check(v.par_iter().copied());
diff --git a/tests/debug.rs b/tests/debug.rs
index fc85229a0..bf16a2fdd 100644
--- a/tests/debug.rs
+++ b/tests/debug.rs
@@ -147,6 +147,8 @@ fn debug_array() {
 #[test]
 fn debug_adaptors() {
     let v: Vec<_> = (0..10).collect();
+    check(v.par_iter().by_exponential_blocks());
+    check(v.par_iter().by_uniform_blocks(5));
     check(v.par_iter().chain(&v));
     check(v.par_iter().cloned());
     check(v.par_iter().copied());
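
Note (not part of the patch): a minimal standalone sketch of the block schedule `by_exponential_blocks` produces, mirroring `exponential_size()` in `src/iter/blocks.rs`; the thread count `4` here is an arbitrary stand-in for `rayon::current_num_threads()`.

```rust
// Reproduces the block boundaries: the first block holds one element per
// thread, and every later block doubles in size (saturating on overflow).
fn block_sizes(num_threads: usize) -> impl Iterator<Item = usize> {
    std::iter::successors(Some(num_threads), |s| Some(s.saturating_mul(2)))
}

fn main() {
    let mut start = 0usize;
    for size in block_sizes(4).take(5) {
        // prints 0..4, 4..12, 12..28, 28..60, 60..124
        println!("block covers {}..{}", start, start + size);
        start += size;
    }
}
```

Because block sizes double, a match at index `i` is found after scanning O(`i`) elements spread over O(log `i`) blocks, which is why `find_first` benefits from the adaptor in the benchmarks above.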