Fix partitioned output page flushing#19762
Conversation
core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderPageBuilder.java
Outdated
Show resolved
Hide resolved
aa02616 to
6952664
Compare
core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderPageBuilder.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java
Outdated
Show resolved
Hide resolved
6952664 to
7eb074b
Compare
7eb074b to
13a64f8
Compare
sopel39
left a comment
There was a problem hiding this comment.
lgtm % comments % benchmarks (will have results soon)
There was a problem hiding this comment.
The tests do assert on this value, since the test logic requires knowing what the value is in order to trigger the flushing based on position count without reaching the size limit.
core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppenderPageBuilder.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
could we test with both RLE and dirct?
There was a problem hiding this comment.
To be honest, I'm not sure what we would really be asserting there. maxDirectSizeInBytes must be >= maxSizeInBytes so, we would always be considered full based on maxSizeInBytes and wouldn't be able to tell the difference except when appending in RLE mode.
8d691e5 to
d75e68b
Compare
|
benchmarks look good |
core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
this should most likely return OptionalLong and non-empty only for DIRECT, RLE
There was a problem hiding this comment.
I think we still want to count the current size in bytes for direct appenders towards the total direct size, but with a special case for RLE where we want to report the size "as if it were flattened". Dictionaries of course are still unavoidably under-reported here.
core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderPageBuilder.java
Outdated
Show resolved
Hide resolved
d75e68b to
ee7ce59
Compare
Flushes PositionsAppenderPageBuilder after reaching 4x the PageProcessor.MAX_BATCH_SIZE positions. The previous limit of Integer.MAX_VALUE positions could easily overflow since append operations can insert more than a single row and isFull() is only checked after inserting an entire page into the builder. Additionally, buffering too many RLE rows in the builder can prevent it from reaching the size limit and starve downstream tasks from receiving any input at all.
Although the total exact size in bytes of UnnestingPositionAppender dictionaries is expensive to track, we should at least account for the dictionary ids size. Otherwise, repeated dictionary insertions don't increase the reported size at all.
ee7ce59 to
00f4269
Compare
Flushes PositionsAppenderPageBuilder entries if the cost of converting all RLE channels into direct channels would exceed the maximum page size by a factor of 8x. Previously, page builders could buffer a very large number of RLE positions without being considered full and then suddenly expand to huge sizes when forced to transition to a direct representation as a result of the RLE input value changing across one or more columns. In particular, this is can easily happen when pages are produced from CROSS JOIN UNNEST operations.
00f4269 to
a3a6a25
Compare
| (rleValue != null ? rleValue.getSizeInBytes() : 0); | ||
| } | ||
|
|
||
| void addSizesToAccumulator(PositionsAppenderSizeAccumulator accumulator) |
There was a problem hiding this comment.
I think this could just return a record (and possibly OptionalLong for directSizeInBytes). IMO it would be cleaner and easier to understand than accumulator.
There was a problem hiding this comment.
I don’t think the extra allocation per appender per check is worthwhile for the small bit of extra clarity is worthwhile here. At that point, you can’t box the direct size into an optional because RLE’s have both a “size” and a separate “direct size” value that need to be summed independently from each other.
| void addSizesToAccumulator(PositionsAppenderSizeAccumulator accumulator) | ||
| { | ||
| long sizeInBytes = getSizeInBytes(); | ||
| // dictionary size is not included due to the expense of the calculation, so this will under-report for dictionaries |
There was a problem hiding this comment.
dictionary size is not included due to the expense of the calculation
nit: actually there is io.trino.operator.output.UnnestingPositionsAppender#dictionary so it's easy to account for it.
There was a problem hiding this comment.
It’s easy to count for the retained size, but not for which positions are actually in the output and the size of those positions without expensive book keeping.
Description
Fixes a number of issues with the flushing behavior of
PositionsAppenderPageBuilderand related classes. The fixes are:PositionsAppenderPageBuilderto be full after at least 4xPageProcessor.MAX_BATCH_SIZE(4 * 8192 = 32,768) positions have been inserted, regardless of the current size in bytes. The previous behavior of only considering the builder full whendeclaredPositions == Integer.MAX_VALUEwas unsafe sincePositionsAppenderPageBuilder#isFull()is only checked after inserting an entire page not after each row, so integer overflows could occur.UnnestingPositionsAppenderwhen in dictionary mode, even though the size of the actually referenced entries is still not tracked. Previously, repeated insertions that used the same underlying dictionary would report no increase in the size of the builder and never flush. If at some later point a page were inserted with a different dictionary, the builder's transition to direct mode could then create very large memory allocations due to how many positions were buffered.UnnestingPositionsAppenderin RLE mode if it were to be transitioned to direct mode. Before this change, RLE mode appenders could accumulate an unbounded number of positions and report no size changes so long as the same value kept being inserted. At the point at which a different value was inserted, the transition to direct mode could cause huge spikes in memory usage and cause query failures and/or worker node heap OOMs.In particular, issues 2 and 3 are exceedingly common when performing
CROSS JOIN UNNESTqueries since theUnnestOperatorwill emit RLE and dictionary blocks that repeat the same value or use the same dictionary for extended periods before suddenly transitioning to new values or dictionaries.Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text: