
Conversation

@zhuqi-lucas
Contributor

Which issue does this PR close?

This PR removes unused logic. Why is it unused?

Because we now have a unified DataSourceExec with fetch support.

Rationale for this change

Because we now have a unified DataSourceExec with fetch support, the limit pushdown can be pushed all the way down to DataSourceExec, so the corner case this logic handled can no longer happen.

For example:

query TT
explain select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 10) limit 2;
----
logical_plan
01)Limit: skip=0, fetch=2
02)--Cross Join: 
03)----SubqueryAlias: t1
04)------Limit: skip=0, fetch=2
05)--------TableScan: testsubquerylimit projection=[a, b], fetch=2
06)----Limit: skip=0, fetch=2
07)------TableScan: testsubquerylimit projection=[a, b], fetch=2
physical_plan
01)GlobalLimitExec: skip=0, fetch=2
02)--CrossJoinExec
03)----DataSourceExec: partitions=1, partition_sizes=[1], fetch=2
04)----DataSourceExec: partitions=1, partition_sizes=[1], fetch=2
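The effect of pushing the limit into the scan can be illustrated with a minimal, hypothetical sketch (plain Rust, not DataFusion's API): the source itself truncates its output to `fetch` rows, so no extra guard logic is needed above the scan.

```rust
// Hypothetical sketch (not DataFusion code): a data source that applies a
// pushed-down `fetch` by truncating its stream of batches, producing
// exactly `fetch` rows in total.
fn apply_fetch(batches: Vec<Vec<i32>>, fetch: usize) -> Vec<Vec<i32>> {
    let mut remaining = fetch;
    let mut out = Vec::new();
    for batch in batches {
        if remaining == 0 {
            break; // early termination: the limit is satisfied
        }
        if batch.len() <= remaining {
            remaining -= batch.len();
            out.push(batch); // whole batch fits under the limit
        } else {
            out.push(batch[..remaining].to_vec()); // slice the last batch
            remaining = 0;
        }
    }
    out
}

fn main() {
    let batches = vec![vec![1, 2, 3], vec![4, 5, 6]];
    let limited = apply_fetch(batches, 2);
    assert_eq!(limited, vec![vec![1, 2]]);
}
```

In the plan above, both `DataSourceExec` nodes carry `fetch=2` in exactly this spirit, which is why the extra corner-case handling in the optimizer rule is no longer needed.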

What changes are included in this PR?

Are these changes tested?

Yes, the existing tests in limit cover this.

Are there any user-facing changes?

No

@github-actions github-actions bot added the optimizer Optimizer rules label Apr 16, 2025
Contributor

@alamb alamb left a comment


Thank you @zhuqi-lucas

This seems ok to me given that all tests are passing, but maybe @xudong963 or @berkaysynnada could give it a look to double-check, as I think they know the code better than I do.

@berkaysynnada
Contributor

> // If the plan's children have limit and the child's limit < parent's limit, we shouldn't change the global state to true, because the children limit will be overridden if the global state is changed.

If the parent has a limit, the global state would have already been satisfied before, in these lines:

    // If we have a non-limit operator with fetch capability, update global
    // state as necessary:
    if pushdown_plan.fetch().is_some() {
        if global_state.fetch.is_none() {
            global_state.satisfied = true;
        }

right? So, that condition seems to guard nothing, IMO as well.

@zhuqi-lucas
Contributor Author

> // If the plan's children have limit and the child's limit < parent's limit, we shouldn't change the global state to true, because the children limit will be overridden if the global state is changed.
>
> If the parent has a limit, the global state would have already been satisfied before, in these lines:
>
>     // If we have a non-limit operator with fetch capability, update global
>     // state as necessary:
>     if pushdown_plan.fetch().is_some() {
>         if global_state.fetch.is_none() {
>             global_state.satisfied = true;
>         }
>
> right? So, that condition seems to guard nothing, IMO as well.

Yeah @berkaysynnada, previously we had some bugs because there was no unified DataSourceExec and MemoryExec had no limit/fetch support. Now that this has been fixed, we can remove the corner-case logic.

Contributor

@berkaysynnada berkaysynnada left a comment


I cannot see/remember the relation between DataSource fetches and these lines. These two conditions seem to me to never happen at the same time, but I put an assert like this:

assert!(
    !(!pushdown_plan
        .children()
        .iter()
        .any(|&child| extract_limit(child).is_some())
        && !global_state.satisfied)
);

on top of these lines, and some tests started to fail.

So, that condition was indeed guarding some cases. Are you aware of that?

@2010YOUY01
Contributor

I'm wondering what the semantics of fetch in DataSourceExec are now. I think it makes sense to be either

  • exact fetch: if the datasource's output size is >= fetch, exactly fetch rows will be returned
  • best-effort fetch (or fetch hint): the data source is okay to return more rows than fetch

The best-effort one seems like a better API to me, because it can simplify the implementation for some custom scanners: for example, terminating early at the batch level instead of the row level.
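The difference between the two semantics can be sketched with toy, hypothetical helpers (plain Rust, not the DataSourceExec interface): exact fetch slices within a batch, while best-effort fetch only decides at batch boundaries.

```rust
// Hypothetical sketch of the two fetch semantics on a single batch.
// `exact_fetch` terminates at row level by slicing the batch;
// `best_effort_fetch` terminates only at batch level and may return
// more rows than requested.
fn exact_fetch(batch: &[i32], remaining: usize) -> Vec<i32> {
    if batch.len() > remaining {
        batch[..remaining].to_vec() // row-level early termination
    } else {
        batch.to_vec()
    }
}

fn best_effort_fetch(batch: &[i32], remaining: usize) -> Option<Vec<i32>> {
    if remaining == 0 {
        None // stop only between batches
    } else {
        Some(batch.to_vec()) // may overshoot `remaining`
    }
}

fn main() {
    assert_eq!(exact_fetch(&[1, 2, 3, 4], 2), vec![1, 2]);
    assert_eq!(best_effort_fetch(&[1, 2, 3, 4], 2), Some(vec![1, 2, 3, 4]));
    assert_eq!(best_effort_fetch(&[1, 2, 3], 0), None);
}
```

The best-effort variant lets a custom scanner skip the per-row slicing step entirely, at the cost of callers no longer being able to rely on the row count.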

@zhuqi-lucas
Contributor Author

zhuqi-lucas commented Apr 18, 2025

> I'm wondering what the semantics of fetch in DataSourceExec are now. I think it makes sense to be either
>
>   • exact fetch: if the datasource's output size is >= fetch, exactly fetch rows will be returned
>   • best-effort fetch (or fetch hint): the data source is okay to return more rows than fetch
>
> The best-effort one seems like a better API to me, because it can simplify the implementation for some custom scanners: for example, terminating early at the batch level instead of the row level.

Thank you @2010YOUY01 for this idea. I was thinking we'd use exact fetch for now; I need to check it.

This is a good point; it looks like a performance improvement for some cases.

Updated. For example, MemoryExec uses slice to return exactly fetch rows:

fn poll_next(
    mut self: std::pin::Pin<&mut Self>,
    _: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
    if self.index >= self.data.len() {
        return Poll::Ready(None);
    }
    self.index += 1;
    let batch = &self.data[self.index - 1];
    // return just the columns requested
    let batch = match self.projection.as_ref() {
        Some(columns) => batch.project(columns)?,
        None => batch.clone(),
    };

    let Some(&fetch) = self.fetch.as_ref() else {
        return Poll::Ready(Some(Ok(batch)));
    };
    if fetch == 0 {
        return Poll::Ready(None);
    }

    let batch = if batch.num_rows() > fetch {
        batch.slice(0, fetch)
    } else {
        batch
    };
    self.fetch = Some(fetch - batch.num_rows());
    Poll::Ready(Some(Ok(batch)))
}

The same applies to FileStream; it slices the batch:

match ready!(reader.poll_next_unpin(cx)) {
    Some(Ok(batch)) => {
        self.file_stream_metrics.time_scanning_until_data.stop();
        self.file_stream_metrics.time_scanning_total.stop();
        let result = self
            .pc_projector
            .project(batch, partition_values)
            .map_err(|e| ArrowError::ExternalError(e.into()))
            .map(|batch| match &mut self.remain {
                Some(remain) => {
                    if *remain > batch.num_rows() {
                        *remain -= batch.num_rows();
                        batch
                    } else {
                        let batch = batch.slice(0, *remain);
                        self.state = FileStreamState::Limit;
                        *remain = 0;
                        batch
                    }
                }
                None => batch,
            });

@zhuqi-lucas
Contributor Author

zhuqi-lucas commented Apr 18, 2025

// Add fetch or a `LimitExec`:
// If the plan's children have limit and the child's limit < parent's limit, we shouldn't change the global state to true,
// because the children limit will be overridden if the global state is changed.

if !pushdown_plan
    .children()
    .iter()
    .any(|&child| extract_limit(child).is_some())
{
    global_state.satisfied = true;
}

Thank you @berkaysynnada for checking. Actually, this special logic was added to fix this issue:

#14204

Now that MemoryExec/DataSourceExec support fetch/limit, the above issue can no longer happen, because the data source takes over the pushed-down limit.

@berkaysynnada
Contributor

Run extended tests

Contributor

@berkaysynnada berkaysynnada left a comment


Thank you @zhuqi-lucas. I ran the extended tests to make sure nothing breaks. Otherwise, LGTM, and I also don't see any reason to keep these lines.

@zhuqi-lucas
Contributor Author

Thank you @berkaysynnada for the review.

@berkaysynnada berkaysynnada merged commit 280997d into apache:main Apr 21, 2025
28 checks passed
nirnayroy pushed a commit to nirnayroy/datafusion that referenced this pull request May 2, 2025