8 changes: 5 additions & 3 deletions datafusion/core/src/dataframe.rs
@@ -459,9 +459,11 @@ impl DataFrame {
.and_then(|r| r.columns().first())
.and_then(|c| c.as_any().downcast_ref::<Int64Array>())
.and_then(|a| a.values().first())
.ok_or(DataFusionError::Internal(
"Unexpected output when collecting for count()".to_string(),
))? as usize;
.ok_or_else(|| {
DataFusionError::Internal(
"Unexpected output when collecting for count()".to_string(),
)
})? as usize;
Ok(len)
}
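For background on this change: ok_or builds its error argument eagerly, so the DataFusionError::Internal string was allocated on every call even when a value was present, while ok_or_else defers that work to a closure that only runs in the None case. A minimal standalone sketch of the same pattern in plain Rust (illustrative names, not part of this diff):

// ok_or_else constructs the error lazily; the String is only allocated
// when the Option really is None.
fn first_value(values: &[i64]) -> Result<i64, String> {
    values
        .first()
        .copied()
        .ok_or_else(|| "unexpected empty input".to_string())
}

fn main() {
    assert_eq!(first_value(&[42]), Ok(42));
    assert!(first_value(&[]).is_err());
}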

4 changes: 2 additions & 2 deletions datafusion/core/src/execution/context.rs
@@ -97,7 +97,7 @@ use crate::execution::memory_pool::MemoryPool;
use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
use crate::physical_optimizer::pipeline_checker::PipelineChecker;
use crate::physical_optimizer::pipeline_fixer::PipelineFixer;
use crate::physical_optimizer::sort_enforcement::EnforceSorting;
use crate::physical_optimizer::sort_enforcement2::TopDownEnforceSorting;
use datafusion_optimizer::OptimizerConfig;
use datafusion_sql::planner::object_name_to_table_reference;
use uuid::Uuid;
@@ -1541,7 +1541,7 @@ impl SessionState {
// ordering. Please make sure that the whole plan tree is determined before this rule.
// Note that one should always run this rule after running the EnforceDistribution rule
// as the latter may break local sorting requirements.
Arc::new(EnforceSorting::new()),
Arc::new(TopDownEnforceSorting::new()),
// The CoalesceBatches rule will not influence the distribution and ordering of the
// whole plan tree. Therefore, to avoid influencing other rules, it should run last.
Arc::new(CoalesceBatches::new()),
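Because the rule list above is order-sensitive (EnforceDistribution has to run before the sorting rule, and CoalesceBatches runs last), a minimal sketch of the contract a physical optimizer rule such as TopDownEnforceSorting must satisfy to be slotted into that list may help. The optimize signature matches the one visible elsewhere in this diff; the name and schema_check methods and the exact import paths are assumptions based on DataFusion rules of this era and may need adjusting:

use std::sync::Arc;

use datafusion::config::ConfigOptions;
use datafusion::error::Result;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;

// A no-op rule: it receives the plan produced by the rules registered before it
// and returns an equivalent plan. A real rule (such as a sort enforcer) would
// rewrite the tree here, which is why its position in the list matters.
struct NoOpRule;

impl PhysicalOptimizerRule for NoOpRule {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(plan)
    }

    fn name(&self) -> &str {
        "no_op_rule"
    }

    fn schema_check(&self) -> bool {
        true
    }
}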
219 changes: 123 additions & 96 deletions datafusion/core/src/physical_optimizer/dist_enforcement.rs

Large diffs are not rendered by default.

datafusion/core/src/physical_optimizer/global_sort_selection.rs
@@ -48,17 +48,17 @@ impl PhysicalOptimizerRule for GlobalSortSelection {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(&|plan| {
Ok(plan
.as_any()
.downcast_ref::<SortExec>()
.and_then(|sort_exec| {
if sort_exec.input().output_partitioning().partition_count() > 1
&& sort_exec.fetch().is_some()
// It's already preserving the partitioning so that it can be regarded as a local sort
&& !sort_exec.preserve_partitioning()
&& (sort_exec.fetch().is_some() || config.optimizer.repartition_sorts)
{
let sort = SortExec::new_with_partitioning(
sort_exec.expr().to_vec(),
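With the widened condition above, a single global SortExec over multiple input partitions can be rewritten into a partition-preserving local sort either when it carries a fetch or when the new repartition_sorts optimizer option is enabled. A small sketch of turning that option on in the ConfigOptions value that optimize() receives, assuming repartition_sorts is registered like the existing datafusion.optimizer.* options:

use datafusion::config::ConfigOptions;

// Build a ConfigOptions with the new flag enabled; the field path mirrors the
// config.optimizer.repartition_sorts read in GlobalSortSelection above.
fn config_with_repartition_sorts() -> ConfigOptions {
    let mut config = ConfigOptions::new();
    config.optimizer.repartition_sorts = true;
    config
}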
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/mod.rs
@@ -31,6 +31,7 @@ pub mod sort_enforcement;
mod utils;

pub mod pipeline_fixer;
pub mod sort_enforcement2;
#[cfg(test)]
pub mod test_utils;

61 changes: 46 additions & 15 deletions datafusion/core/src/physical_optimizer/repartition.rs
@@ -330,6 +330,7 @@ mod tests {
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::{displayable, DisplayFormatType, Statistics};
use datafusion_physical_expr::{new_sort_requirements, PhysicalSortRequirements};

fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)]))
@@ -400,6 +401,33 @@
))
}

// Create a sorted parquet exec with multiple files
fn parquet_exec_multiple_sorted() -> Arc<ParquetExec> {
let sort_exprs = vec![PhysicalSortExpr {
expr: col("c1", &schema()).unwrap(),
options: SortOptions::default(),
}];

Arc::new(ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
file_schema: schema(),
file_groups: vec![
vec![PartitionedFile::new("x".to_string(), 100)],
vec![PartitionedFile::new("y".to_string(), 100)],
],
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: Some(sort_exprs),
infinite_source: false,
},
None,
None,
))
}

fn sort_preserving_merge_exec(
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
@@ -607,7 +635,7 @@
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
// data is sorted so can't repartition here
"SortExec: expr=[c1@0 ASC]",
"SortExec: expr=[c1@0 ASC], global=true",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];
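The expected arrays in these tests are plan strings, one operator per line. A hedged sketch of how such strings are usually produced for comparison, using the displayable helper imported at the top of this test module (the actual assert_optimized! implementation in this file may differ, and the indent() signature varies across DataFusion versions):

use datafusion::physical_plan::{displayable, ExecutionPlan};

// Render a physical plan as trimmed lines so it can be compared against
// string slices like the expected arrays in these tests.
fn plan_lines(plan: &dyn ExecutionPlan) -> Vec<String> {
    displayable(plan)
        .indent()
        .to_string()
        .trim()
        .lines()
        .map(|line| line.trim().to_string())
        .collect()
}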

@@ -625,7 +653,7 @@
"FilterExec: c1@0",
// data is sorted so we can't repartition here; even though the
// filter would benefit from parallelism, the answers might be wrong
"SortExec: expr=[c1@0 ASC]",
"SortExec: expr=[c1@0 ASC], global=true",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];

Expand Down Expand Up @@ -713,7 +741,7 @@ mod tests {
// need repartition and resort as the data was not sorted correctly
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"SortExec: expr=[c1@0 ASC]",
"SortExec: expr=[c1@0 ASC], global=false",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];
@@ -725,12 +753,12 @@
#[test]
fn repartition_ignores_sort_preserving_merge() -> Result<()> {
// sort preserving merge already sorted input,
let plan = sort_preserving_merge_exec(parquet_exec_sorted());
let plan = sort_preserving_merge_exec(parquet_exec_multiple_sorted());

// should not repartition / sort (as the data was already sorted)
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
"ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, output_ordering=[c1@0 ASC], projection=[c1]",
];

assert_optimized!(expected, plan);
@@ -813,7 +841,7 @@
// needs to repartition / sort as the data was not sorted correctly
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"SortExec: expr=[c1@0 ASC]",
"SortExec: expr=[c1@0 ASC], global=false",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[c1@0 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
@@ -826,13 +854,14 @@
#[test]
fn repartition_ignores_transitively_with_projection() -> Result<()> {
// sorted input
let plan = sort_preserving_merge_exec(projection_exec(parquet_exec_sorted()));
let plan =
sort_preserving_merge_exec(projection_exec(parquet_exec_multiple_sorted()));

// data should not be repartitioned / resorted
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
"ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, output_ordering=[c1@0 ASC], projection=[c1]",
];

assert_optimized!(expected, plan);
@@ -846,7 +875,8 @@

let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"SortExec: expr=[c1@0 ASC]",
// Expect repartition on the input to the sort (as it can benefit from additional parallelism)
"SortExec: expr=[c1@0 ASC], global=false",
"ProjectionExec: expr=[c1@0 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];
@@ -863,7 +893,7 @@
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
// Expect repartition on the input to the sort (as it can benefit from additional parallelism)
"SortExec: expr=[c1@0 ASC]",
"SortExec: expr=[c1@0 ASC], global=false",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
@@ -883,7 +913,7 @@
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
// Expect repartition on the input to the sort (as it can benefit from additional parallelism)
"SortExec: expr=[c1@0 ASC]",
"SortExec: expr=[c1@0 ASC], global=false",
"ProjectionExec: expr=[c1@0 as c1]",
"FilterExec: c1@0",
// repartition is lowest down
@@ -950,7 +980,7 @@
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
// data is sorted so can't repartition here
"SortExec: expr=[c1@0 ASC]",
"SortExec: expr=[c1@0 ASC], global=true",
// Doesn't parallelize for SortExec without preserve_partitioning
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];
@@ -969,7 +999,7 @@
"FilterExec: c1@0",
// data is sorted so we can't repartition here; even though the
// filter would benefit from parallelism, the answers might be wrong
"SortExec: expr=[c1@0 ASC]",
"SortExec: expr=[c1@0 ASC], global=true",
// SortExec doesn't benefit from input partitioning
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];
@@ -1122,8 +1152,9 @@
}

// model that it requires the output ordering of its input
fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
vec![self.input.output_ordering()]
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirements>>> {
let ordering_requirements = new_sort_requirements(self.output_ordering());
vec![ordering_requirements]
}

fn with_new_children(
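The test operator above now expresses its input ordering requirement through the reworked API: required_input_ordering returns one Option<Vec<PhysicalSortRequirements>> per child instead of borrowed PhysicalSortExpr slices. A hedged sketch of the conversion shape implied by the diff; only the types and the new_sort_requirements helper visible in this change are assumed, and no fields of PhysicalSortRequirements are touched:

use datafusion_physical_expr::{
    new_sort_requirements, PhysicalSortExpr, PhysicalSortRequirements,
};

// Wrap an optional output ordering into the requirement form used by
// required_input_ordering; None still means no requirement for that child.
fn to_requirement(
    ordering: Option<&[PhysicalSortExpr]>,
) -> Vec<Option<Vec<PhysicalSortRequirements>>> {
    vec![new_sort_requirements(ordering)]
}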