Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -745,11 +745,21 @@ config_namespace! {
/// past window functions, if possible
pub enable_window_limits: bool, default = true

/// When set to true attempts to push down dynamic filters generated by operators into the file scan phase.
/// When set to true, the optimizer will attempt to push down TopK dynamic filters
/// into the file scan phase.
pub enable_topk_dynamic_filter_pushdown: bool, default = true

/// When set to true, the optimizer will attempt to push down Join dynamic filters
/// into the file scan phase.
pub enable_join_dynamic_filter_pushdown: bool, default = true

/// When set to true attempts to push down dynamic filters generated by operators (topk & join) into the file scan phase.
/// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
/// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
/// This means that if we already have 10 timestamps in the year 2025
/// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan.
/// The config will suppress `enable_join_dynamic_filter_pushdown` & `enable_topk_dynamic_filter_pushdown`
/// So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden.
pub enable_dynamic_filter_pushdown: bool, default = true

/// When set to true, the optimizer will insert filters before a join between
Expand Down Expand Up @@ -1039,6 +1049,20 @@ impl ConfigOptions {
};

if prefix == "datafusion" {
if key == "optimizer.enable_dynamic_filter_pushdown" {
let bool_value = value.parse::<bool>().map_err(|e| {
DataFusionError::Configuration(format!(
"Failed to parse '{value}' as bool: {e}",
))
})?;

{
self.optimizer.enable_dynamic_filter_pushdown = bool_value;
self.optimizer.enable_topk_dynamic_filter_pushdown = bool_value;
self.optimizer.enable_join_dynamic_filter_pushdown = bool_value;
}
return Ok(());
}
return ConfigField::set(self, key, value);
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1137,7 +1137,7 @@ impl ExecutionPlan for HashJoinExec {

// Add dynamic filters in Post phase if enabled
if matches!(phase, FilterPushdownPhase::Post)
&& config.optimizer.enable_dynamic_filter_pushdown
&& config.optimizer.enable_join_dynamic_filter_pushdown
{
// Add actual dynamic filter to right side (probe side)
let dynamic_filter = Self::create_dynamic_filter(&self.on);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1355,7 +1355,7 @@ impl ExecutionPlan for SortExec {
ChildFilterDescription::from_child(&parent_filters, self.input())?;

if let Some(filter) = &self.filter {
if config.optimizer.enable_dynamic_filter_pushdown {
if config.optimizer.enable_topk_dynamic_filter_pushdown {
child = child.with_self_filter(filter.read().expr());
}
}
Expand Down
Loading