diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index bd09c1192dba1..ceea81feaf7b1 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -95,6 +95,12 @@ pub trait FileSource: Send + Sync { Ok(None) } + /// Push down filters to the file source if supported. + /// + /// Returns `Ok(None)` by default. See [`ExecutionPlan::with_filter_pushdown_result`] + /// for more details. + /// + /// [`ExecutionPlan::with_filter_pushdown_result`]: datafusion_physical_plan::execution_plan::ExecutionPlan::with_filter_pushdown_result fn push_down_filters( &self, _filters: &[PhysicalExprRef], diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 159dbe1d5d181..6dc3d47e31f47 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -34,7 +34,7 @@ use datafusion_physical_plan::{ use crate::file_scan_config::FileScanConfig; use datafusion_common::config::ConfigOptions; -use datafusion_common::{Constraints, Statistics}; +use datafusion_common::{Constraints, Result, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExprRef}; use datafusion_physical_expr_common::sort_expr::LexOrdering; @@ -54,7 +54,7 @@ pub trait DataSource: Send + Sync + Debug { &self, partition: usize, context: Arc, - ) -> datafusion_common::Result; + ) -> Result; fn as_any(&self) -> &dyn Any; /// Format this source for display in explain plans fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result; @@ -65,13 +65,13 @@ pub trait DataSource: Send + Sync + Debug { _target_partitions: usize, _repartition_file_min_size: usize, _output_ordering: Option, - ) -> datafusion_common::Result>> { + ) -> Result>> { Ok(None) } fn output_partitioning(&self) -> Partitioning; fn eq_properties(&self) -> EquivalenceProperties; - fn statistics(&self) -> datafusion_common::Result; + fn statistics(&self) -> Result; /// Return a copy of this DataSource with a new fetch limit fn with_fetch(&self, _limit: Option) -> Option>; fn fetch(&self) -> Option; @@ -81,15 +81,25 @@ pub trait DataSource: Send + Sync + Debug { fn try_swapping_with_projection( &self, _projection: &ProjectionExec, - ) -> datafusion_common::Result>>; - /// Push down filters from parent execution plans to this data source. - /// This is expected to return Ok(None) if the filters cannot be pushed down. - /// If they can be pushed down it should return a [`FilterPushdownResult`] containing the new - /// data source and the support level for each filter (exact or inexact). + ) -> Result>>; + + /// Push down filters into this `DataSource`. + /// + /// Returns `Ok(None)` if the filters cannot be evaluated within the + /// `DataSource`. + /// + /// If the filters can be evaluated by the `DataSource`, + /// return a [`FilterPushdownResult`] containing an updated + /// `DataSource` and the support level for each filter (exact or inexact). + /// + /// Default implementation returns `Ok(None)`. See [`ExecutionPlan::with_filter_pushdown_result`] + /// for more details. + /// + /// [`ExecutionPlan::push_down_filters`]: datafusion_physical_plan::execution_plan::ExecutionPlan::with_filter_pushdown_result fn push_down_filters( &self, _filters: &[PhysicalExprRef], - ) -> datafusion_common::Result> { + ) -> Result>> { Ok(None) } } @@ -146,7 +156,7 @@ impl ExecutionPlan for DataSourceExec { fn with_new_children( self: Arc, _: Vec>, - ) -> datafusion_common::Result> { + ) -> Result> { Ok(self) } @@ -154,7 +164,7 @@ impl ExecutionPlan for DataSourceExec { &self, target_partitions: usize, config: &ConfigOptions, - ) -> datafusion_common::Result>> { + ) -> Result>> { let data_source = self.data_source.repartitioned( target_partitions, config.optimizer.repartition_file_min_size, @@ -178,7 +188,7 @@ impl ExecutionPlan for DataSourceExec { &self, partition: usize, context: Arc, - ) -> datafusion_common::Result { + ) -> Result { self.data_source.open(partition, context) } @@ -186,7 +196,7 @@ impl ExecutionPlan for DataSourceExec { Some(self.data_source.metrics().clone_inner()) } - fn statistics(&self) -> datafusion_common::Result { + fn statistics(&self) -> Result { self.data_source.statistics() } @@ -204,7 +214,7 @@ impl ExecutionPlan for DataSourceExec { fn try_swapping_with_projection( &self, projection: &ProjectionExec, - ) -> datafusion_common::Result>> { + ) -> Result>> { self.data_source.try_swapping_with_projection(projection) } @@ -212,7 +222,7 @@ impl ExecutionPlan for DataSourceExec { self: Arc, own_filters_result: &[FilterSupport], parent_filters_remaining: &[PhysicalExprRef], - ) -> datafusion_common::Result> { + ) -> Result> { // We didn't give out any filters, this should be empty! assert!(own_filters_result.is_empty()); // Forward filter pushdown to our data source. diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 7c519bd404ad2..1ea6c14f4d2a7 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -469,27 +469,32 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { } /// Returns a set of filters that this operator owns but would like to be pushed down. - /// For example, a `TopK` operator may produce dynamic filters that reference it's currrent state, - /// while a `FilterExec` will just hand of the filters it has as is. - /// The default implementation returns an empty vector. - /// These filters are applied row-by row and any that return `false` or `NULL` will be - /// filtered out and any that return `true` will be kept. - /// The expressions returned **must** always return `true` or `false`; - /// other truthy or falsy values are not allowed (e.g. `0`, `1`). + /// + /// For example, a `TopK` operator may produce dynamic filters that + /// reference it's current state, while a `FilterExec` will just hand of the + /// filters it has as is. + /// + /// The default implementation returns an empty vector. These filters are + /// applied row-by row: + /// 1. any that return `false` or `NULL` will be filtered out + /// 2. any that return `true` will be kept. + /// + /// The expressions returned **must** always be Boolean ( `true`, `false` or + /// NULL); other truthy or falsy values are not allowed (e.g. `0`, `1`). /// /// # Returns /// A vector of filters that this operator would like to push down. /// These should be treated as the split conjunction of a `WHERE` clause. /// That is, a query such as `WHERE a = 1 AND b = 2` would return two /// filters: `a = 1` and `b = 2`. - /// They can always be assembled into a single filter using - /// [`split_conjunction`][datafusion_physical_expr::split_conjunction]. + /// They can be combined into a single filter using + /// [`conjunction`][datafusion_physical_expr::conjunction]. fn filters_for_pushdown(&self) -> Result>> { Ok(Vec::new()) } /// Checks which filters this node allows to be pushed down through it from a parent to a child. - /// For example, a `ProjectionExec` node can allow filters that only refernece + /// For example, a `ProjectionExec` node can allow filters that only reference /// columns it did not create through but filters that reference columns it is creating cannot be pushed down any further. /// That is, it only allows some filters through because it changes the schema of the data. /// Aggregation nodes may not allow any filters to be pushed down as they change the cardinality of the data.