parquet/benches/metadata.rs: 53 additions & 1 deletion
@@ -151,6 +151,35 @@ fn get_footer_bytes(data: Bytes) -> Bytes {
data.slice(meta_start..meta_end)
}

#[cfg(feature = "arrow")]
fn rewrite_file(bytes: Bytes) -> (Bytes, FileMetaData) {
use arrow::array::RecordBatchReader;
use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter};
use parquet::file::properties::{EnabledStatistics, WriterProperties};

let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(bytes)
.expect("parquet open")
.build()
.expect("parquet open");
let writer_properties = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Page)
.build();
let mut output = Vec::new();
let mut parquet_writer = ArrowWriter::try_new(
&mut output,
parquet_reader.schema(),
Some(writer_properties),
)
.expect("create arrow writer");

for maybe_batch in parquet_reader {
let batch = maybe_batch.expect("reading batch");
parquet_writer.write(&batch).expect("writing data");
}
let file_meta = parquet_writer.close().expect("finalizing file");
(output.into(), file_meta)
}

fn criterion_benchmark(c: &mut Criterion) {
// Read file into memory to isolate filesystem performance
let file = "../parquet-testing/data/alltypes_tiny_pages.parquet";
@@ -168,7 +197,7 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});

let meta_data = get_footer_bytes(data);
let meta_data = get_footer_bytes(data.clone());
c.bench_function("decode file metadata", |b| {
b.iter(|| {
parquet::thrift::bench_file_metadata(&meta_data);
@@ -181,6 +210,29 @@
parquet::thrift::bench_file_metadata(&buf);
})
});

// rewrite file with page statistics. then read page headers.
#[cfg(feature = "arrow")]
let (file_bytes, metadata) = rewrite_file(data.clone());
#[cfg(feature = "arrow")]
c.bench_function("page headers", |b| {
b.iter(|| {
metadata.row_groups.iter().for_each(|rg| {
rg.columns.iter().for_each(|col| {
if let Some(col_meta) = &col.meta_data {
if let Some(dict_offset) = col_meta.dictionary_page_offset {
parquet::thrift::bench_page_header(
&file_bytes.slice(dict_offset as usize..),
);
}
parquet::thrift::bench_page_header(
&file_bytes.slice(col_meta.data_page_offset as usize..),
);
}
});
});
})
});
}

criterion_group!(benches, criterion_benchmark);
parquet/src/arrow/arrow_reader/mod.rs: 4 additions & 4 deletions
@@ -30,6 +30,7 @@ pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::bloom_filter::{
chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
};
@@ -39,7 +40,6 @@ use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::schema::types::SchemaDescriptor;

pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};
@@ -737,17 +737,17 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

match header.algorithm {
BloomFilterAlgorithm::BLOCK(_) => {
BloomFilterAlgorithm::BLOCK => {
// this match exists to future proof the singleton algorithm enum
}
}
match header.compression {
BloomFilterCompression::UNCOMPRESSED(_) => {
BloomFilterCompression::UNCOMPRESSED => {
// this match exists to future proof the singleton compression enum
}
}
match header.hash {
BloomFilterHash::XXHASH(_) => {
BloomFilterHash::XXHASH => {
// this match exists to future proof the singleton hash enum
}
}
parquet/src/arrow/arrow_reader/selection.rs: 3 additions & 2 deletions
@@ -21,6 +21,8 @@ use std::cmp::Ordering;
use std::collections::VecDeque;
use std::ops::Range;

use crate::file::page_index::offset_index::PageLocation;

/// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
/// scanning a parquet file
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
@@ -162,7 +164,7 @@ impl RowSelection {
/// Note: this method does not make any effort to combine consecutive ranges, nor coalesce
/// ranges that are close together. This is instead delegated to the IO subsystem to optimise,
/// e.g. [`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges)
pub fn scan_ranges(&self, page_locations: &[crate::format::PageLocation]) -> Vec<Range<u64>> {
pub fn scan_ranges(&self, page_locations: &[PageLocation]) -> Vec<Range<u64>> {
let mut ranges: Vec<Range<u64>> = vec![];
let mut row_offset = 0;

Expand Down Expand Up @@ -640,7 +642,6 @@ fn union_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelec
#[cfg(test)]
mod tests {
use super::*;
use crate::format::PageLocation;
use rand::{rng, Rng};

#[test]
parquet/src/arrow/async_reader/mod.rs: 4 additions & 4 deletions
@@ -45,6 +45,7 @@ use crate::arrow::arrow_reader::{
};
use crate::arrow::ProjectionMask;

use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::bloom_filter::{
chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
};
@@ -53,7 +54,6 @@ use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};

mod metadata;
pub use metadata::*;
@@ -446,17 +446,17 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

match header.algorithm {
BloomFilterAlgorithm::BLOCK(_) => {
BloomFilterAlgorithm::BLOCK => {
// this match exists to future proof the singleton algorithm enum
}
}
match header.compression {
BloomFilterCompression::UNCOMPRESSED(_) => {
BloomFilterCompression::UNCOMPRESSED => {
// this match exists to future proof the singleton compression enum
}
}
match header.hash {
BloomFilterHash::XXHASH(_) => {
BloomFilterHash::XXHASH => {
// this match exists to future proof the singleton hash enum
}
}
parquet/src/arrow/async_writer/mod.rs: 7 additions & 6 deletions
@@ -61,11 +61,12 @@ mod store;
pub use store::*;

use crate::{
arrow::arrow_writer::ArrowWriterOptions,
arrow::ArrowWriter,
arrow::{arrow_writer::ArrowWriterOptions, ArrowWriter},
errors::{ParquetError, Result},
file::{metadata::RowGroupMetaData, properties::WriterProperties},
format::{FileMetaData, KeyValue},
file::{
metadata::{KeyValue, RowGroupMetaData},
properties::WriterProperties,
},
};
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
@@ -245,7 +246,7 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
/// Unlike [`Self::close`] this does not consume self
///
/// Attempting to write after calling finish will result in an error
pub async fn finish(&mut self) -> Result<FileMetaData> {
pub async fn finish(&mut self) -> Result<crate::format::FileMetaData> {
let metadata = self.sync_writer.finish()?;

// Force to flush the remaining data.
@@ -258,7 +259,7 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
/// Close and finalize the writer.
///
/// All the data in the inner buffer will be force flushed.
pub async fn close(mut self) -> Result<FileMetaData> {
pub async fn close(mut self) -> Result<crate::format::FileMetaData> {
self.finish().await
}

parquet/src/arrow/schema/mod.rs: 6 additions & 6 deletions
@@ -532,9 +532,9 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
unit: match time_unit {
TimeUnit::Second => unreachable!(),
TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
},
}))
.with_repetition(repetition)
@@ -571,7 +571,7 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
.with_logical_type(Some(LogicalType::Time {
is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
unit: match unit {
TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
u => unreachable!("Invalid unit for Time32: {:?}", u),
},
}))
@@ -582,8 +582,8 @@
.with_logical_type(Some(LogicalType::Time {
is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
unit: match unit {
TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
u => unreachable!("Invalid unit for Time64: {:?}", u),
},
}))
parquet/src/arrow/schema/primitive.rs: 7 additions & 7 deletions
@@ -186,7 +186,7 @@ fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataTy
(Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision),
(Some(LogicalType::Date), _) => Ok(DataType::Date32),
(Some(LogicalType::Time { unit, .. }), _) => match unit {
ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
ParquetTimeUnit::MILLIS => Ok(DataType::Time32(TimeUnit::Millisecond)),
_ => Err(arrow_err!(
"Cannot create INT32 physical type from {:?}",
unit
@@ -225,11 +225,11 @@ fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataTy
false => Ok(DataType::UInt64),
},
(Some(LogicalType::Time { unit, .. }), _) => match unit {
ParquetTimeUnit::MILLIS(_) => {
ParquetTimeUnit::MILLIS => {
Err(arrow_err!("Cannot create INT64 from MILLIS time unit",))
}
ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)),
ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)),
ParquetTimeUnit::MICROS => Ok(DataType::Time64(TimeUnit::Microsecond)),
ParquetTimeUnit::NANOS => Ok(DataType::Time64(TimeUnit::Nanosecond)),
},
(
Some(LogicalType::Timestamp {
@@ -239,9 +239,9 @@ fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataTy
_,
) => Ok(DataType::Timestamp(
match unit {
ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond,
ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond,
ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond,
ParquetTimeUnit::MILLIS => TimeUnit::Millisecond,
ParquetTimeUnit::MICROS => TimeUnit::Microsecond,
ParquetTimeUnit::NANOS => TimeUnit::Nanosecond,
},
if is_adjusted_to_u_t_c {
Some("UTC".into())