From ed6a0f38c84941a760060abba9a1f14b93410f64 Mon Sep 17 00:00:00 2001
From: marvinlanhenke
Date: Mon, 10 Jun 2024 09:23:02 +0200
Subject: [PATCH 01/22] feat: enable page statistics

---
 datafusion/core/tests/parquet/arrow_statistics.rs | 7 +++++--
 datafusion/core/tests/parquet/mod.rs              | 3 ++-
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index 0e23e6824027..0d964c6e94fe 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -40,7 +40,9 @@ use arrow_array::{
 use arrow_schema::{DataType, Field, Schema};
 use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
 use half::f16;
-use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderBuilder};
+use parquet::arrow::arrow_reader::{
+    ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
+};
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::{EnabledStatistics, WriterProperties};
 
@@ -159,7 +161,8 @@ impl TestReader {
 
         // open the file & get the reader
         let file = file.reopen().unwrap();
-        ArrowReaderBuilder::try_new(file).unwrap()
+        let options = ArrowReaderOptions::new().with_page_index(true);
+        ArrowReaderBuilder::try_new_with_options(file, options).unwrap()
     }
 }
 
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 5ab268beb92f..43d2da4efca5 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -49,7 +49,7 @@ use datafusion::{
 use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
 use half::f16;
 use parquet::arrow::ArrowWriter;
-use parquet::file::properties::WriterProperties;
+use parquet::file::properties::{EnabledStatistics, WriterProperties};
 use std::sync::Arc;
 use tempfile::NamedTempFile;
 
@@ -1427,6 +1427,7 @@ async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTem
     let props = WriterProperties::builder()
         .set_max_row_group_size(row_per_group)
         .set_bloom_filter_enabled(true)
+        .set_statistics_enabled(EnabledStatistics::Page)
         .build();
 
     let batches = create_data_batch(scenario);

From e4729e8bd0419ba766b6bdf78b610b9c92761240 Mon Sep 17 00:00:00 2001
From: marvinlanhenke
Date: Mon, 10 Jun 2024 10:38:46 +0200
Subject: [PATCH 02/22] feat: prototype int64 data_page_min

---
 .../physical_plan/parquet/statistics.rs           | 58 ++++++++++++++++++-
 datafusion/core/tests/parquet/arrow_statistics.rs | 20 +++++++
 2 files changed, 75 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index a4a919f20d0f..5e070d784f41 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -33,7 +33,8 @@ use arrow_array::{
 use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
 use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
 use half::f16;
-use parquet::file::metadata::RowGroupMetaData;
+use parquet::file::metadata::{ParquetColumnIndex, RowGroupMetaData};
+use parquet::file::page_index::index::Index;
 use parquet::file::statistics::Statistics as ParquetStatistics;
 use parquet::schema::types::SchemaDescriptor;
 use paste::paste;
@@ -558,6 +559,30 @@ pub(crate) fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
     data_type: &DataType,
     iterator: I,
 ) -> Result<ArrayRef> {
     get_statistics!(Max, data_type, iterator)
 }
 
+/// Extracts the min statistics from an iterator
+/// of parquet page [`Index`]'es to an [`ArrayRef`]
+pub(crate) fn min_page_statistics<'a, I>(
+    data_type: Option<&DataType>,
+    iterator: I,
+) -> Result<ArrayRef>
+where
+    I: Iterator<Item = &'a Index>,
+{ + let iter = iterator.flat_map(|index| match data_type { + Some(DataType::Int64) => match index { + Index::INT64(native_index) => native_index + .indexes + .iter() + .map(|x| x.min) + .collect::>(), + _ => vec![None], + }, + _ => unimplemented!(), + }); + + Ok(Arc::new(Int64Array::from_iter(iter))) +} + /// Extracts Parquet statistics as Arrow arrays /// /// This is used to convert Parquet statistics to Arrow arrays, with proper type @@ -766,10 +791,37 @@ impl<'a> StatisticsConverter<'a> { Ok(Arc::new(UInt64Array::from_iter(null_counts))) } + /// TODO: Docstring + pub fn data_page_mins( + &self, + column_index: &ParquetColumnIndex, + row_group_indices: I, + ) -> Result + where + I: IntoIterator, + { + let data_type = self.arrow_field.data_type(); + + let Some(parquet_index) = self.parquet_index else { + return Ok(self.make_null_array(data_type, row_group_indices)); + }; + + // TODO: Move into docstring + // Creates an iterator over each row group index + // that yields a `page_index::index::Index`. + // An `Index` contains a `NativeIndex` with a `Vec`, + // where each `PageIndex` contains information about the page's + // statistics (e.g. min, max, or null_count). + let iter = row_group_indices + .into_iter() + .map(|rg_index| &column_index[*rg_index][parquet_index]); + min_page_statistics(Some(data_type), iter) + } + /// Returns a null array of data_type with one element per row group - fn make_null_array(&self, data_type: &DataType, metadatas: I) -> ArrayRef + fn make_null_array(&self, data_type: &DataType, metadatas: I) -> ArrayRef where - I: IntoIterator, + I: IntoIterator, { // column was in the arrow schema but not in the parquet schema, so return a null array let num_row_groups = metadatas.into_iter().count(); diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index 0d964c6e94fe..12d72f613377 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -196,7 +196,18 @@ impl<'a> Test<'a> { ) .unwrap(); + let colum_index = reader + .metadata() + .column_index() + .expect("File should have column indices"); + let row_groups = reader.metadata().row_groups(); + let row_group_indices = row_groups + .iter() + .enumerate() + .map(|(i, _)| i) + .collect::>(); + let min = converter.row_group_mins(row_groups).unwrap(); assert_eq!( @@ -204,6 +215,15 @@ impl<'a> Test<'a> { "{column_name}: Mismatch with expected minimums" ); + let min = converter + .data_page_mins(colum_index, &row_group_indices) + .unwrap(); + + assert_eq!( + &min, &expected_min, + "{column_name}: Mismatch with expected minimums" + ); + let max = converter.row_group_maxes(row_groups).unwrap(); assert_eq!( &max, &expected_max, From 5e6280ab6eafcd4e059c5bd0bf6d15c9da402d98 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 10 Jun 2024 11:40:11 +0200 Subject: [PATCH 03/22] feat: prototype MinInt64DataPageStatsIterator --- .../physical_plan/parquet/statistics.rs | 69 ++++++++++++++++--- 1 file changed, 58 insertions(+), 11 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 5e070d784f41..6c183ab0e94e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -568,19 +568,66 @@ pub(crate) fn min_page_statistics<'a, I>( where I: Iterator, { - let iter = iterator.flat_map(|index| match 
data_type { - Some(DataType::Int64) => match index { - Index::INT64(native_index) => native_index - .indexes - .iter() - .map(|x| x.min) - .collect::>(), - _ => vec![None], - }, + // TODO: Prototype + // This should be extracted and abstracted into + // macro (similar to get_statistics) + match data_type { + Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter( + MinInt64DataPageStatsIterator::new(iterator).flatten(), + ))), + // TODO: Implement missing data_types _ => unimplemented!(), - }); + } +} + +// TODO: Prototype +// This should be extracted and abstracted into +// macro (similar to make_stats_iterator) +struct MinInt64DataPageStatsIterator<'a, I> +where + I: Iterator, +{ + iter: I, +} - Ok(Arc::new(Int64Array::from_iter(iter))) +impl<'a, I> MinInt64DataPageStatsIterator<'a, I> +where + I: Iterator, +{ + fn new(iter: I) -> Self { + Self { iter } + } +} + +impl<'a, I> Iterator for MinInt64DataPageStatsIterator<'a, I> +where + I: Iterator, +{ + type Item = Vec>; + + fn next(&mut self) -> Option { + let next = self.iter.next(); + match next { + Some(index) => match index { + Index::INT64(native_index) => Some( + native_index + .indexes + .iter() + .map(|x| x.min) + .collect::>(), + ), + // No matching `Index` found. Thus no statistics + // that can be extracted. We return vec![None] to effectively + // create an arrow null-array. + _ => Some(vec![None]), + }, + _ => None, + } + } + + fn size_hint(&self) -> (usize, Option) { + self.iter.size_hint() + } } /// Extracts Parquet statistics as Arrow arrays From 5293d298c5a4e1e2fa68c43b5dd6f7e5b1f13ba6 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 10 Jun 2024 13:39:23 +0200 Subject: [PATCH 04/22] feat: add make_data_page_stats_iterator macro --- .../physical_plan/parquet/statistics.rs | 94 ++++++++++--------- 1 file changed, 49 insertions(+), 45 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 6c183ab0e94e..cda6faf709d4 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -580,56 +580,60 @@ where } } -// TODO: Prototype -// This should be extracted and abstracted into -// macro (similar to make_stats_iterator) -struct MinInt64DataPageStatsIterator<'a, I> -where - I: Iterator, -{ - iter: I, -} - -impl<'a, I> MinInt64DataPageStatsIterator<'a, I> -where - I: Iterator, -{ - fn new(iter: I) -> Self { - Self { iter } - } -} +macro_rules! make_data_page_stats_iterator { + ($iterator_type: ident, $func: ident, $index_type: path, $stat_value_type: ty) => { + struct $iterator_type<'a, I> + where + I: Iterator, + { + iter: I, + } -impl<'a, I> Iterator for MinInt64DataPageStatsIterator<'a, I> -where - I: Iterator, -{ - type Item = Vec>; - - fn next(&mut self) -> Option { - let next = self.iter.next(); - match next { - Some(index) => match index { - Index::INT64(native_index) => Some( - native_index - .indexes - .iter() - .map(|x| x.min) - .collect::>(), - ), - // No matching `Index` found. Thus no statistics - // that can be extracted. We return vec![None] to effectively - // create an arrow null-array. 
- _ => Some(vec![None]), - }, - _ => None, + impl<'a, I> $iterator_type<'a, I> + where + I: Iterator, + { + fn new(iter: I) -> Self { + Self { iter } + } } - } - fn size_hint(&self) -> (usize, Option) { - self.iter.size_hint() - } + impl<'a, I> Iterator for $iterator_type<'a, I> + where + I: Iterator, + { + type Item = Vec>; + + fn next(&mut self) -> Option { + let next = self.iter.next(); + match next { + Some(index) => match index { + $index_type(native_index) => Some( + native_index + .indexes + .iter() + .map(|x| x.$func) + .collect::>(), + ), + // No matching `Index` found. + // Thus no statistics that can be extracted. + // We return vec![None] to effectively + // create an arrow null-array. + _ => Some(vec![None]), + }, + _ => None, + } + } + + fn size_hint(&self) -> (usize, Option) { + self.iter.size_hint() + } + } + }; } +make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min, Index::INT64, i64); + /// Extracts Parquet statistics as Arrow arrays /// /// This is used to convert Parquet statistics to Arrow arrays, with proper type From c771504a0ada859ebf5a3fae73591d288c1c2d82 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 10 Jun 2024 13:54:55 +0200 Subject: [PATCH 05/22] feat: add get_data_page_statistics macro --- .../physical_plan/parquet/statistics.rs | 126 +++++++++--------- 1 file changed, 64 insertions(+), 62 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index cda6faf709d4..ecc9a63f2c95 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -518,68 +518,6 @@ macro_rules! get_statistics { }}} } -/// Lookups up the parquet column by name -/// -/// Returns the parquet column index and the corresponding arrow field -pub(crate) fn parquet_column<'a>( - parquet_schema: &SchemaDescriptor, - arrow_schema: &'a Schema, - name: &str, -) -> Option<(usize, &'a FieldRef)> { - let (root_idx, field) = arrow_schema.fields.find(name)?; - if field.data_type().is_nested() { - // Nested fields are not supported and require non-trivial logic - // to correctly walk the parquet schema accounting for the - // logical type rules - - // - // For example a ListArray could correspond to anything from 1 to 3 levels - // in the parquet schema - return None; - } - - // This could be made more efficient (#TBD) - let parquet_idx = (0..parquet_schema.columns().len()) - .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?; - Some((parquet_idx, field)) -} - -/// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`] -pub(crate) fn min_statistics<'a, I: Iterator>>( - data_type: &DataType, - iterator: I, -) -> Result { - get_statistics!(Min, data_type, iterator) -} - -/// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`] -pub(crate) fn max_statistics<'a, I: Iterator>>( - data_type: &DataType, - iterator: I, -) -> Result { - get_statistics!(Max, data_type, iterator) -} - -/// Extracts the min statistics from an iterator -/// of parquet page [`Index`]'es to an [`ArrayRef`] -pub(crate) fn min_page_statistics<'a, I>( - data_type: Option<&DataType>, - iterator: I, -) -> Result -where - I: Iterator, -{ - // TODO: Prototype - // This should be extracted and abstracted into - // macro (similar to get_statistics) - match data_type { - Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter( - 
MinInt64DataPageStatsIterator::new(iterator).flatten(), - ))), - // TODO: Implement missing data_types - _ => unimplemented!(), - } -} - macro_rules! make_data_page_stats_iterator { ($iterator_type: ident, $func: ident, $index_type: path, $stat_value_type: ty) => { struct $iterator_type<'a, I> @@ -634,6 +572,70 @@ macro_rules! make_data_page_stats_iterator { make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min, Index::INT64, i64); +macro_rules! get_data_page_statistics { + ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => { + paste! { + match $data_type { + Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))), + _ => unimplemented!() + } + } + } +} + +/// Lookups up the parquet column by name +/// +/// Returns the parquet column index and the corresponding arrow field +pub(crate) fn parquet_column<'a>( + parquet_schema: &SchemaDescriptor, + arrow_schema: &'a Schema, + name: &str, +) -> Option<(usize, &'a FieldRef)> { + let (root_idx, field) = arrow_schema.fields.find(name)?; + if field.data_type().is_nested() { + // Nested fields are not supported and require non-trivial logic + // to correctly walk the parquet schema accounting for the + // logical type rules - + // + // For example a ListArray could correspond to anything from 1 to 3 levels + // in the parquet schema + return None; + } + + // This could be made more efficient (#TBD) + let parquet_idx = (0..parquet_schema.columns().len()) + .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?; + Some((parquet_idx, field)) +} + +/// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`] +pub(crate) fn min_statistics<'a, I: Iterator>>( + data_type: &DataType, + iterator: I, +) -> Result { + get_statistics!(Min, data_type, iterator) +} + +/// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`] +pub(crate) fn max_statistics<'a, I: Iterator>>( + data_type: &DataType, + iterator: I, +) -> Result { + get_statistics!(Max, data_type, iterator) +} + +/// Extracts the min statistics from an iterator +/// of parquet page [`Index`]'es to an [`ArrayRef`] +pub(crate) fn min_page_statistics<'a, I>( + data_type: Option<&DataType>, + iterator: I, +) -> Result +where + I: Iterator, +{ + get_data_page_statistics!(Min, data_type, iterator) +} + /// Extracts Parquet statistics as Arrow arrays /// /// This is used to convert Parquet statistics to Arrow arrays, with proper type From 72a2d4a5aaea14e5493043db66efa5d3c45e3c2d Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 10 Jun 2024 14:03:10 +0200 Subject: [PATCH 06/22] feat: add MaxInt64DataPageStatsIterator --- .../physical_plan/parquet/statistics.rs | 34 +++++++++++++++++++ .../core/tests/parquet/arrow_statistics.rs | 16 +++++++-- 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index ecc9a63f2c95..2836782d5f6b 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -571,6 +571,7 @@ macro_rules! make_data_page_stats_iterator { } make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min, Index::INT64, i64); +make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max, Index::INT64, i64); macro_rules! 
get_data_page_statistics { ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => { @@ -636,6 +637,18 @@ where get_data_page_statistics!(Min, data_type, iterator) } +/// Extracts the max statistics from an iterator +/// of parquet page [`Index`]'es to an [`ArrayRef`] +pub(crate) fn max_page_statistics<'a, I>( + data_type: Option<&DataType>, + iterator: I, +) -> Result +where + I: Iterator, +{ + get_data_page_statistics!(Max, data_type, iterator) +} + /// Extracts Parquet statistics as Arrow arrays /// /// This is used to convert Parquet statistics to Arrow arrays, with proper type @@ -871,6 +884,27 @@ impl<'a> StatisticsConverter<'a> { min_page_statistics(Some(data_type), iter) } + /// TODO: docstring + pub fn data_page_maxes( + &self, + column_index: &ParquetColumnIndex, + row_group_indices: I, + ) -> Result + where + I: IntoIterator, + { + let data_type = self.arrow_field.data_type(); + + let Some(parquet_index) = self.parquet_index else { + return Ok(self.make_null_array(data_type, row_group_indices)); + }; + + let iter = row_group_indices + .into_iter() + .map(|rg_index| &column_index[*rg_index][parquet_index]); + max_page_statistics(Some(data_type), iter) + } + /// Returns a null array of data_type with one element per row group fn make_null_array(&self, data_type: &DataType, metadatas: I) -> ArrayRef where diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index 12d72f613377..ac5e3af55e49 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -196,7 +196,7 @@ impl<'a> Test<'a> { ) .unwrap(); - let colum_index = reader + let column_index = reader .metadata() .column_index() .expect("File should have column indices"); @@ -216,20 +216,30 @@ impl<'a> Test<'a> { ); let min = converter - .data_page_mins(colum_index, &row_group_indices) + .data_page_mins(column_index, &row_group_indices) .unwrap(); assert_eq!( &min, &expected_min, - "{column_name}: Mismatch with expected minimums" + "{column_name}: Mismatch with expected data page minimums" ); let max = converter.row_group_maxes(row_groups).unwrap(); + assert_eq!( &max, &expected_max, "{column_name}: Mismatch with expected maximum" ); + let max = converter + .data_page_maxes(column_index, &row_group_indices) + .unwrap(); + + assert_eq!( + &max, &expected_max, + "{column_name}: Mismatch with expected data page maximum" + ); + let null_counts = converter.row_group_null_counts(row_groups).unwrap(); let expected_null_counts = Arc::new(expected_null_counts) as ArrayRef; assert_eq!( From 6cedd47193603061e753b70c8ede2e80df8d174e Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 10 Jun 2024 14:25:31 +0200 Subject: [PATCH 07/22] feat: add test_data_page_stats param --- .../core/tests/parquet/arrow_statistics.rs | 134 ++++++++++++++---- 1 file changed, 104 insertions(+), 30 deletions(-) diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index ac5e3af55e49..26948d8c9e88 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -176,6 +176,8 @@ struct Test<'a> { expected_row_counts: UInt64Array, /// Which column to extract statistics from column_name: &'static str, + /// Whether to test data page statistics or not + test_data_page_statistics: bool, } impl<'a> Test<'a> { @@ -187,6 +189,7 @@ impl<'a> Test<'a> { expected_null_counts, expected_row_counts, column_name, + 
test_data_page_statistics, } = self; let converter = StatisticsConverter::try_new( @@ -196,50 +199,20 @@ impl<'a> Test<'a> { ) .unwrap(); - let column_index = reader - .metadata() - .column_index() - .expect("File should have column indices"); - let row_groups = reader.metadata().row_groups(); - let row_group_indices = row_groups - .iter() - .enumerate() - .map(|(i, _)| i) - .collect::>(); let min = converter.row_group_mins(row_groups).unwrap(); - assert_eq!( &min, &expected_min, "{column_name}: Mismatch with expected minimums" ); - let min = converter - .data_page_mins(column_index, &row_group_indices) - .unwrap(); - - assert_eq!( - &min, &expected_min, - "{column_name}: Mismatch with expected data page minimums" - ); - let max = converter.row_group_maxes(row_groups).unwrap(); - assert_eq!( &max, &expected_max, "{column_name}: Mismatch with expected maximum" ); - let max = converter - .data_page_maxes(column_index, &row_group_indices) - .unwrap(); - - assert_eq!( - &max, &expected_max, - "{column_name}: Mismatch with expected data page maximum" - ); - let null_counts = converter.row_group_null_counts(row_groups).unwrap(); let expected_null_counts = Arc::new(expected_null_counts) as ArrayRef; assert_eq!( @@ -257,6 +230,35 @@ impl<'a> Test<'a> { "{column_name}: Mismatch with expected row counts. \ Actual: {row_counts:?}. Expected: {expected_row_counts:?}" ); + + if test_data_page_statistics { + let column_index = reader + .metadata() + .column_index() + .expect("File should have column indices"); + + let row_group_indices = row_groups + .iter() + .enumerate() + .map(|(i, _)| i) + .collect::>(); + + let min = converter + .data_page_mins(column_index, &row_group_indices) + .unwrap(); + assert_eq!( + &min, &expected_min, + "{column_name}: Mismatch with expected data page minimums" + ); + + let max = converter + .data_page_maxes(column_index, &row_group_indices) + .unwrap(); + assert_eq!( + &max, &expected_max, + "{column_name}: Mismatch with expected data page maximum" + ); + } } /// Run the test and expect a column not found error @@ -268,6 +270,7 @@ impl<'a> Test<'a> { expected_null_counts: _, expected_row_counts: _, column_name, + .. 
} = self; let converter = StatisticsConverter::try_new( @@ -301,6 +304,7 @@ async fn test_one_row_group_without_null() { // 3 rows expected_row_counts: UInt64Array::from(vec![3]), column_name: "i64", + test_data_page_statistics: false, } .run() } @@ -321,6 +325,7 @@ async fn test_one_row_group_with_null_and_negative() { // 8 rows expected_row_counts: UInt64Array::from(vec![8]), column_name: "i64", + test_data_page_statistics: false, } .run() } @@ -341,6 +346,7 @@ async fn test_two_row_group_with_null() { // row counts are [10, 5] expected_row_counts: UInt64Array::from(vec![10, 5]), column_name: "i64", + test_data_page_statistics: false, } .run() } @@ -361,6 +367,7 @@ async fn test_two_row_groups_with_all_nulls_in_one() { // row counts are [5, 3] expected_row_counts: UInt64Array::from(vec![5, 3]), column_name: "i64", + test_data_page_statistics: false, } .run() } @@ -392,6 +399,7 @@ async fn test_int_64() { // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "i64", + test_data_page_statistics: true, } .run(); } @@ -417,6 +425,7 @@ async fn test_int_32() { // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "i32", + test_data_page_statistics: false, } .run(); } @@ -457,6 +466,7 @@ async fn test_int_16() { // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "i16", + test_data_page_statistics: false, } .run(); } @@ -485,6 +495,7 @@ async fn test_int_8() { // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "i8", + test_data_page_statistics: false, } .run(); } @@ -534,6 +545,7 @@ async fn test_timestamp() { // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "nanos", + test_data_page_statistics: false, } .run(); @@ -562,6 +574,7 @@ async fn test_timestamp() { // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "nanos_timezoned", + test_data_page_statistics: false, } .run(); @@ -583,6 +596,7 @@ async fn test_timestamp() { expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "micros", + test_data_page_statistics: false, } .run(); @@ -611,6 +625,7 @@ async fn test_timestamp() { // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "micros_timezoned", + test_data_page_statistics: false, } .run(); @@ -632,6 +647,7 @@ async fn test_timestamp() { expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "millis", + test_data_page_statistics: false, } .run(); @@ -660,6 +676,7 @@ async fn test_timestamp() { // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "millis_timezoned", + test_data_page_statistics: false, } .run(); @@ -681,6 +698,7 @@ async fn test_timestamp() { expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "seconds", + test_data_page_statistics: false, } .run(); @@ -709,6 +727,7 @@ async fn test_timestamp() { // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "seconds_timezoned", + test_data_page_statistics: false, } .run(); } @@ -754,6 +773,7 @@ async fn test_timestamp_diff_rg_sizes() { // row counts are [8, 8, 4] 
expected_row_counts: UInt64Array::from(vec![8, 8, 4]), column_name: "nanos", + test_data_page_statistics: false, } .run(); @@ -780,6 +800,7 @@ async fn test_timestamp_diff_rg_sizes() { // row counts are [8, 8, 4] expected_row_counts: UInt64Array::from(vec![8, 8, 4]), column_name: "nanos_timezoned", + test_data_page_statistics: false, } .run(); @@ -799,6 +820,7 @@ async fn test_timestamp_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![1, 2, 1]), expected_row_counts: UInt64Array::from(vec![8, 8, 4]), column_name: "micros", + test_data_page_statistics: false, } .run(); @@ -825,6 +847,7 @@ async fn test_timestamp_diff_rg_sizes() { // row counts are [8, 8, 4] expected_row_counts: UInt64Array::from(vec![8, 8, 4]), column_name: "micros_timezoned", + test_data_page_statistics: false, } .run(); @@ -844,6 +867,7 @@ async fn test_timestamp_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![1, 2, 1]), expected_row_counts: UInt64Array::from(vec![8, 8, 4]), column_name: "millis", + test_data_page_statistics: false, } .run(); @@ -870,6 +894,7 @@ async fn test_timestamp_diff_rg_sizes() { // row counts are [8, 8, 4] expected_row_counts: UInt64Array::from(vec![8, 8, 4]), column_name: "millis_timezoned", + test_data_page_statistics: false, } .run(); @@ -889,6 +914,7 @@ async fn test_timestamp_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![1, 2, 1]), expected_row_counts: UInt64Array::from(vec![8, 8, 4]), column_name: "seconds", + test_data_page_statistics: false, } .run(); @@ -915,6 +941,7 @@ async fn test_timestamp_diff_rg_sizes() { // row counts are [8, 8, 4] expected_row_counts: UInt64Array::from(vec![8, 8, 4]), column_name: "seconds_timezoned", + test_data_page_statistics: false, } .run(); } @@ -952,6 +979,7 @@ async fn test_dates_32_diff_rg_sizes() { // row counts are [13, 7] expected_row_counts: UInt64Array::from(vec![13, 7]), column_name: "date32", + test_data_page_statistics: false, } .run(); } @@ -974,6 +1002,7 @@ async fn test_time32_second_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // Assuming 1 null per row group for simplicity expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), column_name: "second", + test_data_page_statistics: false, } .run(); } @@ -1000,6 +1029,7 @@ async fn test_time32_millisecond_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // Assuming 1 null per row group for simplicity expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), column_name: "millisecond", + test_data_page_statistics: false, } .run(); } @@ -1032,6 +1062,7 @@ async fn test_time64_microsecond_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // Assuming 1 null per row group for simplicity expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), column_name: "microsecond", + test_data_page_statistics: false, } .run(); } @@ -1064,6 +1095,7 @@ async fn test_time64_nanosecond_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // Assuming 1 null per row group for simplicity expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), column_name: "nanosecond", + test_data_page_statistics: false, } .run(); } @@ -1090,6 +1122,7 @@ async fn test_dates_64_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![2, 2]), expected_row_counts: UInt64Array::from(vec![13, 7]), column_name: "date64", + test_data_page_statistics: false, } .run(); } @@ -1132,6 +1165,7 @@ async fn test_interval_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![2, 
2]), expected_row_counts: UInt64Array::from(vec![13, 7]), column_name: "year_month", + test_data_page_statistics: false, } .run(); @@ -1150,6 +1184,7 @@ async fn test_interval_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![2, 2]), expected_row_counts: UInt64Array::from(vec![13, 7]), column_name: "day_time", + test_data_page_statistics: false, } .run(); @@ -1168,6 +1203,7 @@ async fn test_interval_diff_rg_sizes() { expected_null_counts: UInt64Array::from(vec![2, 2]), expected_row_counts: UInt64Array::from(vec![13, 7]), column_name: "month_day_nano", + test_data_page_statistics: false, } .run(); } @@ -1195,6 +1231,7 @@ async fn test_uint() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]), expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]), column_name: "u8", + test_data_page_statistics: false, } .run(); @@ -1205,6 +1242,7 @@ async fn test_uint() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]), expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]), column_name: "u16", + test_data_page_statistics: false, } .run(); @@ -1215,6 +1253,7 @@ async fn test_uint() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]), expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]), column_name: "u32", + test_data_page_statistics: false, } .run(); @@ -1225,6 +1264,7 @@ async fn test_uint() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]), expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]), column_name: "u64", + test_data_page_statistics: false, } .run(); } @@ -1247,6 +1287,7 @@ async fn test_int32_range() { expected_null_counts: UInt64Array::from(vec![0]), expected_row_counts: UInt64Array::from(vec![4]), column_name: "i", + test_data_page_statistics: false, } .run(); } @@ -1269,6 +1310,7 @@ async fn test_uint32_range() { expected_null_counts: UInt64Array::from(vec![0]), expected_row_counts: UInt64Array::from(vec![4]), column_name: "u", + test_data_page_statistics: false, } .run(); } @@ -1290,6 +1332,7 @@ async fn test_numeric_limits_unsigned() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "u8", + test_data_page_statistics: false, } .run(); @@ -1300,6 +1343,7 @@ async fn test_numeric_limits_unsigned() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "u16", + test_data_page_statistics: false, } .run(); @@ -1310,6 +1354,7 @@ async fn test_numeric_limits_unsigned() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "u32", + test_data_page_statistics: false, } .run(); @@ -1320,6 +1365,7 @@ async fn test_numeric_limits_unsigned() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "u64", + test_data_page_statistics: false, } .run(); } @@ -1341,6 +1387,7 @@ async fn test_numeric_limits_signed() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "i8", + test_data_page_statistics: false, } .run(); @@ -1351,6 +1398,7 @@ async fn test_numeric_limits_signed() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "i16", + test_data_page_statistics: false, } .run(); @@ -1361,6 +1409,7 @@ async fn test_numeric_limits_signed() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: 
UInt64Array::from(vec![5, 2]), column_name: "i32", + test_data_page_statistics: false, } .run(); @@ -1371,6 +1420,7 @@ async fn test_numeric_limits_signed() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "i64", + test_data_page_statistics: false, } .run(); } @@ -1392,6 +1442,7 @@ async fn test_numeric_limits_float() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "f32", + test_data_page_statistics: false, } .run(); @@ -1402,6 +1453,7 @@ async fn test_numeric_limits_float() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "f64", + test_data_page_statistics: false, } .run(); @@ -1412,6 +1464,7 @@ async fn test_numeric_limits_float() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "f32_nan", + test_data_page_statistics: false, } .run(); @@ -1422,6 +1475,7 @@ async fn test_numeric_limits_float() { expected_null_counts: UInt64Array::from(vec![0, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "f64_nan", + test_data_page_statistics: false, } .run(); } @@ -1444,6 +1498,7 @@ async fn test_float64() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "f", + test_data_page_statistics: false, } .run(); } @@ -1476,6 +1531,7 @@ async fn test_float16() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "f", + test_data_page_statistics: false, } .run(); } @@ -1506,6 +1562,7 @@ async fn test_decimal() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), column_name: "decimal_col", + test_data_page_statistics: false, } .run(); } @@ -1543,6 +1600,7 @@ async fn test_decimal_256() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), column_name: "decimal256_col", + test_data_page_statistics: false, } .run(); } @@ -1562,6 +1620,7 @@ async fn test_dictionary() { expected_null_counts: UInt64Array::from(vec![1, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "string_dict_i8", + test_data_page_statistics: false, } .run(); @@ -1572,6 +1631,7 @@ async fn test_dictionary() { expected_null_counts: UInt64Array::from(vec![1, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "string_dict_i32", + test_data_page_statistics: false, } .run(); @@ -1582,6 +1642,7 @@ async fn test_dictionary() { expected_null_counts: UInt64Array::from(vec![1, 0]), expected_row_counts: UInt64Array::from(vec![5, 2]), column_name: "int_dict_i8", + test_data_page_statistics: false, } .run(); } @@ -1619,6 +1680,7 @@ async fn test_byte() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), column_name: "name", + test_data_page_statistics: false, } .run(); @@ -1638,6 +1700,7 @@ async fn test_byte() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), column_name: "service_string", + test_data_page_statistics: false, } .run(); @@ -1656,6 +1719,7 @@ async fn test_byte() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), column_name: "service_binary", + 
test_data_page_statistics: false, } .run(); @@ -1676,6 +1740,7 @@ async fn test_byte() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), column_name: "service_fixedsize", + test_data_page_statistics: false, } .run(); @@ -1696,6 +1761,7 @@ async fn test_byte() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), column_name: "service_large_binary", + test_data_page_statistics: false, } .run(); } @@ -1728,6 +1794,7 @@ async fn test_period_in_column_names() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), column_name: "name", + test_data_page_statistics: false, } .run(); @@ -1741,6 +1808,7 @@ async fn test_period_in_column_names() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), column_name: "service.name", + test_data_page_statistics: false, } .run(); } @@ -1764,6 +1832,7 @@ async fn test_boolean() { expected_null_counts: UInt64Array::from(vec![1, 0]), expected_row_counts: UInt64Array::from(vec![5, 5]), column_name: "bool", + test_data_page_statistics: false, } .run(); } @@ -1790,6 +1859,7 @@ async fn test_struct() { expected_null_counts: UInt64Array::from(vec![0]), expected_row_counts: UInt64Array::from(vec![3]), column_name: "struct", + test_data_page_statistics: false, } .run(); } @@ -1812,6 +1882,7 @@ async fn test_utf8() { expected_null_counts: UInt64Array::from(vec![1, 0]), expected_row_counts: UInt64Array::from(vec![5, 5]), column_name: "utf8", + test_data_page_statistics: false, } .run(); @@ -1823,6 +1894,7 @@ async fn test_utf8() { expected_null_counts: UInt64Array::from(vec![1, 0]), expected_row_counts: UInt64Array::from(vec![5, 5]), column_name: "large_utf8", + test_data_page_statistics: false, } .run(); } @@ -1842,6 +1914,7 @@ async fn test_missing_statistics() { expected_null_counts: UInt64Array::from(vec![None]), expected_row_counts: UInt64Array::from(vec![3]), // stil has row count statistics column_name: "i64", + test_data_page_statistics: false, } .run(); } @@ -1863,6 +1936,7 @@ async fn test_column_not_found() { expected_null_counts: UInt64Array::from(vec![2, 2]), expected_row_counts: UInt64Array::from(vec![13, 7]), column_name: "not_a_column", + test_data_page_statistics: false, } .run_col_not_found(); } From 3e2ffef638abc2bf4df4cf5549ace589deeb5069 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 10 Jun 2024 14:36:24 +0200 Subject: [PATCH 08/22] chore: add testcase int64_with_nulls --- .../core/tests/parquet/arrow_statistics.rs | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index 26948d8c9e88..e927a3a0f450 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -404,6 +404,34 @@ async fn test_int_64() { .run(); } +#[tokio::test] +async fn test_int_64_with_nulls() { + // This creates a parquet files of 4 columns + // named "i8", "i16", "i32", "i64" + // creates 3 row groups of 5 rows each + let reader = TestReader { + scenario: Scenario::WithNullValues, + row_per_group: 5, + } + .build() + .await; + + Test { + reader: &reader, + // mins are [-5, -4, 0, 5] + expected_min: Arc::new(Int64Array::from(vec![None, Some(1), None])), + // maxes are [-1, 0, 4, 9] + expected_max: Arc::new(Int64Array::from(vec![None, Some(5), None])), + // 
nulls are [0, 0, 0, 0] + expected_null_counts: UInt64Array::from(vec![5, 0, 5]), + // row counts are [5, 5, 5, 5] + expected_row_counts: UInt64Array::from(vec![5, 5, 5]), + column_name: "i64", + test_data_page_statistics: false, + } + .run(); +} + #[tokio::test] async fn test_int_32() { // This creates a parquet files of 4 columns named "i8", "i16", "i32", "i64" From 596913ac74a5cd45126f40f6db5bd8b81fb00ec6 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 10 Jun 2024 14:57:36 +0200 Subject: [PATCH 09/22] feat: add data page null_counts --- .../physical_plan/parquet/statistics.rs | 40 +++++++++++++++++++ .../core/tests/parquet/arrow_statistics.rs | 12 +++++- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 2836782d5f6b..3d4ea79f4bb0 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -649,6 +649,25 @@ where get_data_page_statistics!(Max, data_type, iterator) } +/// Extracts the null count statistics from an iterator +/// of parquet page [`Index`]'es to an [`ArrayRef`] +pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result +where + I: Iterator, +{ + let iter = iterator.flat_map(|index| match index { + Index::NONE => vec![None], + Index::INT64(native_index) => native_index + .indexes + .iter() + .map(|x| x.null_count.and_then(|x| Some(x as u64))) + .collect::>(), + _ => unimplemented!(), + }); + + Ok(Arc::new(UInt64Array::from_iter(iter))) +} + /// Extracts Parquet statistics as Arrow arrays /// /// This is used to convert Parquet statistics to Arrow arrays, with proper type @@ -905,6 +924,27 @@ impl<'a> StatisticsConverter<'a> { max_page_statistics(Some(data_type), iter) } + /// TODO: docstring + pub fn data_page_null_counts( + &self, + column_index: &ParquetColumnIndex, + row_group_indices: I, + ) -> Result + where + I: IntoIterator, + { + let data_type = self.arrow_field.data_type(); + + let Some(parquet_index) = self.parquet_index else { + return Ok(self.make_null_array(data_type, row_group_indices)); + }; + + let iter = row_group_indices + .into_iter() + .map(|rg_index| &column_index[*rg_index][parquet_index]); + null_counts_page_statistics(iter) + } + /// Returns a null array of data_type with one element per row group fn make_null_array(&self, data_type: &DataType, metadatas: I) -> ArrayRef where diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index e927a3a0f450..261b83261be8 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -258,6 +258,16 @@ impl<'a> Test<'a> { &max, &expected_max, "{column_name}: Mismatch with expected data page maximum" ); + + let null_counts = converter + .data_page_null_counts(column_index, &row_group_indices) + .unwrap(); + + assert_eq!( + &null_counts, &expected_null_counts, + "{column_name}: Mismatch with expected data page null counts. \ + Actual: {null_counts:?}. 
Expected: {expected_null_counts:?}" + ); } } @@ -427,7 +437,7 @@ async fn test_int_64_with_nulls() { // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5]), column_name: "i64", - test_data_page_statistics: false, + test_data_page_statistics: true, } .run(); } From 23f1430eff1d4975c2e8157307e311e03cea3a78 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 10 Jun 2024 15:15:12 +0200 Subject: [PATCH 10/22] fix: clippy --- .../core/src/datasource/physical_plan/parquet/statistics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 3d4ea79f4bb0..2b934525226f 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -660,7 +660,7 @@ where Index::INT64(native_index) => native_index .indexes .iter() - .map(|x| x.null_count.and_then(|x| Some(x as u64))) + .map(|x| x.null_count.map(|x| x as u64)) .collect::>(), _ => unimplemented!(), }); From 8ccfe89c101cc1033291604ef4a6ac41db0ddba1 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 10 Jun 2024 15:21:19 +0200 Subject: [PATCH 11/22] chore: rename column_page_index --- .../datasource/physical_plan/parquet/statistics.rs | 12 ++++++------ datafusion/core/tests/parquet/arrow_statistics.rs | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 2b934525226f..d622eb2d6be2 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -879,7 +879,7 @@ impl<'a> StatisticsConverter<'a> { /// TODO: Docstring pub fn data_page_mins( &self, - column_index: &ParquetColumnIndex, + column_page_index: &ParquetColumnIndex, row_group_indices: I, ) -> Result where @@ -899,14 +899,14 @@ impl<'a> StatisticsConverter<'a> { // statistics (e.g. min, max, or null_count). 
let iter = row_group_indices .into_iter() - .map(|rg_index| &column_index[*rg_index][parquet_index]); + .map(|rg_index| &column_page_index[*rg_index][parquet_index]); min_page_statistics(Some(data_type), iter) } /// TODO: docstring pub fn data_page_maxes( &self, - column_index: &ParquetColumnIndex, + column_page_index: &ParquetColumnIndex, row_group_indices: I, ) -> Result where @@ -920,14 +920,14 @@ impl<'a> StatisticsConverter<'a> { let iter = row_group_indices .into_iter() - .map(|rg_index| &column_index[*rg_index][parquet_index]); + .map(|rg_index| &column_page_index[*rg_index][parquet_index]); max_page_statistics(Some(data_type), iter) } /// TODO: docstring pub fn data_page_null_counts( &self, - column_index: &ParquetColumnIndex, + column_page_index: &ParquetColumnIndex, row_group_indices: I, ) -> Result where @@ -941,7 +941,7 @@ impl<'a> StatisticsConverter<'a> { let iter = row_group_indices .into_iter() - .map(|rg_index| &column_index[*rg_index][parquet_index]); + .map(|rg_index| &column_page_index[*rg_index][parquet_index]); null_counts_page_statistics(iter) } diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index 261b83261be8..024bc0268dfb 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -232,7 +232,7 @@ impl<'a> Test<'a> { ); if test_data_page_statistics { - let column_index = reader + let column_page_index = reader .metadata() .column_index() .expect("File should have column indices"); @@ -244,7 +244,7 @@ impl<'a> Test<'a> { .collect::>(); let min = converter - .data_page_mins(column_index, &row_group_indices) + .data_page_mins(column_page_index, &row_group_indices) .unwrap(); assert_eq!( &min, &expected_min, @@ -252,7 +252,7 @@ impl<'a> Test<'a> { ); let max = converter - .data_page_maxes(column_index, &row_group_indices) + .data_page_maxes(column_page_index, &row_group_indices) .unwrap(); assert_eq!( &max, &expected_max, @@ -260,7 +260,7 @@ impl<'a> Test<'a> { ); let null_counts = converter - .data_page_null_counts(column_index, &row_group_indices) + .data_page_null_counts(column_page_index, &row_group_indices) .unwrap(); assert_eq!( From fff96c4acfdfbd968fed24e73646f2cf22312cfd Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Mon, 10 Jun 2024 15:59:57 +0200 Subject: [PATCH 12/22] feat: add data page row counts --- .../physical_plan/parquet/statistics.rs | 45 ++++++++++++++++++- .../core/tests/parquet/arrow_statistics.rs | 16 ++++++- 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index d622eb2d6be2..9091b498eb2a 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -33,7 +33,7 @@ use arrow_array::{ use arrow_schema::{Field, FieldRef, Schema, TimeUnit}; use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result}; use half::f16; -use parquet::file::metadata::{ParquetColumnIndex, RowGroupMetaData}; +use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData}; use parquet::file::page_index::index::Index; use parquet::file::statistics::Statistics as ParquetStatistics; use parquet::schema::types::SchemaDescriptor; @@ -945,6 +945,49 @@ impl<'a> StatisticsConverter<'a> { null_counts_page_statistics(iter) } + /// TODO: docstring + pub fn 
data_page_row_counts(
+        &self,
+        column_offset_index: &ParquetOffsetIndex,
+        row_group_metadatas: &[RowGroupMetaData],
+        row_group_indices: I,
+    ) -> Result<ArrayRef>
+    where
+        I: IntoIterator<Item = &'a usize>,
+    {
+        let data_type = self.arrow_field.data_type();
+
+        let Some(parquet_index) = self.parquet_index else {
+            return Ok(self.make_null_array(data_type, row_group_indices));
+        };
+
+        // `offset_index[row_group_number][column_number][page_number]` holds
+        // the [`PageLocation`] corresponding to page `page_number` of column
+        // `column_number` of row group `row_group_number`.
+        let mut row_count_total = Vec::new();
+        for rg_idx in row_group_indices {
+            let page_locations = &column_offset_index[*rg_idx][parquet_index];
+
+            let row_count_per_page = page_locations.windows(2).map(|loc| {
+                Some(loc[1].first_row_index as u64 - loc[0].first_row_index as u64)
+            });
+
+            let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows();
+
+            // append the last page row count
+            let row_count_per_page = row_count_per_page
+                .chain(std::iter::once(Some(
+                    *num_rows_in_row_group as u64
+                        - page_locations.last().unwrap().first_row_index as u64,
+                )))
+                .collect::<Vec<_>>();
+
+            row_count_total.extend(row_count_per_page);
+        }
+
+        Ok(Arc::new(UInt64Array::from_iter(row_count_total)))
+    }
+
     /// Returns a null array of data_type with one element per row group
     fn make_null_array<I, A>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
     where
         I: IntoIterator<Item = A>,
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index 024bc0268dfb..fc1f359033bc 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -237,6 +237,11 @@ impl<'a> Test<'a> {
             .column_index()
             .expect("File should have column indices");
 
+        let column_offset_index = reader
+            .metadata()
+            .offset_index()
+            .expect("File should have column indices");
+
         let row_group_indices = row_groups
             .iter()
             .enumerate()
@@ -270,10 +275,19 @@ impl<'a> Test<'a> {
         let null_counts = converter
             .data_page_null_counts(column_page_index, &row_group_indices)
             .unwrap();
-
         assert_eq!(
             &null_counts, &expected_null_counts,
             "{column_name}: Mismatch with expected data page null counts. \
             Actual: {null_counts:?}. Expected: {expected_null_counts:?}"
         );
+
+        let row_counts = converter
+            .data_page_row_counts(column_offset_index, row_groups, &row_group_indices)
+            .unwrap();
+        let expected_row_counts = Arc::new(expected_row_counts) as ArrayRef;
+        assert_eq!(
+            &row_counts, &expected_row_counts,
+            "{column_name}: Mismatch with expected row counts. \
+            Actual: {row_counts:?}. Expected: {expected_row_counts:?}"
+        );
     }

From 5ade0be55d5370776e074fa31b0ace66ae65f921 Mon Sep 17 00:00:00 2001
From: marvinlanhenke
Date: Tue, 11 Jun 2024 08:44:54 +0200
Subject: [PATCH 13/22] feat: add num_data_pages to iterator

---
 .../physical_plan/parquet/statistics.rs           | 47 ++++++++++++-------
 datafusion/core/tests/parquet/arrow_statistics.rs | 12 ++++-
 2 files changed, 40 insertions(+), 19 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 9091b498eb2a..f8d1e9c8cf80 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -522,14 +522,14 @@ macro_rules! make_data_page_stats_iterator {
     ($iterator_type: ident, $func: ident, $index_type: path, $stat_value_type: ty) => {
         struct $iterator_type<'a, I>
         where
-            I: Iterator<Item = &'a Index>,
+            I: Iterator<Item = (usize, &'a Index)>,
         {
             iter: I,
         }
 
         impl<'a, I> $iterator_type<'a, I>
         where
-            I: Iterator<Item = &'a Index>,
+            I: Iterator<Item = (usize, &'a Index)>,
         {
             fn new(iter: I) -> Self {
                 Self { iter }
@@ -538,14 +538,14 @@ macro_rules! make_data_page_stats_iterator {
             }
         }
 
         impl<'a, I> Iterator for $iterator_type<'a, I>
         where
-            I: Iterator<Item = &'a Index>,
+            I: Iterator<Item = (usize, &'a Index)>,
         {
             type Item = Vec<Option<$stat_value_type>>;
 
             fn next(&mut self) -> Option<Self::Item> {
                 let next = self.iter.next();
                 match next {
-                    Some(index) => match index {
+                    Some((len, index)) => match index {
                         $index_type(native_index) => Some(
                             native_index
                                 .indexes
@@ -553,11 +553,13 @@ macro_rules! make_data_page_stats_iterator {
                                 .iter()
                                 .map(|x| x.$func)
                                 .collect::<Vec<_>>(),
                         ),
-                        // No matching `Index` found.
-                        // Thus no statistics that can be extracted.
-                        // We return vec![None] to effectively
-                        // create an arrow null-array.
-                        _ => Some(vec![None]),
+                        // No matching `Index` found;
+                        // thus no statistics that can be extracted.
+                        // We return vec![None; len] to effectively
+                        // create an arrow null-array with the length
+                        // corresponding to the number of entries in
+                        // `ParquetOffsetIndex` per row group per column.
+                        _ => Some(vec![None; len]),
                     },
                     _ => None,
                 }
@@ -632,7 +634,7 @@ pub(crate) fn min_page_statistics<'a, I>(
     iterator: I,
 ) -> Result<ArrayRef>
 where
-    I: Iterator<Item = &'a Index>,
+    I: Iterator<Item = (usize, &'a Index)>,
 {
     get_data_page_statistics!(Min, data_type, iterator)
 }
@@ -644,7 +646,7 @@ pub(crate) fn max_page_statistics<'a, I>(
     iterator: I,
 ) -> Result<ArrayRef>
 where
-    I: Iterator<Item = &'a Index>,
+    I: Iterator<Item = (usize, &'a Index)>,
 {
     get_data_page_statistics!(Max, data_type, iterator)
 }
@@ -880,6 +882,7 @@ impl<'a> StatisticsConverter<'a> {
     pub fn data_page_mins(
         &self,
         column_page_index: &ParquetColumnIndex,
+        column_offset_index: &ParquetOffsetIndex,
         row_group_indices: I,
     ) -> Result<ArrayRef>
     where
@@ -897,9 +900,13 @@ impl<'a> StatisticsConverter<'a> {
         // An `Index` contains a `NativeIndex` with a `Vec<PageIndex>`,
         // where each `PageIndex` contains information about the page's
        // statistics (e.g. min, max, or null_count).
-        let iter = row_group_indices
-            .into_iter()
-            .map(|rg_index| &column_page_index[*rg_index][parquet_index]);
+        let iter = row_group_indices.into_iter().map(|rg_index| {
+            let column_page_index_per_row_group_per_column =
+                &column_page_index[*rg_index][parquet_index];
+            let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
+
+            (*num_data_pages, column_page_index_per_row_group_per_column)
+        });
+
         min_page_statistics(Some(data_type), iter)
     }
 
@@ -907,6 +914,7 @@ impl<'a> StatisticsConverter<'a> {
     pub fn data_page_maxes(
         &self,
         column_page_index: &ParquetColumnIndex,
+        column_offset_index: &ParquetOffsetIndex,
         row_group_indices: I,
     ) -> Result<ArrayRef>
     where
@@ -918,9 +926,14 @@ impl<'a> StatisticsConverter<'a> {
             return Ok(self.make_null_array(data_type, row_group_indices));
         };
 
-        let iter = row_group_indices
-            .into_iter()
-            .map(|rg_index| &column_page_index[*rg_index][parquet_index]);
+        let iter = row_group_indices.into_iter().map(|rg_index| {
+            let column_page_index_per_row_group_per_column =
+                &column_page_index[*rg_index][parquet_index];
+            let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
+
+            (*num_data_pages, column_page_index_per_row_group_per_column)
+        });
+
         max_page_statistics(Some(data_type), iter)
     }
 
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index fc1f359033bc..b325664b7991 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -249,7 +249,11 @@ impl<'a> Test<'a> {
 
         let min = converter
-            .data_page_mins(column_page_index, &row_group_indices)
+            .data_page_mins(
+                column_page_index,
+                column_offset_index,
+                &row_group_indices,
+            )
             .unwrap();
         assert_eq!(
             &min, &expected_min,
@@ -261,7 +265,11 @@ impl<'a> Test<'a> {
 
         let max = converter
-            .data_page_maxes(column_page_index, &row_group_indices)
+            .data_page_maxes(
+                column_page_index,
+                column_offset_index,
+                &row_group_indices,
+            )
             .unwrap();
         assert_eq!(
             &max, &expected_max,

From a302f4e68e10f89236c09ec0b82656b49d38dd2a Mon Sep 17 00:00:00 2001
From: marvinlanhenke
Date: Tue, 11 Jun 2024 10:08:40 +0200
Subject: [PATCH 14/22] chore: update docs

---
 .../physical_plan/parquet/statistics.rs           | 86 ++++++++++++++++---
 datafusion/core/tests/parquet/arrow_statistics.rs |  4 ++--
 2 files changed, 78 insertions(+), 12 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index f8d1e9c8cf80..ed66865f76ee 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -657,6 +657,7 @@ pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<ArrayRef>
 where
     I: Iterator<Item = &'a Index>,
 {
+    // TODO: use len from col_offset_indexes
     let iter = iterator.flat_map(|index| match index {
         Index::NONE => vec![None],
         Index::INT64(native_index) => native_index
@@ -878,7 +879,57 @@ impl<'a> StatisticsConverter<'a> {
         Ok(Arc::new(UInt64Array::from_iter(null_counts)))
     }
 
-    /// TODO: Docstring
+    /// Extract the minimum values from Data Page statistics.
+    ///
+    /// In Parquet files, in addition to the Column Chunk level statistics
+    /// (stored for each column for each row group) there are also
+    /// optional statistics stored for each data page, as part of
+    /// the [`ParquetColumnIndex`].
+    ///
+    /// Since a single Column Chunk is stored as one or more pages,
+    /// page level statistics can prune at a finer granularity.
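+    ///
+    /// For example, a row group that stores its column chunk in three data
+    /// pages yields three page-level min values instead of a single
+    /// row-group-level one. An illustrative sketch (assuming `converter`,
+    /// `column_page_index`, `column_offset_index` and `row_group_indices`
+    /// are already in scope, as in the tests below):
+    ///
+    /// ```ignore
+    /// let page_mins = converter.data_page_mins(
+    ///     column_page_index,
+    ///     column_offset_index,
+    ///     &row_group_indices,
+    /// )?;
+    /// ```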
+    ///
+    /// However, since they are stored in a separate metadata
+    /// structure ([`Index`]) there is different code to extract them as
+    /// compared to arrow statistics.
+    ///
+    /// # Parameters:
+    ///
+    /// * `column_page_index`: The parquet column page indices, read from
+    ///   [`ParquetMetadata::column_index()`]
+    ///
+    /// * `column_offset_index`: The parquet column offset indices, read from
+    ///   [`ParquetMetadata::offset_index()`]
+    ///
+    /// * `row_group_indices`: The indices of the row groups that are used to
+    ///   extract the column page index and offset index on a per row group
+    ///   per column basis.
+    ///
+    /// # Return Value
+    ///
+    /// The returned array contains 1 value for each `PageIndex`
+    /// in the underlying `Index`es, in the same order as they appear
+    /// in `metadatas`.
+    ///
+    /// For example, if there are two `Index`es in `metadatas`:
+    /// 1. the first having `3` `PageIndex` entries
+    /// 2. the second having `2` `PageIndex` entries
+    ///
+    /// The returned array would have 5 rows.
+    ///
+    /// Each value is either:
+    /// * the minimum value for the page
+    /// * a null value, if the statistics cannot be extracted
+    ///
+    /// Note that a null value does NOT mean the min value was actually
+    /// `null`; it means the requested statistic is unknown.
+    ///
+    /// # Errors
+    ///
+    /// Reasons for not being able to extract the statistics include:
+    /// * the column is not present in the parquet file
+    /// * statistics for the pages are not present in the row group
+    /// * the stored statistic value cannot be converted to the requested type
     pub fn data_page_mins<I>(
         &self,
         column_page_index: &ParquetColumnIndex,
@@ -894,12 +945,6 @@ impl<'a> StatisticsConverter<'a> {
             return Ok(self.make_null_array(data_type, row_group_indices));
         };

-        // TODO: Move into docstring
-        // Creates an iterator over each row group index
-        // that yields a `page_index::index::Index`.
-        // An `Index` contains a `NativeIndex` with a `Vec<PageIndex>`,
-        // where each `PageIndex` contains information about the page's
-        // statistics (e.g. min, max, or null_count).
         let iter = row_group_indices.into_iter().map(|rg_index| {
             let column_page_index_per_row_group_per_column =
                 &column_page_index[*rg_index][parquet_index];
             let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();

             (*num_data_pages, column_page_index_per_row_group_per_column)
         });
+
         min_page_statistics(Some(data_type), iter)
     }

-    /// TODO: docstring
+    /// Extract the maximum values from Data Page statistics.
+    ///
+    /// See docs on [`Self::data_page_mins`] for details.
     pub fn data_page_maxes<I>(
         &self,
         column_page_index: &ParquetColumnIndex,
@@ -937,7 +985,9 @@ impl<'a> StatisticsConverter<'a> {

         max_page_statistics(Some(data_type), iter)
     }

-    /// TODO: docstring
+    /// Extract the null counts from Data Page statistics.
+    ///
+    /// See docs on [`Self::data_page_mins`] for details.
     pub fn data_page_null_counts<I>(
         &self,
         column_page_index: &ParquetColumnIndex,
@@ -958,7 +1008,23 @@ impl<'a> StatisticsConverter<'a> {

         null_counts_page_statistics(iter)
     }

-    /// TODO: docstring
+    /// Returns an [`ArrayRef`] with row counts for each data page.
+    ///
+    /// This function iterates over the given row group indexes and computes
+    /// the row count for each page in the specified column.
+    ///
+    /// # Parameters:
+    ///
+    /// * `column_offset_index`: The parquet column offset indices, read from
+    ///   [`ParquetMetadata::offset_index()`]
+    ///
+    /// * `row_group_metadatas`: The metadata slice of the row groups, read
+    ///   from [`ParquetMetadata::row_groups()`]
+    ///
+    /// * `row_group_indices`: The indices of the row groups that are used to
+    ///   extract the column offset index on a per row group per column basis.
+    ///
+    /// See docs on [`Self::data_page_mins`] for details.
     pub fn data_page_row_counts<I>(
         &self,
         column_offset_index: &ParquetOffsetIndex,
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index b325664b7991..24cd563fb9aa 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -235,12 +235,12 @@ impl<'a> Test<'a> {
         let column_page_index = reader
             .metadata()
             .column_index()
-            .expect("File should have column indices");
+            .expect("File should have column page indices");

         let column_offset_index = reader
             .metadata()
             .offset_index()
-            .expect("File should have column indices");
+            .expect("File should have column offset indices");

         let row_group_indices = row_groups
             .iter()

From 96c99ce1a3ee06b8aba26fe0a41ad85ca55faaca Mon Sep 17 00:00:00 2001
From: marvinlanhenke
Date: Tue, 11 Jun 2024 10:12:01 +0200
Subject: [PATCH 15/22] fix: use column_offset len in data_page_null_counts

---
 .../physical_plan/parquet/statistics.rs       | 18 +++++++++++-------
 .../core/tests/parquet/arrow_statistics.rs    |  6 +++++-
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index ed66865f76ee..a98c82e2b538 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -655,11 +655,10 @@ where
 /// of parquet page [`Index`]'es to an [`ArrayRef`]
 pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<UInt64Array>
 where
-    I: Iterator<Item = &'a Index>,
+    I: Iterator<Item = (usize, &'a Index)>,
 {
-    // TODO: use len from col_offset_indexes
-    let iter = iterator.flat_map(|index| match index {
-        Index::NONE => vec![None],
+    let iter = iterator.flat_map(|(len, index)| match index {
+        Index::NONE => vec![None; len],
         Index::INT64(native_index) => native_index
             .indexes
             .iter()
@@ -991,6 +990,7 @@ impl<'a> StatisticsConverter<'a> {
     pub fn data_page_null_counts<I>(
         &self,
         column_page_index: &ParquetColumnIndex,
+        column_offset_index: &ParquetOffsetIndex,
         row_group_indices: I,
     ) -> Result<UInt64Array>
     where
@@ -1002,9 +1002,13 @@ impl<'a> StatisticsConverter<'a> {
             return Ok(self.make_null_array(data_type, row_group_indices));
         };

-        let iter = row_group_indices
-            .into_iter()
-            .map(|rg_index| &column_page_index[*rg_index][parquet_index]);
+        let iter = row_group_indices.into_iter().map(|rg_index| {
+            let column_page_index_per_row_group_per_column =
+                &column_page_index[*rg_index][parquet_index];
+            let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
+
+            (*num_data_pages, column_page_index_per_row_group_per_column)
+        });

         null_counts_page_statistics(iter)
     }

diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index 24cd563fb9aa..db0237f2ec9b 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -273,7 +273,11 @@ impl<'a> Test<'a> {

         let null_counts = converter
-            .data_page_null_counts(column_page_index, &row_group_indices)
+            .data_page_null_counts(
+                column_page_index,
+                column_offset_index,
+                &row_group_indices,
+            )
             .unwrap();
         assert_eq!(
             &null_counts, &expected_null_counts,

From d731f447b3a3a3d726482247f6ca3da7ff29b5ec Mon Sep 17 00:00:00 2001
From: marvinlanhenke
Date: Tue, 11 Jun 2024 13:44:16 +0200
Subject: [PATCH 16/22] fix: docs

---
 .../src/datasource/physical_plan/parquet/statistics.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index a98c82e2b538..d0dd015c0fb3 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -895,10 +895,10 @@ impl<'a> StatisticsConverter<'a> {
     /// # Parameters:
     ///
     /// * `column_page_index`: The parquet column page indices, read from
-    ///   [`ParquetMetadata::column_index()`]
+    ///   `ParquetMetaData` column_index
     ///
     /// * `column_offset_index`: The parquet column offset indices, read from
-    ///   [`ParquetMetadata::offset_index()`]
+    ///   `ParquetMetaData` offset_index
     ///
     /// * `row_group_indices`: The indices of the row groups that are used to
@@ -1020,10 +1020,10 @@ impl<'a> StatisticsConverter<'a> {
     /// # Parameters:
     ///
     /// * `column_offset_index`: The parquet column offset indices, read from
-    ///   [`ParquetMetadata::offset_index()`]
+    ///   `ParquetMetaData` offset_index
     ///
     /// * `row_group_metadatas`: The metadata slice of the row groups, read
-    ///   from [`ParquetMetadata::row_groups()`]
+    ///   from `ParquetMetaData` row_groups
     ///
     /// * `row_group_indices`: The indices of the row groups that are used to
     ///   extract the column offset index on a per row group per column basis.

From 6f7e856e798635bcc65d07fb293e9dc9e369a673 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 14 Jun 2024 18:30:10 -0400
Subject: [PATCH 17/22] tweak comments

---
 .../core/src/datasource/physical_plan/parquet/statistics.rs | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 7a2011c47e8f..a2e0d8fa66be 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -658,6 +658,8 @@ where

 /// Extracts the null count statistics from an iterator
 /// of parquet page [`Index`]'es to an [`ArrayRef`]
+///
+/// The returned Array is an [`UInt64Array`]
 pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<UInt64Array>
 where
     I: Iterator<Item = (usize, &'a Index)>,
@@ -991,6 +993,8 @@ impl<'a> StatisticsConverter<'a> {

     /// Extract the null counts from Data Page statistics.
     ///
+    /// The returned Array is an [`UInt64Array`]
+    ///
     /// See docs on [`Self::data_page_mins`] for details.
    pub fn data_page_null_counts<I>(
         &self,

From 8d1b99c93e666d6dfc3d6a0036f36e58d31a6dc1 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 14 Jun 2024 18:43:04 -0400
Subject: [PATCH 18/22] update test helper

---
 .../core/tests/parquet/arrow_statistics.rs | 186 +++++++++---------
 1 file changed, 98 insertions(+), 88 deletions(-)

diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index 2820ffda3972..ae276da0af31 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -47,96 +47,76 @@ use parquet::file::properties::{EnabledStatistics, WriterProperties};

 use super::make_test_file_rg;

-// TEST HELPERS
-
-/// Return a record batch with i64 with Null values
-fn make_int64_batches_with_null(
+#[derive(Debug, Clone)]
+struct Int64Case {
+    /// Number of nulls in the column
     null_values: usize,
+    /// Non null values in the range `[no_null_values_start,
+    /// no_null_values_end]`, one value for each row
     no_null_values_start: i64,
     no_null_values_end: i64,
-) -> RecordBatch {
-    let schema = Arc::new(Schema::new(vec![Field::new("i64", DataType::Int64, true)]));
-
-    let v64: Vec<i64> = (no_null_values_start as _..no_null_values_end as _).collect();
-
-    RecordBatch::try_new(
-        schema,
-        vec![make_array(
-            Int64Array::from_iter(
-                v64.into_iter()
-                    .map(Some)
-                    .chain(std::iter::repeat(None).take(null_values)),
-            )
-            .to_data(),
-        )],
-    )
-    .unwrap()
-}
-
-// Create a parquet file with one column for data type i64
-// Data of the file include
-// . Number of null rows is the given num_null
-// . There are non-null values in the range [no_null_values_start, no_null_values_end], one value each row
-// . The file is divided into row groups of size row_per_group
-pub fn parquet_file_one_column(
-    num_null: usize,
-    no_null_values_start: i64,
-    no_null_values_end: i64,
+    /// Number of rows per row group
     row_per_group: usize,
-) -> ParquetRecordBatchReaderBuilder<File> {
-    parquet_file_one_column_stats(
-        num_null,
-        no_null_values_start,
-        no_null_values_end,
-        row_per_group,
-        EnabledStatistics::Chunk,
-    )
+    /// if specified, overrides default statistics settings
+    enable_stats: Option<EnabledStatistics>,
 }

-// Create a parquet file with one column for data type i64
-// Data of the file include
-// . Number of null rows is the given num_null
-// . There are non-null values in the range [no_null_values_start, no_null_values_end], one value each row
-// . The file is divided into row groups of size row_per_group
-// . Statistics are enabled/disabled based on the given enable_stats
-pub fn parquet_file_one_column_stats(
-    num_null: usize,
-    no_null_values_start: i64,
-    no_null_values_end: i64,
-    row_per_group: usize,
-    enable_stats: EnabledStatistics,
-) -> ParquetRecordBatchReaderBuilder<File> {
-    let mut output_file = tempfile::Builder::new()
-        .prefix("parquert_statistics_test")
-        .suffix(".parquet")
-        .tempfile()
-        .expect("tempfile creation");
+impl Int64Case {
+    /// Return a record batch with i64 with Null values
+    fn make_int64_batches_with_null(&self) -> RecordBatch {
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("i64", DataType::Int64, true)]));
+
+        let v64: Vec<i64> =
+            (self.no_null_values_start as _..self.no_null_values_end as _).collect();
+
+        RecordBatch::try_new(
+            schema,
+            vec![make_array(
+                Int64Array::from_iter(
+                    v64.into_iter()
+                        .map(Some)
+                        .chain(std::iter::repeat(None).take(self.null_values)),
+                )
+                .to_data(),
+            )],
+        )
+        .unwrap()
+    }
+
+    // Create a parquet file with the specified settings
+    pub fn parquet_file(&self) -> ParquetRecordBatchReaderBuilder<File> {
+        let mut output_file = tempfile::Builder::new()
+            .prefix("parquert_statistics_test")
+            .suffix(".parquet")
+            .tempfile()
+            .expect("tempfile creation");

-    let props = WriterProperties::builder()
-        .set_max_row_group_size(row_per_group)
-        .set_statistics_enabled(enable_stats)
-        .build();
+        let mut builder =
+            WriterProperties::builder().set_max_row_group_size(self.row_per_group);
+        if let Some(enable_stats) = self.enable_stats {
+            builder = builder.set_statistics_enabled(enable_stats);
+        }
+        let props = builder.build();

-    let batches = vec![make_int64_batches_with_null(
-        num_null,
-        no_null_values_start,
-        no_null_values_end,
-    )];
+        let batches = vec![self.make_int64_batches_with_null()];

-    let schema = batches[0].schema();
+        let schema = batches[0].schema();

-    let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap();
+        let mut writer =
+            ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap();

-    for batch in batches {
-        writer.write(&batch).expect("writing batch");
-    }
+        for batch in batches {
+            writer.write(&batch).expect("writing batch");
+        }

-    // close file
-    let _file_meta = writer.close().unwrap();
+        // close file
+        let _file_meta = writer.close().unwrap();

-    // open the file & get the reader
-    let file = output_file.reopen().unwrap();
-    ArrowReaderBuilder::try_new(file).unwrap()
+        // open the file & get the reader
+        let file = output_file.reopen().unwrap();
+        ArrowReaderBuilder::try_new(file).unwrap()
+    }
 }

 /// Defines what data to create in a parquet file
@@ -326,8 +306,15 @@

 #[tokio::test]
 async fn test_one_row_group_without_null() {
-    let row_per_group = 20;
-    let reader = parquet_file_one_column(0, 4, 7, row_per_group);
+    let reader = Int64Case {
+        null_values: 0,
+        no_null_values_start: 4,
+        no_null_values_end: 7,
+        row_per_group: 20,
+        enable_stats: None,
+    }
+    .parquet_file();
+
     Test {
         reader: &reader,
         // min is 4
@@ -346,8 +333,14 @@ async fn test_one_row_group_without_null() {

 #[tokio::test]
 async fn test_one_row_group_with_null_and_negative() {
-    let row_per_group = 20;
-    let reader = parquet_file_one_column(2, -1, 5, row_per_group);
+    let reader = Int64Case {
+        null_values: 2,
+        no_null_values_start: -1,
+        no_null_values_end: 5,
+        row_per_group: 20,
+        enable_stats: None,
+    }
+    .parquet_file();

     Test {
         reader: &reader,
@@ -367,8 +360,14 @@ async fn test_one_row_group_with_null_and_negative() {

 #[tokio::test]
 async fn test_two_row_group_with_null() {
-    let row_per_group = 10;
-    let reader = parquet_file_one_column(2, 4, 17, row_per_group);
+    let reader = Int64Case {
+        null_values: 2,
+        no_null_values_start: 4,
+        no_null_values_end: 17,
+        row_per_group: 10,
+        enable_stats: None,
+    }
+    .parquet_file();

     Test {
         reader: &reader,
@@ -388,8 +387,14 @@ async fn test_two_row_group_with_null() {

 #[tokio::test]
 async fn test_two_row_groups_with_all_nulls_in_one() {
-    let row_per_group = 5;
-    let reader = parquet_file_one_column(4, -2, 2, row_per_group);
+    let reader = Int64Case {
+        null_values: 4,
+        no_null_values_start: -2,
+        no_null_values_end: 2,
+        row_per_group: 5,
+        enable_stats: None,
+    }
+    .parquet_file();

     Test {
         reader: &reader,
@@ -1885,9 +1890,14 @@ async fn test_utf8() {

 #[tokio::test]
 async fn test_missing_statistics() {
-    let row_per_group = 5;
-    let reader =
-        parquet_file_one_column_stats(0, 4, 7, row_per_group, EnabledStatistics::None);
+    let reader = Int64Case {
+        null_values: 0,
+        no_null_values_start: 4,
+        no_null_values_end: 7,
+        row_per_group: 5,
+        enable_stats: Some(EnabledStatistics::None),
+    }
+    .parquet_file();

     Test {
         reader: &reader,

From 830f662086409e9a4ce2116ab05a2bd2900c90df Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 14 Jun 2024 18:50:52 -0400
Subject: [PATCH 19/22] Add explicit multi-data page tests to statistics test

---
 .../core/tests/parquet/arrow_statistics.rs | 65 +++++++++++++++----
 1 file changed, 53 insertions(+), 12 deletions(-)

diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index ae276da0af31..a35a3d7844dd 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -18,6 +18,7 @@
 //! This file contains an end to end test of extracting statistics from parquet files.
 //! It writes data into a parquet file, reads statistics and verifies they are correct
+use std::default::Default;
 use std::fs::File;
 use std::sync::Arc;

@@ -47,7 +48,7 @@ use parquet::file::properties::{EnabledStatistics, WriterProperties};

 use super::make_test_file_rg;

-#[derive(Debug, Clone)]
+#[derive(Debug, Default, Clone)]
 struct Int64Case {
     /// Number of nulls in the column
     null_values: usize,
@@ -59,6 +60,8 @@ struct Int64Case {
     row_per_group: usize,
     /// if specified, overrides default statistics settings
     enable_stats: Option<EnabledStatistics>,
+    /// If specified, the number of values in each page
+    data_page_row_count_limit: Option<usize>,
 }

@@ -85,7 +88,7 @@ impl Int64Case {
     }

     // Create a parquet file with the specified settings
-    pub fn parquet_file(&self) -> ParquetRecordBatchReaderBuilder<File> {
+    pub fn build(&self) -> ParquetRecordBatchReaderBuilder<File> {
         let mut output_file = tempfile::Builder::new()
             .prefix("parquert_statistics_test")
             .suffix(".parquet")
             .tempfile()
             .expect("tempfile creation");
@@ -97,6 +100,9 @@ impl Int64Case {
         if let Some(enable_stats) = self.enable_stats {
             builder = builder.set_statistics_enabled(enable_stats);
         }
+        if let Some(data_page_row_count_limit) = self.data_page_row_count_limit {
+            builder = builder.set_data_page_row_count_limit(data_page_row_count_limit);
+        }
         let props = builder.build();

         let batches = vec![self.make_int64_batches_with_null()];
@@ -115,7 +121,8 @@ impl Int64Case {

         // open the file & get the reader
         let file = output_file.reopen().unwrap();
-        ArrowReaderBuilder::try_new(file).unwrap()
+        let options = ArrowReaderOptions::new().with_page_index(true);
+        ArrowReaderBuilder::try_new_with_options(file, options).unwrap()
     }
 }

@@ -311,9 +318,9 @@ async fn test_one_row_group_without_null() {
         no_null_values_start: 4,
         no_null_values_end: 7,
         row_per_group: 20,
-        enable_stats: None,
+        ..Default::default()
     }
-    .parquet_file();
+    .build();

     Test {
         reader: &reader,
@@ -338,9 +345,9 @@ async fn test_one_row_group_with_null_and_negative() {
         no_null_values_start: -1,
         no_null_values_end: 5,
         row_per_group: 20,
-        enable_stats: None,
+        ..Default::default()
     }
-    .parquet_file();
+    .build();

     Test {
         reader: &reader,
@@ -365,9 +372,9 @@ async fn test_two_row_group_with_null() {
         no_null_values_start: 4,
         no_null_values_end: 17,
         row_per_group: 10,
-        enable_stats: None,
+        ..Default::default()
     }
-    .parquet_file();
+    .build();

     Test {
         reader: &reader,
@@ -392,9 +399,9 @@ async fn test_two_row_groups_with_all_nulls_in_one() {
         no_null_values_start: -2,
         no_null_values_end: 2,
         row_per_group: 5,
-        enable_stats: None,
+        ..Default::default()
     }
-    .parquet_file();
+    .build();

     Test {
         reader: &reader,
@@ -412,6 +419,38 @@ async fn test_two_row_groups_with_all_nulls_in_one() {
     .run()
 }

+#[tokio::test]
+async fn test_multiple_data_pages_nulls_and_negatives() {
+    let reader = Int64Case {
+        null_values: 2,
+        no_null_values_start: -1,
+        no_null_values_end: 5,
+        row_per_group: 20,
+        // limit page row count to 4
+        data_page_row_count_limit: Some(4),
+        enable_stats: Some(EnabledStatistics::Page),
+
+    }
+    .build();
+
+    Test {
+        reader: &reader,
+        // min is -1
+        expected_min: Arc::new(Int64Array::from(vec![-1])),
+        // max is 4
+        expected_max: Arc::new(Int64Array::from(vec![4])),
+        // 2 nulls
+        expected_null_counts: UInt64Array::from(vec![2]),
+        // 8 rows
+        expected_row_counts: UInt64Array::from(vec![8]),
+        column_name: "i64",
+        test_data_page_statistics: true,
+    }
+    .run()
+}
+

 /////////////// MORE GENERAL TESTS //////////////////////
 // . Many columns in a file
 // . 
Differnet data types @@ -1896,8 +1935,10 @@ async fn test_missing_statistics() { no_null_values_end: 7, row_per_group: 5, enable_stats: Some(EnabledStatistics::None), + ..Default::default() + } - .parquet_file(); + .build(); Test { reader: &reader, From d25dd9a85b5a09860358f8f484985a3dfb91a9af Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 14 Jun 2024 19:04:26 -0400 Subject: [PATCH 20/22] Add explicit data page test --- .../core/tests/parquet/arrow_statistics.rs | 108 ++++++++++-------- 1 file changed, 60 insertions(+), 48 deletions(-) diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index a35a3d7844dd..c5688be1264a 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -66,6 +66,8 @@ struct Int64Case { impl Int64Case { /// Return a record batch with i64 with Null values + /// The first no_null_values_end - no_null_values_start values + /// are non-null with the specified range, the rest are null fn make_int64_batches_with_null(&self) -> RecordBatch { let schema = Arc::new(Schema::new(vec![Field::new("i64", DataType::Int64, true)])); @@ -112,8 +114,18 @@ impl Int64Case { let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap(); - for batch in batches { - writer.write(&batch).expect("writing batch"); + // if we have a datapage limit send the batches in one at a time to give + // the writer a chance to be split into multiple pages + if self.data_page_row_count_limit.is_some() { + for batch in batches { + for i in 0..batch.num_rows() { + writer.write(&batch.slice(i, 1)).expect("writing batch"); + } + } + } else { + for batch in batches { + writer.write(&batch).expect("writing batch"); + } } // close file @@ -187,36 +199,6 @@ impl<'a> Test<'a> { let row_groups = reader.metadata().row_groups(); - let min = converter.row_group_mins(row_groups).unwrap(); - assert_eq!( - &min, &expected_min, - "{column_name}: Mismatch with expected minimums" - ); - - let max = converter.row_group_maxes(row_groups).unwrap(); - assert_eq!( - &max, &expected_max, - "{column_name}: Mismatch with expected maximum" - ); - - let null_counts = converter.row_group_null_counts(row_groups).unwrap(); - let expected_null_counts = Arc::new(expected_null_counts) as ArrayRef; - assert_eq!( - &null_counts, &expected_null_counts, - "{column_name}: Mismatch with expected null counts. \ - Actual: {null_counts:?}. Expected: {expected_null_counts:?}" - ); - - let row_counts = StatisticsConverter::row_group_row_counts( - reader.metadata().row_groups().iter(), - ) - .unwrap(); - assert_eq!( - row_counts, expected_row_counts, - "{column_name}: Mismatch with expected row counts. \ - Actual: {row_counts:?}. Expected: {expected_row_counts:?}" - ); - if test_data_page_statistics { let column_page_index = reader .metadata() @@ -265,6 +247,8 @@ impl<'a> Test<'a> { &row_group_indices, ) .unwrap(); + + let expected_null_counts = Arc::new(expected_null_counts) as ArrayRef; assert_eq!( &null_counts, &expected_null_counts, "{column_name}: Mismatch with expected data page null counts. \ @@ -280,6 +264,36 @@ impl<'a> Test<'a> { "{column_name}: Mismatch with expected row counts. \ Actual: {row_counts:?}. 
Expected: {expected_row_counts:?}" ); + } else { + let min = converter.row_group_mins(row_groups).unwrap(); + assert_eq!( + &min, &expected_min, + "{column_name}: Mismatch with expected minimums" + ); + + let max = converter.row_group_maxes(row_groups).unwrap(); + assert_eq!( + &max, &expected_max, + "{column_name}: Mismatch with expected maximum" + ); + + let null_counts = converter.row_group_null_counts(row_groups).unwrap(); + let expected_null_counts = Arc::new(expected_null_counts) as ArrayRef; + assert_eq!( + &null_counts, &expected_null_counts, + "{column_name}: Mismatch with expected null counts. \ + Actual: {null_counts:?}. Expected: {expected_null_counts:?}" + ); + + let row_counts = StatisticsConverter::row_group_row_counts( + reader.metadata().row_groups().iter(), + ) + .unwrap(); + assert_eq!( + row_counts, expected_row_counts, + "{column_name}: Mismatch with expected row counts. \ + Actual: {row_counts:?}. Expected: {expected_row_counts:?}" + ); } } @@ -419,38 +433,37 @@ async fn test_two_row_groups_with_all_nulls_in_one() { .run() } - #[tokio::test] async fn test_multiple_data_pages_nulls_and_negatives() { let reader = Int64Case { - null_values: 2, + null_values: 3, no_null_values_start: -1, - no_null_values_end: 5, + no_null_values_end: 10, row_per_group: 20, // limit page row count to 4 data_page_row_count_limit: Some(4), enable_stats: Some(EnabledStatistics::Page), - } - .build(); + .build(); + // Data layout looks like this: + // + // page 0: [-1, 0, 1, 2] + // page 1: [3, 4, 5, 6] + // page 2: [7, 8, 9, null] + // page 3: [null, null] Test { reader: &reader, - // min is -1 - expected_min: Arc::new(Int64Array::from(vec![-1])), - // max is 4 - expected_max: Arc::new(Int64Array::from(vec![4])), - // 2 nulls - expected_null_counts: UInt64Array::from(vec![2]), - // 8 rows - expected_row_counts: UInt64Array::from(vec![8]), + expected_min: Arc::new(Int64Array::from(vec![Some(-1), Some(3), Some(7), None])), + expected_max: Arc::new(Int64Array::from(vec![Some(2), Some(6), Some(9), None])), + expected_null_counts: UInt64Array::from(vec![0, 0, 1, 2]), + expected_row_counts: UInt64Array::from(vec![4, 4, 4, 2]), column_name: "i64", test_data_page_statistics: true, } - .run() + .run() } - /////////////// MORE GENERAL TESTS ////////////////////// // . Many columns in a file // . 
Differnet data types @@ -1936,7 +1949,6 @@ async fn test_missing_statistics() { row_per_group: 5, enable_stats: Some(EnabledStatistics::None), ..Default::default() - } .build(); From a38140776f13daacc3b7418e9a0edcec3a316a43 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 14 Jun 2024 19:08:29 -0400 Subject: [PATCH 21/22] remove duplicate test --- .../core/tests/parquet/arrow_statistics.rs | 33 ++----------------- 1 file changed, 3 insertions(+), 30 deletions(-) diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index c5688be1264a..a87449c4e201 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -174,7 +174,8 @@ struct Test<'a> { expected_row_counts: UInt64Array, /// Which column to extract statistics from column_name: &'static str, - /// Whether to test data page statistics or not + /// If true, extracts and compares data page statistics rather than row + /// group statistics test_data_page_statistics: bool, } @@ -491,35 +492,7 @@ async fn test_int_64() { // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), column_name: "i64", - test_data_page_statistics: true, - } - .run(); -} - -#[tokio::test] -async fn test_int_64_with_nulls() { - // This creates a parquet files of 4 columns - // named "i8", "i16", "i32", "i64" - // creates 3 row groups of 5 rows each - let reader = TestReader { - scenario: Scenario::WithNullValues, - row_per_group: 5, - } - .build() - .await; - - Test { - reader: &reader, - // mins are [-5, -4, 0, 5] - expected_min: Arc::new(Int64Array::from(vec![None, Some(1), None])), - // maxes are [-1, 0, 4, 9] - expected_max: Arc::new(Int64Array::from(vec![None, Some(5), None])), - // nulls are [0, 0, 0, 0] - expected_null_counts: UInt64Array::from(vec![5, 0, 5]), - // row counts are [5, 5, 5, 5] - expected_row_counts: UInt64Array::from(vec![5, 5, 5]), - column_name: "i64", - test_data_page_statistics: true, + test_data_page_statistics: false, } .run(); } From a5b6b9b78f936ade3dfeca4d78b7d440273e55ec Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 14 Jun 2024 19:09:59 -0400 Subject: [PATCH 22/22] update coverage --- .../core/tests/parquet/arrow_statistics.rs | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index a87449c4e201..3c812800e2b7 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -481,20 +481,23 @@ async fn test_int_64() { .build() .await; - Test { - reader: &reader, - // mins are [-5, -4, 0, 5] - expected_min: Arc::new(Int64Array::from(vec![-5, -4, 0, 5])), - // maxes are [-1, 0, 4, 9] - expected_max: Arc::new(Int64Array::from(vec![-1, 0, 4, 9])), - // nulls are [0, 0, 0, 0] - expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), - // row counts are [5, 5, 5, 5] - expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), - column_name: "i64", - test_data_page_statistics: false, + // since each row has only one data page, the statistics are the same + for test_data_page_statistics in [true, false] { + Test { + reader: &reader, + // mins are [-5, -4, 0, 5] + expected_min: Arc::new(Int64Array::from(vec![-5, -4, 0, 5])), + // maxes are [-1, 0, 4, 9] + expected_max: Arc::new(Int64Array::from(vec![-1, 0, 4, 9])), + // nulls are [0, 0, 0, 0] + expected_null_counts: 
UInt64Array::from(vec![0, 0, 0, 0]), + // row counts are [5, 5, 5, 5] + expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + column_name: "i64", + test_data_page_statistics, + } + .run(); } - .run(); } #[tokio::test]
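
Putting the series together, the following is a minimal, self-contained sketch of how the data page statistics API introduced above might be driven end to end. It is illustrative only: the file name data.parquet and the column name "i64" are assumptions, and the StatisticsConverter::try_new(column_name, arrow_schema, parquet_schema) construction is a hypothetical shape that is not shown anywhere in these patches.

use std::fs::File;

use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ArrowReaderOptions};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The page index is not decoded unless explicitly requested, so the
    // reader must be built with `with_page_index(true)`, just as
    // `TestReader` and `Int64Case::build` do in the tests above.
    let file = File::open("data.parquet")?;
    let options = ArrowReaderOptions::new().with_page_index(true);
    let reader = ArrowReaderBuilder::try_new_with_options(file, options)?;

    // Hypothetical constructor; how a converter is built for a column is
    // outside the scope of this series.
    let converter =
        StatisticsConverter::try_new("i64", reader.schema(), reader.parquet_schema())?;

    // Both indexes are required: the column (page) index stores the
    // per-page min/max/null_count values, while the offset index says how
    // many data pages each row group actually has -- the `len` that the
    // iterator changes above thread through as `(usize, &Index)`.
    let metadata = reader.metadata();
    let column_page_index = metadata
        .column_index()
        .expect("file should have a column page index");
    let column_offset_index = metadata
        .offset_index()
        .expect("file should have a column offset index");

    let row_group_indices: Vec<usize> = (0..metadata.num_row_groups()).collect();

    // Each returned array holds one entry per data page, across all of
    // the requested row groups, with nulls where a page has no usable
    // statistics.
    let mins = converter.data_page_mins(
        column_page_index,
        column_offset_index,
        &row_group_indices,
    )?;
    let maxes = converter.data_page_maxes(
        column_page_index,
        column_offset_index,
        &row_group_indices,
    )?;
    let null_counts = converter.data_page_null_counts(
        column_page_index,
        column_offset_index,
        &row_group_indices,
    )?;

    println!("page mins:        {mins:?}");
    println!("page maxes:       {maxes:?}");
    println!("page null counts: {null_counts:?}");
    Ok(())
}

The vec![None; len] padding introduced above is what makes the result safe to consume positionally: every page of every row group is represented in the output, so the arrays always line up one-to-one with the page locations in the offset index, even for columns whose page index is missing or has an unsupported physical type.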