refactor: fetch statistics for a given ParquetMetaData (apache#10880)
* refactor: fetch statistics for a given ParquetMetaData

* test: add tests for fetch_statistics_from_parquet_meta

* Rename function and improve docs

* Simplify the test

---------

Co-authored-by: Andrew Lamb <[email protected]>
2 people authored and appletreeisyellow committed Jun 24, 2024
1 parent cb9068c commit b26f680
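The refactor splits statistics retrieval into two steps: fetching the Parquet footer metadata, then converting it into `Statistics`. A minimal sketch of the resulting call pattern, based only on the signatures shown in this diff (the `store: &dyn ObjectStore`, `file: &ObjectMeta`, and `table_schema: SchemaRef` bindings are assumed to exist in the caller's scope):

    // Fetch the footer metadata once (optionally passing a size hint)...
    let metadata = fetch_parquet_metadata(store, file, None).await?;
    // ...then convert it into table statistics. A caller that already holds
    // a ParquetMetaData can invoke statistics_from_parquet_meta directly,
    // skipping the object-store round trip.
    let stats = statistics_from_parquet_meta(&metadata, table_schema).await?;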
73 changes: 73 additions & 0 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -455,13 +455,26 @@ async fn fetch_schema(
}

/// Read and parse the statistics of the Parquet file described by `file`
///
/// See [`statistics_from_parquet_meta`] for more details
async fn fetch_statistics(
store: &dyn ObjectStore,
table_schema: SchemaRef,
file: &ObjectMeta,
metadata_size_hint: Option<usize>,
) -> Result<Statistics> {
let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
statistics_from_parquet_meta(&metadata, table_schema).await
}

/// Convert statistics in [`ParquetMetaData`] into [`Statistics`]
///
/// The statistics are calculated for each column in the table schema
/// using the row group statistics in the parquet metadata.
pub async fn statistics_from_parquet_meta(
metadata: &ParquetMetaData,
table_schema: SchemaRef,
) -> Result<Statistics> {
let file_metadata = metadata.file_metadata();

let file_schema = parquet_to_arrow_schema(
@@ -1402,6 +1415,66 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_statistics_from_parquet_metadata() -> Result<()> {
// Data for column c1: ["Foo", null, "bar"]
let c1: ArrayRef =
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();

// Data for column c2: [1, 2, null]
let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();

// Use store_parquet to write each batch to its own file
// . batch1 written into the first file and includes:
// - column c1 that has 3 rows with one null. Stats min and max of the string column are missing for this test even though the column has values
// . batch2 written into the second file and includes:
// - column c2 that has 3 rows with one null. Stats min and max of the int column are available and are 1 and 2, respectively
let store = Arc::new(LocalFileSystem::new()) as _;
let (files, _file_names) = store_parquet(vec![batch1, batch2], false).await?;

let state = SessionContext::new().state();
let format = ParquetFormat::default();
let schema = format.infer_schema(&state, &store, &files).await.unwrap();

let null_i64 = ScalarValue::Int64(None);
let null_utf8 = ScalarValue::Utf8(None);

// Fetch statistics for first file
let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?;
let stats = statistics_from_parquet_meta(&pq_meta, schema.clone()).await?;
assert_eq!(stats.num_rows, Precision::Exact(3));
// column c1
let c1_stats = &stats.column_statistics[0];
assert_eq!(c1_stats.null_count, Precision::Exact(1));
assert_eq!(c1_stats.max_value, Precision::Absent);
assert_eq!(c1_stats.min_value, Precision::Absent);
// column c2: missing from the file so the table treats all 3 rows as null
let c2_stats = &stats.column_statistics[1];
assert_eq!(c2_stats.null_count, Precision::Exact(3));
assert_eq!(c2_stats.max_value, Precision::Exact(null_i64.clone()));
assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone()));

// Fetch statistics for second file
let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[1], None).await?;
let stats = statistics_from_parquet_meta(&pq_meta, schema.clone()).await?;
assert_eq!(stats.num_rows, Precision::Exact(3));
// column c1: missing from the file so the table treats all 3 rows as null
let c1_stats = &stats.column_statistics[0];
assert_eq!(c1_stats.null_count, Precision::Exact(3));
assert_eq!(c1_stats.max_value, Precision::Exact(null_utf8.clone()));
assert_eq!(c1_stats.min_value, Precision::Exact(null_utf8.clone()));
// column c2
let c2_stats = &stats.column_statistics[1];
assert_eq!(c2_stats.null_count, Precision::Exact(1));
assert_eq!(c2_stats.max_value, Precision::Exact(2i64.into()));
assert_eq!(c2_stats.min_value, Precision::Exact(1i64.into()));

Ok(())
}

#[tokio::test]
async fn read_small_batches() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
