Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 58 additions & 4 deletions rust/parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,8 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
while records_read < batch_size {
let records_to_read = batch_size - records_read;

// NB can be 0 if at end of page
let records_read_once = self.record_reader.read_records(records_to_read)?;
if records_read_once == 0 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The case of 0 rows being read is handled in the `if records_read_once < records_to_read` clause below -- namely, in this case the code needs to try to get the next page of data from the page reader.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

break; // record reader has no record
}
records_read = records_read + records_read_once;

// Record reader exhausted
Expand Down Expand Up @@ -934,7 +932,7 @@ mod tests {
use super::*;
use crate::arrow::converter::Utf8Converter;
use crate::basic::{Encoding, Type as PhysicalType};
use crate::column::page::Page;
use crate::column::page::{Page, PageReader};
use crate::data_type::{ByteArray, DataType, Int32Type, Int64Type};
use crate::errors::Result;
use crate::file::reader::{FileReader, SerializedFileReader};
Expand Down Expand Up @@ -1000,6 +998,33 @@ mod tests {
}
}

// Checks that `next_batch` on a `PrimitiveArrayReader<Int32Type>` succeeds and
// returns an empty array when the underlying page iterator (`EmptyPageIterator`)
// never yields a page reader.
#[test]
fn test_primitive_array_reader_empty_pages() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a test for the functionality added in 8a61570 / #7140

// Construct column schema: a single REQUIRED INT32 leaf column
let message_type = "
message test_schema {
REQUIRED INT32 leaf;
}
";

let schema = parse_message_type(message_type)
.map(|t| Rc::new(SchemaDescriptor::new(Rc::new(t))))
.unwrap();

let column_desc = schema.column(0);
// Page iterator whose `next` always returns `None`, i.e. there are no pages to read
let page_iterator = EmptyPageIterator::new(schema.clone());

let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
Box::new(page_iterator),
column_desc.clone(),
)
.unwrap();

// Expect no values to be read: requesting 50 records must succeed
// (the `unwrap` would panic on an error) and produce an empty array.
let array = array_reader.next_batch(50).unwrap();
assert!(array.is_empty());
}

#[test]
fn test_primitive_array_reader_data() {
// Construct column schema
Expand Down Expand Up @@ -1452,6 +1477,35 @@ mod tests {
}
}

/// Iterator for testing reading empty columns
struct EmptyPageIterator {
schema: SchemaDescPtr,
}

impl EmptyPageIterator {
fn new(schema: SchemaDescPtr) -> Self {
EmptyPageIterator { schema }
}
}

impl Iterator for EmptyPageIterator {
type Item = Result<Box<dyn PageReader>>;

fn next(&mut self) -> Option<Self::Item> {
None
}
}

impl PageIterator for EmptyPageIterator {
fn schema(&mut self) -> Result<SchemaDescPtr> {
Ok(self.schema.clone())
}

fn column_schema(&mut self) -> Result<ColumnDescPtr> {
Ok(self.schema.column(0))
}
}

#[test]
fn test_struct_array_reader() {
let array_1 = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![1, 2, 3, 4, 5]));
Expand Down
39 changes: 39 additions & 0 deletions rust/parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,45 @@ mod tests {
>(2, 100, 2, message_type, 15, 50, converter);
}

#[test]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both these tests fail without the changes in this PR.

I don't like the copy/paste nature of these tests and I plan a minor PR building on this one proposing how to remove the duplication and make the tests easier to read.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Drives `single_column_reader_test` over a single REQUIRED BOOLEAN column,
// using a batch size chosen so that batch boundaries coincide with row group
// boundaries.
fn test_bool_single_column_reader_test_batch_size_divides_into_row_group_size() {
let message_type = "
message test_schema {
REQUIRED BOOLEAN leaf;
}
";

// Use a batch size (5) so that batches fall on row group
// boundaries (25 rows in 3 row groups --> row groups of
// 10, 10, and 5) to test the refilling edge cases.
// NOTE(review): the row-group split described above depends on how
// `single_column_reader_test` interprets its positional arguments,
// which is not visible here -- confirm against that helper.
let converter = FromConverter::new();
single_column_reader_test::<
BoolType,
BooleanArray,
FromConverter<Vec<Option<bool>>, BooleanArray>,
BoolType,
>(3, 25, 2, message_type, 5, 50, converter);
}

// Drives `single_column_reader_test` over a single REQUIRED BOOLEAN column,
// using a batch size equal to the row group size so that every batch ends
// exactly at a row group boundary.
#[test]
fn test_bool_single_column_reader_test_batch_size_divides_into_row_group_size2() {
let message_type = "
message test_schema {
REQUIRED BOOLEAN leaf;
}
";

// Ensure that every batch (size 25) ends exactly on a row group
// boundary (25 rows in this case) to test that edge case.
let converter = FromConverter::new();
single_column_reader_test::<
BoolType,
BooleanArray,
FromConverter<Vec<Option<bool>>, BooleanArray>,
BoolType,
>(4, 100, 2, message_type, 25, 50, converter);
}

struct RandFixedLenGen {}

impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
Expand Down