diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs index c71191507fbc..9ae6e1f57078 100644 --- a/datafusion/core/benches/sql_planner.rs +++ b/datafusion/core/benches/sql_planner.rs @@ -476,7 +476,8 @@ fn criterion_benchmark(c: &mut Criterion) { }); }); - for partitioning_columns in [4, 7, 8] { + // It was observed in production that queries with window functions sometimes partition over more than 30 columns + for partitioning_columns in [4, 7, 8, 12, 30] { c.bench_function( &format!( "physical_window_function_partition_by_{partitioning_columns}_on_values" diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index dccd9200fc77..ddc2bfa10ea7 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -371,17 +371,40 @@ pub(crate) fn window_equivalence_properties( for (i, expr) in window_exprs.iter().enumerate() { let partitioning_exprs = expr.partition_by(); let no_partitioning = partitioning_exprs.is_empty(); - // Collect columns defining partitioning, and construct all `SortOptions` - // variations for them. Then, we will check each one whether it satisfies - // the existing ordering provided by the input plan. + + // Find "one" valid ordering for partition columns to avoid exponential complexity. + // see https://github.com/apache/datafusion/issues/17401 let mut all_satisfied_lexs = vec![]; - for lex in partitioning_exprs - .iter() - .map(|pb_order| sort_options_resolving_constant(Arc::clone(pb_order))) - .multi_cartesian_product() - .filter_map(LexOrdering::new) - { - if window_eq_properties.ordering_satisfy(lex.clone())? 
{ + let mut candidate_ordering = vec![]; + + for partition_expr in partitioning_exprs.iter() { + let sort_options = + sort_options_resolving_constant(Arc::clone(partition_expr), false); + + // Try each of the four sort options and pick the first one that works + let mut found = false; + for sort_expr in sort_options.into_iter() { + candidate_ordering.push(sort_expr); + if let Some(lex) = LexOrdering::new(candidate_ordering.clone()) { + if window_eq_properties.ordering_satisfy(lex)? { + found = true; + break; + } + } + // This option didn't work, remove it and try the next one + candidate_ordering.pop(); + } + // If no sort option works for this column, we can't build a valid ordering + if !found { + candidate_ordering.clear(); + break; + } + } + + // If we successfully built an ordering for all columns, use it + // When there are no partition expressions, candidate_ordering will be empty and won't be added + if candidate_ordering.len() == partitioning_exprs.len() { + if let Some(lex) = LexOrdering::new(candidate_ordering) { all_satisfied_lexs.push(lex); } } @@ -410,8 +433,10 @@ pub(crate) fn window_equivalence_properties( // Window function results in a partial constant value in // some ordering. Adjust the ordering equivalences accordingly: let new_lexs = all_satisfied_lexs.into_iter().flat_map(|lex| { - let new_partial_consts = - sort_options_resolving_constant(Arc::clone(&window_col)); + let new_partial_consts = sort_options_resolving_constant( + Arc::clone(&window_col), + false, + ); new_partial_consts.into_iter().map(move |partial| { let mut existing = lex.clone(); @@ -467,23 +492,52 @@ pub(crate) fn window_equivalence_properties( // utilize set-monotonicity since the set shrinks as the frame // boundary starts "touching" the end of the table.
else if frame.is_causal() { - let args_all_lexs = sliding_expr - .get_aggregate_expr() - .expressions() - .into_iter() - .map(sort_options_resolving_constant) - .multi_cartesian_product(); - - let (mut asc, mut satisfied) = (false, false); - for order in args_all_lexs { - if let Some(f) = order.first() { - asc = !f.options.descending; + // Find one valid ordering for aggregate arguments instead of + // checking all combinations + let aggregate_exprs = sliding_expr.get_aggregate_expr().expressions(); + let mut candidate_order = vec![]; + let mut asc = false; + + for (idx, expr) in aggregate_exprs.iter().enumerate() { + let mut found = false; + let sort_options = + sort_options_resolving_constant(Arc::clone(expr), true); + + // Try only the two monotonicity-preserving options and pick the first that works + for sort_expr in sort_options.into_iter() { + let is_asc = !sort_expr.options.descending; + candidate_order.push(sort_expr); + + if let Some(lex) = LexOrdering::new(candidate_order.clone()) { + if window_eq_properties.ordering_satisfy(lex)? { + if idx == 0 { + // The first column's ordering direction determines the overall + // monotonicity behavior of the window result. + // - If the aggregate has increasing set monotonicity (e.g., MAX, COUNT) + // and the first arg is ascending, the window result is increasing + // - If the aggregate has decreasing set monotonicity (e.g., MIN) + // and the first arg is ascending, the window result is also increasing + // This flag is used to determine the final window column ordering. + asc = is_asc; + } + found = true; + break; + } + } + // This option didn't work, remove it and try the next one + candidate_order.pop(); } - if window_eq_properties.ordering_satisfy(order)?
{ - satisfied = true; + + // If we couldn't extend the ordering, stop trying + if !found { break; } } + + // Check if we successfully built a complete ordering + let satisfied = candidate_order.len() == aggregate_exprs.len() + && !aggregate_exprs.is_empty(); + if satisfied { let increasing = set_monotonicity.eq(&SetMonotonicity::Increasing); @@ -634,11 +688,45 @@ pub fn get_window_mode( Ok(None) } -fn sort_options_resolving_constant(expr: Arc<dyn PhysicalExpr>) -> Vec<PhysicalSortExpr> { - vec![ - PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), - PhysicalSortExpr::new(expr, SortOptions::new(true, true)), - ] +/// Generates sort option variations for a given expression. +/// +/// This function is used to handle constant columns in window operations. Since constant +/// columns can be considered as having any ordering, we generate multiple sort options +/// to explore different ordering possibilities. Options are returned in a fixed order (ASC NULLS LAST first), which matters to callers that accept the first option satisfying the input ordering. +/// +/// # Parameters +/// - `expr`: The physical expression to generate sort options for +/// - `only_monotonic`: If false, generates all 4 possible sort options (ASC/DESC × NULLS FIRST/LAST). +/// If true, generates only 2 options that preserve set monotonicity. +/// +/// # When to use `only_monotonic = false`: +/// Use for PARTITION BY columns where we want to explore all possible orderings to find +/// one that matches the existing data ordering. +/// +/// # When to use `only_monotonic = true`: +/// Use for aggregate/window function arguments where set monotonicity needs to be preserved.
+/// Only generates ASC NULLS LAST and DESC NULLS FIRST because: +/// - Set monotonicity is broken if data has increasing order but nulls come first +/// - Set monotonicity is broken if data has decreasing order but nulls come last +fn sort_options_resolving_constant( + expr: Arc<dyn PhysicalExpr>, + only_monotonic: bool, +) -> Vec<PhysicalSortExpr> { + if only_monotonic { + // Generate only the 2 options that preserve set monotonicity + vec![ + PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), // ASC NULLS LAST + PhysicalSortExpr::new(expr, SortOptions::new(true, true)), // DESC NULLS FIRST + ] + } else { + // Generate all 4 possible sort options for partition columns + vec![ + PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), // ASC NULLS LAST + PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, true)), // ASC NULLS FIRST + PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(true, false)), // DESC NULLS LAST + PhysicalSortExpr::new(expr, SortOptions::new(true, true)), // DESC NULLS FIRST + ] + } } #[cfg(test)] diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index c30258234490..e81662a75319 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -6034,3 +6034,92 @@ LIMIT 5 0 2 NULL NULL 0 NULL NULL 0 3 NULL NULL 0 NULL NULL 0 4 NULL NULL 0 NULL NULL + +# regression test for https://github.com/apache/datafusion/issues/17401 +query I +WITH source AS ( + SELECT + 1 AS n, + '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8, + '' AS a9, '' AS a10, '' AS a11, '' AS a12 +) +SELECT + sum(n) OVER (PARTITION BY + a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12 + ) +FROM source; +---- +1 + +# regression test for https://github.com/apache/datafusion/issues/17401 +query I +WITH source AS ( + SELECT + 1 AS n, + '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8, + '' AS a9,
'' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16, + '' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24, + '' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32, + '' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40 +) +SELECT + sum(n) OVER (PARTITION BY + a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, + a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40 + ) +FROM source; +---- +1 + +# regression test for https://github.com/apache/datafusion/issues/17401 +query I +WITH source AS ( + SELECT + 1 AS n, + '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8, + '' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16, + '' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24, + '' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32, + '' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40 +) +SELECT + sum(n) OVER (PARTITION BY + a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, + a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40 + ) +FROM ( + SELECT * FROM source + ORDER BY a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, + a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40 +); +---- +1 + +# regression test for https://github.com/apache/datafusion/issues/17401 +query I +WITH source AS ( + SELECT + 1 AS n, + '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8, + '' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16, + '' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, 
'' AS a22, '' AS a23, '' AS a24, + '' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32, + '' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40 +) +SELECT + sum(n) OVER (PARTITION BY + a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, + a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40 + ) +FROM ( + SELECT * FROM source + WHERE a1 = '' AND a2 = '' AND a3 = '' AND a4 = '' AND a5 = '' AND a6 = '' AND a7 = '' AND a8 = '' + AND a9 = '' AND a10 = '' AND a11 = '' AND a12 = '' AND a13 = '' AND a14 = '' AND a15 = '' AND a16 = '' + AND a17 = '' AND a18 = '' AND a19 = '' AND a20 = '' AND a21 = '' AND a22 = '' AND a23 = '' AND a24 = '' + AND a25 = '' AND a26 = '' AND a27 = '' AND a28 = '' AND a29 = '' AND a30 = '' AND a31 = '' AND a32 = '' + AND a33 = '' AND a34 = '' AND a35 = '' AND a36 = '' AND a37 = '' AND a38 = '' AND a39 = '' AND a40 = '' + ORDER BY a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, + a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40 +); +---- +1 \ No newline at end of file