diff --git a/arrow/src/array/cast.rs b/arrow/src/array/cast.rs index 0477f2831f9a..dfc15608555f 100644 --- a/arrow/src/array/cast.rs +++ b/arrow/src/array/cast.rs @@ -59,6 +59,16 @@ pub fn as_large_list_array(arr: &ArrayRef) -> &LargeListArray { as_generic_list_array::(arr) } +#[doc = "Force downcast ArrayRef to GenericBinaryArray"] +#[inline] +pub fn as_generic_binary_array( + arr: &ArrayRef, +) -> &GenericBinaryArray { + arr.as_any() + .downcast_ref::>() + .expect("Unable to downcast to binary array") +} + macro_rules! array_downcast_fn { ($name: ident, $arrty: ty, $arrty_str:expr) => { #[doc = "Force downcast ArrayRef to "] diff --git a/arrow/src/array/mod.rs b/arrow/src/array/mod.rs index 6a0b94a55515..69b65f41ad5a 100644 --- a/arrow/src/array/mod.rs +++ b/arrow/src/array/mod.rs @@ -273,9 +273,9 @@ pub use self::ord::{build_compare, DynComparator}; // --------------------- Array downcast helper functions --------------------- pub use self::cast::{ - as_boolean_array, as_dictionary_array, as_generic_list_array, as_large_list_array, - as_largestring_array, as_list_array, as_null_array, as_primitive_array, - as_string_array, as_struct_array, + as_boolean_array, as_dictionary_array, as_generic_binary_array, + as_generic_list_array, as_large_list_array, as_largestring_array, as_list_array, + as_null_array, as_primitive_array, as_string_array, as_struct_array, }; // ------------------------------ C Data Interface --------------------------- diff --git a/arrow/src/compute/kernels/sort.rs b/arrow/src/compute/kernels/sort.rs index 008661ec0104..95afc14549e9 100644 --- a/arrow/src/compute/kernels/sort.rs +++ b/arrow/src/compute/kernels/sort.rs @@ -383,6 +383,10 @@ pub fn sort_to_indices( } } } + DataType::Binary | DataType::FixedSizeBinary(_) => { + sort_binary::(values, v, n, &options, limit) + } + DataType::LargeBinary => sort_binary::(values, v, n, &options, limit), t => { return Err(ArrowError::ComputeError(format!( "Sort not supported for data type {:?}", @@ -764,6 +768,67 @@ where } } +fn sort_binary( + values: &ArrayRef, + value_indices: Vec, + mut null_indices: Vec, + options: &SortOptions, + limit: Option, +) -> UInt32Array +where + S: BinaryOffsetSizeTrait, +{ + let mut valids: Vec<(u32, &[u8])> = values + .as_any() + .downcast_ref::() + .map_or_else( + || { + let values = as_generic_binary_array::(values); + value_indices + .iter() + .copied() + .map(|index| (index, values.value(index as usize))) + .collect() + }, + |values| { + value_indices + .iter() + .copied() + .map(|index| (index, values.value(index as usize))) + .collect() + }, + ); + + let mut len = values.len(); + let descending = options.descending; + let nulls_len = null_indices.len(); + + if let Some(limit) = limit { + len = limit.min(len); + } + if !descending { + sort_unstable_by(&mut valids, len.saturating_sub(nulls_len), |a, b| { + a.1.cmp(b.1) + }); + } else { + sort_unstable_by(&mut valids, len.saturating_sub(nulls_len), |a, b| { + a.1.cmp(b.1).reverse() + }); + null_indices.reverse(); + } + + let mut valid_indices: Vec = valids.iter().map(|tuple| tuple.0).collect(); + if options.nulls_first { + null_indices.append(&mut valid_indices); + null_indices.truncate(len); + UInt32Array::from(null_indices) + } else { + valid_indices.append(&mut null_indices); + valid_indices.truncate(len); + UInt32Array::from(valid_indices) + } +} + /// Compare two `Array`s based on the ordering defined in [ord](crate::array::ord). fn cmp_array(a: &dyn Array, b: &dyn Array) -> Ordering { let cmp_op = build_compare(a, b).unwrap(); @@ -1183,6 +1248,60 @@ mod tests { } } + fn test_sort_binary_arrays( + data: Vec>>, + options: Option, + limit: Option, + expected_data: Vec>>, + fixed_length: Option, + ) { + // Fixed size binary array + if fixed_length.is_some() { + let input = Arc::new( + FixedSizeBinaryArray::try_from_sparse_iter(data.iter().cloned()).unwrap(), + ); + let sorted = match limit { + Some(_) => sort_limit(&(input as ArrayRef), options, limit).unwrap(), + None => sort(&(input as ArrayRef), options).unwrap(), + }; + let expected = Arc::new( + FixedSizeBinaryArray::try_from_sparse_iter(expected_data.iter().cloned()) + .unwrap(), + ) as ArrayRef; + + assert_eq!(&sorted, &expected); + } + + // Generic size binary array + fn make_generic_binary_array( + data: &[Option>], + ) -> Arc> { + Arc::new(GenericBinaryArray::::from_opt_vec( + data.iter() + .map(|binary| binary.as_ref().map(Vec::as_slice)) + .collect(), + )) + } + + // BinaryArray + let input = make_generic_binary_array::(&data); + let sorted = match limit { + Some(_) => sort_limit(&(input as ArrayRef), options, limit).unwrap(), + None => sort(&(input as ArrayRef), options).unwrap(), + }; + let expected = make_generic_binary_array::(&expected_data) as ArrayRef; + assert_eq!(&sorted, &expected); + + // LargeBinaryArray + let input = make_generic_binary_array::(&data); + let sorted = match limit { + Some(_) => sort_limit(&(input as ArrayRef), options, limit).unwrap(), + None => sort(&(input as ArrayRef), options).unwrap(), + }; + let expected = make_generic_binary_array::(&expected_data) as ArrayRef; + assert_eq!(&sorted, &expected); + } + #[test] fn test_sort_to_indices_primitives() { test_sort_to_indices_primitive_arrays::( @@ -2382,6 +2501,204 @@ mod tests { ); } + #[test] + fn test_sort_binary() { + test_sort_binary_arrays( + vec![ + Some(vec![0, 0, 0]), + Some(vec![0, 0, 5]), + Some(vec![0, 0, 3]), + Some(vec![0, 0, 7]), + Some(vec![0, 0, 1]), + ], + Some(SortOptions { + descending: false, + nulls_first: false, + }), + None, + vec![ + Some(vec![0, 0, 0]), + Some(vec![0, 0, 1]), + Some(vec![0, 0, 3]), + Some(vec![0, 0, 5]), + Some(vec![0, 0, 7]), + ], + Some(3), + ); + + // with nulls + test_sort_binary_arrays( + vec![ + Some(vec![0, 0, 0]), + None, + Some(vec![0, 0, 3]), + Some(vec![0, 0, 7]), + Some(vec![0, 0, 1]), + None, + ], + Some(SortOptions { + descending: false, + nulls_first: false, + }), + None, + vec![ + Some(vec![0, 0, 0]), + Some(vec![0, 0, 1]), + Some(vec![0, 0, 3]), + Some(vec![0, 0, 7]), + None, + None, + ], + Some(3), + ); + + test_sort_binary_arrays( + vec![ + Some(vec![3, 5, 7]), + None, + Some(vec![1, 7, 1]), + Some(vec![2, 7, 3]), + None, + Some(vec![1, 4, 3]), + ], + Some(SortOptions { + descending: false, + nulls_first: false, + }), + None, + vec![ + Some(vec![1, 4, 3]), + Some(vec![1, 7, 1]), + Some(vec![2, 7, 3]), + Some(vec![3, 5, 7]), + None, + None, + ], + Some(3), + ); + + // descending + test_sort_binary_arrays( + vec![ + Some(vec![0, 0, 0]), + None, + Some(vec![0, 0, 3]), + Some(vec![0, 0, 7]), + Some(vec![0, 0, 1]), + None, + ], + Some(SortOptions { + descending: true, + nulls_first: false, + }), + None, + vec![ + Some(vec![0, 0, 7]), + Some(vec![0, 0, 3]), + Some(vec![0, 0, 1]), + Some(vec![0, 0, 0]), + None, + None, + ], + Some(3), + ); + + // nulls first + test_sort_binary_arrays( + vec![ + Some(vec![0, 0, 0]), + None, + Some(vec![0, 0, 3]), + Some(vec![0, 0, 7]), + Some(vec![0, 0, 1]), + None, + ], + Some(SortOptions { + descending: false, + nulls_first: true, + }), + None, + vec![ + None, + None, + Some(vec![0, 0, 0]), + Some(vec![0, 0, 1]), + Some(vec![0, 0, 3]), + Some(vec![0, 0, 7]), + ], + Some(3), + ); + + // limit + test_sort_binary_arrays( + vec![ + Some(vec![0, 0, 0]), + None, + Some(vec![0, 0, 3]), + Some(vec![0, 0, 7]), + Some(vec![0, 0, 1]), + None, + ], + Some(SortOptions { + descending: false, + nulls_first: true, + }), + Some(4), + vec![None, None, Some(vec![0, 0, 0]), Some(vec![0, 0, 1])], + Some(3), + ); + + // var length + test_sort_binary_arrays( + vec![ + Some(b"Hello".to_vec()), + None, + Some(b"from".to_vec()), + Some(b"Apache".to_vec()), + Some(b"Arrow-rs".to_vec()), + None, + ], + Some(SortOptions { + descending: false, + nulls_first: false, + }), + None, + vec![ + Some(b"Apache".to_vec()), + Some(b"Arrow-rs".to_vec()), + Some(b"Hello".to_vec()), + Some(b"from".to_vec()), + None, + None, + ], + None, + ); + + // limit + test_sort_binary_arrays( + vec![ + Some(b"Hello".to_vec()), + None, + Some(b"from".to_vec()), + Some(b"Apache".to_vec()), + Some(b"Arrow-rs".to_vec()), + None, + ], + Some(SortOptions { + descending: false, + nulls_first: true, + }), + Some(4), + vec![ + None, + None, + Some(b"Apache".to_vec()), + Some(b"Arrow-rs".to_vec()), + ], + None, + ); + } + #[test] fn test_lex_sort_single_column() { let input = vec![SortColumn { diff --git a/arrow/src/compute/kernels/take.rs b/arrow/src/compute/kernels/take.rs index f04208a689e6..75c8f766888f 100644 --- a/arrow/src/compute/kernels/take.rs +++ b/arrow/src/compute/kernels/take.rs @@ -259,6 +259,27 @@ where DataType::UInt64 => downcast_dict_take!(UInt64Type, values, indices), t => unimplemented!("Take not supported for dictionary key type {:?}", t), }, + DataType::Binary => { + let values = values + .as_any() + .downcast_ref::>() + .unwrap(); + Ok(Arc::new(take_binary(values, indices)?)) + } + DataType::LargeBinary => { + let values = values + .as_any() + .downcast_ref::>() + .unwrap(); + Ok(Arc::new(take_binary(values, indices)?)) + } + DataType::FixedSizeBinary(_) => { + let values = values + .as_any() + .downcast_ref::() + .unwrap(); + Ok(Arc::new(take_fixed_size_binary(values, indices)?)) + } t => unimplemented!("Take not supported for data type {:?}", t), } } @@ -760,6 +781,59 @@ where Ok(FixedSizeListArray::from(list_data)) } +fn take_binary( + values: &GenericBinaryArray, + indices: &PrimitiveArray, +) -> Result> +where + OffsetType: BinaryOffsetSizeTrait, + IndexType: ArrowNumericType, + IndexType::Native: ToPrimitive, +{ + let data_ref = values.data_ref(); + let array_iter = indices + .values() + .iter() + .map(|idx| { + let idx = maybe_usize::(*idx)?; + if data_ref.is_valid(idx) { + Ok(Some(values.value(idx))) + } else { + Ok(None) + } + }) + .collect::>>()? + .into_iter(); + + Ok(array_iter.collect::>()) +} + +fn take_fixed_size_binary( + values: &FixedSizeBinaryArray, + indices: &PrimitiveArray, +) -> Result +where + IndexType: ArrowNumericType, + IndexType::Native: ToPrimitive, +{ + let data_ref = values.data_ref(); + let array_iter = indices + .values() + .iter() + .map(|idx| { + let idx = maybe_usize::(*idx)?; + if data_ref.is_valid(idx) { + Ok(Some(values.value(idx))) + } else { + Ok(None) + } + }) + .collect::>>()? + .into_iter(); + + FixedSizeBinaryArray::try_from_sparse_iter(array_iter) +} + /// `take` implementation for dictionary arrays /// /// applies `take` to the keys of the dictionary array and returns a new dictionary array