Implement runtime adaptive partitioning for FTE #18349
losipiuk merged 5 commits into trinodb:master
Conversation
Force-pushed 01856c3 to 942e9cc
Without reading deeply, it looks like the repartitioning logic should be extracted to a separate method updateStagesPartitioning, and then called either from optimizePlan (preferably) or from schedule (if for some reason it needs to be there).
Hmm, that means I need to call isReadyForExecution twice. Not sure what is more important --- code organization or efficiency.
...-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java (resolved)
It is not obvious to me why partitioned sources are always empty - can you add an explanatory comment in the code?
Partitioned sources will always be empty --- but remote partitioned sources are not. The input to the newly inserted stage will only be from the runtimeAdaptiveRemoteSourceNode.
@sopel39 can you verify if we can always just insert an extra stage which does the additional partitioning? Are we sure that the partitioningScheme taken from the sourceFragment will be able to operate on the output of the sourceFragment?
@losipiuk I'm not sure what this code is about. An extra stage will always hurt performance. Is it maybe possible to improve the existing operators in the source stage instead?
@sopel39: updated the commit message. Basically, we found that performance suffers when the number of partitions is large, and we see that 200 partitions is enough for most FTE cases. So small queries will just use 200 partitions; for huge queries, if we see that the size of an intermediate stage is large, then we change the partition count (and therefore need to insert extra shuffle stages).
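The decision described above --- start with a small partition count and only grow it when a stage's observed input is large --- can be sketched roughly as follows. This is an illustrative toy, not Trino's actual implementation; the class name, constants, and thresholds are all made up for the example.

```java
// Hypothetical sketch: pick a partition count so each partition processes a
// bounded amount of data, starting from a small default. Names and thresholds
// are illustrative, not Trino's.
public class AdaptivePartitionCountSketch {
    static final int DEFAULT_PARTITION_COUNT = 200;      // enough for most queries
    static final int MAX_PARTITION_COUNT = 1000;         // upper bound after adaptation
    static final long TOTAL_TARGET_BYTES = 500L << 30;   // assumed comfortable total at 200 partitions

    static int partitionCountFor(long estimatedInputBytes) {
        long perPartitionTarget = TOTAL_TARGET_BYTES / DEFAULT_PARTITION_COUNT;
        // ceil-divide the estimated input by the per-partition target
        long needed = (estimatedInputBytes + perPartitionTarget - 1) / perPartitionTarget;
        return (int) Math.min(MAX_PARTITION_COUNT, Math.max(DEFAULT_PARTITION_COUNT, needed));
    }

    public static void main(String[] args) {
        System.out.println(partitionCountFor(1L << 30));    // small query stays at the default
        System.out.println(partitionCountFor(2000L << 30)); // huge query gets a larger count
    }
}
```

When the chosen count differs from the current one, the scheduler would then need to insert the extra shuffle stages discussed above before the affected stage starts.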
I would suggest having a separate test which enforces that. It could be the whole AbstractTestAggregations.java, and maybe more, but with a different engine configuration.
Make sure to add assertions which verify that runtime repartitioning actually happens, so we know that the test actually provides coverage.
I would suggest having a separate test which enforces that
Yes, I can do that. Probably for both AbstractTestAggregations.java and AbstractTestJoins.java.
Make sure to add assertions which verify that runtime repartitioning actually happens, so we know that the test actually provides coverage.
Hmm, I don't see an easy way to do this.
core/trino-main/src/main/java/io/trino/SystemSessionProperties.java (resolved)
Force-pushed e7a8198 to 27dcf7f
Force-pushed 95902ab to 08d0879
...-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java (resolved)
Force-pushed fa224fd to a98cb7f
core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java (resolved)
Let's change the signature of private SubPlan optimizePlan(SubPlan plan) to
Optional<SubPlan> optimizePlan()
We already exploit the fact that the current presorted plan is cached in planInTopologicalOrder. With the current code structure it is not obvious that the plan passed as an argument and the plan shared in state match each other.
Hmm I don't quite understand. When will we return Optional.empty()?
I was thinking of returning Optional.empty() if the plan should not change. It requires some call-flow refactoring. Can be a followup.
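The proposed signature change can be illustrated with a toy sketch. SubPlan here is a stand-in record, not Trino's real class, and the threshold and field names are invented for the example; the point is only that the method reads the cached plan from state and returns Optional.empty() when nothing should change.

```java
// Toy sketch of the proposed Optional<SubPlan> optimizePlan() signature.
// SubPlan, planInTopologicalOrder, and the threshold are illustrative stand-ins.
import java.util.Optional;

public class OptimizePlanSketch {
    record SubPlan(int partitionCount) {}

    private SubPlan cachedPlan = new SubPlan(200); // plan shared in scheduler state
    private long observedInputBytes;

    // Empty result means "plan unchanged" --- callers can no longer confuse
    // an argument plan with the cached one.
    Optional<SubPlan> optimizePlan() {
        if (observedInputBytes <= 100L << 30) {
            return Optional.empty(); // no repartitioning needed
        }
        return Optional.of(new SubPlan(1000)); // rewritten plan with a larger count
    }

    public static void main(String[] args) {
        OptimizePlanSketch scheduler = new OptimizePlanSketch();
        scheduler.observedInputBytes = 1L << 30;
        System.out.println(scheduler.optimizePlan().isPresent());
        scheduler.observedInputBytes = 200L << 30;
        System.out.println(scheduler.optimizePlan().isPresent());
    }
}
```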
isReadyForExecutionCache.computeIfAbsent(...) ?
We probably can put the logic of how the cache is used into isReadyForExecution itself and just use the method. And clear the cache on each scheduler call.
isReadyForExecutionCache.computeIfAbsent(...) ?
The cache is fresh at the beginning.
We probably can put the logic of how the cache is used into isReadyForExecution itself and just use the method.
I took a look and found that it only complicates the logic of isReadyForExecution. The current version is more concise.
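The computeIfAbsent pattern discussed in this thread, with the cache cleared at the start of each scheduling loop, might look like the following toy. The stage-id type, the readiness predicate, and the counter are all illustrative stand-ins, not Trino code.

```java
// Toy sketch of a per-scheduling-loop readiness cache. The predicate and
// stage-id representation are placeholders, not Trino's actual logic.
import java.util.HashMap;
import java.util.Map;

public class ReadinessCacheSketch {
    private final Map<Integer, Boolean> isReadyForExecutionCache = new HashMap<>();
    private int expensiveChecks; // counts how often the real check runs

    boolean isReadyForExecution(int stageId) {
        // computeIfAbsent runs the expensive check at most once per loop
        return isReadyForExecutionCache.computeIfAbsent(stageId, id -> {
            expensiveChecks++;
            return id % 2 == 0; // placeholder readiness predicate
        });
    }

    void beginSchedulerIteration() {
        isReadyForExecutionCache.clear(); // cache is fresh at the beginning of each loop
    }

    public static void main(String[] args) {
        ReadinessCacheSketch s = new ReadinessCacheSketch();
        s.beginSchedulerIteration();
        s.isReadyForExecution(4); // computed
        s.isReadyForExecution(4); // served from the cache
        System.out.println(s.expensiveChecks);
    }
}
```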
Is the assumption that a stage which requires repartitioning will never be started speculatively?
OK, so this review comment is not consistent with the code :) If a stage has already started, then we will not be able to insert repartition stages before it.
Yeah - I know - that is why I ask if that is not a problem. That is, can we end up not being able to fix the query (bump the partition count) because some stage already started executing speculatively?
That's not possible, since we always check the input data size of a stage before it's started, and this applies to speculatively started stages as well.
You are probably asking about stages whose parents were speculatively started. In that case we use the extrapolated estimated output data size to determine whether we want to change the partition count at runtime.
...-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java (resolved)
Hmm.
Can it be that we have something like
root
|
X (output partitioning: FIXED_BROADCAST_DISTRIBUTION)
|
...
|
Y
|
Z (output partitioning: HASH)
if Z's output is large now, then processing of Y may fail.
If such a tree shape is not possible, can we validate that it actually does not happen with an assertion?
If the case you mentioned above happens, how can the planner determine that the final output can be broadcast? My guess is it's determined based on input data sizes and filter ratio, which makes your suspicion an impossible scenario.
Of course, we can always be conservative here and apply runtime adaptive partitioning recursively.
Yeah - it feels not very probable - but I also don't see a big gain in special-casing this. If you want to keep the code, please explain in a code comment the rationale and why we believe it is OK to do so.
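The behavior under discussion --- recursing through the fragment tree but not descending into subtrees whose output is broadcast --- can be sketched as a toy traversal. Fragment, the enum, and the visited-order tracking are stand-ins invented for the example, not the RuntimeAdaptivePartitioningRewriter's real types; the sketch just makes losipiuk's question concrete, since Z below the broadcast fragment X is never visited.

```java
// Toy sketch: recursively override the partition count, skipping any subtree
// whose root has broadcast output partitioning. All types are illustrative.
import java.util.List;

public class BroadcastSkipSketch {
    enum Partitioning { HASH, BROADCAST }
    record Fragment(String id, Partitioning outputPartitioning, List<Fragment> children) {}

    static void overridePartitionCount(Fragment fragment, int newCount, StringBuilder visited) {
        if (fragment.outputPartitioning() == Partitioning.BROADCAST) {
            return; // skip the whole broadcast subtree
        }
        visited.append(fragment.id()); // here the real rewriter would apply newCount
        for (Fragment child : fragment.children()) {
            overridePartitionCount(child, newCount, visited);
        }
    }

    public static void main(String[] args) {
        Fragment z = new Fragment("Z", Partitioning.HASH, List.of());
        Fragment x = new Fragment("X", Partitioning.BROADCAST, List.of(z));
        Fragment root = new Fragment("R", Partitioning.HASH, List.of(x));
        StringBuilder visited = new StringBuilder();
        overridePartitionCount(root, 1000, visited);
        System.out.println(visited); // Z under the broadcast X is not rewritten
    }
}
```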
core/trino-main/src/main/java/io/trino/sql/planner/RuntimeAdaptivePartitioningRewriter.java (resolved)
...-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java (resolved)
nit: is this test specifically about testing the skipping of broadcast subtrees, or does it generally test the rewriter mechanism? Looks like the latter. Change the name?
It's actually both. For testCreateTableAs it's the same; it also generally tests the rewriter mechanism.
...lerant-tests/src/test/java/io/trino/faulttolerant/TestOverridePartitionCountRecursively.java (resolved)
...-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java (resolved)
.../trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java (resolved)
Having a high number of partitions hurts the performance of FTE quite seriously, which presents a tradeoff between scalability and performance. This PR tries to address this problem by calculating the total input size of each stage at runtime and changing the number of partitions at runtime by inserting extra shuffles into upstream stages. With this change, users should be able to get the best of both worlds --- the majority of queries can finish efficiently with a small partition count, and huge queries will still be able to finish with a larger partition count.
`getStageInfo` is called asynchronously, so it's possible that the plan we get is a stale one even though the stages have been updated. To fix the race, we just switch the order, so that the plan will not be staler than the stageInfos.
Force-pushed a98cb7f to cb7936a
LGTM. Some questions:
This looks like it needs a release note. Can you propose one? @linzebing cc @losipiuk
@colebow: discussing offline.
TODO:
- EventDrivenFaultTolerantQueryScheduler to separate classes/files
- overridePartitionCountRecursively
- isReadyForExecution calls in a scheduling loop (probably won't do in this PR. Tradeoff between code organization and efficiency. Can't see an elegant way to avoid this)