diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs
index ae6ca497b85..05ec9a0fa06 100644
--- a/rust/parquet/src/arrow/array_reader.rs
+++ b/rust/parquet/src/arrow/array_reader.rs
@@ -136,10 +136,8 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
         while records_read < batch_size {
             let records_to_read = batch_size - records_read;
 
+            // NB can be 0 if at end of page
             let records_read_once = self.record_reader.read_records(records_to_read)?;
-            if records_read_once == 0 {
-                break; // record reader has no record
-            }
             records_read = records_read + records_read_once;
 
             // Record reader exhausted
@@ -934,7 +932,7 @@ mod tests {
     use super::*;
     use crate::arrow::converter::Utf8Converter;
     use crate::basic::{Encoding, Type as PhysicalType};
-    use crate::column::page::Page;
+    use crate::column::page::{Page, PageReader};
     use crate::data_type::{ByteArray, DataType, Int32Type, Int64Type};
     use crate::errors::Result;
     use crate::file::reader::{FileReader, SerializedFileReader};
@@ -1000,6 +998,33 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_primitive_array_reader_empty_pages() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+          REQUIRED INT32 leaf;
+        }
+        ";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Rc::new(SchemaDescriptor::new(Rc::new(t))))
+            .unwrap();
+
+        let column_desc = schema.column(0);
+        let page_iterator = EmptyPageIterator::new(schema.clone());
+
+        let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
+            Box::new(page_iterator),
+            column_desc.clone(),
+        )
+        .unwrap();
+
+        // expect no values to be read
+        let array = array_reader.next_batch(50).unwrap();
+        assert!(array.is_empty());
+    }
+
     #[test]
     fn test_primitive_array_reader_data() {
         // Construct column schema
@@ -1452,6 +1477,35 @@ mod tests {
         }
     }
 
+    /// Page iterator that yields no pages, for testing reads of empty columns
+    struct EmptyPageIterator {
+        schema: SchemaDescPtr,
+    }
+
+    impl EmptyPageIterator {
+        fn new(schema: SchemaDescPtr) -> Self {
+            EmptyPageIterator { schema }
+        }
+    }
+
+    impl Iterator for EmptyPageIterator {
+        type Item = Result<Box<dyn PageReader>>;
+
+        fn next(&mut self) -> Option<Self::Item> {
+            None
+        }
+    }
+
+    impl PageIterator for EmptyPageIterator {
+        fn schema(&mut self) -> Result<SchemaDescPtr> {
+            Ok(self.schema.clone())
+        }
+
+        fn column_schema(&mut self) -> Result<ColumnDescPtr> {
+            Ok(self.schema.column(0))
+        }
+    }
+
     #[test]
     fn test_struct_array_reader() {
         let array_1 = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![1, 2, 3, 4, 5]));
diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs
index f052d0f36e5..180514b3988 100644
--- a/rust/parquet/src/arrow/arrow_reader.rs
+++ b/rust/parquet/src/arrow/arrow_reader.rs
@@ -304,6 +304,45 @@ mod tests {
         >(2, 100, 2, message_type, 15, 50, converter);
     }
 
+    #[test]
+    fn test_bool_single_column_reader_test_batch_size_divides_into_row_group_size() {
+        let message_type = "
+        message test_schema {
+          REQUIRED BOOLEAN leaf;
+        }
+        ";
+
+        // Use a batch size (5) so that batches fall on
+        // row group boundaries (25 rows in 3 row groups --> row
+        // groups of 10, 10, and 5) to test refilling edge cases.
+        let converter = FromConverter::new();
+        single_column_reader_test::<
+            BoolType,
+            BooleanArray,
+            FromConverter<Vec<Option<bool>>, BooleanArray>,
+            BoolType,
+        >(3, 25, 2, message_type, 5, 50, converter);
+    }
+
+    #[test]
+    fn test_bool_single_column_reader_test_batch_size_divides_into_row_group_size2() {
+        let message_type = "
+        message test_schema {
+          REQUIRED BOOLEAN leaf;
+        }
+        ";
+
+        // Ensure that every batch (size 25) falls exactly on a row group
+        // boundary (25 in this case) to test this edge case.
+        let converter = FromConverter::new();
+        single_column_reader_test::<
+            BoolType,
+            BooleanArray,
+            FromConverter<Vec<Option<bool>>, BooleanArray>,
+            BoolType,
+        >(4, 100, 2, message_type, 25, 50, converter);
+    }
+
     struct RandFixedLenGen {}
 
     impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {