Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 92 additions & 56 deletions rust/parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,51 +296,12 @@ mod tests {
";

let converter = FromConverter::new();
single_column_reader_test::<
run_single_column_reader_tests::<
BoolType,
BooleanArray,
FromConverter<Vec<Option<bool>>, BooleanArray>,
BoolType,
>(2, 100, 2, message_type, 15, 50, converter);
}

#[test]
fn test_bool_single_column_reader_test_batch_size_divides_into_row_group_size() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let message_type = "
message test_schema {
REQUIRED BOOLEAN leaf;
}
";

// Use a batch size (5) so batches sometimes fall on
// row group boundaries (25 rows in 3 row groups --> row
// groups of 10, 10, and 5) to test buffer refilling edge cases.
let converter = FromConverter::new();
single_column_reader_test::<
BoolType,
BooleanArray,
FromConverter<Vec<Option<bool>>, BooleanArray>,
BoolType,
>(3, 25, 2, message_type, 5, 50, converter);
}

#[test]
fn test_bool_single_column_reader_test_batch_size_divides_into_row_group_size2() {
let message_type = "
message test_schema {
REQUIRED BOOLEAN leaf;
}
";

// Choose a batch size (25) so every batch falls exactly on a
// row group boundary (25 rows per group) to test that edge case.
let converter = FromConverter::new();
single_column_reader_test::<
BoolType,
BooleanArray,
FromConverter<Vec<Option<bool>>, BooleanArray>,
BoolType,
>(4, 100, 2, message_type, 25, 50, converter);
>(2, message_type, &converter);
}

struct RandFixedLenGen {}
Expand All @@ -362,12 +323,12 @@ mod tests {
";

let converter = FixedSizeArrayConverter::new(20);
single_column_reader_test::<
run_single_column_reader_tests::<
FixedLenByteArrayType,
FixedSizeBinaryArray,
FixedSizeArrayConverter,
RandFixedLenGen,
>(2, 100, 20, message_type, 15, 50, converter);
>(20, message_type, &converter);
}

struct RandUtf8Gen {}
Expand All @@ -387,30 +348,104 @@ mod tests {
";

let converter = Utf8ArrayConverter {};
single_column_reader_test::<
run_single_column_reader_tests::<
ByteArrayType,
StringArray,
Utf8ArrayConverter,
RandUtf8Gen,
>(2, 100, 2, message_type, 15, 50, converter);
>(2, message_type, &converter);
}

fn single_column_reader_test<T, A, C, G>(
/// Parameters for single_column_reader_test
#[derive(Debug)]
struct TestOptions {
/// Number of row groups to write to parquet (row group size =
/// num_rows / num_row_groups)
num_row_groups: usize,
/// Total number of rows
num_rows: usize,
rand_max: i32,
message_type: &str,
/// Size of batches to read back
record_batch_size: usize,
/// Total number of batches to attempt to read.
/// `record_batch_size` * `num_iterations` should be greater
/// than `num_rows` to ensure the data can be read back completely
num_iterations: usize,
converter: C,
}

/// Create a parquet file and then read it using
/// `ParquetFileArrowReader` using a standard set of parameters
/// `opts`.
///
/// `rand_max` represents the maximum size of value to pass to the
/// value generator
fn run_single_column_reader_tests<T, A, C, G>(
rand_max: i32,
message_type: &str,
converter: &C,
) where
T: DataType,
G: RandGen<T>,
A: PartialEq + Array + 'static,
C: Converter<Vec<Option<T::T>>, A> + 'static,
{
let values: Vec<Vec<T::T>> = (0..num_row_groups)
.map(|_| G::gen_vec(rand_max, num_rows))
let all_options = vec![
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the basic idea -- drive the test with a table that is shared between bool, utf, and fixed_len_binary_array types

// choose record_batch_size (15) so batches cross row
// group boundaries (100 rows in 2 row groups of 50 each).
TestOptions {
num_row_groups: 2,
num_rows: 100,
record_batch_size: 15,
num_iterations: 50,
},
// choose record_batch_size (5) so batches sometimes fall
// on row group boundaries (25 rows in 3 row groups
// --> row groups of 10, 10, and 5). Tests buffer
// refilling edge cases.
TestOptions {
num_row_groups: 3,
num_rows: 25,
record_batch_size: 5,
num_iterations: 50,
},
// Choose record_batch_size (25) so all batches fall
// exactly on row group boundary (25). Tests buffer
// refilling edge cases.
TestOptions {
num_row_groups: 4,
num_rows: 100,
record_batch_size: 25,
num_iterations: 50,
},
];

all_options.into_iter().for_each(|opts| {
// Print out options to facilitate debugging failures on CI
println!("Running with Test Options: {:?}", opts);
single_column_reader_test::<T, A, C, G>(
opts,
rand_max,
message_type,
converter,
)
});
}

/// Create a parquet file and then read it using
/// `ParquetFileArrowReader` using the parameters described in
/// `opts`.
fn single_column_reader_test<T, A, C, G>(
opts: TestOptions,
rand_max: i32,
message_type: &str,
converter: &C,
) where
T: DataType,
G: RandGen<T>,
A: PartialEq + Array + 'static,
C: Converter<Vec<Option<T::T>>, A> + 'static,
{
let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
.map(|_| G::gen_vec(rand_max, opts.num_rows))
.collect();

let path = get_temp_filename();
Expand All @@ -426,21 +461,22 @@ mod tests {
SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(parquet_reader));

let mut record_reader =
arrow_reader.get_record_reader(record_batch_size).unwrap();
let mut record_reader = arrow_reader
.get_record_reader(opts.record_batch_size)
.unwrap();

let expected_data: Vec<Option<T::T>> = values
.iter()
.flat_map(|v| v.iter())
.map(|b| Some(b.clone()))
.collect();

for i in 0..num_iterations {
let start = i * record_batch_size;
for i in 0..opts.num_iterations {
let start = i * opts.record_batch_size;

let batch = record_reader.next_batch().unwrap();
if start < expected_data.len() {
let end = min(start + record_batch_size, expected_data.len());
let end = min(start + opts.record_batch_size, expected_data.len());
assert!(batch.is_some());

let mut data = vec![];
Expand Down