Handle skewness in Remote Exchange during writes #16802
@sopel39 Please take a look at some early benchmarks. I'm looking into how we can scale up faster.
core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java
core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java
core/trino-main/src/main/java/io/trino/operator/DriverContext.java
core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java
lgtm % comments
core/trino-main/src/main/java/io/trino/SystemSessionProperties.java
core/trino-main/src/main/java/io/trino/operator/DriverContext.java
core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java
core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionHandler.java
core/trino-main/src/test/java/io/trino/operator/output/TestSkewedPartitionRebalancer.java
sopel39
left a comment
lgtm % comments
core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java
core/trino-main/src/main/java/io/trino/operator/output/PartitionedOutputOperator.java
nit: it might be easier to just have abstract bucketIds and then map bucket to task when taskId is really needed
core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
sopel39
left a comment
lgtm % comments
core/trino-main/src/main/java/io/trino/operator/output/PartitionedOutputOperator.java
core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
core/trino-main/src/test/java/io/trino/operator/output/TestSkewedPartitionRebalancer.java
core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java
core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
nit: that is one, but also if node count is 11 and bucket count is 8, then it will effectively mean that each node will write every bucket (so it's effectively round robin from the start).
awesome job!
This commit introduces a SkewedPartitionRebalancer, which distributes big or skewed partitions across the available tasks to improve the performance of partitioned writes. The rebalancer initializes a set of buckets for each task, based on a given taskBucketCount, and then tries to distribute partitions uniformly across those buckets. This addresses two problems:
1. Skewness across tasks.
2. Scaling a few big partitions across tasks even if there is no skewness among them.
This essentially speeds up local scaling without much impact on overall resource utilization.

Example:

Before: 3 tasks, 3 buckets per task, and 2 skewed partitions
Task1             Task2             Task3
Bucket1 (Part 1)  Bucket1 (Part 2)  Bucket1
Bucket2           Bucket2           Bucket2
Bucket3           Bucket3           Bucket3

After rebalancing:
Task1             Task2             Task3
Bucket1 (Part 1)  Bucket1 (Part 2)  Bucket1 (Part 1)
Bucket2 (Part 2)  Bucket2 (Part 1)  Bucket2 (Part 2)
Bucket3           Bucket3           Bucket3
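The idea in the commit message can be illustrated with a minimal sketch. This is not the Trino implementation: the class name `RebalancerSketch`, its methods, and the `minBytesPerTask` threshold are all hypothetical, and the sketch tracks tasks only, leaving out the per-task buckets. It assumes each partition starts on a single task and that a partition whose per-task share of written bytes reaches the threshold is additionally assigned to the least-loaded task that does not yet write it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified sketch; NOT the actual Trino SkewedPartitionRebalancer.
// It tracks tasks only and leaves out the per-task buckets described above.
class RebalancerSketch {
    private final int taskCount;
    // For each partition, the tasks currently allowed to write it.
    private final List<List<Integer>> partitionTasks = new ArrayList<>();
    private final long[] partitionBytes;
    private final int[] nextWriter; // round-robin cursor per partition

    RebalancerSketch(int partitionCount, int taskCount) {
        this.taskCount = taskCount;
        this.partitionBytes = new long[partitionCount];
        this.nextWriter = new int[partitionCount];
        for (int p = 0; p < partitionCount; p++) {
            List<Integer> tasks = new ArrayList<>();
            tasks.add(p % taskCount); // initially one task per partition
            partitionTasks.add(tasks);
        }
    }

    // Picks the next writer task for a partition, round robin over its assigned tasks.
    int getTaskId(int partition) {
        List<Integer> tasks = partitionTasks.get(partition);
        int index = Math.floorMod(nextWriter[partition]++, tasks.size());
        return tasks.get(index);
    }

    void addBytes(int partition, long bytes) {
        partitionBytes[partition] += bytes;
    }

    List<Integer> getAssignedTasks(int partition) {
        return List.copyOf(partitionTasks.get(partition));
    }

    // Assumed policy: scale a partition to one more task when its per-task
    // share of bytes has reached minBytesPerTask.
    void rebalance(long minBytesPerTask) {
        // Estimate task load by splitting each partition evenly over its tasks.
        long[] taskBytes = new long[taskCount];
        for (int p = 0; p < partitionBytes.length; p++) {
            List<Integer> tasks = partitionTasks.get(p);
            for (int t : tasks) {
                taskBytes[t] += partitionBytes[p] / tasks.size();
            }
        }
        for (int p = 0; p < partitionBytes.length; p++) {
            List<Integer> tasks = partitionTasks.get(p);
            long oldShare = partitionBytes[p] / tasks.size();
            if (oldShare < minBytesPerTask || tasks.size() == taskCount) {
                continue; // too small to scale, or already on every task
            }
            // Least-loaded task that does not write this partition yet.
            int target = -1;
            for (int t = 0; t < taskCount; t++) {
                if (!tasks.contains(t) && (target == -1 || taskBytes[t] < taskBytes[target])) {
                    target = t;
                }
            }
            long newShare = partitionBytes[p] / (tasks.size() + 1);
            for (int t : tasks) {
                taskBytes[t] += newShare - oldShare; // existing writers shed load
            }
            tasks.add(target);
            taskBytes[target] += newShare;
        }
    }
}
```

With 2 partitions on 3 tasks, 100 bytes recorded for each partition, and a threshold of 50, a single `rebalance` pass in this sketch assigns partition 0 to tasks 0 and 2 and partition 1 to tasks 1 and 0, so both skewed partitions gain a second writer.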
There is a relevant failure:
Fixed!
Description
Benchmarks (6 worker nodes)
1.) Single partition (1.2B rows)
1:41 mins (cpu: 1.97h) | 7:54 mins (cpu: 1.97h) | 1:45 mins (cpu: 2.05h)
2.) 3 partitions (514M rows)
55.78 mins (cpu: 50.29m, peak mem: 28.0GB) | 1:59 mins (cpu: 52.43m, peak mem: 16.7GB) | 51.08 secs (cpu: 52.45m, peak mem: 23.4GB)
3.) 2000+ partitions with almost no skewness (600M rows)
2:34 mins (cpu: 52.12m, peak mem: 100GB) | 3:05 mins (cpu: 55.32m, peak mem: 63.5GB) | 2:55 mins (cpu: 55.20m, peak mem: 70.7GB)
Additional context and related issues
Release notes
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text: