89 changes: 58 additions & 31 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -236,10 +236,12 @@ impl<W: Write + Send> ArrowWriter<W> {

        let max_row_group_size = props.max_row_group_size();

+        let props_ptr = Arc::new(props);
        let file_writer =
-            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props))?;
+            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;

-        let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer);
+        let row_group_writer_factory =
+            ArrowRowGroupWriterFactory::new(&file_writer, schema, arrow_schema.clone(), props_ptr);

        Ok(Self {
            writer: file_writer,
@@ -310,12 +312,10 @@ impl<W: Write + Send> ArrowWriter<W> {

        let in_progress = match &mut self.in_progress {
            Some(in_progress) => in_progress,
-            x => x.insert(self.row_group_writer_factory.create_row_group_writer(
-                self.writer.schema_descr(),
-                self.writer.properties(),
-                &self.arrow_schema,
-                self.writer.flushed_row_groups().len(),
-            )?),
+            x => x.insert(
+                self.row_group_writer_factory
+                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
+            ),
        };

        // If would exceed max_row_group_size, split batch
Expand Down Expand Up @@ -402,6 +402,25 @@ impl<W: Write + Send> ArrowWriter<W> {
    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
        self.finish()
    }
+
+    /// Create a new row group writer and return its column writers.
[Contributor review comment: I like this API 👌]
+    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
+        self.flush()?;
+        let in_progress = self
+            .row_group_writer_factory
+            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
+        Ok(in_progress.writers)
+    }
+
+    /// Append the given column chunks to the file as a new row group.
+    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
+        let mut row_group_writer = self.writer.next_row_group()?;
+        for chunk in chunks {
+            chunk.append_to_row_group(&mut row_group_writer)?;
+        }
+        row_group_writer.close()?;
+        Ok(())
+    }
}

impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
@@ -828,51 +847,59 @@ impl ArrowRowGroupWriter {
}

struct ArrowRowGroupWriterFactory {
+    schema: SchemaDescriptor,
+    arrow_schema: SchemaRef,
+    props: WriterPropertiesPtr,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowRowGroupWriterFactory {
    #[cfg(feature = "encryption")]
-    fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
+    fn new<W: Write + Send>(
+        file_writer: &SerializedFileWriter<W>,
+        schema: SchemaDescriptor,
+        arrow_schema: SchemaRef,
+        props: WriterPropertiesPtr,
+    ) -> Self {
        Self {
+            schema,
+            arrow_schema,
+            props,
            file_encryptor: file_writer.file_encryptor(),
        }
    }

    #[cfg(not(feature = "encryption"))]
-    fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
-        Self {}
+    fn new<W: Write + Send>(
+        _file_writer: &SerializedFileWriter<W>,
+        schema: SchemaDescriptor,
+        arrow_schema: SchemaRef,
+        props: WriterPropertiesPtr,
+    ) -> Self {
+        Self {
+            schema,
+            arrow_schema,
+            props,
+        }
    }

    #[cfg(feature = "encryption")]
-    fn create_row_group_writer(
-        &self,
-        parquet: &SchemaDescriptor,
-        props: &WriterPropertiesPtr,
-        arrow: &SchemaRef,
-        row_group_index: usize,
-    ) -> Result<ArrowRowGroupWriter> {
+    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
        let writers = get_column_writers_with_encryptor(
-            parquet,
-            props,
-            arrow,
+            &self.schema,
+            &self.props,
+            &self.arrow_schema,
            self.file_encryptor.clone(),
            row_group_index,
        )?;
-        Ok(ArrowRowGroupWriter::new(writers, arrow))
+        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
    }

    #[cfg(not(feature = "encryption"))]
-    fn create_row_group_writer(
-        &self,
-        parquet: &SchemaDescriptor,
-        props: &WriterPropertiesPtr,
-        arrow: &SchemaRef,
-        _row_group_index: usize,
-    ) -> Result<ArrowRowGroupWriter> {
-        let writers = get_column_writers(parquet, props, arrow)?;
-        Ok(ArrowRowGroupWriter::new(writers, arrow))
+    fn create_row_group_writer(&self, _row_group_index: usize) -> Result<ArrowRowGroupWriter> {
+        let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?;
+        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
    }
}

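Together, the new `get_column_writers` and `append_row_group` methods let callers drive row-group encoding themselves: fetch one `ArrowColumnWriter` per leaf column, encode each column independently (each loop iteration below could run on its own worker thread), close the writers into `ArrowColumnChunk`s, and append the chunks as a finished row group. A minimal single-threaded sketch of that flow; the schema, data, and output path are illustrative assumptions, while `compute_leaves` is the crate's existing leaf-slicing helper:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::arrow_writer::{compute_leaves, ArrowWriter};
use parquet::errors::Result;

fn main() -> Result<()> {
    // Illustrative single-column schema and batch.
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    let file = File::create("/tmp/manual_row_group.parquet")?;
    let mut writer = ArrowWriter::try_new(file, schema.clone(), None)?;

    // New in this PR: one ArrowColumnWriter per leaf column of the next row group.
    let mut col_writers = writer.get_column_writers()?;

    // Encode each column; the iterations are independent of each other.
    for ((field, array), col_writer) in schema
        .fields()
        .iter()
        .zip(batch.columns())
        .zip(col_writers.iter_mut())
    {
        for leaf in compute_leaves(field, array)? {
            col_writer.write(&leaf)?;
        }
    }

    // Close each writer to obtain its encoded chunk, then append the chunks
    // to the file as a single row group (also new in this PR).
    let chunks = col_writers
        .into_iter()
        .map(|w| w.close())
        .collect::<Result<Vec<_>>>()?;
    writer.append_row_group(chunks)?;
    writer.close()?;
    Ok(())
}
```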
8 changes: 5 additions & 3 deletions parquet/src/file/properties.rs
@@ -190,7 +190,7 @@ impl WriterProperties {
    /// Returns a new default [`WriterPropertiesBuilder`] for creating writer
    /// properties.
    pub fn builder() -> WriterPropertiesBuilder {
-        WriterPropertiesBuilder::with_defaults()
+        WriterPropertiesBuilder::default()
    }

    /// Returns data page size limit.
@@ -455,9 +455,9 @@ pub struct WriterPropertiesBuilder {
    file_encryption_properties: Option<FileEncryptionProperties>,
}

-impl WriterPropertiesBuilder {
+impl Default for WriterPropertiesBuilder {
    /// Returns default state of the builder.
-    fn with_defaults() -> Self {
+    fn default() -> Self {
        Self {
            data_page_size_limit: DEFAULT_PAGE_SIZE,
            data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT,
@@ -478,7 +478,9 @@ impl WriterPropertiesBuilder {
            file_encryption_properties: None,
        }
    }
+}

+impl WriterPropertiesBuilder {
    /// Finalizes the configuration and returns immutable writer properties struct.
    pub fn build(self) -> WriterProperties {
        WriterProperties {
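With `Default` implemented, `WriterPropertiesBuilder::default()` and `WriterProperties::builder()` become interchangeable entry points. A minimal sketch of the equivalence, comparing one defaulted setting as an illustration:

```rust
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};

fn main() {
    // Both construction paths now go through the same Default impl.
    let via_builder = WriterProperties::builder().build();
    let via_default = WriterPropertiesBuilder::default().build();
    assert_eq!(
        via_builder.data_page_size_limit(),
        via_default.data_page_size_limit()
    );
}
```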
2 changes: 1 addition & 1 deletion parquet/src/file/writer.rs
@@ -486,7 +486,7 @@ fn write_bloom_filters<W: Write + Send>(
/// more columns are available to write.
/// - Once done writing a column, close column writer with `close`
/// - Once all columns have been written, close row group writer with `close`
-///   method. THe close method will return row group metadata and is no-op
+///   method. The close method will return row group metadata and is no-op
///   on already closed row group.
pub struct SerializedRowGroupWriter<'a, W: Write> {
    descr: SchemaDescPtr,
68 changes: 8 additions & 60 deletions parquet/tests/encryption/encryption.rs
@@ -18,7 +18,8 @@
//! This module contains tests for reading encrypted Parquet files with the Arrow API

use crate::encryption_util::{
-    verify_column_indexes, verify_encryption_test_data, TestKeyRetriever,
+    read_and_roundtrip_to_encrypted_file, verify_column_indexes, verify_encryption_test_file_read,
+    TestKeyRetriever,
};
use arrow::array::*;
use arrow::error::Result as ArrowResult;
@@ -377,21 +378,6 @@ fn test_uniform_encryption_with_key_retriever() {
    verify_encryption_test_file_read(file, decryption_properties);
}

-fn verify_encryption_test_file_read(file: File, decryption_properties: FileDecryptionProperties) {
-    let options =
-        ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
-    let reader_metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
-    let metadata = reader_metadata.metadata();
-
-    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
-    let record_reader = builder.build().unwrap();
-    let record_batches = record_reader
-        .map(|x| x.unwrap())
-        .collect::<Vec<RecordBatch>>();
-
-    verify_encryption_test_data(record_batches, metadata);
-}
-
fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
    metadata.row_groups().iter().map(|x| x.num_rows()).collect()
}
@@ -630,6 +616,7 @@ fn uniform_encryption_page_skipping(page_index: bool) -> parquet::errors::Result
fn test_write_non_uniform_encryption() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+    let file = File::open(path).unwrap();

    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
    let column_names = vec!["double_field", "float_field"];
@@ -647,13 +634,14 @@ fn test_write_non_uniform_encryption() {
        .build()
        .unwrap();

-    read_and_roundtrip_to_encrypted_file(&path, decryption_properties, file_encryption_properties);
+    read_and_roundtrip_to_encrypted_file(&file, decryption_properties, file_encryption_properties);
}

#[test]
fn test_write_uniform_encryption_plaintext_footer() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted");
+    let file = File::open(path).unwrap();

    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
    let wrong_footer_key = b"0000000000000000".to_vec(); // 128bit/16
@@ -679,7 +667,7 @@

    // Try writing plaintext footer and then reading it with the correct footer key
    read_and_roundtrip_to_encrypted_file(
-        &path,
+        &file,
        decryption_properties.clone(),
        file_encryption_properties.clone(),
    );
@@ -688,7 +676,6 @@
    let temp_file = tempfile::tempfile().unwrap();

    // read example data
-    let file = File::open(path).unwrap();
    let options = ArrowReaderOptions::default()
        .with_file_decryption_properties(decryption_properties.clone());
    let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
@@ -730,6 +717,7 @@ fn test_write_uniform_encryption() {
fn test_write_uniform_encryption() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/uniform_encryption.parquet.encrypted");
+    let file = File::open(path).unwrap();

    let footer_key = b"0123456789012345".to_vec(); // 128bit/16

@@ -741,7 +729,7 @@ fn test_write_uniform_encryption() {
        .build()
        .unwrap();

-    read_and_roundtrip_to_encrypted_file(&path, decryption_properties, file_encryption_properties);
+    read_and_roundtrip_to_encrypted_file(&file, decryption_properties, file_encryption_properties);
}

#[test]
@@ -1061,43 +1049,3 @@ fn test_decrypt_page_index(

    Ok(())
}
-
-fn read_and_roundtrip_to_encrypted_file(
-    path: &str,
-    decryption_properties: FileDecryptionProperties,
-    encryption_properties: FileEncryptionProperties,
-) {
-    let temp_file = tempfile::tempfile().unwrap();
-
-    // read example data
-    let file = File::open(path).unwrap();
-    let options = ArrowReaderOptions::default()
-        .with_file_decryption_properties(decryption_properties.clone());
-    let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
-
-    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
-    let batch_reader = builder.build().unwrap();
-    let batches = batch_reader
-        .collect::<parquet::errors::Result<Vec<RecordBatch>, _>>()
-        .unwrap();
-
-    // write example data
-    let props = WriterProperties::builder()
-        .with_file_encryption_properties(encryption_properties)
-        .build();
-
-    let mut writer = ArrowWriter::try_new(
-        temp_file.try_clone().unwrap(),
-        metadata.schema().clone(),
-        Some(props),
-    )
-    .unwrap();
-    for batch in batches {
-        writer.write(&batch).unwrap();
-    }
-
-    writer.close().unwrap();
-
-    // check re-written example data
-    verify_encryption_test_file_read(temp_file, decryption_properties);
-}