Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,10 @@ config_namespace! {
/// during aggregations, if possible
pub enable_topk_aggregation: bool, default = true

/// When set to true, the optimizer will attempt to push limit operations
/// past window functions, if possible
pub enable_window_limits: bool, default = true

/// When set to true, the optimizer will insert filters before a join between
/// a nullable and non-nullable column to filter out nulls on the nullable side. This
/// filter can add additional overhead when the file format does not fully support
Expand Down
5 changes: 5 additions & 0 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,11 @@ impl<T> Transformed<T> {
Self::new(data, true, TreeNodeRecursion::Continue)
}

/// Wrapper for transformed data with [`TreeNodeRecursion::Stop`] statement.
pub fn complete(data: T) -> Self {
Self::new(data, true, TreeNodeRecursion::Stop)
}

/// Wrapper for unchanged data with [`TreeNodeRecursion::Continue`] statement.
pub fn no(data: T) -> Self {
Self::new(data, false, TreeNodeRecursion::Continue)
Expand Down
6 changes: 5 additions & 1 deletion datafusion/core/src/physical_optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use datafusion_physical_optimizer::PhysicalOptimizerRule;
use std::sync::Arc;

use datafusion_physical_optimizer::limit_pushdown_past_window::LimitPushPastWindows;
use super::projection_pushdown::ProjectionPushdown;
use super::update_aggr_exprs::OptimizeAggregateOrder;
use crate::physical_optimizer::aggregate_statistics::AggregateStatistics;
Expand Down Expand Up @@ -92,6 +92,10 @@ impl PhysicalOptimizer {
// into an `order by max(x) limit y`. In this case it will copy the limit value down
// to the aggregation, allowing it to use only y number of accumulators.
Arc::new(TopKAggregation::new()),
// Tries to push limits down through window functions, growing as appropriate
// This can possibly be combined with [LimitPushdown]
// It needs to come after [EnforceSorting]
Arc::new(LimitPushPastWindows::new()),
// The ProjectionPushdown rule tries to push projections towards
// the sources in the execution plan. As a result of this process,
// a projection can disappear if it reaches the source providers, and
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ arrow = { workspace = true }
arrow-schema = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-execution = { workspace = true }
datafusion-expr.workspace = true
datafusion-expr-common = { workspace = true, default-features = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub mod limit_pushdown;
pub mod limited_distinct_aggregation;
mod optimizer;
pub mod output_requirements;
pub mod limit_pushdown_past_window;
pub mod topk_aggregation;

pub use optimizer::PhysicalOptimizerRule;
141 changes: 141 additions & 0 deletions datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::PhysicalOptimizerRule;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::ScalarValue;
use datafusion_expr::{WindowFrameBound, WindowFrameUnits};
use datafusion_physical_plan::execution_plan::CardinalityEffect;
use datafusion_physical_plan::limit::GlobalLimitExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::windows::BoundedWindowAggExec;
use datafusion_physical_plan::ExecutionPlan;
use std::cmp;
use std::sync::Arc;

/// This rule inspects [`ExecutionPlan`]'s attempting to find fetch limits that were not pushed
/// down by `LimitPushdown` because [BoundedWindowAggExec]s were "in the way". If the window is
/// bounded by [WindowFrameUnits::Rows] then we calculate the adjustment needed to grow the limit
/// and continue pushdown.
#[derive(Default, Clone, Debug)]
pub struct LimitPushPastWindows;

impl LimitPushPastWindows {
pub fn new() -> Self {
Self
}
}

impl PhysicalOptimizerRule for LimitPushPastWindows {
fn optimize(
&self,
original: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
if !config.optimizer.enable_window_limits {
return Ok(original);
}
let mut latest_limit: Option<usize> = None;
let mut latest_max = 0;
let result = original.transform_down(|node| {
// helper closure to DRY out most the early return cases
let mut reset = |node,
max: &mut usize|
-> datafusion_common::Result<
Transformed<Arc<dyn ExecutionPlan>>,
> {
latest_limit = None;
*max = 0;
Ok(Transformed::no(node))
};

// traversing sides of joins will require more thought
if node.children().len() > 1 {
return reset(node, &mut latest_max);
}

// grab the latest limit we see
if let Some(limit) = node.as_any().downcast_ref::<GlobalLimitExec>() {
latest_limit = limit.fetch().map(|fetch| fetch + limit.skip());
latest_max = 0;
return Ok(Transformed::no(node));
}

// grow the limit if we hit a window function
if let Some(window) = node.as_any().downcast_ref::<BoundedWindowAggExec>() {
for expr in window.window_expr().iter() {
let frame = expr.get_window_frame();
if frame.units != WindowFrameUnits::Rows {
return reset(node, &mut latest_max); // expression-based limits?
}
let Some(end_bound) = bound_to_usize(&frame.end_bound) else {
return reset(node, &mut latest_max);
};
latest_max = cmp::max(end_bound, latest_max);
}
return Ok(Transformed::no(node));
}

// Apply the limit if we hit a sort node
if let Some(sort) = node.as_any().downcast_ref::<SortExec>() {
let latest = latest_limit.take();
let Some(fetch) = latest else {
latest_max = 0;
return Ok(Transformed::no(node));
};
let fetch = match sort.fetch() {
None => fetch + latest_max,
Some(existing) => cmp::min(existing, fetch + latest_max),
};
let sort: Arc<dyn ExecutionPlan> = Arc::new(sort.with_fetch(Some(fetch)));
latest_max = 0;
return Ok(Transformed::complete(sort));
}

// we can't push the limit past nodes that decrease row count
match node.cardinality_effect() {
CardinalityEffect::Equal => {}
_ => return reset(node, &mut latest_max),
}

Ok(Transformed::no(node))
})?;
Ok(result.data)
}

fn name(&self) -> &str {
"LimitPushPastWindows"
}

fn schema_check(&self) -> bool {
false // we don't change the schema
}
}

fn bound_to_usize(bound: &WindowFrameBound) -> Option<usize> {
match bound {
WindowFrameBound::Preceding(_) => Some(0),
WindowFrameBound::CurrentRow => Some(0),
WindowFrameBound::Following(ScalarValue::UInt64(Some(scalar))) => {
Some(*scalar as usize)
}
_ => None,
}
}

// tests: all branches are covered by sqllogictests
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ datafusion.optimizer.default_filter_selectivity 20
datafusion.optimizer.enable_distinct_aggregation_soft_limit true
datafusion.optimizer.enable_round_robin_repartition true
datafusion.optimizer.enable_topk_aggregation true
datafusion.optimizer.enable_window_limits true
datafusion.optimizer.expand_views_at_output false
datafusion.optimizer.filter_null_join_keys false
datafusion.optimizer.hash_join_single_partition_threshold 1048576
Expand Down Expand Up @@ -321,6 +322,7 @@ datafusion.optimizer.default_filter_selectivity 20 The default filter selectivit
datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read.
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible
datafusion.optimizer.enable_window_limits true When set to true, the optimizer will attempt to push limit operations past window functions, if possible
datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.
datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition
Expand Down
Loading
Loading