Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ mod test {
let plan_string = get_plan_string(&aggregate_exec_partial).swap_remove(0);
assert_snapshot!(
plan_string,
@"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)], ordering_mode=Sorted"
@"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)]"
);

let p0_statistics = aggregate_exec_partial.partition_statistics(Some(0))?;
Expand Down
56 changes: 55 additions & 1 deletion datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ impl DataSource for FileScanConfig {
let schema = self.file_source.table_schema().table_schema();
let mut eq_properties = EquivalenceProperties::new_with_orderings(
Arc::clone(schema),
self.output_ordering.clone(),
self.validated_output_ordering(),
)
.with_constraints(self.constraints.clone());

Expand Down Expand Up @@ -926,6 +926,60 @@ impl DataSource for FileScanConfig {
}

impl FileScanConfig {
/// Filters the declared output orderings down to those that the min/max
/// statistics of every file group can confirm.
///
/// A declared ordering describes how rows are sorted *inside* one file. When a
/// file group (partition) holds several files, they are read one after the
/// other, so the ordering only survives if the files' value ranges are also
/// laid out in that order.
///
/// Rejected — ranges overlap, so the concatenated stream is not `col1 ASC`:
///
/// - file1: min(col1) = 10, max(col1) = 20
/// - file2: min(col1) = 5, max(col1) = 15
///
/// Rejected — ranges are disjoint but appear in the wrong sequence:
///
/// - file1: min(col1) = 20, max(col1) = 30
/// - file2: min(col1) = 10, max(col1) = 15
///
/// Retained — file1 ends before file2 begins, so reading file1 then file2
/// yields `col1 ASC`:
///
/// - file1: min(col1) = 5, max(col1) = 15
/// - file2: min(col1) = 16, max(col1) = 25
///
/// The check is applied to *each* file group on its own. Files that live in
/// different partitions are read independently and never affect one another's
/// ordering; combining several partition streams into one ordered stream is
/// done upstream, e.g. by `SortPreservingMergeExec`.
fn validated_output_ordering(&self) -> Vec<LexOrdering> {
    let table_schema = self.file_source.table_schema().table_schema();
    let mut validated = Vec::new();

    for candidate in &self.output_ordering {
        // The candidate must hold in every file group to be kept.
        let holds_in_every_group = self.file_groups.iter().all(|files| {
            // A group with zero or one file cannot be out of order, so the
            // statistics are only consulted for groups with several files.
            // TODO: should we cache MinMaxStatistics per file group?
            files.len() <= 1
                || MinMaxStatistics::new_from_files(
                    candidate,
                    table_schema,
                    None, // table-schema level: no projection remapping required
                    files.iter(),
                )
                .map(|stats| stats.is_sorted())
                // If the statistics cannot be built, sortedness is unproven,
                // so the ordering is conservatively dropped.
                .unwrap_or(false)
        });

        if holds_in_every_group {
            validated.push(candidate.clone());
        }
    }

    validated
}

/// Get the file schema (schema of the files without partition columns)
pub fn file_schema(&self) -> &SchemaRef {
self.file_source.table_schema().file_schema()
Expand Down
Loading
Loading