
Enforce sorting handle fetchable operators, add option to repartition based on row count estimates #11875

Merged: 30 commits into apache:main on Aug 10, 2024

Conversation

@mustafasrepo (Contributor) commented on Aug 7, 2024

Which issue does this PR close?

Closes #.

Rationale for this change

What changes are included in this PR?

@alihandroid recently added limit pushdown support for physical plans in a separate PR. After that PR, I noticed that the EnforceSorting rule has some problems handling operators with a fetch: it sometimes loses the fetch count during sort pushdown (since the LimitPushdown rule runs after EnforceSorting, we do not currently hit the erroneous cases). Hence, I added unit tests to trigger the erroneous handling.
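
For context, a hedged illustration of an "operator with fetch"; the query and table name are assumptions chosen for illustration, not taken from this PR:

    -- An ORDER BY ... LIMIT produces a sort operator that carries a fetch,
    -- typically displayed as `SortExec: TopK(fetch=5)` in EXPLAIN output.
    -- The problem described above is that EnforceSorting could drop such a
    -- fetch while pushing the sort down through its input.
    EXPLAIN SELECT c1, c3 FROM aggregate_test_100 ORDER BY c3 LIMIT 5;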

Are these changes tested?

Yes, unit tests are added.

Are there any user-facing changes?

@github-actions bot added the core (Core DataFusion crate) and sqllogictest (SQL Logic Tests (.slt)) labels on Aug 7, 2024
@@ -3019,11 +3019,11 @@ mod tests {

assert_batches_sorted_eq!(
[
"+-----+-----+----+-------+",
@mustafasrepo (Contributor, author) commented on Aug 7, 2024:
The result of this test changes with this PR. I analyzed the change; previously, this test generated the following plan:

    "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, CAST(c2@1 AS Int8) + c3@2 as sum]",
    "  RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
    "    SortExec: expr=[c1@0 ASC,c2@1 ASC,c3@2 ASC], preserve_partitioning=[false]",
    "      GlobalLimitExec: skip=0, fetch=1",
    "        CoalescePartitionsExec",
    "          CoalesceBatchesExec: target_batch_size=8192, fetch=1",
    "            FilterExec: c2@1 = 3 AND c1@0 = a",
    "              RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
    "                CsvExec: file_groups={1 group: [[<PATH>]]}, projection=[c1, c2, c3], has_header=true",

After the changes in this PR, the following plan is generated:

    "ProjectionExec: expr=[c1@0 as one, c2@1 as two, c3@2 as c3, CAST(c2@1 AS Int8) + c3@2 as total]",
    "  RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
    "    SortExec: TopK(fetch=1), expr=[c3@2 ASC], preserve_partitioning=[false]",
    "      CoalescePartitionsExec",
    "        CoalesceBatchesExec: target_batch_size=8192",
    "          FilterExec: c2@1 = 3 AND c1@0 = a",
    "            RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
    "              CsvExec: file_groups={1 group: [[<PATH>]]}, projection=[c1, c2, c3], has_header=true",

I think the second plan produces a deterministic result. However, the query (a DataFrame query) is not deterministic as written.
With this observation, I moved the limit so that the query is deterministic after execution; a plain-SQL analogue of the two limit placements is sketched after the plan below. With the limit in its new place, the following plan is generated:

    "ProjectionExec: expr=[c1@0 as one, c2@1 as two, c3@2 as c3, CAST(c2@1 AS Int8) + c3@2 as total]",
    "  GlobalLimitExec: skip=0, fetch=1",
    "    SortPreservingMergeExec: [c1@0 ASC,c2@1 ASC,c3@2 ASC], fetch=1",
    "      SortExec: TopK(fetch=1), expr=[c3@2 ASC], preserve_partitioning=[true]",
    "        CoalesceBatchesExec: target_batch_size=8192",
    "          FilterExec: c2@1 = 3 AND c1@0 = a",
    "            RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
    "              CsvExec: file_groups={1 group: [[<PATH>]]}, projection=[c1, c2, c3], has_header=true",

A contributor replied:

I agree. The previous test did a sort right after a select + filter, which will not produce a deterministic result. Doing the limit after the sort makes sense.

@ozankabak (Contributor) left a comment:

LGTM

Comment on lines +1780 to +1790
05)--------ProjectionExec: expr=[]
06)----------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[]
07)------------CoalesceBatchesExec: target_batch_size=4096
08)--------------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2
09)----------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[]
10)------------------ProjectionExec: expr=[c1@0 as c1]
11)--------------------CoalesceBatchesExec: target_batch_size=4096
12)----------------------FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434
13)------------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
14)--------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], has_header=true

A contributor commented:

A better plan 🚀

@ozankabak (Contributor) left a comment:

This is looking very good!

For other reviewers: the optimizer now removes the RoundRobin repartition (RR) in some SLT tests because we estimate the input to be a single batch (the RR would be pointless). We are getting very smart 🚀 A hedged sketch of the related setting follows after this comment.

@alamb it would be great if you could take a look
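
The behavior described above is driven by the new estimate-based repartitioning option mentioned in the PR title. A hedged sketch of how such an option would be toggled from SQL; the exact option name is an assumption inferred from the title and the config.rs change, not something stated on this page:

    -- Assumed option name -- verify against datafusion/common/src/config.rs.
    -- When enabled, the optimizer uses row-count estimates to decide whether a
    -- RoundRobin repartition is worthwhile; with a single estimated batch it
    -- skips the repartition entirely.
    SET datafusion.execution.use_row_number_estimates_to_optimize_partitioning = true;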

@alamb (Contributor) commented on Aug 8, 2024, quoting the request above:

@alamb it would be great if you could take a look

Will put it on my list for today

@github-actions bot added the documentation (Improvements or additions to documentation) label on Aug 9, 2024
@alamb changed the title from "Enforce sorting handle fetchable operators." to "Enforce sorting handle fetchable operators, add option to repartition based on row count estimates" on Aug 9, 2024
@alamb (Contributor) left a comment:

Thanks @mustafasrepo and @ozankabak -- I went through this PR carefully and I think it looks good to me.

I had some improvement suggestions, but I don't think any are necessary prior to merge.

Review comments (outdated, resolved) were left on: datafusion/common/src/config.rs, datafusion/physical-plan/src/limit.rs (two comments), datafusion/physical-plan/src/sorts/sort.rs, and datafusion/sqllogictest/test_files/order.slt.
@ozankabak (Contributor) commented:
Thanks for the review @alamb -- I will send one more commit and then this will be good to go.

@ozankabak (Contributor) left a comment:

After a careful study of the code, I have one issue in mind (for which I left an inline comment). We can merge the code after we make sure the plan change in question is not due to a regression.

Comment on lines -406 to +407
05)--------MemoryExec: partitions=4
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------AggregateExec: mode=Partial, gby=[i@0 as i], aggr=[]
06)----------MemoryExec: partitions=1
A contributor commented on the diff above:

This is the only thing I don't understand here. I studied the rule logic, but it is not clear to me why we don't use the source's multi-partition output and instead add a RoundRobin repartition later on.

Once we are sure this is not due to some regression, we can merge this PR

A contributor followed up:

OK, I figured out what is going on here. With the optimizations we now do, the CREATE TABLE ... SELECT ... query doesn't create a multi-partition table (because it is not helpful). Therefore we see the RR in the downstream test. Reducing the batch size just before the test gives us the old plan (a hedged sketch of that adjustment follows after this comment). I updated the comment above accordingly.

So all is good, ready to go.
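
A hedged sketch of the kind of adjustment described above; the table and column names are placeholders, not copied from the actual test file:

    -- Shrinking the batch size before the CREATE TABLE ... AS SELECT makes the
    -- source query produce more than one batch, so the created table is
    -- multi-partition again and the downstream EXPLAIN test sees the old plan
    -- without the extra RoundRobin repartition.
    SET datafusion.execution.batch_size = 4;

    CREATE TABLE t AS SELECT i FROM source_table;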

@ozankabak merged commit 79fa6f9 into apache:main on Aug 10, 2024
25 checks passed
wiedld added a commit to influxdata/arrow-datafusion that referenced this pull request on Aug 15, 2024: "…artition based on row count estimates (apache#11875)" (this reverts commit 79fa6f9).
Labels: core (Core DataFusion crate), documentation (Improvements or additions to documentation), sqllogictest (SQL Logic Tests (.slt))