Flush PagePartitioner dictionaries before release #19806

pettyjamesm merged 1 commit into trinodb:master from
Conversation
why only dictionaries and not RLEs too?
nit: Generally, I've been thinking that we could have something like:

```java
boolean UnnestingPositionsAppender#offer(io.trino.spi.block.Block source)
```

which would return `false` if the appender would be flattened. This way we could flush RLEs or dictionaries even if a full page was not collected.
> why only dictionaries and not RLEs too?
I thought about doing that but decided against it because, in theory, an RLE could see the same value on the next input from another driver after reuse, whereas the same cannot occur for dictionaries. We also already have the "direct size limit if flattened" logic for RLEs, which somewhat mitigates the worst case of what happens if we don't see the same RLE value when reused.
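To illustrate the distinction being made here, a toy sketch (hypothetical names, not Trino's actual classes): dictionary mode depends on seeing the *same dictionary instance* again, which a different driver can never supply, while RLE mode only needs the buffered *value* to repeat, which can happen across drivers by chance.

```java
import java.util.Objects;

final class AppenderReuseSketch {
    // Dictionary mode relies on identity of the source dictionary object.
    // A new driver supplies a different dictionary instance, so the appender
    // must flatten (or flush) before reuse.
    static boolean canKeepDictionary(Object bufferedDictionary, Object nextDictionary) {
        return bufferedDictionary == nextDictionary; // identity, not equality
    }

    // RLE mode only needs the value itself to repeat, which can happen even
    // across drivers.
    static boolean canKeepRle(Object bufferedValue, Object nextValue) {
        return Objects.equals(bufferedValue, nextValue);
    }
}
```

Two equal-but-distinct dictionary instances fail the identity check and force a flatten, while the same values arriving as an RLE can still extend the run.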
> nit: Generally, I've been thinking that we could have something like:
>
> ```java
> boolean UnnestingPositionsAppender#offer(io.trino.spi.block.Block source)
> ```
>
> which would return `false` if the appender would be flattened. This way we could flush RLEs or dictionaries even if a full page was not collected.
I gave that some thought, but decided against it because flushing the current page instead of flattening is something you would have to check for each column on every input, and it could result in constant small page flushes triggered by single columns. This approach is a compromise: we only force a flush at the end of a driver's processing, not on each input.
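The tradeoff described above can be sketched as follows (hypothetical names, not the PR's actual code): flushing whenever any single column's appender would flatten can trigger many small flushes, while the release-time approach forces at most one.

```java
import java.util.function.IntPredicate;

final class FlushStrategySketch {
    // Strategy A (rejected): flush the whole page whenever any column's
    // appender would flatten on this input. anyColumnWouldFlatten decides
    // per input index.
    static int flushesPerInput(int inputs, IntPredicate anyColumnWouldFlatten) {
        int flushes = 0;
        for (int i = 0; i < inputs; i++) {
            if (anyColumnWouldFlatten.test(i)) {
                flushes++; // small page flushed early because of a single column
            }
        }
        return flushes;
    }

    // Strategy B (this PR): at most one forced flush, when the driver
    // releases the partitioner back to the pool.
    static int flushesAtRelease() {
        return 1;
    }
}
```

If, say, half of the inputs would flatten some column, strategy A flushes on every one of them, while strategy B still flushes exactly once.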
Force-pushed 87affd5 to 7a560bf
That seems wrong. Both should already be accounted for in partitionPage. Also, flushing would make the "after" size smaller than the "before" size. partitionPage specifically accounts before flushing because the flush can happen at a later time.
PTAL @lukasz-stec
I think you're right. This is already accounted for as part of the partitioning operation, more or less. In the case of dictionaries, we know the results are very incorrect because of the way the appender size is calculated, so not reporting the result of any dictionary-to-direct flattenings that might occur here doesn't really make the situation any worse than it already is.
> as a result of transitioning dictionaries to direct mode

I think we need to simplify this. It's really odd that shouldForceFlushBeforeRelease() has the side effect of flattening the block (a better name would be flushOrFlattenIfNecessary). But I think it would be better to call flatten here explicitly.
Separated shouldForceFlushBeforeRelease() from flattenPendingDictionary() and called those as appropriate here.
core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java
Force-pushed f03e732 to ffc4f6a
Why do it here? It can be done lazily by the next operator instance. This will cause output tracking issues, as flattening might increase outputSizeInBytes, which is correctly handled by io.trino.operator.output.PagePartitioner#partitionPage.
When done lazily, we are guaranteed to know that all dictionary-mode appenders will be forced to flatten. It will correctly report the output size in that situation, but at the cost of losing the dictionary representation (where keeping it would have been profitable).

Another thing to note is that when dictionary appenders flush without flattening, the output size is incorrect for both partitionPage and this new method. PartitionedOutputOperator stats are always incorrect in the presence of dictionary-mode outputs. I think I can fix this at the same time, but it will require reorganizing the output-bytes reporting logic.
> When done lazily, we are guaranteed to know that all dictionary mode appenders will be forced to flatten. It will correctly report the output size in that situation, but at the cost of losing the dictionary representation (if profitable).
If the logic is extracted to a separate flattenStuff method, then we can measure the size "correctly" and don't lose the next dictionary, e.g. with code like:

```java
long positionsAppendersSizeBefore = getPositionsAppendersSizeInBytes();
flattenStuff();
long positionsAppendersSizeAfter = getPositionsAppendersSizeInBytes();
operatorContext.recordOutput(positionsAppendersSizeAfter - positionsAppendersSizeBefore, 0 /* no new positions */);
```
That works specifically for this prepare-for-release approach, but it doesn't fix the general problem when partitionPage flushes dictionaries, since those don't get flattened and never had their actual size accounted for. I think I can fix both in this PR; will publish a new revision shortly.
positionsAppender.shouldForceFlushBeforeRelease() should be in a separate loop, as only some later positions appender could return shouldForceFlushBeforeRelease() == true. However, in that case the previous appenders would already be flattened.
Those dictionaries are going to end up flattened anyway during compaction as part of serialization, but this does relate to the correctness of the output bytes reported (technically, this is more correct but still wrong, because all dictionary outputs are reported incorrectly).
Updated the PR with the new logic. It's currently implemented with a hard assertion that we never incorrectly report the output size, so that I can get validation from the CI runs, but that's probably too aggressive a check to keep in the final version.
Force-pushed 986b806 to 94ff442
I must say I don't understand this logic. Could we make it more straightforward? Why would outputSizeInBytes be smaller than outputSizeReportedBeforeRelease?
It shouldn't be, but this is defensive code that ensures we never allow the "eagerly reported" quantity to go negative.
Why would outputSizeReportedBeforeRelease go down?
Because outputSizeReportedBeforeRelease represents output bytes that have already been reported in advance of the page being flushed, as a result of the partitioner being released by a previous operator. In this logic we're "consuming" from the eagerly reported size, since a page flush has occurred.
What would happen if this is not zeroed?
Presumably nothing; this is just precautionary, to meet the expected semantics of what close() should do.
Do we actually need outputSizeReportedBeforeRelease, given that the output page (flattened or not) is going to be produced one way or the other eventually? Why can't we just track page.getSizeInBytes() when the page is finally enqueued?

I feel like on one hand we are accounting size from produced pages, but also trying to accommodate buffered pages somehow into that. With that in mind, I feel like:

```java
long before = getPositionsAppendersSizeInBytes();
doSomething();
long after = getPositionsAppendersSizeInBytes();
long outputSize = after - before;
```

was easier to grasp.
> Why can't we just track page.getSizeInBytes() when the page is finally enqueued?
Because the final page flush/enqueue could occur outside the context of an operator to report the size against (as a result of PagePartitionerPool and reuse). That was the reason the logic was previously changed to eagerly report buffered bytes before flushing, but critically, that approach would incorrectly report the output size for all dictionary-encoded buffers, because the size they report during getPositionsAppendersSizeInBytes() is not the actual size.

The previous logic may have seemed simpler, but it was incorrect, hence the need to fix it. The new logic can be summarized as:

- Report outputSizeInBytes from the produced Page#getSizeInBytes() when pages are flushed, just as was done before partitioner reuse existed and just as TaskOutputOperator does.
- When releasing a partitioner for reuse (the last chance where we are guaranteed to have an OperatorContext to report against), we report Page#getSizeInBytes() for any flushed pages, flatten any dictionary-encoded appenders that didn't flush, and then eagerly report the appenders' buffered size in bytes (in case we don't get another chance to report that size because the partitioner is not reused). Since we no longer have any dictionary-encoded appenders at this point, we know that the reported buffer size accurately predicts the size of the page that would be flushed if no additional insertions occur, which was not the case before.
- In the case where the partitioner is reused and new positions are appended, we have already accounted for the buffered size on the last release, so we need to subtract that amount before reporting any additional output bytes after reuse.
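A minimal sketch of that accounting, under assumed names (the real logic lives in io.trino.operator.output.PagePartitioner): bytes reported eagerly at release are "consumed" when a later flush finally reports the real page size, so the same bytes are never reported twice and the reported delta can never go negative.

```java
final class OutputSizeAccounting {
    // Bytes already reported eagerly when the partitioner was released to the pool.
    private long outputReportedBeforeRelease;

    // Called when a page is actually flushed: report its size, minus anything
    // already reported eagerly at a previous release.
    long onPageFlushed(long pageSizeInBytes) {
        long alreadyReported = Math.min(outputReportedBeforeRelease, pageSizeInBytes);
        outputReportedBeforeRelease -= alreadyReported;
        return pageSizeInBytes - alreadyReported; // newly reported bytes, never negative
    }

    // Called when the partitioner is released to the pool: after flattening
    // the dictionary appenders, the buffered size accurately predicts the
    // eventual page size, so report it eagerly.
    long onRelease(long bufferedSizeInBytes) {
        outputReportedBeforeRelease += bufferedSizeInBytes;
        return bufferedSizeInBytes;
    }
}
```

For example, releasing with 100 buffered bytes reports 100 eagerly; if the reused partitioner later flushes a 150-byte page, only the additional 50 bytes are reported, and subsequent flushes report their full size again.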
Force-pushed 94ff442 to 8b720cd
Force-pushed 8b720cd to e248333
PagePartitioners should either flatten their dictionary-mode appenders into direct mode, or force-flush their current page to preserve the dictionary encoding, before being released to the pool for reuse. This is because the appender cannot observe the same dictionary input when used from a different driver, and because the dictionary appenders do not accurately report their size and therefore may have been preventing a flush from occurring up until this point. Also fixes an issue where dictionary block outputs were severely under-reporting their output size in bytes.
Force-pushed e248333 to 17045ed
Description

PagePartitioner instances should either flatten their dictionary-mode appenders and transition to direct mode, or force-flush their current page to preserve the dictionary encoding, before being released to the pool for reuse, since it is not possible for the appender to observe the same dictionary input when reused by a different driver.

Also fixes an issue where dictionary-encoded output pages significantly under-report their output size for PartitionedOutputOperator.

Additional context and related issues
Follows up on #19762, which mitigated similar issues for RLE blocks; this PR uses a different strategy that's more appropriate for dictionary block handling.
Release notes
(x) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text: