diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 39d730eaafb49..9bde637f43794 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -745,11 +745,21 @@ config_namespace! { /// past window functions, if possible pub enable_window_limits: bool, default = true - /// When set to true attempts to push down dynamic filters generated by operators into the file scan phase. + /// When set to true, the optimizer will attempt to push down TopK dynamic filters + /// into the file scan phase. + pub enable_topk_dynamic_filter_pushdown: bool, default = true + + /// When set to true, the optimizer will attempt to push down Join dynamic filters + /// into the file scan phase. + pub enable_join_dynamic_filter_pushdown: bool, default = true + + /// When set to true attempts to push down dynamic filters generated by operators (topk & join) into the file scan phase. /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. /// This means that if we already have 10 timestamps in the year 2025 /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. + /// The config will suppress `enable_join_dynamic_filter_pushdown` & `enable_topk_dynamic_filter_pushdown` + /// So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. 
pub enable_dynamic_filter_pushdown: bool, default = true /// When set to true, the optimizer will insert filters before a join between @@ -1039,6 +1049,20 @@ impl ConfigOptions { }; if prefix == "datafusion" { + if key == "optimizer.enable_dynamic_filter_pushdown" { + let bool_value = value.parse::<bool>().map_err(|e| { + DataFusionError::Configuration(format!( + "Failed to parse '{value}' as bool: {e}", + )) + })?; + + { + self.optimizer.enable_dynamic_filter_pushdown = bool_value; + self.optimizer.enable_topk_dynamic_filter_pushdown = bool_value; + self.optimizer.enable_join_dynamic_filter_pushdown = bool_value; + } + return Ok(()); + } return ConfigField::set(self, key, value); } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 4c293b0498e77..b5fe5ee5cda14 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1137,7 +1137,7 @@ impl ExecutionPlan for HashJoinExec { // Add dynamic filters in Post phase if enabled if matches!(phase, FilterPushdownPhase::Post) - && config.optimizer.enable_dynamic_filter_pushdown + && config.optimizer.enable_join_dynamic_filter_pushdown { // Add actual dynamic filter to right side (probe side) let dynamic_filter = Self::create_dynamic_filter(&self.on); diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 7f47d60c735a3..bd798ab4f54b2 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1355,7 +1355,7 @@ impl ExecutionPlan for SortExec { ChildFilterDescription::from_child(&parent_filters, self.input())?; if let Some(filter) = &self.filter { - if config.optimizer.enable_dynamic_filter_pushdown { + if config.optimizer.enable_topk_dynamic_filter_pushdown { child = child.with_self_filter(filter.read().expr()); } } diff --git
a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt new file mode 100644 index 0000000000000..e5cd6d88b08f4 --- /dev/null +++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt @@ -0,0 +1,339 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Tests for dynamic filter pushdown configuration options +# - enable_topk_dynamic_filter_pushdown (for TopK dynamic filters) +# - enable_join_dynamic_filter_pushdown (for Join dynamic filters) +# - enable_dynamic_filter_pushdown (controls both) + +# Setup: Create parquet test files +statement ok +CREATE TABLE test_data(id INT, value INT, name VARCHAR) AS VALUES +(1, 100, 'a'), +(2, 200, 'b'), +(3, 300, 'c'), +(4, 400, 'd'), +(5, 500, 'e'), +(6, 600, 'f'), +(7, 700, 'g'), +(8, 800, 'h'), +(9, 900, 'i'), +(10, 1000, 'j'); + +statement ok +CREATE TABLE join_left(id INT, data VARCHAR) AS VALUES +(1, 'left1'), +(2, 'left2'), +(3, 'left3'), +(4, 'left4'), +(5, 'left5'); + +statement ok +CREATE TABLE join_right(id INT, info VARCHAR) AS VALUES +(1, 'right1'), +(3, 'right3'), +(5, 'right5'); + +# Copy data to parquet files +query I +COPY test_data TO 'test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet' STORED AS PARQUET; +---- +10 + +query I +COPY join_left TO 'test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet' STORED AS PARQUET; +---- +5 + +query I +COPY join_right TO 'test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet' STORED AS PARQUET; +---- +3 + +# Create external tables from parquet files +statement ok +CREATE EXTERNAL TABLE test_parquet(id INT, value INT, name VARCHAR) +STORED AS PARQUET +LOCATION 'test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet'; + +statement ok +CREATE EXTERNAL TABLE left_parquet(id INT, data VARCHAR) +STORED AS PARQUET +LOCATION 'test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet'; + +statement ok +CREATE EXTERNAL TABLE right_parquet(id INT, info VARCHAR) +STORED AS PARQUET +LOCATION 'test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet'; + +# Test 1: TopK dynamic filter pushdown with Parquet +query TT +EXPLAIN SELECT * FROM test_parquet ORDER BY value DESC LIMIT 3; +---- +logical_plan +01)Sort: test_parquet.value DESC NULLS FIRST, fetch=3 
+02)--TableScan: test_parquet projection=[id, value, name] +physical_plan +01)SortExec: TopK(fetch=3), expr=[value@1 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value, name], file_type=parquet, predicate=DynamicFilter [ empty ] + +# Disable TopK dynamic filter pushdown +statement ok +SET datafusion.optimizer.enable_topk_dynamic_filter_pushdown = false; + +query TT +EXPLAIN SELECT * FROM test_parquet ORDER BY value DESC LIMIT 3; +---- +logical_plan +01)Sort: test_parquet.value DESC NULLS FIRST, fetch=3 +02)--TableScan: test_parquet projection=[id, value, name] +physical_plan +01)SortExec: TopK(fetch=3), expr=[value@1 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value, name], file_type=parquet + +# Re-enable for next tests +statement ok +SET datafusion.optimizer.enable_topk_dynamic_filter_pushdown = true; + +# Test 2: Join dynamic filter pushdown with Parquet +query TT +EXPLAIN SELECT l.*, r.info +FROM left_parquet l +INNER JOIN right_parquet r ON l.id = r.id; +---- +logical_plan +01)Projection: l.id, l.data, r.info +02)--Inner Join: l.id = r.id +03)----SubqueryAlias: l +04)------TableScan: left_parquet projection=[id, data] +05)----SubqueryAlias: r +06)------TableScan: right_parquet projection=[id, info] +physical_plan +01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info] +02)--CoalesceBatchesExec: target_batch_size=8192 +03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[info@1, id@2, data@3] +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], 
file_type=parquet +05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ] + +# Disable Join dynamic filter pushdown +statement ok +SET datafusion.optimizer.enable_join_dynamic_filter_pushdown = false; + +# Without the Join dynamic filter, the probe-side DataSourceExec should NOT have predicate=DynamicFilter +query TT +EXPLAIN SELECT l.*, r.info +FROM left_parquet l +INNER JOIN right_parquet r ON l.id = r.id; +---- +logical_plan +01)Projection: l.id, l.data, r.info +02)--Inner Join: l.id = r.id +03)----SubqueryAlias: l +04)------TableScan: left_parquet projection=[id, data] +05)----SubqueryAlias: r +06)------TableScan: right_parquet projection=[id, info] +physical_plan +01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info] +02)--CoalesceBatchesExec: target_batch_size=8192 +03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[info@1, id@2, data@3] +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet +05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet + +# Re-enable for next tests +statement ok +SET datafusion.optimizer.enable_join_dynamic_filter_pushdown = true; + +# Test 3: Test independent control + +# Disable TopK, keep Join enabled +statement ok +SET datafusion.optimizer.enable_topk_dynamic_filter_pushdown = false; + +statement ok +SET datafusion.optimizer.enable_join_dynamic_filter_pushdown = true; + +# Join should still have dynamic filter +query TT +EXPLAIN SELECT l.*, r.info +FROM left_parquet l +INNER JOIN right_parquet r ON l.id = r.id; +---- +logical_plan +01)Projection:
l.id, l.data, r.info +02)--Inner Join: l.id = r.id +03)----SubqueryAlias: l +04)------TableScan: left_parquet projection=[id, data] +05)----SubqueryAlias: r +06)------TableScan: right_parquet projection=[id, info] +physical_plan +01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info] +02)--CoalesceBatchesExec: target_batch_size=8192 +03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[info@1, id@2, data@3] +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet +05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ] + +# Enable TopK, disable Join +statement ok +SET datafusion.optimizer.enable_topk_dynamic_filter_pushdown = true; + +statement ok +SET datafusion.optimizer.enable_join_dynamic_filter_pushdown = false; + +# Join should NOT have dynamic filter +query TT +EXPLAIN SELECT l.*, r.info +FROM left_parquet l +INNER JOIN right_parquet r ON l.id = r.id; +---- +logical_plan +01)Projection: l.id, l.data, r.info +02)--Inner Join: l.id = r.id +03)----SubqueryAlias: l +04)------TableScan: left_parquet projection=[id, data] +05)----SubqueryAlias: r +06)------TableScan: right_parquet projection=[id, info] +physical_plan +01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info] +02)--CoalesceBatchesExec: target_batch_size=8192 +03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[info@1, id@2, data@3] +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet +05)------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet + +# Test 4: Backward compatibility + +# First, set both new configs to specific values +statement ok +SET datafusion.optimizer.enable_topk_dynamic_filter_pushdown = true; + +statement ok +SET datafusion.optimizer.enable_join_dynamic_filter_pushdown = true; + +statement ok +set datafusion.catalog.information_schema = true + +# Setting the config should override both +statement ok +SET datafusion.optimizer.enable_dynamic_filter_pushdown = false; + +# Verify both configs are now false +query T +SELECT value FROM information_schema.df_settings +WHERE name = 'datafusion.optimizer.enable_topk_dynamic_filter_pushdown'; +---- +false + +query T +SELECT value FROM information_schema.df_settings +WHERE name = 'datafusion.optimizer.enable_join_dynamic_filter_pushdown'; +---- +false + +statement ok +set datafusion.catalog.information_schema = false + +# Join should NOT have dynamic filter +query TT +EXPLAIN SELECT l.*, r.info +FROM left_parquet l +INNER JOIN right_parquet r ON l.id = r.id; +---- +logical_plan +01)Projection: l.id, l.data, r.info +02)--Inner Join: l.id = r.id +03)----SubqueryAlias: l +04)------TableScan: left_parquet projection=[id, data] +05)----SubqueryAlias: r +06)------TableScan: right_parquet projection=[id, info] +physical_plan +01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info] +02)--CoalesceBatchesExec: target_batch_size=8192 +03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[info@1, id@2, data@3] +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet +05)------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet + +# Re-enable +statement ok +SET datafusion.optimizer.enable_dynamic_filter_pushdown = true; + +statement ok +set datafusion.catalog.information_schema = true + +# Verify both configs are now true +query T +SELECT value FROM information_schema.df_settings +WHERE name = 'datafusion.optimizer.enable_topk_dynamic_filter_pushdown'; +---- +true + +query T +SELECT value FROM information_schema.df_settings +WHERE name = 'datafusion.optimizer.enable_join_dynamic_filter_pushdown'; +---- +true + +statement ok +set datafusion.catalog.information_schema = false + +# Join should have dynamic filter again +query TT +EXPLAIN SELECT l.*, r.info +FROM left_parquet l +INNER JOIN right_parquet r ON l.id = r.id; +---- +logical_plan +01)Projection: l.id, l.data, r.info +02)--Inner Join: l.id = r.id +03)----SubqueryAlias: l +04)------TableScan: left_parquet projection=[id, data] +05)----SubqueryAlias: r +06)------TableScan: right_parquet projection=[id, info] +physical_plan +01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info] +02)--CoalesceBatchesExec: target_batch_size=8192 +03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[info@1, id@2, data@3] +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet +05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ] + +# Cleanup + +statement ok +DROP TABLE test_data; + +statement ok +DROP TABLE join_left; + +statement ok +DROP TABLE join_right; + +statement ok +DROP TABLE test_parquet; + +statement ok +DROP TABLE 
left_parquet; + +statement ok +DROP TABLE right_parquet; + +# Reset configs to defaults +statement ok +SET datafusion.optimizer.enable_topk_dynamic_filter_pushdown = true; + +statement ok +SET datafusion.optimizer.enable_join_dynamic_filter_pushdown = true; + +statement ok +SET datafusion.optimizer.enable_dynamic_filter_pushdown = true; diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 670992633bb85..a69a8d5c0d8f6 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -289,8 +289,10 @@ datafusion.optimizer.allow_symmetric_joins_without_pruning true datafusion.optimizer.default_filter_selectivity 20 datafusion.optimizer.enable_distinct_aggregation_soft_limit true datafusion.optimizer.enable_dynamic_filter_pushdown true +datafusion.optimizer.enable_join_dynamic_filter_pushdown true datafusion.optimizer.enable_round_robin_repartition true datafusion.optimizer.enable_topk_aggregation true +datafusion.optimizer.enable_topk_dynamic_filter_pushdown true datafusion.optimizer.enable_window_limits true datafusion.optimizer.expand_views_at_output false datafusion.optimizer.filter_null_join_keys false @@ -404,9 +406,11 @@ datafusion.format.types_info false Show types in visual representation batches datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. 
datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. -datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. +datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators (topk & join) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown` & `enable_topk_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. 
+datafusion.optimizer.enable_join_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible +datafusion.optimizer.enable_topk_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase. datafusion.optimizer.enable_window_limits true When set to true, the optimizer will attempt to push limit operations past window functions, if possible datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. 
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 6bc7b90e893ad..ab3b11a8d833a 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -132,7 +132,9 @@ The following configuration settings are available: | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | | datafusion.optimizer.enable_window_limits | true | When set to true, the optimizer will attempt to push limit operations past window functions, if possible | -| datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true attempts to push down dynamic filters generated by operators into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. | +| datafusion.optimizer.enable_topk_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase. | +| datafusion.optimizer.enable_join_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. | +| datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true attempts to push down dynamic filters generated by operators (topk & join) into the file scan phase. 
For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown` & `enable_topk_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. | | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | | datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | | datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. |