diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 25046273d065..d235f5fcab64 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -236,10 +236,12 @@ impl<W: Write + Send> ArrowWriter<W> {
 
         let max_row_group_size = props.max_row_group_size();
 
+        let props_ptr = Arc::new(props);
         let file_writer =
-            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props))?;
+            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
 
-        let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer);
+        let row_group_writer_factory =
+            ArrowRowGroupWriterFactory::new(&file_writer, schema, arrow_schema.clone(), props_ptr);
 
         Ok(Self {
             writer: file_writer,
@@ -310,12 +312,10 @@ impl<W: Write + Send> ArrowWriter<W> {
 
         let in_progress = match &mut self.in_progress {
             Some(in_progress) => in_progress,
-            x => x.insert(self.row_group_writer_factory.create_row_group_writer(
-                self.writer.schema_descr(),
-                self.writer.properties(),
-                &self.arrow_schema,
-                self.writer.flushed_row_groups().len(),
-            )?),
+            x => x.insert(
+                self.row_group_writer_factory
+                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
+            ),
         };
 
         // If would exceed max_row_group_size, split batch
@@ -402,6 +402,25 @@ impl<W: Write + Send> ArrowWriter<W> {
     pub fn close(mut self) -> Result<crate::format::FileMetaData> {
         self.finish()
     }
+
+    /// Create a new row group writer and return its column writers.
+    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
+        self.flush()?;
+        let in_progress = self
+            .row_group_writer_factory
+            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
+        Ok(in_progress.writers)
+    }
+
+    /// Append the given column chunks to the file as a new row group.
+    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
+        let mut row_group_writer = self.writer.next_row_group()?;
+        for chunk in chunks {
+            chunk.append_to_row_group(&mut row_group_writer)?;
+        }
+        row_group_writer.close()?;
+        Ok(())
+    }
 }
 
 impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
@@ -828,51 +847,59 @@ impl ArrowRowGroupWriter {
 }
 
 struct ArrowRowGroupWriterFactory {
+    schema: SchemaDescriptor,
+    arrow_schema: SchemaRef,
+    props: WriterPropertiesPtr,
     #[cfg(feature = "encryption")]
     file_encryptor: Option<Arc<FileEncryptor>>,
 }
 
 impl ArrowRowGroupWriterFactory {
     #[cfg(feature = "encryption")]
-    fn new<W: Write>(file_writer: &SerializedFileWriter<W>) -> Self {
+    fn new<W: Write>(
+        file_writer: &SerializedFileWriter<W>,
+        schema: SchemaDescriptor,
+        arrow_schema: SchemaRef,
+        props: WriterPropertiesPtr,
+    ) -> Self {
         Self {
+            schema,
+            arrow_schema,
+            props,
             file_encryptor: file_writer.file_encryptor(),
         }
     }
 
     #[cfg(not(feature = "encryption"))]
-    fn new<W: Write>(_file_writer: &SerializedFileWriter<W>) -> Self {
-        Self {}
+    fn new<W: Write>(
+        _file_writer: &SerializedFileWriter<W>,
+        schema: SchemaDescriptor,
+        arrow_schema: SchemaRef,
+        props: WriterPropertiesPtr,
+    ) -> Self {
+        Self {
+            schema,
+            arrow_schema,
+            props,
+        }
     }
 
     #[cfg(feature = "encryption")]
-    fn create_row_group_writer(
-        &self,
-        parquet: &SchemaDescriptor,
-        props: &WriterPropertiesPtr,
-        arrow: &SchemaRef,
-        row_group_index: usize,
-    ) -> Result<ArrowRowGroupWriter> {
+    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
         let writers = get_column_writers_with_encryptor(
-            parquet,
-            props,
-            arrow,
+            &self.schema,
+            &self.props,
+            &self.arrow_schema,
             self.file_encryptor.clone(),
             row_group_index,
         )?;
-        Ok(ArrowRowGroupWriter::new(writers, arrow))
+        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
     }
 
     #[cfg(not(feature = "encryption"))]
-    fn create_row_group_writer(
-        &self,
-        parquet: &SchemaDescriptor,
-        props: &WriterPropertiesPtr,
-        arrow: &SchemaRef,
-        _row_group_index: usize,
-    ) -> Result<ArrowRowGroupWriter> {
-        let writers = get_column_writers(parquet, props, arrow)?;
-        Ok(ArrowRowGroupWriter::new(writers, arrow))
+    fn create_row_group_writer(&self, _row_group_index: usize) -> Result<ArrowRowGroupWriter> {
+        let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?;
+        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
     }
 }
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 26177b69a577..96e3706e27d7 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -190,7 +190,7 @@ impl WriterProperties {
     /// Returns a new default [`WriterPropertiesBuilder`] for creating writer
     /// properties.
     pub fn builder() -> WriterPropertiesBuilder {
-        WriterPropertiesBuilder::with_defaults()
+        WriterPropertiesBuilder::default()
     }
 
     /// Returns data page size limit.
@@ -455,9 +455,9 @@ pub struct WriterPropertiesBuilder {
     file_encryption_properties: Option<FileEncryptionProperties>,
 }
 
-impl WriterPropertiesBuilder {
+impl Default for WriterPropertiesBuilder {
     /// Returns default state of the builder.
-    fn with_defaults() -> Self {
+    fn default() -> Self {
         Self {
             data_page_size_limit: DEFAULT_PAGE_SIZE,
             data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT,
@@ -478,7 +478,9 @@ impl WriterPropertiesBuilder {
             file_encryption_properties: None,
         }
     }
+}
 
+impl WriterPropertiesBuilder {
     /// Finalizes the configuration and returns immutable writer properties struct.
     pub fn build(self) -> WriterProperties {
         WriterProperties {
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 31a3344db66c..690efb36f281 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -486,7 +486,7 @@ fn write_bloom_filters(
 /// more columns are available to write.
 /// - Once done writing a column, close column writer with `close`
 /// - Once all columns have been written, close row group writer with `close`
-/// method. THe close method will return row group metadata and is no-op
+/// method. The close method will return row group metadata and is no-op
 /// on already closed row group.
 pub struct SerializedRowGroupWriter<'a, W: Write> {
     descr: SchemaDescPtr,
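The two methods added to `ArrowWriter` above are meant to be driven together with the existing `compute_leaves`/`ArrowLeafColumn` API, as the new async test later in this patch does. Below is a minimal sketch of that workflow, assuming a tokio runtime and an illustrative single-column schema; the in-memory sink, column name and channel size are placeholders and not part of this change.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use parquet::arrow::arrow_writer::{compute_leaves, ArrowLeafColumn, ArrowWriter};

#[tokio::main]
async fn main() {
    // Illustrative one-column schema and data; any sink implementing Write + Send works.
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from_iter_values(0..100)) as ArrayRef],
    )
    .unwrap();

    let mut writer = ArrowWriter::try_new(Vec::<u8>::new(), schema.clone(), None).unwrap();

    // One ArrowColumnWriter per leaf column; each can be driven from its own task.
    let col_writers = writer.get_column_writers().unwrap();
    let mut senders = Vec::new();
    let mut tasks = Vec::new();
    for mut col_writer in col_writers {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<ArrowLeafColumn>(16);
        senders.push(tx);
        tasks.push(tokio::spawn(async move {
            while let Some(leaf) = rx.recv().await {
                col_writer.write(&leaf).unwrap();
            }
            col_writer.close().unwrap()
        }));
    }

    // Fan the leaves of each column out to the per-column tasks.
    let mut sender_iter = senders.iter();
    for (array, field) in batch.columns().iter().zip(schema.fields()) {
        for leaf in compute_leaves(field, array).unwrap() {
            sender_iter.next().unwrap().send(leaf).await.unwrap();
        }
    }
    drop(senders);

    // Collect the finished column chunks and append them as a single row group.
    let mut chunks = Vec::new();
    for task in tasks {
        chunks.push(task.await.unwrap());
    }
    writer.append_row_group(chunks).unwrap();
    writer.close().unwrap();
}
```

Because the row-group writer factory now carries the Parquet schema, Arrow schema and writer properties itself, the same pattern works unchanged when file encryption properties are set.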
diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs
index 7079e91d1209..96dd8654cd76 100644
--- a/parquet/tests/encryption/encryption.rs
+++ b/parquet/tests/encryption/encryption.rs
@@ -18,7 +18,8 @@
 //! This module contains tests for reading encrypted Parquet files with the Arrow API
 
 use crate::encryption_util::{
-    verify_column_indexes, verify_encryption_test_data, TestKeyRetriever,
+    read_and_roundtrip_to_encrypted_file, verify_column_indexes, verify_encryption_test_file_read,
+    TestKeyRetriever,
 };
 use arrow::array::*;
 use arrow::error::Result as ArrowResult;
@@ -377,21 +378,6 @@ fn test_uniform_encryption_with_key_retriever() {
     verify_encryption_test_file_read(file, decryption_properties);
 }
 
-fn verify_encryption_test_file_read(file: File, decryption_properties: FileDecryptionProperties) {
-    let options =
-        ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
-    let reader_metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
-    let metadata = reader_metadata.metadata();
-
-    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
-    let record_reader = builder.build().unwrap();
-    let record_batches = record_reader
-        .map(|x| x.unwrap())
-        .collect::<Vec<RecordBatch>>();
-
-    verify_encryption_test_data(record_batches, metadata);
-}
-
 fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
     metadata.row_groups().iter().map(|x| x.num_rows()).collect()
 }
@@ -630,6 +616,7 @@ fn uniform_encryption_page_skipping(page_index: bool) -> parquet::errors::Result
 fn test_write_non_uniform_encryption() {
     let testdata = arrow::util::test_util::parquet_test_data();
     let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+    let file = File::open(path).unwrap();
 
     let footer_key = b"0123456789012345".to_vec(); // 128bit/16
     let column_names = vec!["double_field", "float_field"];
@@ -647,13 +634,14 @@
         .build()
         .unwrap();
 
-    read_and_roundtrip_to_encrypted_file(&path, decryption_properties, file_encryption_properties);
+    read_and_roundtrip_to_encrypted_file(&file, decryption_properties, file_encryption_properties);
 }
 
 #[test]
 fn test_write_uniform_encryption_plaintext_footer() {
     let testdata = arrow::util::test_util::parquet_test_data();
     let path = format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted");
+    let file = File::open(path).unwrap();
 
     let footer_key = b"0123456789012345".to_vec(); // 128bit/16
     let wrong_footer_key = b"0000000000000000".to_vec(); // 128bit/16
@@ -679,7 +667,7 @@ fn test_write_uniform_encryption_plaintext_footer() {
 
     // Try writing plaintext footer and then reading it with the correct footer key
     read_and_roundtrip_to_encrypted_file(
-        &path,
+        &file,
         decryption_properties.clone(),
         file_encryption_properties.clone(),
     );
@@ -688,7 +676,6 @@ fn test_write_uniform_encryption_plaintext_footer() {
     let temp_file = tempfile::tempfile().unwrap();
 
     // read example data
-    let file = File::open(path).unwrap();
     let options = ArrowReaderOptions::default()
         .with_file_decryption_properties(decryption_properties.clone());
     let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
@@ -730,6 +717,7 @@ fn test_write_uniform_encryption() {
     let testdata = arrow::util::test_util::parquet_test_data();
     let path = format!("{testdata}/uniform_encryption.parquet.encrypted");
+    let file = File::open(path).unwrap();
 
     let footer_key = b"0123456789012345".to_vec(); // 128bit/16
 
@@ -741,7 +729,7 @@ fn test_write_uniform_encryption() {
         .build()
         .unwrap();
 
-    read_and_roundtrip_to_encrypted_file(&path, decryption_properties, file_encryption_properties);
+    read_and_roundtrip_to_encrypted_file(&file, decryption_properties, file_encryption_properties);
 }
 
 #[test]
@@ -1061,43 +1049,3 @@ fn test_decrypt_page_index(
 
     Ok(())
 }
-
-fn read_and_roundtrip_to_encrypted_file(
-    path: &str,
-    decryption_properties: FileDecryptionProperties,
-    encryption_properties: FileEncryptionProperties,
-) {
-    let temp_file = tempfile::tempfile().unwrap();
-
-    // read example data
-    let file = File::open(path).unwrap();
-    let options = ArrowReaderOptions::default()
-        .with_file_decryption_properties(decryption_properties.clone());
-    let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
-
-    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
-    let batch_reader = builder.build().unwrap();
-    let batches = batch_reader
-        .collect::<Result<Vec<_>, _>>()
-        .unwrap();
-
-    // write example data
-    let props = WriterProperties::builder()
-        .with_file_encryption_properties(encryption_properties)
-        .build();
-
-    let mut writer = ArrowWriter::try_new(
-        temp_file.try_clone().unwrap(),
-        metadata.schema().clone(),
-        Some(props),
-    )
-    .unwrap();
-    for batch in batches {
-        writer.write(&batch).unwrap();
-    }
-
-    writer.close().unwrap();
-
-    // check re-written example data
-    verify_encryption_test_file_read(temp_file, decryption_properties);
-}
diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs
index e0fbbcdfafe3..af107f1e2610 100644
--- a/parquet/tests/encryption/encryption_async.rs
+++ b/parquet/tests/encryption/encryption_async.rs
@@ -18,17 +18,18 @@
 //! This module contains tests for reading encrypted Parquet files with the async Arrow API
 
 use crate::encryption_util::{
-    verify_column_indexes, verify_encryption_test_data, TestKeyRetriever,
+    read_encrypted_file, verify_column_indexes, verify_encryption_double_test_data,
+    verify_encryption_test_data, TestKeyRetriever,
 };
 use futures::TryStreamExt;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
-use parquet::arrow::arrow_writer::ArrowWriterOptions;
-use parquet::arrow::AsyncArrowWriter;
+use parquet::arrow::arrow_writer::{compute_leaves, ArrowLeafColumn, ArrowWriterOptions};
 use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use parquet::arrow::{ArrowWriter, AsyncArrowWriter};
 use parquet::encryption::decrypt::FileDecryptionProperties;
 use parquet::encryption::encrypt::FileEncryptionProperties;
 use parquet::errors::ParquetError;
-use parquet::file::properties::WriterProperties;
+use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
 use std::sync::Arc;
 use tokio::fs::File;
 
@@ -491,3 +492,104 @@ async fn read_and_roundtrip_to_encrypted_file_async(
     let mut file = tokio::fs::File::from_std(temp_file.try_clone().unwrap());
     verify_encryption_test_file_read_async(&mut file, decryption_properties).await
 }
+
+#[tokio::test]
+async fn test_multi_threaded_encrypted_writing() {
+    // Read example data and set up encryption/decryption properties
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+    let file = std::fs::File::open(path).unwrap();
+
+    let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into())
+        .with_column_key("double_field", b"1234567890123450".into())
+        .with_column_key("float_field", b"1234567890123451".into())
+        .build()
+        .unwrap();
+    let decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into())
+        .with_column_key("double_field", b"1234567890123450".into())
+        .with_column_key("float_field", b"1234567890123451".into())
+        .build()
+        .unwrap();
+
+    let (record_batches, metadata) =
+        read_encrypted_file(&file, decryption_properties.clone()).unwrap();
+    let to_write: Vec<_> = record_batches
+        .iter()
+        .flat_map(|rb| rb.columns().to_vec())
+        .collect();
+    let schema = metadata.schema().clone();
+
+    let props = Some(
+        WriterPropertiesBuilder::default()
+            .with_file_encryption_properties(file_encryption_properties)
+            .build(),
+    );
+
+    // Create a temporary file to write the encrypted data
+    let temp_file = tempfile::tempfile().unwrap();
+    let mut writer = ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props).unwrap();
+
+    // LOW-LEVEL API: Use low level API to write into a file using multiple threads
+
+    // Get column writers
+    let col_writers = writer.get_column_writers().unwrap();
+    let num_columns = col_writers.len();
+
+    // Create a channel for each column writer to send ArrowLeafColumn data to
+    let mut col_writer_tasks = Vec::with_capacity(num_columns);
+    let mut col_array_channels = Vec::with_capacity(num_columns);
+    for mut col_writer in col_writers.into_iter() {
+        let (send_array, mut receive_array) = tokio::sync::mpsc::channel::<ArrowLeafColumn>(100);
+        col_array_channels.push(send_array);
+        let handle = tokio::spawn(async move {
+            while let Some(col) = receive_array.recv().await {
+                col_writer.write(&col).unwrap();
+            }
+            col_writer.close().unwrap()
+        });
+        col_writer_tasks.push(handle);
+    }
+
+    // Send the ArrowLeafColumn data to the respective column writer channels
+    let mut worker_iter = col_array_channels.iter_mut();
+    for (array, field) in to_write.iter().zip(schema.fields()) {
+        for leaves in compute_leaves(field, array).unwrap() {
+            worker_iter.next().unwrap().send(leaves).await.unwrap();
+        }
+    }
+    drop(col_array_channels);
+
+    // Wait for all column writers to finish writing
+    let mut finalized_rg = Vec::with_capacity(num_columns);
+    for task in col_writer_tasks.into_iter() {
+        finalized_rg.push(task.await.unwrap());
+    }
+
+    // Append the finalized row group to the SerializedFileWriter
+    assert!(writer.append_row_group(finalized_rg).is_ok());
+
+    // HIGH-LEVEL API: Write RecordBatches into the file using ArrowWriter
+
+    // Write individual RecordBatches into the file
+    for rb in record_batches {
+        writer.write(&rb).unwrap()
+    }
+    assert!(writer.flush().is_ok());
+
+    // Close the file writer which writes the footer
+    let metadata = writer.finish().unwrap();
+    assert_eq!(metadata.num_rows, 100);
+    assert_eq!(metadata.schema, metadata.schema);
+
+    // Check that the file was written correctly
+    let (read_record_batches, read_metadata) =
+        read_encrypted_file(&temp_file, decryption_properties.clone()).unwrap();
+    verify_encryption_double_test_data(read_record_batches, read_metadata.metadata());
+
+    // Check that file was encrypted
+    let result = ArrowReaderMetadata::load(&temp_file, ArrowReaderOptions::default());
+    assert_eq!(
+        result.unwrap_err().to_string(),
+        "Parquet error: Parquet file has an encrypted footer but decryption properties were not provided"
+    );
+}
.with_column_key("float_field", b"1234567890123451".into()) + .build() + .unwrap(); + + let (record_batches, metadata) = + read_encrypted_file(&file, decryption_properties.clone()).unwrap(); + let to_write: Vec<_> = record_batches + .iter() + .flat_map(|rb| rb.columns().to_vec()) + .collect(); + let schema = metadata.schema().clone(); + + let props = Some( + WriterPropertiesBuilder::default() + .with_file_encryption_properties(file_encryption_properties) + .build(), + ); + + // Create a temporary file to write the encrypted data + let temp_file = tempfile::tempfile().unwrap(); + let mut writer = ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props).unwrap(); + + // LOW-LEVEL API: Use low level API to write into a file using multiple threads + + // Get column writers + let col_writers = writer.get_column_writers().unwrap(); + let num_columns = col_writers.len(); + + // Create a channel for each column writer to send ArrowLeafColumn data to + let mut col_writer_tasks = Vec::with_capacity(num_columns); + let mut col_array_channels = Vec::with_capacity(num_columns); + for mut col_writer in col_writers.into_iter() { + let (send_array, mut receive_array) = tokio::sync::mpsc::channel::(100); + col_array_channels.push(send_array); + let handle = tokio::spawn(async move { + while let Some(col) = receive_array.recv().await { + col_writer.write(&col).unwrap(); + } + col_writer.close().unwrap() + }); + col_writer_tasks.push(handle); + } + + // Send the ArrowLeafColumn data to the respective column writer channels + let mut worker_iter = col_array_channels.iter_mut(); + for (array, field) in to_write.iter().zip(schema.fields()) { + for leaves in compute_leaves(field, array).unwrap() { + worker_iter.next().unwrap().send(leaves).await.unwrap(); + } + } + drop(col_array_channels); + + // Wait for all column writers to finish writing + let mut finalized_rg = Vec::with_capacity(num_columns); + for task in col_writer_tasks.into_iter() { + finalized_rg.push(task.await.unwrap()); + } + + // Append the finalized row group to the SerializedFileWriter + assert!(writer.append_row_group(finalized_rg).is_ok()); + + // HIGH-LEVEL API: Write RecordBatches into the file using ArrowWriter + + // Write individual RecordBatches into the file + for rb in record_batches { + writer.write(&rb).unwrap() + } + assert!(writer.flush().is_ok()); + + // Close the file writer which writes the footer + let metadata = writer.finish().unwrap(); + assert_eq!(metadata.num_rows, 100); + assert_eq!(metadata.schema, metadata.schema); + + // Check that the file was written correctly + let (read_record_batches, read_metadata) = + read_encrypted_file(&temp_file, decryption_properties.clone()).unwrap(); + verify_encryption_double_test_data(read_record_batches, read_metadata.metadata()); + + // Check that file was encrypted + let result = ArrowReaderMetadata::load(&temp_file, ArrowReaderOptions::default()); + assert_eq!( + result.unwrap_err().to_string(), + "Parquet error: Parquet file has an encrypted footer but decryption properties were not provided" + ); +} diff --git a/parquet/tests/encryption/encryption_util.rs b/parquet/tests/encryption/encryption_util.rs index 5e962fe0755b..bf7fd08109f6 100644 --- a/parquet/tests/encryption/encryption_util.rs +++ b/parquet/tests/encryption/encryption_util.rs @@ -17,14 +17,98 @@ use arrow_array::cast::AsArray; use arrow_array::{types, RecordBatch}; -use parquet::encryption::decrypt::KeyRetriever; +use parquet::arrow::arrow_reader::{ + ArrowReaderMetadata, ArrowReaderOptions, 
ParquetRecordBatchReaderBuilder, +}; +use parquet::arrow::ArrowWriter; +use parquet::encryption::decrypt::{FileDecryptionProperties, KeyRetriever}; +use parquet::encryption::encrypt::FileEncryptionProperties; use parquet::errors::{ParquetError, Result}; use parquet::file::metadata::ParquetMetaData; +use parquet::file::properties::WriterProperties; use std::collections::HashMap; +use std::fs::File; use std::sync::Mutex; +pub(crate) fn verify_encryption_double_test_data( + record_batches: Vec, + metadata: &ParquetMetaData, +) { + let file_metadata = metadata.file_metadata(); + assert_eq!(file_metadata.num_rows(), 100); + assert_eq!(file_metadata.schema_descr().num_columns(), 8); + + metadata.row_groups().iter().for_each(|rg| { + assert_eq!(rg.num_columns(), 8); + assert_eq!(rg.num_rows(), 50); + }); + + let mut row_count = 0; + let wrap_at = 50; + for batch in record_batches { + let batch = batch; + row_count += batch.num_rows(); + + let bool_col = batch.column(0).as_boolean(); + let time_col = batch + .column(1) + .as_primitive::(); + let list_col = batch.column(2).as_list::(); + let timestamp_col = batch + .column(3) + .as_primitive::(); + let f32_col = batch.column(4).as_primitive::(); + let f64_col = batch.column(5).as_primitive::(); + let binary_col = batch.column(6).as_binary::(); + let fixed_size_binary_col = batch.column(7).as_fixed_size_binary(); + + for (i, x) in bool_col.iter().enumerate() { + assert_eq!(x.unwrap(), i % 2 == 0); + } + for (i, x) in time_col.iter().enumerate() { + assert_eq!(x.unwrap(), (i % wrap_at) as i32); + } + for (i, list_item) in list_col.iter().enumerate() { + let list_item = list_item.unwrap(); + let list_item = list_item.as_primitive::(); + assert_eq!(list_item.len(), 2); + assert_eq!( + list_item.value(0), + (((i % wrap_at) * 2) * 1000000000000) as i64 + ); + assert_eq!( + list_item.value(1), + (((i % wrap_at) * 2 + 1) * 1000000000000) as i64 + ); + } + for x in timestamp_col.iter() { + assert!(x.is_some()); + } + for (i, x) in f32_col.iter().enumerate() { + assert_eq!(x.unwrap(), (i % wrap_at) as f32 * 1.1f32); + } + for (i, x) in f64_col.iter().enumerate() { + assert_eq!(x.unwrap(), (i % wrap_at) as f64 * 1.1111111f64); + } + for (i, x) in binary_col.iter().enumerate() { + assert_eq!(x.is_some(), i % 2 == 0); + if let Some(x) = x { + assert_eq!(&x[0..7], b"parquet"); + } + } + for (i, x) in fixed_size_binary_col.iter().enumerate() { + assert_eq!(x.unwrap(), &[(i % wrap_at) as u8; 10]); + } + } + + assert_eq!(row_count, file_metadata.num_rows() as usize); +} + /// Verifies data read from an encrypted file from the parquet-testing repository -pub fn verify_encryption_test_data(record_batches: Vec, metadata: &ParquetMetaData) { +pub(crate) fn verify_encryption_test_data( + record_batches: Vec, + metadata: &ParquetMetaData, +) { let file_metadata = metadata.file_metadata(); assert_eq!(file_metadata.num_rows(), 50); assert_eq!(file_metadata.schema_descr().num_columns(), 8); @@ -90,7 +174,7 @@ pub fn verify_encryption_test_data(record_batches: Vec, metadata: & /// Verifies that the column and offset indexes were successfully read from an /// encrypted test file. 
-pub fn verify_column_indexes(metadata: &ParquetMetaData) { +pub(crate) fn verify_column_indexes(metadata: &ParquetMetaData) { let offset_index = metadata.offset_index().unwrap(); // 1 row group, 8 columns assert_eq!(offset_index.len(), 1); @@ -120,6 +204,69 @@ pub fn verify_column_indexes(metadata: &ParquetMetaData) { }; } +pub(crate) fn read_encrypted_file( + file: &File, + decryption_properties: FileDecryptionProperties, +) -> std::result::Result<(Vec, ArrowReaderMetadata), ParquetError> { + let options = ArrowReaderOptions::default() + .with_file_decryption_properties(decryption_properties.clone()); + let metadata = ArrowReaderMetadata::load(file, options.clone())?; + + let builder = + ParquetRecordBatchReaderBuilder::try_new_with_options(file.try_clone().unwrap(), options)?; + let batch_reader = builder.build()?; + let batches = batch_reader.collect::, _>>()?; + Ok((batches, metadata)) +} + +pub(crate) fn read_and_roundtrip_to_encrypted_file( + file: &File, + decryption_properties: FileDecryptionProperties, + encryption_properties: FileEncryptionProperties, +) { + // read example data + let (batches, metadata) = read_encrypted_file(file, decryption_properties.clone()).unwrap(); + + // write example data to a temporary file + let temp_file = tempfile::tempfile().unwrap(); + let props = WriterProperties::builder() + .with_file_encryption_properties(encryption_properties) + .build(); + + let mut writer = ArrowWriter::try_new( + temp_file.try_clone().unwrap(), + metadata.schema().clone(), + Some(props), + ) + .unwrap(); + for batch in batches { + writer.write(&batch).unwrap(); + } + + writer.close().unwrap(); + + // check re-written example data + verify_encryption_test_file_read(temp_file, decryption_properties); +} + +pub(crate) fn verify_encryption_test_file_read( + file: File, + decryption_properties: FileDecryptionProperties, +) { + let options = + ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); + let reader_metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap(); + let metadata = reader_metadata.metadata(); + + let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap(); + let record_reader = builder.build().unwrap(); + let record_batches = record_reader + .map(|x| x.unwrap()) + .collect::>(); + + verify_encryption_test_data(record_batches, metadata); +} + /// A KeyRetriever to use in Parquet encryption tests, /// which stores a map from key names/metadata to encryption key bytes. pub struct TestKeyRetriever {
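The helpers consolidated into `encryption_util.rs` above are now shared by both the sync and async test modules. A small sketch of how they compose, written as a hypothetical test alongside the existing ones (the file and footer key are the same ones `test_write_uniform_encryption` uses, and the `use crate::encryption_util::...` imports are assumed to be in place):

```rust
// Hypothetical test sketch; read_encrypted_file and verify_encryption_test_data
// are the helpers defined in encryption_util.rs above.
#[test]
fn test_read_uniform_encryption_via_helpers() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/uniform_encryption.parquet.encrypted");
    let file = std::fs::File::open(path).unwrap();

    let decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into())
        .build()
        .unwrap();

    // Decode all record batches along with the reader metadata, then verify the
    // well-known contents of the parquet-testing file.
    let (batches, metadata) = read_encrypted_file(&file, decryption_properties).unwrap();
    verify_encryption_test_data(batches, metadata.metadata());
}
```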