diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index 8b06fe676308..0c4372e38683 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -97,10 +97,7 @@ pub(crate) mod thrift_gen; mod writer; #[cfg(feature = "encryption")] -use crate::encryption::{ - decrypt::FileDecryptor, - modules::{create_module_aad, ModuleType}, -}; +use crate::encryption::decrypt::FileDecryptor; #[cfg(feature = "encryption")] use crate::file::column_crypto_metadata::{self, ColumnCryptoMetaData}; pub(crate) use crate::file::metadata::memory::HeapSize; @@ -117,8 +114,6 @@ use crate::schema::types::{ ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, SchemaDescriptor, Type as SchemaType, }; -#[cfg(feature = "encryption")] -use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; use crate::{ basic::BoundaryOrder, errors::{ParquetError, Result}, @@ -684,93 +679,6 @@ impl RowGroupMetaData { self.file_offset } - /// Method to convert from encrypted Thrift. - #[cfg(feature = "encryption")] - fn from_encrypted_thrift( - schema_descr: SchemaDescPtr, - mut rg: crate::format::RowGroup, - decryptor: Option<&FileDecryptor>, - ) -> Result<RowGroupMetaData> { - if schema_descr.num_columns() != rg.columns.len() { - return Err(general_err!( - "Column count mismatch. Schema has {} columns while Row Group has {}", - schema_descr.num_columns(), - rg.columns.len() - )); - } - let total_byte_size = rg.total_byte_size; - let num_rows = rg.num_rows; - let mut columns = vec![]; - - for (i, (mut c, d)) in rg - .columns - .drain(0..) - .zip(schema_descr.columns()) - .enumerate() - { - // Read encrypted metadata if it's present and we have a decryptor. - if let (true, Some(decryptor)) = (c.encrypted_column_metadata.is_some(), decryptor) { - let column_decryptor = match c.crypto_metadata.as_ref() { - None => { - return Err(general_err!( - "No crypto_metadata is set for column '{}', which has encrypted metadata", - d.path().string() - )); - } - Some(TColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(crypto_metadata)) => { - let column_name = crypto_metadata.path_in_schema.join("."); - decryptor.get_column_metadata_decryptor( - column_name.as_str(), - crypto_metadata.key_metadata.as_deref(), - )? - } - Some(TColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) => { - decryptor.get_footer_decryptor()? - } - }; - - let column_aad = create_module_aad( - decryptor.file_aad(), - ModuleType::ColumnMetaData, - rg.ordinal.unwrap() as usize, - i, - None, - )?; - - let buf = c.encrypted_column_metadata.clone().unwrap(); - let decrypted_cc_buf = column_decryptor - .decrypt(buf.as_slice(), column_aad.as_ref()) - .map_err(|_| { - general_err!( - "Unable to decrypt column '{}', perhaps the column key is wrong?", - d.path().string() - ) - })?; - - let mut prot = TCompactSliceInputProtocol::new(decrypted_cc_buf.as_slice()); - c.meta_data = Some(crate::format::ColumnMetaData::read_from_in_protocol( - &mut prot, - )?); - } - columns.push(ColumnChunkMetaData::from_thrift(d.clone(), c)?); - } - - let sorting_columns = rg.sorting_columns.map(|scs| { - scs.iter() - .map(|sc| sc.into()) - .collect::<Vec<SortingColumn>>() - }); - Ok(RowGroupMetaData { - columns, - num_rows, - sorting_columns, - total_byte_size, - schema_descr, - file_offset: rg.file_offset, - ordinal: rg.ordinal, - }) - } - - /// Method to convert from Thrift.
pub fn from_thrift( schema_descr: SchemaDescPtr, diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 57cc7c57ac66..7ab2db2f7ff3 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -15,32 +15,19 @@ // specific language governing permissions and limitations // under the License. -use std::{io::Read, ops::Range, sync::Arc}; +use std::{io::Read, ops::Range}; -use crate::{ - basic::ColumnOrder, - file::metadata::{FileMetaData, KeyValue}, - parquet_thrift::ThriftCompactInputProtocol, -}; #[cfg(feature = "encryption")] -use crate::{ - encryption::{ - decrypt::{CryptoContext, FileDecryptionProperties, FileDecryptor}, - modules::create_footer_aad, - }, - format::{EncryptionAlgorithm, FileCryptoMetaData as TFileCryptoMetaData}, -}; +use crate::encryption::decrypt::{CryptoContext, FileDecryptionProperties}; +use crate::parquet_thrift::ThriftCompactInputProtocol; use bytes::Bytes; use crate::errors::{ParquetError, Result}; -use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, RowGroupMetaData}; +use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData}; use crate::file::page_index::column_index::ColumnIndexMetaData; use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index}; use crate::file::reader::ChunkReader; use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER}; -use crate::schema::types; -use crate::schema::types::SchemaDescriptor; -use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; #[cfg(all(feature = "async", feature = "arrow"))] use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch}; @@ -960,101 +947,11 @@ impl ParquetMetaDataReader { encrypted_footer: bool, file_decryption_properties: Option<&FileDecryptionProperties>, ) -> Result<ParquetMetaData> { - let mut prot = TCompactSliceInputProtocol::new(buf); - let mut file_decryptor = None; - let decrypted_fmd_buf; - - if encrypted_footer { - if let Some(file_decryption_properties) = file_decryption_properties { - let t_file_crypto_metadata: TFileCryptoMetaData = - TFileCryptoMetaData::read_from_in_protocol(&mut prot) - .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?; - let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm { - EncryptionAlgorithm::AESGCMV1(algo) => algo.supply_aad_prefix, - _ => Some(false), - } - .unwrap_or(false); - if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() { - return Err(general_err!( - "Parquet file was encrypted with an AAD prefix that is not stored in the file, \ - but no AAD prefix was provided in the file decryption properties" - )); - } - let decryptor = get_file_decryptor( - t_file_crypto_metadata.encryption_algorithm, - t_file_crypto_metadata.key_metadata.as_deref(), - file_decryption_properties, - )?; - let footer_decryptor = decryptor.get_footer_decryptor(); - let aad_footer = create_footer_aad(decryptor.file_aad())?; - - decrypted_fmd_buf = footer_decryptor?
- .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref()) - .map_err(|_| { - general_err!( - "Provided footer key and AAD were unable to decrypt parquet footer" - ) - })?; - prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref()); - - file_decryptor = Some(decryptor); - } else { - return Err(general_err!("Parquet file has an encrypted footer but decryption properties were not provided")); - } - } - - let t_file_metadata = crate::format::FileMetaData::read_from_in_protocol(&mut prot) - .map_err(|e| general_err!("Could not parse metadata: {}", e))?; - let schema = types::from_thrift(&t_file_metadata.schema)?; - let schema_descr = Arc::new(SchemaDescriptor::new(schema)); - - if let (Some(algo), Some(file_decryption_properties)) = ( - t_file_metadata.encryption_algorithm, + super::thrift_gen::parquet_metadata_with_encryption( file_decryption_properties, - ) { - // File has a plaintext footer but encryption algorithm is set - let file_decryptor_value = get_file_decryptor( - algo, - t_file_metadata.footer_signing_key_metadata.as_deref(), - file_decryption_properties, - )?; - if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer { - file_decryptor_value.verify_plaintext_footer_signature(buf)?; - } - file_decryptor = Some(file_decryptor_value); - } - - let mut row_groups = Vec::new(); - for rg in t_file_metadata.row_groups { - let r = RowGroupMetaData::from_encrypted_thrift( - schema_descr.clone(), - rg, - file_decryptor.as_ref(), - )?; - row_groups.push(r); - } - let column_orders = - Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?; - - let key_value_metadata = t_file_metadata.key_value_metadata.map(|vkv| { - vkv.into_iter() - .map(|kv| KeyValue::new(kv.key, kv.value)) - .collect::<Vec<KeyValue>>() - }); - - let file_metadata = FileMetaData::new( - t_file_metadata.version, - t_file_metadata.num_rows, - t_file_metadata.created_by, - key_value_metadata, - schema_descr, - column_orders, - ); - let mut metadata = ParquetMetaData::new(file_metadata, row_groups); - - metadata.with_file_decryptor(file_decryptor); - - Ok(metadata) + encrypted_footer, + buf, + ) } /// Decodes [`ParquetMetaData`] from the provided bytes.
@@ -1065,116 +962,17 @@ impl ParquetMetaDataReader { /// /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> { - let mut prot = TCompactSliceInputProtocol::new(buf); - - let t_file_metadata = crate::format::FileMetaData::read_from_in_protocol(&mut prot) - .map_err(|e| general_err!("Could not parse metadata: {}", e))?; - let schema = types::from_thrift(&t_file_metadata.schema)?; - let schema_descr = Arc::new(SchemaDescriptor::new(schema)); - - let mut row_groups = Vec::new(); - for rg in t_file_metadata.row_groups { - row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?); - } - let column_orders = - Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?; - - let key_value_metadata = t_file_metadata.key_value_metadata.map(|vkv| { - vkv.into_iter() - .map(|kv| KeyValue::new(kv.key, kv.value)) - .collect::<Vec<KeyValue>>() - }); - - let file_metadata = FileMetaData::new( - t_file_metadata.version, - t_file_metadata.num_rows, - t_file_metadata.created_by, - key_value_metadata, - schema_descr, - column_orders, - ); - - Ok(ParquetMetaData::new(file_metadata, row_groups)) - } - - /// create meta data from thrift encoded bytes - pub fn decode_file_metadata(buf: &[u8]) -> Result<ParquetMetaData> { let mut prot = ThriftCompactInputProtocol::new(buf); ParquetMetaData::try_from(&mut prot) } - - /// Parses column orders from Thrift definition. - /// If no column orders are defined, returns `None`. - fn parse_column_orders( - t_column_orders: Option<Vec<crate::format::ColumnOrder>>, - schema_descr: &SchemaDescriptor, - ) -> Result<Option<Vec<ColumnOrder>>> { - match t_column_orders { - Some(orders) => { - // Should always be the case - if orders.len() != schema_descr.num_columns() { - return Err(general_err!("Column order length mismatch")); - }; - let mut res = Vec::new(); - for (i, column) in schema_descr.columns().iter().enumerate() { - match orders[i] { - crate::format::ColumnOrder::TYPEORDER(_) => { - let sort_order = ColumnOrder::get_sort_order( - column.logical_type(), - column.converted_type(), - column.physical_type(), - ); - res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order)); - } - } - } - Ok(Some(res)) - } - None => Ok(None), - } - } -} - -#[cfg(feature = "encryption")] -fn get_file_decryptor( - encryption_algorithm: EncryptionAlgorithm, - footer_key_metadata: Option<&[u8]>, - file_decryption_properties: &FileDecryptionProperties, -) -> Result<FileDecryptor> { - match encryption_algorithm { - EncryptionAlgorithm::AESGCMV1(algo) => { - let aad_file_unique = algo - .aad_file_unique - .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?; - let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() { - aad_prefix.clone() - } else { - algo.aad_prefix.unwrap_or_default() - }; - - FileDecryptor::new( - file_decryption_properties, - footer_key_metadata, - aad_file_unique, - aad_prefix, - ) - } - EncryptionAlgorithm::AESGCMCTRV1(_) => Err(nyi_err!( - "The AES_GCM_CTR_V1 encryption algorithm is not yet supported" - )), - } } #[cfg(test)] mod tests { use super::*; use bytes::Bytes; - use zstd::zstd_safe::WriteBuf; - use crate::basic::SortOrder; - use crate::basic::Type; use crate::file::reader::Length; - use crate::schema::types::Type as SchemaType; use crate::util::test_common::file_util::get_test_file; #[test] @@ -1205,61 +1003,6 @@ mod tests { assert!(matches!(err, ParquetError::NeedMoreData(263))); } - #[test] - fn test_metadata_column_orders_parse() { - // Define simple schema, we do not need to provide logical types.
- let fields = vec![ - Arc::new( - SchemaType::primitive_type_builder("col1", Type::INT32) - .build() - .unwrap(), - ), - Arc::new( - SchemaType::primitive_type_builder("col2", Type::FLOAT) - .build() - .unwrap(), - ), - ]; - let schema = SchemaType::group_type_builder("schema") - .with_fields(fields) - .build() - .unwrap(); - let schema_descr = SchemaDescriptor::new(Arc::new(schema)); - - let t_column_orders = Some(vec![ - crate::format::ColumnOrder::TYPEORDER(Default::default()), - crate::format::ColumnOrder::TYPEORDER(Default::default()), - ]); - - assert_eq!( - ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr).unwrap(), - Some(vec![ - ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED), - ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED) - ]) - ); - - // Test when no column orders are defined. - assert_eq!( - ParquetMetaDataReader::parse_column_orders(None, &schema_descr).unwrap(), - None - ); - } - - #[test] - fn test_metadata_column_orders_len_mismatch() { - let schema = SchemaType::group_type_builder("schema").build().unwrap(); - let schema_descr = SchemaDescriptor::new(Arc::new(schema)); - - let t_column_orders = Some(vec![crate::format::ColumnOrder::TYPEORDER( - Default::default(), - )]); - - let res = ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr); - assert!(res.is_err()); - assert!(format!("{:?}", res.unwrap_err()).contains("Column order length mismatch")); - } - #[test] #[allow(deprecated)] fn test_try_parse() { @@ -1374,27 +1117,6 @@ mod tests { "EOF: Parquet file too small. Size is 1728 but need 1729" ); } - - #[test] - fn test_new_decoder() { - let file = get_test_file("alltypes_tiny_pages.parquet"); - let len = file.len(); - - // read entire file - let bytes = file.get_bytes(0, len as usize).unwrap(); - let mut footer = [0u8; FOOTER_SIZE]; - footer.copy_from_slice(bytes.slice(len as usize - FOOTER_SIZE..).as_slice()); - let tail = ParquetMetaDataReader::decode_footer_tail(&footer).unwrap(); - let meta_len = tail.metadata_length(); - let metadata_bytes = bytes.slice(len as usize - FOOTER_SIZE - meta_len..); - - // get ParquetMetaData - let m = ParquetMetaDataReader::decode_file_metadata(&metadata_bytes).unwrap(); - let m2 = ParquetMetaDataReader::decode_metadata(&metadata_bytes).unwrap(); - - // check that metadatas are equivalent - assert_eq!(m, m2); - } } #[cfg(all(feature = "async", feature = "arrow", test))] @@ -1412,6 +1134,7 @@ mod async_tests { use std::io::{Read, Seek, SeekFrom}; use std::ops::Range; use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; use tempfile::NamedTempFile; use crate::arrow::ArrowWriter; diff --git a/parquet/src/file/metadata/thrift_gen.rs b/parquet/src/file/metadata/thrift_gen.rs index f15a5a6b16d8..06229fb1812f 100644 --- a/parquet/src/file/metadata/thrift_gen.rs +++ b/parquet/src/file/metadata/thrift_gen.rs @@ -20,10 +20,10 @@ use std::io::Write; use std::sync::Arc; -#[cfg(feature = "encryption")] -use crate::file::column_crypto_metadata::ColumnCryptoMetaData; use crate::{ - basic::{ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, Type}, + basic::{ + ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, PageType, Repetition, Type, + }, data_type::{ByteArray, FixedLenByteArray, Int96}, errors::{ParquetError, Result}, file::{ @@ -39,9 +39,15 @@ use crate::{ WriteThrift, WriteThriftField, }, schema::types::{parquet_schema_from_array, ColumnDescriptor, SchemaDescriptor}, - thrift_struct, + thrift_struct, thrift_union, 
util::bit_util::FromBytes, }; +#[cfg(feature = "encryption")] +use crate::{ + encryption::decrypt::{FileDecryptionProperties, FileDecryptor}, + file::column_crypto_metadata::ColumnCryptoMetaData, + schema::types::SchemaDescPtr, +}; // this needs to be visible to the schema conversion code thrift_struct!( @@ -60,6 +66,153 @@ pub(crate) struct SchemaElement<'a> { } ); +thrift_struct!( +pub(crate) struct DataPageHeader { + /// Number of values, including NULLs, in this data page. + /// + /// If an OffsetIndex is present, a page must begin at a row + /// boundary (repetition_level = 0). Otherwise, pages may begin + /// within a row (repetition_level > 0). + 1: required i32 num_values + + /// Encoding used for this data page + 2: required Encoding encoding + + /// Encoding used for definition levels + 3: required Encoding definition_level_encoding; + + /// Encoding used for repetition levels + 4: required Encoding repetition_level_encoding; + + // Optional statistics for the data in this page + // page stats are pretty useless...let's ignore them + //5: optional Statistics statistics; +} +); + +thrift_struct!( + pub(crate) struct IndexPageHeader {} +); + +thrift_struct!( +pub(crate) struct DictionaryPageHeader { + /// Number of values in the dictionary + 1: required i32 num_values; + + /// Encoding using this dictionary page + 2: required Encoding encoding + + /// If true, the entries in the dictionary are sorted in ascending order + 3: optional bool is_sorted; +} +); + +thrift_struct!( +pub(crate) struct DataPageHeaderV2 { + /// Number of values, including NULLs, in this data page. + 1: required i32 num_values + /// Number of NULL values, in this data page. + /// Number of non-null = num_values - num_nulls which is also the number of values in the data section + 2: required i32 num_nulls + /// Number of rows in this data page. Every page must begin at a + /// row boundary (repetition_level = 0): rows must **not** be + /// split across page boundaries when using V2 data pages. + 3: required i32 num_rows + /// Encoding used for data in this page + 4: required Encoding encoding + + // repetition levels and definition levels are always using RLE (without size in it) + + /// Length of the definition levels + 5: required i32 definition_levels_byte_length; + /// Length of the repetition levels + 6: required i32 repetition_levels_byte_length; + + /// Whether the values are compressed. + /// Which means the section of the page between + /// definition_levels_byte_length + repetition_levels_byte_length + 1 and compressed_page_size (included) + /// is compressed with the compression_codec. + /// If missing it is considered compressed + 7: optional bool is_compressed = true; + + // Optional statistics for the data in this page + //8: optional Statistics statistics; +} +); + +thrift_struct!( +#[allow(dead_code)] +pub(crate) struct PageHeader { + /// the type of the page: indicates which of the *_header fields is set + 1: required PageType type_ + + /// Uncompressed page size in bytes (not including this header) + 2: required i32 uncompressed_page_size + + /// Compressed (and potentially encrypted) page size in bytes, not including this header + 3: required i32 compressed_page_size + + /// The 32-bit CRC checksum for the page, to be calculated as follows: + 4: optional i32 crc + + // Headers for page specific data. One only will be set.
+ 5: optional DataPageHeader data_page_header; + 6: optional IndexPageHeader index_page_header; + 7: optional DictionaryPageHeader dictionary_page_header; + 8: optional DataPageHeaderV2 data_page_header_v2; +} +); + +thrift_struct!( +pub(crate) struct AesGcmV1<'a> { + /// AAD prefix + 1: optional binary<'a> aad_prefix + + /// Unique file identifier part of AAD suffix + 2: optional binary<'a> aad_file_unique + + /// In files encrypted with AAD prefix without storing it, + /// readers must supply the prefix + 3: optional bool supply_aad_prefix +} +); + +thrift_struct!( +pub(crate) struct AesGcmCtrV1<'a> { + /// AAD prefix + 1: optional binary<'a> aad_prefix + + /// Unique file identifier part of AAD suffix + 2: optional binary<'a> aad_file_unique + + /// In files encrypted with AAD prefix without storing it, + /// readers must supply the prefix + 3: optional bool supply_aad_prefix +} +); + +thrift_union!( +union EncryptionAlgorithm<'a> { + 1: (AesGcmV1<'a>) AES_GCM_V1 + 2: (AesGcmCtrV1<'a>) AES_GCM_CTR_V1 +} +); + +#[cfg(feature = "encryption")] +thrift_struct!( +/// Crypto metadata for files with encrypted footer +pub(crate) struct FileCryptoMetaData<'a> { + /// Encryption algorithm. This field is only used for files + /// with encrypted footer. Files with plaintext footer store algorithm id + /// inside footer (FileMetaData structure). + 1: required EncryptionAlgorithm<'a> encryption_algorithm + + /** Retrieval metadata of key used for encryption of footer, + * and (possibly) columns **/ + 2: optional binary<'a> key_metadata +} +); + // the following are only used internally so are private thrift_struct!( struct FileMetaData<'a> { @@ -71,8 +224,8 @@ struct FileMetaData<'a> { 5: optional list<KeyValue> key_value_metadata 6: optional string created_by 7: optional list<ColumnOrder> column_orders; - //8: optional EncryptionAlgorithm encryption_algorithm - //9: optional binary footer_signing_key_metadata + 8: optional EncryptionAlgorithm<'a> encryption_algorithm + 9: optional binary<'a> footer_signing_key_metadata } ); @@ -172,7 +325,7 @@ struct SizeStatistics { ); thrift_struct!( -struct Statistics<'a> { +pub(crate) struct Statistics<'a> { 1: optional binary<'a> max; 2: optional binary<'a> min; 3: optional i64 null_count; @@ -304,7 +457,7 @@ fn convert_column( Ok(result) } -fn convert_stats( +pub(crate) fn convert_stats( physical_type: Type, thrift_stats: Option<Statistics>, ) -> Result<Option<ParquetStatistics>> { @@ -453,6 +606,250 @@ fn convert_stats( }) } +#[cfg(feature = "encryption")] +fn row_group_from_encrypted_thrift( + mut rg: RowGroup, + schema_descr: SchemaDescPtr, + decryptor: Option<&FileDecryptor>, +) -> Result<RowGroupMetaData> { + if schema_descr.num_columns() != rg.columns.len() { + return Err(general_err!( + "Column count mismatch. Schema has {} columns while Row Group has {}", + schema_descr.num_columns(), + rg.columns.len() + )); + } + let total_byte_size = rg.total_byte_size; + let num_rows = rg.num_rows; + let mut columns = vec![]; + + for (i, (mut c, d)) in rg + .columns + .drain(0..) + .zip(schema_descr.columns()) + .enumerate() + { + // Read encrypted metadata if it's present and we have a decryptor.
+ if let (true, Some(decryptor)) = (c.encrypted_column_metadata.is_some(), decryptor) { + let column_decryptor = match c.crypto_metadata.as_ref() { + None => { + return Err(general_err!( + "No crypto_metadata is set for column '{}', which has encrypted metadata", + d.path().string() + )); + } + Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(crypto_metadata)) => { + let column_name = crypto_metadata.path_in_schema.join("."); + decryptor.get_column_metadata_decryptor( + column_name.as_str(), + crypto_metadata.key_metadata.as_deref(), + )? + } + Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => { + decryptor.get_footer_decryptor()? + } + }; + + let column_aad = crate::encryption::modules::create_module_aad( + decryptor.file_aad(), + crate::encryption::modules::ModuleType::ColumnMetaData, + rg.ordinal.unwrap() as usize, + i, + None, + )?; + + let buf = c.encrypted_column_metadata.unwrap(); + let decrypted_cc_buf = + column_decryptor + .decrypt(buf, column_aad.as_ref()) + .map_err(|_| { + general_err!( + "Unable to decrypt column '{}', perhaps the column key is wrong?", + d.path().string() + ) + })?; + + let mut prot = ThriftCompactInputProtocol::new(decrypted_cc_buf.as_slice()); + let col_meta = ColumnMetaData::try_from(&mut prot)?; + c.meta_data = Some(col_meta); + columns.push(convert_column(c, d.clone())?); + } else { + columns.push(convert_column(c, d.clone())?); + } + } + + let sorting_columns = rg.sorting_columns; + let file_offset = rg.file_offset; + let ordinal = rg.ordinal; + + Ok(RowGroupMetaData { + columns, + num_rows, + sorting_columns, + total_byte_size, + schema_descr, + file_offset, + ordinal, + }) +} + +#[cfg(feature = "encryption")] +pub(crate) fn parquet_metadata_with_encryption( + file_decryption_properties: Option<&FileDecryptionProperties>, + encrypted_footer: bool, + buf: &[u8], +) -> Result<ParquetMetaData> { + let mut prot = ThriftCompactInputProtocol::new(buf); + let mut file_decryptor = None; + let decrypted_fmd_buf; + + if encrypted_footer { + if let Some(file_decryption_properties) = file_decryption_properties { + let t_file_crypto_metadata: FileCryptoMetaData = + FileCryptoMetaData::try_from(&mut prot) + .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?; + let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm { + EncryptionAlgorithm::AES_GCM_V1(algo) => algo.supply_aad_prefix, + _ => Some(false), + } + .unwrap_or(false); + if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() { + return Err(general_err!( + "Parquet file was encrypted with an AAD prefix that is not stored in the file, \ + but no AAD prefix was provided in the file decryption properties" + )); + } + let decryptor = get_file_decryptor( + t_file_crypto_metadata.encryption_algorithm, + t_file_crypto_metadata.key_metadata, + file_decryption_properties, + )?; + let footer_decryptor = decryptor.get_footer_decryptor(); + let aad_footer = crate::encryption::modules::create_footer_aad(decryptor.file_aad())?; + + decrypted_fmd_buf = footer_decryptor?
+ .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref()) + .map_err(|_| { + general_err!( + "Provided footer key and AAD were unable to decrypt parquet footer" + ) + })?; + prot = ThriftCompactInputProtocol::new(decrypted_fmd_buf.as_ref()); + + file_decryptor = Some(decryptor); + } else { + return Err(general_err!( + "Parquet file has an encrypted footer but decryption properties were not provided" + )); + } + } + + let file_meta = super::thrift_gen::FileMetaData::try_from(&mut prot) + .map_err(|e| general_err!("Could not parse metadata: {}", e))?; + + let version = file_meta.version; + let num_rows = file_meta.num_rows; + let created_by = file_meta.created_by.map(|c| c.to_owned()); + let key_value_metadata = file_meta.key_value_metadata; + + let val = parquet_schema_from_array(file_meta.schema)?; + let schema_descr = Arc::new(SchemaDescriptor::new(val)); + + if let (Some(algo), Some(file_decryption_properties)) = + (file_meta.encryption_algorithm, file_decryption_properties) + { + // File has a plaintext footer but encryption algorithm is set + let file_decryptor_value = get_file_decryptor( + algo, + file_meta.footer_signing_key_metadata, + file_decryption_properties, + )?; + if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer { + file_decryptor_value.verify_plaintext_footer_signature(buf)?; + } + file_decryptor = Some(file_decryptor_value); + } + + // decrypt column chunk info + let mut row_groups = Vec::with_capacity(file_meta.row_groups.len()); + for rg in file_meta.row_groups { + let r = row_group_from_encrypted_thrift(rg, schema_descr.clone(), file_decryptor.as_ref())?; + row_groups.push(r); + } + + // need to map read column orders to actual values based on the schema + if file_meta + .column_orders + .as_ref() + .is_some_and(|cos| cos.len() != schema_descr.num_columns()) + { + return Err(general_err!("Column order length mismatch")); + } + + let column_orders = file_meta.column_orders.map(|cos| { + let mut res = Vec::with_capacity(cos.len()); + for (i, column) in schema_descr.columns().iter().enumerate() { + match cos[i] { + ColumnOrder::TYPE_DEFINED_ORDER(_) => { + let sort_order = ColumnOrder::get_sort_order( + column.logical_type(), + column.converted_type(), + column.physical_type(), + ); + res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order)); + } + _ => res.push(cos[i]), + } + } + res + }); + + let fmd = crate::file::metadata::FileMetaData::new( + version, + num_rows, + created_by, + key_value_metadata, + schema_descr, + column_orders, + ); + let mut metadata = ParquetMetaData::new(fmd, row_groups); + + metadata.with_file_decryptor(file_decryptor); + + Ok(metadata) +} + +#[cfg(feature = "encryption")] +pub(super) fn get_file_decryptor( + encryption_algorithm: EncryptionAlgorithm, + footer_key_metadata: Option<&[u8]>, + file_decryption_properties: &FileDecryptionProperties, +) -> Result<FileDecryptor> { + match encryption_algorithm { + EncryptionAlgorithm::AES_GCM_V1(algo) => { + let aad_file_unique = algo + .aad_file_unique + .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?; + let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() { + aad_prefix.clone() + } else { + algo.aad_prefix.map(|v| v.to_vec()).unwrap_or_default() + }; + let aad_file_unique = aad_file_unique.to_vec(); + + FileDecryptor::new( + file_decryption_properties, + footer_key_metadata, + aad_file_unique, + aad_prefix, + ) + } + EncryptionAlgorithm::AES_GCM_CTR_V1(_) => Err(nyi_err!( + "The AES_GCM_CTR_V1 encryption algorithm is not yet
supported" + )), + } +} + /// Create ParquetMetaData from thrift input. Note that this only decodes the file metadata in /// the Parquet footer. Page indexes will need to be added later. impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for ParquetMetaData { diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 5308825b0976..728598045315 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -1875,9 +1875,15 @@ mod tests { 80, 65, 82, 49, ]; let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data)); + #[cfg(feature = "encryption")] assert_eq!( ret.err().unwrap().to_string(), - "Parquet error: Could not parse metadata: bad data" + "Parquet error: Could not parse metadata: Parquet error: Received empty union from remote ColumnOrder" + ); + #[cfg(not(feature = "encryption"))] + assert_eq!( + ret.err().unwrap().to_string(), + "Parquet error: Received empty union from remote ColumnOrder" ); } diff --git a/parquet/src/parquet_macros.rs b/parquet/src/parquet_macros.rs index eb523a6982a0..939f3cb339ab 100644 --- a/parquet/src/parquet_macros.rs +++ b/parquet/src/parquet_macros.rs @@ -185,17 +185,17 @@ macro_rules! thrift_union_all_empty { #[macro_export] #[allow(clippy::crate_in_macro_def)] macro_rules! thrift_union { - ($(#[$($def_attrs:tt)*])* union $identifier:ident { $($(#[$($field_attrs:tt)*])* $field_id:literal : $( ( $field_type:ident $(< $element_type:ident >)? ) )? $field_name:ident $(;)?)* }) => { + ($(#[$($def_attrs:tt)*])* union $identifier:ident $(< $lt:lifetime >)? { $($(#[$($field_attrs:tt)*])* $field_id:literal : $( ( $field_type:ident $(< $element_type:ident >)? $(< $field_lt:lifetime >)?) )? $field_name:ident $(;)?)* }) => { $(#[cfg_attr(not(doctest), $($def_attrs)*)])* #[derive(Clone, Debug, Eq, PartialEq)] #[allow(non_camel_case_types)] #[allow(non_snake_case)] #[allow(missing_docs)] - pub enum $identifier { - $($(#[cfg_attr(not(doctest), $($field_attrs)*)])* $field_name $( ( $crate::__thrift_union_type!{$field_type $($element_type)?} ) )?),* + pub enum $identifier $(<$lt>)? { + $($(#[cfg_attr(not(doctest), $($field_attrs)*)])* $field_name $( ( $crate::__thrift_union_type!{$field_type $($field_lt)? $($element_type)?} ) )?),* } - impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for $identifier { + impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for $identifier $(<$lt>)? { type Error = ParquetError; fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result { @@ -224,7 +224,7 @@ macro_rules! thrift_union { } } - impl WriteThrift for $identifier { + impl $(<$lt>)? WriteThrift for $identifier $(<$lt>)? { const ELEMENT_TYPE: ElementType = ElementType::Struct; fn write_thrift(&self, writer: &mut ThriftCompactOutputProtocol) -> Result<()> { @@ -236,7 +236,7 @@ macro_rules! thrift_union { } } - impl WriteThriftField for $identifier { + impl $(<$lt>)? WriteThriftField for $identifier $(<$lt>)? { fn write_thrift_field(&self, writer: &mut ThriftCompactOutputProtocol, field_id: i16, last_field_id: i16) -> Result { writer.write_field_begin(FieldType::Struct, field_id, last_field_id)?; self.write_thrift(writer)?; @@ -316,6 +316,7 @@ macro_rules! 
thrift_struct { #[allow(unused_assignments)] fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> { + #[allow(unused_mut, unused_variables)] let mut last_field_id = 0i16; $($crate::__thrift_write_required_or_optional_field!($required_or_optional $field_name, $field_id, $field_type, self, writer, last_field_id);)* writer.write_struct_end() @@ -470,6 +471,9 @@ macro_rules! __thrift_field_type { #[doc(hidden)] #[macro_export] macro_rules! __thrift_union_type { + (binary $lt:lifetime) => { &$lt [u8] }; + (string $lt:lifetime) => { &$lt str }; + ($field_type:ident $lt:lifetime) => { $field_type<$lt> }; ($field_type:ident) => { $field_type }; (list $field_type:ident) => { Vec<$field_type> }; } diff --git a/parquet/tests/arrow_reader/bad_data.rs b/parquet/tests/arrow_reader/bad_data.rs index 619bbb862fe1..ecf449a7ce61 100644 --- a/parquet/tests/arrow_reader/bad_data.rs +++ b/parquet/tests/arrow_reader/bad_data.rs @@ -80,10 +80,13 @@ fn test_invalid_files() { #[test] fn test_parquet_1481() { let err = read_file("PARQUET-1481.parquet").unwrap_err(); + #[cfg(feature = "encryption")] assert_eq!( err.to_string(), - "Parquet error: Unexpected parquet Type: -7" + "Parquet error: Could not parse metadata: Parquet error: Unexpected Type -7" ); + #[cfg(not(feature = "encryption"))] + assert_eq!(err.to_string(), "Parquet error: Unexpected Type -7"); }

#[test]
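Note for reviewers: the `thrift_union!` changes above add an optional lifetime parameter so that a borrowing union such as `EncryptionAlgorithm<'a>` can hold `&'a [u8]` slices into the footer buffer instead of owned `Vec<u8>` values, deferring copies to the decryptor boundary (the `to_vec()` calls in `get_file_decryptor`). The sketch below is a minimal, hand-written illustration of that shape in plain Rust; it is not the macro's actual expansion, and the type and variant names are borrowed from this diff purely for illustration.

```rust
// Illustrative sketch only: hand-rolled stand-ins for the macro-generated
// borrowing union. The real types are produced by `thrift_union!` /
// `thrift_struct!` in parquet/src/file/metadata/thrift_gen.rs.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AesGcmV1<'a> {
    pub aad_prefix: Option<&'a [u8]>,      // borrows from the decoded footer buffer
    pub aad_file_unique: Option<&'a [u8]>, // zero-copy while decoding
    pub supply_aad_prefix: Option<bool>,
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AesGcmCtrV1<'a> {
    pub aad_prefix: Option<&'a [u8]>,
    pub aad_file_unique: Option<&'a [u8]>,
    pub supply_aad_prefix: Option<bool>,
}

#[derive(Clone, Debug, Eq, PartialEq)]
#[allow(non_camel_case_types)]
pub enum EncryptionAlgorithm<'a> {
    AES_GCM_V1(AesGcmV1<'a>),
    AES_GCM_CTR_V1(AesGcmCtrV1<'a>),
}

fn main() {
    // Stand-in for the footer bytes that ThriftCompactInputProtocol reads from.
    let footer_buf: Vec<u8> = b"file-unique-id".to_vec();

    let algo = EncryptionAlgorithm::AES_GCM_V1(AesGcmV1 {
        aad_prefix: None,
        aad_file_unique: Some(footer_buf.as_slice()), // borrow, no copy yet
        supply_aad_prefix: Some(false),
    });

    // Owned bytes are materialized only where a longer-lived value is needed,
    // mirroring `aad_file_unique.to_vec()` in `get_file_decryptor` above.
    if let EncryptionAlgorithm::AES_GCM_V1(a) = &algo {
        let aad_file_unique: Vec<u8> = a.aad_file_unique.unwrap().to_vec();
        assert_eq!(aad_file_unique, footer_buf);
    }
}
```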