Add comments and tests for gc_string_view_batch #1

Merged
122 changes: 118 additions & 4 deletions datafusion/physical-plan/src/coalesce_batches.rs
@@ -294,9 +294,26 @@ pub fn concat_batches(
arrow::compute::concat_batches(schema, batches)
}

/// `StringViewArray` references the raw parquet decoded buffer, which reduces copies but prevents those buffers from being released.
/// When `StringViewArray`'s cardinality significantly drops (e.g., after `FilterExec` or `HashJoinExec` or many others),
/// we should consider consolidating it so that we can release the buffer to reduce memory usage and improve string locality for better performance.
/// Heuristically compact [`StringViewArray`]s to reduce memory usage, if needed
///
/// This function decides when to consolidate the StringView into a new buffer
/// to reduce memory usage and improve string locality for better performance.
///
/// This differs from [`StringViewArray::gc`] because:
/// 1. It may not compact the array depending on a heuristic.
/// 2. It uses a larger default block size (2MB) to reduce the number of buffers to track.
///
/// # Heuristic
///
/// If the average size of each view is larger than 32 bytes, we compact the array.
///
/// `StringViewArray`s include pointers to buffers that hold the underlying data.
/// One of the great benefits of `StringViewArray` is that many operations
/// (e.g., `filter`) can be done without copying the underlying data.
///
/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the
/// `StringViewArray` may only refer to a small portion of the buffer,
/// significantly increasing memory usage.
fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
let new_columns: Vec<ArrayRef> = batch
.columns()
@@ -339,7 +356,8 @@ mod tests {
use crate::{memory::MemoryExec, repartition::RepartitionExec, Partitioning};

use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::UInt32Array;
use arrow_array::builder::ArrayBuilder;
use arrow_array::{StringViewArray, UInt32Array};

#[tokio::test(flavor = "multi_thread")]
async fn test_concat_batches() -> Result<()> {
@@ -412,4 +430,100 @@ mod tests {
)
.unwrap()
}

#[test]
fn test_gc_string_view_batch_small_no_compact() {
// view with only short strings (no buffers) --> no need to compact
let array = StringViewTest {
rows: 1000,
strings: vec![Some("a"), Some("b"), Some("c")],
}
.build();

let gc_array = do_gc(array.clone());
compare_string_array_values(&array, &gc_array);
assert_eq!(array.data_buffers().len(), 0);
assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
}

#[test]
fn test_gc_string_view_batch_large_no_compact() {
// view with large strings (has buffers) but full --> no need to compact
let array = StringViewTest {
rows: 1000,
strings: vec![Some("This string is longer than 12 bytes")],
}
.build();

let gc_array = do_gc(array.clone());
compare_string_array_values(&array, &gc_array);
assert_eq!(array.data_buffers().len(), 5);
// TODO this is failing now (it always compacts)
Author

This test is failing now because the StringView is compacted even though all of the data in the buffers is still referenced.

What I think this shows is that string views with many long strings but no actual garbage are always compacted, which doesn't seem like a good heuristic.
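
To make the concern concrete, here is a minimal sketch of the two decision rules side by side. It is not the code in this PR: `avg_size_heuristic` is only my reading of the "32 bytes per view" rule from the doc comment, and `garbage_ratio_heuristic` (with its 50% threshold) is a hypothetical alternative invented for illustration. Both work on plain numbers rather than a real `StringViewArray`.

```rust
/// Assumed model of the documented rule: compact when the buffers hold
/// more than 32 bytes per view on average.
fn avg_size_heuristic(view_count: usize, buffer_bytes: usize) -> bool {
    view_count > 0 && buffer_bytes > view_count * 32
}

/// Hypothetical alternative: compact only when less than half of the
/// buffer bytes are referenced by the views.
///
/// `view_lens` holds the string length of each view. Strings of 12 bytes
/// or fewer are stored inline in the view and use no buffer space.
/// This simple sum over-counts if several views point at the same bytes.
fn garbage_ratio_heuristic(view_lens: &[usize], buffer_bytes: usize) -> bool {
    let referenced: usize = view_lens.iter().copied().filter(|&len| len > 12).sum();
    buffer_bytes > 0 && referenced * 2 < buffer_bytes
}
```

With 1000 views of 35 bytes each over 35,000 buffer bytes (the "large, full" test case), the first rule asks for compaction and the second does not. With only 22 such views over the same buffers (the sliced case), both ask for compaction.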

Owner

I agree the heuristic is a bit arbitrary and not really backed by real experiments... I'll think a bit more about this and get back to it later today.

Author

FWIW I am going to prototype what the combined filter/coalesce thing we were talking about on apache#11628 might look like.

Author

Prototype is here: apache#11647 (not yet ready for review)

assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
}

#[test]
fn test_gc_string_view_batch_large_slice_compact() {
// view with large strings (has buffers) and only partially used --> should compact
let array = StringViewTest {
rows: 1000,
strings: vec![Some("this string is longer than 12 bytes")],
}
.build();

// slice only 22 rows (starting at offset 11), so most of the buffer is not used
let array = array.slice(11, 22);

let gc_array = do_gc(array.clone());
compare_string_array_values(&array, &gc_array);
assert_eq!(array.data_buffers().len(), 5);
assert_eq!(gc_array.data_buffers().len(), 1); // compacted into a single buffer
}

/// Compares the values of two string view arrays
fn compare_string_array_values(arr1: &StringViewArray, arr2: &StringViewArray) {
assert_eq!(arr1.len(), arr2.len());
for (s1, s2) in arr1.iter().zip(arr2.iter()) {
assert_eq!(s1, s2);
}
}

/// runs garbage collection on string view array
/// and ensures the number of rows is the same
fn do_gc(array: StringViewArray) -> StringViewArray {
let batch =
RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap();
let gc_batch = gc_string_view_batch(&batch);
assert_eq!(batch.num_rows(), gc_batch.num_rows());
assert_eq!(batch.schema(), gc_batch.schema());
gc_batch
.column(0)
.as_any()
.downcast_ref::<StringViewArray>()
.unwrap()
.clone()
}

/// Describes parameters for creating a `StringViewArray`
struct StringViewTest {
/// The number of rows in the array
rows: usize,
/// The strings to use in the array (repeated over and over)
strings: Vec<Option<&'static str>>,
}

impl StringViewTest {
/// Create a `StringViewArray` with the parameters specified in this struct
fn build(self) -> StringViewArray {
let mut builder = StringViewBuilder::with_capacity(100);
loop {
for &v in self.strings.iter() {
builder.append_option(v);
if builder.len() >= self.rows {
return builder.finish();
}
}
}
}
}
}