Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GC StringViewArray in CoalesceBatchesStream #11587

Merged
merged 10 commits into from
Jul 25, 2024
171 changes: 170 additions & 1 deletion datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ use crate::{
DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
};

use arrow::array::{AsArray, StringViewBuilder};
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use arrow_array::{Array, ArrayRef};
use datafusion_common::Result;
use datafusion_execution::TaskContext;

Expand Down Expand Up @@ -216,6 +218,8 @@ impl CoalesceBatchesStream {
match input_batch {
Poll::Ready(x) => match x {
Some(Ok(batch)) => {
let batch = gc_string_view_batch(&batch);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here inside gc string buffer will be copied once, (below) in concat_batches() string buffer will be copied again, it seems possible to copy only once by changing the internal implementation of concat_batches()
But I think we can do it later when there is a good benchmark to assess the impact, at least excessive copy in coalesce_batches() does not have a huge performance impact on TPCH now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an excellent point

I think given how concat is implemented for StringView it will only copy the fixed parts (not the actual string data)

Perhaps what we could do is implement a wrapper around arrow::concat_batches that has the datafusion specific GC trigger for sparse arrays, and falls back to concat for other types: https://docs.rs/arrow-select/52.1.0/src/arrow_select/concat.rs.html#150

/// wrapper around [`arrow::compute::concat`] that 
pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
 // loop over columns here and handle StringView specially, 
 // or fallback to concat
 }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I filed #11628 to track this idea


if batch.num_rows() >= self.target_batch_size
&& self.buffer.is_empty()
{
Expand Down Expand Up @@ -290,13 +294,83 @@ pub fn concat_batches(
arrow::compute::concat_batches(schema, batches)
}

/// Heuristically compact [`StringViewArray`]s to reduce memory usage, if needed
///
/// This function decides when to consolidate the StringView into a new buffer
/// to reduce memory usage and improve string locality for better performance.
///
/// This differs from [`StringViewArray::gc`] because:
/// 1. It may not compact the array depending on a heuristic.
/// 2. It uses a larger default block size (2MB) to reduce the number of buffers to track.
///
/// # Heuristic
///
/// If the average size of each view is larger than 32 bytes, we compact the array.
///
/// `StringViewArray` include pointers to buffer that hold the underlying data.
/// One of the great benefits of `StringViewArray` is that many operations
/// (e.g., `filter`) can be done without copying the underlying data.
///
/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the
/// `StringViewArray` may only refer to a small portion of the buffer,
/// significantly increasing memory usage.
fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
let new_columns: Vec<ArrayRef> = batch
.columns()
.iter()
.map(|c| {
// Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long.
let Some(s) = c.as_string_view_opt() else {
return Arc::clone(c);
};
let ideal_buffer_size: usize = s
.views()
.iter()
.map(|v| {
let len = (*v as u32) as usize;
if len > 12 {
len
} else {
0
}
})
.sum();
let actual_buffer_size = s.get_buffer_memory_size();

// Re-creating the array copies data and can be time consuming.
// We only do it if the array is sparse
if actual_buffer_size > (ideal_buffer_size * 2) {
// We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerate later concat_batches.
// See https://github.com/apache/arrow-rs/issues/6094 for more details.
let mut builder = StringViewBuilder::with_capacity(s.len())
.with_block_size(ideal_buffer_size as u32);

for v in s.iter() {
builder.append_option(v);
}

let gc_string = builder.finish();

debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testing for the win!


Arc::new(gc_string)
} else {
Arc::clone(c)
}
})
.collect();
RecordBatch::try_new(batch.schema(), new_columns)
.expect("Failed to re-create the gc'ed record batch")
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{memory::MemoryExec, repartition::RepartitionExec, Partitioning};

use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::UInt32Array;
use arrow_array::builder::ArrayBuilder;
use arrow_array::{StringViewArray, UInt32Array};

#[tokio::test(flavor = "multi_thread")]
async fn test_concat_batches() -> Result<()> {
Expand Down Expand Up @@ -369,4 +443,99 @@ mod tests {
)
.unwrap()
}

#[test]
fn test_gc_string_view_batch_small_no_compact() {
// view with only short strings (no buffers) --> no need to compact
let array = StringViewTest {
rows: 1000,
strings: vec![Some("a"), Some("b"), Some("c")],
}
.build();

let gc_array = do_gc(array.clone());
compare_string_array_values(&array, &gc_array);
assert_eq!(array.data_buffers().len(), 0);
assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
}

#[test]
fn test_gc_string_view_batch_large_no_compact() {
// view with large strings (has buffers) but full --> no need to compact
let array = StringViewTest {
rows: 1000,
strings: vec![Some("This string is longer than 12 bytes")],
}
.build();

let gc_array = do_gc(array.clone());
compare_string_array_values(&array, &gc_array);
assert_eq!(array.data_buffers().len(), 5);
assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
}

#[test]
fn test_gc_string_view_batch_large_slice_compact() {
// view with large strings (has buffers) and only partially used --> no need to compact
let array = StringViewTest {
rows: 1000,
strings: vec![Some("this string is longer than 12 bytes")],
}
.build();

// slice only 11 rows, so most of the buffer is not used
let array = array.slice(11, 22);

let gc_array = do_gc(array.clone());
compare_string_array_values(&array, &gc_array);
assert_eq!(array.data_buffers().len(), 5);
assert_eq!(gc_array.data_buffers().len(), 1); // compacted into a single buffer
}

/// Compares the values of two string view arrays
fn compare_string_array_values(arr1: &StringViewArray, arr2: &StringViewArray) {
assert_eq!(arr1.len(), arr2.len());
for (s1, s2) in arr1.iter().zip(arr2.iter()) {
assert_eq!(s1, s2);
}
}

/// runs garbage collection on string view array
/// and ensures the number of rows are the same
fn do_gc(array: StringViewArray) -> StringViewArray {
let batch =
RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap();
let gc_batch = gc_string_view_batch(&batch);
assert_eq!(batch.num_rows(), gc_batch.num_rows());
assert_eq!(batch.schema(), gc_batch.schema());
gc_batch
.column(0)
.as_any()
.downcast_ref::<StringViewArray>()
.unwrap()
.clone()
}

/// Describes parameters for creating a `StringViewArray`
struct StringViewTest {
/// The number of rows in the array
rows: usize,
/// The strings to use in the array (repeated over and over
strings: Vec<Option<&'static str>>,
}

impl StringViewTest {
/// Create a `StringViewArray` with the parameters specified in this struct
fn build(self) -> StringViewArray {
let mut builder = StringViewBuilder::with_capacity(100);
loop {
for &v in self.strings.iter() {
builder.append_option(v);
if builder.len() >= self.rows {
return builder.finish();
}
}
}
}
}
}
Loading