-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Move repartition_file_scans
out of enable_round_robin
check in EnforceDistribution
rule
#8731
Conversation
// When `repartition_file_scans` is set, attempt to increase | ||
// parallelism at the source. | ||
if repartition_file_scans { | ||
if let Some(new_child) = | ||
child.plan.repartitioned(target_partitions, config)? | ||
{ | ||
child.plan = new_child; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
repartition_file_scans
doesn't look like it should be guarded by enable_round_robin
config.
// Increase parallelism by adding round-robin repartitioning | ||
// on top of the operator. Note that we only do this if the | ||
// partition count is not already equal to the desired partition | ||
// count. | ||
child = add_roundrobin_on_top(child, target_partitions)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't look correct to me. The produced query plan also looks confused.
If there are already distribution requirements like single partition or hash partitioning (i.e., it will be repartitioning), why we add round-robin repartitioning below it to result in two repartitioning?
From the comment, looks like it is to increase parallelism? But the parallelism is bound to cpus, however here the partitioning number is target_partitions
. That's said it could go to round-robin into much higher partitions (9000 for example as I saw such in one existing test case). But I don't think we will have parallelism as same as target_partitions
. Not to mention that additional round-robin partitioning might have their cost.
Regarding parallelism, it sounds more correct to produce partitions at scans (i.e., repartition_file_scans
) and results in required parallelism on following operators naturally instead of inserting arbitrary round-robin repartitioning around distribution requirements.
I did benchmark for this change. Overall I don't see significant downgrade.
For example:
Benchmark clickbench_1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ cleanup ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0 │ 3.19ms │ 3.13ms │ no change │
│ QQuery 1 │ 22.84ms │ 24.64ms │ 1.08x slower │
│ QQuery 2 │ 71.05ms │ 79.33ms │ 1.12x slower │
│ QQuery 3 │ 60.94ms │ 67.83ms │ 1.11x slower │
│ QQuery 4 │ 453.96ms │ 470.30ms │ no change │
│ QQuery 5 │ 1089.41ms │ 877.61ms │ +1.24x faster │
│ QQuery 6 │ 24.80ms │ 27.15ms │ 1.09x slower │
│ QQuery 7 │ 24.79ms │ 26.62ms │ 1.07x slower │
│ QQuery 8 │ 1001.42ms │ 1000.47ms │ no change │
│ QQuery 9 │ 1625.50ms │ 1653.77ms │ no change │
│ QQuery 10 │ 201.92ms │ 237.07ms │ 1.17x slower │
│ QQuery 11 │ 226.95ms │ 251.00ms │ 1.11x slower │
│ QQuery 12 │ 1023.53ms │ 791.79ms │ +1.29x faster │
│ QQuery 13 │ 1672.11ms │ 1435.32ms │ +1.16x faster │
│ QQuery 14 │ 1117.68ms │ 881.34ms │ +1.27x faster │
│ QQuery 15 │ 554.72ms │ 574.87ms │ no change │
│ QQuery 16 │ 1900.79ms │ 1840.62ms │ no change │
│ QQuery 17 │ 1884.34ms │ 1791.93ms │ no change │
│ QQuery 18 │ 4313.52ms │ 3465.75ms │ +1.24x faster │
│ QQuery 19 │ 43.34ms │ 53.79ms │ 1.24x slower │
│ QQuery 20 │ 1976.39ms │ 1850.32ms │ +1.07x faster │
│ QQuery 21 │ 2127.49ms │ 2010.90ms │ +1.06x faster │
│ QQuery 22 │ 4457.89ms │ 4859.12ms │ 1.09x slower │
│ QQuery 23 │ 8203.33ms │ 8276.48ms │ no change │
│ QQuery 24 │ 511.62ms │ 426.74ms │ +1.20x faster │
│ QQuery 25 │ 486.66ms │ 366.92ms │ +1.33x faster │
│ QQuery 26 │ 588.12ms │ 473.45ms │ +1.24x faster │
│ QQuery 27 │ 1692.65ms │ 1595.15ms │ +1.06x faster │
│ QQuery 28 │ 12640.52ms │ 12382.83ms │ no change │
│ QQuery 29 │ 407.09ms │ 447.76ms │ 1.10x slower │
│ QQuery 30 │ 968.69ms │ 770.84ms │ +1.26x faster │
│ QQuery 31 │ 1066.54ms │ 876.63ms │ +1.22x faster │
│ QQuery 32 │ 10732.72ms │ 8554.12ms │ +1.25x faster │
│ QQuery 33 │ 7478.91ms │ 6086.03ms │ +1.23x faster │
│ QQuery 34 │ 6516.26ms │ 6733.28ms │ no change │
│ QQuery 35 │ 1236.72ms │ 1207.24ms │ no change │
│ QQuery 36 │ 198.37ms │ 195.67ms │ no change │
│ QQuery 37 │ 99.59ms │ 99.33ms │ no change │
│ QQuery 38 │ 111.45ms │ 111.15ms │ no change │
│ QQuery 39 │ 432.10ms │ 432.58ms │ no change │
│ QQuery 40 │ 45.72ms │ 43.84ms │ no change │
│ QQuery 41 │ 41.19ms │ 41.65ms │ no change │
│ QQuery 42 │ 48.21ms │ 48.99ms │ no change │
└──────────────┴────────────┴────────────┴───────────────┘
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea as you said is to increase parallelism of the hash-repartition operator. target_partitions
in most cases / by design is currently bound to the CPU.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure the clickbench benchmark actually has different query plans? As we push down repartitioning to e.g. parquet/csv, we shouldn't introduce the round Robin repartitioning anymore in those plans, as target_partitions
requirement might already be met.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure as these benchmarks don't check query plans. I may need to manually check it. I will post back what I see later. However, my first idea might be, doesn't it mean mostly we don't need to add these round robin? I'm not sure if the addition of the round robin is driven by seeing better performance number.
Btw, I know this change might be a bit controversial before I change it. I change it because the query plan looks weird to have two-level repartitions always (round robin + hashing) and some of them looks unreasonable (round robin 9000 partitions?) so want to raise the discussion. If I cannot get consensus on the removal, I can restore it for sure and keep other change in this PR only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea as you said is to increase parallelism of the hash-repartition operator.
target_partitions
in most cases / by design is currently bound to the CPU.
Yea, however, I think in most cases hash repartitioning should already take not-single partitioned input (unless its input is intentionally partitioned into single partition). If the scans are well partitioned, later operators follow the partitioning.
Also, I don't see it can obviously affect performance so far (based on the benchmark result locally). Actually, I think mostly the addition round robin doesn't help but adds a little cost (the round robin partitioning is not zero-cost op).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I remember correctly, before partitions were pushed down to scans, adding round robin repartitioning after e.g. single files was better than doing it only in hash repartition. I think if you disable the file repartitioning or test with formats that don't support it you should be able to measure the effect.
The additional cost of round-robin is extremely small (invisible in profiles), especially compared to hash repartitioning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me restore this change first and make this PR (other parts) forward.
@@ -1198,37 +1191,25 @@ fn ensure_distribution( | |||
) | |||
.map( | |||
|(mut child, requirement, required_input_ordering, would_benefit, maintains)| { | |||
// Don't need to apply when the returned row count is not greater than 1: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on what value is compared with (batch_size
), I think this should be batch_size
instead of 1
.
child_context.distribution_connection = match child_context.plan.as_any() { | ||
plan_any if plan_any.is::<RepartitionExec>() => matches!( | ||
plan_any | ||
.downcast_ref::<RepartitionExec>() | ||
.unwrap() | ||
.partitioning(), | ||
Partitioning::RoundRobinBatch(_) | Partitioning::Hash(_, _) | ||
), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unnecessary matching. For all supported partitioning in RepartitionExec
, distribution_connection
should be true.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And changed to use utility functions like is_repartition
.
------------------------------------RepartitionExec: partitioning=Hash([s_suppkey@0], 4), input_partitions=4 | ||
--------------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | ||
----------------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, projection=[s_suppkey, s_nationkey], has_header=false | ||
------------------------------------RepartitionExec: partitioning=Hash([s_suppkey@0], 4), input_partitions=1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these plans in these tests are less optimal parallelism-wise as the (compute intensive) hash-repartition is now running on one partition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My reason to make the change: #8731 (comment)
if repartition_beneficial_stats { | ||
// Since hashing benefits from partitioning, add a round-robin repartition | ||
// before it: | ||
input = add_roundrobin_on_top(input, n_target)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks redundant as we already have it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reverted this first. I will clean these up in other PR.
@@ -1237,12 +1231,7 @@ fn ensure_distribution( | |||
child = add_spm_on_top(child); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Dandandan Do we need the additional round robin for single partition? This looks unnecessary as single partitioning is not costly as hash partitioning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me focus on repartition_file_scans
change first in this PR. I will open another PR to clean up these round robin stuffs.
@@ -147,7 +147,7 @@ WITH HEADER ROW | |||
LOCATION 'test_files/scratch/repartition_scan/csv_table/'; | |||
|
|||
query I | |||
select * from csv_table; | |||
select * from csv_table ORDER BY column1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes the test result stable, otherwise every time I rerun it locally, it gets different results...
@@ -3794,7 +3794,7 @@ select a, | |||
1 1 | |||
2 1 | |||
|
|||
# support scalar value in ORDER BY | |||
# support scalar value in ORDER BY |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simply ran all sqllogictests locally. Seems some space chars left there.
} else { | ||
true | ||
}; | ||
|
||
// When `repartition_file_scans` is set, attempt to increase | ||
// parallelism at the source. | ||
if repartition_file_scans { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also check for repartition_beneficial_stats
maybe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, looks like we have filled file scan statistics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like a good code change to me, but I am surprised that there are no (real) test changes needed other than parallelization_two_partitions
The changes to parallelization_two_partitions
are pretty subtle to me in that if they changed I might not catch the regression (the plan would simply look like it had an implicit rather than an explicit range)
Is there some way we can add an explain plan to prevent this code from being broken in the future?
Let me see how to add one. |
# disable round robin repartitioning | ||
statement ok | ||
set datafusion.optimizer.enable_round_robin_repartition = false; | ||
|
||
## Expect to see the scan read the file as "4" groups with even sizes (offsets) again | ||
query TT | ||
EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42; | ||
---- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I verified that this test case fails on current main
branch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me -- thank you @viirya
cc @mustafasrepo / @ozankabak as you have expertise in this area of the code I think
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @viirya LGTM!.
Thank you @alamb @Dandandan @mustafasrepo @ozankabak |
Which issue does this PR close?
Closes #.
Rationale for this change
This patch cleans up some code in
EnforceDistribution
rule. In particular, it also does one major changes by movingrepartition_file_scans
out of the check byenable_round_robin
.What changes are included in this PR?
Are these changes tested?
Existing tests
Are there any user-facing changes?
No