Skip to content

Commit

Permalink
feat(ffi): add run end encoded arrays (#5632)
Browse files Browse the repository at this point in the history
* feat(ffi): add run end encoded arrays

* fix(ffi): add correct data type layout for run end encoded arrays

* test: add test_nullable_run_array
  • Loading branch information
notfilippo authored Apr 15, 2024
1 parent d84a1a6 commit 7d929f0
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 5 deletions.
19 changes: 14 additions & 5 deletions arrow-data/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1544,14 +1544,14 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout {
DataType::Utf8 => DataTypeLayout::new_binary::<i32>(),
DataType::LargeUtf8 => DataTypeLayout::new_binary::<i64>(),
DataType::BinaryView | DataType::Utf8View => DataTypeLayout::new_view(),
DataType::FixedSizeList(_, _) => DataTypeLayout::new_empty(), // all in child data
DataType::FixedSizeList(_, _) => DataTypeLayout::new_nullable_empty(), // all in child data
DataType::List(_) => DataTypeLayout::new_fixed_width::<i32>(),
DataType::ListView(_) | DataType::LargeListView(_) => {
unimplemented!("ListView/LargeListView not implemented")
}
DataType::LargeList(_) => DataTypeLayout::new_fixed_width::<i64>(),
DataType::Map(_, _) => DataTypeLayout::new_fixed_width::<i32>(),
DataType::Struct(_) => DataTypeLayout::new_empty(), // all in child data,
DataType::Struct(_) => DataTypeLayout::new_nullable_empty(), // all in child data,
DataType::RunEndEncoded(_, _) => DataTypeLayout::new_empty(), // all in child data,
DataType::Union(_, mode) => {
let type_ids = BufferSpec::FixedWidth {
Expand Down Expand Up @@ -1612,16 +1612,25 @@ impl DataTypeLayout {
}

/// Describes arrays which have no data of their own
/// (e.g. FixedSizeList). Note such arrays may still have a Null
/// Bitmap
pub fn new_empty() -> Self {
/// but may still have a Null Bitmap (e.g. FixedSizeList)
pub fn new_nullable_empty() -> Self {
Self {
buffers: vec![],
can_contain_null_mask: true,
variadic: false,
}
}

/// Describes arrays which have no data of their own
/// (e.g. RunEndEncoded).
pub fn new_empty() -> Self {
Self {
buffers: vec![],
can_contain_null_mask: false,
variadic: false,
}
}

/// Describes a basic numeric array where each element has a fixed
/// with offset buffer of type `T`, followed by a
/// variable width data buffer
Expand Down
17 changes: 17 additions & 0 deletions arrow-schema/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,14 @@ impl TryFrom<&FFI_ArrowSchema> for DataType {
let map_keys_sorted = c_schema.map_keys_sorted();
DataType::Map(Arc::new(Field::try_from(c_child)?), map_keys_sorted)
}
"+r" => {
let c_run_ends = c_schema.child(0);
let c_values = c_schema.child(1);
DataType::RunEndEncoded(
Arc::new(Field::try_from(c_run_ends)?),
Arc::new(Field::try_from(c_values)?),
)
}
// Parametrized types, requiring string parse
other => {
match other.splitn(2, ':').collect::<Vec<&str>>().as_slice() {
Expand Down Expand Up @@ -616,6 +624,10 @@ impl TryFrom<&DataType> for FFI_ArrowSchema {
.iter()
.map(FFI_ArrowSchema::try_from)
.collect::<Result<Vec<_>, ArrowError>>()?,
DataType::RunEndEncoded(run_ends, values) => vec![
FFI_ArrowSchema::try_from(run_ends.as_ref())?,
FFI_ArrowSchema::try_from(values.as_ref())?,
],
_ => vec![],
};
let dictionary = if let DataType::Dictionary(_, value_data_type) = dtype {
Expand Down Expand Up @@ -681,6 +693,7 @@ fn get_format_string(dtype: &DataType) -> Result<String, ArrowError> {
DataType::LargeList(_) => Ok("+L".to_string()),
DataType::Struct(_) => Ok("+s".to_string()),
DataType::Map(_, _) => Ok("+m".to_string()),
DataType::RunEndEncoded(_, _) => Ok("+r".to_string()),
DataType::Dictionary(key_data_type, _) => get_format_string(key_data_type),
DataType::Union(fields, mode) => {
let formats = fields
Expand Down Expand Up @@ -807,6 +820,10 @@ mod tests {
DataType::Utf8,
true,
)])));
round_trip_type(DataType::RunEndEncoded(
Arc::new(Field::new("run_ends", DataType::Int32, false)),
Arc::new(Field::new("values", DataType::Binary, true)),
));
}

#[test]
Expand Down
71 changes: 71 additions & 0 deletions arrow/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,11 @@ impl<'a> ImportedArrowArray<'a> {
.map(|(i, (_, field))| self.consume_child(i, field.data_type()))
.collect::<Result<Vec<_>>>()
}
DataType::RunEndEncoded(run_ends_field, values_field) => Ok([
self.consume_child(0, run_ends_field.data_type())?,
self.consume_child(1, values_field.data_type())?,
]
.to_vec()),
_ => Ok(Vec::new()),
}
}
Expand Down Expand Up @@ -468,6 +473,7 @@ mod tests {
use arrow_array::cast::AsArray;
use arrow_array::types::{Float64Type, Int32Type};
use arrow_array::*;
use arrow_buffer::NullBuffer;

use crate::compute::kernels;
use crate::datatypes::{Field, Int8Type};
Expand Down Expand Up @@ -1176,4 +1182,69 @@ mod tests {

Ok(())
}

#[test]
fn test_run_array() -> Result<()> {
let value_data =
PrimitiveArray::<Int8Type>::from_iter_values([10_i8, 11, 12, 13, 14, 15, 16, 17]);

// Construct a run_ends array:
let run_ends_values = [4_i32, 6, 7, 9, 13, 18, 20, 22];
let run_ends_data =
PrimitiveArray::<Int32Type>::from_iter_values(run_ends_values.iter().copied());

// Construct a run ends encoded array from the above two
let ree_array = RunArray::<Int32Type>::try_new(&run_ends_data, &value_data).unwrap();

// export it
let (array, schema) = to_ffi(&ree_array.to_data())?;

// (simulate consumer) import it
let data = unsafe { from_ffi(array, &schema) }?;
let array = make_array(data);

// perform some operation
let array = array
.as_any()
.downcast_ref::<RunArray<Int32Type>>()
.unwrap();
assert_eq!(array.data_type(), ree_array.data_type());
assert_eq!(array.run_ends().values(), ree_array.run_ends().values());
assert_eq!(array.values(), ree_array.values());

Ok(())
}

#[test]
fn test_nullable_run_array() -> Result<()> {
let nulls = NullBuffer::from(vec![true, false, true, true, false]);
let value_data =
PrimitiveArray::<Int8Type>::new(vec![1_i8, 2, 3, 4, 5].into(), Some(nulls));

// Construct a run_ends array:
let run_ends_values = [5_i32, 6, 7, 8, 10];
let run_ends_data =
PrimitiveArray::<Int32Type>::from_iter_values(run_ends_values.iter().copied());

// Construct a run ends encoded array from the above two
let ree_array = RunArray::<Int32Type>::try_new(&run_ends_data, &value_data).unwrap();

// export it
let (array, schema) = to_ffi(&ree_array.to_data())?;

// (simulate consumer) import it
let data = unsafe { from_ffi(array, &schema) }?;
let array = make_array(data);

// perform some operation
let array = array
.as_any()
.downcast_ref::<RunArray<Int32Type>>()
.unwrap();
assert_eq!(array.data_type(), ree_array.data_type());
assert_eq!(array.run_ends().values(), ree_array.run_ends().values());
assert_eq!(array.values(), ree_array.values());

Ok(())
}
}

0 comments on commit 7d929f0

Please sign in to comment.