From 6d38130ac65fcb7d97042193400a45c2b0ee99cb Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 26 Jun 2025 14:10:35 +0200 Subject: [PATCH 1/4] Revert "Backport: Support `FixedSizeList` RowConverter (#12)" This reverts commit e733b03ae96aa275bf98318738e5544a9be4b21b. --- .../src/array/fixed_size_list_array.rs | 4 +- arrow-row/src/lib.rs | 137 +----------------- arrow-row/src/list.rs | 130 +---------------- 3 files changed, 10 insertions(+), 261 deletions(-) diff --git a/arrow-array/src/array/fixed_size_list_array.rs b/arrow-array/src/array/fixed_size_list_array.rs index af814cc61414..44be442c9f85 100644 --- a/arrow-array/src/array/fixed_size_list_array.rs +++ b/arrow-array/src/array/fixed_size_list_array.rs @@ -343,8 +343,8 @@ impl From for FixedSizeListArray { fn from(data: ArrayData) -> Self { let value_length = match data.data_type() { DataType::FixedSizeList(_, len) => *len, - data_type => { - panic!("FixedSizeListArray data should contain a FixedSizeList data type, got {data_type:?}") + _ => { + panic!("FixedSizeListArray data should contain a FixedSizeList data type") } }; diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index adef5397684e..d0fad12210db 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -139,7 +139,6 @@ use arrow_schema::*; use variable::{decode_binary_view, decode_string_view}; use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive}; -use crate::list::{compute_lengths_fixed_size_list, encode_fixed_size_list}; use crate::variable::{decode_binary, decode_string}; mod fixed; @@ -410,11 +409,6 @@ impl Codec { let converter = RowConverter::new(vec![field])?; Ok(Self::List(converter)) } - DataType::FixedSizeList(f, _) => { - let field = SortField::new_with_options(f.data_type().clone(), sort_field.options); - let converter = RowConverter::new(vec![field])?; - Ok(Self::List(converter)) - } DataType::Struct(f) => { let sort_fields = f .iter() @@ -456,7 +450,6 @@ impl Codec { let values = match array.data_type() { DataType::List(_) => as_list_array(array).values(), DataType::LargeList(_) => as_large_list_array(array).values(), - DataType::FixedSizeList(_, _) => as_fixed_size_list_array(array).values(), _ => unreachable!(), }; let rows = converter.convert_columns(&[values.clone()])?; @@ -543,10 +536,9 @@ impl RowConverter { fn supports_datatype(d: &DataType) -> bool { match d { _ if !d.is_nested() => true, - DataType::List(f) - | DataType::LargeList(f) - | DataType::FixedSizeList(f, _) - | DataType::Map(f, _) => Self::supports_datatype(f.data_type()), + DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => { + Self::supports_datatype(f.data_type()) + } DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())), _ => false, } @@ -1252,11 +1244,6 @@ fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec { DataType::LargeList(_) => { list::compute_lengths(&mut lengths, rows, as_large_list_array(array)) } - DataType::FixedSizeList(_, _) => compute_lengths_fixed_size_list( - &mut lengths, - rows, - as_fixed_size_list_array(array), - ), _ => unreachable!(), }, } @@ -1353,9 +1340,6 @@ fn encode_column( DataType::LargeList(_) => { list::encode(data, offsets, rows, opts, as_large_list_array(column)) } - DataType::FixedSizeList(_, _) => { - encode_fixed_size_list(data, offsets, rows, opts, as_fixed_size_list_array(column)) - } _ => unreachable!(), }, } @@ -1441,13 +1425,6 @@ unsafe fn decode_column( DataType::LargeList(_) => { Arc::new(list::decode::(converter, rows, field, validate_utf8)?) } - DataType::FixedSizeList(_, value_length) => Arc::new(list::decode_fixed_size_list( - converter, - rows, - field, - validate_utf8, - value_length.as_usize(), - )?), _ => unreachable!(), }, }; @@ -2213,114 +2190,6 @@ mod tests { test_nested_list::(); } - #[test] - fn test_fixed_size_list() { - let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3); - builder.values().append_value(32); - builder.values().append_value(52); - builder.values().append_value(32); - builder.append(true); - builder.values().append_value(32); - builder.values().append_value(52); - builder.values().append_value(12); - builder.append(true); - builder.values().append_value(32); - builder.values().append_value(52); - builder.values().append_null(); - builder.append(true); - builder.values().append_value(32); // MASKED - builder.values().append_value(52); // MASKED - builder.values().append_value(13); // MASKED - builder.append(false); - builder.values().append_value(32); - builder.values().append_null(); - builder.values().append_null(); - builder.append(true); - builder.values().append_null(); - builder.values().append_null(); - builder.values().append_null(); - builder.append(true); - builder.values().append_value(17); // MASKED - builder.values().append_null(); // MASKED - builder.values().append_value(77); // MASKED - builder.append(false); - - let list = Arc::new(builder.finish()) as ArrayRef; - let d = list.data_type().clone(); - - // Default sorting (ascending, nulls first) - let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap(); - - let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); - assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12] - assert!(rows.row(2) < rows.row(1)); // [32, 52, null] < [32, 52, 12] - assert!(rows.row(3) < rows.row(2)); // null < [32, 52, null] - assert!(rows.row(4) < rows.row(2)); // [32, null, null] < [32, 52, null] - assert!(rows.row(5) < rows.row(2)); // [null, null, null] < [32, 52, null] - assert!(rows.row(3) < rows.row(5)); // null < [null, null, null] - assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values) - - let back = converter.convert_rows(&rows).unwrap(); - assert_eq!(back.len(), 1); - back[0].to_data().validate_full().unwrap(); - assert_eq!(&back[0], &list); - - // Ascending, null last - let options = SortOptions::default().asc().with_nulls_first(false); - let field = SortField::new_with_options(d.clone(), options); - let converter = RowConverter::new(vec![field]).unwrap(); - let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); - assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12] - assert!(rows.row(2) > rows.row(1)); // [32, 52, null] > [32, 52, 12] - assert!(rows.row(3) > rows.row(2)); // null > [32, 52, null] - assert!(rows.row(4) > rows.row(2)); // [32, null, null] > [32, 52, null] - assert!(rows.row(5) > rows.row(2)); // [null, null, null] > [32, 52, null] - assert!(rows.row(3) > rows.row(5)); // null > [null, null, null] - assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values) - - let back = converter.convert_rows(&rows).unwrap(); - assert_eq!(back.len(), 1); - back[0].to_data().validate_full().unwrap(); - assert_eq!(&back[0], &list); - - // Descending, nulls last - let options = SortOptions::default().desc().with_nulls_first(false); - let field = SortField::new_with_options(d.clone(), options); - let converter = RowConverter::new(vec![field]).unwrap(); - let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); - assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12] - assert!(rows.row(2) > rows.row(1)); // [32, 52, null] > [32, 52, 12] - assert!(rows.row(3) > rows.row(2)); // null > [32, 52, null] - assert!(rows.row(4) > rows.row(2)); // [32, null, null] > [32, 52, null] - assert!(rows.row(5) > rows.row(2)); // [null, null, null] > [32, 52, null] - assert!(rows.row(3) > rows.row(5)); // null > [null, null, null] - assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values) - - let back = converter.convert_rows(&rows).unwrap(); - assert_eq!(back.len(), 1); - back[0].to_data().validate_full().unwrap(); - assert_eq!(&back[0], &list); - - // Descending, nulls first - let options = SortOptions::default().desc().with_nulls_first(true); - let field = SortField::new_with_options(d, options); - let converter = RowConverter::new(vec![field]).unwrap(); - let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); - - assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12] - assert!(rows.row(2) < rows.row(1)); // [32, 52, null] > [32, 52, 12] - assert!(rows.row(3) < rows.row(2)); // null < [32, 52, null] - assert!(rows.row(4) < rows.row(2)); // [32, null, null] < [32, 52, null] - assert!(rows.row(5) < rows.row(2)); // [null, null, null] > [32, 52, null] - assert!(rows.row(3) < rows.row(5)); // null < [null, null, null] - assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values) - - let back = converter.convert_rows(&rows).unwrap(); - assert_eq!(back.len(), 1); - back[0].to_data().validate_full().unwrap(); - assert_eq!(&back[0], &list); - } - fn generate_primitive_array(len: usize, valid_percent: f64) -> PrimitiveArray where K: ArrowPrimitiveType, diff --git a/arrow-row/src/list.rs b/arrow-row/src/list.rs index be7c02f3c86d..46cd0f3d3d81 100644 --- a/arrow-row/src/list.rs +++ b/arrow-row/src/list.rs @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -use crate::{fixed, null_sentinel, RowConverter, Rows, SortField}; -use arrow_array::{new_null_array, Array, FixedSizeListArray, GenericListArray, OffsetSizeTrait}; -use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer}; +use crate::{null_sentinel, RowConverter, Rows, SortField}; +use arrow_array::{Array, GenericListArray, OffsetSizeTrait}; +use arrow_buffer::{Buffer, MutableBuffer}; use arrow_data::ArrayDataBuilder; -use arrow_schema::{ArrowError, DataType, SortOptions}; +use arrow_schema::{ArrowError, SortOptions}; use std::ops::Range; pub fn compute_lengths( @@ -97,7 +97,7 @@ fn encode_one( } } -/// Decodes an array from `rows` with the provided `options` +/// Decodes a string array from `rows` with the provided `options` /// /// # Safety /// @@ -184,123 +184,3 @@ pub unsafe fn decode( Ok(GenericListArray::from(unsafe { builder.build_unchecked() })) } - -pub fn compute_lengths_fixed_size_list( - lengths: &mut [usize], - rows: &Rows, - array: &FixedSizeListArray, -) { - let value_length = array.value_length().as_usize(); - lengths.iter_mut().enumerate().for_each(|(idx, length)| { - *length = match array.is_valid(idx) { - true => { - 1 + ((idx * value_length)..(idx + 1) * value_length) - .map(|child_idx| rows.row(child_idx).as_ref().len()) - .sum::() - } - false => 1, - }; - }) -} - -/// Encodes the provided `FixedSizeListArray` to `out` with the provided `SortOptions` -/// -/// `rows` should contain the encoded child elements -pub fn encode_fixed_size_list( - data: &mut [u8], - offsets: &mut [usize], - rows: &Rows, - opts: SortOptions, - array: &FixedSizeListArray, -) { - let null_sentinel = null_sentinel(opts); - offsets - .iter_mut() - .skip(1) - .enumerate() - .for_each(|(idx, offset)| { - let value_length = array.value_length().as_usize(); - match array.is_valid(idx) { - true => { - data[*offset] = 0x01; - *offset += 1; - for child_idx in (idx * value_length)..(idx + 1) * value_length { - //dbg!(child_idx); - let row = rows.row(child_idx); - let end_offset = *offset + row.as_ref().len(); - data[*offset..end_offset].copy_from_slice(row.as_ref()); - *offset = end_offset; - } - } - false => { - let null_sentinels = 1; - //+ value_length; // 1 for self + for values too - for i in 0..null_sentinels { - data[*offset + i] = null_sentinel; - } - *offset += null_sentinels; - } - }; - }) -} - -/// Decodes a fixed size list array from `rows` with the provided `options` -/// -/// # Safety -/// -/// `rows` must contain valid data for the provided `converter` -pub unsafe fn decode_fixed_size_list( - converter: &RowConverter, - rows: &mut [&[u8]], - field: &SortField, - validate_utf8: bool, - value_length: usize, -) -> Result { - let list_type = &field.data_type; - let element_type = match list_type { - DataType::FixedSizeList(element_field, _) => element_field.data_type(), - _ => { - return Err(ArrowError::InvalidArgumentError(format!( - "Expected FixedSizeListArray, found: {:?}", - list_type - ))) - } - }; - - let len = rows.len(); - let (null_count, nulls) = fixed::decode_nulls(rows); - - let null_element_encoded = converter.convert_columns(&[new_null_array(element_type, 1)])?; - let null_element_encoded = null_element_encoded.row(0); - let null_element_slice = null_element_encoded.as_ref(); - - let mut child_rows = Vec::new(); - for row in rows { - let valid = row[0] == 1; - let mut row_offset = 1; - if !valid { - for _ in 0..value_length { - child_rows.push(null_element_slice); - } - } else { - for _ in 0..value_length { - let mut temp_child_rows = vec![&row[row_offset..]]; - converter.convert_raw(&mut temp_child_rows, validate_utf8)?; - let decoded_bytes = row.len() - row_offset - temp_child_rows[0].len(); - let next_offset = row_offset + decoded_bytes; - child_rows.push(&row[row_offset..next_offset]); - row_offset = next_offset; - } - } - } - - let children = converter.convert_raw(&mut child_rows, validate_utf8)?; - let child_data = children.iter().map(|c| c.to_data()).collect(); - let builder = ArrayDataBuilder::new(list_type.clone()) - .len(len) - .null_count(null_count) - .null_bit_buffer(Some(nulls)) - .child_data(child_data); - - Ok(FixedSizeListArray::from(builder.build_unchecked())) -} From af9cb3c614df39bb75c5f027710b558f5ef80f35 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 25 Jun 2025 22:33:17 +0200 Subject: [PATCH 2/4] Support `FixedSizeList` RowConverter (#7705) # Which issue does this PR close? none # Rationale for this change This is necessary to support DISTINCT and GROUP BY over fixed-sized arrays in DataFusion. # What changes are included in this PR? Add `DataType::FixedSizeList` support to `RowConverter`. # Are there any user-facing changes? No (cherry picked from commit d7fc41651502aad412903b35c6d08322ee210323) --- .../src/array/fixed_size_list_array.rs | 4 +- arrow-row/src/lib.rs | 216 ++++++++++++++++-- arrow-row/src/list.rs | 130 ++++++++++- 3 files changed, 324 insertions(+), 26 deletions(-) diff --git a/arrow-array/src/array/fixed_size_list_array.rs b/arrow-array/src/array/fixed_size_list_array.rs index 44be442c9f85..af814cc61414 100644 --- a/arrow-array/src/array/fixed_size_list_array.rs +++ b/arrow-array/src/array/fixed_size_list_array.rs @@ -343,8 +343,8 @@ impl From for FixedSizeListArray { fn from(data: ArrayData) -> Self { let value_length = match data.data_type() { DataType::FixedSizeList(_, len) => *len, - _ => { - panic!("FixedSizeListArray data should contain a FixedSizeList data type") + data_type => { + panic!("FixedSizeListArray data should contain a FixedSizeList data type, got {data_type:?}") } }; diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index d0fad12210db..59a5cd02e471 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -139,6 +139,7 @@ use arrow_schema::*; use variable::{decode_binary_view, decode_string_view}; use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive}; +use crate::list::{compute_lengths_fixed_size_list, encode_fixed_size_list}; use crate::variable::{decode_binary, decode_string}; mod fixed; @@ -335,6 +336,46 @@ mod variable; /// /// With `[]` represented by an empty byte array, and `null` a null byte array. /// +/// ## Fixed Size List Encoding +/// +/// Fixed Size Lists are encoded by first encoding all child elements to the row format. +/// +/// A non-null list value is then encoded as 0x01 followed by the concatenation of each +/// of the child elements. A null list value is encoded as a null marker. +/// +/// For example given: +/// +/// ```text +/// [1_u8, 2_u8] +/// [3_u8, null] +/// null +/// ``` +/// +/// The elements would be converted to: +/// +/// ```text +/// ┌──┬──┐ ┌──┬──┐ ┌──┬──┐ ┌──┬──┐ +/// 1 │01│01│ 2 │01│02│ 3 │01│03│ null │00│00│ +/// └──┴──┘ └──┴──┘ └──┴──┘ └──┴──┘ +///``` +/// +/// Which would be encoded as +/// +/// ```text +/// ┌──┬──┬──┬──┬──┐ +/// [1_u8, 2_u8] │01│01│01│01│02│ +/// └──┴──┴──┴──┴──┘ +/// └ 1 ┘ └ 2 ┘ +/// ┌──┬──┬──┬──┬──┐ +/// [3_u8, null] │01│01│03│00│00│ +/// └──┴──┴──┴──┴──┘ +/// └ 1 ┘ └null┘ +/// ┌──┐ +/// null │00│ +/// └──┘ +/// +///``` +/// /// # Ordering /// /// ## Float Ordering @@ -409,6 +450,11 @@ impl Codec { let converter = RowConverter::new(vec![field])?; Ok(Self::List(converter)) } + DataType::FixedSizeList(f, _) => { + let field = SortField::new_with_options(f.data_type().clone(), sort_field.options); + let converter = RowConverter::new(vec![field])?; + Ok(Self::List(converter)) + } DataType::Struct(f) => { let sort_fields = f .iter() @@ -450,6 +496,7 @@ impl Codec { let values = match array.data_type() { DataType::List(_) => as_list_array(array).values(), DataType::LargeList(_) => as_large_list_array(array).values(), + DataType::FixedSizeList(_, _) => as_fixed_size_list_array(array).values(), _ => unreachable!(), }; let rows = converter.convert_columns(&[values.clone()])?; @@ -536,9 +583,10 @@ impl RowConverter { fn supports_datatype(d: &DataType) -> bool { match d { _ if !d.is_nested() => true, - DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => { - Self::supports_datatype(f.data_type()) - } + DataType::List(f) + | DataType::LargeList(f) + | DataType::FixedSizeList(f, _) + | DataType::Map(f, _) => Self::supports_datatype(f.data_type()), DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())), _ => false, } @@ -1244,6 +1292,11 @@ fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec { DataType::LargeList(_) => { list::compute_lengths(&mut lengths, rows, as_large_list_array(array)) } + DataType::FixedSizeList(_, _) => compute_lengths_fixed_size_list( + &mut tracker, + rows, + as_fixed_size_list_array(array), + ), _ => unreachable!(), }, } @@ -1340,6 +1393,9 @@ fn encode_column( DataType::LargeList(_) => { list::encode(data, offsets, rows, opts, as_large_list_array(column)) } + DataType::FixedSizeList(_, _) => { + encode_fixed_size_list(data, offsets, rows, opts, as_fixed_size_list_array(column)) + } _ => unreachable!(), }, } @@ -1425,6 +1481,13 @@ unsafe fn decode_column( DataType::LargeList(_) => { Arc::new(list::decode::(converter, rows, field, validate_utf8)?) } + DataType::FixedSizeList(_, value_length) => Arc::new(list::decode_fixed_size_list( + converter, + rows, + field, + validate_utf8, + value_length.as_usize(), + )?), _ => unreachable!(), }, }; @@ -2016,6 +2079,9 @@ mod tests { builder.values().append_null(); builder.append(true); builder.append(true); + builder.values().append_value(17); // MASKED + builder.values().append_null(); // MASKED + builder.append(false); let list = Arc::new(builder.finish()) as ArrayRef; let d = list.data_type().clone(); @@ -2024,11 +2090,12 @@ mod tests { let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12] - assert!(rows.row(2) < rows.row(1)); // [32, 42] < [32, 52, 12] - assert!(rows.row(3) < rows.row(2)); // null < [32, 42] - assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 42] - assert!(rows.row(5) < rows.row(2)); // [] < [32, 42] + assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12] + assert!(rows.row(3) < rows.row(2)); // null < [32, 52] + assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52] + assert!(rows.row(5) < rows.row(2)); // [] < [32, 52] assert!(rows.row(3) < rows.row(5)); // null < [] + assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values) let back = converter.convert_rows(&rows).unwrap(); assert_eq!(back.len(), 1); @@ -2041,11 +2108,12 @@ mod tests { let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12] - assert!(rows.row(2) < rows.row(1)); // [32, 42] < [32, 52, 12] - assert!(rows.row(3) > rows.row(2)); // null > [32, 42] - assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 42] - assert!(rows.row(5) < rows.row(2)); // [] < [32, 42] + assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12] + assert!(rows.row(3) > rows.row(2)); // null > [32, 52] + assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52] + assert!(rows.row(5) < rows.row(2)); // [] < [32, 52] assert!(rows.row(3) > rows.row(5)); // null > [] + assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values) let back = converter.convert_rows(&rows).unwrap(); assert_eq!(back.len(), 1); @@ -2058,11 +2126,12 @@ mod tests { let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12] - assert!(rows.row(2) > rows.row(1)); // [32, 42] > [32, 52, 12] - assert!(rows.row(3) > rows.row(2)); // null > [32, 42] - assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 42] - assert!(rows.row(5) > rows.row(2)); // [] > [32, 42] + assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12] + assert!(rows.row(3) > rows.row(2)); // null > [32, 52] + assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52] + assert!(rows.row(5) > rows.row(2)); // [] > [32, 52] assert!(rows.row(3) > rows.row(5)); // null > [] + assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values) let back = converter.convert_rows(&rows).unwrap(); assert_eq!(back.len(), 1); @@ -2075,11 +2144,12 @@ mod tests { let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12] - assert!(rows.row(2) > rows.row(1)); // [32, 42] > [32, 52, 12] - assert!(rows.row(3) < rows.row(2)); // null < [32, 42] - assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 42] - assert!(rows.row(5) > rows.row(2)); // [] > [32, 42] + assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12] + assert!(rows.row(3) < rows.row(2)); // null < [32, 52] + assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52] + assert!(rows.row(5) > rows.row(2)); // [] > [32, 52] assert!(rows.row(3) < rows.row(5)); // null < [] + assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values) let back = converter.convert_rows(&rows).unwrap(); assert_eq!(back.len(), 1); @@ -2190,6 +2260,114 @@ mod tests { test_nested_list::(); } + #[test] + fn test_fixed_size_list() { + let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3); + builder.values().append_value(32); + builder.values().append_value(52); + builder.values().append_value(32); + builder.append(true); + builder.values().append_value(32); + builder.values().append_value(52); + builder.values().append_value(12); + builder.append(true); + builder.values().append_value(32); + builder.values().append_value(52); + builder.values().append_null(); + builder.append(true); + builder.values().append_value(32); // MASKED + builder.values().append_value(52); // MASKED + builder.values().append_value(13); // MASKED + builder.append(false); + builder.values().append_value(32); + builder.values().append_null(); + builder.values().append_null(); + builder.append(true); + builder.values().append_null(); + builder.values().append_null(); + builder.values().append_null(); + builder.append(true); + builder.values().append_value(17); // MASKED + builder.values().append_null(); // MASKED + builder.values().append_value(77); // MASKED + builder.append(false); + + let list = Arc::new(builder.finish()) as ArrayRef; + let d = list.data_type().clone(); + + // Default sorting (ascending, nulls first) + let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap(); + + let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); + assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12] + assert!(rows.row(2) < rows.row(1)); // [32, 52, null] < [32, 52, 12] + assert!(rows.row(3) < rows.row(2)); // null < [32, 52, null] + assert!(rows.row(4) < rows.row(2)); // [32, null, null] < [32, 52, null] + assert!(rows.row(5) < rows.row(2)); // [null, null, null] < [32, 52, null] + assert!(rows.row(3) < rows.row(5)); // null < [null, null, null] + assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values) + + let back = converter.convert_rows(&rows).unwrap(); + assert_eq!(back.len(), 1); + back[0].to_data().validate_full().unwrap(); + assert_eq!(&back[0], &list); + + // Ascending, null last + let options = SortOptions::default().asc().with_nulls_first(false); + let field = SortField::new_with_options(d.clone(), options); + let converter = RowConverter::new(vec![field]).unwrap(); + let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); + assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12] + assert!(rows.row(2) > rows.row(1)); // [32, 52, null] > [32, 52, 12] + assert!(rows.row(3) > rows.row(2)); // null > [32, 52, null] + assert!(rows.row(4) > rows.row(2)); // [32, null, null] > [32, 52, null] + assert!(rows.row(5) > rows.row(2)); // [null, null, null] > [32, 52, null] + assert!(rows.row(3) > rows.row(5)); // null > [null, null, null] + assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values) + + let back = converter.convert_rows(&rows).unwrap(); + assert_eq!(back.len(), 1); + back[0].to_data().validate_full().unwrap(); + assert_eq!(&back[0], &list); + + // Descending, nulls last + let options = SortOptions::default().desc().with_nulls_first(false); + let field = SortField::new_with_options(d.clone(), options); + let converter = RowConverter::new(vec![field]).unwrap(); + let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); + assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12] + assert!(rows.row(2) > rows.row(1)); // [32, 52, null] > [32, 52, 12] + assert!(rows.row(3) > rows.row(2)); // null > [32, 52, null] + assert!(rows.row(4) > rows.row(2)); // [32, null, null] > [32, 52, null] + assert!(rows.row(5) > rows.row(2)); // [null, null, null] > [32, 52, null] + assert!(rows.row(3) > rows.row(5)); // null > [null, null, null] + assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values) + + let back = converter.convert_rows(&rows).unwrap(); + assert_eq!(back.len(), 1); + back[0].to_data().validate_full().unwrap(); + assert_eq!(&back[0], &list); + + // Descending, nulls first + let options = SortOptions::default().desc().with_nulls_first(true); + let field = SortField::new_with_options(d, options); + let converter = RowConverter::new(vec![field]).unwrap(); + let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap(); + + assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12] + assert!(rows.row(2) < rows.row(1)); // [32, 52, null] > [32, 52, 12] + assert!(rows.row(3) < rows.row(2)); // null < [32, 52, null] + assert!(rows.row(4) < rows.row(2)); // [32, null, null] < [32, 52, null] + assert!(rows.row(5) < rows.row(2)); // [null, null, null] > [32, 52, null] + assert!(rows.row(3) < rows.row(5)); // null < [null, null, null] + assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked values) + + let back = converter.convert_rows(&rows).unwrap(); + assert_eq!(back.len(), 1); + back[0].to_data().validate_full().unwrap(); + assert_eq!(&back[0], &list); + } + fn generate_primitive_array(len: usize, valid_percent: f64) -> PrimitiveArray where K: ArrowPrimitiveType, diff --git a/arrow-row/src/list.rs b/arrow-row/src/list.rs index 46cd0f3d3d81..627214dc9c46 100644 --- a/arrow-row/src/list.rs +++ b/arrow-row/src/list.rs @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -use crate::{null_sentinel, RowConverter, Rows, SortField}; -use arrow_array::{Array, GenericListArray, OffsetSizeTrait}; -use arrow_buffer::{Buffer, MutableBuffer}; +use crate::{fixed, null_sentinel, LengthTracker, RowConverter, Rows, SortField}; +use arrow_array::{new_null_array, Array, FixedSizeListArray, GenericListArray, OffsetSizeTrait}; +use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer}; use arrow_data::ArrayDataBuilder; -use arrow_schema::{ArrowError, SortOptions}; +use arrow_schema::{ArrowError, DataType, SortOptions}; use std::ops::Range; pub fn compute_lengths( @@ -97,7 +97,7 @@ fn encode_one( } } -/// Decodes a string array from `rows` with the provided `options` +/// Decodes an array from `rows` with the provided `options` /// /// # Safety /// @@ -184,3 +184,123 @@ pub unsafe fn decode( Ok(GenericListArray::from(unsafe { builder.build_unchecked() })) } + +pub fn compute_lengths_fixed_size_list( + tracker: &mut LengthTracker, + rows: &Rows, + array: &FixedSizeListArray, +) { + let value_length = array.value_length().as_usize(); + tracker.push_variable((0..array.len()).map(|idx| { + match array.is_valid(idx) { + true => { + 1 + ((idx * value_length)..(idx + 1) * value_length) + .map(|child_idx| rows.row(child_idx).as_ref().len()) + .sum::() + } + false => 1, + } + })) +} + +/// Encodes the provided `FixedSizeListArray` to `out` with the provided `SortOptions` +/// +/// `rows` should contain the encoded child elements +pub fn encode_fixed_size_list( + data: &mut [u8], + offsets: &mut [usize], + rows: &Rows, + opts: SortOptions, + array: &FixedSizeListArray, +) { + let null_sentinel = null_sentinel(opts); + offsets + .iter_mut() + .skip(1) + .enumerate() + .for_each(|(idx, offset)| { + let value_length = array.value_length().as_usize(); + match array.is_valid(idx) { + true => { + data[*offset] = 0x01; + *offset += 1; + for child_idx in (idx * value_length)..(idx + 1) * value_length { + //dbg!(child_idx); + let row = rows.row(child_idx); + let end_offset = *offset + row.as_ref().len(); + data[*offset..end_offset].copy_from_slice(row.as_ref()); + *offset = end_offset; + } + } + false => { + let null_sentinels = 1; + //+ value_length; // 1 for self + for values too + for i in 0..null_sentinels { + data[*offset + i] = null_sentinel; + } + *offset += null_sentinels; + } + }; + }) +} + +/// Decodes a fixed size list array from `rows` with the provided `options` +/// +/// # Safety +/// +/// `rows` must contain valid data for the provided `converter` +pub unsafe fn decode_fixed_size_list( + converter: &RowConverter, + rows: &mut [&[u8]], + field: &SortField, + validate_utf8: bool, + value_length: usize, +) -> Result { + let list_type = &field.data_type; + let element_type = match list_type { + DataType::FixedSizeList(element_field, _) => element_field.data_type(), + _ => { + return Err(ArrowError::InvalidArgumentError(format!( + "Expected FixedSizeListArray, found: {:?}", + list_type + ))) + } + }; + + let len = rows.len(); + let (null_count, nulls) = fixed::decode_nulls(rows); + + let null_element_encoded = converter.convert_columns(&[new_null_array(element_type, 1)])?; + let null_element_encoded = null_element_encoded.row(0); + let null_element_slice = null_element_encoded.as_ref(); + + let mut child_rows = Vec::new(); + for row in rows { + let valid = row[0] == 1; + let mut row_offset = 1; + if !valid { + for _ in 0..value_length { + child_rows.push(null_element_slice); + } + } else { + for _ in 0..value_length { + let mut temp_child_rows = vec![&row[row_offset..]]; + converter.convert_raw(&mut temp_child_rows, validate_utf8)?; + let decoded_bytes = row.len() - row_offset - temp_child_rows[0].len(); + let next_offset = row_offset + decoded_bytes; + child_rows.push(&row[row_offset..next_offset]); + row_offset = next_offset; + } + } + } + + let children = converter.convert_raw(&mut child_rows, validate_utf8)?; + let child_data = children.iter().map(|c| c.to_data()).collect(); + let builder = ArrayDataBuilder::new(list_type.clone()) + .len(len) + .null_count(null_count) + .null_bit_buffer(Some(nulls)) + .child_data(child_data); + + Ok(FixedSizeListArray::from(builder.build_unchecked())) +} From bf0f6bd31e97d85bbad430b38e8be4d7c5d1ea6d Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 23 Jun 2025 15:20:50 +0200 Subject: [PATCH 3/4] adapt to 54.2.1 --- arrow-row/src/lib.rs | 2 +- arrow-row/src/list.rs | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index 59a5cd02e471..c8251bf65560 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -1293,7 +1293,7 @@ fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec { list::compute_lengths(&mut lengths, rows, as_large_list_array(array)) } DataType::FixedSizeList(_, _) => compute_lengths_fixed_size_list( - &mut tracker, + &mut lengths, rows, as_fixed_size_list_array(array), ), diff --git a/arrow-row/src/list.rs b/arrow-row/src/list.rs index 627214dc9c46..c5ea92d565ac 100644 --- a/arrow-row/src/list.rs +++ b/arrow-row/src/list.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::{fixed, null_sentinel, LengthTracker, RowConverter, Rows, SortField}; +use crate::{fixed, null_sentinel, RowConverter, Rows, SortField}; use arrow_array::{new_null_array, Array, FixedSizeListArray, GenericListArray, OffsetSizeTrait}; use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer}; use arrow_data::ArrayDataBuilder; @@ -186,21 +186,21 @@ pub unsafe fn decode( } pub fn compute_lengths_fixed_size_list( - tracker: &mut LengthTracker, + lengths: &mut [usize], rows: &Rows, array: &FixedSizeListArray, ) { let value_length = array.value_length().as_usize(); - tracker.push_variable((0..array.len()).map(|idx| { - match array.is_valid(idx) { + lengths.iter_mut().enumerate().for_each(|(idx, length)| { + *length += match array.is_valid(idx) { true => { 1 + ((idx * value_length)..(idx + 1) * value_length) .map(|child_idx| rows.row(child_idx).as_ref().len()) .sum::() } false => 1, - } - })) + }; + }) } /// Encodes the provided `FixedSizeListArray` to `out` with the provided `SortOptions` From be9ef1cc005cf40ab863f136606038fa68a3b4da Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 26 Jun 2025 13:41:34 +0200 Subject: [PATCH 4/4] Backport PR snapshot: Fix RowConverter when FixedSizeList is not the last Fix `RowConverter` row decoding when there is a `FixedSizeList` element and it's not the last. --- arrow-row/src/lib.rs | 71 ++++++++++++++++++++++++++++++++++++++++++- arrow-row/src/list.rs | 1 + 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index c8251bf65560..b1178f452548 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -742,7 +742,18 @@ impl RowConverter { // SAFETY // We have validated that the rows came from this [`RowConverter`] // and therefore must be valid - unsafe { self.convert_raw(&mut rows, validate_utf8) } + let result = unsafe { self.convert_raw(&mut rows, validate_utf8) }?; + + for (i, row) in rows.iter().enumerate() { + if !row.is_empty() { + return Err(ArrowError::InvalidArgumentError(format!( + "Codecs {codecs:?} did not consume all bytes for row {i}, remaining bytes: {row:?}", + codecs = &self.codecs + ))); + } + } + + Ok(result) } /// Returns an empty [`Rows`] with capacity for `row_capacity` rows with @@ -2368,6 +2379,64 @@ mod tests { assert_eq!(&back[0], &list); } + #[test] + fn test_two_fixed_size_lists() { + let mut first = FixedSizeListBuilder::new(UInt8Builder::new(), 1); + // 0: [100] + first.values().append_value(100); + first.append(true); + // 1: [101] + first.values().append_value(101); + first.append(true); + // 2: [102] + first.values().append_value(102); + first.append(true); + // 3: [null] + first.values().append_null(); + first.append(true); + // 4: null + first.values().append_null(); // MASKED + first.append(false); + let first = Arc::new(first.finish()) as ArrayRef; + let first_type = first.data_type().clone(); + + let mut second = FixedSizeListBuilder::new(UInt8Builder::new(), 1); + // 0: [200] + second.values().append_value(200); + second.append(true); + // 1: [201] + second.values().append_value(201); + second.append(true); + // 2: [202] + second.values().append_value(202); + second.append(true); + // 3: [null] + second.values().append_null(); + second.append(true); + // 4: null + second.values().append_null(); // MASKED + second.append(false); + let second = Arc::new(second.finish()) as ArrayRef; + let second_type = second.data_type().clone(); + + let converter = RowConverter::new(vec![ + SortField::new(first_type.clone()), + SortField::new(second_type.clone()), + ]) + .unwrap(); + + let rows = converter + .convert_columns(&[Arc::clone(&first), Arc::clone(&second)]) + .unwrap(); + + let back = converter.convert_rows(&rows).unwrap(); + assert_eq!(back.len(), 2); + back[0].to_data().validate_full().unwrap(); + assert_eq!(&back[0], &first); + back[1].to_data().validate_full().unwrap(); + assert_eq!(&back[1], &second); + } + fn generate_primitive_array(len: usize, valid_percent: f64) -> PrimitiveArray where K: ArrowPrimitiveType, diff --git a/arrow-row/src/list.rs b/arrow-row/src/list.rs index c5ea92d565ac..4884165d8368 100644 --- a/arrow-row/src/list.rs +++ b/arrow-row/src/list.rs @@ -292,6 +292,7 @@ pub unsafe fn decode_fixed_size_list( row_offset = next_offset; } } + *row = &row[row_offset..]; // Update row for the next decoder } let children = converter.convert_raw(&mut child_rows, validate_utf8)?;