Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions datafusion/physical-expr/src/partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,23 @@ impl Partitioning {
// If the required exprs do not match, need to leverage the eq_properties provided by the child
// and normalize both exprs based on the equivalent groups.
if !fast_match {
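// Superset satisfaction: check whether every expression in partition_exprs also
// appears in required_exprs (i.e. partition_exprs is a subset of required_exprs).
// DataFusion's Hash partitioning guarantees that all rows with the same value
// combination of partition columns are in the same partition. Rows that agree on
// every required expression also agree on that subset, so they are already
// co-located and the required distribution is satisfied.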
if partition_exprs.len() < required_exprs.len() {
let is_subset = partition_exprs.iter().all(|p_expr| {
required_exprs.iter().any(|r_expr| {
physical_exprs_equal(
&[p_expr.clone()],
&[r_expr.clone()],
)
})
});
if is_subset {
return true;
}
}

let eq_groups = eq_properties.eq_group();
if !eq_groups.is_empty() {
let normalized_required_exprs = required_exprs
Expand Down
13 changes: 11 additions & 2 deletions datafusion/physical-optimizer/src/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -911,10 +911,19 @@ fn add_hash_on_top(
.output_partitioning()
.satisfy(&dist, input.plan.equivalence_properties());

// Check if input already has hash partitioning
let has_hash_partitioning = matches!(
input.plan.output_partitioning(),
Partitioning::Hash(_, _)
);

// Add hash repartitioning when:
// - The hash distribution requirement is not satisfied, or
// - We can increase parallelism by adding hash partitioning.
if !satisfied || n_target > input.plan.output_partitioning().partition_count() {
// - We can increase parallelism by adding hash partitioning (but not if already hash partitioned).
//
// When data is already hash partitioned correctly, don't repartition just to increase
// parallelism, as this breaks file-level partitioning and is counterproductive.
if !satisfied || (n_target > input.plan.output_partitioning().partition_count() && !has_hash_partitioning) {
// When there is an existing ordering, we preserve ordering during
// repartition. This will be rolled back in the future if any of the
// following conditions is true:
Expand Down
Loading
Loading