
GC StringViewArray in CoalesceBatchesStream #11587

Merged (10 commits) on Jul 25, 2024

Conversation

XiangpengHao
Contributor

@XiangpengHao XiangpengHao commented Jul 21, 2024

(targets string-view2 branch)

Which issue does this PR close?

Part of #10918

Rationale for this change

StringViewArray references the raw Parquet-decoded buffer, which reduces copying but prevents those buffers from being released.
When a StringViewArray's cardinality drops significantly (e.g., after FilterExec or HashJoinExec or many others), we should consider consolidating it so that we can release the buffers, reducing memory usage and improving string locality for better performance.

The current heuristic is to GC StringViewArray in CoalesceBatchesStream and use a larger block size to reduce the buffer count (see apache/arrow-rs#6094)
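As a rough sketch of the sparseness test (hypothetical helper name; the 32-bytes-per-view constant comes from the diff quoted below in this conversation), the idea is to compare the size of the backing buffers against the number of views, and only pay the copy when the buffers hold far more bytes than the views plausibly reference:

```rust
/// Hypothetical sketch of the sparseness heuristic discussed in this PR:
/// treat the array as "sparse" when its backing buffers hold far more
/// bytes than the views plausibly reference (32 bytes per view here,
/// matching the constant in the quoted diff).
fn is_sparse(buffer_size: usize, view_cnt: usize) -> bool {
    // A dense array keeps roughly `view_cnt * avg_len` useful bytes; if the
    // buffers hold more than 32 bytes per view, assume most of the buffer is
    // dead weight left over from filtering and GC it.
    buffer_size > view_cnt * 32
}

fn main() {
    // 8192 views over a 1 MiB buffer: ~128 buffer bytes per view -> sparse, GC.
    assert!(is_sparse(1 << 20, 8192));
    // 8192 views over a 64 KiB buffer: 8 buffer bytes per view -> dense, skip GC.
    assert!(!is_sparse(1 << 16, 8192));
    println!("heuristic sketch ok");
}
```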

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@@ -216,6 +218,41 @@ impl CoalesceBatchesStream {
match input_batch {
Poll::Ready(x) => match x {
Some(Ok(batch)) => {
let new_columns: Vec<Arc<dyn Array>> = batch
Contributor

Nit: how about using ArrayRef instead of Arc<dyn Array> to ease code readability?

Contributor Author

done, thank you @dharanad !

Contributor

@2010YOUY01 2010YOUY01 left a comment

Awesome, can't wait to see the performance numbers.

I commented on something that is not obvious to me. Maybe we can add more comments to explain it.


// Re-creating the array copies data and can be time consuming.
// We only do it if the array is sparse, below is a heuristic to determine if the array is sparse.
if buffer_size > (view_cnt * 32) {
Contributor

If there are lots of long strings in the buffer, will it be triggered every time even if nothing has been filtered?

Contributor Author

Yes, this is a rough heuristic to test whether the buffer is dense.
In fact, there's no way for us to know in CoalesceBatchesExec whether anything has been filtered -- we only know this in downstream operators like FilterExec. The fact that we have a CoalesceBatchesExec here means that the batch is likely sparse and thus requires GC.

if buffer_size > (view_cnt * 32) {
// We use a block size of 2MB (instead of 8KB) to reduce the number of buffers to track.
// See https://github.com/apache/arrow-rs/issues/6094 for more details.
let mut builder =
Contributor

Here the builder doesn't use the deduplication mechanism; is it the case that we can assume the StringViewArray is already deduplicated before coalescing?

Contributor Author

Deduplication hashes the string values, which has quite high overhead. Here we are processing small batches (default size 8192) and then concatenating them into a larger batch; deduplicating small batches gives us little benefit.

Contributor

@ozankabak ozankabak left a comment

This logic (batch in, batch out) should be a separate helper function (maybe living somewhere else, as it could be useful in other contexts too). This way, the main logic of the CoalesceBatchesStream would only change a single line and would not get cluttered.

@XiangpengHao
Contributor Author

This logic (batch in, batch out) should be a separate helper function (maybe living somewhere else, as it could be useful in other contexts too).

Agree! I moved the logic to a separate function. I tentatively put it right after the CoalesceBatchesStream implementation, please let me know if there's a better way to organize it!

@@ -216,6 +218,8 @@ impl CoalesceBatchesStream {
match input_batch {
Poll::Ready(x) => match x {
Some(Ok(batch)) => {
let batch = gc_string_view_batch(&batch);
Contributor

So here inside gc the string buffer will be copied once, and (below) in concat_batches() the string buffer will be copied again; it seems possible to copy only once by changing the internal implementation of concat_batches().
But I think we can do that later when there is a good benchmark to assess the impact; at least the excessive copying in coalesce_batches() does not have a huge performance impact on TPC-H now.

Contributor

This is an excellent point

I think given how concat is implemented for StringView it will only copy the fixed parts (not the actual string data)

Perhaps what we could do is implement a wrapper around arrow::concat_batches that has the datafusion specific GC trigger for sparse arrays, and falls back to concat for other types: https://docs.rs/arrow-select/52.1.0/src/arrow_select/concat.rs.html#150

/// Wrapper around [`arrow::compute::concat`] that handles StringView
/// specially and falls back to `concat` for other types.
pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
    // loop over the arrays here and handle StringView specially,
    // or fall back to arrow::compute::concat
    todo!()
}

Contributor

I filed #11628 to track this idea

Contributor

@alamb alamb left a comment

Thanks @XiangpengHao and @2010YOUY01

I think @2010YOUY01's comments are very good and we should address them, but this PR is also a step forward, so it would be ok to merge it and make the additional improvements suggested by @2010YOUY01 as a follow-on

I also have some ideas of how to test this code more easily. I will work on a PR


Comment on lines 305 to 306
// Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long.
if let Some(s) = c.as_string_view_opt() {
Contributor

A minor comment here is that I think you can reduce the level of nesting by using syntax like

let Some(s) = c.as_string_view_opt() else {
    return Arc::clone(c);
};

@alamb
Contributor

alamb commented Jul 23, 2024

This logic (batch in, batch out) should be a separate helper function (maybe living somewhere else, as it could be useful in other contexts too). This way, the main logic of the CoalesceBatchesStream would only change a single line and would not get cluttered.

I agree with this sentiment. I am working on this approach in #11610. I hope to have it ready for review shortly

@alamb
Contributor

alamb commented Jul 23, 2024

I think what we should do here is:

  1. File a follow-on ticket based on @2010YOUY01's comment on #11587
  2. Merge this PR (it targets the string-view2 branch, not main)

Per @ozankabak's comment on #11610, there appears to be another PR (synnada-ai#27) that will affect CoalesceBatchesExec, and thus we may have a merge conflict to resolve when bringing string-view2 back into main.

However, I think that is manageable and we should be ok

@alamb
Contributor

alamb commented Jul 24, 2024

I have filed #11628 to track further performance improvements

Another thing I think would be good is to write some tests for this function (especially as we know it is about to undergo some non-trivial refactoring)

@alamb
Contributor

alamb commented Jul 24, 2024

I am hoping to write those tests shortly

@alamb
Contributor

alamb commented Jul 24, 2024

I am hoping to write those tests shortly

@XiangpengHao -- I wrote tests in XiangpengHao#1 -- something doesn't seem quite right with the heuristic.

@alamb
Contributor

alamb commented Jul 25, 2024

Update here is that testing revealed some cases where we could improve -- see discussion on XiangpengHao#1

Given this PR is targeting a branch and not main, I am planning to keep it open a bit longer while we try and make this better

@alamb
Contributor

alamb commented Jul 25, 2024

Update: I think if we have benchmarks showing this PR improves performance we should merge it in and we can improve the performance as a follow on PR (tracked in #11628).

I am especially excited that we could remove the second copy for all types, not just string view

@XiangpengHao
Contributor Author

XiangpengHao commented Jul 25, 2024

While working on the blog post, I came up with a better heuristic that further improves performance (an additional 5%).

The idea is to calculate an ideal_buffer_size, and if the actual buffer size is more than twice as large, we do GC.
We also use the ideal_buffer_size to set an optimal block_size value, so that we never waste a single byte.

Calculating the ideal_buffer_size requires traversing the views, but it is actually cheap: the batches are pretty small for low-cardinality filters, which is the common case. In the worst case we need to check 8192 views, which is also not too bad.
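A minimal sketch of this refined heuristic (hypothetical function names, not the PR's exact code), assuming Arrow's view layout: each view is a u128 whose low 32 bits store the string length, and strings of at most 12 bytes are inlined in the view itself, occupying no buffer space, so only longer strings contribute to the ideal buffer size:

```rust
/// Sketch of the refined heuristic (hypothetical names). In Arrow's view
/// layout each view is a u128 whose low 32 bits are the string length;
/// strings of <= 12 bytes are inlined in the view itself and use no buffer
/// space, so only longer strings contribute to the ideal buffer size.
fn ideal_buffer_size(views: &[u128]) -> usize {
    views
        .iter()
        .map(|v| {
            let len = (*v as u32) as usize; // low 32 bits hold the length
            if len > 12 { len } else { 0 }  // inlined strings need no buffer
        })
        .sum()
}

/// GC only when the actual buffers are more than twice the ideal size.
fn should_gc(actual_buffer_size: usize, views: &[u128]) -> bool {
    actual_buffer_size > 2 * ideal_buffer_size(views)
}

fn main() {
    // Two views: an inlined 5-byte string and a 100-byte string.
    let views = [5u128, 100u128];
    assert_eq!(ideal_buffer_size(&views), 100);
    // 1000 buffer bytes for 100 useful bytes -> sparse, GC.
    assert!(should_gc(1000, &views));
    // 150 buffer bytes -> within 2x of ideal, skip GC.
    assert!(!should_gc(150, &views));
    println!("ok");
}
```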

cc @alamb @2010YOUY01

@XiangpengHao XiangpengHao requested a review from alamb July 25, 2024 15:35
@alamb
Contributor

alamb commented Jul 25, 2024

The idea is to calculate an ideal_buffer_size, and if the actual buffer size is more than twice as large, we do GC.
We also use the ideal_buffer_size to set an optimal block_size value, so that we never waste a single byte.

I think this heuristic sounds good

Calculating the ideal_buffer_size requires traversing the views, but it is actually cheap: the batches are pretty small for low-cardinality filters, which is the common case. In the worst case we need to check 8192 views, which is also not too bad.

I agree

Any chance you can add tests for this code showing how the heuristics work (perhaps either based on XiangpengHao#1 or directly merging it in)?

XiangpengHao and others added 2 commits July 25, 2024 11:59
Add comments and tests for gc_string_view_batch
Contributor

@alamb alamb left a comment

Thanks -- LGTM

datafusion/physical-plan/src/coalesce_batches.rs

let gc_string = builder.finish();

debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0
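A sketch of the arithmetic behind this assertion (hypothetical helper, not the PR's code): a builder with a fixed block size of B bytes needs ceil(total/B) blocks, so setting the block size to the ideal buffer size yields at most one block, and zero blocks when every string is inlined:

```rust
/// Hypothetical sketch: number of fixed-size blocks a view builder needs
/// to hold `total_bytes` of non-inlined string data.
fn blocks_needed(total_bytes: usize, block_size: usize) -> usize {
    if total_bytes == 0 {
        0 // all strings inlined in views, no data buffer at all
    } else {
        (total_bytes + block_size - 1) / block_size // ceiling division
    }
}

fn main() {
    // Default 8 KiB blocks for 2 MiB of string data: 256 buffers to track.
    assert_eq!(blocks_needed(2 << 20, 8 << 10), 256);
    // A 2 MiB block for the same data: a single buffer.
    assert_eq!(blocks_needed(2 << 20, 2 << 20), 1);
    // Zero non-inlined bytes: zero buffers, matching the comment above.
    assert_eq!(blocks_needed(0, 2 << 20), 0);
    println!("ok");
}
```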
Contributor

testing for the win!

@XiangpengHao
Contributor Author

It seems the clippy and doc errors are due to the new Rust release

@alamb
Contributor

alamb commented Jul 25, 2024

It seems the clippy and doc errors are due to the new Rust release

Indeed -- #11651

@alamb alamb merged commit 2b58fd5 into apache:string-view2 Jul 25, 2024
21 of 23 checks passed
alamb added a commit that referenced this pull request Jul 29, 2024
… some ClickBench queries (not on by default) (#11667)

* Pin to pre-release version of arrow 52.2.0

* Update for deprecated method

* Add a config to force using string view in benchmark (#11514)

* add a knob to force string view in benchmark

* fix sql logic test

* update doc

* fix ci

* fix ci only test

* Update benchmarks/src/util/options.rs

Co-authored-by: Andrew Lamb <[email protected]>

* Update datafusion/common/src/config.rs

Co-authored-by: Andrew Lamb <[email protected]>

* update tests

---------

Co-authored-by: Andrew Lamb <[email protected]>

* Add String view helper functions (#11517)

* add functions

* add tests for hash util

* Add ArrowBytesViewMap and ArrowBytesViewSet (#11515)

* Update `string-view` branch to arrow-rs main (#10966)

* Pin to arrow main

* Fix clippy with latest arrow

* Uncomment test that needs new arrow-rs to work

* Update datafusion-cli Cargo.lock

* Update Cargo.lock

* tapelo

* merge

* update cast

* consistent dep

* fix ci

* add more tests

* make doc happy

* update new implementation

* fix bug

* avoid unused dep

* update dep

* update

* fix cargo check

* update doc

* pick up the comments change again

---------

Co-authored-by: Andrew Lamb <[email protected]>

* Enable `GroupValueBytesView` for aggregation with StringView types (#11519)

* add functions

* Update `string-view` branch to arrow-rs main (#10966)

* Pin to arrow main

* Fix clippy with latest arrow

* Uncomment test that needs new arrow-rs to work

* Update datafusion-cli Cargo.lock

* Update Cargo.lock

* tapelo

* merge

* update cast

* consistent dep

* fix ci

* avoid unused dep

* update dep

* update

* fix cargo check

* better group value view aggregation

* update

---------

Co-authored-by: Andrew Lamb <[email protected]>

* Initial support for regex_replace on `StringViewArray` (#11556)

* initial support for string view regex

* update tests

* Add support for Utf8View for date/temporal codepaths (#11518)

* Add StringView support for date_part and make_date funcs

* run cargo update in datafusion-cli

* cargo fmt

---------

Co-authored-by: Andrew Lamb <[email protected]>

* GC `StringViewArray` in `CoalesceBatchesStream` (#11587)

* gc string view when appropriate

* make clippy happy

* address comments

* make doc happy

* update style

* Add comments and tests for gc_string_view_batch

* better herustic

* update test

* Update datafusion/physical-plan/src/coalesce_batches.rs

Co-authored-by: Andrew Lamb <[email protected]>

---------

Co-authored-by: Andrew Lamb <[email protected]>

* [Bug] fix bug in return type inference of `utf8_to_int_type` (#11662)

* fix bug in return type inference

* update doc

* add tests

---------

Co-authored-by: Andrew Lamb <[email protected]>

* Fix clippy

* Increase ByteViewMap block size to 2MB (#11674)

* better default block size

* fix related test

* Change `--string-view` to only apply to parquet formats (#11663)

* use inferenced schema, don't load schema again

* move config to parquet-only

* update

* update

* better format

* format

* update

* Implement native support StringView for character length (#11676)

* native support for character length

* Update datafusion/functions/src/unicode/character_length.rs

---------

Co-authored-by: Andrew Lamb <[email protected]>

* Remove uneeded patches

* cargo fmt

---------

Co-authored-by: Xiangpeng Hao <[email protected]>
Co-authored-by: Xiangpeng Hao <[email protected]>
Co-authored-by: Andrew Duffy <[email protected]>