Mitigate Writer skewness when writing partitioned data with preferred partitioning enabled#14718
Mitigate Writer skewness when writing partitioned data with preferred partitioning enabled#14718arhimondr merged 2 commits intotrinodb:masterfrom gaurav8297:gaurav8297/partition_scale_writer_new
Conversation
core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
nit: consider introducing builder
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchangeSourceOperator.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
The name is bit generic. Could you also add javadoc?
There was a problem hiding this comment.
It feels like the granularity like this is a little bit of an overkill. With the task concurrency of 32 the implementation would have to re-balance 4096 partitions and keep the state for each partition in memory. Generally there's only a few partitions that are skewed. I wonder if we should start with something lower, maybe 8? (32 * 8 = 256)
There was a problem hiding this comment.
With 8 or some other low value, it could be possible that a few small partitions are scaled along with skewed ones because they belong to the same bucket. But, it might be okay since 256 is per node. WDYT? @sopel39
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/SystemPartitioningHandle.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java
Outdated
Show resolved
Hide resolved
core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorTableLayout.java
Outdated
Show resolved
Hide resolved
core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorTableLayout.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/ScaleWriterPartitioningHandle.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/ScaleWriterPartitioningHandle.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateScaledWritersUsage.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSourceFactory.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/ScaleWriterPartitioningHandle.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/ScaleWriterPartitioningHandle.java
Outdated
Show resolved
Hide resolved
|
Up to "Introduce ScaleWriterPartitioningHandle " lgtm % comments |
I think there are two reasons:
|
sopel39
left a comment
There was a problem hiding this comment.
lgtm % Add scaleWriters flag in PartitioningHandle
core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/PartitioningHandle.java
Outdated
Show resolved
Hide resolved
lgtm until Add scaleWriters flag in PartitioningHandle
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
MAX_PARTITIONS_TO_REBALANCE_PER_WRITER * 32 = 4096, which means in worst case writerPartitionIdToRowCount will be as big as page.
Consider reversing the relationship, e.g: partitionRebalancer fetching writerPartitionIdToRowCount from ScaleWriterPartitioningExchanger instead.
There was a problem hiding this comment.
I thought about this but it just makes the code more complex. I wonder if it's worth the effort.
There was a problem hiding this comment.
I reversed the relationship. PTAL
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
In trino, the CPU time is almost similar for all related queries. So to better understand, I created this doc containing CPU utilization from the AWS console. https://docs.google.com/document/d/1Bg5z-EzavtkXnBfhNLdJE3ngXRgrYrmmTVRJwiMhpTE/edit?usp=sharing |
|
@arhimondr @sopel39 PTAL again |
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
arhimondr
left a comment
There was a problem hiding this comment.
LGTM % a couple of nits
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
|
@sopel39 PTAL again |
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/PartitioningScheme.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/SystemPartitioningHandle.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
nit: it might be clearer if you had a field like lastScaleUpPhysicalWrittenBytes and didn't set physicalWrittenBytesAtLastRebalance to 0
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/test/java/io/trino/operator/exchange/TestLocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/test/java/io/trino/operator/exchange/TestUniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/test/java/io/trino/operator/exchange/TestUniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/test/java/io/trino/operator/exchange/TestUniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/test/java/io/trino/operator/exchange/TestUniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/test/java/io/trino/operator/exchange/TestLocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java
Outdated
Show resolved
Hide resolved
First figure out if there's a skewness across writers in a node, then find the biggest partitions in the skewed writers and scale them across writers which are on the lower end i.e. written smallest amount of data. Scaling will only happen if the skewness is above 70% and the partition to be scaled has written atleast writerMinSize since last scale up.
Description
Issue: #13379
Problem
Improve the performance of partitioned writes specifically in case writers/partitions are skewed.
Right now, prefer-partitioning only works if you have statistics and the number of partitions is greater than preferred-write-partitioning-min-number-of-partitions (default to 50). However, we know that stats are not always guaranteed to be present in which case partitioned writes will go through from an inefficient route. But with scaling, we could enable prefer-partitioning for any number of partitions thus we don't have to rely on statistics.
Benchmarks
Cluster with 6 worker nodes
1.) Single partition (260M rows)
1:31 mins18:34 mins2:55 mins2.) 8 partitions (600M rows)
2:11 mins6:23 mins1:48 mins3.) 2000+ partitions with almost no skewness (600M rows)
13:23 mins11:32 mins10:55 mins4.) 2000+ partitions with 6 skewed partitions (2.74B rows)
22:18 mins(400GB peak memory)55:26 mins(76.1GB peak memory)20:33 mins(79.5GB peak memory)In experiments 3 and 4, the finishing time is also substantial and included in the measurements (almost +9 mins)
Non-technical explanation
Release notes
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text: