diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs
index b31c76ab5a27..09f0f56ba3ac 100644
--- a/arrow-array/src/array/byte_view_array.rs
+++ b/arrow-array/src/array/byte_view_array.rs
@@ -165,7 +165,7 @@ use super::ByteArrayType;
 pub struct GenericByteViewArray<T: ByteViewType + ?Sized> {
     data_type: DataType,
     views: ScalarBuffer<u128>,
-    buffers: Vec<Buffer>,
+    buffers: Arc<[Buffer]>,
     phantom: PhantomData<T>,
     nulls: Option<NullBuffer>,
 }
@@ -188,7 +188,10 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
     /// # Panics
     ///
     /// Panics if [`GenericByteViewArray::try_new`] returns an error
-    pub fn new(views: ScalarBuffer<u128>, buffers: Vec<Buffer>, nulls: Option<NullBuffer>) -> Self {
+    pub fn new<U>(views: ScalarBuffer<u128>, buffers: U, nulls: Option<NullBuffer>) -> Self
+    where
+        U: Into<Arc<[Buffer]>>,
+    {
         Self::try_new(views, buffers, nulls).unwrap()
     }
 
@@ -198,11 +201,16 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
     ///
     /// * `views.len() != nulls.len()`
     /// * [ByteViewType::validate] fails
-    pub fn try_new(
+    pub fn try_new<U>(
         views: ScalarBuffer<u128>,
-        buffers: Vec<Buffer>,
+        buffers: U,
         nulls: Option<NullBuffer>,
-    ) -> Result<Self, ArrowError> {
+    ) -> Result<Self, ArrowError>
+    where
+        U: Into<Arc<[Buffer]>>,
+    {
+        let buffers: Arc<[Buffer]> = buffers.into();
+
         T::validate(&views, &buffers)?;
 
         if let Some(n) = nulls.as_ref() {
@@ -230,11 +238,14 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
     /// # Safety
     ///
     /// Safe if [`Self::try_new`] would not error
-    pub unsafe fn new_unchecked(
+    pub unsafe fn new_unchecked<U>(
         views: ScalarBuffer<u128>,
-        buffers: Vec<Buffer>,
+        buffers: U,
         nulls: Option<NullBuffer>,
-    ) -> Self {
+    ) -> Self
+    where
+        U: Into<Arc<[Buffer]>>,
+    {
         if cfg!(feature = "force_validate") {
             return Self::new(views, buffers, nulls);
         }
@@ -243,7 +254,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
             data_type: T::DATA_TYPE,
             phantom: Default::default(),
             views,
-            buffers,
+            buffers: buffers.into(),
             nulls,
         }
     }
@@ -253,7 +264,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
         Self {
             data_type: T::DATA_TYPE,
             views: vec![0; len].into(),
-            buffers: vec![],
+            buffers: vec![].into(),
             nulls: Some(NullBuffer::new_null(len)),
             phantom: Default::default(),
         }
     }
@@ -279,7 +290,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
     }
 
     /// Deconstruct this array into its constituent parts
-    pub fn into_parts(self) -> (ScalarBuffer<u128>, Vec<Buffer>, Option<NullBuffer>) {
+    pub fn into_parts(self) -> (ScalarBuffer<u128>, Arc<[Buffer]>, Option<NullBuffer>) {
         (self.views, self.buffers, self.nulls)
     }
 
@@ -887,8 +898,21 @@ impl<T: ByteViewType + ?Sized> Array for GenericByteViewArray<T> {
     fn shrink_to_fit(&mut self) {
         self.views.shrink_to_fit();
-        self.buffers.iter_mut().for_each(|b| b.shrink_to_fit());
-        self.buffers.shrink_to_fit();
+
+        // The goal of `shrink_to_fit` is to minimize the space used by the
+        // array's allocations. `Arc::get_mut` is used instead of
+        // `Arc::make_mut` because, when the reference count is greater than
+        // one, `Arc::make_mut` clones its contents first: every large
+        // allocation would be copied before being shrunk, leaving the
+        // original allocations intact and adding the space of the new clones.
+        if let Some(buffers) = Arc::get_mut(&mut self.buffers) {
+            buffers.iter_mut().for_each(|b| b.shrink_to_fit());
+        }
+
+        // As `shrink_to_fit` is best-effort, no attempt is made to shrink the
+        // `self.buffers` container itself; its type, `Arc<[Buffer]>`, does
+        // not expose a `shrink_to_fit` method.
+
         if let Some(nulls) = &mut self.nulls {
             nulls.shrink_to_fit();
         }
@@ -946,7 +970,7 @@ impl<T: ByteViewType + ?Sized> From<ArrayData> for GenericByteViewArray<T> {
     fn from(value: ArrayData) -> Self {
         let views = value.buffers()[0].clone();
         let views = ScalarBuffer::new(views, value.offset(), value.len());
-        let buffers = value.buffers()[1..].to_vec();
+        let buffers = value.buffers()[1..].to_vec().into();
         Self {
             data_type: T::DATA_TYPE,
             views,
@@ -1014,12 +1038,15 @@ where
 }
 
 impl<T: ByteViewType + ?Sized> From<GenericByteViewArray<T>> for ArrayData {
-    fn from(mut array: GenericByteViewArray<T>) -> Self {
+    fn from(array: GenericByteViewArray<T>) -> Self {
         let len = array.len();
-        array.buffers.insert(0, array.views.into_inner());
+
+        let mut buffers = array.buffers.to_vec();
+        buffers.insert(0, array.views.into_inner());
+
         let builder = ArrayDataBuilder::new(T::DATA_TYPE)
             .len(len)
-            .buffers(array.buffers)
+            .buffers(buffers)
             .nulls(array.nulls);
 
         unsafe { builder.build_unchecked() }
diff --git a/arrow-select/src/zip.rs b/arrow-select/src/zip.rs
index 6be034fca23d..8702b558d01f 100644
--- a/arrow-select/src/zip.rs
+++ b/arrow-select/src/zip.rs
@@ -35,7 +35,7 @@ use std::fmt::{Debug, Formatter};
 use std::hash::Hash;
 use std::marker::PhantomData;
 use std::ops::Not;
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
 
 /// Zip two arrays by some boolean mask.
 ///
@@ -667,12 +667,17 @@ fn maybe_prep_null_mask_filter(predicate: &BooleanArray) -> BooleanBuffer {
 
 struct ByteViewScalarImpl<T: ByteViewType + ?Sized> {
     truthy_view: Option<u128>,
-    truthy_buffers: Vec<Buffer>,
+    truthy_buffers: Arc<[Buffer]>,
     falsy_view: Option<u128>,
-    falsy_buffers: Vec<Buffer>,
+    falsy_buffers: Arc<[Buffer]>,
     phantom: PhantomData<T>,
 }
 
+static EMPTY_ARC: OnceLock<Arc<[Buffer]>> = OnceLock::new();
+fn empty_arc_buffers() -> Arc<[Buffer]> {
+    Arc::clone(EMPTY_ARC.get_or_init(|| Arc::new([])))
+}
+
 impl<T: ByteViewType + ?Sized> ByteViewScalarImpl<T> {
     fn new(truthy: &dyn Array, falsy: &dyn Array) -> Self {
         let (truthy_view, truthy_buffers) = Self::get_value_from_scalar(truthy);
@@ -686,9 +691,9 @@ impl<T: ByteViewType + ?Sized> ByteViewScalarImpl<T> {
         }
     }
 
-    fn get_value_from_scalar(scalar: &dyn Array) -> (Option<u128>, Vec<Buffer>) {
+    fn get_value_from_scalar(scalar: &dyn Array) -> (Option<u128>, Arc<[Buffer]>) {
         if scalar.is_null(0) {
-            (None, vec![])
+            (None, empty_arc_buffers())
         } else {
             let (views, buffers, _) = scalar.as_byte_view::<T>().clone().into_parts();
             (views.first().copied(), buffers)
@@ -698,8 +703,8 @@ impl<T: ByteViewType + ?Sized> ByteViewScalarImpl<T> {
     fn get_views_for_single_non_nullable(
         predicate: BooleanBuffer,
         value: u128,
-        buffers: Vec<Buffer>,
-    ) -> (ScalarBuffer<u128>, Vec<Buffer>, Option<NullBuffer>) {
+        buffers: Arc<[Buffer]>,
+    ) -> (ScalarBuffer<u128>, Arc<[Buffer]>, Option<NullBuffer>) {
         let number_of_true = predicate.count_set_bits();
         let number_of_values = predicate.len();
 
@@ -708,7 +713,7 @@ impl<T: ByteViewType + ?Sized> ByteViewScalarImpl<T> {
             // All values are null
             return (
                 vec![0; number_of_values].into(),
-                vec![],
+                empty_arc_buffers(),
                 Some(NullBuffer::new_null(number_of_values)),
             );
         }
@@ -724,10 +729,10 @@ impl<T: ByteViewType + ?Sized> ByteViewScalarImpl<T> {
         predicate: BooleanBuffer,
         result_len: usize,
         truthy_view: u128,
-        truthy_buffers: Vec<Buffer>,
+        truthy_buffers: Arc<[Buffer]>,
         falsy_view: u128,
-        falsy_buffers: Vec<Buffer>,
-    ) -> (ScalarBuffer<u128>, Vec<Buffer>, Option<NullBuffer>) {
+        falsy_buffers: Arc<[Buffer]>,
+    ) -> (ScalarBuffer<u128>, Arc<[Buffer]>, Option<NullBuffer>) {
         let true_count = predicate.count_set_bits();
         match true_count {
             0 => {
@@ -751,7 +756,7 @@ impl<T: ByteViewType + ?Sized> ByteViewScalarImpl<T> {
                 let byte_view_falsy = ByteView::from(falsy_view);
                 let new_index_falsy_buffers =
                     buffers.len() as u32 + byte_view_falsy.buffer_index;
-                buffers.extend(falsy_buffers);
+                buffers.extend(falsy_buffers.iter().cloned());
                 let byte_view_falsy =
                     byte_view_falsy.with_buffer_index(new_index_falsy_buffers);
                 byte_view_falsy.as_u128()
@@ -778,7 +783,7 @@ impl<T: ByteViewType + ?Sized> ByteViewScalarImpl<T> {
                 }
 
                 let bytes = Buffer::from(mutable);
-                (bytes.into(), buffers, None)
+                (bytes.into(), buffers.into(), None)
             }
         }
     }
@@ -804,28 +809,28 @@ impl<T: ByteViewType + ?Sized> ZipImpl for ByteViewScalarImpl<T> {
                 predicate,
                 result_len,
                 truthy,
-                self.truthy_buffers.clone(),
+                Arc::clone(&self.truthy_buffers),
                 falsy,
-                self.falsy_buffers.clone(),
+                Arc::clone(&self.falsy_buffers),
             ),
             (Some(truthy), None) => Self::get_views_for_single_non_nullable(
                 predicate,
                 truthy,
-                self.truthy_buffers.clone(),
+                Arc::clone(&self.truthy_buffers),
             ),
             (None, Some(falsy)) => {
                 let predicate = predicate.not();
                 Self::get_views_for_single_non_nullable(
                     predicate,
                     falsy,
-                    self.falsy_buffers.clone(),
+                    Arc::clone(&self.falsy_buffers),
                 )
             }
             (None, None) => {
                 // All values are null
                 (
                     vec![0; result_len].into(),
-                    vec![],
+                    empty_arc_buffers(),
                     Some(NullBuffer::new_null(result_len)),
                 )
             }
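A minimal sketch (not part of this diff) of how the widened constructor bound could be exercised; it relies only on the existing `StringViewArray` alias and `from_iter_values` helper, and the variable names are illustrative:

```rust
use std::sync::Arc;

use arrow_array::{Array, StringViewArray};

fn main() {
    // Build a small StringViewArray, then split it into its parts. With this
    // change, `into_parts` hands the data buffers back as an `Arc<[Buffer]>`.
    let array =
        StringViewArray::from_iter_values(["short", "a string long enough to need a buffer"]);
    let (views, buffers, nulls) = array.into_parts();

    // `new` is now generic over `U: Into<Arc<[Buffer]>>`, so it accepts an
    // existing `Arc<[Buffer]>` directly (a cheap pointer clone, no copy of
    // the underlying buffers)...
    let shared = StringViewArray::new(views.clone(), Arc::clone(&buffers), nulls.clone());

    // ...as well as the `Vec<Buffer>` callers passed before this change,
    // via the standard `From<Vec<T>> for Arc<[T]>` impl.
    let from_vec = StringViewArray::new(views, buffers.to_vec(), nulls);

    assert_eq!(shared.len(), 2);
    assert_eq!(from_vec.len(), 2);
}
```

The `Arc<[Buffer]>` path is the interesting one: arrays built from the same parts now share a single buffer allocation instead of each holding its own `Vec<Buffer>`.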