Skip to content

Commit 0aa2938

Browse files
alamb, berkaysynnada, findepi
authored
Prevent exponential planning time for Window functions - v2 (#17684) (#17778)
* fix * Update mod.rs * Update mod.rs * Update mod.rs * tests copied from v1 pr * test case from review comment #17684 (comment) * one more test case * Update mod.rs * Update datafusion/physical-plan/src/windows/mod.rs * Update datafusion/physical-plan/src/windows/mod.rs * Update mod.rs * Update mod.rs --------- Co-authored-by: Berkay Şahin <[email protected]> Co-authored-by: Piotr Findeisen <[email protected]>
1 parent 340bd6a commit 0aa2938

File tree

3 files changed

+209
-31
lines changed

3 files changed

+209
-31
lines changed

datafusion/core/benches/sql_planner.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,8 @@ fn criterion_benchmark(c: &mut Criterion) {
476476
});
477477
});
478478

479-
for partitioning_columns in [4, 7, 8] {
479+
// It was observed in production that queries with window functions sometimes partition over more than 30 columns
480+
for partitioning_columns in [4, 7, 8, 12, 30] {
480481
c.bench_function(
481482
&format!(
482483
"physical_window_function_partition_by_{partitioning_columns}_on_values"

datafusion/physical-plan/src/windows/mod.rs

Lines changed: 118 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -371,17 +371,40 @@ pub(crate) fn window_equivalence_properties(
371371
for (i, expr) in window_exprs.iter().enumerate() {
372372
let partitioning_exprs = expr.partition_by();
373373
let no_partitioning = partitioning_exprs.is_empty();
374-
// Collect columns defining partitioning, and construct all `SortOptions`
375-
// variations for them. Then, we will check each one whether it satisfies
376-
// the existing ordering provided by the input plan.
374+
375+
// Find "one" valid ordering for partition columns to avoid exponential complexity.
376+
// see https://github.com/apache/datafusion/issues/17401
377377
let mut all_satisfied_lexs = vec![];
378-
for lex in partitioning_exprs
379-
.iter()
380-
.map(|pb_order| sort_options_resolving_constant(Arc::clone(pb_order)))
381-
.multi_cartesian_product()
382-
.filter_map(LexOrdering::new)
383-
{
384-
if window_eq_properties.ordering_satisfy(lex.clone())? {
378+
let mut candidate_ordering = vec![];
379+
380+
for partition_expr in partitioning_exprs.iter() {
381+
let sort_options =
382+
sort_options_resolving_constant(Arc::clone(partition_expr), true);
383+
384+
// Try each sort option and pick the first one that works
385+
let mut found = false;
386+
for sort_expr in sort_options.into_iter() {
387+
candidate_ordering.push(sort_expr);
388+
if let Some(lex) = LexOrdering::new(candidate_ordering.clone()) {
389+
if window_eq_properties.ordering_satisfy(lex)? {
390+
found = true;
391+
break;
392+
}
393+
}
394+
// This option didn't work, remove it and try the next one
395+
candidate_ordering.pop();
396+
}
397+
// If no sort option works for this column, we can't build a valid ordering
398+
if !found {
399+
candidate_ordering.clear();
400+
break;
401+
}
402+
}
403+
404+
// If we successfully built an ordering for all columns, use it
405+
// When there are no partition expressions, candidate_ordering will be empty and won't be added
406+
if candidate_ordering.len() == partitioning_exprs.len() {
407+
if let Some(lex) = LexOrdering::new(candidate_ordering) {
385408
all_satisfied_lexs.push(lex);
386409
}
387410
}
@@ -410,8 +433,10 @@ pub(crate) fn window_equivalence_properties(
410433
// Window function results in a partial constant value in
411434
// some ordering. Adjust the ordering equivalences accordingly:
412435
let new_lexs = all_satisfied_lexs.into_iter().flat_map(|lex| {
413-
let new_partial_consts =
414-
sort_options_resolving_constant(Arc::clone(&window_col));
436+
let new_partial_consts = sort_options_resolving_constant(
437+
Arc::clone(&window_col),
438+
false,
439+
);
415440

416441
new_partial_consts.into_iter().map(move |partial| {
417442
let mut existing = lex.clone();
@@ -467,23 +492,52 @@ pub(crate) fn window_equivalence_properties(
467492
// utilize set-monotonicity since the set shrinks as the frame
468493
// boundary starts "touching" the end of the table.
469494
else if frame.is_causal() {
470-
let args_all_lexs = sliding_expr
471-
.get_aggregate_expr()
472-
.expressions()
473-
.into_iter()
474-
.map(sort_options_resolving_constant)
475-
.multi_cartesian_product();
476-
477-
let (mut asc, mut satisfied) = (false, false);
478-
for order in args_all_lexs {
479-
if let Some(f) = order.first() {
480-
asc = !f.options.descending;
495+
// Find one valid ordering for aggregate arguments instead of
496+
// checking all combinations
497+
let aggregate_exprs = sliding_expr.get_aggregate_expr().expressions();
498+
let mut candidate_order = vec![];
499+
let mut asc = false;
500+
501+
for (idx, expr) in aggregate_exprs.iter().enumerate() {
502+
let mut found = false;
503+
let sort_options =
504+
sort_options_resolving_constant(Arc::clone(expr), false);
505+
506+
// Try each option and pick the first that works
507+
for sort_expr in sort_options.into_iter() {
508+
let is_asc = !sort_expr.options.descending;
509+
candidate_order.push(sort_expr);
510+
511+
if let Some(lex) = LexOrdering::new(candidate_order.clone()) {
512+
if window_eq_properties.ordering_satisfy(lex)? {
513+
if idx == 0 {
514+
// The first column's ordering direction determines the overall
515+
// monotonicity behavior of the window result.
516+
// - If the aggregate has increasing set monotonicity (e.g., MAX, COUNT)
517+
// and the first arg is ascending, the window result is increasing
518+
// - If the aggregate has decreasing set monotonicity (e.g., MIN)
519+
// and the first arg is ascending, the window result is also increasing
520+
// This flag is used to determine the final window column ordering.
521+
asc = is_asc;
522+
}
523+
found = true;
524+
break;
525+
}
526+
}
527+
// This option didn't work, remove it and try the next one
528+
candidate_order.pop();
481529
}
482-
if window_eq_properties.ordering_satisfy(order)? {
483-
satisfied = true;
530+
531+
// If we couldn't extend the ordering, stop trying
532+
if !found {
484533
break;
485534
}
486535
}
536+
537+
// Check if we successfully built a complete ordering
538+
let satisfied = candidate_order.len() == aggregate_exprs.len()
539+
&& !aggregate_exprs.is_empty();
540+
487541
if satisfied {
488542
let increasing =
489543
set_monotonicity.eq(&SetMonotonicity::Increasing);
@@ -634,11 +688,45 @@ pub fn get_window_mode(
634688
Ok(None)
635689
}
636690

637-
fn sort_options_resolving_constant(expr: Arc<dyn PhysicalExpr>) -> Vec<PhysicalSortExpr> {
638-
vec![
639-
PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)),
640-
PhysicalSortExpr::new(expr, SortOptions::new(true, true)),
641-
]
691+
/// Generates sort option variations for a given expression.
692+
///
693+
/// This function is used to handle constant columns in window operations. Since constant
694+
/// columns can be considered as having any ordering, we generate multiple sort options
695+
/// to explore different ordering possibilities.
696+
///
697+
/// # Parameters
698+
/// - `expr`: The physical expression to generate sort options for
699+
/// - `only_monotonic`: If false, generates all 4 possible sort options (ASC/DESC × NULLS FIRST/LAST).
700+
/// If true, generates only 2 options that preserve set monotonicity.
701+
///
702+
/// # When to use `only_monotonic = false`:
703+
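/// Intended for PARTITION BY columns where we want to explore all possible orderings to find
704+
/// one that matches the existing data ordering. NOTE(review): `window_equivalence_properties` currently passes `true` for PARTITION BY columns — confirm which is intended.
705+
///
706+
/// # When to use `only_monotonic = true`:
707+
/// Intended for aggregate/window function arguments where set monotonicity needs to be preserved. NOTE(review): the call sites for aggregate arguments and the window column currently pass `false` — confirm which is intended.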
708+
/// Only generates ASC NULLS LAST and DESC NULLS FIRST because:
709+
/// - Set monotonicity is broken if data has increasing order but nulls come first
710+
/// - Set monotonicity is broken if data has decreasing order but nulls come last
711+
fn sort_options_resolving_constant(
712+
expr: Arc<dyn PhysicalExpr>,
713+
only_monotonic: bool,
714+
) -> Vec<PhysicalSortExpr> {
715+
if only_monotonic {
716+
// Generate only the 2 options that preserve set monotonicity
717+
vec![
718+
PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), // ASC NULLS LAST
719+
PhysicalSortExpr::new(expr, SortOptions::new(true, true)), // DESC NULLS FIRST
720+
]
721+
} else {
722+
// Generate all 4 possible sort options (ASC/DESC × NULLS FIRST/LAST)
723+
vec![
724+
PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), // ASC NULLS LAST
725+
PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, true)), // ASC NULLS FIRST
726+
PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(true, false)), // DESC NULLS LAST
727+
PhysicalSortExpr::new(expr, SortOptions::new(true, true)), // DESC NULLS FIRST
728+
]
729+
}
642730
}
643731

644732
#[cfg(test)]

datafusion/sqllogictest/test_files/window.slt

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6034,3 +6034,92 @@ LIMIT 5
60346034
0 2 NULL NULL 0 NULL NULL
60356035
0 3 NULL NULL 0 NULL NULL
60366036
0 4 NULL NULL 0 NULL NULL
6037+
6038+
# regression test for https://github.com/apache/datafusion/issues/17401
6039+
query I
6040+
WITH source AS (
6041+
SELECT
6042+
1 AS n,
6043+
'' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8,
6044+
'' AS a9, '' AS a10, '' AS a11, '' AS a12
6045+
)
6046+
SELECT
6047+
sum(n) OVER (PARTITION BY
6048+
a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12
6049+
)
6050+
FROM source;
6051+
----
6052+
1
6053+
6054+
# regression test for https://github.com/apache/datafusion/issues/17401
6055+
query I
6056+
WITH source AS (
6057+
SELECT
6058+
1 AS n,
6059+
'' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8,
6060+
'' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16,
6061+
'' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24,
6062+
'' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32,
6063+
'' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40
6064+
)
6065+
SELECT
6066+
sum(n) OVER (PARTITION BY
6067+
a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,
6068+
a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40
6069+
)
6070+
FROM source;
6071+
----
6072+
1
6073+
6074+
# regression test for https://github.com/apache/datafusion/issues/17401
6075+
query I
6076+
WITH source AS (
6077+
SELECT
6078+
1 AS n,
6079+
'' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8,
6080+
'' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16,
6081+
'' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24,
6082+
'' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32,
6083+
'' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40
6084+
)
6085+
SELECT
6086+
sum(n) OVER (PARTITION BY
6087+
a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,
6088+
a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40
6089+
)
6090+
FROM (
6091+
SELECT * FROM source
6092+
ORDER BY a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,
6093+
a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40
6094+
);
6095+
----
6096+
1
6097+
6098+
# regression test for https://github.com/apache/datafusion/issues/17401
6099+
query I
6100+
WITH source AS (
6101+
SELECT
6102+
1 AS n,
6103+
'' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8,
6104+
'' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16,
6105+
'' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24,
6106+
'' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32,
6107+
'' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40
6108+
)
6109+
SELECT
6110+
sum(n) OVER (PARTITION BY
6111+
a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,
6112+
a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40
6113+
)
6114+
FROM (
6115+
SELECT * FROM source
6116+
WHERE a1 = '' AND a2 = '' AND a3 = '' AND a4 = '' AND a5 = '' AND a6 = '' AND a7 = '' AND a8 = ''
6117+
AND a9 = '' AND a10 = '' AND a11 = '' AND a12 = '' AND a13 = '' AND a14 = '' AND a15 = '' AND a16 = ''
6118+
AND a17 = '' AND a18 = '' AND a19 = '' AND a20 = '' AND a21 = '' AND a22 = '' AND a23 = '' AND a24 = ''
6119+
AND a25 = '' AND a26 = '' AND a27 = '' AND a28 = '' AND a29 = '' AND a30 = '' AND a31 = '' AND a32 = ''
6120+
AND a33 = '' AND a34 = '' AND a35 = '' AND a36 = '' AND a37 = '' AND a38 = '' AND a39 = '' AND a40 = ''
6121+
ORDER BY a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,
6122+
a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40
6123+
);
6124+
----
6125+
1

0 commit comments

Comments
 (0)