Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ pub enum Partitioning {
RoundRobinBatch(usize),
/// Allocate rows based on a hash of one of more expressions and the specified
/// number of partitions
/// This partitioning scheme is not yet fully supported. See [ARROW-11011](https://issues.apache.org/jira/browse/ARROW-11011)
/// FIXME: This partitioning scheme is not yet fully supported.
Comment thread
jimexist marked this conversation as resolved.
Outdated
/// See https://github.com/apache/arrow-datafusion/issues/131
Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
/// Unknown partitioning scheme with a known number of partitions
UnknownPartitioning(usize),
Expand Down
54 changes: 24 additions & 30 deletions datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,15 @@ impl DefaultPhysicalPlanner {
.flat_map(|x| x.0.data_type(physical_input_schema.as_ref()))
.any(|x| matches!(x, DataType::Dictionary(_, _)));

if !groups.is_empty()
let can_repartition = !groups.is_empty()
&& ctx_state.config.concurrency > 1
&& ctx_state.config.repartition_aggregations
&& !contains_dict
{
&& !contains_dict;

let (initial_aggr, next_partition_mode): (
Arc<dyn ExecutionPlan>,
AggregateMode,
) = if can_repartition {
// Divide partial hash aggregates into multiple partitions by hash key
let hash_repartition = Arc::new(RepartitionExec::try_new(
initial_aggr,
Expand All @@ -235,35 +239,25 @@ impl DefaultPhysicalPlanner {
ctx_state.config.concurrency,
),
)?);

// Combine hashaggregates within the partition
Ok(Arc::new(HashAggregateExec::try_new(
AggregateMode::FinalPartitioned,
final_group
.iter()
.enumerate()
.map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
.collect(),
aggregates,
hash_repartition,
input_schema,
)?))
// Combine hash aggregates within the partition
(hash_repartition, AggregateMode::FinalPartitioned)
} else {
// construct a second aggregation, keeping the final column name equal to the first aggregation
// and the expressions corresponding to the respective aggregate
// construct a second aggregation, keeping the final column name equal to the
// first aggregation and the expressions corresponding to the respective aggregate
(initial_aggr, AggregateMode::Final)
};

Ok(Arc::new(HashAggregateExec::try_new(
AggregateMode::Final,
final_group
.iter()
.enumerate()
.map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
.collect(),
aggregates,
initial_aggr,
input_schema,
)?))
}
Ok(Arc::new(HashAggregateExec::try_new(
next_partition_mode,
final_group
.iter()
.enumerate()
.map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
.collect(),
aggregates,
initial_aggr,
input_schema,
)?))
}
LogicalPlan::Projection { input, expr, .. } => {
let input_exec = self.create_initial_plan(input, ctx_state)?;
Expand Down