Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ regex-syntax = "0.8.0"
async-trait = { workspace = true }
ctor = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-window = { workspace = true }
datafusion-functions-window-common = { workspace = true }
datafusion-sql = { workspace = true }
env_logger = { workspace = true }
310 changes: 306 additions & 4 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,87 @@ impl OptimizerRule for PushDownFilter {
}
})
}
// Tries to push filters based on the partition key(s) of the window function(s) used.
// Example:
// Before:
// Filter: (a > 1) and (b > 1) and (c > 1)
// Window: func() PARTITION BY [a] ...
// ---
// After:
// Filter: (b > 1) and (c > 1)
// Window: func() PARTITION BY [a] ...
// Filter: (a > 1)
LogicalPlan::Window(window) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend adding an example here for better understandability, the rule applied here is not straightforward I think

Something like

Before:
Filter: (a > 1) and (b > 1) and (c > 1)
-- Window: func() PARTITION BY [a] ...
                    func() PARTITION BY [a, b] ...

After:
Filter: (b > 1) and (c > 1)
-- Window: func() PARTITION BY [a] ...
                    func() PARTITION BY [a, b] ...
---- Filter (a > 1)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@2010YOUY01 You mean as a comment above the new match arm?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// Retrieve the set of potential partition keys where we can push filters by.
// Unlike aggregations, where there is only one statement per SELECT, there can be
// multiple window functions, each with potentially different partition keys.
// Therefore, we need to ensure that any potential partition key returned is used in
// ALL window functions. Otherwise, filters cannot be pushed by through that column.
let potential_partition_keys = window
.window_expr
.iter()
.map(|e| {
if let Expr::WindowFunction(window_expression) = e {
window_expression
.partition_by
.iter()
.map(|c| {
Column::from_qualified_name(
c.schema_name().to_string(),
)
})
.collect::<HashSet<_>>()
} else {
// window functions expressions are only Expr::WindowFunction
unreachable!()
}
})
// performs the set intersection of the partition keys of all window functions,
// returning only the common ones
.reduce(|a, b| &a & &b)
.unwrap_or_default();

let predicates = split_conjunction_owned(filter.predicate);
let mut keep_predicates = vec![];
let mut push_predicates = vec![];
for expr in predicates {
let cols = expr.column_refs();
if cols.iter().all(|c| potential_partition_keys.contains(c)) {
push_predicates.push(expr);
} else {
keep_predicates.push(expr);
}
}

// Unlike with aggregations, there are no cases where we have to replace, e.g.,
// `a+b` with Column(a)+Column(b). This is because partition expressions are not
// available as standalone columns to the user. For example, while an aggregation on
// `a+b` becomes Column(a + b), in a window partition it becomes
// `func() PARTITION BY [a + b] ...`. Thus, filters on expressions always remain in
// place, so we can use `push_predicates` directly. This is consistent with other
// optimizers, such as the one used by Postgres.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand this comment block correctly:

Filter: a+b > 0
-- Window: func() PARTITION BY [a+b] ...

Filter in theory can be pushed down in this plan, but we are not able to implement it due to engineering issue

(I think it's related to your concern @comphead )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right. Unlike with aggregations, the partition column a+b is not projected.
Interestingly enough, Postgres also does not push filters in these cases, so it must follow a similar approach.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I filed a PR to try and document the output schema more clearly:


let window_input = Arc::clone(&window.input);
Transformed::yes(LogicalPlan::Window(window))
.transform_data(|new_plan| {
// If we have a filter to push, we push it down to the input of the window
if let Some(predicate) = conjunction(push_predicates) {
let new_filter = make_filter(predicate, window_input)?;
insert_below(new_plan, new_filter)
} else {
Ok(Transformed::no(new_plan))
}
})?
.map_data(|child_plan| {
// if there are any remaining predicates we can't push, add them
// back as a filter
if let Some(predicate) = conjunction(keep_predicates) {
make_filter(predicate, Arc::new(child_plan))
} else {
Ok(child_plan)
}
})
}
LogicalPlan::Join(join) => push_down_join(join, Some(&filter.predicate)),
LogicalPlan::TableScan(scan) => {
let filter_predicates = split_conjunction(&filter.predicate);
Expand Down Expand Up @@ -1289,12 +1370,12 @@ mod tests {
use async_trait::async_trait;

use datafusion_common::{DFSchemaRef, ScalarValue};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::expr::{ScalarFunction, WindowFunction};
use datafusion_expr::logical_plan::table_scan;
use datafusion_expr::{
col, in_list, in_subquery, lit, ColumnarValue, Extension, LogicalPlanBuilder,
ScalarUDF, ScalarUDFImpl, Signature, TableSource, TableType,
UserDefinedLogicalNodeCore, Volatility,
col, in_list, in_subquery, lit, ColumnarValue, ExprFunctionExt, Extension,
LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, TableSource, TableType,
UserDefinedLogicalNodeCore, Volatility, WindowFunctionDefinition,
};

use crate::optimizer::Optimizer;
Expand Down Expand Up @@ -1442,6 +1523,227 @@ mod tests {
assert_optimized_plan_eq(plan, expected)
}

/// verifies that when partitioning by 'a' and 'b', and filtering by 'b', 'b' is pushed
#[test]
fn filter_move_window() -> Result<()> {
let table_scan = test_table_scan()?;

let window = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("a"), col("b")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window])?
.filter(col("b").gt(lit(10i64)))?
.build()?;

let expected = "\
WindowAggr: windowExpr=[[rank() PARTITION BY [test.a, test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test, full_filters=[test.b > Int64(10)]";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that when partitioning by 'a' and 'b', and filtering by 'a' and 'b', both 'a' and
/// 'b' are pushed
#[test]
fn filter_move_complex_window() -> Result<()> {
let table_scan = test_table_scan()?;

let window = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("a"), col("b")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window])?
.filter(and(col("a").gt(lit(10i64)), col("b").eq(lit(1i64))))?
.build()?;

let expected = "\
WindowAggr: windowExpr=[[rank() PARTITION BY [test.a, test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test, full_filters=[test.a > Int64(10), test.b = Int64(1)]";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that when partitioning by 'a' and filtering by 'a' and 'b', only 'a' is pushed
#[test]
fn filter_move_partial_window() -> Result<()> {
let table_scan = test_table_scan()?;

let window = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("a")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window])?
.filter(and(col("a").gt(lit(10i64)), col("b").eq(lit(1i64))))?
.build()?;

let expected = "\
Filter: test.b = Int64(1)\
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test, full_filters=[test.a > Int64(10)]";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that filters on partition expressions are not pushed, as the single expression
/// column is not available to the user, unlike with aggregations
#[test]
fn filter_expression_keep_window() -> Result<()> {
let table_scan = test_table_scan()?;

let window = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![add(col("a"), col("b"))]) // PARTITION BY a + b
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window])?
// unlike with aggregations, single partition column "test.a + test.b" is not available
// to the plan, so we use multiple columns when filtering
.filter(add(col("a"), col("b")).gt(lit(10i64)))?
.build()?;

let expected = "\
Filter: test.a + test.b > Int64(10)\
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a + test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that filters are not pushed on order by columns (that are not used in partitioning)
#[test]
fn filter_order_keep_window() -> Result<()> {
let table_scan = test_table_scan()?;

let window = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("a")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window])?
.filter(col("c").gt(lit(10i64)))?
.build()?;

let expected = "\
Filter: test.c > Int64(10)\
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that when we use multiple window functions with a common partition key, the filter
/// on that key is pushed
#[test]
fn filter_multiple_windows_common_partitions() -> Result<()> {
let table_scan = test_table_scan()?;

let window1 = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("a")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let window2 = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("b"), col("a")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window1, window2])?
.filter(col("a").gt(lit(10i64)))? // a appears in both window functions
.build()?;

let expected = "\
WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, rank() PARTITION BY [test.b, test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test, full_filters=[test.a > Int64(10)]";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that when we use multiple window functions with different partitions keys, the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am pretty sure the rationale for not pushing in this case is that the filtered rows may contribute / affect the values of other partitions

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if we try to push the filter of a column that is not used as a partition key in all window functions, it could produce a wrong result set. On the other hand, if the key is used as a partition key in all window functions, then we can safely push it.

/// filter cannot be pushed
#[test]
fn filter_multiple_windows_disjoint_partitions() -> Result<()> {
let table_scan = test_table_scan()?;

let window1 = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("a")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let window2 = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("b"), col("a")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window1, window2])?
.filter(col("b").gt(lit(10i64)))? // b only appears in one window function
.build()?;

let expected = "\
Filter: test.b > Int64(10)\
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, rank() PARTITION BY [test.b, test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that a filter is pushed to before a projection, the filter expression is correctly re-written
#[test]
fn alias() -> Result<()> {
Expand Down
Loading
Loading