
Add option to FilterExec to prevent re-using input batches #12039

Closed
wants to merge 4 commits into from

Conversation

andygrove (Member) commented Aug 16, 2024

Which issue does this PR close?

N/A

Rationale for this change

DataFusion Comet is currently maintaining a fork of FilterExec with a small modification to change the way that filtered batches are created. We have a requirement that FilterExec must not pass through input batches in the case where the predicate evaluates to true for all rows in a batch (due to some array re-use in our scan).

We would like to make the DataFusion implementation of FilterExec customizable to meet our needs.

What changes are included in this PR?

Add a new boolean parameter so that we can choose whether FilterExec is allowed to return unmodified input batches.
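As a rough illustration of the proposed option, the flag could live on a config struct with a builder-style setter. This is a hypothetical sketch (the names `FilterExecConfig`, `reuse_input_batches`, and `with_reuse_input_batches` are illustrative, not the actual DataFusion API):

```rust
/// Hypothetical sketch of the proposed option; the real FilterExec API may differ.
#[derive(Debug, Clone)]
struct FilterExecConfig {
    /// When false, FilterExec must always produce freshly allocated output
    /// batches, even if the predicate selected every row of the input.
    reuse_input_batches: bool,
}

impl Default for FilterExecConfig {
    fn default() -> Self {
        // Reusing input batches is the current (and fastest) behavior,
        // so it remains the default.
        Self { reuse_input_batches: true }
    }
}

impl FilterExecConfig {
    fn with_reuse_input_batches(mut self, reuse: bool) -> Self {
        self.reuse_input_batches = reuse;
        self
    }
}

fn main() {
    // A caller like Comet would opt out of batch re-use explicitly.
    let cfg = FilterExecConfig::default().with_reuse_input_batches(false);
    assert!(!cfg.reuse_input_batches);
}
```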

Are these changes tested?

I did not add tests yet. I wanted to get some feedback on approach first.

Are there any user-facing changes?

@github-actions github-actions bot added the physical-expr Physical Expressions label Aug 16, 2024
@andygrove andygrove marked this pull request as ready for review August 16, 2024 22:15
metegenez (Contributor) commented:

If the predicate evaluates to true for every row, the result is typically an array pointer copy. However, there are cases where you want to copy the underlying data even when the predicate is entirely true, even though that degrades the operator's performance.

Is there a use case other than Comet itself?

```rust
if reuse_input_batches {
    filter_record_batch(batch, filter_array)?
} else {
    if filter_array.true_count() == batch.num_rows() {
```
Dandandan (Contributor) commented Aug 17, 2024:

As computing the true count is not free, I am wondering if we can either:

- move this into the arrow filter compute kernel, or
- check the returned array(s) for pointer equality, and copy if equal.
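The pointer-equality idea can be sketched in plain Rust using `Arc` as a stand-in for an Arrow array's shared allocation (the real check would compare `ArrayData`/`Buffer` pointers; `ensure_owned` is a hypothetical helper, not a DataFusion function):

```rust
use std::sync::Arc;

// Model: an Arrow array's data lives behind a shared pointer. If the filter
// kernel returned the input unchanged (all-true predicate), the output shares
// the same allocation, which we can detect with pointer equality and then
// force a real copy.
fn ensure_owned(input: &Arc<Vec<i32>>, output: Arc<Vec<i32>>) -> Arc<Vec<i32>> {
    if Arc::ptr_eq(input, &output) {
        // Same allocation: materialize a deep copy so the caller may safely
        // recycle the input buffers.
        Arc::new(output.as_ref().clone())
    } else {
        output
    }
}

fn main() {
    let input = Arc::new(vec![1, 2, 3]);
    // Simulate an all-true predicate: the "filter" returns the input as-is.
    let passthrough = Arc::clone(&input);
    let owned = ensure_owned(&input, passthrough);
    // The result no longer aliases the input, but holds the same values.
    assert!(!Arc::ptr_eq(&input, &owned));
    assert_eq!(*owned, vec![1, 2, 3]);
}
```

This avoids computing `true_count()` entirely: the copy only happens when the kernel actually returned a shared pointer.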


Specifically, I think the optimize function would be a natural place to put this: https://docs.rs/arrow-select/52.2.0/src/arrow_select/filter.rs.html#181


```rust
    filter_record_batch(batch, filter_array)?
} else {
    if filter_array.true_count() == batch.num_rows() {
        // special case where we just make an exact copy
```

I think I am missing something: why go through all the effort with `MutableArrayData`?

In other words, why isn't this simply

```rust
Ok(batch.clone())
```

to literally return the input batch? I think that would be less code and faster.


That would be the same as what the filter kernel already does.

The use case, as far as I understand, is that for Comet the data needs to be a new copy, as Spark will reuse the existing data/arrays.
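The distinction being made here is shallow versus deep copying. A `RecordBatch` clone only bumps reference counts on shared buffers, so both handles still alias the same memory; Comet needs a fresh allocation. A minimal sketch of that difference, again using `Arc` to model a shared Arrow buffer:

```rust
use std::sync::Arc;

fn main() {
    // Cloning a RecordBatch behaves like cloning an Arc: it is shallow, only
    // bumping a reference count, so both handles point at the same buffer.
    let original = Arc::new(vec![10_u8, 20, 30]);
    let shallow = Arc::clone(&original);
    assert!(Arc::ptr_eq(&original, &shallow));

    // Comet needs the opposite: a deep copy with its own allocation, because
    // the Spark-side scan will recycle the original buffers.
    let deep = Arc::new(original.as_ref().clone());
    assert!(!Arc::ptr_eq(&original, &deep));
    assert_eq!(*deep, *original);
}
```

So `Ok(batch.clone())` would still hand Spark's recycled buffers back to downstream operators, which is exactly what the option is meant to prevent.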

alamb (Contributor) commented Aug 19, 2024:

I see -- I think it would help if we made a function with a name that makes it clear what is going on, something like

```rust
fn force_new_data_copy(...)
```

I also left a suggestion here: apache/datafusion-comet#835 (comment)

```diff
@@ -379,7 +430,8 @@ impl Stream for FilterExecStream {
         match ready!(self.input.poll_next_unpin(cx)) {
             Some(Ok(batch)) => {
                 let timer = self.baseline_metrics.elapsed_compute().timer();
-                let filtered_batch = batch_filter(&batch, &self.predicate)?;
+                let filtered_batch =
```

I recommend adding a test so we don't break this feature by accident.

@alamb alamb marked this pull request as draft August 20, 2024 18:59
alamb (Contributor) commented Aug 20, 2024

Marking as draft, as I think this PR is no longer waiting on feedback.


Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.

@github-actions github-actions bot added the Stale PR has not had any activity for some time label Oct 20, 2024
@github-actions github-actions bot closed this Oct 28, 2024
Labels: physical-expr (Physical Expressions), Stale (PR has not had any activity for some time)

4 participants