diff --git a/arrow-array/src/builder/generic_bytes_view_builder.rs b/arrow-array/src/builder/generic_bytes_view_builder.rs index 7268e751b149..ae7355433f81 100644 --- a/arrow-array/src/builder/generic_bytes_view_builder.rs +++ b/arrow-array/src/builder/generic_bytes_view_builder.rs @@ -19,7 +19,7 @@ use std::any::Any; use std::marker::PhantomData; use std::sync::Arc; -use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer}; +use arrow_buffer::{Buffer, NullBufferBuilder, ScalarBuffer}; use arrow_data::ByteView; use arrow_schema::ArrowError; use hashbrown::hash_table::Entry; @@ -28,7 +28,7 @@ use hashbrown::HashTable; use crate::builder::ArrayBuilder; use crate::types::bytes::ByteArrayNativeType; use crate::types::{BinaryViewType, ByteViewType, StringViewType}; -use crate::{ArrayRef, GenericByteViewArray}; +use crate::{Array, ArrayRef, GenericByteViewArray}; const STARTING_BLOCK_SIZE: u32 = 8 * 1024; // 8KiB const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024; // 2MiB @@ -79,7 +79,7 @@ impl BlockSizeGrowthStrategy { /// using [`GenericByteViewBuilder::append_block`] and then views into this block appended /// using [`GenericByteViewBuilder::try_append_view`] pub struct GenericByteViewBuilder { - views_builder: BufferBuilder, + views_buffer: Vec, null_buffer_builder: NullBufferBuilder, completed: Vec, in_progress: Vec, @@ -99,7 +99,7 @@ impl GenericByteViewBuilder { /// Creates a new [`GenericByteViewBuilder`] with space for `capacity` string values. pub fn with_capacity(capacity: usize) -> Self { Self { - views_builder: BufferBuilder::new(capacity), + views_buffer: Vec::with_capacity(capacity), null_buffer_builder: NullBufferBuilder::new(capacity), completed: vec![], in_progress: vec![], @@ -148,7 +148,7 @@ impl GenericByteViewBuilder { pub fn with_deduplicate_strings(self) -> Self { Self { string_tracker: Some(( - HashTable::with_capacity(self.views_builder.capacity()), + HashTable::with_capacity(self.views_buffer.capacity()), Default::default(), )), ..self @@ -201,10 +201,42 @@ impl GenericByteViewBuilder { let b = b.get_unchecked(start..end); let view = make_view(b, block, offset); - self.views_builder.append(view); + self.views_buffer.push(view); self.null_buffer_builder.append_non_null(); } + /// Appends an array to the builder. + /// This will flush any in-progress block and append the data buffers + /// and add the (adapted) views. + pub fn append_array(&mut self, array: &GenericByteViewArray) { + self.flush_in_progress(); + // keep original views if this array is the first to be added or if there are no data buffers (all inline views) + let keep_views = self.completed.is_empty() || array.data_buffers().is_empty(); + let starting_buffer = self.completed.len() as u32; + + self.completed.extend(array.data_buffers().iter().cloned()); + + if keep_views { + self.views_buffer.extend_from_slice(array.views()); + } else { + self.views_buffer.extend(array.views().iter().map(|v| { + let mut byte_view = ByteView::from(*v); + if byte_view.length > 12 { + // Small views (<=12 bytes) are inlined, so only need to update large views + byte_view.buffer_index += starting_buffer; + }; + + byte_view.as_u128() + })); + } + + if let Some(null_buffer) = array.nulls() { + self.null_buffer_builder.append_buffer(null_buffer); + } else { + self.null_buffer_builder.append_n_non_nulls(array.len()); + } + } + /// Try to append a view of the given `block`, `offset` and `length` /// /// See [`Self::append_block`] @@ -255,7 +287,7 @@ impl GenericByteViewBuilder { /// Useful if we want to know what value has been inserted to the builder /// The index has to be smaller than `self.len()`, otherwise it will panic pub fn get_value(&self, index: usize) -> &[u8] { - let view = self.views_builder.as_slice().get(index).unwrap(); + let view = self.views_buffer.as_slice().get(index).unwrap(); let len = *view as u32; if len <= 12 { // # Safety @@ -287,7 +319,7 @@ impl GenericByteViewBuilder { let mut view_buffer = [0; 16]; view_buffer[0..4].copy_from_slice(&length.to_le_bytes()); view_buffer[4..4 + v.len()].copy_from_slice(v); - self.views_builder.append(u128::from_le_bytes(view_buffer)); + self.views_buffer.push(u128::from_le_bytes(view_buffer)); self.null_buffer_builder.append_non_null(); return; } @@ -311,8 +343,7 @@ impl GenericByteViewBuilder { Entry::Occupied(occupied) => { // If the string already exists, we will directly use the view let idx = occupied.get(); - self.views_builder - .append(self.views_builder.as_slice()[*idx]); + self.views_buffer.push(self.views_buffer[*idx]); self.null_buffer_builder.append_non_null(); self.string_tracker = Some((ht, hasher)); return; @@ -320,7 +351,7 @@ impl GenericByteViewBuilder { Entry::Vacant(vacant) => { // o.w. we insert the (string hash -> view index) // the idx is current length of views_builder, as we are inserting a new view - vacant.insert(self.views_builder.len()); + vacant.insert(self.views_buffer.len()); } } self.string_tracker = Some((ht, hasher)); @@ -341,7 +372,7 @@ impl GenericByteViewBuilder { buffer_index: self.completed.len() as u32, offset, }; - self.views_builder.append(view.into()); + self.views_buffer.push(view.into()); self.null_buffer_builder.append_non_null(); } @@ -358,21 +389,20 @@ impl GenericByteViewBuilder { #[inline] pub fn append_null(&mut self) { self.null_buffer_builder.append_null(); - self.views_builder.append(0); + self.views_buffer.push(0); } /// Builds the [`GenericByteViewArray`] and reset this builder pub fn finish(&mut self) -> GenericByteViewArray { self.flush_in_progress(); let completed = std::mem::take(&mut self.completed); - let len = self.views_builder.len(); - let views = ScalarBuffer::new(self.views_builder.finish(), 0, len); let nulls = self.null_buffer_builder.finish(); if let Some((ref mut ht, _)) = self.string_tracker.as_mut() { ht.clear(); } + let views = std::mem::take(&mut self.views_buffer); // SAFETY: valid by construction - unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) } + unsafe { GenericByteViewArray::new_unchecked(views.into(), completed, nulls) } } /// Builds the [`GenericByteViewArray`] without resetting the builder @@ -381,8 +411,8 @@ impl GenericByteViewBuilder { if !self.in_progress.is_empty() { completed.push(Buffer::from_slice_ref(&self.in_progress)); } - let len = self.views_builder.len(); - let views = Buffer::from_slice_ref(self.views_builder.as_slice()); + let len = self.views_buffer.len(); + let views = Buffer::from_slice_ref(self.views_buffer.as_slice()); let views = ScalarBuffer::new(views, 0, len); let nulls = self.null_buffer_builder.finish_cloned(); // SAFETY: valid by construction @@ -396,7 +426,7 @@ impl GenericByteViewBuilder { /// Return the allocated size of this builder in bytes, useful for memory accounting. pub fn allocated_size(&self) -> usize { - let views = self.views_builder.capacity() * std::mem::size_of::(); + let views = self.views_buffer.capacity() * std::mem::size_of::(); let null = self.null_buffer_builder.allocated_size(); let buffer_size = self.completed.iter().map(|b| b.capacity()).sum::(); let in_progress = self.in_progress.capacity(); @@ -418,7 +448,7 @@ impl std::fmt::Debug for GenericByteViewBuilder { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}ViewBuilder", T::PREFIX)?; f.debug_struct("") - .field("views_builder", &self.views_builder) + .field("views_buffer", &self.views_buffer) .field("in_progress", &self.in_progress) .field("completed", &self.completed) .field("null_buffer_builder", &self.null_buffer_builder) diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs index 5a5e7e141c84..41fd0ff16859 100644 --- a/arrow-select/src/coalesce.rs +++ b/arrow-select/src/coalesce.rs @@ -21,9 +21,9 @@ //! [`filter`]: crate::filter::filter //! [`take`]: crate::take::take use crate::concat::concat_batches; -use arrow_array::{ - builder::StringViewBuilder, cast::AsArray, Array, ArrayRef, RecordBatch, RecordBatchOptions, -}; +use arrow_array::StringViewArray; +use arrow_array::{cast::AsArray, Array, ArrayRef, RecordBatch}; +use arrow_data::ByteView; use arrow_schema::{ArrowError, SchemaRef}; use std::collections::VecDeque; use std::sync::Arc; @@ -164,7 +164,7 @@ impl BatchCoalescer { return Ok(()); } - let mut batch = gc_string_view_batch(&batch); + let mut batch = gc_string_view_batch(batch); // If pushing this batch would exceed the target batch size, // finish the current batch and start a new one @@ -242,15 +242,19 @@ impl BatchCoalescer { /// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the /// `StringViewArray` may only refer to a small portion of the buffer, /// significantly increasing memory usage. -fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch { - let new_columns: Vec = batch - .columns() - .iter() +fn gc_string_view_batch(batch: RecordBatch) -> RecordBatch { + let (schema, columns, num_rows) = batch.into_parts(); + let new_columns: Vec = columns + .into_iter() .map(|c| { // Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long. let Some(s) = c.as_string_view_opt() else { - return Arc::clone(c); + return c; }; + if s.data_buffers().is_empty() { + // If there are no data buffers, we can just return the array as is + return c; + } let ideal_buffer_size: usize = s .views() .iter() @@ -264,42 +268,73 @@ fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch { }) .sum(); let actual_buffer_size = s.get_buffer_memory_size(); + let buffers = s.data_buffers(); // Re-creating the array copies data and can be time consuming. // We only do it if the array is sparse if actual_buffer_size > (ideal_buffer_size * 2) { + if ideal_buffer_size == 0 { + // If the ideal buffer size is 0, all views are inlined + // so just reuse the views + return Arc::new(unsafe { + StringViewArray::new_unchecked( + s.views().clone(), + vec![], + s.nulls().cloned(), + ) + }); + } // We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerate later concat_batches. // See https://github.com/apache/arrow-rs/issues/6094 for more details. - let mut builder = StringViewBuilder::with_capacity(s.len()); - if ideal_buffer_size > 0 { - builder = builder.with_fixed_block_size(ideal_buffer_size as u32); - } - - for v in s.iter() { - builder.append_option(v); - } - - let gc_string = builder.finish(); - - debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0 + let mut buffer: Vec = Vec::with_capacity(ideal_buffer_size); + + let views: Vec = s + .views() + .as_ref() + .iter() + .cloned() + .map(|v| { + let mut b: ByteView = ByteView::from(v); + + if b.length > 12 { + let offset = buffer.len() as u32; + buffer.extend_from_slice( + buffers[b.buffer_index as usize] + .get(b.offset as usize..b.offset as usize + b.length as usize) + .expect("Invalid buffer slice"), + ); + b.offset = offset; + b.buffer_index = 0; // Set buffer index to 0, as we only have one buffer + } + + b.into() + }) + .collect(); + + let buffers = if buffer.is_empty() { + vec![] + } else { + vec![buffer.into()] + }; + + let gc_string = unsafe { + StringViewArray::new_unchecked(views.into(), buffers, s.nulls().cloned()) + }; Arc::new(gc_string) } else { - Arc::clone(c) + c } }) .collect(); - let mut options = RecordBatchOptions::new(); - options = options.with_row_count(Some(batch.num_rows())); - RecordBatch::try_new_with_options(batch.schema(), new_columns, &options) - .expect("Failed to re-create the gc'ed record batch") + unsafe { RecordBatch::new_unchecked(schema, new_columns, num_rows) } } #[cfg(test)] mod tests { use super::*; - use arrow_array::builder::ArrayBuilder; - use arrow_array::{StringViewArray, UInt32Array}; + use arrow_array::builder::{ArrayBuilder, StringViewBuilder}; + use arrow_array::{RecordBatchOptions, StringViewArray, UInt32Array}; use arrow_schema::{DataType, Field, Schema}; use std::ops::Range; @@ -518,9 +553,11 @@ mod tests { fn test_gc_string_view_test_batch_empty() { let schema = Schema::empty(); let batch = RecordBatch::new_empty(schema.into()); - let output_batch = gc_string_view_batch(&batch); - assert_eq!(batch.num_columns(), output_batch.num_columns()); - assert_eq!(batch.num_rows(), output_batch.num_rows()); + let cols = batch.num_columns(); + let num_rows = batch.num_rows(); + let output_batch = gc_string_view_batch(batch); + assert_eq!(cols, output_batch.num_columns()); + assert_eq!(num_rows, output_batch.num_rows()); } #[test] @@ -568,9 +605,11 @@ mod tests { /// and ensures the number of rows are the same fn do_gc(array: StringViewArray) -> StringViewArray { let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap(); - let gc_batch = gc_string_view_batch(&batch); - assert_eq!(batch.num_rows(), gc_batch.num_rows()); - assert_eq!(batch.schema(), gc_batch.schema()); + let rows = batch.num_rows(); + let schema = batch.schema(); + let gc_batch = gc_string_view_batch(batch); + assert_eq!(rows, gc_batch.num_rows()); + assert_eq!(schema, gc_batch.schema()); gc_batch .column(0) .as_any() diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index 1c99caef5ad9..69451be7035d 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -31,7 +31,9 @@ //! ``` use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values}; -use arrow_array::builder::{BooleanBuilder, GenericByteBuilder, PrimitiveBuilder}; +use arrow_array::builder::{ + BooleanBuilder, GenericByteBuilder, GenericByteViewBuilder, PrimitiveBuilder, +}; use arrow_array::cast::AsArray; use arrow_array::types::*; use arrow_array::*; @@ -84,6 +86,15 @@ fn fixed_size_list_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capa } } +fn concat_byte_view(arrays: &[&dyn Array]) -> Result { + let mut builder = + GenericByteViewBuilder::::with_capacity(arrays.iter().map(|a| a.len()).sum()); + for &array in arrays.iter() { + builder.append_array(array.as_byte_view()); + } + Ok(Arc::new(builder.finish())) +} + fn concat_dictionaries( arrays: &[&dyn Array], ) -> Result { @@ -425,6 +436,8 @@ pub fn concat(arrays: &[&dyn Array]) -> Result { _ => unreachable!("Unsupported run end index type: {r:?}"), } } + DataType::Utf8View => concat_byte_view::(arrays), + DataType::BinaryView => concat_byte_view::(arrays), _ => { let capacity = get_capacity(arrays, d); concat_fallback(arrays, capacity) @@ -635,6 +648,30 @@ mod tests { assert_eq!(&arr, &expected_output); } + #[test] + fn test_concat_string_view_arrays() { + let arr = concat(&[ + &StringViewArray::from(vec!["helloxxxxxxxxxxa", "world____________"]), + &StringViewArray::from(vec!["helloxxxxxxxxxxy", "3", "4"]), + &StringViewArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]), + ]) + .unwrap(); + + let expected_output = Arc::new(StringViewArray::from(vec![ + Some("helloxxxxxxxxxxa"), + Some("world____________"), + Some("helloxxxxxxxxxxy"), + Some("3"), + Some("4"), + Some("foo"), + Some("bar"), + None, + Some("baz"), + ])) as ArrayRef; + + assert_eq!(&arr, &expected_output); + } + #[test] fn test_concat_primitive_arrays() { let arr = concat(&[