Skip to content

Commit 95df9ee

Browse files
Add additional tests for filter pushdown apis (#15955)
* Add additional tests for filter pushdown apis * rename the testing module * move TestNode to util * fmt --------- Co-authored-by: berkaysynnada <[email protected]>
1 parent 555fc2e commit 95df9ee

File tree

5 files changed

+922
-546
lines changed

5 files changed

+922
-546
lines changed
Lines changed: 378 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,378 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::{Arc, LazyLock};
19+
20+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
21+
use datafusion::{
22+
logical_expr::Operator,
23+
physical_plan::{
24+
expressions::{BinaryExpr, Column, Literal},
25+
PhysicalExpr,
26+
},
27+
scalar::ScalarValue,
28+
};
29+
use datafusion_common::config::ConfigOptions;
30+
use datafusion_functions_aggregate::count::count_udaf;
31+
use datafusion_physical_expr::expressions::col;
32+
use datafusion_physical_expr::{aggregate::AggregateExprBuilder, Partitioning};
33+
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
34+
use datafusion_physical_plan::{
35+
aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
36+
coalesce_batches::CoalesceBatchesExec,
37+
filter::FilterExec,
38+
repartition::RepartitionExec,
39+
};
40+
41+
use util::{OptimizationTest, TestNode, TestScanBuilder};
42+
43+
mod util;
44+
45+
/// A single `FilterExec` directly above a scan built with
/// `with_support(true)` is absorbed into the scan as its predicate.
#[test]
fn test_pushdown_into_scan() {
    let source = TestScanBuilder::new(schema()).with_support(true).build();
    let filter_expr = col_lit_predicate("a", "foo", &schema());
    let filtered = Arc::new(FilterExec::try_new(filter_expr, source).unwrap());
    let optimization = OptimizationTest::new(filtered, FilterPushdown {}, true);

    // In the expected output the FilterExec is gone and the DataSourceExec
    // carries `predicate=a@0 = foo`.
    insta::assert_snapshot!(
        optimization,
        @r"
OptimizationTest:
input:
- FilterExec: a@0 = foo
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
output:
Ok:
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
"
    );
}
65+
66+
/// Show that we can use config options to determine how to do pushdown.
///
/// The same plan is optimized twice. The flag handed to
/// `OptimizationTest::new` is read from
/// `cfg.execution.parquet.pushdown_filters`, so the two snapshots document
/// the plan produced with that option off and then on.
///
/// NOTE(review): the third argument of `OptimizationTest::new` is assumed to
/// be the "allow filter pushdown" switch; it is defined in `util`, which is
/// not visible here — confirm.
#[test]
fn test_pushdown_into_scan_with_config_options() {
    let scan = TestScanBuilder::new(schema()).with_support(true).build();
    let predicate = col_lit_predicate("a", "foo", &schema());
    let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap()) as _;

    let mut cfg = ConfigOptions::default();

    // Set the option explicitly so the first case does not depend on whatever
    // the library default happens to be.
    cfg.execution.parquet.pushdown_filters = false;
    insta::assert_snapshot!(
        OptimizationTest::new(
            Arc::clone(&plan),
            FilterPushdown {},
            cfg.execution.parquet.pushdown_filters
        ),
        @r"
OptimizationTest:
input:
- FilterExec: a@0 = foo
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
output:
Ok:
- FilterExec: a@0 = foo
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
"
    );

    // Flip the option: the FilterExec now disappears and the scan carries the
    // predicate.
    cfg.execution.parquet.pushdown_filters = true;
    insta::assert_snapshot!(
        OptimizationTest::new(
            plan,
            FilterPushdown {},
            cfg.execution.parquet.pushdown_filters
        ),
        @r"
OptimizationTest:
input:
- FilterExec: a@0 = foo
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
output:
Ok:
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
"
    );
}
110+
111+
/// Two stacked `FilterExec`s collapse into one conjunction that is attached
/// to the scan.
#[test]
fn test_filter_collapse() {
    let source = TestScanBuilder::new(schema()).with_support(true).build();

    // Inner filter on `a`, outer filter on `b`.
    let inner = Arc::new(
        FilterExec::try_new(col_lit_predicate("a", "foo", &schema()), source).unwrap(),
    );
    let outer = Arc::new(
        FilterExec::try_new(col_lit_predicate("b", "bar", &schema()), inner).unwrap(),
    );
    let optimization = OptimizationTest::new(outer, FilterPushdown {}, true);

    // Both predicates show up on the DataSourceExec joined with AND.
    insta::assert_snapshot!(
        optimization,
        @r"
OptimizationTest:
input:
- FilterExec: b@1 = bar
- FilterExec: a@0 = foo
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
output:
Ok:
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar
"
    );
}
134+
135+
/// A `FilterExec` that carries its own projection is replaced by a
/// `ProjectionExec` once its predicate has moved into the scan.
#[test]
fn test_filter_with_projection() {
    let source = TestScanBuilder::new(schema()).with_support(true).build();

    // Case 1: the projection reorders the columns and keeps the filtered one.
    let reordered = Arc::new(
        FilterExec::try_new(
            col_lit_predicate("a", "foo", &schema()),
            Arc::clone(&source),
        )
        .unwrap()
        .with_projection(Some(vec![1, 0]))
        .unwrap(),
    );
    insta::assert_snapshot!(
        OptimizationTest::new(reordered, FilterPushdown {}, true),
        @r"
OptimizationTest:
input:
- FilterExec: a@0 = foo, projection=[b@1, a@0]
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
output:
Ok:
- ProjectionExec: expr=[b@1 as b, a@0 as a]
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
",
    );

    // Case 2: the filtered column `a` is not part of the projected output.
    let narrowed = Arc::new(
        FilterExec::try_new(col_lit_predicate("a", "foo", &schema()), source)
            .unwrap()
            .with_projection(Some(vec![1]))
            .unwrap(),
    );
    insta::assert_snapshot!(
        OptimizationTest::new(narrowed, FilterPushdown {}, true),
        @r"
OptimizationTest:
input:
- FilterExec: a@0 = foo, projection=[b@1]
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
output:
Ok:
- ProjectionExec: expr=[b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
"
    );
}
185+
186+
/// Filters separated by `CoalesceBatchesExec` and `RepartitionExec` are both
/// pushed through those operators and merged on the scan.
#[test]
fn test_push_down_through_transparent_nodes() {
    let source = TestScanBuilder::new(schema()).with_support(true).build();
    let batched = Arc::new(CoalesceBatchesExec::new(source, 1));

    let inner_filter = Arc::new(
        FilterExec::try_new(col_lit_predicate("a", "foo", &schema()), batched).unwrap(),
    );
    let repartitioned = Arc::new(
        RepartitionExec::try_new(inner_filter, Partitioning::RoundRobinBatch(1)).unwrap(),
    );
    let outer_filter = Arc::new(
        FilterExec::try_new(col_lit_predicate("b", "bar", &schema()), repartitioned)
            .unwrap(),
    );

    // Neither FilterExec survives; the scan ends up with both predicates.
    insta::assert_snapshot!(
        OptimizationTest::new(outer_filter, FilterPushdown {}, true),
        @r"
OptimizationTest:
input:
- FilterExec: b@1 = bar
- RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=1
- FilterExec: a@0 = foo
- CoalesceBatchesExec: target_batch_size=1
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
output:
Ok:
- RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=1
- CoalesceBatchesExec: target_batch_size=1
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar
"
    );
}
218+
219+
/// Filters are not pushed through an `AggregateExec`, but a filter below the
/// aggregate still reaches the scan.
#[test]
fn test_no_pushdown_through_aggregates() {
    // There are 2 important points here:
    // 1. The outer filter **is not** pushed down at all because we haven't implemented pushdown support
    //    yet for AggregateExec.
    // 2. The inner filter **is** pushed down into the DataSource.
    let scan = TestScanBuilder::new(schema()).with_support(true).build();

    let coalesce = Arc::new(CoalesceBatchesExec::new(scan, 10));

    let filter = Arc::new(
        FilterExec::try_new(col_lit_predicate("a", "foo", &schema()), coalesce).unwrap(),
    );

    // A single `count(a)` aggregate aliased as `cnt`, grouped by `a` and `b`.
    let aggregate_expr = vec![AggregateExprBuilder::new(
        count_udaf(),
        vec![col("a", &schema()).unwrap()],
    )
    .schema(schema())
    .alias("cnt")
    .build()
    .map(Arc::new)
    .unwrap()];
    let group_by = PhysicalGroupBy::new_single(vec![
        (col("a", &schema()).unwrap(), "a".to_string()),
        (col("b", &schema()).unwrap(), "b".to_string()),
    ]);
    let aggregate = Arc::new(
        AggregateExec::try_new(
            AggregateMode::Final,
            group_by,
            // Moved, not cloned: the vector is not used again after this call.
            aggregate_expr,
            vec![None],
            filter,
            schema(),
        )
        .unwrap(),
    );

    let coalesce = Arc::new(CoalesceBatchesExec::new(aggregate, 100));

    let predicate = col_lit_predicate("b", "bar", &schema());
    let plan = Arc::new(FilterExec::try_new(predicate, coalesce).unwrap());

    // Expect the outer `b = bar` FilterExec to stay above the AggregateExec,
    // while the inner `a = foo` predicate ends up on the DataSource.
    insta::assert_snapshot!(
        OptimizationTest::new(plan, FilterPushdown{}, true),
        @r"
OptimizationTest:
input:
- FilterExec: b@1 = bar
- CoalesceBatchesExec: target_batch_size=100
- AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([0])
- FilterExec: a@0 = foo
- CoalesceBatchesExec: target_batch_size=10
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
output:
Ok:
- FilterExec: b@1 = bar
- CoalesceBatchesExec: target_batch_size=100
- AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt]
- CoalesceBatchesExec: target_batch_size=10
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
"
    );
}
285+
286+
/// Exercises how an `ExecutionPlan` node reacts to the pushdown result of its
/// child, combined with a DataSource that does or does not accept filters.
#[test]
fn test_node_handles_child_pushdown_result() {
    // Case 1: `with_support(true)` + `inject_filter = true`.
    // The DataSource takes the filter, so no FilterExec appears.
    let accepting_scan = TestScanBuilder::new(schema()).with_support(true).build();
    let node = Arc::new(TestNode::new(
        true,
        Arc::clone(&accepting_scan),
        col_lit_predicate("a", "foo", &schema()),
    ));
    insta::assert_snapshot!(
        OptimizationTest::new(node, FilterPushdown {}, true),
        @r"
OptimizationTest:
input:
- TestInsertExec { inject_filter: true }
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
output:
Ok:
- TestInsertExec { inject_filter: true }
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
",
    );

    // Case 2: `with_support(false)` + `inject_filter = true`.
    // The DataSource rejects the filter, so a FilterExec is inserted instead.
    let rejecting_scan = TestScanBuilder::new(schema()).with_support(false).build();
    let node = Arc::new(TestNode::new(
        true,
        Arc::clone(&rejecting_scan),
        col_lit_predicate("a", "foo", &schema()),
    ));
    insta::assert_snapshot!(
        OptimizationTest::new(node, FilterPushdown {}, true),
        @r"
OptimizationTest:
input:
- TestInsertExec { inject_filter: true }
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
output:
Ok:
- TestInsertExec { inject_filter: false }
- FilterExec: a@0 = foo
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
",
    );

    // Case 3: `with_support(false)` + `inject_filter = false`.
    // Nothing is pushed down and no FilterExec is created; the plan is unchanged.
    let rejecting_scan = TestScanBuilder::new(schema()).with_support(false).build();
    let node = Arc::new(TestNode::new(
        false,
        Arc::clone(&rejecting_scan),
        col_lit_predicate("a", "foo", &schema()),
    ));
    insta::assert_snapshot!(
        OptimizationTest::new(node, FilterPushdown {}, true),
        @r"
OptimizationTest:
input:
- TestInsertExec { inject_filter: false }
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
output:
Ok:
- TestInsertExec { inject_filter: false }
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
",
    );
}
348+
349+
/// Schema:
350+
/// a: String
351+
/// b: String
352+
/// c: f64
353+
static TEST_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
354+
let fields = vec![
355+
Field::new("a", DataType::Utf8, false),
356+
Field::new("b", DataType::Utf8, false),
357+
Field::new("c", DataType::Float64, false),
358+
];
359+
Arc::new(Schema::new(fields))
360+
});
361+
362+
fn schema() -> SchemaRef {
363+
Arc::clone(&TEST_SCHEMA)
364+
}
365+
366+
/// Returns a predicate that is a binary expression col = lit
367+
fn col_lit_predicate(
368+
column_name: &str,
369+
scalar_value: impl Into<ScalarValue>,
370+
schema: &Schema,
371+
) -> Arc<dyn PhysicalExpr> {
372+
let scalar_value = scalar_value.into();
373+
Arc::new(BinaryExpr::new(
374+
Arc::new(Column::new_with_schema(column_name, schema).unwrap()),
375+
Operator::Eq,
376+
Arc::new(Literal::new(scalar_value)),
377+
))
378+
}

0 commit comments

Comments
 (0)