diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 864c1bf2da45..90ad9875f19b 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -409,6 +409,7 @@ impl<W: Write + Send> ArrowWriter<W> {
     }
 
     /// Create a new row group writer and return its column writers.
+    #[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")]
     pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
         self.flush()?;
         let in_progress = self
@@ -418,6 +419,7 @@ impl<W: Write + Send> ArrowWriter<W> {
     }
 
     /// Append the given column chunks to the file as a new row group.
+    #[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")]
     pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
         let mut row_group_writer = self.writer.next_row_group()?;
         for chunk in chunks {
@@ -426,6 +428,15 @@ impl<W: Write + Send> ArrowWriter<W> {
         row_group_writer.close()?;
         Ok(())
     }
+
+    /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`].
+    /// This can be useful to provide more control over how files are written.
+    pub fn into_serialized_writer(
+        mut self,
+    ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
+        self.flush()?;
+        Ok((self.writer, self.row_group_writer_factory))
+    }
 }
 
 impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
@@ -851,7 +862,8 @@ impl ArrowRowGroupWriter {
     }
 }
 
-struct ArrowRowGroupWriterFactory {
+/// Factory that creates new column writers for each row group in the Parquet file.
+pub struct ArrowRowGroupWriterFactory {
     schema: SchemaDescriptor,
     arrow_schema: SchemaRef,
     props: WriterPropertiesPtr,
@@ -906,6 +918,12 @@ impl ArrowRowGroupWriterFactory {
         let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?;
         Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
     }
+
+    /// Create column writers for a new row group.
+    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
+        let rg_writer = self.create_row_group_writer(row_group_index)?;
+        Ok(rg_writer.writers)
+    }
 }
 
 /// Returns the [`ArrowColumnWriter`] for a given schema
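The sketch below is an illustrative example, not part of the patch, of how a caller might drive the new `into_serialized_writer` and `create_column_writers` APIs to write one row group by hand. The output path, the helper name `write_one_row_group_manually`, and the example data are assumptions made purely for illustration; the call pattern mirrors the tests added later in this diff.

use std::fs::File;
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use parquet::arrow::arrow_writer::{compute_leaves, ArrowWriter};
use parquet::errors::Result;

fn write_one_row_group_manually() -> Result<()> {
    // Example data: a single Int64 column named "col".
    let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
    let batch = RecordBatch::try_from_iter([("col", col)])?;
    let schema = batch.schema();

    // Hypothetical output path.
    let file = File::create("/tmp/example.parquet").expect("create example file");
    let writer = ArrowWriter::try_new(&file, schema.clone(), None)?;

    // New low-level entry point: hand back the SerializedFileWriter and the factory.
    let (mut file_writer, row_group_factory) = writer.into_serialized_writer()?;

    // One ArrowColumnWriter per leaf column of row group 0.
    let mut col_writers = row_group_factory.create_column_writers(0)?;

    // Feed each leaf column to its writer, in schema order.
    let mut next_writer = 0;
    for (field, column) in schema.fields().iter().zip(batch.columns()) {
        for leaf in compute_leaves(field.as_ref(), column)? {
            col_writers[next_writer].write(&leaf)?;
            next_writer += 1;
        }
    }

    // Close the column writers and append the resulting chunks as a new row group.
    let mut row_group = file_writer.next_row_group()?;
    for col_writer in col_writers {
        col_writer.close()?.append_to_row_group(&mut row_group)?;
    }
    row_group.close()?;
    file_writer.close()?;
    Ok(())
}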
diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs
index 4547f71274b7..66ba6b87fee7 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -61,7 +61,7 @@ mod store;
 pub use store::*;
 
 use crate::{
-    arrow::arrow_writer::{ArrowColumnChunk, ArrowColumnWriter, ArrowWriterOptions},
+    arrow::arrow_writer::ArrowWriterOptions,
     arrow::ArrowWriter,
     errors::{ParquetError, Result},
     file::{metadata::RowGroupMetaData, properties::WriterProperties},
@@ -288,34 +288,16 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
 
         Ok(())
     }
-
-    /// Create a new row group writer and return its column writers.
-    pub async fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
-        let before = self.sync_writer.flushed_row_groups().len();
-        let writers = self.sync_writer.get_column_writers()?;
-        if before != self.sync_writer.flushed_row_groups().len() {
-            self.do_write().await?;
-        }
-        Ok(writers)
-    }
-
-    /// Append the given column chunks to the file as a new row group.
-    pub async fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
-        self.sync_writer.append_row_group(chunks)?;
-        self.do_write().await
-    }
 }
 
 #[cfg(test)]
 mod tests {
+    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow_array::{ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatchReader};
     use bytes::Bytes;
     use std::sync::Arc;
 
-    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
-    use crate::arrow::arrow_writer::compute_leaves;
-
     use super::*;
 
     fn get_test_reader() -> ParquetRecordBatchReader {
@@ -349,51 +331,6 @@ mod tests {
         assert_eq!(to_write, read);
     }
 
-    #[tokio::test]
-    async fn test_async_arrow_group_writer() {
-        let col = Arc::new(Int64Array::from_iter_values([4, 5, 6])) as ArrayRef;
-        let to_write_record = RecordBatch::try_from_iter([("col", col)]).unwrap();
-
-        let mut buffer = Vec::new();
-        let mut writer =
-            AsyncArrowWriter::try_new(&mut buffer, to_write_record.schema(), None).unwrap();
-
-        // Use classic API
-        writer.write(&to_write_record).await.unwrap();
-
-        let mut writers = writer.get_column_writers().await.unwrap();
-        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
-        let to_write_arrow_group = RecordBatch::try_from_iter([("col", col)]).unwrap();
-
-        for (field, column) in to_write_arrow_group
-            .schema()
-            .fields()
-            .iter()
-            .zip(to_write_arrow_group.columns())
-        {
-            for leaf in compute_leaves(field.as_ref(), column).unwrap() {
-                writers[0].write(&leaf).unwrap();
-            }
-        }
-
-        let columns: Vec<_> = writers.into_iter().map(|w| w.close().unwrap()).collect();
-        // Append the arrow group as a new row group. Flush in progress
-        writer.append_row_group(columns).await.unwrap();
-        writer.close().await.unwrap();
-
-        let buffer = Bytes::from(buffer);
-        let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
-            .unwrap()
-            .build()
-            .unwrap();
-
-        let col = Arc::new(Int64Array::from_iter_values([4, 5, 6, 1, 2, 3])) as ArrayRef;
-        let expected = RecordBatch::try_from_iter([("col", col)]).unwrap();
-
-        let read = reader.next().unwrap().unwrap();
-        assert_eq!(expected, read);
-    }
-
     // Read the data from the test file and write it by the async writer and sync writer.
     // And then compares the results of the two writers.
     #[tokio::test]
diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs
index af107f1e2610..9c1e0c00a3f6 100644
--- a/parquet/tests/encryption/encryption_async.rs
+++ b/parquet/tests/encryption/encryption_async.rs
@@ -21,17 +21,27 @@ use crate::encryption_util::{
     read_encrypted_file, verify_column_indexes, verify_encryption_double_test_data,
     verify_encryption_test_data, TestKeyRetriever,
 };
+use arrow_array::RecordBatch;
+use arrow_schema::Schema;
 use futures::TryStreamExt;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
-use parquet::arrow::arrow_writer::{compute_leaves, ArrowLeafColumn, ArrowWriterOptions};
+use parquet::arrow::arrow_writer::{
+    compute_leaves, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn,
+    ArrowRowGroupWriterFactory, ArrowWriterOptions,
+};
 use parquet::arrow::ParquetRecordBatchStreamBuilder;
 use parquet::arrow::{ArrowWriter, AsyncArrowWriter};
 use parquet::encryption::decrypt::FileDecryptionProperties;
 use parquet::encryption::encrypt::FileEncryptionProperties;
 use parquet::errors::ParquetError;
 use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
+use parquet::file::writer::SerializedFileWriter;
+use parquet::format::FileMetaData;
+use std::io::Write;
 use std::sync::Arc;
 use tokio::fs::File;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinHandle;
 
 #[tokio::test]
 async fn test_non_uniform_encryption_plaintext_footer() {
@@ -493,6 +503,238 @@ async fn read_and_roundtrip_to_encrypted_file_async(
     verify_encryption_test_file_read_async(&mut file, decryption_properties).await
 }
 
+// Type aliases for multithreaded file writing tests
+type ColSender = Sender<ArrowLeafColumn>;
+type ColumnWriterTask = JoinHandle<Result<ArrowColumnWriter, ParquetError>>;
+type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, usize), ParquetError>;
+
+async fn send_arrays_to_column_writers(
+    col_array_channels: &[ColSender],
+    rb: &RecordBatch,
+    schema: &Arc<Schema>,
+) -> Result<(), ParquetError> {
+    // Each leaf column has its own channel, increment next_channel for each leaf column sent.
+    let mut next_channel = 0;
+    for (array, field) in rb.columns().iter().zip(schema.fields()) {
+        for c in compute_leaves(field, array)? {
+            if col_array_channels[next_channel].send(c).await.is_err() {
+                return Ok(());
+            }
+            next_channel += 1;
+        }
+    }
+    Ok(())
+}
+
+/// Spawns a tokio task which joins the parallel column writer tasks,
+/// and finalizes the row group
+fn spawn_rg_join_and_finalize_task(
+    column_writer_tasks: Vec<ColumnWriterTask>,
+    rg_rows: usize,
+) -> JoinHandle<RBStreamSerializeResult> {
+    tokio::task::spawn(async move {
+        let num_cols = column_writer_tasks.len();
+        let mut finalized_rg = Vec::with_capacity(num_cols);
+        for task in column_writer_tasks.into_iter() {
+            let writer = task
+                .await
+                .map_err(|e| ParquetError::General(e.to_string()))??;
+            finalized_rg.push(writer.close()?);
+        }
+        Ok((finalized_rg, rg_rows))
+    })
+}
+
+fn spawn_parquet_parallel_serialization_task(
+    writer_factory: ArrowRowGroupWriterFactory,
+    mut data: Receiver<RecordBatch>,
+    serialize_tx: Sender<JoinHandle<RBStreamSerializeResult>>,
+    schema: Arc<Schema>,
+) -> JoinHandle<Result<(), ParquetError>> {
+    tokio::spawn(async move {
+        let max_buffer_rb = 10;
+        let max_row_group_rows = 10;
+        let mut row_group_index = 0;
+
+        let column_writers = writer_factory.create_column_writers(row_group_index)?;
+
+        let (mut col_writer_tasks, mut col_array_channels) =
+            spawn_column_parallel_row_group_writer(column_writers, max_buffer_rb)?;
+
+        let mut current_rg_rows = 0;
+
+        while let Some(mut rb) = data.recv().await {
+            // This loop allows the "else" block to repeatedly split the RecordBatch to handle the case
+            // when max_row_group_rows < execution.batch_size as an alternative to a recursive async
+            // function.
+            loop {
+                if current_rg_rows + rb.num_rows() < max_row_group_rows {
+                    send_arrays_to_column_writers(&col_array_channels, &rb, &schema).await?;
+                    current_rg_rows += rb.num_rows();
+                    break;
+                } else {
+                    let rows_left = max_row_group_rows - current_rg_rows;
+                    let rb_split = rb.slice(0, rows_left);
+                    send_arrays_to_column_writers(&col_array_channels, &rb_split, &schema).await?;
+
+                    // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
+                    // on a separate task, so that we can immediately start on the next RG before waiting
+                    // for the current one to finish.
+                    drop(col_array_channels);
+
+                    let finalize_rg_task =
+                        spawn_rg_join_and_finalize_task(col_writer_tasks, max_row_group_rows);
+
+                    // Do not surface error from closed channel (means something
+                    // else hit an error, and the plan is shutting down).
+                    if serialize_tx.send(finalize_rg_task).await.is_err() {
+                        return Ok(());
+                    }
+
+                    current_rg_rows = 0;
+                    rb = rb.slice(rows_left, rb.num_rows() - rows_left);
+
+                    row_group_index += 1;
+                    let column_writers = writer_factory.create_column_writers(row_group_index)?;
+                    (col_writer_tasks, col_array_channels) =
+                        spawn_column_parallel_row_group_writer(column_writers, 100)?;
+                }
+            }
+        }
+
+        drop(col_array_channels);
+        // Handle leftover rows as final rowgroup, which may be smaller than max_row_group_rows
+        if current_rg_rows > 0 {
+            let finalize_rg_task =
+                spawn_rg_join_and_finalize_task(col_writer_tasks, current_rg_rows);
+
+            // Do not surface error from closed channel (means something
+            // else hit an error, and the plan is shutting down).
+            if serialize_tx.send(finalize_rg_task).await.is_err() {
+                return Ok(());
+            }
+        }
+
+        Ok(())
+    })
+}
+
+fn spawn_column_parallel_row_group_writer(
+    col_writers: Vec<ArrowColumnWriter>,
+    max_buffer_size: usize,
+) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>), ParquetError> {
+    let num_columns = col_writers.len();
+
+    let mut col_writer_tasks = Vec::with_capacity(num_columns);
+    let mut col_array_channels = Vec::with_capacity(num_columns);
+    for mut col_writer in col_writers.into_iter() {
+        let (send_array, mut receive_array) =
+            tokio::sync::mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
+        col_array_channels.push(send_array);
+        let handle = tokio::spawn(async move {
+            while let Some(col) = receive_array.recv().await {
+                col_writer.write(&col)?;
+            }
+            Ok(col_writer)
+        });
+        col_writer_tasks.push(handle);
+    }
+    Ok((col_writer_tasks, col_array_channels))
+}
+
+/// Consume RowGroups serialized by other parallel tasks and concatenate them
+/// to the final parquet file
+async fn concatenate_parallel_row_groups<W: Write + Send>(
+    mut parquet_writer: SerializedFileWriter<W>,
+    mut serialize_rx: Receiver<JoinHandle<RBStreamSerializeResult>>,
+) -> Result<FileMetaData, ParquetError> {
+    while let Some(task) = serialize_rx.recv().await {
+        let result = task.await;
+        let mut rg_out = parquet_writer.next_row_group()?;
+        let (serialized_columns, _cnt) =
+            result.map_err(|e| ParquetError::General(e.to_string()))??;
+
+        for column_chunk in serialized_columns {
+            column_chunk.append_to_row_group(&mut rg_out)?;
+        }
+        rg_out.close()?;
+    }
+
+    let file_metadata = parquet_writer.close()?;
+    Ok(file_metadata)
+}
+
+// This test is based on DataFusion's ParquetSink. Motivation is to test
+// concurrent writing of encrypted data over multiple row groups using the low-level API.
+#[tokio::test]
+async fn test_concurrent_encrypted_writing_over_multiple_row_groups() {
+    // Read example data and set up encryption/decryption properties
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+    let file = std::fs::File::open(path).unwrap();
+
+    let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into())
+        .with_column_key("double_field", b"1234567890123450".into())
+        .with_column_key("float_field", b"1234567890123451".into())
+        .build()
+        .unwrap();
+    let decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into())
+        .with_column_key("double_field", b"1234567890123450".into())
+        .with_column_key("float_field", b"1234567890123451".into())
+        .build()
+        .unwrap();
+
+    let (record_batches, metadata) =
+        read_encrypted_file(&file, decryption_properties.clone()).unwrap();
+    let schema = metadata.schema();
+
+    // Create a channel to send RecordBatches to the writer and send row groups
+    let (record_batch_tx, data) = tokio::sync::mpsc::channel::<RecordBatch>(100);
+    let data_generator = tokio::spawn(async move {
+        for record_batch in record_batches {
+            record_batch_tx.send(record_batch).await.unwrap();
+        }
+    });
+
+    let props = Some(
+        WriterPropertiesBuilder::default()
+            .with_file_encryption_properties(file_encryption_properties)
+            .build(),
+    );
+
+    // Create a temporary file to write the encrypted data
+    let temp_file = tempfile::tempfile().unwrap();
+    let arrow_writer =
+        ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props.clone()).unwrap();
+
+    let (writer, row_group_writer_factory) = arrow_writer.into_serialized_writer().unwrap();
+    let max_row_groups = 1;
+
+    let (serialize_tx, serialize_rx) =
+        tokio::sync::mpsc::channel::<JoinHandle<RBStreamSerializeResult>>(max_row_groups);
+
+    let launch_serialization_task = spawn_parquet_parallel_serialization_task(
+        row_group_writer_factory,
+        data,
+        serialize_tx,
+        schema.clone(),
+    );
+
+    let _file_metadata = concatenate_parallel_row_groups(writer, serialize_rx)
+        .await
+        .unwrap();
+
+    data_generator.await.unwrap();
+    launch_serialization_task.await.unwrap().unwrap();
+
+    // Check that the file was written correctly
+    let (read_record_batches, read_metadata) =
+        read_encrypted_file(&temp_file, decryption_properties.clone()).unwrap();
+
+    assert_eq!(read_metadata.metadata().file_metadata().num_rows(), 50);
+    verify_encryption_test_data(read_record_batches, read_metadata.metadata());
+}
+
 #[tokio::test]
 async fn test_multi_threaded_encrypted_writing() {
     // Read example data and set up encryption/decryption properties
@@ -500,6 +742,105 @@ async fn test_multi_threaded_encrypted_writing() {
     let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
     let file = std::fs::File::open(path).unwrap();
 
+    let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into())
+        .with_column_key("double_field", b"1234567890123450".into())
+        .with_column_key("float_field", b"1234567890123451".into())
+        .build()
+        .unwrap();
+    let decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into())
+        .with_column_key("double_field", b"1234567890123450".into())
+        .with_column_key("float_field", b"1234567890123451".into())
+        .build()
+        .unwrap();
+
+    let (record_batches, metadata) =
+        read_encrypted_file(&file, decryption_properties.clone()).unwrap();
+    let schema = metadata.schema().clone();
+
+    let props = Some(
+        WriterPropertiesBuilder::default()
+            .with_file_encryption_properties(file_encryption_properties)
+            .build(),
+    );
+
+    // Create a temporary file to write the encrypted data
+    let temp_file = tempfile::tempfile().unwrap();
+    let writer =
+        ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props.clone()).unwrap();
+
+    let (mut serialized_file_writer, row_group_writer_factory) =
+        writer.into_serialized_writer().unwrap();
+
+    let (serialize_tx, mut serialize_rx) =
+        tokio::sync::mpsc::channel::<JoinHandle<RBStreamSerializeResult>>(1);
+
+    // Create a channel to send RecordBatches to the writer and send row batches
+    let (record_batch_tx, mut data) = tokio::sync::mpsc::channel::<RecordBatch>(100);
+    let data_generator = tokio::spawn(async move {
+        for record_batch in record_batches {
+            record_batch_tx.send(record_batch).await.unwrap();
+        }
+    });
+
+    // Get column writers
+    let col_writers = row_group_writer_factory.create_column_writers(0).unwrap();
+
+    let (col_writer_tasks, col_array_channels) =
+        spawn_column_parallel_row_group_writer(col_writers, 10).unwrap();
+
+    // Spawn serialization tasks for incoming RecordBatches
+    let launch_serialization_task = tokio::spawn(async move {
+        let Some(rb) = data.recv().await else {
+            panic!()
+        };
+        send_arrays_to_column_writers(&col_array_channels, &rb, &schema)
+            .await
+            .unwrap();
+        let finalize_rg_task = spawn_rg_join_and_finalize_task(col_writer_tasks, 10);
+
+        serialize_tx.send(finalize_rg_task).await.unwrap();
+        drop(col_array_channels);
+    });
+
+    // Append the finalized row groups to the SerializedFileWriter
+    while let Some(task) = serialize_rx.recv().await {
+        let (arrow_column_chunks, _) = task.await.unwrap().unwrap();
+        let mut row_group_writer = serialized_file_writer.next_row_group().unwrap();
+        for chunk in arrow_column_chunks {
+            chunk.append_to_row_group(&mut row_group_writer).unwrap();
+        }
+        row_group_writer.close().unwrap();
+    }
+
+    // Wait for data generator and serialization task to finish
+    data_generator.await.unwrap();
+    launch_serialization_task.await.unwrap();
+
+    // Close the file writer, which writes the footer
+    let metadata = serialized_file_writer.close().unwrap();
+    assert_eq!(metadata.num_rows, 50);
+    assert_eq!(metadata.schema, metadata.schema);
+
+    // Check that the file was written correctly
+    let (read_record_batches, read_metadata) =
+        read_encrypted_file(&temp_file, decryption_properties.clone()).unwrap();
+    verify_encryption_test_data(read_record_batches, read_metadata.metadata());
+
+    // Check that file was encrypted
+    let result = ArrowReaderMetadata::load(&temp_file, ArrowReaderOptions::default());
+    assert_eq!(
+        result.unwrap_err().to_string(),
+        "Parquet error: Parquet file has an encrypted footer but decryption properties were not provided"
+    );
+}
+
+#[tokio::test]
+async fn test_multi_threaded_encrypted_writing_deprecated() {
+    // Read example data and set up encryption/decryption properties
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+    let file = std::fs::File::open(path).unwrap();
+
     let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into())
         .with_column_key("double_field", b"1234567890123450".into())
         .with_column_key("float_field", b"1234567890123451".into())
         .build()
         .unwrap();
@@ -527,28 +868,17 @@ async fn test_multi_threaded_encrypted_writing() {
 
     // Create a temporary file to write the encrypted data
     let temp_file = tempfile::tempfile().unwrap();
-    let mut writer = ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props).unwrap();
+    let mut writer = ArrowWriter::try_new(&temp_file, schema.clone(), props).unwrap();
 
     // LOW-LEVEL API: Use low level API to write into a file using multiple threads
     // Get column writers
+    #[allow(deprecated)]
     let col_writers = writer.get_column_writers().unwrap();
     let num_columns = col_writers.len();
 
-    // Create a channel for each column writer to send ArrowLeafColumn data to
-    let mut col_writer_tasks = Vec::with_capacity(num_columns);
-    let mut col_array_channels = Vec::with_capacity(num_columns);
-    for mut col_writer in col_writers.into_iter() {
-        let (send_array, mut receive_array) = tokio::sync::mpsc::channel::<ArrowLeafColumn>(100);
-        col_array_channels.push(send_array);
-        let handle = tokio::spawn(async move {
-            while let Some(col) = receive_array.recv().await {
-                col_writer.write(&col).unwrap();
-            }
-            col_writer.close().unwrap()
-        });
-        col_writer_tasks.push(handle);
-    }
+    let (col_writer_tasks, mut col_array_channels) =
+        spawn_column_parallel_row_group_writer(col_writers, 100).unwrap();
 
     // Send the ArrowLeafColumn data to the respective column writer channels
     let mut worker_iter = col_array_channels.iter_mut();
@@ -562,11 +892,12 @@ async fn test_multi_threaded_encrypted_writing() {
 
     // Wait for all column writers to finish writing
     let mut finalized_rg = Vec::with_capacity(num_columns);
     for task in col_writer_tasks.into_iter() {
-        finalized_rg.push(task.await.unwrap());
+        finalized_rg.push(task.await.unwrap().unwrap().close().unwrap());
     }
 
     // Append the finalized row group to the SerializedFileWriter
-    assert!(writer.append_row_group(finalized_rg).is_ok());
+    #[allow(deprecated)]
+    writer.append_row_group(finalized_rg).unwrap();
 
     // HIGH-LEVEL API: Write RecordBatches into the file using ArrowWriter
diff --git a/parquet/tests/encryption/encryption_util.rs b/parquet/tests/encryption/encryption_util.rs
index bf7fd08109f6..f53e12adb720 100644
--- a/parquet/tests/encryption/encryption_util.rs
+++ b/parquet/tests/encryption/encryption_util.rs
@@ -113,15 +113,18 @@ pub(crate) fn verify_encryption_test_data(
     assert_eq!(file_metadata.num_rows(), 50);
     assert_eq!(file_metadata.schema_descr().num_columns(), 8);
 
+    let mut total_rows = 0;
     metadata.row_groups().iter().for_each(|rg| {
         assert_eq!(rg.num_columns(), 8);
-        assert_eq!(rg.num_rows(), 50);
+        total_rows += rg.num_rows();
     });
+    assert_eq!(total_rows, 50);
 
     let mut row_count = 0;
     for batch in record_batches {
         let batch = batch;
-        row_count += batch.num_rows();
+
+        let row_index = |index_in_batch: usize| row_count + index_in_batch;
 
         let bool_col = batch.column(0).as_boolean();
         let time_col = batch
@@ -137,36 +140,44 @@ pub(crate) fn verify_encryption_test_data(
         let fixed_size_binary_col = batch.column(7).as_fixed_size_binary();
 
         for (i, x) in bool_col.iter().enumerate() {
-            assert_eq!(x.unwrap(), i % 2 == 0);
+            assert_eq!(x.unwrap(), row_index(i) % 2 == 0);
         }
         for (i, x) in time_col.iter().enumerate() {
-            assert_eq!(x.unwrap(), i as i32);
+            assert_eq!(x.unwrap(), row_index(i) as i32);
         }
         for (i, list_item) in list_col.iter().enumerate() {
             let list_item = list_item.unwrap();
             let list_item = list_item.as_primitive::<types::Int64Type>();
             assert_eq!(list_item.len(), 2);
-            assert_eq!(list_item.value(0), ((i * 2) * 1000000000000) as i64);
-            assert_eq!(list_item.value(1), ((i * 2 + 1) * 1000000000000) as i64);
+            assert_eq!(
+                list_item.value(0),
+                ((row_index(i) * 2) * 1000000000000) as i64
+            );
+            assert_eq!(
+                list_item.value(1),
+                ((row_index(i) * 2 + 1) * 1000000000000) as i64
+            );
         }
         for x in timestamp_col.iter() {
             assert!(x.is_some());
         }
         for (i, x) in f32_col.iter().enumerate() {
-            assert_eq!(x.unwrap(), i as f32 * 1.1f32);
+            assert_eq!(x.unwrap(), row_index(i) as f32 * 1.1f32);
        }
         for (i, x) in f64_col.iter().enumerate() {
-            assert_eq!(x.unwrap(), i as f64 * 1.1111111f64);
+            assert_eq!(x.unwrap(), row_index(i) as f64 * 1.1111111f64);
         }
         for (i, x) in binary_col.iter().enumerate() {
-            assert_eq!(x.is_some(), i % 2 == 0);
+            assert_eq!(x.is_some(), row_index(i) % 2 == 0);
             if let Some(x) = x {
                 assert_eq!(&x[0..7], b"parquet");
             }
         }
         for (i, x) in fixed_size_binary_col.iter().enumerate() {
-            assert_eq!(x.unwrap(), &[i as u8; 10]);
+            assert_eq!(x.unwrap(), &[row_index(i) as u8; 10]);
         }
+
+        row_count += batch.num_rows();
     }
     assert_eq!(row_count, file_metadata.num_rows() as usize);