Skip to content

Commit

Permalink
MINOR: Add maintains input order flag to CoalesceBatches (#6730)
Browse files Browse the repository at this point in the history
* add maintains input order flag to CoalesceBatches

* minor changes
  • Loading branch information
mustafasrepo authored Jun 20, 2023
1 parent 748f4be commit da9d261
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
20 changes: 15 additions & 5 deletions datafusion/core/src/physical_optimizer/sort_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,7 @@ mod tests {
use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
use crate::physical_plan::aggregates::PhysicalGroupBy;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::joins::utils::JoinOn;
use crate::physical_plan::joins::SortMergeJoinExec;
Expand Down Expand Up @@ -1462,8 +1463,11 @@ mod tests {
},
)];
let sort = sort_exec(sort_exprs.clone(), source);
// Add a dummy layer that propagates the sort's ordering upward, to test whether a sort can be removed across multiple layers
let coalesce_batches = coalesce_batches_exec(sort);

let window_agg = bounded_window_exec("non_nullable_col", sort_exprs, sort);
let window_agg =
bounded_window_exec("non_nullable_col", sort_exprs, coalesce_batches);

let sort_exprs = vec![sort_expr_options(
"non_nullable_col",
Expand Down Expand Up @@ -1491,16 +1495,18 @@ mod tests {
" FilterExec: NOT non_nullable_col@1",
" SortExec: expr=[non_nullable_col@1 ASC NULLS LAST]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }], mode=[Sorted]",
" SortExec: expr=[non_nullable_col@1 DESC]",
" MemoryExec: partitions=0, partition_sizes=[]",
" CoalesceBatchesExec: target_batch_size=128",
" SortExec: expr=[non_nullable_col@1 DESC]",
" MemoryExec: partitions=0, partition_sizes=[]",
];

let expected_optimized = vec![
"WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL) }]",
" FilterExec: NOT non_nullable_col@1",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }], mode=[Sorted]",
" SortExec: expr=[non_nullable_col@1 DESC]",
" MemoryExec: partitions=0, partition_sizes=[]",
" CoalesceBatchesExec: target_batch_size=128",
" SortExec: expr=[non_nullable_col@1 DESC]",
" MemoryExec: partitions=0, partition_sizes=[]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
Expand Down Expand Up @@ -2892,4 +2898,8 @@ mod tests {
.unwrap(),
)
}

/// Test helper: wraps `input` in a `CoalesceBatchesExec` whose target batch
/// size is 128 and returns it as a shared execution plan.
fn coalesce_batches_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    let target_batch_size = 128;
    let coalesced = CoalesceBatchesExec::new(input, target_batch_size);
    Arc::new(coalesced)
}
}
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ impl ExecutionPlan for CoalesceBatchesExec {
self.input.output_ordering()
}

/// Returns a one-element vector containing `true`, i.e. this operator
/// reports that the ordering of its input is maintained.
// NOTE(review): presumably one flag per child plan; this operator exposes a
// single `input` — confirm against the `ExecutionPlan` trait documentation.
fn maintains_input_order(&self) -> Vec<bool> {
    let preserves_input_order = true;
    vec![preserves_input_order]
}

/// Returns the equivalence properties reported by the input plan, unchanged.
fn equivalence_properties(&self) -> EquivalenceProperties {
    let input = &self.input;
    input.equivalence_properties()
}
Expand Down

0 comments on commit da9d261

Please sign in to comment.