6 changes: 6 additions & 0 deletions datafusion/datasource/src/file.rs
@@ -95,6 +95,12 @@ pub trait FileSource: Send + Sync {
Ok(None)
}

/// Push down filters to the file source if supported.
///
/// Returns `Ok(None)` by default. See [`ExecutionPlan::with_filter_pushdown_result`]
/// for more details.
///
/// [`ExecutionPlan::with_filter_pushdown_result`]: datafusion_physical_plan::execution_plan::ExecutionPlan::with_filter_pushdown_result
fn push_down_filters(
&self,
_filters: &[PhysicalExprRef],
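For context, a minimal sketch of what an implementation of this hook might look like. `MyFileSource` is a hypothetical type and the `FileSourceFilterPushdownResult` return type is an assumption (the hunk above is truncated before the signature ends); only the method name and the filter slice come from the diff.

```rust
// Sketch only: `MyFileSource` is hypothetical and the return type is assumed,
// since the hunk above truncates before the real signature ends.
impl FileSource for MyFileSource {
    fn push_down_filters(
        &self,
        filters: &[PhysicalExprRef],
    ) -> Result<Option<FileSourceFilterPushdownResult>> {
        if filters.is_empty() {
            // Nothing to push down; equivalent to the default implementation.
            return Ok(None);
        }
        // A real implementation would attach `filters` to a copy of itself and
        // report a support level (exact or inexact) for each one.
        todo!("clone self with `filters` attached and describe per-filter support")
    }
}
```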
42 changes: 26 additions & 16 deletions datafusion/datasource/src/source.rs
@@ -34,7 +34,7 @@ use datafusion_physical_plan::{

use crate::file_scan_config::FileScanConfig;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Constraints, Statistics};
use datafusion_common::{Constraints, Result, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExprRef};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
@@ -54,7 +54,7 @@ pub trait DataSource: Send + Sync + Debug {
&self,
partition: usize,
context: Arc<TaskContext>,
) -> datafusion_common::Result<SendableRecordBatchStream>;
) -> Result<SendableRecordBatchStream>;
fn as_any(&self) -> &dyn Any;
/// Format this source for display in explain plans
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
@@ -65,13 +65,13 @@ pub trait DataSource: Send + Sync + Debug {
_target_partitions: usize,
_repartition_file_min_size: usize,
_output_ordering: Option<LexOrdering>,
) -> datafusion_common::Result<Option<Arc<dyn DataSource>>> {
) -> Result<Option<Arc<dyn DataSource>>> {
Ok(None)
}

fn output_partitioning(&self) -> Partitioning;
fn eq_properties(&self) -> EquivalenceProperties;
fn statistics(&self) -> datafusion_common::Result<Statistics>;
fn statistics(&self) -> Result<Statistics>;
/// Return a copy of this DataSource with a new fetch limit
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
fn fetch(&self) -> Option<usize>;
@@ -81,15 +81,25 @@ pub trait DataSource: Send + Sync + Debug {
fn try_swapping_with_projection(
&self,
_projection: &ProjectionExec,
) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>>;
/// Push down filters from parent execution plans to this data source.
/// This is expected to return Ok(None) if the filters cannot be pushed down.
/// If they can be pushed down it should return a [`FilterPushdownResult`] containing the new
/// data source and the support level for each filter (exact or inexact).
) -> Result<Option<Arc<dyn ExecutionPlan>>>;

/// Push down filters into this `DataSource`.
///
/// Returns `Ok(None)` if the filters cannot be evaluated within the
/// `DataSource`.
///
/// If the filters can be evaluated by the `DataSource`,
/// return a [`FilterPushdownResult`] containing an updated
/// `DataSource` and the support level for each filter (exact or inexact).
///
/// Default implementation returns `Ok(None)`. See [`ExecutionPlan::with_filter_pushdown_result`]
/// for more details.
///
/// [`ExecutionPlan::with_filter_pushdown_result`]: datafusion_physical_plan::execution_plan::ExecutionPlan::with_filter_pushdown_result
(Inline review comment from a maintainer: "Thanks, will use this style of reference elsewhere.")

fn push_down_filters(
&self,
_filters: &[PhysicalExprRef],
) -> datafusion_common::Result<Option<DataSourceFilterPushdownResult>> {
) -> Result<Option<DataSourceFilterPushdownResult>> {
Ok(None)
}
}
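To make the contract concrete, a hedged sketch of a source that claims exact support for every filter it receives. `InMemorySource` and its `with_filters` helper are hypothetical, and the struct-literal shape of `DataSourceFilterPushdownResult` (an `inner` source plus a per-filter `support` vector with an `Exact` variant) is assumed rather than taken from this diff; only the method signature is.

```rust
// Sketch only: `InMemorySource` / `with_filters` are hypothetical, and the
// field names and `Exact` variant are assumptions about the result type.
impl DataSource for InMemorySource {
    // ...the other required methods (open, statistics, ...) are elided...

    fn push_down_filters(
        &self,
        filters: &[PhysicalExprRef],
    ) -> Result<Option<DataSourceFilterPushdownResult>> {
        if filters.is_empty() {
            return Ok(None);
        }
        // Evaluate the filters inside `open`, and claim exact support so the
        // parent `FilterExec` may drop them instead of re-applying them.
        let filtered = Arc::new(self.with_filters(filters.to_vec()));
        Ok(Some(DataSourceFilterPushdownResult {
            inner: filtered,
            support: vec![FilterSupport::Exact; filters.len()],
        }))
    }
}
```

Reporting inexact support instead keeps the parent filter in place, which is the right choice for sources that only prune (e.g. via statistics) rather than evaluate predicates row by row.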
@@ -146,15 +156,15 @@ impl ExecutionPlan for DataSourceExec {
fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(self)
}

fn repartitioned(
&self,
target_partitions: usize,
config: &ConfigOptions,
) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let data_source = self.data_source.repartitioned(
target_partitions,
config.optimizer.repartition_file_min_size,
@@ -178,15 +188,15 @@ impl ExecutionPlan for DataSourceExec {
&self,
partition: usize,
context: Arc<TaskContext>,
) -> datafusion_common::Result<SendableRecordBatchStream> {
) -> Result<SendableRecordBatchStream> {
self.data_source.open(partition, context)
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.data_source.metrics().clone_inner())
}

fn statistics(&self) -> datafusion_common::Result<Statistics> {
fn statistics(&self) -> Result<Statistics> {
self.data_source.statistics()
}

@@ -204,15 +214,15 @@ impl ExecutionPlan for DataSourceExec {
fn try_swapping_with_projection(
&self,
projection: &ProjectionExec,
) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
self.data_source.try_swapping_with_projection(projection)
}

fn with_filter_pushdown_result(
self: Arc<Self>,
own_filters_result: &[FilterSupport],
parent_filters_remaining: &[PhysicalExprRef],
) -> datafusion_common::Result<Option<ExecutionPlanFilterPushdownResult>> {
) -> Result<Option<ExecutionPlanFilterPushdownResult>> {
// We didn't give out any filters, so this should be empty!
assert!(own_filters_result.is_empty());
// Forward filter pushdown to our data source.
25 changes: 15 additions & 10 deletions datafusion/physical-plan/src/execution_plan.rs
@@ -469,27 +469,32 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
}

/// Returns a set of filters that this operator owns but would like to be pushed down.
/// For example, a `TopK` operator may produce dynamic filters that reference it's currrent state,
/// while a `FilterExec` will just hand of the filters it has as is.
/// The default implementation returns an empty vector.
/// These filters are applied row-by row and any that return `false` or `NULL` will be
/// filtered out and any that return `true` will be kept.
/// The expressions returned **must** always return `true` or `false`;
/// other truthy or falsy values are not allowed (e.g. `0`, `1`).
///
/// For example, a `TopK` operator may produce dynamic filters that
/// reference its current state, while a `FilterExec` will just hand off the
/// filters it has as-is.
///
/// The default implementation returns an empty vector. These filters are
/// applied row-by-row:
/// 1. any that return `false` or `NULL` will be filtered out
/// 2. any that return `true` will be kept.
///
/// The expressions returned **must** always be Boolean (`true`, `false`, or
/// `NULL`); other truthy or falsy values are not allowed (e.g. `0`, `1`).
///
/// # Returns
/// A vector of filters that this operator would like to push down.
/// These should be treated as the split conjunction of a `WHERE` clause.
/// That is, a query such as `WHERE a = 1 AND b = 2` would return two
/// filters: `a = 1` and `b = 2`.
/// They can always be assembled into a single filter using
/// [`split_conjunction`][datafusion_physical_expr::split_conjunction].
/// They can be combined into a single filter using
/// [`conjunction`][datafusion_physical_expr::conjunction].
fn filters_for_pushdown(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
Ok(Vec::new())
}
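A small end-to-end illustration of the "split conjunction" framing above, assuming `conjunction` accepts an iterator of predicates and ANDs them together (per the doc link); the schema and values are arbitrary.

```rust
// `WHERE a = 1 AND b = 2` travels through pushdown as two separate filters,
// which can be reassembled into one predicate with `conjunction`.
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_expr::Operator;
use datafusion_physical_expr::conjunction;
use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion_physical_expr::PhysicalExpr;

fn main() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);

    // The two filters an operator would return from `filters_for_pushdown`.
    let a_eq_1: Arc<dyn PhysicalExpr> =
        Arc::new(BinaryExpr::new(col("a", &schema)?, Operator::Eq, lit(1i32)));
    let b_eq_2: Arc<dyn PhysicalExpr> =
        Arc::new(BinaryExpr::new(col("b", &schema)?, Operator::Eq, lit(2i32)));

    // Recombine them when a single expression is needed: `a = 1 AND b = 2`.
    let combined = conjunction([a_eq_1, b_eq_2]);
    println!("{combined}");
    Ok(())
}
```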

/// Checks which filters this node allows to be pushed down through it from a parent to a child.
/// For example, a `ProjectionExec` node can allow filters that only refernece
/// For example, a `ProjectionExec` node can allow filters that only reference
/// columns it did not create through, but filters that reference columns it is creating cannot be pushed down any further.
/// That is, it only allows some filters through because it changes the schema of the data.
/// Aggregation nodes may not allow any filters to be pushed down as they change the cardinality of the data.
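As a concrete version of the `ProjectionExec` rule above (independent of the truncated method), the decision reduces to column analysis. `collect_columns` is the existing helper in `datafusion_physical_expr::utils`; the `passthrough` set below is a stand-in for what a real projection would derive from its expressions.

```rust
// For a projection like `SELECT a, a + b AS c`: the filter `a = 1` references
// only the forwarded column `a`, so it may keep descending; a filter on the
// computed column `c` would have to stay above the projection.
use std::collections::HashSet;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::PhysicalExpr;

fn main() -> Result<()> {
    // Schema of the projection's input.
    let input = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);
    // Columns the projection merely forwards (stand-in for real analysis).
    let passthrough: HashSet<&str> = HashSet::from(["a"]);

    let a_eq_1: Arc<dyn PhysicalExpr> =
        Arc::new(BinaryExpr::new(col("a", &input)?, Operator::Eq, lit(1i32)));

    // The filter may pass through iff every referenced column is forwarded.
    let pushable = collect_columns(&a_eq_1)
        .iter()
        .all(|c| passthrough.contains(c.name()));
    assert!(pushable);
    Ok(())
}
```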