Skip to content

Commit

Permalink
Fix incorrect OFFSET during LIMIT pushdown. (#12399)
Browse files Browse the repository at this point in the history
* test: demonstrate offset not applied correctly with limit pushdown on multiple input streams

* fix: do not pushdown when skip is applied

* test: update tests after fix

* chore: more doc cleanup

* chore: move LIMIT+OFFSET tests to proper sqllogic test case

* refactor: add global limit back (if there is a skip) during limit pushdown

* Apply suggestions from code review

* Add comment explaining why

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
wiedld and alamb committed Sep 11, 2024
1 parent 13dc8a6 commit 9025c1c
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 2 deletions.
18 changes: 16 additions & 2 deletions datafusion/physical-optimizer/src/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,14 @@ impl From<LimitExec> for Arc<dyn ExecutionPlan> {
/// The helper takes an `ExecutionPlan` and a global (algorithm) state which is
/// an instance of `GlobalRequirements` and modifies these parameters while
/// checking if the limits can be pushed down or not.
///
/// If a limit is encountered, a [`TreeNodeRecursion::Stop`] is returned. Otherwise,
/// return a [`TreeNodeRecursion::Continue`].
pub fn pushdown_limit_helper(
mut pushdown_plan: Arc<dyn ExecutionPlan>,
mut global_state: GlobalRequirements,
) -> Result<(Transformed<Arc<dyn ExecutionPlan>>, GlobalRequirements)> {
// Extract limit, if exist, and return child inputs.
if let Some(limit_exec) = extract_limit(&pushdown_plan) {
// If we have fetch/skip info in the global state already, we need to
// decide which one to continue with:
Expand Down Expand Up @@ -199,10 +203,17 @@ pub fn pushdown_limit_helper(
// This plan is combining input partitions, so we need to add the
// fetch info to plan if possible. If not, we must add a `LimitExec`
// with the information from the global state.
let mut new_plan = plan_with_fetch;
// Execution plans can't (yet) handle skip, so if we have one,
// we still need to add a global limit
if global_state.skip > 0 {
new_plan =
add_global_limit(new_plan, global_state.skip, global_state.fetch);
}
global_state.fetch = skip_and_fetch;
global_state.skip = 0;
global_state.satisfied = true;
Ok((Transformed::yes(plan_with_fetch), global_state))
Ok((Transformed::yes(new_plan), global_state))
} else if global_state.satisfied {
// If the plan is already satisfied, do not add a limit:
Ok((Transformed::no(pushdown_plan), global_state))
Expand Down Expand Up @@ -256,21 +267,24 @@ pub(crate) fn pushdown_limits(
pushdown_plan: Arc<dyn ExecutionPlan>,
global_state: GlobalRequirements,
) -> Result<Arc<dyn ExecutionPlan>> {
// Call pushdown_limit_helper.
// This will either extract the limit node (returning the child), or apply the limit pushdown.
let (mut new_node, mut global_state) =
pushdown_limit_helper(pushdown_plan, global_state)?;

// While limits exist, continue combining the global_state.
while new_node.tnr == TreeNodeRecursion::Stop {
(new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?;
}

// Apply pushdown limits in children
let children = new_node.data.children();
let new_children = children
.into_iter()
.map(|child| {
pushdown_limits(Arc::<dyn ExecutionPlan>::clone(child), global_state.clone())
})
.collect::<Result<_>>()?;

new_node.data.with_new_children(new_children)
}

Expand Down
146 changes: 146 additions & 0 deletions datafusion/sqllogictest/test_files/limit.slt
Original file line number Diff line number Diff line change
Expand Up @@ -565,3 +565,149 @@ physical_plan

statement ok
drop table data;


####################
# Test issue: limit pushdown with offsets
# Ensure the offset is not lost: https://github.com/apache/datafusion/issues/12423
####################

statement ok
CREATE EXTERNAL TABLE ordered_table (
a0 INT,
a INT,
b INT,
c INT UNSIGNED,
d INT
)
STORED AS CSV
WITH ORDER (c ASC)
LOCATION '../core/tests/data/window_2.csv'
OPTIONS ('format.has_header' 'true');

# all results
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc;
----
3 25
2 25
1 0
0 0

# limit only
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc LIMIT 3;
----
3 25
2 25
1 0

# offset only
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1;
----
2 25
1 0
0 0

# offset + limit
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2;
----
2 25
1 0

# Applying offset & limit when multiple streams from groupby
# the plan must still have a global limit to apply the offset
query TT
EXPLAIN SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2;
----
logical_plan
01)Limit: skip=1, fetch=2
02)--Sort: ordered_table.b DESC NULLS FIRST, fetch=3
03)----Aggregate: groupBy=[[ordered_table.b]], aggr=[[sum(CAST(ordered_table.a AS Int64))]]
04)------TableScan: ordered_table projection=[a, b]
physical_plan
01)GlobalLimitExec: skip=1, fetch=2
02)--SortPreservingMergeExec: [b@0 DESC], fetch=3
03)----SortExec: TopK(fetch=3), expr=[b@0 DESC], preserve_partitioning=[true]
04)------AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[sum(ordered_table.a)]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[sum(ordered_table.a)]
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true

# Applying offset & limit when multiple streams from union
# the plan must still have a global limit to apply the offset
query TT
explain select * FROM (
select c FROM ordered_table
UNION ALL
select d FROM ordered_table
) order by 1 desc LIMIT 10 OFFSET 4;
----
logical_plan
01)Limit: skip=4, fetch=10
02)--Sort: ordered_table.c DESC NULLS FIRST, fetch=14
03)----Union
04)------Projection: CAST(ordered_table.c AS Int64) AS c
05)--------TableScan: ordered_table projection=[c]
06)------Projection: CAST(ordered_table.d AS Int64) AS c
07)--------TableScan: ordered_table projection=[d]
physical_plan
01)GlobalLimitExec: skip=4, fetch=10
02)--SortPreservingMergeExec: [c@0 DESC], fetch=14
03)----UnionExec
04)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
05)--------ProjectionExec: expr=[CAST(c@0 AS Int64) as c]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], has_header=true
08)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
09)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c]
10)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
11)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], has_header=true

# Applying LIMIT & OFFSET to subquery.
query III
select t1.b, c, c2 FROM (
select b, c FROM ordered_table ORDER BY b desc, c desc OFFSET 1 LIMIT 4
) as t1 INNER JOIN (
select b, c as c2 FROM ordered_table ORDER BY b desc, d desc OFFSET 1 LIMIT 4
) as t2
ON t1.b = t2.b
ORDER BY t1.b desc, c desc, c2 desc;
----
3 98 96
3 98 89
3 98 82
3 98 79
3 97 96
3 97 89
3 97 82
3 97 79
3 96 96
3 96 89
3 96 82
3 96 79
3 95 96
3 95 89
3 95 82
3 95 79

# Apply OFFSET & LIMIT to both parent and child (subquery).
query III
select t1.b, c, c2 FROM (
select b, c FROM ordered_table ORDER BY b desc, c desc OFFSET 1 LIMIT 4
) as t1 INNER JOIN (
select b, c as c2 FROM ordered_table ORDER BY b desc, d desc OFFSET 1 LIMIT 4
) as t2
ON t1.b = t2.b
ORDER BY t1.b desc, c desc, c2 desc
OFFSET 3 LIMIT 2;
----
3 99 82
3 99 79

statement ok
drop table ordered_table;

0 comments on commit 9025c1c

Please sign in to comment.