Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
5413130
Top Down Sort Enforcer
mingmwang Feb 10, 2023
341448c
merge with latest
mingmwang Feb 10, 2023
01e87d6
Add support to optimize parallel sorting
mingmwang Feb 10, 2023
06b4064
fix UT
mingmwang Feb 13, 2023
b59ea3e
merge with upstream
mingmwang Feb 13, 2023
601a4d0
add more UTs to sort_enforcement2.rs
mingmwang Feb 13, 2023
5285a3e
refine codebase
mingmwang Feb 15, 2023
28e5e04
merge with upstream
mingmwang Feb 15, 2023
245428d
Fix SortMergeJoin case
mingmwang Feb 15, 2023
abad84c
Fix reverse window sort requirements
mingmwang Feb 16, 2023
6a53df0
fix test comments
mingmwang Feb 16, 2023
c5c5bc8
add determine_children_requirement() method
mingmwang Feb 16, 2023
e6c09ef
Convert Topdown to pushdown then unify
mustafasrepo Feb 20, 2023
f4ff603
reorganize to decrease diff, remove ignore
mustafasrepo Feb 27, 2023
0fca3d8
merge with main
mustafasrepo Mar 15, 2023
6c52daa
Update test
mustafasrepo Mar 15, 2023
4557d20
retract dist enforcement
mustafasrepo Mar 15, 2023
28c6ea3
tmp
mustafasrepo Mar 15, 2023
16fd9f5
reorganize files
mustafasrepo Mar 15, 2023
80396b6
reorganize tests
mustafasrepo Mar 15, 2023
ef331cc
simplify sort pushdown
mustafasrepo Mar 15, 2023
c50988d
remove global sort print
mustafasrepo Mar 15, 2023
1102a12
remove unnecessary parameter
mustafasrepo Mar 15, 2023
25c2c1c
Updates
mustafasrepo Mar 15, 2023
071b05a
simplifications
mustafasrepo Mar 15, 2023
777aa1c
Refactors and simplifications part 1
ozankabak Mar 16, 2023
de2d11e
Refactors and simplifications part 2
ozankabak Mar 17, 2023
c2f1b5c
simplifications
mustafasrepo Mar 17, 2023
ecb91f4
remove_sort_keys parameters from window
mustafasrepo Mar 17, 2023
8ea3f47
Update window multi_path test
mustafasrepo Mar 17, 2023
719f9a8
consider existing ordering during Coalesce
mustafasrepo Mar 18, 2023
67af8d2
retract assertion in planner
mustafasrepo Mar 18, 2023
659668b
Merge branch 'main' into feature/unify_sort_rules
mustafasrepo Mar 20, 2023
9a7577e
remove todo.
mustafasrepo Mar 20, 2023
44f3c6f
remove unnecessary repartition from plan
mustafasrepo Mar 20, 2023
9a49a34
update comments
mustafasrepo Mar 20, 2023
a2f0f70
Remove commented out code
ozankabak Mar 20, 2023
d87a5b4
Merge branch 'apache:main' into feature/unify_sort_rules
mustafasrepo Mar 22, 2023
9406c5b
Address reviews
mustafasrepo Mar 27, 2023
64bf519
Merge branch 'main' into feature/unify_sort_rules
mustafasrepo Mar 27, 2023
99508bc
Merge branch 'main' into feature/unify_sort_rules
mustafasrepo Mar 30, 2023
6699a2e
update comments
mustafasrepo Apr 3, 2023
8725639
address reviews
mustafasrepo Apr 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 27 additions & 27 deletions datafusion/core/src/physical_optimizer/dist_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::equivalence::EquivalenceProperties;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::expressions::NoOp;
use datafusion_physical_expr::utils::map_columns_before_projection;
use datafusion_physical_expr::{
expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, AggregateExpr,
PhysicalExpr,
};
use std::collections::HashMap;
use std::sync::Arc;

/// The EnforceDistribution rule ensures that distribution requirements are met
Expand Down Expand Up @@ -487,30 +487,6 @@ fn reorder_aggregate_keys(
}
}

/// Rewrite the parent's required expressions in terms of the columns that
/// feed the given projection. Only plain column references can be mapped;
/// any requirement that is not a simple column (or has no matching
/// projection output) is silently dropped from the result.
fn map_columns_before_projection(
    parent_required: &[Arc<dyn PhysicalExpr>],
    proj_exprs: &[(Arc<dyn PhysicalExpr>, String)],
) -> Vec<Arc<dyn PhysicalExpr>> {
    // Build a lookup from projection output name -> underlying input column.
    // Non-column projection expressions are skipped; on duplicate names the
    // later entry wins (same as repeated `insert`).
    let column_mapping: HashMap<String, Column> = proj_exprs
        .iter()
        .filter_map(|(expression, name)| {
            expression
                .as_any()
                .downcast_ref::<Column>()
                .map(|column| (name.clone(), column.clone()))
        })
        .collect();
    // Translate each required column through the mapping, keeping only the
    // requirements that resolve to an input column.
    parent_required
        .iter()
        .filter_map(|required| required.as_any().downcast_ref::<Column>())
        .filter_map(|column| column_mapping.get(column.name()))
        .map(|column| Arc::new(column.clone()) as Arc<dyn PhysicalExpr>)
        .collect()
}

fn shift_right_required(
parent_required: &[Arc<dyn PhysicalExpr>],
left_columns_len: usize,
Expand Down Expand Up @@ -1026,6 +1002,30 @@ mod tests {
))
}

// Create a sorted ParquetExec scanning multiple files ("x" and "y"): two
// single-file groups yield two output partitions, each carrying the given
// output ordering.
fn parquet_exec_multiple_sorted(
    output_ordering: Option<Vec<PhysicalSortExpr>>,
) -> Arc<ParquetExec> {
    // One group per file -> two partitions.
    let file_groups = vec![
        vec![PartitionedFile::new("x".to_string(), 100)],
        vec![PartitionedFile::new("y".to_string(), 100)],
    ];
    let scan_config = FileScanConfig {
        object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
        file_schema: schema(),
        file_groups,
        statistics: Statistics::default(),
        projection: None,
        limit: None,
        table_partition_cols: vec![],
        output_ordering,
        infinite_source: false,
    };
    Arc::new(ParquetExec::new(scan_config, None, None))
}

fn projection_exec_with_alias(
input: Arc<dyn ExecutionPlan>,
alias_pairs: Vec<(String, String)>,
Expand Down Expand Up @@ -2108,7 +2108,7 @@ mod tests {
}];

// Scan some sorted parquet files
let exec = parquet_exec_with_sort(Some(sort_key.clone()));
let exec = parquet_exec_multiple_sorted(Some(sort_key.clone()));

// CoalesceBatchesExec to mimic behavior after a filter
let exec = Arc::new(CoalesceBatchesExec::new(exec, 4096));
Expand All @@ -2121,7 +2121,7 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [a@0 ASC]",
"CoalesceBatchesExec: target_batch_size=4096",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[a@0 ASC], projection=[a, b, c, d, e]",
"ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, output_ordering=[a@0 ASC], projection=[a, b, c, d, e]",
];
assert_optimized!(expected, exec);
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,18 @@ impl PhysicalOptimizerRule for GlobalSortSelection {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(&|plan| {
let transformed =
plan.as_any()
.downcast_ref::<SortExec>()
.and_then(|sort_exec| {
if sort_exec.input().output_partitioning().partition_count() > 1
&& sort_exec.fetch().is_some()
// It's already preserving the partitioning so that it can be regarded as a local sort
&& !sort_exec.preserve_partitioning()
{
&& (sort_exec.fetch().is_some() || config.optimizer.repartition_sorts)
{
let sort = SortExec::new_with_partitioning(
sort_exec.expr().to_vec(),
sort_exec.input().clone(),
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub mod pipeline_checker;
pub mod pruning;
pub mod repartition;
pub mod sort_enforcement;
mod sort_pushdown;
mod utils;

pub mod pipeline_fixer;
Expand Down
45 changes: 35 additions & 10 deletions datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,6 @@ fn init() {
mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_physical_expr::{
make_sort_requirements_from_exprs, PhysicalSortRequirement,
};

use super::*;
use crate::datasource::listing::PartitionedFile;
Expand All @@ -342,6 +339,9 @@ mod tests {
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::{displayable, DisplayFormatType, Statistics};
use datafusion_physical_expr::{
make_sort_requirements_from_exprs, PhysicalSortRequirement,
};

fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)]))
Expand Down Expand Up @@ -412,6 +412,33 @@ mod tests {
))
}

// Create a sorted ParquetExec over two files ("x" and "y"), i.e. an exec
// with two output partitions, each ordered by c1 ascending (default sort
// options).
fn parquet_exec_multiple_sorted() -> Arc<ParquetExec> {
    let ordering = vec![PhysicalSortExpr {
        expr: col("c1", &schema()).unwrap(),
        options: SortOptions::default(),
    }];
    // One group per file -> two partitions.
    let file_groups = vec![
        vec![PartitionedFile::new("x".to_string(), 100)],
        vec![PartitionedFile::new("y".to_string(), 100)],
    ];
    Arc::new(ParquetExec::new(
        FileScanConfig {
            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
            file_schema: schema(),
            file_groups,
            statistics: Statistics::default(),
            projection: None,
            limit: None,
            table_partition_cols: vec![],
            output_ordering: Some(ordering),
            infinite_source: false,
        },
        None,
        None,
    ))
}

fn sort_preserving_merge_exec(
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
Expand Down Expand Up @@ -737,12 +764,12 @@ mod tests {
#[test]
fn repartition_ignores_sort_preserving_merge() -> Result<()> {
// sort preserving merge already sorted input,
let plan = sort_preserving_merge_exec(parquet_exec_sorted());
let plan = sort_preserving_merge_exec(parquet_exec_multiple_sorted());

// should not repartition / sort (as the data was already sorted)
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
"ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, output_ordering=[c1@0 ASC], projection=[c1]",
];

assert_optimized!(expected, plan);
Expand Down Expand Up @@ -838,13 +865,14 @@ mod tests {
#[test]
fn repartition_ignores_transitively_with_projection() -> Result<()> {
// sorted input
let plan = sort_preserving_merge_exec(projection_exec(parquet_exec_sorted()));
let plan =
sort_preserving_merge_exec(projection_exec(parquet_exec_multiple_sorted()));

// data should not be repartitioned / resorted
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
"ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, output_ordering=[c1@0 ASC], projection=[c1]",
];

assert_optimized!(expected, plan);
Expand All @@ -857,7 +885,6 @@ mod tests {
sort_preserving_merge_exec(sort_exec(projection_exec(parquet_exec()), true));

let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"SortExec: expr=[c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
Expand Down Expand Up @@ -1040,7 +1067,6 @@ mod tests {

// parallelization potentially could break sort order
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
];

Expand Down Expand Up @@ -1090,7 +1116,6 @@ mod tests {

// data should not be repartitioned / resorted
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
];
Expand Down
Loading