diff --git a/datafusion/core/src/datasource/physical_plan/parquet/arrow_statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/arrow_statistics.rs
deleted file mode 100644
index 4a4d7cf7a33f..000000000000
--- a/datafusion/core/src/datasource/physical_plan/parquet/arrow_statistics.rs
+++ /dev/null
@@ -1,252 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Extract parquet statistics and convert it to Arrow statistics
-
-use std::{fs::File, sync::Arc};
-
-use arrow_array::{ArrayRef, Int64Array, UInt64Array};
-use arrow_schema::DataType;
-use datafusion_common::{DataFusionError, Result};
-use parquet::{
-    arrow::arrow_reader::ParquetRecordBatchReaderBuilder,
-    file::statistics::{Statistics as ParquetStatistics, ValueStatistics},
-};
-
-use super::statistics::parquet_column;
-
-/// statistics extracted from `Statistics` as Arrow `ArrayRef`s
-///
-/// # Note:
-/// If the corresponding `Statistics` is not present, or has no information for
-/// a column, a NULL is present in the corresponding array entry
-#[derive(Debug)]
-pub struct ArrowStatistics {
-    /// min values
-    min: ArrayRef,
-    /// max values
-    max: ArrayRef,
-    /// Row counts (UInt64Array)
-    row_count: ArrayRef,
-    /// Null Counts (UInt64Array)
-    null_count: ArrayRef,
-}
-
-impl ArrowStatistics {
-    /// Create a new instance of `ArrowStatistics`
-    pub fn new(
-        min: ArrayRef,
-        max: ArrayRef,
-        row_count: ArrayRef,
-        null_count: ArrayRef,
-    ) -> Self {
-        Self {
-            min,
-            max,
-            row_count,
-            null_count,
-        }
-    }
-
-    /// Get the min values
-    pub fn min(&self) -> &ArrayRef {
-        &self.min
-    }
-
-    /// Get the max values
-    pub fn max(&self) -> &ArrayRef {
-        &self.max
-    }
-
-    /// Get the row counts
-    pub fn row_count(&self) -> &ArrayRef {
-        &self.row_count
-    }
-
-    /// Get the null counts
-    pub fn null_count(&self) -> &ArrayRef {
-        &self.null_count
-    }
-}
-
-/// Extract `ArrowStatistics` from the parquet [`Statistics`]
-pub fn parquet_stats_to_arrow(
-    arrow_datatype: &DataType,
-    statistics: &ParquetColumnStatistics,
-) -> Result<ArrowStatistics> {
-    // check if the data type is Int64
-    if !matches!(arrow_datatype, DataType::Int64) {
-        return Err(DataFusionError::Internal(format!(
-            "Unsupported data type {:?} for statistics",
-            arrow_datatype
-        )));
-    }
-
-    // row counts
-    let row_count = statistics
-        .rg_statistics
-        .iter()
-        .map(|rg| rg.row_count)
-        .collect::<Vec<u64>>();
-
-    // get null counts
-    let parquet_stats = statistics.rg_statistics.iter().map(|rg| rg.statistics);
-    let null_counts = parquet_stats
-        // .map(|stats| stats.and_then(|s| Some(s.null_count())))
-        .map(|stats| stats.map(|s| s.null_count()))
-        .collect::<Vec<Option<u64>>>();
-
-    // get min and max values
-    let parquet_stats_with_min_max = statistics
-        .rg_statistics
-        .iter()
-        .map(|rg| rg.get_statistics())
-        .collect::<Vec<_>>();
-
-    let mins = parquet_stats_with_min_max
-        .iter()
-        .map(|stats| {
-            stats.and_then(|s| {
-                let stats = ParquetColumnRowGroupStatistics::try_as_i64(s)?;
-                Some(*stats.min())
-            })
-        })
-        .collect::<Vec<Option<i64>>>();
-
-    let maxs = parquet_stats_with_min_max
-        .iter()
-        .map(|stats| {
-            stats.and_then(|s| {
-                let stats = ParquetColumnRowGroupStatistics::try_as_i64(s)?;
-                Some(*stats.max())
-            })
-        })
-        .collect::<Vec<Option<i64>>>();
-
-    Ok(ArrowStatistics {
-        min: Arc::new(Int64Array::from(mins)),
-        max: Arc::new(Int64Array::from(maxs)),
-        row_count: Arc::new(UInt64Array::from(row_count)),
-        null_count: Arc::new(UInt64Array::from(null_counts)),
-    })
-}
-
-/// All row group statistics of a file for a column
-pub struct ParquetColumnStatistics<'a> {
-    // todo: do we need this?
-    // arrow column schema
-    // column_schema: &'a FieldRef,
-    _column_name: &'a str, // todo: do we need this?
-    rg_statistics: Vec<ParquetColumnRowGroupStatistics<'a>>,
-}
-
-/// Row group statistics of a column
-pub struct ParquetColumnRowGroupStatistics<'a> {
-    row_count: u64,
-    statistics: Option<&'a ParquetStatistics>,
-}
-
-impl<'a> ParquetColumnRowGroupStatistics<'a> {
-    /// Create a new instance of `ParquetColumnRowGroupStatistics`
-    pub fn new(row_count: u64, statistics: Option<&'a ParquetStatistics>) -> Self {
-        Self {
-            row_count,
-            statistics,
-        }
-    }
-
-    /// Return statistics if it exists and has min max
-    /// Otherwise return None
-    pub fn get_statistics(&self) -> Option<&'a ParquetStatistics> {
-        let stats = self.statistics?;
-        if stats.has_min_max_set() {
-            Some(stats)
-        } else {
-            None
-        }
-    }
-
-    /// Return the statistics as ValueStatistics if the column is i64
-    /// Otherwise return None
-    fn try_as_i64(stats: &'a ParquetStatistics) -> Option<&'a ValueStatistics<i64>> {
-        if let parquet::file::statistics::Statistics::Int64(statistics) = stats {
-            Some(statistics)
-        } else {
-            None
-        }
-    }
-}
-
-impl<'a> ParquetColumnStatistics<'a> {
-    /// Create a new instance of `ParquetColumnStatistics`
-    pub fn new(
-        _column_name: &'a str,
-        rg_statistics: Vec<ParquetColumnRowGroupStatistics<'a>>,
-    ) -> Self {
-        Self {
-            _column_name,
-            rg_statistics,
-        }
-    }
-
-    /// Extract statistics of all columns from a parquet file metadata
-    pub fn from_parquet_statistics(
-        reader: &'a ParquetRecordBatchReaderBuilder<File>,
-    ) -> Result<Vec<Self>> {
-        // Get metadata & schemas
-        let metadata = reader.metadata();
-        let parquet_schema = reader.parquet_schema();
-        let arrow_schema = reader.schema();
-
-        // Get column names from arrow schema & their indexes in the parquet schema
-        let columns = arrow_schema
-            .fields()
-            .iter()
-            .map(|f| {
-                let col_name = f.name();
-                let col_idx = parquet_column(parquet_schema, arrow_schema, col_name);
-                match col_idx {
-                    Some(idx) => Ok((col_name, idx)),
-                    None => Err(DataFusionError::Internal(format!(
-                        "Column {} in Arrow schema not found in Parquet schema",
-                        col_name,
-                    ))),
-                }
-            })
-            .collect::<Result<Vec<_>, _>>()?;
-
-        // Get statistics for each column
-        let col_stats = columns
-            .iter()
-            .map(|(col_name, col_idx)| {
-                let rg_statistics = metadata
-                    .row_groups()
-                    .iter()
-                    .map(|rg_meta| {
-                        let row_count = rg_meta.num_rows() as u64;
-                        let statistics = rg_meta.column(col_idx.0).statistics();
-                        ParquetColumnRowGroupStatistics::new(row_count, statistics)
-                    })
-                    .collect::<Vec<_>>();
-
-                ParquetColumnStatistics::new(col_name, rg_statistics)
-            })
-            .collect::<Vec<_>>();
-
-        Ok(col_stats)
-    }
-}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index a9e340cce1a8..83fed1a9caab 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -63,15 +63,14 @@ use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties};
 use parquet::schema::types::ColumnDescriptor;
 use tokio::task::JoinSet;
 
-pub mod arrow_statistics;
 mod metrics;
 mod page_filter;
 mod row_filter;
 mod row_groups;
 mod statistics;
 
-// pub use arrow_statistics::ParquetColumnStatistics;
 pub use metrics::ParquetFileMetrics;
+pub use statistics::{RequestedStatistics, StatisticsConverter};
 
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 8972c261b14a..0ebf7dfe2384 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -20,11 +20,15 @@
 // TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328
 
 use arrow::{array::ArrayRef, datatypes::DataType};
-use arrow_array::new_empty_array;
-use arrow_schema::{FieldRef, Schema};
-use datafusion_common::{Result, ScalarValue};
+use arrow_array::{new_empty_array, new_null_array, UInt64Array};
+use arrow_schema::{Field, FieldRef, Schema};
+use datafusion_common::{
+    internal_datafusion_err, internal_err, plan_err, Result, ScalarValue,
+};
+use parquet::file::metadata::ParquetMetaData;
 use parquet::file::statistics::Statistics as ParquetStatistics;
 use parquet::schema::types::SchemaDescriptor;
+use std::sync::Arc;
 
 // Convert the bytes array to i128.
 // The endian of the input bytes array must be big-endian.
@@ -210,6 +214,152 @@ fn collect_scalars<I: Iterator<Item = Option<ScalarValue>>>(
 }
 
+/// What type of statistics should be extracted?
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub enum RequestedStatistics {
+    /// Minimum Value
+    Min,
+    /// Maximum Value
+    Max,
+    /// Null Count, returned as a [`UInt64Array`]
+    NullCount,
+}
+
+/// Extracts Parquet statistics as Arrow arrays
+///
+/// This is used to convert Parquet statistics to Arrow arrays, with proper
+/// type conversions. This information can be used for pruning parquet files
+/// or row groups based on the statistics embedded in parquet files.
+///
+/// # Schemas
+///
+/// The schema of the parquet file and the arrow schema are used to convert
+/// the underlying statistics value (stored as a parquet value) into the
+/// corresponding Arrow value. For example, Decimals are stored as binary in
+/// parquet files.
+///
+/// The parquet schema and arrow schema do not have to be identical (for
+/// example, the columns may be in different orders and one or the other
+/// schemas may have additional columns). The function [`parquet_column`] is
+/// used to match the column in the parquet file to the column in the arrow
+/// schema.
+///
+/// # Multiple parquet files
+///
+/// This API is designed to support efficiently extracting statistics from
+/// multiple parquet files (hence why the parquet schema is passed in as an
+/// argument). This is useful when building an index for a directory of
+/// parquet files.
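+///
+/// # Example (sketch)
+///
+/// Given the [`ParquetMetaData`] from a parquet footer and the corresponding
+/// Arrow [`Schema`], the per row group minimum values of a column named
+/// "col" might be extracted like this (`arrow_schema` and `metadata` are
+/// assumed to be obtained elsewhere):
+///
+/// ```no_run
+/// # use arrow_schema::Schema;
+/// # use parquet::file::metadata::ParquetMetaData;
+/// # use datafusion::datasource::physical_plan::parquet::{
+/// #     RequestedStatistics, StatisticsConverter,
+/// # };
+/// # fn example(arrow_schema: &Schema, metadata: &ParquetMetaData) {
+/// // one entry per row group: the min of "col", or null if it is unknown
+/// let mins =
+///     StatisticsConverter::try_new("col", RequestedStatistics::Min, arrow_schema)
+///         .unwrap()
+///         .extract(metadata)
+///         .unwrap();
+/// println!("minimums: {mins:?}");
+/// # }
+/// ```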
+///
+#[derive(Debug)]
+pub struct StatisticsConverter<'a> {
+    /// The name of the column to extract statistics for
+    column_name: &'a str,
+    /// The type of statistics to extract
+    statistics_type: RequestedStatistics,
+    /// The arrow schema of the query
+    arrow_schema: &'a Schema,
+    /// The field (with data type) of the column in the arrow schema
+    arrow_field: &'a Field,
+}
+
+impl<'a> StatisticsConverter<'a> {
+    /// Returns a [`UInt64Array`] with row counts for each row group
+    ///
+    /// The returned array has no nulls, and has one value for each row group.
+    /// Each value is the number of rows in the row group.
+    pub fn row_counts(metadata: &ParquetMetaData) -> Result<UInt64Array> {
+        let row_groups = metadata.row_groups();
+        let mut builder = UInt64Array::builder(row_groups.len());
+        for row_group in row_groups {
+            let row_count = row_group.num_rows();
+            let row_count: u64 = row_count.try_into().map_err(|e| {
+                internal_datafusion_err!(
+                    "Parquet row count {row_count} too large to convert to u64: {e}"
+                )
+            })?;
+            builder.append_value(row_count);
+        }
+        Ok(builder.finish())
+    }
+
+    /// Create a new statistics converter
+    pub fn try_new(
+        column_name: &'a str,
+        statistics_type: RequestedStatistics,
+        arrow_schema: &'a Schema,
+    ) -> Result<Self> {
+        // ensure the requested column is in the arrow schema
+        let Some((_idx, arrow_field)) = arrow_schema.column_with_name(column_name) else {
+            return plan_err!(
+                "Column '{}' not found in schema for statistics conversion",
+                column_name
+            );
+        };
+
+        Ok(Self {
+            column_name,
+            statistics_type,
+            arrow_schema,
+            arrow_field,
+        })
+    }
+
+    /// Extract the statistics from a parquet file, given the parquet file's metadata
+    ///
+    /// The returned array contains 1 value for each row group in the parquet
+    /// file, in order
+    ///
+    /// Each value is either
+    /// * the requested statistics type for the column
+    /// * a null value, if the statistics cannot be extracted
+    ///
+    /// Note that a null value does NOT mean the min or max value was actually
+    /// `null`; it means the requested statistic is unknown
+    ///
+    /// Reasons for not being able to extract the statistics include:
+    /// * the column is not present in the parquet file
+    /// * statistics for the column are not present in the row group
+    /// * the stored statistic value cannot be converted to the requested type
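+    ///
+    /// # Example (sketch)
+    ///
+    /// For an Int64 column, the returned array can be downcast to an
+    /// `Int64Array` to inspect which row groups had usable statistics
+    /// (`converter` and `metadata` are assumed to be built elsewhere):
+    ///
+    /// ```no_run
+    /// # use arrow_array::{cast::AsArray, types::Int64Type};
+    /// # use parquet::file::metadata::ParquetMetaData;
+    /// # use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
+    /// # fn example(converter: StatisticsConverter<'_>, metadata: &ParquetMetaData) {
+    /// let mins = converter.extract(metadata).unwrap();
+    /// // `None` entries are row groups whose minimum is unknown
+    /// for min in mins.as_primitive::<Int64Type>() {
+    ///     println!("row group min: {min:?}");
+    /// }
+    /// # }
+    /// ```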
+    pub fn extract(&self, metadata: &ParquetMetaData) -> Result<ArrayRef> {
+        let data_type = self.arrow_field.data_type();
+        let num_row_groups = metadata.row_groups().len();
+
+        let parquet_schema = metadata.file_metadata().schema_descr();
+        let row_groups = metadata.row_groups();
+
+        // find the column in the parquet schema; if it is not present,
+        // return a null array
+        let Some((parquet_idx, matched_field)) =
+            parquet_column(parquet_schema, self.arrow_schema, self.column_name)
+        else {
+            // column was in the arrow schema but not in the parquet schema,
+            // so return a null array
+            return Ok(new_null_array(data_type, num_row_groups));
+        };
+
+        // sanity check that the matched field matches the arrow field
+        if matched_field.as_ref() != self.arrow_field {
+            return internal_err!(
+                "Matched column '{:?}' does not match original column '{:?}'",
+                matched_field,
+                self.arrow_field
+            );
+        }
+
+        // Get an iterator over the column statistics
+        let iter = row_groups
+            .iter()
+            .map(|x| x.column(parquet_idx).statistics());
+
+        match self.statistics_type {
+            RequestedStatistics::Min => min_statistics(data_type, iter),
+            RequestedStatistics::Max => max_statistics(data_type, iter),
+            RequestedStatistics::NullCount => {
+                let null_counts = iter.map(|stats| stats.map(|s| s.null_count()));
+                Ok(Arc::new(UInt64Array::from_iter(null_counts)))
+            }
+        }
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index 458d4222770b..2ccd12a27dd8 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -23,8 +23,8 @@ use std::sync::Arc;
 use arrow_array::{make_array, Array, ArrayRef, Int64Array, RecordBatch, UInt64Array};
 use arrow_schema::{DataType, Field, Schema};
 
-use datafusion::datasource::physical_plan::parquet::arrow_statistics::{
-    self, parquet_stats_to_arrow,
+use datafusion::datasource::physical_plan::parquet::{
+    RequestedStatistics, StatisticsConverter,
 };
 use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderBuilder};
 use parquet::arrow::ArrowWriter;
@@ -99,46 +99,90 @@ pub fn parquet_file(
     ArrowReaderBuilder::try_new(file).unwrap()
 }
 
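+/// Test fixture: extracts the min, max, null count, and row count statistics
+/// for the "i64" column of the parquet file in `reader` using
+/// [`StatisticsConverter`], and asserts they match the expected arrays
+/// (one entry per row group).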
+struct Test {
+    reader: ParquetRecordBatchReaderBuilder<File>,
+    expected_min: ArrayRef,
+    expected_max: ArrayRef,
+    expected_null_counts: UInt64Array,
+    expected_row_counts: UInt64Array,
+}
+
+impl Test {
+    fn run(self) {
+        let Self {
+            reader,
+            expected_min,
+            expected_max,
+            expected_null_counts,
+            expected_row_counts,
+        } = self;
+
+        let min = StatisticsConverter::try_new(
+            "i64",
+            RequestedStatistics::Min,
+            reader.schema(),
+        )
+        .unwrap()
+        .extract(reader.metadata())
+        .unwrap();
+        assert_eq!(&min, &expected_min, "Mismatch with expected minimums");
+
+        let max = StatisticsConverter::try_new(
+            "i64",
+            RequestedStatistics::Max,
+            reader.schema(),
+        )
+        .unwrap()
+        .extract(reader.metadata())
+        .unwrap();
+        assert_eq!(&max, &expected_max, "Mismatch with expected maximums");
+
+        let null_counts = StatisticsConverter::try_new(
+            "i64",
+            RequestedStatistics::NullCount,
+            reader.schema(),
+        )
+        .unwrap()
+        .extract(reader.metadata())
+        .unwrap();
+        let expected_null_counts = Arc::new(expected_null_counts) as ArrayRef;
+        assert_eq!(
+            &null_counts, &expected_null_counts,
+            "Mismatch with expected null counts"
+        );
+
+        let row_counts = StatisticsConverter::row_counts(reader.metadata()).unwrap();
+        assert_eq!(
+            row_counts, expected_row_counts,
+            "Mismatch with expected row counts"
+        );
+    }
+}
+
 // TESTS
+//
+// Remaining cases:
+// - Create parquet files / metadata with missing statistic values
+// - Create parquet files / metadata with different data types
+// - Create parquet files / metadata with different row group sizes
+// - Using truncated statistics ("exact min value" and "exact max value",
+//   https://docs.rs/parquet/latest/parquet/file/statistics/enum.Statistics.html#method.max_is_exact)
 
 #[tokio::test]
 async fn test_one_row_group_without_null() {
     let row_per_group = 20;
     let reader = parquet_file(0, 4, 7, row_per_group);
-
-    let statistics =
-        arrow_statistics::ParquetColumnStatistics::from_parquet_statistics(&reader)
-            .expect("getting statistics");
-
-    // only 1 column
-    assert_eq!(statistics.len(), 1, "expected 1 column");
-
-    let arrow_stats = parquet_stats_to_arrow(&DataType::Int64, &statistics[0]).unwrap();
-    println!("Arrow statistics:\n {arrow_stats:#?}");
-
-    // only one row group
-    let min = arrow_stats.min();
-    assert_eq!(arrow_stats.min().len(), 1, "expected 1 row group");
-
-    // min is 4
-    let expect = Int64Array::from(vec![4]);
-    let expect = Arc::new(expect) as ArrayRef;
-    assert_eq!(min, &expect);
-
-    // max is 6
-    let expect = Int64Array::from(vec![6]);
-    let expect = Arc::new(expect) as ArrayRef;
-    assert_eq!(arrow_stats.max(), &expect);
-
-    // no nulls
-    let expect = UInt64Array::from(vec![0]);
-    let expect = Arc::new(expect) as ArrayRef;
-    assert_eq!(arrow_stats.null_count(), &expect);
-
-    // 3 rows
-    let expect = UInt64Array::from(vec![3]);
-    let expect = Arc::new(expect) as ArrayRef;
-    assert_eq!(arrow_stats.row_count(), &expect);
+    Test {
+        reader,
+        // min is 4
+        expected_min: Arc::new(Int64Array::from(vec![4])),
+        // max is 6
+        expected_max: Arc::new(Int64Array::from(vec![6])),
+        // no nulls
+        expected_null_counts: UInt64Array::from(vec![0]),
+        // 3 rows
+        expected_row_counts: UInt64Array::from(vec![3]),
+    }
+    .run()
 }
 
 #[tokio::test]
@@ -146,39 +190,18 @@ async fn test_one_row_group_with_null_and_negative() {
     let row_per_group = 20;
     let reader = parquet_file(2, -1, 5, row_per_group);
 
-    let statistics =
-        arrow_statistics::ParquetColumnStatistics::from_parquet_statistics(&reader)
-            .expect("getting statistics");
-
-    // only 1 column
-    assert_eq!(statistics.len(), 1, "expected 1 column");
-
-    let arrow_stats = parquet_stats_to_arrow(&DataType::Int64, &statistics[0]).unwrap();
-    println!("Arrow statistics:\n {arrow_stats:#?}");
-
-    // only one row group
-    let min = arrow_stats.min();
-    assert_eq!(arrow_stats.min().len(), 1, "expected 1 row group");
-
-    // min is -1
-    let expect = Int64Array::from(vec![-1]);
-    let expect = Arc::new(expect) as ArrayRef;
-    assert_eq!(min, &expect);
-
-    // max is 4
-    let expect = Int64Array::from(vec![4]);
-    let expect = Arc::new(expect) as ArrayRef;
-    assert_eq!(arrow_stats.max(), &expect);
-
-    // 2 nulls
-    let expect = UInt64Array::from(vec![2]);
-    let expect = Arc::new(expect) as ArrayRef;
-    assert_eq!(arrow_stats.null_count(), &expect);
-
-    // 8 rows
-    let expect = UInt64Array::from(vec![8]);
-    let expect = Arc::new(expect) as ArrayRef;
-    assert_eq!(arrow_stats.row_count(), &expect);
+    Test {
+        reader,
+        // min is -1
+        expected_min: Arc::new(Int64Array::from(vec![-1])),
+        // max is 4
+        expected_max: Arc::new(Int64Array::from(vec![4])),
+        // 2 nulls
+        expected_null_counts: UInt64Array::from(vec![2]),
+        // 8 rows
+        expected_row_counts: UInt64Array::from(vec![8]),
+    }
+    .run()
 }
 
 #[tokio::test]
@@ -186,42 +209,18 @@ async fn test_two_row_group_with_null() {
     let row_per_group = 10;
    let reader = parquet_file(2, 4, 17, row_per_group);
 
-    let statistics =
-        arrow_statistics::ParquetColumnStatistics::from_parquet_statistics(&reader)
-            .expect("getting statistics");
-
-    // only 1 column
-    assert_eq!(statistics.len(), 1, "expected 1 column");
-
-    let arrow_stats = parquet_stats_to_arrow(&DataType::Int64, &statistics[0]).unwrap();
-    println!("Arrow statistics:\n {arrow_stats:#?}");
-
-    // 2 row groups
-    let mins = arrow_stats.min();
-    assert_eq!(mins.len(), 2, "expected 2 row groups");
-
-    // mins are [4, 14]
-    let expect = Int64Array::from(vec![4, 14]);
-    let expect = Arc::new(expect) as ArrayRef;
-    assert_eq!(mins, &expect);
-
-    // maxs are [13, 16]
-    let maxs = arrow_stats.max();
-    let expect = Int64Array::from(vec![13, 16]);
-    let expect = Arc::new(expect) as ArrayRef;
-    assert_eq!(maxs, &expect);
-
-    // nuls are [0, 2]
-    let nulls = arrow_stats.null_count();
-    let expect = UInt64Array::from(vec![0, 2]);
-    let expect = Arc::new(expect) as ArrayRef;
-    assert_eq!(nulls, &expect);
-
-    // row counts are [10, 5]
-    let row_counts = arrow_stats.row_count();
-    let expect = UInt64Array::from(vec![10, 5]);
-    let expect = Arc::new(expect) as ArrayRef;
-    assert_eq!(row_counts, &expect);
+    Test {
+        reader,
+        // mins are [4, 14]
+        expected_min: Arc::new(Int64Array::from(vec![4, 14])),
+        // maxes are [13, 16]
+        expected_max: Arc::new(Int64Array::from(vec![13, 16])),
+        // nulls are [0, 2]
+        expected_null_counts: UInt64Array::from(vec![0, 2]),
+        // row counts are [10, 5]
+        expected_row_counts: UInt64Array::from(vec![10, 5]),
+    }
+    .run()
 }
 
 #[tokio::test]
@@ -229,42 +228,16 @@ async fn test_two_row_groups_with_all_nulls_in_one() {
     let row_per_group = 5;
     let reader = parquet_file(4, -2, 2, row_per_group);
 
-    let statistics =
-        arrow_statistics::ParquetColumnStatistics::from_parquet_statistics(&reader)
-            .expect("getting statistics");
-
-    // only 1 column
-    assert_eq!(statistics.len(), 1, "expected 1 column");
-
-    let arrow_stats = parquet_stats_to_arrow(&DataType::Int64, &statistics[0]).unwrap();
-    println!("Arrow statistics:\n {arrow_stats:#?}");
-
-    // 2 row groups
-    let mins = arrow_stats.min();
-    assert_eq!(mins.len(), 2, "expected 2 row groups");
-
-    // mins are [-2, null]
-    assert!(mins.is_null(1));
-    // check the first value -2
-    // let expect = Int64Array::from(vec![-2]);
-    // let expect = Arc::new(expect) as ArrayRef;
-    // assert_eq!(mins, &expect);
-
-    // maxs are [1, null
-    assert!(arrow_stats.max().is_null(1));
-    // let expect = Int64Array::from(vec![1]);
-    // let expect = Arc::new(expect) as ArrayRef;
-    // assert_eq!(arrow_stats.max(), &expect);
-
-    // nuls are [1, 3]
-    let nulls = arrow_stats.null_count();
-    let expect = UInt64Array::from(vec![1, 3]);
-    let expect = Arc::new(expect) as ArrayRef;
-    assert_eq!(nulls, &expect);
-
-    // row counts are [5, 3]
-    let row_counts = arrow_stats.row_count();
-    let expect = UInt64Array::from(vec![5, 3]);
-    let expect = Arc::new(expect) as ArrayRef;
-    assert_eq!(row_counts, &expect);
+    Test {
+        reader,
+        // mins are [-2, null]
+        expected_min: Arc::new(Int64Array::from(vec![Some(-2), None])),
+        // maxes are [1, null]
+        expected_max: Arc::new(Int64Array::from(vec![Some(1), None])),
+        // nulls are [1, 3]
+        expected_null_counts: UInt64Array::from(vec![1, 3]),
+        // row counts are [5, 3]
+        expected_row_counts: UInt64Array::from(vec![5, 3]),
+    }
+    .run()
 }
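
For reference, a minimal end-to-end sketch of how the new `StatisticsConverter` API fits together, mirroring the test harness above (the file path `data.parquet` and the column name `i64` are placeholders):

```rust
use std::fs::File;

use datafusion::datasource::physical_plan::parquet::{
    RequestedStatistics, StatisticsConverter,
};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn main() -> datafusion::error::Result<()> {
    // Building the reader only decodes the footer, not the row data
    let file = File::open("data.parquet")?;
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // One entry per row group: the minimum of column "i64" in that group,
    // or null if the statistic is missing or cannot be converted
    let mins =
        StatisticsConverter::try_new("i64", RequestedStatistics::Min, reader.schema())?
            .extract(reader.metadata())?;

    // Row counts come from a separate entry point since they are always known
    let row_counts = StatisticsConverter::row_counts(reader.metadata())?;

    println!("mins: {mins:?}");
    println!("row counts: {row_counts:?}");
    Ok(())
}
```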