Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ arrow = { workspace = true }
chrono = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
datafusion-functions-window = { workspace = true }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is something we have been trying to avoid -- explicit dependencies on functions in the optimizer (this allows faster compilation times)

Since the package is only used in tests, can you please move it into the dev-dependencies section below ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

datafusion-physical-expr = { workspace = true }
indexmap = { workspace = true }
itertools = { workspace = true }
Expand Down
300 changes: 296 additions & 4 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,77 @@ impl OptimizerRule for PushDownFilter {
}
})
}
LogicalPlan::Window(window) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend adding an example here for better understandability, the rule applied here is not straightforward I think

Something like

Before:
Filter: (a > 1) and (b > 1) and (c > 1)
-- Window: func() PARTITION BY [a] ...
                    func() PARTITION BY [a, b] ...

After:
Filter: (b > 1) and (c > 1)
-- Window: func() PARTITION BY [a] ...
                    func() PARTITION BY [a, b] ...
---- Filter (a > 1)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@2010YOUY01 You mean as a comment above the new match arm?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// Retrieve the set of potential partition keys where we can push filters by.
// Unlike aggregations, where there is only one statement per SELECT, there can be
// multiple window functions, each with potentially different partition keys.
// Therefore, we need to ensure that any potential partition key returned is used in
// ALL window functions. Otherwise, filters cannot be pushed by through that column.
let potential_partition_keys = window
.window_expr
.iter()
.map(|e| {
if let Expr::WindowFunction(window_expression) = e {
window_expression
.partition_by
.iter()
.map(|c| {
Column::from_qualified_name(
c.schema_name().to_string(),
)
})
.collect::<HashSet<_>>()
} else {
// window functions expressions are only Expr::WindowFunction
unreachable!()
}
})
// performs the set intersection of the partition keys of all window functions,
// returning only the common ones
.reduce(|a, b| &a & &b)
.unwrap_or_default();

let predicates = split_conjunction_owned(filter.predicate);
let mut keep_predicates = vec![];
let mut push_predicates = vec![];
for expr in predicates {
let cols = expr.column_refs();
if cols.iter().all(|c| potential_partition_keys.contains(c)) {
push_predicates.push(expr);
} else {
keep_predicates.push(expr);
}
}

// Unlike with aggregations, there are no cases where we have to replace, e.g.,
// `a+b` with Column(a)+Column(b). This is because partition expressions are not
// available as standalone columns to the user. For example, while an aggregation on
// `a+b` becomes Column(a + b), in a window partition it becomes
// `func() PARTITION BY [a + b] ...`. Thus, filters on expressions always remain in
// place, so we can use `push_predicates` directly. This is consistent with other
// optimizers, such as the one used by Postgres.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand this comment block correctly:

Filter: a+b > 0
-- Window: func() PARTITION BY [a+b] ...

Filter in theory can be pushed down in this plan, but we are not able to implement it due to engineering issue

(I think it's related to your concern @comphead )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right. Unlike with aggregations, the partition column a+b is not projected.
Interestingly enough, Postgres also does not push filters in these cases, so it must follow a similar approach.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I filed a PR to try and document the output schema more clearly:


let window_input = Arc::clone(&window.input);
Transformed::yes(LogicalPlan::Window(window))
.transform_data(|new_plan| {
// If we have a filter to push, we push it down to the input of the window
if let Some(predicate) = conjunction(push_predicates) {
let new_filter = make_filter(predicate, window_input)?;
insert_below(new_plan, new_filter)
} else {
Ok(Transformed::no(new_plan))
}
})?
.map_data(|child_plan| {
// if there are any remaining predicates we can't push, add them
// back as a filter
if let Some(predicate) = conjunction(keep_predicates) {
make_filter(predicate, Arc::new(child_plan))
} else {
Ok(child_plan)
}
})
}
LogicalPlan::Join(join) => push_down_join(join, Some(&filter.predicate)),
LogicalPlan::TableScan(scan) => {
let filter_predicates = split_conjunction(&filter.predicate);
Expand Down Expand Up @@ -1289,12 +1360,12 @@ mod tests {
use async_trait::async_trait;

use datafusion_common::{DFSchemaRef, ScalarValue};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::expr::{ScalarFunction, WindowFunction};
use datafusion_expr::logical_plan::table_scan;
use datafusion_expr::{
col, in_list, in_subquery, lit, ColumnarValue, Extension, LogicalPlanBuilder,
ScalarUDF, ScalarUDFImpl, Signature, TableSource, TableType,
UserDefinedLogicalNodeCore, Volatility,
col, in_list, in_subquery, lit, ColumnarValue, ExprFunctionExt, Extension,
LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, TableSource, TableType,
UserDefinedLogicalNodeCore, Volatility, WindowFunctionDefinition,
};

use crate::optimizer::Optimizer;
Expand Down Expand Up @@ -1442,6 +1513,227 @@ mod tests {
assert_optimized_plan_eq(plan, expected)
}

/// verifies that when partitioning by 'a' and 'b', and filtering by 'b', 'b' is pushed
#[test]
fn filter_move_window() -> Result<()> {
let table_scan = test_table_scan()?;

let window = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("a"), col("b")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window])?
.filter(col("b").gt(lit(10i64)))?
.build()?;

let expected = "\
WindowAggr: windowExpr=[[rank() PARTITION BY [test.a, test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test, full_filters=[test.b > Int64(10)]";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that when partitioning by 'a' and 'b', and filtering by 'a' and 'b', both 'a' and
/// 'b' are pushed
#[test]
fn filter_move_complex_window() -> Result<()> {
let table_scan = test_table_scan()?;

let window = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("a"), col("b")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window])?
.filter(and(col("a").gt(lit(10i64)), col("b").eq(lit(1i64))))?
.build()?;

let expected = "\
WindowAggr: windowExpr=[[rank() PARTITION BY [test.a, test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test, full_filters=[test.a > Int64(10), test.b = Int64(1)]";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that when partitioning by 'a' and filtering by 'a' and 'b', only 'a' is pushed
#[test]
fn filter_move_partial_window() -> Result<()> {
let table_scan = test_table_scan()?;

let window = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("a")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window])?
.filter(and(col("a").gt(lit(10i64)), col("b").eq(lit(1i64))))?
.build()?;

let expected = "\
Filter: test.b = Int64(1)\
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test, full_filters=[test.a > Int64(10)]";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that filters on partition expressions are not pushed, as the single expression
/// column is not available to the user, unlike with aggregations
#[test]
fn filter_expression_keep_window() -> Result<()> {
let table_scan = test_table_scan()?;

let window = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![add(col("a"), col("b"))]) // PARTITION BY a + b
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window])?
// unlike with aggregations, single partition column "test.a + test.b" is not available
// to the plan, so we use multiple columns when filtering
.filter(add(col("a"), col("b")).gt(lit(10i64)))?
.build()?;

let expected = "\
Filter: test.a + test.b > Int64(10)\
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a + test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that filters are not pushed on order by columns (that are not used in partitioning)
#[test]
fn filter_order_keep_window() -> Result<()> {
let table_scan = test_table_scan()?;

let window = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("a")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window])?
.filter(col("c").gt(lit(10i64)))?
.build()?;

let expected = "\
Filter: test.c > Int64(10)\
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that when we use multiple window functions with a common partition key, the filter
/// on that key is pushed
#[test]
fn filter_multiple_windows_common_partitions() -> Result<()> {
let table_scan = test_table_scan()?;

let window1 = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("a")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let window2 = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("b"), col("a")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window1, window2])?
.filter(col("a").gt(lit(10i64)))? // a appears in both window functions
.build()?;

let expected = "\
WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, rank() PARTITION BY [test.b, test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test, full_filters=[test.a > Int64(10)]";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that when we use multiple window functions with different partitions keys, the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am pretty sure the rationale for not pushing in this case is that the filtered rows may contribute / affect the values of other partitions

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if we try to push the filter of a column that is not used as a partition key in all window functions, it could produce a wrong result set. On the other hand, if the key is used as a partition key in all window functions, then we can safely push it.

/// filter cannot be pushed
#[test]
fn filter_multiple_windows_disjoint_partitions() -> Result<()> {
let table_scan = test_table_scan()?;

let window1 = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("a")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let window2 = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(
datafusion_functions_window::rank::rank_udwf(),
),
vec![],
))
.partition_by(vec![col("b"), col("a")])
.order_by(vec![col("c").sort(true, true)])
.build()
.unwrap();

let plan = LogicalPlanBuilder::from(table_scan)
.window(vec![window1, window2])?
.filter(col("b").gt(lit(10i64)))? // b only appears in one window function
.build()?;

let expected = "\
Filter: test.b > Int64(10)\
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, rank() PARTITION BY [test.b, test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: test";
assert_optimized_plan_eq(plan, expected)
}

/// verifies that a filter is pushed to before a projection, the filter expression is correctly re-written
#[test]
fn alias() -> Result<()> {
Expand Down
Loading
Loading