fix: nested loop join requires outer table to be a FusedStream #12189

Merged 1 commit on Sep 12, 2024


YjyJeff
Contributor

@YjyJeff YjyJeff commented Aug 27, 2024

Which issue does this PR close?

Closes #12187

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Yes

Are there any user-facing changes?

No

@github-actions github-actions bot added the physical-expr Physical Expressions label Aug 27, 2024
Contributor

@crepererum crepererum left a comment


Is this bug visible/reproducible somehow? If so, could you add a regression test?

@YjyJeff
Contributor Author

YjyJeff commented Aug 28, 2024

Is this bug visible/reproducible somehow? If so, could you add a regression test?

@crepererum This bug is triggered by our customized TableScan operator, which does not uphold the FusedStream contract. In our implementation, calling poll_next again after the stream has finished causes a panic.

It is simple to reproduce this situation. First, modify MemoryStream so that it no longer upholds the FusedStream contract, using the following code:

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        _: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        Poll::Ready(if self.index < self.data.len() {
            self.index += 1;
            let batch = &self.data[self.index - 1];

            // return just the columns requested
            let batch = match self.projection.as_ref() {
                Some(columns) => batch.project(columns)?,
                None => batch.clone(),
            };

            Some(Ok(batch))
        } else if self.index == self.data.len() {
            self.index += 1;
            None
        } else {
            panic!("Exhausted stream is polled again")
        })
    }

Second, run the datafusion-cli binary with the following SQL statements:

CREATE TABLE wtf AS VALUES (4, 'arrow'), (99, 'datafusion');

select t0.column1, t1.column1, t0.column2, t1.column2 from wtf t0 left join wtf t1 on t0.column1 > t1.column1;

Then, we will hit the panic statement.

Currently, the ExecutionPlan trait only requires the execute method to return a value that implements the Stream trait. According to the documentation of the poll_next method, polling an exhausted stream again may panic.
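
To illustrate the contract difference, here is a minimal std-only sketch, not the code in this PR. It assumes a simplified, hypothetical PollSource trait standing in for Stream (no Context, never Pending). StrictSource mirrors the modified MemoryStream above, and the hypothetical Fused wrapper shows the usual way a consumer can protect itself: remember the first None and never poll the inner source again.

```rust
use std::task::Poll;

/// Hypothetical, simplified stand-in for `Stream::poll_next`
/// (assumption: no `Context` argument and the source is never pending).
trait PollSource {
    type Item;
    fn poll_next(&mut self) -> Poll<Option<Self::Item>>;
}

/// Like the modified MemoryStream: panics when polled after returning `None`.
struct StrictSource {
    data: Vec<i32>,
    index: usize,
}

impl PollSource for StrictSource {
    type Item = i32;

    fn poll_next(&mut self) -> Poll<Option<i32>> {
        if self.index < self.data.len() {
            self.index += 1;
            Poll::Ready(Some(self.data[self.index - 1]))
        } else if self.index == self.data.len() {
            // First poll past the end: report exhaustion exactly once.
            self.index += 1;
            Poll::Ready(None)
        } else {
            panic!("Exhausted stream is polled again")
        }
    }
}

/// Hypothetical wrapper: records exhaustion so the inner source is never
/// polled again after its first `None`.
struct Fused<S> {
    inner: S,
    done: bool,
}

impl<S: PollSource> Fused<S> {
    fn new(inner: S) -> Self {
        Self { inner, done: false }
    }
}

impl<S: PollSource> PollSource for Fused<S> {
    type Item = S::Item;

    fn poll_next(&mut self) -> Poll<Option<S::Item>> {
        if self.done {
            // Already exhausted: answer locally, do not touch `inner`.
            return Poll::Ready(None);
        }
        let polled = self.inner.poll_next();
        if let Poll::Ready(None) = polled {
            self.done = true;
        }
        polled
    }
}
```

With this sketch, polling Fused::new(StrictSource { .. }) past the end keeps returning None, whereas polling the bare StrictSource a second time past the end panics.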

@crepererum
Contributor

I think we have roughly the following paths forward here:

  • just fix this issue here (w/ or w/o regression test)
  • do NOT require FusedStream: we could invest in some tests that patch the physical plan to replace all streams with ones that panic if they are polled after the end (e.g. enforce NOT FusedStream)
  • update the entire stack to require FusedStream (requires additional flags/state in a few places, so potentially costly)
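
The second option could look roughly like the following std-only sketch. Everything here is hypothetical and simplified: PollSource stands in for Stream (no Context, never Pending), and PanicAfterEnd is an illustrative test wrapper, not an existing DataFusion type. It turns any source into one that panics when polled after the end, so a test fails if a consumer depends on fused behaviour.

```rust
use std::task::Poll;

/// Hypothetical, simplified stand-in for `Stream::poll_next`
/// (assumption: no `Context` argument and the source is never pending).
trait PollSource {
    type Item;
    fn poll_next(&mut self) -> Poll<Option<Self::Item>>;
}

/// A well-behaved source: keeps returning `None` after the end.
struct VecSource {
    data: Vec<i32>,
    index: usize,
}

impl PollSource for VecSource {
    type Item = i32;

    fn poll_next(&mut self) -> Poll<Option<i32>> {
        let item = self.data.get(self.index).copied();
        if item.is_some() {
            self.index += 1;
        }
        Poll::Ready(item)
    }
}

/// Hypothetical test wrapper: panics if the wrapped source is polled
/// again after it has returned `None`.
struct PanicAfterEnd<S> {
    inner: S,
    finished: bool,
}

impl<S: PollSource> PollSource for PanicAfterEnd<S> {
    type Item = S::Item;

    fn poll_next(&mut self) -> Poll<Option<S::Item>> {
        assert!(!self.finished, "source polled after it returned None");
        let polled = self.inner.poll_next();
        if let Poll::Ready(None) = polled {
            self.finished = true;
        }
        polled
    }
}

/// A consumer that respects the contract: it stops at the first poll
/// that does not yield an item and never polls again.
fn drain<S: PollSource>(mut source: S) -> Vec<S::Item> {
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = source.poll_next() {
        out.push(item);
    }
    out
}
```

A consumer such as drain passes when its input is wrapped in PanicAfterEnd, while a consumer that polls once more after the end trips the assertion. That is the kind of signal a plan-patching test would rely on.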

Contributor

@alamb alamb left a comment


Thank you @crepererum and @YjyJeff -- I think this PR would be ok to merge.

just fix this issue here (w/ or w/o regression test)

I would prefer "fix the issue here with a regression test"

The reason I suggest a regression test is that otherwise this code may very well get lost in a future refactoring -- without regression tests for coverage we wouldn't know if this happened

Do you think you can add such a test @YjyJeff ?

@alamb
Contributor

alamb commented Sep 10, 2024

I will wait a few more days to merge this PR to see if @YjyJeff can help add a regression test

@alamb
Contributor

alamb commented Sep 12, 2024

Merging this to get it into 42

It would be great to get a test for this @YjyJeff otherwise it is likely a regression will creep in. Let's try and do that as a follow on PR

@alamb alamb merged commit b25aa33 into apache:main Sep 12, 2024
24 checks passed
This pull request was closed.
Labels
physical-expr Physical Expressions
Development

Successfully merging this pull request may close these issues.

Left/Full join of the NestedLoopJoin requires the RHS(outer table) to be a FusedStream
3 participants