From 80f640697ccdd21c8ec8bd63f6d7b093f41f53da Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Tue, 23 Mar 2021 07:57:17 +0100 Subject: [PATCH 01/12] Add Child Data to Arrow's C FFI implementation. This commit adds the children and implements it for List and LargeList datatypes. --- rust/arrow/src/array/ffi.rs | 31 ++++-- rust/arrow/src/datatypes/field.rs | 6 ++ rust/arrow/src/ffi.rs | 167 ++++++++++++++++++++++++++---- 3 files changed, 179 insertions(+), 25 deletions(-) diff --git a/rust/arrow/src/array/ffi.rs b/rust/arrow/src/array/ffi.rs index a9b5d29ed79..89825f5d53e 100644 --- a/rust/arrow/src/array/ffi.rs +++ b/rust/arrow/src/array/ffi.rs @@ -25,17 +25,35 @@ use crate::{ }; use super::ArrayData; +use crate::datatypes::{DataType, Field}; +use crate::ffi::ArrowArray; impl TryFrom for ArrayData { type Error = ArrowError; fn try_from(value: ffi::ArrowArray) -> Result { let data_type = value.data_type()?; + let len = value.len(); let offset = value.offset(); let null_count = value.null_count(); let buffers = value.buffers()?; let null_bit_buffer = value.null_bit_buffer(); + let child_data = value.children()?; + + let data_type = match data_type { + DataType::List(_) => DataType::List(Box::new(Field::new( + "", + child_data[0].data_type().clone(), + false, + ))), + DataType::LargeList(_) => DataType::LargeList(Box::new(Field::new( + "", + child_data[0].data_type().clone(), + false, + ))), + dt => dt, + }; Ok(ArrayData::new( data_type, @@ -44,9 +62,7 @@ impl TryFrom for ArrayData { null_bit_buffer, offset, buffers, - // this is empty because ffi still does not support it. - // this is ok because FFI only supports datatypes without childs - vec![], + child_data, )) } } @@ -60,6 +76,11 @@ impl TryFrom for ffi::ArrowArray { let null_count = value.null_count(); let buffers = value.buffers().to_vec(); let null_buffer = value.null_buffer().cloned(); + let child_data = value + .child_data() + .into_iter() + .map(|arr| ArrowArray::try_from((*arr).clone()).expect("infallible")) + .collect::>(); unsafe { ffi::ArrowArray::try_new( @@ -69,9 +90,7 @@ impl TryFrom for ffi::ArrowArray { null_buffer, offset, buffers, - // this is empty because ffi still does not support it. - // this is ok because FFI only supports datatypes without childs - vec![], + child_data, ) } } diff --git a/rust/arrow/src/datatypes/field.rs b/rust/arrow/src/datatypes/field.rs index 11fc31d6343..e81cca3554d 100644 --- a/rust/arrow/src/datatypes/field.rs +++ b/rust/arrow/src/datatypes/field.rs @@ -39,6 +39,12 @@ pub struct Field { metadata: Option>, } +impl Default for Field { + fn default() -> Self { + Field::new("", DataType::Null, false) + } +} + impl Field { /// Creates a new field pub fn new(name: &str, data_type: DataType, nullable: bool) -> Self { diff --git a/rust/arrow/src/ffi.rs b/rust/arrow/src/ffi.rs index 6df5690fd41..2d4706d0e35 100644 --- a/rust/arrow/src/ffi.rs +++ b/rust/arrow/src/ffi.rs @@ -85,10 +85,12 @@ use std::{ sync::Arc, }; +use crate::array::ArrayData; use crate::buffer::Buffer; use crate::datatypes::{DataType, TimeUnit}; use crate::error::{ArrowError, Result}; use crate::util::bit_util; +use std::convert::TryFrom; /// ABI-compatible struct for `ArrowSchema` from C Data Interface /// See @@ -117,20 +119,29 @@ unsafe extern "C" fn release_schema(schema: *mut FFI_ArrowSchema) { schema.release = None; } +struct SchemaPrivateData { + children: Box<[*mut FFI_ArrowSchema]>, +} + impl FFI_ArrowSchema { /// create a new [FFI_ArrowSchema] from a format. - fn new(format: &str) -> FFI_ArrowSchema { + fn new(format: &str, children: Vec<*mut FFI_ArrowSchema>) -> FFI_ArrowSchema { + let children = children.into_boxed_slice(); + let n_children = children.len() as i64; + let children_ptr = children.as_ptr() as *mut *mut FFI_ArrowSchema; + + let private_data = Box::new(SchemaPrivateData { children }); // FFI_ArrowSchema { format: CString::new(format).unwrap().into_raw(), name: std::ptr::null_mut(), metadata: std::ptr::null_mut(), flags: 0, - n_children: 0, - children: ptr::null_mut(), + n_children, + children: children_ptr, dictionary: std::ptr::null_mut(), release: Some(release_schema), - private_data: std::ptr::null_mut(), + private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void, } } @@ -193,6 +204,8 @@ fn to_datatype(format: &str) -> Result { "ttm" => DataType::Time32(TimeUnit::Millisecond), "ttu" => DataType::Time64(TimeUnit::Microsecond), "ttn" => DataType::Time64(TimeUnit::Nanosecond), + "+l" => DataType::List(Box::new(Default::default())), + "+L" => DataType::LargeList(Box::new(Default::default())), dt => { return Err(ArrowError::CDataInterface(format!( "The datatype \"{}\" is not supported in the Rust implementation", @@ -228,6 +241,8 @@ fn from_datatype(datatype: &DataType) -> Result { DataType::Time32(TimeUnit::Millisecond) => "ttm", DataType::Time64(TimeUnit::Microsecond) => "ttu", DataType::Time64(TimeUnit::Nanosecond) => "ttn", + DataType::List(_) => "+l", + DataType::LargeList(_) => "+L", z => { return Err(ArrowError::CDataInterface(format!( "The datatype \"{:?}\" is still not supported in Rust implementation", @@ -275,9 +290,9 @@ fn bit_width(data_type: &DataType, i: usize) -> Result { } // Variable-sized binaries: have two buffers. // "small": first buffer is i32, second is in bytes - (DataType::Utf8, 1) | (DataType::Binary, 1) => size_of::() * 8, - (DataType::Utf8, 2) | (DataType::Binary, 2) => size_of::() * 8, - (DataType::Utf8, _) | (DataType::Binary, _) => { + (DataType::Utf8, 1) | (DataType::Binary, 1) | (DataType::List(_), 1) => size_of::() * 8, + (DataType::Utf8, 2) | (DataType::Binary, 2) | (DataType::List(_), 2) => size_of::() * 8, + (DataType::Utf8, _) | (DataType::Binary, _) | (DataType::List(_), _)=> { return Err(ArrowError::CDataInterface(format!( "The datatype \"{:?}\" expects 3 buffers, but requested {}. Please verify that the C data interface is correctly implemented.", data_type, i @@ -285,9 +300,9 @@ fn bit_width(data_type: &DataType, i: usize) -> Result { } // Variable-sized binaries: have two buffers. // LargeUtf8: first buffer is i64, second is in bytes - (DataType::LargeUtf8, 1) | (DataType::LargeBinary, 1) => size_of::() * 8, - (DataType::LargeUtf8, 2) | (DataType::LargeBinary, 2) => size_of::() * 8, - (DataType::LargeUtf8, _) | (DataType::LargeBinary, _) => { + (DataType::LargeUtf8, 1) | (DataType::LargeBinary, 1) | (DataType::LargeList(_), 1) => size_of::() * 8, + (DataType::LargeUtf8, 2) | (DataType::LargeBinary, 2) | (DataType::LargeList(_), 2)=> size_of::() * 8, + (DataType::LargeUtf8, _) | (DataType::LargeBinary, _) | (DataType::LargeList(_), _)=> { return Err(ArrowError::CDataInterface(format!( "The datatype \"{:?}\" expects 3 buffers, but requested {}. Please verify that the C data interface is correctly implemented.", data_type, i @@ -340,6 +355,7 @@ unsafe extern "C" fn release_array(array: *mut FFI_ArrowArray) { struct PrivateData { buffers: Vec>, buffers_ptr: Box<[*const std::os::raw::c_void]>, + children: Box<[*mut FFI_ArrowArray]>, } impl FFI_ArrowArray { @@ -353,6 +369,7 @@ impl FFI_ArrowArray { offset: i64, n_buffers: i64, buffers: Vec>, + children: Vec<*mut FFI_ArrowArray>, ) -> Self { let buffers_ptr = buffers .iter() @@ -364,11 +381,16 @@ impl FFI_ArrowArray { .collect::>(); let pointer = buffers_ptr.as_ptr() as *mut *const std::ffi::c_void; + let children = children.into_boxed_slice(); + let children_ptr = children.as_ptr() as *mut *mut FFI_ArrowArray; + let n_children = children.len() as i64; + // create the private data owning everything. // any other data must be added here, e.g. via a struct, to track lifetime. let private_data = Box::new(PrivateData { buffers, buffers_ptr, + children, }); Self { @@ -376,9 +398,9 @@ impl FFI_ArrowArray { null_count, offset, n_buffers, - n_children: 0, + n_children, buffers: pointer, - children: std::ptr::null_mut(), + children: children_ptr, dictionary: std::ptr::null_mut(), release: Some(release_array), private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void, @@ -425,6 +447,27 @@ unsafe fn create_buffer( NonNull::new(ptr as *mut u8).map(|ptr| Buffer::from_unowned(ptr, len, array)) } +unsafe fn create_child_arrays( + array: Arc, + schema: Arc, +) -> Result> { + if array.n_children == 0 { + return Ok(vec![]); + } + let mut children = Vec::with_capacity(array.n_children as usize); + for i in 0..array.n_children as usize { + let arr_ptr = *array.children.add(i); + let schema_ptr = *schema.children.add(i); + let arrow_arr = ArrowArray::try_from_raw( + arr_ptr as *const FFI_ArrowArray, + schema_ptr as *const FFI_ArrowSchema, + )?; + let data = ArrayData::try_from(arrow_arr)?; + children.push(data) + } + Ok(children) +} + impl Drop for FFI_ArrowArray { fn drop(&mut self) { match self.release { @@ -471,7 +514,7 @@ impl ArrowArray { null_buffer: Option, offset: usize, buffers: Vec, - _child_data: Vec, + child_data: Vec, ) -> Result { let format = from_datatype(data_type)?; // * insert the null buffer at the start @@ -480,13 +523,23 @@ impl ArrowArray { .chain(buffers.iter().map(|b| Some(b.clone()))) .collect::>(); - let schema = Arc::new(FFI_ArrowSchema::new(&format)); + let mut ffi_arrow_arrays = Vec::with_capacity(child_data.len()); + let mut ffi_arrow_schemas = Vec::with_capacity(child_data.len()); + + child_data.into_iter().for_each(|arrow_arr| { + let (arr, schema) = ArrowArray::into_raw(arrow_arr); + ffi_arrow_arrays.push(arr as *mut FFI_ArrowArray); + ffi_arrow_schemas.push(schema as *mut FFI_ArrowSchema); + }); + + let schema = Arc::new(FFI_ArrowSchema::new(&format, ffi_arrow_schemas)); let array = Arc::new(FFI_ArrowArray::new( len as i64, null_count as i64, offset as i64, new_buffers.len() as i64, new_buffers, + ffi_arrow_arrays, )); Ok(ArrowArray { schema, array }) @@ -548,13 +601,15 @@ impl ArrowArray { (DataType::Utf8, 1) | (DataType::LargeUtf8, 1) | (DataType::Binary, 1) - | (DataType::LargeBinary, 1) => { + | (DataType::LargeBinary, 1) + | (DataType::List(_), 1) + | (DataType::LargeList(_), 1) => { // the len of the offset buffer (buffer 1) equals length + 1 let bits = bit_width(data_type, i)?; debug_assert_eq!(bits % 8, 0); (self.array.length as usize + 1) * (bits / 8) } - (DataType::Utf8, 2) | (DataType::Binary, 2) => { + (DataType::Utf8, 2) | (DataType::Binary, 2) | (DataType::List(_), 2) => { // the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1) let len = self.buffer_len(1)?; // first buffer is the null buffer => add(1) @@ -566,7 +621,9 @@ impl ArrowArray { // get last offset (unsafe { *offset_buffer.add(len / size_of::() - 1) }) as usize } - (DataType::LargeUtf8, 2) | (DataType::LargeBinary, 2) => { + (DataType::LargeUtf8, 2) + | (DataType::LargeBinary, 2) + | (DataType::LargeList(_), 2) => { // the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1) let len = self.buffer_len(1)?; // first buffer is the null buffer => add(1) @@ -607,6 +664,10 @@ impl ArrowArray { .collect() } + pub fn children(&self) -> Result> { + unsafe { create_child_arrays(self.array.clone(), self.schema.clone()) } + } + /// the length of the array pub fn len(&self) -> usize { self.array.length as usize @@ -638,11 +699,14 @@ mod tests { use super::*; use crate::array::{ make_array, Array, ArrayData, BinaryOffsetSizeTrait, BooleanArray, - GenericBinaryArray, GenericStringArray, Int32Array, StringOffsetSizeTrait, - Time32MillisecondArray, + GenericBinaryArray, GenericListArray, GenericStringArray, Int32Array, + OffsetSizeTrait, StringOffsetSizeTrait, Time32MillisecondArray, }; use crate::compute::kernels; + use crate::datatypes::Field; use std::convert::TryFrom; + use std::iter::FromIterator; + use std::sync::Arc; #[test] fn test_round_trip() -> Result<()> { @@ -712,6 +776,71 @@ mod tests { test_generic_string::() } + fn test_generic_list() -> Result<()> { + // Construct a value array + let value_data = ArrayData::builder(DataType::Int32) + .len(8) + .add_buffer(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7])) + .build(); + + // Construct a buffer for value offsets, for the nested array: + // [[0, 1, 2], [3, 4, 5], [6, 7]] + let value_offsets = Buffer::from_iter( + [0usize, 3, 6, 8] + .iter() + .map(|i| Offset::from_usize(*i).unwrap()), + ); + + // Construct a list array from the above two + let list_data_type = match std::mem::size_of::() { + 4 => DataType::List(Box::new(Field::new("item", DataType::Int32, false))), + _ => { + DataType::LargeList(Box::new(Field::new("item", DataType::Int32, false))) + } + }; + + let list_data = ArrayData::builder(list_data_type.clone()) + .len(3) + .add_buffer(value_offsets.clone()) + .add_child_data(value_data.clone()) + .build(); + + // create an array natively + let array = GenericListArray::::from(list_data.clone()); + + // export it + let array = ArrowArray::try_from(array.data().clone())?; + + // (simulate consumer) import it + let data = ArrayData::try_from(array)?; + let array = make_array(data); + + // perform some operation + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + + // verify + let expected = GenericListArray::::from(list_data); + assert_eq!(&array.value(0), &expected.value(0)); + assert_eq!(&array.value(1), &expected.value(1)); + assert_eq!(&array.value(2), &expected.value(2)); + + // (drop/release) + Ok(()) + } + + #[test] + fn test_list() -> Result<()> { + test_generic_list::() + } + + #[test] + fn test_large_list() -> Result<()> { + test_generic_list::() + } + fn test_generic_binary() -> Result<()> { // create an array natively let array: Vec> = vec![Some(b"a"), None, Some(b"aaa")]; From 83769225697ff388afea1cd39047227ed5cddfdf Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Tue, 23 Mar 2021 12:51:55 +0100 Subject: [PATCH 02/12] clippy --- rust/arrow/src/array/ffi.rs | 2 +- rust/arrow/src/datatypes/field.rs | 2 +- rust/arrow/src/ffi.rs | 11 +++++------ 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/rust/arrow/src/array/ffi.rs b/rust/arrow/src/array/ffi.rs index 89825f5d53e..72253350809 100644 --- a/rust/arrow/src/array/ffi.rs +++ b/rust/arrow/src/array/ffi.rs @@ -78,7 +78,7 @@ impl TryFrom for ffi::ArrowArray { let null_buffer = value.null_buffer().cloned(); let child_data = value .child_data() - .into_iter() + .iter() .map(|arr| ArrowArray::try_from((*arr).clone()).expect("infallible")) .collect::>(); diff --git a/rust/arrow/src/datatypes/field.rs b/rust/arrow/src/datatypes/field.rs index e81cca3554d..e96a084af28 100644 --- a/rust/arrow/src/datatypes/field.rs +++ b/rust/arrow/src/datatypes/field.rs @@ -315,8 +315,8 @@ impl Field { }; Ok(Field { name, - nullable, data_type, + nullable, dict_id, dict_is_ordered, metadata, diff --git a/rust/arrow/src/ffi.rs b/rust/arrow/src/ffi.rs index 2d4706d0e35..84de659e39c 100644 --- a/rust/arrow/src/ffi.rs +++ b/rust/arrow/src/ffi.rs @@ -542,7 +542,7 @@ impl ArrowArray { ffi_arrow_arrays, )); - Ok(ArrowArray { schema, array }) + Ok(ArrowArray { array, schema }) } /// creates a new [ArrowArray] from two pointers. Used to import from the C Data Interface. @@ -572,7 +572,7 @@ impl ArrowArray { pub unsafe fn empty() -> Self { let schema = Arc::new(FFI_ArrowSchema::empty()); let array = Arc::new(FFI_ArrowArray::empty()); - ArrowArray { schema, array } + ArrowArray { array, schema } } /// exports [ArrowArray] to the C Data Interface @@ -706,7 +706,6 @@ mod tests { use crate::datatypes::Field; use std::convert::TryFrom; use std::iter::FromIterator; - use std::sync::Arc; #[test] fn test_round_trip() -> Result<()> { @@ -799,10 +798,10 @@ mod tests { } }; - let list_data = ArrayData::builder(list_data_type.clone()) + let list_data = ArrayData::builder(list_data_type) .len(3) - .add_buffer(value_offsets.clone()) - .add_child_data(value_data.clone()) + .add_buffer(value_offsets) + .add_child_data(value_data) .build(); // create an array natively From be102d2023896c729fcb05a300cf7e58fc945de4 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Thu, 25 Mar 2021 09:11:57 +0100 Subject: [PATCH 03/12] process remarks --- rust/arrow/src/array/ffi.rs | 26 +++++--------- rust/arrow/src/datatypes/field.rs | 6 ---- rust/arrow/src/ffi.rs | 57 ++++++++++++++++++------------- 3 files changed, 43 insertions(+), 46 deletions(-) diff --git a/rust/arrow/src/array/ffi.rs b/rust/arrow/src/array/ffi.rs index 72253350809..55cd5530e6a 100644 --- a/rust/arrow/src/array/ffi.rs +++ b/rust/arrow/src/array/ffi.rs @@ -25,35 +25,27 @@ use crate::{ }; use super::ArrayData; -use crate::datatypes::{DataType, Field}; use crate::ffi::ArrowArray; impl TryFrom for ArrayData { type Error = ArrowError; fn try_from(value: ffi::ArrowArray) -> Result { - let data_type = value.data_type()?; + let child_data = value.children()?; + + let child_type = if !child_data.is_empty() { + Some(child_data[0].data_type().clone()) + } else { + None + }; + + let data_type = value.data_type(child_type)?; let len = value.len(); let offset = value.offset(); let null_count = value.null_count(); let buffers = value.buffers()?; let null_bit_buffer = value.null_bit_buffer(); - let child_data = value.children()?; - - let data_type = match data_type { - DataType::List(_) => DataType::List(Box::new(Field::new( - "", - child_data[0].data_type().clone(), - false, - ))), - DataType::LargeList(_) => DataType::LargeList(Box::new(Field::new( - "", - child_data[0].data_type().clone(), - false, - ))), - dt => dt, - }; Ok(ArrayData::new( data_type, diff --git a/rust/arrow/src/datatypes/field.rs b/rust/arrow/src/datatypes/field.rs index e96a084af28..a471f12ef95 100644 --- a/rust/arrow/src/datatypes/field.rs +++ b/rust/arrow/src/datatypes/field.rs @@ -39,12 +39,6 @@ pub struct Field { metadata: Option>, } -impl Default for Field { - fn default() -> Self { - Field::new("", DataType::Null, false) - } -} - impl Field { /// Creates a new field pub fn new(name: &str, data_type: DataType, nullable: bool) -> Self { diff --git a/rust/arrow/src/ffi.rs b/rust/arrow/src/ffi.rs index 84de659e39c..b278cc68069 100644 --- a/rust/arrow/src/ffi.rs +++ b/rust/arrow/src/ffi.rs @@ -87,7 +87,7 @@ use std::{ use crate::array::ArrayData; use crate::buffer::Buffer; -use crate::datatypes::{DataType, TimeUnit}; +use crate::datatypes::{DataType, Field, TimeUnit}; use crate::error::{ArrowError, Result}; use crate::util::bit_util; use std::convert::TryFrom; @@ -179,7 +179,7 @@ impl Drop for FFI_ArrowSchema { /// maps a DataType `format` to a [DataType](arrow::datatypes::DataType). /// See https://arrow.apache.org/docs/format/CDataInterface.html#data-type-description-format-strings -fn to_datatype(format: &str) -> Result { +fn to_datatype(format: &str, child_type: Option) -> Result { Ok(match format { "n" => DataType::Null, "b" => DataType::Boolean, @@ -204,8 +204,20 @@ fn to_datatype(format: &str) -> Result { "ttm" => DataType::Time32(TimeUnit::Millisecond), "ttu" => DataType::Time64(TimeUnit::Microsecond), "ttn" => DataType::Time64(TimeUnit::Nanosecond), - "+l" => DataType::List(Box::new(Default::default())), - "+L" => DataType::LargeList(Box::new(Default::default())), + + // Note: The datatype null will only be created when called from ArrowArray::buffer_len + // at that point the child data is not yet known, but it is also not required to determine + // the buffer length of the list arrays. + "+l" => DataType::List(Box::new(Field::new( + "", + child_type.unwrap_or(DataType::Null), + false, + ))), + "+L" => DataType::LargeList(Box::new(Field::new( + "", + child_type.unwrap_or(DataType::Null), + false, + ))), dt => { return Err(ArrowError::CDataInterface(format!( "The datatype \"{}\" is not supported in the Rust implementation", @@ -451,21 +463,17 @@ unsafe fn create_child_arrays( array: Arc, schema: Arc, ) -> Result> { - if array.n_children == 0 { - return Ok(vec![]); - } - let mut children = Vec::with_capacity(array.n_children as usize); - for i in 0..array.n_children as usize { - let arr_ptr = *array.children.add(i); - let schema_ptr = *schema.children.add(i); - let arrow_arr = ArrowArray::try_from_raw( - arr_ptr as *const FFI_ArrowArray, - schema_ptr as *const FFI_ArrowSchema, - )?; - let data = ArrayData::try_from(arrow_arr)?; - children.push(data) - } - Ok(children) + (0..array.n_children as usize) + .map(|i| { + let arr_ptr = *array.children.add(i); + let schema_ptr = *schema.children.add(i); + let arrow_arr = ArrowArray::try_from_raw( + arr_ptr as *const FFI_ArrowArray, + schema_ptr as *const FFI_ArrowSchema, + )?; + ArrayData::try_from(arrow_arr) + }) + .collect() } impl Drop for FFI_ArrowArray { @@ -595,7 +603,8 @@ impl ArrowArray { // for variable-sized buffers, such as the second buffer of a stringArray, we need // to fetch offset buffer's len to build the second buffer. fn buffer_len(&self, i: usize) -> Result { - let data_type = &self.data_type()?; + // Inner type is not important for buffer length. + let data_type = &self.data_type(None)?; Ok(match (data_type, i) { (DataType::Utf8, 1) @@ -689,8 +698,8 @@ impl ArrowArray { } /// the data_type as declared in the schema - pub fn data_type(&self) -> Result { - to_datatype(self.schema.format()) + pub fn data_type(&self, child_type: Option) -> Result { + to_datatype(self.schema.format(), child_type) } } @@ -814,12 +823,14 @@ mod tests { let data = ArrayData::try_from(array)?; let array = make_array(data); - // perform some operation + // downcast let array = array .as_any() .downcast_ref::>() .unwrap(); + dbg!(&array); + // verify let expected = GenericListArray::::from(list_data); assert_eq!(&array.value(0), &expected.value(0)); From 44efefbee9b6b1ec93547a9bda1a1574812c007e Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Thu, 25 Mar 2021 09:26:43 +0100 Subject: [PATCH 04/12] test round trip with pyarrow --- .../src/lib.rs | 11 ++++++++++ .../tests/test_sql.py | 20 +++++++++++++++++++ rust/arrow/src/ffi.rs | 3 ++- 3 files changed, 33 insertions(+), 1 deletion(-) diff --git a/rust/arrow-pyarrow-integration-testing/src/lib.rs b/rust/arrow-pyarrow-integration-testing/src/lib.rs index 951d52983f1..5b5462d9c15 100644 --- a/rust/arrow-pyarrow-integration-testing/src/lib.rs +++ b/rust/arrow-pyarrow-integration-testing/src/lib.rs @@ -167,11 +167,22 @@ fn concatenate(array: PyObject, py: Python) -> PyResult { to_py(array, py) } +/// Converts to rust and back to python +#[pyfunction] +fn round_trip(array: PyObject, py: Python) -> PyResult { + // import + let array = to_rust(array, py)?; + + // export + to_py(array, py) +} + #[pymodule] fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(double))?; m.add_wrapped(wrap_pyfunction!(double_py))?; m.add_wrapped(wrap_pyfunction!(substring))?; m.add_wrapped(wrap_pyfunction!(concatenate))?; + m.add_wrapped(wrap_pyfunction!(round_trip))?; Ok(()) } diff --git a/rust/arrow-pyarrow-integration-testing/tests/test_sql.py b/rust/arrow-pyarrow-integration-testing/tests/test_sql.py index c81ac4c635f..358dfcdd6b7 100644 --- a/rust/arrow-pyarrow-integration-testing/tests/test_sql.py +++ b/rust/arrow-pyarrow-integration-testing/tests/test_sql.py @@ -78,3 +78,23 @@ def test_time32_python(self): del expected # No leak of C++ memory self.assertEqual(old_allocated, pyarrow.total_allocated_bytes()) + + def test_list_array(self): + """ + Python -> Rust -> Python + """ + old_allocated = pyarrow.total_allocated_bytes() + a = pyarrow.array([[1, 2], [1, 2]], pyarrow.list_(pyarrow.int64())) + b = arrow_pyarrow_integration_testing.round_trip(a) + # list equality does not work, so we check the elements + self.assertEqual(a[0][0], b[0][0]) + self.assertEqual(a[0][1], b[0][1]) + self.assertEqual(a[1][0], b[1][0]) + self.assertEqual(a[1][1], b[1][1]) + del a + del b + # No leak of C++ memory + self.assertEqual(old_allocated, pyarrow.total_allocated_bytes()) + + + diff --git a/rust/arrow/src/ffi.rs b/rust/arrow/src/ffi.rs index b278cc68069..1bfe8eb2c61 100644 --- a/rust/arrow/src/ffi.rs +++ b/rust/arrow/src/ffi.rs @@ -134,7 +134,8 @@ impl FFI_ArrowSchema { // FFI_ArrowSchema { format: CString::new(format).unwrap().into_raw(), - name: std::ptr::null_mut(), + // For child data a non null string is expected + name: CString::new("").unwrap().into_raw(), metadata: std::ptr::null_mut(), flags: 0, n_children, From 24e88b529e57b05b8c582284a17ee3bdc489334e Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sun, 28 Mar 2021 09:22:07 +0200 Subject: [PATCH 05/12] process comments nevi-me --- rust/arrow/src/ffi.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/rust/arrow/src/ffi.rs b/rust/arrow/src/ffi.rs index 1bfe8eb2c61..314bd1c28c5 100644 --- a/rust/arrow/src/ffi.rs +++ b/rust/arrow/src/ffi.rs @@ -77,6 +77,7 @@ To export an array, create an `ArrowArray` using [ArrowArray::try_new]. */ use std::{ + convert::TryFrom, ffi::CStr, ffi::CString, iter, @@ -90,7 +91,6 @@ use crate::buffer::Buffer; use crate::datatypes::{DataType, Field, TimeUnit}; use crate::error::{ArrowError, Result}; use crate::util::bit_util; -use std::convert::TryFrom; /// ABI-compatible struct for `ArrowSchema` from C Data Interface /// See @@ -210,12 +210,12 @@ fn to_datatype(format: &str, child_type: Option) -> Result { // at that point the child data is not yet known, but it is also not required to determine // the buffer length of the list arrays. "+l" => DataType::List(Box::new(Field::new( - "", + "item", child_type.unwrap_or(DataType::Null), false, ))), "+L" => DataType::LargeList(Box::new(Field::new( - "", + "item", child_type.unwrap_or(DataType::Null), false, ))), @@ -674,6 +674,7 @@ impl ArrowArray { .collect() } + /// returns the child data of this array pub fn children(&self) -> Result> { unsafe { create_child_arrays(self.array.clone(), self.schema.clone()) } } From dd50114b6f55fe7f9b15d9cf1f346aff60e982dd Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sun, 28 Mar 2021 20:53:40 +0200 Subject: [PATCH 06/12] set nullable to true --- rust/arrow/src/ffi.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/arrow/src/ffi.rs b/rust/arrow/src/ffi.rs index 314bd1c28c5..55f293bd336 100644 --- a/rust/arrow/src/ffi.rs +++ b/rust/arrow/src/ffi.rs @@ -212,12 +212,12 @@ fn to_datatype(format: &str, child_type: Option) -> Result { "+l" => DataType::List(Box::new(Field::new( "item", child_type.unwrap_or(DataType::Null), - false, + true, ))), "+L" => DataType::LargeList(Box::new(Field::new( "item", child_type.unwrap_or(DataType::Null), - false, + true, ))), dt => { return Err(ArrowError::CDataInterface(format!( From 7101de3218fd23e59b8f495882626ed3088dc8e8 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 29 Mar 2021 15:42:33 +0200 Subject: [PATCH 07/12] update test list ffi --- .../tests/test_sql.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/rust/arrow-pyarrow-integration-testing/tests/test_sql.py b/rust/arrow-pyarrow-integration-testing/tests/test_sql.py index 358dfcdd6b7..64e55ea4f09 100644 --- a/rust/arrow-pyarrow-integration-testing/tests/test_sql.py +++ b/rust/arrow-pyarrow-integration-testing/tests/test_sql.py @@ -84,13 +84,11 @@ def test_list_array(self): Python -> Rust -> Python """ old_allocated = pyarrow.total_allocated_bytes() - a = pyarrow.array([[1, 2], [1, 2]], pyarrow.list_(pyarrow.int64())) + a = pyarrow.array([[], None, [1, 2], [4, 5, 6]], pyarrow.list_(pyarrow.int64())) b = arrow_pyarrow_integration_testing.round_trip(a) - # list equality does not work, so we check the elements - self.assertEqual(a[0][0], b[0][0]) - self.assertEqual(a[0][1], b[0][1]) - self.assertEqual(a[1][0], b[1][0]) - self.assertEqual(a[1][1], b[1][1]) + + b.validate(full=True) + assert a.to_pylist() == b.to_pylist() del a del b # No leak of C++ memory From 4fe20df57d38c77793edf485069aa5c5f73034cf Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 29 Mar 2021 16:08:03 +0200 Subject: [PATCH 08/12] default to nullable data types --- rust/arrow-pyarrow-integration-testing/tests/test_sql.py | 1 + rust/arrow/src/ffi.rs | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/rust/arrow-pyarrow-integration-testing/tests/test_sql.py b/rust/arrow-pyarrow-integration-testing/tests/test_sql.py index 64e55ea4f09..c0de382057c 100644 --- a/rust/arrow-pyarrow-integration-testing/tests/test_sql.py +++ b/rust/arrow-pyarrow-integration-testing/tests/test_sql.py @@ -89,6 +89,7 @@ def test_list_array(self): b.validate(full=True) assert a.to_pylist() == b.to_pylist() + assert a.type == b.type del a del b # No leak of C++ memory diff --git a/rust/arrow/src/ffi.rs b/rust/arrow/src/ffi.rs index 55f293bd336..c0b78103b55 100644 --- a/rust/arrow/src/ffi.rs +++ b/rust/arrow/src/ffi.rs @@ -134,10 +134,11 @@ impl FFI_ArrowSchema { // FFI_ArrowSchema { format: CString::new(format).unwrap().into_raw(), - // For child data a non null string is expected - name: CString::new("").unwrap().into_raw(), + // For child data a non null string is expected and is called item + name: CString::new("item").unwrap().into_raw(), metadata: std::ptr::null_mut(), - flags: 0, + // default to nullable + flags: 2, n_children, children: children_ptr, dictionary: std::ptr::null_mut(), From ef14a9952c5bad85c898e3c238afed49cae00065 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 29 Mar 2021 16:32:37 +0200 Subject: [PATCH 09/12] pass nullable flag to C++ --- rust/arrow/src/array/ffi.rs | 33 ++++++++++++++++++++++++++++++++- rust/arrow/src/ffi.rs | 14 ++++++++++---- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/rust/arrow/src/array/ffi.rs b/rust/arrow/src/array/ffi.rs index 55cd5530e6a..450685bf522 100644 --- a/rust/arrow/src/array/ffi.rs +++ b/rust/arrow/src/array/ffi.rs @@ -25,6 +25,7 @@ use crate::{ }; use super::ArrayData; +use crate::datatypes::DataType; use crate::ffi::ArrowArray; impl TryFrom for ArrayData { @@ -63,6 +64,14 @@ impl TryFrom for ffi::ArrowArray { type Error = ArrowError; fn try_from(value: ArrayData) -> Result { + // If parent is nullable, then children also must be nullable + // so we pass this nullable to the creation of hte child data + let nullable = match value.data_type() { + DataType::List(field) => field.is_nullable(), + DataType::LargeList(field) => field.is_nullable(), + _ => false, + }; + let len = value.len(); let offset = value.offset() as usize; let null_count = value.null_count(); @@ -71,7 +80,28 @@ impl TryFrom for ffi::ArrowArray { let child_data = value .child_data() .iter() - .map(|arr| ArrowArray::try_from((*arr).clone()).expect("infallible")) + .map(|arr| { + let len = arr.len(); + let offset = arr.offset() as usize; + let null_count = arr.null_count(); + let buffers = arr.buffers().to_vec(); + let null_buffer = arr.null_buffer().cloned(); + + // Note: the nullable comes from the parent data. + unsafe { + ArrowArray::try_new( + arr.data_type(), + len, + null_count, + null_buffer, + offset, + buffers, + vec![], + nullable, + ) + .expect("infallible") + } + }) .collect::>(); unsafe { @@ -83,6 +113,7 @@ impl TryFrom for ffi::ArrowArray { offset, buffers, child_data, + nullable, ) } } diff --git a/rust/arrow/src/ffi.rs b/rust/arrow/src/ffi.rs index c0b78103b55..f808b80f51c 100644 --- a/rust/arrow/src/ffi.rs +++ b/rust/arrow/src/ffi.rs @@ -125,11 +125,17 @@ struct SchemaPrivateData { impl FFI_ArrowSchema { /// create a new [FFI_ArrowSchema] from a format. - fn new(format: &str, children: Vec<*mut FFI_ArrowSchema>) -> FFI_ArrowSchema { + fn new( + format: &str, + children: Vec<*mut FFI_ArrowSchema>, + nullable: bool, + ) -> FFI_ArrowSchema { let children = children.into_boxed_slice(); let n_children = children.len() as i64; let children_ptr = children.as_ptr() as *mut *mut FFI_ArrowSchema; + let flags = if nullable { 2 } else { 0 }; + let private_data = Box::new(SchemaPrivateData { children }); // FFI_ArrowSchema { @@ -137,8 +143,7 @@ impl FFI_ArrowSchema { // For child data a non null string is expected and is called item name: CString::new("item").unwrap().into_raw(), metadata: std::ptr::null_mut(), - // default to nullable - flags: 2, + flags, n_children, children: children_ptr, dictionary: std::ptr::null_mut(), @@ -525,6 +530,7 @@ impl ArrowArray { offset: usize, buffers: Vec, child_data: Vec, + nullable: bool, ) -> Result { let format = from_datatype(data_type)?; // * insert the null buffer at the start @@ -542,7 +548,7 @@ impl ArrowArray { ffi_arrow_schemas.push(schema as *mut FFI_ArrowSchema); }); - let schema = Arc::new(FFI_ArrowSchema::new(&format, ffi_arrow_schemas)); + let schema = Arc::new(FFI_ArrowSchema::new(&format, ffi_arrow_schemas, nullable)); let array = Arc::new(FFI_ArrowArray::new( len as i64, null_count as i64, From d7c599cd1471d400134b3609f6a89e030a88aed1 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 29 Mar 2021 18:17:16 +0200 Subject: [PATCH 10/12] use schema information to parameterize list dtypes/fields --- rust/arrow/src/ffi.rs | 52 +++++++++++++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/rust/arrow/src/ffi.rs b/rust/arrow/src/ffi.rs index f808b80f51c..61520f10c67 100644 --- a/rust/arrow/src/ffi.rs +++ b/rust/arrow/src/ffi.rs @@ -91,6 +91,8 @@ use crate::buffer::Buffer; use crate::datatypes::{DataType, Field, TimeUnit}; use crate::error::{ArrowError, Result}; use crate::util::bit_util; +use std::mem::ManuallyDrop; +use std::os::raw::c_char; /// ABI-compatible struct for `ArrowSchema` from C Data Interface /// See @@ -186,7 +188,11 @@ impl Drop for FFI_ArrowSchema { /// maps a DataType `format` to a [DataType](arrow::datatypes::DataType). /// See https://arrow.apache.org/docs/format/CDataInterface.html#data-type-description-format-strings -fn to_datatype(format: &str, child_type: Option) -> Result { +fn to_datatype( + format: &str, + child_type: Option, + schema: &FFI_ArrowSchema, +) -> Result { Ok(match format { "n" => DataType::Null, "b" => DataType::Boolean, @@ -215,16 +221,38 @@ fn to_datatype(format: &str, child_type: Option) -> Result { // Note: The datatype null will only be created when called from ArrowArray::buffer_len // at that point the child data is not yet known, but it is also not required to determine // the buffer length of the list arrays. - "+l" => DataType::List(Box::new(Field::new( - "item", - child_type.unwrap_or(DataType::Null), - true, - ))), - "+L" => DataType::LargeList(Box::new(Field::new( - "item", - child_type.unwrap_or(DataType::Null), - true, - ))), + "+l" => { + let nullable = schema.flags == 2; + // Safety + // Should be set as this is expected from the C FFI definition + debug_assert!(!schema.name.is_null()); + let name = unsafe { CString::from_raw(schema.name as *mut c_char) } + .into_string() + .unwrap(); + // prevent a double free + let name = ManuallyDrop::new(name); + DataType::List(Box::new(Field::new( + &name, + child_type.unwrap_or(DataType::Null), + nullable, + ))) + } + "+L" => { + let nullable = schema.flags == 2; + // Safety + // Should be set as this is expected from the C FFI definition + debug_assert!(!schema.name.is_null()); + let name = unsafe { CString::from_raw(schema.name as *mut c_char) } + .into_string() + .unwrap(); + // prevent a double free + let name = ManuallyDrop::new(name); + DataType::LargeList(Box::new(Field::new( + &name, + child_type.unwrap_or(DataType::Null), + nullable, + ))) + } dt => { return Err(ArrowError::CDataInterface(format!( "The datatype \"{}\" is not supported in the Rust implementation", @@ -708,7 +736,7 @@ impl ArrowArray { /// the data_type as declared in the schema pub fn data_type(&self, child_type: Option) -> Result { - to_datatype(self.schema.format(), child_type) + to_datatype(self.schema.format(), child_type, self.schema.as_ref()) } } From 104dc1f3d6d516c1d87475980c5ecd3d96d92069 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 29 Mar 2021 18:21:52 +0200 Subject: [PATCH 11/12] group imports --- rust/arrow/src/ffi.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rust/arrow/src/ffi.rs b/rust/arrow/src/ffi.rs index 61520f10c67..6f09558d9ef 100644 --- a/rust/arrow/src/ffi.rs +++ b/rust/arrow/src/ffi.rs @@ -81,7 +81,8 @@ use std::{ ffi::CStr, ffi::CString, iter, - mem::size_of, + mem::{size_of, ManuallyDrop}, + os::raw::c_char, ptr::{self, NonNull}, sync::Arc, }; @@ -91,8 +92,6 @@ use crate::buffer::Buffer; use crate::datatypes::{DataType, Field, TimeUnit}; use crate::error::{ArrowError, Result}; use crate::util::bit_util; -use std::mem::ManuallyDrop; -use std::os::raw::c_char; /// ABI-compatible struct for `ArrowSchema` from C Data Interface /// See From acd93b8ef2de77759dd5ac3e5ff868a508de6c22 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Fri, 2 Apr 2021 14:09:26 +0200 Subject: [PATCH 12/12] clippy --- rust/arrow/src/ffi.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/arrow/src/ffi.rs b/rust/arrow/src/ffi.rs index 6f09558d9ef..3a6d031ebd8 100644 --- a/rust/arrow/src/ffi.rs +++ b/rust/arrow/src/ffi.rs @@ -549,6 +549,7 @@ impl ArrowArray { /// creates a new `ArrowArray`. This is used to export to the C Data Interface. /// # Safety /// See safety of [ArrowArray] + #[allow(clippy::too_many_arguments)] pub unsafe fn try_new( data_type: &DataType, len: usize,