Local scale writers for partitioned data#14140
gaurav8297 wants to merge 6 commits into trinodb:master from gaurav8297:gaurav8297/partition_scale_writer
Conversation
core/trino-main/src/main/java/io/trino/SystemSessionProperties.java
core/trino-main/src/main/java/io/trino/operator/PartitionedTableWriterOperator.java
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
core/trino-main/src/main/java/io/trino/operator/PartitionedTableWriterOperator.java
Benchmarks (more than 2x improvement):
Before (scaling disabled):
After (scaling enabled):
Pass partition channel types directly to LocalExchange instead of all types and then filtering inside the constructor.
core/trino-main/src/main/java/io/trino/operator/PartitionFunctionFactory.java
core/trino-main/src/main/java/io/trino/operator/DriverContext.java
core/trino-main/src/main/java/io/trino/operator/OperatorContext.java
How is this value related to scaleWritersMaxWriterCount?
core/trino-main/src/main/java/io/trino/operator/PartitionedTableWriterOperator.java
This is not very clear. What's the difference between artificial partitions and standard partitions?
Also, the actual partitions created (in the case of Hive and Iceberg) are based on the values in the partitioning columns. I don't think this config influences that, right?
There is a lot of overlap in functionality and a lot of code copied from TableWriterOperator. Can this be somehow reused? This could improve the readability of this class by removing non-essential functionality.
Consider adding WriterUtils class for shared code
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
nit: I would drop bucketSize and use positionsList.size() directly, the name is confusing.
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
nit: you could add a validation that there indeed aren't more operators producing stats
core/trino-main/src/main/java/io/trino/operator/DriverContext.java
core/trino-main/src/main/java/io/trino/operator/OperatorContext.java
core/trino-main/src/main/java/io/trino/operator/OperatorContext.java
Consider putting all insert operators in a separate package.
core/trino-main/src/main/java/io/trino/operator/PartitionedTableWriterOperator.java
core/trino-main/src/main/java/io/trino/operator/PartitionedTableWriterOperator.java
core/trino-main/src/main/java/io/trino/operator/PipelineContext.java
nit: consider testing with system partitioning
core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorPartitioningHandle.java
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Why do we start here from -1 and not from 0? Then adding 1 before the modulo would not be required.
"then adding 1 before modulo would not be required" — what do you mean?
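The two variants under discussion produce the same round-robin sequence; a minimal sketch (all names here are illustrative, not the PR's actual code) shows why starting from 0 and incrementing after use makes the `+ 1` before the modulo unnecessary:

```java
import java.util.Arrays;

public class RoundRobinSketch
{
    // Variant from the PR: the counter starts at -1, so +1 is needed before the modulo.
    public static int[] startFromMinusOne(int writerCount, int picks)
    {
        int index = -1;
        int[] chosen = new int[picks];
        for (int i = 0; i < picks; i++) {
            index = (index + 1) % writerCount;
            chosen[i] = index;
        }
        return chosen;
    }

    // Reviewer's suggestion: start from 0 and increment after use; no +1 needed.
    public static int[] startFromZero(int writerCount, int picks)
    {
        int index = 0;
        int[] chosen = new int[picks];
        for (int i = 0; i < picks; i++) {
            chosen[i] = index % writerCount;
            index++;
        }
        return chosen;
    }

    public static void main(String[] args)
    {
        // Both variants yield the same sequence: 0, 1, 2, 0, 1
        assert Arrays.equals(startFromMinusOne(3, 5), startFromZero(3, 5));
        System.out.println(Arrays.toString(startFromZero(3, 5)));
    }
}
```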
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
How will this detect that we have a heavily used partition? Same with physicalWrittenBytes >= writerMinSize * maxWriterCount: after some time (and written size) we will just start more writers whenever the memory buffers are filled.
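To make the concern concrete, here is a hedged sketch of a scaling condition of the shape quoted in the comment. The names (physicalWrittenBytes, writerMinSize) follow the comment; the buffer-fullness check and the exact predicate are assumptions, not the PR's actual logic:

```java
public class ScalingSketch
{
    // Scale up only when buffers are full, we are below the writer cap,
    // and enough data has been written to justify the current writer count.
    public static boolean shouldScaleUp(
            long physicalWrittenBytes,
            long writerMinSize,
            int currentWriterCount,
            int maxWriterCount,
            boolean buffersFull)
    {
        return buffersFull
                && currentWriterCount < maxWriterCount
                && physicalWrittenBytes >= writerMinSize * currentWriterCount;
    }

    public static void main(String[] args)
    {
        long writerMinSize = 32L * 1024 * 1024; // e.g. 32MB per writer
        // Not enough data written yet for 2 writers: do not scale.
        assert !shouldScaleUp(writerMinSize, writerMinSize, 2, 8, true);
        // Enough data and buffers are full: scale up.
        assert shouldScaleUp(2 * writerMinSize, writerMinSize, 2, 8, true);
    }
}
```

The reviewer's point is that a predicate like this reacts to total written bytes and buffer pressure, not to per-partition skew, so it would eventually add writers regardless of which partition is hot.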
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Is this related to a count? It is more like getting the next writer id/pointer.
Is it a general rule that we use Optional.isPresent with a ternary instead of map/orElseGet?
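For readers unfamiliar with the style question, a small illustrative sketch of the two equivalent forms being compared (the method names and example values here are made up):

```java
import java.util.Optional;

public class OptionalStyleSketch
{
    // Style 1: isPresent() plus a ternary.
    public static int ternaryStyle(Optional<String> value)
    {
        return value.isPresent() ? value.get().length() : 0;
    }

    // Style 2: map/orElseGet, which avoids the explicit get() call.
    public static int mapStyle(Optional<String> value)
    {
        return value.map(String::length).orElseGet(() -> 0);
    }

    public static void main(String[] args)
    {
        assert ternaryStyle(Optional.of("abc")) == mapStyle(Optional.of("abc"));
        assert ternaryStyle(Optional.empty()) == 0;
        assert mapStyle(Optional.empty()) == 0;
    }
}
```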
The only difference with respect to TableWriterOperator is that it creates a separate page sink per partition and reports partition-level physicalWrittenBytes, which can be used for scaling skewed partitions at the local exchange.
This new method helps the engine identify whether writer scaling per partition is allowed.
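A hedged sketch of the idea described above: track written bytes per partition so that skewed partitions can be detected and scaled. All names here are illustrative, not the PR's actual API:

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionWriterSketch
{
    // One counter per partition, standing in for one page sink per partition.
    private final Map<Integer, Long> writtenBytesPerPartition = new HashMap<>();

    public void recordWrite(int partitionId, long bytes)
    {
        writtenBytesPerPartition.merge(partitionId, bytes, Long::sum);
    }

    // A skewed partition is one whose written bytes exceed some threshold,
    // making it a candidate for scaling out to more writers.
    public boolean isSkewed(int partitionId, long threshold)
    {
        return writtenBytesPerPartition.getOrDefault(partitionId, 0L) >= threshold;
    }

    public static void main(String[] args)
    {
        PartitionWriterSketch sketch = new PartitionWriterSketch();
        sketch.recordWrite(0, 100);
        sketch.recordWrite(0, 400);
        sketch.recordWrite(1, 50);
        assert sketch.isSkewed(0, 500);
        assert !sketch.isSkewed(1, 500);
    }
}
```

Reporting these per-partition byte counts to the local exchange is what lets the exchanger route a hot partition to additional writers, which a single aggregate physicalWrittenBytes counter cannot do.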
@@ -88,7 +88,7 @@ public LocalExchange(
        int defaultConcurrency,
Refactor LocalExchange -> Pass partition channel types directly to LocalExchange
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

public abstract class AbstractTableWriterOperator
Add a commit that renames the current operator first, then add a commit that makes it abstract.
    updateWrittenBytes();
}

protected abstract List<ListenableFuture<?>> writePage(Page page);
Group all abstract protected methods below the public methods.
        bucketToPartition);
}

public Function<Page, Page> createPartitionPagePreparer(PartitioningHandle partitioning, List<Integer> partitionChannels)
public class SelectChannels
        implements PartitionFunction
{
    public SelectChannels(PartitionFunction delegate, PartitioningHandle partitioning, List<Integer> partitionChannels)
    {
        ..
    }
}
}

// Specifies if writing to partition has to be performed by a single writer instance
default boolean isSingleWriterPerPartition()
The partitioning handle is insert-agnostic. After some thought, it's better to put it in ConnectorMetadata:
boolean ConnectorMetadata#isSingleWriterPerPartition(ConnectorSession session, ConnectorPartitioningHandle partitioningHandle)
public static final String SCALE_WRITERS = "scale_writers";
public static final String TASK_SCALE_WRITERS_ENABLED = "task_scale_writers_enabled";
public static final String TASK_SCALE_WRITERS_MAX_WRITER_COUNT = "task_scale_writers_max_writer_count";
public static final String TASK_SCALE_WRITERS_PARTITION_COUNT = "task_scale_writers_partition_count";
I think it's very technical. Let's just keep it as config (maybe) or just some high enough number (like 10k).
        return physicalWrittenBytes + value;
    }));
}
return result;
return ImmutableMap.copyOf(...)
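The suggestion is to return an immutable snapshot rather than the mutable accumulator. The reviewer means Guava's ImmutableMap.copyOf; this sketch uses the JDK's Map.copyOf to illustrate the same idea without the Guava dependency (method names here are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class ImmutableReturnSketch
{
    // Return an immutable copy so callers cannot mutate internal state.
    public static Map<String, Long> aggregate(Map<String, Long> mutableResult)
    {
        return Map.copyOf(mutableResult);
    }

    public static void main(String[] args)
    {
        Map<String, Long> result = new HashMap<>();
        result.put("writer-0", 100L);
        Map<String, Long> snapshot = aggregate(result);
        try {
            snapshot.put("writer-1", 200L); // throws: the snapshot is immutable
            assert false;
        }
        catch (UnsupportedOperationException expected) {
            // mutation is rejected, as intended
        }
    }
}
```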
Closing this in favour of #14718
The approach and problem are documented here: #13379
Note: A few tests are still pending, which I'm working on.
Description
improvement
core query engine
Increase the performance of partitioned writes in the presence of data skew.
Related issues, pull requests, and links
Documentation
( ) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
( ) No release notes entries required.
( ) Release notes entries required with the following suggested text: