diff --git a/parquet/benches/metadata.rs b/parquet/benches/metadata.rs index 6f5f56745e90..c9a6cf3b762c 100644 --- a/parquet/benches/metadata.rs +++ b/parquet/benches/metadata.rs @@ -20,8 +20,9 @@ use std::sync::Arc; use parquet::basic::{Encoding, PageType, Type as PhysicalType}; use parquet::file::metadata::{ - ColumnChunkMetaData, FileMetaData, PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions, - ParquetMetaDataReader, ParquetMetaDataWriter, RowGroupMetaData, + ColumnChunkMetaData, FileMetaData, LevelHistogram, PageEncodingStats, ParquetMetaData, + ParquetMetaDataOptions, ParquetMetaDataReader, ParquetMetaDataWriter, ParquetStatisticsPolicy, + RowGroupMetaData, }; use parquet::file::statistics::Statistics; use parquet::file::writer::TrackedWrite; @@ -40,7 +41,7 @@ use parquet::file::serialized_reader::ReadOptionsBuilder; const NUM_COLUMNS: usize = 10_000; const NUM_ROW_GROUPS: usize = 10; -fn encoded_meta() -> Vec { +fn encoded_meta(is_nullable: bool, has_lists: bool) -> Vec { let mut rng = seedable_rng(); let mut column_desc_ptrs: Vec = Vec::with_capacity(NUM_COLUMNS); @@ -66,6 +67,23 @@ fn encoded_meta() -> Vec { let stats = Statistics::float(Some(rng.random()), Some(rng.random()), None, Some(0), false); + let (var_size, rep_hist, def_hist) = match (is_nullable, has_lists) { + (true, true) => { + let rep_hist = LevelHistogram::from(vec![1500i64; 2]); + let def_hist = LevelHistogram::from(vec![1000i64; 3]); + ( + Some(rng.random_range(0..1000000000)), + Some(rep_hist), + Some(def_hist), + ) + } + (true, false) => { + let def_hist = LevelHistogram::from(vec![1500i64; 2]); + (Some(rng.random_range(0..1000000000)), None, Some(def_hist)) + } + (_, _) => (None, None, None), + }; + let row_groups = (0..NUM_ROW_GROUPS) .map(|i| { let columns = (0..NUM_COLUMNS) @@ -94,6 +112,9 @@ fn encoded_meta() -> Vec { .set_offset_index_length(Some(rng.random_range(1..100000))) .set_column_index_offset(Some(rng.random_range(0..2000000000))) .set_column_index_length(Some(rng.random_range(1..100000))) + .set_unencoded_byte_array_data_bytes(var_size) + .set_repetition_level_histogram(rep_hist.clone()) + .set_definition_level_histogram(def_hist.clone()) .build() .unwrap() }) @@ -147,61 +168,137 @@ fn criterion_benchmark(c: &mut Criterion) { let data = Bytes::from(data); c.bench_function("open(default)", |b| { - b.iter(|| SerializedFileReader::new(data.clone()).unwrap()) + b.iter(|| { + let options = ReadOptionsBuilder::new() + .with_encoding_stats_as_mask(false) + .build(); + SerializedFileReader::new_with_options(data.clone(), options).unwrap() + }) }); c.bench_function("open(page index)", |b| { b.iter(|| { - let options = ReadOptionsBuilder::new().with_page_index().build(); + let options = ReadOptionsBuilder::new() + .with_page_index() + .with_encoding_stats_as_mask(false) + .build(); SerializedFileReader::new_with_options(data.clone(), options).unwrap() }) }); let meta_data = get_footer_bytes(data.clone()); + let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false); c.bench_function("decode parquet metadata", |b| { b.iter(|| { - ParquetMetaDataReader::decode_metadata(&meta_data).unwrap(); + ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options)) + .unwrap(); }) }); - let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false); - c.bench_function("decode metadata (full stats)", |b| { + let schema = ParquetMetaDataReader::decode_schema(&meta_data).unwrap(); + let options = ParquetMetaDataOptions::new() + .with_schema(schema) + .with_encoding_stats_as_mask(false); + c.bench_function("decode metadata with schema", |b| { b.iter(|| { ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options)) .unwrap(); }) }); - let schema = ParquetMetaDataReader::decode_schema(&meta_data).unwrap(); - let options = ParquetMetaDataOptions::new().with_schema(schema); - c.bench_function("decode metadata with schema", |b| { + let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(true); + c.bench_function("decode metadata with stats mask", |b| { b.iter(|| { ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options)) .unwrap(); }) }); - let buf: Bytes = black_box(encoded_meta()).into(); - c.bench_function("decode parquet metadata (wide)", |b| { + let options = + ParquetMetaDataOptions::new().with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll); + c.bench_function("decode metadata with skip PES", |b| { b.iter(|| { - ParquetMetaDataReader::decode_metadata(&buf).unwrap(); + ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options)) + .unwrap(); }) }); + let options = ParquetMetaDataOptions::new() + .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll) + .with_encoding_stats_as_mask(false); + c.bench_function("decode metadata with skip column stats", |b| { + b.iter(|| { + ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options)) + .unwrap(); + }) + }); + + let buf: Bytes = black_box(encoded_meta(false, false)).into(); let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false); - c.bench_function("decode metadata (wide) (full stats)", |b| { + c.bench_function("decode parquet metadata (wide)", |b| { b.iter(|| { ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap(); }) }); let schema = ParquetMetaDataReader::decode_schema(&buf).unwrap(); - let options = ParquetMetaDataOptions::new().with_schema(schema); + let options = ParquetMetaDataOptions::new() + .with_schema(schema) + .with_encoding_stats_as_mask(false); c.bench_function("decode metadata (wide) with schema", |b| { b.iter(|| { ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap(); }) }); + + let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(true); + c.bench_function("decode metadata (wide) with stats mask", |b| { + b.iter(|| { + ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap(); + }) + }); + + let options = + ParquetMetaDataOptions::new().with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll); + c.bench_function("decode metadata (wide) with skip PES", |b| { + b.iter(|| { + ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap(); + }) + }); + + let options = ParquetMetaDataOptions::new() + .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll) + .with_encoding_stats_as_mask(false); + c.bench_function("decode metadata (wide) with skip column stats", |b| { + b.iter(|| { + ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap(); + }) + }); + + let buf: Bytes = black_box(encoded_meta(true, true)).into(); + c.bench_function("decode parquet metadata w/ size stats (wide)", |b| { + b.iter(|| { + ParquetMetaDataReader::decode_metadata(&buf).unwrap(); + }) + }); + + let options = + ParquetMetaDataOptions::new().with_size_stats_policy(ParquetStatisticsPolicy::SkipAll); + c.bench_function("decode metadata (wide) with skip size stats", |b| { + b.iter(|| { + ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap(); + }) + }); + + let options = ParquetMetaDataOptions::new() + .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll) + .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll) + .with_size_stats_policy(ParquetStatisticsPolicy::SkipAll); + c.bench_function("decode metadata (wide) with skip all stats", |b| { + b.iter(|| { + ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap(); + }) + }); } criterion_group!(benches, criterion_benchmark); diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index a626076ebdd7..d8dac0dac54b 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -581,6 +581,24 @@ impl ArrowReaderOptions { self } + /// Sets the decoding policy for [`statistics`] in the Parquet `ColumnMetaData`. + /// + /// [`statistics`]: + /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L912 + pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self { + self.metadata_options.set_column_stats_policy(policy); + self + } + + /// Sets the decoding policy for [`size_statistics`] in the Parquet `ColumnMetaData`. + /// + /// [`size_statistics`]: + /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L936 + pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self { + self.metadata_options.set_size_stats_policy(policy); + self + } + /// Provide the file decryption properties to use when reading encrypted parquet files. /// /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided. @@ -1524,14 +1542,15 @@ pub(crate) mod tests { } #[test] - fn test_page_encoding_stats_skipped() { + fn test_stats_stats_skipped() { let testdata = arrow::util::test_util::parquet_test_data(); let path = format!("{testdata}/alltypes_tiny_pages.parquet"); let file = File::open(path).unwrap(); // test skipping all - let arrow_options = - ArrowReaderOptions::new().with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll); + let arrow_options = ArrowReaderOptions::new() + .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll) + .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll); let builder = ParquetRecordBatchReaderBuilder::try_new_with_options( file.try_clone().unwrap(), arrow_options, @@ -1542,12 +1561,14 @@ pub(crate) mod tests { for column in row_group_metadata.columns() { assert!(column.page_encoding_stats().is_none()); assert!(column.page_encoding_stats_mask().is_none()); + assert!(column.statistics().is_none()); } // test skipping all but one column and converting to mask let arrow_options = ArrowReaderOptions::new() .with_encoding_stats_as_mask(true) - .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0])); + .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0])) + .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0])); let builder = ParquetRecordBatchReaderBuilder::try_new_with_options( file.try_clone().unwrap(), arrow_options, @@ -1558,6 +1579,47 @@ pub(crate) mod tests { for (idx, column) in row_group_metadata.columns().iter().enumerate() { assert!(column.page_encoding_stats().is_none()); assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0); + assert_eq!(column.statistics().is_some(), idx == 0); + } + } + + #[test] + fn test_size_stats_stats_skipped() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/repeated_primitive_no_list.parquet"); + let file = File::open(path).unwrap(); + + // test skipping all + let arrow_options = + ArrowReaderOptions::new().with_size_stats_policy(ParquetStatisticsPolicy::SkipAll); + let builder = ParquetRecordBatchReaderBuilder::try_new_with_options( + file.try_clone().unwrap(), + arrow_options, + ) + .unwrap(); + + let row_group_metadata = builder.metadata.row_group(0); + for column in row_group_metadata.columns() { + assert!(column.repetition_level_histogram().is_none()); + assert!(column.definition_level_histogram().is_none()); + assert!(column.unencoded_byte_array_data_bytes().is_none()); + } + + // test skipping all but one column and converting to mask + let arrow_options = ArrowReaderOptions::new() + .with_encoding_stats_as_mask(true) + .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1])); + let builder = ParquetRecordBatchReaderBuilder::try_new_with_options( + file.try_clone().unwrap(), + arrow_options, + ) + .unwrap(); + + let row_group_metadata = builder.metadata.row_group(0); + for (idx, column) in row_group_metadata.columns().iter().enumerate() { + assert_eq!(column.repetition_level_histogram().is_some(), idx == 1); + assert_eq!(column.definition_level_histogram().is_some(), idx == 1); + assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1); } } diff --git a/parquet/src/file/metadata/options.rs b/parquet/src/file/metadata/options.rs index 0bd0dfd9e30a..e91f5bdd1028 100644 --- a/parquet/src/file/metadata/options.rs +++ b/parquet/src/file/metadata/options.rs @@ -92,6 +92,8 @@ pub struct ParquetMetaDataOptions { schema_descr: Option, encoding_stats_as_mask: bool, encoding_stats_policy: ParquetStatisticsPolicy, + column_stats_policy: ParquetStatisticsPolicy, + size_stats_policy: ParquetStatisticsPolicy, } impl Default for ParquetMetaDataOptions { @@ -100,6 +102,8 @@ impl Default for ParquetMetaDataOptions { schema_descr: None, encoding_stats_as_mask: true, encoding_stats_policy: ParquetStatisticsPolicy::KeepAll, + column_stats_policy: ParquetStatisticsPolicy::KeepAll, + size_stats_policy: ParquetStatisticsPolicy::KeepAll, } } } @@ -190,6 +194,56 @@ impl ParquetMetaDataOptions { self.set_encoding_stats_policy(policy); self } + + /// Returns whether to skip decoding the [`statistics`] in the Parquet `ColumnMetaData` + /// for the column indexed by `col_index`. + /// + /// [`statistics`]: + /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L912 + pub fn skip_column_stats(&self, col_index: usize) -> bool { + self.column_stats_policy.is_skip(col_index) + } + + /// Sets the decoding policy for [`statistics`] in the Parquet `ColumnMetaData`. + /// + /// The default policy is to decode all `statistics`. + /// + /// [`statistics`]: + /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L912 + pub fn set_column_stats_policy(&mut self, policy: ParquetStatisticsPolicy) { + self.column_stats_policy = policy; + } + + /// Call [`Self::set_column_stats_policy`] and return `Self` for chaining. + pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self { + self.set_column_stats_policy(policy); + self + } + + /// Returns whether to skip decoding the [`size_statistics`] in the Parquet `ColumnMetaData` + /// for the column indexed by `col_index`. + /// + /// [`size_statistics`]: + /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L936 + pub fn skip_size_stats(&self, col_index: usize) -> bool { + self.size_stats_policy.is_skip(col_index) + } + + /// Sets the decoding policy for [`size_statistics`] in the Parquet `ColumnMetaData`. + /// + /// The default policy is to decode all `size_statistics`. + /// + /// [`size_statistics`]: + /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L936 + pub fn set_size_stats_policy(&mut self, policy: ParquetStatisticsPolicy) { + self.size_stats_policy = policy; + } + + /// Call [`Self::set_size_stats_policy`] and return `Self` for chaining. + pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self { + self.set_size_stats_policy(policy); + self + } } #[cfg(test)] diff --git a/parquet/src/file/metadata/thrift/mod.rs b/parquet/src/file/metadata/thrift/mod.rs index 154fde77edb9..b7e8aab7cc48 100644 --- a/parquet/src/file/metadata/thrift/mod.rs +++ b/parquet/src/file/metadata/thrift/mod.rs @@ -411,10 +411,14 @@ fn read_column_metadata<'a>( let mut skip_pes = false; let mut pes_mask = true; + let mut skip_col_stats = false; + let mut skip_size_stats = false; if let Some(opts) = options { skip_pes = opts.skip_encoding_stats(col_index); pes_mask = opts.encoding_stats_as_mask(); + skip_col_stats = opts.skip_column_stats(col_index); + skip_size_stats = opts.skip_size_stats(col_index); } // struct ColumnMetaData { @@ -483,7 +487,7 @@ fn read_column_metadata<'a>( 11 => { column.dictionary_page_offset = Some(i64::read_thrift(&mut *prot)?); } - 12 => { + 12 if !skip_col_stats => { column.statistics = convert_stats(column_descr, Some(Statistics::read_thrift(&mut *prot)?))?; } @@ -503,7 +507,7 @@ fn read_column_metadata<'a>( 15 => { column.bloom_filter_length = Some(i32::read_thrift(&mut *prot)?); } - 16 => { + 16 if !skip_size_stats => { let val = SizeStatistics::read_thrift(&mut *prot)?; column.unencoded_byte_array_data_bytes = val.unencoded_byte_array_data_bytes; column.repetition_level_histogram = diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 68b44f3cbbde..b3b6383f78bb 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -182,6 +182,24 @@ impl ReadOptionsBuilder { self } + /// Sets the decoding policy for [`statistics`] in the Parquet `ColumnMetaData`. + /// + /// [`statistics`]: + /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L912 + pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self { + self.metadata_options.set_column_stats_policy(policy); + self + } + + /// Sets the decoding policy for [`size_statistics`] in the Parquet `ColumnMetaData`. + /// + /// [`size_statistics`]: + /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L936 + pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self { + self.metadata_options.set_size_stats_policy(policy); + self + } + /// Seal the builder and return the read options pub fn build(self) -> ReadOptions { let props = self @@ -1912,6 +1930,7 @@ mod tests { // test skipping all let options = ReadOptionsBuilder::new() .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll) + .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll) .build(); let file_reader = Arc::new( SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(), @@ -1921,12 +1940,14 @@ mod tests { for column in row_group_metadata.columns() { assert!(column.page_encoding_stats().is_none()); assert!(column.page_encoding_stats_mask().is_none()); + assert!(column.statistics().is_none()); } // test skipping all but one column let options = ReadOptionsBuilder::new() .with_encoding_stats_as_mask(true) .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0])) + .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0])) .build(); let file_reader = Arc::new( SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(), @@ -1936,6 +1957,43 @@ mod tests { for (idx, column) in row_group_metadata.columns().iter().enumerate() { assert!(column.page_encoding_stats().is_none()); assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0); + assert_eq!(column.statistics().is_some(), idx == 0); + } + } + + #[test] + fn test_file_reader_size_stats_skipped() { + let file = get_test_file("repeated_primitive_no_list.parquet"); + + // test skipping all + let options = ReadOptionsBuilder::new() + .with_size_stats_policy(ParquetStatisticsPolicy::SkipAll) + .build(); + let file_reader = Arc::new( + SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(), + ); + + let row_group_metadata = file_reader.metadata.row_group(0); + for column in row_group_metadata.columns() { + assert!(column.repetition_level_histogram().is_none()); + assert!(column.definition_level_histogram().is_none()); + assert!(column.unencoded_byte_array_data_bytes().is_none()); + } + + // test skipping all but one column + let options = ReadOptionsBuilder::new() + .with_encoding_stats_as_mask(true) + .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1])) + .build(); + let file_reader = Arc::new( + SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(), + ); + + let row_group_metadata = file_reader.metadata.row_group(0); + for (idx, column) in row_group_metadata.columns().iter().enumerate() { + assert_eq!(column.repetition_level_histogram().is_some(), idx == 1); + assert_eq!(column.definition_level_histogram().is_some(), idx == 1); + assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1); } }