diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index f90143104ce2..a6f740f0f2f4 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -40,7 +40,7 @@
 //! metadata into parquet files. To work with metadata directly,
 //! the following APIs are available:
 //!
-//! * [`ParquetMetaDataReader`] for reading from a reader for I/O
+//! * [`ParquetMetaDataReader`] for reading metadata from an I/O source (sync and async)
 //! * [`ParquetMetaDataPushDecoder`] for decoding from bytes without I/O
 //! * [`ParquetMetaDataWriter`] for writing.
 //!
@@ -91,6 +91,7 @@
 //! * Same name, different struct
 //! ```
 mod memory;
+mod parser;
 mod push_decoder;
 pub(crate) mod reader;
 mod writer;
@@ -195,10 +196,10 @@ impl ParquetMetaData {
         ParquetMetaData {
             file_metadata,
             row_groups,
-            #[cfg(feature = "encryption")]
-            file_decryptor: None,
             column_index: None,
             offset_index: None,
+            #[cfg(feature = "encryption")]
+            file_decryptor: None,
         }
     }
diff --git a/parquet/src/file/metadata/parser.rs b/parquet/src/file/metadata/parser.rs
new file mode 100644
index 000000000000..a68f14d4d7aa
--- /dev/null
+++ b/parquet/src/file/metadata/parser.rs
@@ -0,0 +1,475 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Internal metadata parsing routines
+//!
+//! These functions parse thrift-encoded metadata from a byte slice
+//! into the corresponding Rust structures.
+
+use crate::basic::ColumnOrder;
+use crate::errors::ParquetError;
+use crate::file::metadata::{
+    ColumnChunkMetaData, FileMetaData, PageIndexPolicy, ParquetMetaData, RowGroupMetaData,
+};
+use crate::file::page_index::index::Index;
+use crate::file::page_index::index_reader::{decode_column_index, decode_offset_index};
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
+use crate::schema::types;
+use crate::schema::types::SchemaDescriptor;
+use crate::thrift::TCompactSliceInputProtocol;
+use crate::thrift::TSerializable;
+use bytes::Bytes;
+use std::sync::Arc;
+
+#[cfg(feature = "encryption")]
+use crate::encryption::{
+    decrypt::{FileDecryptionProperties, FileDecryptor},
+    modules::create_footer_aad,
+};
+#[cfg(feature = "encryption")]
+use crate::format::EncryptionAlgorithm;
+
+/// Decodes [`ParquetMetaData`] from the provided bytes.
+///
+/// Typically this is used to decode the metadata from the end of a parquet
+/// file. The format of `buf` is the Thrift compact binary protocol, as specified
+/// by the [Parquet Spec].
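+///
+/// A minimal usage sketch (illustrative only, not compiled as a doctest;
+/// `meta_bytes` is assumed to hold the thrift-encoded footer metadata,
+/// without the trailing 8-byte footer):
+///
+/// ```ignore
+/// let parquet_metadata = decode_metadata(meta_bytes)?;
+/// println!("row groups: {}", parquet_metadata.num_row_groups());
+/// ```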
+///
+/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
+pub(crate) fn decode_metadata(buf: &[u8]) -> crate::errors::Result<ParquetMetaData> {
+    let mut prot = TCompactSliceInputProtocol::new(buf);
+
+    let t_file_metadata: crate::format::FileMetaData =
+        crate::format::FileMetaData::read_from_in_protocol(&mut prot)
+            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
+    let schema = types::from_thrift(&t_file_metadata.schema)?;
+    let schema_descr = Arc::new(SchemaDescriptor::new(schema));
+
+    let mut row_groups = Vec::new();
+    for rg in t_file_metadata.row_groups {
+        row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
+    }
+    let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;
+
+    let file_metadata = FileMetaData::new(
+        t_file_metadata.version,
+        t_file_metadata.num_rows,
+        t_file_metadata.created_by,
+        t_file_metadata.key_value_metadata,
+        schema_descr,
+        column_orders,
+    );
+
+    Ok(ParquetMetaData::new(file_metadata, row_groups))
+}
+
+/// Parses column orders from the Thrift definition.
+/// If no column orders are defined, returns `None`.
+pub(crate) fn parse_column_orders(
+    t_column_orders: Option<Vec<crate::format::ColumnOrder>>,
+    schema_descr: &SchemaDescriptor,
+) -> crate::errors::Result<Option<Vec<ColumnOrder>>> {
+    match t_column_orders {
+        Some(orders) => {
+            // Should always be the case
+            if orders.len() != schema_descr.num_columns() {
+                return Err(general_err!("Column order length mismatch"));
+            };
+            let mut res = Vec::new();
+            for (i, column) in schema_descr.columns().iter().enumerate() {
+                match orders[i] {
+                    crate::format::ColumnOrder::TYPEORDER(_) => {
+                        let sort_order = ColumnOrder::get_sort_order(
+                            column.logical_type(),
+                            column.converted_type(),
+                            column.physical_type(),
+                        );
+                        res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
+                    }
+                }
+            }
+            Ok(Some(res))
+        }
+        None => Ok(None),
+    }
+}
+
+/// Parses the column index from the provided bytes and adds it to the metadata.
+///
+/// # Arguments
+///
+/// * `metadata` - The [`ParquetMetaData`] to which the parsed column index will be added.
+/// * `column_index_policy` - The policy for handling column index parsing (e.g.,
+///   Required, Optional, Skip).
+/// * `bytes` - The byte slice containing the column index data.
+/// * `start_offset` - The offset where `bytes` begins in the file.
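+///
+/// A minimal usage sketch (illustrative only, not compiled as a doctest;
+/// `metadata`, `bytes`, and `start_offset` are assumed to come from an earlier
+/// footer parse and a read of the page-index byte range):
+///
+/// ```ignore
+/// // `PageIndexPolicy::Skip` would make this call a no-op
+/// parse_column_index(&mut metadata, PageIndexPolicy::Optional, &bytes, start_offset)?;
+/// ```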
+pub(crate) fn parse_column_index(
+    metadata: &mut ParquetMetaData,
+    column_index_policy: PageIndexPolicy,
+    bytes: &Bytes,
+    start_offset: u64,
+) -> crate::errors::Result<()> {
+    if column_index_policy == PageIndexPolicy::Skip {
+        return Ok(());
+    }
+    let index = metadata
+        .row_groups()
+        .iter()
+        .enumerate()
+        .map(|(rg_idx, x)| {
+            x.columns()
+                .iter()
+                .enumerate()
+                .map(|(col_idx, c)| match c.column_index_range() {
+                    Some(r) => {
+                        let r_start = usize::try_from(r.start - start_offset)?;
+                        let r_end = usize::try_from(r.end - start_offset)?;
+                        parse_single_column_index(
+                            &bytes[r_start..r_end],
+                            metadata,
+                            c,
+                            rg_idx,
+                            col_idx,
+                        )
+                    }
+                    None => Ok(Index::NONE),
+                })
+                .collect::<crate::errors::Result<Vec<_>>>()
+        })
+        .collect::<crate::errors::Result<Vec<_>>>()?;
+
+    metadata.set_column_index(Some(index));
+    Ok(())
+}
+
+/// Parses the column index for a single column chunk, decrypting it first if
+/// the column is encrypted.
+#[cfg(feature = "encryption")]
+fn parse_single_column_index(
+    bytes: &[u8],
+    metadata: &ParquetMetaData,
+    column: &ColumnChunkMetaData,
+    row_group_index: usize,
+    col_index: usize,
+) -> crate::errors::Result<Index> {
+    use crate::encryption::decrypt::CryptoContext;
+    match &column.column_crypto_metadata {
+        Some(crypto_metadata) => {
+            let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
+                general_err!("Cannot decrypt column index, no file decryptor set")
+            })?;
+            let crypto_context = CryptoContext::for_column(
+                file_decryptor,
+                crypto_metadata,
+                row_group_index,
+                col_index,
+            )?;
+            let column_decryptor = crypto_context.metadata_decryptor();
+            let aad = crypto_context.create_column_index_aad()?;
+            let plaintext = column_decryptor.decrypt(bytes, &aad)?;
+            decode_column_index(&plaintext, column.column_type())
+        }
+        None => decode_column_index(bytes, column.column_type()),
+    }
+}
+
+#[cfg(not(feature = "encryption"))]
+fn parse_single_column_index(
+    bytes: &[u8],
+    _metadata: &ParquetMetaData,
+    column: &ColumnChunkMetaData,
+    _row_group_index: usize,
+    _col_index: usize,
+) -> crate::errors::Result<Index> {
+    decode_column_index(bytes, column.column_type())
+}
+
+/// Parses the offset index from the provided bytes and adds it to the metadata.
+///
+/// Unlike the column index, a missing or undecodable offset index is an error
+/// under [`PageIndexPolicy::Required`]; under other policies, both page indexes
+/// are invalidated and parsing succeeds without them.
+pub(crate) fn parse_offset_index(
+    metadata: &mut ParquetMetaData,
+    offset_index_policy: PageIndexPolicy,
+    bytes: &Bytes,
+    start_offset: u64,
+) -> crate::errors::Result<()> {
+    if offset_index_policy == PageIndexPolicy::Skip {
+        return Ok(());
+    }
+    let row_groups = metadata.row_groups();
+    let mut all_indexes = Vec::with_capacity(row_groups.len());
+    for (rg_idx, x) in row_groups.iter().enumerate() {
+        let mut row_group_indexes = Vec::with_capacity(x.columns().len());
+        for (col_idx, c) in x.columns().iter().enumerate() {
+            let result = match c.offset_index_range() {
+                Some(r) => {
+                    let r_start = usize::try_from(r.start - start_offset)?;
+                    let r_end = usize::try_from(r.end - start_offset)?;
+                    parse_single_offset_index(&bytes[r_start..r_end], metadata, c, rg_idx, col_idx)
+                }
+                None => Err(general_err!("missing offset index")),
+            };
+
+            match result {
+                Ok(index) => row_group_indexes.push(index),
+                Err(e) => {
+                    if offset_index_policy == PageIndexPolicy::Required {
+                        return Err(e);
+                    } else {
+                        // Invalidate and return
+                        metadata.set_column_index(None);
+                        metadata.set_offset_index(None);
+                        return Ok(());
+                    }
+                }
+            }
+        }
+        all_indexes.push(row_group_indexes);
+    }
+    metadata.set_offset_index(Some(all_indexes));
+    Ok(())
+}
+
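+/// Parses the offset index for a single column chunk, decrypting it first if
+/// the column is encrypted.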
+#[cfg(feature = "encryption")]
+fn parse_single_offset_index(
+    bytes: &[u8],
+    metadata: &ParquetMetaData,
+    column: &ColumnChunkMetaData,
+    row_group_index: usize,
+    col_index: usize,
+) -> crate::errors::Result<OffsetIndexMetaData> {
+    use crate::encryption::decrypt::CryptoContext;
+    match &column.column_crypto_metadata {
+        Some(crypto_metadata) => {
+            let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
+                general_err!("Cannot decrypt offset index, no file decryptor set")
+            })?;
+            let crypto_context = CryptoContext::for_column(
+                file_decryptor,
+                crypto_metadata,
+                row_group_index,
+                col_index,
+            )?;
+            let column_decryptor = crypto_context.metadata_decryptor();
+            let aad = crypto_context.create_offset_index_aad()?;
+            let plaintext = column_decryptor.decrypt(bytes, &aad)?;
+            decode_offset_index(&plaintext)
+        }
+        None => decode_offset_index(bytes),
+    }
+}
+
+#[cfg(not(feature = "encryption"))]
+fn parse_single_offset_index(
+    bytes: &[u8],
+    _metadata: &ParquetMetaData,
+    _column: &ColumnChunkMetaData,
+    _row_group_index: usize,
+    _col_index: usize,
+) -> crate::errors::Result<OffsetIndexMetaData> {
+    decode_offset_index(bytes)
+}
+
+/// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata that may be encrypted.
+///
+/// Typically this is used to decode the metadata from the end of a parquet
+/// file. The format of `buf` is the Thrift compact binary protocol, as specified
+/// by the [Parquet Spec]. The buffer can be encrypted with the AES GCM or AES CTR
+/// ciphers as specified in the [Parquet Encryption Spec].
+///
+/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
+/// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/
+#[cfg(feature = "encryption")]
+pub(crate) fn decode_metadata_with_encryption(
+    buf: &[u8],
+    encrypted_footer: bool,
+    file_decryption_properties: Option<&FileDecryptionProperties>,
+) -> crate::errors::Result<ParquetMetaData> {
+    let mut prot = TCompactSliceInputProtocol::new(buf);
+    let mut file_decryptor = None;
+    let decrypted_fmd_buf;
+
+    if encrypted_footer {
+        if let Some(file_decryption_properties) = file_decryption_properties {
+            let t_file_crypto_metadata: crate::format::FileCryptoMetaData =
+                crate::format::FileCryptoMetaData::read_from_in_protocol(&mut prot)
+                    .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
+            let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
+                EncryptionAlgorithm::AESGCMV1(algo) => algo.supply_aad_prefix,
+                _ => Some(false),
+            }
+            .unwrap_or(false);
+            if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
+                return Err(general_err!(
+                    "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
+                    but no AAD prefix was provided in the file decryption properties"
+                ));
+            }
+            let decryptor = get_file_decryptor(
+                t_file_crypto_metadata.encryption_algorithm,
+                t_file_crypto_metadata.key_metadata.as_deref(),
+                file_decryption_properties,
+            )?;
+            let footer_decryptor = decryptor.get_footer_decryptor();
+            let aad_footer = create_footer_aad(decryptor.file_aad())?;
+
+            decrypted_fmd_buf = footer_decryptor?
+                .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
+                .map_err(|_| {
+                    general_err!(
+                        "Provided footer key and AAD were unable to decrypt parquet footer"
+                    )
+                })?;
+            prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());
+
+            file_decryptor = Some(decryptor);
+        } else {
+            return Err(general_err!(
+                "Parquet file has an encrypted footer but decryption properties were not provided"
+            ));
+        }
+    }
+
+    use crate::format::FileMetaData as TFileMetaData;
+    let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
+        .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
+    let schema = types::from_thrift(&t_file_metadata.schema)?;
+    let schema_descr = Arc::new(SchemaDescriptor::new(schema));
+
+    if let (Some(algo), Some(file_decryption_properties)) = (
+        t_file_metadata.encryption_algorithm,
+        file_decryption_properties,
+    ) {
+        // File has a plaintext footer but encryption algorithm is set
+        let file_decryptor_value = get_file_decryptor(
+            algo,
+            t_file_metadata.footer_signing_key_metadata.as_deref(),
+            file_decryption_properties,
+        )?;
+        if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
+            file_decryptor_value.verify_plaintext_footer_signature(buf)?;
+        }
+        file_decryptor = Some(file_decryptor_value);
+    }
+
+    let mut row_groups = Vec::new();
+    for rg in t_file_metadata.row_groups {
+        let r = RowGroupMetaData::from_encrypted_thrift(
+            schema_descr.clone(),
+            rg,
+            file_decryptor.as_ref(),
+        )?;
+        row_groups.push(r);
+    }
+    let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;
+
+    let file_metadata = FileMetaData::new(
+        t_file_metadata.version,
+        t_file_metadata.num_rows,
+        t_file_metadata.created_by,
+        t_file_metadata.key_value_metadata,
+        schema_descr,
+        column_orders,
+    );
+    let mut metadata = ParquetMetaData::new(file_metadata, row_groups);
+
+    metadata.with_file_decryptor(file_decryptor);
+
+    Ok(metadata)
+}
+
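+/// Creates a [`FileDecryptor`] from the file's encryption algorithm, optional
+/// footer key metadata, and the user-supplied [`FileDecryptionProperties`].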
+#[cfg(feature = "encryption")]
+fn get_file_decryptor(
+    encryption_algorithm: EncryptionAlgorithm,
+    footer_key_metadata: Option<&[u8]>,
+    file_decryption_properties: &FileDecryptionProperties,
+) -> crate::errors::Result<FileDecryptor> {
+    match encryption_algorithm {
+        EncryptionAlgorithm::AESGCMV1(algo) => {
+            let aad_file_unique = algo
+                .aad_file_unique
+                .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
+            let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() {
+                aad_prefix.clone()
+            } else {
+                algo.aad_prefix.unwrap_or_default()
+            };
+
+            FileDecryptor::new(
+                file_decryption_properties,
+                footer_key_metadata,
+                aad_file_unique,
+                aad_prefix,
+            )
+        }
+        EncryptionAlgorithm::AESGCMCTRV1(_) => Err(nyi_err!(
+            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
+        )),
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::basic::{SortOrder, Type};
+    use crate::file::metadata::SchemaType;
+    use crate::format::ColumnOrder as TColumnOrder;
+    use crate::format::TypeDefinedOrder;
+
+    #[test]
+    fn test_metadata_column_orders_parse() {
+        // Define a simple schema; we do not need to provide logical types.
+        let fields = vec![
+            Arc::new(
+                SchemaType::primitive_type_builder("col1", Type::INT32)
+                    .build()
+                    .unwrap(),
+            ),
+            Arc::new(
+                SchemaType::primitive_type_builder("col2", Type::FLOAT)
+                    .build()
+                    .unwrap(),
+            ),
+        ];
+        let schema = SchemaType::group_type_builder("schema")
+            .with_fields(fields)
+            .build()
+            .unwrap();
+        let schema_descr = SchemaDescriptor::new(Arc::new(schema));
+
+        let t_column_orders = Some(vec![
+            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
+            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
+        ]);
+
+        assert_eq!(
+            parse_column_orders(t_column_orders, &schema_descr).unwrap(),
+            Some(vec![
+                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
+                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
+            ])
+        );
+
+        // Test when no column orders are defined.
+        assert_eq!(parse_column_orders(None, &schema_descr).unwrap(), None);
+    }
+
+    #[test]
+    fn test_metadata_column_orders_len_mismatch() {
+        let schema = SchemaType::group_type_builder("schema").build().unwrap();
+        let schema_descr = SchemaDescriptor::new(Arc::new(schema));
+
+        let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);
+
+        let res = parse_column_orders(t_column_orders, &schema_descr);
+        assert!(res.is_err());
+        assert!(format!("{:?}", res.unwrap_err()).contains("Column order length mismatch"));
+    }
+}
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index 8d92d1e0aa8d..92113f336e95 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -15,46 +15,43 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{io::Read, ops::Range, sync::Arc};
+use std::{io::Read, ops::Range};
 
-use crate::basic::ColumnOrder;
 #[cfg(feature = "encryption")]
-use crate::encryption::{
-    decrypt::{FileDecryptionProperties, FileDecryptor},
-    modules::create_footer_aad,
-};
-use bytes::Bytes;
-
+use crate::encryption::decrypt::FileDecryptionProperties;
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData};
-use crate::file::page_index::index::Index;
-use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
+use crate::file::metadata::ParquetMetaData;
+use crate::file::page_index::index_reader::acc_range;
 use crate::file::reader::ChunkReader;
 use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER};
-use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
-#[cfg(feature = "encryption")]
-use crate::format::{EncryptionAlgorithm, FileCryptoMetaData as TFileCryptoMetaData};
-use crate::schema::types;
-use crate::schema::types::SchemaDescriptor;
-use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
 
 #[cfg(all(feature = "async", feature = "arrow"))]
 use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
 #[cfg(feature = "encryption")]
-use crate::encryption::decrypt::CryptoContext;
-use crate::file::page_index::offset_index::OffsetIndexMetaData;
+use crate::file::metadata::parser::decode_metadata_with_encryption;
+use crate::file::metadata::parser::{decode_metadata, parse_column_index, parse_offset_index};
 
-/// Reads the [`ParquetMetaData`] from a byte stream.
+/// Reads [`ParquetMetaData`] from a byte stream, with either synchronous or
+/// asynchronous I/O.
+///
+/// There are two flavors of APIs:
+/// * Synchronous: [`Self::try_parse()`], [`Self::try_parse_sized()`], [`Self::parse_and_finish()`], etc.
+/// * Asynchronous (requires the `async` and `arrow` features): [`Self::try_load()`], etc.
+///
+/// See [`ParquetMetaDataPushDecoder`] for an API that does not require I/O.
 ///
-/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for a description of
-/// the Parquet metadata.
+/// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::push_decoder::ParquetMetaDataPushDecoder
 ///
-/// Parquet metadata is not necessarily contiguous in the files: part is stored
+/// # Format Notes
+///
+/// Parquet metadata is not necessarily contiguous in a Parquet file: a portion is stored
 /// in the footer (the last bytes of the file), but other portions (such as the
 /// PageIndex) can be stored elsewhere.
+/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for more details on
+/// the Parquet metadata layout.
 ///
 /// This reader handles reading the footer as well as the non contiguous parts
-/// of the metadata such as the page indexes; excluding Bloom Filters.
+/// of the metadata (`PageIndex` and `ColumnIndex`). It does not handle reading Bloom Filters.
 ///
 /// # Example
 /// ```no_run
@@ -243,6 +240,8 @@ impl ParquetMetaDataReader {
     ///     .with_page_indexes(true)
     ///     .parse_and_finish(&file).unwrap();
     /// ```
+    ///
+    /// [`Bytes`]: bytes::Bytes
     pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
         self.try_parse(reader)?;
         self.finish()
@@ -253,6 +252,8 @@ impl ParquetMetaDataReader {
     /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
     /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
     /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
+    ///
+    /// [`Bytes`]: bytes::Bytes
     pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
         self.try_parse_sized(reader, reader.len())
     }
@@ -329,6 +330,8 @@ impl ParquetMetaDataReader {
     ///     }
     /// }
     /// let metadata = reader.finish().unwrap();
     /// ```
+    ///
+    /// [`Bytes`]: bytes::Bytes
     pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
         self.metadata = match self.parse_metadata(reader) {
             Ok(metadata) => Some(metadata),
@@ -369,22 +372,24 @@ impl ParquetMetaDataReader {
     /// a [`Bytes`] struct containing the tail of the file).
     /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`]. Like
    /// [`Self::try_parse_sized()`] this function may return [`ParquetError::NeedMoreData`].
+    ///
+    /// [`Bytes`]: bytes::Bytes
     pub fn read_page_indexes_sized<R: ChunkReader>(
         &mut self,
         reader: &R,
         file_size: u64,
     ) -> Result<()> {
-        if self.metadata.is_none() {
-            return Err(general_err!(
-                "Tried to read page indexes without ParquetMetaData metadata"
-            ));
-        }
-
         // Get bounds needed for page indexes (if any are present in the file).
         let Some(range) = self.range_for_page_index() else {
            return Ok(());
         };
 
+        let Some(metadata) = self.metadata.as_mut() else {
+            return Err(general_err!(
+                "Tried to read page indexes without ParquetMetaData metadata"
+            ));
+        };
+
         // Check to see if needed range is within `file_range`. Checking `range.end` seems
         // redundant, but it guards against `range_for_page_index()` returning garbage.
         let file_range = file_size.saturating_sub(reader.len())..file_size;
@@ -417,8 +422,8 @@ impl ParquetMetaDataReader {
         let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;
         let offset = range.start;
 
-        self.parse_column_index(&bytes, offset)?;
-        self.parse_offset_index(&bytes, offset)?;
+        parse_column_index(metadata, self.column_index, &bytes, offset)?;
+        parse_offset_index(metadata, self.offset_index, &bytes, offset)?;
 
         Ok(())
     }
@@ -507,17 +512,15 @@ impl ParquetMetaDataReader {
     async fn load_page_index_with_remainder<F: MetadataFetch>(
         &mut self,
         mut fetch: F,
-        remainder: Option<(usize, Bytes)>,
+        remainder: Option<(usize, bytes::Bytes)>,
     ) -> Result<()> {
-        if self.metadata.is_none() {
-            return Err(general_err!("Footer metadata is not present"));
-        }
-
         // Get bounds needed for page indexes (if any are present in the file).
-        let range = self.range_for_page_index();
-        let range = match range {
-            Some(range) => range,
-            None => return Ok(()),
+        let Some(range) = self.range_for_page_index() else {
+            return Ok(());
+        };
+
+        let Some(metadata) = self.metadata.as_mut() else {
+            return Err(general_err!("Footer metadata is not present"));
         };
 
         let bytes = match &remainder {
@@ -535,168 +538,12 @@ impl ParquetMetaDataReader {
         // Sanity check
         assert_eq!(bytes.len() as u64, range.end - range.start);
 
-        self.parse_column_index(&bytes, range.start)?;
-        self.parse_offset_index(&bytes, range.start)?;
-
-        Ok(())
-    }
-
-    fn parse_column_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
-        let metadata = self.metadata.as_mut().unwrap();
-        if self.column_index != PageIndexPolicy::Skip {
-            let index = metadata
-                .row_groups()
-                .iter()
-                .enumerate()
-                .map(|(rg_idx, x)| {
-                    x.columns()
-                        .iter()
-                        .enumerate()
-                        .map(|(col_idx, c)| match c.column_index_range() {
-                            Some(r) => {
-                                let r_start = usize::try_from(r.start - start_offset)?;
-                                let r_end = usize::try_from(r.end - start_offset)?;
-                                Self::parse_single_column_index(
-                                    &bytes[r_start..r_end],
-                                    metadata,
-                                    c,
-                                    rg_idx,
-                                    col_idx,
-                                )
-                            }
-                            None => Ok(Index::NONE),
-                        })
-                        .collect::<Result<Vec<_>>>()
-                })
-                .collect::<Result<Vec<_>>>()?;
-
-            metadata.set_column_index(Some(index));
-        }
-        Ok(())
-    }
+        parse_column_index(metadata, self.column_index, &bytes, range.start)?;
+        parse_offset_index(metadata, self.offset_index, &bytes, range.start)?;
 
-    #[cfg(feature = "encryption")]
-    fn parse_single_column_index(
-        bytes: &[u8],
-        metadata: &ParquetMetaData,
-        column: &ColumnChunkMetaData,
-        row_group_index: usize,
-        col_index: usize,
-    ) -> Result<Index> {
-        match &column.column_crypto_metadata {
-            Some(crypto_metadata) => {
-                let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
-                    general_err!("Cannot decrypt column index, no file decryptor set")
-                })?;
-                let crypto_context = CryptoContext::for_column(
-                    file_decryptor,
-                    crypto_metadata,
-                    row_group_index,
-                    col_index,
-                )?;
-                let column_decryptor = crypto_context.metadata_decryptor();
-                let aad = crypto_context.create_column_index_aad()?;
-                let plaintext = column_decryptor.decrypt(bytes, &aad)?;
-                decode_column_index(&plaintext, column.column_type())
-            }
-            None => decode_column_index(bytes, column.column_type()),
-        }
-    }
-
-    #[cfg(not(feature = "encryption"))]
-    fn parse_single_column_index(
-        bytes: &[u8],
-        _metadata: &ParquetMetaData,
-        column: &ColumnChunkMetaData,
-        _row_group_index: usize,
-        _col_index: usize,
-    ) -> Result<Index> {
-        decode_column_index(bytes, column.column_type())
-    }
-
-    fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
-        let metadata = self.metadata.as_mut().unwrap();
-        if self.offset_index != PageIndexPolicy::Skip {
-            let row_groups = metadata.row_groups();
-            let mut all_indexes = Vec::with_capacity(row_groups.len());
-            for (rg_idx, x) in row_groups.iter().enumerate() {
-                let mut row_group_indexes = Vec::with_capacity(x.columns().len());
-                for (col_idx, c) in x.columns().iter().enumerate() {
-                    let result = match c.offset_index_range() {
-                        Some(r) => {
-                            let r_start = usize::try_from(r.start - start_offset)?;
-                            let r_end = usize::try_from(r.end - start_offset)?;
-                            Self::parse_single_offset_index(
-                                &bytes[r_start..r_end],
-                                metadata,
-                                c,
-                                rg_idx,
-                                col_idx,
-                            )
-                        }
-                        None => Err(general_err!("missing offset index")),
-                    };
-
-                    match result {
-                        Ok(index) => row_group_indexes.push(index),
-                        Err(e) => {
-                            if self.offset_index == PageIndexPolicy::Required {
-                                return Err(e);
-                            } else {
-                                // Invalidate and return
-                                metadata.set_column_index(None);
-                                metadata.set_offset_index(None);
-                                return Ok(());
-                            }
-                        }
-                    }
-                }
-                all_indexes.push(row_group_indexes);
-            }
-            metadata.set_offset_index(Some(all_indexes));
-        }
         Ok(())
     }
 
-    #[cfg(feature = "encryption")]
-    fn parse_single_offset_index(
-        bytes: &[u8],
-        metadata: &ParquetMetaData,
-        column: &ColumnChunkMetaData,
-        row_group_index: usize,
-        col_index: usize,
-    ) -> Result<OffsetIndexMetaData> {
-        match &column.column_crypto_metadata {
-            Some(crypto_metadata) => {
-                let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
-                    general_err!("Cannot decrypt offset index, no file decryptor set")
-                })?;
-                let crypto_context = CryptoContext::for_column(
-                    file_decryptor,
-                    crypto_metadata,
-                    row_group_index,
-                    col_index,
-                )?;
-                let column_decryptor = crypto_context.metadata_decryptor();
-                let aad = crypto_context.create_offset_index_aad()?;
-                let plaintext = column_decryptor.decrypt(bytes, &aad)?;
-                decode_offset_index(&plaintext)
-            }
-            None => decode_offset_index(bytes),
-        }
-    }
-
-    #[cfg(not(feature = "encryption"))]
-    fn parse_single_offset_index(
-        bytes: &[u8],
-        _metadata: &ParquetMetaData,
-        _column: &ColumnChunkMetaData,
-        _row_group_index: usize,
-        _col_index: usize,
-    ) -> Result<OffsetIndexMetaData> {
-        decode_offset_index(bytes)
-    }
-
     fn range_for_page_index(&self) -> Option<Range<u64>> {
         // sanity check
         self.metadata.as_ref()?;
@@ -763,7 +610,7 @@ impl ParquetMetaDataReader {
         &self,
         fetch: &mut F,
         file_size: u64,
-    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
+    ) -> Result<(ParquetMetaData, Option<(usize, bytes::Bytes)>)> {
         let prefetch = self.get_prefetch_size() as u64;
 
         if file_size < FOOTER_SIZE as u64 {
@@ -825,7 +672,7 @@ impl ParquetMetaDataReader {
     async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
         &self,
         fetch: &mut F,
-    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
+    ) -> Result<(ParquetMetaData, Option<(usize, bytes::Bytes)>)> {
         let prefetch = self.get_prefetch_size();
 
         let suffix = fetch.fetch_suffix(prefetch as _).await?;
@@ -914,6 +761,8 @@ impl ParquetMetaDataReader {
     /// file. The format of `buf` is the Thrift compact binary protocol, as specified
     /// by the [Parquet Spec].
     ///
+    /// `buf` does **NOT** include the 8-byte footer.
+    ///
     /// This method handles using either `decode_metadata` or
     /// `decode_metadata_with_encryption` depending on whether the encryption
     /// feature is enabled.
@@ -925,7 +774,7 @@ impl ParquetMetaDataReader {
         footer_tail: &FooterTail,
     ) -> Result<ParquetMetaData> {
         #[cfg(feature = "encryption")]
-        let result = Self::decode_metadata_with_encryption(
+        let result = decode_metadata_with_encryption(
             buf,
             footer_tail.is_encrypted_footer(),
             self.file_decryption_properties.as_ref(),
@@ -943,112 +792,6 @@ impl ParquetMetaDataReader {
         result
     }
 
-    /// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata that may be encrypted.
-    ///
-    /// Typically this is used to decode the metadata from the end of a parquet
-    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
-    /// by the [Parquet Spec]. Buffer can be encrypted with AES GCM or AES CTR
-    /// ciphers as specfied in the [Parquet Encryption Spec].
-    ///
-    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
-    /// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/
-    #[cfg(feature = "encryption")]
-    fn decode_metadata_with_encryption(
-        buf: &[u8],
-        encrypted_footer: bool,
-        file_decryption_properties: Option<&FileDecryptionProperties>,
-    ) -> Result<ParquetMetaData> {
-        let mut prot = TCompactSliceInputProtocol::new(buf);
-        let mut file_decryptor = None;
-        let decrypted_fmd_buf;
-
-        if encrypted_footer {
-            if let Some(file_decryption_properties) = file_decryption_properties {
-                let t_file_crypto_metadata: TFileCryptoMetaData =
-                    TFileCryptoMetaData::read_from_in_protocol(&mut prot)
-                        .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
-                let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
-                    EncryptionAlgorithm::AESGCMV1(algo) => algo.supply_aad_prefix,
-                    _ => Some(false),
-                }
-                .unwrap_or(false);
-                if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
-                    return Err(general_err!(
-                        "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
-                        but no AAD prefix was provided in the file decryption properties"
-                    ));
-                }
-                let decryptor = get_file_decryptor(
-                    t_file_crypto_metadata.encryption_algorithm,
-                    t_file_crypto_metadata.key_metadata.as_deref(),
-                    file_decryption_properties,
-                )?;
-                let footer_decryptor = decryptor.get_footer_decryptor();
-                let aad_footer = create_footer_aad(decryptor.file_aad())?;
-
-                decrypted_fmd_buf = footer_decryptor?
-                    .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
-                    .map_err(|_| {
-                        general_err!(
-                            "Provided footer key and AAD were unable to decrypt parquet footer"
-                        )
-                    })?;
-                prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());
-
-                file_decryptor = Some(decryptor);
-            } else {
-                return Err(general_err!("Parquet file has an encrypted footer but decryption properties were not provided"));
-            }
-        }
-
-        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
-            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
-        let schema = types::from_thrift(&t_file_metadata.schema)?;
-        let schema_descr = Arc::new(SchemaDescriptor::new(schema));
-
-        if let (Some(algo), Some(file_decryption_properties)) = (
-            t_file_metadata.encryption_algorithm,
-            file_decryption_properties,
-        ) {
-            // File has a plaintext footer but encryption algorithm is set
-            let file_decryptor_value = get_file_decryptor(
-                algo,
-                t_file_metadata.footer_signing_key_metadata.as_deref(),
-                file_decryption_properties,
-            )?;
-            if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
-                file_decryptor_value.verify_plaintext_footer_signature(buf)?;
-            }
-            file_decryptor = Some(file_decryptor_value);
-        }
-
-        let mut row_groups = Vec::new();
-        for rg in t_file_metadata.row_groups {
-            let r = RowGroupMetaData::from_encrypted_thrift(
-                schema_descr.clone(),
-                rg,
-                file_decryptor.as_ref(),
-            )?;
-            row_groups.push(r);
-        }
-        let column_orders =
-            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;
-
-        let file_metadata = FileMetaData::new(
-            t_file_metadata.version,
-            t_file_metadata.num_rows,
-            t_file_metadata.created_by,
-            t_file_metadata.key_value_metadata,
-            schema_descr,
-            column_orders,
-        );
-        let mut metadata = ParquetMetaData::new(file_metadata, row_groups);
-
-        metadata.with_file_decryptor(file_decryptor);
-
-        Ok(metadata)
-    }
-
     /// Decodes [`ParquetMetaData`] from the provided bytes.
     ///
     /// Typically this is used to decode the metadata from the end of a parquet
@@ -1057,105 +800,18 @@ impl ParquetMetaDataReader {
     ///
     /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
     pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
-        let mut prot = TCompactSliceInputProtocol::new(buf);
-
-        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
-            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
-        let schema = types::from_thrift(&t_file_metadata.schema)?;
-        let schema_descr = Arc::new(SchemaDescriptor::new(schema));
-
-        let mut row_groups = Vec::new();
-        for rg in t_file_metadata.row_groups {
-            row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
-        }
-        let column_orders =
-            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;
-
-        let file_metadata = FileMetaData::new(
-            t_file_metadata.version,
-            t_file_metadata.num_rows,
-            t_file_metadata.created_by,
-            t_file_metadata.key_value_metadata,
-            schema_descr,
-            column_orders,
-        );
-
-        Ok(ParquetMetaData::new(file_metadata, row_groups))
-    }
-
-    /// Parses column orders from Thrift definition.
-    /// If no column orders are defined, returns `None`.
-    fn parse_column_orders(
-        t_column_orders: Option<Vec<TColumnOrder>>,
-        schema_descr: &SchemaDescriptor,
-    ) -> Result<Option<Vec<ColumnOrder>>> {
-        match t_column_orders {
-            Some(orders) => {
-                // Should always be the case
-                if orders.len() != schema_descr.num_columns() {
-                    return Err(general_err!("Column order length mismatch"));
-                };
-                let mut res = Vec::new();
-                for (i, column) in schema_descr.columns().iter().enumerate() {
-                    match orders[i] {
-                        TColumnOrder::TYPEORDER(_) => {
-                            let sort_order = ColumnOrder::get_sort_order(
-                                column.logical_type(),
-                                column.converted_type(),
-                                column.physical_type(),
-                            );
-                            res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
-                        }
-                    }
-                }
-                Ok(Some(res))
-            }
-            None => Ok(None),
-        }
-    }
-}
-
-#[cfg(feature = "encryption")]
-fn get_file_decryptor(
-    encryption_algorithm: EncryptionAlgorithm,
-    footer_key_metadata: Option<&[u8]>,
-    file_decryption_properties: &FileDecryptionProperties,
-) -> Result<FileDecryptor> {
-    match encryption_algorithm {
-        EncryptionAlgorithm::AESGCMV1(algo) => {
-            let aad_file_unique = algo
-                .aad_file_unique
-                .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
-            let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() {
-                aad_prefix.clone()
-            } else {
-                algo.aad_prefix.unwrap_or_default()
-            };
-
-            FileDecryptor::new(
-                file_decryption_properties,
-                footer_key_metadata,
-                aad_file_unique,
-                aad_prefix,
-            )
-        }
-        EncryptionAlgorithm::AESGCMCTRV1(_) => Err(nyi_err!(
-            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
-        )),
+        // Note this API does not support encryption.
+        decode_metadata(buf)
     }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
-    use bytes::Bytes;
-
-    use crate::basic::SortOrder;
-    use crate::basic::Type;
     use crate::file::reader::Length;
-    use crate::format::TypeDefinedOrder;
-    use crate::schema::types::Type as SchemaType;
     use crate::util::test_common::file_util::get_test_file;
+    use bytes::Bytes;
+    use std::ops::Range;
 
     #[test]
     fn test_parse_metadata_size_smaller_than_footer() {
@@ -1185,59 +841,6 @@ mod tests {
         assert!(matches!(err, ParquetError::NeedMoreData(263)));
     }
 
-    #[test]
-    fn test_metadata_column_orders_parse() {
-        // Define simple schema, we do not need to provide logical types.
-        let fields = vec![
-            Arc::new(
-                SchemaType::primitive_type_builder("col1", Type::INT32)
-                    .build()
-                    .unwrap(),
-            ),
-            Arc::new(
-                SchemaType::primitive_type_builder("col2", Type::FLOAT)
-                    .build()
-                    .unwrap(),
-            ),
-        ];
-        let schema = SchemaType::group_type_builder("schema")
-            .with_fields(fields)
-            .build()
-            .unwrap();
-        let schema_descr = SchemaDescriptor::new(Arc::new(schema));
-
-        let t_column_orders = Some(vec![
-            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
-            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
-        ]);
-
-        assert_eq!(
-            ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr).unwrap(),
-            Some(vec![
-                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
-                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
-            ])
-        );
-
-        // Test when no column orders are defined.
-        assert_eq!(
-            ParquetMetaDataReader::parse_column_orders(None, &schema_descr).unwrap(),
-            None
-        );
-    }
-
-    #[test]
-    fn test_metadata_column_orders_len_mismatch() {
-        let schema = SchemaType::group_type_builder("schema").build().unwrap();
-        let schema_descr = SchemaDescriptor::new(Arc::new(schema));
-
-        let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);
-
-        let res = ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr);
-        assert!(res.is_err());
-        assert!(format!("{:?}", res.unwrap_err()).contains("Column order length mismatch"));
-    }
-
     #[test]
     #[allow(deprecated)]
     fn test_try_parse() {
@@ -1369,6 +972,7 @@ mod async_tests {
     use std::io::{Read, Seek, SeekFrom};
     use std::ops::Range;
     use std::sync::atomic::{AtomicUsize, Ordering};
+    use std::sync::Arc;
     use tempfile::NamedTempFile;
 
     use crate::arrow::ArrowWriter;
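The refactor is intended to be behavior-preserving: the parsing logic simply moves from `reader.rs` into the new `parser.rs` module, and the public entry points are unchanged. A minimal end-to-end sketch of that unchanged API, based on the doc example already in this diff (illustrative only; `data.parquet` is a placeholder path):

```rust
use parquet::file::metadata::ParquetMetaDataReader;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // `std::fs::File` implements `ChunkReader`, so it can be used directly
    let file = std::fs::File::open("data.parquet")?;

    // Read the footer metadata and also load the page indexes
    // (ColumnIndex / OffsetIndex), which may be stored outside the footer
    let metadata = ParquetMetaDataReader::new()
        .with_page_indexes(true)
        .parse_and_finish(&file)?;

    println!("row groups: {}", metadata.num_row_groups());
    Ok(())
}
```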