Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 43 additions & 14 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1895,13 +1895,28 @@ mod tests {
struct File {
name: &'static str,
date: &'static str,
statistics: Vec<Option<(f64, f64)>>,
statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
}
impl File {
fn new(
name: &'static str,
date: &'static str,
statistics: Vec<Option<(f64, f64)>>,
) -> Self {
Self::new_nullable(
name,
date,
statistics
.into_iter()
.map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
.collect(),
)
}

fn new_nullable(
name: &'static str,
date: &'static str,
statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
) -> Self {
Self {
name,
Expand Down Expand Up @@ -1968,21 +1983,35 @@ mod tests {
sort: vec![col("value").sort(false, true)],
expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
},
// reject nullable sort columns
TestCase {
name: "no nullable sort columns",
name: "nullable sort columns, nulls last",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
true, // should fail because nullable
true,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
File::new_nullable("0", "2023-01-01", vec![Some((Some(0.00), Some(0.49)))]),
File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\nbuild min rows\ncaused by\ncreate sorting columns\ncaused by\nError during planning: cannot sort by nullable column")
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
},
TestCase {
name: "nullable sort columns, nulls first",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
true,
)]),
files: vec![
File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), Some(1.00)))]),
File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
],
sort: vec![col("value").sort(true, true)],
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
},
TestCase {
name: "all three non-overlapping",
Expand Down Expand Up @@ -2142,12 +2171,12 @@ mod tests {
.map(|stats| {
stats
.map(|(min, max)| ColumnStatistics {
min_value: Precision::Exact(ScalarValue::from(
min,
)),
max_value: Precision::Exact(ScalarValue::from(
max,
)),
min_value: Precision::Exact(
ScalarValue::Float64(min),
),
max_value: Precision::Exact(
ScalarValue::Float64(max),
),
..Default::default()
})
.unwrap_or_default()
Expand Down
7 changes: 0 additions & 7 deletions datafusion/datasource/src/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,7 @@ impl MinMaxStatistics {
.zip(sort_columns.iter().copied())
.map(|(sort_expr, column)| {
let schema = values.schema();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appears to be the only actual code change: remove these lines

git praise says they came in via #9593 from @suremarc . Do you remember why this condition was added @suremarc ?

https://github.com/apache/datafusion/blame/a614716e7d97ff1d3374aef31b9a66fd10141423/datafusion/datasource/src/statistics.rs#L238

let idx = schema.index_of(column.name())?;
let field = schema.field(idx);

// check that sort columns are non-nullable
if field.is_nullable() {
return plan_err!("cannot sort by nullable column");
}

Ok(SortColumn {
values: Arc::clone(values.column(idx)),
Expand Down
6 changes: 2 additions & 4 deletions datafusion/sqllogictest/test_files/parquet.slt
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ STORED AS PARQUET;
----
3

# Check output plan again, expect no "output_ordering" clause in the physical_plan -> ParquetExec,
# due to there being more files than partitions:
# Check output plan again
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the actual reason why the output_ordering was missing here wasn't the number of files, but because DF had trouble with a column that wasn't marked as NOT NULL (i.e. is nullable).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree -- I reviewed the test and the table definition explicitly says

WITH ORDER (string_col ASC NULLS LAST, int_col ASC NULLS LAST)

So I would expect this plan not to have additional sorting

query TT
EXPLAIN SELECT int_col, string_col
FROM test_table
Expand All @@ -142,8 +141,7 @@ logical_plan
02)--TableScan: test_table projection=[int_col, string_col]
physical_plan
01)SortPreservingMergeExec: [string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST]
02)--SortExec: expr=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST], preserve_partitioning=[true]
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]}, projection=[int_col, string_col], file_type=parquet
02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]}, projection=[int_col, string_col], output_ordering=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST], file_type=parquet


# Perform queries using MIN and MAX
Expand Down
57 changes: 33 additions & 24 deletions datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,22 @@ CREATE TABLE src_table (
bigint_col BIGINT,
date_col DATE,
overlapping_col INT,
constant_col INT
constant_col INT,
nulls_first_col INT,
nulls_last_col INT
) AS VALUES
-- first file
(1, 3, 'aaa', 100, 1, 0, 0),
(2, 2, 'bbb', 200, 2, 1, 0),
(3, 1, 'ccc', 300, 3, 2, 0),
(1, 3, 'aaa', 100, 1, 0, 0, NULL, 1),
(2, 2, 'bbb', 200, 2, 1, 0, NULL, 2),
(3, 1, 'ccc', 300, 3, 2, 0, 1, 3),
-- second file
(4, 6, 'ddd', 400, 4, 0, 0),
(5, 5, 'eee', 500, 5, 1, 0),
(6, 4, 'fff', 600, 6, 2, 0),
(4, 6, 'ddd', 400, 4, 0, 0, 2, 4),
(5, 5, 'eee', 500, 5, 1, 0, 3, 5),
(6, 4, 'fff', 600, 6, 2, 0, 4, 6),
-- third file
(7, 9, 'ggg', 700, 7, 3, 0),
(8, 8, 'hhh', 800, 8, 4, 0),
(9, 7, 'iii', 900, 9, 5, 0);
(7, 9, 'ggg', 700, 7, 3, 0, 5, 7),
(8, 8, 'hhh', 800, 8, 4, 0, 6, NULL),
(9, 7, 'iii', 900, 9, 5, 0, 7, NULL);

# Setup 3 files, in particular more files than there are partitions

Expand Down Expand Up @@ -90,45 +92,52 @@ CREATE EXTERNAL TABLE test_table (
bigint_col BIGINT NOT NULL,
date_col DATE NOT NULL,
overlapping_col INT NOT NULL,
constant_col INT NOT NULL
constant_col INT NOT NULL,
nulls_first_col INT,
nulls_last_col INT
)
STORED AS PARQUET
PARTITIONED BY (partition_col)
WITH ORDER (int_col ASC NULLS LAST, bigint_col ASC NULLS LAST)
WITH ORDER (
int_col ASC NULLS LAST,
bigint_col ASC NULLS LAST,
nulls_first_col ASC NULLS FIRST,
nulls_last_col ASC NULLS LAST
)
LOCATION 'test_files/scratch/parquet_sorted_statistics/test_table';

# Order by numeric columns
# This is to exercise file group sorting, which uses file-level statistics
# DataFusion doesn't currently support string column statistics
# This should not require a sort.
query TT
EXPLAIN SELECT int_col, bigint_col
EXPLAIN SELECT int_col, bigint_col, nulls_first_col, nulls_last_col
FROM test_table
ORDER BY int_col, bigint_col;
ORDER BY int_col, bigint_col, nulls_first_col NULLS FIRST, nulls_last_col NULLS LAST;
----
logical_plan
01)Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST
02)--TableScan: test_table projection=[int_col, bigint_col]
01)Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST, test_table.nulls_first_col ASC NULLS FIRST, test_table.nulls_last_col ASC NULLS LAST
02)--TableScan: test_table projection=[int_col, bigint_col, nulls_first_col, nulls_last_col]
physical_plan
01)SortPreservingMergeExec: [int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST]
02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col], output_ordering=[int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST], file_type=parquet
01)SortPreservingMergeExec: [int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST, nulls_first_col@2 ASC, nulls_last_col@3 ASC NULLS LAST]
02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col, nulls_first_col, nulls_last_col], output_ordering=[int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST, nulls_first_col@2 ASC, nulls_last_col@3 ASC NULLS LAST], file_type=parquet

# Another planning test, but project on a column with unsupported statistics
# We should be able to ignore this and look at only the relevant statistics
query TT
EXPLAIN SELECT string_col
FROM test_table
ORDER BY int_col, bigint_col;
ORDER BY int_col, bigint_col, nulls_first_col NULLS FIRST, nulls_last_col NULLS LAST;
----
logical_plan
01)Projection: test_table.string_col
02)--Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST
03)----Projection: test_table.string_col, test_table.int_col, test_table.bigint_col
04)------TableScan: test_table projection=[int_col, string_col, bigint_col]
02)--Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST, test_table.nulls_first_col ASC NULLS FIRST, test_table.nulls_last_col ASC NULLS LAST
03)----Projection: test_table.string_col, test_table.int_col, test_table.bigint_col, test_table.nulls_first_col, test_table.nulls_last_col
04)------TableScan: test_table projection=[int_col, string_col, bigint_col, nulls_first_col, nulls_last_col]
physical_plan
01)ProjectionExec: expr=[string_col@0 as string_col]
02)--SortPreservingMergeExec: [int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST]
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[string_col, int_col, bigint_col], output_ordering=[int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST], file_type=parquet
02)--SortPreservingMergeExec: [int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST, nulls_first_col@3 ASC, nulls_last_col@4 ASC NULLS LAST]
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[string_col, int_col, bigint_col, nulls_first_col, nulls_last_col], output_ordering=[int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST, nulls_first_col@3 ASC, nulls_last_col@4 ASC NULLS LAST], file_type=parquet

# Clean up & recreate but sort on descending column
statement ok
Expand Down