Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions datafusion/datasource/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ pub trait FileSource: Send + Sync {
Ok(None)
}

/// Push down filters to the file source if supported.
///
/// Returns `Ok(None)` by default. See [`ExecutionPlan::with_filter_pushdown_result`]
/// for more details.
///
/// [`ExecutionPlan::with_filter_pushdown_result`]: datafusion_physical_plan::execution_plan::ExecutionPlan::with_filter_pushdown_result
fn push_down_filters(
&self,
_filters: &[PhysicalExprRef],
Expand Down
42 changes: 26 additions & 16 deletions datafusion/datasource/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use datafusion_physical_plan::{

use crate::file_scan_config::FileScanConfig;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Constraints, Statistics};
use datafusion_common::{Constraints, Result, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExprRef};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
Expand All @@ -54,7 +54,7 @@ pub trait DataSource: Send + Sync + Debug {
&self,
partition: usize,
context: Arc<TaskContext>,
) -> datafusion_common::Result<SendableRecordBatchStream>;
) -> Result<SendableRecordBatchStream>;
fn as_any(&self) -> &dyn Any;
/// Format this source for display in explain plans
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
Expand All @@ -65,13 +65,13 @@ pub trait DataSource: Send + Sync + Debug {
_target_partitions: usize,
_repartition_file_min_size: usize,
_output_ordering: Option<LexOrdering>,
) -> datafusion_common::Result<Option<Arc<dyn DataSource>>> {
) -> Result<Option<Arc<dyn DataSource>>> {
Ok(None)
}

fn output_partitioning(&self) -> Partitioning;
fn eq_properties(&self) -> EquivalenceProperties;
fn statistics(&self) -> datafusion_common::Result<Statistics>;
fn statistics(&self) -> Result<Statistics>;
/// Return a copy of this DataSource with a new fetch limit
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
fn fetch(&self) -> Option<usize>;
Expand All @@ -81,15 +81,25 @@ pub trait DataSource: Send + Sync + Debug {
fn try_swapping_with_projection(
&self,
_projection: &ProjectionExec,
) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>>;
/// Push down filters from parent execution plans to this data source.
/// This is expected to return Ok(None) if the filters cannot be pushed down.
/// If they can be pushed down it should return a [`FilterPushdownResult`] containing the new
/// data source and the support level for each filter (exact or inexact).
) -> Result<Option<Arc<dyn ExecutionPlan>>>;

/// Push down filters into this `DataSource`.
///
/// Returns `Ok(None)` if the filters cannot be evaluated within the
/// `DataSource`.
///
/// If the filters can be evaluated by the `DataSource`,
/// sreturn a [`FilterPushdownResult`] containing an updated
/// `DataSource` and the support level for each filter (exact or inexact).
///
/// Default implementation returns `Ok(None)`. See [`ExecutionPlan::with_filter_pushdown_result`]
/// for more details.
///
/// [`ExecutionPlan::push_down_filters`]: datafusion_physical_plan::execution_plan::ExecutionPlan::with_filter_pushdown_result
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks will use this style of reference elswhere

fn push_down_filters(
&self,
_filters: &[PhysicalExprRef],
) -> datafusion_common::Result<Option<DataSourceFilterPushdownResult>> {
) -> Result<Option<FilterPushdownResult<Arc<dyn DataSource>>>> {
Ok(None)
}
}
Expand Down Expand Up @@ -146,15 +156,15 @@ impl ExecutionPlan for DataSourceExec {
fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(self)
}

fn repartitioned(
&self,
target_partitions: usize,
config: &ConfigOptions,
) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let data_source = self.data_source.repartitioned(
target_partitions,
config.optimizer.repartition_file_min_size,
Expand All @@ -178,15 +188,15 @@ impl ExecutionPlan for DataSourceExec {
&self,
partition: usize,
context: Arc<TaskContext>,
) -> datafusion_common::Result<SendableRecordBatchStream> {
) -> Result<SendableRecordBatchStream> {
self.data_source.open(partition, context)
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.data_source.metrics().clone_inner())
}

fn statistics(&self) -> datafusion_common::Result<Statistics> {
fn statistics(&self) -> Result<Statistics> {
self.data_source.statistics()
}

Expand All @@ -204,15 +214,15 @@ impl ExecutionPlan for DataSourceExec {
fn try_swapping_with_projection(
&self,
projection: &ProjectionExec,
) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
self.data_source.try_swapping_with_projection(projection)
}

fn with_filter_pushdown_result(
self: Arc<Self>,
own_filters_result: &[FilterSupport],
parent_filters_remaining: &[PhysicalExprRef],
) -> datafusion_common::Result<Option<ExecutionPlanFilterPushdownResult>> {
) -> Result<Option<ExecutionPlanFilterPushdownResult>> {
// We didn't give out any filters, this should be empty!
assert!(own_filters_result.is_empty());
// Forward filter pushdown to our data source.
Expand Down
27 changes: 16 additions & 11 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// `ExecutionPlan` node.
///
/// If this method returns `true`, and the query plan contains a limit at
/// the output of this node, DataFusion will push the limit to the input
/// the output of this node, DataFusion will push the limit to the input
/// of this node.
fn supports_limit_pushdown(&self) -> bool {
false
Expand Down Expand Up @@ -469,27 +469,32 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
}

/// Returns a set of filters that this operator owns but would like to be pushed down.
/// For example, a `TopK` operator may produce dynamic filters that reference it's currrent state,
/// while a `FilterExec` will just hand of the filters it has as is.
/// The default implementation returns an empty vector.
/// These filters are applied row-by row and any that return `false` or `NULL` will be
/// filtered out and any that return `true` will be kept.
/// The expressions returned **must** always return `true` or `false`;
/// other truthy or falsy values are not allowed (e.g. `0`, `1`).
///
/// For example, a `TopK` operator may produce dynamic filters that
/// reference it's current state, while a `FilterExec` will just hand of the
/// filters it has as is.
///
/// The default implementation returns an empty vector. These filters are
/// applied row-by row:
/// 1. any that return `false` or `NULL` will be filtered out
/// 2. any that return `true` will be kept.
///
/// The expressions returned **must** always be Boolean ( `true`, `false` or
/// NULL); other truthy or falsy values are not allowed (e.g. `0`, `1`).
///
/// # Returns
/// A vector of filters that this operator would like to push down.
/// These should be treated as the split conjunction of a `WHERE` clause.
/// That is, a query such as `WHERE a = 1 AND b = 2` would return two
/// filters: `a = 1` and `b = 2`.
/// They can always be assembled into a single filter using
/// [`split_conjunction`][datafusion_physical_expr::split_conjunction].
/// They can be combined into a single filter using
/// [`conjunction`][datafusion_physical_expr::conjunction].
fn filters_for_pushdown(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
Ok(Vec::new())
}

/// Checks which filters this node allows to be pushed down through it from a parent to a child.
/// For example, a `ProjectionExec` node can allow filters that only refernece
/// For example, a `ProjectionExec` node can allow filters that only reference
/// columns it did not create through but filters that reference columns it is creating cannot be pushed down any further.
/// That is, it only allows some filters through because it changes the schema of the data.
/// Aggregation nodes may not allow any filters to be pushed down as they change the cardinality of the data.
Expand Down