PartitionedOutputOperator RLE blocks support #11289
skrzypo987
left a comment
Skimmed, we'll talk offline
I re-ran the TPC-DS and TPC-H benchmarks, and the second run shows neither regressions nor improvements. This is what I expected initially, and it means the first run's result was a flake or benchmark variability.

Please rebase.
6df3c97 to e66a1b4
408e205 to d2d92b5
There are still unit tests coming for some of the changes, but I think it's ready for the first round of review.
d2d92b5 to 51563f0
Fixed the by-row processing bug.
51563f0 to fc399fa
lukasz-stec
left a comment
Comments addressed.
JMH results are still pending. I ran them on older hardware and got strange results, so I need to re-run them.
fc399fa to 206658a
lukasz-stec
left a comment
some quick comments
lukasz-stec
left a comment
another set of comments addressed
3e357d1 to db5538b
What is the purpose of this test? Why did all rows land in partition number 0?

This tests that the output of the PagePartitioner contains exactly the input when all partitions are concatenated (so regardless of the partitioning).
Not all rows go to partition 0. See the comment below.

> this tests that the output of the PagePartitioner contains exactly the input when all partitions are concatenated (so regardless of the partitioning).
testProducesSameNumberOfRows?

@sopel39 this tests that the actual values in the output page match the input page/block, so maybe testOutputEqualsInput?
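The property discussed in this thread can be sketched without any Trino types. The following is only an illustration under stated assumptions: `PartitionRoundTripSketch`, `partition`, and `outputEqualsInput` are hypothetical names, rows are plain `Long` values, and the hash-based partition function is a stand-in for whatever the real PagePartitioner uses. Since concatenating partitions does not preserve the input row order, the comparison is done on sorted copies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; not the actual TestPagePartitioner code.
final class PartitionRoundTripSketch {
    // Stand-in partitioner: routes each row by a hash of its value.
    static List<List<Long>> partition(List<Long> input, int partitionCount) {
        List<List<Long>> partitions = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
        for (long value : input) {
            int target = Math.floorMod(Long.hashCode(value), partitionCount);
            partitions.get(target).add(value);
        }
        return partitions;
    }

    // True when the concatenated partitions hold exactly the input rows,
    // regardless of which partition each row landed in.
    static boolean outputEqualsInput(List<Long> input, List<List<Long>> partitions) {
        List<Long> concatenated = new ArrayList<>();
        partitions.forEach(concatenated::addAll);
        List<Long> expected = new ArrayList<>(input);
        Collections.sort(expected);
        Collections.sort(concatenated);
        return expected.equals(concatenated);
    }

    public static void main(String[] args) {
        List<Long> input = List.of(5L, 3L, 8L, 3L, 0L, -1L);
        System.out.println(outputEqualsInput(input, partition(input, 4)));
    }
}
```

A check like this is stronger than comparing row counts only, which is the distinction behind the two proposed test names.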
95d0148 to 5989ef6
To avoid block type pollution, use manual dispatch over the Block class instead of class isolation.
Before invoking the dedicated PositionsAppender, the input Block is flattened, so the dedicated appender gets either a flat Block or an RLE Block with a flat value.
Each dedicated, type-specific PositionsAppender is implemented without using BlockBuilder, to increase performance and to avoid JIT inlining issues.

Microbenchmark results (BenchmarkPartitionedOutputOperator.addPage; enableCompression = FALSE and positionCount = 8192 in every row):

| channelCount | nullRate | partitionCount | type | baseline no isolation | rle no isolation | rle % |
|---|---|---|---|---|---|---|
| 1 | 0 | 2 | BIGINT | 1301.217 | 485.526 | -62.68677707 |
| 1 | 0 | 16 | BIGINT | 1324.132 | 541.814 | -59.08157193 |
| 1 | 0 | 16 | BIGINT_PARTITION_CHANNEL_SKEWED | 1338.207 | 548.653 | -59.00088701 |
| 1 | 0 | 16 | DICTIONARY_BIGINT | 1552.893 | 649.265 | -58.18997188 |
| 1 | 0 | 16 | BIGINT_PARTITION_CHANNEL_20_PERCENT | 845.798 | 326.112 | -61.44327605 |
| 1 | 0 | 16 | BIGINT_PARTITION_CHANNEL_DICTIONARY_20_PERCENT | 656.293 | 437.755 | -33.2988467 |
| 1 | 0 | 16 | BIGINT_PARTITION_CHANNEL_DICTIONARY_50_PERCENT | 759.82 | 457.409 | -39.80034745 |
| 1 | 0 | 16 | BIGINT_PARTITION_CHANNEL_DICTIONARY_80_PERCENT | 850.399 | 472.21 | -44.47194787 |
| 1 | 0 | 16 | BIGINT_PARTITION_CHANNEL_DICTIONARY_100_PERCENT | 991.474 | 507.408 | -48.82286374 |
| 1 | 0 | 16 | BIGINT_PARTITION_CHANNEL_DICTIONARY_100_PERCENT_MINUS_1 | 919.57 | 480.823 | -47.71219157 |
| 1 | 0 | 16 | BIGINT_PARTITION_CHANNEL_RLE | 825.266 | 359.845 | -56.39648307 |
| 1 | 0 | 16 | BIGINT_PARTITION_CHANNEL_RLE_NULL | 3.438 | 1.603 | -53.37405468 |
| 1 | 0 | 16 | LONG_DECIMAL | 1560.603 | 633.44 | -59.41056117 |
| 1 | 0 | 16 | DICTIONARY_LONG_DECIMAL | 1983.322 | 757.596 | -61.80166408 |
| 1 | 0 | 16 | INTEGER | 1362.036 | 508.811 | -62.64335157 |
| 1 | 0 | 16 | DICTIONARY_INTEGER | 1548.895 | 585.053 | -62.22771718 |
| 1 | 0 | 16 | SMALLINT | 1290.688 | 482.476 | -62.61869639 |
| 1 | 0 | 16 | DICTIONARY_SMALLINT | 1531.761 | 564.357 | -63.15632791 |
| 1 | 0 | 16 | BOOLEAN | 1371.602 | 493.602 | -64.01273839 |
| 1 | 0 | 16 | DICTIONARY_BOOLEAN | 1451.336 | 595.603 | -58.96174284 |
| 1 | 0 | 16 | VARCHAR | 4027.003 | 3181.661 | -20.99183934 |
| 1 | 0 | 16 | DICTIONARY_VARCHAR | 4226.032 | 3324.484 | -21.33320335 |
| 1 | 0 | 16 | ARRAY_BIGINT | 738.844 | 505.346 | -31.60315303 |
| 1 | 0 | 16 | ARRAY_VARCHAR | 2482.31 | 2247.198 | -9.471500336 |
| 1 | 0 | 16 | ARRAY_ARRAY_BIGINT | 3804.695 | 3487.887 | -8.326764695 |
| 1 | 0 | 16 | MAP_BIGINT_BIGINT | 2389.073 | 2509.251 | 5.030319291 |
| 1 | 0 | 16 | MAP_BIGINT_MAP_BIGINT_BIGINT | 9909.356 | 9710.362 | -2.008142608 |
| 1 | 0 | 16 | ROW_BIGINT_BIGINT | 993.453 | 646.287 | -34.94538745 |
| 1 | 0 | 16 | ROW_ARRAY_BIGINT_ARRAY_BIGINT | 2202.165 | 2205.541 | 0.1533036807 |
| 1 | 0 | 16 | ROW_RLE_BIGINT_BIGINT | 878.079 | 788.256 | -10.2294896 |
| 1 | 0 | 256 | BIGINT | 1670.45 | 943.49 | -43.5188123 |
| 1 | 0.2 | 2 | BIGINT | 1602.083 | 855.094 | -46.62611113 |
| 1 | 0.2 | 16 | BIGINT | 1450.967 | 695.945 | -52.03578028 |
| 1 | 0.2 | 16 | BIGINT_PARTITION_CHANNEL_SKEWED | 1749.628 | 992.395 | -43.27965716 |
| 1 | 0.2 | 16 | DICTIONARY_BIGINT | 1768.296 | 794.583 | -55.06504567 |
| 1 | 0.2 | 16 | BIGINT_PARTITION_CHANNEL_20_PERCENT | 1079.607 | 595.375 | -44.85261767 |
| 1 | 0.2 | 16 | BIGINT_PARTITION_CHANNEL_DICTIONARY_20_PERCENT | 909.682 | 708.798 | -22.08288171 |
| 1 | 0.2 | 16 | BIGINT_PARTITION_CHANNEL_DICTIONARY_50_PERCENT | 1007.795 | 715.258 | -29.02743117 |
| 1 | 0.2 | 16 | BIGINT_PARTITION_CHANNEL_DICTIONARY_80_PERCENT | 1108.153 | 742.894 | -32.96106224 |
| 1 | 0.2 | 16 | BIGINT_PARTITION_CHANNEL_DICTIONARY_100_PERCENT | 1235.82 | 776.522 | -37.16544481 |
| 1 | 0.2 | 16 | BIGINT_PARTITION_CHANNEL_DICTIONARY_100_PERCENT_MINUS_1 | 1163.168 | 748.659 | -35.63621076 |
| 1 | 0.2 | 16 | BIGINT_PARTITION_CHANNEL_RLE | 1251.835 | 778.291 | -37.82798851 |
| 1 | 0.2 | 16 | BIGINT_PARTITION_CHANNEL_RLE_NULL | 5.362 | 3.367 | -37.20626632 |
| 1 | 0.2 | 16 | LONG_DECIMAL | 1702.922 | 772.704 | -54.62481546 |
| 1 | 0.2 | 16 | DICTIONARY_LONG_DECIMAL | 2086.265 | 889.509 | -57.36356599 |
| 1 | 0.2 | 16 | INTEGER | 1462.114 | 636.766 | -56.44894995 |
| 1 | 0.2 | 16 | DICTIONARY_INTEGER | 1765.352 | 745.925 | -57.74638712 |
| 1 | 0.2 | 16 | SMALLINT | 1507.112 | 623.829 | -58.60765491 |
| 1 | 0.2 | 16 | DICTIONARY_SMALLINT | 1718.672 | 723.018 | -57.93158904 |
| 1 | 0.2 | 16 | BOOLEAN | 1437.562 | 563.497 | -60.80189933 |
| 1 | 0.2 | 16 | DICTIONARY_BOOLEAN | 1581.486 | 654.235 | -58.63162873 |
| 1 | 0.2 | 16 | VARCHAR | 3723.315 | 2907.568 | -21.90915891 |
| 1 | 0.2 | 16 | DICTIONARY_VARCHAR | 3869.583 | 3013.539 | -22.12238373 |
| 1 | 0.2 | 16 | ARRAY_BIGINT | 905.913 | 692.206 | -23.59023438 |
| 1 | 0.2 | 16 | ARRAY_VARCHAR | 2218.627 | 1844.282 | -16.8728227 |
| 1 | 0.2 | 16 | ARRAY_ARRAY_BIGINT | 3499.471 | 3289.49 | -6.000364055 |
| 1 | 0.2 | 16 | MAP_BIGINT_BIGINT | 2331.617 | 2309.136 | -0.9641806523 |
| 1 | 0.2 | 16 | MAP_BIGINT_MAP_BIGINT_BIGINT | 7705.042 | 7388.883 | -4.103274194 |
| 1 | 0.2 | 16 | ROW_BIGINT_BIGINT | 942.5 | 908.927 | -3.562122016 |
| 1 | 0.2 | 16 | ROW_ARRAY_BIGINT_ARRAY_BIGINT | 2341.541 | 2318.315 | -0.9919108826 |
| 1 | 0.2 | 16 | ROW_RLE_BIGINT_BIGINT | 827.48 | 813.803 | -1.652849616 |
| 1 | 0.2 | 256 | BIGINT | 1861.543 | 1042.849 | -43.97932253 |
| 2 | 0 | 2 | BIGINT | 1545.104 | 604.727 | -60.8617284 |
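The manual-dispatch idea from this commit message can be illustrated with a minimal sketch. Everything here is an assumption made for illustration: `SimpleBlock`, `FlatBlock`, `RleBlock`, `DictBlock`, `LongAppender`, and `DispatchingAppender` are hypothetical stand-ins, not Trino's Block classes or the appenders in this PR. The sketch shows a single dispatch point that flattens dictionary input first, so the type-specific appender only ever sees flat or RLE input and writes into a primitive array directly.

```java
import java.util.Arrays;

// Hypothetical stand-ins for block kinds; not Trino's actual classes.
interface SimpleBlock { int positionCount(); }

record FlatBlock(long[] values) implements SimpleBlock {
    public int positionCount() { return values.length; }
}

record RleBlock(long value, int positionCount) implements SimpleBlock {}

record DictBlock(long[] dictionary, int[] ids) implements SimpleBlock {
    public int positionCount() { return ids.length; }
}

// Type-specific appender that writes into a primitive array, no builder object.
final class LongAppender {
    private long[] values = new long[0];
    private int size;

    void appendFlat(FlatBlock block, int[] positions) {
        ensureCapacity(size + positions.length);
        for (int position : positions) {
            values[size++] = block.values()[position];
        }
    }

    void appendRle(RleBlock block, int count) {
        ensureCapacity(size + count);
        Arrays.fill(values, size, size + count, block.value());
        size += count;
    }

    long[] build() { return Arrays.copyOf(values, size); }

    private void ensureCapacity(int capacity) {
        if (values.length < capacity) {
            values = Arrays.copyOf(values, Math.max(capacity, values.length * 2));
        }
    }
}

final class DispatchingAppender {
    private final LongAppender delegate = new LongAppender();

    // Manual dispatch over the block class: the only place that branches on block kind.
    void append(SimpleBlock block, int[] positions) {
        if (block instanceof RleBlock rle) {
            delegate.appendRle(rle, positions.length);
        }
        else if (block instanceof DictBlock dict) {
            // Flatten: translate requested positions into dictionary positions.
            int[] dictionaryPositions = new int[positions.length];
            for (int i = 0; i < positions.length; i++) {
                dictionaryPositions[i] = dict.ids()[positions[i]];
            }
            delegate.appendFlat(new FlatBlock(dict.dictionary()), dictionaryPositions);
        }
        else if (block instanceof FlatBlock flat) {
            delegate.appendFlat(flat, positions);
        }
        else {
            throw new IllegalArgumentException("Unsupported block: " + block.getClass());
        }
    }

    long[] build() { return delegate.build(); }

    public static void main(String[] args) {
        DispatchingAppender appender = new DispatchingAppender();
        appender.append(new FlatBlock(new long[] {10, 20, 30}), new int[] {0, 2});
        appender.append(new RleBlock(7, 100), new int[] {5, 6});
        System.out.println(Arrays.toString(appender.build()));
    }
}
```

Keeping the branching in one place means the inner copy loops always see one concrete input shape, which is the property the commit message attributes to avoiding block type pollution.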
Split one complicated loop inside partitionNotNullPositions into two simple ones.

Benchmark: BenchmarkPartitionedOutputOperator.addPage (channelCount = 1, enableCompression = false, nullRate = 0, partitionCount = 16, positionCount = 8192, type = BIGINT)

| | Mode | Cnt | Score | Error | Units |
|---|---|---|---|---|---|
| before | avgt | 10 | 642.430 | ± 24.573 | ms/op |
| after | avgt | 10 | 484.876 | ± 4.875 | ms/op |
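The shape of such a loop split can be sketched as follows. This is not the actual `partitionNotNullPositions` code, which is not shown on this page; `LoopSplitSketch`, `partitionOf`, `combined`, and `split` are hypothetical names, and the hash-modulo partition function is an assumption. The first variant computes the target partition and records the position in one loop body; the second computes all partitions in one simple loop and distributes positions in another.

```java
import java.util.Arrays;

// Hypothetical sketch of splitting one loop into two simpler ones.
final class LoopSplitSketch {
    // Stand-in partition function (assumption, not Trino's).
    static int partitionOf(long value, int partitionCount) {
        return Math.floorMod(Long.hashCode(value), partitionCount);
    }

    // One loop that both computes the partition and records the position.
    static int[][] combined(long[] values, int partitionCount) {
        int[][] positions = new int[partitionCount][values.length];
        int[] sizes = new int[partitionCount];
        for (int position = 0; position < values.length; position++) {
            int partition = partitionOf(values[position], partitionCount);
            positions[partition][sizes[partition]++] = position;
        }
        return trim(positions, sizes);
    }

    // Two loops: the first only computes partitions, the second only distributes positions.
    static int[][] split(long[] values, int partitionCount) {
        int[] partitions = new int[values.length];
        for (int position = 0; position < values.length; position++) {
            partitions[position] = partitionOf(values[position], partitionCount);
        }
        int[][] positions = new int[partitionCount][values.length];
        int[] sizes = new int[partitionCount];
        for (int position = 0; position < values.length; position++) {
            int partition = partitions[position];
            positions[partition][sizes[partition]++] = position;
        }
        return trim(positions, sizes);
    }

    private static int[][] trim(int[][] positions, int[] sizes) {
        int[][] result = new int[positions.length][];
        for (int partition = 0; partition < positions.length; partition++) {
            result[partition] = Arrays.copyOf(positions[partition], sizes[partition]);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] values = {0, 1, 2, 3, 4, 5};
        System.out.println(Arrays.deepToString(split(values, 2)));
    }
}
```

Both variants produce the same position lists; whether the split form is faster depends on the JIT and the hardware, which is why the commit message backs the change with a benchmark.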
5989ef6 to 018bc00
/**
 * Copy {@code length} bytes from {@code block}, at position {@code position} to {@code count} consecutive positions in the {@link #bytes} array.
 */
private void duplicateBytes(Block block, int position, int count, int startOffset)
Do you need to pass startOffset, since you also use currentOffset below?

Yeah, I noticed that too today. It's not needed (it is equal to currentOffset).

You could remove it altogether (I mean currentOffset). No need to track the length twice (in the offsets array and in currentOffset).
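The last suggestion, deriving the write offset from the offsets array instead of keeping a separate field, could look roughly like the sketch below. It is an illustration only: `VariableWidthSketch` and `appendRepeated` are hypothetical names and the class is not the `SlicePositionsAppender` from this PR. The invariant assumed here is that `offsets[positionCount]` always equals the number of bytes written so far.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical variable-width appender; the offsets array is the only length bookkeeping.
final class VariableWidthSketch {
    private byte[] bytes = new byte[0];
    // offsets[i] is the start of value i; offsets[positionCount] is the end of the data.
    private int[] offsets = new int[1];
    private int positionCount;

    // Copy the same value into `count` consecutive positions.
    void appendRepeated(byte[] value, int count) {
        int writeOffset = offsets[positionCount]; // derived, no separate currentOffset field
        int required = Math.addExact(writeOffset, Math.multiplyExact(value.length, count));
        ensureBytesCapacity(required);
        ensureOffsetsCapacity(positionCount + count + 1);
        for (int i = 0; i < count; i++) {
            System.arraycopy(value, 0, bytes, writeOffset, value.length);
            writeOffset += value.length;
            offsets[++positionCount] = writeOffset;
        }
    }

    int positionCount() { return positionCount; }

    int sizeInBytes() { return offsets[positionCount]; }

    String get(int position) {
        int start = offsets[position];
        return new String(bytes, start, offsets[position + 1] - start, StandardCharsets.UTF_8);
    }

    private void ensureBytesCapacity(int capacity) {
        if (bytes.length < capacity) {
            bytes = Arrays.copyOf(bytes, Math.max(capacity, bytes.length * 2));
        }
    }

    private void ensureOffsetsCapacity(int capacity) {
        if (offsets.length < capacity) {
            offsets = Arrays.copyOf(offsets, Math.max(capacity, offsets.length * 2));
        }
    }

    public static void main(String[] args) {
        VariableWidthSketch sketch = new VariableWidthSketch();
        sketch.appendRepeated("ab".getBytes(StandardCharsets.UTF_8), 3);
        System.out.println(sketch.get(1) + " " + sketch.sizeInBytes());
    }
}
```

With a single source of truth for the length, there is no second counter that can drift out of sync with the offsets.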
/**
 * Copy {@code length} bytes from {@code block}, at position {@code position} to {@code count} consecutive positions in the {@link #bytes} array.
 */
I would rename it to copyBytes too
private static final int INSTANCE_SIZE = ClassLayout.parseClass(SlicePositionsAppender.class).instanceSize();
private static final Block NULL_VALUE_BLOCK = new VariableWidthBlock(1, EMPTY_SLICE, new int[] {0, 0}, Optional.of(new boolean[] {true}));

private boolean initialized;
Is this field needed in appenders, since ensureBytesCapacity works just fine without it?

ensurePositionCapacity uses it.

It doesn't have to. It can take the same approach as ensureBytesCapacity, right?
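One way to read this suggestion is that the capacity check itself can cover the "not yet allocated" case, so no `initialized` flag is required. The sketch below shows that reading under stated assumptions: `CapacitySketch` and `INITIAL_CAPACITY` are hypothetical, and the real `ensurePositionCapacity` and `ensureBytesCapacity` bodies are not shown on this page, so this is not a reproduction of either.

```java
import java.util.Arrays;

// Hypothetical appender that starts with empty arrays and needs no `initialized` flag.
final class CapacitySketch {
    private static final int INITIAL_CAPACITY = 64; // assumed value, for illustration only

    private long[] values = new long[0];
    private boolean[] valueIsNull = new boolean[0];
    private int positionCount;

    void append(long value) {
        ensurePositionCapacity(positionCount + 1);
        values[positionCount++] = value;
    }

    void appendNull() {
        ensurePositionCapacity(positionCount + 1);
        valueIsNull[positionCount++] = true;
    }

    boolean isNull(int position) { return valueIsNull[position]; }

    int positionCount() { return positionCount; }

    int capacity() { return values.length; }

    // Empty arrays are just the smallest capacity; the first call allocates,
    // later calls grow geometrically.
    private void ensurePositionCapacity(int capacity) {
        if (values.length < capacity) {
            int newSize = Math.max(capacity, Math.max(INITIAL_CAPACITY, values.length * 2));
            values = Arrays.copyOf(values, newSize);
            valueIsNull = Arrays.copyOf(valueIsNull, newSize);
        }
    }

    public static void main(String[] args) {
        CapacitySketch sketch = new CapacitySketch();
        sketch.append(1);
        System.out.println(sketch.capacity());
    }
}
```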
if (rlePositionCount == 0) {
    return;
}
int sourcePosition = 0;
Description
RLE support in the PartitionedOutputOperator.
The core of this change is to refactor the stateless, class-isolated PositionsAppender into a stateful class that can build Blocks. Possibly the name should change to something like 'BatchBlockBuilder'.
The reason the new implementation does not need isolation is twofold:
- io.trino.spi.type.Types are handled by dedicated PositionsAppenders
- DictionaryBlock and RunLengthEncodedBlock are handled by manual dispatch in BlockTypeDispatchingPositionsAppender
On top of this change, AdaptivePositionsAppender is added to support processing RLE blocks efficiently.
Latest benchmarks below
The JMH results seem to confirm that it works as expected for RLE, and show improvements from using primitive arrays directly instead of BlockBuilders.
The TPC-H/TPC-DS benchmarks show ~2% improvement, which is expected given that partitioned exchange is on average ~4% of query cost.
orc-sf1K-part-rle-run3.pdf
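The description does not spell out how RLE input is processed efficiently, so the following is only one plausible sketch, not the PR's AdaptivePositionsAppender. It assumes the appender stays in a compact "single value plus count" state while every appended run carries the same value, and materializes a flat array on the first differing value. `RleAwareSketch` and all of its members are hypothetical names.

```java
import java.util.Arrays;

// Hypothetical RLE-aware appender for long values.
final class RleAwareSketch {
    private long rleValue;
    private int rleCount;   // number of positions held in RLE form
    private long[] flat;    // non-null once the appender has fallen back to flat storage
    private int flatSize;

    // Append `count` copies of `value`; stays in RLE form while the value does not change.
    void appendRle(long value, int count) {
        if (count == 0) {
            return;
        }
        if (flat == null && (rleCount == 0 || rleValue == value)) {
            rleValue = value;
            rleCount += count;
            return;
        }
        switchToFlat();
        ensureCapacity(flatSize + count);
        Arrays.fill(flat, flatSize, flatSize + count, value);
        flatSize += count;
    }

    // Append a single non-RLE value; always forces flat storage in this sketch.
    void append(long value) {
        switchToFlat();
        ensureCapacity(flatSize + 1);
        flat[flatSize++] = value;
    }

    boolean isRle() { return flat == null && rleCount > 0; }

    int positionCount() { return flat == null ? rleCount : flatSize; }

    long[] toFlatArray() {
        if (flat != null) {
            return Arrays.copyOf(flat, flatSize);
        }
        long[] result = new long[rleCount];
        Arrays.fill(result, rleValue);
        return result;
    }

    private void switchToFlat() {
        if (flat != null) {
            return;
        }
        flat = new long[Math.max(rleCount, 16)];
        Arrays.fill(flat, 0, rleCount, rleValue);
        flatSize = rleCount;
        rleCount = 0;
    }

    private void ensureCapacity(int capacity) {
        if (flat.length < capacity) {
            flat = Arrays.copyOf(flat, Math.max(capacity, flat.length * 2));
        }
    }

    public static void main(String[] args) {
        RleAwareSketch sketch = new RleAwareSketch();
        sketch.appendRle(7, 3);
        sketch.appendRle(7, 2);
        System.out.println(sketch.isRle() + " " + sketch.positionCount());
    }
}
```

In this sketch, the benefit comes from the RLE state: appending a run is a counter update rather than a per-position copy, and the output can stay run-length encoded if no other value ever arrives.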
Related issues, pull requests, and links
Documentation
(x) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
(x) No release notes entries required.
( ) Release notes entries required with the following suggested text: