Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 11 additions & 76 deletions arrow-array/src/array/run_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,88 +177,23 @@ impl<R: RunEndIndexType> RunArray<R> {
self.run_ends.get_physical_index(logical_index)
}

/// Given the input `logical_indices`, return the corresponding physical index
/// for each, according to the underlying [`RunEndBuffer`], taking into account
/// any slicing that has occurred.
/// Returns the physical indices corresponding to the provided logical indices.
///
/// Returns an error if any of the provided logical indices is out of range.
///
/// # Implementation
///
/// The logical indices are sorted and iterated along with the `run_ends` buffer
/// to find the matching physical index. The approach used here was chosen over
/// finding the physical index for each logical index using binary search via
/// the function [`RunEndBuffer::get_physical_index`].
///
/// Running benchmarks on both approaches showed that the approach used here
/// scaled well for larger inputs.
///
/// See <https://github.com/apache/arrow-rs/pull/3622#issuecomment-1407753727> for more details.
// TODO: this technically should be a method on RunEndBuffer
/// See [`RunEndBuffer::get_physical_indices`] for more details.
#[inline]
pub fn get_physical_indices<I>(&self, logical_indices: &[I]) -> Result<Vec<usize>, ArrowError>
where
I: ArrowNativeType,
{
let len = self.run_ends().len();
let offset = self.run_ends().offset();

let indices_len = logical_indices.len();

if indices_len == 0 {
return Ok(vec![]);
}

// `ordered_indices` store index into `logical_indices` and can be used
// to iterate `logical_indices` in sorted order.
let mut ordered_indices: Vec<usize> = (0..indices_len).collect();

// Instead of sorting `logical_indices` directly, sort the `ordered_indices`
// whose values are index of `logical_indices`
ordered_indices.sort_unstable_by(|lhs, rhs| {
logical_indices[*lhs]
.partial_cmp(&logical_indices[*rhs])
.unwrap()
});

// Return early if all the logical indices cannot be converted to physical indices.
let largest_logical_index = logical_indices[*ordered_indices.last().unwrap()].as_usize();
if largest_logical_index >= len {
return Err(ArrowError::InvalidArgumentError(format!(
"Cannot convert all logical indices to physical indices. The logical index cannot be converted is {largest_logical_index}.",
)));
}

// Skip some physical indices based on offset.
let skip_value = self.get_start_physical_index();

let mut physical_indices = vec![0; indices_len];

let mut ordered_index = 0_usize;
for (physical_index, run_end) in self.run_ends.values().iter().enumerate().skip(skip_value)
{
// Get the run end index (relative to offset) of current physical index
let run_end_value = run_end.as_usize() - offset;

// All the `logical_indices` that are less than current run end index
// belongs to current physical index.
while ordered_index < indices_len
&& logical_indices[ordered_indices[ordered_index]].as_usize() < run_end_value
{
physical_indices[ordered_indices[ordered_index]] = physical_index;
ordered_index += 1;
}
}

// If there are input values >= run_ends.last_value then we'll not be able to convert
// all logical indices to physical indices.
if ordered_index < logical_indices.len() {
let logical_index = logical_indices[ordered_indices[ordered_index]].as_usize();
return Err(ArrowError::InvalidArgumentError(format!(
"Cannot convert all logical indices to physical indices. The logical index cannot be converted is {logical_index}.",
)));
}
Ok(physical_indices)
self.run_ends()
.get_physical_indices(logical_indices)
.map_err(|index| {
ArrowError::InvalidArgumentError(format!(
"Logical index {} is out of bounds for RunArray of length {}",
index.as_usize(),
self.len()
))
})
}

/// Returns a zero-copy slice of this array with the indicated offset and length.
Expand Down
77 changes: 77 additions & 0 deletions arrow-buffer/src/buffer/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,83 @@ where
pub fn into_inner(self) -> ScalarBuffer<E> {
self.run_ends
}

/// Returns the physical indices corresponding to the provided logical indices.
///
/// Given a slice of logical indices, this method returns a `Vec` containing the
/// corresponding physical indices into the run-ends buffer.
///
/// This method operates by iterating the logical indices in sorted order, instead of
/// finding the physical index for each logical index using binary search via
/// the function [`RunEndBuffer::get_physical_index`].
///
/// Running benchmarks on both approaches showed that the approach used here
/// scaled well for larger inputs.
///
/// See <https://github.com/apache/arrow-rs/pull/3622#issuecomment-1407753727> for more details.
///
/// # Errors
///
/// If any logical index is out of bounds (>= self.len()), returns an error containing the invalid index.
#[inline]
pub fn get_physical_indices<I>(&self, logical_indices: &[I]) -> Result<Vec<usize>, I>
where
I: ArrowNativeType,
{
let len = self.len();
let offset = self.offset();

let indices_len = logical_indices.len();

if indices_len == 0 {
return Ok(vec![]);
}

// `ordered_indices` store index into `logical_indices` and can be used
// to iterate `logical_indices` in sorted order.
let mut ordered_indices: Vec<usize> = (0..indices_len).collect();

// Instead of sorting `logical_indices` directly, sort the `ordered_indices`
// whose values are index of `logical_indices`
ordered_indices.sort_unstable_by(|lhs, rhs| {
logical_indices[*lhs]
.partial_cmp(&logical_indices[*rhs])
.unwrap()
});

// Return early if all the logical indices cannot be converted to physical indices.
let largest_logical_index = logical_indices[*ordered_indices.last().unwrap()].as_usize();
if largest_logical_index >= len {
return Err(logical_indices[*ordered_indices.last().unwrap()]);
}

// Skip some physical indices based on offset.
let skip_value = self.get_start_physical_index();

let mut physical_indices = vec![0; indices_len];

let mut ordered_index = 0_usize;
for (physical_index, run_end) in self.values().iter().enumerate().skip(skip_value) {
// Get the run end index (relative to offset) of current physical index
let run_end_value = run_end.as_usize() - offset;

// All the `logical_indices` that are less than current run end index
// belongs to current physical index.
while ordered_index < indices_len
&& logical_indices[ordered_indices[ordered_index]].as_usize() < run_end_value
{
physical_indices[ordered_indices[ordered_index]] = physical_index;
ordered_index += 1;
}
}

// If there are input values >= run_ends.last_value then we'll not be able to convert
// all logical indices to physical indices.
if ordered_index < logical_indices.len() {
return Err(logical_indices[ordered_indices[ordered_index]]);
}
Ok(physical_indices)
}
}

#[cfg(test)]
Expand Down
Loading