diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml index ff53dc9c445..55372481e1e 100644 --- a/rust/arrow/Cargo.toml +++ b/rust/arrow/Cargo.toml @@ -36,6 +36,7 @@ name = "arrow" path = "src/lib.rs" [dependencies] +rayon = "1.5" serde = { version = "1.0", features = ["rc"] } serde_derive = "1.0" serde_json = { version = "1.0", features = ["preserve_order"] } @@ -48,11 +49,12 @@ lazy_static = "1.4" packed_simd = { version = "0.3.4", optional = true, package = "packed_simd_2" } chrono = "0.4" flatbuffers = "0.6" +bitvec = "0.19" hex = "0.4" prettytable-rs = { version = "0.8.0", optional = true } [features] -default = [] +default = ["simd"] avx512 = [] simd = ["packed_simd"] prettyprint = ["prettytable-rs"] diff --git a/rust/arrow/benches/aggregate_kernels.rs b/rust/arrow/benches/aggregate_kernels.rs index 7e8aea3636e..466df050194 100644 --- a/rust/arrow/benches/aggregate_kernels.rs +++ b/rust/arrow/benches/aggregate_kernels.rs @@ -74,21 +74,22 @@ fn bench_min_string(arr_a: &StringArray) { } fn add_benchmark(c: &mut Criterion) { - let arr_a = create_array(512, false); + let size = 2_usize.pow(20); + let arr_a = create_array(size, false); - c.bench_function("sum 512", |b| b.iter(|| bench_sum(&arr_a))); - c.bench_function("min 512", |b| b.iter(|| bench_min(&arr_a))); + c.bench_function("sum 2^20", |b| b.iter(|| bench_sum(&arr_a))); + c.bench_function("min 2^20", |b| b.iter(|| bench_min(&arr_a))); - let arr_a = create_array(512, true); + let arr_a = create_array(size, true); - c.bench_function("sum nulls 512", |b| b.iter(|| bench_sum(&arr_a))); - c.bench_function("min nulls 512", |b| b.iter(|| bench_min(&arr_a))); + c.bench_function("sum nulls 2^20", |b| b.iter(|| bench_sum(&arr_a))); + c.bench_function("min nulls 2^20", |b| b.iter(|| bench_min(&arr_a))); - let arr_b = create_string_array(512, false); - c.bench_function("min string 512", |b| b.iter(|| bench_min_string(&arr_b))); + let arr_b = create_string_array(size, false); + c.bench_function("min string 2^20", 
|b| b.iter(|| bench_min_string(&arr_b))); - let arr_b = create_string_array(512, true); - c.bench_function("min nulls string 512", |b| { + let arr_b = create_string_array(size, true); + c.bench_function("min nulls string 2^20", |b| { b.iter(|| bench_min_string(&arr_b)) }); } diff --git a/rust/arrow/benches/arithmetic_kernels.rs b/rust/arrow/benches/arithmetic_kernels.rs index b3272d12e46..8538b1ae434 100644 --- a/rust/arrow/benches/arithmetic_kernels.rs +++ b/rust/arrow/benches/arithmetic_kernels.rs @@ -73,25 +73,25 @@ fn bench_limit(arr_a: &ArrayRef, max: usize) { } fn add_benchmark(c: &mut Criterion) { - let arr_a = create_array(512, false); - let arr_b = create_array(512, false); + let arr_a = create_array(2_usize.pow(20), false); + let arr_b = create_array(2_usize.pow(20), false); - c.bench_function("add 512", |b| b.iter(|| bench_add(&arr_a, &arr_b))); - c.bench_function("subtract 512", |b| { + c.bench_function("add 2^20", |b| b.iter(|| bench_add(&arr_a, &arr_b))); + c.bench_function("subtract 2^20", |b| { b.iter(|| bench_subtract(&arr_a, &arr_b)) }); - c.bench_function("multiply 512", |b| { + c.bench_function("multiply 2^20", |b| { b.iter(|| bench_multiply(&arr_a, &arr_b)) }); - c.bench_function("divide 512", |b| b.iter(|| bench_divide(&arr_a, &arr_b))); - c.bench_function("limit 512, 512", |b| b.iter(|| bench_limit(&arr_a, 512))); + c.bench_function("divide 2^20", |b| b.iter(|| bench_divide(&arr_a, &arr_b))); + c.bench_function("limit 2^20, 512", |b| b.iter(|| bench_limit(&arr_a, 512))); - let arr_a_nulls = create_array(512, false); - let arr_b_nulls = create_array(512, false); - c.bench_function("add_nulls_512", |b| { + let arr_a_nulls = create_array(2_usize.pow(20), false); + let arr_b_nulls = create_array(2_usize.pow(20), false); + c.bench_function("add_nulls_2^20", |b| { b.iter(|| bench_add(&arr_a_nulls, &arr_b_nulls)) }); - c.bench_function("divide_nulls_512", |b| { + c.bench_function("divide_nulls_2^20", |b| { b.iter(|| bench_divide(&arr_a_nulls, 
&arr_b_nulls)) }); } diff --git a/rust/arrow/src/array/array_binary.rs b/rust/arrow/src/array/array_binary.rs index d7a3eb7217a..c8898001880 100644 --- a/rust/arrow/src/array/array_binary.rs +++ b/rust/arrow/src/array/array_binary.rs @@ -25,7 +25,7 @@ use super::{ Array, ArrayData, ArrayDataRef, FixedSizeListArray, GenericBinaryIter, GenericListArray, LargeListArray, ListArray, OffsetSizeTrait, }; -use crate::util::bit_util; + use crate::{buffer::Buffer, datatypes::ToByteSlice}; use crate::{buffer::MutableBuffer, datatypes::DataType}; @@ -231,12 +231,10 @@ where offsets.push(length_so_far); { - let null_slice = null_buf.data_mut(); - for (i, s) in iter.enumerate() { if let Some(s) = s { let s = s.as_ref(); - bit_util::set_bit(null_slice, i); + null_buf.set_bit(i); length_so_far = length_so_far + OffsetSize::from_usize(s.len()).unwrap(); values.extend_from_slice(s); diff --git a/rust/arrow/src/array/array_list.rs b/rust/arrow/src/array/array_list.rs index 4eb8dc56640..e886d7a4e38 100644 --- a/rust/arrow/src/array/array_list.rs +++ b/rust/arrow/src/array/array_list.rs @@ -302,10 +302,10 @@ mod tests { buffer::Buffer, datatypes::{Field, ToByteSlice}, memory, - util::bit_util, }; use super::*; + use crate::util::bit_ops::{BufferBitSlice, BufferBitSliceMut}; #[test] fn test_list_array() { @@ -552,11 +552,14 @@ mod tests { Buffer::from(&[0, 2, 2, 2, 4, 6, 6, 9, 9, 10].to_byte_slice()); // 01011001 00000001 let mut null_bits: [u8; 2] = [0; 2]; - bit_util::set_bit(&mut null_bits, 0); - bit_util::set_bit(&mut null_bits, 3); - bit_util::set_bit(&mut null_bits, 4); - bit_util::set_bit(&mut null_bits, 6); - bit_util::set_bit(&mut null_bits, 8); + { + let mut null_bit_slice = BufferBitSliceMut::new(&mut null_bits); + null_bit_slice.set_bit(0, true); + null_bit_slice.set_bit(3, true); + null_bit_slice.set_bit(4, true); + null_bit_slice.set_bit(6, true); + null_bit_slice.set_bit(8, true); + } // Construct a list array from the above two let list_data_type = @@ -582,8 +585,9 @@ 
mod tests { assert_eq!(1, sliced_array.offset()); assert_eq!(3, sliced_array.null_count()); + let null_bit_slice = BufferBitSliceMut::new(&mut null_bits); for i in 0..sliced_array.len() { - if bit_util::get_bit(&null_bits, sliced_array.offset() + i) { + if null_bit_slice.get_bit(sliced_array.offset() + i) { assert!(sliced_array.is_valid(i)); } else { assert!(sliced_array.is_null(i)); @@ -617,11 +621,14 @@ mod tests { Buffer::from(&[0i64, 2, 2, 2, 4, 6, 6, 9, 9, 10].to_byte_slice()); // 01011001 00000001 let mut null_bits: [u8; 2] = [0; 2]; - bit_util::set_bit(&mut null_bits, 0); - bit_util::set_bit(&mut null_bits, 3); - bit_util::set_bit(&mut null_bits, 4); - bit_util::set_bit(&mut null_bits, 6); - bit_util::set_bit(&mut null_bits, 8); + { + let mut null_bit_slice = BufferBitSliceMut::new(&mut null_bits); + null_bit_slice.set_bit(0, true); + null_bit_slice.set_bit(3, true); + null_bit_slice.set_bit(4, true); + null_bit_slice.set_bit(6, true); + null_bit_slice.set_bit(8, true); + } // Construct a list array from the above two let list_data_type = @@ -647,8 +654,9 @@ mod tests { assert_eq!(1, sliced_array.offset()); assert_eq!(3, sliced_array.null_count()); + let null_bit_slice = BufferBitSliceMut::new(&mut null_bits); for i in 0..sliced_array.len() { - if bit_util::get_bit(&null_bits, sliced_array.offset() + i) { + if null_bit_slice.get_bit(sliced_array.offset() + i) { assert!(sliced_array.is_valid(i)); } else { assert!(sliced_array.is_null(i)); @@ -678,13 +686,16 @@ mod tests { )) .build(); - // Set null buts for the nested array: + // Set null bits for the nested array: // [[0, 1], null, null, [6, 7], [8, 9]] // 01011001 00000001 let mut null_bits: [u8; 1] = [0; 1]; - bit_util::set_bit(&mut null_bits, 0); - bit_util::set_bit(&mut null_bits, 3); - bit_util::set_bit(&mut null_bits, 4); + { + let mut null_bit_slice = BufferBitSliceMut::new(&mut null_bits); + null_bit_slice.set_bit(0, true); + null_bit_slice.set_bit(3, true); + null_bit_slice.set_bit(4, true); + } // 
Construct a fixed size list array from the above two let list_data_type = DataType::FixedSizeList( @@ -711,8 +722,9 @@ mod tests { assert_eq!(1, sliced_array.offset()); assert_eq!(2, sliced_array.null_count()); + let null_bit_slice = BufferBitSlice::new(&null_bits); for i in 0..sliced_array.len() { - if bit_util::get_bit(&null_bits, sliced_array.offset() + i) { + if null_bit_slice.get_bit(sliced_array.offset() + i) { assert!(sliced_array.is_valid(i)); } else { assert!(sliced_array.is_null(i)); diff --git a/rust/arrow/src/array/array_primitive.rs b/rust/arrow/src/array/array_primitive.rs index d1014dbb940..34fbfc856af 100644 --- a/rust/arrow/src/array/array_primitive.rs +++ b/rust/arrow/src/array/array_primitive.rs @@ -30,7 +30,7 @@ use super::raw_pointer::RawPtrBox; use super::*; use crate::buffer::{Buffer, MutableBuffer}; use crate::memory; -use crate::util::bit_util; +use crate::util::utils; /// Number of seconds in a day const SECONDS_IN_DAY: i64 = 86_400; @@ -296,7 +296,7 @@ impl::Native let (_, data_len) = iter.size_hint(); let data_len = data_len.expect("Iterator must be sized"); // panic if no upper bound. 
- let num_bytes = bit_util::ceil(data_len, 8); + let num_bytes = utils::ceil(data_len, 8); let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); let mut val_buf = MutableBuffer::new( data_len * mem::size_of::<::Native>(), @@ -304,10 +304,9 @@ impl::Native let null = vec![0; mem::size_of::<::Native>()]; - let null_slice = null_buf.data_mut(); iter.enumerate().for_each(|(i, item)| { if let Some(a) = item.borrow() { - bit_util::set_bit(null_slice, i); + null_buf.set_bit(i); val_buf.extend_from_slice(a.to_byte_slice()); } else { val_buf.extend_from_slice(&null); @@ -401,10 +400,9 @@ impl PrimitiveArray { { let null = vec![0; mem::size_of::()]; - let null_slice = null_buf.data_mut(); for (i, v) in data.iter().enumerate() { if let Some(n) = v { - bit_util::set_bit(null_slice, i); + null_buf.set_bit(i); val_buf.extend_from_slice(&n.to_byte_slice()); } else { val_buf.extend_from_slice(&null); @@ -427,10 +425,9 @@ impl From> for BooleanArray { fn from(data: Vec) -> Self { let mut mut_buf = MutableBuffer::new_null(data.len()); { - let mut_slice = mut_buf.data_mut(); for (i, b) in data.iter().enumerate() { if *b { - bit_util::set_bit(mut_slice, i); + mut_buf.set_bit(i); } } } @@ -445,19 +442,16 @@ impl From> for BooleanArray { impl From>> for BooleanArray { fn from(data: Vec>) -> Self { let data_len = data.len(); - let num_byte = bit_util::ceil(data_len, 8); + let num_byte = utils::ceil(data_len, 8); let mut null_buf = MutableBuffer::new_null(data.len()); let mut val_buf = MutableBuffer::new(num_byte).with_bitset(num_byte, false); { - let null_slice = null_buf.data_mut(); - let val_slice = val_buf.data_mut(); - for (i, v) in data.iter().enumerate() { if let Some(b) = v { - bit_util::set_bit(null_slice, i); + null_buf.set_bit(i); if *b { - bit_util::set_bit(val_slice, i); + val_buf.set_bit(i); } } } diff --git a/rust/arrow/src/array/array_string.rs b/rust/arrow/src/array/array_string.rs index 5f871b8f595..67799f420a4 100644 --- 
a/rust/arrow/src/array/array_string.rs +++ b/rust/arrow/src/array/array_string.rs @@ -25,7 +25,7 @@ use super::{ Array, ArrayData, ArrayDataRef, GenericListArray, GenericStringIter, LargeListArray, ListArray, OffsetSizeTrait, }; -use crate::util::bit_util; + use crate::{buffer::Buffer, datatypes::ToByteSlice}; use crate::{buffer::MutableBuffer, datatypes::DataType}; @@ -168,8 +168,7 @@ where if let Some(s) = s { let s = s.as_ref(); // set null bit - let null_slice = null_buf.data_mut(); - bit_util::set_bit(null_slice, i); + null_buf.set_bit(i); length_so_far = length_so_far + OffsetSize::from_usize(s.len()).unwrap(); offsets.push(length_so_far); diff --git a/rust/arrow/src/array/array_struct.rs b/rust/arrow/src/array/array_struct.rs index e5cb9a2bc0a..284b1ac35ed 100644 --- a/rust/arrow/src/array/array_struct.rs +++ b/rust/arrow/src/array/array_struct.rs @@ -141,7 +141,7 @@ impl TryFrom> for StructArray { child_datum_len, ) } else { - child_null_buffer.bit_slice(child_datum_offset, child_datum_len) + child_null_buffer.bit_view(child_datum_offset, child_datum_len) }); } else if null.is_some() { // when one of the fields has no nulls, them there is no null in the array @@ -154,7 +154,7 @@ impl TryFrom> for StructArray { .len(len) .child_data(child_data); if let Some(null_buffer) = null { - let null_count = len - null_buffer.count_set_bits(); + let null_count = len - null_buffer.count_ones(); builder = builder.null_count(null_count).null_bit_buffer(null_buffer); } diff --git a/rust/arrow/src/array/array_union.rs b/rust/arrow/src/array/array_union.rs index a26404ff912..926e9aeb91b 100644 --- a/rust/arrow/src/array/array_union.rs +++ b/rust/arrow/src/array/array_union.rs @@ -144,7 +144,7 @@ impl UnionArray { bitmap: Option, ) -> Result { let bitmap_data = bitmap.map(|b| { - let null_count = type_ids.len() - b.count_set_bits(); + let null_count = type_ids.len() - b.count_ones(); (b, null_count) }); @@ -233,7 +233,7 @@ impl UnionArray { // In format v4 unions had their 
own validity bitmap and offsets are compressed by omitting null values // Starting with v5 unions don't have a validity bitmap and it's possible to directly index into the offsets buffer let valid_slots = match self.data.null_buffer() { - Some(b) => b.count_set_bits_offset(0, index), + Some(b) => b.bit_slice().slicing(0, index).count_ones(), None => index, }; self.data().buffers()[1].data()[valid_slots * size_of::()] as i32 diff --git a/rust/arrow/src/array/builder.rs b/rust/arrow/src/array/builder.rs index eb582c3e107..6018d4881e3 100644 --- a/rust/arrow/src/array/builder.rs +++ b/rust/arrow/src/array/builder.rs @@ -31,7 +31,8 @@ use crate::array::*; use crate::buffer::{Buffer, MutableBuffer}; use crate::datatypes::*; use crate::error::{ArrowError, Result}; -use crate::util::bit_util; +use crate::util::bit_ops::BufferBitSliceMut; +use crate::util::utils; /// Converts a `MutableBuffer` to a `BufferBuilder`. /// @@ -255,8 +256,8 @@ impl BufferBuilderTrait for BufferBuilder { #[inline] fn new(capacity: usize) -> Self { let buffer = if T::DATA_TYPE == DataType::Boolean { - let byte_capacity = bit_util::ceil(capacity, 8); - let actual_capacity = bit_util::round_upto_multiple_of_64(byte_capacity); + let byte_capacity = utils::ceil(capacity, 8); + let actual_capacity = utils::round_upto_multiple_of_64(byte_capacity); let mut buffer = MutableBuffer::new(actual_capacity); buffer.set_null_bits(0, actual_capacity); buffer @@ -287,7 +288,7 @@ impl BufferBuilderTrait for BufferBuilder { #[inline] fn advance(&mut self, i: usize) -> Result<()> { let new_buffer_len = if T::DATA_TYPE == DataType::Boolean { - bit_util::ceil(self.len + i, 8) + utils::ceil(self.len + i, 8) } else { (self.len + i) * mem::size_of::() }; @@ -301,7 +302,7 @@ impl BufferBuilderTrait for BufferBuilder { let new_capacity = self.len + n; if T::DATA_TYPE == DataType::Boolean { if new_capacity > self.capacity() { - let new_byte_capacity = bit_util::ceil(new_capacity, 8); + let new_byte_capacity = 
utils::ceil(new_capacity, 8); let existing_capacity = self.buffer.capacity(); let new_capacity = self.buffer.reserve(new_byte_capacity); self.buffer @@ -318,9 +319,7 @@ impl BufferBuilderTrait for BufferBuilder { self.reserve(1); if T::DATA_TYPE == DataType::Boolean { if v != T::default_value() { - unsafe { - bit_util::set_bit_raw(self.buffer.raw_data_mut(), self.len); - } + self.buffer.set_bit(self.len); } self.len += 1; } else { @@ -340,7 +339,9 @@ impl BufferBuilderTrait for BufferBuilder { self.buffer.capacity(), ) }; - (self.len..self.len + n).for_each(|i| bit_util::set_bit(data, i)) + (self.len..self.len + n).for_each(|i| { + BufferBitSliceMut::new(data).set_bit(i, true); + }); } self.len += n; } else { @@ -362,9 +363,7 @@ impl BufferBuilderTrait for BufferBuilder { // For performance the `len` of the buffer is not // updated on each append but is updated in the // `freeze` method instead. - unsafe { - bit_util::set_bit_raw(self.buffer.raw_data_mut(), self.len); - } + self.buffer.set_bit(self.len); } self.len += 1; } @@ -379,7 +378,7 @@ impl BufferBuilderTrait for BufferBuilder { fn finish(&mut self) -> Buffer { if T::DATA_TYPE == DataType::Boolean { // `append` does not update the buffer's `len` so do it before `freeze` is called. 
- let new_buffer_len = bit_util::ceil(self.len, 8); + let new_buffer_len = utils::ceil(self.len, 8); debug_assert!(new_buffer_len >= self.buffer.len()); let mut buf = std::mem::replace(&mut self.buffer, MutableBuffer::new(0)); self.len = 0; @@ -603,7 +602,7 @@ impl PrimitiveBuilder { pub fn finish(&mut self) -> PrimitiveArray { let len = self.len(); let null_bit_buffer = self.bitmap_builder.finish(); - let null_count = len - null_bit_buffer.count_set_bits(); + let null_count = len - null_bit_buffer.count_ones(); let mut builder = ArrayData::builder(T::DATA_TYPE) .len(len) .add_buffer(self.values_builder.finish()); @@ -620,7 +619,7 @@ impl PrimitiveBuilder { pub fn finish_dict(&mut self, values: ArrayRef) -> DictionaryArray { let len = self.len(); let null_bit_buffer = self.bitmap_builder.finish(); - let null_count = len - null_bit_buffer.count_set_bits(); + let null_count = len - null_bit_buffer.count_ones(); let data_type = DataType::Dictionary( Box::new(T::DATA_TYPE), Box::new(values.data_type().clone()), @@ -832,7 +831,7 @@ where let offset_buffer = self.offsets_builder.finish(); let null_bit_buffer = self.bitmap_builder.finish(); - let nulls = null_bit_buffer.count_set_bits(); + let nulls = null_bit_buffer.count_ones(); self.offsets_builder.append(0).unwrap(); let data = ArrayData::builder(DataType::List(Box::new(Field::new( "item", @@ -1044,7 +1043,7 @@ where let offset_buffer = self.offsets_builder.finish(); let null_bit_buffer = self.bitmap_builder.finish(); - let nulls = null_bit_buffer.count_set_bits(); + let nulls = null_bit_buffer.count_ones(); self.offsets_builder.append(0).unwrap(); let data = ArrayData::builder(DataType::LargeList(Box::new(Field::new( "item", @@ -1235,7 +1234,7 @@ where } let null_bit_buffer = self.bitmap_builder.finish(); - let nulls = null_bit_buffer.count_set_bits(); + let nulls = null_bit_buffer.count_ones(); let data = ArrayData::builder(DataType::FixedSizeList( Box::new(Field::new("item", values_data.data_type().clone(), true)), 
self.list_len, @@ -2135,7 +2134,7 @@ impl StructBuilder { } let null_bit_buffer = self.bitmap_builder.finish(); - let null_count = self.len - null_bit_buffer.count_set_bits(); + let null_count = self.len - null_bit_buffer.count_ones(); let mut builder = ArrayData::builder(DataType::Struct(self.fields.clone())) .len(self.len) .child_data(child_data); diff --git a/rust/arrow/src/array/data.rs b/rust/arrow/src/array/data.rs index 5049ef1dd88..a6d6e54cb75 100644 --- a/rust/arrow/src/array/data.rs +++ b/rust/arrow/src/array/data.rs @@ -30,8 +30,8 @@ use super::equal::equal; #[inline] fn count_nulls(null_bit_buffer: Option<&Buffer>, offset: usize, len: usize) -> usize { if let Some(ref buf) = null_bit_buffer { - len.checked_sub(buf.count_set_bits_offset(offset, len)) - .unwrap() + let ones = buf.bit_slice().slicing(offset, len).count_ones(); + len.checked_sub(ones).unwrap() } else { 0 } @@ -341,7 +341,7 @@ mod tests { use crate::buffer::Buffer; use crate::datatypes::ToByteSlice; - use crate::util::bit_util; + use crate::util::bit_ops::BufferBitSliceMut; #[test] fn test_new() { @@ -387,9 +387,10 @@ mod tests { #[test] fn test_null_count() { let mut bit_v: [u8; 2] = [0; 2]; - bit_util::set_bit(&mut bit_v, 0); - bit_util::set_bit(&mut bit_v, 3); - bit_util::set_bit(&mut bit_v, 10); + let mut bit_v_slice = BufferBitSliceMut::new(&mut bit_v); + bit_v_slice.set_bit(0, true); + bit_v_slice.set_bit(3, true); + bit_v_slice.set_bit(10, true); let arr_data = ArrayData::builder(DataType::Int32) .len(16) .null_bit_buffer(Buffer::from(bit_v)) @@ -398,9 +399,10 @@ mod tests { // Test with offset let mut bit_v: [u8; 2] = [0; 2]; - bit_util::set_bit(&mut bit_v, 0); - bit_util::set_bit(&mut bit_v, 3); - bit_util::set_bit(&mut bit_v, 10); + let mut bit_v_slice = BufferBitSliceMut::new(&mut bit_v); + bit_v_slice.set_bit(0, true); + bit_v_slice.set_bit(3, true); + bit_v_slice.set_bit(10, true); let arr_data = ArrayData::builder(DataType::Int32) .len(12) .offset(2) @@ -412,9 +414,10 @@ mod 
tests { #[test] fn test_null_buffer_ref() { let mut bit_v: [u8; 2] = [0; 2]; - bit_util::set_bit(&mut bit_v, 0); - bit_util::set_bit(&mut bit_v, 3); - bit_util::set_bit(&mut bit_v, 10); + let mut bit_v_slice = BufferBitSliceMut::new(&mut bit_v); + bit_v_slice.set_bit(0, true); + bit_v_slice.set_bit(3, true); + bit_v_slice.set_bit(10, true); let arr_data = ArrayData::builder(DataType::Int32) .len(16) .null_bit_buffer(Buffer::from(bit_v)) @@ -426,9 +429,10 @@ mod tests { #[test] fn test_slice() { let mut bit_v: [u8; 2] = [0; 2]; - bit_util::set_bit(&mut bit_v, 0); - bit_util::set_bit(&mut bit_v, 3); - bit_util::set_bit(&mut bit_v, 10); + let mut bit_v_slice = BufferBitSliceMut::new(&mut bit_v); + bit_v_slice.set_bit(0, true); + bit_v_slice.set_bit(3, true); + bit_v_slice.set_bit(10, true); let data = ArrayData::builder(DataType::Int32) .len(16) .null_bit_buffer(Buffer::from(bit_v)) diff --git a/rust/arrow/src/array/equal/utils.rs b/rust/arrow/src/array/equal/utils.rs index f9e8860a5bb..36cdb44e5fd 100644 --- a/rust/arrow/src/array/equal/utils.rs +++ b/rust/arrow/src/array/equal/utils.rs @@ -15,7 +15,8 @@ // specific language governing permissions and limitations // under the License. -use crate::{array::ArrayData, util::bit_util}; +use crate::array::ArrayData; +use crate::util::bit_ops::BufferBitSlice; // whether bits along the positions are equal // `lhs_start`, `rhs_start` and `len` are _measured in bits_. 
@@ -27,10 +28,11 @@ pub(super) fn equal_bits( rhs_start: usize, len: usize, ) -> bool { - (0..len).all(|i| { - bit_util::get_bit(lhs_values, lhs_start + i) - == bit_util::get_bit(rhs_values, rhs_start + i) - }) + let (lhs, rhs) = ( + BufferBitSlice::new(lhs_values), + BufferBitSlice::new(rhs_values), + ); + lhs.slicing(lhs_start, len) == rhs.slicing(rhs_start, len) } #[inline] diff --git a/rust/arrow/src/bitmap.rs b/rust/arrow/src/bitmap.rs index 7df609fb88b..1bf7ce14361 100644 --- a/rust/arrow/src/bitmap.rs +++ b/rust/arrow/src/bitmap.rs @@ -20,7 +20,7 @@ use crate::buffer::Buffer; use crate::error::Result; -use crate::util::bit_util; + use std::mem; use std::ops::{BitAnd, BitOr}; @@ -44,6 +44,7 @@ impl Bitmap { } } + #[inline] pub fn len(&self) -> usize { self.bits.len() } @@ -52,9 +53,9 @@ impl Bitmap { self.bits.is_empty() } + #[inline] pub fn is_set(&self, i: usize) -> bool { - assert!(i < (self.bits.len() << 3)); - unsafe { bit_util::get_bit_raw(self.bits.raw_data(), i) } + self.bits.get_bit(i) } pub fn buffer_ref(&self) -> &Buffer { diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index ece811b742f..e6afb331b33 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -33,10 +33,12 @@ use std::sync::Arc; use crate::datatypes::ArrowNativeType; use crate::error::{ArrowError, Result}; use crate::memory; -use crate::util::bit_chunk_iterator::BitChunks; -use crate::util::bit_util; -use crate::util::bit_util::ceil; -#[cfg(any(feature = "simd", feature = "avx512"))] +use crate::util::bit_ops::*; +use crate::util::utils; +use crate::util::utils::ceil; +use bitvec::field::BitField; +use rayon::prelude::*; +#[cfg(any(feature = "avx512"))] use std::borrow::BorrowMut; /// Buffer is a contiguous memory region of fixed size and is aligned at a 64-byte @@ -258,39 +260,52 @@ impl Buffer { /// Returns a slice of this buffer starting at a certain bit offset. 
/// If the offset is byte-aligned the returned buffer is a shallow clone, /// otherwise a new buffer is allocated and filled with a copy of the bits in the range. - pub fn bit_slice(&self, offset: usize, len: usize) -> Self { - if offset % 8 == 0 && len % 8 == 0 { - return self.slice(offset / 8); + #[inline] + pub fn bit_view(&self, offset_in_bits: usize, len_in_bits: usize) -> Self { + if offset_in_bits % 8 == 0 && len_in_bits % 8 == 0 { + self.slice(offset_in_bits / 8) + } else { + self.bit_slice() + .slicing(offset_in_bits, len_in_bits) + .as_buffer() } + } - bitwise_unary_op_helper(&self, offset, len, |a| a) + /// Gives bit slice of the underlying buffer + /// This method can be used to get bit views for bit operations on the immutable view over the buffer. + #[inline] + pub fn bit_slice(&self) -> BufferBitSlice { + BufferBitSlice::new(self.data()) } - /// Returns a `BitChunks` instance which can be used to iterate over this buffers bits - /// in larger chunks and starting at arbitrary bit offsets. - /// Note that both `offset` and `length` are measured in bits. - pub fn bit_chunks(&self, offset: usize, len: usize) -> BitChunks { - BitChunks::new(&self, offset, len) + /// Count one bits in this buffer + #[inline] + pub fn count_ones(&self) -> usize { + self.bit_slice().count_ones() } - /// Returns the number of 1-bits in this buffer. - pub fn count_set_bits(&self) -> usize { - let len_in_bits = self.len() * 8; - // self.offset is already taken into consideration by the bit_chunks implementation - self.count_set_bits_offset(0, len_in_bits) + /// Count zero bits in this buffer + #[inline] + pub fn count_zeros(&self) -> usize { + self.bit_slice().count_zeros() } - /// Returns the number of 1-bits in this buffer, starting from `offset` with `length` bits - /// inspected. Note that both `offset` and `length` are measured in bits. 
- pub fn count_set_bits_offset(&self, offset: usize, len: usize) -> usize { - let chunks = self.bit_chunks(offset, len); - let mut count = chunks.iter().map(|c| c.count_ones() as usize).sum(); - count += chunks.remainder_bits().count_ones() as usize; + /// + /// Get bit value at the given index in this bit view + #[inline] + pub fn get_bit(&self, index: usize) -> bool { + self.bit_slice().get_bit(index) + } - count + /// + /// Get bits in this view as vector of booleans + #[inline] + pub fn typed_bits(&mut self) -> Vec { + self.bit_slice().typed_bits() } /// Returns an empty buffer. + #[inline] pub fn empty() -> Self { unsafe { Self::from_raw_parts(BUFFER_INIT.as_ptr() as _, 0, 0) } } @@ -303,7 +318,7 @@ impl> From for Buffer { // allocate aligned memory buffer let slice = p.as_ref(); let len = slice.len() * mem::size_of::(); - let capacity = bit_util::round_upto_multiple_of_64(len); + let capacity = utils::round_upto_multiple_of_64(len); let buffer = memory::allocate_aligned(capacity); unsafe { memory::memcpy(buffer, slice.as_ptr(), len); @@ -328,31 +343,32 @@ fn bitwise_bin_op_simd_helper( scalar_op: F_SCALAR, ) -> Buffer where - F_SIMD: Fn(u8x64, u8x64) -> u8x64, - F_SCALAR: Fn(u8, u8) -> u8, + F_SIMD: Fn(u8x64, u8x64) -> u8x64 + Send + Sync, + F_SCALAR: Fn(u8, u8) -> u8 + Send + Sync, { let mut result = MutableBuffer::new(len).with_bitset(len, false); let lanes = u8x64::lanes(); - let mut left_chunks = left.data()[left_offset..].chunks_exact(lanes); - let mut right_chunks = right.data()[right_offset..].chunks_exact(lanes); - let mut result_chunks = result.data_mut().chunks_exact_mut(lanes); + let left_chunks = left.data()[left_offset..].par_chunks_exact(lanes); + let right_chunks = right.data()[right_offset..].par_chunks_exact(lanes); + let mut result_chunks = result.data_mut().par_chunks_exact_mut(lanes); + + let res_remainder = result_chunks.take_remainder(); + let left_remainder = left_chunks.remainder(); + let right_remainder = right_chunks.remainder(); 
result_chunks - .borrow_mut() - .zip(left_chunks.borrow_mut().zip(right_chunks.borrow_mut())) + .zip(left_chunks.zip(right_chunks)) .for_each(|(res, (left, right))| { - unsafe { bit_util::bitwise_bin_op_simd(&left, &right, res, &simd_op) }; + unsafe { utils::bitwise_bin_op_simd(&left, &right, res, &simd_op) }; }); - result_chunks - .into_remainder() - .iter_mut() + res_remainder + .into_par_iter() .zip( - left_chunks - .remainder() - .iter() - .zip(right_chunks.remainder().iter()), + left_remainder + .into_par_iter() + .zip(right_remainder.into_par_iter()), ) .for_each(|(res, (left, right))| { *res = scalar_op(*left, *right); @@ -375,28 +391,27 @@ fn bitwise_unary_op_simd_helper( scalar_op: F_SCALAR, ) -> Buffer where - F_SIMD: Fn(u8x64) -> u8x64, - F_SCALAR: Fn(u8) -> u8, + F_SIMD: Fn(u8x64) -> u8x64 + Send + Sync, + F_SCALAR: Fn(u8) -> u8 + Send + Sync, { let mut result = MutableBuffer::new(len).with_bitset(len, false); let lanes = u8x64::lanes(); - let mut left_chunks = left.data()[left_offset..].chunks_exact(lanes); - let mut result_chunks = result.data_mut().chunks_exact_mut(lanes); + let left_chunks = left.data()[left_offset..].par_chunks_exact(lanes); + let mut result_chunks = result.data_mut().par_chunks_exact_mut(lanes); - result_chunks - .borrow_mut() - .zip(left_chunks.borrow_mut()) - .for_each(|(res, left)| unsafe { - let data_simd = u8x64::from_slice_unaligned_unchecked(left); - let simd_result = simd_op(data_simd); - simd_result.write_to_slice_unaligned_unchecked(res); - }); + let res_remainder = result_chunks.take_remainder(); + let left_remainder = left_chunks.remainder(); - result_chunks - .into_remainder() - .iter_mut() - .zip(left_chunks.remainder().iter()) + left_chunks + .map(|e| unsafe { u8x64::from_slice_unaligned_unchecked(e) }) + .map(simd_op) + .zip(result_chunks) + .for_each(|(e, res)| unsafe { e.write_to_slice_unaligned_unchecked(res) }); + + res_remainder + .into_par_iter() + .zip(left_remainder.into_par_iter()) .for_each(|(res, left)| 
{ *res = scalar_op(*left); }); @@ -415,26 +430,30 @@ fn bitwise_bin_op_helper( op: F, ) -> Buffer where - F: Fn(u64, u64) -> u64, + F: Fn(u64, u64) -> u64 + Send + Sync, { // reserve capacity and set length so we can get a typed view of u64 chunks let mut result = MutableBuffer::new(ceil(len_in_bits, 8)).with_bitset(len_in_bits / 64 * 8, false); - let left_chunks = left.bit_chunks(left_offset_in_bits, len_in_bits); - let right_chunks = right.bit_chunks(right_offset_in_bits, len_in_bits); - let result_chunks = result.typed_data_mut::().iter_mut(); + let left_slice = left.bit_slice().slicing(left_offset_in_bits, len_in_bits); + let left_chunks = left_slice.par_chunks::(); + + let right_slice = right.bit_slice().slicing(right_offset_in_bits, len_in_bits); + let right_chunks = right_slice.par_chunks::(); + + let remainder_bytes = ceil(left_chunks.remainder_bit_len(), 8); + let rem = op(left_chunks.remainder_bits(), right_chunks.remainder_bits()); + let rem = &rem.to_ne_bytes()[0..remainder_bytes]; + + let result_chunks = result.typed_data_mut::().par_iter_mut(); result_chunks - .zip(left_chunks.iter().zip(right_chunks.iter())) + .zip(left_chunks.zip(right_chunks)) .for_each(|(res, (left, right))| { - *res = op(left, right); + *res = op((*left).load::(), (*right).load::()); }); - let remainder_bytes = ceil(left_chunks.remainder_len(), 8); - let rem = op(left_chunks.remainder_bits(), right_chunks.remainder_bits()); - // we are counting its starting from the least significant bit, to to_le_bytes should be correct - let rem = &rem.to_le_bytes()[0..remainder_bytes]; result.extend_from_slice(rem); result.freeze() @@ -449,25 +468,25 @@ fn bitwise_unary_op_helper( op: F, ) -> Buffer where - F: Fn(u64) -> u64, + F: Fn(u64) -> u64 + Send + Sync, { // reserve capacity and set length so we can get a typed view of u64 chunks let mut result = MutableBuffer::new(ceil(len_in_bits, 8)).with_bitset(len_in_bits / 64 * 8, false); - let left_chunks = left.bit_chunks(offset_in_bits, 
len_in_bits); - let result_chunks = result.typed_data_mut::().iter_mut(); - - result_chunks - .zip(left_chunks.iter()) - .for_each(|(res, left)| { - *res = op(left); - }); + let left_slice = left.bit_slice().slicing(offset_in_bits, len_in_bits); + let left_chunks = left_slice.par_chunks::(); - let remainder_bytes = ceil(left_chunks.remainder_len(), 8); + let remainder_bytes = ceil(left_chunks.remainder_bit_len(), 8); let rem = op(left_chunks.remainder_bits()); - // we are counting its starting from the least significant bit, to to_le_bytes should be correct - let rem = &rem.to_le_bytes()[0..remainder_bytes]; + let rem = &rem.to_ne_bytes()[0..remainder_bytes]; + + let result_chunks = result.typed_data_mut::().par_iter_mut(); + + result_chunks.zip(left_chunks).for_each(|(res, left)| { + *res = op((*left).load::()); + }); + result.extend_from_slice(rem); result.freeze() @@ -714,7 +733,7 @@ pub struct MutableBuffer { impl MutableBuffer { /// Allocate a new mutable buffer with initial capacity to be `capacity`. pub fn new(capacity: usize) -> Self { - let new_capacity = bit_util::round_upto_multiple_of_64(capacity); + let new_capacity = utils::round_upto_multiple_of_64(capacity); let ptr = memory::allocate_aligned(new_capacity); Self { data: ptr, @@ -725,7 +744,7 @@ impl MutableBuffer { /// creates a new [MutableBuffer] where every bit is initialized to `0` pub fn new_null(len: usize) -> Self { - let num_bytes = bit_util::ceil(len, 8); + let num_bytes = utils::ceil(len, 8); MutableBuffer::new(num_bytes).with_bitset(num_bytes, false) } @@ -735,9 +754,9 @@ impl MutableBuffer { /// This is useful when one wants to clear (or set) the bits and then manipulate /// the buffer directly (e.g., modifying the buffer by holding a mutable reference /// from `data_mut()`). 
- pub fn with_bitset(mut self, end: usize, val: bool) -> Self { + pub fn with_bitset(mut self, end: usize, initial_value: bool) -> Self { assert!(end <= self.capacity); - let v = if val { 255 } else { 0 }; + let v = if initial_value { 0xFF_u8 } else { 0x00_u8 }; unsafe { std::ptr::write_bytes(self.data, v, end); self.len = end; @@ -763,7 +782,7 @@ impl MutableBuffer { /// Returns the new capacity for this buffer. pub fn reserve(&mut self, capacity: usize) -> usize { if capacity > self.capacity { - let new_capacity = bit_util::round_upto_multiple_of_64(capacity); + let new_capacity = utils::round_upto_multiple_of_64(capacity); let new_capacity = cmp::max(new_capacity, self.capacity * 2); let new_data = unsafe { memory::reallocate(self.data, self.capacity, new_capacity) }; @@ -784,7 +803,7 @@ impl MutableBuffer { if new_len > self.len { self.reserve(new_len); } else { - let new_capacity = bit_util::round_upto_multiple_of_64(new_len); + let new_capacity = utils::round_upto_multiple_of_64(new_len); if new_capacity < self.capacity { let new_data = unsafe { memory::reallocate(self.data, self.capacity, new_capacity) }; @@ -838,6 +857,8 @@ impl MutableBuffer { /// Returns a raw pointer for this buffer. /// + /// # Safety + /// /// Note that this should be used cautiously, and the returned pointer should not be /// stored anywhere, to avoid dangling pointers. #[inline] @@ -845,6 +866,13 @@ impl MutableBuffer { self.data } + /// Returns a mutable raw pointer for this buffer. + /// + /// # Safety + /// + /// Note that this should be used cautiously, and the returned pointer should not be + /// stored anywhere, to avoid dangling pointers. Moreover, arguments and return type using this + /// method should be either immutable, or mutable. 
#[inline] pub fn raw_data_mut(&mut self) -> *mut u8 { self.data @@ -888,6 +916,51 @@ impl MutableBuffer { } self.len += bytes.len(); } + + /// Gives immutable bit slice of the underlying buffer + /// This method can be used to get bit views for bit operations on the immutable view over the buffer. + #[inline] + pub fn bit_slice(&self) -> BufferBitSlice { + BufferBitSlice::new(self.data()) + } + + /// Gives mutable bit slice of the underlying buffer + /// This method can be used to get bit views for bit operations on the mutable view over the buffer. + #[inline] + pub fn bit_slice_mut(&mut self) -> BufferBitSliceMut { + BufferBitSliceMut::new(self.data_mut()) + } + + /// Counts the set (one) bits in this buffer + #[inline] + pub fn count_ones(&self) -> usize { + self.bit_slice().count_ones() + } + + /// Counts the unset (zero) bits in this buffer + #[inline] + pub fn count_zeros(&self) -> usize { + self.bit_slice().count_zeros() + } + + /// Sets bit at the given position in the underlying mutable buffer + #[inline] + pub fn set_bit(&mut self, i: usize) { + self.bit_slice_mut().set_bit(i, true) + } + + /// Unsets bit at the given position in the underlying mutable buffer + #[inline] + pub fn unset_bit(&mut self, i: usize) { + self.bit_slice_mut().set_bit(i, false) + } + + /// + /// Get bits in this view as vector of booleans + #[inline] + pub fn typed_bits(&mut self) -> Vec { + self.bit_slice().typed_bits() + } } impl Drop for MutableBuffer { @@ -1016,11 +1089,11 @@ mod tests { fn test_with_bitset() { let mut_buf = MutableBuffer::new(64).with_bitset(64, false); let buf = mut_buf.freeze(); - assert_eq!(0, buf.count_set_bits()); + assert_eq!(0, buf.count_ones()); let mut_buf = MutableBuffer::new(64).with_bitset(64, true); let buf = mut_buf.freeze(); - assert_eq!(512, buf.count_set_bits()); + assert_eq!(512, buf.count_ones()); } #[test] @@ -1028,12 +1101,12 @@ mod tests { let mut mut_buf = MutableBuffer::new(64).with_bitset(64, true); mut_buf.set_null_bits(0, 64); let buf = mut_buf.freeze();
- assert_eq!(0, buf.count_set_bits()); + assert_eq!(0, buf.count_ones()); let mut mut_buf = MutableBuffer::new(64).with_bitset(64, true); mut_buf.set_null_bits(32, 32); let buf = mut_buf.freeze(); - assert_eq!(256, buf.count_set_bits()); + assert_eq!(256, buf.count_ones()); } #[test] @@ -1205,11 +1278,11 @@ mod tests { #[test] fn test_count_bits() { - assert_eq!(0, Buffer::from(&[0b00000000]).count_set_bits()); - assert_eq!(8, Buffer::from(&[0b11111111]).count_set_bits()); - assert_eq!(3, Buffer::from(&[0b00001101]).count_set_bits()); - assert_eq!(6, Buffer::from(&[0b01001001, 0b01010010]).count_set_bits()); - assert_eq!(16, Buffer::from(&[0b11111111, 0b11111111]).count_set_bits()); + assert_eq!(0, Buffer::from(&[0b00000000]).count_ones()); + assert_eq!(8, Buffer::from(&[0b11111111]).count_ones()); + assert_eq!(3, Buffer::from(&[0b00001101]).count_ones()); + assert_eq!(6, Buffer::from(&[0b01001001, 0b01010010]).count_ones()); + assert_eq!(16, Buffer::from(&[0b11111111, 0b11111111]).count_ones()); } #[test] @@ -1218,73 +1291,133 @@ mod tests { 0, Buffer::from(&[0b11111111, 0b00000000]) .slice(1) - .count_set_bits() + .count_ones() ); assert_eq!( 8, Buffer::from(&[0b11111111, 0b11111111]) .slice(1) - .count_set_bits() + .count_ones() ); assert_eq!( 3, Buffer::from(&[0b11111111, 0b11111111, 0b00001101]) .slice(2) - .count_set_bits() + .count_ones() ); assert_eq!( 6, Buffer::from(&[0b11111111, 0b01001001, 0b01010010]) .slice(1) - .count_set_bits() + .count_ones() ); assert_eq!( 16, Buffer::from(&[0b11111111, 0b11111111, 0b11111111, 0b11111111]) .slice(2) - .count_set_bits() + .count_ones() ); } #[test] fn test_count_bits_offset_slice() { - assert_eq!(8, Buffer::from(&[0b11111111]).count_set_bits_offset(0, 8)); - assert_eq!(3, Buffer::from(&[0b11111111]).count_set_bits_offset(0, 3)); - assert_eq!(5, Buffer::from(&[0b11111111]).count_set_bits_offset(3, 5)); - assert_eq!(1, Buffer::from(&[0b11111111]).count_set_bits_offset(3, 1)); - assert_eq!(0, 
Buffer::from(&[0b11111111]).count_set_bits_offset(8, 0)); - assert_eq!(2, Buffer::from(&[0b01010101]).count_set_bits_offset(0, 3)); + assert_eq!( + 8, + Buffer::from(&[0b11111111]) + .bit_slice() + .slicing(0, 8) + .count_ones() + ); + assert_eq!( + 3, + Buffer::from(&[0b11111111]) + .bit_slice() + .slicing(0, 3) + .count_ones() + ); + assert_eq!( + 5, + Buffer::from(&[0b11111111]) + .bit_slice() + .slicing(3, 5) + .count_ones() + ); + assert_eq!( + 1, + Buffer::from(&[0b11111111]) + .bit_slice() + .slicing(3, 1) + .count_ones() + ); + assert_eq!( + 0, + Buffer::from(&[0b11111111]) + .bit_slice() + .slicing(8, 0) + .count_ones() + ); + assert_eq!( + 2, + Buffer::from(&[0b01010101]) + .bit_slice() + .slicing(0, 3) + .count_ones() + ); assert_eq!( 16, - Buffer::from(&[0b11111111, 0b11111111]).count_set_bits_offset(0, 16) + Buffer::from(&[0b11111111, 0b11111111]) + .bit_slice() + .slicing(0, 16) + .count_ones() ); assert_eq!( 10, - Buffer::from(&[0b11111111, 0b11111111]).count_set_bits_offset(0, 10) + Buffer::from(&[0b11111111, 0b11111111]) + .bit_slice() + .slicing(0, 10) + .count_ones() ); assert_eq!( 10, - Buffer::from(&[0b11111111, 0b11111111]).count_set_bits_offset(3, 10) + Buffer::from(&[0b11111111, 0b11111111]) + .bit_slice() + .slicing(3, 10) + .count_ones() ); assert_eq!( 8, - Buffer::from(&[0b11111111, 0b11111111]).count_set_bits_offset(8, 8) + Buffer::from(&[0b11111111, 0b11111111]) + .bit_slice() + .slicing(8, 8) + .count_ones() ); assert_eq!( 5, - Buffer::from(&[0b11111111, 0b11111111]).count_set_bits_offset(11, 5) + Buffer::from(&[0b11111111, 0b11111111]) + .bit_slice() + .slicing(11, 5) + .count_ones() ); assert_eq!( 0, - Buffer::from(&[0b11111111, 0b11111111]).count_set_bits_offset(16, 0) + Buffer::from(&[0b11111111, 0b11111111]) + .bit_slice() + .slicing(16, 0) + .count_ones() ); assert_eq!( 2, - Buffer::from(&[0b01101101, 0b10101010]).count_set_bits_offset(7, 5) + Buffer::from(&[0b01101101, 0b10101010]) + .bit_slice() + .slicing(7, 5) + .count_ones() 
); assert_eq!( 4, - Buffer::from(&[0b01101101, 0b10101010]).count_set_bits_offset(7, 9) + Buffer::from(&[0b01101101, 0b10101010]) + .bit_slice() + .slicing(7, 9) + .count_ones() ); } } diff --git a/rust/arrow/src/compute/kernels/aggregate.rs b/rust/arrow/src/compute/kernels/aggregate.rs index 52aa893dca7..61e18076cef 100644 --- a/rust/arrow/src/compute/kernels/aggregate.rs +++ b/rust/arrow/src/compute/kernels/aggregate.rs @@ -21,6 +21,11 @@ use std::ops::Add; use crate::array::{Array, GenericStringArray, PrimitiveArray, StringOffsetSizeTrait}; use crate::datatypes::ArrowNumericType; +use rayon::iter::IndexedParallelIterator; +use rayon::prelude::*; + +use bitvec::prelude::BitField; +use std::iter::Sum; /// Helper macro to perform min/max of strings fn min_max_string bool>( @@ -129,7 +134,7 @@ where pub fn sum(array: &PrimitiveArray) -> Option where T: ArrowNumericType, - T::Native: Add, + T::Native: Add + Sum, { let null_count = array.null_count(); @@ -141,32 +146,41 @@ where match array.data().null_buffer() { None => { - let sum = data.iter().fold(T::default_value(), |accumulator, value| { - accumulator + *value - }); + let total = data + .par_iter() + .map(|value| { + let mut sum = T::default_value(); + sum = sum + *value; + sum + }) + .sum(); - Some(sum) + Some(total) } Some(buffer) => { - let mut sum = T::default_value(); - let data_chunks = data.chunks_exact(64); + let data_chunks = data.par_chunks_exact(64); let remainder = data_chunks.remainder(); - let bit_chunks = buffer.bit_chunks(array.offset(), array.len()); - data_chunks - .zip(bit_chunks.iter()) - .for_each(|(chunk, mask)| { + let buffer_slice = buffer.bit_slice().slicing(array.offset(), array.len()); + let buffer_chunks = buffer_slice.par_chunks::(); + + let buffer_remainder_bits: u64 = buffer_chunks.remainder_bits(); + + let mut sum = data_chunks + .zip(buffer_chunks) + .map(|(chunk, mask)| { + let mut sum = T::default_value(); chunk.iter().enumerate().for_each(|(i, value)| { - if (mask & (1 << i)) 
!= 0 { + if (mask.load::() & (1 << i)) != 0 { sum = sum + *value; } }); - }); - - let remainder_bits = bit_chunks.remainder_bits(); + sum + }) + .sum(); remainder.iter().enumerate().for_each(|(i, value)| { - if remainder_bits & (1 << i) != 0 { + if buffer_remainder_bits & (1 << i) != 0 { sum = sum + *value; } }); @@ -180,9 +194,12 @@ where /// /// Returns `None` if the array is empty or only contains null values. #[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), feature = "simd"))] -pub fn sum(array: &PrimitiveArray) -> Option +pub fn sum(array: &PrimitiveArray) -> Option where - T::Native: Add, + T: ArrowNumericType, + // T::Native: Add + Sum + Sum, + T::Native: Add + Sum, + T::Simd: Send + Sync + Sum, { let null_count = array.null_count(); @@ -192,71 +209,55 @@ where let data: &[T::Native] = array.value_slice(0, array.len()); - let mut vector_sum = T::init(T::default_value()); - let mut remainder_sum = T::default_value(); - match array.data().null_buffer() { None => { - let data_chunks = data.chunks_exact(64); + let data_chunks = data.par_chunks_exact(T::lanes()); let remainder = data_chunks.remainder(); - data_chunks.for_each(|chunk| { - chunk.chunks_exact(T::lanes()).for_each(|chunk| { - let chunk = T::load(&chunk); - vector_sum = vector_sum + chunk; - }); - }); + let mut agg_sum = + T::horizontal_sum(data_chunks.map(T::load).sum::()); remainder.iter().for_each(|value| { - remainder_sum = remainder_sum + *value; + agg_sum = agg_sum + *value; }); + + Some(agg_sum) } Some(buffer) => { // process data in chunks of 64 elements since we also get 64 bits of validity information at a time - let data_chunks = data.chunks_exact(64); + let data_chunks = data.par_chunks_exact(64); let remainder = data_chunks.remainder(); - let bit_chunks = buffer.bit_chunks(array.offset(), array.len()); + let bit_slice = buffer.bit_slice().slicing(array.offset(), array.len()); + let bit_chunks = bit_slice.par_chunks::(); let remainder_bits = bit_chunks.remainder_bits(); - 
data_chunks.zip(bit_chunks).for_each(|(chunk, mut mask)| { - // split chunks further into slices corresponding to the vector length - // the compiler is able to unroll this inner loop and remove bounds checks - // since the outer chunk size (64) is always a multiple of the number of lanes - chunk.chunks_exact(T::lanes()).for_each(|chunk| { - let zero = T::init(T::default_value()); - let vecmask = T::mask_from_u64(mask); - let chunk = T::load(&chunk); - let blended = T::mask_select(vecmask, chunk, zero); - - vector_sum = vector_sum + blended; - - mask = mask >> T::lanes(); - }); - }); + let agg_sum: T::Simd = data_chunks + .zip(bit_chunks) + .map(|(chunk, mask)| { + chunk + .par_chunks_exact(T::lanes()) + .map(|chunk| { + let zero = T::init(T::default_value()); + let vecmask = T::mask_from_u64(mask.load::()); + let chunk = T::load(&chunk); + T::mask_select(vecmask, chunk, zero) + }) + .sum::() + }) + .sum::(); + + let mut agg_sum: T::Native = T::horizontal_sum(agg_sum); remainder.iter().enumerate().for_each(|(i, value)| { if remainder_bits & (1 << i) != 0 { - remainder_sum = remainder_sum + *value; + agg_sum = agg_sum + *value; } }); + + Some(agg_sum) } } - - // calculate horizontal sum of accumulator by writing to a temporary - // this is probably faster than extracting individual lanes - // the compiler is free to optimize this to something faster - let tmp = &mut [T::default_value(); 64]; - T::write(vector_sum, &mut tmp[0..T::lanes()]); - - let mut total_sum = T::default_value(); - tmp[0..T::lanes()] - .iter() - .for_each(|lane| total_sum = total_sum + *lane); - - total_sum = total_sum + remainder_sum; - - Some(total_sum) } #[cfg(test)] diff --git a/rust/arrow/src/compute/kernels/arithmetic.rs b/rust/arrow/src/compute/kernels/arithmetic.rs index fe1bda5e26f..c88680e6d6a 100644 --- a/rust/arrow/src/compute/kernels/arithmetic.rs +++ b/rust/arrow/src/compute/kernels/arithmetic.rs @@ -31,6 +31,7 @@ use std::sync::Arc; use num::{One, Zero}; +use crate::array::*; 
#[cfg(feature = "simd")] use crate::bitmap::Bitmap; use crate::buffer::Buffer; @@ -42,7 +43,6 @@ use crate::compute::util::simd_load_set_invalid; use crate::datatypes; use crate::datatypes::ToByteSlice; use crate::error::{ArrowError, Result}; -use crate::{array::*, util::bit_util}; /// Helper function to perform math lambda function on values from two arrays. If either /// left or right value is null then the output value is also null, so `1 + null` is @@ -113,8 +113,8 @@ where if let Some(b) = &null_bit_buffer { // some value is null for i in 0..left.len() { - values.push(unsafe { - if bit_util::get_bit_raw(b.raw_data(), i) { + values.push({ + if b.get_bit(i) { let right_value = right.value(i); if right_value.is_zero() { return Err(ArrowError::DivideByZero); diff --git a/rust/arrow/src/compute/kernels/boolean.rs b/rust/arrow/src/compute/kernels/boolean.rs index 075ffb0d67b..ac9d24d20bb 100644 --- a/rust/arrow/src/compute/kernels/boolean.rs +++ b/rust/arrow/src/compute/kernels/boolean.rs @@ -31,7 +31,7 @@ use crate::buffer::{ use crate::compute::util::combine_option_bitmap; use crate::datatypes::DataType; use crate::error::{ArrowError, Result}; -use crate::util::bit_util::ceil; +use crate::util::utils::ceil; /// Helper function to implement binary kernels fn binary_boolean_kernel( @@ -140,7 +140,7 @@ pub fn is_not_null(input: &ArrayRef) -> Result { .with_bitset(len_bytes, true) .freeze() } - Some(buffer) => buffer.bit_slice(input.offset(), len), + Some(buffer) => buffer.bit_view(input.offset(), len), }; let data = diff --git a/rust/arrow/src/compute/kernels/comparison.rs b/rust/arrow/src/compute/kernels/comparison.rs index 4268eaf568f..5e2689293b2 100644 --- a/rust/arrow/src/compute/kernels/comparison.rs +++ b/rust/arrow/src/compute/kernels/comparison.rs @@ -31,7 +31,7 @@ use crate::buffer::{Buffer, MutableBuffer}; use crate::compute::util::combine_option_bitmap; use crate::datatypes::{ArrowNumericType, BooleanType, DataType}; use crate::error::{ArrowError, 
Result}; -use crate::util::bit_util; +use crate::util::utils; /// Helper function to perform boolean lambda function on values from two arrays, this /// version does not attempt to use SIMD. @@ -626,27 +626,24 @@ where )); } - let num_bytes = bit_util::ceil(left_len, 8); + let num_bytes = utils::ceil(left_len, 8); let not_both_null_bit_buffer = match combine_option_bitmap(left.data_ref(), right.data_ref(), left_len)? { Some(buff) => buff, None => new_all_set_buffer(num_bytes), }; - let not_both_null_bitmap = not_both_null_bit_buffer.data(); - let mut bool_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); - let bool_slice = bool_buf.data_mut(); // if both array slots are valid, check if list contains primitive for i in 0..left_len { - if bit_util::get_bit(not_both_null_bitmap, i) { + if not_both_null_bit_buffer.get_bit(i) { let list = right.value(i); let list = list.as_any().downcast_ref::>().unwrap(); for j in 0..list.len() { if list.is_valid(j) && (left.value(i) == list.value(j)) { - bit_util::set_bit(bool_slice, i); + bool_buf.set_bit(i); continue; } } @@ -681,21 +678,19 @@ where )); } - let num_bytes = bit_util::ceil(left_len, 8); + let num_bytes = utils::ceil(left_len, 8); let not_both_null_bit_buffer = match combine_option_bitmap(left.data_ref(), right.data_ref(), left_len)? 
{ Some(buff) => buff, None => new_all_set_buffer(num_bytes), }; - let not_both_null_bitmap = not_both_null_bit_buffer.data(); let mut bool_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); - let bool_slice = bool_buf.data_mut(); for i in 0..left_len { // contains(null, null) = false - if bit_util::get_bit(not_both_null_bitmap, i) { + if not_both_null_bit_buffer.get_bit(i) { let list = right.value(i); let list = list .as_any() @@ -704,7 +699,7 @@ where for j in 0..list.len() { if list.is_valid(j) && (left.value(i) == list.value(j)) { - bit_util::set_bit(bool_slice, i); + bool_buf.set_bit(i); continue; } } diff --git a/rust/arrow/src/compute/kernels/filter.rs b/rust/arrow/src/compute/kernels/filter.rs index eb8d3397cfc..623c748a322 100644 --- a/rust/arrow/src/compute/kernels/filter.rs +++ b/rust/arrow/src/compute/kernels/filter.rs @@ -21,10 +21,10 @@ use crate::array::*; use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; +use crate::util::bit_ops::BufferBitSlice; use crate::{ bitmap::Bitmap, buffer::{Buffer, MutableBuffer}, - util::bit_util, }; use std::{mem, sync::Arc}; @@ -93,8 +93,9 @@ impl<'a> NullBitSetter<'a> { impl<'a> CopyNullBit for NullBitSetter<'a> { #[inline] fn copy_null_bit(&mut self, source_index: usize) { - if !bit_util::get_bit(self.source_bytes, source_index) { - bit_util::unset_bit(self.target_buffer.data_mut(), self.target_index); + let source = BufferBitSlice::new(self.source_bytes); + if !source.get_bit(source_index) { + self.target_buffer.unset_bit(self.target_index); self.null_count += 1; } self.target_index += 1; @@ -319,10 +320,10 @@ impl FilterContext { )); } let filter_mask: Vec = (0..64).map(|x| 1u64 << x).collect(); - let filter_buffer = &filter_array.data_ref().buffers()[0]; - let filtered_count = filter_buffer.count_set_bits_offset(0, filter_array.len()); - - let filter_bytes = filter_buffer.data(); + let filter_bytes = filter_array.data_ref().buffers()[0].data(); + 
let filtered_count = BufferBitSlice::new(filter_bytes) + .slicing(0, filter_array.len()) + .count_ones(); // transmute filter_bytes to &[u64] let mut u64_buffer = MutableBuffer::new(filter_bytes.len()); diff --git a/rust/arrow/src/compute/kernels/limit.rs b/rust/arrow/src/compute/kernels/limit.rs index 65f66bce8e5..8f191c3e933 100644 --- a/rust/arrow/src/compute/kernels/limit.rs +++ b/rust/arrow/src/compute/kernels/limit.rs @@ -37,8 +37,8 @@ mod tests { use crate::array::*; use crate::buffer::Buffer; use crate::datatypes::{DataType, Field, ToByteSlice}; - use crate::util::bit_util; + use crate::util::bit_ops::BufferBitSliceMut; use std::sync::Arc; #[test] @@ -103,11 +103,12 @@ mod tests { Buffer::from(&[0, 2, 2, 4, 4, 6, 6, 9, 9, 10].to_byte_slice()); // 01010101 00000001 let mut null_bits: [u8; 2] = [0; 2]; - bit_util::set_bit(&mut null_bits, 0); - bit_util::set_bit(&mut null_bits, 2); - bit_util::set_bit(&mut null_bits, 4); - bit_util::set_bit(&mut null_bits, 6); - bit_util::set_bit(&mut null_bits, 8); + let mut null_bit_slice = BufferBitSliceMut::new(&mut null_bits); + null_bit_slice.set_bit(0, true); + null_bit_slice.set_bit(2, true); + null_bit_slice.set_bit(4, true); + null_bit_slice.set_bit(6, true); + null_bit_slice.set_bit(8, true); // Construct a list array from the above two let list_data_type = diff --git a/rust/arrow/src/compute/kernels/take.rs b/rust/arrow/src/compute/kernels/take.rs index 0d999e34128..854897415e6 100644 --- a/rust/arrow/src/compute/kernels/take.rs +++ b/rust/arrow/src/compute/kernels/take.rs @@ -23,7 +23,7 @@ use crate::buffer::{Buffer, MutableBuffer}; use crate::compute::util::take_value_indices_from_list; use crate::datatypes::*; use crate::error::{ArrowError, Result}; -use crate::util::bit_util; +use crate::util::utils; use crate::{array::*, buffer::buffer_bin_and}; use num::{ToPrimitive, Zero}; @@ -212,11 +212,9 @@ where let array = values.as_any().downcast_ref::>().unwrap(); - let num_bytes = bit_util::ceil(data_len, 8); + let 
num_bytes = utils::ceil(data_len, 8); let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true); - let null_slice = null_buf.data_mut(); - // This iteration is implemented with a while loop, rather than a // map()/collect(), since the while loop performs better in the benchmarks. let mut new_values: Vec = Vec::with_capacity(data_len); @@ -227,7 +225,7 @@ where })?; if array.is_null(index) { - bit_util::unset_bit(null_slice, i); + null_buf.unset_bit(i); } new_values.push(array.value(index)); @@ -265,22 +263,19 @@ where let array = values.as_any().downcast_ref::().unwrap(); - let num_byte = bit_util::ceil(data_len, 8); + let num_byte = utils::ceil(data_len, 8); let mut null_buf = MutableBuffer::new(num_byte).with_bitset(num_byte, true); let mut val_buf = MutableBuffer::new(num_byte).with_bitset(num_byte, false); - let null_slice = null_buf.data_mut(); - let val_slice = val_buf.data_mut(); - (0..data_len).try_for_each::<_, Result<()>>(|i| { let index = ToPrimitive::to_usize(&indices.value(i)).ok_or_else(|| { ArrowError::ComputeError("Cast to usize failed".to_string()) })?; if array.is_null(index) { - bit_util::unset_bit(null_slice, i); + null_buf.unset_bit(i); } else if array.value(index) { - bit_util::set_bit(val_slice, i); + val_buf.set_bit(i); } Ok(()) @@ -320,9 +315,8 @@ where .downcast_ref::>() .unwrap(); - let num_bytes = bit_util::ceil(data_len, 8); + let num_bytes = utils::ceil(data_len, 8); let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true); - let null_slice = null_buf.data_mut(); let mut offsets = Vec::with_capacity(data_len + 1); let mut values = Vec::with_capacity(data_len); @@ -341,7 +335,7 @@ where values.extend_from_slice(s.as_bytes()); } else { // set null bit - bit_util::unset_bit(null_slice, i); + null_buf.unset_bit(i); } offsets.push(length_so_far); } @@ -388,17 +382,14 @@ where let taken = take_impl::(&list.values(), &list_indices, None)?; // determine null count and null buffer, which are a function 
of `values` and `indices` - let mut null_count = 0; - let num_bytes = bit_util::ceil(indices.len(), 8); + let num_bytes = utils::ceil(indices.len(), 8); let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true); { - let null_slice = null_buf.data_mut(); offsets[..].windows(2).enumerate().for_each( |(i, window): (usize, &[OffsetType::Native])| { if window[0] == window[1] { // offsets are equal, slot is null - bit_util::unset_bit(null_slice, i); - null_count += 1; + null_buf.unset_bit(i); } }, ); @@ -407,7 +398,7 @@ where // create a new list with taken data and computed null information let list_data = ArrayDataBuilder::new(list.data_type().clone()) .len(indices.len()) - .null_count(null_count) + .null_count(null_buf.count_zeros()) .null_bit_buffer(null_buf.freeze()) .offset(0) .add_child_data(taken.data()) @@ -452,6 +443,7 @@ where #[cfg(test)] mod tests { use super::*; + use crate::util::bit_ops::*; fn test_take_primitive_arrays( data: Vec>, @@ -951,9 +943,10 @@ mod tests { let expected_offsets = Buffer::from(&expected_offsets.to_byte_slice()); // construct list array from the two let mut null_bits: [u8; 1] = [0; 1]; - bit_util::set_bit(&mut null_bits, 2); - bit_util::set_bit(&mut null_bits, 3); - bit_util::set_bit(&mut null_bits, 4); + let mut null_bit_view = BufferBitSliceMut::new(&mut null_bits); + null_bit_view.set_bit(2, true); + null_bit_view.set_bit(3, true); + null_bit_view.set_bit(4, true); let expected_list_data = ArrayData::builder(list_data_type) .len(5) .null_count(2) diff --git a/rust/arrow/src/compute/util.rs b/rust/arrow/src/compute/util.rs index ba7de77f6b0..44afa936a6b 100644 --- a/rust/arrow/src/compute/util.rs +++ b/rust/arrow/src/compute/util.rs @@ -45,10 +45,10 @@ pub(super) fn combine_option_bitmap( match left { None => match right { None => Ok(None), - Some(r) => Ok(Some(r.bit_slice(right_offset_in_bits, len_in_bits))), + Some(r) => Ok(Some(r.bit_view(right_offset_in_bits, len_in_bits))), }, Some(l) => match right { - 
None => Ok(Some(l.bit_slice(left_offset_in_bits, len_in_bits))), + None => Ok(Some(l.bit_view(left_offset_in_bits, len_in_bits))), Some(r) => Ok(Some(buffer_bin_and( &l, @@ -78,10 +78,10 @@ pub(super) fn compare_option_bitmap( match left { None => match right { None => Ok(None), - Some(r) => Ok(Some(r.bit_slice(right_offset_in_bits, len_in_bits))), + Some(r) => Ok(Some(r.bit_view(right_offset_in_bits, len_in_bits))), }, Some(l) => match right { - None => Ok(Some(l.bit_slice(left_offset_in_bits, len_in_bits))), + None => Ok(Some(l.bit_view(left_offset_in_bits, len_in_bits))), Some(r) => Ok(Some(buffer_bin_or( &l, diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs index 630b14dfc1e..e3e5a4dc247 100644 --- a/rust/arrow/src/datatypes.rs +++ b/rust/arrow/src/datatypes.rs @@ -40,7 +40,7 @@ use serde_json::{ }; use crate::error::{ArrowError, Result}; -use crate::util::bit_util; +use crate::util::bit_ops::BufferBitSlice; /// The set of datatypes that are supported by this implementation of Apache Arrow. /// @@ -386,7 +386,11 @@ impl ArrowPrimitiveType for BooleanType { /// The pointer must be part of a bit-packed boolean array, and the index must be less than the /// size of the array. unsafe fn index(raw_ptr: *const Self::Native, i: usize) -> Self::Native { - bit_util::get_bit_raw(raw_ptr as *const u8, i) + let data = std::slice::from_raw_parts( + raw_ptr as *const u8, + std::mem::size_of::(), + ); + BufferBitSlice::new(data).get_bit(i) } } @@ -558,6 +562,9 @@ where op: F, ) -> Self::Simd; + /// Horizontal sum of SIMD vector elements + fn horizontal_sum(vector: Self::Simd) -> Self::Native; + /// SIMD version of equal fn eq(left: Self::Simd, right: Self::Simd) -> Self::SimdMask; @@ -587,7 +594,7 @@ where pub trait ArrowNumericType: ArrowPrimitiveType {} macro_rules! 
make_numeric_type { - ($impl_ty:ty, $native_ty:ty, $simd_ty:ident, $simd_mask_ty:ident) => { + ($impl_ty:ty, $native_ty:ty, $simd_ty:ident, $simd_mask_ty:ident, $lanes:expr) => { #[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), feature = "simd"))] impl ArrowNumericType for $impl_ty { type Simd = $simd_ty; @@ -737,6 +744,15 @@ macro_rules! make_numeric_type { op(left, right) } + #[inline] + fn horizontal_sum(vector: Self::Simd) -> Self::Native { + let mut x = vector.extract(0) as Self::Native; + for i in 1..Self::Simd::lanes() { + x += vector.extract(i) as Self::Native; + } + x + } + #[inline] fn eq(left: Self::Simd, right: Self::Simd) -> Self::SimdMask { left.eq(right) @@ -772,6 +788,7 @@ macro_rules! make_numeric_type { unsafe { simd_result.write_to_slice_unaligned_unchecked(slice) }; } } + #[cfg(any( not(any(target_arch = "x86", target_arch = "x86_64")), not(feature = "simd") @@ -780,33 +797,33 @@ macro_rules! make_numeric_type { }; } -make_numeric_type!(Int8Type, i8, i8x64, m8x64); -make_numeric_type!(Int16Type, i16, i16x32, m16x32); -make_numeric_type!(Int32Type, i32, i32x16, m32x16); -make_numeric_type!(Int64Type, i64, i64x8, m64x8); -make_numeric_type!(UInt8Type, u8, u8x64, m8x64); -make_numeric_type!(UInt16Type, u16, u16x32, m16x32); -make_numeric_type!(UInt32Type, u32, u32x16, m32x16); -make_numeric_type!(UInt64Type, u64, u64x8, m64x8); -make_numeric_type!(Float32Type, f32, f32x16, m32x16); -make_numeric_type!(Float64Type, f64, f64x8, m64x8); - -make_numeric_type!(TimestampSecondType, i64, i64x8, m64x8); -make_numeric_type!(TimestampMillisecondType, i64, i64x8, m64x8); -make_numeric_type!(TimestampMicrosecondType, i64, i64x8, m64x8); -make_numeric_type!(TimestampNanosecondType, i64, i64x8, m64x8); -make_numeric_type!(Date32Type, i32, i32x16, m32x16); -make_numeric_type!(Date64Type, i64, i64x8, m64x8); -make_numeric_type!(Time32SecondType, i32, i32x16, m32x16); -make_numeric_type!(Time32MillisecondType, i32, i32x16, m32x16); 
-make_numeric_type!(Time64MicrosecondType, i64, i64x8, m64x8); -make_numeric_type!(Time64NanosecondType, i64, i64x8, m64x8); -make_numeric_type!(IntervalYearMonthType, i32, i32x16, m32x16); -make_numeric_type!(IntervalDayTimeType, i64, i64x8, m64x8); -make_numeric_type!(DurationSecondType, i64, i64x8, m64x8); -make_numeric_type!(DurationMillisecondType, i64, i64x8, m64x8); -make_numeric_type!(DurationMicrosecondType, i64, i64x8, m64x8); -make_numeric_type!(DurationNanosecondType, i64, i64x8, m64x8); +make_numeric_type!(Int8Type, i8, i8x64, m8x64, 64); +make_numeric_type!(Int16Type, i16, i16x32, m16x32, 32); +make_numeric_type!(Int32Type, i32, i32x16, m32x16, 16); +make_numeric_type!(Int64Type, i64, i64x8, m64x8, 8); +make_numeric_type!(UInt8Type, u8, u8x64, m8x64, 64); +make_numeric_type!(UInt16Type, u16, u16x32, m16x32, 32); +make_numeric_type!(UInt32Type, u32, u32x16, m32x16, 16); +make_numeric_type!(UInt64Type, u64, u64x8, m64x8, 8); +make_numeric_type!(Float32Type, f32, f32x16, m32x16, 16); +make_numeric_type!(Float64Type, f64, f64x8, m64x8, 8); + +make_numeric_type!(TimestampSecondType, i64, i64x8, m64x8, 8); +make_numeric_type!(TimestampMillisecondType, i64, i64x8, m64x8, 8); +make_numeric_type!(TimestampMicrosecondType, i64, i64x8, m64x8, 8); +make_numeric_type!(TimestampNanosecondType, i64, i64x8, m64x8, 8); +make_numeric_type!(Date32Type, i32, i32x16, m32x16, 16); +make_numeric_type!(Date64Type, i64, i64x8, m64x8, 8); +make_numeric_type!(Time32SecondType, i32, i32x16, m32x16, 16); +make_numeric_type!(Time32MillisecondType, i32, i32x16, m32x16, 16); +make_numeric_type!(Time64MicrosecondType, i64, i64x8, m64x8, 8); +make_numeric_type!(Time64NanosecondType, i64, i64x8, m64x8, 8); +make_numeric_type!(IntervalYearMonthType, i32, i32x16, m32x16, 16); +make_numeric_type!(IntervalDayTimeType, i64, i64x8, m64x8, 8); +make_numeric_type!(DurationSecondType, i64, i64x8, m64x8, 8); +make_numeric_type!(DurationMillisecondType, i64, i64x8, m64x8, 8); 
+make_numeric_type!(DurationMicrosecondType, i64, i64x8, m64x8, 8); +make_numeric_type!(DurationNanosecondType, i64, i64x8, m64x8, 8); /// A subtype of primitive type that represents temporal values. pub trait ArrowTemporalType: ArrowPrimitiveType {} diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index cb861609f51..d3a17730a60 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -30,7 +30,7 @@ use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::ipc; use crate::record_batch::RecordBatch; -use crate::util::bit_util; +use crate::util::utils; use ipc::CONTINUATION_MARKER; @@ -533,7 +533,7 @@ fn write_array_data( let null_buffer = match array_data.null_buffer() { None => { // create a buffer and fill it with valid bits - let num_bytes = bit_util::ceil(num_rows, 8); + let num_bytes = utils::ceil(num_rows, 8); let buffer = MutableBuffer::new(num_bytes); let buffer = buffer.with_bitset(num_bytes, true); buffer.freeze() diff --git a/rust/arrow/src/util/bit_chunk_iterator.rs b/rust/arrow/src/util/bit_chunk_iterator.rs deleted file mode 100644 index 801c38a243f..00000000000 --- a/rust/arrow/src/util/bit_chunk_iterator.rs +++ /dev/null @@ -1,257 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. -use crate::buffer::Buffer; -use crate::util::bit_util::ceil; -use std::fmt::Debug; - -#[derive(Debug)] -pub struct BitChunks<'a> { - buffer: &'a Buffer, - raw_data: *const u8, - /// offset inside a byte, guaranteed to be between 0 and 7 (inclusive) - bit_offset: usize, - /// number of complete u64 chunks - chunk_len: usize, - /// number of remaining bits, guaranteed to be between 0 and 63 (inclusive) - remainder_len: usize, -} - -impl<'a> BitChunks<'a> { - pub fn new(buffer: &'a Buffer, offset: usize, len: usize) -> Self { - assert!(ceil(offset + len, 8) <= buffer.len() * 8); - - let byte_offset = offset / 8; - let bit_offset = offset % 8; - - let raw_data = unsafe { buffer.raw_data().add(byte_offset) }; - - let chunk_bits = 8 * std::mem::size_of::(); - - let chunk_len = len / chunk_bits; - let remainder_len = len & (chunk_bits - 1); - - BitChunks::<'a> { - buffer: &buffer, - raw_data, - bit_offset, - chunk_len, - remainder_len, - } - } -} - -#[derive(Debug)] -pub struct BitChunkIterator<'a> { - buffer: &'a Buffer, - raw_data: *const u8, - bit_offset: usize, - chunk_len: usize, - index: usize, -} - -impl<'a> BitChunks<'a> { - /// Returns the number of remaining bits, guaranteed to be between 0 and 63 (inclusive) - #[inline] - pub const fn remainder_len(&self) -> usize { - self.remainder_len - } - - /// Returns the bitmask of remaining bits - #[inline] - pub fn remainder_bits(&self) -> u64 { - let bit_len = self.remainder_len; - if bit_len == 0 { - 0 - } else { - let bit_offset = self.bit_offset; - // number of bytes to read - // might be one more than sizeof(u64) if the offset is in the middle of a byte - let byte_len = ceil(bit_len + bit_offset, 8); - // pointer to remainder bytes after all complete chunks - let base = unsafe { - self.raw_data - .add(self.chunk_len * std::mem::size_of::()) - }; - - let mut bits = unsafe { std::ptr::read(base) } as u64 >> 
bit_offset; - for i in 1..byte_len { - let byte = unsafe { std::ptr::read(base.add(i)) }; - bits |= (byte as u64) << (i * 8 - bit_offset); - } - - bits & ((1 << bit_len) - 1) - } - } - - /// Returns an iterator over chunks of 64 bits represented as an u64 - #[inline] - pub const fn iter(&self) -> BitChunkIterator<'a> { - BitChunkIterator::<'a> { - buffer: self.buffer, - raw_data: self.raw_data, - bit_offset: self.bit_offset, - chunk_len: self.chunk_len, - index: 0, - } - } -} - -impl<'a> IntoIterator for BitChunks<'a> { - type Item = u64; - type IntoIter = BitChunkIterator<'a>; - - fn into_iter(self) -> Self::IntoIter { - self.iter() - } -} - -impl Iterator for BitChunkIterator<'_> { - type Item = u64; - - #[inline] - fn next(&mut self) -> Option { - let index = self.index; - if index >= self.chunk_len { - return None; - } - - // cast to *const u64 should be fine since we are using read_unaligned below - #[allow(clippy::cast_ptr_alignment)] - let raw_data = self.raw_data as *const u64; - - // bit-packed buffers are stored starting with the least-significant byte first - // so when reading as u64 on a big-endian machine, the bytes need to be swapped - let current = unsafe { std::ptr::read_unaligned(raw_data.add(index)).to_le() }; - - let combined = if self.bit_offset == 0 { - current - } else { - let next = - unsafe { std::ptr::read_unaligned(raw_data.add(index + 1)).to_le() }; - - current >> self.bit_offset - | (next & ((1 << self.bit_offset) - 1)) << (64 - self.bit_offset) - }; - - self.index = index + 1; - - Some(combined) - } - - #[inline] - fn size_hint(&self) -> (usize, Option) { - ( - self.chunk_len - self.index, - Some(self.chunk_len - self.index), - ) - } -} - -impl ExactSizeIterator for BitChunkIterator<'_> { - #[inline] - fn len(&self) -> usize { - self.chunk_len - self.index - } -} - -#[cfg(test)] -mod tests { - use crate::buffer::Buffer; - - #[test] - fn test_iter_aligned() { - let input: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7]; - let buffer: Buffer = 
Buffer::from(input); - - let bitchunks = buffer.bit_chunks(0, 64); - let result = bitchunks.into_iter().collect::>(); - - assert_eq!(vec![0x0706050403020100], result); - } - - #[test] - fn test_iter_unaligned() { - let input: &[u8] = &[ - 0b00000000, 0b00000001, 0b00000010, 0b00000100, 0b00001000, 0b00010000, - 0b00100000, 0b01000000, 0b11111111, - ]; - let buffer: Buffer = Buffer::from(input); - - let bitchunks = buffer.bit_chunks(4, 64); - - assert_eq!(0, bitchunks.remainder_len()); - assert_eq!(0, bitchunks.remainder_bits()); - - let result = bitchunks.into_iter().collect::>(); - - assert_eq!( - vec![0b1111010000000010000000010000000010000000010000000010000000010000], - result - ); - } - - #[test] - fn test_iter_unaligned_remainder_1_byte() { - let input: &[u8] = &[ - 0b00000000, 0b00000001, 0b00000010, 0b00000100, 0b00001000, 0b00010000, - 0b00100000, 0b01000000, 0b11111111, - ]; - let buffer: Buffer = Buffer::from(input); - - let bitchunks = buffer.bit_chunks(4, 66); - - assert_eq!(2, bitchunks.remainder_len()); - assert_eq!(0b00000011, bitchunks.remainder_bits()); - - let result = bitchunks.into_iter().collect::>(); - - assert_eq!( - vec![0b1111010000000010000000010000000010000000010000000010000000010000], - result - ); - } - - #[test] - fn test_iter_unaligned_remainder_bits_across_bytes() { - let input: &[u8] = &[0b00111111, 0b11111100]; - let buffer: Buffer = Buffer::from(input); - - // remainder contains bits from both bytes - // result should be the highest 2 bits from first byte followed by lowest 5 bits of second bytes - let bitchunks = buffer.bit_chunks(6, 7); - - assert_eq!(7, bitchunks.remainder_len()); - assert_eq!(0b1110000, bitchunks.remainder_bits()); - } - - #[test] - fn test_iter_unaligned_remainder_bits_large() { - let input: &[u8] = &[ - 0b11111111, 0b00000000, 0b11111111, 0b00000000, 0b11111111, 0b00000000, - 0b11111111, 0b00000000, 0b11111111, - ]; - let buffer: Buffer = Buffer::from(input); - - let bitchunks = buffer.bit_chunks(2, 63); - - 
assert_eq!(63, bitchunks.remainder_len()); - assert_eq!( - 0b100_0000_0011_1111_1100_0000_0011_1111_1100_0000_0011_1111_1100_0000_0011_1111, - bitchunks.remainder_bits() - ); - } -} diff --git a/rust/arrow/src/util/bit_ops.rs b/rust/arrow/src/util/bit_ops.rs new file mode 100644 index 00000000000..00cdccd7806 --- /dev/null +++ b/rust/arrow/src/util/bit_ops.rs @@ -0,0 +1,588 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::buffer::Buffer; + +use bitvec::prelude::*; +use bitvec::slice::ChunksExact; + +use rayon::iter::plumbing::*; +use rayon::prelude::*; +use std::fmt::Debug; +use std::marker::PhantomData as marker; + +/// +/// Immutable bit slice view of `Buffer` data. 
+/// +/// `BufferBitSlice` does not own any underlying data, but rather wraps references +/// to the underlying data in a `Buffer` and has methods for addressing and interacting with +/// individual bits +#[derive(Debug)] +pub struct BufferBitSlice<'a> { + bit_slice: &'a BitSlice, +} + +impl<'a> BufferBitSlice<'a> { + /// + /// Creates a immutable bit slice over the given data + #[inline] + pub fn new(buffer_data: &'a [u8]) -> Self { + let bit_slice = BitSlice::::from_slice(buffer_data).unwrap(); + + BufferBitSlice { + bit_slice: &bit_slice, + } + } + + /// + /// Returns immutable view with the given offset in bits and length in bits. + /// This view have zero-copy representation over the actual data. + #[inline] + pub fn slicing(&self, offset_in_bits: usize, len_in_bits: usize) -> Self { + Self { + bit_slice: &self.bit_slice[offset_in_bits..offset_in_bits + len_in_bits], + } + } + + /// + /// Returns bit chunks in given byte width. + /// This can be u64(native Arrow byte representation size) or any other unsigned primitive like: + /// u8, u16, u32, u128 and usize. + /// + /// This method is generic over the given primitives to enable user to filter out + /// any upper/lower nibble/s which is not used like: + /// + /// # Example + /// + /// ``` + /// # use arrow::buffer::Buffer; + /// let input: &[u8] = &[ + /// 0b11111111, 0b00000000, 0b11111111, 0b00000000, + /// 0b11111111, 0b00000000, 0b11111111, 0b00000000, + /// ]; + /// + /// let buffer: Buffer = Buffer::from(input); + /// let bit_slice = buffer.bit_slice(); + /// // Interpret bit slice as u8 + /// let chunks = bit_slice.chunks::(); + /// + /// // Filter out null bytes for compression + /// let bytes = chunks.into_native_iter().filter(|e| *e != 0x00_u8).collect::>(); + /// assert_eq!(bytes.len(), 4); + /// ``` + /// Native representations in Arrow follows 64-bit convention. + /// Chunks can still be reinterpreted in any primitive type lower than u64. 
+ #[inline] + pub fn chunks(&self) -> BufferBitChunksExact + where + T: BitMemory, + { + let offset_size_in_bits = 8 * std::mem::size_of::(); + let chunks_exact = self.bit_slice.chunks_exact(offset_size_in_bits); + let remainder_bits = chunks_exact.remainder(); + let remainder: T = if remainder_bits.is_empty() { + T::default() + } else { + remainder_bits.load::() + }; + BufferBitChunksExact { + chunks_exact, + remainder, + remainder_len_in_bits: remainder_bits.len(), + } + } + + #[inline] + pub fn par_chunks(&self) -> ParallelChunksExact + where + T: BitMemory, + { + let offset_size_in_bits = 8 * std::mem::size_of::(); + let chunks_exact = self.bit_slice.chunks_exact(offset_size_in_bits); + let remainder_bits = chunks_exact.remainder(); + let remainder: T = if remainder_bits.is_empty() { + T::default() + } else { + remainder_bits.load::() + }; + ParallelChunksExact { + bit_slice: self.bit_slice, + chunk_size: offset_size_in_bits, + remainder, + remainder_len_in_bits: remainder_bits.len(), + } + } + + /// + /// Converts the bit view into a Buffer. + /// Buffer is always byte-aligned and it's pointer is aligned to size of u64. 
+ #[inline] + pub fn as_buffer(&self) -> Buffer { + Buffer::from(self.bit_slice.as_slice()) + } + + /// + /// Count ones in the given bit view + #[inline] + pub fn count_ones(&self) -> usize { + self.bit_slice.count_ones() + } + + /// + /// Count zeros in the given bit view + #[inline] + pub fn count_zeros(&self) -> usize { + self.bit_slice.count_zeros() + } + + /// + /// Get bit value at the given index in this bit view + #[inline] + pub fn get_bit(&self, index: usize) -> bool { + *unsafe { self.bit_slice.get_unchecked(index) } + } + + /// + /// Get bits in this view as vector of booleans + #[inline] + pub fn typed_bits(&self) -> Vec { + self.bit_slice.iter().copied().collect() + } + + /// + /// Get manipulated data as byte slice + #[inline] + pub fn to_slice(&self) -> &[u8] { + self.bit_slice.as_slice() + } +} + +impl<'a> PartialEq for BufferBitSlice<'a> { + fn eq(&self, other: &Self) -> bool { + self.bit_slice == other.bit_slice + } +} + +/// +/// Conversion from mutable slice to immutable bit slice +impl<'a> From<&'a [u8]> for BufferBitSlice<'a> { + fn from(data: &'a [u8]) -> Self { + BufferBitSlice::new(data) + } +} + +/// +/// Mutable bit slice view of buffer data +/// +/// `BufferBitSliceMut` does not own any underlying data, but rather +/// has methods for addressing and interacting with individual bits. +#[derive(Debug)] +pub struct BufferBitSliceMut<'a> { + bit_slice: &'a mut BitSlice, +} + +impl<'a> BufferBitSliceMut<'a> { + /// + /// Creates a mutable bit slice over the given data + #[inline] + pub fn new(buffer_data: &'a mut [u8]) -> Self { + let bit_slice = BitSlice::::from_slice_mut(buffer_data).unwrap(); + + BufferBitSliceMut { bit_slice } + } + + /// + /// Returns mutable view with the given offset in bits and length in bits. + /// This view have zero-copy representation over the actual data. 
+ #[inline] + pub fn slicing(&'a mut self, offset_in_bits: usize, len_in_bits: usize) -> Self { + Self { + bit_slice: &mut self.bit_slice[offset_in_bits..offset_in_bits + len_in_bits], + } + } + + /// + /// Sets all bits in this slice to the given value + #[inline] + pub fn set_bit_all(&mut self, value: bool) { + self.bit_slice.set_all(value) + } + + /// + /// Set given bit at the position to a given value + #[inline] + pub fn set_bit(&mut self, index: usize, value: bool) { + unsafe { self.bit_slice.set_unchecked(index, value) } + } + + /// + /// Converts the bit view into a Buffer. + /// Buffer is always byte-aligned and it's pointer is aligned to size of u64. + #[inline] + pub fn as_buffer(&self) -> Buffer { + Buffer::from(self.bit_slice.as_slice()) + } + + /// + /// Count ones in the given bit view + #[inline] + pub fn count_ones(&self) -> usize { + self.bit_slice.count_ones() + } + + /// + /// Count zeros in the given bit view + #[inline] + pub fn count_zeros(&self) -> usize { + self.bit_slice.count_zeros() + } + + /// + /// Get bit value at the given index in this bit view + #[inline] + pub fn get_bit(&self, index: usize) -> bool { + *unsafe { self.bit_slice.get_unchecked(index) } + } + + /// + /// Get bits in this view as vector of booleans + #[inline] + pub fn typed_bits(&self) -> Vec { + self.bit_slice.iter().copied().collect() + } + + /// + /// Get manipulated data as byte slice + #[inline] + pub fn to_slice(&self) -> &[u8] { + self.bit_slice.as_slice() + } +} + +impl<'a> PartialEq for BufferBitSliceMut<'a> { + fn eq(&self, other: &Self) -> bool { + self.bit_slice == other.bit_slice + } +} + +/// +/// Conversion from mutable slice to mutable bit slice +impl<'a> From<&'a mut [u8]> for BufferBitSliceMut<'a> { + fn from(data: &'a mut [u8]) -> Self { + BufferBitSliceMut::new(data) + } +} + +/// +/// Exact chunk view over the bit slice +/// +/// The view is represented as some number of aligned T-sized chunks, +/// followed by some number of remainder bits 
+#[derive(Clone, Debug)] +pub struct BufferBitChunksExact<'a, T> +where + T: BitMemory, +{ + chunks_exact: ChunksExact<'a, LocalBits, u8>, + remainder: T, + remainder_len_in_bits: usize, +} + +impl<'a, T> BufferBitChunksExact<'a, T> +where + T: BitMemory, +{ + /// + /// Returns remainder bit length from the exact chunk iterator + #[inline(always)] + pub fn remainder_bit_len(&self) -> usize { + self.remainder_len_in_bits + } + + /// + /// Returns the remainder bits interpreted as given type. + #[inline(always)] + pub fn remainder_bits(&self) -> T { + self.remainder + } + + /// + /// Returns an iterator which interprets underlying chunk's view's bits as given type. + #[inline(always)] + pub fn into_native_iter(self) -> impl Iterator + 'a + where + T: BitMemory, + { + self.chunks_exact.map(|e| e.load::()) + } + + /// + /// Returns underlying iterator as it is + #[inline(always)] + pub fn iter(&self) -> &ChunksExact<'a, LocalBits, u8> { + &self.chunks_exact + } +} + +/// +/// Implements consuming iterator for exact chunk iterator +impl<'a, T> IntoIterator for BufferBitChunksExact<'a, T> +where + T: BitMemory, +{ + type Item = &'a BitSlice; + type IntoIter = ChunksExact<'a, LocalBits, u8>; + + fn into_iter(self) -> Self::IntoIter { + self.chunks_exact + } +} + +/// Implements parallel iterator over an immutable reference to a `BufferBitChunksExact` +#[derive(Debug)] +pub struct ParallelChunksExact<'a, T> { + bit_slice: &'a BitSlice, + chunk_size: usize, + remainder: T, + remainder_len_in_bits: usize, +} + +impl<'a, T> ParallelChunksExact<'a, T> +where + T: BitMemory, +{ + /// + /// Returns remainder bit length from the exact chunk iterator + #[inline(always)] + pub fn remainder_bit_len(&self) -> usize { + self.remainder_len_in_bits + } + + /// + /// Returns the remainder bits interpreted as given type. 
+ #[inline(always)] + pub fn remainder_bits(&self) -> T { + self.remainder + } + + /// + /// Returns an iterator which interprets underlying chunk's view's bits as given type. + #[inline(always)] + pub fn into_native_iter(self) -> impl Iterator + 'a + where + T: BitMemory, + { + self.bit_slice + .chunks_exact(self.chunk_size) + .map(|e| e.load::()) + } +} + +impl<'a, T> ParallelIterator for ParallelChunksExact<'a, T> +where + T: BitMemory, +{ + type Item = &'a BitSlice; + + fn drive_unindexed(self, consumer: C) -> C::Result + where + C: UnindexedConsumer, + { + bridge(self, consumer) + } + + fn opt_len(&self) -> Option { + Some(self.len()) + } +} + +impl<'a, T> IndexedParallelIterator for ParallelChunksExact<'a, T> +where + T: BitMemory, +{ + fn len(&self) -> usize { + self.bit_slice.len() + } + + fn drive(self, consumer: C) -> C::Result + where + C: Consumer, + { + bridge(self, consumer) + } + + fn with_producer(self, callback: CB) -> CB::Output + where + CB: ProducerCallback, + { + callback.callback(BufferBitSliceProducer:: { + inner: self.bit_slice, + marker, + }) + } +} + +#[derive(Debug)] +pub struct BufferBitSliceProducer<'a, T> +where + T: BitMemory, +{ + inner: &'a BitSlice, + marker: marker, +} + +impl<'a, T> Producer for BufferBitSliceProducer<'a, T> +where + T: BitMemory, +{ + type Item = &'a BitSlice; + type IntoIter = ChunksExact<'a, LocalBits, u8>; + + fn into_iter(self) -> Self::IntoIter { + let offset_size_in_bits = 8 * std::mem::size_of::(); + self.inner.chunks_exact(offset_size_in_bits) + } + + fn split_at(self, index: usize) -> (Self, Self) { + let offset_size_in_bits = 8 * std::mem::size_of::(); + let elem_index = index * offset_size_in_bits; + let (left, right) = self.inner.split_at(elem_index); + ( + BufferBitSliceProducer { + inner: left, + marker, + }, + BufferBitSliceProducer { + inner: right, + marker, + }, + ) + } +} + +#[cfg(all(test, target_endian = "little"))] +mod tests_bit_slices_little_endian { + use super::*; + use 
crate::datatypes::ToByteSlice; + + #[test] + fn test_bit_slice_iter_aligned() { + let input: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7]; + let buffer: Buffer = Buffer::from(input); + + let bit_slice = buffer.bit_slice(); + let result = bit_slice.chunks().into_native_iter().collect::>(); + + assert_eq!(vec![0x0706050403020100], result); + } + + #[test] + fn test_bit_slice_iter_unaligned() { + let input: &[u8] = &[ + 0b11110000, 0b00000001, 0b00000010, 0b00000100, 0b00001000, 0b00010000, + 0b00100000, 0b01000000, 0b11111010, + ]; + let buffer: Buffer = Buffer::from(input); + + // After the 4th bit get a bit slice of 64 bits. + let bit_slice = buffer.bit_slice().slicing(4, 64); + let chunks = bit_slice.chunks::(); + + // 64 bits perfectly fits. + assert_eq!(0, chunks.remainder_bit_len()); + assert_eq!(0, chunks.remainder_bits()); + + let result = chunks.into_native_iter().collect::>(); + + assert_eq!( + vec![0b1010_0100_0000_0010_0000_0001_0000_0000_1000_0000_0100_0000_0010_0000_0001_1111], + result + ); + } + + #[test] + fn test_bit_slice_iter_unaligned_remainder_1_byte() { + let input: &[u8] = &[ + 0b00000000, 0b00000001, 0b00000010, 0b00000100, 0b00001000, 0b00010000, + 0b00100000, 0b01000000, 0b11111111, + ]; + let buffer: Buffer = Buffer::from(input); + + // After the 4th bit get a bit slice of 66 bits. + let bit_slice = buffer.bit_slice().slicing(4, 66); + let chunks = bit_slice.chunks::(); + + // 66 bits doesn't fit into 64 bits primitive type, so there are 2 remainder bits. 
+ assert_eq!(2, chunks.remainder_bit_len()); + assert_eq!(0b00000011, chunks.remainder_bits()); + + let result = chunks.into_native_iter().collect::>(); + + assert_eq!( + vec![0b1111_0100_0000_0010_0000_0001_0000_0000_1000_0000_0100_0000_0010_0000_0001_0000], + result + ); + } + + #[test] + fn test_bit_slice_iter_unaligned_remainder_bits_across_bytes() { + let input: &[u8] = &[0b00111111, 0b11111100]; + let buffer: Buffer = Buffer::from(input); + + // remainder contains bits from both bytes + // result should be the highest 2 bits from first byte followed by lowest 5 bits of second bytes + let bit_slice = buffer.bit_slice().slicing(6, 7); + let chunks = bit_slice.chunks::(); + + assert_eq!(7, chunks.remainder_bit_len()); + assert_eq!(0b1110000, chunks.remainder_bits()); + } + + #[test] + fn test_bit_slice_iter_unaligned_remainder_bits_large() { + let input: &[u8] = &[ + 0b11111111, 0b00000000, 0b11111111, 0b00000000, 0b11111111, 0b00000000, + 0b11111111, 0b00000000, 0b11111111, + ]; + let buffer: Buffer = Buffer::from(input); + + let bit_slice = buffer.bit_slice().slicing(2, 63); + let chunks = bit_slice.chunks::(); + + assert_eq!(63, chunks.remainder_bit_len()); + assert_eq!( + 0b100_0000_0011_1111_1100_0000_0011_1111_1100_0000_0011_1111_1100_0000_0011_1111, + chunks.remainder_bits() + ); + } + + #[test] + fn test_bit_slice_iter_reinterpret() { + assert_eq!(LocalBits::default(), Lsb0::default()); + let buffer_slice = &[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice(); + // Name of the bit slice comes from byte slice, since it is still on the stack and behaves similarly to Rust's byte slice. + let buffer = Buffer::from(buffer_slice); + + // Let's get the whole buffer. + let bit_slice = buffer.bit_slice().slicing(0, buffer_slice.len() * 8); + // Let's also get a chunked bits as u8, not u64 this time... 
+ let chunks = bit_slice.chunks::(); + + let result = chunks.into_native_iter().collect::>(); + assert_eq!(buffer_slice.to_vec(), result); + } +} diff --git a/rust/arrow/src/util/bit_util.rs b/rust/arrow/src/util/bit_util.rs deleted file mode 100644 index bf54e22394c..00000000000 --- a/rust/arrow/src/util/bit_util.rs +++ /dev/null @@ -1,328 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Utils for working with bits - -#[cfg(feature = "simd")] -use packed_simd::u8x64; - -const BIT_MASK: [u8; 8] = [1, 2, 4, 8, 16, 32, 64, 128]; - -/// Returns the nearest number that is `>=` than `num` and is a multiple of 64 -#[inline] -pub fn round_upto_multiple_of_64(num: usize) -> usize { - round_upto_power_of_2(num, 64) -} - -/// Returns the nearest multiple of `factor` that is `>=` than `num`. Here `factor` must -/// be a power of 2. 
-pub fn round_upto_power_of_2(num: usize, factor: usize) -> usize { - debug_assert!(factor > 0 && (factor & (factor - 1)) == 0); - (num + (factor - 1)) & !(factor - 1) -} - -/// Returns whether bit at position `i` in `data` is set or not -#[inline] -pub fn get_bit(data: &[u8], i: usize) -> bool { - (data[i >> 3] & BIT_MASK[i & 7]) != 0 -} - -/// Returns whether bit at position `i` in `data` is set or not. -/// -/// # Safety -/// -/// Note this doesn't do any bound checking, for performance reason. The caller is -/// responsible to guarantee that `i` is within bounds. -#[inline] -pub unsafe fn get_bit_raw(data: *const u8, i: usize) -> bool { - (*data.add(i >> 3) & BIT_MASK[i & 7]) != 0 -} - -/// Sets bit at position `i` for `data` -#[inline] -pub fn set_bit(data: &mut [u8], i: usize) { - data[i >> 3] |= BIT_MASK[i & 7]; -} - -/// Sets bit at position `i` for `data` -/// -/// # Safety -/// -/// Note this doesn't do any bound checking, for performance reason. The caller is -/// responsible to guarantee that `i` is within bounds. -#[inline] -pub unsafe fn set_bit_raw(data: *mut u8, i: usize) { - *data.add(i >> 3) |= BIT_MASK[i & 7]; -} - -/// Sets bit at position `i` for `data` to 0 -#[inline] -pub fn unset_bit(data: &mut [u8], i: usize) { - data[i >> 3] ^= BIT_MASK[i & 7]; -} - -/// Sets bit at position `i` for `data` to 0 -/// -/// # Safety -/// -/// Note this doesn't do any bound checking, for performance reason. The caller is -/// responsible to guarantee that `i` is within bounds. -#[inline] -pub unsafe fn unset_bit_raw(data: *mut u8, i: usize) { - *data.add(i >> 3) ^= BIT_MASK[i & 7]; -} - -/// Returns the ceil of `value`/`divisor` -#[inline] -pub fn ceil(value: usize, divisor: usize) -> usize { - let (quot, rem) = (value / divisor, value % divisor); - if rem > 0 && divisor > 0 { - quot + 1 - } else { - quot - } -} - -/// Performs SIMD bitwise binary operations. 
-/// -/// # Safety -/// -/// Note that each slice should be 64 bytes and it is the callers responsibility to ensure -/// that this is the case. If passed slices larger than 64 bytes the operation will only -/// be performed on the first 64 bytes. Slices less than 64 bytes will panic. -#[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), feature = "simd"))] -pub unsafe fn bitwise_bin_op_simd(left: &[u8], right: &[u8], result: &mut [u8], op: F) -where - F: Fn(u8x64, u8x64) -> u8x64, -{ - let left_simd = u8x64::from_slice_unaligned_unchecked(left); - let right_simd = u8x64::from_slice_unaligned_unchecked(right); - let simd_result = op(left_simd, right_simd); - simd_result.write_to_slice_unaligned_unchecked(result); -} - -#[cfg(test)] -mod tests { - use std::collections::HashSet; - - use super::*; - use crate::util::test_util::seedable_rng; - use rand::Rng; - - #[test] - fn test_round_upto_multiple_of_64() { - assert_eq!(0, round_upto_multiple_of_64(0)); - assert_eq!(64, round_upto_multiple_of_64(1)); - assert_eq!(64, round_upto_multiple_of_64(63)); - assert_eq!(64, round_upto_multiple_of_64(64)); - assert_eq!(128, round_upto_multiple_of_64(65)); - assert_eq!(192, round_upto_multiple_of_64(129)); - } - - #[test] - fn test_get_bit() { - // 00001101 - assert_eq!(true, get_bit(&[0b00001101], 0)); - assert_eq!(false, get_bit(&[0b00001101], 1)); - assert_eq!(true, get_bit(&[0b00001101], 2)); - assert_eq!(true, get_bit(&[0b00001101], 3)); - - // 01001001 01010010 - assert_eq!(true, get_bit(&[0b01001001, 0b01010010], 0)); - assert_eq!(false, get_bit(&[0b01001001, 0b01010010], 1)); - assert_eq!(false, get_bit(&[0b01001001, 0b01010010], 2)); - assert_eq!(true, get_bit(&[0b01001001, 0b01010010], 3)); - assert_eq!(false, get_bit(&[0b01001001, 0b01010010], 4)); - assert_eq!(false, get_bit(&[0b01001001, 0b01010010], 5)); - assert_eq!(true, get_bit(&[0b01001001, 0b01010010], 6)); - assert_eq!(false, get_bit(&[0b01001001, 0b01010010], 7)); - assert_eq!(false, 
get_bit(&[0b01001001, 0b01010010], 8)); - assert_eq!(true, get_bit(&[0b01001001, 0b01010010], 9)); - assert_eq!(false, get_bit(&[0b01001001, 0b01010010], 10)); - assert_eq!(false, get_bit(&[0b01001001, 0b01010010], 11)); - assert_eq!(true, get_bit(&[0b01001001, 0b01010010], 12)); - assert_eq!(false, get_bit(&[0b01001001, 0b01010010], 13)); - assert_eq!(true, get_bit(&[0b01001001, 0b01010010], 14)); - assert_eq!(false, get_bit(&[0b01001001, 0b01010010], 15)); - } - - #[test] - fn test_get_bit_raw() { - const NUM_BYTE: usize = 10; - let mut buf = vec![0; NUM_BYTE]; - let mut expected = vec![]; - let mut rng = seedable_rng(); - for i in 0..8 * NUM_BYTE { - let b = rng.gen_bool(0.5); - expected.push(b); - if b { - set_bit(&mut buf[..], i) - } - } - - let raw_ptr = buf.as_ptr(); - for (i, b) in expected.iter().enumerate() { - unsafe { - assert_eq!(*b, get_bit_raw(raw_ptr, i)); - } - } - } - - #[test] - fn test_set_bit() { - let mut b = [0b00000000]; - set_bit(&mut b, 0); - assert_eq!([0b00000001], b); - set_bit(&mut b, 2); - assert_eq!([0b00000101], b); - set_bit(&mut b, 5); - assert_eq!([0b00100101], b); - } - - #[test] - fn test_unset_bit() { - let mut b = [0b11111111]; - unset_bit(&mut b, 0); - assert_eq!([0b11111110], b); - unset_bit(&mut b, 2); - assert_eq!([0b11111010], b); - unset_bit(&mut b, 5); - assert_eq!([0b11011010], b); - } - - #[test] - fn test_set_bit_raw() { - const NUM_BYTE: usize = 10; - let mut buf = vec![0; NUM_BYTE]; - let mut expected = vec![]; - let mut rng = seedable_rng(); - for i in 0..8 * NUM_BYTE { - let b = rng.gen_bool(0.5); - expected.push(b); - if b { - unsafe { - set_bit_raw(buf.as_mut_ptr(), i); - } - } - } - - let raw_ptr = buf.as_ptr(); - for (i, b) in expected.iter().enumerate() { - unsafe { - assert_eq!(*b, get_bit_raw(raw_ptr, i)); - } - } - } - - #[test] - fn test_unset_bit_raw() { - const NUM_BYTE: usize = 10; - let mut buf = vec![255; NUM_BYTE]; - let mut expected = vec![]; - let mut rng = seedable_rng(); - for i in 0..8 * 
NUM_BYTE { - let b = rng.gen_bool(0.5); - expected.push(b); - if !b { - unsafe { - unset_bit_raw(buf.as_mut_ptr(), i); - } - } - } - - let raw_ptr = buf.as_ptr(); - for (i, b) in expected.iter().enumerate() { - unsafe { - assert_eq!(*b, get_bit_raw(raw_ptr, i)); - } - } - } - - #[test] - fn test_get_set_bit_roundtrip() { - const NUM_BYTES: usize = 10; - const NUM_SETS: usize = 10; - - let mut buffer: [u8; NUM_BYTES * 8] = [0; NUM_BYTES * 8]; - let mut v = HashSet::new(); - let mut rng = seedable_rng(); - for _ in 0..NUM_SETS { - let offset = rng.gen_range(0, 8 * NUM_BYTES); - v.insert(offset); - set_bit(&mut buffer[..], offset); - } - for i in 0..NUM_BYTES * 8 { - assert_eq!(v.contains(&i), get_bit(&buffer[..], i)); - } - } - - #[test] - #[cfg(all(any(target_arch = "x86", target_arch = "x86_64")))] - fn test_ceil() { - assert_eq!(ceil(0, 1), 0); - assert_eq!(ceil(1, 1), 1); - assert_eq!(ceil(1, 2), 1); - assert_eq!(ceil(1, 8), 1); - assert_eq!(ceil(7, 8), 1); - assert_eq!(ceil(8, 8), 1); - assert_eq!(ceil(9, 8), 2); - assert_eq!(ceil(9, 9), 1); - assert_eq!(ceil(10000000000, 10), 1000000000); - assert_eq!(ceil(10, 10000000000), 1); - assert_eq!(ceil(10000000000, 1000000000), 10); - } - - #[test] - #[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), feature = "avx512"))] - fn test_bitwise_and_avx512() { - use crate::buffer::avx512_bin_and; - - let buf1 = [0b00110011u8; 64]; - let buf2 = [0b11110000u8; 64]; - let mut buf3 = [0b00000000; 64]; - unsafe { - avx512_bin_and(&buf1, &buf2, &mut buf3); - }; - for i in buf3.iter() { - assert_eq!(&0b00110000u8, i); - } - } - - #[test] - #[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), feature = "simd"))] - fn test_bitwise_and_simd() { - let buf1 = [0b00110011u8; 64]; - let buf2 = [0b11110000u8; 64]; - let mut buf3 = [0b00000000; 64]; - unsafe { bitwise_bin_op_simd(&buf1, &buf2, &mut buf3, |a, b| a & b) }; - for i in buf3.iter() { - assert_eq!(&0b00110000u8, i); - } - } - - #[test] - #[cfg(all(any(target_arch 
= "x86", target_arch = "x86_64"), feature = "simd"))] - fn test_bitwise_or_simd() { - let buf1 = [0b00110011u8; 64]; - let buf2 = [0b11110000u8; 64]; - let mut buf3 = [0b00000000; 64]; - unsafe { bitwise_bin_op_simd(&buf1, &buf2, &mut buf3, |a, b| a | b) }; - for i in buf3.iter() { - assert_eq!(&0b11110011u8, i); - } - } -} diff --git a/rust/arrow/src/util/mod.rs b/rust/arrow/src/util/mod.rs index 053d1329631..ca090c0a40b 100644 --- a/rust/arrow/src/util/mod.rs +++ b/rust/arrow/src/util/mod.rs @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -pub mod bit_chunk_iterator; -pub mod bit_util; +pub mod bit_ops; pub mod buffered_iterator; pub mod display; pub mod integration_util; @@ -24,3 +23,4 @@ pub mod integration_util; pub mod pretty; pub mod string_writer; pub mod test_util; +pub mod utils; diff --git a/rust/arrow/src/util/utils.rs b/rust/arrow/src/util/utils.rs new file mode 100644 index 00000000000..84435830896 --- /dev/null +++ b/rust/arrow/src/util/utils.rs @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Utils for working with bits + +#[cfg(feature = "simd")] +use packed_simd::u8x64; + +/// Returns the nearest number that is `>=` than `num` and is a multiple of 64 +#[inline] +pub fn round_upto_multiple_of_64(num: usize) -> usize { + round_upto_power_of_2(num, 64) +} + +/// Returns the nearest multiple of `factor` that is `>=` than `num`. Here `factor` must +/// be a power of 2. +pub fn round_upto_power_of_2(num: usize, factor: usize) -> usize { + debug_assert!(factor > 0 && (factor & (factor - 1)) == 0); + (num + (factor - 1)) & !(factor - 1) +} + +/// Returns the ceil of `value`/`divisor` +#[inline] +pub fn ceil(value: usize, divisor: usize) -> usize { + let (quot, rem) = (value / divisor, value % divisor); + if rem > 0 && divisor > 0 { + quot + 1 + } else { + quot + } +} + +/// Performs SIMD bitwise binary operations. +/// +/// # Safety +/// +/// Note that each slice should be 64 bytes and it is the callers responsibility to ensure +/// that this is the case. If passed slices larger than 64 bytes the operation will only +/// be performed on the first 64 bytes. Slices less than 64 bytes will panic. 
+#[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), feature = "simd"))] +pub unsafe fn bitwise_bin_op_simd<F>(left: &[u8], right: &[u8], result: &mut [u8], op: F) +where + F: Fn(u8x64, u8x64) -> u8x64, +{ + let left_simd = u8x64::from_slice_unaligned_unchecked(left); + let right_simd = u8x64::from_slice_unaligned_unchecked(right); + let simd_result = op(left_simd, right_simd); + simd_result.write_to_slice_unaligned_unchecked(result); +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn test_round_upto_multiple_of_64() { + assert_eq!(0, round_upto_multiple_of_64(0)); + assert_eq!(64, round_upto_multiple_of_64(1)); + assert_eq!(64, round_upto_multiple_of_64(63)); + assert_eq!(64, round_upto_multiple_of_64(64)); + assert_eq!(128, round_upto_multiple_of_64(65)); + assert_eq!(192, round_upto_multiple_of_64(129)); + } + + #[test] + #[cfg(all(any(target_arch = "x86", target_arch = "x86_64")))] + fn test_ceil() { + assert_eq!(ceil(0, 1), 0); + assert_eq!(ceil(1, 1), 1); + assert_eq!(ceil(1, 2), 1); + assert_eq!(ceil(1, 8), 1); + assert_eq!(ceil(7, 8), 1); + assert_eq!(ceil(8, 8), 1); + assert_eq!(ceil(9, 8), 2); + assert_eq!(ceil(9, 9), 1); + assert_eq!(ceil(10000000000, 10), 1000000000); + assert_eq!(ceil(10, 10000000000), 1); + assert_eq!(ceil(10000000000, 1000000000), 10); + } + + #[test] + #[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), feature = "simd"))] + fn test_bitwise_and_simd() { + let buf1 = [0b00110011u8; 64]; + let buf2 = [0b11110000u8; 64]; + let mut buf3 = [0b00000000; 64]; + unsafe { bitwise_bin_op_simd(&buf1, &buf2, &mut buf3, |a, b| a & b) }; + for i in buf3.iter() { + assert_eq!(&0b00110000u8, i); + } + } + + #[test] + #[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), feature = "simd"))] + fn test_bitwise_or_simd() { + let buf1 = [0b00110011u8; 64]; + let buf2 = [0b11110000u8; 64]; + let mut buf3 = [0b00000000; 64]; + unsafe { bitwise_bin_op_simd(&buf1, &buf2, &mut buf3, |a, b| a | b) }; + for i in buf3.iter() { 
+ assert_eq!(&0b11110011u8, i); + } + } +} diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index fc35f4fd975..16daba0495a 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -349,7 +349,7 @@ async fn csv_query_avg_sqrt() -> Result<()> { let mut actual = execute(&mut ctx, sql).await; actual.sort(); let expected = vec![vec!["0.6706002946036462"]]; - assert_eq!(actual, expected); + assert_float_eq(&expected, &actual); Ok(()) } @@ -363,7 +363,7 @@ async fn csv_query_custom_udf_with_cast() -> Result<()> { let sql = "SELECT avg(custom_sqrt(c11)) FROM aggregate_test_100"; let actual = execute(&mut ctx, sql).await; let expected = vec![vec!["0.6584408483418833"]]; - assert_eq!(actual, expected); + assert_float_eq(&expected, &actual); Ok(()) } @@ -377,11 +377,11 @@ async fn sqrt_f32_vs_f64() -> Result<()> { let actual = execute(&mut ctx, sql).await; let expected = vec![vec!["0.6584408485889435"]]; - assert_eq!(actual, expected); + assert_float_eq(&expected, &actual); let sql = "SELECT avg(sqrt(CAST(c11 AS double))) FROM aggregate_test_100"; let actual = execute(&mut ctx, sql).await; let expected = vec![vec!["0.6584408483418833"]]; - assert_eq!(actual, expected); + assert_float_eq(&expected, &actual); Ok(()) } @@ -405,7 +405,7 @@ async fn csv_query_sqrt_sqrt() -> Result<()> { let actual = execute(&mut ctx, sql).await; // sqrt(sqrt(c12=0.9294097332465232)) = 0.9818650561397431 let expected = vec![vec!["0.9818650561397431"]]; - assert_eq!(actual, expected); + assert_float_eq(&expected, &actual); Ok(()) } @@ -448,7 +448,7 @@ async fn csv_query_avg() -> Result<()> { let mut actual = execute(&mut ctx, sql).await; actual.sort(); let expected = vec![vec!["0.5089725099127211"]]; - assert_eq!(expected, actual); + assert_float_eq(&expected, &actual); Ok(()) } @@ -1400,6 +1400,28 @@ async fn query_on_string_dictionary() -> Result<()> { Ok(()) } +fn assert_float_eq<T>(expected: &[Vec<T>], received: &[Vec<String>]) +where + T: AsRef<str>, +{ + expected + 
.iter() + .flatten() + .zip(received.iter().flatten()) + .for_each(|(l, r)| { + let (l, r) = ( + l.as_ref().parse::<f64>().unwrap(), + r.as_str().parse::<f64>().unwrap(), + ); + // Too small floats are hard to approximate, give them some more error range. + if l < 1.0 && r < 1.0 { + assert!((l - r).abs() <= 2.0_f64 * f64::EPSILON); + } else { + assert!((l - r).abs() <= f64::EPSILON); + } + }); +} + #[tokio::test] async fn query_without_from() -> Result<()> { // Test for SELECT without FROM. diff --git a/rust/integration-testing/src/bin/arrow-json-integration-test.rs b/rust/integration-testing/src/bin/arrow-json-integration-test.rs index d4afd13528d..d0fc14794d2 100644 --- a/rust/integration-testing/src/bin/arrow-json-integration-test.rs +++ b/rust/integration-testing/src/bin/arrow-json-integration-test.rs @@ -35,7 +35,7 @@ use arrow::{ buffer::Buffer, buffer::MutableBuffer, datatypes::ToByteSlice, - util::{bit_util, integration_util::*}, + util::{integration_util::*, utils}, }; fn main() -> Result<()> { @@ -588,7 +588,7 @@ fn dictionary_array_from_json( /// A helper to create a null buffer from a Vec fn create_null_buf(json_col: &ArrowJsonColumn) -> Buffer { - let num_bytes = bit_util::ceil(json_col.count, 8); + let num_bytes = utils::ceil(json_col.count, 8); let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); json_col .validity .as_ref() .unwrap() .iter() .enumerate() .for_each(|(i, v)| { - let null_slice = null_buf.data_mut(); if *v != 0 { - bit_util::set_bit(null_slice, i); + null_buf.set_bit(i); } }); null_buf.freeze() diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 00e7c74147f..7a9780595ae 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -53,7 +53,7 @@ use arrow::datatypes::{ UInt16Type as ArrowUInt16Type, UInt32Type as ArrowUInt32Type, UInt64Type as ArrowUInt64Type, UInt8Type as 
ArrowUInt8Type, }; -use arrow::util::bit_util; +use arrow::util::utils; use crate::arrow::converter::{ BinaryArrayConverter, BinaryConverter, Converter, FixedLenBinaryConverter, @@ -904,13 +904,12 @@ impl ArrayReader for ListArrayReader { } offsets.push(cur_offset); - let num_bytes = bit_util::ceil(offsets.len(), 8); + let num_bytes = utils::ceil(offsets.len(), 8); let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); - let null_slice = null_buf.data_mut(); let mut list_index = 0; for i in 0..rep_levels.len() { if rep_levels[i] == 0 && def_levels[i] != 0 { - bit_util::set_bit(null_slice, list_index); + null_buf.set_bit(list_index); } if rep_levels[i] == 0 { list_index += 1;