Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 44 additions & 17 deletions arrow-array/src/array/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ use super::ByteArrayType;
pub struct GenericByteViewArray<T: ByteViewType + ?Sized> {
data_type: DataType,
views: ScalarBuffer<u128>,
buffers: Vec<Buffer>,
buffers: Arc<[Buffer]>,
phantom: PhantomData<T>,
nulls: Option<NullBuffer>,
}
Expand All @@ -188,7 +188,10 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
/// # Panics
///
/// Panics if [`GenericByteViewArray::try_new`] returns an error
pub fn new(views: ScalarBuffer<u128>, buffers: Vec<Buffer>, nulls: Option<NullBuffer>) -> Self {
pub fn new<U>(views: ScalarBuffer<u128>, buffers: U, nulls: Option<NullBuffer>) -> Self
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very elegant. It is nice to keep this API backwards compatible (anything that used to compile still compiles).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉

where
U: Into<Arc<[Buffer]>>,
{
Self::try_new(views, buffers, nulls).unwrap()
}

Expand All @@ -198,11 +201,16 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
///
/// * `views.len() != nulls.len()`
/// * [ByteViewType::validate] fails
pub fn try_new(
pub fn try_new<U>(
views: ScalarBuffer<u128>,
buffers: Vec<Buffer>,
buffers: U,
nulls: Option<NullBuffer>,
) -> Result<Self, ArrowError> {
) -> Result<Self, ArrowError>
where
U: Into<Arc<[Buffer]>>,
{
let buffers: Arc<[Buffer]> = buffers.into();

T::validate(&views, &buffers)?;

if let Some(n) = nulls.as_ref() {
Expand Down Expand Up @@ -230,11 +238,14 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
/// # Safety
///
/// Safe if [`Self::try_new`] would not error
pub unsafe fn new_unchecked(
pub unsafe fn new_unchecked<U>(
views: ScalarBuffer<u128>,
buffers: Vec<Buffer>,
buffers: U,
nulls: Option<NullBuffer>,
) -> Self {
) -> Self
where
U: Into<Arc<[Buffer]>>,
{
if cfg!(feature = "force_validate") {
return Self::new(views, buffers, nulls);
}
Expand All @@ -243,7 +254,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
data_type: T::DATA_TYPE,
phantom: Default::default(),
views,
buffers,
buffers: buffers.into(),
nulls,
}
}
Expand All @@ -253,7 +264,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
Self {
data_type: T::DATA_TYPE,
views: vec![0; len].into(),
buffers: vec![],
buffers: vec![].into(),
nulls: Some(NullBuffer::new_null(len)),
phantom: Default::default(),
}
Expand All @@ -279,7 +290,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
}

/// Deconstruct this array into its constituent parts
pub fn into_parts(self) -> (ScalarBuffer<u128>, Vec<Buffer>, Option<NullBuffer>) {
pub fn into_parts(self) -> (ScalarBuffer<u128>, Arc<[Buffer]>, Option<NullBuffer>) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this I think is a breaking API change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tested it with Datafusion and it dropped right in (both the mainline version and the hacked up and patched version we're using).

But, yeah, I can't speak for other users of the package.

(self.views, self.buffers, self.nulls)
}

Expand Down Expand Up @@ -887,8 +898,21 @@ impl<T: ByteViewType + ?Sized> Array for GenericByteViewArray<T> {

fn shrink_to_fit(&mut self) {
self.views.shrink_to_fit();
self.buffers.iter_mut().for_each(|b| b.shrink_to_fit());
self.buffers.shrink_to_fit();

// The goal of `shrink_to_fit` is to minimize the space used by any of
// its allocations. The use of `Arc::get_mut` over `Arc::make_mut` is
// because if the reference count is greater than 1, `Arc::make_mut`
// will first clone its contents. So, any large allocations will first
// be cloned before being shrunk, leaving the pre-cloned allocations
// intact, before adding the extra (used) space of the new clones.
if let Some(buffers) = Arc::get_mut(&mut self.buffers) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will only shrink the buffers when there are no other outstanding references to this code. I think it would be better to call Arc::make_mut here to ensure that the buffers get shrunken

Copy link
Contributor Author

@maxburke maxburke Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the issue I have with Arc::make_mut is that if I slice array I now have two references to the underlying buffers. If I call shrink_to_fit on one of them, Arc::make_mut will clone the buffers and then shrink_to_fit will be called on one of them. But because the underlying buffers end up being cloned, it doesn't make the original allocation shrink and in the end it'll end up using more memory, until the other reference is dropped.

Additionally it'll create more allocator pressure because the buffer cloning will duplicate the buffer at it's pre-shrunken size before it's shrunk.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense -- let's keep it this way then. I do think it is worth a comment explaining the rationale, though, for future readers that may wonder the same thing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done 👍

buffers.iter_mut().for_each(|b| b.shrink_to_fit());
}

// With the assumption that this is a best-effort function, no attempt
// is made to shrink `self.buffers`, which it can't because it's type
// does not expose a `shrink_to_fit` method.

if let Some(nulls) = &mut self.nulls {
nulls.shrink_to_fit();
}
Expand Down Expand Up @@ -946,7 +970,7 @@ impl<T: ByteViewType + ?Sized> From<ArrayData> for GenericByteViewArray<T> {
fn from(value: ArrayData) -> Self {
let views = value.buffers()[0].clone();
let views = ScalarBuffer::new(views, value.offset(), value.len());
let buffers = value.buffers()[1..].to_vec();
let buffers = value.buffers()[1..].to_vec().into();
Self {
data_type: T::DATA_TYPE,
views,
Expand Down Expand Up @@ -1014,12 +1038,15 @@ where
}

impl<T: ByteViewType + ?Sized> From<GenericByteViewArray<T>> for ArrayData {
fn from(mut array: GenericByteViewArray<T>) -> Self {
fn from(array: GenericByteViewArray<T>) -> Self {
let len = array.len();
array.buffers.insert(0, array.views.into_inner());

let mut buffers = array.buffers.to_vec();
buffers.insert(0, array.views.into_inner());

let builder = ArrayDataBuilder::new(T::DATA_TYPE)
.len(len)
.buffers(array.buffers)
.buffers(buffers)
.nulls(array.nulls);

unsafe { builder.build_unchecked() }
Expand Down
41 changes: 23 additions & 18 deletions arrow-select/src/zip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use std::fmt::{Debug, Formatter};
use std::hash::Hash;
use std::marker::PhantomData;
use std::ops::Not;
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

/// Zip two arrays by some boolean mask.
///
Expand Down Expand Up @@ -667,12 +667,17 @@ fn maybe_prep_null_mask_filter(predicate: &BooleanArray) -> BooleanBuffer {

struct ByteViewScalarImpl<T: ByteViewType> {
truthy_view: Option<u128>,
truthy_buffers: Vec<Buffer>,
truthy_buffers: Arc<[Buffer]>,
falsy_view: Option<u128>,
falsy_buffers: Vec<Buffer>,
falsy_buffers: Arc<[Buffer]>,
phantom: PhantomData<T>,
}

static EMPTY_ARC: OnceLock<Arc<[Buffer]>> = OnceLock::new();
fn empty_arc_buffers() -> Arc<[Buffer]> {
Arc::clone(EMPTY_ARC.get_or_init(|| Arc::new([])))
}

impl<T: ByteViewType> ByteViewScalarImpl<T> {
fn new(truthy: &dyn Array, falsy: &dyn Array) -> Self {
let (truthy_view, truthy_buffers) = Self::get_value_from_scalar(truthy);
Expand All @@ -686,9 +691,9 @@ impl<T: ByteViewType> ByteViewScalarImpl<T> {
}
}

fn get_value_from_scalar(scalar: &dyn Array) -> (Option<u128>, Vec<Buffer>) {
fn get_value_from_scalar(scalar: &dyn Array) -> (Option<u128>, Arc<[Buffer]>) {
if scalar.is_null(0) {
(None, vec![])
(None, empty_arc_buffers())
} else {
let (views, buffers, _) = scalar.as_byte_view::<T>().clone().into_parts();
(views.first().copied(), buffers)
Expand All @@ -698,8 +703,8 @@ impl<T: ByteViewType> ByteViewScalarImpl<T> {
fn get_views_for_single_non_nullable(
predicate: BooleanBuffer,
value: u128,
buffers: Vec<Buffer>,
) -> (ScalarBuffer<u128>, Vec<Buffer>, Option<NullBuffer>) {
buffers: Arc<[Buffer]>,
) -> (ScalarBuffer<u128>, Arc<[Buffer]>, Option<NullBuffer>) {
let number_of_true = predicate.count_set_bits();
let number_of_values = predicate.len();

Expand All @@ -708,7 +713,7 @@ impl<T: ByteViewType> ByteViewScalarImpl<T> {
// All values are null
return (
vec![0; number_of_values].into(),
vec![],
empty_arc_buffers(),
Some(NullBuffer::new_null(number_of_values)),
);
}
Expand All @@ -724,10 +729,10 @@ impl<T: ByteViewType> ByteViewScalarImpl<T> {
predicate: BooleanBuffer,
result_len: usize,
truthy_view: u128,
truthy_buffers: Vec<Buffer>,
truthy_buffers: Arc<[Buffer]>,
falsy_view: u128,
falsy_buffers: Vec<Buffer>,
) -> (ScalarBuffer<u128>, Vec<Buffer>, Option<NullBuffer>) {
falsy_buffers: Arc<[Buffer]>,
) -> (ScalarBuffer<u128>, Arc<[Buffer]>, Option<NullBuffer>) {
let true_count = predicate.count_set_bits();
match true_count {
0 => {
Expand All @@ -751,7 +756,7 @@ impl<T: ByteViewType> ByteViewScalarImpl<T> {
let byte_view_falsy = ByteView::from(falsy_view);
let new_index_falsy_buffers =
buffers.len() as u32 + byte_view_falsy.buffer_index;
buffers.extend(falsy_buffers);
buffers.extend(falsy_buffers.iter().cloned());
let byte_view_falsy =
byte_view_falsy.with_buffer_index(new_index_falsy_buffers);
byte_view_falsy.as_u128()
Expand All @@ -778,7 +783,7 @@ impl<T: ByteViewType> ByteViewScalarImpl<T> {
}

let bytes = Buffer::from(mutable);
(bytes.into(), buffers, None)
(bytes.into(), buffers.into(), None)
}
}
}
Expand All @@ -804,28 +809,28 @@ impl<T: ByteViewType> ZipImpl for ByteViewScalarImpl<T> {
predicate,
result_len,
truthy,
self.truthy_buffers.clone(),
Arc::clone(&self.truthy_buffers),
falsy,
self.falsy_buffers.clone(),
Arc::clone(&self.falsy_buffers),
),
(Some(truthy), None) => Self::get_views_for_single_non_nullable(
predicate,
truthy,
self.truthy_buffers.clone(),
Arc::clone(&self.truthy_buffers),
),
(None, Some(falsy)) => {
let predicate = predicate.not();
Self::get_views_for_single_non_nullable(
predicate,
falsy,
self.falsy_buffers.clone(),
Arc::clone(&self.falsy_buffers),
)
}
(None, None) => {
// All values are null
(
vec![0; result_len].into(),
vec![],
empty_arc_buffers(),
Some(NullBuffer::new_null(result_len)),
)
}
Expand Down
Loading