diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index eba1f561203c..1613656ab9ae 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -25,7 +25,7 @@ use crate::basic::Type as PhysicalType; use crate::data_type::{ByteArray, FixedLenByteArray}; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData}; -use crate::file::page_index::index::{Index, PageIndex}; +use crate::file::page_index::column_index::{ColumnIndexIterators, ColumnIndexMetaData}; use crate::file::statistics::Statistics as ParquetStatistics; use crate::schema::types::SchemaDescriptor; use arrow_array::builder::{ @@ -597,17 +597,17 @@ macro_rules! get_statistics { } macro_rules! make_data_page_stats_iterator { - ($iterator_type: ident, $func: expr, $index_type: path, $stat_value_type: ty) => { + ($iterator_type: ident, $func: ident, $stat_value_type: ty) => { struct $iterator_type<'a, I> where - I: Iterator, + I: Iterator, { iter: I, } impl<'a, I> $iterator_type<'a, I> where - I: Iterator, + I: Iterator, { fn new(iter: I) -> Self { Self { iter } @@ -616,7 +616,7 @@ macro_rules! make_data_page_stats_iterator { impl<'a, I> Iterator for $iterator_type<'a, I> where - I: Iterator, + I: Iterator, { type Item = Vec>; @@ -624,16 +624,14 @@ macro_rules! make_data_page_stats_iterator { let next = self.iter.next(); match next { Some((len, index)) => match index { - $index_type(native_index) => { - Some(native_index.indexes.iter().map($func).collect::>()) - } // No matching `Index` found; // thus no statistics that can be extracted. // We return vec![None; len] to effectively // create an arrow null-array with the length // corresponding to the number of entries in // `ParquetOffsetIndex` per row group per column. - _ => Some(vec![None; len]), + ColumnIndexMetaData::NONE => Some(vec![None; len]), + _ => Some(<$stat_value_type>::$func(&index).collect::>()), }, _ => None, } @@ -646,101 +644,45 @@ macro_rules! 
make_data_page_stats_iterator { }; } -make_data_page_stats_iterator!( - MinBooleanDataPageStatsIterator, - |x: &PageIndex| { x.min }, - Index::BOOLEAN, - bool -); -make_data_page_stats_iterator!( - MaxBooleanDataPageStatsIterator, - |x: &PageIndex| { x.max }, - Index::BOOLEAN, - bool -); -make_data_page_stats_iterator!( - MinInt32DataPageStatsIterator, - |x: &PageIndex| { x.min }, - Index::INT32, - i32 -); -make_data_page_stats_iterator!( - MaxInt32DataPageStatsIterator, - |x: &PageIndex| { x.max }, - Index::INT32, - i32 -); -make_data_page_stats_iterator!( - MinInt64DataPageStatsIterator, - |x: &PageIndex| { x.min }, - Index::INT64, - i64 -); -make_data_page_stats_iterator!( - MaxInt64DataPageStatsIterator, - |x: &PageIndex| { x.max }, - Index::INT64, - i64 -); +make_data_page_stats_iterator!(MinBooleanDataPageStatsIterator, min_values_iter, bool); +make_data_page_stats_iterator!(MaxBooleanDataPageStatsIterator, max_values_iter, bool); +make_data_page_stats_iterator!(MinInt32DataPageStatsIterator, min_values_iter, i32); +make_data_page_stats_iterator!(MaxInt32DataPageStatsIterator, max_values_iter, i32); +make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min_values_iter, i64); +make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max_values_iter, i64); make_data_page_stats_iterator!( MinFloat16DataPageStatsIterator, - |x: &PageIndex| { x.min.clone() }, - Index::FIXED_LEN_BYTE_ARRAY, + min_values_iter, FixedLenByteArray ); make_data_page_stats_iterator!( MaxFloat16DataPageStatsIterator, - |x: &PageIndex| { x.max.clone() }, - Index::FIXED_LEN_BYTE_ARRAY, + max_values_iter, FixedLenByteArray ); -make_data_page_stats_iterator!( - MinFloat32DataPageStatsIterator, - |x: &PageIndex| { x.min }, - Index::FLOAT, - f32 -); -make_data_page_stats_iterator!( - MaxFloat32DataPageStatsIterator, - |x: &PageIndex| { x.max }, - Index::FLOAT, - f32 -); -make_data_page_stats_iterator!( - MinFloat64DataPageStatsIterator, - |x: &PageIndex| { x.min }, - Index::DOUBLE, - f64 -); -make_data_page_stats_iterator!( - MaxFloat64DataPageStatsIterator, - |x: &PageIndex| { x.max }, - Index::DOUBLE, - f64 -); +make_data_page_stats_iterator!(MinFloat32DataPageStatsIterator, min_values_iter, f32); +make_data_page_stats_iterator!(MaxFloat32DataPageStatsIterator, max_values_iter, f32); +make_data_page_stats_iterator!(MinFloat64DataPageStatsIterator, min_values_iter, f64); +make_data_page_stats_iterator!(MaxFloat64DataPageStatsIterator, max_values_iter, f64); make_data_page_stats_iterator!( MinByteArrayDataPageStatsIterator, - |x: &PageIndex| { x.min.clone() }, - Index::BYTE_ARRAY, + min_values_iter, ByteArray ); make_data_page_stats_iterator!( MaxByteArrayDataPageStatsIterator, - |x: &PageIndex| { x.max.clone() }, - Index::BYTE_ARRAY, + max_values_iter, ByteArray ); make_data_page_stats_iterator!( MaxFixedLenByteArrayDataPageStatsIterator, - |x: &PageIndex| { x.max.clone() }, - Index::FIXED_LEN_BYTE_ARRAY, + max_values_iter, FixedLenByteArray ); make_data_page_stats_iterator!( MinFixedLenByteArrayDataPageStatsIterator, - |x: &PageIndex| { x.min.clone() }, - Index::FIXED_LEN_BYTE_ARRAY, + min_values_iter, FixedLenByteArray ); @@ -748,14 +690,14 @@ macro_rules! get_decimal_page_stats_iterator { ($iterator_type: ident, $func: ident, $stat_value_type: ident, $convert_func: ident) => { struct $iterator_type<'a, I> where - I: Iterator, + I: Iterator, { iter: I, } impl<'a, I> $iterator_type<'a, I> where - I: Iterator, + I: Iterator, { fn new(iter: I) -> Self { Self { iter } @@ -764,44 +706,37 @@ macro_rules! 
get_decimal_page_stats_iterator { impl<'a, I> Iterator for $iterator_type<'a, I> where - I: Iterator, + I: Iterator, { type Item = Vec>; + // Some(native_index.$func().map(|v| v.map($conv)).collect::>()) fn next(&mut self) -> Option { let next = self.iter.next(); match next { Some((len, index)) => match index { - Index::INT32(native_index) => Some( + ColumnIndexMetaData::INT32(native_index) => Some( native_index - .indexes - .iter() - .map(|x| x.$func.and_then(|x| Some($stat_value_type::from(x)))) + .$func() + .map(|x| x.map(|x| $stat_value_type::from(*x))) .collect::>(), ), - Index::INT64(native_index) => Some( + ColumnIndexMetaData::INT64(native_index) => Some( native_index - .indexes - .iter() - .map(|x| x.$func.and_then(|x| $stat_value_type::try_from(x).ok())) + .$func() + .map(|x| x.map(|x| $stat_value_type::try_from(*x).unwrap())) .collect::>(), ), - Index::BYTE_ARRAY(native_index) => Some( + ColumnIndexMetaData::BYTE_ARRAY(native_index) => Some( native_index - .indexes - .iter() - .map(|x| { - x.clone().$func.and_then(|x| Some($convert_func(x.data()))) - }) + .$func() + .map(|x| x.map(|x| $convert_func(x))) .collect::>(), ), - Index::FIXED_LEN_BYTE_ARRAY(native_index) => Some( + ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(native_index) => Some( native_index - .indexes - .iter() - .map(|x| { - x.clone().$func.and_then(|x| Some($convert_func(x.data()))) - }) + .$func() + .map(|x| x.map(|x| $convert_func(x))) .collect::>(), ), _ => Some(vec![None; len]), @@ -819,56 +754,56 @@ macro_rules! get_decimal_page_stats_iterator { get_decimal_page_stats_iterator!( MinDecimal32DataPageStatsIterator, - min, + min_values_iter, i32, from_bytes_to_i32 ); get_decimal_page_stats_iterator!( MaxDecimal32DataPageStatsIterator, - max, + max_values_iter, i32, from_bytes_to_i32 ); get_decimal_page_stats_iterator!( MinDecimal64DataPageStatsIterator, - min, + min_values_iter, i64, from_bytes_to_i64 ); get_decimal_page_stats_iterator!( MaxDecimal64DataPageStatsIterator, - max, + max_values_iter, i64, from_bytes_to_i64 ); get_decimal_page_stats_iterator!( MinDecimal128DataPageStatsIterator, - min, + min_values_iter, i128, from_bytes_to_i128 ); get_decimal_page_stats_iterator!( MaxDecimal128DataPageStatsIterator, - max, + max_values_iter, i128, from_bytes_to_i128 ); get_decimal_page_stats_iterator!( MinDecimal256DataPageStatsIterator, - min, + min_values_iter, i256, from_bytes_to_i256 ); get_decimal_page_stats_iterator!( MaxDecimal256DataPageStatsIterator, - max, + max_values_iter, i256, from_bytes_to_i256 ); @@ -1174,77 +1109,44 @@ fn max_statistics<'a, I: Iterator>>( } /// Extracts the min statistics from an iterator -/// of parquet page [`Index`]'es to an [`ArrayRef`] +/// of parquet page [`ColumnIndexMetaData`]'s to an [`ArrayRef`] pub(crate) fn min_page_statistics<'a, I>( data_type: &DataType, iterator: I, physical_type: Option, ) -> Result where - I: Iterator, + I: Iterator, { get_data_page_statistics!(Min, data_type, iterator, physical_type) } /// Extracts the max statistics from an iterator -/// of parquet page [`Index`]'es to an [`ArrayRef`] +/// of parquet page [`ColumnIndexMetaData`]'s to an [`ArrayRef`] pub(crate) fn max_page_statistics<'a, I>( data_type: &DataType, iterator: I, physical_type: Option, ) -> Result where - I: Iterator, + I: Iterator, { get_data_page_statistics!(Max, data_type, iterator, physical_type) } /// Extracts the null count statistics from an iterator -/// of parquet page [`Index`]'es to an [`ArrayRef`] +/// of parquet page [`ColumnIndexMetaData`]'s to an [`ArrayRef`] /// /// The 
returned Array is an [`UInt64Array`] pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result where - I: Iterator, + I: Iterator, { let iter = iterator.flat_map(|(len, index)| match index { - Index::NONE => vec![None; len], - Index::BOOLEAN(native_index) => native_index - .indexes - .iter() - .map(|x| x.null_count.map(|x| x as u64)) - .collect::>(), - Index::INT32(native_index) => native_index - .indexes - .iter() - .map(|x| x.null_count.map(|x| x as u64)) - .collect::>(), - Index::INT64(native_index) => native_index - .indexes - .iter() - .map(|x| x.null_count.map(|x| x as u64)) - .collect::>(), - Index::FLOAT(native_index) => native_index - .indexes - .iter() - .map(|x| x.null_count.map(|x| x as u64)) - .collect::>(), - Index::DOUBLE(native_index) => native_index - .indexes - .iter() - .map(|x| x.null_count.map(|x| x as u64)) - .collect::>(), - Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index - .indexes - .iter() - .map(|x| x.null_count.map(|x| x as u64)) - .collect::>(), - Index::BYTE_ARRAY(native_index) => native_index - .indexes - .iter() - .map(|x| x.null_count.map(|x| x as u64)) - .collect::>(), - _ => unimplemented!(), + ColumnIndexMetaData::NONE => vec![None; len], + column_index => column_index.null_counts().map_or(vec![None; len], |v| { + v.iter().map(|i| Some(*i as u64)).collect::>() + }), }); Ok(UInt64Array::from_iter(iter)) @@ -1573,7 +1475,7 @@ impl<'a> StatisticsConverter<'a> { /// page level statistics can prune at a finer granularity. /// /// However since they are stored in a separate metadata - /// structure ([`Index`]) there is different code to extract them as + /// structure ([`ColumnIndexMetaData`]) there is different code to extract them as /// compared to arrow statistics. /// /// # Parameters: diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index c6b0b426f9dd..bd9f30c36103 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -1488,6 +1488,7 @@ mod tests { use crate::arrow::ARROW_SCHEMA_META_KEY; use crate::column::page::{Page, PageReader}; use crate::file::page_encoding_stats::PageEncodingStats; + use crate::file::page_index::column_index::ColumnIndexMetaData; use crate::file::reader::SerializedPageReader; use crate::format::PageHeader; use crate::schema::types::ColumnPath; @@ -1507,7 +1508,6 @@ mod tests { use crate::basic::Encoding; use crate::data_type::AsBytes; use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader}; - use crate::file::page_index::index::Index; use crate::file::properties::{ BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion, }; @@ -4002,9 +4002,12 @@ mod tests { assert_eq!(column_index[0].len(), 2); // 2 columns let a_idx = &column_index[0][0]; - assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}"); + assert!( + matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)), + "{a_idx:?}" + ); let b_idx = &column_index[0][1]; - assert!(matches!(b_idx, Index::NONE), "{b_idx:?}"); + assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}"); } #[test] @@ -4070,9 +4073,9 @@ mod tests { assert_eq!(column_index[0].len(), 2); // 2 columns let a_idx = &column_index[0][0]; - assert!(matches!(a_idx, Index::NONE), "{a_idx:?}"); + assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}"); let b_idx = &column_index[0][1]; - assert!(matches!(b_idx, Index::NONE), "{b_idx:?}"); + assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}"); } #[test] diff --git 
a/parquet/src/bin/parquet-index.rs b/parquet/src/bin/parquet-index.rs index e91f5e5a9f17..397a75c76ae4 100644 --- a/parquet/src/bin/parquet-index.rs +++ b/parquet/src/bin/parquet-index.rs @@ -35,8 +35,11 @@ //! [page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md use clap::Parser; +use parquet::data_type::ByteArray; use parquet::errors::{ParquetError, Result}; -use parquet::file::page_index::index::{Index, PageIndex}; +use parquet::file::page_index::column_index::{ + ByteArrayColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex, +}; use parquet::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; use parquet::file::reader::{FileReader, SerializedFileReader}; use parquet::file::serialized_reader::ReadOptionsBuilder; @@ -96,16 +99,20 @@ impl Args { let row_counts = compute_row_counts(offset_index.page_locations.as_slice(), row_group.num_rows()); match &column_indices[column_idx] { - Index::NONE => println!("NO INDEX"), - Index::BOOLEAN(v) => print_index(&v.indexes, offset_index, &row_counts)?, - Index::INT32(v) => print_index(&v.indexes, offset_index, &row_counts)?, - Index::INT64(v) => print_index(&v.indexes, offset_index, &row_counts)?, - Index::INT96(v) => print_index(&v.indexes, offset_index, &row_counts)?, - Index::FLOAT(v) => print_index(&v.indexes, offset_index, &row_counts)?, - Index::DOUBLE(v) => print_index(&v.indexes, offset_index, &row_counts)?, - Index::BYTE_ARRAY(v) => print_index(&v.indexes, offset_index, &row_counts)?, - Index::FIXED_LEN_BYTE_ARRAY(v) => { - print_index(&v.indexes, offset_index, &row_counts)? + ColumnIndexMetaData::NONE => println!("NO INDEX"), + ColumnIndexMetaData::BOOLEAN(v) => { + print_index::(v, offset_index, &row_counts)? + } + ColumnIndexMetaData::INT32(v) => print_index(v, offset_index, &row_counts)?, + ColumnIndexMetaData::INT64(v) => print_index(v, offset_index, &row_counts)?, + ColumnIndexMetaData::INT96(v) => print_index(v, offset_index, &row_counts)?, + ColumnIndexMetaData::FLOAT(v) => print_index(v, offset_index, &row_counts)?, + ColumnIndexMetaData::DOUBLE(v) => print_index(v, offset_index, &row_counts)?, + ColumnIndexMetaData::BYTE_ARRAY(v) => { + print_bytes_index(v, offset_index, &row_counts)? + } + ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(v) => { + print_bytes_index(v, offset_index, &row_counts)? 
} } } @@ -131,20 +138,21 @@ fn compute_row_counts(offset_index: &[PageLocation], rows: i64) -> Vec { /// Prints index information for a single column chunk fn print_index( - column_index: &[PageIndex], + column_index: &PrimitiveColumnIndex, offset_index: &OffsetIndexMetaData, row_counts: &[i64], ) -> Result<()> { - if column_index.len() != offset_index.page_locations.len() { + if column_index.num_pages() as usize != offset_index.page_locations.len() { return Err(ParquetError::General(format!( "Index length mismatch, got {} and {}", - column_index.len(), + column_index.num_pages(), offset_index.page_locations.len() ))); } - for (idx, ((c, o), row_count)) in column_index - .iter() + for (idx, (((min, max), o), row_count)) in column_index + .min_values_iter() + .zip(column_index.max_values_iter()) .zip(offset_index.page_locations()) .zip(row_counts) .enumerate() @@ -153,12 +161,12 @@ fn print_index( "Page {:>5} at offset {:#010x} with length {:>10} and row count {:>10}", idx, o.offset, o.compressed_page_size, row_count ); - match &c.min { + match min { Some(m) => print!(", min {m:>10}"), None => print!(", min {:>10}", "NONE"), } - match &c.max { + match max { Some(m) => print!(", max {m:>10}"), None => print!(", max {:>10}", "NONE"), } @@ -168,6 +176,51 @@ fn print_index( Ok(()) } +fn print_bytes_index( + column_index: &ByteArrayColumnIndex, + offset_index: &OffsetIndexMetaData, + row_counts: &[i64], +) -> Result<()> { + if column_index.num_pages() as usize != offset_index.page_locations.len() { + return Err(ParquetError::General(format!( + "Index length mismatch, got {} and {}", + column_index.num_pages(), + offset_index.page_locations.len() + ))); + } + + for (idx, (((min, max), o), row_count)) in column_index + .min_values_iter() + .zip(column_index.max_values_iter()) + .zip(offset_index.page_locations()) + .zip(row_counts) + .enumerate() + { + print!( + "Page {:>5} at offset {:#010x} with length {:>10} and row count {:>10}", + idx, o.offset, o.compressed_page_size, row_count + ); + match min { + Some(m) => match String::from_utf8(m.to_vec()) { + Ok(s) => print!(", min {s:>10}"), + Err(_) => print!(", min {:>10}", ByteArray::from(m)), + }, + None => print!(", min {:>10}", "NONE"), + } + + match max { + Some(m) => match String::from_utf8(m.to_vec()) { + Ok(s) => print!(", max {s:>10}"), + Err(_) => print!(", min {:>10}", ByteArray::from(m)), + }, + None => print!(", max {:>10}", "NONE"), + } + println!() + } + + Ok(()) +} + fn main() -> Result<()> { Args::parse().run() } diff --git a/parquet/src/file/metadata/memory.rs b/parquet/src/file/metadata/memory.rs index 0b8d3b336fc0..69eee3c2999d 100644 --- a/parquet/src/file/metadata/memory.rs +++ b/parquet/src/file/metadata/memory.rs @@ -24,6 +24,9 @@ use crate::file::metadata::{ ColumnChunkMetaData, FileMetaData, KeyValue, RowGroupMetaData, SortingColumn, }; use crate::file::page_encoding_stats::PageEncodingStats; +use crate::file::page_index::column_index::{ + ByteArrayColumnIndex, ColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex, +}; use crate::file::page_index::index::{Index, NativeIndex, PageIndex}; use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; use crate::file::statistics::{Statistics, ValueStatistics}; @@ -154,6 +157,48 @@ impl HeapSize for OffsetIndexMetaData { } } +impl HeapSize for ColumnIndexMetaData { + fn heap_size(&self) -> usize { + match self { + Self::NONE => 0, + Self::BOOLEAN(native_index) => native_index.heap_size(), + Self::INT32(native_index) => native_index.heap_size(), + 
Self::INT64(native_index) => native_index.heap_size(), + Self::INT96(native_index) => native_index.heap_size(), + Self::FLOAT(native_index) => native_index.heap_size(), + Self::DOUBLE(native_index) => native_index.heap_size(), + Self::BYTE_ARRAY(native_index) => native_index.heap_size(), + Self::FIXED_LEN_BYTE_ARRAY(native_index) => native_index.heap_size(), + } + } +} + +impl HeapSize for ColumnIndex { + fn heap_size(&self) -> usize { + self.null_pages.heap_size() + + self.boundary_order.heap_size() + + self.null_counts.heap_size() + + self.definition_level_histograms.heap_size() + + self.repetition_level_histograms.heap_size() + } +} + +impl HeapSize for PrimitiveColumnIndex { + fn heap_size(&self) -> usize { + self.column_index.heap_size() + self.min_values.heap_size() + self.max_values.heap_size() + } +} + +impl HeapSize for ByteArrayColumnIndex { + fn heap_size(&self) -> usize { + self.column_index.heap_size() + + self.min_bytes.heap_size() + + self.min_offsets.heap_size() + + self.max_bytes.heap_size() + + self.max_offsets.heap_size() + } +} + impl HeapSize for Index { fn heap_size(&self) -> usize { match self { @@ -193,6 +238,11 @@ impl HeapSize for bool { 0 // no heap allocations } } +impl HeapSize for u8 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} impl HeapSize for i32 { fn heap_size(&self) -> usize { 0 // no heap allocations diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index f2fe9de77e72..69cdf8f10714 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -106,7 +106,7 @@ use crate::file::column_crypto_metadata::{self, ColumnCryptoMetaData}; pub(crate) use crate::file::metadata::memory::HeapSize; use crate::file::{ page_encoding_stats::{self, PageEncodingStats}, - page_index::offset_index::PageLocation, + page_index::{column_index::ColumnIndexMetaData, offset_index::PageLocation}, }; use crate::file::{ page_index::index::PageIndex, @@ -156,7 +156,7 @@ pub(crate) use writer::ThriftMetadataWriter; /// /// [PageIndex documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md /// [`ColumnIndex`]: crate::format::ColumnIndex -pub type ParquetColumnIndex = Vec>; +pub type ParquetColumnIndex = Vec>; /// [`OffsetIndexMetaData`] for each data page of each row group of each column /// @@ -1948,7 +1948,7 @@ impl OffsetIndexBuilder { mod tests { use super::*; use crate::basic::{PageType, SortOrder}; - use crate::file::page_index::index::NativeIndex; + use crate::file::page_index::column_index::{ColumnIndex, PrimitiveColumnIndex}; #[test] fn test_row_group_metadata_thrift_conversion() { @@ -2223,7 +2223,17 @@ mod tests { let mut column_index = ColumnIndexBuilder::new(Type::BOOLEAN); column_index.append(false, vec![1u8], vec![2u8, 3u8], 4); let column_index = column_index.build_to_thrift(); - let native_index = NativeIndex::::try_new(column_index).unwrap(); + let native_index = PrimitiveColumnIndex:: { + column_index: ColumnIndex { + null_pages: column_index.null_pages, + boundary_order: column_index.boundary_order.try_into().unwrap(), + null_counts: column_index.null_counts, + repetition_level_histograms: column_index.repetition_level_histograms, + definition_level_histograms: column_index.definition_level_histograms, + }, + min_values: vec![], + max_values: vec![], + }; // Now, add in OffsetIndex let mut offset_index = OffsetIndexBuilder::new(); @@ -2237,16 +2247,16 @@ mod tests { let parquet_meta = ParquetMetaDataBuilder::new(file_metadata) .set_row_groups(row_group_meta) - 
.set_column_index(Some(vec![vec![Index::BOOLEAN(native_index)]])) + .set_column_index(Some(vec![vec![ColumnIndexMetaData::BOOLEAN(native_index)]])) .set_offset_index(Some(vec![vec![ OffsetIndexMetaData::try_new(offset_index).unwrap() ]])) .build(); #[cfg(not(feature = "encryption"))] - let bigger_expected_size = 2784; + let bigger_expected_size = 2704; #[cfg(feature = "encryption")] - let bigger_expected_size = 3120; + let bigger_expected_size = 3040; // more set fields means more memory usage assert!(bigger_expected_size > base_expected_size); diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index a403f4eee8f0..57cc7c57ac66 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -34,7 +34,7 @@ use bytes::Bytes; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, RowGroupMetaData}; -use crate::file::page_index::index::Index; +use crate::file::page_index::column_index::ColumnIndexMetaData; use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index}; use crate::file::reader::ChunkReader; use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER}; @@ -566,7 +566,7 @@ impl ParquetMetaDataReader { col_idx, ) } - None => Ok(Index::NONE), + None => Ok(ColumnIndexMetaData::NONE), }) .collect::>>() }) @@ -584,7 +584,7 @@ impl ParquetMetaDataReader { column: &ColumnChunkMetaData, row_group_index: usize, col_index: usize, - ) -> Result { + ) -> Result { match &column.column_crypto_metadata { Some(crypto_metadata) => { let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| { @@ -612,7 +612,7 @@ impl ParquetMetaDataReader { column: &ColumnChunkMetaData, _row_group_index: usize, _col_index: usize, - ) -> Result { + ) -> Result { decode_column_index(bytes, column.column_type()) } diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs index acae20ec3cef..404bcf5dba8a 100644 --- a/parquet/src/file/metadata/writer.rs +++ b/parquet/src/file/metadata/writer.rs @@ -24,9 +24,7 @@ use crate::encryption::{ }; #[cfg(feature = "encryption")] use crate::errors::ParquetError; -use crate::errors::Result; use crate::file::metadata::{KeyValue, ParquetMetaData}; -use crate::file::page_index::index::Index; use crate::file::writer::{get_file_magic, TrackedWrite}; use crate::format::EncryptionAlgorithm; #[cfg(feature = "encryption")] @@ -34,6 +32,7 @@ use crate::format::{AesGcmV1, ColumnCryptoMetaData}; use crate::schema::types; use crate::schema::types::{SchemaDescPtr, SchemaDescriptor, TypePtr}; use crate::thrift::TSerializable; +use crate::{errors::Result, file::page_index::column_index::ColumnIndexMetaData}; use std::io::Write; use std::sync::Arc; use thrift::protocol::TCompactOutputProtocol; @@ -391,17 +390,31 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> { column_indexes .iter() .map(|column_index| match column_index { - Index::NONE => None, - Index::BOOLEAN(column_index) => Some(column_index.to_thrift()), - Index::BYTE_ARRAY(column_index) => Some(column_index.to_thrift()), - Index::DOUBLE(column_index) => Some(column_index.to_thrift()), - Index::FIXED_LEN_BYTE_ARRAY(column_index) => { + ColumnIndexMetaData::NONE => None, + ColumnIndexMetaData::BOOLEAN(column_index) => { + Some(column_index.to_thrift()) + } + ColumnIndexMetaData::BYTE_ARRAY(column_index) => { + Some(column_index.to_thrift()) + } + ColumnIndexMetaData::DOUBLE(column_index) => { + Some(column_index.to_thrift()) + } + 
ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => { + Some(column_index.to_thrift()) + } + ColumnIndexMetaData::FLOAT(column_index) => { + Some(column_index.to_thrift()) + } + ColumnIndexMetaData::INT32(column_index) => { + Some(column_index.to_thrift()) + } + ColumnIndexMetaData::INT64(column_index) => { + Some(column_index.to_thrift()) + } + ColumnIndexMetaData::INT96(column_index) => { Some(column_index.to_thrift()) } - Index::FLOAT(column_index) => Some(column_index.to_thrift()), - Index::INT32(column_index) => Some(column_index.to_thrift()), - Index::INT64(column_index) => Some(column_index.to_thrift()), - Index::INT96(column_index) => Some(column_index.to_thrift()), }) .collect() }) diff --git a/parquet/src/file/page_index/column_index.rs b/parquet/src/file/page_index/column_index.rs new file mode 100644 index 000000000000..2d43c93b2e4b --- /dev/null +++ b/parquet/src/file/page_index/column_index.rs @@ -0,0 +1,569 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`ColumnIndexMetaData`] structures holding decoded [`ColumnIndex`] information +//! +//! [`ColumnIndex`]: crate::format::ColumnIndex +//! 
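+//!
+//! # Example
+//!
+//! A minimal sketch of inspecting a decoded [`ColumnIndexMetaData`]; the
+//! `describe` helper is illustrative only, and obtaining the index from the
+//! file metadata is assumed to happen elsewhere:
+//!
+//! ```ignore
+//! use parquet::file::page_index::column_index::ColumnIndexMetaData;
+//!
+//! fn describe(index: &ColumnIndexMetaData) {
+//!     match index {
+//!         // No column index was written for this column chunk
+//!         ColumnIndexMetaData::NONE => println!("no column index"),
+//!         // Primitive variants expose typed per-page min/max/null-count accessors
+//!         ColumnIndexMetaData::INT32(idx) => {
+//!             for page in 0..idx.num_pages() as usize {
+//!                 println!(
+//!                     "page {page}: min={:?} max={:?} nulls={:?}",
+//!                     idx.min_value(page),
+//!                     idx.max_value(page),
+//!                     idx.null_count(page)
+//!                 );
+//!             }
+//!         }
+//!         // Byte array variants return the raw bytes of the per-page min/max values
+//!         ColumnIndexMetaData::BYTE_ARRAY(idx) => {
+//!             for (min, max) in idx.min_values_iter().zip(idx.max_values_iter()) {
+//!                 println!("min={min:?} max={max:?}");
+//!             }
+//!         }
+//!         _ => {}
+//!     }
+//! }
+//! ```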
+ +use crate::{ + data_type::{ByteArray, FixedLenByteArray}, + errors::Result, +}; +use std::ops::Deref; + +use crate::{ + basic::BoundaryOrder, + data_type::{private::ParquetValueType, Int96}, + file::page_index::index_reader::ThriftColumnIndex, +}; + +/// Common bits of the column index +#[derive(Debug, Clone, PartialEq)] +pub struct ColumnIndex { + pub(crate) null_pages: Vec, + pub(crate) boundary_order: BoundaryOrder, + pub(crate) null_counts: Option>, + pub(crate) repetition_level_histograms: Option>, + pub(crate) definition_level_histograms: Option>, +} + +impl ColumnIndex { + /// Returns the number of pages + pub fn num_pages(&self) -> u64 { + self.null_pages.len() as u64 + } + + /// Returns the number of null values in the page indexed by `idx` + /// + /// Returns `None` if no null counts have been set in the index + pub fn null_count(&self, idx: usize) -> Option { + self.null_counts.as_ref().map(|nc| nc[idx]) + } + + /// Returns the repetition level histogram for the page indexed by `idx` + pub fn repetition_level_histogram(&self, idx: usize) -> Option<&[i64]> { + if let Some(rep_hists) = self.repetition_level_histograms.as_ref() { + let num_lvls = rep_hists.len() / self.num_pages() as usize; + let start = num_lvls * idx; + Some(&rep_hists[start..start + num_lvls]) + } else { + None + } + } + + /// Returns the definition level histogram for the page indexed by `idx` + pub fn definition_level_histogram(&self, idx: usize) -> Option<&[i64]> { + if let Some(def_hists) = self.definition_level_histograms.as_ref() { + let num_lvls = def_hists.len() / self.num_pages() as usize; + let start = num_lvls * idx; + Some(&def_hists[start..start + num_lvls]) + } else { + None + } + } + + /// Returns whether the page indexed by `idx` consists of all null values + pub fn is_null_page(&self, idx: usize) -> bool { + self.null_pages[idx] + } +} + +/// Column index for primitive types +#[derive(Debug, Clone, PartialEq)] +pub struct PrimitiveColumnIndex { + pub(crate) column_index: ColumnIndex, + pub(crate) min_values: Vec, + pub(crate) max_values: Vec, +} + +impl PrimitiveColumnIndex { + pub(super) fn try_new(index: ThriftColumnIndex) -> Result { + let len = index.null_pages.len(); + + let mut min_values = Vec::with_capacity(len); + let mut max_values = Vec::with_capacity(len); + + for (i, is_null) in index.null_pages.iter().enumerate().take(len) { + if !is_null { + let min = index.min_values[i]; + min_values.push(T::try_from_le_slice(min)?); + + let max = index.max_values[i]; + max_values.push(T::try_from_le_slice(max)?); + } else { + // need placeholders + min_values.push(Default::default()); + max_values.push(Default::default()); + } + } + + Ok(Self { + column_index: ColumnIndex { + null_pages: index.null_pages, + boundary_order: index.boundary_order, + null_counts: index.null_counts, + repetition_level_histograms: index.repetition_level_histograms, + definition_level_histograms: index.definition_level_histograms, + }, + min_values, + max_values, + }) + } + + pub(crate) fn to_thrift(&self) -> crate::format::ColumnIndex { + let min_values = self + .min_values + .iter() + .map(|x| x.as_bytes().to_vec()) + .collect::>(); + + let max_values = self + .max_values + .iter() + .map(|x| x.as_bytes().to_vec()) + .collect::>(); + + let null_counts = self.null_counts.clone(); + let repetition_level_histograms = self.repetition_level_histograms.clone(); + let definition_level_histograms = self.definition_level_histograms.clone(); + let null_pages = self.null_pages.clone(); + + crate::format::ColumnIndex::new( + 
null_pages, + min_values, + max_values, + self.boundary_order.into(), + null_counts, + repetition_level_histograms, + definition_level_histograms, + ) + } +} + +impl PrimitiveColumnIndex { + /// Returns an array containing the min values for each page. + /// + /// Values in the returned slice are only valid if [`ColumnIndex::is_null_page()`] + /// is `false` for the same index. + pub fn min_values(&self) -> &[T] { + &self.min_values + } + + /// Returns an array containing the max values for each page. + /// + /// Values in the returned slice are only valid if [`ColumnIndex::is_null_page()`] + /// is `false` for the same index. + pub fn max_values(&self) -> &[T] { + &self.max_values + } + + /// Returns an iterator over the min values. + /// + /// Values may be `None` when [`ColumnIndex::is_null_page()`] is `true`. + pub fn min_values_iter(&self) -> impl Iterator> { + self.min_values.iter().enumerate().map(|(i, min)| { + if self.is_null_page(i) { + None + } else { + Some(min) + } + }) + } + + /// Returns an iterator over the max values. + /// + /// Values may be `None` when [`ColumnIndex::is_null_page()`] is `true`. + pub fn max_values_iter(&self) -> impl Iterator> { + self.max_values.iter().enumerate().map(|(i, min)| { + if self.is_null_page(i) { + None + } else { + Some(min) + } + }) + } + + /// Returns the min value for the page indexed by `idx` + /// + /// It is `None` when all values are null + pub fn min_value(&self, idx: usize) -> Option<&T> { + if self.null_pages[idx] { + None + } else { + Some(&self.min_values[idx]) + } + } + + /// Returns the max value for the page indexed by `idx` + /// + /// It is `None` when all values are null + pub fn max_value(&self, idx: usize) -> Option<&T> { + if self.null_pages[idx] { + None + } else { + Some(&self.max_values[idx]) + } + } +} + +impl Deref for PrimitiveColumnIndex { + type Target = ColumnIndex; + + fn deref(&self) -> &Self::Target { + &self.column_index + } +} + +/// Column index for byte arrays (fixed length and variable) +#[derive(Debug, Clone, PartialEq)] +pub struct ByteArrayColumnIndex { + pub(crate) column_index: ColumnIndex, + // raw bytes for min and max values + pub(crate) min_bytes: Vec, + pub(crate) min_offsets: Vec, + pub(crate) max_bytes: Vec, + pub(crate) max_offsets: Vec, +} + +impl ByteArrayColumnIndex { + pub(super) fn try_new(index: ThriftColumnIndex) -> Result { + let len = index.null_pages.len(); + + let min_len = index.min_values.iter().map(|&v| v.len()).sum(); + let max_len = index.max_values.iter().map(|&v| v.len()).sum(); + let mut min_bytes = vec![0u8; min_len]; + let mut max_bytes = vec![0u8; max_len]; + + let mut min_offsets = vec![0usize; len + 1]; + let mut max_offsets = vec![0usize; len + 1]; + + let mut min_pos = 0; + let mut max_pos = 0; + + for (i, is_null) in index.null_pages.iter().enumerate().take(len) { + if !is_null { + let min = index.min_values[i]; + let dst = &mut min_bytes[min_pos..min_pos + min.len()]; + dst.copy_from_slice(min); + min_offsets[i] = min_pos; + min_pos += min.len(); + + let max = index.max_values[i]; + let dst = &mut max_bytes[max_pos..max_pos + max.len()]; + dst.copy_from_slice(max); + max_offsets[i] = max_pos; + max_pos += max.len(); + } else { + min_offsets[i] = min_pos; + max_offsets[i] = max_pos; + } + } + + min_offsets[len] = min_pos; + max_offsets[len] = max_pos; + + Ok(Self { + column_index: ColumnIndex { + null_pages: index.null_pages, + boundary_order: index.boundary_order, + null_counts: index.null_counts, + repetition_level_histograms: index.repetition_level_histograms, 
+ definition_level_histograms: index.definition_level_histograms, + }, + + min_bytes, + min_offsets, + max_bytes, + max_offsets, + }) + } + + /// Returns the min value for the page indexed by `idx` + /// + /// It is `None` when all values are null + pub fn min_value(&self, idx: usize) -> Option<&[u8]> { + if self.null_pages[idx] { + None + } else { + let start = self.min_offsets[idx]; + let end = self.min_offsets[idx + 1]; + Some(&self.min_bytes[start..end]) + } + } + + /// Returns the max value for the page indexed by `idx` + /// + /// It is `None` when all values are null + pub fn max_value(&self, idx: usize) -> Option<&[u8]> { + if self.null_pages[idx] { + None + } else { + let start = self.max_offsets[idx]; + let end = self.max_offsets[idx + 1]; + Some(&self.max_bytes[start..end]) + } + } + + /// Returns an iterator over the min values. + /// + /// Values may be `None` when [`ColumnIndex::is_null_page()`] is `true`. + pub fn min_values_iter(&self) -> impl Iterator> { + (0..self.num_pages() as usize).map(|i| { + if self.is_null_page(i) { + None + } else { + self.min_value(i) + } + }) + } + + /// Returns an iterator over the max values. + /// + /// Values may be `None` when [`ColumnIndex::is_null_page()`] is `true`. + pub fn max_values_iter(&self) -> impl Iterator> { + (0..self.num_pages() as usize).map(|i| { + if self.is_null_page(i) { + None + } else { + self.max_value(i) + } + }) + } + + pub(crate) fn to_thrift(&self) -> crate::format::ColumnIndex { + let mut min_values = Vec::with_capacity(self.num_pages() as usize); + for i in 0..self.num_pages() as usize { + min_values.push(self.min_value(i).unwrap_or(&[]).to_owned()); + } + + let mut max_values = Vec::with_capacity(self.num_pages() as usize); + for i in 0..self.num_pages() as usize { + max_values.push(self.max_value(i).unwrap_or(&[]).to_owned()); + } + + let null_counts = self.null_counts.clone(); + let repetition_level_histograms = self.repetition_level_histograms.clone(); + let definition_level_histograms = self.definition_level_histograms.clone(); + let null_pages = self.null_pages.clone(); + + crate::format::ColumnIndex::new( + null_pages, + min_values, + max_values, + self.boundary_order.into(), + null_counts, + repetition_level_histograms, + definition_level_histograms, + ) + } +} + +impl Deref for ByteArrayColumnIndex { + type Target = ColumnIndex; + + fn deref(&self) -> &Self::Target { + &self.column_index + } +} + +// Macro to generate getter functions for ColumnIndexMetaData. +macro_rules! 
colidx_enum_func { + ($self:ident, $func:ident, $arg:ident) => {{ + match *$self { + Self::BOOLEAN(ref typed) => typed.$func($arg), + Self::INT32(ref typed) => typed.$func($arg), + Self::INT64(ref typed) => typed.$func($arg), + Self::INT96(ref typed) => typed.$func($arg), + Self::FLOAT(ref typed) => typed.$func($arg), + Self::DOUBLE(ref typed) => typed.$func($arg), + Self::BYTE_ARRAY(ref typed) => typed.$func($arg), + Self::FIXED_LEN_BYTE_ARRAY(ref typed) => typed.$func($arg), + _ => panic!(concat!( + "Cannot call ", + stringify!($func), + " on ColumnIndexMetaData::NONE" + )), + } + }}; + ($self:ident, $func:ident) => {{ + match *$self { + Self::BOOLEAN(ref typed) => typed.$func(), + Self::INT32(ref typed) => typed.$func(), + Self::INT64(ref typed) => typed.$func(), + Self::INT96(ref typed) => typed.$func(), + Self::FLOAT(ref typed) => typed.$func(), + Self::DOUBLE(ref typed) => typed.$func(), + Self::BYTE_ARRAY(ref typed) => typed.$func(), + Self::FIXED_LEN_BYTE_ARRAY(ref typed) => typed.$func(), + _ => panic!(concat!( + "Cannot call ", + stringify!($func), + " on ColumnIndexMetaData::NONE" + )), + } + }}; +} + +/// Decoded column index for a single column chunk, with one variant per Parquet physical type +#[derive(Debug, Clone, PartialEq)] +#[allow(non_camel_case_types)] +pub enum ColumnIndexMetaData { + /// Reading the page index from a parquet file sometimes returns only + /// page locations without a min/max index; `NONE` represents this + /// lack of index information + NONE, + /// Boolean type index + BOOLEAN(PrimitiveColumnIndex<bool>), + /// 32-bit integer type index + INT32(PrimitiveColumnIndex<i32>), + /// 64-bit integer type index + INT64(PrimitiveColumnIndex<i64>), + /// 96-bit integer type (timestamp) index + INT96(PrimitiveColumnIndex<Int96>), + /// 32-bit floating point type index + FLOAT(PrimitiveColumnIndex<f32>), + /// 64-bit floating point type index + DOUBLE(PrimitiveColumnIndex<f64>), + /// Byte array type index + BYTE_ARRAY(ByteArrayColumnIndex), + /// Fixed length byte array type index + FIXED_LEN_BYTE_ARRAY(ByteArrayColumnIndex), +} + +impl ColumnIndexMetaData { + /// Returns whether the min/max elements inside the ColumnIndex are ordered. + pub fn is_sorted(&self) -> bool { + // 0: UNORDERED, 1: ASCENDING, 2: DESCENDING + if let Some(order) = self.get_boundary_order() { + order != BoundaryOrder::UNORDERED + } else { + false + } + } + + /// Returns the boundary_order of this page index. + pub fn get_boundary_order(&self) -> Option<BoundaryOrder> { + match self { + Self::NONE => None, + Self::BOOLEAN(index) => Some(index.boundary_order), + Self::INT32(index) => Some(index.boundary_order), + Self::INT64(index) => Some(index.boundary_order), + Self::INT96(index) => Some(index.boundary_order), + Self::FLOAT(index) => Some(index.boundary_order), + Self::DOUBLE(index) => Some(index.boundary_order), + Self::BYTE_ARRAY(index) => Some(index.boundary_order), + Self::FIXED_LEN_BYTE_ARRAY(index) => Some(index.boundary_order), + } + } + + /// Returns the array of null counts, one per page.
+ /// + /// Returns `None` if no null counts have been set in the index + pub fn null_counts(&self) -> Option<&Vec<i64>> { + match self { + Self::NONE => None, + Self::BOOLEAN(index) => index.null_counts.as_ref(), + Self::INT32(index) => index.null_counts.as_ref(), + Self::INT64(index) => index.null_counts.as_ref(), + Self::INT96(index) => index.null_counts.as_ref(), + Self::FLOAT(index) => index.null_counts.as_ref(), + Self::DOUBLE(index) => index.null_counts.as_ref(), + Self::BYTE_ARRAY(index) => index.null_counts.as_ref(), + Self::FIXED_LEN_BYTE_ARRAY(index) => index.null_counts.as_ref(), + } + } + + /// Returns the number of pages + pub fn num_pages(&self) -> u64 { + colidx_enum_func!(self, num_pages) + } + + /// Returns the number of null values in the page indexed by `idx` + /// + /// Returns `None` if no null counts have been set in the index + pub fn null_count(&self, idx: usize) -> Option<i64> { + colidx_enum_func!(self, null_count, idx) + } + + /// Returns the repetition level histogram for the page indexed by `idx` + pub fn repetition_level_histogram(&self, idx: usize) -> Option<&[i64]> { + colidx_enum_func!(self, repetition_level_histogram, idx) + } + + /// Returns the definition level histogram for the page indexed by `idx` + pub fn definition_level_histogram(&self, idx: usize) -> Option<&[i64]> { + colidx_enum_func!(self, definition_level_histogram, idx) + } + + /// Returns whether the page indexed by `idx` consists of all null values + pub fn is_null_page(&self, idx: usize) -> bool { + colidx_enum_func!(self, is_null_page, idx) + } +} + +/// Provides iterators over min and max values of a [`ColumnIndexMetaData`] +pub trait ColumnIndexIterators { + /// Can be one of `bool`, `i32`, `i64`, `Int96`, `f32`, `f64`, [`ByteArray`], + /// or [`FixedLenByteArray`] + type Item; + + /// Returns an iterator over the min values for the index + fn min_values_iter(colidx: &ColumnIndexMetaData) -> impl Iterator<Item = Option<Self::Item>>; + + /// Returns an iterator over the max values for the index + fn max_values_iter(colidx: &ColumnIndexMetaData) -> impl Iterator<Item = Option<Self::Item>>; +} + +macro_rules!
column_index_iters { + ($item: ident, $variant: ident, $conv:expr) => { + impl ColumnIndexIterators for $item { + type Item = $item; + + fn min_values_iter( + colidx: &ColumnIndexMetaData, + ) -> impl Iterator> { + if let ColumnIndexMetaData::$variant(index) = colidx { + index.min_values_iter().map($conv) + } else { + panic!(concat!("Wrong type for ", stringify!($item), " iterator")) + } + } + + fn max_values_iter( + colidx: &ColumnIndexMetaData, + ) -> impl Iterator> { + if let ColumnIndexMetaData::$variant(index) = colidx { + index.max_values_iter().map($conv) + } else { + panic!(concat!("Wrong type for ", stringify!($item), " iterator")) + } + } + } + }; +} + +column_index_iters!(bool, BOOLEAN, |v| v.copied()); +column_index_iters!(i32, INT32, |v| v.copied()); +column_index_iters!(i64, INT64, |v| v.copied()); +column_index_iters!(Int96, INT96, |v| v.copied()); +column_index_iters!(f32, FLOAT, |v| v.copied()); +column_index_iters!(f64, DOUBLE, |v| v.copied()); +column_index_iters!(ByteArray, BYTE_ARRAY, |v| v + .map(|v| ByteArray::from(v.to_owned()))); +column_index_iters!(FixedLenByteArray, FIXED_LEN_BYTE_ARRAY, |v| v + .map(|v| FixedLenByteArray::from(v.to_owned()))); diff --git a/parquet/src/file/page_index/index.rs b/parquet/src/file/page_index/index.rs index ed586bcd33d0..861dc0c3b04e 100644 --- a/parquet/src/file/page_index/index.rs +++ b/parquet/src/file/page_index/index.rs @@ -24,7 +24,7 @@ use crate::data_type::private::ParquetValueType; use crate::data_type::{AsBytes, ByteArray, FixedLenByteArray, Int96}; use crate::errors::ParquetError; use crate::file::metadata::LevelHistogram; -use crate::file::page_index::index_reader::ColumnIndex; +use crate::file::page_index::index_reader::ThriftColumnIndex; use std::fmt::Debug; /// Typed statistics for one data page @@ -310,7 +310,8 @@ impl NativeIndex { } /// Creates a new [`NativeIndex`] - pub(crate) fn try_new_local(index: ColumnIndex) -> Result { + #[allow(dead_code)] + pub(super) fn try_new_local(index: ThriftColumnIndex) -> Result { let len = index.min_values.len(); // turn Option> into Vec> diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs index fbe6d3984596..f35241689e1c 100644 --- a/parquet/src/file/page_index/index_reader.rs +++ b/parquet/src/file/page_index/index_reader.rs @@ -15,13 +15,15 @@ // specific language governing permissions and limitations // under the License. -//! Support for reading [`Index`] and [`OffsetIndexMetaData`] from parquet metadata. +//! Support for reading [`ColumnIndexMetaData`] and [`OffsetIndexMetaData`] from parquet metadata. use crate::basic::{BoundaryOrder, Type}; use crate::data_type::Int96; use crate::errors::{ParquetError, Result}; use crate::file::metadata::ColumnChunkMetaData; -use crate::file::page_index::index::{Index, NativeIndex}; +use crate::file::page_index::column_index::{ + ByteArrayColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex, +}; use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::reader::ChunkReader; use crate::parquet_thrift::{FieldType, ThriftCompactInputProtocol}; @@ -38,7 +40,7 @@ pub(crate) fn acc_range(a: Option>, b: Option>) -> Option< } } -/// Reads per-column [`Index`] for all columns of a row group by +/// Reads per-column [`ColumnIndexMetaData`] for all columns of a row group by /// decoding [`ColumnIndex`] . /// /// Returns a vector of `index[column_number]`. 
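// A minimal usage sketch for the new return type, assuming `reader` and
// `row_group` come from the surrounding application code:
//
//     let column_indexes = read_columns_indexes(&reader, row_group.columns())?;
//     for index in column_indexes.iter().flatten() {
//         match index {
//             ColumnIndexMetaData::NONE => println!("no column index"),
//             other => println!("{} pages indexed", other.num_pages()),
//         }
//     }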
@@ -56,7 +58,7 @@ pub(crate) fn acc_range(a: Option>, b: Option>) -> Option< pub fn read_columns_indexes( reader: &R, chunks: &[ColumnChunkMetaData], -) -> Result>, ParquetError> { +) -> Result>, ParquetError> { let fetch = chunks .iter() .fold(None, |range, c| acc_range(range, c.column_index_range())); @@ -77,7 +79,7 @@ pub fn read_columns_indexes( ..usize::try_from(r.end - fetch.start)?], c.column_type(), ), - None => Ok(Index::NONE), + None => Ok(ColumnIndexMetaData::NONE), }) .collect(), ) @@ -134,8 +136,9 @@ pub(crate) fn decode_offset_index(data: &[u8]) -> Result { +pub(super) struct ThriftColumnIndex<'a> { 1: required list null_pages 2: required list<'a> min_values 3: required list<'a> max_values @@ -146,20 +149,25 @@ pub(crate) struct ColumnIndex<'a> { } ); -pub(crate) fn decode_column_index(data: &[u8], column_type: Type) -> Result { +pub(crate) fn decode_column_index( + data: &[u8], + column_type: Type, +) -> Result { let mut prot = ThriftCompactInputProtocol::new(data); - let index = ColumnIndex::try_from(&mut prot)?; + let index = ThriftColumnIndex::try_from(&mut prot)?; let index = match column_type { - Type::BOOLEAN => Index::BOOLEAN(NativeIndex::::try_new_local(index)?), - Type::INT32 => Index::INT32(NativeIndex::::try_new_local(index)?), - Type::INT64 => Index::INT64(NativeIndex::::try_new_local(index)?), - Type::INT96 => Index::INT96(NativeIndex::::try_new_local(index)?), - Type::FLOAT => Index::FLOAT(NativeIndex::::try_new_local(index)?), - Type::DOUBLE => Index::DOUBLE(NativeIndex::::try_new_local(index)?), - Type::BYTE_ARRAY => Index::BYTE_ARRAY(NativeIndex::try_new_local(index)?), + Type::BOOLEAN => { + ColumnIndexMetaData::BOOLEAN(PrimitiveColumnIndex::::try_new(index)?) + } + Type::INT32 => ColumnIndexMetaData::INT32(PrimitiveColumnIndex::::try_new(index)?), + Type::INT64 => ColumnIndexMetaData::INT64(PrimitiveColumnIndex::::try_new(index)?), + Type::INT96 => ColumnIndexMetaData::INT96(PrimitiveColumnIndex::::try_new(index)?), + Type::FLOAT => ColumnIndexMetaData::FLOAT(PrimitiveColumnIndex::::try_new(index)?), + Type::DOUBLE => ColumnIndexMetaData::DOUBLE(PrimitiveColumnIndex::::try_new(index)?), + Type::BYTE_ARRAY => ColumnIndexMetaData::BYTE_ARRAY(ByteArrayColumnIndex::try_new(index)?), Type::FIXED_LEN_BYTE_ARRAY => { - Index::FIXED_LEN_BYTE_ARRAY(NativeIndex::try_new_local(index)?) + ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(ByteArrayColumnIndex::try_new(index)?) } }; diff --git a/parquet/src/file/page_index/mod.rs b/parquet/src/file/page_index/mod.rs index a8077896db34..ff70e2eca5dd 100644 --- a/parquet/src/file/page_index/mod.rs +++ b/parquet/src/file/page_index/mod.rs @@ -19,6 +19,7 @@ //! //! 
[Column Index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md +pub mod column_index; pub mod index; pub mod index_reader; pub mod offset_index; diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index bead048ee20f..5308825b0976 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -1102,13 +1102,15 @@ mod tests { use bytes::Buf; + use crate::file::page_index::column_index::{ + ByteArrayColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex, + }; use crate::file::properties::{EnabledStatistics, WriterProperties}; use crate::basic::{self, BoundaryOrder, ColumnOrder, SortOrder}; use crate::column::reader::ColumnReader; use crate::data_type::private::ParquetValueType; use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type}; - use crate::file::page_index::index::{Index, NativeIndex}; #[allow(deprecated)] use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes}; use crate::file::writer::SerializedFileWriter; @@ -1912,21 +1914,19 @@ mod tests { // only one row group assert_eq!(column_index.len(), 1); - let index = if let Index::BYTE_ARRAY(index) = &column_index[0][0] { + let index = if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][0] { index } else { unreachable!() }; assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING); - let index_in_pages = &index.indexes; //only one page group - assert_eq!(index_in_pages.len(), 1); + assert_eq!(index.num_pages(), 1); - let page0 = &index_in_pages[0]; - let min = page0.min.as_ref().unwrap(); - let max = page0.max.as_ref().unwrap(); + let min = index.min_value(0).unwrap(); + let max = index.max_value(0).unwrap(); assert_eq!(b"Hello", min.as_bytes()); assert_eq!(b"today", max.as_bytes()); @@ -1991,7 +1991,7 @@ mod tests { let boundary_order = &column_index[0][0].get_boundary_order(); assert!(boundary_order.is_some()); matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED); - if let Index::INT32(index) = &column_index[0][0] { + if let ColumnIndexMetaData::INT32(index) = &column_index[0][0] { check_native_page_index( index, 325, @@ -2004,15 +2004,15 @@ mod tests { }; //col1->bool_col:BOOLEAN UNCOMPRESSED DO:0 FPO:37329 SZ:3022/3022/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: false, max: true, num_nulls: 0] assert!(&column_index[0][1].is_sorted()); - if let Index::BOOLEAN(index) = &column_index[0][1] { - assert_eq!(index.indexes.len(), 82); + if let ColumnIndexMetaData::BOOLEAN(index) = &column_index[0][1] { + assert_eq!(index.num_pages(), 82); assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82); } else { unreachable!() }; //col2->tinyint_col: INT32 UNCOMPRESSED DO:0 FPO:40351 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0] assert!(&column_index[0][2].is_sorted()); - if let Index::INT32(index) = &column_index[0][2] { + if let ColumnIndexMetaData::INT32(index) = &column_index[0][2] { check_native_page_index( index, 325, @@ -2025,7 +2025,7 @@ mod tests { }; //col4->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0] assert!(&column_index[0][3].is_sorted()); - if let Index::INT32(index) = &column_index[0][3] { + if let ColumnIndexMetaData::INT32(index) = &column_index[0][3] { check_native_page_index( index, 325, @@ -2038,7 +2038,7 @@ mod tests { }; //col5->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 
0, max: 9, num_nulls: 0] assert!(&column_index[0][4].is_sorted()); - if let Index::INT32(index) = &column_index[0][4] { + if let ColumnIndexMetaData::INT32(index) = &column_index[0][4] { check_native_page_index( index, 325, @@ -2051,7 +2051,7 @@ mod tests { }; //col6->bigint_col: INT64 UNCOMPRESSED DO:0 FPO:152326 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 90, num_nulls: 0] assert!(!&column_index[0][5].is_sorted()); - if let Index::INT64(index) = &column_index[0][5] { + if let ColumnIndexMetaData::INT64(index) = &column_index[0][5] { check_native_page_index( index, 528, @@ -2064,7 +2064,7 @@ mod tests { }; //col7->float_col: FLOAT UNCOMPRESSED DO:0 FPO:223924 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 9.9, num_nulls: 0] assert!(&column_index[0][6].is_sorted()); - if let Index::FLOAT(index) = &column_index[0][6] { + if let ColumnIndexMetaData::FLOAT(index) = &column_index[0][6] { check_native_page_index( index, 325, @@ -2077,7 +2077,7 @@ mod tests { }; //col8->double_col: DOUBLE UNCOMPRESSED DO:0 FPO:261249 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 90.89999999999999, num_nulls: 0] assert!(!&column_index[0][7].is_sorted()); - if let Index::DOUBLE(index) = &column_index[0][7] { + if let ColumnIndexMetaData::DOUBLE(index) = &column_index[0][7] { check_native_page_index( index, 528, @@ -2090,8 +2090,8 @@ mod tests { }; //col9->date_string_col: BINARY UNCOMPRESSED DO:0 FPO:332847 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 01/01/09, max: 12/31/10, num_nulls: 0] assert!(!&column_index[0][8].is_sorted()); - if let Index::BYTE_ARRAY(index) = &column_index[0][8] { - check_native_page_index( + if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][8] { + check_byte_array_page_index( index, 974, get_row_group_min_max_bytes(row_group_metadata, 8), @@ -2103,8 +2103,8 @@ mod tests { }; //col10->string_col: BINARY UNCOMPRESSED DO:0 FPO:444795 SZ:45298/45298/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0] assert!(&column_index[0][9].is_sorted()); - if let Index::BYTE_ARRAY(index) = &column_index[0][9] { - check_native_page_index( + if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][9] { + check_byte_array_page_index( index, 352, get_row_group_min_max_bytes(row_group_metadata, 9), @@ -2117,14 +2117,14 @@ mod tests { //col11->timestamp_col: INT96 UNCOMPRESSED DO:0 FPO:490093 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[num_nulls: 0, min/max not defined] //Notice: min_max values for each page for this col not exits. 
assert!(!&column_index[0][10].is_sorted()); - if let Index::NONE = &column_index[0][10] { + if let ColumnIndexMetaData::NONE = &column_index[0][10] { assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974); } else { unreachable!() }; //col12->year: INT32 UNCOMPRESSED DO:0 FPO:602041 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2009, max: 2010, num_nulls: 0] assert!(&column_index[0][11].is_sorted()); - if let Index::INT32(index) = &column_index[0][11] { + if let ColumnIndexMetaData::INT32(index) = &column_index[0][11] { check_native_page_index( index, 325, @@ -2137,7 +2137,7 @@ mod tests { }; //col13->month: INT32 UNCOMPRESSED DO:0 FPO:639366 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 12, num_nulls: 0] assert!(!&column_index[0][12].is_sorted()); - if let Index::INT32(index) = &column_index[0][12] { + if let ColumnIndexMetaData::INT32(index) = &column_index[0][12] { check_native_page_index( index, 325, @@ -2151,17 +2151,31 @@ mod tests { } fn check_native_page_index( - row_group_index: &NativeIndex, + row_group_index: &PrimitiveColumnIndex, page_size: usize, min_max: (&[u8], &[u8]), boundary_order: BoundaryOrder, ) { - assert_eq!(row_group_index.indexes.len(), page_size); + assert_eq!(row_group_index.num_pages() as usize, page_size); assert_eq!(row_group_index.boundary_order, boundary_order); - row_group_index.indexes.iter().all(|x| { - x.min.as_ref().unwrap() >= &T::try_from_le_slice(min_max.0).unwrap() - && x.max.as_ref().unwrap() <= &T::try_from_le_slice(min_max.1).unwrap() - }); + assert!(row_group_index.min_values().iter().all(|x| { + x >= &T::try_from_le_slice(min_max.0).unwrap() + && x <= &T::try_from_le_slice(min_max.1).unwrap() + })); + } + + fn check_byte_array_page_index( + row_group_index: &ByteArrayColumnIndex, + page_size: usize, + min_max: (&[u8], &[u8]), + boundary_order: BoundaryOrder, + ) { + assert_eq!(row_group_index.num_pages() as usize, page_size); + assert_eq!(row_group_index.boundary_order, boundary_order); + for i in 0..row_group_index.num_pages() as usize { + let x = row_group_index.min_value(i).unwrap(); + assert!(x >= min_max.0 && x <= min_max.1); + } } fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) { @@ -2402,12 +2416,11 @@ mod tests { assert_eq!(c.len(), 1); match &c[0] { - Index::FIXED_LEN_BYTE_ARRAY(v) => { - assert_eq!(v.indexes.len(), 1); - let page_idx = &v.indexes[0]; - assert_eq!(page_idx.null_count.unwrap(), 1); - assert_eq!(page_idx.min.as_ref().unwrap().as_ref(), &[0; 11]); - assert_eq!(page_idx.max.as_ref().unwrap().as_ref(), &[5; 11]); + ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(v) => { + assert_eq!(v.num_pages(), 1); + assert_eq!(v.null_count(0).unwrap(), 1); + assert_eq!(v.min_value(0).unwrap(), &[0; 11]); + assert_eq!(v.max_value(0).unwrap(), &[5; 11]); } _ => unreachable!(), } @@ -2538,11 +2551,11 @@ mod tests { // test that we got the index matching the row group match pg_idx { - Index::INT32(int_idx) => { + ColumnIndexMetaData::INT32(int_idx) => { let min = col_stats.min_bytes_opt().unwrap().get_i32_le(); let max = col_stats.max_bytes_opt().unwrap().get_i32_le(); - assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref()); - assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref()); + assert_eq!(int_idx.min_value(0), Some(min).as_ref()); + assert_eq!(int_idx.max_value(0), Some(max).as_ref()); } _ => panic!("wrong stats type"), } @@ -2583,11 +2596,11 @@ mod tests { // test that we got the index matching the row group match pg_idx { - 
Index::INT32(int_idx) => { + ColumnIndexMetaData::INT32(int_idx) => { let min = col_stats.min_bytes_opt().unwrap().get_i32_le(); let max = col_stats.max_bytes_opt().unwrap().get_i32_le(); - assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref()); - assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref()); + assert_eq!(int_idx.min_value(0), Some(min).as_ref()); + assert_eq!(int_idx.max_value(0), Some(max).as_ref()); } _ => panic!("wrong stats type"), } diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 7db517ced5b2..65b96246ea03 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -1062,7 +1062,7 @@ mod tests { use crate::column::reader::get_typed_column_reader; use crate::compression::{create_codec, Codec, CodecOptionsBuilder}; use crate::data_type::{BoolType, ByteArrayType, Int32Type}; - use crate::file::page_index::index::Index; + use crate::file::page_index::column_index::ColumnIndexMetaData; use crate::file::properties::EnabledStatistics; use crate::file::serialized_reader::ReadOptionsBuilder; use crate::file::{ @@ -2083,9 +2083,9 @@ mod tests { assert_eq!(column_index[0].len(), 2); // 2 column let a_idx = &column_index[0][0]; - assert!(matches!(a_idx, Index::INT32(_)), "{a_idx:?}"); + assert!(matches!(a_idx, ColumnIndexMetaData::INT32(_)), "{a_idx:?}"); let b_idx = &column_index[0][1]; - assert!(matches!(b_idx, Index::NONE), "{b_idx:?}"); + assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}"); } #[test] @@ -2169,16 +2169,16 @@ mod tests { let column_index = reader.metadata().column_index().unwrap(); assert_eq!(column_index.len(), 1); assert_eq!(column_index[0].len(), 1); - let col_idx = if let Index::BYTE_ARRAY(index) = &column_index[0][0] { - assert_eq!(index.indexes.len(), 1); - &index.indexes[0] + let col_idx = if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][0] { + assert_eq!(index.num_pages(), 1); + index } else { unreachable!() }; - assert!(col_idx.repetition_level_histogram().is_none()); - assert!(col_idx.definition_level_histogram().is_some()); - check_def_hist(col_idx.definition_level_histogram().unwrap().values()); + assert!(col_idx.repetition_level_histogram(0).is_none()); + assert!(col_idx.definition_level_histogram(0).is_some()); + check_def_hist(col_idx.definition_level_histogram(0).unwrap()); assert!(reader.metadata().offset_index().is_some()); let offset_index = reader.metadata().offset_index().unwrap(); @@ -2324,15 +2324,15 @@ mod tests { let column_index = reader.metadata().column_index().unwrap(); assert_eq!(column_index.len(), 1); assert_eq!(column_index[0].len(), 1); - let col_idx = if let Index::INT32(index) = &column_index[0][0] { - assert_eq!(index.indexes.len(), 1); - &index.indexes[0] + let col_idx = if let ColumnIndexMetaData::INT32(index) = &column_index[0][0] { + assert_eq!(index.num_pages(), 1); + index } else { unreachable!() }; - check_def_hist(col_idx.definition_level_histogram().unwrap().values()); - check_rep_hist(col_idx.repetition_level_histogram().unwrap().values()); + check_def_hist(col_idx.definition_level_histogram(0).unwrap()); + check_rep_hist(col_idx.repetition_level_histogram(0).unwrap()); assert!(reader.metadata().offset_index().is_some()); let offset_index = reader.metadata().offset_index().unwrap(); diff --git a/parquet/tests/arrow_reader/io/mod.rs b/parquet/tests/arrow_reader/io/mod.rs index b31f295755b0..9cafcd714e89 100644 --- a/parquet/tests/arrow_reader/io/mod.rs +++ b/parquet/tests/arrow_reader/io/mod.rs @@ -49,7 +49,6 @@ use parquet::data_type::AsBytes; 
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetOffsetIndex}; use parquet::file::properties::WriterProperties; use parquet::file::FOOTER_SIZE; -use parquet::format::PageLocation; use parquet::schema::types::SchemaDescriptor; use std::collections::BTreeMap; use std::fmt::Display; @@ -257,7 +256,7 @@ struct TestColumnChunk { dictionary_page_location: Option, /// The location of the data pages in the file - page_locations: Vec, + page_locations: Vec, } /// Information about the pages in a single row group @@ -287,8 +286,11 @@ impl TestRowGroups { .enumerate() .map(|(col_idx, col_meta)| { let column_name = col_meta.column_descr().name().to_string(); - let page_locations = - offset_index[rg_index][col_idx].page_locations().to_vec(); + let page_locations = offset_index[rg_index][col_idx] + .page_locations() + .iter() + .map(parquet::format::PageLocation::from) + .collect(); let dictionary_page_location = col_meta.dictionary_page_offset(); // We can find the byte range of the entire column chunk diff --git a/parquet/tests/encryption/encryption_util.rs b/parquet/tests/encryption/encryption_util.rs index bf7fd08109f6..6817491b3024 100644 --- a/parquet/tests/encryption/encryption_util.rs +++ b/parquet/tests/encryption/encryption_util.rs @@ -191,11 +191,11 @@ pub(crate) fn verify_column_indexes(metadata: &ParquetMetaData) { let column_index = &column_index[0][float_col_idx]; match column_index { - parquet::file::page_index::index::Index::FLOAT(float_index) => { - assert_eq!(float_index.indexes.len(), 1); - assert_eq!(float_index.indexes[0].min, Some(0.0f32)); - assert!(float_index.indexes[0] - .max + parquet::file::page_index::column_index::ColumnIndexMetaData::FLOAT(float_index) => { + assert_eq!(float_index.num_pages(), 1); + assert_eq!(float_index.min_value(0), Some(&0.0f32)); + assert!(float_index + .max_value(0) .is_some_and(|max| (max - 53.9).abs() < 1e-6)); } _ => {