Skip to content

Commit

Permalink
feat: use Buffer for BitPackedArray (#862)
Browse files Browse the repository at this point in the history
Fixes #850

---------

Co-authored-by: Will Manning <[email protected]>
  • Loading branch information
a10y and lwwmanning authored Sep 18, 2024
1 parent b1914fc commit f9d5878
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 66 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions encodings/fastlanes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ rust-version = { workspace = true }
workspace = true

[dependencies]
arrow-buffer = { workspace = true }
arrayref = { workspace = true }
fastlanes = { workspace = true }
itertools = { workspace = true }
num-traits = { workspace = true }
serde = { workspace = true }
vortex-array = { workspace = true }
vortex-buffer = { workspace = true }
vortex-dtype = { workspace = true }
vortex-error = { workspace = true }
vortex-scalar = { workspace = true }
Expand Down
58 changes: 37 additions & 21 deletions encodings/fastlanes/src/bitpacking/compress.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::mem::size_of;

use arrow_buffer::ArrowNativeType;
use fastlanes::BitPacking;
use vortex::array::{PrimitiveArray, Sparse, SparseArray};
use vortex::stats::ArrayStatistics;
use vortex::validity::{ArrayValidity, Validity};
use vortex::{Array, ArrayDType, ArrayDef, IntoArray, IntoArrayVariant};
use vortex_buffer::Buffer;
use vortex_dtype::{
match_each_integer_ptype, match_each_unsigned_integer_ptype, NativePType, PType,
};
Expand All @@ -22,40 +22,57 @@ pub fn bitpack_encode(array: PrimitiveArray, bit_width: usize) -> VortexResult<B

if bit_width >= array.ptype().bit_width() {
// Nothing we can do
vortex_bail!(
"Cannot pack -- specified bit width is greater than or equal to the type's bit width"
)
vortex_bail!("Cannot pack -- specified bit width is greater than or equal to raw bit width")
}

let packed = bitpack(&array, bit_width)?;
let patches = (num_exceptions > 0)
.then(|| bitpack_patches(&array, bit_width, num_exceptions))
.flatten();

BitPackedArray::try_new(packed, array.validity(), patches, bit_width, array.len())
BitPackedArray::try_new(
packed,
array.ptype().to_unsigned(),
array.validity(),
patches,
bit_width,
array.len(),
)
}

pub fn bitpack(parray: &PrimitiveArray, bit_width: usize) -> VortexResult<Array> {
/// Bitpack a [PrimitiveArray] to the given width.
///
/// On success, returns a [Buffer] containing the packed data.
pub fn bitpack(parray: &PrimitiveArray, bit_width: usize) -> VortexResult<Buffer> {
// We know the min is > 0, so it's safe to re-interpret signed integers as unsigned.
let parray = parray.reinterpret_cast(parray.ptype().to_unsigned());
let packed = match_each_unsigned_integer_ptype!(parray.ptype(), |$P| {
PrimitiveArray::from(bitpack_primitive(parray.maybe_null_slice::<$P>(), bit_width))
bitpack_primitive(parray.maybe_null_slice::<$P>(), bit_width)
});
Ok(packed.into_array())
Ok(packed)
}

pub fn bitpack_primitive<T: NativePType + BitPacking>(array: &[T], bit_width: usize) -> Vec<T> {
/// Bitpack a slice of primitives down to the given width.
///
/// See `bitpack` for more caller information.
pub fn bitpack_primitive<T: NativePType + BitPacking + ArrowNativeType>(
array: &[T],
bit_width: usize,
) -> Buffer {
if bit_width == 0 {
return Vec::new();
return Buffer::from_len_zeroed(0);
}

// How many fastlanes vectors we will process.
let num_chunks = (array.len() + 1023) / 1024;
let num_full_chunks = array.len() / 1024;
let packed_len = 128 * bit_width / size_of::<T>();
// packed_len says how many values of size T we're going to include.
// 1024 * bit_width / 8 == the number of bytes we're going to get.
// then we divide by the size of T to get the number of elements.

// Allocate the output vector of packed `T` values (capacity: num_chunks * packed_len elements).
let mut output = Vec::with_capacity(num_chunks * packed_len);
let mut output = Vec::<T>::with_capacity(num_chunks * packed_len);

// Loop over all but the last chunk.
(0..num_full_chunks).for_each(|i| {
Expand Down Expand Up @@ -91,7 +108,7 @@ pub fn bitpack_primitive<T: NativePType + BitPacking>(array: &[T], bit_width: us
};
}

output
Buffer::from(output)
}

pub fn bitpack_patches(
Expand Down Expand Up @@ -126,12 +143,10 @@ pub fn unpack(array: BitPackedArray) -> VortexResult<PrimitiveArray> {
let bit_width = array.bit_width();
let length = array.len();
let offset = array.offset();
let packed = array.packed().into_primitive()?;
let ptype = packed.ptype();

let mut unpacked = match_each_unsigned_integer_ptype!(ptype, |$P| {
let ptype = array.ptype();
let mut unpacked = match_each_unsigned_integer_ptype!(array.ptype().to_unsigned(), |$P| {
PrimitiveArray::from_vec(
unpack_primitive::<$P>(packed.maybe_null_slice::<$P>(), bit_width, offset, length),
unpack_primitive::<$P>(array.packed_slice::<$P>(), bit_width, offset, length),
array.validity(),
)
});
Expand Down Expand Up @@ -234,10 +249,11 @@ pub fn unpack_primitive<T: NativePType + BitPacking>(

pub fn unpack_single(array: &BitPackedArray, index: usize) -> VortexResult<Scalar> {
let bit_width = array.bit_width();
let packed = array.packed().into_primitive()?;
let ptype = array.ptype();
// let packed = array.packed().into_primitive()?;
let index_in_encoded = index + array.offset();
let scalar: Scalar = match_each_unsigned_integer_ptype!(packed.ptype(), |$P| unsafe {
unpack_single_primitive::<$P>(packed.maybe_null_slice::<$P>(), bit_width, index_in_encoded).into()
let scalar: Scalar = match_each_unsigned_integer_ptype!(ptype.to_unsigned(), |$P| unsafe {
unpack_single_primitive::<$P>(array.packed_slice::<$P>(), bit_width, index_in_encoded).into()
});
// Cast to fix signedness and nullability
scalar.cast(array.dtype())
Expand Down
4 changes: 3 additions & 1 deletion encodings/fastlanes/src/bitpacking/compute/scalar_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ mod test {
use vortex::compute::unary::scalar_at;
use vortex::validity::Validity;
use vortex::IntoArray;
use vortex_buffer::Buffer;
use vortex_dtype::{DType, Nullability, PType};
use vortex_scalar::Scalar;

Expand All @@ -36,7 +37,8 @@ mod test {
#[test]
fn invalid_patches() {
let packed_array = BitPackedArray::try_new(
PrimitiveArray::from(vec![0u32; 32]).into_array(),
Buffer::from(vec![0u8; 128]),
PType::U32,
Validity::AllInvalid,
Some(
SparseArray::try_new(
Expand Down
8 changes: 5 additions & 3 deletions encodings/fastlanes/src/bitpacking/compute/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ impl SliceFn for BitPackedArray {
let block_start = max(0, offset_start - offset);
let block_stop = ((offset_stop + 1023) / 1024) * 1024;

let encoded_start = (block_start / 8) * self.bit_width() / self.ptype().byte_width();
let encoded_stop = (block_stop / 8) * self.bit_width() / self.ptype().byte_width();
let encoded_start = (block_start / 8) * self.bit_width();
let encoded_stop = (block_stop / 8) * self.bit_width();
// Slice the packed buffer using the encoded start/stop values, which are byte offsets
// now that they are no longer divided by the ptype's byte width.
Self::try_new_from_offset(
slice(self.packed(), encoded_start, encoded_stop)?,
self.packed().slice(encoded_start..encoded_stop),
self.ptype(),
self.validity().slice(start, stop)?,
self.patches()
.map(|p| slice(&p, start, stop))
Expand Down
3 changes: 1 addition & 2 deletions encodings/fastlanes/src/bitpacking/compute/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ fn take_primitive<T: NativePType + BitPacking>(

let bit_width = array.bit_width();

let packed = array.packed().into_primitive()?;
let packed = packed.maybe_null_slice::<T>();
let packed = array.packed_slice::<T>();

let patches = array.patches().map(SparseArray::try_from).transpose()?;

Expand Down
73 changes: 40 additions & 33 deletions encodings/fastlanes/src/bitpacking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ use vortex::stats::{ArrayStatisticsCompute, StatsSet};
use vortex::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata};
use vortex::variants::{ArrayVariants, PrimitiveArrayTrait};
use vortex::visitor::{AcceptArrayVisitor, ArrayVisitor};
use vortex::{impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, Canonical, IntoCanonical};
use vortex_dtype::{NativePType, Nullability, PType};
use vortex::{
impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, Canonical, IntoCanonical, TypedArray,
};
use vortex_buffer::Buffer;
use vortex_dtype::{DType, NativePType, Nullability, PType};
use vortex_error::{
vortex_bail, vortex_err, vortex_panic, VortexError, VortexExpect as _, VortexResult,
};
Expand All @@ -29,28 +32,30 @@ pub struct BitPackedMetadata {

/// NB: All non-null values in the patches array are considered patches
impl BitPackedArray {
/// Create a new bitpacked array using a buffer of packed data.
///
/// The packed data should be interpreted as a sequence of values with size `bit_width`.
pub fn try_new(
packed: Array,
packed: Buffer,
ptype: PType,
validity: Validity,
patches: Option<Array>,
bit_width: usize,
len: usize,
) -> VortexResult<Self> {
Self::try_new_from_offset(packed, validity, patches, bit_width, len, 0)
Self::try_new_from_offset(packed, ptype, validity, patches, bit_width, len, 0)
}

pub(crate) fn try_new_from_offset(
packed: Array,
packed: Buffer,
ptype: PType,
validity: Validity,
patches: Option<Array>,
bit_width: usize,
length: usize,
offset: usize,
) -> VortexResult<Self> {
let dtype = packed.dtype().with_nullability(validity.nullability());
if !dtype.is_unsigned_int() {
vortex_bail!(MismatchedTypes: "uint", &dtype);
}
let dtype = DType::Primitive(ptype, validity.nullability());
if bit_width > u64::BITS as usize {
vortex_bail!("Unsupported bit width {}", bit_width);
}
Expand All @@ -61,9 +66,8 @@ impl BitPackedArray {
);
}

let ptype = PType::try_from(&dtype)?;
let expected_packed_size =
((length + offset + 1023) / 1024) * (128 * bit_width / ptype.byte_width());
// expected packed size is in bytes
let expected_packed_size = ((length + offset + 1023) / 1024) * (128 * bit_width);
if packed.len() != expected_packed_size {
return Err(vortex_err!(
"Expected {} packed bytes, got {}",
Expand Down Expand Up @@ -95,42 +99,45 @@ impl BitPackedArray {
has_patches: patches.is_some(),
};

let mut children = Vec::with_capacity(3);
children.push(packed);
let mut children = Vec::with_capacity(2);
if let Some(p) = patches {
children.push(p);
}
if let Some(a) = validity.into_array() {
children.push(a)
}

Self::try_from_parts(dtype, length, metadata, children.into(), StatsSet::new())
}

fn packed_len(&self) -> usize {
((self.len() + self.offset() + 1023) / 1024)
* (128 * self.bit_width() / self.ptype().byte_width())
Ok(Self {
typed: TypedArray::try_from_parts(
dtype,
length,
metadata,
Some(packed),
children.into(),
StatsSet::new(),
)?,
})
}

#[inline]
pub fn packed(&self) -> Array {
pub fn packed(&self) -> &Buffer {
self.as_ref()
.child(
0,
&self.dtype().with_nullability(Nullability::NonNullable),
self.packed_len(),
)
.vortex_expect("BitpackedArray is missing packed child bytes array")
.buffer()
.vortex_expect("BitPackedArray must contain packed buffer")
}

/// Access the slice of packed values as an array of `T`
#[inline]
pub fn packed_slice<T: NativePType + BitPacking>(&self) -> &[T] {
let packed_primitive = self.packed().as_primitive();
let maybe_null_slice = packed_primitive.maybe_null_slice::<T>();
let packed_bytes = self.packed();
let packed_ptr: *const T = packed_bytes.as_ptr().cast();
// Return number of elements of type `T` packed in the buffer
let packed_len = packed_bytes.len() / size_of::<T>();

// SAFETY: `packed_ptr` points into the packed buffer borrowed from `self`, and `packed_len`
// counts only the whole `T` values that fit in that buffer's byte length, so the slice
// stays in bounds. NOTE(review): this assumes the buffer is suitably aligned for `T` — confirm.
unsafe { std::slice::from_raw_parts(maybe_null_slice.as_ptr(), maybe_null_slice.len()) }
unsafe { std::slice::from_raw_parts(packed_ptr, packed_len) }
}

#[inline]
Expand All @@ -148,7 +155,7 @@ impl BitPackedArray {
.has_patches
.then(|| {
self.as_ref().child(
1,
0,
&self.dtype().with_nullability(Nullability::Nullable),
self.len(),
)
Expand All @@ -162,7 +169,7 @@ impl BitPackedArray {
}

pub fn validity(&self) -> Validity {
let validity_child_idx = if self.metadata().has_patches { 2 } else { 1 };
let validity_child_idx = if self.metadata().has_patches { 1 } else { 0 };

self.metadata().validity.to_validity(self.as_ref().child(
validity_child_idx,
Expand Down Expand Up @@ -214,7 +221,7 @@ impl ArrayValidity for BitPackedArray {

impl AcceptArrayVisitor for BitPackedArray {
fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> {
visitor.visit_child("packed", &self.packed())?;
visitor.visit_buffer(self.packed())?;
if let Some(patches) = self.patches().as_ref() {
visitor.visit_child("patches", patches)?;
}
Expand Down
1 change: 1 addition & 0 deletions encodings/fastlanes/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(incomplete_features)]
#![feature(generic_const_exprs)]
#![feature(vec_into_raw_parts)]

pub use bitpacking::*;
pub use delta::*;
Expand Down
2 changes: 1 addition & 1 deletion vortex-buffer/src/flexbuffers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl flexbuffers::Buffer for Buffer {
}

fn empty() -> Self {
Self::from(vec![])
Self::from_len_zeroed(0)
}

fn buffer_str(&self) -> Result<Self::BufferString, Utf8Error> {
Expand Down
Loading

0 comments on commit f9d5878

Please sign in to comment.