Skip to content

Commit

Permalink
add valid distinct case for aggregate.slt (#11814)
Browse files Browse the repository at this point in the history
  • Loading branch information
mertak-synnada committed Aug 5, 2024
1 parent 682bc2e commit 6aad19f
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 4 deletions.
9 changes: 5 additions & 4 deletions datafusion/optimizer/src/replace_distinct_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
for dep in input.schema().functional_dependencies().iter() {
// If distinct is exactly the same with a previous GROUP BY, we can
// simply remove it:
if dep.source_indices[..field_count]
.iter()
.enumerate()
.all(|(idx, f_idx)| idx == *f_idx)
if dep.source_indices.len() >= field_count
&& dep.source_indices[..field_count]
.iter()
.enumerate()
.all(|(idx, f_idx)| idx == *f_idx)
{
return Ok(Transformed::yes(input.as_ref().clone()));
}
Expand Down
28 changes: 28 additions & 0 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4521,6 +4521,34 @@ false
true
NULL

#
# Add valid distinct case as aggregation plan test
#

query TT
EXPLAIN SELECT DISTINCT c3, min(c1) FROM aggregate_test_100 group by c3 limit 5;
----
logical_plan
01)Limit: skip=0, fetch=5
02)--Aggregate: groupBy=[[aggregate_test_100.c3, MIN(aggregate_test_100.c1)]], aggr=[[]]
03)----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[MIN(aggregate_test_100.c1)]]
04)------TableScan: aggregate_test_100 projection=[c1, c3]
physical_plan
01)GlobalLimitExec: skip=0, fetch=5
02)--CoalescePartitionsExec
03)----LocalLimitExec: fetch=5
04)------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, MIN(aggregate_test_100.c1)@1 as MIN(aggregate_test_100.c1)], aggr=[], lim=[5]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([c3@0, MIN(aggregate_test_100.c1)@1], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[c3@0 as c3, MIN(aggregate_test_100.c1)@1 as MIN(aggregate_test_100.c1)], aggr=[], lim=[5]
08)--------------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3], aggr=[MIN(aggregate_test_100.c1)]
09)----------------CoalesceBatchesExec: target_batch_size=8192
10)------------------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4
11)--------------------AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[MIN(aggregate_test_100.c1)]
12)----------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
13)------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true


#
# Push limit into distinct group-by aggregation tests
#
Expand Down

0 comments on commit 6aad19f

Please sign in to comment.