From 8e3055b71b44d26cb3eff9c5cafcebb4edbe39fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Tue, 29 Mar 2022 11:47:06 +0200 Subject: [PATCH 1/6] Alternative implementation of nullif kernel by slicing nested buffers --- arrow/src/compute/kernels/boolean.rs | 396 ++++++++++++++++++++++++++- arrow/src/compute/util.rs | 49 ++-- 2 files changed, 413 insertions(+), 32 deletions(-) diff --git a/arrow/src/compute/kernels/boolean.rs b/arrow/src/compute/kernels/boolean.rs index fd3539455b5a..8145ad16bab8 100644 --- a/arrow/src/compute/kernels/boolean.rs +++ b/arrow/src/compute/kernels/boolean.rs @@ -24,12 +24,14 @@ use std::ops::Not; -use crate::array::{Array, ArrayData, BooleanArray, PrimitiveArray}; +use crate::array::{ + make_array, Array, ArrayData, ArrayRef, BooleanArray, OffsetSizeTrait, PrimitiveArray, +}; use crate::buffer::{ buffer_bin_and, buffer_bin_or, buffer_unary_not, Buffer, MutableBuffer, }; -use crate::compute::util::combine_option_bitmap; -use crate::datatypes::{ArrowNumericType, DataType}; +use crate::compute::util::{combine_option_bitmap, combine_option_buffers}; +use crate::datatypes::{ArrowNumericType, DataType, IntervalUnit}; use crate::error::{ArrowError, Result}; use crate::util::bit_util::{ceil, round_upto_multiple_of_64}; use core::iter; @@ -575,10 +577,242 @@ where Ok(PrimitiveArray::::from(data)) } +/// Creates a (mostly) zero-copy slice of the given buffers so that they can be combined +/// in the same array with other buffers that start at offset 0. +/// The only buffers that need an actual copy are booleans (if they are not byte-aligned) +/// and list/binary/string offsets because the arrow implementation requires them to start at 0. +/// This is useful when a kernel calculates a new validity bitmap but wants to reuse other buffers. +fn slice_buffers( + buffers: &[Buffer], + offset: usize, + len: usize, + data_type: &DataType, + child_data: &[ArrayData], +) -> (Vec, Vec) { + use std::mem::size_of; + + if offset == 0 { + return (buffers.to_vec(), child_data.to_vec()); + } + + // we only need to do something special to child data in 2 of the match branches + let mut result_child_data = None; + + let result_buffers = match data_type { + DataType::Boolean => vec![buffers[0].bit_slice(offset, len)], + DataType::Int8 | DataType::UInt8 => { + vec![buffers[0].slice(offset * size_of::())] + } + DataType::Int16 | DataType::UInt16 | DataType::Float16 => { + vec![buffers[0].slice(offset * size_of::())] + } + DataType::Int32 | DataType::UInt32 | DataType::Float32 => { + vec![buffers[0].slice(offset * size_of::())] + } + DataType::Int64 | DataType::UInt64 | DataType::Float64 => { + vec![buffers[0].slice(offset * size_of::())] + } + DataType::Timestamp(_, _) | DataType::Duration(_) => { + vec![buffers[0].slice(offset * size_of::())] + } + DataType::Date32 | DataType::Time32(_) => { + vec![buffers[0].slice(offset * size_of::())] + } + DataType::Date64 | DataType::Time64(_) => { + vec![buffers[0].slice(offset * size_of::())] + } + DataType::Interval(IntervalUnit::YearMonth) => { + vec![buffers[0].slice(offset * size_of::())] + } + DataType::Interval(IntervalUnit::DayTime) => { + vec![buffers[0].slice(offset * 2 * size_of::())] + } + DataType::Interval(IntervalUnit::MonthDayNano) => { + vec![buffers[0].slice(offset * (2 * size_of::() + size_of::()))] + } + DataType::Decimal(_, _) => vec![buffers[0].slice(offset * size_of::())], + DataType::Dictionary(key_type, _) => match key_type.as_ref() { + DataType::Int8 | DataType::UInt8 => { + vec![buffers[0].slice(offset * size_of::())] + } + DataType::Int16 | DataType::UInt16 => { + vec![buffers[0].slice(offset * size_of::())] + } + DataType::Int32 | DataType::UInt32 => { + vec![buffers[0].slice(offset * size_of::())] + } + DataType::Int64 | DataType::UInt64 => { + vec![buffers[0].slice(offset * size_of::())] + } + _ => unreachable!(), + }, + DataType::List(_) => { + // safe because for List the first buffer is guaranteed to contain i32 offsets + let offsets = unsafe { &buffers[0].typed_data()[offset..] }; + let first_offset = offsets[0] as usize; + let last_offset = offsets[len] as usize; + let nested_len = last_offset - first_offset; + + // since we calculate a new offset buffer starting from 0 we also have to slice the child data + result_child_data = Some( + child_data + .iter() + .map(|d| d.slice(first_offset, nested_len)) + .collect(), + ); + vec![offset_buffer_slice::(offsets, len)] + } + DataType::LargeList(_) => { + // safe because for LargeList the first buffer is guaranteed to contain i64 offsets + let offsets = unsafe { &buffers[0].typed_data()[offset..] }; + let first_offset = offsets[0] as usize; + let last_offset = offsets[len] as usize; + let nested_len = last_offset - first_offset; + // since we calculate a new offset buffer starting from 0 we also have to slice the child data + + result_child_data = Some( + child_data + .iter() + .map(|d| d.slice(first_offset, nested_len)) + .collect(), + ); + vec![offset_buffer_slice::(offsets, len)] + } + DataType::Binary | DataType::Utf8 => { + // safe because for Binary/Utf8 the first buffer is guaranteed to contain i32 offsets + let offsets = unsafe { &buffers[0].typed_data()[offset..] }; + let first_offset = offsets[0] as usize; + vec![ + offset_buffer_slice::(offsets, len), + buffers[1].slice(first_offset * size_of::()), + ] + } + DataType::LargeBinary | DataType::LargeUtf8 => { + // safe because for LargeBinary/LargeUtf8 the first buffer is guaranteed to contain i64 offsets + let offsets = unsafe { &buffers[0].typed_data()[offset..] }; + let first_offset = offsets[0] as usize; + vec![ + offset_buffer_slice::(offsets, len), + buffers[1].slice(first_offset * size_of::()), + ] + } + DataType::FixedSizeBinary(size) => { + vec![buffers[0].slice(offset * (*size as usize))] + } + DataType::FixedSizeList(_, _size) => { + // TODO: should this actually slice the child arrays? + vec![] + } + DataType::Struct(_) => { + // TODO: should this actually slice the child arrays? + vec![] + } + DataType::Union(..) => { + // TODO: verify this is actually correct + if buffers.len() > 1 { + // dense union, type ids and offsets + vec![ + buffers[0].slice(offset * size_of::()), + buffers[1].slice(offset * size_of::()), + ] + } else { + // sparse union with only type ids + // should this also slice the child arrays? + vec![buffers[0].slice(offset * size_of::())] + } + } + DataType::Map(_, _) => { + // TODO: verify this is actually correct + result_child_data = + Some(child_data.iter().map(|d| d.slice(offset, len)).collect()); + vec![] + } + DataType::Null => vec![], + }; + + ( + result_buffers, + result_child_data.unwrap_or_else(|| child_data.to_vec()), + ) +} + +// ListArray::from(ArrayData) expected offsets to always start at 0 so we can't simply make a slice of the buffer, +// but instead have to calculate a new buffer +fn offset_buffer_slice( + original_offsets: &[T], + array_len: usize, +) -> Buffer { + // offset buffer has 1 element more than the corresponding data buffer pointing yet another offset for the end of the buffer + let len_in_bytes = (array_len + 1) * std::mem::size_of::(); + let mut offset_buffer = + MutableBuffer::new(len_in_bytes).with_bitset(len_in_bytes, false); + let offset_slice = unsafe { offset_buffer.typed_data_mut::() }; + let offset_start = original_offsets[0]; + offset_slice + .iter_mut() + .zip(original_offsets.iter()) + .for_each(|(output, input)| { + *output = *input - offset_start; + }); + + offset_buffer.into() +} + +pub fn nullif_alternative( + array: &ArrayRef, + condition: &BooleanArray, +) -> Result { + if array.len() != condition.len() { + return Err(ArrowError::ComputeError( + "Inputs to conditional kernels must have the same length".to_string(), + )); + } + + let condition_buffer = combine_option_buffers( + condition.data().buffers().first(), + condition.offset(), + condition.data().null_buffer(), + condition.offset(), + condition.len(), + ) + .map(|b| !&b); + + let result_valid_buffer = combine_option_buffers( + condition_buffer.as_ref(), + 0, + array.data().null_buffer(), + array.offset(), + condition.len(), + ); + + let (result_data_buffers, result_child_data) = slice_buffers( + array.data().buffers(), + array.offset(), + array.len(), + array.data_type(), + array.data().child_data(), + ); + + let data = unsafe { + ArrayData::new_unchecked( + array.data_type().clone(), + condition.len(), + None, + result_valid_buffer, + 0, + result_data_buffers, + result_child_data, + ) + }; + + Ok(make_array(data)) +} + #[cfg(test)] mod tests { use super::*; - use crate::array::{ArrayRef, Int32Array}; + use crate::array::{ArrayRef, DictionaryArray, Int32Array, ListArray}; + use crate::datatypes::{Float64Type, Int32Type}; use std::sync::Arc; #[test] @@ -1139,12 +1373,18 @@ mod tests { #[test] fn test_nullif_int_array() { - let a = Int32Array::from(vec![Some(15), None, Some(8), Some(1), Some(9)]); + let a: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(15), + None, + Some(8), + Some(1), + Some(9), + ])); let comp = BooleanArray::from(vec![Some(false), None, Some(true), Some(false), None]); - let res = nullif(&a, &comp).unwrap(); + let res = nullif_alternative(&a, &comp).unwrap(); - let expected = Int32Array::from(vec![ + let expected: ArrayRef = Arc::new(Int32Array::from(vec![ Some(15), None, None, // comp true, slot 2 turned into null @@ -1152,16 +1392,16 @@ mod tests { // Even though comp array / right is null, should still pass through original value // comp true, slot 2 turned into null Some(9), - ]); + ])); - assert_eq!(expected, res); + assert_eq!(&expected, &res); } #[test] fn test_nullif_int_array_offset() { let a = Int32Array::from(vec![None, Some(15), Some(8), Some(1), Some(9)]); let a = a.slice(1, 3); // Some(15), Some(8), Some(1) - let a = a.as_any().downcast_ref::().unwrap(); + // let a = a.as_any().downcast_ref::().unwrap(); let comp = BooleanArray::from(vec![ Some(false), Some(false), @@ -1173,13 +1413,143 @@ mod tests { ]); let comp = comp.slice(2, 3); // Some(false), None, Some(true) let comp = comp.as_any().downcast_ref::().unwrap(); - let res = nullif(a, comp).unwrap(); + let res = nullif_alternative(&a, comp).unwrap(); - let expected = Int32Array::from(vec![ + let expected: ArrayRef = Arc::new(Int32Array::from(vec![ Some(15), // False => keep it Some(8), // None => keep it None, // true => None - ]); + ])); assert_eq!(&expected, &res) } + + #[test] + fn test_nullif_dict() { + let array = DictionaryArray::::from_iter(["a", "b", "c"]); + let array_ref = Arc::new(array) as ArrayRef; + let condition = BooleanArray::from(vec![true, false, true]); + + let result = nullif_alternative(&array_ref, &condition).unwrap(); + let result = result + .as_any() + .downcast_ref::>() + .unwrap(); + let expected_keys = &[None, Some(1), None] + .iter() + .collect::>(); + let actual_keys = result.keys(); + + assert_eq!(expected_keys, actual_keys); + } + + #[test] + fn test_nullif_dict_sliced() { + let array = + DictionaryArray::::from_iter(["a", "b", "c", "b", "c", "b"]); + let array_ref = Arc::new(array) as ArrayRef; + let array_ref = array_ref.slice(3, 3); + let condition = BooleanArray::from(vec![true, true, false]); + + let result = nullif_alternative(&array_ref, &condition).unwrap(); + let result = result + .as_any() + .downcast_ref::>() + .unwrap(); + let expected_keys = &[None, None, Some(1)] + .iter() + .collect::>(); + let actual_keys = result.keys(); + + assert_eq!(expected_keys, actual_keys); + } + + #[test] + fn test_nullif_dict_cond_sliced() { + let array = DictionaryArray::::from_iter(["a", "b", "c"]); + let array_ref = Arc::new(array) as ArrayRef; + let condition = BooleanArray::from(vec![true, false, true, true, true, false]); + let condition = condition.slice(3, 3); + let condition = condition.as_any().downcast_ref::().unwrap(); + + let result = nullif_alternative(&array_ref, condition).unwrap(); + let result = result + .as_any() + .downcast_ref::>() + .unwrap(); + let expected_keys = &[None, None, Some(2)] + .iter() + .collect::>(); + let actual_keys = result.keys(); + + assert_eq!(expected_keys, actual_keys); + } + + #[test] + fn test_nullif_list() { + let array = ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1.0), Some(2.0), Some(3.0)]), + None, + Some(vec![Some(4.0), None, Some(6.0)]), + Some(vec![]), + Some(vec![None, Some(8.0), None]), + ]); + let array_ref = Arc::new(array) as ArrayRef; + let condition = BooleanArray::from(vec![true, false, false, false, true]); + + let result = nullif_alternative(&array_ref, &condition).unwrap(); + let result = result.as_any().downcast_ref::().unwrap(); + + let expected = ListArray::from_iter_primitive::(vec![ + None, + None, + Some(vec![Some(4.0), None, Some(6.0)]), + Some(vec![]), + None, + ]); + let expected = &expected; + + // simple assert_eq on ListArrays does not work if their offsets are different + // see https://github.com/apache/arrow-rs/issues/626 + assert_eq!(expected.len(), result.len()); + for i in 0..expected.len() { + assert_eq!(expected.is_valid(i), result.is_valid(i)); + if expected.is_valid(i) { + assert_eq!(&expected.value(i), &result.value(i)); + } + } + } + + #[test] + fn test_nullif_list_sliced() { + let array = ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1.0), Some(2.0), Some(3.0)]), + None, + Some(vec![Some(4.0), None, Some(6.0)]), + Some(vec![]), + Some(vec![None, Some(8.0), None]), + ]); + let array_ref = array.slice(1, 4); + let condition = BooleanArray::from(vec![false, false, false, true]); + + let result = nullif_alternative(&array_ref, &condition).unwrap(); + let result = result.as_any().downcast_ref::().unwrap(); + + let expected = ListArray::from_iter_primitive::(vec![ + None, + Some(vec![Some(4.0), None, Some(6.0)]), + Some(vec![]), + None, + ]); + let expected = &expected; + + // simple assert_eq on ListArrays does not work if their offsets are different + // see https://github.com/apache/arrow-rs/issues/626 + assert_eq!(expected.len(), result.len()); + for i in 0..expected.len() { + assert_eq!(expected.is_valid(i), result.is_valid(i)); + if expected.is_valid(i) { + assert_eq!(&expected.value(i), &result.value(i)); + } + } + } } diff --git a/arrow/src/compute/util.rs b/arrow/src/compute/util.rs index 62c3be62f55a..e5643ad6094d 100644 --- a/arrow/src/compute/util.rs +++ b/arrow/src/compute/util.rs @@ -36,25 +36,36 @@ pub(super) fn combine_option_bitmap( let left_offset_in_bits = left_data.offset(); let right_offset_in_bits = right_data.offset(); - let left = left_data.null_buffer(); - let right = right_data.null_buffer(); - - match left { - None => match right { - None => Ok(None), - Some(r) => Ok(Some(r.bit_slice(right_offset_in_bits, len_in_bits))), - }, - Some(l) => match right { - None => Ok(Some(l.bit_slice(left_offset_in_bits, len_in_bits))), - - Some(r) => Ok(Some(buffer_bin_and( - l, - left_offset_in_bits, - r, - right_offset_in_bits, - len_in_bits, - ))), - }, + let left_null_buffer = left_data.null_buffer(); + let right_null_buffer = right_data.null_buffer(); + + Ok(combine_option_buffers( + left_null_buffer, + left_offset_in_bits, + right_null_buffer, + right_offset_in_bits, + len_in_bits, + )) +} + +pub(super) fn combine_option_buffers( + left_null_buffer: Option<&Buffer>, + left_offset_in_bits: usize, + right_null_buffer: Option<&Buffer>, + right_offset_in_bits: usize, + len_in_bits: usize, +) -> Option { + match (left_null_buffer, right_null_buffer) { + (None, None) => None, + (None, Some(r)) => Some(r.bit_slice(right_offset_in_bits, len_in_bits)), + (Some(l), None) => Some(l.bit_slice(left_offset_in_bits, len_in_bits)), + (Some(l), Some(r)) => Some(buffer_bin_and( + l, + left_offset_in_bits, + r, + right_offset_in_bits, + len_in_bits, + )), } } From 239a9a2471351de6b6db85407e189f39a5d6564b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Thu, 31 Mar 2022 14:56:59 +0200 Subject: [PATCH 2/6] Remove requirement that offsets start at zero so that the offset buffer can be sliced without copying --- arrow/src/array/array_list.rs | 64 +++++++++++++++------------- arrow/src/array/array_map.rs | 7 --- arrow/src/compute/kernels/boolean.rs | 28 +----------- 3 files changed, 37 insertions(+), 62 deletions(-) diff --git a/arrow/src/array/array_list.rs b/arrow/src/array/array_list.rs index e918683583bd..b8ecd49aff65 100644 --- a/arrow/src/array/array_list.rs +++ b/arrow/src/array/array_list.rs @@ -241,13 +241,6 @@ impl GenericListArray { let value_offsets = data.buffers()[0].as_ptr(); let value_offsets = unsafe { RawPtrBox::::new(value_offsets) }; - unsafe { - if !(*value_offsets.as_ptr().offset(0)).is_zero() { - return Err(ArrowError::InvalidArgumentError(String::from( - "offsets do not start at zero", - ))); - } - } Ok(Self { data, values, @@ -1027,6 +1020,41 @@ mod tests { assert_eq!(8, sliced_list_array.value_offset(3)); } + #[test] + fn test_list_array_offsets_need_not_start_at_zero() { + let value_data = ArrayData::builder(DataType::Int32) + .len(8) + .add_buffer(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7])) + .build() + .unwrap(); + + let value_offsets = Buffer::from_slice_ref(&[2, 2, 5, 7]); + + let list_data_type = + DataType::List(Box::new(Field::new("item", DataType::Int32, false))); + let list_data = ArrayData::builder(list_data_type) + .len(3) + .add_buffer(value_offsets) + .add_child_data(value_data) + .build() + .unwrap(); + + let list_array = make_array(list_data); + let list_array = list_array.as_any().downcast_ref::().unwrap(); + + assert_eq!(list_array.len(), 3); + + assert!(list_array.value(0).is_empty()); + assert_eq!( + list_array.value(1).as_ref(), + &PrimitiveArray::::from(vec![2, 3, 4]) + ); + assert_eq!( + list_array.value(2).as_ref(), + &PrimitiveArray::::from(vec![5, 6]) + ); + } + #[test] #[should_panic(expected = "assertion failed: (offset + length) <= self.len()")] fn test_fixed_size_list_array_index_out_of_bound() { @@ -1100,28 +1128,6 @@ mod tests { drop(ListArray::from(list_data)); } - #[test] - #[should_panic(expected = "offsets do not start at zero")] - fn test_list_array_invalid_value_offset_start() { - let value_data = ArrayData::builder(DataType::Int32) - .len(8) - .add_buffer(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7])) - .build() - .unwrap(); - - let value_offsets = Buffer::from_slice_ref(&[2, 2, 5, 7]); - - let list_data_type = - DataType::List(Box::new(Field::new("item", DataType::Int32, false))); - let list_data = ArrayData::builder(list_data_type) - .len(3) - .add_buffer(value_offsets) - .add_child_data(value_data) - .build() - .unwrap(); - drop(ListArray::from(list_data)); - } - #[test] #[should_panic(expected = "memory is not aligned")] fn test_primitive_array_alignment() { diff --git a/arrow/src/array/array_map.rs b/arrow/src/array/array_map.rs index 704c11149147..e13b5fcbb403 100644 --- a/arrow/src/array/array_map.rs +++ b/arrow/src/array/array_map.rs @@ -139,13 +139,6 @@ impl MapArray { let value_offsets = data.buffers()[0].as_ptr(); let value_offsets = unsafe { RawPtrBox::::new(value_offsets) }; - unsafe { - if (*value_offsets.as_ptr().offset(0)) != 0 { - return Err(ArrowError::InvalidArgumentError(String::from( - "offsets do not start at zero", - ))); - } - } Ok(Self { data, values, diff --git a/arrow/src/compute/kernels/boolean.rs b/arrow/src/compute/kernels/boolean.rs index 8145ad16bab8..04b5503953cf 100644 --- a/arrow/src/compute/kernels/boolean.rs +++ b/arrow/src/compute/kernels/boolean.rs @@ -648,35 +648,11 @@ fn slice_buffers( }, DataType::List(_) => { // safe because for List the first buffer is guaranteed to contain i32 offsets - let offsets = unsafe { &buffers[0].typed_data()[offset..] }; - let first_offset = offsets[0] as usize; - let last_offset = offsets[len] as usize; - let nested_len = last_offset - first_offset; - - // since we calculate a new offset buffer starting from 0 we also have to slice the child data - result_child_data = Some( - child_data - .iter() - .map(|d| d.slice(first_offset, nested_len)) - .collect(), - ); - vec![offset_buffer_slice::(offsets, len)] + vec![buffers[0].slice(offset * size_of::())] } DataType::LargeList(_) => { // safe because for LargeList the first buffer is guaranteed to contain i64 offsets - let offsets = unsafe { &buffers[0].typed_data()[offset..] }; - let first_offset = offsets[0] as usize; - let last_offset = offsets[len] as usize; - let nested_len = last_offset - first_offset; - // since we calculate a new offset buffer starting from 0 we also have to slice the child data - - result_child_data = Some( - child_data - .iter() - .map(|d| d.slice(first_offset, nested_len)) - .collect(), - ); - vec![offset_buffer_slice::(offsets, len)] + vec![buffers[0].slice(offset * size_of::())] } DataType::Binary | DataType::Utf8 => { // safe because for Binary/Utf8 the first buffer is guaranteed to contain i32 offsets From cbbfa77523b7b2631d229067289741ae7cabc0be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Thu, 31 Mar 2022 15:09:55 +0200 Subject: [PATCH 3/6] Also skip copying string offsets --- arrow/src/compute/kernels/boolean.rs | 67 +++++++++++++++------------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/arrow/src/compute/kernels/boolean.rs b/arrow/src/compute/kernels/boolean.rs index 04b5503953cf..5493316aaa50 100644 --- a/arrow/src/compute/kernels/boolean.rs +++ b/arrow/src/compute/kernels/boolean.rs @@ -25,7 +25,7 @@ use std::ops::Not; use crate::array::{ - make_array, Array, ArrayData, ArrayRef, BooleanArray, OffsetSizeTrait, PrimitiveArray, + make_array, Array, ArrayData, ArrayRef, BooleanArray, PrimitiveArray, }; use crate::buffer::{ buffer_bin_and, buffer_bin_or, buffer_unary_not, Buffer, MutableBuffer, @@ -656,20 +656,16 @@ fn slice_buffers( } DataType::Binary | DataType::Utf8 => { // safe because for Binary/Utf8 the first buffer is guaranteed to contain i32 offsets - let offsets = unsafe { &buffers[0].typed_data()[offset..] }; - let first_offset = offsets[0] as usize; vec![ - offset_buffer_slice::(offsets, len), - buffers[1].slice(first_offset * size_of::()), + buffers[0].slice(offset * size_of::()), + buffers[1].clone(), ] } DataType::LargeBinary | DataType::LargeUtf8 => { // safe because for LargeBinary/LargeUtf8 the first buffer is guaranteed to contain i64 offsets - let offsets = unsafe { &buffers[0].typed_data()[offset..] }; - let first_offset = offsets[0] as usize; vec![ - offset_buffer_slice::(offsets, len), - buffers[1].slice(first_offset * size_of::()), + buffers[0].slice(offset * size_of::()), + buffers[1].clone(), ] } DataType::FixedSizeBinary(size) => { @@ -712,28 +708,6 @@ fn slice_buffers( ) } -// ListArray::from(ArrayData) expected offsets to always start at 0 so we can't simply make a slice of the buffer, -// but instead have to calculate a new buffer -fn offset_buffer_slice( - original_offsets: &[T], - array_len: usize, -) -> Buffer { - // offset buffer has 1 element more than the corresponding data buffer pointing yet another offset for the end of the buffer - let len_in_bytes = (array_len + 1) * std::mem::size_of::(); - let mut offset_buffer = - MutableBuffer::new(len_in_bytes).with_bitset(len_in_bytes, false); - let offset_slice = unsafe { offset_buffer.typed_data_mut::() }; - let offset_start = original_offsets[0]; - offset_slice - .iter_mut() - .zip(original_offsets.iter()) - .for_each(|(output, input)| { - *output = *input - offset_start; - }); - - offset_buffer.into() -} - pub fn nullif_alternative( array: &ArrayRef, condition: &BooleanArray, @@ -787,7 +761,7 @@ pub fn nullif_alternative( #[cfg(test)] mod tests { use super::*; - use crate::array::{ArrayRef, DictionaryArray, Int32Array, ListArray}; + use crate::array::{ArrayRef, DictionaryArray, Int32Array, ListArray, StringArray}; use crate::datatypes::{Float64Type, Int32Type}; use std::sync::Arc; @@ -1528,4 +1502,33 @@ mod tests { } } } + + #[test] + fn test_nullif_strings() { + let array = StringArray::from(vec!["abc", "", "def", "gh", "ijkl"]); + let array_ref = Arc::new(array) as ArrayRef; + let condition = BooleanArray::from(vec![true, false, false, false, true]); + + let result = nullif_alternative(&array_ref, &condition).unwrap(); + let result = result.as_any().downcast_ref::().unwrap(); + + let expected = + StringArray::from(vec![None, Some(""), Some("def"), Some("gh"), None]); + + assert_eq!(result, &expected); + } + + #[test] + fn test_nullif_strings_sliced() { + let array = StringArray::from(vec!["abc", "", "def", "gh", "ijkl"]); + let array_ref = array.slice(1, 4); + let condition = BooleanArray::from(vec![false, false, false, true]); + + let result = nullif_alternative(&array_ref, &condition).unwrap(); + let result = result.as_any().downcast_ref::().unwrap(); + + let expected = StringArray::from(vec![Some(""), Some("def"), Some("gh"), None]); + + assert_eq!(result, &expected); + } } From bf41041341afa714790d1b9a29a207d1a666fc5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Thu, 31 Mar 2022 16:21:20 +0200 Subject: [PATCH 4/6] Implement nullif and slicing for FixedSizeList --- arrow/src/compute/kernels/boolean.rs | 130 +++++++++++++++++++++++++-- 1 file changed, 124 insertions(+), 6 deletions(-) diff --git a/arrow/src/compute/kernels/boolean.rs b/arrow/src/compute/kernels/boolean.rs index 5493316aaa50..dd7a16ee34eb 100644 --- a/arrow/src/compute/kernels/boolean.rs +++ b/arrow/src/compute/kernels/boolean.rs @@ -579,8 +579,7 @@ where /// Creates a (mostly) zero-copy slice of the given buffers so that they can be combined /// in the same array with other buffers that start at offset 0. -/// The only buffers that need an actual copy are booleans (if they are not byte-aligned) -/// and list/binary/string offsets because the arrow implementation requires them to start at 0. +/// The only buffers that need an actual copy are booleans and validity (if they are not byte-aligned). /// This is useful when a kernel calculates a new validity bitmap but wants to reuse other buffers. fn slice_buffers( buffers: &[Buffer], @@ -671,8 +670,19 @@ fn slice_buffers( DataType::FixedSizeBinary(size) => { vec![buffers[0].slice(offset * (*size as usize))] } - DataType::FixedSizeList(_, _size) => { - // TODO: should this actually slice the child arrays? + DataType::FixedSizeList(_field, size) => { + result_child_data = Some( + child_data + .iter() + .map(|d| { + slice_array_data( + d, + offset * (*size as usize), + len * (*size as usize), + ) + }) + .collect(), + ); vec![] } DataType::Struct(_) => { @@ -708,6 +718,36 @@ fn slice_buffers( ) } +fn slice_array_data(data: &ArrayData, offset: usize, len: usize) -> ArrayData { + assert!(offset + len < data.len()); + + if offset == 0 { + return data.clone(); + } + + let result_valid_buffer = data.null_buffer().map(|b| b.bit_slice(offset, len)); + + let (result_data_buffers, result_child_data) = slice_buffers( + data.buffers(), + data.offset() + offset, + len, + data.data_type(), + data.child_data(), + ); + + unsafe { + ArrayData::new_unchecked( + data.data_type().clone(), + len, + None, + result_valid_buffer, + 0, + result_data_buffers, + result_child_data, + ) + } +} + pub fn nullif_alternative( array: &ArrayRef, condition: &BooleanArray, @@ -761,8 +801,11 @@ pub fn nullif_alternative( #[cfg(test)] mod tests { use super::*; - use crate::array::{ArrayRef, DictionaryArray, Int32Array, ListArray, StringArray}; - use crate::datatypes::{Float64Type, Int32Type}; + use crate::array::{ + ArrayRef, DictionaryArray, FixedSizeBinaryArray, FixedSizeListArray, Int32Array, + ListArray, StringArray, + }; + use crate::datatypes::{Field, Float64Type, Int16Type, Int32Type}; use std::sync::Arc; #[test] @@ -1531,4 +1574,79 @@ mod tests { assert_eq!(result, &expected); } + + #[test] + fn test_nullif_fixed_size_binary_sliced() { + let array = FixedSizeBinaryArray::try_from_iter( + vec![b"abc", b"def", b"ghi", b"jkl"].into_iter(), + ) + .unwrap(); + let array_ref = array.slice(1, 2); + let condition = BooleanArray::from(vec![false, false]); + + let result = nullif_alternative(&array_ref, &condition).unwrap(); + let result = result + .as_any() + .downcast_ref::() + .unwrap(); + + let expected = FixedSizeBinaryArray::try_from_sparse_iter( + vec![Some(b"def"), Some(b"ghi")].into_iter(), + ) + .unwrap(); + + assert_eq!(result, &expected); + } + + #[test] + fn test_nullif_fixed_size_list_sliced() { + let primitives = PrimitiveArray::::from(vec![ + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, + ]); + let fixed_size_array = make_array( + ArrayData::try_new( + DataType::FixedSizeList( + Box::new(Field::new("item", primitives.data_type().clone(), true)), + 3, + ), + 5, + None, + None, + 0, + vec![], + vec![primitives.data().clone()], + ) + .unwrap(), + ); + let array_ref = fixed_size_array.slice(1, 3); + let condition = BooleanArray::from(vec![false, true, false]); + + let result = nullif_alternative(&array_ref, &condition).unwrap(); + let result = result + .as_any() + .downcast_ref::() + .unwrap(); + + let expected = make_array( + ArrayData::try_new( + DataType::FixedSizeList( + Box::new(Field::new("item", primitives.data_type().clone(), true)), + 3, + ), + 3, + None, + Some(MutableBuffer::from_iter(vec![true, false, true]).into()), + 0, + vec![], + vec![primitives.slice(3, 9).data().clone()], + ) + .unwrap(), + ); + let expected = expected + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(result, expected); + } } From d67842e433a1dc4f85e414ef637c4b925109979e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Thu, 31 Mar 2022 19:36:04 +0200 Subject: [PATCH 5/6] Implement nullif and slicing StructArray --- arrow/src/compute/kernels/boolean.rs | 85 ++++++++++++++++++++++++---- 1 file changed, 73 insertions(+), 12 deletions(-) diff --git a/arrow/src/compute/kernels/boolean.rs b/arrow/src/compute/kernels/boolean.rs index dd7a16ee34eb..2dab3c549a0d 100644 --- a/arrow/src/compute/kernels/boolean.rs +++ b/arrow/src/compute/kernels/boolean.rs @@ -686,7 +686,12 @@ fn slice_buffers( vec![] } DataType::Struct(_) => { - // TODO: should this actually slice the child arrays? + result_child_data = Some( + child_data + .iter() + .map(|d| slice_array_data(d, offset, len)) + .collect(), + ); vec![] } DataType::Union(..) => { @@ -719,7 +724,8 @@ fn slice_buffers( } fn slice_array_data(data: &ArrayData, offset: usize, len: usize) -> ArrayData { - assert!(offset + len < data.len()); + assert!(offset >= data.offset()); + assert!((offset - data.offset()) + len <= data.len()); if offset == 0 { return data.clone(); @@ -729,7 +735,7 @@ fn slice_array_data(data: &ArrayData, offset: usize, len: usize) -> ArrayData { let (result_data_buffers, result_child_data) = slice_buffers( data.buffers(), - data.offset() + offset, + offset, len, data.data_type(), data.child_data(), @@ -802,10 +808,10 @@ pub fn nullif_alternative( mod tests { use super::*; use crate::array::{ - ArrayRef, DictionaryArray, FixedSizeBinaryArray, FixedSizeListArray, Int32Array, - ListArray, StringArray, + ArrayRef, DictionaryArray, FixedSizeBinaryArray, FixedSizeListArray, Int16Array, + Int32Array, ListArray, StringArray, StructArray, }; - use crate::datatypes::{Field, Float64Type, Int16Type, Int32Type}; + use crate::datatypes::{Field, Float64Type, Int32Type}; use std::sync::Arc; #[test] @@ -1427,9 +1433,7 @@ mod tests { .as_any() .downcast_ref::>() .unwrap(); - let expected_keys = &[None, Some(1), None] - .iter() - .collect::>(); + let expected_keys = &[None, Some(1), None].iter().collect::(); let actual_keys = result.keys(); assert_eq!(expected_keys, actual_keys); @@ -1600,9 +1604,8 @@ mod tests { #[test] fn test_nullif_fixed_size_list_sliced() { - let primitives = PrimitiveArray::::from(vec![ - 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, - ]); + let primitives = + Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]); let fixed_size_array = make_array( ArrayData::try_new( DataType::FixedSizeList( @@ -1649,4 +1652,62 @@ mod tests { assert_eq!(result, expected); } + + #[test] + fn test_nullif_struct_sliced() { + let array = StructArray::try_from(vec![ + ( + "a", + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8])) as ArrayRef, + ), + ( + "b", + Arc::new(StringArray::from(vec![ + "abc", "de", "dghi", "jk", "lm", "nop", "q", "rstu", + ])), + ), + ]) + .unwrap(); + let array_ref = array.slice(2, 4); + + let condition = BooleanArray::from(vec![false, true, false, false]); + + let result = nullif_alternative(&array_ref, &condition).unwrap(); + let result = result.as_any().downcast_ref::().unwrap(); + + let expected = StructArray::try_from(vec![ + ( + "a", + Arc::new(Int32Array::from(vec![Some(3), None, Some(5), Some(6)])) + as ArrayRef, + ), + ( + "b", + Arc::new(StringArray::from(vec![ + Some("dghi"), + None, + Some("lm"), + Some("nop"), + ])), + ), + ]) + .unwrap(); + + // StructArray comparison does not seem to respect validity bitmap of the struct itself + // assert_eq!(result, &expected); + + assert!(result.is_valid(0)); + assert!(!result.is_valid(1)); + assert!(result.is_valid(2)); + assert!(result.is_valid(3)); + + assert_eq!( + result.column(0), + &(Arc::new(Int32Array::from(vec![3, 4, 5, 6])) as ArrayRef) + ); + assert_eq!( + result.column(1), + &(Arc::new(StringArray::from(vec!["dghi", "jk", "lm", "nop"])) as ArrayRef) + ); + } } From c22862d709c0d23e03496b0875385fc952c17299 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Thu, 31 Mar 2022 19:39:15 +0200 Subject: [PATCH 6/6] Remove unused var in test --- arrow/src/compute/kernels/boolean.rs | 20 +------------------- 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/arrow/src/compute/kernels/boolean.rs b/arrow/src/compute/kernels/boolean.rs index 2dab3c549a0d..5327681c991b 100644 --- a/arrow/src/compute/kernels/boolean.rs +++ b/arrow/src/compute/kernels/boolean.rs @@ -1675,26 +1675,8 @@ mod tests { let result = nullif_alternative(&array_ref, &condition).unwrap(); let result = result.as_any().downcast_ref::().unwrap(); - let expected = StructArray::try_from(vec![ - ( - "a", - Arc::new(Int32Array::from(vec![Some(3), None, Some(5), Some(6)])) - as ArrayRef, - ), - ( - "b", - Arc::new(StringArray::from(vec![ - Some("dghi"), - None, - Some("lm"), - Some("nop"), - ])), - ), - ]) - .unwrap(); - // StructArray comparison does not seem to respect validity bitmap of the struct itself - // assert_eq!(result, &expected); + // so we need to compare the fields separately assert!(result.is_valid(0)); assert!(!result.is_valid(1));