
Add LimitPushdown optimization rule and CoalesceBatchesExec fetch #27

Closed
wants to merge 51 commits

Conversation


@alihandroid alihandroid commented Jul 10, 2024

Which issue does this PR close?

Closes #.

Rationale for this change

Physical plans can be optimized further by pushing GlobalLimitExec and LocalLimitExec down through certain nodes, or by replacing child nodes with fetch-limited versions of themselves, without changing the result. This reduces unnecessary data transfer and processing, making plan execution more efficient.

CoalesceBatchesExec can also benefit from this improvement, so fetch limit support is implemented for it as well.

For example,

GlobalLimitExec: skip=0, fetch=5
  StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true

can be turned into

StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true, fetch=5

and

GlobalLimitExec: skip=0, fetch=5
  CoalescePartitionsExec
    FilterExec: c3@2 > 0
      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
        StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true

can be turned into

GlobalLimitExec: skip=0, fetch=5
  CoalescePartitionsExec
    LocalLimitExec: fetch=5
      FilterExec: c3@2 > 0
        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
          StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true

without changing the result, while using fewer resources and finishing faster.

Other examples can be found in the tests provided in limit_pushdown.rs
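For reference, here is a minimal sketch (not taken from the PR) of how the rule can be applied to an existing physical plan as a PhysicalOptimizerRule; the exact module paths (e.g. `physical_optimizer::limit_pushdown`) are assumptions and may differ from where the rule finally lands:

```rust
use std::sync::Arc;

use datafusion::config::ConfigOptions;
use datafusion::error::Result;
use datafusion::physical_optimizer::limit_pushdown::LimitPushdown; // assumed path
use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::{displayable, ExecutionPlan};

/// Apply the rule once and show the plan before and after.
fn push_down_limits(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    println!("before:\n{}", displayable(plan.as_ref()).indent(true));
    let optimized = LimitPushdown::new().optimize(plan, &ConfigOptions::default())?;
    println!("after:\n{}", displayable(optimized.as_ref()).indent(true));
    Ok(optimized)
}
```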

What changes are included in this PR?

Implement LimitPushdown Rule:

  • Introduced new APIs in the ExecutionPlan trait (see the sketch after this list):
    • with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>: Returns a fetch-limited version of the node if supported, None otherwise. The default implementation returns None
    • supports_limit_pushdown(&self) -> bool: Returns true if a node supports limit pushdown. The default implementation returns false
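A rough sketch of the shape of these trait additions, assuming default implementations on ExecutionPlan (supertraits and the existing trait items are elided, and the doc comments are paraphrased rather than copied from the PR):

```rust
use std::sync::Arc;

pub trait ExecutionPlan {
    // ...existing required and provided methods elided...

    /// Returns a fetch-limited version of this node if it supports one,
    /// or `None` otherwise.
    fn with_fetch(&self, _fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        None
    }

    /// Returns `true` if a limit can be pushed down through this node
    /// without changing its output.
    fn supports_limit_pushdown(&self) -> bool {
        false
    }
}
```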

Add fetch support to CoalesceBatchesExec:

  • Add fetch field and with_fetch implementation
  • Add new_with_fetch constructor
  • Implement fetch limit functionality (a simplified sketch of the idea follows this list)
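As a hypothetical, simplified illustration of the idea (the helper name `apply_fetch` and its signature are invented here and are not the PR's actual code), truncating a batch once the fetch limit is reached could look like this:

```rust
use arrow::record_batch::RecordBatch;

/// Truncate `batch` so that at most `remaining_fetch` more rows are emitted.
/// Returns the (possibly sliced) batch to emit and the updated remaining count.
fn apply_fetch(batch: RecordBatch, remaining_fetch: usize) -> (Option<RecordBatch>, usize) {
    if remaining_fetch == 0 {
        // The limit is already satisfied; emit nothing further.
        (None, 0)
    } else if batch.num_rows() <= remaining_fetch {
        // The whole batch still fits under the limit.
        let left = remaining_fetch - batch.num_rows();
        (Some(batch), left)
    } else {
        // Only a prefix of the batch is needed to reach the limit.
        (Some(batch.slice(0, remaining_fetch)), 0)
    }
}
```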

Are these changes tested?

Unit tests are provided for LimitPushdown and the new fetching support for CoalesceBatchesExec.

Are there any user-facing changes?

No. The changes only affect performance.

@github-actions github-actions bot added the core label Jul 10, 2024
@alihandroid alihandroid changed the title Add LimitPushdown optimization rule Add LimitPushdown optimization rule and CoalesceBatchesExec fetch Jul 15, 2024
@alihandroid alihandroid marked this pull request as ready for review July 15, 2024 15:37
@github-actions github-actions bot added the documentation and sqllogictest labels Jul 17, 2024
@alihandroid (Author)

Rebased the fork because of conflicts

07)------------AggregateExec: mode=Partial, gby=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@0) as date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted
08)--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
09)----------------StreamingTableExec: partition_sizes=1, projection=[ts], infinite_source=true, output_ordering=[ts@0 DESC]
03)----LocalLimitExec: fetch=5

This LocalLimitExec seems redundant, since the limit is already pushed down below the projection.

03)----ProjectionExec: expr=[name@0 as name, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@1) as time_chunks]
04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
05)--------StreamingTableExec: partition_sizes=1, projection=[name, ts], infinite_source=true, output_ordering=[name@0 DESC, ts@1 DESC]
03)----LocalLimitExec: fetch=5

Same problem as above.

Comment on lines 102 to 103
03)----LocalLimitExec: fetch=10
04)------ProjectionExec: expr=[a@0 as a2, b@1 as b]
@mustafasrepo (Collaborator) commented Jul 19, 2024

This limit is already pushed down below the projection as a fetch inside the CoalesceBatchesExec, so it is redundant here.

@alamb left a comment

This looks very cool 👓

I think it would also help if we could document somewhere why we need both this physical optimizer and the logical optimizer here: https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/push_down_limit.rs

I wonder perhaps if we should remove the logical pushdown entirely in favor of physical pushdown 🤔

// specific language governing permissions and limitations
// under the License.

//! This rule reduces the amount of data transferred by pushing down limits as much as possible.

Suggested change
//! This rule reduces the amount of data transferred by pushing down limits as much as possible.
//! [`LimitPushdown`]: Pushes limits into the plan as much as possible.

"| | TableScan: t projection=[a, b] |",
"| physical_plan | GlobalLimitExec: skip=0, fetch=10 |",
"| | SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10 |",
"| | LocalLimitExec: fetch=10 |",

Another way to get this plan would be to support limit pushdown into MemoryExec (rather than applying the limit with a new LocalLimitExec)

@@ -319,6 +376,86 @@ mod tests {
Ok(())
}

#[tokio::test]

👍

11)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true
03)----LocalLimitExec: fetch=5
04)------UnionExec
05)--------SortExec: expr=[c9@1 DESC], preserve_partitioning=[true]
@alamb commented Jul 23, 2024

I think SortExec can be limited -- which would likely be better to use too (it has a special implementation with limit)

https://docs.rs/datafusion-physical-plan/40.0.0/src/datafusion_physical_plan/sorts/sort.rs.html#683-684

I think you could also push the limit all the way down to the CSVExec in this plan too


I actually would have expected the logical optimizer to have already pushed the limit into the inputs 🤔

https://github.com/apache/datafusion/blob/fc8e7b90356b94af5f591240b8165bc4c8275a51/datafusion/optimizer/src/push_down_limit.rs#L103

@berkaysynnada (Collaborator)

Congrats again, @alihandroid. We only have two remaining issues to address. You can go ahead and open this PR to the upstream repo now. While you work on the remaining tasks, we might receive additional feedback from the upstream reviewers, so there's no need to wait any longer.

@alihandroid (Author)

@berkaysynnada Still working on the coalesce batches test cases but the limit merging is done. The PR at the upstream repo is apache#11652

@berkaysynnada (Collaborator) commented Jul 25, 2024

@berkaysynnada Still working on the coalesce batches test cases but the limit merging is done. The PR at the upstream repo is apache#11652

Great job. The tests are failing because there are two concat_batches functions: one that requires the number of rows and another that does not. You should use the one from arrow::compute. I took a quick look, and it seems well done. Good work 👍🏻
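For context, a minimal sketch of using the arrow::compute::concat_batches variant mentioned above, which takes only the schema and the batches (no up-front row count); the schema and data are made up for illustration:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::concat_batches;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let b1 = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
    )?;
    let b2 = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![3])) as ArrayRef],
    )?;

    // The arrow::compute variant needs only the schema and the batches.
    let merged = concat_batches(&schema, &[b1, b2])?;
    assert_eq!(merged.num_rows(), 3);
    Ok(())
}
```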

@github-actions github-actions bot added the documentation, physical-expr, optimizer and substrait labels Jul 26, 2024
@ozankabak (Collaborator)

Merged upstream.

@ozankabak ozankabak closed this Jul 28, 2024