diff --git a/parquet/benches/metadata.rs b/parquet/benches/metadata.rs index 949e0d98ea39..bb2eca0a75c1 100644 --- a/parquet/benches/metadata.rs +++ b/parquet/benches/metadata.rs @@ -151,6 +151,35 @@ fn get_footer_bytes(data: Bytes) -> Bytes { data.slice(meta_start..meta_end) } +#[cfg(feature = "arrow")] +fn rewrite_file(bytes: Bytes) -> (Bytes, FileMetaData) { + use arrow::array::RecordBatchReader; + use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter}; + use parquet::file::properties::{EnabledStatistics, WriterProperties}; + + let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(bytes) + .expect("parquet open") + .build() + .expect("parquet open"); + let writer_properties = WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::Page) + .build(); + let mut output = Vec::new(); + let mut parquet_writer = ArrowWriter::try_new( + &mut output, + parquet_reader.schema(), + Some(writer_properties), + ) + .expect("create arrow writer"); + + for maybe_batch in parquet_reader { + let batch = maybe_batch.expect("reading batch"); + parquet_writer.write(&batch).expect("writing data"); + } + let file_meta = parquet_writer.close().expect("finalizing file"); + (output.into(), file_meta) +} + fn criterion_benchmark(c: &mut Criterion) { // Read file into memory to isolate filesystem performance let file = "../parquet-testing/data/alltypes_tiny_pages.parquet"; @@ -168,7 +197,7 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); - let meta_data = get_footer_bytes(data); + let meta_data = get_footer_bytes(data.clone()); c.bench_function("decode file metadata", |b| { b.iter(|| { parquet::thrift::bench_file_metadata(&meta_data); }) }); @@ -181,6 +210,29 @@ fn criterion_benchmark(c: &mut Criterion) { parquet::thrift::bench_file_metadata(&buf); }) }); + + // Rewrite the file with page statistics enabled, then benchmark decoding its page headers.
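+ // Page locations come from each column chunk's metadata (`data_page_offset`, plus `dictionary_page_offset` when a dictionary is present); `bench_page_header` decodes the Thrift page header found at that offset. + // To run only these benchmarks (the bench target name is assumed from this file): cargo bench --bench metadata --features arrow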
+ #[cfg(feature = "arrow")] + let (file_bytes, metadata) = rewrite_file(data.clone()); + #[cfg(feature = "arrow")] + c.bench_function("page headers", |b| { + b.iter(|| { + metadata.row_groups.iter().for_each(|rg| { + rg.columns.iter().for_each(|col| { + if let Some(col_meta) = &col.meta_data { + if let Some(dict_offset) = col_meta.dictionary_page_offset { + parquet::thrift::bench_page_header( + &file_bytes.slice(dict_offset as usize..), + ); + } + parquet::thrift::bench_page_header( + &file_bytes.slice(col_meta.data_page_offset as usize..), + ); + } + }); + }); + }) + }); } criterion_group!(benches, criterion_benchmark); diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index d4a3e11e2c46..a934a93ef22e 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -30,6 +30,7 @@ pub use crate::arrow::array_reader::RowGroups; use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder}; use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField}; use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask}; +use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; use crate::bloom_filter::{ chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE, }; @@ -39,7 +40,6 @@ use crate::encryption::decrypt::FileDecryptionProperties; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; use crate::file::reader::{ChunkReader, SerializedPageReader}; -use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; use crate::schema::types::SchemaDescriptor; pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder}; @@ -737,17 +737,17 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> { chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?; match header.algorithm { - BloomFilterAlgorithm::BLOCK(_) => { + BloomFilterAlgorithm::BLOCK => { // this match exists to future proof the singleton algorithm enum } } match header.compression { - BloomFilterCompression::UNCOMPRESSED(_) => { + BloomFilterCompression::UNCOMPRESSED => { // this match exists to future proof the singleton compression enum } } match header.hash { - BloomFilterHash::XXHASH(_) => { + BloomFilterHash::XXHASH => { // this match exists to future proof the singleton hash enum } } diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index c53d47be2e56..d515adc13e07 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -21,6 +21,8 @@ use std::cmp::Ordering; use std::collections::VecDeque; use std::ops::Range; +use crate::file::page_index::offset_index::PageLocation; + /// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when /// scanning a parquet file #[derive(Debug, Clone, Copy, Eq, PartialEq)] @@ -162,7 +164,7 @@ impl RowSelection { /// Note: this method does not make any effort to combine consecutive ranges, nor coalesce /// ranges that are close together. This is instead delegated to the IO subsystem to optimise, /// e.g.
[`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges) - pub fn scan_ranges(&self, page_locations: &[crate::format::PageLocation]) -> Vec<Range<usize>> { + pub fn scan_ranges(&self, page_locations: &[PageLocation]) -> Vec<Range<usize>> { let mut ranges: Vec<Range<usize>> = vec![]; let mut row_offset = 0; @@ -640,7 +642,6 @@ fn union_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelection { #[cfg(test)] mod tests { use super::*; - use crate::format::PageLocation; use rand::{rng, Rng}; #[test] diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 611d6999e07e..d738d85fb6d2 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -45,6 +45,7 @@ use crate::arrow::arrow_reader::{ }; use crate::arrow::ProjectionMask; +use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; use crate::bloom_filter::{ chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE, }; @@ -53,7 +54,6 @@ use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::reader::{ChunkReader, Length, SerializedPageReader}; -use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; mod metadata; pub use metadata::*; @@ -446,17 +446,17 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> { chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?; match header.algorithm { - BloomFilterAlgorithm::BLOCK(_) => { + BloomFilterAlgorithm::BLOCK => { // this match exists to future proof the singleton algorithm enum } } match header.compression { - BloomFilterCompression::UNCOMPRESSED(_) => { + BloomFilterCompression::UNCOMPRESSED => { // this match exists to future proof the singleton compression enum } } match header.hash { - BloomFilterHash::XXHASH(_) => { + BloomFilterHash::XXHASH => { // this match exists to future proof the singleton hash enum } } diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index faec427907a7..4e06223a5e13 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -61,11 +61,12 @@ mod store; pub use store::*; use crate::{ - arrow::arrow_writer::ArrowWriterOptions, - arrow::ArrowWriter, + arrow::{arrow_writer::ArrowWriterOptions, ArrowWriter}, errors::{ParquetError, Result}, - file::{metadata::RowGroupMetaData, properties::WriterProperties}, - format::{FileMetaData, KeyValue}, + file::{ + metadata::{KeyValue, RowGroupMetaData}, + properties::WriterProperties, + }, }; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; @@ -245,7 +246,7 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> { /// Unlike [`Self::close`] this does not consume self /// /// Attempting to write after calling finish will result in an error - pub async fn finish(&mut self) -> Result<FileMetaData> { + pub async fn finish(&mut self) -> Result<FileMetaData> { let metadata = self.sync_writer.finish()?; // Force to flush the remaining data. @@ -258,7 +259,7 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> { /// Close and finalize the writer. /// /// All the data in the inner buffer will be force flushed.
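+ /// A minimal usage sketch (assuming a `Vec<u8>` sink, which implements `AsyncFileWriter` via tokio's `AsyncWrite`, and a `batch: RecordBatch` in scope): + /// ```ignore + /// let mut writer = AsyncArrowWriter::try_new(Vec::new(), batch.schema(), None)?; + /// writer.write(&batch).await?; + /// let file_metadata = writer.close().await?; + /// ```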
- pub async fn close(mut self) -> Result<FileMetaData> { + pub async fn close(mut self) -> Result<FileMetaData> { self.finish().await } diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 5b079b66276a..e5d7f8410b3c 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -532,9 +532,9 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> { is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()), unit: match time_unit { TimeUnit::Second => unreachable!(), - TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()), - TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()), - TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()), + TimeUnit::Millisecond => ParquetTimeUnit::MILLIS, + TimeUnit::Microsecond => ParquetTimeUnit::MICROS, + TimeUnit::Nanosecond => ParquetTimeUnit::NANOS, }, })) .with_repetition(repetition) @@ -571,7 +571,7 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> { .with_logical_type(Some(LogicalType::Time { is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"), unit: match unit { - TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()), + TimeUnit::Millisecond => ParquetTimeUnit::MILLIS, u => unreachable!("Invalid unit for Time32: {:?}", u), }, })) @@ -582,8 +582,8 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> { .with_logical_type(Some(LogicalType::Time { is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"), unit: match unit { - TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()), - TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()), + TimeUnit::Microsecond => ParquetTimeUnit::MICROS, + TimeUnit::Nanosecond => ParquetTimeUnit::NANOS, u => unreachable!("Invalid unit for Time64: {:?}", u), }, })) diff --git a/parquet/src/arrow/schema/primitive.rs b/parquet/src/arrow/schema/primitive.rs index 1b3ab7d45c51..0992a05cd2a4 100644 --- a/parquet/src/arrow/schema/primitive.rs +++ b/parquet/src/arrow/schema/primitive.rs @@ -186,7 +186,7 @@ fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> { (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision), (Some(LogicalType::Date), _) => Ok(DataType::Date32), (Some(LogicalType::Time { unit, .. }), _) => match unit { - ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)), + ParquetTimeUnit::MILLIS => Ok(DataType::Time32(TimeUnit::Millisecond)), _ => Err(arrow_err!( "Cannot create INT32 physical type from {:?}", unit )) @@ -225,11 +225,11 @@ fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> { (Some(LogicalType::Integer { bit_width: 64, is_signed }), _) => match is_signed { true => Ok(DataType::Int64), false => Ok(DataType::UInt64), }, (Some(LogicalType::Time { unit, ..
}), _) => match unit { - ParquetTimeUnit::MILLIS(_) => { + ParquetTimeUnit::MILLIS => { Err(arrow_err!("Cannot create INT64 from MILLIS time unit",)) } - ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)), - ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)), + ParquetTimeUnit::MICROS => Ok(DataType::Time64(TimeUnit::Microsecond)), + ParquetTimeUnit::NANOS => Ok(DataType::Time64(TimeUnit::Nanosecond)), }, ( Some(LogicalType::Timestamp { @@ -239,9 +239,9 @@ fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> { is_adjusted_to_u_t_c, unit, }), _, ) => Ok(DataType::Timestamp( match unit { - ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond, - ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond, - ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond, + ParquetTimeUnit::MILLIS => TimeUnit::Millisecond, + ParquetTimeUnit::MICROS => TimeUnit::Microsecond, + ParquetTimeUnit::NANOS => TimeUnit::Nanosecond, }, if is_adjusted_to_u_t_c { Some("UTC".into()) diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs index c1e301136d0e..de53c57ce53a 100644 --- a/parquet/src/basic.rs +++ b/parquet/src/basic.rs @@ -15,28 +15,23 @@ // specific language governing permissions and limitations // under the License. -//! Contains Rust mappings for Thrift definition. -//! Refer to [`parquet.thrift`](https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift) file to see raw definitions. +//! Contains Rust mappings for the Thrift definition. This module contains only mappings for Thrift +//! enums and unions; Thrift structs are handled elsewhere. +//! Refer to the [`parquet.thrift`](https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift) +//! file to see the raw definitions. use std::str::FromStr; use std::{fmt, str}; pub use crate::compression::{BrotliLevel, GzipLevel, ZstdLevel}; -use crate::format as parquet; use crate::errors::{ParquetError, Result}; -// Re-export crate::format types used in this module -pub use crate::format::{ - BsonType, DateType, DecimalType, EnumType, IntType, JsonType, ListType, MapType, NullType, - StringType, TimeType, TimeUnit, TimestampType, UUIDType, -}; - // ---------------------------------------------------------------------- // Types from the Thrift definition // ---------------------------------------------------------------------- -// Mirrors `parquet::Type` +// Mirrors thrift enum `crate::format::Type` /// Types supported by Parquet. /// @@ -66,7 +61,7 @@ pub enum Type { } // ---------------------------------------------------------------------- -// Mirrors `parquet::ConvertedType` +// Mirrors thrift enum `crate::format::ConvertedType` /// Common types (converted types) used by frameworks when using Parquet. /// @@ -171,7 +166,21 @@ pub enum ConvertedType { } // ---------------------------------------------------------------------- -// Mirrors `parquet::LogicalType` +// Mirrors thrift union `crate::format::TimeUnit` + +/// Time unit for `Time` and `Timestamp` logical types. +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum TimeUnit { + /// Milliseconds. + MILLIS, + /// Microseconds. + MICROS, + /// Nanoseconds. + NANOS, +} + +// ---------------------------------------------------------------------- +// Mirrors thrift union `crate::format::LogicalType` /// Logical types used by version 2.4.0+ of the Parquet format.
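+/// With the payload-free `TimeUnit` above, a logical type is written without the thrift unit structs, e.g. (illustrative): +/// `LogicalType::Timestamp { is_adjusted_to_u_t_c: true, unit: TimeUnit::MICROS }`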
/// @@ -237,7 +246,7 @@ pub enum LogicalType { } // ---------------------------------------------------------------------- -// Mirrors `parquet::FieldRepetitionType` +// Mirrors thrift enum `crate::format::FieldRepetitionType` /// Representation of field types in schema. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -252,7 +261,7 @@ pub enum Repetition { } // ---------------------------------------------------------------------- -// Mirrors `parquet::Encoding` +// Mirrors thrift enum `crate::format::Encoding` /// Encodings supported by Parquet. /// @@ -368,7 +377,7 @@ impl FromStr for Encoding { } // ---------------------------------------------------------------------- -// Mirrors `parquet::CompressionCodec` +// Mirrors thrift enum `crate::format::CompressionCodec` /// Supported block compression algorithms. /// @@ -497,7 +506,7 @@ impl FromStr for Compression { } // ---------------------------------------------------------------------- -/// Mirrors [parquet::PageType] +/// Mirrors thrift enum `crate::format::PageType` /// /// Available data pages for Parquet file format. /// Note that some of the page types may not be supported. @@ -515,7 +524,54 @@ pub enum PageType { } // ---------------------------------------------------------------------- -// Mirrors `parquet::ColumnOrder` +// Mirrors thrift enum `crate::format::BoundaryOrder` + +/// Enum to annotate whether lists of min/max elements inside ColumnIndex +/// are ordered and if so, in which direction. +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +pub enum BoundaryOrder { + /// Min/max stats are unordered. + UNORDERED, + /// Min/max stats are ordered in an ascending fashion. + ASCENDING, + /// Min/max stats are ordered in a descending fashion. + DESCENDING, +} + +// ---------------------------------------------------------------------- +// Mirrors thrift union `crate::format::BloomFilterAlgorithm` + +/// The algorithm used in Bloom filter. +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum BloomFilterAlgorithm { + /// Block-based Bloom filter. + BLOCK, +} + +// ---------------------------------------------------------------------- +// Mirrors thrift union `crate::format::BloomFilterHash` + +/// The hash function used in Bloom filter. This function takes the hash of a column value +/// using plain encoding. +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum BloomFilterHash { + /// xxHash is an extremely fast non-cryptographic hash algorithm. It uses the 64-bit version + /// of xxHash. + XXHASH, +} + +// ---------------------------------------------------------------------- +// Mirrors thrift union `crate::format::BloomFilterCompression` + +/// The compression used in the Bloom filter. +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum BloomFilterCompression { + /// No compression is used. + UNCOMPRESSED, +} + +// ---------------------------------------------------------------------- +// Mirrors thrift union `crate::format::ColumnOrder` /// Sort order for page and column statistics.
/// @@ -660,6 +716,9 @@ impl ColumnOrder { } } +// ---------------------------------------------------------------------- +// Display handlers + impl fmt::Display for Type { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{self:?}") } } @@ -709,73 +768,73 @@ impl fmt::Display for ColumnOrder { } // ---------------------------------------------------------------------- -// parquet::Type <=> Type conversion +// crate::format::Type <=> Type conversion -impl TryFrom<parquet::Type> for Type { +impl TryFrom<crate::format::Type> for Type { type Error = ParquetError; - fn try_from(value: parquet::Type) -> Result<Self> { + fn try_from(value: crate::format::Type) -> Result<Self> { Ok(match value { - parquet::Type::BOOLEAN => Type::BOOLEAN, - parquet::Type::INT32 => Type::INT32, - parquet::Type::INT64 => Type::INT64, - parquet::Type::INT96 => Type::INT96, - parquet::Type::FLOAT => Type::FLOAT, - parquet::Type::DOUBLE => Type::DOUBLE, - parquet::Type::BYTE_ARRAY => Type::BYTE_ARRAY, - parquet::Type::FIXED_LEN_BYTE_ARRAY => Type::FIXED_LEN_BYTE_ARRAY, + crate::format::Type::BOOLEAN => Type::BOOLEAN, + crate::format::Type::INT32 => Type::INT32, + crate::format::Type::INT64 => Type::INT64, + crate::format::Type::INT96 => Type::INT96, + crate::format::Type::FLOAT => Type::FLOAT, + crate::format::Type::DOUBLE => Type::DOUBLE, + crate::format::Type::BYTE_ARRAY => Type::BYTE_ARRAY, + crate::format::Type::FIXED_LEN_BYTE_ARRAY => Type::FIXED_LEN_BYTE_ARRAY, _ => return Err(general_err!("unexpected parquet type: {}", value.0)), }) } } -impl From<Type> for parquet::Type { +impl From<Type> for crate::format::Type { fn from(value: Type) -> Self { match value { - Type::BOOLEAN => parquet::Type::BOOLEAN, - Type::INT32 => parquet::Type::INT32, - Type::INT64 => parquet::Type::INT64, - Type::INT96 => parquet::Type::INT96, - Type::FLOAT => parquet::Type::FLOAT, - Type::DOUBLE => parquet::Type::DOUBLE, - Type::BYTE_ARRAY => parquet::Type::BYTE_ARRAY, - Type::FIXED_LEN_BYTE_ARRAY => parquet::Type::FIXED_LEN_BYTE_ARRAY, + Type::BOOLEAN => crate::format::Type::BOOLEAN, + Type::INT32 => crate::format::Type::INT32, + Type::INT64 => crate::format::Type::INT64, + Type::INT96 => crate::format::Type::INT96, + Type::FLOAT => crate::format::Type::FLOAT, + Type::DOUBLE => crate::format::Type::DOUBLE, + Type::BYTE_ARRAY => crate::format::Type::BYTE_ARRAY, + Type::FIXED_LEN_BYTE_ARRAY => crate::format::Type::FIXED_LEN_BYTE_ARRAY, } } } // ---------------------------------------------------------------------- -// parquet::ConvertedType <=> ConvertedType conversion +// crate::format::ConvertedType <=> ConvertedType conversion -impl TryFrom<Option<parquet::ConvertedType>> for ConvertedType { +impl TryFrom<Option<crate::format::ConvertedType>> for ConvertedType { type Error = ParquetError; - fn try_from(option: Option<parquet::ConvertedType>) -> Result<Self> { + fn try_from(option: Option<crate::format::ConvertedType>) -> Result<Self> { Ok(match option { None => ConvertedType::NONE, Some(value) => match value { - parquet::ConvertedType::UTF8 => ConvertedType::UTF8, - parquet::ConvertedType::MAP => ConvertedType::MAP, - parquet::ConvertedType::MAP_KEY_VALUE => ConvertedType::MAP_KEY_VALUE, - parquet::ConvertedType::LIST => ConvertedType::LIST, - parquet::ConvertedType::ENUM => ConvertedType::ENUM, - parquet::ConvertedType::DECIMAL => ConvertedType::DECIMAL, - parquet::ConvertedType::DATE => ConvertedType::DATE, - parquet::ConvertedType::TIME_MILLIS => ConvertedType::TIME_MILLIS, - parquet::ConvertedType::TIME_MICROS => ConvertedType::TIME_MICROS, - parquet::ConvertedType::TIMESTAMP_MILLIS => ConvertedType::TIMESTAMP_MILLIS, - parquet::ConvertedType::TIMESTAMP_MICROS => ConvertedType::TIMESTAMP_MICROS, -
parquet::ConvertedType::UINT_8 => ConvertedType::UINT_8, - parquet::ConvertedType::UINT_16 => ConvertedType::UINT_16, - parquet::ConvertedType::UINT_32 => ConvertedType::UINT_32, - parquet::ConvertedType::UINT_64 => ConvertedType::UINT_64, - parquet::ConvertedType::INT_8 => ConvertedType::INT_8, - parquet::ConvertedType::INT_16 => ConvertedType::INT_16, - parquet::ConvertedType::INT_32 => ConvertedType::INT_32, - parquet::ConvertedType::INT_64 => ConvertedType::INT_64, - parquet::ConvertedType::JSON => ConvertedType::JSON, - parquet::ConvertedType::BSON => ConvertedType::BSON, - parquet::ConvertedType::INTERVAL => ConvertedType::INTERVAL, + crate::format::ConvertedType::UTF8 => ConvertedType::UTF8, + crate::format::ConvertedType::MAP => ConvertedType::MAP, + crate::format::ConvertedType::MAP_KEY_VALUE => ConvertedType::MAP_KEY_VALUE, + crate::format::ConvertedType::LIST => ConvertedType::LIST, + crate::format::ConvertedType::ENUM => ConvertedType::ENUM, + crate::format::ConvertedType::DECIMAL => ConvertedType::DECIMAL, + crate::format::ConvertedType::DATE => ConvertedType::DATE, + crate::format::ConvertedType::TIME_MILLIS => ConvertedType::TIME_MILLIS, + crate::format::ConvertedType::TIME_MICROS => ConvertedType::TIME_MICROS, + crate::format::ConvertedType::TIMESTAMP_MILLIS => ConvertedType::TIMESTAMP_MILLIS, + crate::format::ConvertedType::TIMESTAMP_MICROS => ConvertedType::TIMESTAMP_MICROS, + crate::format::ConvertedType::UINT_8 => ConvertedType::UINT_8, + crate::format::ConvertedType::UINT_16 => ConvertedType::UINT_16, + crate::format::ConvertedType::UINT_32 => ConvertedType::UINT_32, + crate::format::ConvertedType::UINT_64 => ConvertedType::UINT_64, + crate::format::ConvertedType::INT_8 => ConvertedType::INT_8, + crate::format::ConvertedType::INT_16 => ConvertedType::INT_16, + crate::format::ConvertedType::INT_32 => ConvertedType::INT_32, + crate::format::ConvertedType::INT_64 => ConvertedType::INT_64, + crate::format::ConvertedType::JSON => ConvertedType::JSON, + crate::format::ConvertedType::BSON => ConvertedType::BSON, + crate::format::ConvertedType::INTERVAL => ConvertedType::INTERVAL, _ => { return Err(general_err!( "unexpected parquet converted type: {}", @@ -787,115 +846,201 @@ impl TryFrom<Option<parquet::ConvertedType>> for ConvertedType { } } -impl From<ConvertedType> for Option<parquet::ConvertedType> { +impl From<ConvertedType> for Option<crate::format::ConvertedType> { fn from(value: ConvertedType) -> Self { match value { ConvertedType::NONE => None, - ConvertedType::UTF8 => Some(parquet::ConvertedType::UTF8), - ConvertedType::MAP => Some(parquet::ConvertedType::MAP), - ConvertedType::MAP_KEY_VALUE => Some(parquet::ConvertedType::MAP_KEY_VALUE), - ConvertedType::LIST => Some(parquet::ConvertedType::LIST), - ConvertedType::ENUM => Some(parquet::ConvertedType::ENUM), - ConvertedType::DECIMAL => Some(parquet::ConvertedType::DECIMAL), - ConvertedType::DATE => Some(parquet::ConvertedType::DATE), - ConvertedType::TIME_MILLIS => Some(parquet::ConvertedType::TIME_MILLIS), - ConvertedType::TIME_MICROS => Some(parquet::ConvertedType::TIME_MICROS), - ConvertedType::TIMESTAMP_MILLIS => Some(parquet::ConvertedType::TIMESTAMP_MILLIS), - ConvertedType::TIMESTAMP_MICROS => Some(parquet::ConvertedType::TIMESTAMP_MICROS), - ConvertedType::UINT_8 => Some(parquet::ConvertedType::UINT_8), - ConvertedType::UINT_16 => Some(parquet::ConvertedType::UINT_16), - ConvertedType::UINT_32 => Some(parquet::ConvertedType::UINT_32), - ConvertedType::UINT_64 => Some(parquet::ConvertedType::UINT_64), - ConvertedType::INT_8 => Some(parquet::ConvertedType::INT_8), - ConvertedType::INT_16 =>
Some(parquet::ConvertedType::INT_16), - ConvertedType::INT_32 => Some(parquet::ConvertedType::INT_32), - ConvertedType::INT_64 => Some(parquet::ConvertedType::INT_64), - ConvertedType::JSON => Some(parquet::ConvertedType::JSON), - ConvertedType::BSON => Some(parquet::ConvertedType::BSON), - ConvertedType::INTERVAL => Some(parquet::ConvertedType::INTERVAL), + ConvertedType::UTF8 => Some(crate::format::ConvertedType::UTF8), + ConvertedType::MAP => Some(crate::format::ConvertedType::MAP), + ConvertedType::MAP_KEY_VALUE => Some(crate::format::ConvertedType::MAP_KEY_VALUE), + ConvertedType::LIST => Some(crate::format::ConvertedType::LIST), + ConvertedType::ENUM => Some(crate::format::ConvertedType::ENUM), + ConvertedType::DECIMAL => Some(crate::format::ConvertedType::DECIMAL), + ConvertedType::DATE => Some(crate::format::ConvertedType::DATE), + ConvertedType::TIME_MILLIS => Some(crate::format::ConvertedType::TIME_MILLIS), + ConvertedType::TIME_MICROS => Some(crate::format::ConvertedType::TIME_MICROS), + ConvertedType::TIMESTAMP_MILLIS => Some(crate::format::ConvertedType::TIMESTAMP_MILLIS), + ConvertedType::TIMESTAMP_MICROS => Some(crate::format::ConvertedType::TIMESTAMP_MICROS), + ConvertedType::UINT_8 => Some(crate::format::ConvertedType::UINT_8), + ConvertedType::UINT_16 => Some(crate::format::ConvertedType::UINT_16), + ConvertedType::UINT_32 => Some(crate::format::ConvertedType::UINT_32), + ConvertedType::UINT_64 => Some(crate::format::ConvertedType::UINT_64), + ConvertedType::INT_8 => Some(crate::format::ConvertedType::INT_8), + ConvertedType::INT_16 => Some(crate::format::ConvertedType::INT_16), + ConvertedType::INT_32 => Some(crate::format::ConvertedType::INT_32), + ConvertedType::INT_64 => Some(crate::format::ConvertedType::INT_64), + ConvertedType::JSON => Some(crate::format::ConvertedType::JSON), + ConvertedType::BSON => Some(crate::format::ConvertedType::BSON), + ConvertedType::INTERVAL => Some(crate::format::ConvertedType::INTERVAL), + } + } +} + +// ---------------------------------------------------------------------- +// crate::format::BloomFilterHash <=> BloomFilterHash conversion + +impl From<crate::format::BloomFilterHash> for BloomFilterHash { + fn from(value: crate::format::BloomFilterHash) -> Self { + match value { + crate::format::BloomFilterHash::XXHASH(_) => BloomFilterHash::XXHASH, + } + } +} + +impl From<BloomFilterHash> for crate::format::BloomFilterHash { + fn from(value: BloomFilterHash) -> Self { + match value { + BloomFilterHash::XXHASH => crate::format::BloomFilterHash::XXHASH(Default::default()), + } + } +} + +// ---------------------------------------------------------------------- +// crate::format::BloomFilterAlgorithm <=> BloomFilterAlgorithm conversion + +impl From<crate::format::BloomFilterAlgorithm> for BloomFilterAlgorithm { + fn from(value: crate::format::BloomFilterAlgorithm) -> Self { + match value { + crate::format::BloomFilterAlgorithm::BLOCK(_) => BloomFilterAlgorithm::BLOCK, + } + } +} + +impl From<BloomFilterAlgorithm> for crate::format::BloomFilterAlgorithm { + fn from(value: BloomFilterAlgorithm) -> Self { + match value { + BloomFilterAlgorithm::BLOCK => { + crate::format::BloomFilterAlgorithm::BLOCK(Default::default()) + } + } + } +} + +// ---------------------------------------------------------------------- +// crate::format::BloomFilterCompression <=> BloomFilterCompression conversion + +impl From<crate::format::BloomFilterCompression> for BloomFilterCompression { + fn from(value: crate::format::BloomFilterCompression) -> Self { + match value { + crate::format::BloomFilterCompression::UNCOMPRESSED(_) => { + BloomFilterCompression::UNCOMPRESSED + } + } + } +} + +impl From<BloomFilterCompression> for
crate::format::BloomFilterCompression { + fn from(value: BloomFilterCompression) -> Self { + match value { + BloomFilterCompression::UNCOMPRESSED => { + crate::format::BloomFilterCompression::UNCOMPRESSED(Default::default()) + } } } } // ---------------------------------------------------------------------- -// parquet::LogicalType <=> LogicalType conversion +// crate::format::TimeUnit <=> TimeUnit conversion -impl From<parquet::LogicalType> for LogicalType { - fn from(value: parquet::LogicalType) -> Self { +impl From<crate::format::TimeUnit> for TimeUnit { + fn from(value: crate::format::TimeUnit) -> Self { match value { - parquet::LogicalType::STRING(_) => LogicalType::String, - parquet::LogicalType::MAP(_) => LogicalType::Map, - parquet::LogicalType::LIST(_) => LogicalType::List, - parquet::LogicalType::ENUM(_) => LogicalType::Enum, - parquet::LogicalType::DECIMAL(t) => LogicalType::Decimal { + crate::format::TimeUnit::MILLIS(_) => TimeUnit::MILLIS, + crate::format::TimeUnit::MICROS(_) => TimeUnit::MICROS, + crate::format::TimeUnit::NANOS(_) => TimeUnit::NANOS, + } + } +} + +impl From<TimeUnit> for crate::format::TimeUnit { + fn from(value: TimeUnit) -> Self { + match value { + TimeUnit::MILLIS => crate::format::TimeUnit::MILLIS(crate::format::MilliSeconds {}), + TimeUnit::MICROS => crate::format::TimeUnit::MICROS(crate::format::MicroSeconds {}), + TimeUnit::NANOS => crate::format::TimeUnit::NANOS(crate::format::NanoSeconds {}), + } + } +} + +// ---------------------------------------------------------------------- +// crate::format::LogicalType <=> LogicalType conversion + +impl From<crate::format::LogicalType> for LogicalType { + fn from(value: crate::format::LogicalType) -> Self { + match value { + crate::format::LogicalType::STRING(_) => LogicalType::String, + crate::format::LogicalType::MAP(_) => LogicalType::Map, + crate::format::LogicalType::LIST(_) => LogicalType::List, + crate::format::LogicalType::ENUM(_) => LogicalType::Enum, + crate::format::LogicalType::DECIMAL(t) => LogicalType::Decimal { scale: t.scale, precision: t.precision, }, - parquet::LogicalType::DATE(_) => LogicalType::Date, - parquet::LogicalType::TIME(t) => LogicalType::Time { + crate::format::LogicalType::DATE(_) => LogicalType::Date, + crate::format::LogicalType::TIME(t) => LogicalType::Time { is_adjusted_to_u_t_c: t.is_adjusted_to_u_t_c, - unit: t.unit, + unit: t.unit.into(), }, - parquet::LogicalType::TIMESTAMP(t) => LogicalType::Timestamp { + crate::format::LogicalType::TIMESTAMP(t) => LogicalType::Timestamp { is_adjusted_to_u_t_c: t.is_adjusted_to_u_t_c, - unit: t.unit, + unit: t.unit.into(), }, - parquet::LogicalType::INTEGER(t) => LogicalType::Integer { + crate::format::LogicalType::INTEGER(t) => LogicalType::Integer { bit_width: t.bit_width, is_signed: t.is_signed, }, - parquet::LogicalType::UNKNOWN(_) => LogicalType::Unknown, - parquet::LogicalType::JSON(_) => LogicalType::Json, - parquet::LogicalType::BSON(_) => LogicalType::Bson, - parquet::LogicalType::UUID(_) => LogicalType::Uuid, - parquet::LogicalType::FLOAT16(_) => LogicalType::Float16, - parquet::LogicalType::VARIANT(_) => LogicalType::Variant, - parquet::LogicalType::GEOMETRY(_) => LogicalType::Geometry, - parquet::LogicalType::GEOGRAPHY(_) => LogicalType::Geography, + crate::format::LogicalType::UNKNOWN(_) => LogicalType::Unknown, + crate::format::LogicalType::JSON(_) => LogicalType::Json, + crate::format::LogicalType::BSON(_) => LogicalType::Bson, + crate::format::LogicalType::UUID(_) => LogicalType::Uuid, + crate::format::LogicalType::FLOAT16(_) => LogicalType::Float16, + crate::format::LogicalType::VARIANT(_) =>
LogicalType::Variant, + crate::format::LogicalType::GEOMETRY(_) => LogicalType::Geometry, + crate::format::LogicalType::GEOGRAPHY(_) => LogicalType::Geography, } } } -impl From<LogicalType> for parquet::LogicalType { +impl From<LogicalType> for crate::format::LogicalType { fn from(value: LogicalType) -> Self { match value { - LogicalType::String => parquet::LogicalType::STRING(Default::default()), - LogicalType::Map => parquet::LogicalType::MAP(Default::default()), - LogicalType::List => parquet::LogicalType::LIST(Default::default()), - LogicalType::Enum => parquet::LogicalType::ENUM(Default::default()), + LogicalType::String => crate::format::LogicalType::STRING(Default::default()), + LogicalType::Map => crate::format::LogicalType::MAP(Default::default()), + LogicalType::List => crate::format::LogicalType::LIST(Default::default()), + LogicalType::Enum => crate::format::LogicalType::ENUM(Default::default()), LogicalType::Decimal { scale, precision } => { - parquet::LogicalType::DECIMAL(DecimalType { scale, precision }) + crate::format::LogicalType::DECIMAL(crate::format::DecimalType { scale, precision }) } - LogicalType::Date => parquet::LogicalType::DATE(Default::default()), + LogicalType::Date => crate::format::LogicalType::DATE(Default::default()), LogicalType::Time { is_adjusted_to_u_t_c, unit, - } => parquet::LogicalType::TIME(TimeType { + } => crate::format::LogicalType::TIME(crate::format::TimeType { is_adjusted_to_u_t_c, - unit, + unit: unit.into(), }), LogicalType::Timestamp { is_adjusted_to_u_t_c, unit, - } => parquet::LogicalType::TIMESTAMP(TimestampType { + } => crate::format::LogicalType::TIMESTAMP(crate::format::TimestampType { is_adjusted_to_u_t_c, - unit, + unit: unit.into(), }), LogicalType::Integer { bit_width, is_signed, - } => parquet::LogicalType::INTEGER(IntType { + } => crate::format::LogicalType::INTEGER(crate::format::IntType { bit_width, is_signed, }), - LogicalType::Unknown => parquet::LogicalType::UNKNOWN(Default::default()), - LogicalType::Json => parquet::LogicalType::JSON(Default::default()), - LogicalType::Bson => parquet::LogicalType::BSON(Default::default()), - LogicalType::Uuid => parquet::LogicalType::UUID(Default::default()), - LogicalType::Float16 => parquet::LogicalType::FLOAT16(Default::default()), - LogicalType::Variant => parquet::LogicalType::VARIANT(Default::default()), - LogicalType::Geometry => parquet::LogicalType::GEOMETRY(Default::default()), - LogicalType::Geography => parquet::LogicalType::GEOGRAPHY(Default::default()), + LogicalType::Unknown => crate::format::LogicalType::UNKNOWN(Default::default()), + LogicalType::Json => crate::format::LogicalType::JSON(Default::default()), + LogicalType::Bson => crate::format::LogicalType::BSON(Default::default()), + LogicalType::Uuid => crate::format::LogicalType::UUID(Default::default()), + LogicalType::Float16 => crate::format::LogicalType::FLOAT16(Default::default()), + LogicalType::Variant => crate::format::LogicalType::VARIANT(Default::default()), + LogicalType::Geometry => crate::format::LogicalType::GEOMETRY(Default::default()), + LogicalType::Geography => crate::format::LogicalType::GEOGRAPHY(Default::default()), } } } @@ -920,14 +1065,14 @@ impl From<Option<LogicalType>> for ConvertedType { LogicalType::Decimal { .. } => ConvertedType::DECIMAL, LogicalType::Date => ConvertedType::DATE, LogicalType::Time { unit, ..
} => match unit { - TimeUnit::MILLIS(_) => ConvertedType::TIME_MILLIS, - TimeUnit::MICROS(_) => ConvertedType::TIME_MICROS, - TimeUnit::NANOS(_) => ConvertedType::NONE, + TimeUnit::MILLIS => ConvertedType::TIME_MILLIS, + TimeUnit::MICROS => ConvertedType::TIME_MICROS, + TimeUnit::NANOS => ConvertedType::NONE, }, LogicalType::Timestamp { unit, .. } => match unit { - TimeUnit::MILLIS(_) => ConvertedType::TIMESTAMP_MILLIS, - TimeUnit::MICROS(_) => ConvertedType::TIMESTAMP_MICROS, - TimeUnit::NANOS(_) => ConvertedType::NONE, + TimeUnit::MILLIS => ConvertedType::TIMESTAMP_MILLIS, + TimeUnit::MICROS => ConvertedType::TIMESTAMP_MICROS, + TimeUnit::NANOS => ConvertedType::NONE, }, LogicalType::Integer { bit_width, @@ -958,16 +1103,16 @@ impl From<Option<LogicalType>> for ConvertedType { } } // ---------------------------------------------------------------------- -// parquet::FieldRepetitionType <=> Repetition conversion +// crate::format::FieldRepetitionType <=> Repetition conversion -impl TryFrom<parquet::FieldRepetitionType> for Repetition { +impl TryFrom<crate::format::FieldRepetitionType> for Repetition { type Error = ParquetError; - fn try_from(value: parquet::FieldRepetitionType) -> Result<Self> { + fn try_from(value: crate::format::FieldRepetitionType) -> Result<Self> { Ok(match value { - parquet::FieldRepetitionType::REQUIRED => Repetition::REQUIRED, - parquet::FieldRepetitionType::OPTIONAL => Repetition::OPTIONAL, - parquet::FieldRepetitionType::REPEATED => Repetition::REPEATED, + crate::format::FieldRepetitionType::REQUIRED => Repetition::REQUIRED, + crate::format::FieldRepetitionType::OPTIONAL => Repetition::OPTIONAL, + crate::format::FieldRepetitionType::REPEATED => Repetition::REPEATED, _ => { return Err(general_err!( "unexpected parquet repetition type: {}", @@ -978,72 +1123,72 @@ impl TryFrom<parquet::FieldRepetitionType> for Repetition { } } -impl From<Repetition> for parquet::FieldRepetitionType { +impl From<Repetition> for crate::format::FieldRepetitionType { fn from(value: Repetition) -> Self { match value { - Repetition::REQUIRED => parquet::FieldRepetitionType::REQUIRED, - Repetition::OPTIONAL => parquet::FieldRepetitionType::OPTIONAL, - Repetition::REPEATED => parquet::FieldRepetitionType::REPEATED, + Repetition::REQUIRED => crate::format::FieldRepetitionType::REQUIRED, + Repetition::OPTIONAL => crate::format::FieldRepetitionType::OPTIONAL, + Repetition::REPEATED => crate::format::FieldRepetitionType::REPEATED, } } } // ---------------------------------------------------------------------- -// parquet::Encoding <=> Encoding conversion +// crate::format::Encoding <=> Encoding conversion -impl TryFrom<parquet::Encoding> for Encoding { +impl TryFrom<crate::format::Encoding> for Encoding { type Error = ParquetError; - fn try_from(value: parquet::Encoding) -> Result<Self> { + fn try_from(value: crate::format::Encoding) -> Result<Self> { Ok(match value { - parquet::Encoding::PLAIN => Encoding::PLAIN, - parquet::Encoding::PLAIN_DICTIONARY => Encoding::PLAIN_DICTIONARY, - parquet::Encoding::RLE => Encoding::RLE, + crate::format::Encoding::PLAIN => Encoding::PLAIN, + crate::format::Encoding::PLAIN_DICTIONARY => Encoding::PLAIN_DICTIONARY, + crate::format::Encoding::RLE => Encoding::RLE, #[allow(deprecated)] - parquet::Encoding::BIT_PACKED => Encoding::BIT_PACKED, - parquet::Encoding::DELTA_BINARY_PACKED => Encoding::DELTA_BINARY_PACKED, - parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY => Encoding::DELTA_LENGTH_BYTE_ARRAY, - parquet::Encoding::DELTA_BYTE_ARRAY => Encoding::DELTA_BYTE_ARRAY, - parquet::Encoding::RLE_DICTIONARY => Encoding::RLE_DICTIONARY, - parquet::Encoding::BYTE_STREAM_SPLIT => Encoding::BYTE_STREAM_SPLIT, + crate::format::Encoding::BIT_PACKED => Encoding::BIT_PACKED, +
crate::format::Encoding::DELTA_BINARY_PACKED => Encoding::DELTA_BINARY_PACKED, + crate::format::Encoding::DELTA_LENGTH_BYTE_ARRAY => Encoding::DELTA_LENGTH_BYTE_ARRAY, + crate::format::Encoding::DELTA_BYTE_ARRAY => Encoding::DELTA_BYTE_ARRAY, + crate::format::Encoding::RLE_DICTIONARY => Encoding::RLE_DICTIONARY, + crate::format::Encoding::BYTE_STREAM_SPLIT => Encoding::BYTE_STREAM_SPLIT, _ => return Err(general_err!("unexpected parquet encoding: {}", value.0)), }) } } -impl From<Encoding> for parquet::Encoding { +impl From<Encoding> for crate::format::Encoding { fn from(value: Encoding) -> Self { match value { - Encoding::PLAIN => parquet::Encoding::PLAIN, - Encoding::PLAIN_DICTIONARY => parquet::Encoding::PLAIN_DICTIONARY, - Encoding::RLE => parquet::Encoding::RLE, + Encoding::PLAIN => crate::format::Encoding::PLAIN, + Encoding::PLAIN_DICTIONARY => crate::format::Encoding::PLAIN_DICTIONARY, + Encoding::RLE => crate::format::Encoding::RLE, #[allow(deprecated)] - Encoding::BIT_PACKED => parquet::Encoding::BIT_PACKED, - Encoding::DELTA_BINARY_PACKED => parquet::Encoding::DELTA_BINARY_PACKED, - Encoding::DELTA_LENGTH_BYTE_ARRAY => parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY, - Encoding::DELTA_BYTE_ARRAY => parquet::Encoding::DELTA_BYTE_ARRAY, - Encoding::RLE_DICTIONARY => parquet::Encoding::RLE_DICTIONARY, - Encoding::BYTE_STREAM_SPLIT => parquet::Encoding::BYTE_STREAM_SPLIT, + Encoding::BIT_PACKED => crate::format::Encoding::BIT_PACKED, + Encoding::DELTA_BINARY_PACKED => crate::format::Encoding::DELTA_BINARY_PACKED, + Encoding::DELTA_LENGTH_BYTE_ARRAY => crate::format::Encoding::DELTA_LENGTH_BYTE_ARRAY, + Encoding::DELTA_BYTE_ARRAY => crate::format::Encoding::DELTA_BYTE_ARRAY, + Encoding::RLE_DICTIONARY => crate::format::Encoding::RLE_DICTIONARY, + Encoding::BYTE_STREAM_SPLIT => crate::format::Encoding::BYTE_STREAM_SPLIT, } } } // ---------------------------------------------------------------------- -// parquet::CompressionCodec <=> Compression conversion +// crate::format::CompressionCodec <=> Compression conversion -impl TryFrom<parquet::CompressionCodec> for Compression { +impl TryFrom<crate::format::CompressionCodec> for Compression { type Error = ParquetError; - fn try_from(value: parquet::CompressionCodec) -> Result<Self> { + fn try_from(value: crate::format::CompressionCodec) -> Result<Self> { Ok(match value { - parquet::CompressionCodec::UNCOMPRESSED => Compression::UNCOMPRESSED, - parquet::CompressionCodec::SNAPPY => Compression::SNAPPY, - parquet::CompressionCodec::GZIP => Compression::GZIP(Default::default()), - parquet::CompressionCodec::LZO => Compression::LZO, - parquet::CompressionCodec::BROTLI => Compression::BROTLI(Default::default()), - parquet::CompressionCodec::LZ4 => Compression::LZ4, - parquet::CompressionCodec::ZSTD => Compression::ZSTD(Default::default()), - parquet::CompressionCodec::LZ4_RAW => Compression::LZ4_RAW, + crate::format::CompressionCodec::UNCOMPRESSED => Compression::UNCOMPRESSED, + crate::format::CompressionCodec::SNAPPY => Compression::SNAPPY, + crate::format::CompressionCodec::GZIP => Compression::GZIP(Default::default()), + crate::format::CompressionCodec::LZO => Compression::LZO, + crate::format::CompressionCodec::BROTLI => Compression::BROTLI(Default::default()), + crate::format::CompressionCodec::LZ4 => Compression::LZ4, + crate::format::CompressionCodec::ZSTD => Compression::ZSTD(Default::default()), + crate::format::CompressionCodec::LZ4_RAW => Compression::LZ4_RAW, _ => { return Err(general_err!( "unexpected parquet compression codec: {}", @@ -1054,45 +1199,76 @@ impl TryFrom<parquet::CompressionCodec> for Compression { } } -impl From<Compression> for
parquet::CompressionCodec { +impl From<Compression> for crate::format::CompressionCodec { fn from(value: Compression) -> Self { match value { - Compression::UNCOMPRESSED => parquet::CompressionCodec::UNCOMPRESSED, - Compression::SNAPPY => parquet::CompressionCodec::SNAPPY, - Compression::GZIP(_) => parquet::CompressionCodec::GZIP, - Compression::LZO => parquet::CompressionCodec::LZO, - Compression::BROTLI(_) => parquet::CompressionCodec::BROTLI, - Compression::LZ4 => parquet::CompressionCodec::LZ4, - Compression::ZSTD(_) => parquet::CompressionCodec::ZSTD, - Compression::LZ4_RAW => parquet::CompressionCodec::LZ4_RAW, + Compression::UNCOMPRESSED => crate::format::CompressionCodec::UNCOMPRESSED, + Compression::SNAPPY => crate::format::CompressionCodec::SNAPPY, + Compression::GZIP(_) => crate::format::CompressionCodec::GZIP, + Compression::LZO => crate::format::CompressionCodec::LZO, + Compression::BROTLI(_) => crate::format::CompressionCodec::BROTLI, + Compression::LZ4 => crate::format::CompressionCodec::LZ4, + Compression::ZSTD(_) => crate::format::CompressionCodec::ZSTD, + Compression::LZ4_RAW => crate::format::CompressionCodec::LZ4_RAW, } } } // ---------------------------------------------------------------------- -// parquet::PageType <=> PageType conversion +// crate::format::PageType <=> PageType conversion -impl TryFrom<parquet::PageType> for PageType { +impl TryFrom<crate::format::PageType> for PageType { type Error = ParquetError; - fn try_from(value: parquet::PageType) -> Result<Self> { + fn try_from(value: crate::format::PageType) -> Result<Self> { Ok(match value { - parquet::PageType::DATA_PAGE => PageType::DATA_PAGE, - parquet::PageType::INDEX_PAGE => PageType::INDEX_PAGE, - parquet::PageType::DICTIONARY_PAGE => PageType::DICTIONARY_PAGE, - parquet::PageType::DATA_PAGE_V2 => PageType::DATA_PAGE_V2, + crate::format::PageType::DATA_PAGE => PageType::DATA_PAGE, + crate::format::PageType::INDEX_PAGE => PageType::INDEX_PAGE, + crate::format::PageType::DICTIONARY_PAGE => PageType::DICTIONARY_PAGE, + crate::format::PageType::DATA_PAGE_V2 => PageType::DATA_PAGE_V2, _ => return Err(general_err!("unexpected parquet page type: {}", value.0)), }) } } -impl From<PageType> for parquet::PageType { +impl From<PageType> for crate::format::PageType { fn from(value: PageType) -> Self { match value { - PageType::DATA_PAGE => parquet::PageType::DATA_PAGE, - PageType::INDEX_PAGE => parquet::PageType::INDEX_PAGE, - PageType::DICTIONARY_PAGE => parquet::PageType::DICTIONARY_PAGE, - PageType::DATA_PAGE_V2 => parquet::PageType::DATA_PAGE_V2, + PageType::DATA_PAGE => crate::format::PageType::DATA_PAGE, + PageType::INDEX_PAGE => crate::format::PageType::INDEX_PAGE, + PageType::DICTIONARY_PAGE => crate::format::PageType::DICTIONARY_PAGE, + PageType::DATA_PAGE_V2 => crate::format::PageType::DATA_PAGE_V2, + } + } +} + +// ---------------------------------------------------------------------- +// crate::format::BoundaryOrder <=> BoundaryOrder conversion + +impl TryFrom<crate::format::BoundaryOrder> for BoundaryOrder { + type Error = ParquetError; + + fn try_from(value: crate::format::BoundaryOrder) -> Result<Self> { + Ok(match value { + crate::format::BoundaryOrder::UNORDERED => BoundaryOrder::UNORDERED, + crate::format::BoundaryOrder::ASCENDING => BoundaryOrder::ASCENDING, + crate::format::BoundaryOrder::DESCENDING => BoundaryOrder::DESCENDING, + _ => { + return Err(general_err!( + "unexpected parquet boundary order type: {}", + value.0 + )) + } + }) + } +} + +impl From<BoundaryOrder> for crate::format::BoundaryOrder { + fn from(value: BoundaryOrder) -> Self { + match value { + BoundaryOrder::UNORDERED => crate::format::BoundaryOrder::UNORDERED, + BoundaryOrder::ASCENDING => crate::format::BoundaryOrder::ASCENDING, + BoundaryOrder::DESCENDING => crate::format::BoundaryOrder::DESCENDING, } } }
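+ +// A sketch of the fallible direction (illustrative): the thrift enum is an open i32 wrapper, so +// `BoundaryOrder::try_from(crate::format::BoundaryOrder::ASCENDING)` yields `BoundaryOrder::ASCENDING`, while +// `BoundaryOrder::try_from(crate::format::BoundaryOrder(42))` returns an "unexpected parquet boundary order type" error.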
@@ -1184,11 +1360,11 @@ impl str::FromStr for LogicalType { "DATE" => Ok(LogicalType::Date), "TIME" => Ok(LogicalType::Time { is_adjusted_to_u_t_c: false, - unit: TimeUnit::MILLIS(parquet::MilliSeconds {}), + unit: TimeUnit::MILLIS, }), "TIMESTAMP" => Ok(LogicalType::Timestamp { is_adjusted_to_u_t_c: false, - unit: TimeUnit::MILLIS(parquet::MilliSeconds {}), + unit: TimeUnit::MILLIS, }), "STRING" => Ok(LogicalType::String), "JSON" => Ok(LogicalType::Json), @@ -1227,35 +1403,50 @@ mod tests { #[test] fn test_from_type() { assert_eq!( - Type::try_from(parquet::Type::BOOLEAN).unwrap(), + Type::try_from(crate::format::Type::BOOLEAN).unwrap(), Type::BOOLEAN ); - assert_eq!(Type::try_from(parquet::Type::INT32).unwrap(), Type::INT32); - assert_eq!(Type::try_from(parquet::Type::INT64).unwrap(), Type::INT64); - assert_eq!(Type::try_from(parquet::Type::INT96).unwrap(), Type::INT96); - assert_eq!(Type::try_from(parquet::Type::FLOAT).unwrap(), Type::FLOAT); - assert_eq!(Type::try_from(parquet::Type::DOUBLE).unwrap(), Type::DOUBLE); assert_eq!( - Type::try_from(parquet::Type::BYTE_ARRAY).unwrap(), + Type::try_from(crate::format::Type::INT32).unwrap(), + Type::INT32 + ); + assert_eq!( + Type::try_from(crate::format::Type::INT64).unwrap(), + Type::INT64 + ); + assert_eq!( + Type::try_from(crate::format::Type::INT96).unwrap(), + Type::INT96 + ); + assert_eq!( + Type::try_from(crate::format::Type::FLOAT).unwrap(), + Type::FLOAT + ); + assert_eq!( + Type::try_from(crate::format::Type::DOUBLE).unwrap(), + Type::DOUBLE + ); + assert_eq!( + Type::try_from(crate::format::Type::BYTE_ARRAY).unwrap(), Type::BYTE_ARRAY ); assert_eq!( - Type::try_from(parquet::Type::FIXED_LEN_BYTE_ARRAY).unwrap(), + Type::try_from(crate::format::Type::FIXED_LEN_BYTE_ARRAY).unwrap(), Type::FIXED_LEN_BYTE_ARRAY ); } #[test] fn test_into_type() { - assert_eq!(parquet::Type::BOOLEAN, Type::BOOLEAN.into()); - assert_eq!(parquet::Type::INT32, Type::INT32.into()); - assert_eq!(parquet::Type::INT64, Type::INT64.into()); - assert_eq!(parquet::Type::INT96, Type::INT96.into()); - assert_eq!(parquet::Type::FLOAT, Type::FLOAT.into()); - assert_eq!(parquet::Type::DOUBLE, Type::DOUBLE.into()); - assert_eq!(parquet::Type::BYTE_ARRAY, Type::BYTE_ARRAY.into()); - assert_eq!( - parquet::Type::FIXED_LEN_BYTE_ARRAY, + assert_eq!(crate::format::Type::BOOLEAN, Type::BOOLEAN.into()); + assert_eq!(crate::format::Type::INT32, Type::INT32.into()); + assert_eq!(crate::format::Type::INT64, Type::INT64.into()); + assert_eq!(crate::format::Type::INT96, Type::INT96.into()); + assert_eq!(crate::format::Type::FLOAT, Type::FLOAT.into()); + assert_eq!(crate::format::Type::DOUBLE, Type::DOUBLE.into()); + assert_eq!(crate::format::Type::BYTE_ARRAY, Type::BYTE_ARRAY.into()); + assert_eq!( + crate::format::Type::FIXED_LEN_BYTE_ARRAY, Type::FIXED_LEN_BYTE_ARRAY.into() ); } @@ -1337,196 +1528,199 @@ mod tests { #[test] fn test_from_converted_type() { - let parquet_conv_none: Option<parquet::ConvertedType> = None; + let parquet_conv_none: Option<crate::format::ConvertedType> = None; assert_eq!( ConvertedType::try_from(parquet_conv_none).unwrap(), ConvertedType::NONE ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::UTF8)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::UTF8)).unwrap(), ConvertedType::UTF8 ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::MAP)).unwrap(), +
ConvertedType::try_from(Some(crate::format::ConvertedType::MAP)).unwrap(), ConvertedType::MAP ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::MAP_KEY_VALUE)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::MAP_KEY_VALUE)).unwrap(), ConvertedType::MAP_KEY_VALUE ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::LIST)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::LIST)).unwrap(), ConvertedType::LIST ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::ENUM)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::ENUM)).unwrap(), ConvertedType::ENUM ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::DECIMAL)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::DECIMAL)).unwrap(), ConvertedType::DECIMAL ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::DATE)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::DATE)).unwrap(), ConvertedType::DATE ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::TIME_MILLIS)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::TIME_MILLIS)).unwrap(), ConvertedType::TIME_MILLIS ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::TIME_MICROS)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::TIME_MICROS)).unwrap(), ConvertedType::TIME_MICROS ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::TIMESTAMP_MILLIS)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::TIMESTAMP_MILLIS)).unwrap(), ConvertedType::TIMESTAMP_MILLIS ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::TIMESTAMP_MICROS)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::TIMESTAMP_MICROS)).unwrap(), ConvertedType::TIMESTAMP_MICROS ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::UINT_8)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::UINT_8)).unwrap(), ConvertedType::UINT_8 ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::UINT_16)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::UINT_16)).unwrap(), ConvertedType::UINT_16 ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::UINT_32)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::UINT_32)).unwrap(), ConvertedType::UINT_32 ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::UINT_64)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::UINT_64)).unwrap(), ConvertedType::UINT_64 ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::INT_8)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::INT_8)).unwrap(), ConvertedType::INT_8 ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::INT_16)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::INT_16)).unwrap(), ConvertedType::INT_16 ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::INT_32)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::INT_32)).unwrap(), ConvertedType::INT_32 ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::INT_64)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::INT_64)).unwrap(), ConvertedType::INT_64 ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::JSON)).unwrap(), + 
ConvertedType::try_from(Some(crate::format::ConvertedType::JSON)).unwrap(), ConvertedType::JSON ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::BSON)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::BSON)).unwrap(), ConvertedType::BSON ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::INTERVAL)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::INTERVAL)).unwrap(), ConvertedType::INTERVAL ); assert_eq!( - ConvertedType::try_from(Some(parquet::ConvertedType::DECIMAL)).unwrap(), + ConvertedType::try_from(Some(crate::format::ConvertedType::DECIMAL)).unwrap(), ConvertedType::DECIMAL ) } #[test] fn test_into_converted_type() { - let converted_type: Option<parquet::ConvertedType> = None; + let converted_type: Option<crate::format::ConvertedType> = None; assert_eq!(converted_type, ConvertedType::NONE.into()); assert_eq!( - Some(parquet::ConvertedType::UTF8), + Some(crate::format::ConvertedType::UTF8), ConvertedType::UTF8.into() ); - assert_eq!(Some(parquet::ConvertedType::MAP), ConvertedType::MAP.into()); assert_eq!( - Some(parquet::ConvertedType::MAP_KEY_VALUE), + Some(crate::format::ConvertedType::MAP), + ConvertedType::MAP.into() + ); + assert_eq!( + Some(crate::format::ConvertedType::MAP_KEY_VALUE), ConvertedType::MAP_KEY_VALUE.into() ); assert_eq!( - Some(parquet::ConvertedType::LIST), + Some(crate::format::ConvertedType::LIST), ConvertedType::LIST.into() ); assert_eq!( - Some(parquet::ConvertedType::ENUM), + Some(crate::format::ConvertedType::ENUM), ConvertedType::ENUM.into() ); assert_eq!( - Some(parquet::ConvertedType::DECIMAL), + Some(crate::format::ConvertedType::DECIMAL), ConvertedType::DECIMAL.into() ); assert_eq!( - Some(parquet::ConvertedType::DATE), + Some(crate::format::ConvertedType::DATE), ConvertedType::DATE.into() ); assert_eq!( - Some(parquet::ConvertedType::TIME_MILLIS), + Some(crate::format::ConvertedType::TIME_MILLIS), ConvertedType::TIME_MILLIS.into() ); assert_eq!( - Some(parquet::ConvertedType::TIME_MICROS), + Some(crate::format::ConvertedType::TIME_MICROS), ConvertedType::TIME_MICROS.into() ); assert_eq!( - Some(parquet::ConvertedType::TIMESTAMP_MILLIS), + Some(crate::format::ConvertedType::TIMESTAMP_MILLIS), ConvertedType::TIMESTAMP_MILLIS.into() ); assert_eq!( - Some(parquet::ConvertedType::TIMESTAMP_MICROS), + Some(crate::format::ConvertedType::TIMESTAMP_MICROS), ConvertedType::TIMESTAMP_MICROS.into() ); assert_eq!( - Some(parquet::ConvertedType::UINT_8), + Some(crate::format::ConvertedType::UINT_8), ConvertedType::UINT_8.into() ); assert_eq!( - Some(parquet::ConvertedType::UINT_16), + Some(crate::format::ConvertedType::UINT_16), ConvertedType::UINT_16.into() ); assert_eq!( - Some(parquet::ConvertedType::UINT_32), + Some(crate::format::ConvertedType::UINT_32), ConvertedType::UINT_32.into() ); assert_eq!( - Some(parquet::ConvertedType::UINT_64), + Some(crate::format::ConvertedType::UINT_64), ConvertedType::UINT_64.into() ); assert_eq!( - Some(parquet::ConvertedType::INT_8), + Some(crate::format::ConvertedType::INT_8), ConvertedType::INT_8.into() ); assert_eq!( - Some(parquet::ConvertedType::INT_16), + Some(crate::format::ConvertedType::INT_16), ConvertedType::INT_16.into() ); assert_eq!( - Some(parquet::ConvertedType::INT_32), + Some(crate::format::ConvertedType::INT_32), ConvertedType::INT_32.into() ); assert_eq!( - Some(parquet::ConvertedType::INT_64), + Some(crate::format::ConvertedType::INT_64), ConvertedType::INT_64.into() ); assert_eq!( - Some(parquet::ConvertedType::JSON), + Some(crate::format::ConvertedType::JSON),
ConvertedType::JSON.into() ); assert_eq!( - Some(parquet::ConvertedType::BSON), + Some(crate::format::ConvertedType::BSON), ConvertedType::BSON.into() ); assert_eq!( - Some(parquet::ConvertedType::INTERVAL), + Some(crate::format::ConvertedType::INTERVAL), ConvertedType::INTERVAL.into() ); assert_eq!( - Some(parquet::ConvertedType::DECIMAL), + Some(crate::format::ConvertedType::DECIMAL), ConvertedType::DECIMAL.into() ) } @@ -1732,42 +1926,42 @@ mod tests { ); assert_eq!( ConvertedType::from(Some(LogicalType::Time { - unit: TimeUnit::MILLIS(Default::default()), + unit: TimeUnit::MILLIS, is_adjusted_to_u_t_c: true, })), ConvertedType::TIME_MILLIS ); assert_eq!( ConvertedType::from(Some(LogicalType::Time { - unit: TimeUnit::MICROS(Default::default()), + unit: TimeUnit::MICROS, is_adjusted_to_u_t_c: true, })), ConvertedType::TIME_MICROS ); assert_eq!( ConvertedType::from(Some(LogicalType::Time { - unit: TimeUnit::NANOS(Default::default()), + unit: TimeUnit::NANOS, is_adjusted_to_u_t_c: false, })), ConvertedType::NONE ); assert_eq!( ConvertedType::from(Some(LogicalType::Timestamp { - unit: TimeUnit::MILLIS(Default::default()), + unit: TimeUnit::MILLIS, is_adjusted_to_u_t_c: true, })), ConvertedType::TIMESTAMP_MILLIS ); assert_eq!( ConvertedType::from(Some(LogicalType::Timestamp { - unit: TimeUnit::MICROS(Default::default()), + unit: TimeUnit::MICROS, is_adjusted_to_u_t_c: false, })), ConvertedType::TIMESTAMP_MICROS ); assert_eq!( ConvertedType::from(Some(LogicalType::Timestamp { - unit: TimeUnit::NANOS(Default::default()), + unit: TimeUnit::NANOS, is_adjusted_to_u_t_c: false, })), ConvertedType::NONE @@ -1864,15 +2058,15 @@ mod tests { #[test] fn test_from_repetition() { assert_eq!( - Repetition::try_from(parquet::FieldRepetitionType::REQUIRED).unwrap(), + Repetition::try_from(crate::format::FieldRepetitionType::REQUIRED).unwrap(), Repetition::REQUIRED ); assert_eq!( - Repetition::try_from(parquet::FieldRepetitionType::OPTIONAL).unwrap(), + Repetition::try_from(crate::format::FieldRepetitionType::OPTIONAL).unwrap(), Repetition::OPTIONAL ); assert_eq!( - Repetition::try_from(parquet::FieldRepetitionType::REPEATED).unwrap(), + Repetition::try_from(crate::format::FieldRepetitionType::REPEATED).unwrap(), Repetition::REPEATED ); } @@ -1880,15 +2074,15 @@ mod tests { #[test] fn test_into_repetition() { assert_eq!( - parquet::FieldRepetitionType::REQUIRED, + crate::format::FieldRepetitionType::REQUIRED, Repetition::REQUIRED.into() ); assert_eq!( - parquet::FieldRepetitionType::OPTIONAL, + crate::format::FieldRepetitionType::OPTIONAL, Repetition::OPTIONAL.into() ); assert_eq!( - parquet::FieldRepetitionType::REPEATED, + crate::format::FieldRepetitionType::REPEATED, Repetition::REPEATED.into() ); } @@ -1939,54 +2133,57 @@ mod tests { #[test] fn test_from_encoding() { assert_eq!( - Encoding::try_from(parquet::Encoding::PLAIN).unwrap(), + Encoding::try_from(crate::format::Encoding::PLAIN).unwrap(), Encoding::PLAIN ); assert_eq!( - Encoding::try_from(parquet::Encoding::PLAIN_DICTIONARY).unwrap(), + Encoding::try_from(crate::format::Encoding::PLAIN_DICTIONARY).unwrap(), Encoding::PLAIN_DICTIONARY ); assert_eq!( - Encoding::try_from(parquet::Encoding::RLE).unwrap(), + Encoding::try_from(crate::format::Encoding::RLE).unwrap(), Encoding::RLE ); assert_eq!( - Encoding::try_from(parquet::Encoding::BIT_PACKED).unwrap(), + Encoding::try_from(crate::format::Encoding::BIT_PACKED).unwrap(), Encoding::BIT_PACKED ); assert_eq!( - Encoding::try_from(parquet::Encoding::DELTA_BINARY_PACKED).unwrap(), + 
Encoding::try_from(crate::format::Encoding::DELTA_BINARY_PACKED).unwrap(), Encoding::DELTA_BINARY_PACKED ); assert_eq!( - Encoding::try_from(parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY).unwrap(), + Encoding::try_from(crate::format::Encoding::DELTA_LENGTH_BYTE_ARRAY).unwrap(), Encoding::DELTA_LENGTH_BYTE_ARRAY ); assert_eq!( - Encoding::try_from(parquet::Encoding::DELTA_BYTE_ARRAY).unwrap(), + Encoding::try_from(crate::format::Encoding::DELTA_BYTE_ARRAY).unwrap(), Encoding::DELTA_BYTE_ARRAY ); } #[test] fn test_into_encoding() { - assert_eq!(parquet::Encoding::PLAIN, Encoding::PLAIN.into()); + assert_eq!(crate::format::Encoding::PLAIN, Encoding::PLAIN.into()); assert_eq!( - parquet::Encoding::PLAIN_DICTIONARY, + crate::format::Encoding::PLAIN_DICTIONARY, Encoding::PLAIN_DICTIONARY.into() ); - assert_eq!(parquet::Encoding::RLE, Encoding::RLE.into()); - assert_eq!(parquet::Encoding::BIT_PACKED, Encoding::BIT_PACKED.into()); + assert_eq!(crate::format::Encoding::RLE, Encoding::RLE.into()); + assert_eq!( + crate::format::Encoding::BIT_PACKED, + Encoding::BIT_PACKED.into() + ); assert_eq!( - parquet::Encoding::DELTA_BINARY_PACKED, + crate::format::Encoding::DELTA_BINARY_PACKED, Encoding::DELTA_BINARY_PACKED.into() ); assert_eq!( - parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY, + crate::format::Encoding::DELTA_LENGTH_BYTE_ARRAY, Encoding::DELTA_LENGTH_BYTE_ARRAY.into() ); assert_eq!( - parquet::Encoding::DELTA_BYTE_ARRAY, + crate::format::Encoding::DELTA_BYTE_ARRAY, Encoding::DELTA_BYTE_ARRAY.into() ); } @@ -2023,31 +2220,31 @@ mod tests { #[test] fn test_from_compression() { assert_eq!( - Compression::try_from(parquet::CompressionCodec::UNCOMPRESSED).unwrap(), + Compression::try_from(crate::format::CompressionCodec::UNCOMPRESSED).unwrap(), Compression::UNCOMPRESSED ); assert_eq!( - Compression::try_from(parquet::CompressionCodec::SNAPPY).unwrap(), + Compression::try_from(crate::format::CompressionCodec::SNAPPY).unwrap(), Compression::SNAPPY ); assert_eq!( - Compression::try_from(parquet::CompressionCodec::GZIP).unwrap(), + Compression::try_from(crate::format::CompressionCodec::GZIP).unwrap(), Compression::GZIP(Default::default()) ); assert_eq!( - Compression::try_from(parquet::CompressionCodec::LZO).unwrap(), + Compression::try_from(crate::format::CompressionCodec::LZO).unwrap(), Compression::LZO ); assert_eq!( - Compression::try_from(parquet::CompressionCodec::BROTLI).unwrap(), + Compression::try_from(crate::format::CompressionCodec::BROTLI).unwrap(), Compression::BROTLI(Default::default()) ); assert_eq!( - Compression::try_from(parquet::CompressionCodec::LZ4).unwrap(), + Compression::try_from(crate::format::CompressionCodec::LZ4).unwrap(), Compression::LZ4 ); assert_eq!( - Compression::try_from(parquet::CompressionCodec::ZSTD).unwrap(), + Compression::try_from(crate::format::CompressionCodec::ZSTD).unwrap(), Compression::ZSTD(Default::default()) ); } @@ -2055,25 +2252,31 @@ mod tests { #[test] fn test_into_compression() { assert_eq!( - parquet::CompressionCodec::UNCOMPRESSED, + crate::format::CompressionCodec::UNCOMPRESSED, Compression::UNCOMPRESSED.into() ); assert_eq!( - parquet::CompressionCodec::SNAPPY, + crate::format::CompressionCodec::SNAPPY, Compression::SNAPPY.into() ); assert_eq!( - parquet::CompressionCodec::GZIP, + crate::format::CompressionCodec::GZIP, Compression::GZIP(Default::default()).into() ); - assert_eq!(parquet::CompressionCodec::LZO, Compression::LZO.into()); assert_eq!( - parquet::CompressionCodec::BROTLI, + crate::format::CompressionCodec::LZO, + Compression::LZO.into() + 
); + assert_eq!( + crate::format::CompressionCodec::BROTLI, Compression::BROTLI(Default::default()).into() ); - assert_eq!(parquet::CompressionCodec::LZ4, Compression::LZ4.into()); assert_eq!( - parquet::CompressionCodec::ZSTD, + crate::format::CompressionCodec::LZ4, + Compression::LZ4.into() + ); + assert_eq!( + crate::format::CompressionCodec::ZSTD, Compression::ZSTD(Default::default()).into() ); } @@ -2089,33 +2292,39 @@ mod tests { #[test] fn test_from_page_type() { assert_eq!( - PageType::try_from(parquet::PageType::DATA_PAGE).unwrap(), + PageType::try_from(crate::format::PageType::DATA_PAGE).unwrap(), PageType::DATA_PAGE ); assert_eq!( - PageType::try_from(parquet::PageType::INDEX_PAGE).unwrap(), + PageType::try_from(crate::format::PageType::INDEX_PAGE).unwrap(), PageType::INDEX_PAGE ); assert_eq!( - PageType::try_from(parquet::PageType::DICTIONARY_PAGE).unwrap(), + PageType::try_from(crate::format::PageType::DICTIONARY_PAGE).unwrap(), PageType::DICTIONARY_PAGE ); assert_eq!( - PageType::try_from(parquet::PageType::DATA_PAGE_V2).unwrap(), + PageType::try_from(crate::format::PageType::DATA_PAGE_V2).unwrap(), PageType::DATA_PAGE_V2 ); } #[test] fn test_into_page_type() { - assert_eq!(parquet::PageType::DATA_PAGE, PageType::DATA_PAGE.into()); - assert_eq!(parquet::PageType::INDEX_PAGE, PageType::INDEX_PAGE.into()); assert_eq!( - parquet::PageType::DICTIONARY_PAGE, + crate::format::PageType::DATA_PAGE, + PageType::DATA_PAGE.into() + ); + assert_eq!( + crate::format::PageType::INDEX_PAGE, + PageType::INDEX_PAGE.into() + ); + assert_eq!( + crate::format::PageType::DICTIONARY_PAGE, PageType::DICTIONARY_PAGE.into() ); assert_eq!( - parquet::PageType::DATA_PAGE_V2, + crate::format::PageType::DATA_PAGE_V2, PageType::DATA_PAGE_V2.into() ); } @@ -2208,27 +2417,27 @@ mod tests { LogicalType::Date, LogicalType::Time { is_adjusted_to_u_t_c: false, - unit: TimeUnit::MILLIS(Default::default()), + unit: TimeUnit::MILLIS, }, LogicalType::Time { is_adjusted_to_u_t_c: false, - unit: TimeUnit::MICROS(Default::default()), + unit: TimeUnit::MICROS, }, LogicalType::Time { is_adjusted_to_u_t_c: true, - unit: TimeUnit::NANOS(Default::default()), + unit: TimeUnit::NANOS, }, LogicalType::Timestamp { is_adjusted_to_u_t_c: false, - unit: TimeUnit::MILLIS(Default::default()), + unit: TimeUnit::MILLIS, }, LogicalType::Timestamp { is_adjusted_to_u_t_c: false, - unit: TimeUnit::MICROS(Default::default()), + unit: TimeUnit::MICROS, }, LogicalType::Timestamp { is_adjusted_to_u_t_c: true, - unit: TimeUnit::NANOS(Default::default()), + unit: TimeUnit::NANOS, }, LogicalType::Float16, ]; diff --git a/parquet/src/bin/parquet-index.rs b/parquet/src/bin/parquet-index.rs index 1a9b74dd78fb..e91f5e5a9f17 100644 --- a/parquet/src/bin/parquet-index.rs +++ b/parquet/src/bin/parquet-index.rs @@ -37,10 +37,9 @@ use clap::Parser; use parquet::errors::{ParquetError, Result}; use parquet::file::page_index::index::{Index, PageIndex}; -use parquet::file::page_index::offset_index::OffsetIndexMetaData; +use parquet::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; use parquet::file::reader::{FileReader, SerializedFileReader}; use parquet::file::serialized_reader::ReadOptionsBuilder; -use parquet::format::PageLocation; use std::fs::File; #[derive(Debug, Parser)] diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs index 384a4a10486e..f7dc098bd0bc 100644 --- a/parquet/src/bloom_filter/mod.rs +++ b/parquet/src/bloom_filter/mod.rs @@ -72,14 +72,11 @@ //! 
[sbbf-paper]: https://arxiv.org/pdf/2101.01719 //! [bf-formulae]: http://tfk.mit.edu/pdf/bloom.pdf +use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; use crate::data_type::AsBytes; use crate::errors::ParquetError; use crate::file::metadata::ColumnChunkMetaData; use crate::file::reader::ChunkReader; -use crate::format::{ - BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader, - SplitBlockAlgorithm, Uncompressed, XxHash, -}; use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; use bytes::Bytes; use std::io::Write; @@ -98,6 +95,43 @@ const SALT: [u32; 8] = [ 0x5c6bfb31_u32, ]; +/// Bloom filter header is stored at beginning of Bloom filter data of each column +/// and followed by its bitset. +/// +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct BloomFilterHeader { + /// The size of bitset in bytes * + pub num_bytes: i32, + /// The algorithm for setting bits. * + pub algorithm: BloomFilterAlgorithm, + /// The hash function used for Bloom filter. * + pub hash: BloomFilterHash, + /// The compression used in the Bloom filter * + pub compression: BloomFilterCompression, +} + +impl From for BloomFilterHeader { + fn from(value: crate::format::BloomFilterHeader) -> Self { + Self { + num_bytes: value.num_bytes, + algorithm: value.algorithm.into(), + hash: value.hash.into(), + compression: value.compression.into(), + } + } +} + +impl From for crate::format::BloomFilterHeader { + fn from(value: BloomFilterHeader) -> Self { + Self { + num_bytes: value.num_bytes, + algorithm: value.algorithm.into(), + hash: value.hash.into(), + compression: value.compression.into(), + } + } +} + /// Each block is 256 bits, broken up into eight contiguous "words", each consisting of 32 bits. /// Each word is thought of as an array of bits; each bit is either "set" or "not set". #[derive(Debug, Copy, Clone)] @@ -195,9 +229,9 @@ pub(crate) fn read_bloom_filter_header_and_length( ) -> Result<(BloomFilterHeader, u64), ParquetError> { let total_length = buffer.len(); let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref()); - let header = BloomFilterHeader::read_from_in_protocol(&mut prot) + let header = crate::format::BloomFilterHeader::read_from_in_protocol(&mut prot) .map_err(|e| ParquetError::General(format!("Could not read bloom filter header: {e}")))?; - Ok((header, (total_length - prot.as_slice().len()) as u64)) + Ok((header.into(), (total_length - prot.as_slice().len()) as u64)) } pub(crate) const BITSET_MIN_LENGTH: usize = 32; @@ -262,7 +296,7 @@ impl Sbbf { /// must remember to flush the writer. 
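Reviewer note: the new public `BloomFilterHeader` mirrors the Thrift-generated struct field for field, so the two `From` impls in this hunk are total conversions. A minimal sketch of the round trip, assuming the crate-root paths shown above (the field values are illustrative):

```rust
use parquet::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use parquet::bloom_filter::BloomFilterHeader;

fn main() {
    // Public header for a 32-byte bitset; all three enums are currently
    // single-variant, matching the singleton thrift unions they replace.
    let header = BloomFilterHeader {
        num_bytes: 32,
        algorithm: BloomFilterAlgorithm::BLOCK,
        hash: BloomFilterHash::XXHASH,
        compression: BloomFilterCompression::UNCOMPRESSED,
    };

    // Round-trip through the Thrift-generated type via the new From impls.
    let thrift: parquet::format::BloomFilterHeader = header.clone().into();
    let back: BloomFilterHeader = thrift.into();
    assert_eq!(header, back);
}
```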
pub(crate) fn write(&self, mut writer: W) -> Result<(), ParquetError> { let mut protocol = TCompactOutputProtocol::new(&mut writer); - let header = self.header(); + let header: crate::format::BloomFilterHeader = self.header().into(); header.write_to_out_protocol(&mut protocol).map_err(|e| { ParquetError::General(format!("Could not write bloom filter header: {e}")) })?; @@ -305,9 +339,9 @@ impl Sbbf { BloomFilterHeader { // 8 i32 per block, 4 bytes per i32 num_bytes: self.0.len() as i32 * 4 * 8, - algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}), - hash: BloomFilterHash::XXHASH(XxHash {}), - compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}), + algorithm: BloomFilterAlgorithm::BLOCK, + hash: BloomFilterHash::XXHASH, + compression: BloomFilterCompression::UNCOMPRESSED, } } @@ -333,17 +367,17 @@ impl Sbbf { chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?; match header.algorithm { - BloomFilterAlgorithm::BLOCK(_) => { + BloomFilterAlgorithm::BLOCK => { // this match exists to future proof the singleton algorithm enum } } match header.compression { - BloomFilterCompression::UNCOMPRESSED(_) => { + BloomFilterCompression::UNCOMPRESSED => { // this match exists to future proof the singleton compression enum } } match header.hash { - BloomFilterHash::XXHASH(_) => { + BloomFilterHash::XXHASH => { // this match exists to future proof the singleton hash enum } } @@ -471,15 +505,9 @@ mod tests { read_length, ) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap(); assert_eq!(read_length, 15); - assert_eq!( - algorithm, - BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}) - ); - assert_eq!( - compression, - BloomFilterCompression::UNCOMPRESSED(Uncompressed {}) - ); - assert_eq!(hash, BloomFilterHash::XXHASH(XxHash {})); + assert_eq!(algorithm, BloomFilterAlgorithm::BLOCK); + assert_eq!(compression, BloomFilterCompression::UNCOMPRESSED); + assert_eq!(hash, BloomFilterHash::XXHASH); assert_eq!(num_bytes, 32_i32); assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE); } diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 9374e226b87f..1e6f4f6f0706 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -21,11 +21,14 @@ use bytes::Bytes; use half::f16; use crate::bloom_filter::Sbbf; -use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex}; +use crate::file::page_index::index::Index; +use crate::file::page_index::offset_index::OffsetIndexMetaData; use std::collections::{BTreeSet, VecDeque}; use std::str; -use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type}; +use crate::basic::{ + BoundaryOrder, Compression, ConvertedType, Encoding, LogicalType, PageType, Type, +}; use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter}; use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues}; use crate::compression::{create_codec, Codec, CodecOptionsBuilder}; @@ -185,9 +188,9 @@ pub struct ColumnCloseResult { /// Optional bloom filter for this column pub bloom_filter: Option, /// Optional column index, for filtering - pub column_index: Option, + pub column_index: Option, /// Optional offset index, identifying page locations - pub offset_index: Option, + pub offset_index: Option, } // Metrics per page @@ -384,7 +387,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } // Disable column_index_builder if not collecting page statistics. 
- let mut column_index_builder = ColumnIndexBuilder::new(); + let mut column_index_builder = ColumnIndexBuilder::new(descr.physical_type()); if statistics_enabled != EnabledStatistics::Page { column_index_builder.to_invalid() } @@ -615,12 +618,12 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { }; self.column_index_builder.set_boundary_order(boundary_order); - let column_index = self - .column_index_builder - .valid() - .then(|| self.column_index_builder.build_to_thrift()); + let column_index = match self.column_index_builder.valid() { + true => Some(self.column_index_builder.build()?), + false => None, + }; - let offset_index = self.offset_index_builder.map(|b| b.build_to_thrift()); + let offset_index = self.offset_index_builder.map(|b| b.build()); Ok(ColumnCloseResult { bytes_written: self.column_metrics.total_bytes_written, @@ -2939,19 +2942,29 @@ mod tests { let r = writer.close().unwrap(); assert!(r.column_index.is_some()); let col_idx = r.column_index.unwrap(); + let col_idx = match col_idx { + Index::INT32(col_idx) => col_idx, + _ => panic!("wrong stats type"), + }; // null_pages should be true for page 0 - assert!(col_idx.null_pages[0]); + assert!(col_idx.indexes[0].is_null_page()); // min and max should be empty byte arrays - assert_eq!(col_idx.min_values[0].len(), 0); - assert_eq!(col_idx.max_values[0].len(), 0); + assert!(col_idx.indexes[0].min().is_none()); + assert!(col_idx.indexes[0].max().is_none()); // null_counts should be defined and be 4 for page 0 - assert!(col_idx.null_counts.is_some()); - assert_eq!(col_idx.null_counts.as_ref().unwrap()[0], 4); + assert!(col_idx.indexes[0].null_count().is_some()); + assert_eq!(col_idx.indexes[0].null_count().unwrap(), 4); // there is no repetition so rep histogram should be absent - assert!(col_idx.repetition_level_histograms.is_none()); + assert!(col_idx.indexes[0].repetition_level_histogram().is_none()); // definition_level_histogram should be present and should be 0:4, 1:0 - assert!(col_idx.definition_level_histograms.is_some()); - assert_eq!(col_idx.definition_level_histograms.unwrap(), &[4, 0]); + assert!(col_idx.indexes[0].definition_level_histogram().is_some()); + assert_eq!( + col_idx.indexes[0] + .definition_level_histogram() + .unwrap() + .values(), + &[4, 0] + ); } #[test] @@ -2974,12 +2987,16 @@ mod tests { assert_eq!(8, r.rows_written); // column index - assert_eq!(2, column_index.null_pages.len()); + let column_index = match column_index { + Index::INT32(column_index) => column_index, + _ => panic!("wrong stats type"), + }; + assert_eq!(2, column_index.indexes.len()); assert_eq!(2, offset_index.page_locations.len()); assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order); for idx in 0..2 { - assert!(!column_index.null_pages[idx]); - assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]); + assert!(!column_index.indexes[idx].is_null_page()); + assert_eq!(0, *column_index.indexes[idx].null_count.as_ref().unwrap()); } if let Some(stats) = r.metadata.statistics() { @@ -2989,14 +3006,8 @@ mod tests { // first page is [1,2,3,4] // second page is [-5,2,4,8] // note that we don't increment here, as this is a non BinaryArray type. 
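Since `ColumnCloseResult::column_index` now carries the decoded `Index` enum instead of a thrift `ColumnIndex`, callers match on the physical type to reach typed page statistics, as the tests below do. A sketch, assuming a `close_result` obtained from `GenericColumnWriter::close()` on an INT32 column:

```rust
use parquet::column::writer::ColumnCloseResult;
use parquet::file::page_index::index::Index;

fn inspect(close_result: ColumnCloseResult) {
    if let Some(Index::INT32(native)) = close_result.column_index {
        for page in &native.indexes {
            if !page.is_null_page() {
                // Typed access replaces the old raw min_values/max_values bytes.
                let _min: Option<&i32> = page.min();
                let _max: Option<&i32> = page.max();
            }
        }
    }
}
```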
- assert_eq!( - stats.min_bytes_opt(), - Some(column_index.min_values[1].as_slice()) - ); - assert_eq!( - stats.max_bytes_opt(), - column_index.max_values.get(1).map(Vec::as_slice) - ); + assert_eq!(stats.min_opt(), column_index.indexes[1].min()); + assert_eq!(stats.max_opt(), column_index.indexes[1].max()); } else { panic!("expecting Statistics::Int32"); } @@ -3036,37 +3047,36 @@ mod tests { let column_index = r.column_index.unwrap(); let offset_index = r.offset_index.unwrap(); + let column_index = match column_index { + Index::FIXED_LEN_BYTE_ARRAY(column_index) => column_index, + _ => panic!("wrong stats type"), + }; + assert_eq!(3, r.rows_written); // column index - assert_eq!(1, column_index.null_pages.len()); + assert_eq!(1, column_index.indexes.len()); assert_eq!(1, offset_index.page_locations.len()); assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order); - assert!(!column_index.null_pages[0]); - assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]); + assert!(!column_index.indexes[0].is_null_page()); + assert_eq!(Some(0), column_index.indexes[0].null_count()); if let Some(stats) = r.metadata.statistics() { assert_eq!(stats.null_count_opt(), Some(0)); assert_eq!(stats.distinct_count_opt(), None); if let Statistics::FixedLenByteArray(stats) = stats { - let column_index_min_value = &column_index.min_values[0]; - let column_index_max_value = &column_index.max_values[0]; + let column_index_min_value = column_index.indexes[0].min_bytes().unwrap(); + let column_index_max_value = column_index.indexes[0].max_bytes().unwrap(); // Column index stats are truncated, while the column chunk's aren't. - assert_ne!( - stats.min_bytes_opt(), - Some(column_index_min_value.as_slice()) - ); - assert_ne!( - stats.max_bytes_opt(), - Some(column_index_max_value.as_slice()) - ); + assert_ne!(stats.min_bytes_opt().unwrap(), column_index_min_value); + assert_ne!(stats.max_bytes_opt().unwrap(), column_index_max_value); assert_eq!( column_index_min_value.len(), DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap() ); - assert_eq!(column_index_min_value.as_slice(), &[97_u8; 64]); + assert_eq!(column_index_min_value, &[97_u8; 64]); assert_eq!( column_index_max_value.len(), DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap() @@ -3108,27 +3118,32 @@ mod tests { let column_index = r.column_index.unwrap(); let offset_index = r.offset_index.unwrap(); + let column_index = match column_index { + Index::FIXED_LEN_BYTE_ARRAY(column_index) => column_index, + _ => panic!("wrong stats type"), + }; + assert_eq!(1, r.rows_written); // column index - assert_eq!(1, column_index.null_pages.len()); + assert_eq!(1, column_index.indexes.len()); assert_eq!(1, offset_index.page_locations.len()); assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order); - assert!(!column_index.null_pages[0]); - assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]); + assert!(!column_index.indexes[0].is_null_page()); + assert_eq!(Some(0), column_index.indexes[0].null_count()); if let Some(stats) = r.metadata.statistics() { assert_eq!(stats.null_count_opt(), Some(0)); assert_eq!(stats.distinct_count_opt(), None); if let Statistics::FixedLenByteArray(_stats) = stats { - let column_index_min_value = &column_index.min_values[0]; - let column_index_max_value = &column_index.max_values[0]; + let column_index_min_value = column_index.indexes[0].min_bytes().unwrap(); + let column_index_max_value = column_index.indexes[0].max_bytes().unwrap(); assert_eq!(column_index_min_value.len(), 1); assert_eq!(column_index_max_value.len(), 1); - 
assert_eq!("B".as_bytes(), column_index_min_value.as_slice()); - assert_eq!("C".as_bytes(), column_index_max_value.as_slice()); + assert_eq!("B".as_bytes(), column_index_min_value); + assert_eq!("C".as_bytes(), column_index_max_value); assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap()); assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap()); @@ -3158,8 +3173,12 @@ mod tests { // stats should still be written // ensure bytes weren't truncated for column index let column_index = r.column_index.unwrap(); - let column_index_min_bytes = column_index.min_values[0].as_slice(); - let column_index_max_bytes = column_index.max_values[0].as_slice(); + let column_index = match column_index { + Index::FIXED_LEN_BYTE_ARRAY(column_index) => column_index, + _ => panic!("wrong stats type"), + }; + let column_index_min_bytes = column_index.indexes[0].min_bytes().unwrap(); + let column_index_max_bytes = column_index.indexes[0].min_bytes().unwrap(); assert_eq!(expected_value, column_index_min_bytes); assert_eq!(expected_value, column_index_max_bytes); @@ -3197,8 +3216,12 @@ mod tests { // stats should still be written // ensure bytes weren't truncated for column index let column_index = r.column_index.unwrap(); - let column_index_min_bytes = column_index.min_values[0].as_slice(); - let column_index_max_bytes = column_index.max_values[0].as_slice(); + let column_index = match column_index { + Index::FIXED_LEN_BYTE_ARRAY(column_index) => column_index, + _ => panic!("wrong stats type"), + }; + let column_index_min_bytes = column_index.indexes[0].min_bytes().unwrap(); + let column_index_max_bytes = column_index.indexes[0].min_bytes().unwrap(); assert_eq!(expected_value, column_index_min_bytes); assert_eq!(expected_value, column_index_max_bytes); @@ -3678,8 +3701,11 @@ mod tests { &[Some(-5), Some(11)], ], )?; - let boundary_order = column_close_result.column_index.unwrap().boundary_order; - assert_eq!(boundary_order, BoundaryOrder::ASCENDING); + let boundary_order = column_close_result + .column_index + .unwrap() + .get_boundary_order(); + assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING)); // min max both descending let column_close_result = write_multiple_pages::( @@ -3691,34 +3717,49 @@ mod tests { &[Some(-5), Some(0)], ], )?; - let boundary_order = column_close_result.column_index.unwrap().boundary_order; - assert_eq!(boundary_order, BoundaryOrder::DESCENDING); + let boundary_order = column_close_result + .column_index + .unwrap() + .get_boundary_order(); + assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING)); // min max both equal let column_close_result = write_multiple_pages::( &descr, &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]], )?; - let boundary_order = column_close_result.column_index.unwrap().boundary_order; - assert_eq!(boundary_order, BoundaryOrder::ASCENDING); + let boundary_order = column_close_result + .column_index + .unwrap() + .get_boundary_order(); + assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING)); // only nulls let column_close_result = write_multiple_pages::(&descr, &[&[None], &[None], &[None]])?; - let boundary_order = column_close_result.column_index.unwrap().boundary_order; - assert_eq!(boundary_order, BoundaryOrder::ASCENDING); + let boundary_order = column_close_result + .column_index + .unwrap() + .get_boundary_order(); + assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING)); // one page let column_close_result = write_multiple_pages::(&descr, &[&[Some(-10), Some(10)]])?; - let boundary_order = 
column_close_result.column_index.unwrap().boundary_order; - assert_eq!(boundary_order, BoundaryOrder::ASCENDING); + let boundary_order = column_close_result + .column_index + .unwrap() + .get_boundary_order(); + assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING)); // one non-null page let column_close_result = write_multiple_pages::(&descr, &[&[Some(-10), Some(10)], &[None]])?; - let boundary_order = column_close_result.column_index.unwrap().boundary_order; - assert_eq!(boundary_order, BoundaryOrder::ASCENDING); + let boundary_order = column_close_result + .column_index + .unwrap() + .get_boundary_order(); + assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING)); // min max both unordered let column_close_result = write_multiple_pages::( @@ -3730,8 +3771,11 @@ mod tests { &[Some(-5), Some(0)], ], )?; - let boundary_order = column_close_result.column_index.unwrap().boundary_order; - assert_eq!(boundary_order, BoundaryOrder::UNORDERED); + let boundary_order = column_close_result + .column_index + .unwrap() + .get_boundary_order(); + assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED)); // min max both ordered in different orders let column_close_result = write_multiple_pages::( @@ -3743,8 +3787,11 @@ mod tests { &[Some(3), Some(7)], ], )?; - let boundary_order = column_close_result.column_index.unwrap().boundary_order; - assert_eq!(boundary_order, BoundaryOrder::UNORDERED); + let boundary_order = column_close_result + .column_index + .unwrap() + .get_boundary_order(); + assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED)); Ok(()) } @@ -3781,14 +3828,20 @@ mod tests { // f16 descending let column_close_result = write_multiple_pages::(&f16_descr, values)?; - let boundary_order = column_close_result.column_index.unwrap().boundary_order; - assert_eq!(boundary_order, BoundaryOrder::DESCENDING); + let boundary_order = column_close_result + .column_index + .unwrap() + .get_boundary_order(); + assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING)); // same bytes, but fba unordered let column_close_result = write_multiple_pages::(&fba_descr, values)?; - let boundary_order = column_close_result.column_index.unwrap().boundary_order; - assert_eq!(boundary_order, BoundaryOrder::UNORDERED); + let boundary_order = column_close_result + .column_index + .unwrap() + .get_boundary_order(); + assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED)); Ok(()) } diff --git a/parquet/src/file/metadata/memory.rs b/parquet/src/file/metadata/memory.rs index ad452267901a..0b8d3b336fc0 100644 --- a/parquet/src/file/metadata/memory.rs +++ b/parquet/src/file/metadata/memory.rs @@ -18,14 +18,15 @@ //! Memory calculations for [`ParquetMetadata::memory_size`] //! //! 
[`ParquetMetadata::memory_size`]: crate::file::metadata::ParquetMetaData::memory_size -use crate::basic::{ColumnOrder, Compression, Encoding, PageType}; +use crate::basic::{BoundaryOrder, ColumnOrder, Compression, Encoding, PageType}; use crate::data_type::private::ParquetValueType; -use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, KeyValue, RowGroupMetaData}; +use crate::file::metadata::{ + ColumnChunkMetaData, FileMetaData, KeyValue, RowGroupMetaData, SortingColumn, +}; use crate::file::page_encoding_stats::PageEncodingStats; use crate::file::page_index::index::{Index, NativeIndex, PageIndex}; -use crate::file::page_index::offset_index::OffsetIndexMetaData; +use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; use crate::file::statistics::{Statistics, ValueStatistics}; -use crate::format::{BoundaryOrder, PageLocation, SortingColumn}; use std::sync::Arc; /// Trait for calculating the size of various containers diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index 04129c6aa482..193b70d9dd4a 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -101,25 +101,32 @@ use crate::encryption::{ decrypt::FileDecryptor, modules::{create_module_aad, ModuleType}, }; -use crate::errors::{ParquetError, Result}; #[cfg(feature = "encryption")] use crate::file::column_crypto_metadata::{self, ColumnCryptoMetaData}; pub(crate) use crate::file::metadata::memory::HeapSize; -use crate::file::page_encoding_stats::{self, PageEncodingStats}; -use crate::file::page_index::index::Index; -use crate::file::page_index::offset_index::OffsetIndexMetaData; -use crate::file::statistics::{self, Statistics}; -use crate::format::ColumnCryptoMetaData as TColumnCryptoMetaData; -use crate::format::{ - BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup, - SizeStatistics, SortingColumn, +use crate::file::page_index::index::{Index, NativeIndex}; +use crate::file::{ + page_encoding_stats::{self, PageEncodingStats}, + page_index::offset_index::PageLocation, +}; +use crate::file::{ + page_index::index::PageIndex, + statistics::{self, Statistics}, }; +use crate::format::ColumnCryptoMetaData as TColumnCryptoMetaData; use crate::schema::types::{ ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, SchemaDescriptor, Type as SchemaType, }; #[cfg(feature = "encryption")] use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; +use crate::{ + basic::BoundaryOrder, + errors::{ParquetError, Result}, +}; +use crate::{ + data_type::private::ParquetValueType, file::page_index::offset_index::OffsetIndexMetaData, +}; pub use reader::{FooterTail, ParquetMetaDataReader}; use std::ops::Range; use std::sync::Arc; @@ -141,6 +148,7 @@ pub(crate) use writer::ThriftMetadataWriter; /// column in the third row group of the parquet file. /// /// [PageIndex documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md +/// [`ColumnIndex`]: crate::format::ColumnIndex pub type ParquetColumnIndex = Vec>; /// [`OffsetIndexMetaData`] for each data page of each row group of each column @@ -153,6 +161,7 @@ pub type ParquetColumnIndex = Vec>; /// `column_number`of row group `row_group_number`. 
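The two aliases above keep their nested Vec-of-Vec shape: the outer index is the row group, the inner index the column. A sketch of walking a decoded column index, assuming `metadata` was read with page indexes enabled:

```rust
use parquet::file::metadata::ParquetMetaData;

fn first_column_sorted(metadata: &ParquetMetaData) -> bool {
    metadata
        .column_index()
        .and_then(|row_groups| row_groups.first()) // row group 0
        .and_then(|columns| columns.first())       // column 0
        .map(|index| index.is_sorted())
        .unwrap_or(false)
}
```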
/// /// [PageIndex documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md +/// [`OffsetIndex`]: crate::format::OffsetIndex pub type ParquetOffsetIndex = Vec>; /// Parsed metadata for a single Parquet file @@ -415,7 +424,26 @@ impl From for ParquetMetaDataBuilder { } /// A key-value pair for [`FileMetaData`]. -pub type KeyValue = crate::format::KeyValue; +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct KeyValue { + /// The key. + pub key: String, + /// An optional value. + pub value: Option, +} + +impl KeyValue { + /// Create a new key value pair + pub fn new(key: String, value: F2) -> KeyValue + where + F2: Into>, + { + KeyValue { + key, + value: value.into(), + } + } +} /// Reference counted pointer for [`FileMetaData`]. pub type FileMetaDataPtr = Arc; @@ -518,6 +546,38 @@ impl FileMetaData { } } +/// Sort order within a RowGroup of a leaf column +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct SortingColumn { + /// The ordinal position of the column (in this row group) * + pub column_idx: i32, + /// If true, indicates this column is sorted in descending order. * + pub descending: bool, + /// If true, nulls will come before non-null values, otherwise, + /// nulls go at the end. + pub nulls_first: bool, +} + +impl From<&crate::format::SortingColumn> for SortingColumn { + fn from(value: &crate::format::SortingColumn) -> Self { + Self { + column_idx: value.column_idx, + descending: value.descending, + nulls_first: value.nulls_first, + } + } +} + +impl From<&SortingColumn> for crate::format::SortingColumn { + fn from(value: &SortingColumn) -> Self { + Self { + column_idx: value.column_idx, + descending: value.descending, + nulls_first: value.nulls_first, + } + } +} + /// Reference counted pointer for [`RowGroupMetaData`]. pub type RowGroupMetaDataPtr = Arc; @@ -613,7 +673,7 @@ impl RowGroupMetaData { #[cfg(feature = "encryption")] fn from_encrypted_thrift( schema_descr: SchemaDescPtr, - mut rg: RowGroup, + mut rg: crate::format::RowGroup, decryptor: Option<&FileDecryptor>, ) -> Result { if schema_descr.num_columns() != rg.columns.len() { @@ -673,12 +733,18 @@ impl RowGroupMetaData { })?; let mut prot = TCompactSliceInputProtocol::new(decrypted_cc_buf.as_slice()); - c.meta_data = Some(ColumnMetaData::read_from_in_protocol(&mut prot)?); + c.meta_data = Some(crate::format::ColumnMetaData::read_from_in_protocol( + &mut prot, + )?); } columns.push(ColumnChunkMetaData::from_thrift(d.clone(), c)?); } - let sorting_columns = rg.sorting_columns; + let sorting_columns = rg.sorting_columns.map(|scs| { + scs.iter() + .map(|sc| sc.into()) + .collect::>() + }); Ok(RowGroupMetaData { columns, num_rows, @@ -691,7 +757,10 @@ impl RowGroupMetaData { } /// Method to convert from Thrift. - pub fn from_thrift(schema_descr: SchemaDescPtr, mut rg: RowGroup) -> Result { + pub fn from_thrift( + schema_descr: SchemaDescPtr, + mut rg: crate::format::RowGroup, + ) -> Result { if schema_descr.num_columns() != rg.columns.len() { return Err(general_err!( "Column count mismatch. Schema has {} columns while Row Group has {}", @@ -707,7 +776,11 @@ impl RowGroupMetaData { columns.push(ColumnChunkMetaData::from_thrift(d.clone(), c)?); } - let sorting_columns = rg.sorting_columns; + let sorting_columns = rg.sorting_columns.map(|scs| { + scs.iter() + .map(|sc| sc.into()) + .collect::>() + }); Ok(RowGroupMetaData { columns, num_rows, @@ -720,12 +793,17 @@ impl RowGroupMetaData { } /// Method to convert to Thrift. 
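With `KeyValue` and `SortingColumn` now defined natively in `file::metadata` (see the structs above), construction no longer goes through `crate::format`; only serialization does, via the reference-based `From` impls. A small sketch:

```rust
use parquet::file::metadata::{KeyValue, SortingColumn};

fn main() {
    // KeyValue::new accepts anything convertible into Option<String>.
    let kv = KeyValue::new("writer.note".to_string(), "example".to_string());
    assert_eq!(kv.value.as_deref(), Some("example"));

    // SortingColumn round-trips with the Thrift-generated type through the
    // From<&...> impls added in this hunk.
    let sc = SortingColumn {
        column_idx: 0,
        descending: false,
        nulls_first: true,
    };
    let thrift: parquet::format::SortingColumn = (&sc).into();
    assert_eq!(SortingColumn::from(&thrift), sc);
}
```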
- pub fn to_thrift(&self) -> RowGroup { - RowGroup { + pub fn to_thrift(&self) -> crate::format::RowGroup { + let sorting_columns = self.sorting_columns().map(|scs| { + scs.iter() + .map(|sc| sc.into()) + .collect::>() + }); + crate::format::RowGroup { columns: self.columns().iter().map(|v| v.to_thrift()).collect(), total_byte_size: self.total_byte_size, num_rows: self.num_rows, - sorting_columns: self.sorting_columns().cloned(), + sorting_columns, file_offset: self.file_offset(), total_compressed_size: Some(self.compressed_size()), ordinal: self.ordinal, @@ -1143,11 +1221,14 @@ impl ColumnChunkMetaData { } /// Method to convert from Thrift. - pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) -> Result { + pub fn from_thrift( + column_descr: ColumnDescPtr, + cc: crate::format::ColumnChunk, + ) -> Result { if cc.meta_data.is_none() { return Err(general_err!("Expected to have column metadata")); } - let mut col_metadata: ColumnMetaData = cc.meta_data.unwrap(); + let mut col_metadata: crate::format::ColumnMetaData = cc.meta_data.unwrap(); let column_type = Type::try_from(col_metadata.type_)?; let encodings = col_metadata .encodings @@ -1233,10 +1314,10 @@ impl ColumnChunkMetaData { } /// Method to convert to Thrift. - pub fn to_thrift(&self) -> ColumnChunk { + pub fn to_thrift(&self) -> crate::format::ColumnChunk { let column_metadata = self.to_column_metadata_thrift(); - ColumnChunk { + crate::format::ColumnChunk { file_path: self.file_path().map(|s| s.to_owned()), file_offset: self.file_offset, meta_data: Some(column_metadata), @@ -1250,7 +1331,7 @@ impl ColumnChunkMetaData { } /// Method to convert to Thrift `ColumnMetaData` - pub fn to_column_metadata_thrift(&self) -> ColumnMetaData { + pub fn to_column_metadata_thrift(&self) -> crate::format::ColumnMetaData { let size_statistics = if self.unencoded_byte_array_data_bytes.is_some() || self.repetition_level_histogram.is_some() || self.definition_level_histogram.is_some() @@ -1265,7 +1346,7 @@ impl ColumnChunkMetaData { .as_ref() .map(|hist| hist.clone().into_inner()); - Some(SizeStatistics { + Some(crate::format::SizeStatistics { unencoded_byte_array_data_bytes: self.unencoded_byte_array_data_bytes, repetition_level_histogram, definition_level_histogram, @@ -1274,7 +1355,7 @@ impl ColumnChunkMetaData { None }; - ColumnMetaData { + crate::format::ColumnMetaData { type_: self.column_type().into(), encodings: self.encodings().iter().map(|&v| v.into()).collect(), path_in_schema: self.column_path().as_ref().to_vec(), @@ -1517,7 +1598,9 @@ impl ColumnChunkMetaDataBuilder { /// Builder for Parquet [`ColumnIndex`], part of the Parquet [PageIndex] /// /// [PageIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md +/// [`ColumnIndex`]: crate::format::ColumnIndex pub struct ColumnIndexBuilder { + column_type: Type, null_pages: Vec, min_values: Vec>, max_values: Vec>, @@ -1537,16 +1620,11 @@ pub struct ColumnIndexBuilder { valid: bool, } -impl Default for ColumnIndexBuilder { - fn default() -> Self { - Self::new() - } -} - impl ColumnIndexBuilder { /// Creates a new column index builder. - pub fn new() -> Self { + pub fn new(column_type: Type) -> Self { ColumnIndexBuilder { + column_type, null_pages: Vec::new(), min_values: Vec::new(), max_values: Vec::new(), @@ -1574,6 +1652,8 @@ impl ColumnIndexBuilder { /// Append the given page-level histograms to the [`ColumnIndex`] histograms. /// Does nothing if the `ColumnIndexBuilder` is not in the `valid` state. 
+ /// + /// [`ColumnIndex`]: crate::format::ColumnIndex pub fn append_histograms( &mut self, repetition_level_histogram: &Option, @@ -1612,17 +1692,151 @@ impl ColumnIndexBuilder { /// Build and get the thrift metadata of column index /// /// Note: callers should check [`Self::valid`] before calling this method - pub fn build_to_thrift(self) -> ColumnIndex { - ColumnIndex::new( + pub fn build_to_thrift(self) -> crate::format::ColumnIndex { + crate::format::ColumnIndex::new( self.null_pages, self.min_values, self.max_values, - self.boundary_order, + self.boundary_order.into(), self.null_counts, self.repetition_level_histograms, self.definition_level_histograms, ) } + + /// Build and get the column index + /// + /// Note: callers should check [`Self::valid`] before calling this method + pub fn build(self) -> Result { + Ok(match self.column_type { + Type::BOOLEAN => { + let (indexes, boundary_order) = self.build_page_index()?; + Index::BOOLEAN(NativeIndex { + indexes, + boundary_order, + }) + } + Type::INT32 => { + let (indexes, boundary_order) = self.build_page_index()?; + Index::INT32(NativeIndex { + indexes, + boundary_order, + }) + } + Type::INT64 => { + let (indexes, boundary_order) = self.build_page_index()?; + Index::INT64(NativeIndex { + indexes, + boundary_order, + }) + } + Type::INT96 => { + let (indexes, boundary_order) = self.build_page_index()?; + Index::INT96(NativeIndex { + indexes, + boundary_order, + }) + } + Type::FLOAT => { + let (indexes, boundary_order) = self.build_page_index()?; + Index::FLOAT(NativeIndex { + indexes, + boundary_order, + }) + } + Type::DOUBLE => { + let (indexes, boundary_order) = self.build_page_index()?; + Index::DOUBLE(NativeIndex { + indexes, + boundary_order, + }) + } + Type::BYTE_ARRAY => { + let (indexes, boundary_order) = self.build_page_index()?; + Index::BYTE_ARRAY(NativeIndex { + indexes, + boundary_order, + }) + } + Type::FIXED_LEN_BYTE_ARRAY => { + let (indexes, boundary_order) = self.build_page_index()?; + Index::FIXED_LEN_BYTE_ARRAY(NativeIndex { + indexes, + boundary_order, + }) + } + }) + } + + fn build_page_index(self) -> Result<(Vec>, BoundaryOrder)> + where + T: ParquetValueType, + { + let len = self.min_values.len(); + + let null_counts = self + .null_counts + .iter() + .map(|x| Some(*x)) + .collect::>(); + + // histograms are a 1D array encoding a 2D num_pages X num_levels matrix. + let to_page_histograms = |opt_hist: Option>| { + if let Some(hist) = opt_hist { + // TODO: should we assert (hist.len() % len) == 0? 
+                let num_levels = hist.len() / len;
+                let mut res = Vec::with_capacity(len);
+                for i in 0..len {
+                    let page_idx = i * num_levels;
+                    let page_hist = hist[page_idx..page_idx + num_levels].to_vec();
+                    res.push(Some(LevelHistogram::from(page_hist)));
+                }
+                res
+            } else {
+                vec![None; len]
+            }
+        };
+
+        let rep_hists: Vec> =
+            to_page_histograms(self.repetition_level_histograms);
+        let def_hists: Vec> =
+            to_page_histograms(self.definition_level_histograms);
+
+        let indexes = self
+            .min_values
+            .iter()
+            .zip(self.max_values.iter())
+            .zip(self.null_pages.into_iter())
+            .zip(null_counts.into_iter())
+            .zip(rep_hists.into_iter())
+            .zip(def_hists.into_iter())
+            .map(
+                |(
+                    ((((min, max), is_null), null_count), repetition_level_histogram),
+                    definition_level_histogram,
+                )| {
+                    let (min, max) = if is_null {
+                        (None, None)
+                    } else {
+                        (
+                            Some(T::try_from_le_slice(min)?),
+                            Some(T::try_from_le_slice(max)?),
+                        )
+                    };
+                    Ok(PageIndex {
+                        min,
+                        max,
+                        null_count,
+                        repetition_level_histogram,
+                        definition_level_histogram,
+                    })
+                },
+            )
+            .collect::, ParquetError>>()?;
+
+        let boundary_order = self.boundary_order;
+        Ok((indexes, boundary_order))
+    }
 }

 impl From for ColumnChunkMetaDataBuilder {
@@ -1686,15 +1900,36 @@ impl OffsetIndexBuilder {
     }

     /// Build and get the thrift metadata of offset index
-    pub fn build_to_thrift(self) -> OffsetIndex {
+    pub fn build_to_thrift(self) -> crate::format::OffsetIndex {
         let locations = self
             .offset_array
             .iter()
             .zip(self.compressed_page_size_array.iter())
             .zip(self.first_row_index_array.iter())
-            .map(|((offset, size), row_index)| PageLocation::new(*offset, *size, *row_index))
+            .map(|((offset, size), row_index)| {
+                crate::format::PageLocation::new(*offset, *size, *row_index)
+            })
             .collect::>();
-        OffsetIndex::new(locations, self.unencoded_byte_array_data_bytes_array)
+        crate::format::OffsetIndex::new(locations, self.unencoded_byte_array_data_bytes_array)
+    }
+
+    /// Build and get the offset index metadata
+    pub fn build(self) -> OffsetIndexMetaData {
+        let locations = self
+            .offset_array
+            .iter()
+            .zip(self.compressed_page_size_array.iter())
+            .zip(self.first_row_index_array.iter())
+            .map(|((offset, size), row_index)| PageLocation {
+                offset: *offset,
+                compressed_page_size: *size,
+                first_row_index: *row_index,
+            })
+            .collect::>();
+        OffsetIndexMetaData {
+            page_locations: locations,
+            unencoded_byte_array_data_bytes: self.unencoded_byte_array_data_bytes_array,
+        }
+    }
 }

@@ -1974,7 +2209,7 @@ mod tests {
         assert_eq!(parquet_meta.memory_size(), base_expected_size);

-        let mut column_index = ColumnIndexBuilder::new();
+        let mut column_index = ColumnIndexBuilder::new(Type::BOOLEAN);
         column_index.append(false, vec![1u8], vec![2u8, 3u8], 4);
         let column_index = column_index.build_to_thrift();
         let native_index = NativeIndex::::try_new(column_index).unwrap();
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index 356713837530..53ae01221976 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -17,12 +17,12 @@
 use std::{io::Read, ops::Range, sync::Arc};

-use crate::basic::ColumnOrder;
 #[cfg(feature = "encryption")]
 use crate::encryption::{
     decrypt::{FileDecryptionProperties, FileDecryptor},
     modules::create_footer_aad,
 };
+use crate::{basic::ColumnOrder, file::metadata::KeyValue};
 use bytes::Bytes;

 use crate::errors::{ParquetError, Result};
@@ -31,7 +31,6 @@
 use crate::file::page_index::index::Index;
 use crate::file::page_index::index_reader::{acc_range, decode_column_index,
decode_offset_index}; use crate::file::reader::ChunkReader; use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER}; -use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData}; #[cfg(feature = "encryption")] use crate::format::{EncryptionAlgorithm, FileCryptoMetaData as TFileCryptoMetaData}; use crate::schema::types; @@ -947,7 +946,7 @@ impl ParquetMetaDataReader { } } - let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot) + let t_file_metadata = crate::format::FileMetaData::read_from_in_protocol(&mut prot) .map_err(|e| general_err!("Could not parse metadata: {}", e))?; let schema = types::from_thrift(&t_file_metadata.schema)?; let schema_descr = Arc::new(SchemaDescriptor::new(schema)); @@ -980,11 +979,17 @@ impl ParquetMetaDataReader { let column_orders = Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?; + let key_value_metadata = t_file_metadata.key_value_metadata.map(|vkv| { + vkv.into_iter() + .map(|kv| KeyValue::new(kv.key, kv.value)) + .collect::>() + }); + let file_metadata = FileMetaData::new( t_file_metadata.version, t_file_metadata.num_rows, t_file_metadata.created_by, - t_file_metadata.key_value_metadata, + key_value_metadata, schema_descr, column_orders, ); @@ -1005,7 +1010,7 @@ impl ParquetMetaDataReader { pub fn decode_metadata(buf: &[u8]) -> Result { let mut prot = TCompactSliceInputProtocol::new(buf); - let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot) + let t_file_metadata = crate::format::FileMetaData::read_from_in_protocol(&mut prot) .map_err(|e| general_err!("Could not parse metadata: {}", e))?; let schema = types::from_thrift(&t_file_metadata.schema)?; let schema_descr = Arc::new(SchemaDescriptor::new(schema)); @@ -1017,11 +1022,17 @@ impl ParquetMetaDataReader { let column_orders = Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?; + let key_value_metadata = t_file_metadata.key_value_metadata.map(|vkv| { + vkv.into_iter() + .map(|kv| KeyValue::new(kv.key, kv.value)) + .collect::>() + }); + let file_metadata = FileMetaData::new( t_file_metadata.version, t_file_metadata.num_rows, t_file_metadata.created_by, - t_file_metadata.key_value_metadata, + key_value_metadata, schema_descr, column_orders, ); @@ -1032,7 +1043,7 @@ impl ParquetMetaDataReader { /// Parses column orders from Thrift definition. /// If no column orders are defined, returns `None`. 
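The `decode_metadata` path above now converts thrift `KeyValue` pairs into the parquet-rs-native `KeyValue` before building `FileMetaData`. A usage sketch, assuming `footer` holds the thrift-encoded footer payload (without the trailing 8-byte length and magic):

```rust
use parquet::errors::Result;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

fn decode(footer: &[u8]) -> Result<ParquetMetaData> {
    let metadata = ParquetMetaDataReader::decode_metadata(footer)?;
    // Key/value pairs surface as the native KeyValue struct.
    for kv in metadata
        .file_metadata()
        .key_value_metadata()
        .into_iter()
        .flatten()
    {
        println!("{} = {:?}", kv.key, kv.value);
    }
    Ok(metadata)
}
```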
fn parse_column_orders( - t_column_orders: Option>, + t_column_orders: Option>, schema_descr: &SchemaDescriptor, ) -> Result>> { match t_column_orders { @@ -1044,7 +1055,7 @@ impl ParquetMetaDataReader { let mut res = Vec::new(); for (i, column) in schema_descr.columns().iter().enumerate() { match orders[i] { - TColumnOrder::TYPEORDER(_) => { + crate::format::ColumnOrder::TYPEORDER(_) => { let sort_order = ColumnOrder::get_sort_order( column.logical_type(), column.converted_type(), @@ -1099,7 +1110,6 @@ mod tests { use crate::basic::SortOrder; use crate::basic::Type; use crate::file::reader::Length; - use crate::format::TypeDefinedOrder; use crate::schema::types::Type as SchemaType; use crate::util::test_common::file_util::get_test_file; @@ -1153,8 +1163,8 @@ mod tests { let schema_descr = SchemaDescriptor::new(Arc::new(schema)); let t_column_orders = Some(vec![ - TColumnOrder::TYPEORDER(TypeDefinedOrder::new()), - TColumnOrder::TYPEORDER(TypeDefinedOrder::new()), + crate::format::ColumnOrder::TYPEORDER(Default::default()), + crate::format::ColumnOrder::TYPEORDER(Default::default()), ]); assert_eq!( @@ -1177,7 +1187,9 @@ mod tests { let schema = SchemaType::group_type_builder("schema").build().unwrap(); let schema_descr = SchemaDescriptor::new(Arc::new(schema)); - let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]); + let t_column_orders = Some(vec![crate::format::ColumnOrder::TYPEORDER( + Default::default(), + )]); let res = ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr); assert!(res.is_err()); diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs index 5bb59b6b2faf..acae20ec3cef 100644 --- a/parquet/src/file/metadata/writer.rs +++ b/parquet/src/file/metadata/writer.rs @@ -31,7 +31,6 @@ use crate::file::writer::{get_file_magic, TrackedWrite}; use crate::format::EncryptionAlgorithm; #[cfg(feature = "encryption")] use crate::format::{AesGcmV1, ColumnCryptoMetaData}; -use crate::format::{ColumnChunk, ColumnIndex, FileMetaData, OffsetIndex, RowGroup}; use crate::schema::types; use crate::schema::types::{SchemaDescPtr, SchemaDescriptor, TypePtr}; use crate::thrift::TSerializable; @@ -46,9 +45,9 @@ pub(crate) struct ThriftMetadataWriter<'a, W: Write> { buf: &'a mut TrackedWrite, schema: &'a TypePtr, schema_descr: &'a SchemaDescPtr, - row_groups: Vec, - column_indexes: Option<&'a [Vec>]>, - offset_indexes: Option<&'a [Vec>]>, + row_groups: Vec, + column_indexes: Option<&'a [Vec>]>, + offset_indexes: Option<&'a [Vec>]>, key_value_metadata: Option>, created_by: Option, object_writer: MetadataObjectWriter, @@ -61,7 +60,10 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> { /// Note: also updates the `ColumnChunk::offset_index_offset` and /// `ColumnChunk::offset_index_length` to reflect the position and length /// of the serialized offset indexes. - fn write_offset_indexes(&mut self, offset_indexes: &[Vec>]) -> Result<()> { + fn write_offset_indexes( + &mut self, + offset_indexes: &[Vec>], + ) -> Result<()> { // iter row group // iter each column // write offset index to the file @@ -91,7 +93,10 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> { /// Note: also updates the `ColumnChunk::column_index_offset` and /// `ColumnChunk::column_index_length` to reflect the position and length /// of the serialized column indexes. 
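Tying the `ColumnIndexBuilder` changes above together: the builder is now seeded with the column's physical type so that `build()` can return the decoded `Index` enum directly. A sketch under the assumption that `append` takes `(null_page, min, max, null_count)` with little-endian statistics bytes, mirroring the test usage elsewhere in this diff:

```rust
use parquet::basic::{BoundaryOrder, Type};
use parquet::errors::Result;
use parquet::file::metadata::ColumnIndexBuilder;
use parquet::file::page_index::index::Index;

fn build_int32_index() -> Result<()> {
    let mut builder = ColumnIndexBuilder::new(Type::INT32);
    // One non-null page with min 1 and max 7, encoded little-endian.
    builder.append(
        false,
        1i32.to_le_bytes().to_vec(),
        7i32.to_le_bytes().to_vec(),
        0,
    );
    builder.set_boundary_order(BoundaryOrder::ASCENDING);
    // Per the docs above, check valid() before building.
    if builder.valid() {
        match builder.build()? {
            Index::INT32(native) => assert_eq!(native.indexes.len(), 1),
            _ => unreachable!("constructed with Type::INT32"),
        }
    }
    Ok(())
}
```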
- fn write_column_indexes(&mut self, column_indexes: &[Vec>]) -> Result<()> { + fn write_column_indexes( + &mut self, + column_indexes: &[Vec>], + ) -> Result<()> { // iter row group // iter each column // write column index to the file @@ -146,10 +151,15 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> { let (encryption_algorithm, footer_signing_key_metadata) = self.object_writer.get_plaintext_footer_crypto_metadata(); - let mut file_metadata = FileMetaData { + let key_value_metadata = self.key_value_metadata.map(|vkv| { + vkv.into_iter() + .map(|kv| crate::format::KeyValue::new(kv.key, kv.value)) + .collect::>() + }); + let mut file_metadata = crate::format::FileMetaData { num_rows, row_groups, - key_value_metadata: self.key_value_metadata.clone(), + key_value_metadata, version: self.writer_version, schema: types::to_thrift(self.schema.as_ref())?, created_by: self.created_by.clone(), @@ -185,7 +195,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> { buf: &'a mut TrackedWrite, schema: &'a TypePtr, schema_descr: &'a SchemaDescPtr, - row_groups: Vec, + row_groups: Vec, created_by: Option, writer_version: i32, ) -> Self { @@ -203,12 +213,18 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> { } } - pub fn with_column_indexes(mut self, column_indexes: &'a [Vec>]) -> Self { + pub fn with_column_indexes( + mut self, + column_indexes: &'a [Vec>], + ) -> Self { self.column_indexes = Some(column_indexes); self } - pub fn with_offset_indexes(mut self, offset_indexes: &'a [Vec>]) -> Self { + pub fn with_offset_indexes( + mut self, + offset_indexes: &'a [Vec>], + ) -> Self { self.offset_indexes = Some(offset_indexes); self } @@ -257,6 +273,8 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> { /// /// [`FileMetaData`]: crate::format::FileMetaData /// [`ColumnChunkMetaData`]: crate::file::metadata::ColumnChunkMetaData +/// [`ColumnIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md +/// [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md /// /// ```text /// ┌──────────────────────┐ @@ -365,7 +383,7 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> { Ok(()) } - fn convert_column_indexes(&self) -> Vec>> { + fn convert_column_indexes(&self) -> Vec>> { if let Some(row_group_column_indexes) = self.metadata.column_index() { (0..self.metadata.row_groups().len()) .map(|rg_idx| { @@ -398,7 +416,7 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> { } } - fn convert_offset_index(&self) -> Vec>> { + fn convert_offset_index(&self) -> Vec>> { if let Some(row_group_offset_indexes) = self.metadata.offset_index() { (0..self.metadata.row_groups().len()) .map(|rg_idx| { @@ -439,15 +457,19 @@ impl MetadataObjectWriter { #[cfg(not(feature = "encryption"))] impl MetadataObjectWriter { /// Write [`FileMetaData`] in Thrift format - fn write_file_metadata(&self, file_metadata: &FileMetaData, sink: impl Write) -> Result<()> { + fn write_file_metadata( + &self, + file_metadata: &crate::format::FileMetaData, + sink: impl Write, + ) -> Result<()> { Self::write_object(file_metadata, sink) } /// Write a column [`OffsetIndex`] in Thrift format fn write_offset_index( &self, - offset_index: &OffsetIndex, - _column_chunk: &ColumnChunk, + offset_index: &crate::format::OffsetIndex, + _column_chunk: &crate::format::ColumnChunk, _row_group_idx: usize, _column_idx: usize, sink: impl Write, @@ -458,8 +480,8 @@ impl MetadataObjectWriter { /// Write a column [`ColumnIndex`] in Thrift format fn write_column_index( &self, - column_index: &ColumnIndex, - _column_chunk: &ColumnChunk, + 
column_index: &crate::format::ColumnIndex, + _column_chunk: &crate::format::ColumnChunk, _row_group_idx: usize, _column_idx: usize, sink: impl Write, @@ -470,8 +492,11 @@ impl MetadataObjectWriter { /// No-op implementation of row-group metadata encryption fn apply_row_group_encryption( &self, - row_groups: Vec, - ) -> Result<(Vec, Option>)> { + row_groups: Vec, + ) -> Result<( + Vec, + Option>, + )> { Ok((row_groups, None)) } @@ -497,9 +522,11 @@ impl MetadataObjectWriter { } /// Write [`FileMetaData`] in Thrift format, possibly encrypting it if required + /// + /// [`FileMetaData`]: crate::format::FileMetaData fn write_file_metadata( &self, - file_metadata: &FileMetaData, + file_metadata: &crate::format::FileMetaData, mut sink: impl Write, ) -> Result<()> { match self.file_encryptor.as_ref() { @@ -524,10 +551,12 @@ impl MetadataObjectWriter { } /// Write a column [`OffsetIndex`] in Thrift format, possibly encrypting it if required + /// + /// [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md fn write_offset_index( &self, - offset_index: &OffsetIndex, - column_chunk: &ColumnChunk, + offset_index: &crate::format::OffsetIndex, + column_chunk: &crate::format::ColumnChunk, row_group_idx: usize, column_idx: usize, sink: impl Write, @@ -547,10 +576,12 @@ impl MetadataObjectWriter { } /// Write a column [`ColumnIndex`] in Thrift format, possibly encrypting it if required + /// + /// [`ColumnIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md fn write_column_index( &self, - column_index: &ColumnIndex, - column_chunk: &ColumnChunk, + column_index: &crate::format::ColumnIndex, + column_chunk: &crate::format::ColumnChunk, row_group_idx: usize, column_idx: usize, sink: impl Write, @@ -574,8 +605,11 @@ impl MetadataObjectWriter { /// and possibly unencrypted metadata to be returned to clients if data was encrypted. fn apply_row_group_encryption( &self, - row_groups: Vec, - ) -> Result<(Vec, Option>)> { + row_groups: Vec, + ) -> Result<( + Vec, + Option>, + )> { match &self.file_encryptor { Some(file_encryptor) => { let unencrypted_row_groups = row_groups.clone(); @@ -599,7 +633,7 @@ impl MetadataObjectWriter { object: &impl TSerializable, mut sink: impl Write, file_encryptor: &FileEncryptor, - column_metadata: &ColumnChunk, + column_metadata: &crate::format::ColumnChunk, module_type: ModuleType, row_group_index: usize, column_index: usize, @@ -682,14 +716,14 @@ impl MetadataObjectWriter { } fn encrypt_row_groups( - row_groups: Vec, + row_groups: Vec, file_encryptor: &Arc, - ) -> Result> { + ) -> Result> { row_groups .into_iter() .enumerate() .map(|(rg_idx, mut rg)| { - let cols: Result> = rg + let cols: Result> = rg .columns .into_iter() .enumerate() @@ -705,11 +739,11 @@ impl MetadataObjectWriter { /// Apply column encryption to column chunk metadata fn encrypt_column_chunk( - mut column_chunk: ColumnChunk, + mut column_chunk: crate::format::ColumnChunk, file_encryptor: &Arc, row_group_index: usize, column_index: usize, - ) -> Result { + ) -> Result { // Column crypto metadata should have already been set when the column was created. // Here we apply the encryption by encrypting the column metadata if required. 
match column_chunk.crypto_metadata.as_ref() { diff --git a/parquet/src/file/page_encoding_stats.rs b/parquet/src/file/page_encoding_stats.rs index edb6a8fa9d4c..67ca2a3e4c71 100644 --- a/parquet/src/file/page_encoding_stats.rs +++ b/parquet/src/file/page_encoding_stats.rs @@ -19,9 +19,6 @@ use crate::basic::{Encoding, PageType}; use crate::errors::Result; -use crate::format::{ - Encoding as TEncoding, PageEncodingStats as TPageEncodingStats, PageType as TPageType, -}; /// PageEncodingStats for a column chunk and data page. #[derive(Clone, Debug, PartialEq, Eq)] @@ -35,7 +32,9 @@ pub struct PageEncodingStats { } /// Converts Thrift definition into `PageEncodingStats`. -pub fn try_from_thrift(thrift_encoding_stats: &TPageEncodingStats) -> Result { +pub fn try_from_thrift( + thrift_encoding_stats: &crate::format::PageEncodingStats, +) -> Result { let page_type = PageType::try_from(thrift_encoding_stats.page_type)?; let encoding = Encoding::try_from(thrift_encoding_stats.encoding)?; let count = thrift_encoding_stats.count; @@ -48,12 +47,12 @@ pub fn try_from_thrift(thrift_encoding_stats: &TPageEncodingStats) -> Result TPageEncodingStats { - let page_type = TPageType::from(encoding_stats.page_type); - let encoding = TEncoding::from(encoding_stats.encoding); +pub fn to_thrift(encoding_stats: &PageEncodingStats) -> crate::format::PageEncodingStats { + let page_type = crate::format::PageType::from(encoding_stats.page_type); + let encoding = crate::format::Encoding::from(encoding_stats.encoding); let count = encoding_stats.count; - TPageEncodingStats { + crate::format::PageEncodingStats { page_type, encoding, count, diff --git a/parquet/src/file/page_index/index.rs b/parquet/src/file/page_index/index.rs index a66509e14c7a..2c9aa009080e 100644 --- a/parquet/src/file/page_index/index.rs +++ b/parquet/src/file/page_index/index.rs @@ -16,13 +16,14 @@ // under the License. //! [`Index`] structures holding decoded [`ColumnIndex`] information +//! +//! 
diff --git a/parquet/src/file/page_index/index.rs b/parquet/src/file/page_index/index.rs
index a66509e14c7a..2c9aa009080e 100644
--- a/parquet/src/file/page_index/index.rs
+++ b/parquet/src/file/page_index/index.rs
@@ -16,13 +16,14 @@
 // under the License.

 //! [`Index`] structures holding decoded [`ColumnIndex`] information
+//!
+//! [`ColumnIndex`]: crate::format::ColumnIndex

-use crate::basic::Type;
+use crate::basic::{BoundaryOrder, Type};
 use crate::data_type::private::ParquetValueType;
 use crate::data_type::{AsBytes, ByteArray, FixedLenByteArray, Int96};
 use crate::errors::ParquetError;
 use crate::file::metadata::LevelHistogram;
-use crate::format::{BoundaryOrder, ColumnIndex};
 use std::fmt::Debug;

 /// Typed statistics for one data page
@@ -78,6 +79,11 @@ impl<T> PageIndex<T> {
     pub fn definition_level_histogram(&self) -> Option<&LevelHistogram> {
         self.definition_level_histogram.as_ref()
     }
+
+    /// Returns whether this is an all null page
+    pub fn is_null_page(&self) -> bool {
+        self.min.is_none()
+    }
 }

 impl<T> PageIndex<T>
@@ -132,7 +138,7 @@ impl Index {
     pub fn is_sorted(&self) -> bool {
         // 0:UNORDERED, 1:ASCENDING ,2:DESCENDING,
         if let Some(order) = self.get_boundary_order() {
-            order.0 > (BoundaryOrder::UNORDERED.0)
+            order != BoundaryOrder::UNORDERED
         } else {
             false
         }
@@ -170,6 +176,7 @@ impl Index {
 ///
 /// [PageIndex documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
 /// [`Statistics`]: crate::file::statistics::Statistics
+/// [`ColumnIndex`]: crate::format::ColumnIndex
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct NativeIndex<T: ParquetValueType> {
     /// The actual column indexes, one item per page
@@ -186,7 +193,7 @@ impl<T: ParquetValueType> NativeIndex<T> {
     pub const PHYSICAL_TYPE: Type = T::PHYSICAL_TYPE;

     /// Creates a new [`NativeIndex`]
-    pub(crate) fn try_new(index: ColumnIndex) -> Result<Self, ParquetError> {
+    pub(crate) fn try_new(index: crate::format::ColumnIndex) -> Result<Self, ParquetError> {
         let len = index.min_values.len();

         let null_counts = index
@@ -248,13 +255,14 @@
             )
             .collect::<Result<Vec<_>, ParquetError>>()?;

+        let boundary_order = index.boundary_order.try_into()?;
         Ok(Self {
             indexes,
-            boundary_order: index.boundary_order,
+            boundary_order,
         })
     }

-    pub(crate) fn to_thrift(&self) -> ColumnIndex {
+    pub(crate) fn to_thrift(&self) -> crate::format::ColumnIndex {
         let min_values = self
             .indexes
             .iter()
@@ -288,11 +296,11 @@
             .collect::<Option<Vec<_>>>()
             .map(|hists| hists.concat());

-        ColumnIndex::new(
+        crate::format::ColumnIndex::new(
             self.indexes.iter().map(|x| x.min().is_none()).collect(),
             min_values,
             max_values,
-            self.boundary_order,
+            self.boundary_order.into(),
             null_counts,
             repetition_level_histograms,
             definition_level_histograms,
@@ -350,7 +358,7 @@ mod tests {

     #[test]
     fn test_invalid_column_index() {
-        let column_index = ColumnIndex {
+        let column_index = crate::format::ColumnIndex {
             null_pages: vec![true, false],
             min_values: vec![
                 vec![],
                 vec![], // this shouldn't be empty as null_pages[1] is false
             ],
             max_values: vec![
                 vec![],
                 vec![], // this shouldn't be empty as null_pages[1] is false
             ],
             null_counts: None,
             repetition_level_histograms: None,
             definition_level_histograms: None,
-            boundary_order: BoundaryOrder::UNORDERED,
+            boundary_order: crate::format::BoundaryOrder::UNORDERED,
         };

         let err = NativeIndex::<i32>::try_new(column_index).unwrap_err();
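The new `is_null_page` helper and the enum-based `is_sorted` make scans over a decoded column index straightforward. A sketch, assuming `metadata` was loaded with page indexes enabled; the `INT32` arm stands in for the other `Index` variants:

```rust
use parquet::file::metadata::ParquetMetaData;
use parquet::file::page_index::index::Index;

// Count all-null pages per column and report whether each column index is sorted.
fn summarize_column_indexes(metadata: &ParquetMetaData) {
    if let Some(index) = metadata.column_index() {
        for (rg_idx, columns) in index.iter().enumerate() {
            for (col_idx, column_index) in columns.iter().enumerate() {
                if let Index::INT32(native) = column_index {
                    let null_pages = native.indexes.iter().filter(|p| p.is_null_page()).count();
                    println!(
                        "rg {rg_idx} col {col_idx}: sorted={}, all-null pages={null_pages}",
                        column_index.is_sorted()
                    );
                }
            }
        }
    }
}
```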
diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs
index d0537711dc20..d4d405d68ff2 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.

-//! Support for reading [`Index`] and [`OffsetIndex`] from parquet metadata.
+//! Support for reading [`Index`] and [`OffsetIndexMetaData`] from parquet metadata.

 use crate::basic::Type;
 use crate::data_type::Int96;
@@ -24,7 +24,6 @@ use crate::file::metadata::ColumnChunkMetaData;
 use crate::file::page_index::index::{Index, NativeIndex};
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::reader::ChunkReader;
-use crate::format::{ColumnIndex, OffsetIndex};
 use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
 use std::ops::Range;

@@ -48,6 +47,7 @@ pub(crate) fn acc_range(a: Option<Range<usize>>, b: Option<Range<usize>>) -> Option<
 /// See [Page Index Documentation] for more details.
 ///
 /// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+/// [`ColumnIndex`]: crate::format::ColumnIndex
 #[deprecated(
     since = "55.2.0",
     note = "Use ParquetMetaDataReader instead; will be removed in 58.0.0"
@@ -93,6 +93,7 @@ pub fn read_columns_indexes(
 /// See [Page Index Documentation] for more details.
 ///
 /// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+/// [`OffsetIndex`]: crate::format::OffsetIndex
 #[deprecated(
     since = "55.2.0",
     note = "Use ParquetMetaDataReader instead; will be removed in 58.0.0"
@@ -129,14 +130,14 @@ pub fn read_offset_indexes(

 pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData> {
     let mut prot = TCompactSliceInputProtocol::new(data);
-    let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
+    let offset = crate::format::OffsetIndex::read_from_in_protocol(&mut prot)?;
     OffsetIndexMetaData::try_new(offset)
 }

 pub(crate) fn decode_column_index(data: &[u8], column_type: Type) -> Result<Index> {
     let mut prot = TCompactSliceInputProtocol::new(data);
-    let index = ColumnIndex::read_from_in_protocol(&mut prot)?;
+    let index = crate::format::ColumnIndex::read_from_in_protocol(&mut prot)?;

     let index = match column_type {
         Type::BOOLEAN => Index::BOOLEAN(NativeIndex::<bool>::try_new(index)?),
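As the deprecation notes above indicate, `ParquetMetaDataReader` is the supported entry point for these decode paths. A minimal sketch of loading file metadata together with its page indexes (the file path is a placeholder):

```rust
use std::fs::File;

use parquet::errors::Result;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

// Read the footer and, in a follow-up pass, the column and offset indexes.
fn read_metadata_with_page_indexes(path: &str) -> Result<ParquetMetaData> {
    let file = File::open(path)?;
    ParquetMetaDataReader::new()
        .with_page_indexes(true)
        .parse_and_finish(&file)
}
```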
diff --git a/parquet/src/file/page_index/offset_index.rs b/parquet/src/file/page_index/offset_index.rs
index d48d1b6c083d..5614b1750a0e 100644
--- a/parquet/src/file/page_index/offset_index.rs
+++ b/parquet/src/file/page_index/offset_index.rs
@@ -16,12 +16,49 @@
 // under the License.

 //! [`OffsetIndexMetaData`] structure holding decoded [`OffsetIndex`] information
+//!
+//! [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md

 use crate::errors::ParquetError;
-use crate::format::{OffsetIndex, PageLocation};
+
+/// Page location information for [`OffsetIndexMetaData`]
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct PageLocation {
+    /// Offset of the page in the file
+    pub offset: i64,
+    /// Size of the page, including header. Sum of compressed_page_size and header
+    /// length
+    pub compressed_page_size: i32,
+    /// Index within the RowGroup of the first row of the page. When an
+    /// OffsetIndex is present, pages must begin on row boundaries
+    /// (repetition_level = 0).
+    pub first_row_index: i64,
+}
+
+impl From<&crate::format::PageLocation> for PageLocation {
+    fn from(value: &crate::format::PageLocation) -> Self {
+        Self {
+            offset: value.offset,
+            compressed_page_size: value.compressed_page_size,
+            first_row_index: value.first_row_index,
+        }
+    }
+}
+
+impl From<&PageLocation> for crate::format::PageLocation {
+    fn from(value: &PageLocation) -> Self {
+        Self {
+            offset: value.offset,
+            compressed_page_size: value.compressed_page_size,
+            first_row_index: value.first_row_index,
+        }
+    }
+}

 /// [`OffsetIndex`] information for a column chunk. Contains offsets and sizes for each page
 /// in the chunk. Optionally stores fully decoded page sizes for BYTE_ARRAY columns.
+///
+/// [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
 #[derive(Debug, Clone, PartialEq)]
 pub struct OffsetIndexMetaData {
     /// Vector of [`PageLocation`] objects, one per page in the chunk.
@@ -33,9 +70,12 @@

 impl OffsetIndexMetaData {
     /// Creates a new [`OffsetIndexMetaData`] from an [`OffsetIndex`].
-    pub(crate) fn try_new(index: OffsetIndex) -> Result<Self, ParquetError> {
+    ///
+    /// [`OffsetIndex`]: crate::format::OffsetIndex
+    pub(crate) fn try_new(index: crate::format::OffsetIndex) -> Result<Self, ParquetError> {
+        let page_locations = index.page_locations.iter().map(|loc| loc.into()).collect();
         Ok(Self {
-            page_locations: index.page_locations,
+            page_locations,
             unencoded_byte_array_data_bytes: index.unencoded_byte_array_data_bytes,
         })
     }
@@ -53,9 +93,10 @@

     // TODO: remove annotation after merge
     #[allow(dead_code)]
-    pub(crate) fn to_thrift(&self) -> OffsetIndex {
-        OffsetIndex::new(
-            self.page_locations.clone(),
+    pub(crate) fn to_thrift(&self) -> crate::format::OffsetIndex {
+        let page_locations = self.page_locations.iter().map(|loc| loc.into()).collect();
+        crate::format::OffsetIndex::new(
+            page_locations,
             self.unencoded_byte_array_data_bytes.clone(),
         )
     }
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 26177b69a577..a4919435298c 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -20,8 +20,7 @@
 use crate::basic::{Compression, Encoding};
 use crate::compression::{CodecOptions, CodecOptionsBuilder};
 #[cfg(feature = "encryption")]
 use crate::encryption::encrypt::FileEncryptionProperties;
-use crate::file::metadata::KeyValue;
-use crate::format::SortingColumn;
+use crate::file::metadata::{KeyValue, SortingColumn};
 use crate::schema::types::ColumnPath;
 use std::str::FromStr;
 use std::{collections::HashMap, sync::Arc};
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 2edb38deb3e0..f12048a4ee02 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -25,14 +25,14 @@
 use crate::compression::{create_codec, Codec};
 #[cfg(feature = "encryption")]
 use crate::encryption::decrypt::{read_and_decrypt, CryptoContext};
 use crate::errors::{ParquetError, Result};
-use crate::file::page_index::offset_index::OffsetIndexMetaData;
+use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
 use crate::file::{
     metadata::*,
     properties::{ReaderProperties, ReaderPropertiesPtr},
     reader::*,
     statistics,
 };
-use crate::format::{PageHeader, PageLocation, PageType};
+use crate::format::{PageHeader, PageType};
 use crate::record::reader::RowIter;
 use crate::record::Row;
 use crate::schema::types::Type as SchemaType;
@@ -1102,9 +1102,8 @@ mod tests {
     use bytes::Buf;

     use crate::file::properties::{EnabledStatistics, WriterProperties};
-    use crate::format::BoundaryOrder;

-    use crate::basic::{self, ColumnOrder, SortOrder};
+    use crate::basic::{self, BoundaryOrder, ColumnOrder, SortOrder};
     use crate::column::reader::ColumnReader;
     use crate::data_type::private::ParquetValueType;
     use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
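The `From<&...>` impls introduced in `offset_index.rs` convert in both directions. A round-trip sketch, assuming the thrift-generated `parquet::format` module remains public:

```rust
use parquet::file::page_index::offset_index::PageLocation;

fn main() {
    // Arguments: offset, compressed_page_size, first_row_index.
    let thrift_loc = parquet::format::PageLocation::new(4_i64, 1024_i32, 0_i64);

    // Decode into the new public type...
    let loc = PageLocation::from(&thrift_loc);
    assert_eq!(loc.compressed_page_size, 1024);

    // ...and back to the thrift struct for serialization.
    let back = parquet::format::PageLocation::from(&loc);
    assert_eq!(back, thrift_loc);
}
```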
diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs
index 02729a5016bb..d4501830ac40 100644
--- a/parquet/src/file/statistics.rs
+++ b/parquet/src/file/statistics.rs
@@ -41,8 +41,6 @@

 use std::fmt;

-use crate::format::Statistics as TStatistics;
-
 use crate::basic::Type;
 use crate::data_type::private::ParquetValueType;
 use crate::data_type::*;
@@ -122,7 +120,7 @@ macro_rules! statistics_enum_func {
 /// Converts Thrift definition into `Statistics`.
 pub fn from_thrift(
     physical_type: Type,
-    thrift_stats: Option<TStatistics>,
+    thrift_stats: Option<crate::format::Statistics>,
 ) -> Result<Option<Statistics>> {
     Ok(match thrift_stats {
         Some(stats) => {
@@ -269,7 +267,7 @@ pub fn from_thrift(
 }

 /// Convert Statistics into Thrift definition.
-pub fn to_thrift(stats: Option<&Statistics>) -> Option<TStatistics> {
+pub fn to_thrift(stats: Option<&Statistics>) -> Option<crate::format::Statistics> {
     let stats = stats?;

     // record null count if it can fit in i64
@@ -282,7 +280,7 @@ pub fn to_thrift(
         .distinct_count_opt()
         .and_then(|value| i64::try_from(value).ok());

-    let mut thrift_stats = TStatistics {
+    let mut thrift_stats = crate::format::Statistics {
         max: None,
         min: None,
         null_count,
@@ -702,7 +700,7 @@ mod tests {
     #[test]
     #[should_panic(expected = "General(\"Statistics null count is negative -10\")")]
     fn test_statistics_negative_null_count() {
-        let thrift_stats = TStatistics {
+        let thrift_stats = crate::format::Statistics {
             max: None,
             min: None,
             null_count: Some(-10),
@@ -1017,7 +1015,7 @@ mod tests {

     #[test]
     fn test_count_decoding_null_invalid() {
-        let tstatistics = TStatistics {
+        let tstatistics = crate::format::Statistics {
             null_count: Some(-42),
             ..Default::default()
         };
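`to_thrift` and `from_thrift` remain inverses for well-formed statistics. A round-trip sketch using the public typed constructors (argument order per the current builder-style API):

```rust
use parquet::basic::Type;
use parquet::file::statistics::{from_thrift, to_thrift, Statistics};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Arguments: min, max, distinct_count, null_count, is_min_max_deprecated.
    let stats = Statistics::int32(Some(1), Some(100), None, Some(7), false);

    let thrift_stats = to_thrift(Some(&stats));
    let round_tripped =
        from_thrift(Type::INT32, thrift_stats)?.expect("statistics were provided");
    assert_eq!(round_tripped, stats);
    Ok(())
}
```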
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 31a3344db66c..b985c31ec8b8 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -19,8 +19,8 @@
 //! using row group writers and column writers respectively.

 use crate::bloom_filter::Sbbf;
-use crate::format as parquet;
-use crate::format::{ColumnIndex, OffsetIndex};
+use crate::file::page_index::index::Index;
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::thrift::TSerializable;
 use std::fmt::Debug;
 use std::io::{BufWriter, IoSlice, Read};
@@ -128,8 +128,8 @@ pub type OnCloseRowGroup<'a, W> = Box<
     dyn FnOnce(
             &'a mut TrackedWrite<W>,
             RowGroupMetaData,
             Vec<Option<Sbbf>>,
-            Vec<Option<ColumnIndex>>,
-            Vec<Option<OffsetIndex>>,
+            Vec<Option<Index>>,
+            Vec<Option<OffsetIndexMetaData>>,
         ) -> Result<()>
         + 'a
         + Send,
@@ -154,8 +154,8 @@ pub struct SerializedFileWriter<W: Write> {
     props: WriterPropertiesPtr,
     row_groups: Vec<RowGroupMetaData>,
     bloom_filters: Vec<Vec<Option<Sbbf>>>,
-    column_indexes: Vec<Vec<Option<ColumnIndex>>>,
-    offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
+    column_indexes: Vec<Vec<Option<Index>>>,
+    offset_indexes: Vec<Vec<Option<OffsetIndexMetaData>>>,
     row_group_index: usize,
     // kv_metadatas will be appended to `props` when `write_metadata`
     kv_metadatas: Vec<KeyValue>,
@@ -290,7 +290,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {
     /// Unlike [`Self::close`] this does not consume self
     ///
     /// Attempting to write after calling finish will result in an error
-    pub fn finish(&mut self) -> Result<parquet::FileMetaData> {
+    pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
         self.assert_previous_writer_closed()?;
         let metadata = self.write_metadata()?;
         self.buf.flush()?;
@@ -298,7 +298,7 @@
     }

     /// Closes and finalises file writer, returning the file metadata.
-    pub fn close(mut self) -> Result<parquet::FileMetaData> {
+    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
         self.finish()
     }

@@ -319,7 +319,7 @@
     }

     /// Assembles and writes metadata at the end of the file.
-    fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
+    fn write_metadata(&mut self) -> Result<crate::format::FileMetaData> {
         self.finished = true;

         // write out any remaining bloom filters after all row groups
@@ -339,6 +339,9 @@
             .map(|v| v.to_thrift())
             .collect::<Vec<_>>();

+        let column_indexes = self.convert_column_indexes();
+        let offset_indexes = self.convert_offset_index();
+
         let mut encoder = ThriftMetadataWriter::new(
             &mut self.buf,
             &self.schema,
@@ -356,11 +359,46 @@
         if let Some(key_value_metadata) = key_value_metadata {
             encoder = encoder.with_key_value_metadata(key_value_metadata)
         }
-        encoder = encoder.with_column_indexes(&self.column_indexes);
-        encoder = encoder.with_offset_indexes(&self.offset_indexes);
+
+        encoder = encoder.with_column_indexes(&column_indexes);
+        encoder = encoder.with_offset_indexes(&offset_indexes);
         encoder.finish()
     }

+    fn convert_column_indexes(&self) -> Vec<Vec<Option<crate::format::ColumnIndex>>> {
+        self.column_indexes
+            .iter()
+            .map(|cis| {
+                cis.iter()
+                    .map(|ci| {
+                        ci.as_ref().map(|column_index| match column_index {
+                            Index::NONE => panic!("trying to serialize missing column index"),
+                            Index::BOOLEAN(column_index) => column_index.to_thrift(),
+                            Index::BYTE_ARRAY(column_index) => column_index.to_thrift(),
+                            Index::DOUBLE(column_index) => column_index.to_thrift(),
+                            Index::FIXED_LEN_BYTE_ARRAY(column_index) => column_index.to_thrift(),
+                            Index::FLOAT(column_index) => column_index.to_thrift(),
+                            Index::INT32(column_index) => column_index.to_thrift(),
+                            Index::INT64(column_index) => column_index.to_thrift(),
+                            Index::INT96(column_index) => column_index.to_thrift(),
+                        })
+                    })
+                    .collect()
+            })
+            .collect()
+    }
+
+    fn convert_offset_index(&self) -> Vec<Vec<Option<crate::format::OffsetIndex>>> {
+        self.offset_indexes
+            .iter()
+            .map(|ois| {
+                ois.iter()
+                    .map(|oi| oi.as_ref().map(|offset_index| offset_index.to_thrift()))
+                    .collect()
+            })
+            .collect()
+    }
+
     #[inline]
     fn assert_previous_writer_closed(&self) -> Result<()> {
         if self.finished {
@@ -499,8 +537,8 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
     row_group_metadata: Option<RowGroupMetaDataPtr>,
     column_chunks: Vec<ColumnChunkMetaData>,
     bloom_filters: Vec<Option<Sbbf>>,
-    column_indexes: Vec<Option<ColumnIndex>>,
-    offset_indexes: Vec<Option<OffsetIndex>>,
+    column_indexes: Vec<Option<Index>>,
+    offset_indexes: Vec<Option<OffsetIndexMetaData>>,
     row_group_index: i16,
     file_offset: i64,
     on_close: Option<OnCloseRowGroup<'a, W>>,
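`finish` and `close` still hand back the thrift-generated `FileMetaData`, so existing callers only see a path change. A small end-to-end sketch (the schema and values are illustrative):

```rust
use std::sync::Arc;

use parquet::data_type::Int32Type;
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::parser::parse_message_type;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(parse_message_type("message schema { REQUIRED INT32 id; }")?);
    let props = Arc::new(WriterProperties::builder().build());
    let mut buffer = Vec::new();
    let mut writer = SerializedFileWriter::new(&mut buffer, schema, props)?;

    // Write a single row group with one INT32 column.
    let mut row_group = writer.next_row_group()?;
    let mut column = row_group.next_column()?.expect("one column");
    column.typed::<Int32Type>().write_batch(&[1, 2, 3], None, None)?;
    column.close()?;
    row_group.close()?;

    // `close` consumes the writer and returns `parquet::format::FileMetaData`.
    let file_metadata = writer.close()?;
    assert_eq!(file_metadata.num_rows, 3);
    Ok(())
}
```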
@@ -901,7 +939,7 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> {
     /// Serializes page header into Thrift.
     /// Returns number of bytes that have been written into the sink.
     #[inline]
-    fn serialize_page_header(&mut self, header: parquet::PageHeader) -> Result<usize> {
+    fn serialize_page_header(&mut self, header: crate::format::PageHeader) -> Result<usize> {
         let start_pos = self.sink.bytes_written();
         match self.page_encryptor_and_sink_mut() {
             Some((page_encryptor, sink)) => {
@@ -1032,7 +1070,6 @@ mod tests {
         reader::{FileReader, SerializedFileReader, SerializedPageReader},
         statistics::{from_thrift, to_thrift, Statistics},
     };
-    use crate::format::SortingColumn;
     use crate::record::{Row, RowAccessor};
     use crate::schema::parser::parse_message_type;
     use crate::schema::types;
diff --git a/parquet/src/schema/parser.rs b/parquet/src/schema/parser.rs
index 0a67250476c7..700be8a15fd6 100644
--- a/parquet/src/schema/parser.rs
+++ b/parquet/src/schema/parser.rs
@@ -178,9 +178,9 @@ fn parse_timeunit(
     value
         .ok_or_else(|| general_err!(not_found_msg))
         .and_then(|v| match v.to_uppercase().as_str() {
-            "MILLIS" => Ok(TimeUnit::MILLIS(Default::default())),
-            "MICROS" => Ok(TimeUnit::MICROS(Default::default())),
-            "NANOS" => Ok(TimeUnit::NANOS(Default::default())),
+            "MILLIS" => Ok(TimeUnit::MILLIS),
+            "MICROS" => Ok(TimeUnit::MICROS),
+            "NANOS" => Ok(TimeUnit::NANOS),
             _ => Err(general_err!(parse_fail_msg)),
         })
 }
@@ -1075,7 +1075,7 @@ mod tests {
             Arc::new(
                 Type::primitive_type_builder("_6", PhysicalType::INT32)
                     .with_logical_type(Some(LogicalType::Time {
-                        unit: TimeUnit::MILLIS(Default::default()),
+                        unit: TimeUnit::MILLIS,
                         is_adjusted_to_u_t_c: false,
                     }))
                     .build()
@@ -1084,7 +1084,7 @@ mod tests {
             Arc::new(
                 Type::primitive_type_builder("_7", PhysicalType::INT64)
                     .with_logical_type(Some(LogicalType::Time {
-                        unit: TimeUnit::MICROS(Default::default()),
+                        unit: TimeUnit::MICROS,
                         is_adjusted_to_u_t_c: true,
                     }))
                     .build()
@@ -1093,7 +1093,7 @@ mod tests {
             Arc::new(
                 Type::primitive_type_builder("_8", PhysicalType::INT64)
                     .with_logical_type(Some(LogicalType::Timestamp {
-                        unit: TimeUnit::MILLIS(Default::default()),
+                        unit: TimeUnit::MILLIS,
                         is_adjusted_to_u_t_c: true,
                     }))
                     .build()
@@ -1102,7 +1102,7 @@ mod tests {
             Arc::new(
                 Type::primitive_type_builder("_9", PhysicalType::INT64)
                     .with_logical_type(Some(LogicalType::Timestamp {
-                        unit: TimeUnit::NANOS(Default::default()),
+                        unit: TimeUnit::NANOS,
                         is_adjusted_to_u_t_c: false,
                     }))
                     .build()
diff --git a/parquet/src/schema/printer.rs b/parquet/src/schema/printer.rs
index 5ef068da915b..4190f9717dcd 100644
--- a/parquet/src/schema/printer.rs
+++ b/parquet/src/schema/printer.rs
@@ -277,9 +277,9 @@ impl<'a> Printer<'a> {
 #[inline]
 fn print_timeunit(unit: &TimeUnit) -> &str {
     match unit {
-        TimeUnit::MILLIS(_) => "MILLIS",
-        TimeUnit::MICROS(_) => "MICROS",
-        TimeUnit::NANOS(_) => "NANOS",
+        TimeUnit::MILLIS => "MILLIS",
+        TimeUnit::MICROS => "MICROS",
+        TimeUnit::NANOS => "NANOS",
     }
 }

@@ -645,7 +645,7 @@ mod tests {
             PhysicalType::INT64,
             Some(LogicalType::Timestamp {
                 is_adjusted_to_u_t_c: true,
-                unit: TimeUnit::MILLIS(Default::default()),
+                unit: TimeUnit::MILLIS,
             }),
             ConvertedType::NONE,
             Repetition::REQUIRED,
@@ -671,7 +671,7 @@ mod tests {
             None,
             PhysicalType::INT32,
             Some(LogicalType::Time {
-                unit: TimeUnit::MILLIS(Default::default()),
+                unit: TimeUnit::MILLIS,
                 is_adjusted_to_u_t_c: false,
             }),
             ConvertedType::TIME_MILLIS,
@@ -686,7 +686,7 @@ mod tests {
             Some(42),
             PhysicalType::INT32,
             Some(LogicalType::Time {
-                unit: TimeUnit::MILLIS(Default::default()),
+                unit: TimeUnit::MILLIS,
                 is_adjusted_to_u_t_c: false,
             }),
             ConvertedType::TIME_MILLIS,
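With `TimeUnit` reduced to unit variants, matching no longer needs the `(_)` payload patterns or `Default::default()` constructors. For example:

```rust
use parquet::basic::{LogicalType, TimeUnit};

// Map a temporal logical type to a short unit suffix.
fn time_unit_suffix(t: &LogicalType) -> Option<&'static str> {
    match t {
        LogicalType::Time { unit, .. } | LogicalType::Timestamp { unit, .. } => Some(match unit {
            TimeUnit::MILLIS => "ms",
            TimeUnit::MICROS => "us",
            TimeUnit::NANOS => "ns",
        }),
        _ => None,
    }
}

fn main() {
    let t = LogicalType::Timestamp {
        is_adjusted_to_u_t_c: true,
        unit: TimeUnit::MILLIS,
    };
    assert_eq!(time_unit_suffix(&t), Some("ms"));
}
```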
diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs
index 68492e19f437..0b36fbb63b34 100644
--- a/parquet/src/schema/types.rs
+++ b/parquet/src/schema/types.rs
@@ -20,7 +20,6 @@
 use std::{collections::HashMap, fmt, sync::Arc};

 use crate::file::metadata::HeapSize;
-use crate::format::SchemaElement;

 use crate::basic::{
     ColumnOrder, ConvertedType, LogicalType, Repetition, SortOrder, TimeUnit,
     Type as PhysicalType,
 };
@@ -375,13 +374,13 @@ impl<'a> PrimitiveTypeBuilder<'a> {
             (LogicalType::Date, PhysicalType::INT32) => {}
             (
                 LogicalType::Time {
-                    unit: TimeUnit::MILLIS(_),
+                    unit: TimeUnit::MILLIS,
                     ..
                 },
                 PhysicalType::INT32,
             ) => {}
             (LogicalType::Time { unit, .. }, PhysicalType::INT64) => {
-                if *unit == TimeUnit::MILLIS(Default::default()) {
+                if *unit == TimeUnit::MILLIS {
                     return Err(general_err!(
                         "Cannot use millisecond unit on INT64 type for field '{}'",
                         self.name
@@ -1160,7 +1159,7 @@ fn build_tree<'a>(
 }

 /// Method to convert from Thrift.
-pub fn from_thrift(elements: &[SchemaElement]) -> Result<TypePtr> {
+pub fn from_thrift(elements: &[crate::format::SchemaElement]) -> Result<TypePtr> {
     let mut index = 0;
     let mut schema_nodes = Vec::new();
     while index < elements.len() {
@@ -1198,7 +1197,10 @@ fn check_logical_type(logical_type: &Option<LogicalType>) -> Result<()> {
 /// The first result is the starting index for the next Type after this one. If it is
 /// equal to `elements.len()`, then this Type is the last one.
 /// The second result is the result Type.
-fn from_thrift_helper(elements: &[SchemaElement], index: usize) -> Result<(usize, TypePtr)> {
+fn from_thrift_helper(
+    elements: &[crate::format::SchemaElement],
+    index: usize,
+) -> Result<(usize, TypePtr)> {
     // Whether or not the current node is root (message type).
     // There is only one message type node in the schema tree.
     let is_root_node = index == 0;
@@ -1313,18 +1315,18 @@ fn from_thrift_helper(
 }

 /// Method to convert to Thrift.
-pub fn to_thrift(schema: &Type) -> Result<Vec<SchemaElement>> {
+pub fn to_thrift(schema: &Type) -> Result<Vec<crate::format::SchemaElement>> {
     if !schema.is_group() {
         return Err(general_err!("Root schema must be Group type"));
     }
-    let mut elements: Vec<SchemaElement> = Vec::new();
+    let mut elements: Vec<crate::format::SchemaElement> = Vec::new();
     to_thrift_helper(schema, &mut elements);
     Ok(elements)
 }

 /// Constructs list of `SchemaElement` from the schema using depth-first traversal.
 /// Here we assume that schema is always valid and starts with group type.
-fn to_thrift_helper(schema: &Type, elements: &mut Vec<SchemaElement>) {
+fn to_thrift_helper(schema: &Type, elements: &mut Vec<crate::format::SchemaElement>) {
     match *schema {
         Type::PrimitiveType {
             ref basic_info,
@@ -1333,7 +1335,7 @@ fn to_thrift_helper(
             scale,
             precision,
         } => {
-            let element = SchemaElement {
+            let element = crate::format::SchemaElement {
                 type_: Some(physical_type.into()),
                 type_length: if type_length >= 0 {
                     Some(type_length)
@@ -1370,7 +1372,7 @@ fn to_thrift_helper(
                 None
             };

-            let element = SchemaElement {
+            let element = crate::format::SchemaElement {
                 type_: None,
                 type_length: None,
                 repetition_type: repetition,
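The schema conversion keeps its round-trip property under the fully-qualified types. A sketch using the public parser and converters:

```rust
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::{from_thrift, to_thrift};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let message = "
    message spark_schema {
        OPTIONAL BYTE_ARRAY name (UTF8);
        REQUIRED INT64 ts (TIMESTAMP(MILLIS,true));
    }
    ";
    let schema = parse_message_type(message)?;

    // `to_thrift` emits a flat Vec<parquet::format::SchemaElement>;
    // `from_thrift` rebuilds the schema tree from it.
    let elements = to_thrift(&schema)?;
    let round_tripped = from_thrift(&elements)?;
    assert_eq!(round_tripped.as_ref(), &schema);
    Ok(())
}
```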
diff --git a/parquet/src/thrift.rs b/parquet/src/thrift.rs
index fc391abe87d7..984ba43ec7ad 100644
--- a/parquet/src/thrift.rs
+++ b/parquet/src/thrift.rs
@@ -33,12 +33,18 @@ pub trait TSerializable: Sized {
     fn write_to_out_protocol<T: TOutputProtocol>(&self, o_prot: &mut T) -> thrift::Result<()>;
 }

-/// Public function to aid benchmarking.
+/// Public function to aid benchmarking. Reads Parquet `FileMetaData` encoded in `bytes`.
 pub fn bench_file_metadata(bytes: &bytes::Bytes) {
     let mut input = TCompactSliceInputProtocol::new(bytes);
     crate::format::FileMetaData::read_from_in_protocol(&mut input).unwrap();
 }

+/// Public function to aid benchmarking. Reads Parquet `PageHeader` encoded in `bytes`.
+pub fn bench_page_header(bytes: &bytes::Bytes) {
+    let mut input = TCompactSliceInputProtocol::new(bytes);
+    crate::format::PageHeader::read_from_in_protocol(&mut input).unwrap();
+}
+
 /// A more performant implementation of [`TCompactInputProtocol`] that reads a slice
 ///
 /// [`TCompactInputProtocol`]: thrift::protocol::TCompactInputProtocol
@@ -323,7 +329,6 @@ fn eof_error() -> thrift::Error {

 #[cfg(test)]
 mod tests {
-    use crate::format::{BoundaryOrder, ColumnIndex};
     use crate::thrift::{TCompactSliceInputProtocol, TSerializable};

     #[test]

         let bytes = vec![0x19, 0x21, 2, 1, 0x19, 8, 0x19, 8, 0x15, 0, 0];
         let mut protocol = TCompactSliceInputProtocol::new(bytes.as_slice());

-        let index = ColumnIndex::read_from_in_protocol(&mut protocol).unwrap();
-        let expected = ColumnIndex {
+        let index = crate::format::ColumnIndex::read_from_in_protocol(&mut protocol).unwrap();
+        let expected = crate::format::ColumnIndex {
             null_pages: vec![false, true],
             min_values: vec![],
             max_values: vec![],
-            boundary_order: BoundaryOrder::UNORDERED,
+            boundary_order: crate::format::BoundaryOrder::UNORDERED,
             null_counts: None,
             repetition_level_histograms: None,
             definition_level_histograms: None,
@@ -355,12 +360,12 @@

         let bytes = vec![0x19, 0x22, 0, 1, 0x19, 8, 0x19, 8, 0x15, 0, 0];
         let mut protocol = TCompactSliceInputProtocol::new(bytes.as_slice());

-        let index = ColumnIndex::read_from_in_protocol(&mut protocol).unwrap();
-        let expected = ColumnIndex {
+        let index = crate::format::ColumnIndex::read_from_in_protocol(&mut protocol).unwrap();
+        let expected = crate::format::ColumnIndex {
             null_pages: vec![false, true],
             min_values: vec![],
             max_values: vec![],
-            boundary_order: BoundaryOrder::UNORDERED,
+            boundary_order: crate::format::BoundaryOrder::UNORDERED,
             null_counts: None,
             repetition_level_histograms: None,
             definition_level_histograms: None,