Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions rust/parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -912,11 +912,36 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
));
}

// Need to remove from the values array the nulls that represent null lists rather than null items
// null lists have def_level = 0
// List definitions can be encoded as 4 values:
// - n + 0: the list slot is null
// - n + 1: the list slot is not null, but is empty (i.e. [])
// - n + 2: the list slot is not null, but its child is empty (i.e. [ null ])
// - n + 3: the list slot is not null, and its child is not empty
// Where n is the max definition level of the list's parent.
// If a Parquet schema's only leaf is the list, then n = 0.

// TODO: ARROW-10391 - add a test case with a non-nullable child, check if max is 3
let list_field_type = match self.get_data_type() {
ArrowType::List(field)
| ArrowType::FixedSizeList(field, _)
| ArrowType::LargeList(field) => field,
_ => {
// Panic: this is safe as we only write lists from list datatypes
unreachable!()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my own curiosity, is there a specific reason to use unreachable! rather than panic! in cases like this? I understand the outcome will be the same.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They don't make any difference, unreachable!() will call panic!() with a message about unreachable code being reached. So it's probably a more descriptive panic.

I tohught that marking a condition as unreachable!() lets the compiler optimise out that condition, but it seems like only its unsafe equivalent does.

}
};
let max_list_def_range = if list_field_type.is_nullable() { 3 } else { 2 };
let max_list_definition = *(def_levels.iter().max().unwrap());
// TODO: ARROW-10391 - Find a reliable way of validating deeply-nested lists
// debug_assert!(
// max_list_definition >= max_list_def_range,
// "Lift definition max less than range"
// );
let list_null_def = max_list_definition - max_list_def_range;
let list_empty_def = max_list_definition - 1;
let mut null_list_indices: Vec<usize> = Vec::new();
for i in 0..def_levels.len() {
if def_levels[i] == 0 {
if def_levels[i] == list_null_def {
null_list_indices.push(i);
}
}
Expand All @@ -937,7 +962,7 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
if rep_levels[i] == 0 {
offsets.push(cur_offset)
}
if def_levels[i] > 0 {
if def_levels[i] >= list_empty_def {
cur_offset += OffsetSize::one();
}
}
Expand Down
54 changes: 17 additions & 37 deletions rust/parquet/src/arrow/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
let batch_level = LevelInfo::new_from_batch(batch);
let mut row_group_writer = self.writer.next_row_group()?;
for (array, field) in batch.columns().iter().zip(batch.schema().fields()) {
let mut levels = batch_level.calculate_array_levels(array, field, 1);
let mut levels = batch_level.calculate_array_levels(array, field);
write_leaves(&mut row_group_writer, array, &mut levels)?;
}

Expand Down Expand Up @@ -200,7 +200,7 @@ fn write_leaf(
column: &arrow_array::ArrayRef,
levels: LevelInfo,
) -> Result<i64> {
let indices = filter_array_indices(&levels);
let indices = levels.filter_array_indices();
let written = match writer {
ColumnWriter::Int32ColumnWriter(ref mut typed) => {
// If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
Expand Down Expand Up @@ -429,27 +429,6 @@ fn get_fsb_array_slice(
values
}

/// Given a level's information, calculate the offsets required to index an array
/// correctly.
fn filter_array_indices(level: &LevelInfo) -> Vec<usize> {
let mut filtered = vec![];
// remove slots that are false from definition_mask
let mut index = 0;
level
.definition
.iter()
.zip(&level.definition_mask)
.for_each(|(def, (mask, _))| {
if *mask {
if *def == level.max_definition {
filtered.push(index);
}
index += 1;
}
});
filtered
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -557,7 +536,6 @@ mod tests {
}

#[test]
#[ignore = "ARROW-10766: list support is incomplete"]
fn arrow_writer_list() {
// define schema
let schema = Schema::new(vec![Field::new(
Expand All @@ -576,7 +554,7 @@ mod tests {

// Construct a list array from the above two
let a_list_data = ArrayData::builder(DataType::List(Box::new(Field::new(
"items",
"item",
DataType::Int32,
true,
))))
Expand Down Expand Up @@ -657,15 +635,15 @@ mod tests {
}

#[test]
#[ignore = "ARROW-10766: list support is incomplete"]
#[ignore = "See ARROW-11294, data is correct but list field name is incorrect"]
fn arrow_writer_complex() {
// define schema
let struct_field_d = Field::new("d", DataType::Float64, true);
let struct_field_f = Field::new("f", DataType::Float32, true);
let struct_field_g = Field::new(
"g",
DataType::List(Box::new(Field::new("items", DataType::Int16, false))),
false,
DataType::List(Box::new(Field::new("item", DataType::Int16, true))),
true,
);
let struct_field_e = Field::new(
"e",
Expand All @@ -678,7 +656,7 @@ mod tests {
Field::new(
"c",
DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()]),
false,
true, // NB: this test fails if value is false. Why?
),
]);

Expand All @@ -691,7 +669,7 @@ mod tests {
let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

// Construct a buffer for value offsets, for the nested array:
// [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
// [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
let g_value_offsets =
arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice());

Expand All @@ -700,6 +678,7 @@ mod tests {
.len(5)
.add_buffer(g_value_offsets)
.add_child_data(g_value.data())
// .null_bit_buffer(Buffer::from(vec![0b00011011])) // TODO: add to test after resolving other issues
.build();
let g = ListArray::from(g_list_data);

Expand Down Expand Up @@ -786,6 +765,7 @@ mod tests {
}

#[test]
#[ignore = "The levels generated are correct, but because of field_a being non-nullable, we cannot write record"]
fn arrow_writer_2_level_struct_mixed_null() {
// tests writing <struct<struct<primitive>>
let field_c = Field::new("c", DataType::Int32, false);
Expand Down Expand Up @@ -817,7 +797,7 @@ mod tests {
roundtrip("test_arrow_writer_2_level_struct_mixed_null.parquet", batch);
}

const SMALL_SIZE: usize = 100;
const SMALL_SIZE: usize = 4;

fn roundtrip(filename: &str, expected_batch: RecordBatch) {
let file = get_temp_file(filename, &[]);
Expand Down Expand Up @@ -848,6 +828,7 @@ mod tests {
let actual_data = actual_batch.column(i).data();

assert_eq!(expected_data, actual_data);
// assert_eq!(expected_data, actual_data, "L: {:#?}\nR: {:#?}", expected_data, actual_data);
}
}

Expand Down Expand Up @@ -1161,32 +1142,30 @@ mod tests {
}

#[test]
#[ignore = "ARROW-10766: list support is incomplete"]
fn list_single_column() {
let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let a_value_offsets =
arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice());
let a_list_data = ArrayData::builder(DataType::List(Box::new(Field::new(
"item",
DataType::Int32,
true,
true, // TODO: why does this fail when false? Is it related to logical nulls?
))))
.len(5)
.add_buffer(a_value_offsets)
.null_bit_buffer(Buffer::from(vec![0b00011011]))
.add_child_data(a_values.data())
.build();

// I think this setup is incorrect because this should pass
assert_eq!(a_list_data.null_count(), 1);

let a = ListArray::from(a_list_data);
let values = Arc::new(a);

one_column_roundtrip("list_single_column", values, false);
one_column_roundtrip("list_single_column", values, true);
}

#[test]
#[ignore = "ARROW-10766: list support is incomplete"]
fn large_list_single_column() {
let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let a_value_offsets =
Expand All @@ -1199,6 +1178,7 @@ mod tests {
.len(5)
.add_buffer(a_value_offsets)
.add_child_data(a_values.data())
.null_bit_buffer(Buffer::from(vec![0b00011011]))
.build();

// I think this setup is incorrect because this should pass
Expand All @@ -1207,7 +1187,7 @@ mod tests {
let a = LargeListArray::from(a_list_data);
let values = Arc::new(a);

one_column_roundtrip("large_list_single_column", values, false);
one_column_roundtrip("large_list_single_column", values, true);
}

#[test]
Expand Down
Loading