Use interleave to speed up hash repartitioning
#15768
Changes from all commits
@@ -41,9 +41,9 @@ use crate::sorts::streaming_merge::StreamingMergeBuilder;
 use crate::stream::RecordBatchStreamAdapter;
 use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics};

-use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions};
-use arrow::compute::take_arrays;
-use arrow::datatypes::{SchemaRef, UInt32Type};
+use arrow::array::RecordBatch;
+use arrow::compute::interleave_record_batch;
+use arrow::datatypes::SchemaRef;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::utils::transpose;
 use datafusion_common::{internal_err, HashMap};
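For context on the new import: `interleave_record_batch` builds one output batch by gathering rows from several input batches, each row addressed by a `(batch_index, row_index)` pair — exactly the shape of indices this PR produces. A minimal sketch of its semantics (the schema, batches, and index pattern below are illustrative, not from the PR):

```rust
use std::sync::Arc;

use arrow::array::{Int32Array, RecordBatch};
use arrow::compute::interleave_record_batch;
use arrow::datatypes::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let b0 = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    let b1 = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![10, 20]))],
    )?;

    // Each index is (batch_index, row_index): row 1 of b1, row 0 of b0, row 2 of b0.
    let out = interleave_record_batch(&[&b0, &b1], &[(1, 1), (0, 0), (0, 2)])?;
    assert_eq!(out.num_rows(), 3); // output rows: 20, 1, 3
    Ok(())
}
```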
@@ -292,7 +292,7 @@ impl BatchPartitioner {
         Ok(Self { state, timer })
     }

-    /// Partition the provided [`RecordBatch`] into one or more partitioned [`RecordBatch`]
+    /// Partition a single [`RecordBatch`] into one or more partitioned [`RecordBatch`]
     /// based on the [`Partitioning`] specified on construction
     ///
     /// `f` will be called for each partitioned [`RecordBatch`] with the corresponding
@@ -301,11 +301,36 @@ impl BatchPartitioner {
     ///
     /// The time spent repartitioning, not including time spent in `f` will be recorded
     /// to the [`metrics::Time`] provided on construction
+    #[deprecated(since = "48.0.0", note = "use partition_batches instead")]
     pub fn partition<F>(&mut self, batch: RecordBatch, mut f: F) -> Result<()>
     where
         F: FnMut(usize, RecordBatch) -> Result<()>,
     {
-        self.partition_iter(batch)?.try_for_each(|res| match res {
+        self.partition_iter(vec![batch])?
+            .try_for_each(|res| match res {
                 Ok((partition, batch)) => f(partition, batch),
                 Err(e) => Err(e),
             })
     }
+
+    /// Partition the provided [`Vec<RecordBatch>`] into one or more partitioned [`RecordBatch`]
+    /// based on the [`Partitioning`] specified on construction
+    ///
+    /// `f` will be called for each partitioned [`RecordBatch`] with the corresponding
+    /// partition index. Any error returned by `f` will be immediately returned by this
+    /// function without attempting to publish further [`RecordBatch`]
+    ///
+    /// The time spent repartitioning, not including time spent in `f` will be recorded
+    /// to the [`metrics::Time`] provided on construction
+    pub fn partition_batches<F>(
+        &mut self,
+        batches: Vec<RecordBatch>,
+        mut f: F,
+    ) -> Result<()>
+    where
+        F: FnMut(usize, RecordBatch) -> Result<()>,
+    {
+        self.partition_iter(batches)?.try_for_each(|res| match res {
+            Ok((partition, batch)) => f(partition, batch),
+            Err(e) => Err(e),
+        })
+    }
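The error contract the new doc comment describes (the first error from `f` is returned immediately, later batches are never published) falls directly out of `Iterator::try_for_each`. A standalone sketch of that behavior, with integers standing in for partitioned batches and `String` standing in for DataFusion's error type:

```rust
fn main() {
    let mut published = Vec::new();
    // Stand-ins for the (partition, batch) results produced by partition_iter.
    let results: Vec<Result<(usize, i32), String>> =
        vec![Ok((0, 1)), Ok((1, 2)), Err("boom".to_string()), Ok((0, 3))];

    let outcome = results.into_iter().try_for_each(|res| match res {
        Ok((partition, batch)) => {
            published.push((partition, batch));
            Ok(())
        }
        Err(e) => Err(e),
    });

    // The error short-circuits; the trailing Ok((0, 3)) is never published.
    assert_eq!(outcome, Err("boom".to_string()));
    assert_eq!(published, vec![(0, 1), (1, 2)]);
}
```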
@@ -318,7 +343,7 @@ impl BatchPartitioner {
     /// this (so we don't need to clone the entire implementation).
     fn partition_iter(
         &mut self,
-        batch: RecordBatch,
+        mut batches: Vec<RecordBatch>,
     ) -> Result<impl Iterator<Item = Result<(usize, RecordBatch)>> + Send + '_> {
         let it: Box<dyn Iterator<Item = Result<(usize, RecordBatch)>> + Send> =
             match &mut self.state {
@@ -328,33 +353,43 @@
                 } => {
                     let idx = *next_idx;
                     *next_idx = (*next_idx + 1) % *num_partitions;
-                    Box::new(std::iter::once(Ok((idx, batch))))
+                    assert_eq!(batches.len(), 1);
+                    Box::new(std::iter::once(Ok((idx, batches.swap_remove(0)))))
                 }
                 BatchPartitionerState::Hash {
                     random_state,
                     exprs,
                     num_partitions: partitions,
                     hash_buffer,
                 } => {
+                    if batches[0].num_columns() == 0 {
+                        // If no columns, send all batches to partition 0
+                        let it: Box<
+                            dyn Iterator<Item = Result<(usize, RecordBatch)>> + Send,
+                        > = Box::new(
+                            batches.into_iter().map(|batch| Ok((0, batch.clone()))),
+                        );
+                        return Ok(it);
+                    }
                     // Tracking time required for distributing indexes across output partitions
                     let timer = self.timer.timer();

-                    let arrays = exprs
-                        .iter()
-                        .map(|expr| expr.evaluate(&batch)?.into_array(batch.num_rows()))
-                        .collect::<Result<Vec<_>>>()?;
-
-                    hash_buffer.clear();
-                    hash_buffer.resize(batch.num_rows(), 0);
-
-                    create_hashes(&arrays, random_state, hash_buffer)?;
-
-                    let mut indices: Vec<_> = (0..*partitions)
-                        .map(|_| Vec::with_capacity(batch.num_rows()))
-                        .collect();
-
-                    for (index, hash) in hash_buffer.iter().enumerate() {
-                        indices[(*hash % *partitions as u64) as usize].push(index as u32);
+                    let mut indices = vec![vec![]; *partitions];
+
+                    for (i, batch) in batches.iter().enumerate() {
+                        let arrays = exprs
+                            .iter()
+                            .map(|expr| {
+                                expr.evaluate(batch)?.into_array(batch.num_rows())
+                            })
+                            .collect::<Result<Vec<_>>>()?;
+                        hash_buffer.clear();
+                        hash_buffer.resize(batch.num_rows(), 0);
+                        create_hashes(&arrays, random_state, hash_buffer)?;
+
+                        for (index, hash) in hash_buffer.iter().enumerate() {
+                            let p = *hash % *partitions as u64;
+                            indices[p as usize].push((i, index))
+                        }
                     }

                     // Finished building index-arrays for output partitions
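The key change in this hunk is the indexing scheme: instead of one `u32` row index per partition drawn from a single batch, each bucket now collects `(batch_index, row_index)` pairs spanning all buffered batches, which is exactly the input `interleave_record_batch` consumes. A self-contained sketch of that bucketing step, using `std::hash::DefaultHasher` on a single `u64` key column as a stand-in for DataFusion's `create_hashes` over evaluated expression arrays:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Bucket every row of every batch by hash(key) % partitions, recording
/// (batch_index, row_index) pairs, as the PR's hash partitioner does.
fn bucket_indices(batches: &[Vec<u64>], partitions: usize) -> Vec<Vec<(usize, usize)>> {
    let mut indices = vec![vec![]; partitions];
    for (batch_idx, keys) in batches.iter().enumerate() {
        for (row_idx, key) in keys.iter().enumerate() {
            let mut h = DefaultHasher::new();
            key.hash(&mut h);
            let p = (h.finish() % partitions as u64) as usize;
            indices[p].push((batch_idx, row_idx));
        }
    }
    indices
}

fn main() {
    let batches = vec![vec![1, 2, 3], vec![4, 5]];
    let buckets = bucket_indices(&batches, 2);
    // Every input row lands in exactly one partition bucket.
    assert_eq!(buckets.iter().map(Vec::len).sum::<usize>(), 5);
}
```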
@@ -366,25 +401,15 @@
                         .into_iter()
                         .enumerate()
                         .filter_map(|(partition, indices)| {
-                            let indices: PrimitiveArray<UInt32Type> = indices.into();
                             (!indices.is_empty()).then_some((partition, indices))
                         })
                         .map(move |(partition, indices)| {
                             // Tracking time required for repartitioned batches construction
                             let _timer = partitioner_timer.timer();
+                            let b: Vec<&RecordBatch> = batches.iter().collect();

                             // Produce batches based on indices
-                            let columns = take_arrays(batch.columns(), &indices, None)?;
-
-                            let mut options = RecordBatchOptions::new();
-                            options = options.with_row_count(Some(indices.len()));
-                            let batch = RecordBatch::try_new_with_options(
-                                batch.schema(),
-                                columns,
-                                &options,
-                            )
-                            .unwrap();
+                            let batch = interleave_record_batch(&b, &indices)?;
Contributor (Author): Probably an api like apache/arrow-rs#7325 would be even faster (avoiding one level of "trivial" indexing).

Contributor (Author): FYI @ctsk

Contributor: Nice change, and a much clearer performance win than #15479. I expect (without testing) that these two PRs interact negatively with one another: removing coalesce will mean that the data is "more scattered" in memory, which will probably make interleave work worse, as well as the computation of the left join keys.

Contributor (Author): I think removing coalesce after this change (for all hash repartitions) might be possible, as the output batch size will be roughly equal to the input batch size (instead of roughly 1/partitions * batch_size), unless hash values are somehow skewed (but that is currently not good anyway). A future api could use your

Contributor: I see I had misunderstood this PR. It makes a lot of sense to do this. As part of prototyping the integration of a take_in API in DataFusion, I made a similar change: move the buffering before sending the small batches to their destination thread. I don't remember seeing as much speedup when I benchmarked that change independently. I guess using interleave instead of a take/concat combo (like I did back then) makes a significant difference. Awesome!

Contributor (Author): Yeah, the speedup comes from avoiding copying the data a second time in

                             Ok((partition, batch))
                         });
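To make the batch-size arithmetic from the review thread concrete (the numbers below are illustrative, not from the PR): splitting each input batch on its own yields output batches of roughly `batch_size / partitions` rows, while buffering `partitions` input batches before splitting yields output batches of roughly a full `batch_size`, assuming an even hash spread:

```rust
fn main() {
    let batch_size = 8192u64; // illustrative input batch size
    let partitions = 16u64; // illustrative partition count

    // Old behavior: each input batch is hashed and split immediately, so each
    // output partition receives ~batch_size / partitions rows per input batch.
    let old_output_rows = batch_size / partitions;
    assert_eq!(old_output_rows, 512);

    // New behavior: `partitions` input batches are buffered, then interleaved,
    // so each output partition receives ~batch_size rows per flush.
    let new_output_rows = (partitions * batch_size) / partitions;
    assert_eq!(new_output_rows, 8192);
}
```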
@@ -926,8 +951,15 @@
         partitioning: Partitioning,
         metrics: RepartitionMetrics,
     ) -> Result<()> {
+        let is_hash_partitioning = matches!(partitioning, Partitioning::Hash(_, _));
+
         let mut partitioner =
             BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?;
         // execute the child operator
         let timer = metrics.fetch_time.timer();
         timer.done();

+        let mut batches_buffer = Vec::with_capacity(partitioner.num_partitions());
+
         // While there are still outputs to send to, keep pulling inputs
         let mut batches_until_yield = partitioner.num_partitions();
@@ -938,12 +970,21 @@
             timer.done();

-            // Input is done
-            let batch = match result {
-                Some(result) => result?,
-                None => break,
+            match result {
+                Some(result) => {
+                    batches_buffer.push(result?);
+                    if is_hash_partitioning
+                        && batches_buffer.len() < partitioner.num_partitions()
+                    {
+                        // Keep buffering batches
+                        continue;
+                    }
+                }
+                None if batches_buffer.is_empty() => break,
+                None => {}
             };

-            for res in partitioner.partition_iter(batch)? {
+            let batches_buffer = std::mem::take(&mut batches_buffer);
+            for res in partitioner.partition_iter(batches_buffer)? {
                 let (partition, batch) = res?;
                 let size = batch.get_array_memory_size();
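The pull loop above implements a buffer-then-flush pattern: keep appending input batches until `num_partitions` of them are buffered (or the input ends), then hand the whole buffer to the partitioner, using `std::mem::take` to leave an empty buffer behind for the next round. A simplified synchronous sketch of that control flow, with generic items standing in for `RecordBatch`es pulled from the async stream:

```rust
/// Buffer items until `buffer_target` are collected, then flush them as one
/// group; flush any partial tail once the input is exhausted.
fn drain<T>(items: impl Iterator<Item = T>, buffer_target: usize, mut flush: impl FnMut(Vec<T>)) {
    let mut buffer = Vec::with_capacity(buffer_target);
    for item in items {
        buffer.push(item);
        if buffer.len() < buffer_target {
            continue; // keep buffering, mirroring the `continue` in the PR's loop
        }
        // Hand the filled buffer to the flush step; `take` leaves an empty Vec.
        flush(std::mem::take(&mut buffer));
    }
    if !buffer.is_empty() {
        flush(buffer); // input is done: flush whatever remains
    }
}

fn main() {
    let mut flushed = vec![];
    drain(0..10, 4, |chunk| flushed.push(chunk.len()));
    assert_eq!(flushed, vec![4, 4, 2]);
}
```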