Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
29f1acd
feat: change approx percentile/median UDFs to return floats (#21074)
theirix Apr 18, 2026
8a650f5
Make `test_display_pg_json` pass regardless of build setup and depend…
AdamGS Apr 18, 2026
b75df6f
fix: Prevent CLI crash on wide tables (#21721)
Geethapranay1 Apr 18, 2026
1fbbba5
feat: support '>', '<', '>=', '<=', '<>' in all operator (#21416)
buraksenn Apr 18, 2026
6aa5a7e
refactor: Share left-side spill file across partitions on OOM fallbac…
viirya Apr 19, 2026
d8c9797
Spark is_valid_utf8 function implementation (#21627)
kazantsev-maksim Apr 19, 2026
43d32a8
chore: use bench array helpers from Arrow bench_util (#21544)
theirix Apr 19, 2026
fd882fb
feat: Optimize ORDER BY by Pruning Functionally Redundant Sort Keys (…
xiedeyantu Apr 19, 2026
7e1a710
fix(unparser): make `BigQueryDialect` more robust (#21296)
sgrebnov Apr 19, 2026
9c0edcc
chore: add count distinct group benchmarks (#21575)
coderfender Apr 19, 2026
90a8117
feat: Add support for `LEFT JOIN LATERAL` (#21352)
neilconway Apr 19, 2026
8614308
perf: Optimize logical optimizer's `OptimizeProjections` pass (#21726)
neilconway Apr 19, 2026
935382f
perf: Optimize `DFSchema::qualified_name` (#21722)
neilconway Apr 19, 2026
675881d
fix: insert placeholder type inference showing wrong type when there …
buraksenn Apr 19, 2026
03ca0aa
perf: Tweak vec capacity in `project_statistics` (#21734)
neilconway Apr 19, 2026
3aaf393
minor: More comments to `read_spill_as_stream` (#21713)
2010YOUY01 Apr 20, 2026
466c3ea
Dynamic work scheduling in FileStream (#21351)
alamb Apr 20, 2026
a311d14
chore: Update Release instructions (#21705)
comphead Apr 20, 2026
f5cad0b
feat: adaptive filter selectivity tracking for Parquet pushdown
adriangb Apr 11, 2026
4b730df
fix: start optional filters as RowFilter when byte ratio is small
adriangb Apr 18, 2026
6ebaaa2
fix: mid-stream skip for ineffective optional row/post-scan filters
adriangb Apr 20, 2026
487e56b
fix: admit zero-bytes-saved samples so skip fires for filter==projection
adriangb Apr 20, 2026
d6e049a
test: update expected output for Optional(DynamicFilter) wrapper
adriangb Apr 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ run_clickbench_partitioned() {
# Runs the ClickBench benchmark over the partitioned dataset (100 files)
# with Parquet filter pushdown enabled (`--pushdown` flag of dfbench).
# Results are written as JSON into ${RESULTS_DIR}.
run_clickbench_pushdown() {
    RESULTS_FILE="${RESULTS_DIR}/clickbench_pushdown.json"
    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running clickbench (partitioned, 100 files) benchmark with pushdown_filters=true..."
    debug_run $CARGO_COMMAND --bin dfbench -- clickbench --pushdown --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

Expand Down
3 changes: 0 additions & 3 deletions benchmarks/src/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ pub struct RunOpt {
///
/// Specifically, it enables:
/// * `pushdown_filters = true`
/// * `reorder_filters = true`
#[arg(long = "pushdown")]
pushdown: bool,

Expand Down Expand Up @@ -196,14 +195,12 @@ impl RunOpt {
// Turn on Parquet filter pushdown if requested
if self.pushdown {
parquet_options.pushdown_filters = true;
parquet_options.reorder_filters = true;
}

if self.sorted_by.is_some() {
// We should compare the dynamic topk optimization when data is sorted, so we make the
// assumption that filter pushdown is also enabled in this case.
parquet_options.pushdown_filters = true;
parquet_options.reorder_filters = true;
}
}

Expand Down
37 changes: 36 additions & 1 deletion datafusion-cli/src/print_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ fn keep_only_maxrows(s: &str, maxrows: usize) -> String {
let last_line = &lines[lines.len() - 1]; // bottom border line

let spaces = last_line.len().saturating_sub(4);
let dotted_line = format!("| .{:<spaces$}|", "", spaces = spaces);
let dotted_line = format!("| .{}|", " ".repeat(spaces));

let mut result = lines[0..(maxrows + 3)].to_vec(); // Keep top border and `maxrows` lines
result.extend(vec![dotted_line; 3]); // Append ... lines
Expand Down Expand Up @@ -632,6 +632,41 @@ mod tests {
.unwrap()
}

/// Truncating a wide (many-column) table with `MaxRows::Limited` must not
/// panic; the `.` continuation rows are padded to the full table width.
#[test]
fn print_maxrows_limited_wide_table() {
    let test = PrintBatchesTest::new()
        .with_format(PrintFormat::Table)
        .with_batches(vec![wide_column_batch()])
        .with_maxrows(MaxRows::Limited(1));
    let output = test.run();
    assert_snapshot!(output, @r"
+----+----+----+----+----+----+----+----+----+----+
| c0 | c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 | c9 |
+----+----+----+----+----+----+----+----+----+----+
| 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| . |
| . |
| . |
+----+----+----+----+----+----+----+----+----+----+
");
}

/// Build a ten-column schema (`c0`..`c9`, non-nullable `Int32`) used to
/// exercise wide-table formatting in the print tests.
fn wide_column_schema() -> SchemaRef {
    let mut fields = Vec::with_capacity(10);
    for i in 0..10 {
        fields.push(Field::new(format!("c{i}"), DataType::Int32, false));
    }
    Arc::new(Schema::new(fields))
}

/// Build a `RecordBatch` matching `wide_column_schema()`: ten `Int32`
/// columns, each containing the three rows `[0, 1, 2]`.
fn wide_column_batch() -> RecordBatch {
    let mut arrays: Vec<Arc<dyn arrow::array::Array>> = Vec::with_capacity(10);
    for _ in 0..10 {
        arrays.push(Arc::new(Int32Array::from(vec![0, 1, 2])));
    }
    RecordBatch::try_new(wide_column_schema(), arrays).unwrap()
}

/// Slice the record batch into 2 batches
fn split_batch(batch: &RecordBatch) -> Vec<RecordBatch> {
assert!(batch.num_rows() > 1);
Expand Down
17 changes: 17 additions & 0 deletions datafusion-cli/tests/cli_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,23 @@ fn test_cli_with_unbounded_memory_pool() {
assert_cmd_snapshot!(cmd);
}

/// Regression test: a query projecting many columns must not crash the CLI
/// when `--maxrows` elides rows (the `.` continuation rows span the full
/// table width).
#[test]
fn test_cli_wide_result_set_no_crash() {
    let mut settings = make_settings();
    settings.set_snapshot_suffix("wide_result_set");
    let _bound = settings.bind_to_scope();

    // Ten projected columns over a 100-row series; only 5 rows displayed.
    let query = "SELECT v1 as c0, v1+1 as c1, v1+2 as c2, v1+3 as c3, v1+4 as c4, \
                 v1+5 as c5, v1+6 as c6, v1+7 as c7, v1+8 as c8, v1+9 as c9 \
                 FROM generate_series(1, 100) as t1(v1);";
    let mut cmd = cli();
    cmd.args(["--maxrows", "5", "--command", query]);

    assert_cmd_snapshot!(cmd);
}

#[tokio::test]
async fn test_cli() {
if env::var("TEST_STORAGE_INTEGRATION").is_err() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
---
source: datafusion-cli/tests/cli_integration.rs
assertion_line: 307
info:
program: datafusion-cli
args:
- "--maxrows"
- "5"
- "--command"
- "SELECT v1 as c0, v1+1 as c1, v1+2 as c2, v1+3 as c3, v1+4 as c4, v1+5 as c5, v1+6 as c6, v1+7 as c7, v1+8 as c8, v1+9 as c9 FROM generate_series(1, 100) as t1(v1);"
---
success: true
exit_code: 0
----- stdout -----
[CLI_VERSION]
+----+----+----+----+----+----+----+----+----+----+
| c0 | c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 | c9 |
+----+----+----+----+----+----+----+----+----+----+
| 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 |
| 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 |
| 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 |
| 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 |
| 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 |
| . |
| . |
| . |
+----+----+----+----+----+----+----+----+----+----+
100 row(s) fetched. (First 5 displayed. Use --maxrows to adjust)
[ELAPSED]


----- stderr -----
5 changes: 4 additions & 1 deletion datafusion-examples/examples/data_io/json_shredding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ pub async fn json_shredding() -> Result<()> {
store.put(&path, payload).await?;

// Set up query execution
let mut cfg = SessionConfig::new();
let mut cfg = SessionConfig::default().set(
"datafusion.execution.parquet.filter_pushdown_min_bytes_per_sec",
&ScalarValue::Float64(Some(0.0)),
);
cfg.options_mut().execution.parquet.pushdown_filters = true;
let ctx = SessionContext::new_with_config(cfg);
ctx.runtime_env().register_object_store(
Expand Down
48 changes: 43 additions & 5 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -877,11 +877,6 @@ config_namespace! {
/// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
pub pushdown_filters: bool, default = false

/// (reading) If true, filter expressions evaluated during the parquet decoding operation
/// will be reordered heuristically to minimize the cost of evaluation. If false,
/// the filters are applied in the same order as written in the query
pub reorder_filters: bool, default = false

/// (reading) Force the use of RowSelections for filter results, when
/// pushdown_filters is enabled. If false, the reader will automatically
/// choose between a RowSelection and a Bitmap based on the number and
Expand Down Expand Up @@ -919,6 +914,49 @@ config_namespace! {
/// parquet reader setting. 0 means no caching.
pub max_predicate_cache_size: Option<usize>, default = None

/// (reading) Minimum bytes/sec throughput for adaptive filter pushdown.
/// Filters that achieve at least this throughput (bytes_saved / eval_time)
/// are promoted to row filters.
/// f64::INFINITY = no filters promoted (feature disabled).
/// 0.0 = all filters pushed as row filters (no adaptive logic).
/// Default: 104,857,600 bytes/sec (100 MiB/sec), empirically chosen based on
/// TPC-H, TPC-DS, and ClickBench benchmarks on an m4 MacBook Pro.
/// The optimal value for this setting likely depends on the relative
/// cost of CPU vs. IO in your environment, and to some extent the shape
/// of your query.
///
/// **Interaction with `pushdown_filters`:**
/// This option only takes effect when `pushdown_filters = true`.
/// When pushdown is disabled, all filters run post-scan and this
/// threshold is ignored.
pub filter_pushdown_min_bytes_per_sec: f64, default = 104_857_600.0

/// (reading) Byte-ratio threshold for applying filters one at a time
/// (iterative pruning; aka row-level) vs. all at once (post-scan).
/// The ratio is computed as: (extra filter bytes not in projection) / (projected bytes).
/// Filters whose extra columns consume a smaller fraction than this threshold are placed as row filters.
/// Ratio of filter column bytes to projection bytes that controls
/// initial filter placement. Computed as
/// `filter_compressed_bytes / projection_compressed_bytes`.
/// Filters below this ratio start as row-level filters (enabling late
/// materialization); those above start as post-scan filters.
/// Default: 0.20 — filters whose columns are less than 20% of the
/// projection bytes start at row-level.
///
/// **Interaction with `pushdown_filters`:**
/// Only takes effect when `pushdown_filters = true`.
pub filter_collecting_byte_ratio_threshold: f64, default = 0.20

/// (reading) Z-score for confidence intervals on filter effectiveness.
/// Controls how much statistical evidence is required before promoting
/// or demoting a filter. Lower values = faster decisions with less
/// confidence. Higher values = more conservative, requiring more data.
/// Default: 2.0 (~95% confidence).
///
/// **Interaction with `pushdown_filters`:**
/// Only takes effect when `pushdown_filters = true`.
pub filter_confidence_z: f64, default = 2.0

// The following options affect writing to parquet files
// and map to parquet::file::properties::WriterProperties

Expand Down
71 changes: 67 additions & 4 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1338,11 +1338,44 @@ impl SchemaExt for Schema {
}
}

/// Build a fully-qualified field name string. This is equivalent to
/// `format!("{q}.{name}")` when `qualifier` is `Some`, or just `name` when
/// `None`. We avoid going through the `fmt` machinery for performance reasons.
pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String {
match qualifier {
Some(q) => format!("{q}.{name}"),
None => name.to_string(),
}
let qualifier = match qualifier {
None => return name.to_string(),
Some(q) => q,
};
let (first, second, third) = match qualifier {
TableReference::Bare { table } => (table.as_ref(), None, None),
TableReference::Partial { schema, table } => {
(schema.as_ref(), Some(table.as_ref()), None)
}
TableReference::Full {
catalog,
schema,
table,
} => (
catalog.as_ref(),
Some(schema.as_ref()),
Some(table.as_ref()),
),
};

let extra = second.map_or(0, str::len) + third.map_or(0, str::len);
let mut s = String::with_capacity(first.len() + extra + 3 + name.len());
s.push_str(first);
if let Some(second) = second {
s.push('.');
s.push_str(second);
}
if let Some(third) = third {
s.push('.');
s.push_str(third);
}
s.push('.');
s.push_str(name);
s
}

#[cfg(test)]
Expand All @@ -1351,6 +1384,36 @@ mod tests {

use super::*;

/// `qualified_name` doesn't use `TableReference::Display` for performance
/// reasons, but check that the output is consistent.
#[test]
fn qualified_name_agrees_with_display() {
    let cases: &[(Option<TableReference>, &str)] = &[
        (None, "col"),
        (Some(TableReference::bare("t")), "c0"),
        (Some(TableReference::partial("s", "t")), "c0"),
        (Some(TableReference::full("c", "s", "t")), "c0"),
        (Some(TableReference::bare("mytable")), "some_column_name"),
        // Empty segments must be preserved so that distinct qualified
        // fields don't collide in `DFSchema::field_names()`.
        (Some(TableReference::bare("")), "col"),
        (Some(TableReference::partial("s", "")), "col"),
        (Some(TableReference::partial("", "t")), "col"),
        (Some(TableReference::full("c", "", "t")), "col"),
        (Some(TableReference::full("", "s", "t")), "col"),
        (Some(TableReference::full("c", "s", "")), "col"),
        (Some(TableReference::full("", "", "")), "col"),
    ];
    for (qualifier, name) in cases {
        // Reference result computed via the Display machinery.
        let expected = if let Some(q) = qualifier {
            format!("{q}.{name}")
        } else {
            name.to_string()
        };
        let actual = qualified_name(qualifier.as_ref(), name);
        assert_eq!(actual, expected, "qualifier={qualifier:?} name={name}");
    }
}

#[test]
fn qualifier_in_name() -> Result<()> {
let col = Column::from_name("t1.c0");
Expand Down
18 changes: 15 additions & 3 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl ParquetOptions {
skip_metadata: _,
metadata_size_hint: _,
pushdown_filters: _,
reorder_filters: _,

force_filter_selections: _, // not used for writer props
allow_single_file_parallelism: _,
maximum_parallel_row_group_writers: _,
Expand All @@ -210,6 +210,9 @@ impl ParquetOptions {
coerce_int96: _, // not used for writer props
skip_arrow_metadata: _,
max_predicate_cache_size: _,
filter_pushdown_min_bytes_per_sec: _, // not used for writer props
filter_collecting_byte_ratio_threshold: _, // not used for writer props
filter_confidence_z: _, // not used for writer props
} = self;

let mut builder = WriterProperties::builder()
Expand Down Expand Up @@ -470,7 +473,7 @@ mod tests {
skip_metadata: defaults.skip_metadata,
metadata_size_hint: defaults.metadata_size_hint,
pushdown_filters: defaults.pushdown_filters,
reorder_filters: defaults.reorder_filters,

force_filter_selections: defaults.force_filter_selections,
allow_single_file_parallelism: defaults.allow_single_file_parallelism,
maximum_parallel_row_group_writers: defaults
Expand All @@ -484,6 +487,10 @@ mod tests {
coerce_int96: None,
max_predicate_cache_size: defaults.max_predicate_cache_size,
use_content_defined_chunking: defaults.use_content_defined_chunking.clone(),
filter_pushdown_min_bytes_per_sec: defaults.filter_pushdown_min_bytes_per_sec,
filter_collecting_byte_ratio_threshold: defaults
.filter_collecting_byte_ratio_threshold,
filter_confidence_z: defaults.filter_confidence_z,
}
}

Expand Down Expand Up @@ -585,7 +592,7 @@ mod tests {
skip_metadata: global_options_defaults.skip_metadata,
metadata_size_hint: global_options_defaults.metadata_size_hint,
pushdown_filters: global_options_defaults.pushdown_filters,
reorder_filters: global_options_defaults.reorder_filters,

force_filter_selections: global_options_defaults.force_filter_selections,
allow_single_file_parallelism: global_options_defaults
.allow_single_file_parallelism,
Expand All @@ -607,6 +614,11 @@ mod tests {
norm_level: c.norm_level,
}
}),
filter_pushdown_min_bytes_per_sec: global_options_defaults
.filter_pushdown_min_bytes_per_sec,
filter_collecting_byte_ratio_threshold: global_options_defaults
.filter_collecting_byte_ratio_threshold,
filter_confidence_z: global_options_defaults.filter_confidence_z,
},
column_specific_options,
key_value_metadata,
Expand Down
Loading
Loading