Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions arrow/src/array/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ pub fn as_large_list_array(arr: &ArrayRef) -> &LargeListArray {
as_generic_list_array::<i64>(arr)
}

#[doc = "Force downcast ArrayRef to GenericBinaryArray"]
#[inline]
pub fn as_generic_binary_array<S: BinaryOffsetSizeTrait>(
arr: &ArrayRef,
) -> &GenericBinaryArray<S> {
arr.as_any()
.downcast_ref::<GenericBinaryArray<S>>()
.expect("Unable to downcast to binary array")
}

macro_rules! array_downcast_fn {
($name: ident, $arrty: ty, $arrty_str:expr) => {
#[doc = "Force downcast ArrayRef to "]
Expand Down
6 changes: 3 additions & 3 deletions arrow/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,9 @@ pub use self::ord::{build_compare, DynComparator};
// --------------------- Array downcast helper functions ---------------------

pub use self::cast::{
as_boolean_array, as_dictionary_array, as_generic_list_array, as_large_list_array,
as_largestring_array, as_list_array, as_null_array, as_primitive_array,
as_string_array, as_struct_array,
as_boolean_array, as_dictionary_array, as_generic_binary_array,
as_generic_list_array, as_large_list_array, as_largestring_array, as_list_array,
as_null_array, as_primitive_array, as_string_array, as_struct_array,
};

// ------------------------------ C Data Interface ---------------------------
Expand Down
317 changes: 317 additions & 0 deletions arrow/src/compute/kernels/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,10 @@ pub fn sort_to_indices(
}
}
}
DataType::Binary | DataType::FixedSizeBinary(_) => {
sort_binary::<i32>(values, v, n, &options, limit)
}
DataType::LargeBinary => sort_binary::<i64>(values, v, n, &options, limit),
t => {
return Err(ArrowError::ComputeError(format!(
"Sort not supported for data type {:?}",
Expand Down Expand Up @@ -764,6 +768,67 @@ where
}
}

fn sort_binary<S>(
values: &ArrayRef,
value_indices: Vec<u32>,
mut null_indices: Vec<u32>,
options: &SortOptions,
limit: Option<usize>,
) -> UInt32Array
where
S: BinaryOffsetSizeTrait,
{
let mut valids: Vec<(u32, &[u8])> = values
.as_any()
.downcast_ref::<FixedSizeBinaryArray>()
.map_or_else(
|| {
let values = as_generic_binary_array::<S>(values);
value_indices
.iter()
.copied()
.map(|index| (index, values.value(index as usize)))
.collect()
},
|values| {
value_indices
.iter()
.copied()
.map(|index| (index, values.value(index as usize)))
.collect()
},
);

let mut len = values.len();
let descending = options.descending;
let nulls_len = null_indices.len();

if let Some(limit) = limit {
len = limit.min(len);
}
if !descending {
sort_unstable_by(&mut valids, len.saturating_sub(nulls_len), |a, b| {
a.1.cmp(b.1)
});
} else {
sort_unstable_by(&mut valids, len.saturating_sub(nulls_len), |a, b| {
a.1.cmp(b.1).reverse()
});
null_indices.reverse();
}

let mut valid_indices: Vec<u32> = valids.iter().map(|tuple| tuple.0).collect();
if options.nulls_first {
null_indices.append(&mut valid_indices);
null_indices.truncate(len);
UInt32Array::from(null_indices)
} else {
valid_indices.append(&mut null_indices);
valid_indices.truncate(len);
UInt32Array::from(valid_indices)
}
}

/// Compare two `Array`s based on the ordering defined in [ord](crate::array::ord).
fn cmp_array(a: &dyn Array, b: &dyn Array) -> Ordering {
let cmp_op = build_compare(a, b).unwrap();
Expand Down Expand Up @@ -1183,6 +1248,60 @@ mod tests {
}
}

fn test_sort_binary_arrays(
data: Vec<Option<Vec<u8>>>,
options: Option<SortOptions>,
limit: Option<usize>,
expected_data: Vec<Option<Vec<u8>>>,
fixed_length: Option<i32>,
) {
// Fixed size binary array
if fixed_length.is_some() {
let input = Arc::new(
FixedSizeBinaryArray::try_from_sparse_iter(data.iter().cloned()).unwrap(),
);
let sorted = match limit {
Some(_) => sort_limit(&(input as ArrayRef), options, limit).unwrap(),
None => sort(&(input as ArrayRef), options).unwrap(),
};
let expected = Arc::new(
FixedSizeBinaryArray::try_from_sparse_iter(expected_data.iter().cloned())
.unwrap(),
) as ArrayRef;

assert_eq!(&sorted, &expected);
}

// Generic size binary array
fn make_generic_binary_array<S: BinaryOffsetSizeTrait>(
data: &[Option<Vec<u8>>],
) -> Arc<GenericBinaryArray<S>> {
Arc::new(GenericBinaryArray::<S>::from_opt_vec(
data.iter()
.map(|binary| binary.as_ref().map(Vec::as_slice))
.collect(),
))
}

// BinaryArray
let input = make_generic_binary_array::<i32>(&data);
let sorted = match limit {
Some(_) => sort_limit(&(input as ArrayRef), options, limit).unwrap(),
None => sort(&(input as ArrayRef), options).unwrap(),
};
let expected = make_generic_binary_array::<i32>(&expected_data) as ArrayRef;
assert_eq!(&sorted, &expected);

// LargeBinaryArray
let input = make_generic_binary_array::<i64>(&data);
let sorted = match limit {
Some(_) => sort_limit(&(input as ArrayRef), options, limit).unwrap(),
None => sort(&(input as ArrayRef), options).unwrap(),
};
let expected = make_generic_binary_array::<i64>(&expected_data) as ArrayRef;
assert_eq!(&sorted, &expected);
}

#[test]
fn test_sort_to_indices_primitives() {
test_sort_to_indices_primitive_arrays::<Int8Type>(
Expand Down Expand Up @@ -2382,6 +2501,204 @@ mod tests {
);
}

#[test]
fn test_sort_binary() {
test_sort_binary_arrays(
vec![
Some(vec![0, 0, 0]),
Some(vec![0, 0, 5]),
Some(vec![0, 0, 3]),
Some(vec![0, 0, 7]),
Some(vec![0, 0, 1]),
],
Some(SortOptions {
descending: false,
nulls_first: false,
}),
None,
vec![
Some(vec![0, 0, 0]),
Some(vec![0, 0, 1]),
Some(vec![0, 0, 3]),
Some(vec![0, 0, 5]),
Some(vec![0, 0, 7]),
],
Some(3),
);

// with nulls
test_sort_binary_arrays(
vec![
Some(vec![0, 0, 0]),
None,
Some(vec![0, 0, 3]),
Some(vec![0, 0, 7]),
Some(vec![0, 0, 1]),
None,
],
Some(SortOptions {
descending: false,
nulls_first: false,
}),
None,
vec![
Some(vec![0, 0, 0]),
Some(vec![0, 0, 1]),
Some(vec![0, 0, 3]),
Some(vec![0, 0, 7]),
None,
None,
],
Some(3),
);

test_sort_binary_arrays(
vec![
Some(vec![3, 5, 7]),
None,
Some(vec![1, 7, 1]),
Some(vec![2, 7, 3]),
None,
Some(vec![1, 4, 3]),
],
Some(SortOptions {
descending: false,
nulls_first: false,
}),
None,
vec![
Some(vec![1, 4, 3]),
Some(vec![1, 7, 1]),
Some(vec![2, 7, 3]),
Some(vec![3, 5, 7]),
None,
None,
],
Some(3),
);

// descending
test_sort_binary_arrays(
vec![
Some(vec![0, 0, 0]),
None,
Some(vec![0, 0, 3]),
Some(vec![0, 0, 7]),
Some(vec![0, 0, 1]),
None,
],
Some(SortOptions {
descending: true,
nulls_first: false,
}),
None,
vec![
Some(vec![0, 0, 7]),
Some(vec![0, 0, 3]),
Some(vec![0, 0, 1]),
Some(vec![0, 0, 0]),
None,
None,
],
Some(3),
);

// nulls first
test_sort_binary_arrays(
vec![
Some(vec![0, 0, 0]),
None,
Some(vec![0, 0, 3]),
Some(vec![0, 0, 7]),
Some(vec![0, 0, 1]),
None,
],
Some(SortOptions {
descending: false,
nulls_first: true,
}),
None,
vec![
None,
None,
Some(vec![0, 0, 0]),
Some(vec![0, 0, 1]),
Some(vec![0, 0, 3]),
Some(vec![0, 0, 7]),
],
Some(3),
);

// limit
test_sort_binary_arrays(
vec![
Some(vec![0, 0, 0]),
None,
Some(vec![0, 0, 3]),
Some(vec![0, 0, 7]),
Some(vec![0, 0, 1]),
None,
],
Some(SortOptions {
descending: false,
nulls_first: true,
}),
Some(4),
vec![None, None, Some(vec![0, 0, 0]), Some(vec![0, 0, 1])],
Some(3),
);

// var length
test_sort_binary_arrays(
vec![
Some(b"Hello".to_vec()),
None,
Some(b"from".to_vec()),
Some(b"Apache".to_vec()),
Some(b"Arrow-rs".to_vec()),
None,
],
Some(SortOptions {
descending: false,
nulls_first: false,
}),
None,
vec![
Some(b"Apache".to_vec()),
Some(b"Arrow-rs".to_vec()),
Some(b"Hello".to_vec()),
Some(b"from".to_vec()),
None,
None,
],
None,
);

// limit
test_sort_binary_arrays(
vec![
Some(b"Hello".to_vec()),
None,
Some(b"from".to_vec()),
Some(b"Apache".to_vec()),
Some(b"Arrow-rs".to_vec()),
None,
],
Some(SortOptions {
descending: false,
nulls_first: true,
}),
Some(4),
vec![
None,
None,
Some(b"Apache".to_vec()),
Some(b"Arrow-rs".to_vec()),
],
None,
);
}

#[test]
fn test_lex_sort_single_column() {
let input = vec![SortColumn {
Expand Down
Loading