diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs
index 7d5d07715eebc..6643e7fd59b7a 100644
--- a/datafusion/core/tests/physical_optimizer/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/mod.rs
@@ -25,6 +25,7 @@ mod join_selection;
 mod limit_pushdown;
 mod limited_distinct_aggregation;
 mod projection_pushdown;
+mod push_down_filter;
 mod replace_with_order_preserving_variants;
 mod sanity_checker;
 mod test_utils;
diff --git a/datafusion/core/tests/physical_optimizer/push_down_filter.rs b/datafusion/core/tests/physical_optimizer/push_down_filter.rs
new file mode 100644
index 0000000000000..b19144f1bcffe
--- /dev/null
+++ b/datafusion/core/tests/physical_optimizer/push_down_filter.rs
@@ -0,0 +1,542 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::{Arc, OnceLock};
+use std::{
+    any::Any,
+    fmt::{Display, Formatter},
+};
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::{
+    datasource::object_store::ObjectStoreUrl,
+    logical_expr::Operator,
+    physical_plan::{
+        expressions::{BinaryExpr, Column, Literal},
+        PhysicalExpr,
+    },
+    scalar::ScalarValue,
+};
+use datafusion_common::{config::ConfigOptions, Statistics};
+use datafusion_common::{internal_err, Result};
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::{
+    file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener,
+};
+use datafusion_expr::test::function_stub::count_udaf;
+use datafusion_physical_expr::expressions::col;
+use datafusion_physical_expr::{
+    aggregate::AggregateExprBuilder, conjunction, Partitioning,
+};
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use datafusion_physical_optimizer::push_down_filter::PushdownFilter;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::filter_pushdown::{
+    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
+    FilterPushdownSupport,
+};
+use datafusion_physical_plan::{
+    aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
+    coalesce_batches::CoalesceBatchesExec,
+    filter::FilterExec,
+    repartition::RepartitionExec,
+};
+use datafusion_physical_plan::{
+    displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan,
+};
+
+use object_store::ObjectStore;
+
+/// A placeholder data source that accepts filter pushdown
+#[derive(Clone, Default)]
+struct TestSource {
+    support: bool,
+    predicate: Option<Arc<dyn PhysicalExpr>>,
+    statistics: Option<Statistics>,
+}
+
+impl TestSource {
+    fn new(support: bool) -> Self {
+        Self {
+            support,
+            predicate: None,
+            statistics: None,
+        }
+    }
+}
+
+impl FileSource for TestSource {
+    fn create_file_opener(
+        &self,
+        _object_store: Arc<dyn ObjectStore>,
+        _base_config: &FileScanConfig,
+        _partition: usize,
+    ) -> Arc<dyn FileOpener> {
+        todo!("should not be called")
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        todo!("should not be called")
+    }
+
+    fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+        todo!("should not be called")
+    }
+
+    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+        todo!("should not be called")
+    }
+
+    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
+        todo!("should not be called")
+    }
+
+    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+        Arc::new(TestSource {
+            statistics: Some(statistics),
+            ..self.clone()
+        })
+    }
+
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        todo!("should not be called")
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        Ok(self
+            .statistics
+            .as_ref()
+            .expect("statistics not set")
+            .clone())
+    }
+
+    fn file_type(&self) -> &str {
+        "test"
+    }
+
+    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let support = format!(", pushdown_supported={}", self.support);
+
+                let predicate_string = self
+                    .predicate
+                    .as_ref()
+                    .map(|p| format!(", predicate={p}"))
+                    .unwrap_or_default();
+
+                write!(f, "{}{}", support, predicate_string)
+            }
+            DisplayFormatType::TreeRender => {
+                if let Some(predicate) = &self.predicate {
+                    writeln!(f, "pushdown_supported={}", self.support)?;
+                    writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
+                }
+                Ok(())
+            }
+        }
+    }
+
+    fn try_pushdown_filters(
+        &self,
+        mut fd: FilterDescription,
+        config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
+        if self.support && config.execution.parquet.pushdown_filters {
+            if let Some(internal) = self.predicate.as_ref() {
+                fd.filters.push(Arc::clone(internal));
+            }
+            let all_filters = fd.take_description();
+
+            Ok(FilterPushdownResult {
+                support: FilterPushdownSupport::Supported {
+                    child_descriptions: vec![],
+                    op: Arc::new(TestSource {
+                        support: true,
+                        predicate: Some(conjunction(all_filters)),
+                        statistics: self.statistics.clone(), // should be updated in reality
+                    }),
+                    revisit: false,
+                },
+                remaining_description: FilterDescription::empty(),
+            })
+        } else {
+            Ok(filter_pushdown_not_supported(fd))
+        }
+    }
+}
+
+fn test_scan(support: bool) -> Arc<dyn ExecutionPlan> {
+    let schema = schema();
+    let source = Arc::new(TestSource::new(support));
+    let base_config = FileScanConfigBuilder::new(
+        ObjectStoreUrl::parse("test://").unwrap(),
+        Arc::clone(schema),
+        source,
+    )
+    .build();
+    DataSourceExec::from_data_source(base_config)
+}
+
+#[test]
+fn test_pushdown_into_scan() {
+    let scan = test_scan(true);
+    let predicate = col_lit_predicate("a", "foo", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap());
+
+    // expect the predicate to be pushed down into the DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}, true),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo
+        -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
+    "
+    );
+}
+
+/// Show that we can use config options to determine how to do pushdown.
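+/// (The rule consults `ConfigOptions`: the `TestSource` accepts filters only
+/// when `execution.parquet.pushdown_filters` is `true`; otherwise the plan is
+/// left unchanged, as the two snapshots below demonstrate.)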
+#[test] +fn test_pushdown_into_scan_with_config_options() { + let scan = test_scan(true); + let predicate = col_lit_predicate("a", "foo", schema()); + let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap()) as _; + + let mut cfg = ConfigOptions::default(); + insta::assert_snapshot!( + OptimizationTest::new( + Arc::clone(&plan), + PushdownFilter {}, + false + ), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true + " + ); + + cfg.execution.parquet.pushdown_filters = true; + insta::assert_snapshot!( + OptimizationTest::new( + plan, + PushdownFilter {}, + true + ), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo + " + ); +} + +#[test] +fn test_filter_collapse() { + // filter should be pushed down into the parquet scan with two filters + let scan = test_scan(true); + let predicate1 = col_lit_predicate("a", "foo", schema()); + let filter1 = Arc::new(FilterExec::try_new(predicate1, scan).unwrap()); + let predicate2 = col_lit_predicate("b", "bar", schema()); + let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap()); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownFilter{}, true), + @r" + OptimizationTest: + input: + - FilterExec: b@1 = bar + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo + " + ); +} + +#[test] +fn test_filter_with_projection() { + let scan = test_scan(true); + let projection = vec![1, 0]; + let predicate = col_lit_predicate("a", "foo", schema()); + let plan = Arc::new( + FilterExec::try_new(predicate, Arc::clone(&scan)) + .unwrap() + .with_projection(Some(projection)) + .unwrap(), + ); + + // expect the predicate to be pushed down into the DataSource but the FilterExec to be converted to ProjectionExec + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownFilter{}, true), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo, projection=[b@1, a@0] + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - ProjectionExec: expr=[b@1 as b, a@0 as a] + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo + ", + ); + + // add a test where the filter is on a column that isn't included in the output + let projection = vec![1]; + let predicate = col_lit_predicate("a", "foo", schema()); + let plan = Arc::new( + FilterExec::try_new(predicate, scan) + .unwrap() + .with_projection(Some(projection)) + .unwrap(), + ); + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownFilter{},true), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo, projection=[b@1] + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - 
ProjectionExec: expr=[b@1 as b] + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo + " + ); +} + +#[test] +fn test_push_down_through_transparent_nodes() { + // expect the predicate to be pushed down into the DataSource + let scan = test_scan(true); + let coalesce = Arc::new(CoalesceBatchesExec::new(scan, 1)); + let predicate = col_lit_predicate("a", "foo", schema()); + let filter = Arc::new(FilterExec::try_new(predicate, coalesce).unwrap()); + let repartition = Arc::new( + RepartitionExec::try_new(filter, Partitioning::RoundRobinBatch(1)).unwrap(), + ); + let predicate = col_lit_predicate("b", "bar", schema()); + let plan = Arc::new(FilterExec::try_new(predicate, repartition).unwrap()); + + // expect the predicate to be pushed down into the DataSource + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownFilter{},true), + @r" + OptimizationTest: + input: + - FilterExec: b@1 = bar + - RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=0 + - FilterExec: a@0 = foo + - CoalesceBatchesExec: target_batch_size=1 + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=0 + - CoalesceBatchesExec: target_batch_size=1 + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo + " + ); +} + +#[test] +fn test_no_pushdown_through_aggregates() { + // There are 2 important points here: + // 1. The outer filter **is not** pushed down at all because we haven't implemented pushdown support + // yet for AggregateExec. + // 2. The inner filter **is** pushed down into the DataSource. 
+    let scan = test_scan(true);
+
+    let coalesce = Arc::new(CoalesceBatchesExec::new(scan, 10));
+
+    let filter = Arc::new(
+        FilterExec::try_new(col_lit_predicate("a", "foo", schema()), coalesce).unwrap(),
+    );
+
+    let aggregate_expr =
+        vec![
+            AggregateExprBuilder::new(count_udaf(), vec![col("a", schema()).unwrap()])
+                .schema(Arc::clone(schema()))
+                .alias("cnt")
+                .build()
+                .map(Arc::new)
+                .unwrap(),
+        ];
+    let group_by = PhysicalGroupBy::new_single(vec![
+        (col("a", schema()).unwrap(), "a".to_string()),
+        (col("b", schema()).unwrap(), "b".to_string()),
+    ]);
+    let aggregate = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Final,
+            group_by,
+            aggregate_expr.clone(),
+            vec![None],
+            filter,
+            Arc::clone(schema()),
+        )
+        .unwrap(),
+    );
+
+    let coalesce = Arc::new(CoalesceBatchesExec::new(aggregate, 100));
+
+    let predicate = col_lit_predicate("b", "bar", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, coalesce).unwrap());
+
+    // expect the inner predicate to be pushed down into the DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}, true),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: b@1 = bar
+        -   CoalesceBatchesExec: target_batch_size=100
+        -     AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([0])
+        -       FilterExec: a@0 = foo
+        -         CoalesceBatchesExec: target_batch_size=10
+        -           DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - FilterExec: b@1 = bar
+          -   CoalesceBatchesExec: target_batch_size=100
+          -     AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt]
+          -       CoalesceBatchesExec: target_batch_size=10
+          -         DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
+    "
+    );
+}
+
+/// Schema:
+/// a: String
+/// b: String
+/// c: f64
+static TEST_SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
+
+fn schema() -> &'static SchemaRef {
+    TEST_SCHEMA.get_or_init(|| {
+        let fields = vec![
+            Field::new("a", DataType::Utf8, false),
+            Field::new("b", DataType::Utf8, false),
+            Field::new("c", DataType::Float64, false),
+        ];
+        Arc::new(Schema::new(fields))
+    })
+}
+
+/// Returns a predicate that is a binary expression col = lit
+fn col_lit_predicate(
+    column_name: &str,
+    scalar_value: impl Into<ScalarValue>,
+    schema: &Schema,
+) -> Arc<dyn PhysicalExpr> {
+    let scalar_value = scalar_value.into();
+    Arc::new(BinaryExpr::new(
+        Arc::new(Column::new_with_schema(column_name, schema).unwrap()),
+        Operator::Eq,
+        Arc::new(Literal::new(scalar_value)),
+    ))
+}
+
+/// A harness for testing physical optimizers.
+///
+/// You can use this to test the output of a physical optimizer rule using insta snapshots
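+///
+/// For example, mirroring the tests above (`plan` here stands for any physical
+/// plan, and the expected snapshot is elided):
+///
+/// ```rust,ignore
+/// insta::assert_snapshot!(
+///     OptimizationTest::new(plan, PushdownFilter {}, true),
+///     @r"..."
+/// );
+/// ```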
+#[derive(Debug)]
+pub struct OptimizationTest {
+    input: Vec<String>,
+    output: Result<Vec<String>, String>,
+}
+
+impl OptimizationTest {
+    pub fn new<O>(
+        input_plan: Arc<dyn ExecutionPlan>,
+        opt: O,
+        allow_pushdown_filters: bool,
+    ) -> Self
+    where
+        O: PhysicalOptimizerRule,
+    {
+        let mut parquet_pushdown_config = ConfigOptions::default();
+        parquet_pushdown_config.execution.parquet.pushdown_filters =
+            allow_pushdown_filters;
+
+        let input = format_execution_plan(&input_plan);
+        let input_schema = input_plan.schema();
+
+        let output_result = opt.optimize(input_plan, &parquet_pushdown_config);
+        let output = output_result
+            .and_then(|plan| {
+                if opt.schema_check() && (plan.schema() != input_schema) {
+                    internal_err!(
+                        "Schema mismatch:\n\nBefore:\n{:?}\n\nAfter:\n{:?}",
+                        input_schema,
+                        plan.schema()
+                    )
+                } else {
+                    Ok(plan)
+                }
+            })
+            .map(|plan| format_execution_plan(&plan))
+            .map_err(|e| e.to_string());
+
+        Self { input, output }
+    }
+}
+
+impl Display for OptimizationTest {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        writeln!(f, "OptimizationTest:")?;
+        writeln!(f, "  input:")?;
+        for line in &self.input {
+            writeln!(f, "    - {line}")?;
+        }
+        writeln!(f, "  output:")?;
+        match &self.output {
+            Ok(output) => {
+                writeln!(f, "    Ok:")?;
+                for line in output {
+                    writeln!(f, "      - {line}")?;
+                }
+            }
+            Err(err) => {
+                writeln!(f, "    Err: {err}")?;
+            }
+        }
+        Ok(())
+    }
+}
+
+pub fn format_execution_plan(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
+    format_lines(&displayable(plan.as_ref()).indent(false).to_string())
+}
+
+fn format_lines(s: &str) -> Vec<String> {
+    s.trim().split('\n').map(|s| s.to_string()).collect()
+}
diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs
index 0066f39801a1b..835285b21e38a 100644
--- a/datafusion/datasource/src/file.rs
+++ b/datafusion/datasource/src/file.rs
@@ -26,8 +26,12 @@ use crate::file_groups::FileGroupPartitioner;
 use crate::file_scan_config::FileScanConfig;
 use crate::file_stream::FileOpener;
 use arrow::datatypes::SchemaRef;
-use datafusion_common::Statistics;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::{Result, Statistics};
 use datafusion_physical_expr::LexOrdering;
+use datafusion_physical_plan::filter_pushdown::{
+    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
+};
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::DisplayFormatType;
 
@@ -57,7 +61,7 @@ pub trait FileSource: Send + Sync {
     /// Return execution plan metrics
     fn metrics(&self) -> &ExecutionPlanMetricsSet;
     /// Return projected statistics
-    fn statistics(&self) -> datafusion_common::Result<Statistics>;
+    fn statistics(&self) -> Result<Statistics>;
     /// String representation of file source such as "csv", "json", "parquet"
     fn file_type(&self) -> &str;
     /// Format FileType specific information
@@ -75,7 +79,7 @@ pub trait FileSource: Send + Sync {
         repartition_file_min_size: usize,
         output_ordering: Option<LexOrdering>,
         config: &FileScanConfig,
-    ) -> datafusion_common::Result<Option<FileScanConfig>> {
+    ) -> Result<Option<FileScanConfig>> {
         if config.file_compression_type.is_compressed() || config.new_lines_in_values {
             return Ok(None);
         }
@@ -93,4 +97,16 @@ pub trait FileSource: Send + Sync {
         }
         Ok(None)
     }
+
+    /// Try to push down filters into this FileSource.
+    /// See [`ExecutionPlan::try_pushdown_filters`] for more details.
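+    ///
+    /// As an illustrative sketch only (not an API added by this PR), a source
+    /// that absorbs every pushed filter into its own predicate, much like the
+    /// `TestSource` used in the tests, could implement this as:
+    ///
+    /// ```rust,ignore
+    /// fn try_pushdown_filters(
+    ///     &self,
+    ///     mut fd: FilterDescription,
+    ///     _config: &ConfigOptions,
+    /// ) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
+    ///     let all_filters = fd.take_description();
+    ///     Ok(FilterPushdownResult {
+    ///         support: FilterPushdownSupport::Supported {
+    ///             child_descriptions: vec![], // leaf source: no children
+    ///             op: Arc::new(Self {
+    ///                 predicate: Some(conjunction(all_filters)),
+    ///                 ..self.clone()
+    ///             }),
+    ///             revisit: false,
+    ///         },
+    ///         remaining_description: FilterDescription::empty(),
+    ///     })
+    /// }
+    /// ```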
+    ///
+    /// [`ExecutionPlan::try_pushdown_filters`]: datafusion_physical_plan::ExecutionPlan::try_pushdown_filters
+    fn try_pushdown_filters(
+        &self,
+        fd: FilterDescription,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
+        Ok(filter_pushdown_not_supported(fd))
+    }
 }
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 19482eb2ccc69..761d2b655878f 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -23,6 +23,16 @@ use std::{
     fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
 };
 
+use crate::file_groups::FileGroup;
+use crate::{
+    display::FileGroupsDisplay,
+    file::FileSource,
+    file_compression_type::FileCompressionType,
+    file_stream::FileStream,
+    source::{DataSource, DataSourceExec},
+    statistics::MinMaxStatistics,
+    PartitionedFile,
+};
 use arrow::{
     array::{
         ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch,
@@ -31,7 +41,9 @@ use arrow::{
     buffer::Buffer,
     datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
 };
-use datafusion_common::{exec_err, ColumnStatistics, Constraints, Result, Statistics};
+use datafusion_common::{
+    config::ConfigOptions, exec_err, ColumnStatistics, Constraints, Result, Statistics,
+};
 use datafusion_common::{DataFusionError, ScalarValue};
 use datafusion_execution::{
     object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
@@ -40,6 +52,10 @@ use datafusion_physical_expr::{
     expressions::Column, EquivalenceProperties, LexOrdering, Partitioning,
     PhysicalSortExpr,
 };
+use datafusion_physical_plan::filter_pushdown::{
+    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
+    FilterPushdownSupport,
+};
 use datafusion_physical_plan::{
     display::{display_orderings, ProjectSchemaDisplay},
     metrics::ExecutionPlanMetricsSet,
@@ -48,17 +64,6 @@ use datafusion_physical_plan::{
 };
 use log::{debug, warn};
 
-use crate::file_groups::FileGroup;
-use crate::{
-    display::FileGroupsDisplay,
-    file::FileSource,
-    file_compression_type::FileCompressionType,
-    file_stream::FileStream,
-    source::{DataSource, DataSourceExec},
-    statistics::MinMaxStatistics,
-    PartitionedFile,
-};
-
 /// The base configurations for a [`DataSourceExec`], a physical plan for
 /// any given file format.
 ///
@@ -587,6 +592,46 @@ impl DataSource for FileScanConfig {
             ) as _
         }))
     }
+
+    fn try_pushdown_filters(
+        &self,
+        fd: FilterDescription,
+        config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn DataSource>>> {
+        let FilterPushdownResult {
+            support,
+            remaining_description,
+        } = self.file_source.try_pushdown_filters(fd, config)?;
+
+        match support {
+            FilterPushdownSupport::Supported {
+                child_descriptions,
+                op,
+                revisit,
+            } => {
+                let new_data_source = Arc::new(
+                    FileScanConfigBuilder::from(self.clone())
+                        .with_source(op)
+                        .build(),
+                );
+
+                debug_assert!(child_descriptions.is_empty());
+                debug_assert!(!revisit);
+
+                Ok(FilterPushdownResult {
+                    support: FilterPushdownSupport::Supported {
+                        child_descriptions,
+                        op: new_data_source,
+                        revisit,
+                    },
+                    remaining_description,
+                })
+            }
+            FilterPushdownSupport::NotSupported => {
+                Ok(filter_pushdown_not_supported(remaining_description))
+            }
+        }
+    }
 }
 
 impl FileScanConfig {
diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs
index 6c9122ce1ac10..2d6ea1a8b3915 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -31,10 +31,14 @@ use datafusion_physical_plan::{
 
 use crate::file_scan_config::FileScanConfig;
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::{Constraints, Statistics};
+use datafusion_common::{Constraints, Result, Statistics};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_plan::filter_pushdown::{
+    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
+    FilterPushdownSupport,
+};
 
 /// Common behaviors in Data Sources for both from Files and Memory.
 ///
@@ -51,7 +55,7 @@ pub trait DataSource: Send + Sync + Debug {
         &self,
         partition: usize,
         context: Arc<TaskContext>,
-    ) -> datafusion_common::Result<SendableRecordBatchStream>;
+    ) -> Result<SendableRecordBatchStream>;
     fn as_any(&self) -> &dyn Any;
     /// Format this source for display in explain plans
     fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
@@ -62,13 +66,13 @@ pub trait DataSource: Send + Sync + Debug {
         _target_partitions: usize,
         _repartition_file_min_size: usize,
         _output_ordering: Option<LexOrdering>,
-    ) -> datafusion_common::Result<Option<Arc<dyn DataSource>>> {
+    ) -> Result<Option<Arc<dyn DataSource>>> {
         Ok(None)
     }
 
     fn output_partitioning(&self) -> Partitioning;
     fn eq_properties(&self) -> EquivalenceProperties;
-    fn statistics(&self) -> datafusion_common::Result<Statistics>;
+    fn statistics(&self) -> Result<Statistics>;
     /// Return a copy of this DataSource with a new fetch limit
     fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
     fn fetch(&self) -> Option<usize>;
@@ -78,7 +82,16 @@ pub trait DataSource: Send + Sync + Debug {
     fn try_swapping_with_projection(
         &self,
         _projection: &ProjectionExec,
-    ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>>;
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
+    /// Try to push down filters into this DataSource.
+    /// See [`ExecutionPlan::try_pushdown_filters`] for more details.
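+    ///
+    /// The default implementation returns [`filter_pushdown_not_supported`]:
+    /// nothing is absorbed, and every filter is handed back to the caller.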
+    fn try_pushdown_filters(
+        &self,
+        fd: FilterDescription,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn DataSource>>> {
+        Ok(filter_pushdown_not_supported(fd))
+    }
 }
 
 /// [`ExecutionPlan`] handles different file formats like JSON, CSV, AVRO, ARROW, PARQUET
@@ -131,7 +144,7 @@ impl ExecutionPlan for DataSourceExec {
     fn with_new_children(
         self: Arc<Self>,
         _: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+    ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(self)
     }
 
@@ -139,7 +152,7 @@ impl ExecutionPlan for DataSourceExec {
         &self,
         target_partitions: usize,
         config: &ConfigOptions,
-    ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
         let data_source = self.data_source.repartitioned(
             target_partitions,
             config.optimizer.repartition_file_min_size,
@@ -163,7 +176,7 @@ impl ExecutionPlan for DataSourceExec {
         &self,
         partition: usize,
         context: Arc<TaskContext>,
-    ) -> datafusion_common::Result<SendableRecordBatchStream> {
+    ) -> Result<SendableRecordBatchStream> {
         self.data_source.open(partition, context)
     }
 
@@ -171,7 +184,7 @@ impl ExecutionPlan for DataSourceExec {
         Some(self.data_source.metrics().clone_inner())
     }
 
-    fn statistics(&self) -> datafusion_common::Result<Statistics> {
+    fn statistics(&self) -> Result<Statistics> {
         self.data_source.statistics()
     }
 
@@ -189,9 +202,45 @@ impl ExecutionPlan for DataSourceExec {
     fn try_swapping_with_projection(
         &self,
         projection: &ProjectionExec,
-    ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
         self.data_source.try_swapping_with_projection(projection)
     }
+
+    fn try_pushdown_filters(
+        &self,
+        fd: FilterDescription,
+        config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
+        let FilterPushdownResult {
+            support,
+            remaining_description,
+        } = self.data_source.try_pushdown_filters(fd, config)?;
+
+        match support {
+            FilterPushdownSupport::Supported {
+                child_descriptions,
+                op,
+                revisit,
+            } => {
+                let new_exec = Arc::new(DataSourceExec::new(op));
+
+                debug_assert!(child_descriptions.is_empty());
+                debug_assert!(!revisit);
+
+                Ok(FilterPushdownResult {
+                    support: FilterPushdownSupport::Supported {
+                        child_descriptions,
+                        op: new_exec,
+                        revisit,
+                    },
+                    remaining_description,
+                })
+            }
+            FilterPushdownSupport::NotSupported => {
+                Ok(filter_pushdown_not_supported(remaining_description))
+            }
+        }
+    }
 }
 
 impl DataSourceExec {
@@ -254,3 +303,13 @@ impl DataSourceExec {
         })
     }
 }
+
+/// Create a new `DataSourceExec` from a `DataSource`
+impl<S> From<S> for DataSourceExec
+where
+    S: DataSource + 'static,
+{
+    fn from(source: S) -> Self {
+        Self::new(Arc::new(source))
+    }
+}
diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs
index 35503f3b0b5f9..57dac21b6eeed 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -36,6 +36,7 @@ pub mod optimizer;
 pub mod output_requirements;
 pub mod projection_pushdown;
 pub mod pruning;
+pub mod push_down_filter;
 pub mod sanity_checker;
 pub mod topk_aggregation;
 pub mod update_aggr_exprs;
diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs
index bab31150e2508..d4ff7d6b9e153 100644
--- a/datafusion/physical-optimizer/src/optimizer.rs
+++ b/datafusion/physical-optimizer/src/optimizer.rs
@@ -30,6 +30,7 @@ use crate::limit_pushdown::LimitPushdown;
 use crate::limited_distinct_aggregation::LimitedDistinctAggregation;
 use crate::output_requirements::OutputRequirements;
 use crate::projection_pushdown::ProjectionPushdown;
+use crate::push_down_filter::PushdownFilter;
 use crate::sanity_checker::SanityCheckPlan;
 use crate::topk_aggregation::TopKAggregation;
 use crate::update_aggr_exprs::OptimizeAggregateOrder;
@@ -121,6 +122,10 @@ impl PhysicalOptimizer {
         // into an `order by max(x) limit y`. In this case it will copy the limit value down
         // to the aggregation, allowing it to use only y number of accumulators.
         Arc::new(TopKAggregation::new()),
+        // The FilterPushdown rule tries to push down filters as far as it can.
+        // For example, it will push down filtering from a `FilterExec` to
+        // a `DataSourceExec`, or from a `TopK`'s current state to a `DataSourceExec`.
+        Arc::new(PushdownFilter::new()),
         // The LimitPushdown rule tries to push limits down as far as possible,
         // replacing operators with fetching variants, or adding limits
         // past operators that support limit pushdown.
diff --git a/datafusion/physical-optimizer/src/push_down_filter.rs b/datafusion/physical-optimizer/src/push_down_filter.rs
new file mode 100644
index 0000000000000..80201454d06d4
--- /dev/null
+++ b/datafusion/physical-optimizer/src/push_down_filter.rs
@@ -0,0 +1,535 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::PhysicalOptimizerRule;
+
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
+use datafusion_common::{config::ConfigOptions, Result};
+use datafusion_physical_expr::conjunction;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::filter_pushdown::{
+    FilterDescription, FilterPushdownResult, FilterPushdownSupport,
+};
+use datafusion_physical_plan::tree_node::PlanContext;
+use datafusion_physical_plan::ExecutionPlan;
+
+/// Attempts to recursively push given filters from the top of the tree into leaves.
+///
+/// # Default Implementation
+///
+/// The default implementation in [`ExecutionPlan::try_pushdown_filters`] is a no-op
+/// that assumes that:
+///
+/// * Parent filters can't be passed onto children.
+/// * This node has no filters to contribute.
+///
+/// # Example: Push filter into a `DataSourceExec`
+///
+/// For example, consider the following plan:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec  │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │      FilterExec      │
+/// │  filters = [ id=1]   │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+/// │    projection = *    │
+/// └──────────────────────┘
+/// ```
+///
+/// Our goal is to move the `id = 1` filter from the [`FilterExec`] node to the `DataSourceExec` node.
+///
+/// If this filter is selective, pushing it into the scan can avoid massive
+/// amounts of data being read from the source (the projection is `*` so all
+/// matching columns are read).
+/// +/// The new plan looks like: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ CoalesceBatchesExec │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// │ filters = [ id=1] │ +/// └──────────────────────┘ +/// ``` +/// +/// # Example: Push filters with `ProjectionExec` +/// +/// Let's consider a more complex example involving a [`ProjectionExec`] +/// node in between the [`FilterExec`] and `DataSourceExec` nodes that +/// creates a new column that the filter depends on. +/// +/// ```text +/// ┌──────────────────────┐ +/// │ CoalesceBatchesExec │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ FilterExec │ +/// │ filters = │ +/// │ [cost>50,id=1] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ ProjectionExec │ +/// │ cost = price * 1.2 │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// └──────────────────────┘ +/// ``` +/// +/// We want to push down the filters `[id=1]` to the `DataSourceExec` node, +/// but can't push down `cost>50` because it requires the [`ProjectionExec`] +/// node to be executed first. A simple thing to do would be to split up the +/// filter into two separate filters and push down the first one: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ CoalesceBatchesExec │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ FilterExec │ +/// │ filters = │ +/// │ [cost>50] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ ProjectionExec │ +/// │ cost = price * 1.2 │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// │ filters = [ id=1] │ +/// └──────────────────────┘ +/// ``` +/// +/// We can actually however do better by pushing down `price * 1.2 > 50` +/// instead of `cost > 50`: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ CoalesceBatchesExec │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ ProjectionExec │ +/// │ cost = price * 1.2 │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// │ filters = [id=1, │ +/// │ price * 1.2 > 50] │ +/// └──────────────────────┘ +/// ``` +/// +/// # Example: Push filters within a subtree +/// +/// There are also cases where we may be able to push down filters within a +/// subtree but not the entire tree. 
A good example of this is aggregation +/// nodes: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ ProjectionExec │ +/// │ projection = * │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ FilterExec │ +/// │ filters = [sum > 10] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌───────────────────────┐ +/// │ AggregateExec │ +/// │ group by = [id] │ +/// │ aggregate = │ +/// │ [sum(price)] │ +/// └───────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ FilterExec │ +/// │ filters = [id=1] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// └──────────────────────┘ +/// ``` +/// +/// The transformation here is to push down the `id=1` filter to the +/// `DataSourceExec` node: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ ProjectionExec │ +/// │ projection = * │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ FilterExec │ +/// │ filters = [sum > 10] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌───────────────────────┐ +/// │ AggregateExec │ +/// │ group by = [id] │ +/// │ aggregate = │ +/// │ [sum(price)] │ +/// └───────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// │ filters = [id=1] │ +/// └──────────────────────┘ +/// ``` +/// +/// The point here is that: +/// 1. We cannot push down `sum > 10` through the [`AggregateExec`] node into the `DataSourceExec` node. +/// Any filters above the [`AggregateExec`] node are not pushed down. +/// This is determined by calling [`ExecutionPlan::try_pushdown_filters`] on the [`AggregateExec`] node. +/// 2. We need to keep recursing into the tree so that we can discover the other [`FilterExec`] node and push +/// down the `id=1` filter. +/// +/// # Example: Push filters through Joins +/// +/// It is also possible to push down filters through joins and filters that +/// originate from joins. For example, a hash join where we build a hash +/// table of the left side and probe the right side (ignoring why we would +/// choose this order, typically it depends on the size of each table, +/// etc.). +/// +/// ```text +/// ┌─────────────────────┐ +/// │ FilterExec │ +/// │ filters = │ +/// │ [d.size > 100] │ +/// └─────────────────────┘ +/// │ +/// │ +/// ┌──────────▼──────────┐ +/// │ │ +/// │ HashJoinExec │ +/// │ [u.dept@hash(d.id)] │ +/// │ │ +/// └─────────────────────┘ +/// │ +/// ┌────────────┴────────────┐ +/// ┌──────────▼──────────┐ ┌──────────▼──────────┐ +/// │ DataSourceExec │ │ DataSourceExec │ +/// │ alias [users as u] │ │ alias [dept as d] │ +/// │ │ │ │ +/// └─────────────────────┘ └─────────────────────┘ +/// ``` +/// +/// There are two pushdowns we can do here: +/// 1. Push down the `d.size > 100` filter through the `HashJoinExec` node to the `DataSourceExec` +/// node for the `departments` table. +/// 2. Push down the hash table state from the `HashJoinExec` node to the `DataSourceExec` node to avoid reading +/// rows from the `users` table that will be eliminated by the join. +/// This can be done via a bloom filter or similar and is not (yet) supported +/// in DataFusion. See . 
+/// +/// ```text +/// ┌─────────────────────┐ +/// │ │ +/// │ HashJoinExec │ +/// │ [u.dept@hash(d.id)] │ +/// │ │ +/// └─────────────────────┘ +/// │ +/// ┌────────────┴────────────┐ +/// ┌──────────▼──────────┐ ┌──────────▼──────────┐ +/// │ DataSourceExec │ │ DataSourceExec │ +/// │ alias [users as u] │ │ alias [dept as d] │ +/// │ filters = │ │ filters = │ +/// │ [depg@hash(d.id)] │ │ [ d.size > 100] │ +/// └─────────────────────┘ └─────────────────────┘ +/// ``` +/// +/// You may notice in this case that the filter is *dynamic*: the hash table +/// is built _after_ the `departments` table is read and at runtime. We +/// don't have a concrete `InList` filter or similar to push down at +/// optimization time. These sorts of dynamic filters are handled by +/// building a specialized [`PhysicalExpr`] that can be evaluated at runtime +/// and internally maintains a reference to the hash table or other state. +/// +/// To make working with these sorts of dynamic filters more tractable we have the method [`PhysicalExpr::snapshot`] +/// which attempts to simplify a dynamic filter into a "basic" non-dynamic filter. +/// For a join this could mean converting it to an `InList` filter or a min/max filter for example. +/// See `datafusion/physical-plan/src/dynamic_filters.rs` for more details. +/// +/// # Example: Push TopK filters into Scans +/// +/// Another form of dynamic filter is pushing down the state of a `TopK` +/// operator for queries like `SELECT * FROM t ORDER BY id LIMIT 10`: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ TopK │ +/// │ limit = 10 │ +/// │ order by = [id] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// └──────────────────────┘ +/// ``` +/// +/// We can avoid large amounts of data processing by transforming this into: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ TopK │ +/// │ limit = 10 │ +/// │ order by = [id] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// │ filters = │ +/// │ [id < @ TopKHeap] │ +/// └──────────────────────┘ +/// ``` +/// +/// Now as we fill our `TopK` heap we can push down the state of the heap to +/// the `DataSourceExec` node to avoid reading files / row groups / pages / +/// rows that could not possibly be in the top 10. +/// +/// This is not yet implemented in DataFusion. 
See
+///
+///
+/// [`PhysicalExpr`]: datafusion_physical_plan::PhysicalExpr
+/// [`PhysicalExpr::snapshot`]: datafusion_physical_plan::PhysicalExpr::snapshot
+/// [`FilterExec`]: datafusion_physical_plan::filter::FilterExec
+/// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec
+/// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec
+#[derive(Debug)]
+pub struct PushdownFilter {}
+
+impl Default for PushdownFilter {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+pub type FilterDescriptionContext = PlanContext<FilterDescription>;
+
+impl PhysicalOptimizerRule for PushdownFilter {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let context = FilterDescriptionContext::new_default(plan);
+
+        context
+            .transform_up(|node| {
+                if node.plan.as_any().downcast_ref::<FilterExec>().is_some() {
+                    let initial_plan = Arc::clone(&node.plan);
+                    let mut accept_updated = false;
+                    let updated_node = node.transform_down(|filter_node| {
+                        Self::try_pushdown(filter_node, config, &mut accept_updated)
+                    });
+
+                    if accept_updated {
+                        updated_node
+                    } else {
+                        Ok(Transformed::no(FilterDescriptionContext::new_default(
+                            initial_plan,
+                        )))
+                    }
+                }
+                // Other filter-introducing operators extend here
+                else {
+                    Ok(Transformed::no(node))
+                }
+            })
+            .map(|updated| updated.data.plan)
+    }
+
+    fn name(&self) -> &str {
+        "PushdownFilter"
+    }
+
+    fn schema_check(&self) -> bool {
+        true // Filter pushdown does not change the schema of the plan
+    }
+}
+
+impl PushdownFilter {
+    pub fn new() -> Self {
+        Self {}
+    }
+
+    fn try_pushdown(
+        mut node: FilterDescriptionContext,
+        config: &ConfigOptions,
+        accept_updated: &mut bool,
+    ) -> Result<Transformed<FilterDescriptionContext>> {
+        let initial_description = FilterDescription {
+            filters: node.data.take_description(),
+        };
+
+        let FilterPushdownResult {
+            support,
+            remaining_description,
+        } = node
+            .plan
+            .try_pushdown_filters(initial_description, config)?;
+
+        match support {
+            FilterPushdownSupport::Supported {
+                mut child_descriptions,
+                op,
+                revisit,
+            } => {
+                if revisit {
+                    // This check handles cases where the current operator is entirely removed
+                    // from the plan and replaced with its child. In such cases, to not skip
+                    // over the new node, we need to explicitly re-apply this pushdown logic
+                    // to the new node.
+                    //
+                    // TODO: If TreeNodeRecursion supports a Revisit mechanism in the future,
+                    // this manual recursion could be removed.
+
+                    // If the operator is removed, it should not leave any filters as remaining
+                    debug_assert!(remaining_description.filters.is_empty());
+                    // Operators having 2 children cannot be removed
+                    debug_assert_eq!(child_descriptions.len(), 1);
+                    debug_assert_eq!(node.children.len(), 1);
+
+                    node.plan = op;
+                    node.data = child_descriptions.swap_remove(0);
+                    node.children = node.children.swap_remove(0).children;
+                    Self::try_pushdown(node, config, accept_updated)
+                } else {
+                    if remaining_description.filters.is_empty() {
+                        // Filter can be pushed down safely
+                        node.plan = op;
+                        if node.children.is_empty() {
+                            *accept_updated = true;
+                        } else {
+                            for (child, descr) in
+                                node.children.iter_mut().zip(child_descriptions)
+                            {
+                                child.data = descr;
+                            }
+                        }
+                    } else {
+                        // Filter cannot be pushed down
+                        node = insert_filter_exec(
+                            node,
+                            child_descriptions,
+                            remaining_description,
+                        )?;
+                    }
+                    Ok(Transformed::yes(node))
+                }
+            }
+            FilterPushdownSupport::NotSupported => {
+                if remaining_description.filters.is_empty() {
+                    Ok(Transformed {
+                        data: node,
+                        transformed: false,
+                        tnr: TreeNodeRecursion::Stop,
+                    })
+                } else {
+                    node = insert_filter_exec(
+                        node,
+                        vec![FilterDescription::empty(); 1],
+                        remaining_description,
+                    )?;
+                    Ok(Transformed {
+                        data: node,
+                        transformed: true,
+                        tnr: TreeNodeRecursion::Stop,
+                    })
+                }
+            }
+        }
+    }
+}
+
+fn insert_filter_exec(
+    node: FilterDescriptionContext,
+    mut child_descriptions: Vec<FilterDescription>,
+    remaining_description: FilterDescription,
+) -> Result<FilterDescriptionContext> {
+    let mut new_child_node = node;
+
+    // Filter has one child
+    if !child_descriptions.is_empty() {
+        debug_assert_eq!(child_descriptions.len(), 1);
+        new_child_node.data = child_descriptions.swap_remove(0);
+    }
+    let new_plan = Arc::new(FilterExec::try_new(
+        conjunction(remaining_description.filters),
+        Arc::clone(&new_child_node.plan),
+    )?);
+    let new_children = vec![new_child_node];
+    let new_data = FilterDescription::empty();
+
+    Ok(FilterDescriptionContext::new(
+        new_plan,
+        new_data,
+        new_children,
+    ))
+}
diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs
index 5244038b9ae27..faab5fdc5eb6c 100644
--- a/datafusion/physical-plan/src/coalesce_batches.rs
+++ b/datafusion/physical-plan/src/coalesce_batches.rs
@@ -35,6 +35,10 @@ use datafusion_execution::TaskContext;
 
 use crate::coalesce::{BatchCoalescer, CoalescerState};
 use crate::execution_plan::CardinalityEffect;
+use crate::filter_pushdown::{
+    filter_pushdown_transparent, FilterDescription, FilterPushdownResult,
+};
+use datafusion_common::config::ConfigOptions;
 use futures::ready;
 use futures::stream::{Stream, StreamExt};
 
@@ -212,6 +216,17 @@ impl ExecutionPlan for CoalesceBatchesExec {
     fn cardinality_effect(&self) -> CardinalityEffect {
         CardinalityEffect::Equal
     }
+
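+    // CoalesceBatchesExec only re-chunks its input; it adds, removes, and
+    // reorders no rows (`CardinalityEffect::Equal` above), so any filter from
+    // the parent can be passed through to the child unchanged.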
+    fn try_pushdown_filters(
+        &self,
+        fd: FilterDescription,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
+        Ok(filter_pushdown_transparent::<Arc<dyn ExecutionPlan>>(
+            Arc::new(self.clone()),
+            fd,
+        ))
+    }
 }
 
 /// Stream for [`CoalesceBatchesExec`]. See [`CoalesceBatchesExec`] for more details.
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 2bc5706ee0e18..2b6eac7be0675 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -16,6 +16,9 @@
 // under the License.
 
 pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
+use crate::filter_pushdown::{
+    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
+};
 pub use crate::metrics::Metric;
 pub use crate::ordering::InputOrderMode;
 pub use crate::stream::EmptyRecordBatchStream;
@@ -467,6 +470,41 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
         Ok(None)
     }
+
+    /// Attempts to recursively push given filters from the top of the tree into leaves.
+    ///
+    /// This is used for various optimizations, such as:
+    ///
+    /// * Pushing down filters into scans in general to minimize the amount of data that needs to be materialized.
+    /// * Pushing down dynamic filters from operators like TopK and Joins into scans.
+    ///
+    /// Generally the further down (closer to leaf nodes) that filters can be pushed, the better.
+    ///
+    /// Consider the case of a query such as `SELECT * FROM t WHERE a = 1 AND b = 2`.
+    /// With no filter pushdown the scan needs to read and materialize all the data from `t` and then filter based on `a` and `b`.
+    /// With filter pushdown into the scan it can first read only `a`, then `b`, and keep track of
+    /// which rows match the filter.
+    /// Then only for rows that match the filter does it have to materialize the rest of the columns.
+    ///
+    /// # Default Implementation
+    ///
+    /// The default implementation assumes:
+    /// * Parent filters can't be passed onto children.
+    /// * This node has no filters to contribute.
+    ///
+    /// # Implementation Notes
+    ///
+    /// Most of the actual logic is implemented as a Physical Optimizer rule.
+    /// See [`PushdownFilter`] for more details.
+    ///
+    /// [`PushdownFilter`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/filter_pushdown/struct.PushdownFilter.html
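+    ///
+    /// For illustration only (a sketch, not part of the trait contract): an
+    /// operator that neither absorbs nor blocks filters can simply hand them
+    /// to its single child, as `CoalesceBatchesExec` and `RepartitionExec`
+    /// do in this PR:
+    ///
+    /// ```rust,ignore
+    /// fn try_pushdown_filters(
+    ///     &self,
+    ///     fd: FilterDescription,
+    ///     _config: &ConfigOptions,
+    /// ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
+    ///     Ok(filter_pushdown_transparent::<Arc<dyn ExecutionPlan>>(
+    ///         Arc::new(self.clone()),
+    ///         fd,
+    ///     ))
+    /// }
+    /// ```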
+    fn try_pushdown_filters(
+        &self,
+        fd: FilterDescription,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
+        Ok(filter_pushdown_not_supported(fd))
+    }
 }
 
 /// [`ExecutionPlan`] Invariant Level
@@ -519,13 +557,15 @@ pub trait ExecutionPlanProperties {
     /// If this ExecutionPlan makes no changes to the schema of the rows flowing
     /// through it or how columns within each row relate to each other, it
     /// should return the equivalence properties of its input. For
-    /// example, since `FilterExec` may remove rows from its input, but does not
+    /// example, since [`FilterExec`] may remove rows from its input, but does not
     /// otherwise modify them, it preserves its input equivalence properties.
     /// However, since `ProjectionExec` may calculate derived expressions, it
    /// needs special handling.
     ///
     /// See also [`ExecutionPlan::maintains_input_order`] and [`Self::output_ordering`]
     /// for related concepts.
+    ///
+    /// [`FilterExec`]: crate::filter::FilterExec
     fn equivalence_properties(&self) -> &EquivalenceProperties;
 }
 
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index a8a9973ea0434..95fa67025e90d 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -26,6 +26,9 @@ use super::{
 };
 use crate::common::can_project;
 use crate::execution_plan::CardinalityEffect;
+use crate::filter_pushdown::{
+    FilterDescription, FilterPushdownResult, FilterPushdownSupport,
+};
 use crate::projection::{
     make_with_child, try_embed_projection, update_expr, EmbeddedProjection,
     ProjectionExec,
@@ -39,6 +42,7 @@ use arrow::compute::filter_record_batch;
 use arrow::datatypes::{DataType, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::cast::as_boolean_array;
+use datafusion_common::config::ConfigOptions;
 use datafusion_common::stats::Precision;
 use datafusion_common::{
     internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue,
@@ -46,7 +50,7 @@ use datafusion_common::{
 use datafusion_execution::TaskContext;
 use datafusion_expr::Operator;
 use datafusion_physical_expr::equivalence::ProjectionMapping;
-use datafusion_physical_expr::expressions::BinaryExpr;
+use datafusion_physical_expr::expressions::{BinaryExpr, Column};
 use datafusion_physical_expr::intervals::utils::check_support;
 use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::{
@@ -433,6 +437,56 @@ impl ExecutionPlan for FilterExec {
         }
         try_embed_projection(projection, self)
     }
+
+    fn try_pushdown_filters(
+        &self,
+        mut fd: FilterDescription,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
+        // Extend the filter descriptions
+        fd.filters.push(Arc::clone(&self.predicate));
+
+        // Extract the information
+        let child_descriptions = vec![fd];
+        let remaining_description = FilterDescription { filters: vec![] };
+        let filter_input = Arc::clone(self.input());
+
+        if let Some(projection_indices) = self.projection.as_ref() {
+            // Push the filters down, but leave a ProjectionExec behind, instead of the FilterExec
+            let filter_child_schema = filter_input.schema();
+            let proj_exprs = projection_indices
+                .iter()
+                .map(|p| {
+                    let field = filter_child_schema.field(*p).clone();
+                    (
+                        Arc::new(Column::new(field.name(), *p)) as Arc<dyn PhysicalExpr>,
+                        field.name().to_string(),
+                    )
+                })
+                .collect::<Vec<_>>();
+            let projection_exec =
+                Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?) as _;
+
+            Ok(FilterPushdownResult {
+                support: FilterPushdownSupport::Supported {
+                    child_descriptions,
+                    op: projection_exec,
+                    revisit: false,
+                },
+                remaining_description,
+            })
+        } else {
+            // Pull out the FilterExec, and inform the rule that it should be re-run
+            Ok(FilterPushdownResult {
+                support: FilterPushdownSupport::Supported {
+                    child_descriptions,
+                    op: filter_input,
+                    revisit: true,
+                },
+                remaining_description,
+            })
+        }
+    }
 }
 
 impl EmbeddedProjection for FilterExec {
diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs
new file mode 100644
index 0000000000000..38f5aef5923e1
--- /dev/null
+++ b/datafusion/physical-plan/src/filter_pushdown.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::ExecutionPlan;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+
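+/// The set of filter predicates being pushed down through an operator,
+/// collected from the parent nodes above it.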
+#[derive(Clone, Debug)]
+pub struct FilterDescription {
+    /// Expressions coming from the parent nodes
+    pub filters: Vec<Arc<dyn PhysicalExpr>>,
+}
+
+impl Default for FilterDescription {
+    fn default() -> Self {
+        Self::empty()
+    }
+}
+
+impl FilterDescription {
+    /// Takes the filters out of the struct, leaving an empty vector in its place.
+    pub fn take_description(&mut self) -> Vec<Arc<dyn PhysicalExpr>> {
+        std::mem::take(&mut self.filters)
+    }
+
+    pub fn empty() -> FilterDescription {
+        Self { filters: vec![] }
+    }
+}
+
+#[derive(Debug)]
+pub enum FilterPushdownSupport<T> {
+    Supported {
+        // Filter predicates which can be pushed down through the operator.
+        // NOTE that these are not placed into any operator.
+        child_descriptions: Vec<FilterDescription>,
+        // Possibly updated new operator
+        op: T,
+        // Whether the node is removed from the plan and the rule should be re-run manually
+        // on the new node.
+        // TODO: If TreeNodeRecursion supports a Revisit mechanism, this flag can be removed
+        revisit: bool,
+    },
+    NotSupported,
+}
+
+#[derive(Debug)]
+pub struct FilterPushdownResult<T> {
+    pub support: FilterPushdownSupport<T>,
+    // Filters which cannot be pushed down through the operator.
+    // NOTE that the caller of try_pushdown_filters() should handle these remaining predicates,
+    // possibly introducing a FilterExec on top of this operator.
+    pub remaining_description: FilterDescription,
+}
+
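+/// Helper for operators that cannot handle any of the given filters:
+/// reports `NotSupported` and hands every filter back to the caller unchanged.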
+pub fn filter_pushdown_not_supported<T>(
+    remaining_description: FilterDescription,
+) -> FilterPushdownResult<T> {
+    FilterPushdownResult {
+        support: FilterPushdownSupport::NotSupported,
+        remaining_description,
+    }
+}
+
+pub fn filter_pushdown_transparent<T>(
+    plan: Arc<dyn ExecutionPlan>,
+    fd: FilterDescription,
+) -> FilterPushdownResult<Arc<dyn ExecutionPlan>> {
+    let child_descriptions = vec![fd];
+    let remaining_description = FilterDescription::empty();
+
+    FilterPushdownResult {
+        support: FilterPushdownSupport::Supported {
+            child_descriptions,
+            op: plan,
+            revisit: false,
+        },
+        remaining_description,
+    }
+}
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index b256e615b2320..a1862554b303e 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -67,6 +67,7 @@ pub mod empty;
 pub mod execution_plan;
 pub mod explain;
 pub mod filter;
+pub mod filter_pushdown;
 pub mod joins;
 pub mod limit;
 pub mod memory;
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 71479ffa960d0..c480fc2abaa1a 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -43,6 +43,7 @@ use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Stat
 use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions};
 use arrow::compute::take_arrays;
 use arrow::datatypes::{SchemaRef, UInt32Type};
+use datafusion_common::config::ConfigOptions;
 use datafusion_common::utils::transpose;
 use datafusion_common::HashMap;
 use datafusion_common::{not_impl_err, DataFusionError, Result};
@@ -52,6 +53,9 @@ use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 
+use crate::filter_pushdown::{
+    filter_pushdown_transparent, FilterDescription, FilterPushdownResult,
+};
 use futures::stream::Stream;
 use futures::{FutureExt, StreamExt, TryStreamExt};
 use log::trace;
@@ -730,6 +734,17 @@ impl ExecutionPlan for RepartitionExec {
             new_partitioning,
         )?)))
     }
+
+    fn try_pushdown_filters(
+        &self,
+        fd: FilterDescription,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
+        Ok(filter_pushdown_transparent::<Arc<dyn ExecutionPlan>>(
+            Arc::new(self.clone()),
+            fd,
+        ))
+    }
 }
 
 impl RepartitionExec {
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index a55ac079aa745..35bbc758141b6 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -6795,4 +6795,3 @@ select c2, count(*) from test WHERE 1 = 1 group by c2;
 4 1
 5 1
 6 1
-
diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt
index f165d3bf66ba0..e780d6c8b295a 100644
--- a/datafusion/sqllogictest/test_files/array.slt
+++ b/datafusion/sqllogictest/test_files/array.slt
@@ -5992,7 +5992,7 @@ logical_plan
 02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
 03)----SubqueryAlias: test
 04)------SubqueryAlias: t
-05)--------Projection: 
+05)--------Projection:
 06)----------Filter: substr(CAST(md5(CAST(tmp_table.value AS Utf8)) AS Utf8), Int64(1), Int64(32)) IN ([Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"), Utf8View("a"), Utf8View("b"), Utf8View("c")])
 07)------------TableScan: tmp_table projection=[value]
 physical_plan
@@ -6021,7 +6021,7 @@ logical_plan
 02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
03)----SubqueryAlias: test 04)------SubqueryAlias: t -05)--------Projection: +05)--------Projection: 06)----------Filter: substr(CAST(md5(CAST(tmp_table.value AS Utf8)) AS Utf8), Int64(1), Int64(32)) IN ([Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"), Utf8View("a"), Utf8View("b"), Utf8View("c")]) 07)------------TableScan: tmp_table projection=[value] physical_plan @@ -6050,7 +6050,7 @@ logical_plan 02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] 03)----SubqueryAlias: test 04)------SubqueryAlias: t -05)--------Projection: +05)--------Projection: 06)----------Filter: substr(CAST(md5(CAST(tmp_table.value AS Utf8)) AS Utf8), Int64(1), Int64(32)) IN ([Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"), Utf8View("a"), Utf8View("b"), Utf8View("c")]) 07)------------TableScan: tmp_table projection=[value] physical_plan @@ -6081,7 +6081,7 @@ logical_plan 02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] 03)----SubqueryAlias: test 04)------SubqueryAlias: t -05)--------Projection: +05)--------Projection: 06)----------Filter: array_has(LargeList([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c]), substr(CAST(md5(CAST(tmp_table.value AS Utf8)) AS Utf8), Int64(1), Int64(32))) 07)------------TableScan: tmp_table projection=[value] physical_plan @@ -6110,7 +6110,7 @@ logical_plan 02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] 03)----SubqueryAlias: test 04)------SubqueryAlias: t -05)--------Projection: +05)--------Projection: 06)----------Filter: substr(CAST(md5(CAST(tmp_table.value AS Utf8)) AS Utf8), Int64(1), Int64(32)) IN ([Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"), Utf8View("a"), Utf8View("b"), Utf8View("c")]) 07)------------TableScan: tmp_table projection=[value] physical_plan @@ -6130,7 +6130,7 @@ select count(*) from test WHERE array_has([needle], needle); ---- 100000 -# The optimizer does not currently eliminate the filter; +# The optimizer does not currently eliminate the filter; # Instead, it's rewritten as `IS NULL OR NOT NULL` due to SQL null semantics query TT explain with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM generate_series(1, 100000) t(i)) diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt index e019af9775a42..32320a06f4fb0 100644 --- a/datafusion/sqllogictest/test_files/cte.slt +++ b/datafusion/sqllogictest/test_files/cte.slt @@ -722,7 +722,7 @@ logical_plan 03)----Projection: Int64(1) AS val 04)------EmptyRelation 05)----Projection: Int64(2) AS val -06)------Cross Join: +06)------Cross Join: 07)--------Filter: recursive_cte.val < Int64(2) 08)----------TableScan: recursive_cte 09)--------SubqueryAlias: sub_cte diff --git a/datafusion/sqllogictest/test_files/dictionary.slt b/datafusion/sqllogictest/test_files/dictionary.slt index 1769f42c2d2a3..d241e61f33ffd 100644 --- a/datafusion/sqllogictest/test_files/dictionary.slt +++ b/datafusion/sqllogictest/test_files/dictionary.slt @@ -456,4 +456,4 @@ statement ok CREATE TABLE test0 AS VALUES ('foo',1), ('bar',2), ('foo',3); statement ok -COPY (SELECT arrow_cast(column1, 'Dictionary(Int32, Utf8)') AS column1, column2 FROM test0) TO 'test_files/scratch/copy/part_dict_test' STORED AS PARQUET PARTITIONED BY (column1); \ No newline at end of file +COPY (SELECT arrow_cast(column1, 'Dictionary(Int32, Utf8)') AS column1, column2 FROM test0) TO 'test_files/scratch/copy/part_dict_test' STORED AS PARQUET PARTITIONED BY (column1); diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index deff793e51106..ba2596551f1d5 
100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -237,6 +237,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after coalesce_batches SAME TEXT AS ABOVE physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true physical_plan after LimitAggregation SAME TEXT AS ABOVE +physical_plan after PushdownFilter SAME TEXT AS ABOVE physical_plan after LimitPushdown SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE @@ -313,6 +314,7 @@ physical_plan after OutputRequirements 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan after LimitAggregation SAME TEXT AS ABOVE +physical_plan after PushdownFilter SAME TEXT AS ABOVE physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE @@ -353,6 +355,7 @@ physical_plan after OutputRequirements 01)GlobalLimitExec: skip=0, fetch=10 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet physical_plan after LimitAggregation SAME TEXT AS ABOVE +physical_plan after PushdownFilter SAME TEXT AS ABOVE physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE diff --git a/datafusion/sqllogictest/test_files/simplify_expr.slt b/datafusion/sqllogictest/test_files/simplify_expr.slt index 9985ab49c2da0..075ccafcfd2e0 100644 --- a/datafusion/sqllogictest/test_files/simplify_expr.slt +++ b/datafusion/sqllogictest/test_files/simplify_expr.slt @@ -107,4 +107,3 @@ query B SELECT a / NULL::DECIMAL(4,3) > 1.2::decimal(2,1) FROM VALUES (1) AS t(a); ---- NULL -