Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
378 changes: 378 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,378 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::{Arc, LazyLock};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::{
logical_expr::Operator,
physical_plan::{
expressions::{BinaryExpr, Column, Literal},
PhysicalExpr,
},
scalar::ScalarValue,
};
use datafusion_common::config::ConfigOptions;
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{aggregate::AggregateExprBuilder, Partitioning};
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion_physical_plan::{
aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
coalesce_batches::CoalesceBatchesExec,
filter::FilterExec,
repartition::RepartitionExec,
};

use util::{OptimizationTest, TestNode, TestScanBuilder};

mod util;

/// Simplest case: one `FilterExec` directly above a scan that supports
/// pushdown — the filter node disappears and its predicate is absorbed
/// into the `DataSourceExec`.
#[test]
fn test_pushdown_into_scan() {
    let source = TestScanBuilder::new(schema()).with_support(true).build();
    let filter_expr = col_lit_predicate("a", "foo", &schema());
    let plan = Arc::new(FilterExec::try_new(filter_expr, source).unwrap());

    // NOTE(review): "test.paqruet" matches the (typo'd) fixture file name
    // produced by util::TestScanBuilder — fix there and re-snapshot together.
    insta::assert_snapshot!(
        OptimizationTest::new(plan, FilterPushdown {}, true),
        @r"
OptimizationTest:
input:
- FilterExec: a@0 = foo
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
output:
Ok:
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
"
    );
}

/// Show that we can use config options to determine how to do pushdown.
///
/// NOTE(review): the trailing `bool` passed to `OptimizationTest::new`
/// appears to toggle `execution.parquet.pushdown_filters` for the run —
/// confirm against `util::OptimizationTest`. A locally built
/// `ConfigOptions` that was mutated but never passed to anything was
/// removed as dead code.
#[test]
fn test_pushdown_into_scan_with_config_options() {
    let scan = TestScanBuilder::new(schema()).with_support(true).build();
    let predicate = col_lit_predicate("a", "foo", &schema());
    let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap()) as _;

    // With pushdown disabled the FilterExec must remain in the plan.
    insta::assert_snapshot!(
        OptimizationTest::new(
            Arc::clone(&plan),
            FilterPushdown {},
            false
        ),
        @r"
OptimizationTest:
input:
- FilterExec: a@0 = foo
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
output:
Ok:
- FilterExec: a@0 = foo
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
"
    );

    // With pushdown enabled the predicate is absorbed by the DataSourceExec.
    insta::assert_snapshot!(
        OptimizationTest::new(
            plan,
            FilterPushdown {},
            true
        ),
        @r"
OptimizationTest:
input:
- FilterExec: a@0 = foo
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
output:
Ok:
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
"
    );
}

/// Two stacked `FilterExec` nodes above a pushdown-capable scan: both
/// predicates are pushed down and AND-ed together in the `DataSourceExec`.
#[test]
fn test_filter_collapse() {
    let source = TestScanBuilder::new(schema()).with_support(true).build();
    // Inner filter: a = 'foo'.
    let inner = Arc::new(
        FilterExec::try_new(col_lit_predicate("a", "foo", &schema()), source).unwrap(),
    );
    // Outer filter: b = 'bar'.
    let plan = Arc::new(
        FilterExec::try_new(col_lit_predicate("b", "bar", &schema()), inner).unwrap(),
    );

    insta::assert_snapshot!(
        OptimizationTest::new(plan, FilterPushdown {}, true),
        @r"
OptimizationTest:
input:
- FilterExec: b@1 = bar
- FilterExec: a@0 = foo
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
output:
Ok:
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar
"
    );
}

/// `FilterExec` carrying an embedded output projection above a
/// pushdown-capable scan: the predicate is pushed into the scan and the
/// remaining projection survives as a `ProjectionExec`.
#[test]
fn test_filter_with_projection() {
    let scan = TestScanBuilder::new(schema()).with_support(true).build();
    // Reorder the output columns to [b, a].
    let projection = vec![1, 0];
    let predicate = col_lit_predicate("a", "foo", &schema());
    let plan = Arc::new(
        FilterExec::try_new(predicate, Arc::clone(&scan))
            .unwrap()
            .with_projection(Some(projection))
            .unwrap(),
    );

    // expect the predicate to be pushed down into the DataSource but the FilterExec to be converted to ProjectionExec
    insta::assert_snapshot!(
        OptimizationTest::new(plan, FilterPushdown{}, true),
        @r"
OptimizationTest:
input:
- FilterExec: a@0 = foo, projection=[b@1, a@0]
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
output:
Ok:
- ProjectionExec: expr=[b@1 as b, a@0 as a]
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
",
    );

    // add a test where the filter is on a column that isn't included in the output
    let projection = vec![1];
    let predicate = col_lit_predicate("a", "foo", &schema());
    let plan = Arc::new(
        FilterExec::try_new(predicate, scan)
            .unwrap()
            .with_projection(Some(projection))
            .unwrap(),
    );
    insta::assert_snapshot!(
        OptimizationTest::new(plan, FilterPushdown{},true),
        @r"
OptimizationTest:
input:
- FilterExec: a@0 = foo, projection=[b@1]
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
output:
Ok:
- ProjectionExec: expr=[b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
"
    );
}

/// Filters should be pushed through nodes that don't affect which rows
/// pass — the snapshot shows both `CoalesceBatchesExec` and
/// `RepartitionExec` let both predicates reach the `DataSourceExec`.
#[test]
fn test_push_down_through_transparent_nodes() {
    // expect the predicate to be pushed down into the DataSource
    let scan = TestScanBuilder::new(schema()).with_support(true).build();
    let coalesce = Arc::new(CoalesceBatchesExec::new(scan, 1));
    // Filter below the repartition: a = 'foo'.
    let predicate = col_lit_predicate("a", "foo", &schema());
    let filter = Arc::new(FilterExec::try_new(predicate, coalesce).unwrap());
    let repartition = Arc::new(
        RepartitionExec::try_new(filter, Partitioning::RoundRobinBatch(1)).unwrap(),
    );
    // Filter above the repartition: b = 'bar' (shadows `predicate`).
    let predicate = col_lit_predicate("b", "bar", &schema());
    let plan = Arc::new(FilterExec::try_new(predicate, repartition).unwrap());

    // expect the predicate to be pushed down into the DataSource
    insta::assert_snapshot!(
        OptimizationTest::new(plan, FilterPushdown{},true),
        @r"
OptimizationTest:
input:
- FilterExec: b@1 = bar
- RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=1
- FilterExec: a@0 = foo
- CoalesceBatchesExec: target_batch_size=1
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
output:
Ok:
- RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=1
- CoalesceBatchesExec: target_batch_size=1
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar
"
    );
}

/// Filters are not pushed through `AggregateExec`.
///
/// There are 2 important points here:
/// 1. The outer filter **is not** pushed down at all because we haven't
///    implemented pushdown support yet for AggregateExec.
/// 2. The inner filter **is** pushed down into the DataSource.
#[test]
fn test_no_pushdown_through_aggregates() {
    let scan = TestScanBuilder::new(schema()).with_support(true).build();

    let coalesce = Arc::new(CoalesceBatchesExec::new(scan, 10));

    // Inner filter, below the aggregate: eligible for pushdown.
    let filter = Arc::new(
        FilterExec::try_new(col_lit_predicate("a", "foo", &schema()), coalesce).unwrap(),
    );

    // COUNT(a) aliased as "cnt".
    let aggregate_expr = vec![
        AggregateExprBuilder::new(count_udaf(), vec![col("a", &schema()).unwrap()])
            .schema(schema())
            .alias("cnt")
            .build()
            .map(Arc::new)
            .unwrap(),
    ];
    let group_by = PhysicalGroupBy::new_single(vec![
        (col("a", &schema()).unwrap(), "a".to_string()),
        (col("b", &schema()).unwrap(), "b".to_string()),
    ]);
    let aggregate = Arc::new(
        AggregateExec::try_new(
            AggregateMode::Final,
            group_by,
            // moved, not cloned: this is the only use of `aggregate_expr`
            aggregate_expr,
            vec![None],
            filter,
            schema(),
        )
        .unwrap(),
    );

    let coalesce = Arc::new(CoalesceBatchesExec::new(aggregate, 100));

    // Outer filter, above the aggregate: must stay in place.
    let predicate = col_lit_predicate("b", "bar", &schema());
    let plan = Arc::new(FilterExec::try_new(predicate, coalesce).unwrap());

    // expect only the inner predicate to be pushed down into the DataSource;
    // the outer FilterExec remains above the AggregateExec
    insta::assert_snapshot!(
        OptimizationTest::new(plan, FilterPushdown{}, true),
        @r"
OptimizationTest:
input:
- FilterExec: b@1 = bar
- CoalesceBatchesExec: target_batch_size=100
- AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([0])
- FilterExec: a@0 = foo
- CoalesceBatchesExec: target_batch_size=10
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
output:
Ok:
- FilterExec: b@1 = bar
- CoalesceBatchesExec: target_batch_size=100
- AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt]
- CoalesceBatchesExec: target_batch_size=10
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
"
    );
}

/// Test various combinations of handling of child pushdown results
/// in an ExecutionPlan in combination with support/not support in a DataSource.
#[test]
fn test_node_handles_child_pushdown_result() {
    // If we set `with_support(true)` + `inject_filter = true` then the filter is pushed down to the DataSource
    // and no FilterExec is created.
    let scan = TestScanBuilder::new(schema()).with_support(true).build();
    let predicate = col_lit_predicate("a", "foo", &schema());
    let plan = Arc::new(TestNode::new(true, Arc::clone(&scan), predicate));
    insta::assert_snapshot!(
        OptimizationTest::new(plan, FilterPushdown{}, true),
        @r"
OptimizationTest:
input:
- TestInsertExec { inject_filter: true }
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
output:
Ok:
- TestInsertExec { inject_filter: true }
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
",
    );

    // If we set `with_support(false)` + `inject_filter = true` then the filter is not pushed down to the DataSource
    // and a FilterExec is created.
    // Note the node flips to `inject_filter: false` after injecting its filter.
    let scan = TestScanBuilder::new(schema()).with_support(false).build();
    let predicate = col_lit_predicate("a", "foo", &schema());
    let plan = Arc::new(TestNode::new(true, Arc::clone(&scan), predicate));
    insta::assert_snapshot!(
        OptimizationTest::new(plan, FilterPushdown{}, true),
        @r"
OptimizationTest:
input:
- TestInsertExec { inject_filter: true }
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
output:
Ok:
- TestInsertExec { inject_filter: false }
- FilterExec: a@0 = foo
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
",
    );

    // If we set `with_support(false)` + `inject_filter = false` then the filter is not pushed down to the DataSource
    // and no FilterExec is created.
    let scan = TestScanBuilder::new(schema()).with_support(false).build();
    let predicate = col_lit_predicate("a", "foo", &schema());
    let plan = Arc::new(TestNode::new(false, Arc::clone(&scan), predicate));
    insta::assert_snapshot!(
        OptimizationTest::new(plan, FilterPushdown{}, true),
        @r"
OptimizationTest:
input:
- TestInsertExec { inject_filter: false }
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
output:
Ok:
- TestInsertExec { inject_filter: false }
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
",
    );
}

/// Lazily-initialized schema shared by every test in this module:
/// a: String
/// b: String
/// c: f64
static TEST_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("c", DataType::Float64, false),
    ]))
});

fn schema() -> SchemaRef {
Arc::clone(&TEST_SCHEMA)
}

/// Builds the physical expression `column = literal` resolved against `schema`.
///
/// Panics if `column_name` is not present in `schema` (fine in tests).
fn col_lit_predicate(
    column_name: &str,
    scalar_value: impl Into<ScalarValue>,
    schema: &Schema,
) -> Arc<dyn PhysicalExpr> {
    let column = Column::new_with_schema(column_name, schema).unwrap();
    let literal = Literal::new(scalar_value.into());
    Arc::new(BinaryExpr::new(
        Arc::new(column),
        Operator::Eq,
        Arc::new(literal),
    ))
}
Loading