Skip to content

Commit c16b8b7

Browse files
nuno-faria authored and masonh22 committed
feat(optimizer): Enable filter pushdown on window functions (apache#14026)
* feat(optimizer): Enable filter pushdown on window functions
  Ensures selections can be pushed past window functions similarly to what is already done with aggregations, when possible.
* fix: Add missing dependency
* minor(optimizer): Use 'datafusion-functions-window' as a dev dependency
* docs(optimizer): Add example to filter pushdown on LogicalPlan::Window

(cherry picked from commit ad5a04f)
1 parent aec71cf commit c16b8b7

File tree

3 files changed

+681
-4
lines changed

3 files changed

+681
-4
lines changed

datafusion/optimizer/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ regex-syntax = "0.8.0"
5353
arrow-buffer = { workspace = true }
5454
ctor = { workspace = true }
5555
datafusion-functions-aggregate = { workspace = true }
56+
datafusion-functions-window = { workspace = true }
5657
datafusion-functions-window-common = { workspace = true }
5758
datafusion-sql = { workspace = true }
5859
env_logger = { workspace = true }

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 306 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -862,6 +862,87 @@ impl OptimizerRule for PushDownFilter {
862862
}
863863
})
864864
}
865+
// Tries to push filters based on the partition key(s) of the window function(s) used.
866+
// Example:
867+
// Before:
868+
// Filter: (a > 1) and (b > 1) and (c > 1)
869+
// Window: func() PARTITION BY [a] ...
870+
// ---
871+
// After:
872+
// Filter: (b > 1) and (c > 1)
873+
// Window: func() PARTITION BY [a] ...
874+
// Filter: (a > 1)
875+
LogicalPlan::Window(window) => {
876+
// Retrieve the set of potential partition keys where we can push filters by.
877+
// Unlike aggregations, where there is only one statement per SELECT, there can be
878+
// multiple window functions, each with potentially different partition keys.
879+
// Therefore, we need to ensure that any potential partition key returned is used in
880+
// ALL window functions. Otherwise, filters cannot be pushed down through that column.
881+
let potential_partition_keys = window
882+
.window_expr
883+
.iter()
884+
.map(|e| {
885+
if let Expr::WindowFunction(window_expression) = e {
886+
window_expression
887+
.partition_by
888+
.iter()
889+
.map(|c| {
890+
Column::from_qualified_name(
891+
c.schema_name().to_string(),
892+
)
893+
})
894+
.collect::<HashSet<_>>()
895+
} else {
896+
// window functions expressions are only Expr::WindowFunction
897+
unreachable!()
898+
}
899+
})
900+
// performs the set intersection of the partition keys of all window functions,
901+
// returning only the common ones
902+
.reduce(|a, b| &a & &b)
903+
.unwrap_or_default();
904+
905+
let predicates = split_conjunction_owned(filter.predicate);
906+
let mut keep_predicates = vec![];
907+
let mut push_predicates = vec![];
908+
for expr in predicates {
909+
let cols = expr.column_refs();
910+
if cols.iter().all(|c| potential_partition_keys.contains(c)) {
911+
push_predicates.push(expr);
912+
} else {
913+
keep_predicates.push(expr);
914+
}
915+
}
916+
917+
// Unlike with aggregations, there are no cases where we have to replace, e.g.,
918+
// `a+b` with Column(a)+Column(b). This is because partition expressions are not
919+
// available as standalone columns to the user. For example, while an aggregation on
920+
// `a+b` becomes Column(a + b), in a window partition it becomes
921+
// `func() PARTITION BY [a + b] ...`. Thus, filters on expressions always remain in
922+
// place, so we can use `push_predicates` directly. This is consistent with other
923+
// optimizers, such as the one used by Postgres.
924+
925+
let window_input = Arc::clone(&window.input);
926+
Transformed::yes(LogicalPlan::Window(window))
927+
.transform_data(|new_plan| {
928+
// If we have a filter to push, we push it down to the input of the window
929+
if let Some(predicate) = conjunction(push_predicates) {
930+
let new_filter = make_filter(predicate, window_input)?;
931+
insert_below(new_plan, new_filter)
932+
} else {
933+
Ok(Transformed::no(new_plan))
934+
}
935+
})?
936+
.map_data(|child_plan| {
937+
// if there are any remaining predicates we can't push, add them
938+
// back as a filter
939+
if let Some(predicate) = conjunction(keep_predicates) {
940+
make_filter(predicate, Arc::new(child_plan))
941+
} else {
942+
Ok(child_plan)
943+
}
944+
})
945+
}
865946
LogicalPlan::Join(join) => push_down_join(join, Some(&filter.predicate)),
866947
LogicalPlan::TableScan(scan) => {
867948
let filter_predicates = split_conjunction(&filter.predicate);
@@ -1152,12 +1233,12 @@ mod tests {
11521233
use async_trait::async_trait;
11531234

11541235
use datafusion_common::{DFSchemaRef, ScalarValue};
1155-
use datafusion_expr::expr::ScalarFunction;
1236+
use datafusion_expr::expr::{ScalarFunction, WindowFunction};
11561237
use datafusion_expr::logical_plan::table_scan;
11571238
use datafusion_expr::{
1158-
col, in_list, in_subquery, lit, ColumnarValue, Extension, LogicalPlanBuilder,
1159-
ScalarUDF, ScalarUDFImpl, Signature, TableSource, TableType,
1160-
UserDefinedLogicalNodeCore, Volatility,
1239+
col, in_list, in_subquery, lit, ColumnarValue, ExprFunctionExt, Extension,
1240+
LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, TableSource, TableType,
1241+
UserDefinedLogicalNodeCore, Volatility, WindowFunctionDefinition,
11611242
};
11621243

11631244
use crate::optimizer::Optimizer;
@@ -1305,6 +1386,227 @@ mod tests {
13051386
assert_optimized_plan_eq(plan, expected)
13061387
}
13071388

1389+
/// Verifies that a filter on a partition key is pushed below the window.
///
/// The window is partitioned by `a` and `b` and the filter only references `b`, so the
/// expected plan contains no `Filter` node above `WindowAggr` and the predicate appears in the
/// scan's `full_filters`.
1390+
#[test]
1391+
fn filter_move_window() -> Result<()> {
1392+
let table_scan = test_table_scan()?;
1393+
1394+
let window = Expr::WindowFunction(WindowFunction::new(
1395+
WindowFunctionDefinition::WindowUDF(
1396+
datafusion_functions_window::rank::rank_udwf(),
1397+
),
1398+
vec![],
1399+
))
1400+
.partition_by(vec![col("a"), col("b")])
1401+
.order_by(vec![col("c").sort(true, true)])
1402+
.build()
1403+
.unwrap();
1404+
1405+
let plan = LogicalPlanBuilder::from(table_scan)
1406+
.window(vec![window])?
1407+
.filter(col("b").gt(lit(10i64)))?
1408+
.build()?;
1409+
1410+
let expected = "\
1411+
WindowAggr: windowExpr=[[rank() PARTITION BY [test.a, test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
1412+
\n TableScan: test, full_filters=[test.b > Int64(10)]";
1413+
assert_optimized_plan_eq(plan, expected)
1414+
}
1415+
1416+
/// Verifies that every conjunct is pushed when all of them reference only partition keys.
1417+
/// The window is partitioned by `a` and `b`; both `a > 10` and `b = 1` end up in the scan's
/// `full_filters` and no `Filter` node remains above `WindowAggr`.
1418+
#[test]
1419+
fn filter_move_complex_window() -> Result<()> {
1420+
let table_scan = test_table_scan()?;
1421+
1422+
let window = Expr::WindowFunction(WindowFunction::new(
1423+
WindowFunctionDefinition::WindowUDF(
1424+
datafusion_functions_window::rank::rank_udwf(),
1425+
),
1426+
vec![],
1427+
))
1428+
.partition_by(vec![col("a"), col("b")])
1429+
.order_by(vec![col("c").sort(true, true)])
1430+
.build()
1431+
.unwrap();
1432+
1433+
let plan = LogicalPlanBuilder::from(table_scan)
1434+
.window(vec![window])?
1435+
.filter(and(col("a").gt(lit(10i64)), col("b").eq(lit(1i64))))?
1436+
.build()?;
1437+
1438+
let expected = "\
1439+
WindowAggr: windowExpr=[[rank() PARTITION BY [test.a, test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
1440+
\n TableScan: test, full_filters=[test.a > Int64(10), test.b = Int64(1)]";
1441+
assert_optimized_plan_eq(plan, expected)
1442+
}
1443+
1444+
/// Verifies that a conjunction is split so that only the partition-key conjunct is pushed.
///
/// The window is partitioned by `a` alone: `a > 10` moves into the scan's `full_filters` while
/// `b = 1` stays in a `Filter` above `WindowAggr`.
1445+
#[test]
1446+
fn filter_move_partial_window() -> Result<()> {
1447+
let table_scan = test_table_scan()?;
1448+
1449+
let window = Expr::WindowFunction(WindowFunction::new(
1450+
WindowFunctionDefinition::WindowUDF(
1451+
datafusion_functions_window::rank::rank_udwf(),
1452+
),
1453+
vec![],
1454+
))
1455+
.partition_by(vec![col("a")])
1456+
.order_by(vec![col("c").sort(true, true)])
1457+
.build()
1458+
.unwrap();
1459+
1460+
let plan = LogicalPlanBuilder::from(table_scan)
1461+
.window(vec![window])?
1462+
.filter(and(col("a").gt(lit(10i64)), col("b").eq(lit(1i64))))?
1463+
.build()?;
1464+
1465+
let expected = "\
1466+
Filter: test.b = Int64(1)\
1467+
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
1468+
\n TableScan: test, full_filters=[test.a > Int64(10)]";
1469+
assert_optimized_plan_eq(plan, expected)
1470+
}
1471+
1472+
/// Verifies that a filter on a partition *expression* (`a + b`) is not pushed: the expected
/// plan keeps the `Filter` above `WindowAggr` and the scan has no filters.
1473+
/// Unlike with aggregations, the partition expression is not available to the user as a single
/// column, so the filter has to reference `a` and `b` separately.
1474+
#[test]
1475+
fn filter_expression_keep_window() -> Result<()> {
1476+
let table_scan = test_table_scan()?;
1477+
1478+
let window = Expr::WindowFunction(WindowFunction::new(
1479+
WindowFunctionDefinition::WindowUDF(
1480+
datafusion_functions_window::rank::rank_udwf(),
1481+
),
1482+
vec![],
1483+
))
1484+
.partition_by(vec![add(col("a"), col("b"))]) // PARTITION BY a + b
1485+
.order_by(vec![col("c").sort(true, true)])
1486+
.build()
1487+
.unwrap();
1488+
1489+
let plan = LogicalPlanBuilder::from(table_scan)
1490+
.window(vec![window])?
1491+
// unlike with aggregations, single partition column "test.a + test.b" is not available
1492+
// to the plan, so we use multiple columns when filtering
1493+
.filter(add(col("a"), col("b")).gt(lit(10i64)))?
1494+
.build()?;
1495+
1496+
let expected = "\
1497+
Filter: test.a + test.b > Int64(10)\
1498+
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a + test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
1499+
\n TableScan: test";
1500+
assert_optimized_plan_eq(plan, expected)
1501+
}
1502+
1503+
/// Verifies that a filter on an ORDER BY column that is not a partition key is not pushed.
///
/// The window is partitioned by `a` and ordered by `c`; the filter on `c` stays in a `Filter`
/// above `WindowAggr`.
1504+
#[test]
1505+
fn filter_order_keep_window() -> Result<()> {
1506+
let table_scan = test_table_scan()?;
1507+
1508+
let window = Expr::WindowFunction(WindowFunction::new(
1509+
WindowFunctionDefinition::WindowUDF(
1510+
datafusion_functions_window::rank::rank_udwf(),
1511+
),
1512+
vec![],
1513+
))
1514+
.partition_by(vec![col("a")])
1515+
.order_by(vec![col("c").sort(true, true)])
1516+
.build()
1517+
.unwrap();
1518+
1519+
let plan = LogicalPlanBuilder::from(table_scan)
1520+
.window(vec![window])?
1521+
.filter(col("c").gt(lit(10i64)))?
1522+
.build()?;
1523+
1524+
let expected = "\
1525+
Filter: test.c > Int64(10)\
1526+
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
1527+
\n TableScan: test";
1528+
assert_optimized_plan_eq(plan, expected)
1529+
}
1530+
1531+
/// Verifies that a filter is pushed when its column is a partition key of every window function.
1532+
/// The windows are partitioned by `[a]` and `[b, a]`; the filter on `a` ends up in the scan's
/// `full_filters`.
1533+
#[test]
1534+
fn filter_multiple_windows_common_partitions() -> Result<()> {
1535+
let table_scan = test_table_scan()?;
1536+
1537+
let window1 = Expr::WindowFunction(WindowFunction::new(
1538+
WindowFunctionDefinition::WindowUDF(
1539+
datafusion_functions_window::rank::rank_udwf(),
1540+
),
1541+
vec![],
1542+
))
1543+
.partition_by(vec![col("a")])
1544+
.order_by(vec![col("c").sort(true, true)])
1545+
.build()
1546+
.unwrap();
1547+
1548+
let window2 = Expr::WindowFunction(WindowFunction::new(
1549+
WindowFunctionDefinition::WindowUDF(
1550+
datafusion_functions_window::rank::rank_udwf(),
1551+
),
1552+
vec![],
1553+
))
1554+
.partition_by(vec![col("b"), col("a")])
1555+
.order_by(vec![col("c").sort(true, true)])
1556+
.build()
1557+
.unwrap();
1558+
1559+
let plan = LogicalPlanBuilder::from(table_scan)
1560+
.window(vec![window1, window2])?
1561+
.filter(col("a").gt(lit(10i64)))? // a appears in both window functions
1562+
.build()?;
1563+
1564+
let expected = "\
1565+
WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, rank() PARTITION BY [test.b, test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
1566+
\n TableScan: test, full_filters=[test.a > Int64(10)]";
1567+
assert_optimized_plan_eq(plan, expected)
1568+
}
1569+
1570+
/// Verifies that a filter is not pushed when its column is a partition key of only some of the
/// window functions.
1571+
/// The windows are partitioned by `[a]` and `[b, a]`; the filter on `b` stays in a `Filter`
/// above `WindowAggr`.
1572+
#[test]
1573+
fn filter_multiple_windows_disjoint_partitions() -> Result<()> {
1574+
let table_scan = test_table_scan()?;
1575+
1576+
let window1 = Expr::WindowFunction(WindowFunction::new(
1577+
WindowFunctionDefinition::WindowUDF(
1578+
datafusion_functions_window::rank::rank_udwf(),
1579+
),
1580+
vec![],
1581+
))
1582+
.partition_by(vec![col("a")])
1583+
.order_by(vec![col("c").sort(true, true)])
1584+
.build()
1585+
.unwrap();
1586+
1587+
let window2 = Expr::WindowFunction(WindowFunction::new(
1588+
WindowFunctionDefinition::WindowUDF(
1589+
datafusion_functions_window::rank::rank_udwf(),
1590+
),
1591+
vec![],
1592+
))
1593+
.partition_by(vec![col("b"), col("a")])
1594+
.order_by(vec![col("c").sort(true, true)])
1595+
.build()
1596+
.unwrap();
1597+
1598+
let plan = LogicalPlanBuilder::from(table_scan)
1599+
.window(vec![window1, window2])?
1600+
.filter(col("b").gt(lit(10i64)))? // b only appears in one window function
1601+
.build()?;
1602+
1603+
let expected = "\
1604+
Filter: test.b > Int64(10)\
1605+
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, rank() PARTITION BY [test.b, test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
1606+
\n TableScan: test";
1607+
assert_optimized_plan_eq(plan, expected)
1608+
}
1609+
13081610
/// verifies that a filter is pushed to before a projection, the filter expression is correctly re-written
13091611
#[test]
13101612
fn alias() -> Result<()> {

0 commit comments

Comments
 (0)