Implement flushing for partial TopNOperator #10166
JunhyungSong wants to merge 1 commit into trinodb:master
Any updates?
@erichwang can you help here?
Any updates? I will fix the merge conflict after reviews.
I took a quick look through this PR, but I'm a bit confused about the unfinished semantics and why we need them. Can you explain a bit more about the intended behavior?
Isn't "topNProcessor.isBuilderFull()" guaranteed to be false due to the checkState above?
It can be full after "addInput".
This will break how processors and state in general are intended to work in Trino. Once a Processor moves to a finished state, it shouldn't ever be able to move out of it. It is expected to be a terminal state.
Can you explain a bit more about what you are trying to do with this?
This is due to the way WorkProcessor works. TopNPages.process is never called until TopNOperator is finishing (that is, until pageBuffer.finish is called), so flushing is not possible until TopNOperator's state changes to finishing. If the state is not changed back from finishing (pageBuffer.finished == true) after the flush, TopNOperator cannot receive the remaining input pages, and they will be discarded. TopNRowNumberOperator and HashAggregationOperator haven't been migrated to WorkProcessor, so their flushing implementations don't have this issue. I know this might be a bit tricky. Please let me know if you have a better idea. Feel free to ping me on Slack as well.
The problem here is that when you call finish() on the PageBuffer, the WorkProcessor can then report finished() state to downstream workprocessors, which indicates that no more data will appear in the future. This is not correct for partials. I think you can get this to work without finishing the PageBuffer. I believe an unfinished WorkProcessor can still yield data.
I changed the logic to utilize PageBuffer, instead.
Any updates?
Instead of requiring the pageBuffer to be not finished, can we use the partial and isFlushing flags here to indicate whether input is needed? That way, we may not need the unfinish call.
No need to change this since pageBuffer.isEmpty() == false when flushing.
@sopel39, can you take a look at the WorkProcessor and double-check this logic and state machine? It's been a while since I've looked at these parts.
0371e77 to 14d2b5c
erichwang left a comment
Thanks @JunhyungSong, this is much easier for me to read. A few more comments for you, but I also want @sopel39 to take a look at some comments I left for him. The usage of the WorkProcessor feels kind of weird, and he would be able to best advise on how to do this properly.
After you build the result on line 90, it's not clear that any other side effects on topNBuilder will be reflected in the output (even though that may not occur in practice). It just looks more correct to an average programmer to clear topNBuilder immediately after calling buildResult, so we should do that (unless it makes this code incorrect somehow).
Even though buildResult() is called, topNBuilder is still maintained in outputIterator. So, it needs to be memory-accounted until outputIterator is nullified. This is the reason why other operators like HashAggregationOperator and TopNRankingOperator maintain their builder until flushing is completed.
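To illustrate the point about memory accounting, here is a minimal, self-contained sketch. It is not Trino code: RetainedBuilderSketch, addRow, startFlush, nextOutput, and retainedBytes are hypothetical names, and the byte count is a simplification. It only shows why the builder has to stay accounted for while an output iterator still reads from it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in; not Trino's TopNProcessor or its memory context API.
final class RetainedBuilderSketch {
    private List<long[]> builderRows = new ArrayList<>();
    private Iterator<long[]> outputIterator;

    void addRow(long[] row) {
        builderRows.add(row);
    }

    // Analogous to buildResult(): the iterator reads directly from the builder's rows.
    void startFlush() {
        outputIterator = builderRows.iterator();
    }

    // Returns the next row, or null once the flush is complete (or was never started).
    long[] nextOutput() {
        if (outputIterator == null) {
            return null;
        }
        if (outputIterator.hasNext()) {
            return outputIterator.next();
        }
        // Flush complete: only now are the builder and its accounting dropped.
        outputIterator = null;
        builderRows = new ArrayList<>();
        return null;
    }

    // Bytes that should still be reported to memory accounting (simplified).
    long retainedBytes() {
        long bytes = 0;
        for (long[] row : builderRows) {
            bytes += (long) row.length * Long.BYTES;
        }
        return bytes;
    }

    public static void main(String[] args) {
        RetainedBuilderSketch sketch = new RetainedBuilderSketch();
        sketch.addRow(new long[] {1, 2});
        sketch.addRow(new long[] {3, 4});
        sketch.startFlush();
        sketch.nextOutput();
        System.out.println("retained mid-flush: " + sketch.retainedBytes());
        sketch.nextOutput();
        sketch.nextOutput();
        System.out.println("retained after flush: " + sketch.retainedBytes());
    }
}
```

In this sketch the retained size stays constant for the whole flush and drops to zero only when the iterator is exhausted, which mirrors the reasoning given for keeping the builder until flushing is completed.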
@sopel39 this looks weird, what is the right way to do this?
I agree that this approach is not ideal. This is like picking the lesser evil. @sopel39 if you have a better way to accomplish this, please let me know.
I changed the implementation to add one more method to the Transformation interface. With this, operators implementing WorkProcessor can handle the input-buffer-full situation.
@sopel39, this looks like an awkward way to do this. Do you have any suggestions on the right structure for this pattern?
I changed the implementation to add one more method to the Transformation interface. With this, operators implementing WorkProcessor can handle the input-buffer-full situation. BTW, I found that when late materialization is on, input pages are passed through the TopNPages (WorkProcessor.Transformation) process function by WorkProcessorPipelineSourceOperator. Since the approach using pageBuffer cannot cover this scenario, this approach is the best option I can think of.
19001bb to 13ed3f0
You can manage flushing entirely within the process method, e.g.:
    if (!isFlushing && inputPage != null) {
        addPage(inputPage);
        if (partial && isBuilderFull()) {
            isFlushing = true;
        } else {
            // accumulate more data
            return needsMoreData();
        }
    }
    // flushing or finishing (inputPage == null)
    Page page = null;
    while (page == null && !topNProcessor.noMoreOutput()) {
        page = topNProcessor.getOutput();
    }
    if (page != null) {
        return TransformationState.ofResult(page, false);
    }
    // all accumulated data has been output (topNProcessor.noMoreOutput() == true)
    // and there will be no more input data
    if (inputPage == null) {
        return finished();
    }
    // all accumulated data has been output, resume consuming pages
    isFlushing = false;
    return needsMoreData();
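For readers who want to see the accumulate, flush, resume cycle in isolation, here is a small self-contained sketch. It does not use Trino's WorkProcessor or TopNProcessor: PartialTopNSketch, addInput, finish, and the maxBufferedRows threshold are all hypothetical, and "full" is modelled simply as a count of rows seen since the last flush. The point it illustrates is that a partial step can emit its current top N, reset, and keep accepting input, because the final top N only needs the union of the partial results.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-in for a partial top-N step (smallest N values); not Trino code.
final class PartialTopNSketch {
    private final int n;
    private final int maxBufferedRows; // assumed "builder is full" threshold
    private final PriorityQueue<Integer> heap = new PriorityQueue<>(Comparator.reverseOrder());
    private int rowsSinceFlush;

    PartialTopNSketch(int n, int maxBufferedRows) {
        this.n = n;
        this.maxBufferedRows = maxBufferedRows;
    }

    // Adds one input batch. Returns a flushed partial result when "full", otherwise an empty list.
    List<Integer> addInput(List<Integer> batch) {
        for (int value : batch) {
            heap.add(value);
            if (heap.size() > n) {
                heap.poll(); // max-heap: drops the largest, keeping the n smallest
            }
        }
        rowsSinceFlush += batch.size();
        return rowsSinceFlush >= maxBufferedRows ? flush() : List.of();
    }

    // No more input: emit whatever is still accumulated.
    List<Integer> finish() {
        return flush();
    }

    private List<Integer> flush() {
        List<Integer> output = new ArrayList<>(heap);
        output.sort(Comparator.naturalOrder());
        heap.clear();
        rowsSinceFlush = 0; // resume consuming input after the flush
        return output;
    }

    public static void main(String[] args) {
        PartialTopNSketch topN = new PartialTopNSketch(2, 4);
        System.out.println(topN.addInput(List.of(5, 1, 4)));
        System.out.println(topN.addInput(List.of(3)));
        System.out.println(topN.addInput(List.of(2, 9)));
        System.out.println(topN.finish());
    }
}
```

Unlike the pageBuffer.finish approach discussed earlier, nothing here ever enters a finished state before finish() is called, so the step can keep receiving input after each flush.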
The problem is that the process method is never called until the finish method is called in non-late-materialization mode. Even in late materialization mode, the upstream will keep sending input pages even if TopNOperator is in partial flushing mode.
"The problem is process method will be never called until finish"
We should just make TopNOperator a WorkProcessorOperator (as, for example, FilterAndProjectOperator).
Please run BenchmarkTopNOperator afterwards.
I implemented a new WorkProcessor.Process for TopNOperator (similar to FilterAndProjectOperator). BenchmarkTopNOperator showed almost no discrepancy.
13ed3f0 to adc1a90
@pettyjamesm please review it
We don't need TopNTransformingProcess. Just modifying https://github.com/trinodb/trino/pull/10166/files#r791829469 as in should be sufficient
This is the benchmark result.
AdaptiveWorkProcessorOperator
Benchmark                   (positionsPerPage)  (topN)   Mode  Cnt   Score    Error  Units
BenchmarkTopNOperator.topN                  32       1  thrpt   60  16.917 ±  1.182  ops/s
BenchmarkTopNOperator.topN                  32     100  thrpt   60  17.369 ±  0.517  ops/s
BenchmarkTopNOperator.topN                  32   10000  thrpt   60   5.286 ±  0.058  ops/s
BenchmarkTopNOperator.topN                1024       1  thrpt   60  23.479 ±  0.387  ops/s
BenchmarkTopNOperator.topN                1024     100  thrpt   60  22.266 ±  0.267  ops/s
BenchmarkTopNOperator.topN                1024   10000  thrpt   60   7.306 ±  0.087  ops/s

WorkProcessorOperator
Benchmark                   (positionsPerPage)  (topN)   Mode  Cnt   Score    Error  Units
BenchmarkTopNOperator.topN                  32       1  thrpt   60  18.258 ±  0.512  ops/s
BenchmarkTopNOperator.topN                  32     100  thrpt   60  17.205 ±  0.331  ops/s
BenchmarkTopNOperator.topN                  32   10000  thrpt   60   5.377 ±  0.082  ops/s
BenchmarkTopNOperator.topN                1024       1  thrpt   60  22.251 ±  1.348  ops/s
BenchmarkTopNOperator.topN                1024     100  thrpt   60  21.964 ±  0.295  ops/s
BenchmarkTopNOperator.topN                1024   10000  thrpt   60   7.349 ±  0.118  ops/s
WorkProcessorOperator showed a little bit of performance degradation when positionsPerPage is bigger. Do you think we can disregard this?
Looking at the results, they seem to be within the margin of error. Even if you ignore the error... my eyes say this is a few % difference, so my gut says, do the simplest thing.
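The "within the margin of error" reading can be checked mechanically. The sketch below is only an illustration: it treats each reported Score ± Error as a plain interval and tests whether the two implementations' intervals overlap for every parameter combination. The class and method names are made up for this example; the numbers are copied from the benchmark output above.

```java
// Illustrative check only; MarginOfErrorSketch and its members are hypothetical names.
final class MarginOfErrorSketch {
    // Each row: {adaptiveScore, adaptiveError, workProcessorScore, workProcessorError} in ops/s.
    static final double[][] RESULTS = {
            {16.917, 1.182, 18.258, 0.512}, // positionsPerPage=32,   topN=1
            {17.369, 0.517, 17.205, 0.331}, // positionsPerPage=32,   topN=100
            {5.286, 0.058, 5.377, 0.082},   // positionsPerPage=32,   topN=10000
            {23.479, 0.387, 22.251, 1.348}, // positionsPerPage=1024, topN=1
            {22.266, 0.267, 21.964, 0.295}, // positionsPerPage=1024, topN=100
            {7.306, 0.087, 7.349, 0.118},   // positionsPerPage=1024, topN=10000
    };

    // True when [a - errorA, a + errorA] and [b - errorB, b + errorB] overlap.
    static boolean overlaps(double a, double errorA, double b, double errorB) {
        return a - errorA <= b + errorB && b - errorB <= a + errorA;
    }

    static boolean allOverlap() {
        for (double[] row : RESULTS) {
            if (!overlaps(row[0], row[1], row[2], row[3])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("all intervals overlap: " + allOverlap());
    }
}
```

For these six rows every pair of intervals overlaps, which is consistent with the comment that the differences are not distinguishable from noise.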
I already submitted a separate PR for this: #10832. I just didn't update the comments here.
Migrate TopNOperator to WorkProcessorOperator
Please rebase on top of #10832
adc1a90 to 3a4a9aa
Done.
nit: you can inline addPage. It's only used once.
I would prefer to have it as a separate method.
3a4a9aa to be65e2d
This prevents the partial TopNOperator from accumulating too many rows, which mitigates memory pressure in the cluster and results in fewer exceeded-memory-limit errors.
be65e2d to 3e0526f
Any updates?
Any updates?
Can you clarify whether this PR can be merged or not? It has been so long since I addressed all the comments.
I would like to learn about the real use case for this PR.
👋 @JunhyungSong - this PR has become inactive. If you're still interested in working on it, please let us know, and we can try to get reviewers to help with that. We're working on closing out old and inactive PRs, so if you're too busy or this has too many merge conflicts to be worth picking back up, we'll be making another pass to close it out in a few weeks.
Closing this one out due to inactivity, but please reopen if you would like to pick this back up.