From 854f5918c24ffc948594fbaf2c095dd649d2aed6 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Tue, 12 Aug 2025 17:42:13 +0200 Subject: [PATCH 01/16] initial commit --- parquet/src/arrow/arrow_writer/mod.rs | 12 +++++++----- parquet/tests/encryption/encryption_async.rs | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 864c1bf2da45..3b7c5b6ff7d1 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -188,6 +188,7 @@ pub struct ArrowWriter { /// The length of arrays to write to each row group max_row_group_size: usize, + all_progress: std::collections::BTreeMap, } impl std::fmt::Debug for ArrowWriter { @@ -409,12 +410,13 @@ impl ArrowWriter { } /// Create a new row group writer and return its column writers. - pub fn get_column_writers(&mut self) -> Result> { + pub fn get_column_writers(&mut self) -> Result<(usize, Vec)> { self.flush()?; - let in_progress = self - .row_group_writer_factory - .create_row_group_writer(self.writer.flushed_row_groups().len())?; - Ok(in_progress.writers) + let row_group_factory = &self.row_group_writer_factory; + let row_group_index = self.writer.flushed_row_groups().len(); + let in_progress = row_group_factory.create_row_group_writer(row_group_index)?; + self.all_progress.insert(row_group_index, in_progress); + Ok((row_group_index, self.all_progress.get(&row_group_index).unwrap().writers)) } /// Append the given column chunks to the file as a new row group. diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs index af107f1e2610..26d85954e2b4 100644 --- a/parquet/tests/encryption/encryption_async.rs +++ b/parquet/tests/encryption/encryption_async.rs @@ -532,7 +532,7 @@ async fn test_multi_threaded_encrypted_writing() { // LOW-LEVEL API: Use low level API to write into a file using multiple threads // Get column writers - let col_writers = writer.get_column_writers().unwrap(); + let (_row_group_index, col_writers) = writer.get_column_writers().unwrap(); let num_columns = col_writers.len(); // Create a channel for each column writer to send ArrowLeafColumn data to From b3d9f6161c852e94a28e17c5209e690dc3d0f0f3 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 14 Aug 2025 20:56:04 +0200 Subject: [PATCH 02/16] work --- parquet/src/arrow/arrow_writer/mod.rs | 31 ++- parquet/tests/encryption/encryption_async.rs | 209 +++++++++++++++++-- 2 files changed, 214 insertions(+), 26 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 3b7c5b6ff7d1..f4f404153aab 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -188,7 +188,6 @@ pub struct ArrowWriter { /// The length of arrays to write to each row group max_row_group_size: usize, - all_progress: std::collections::BTreeMap, } impl std::fmt::Debug for ArrowWriter { @@ -410,18 +409,25 @@ impl ArrowWriter { } /// Create a new row group writer and return its column writers. 
- pub fn get_column_writers(&mut self) -> Result<(usize, Vec)> { + pub fn get_column_writers(&mut self) -> Result<(usize, Vec, SerializedRowGroupWriter)> { self.flush()?; let row_group_factory = &self.row_group_writer_factory; let row_group_index = self.writer.flushed_row_groups().len(); let in_progress = row_group_factory.create_row_group_writer(row_group_index)?; - self.all_progress.insert(row_group_index, in_progress); - Ok((row_group_index, self.all_progress.get(&row_group_index).unwrap().writers)) + let serialized_row_group_writer = self.writer.next_row_group()?; + Ok((row_group_index, in_progress.writers, serialized_row_group_writer)) + } + + /// Returns the ArrowRowGroupWriterFactory used bt this ArrowWriter. + pub fn get_row_group_writer_factory(self) -> ArrowRowGroupWriterFactory { + self.row_group_writer_factory } /// Append the given column chunks to the file as a new row group. - pub fn append_row_group(&mut self, chunks: Vec) -> Result<()> { - let mut row_group_writer = self.writer.next_row_group()?; + pub fn append_row_group( + chunks: Vec, + mut row_group_writer: SerializedRowGroupWriter, + ) -> Result<()> { for chunk in chunks { chunk.append_to_row_group(&mut row_group_writer)?; } @@ -853,7 +859,9 @@ impl ArrowRowGroupWriter { } } -struct ArrowRowGroupWriterFactory { +/// ArrowRowGroupWriterFactory can be used for creating new [`ArrowRowGroupWriter`] instances +/// for each row group in the Parquet file. +pub struct ArrowRowGroupWriterFactory { schema: SchemaDescriptor, arrow_schema: SchemaRef, props: WriterPropertiesPtr, @@ -892,7 +900,7 @@ impl ArrowRowGroupWriterFactory { } #[cfg(feature = "encryption")] - fn create_row_group_writer(&self, row_group_index: usize) -> Result { + pub fn create_row_group_writer(&self, row_group_index: usize) -> Result { let writers = get_column_writers_with_encryptor( &self.schema, &self.props, @@ -908,6 +916,13 @@ impl ArrowRowGroupWriterFactory { let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?; Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema)) } + + /// Create a new row group writer and return its column writers. 
+ pub fn get_column_writers(&mut self, row_group_index: usize) -> Result> { + // let row_group_index = self.writer.flushed_row_groups().len(); + let in_progress = self.create_row_group_writer(row_group_index)?; + Ok(in_progress.writers) + } } /// Returns the [`ArrowColumnWriter`] for a given schema diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs index 26d85954e2b4..6c10c70ed34f 100644 --- a/parquet/tests/encryption/encryption_async.rs +++ b/parquet/tests/encryption/encryption_async.rs @@ -23,7 +23,7 @@ use crate::encryption_util::{ }; use futures::TryStreamExt; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; -use parquet::arrow::arrow_writer::{compute_leaves, ArrowLeafColumn, ArrowWriterOptions}; +use parquet::arrow::arrow_writer::{compute_leaves, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, ArrowWriterOptions}; use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet::arrow::{ArrowWriter, AsyncArrowWriter}; use parquet::encryption::decrypt::FileDecryptionProperties; @@ -32,6 +32,10 @@ use parquet::errors::ParquetError; use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; use std::sync::Arc; use tokio::fs::File; +use tokio::sync::mpsc::{Sender, Receiver}; +use tokio::task::JoinHandle; +use arrow_array::RecordBatch; +use arrow_schema::Schema; #[tokio::test] async fn test_non_uniform_encryption_plaintext_footer() { @@ -493,8 +497,141 @@ async fn read_and_roundtrip_to_encrypted_file_async( verify_encryption_test_file_read_async(&mut file, decryption_properties).await } +async fn send_arrays_to_column_writers( + col_array_channels: &Vec>, + rb: &RecordBatch, + schema: Arc, +) -> Result<(), ParquetError> { + // Each leaf column has its own channel, increment next_channel for each leaf column sent. + let mut next_channel = 0; + for (array, field) in rb.columns().iter().zip(schema.fields()) { + for c in compute_leaves(field, array)? { + if col_array_channels[next_channel].send(c).await.is_err() { + return Ok(()); + } + next_channel += 1; + } + } + Ok(()) +} + +fn spawn_parquet_parallel_serialization_task( + arrow_writer: ArrowWriter, + mut data: Receiver, + schema: Arc, +) -> Result>, ParquetError> { + let handle = tokio::spawn(async move { + let max_buffer_rb = 10; + let max_row_group_rows = 10; + let mut row_group_index = 0; + let mut rgwf = arrow_writer.get_row_group_writer_factory(); + let column_writers = rgwf.get_column_writers(row_group_index).unwrap(); + + let (mut col_writer_tasks, mut col_array_channels) = + spawn_column_parallel_row_group_writer(column_writers, max_buffer_rb)?; + + let mut current_rg_rows = 0; + + while let Some(mut rb) = data.recv().await { + // This loop allows the "else" block to repeatedly split the RecordBatch to handle the case + // when max_row_group_rows < execution.batch_size as an alternative to a recursive async + // function. 
+ loop { + if current_rg_rows + rb.num_rows() < max_row_group_rows { + send_arrays_to_column_writers( + &col_array_channels, + &rb, + schema.clone(), + ).await?; + current_rg_rows += rb.num_rows(); + break; + } else { + let rows_left = max_row_group_rows - current_rg_rows; + let rb_split = rb.slice(0, rows_left); + send_arrays_to_column_writers( + &col_array_channels, + &rb_split, + schema.clone(), + ).await?; + + // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup + // on a separate task, so that we can immediately start on the next RG before waiting + // for the current one to finish. + drop(col_array_channels); + + // TODO + // let finalize_rg_task = spawn_rg_join_and_finalize_task( + // column_writer_handles, + // max_row_group_rows, + // &pool, + // ); + // + // // Do not surface error from closed channel (means something + // // else hit an error, and the plan is shutting down). + // if serialize_tx.send(finalize_rg_task).await.is_err() { + // return Ok(()); + // } + + current_rg_rows = 0; + rb = rb.slice(rows_left, rb.num_rows() - rows_left); + + row_group_index += 1; + let column_writers = rgwf.get_column_writers(row_group_index).unwrap(); + (col_writer_tasks, col_array_channels) = + spawn_column_parallel_row_group_writer(column_writers, 100).unwrap(); + } + } + } + + drop(col_array_channels); + // Handle leftover rows as final rowgroup, which may be smaller than max_row_group_rows + if current_rg_rows > 0 { + // let finalize_rg_task = spawn_rg_join_and_finalize_task( + // column_writer_handles, + // current_rg_rows, + // &pool, + // ); + // + // // Do not surface error from closed channel (means something + // // else hit an error, and the plan is shutting down). + // if serialize_tx.send(finalize_rg_task).await.is_err() { + // return Ok(()); + // } + } + + Ok(()) + }); + + Ok(handle) +} + +fn spawn_column_parallel_row_group_writer( + col_writers: Vec, + max_buffer_size: usize, +) -> Result<(Vec>, Vec>), ParquetError> { + let num_columns = col_writers.len(); + + let mut col_writer_tasks = Vec::with_capacity(num_columns); + let mut col_array_channels = Vec::with_capacity(num_columns); + for mut col_writer in col_writers.into_iter() { + let (send_array, mut receive_array) = tokio::sync::mpsc::channel::(max_buffer_size); + col_array_channels.push(send_array); + let handle = tokio::spawn(async move { + while let Some(col) = receive_array.recv().await { + col_writer.write(&col).unwrap(); + } + col_writer.close().unwrap() + }); + col_writer_tasks.push(handle); + } + + Ok((col_writer_tasks, col_array_channels)) +} + +// This test is based on DataFusion's ParquetSink. Motivation is to test +// concurrent writing of encrypted data over multiple row groups using the low-level API. 
#[tokio::test] -async fn test_multi_threaded_encrypted_writing() { +async fn test_concurrent_encrypted_writing_over_multiple_row_groups() { // Read example data and set up encryption/decryption properties let testdata = arrow::util::test_util::parquet_test_data(); let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted"); @@ -528,27 +665,62 @@ async fn test_multi_threaded_encrypted_writing() { // Create a temporary file to write the encrypted data let temp_file = tempfile::tempfile().unwrap(); let mut writer = ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props).unwrap(); + // TODO: use spawn_parquet_parallel_serialization_task + // spawn_parquet_parallel_serialization_task( + + // TODO: use concatenate_parallel_row_groups // LOW-LEVEL API: Use low level API to write into a file using multiple threads // Get column writers - let (_row_group_index, col_writers) = writer.get_column_writers().unwrap(); + let (_row_group_index, col_writers, srgw) = writer.get_column_writers().unwrap(); let num_columns = col_writers.len(); +} - // Create a channel for each column writer to send ArrowLeafColumn data to - let mut col_writer_tasks = Vec::with_capacity(num_columns); - let mut col_array_channels = Vec::with_capacity(num_columns); - for mut col_writer in col_writers.into_iter() { - let (send_array, mut receive_array) = tokio::sync::mpsc::channel::(100); - col_array_channels.push(send_array); - let handle = tokio::spawn(async move { - while let Some(col) = receive_array.recv().await { - col_writer.write(&col).unwrap(); - } - col_writer.close().unwrap() - }); - col_writer_tasks.push(handle); - } +#[tokio::test] +async fn test_multi_threaded_encrypted_writing() { + // Read example data and set up encryption/decryption properties + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted"); + let file = std::fs::File::open(path).unwrap(); + + let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into()) + .with_column_key("double_field", b"1234567890123450".into()) + .with_column_key("float_field", b"1234567890123451".into()) + .build() + .unwrap(); + let decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into()) + .with_column_key("double_field", b"1234567890123450".into()) + .with_column_key("float_field", b"1234567890123451".into()) + .build() + .unwrap(); + + let (record_batches, metadata) = + read_encrypted_file(&file, decryption_properties.clone()).unwrap(); + let to_write: Vec<_> = record_batches + .iter() + .flat_map(|rb| rb.columns().to_vec()) + .collect(); + let schema = metadata.schema().clone(); + + let props = Some( + WriterPropertiesBuilder::default() + .with_file_encryption_properties(file_encryption_properties) + .build(), + ); + + // Create a temporary file to write the encrypted data + let temp_file = tempfile::tempfile().unwrap(); + let mut writer = ArrowWriter::try_new(&temp_file, schema.clone(), props).unwrap(); + + // LOW-LEVEL API: Use low level API to write into a file using multiple threads + + // Get column writers + let (_row_group_index, col_writers, srgw) = writer.get_column_writers().unwrap(); + let num_columns = col_writers.len(); + + let (col_writer_tasks, mut col_array_channels) = + spawn_column_parallel_row_group_writer(col_writers, 100).unwrap(); // Send the ArrowLeafColumn data to the respective column writer channels let mut worker_iter = col_array_channels.iter_mut(); @@ -566,7 +738,8 @@ async 
fn test_multi_threaded_encrypted_writing() { } // Append the finalized row group to the SerializedFileWriter - assert!(writer.append_row_group(finalized_rg).is_ok()); + // assert!(writer.append_row_group(finalized_rg).is_ok()); + assert!(ArrowWriter::append_row_group(finalized_rg, srgw).is_ok()); // HIGH-LEVEL API: Write RecordBatches into the file using ArrowWriter From ef8d66a097b25371f78f9a90591cd913e3629526 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Fri, 15 Aug 2025 01:39:37 +0200 Subject: [PATCH 03/16] work --- parquet/src/arrow/arrow_writer/mod.rs | 10 +- parquet/tests/encryption/encryption_async.rs | 140 ++++++++++++++----- 2 files changed, 113 insertions(+), 37 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index f4f404153aab..89c07f66e6f1 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -418,7 +418,7 @@ impl ArrowWriter { Ok((row_group_index, in_progress.writers, serialized_row_group_writer)) } - /// Returns the ArrowRowGroupWriterFactory used bt this ArrowWriter. + /// Returns the ArrowRowGroupWriterFactory used by this ArrowWriter. pub fn get_row_group_writer_factory(self) -> ArrowRowGroupWriterFactory { self.row_group_writer_factory } @@ -434,6 +434,14 @@ impl ArrowWriter { row_group_writer.close()?; Ok(()) } + // pub fn append_row_group(&mut self, chunks: Vec) -> Result<()> { + // let mut row_group_writer = self.writer.next_row_group()?; + // for chunk in chunks { + // chunk.append_to_row_group(&mut row_group_writer)?; + // } + // row_group_writer.close()?; + // Ok(()) + // } } impl RecordBatchWriter for ArrowWriter { diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs index 6c10c70ed34f..9c4703808be1 100644 --- a/parquet/tests/encryption/encryption_async.rs +++ b/parquet/tests/encryption/encryption_async.rs @@ -17,13 +17,14 @@ //! 
This module contains tests for reading encrypted Parquet files with the async Arrow API +use parquet::format::FileMetaData; use crate::encryption_util::{ read_encrypted_file, verify_column_indexes, verify_encryption_double_test_data, verify_encryption_test_data, TestKeyRetriever, }; use futures::TryStreamExt; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; -use parquet::arrow::arrow_writer::{compute_leaves, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, ArrowWriterOptions}; +use parquet::arrow::arrow_writer::{compute_leaves, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, ArrowRowGroupWriterFactory, ArrowWriterOptions}; use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet::arrow::{ArrowWriter, AsyncArrowWriter}; use parquet::encryption::decrypt::FileDecryptionProperties; @@ -36,6 +37,7 @@ use tokio::sync::mpsc::{Sender, Receiver}; use tokio::task::JoinHandle; use arrow_array::RecordBatch; use arrow_schema::Schema; +use parquet::file::writer::SerializedRowGroupWriter; #[tokio::test] async fn test_non_uniform_encryption_plaintext_footer() { @@ -497,8 +499,12 @@ async fn read_and_roundtrip_to_encrypted_file_async( verify_encryption_test_file_read_async(&mut file, decryption_properties).await } +type ColSender = Sender; +type ColumnWriterTask = JoinHandle>; +type RBStreamSerializeResult = Result<(Vec, usize), ParquetError>; + async fn send_arrays_to_column_writers( - col_array_channels: &Vec>, + col_array_channels: &Vec, rb: &RecordBatch, schema: Arc, ) -> Result<(), ParquetError> { @@ -515,18 +521,39 @@ async fn send_arrays_to_column_writers( Ok(()) } +/// Spawns a tokio task which joins the parallel column writer tasks, +/// and finalizes the row group +fn spawn_rg_join_and_finalize_task( + column_writer_tasks: Vec, + rg_rows: usize, +) -> JoinHandle { + tokio::task::spawn(async move { + let num_cols = column_writer_tasks.len(); + let mut finalized_rg = Vec::with_capacity(num_cols); + for task in column_writer_tasks.into_iter() { + let writer = task + .await + .map_err(|e| ParquetError::General(e.to_string()))??; + finalized_rg.push(writer.close()?); + } + Ok((finalized_rg, rg_rows)) + }) +} + fn spawn_parquet_parallel_serialization_task( - arrow_writer: ArrowWriter, + mut rgwf: ArrowRowGroupWriterFactory, mut data: Receiver, + serialize_tx: Sender>, schema: Arc, ) -> Result>, ParquetError> { let handle = tokio::spawn(async move { let max_buffer_rb = 10; let max_row_group_rows = 10; let mut row_group_index = 0; - let mut rgwf = arrow_writer.get_row_group_writer_factory(); + let column_writers = rgwf.get_column_writers(row_group_index).unwrap(); + // type ColumnWriterTask = SpawnedTask>; let (mut col_writer_tasks, mut col_array_channels) = spawn_column_parallel_row_group_writer(column_writers, max_buffer_rb)?; @@ -560,17 +587,16 @@ fn spawn_parquet_parallel_serialization_task( drop(col_array_channels); // TODO - // let finalize_rg_task = spawn_rg_join_and_finalize_task( - // column_writer_handles, - // max_row_group_rows, - // &pool, - // ); - // - // // Do not surface error from closed channel (means something - // // else hit an error, and the plan is shutting down). - // if serialize_tx.send(finalize_rg_task).await.is_err() { - // return Ok(()); - // } + let finalize_rg_task = spawn_rg_join_and_finalize_task( + col_writer_tasks, + max_row_group_rows, + ); + + // Do not surface error from closed channel (means something + // else hit an error, and the plan is shutting down). 
+ if serialize_tx.send(finalize_rg_task).await.is_err() { + return Ok(()); + } current_rg_rows = 0; rb = rb.slice(rows_left, rb.num_rows() - rows_left); @@ -586,17 +612,16 @@ fn spawn_parquet_parallel_serialization_task( drop(col_array_channels); // Handle leftover rows as final rowgroup, which may be smaller than max_row_group_rows if current_rg_rows > 0 { - // let finalize_rg_task = spawn_rg_join_and_finalize_task( - // column_writer_handles, - // current_rg_rows, - // &pool, - // ); - // - // // Do not surface error from closed channel (means something - // // else hit an error, and the plan is shutting down). - // if serialize_tx.send(finalize_rg_task).await.is_err() { - // return Ok(()); - // } + let finalize_rg_task = spawn_rg_join_and_finalize_task( + col_writer_tasks, + current_rg_rows, + ); + + // Do not surface error from closed channel (means something + // else hit an error, and the plan is shutting down). + if serialize_tx.send(finalize_rg_task).await.is_err() { + return Ok(()); + } } Ok(()) @@ -608,7 +633,7 @@ fn spawn_parquet_parallel_serialization_task( fn spawn_column_parallel_row_group_writer( col_writers: Vec, max_buffer_size: usize, -) -> Result<(Vec>, Vec>), ParquetError> { +) -> Result<(Vec, Vec), ParquetError> { let num_columns = col_writers.len(); let mut col_writer_tasks = Vec::with_capacity(num_columns); @@ -618,16 +643,40 @@ fn spawn_column_parallel_row_group_writer( col_array_channels.push(send_array); let handle = tokio::spawn(async move { while let Some(col) = receive_array.recv().await { - col_writer.write(&col).unwrap(); + col_writer.write(&col)?; } - col_writer.close().unwrap() + Ok(col_writer) }); col_writer_tasks.push(handle); } - Ok((col_writer_tasks, col_array_channels)) } +/// Consume RowGroups serialized by other parallel tasks and concatenate them in +/// to the final parquet file, while flushing finalized bytes to an [ObjectStore] +async fn concatenate_parallel_row_groups( + // mut arrow_row_group_writer_factory: ArrowRowGroupWriterFactory, + mut arrow_writer: ArrowWriter<&std::fs::File>, + mut serialize_rx: Receiver>, + mut file: &std::fs::File, +) -> Result { + while let Some(task) = serialize_rx.recv().await { + let result = task.await; + let (serialized_columns, _cnt) = + result.map_err(|e| ParquetError::General(e.to_string()))??; + + let mut finalized_rg = Vec::with_capacity(serialized_columns.len()); + for task in serialized_columns { + finalized_rg.push(task); + // file.write_all_buf(task.data.as_slice()).await?; + } + // arrow_row_group_writer_factory.append_row_group(finalized_rg)?; + } + + let file_metadata = arrow_writer.finish()?; + Ok(file_metadata) +} + // This test is based on DataFusion's ParquetSink. Motivation is to test // concurrent writing of encrypted data over multiple row groups using the low-level API. 
#[tokio::test] @@ -664,17 +713,36 @@ async fn test_concurrent_encrypted_writing_over_multiple_row_groups() { // Create a temporary file to write the encrypted data let temp_file = tempfile::tempfile().unwrap(); - let mut writer = ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props).unwrap(); - // TODO: use spawn_parquet_parallel_serialization_task - // spawn_parquet_parallel_serialization_task( + let mut arrow_writer = ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props).unwrap(); + + let row_group_writer_factory = arrow_writer.get_row_group_writer_factory(); + + let max_rowgroups = 1; + let (serialize_tx, serialize_rx) = + tokio::sync::mpsc::channel::>(max_rowgroups); + + let (record_batch_tx, record_batch_rx) = tokio::sync::mpsc::channel::(100); + + let launch_serialization_task = spawn_parquet_parallel_serialization_task( + row_group_writer_factory, record_batch_rx, serialize_tx, schema.clone()).unwrap(); + + let _ = launch_serialization_task + .await + .map_err(|e| ParquetError::General(e.to_string())); // TODO: use concatenate_parallel_row_groups + // let file_metadata = concatenate_parallel_row_groups( + // arrow_writer, + // serialize_rx, + // &temp_file, + // ) + // .await; // LOW-LEVEL API: Use low level API to write into a file using multiple threads // Get column writers - let (_row_group_index, col_writers, srgw) = writer.get_column_writers().unwrap(); - let num_columns = col_writers.len(); + // let (_row_group_index, col_writers, srgw) = arrow_writer.get_column_writers().unwrap(); + // let num_columns = col_writers.len(); } #[tokio::test] @@ -734,7 +802,7 @@ async fn test_multi_threaded_encrypted_writing() { // Wait for all column writers to finish writing let mut finalized_rg = Vec::with_capacity(num_columns); for task in col_writer_tasks.into_iter() { - finalized_rg.push(task.await.unwrap()); + finalized_rg.push(task.await.unwrap().unwrap().close().unwrap()); } // Append the finalized row group to the SerializedFileWriter From 4523b484371af58db7b1ce66368fdb1e8270a37b Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Fri, 15 Aug 2025 14:02:17 +0200 Subject: [PATCH 04/16] work --- parquet/src/arrow/arrow_writer/mod.rs | 2 +- parquet/tests/encryption/encryption_async.rs | 81 ++++++++++++++++---- 2 files changed, 68 insertions(+), 15 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 89c07f66e6f1..19959341c281 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -173,7 +173,7 @@ mod levels; /// ``` pub struct ArrowWriter { /// Underlying Parquet writer - writer: SerializedFileWriter, + pub writer: SerializedFileWriter, /// The in-progress row group if any in_progress: Option, diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs index 9c4703808be1..32fd3279281a 100644 --- a/parquet/tests/encryption/encryption_async.rs +++ b/parquet/tests/encryption/encryption_async.rs @@ -17,6 +17,7 @@ //! 
This module contains tests for reading encrypted Parquet files with the async Arrow API +use std::io::Write; use parquet::format::FileMetaData; use crate::encryption_util::{ read_encrypted_file, verify_column_indexes, verify_encryption_double_test_data, @@ -25,7 +26,7 @@ use crate::encryption_util::{ use futures::TryStreamExt; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::arrow_writer::{compute_leaves, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, ArrowRowGroupWriterFactory, ArrowWriterOptions}; -use parquet::arrow::ParquetRecordBatchStreamBuilder; +use parquet::arrow::{ArrowSchemaConverter, ParquetRecordBatchStreamBuilder}; use parquet::arrow::{ArrowWriter, AsyncArrowWriter}; use parquet::encryption::decrypt::FileDecryptionProperties; use parquet::encryption::encrypt::FileEncryptionProperties; @@ -37,7 +38,7 @@ use tokio::sync::mpsc::{Sender, Receiver}; use tokio::task::JoinHandle; use arrow_array::RecordBatch; use arrow_schema::Schema; -use parquet::file::writer::SerializedRowGroupWriter; +use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter}; #[tokio::test] async fn test_non_uniform_encryption_plaintext_footer() { @@ -652,28 +653,75 @@ fn spawn_column_parallel_row_group_writer( Ok((col_writer_tasks, col_array_channels)) } +/// A buffer with interior mutability shared by the SerializedFileWriter and +/// ObjectStore writer +#[derive(Clone)] +pub struct SharedBuffer { + /// The inner buffer for reading and writing + /// + /// The lock is used to obtain internal mutability, so no worry about the + /// lock contention. + pub buffer: Arc>>, +} + +impl SharedBuffer { + pub fn new(capacity: usize) -> Self { + Self { + buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))), + } + } +} + +impl Write for SharedBuffer { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let mut buffer = self.buffer.try_lock().unwrap(); + Write::write(&mut *buffer, buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + let mut buffer = self.buffer.try_lock().unwrap(); + Write::flush(&mut *buffer) + } +} + /// Consume RowGroups serialized by other parallel tasks and concatenate them in /// to the final parquet file, while flushing finalized bytes to an [ObjectStore] async fn concatenate_parallel_row_groups( // mut arrow_row_group_writer_factory: ArrowRowGroupWriterFactory, - mut arrow_writer: ArrowWriter<&std::fs::File>, + mut writer: &mut SerializedFileWriter<&std::fs::File>, + schema: Arc, + writer_props: Arc, mut serialize_rx: Receiver>, mut file: &std::fs::File, ) -> Result { + + let merged_buff = SharedBuffer::new(0); + let schema_desc = ArrowSchemaConverter::new().convert(schema.as_ref())?; + + let mut parquet_writer = SerializedFileWriter::new( + merged_buff.clone(), + schema_desc.root_schema_ptr(), + writer_props, + )?; + while let Some(task) = serialize_rx.recv().await { let result = task.await; + let mut rg_out = parquet_writer.next_row_group()?; let (serialized_columns, _cnt) = result.map_err(|e| ParquetError::General(e.to_string()))??; - let mut finalized_rg = Vec::with_capacity(serialized_columns.len()); for task in serialized_columns { - finalized_rg.push(task); - // file.write_all_buf(task.data.as_slice()).await?; + task.append_to_row_group(&mut rg_out)?; + let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap(); + writer + .write_all(buff_to_flush.as_slice())?; } - // arrow_row_group_writer_factory.append_row_group(finalized_rg)?; + rg_out.close()?; } + let file_metadata = 
parquet_writer.close()?; + let final_buff = merged_buff.buffer.try_lock().unwrap(); - let file_metadata = arrow_writer.finish()?; + writer.write_all(final_buff.as_slice())?; Ok(file_metadata) } @@ -713,7 +761,7 @@ async fn test_concurrent_encrypted_writing_over_multiple_row_groups() { // Create a temporary file to write the encrypted data let temp_file = tempfile::tempfile().unwrap(); - let mut arrow_writer = ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props).unwrap(); + let mut arrow_writer = ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props.clone()).unwrap(); let row_group_writer_factory = arrow_writer.get_row_group_writer_factory(); @@ -730,12 +778,17 @@ async fn test_concurrent_encrypted_writing_over_multiple_row_groups() { .await .map_err(|e| ParquetError::General(e.to_string())); + // TODO: use concatenate_parallel_row_groups - // let file_metadata = concatenate_parallel_row_groups( - // arrow_writer, - // serialize_rx, - // &temp_file, - // ) + + let file_metadata = concatenate_parallel_row_groups( + &mut arrow_writer, + schema.clone(), + Arc::new(props.unwrap().clone()), + serialize_rx, + &temp_file, + ); + // .await; // LOW-LEVEL API: Use low level API to write into a file using multiple threads From 520e32b1f16c8311210bd020ed0f3fd6c7bc6d3c Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Fri, 15 Aug 2025 20:01:40 +0200 Subject: [PATCH 05/16] work --- parquet/tests/encryption/encryption_async.rs | 85 +++++++++----------- 1 file changed, 38 insertions(+), 47 deletions(-) diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs index 32fd3279281a..2837183f96ac 100644 --- a/parquet/tests/encryption/encryption_async.rs +++ b/parquet/tests/encryption/encryption_async.rs @@ -587,7 +587,6 @@ fn spawn_parquet_parallel_serialization_task( // for the current one to finish. 
drop(col_array_channels); - // TODO let finalize_rg_task = spawn_rg_join_and_finalize_task( col_writer_tasks, max_row_group_rows, @@ -687,23 +686,9 @@ impl Write for SharedBuffer { /// Consume RowGroups serialized by other parallel tasks and concatenate them in /// to the final parquet file, while flushing finalized bytes to an [ObjectStore] async fn concatenate_parallel_row_groups( - // mut arrow_row_group_writer_factory: ArrowRowGroupWriterFactory, - mut writer: &mut SerializedFileWriter<&std::fs::File>, - schema: Arc, - writer_props: Arc, + mut parquet_writer: SerializedFileWriter, mut serialize_rx: Receiver>, - mut file: &std::fs::File, ) -> Result { - - let merged_buff = SharedBuffer::new(0); - let schema_desc = ArrowSchemaConverter::new().convert(schema.as_ref())?; - - let mut parquet_writer = SerializedFileWriter::new( - merged_buff.clone(), - schema_desc.root_schema_ptr(), - writer_props, - )?; - while let Some(task) = serialize_rx.recv().await { let result = task.await; let mut rg_out = parquet_writer.next_row_group()?; @@ -712,16 +697,11 @@ async fn concatenate_parallel_row_groups( for task in serialized_columns { task.append_to_row_group(&mut rg_out)?; - let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap(); - writer - .write_all(buff_to_flush.as_slice())?; } rg_out.close()?; } - let file_metadata = parquet_writer.close()?; - let final_buff = merged_buff.buffer.try_lock().unwrap(); - writer.write_all(final_buff.as_slice())?; + let file_metadata = parquet_writer.close()?; Ok(file_metadata) } @@ -747,11 +727,15 @@ async fn test_concurrent_encrypted_writing_over_multiple_row_groups() { let (record_batches, metadata) = read_encrypted_file(&file, decryption_properties.clone()).unwrap(); - let to_write: Vec<_> = record_batches - .iter() - .flat_map(|rb| rb.columns().to_vec()) - .collect(); - let schema = metadata.schema().clone(); + let schema = metadata.schema(); + + // Create a channel to send RecordBatches to the writer and send row groups + let (record_batch_tx, data) = tokio::sync::mpsc::channel::(100); + tokio::spawn(async move { + for record_batch in record_batches { + record_batch_tx.send(record_batch).await.expect(""); + } + }); let props = Some( WriterPropertiesBuilder::default() @@ -764,38 +748,45 @@ async fn test_concurrent_encrypted_writing_over_multiple_row_groups() { let mut arrow_writer = ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props.clone()).unwrap(); let row_group_writer_factory = arrow_writer.get_row_group_writer_factory(); - let max_rowgroups = 1; + let (serialize_tx, serialize_rx) = tokio::sync::mpsc::channel::>(max_rowgroups); - let (record_batch_tx, record_batch_rx) = tokio::sync::mpsc::channel::(100); - let launch_serialization_task = spawn_parquet_parallel_serialization_task( - row_group_writer_factory, record_batch_rx, serialize_tx, schema.clone()).unwrap(); + row_group_writer_factory, data, serialize_tx, schema.clone()).unwrap(); - let _ = launch_serialization_task - .await - .map_err(|e| ParquetError::General(e.to_string())); - - - // TODO: use concatenate_parallel_row_groups + let merged_buff = SharedBuffer::new(1000); + let schema_desc = ArrowSchemaConverter::new().convert(schema.as_ref()).unwrap(); + let parquet_writer = SerializedFileWriter::new( + merged_buff.clone(), + schema_desc.root_schema_ptr(), + Arc::new(props.clone().unwrap()), + ).unwrap(); let file_metadata = concatenate_parallel_row_groups( - &mut arrow_writer, - schema.clone(), - Arc::new(props.unwrap().clone()), + parquet_writer, serialize_rx, - &temp_file, - 
); + ).await.unwrap(); - // .await; + _ = tokio::spawn(async move { launch_serialization_task.await }); - // LOW-LEVEL API: Use low level API to write into a file using multiple threads + let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap(); - // Get column writers - // let (_row_group_index, col_writers, srgw) = arrow_writer.get_column_writers().unwrap(); - // let num_columns = col_writers.len(); + + let mut parquet_writer = SerializedFileWriter::new( + &temp_file, + schema_desc.root_schema_ptr(), + Arc::new(props.clone().unwrap()), + ).unwrap(); + _ = parquet_writer.write_all(buff_to_flush.as_slice()).unwrap(); + + // Check that the file was written correctly + let (read_record_batches, read_metadata) = + read_encrypted_file(&temp_file, decryption_properties.clone()).unwrap(); + + assert_eq!(read_metadata.metadata().file_metadata().num_rows(), 100); + verify_encryption_double_test_data(read_record_batches, read_metadata.metadata()); } #[tokio::test] From 49c9a0dd3e860c0502c718d341b47e5eafc673d5 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Fri, 22 Aug 2025 14:38:45 +1200 Subject: [PATCH 06/16] Add ArrowWriter::into_serialized_writer and update tests --- parquet/src/arrow/arrow_writer/mod.rs | 44 +++--- parquet/tests/encryption/encryption_async.rs | 138 ++++++------------- parquet/tests/encryption/encryption_util.rs | 31 +++-- 3 files changed, 83 insertions(+), 130 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 19959341c281..c2d633fb4922 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -173,7 +173,7 @@ mod levels; /// ``` pub struct ArrowWriter { /// Underlying Parquet writer - pub writer: SerializedFileWriter, + writer: SerializedFileWriter, /// The in-progress row group if any in_progress: Option, @@ -409,39 +409,31 @@ impl ArrowWriter { } /// Create a new row group writer and return its column writers. - pub fn get_column_writers(&mut self) -> Result<(usize, Vec, SerializedRowGroupWriter)> { + pub fn get_column_writers(&mut self) -> Result> { self.flush()?; - let row_group_factory = &self.row_group_writer_factory; - let row_group_index = self.writer.flushed_row_groups().len(); - let in_progress = row_group_factory.create_row_group_writer(row_group_index)?; - let serialized_row_group_writer = self.writer.next_row_group()?; - Ok((row_group_index, in_progress.writers, serialized_row_group_writer)) - } - - /// Returns the ArrowRowGroupWriterFactory used by this ArrowWriter. - pub fn get_row_group_writer_factory(self) -> ArrowRowGroupWriterFactory { - self.row_group_writer_factory + let in_progress = self + .row_group_writer_factory + .create_row_group_writer(self.writer.flushed_row_groups().len())?; + Ok(in_progress.writers) } /// Append the given column chunks to the file as a new row group. 
- pub fn append_row_group( - chunks: Vec, - mut row_group_writer: SerializedRowGroupWriter, - ) -> Result<()> { + pub fn append_row_group(&mut self, chunks: Vec) -> Result<()> { + let mut row_group_writer = self.writer.next_row_group()?; for chunk in chunks { chunk.append_to_row_group(&mut row_group_writer)?; } row_group_writer.close()?; Ok(()) } - // pub fn append_row_group(&mut self, chunks: Vec) -> Result<()> { - // let mut row_group_writer = self.writer.next_row_group()?; - // for chunk in chunks { - // chunk.append_to_row_group(&mut row_group_writer)?; - // } - // row_group_writer.close()?; - // Ok(()) - // } + + /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`] + pub fn into_serialized_writer( + mut self, + ) -> Result<(SerializedFileWriter, ArrowRowGroupWriterFactory)> { + self.flush()?; + Ok((self.writer, self.row_group_writer_factory)) + } } impl RecordBatchWriter for ArrowWriter { @@ -908,7 +900,7 @@ impl ArrowRowGroupWriterFactory { } #[cfg(feature = "encryption")] - pub fn create_row_group_writer(&self, row_group_index: usize) -> Result { + fn create_row_group_writer(&self, row_group_index: usize) -> Result { let writers = get_column_writers_with_encryptor( &self.schema, &self.props, @@ -926,7 +918,7 @@ impl ArrowRowGroupWriterFactory { } /// Create a new row group writer and return its column writers. - pub fn get_column_writers(&mut self, row_group_index: usize) -> Result> { + pub fn get_column_writers(&self, row_group_index: usize) -> Result> { // let row_group_index = self.writer.flushed_row_groups().len(); let in_progress = self.create_row_group_writer(row_group_index)?; Ok(in_progress.writers) diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs index 2837183f96ac..18c3b855ac1b 100644 --- a/parquet/tests/encryption/encryption_async.rs +++ b/parquet/tests/encryption/encryption_async.rs @@ -17,28 +17,31 @@ //! 
This module contains tests for reading encrypted Parquet files with the async Arrow API -use std::io::Write; -use parquet::format::FileMetaData; use crate::encryption_util::{ read_encrypted_file, verify_column_indexes, verify_encryption_double_test_data, verify_encryption_test_data, TestKeyRetriever, }; +use arrow_array::RecordBatch; +use arrow_schema::Schema; use futures::TryStreamExt; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; -use parquet::arrow::arrow_writer::{compute_leaves, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, ArrowRowGroupWriterFactory, ArrowWriterOptions}; -use parquet::arrow::{ArrowSchemaConverter, ParquetRecordBatchStreamBuilder}; +use parquet::arrow::arrow_writer::{ + compute_leaves, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, + ArrowRowGroupWriterFactory, ArrowWriterOptions, +}; +use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet::arrow::{ArrowWriter, AsyncArrowWriter}; use parquet::encryption::decrypt::FileDecryptionProperties; use parquet::encryption::encrypt::FileEncryptionProperties; use parquet::errors::ParquetError; use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; +use parquet::file::writer::SerializedFileWriter; +use parquet::format::FileMetaData; +use std::io::Write; use std::sync::Arc; use tokio::fs::File; -use tokio::sync::mpsc::{Sender, Receiver}; +use tokio::sync::mpsc::{Receiver, Sender}; use tokio::task::JoinHandle; -use arrow_array::RecordBatch; -use arrow_schema::Schema; -use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter}; #[tokio::test] async fn test_non_uniform_encryption_plaintext_footer() { @@ -542,7 +545,7 @@ fn spawn_rg_join_and_finalize_task( } fn spawn_parquet_parallel_serialization_task( - mut rgwf: ArrowRowGroupWriterFactory, + rgwf: ArrowRowGroupWriterFactory, mut data: Receiver, serialize_tx: Sender>, schema: Arc, @@ -566,31 +569,22 @@ fn spawn_parquet_parallel_serialization_task( // function. loop { if current_rg_rows + rb.num_rows() < max_row_group_rows { - send_arrays_to_column_writers( - &col_array_channels, - &rb, - schema.clone(), - ).await?; + send_arrays_to_column_writers(&col_array_channels, &rb, schema.clone()).await?; current_rg_rows += rb.num_rows(); break; } else { let rows_left = max_row_group_rows - current_rg_rows; let rb_split = rb.slice(0, rows_left); - send_arrays_to_column_writers( - &col_array_channels, - &rb_split, - schema.clone(), - ).await?; + send_arrays_to_column_writers(&col_array_channels, &rb_split, schema.clone()) + .await?; // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup // on a separate task, so that we can immediately start on the next RG before waiting // for the current one to finish. drop(col_array_channels); - let finalize_rg_task = spawn_rg_join_and_finalize_task( - col_writer_tasks, - max_row_group_rows, - ); + let finalize_rg_task = + spawn_rg_join_and_finalize_task(col_writer_tasks, max_row_group_rows); // Do not surface error from closed channel (means something // else hit an error, and the plan is shutting down). 
@@ -612,10 +606,8 @@ fn spawn_parquet_parallel_serialization_task( drop(col_array_channels); // Handle leftover rows as final rowgroup, which may be smaller than max_row_group_rows if current_rg_rows > 0 { - let finalize_rg_task = spawn_rg_join_and_finalize_task( - col_writer_tasks, - current_rg_rows, - ); + let finalize_rg_task = + spawn_rg_join_and_finalize_task(col_writer_tasks, current_rg_rows); // Do not surface error from closed channel (means something // else hit an error, and the plan is shutting down). @@ -639,7 +631,8 @@ fn spawn_column_parallel_row_group_writer( let mut col_writer_tasks = Vec::with_capacity(num_columns); let mut col_array_channels = Vec::with_capacity(num_columns); for mut col_writer in col_writers.into_iter() { - let (send_array, mut receive_array) = tokio::sync::mpsc::channel::(max_buffer_size); + let (send_array, mut receive_array) = + tokio::sync::mpsc::channel::(max_buffer_size); col_array_channels.push(send_array); let handle = tokio::spawn(async move { while let Some(col) = receive_array.recv().await { @@ -652,41 +645,10 @@ fn spawn_column_parallel_row_group_writer( Ok((col_writer_tasks, col_array_channels)) } -/// A buffer with interior mutability shared by the SerializedFileWriter and -/// ObjectStore writer -#[derive(Clone)] -pub struct SharedBuffer { - /// The inner buffer for reading and writing - /// - /// The lock is used to obtain internal mutability, so no worry about the - /// lock contention. - pub buffer: Arc>>, -} - -impl SharedBuffer { - pub fn new(capacity: usize) -> Self { - Self { - buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))), - } - } -} - -impl Write for SharedBuffer { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - let mut buffer = self.buffer.try_lock().unwrap(); - Write::write(&mut *buffer, buf) - } - - fn flush(&mut self) -> std::io::Result<()> { - let mut buffer = self.buffer.try_lock().unwrap(); - Write::flush(&mut *buffer) - } -} - /// Consume RowGroups serialized by other parallel tasks and concatenate them in /// to the final parquet file, while flushing finalized bytes to an [ObjectStore] -async fn concatenate_parallel_row_groups( - mut parquet_writer: SerializedFileWriter, +async fn concatenate_parallel_row_groups( + mut parquet_writer: SerializedFileWriter, mut serialize_rx: Receiver>, ) -> Result { while let Some(task) = serialize_rx.recv().await { @@ -730,10 +692,10 @@ async fn test_concurrent_encrypted_writing_over_multiple_row_groups() { let schema = metadata.schema(); // Create a channel to send RecordBatches to the writer and send row groups - let (record_batch_tx, data) = tokio::sync::mpsc::channel::(100); - tokio::spawn(async move { + let (record_batch_tx, data) = tokio::sync::mpsc::channel::(100); + let data_generator = tokio::spawn(async move { for record_batch in record_batches { - record_batch_tx.send(record_batch).await.expect(""); + record_batch_tx.send(record_batch).await.unwrap(); } }); @@ -745,48 +707,36 @@ async fn test_concurrent_encrypted_writing_over_multiple_row_groups() { // Create a temporary file to write the encrypted data let temp_file = tempfile::tempfile().unwrap(); - let mut arrow_writer = ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props.clone()).unwrap(); + let arrow_writer = + ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props.clone()).unwrap(); - let row_group_writer_factory = arrow_writer.get_row_group_writer_factory(); + let (writer, row_group_writer_factory) = arrow_writer.into_serialized_writer().unwrap(); let 
max_rowgroups = 1; let (serialize_tx, serialize_rx) = tokio::sync::mpsc::channel::>(max_rowgroups); let launch_serialization_task = spawn_parquet_parallel_serialization_task( - row_group_writer_factory, data, serialize_tx, schema.clone()).unwrap(); - - let merged_buff = SharedBuffer::new(1000); - let schema_desc = ArrowSchemaConverter::new().convert(schema.as_ref()).unwrap(); - let parquet_writer = SerializedFileWriter::new( - merged_buff.clone(), - schema_desc.root_schema_ptr(), - Arc::new(props.clone().unwrap()), - ).unwrap(); - - let file_metadata = concatenate_parallel_row_groups( - parquet_writer, - serialize_rx, - ).await.unwrap(); - - _ = tokio::spawn(async move { launch_serialization_task.await }); - - let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap(); + row_group_writer_factory, + data, + serialize_tx, + schema.clone(), + ) + .unwrap(); + let _file_metadata = concatenate_parallel_row_groups(writer, serialize_rx) + .await + .unwrap(); - let mut parquet_writer = SerializedFileWriter::new( - &temp_file, - schema_desc.root_schema_ptr(), - Arc::new(props.clone().unwrap()), - ).unwrap(); - _ = parquet_writer.write_all(buff_to_flush.as_slice()).unwrap(); + data_generator.await.unwrap(); + launch_serialization_task.await.unwrap().unwrap(); // Check that the file was written correctly let (read_record_batches, read_metadata) = read_encrypted_file(&temp_file, decryption_properties.clone()).unwrap(); - assert_eq!(read_metadata.metadata().file_metadata().num_rows(), 100); - verify_encryption_double_test_data(read_record_batches, read_metadata.metadata()); + assert_eq!(read_metadata.metadata().file_metadata().num_rows(), 50); + verify_encryption_test_data(read_record_batches, read_metadata.metadata()); } #[tokio::test] @@ -828,7 +778,7 @@ async fn test_multi_threaded_encrypted_writing() { // LOW-LEVEL API: Use low level API to write into a file using multiple threads // Get column writers - let (_row_group_index, col_writers, srgw) = writer.get_column_writers().unwrap(); + let col_writers = writer.get_column_writers().unwrap(); let num_columns = col_writers.len(); let (col_writer_tasks, mut col_array_channels) = @@ -851,7 +801,7 @@ async fn test_multi_threaded_encrypted_writing() { // Append the finalized row group to the SerializedFileWriter // assert!(writer.append_row_group(finalized_rg).is_ok()); - assert!(ArrowWriter::append_row_group(finalized_rg, srgw).is_ok()); + assert!(writer.append_row_group(finalized_rg).is_ok()); // HIGH-LEVEL API: Write RecordBatches into the file using ArrowWriter diff --git a/parquet/tests/encryption/encryption_util.rs b/parquet/tests/encryption/encryption_util.rs index bf7fd08109f6..f53e12adb720 100644 --- a/parquet/tests/encryption/encryption_util.rs +++ b/parquet/tests/encryption/encryption_util.rs @@ -113,15 +113,18 @@ pub(crate) fn verify_encryption_test_data( assert_eq!(file_metadata.num_rows(), 50); assert_eq!(file_metadata.schema_descr().num_columns(), 8); + let mut total_rows = 0; metadata.row_groups().iter().for_each(|rg| { assert_eq!(rg.num_columns(), 8); - assert_eq!(rg.num_rows(), 50); + total_rows += rg.num_rows(); }); + assert_eq!(total_rows, 50); let mut row_count = 0; for batch in record_batches { let batch = batch; - row_count += batch.num_rows(); + + let row_index = |index_in_batch: usize| row_count + index_in_batch; let bool_col = batch.column(0).as_boolean(); let time_col = batch @@ -137,36 +140,44 @@ pub(crate) fn verify_encryption_test_data( let fixed_size_binary_col = batch.column(7).as_fixed_size_binary(); for (i, x) in 
bool_col.iter().enumerate() { - assert_eq!(x.unwrap(), i % 2 == 0); + assert_eq!(x.unwrap(), row_index(i) % 2 == 0); } for (i, x) in time_col.iter().enumerate() { - assert_eq!(x.unwrap(), i as i32); + assert_eq!(x.unwrap(), row_index(i) as i32); } for (i, list_item) in list_col.iter().enumerate() { let list_item = list_item.unwrap(); let list_item = list_item.as_primitive::(); assert_eq!(list_item.len(), 2); - assert_eq!(list_item.value(0), ((i * 2) * 1000000000000) as i64); - assert_eq!(list_item.value(1), ((i * 2 + 1) * 1000000000000) as i64); + assert_eq!( + list_item.value(0), + ((row_index(i) * 2) * 1000000000000) as i64 + ); + assert_eq!( + list_item.value(1), + ((row_index(i) * 2 + 1) * 1000000000000) as i64 + ); } for x in timestamp_col.iter() { assert!(x.is_some()); } for (i, x) in f32_col.iter().enumerate() { - assert_eq!(x.unwrap(), i as f32 * 1.1f32); + assert_eq!(x.unwrap(), row_index(i) as f32 * 1.1f32); } for (i, x) in f64_col.iter().enumerate() { - assert_eq!(x.unwrap(), i as f64 * 1.1111111f64); + assert_eq!(x.unwrap(), row_index(i) as f64 * 1.1111111f64); } for (i, x) in binary_col.iter().enumerate() { - assert_eq!(x.is_some(), i % 2 == 0); + assert_eq!(x.is_some(), row_index(i) % 2 == 0); if let Some(x) = x { assert_eq!(&x[0..7], b"parquet"); } } for (i, x) in fixed_size_binary_col.iter().enumerate() { - assert_eq!(x.unwrap(), &[i as u8; 10]); + assert_eq!(x.unwrap(), &[row_index(i) as u8; 10]); } + + row_count += batch.num_rows(); } assert_eq!(row_count, file_metadata.num_rows() as usize); From 52d4f618fec8d11964b06745b62c112713088f30 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Fri, 22 Aug 2025 15:52:54 +1200 Subject: [PATCH 07/16] Tidy ups --- parquet/src/arrow/arrow_writer/mod.rs | 15 ++++--- parquet/tests/encryption/encryption_async.rs | 42 +++++++++----------- 2 files changed, 26 insertions(+), 31 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index c2d633fb4922..92750aed0661 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -427,7 +427,8 @@ impl ArrowWriter { Ok(()) } - /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`] + /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`]. + /// This can be useful to provide more control over how files are written. pub fn into_serialized_writer( mut self, ) -> Result<(SerializedFileWriter, ArrowRowGroupWriterFactory)> { @@ -859,8 +860,7 @@ impl ArrowRowGroupWriter { } } -/// ArrowRowGroupWriterFactory can be used for creating new [`ArrowRowGroupWriter`] instances -/// for each row group in the Parquet file. +/// Factory that creates new column writers for each row group in the Parquet file. pub struct ArrowRowGroupWriterFactory { schema: SchemaDescriptor, arrow_schema: SchemaRef, @@ -917,11 +917,10 @@ impl ArrowRowGroupWriterFactory { Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema)) } - /// Create a new row group writer and return its column writers. - pub fn get_column_writers(&self, row_group_index: usize) -> Result> { - // let row_group_index = self.writer.flushed_row_groups().len(); - let in_progress = self.create_row_group_writer(row_group_index)?; - Ok(in_progress.writers) + /// Create column writers for a new row group. 
+ pub fn create_column_writers(&self, row_group_index: usize) -> Result> { + let rg_writer = self.create_row_group_writer(row_group_index)?; + Ok(rg_writer.writers) } } diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs index 18c3b855ac1b..2f52e10df7a0 100644 --- a/parquet/tests/encryption/encryption_async.rs +++ b/parquet/tests/encryption/encryption_async.rs @@ -503,14 +503,15 @@ async fn read_and_roundtrip_to_encrypted_file_async( verify_encryption_test_file_read_async(&mut file, decryption_properties).await } +// Type aliases for multithreaded file writing tests type ColSender = Sender; type ColumnWriterTask = JoinHandle>; type RBStreamSerializeResult = Result<(Vec, usize), ParquetError>; async fn send_arrays_to_column_writers( - col_array_channels: &Vec, + col_array_channels: &[ColSender], rb: &RecordBatch, - schema: Arc, + schema: &Arc, ) -> Result<(), ParquetError> { // Each leaf column has its own channel, increment next_channel for each leaf column sent. let mut next_channel = 0; @@ -545,17 +546,17 @@ fn spawn_rg_join_and_finalize_task( } fn spawn_parquet_parallel_serialization_task( - rgwf: ArrowRowGroupWriterFactory, + writer_factory: ArrowRowGroupWriterFactory, mut data: Receiver, serialize_tx: Sender>, schema: Arc, -) -> Result>, ParquetError> { - let handle = tokio::spawn(async move { +) -> JoinHandle> { + tokio::spawn(async move { let max_buffer_rb = 10; let max_row_group_rows = 10; let mut row_group_index = 0; - let column_writers = rgwf.get_column_writers(row_group_index).unwrap(); + let column_writers = writer_factory.create_column_writers(row_group_index)?; // type ColumnWriterTask = SpawnedTask>; let (mut col_writer_tasks, mut col_array_channels) = @@ -569,14 +570,13 @@ fn spawn_parquet_parallel_serialization_task( // function. 
loop { if current_rg_rows + rb.num_rows() < max_row_group_rows { - send_arrays_to_column_writers(&col_array_channels, &rb, schema.clone()).await?; + send_arrays_to_column_writers(&col_array_channels, &rb, &schema).await?; current_rg_rows += rb.num_rows(); break; } else { let rows_left = max_row_group_rows - current_rg_rows; let rb_split = rb.slice(0, rows_left); - send_arrays_to_column_writers(&col_array_channels, &rb_split, schema.clone()) - .await?; + send_arrays_to_column_writers(&col_array_channels, &rb_split, &schema).await?; // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup // on a separate task, so that we can immediately start on the next RG before waiting @@ -596,9 +596,9 @@ fn spawn_parquet_parallel_serialization_task( rb = rb.slice(rows_left, rb.num_rows() - rows_left); row_group_index += 1; - let column_writers = rgwf.get_column_writers(row_group_index).unwrap(); + let column_writers = writer_factory.create_column_writers(row_group_index)?; (col_writer_tasks, col_array_channels) = - spawn_column_parallel_row_group_writer(column_writers, 100).unwrap(); + spawn_column_parallel_row_group_writer(column_writers, 100)?; } } } @@ -617,9 +617,7 @@ fn spawn_parquet_parallel_serialization_task( } Ok(()) - }); - - Ok(handle) + }) } fn spawn_column_parallel_row_group_writer( @@ -645,8 +643,8 @@ fn spawn_column_parallel_row_group_writer( Ok((col_writer_tasks, col_array_channels)) } -/// Consume RowGroups serialized by other parallel tasks and concatenate them in -/// to the final parquet file, while flushing finalized bytes to an [ObjectStore] +/// Consume RowGroups serialized by other parallel tasks and concatenate them +/// to the final parquet file async fn concatenate_parallel_row_groups( mut parquet_writer: SerializedFileWriter, mut serialize_rx: Receiver>, @@ -657,8 +655,8 @@ async fn concatenate_parallel_row_groups( let (serialized_columns, _cnt) = result.map_err(|e| ParquetError::General(e.to_string()))??; - for task in serialized_columns { - task.append_to_row_group(&mut rg_out)?; + for column_chunk in serialized_columns { + column_chunk.append_to_row_group(&mut rg_out)?; } rg_out.close()?; } @@ -711,18 +709,17 @@ async fn test_concurrent_encrypted_writing_over_multiple_row_groups() { ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props.clone()).unwrap(); let (writer, row_group_writer_factory) = arrow_writer.into_serialized_writer().unwrap(); - let max_rowgroups = 1; + let max_row_groups = 1; let (serialize_tx, serialize_rx) = - tokio::sync::mpsc::channel::>(max_rowgroups); + tokio::sync::mpsc::channel::>(max_row_groups); let launch_serialization_task = spawn_parquet_parallel_serialization_task( row_group_writer_factory, data, serialize_tx, schema.clone(), - ) - .unwrap(); + ); let _file_metadata = concatenate_parallel_row_groups(writer, serialize_rx) .await @@ -800,7 +797,6 @@ async fn test_multi_threaded_encrypted_writing() { } // Append the finalized row group to the SerializedFileWriter - // assert!(writer.append_row_group(finalized_rg).is_ok()); assert!(writer.append_row_group(finalized_rg).is_ok()); // HIGH-LEVEL API: Write RecordBatches into the file using ArrowWriter From bac36900826e411564231b89e3eb544ea9082cab Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Fri, 22 Aug 2025 10:52:27 +0200 Subject: [PATCH 08/16] Deprecation of original API --- parquet/src/arrow/arrow_writer/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 
92750aed0661..90ad9875f19b 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -409,6 +409,7 @@ impl ArrowWriter { } /// Create a new row group writer and return its column writers. + #[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")] pub fn get_column_writers(&mut self) -> Result> { self.flush()?; let in_progress = self @@ -418,6 +419,7 @@ impl ArrowWriter { } /// Append the given column chunks to the file as a new row group. + #[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")] pub fn append_row_group(&mut self, chunks: Vec) -> Result<()> { let mut row_group_writer = self.writer.next_row_group()?; for chunk in chunks { From d5f8ed48102044bcaf6a4fb868e6b5e95964b277 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Fri, 22 Aug 2025 16:53:56 +0200 Subject: [PATCH 09/16] allow deprecated tests --- parquet/tests/encryption/encryption_async.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs index 2f52e10df7a0..af7acd2c4613 100644 --- a/parquet/tests/encryption/encryption_async.rs +++ b/parquet/tests/encryption/encryption_async.rs @@ -775,6 +775,7 @@ async fn test_multi_threaded_encrypted_writing() { // LOW-LEVEL API: Use low level API to write into a file using multiple threads // Get column writers + #[allow(deprecated)] let col_writers = writer.get_column_writers().unwrap(); let num_columns = col_writers.len(); @@ -797,7 +798,8 @@ async fn test_multi_threaded_encrypted_writing() { } // Append the finalized row group to the SerializedFileWriter - assert!(writer.append_row_group(finalized_rg).is_ok()); + #[allow(deprecated)] + writer.append_row_group(finalized_rg).unwrap(); // HIGH-LEVEL API: Write RecordBatches into the file using ArrowWriter From 71527d0c2858d4f429aeb93ce022734c2f8e0412 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Fri, 29 Aug 2025 20:14:09 +0200 Subject: [PATCH 10/16] Review feedback --- parquet/tests/encryption/encryption_async.rs | 104 ++++++++++++++++++- 1 file changed, 102 insertions(+), 2 deletions(-) diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs index af7acd2c4613..62f93051500e 100644 --- a/parquet/tests/encryption/encryption_async.rs +++ b/parquet/tests/encryption/encryption_async.rs @@ -558,7 +558,6 @@ fn spawn_parquet_parallel_serialization_task( let column_writers = writer_factory.create_column_writers(row_group_index)?; - // type ColumnWriterTask = SpawnedTask>; let (mut col_writer_tasks, mut col_array_channels) = spawn_column_parallel_row_group_writer(column_writers, max_buffer_rb)?; @@ -737,7 +736,108 @@ async fn test_concurrent_encrypted_writing_over_multiple_row_groups() { } #[tokio::test] -async fn test_multi_threaded_encrypted_writing() { +async fn test_multi_threaded_encrypted_writing_replace_deprecated_api() { + // Read example data and set up encryption/decryption properties + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted"); + let file = std::fs::File::open(path).unwrap(); + + let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into()) + .with_column_key("double_field", b"1234567890123450".into()) + .with_column_key("float_field", b"1234567890123451".into()) + .build() + .unwrap(); + let decryption_properties = 
FileDecryptionProperties::builder(b"0123456789012345".into()) + .with_column_key("double_field", b"1234567890123450".into()) + .with_column_key("float_field", b"1234567890123451".into()) + .build() + .unwrap(); + + let (record_batches, metadata) = + read_encrypted_file(&file, decryption_properties.clone()).unwrap(); + let schema = metadata.schema().clone(); + + let props = Some( + WriterPropertiesBuilder::default() + .with_file_encryption_properties(file_encryption_properties) + .build(), + ); + + // Create a temporary file to write the encrypted data + let temp_file = tempfile::tempfile().unwrap(); + let writer = + ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props.clone()).unwrap(); + + let (mut serialized_file_writer, row_group_writer_factory) = + writer.into_serialized_writer().unwrap(); + + let (serialize_tx, mut serialize_rx) = + tokio::sync::mpsc::channel::>(1); + + // Create a channel to send RecordBatches to the writer and send row batches + let (record_batch_tx, mut data) = tokio::sync::mpsc::channel::(100); + let data_generator = tokio::spawn(async move { + for record_batch in record_batches { + record_batch_tx.send(record_batch).await.unwrap(); + } + }); + + // Get column writers + // This is instead of let col_writers = writer.get_column_writers().unwrap(); + let col_writers = row_group_writer_factory.create_column_writers(0).unwrap(); + + let (col_writer_tasks, col_array_channels) = + spawn_column_parallel_row_group_writer(col_writers, 10).unwrap(); + + // Spawn serialization tasks for incoming RecordBatches + let launch_serialization_task = tokio::spawn(async move { + let Some(rb) = data.recv().await else { + panic!() + }; + send_arrays_to_column_writers(&col_array_channels, &rb, &schema) + .await + .unwrap(); + let finalize_rg_task = spawn_rg_join_and_finalize_task(col_writer_tasks, 10); + + serialize_tx.send(finalize_rg_task).await.unwrap(); + drop(col_array_channels); + }); + + // Append the finalized row groups to the SerializedFileWriter + // This is instead of arrow_writer.append_row_group(arrow_column_chunks) + while let Some(task) = serialize_rx.recv().await { + let (arrow_column_chunks, _) = task.await.unwrap().unwrap(); + let mut row_group_writer = serialized_file_writer.next_row_group().unwrap(); + for chunk in arrow_column_chunks { + chunk.append_to_row_group(&mut row_group_writer).unwrap(); + } + row_group_writer.close().unwrap(); + } + + // Wait for data generator and serialization task to finish + data_generator.await.unwrap(); + launch_serialization_task.await.unwrap(); + let metadata = serialized_file_writer.close().unwrap(); + + // Close the file writer which writes the footer + assert_eq!(metadata.num_rows, 50); + assert_eq!(metadata.schema, metadata.schema); + + // Check that the file was written correctly + let (read_record_batches, read_metadata) = + read_encrypted_file(&temp_file, decryption_properties.clone()).unwrap(); + verify_encryption_test_data(read_record_batches, read_metadata.metadata()); + + // Check that file was encrypted + let result = ArrowReaderMetadata::load(&temp_file, ArrowReaderOptions::default()); + assert_eq!( + result.unwrap_err().to_string(), + "Parquet error: Parquet file has an encrypted footer but decryption properties were not provided" + ); +} + +#[tokio::test] +async fn test_multi_threaded_encrypted_writing_deprecated() { // Read example data and set up encryption/decryption properties let testdata = arrow::util::test_util::parquet_test_data(); let path = 
format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted"); From 2d8334b9a72c1219a5f37c02922be6cd9c4a9096 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 1 Sep 2025 11:23:27 +0200 Subject: [PATCH 11/16] Update parquet/tests/encryption/encryption_async.rs Co-authored-by: Adam Reeve --- parquet/tests/encryption/encryption_async.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs index 62f93051500e..9a90715466cc 100644 --- a/parquet/tests/encryption/encryption_async.rs +++ b/parquet/tests/encryption/encryption_async.rs @@ -736,7 +736,7 @@ async fn test_concurrent_encrypted_writing_over_multiple_row_groups() { } #[tokio::test] -async fn test_multi_threaded_encrypted_writing_replace_deprecated_api() { +async fn test_multi_threaded_encrypted_writing() { // Read example data and set up encryption/decryption properties let testdata = arrow::util::test_util::parquet_test_data(); let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted"); From 16c612fe32c7087ef255b49edbd0ad9e6f899a2f Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 1 Sep 2025 11:26:27 +0200 Subject: [PATCH 12/16] Review feedback --- parquet/tests/encryption/encryption_async.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs index 9a90715466cc..9c1e0c00a3f6 100644 --- a/parquet/tests/encryption/encryption_async.rs +++ b/parquet/tests/encryption/encryption_async.rs @@ -783,7 +783,6 @@ async fn test_multi_threaded_encrypted_writing() { }); // Get column writers - // This is instead of let col_writers = writer.get_column_writers().unwrap(); let col_writers = row_group_writer_factory.create_column_writers(0).unwrap(); let (col_writer_tasks, col_array_channels) = @@ -804,7 +803,6 @@ async fn test_multi_threaded_encrypted_writing() { }); // Append the finalized row groups to the SerializedFileWriter - // This is instead of arrow_writer.append_row_group(arrow_column_chunks) while let Some(task) = serialize_rx.recv().await { let (arrow_column_chunks, _) = task.await.unwrap().unwrap(); let mut row_group_writer = serialized_file_writer.next_row_group().unwrap(); From 71eaaf1499006b3c04be8489d15f8d96b3ae5317 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 15 Sep 2025 23:05:42 +0200 Subject: [PATCH 13/16] ignoring deprecation --- parquet/src/arrow/async_writer/mod.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index 4547f71274b7..5f502e5525e0 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -292,7 +292,13 @@ impl AsyncArrowWriter { /// Create a new row group writer and return its column writers. 
pub async fn get_column_writers(&mut self) -> Result> { let before = self.sync_writer.flushed_row_groups().len(); + // TODO: should use the new API + #[allow(deprecated)] let writers = self.sync_writer.get_column_writers()?; + // let (serialized_file_writer, arrow_row_group_writer_factory) = + // self.sync_writer.into_serialized_writer().unwrap(); + // let writers = row_group_writer_factory.create_column_writers(0).unwrap(); + // let metadata = serialized_file_writer.close().unwrap(); if before != self.sync_writer.flushed_row_groups().len() { self.do_write().await?; } @@ -301,6 +307,8 @@ impl AsyncArrowWriter { /// Append the given column chunks to the file as a new row group. pub async fn append_row_group(&mut self, chunks: Vec) -> Result<()> { + // TODO: should use the new API + #[allow(deprecated)] self.sync_writer.append_row_group(chunks)?; self.do_write().await } From c4e38db434b8e7d2675ead399eaf48cef1926b86 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Wed, 17 Sep 2025 12:11:27 +0200 Subject: [PATCH 14/16] Removing to-be-deprecated from async --- parquet/src/arrow/async_writer/mod.rs | 48 +++++++++------------------ 1 file changed, 16 insertions(+), 32 deletions(-) diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index 5f502e5525e0..03ffb8054891 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -61,7 +61,7 @@ mod store; pub use store::*; use crate::{ - arrow::arrow_writer::{ArrowColumnChunk, ArrowColumnWriter, ArrowWriterOptions}, + arrow::arrow_writer::ArrowWriterOptions, arrow::ArrowWriter, errors::{ParquetError, Result}, file::{metadata::RowGroupMetaData, properties::WriterProperties}, @@ -288,42 +288,17 @@ impl AsyncArrowWriter { Ok(()) } - - /// Create a new row group writer and return its column writers. - pub async fn get_column_writers(&mut self) -> Result> { - let before = self.sync_writer.flushed_row_groups().len(); - // TODO: should use the new API - #[allow(deprecated)] - let writers = self.sync_writer.get_column_writers()?; - // let (serialized_file_writer, arrow_row_group_writer_factory) = - // self.sync_writer.into_serialized_writer().unwrap(); - // let writers = row_group_writer_factory.create_column_writers(0).unwrap(); - // let metadata = serialized_file_writer.close().unwrap(); - if before != self.sync_writer.flushed_row_groups().len() { - self.do_write().await?; - } - Ok(writers) - } - - /// Append the given column chunks to the file as a new row group. 
- pub async fn append_row_group(&mut self, chunks: Vec) -> Result<()> { - // TODO: should use the new API - #[allow(deprecated)] - self.sync_writer.append_row_group(chunks)?; - self.do_write().await - } } #[cfg(test)] mod tests { + use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder}; + use crate::arrow::arrow_writer::compute_leaves; use arrow::datatypes::{DataType, Field, Schema}; use arrow_array::{ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatchReader}; use bytes::Bytes; use std::sync::Arc; - use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder}; - use crate::arrow::arrow_writer::compute_leaves; - use super::*; fn get_test_reader() -> ParquetRecordBatchReader { @@ -369,7 +344,13 @@ mod tests { // Use classic API writer.write(&to_write_record).await.unwrap(); - let mut writers = writer.get_column_writers().await.unwrap(); + // Use low-level API to write an Arrow group + let arrow_writer = writer.sync_writer; + let (mut serialized_file_writer, row_group_writer_factory) = + arrow_writer.into_serialized_writer().unwrap(); + + // Get column writers + let mut writers = row_group_writer_factory.create_column_writers(0).unwrap(); let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; let to_write_arrow_group = RecordBatch::try_from_iter([("col", col)]).unwrap(); @@ -384,10 +365,13 @@ mod tests { } } - let columns: Vec<_> = writers.into_iter().map(|w| w.close().unwrap()).collect(); + let mut columns: Vec<_> = writers.into_iter().map(|w| w.close().unwrap()).collect(); // Append the arrow group as a new row group. Flush in progress - writer.append_row_group(columns).await.unwrap(); - writer.close().await.unwrap(); + let mut row_group_writer = serialized_file_writer.next_row_group().unwrap(); + let chunk = columns.remove(0); + chunk.append_to_row_group(&mut row_group_writer).unwrap(); + row_group_writer.close().unwrap(); + let _metadata = serialized_file_writer.close().unwrap(); let buffer = Bytes::from(buffer); let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer) From 2bc45565cbf150e14c2c8ffdb244bf8a7d519756 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 18 Sep 2025 09:12:35 -0400 Subject: [PATCH 15/16] Remove outdated test --- parquet/src/arrow/async_writer/mod.rs | 54 --------------------------- 1 file changed, 54 deletions(-) diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index 03ffb8054891..44f5a139478c 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -332,60 +332,6 @@ mod tests { assert_eq!(to_write, read); } - #[tokio::test] - async fn test_async_arrow_group_writer() { - let col = Arc::new(Int64Array::from_iter_values([4, 5, 6])) as ArrayRef; - let to_write_record = RecordBatch::try_from_iter([("col", col)]).unwrap(); - - let mut buffer = Vec::new(); - let mut writer = - AsyncArrowWriter::try_new(&mut buffer, to_write_record.schema(), None).unwrap(); - - // Use classic API - writer.write(&to_write_record).await.unwrap(); - - // Use low-level API to write an Arrow group - let arrow_writer = writer.sync_writer; - let (mut serialized_file_writer, row_group_writer_factory) = - arrow_writer.into_serialized_writer().unwrap(); - - // Get column writers - let mut writers = row_group_writer_factory.create_column_writers(0).unwrap(); - let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; - let to_write_arrow_group = RecordBatch::try_from_iter([("col", col)]).unwrap(); 
- - for (field, column) in to_write_arrow_group - .schema() - .fields() - .iter() - .zip(to_write_arrow_group.columns()) - { - for leaf in compute_leaves(field.as_ref(), column).unwrap() { - writers[0].write(&leaf).unwrap(); - } - } - - let mut columns: Vec<_> = writers.into_iter().map(|w| w.close().unwrap()).collect(); - // Append the arrow group as a new row group. Flush in progress - let mut row_group_writer = serialized_file_writer.next_row_group().unwrap(); - let chunk = columns.remove(0); - chunk.append_to_row_group(&mut row_group_writer).unwrap(); - row_group_writer.close().unwrap(); - let _metadata = serialized_file_writer.close().unwrap(); - - let buffer = Bytes::from(buffer); - let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer) - .unwrap() - .build() - .unwrap(); - - let col = Arc::new(Int64Array::from_iter_values([4, 5, 6, 1, 2, 3])) as ArrayRef; - let expected = RecordBatch::try_from_iter([("col", col)]).unwrap(); - - let read = reader.next().unwrap().unwrap(); - assert_eq!(expected, read); - } - // Read the data from the test file and write it by the async writer and sync writer. // And then compares the results of the two writers. #[tokio::test] From dae1793884a81b5e86856bc2d942214fffd93c57 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 18 Sep 2025 09:15:17 -0400 Subject: [PATCH 16/16] clippy --- parquet/src/arrow/async_writer/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index 44f5a139478c..66ba6b87fee7 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -293,7 +293,6 @@ impl AsyncArrowWriter { #[cfg(test)] mod tests { use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder}; - use crate::arrow::arrow_writer::compute_leaves; use arrow::datatypes::{DataType, Field, Schema}; use arrow_array::{ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatchReader}; use bytes::Bytes;
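
For reference, the migration path exercised by the patches above boils down to the following minimal sketch. It is not part of the series' diffs; it assumes the APIs introduced here (ArrowWriter::into_serialized_writer, ArrowRowGroupWriterFactory::create_column_writers, SerializedFileWriter::next_row_group, ArrowColumnChunk::append_to_row_group), mirrors the single-batch test removed in PATCH 15, and uses an illustrative single-column batch and output path:

    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    use parquet::arrow::ArrowWriter;
    use parquet::arrow::arrow_writer::compute_leaves;
    use parquet::errors::Result;

    fn main() -> Result<()> {
        // A single batch that will become row group 0.
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();

        let file = std::fs::File::create("example.parquet").expect("create output file");

        // Build an ArrowWriter as usual, then split it into the low-level pieces
        // instead of calling the deprecated get_column_writers/append_row_group.
        let writer = ArrowWriter::try_new(file, batch.schema(), None)?;
        let (mut file_writer, row_group_factory) = writer.into_serialized_writer()?;

        // One ArrowColumnWriter per leaf column of row group 0; with the channel/task
        // plumbing from the tests above, each of these can be driven on its own thread.
        let mut col_writers = row_group_factory.create_column_writers(0)?;

        // Feed every leaf column of the batch to its matching column writer.
        let mut leaf_idx = 0;
        for (field, column) in batch.schema().fields().iter().zip(batch.columns()) {
            for leaf in compute_leaves(field.as_ref(), column)? {
                col_writers[leaf_idx].write(&leaf)?;
                leaf_idx += 1;
            }
        }

        // Close the column writers into ArrowColumnChunks and append them to the
        // file as a new row group.
        let mut row_group_writer = file_writer.next_row_group()?;
        for col_writer in col_writers {
            col_writer.close()?.append_to_row_group(&mut row_group_writer)?;
        }
        row_group_writer.close()?;

        // Closing the SerializedFileWriter writes the Parquet footer.
        let _metadata = file_writer.close()?;
        Ok(())
    }

Handing ownership of the SerializedFileWriter and the ArrowRowGroupWriterFactory to the caller is what lets column serialization and row-group appending run on independent tasks, which is the pattern the multi-threaded encrypted-writing tests above exercise.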