[Parquet] Account for FileDecryptor in ParquetMetaData heap size calculation #8671
Changes from all commits: a6f033e, ec03919, f58565e, a53eb7b, 0c8ab49, 955ad16, 4ce0a66
```diff
@@ -21,6 +21,7 @@ use crate::encryption::ciphers::{BlockDecryptor, RingGcmBlockDecryptor, TAG_LEN}
 use crate::encryption::modules::{ModuleType, create_footer_aad, create_module_aad};
 use crate::errors::{ParquetError, Result};
 use crate::file::column_crypto_metadata::ColumnCryptoMetaData;
+use crate::file::metadata::HeapSize;
 use std::borrow::Cow;
 use std::collections::HashMap;
 use std::fmt::Formatter;
@@ -271,6 +272,12 @@ struct ExplicitDecryptionKeys {
     column_keys: HashMap<String, Vec<u8>>,
 }
 
+impl HeapSize for ExplicitDecryptionKeys {
+    fn heap_size(&self) -> usize {
+        self.footer_key.heap_size() + self.column_keys.heap_size()
+    }
+}
+
 #[derive(Clone)]
 enum DecryptionKeys {
     Explicit(ExplicitDecryptionKeys),
@@ -290,6 +297,19 @@ impl PartialEq for DecryptionKeys {
     }
 }
 
+impl HeapSize for DecryptionKeys {
+    fn heap_size(&self) -> usize {
+        match self {
+            Self::Explicit(keys) => keys.heap_size(),
+            Self::ViaRetriever(_) => {
+                // The retriever is a user-defined type we don't control,
+                // so we can't determine the heap size.
+                0
+            }
+        }
+    }
+}
+
 /// `FileDecryptionProperties` hold keys and AAD data required to decrypt a Parquet file.
 ///
 /// When reading Arrow data, the `FileDecryptionProperties` should be included in the
```

**Review comment on lines +305 to +306 (contributor, PR author):** As discussed in #8472, we could potentially add a new trait method to allow a key retriever to provide a heap size later.
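For concreteness, that idea could take the shape sketched below. This is only a hypothetical illustration, not part of this PR: the defaulted `heap_size` method and the `InMemoryKeyRetriever` type are invented here, and the `retrieve_key` signature is assumed to mirror the crate's existing `KeyRetriever` trait.

```rust
use parquet::errors::{ParquetError, Result};

// Hypothetical sketch (not in this PR): a defaulted method on KeyRetriever
// lets implementors opt in to reporting their heap usage.
pub trait KeyRetriever: Send + Sync {
    /// Retrieve a decryption key given its key metadata (assumed existing method).
    fn retrieve_key(&self, key_metadata: &[u8]) -> Result<Vec<u8>>;

    /// Heap bytes owned by this retriever. Defaulting to 0 preserves the
    /// current underestimating behavior for implementations that don't opt in.
    fn heap_size(&self) -> usize {
        0
    }
}

/// Example implementor that owns a key map and opts in to size reporting.
pub struct InMemoryKeyRetriever {
    keys: std::collections::HashMap<Vec<u8>, Vec<u8>>,
}

impl KeyRetriever for InMemoryKeyRetriever {
    fn retrieve_key(&self, key_metadata: &[u8]) -> Result<Vec<u8>> {
        self.keys
            .get(key_metadata)
            .cloned()
            .ok_or_else(|| ParquetError::General("unknown key".into()))
    }

    fn heap_size(&self) -> usize {
        // Rough estimate: count key and value buffer capacities only.
        self.keys
            .iter()
            .map(|(k, v)| k.capacity() + v.capacity())
            .sum()
    }
}
```

With something like this available, the `ViaRetriever` arm above could return `retriever.heap_size()` instead of hard-coding 0.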
```diff
@@ -334,6 +354,11 @@ pub struct FileDecryptionProperties {
     footer_signature_verification: bool,
 }
 
+impl HeapSize for FileDecryptionProperties {
+    fn heap_size(&self) -> usize {
+        self.keys.heap_size() + self.aad_prefix.heap_size()
+    }
+}
 impl FileDecryptionProperties {
     /// Returns a new [`FileDecryptionProperties`] builder that will use the provided key to
     /// decrypt footer metadata.
@@ -547,6 +572,21 @@ impl PartialEq for FileDecryptor {
     }
 }
 
+/// Estimate the size in bytes required for the file decryptor.
+/// This is important to track the memory usage of cached Parquet meta data,
+/// and is used via [`crate::file::metadata::ParquetMetaData::memory_size`].
+/// Note that when a [`KeyRetriever`] is used, its heap size won't be included
+/// and the result will be an underestimate.
+/// If the [`FileDecryptionProperties`] are shared between multiple files then the
+/// heap size may also be an overestimate.
+impl HeapSize for FileDecryptor {
+    fn heap_size(&self) -> usize {
+        self.decryption_properties.heap_size()
+            + (Arc::clone(&self.footer_decryptor) as Arc<dyn HeapSize>).heap_size()
+            + self.file_aad.heap_size()
+    }
+}
+
 impl FileDecryptor {
     pub(crate) fn new(
        decryption_properties: &Arc<FileDecryptionProperties>,
```
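A note on the `Arc::clone(&self.footer_decryptor) as Arc<dyn HeapSize>` cast: it upcasts one trait object to another so that the `Arc<dyn HeapSize>` impl added in `memory.rs` below (which measures the erased value with `std::mem::size_of_val`) is used. A minimal standalone sketch of that mechanism, assuming `BlockDecryptor` has `HeapSize` as a supertrait (which the cast implies); the types here are toy stand-ins, and trait-object upcasting needs a recent toolchain (stabilized in Rust 1.86):

```rust
use std::sync::Arc;

// Toy stand-ins for the crate's traits, for illustration only.
trait HeapSize {
    fn heap_size(&self) -> usize;
}

// Assumed supertrait relationship; it is what makes the upcast compile.
trait BlockDecryptor: HeapSize + Send + Sync {}

struct RingGcmBlockDecryptor {
    key: Vec<u8>,
}

impl HeapSize for RingGcmBlockDecryptor {
    fn heap_size(&self) -> usize {
        self.key.capacity()
    }
}

impl BlockDecryptor for RingGcmBlockDecryptor {}

fn main() {
    let footer_decryptor: Arc<dyn BlockDecryptor> =
        Arc::new(RingGcmBlockDecryptor { key: vec![0u8; 16] });
    // Upcast the trait object so callers can treat it purely as a HeapSize.
    let as_heap_size = Arc::clone(&footer_decryptor) as Arc<dyn HeapSize>;
    assert_eq!(as_heap_size.heap_size(), 16);
}
```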
```diff
@@ -28,6 +28,7 @@ use crate::file::page_index::column_index::{
 };
 use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
 use crate::file::statistics::{Statistics, ValueStatistics};
+use std::collections::HashMap;
 use std::sync::Arc;
 
 /// Trait for calculating the size of various containers
@@ -50,9 +51,60 @@ impl<T: HeapSize> HeapSize for Vec<T> {
     }
 }
 
+impl<K: HeapSize, V: HeapSize> HeapSize for HashMap<K, V> {
+    fn heap_size(&self) -> usize {
+        let capacity = self.capacity();
+        if capacity == 0 {
+            return 0;
+        }
+
+        // HashMap doesn't provide a way to get its heap size, so this is an approximation based on
+        // the behavior of hashbrown::HashMap as at version 0.16.0, and may become inaccurate
+        // if the implementation changes.
+        let key_val_size = std::mem::size_of::<(K, V)>();
+        // Overhead for the control tags group, which may be smaller depending on architecture
+        let group_size = 16;
+        // 1 byte of metadata stored per bucket.
+        let metadata_size = 1;
+
+        // Compute the number of buckets for the capacity. Based on hashbrown's capacity_to_buckets
+        let buckets = if capacity < 15 {
+            let min_cap = match key_val_size {
+                0..=1 => 14,
+                2..=3 => 7,
+                _ => 3,
+            };
+            let cap = min_cap.max(capacity);
+            if cap < 4 {
+                4
+            } else if cap < 8 {
+                8
+            } else {
+                16
+            }
+        } else {
+            (capacity.saturating_mul(8) / 7).next_power_of_two()
+        };
+
+        group_size
+            + (buckets * (key_val_size + metadata_size))
+            + self.keys().map(|k| k.heap_size()).sum::<usize>()
+            + self.values().map(|v| v.heap_size()).sum::<usize>()
+    }
+}
+
 impl<T: HeapSize> HeapSize for Arc<T> {
     fn heap_size(&self) -> usize {
-        self.as_ref().heap_size()
+        // Arc stores weak and strong counts on the heap alongside an instance of T
+        2 * std::mem::size_of::<usize>() + std::mem::size_of::<T>() + self.as_ref().heap_size()
     }
 }
```
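To sanity-check the bucket arithmetic, the snippet below re-derives the fixed part of the `HashMap` estimate for one concrete case. It is a standalone illustration, not code from the PR; the numbers assume a 64-bit target:

```rust
// Re-derive the HashMap estimate for a small String -> Vec<u8> map with
// capacity 3, mirroring the constants in the impl above.
fn main() {
    let key_val_size = std::mem::size_of::<(String, Vec<u8>)>(); // 24 + 24 = 48
    let capacity = 3usize;

    // capacity < 15 and key_val_size >= 4, so hashbrown's minimum capacity
    // is 3, which rounds up to 4 buckets.
    let min_cap = 3usize;
    let cap = min_cap.max(capacity);
    let buckets = if cap < 4 {
        4
    } else if cap < 8 {
        8
    } else {
        16
    };

    // control tags group + one (key, value) slot and one metadata byte per bucket
    let fixed = 16 + buckets * (key_val_size + 1); // 16 + 4 * 49 = 212
    println!("fixed heap estimate: {fixed} bytes, plus per-key/value heap sizes");
}
```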
**Review comment (contributor):** While reviewing the downstream changes to DataFusion, I actually think this is (now) double counting the memory usage (it includes both […]). I filed […] and will prepare a fix.

**Reply (contributor, PR author):** I think this code is correct. See the documentation in parquet/src/file/metadata/memory.rs, lines 39 to 40 (at ed9efe7). The […]

**Reply (contributor, PR author):** I used […]

**Reply (contributor):** I think it's correct.

```rust
let v = Vec::<i32>::new();
println!("empty vec heap size {}", v.heap_size());
println!("size of vec {}", std::mem::size_of::<Vec<i32>>());
let av = Arc::new(v);
println!("arc<vec> heap size {}", av.heap_size());
```

prints 0, 24 and 40 respectively on a 64-bit target: filling in the `Arc<T>` formula above gives 2 × 8 + 24 + 0 = 40, so nothing is counted twice. Assuming […]

**Reply (contributor):** I just noticed that the heap size for […]

**Reply (contributor):** Thank you -- I double checked too and I agree the code is correct. Sorry for the false alarm: #8898 (comment)
The same hunk also adds a `HeapSize` impl for type-erased trait objects:

```diff
+impl HeapSize for Arc<dyn HeapSize> {
+    fn heap_size(&self) -> usize {
+        2 * std::mem::size_of::<usize>()
+            + std::mem::size_of_val(self.as_ref())
+            + self.as_ref().heap_size()
+    }
+}
```
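The difference from the `Arc<T>` impl above: with `dyn HeapSize` the compile-time type is erased, so `std::mem::size_of::<T>()` is unavailable and `std::mem::size_of_val` reads the concrete value's size from the trait object's vtable instead. A standalone sketch of that mechanism (toy types, not PR code):

```rust
fn main() {
    // A trait object erases the concrete type...
    let erased: Box<dyn std::fmt::Debug> = Box::new([0u64; 4]);
    // ...but size_of_val still reports the concrete value's size
    // (32 bytes for [u64; 4]) by consulting the vtable at runtime.
    assert_eq!(std::mem::size_of_val(erased.as_ref()), 32);
    println!("erased value occupies {} bytes", std::mem::size_of_val(erased.as_ref()));
}
```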
```diff
@@ -287,11 +287,17 @@ impl ParquetMetaData {
     ///
     /// 4. Does not include any allocator overheads
     pub fn memory_size(&self) -> usize {
+        #[cfg(feature = "encryption")]
+        let encryption_size = self.file_decryptor.heap_size();
+        #[cfg(not(feature = "encryption"))]
+        let encryption_size = 0usize;
+
         std::mem::size_of::<Self>()
             + self.file_metadata.heap_size()
             + self.row_groups.heap_size()
             + self.column_index.heap_size()
             + self.offset_index.heap_size()
+            + encryption_size
     }
 
     /// Override the column index
```
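With the decryptor now included, `memory_size()` gives a more truthful number for cache accounting. A minimal sketch of the intended use, charging cached footers against a byte budget; the cache type here is hypothetical, and only `memory_size()` comes from the crate:

```rust
use parquet::file::metadata::ParquetMetaData;
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical metadata cache with a fixed memory budget.
struct MetadataCache {
    budget_bytes: usize,
    used_bytes: usize,
    entries: HashMap<String, Arc<ParquetMetaData>>,
}

impl MetadataCache {
    /// Insert a footer only if it fits in the remaining budget.
    fn try_insert(&mut self, path: String, meta: Arc<ParquetMetaData>) -> bool {
        // memory_size() now accounts for the FileDecryptor too (when the
        // "encryption" feature is enabled), so encrypted footers are no
        // longer under-charged.
        let cost = meta.memory_size();
        if self.used_bytes + cost > self.budget_bytes {
            return false;
        }
        self.used_bytes += cost;
        self.entries.insert(path, meta);
        true
    }
}
```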
```diff
@@ -1875,10 +1881,9 @@ mod tests {
             .build();
 
         #[cfg(not(feature = "encryption"))]
-        let base_expected_size = 2248;
+        let base_expected_size = 2766;
         #[cfg(feature = "encryption")]
-        // Not as accurate as it should be: https://github.com/apache/arrow-rs/issues/8472
-        let base_expected_size = 2416;
+        let base_expected_size = 2934;
 
         assert_eq!(parquet_meta.memory_size(), base_expected_size);
```
```diff
@@ -1907,16 +1912,90 @@
             .build();
 
         #[cfg(not(feature = "encryption"))]
-        let bigger_expected_size = 2674;
+        let bigger_expected_size = 3192;
         #[cfg(feature = "encryption")]
-        // Not as accurate as it should be: https://github.com/apache/arrow-rs/issues/8472
-        let bigger_expected_size = 2842;
+        let bigger_expected_size = 3360;
 
         // more set fields means more memory usage
         assert!(bigger_expected_size > base_expected_size);
         assert_eq!(parquet_meta.memory_size(), bigger_expected_size);
     }
```

**Review comment on lines -1910 to +1915 (contributor):** So sad to see this increase so much. Truth hurts 😢

**Reply (contributor):** We can start with being truthful and then move on to being slimmer
The hunk continues with a new test exercising the decryptor accounting:

```diff
+    #[test]
+    #[cfg(feature = "encryption")]
+    fn test_memory_size_with_decryptor() {
+        use crate::encryption::decrypt::FileDecryptionProperties;
+        use crate::file::metadata::thrift::encryption::AesGcmV1;
+
+        let schema_descr = get_test_schema_descr();
+
+        let columns = schema_descr
+            .columns()
+            .iter()
+            .map(|column_descr| ColumnChunkMetaData::builder(column_descr.clone()).build())
+            .collect::<Result<Vec<_>>>()
+            .unwrap();
+        let row_group_meta = RowGroupMetaData::builder(schema_descr.clone())
+            .set_num_rows(1000)
+            .set_column_metadata(columns)
+            .build()
+            .unwrap();
+        let row_group_meta = vec![row_group_meta];
+
+        let version = 2;
+        let num_rows = 1000;
+        let aad_file_unique = vec![1u8; 8];
+        let aad_prefix = vec![2u8; 8];
+        let encryption_algorithm = EncryptionAlgorithm::AES_GCM_V1(AesGcmV1 {
+            aad_prefix: Some(aad_prefix.clone()),
+            aad_file_unique: Some(aad_file_unique.clone()),
+            supply_aad_prefix: Some(true),
+        });
+        let footer_key_metadata = Some(vec![3u8; 8]);
+        let file_metadata =
+            FileMetaData::new(version, num_rows, None, None, schema_descr.clone(), None)
+                .with_encryption_algorithm(Some(encryption_algorithm))
+                .with_footer_signing_key_metadata(footer_key_metadata.clone());
+
+        let parquet_meta_data = ParquetMetaDataBuilder::new(file_metadata.clone())
+            .set_row_groups(row_group_meta.clone())
+            .build();
+
+        let base_expected_size = 2058;
+        assert_eq!(parquet_meta_data.memory_size(), base_expected_size);
+
+        let footer_key = "0123456789012345".as_bytes();
+        let column_key = "1234567890123450".as_bytes();
+        let mut decryption_properties_builder =
+            FileDecryptionProperties::builder(footer_key.to_vec())
+                .with_aad_prefix(aad_prefix.clone());
+        for column in schema_descr.columns() {
+            decryption_properties_builder = decryption_properties_builder
+                .with_column_key(&column.path().string(), column_key.to_vec());
+        }
+        let decryption_properties = decryption_properties_builder.build().unwrap();
+        let decryptor = FileDecryptor::new(
+            &decryption_properties,
+            footer_key_metadata.as_deref(),
+            aad_file_unique,
+            aad_prefix,
+        )
+        .unwrap();
+
+        let parquet_meta_data = ParquetMetaDataBuilder::new(file_metadata.clone())
+            .set_row_groups(row_group_meta.clone())
+            .set_file_decryptor(Some(decryptor))
+            .build();
+
+        let expected_size_with_decryptor = 3072;
+        assert!(expected_size_with_decryptor > base_expected_size);
+
+        assert_eq!(
+            parquet_meta_data.memory_size(),
+            expected_size_with_decryptor
+        );
+    }
+
     /// Returns sample schema descriptor so we can create column metadata.
     fn get_test_schema_descr() -> SchemaDescPtr {
         let schema = SchemaType::group_type_builder("schema")
```
```diff
@@ -845,7 +845,9 @@ pub struct ColumnDescriptor {
 
 impl HeapSize for ColumnDescriptor {
     fn heap_size(&self) -> usize {
-        self.primitive_type.heap_size() + self.path.heap_size()
+        // Don't include the heap size of primitive_type, this is already
+        // accounted for via SchemaDescriptor::schema
+        self.path.heap_size()
     }
 }
```

**Review comment on lines +848 to +850 (contributor):** 🚀
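The rationale: each `ColumnDescriptor` points at a type node that is also reachable, and already counted, through `SchemaDescriptor::schema`, so counting it per column would charge the same allocation repeatedly. A standalone sketch of that hazard with toy types (not PR code):

```rust
use std::sync::Arc;

fn main() {
    // One shared allocation, standing in for a schema type node...
    let type_node = Arc::new(vec![0u8; 1024]);

    // ...reachable from the schema and from every column descriptor.
    let schema = Arc::clone(&type_node);
    let columns = vec![Arc::clone(&type_node); 10];

    // A naive "deep" estimate charges the same 1 KiB to all eleven owners:
    let naive: usize =
        schema.capacity() + columns.iter().map(|c| c.capacity()).sum::<usize>();
    assert_eq!(naive, 11 * 1024); // wildly overcounted

    // Counting the shared node once (as the fixed code does) is accurate:
    assert_eq!(schema.capacity(), 1024);
}
```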