From 188d7a6e8fa17e08bedf9e897a20524e13af78be Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Sun, 29 Jun 2025 01:35:31 +0200 Subject: [PATCH 01/17] Initial commit --- parquet/src/arrow/arrow_writer/mod.rs | 56 +++++--- parquet/src/encryption/encrypt.rs | 4 +- parquet/src/file/properties.rs | 2 +- parquet/src/file/writer.rs | 2 +- parquet/tests/encryption/encryption.rs | 132 +++++++++++++++++-- parquet/tests/encryption/encryption_async.rs | 26 ++-- 6 files changed, 178 insertions(+), 44 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 25046273d065..180b10c19a67 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -179,7 +179,7 @@ pub struct ArrowWriter { arrow_schema: SchemaRef, /// Creates new [`ArrowRowGroupWriter`] instances as required - row_group_writer_factory: ArrowRowGroupWriterFactory, + pub row_group_writer_factory: ArrowRowGroupWriterFactory, /// The length of arrays to write to each row group max_row_group_size: usize, @@ -793,8 +793,9 @@ impl ArrowColumnWriter { } /// Encodes [`RecordBatch`] to a parquet row group -struct ArrowRowGroupWriter { - writers: Vec, +pub struct ArrowRowGroupWriter { + /// [`ArrowColumnWriter`] for each column in a row group + pub writers: Vec, schema: SchemaRef, buffered_rows: usize, } @@ -827,44 +828,54 @@ impl ArrowRowGroupWriter { } } -struct ArrowRowGroupWriterFactory { +/// Factory for creating [`ArrowRowGroupWriter`] instances. +/// This is used by [`ArrowWriter`] to create row group writers, but can be used +/// directly for lower level API. +pub struct ArrowRowGroupWriterFactory { #[cfg(feature = "encryption")] file_encryptor: Option>, } impl ArrowRowGroupWriterFactory { + /// Creates a new [`ArrowRowGroupWriterFactory`] using provided [`SerializedFileWriter`]. #[cfg(feature = "encryption")] - fn new(file_writer: &SerializedFileWriter) -> Self { + pub fn new(file_writer: &SerializedFileWriter) -> Self { Self { file_encryptor: file_writer.file_encryptor(), } } #[cfg(not(feature = "encryption"))] - fn new(_file_writer: &SerializedFileWriter) -> Self { + pub fn new(_file_writer: &SerializedFileWriter) -> Self { Self {} } + /// Creates a new [`ArrowRowGroupWriter`] for the given parquet schema and writer properties. 
#[cfg(feature = "encryption")] - fn create_row_group_writer( + pub fn create_row_group_writer( &self, parquet: &SchemaDescriptor, props: &WriterPropertiesPtr, arrow: &SchemaRef, row_group_index: usize, ) -> Result { - let writers = get_column_writers_with_encryptor( - parquet, - props, - arrow, - self.file_encryptor.clone(), - row_group_index, - )?; + let mut writers = Vec::with_capacity(arrow.fields.len()); + let mut leaves = parquet.columns().iter(); + let column_factory = ArrowColumnWriterFactory::new() + .with_file_encryptor(row_group_index, self.file_encryptor.clone()); + for field in &arrow.fields { + column_factory.get_arrow_column_writer( + field.data_type(), + props, + &mut leaves, + &mut writers, + )?; + } Ok(ArrowRowGroupWriter::new(writers, arrow)) } #[cfg(not(feature = "encryption"))] - fn create_row_group_writer( + pub fn create_row_group_writer( &self, parquet: &SchemaDescriptor, props: &WriterPropertiesPtr, @@ -898,7 +909,7 @@ pub fn get_column_writers( /// Returns the [`ArrowColumnWriter`] for a given schema and supports columnar encryption #[cfg(feature = "encryption")] -fn get_column_writers_with_encryptor( +pub fn get_column_writers_with_encryptor( parquet: &SchemaDescriptor, props: &WriterPropertiesPtr, arrow: &SchemaRef, @@ -921,14 +932,21 @@ fn get_column_writers_with_encryptor( } /// Gets [`ArrowColumnWriter`] instances for different data types -struct ArrowColumnWriterFactory { +pub struct ArrowColumnWriterFactory { #[cfg(feature = "encryption")] row_group_index: usize, #[cfg(feature = "encryption")] file_encryptor: Option>, } +impl Default for ArrowColumnWriterFactory { + fn default() -> Self { + Self::new() + } +} + impl ArrowColumnWriterFactory { + /// Create a new [`ArrowColumnWriterFactory`] pub fn new() -> Self { Self { #[cfg(feature = "encryption")] @@ -939,7 +957,7 @@ impl ArrowColumnWriterFactory { } #[cfg(feature = "encryption")] - pub fn with_file_encryptor( + fn with_file_encryptor( mut self, row_group_index: usize, file_encryptor: Option>, @@ -977,7 +995,7 @@ impl ArrowColumnWriterFactory { } /// Gets the [`ArrowColumnWriter`] for the given `data_type` - fn get_arrow_column_writer( + pub fn get_arrow_column_writer( &self, data_type: &ArrowDataType, props: &WriterPropertiesPtr, diff --git a/parquet/src/encryption/encrypt.rs b/parquet/src/encryption/encrypt.rs index c8d3ffc0eef4..36ff6f3d6a1f 100644 --- a/parquet/src/encryption/encrypt.rs +++ b/parquet/src/encryption/encrypt.rs @@ -288,14 +288,14 @@ impl EncryptionPropertiesBuilder { #[derive(Debug)] /// The encryption configuration for a single Parquet file -pub(crate) struct FileEncryptor { +pub struct FileEncryptor { properties: FileEncryptionProperties, aad_file_unique: Vec, file_aad: Vec, } impl FileEncryptor { - pub(crate) fn new(properties: FileEncryptionProperties) -> Result { + pub fn new(properties: FileEncryptionProperties) -> Result { // Generate unique AAD for file let rng = SystemRandom::new(); let mut aad_file_unique = vec![0u8; 8]; diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 26177b69a577..3ab437f168e6 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -457,7 +457,7 @@ pub struct WriterPropertiesBuilder { impl WriterPropertiesBuilder { /// Returns default state of the builder. 
- fn with_defaults() -> Self { + pub fn with_defaults() -> Self { Self { data_page_size_limit: DEFAULT_PAGE_SIZE, data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT, diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 31a3344db66c..26342f94fe9c 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -433,7 +433,7 @@ impl SerializedFileWriter { /// Get the file encryptor used by this instance to encrypt data #[cfg(feature = "encryption")] - pub(crate) fn file_encryptor(&self) -> Option> { + pub fn file_encryptor(&self) -> Option> { self.file_encryptor.clone() } } diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs index 7079e91d1209..1fc96de18e62 100644 --- a/parquet/tests/encryption/encryption.rs +++ b/parquet/tests/encryption/encryption.rs @@ -28,13 +28,16 @@ use parquet::arrow::arrow_reader::{ ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector, }; -use parquet::arrow::ArrowWriter; +use parquet::arrow::arrow_writer::{ + compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory, +}; +use parquet::arrow::{ArrowSchemaConverter, ArrowWriter}; use parquet::data_type::{ByteArray, ByteArrayType}; use parquet::encryption::decrypt::FileDecryptionProperties; use parquet::encryption::encrypt::FileEncryptionProperties; use parquet::errors::ParquetError; use parquet::file::metadata::ParquetMetaData; -use parquet::file::properties::WriterProperties; +use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; use parquet::file::writer::SerializedFileWriter; use parquet::schema::parser::parse_message_type; use std::fs::File; @@ -1062,14 +1065,10 @@ fn test_decrypt_page_index( Ok(()) } -fn read_and_roundtrip_to_encrypted_file( +fn read_encrypted_file( path: &str, decryption_properties: FileDecryptionProperties, - encryption_properties: FileEncryptionProperties, -) { - let temp_file = tempfile::tempfile().unwrap(); - - // read example data +) -> Result<(Vec, ArrowReaderMetadata), ParquetError> { let file = File::open(path).unwrap(); let options = ArrowReaderOptions::default() .with_file_decryption_properties(decryption_properties.clone()); @@ -1080,7 +1079,18 @@ fn read_and_roundtrip_to_encrypted_file( let batches = batch_reader .collect::, _>>() .unwrap(); + Ok((batches, metadata)) +} + +fn read_and_roundtrip_to_encrypted_file( + path: &str, + decryption_properties: FileDecryptionProperties, + encryption_properties: FileEncryptionProperties, +) { + // read example data + let (batches, metadata) = read_encrypted_file(path, decryption_properties.clone()).unwrap(); + let temp_file = tempfile::tempfile().unwrap(); // write example data let props = WriterProperties::builder() .with_file_encryption_properties(encryption_properties) @@ -1101,3 +1111,109 @@ fn read_and_roundtrip_to_encrypted_file( // check re-written example data verify_encryption_test_file_read(temp_file, decryption_properties); } + +#[tokio::test] +async fn test_multi_threaded_encrypted_writing() { + // Read example data and set up encryption/decryption properties + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted"); + + let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into()) + .with_column_key("double_field", b"1234567890123450".into()) + .with_column_key("float_field", b"1234567890123451".into()) + .build() + .unwrap(); + let 
decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into()) + .with_column_key("double_field", b"1234567890123450".into()) + .with_column_key("float_field", b"1234567890123451".into()) + .build() + .unwrap(); + + let (record_batches, metadata) = + read_encrypted_file(&path, decryption_properties.clone()).unwrap(); + let to_write: Vec<_> = record_batches + .iter() + .flat_map(|rb| rb.columns().to_vec()) + .collect(); + let schema = metadata.schema().clone(); + + let props = Arc::new( + WriterPropertiesBuilder::with_defaults() + .with_file_encryption_properties(file_encryption_properties) + .build(), + ); + let parquet_schema = ArrowSchemaConverter::new() + .with_coerce_types(props.coerce_types()) + .convert(&schema) + .unwrap(); + let root_schema = parquet_schema.root_schema_ptr(); + + // Create a temporary file to write the encrypted data + let temp_file = tempfile::NamedTempFile::new().unwrap(); + let mut file_writer = + SerializedFileWriter::new(&temp_file, root_schema.clone(), props.clone()).unwrap(); + + let arrow_row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer); + let arrow_row_group_writer = arrow_row_group_writer_factory + .create_row_group_writer(&parquet_schema, &props.clone(), &schema, 0) + .unwrap(); + + // Get column writers with encryptor from ArrowRowGroupWriter + let col_writers = arrow_row_group_writer.writers; + + let mut workers: Vec<_> = col_writers + .into_iter() + .map(|mut col_writer| { + let (send, recv) = std::sync::mpsc::channel::(); + let handle = std::thread::spawn(move || { + // receive Arrays to encode via the channel + for col in recv { + col_writer.write(&col)?; + } + // once the input is complete, close the writer + // to return the newly created ArrowColumnChunk + col_writer.close() + }); + (handle, send) + }) + .collect(); + + let mut worker_iter = workers.iter_mut(); + for (arr, field) in to_write.iter().zip(&schema.fields) { + for leaves in compute_leaves(field, arr).unwrap() { + worker_iter.next().unwrap().1.send(leaves).unwrap(); + } + } + + // Wait for the workers to complete encoding, and append + // the resulting column chunks to the row group (and the file) + let mut row_group_writer = file_writer.next_row_group().unwrap(); + + for (handle, send) in workers { + drop(send); // Drop send side to signal termination + // wait for the worker to send the completed chunk + let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap(); + chunk.append_to_row_group(&mut row_group_writer).unwrap(); + } + // Close the row group which writes to the underlying file + row_group_writer.close().unwrap(); + + // Close the file writer which writes the footer + let metadata = file_writer.close().unwrap(); + assert_eq!(metadata.num_rows, 50); + + // Check that the file was written correctly + let (read_record_batches, read_metadata) = read_encrypted_file( + temp_file.path().to_str().unwrap(), + decryption_properties.clone(), + ) + .unwrap(); + verify_encryption_test_data(read_record_batches, read_metadata.metadata()); + + // Check that file was encrypted + let result = ArrowReaderMetadata::load(&temp_file.into_file(), ArrowReaderOptions::default()); + assert_eq!( + result.unwrap_err().to_string(), + "Parquet error: Parquet file has an encrypted footer but decryption properties were not provided" + ); +} diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs index e0fbbcdfafe3..4aca4bacb057 100644 --- a/parquet/tests/encryption/encryption_async.rs +++ 
b/parquet/tests/encryption/encryption_async.rs @@ -20,6 +20,7 @@ use crate::encryption_util::{ verify_column_indexes, verify_encryption_test_data, TestKeyRetriever, }; +use arrow_array::RecordBatch; use futures::TryStreamExt; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::arrow_writer::ArrowWriterOptions; @@ -436,14 +437,13 @@ async fn test_decrypt_page_index( Ok(()) } -async fn verify_encryption_test_file_read_async( +async fn read_encrypted_file_async( file: &mut tokio::fs::File, decryption_properties: FileDecryptionProperties, -) -> Result<(), ParquetError> { +) -> Result<(Vec, ArrowReaderMetadata), ParquetError> { let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties); let arrow_metadata = ArrowReaderMetadata::load_async(file, options).await?; - let metadata = arrow_metadata.metadata(); let record_reader = ParquetRecordBatchStreamBuilder::new_with_metadata( file.try_clone().await?, @@ -451,8 +451,15 @@ async fn verify_encryption_test_file_read_async( ) .build()?; let record_batches = record_reader.try_collect::>().await?; + Ok((record_batches, arrow_metadata.clone())) +} - verify_encryption_test_data(record_batches, metadata); +async fn verify_encryption_test_file_read_async( + file: &mut tokio::fs::File, + decryption_properties: FileDecryptionProperties, +) -> Result<(), ParquetError> { + let (record_batches, metadata) = read_encrypted_file_async(file, decryption_properties).await?; + verify_encryption_test_data(record_batches, metadata.metadata()); Ok(()) } @@ -464,15 +471,8 @@ async fn read_and_roundtrip_to_encrypted_file_async( let temp_file = tempfile::tempfile().unwrap(); let mut file = File::open(&path).await.unwrap(); - let options = - ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties.clone()); - let arrow_metadata = ArrowReaderMetadata::load_async(&mut file, options).await?; - let record_reader = ParquetRecordBatchStreamBuilder::new_with_metadata( - file.try_clone().await?, - arrow_metadata.clone(), - ) - .build()?; - let record_batches = record_reader.try_collect::>().await?; + let (record_batches, arrow_metadata) = + read_encrypted_file_async(&mut file, decryption_properties.clone()).await?; let props = WriterProperties::builder() .with_file_encryption_properties(encryption_properties) From 4e3ceb8e1a3fc1bb5bc4a2b98b056aa2719e5919 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 30 Jun 2025 10:47:01 +0200 Subject: [PATCH 02/17] Update parquet/tests/encryption/encryption.rs Co-authored-by: Adam Reeve --- parquet/tests/encryption/encryption.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs index 1fc96de18e62..80080b68e9f1 100644 --- a/parquet/tests/encryption/encryption.rs +++ b/parquet/tests/encryption/encryption.rs @@ -1155,7 +1155,7 @@ async fn test_multi_threaded_encrypted_writing() { let arrow_row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer); let arrow_row_group_writer = arrow_row_group_writer_factory - .create_row_group_writer(&parquet_schema, &props.clone(), &schema, 0) + .create_row_group_writer(&parquet_schema, &props, &schema, 0) .unwrap(); // Get column writers with encryptor from ArrowRowGroupWriter From c9d2fcad443ff00b45e6799209ea7209df1ce8ce Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 30 Jun 2025 12:16:01 +0200 Subject: [PATCH 03/17] Move schema and writer properties into ArrowRowGroupWriterFactory --- 
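Note for reviewers: after this change the factory captures the Parquet schema descriptor, the Arrow schema, and the writer properties once at construction, so creating a writer for each new row group only needs the row-group ordinal. The call shape below is illustrative only (names as introduced in this patch; error handling elided), not code from the diff itself:

    let factory = ArrowRowGroupWriterFactory::new(
        &file_writer,   // SerializedFileWriter, provides the file encryptor when encryption is enabled
        parquet_schema, // SchemaDescriptor
        arrow_schema,   // SchemaRef
        props,          // WriterPropertiesPtr
    );
    // One call per row group; the ordinal feeds per-row-group AAD computation
    // when column encryption is configured.
    let row_group_writer = factory.create_row_group_writer(0)?;
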
parquet/src/arrow/arrow_writer/mod.rs | 94 ++++++++++++++------------ parquet/src/encryption/encrypt.rs | 4 +- parquet/src/file/writer.rs | 2 +- parquet/tests/encryption/encryption.rs | 9 ++- 4 files changed, 60 insertions(+), 49 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 180b10c19a67..e2bb926201e9 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -179,7 +179,7 @@ pub struct ArrowWriter { arrow_schema: SchemaRef, /// Creates new [`ArrowRowGroupWriter`] instances as required - pub row_group_writer_factory: ArrowRowGroupWriterFactory, + row_group_writer_factory: ArrowRowGroupWriterFactory, /// The length of arrays to write to each row group max_row_group_size: usize, @@ -237,9 +237,14 @@ impl ArrowWriter { let max_row_group_size = props.max_row_group_size(); let file_writer = - SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props))?; + SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props.clone()))?; - let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer); + let row_group_writer_factory = ArrowRowGroupWriterFactory::new( + &file_writer, + schema, + arrow_schema.clone(), + props.into(), + ); Ok(Self { writer: file_writer, @@ -310,12 +315,10 @@ impl ArrowWriter { let in_progress = match &mut self.in_progress { Some(in_progress) => in_progress, - x => x.insert(self.row_group_writer_factory.create_row_group_writer( - self.writer.schema_descr(), - self.writer.properties(), - &self.arrow_schema, - self.writer.flushed_row_groups().len(), - )?), + x => x.insert( + self.row_group_writer_factory + .create_row_group_writer(self.writer.flushed_row_groups().len())?, + ), }; // If would exceed max_row_group_size, split batch @@ -832,6 +835,9 @@ impl ArrowRowGroupWriter { /// This is used by [`ArrowWriter`] to create row group writers, but can be used /// directly for lower level API. pub struct ArrowRowGroupWriterFactory { + schema: SchemaDescriptor, + arrow_schema: SchemaRef, + props: WriterPropertiesPtr, #[cfg(feature = "encryption")] file_encryptor: Option>, } @@ -839,51 +845,51 @@ pub struct ArrowRowGroupWriterFactory { impl ArrowRowGroupWriterFactory { /// Creates a new [`ArrowRowGroupWriterFactory`] using provided [`SerializedFileWriter`]. #[cfg(feature = "encryption")] - pub fn new(file_writer: &SerializedFileWriter) -> Self { + pub fn new( + file_writer: &SerializedFileWriter, + schema: SchemaDescriptor, + arrow_schema: SchemaRef, + props: WriterPropertiesPtr, + ) -> Self { Self { + schema, + arrow_schema, + props, file_encryptor: file_writer.file_encryptor(), } } #[cfg(not(feature = "encryption"))] - pub fn new(_file_writer: &SerializedFileWriter) -> Self { - Self {} + pub fn new( + _file_writer: &SerializedFileWriter, + schema: SchemaDescriptor, + arrow_schema: SchemaRef, + props: WriterPropertiesPtr, + ) -> Self { + Self { + schema, + arrow_schema, + props, + } } /// Creates a new [`ArrowRowGroupWriter`] for the given parquet schema and writer properties. 
#[cfg(feature = "encryption")] - pub fn create_row_group_writer( - &self, - parquet: &SchemaDescriptor, - props: &WriterPropertiesPtr, - arrow: &SchemaRef, - row_group_index: usize, - ) -> Result { - let mut writers = Vec::with_capacity(arrow.fields.len()); - let mut leaves = parquet.columns().iter(); - let column_factory = ArrowColumnWriterFactory::new() - .with_file_encryptor(row_group_index, self.file_encryptor.clone()); - for field in &arrow.fields { - column_factory.get_arrow_column_writer( - field.data_type(), - props, - &mut leaves, - &mut writers, - )?; - } - Ok(ArrowRowGroupWriter::new(writers, arrow)) + pub fn create_row_group_writer(&self, row_group_index: usize) -> Result { + let writers = get_column_writers_with_encryptor( + &self.schema, + &self.props, + &self.arrow_schema, + self.file_encryptor.clone(), + row_group_index, + )?; + Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema)) } #[cfg(not(feature = "encryption"))] - pub fn create_row_group_writer( - &self, - parquet: &SchemaDescriptor, - props: &WriterPropertiesPtr, - arrow: &SchemaRef, - _row_group_index: usize, - ) -> Result { - let writers = get_column_writers(parquet, props, arrow)?; - Ok(ArrowRowGroupWriter::new(writers, arrow)) + pub fn create_row_group_writer(&self, _row_group_index: usize) -> Result { + let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?; + Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema)) } } @@ -909,7 +915,7 @@ pub fn get_column_writers( /// Returns the [`ArrowColumnWriter`] for a given schema and supports columnar encryption #[cfg(feature = "encryption")] -pub fn get_column_writers_with_encryptor( +fn get_column_writers_with_encryptor( parquet: &SchemaDescriptor, props: &WriterPropertiesPtr, arrow: &SchemaRef, @@ -932,7 +938,7 @@ pub fn get_column_writers_with_encryptor( } /// Gets [`ArrowColumnWriter`] instances for different data types -pub struct ArrowColumnWriterFactory { +struct ArrowColumnWriterFactory { #[cfg(feature = "encryption")] row_group_index: usize, #[cfg(feature = "encryption")] @@ -957,7 +963,7 @@ impl ArrowColumnWriterFactory { } #[cfg(feature = "encryption")] - fn with_file_encryptor( + pub fn with_file_encryptor( mut self, row_group_index: usize, file_encryptor: Option>, diff --git a/parquet/src/encryption/encrypt.rs b/parquet/src/encryption/encrypt.rs index 36ff6f3d6a1f..c8d3ffc0eef4 100644 --- a/parquet/src/encryption/encrypt.rs +++ b/parquet/src/encryption/encrypt.rs @@ -288,14 +288,14 @@ impl EncryptionPropertiesBuilder { #[derive(Debug)] /// The encryption configuration for a single Parquet file -pub struct FileEncryptor { +pub(crate) struct FileEncryptor { properties: FileEncryptionProperties, aad_file_unique: Vec, file_aad: Vec, } impl FileEncryptor { - pub fn new(properties: FileEncryptionProperties) -> Result { + pub(crate) fn new(properties: FileEncryptionProperties) -> Result { // Generate unique AAD for file let rng = SystemRandom::new(); let mut aad_file_unique = vec![0u8; 8]; diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 26342f94fe9c..31a3344db66c 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -433,7 +433,7 @@ impl SerializedFileWriter { /// Get the file encryptor used by this instance to encrypt data #[cfg(feature = "encryption")] - pub fn file_encryptor(&self) -> Option> { + pub(crate) fn file_encryptor(&self) -> Option> { self.file_encryptor.clone() } } diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs index 
80080b68e9f1..90cd0de86522 100644 --- a/parquet/tests/encryption/encryption.rs +++ b/parquet/tests/encryption/encryption.rs @@ -1153,9 +1153,14 @@ async fn test_multi_threaded_encrypted_writing() { let mut file_writer = SerializedFileWriter::new(&temp_file, root_schema.clone(), props.clone()).unwrap(); - let arrow_row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer); + let arrow_row_group_writer_factory = ArrowRowGroupWriterFactory::new( + &file_writer, + parquet_schema, + schema.clone(), + props.clone(), + ); let arrow_row_group_writer = arrow_row_group_writer_factory - .create_row_group_writer(&parquet_schema, &props, &schema, 0) + .create_row_group_writer(0) .unwrap(); // Get column writers with encryptor from ArrowRowGroupWriter From 5eb0a5f4bdcb302b8b6b636b74f879dc702457c0 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 30 Jun 2025 14:29:11 +0200 Subject: [PATCH 04/17] Switch to tokio for multi threaded writing test --- parquet/tests/encryption/encryption.rs | 61 +++++++++++++------------- 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs index 90cd0de86522..387ef95d1447 100644 --- a/parquet/tests/encryption/encryption.rs +++ b/parquet/tests/encryption/encryption.rs @@ -28,9 +28,7 @@ use parquet::arrow::arrow_reader::{ ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector, }; -use parquet::arrow::arrow_writer::{ - compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory, -}; +use parquet::arrow::arrow_writer::{compute_leaves, ArrowLeafColumn, ArrowRowGroupWriterFactory}; use parquet::arrow::{ArrowSchemaConverter, ArrowWriter}; use parquet::data_type::{ByteArray, ByteArrayType}; use parquet::encryption::decrypt::FileDecryptionProperties; @@ -1165,41 +1163,44 @@ async fn test_multi_threaded_encrypted_writing() { // Get column writers with encryptor from ArrowRowGroupWriter let col_writers = arrow_row_group_writer.writers; + let num_columns = col_writers.len(); + + // Create a channel for each column writer to send ArrowLeafColumn data to + let mut col_writer_tasks = Vec::with_capacity(num_columns); + let mut col_array_channels = Vec::with_capacity(num_columns); + for mut writer in col_writers.into_iter() { + let (send_array, mut receive_array) = tokio::sync::mpsc::channel::(100); + col_array_channels.push(send_array); + let handle = tokio::spawn(async move { + while let Some(col) = receive_array.recv().await { + let _ = writer.write(&col); + } + writer.close().unwrap() + }); + col_writer_tasks.push(handle); + } - let mut workers: Vec<_> = col_writers - .into_iter() - .map(|mut col_writer| { - let (send, recv) = std::sync::mpsc::channel::(); - let handle = std::thread::spawn(move || { - // receive Arrays to encode via the channel - for col in recv { - col_writer.write(&col)?; - } - // once the input is complete, close the writer - // to return the newly created ArrowColumnChunk - col_writer.close() - }); - (handle, send) - }) - .collect(); - - let mut worker_iter = workers.iter_mut(); - for (arr, field) in to_write.iter().zip(&schema.fields) { - for leaves in compute_leaves(field, arr).unwrap() { - worker_iter.next().unwrap().1.send(leaves).unwrap(); + // Send the ArrowLeafColumn data to the respective column writer channels + for (channel_idx, (array, field)) in to_write.iter().zip(schema.fields()).enumerate() { + for c in compute_leaves(field, array).into_iter().flatten() { + let _ = 
col_array_channels[channel_idx].send(c).await; } } + drop(col_array_channels); + + // Wait for all column writers to finish writing + let mut finalized_rg = Vec::with_capacity(num_columns); + for task in col_writer_tasks.into_iter() { + finalized_rg.push(task.await.unwrap()); + } - // Wait for the workers to complete encoding, and append + // Wait for the workers to complete writing then append // the resulting column chunks to the row group (and the file) let mut row_group_writer = file_writer.next_row_group().unwrap(); - - for (handle, send) in workers { - drop(send); // Drop send side to signal termination - // wait for the worker to send the completed chunk - let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap(); + for chunk in finalized_rg { chunk.append_to_row_group(&mut row_group_writer).unwrap(); } + // Close the row group which writes to the underlying file row_group_writer.close().unwrap(); From dd4f457e6b991c60bfb7ff97bb93213eb33ce4ec Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Wed, 2 Jul 2025 23:54:49 +0200 Subject: [PATCH 05/17] Review feedback --- parquet/src/arrow/arrow_writer/mod.rs | 19 ++++++++++--------- parquet/tests/encryption/encryption.rs | 9 +++++---- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index e2bb926201e9..836dbd55bfd2 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -236,15 +236,12 @@ impl ArrowWriter { let max_row_group_size = props.max_row_group_size(); + let props_ptr = Arc::new(props); let file_writer = - SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props.clone()))?; + SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?; - let row_group_writer_factory = ArrowRowGroupWriterFactory::new( - &file_writer, - schema, - arrow_schema.clone(), - props.into(), - ); + let row_group_writer_factory = + ArrowRowGroupWriterFactory::new(&file_writer, schema, arrow_schema.clone(), props_ptr); Ok(Self { writer: file_writer, @@ -797,8 +794,7 @@ impl ArrowColumnWriter { /// Encodes [`RecordBatch`] to a parquet row group pub struct ArrowRowGroupWriter { - /// [`ArrowColumnWriter`] for each column in a row group - pub writers: Vec, + writers: Vec, schema: SchemaRef, buffered_rows: usize, } @@ -829,6 +825,11 @@ impl ArrowRowGroupWriter { .map(|writer| writer.close()) .collect() } + + /// Get [`ArrowColumnWriter`]s for all columns in a row group + pub fn into_column_writers(self) -> Vec { + self.writers + } } /// Factory for creating [`ArrowRowGroupWriter`] instances. 
diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs index 387ef95d1447..63c92c2f1549 100644 --- a/parquet/tests/encryption/encryption.rs +++ b/parquet/tests/encryption/encryption.rs @@ -1162,7 +1162,7 @@ async fn test_multi_threaded_encrypted_writing() { .unwrap(); // Get column writers with encryptor from ArrowRowGroupWriter - let col_writers = arrow_row_group_writer.writers; + let col_writers = arrow_row_group_writer.into_column_writers(); let num_columns = col_writers.len(); // Create a channel for each column writer to send ArrowLeafColumn data to @@ -1181,9 +1181,10 @@ async fn test_multi_threaded_encrypted_writing() { } // Send the ArrowLeafColumn data to the respective column writer channels - for (channel_idx, (array, field)) in to_write.iter().zip(schema.fields()).enumerate() { - for c in compute_leaves(field, array).into_iter().flatten() { - let _ = col_array_channels[channel_idx].send(c).await; + let mut worker_iter = col_array_channels.iter_mut(); + for (array, field) in to_write.iter().zip(schema.fields()) { + for leaves in compute_leaves(field, array).unwrap() { + worker_iter.next().unwrap().send(leaves).await.unwrap(); } } drop(col_array_channels); From 6b2dc4296a2be690f4f1cec726453fce6f37a2f8 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 10 Jul 2025 21:53:30 +0200 Subject: [PATCH 06/17] alternative approach --- parquet/src/arrow/arrow_writer/mod.rs | 15 ++++++- parquet/tests/encryption/encryption.rs | 55 ++++++++------------------ 2 files changed, 30 insertions(+), 40 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 836dbd55bfd2..e0d2d557e277 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -402,6 +402,17 @@ impl ArrowWriter { pub fn close(mut self) -> Result { self.finish() } + + pub fn get_column_writers(&mut self) -> Result<&ArrowRowGroupWriter> { + let in_progress = match &mut self.in_progress { + Some(in_progress) => in_progress, + x => x.insert( + self.row_group_writer_factory + .create_row_group_writer(self.writer.flushed_row_groups().len())?, + ), + }; + Ok(in_progress) + } } impl RecordBatchWriter for ArrowWriter { @@ -827,8 +838,8 @@ impl ArrowRowGroupWriter { } /// Get [`ArrowColumnWriter`]s for all columns in a row group - pub fn into_column_writers(self) -> Vec { - self.writers + pub fn get_column_writers(&self) -> &Vec { + &self.writers } } diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs index 63c92c2f1549..c58ad559da84 100644 --- a/parquet/tests/encryption/encryption.rs +++ b/parquet/tests/encryption/encryption.rs @@ -1135,34 +1135,23 @@ async fn test_multi_threaded_encrypted_writing() { .collect(); let schema = metadata.schema().clone(); - let props = Arc::new( - WriterPropertiesBuilder::with_defaults() - .with_file_encryption_properties(file_encryption_properties) - .build(), - ); - let parquet_schema = ArrowSchemaConverter::new() - .with_coerce_types(props.coerce_types()) - .convert(&schema) - .unwrap(); - let root_schema = parquet_schema.root_schema_ptr(); + let props = Some(WriterPropertiesBuilder::with_defaults() + .with_file_encryption_properties(file_encryption_properties) + .build()); // Create a temporary file to write the encrypted data let temp_file = tempfile::NamedTempFile::new().unwrap(); - let mut file_writer = - SerializedFileWriter::new(&temp_file, root_schema.clone(), props.clone()).unwrap(); - - let arrow_row_group_writer_factory = 
ArrowRowGroupWriterFactory::new(
-        &file_writer,
-        parquet_schema,
-        schema.clone(),
-        props.clone(),
-    );
-    let arrow_row_group_writer = arrow_row_group_writer_factory
-        .create_row_group_writer(0)
-        .unwrap();
+    let mut writer = ArrowWriter::try_new(
+        temp_file.into_file(),
+        metadata.schema().clone(),
+        props,
+    ).unwrap();
 
-    // Get column writers with encryptor from ArrowRowGroupWriter
-    let col_writers = arrow_row_group_writer.into_column_writers();
+    // Get column writers with encryptor
+    let col_writers = writer
+        .get_column_writers()
+        .unwrap()
+        .get_column_writers();
     let num_columns = col_writers.len();
 
     // Create a channel for each column writer to send ArrowLeafColumn data to
@@ -1173,9 +1162,9 @@ async fn test_multi_threaded_encrypted_writing() {
         col_array_channels.push(send_array);
         let handle = tokio::spawn(async move {
             while let Some(col) = receive_array.recv().await {
-                let _ = writer.write(&col);
+                writer.write(&col);
             }
-            writer.close().unwrap()
+            &writer.close().unwrap()
         });
         col_writer_tasks.push(handle);
     }
@@ -1195,23 +1184,13 @@ async fn test_multi_threaded_encrypted_writing() {
         finalized_rg.push(task.await.unwrap());
    }
 
-    // Wait for the workers to complete writing then append
-    // the resulting column chunks to the row group (and the file)
-    let mut row_group_writer = file_writer.next_row_group().unwrap();
-    for chunk in finalized_rg {
-        chunk.append_to_row_group(&mut row_group_writer).unwrap();
-    }
-
-    // Close the row group which writes to the underlying file
-    row_group_writer.close().unwrap();
-
     // Close the file writer which writes the footer
-    let metadata = file_writer.close().unwrap();
+    let metadata = writer.finish().unwrap();
     assert_eq!(metadata.num_rows, 50);
 
     // Check that the file was written correctly
     let (read_record_batches, read_metadata) = read_encrypted_file(
-        temp_file.path().to_str().unwrap(),
+        &path,
         decryption_properties.clone(),
     )
     .unwrap();

From fcc2c3e3d642a979244b355317737cc7435af475 Mon Sep 17 00:00:00 2001
From: Rok Mihevc
Date: Wed, 30 Jul 2025 09:48:47 +0200
Subject: [PATCH 07/17] get_column_writers returns Result<Vec<ArrowColumnWriter>>

---
 parquet/src/arrow/arrow_writer/mod.rs  | 15 ++++------
 parquet/tests/encryption/encryption.rs | 40 +++++++++++---------------
 2 files changed, 23 insertions(+), 32 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index e0d2d557e277..60a847f4ab0c 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -403,15 +403,12 @@ impl<W: Write + Send> ArrowWriter<W> {
         self.finish()
     }
 
-    pub fn get_column_writers(&mut self) -> Result<&ArrowRowGroupWriter> {
-        let in_progress = match &mut self.in_progress {
-            Some(in_progress) => in_progress,
-            x => x.insert(
-                self.row_group_writer_factory
-                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
-            ),
-        };
-        Ok(in_progress)
+    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
+        let _ = self.flush();
+        let in_progress = self
+            .row_group_writer_factory
+            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
+        Ok(in_progress.writers)
     }
 }
 
diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs
index c58ad559da84..1cfde767eacd 100644
--- a/parquet/tests/encryption/encryption.rs
+++ b/parquet/tests/encryption/encryption.rs
@@ -28,7 +28,9 @@ use parquet::arrow::arrow_reader::{
     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection,
     RowSelector,
 };
-use parquet::arrow::arrow_writer::{compute_leaves, ArrowLeafColumn, 
ArrowRowGroupWriterFactory}; +use parquet::arrow::arrow_writer::{ + compute_leaves, ArrowColumnWriter, ArrowLeafColumn, ArrowRowGroupWriterFactory, +}; use parquet::arrow::{ArrowSchemaConverter, ArrowWriter}; use parquet::data_type::{ByteArray, ByteArrayType}; use parquet::encryption::decrypt::FileDecryptionProperties; @@ -1135,36 +1137,31 @@ async fn test_multi_threaded_encrypted_writing() { .collect(); let schema = metadata.schema().clone(); - let props = Some(WriterPropertiesBuilder::with_defaults() - .with_file_encryption_properties(file_encryption_properties) - .build()); + let props = Some( + WriterPropertiesBuilder::with_defaults() + .with_file_encryption_properties(file_encryption_properties) + .build(), + ); // Create a temporary file to write the encrypted data - let temp_file = tempfile::NamedTempFile::new().unwrap(); - let mut writer = ArrowWriter::try_new( - temp_file.into_file(), - metadata.schema().clone(), - props, - ).unwrap(); + let temp_file = tempfile::tempfile().unwrap(); + let mut writer = ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props).unwrap(); // Get column writers with encryptor - let col_writers = writer - .get_column_writers() - .unwrap() - .get_column_writers(); + let mut col_writers = writer.get_column_writers().unwrap(); let num_columns = col_writers.len(); // Create a channel for each column writer to send ArrowLeafColumn data to let mut col_writer_tasks = Vec::with_capacity(num_columns); let mut col_array_channels = Vec::with_capacity(num_columns); - for mut writer in col_writers.into_iter() { + for mut col_writer in col_writers.into_iter() { let (send_array, mut receive_array) = tokio::sync::mpsc::channel::(100); col_array_channels.push(send_array); let handle = tokio::spawn(async move { while let Some(col) = receive_array.recv().await { - writer.write(&col); + col_writer.write(&col).unwrap(); } - &writer.close().unwrap() + col_writer.close().unwrap(); }); col_writer_tasks.push(handle); } @@ -1189,15 +1186,12 @@ async fn test_multi_threaded_encrypted_writing() { assert_eq!(metadata.num_rows, 50); // Check that the file was written correctly - let (read_record_batches, read_metadata) = read_encrypted_file( - &path, - decryption_properties.clone(), - ) - .unwrap(); + let (read_record_batches, read_metadata) = + read_encrypted_file(&path, decryption_properties.clone()).unwrap(); verify_encryption_test_data(read_record_batches, read_metadata.metadata()); // Check that file was encrypted - let result = ArrowReaderMetadata::load(&temp_file.into_file(), ArrowReaderOptions::default()); + let result = ArrowReaderMetadata::load(&temp_file, ArrowReaderOptions::default()); assert_eq!( result.unwrap_err().to_string(), "Parquet error: Parquet file has an encrypted footer but decryption properties were not provided" From 7ef26b66701b3490c26c7937a36c491c33ceafb8 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Wed, 30 Jul 2025 13:44:59 +0200 Subject: [PATCH 08/17] Remove some pub --- parquet/src/arrow/arrow_writer/mod.rs | 26 +++++++++----------------- parquet/tests/encryption/encryption.rs | 15 ++++++++------- 2 files changed, 17 insertions(+), 24 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 60a847f4ab0c..d2ccfafe35e8 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -403,6 +403,8 @@ impl ArrowWriter { self.finish() } + /// TODO: better docstring + /// Create a new row group writer and return it's column writers. 
pub fn get_column_writers(&mut self) -> Result> { let _ = self.flush(); let in_progress = self @@ -801,7 +803,7 @@ impl ArrowColumnWriter { } /// Encodes [`RecordBatch`] to a parquet row group -pub struct ArrowRowGroupWriter { +struct ArrowRowGroupWriter { writers: Vec, schema: SchemaRef, buffered_rows: usize, @@ -833,17 +835,9 @@ impl ArrowRowGroupWriter { .map(|writer| writer.close()) .collect() } - - /// Get [`ArrowColumnWriter`]s for all columns in a row group - pub fn get_column_writers(&self) -> &Vec { - &self.writers - } } -/// Factory for creating [`ArrowRowGroupWriter`] instances. -/// This is used by [`ArrowWriter`] to create row group writers, but can be used -/// directly for lower level API. -pub struct ArrowRowGroupWriterFactory { +struct ArrowRowGroupWriterFactory { schema: SchemaDescriptor, arrow_schema: SchemaRef, props: WriterPropertiesPtr, @@ -852,9 +846,8 @@ pub struct ArrowRowGroupWriterFactory { } impl ArrowRowGroupWriterFactory { - /// Creates a new [`ArrowRowGroupWriterFactory`] using provided [`SerializedFileWriter`]. #[cfg(feature = "encryption")] - pub fn new( + fn new( file_writer: &SerializedFileWriter, schema: SchemaDescriptor, arrow_schema: SchemaRef, @@ -869,7 +862,7 @@ impl ArrowRowGroupWriterFactory { } #[cfg(not(feature = "encryption"))] - pub fn new( + fn new( _file_writer: &SerializedFileWriter, schema: SchemaDescriptor, arrow_schema: SchemaRef, @@ -882,9 +875,8 @@ impl ArrowRowGroupWriterFactory { } } - /// Creates a new [`ArrowRowGroupWriter`] for the given parquet schema and writer properties. #[cfg(feature = "encryption")] - pub fn create_row_group_writer(&self, row_group_index: usize) -> Result { + fn create_row_group_writer(&self, row_group_index: usize) -> Result { let writers = get_column_writers_with_encryptor( &self.schema, &self.props, @@ -896,7 +888,7 @@ impl ArrowRowGroupWriterFactory { } #[cfg(not(feature = "encryption"))] - pub fn create_row_group_writer(&self, _row_group_index: usize) -> Result { + fn create_row_group_writer(&self, _row_group_index: usize) -> Result { let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?; Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema)) } @@ -1010,7 +1002,7 @@ impl ArrowColumnWriterFactory { } /// Gets the [`ArrowColumnWriter`] for the given `data_type` - pub fn get_arrow_column_writer( + fn get_arrow_column_writer( &self, data_type: &ArrowDataType, props: &WriterPropertiesPtr, diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs index 1cfde767eacd..ac1cc39e9b51 100644 --- a/parquet/tests/encryption/encryption.rs +++ b/parquet/tests/encryption/encryption.rs @@ -28,10 +28,8 @@ use parquet::arrow::arrow_reader::{ ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector, }; -use parquet::arrow::arrow_writer::{ - compute_leaves, ArrowColumnWriter, ArrowLeafColumn, ArrowRowGroupWriterFactory, -}; -use parquet::arrow::{ArrowSchemaConverter, ArrowWriter}; +use parquet::arrow::arrow_writer::{compute_leaves, ArrowLeafColumn}; +use parquet::arrow::ArrowWriter; use parquet::data_type::{ByteArray, ByteArrayType}; use parquet::encryption::decrypt::FileDecryptionProperties; use parquet::encryption::encrypt::FileEncryptionProperties; @@ -1148,7 +1146,7 @@ async fn test_multi_threaded_encrypted_writing() { let mut writer = ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props).unwrap(); // Get column writers with encryptor - let mut col_writers = writer.get_column_writers().unwrap(); + let 
col_writers = writer.get_column_writers().unwrap(); let num_columns = col_writers.len(); // Create a channel for each column writer to send ArrowLeafColumn data to @@ -1161,7 +1159,7 @@ async fn test_multi_threaded_encrypted_writing() { while let Some(col) = receive_array.recv().await { col_writer.write(&col).unwrap(); } - col_writer.close().unwrap(); + col_writer.close().unwrap() }); col_writer_tasks.push(handle); } @@ -1181,9 +1179,12 @@ async fn test_multi_threaded_encrypted_writing() { finalized_rg.push(task.await.unwrap()); } + assert!(writer.flush().is_ok()); + // Close the file writer which writes the footer let metadata = writer.finish().unwrap(); - assert_eq!(metadata.num_rows, 50); + assert_eq!(metadata.num_rows, 0); + assert_eq!(metadata.schema, metadata.schema); // Check that the file was written correctly let (read_record_batches, read_metadata) = From 912645f2e02d9289eba0987bde4effe0ec4feda1 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Wed, 30 Jul 2025 19:20:12 +0200 Subject: [PATCH 09/17] add todo --- parquet/tests/encryption/encryption.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs index ac1cc39e9b51..cff9169723bd 100644 --- a/parquet/tests/encryption/encryption.rs +++ b/parquet/tests/encryption/encryption.rs @@ -1186,6 +1186,9 @@ async fn test_multi_threaded_encrypted_writing() { assert_eq!(metadata.num_rows, 0); assert_eq!(metadata.schema, metadata.schema); + // TODO: Test inserting via high-level API and low-level API in the same + // file writer, and check that data and the footer are written correctly. + // Check that the file was written correctly let (read_record_batches, read_metadata) = read_encrypted_file(&path, decryption_properties.clone()).unwrap(); From 17b7f185163b946e767ed1352a8c91bd9d3bbce1 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Wed, 30 Jul 2025 19:33:09 +0200 Subject: [PATCH 10/17] Review feedback --- parquet/src/file/properties.rs | 8 +++++--- parquet/tests/encryption/encryption.rs | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 3ab437f168e6..96e3706e27d7 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -190,7 +190,7 @@ impl WriterProperties { /// Returns a new default [`WriterPropertiesBuilder`] for creating writer /// properties. pub fn builder() -> WriterPropertiesBuilder { - WriterPropertiesBuilder::with_defaults() + WriterPropertiesBuilder::default() } /// Returns data page size limit. @@ -455,9 +455,9 @@ pub struct WriterPropertiesBuilder { file_encryption_properties: Option, } -impl WriterPropertiesBuilder { +impl Default for WriterPropertiesBuilder { /// Returns default state of the builder. - pub fn with_defaults() -> Self { + fn default() -> Self { Self { data_page_size_limit: DEFAULT_PAGE_SIZE, data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT, @@ -478,7 +478,9 @@ impl WriterPropertiesBuilder { file_encryption_properties: None, } } +} +impl WriterPropertiesBuilder { /// Finalizes the configuration and returns immutable writer properties struct. 
pub fn build(self) -> WriterProperties { WriterProperties { diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs index cff9169723bd..b906a96585e9 100644 --- a/parquet/tests/encryption/encryption.rs +++ b/parquet/tests/encryption/encryption.rs @@ -1136,7 +1136,7 @@ async fn test_multi_threaded_encrypted_writing() { let schema = metadata.schema().clone(); let props = Some( - WriterPropertiesBuilder::with_defaults() + WriterPropertiesBuilder::default() .with_file_encryption_properties(file_encryption_properties) .build(), ); From 5e9f0ce9120e32abc5d96bc3ae38c6895ee3d559 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 31 Jul 2025 14:18:02 +0200 Subject: [PATCH 11/17] Test using ArrowWriters low-level and high-level API at once --- parquet/src/arrow/arrow_writer/mod.rs | 13 +++++++++++-- parquet/src/file/writer.rs | 2 +- parquet/tests/encryption/encryption.rs | 22 +++++++++++++++++----- 3 files changed, 29 insertions(+), 8 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index d2ccfafe35e8..92529b0e7013 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -403,8 +403,7 @@ impl ArrowWriter { self.finish() } - /// TODO: better docstring - /// Create a new row group writer and return it's column writers. + /// Create a new row group writer and return its column writers. pub fn get_column_writers(&mut self) -> Result> { let _ = self.flush(); let in_progress = self @@ -412,6 +411,16 @@ impl ArrowWriter { .create_row_group_writer(self.writer.flushed_row_groups().len())?; Ok(in_progress.writers) } + + /// Append the given column chunks to the current row group. + pub fn append_to_row_groups(&mut self, chunks: Vec) -> Result<()> { + let mut row_group_writer = self.writer.next_row_group()?; + for chunk in chunks { + chunk.append_to_row_group(&mut row_group_writer)?; + } + let _ = row_group_writer.close(); + Ok(()) + } } impl RecordBatchWriter for ArrowWriter { diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 31a3344db66c..690efb36f281 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -486,7 +486,7 @@ fn write_bloom_filters( /// more columns are available to write. /// - Once done writing a column, close column writer with `close` /// - Once all columns have been written, close row group writer with `close` -/// method. THe close method will return row group metadata and is no-op +/// method. The close method will return row group metadata and is no-op /// on already closed row group. 
pub struct SerializedRowGroupWriter<'a, W: Write> { descr: SchemaDescPtr, diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs index b906a96585e9..5c8779c811e4 100644 --- a/parquet/tests/encryption/encryption.rs +++ b/parquet/tests/encryption/encryption.rs @@ -1145,7 +1145,9 @@ async fn test_multi_threaded_encrypted_writing() { let temp_file = tempfile::tempfile().unwrap(); let mut writer = ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props).unwrap(); - // Get column writers with encryptor + // LOW-LEVEL API: Use low level API to write into a file using multiple threads + + // Get column writers let col_writers = writer.get_column_writers().unwrap(); let num_columns = col_writers.len(); @@ -1179,19 +1181,29 @@ async fn test_multi_threaded_encrypted_writing() { finalized_rg.push(task.await.unwrap()); } + // Append the finalized row group to the SerializedFileWriter + assert!(writer.append_to_row_groups(finalized_rg).is_ok()); + assert!(writer.flush().is_ok()); + + // HIGH-LEVEL API: Write RecordBatches into the file using ArrowWriter + + // Write individual RecordBatches into the file + for rb in record_batches { + writer.write(&rb).unwrap() + } assert!(writer.flush().is_ok()); // Close the file writer which writes the footer let metadata = writer.finish().unwrap(); - assert_eq!(metadata.num_rows, 0); + assert_eq!(metadata.num_rows, 100); assert_eq!(metadata.schema, metadata.schema); - // TODO: Test inserting via high-level API and low-level API in the same - // file writer, and check that data and the footer are written correctly. - // Check that the file was written correctly let (read_record_batches, read_metadata) = read_encrypted_file(&path, decryption_properties.clone()).unwrap(); + + // TODO: This should be failing since we're writing data twice and + // we only seem to be reading one copy out. 
verify_encryption_test_data(read_record_batches, read_metadata.metadata()); // Check that file was encrypted From 3d93a49a60459f7401303c7acd8fc53a7f48737f Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 31 Jul 2025 14:23:03 +0200 Subject: [PATCH 12/17] Revert async test --- parquet/src/arrow/arrow_writer/mod.rs | 7 ------ parquet/tests/encryption/encryption_async.rs | 26 ++++++++++---------- 2 files changed, 13 insertions(+), 20 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 92529b0e7013..213507441820 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -955,14 +955,7 @@ struct ArrowColumnWriterFactory { file_encryptor: Option>, } -impl Default for ArrowColumnWriterFactory { - fn default() -> Self { - Self::new() - } -} - impl ArrowColumnWriterFactory { - /// Create a new [`ArrowColumnWriterFactory`] pub fn new() -> Self { Self { #[cfg(feature = "encryption")] diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs index 4aca4bacb057..e0fbbcdfafe3 100644 --- a/parquet/tests/encryption/encryption_async.rs +++ b/parquet/tests/encryption/encryption_async.rs @@ -20,7 +20,6 @@ use crate::encryption_util::{ verify_column_indexes, verify_encryption_test_data, TestKeyRetriever, }; -use arrow_array::RecordBatch; use futures::TryStreamExt; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::arrow_writer::ArrowWriterOptions; @@ -437,13 +436,14 @@ async fn test_decrypt_page_index( Ok(()) } -async fn read_encrypted_file_async( +async fn verify_encryption_test_file_read_async( file: &mut tokio::fs::File, decryption_properties: FileDecryptionProperties, -) -> Result<(Vec, ArrowReaderMetadata), ParquetError> { +) -> Result<(), ParquetError> { let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties); let arrow_metadata = ArrowReaderMetadata::load_async(file, options).await?; + let metadata = arrow_metadata.metadata(); let record_reader = ParquetRecordBatchStreamBuilder::new_with_metadata( file.try_clone().await?, @@ -451,15 +451,8 @@ async fn read_encrypted_file_async( ) .build()?; let record_batches = record_reader.try_collect::>().await?; - Ok((record_batches, arrow_metadata.clone())) -} -async fn verify_encryption_test_file_read_async( - file: &mut tokio::fs::File, - decryption_properties: FileDecryptionProperties, -) -> Result<(), ParquetError> { - let (record_batches, metadata) = read_encrypted_file_async(file, decryption_properties).await?; - verify_encryption_test_data(record_batches, metadata.metadata()); + verify_encryption_test_data(record_batches, metadata); Ok(()) } @@ -471,8 +464,15 @@ async fn read_and_roundtrip_to_encrypted_file_async( let temp_file = tempfile::tempfile().unwrap(); let mut file = File::open(&path).await.unwrap(); - let (record_batches, arrow_metadata) = - read_encrypted_file_async(&mut file, decryption_properties.clone()).await?; + let options = + ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties.clone()); + let arrow_metadata = ArrowReaderMetadata::load_async(&mut file, options).await?; + let record_reader = ParquetRecordBatchStreamBuilder::new_with_metadata( + file.try_clone().await?, + arrow_metadata.clone(), + ) + .build()?; + let record_batches = record_reader.try_collect::>().await?; let props = WriterProperties::builder() .with_file_encryption_properties(encryption_properties) From 
3fb17276539ccd92764f503a75527a1a651bc331 Mon Sep 17 00:00:00 2001
From: Rok Mihevc
Date: Fri, 1 Aug 2025 13:30:45 +0200
Subject: [PATCH 13/17] Update parquet/src/arrow/arrow_writer/mod.rs

Co-authored-by: Adam Reeve
---
 parquet/src/arrow/arrow_writer/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 213507441820..ffd46ef72906 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -405,7 +405,7 @@ impl<W: Write + Send> ArrowWriter<W> {
     /// Create a new row group writer and return its column writers.
     pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
-        let _ = self.flush();
+        self.flush()?;
         let in_progress = self
             .row_group_writer_factory
             .create_row_group_writer(self.writer.flushed_row_groups().len())?;

From 82e97e4e7a803c8d94777cae05c573745a382448 Mon Sep 17 00:00:00 2001
From: Rok Mihevc
Date: Fri, 1 Aug 2025 13:31:10 +0200
Subject: [PATCH 14/17] Update parquet/src/arrow/arrow_writer/mod.rs

Co-authored-by: Adam Reeve
---
 parquet/src/arrow/arrow_writer/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index ffd46ef72906..0ec074231fdc 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -418,7 +418,7 @@ impl<W: Write + Send> ArrowWriter<W> {
         for chunk in chunks {
             chunk.append_to_row_group(&mut row_group_writer)?;
         }
-        let _ = row_group_writer.close();
+        row_group_writer.close()?;
         Ok(())
     }
 }

From 22cd86539635ee71347cefae60aae3256e06d5b9 Mon Sep 17 00:00:00 2001
From: Rok Mihevc
Date: Fri, 1 Aug 2025 13:32:03 +0200
Subject: [PATCH 15/17] Update parquet/src/arrow/arrow_writer/mod.rs

Co-authored-by: Adam Reeve
---
 parquet/src/arrow/arrow_writer/mod.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 0ec074231fdc..d235f5fcab64 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -412,8 +412,8 @@ impl<W: Write + Send> ArrowWriter<W> {
         Ok(in_progress.writers)
     }
 
-    /// Append the given column chunks to the current row group.
-    pub fn append_to_row_groups(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
+    /// Append the given column chunks to the file as a new row group. 
+ pub fn append_row_group(&mut self, chunks: Vec) -> Result<()> { let mut row_group_writer = self.writer.next_row_group()?; for chunk in chunks { chunk.append_to_row_group(&mut row_group_writer)?; From b58b4c0c75eeca6ef414ebb413d4c495c1a802bd Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Fri, 1 Aug 2025 18:08:55 +0200 Subject: [PATCH 16/17] Review feedback --- parquet/tests/encryption/encryption.rs | 109 ++++++++++++++++++++----- 1 file changed, 87 insertions(+), 22 deletions(-) diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs index 5c8779c811e4..82f8b906703f 100644 --- a/parquet/tests/encryption/encryption.rs +++ b/parquet/tests/encryption/encryption.rs @@ -631,6 +631,7 @@ fn uniform_encryption_page_skipping(page_index: bool) -> parquet::errors::Result fn test_write_non_uniform_encryption() { let testdata = arrow::util::test_util::parquet_test_data(); let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted"); + let file = File::open(path).unwrap(); let footer_key = b"0123456789012345".to_vec(); // 128bit/16 let column_names = vec!["double_field", "float_field"]; @@ -648,13 +649,14 @@ fn test_write_non_uniform_encryption() { .build() .unwrap(); - read_and_roundtrip_to_encrypted_file(&path, decryption_properties, file_encryption_properties); + read_and_roundtrip_to_encrypted_file(&file, decryption_properties, file_encryption_properties); } #[test] fn test_write_uniform_encryption_plaintext_footer() { let testdata = arrow::util::test_util::parquet_test_data(); let path = format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted"); + let file = File::open(path).unwrap(); let footer_key = b"0123456789012345".to_vec(); // 128bit/16 let wrong_footer_key = b"0000000000000000".to_vec(); // 128bit/16 @@ -680,7 +682,7 @@ fn test_write_uniform_encryption_plaintext_footer() { // Try writing plaintext footer and then reading it with the correct footer key read_and_roundtrip_to_encrypted_file( - &path, + &file, decryption_properties.clone(), file_encryption_properties.clone(), ); @@ -689,7 +691,6 @@ fn test_write_uniform_encryption_plaintext_footer() { let temp_file = tempfile::tempfile().unwrap(); // read example data - let file = File::open(path).unwrap(); let options = ArrowReaderOptions::default() .with_file_decryption_properties(decryption_properties.clone()); let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap(); @@ -731,6 +732,7 @@ fn test_write_uniform_encryption_plaintext_footer() { fn test_write_uniform_encryption() { let testdata = arrow::util::test_util::parquet_test_data(); let path = format!("{testdata}/uniform_encryption.parquet.encrypted"); + let file = File::open(path).unwrap(); let footer_key = b"0123456789012345".to_vec(); // 128bit/16 @@ -742,7 +744,7 @@ fn test_write_uniform_encryption() { .build() .unwrap(); - read_and_roundtrip_to_encrypted_file(&path, decryption_properties, file_encryption_properties); + read_and_roundtrip_to_encrypted_file(&file, decryption_properties, file_encryption_properties); } #[test] @@ -1064,32 +1066,30 @@ fn test_decrypt_page_index( } fn read_encrypted_file( - path: &str, + file: &File, decryption_properties: FileDecryptionProperties, ) -> Result<(Vec, ArrowReaderMetadata), ParquetError> { - let file = File::open(path).unwrap(); let options = ArrowReaderOptions::default() .with_file_decryption_properties(decryption_properties.clone()); - let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap(); + let metadata = 
ArrowReaderMetadata::load(file, options.clone())?; - let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap(); - let batch_reader = builder.build().unwrap(); - let batches = batch_reader - .collect::, _>>() - .unwrap(); + let builder = + ParquetRecordBatchReaderBuilder::try_new_with_options(file.try_clone().unwrap(), options)?; + let batch_reader = builder.build()?; + let batches = batch_reader.collect::, _>>()?; Ok((batches, metadata)) } fn read_and_roundtrip_to_encrypted_file( - path: &str, + file: &File, decryption_properties: FileDecryptionProperties, encryption_properties: FileEncryptionProperties, ) { // read example data - let (batches, metadata) = read_encrypted_file(path, decryption_properties.clone()).unwrap(); + let (batches, metadata) = read_encrypted_file(file, decryption_properties.clone()).unwrap(); + // write example data to a temporary file let temp_file = tempfile::tempfile().unwrap(); - // write example data let props = WriterProperties::builder() .with_file_encryption_properties(encryption_properties) .build(); @@ -1115,6 +1115,7 @@ async fn test_multi_threaded_encrypted_writing() { // Read example data and set up encryption/decryption properties let testdata = arrow::util::test_util::parquet_test_data(); let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted"); + let file = File::open(path).unwrap(); let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into()) .with_column_key("double_field", b"1234567890123450".into()) @@ -1128,7 +1129,7 @@ async fn test_multi_threaded_encrypted_writing() { .unwrap(); let (record_batches, metadata) = - read_encrypted_file(&path, decryption_properties.clone()).unwrap(); + read_encrypted_file(&file, decryption_properties.clone()).unwrap(); let to_write: Vec<_> = record_batches .iter() .flat_map(|rb| rb.columns().to_vec()) @@ -1182,8 +1183,7 @@ async fn test_multi_threaded_encrypted_writing() { } // Append the finalized row group to the SerializedFileWriter - assert!(writer.append_to_row_groups(finalized_rg).is_ok()); - assert!(writer.flush().is_ok()); + assert!(writer.append_row_group(finalized_rg).is_ok()); // HIGH-LEVEL API: Write RecordBatches into the file using ArrowWriter @@ -1200,11 +1200,76 @@ async fn test_multi_threaded_encrypted_writing() { // Check that the file was written correctly let (read_record_batches, read_metadata) = - read_encrypted_file(&path, decryption_properties.clone()).unwrap(); + read_encrypted_file(&temp_file, decryption_properties.clone()).unwrap(); + + let file_metadata = read_metadata.metadata().file_metadata(); + assert_eq!(file_metadata.num_rows(), 100); + assert_eq!(file_metadata.schema_descr().num_columns(), 8); - // TODO: This should be failing since we're writing data twice and - // we only seem to be reading one copy out. 
- verify_encryption_test_data(read_record_batches, read_metadata.metadata()); + read_metadata.metadata().row_groups().iter().for_each(|rg| { + assert_eq!(rg.num_columns(), 8); + assert_eq!(rg.num_rows(), 50); + }); + + let mut row_count = 0; + let wrap_at = 50; + for batch in read_record_batches { + let batch = batch; + row_count += batch.num_rows(); + + let bool_col = batch.column(0).as_boolean(); + let time_col = batch + .column(1) + .as_primitive::(); + let list_col = batch.column(2).as_list::(); + let timestamp_col = batch + .column(3) + .as_primitive::(); + let f32_col = batch.column(4).as_primitive::(); + let f64_col = batch.column(5).as_primitive::(); + let binary_col = batch.column(6).as_binary::(); + let fixed_size_binary_col = batch.column(7).as_fixed_size_binary(); + + for (i, x) in bool_col.iter().enumerate() { + assert_eq!(x.unwrap(), i % 2 == 0); + } + for (i, x) in time_col.iter().enumerate() { + assert_eq!(x.unwrap(), (i % wrap_at) as i32); + } + for (i, list_item) in list_col.iter().enumerate() { + let list_item = list_item.unwrap(); + let list_item = list_item.as_primitive::(); + assert_eq!(list_item.len(), 2); + assert_eq!( + list_item.value(0), + (((i % wrap_at) * 2) * 1000000000000) as i64 + ); + assert_eq!( + list_item.value(1), + (((i % wrap_at) * 2 + 1) * 1000000000000) as i64 + ); + } + for x in timestamp_col.iter() { + assert!(x.is_some()); + } + for (i, x) in f32_col.iter().enumerate() { + assert_eq!(x.unwrap(), (i % wrap_at) as f32 * 1.1f32); + } + for (i, x) in f64_col.iter().enumerate() { + assert_eq!(x.unwrap(), (i % wrap_at) as f64 * 1.1111111f64); + } + for (i, x) in binary_col.iter().enumerate() { + assert_eq!(x.is_some(), i % 2 == 0); + if let Some(x) = x { + assert_eq!(&x[0..7], b"parquet"); + } + } + for (i, x) in fixed_size_binary_col.iter().enumerate() { + assert_eq!(x.unwrap(), &[(i % wrap_at) as u8; 10]); + } + } + + assert_eq!(row_count, file_metadata.num_rows() as usize); // Check that file was encrypted let result = ArrowReaderMetadata::load(&temp_file, ArrowReaderOptions::default()); From d9590212db94de291203220e2ed0beb808c69072 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Fri, 1 Aug 2025 18:20:56 +0200 Subject: [PATCH 17/17] Move parallel write test to async test file --- parquet/tests/encryption/encryption.rs | 235 +------------------ parquet/tests/encryption/encryption_async.rs | 110 ++++++++- parquet/tests/encryption/encryption_util.rs | 153 +++++++++++- 3 files changed, 259 insertions(+), 239 deletions(-) diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs index 82f8b906703f..96dd8654cd76 100644 --- a/parquet/tests/encryption/encryption.rs +++ b/parquet/tests/encryption/encryption.rs @@ -18,7 +18,8 @@ //! 
This module contains tests for reading encrypted Parquet files with the Arrow API use crate::encryption_util::{ - verify_column_indexes, verify_encryption_test_data, TestKeyRetriever, + read_and_roundtrip_to_encrypted_file, verify_column_indexes, verify_encryption_test_file_read, + TestKeyRetriever, }; use arrow::array::*; use arrow::error::Result as ArrowResult; @@ -28,14 +29,13 @@ use parquet::arrow::arrow_reader::{ ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector, }; -use parquet::arrow::arrow_writer::{compute_leaves, ArrowLeafColumn}; use parquet::arrow::ArrowWriter; use parquet::data_type::{ByteArray, ByteArrayType}; use parquet::encryption::decrypt::FileDecryptionProperties; use parquet::encryption::encrypt::FileEncryptionProperties; use parquet::errors::ParquetError; use parquet::file::metadata::ParquetMetaData; -use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; +use parquet::file::properties::WriterProperties; use parquet::file::writer::SerializedFileWriter; use parquet::schema::parser::parse_message_type; use std::fs::File; @@ -378,21 +378,6 @@ fn test_uniform_encryption_with_key_retriever() { verify_encryption_test_file_read(file, decryption_properties); } -fn verify_encryption_test_file_read(file: File, decryption_properties: FileDecryptionProperties) { - let options = - ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); - let reader_metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap(); - let metadata = reader_metadata.metadata(); - - let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap(); - let record_reader = builder.build().unwrap(); - let record_batches = record_reader - .map(|x| x.unwrap()) - .collect::>(); - - verify_encryption_test_data(record_batches, metadata); -} - fn row_group_sizes(metadata: &ParquetMetaData) -> Vec { metadata.row_groups().iter().map(|x| x.num_rows()).collect() } @@ -1064,217 +1049,3 @@ fn test_decrypt_page_index( Ok(()) } - -fn read_encrypted_file( - file: &File, - decryption_properties: FileDecryptionProperties, -) -> Result<(Vec, ArrowReaderMetadata), ParquetError> { - let options = ArrowReaderOptions::default() - .with_file_decryption_properties(decryption_properties.clone()); - let metadata = ArrowReaderMetadata::load(file, options.clone())?; - - let builder = - ParquetRecordBatchReaderBuilder::try_new_with_options(file.try_clone().unwrap(), options)?; - let batch_reader = builder.build()?; - let batches = batch_reader.collect::, _>>()?; - Ok((batches, metadata)) -} - -fn read_and_roundtrip_to_encrypted_file( - file: &File, - decryption_properties: FileDecryptionProperties, - encryption_properties: FileEncryptionProperties, -) { - // read example data - let (batches, metadata) = read_encrypted_file(file, decryption_properties.clone()).unwrap(); - - // write example data to a temporary file - let temp_file = tempfile::tempfile().unwrap(); - let props = WriterProperties::builder() - .with_file_encryption_properties(encryption_properties) - .build(); - - let mut writer = ArrowWriter::try_new( - temp_file.try_clone().unwrap(), - metadata.schema().clone(), - Some(props), - ) - .unwrap(); - for batch in batches { - writer.write(&batch).unwrap(); - } - - writer.close().unwrap(); - - // check re-written example data - verify_encryption_test_file_read(temp_file, decryption_properties); -} - -#[tokio::test] -async fn test_multi_threaded_encrypted_writing() { - // Read example data and set up 
encryption/decryption properties - let testdata = arrow::util::test_util::parquet_test_data(); - let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted"); - let file = File::open(path).unwrap(); - - let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into()) - .with_column_key("double_field", b"1234567890123450".into()) - .with_column_key("float_field", b"1234567890123451".into()) - .build() - .unwrap(); - let decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into()) - .with_column_key("double_field", b"1234567890123450".into()) - .with_column_key("float_field", b"1234567890123451".into()) - .build() - .unwrap(); - - let (record_batches, metadata) = - read_encrypted_file(&file, decryption_properties.clone()).unwrap(); - let to_write: Vec<_> = record_batches - .iter() - .flat_map(|rb| rb.columns().to_vec()) - .collect(); - let schema = metadata.schema().clone(); - - let props = Some( - WriterPropertiesBuilder::default() - .with_file_encryption_properties(file_encryption_properties) - .build(), - ); - - // Create a temporary file to write the encrypted data - let temp_file = tempfile::tempfile().unwrap(); - let mut writer = ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props).unwrap(); - - // LOW-LEVEL API: Use low level API to write into a file using multiple threads - - // Get column writers - let col_writers = writer.get_column_writers().unwrap(); - let num_columns = col_writers.len(); - - // Create a channel for each column writer to send ArrowLeafColumn data to - let mut col_writer_tasks = Vec::with_capacity(num_columns); - let mut col_array_channels = Vec::with_capacity(num_columns); - for mut col_writer in col_writers.into_iter() { - let (send_array, mut receive_array) = tokio::sync::mpsc::channel::(100); - col_array_channels.push(send_array); - let handle = tokio::spawn(async move { - while let Some(col) = receive_array.recv().await { - col_writer.write(&col).unwrap(); - } - col_writer.close().unwrap() - }); - col_writer_tasks.push(handle); - } - - // Send the ArrowLeafColumn data to the respective column writer channels - let mut worker_iter = col_array_channels.iter_mut(); - for (array, field) in to_write.iter().zip(schema.fields()) { - for leaves in compute_leaves(field, array).unwrap() { - worker_iter.next().unwrap().send(leaves).await.unwrap(); - } - } - drop(col_array_channels); - - // Wait for all column writers to finish writing - let mut finalized_rg = Vec::with_capacity(num_columns); - for task in col_writer_tasks.into_iter() { - finalized_rg.push(task.await.unwrap()); - } - - // Append the finalized row group to the SerializedFileWriter - assert!(writer.append_row_group(finalized_rg).is_ok()); - - // HIGH-LEVEL API: Write RecordBatches into the file using ArrowWriter - - // Write individual RecordBatches into the file - for rb in record_batches { - writer.write(&rb).unwrap() - } - assert!(writer.flush().is_ok()); - - // Close the file writer which writes the footer - let metadata = writer.finish().unwrap(); - assert_eq!(metadata.num_rows, 100); - assert_eq!(metadata.schema, metadata.schema); - - // Check that the file was written correctly - let (read_record_batches, read_metadata) = - read_encrypted_file(&temp_file, decryption_properties.clone()).unwrap(); - - let file_metadata = read_metadata.metadata().file_metadata(); - assert_eq!(file_metadata.num_rows(), 100); - assert_eq!(file_metadata.schema_descr().num_columns(), 8); - - 
read_metadata.metadata().row_groups().iter().for_each(|rg| { - assert_eq!(rg.num_columns(), 8); - assert_eq!(rg.num_rows(), 50); - }); - - let mut row_count = 0; - let wrap_at = 50; - for batch in read_record_batches { - let batch = batch; - row_count += batch.num_rows(); - - let bool_col = batch.column(0).as_boolean(); - let time_col = batch - .column(1) - .as_primitive::(); - let list_col = batch.column(2).as_list::(); - let timestamp_col = batch - .column(3) - .as_primitive::(); - let f32_col = batch.column(4).as_primitive::(); - let f64_col = batch.column(5).as_primitive::(); - let binary_col = batch.column(6).as_binary::(); - let fixed_size_binary_col = batch.column(7).as_fixed_size_binary(); - - for (i, x) in bool_col.iter().enumerate() { - assert_eq!(x.unwrap(), i % 2 == 0); - } - for (i, x) in time_col.iter().enumerate() { - assert_eq!(x.unwrap(), (i % wrap_at) as i32); - } - for (i, list_item) in list_col.iter().enumerate() { - let list_item = list_item.unwrap(); - let list_item = list_item.as_primitive::(); - assert_eq!(list_item.len(), 2); - assert_eq!( - list_item.value(0), - (((i % wrap_at) * 2) * 1000000000000) as i64 - ); - assert_eq!( - list_item.value(1), - (((i % wrap_at) * 2 + 1) * 1000000000000) as i64 - ); - } - for x in timestamp_col.iter() { - assert!(x.is_some()); - } - for (i, x) in f32_col.iter().enumerate() { - assert_eq!(x.unwrap(), (i % wrap_at) as f32 * 1.1f32); - } - for (i, x) in f64_col.iter().enumerate() { - assert_eq!(x.unwrap(), (i % wrap_at) as f64 * 1.1111111f64); - } - for (i, x) in binary_col.iter().enumerate() { - assert_eq!(x.is_some(), i % 2 == 0); - if let Some(x) = x { - assert_eq!(&x[0..7], b"parquet"); - } - } - for (i, x) in fixed_size_binary_col.iter().enumerate() { - assert_eq!(x.unwrap(), &[(i % wrap_at) as u8; 10]); - } - } - - assert_eq!(row_count, file_metadata.num_rows() as usize); - - // Check that file was encrypted - let result = ArrowReaderMetadata::load(&temp_file, ArrowReaderOptions::default()); - assert_eq!( - result.unwrap_err().to_string(), - "Parquet error: Parquet file has an encrypted footer but decryption properties were not provided" - ); -} diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs index e0fbbcdfafe3..af107f1e2610 100644 --- a/parquet/tests/encryption/encryption_async.rs +++ b/parquet/tests/encryption/encryption_async.rs @@ -18,17 +18,18 @@ //! 
This module contains tests for reading encrypted Parquet files with the async Arrow API use crate::encryption_util::{ - verify_column_indexes, verify_encryption_test_data, TestKeyRetriever, + read_encrypted_file, verify_column_indexes, verify_encryption_double_test_data, + verify_encryption_test_data, TestKeyRetriever, }; use futures::TryStreamExt; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; -use parquet::arrow::arrow_writer::ArrowWriterOptions; -use parquet::arrow::AsyncArrowWriter; +use parquet::arrow::arrow_writer::{compute_leaves, ArrowLeafColumn, ArrowWriterOptions}; use parquet::arrow::ParquetRecordBatchStreamBuilder; +use parquet::arrow::{ArrowWriter, AsyncArrowWriter}; use parquet::encryption::decrypt::FileDecryptionProperties; use parquet::encryption::encrypt::FileEncryptionProperties; use parquet::errors::ParquetError; -use parquet::file::properties::WriterProperties; +use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; use std::sync::Arc; use tokio::fs::File; @@ -491,3 +492,104 @@ async fn read_and_roundtrip_to_encrypted_file_async( let mut file = tokio::fs::File::from_std(temp_file.try_clone().unwrap()); verify_encryption_test_file_read_async(&mut file, decryption_properties).await } + +#[tokio::test] +async fn test_multi_threaded_encrypted_writing() { + // Read example data and set up encryption/decryption properties + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted"); + let file = std::fs::File::open(path).unwrap(); + + let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into()) + .with_column_key("double_field", b"1234567890123450".into()) + .with_column_key("float_field", b"1234567890123451".into()) + .build() + .unwrap(); + let decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into()) + .with_column_key("double_field", b"1234567890123450".into()) + .with_column_key("float_field", b"1234567890123451".into()) + .build() + .unwrap(); + + let (record_batches, metadata) = + read_encrypted_file(&file, decryption_properties.clone()).unwrap(); + let to_write: Vec<_> = record_batches + .iter() + .flat_map(|rb| rb.columns().to_vec()) + .collect(); + let schema = metadata.schema().clone(); + + let props = Some( + WriterPropertiesBuilder::default() + .with_file_encryption_properties(file_encryption_properties) + .build(), + ); + + // Create a temporary file to write the encrypted data + let temp_file = tempfile::tempfile().unwrap(); + let mut writer = ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props).unwrap(); + + // LOW-LEVEL API: Use low level API to write into a file using multiple threads + + // Get column writers + let col_writers = writer.get_column_writers().unwrap(); + let num_columns = col_writers.len(); + + // Create a channel for each column writer to send ArrowLeafColumn data to + let mut col_writer_tasks = Vec::with_capacity(num_columns); + let mut col_array_channels = Vec::with_capacity(num_columns); + for mut col_writer in col_writers.into_iter() { + let (send_array, mut receive_array) = tokio::sync::mpsc::channel::(100); + col_array_channels.push(send_array); + let handle = tokio::spawn(async move { + while let Some(col) = receive_array.recv().await { + col_writer.write(&col).unwrap(); + } + col_writer.close().unwrap() + }); + col_writer_tasks.push(handle); + } + + // Send the ArrowLeafColumn data to the respective column writer channels + let 
mut worker_iter = col_array_channels.iter_mut();
+    for (array, field) in to_write.iter().zip(schema.fields()) {
+        for leaves in compute_leaves(field, array).unwrap() {
+            worker_iter.next().unwrap().send(leaves).await.unwrap();
+        }
+    }
+    drop(col_array_channels);
+
+    // Wait for all column writers to finish writing
+    let mut finalized_rg = Vec::with_capacity(num_columns);
+    for task in col_writer_tasks.into_iter() {
+        finalized_rg.push(task.await.unwrap());
+    }
+
+    // Append the finalized row group to the SerializedFileWriter
+    assert!(writer.append_row_group(finalized_rg).is_ok());
+
+    // HIGH-LEVEL API: Write RecordBatches into the file using ArrowWriter
+
+    // Write individual RecordBatches into the file
+    for rb in record_batches {
+        writer.write(&rb).unwrap()
+    }
+    assert!(writer.flush().is_ok());
+
+    // Close the file writer which writes the footer
+    let metadata = writer.finish().unwrap();
+    assert_eq!(metadata.num_rows, 100);
+    assert_eq!(metadata.row_groups.len(), 2);
+
+    // Check that the file was written correctly
+    let (read_record_batches, read_metadata) =
+        read_encrypted_file(&temp_file, decryption_properties.clone()).unwrap();
+    verify_encryption_double_test_data(read_record_batches, read_metadata.metadata());
+
+    // Check that file was encrypted
+    let result = ArrowReaderMetadata::load(&temp_file, ArrowReaderOptions::default());
+    assert_eq!(
+        result.unwrap_err().to_string(),
+        "Parquet error: Parquet file has an encrypted footer but decryption properties were not provided"
+    );
+}
diff --git a/parquet/tests/encryption/encryption_util.rs b/parquet/tests/encryption/encryption_util.rs
index 5e962fe0755b..bf7fd08109f6 100644
--- a/parquet/tests/encryption/encryption_util.rs
+++ b/parquet/tests/encryption/encryption_util.rs
@@ -17,14 +17,98 @@
 use arrow_array::cast::AsArray;
 use arrow_array::{types, RecordBatch};
-use parquet::encryption::decrypt::KeyRetriever;
+use parquet::arrow::arrow_reader::{
+    ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
+};
+use parquet::arrow::ArrowWriter;
+use parquet::encryption::decrypt::{FileDecryptionProperties, KeyRetriever};
+use parquet::encryption::encrypt::FileEncryptionProperties;
 use parquet::errors::{ParquetError, Result};
 use parquet::file::metadata::ParquetMetaData;
+use parquet::file::properties::WriterProperties;
 use std::collections::HashMap;
+use std::fs::File;
 use std::sync::Mutex;
 
+pub(crate) fn verify_encryption_double_test_data(
+    record_batches: Vec<RecordBatch>,
+    metadata: &ParquetMetaData,
+) {
+    let file_metadata = metadata.file_metadata();
+    assert_eq!(file_metadata.num_rows(), 100);
+    assert_eq!(file_metadata.schema_descr().num_columns(), 8);
+
+    metadata.row_groups().iter().for_each(|rg| {
+        assert_eq!(rg.num_columns(), 8);
+        assert_eq!(rg.num_rows(), 50);
+    });
+
+    let mut row_count = 0;
+    let wrap_at = 50;
+    for batch in record_batches {
+        // The same 50 rows were written twice, so values repeat every `wrap_at` rows
+        row_count += batch.num_rows();
+
+        let bool_col = batch.column(0).as_boolean();
+        let time_col = batch
+            .column(1)
+            .as_primitive::<types::Time32MillisecondType>();
+        let list_col = batch.column(2).as_list::<i32>();
+        let timestamp_col = batch
+            .column(3)
+            .as_primitive::<types::TimestampNanosecondType>();
+        let f32_col = batch.column(4).as_primitive::<types::Float32Type>();
+        let f64_col = batch.column(5).as_primitive::<types::Float64Type>();
+        let binary_col = batch.column(6).as_binary::<i32>();
+        let fixed_size_binary_col = batch.column(7).as_fixed_size_binary();
+
+        for (i, x) in bool_col.iter().enumerate() {
+            assert_eq!(x.unwrap(), i % 2 == 0);
+        }
+        for (i, x) in time_col.iter().enumerate() {
+            assert_eq!(x.unwrap(), (i % wrap_at) as i32);
+        }
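+        // Each list entry holds two i64 values derived from the row index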
+ for (i, list_item) in list_col.iter().enumerate() { + let list_item = list_item.unwrap(); + let list_item = list_item.as_primitive::(); + assert_eq!(list_item.len(), 2); + assert_eq!( + list_item.value(0), + (((i % wrap_at) * 2) * 1000000000000) as i64 + ); + assert_eq!( + list_item.value(1), + (((i % wrap_at) * 2 + 1) * 1000000000000) as i64 + ); + } + for x in timestamp_col.iter() { + assert!(x.is_some()); + } + for (i, x) in f32_col.iter().enumerate() { + assert_eq!(x.unwrap(), (i % wrap_at) as f32 * 1.1f32); + } + for (i, x) in f64_col.iter().enumerate() { + assert_eq!(x.unwrap(), (i % wrap_at) as f64 * 1.1111111f64); + } + for (i, x) in binary_col.iter().enumerate() { + assert_eq!(x.is_some(), i % 2 == 0); + if let Some(x) = x { + assert_eq!(&x[0..7], b"parquet"); + } + } + for (i, x) in fixed_size_binary_col.iter().enumerate() { + assert_eq!(x.unwrap(), &[(i % wrap_at) as u8; 10]); + } + } + + assert_eq!(row_count, file_metadata.num_rows() as usize); +} + /// Verifies data read from an encrypted file from the parquet-testing repository -pub fn verify_encryption_test_data(record_batches: Vec, metadata: &ParquetMetaData) { +pub(crate) fn verify_encryption_test_data( + record_batches: Vec, + metadata: &ParquetMetaData, +) { let file_metadata = metadata.file_metadata(); assert_eq!(file_metadata.num_rows(), 50); assert_eq!(file_metadata.schema_descr().num_columns(), 8); @@ -90,7 +174,7 @@ pub fn verify_encryption_test_data(record_batches: Vec, metadata: & /// Verifies that the column and offset indexes were successfully read from an /// encrypted test file. -pub fn verify_column_indexes(metadata: &ParquetMetaData) { +pub(crate) fn verify_column_indexes(metadata: &ParquetMetaData) { let offset_index = metadata.offset_index().unwrap(); // 1 row group, 8 columns assert_eq!(offset_index.len(), 1); @@ -120,6 +204,69 @@ pub fn verify_column_indexes(metadata: &ParquetMetaData) { }; } +pub(crate) fn read_encrypted_file( + file: &File, + decryption_properties: FileDecryptionProperties, +) -> std::result::Result<(Vec, ArrowReaderMetadata), ParquetError> { + let options = ArrowReaderOptions::default() + .with_file_decryption_properties(decryption_properties.clone()); + let metadata = ArrowReaderMetadata::load(file, options.clone())?; + + let builder = + ParquetRecordBatchReaderBuilder::try_new_with_options(file.try_clone().unwrap(), options)?; + let batch_reader = builder.build()?; + let batches = batch_reader.collect::, _>>()?; + Ok((batches, metadata)) +} + +pub(crate) fn read_and_roundtrip_to_encrypted_file( + file: &File, + decryption_properties: FileDecryptionProperties, + encryption_properties: FileEncryptionProperties, +) { + // read example data + let (batches, metadata) = read_encrypted_file(file, decryption_properties.clone()).unwrap(); + + // write example data to a temporary file + let temp_file = tempfile::tempfile().unwrap(); + let props = WriterProperties::builder() + .with_file_encryption_properties(encryption_properties) + .build(); + + let mut writer = ArrowWriter::try_new( + temp_file.try_clone().unwrap(), + metadata.schema().clone(), + Some(props), + ) + .unwrap(); + for batch in batches { + writer.write(&batch).unwrap(); + } + + writer.close().unwrap(); + + // check re-written example data + verify_encryption_test_file_read(temp_file, decryption_properties); +} + +pub(crate) fn verify_encryption_test_file_read( + file: File, + decryption_properties: FileDecryptionProperties, +) { + let options = + 
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
+    let reader_metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
+    let metadata = reader_metadata.metadata();
+
+    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
+    let record_reader = builder.build().unwrap();
+    let record_batches = record_reader
+        .map(|x| x.unwrap())
+        .collect::<Vec<RecordBatch>>();
+
+    verify_encryption_test_data(record_batches, metadata);
+}
+
 /// A KeyRetriever to use in Parquet encryption tests,
 /// which stores a map from key names/metadata to encryption key bytes.
 pub struct TestKeyRetriever {