39 changes: 11 additions & 28 deletions parquet/benches/metadata.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
use parquet::basic::{Encoding, PageType, Type as PhysicalType};
use parquet::file::metadata::{
ColumnChunkMetaData, FileMetaData, PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions,
ParquetMetaDataReader, ParquetMetaDataWriter, ParquetStatisticsPolicy, RowGroupMetaData,
ParquetMetaDataReader, ParquetMetaDataWriter, RowGroupMetaData,
};
use parquet::file::statistics::Statistics;
use parquet::file::writer::TrackedWrite;
@@ -164,26 +164,17 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});

let schema = ParquetMetaDataReader::decode_schema(&meta_data).unwrap();
let options = ParquetMetaDataOptions::new().with_schema(schema);
c.bench_function("decode metadata with schema", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options))
.unwrap();
})
});

let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(true);
c.bench_function("decode metadata with stats mask", |b| {
let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
c.bench_function("decode metadata (full stats)", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options))
.unwrap();
})
});

let options =
ParquetMetaDataOptions::new().with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll);
c.bench_function("decode metadata with skip PES", |b| {
let schema = ParquetMetaDataReader::decode_schema(&meta_data).unwrap();
let options = ParquetMetaDataOptions::new().with_schema(schema);
c.bench_function("decode metadata with schema", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options))
.unwrap();
@@ -197,24 +188,16 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});

let schema = ParquetMetaDataReader::decode_schema(&buf).unwrap();
let options = ParquetMetaDataOptions::new().with_schema(schema);
c.bench_function("decode metadata (wide) with schema", |b| {
let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
c.bench_function("decode metadata (wide) (full stats)", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
})
});

let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(true);
c.bench_function("decode metadata (wide) with stats mask", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
})
});

let options =
ParquetMetaDataOptions::new().with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll);
c.bench_function("decode metadata (wide) with skip PES", |b| {
let schema = ParquetMetaDataReader::decode_schema(&buf).unwrap();
let options = ParquetMetaDataOptions::new().with_schema(schema);
c.bench_function("decode metadata (wide) with schema", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
})
5 changes: 4 additions & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
@@ -4433,7 +4433,10 @@ mod tests {
.unwrap();

// check that the read metadata is also correct
let options = ReadOptionsBuilder::new().with_page_index().build();
let options = ReadOptionsBuilder::new()
.with_page_index()
.with_encoding_stats_as_mask(false)
.build();
let reader = SerializedFileReader::new_with_options(file, options).unwrap();

let rowgroup = reader.get_row_group(0).expect("row group missing");
16 changes: 15 additions & 1 deletion parquet/src/arrow/mod.rs
@@ -494,7 +494,9 @@ pub fn parquet_column<'a>(
#[cfg(test)]
mod test {
use crate::arrow::ArrowWriter;
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
use crate::file::metadata::{
ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader, ParquetMetaDataWriter,
};
use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::schema::parser::parse_message_type;
use crate::schema::types::SchemaDescriptor;
@@ -511,13 +513,17 @@ mod test {
let parquet_bytes = create_parquet_file();

// read the metadata from the file WITHOUT the page index structures
let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
let original_metadata = ParquetMetaDataReader::new()
.with_metadata_options(Some(options))
.parse_and_finish(&parquet_bytes)
.unwrap();

// this should error because the page indexes are not present, but have offsets specified
let metadata_bytes = metadata_to_bytes(&original_metadata);
let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
let err = ParquetMetaDataReader::new()
.with_metadata_options(Some(options))
.with_page_indexes(true) // there are no page indexes in the metadata
.parse_and_finish(&metadata_bytes)
.err()
@@ -533,7 +539,9 @@ mod test {
let parquet_bytes = create_parquet_file();

// read the metadata from the file
let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
let original_metadata = ParquetMetaDataReader::new()
.with_metadata_options(Some(options))
.parse_and_finish(&parquet_bytes)
.unwrap();

@@ -545,7 +553,9 @@
"metadata is subset of parquet"
);

let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
let roundtrip_metadata = ParquetMetaDataReader::new()
.with_metadata_options(Some(options))
.parse_and_finish(&metadata_bytes)
.unwrap();

@@ -559,14 +569,18 @@

// read the metadata from the file including the page index structures
// (which are stored elsewhere in the footer)
let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
let original_metadata = ParquetMetaDataReader::new()
.with_metadata_options(Some(options))
.with_page_indexes(true)
.parse_and_finish(&parquet_bytes)
.unwrap();

// read metadata back from the serialized bytes and ensure it is the same
let metadata_bytes = metadata_to_bytes(&original_metadata);
let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
let roundtrip_metadata = ParquetMetaDataReader::new()
.with_metadata_options(Some(options))
.with_page_indexes(true)
.parse_and_finish(&metadata_bytes)
.unwrap();
77 changes: 75 additions & 2 deletions parquet/src/file/metadata/mod.rs
@@ -1062,6 +1062,10 @@ impl ColumnChunkMetaData {

/// Returns the page encoding statistics, or `None` if no page encoding statistics
/// are available (or they were converted to a mask).
///
/// Note: By default, this crate converts page encoding statistics to a mask for performance
/// reasons. To get the full statistics, you must set [`ParquetMetaDataOptions::with_encoding_stats_as_mask`]
/// to `false`.
pub fn page_encoding_stats(&self) -> Option<&Vec<PageEncodingStats>> {
match self.encoding_stats.as_ref() {
Some(ParquetPageEncodingStats::Full(stats)) => Some(stats),
@@ -1072,6 +1076,8 @@
/// Returns the page encoding statistics reduced to a bitmask, or `None` if statistics are
/// not available (or they were left in their original form).
///
/// Note: the mask form is the default representation for this crate.
///
/// The [`PageEncodingStats`] struct was added to the Parquet specification specifically to
/// enable fast determination of whether all pages in a column chunk are dictionary encoded
/// (see <https://github.com/apache/parquet-format/pull/16>).
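
The two accessors above are mutually exclusive views of the same underlying field, so callers that need the full per-page counts must opt out of the mask at decode time. A minimal sketch of that opt-out, mirroring the `parse_and_finish` flow the updated tests in this PR use (the function name and `Bytes` input are illustrative, not part of the change):

```rust
use bytes::Bytes;
use parquet::errors::Result;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader};

/// Decode a footer while keeping the full `PageEncodingStats` instead of the
/// new default bitmask representation.
fn decode_with_full_stats(footer: &Bytes) -> Result<ParquetMetaData> {
    let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
    ParquetMetaDataReader::new()
        .with_metadata_options(Some(options))
        .parse_and_finish(footer)
}
```

With the default options the same metadata would instead report `page_encoding_stats() == None` and populate only `page_encoding_stats_mask()`.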
@@ -1667,7 +1673,9 @@ impl OffsetIndexBuilder {
mod tests {
use super::*;
use crate::basic::{PageType, SortOrder};
use crate::file::metadata::thrift::tests::{read_column_chunk, read_row_group};
use crate::file::metadata::thrift::tests::{
read_column_chunk, read_column_chunk_with_options, read_row_group,
};

#[test]
fn test_row_group_metadata_thrift_conversion() {
@@ -1822,7 +1830,72 @@ mod tests {
let mut buf = Vec::new();
let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
col_metadata.write_thrift(&mut writer).unwrap();
let col_chunk_res = read_column_chunk(&mut buf, column_descr).unwrap();
let col_chunk_res = read_column_chunk(&mut buf, column_descr.clone()).unwrap();

let expected_metadata = ColumnChunkMetaData::builder(column_descr)
.set_encodings_mask(EncodingMask::new_from_encodings(
[Encoding::PLAIN, Encoding::RLE].iter(),
))
.set_file_path("file_path".to_owned())
.set_num_values(1000)
.set_compression(Compression::SNAPPY)
.set_total_compressed_size(2000)
.set_total_uncompressed_size(3000)
.set_data_page_offset(4000)
.set_dictionary_page_offset(Some(5000))
.set_page_encoding_stats_mask(EncodingMask::new_from_encodings(
[Encoding::PLAIN, Encoding::RLE].iter(),
))
.set_bloom_filter_offset(Some(6000))
.set_bloom_filter_length(Some(25))
.set_offset_index_offset(Some(7000))
.set_offset_index_length(Some(25))
.set_column_index_offset(Some(8000))
.set_column_index_length(Some(25))
.set_unencoded_byte_array_data_bytes(Some(2000))
.set_repetition_level_histogram(Some(LevelHistogram::from(vec![100, 100])))
.set_definition_level_histogram(Some(LevelHistogram::from(vec![0, 200])))
.build()
.unwrap();

assert_eq!(col_chunk_res, expected_metadata);
}

#[test]
fn test_column_chunk_metadata_thrift_conversion_full_stats() {
let column_descr = get_test_schema_descr().column(0);
let stats = vec![
PageEncodingStats {
page_type: PageType::DATA_PAGE,
encoding: Encoding::PLAIN,
count: 3,
},
PageEncodingStats {
page_type: PageType::DATA_PAGE,
encoding: Encoding::RLE,
count: 5,
},
];
let col_metadata = ColumnChunkMetaData::builder(column_descr.clone())
.set_encodings_mask(EncodingMask::new_from_encodings(
[Encoding::PLAIN, Encoding::RLE].iter(),
))
.set_num_values(1000)
.set_compression(Compression::SNAPPY)
.set_total_compressed_size(2000)
.set_total_uncompressed_size(3000)
.set_data_page_offset(4000)
.set_page_encoding_stats(stats)
.build()
.unwrap();

let mut buf = Vec::new();
let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
col_metadata.write_thrift(&mut writer).unwrap();

let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
let col_chunk_res =
read_column_chunk_with_options(&mut buf, column_descr, Some(&options)).unwrap();

assert_eq!(col_chunk_res, col_metadata);
}
20 changes: 18 additions & 2 deletions parquet/src/file/metadata/options.rs
@@ -87,13 +87,23 @@ impl ParquetStatisticsPolicy {
/// [`ParquetMetaData`]: crate::file::metadata::ParquetMetaData
/// [`ParquetMetaDataReader`]: crate::file::metadata::ParquetMetaDataReader
/// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
#[derive(Default, Debug, Clone)]
#[derive(Debug, Clone)]
pub struct ParquetMetaDataOptions {
schema_descr: Option<SchemaDescPtr>,
encoding_stats_as_mask: bool,
encoding_stats_policy: ParquetStatisticsPolicy,
}

impl Default for ParquetMetaDataOptions {
fn default() -> Self {
Self {
schema_descr: None,
encoding_stats_as_mask: true,
encoding_stats_policy: ParquetStatisticsPolicy::KeepAll,
}
}
}

impl ParquetMetaDataOptions {
/// Return a new default [`ParquetMetaDataOptions`].
pub fn new() -> Self {
@@ -118,7 +128,7 @@ impl ParquetMetaDataOptions {
}

/// Returns whether to present the [`encoding_stats`] field of the Parquet `ColumnMetaData`
/// as a bitmask (defaults to `false`).
/// as a bitmask (defaults to `true`).
///
/// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for an explanation of why this
/// might be desirable.
@@ -193,6 +203,12 @@ mod tests {
};
use std::{io::Read, sync::Arc};

#[test]
fn test_options_default() {
let options = ParquetMetaDataOptions::default();
assert!(options.encoding_stats_as_mask());
}

#[test]
fn test_provide_schema() {
let mut buf: Vec<u8> = Vec::new();
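
Together with the existing `ParquetStatisticsPolicy`, the new `Default` impl yields three decode modes for page encoding stats: the mask (now the default), the full statistics, and skipping them entirely. A sketch using only setters that appear in this diff (`with_encoding_stats_policy` comes from the removed benchmark code, so it is assumed to remain in the public API):

```rust
use parquet::file::metadata::{ParquetMetaDataOptions, ParquetStatisticsPolicy};

fn main() {
    // New default: page encoding stats are reduced to a bitmask during decode.
    let mask_options = ParquetMetaDataOptions::default();
    assert!(mask_options.encoding_stats_as_mask());

    // Opt back into the full per-page statistics.
    let _full = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);

    // Skip page encoding stats entirely (the "skip PES" case the removed
    // benchmarks measured).
    let _skip = ParquetMetaDataOptions::new()
        .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll);
}
```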
14 changes: 11 additions & 3 deletions parquet/src/file/metadata/thrift/mod.rs
@@ -410,7 +410,7 @@ fn read_column_metadata<'a>(
let mut seen_mask = 0u16;

let mut skip_pes = false;
let mut pes_mask = false;
let mut pes_mask = true;

if let Some(opts) = options {
skip_pes = opts.skip_encoding_stats(col_index);
@@ -1704,7 +1704,7 @@ write_thrift_field!(RustBoundingBox, FieldType::Struct);
pub(crate) mod tests {
use crate::errors::Result;
use crate::file::metadata::thrift::{BoundingBox, SchemaElement, write_schema};
use crate::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaDataOptions, RowGroupMetaData};
use crate::parquet_thrift::tests::test_roundtrip;
use crate::parquet_thrift::{
ElementType, ThriftCompactOutputProtocol, ThriftSliceInputProtocol, read_thrift_vec,
@@ -1726,9 +1726,17 @@
pub(crate) fn read_column_chunk(
buf: &mut [u8],
column_descr: Arc<ColumnDescriptor>,
) -> Result<ColumnChunkMetaData> {
read_column_chunk_with_options(buf, column_descr, None)
}

pub(crate) fn read_column_chunk_with_options(
buf: &mut [u8],
column_descr: Arc<ColumnDescriptor>,
options: Option<&ParquetMetaDataOptions>,
) -> Result<ColumnChunkMetaData> {
let mut reader = ThriftSliceInputProtocol::new(buf);
crate::file::metadata::thrift::read_column_chunk(&mut reader, &column_descr, 0, None)
crate::file::metadata::thrift::read_column_chunk(&mut reader, &column_descr, 0, options)
}

pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
5 changes: 4 additions & 1 deletion parquet/src/file/serialized_reader.rs
@@ -1855,7 +1855,10 @@ mod tests {
fn test_file_reader_optional_metadata() {
// file with optional metadata: bloom filters, encoding stats, column index and offset index.
let file = get_test_file("data_index_bloom_encoding_stats.parquet");
let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
let options = ReadOptionsBuilder::new()
.with_encoding_stats_as_mask(false)
.build();
let file_reader = Arc::new(SerializedFileReader::new_with_options(file, options).unwrap());

let row_group_metadata = file_reader.metadata.row_group(0);
let col0_metadata = row_group_metadata.column(0);
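
The same opt-out is plumbed through `ReadOptionsBuilder` for the synchronous reader, as the test change above shows. A sketch built on the builder method this PR adds (the path argument and assertion are illustrative):

```rust
use std::fs::File;

use parquet::errors::Result;
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};

fn check_full_stats(path: &str) -> Result<()> {
    let file = File::open(path)?;
    // Without `.with_encoding_stats_as_mask(false)` the decoded column chunks
    // would expose only the mask, and `page_encoding_stats()` would be `None`.
    let options = ReadOptionsBuilder::new()
        .with_encoding_stats_as_mask(false)
        .build();
    let reader = SerializedFileReader::new_with_options(file, options)?;
    let col0 = reader.metadata().row_group(0).column(0);
    assert!(col0.page_encoding_stats().is_some());
    Ok(())
}
```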