Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
300 changes: 297 additions & 3 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,27 @@ pub struct Statistics {
pub column_statistics: Vec<ColumnStatistics>,
}

/// Fallback to use when NDV overlap can not be estimated from column bounds.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum NdvFallback {
/// Use the larger input NDV. This is the conservative default for
/// related fragments such as files from the same table.
#[default]
Max,
/// Sum the input NDVs. This is a conservative upper bound for
/// independent inputs such as `UNION ALL`.
Sum,
}

impl NdvFallback {
fn merge(self, left: usize, right: usize) -> usize {
match self {
Self::Max => usize::max(left, right),
Self::Sum => left.saturating_add(right),
}
}
}

impl Default for Statistics {
/// Returns a new [`Statistics`] instance with all fields set to unknown
/// and no columns.
Expand Down Expand Up @@ -630,6 +651,9 @@ impl Statistics {
/// The method assumes that all statistics are for the same schema.
/// If not, maybe you can call `SchemaMapper::map_column_statistics` to make them consistent.
///
/// This method uses [`NdvFallback::Max`] when `distinct_count` overlap
/// can not be estimated from column bounds.
///
/// Returns an error if the statistics do not match the specified schemas.
///
/// # Example
Expand Down Expand Up @@ -670,6 +694,19 @@ impl Statistics {
/// Precision::Exact(ScalarValue::Int64(Some(1500))));
/// ```
pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) -> Result<Statistics>
where
I: IntoIterator<Item = &'a Statistics>,
{
Self::try_merge_iter_with_ndv_fallback(items, schema, NdvFallback::Max)
}

/// Same as [`Statistics::try_merge_iter`], but lets callers choose the
/// fallback used when `distinct_count` overlap can not be estimated.
pub fn try_merge_iter_with_ndv_fallback<'a, I>(
items: I,
schema: &Schema,
ndv_fallback: NdvFallback,
) -> Result<Statistics>
where
I: IntoIterator<Item = &'a Statistics>,
{
Expand Down Expand Up @@ -717,7 +754,7 @@ impl Statistics {
) {
(Some(&l), Some(&r)) => Precision::Inexact(
estimate_ndv_with_overlap(col_stats, item_cs, l, r)
.unwrap_or_else(|| usize::max(l, r)),
.unwrap_or_else(|| ndv_fallback.merge(l, r)),
),
_ => Precision::Absent,
};
Expand Down Expand Up @@ -1465,6 +1502,44 @@ mod tests {
}
}

fn make_single_i64_ndv_stats(
distinct_count: Precision<usize>,
min_value: Option<i64>,
max_value: Option<i64>,
) -> Statistics {
let to_precision = |value| Precision::Exact(ScalarValue::Int64(Some(value)));

Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_distinct_count(distinct_count)
.with_min_value(
min_value.map(to_precision).unwrap_or(Precision::Absent),
)
.with_max_value(
max_value.map(to_precision).unwrap_or(Precision::Absent),
),
)
}

fn merge_single_i64_ndv_distinct_count(
left: Statistics,
right: Statistics,
ndv_fallback: NdvFallback,
) -> Precision<usize> {
let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);

Statistics::try_merge_iter_with_ndv_fallback(
[&left, &right],
&schema,
ndv_fallback,
)
.unwrap()
.column_statistics[0]
.distinct_count
}

#[test]
fn test_try_merge() {
// Create a schema with two columns
Expand Down Expand Up @@ -1815,7 +1890,7 @@ mod tests {

let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
// No min/max -> fallback to max(5, 8)
// No min/max -> default fallback is max
assert_eq!(
merged.column_statistics[0].distinct_count,
Precision::Inexact(8)
Expand Down Expand Up @@ -1851,13 +1926,55 @@ mod tests {

let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
// distance() unsupported for strings -> fallback to max
// distance() unsupported for strings -> default fallback is max
assert_eq!(
merged.column_statistics[0].distinct_count,
Precision::Inexact(8)
);
}

#[test]
fn test_try_merge_ndv_non_numeric_types_sum_fallback() {
let stats1 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Utf8(Some(
"aaa".to_string(),
))))
.with_max_value(Precision::Exact(ScalarValue::Utf8(Some(
"zzz".to_string(),
))))
.with_distinct_count(Precision::Exact(5)),
);
let stats2 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Utf8(Some(
"bbb".to_string(),
))))
.with_max_value(Precision::Exact(ScalarValue::Utf8(Some(
"yyy".to_string(),
))))
.with_distinct_count(Precision::Exact(8)),
);

let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
let merged = Statistics::try_merge_iter_with_ndv_fallback(
[&stats1, &stats2],
&schema,
NdvFallback::Sum,
)
.unwrap();

// distance() unsupported for strings -> sum fallback is caller-selected
assert_eq!(
merged.column_statistics[0].distinct_count,
Precision::Inexact(13)
);
}

#[test]
fn test_try_merge_ndv_constant_columns() {
// Same constant: [5,5]+[5,5] -> max
Expand Down Expand Up @@ -1910,6 +2027,183 @@ mod tests {
);
}

#[test]
fn test_try_merge_ndv_original_union_edge_cases() {
struct NdvTestCase {
name: &'static str,
left_ndv: Precision<usize>,
left_min: Option<i64>,
left_max: Option<i64>,
right_ndv: Precision<usize>,
right_min: Option<i64>,
right_max: Option<i64>,
expected: Precision<usize>,
}

let cases = vec![
NdvTestCase {
name: "disjoint ranges",
left_ndv: Precision::Exact(5),
left_min: Some(0),
left_max: Some(10),
right_ndv: Precision::Exact(3),
right_min: Some(20),
right_max: Some(30),
expected: Precision::Inexact(8),
},
NdvTestCase {
name: "identical ranges",
left_ndv: Precision::Exact(10),
left_min: Some(0),
left_max: Some(100),
right_ndv: Precision::Exact(8),
right_min: Some(0),
right_max: Some(100),
expected: Precision::Inexact(10),
},
NdvTestCase {
name: "partial overlap",
left_ndv: Precision::Exact(100),
left_min: Some(0),
left_max: Some(100),
right_ndv: Precision::Exact(50),
right_min: Some(50),
right_max: Some(150),
expected: Precision::Inexact(125),
},
NdvTestCase {
name: "right contained in left",
left_ndv: Precision::Exact(100),
left_min: Some(0),
left_max: Some(100),
right_ndv: Precision::Exact(50),
right_min: Some(25),
right_max: Some(75),
expected: Precision::Inexact(100),
},
NdvTestCase {
name: "same constant value",
left_ndv: Precision::Exact(1),
left_min: Some(5),
left_max: Some(5),
right_ndv: Precision::Exact(1),
right_min: Some(5),
right_max: Some(5),
expected: Precision::Inexact(1),
},
NdvTestCase {
name: "different constant values",
left_ndv: Precision::Exact(1),
left_min: Some(5),
left_max: Some(5),
right_ndv: Precision::Exact(1),
right_min: Some(10),
right_max: Some(10),
expected: Precision::Inexact(2),
},
NdvTestCase {
name: "left constant within right range",
left_ndv: Precision::Exact(1),
left_min: Some(5),
left_max: Some(5),
right_ndv: Precision::Exact(10),
right_min: Some(0),
right_max: Some(10),
expected: Precision::Inexact(10),
},
NdvTestCase {
name: "left constant outside right range",
left_ndv: Precision::Exact(1),
left_min: Some(20),
left_max: Some(20),
right_ndv: Precision::Exact(10),
right_min: Some(0),
right_max: Some(10),
expected: Precision::Inexact(11),
},
NdvTestCase {
name: "right constant within left range",
left_ndv: Precision::Exact(10),
left_min: Some(0),
left_max: Some(10),
right_ndv: Precision::Exact(1),
right_min: Some(5),
right_max: Some(5),
expected: Precision::Inexact(10),
},
NdvTestCase {
name: "right constant outside left range",
left_ndv: Precision::Exact(10),
left_min: Some(0),
left_max: Some(10),
right_ndv: Precision::Exact(1),
right_min: Some(20),
right_max: Some(20),
expected: Precision::Inexact(11),
},
NdvTestCase {
name: "missing bounds exact plus exact",
left_ndv: Precision::Exact(10),
left_min: None,
left_max: None,
right_ndv: Precision::Exact(5),
right_min: None,
right_max: None,
expected: Precision::Inexact(15),
},
NdvTestCase {
name: "missing bounds exact plus inexact",
left_ndv: Precision::Exact(10),
left_min: None,
left_max: None,
right_ndv: Precision::Inexact(5),
right_min: None,
right_max: None,
expected: Precision::Inexact(15),
},
NdvTestCase {
name: "missing bounds inexact plus inexact",
left_ndv: Precision::Inexact(7),
left_min: None,
left_max: None,
right_ndv: Precision::Inexact(3),
right_min: None,
right_max: None,
expected: Precision::Inexact(10),
},
NdvTestCase {
name: "exact plus absent",
left_ndv: Precision::Exact(10),
left_min: None,
left_max: None,
right_ndv: Precision::Absent,
right_min: None,
right_max: None,
expected: Precision::Absent,
},
NdvTestCase {
name: "inexact plus absent",
left_ndv: Precision::Inexact(4),
left_min: None,
left_max: None,
right_ndv: Precision::Absent,
right_min: None,
right_max: None,
expected: Precision::Absent,
},
];

for case in cases {
let actual = merge_single_i64_ndv_distinct_count(
make_single_i64_ndv_stats(case.left_ndv, case.left_min, case.left_max),
make_single_i64_ndv_stats(case.right_ndv, case.right_min, case.right_max),
NdvFallback::Sum,
);

assert_eq!(actual, case.expected, "case {} failed", case.name);
}
}

#[test]
fn test_with_fetch_basic_preservation() {
// Test that column statistics and byte size are preserved (as inexact) when applying fetch
Expand Down
Loading
Loading