Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 157 additions & 3 deletions arrow-buffer/src/buffer/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ use std::ops::{BitAnd, BitOr, BitXor, Not};

/// A slice-able [`Buffer`] containing bit-packed booleans
///
/// `BooleanBuffer`s can be creating using [`BooleanBufferBuilder`]
/// `BooleanBuffer`s can be modified using [`BooleanBufferBuilder`]
///
/// # See Also
///
/// # See Also
/// * [`NullBuffer`] for representing null values in Arrow arrays
///
/// [`NullBuffer`]: crate::NullBuffer
Expand Down Expand Up @@ -96,12 +96,128 @@ impl BooleanBuffer {
Self::new(buffer.into(), 0, len)
}

/// Create a new [`BooleanBuffer`] by copying the relevant bits from an
/// input buffer.
///
/// # Notes:
/// * The new `BooleanBuffer` has zero offset, even if `offset_in_bits` is non-zero
///
/// # Example: Create a new [`BooleanBuffer`] copying a bit slice from in input slice
/// ```
/// # use arrow_buffer::BooleanBuffer;
/// let input = [0b11001100u8, 0b10111010u8];
/// // // Copy bits 4..16 from input
/// let result = BooleanBuffer::from_bits(&input, 4, 12);
/// assert_eq!(result.values(), &[0b10101100u8, 0b00001011u8]);
pub fn from_bits(src: impl AsRef<[u8]>, offset_in_bits: usize, len_in_bits: usize) -> Self {
Self::from_bitwise_unary_op(src, offset_in_bits, len_in_bits, |a| a)
}

/// Create a new [`BooleanBuffer`] by applying the bitwise operation to `op`
/// to an input buffer.
///
/// This function is faster than applying the operation bit by bit as
/// it processes input buffers in chunks of 64 bits (8 bytes) at a time
///
/// # Notes:
/// * `op` takes a single `u64` inputs and produces one `u64` output.
/// * `op` must only apply bitwise operations
/// on the relevant bits; the input `u64` may contain irrelevant bits
/// and may be processed differently on different endian architectures.
/// * The output always has zero offset
///
/// # See Also
/// - [`apply_bitwise_unary_op`](bit_util::apply_bitwise_unary_op) for in-place unary bitwise operations
///
/// # Example: Create new [`BooleanBuffer`] from bitwise `NOT` of an input [`Buffer`]
/// ```
/// # use arrow_buffer::BooleanBuffer;
/// let input = [0b11001100u8, 0b10111010u8]; // 2 bytes = 16 bits
/// // NOT of the first 12 bits
/// let result = BooleanBuffer::from_bitwise_unary_op(
/// &input, 0, 12, |a| !a
/// );
/// assert_eq!(result.values(), &[0b00110011u8, 0b11110101u8]);
/// ```
pub fn from_bitwise_unary_op<F>(
src: impl AsRef<[u8]>,
offset_in_bits: usize,
len_in_bits: usize,
mut op: F,
) -> Self
where
F: FnMut(u64) -> u64,
{
// try fast path for aligned input
if offset_in_bits & 0x7 == 0 {
// align to byte boundary
let aligned = &src.as_ref()[offset_in_bits / 8..];
if let Some(result) =
Self::try_from_aligned_bitwise_unary_op(aligned, len_in_bits, &mut op)
{
return result;
}
}

let chunks = BitChunks::new(src.as_ref(), offset_in_bits, len_in_bits);
let mut result = MutableBuffer::with_capacity(chunks.num_u64s() * 8);
for chunk in chunks.iter() {
// SAFETY: reserved enough capacity above, (exactly num_u64s()
// items) and we assume `BitChunks` correctly reports upper bound
unsafe {
result.push_unchecked(op(chunk));
}
}
if chunks.remainder_len() > 0 {
debug_assert!(result.capacity() >= result.len() + 8); // should not reallocate
// SAFETY: reserved enough capacity above, (exactly num_u64s()
// items) and we assume `BitChunks` correctly reports upper bound
unsafe {
result.push_unchecked(op(chunks.remainder_bits()));
}
// Just pushed one u64, which may have trailing zeros
result.truncate(chunks.num_bytes());
}

let buffer = Buffer::from(result);
BooleanBuffer {
buffer,
offset: 0,
len: len_in_bits,
}
}

/// Fast path for [`Self::from_bitwise_unary_op`] when input is aligned to
/// 8-byte (64-bit) boundaries
///
/// Returns None if the fast path cannot be taken
fn try_from_aligned_bitwise_unary_op<F>(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW I wrote a version of this code to handle works for byte aligned, but it actually seems to have made performance worse, so I am going to update the comments and leave it this way

What I tried
    /// Like [`Self::from_bitwise_unary_op`] but optimized for the case where the
    /// input is aligned to byte boundaries
    fn try_from_aligned_bitwise_unary_op<F>(
        left: &[u8],
        len_in_bits: usize,
        op: &mut F,
    ) -> Option<Self>
    where
        F: FnMut(u64) -> u64,
    {
        // safety: all valid bytes are valid u64s
        let (left_prefix, left_u64s, left_suffix) = unsafe { left.align_to::<u64>() };
        // if there is no prefix or suffix, the buffer is aligned and we can do
        // the operation directly on u64s
        if left_prefix.is_empty() && left_suffix.is_empty() {
            let result_u64s: Vec<u64> = left_u64s.iter().map(|l| op(*l)).collect();
            let buffer = Buffer::from(result_u64s);
            return Some(BooleanBuffer::new(buffer, 0, len_in_bits));
        }

        let mut result = MutableBuffer::with_capacity(
            left_prefix.len() + left_u64s.len() * 8 + left_suffix.len(),
        );
        let prefix_u64 = op(Self::byte_slice_to_u64(left_prefix));

        result.extend_from_slice(&prefix_u64.to_be_bytes()[0..left_prefix.len()]);

        assert!(result.capacity() >= result.len() + left_u64s.len() * 8);
        for &left in left_u64s.iter() {
            // SAFETY: we asserted there is enough capacity above
            unsafe {
                result.push_unchecked(op(left));
            }
        }

        let suffix_u64 = op(Self::byte_slice_to_u64(left_suffix));
        result.extend_from_slice(&suffix_u64.to_be_bytes()[0..left_suffix.len()]);

        Some(BooleanBuffer::new(result.into(), 0, len_in_bits))
    }
diff --git a/arrow-buffer/src/buffer/boolean.rs b/arrow-buffer/src/buffer/boolean.rs
index 97674c18843..285888b3a7c 100644
--- a/arrow-buffer/src/buffer/boolean.rs
+++ b/arrow-buffer/src/buffer/boolean.rs
@@ -18,10 +18,11 @@
 use crate::bit_chunk_iterator::BitChunks;
 use crate::bit_iterator::{BitIndexIterator, BitIndexU32Iterator, BitIterator, BitSliceIterator};
 use crate::{
-    BooleanBufferBuilder, Buffer, MutableBuffer, bit_util, buffer_bin_and, buffer_bin_or,
-    buffer_bin_xor, buffer_unary_not,
+    BooleanBufferBuilder, Buffer, MutableBuffer, ToByteSlice, bit_util, buffer_bin_and,
+    buffer_bin_or, buffer_bin_xor, buffer_unary_not,
 };

+use crate::bit_util::get_remainder_bits;
 use std::ops::{BitAnd, BitOr, BitXor, Not};

 /// A slice-able [`Buffer`] containing bit-packed booleans
@@ -200,14 +201,37 @@ impl BooleanBuffer {
         // the operation directly on u64s
         if left_prefix.is_empty() && left_suffix.is_empty() {
             let result_u64s: Vec<u64> = left_u64s.iter().map(|l| op(*l)).collect();
-            Some(BooleanBuffer::new(
-                Buffer::from(result_u64s),
-                0,
-                len_in_bits,
-            ))
-        } else {
-            None
+            let buffer = Buffer::from(result_u64s);
+            return Some(BooleanBuffer::new(buffer, 0, len_in_bits));
         }
+
+        let mut result = MutableBuffer::with_capacity(
+            left_prefix.len() + left_u64s.len() * 8 + left_suffix.len(),
+        );
+        let prefix_u64 = op(Self::byte_slice_to_u64(left_prefix));
+
+        result.extend_from_slice(&prefix_u64.to_be_bytes()[0..left_prefix.len()]);
+
+        assert!(result.capacity() >= result.len() + left_u64s.len() * 8);
+        for &left in left_u64s.iter() {
+            // SAFETY: we asserted there is enough capacity above
+            unsafe {
+                result.push_unchecked(op(left));
+            }
+        }
+
+        let suffix_u64 = op(Self::byte_slice_to_u64(left_suffix));
+        result.extend_from_slice(&suffix_u64.to_be_bytes()[0..left_suffix.len()]);
+
+        Some(BooleanBuffer::new(result.into(), 0, len_in_bits))
+    }
+
+    /// convert the bytes into a u64 suitable for opeartion
+    fn byte_slice_to_u64(src: &[u8]) -> u64 {
+        let num_bytes = src.len();
+        let mut bytes = [0u8; 8];
+        bytes[0..num_bytes].copy_from_slice(src);
+        u64::from_be_bytes(bytes)
     }

     /// Returns the number of set bits in this buffer
diff --git a/arrow-buffer/src/util/bit_util.rs b/arrow-buffer/src/util/bit_util.rs

This PR

not_slice_24            time:   [81.729 ns 82.091 ns 82.587 ns]

When I tried fancier code for byte alignment:

not_slice_24            time:   [121.13 ns 122.69 ns 124.52 ns]

src: &[u8],
len_in_bits: usize,
op: &mut F,
) -> Option<Self>
where
F: FnMut(u64) -> u64,
{
// Safety: all valid bytes are valid u64s
let (prefix, aligned_u6us, suffix) = unsafe { src.align_to::<u64>() };
if !(prefix.is_empty() && suffix.is_empty()) {
// Couldn't make this case any faster than the default path, see
// https://github.com/apache/arrow-rs/pull/8996/changes#r2620022082
return None;
}
// the buffer is word (64 bit) aligned, so use optimized Vec code.
let result_u64s: Vec<u64> = aligned_u6us.iter().map(|l| op(*l)).collect();
let buffer = Buffer::from(result_u64s);
Some(BooleanBuffer::new(buffer, 0, len_in_bits))
}

/// Returns the number of set bits in this buffer
pub fn count_set_bits(&self) -> usize {
self.buffer.count_set_bits_offset(self.offset, self.len)
}

/// Returns a `BitChunks` instance which can be used to iterate over
/// Returns a [`BitChunks`] instance which can be used to iterate over
/// this buffer's bits in `u64` chunks
#[inline]
pub fn bit_chunks(&self) -> BitChunks<'_> {
Expand Down Expand Up @@ -437,4 +553,42 @@ mod tests {
assert_eq!(buf.values().len(), 1);
assert!(buf.value(0));
}

#[test]
fn test_from_bitwise_unary_op() {
// Use 1024 boolean values so that at least some of the tests cover multiple u64 chunks and
// perfect alignment
let input_bools = (0..1024)
.map(|_| rand::random::<bool>())
.collect::<Vec<bool>>();
let input_buffer = BooleanBuffer::from(&input_bools[..]);

// Note ensure we test offsets over 100 to cover multiple u64 chunks
for offset in 0..1024 {
let result = BooleanBuffer::from_bitwise_unary_op(
input_buffer.values(),
offset,
input_buffer.len() - offset,
|a| !a,
);
let expected = input_bools[offset..]
.iter()
.map(|b| !*b)
.collect::<BooleanBuffer>();
assert_eq!(result, expected);
}

// Also test when the input doesn't cover the entire buffer
for offset in 0..512 {
let len = 512 - offset; // fixed length less than total
let result =
BooleanBuffer::from_bitwise_unary_op(input_buffer.values(), offset, len, |a| !a);
let expected = input_bools[offset..]
.iter()
.take(len)
.map(|b| !*b)
.collect::<BooleanBuffer>();
assert_eq!(result, expected);
}
}
}
5 changes: 2 additions & 3 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@ use std::fmt::Debug;
use std::ptr::NonNull;
use std::sync::Arc;

use crate::BufferBuilder;
use crate::alloc::{Allocation, Deallocation};
use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
use crate::{BooleanBuffer, BufferBuilder};
use crate::{bit_util, bytes::Bytes, native::ArrowNativeType};

#[cfg(feature = "pool")]
use crate::pool::MemoryPool;

use super::ops::bitwise_unary_op_helper;
use super::{MutableBuffer, ScalarBuffer};

/// A contiguous memory region that can be shared with other buffers and across
Expand Down Expand Up @@ -344,7 +343,7 @@ impl Buffer {
return self.slice_with_length(offset / 8, bit_util::ceil(len, 8));
}

bitwise_unary_op_helper(self, offset, len, |a| a)
BooleanBuffer::from_bits(self.as_slice(), offset, len).into_inner()
}

/// Returns a `BitChunks` instance which can be used to iterate over this buffers bits
Expand Down
32 changes: 9 additions & 23 deletions arrow-buffer/src/buffer/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use super::{Buffer, MutableBuffer};
use crate::BooleanBuffer;
use crate::util::bit_util::ceil;

/// Apply a bitwise operation `op` to four inputs and return the result as a Buffer.
Expand Down Expand Up @@ -93,36 +94,20 @@ where

/// Apply a bitwise operation `op` to one input and return the result as a Buffer.
/// The input is treated as a bitmap, meaning that offset and length are specified in number of bits.
#[deprecated(
since = "57.2.0",
note = "use BooleanBuffer::from_bitwise_unary_op instead"
)]
pub fn bitwise_unary_op_helper<F>(
left: &Buffer,
offset_in_bits: usize,
len_in_bits: usize,
mut op: F,
op: F,
) -> Buffer
where
F: FnMut(u64) -> u64,
{
// reserve capacity and set length so we can get a typed view of u64 chunks
let mut result =
MutableBuffer::new(ceil(len_in_bits, 8)).with_bitset(len_in_bits / 64 * 8, false);

let left_chunks = left.bit_chunks(offset_in_bits, len_in_bits);

let result_chunks = result.typed_data_mut::<u64>().iter_mut();

result_chunks
.zip(left_chunks.iter())
.for_each(|(res, left)| {
*res = op(left);
});

let remainder_bytes = ceil(left_chunks.remainder_len(), 8);
let rem = op(left_chunks.remainder_bits());
// we are counting its starting from the least significant bit, to to_le_bytes should be correct
let rem = &rem.to_le_bytes()[0..remainder_bytes];
result.extend_from_slice(rem);

result.into()
BooleanBuffer::from_bitwise_unary_op(left, offset_in_bits, len_in_bits, op).into_inner()
}

/// Apply a bitwise and to two inputs and return the result as a Buffer.
Expand Down Expand Up @@ -204,5 +189,6 @@ pub fn buffer_bin_and_not(
/// Apply a bitwise not to one input and return the result as a Buffer.
/// The input is treated as a bitmap, meaning that offset and length are specified in number of bits.
pub fn buffer_unary_not(left: &Buffer, offset_in_bits: usize, len_in_bits: usize) -> Buffer {
bitwise_unary_op_helper(left, offset_in_bits, len_in_bits, |a| !a)
// TODO: should we deprecate this function in favor of the Buffer ! impl ?
BooleanBuffer::from_bitwise_unary_op(left, offset_in_bits, len_in_bits, |a| !a).into_inner()
}
30 changes: 25 additions & 5 deletions arrow-buffer/src/util/bit_chunk_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,10 @@ fn compute_suffix_mask(len: usize, lead_padding: usize) -> (u64, usize) {
(suffix_mask, trailing_padding)
}

/// Iterates over an arbitrarily aligned byte buffer
/// Iterates over an arbitrarily aligned byte buffer 64 bits at a time
///
/// Yields an iterator of u64, and a remainder. The first byte in the buffer
/// [`Self::iter`] yields iterator of `u64`, and a remainder. The first byte in the buffer
/// will be the least significant byte in output u64
///
#[derive(Debug)]
pub struct BitChunks<'a> {
buffer: &'a [u8],
Expand Down Expand Up @@ -259,7 +258,7 @@ impl<'a> BitChunks<'a> {
self.remainder_len
}

/// Returns the number of chunks
/// Returns the number of `u64` chunks
#[inline]
pub const fn chunk_len(&self) -> usize {
self.chunk_len
Expand Down Expand Up @@ -293,7 +292,28 @@ impl<'a> BitChunks<'a> {
}
}

/// Returns an iterator over chunks of 64 bits represented as an u64
/// Return the number of `u64` that are needed to represent all bits
/// (including remainder).
///
/// This is equal to `chunk_len + 1` if there is a remainder,
/// otherwise it is equal to `chunk_len`.
#[inline]
pub fn num_u64s(&self) -> usize {
if self.remainder_len == 0 {
self.chunk_len
} else {
self.chunk_len + 1
}
}

/// Return the number of *bytes* that are needed to represent all bits
/// (including remainder).
#[inline]
pub fn num_bytes(&self) -> usize {
ceil(self.chunk_len * 64 + self.remainder_len, 8)
}

/// Returns an iterator over chunks of 64 bits represented as an `u64`
#[inline]
pub const fn iter(&self) -> BitChunkIterator<'a> {
BitChunkIterator::<'a> {
Expand Down
14 changes: 8 additions & 6 deletions arrow-select/src/nullif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Implements the `nullif` function for Arrow arrays.

use arrow_array::{Array, ArrayRef, BooleanArray, make_array};
use arrow_buffer::buffer::{bitwise_bin_op_helper, bitwise_unary_op_helper};
use arrow_buffer::buffer::bitwise_bin_op_helper;
use arrow_buffer::{BooleanBuffer, NullBuffer};
use arrow_schema::{ArrowError, DataType};

Expand Down Expand Up @@ -91,11 +91,13 @@ pub fn nullif(left: &dyn Array, right: &BooleanArray) -> Result<ArrayRef, ArrowE
}
None => {
let mut null_count = 0;
let buffer = bitwise_unary_op_helper(right.inner(), right.offset(), len, |b| {
let t = !b;
null_count += t.count_zeros() as usize;
t
});
let buffer =
BooleanBuffer::from_bitwise_unary_op(right.inner(), right.offset(), len, |b| {
let t = !b;
null_count += t.count_zeros() as usize;
t
})
.into_inner();
(buffer, null_count)
}
};
Expand Down
Loading