Aggregation ORDER BY & DISTINCT spilling #14527
highker
left a comment
As discussed offline, let's spill RowBlocks for intermediate states.
eb2fdeb to a69f43e
highker
left a comment
The overall idea is in the right direction. Here is a high-level design suggestion to unify the solution for both the ordering and distincting cases:
- Neither the Ordering nor the Distincting (Grouped)Accumulators implement the xxxIntermediateXxx() interfaces, because they do not support partial aggregation. So let's create a new class called FinalOnly(Grouped)Accumulator. This class should implement all xxxIntermediateXxx() interfaces by throwing UnsupportedOperationException. Have both the Ordering and Distincting (Grouped)Accumulators inherit from FinalOnly(Grouped)Accumulator.
- Create new classes inheriting from (Grouped)Accumulator called SpillableFinalOnly(Grouped)Accumulator. These classes take a FinalOnly(Grouped)Accumulator as a delegate. SpillableFinalOnly(Grouped)Accumulator is responsible for maintaining the hashtable state. Use ObjectBigArray for the hashtable (check my inline comment).
- SpillableFinalOnly(Grouped)Accumulator should implement the xxxIntermediateXxx() interfaces. Together with the addInput interface, they should build the hashtable for the RowBlock <-> Page mapping. As we discussed offline, the hashtable only accumulates the original data, grouping it by group ID so that we can spill by group. (We do not lose information at this step, though it may use quite a lot of memory.)
- The SpillableFinalOnly(Grouped)Accumulator prepare/evalFinal interfaces should call the addInput/prepareFinal/evalFinal interfaces of its FinalOnly(Grouped)Accumulator delegate.
- Directly create the corresponding FinalOnly(Grouped)Accumulator if spilling is not enabled. Otherwise, create a SpillableFinalOnly(Grouped)Accumulator wrapping the FinalOnly(Grouped)Accumulator.
cc @arhimondr in case there is a better design.
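The design above can be sketched roughly as follows. This is a toy model, not Presto code: the Toy* types are hypothetical, rows are plain long values instead of Pages and RowBlocks, and a HashMap stands in for the ObjectBigArray-backed hashtable. It only illustrates the split between a final-only delegate that rejects intermediate calls and a spillable wrapper whose intermediate state is the buffered raw rows per group.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy stand-in for GroupedAccumulator; the real interface operates on Pages and Blocks.
interface ToyGroupedAccumulator
{
    void addInput(int groupId, long value);
    void addIntermediate(int groupId, List<Long> rows);
    List<Long> evaluateIntermediate(int groupId);
    long evaluateFinal(int groupId);
}

// Final-only base class: intermediate methods throw because there is no partial aggregation.
abstract class ToyFinalOnlyGroupedAccumulator implements ToyGroupedAccumulator
{
    @Override
    public final void addIntermediate(int groupId, List<Long> rows)
    {
        throw new UnsupportedOperationException("no intermediate state");
    }

    @Override
    public final List<Long> evaluateIntermediate(int groupId)
    {
        throw new UnsupportedOperationException("no intermediate state");
    }
}

// Example delegate: count(DISTINCT value) per group.
class ToyDistinctCountAccumulator extends ToyFinalOnlyGroupedAccumulator
{
    private final Map<Integer, Set<Long>> seen = new HashMap<>();

    @Override
    public void addInput(int groupId, long value)
    {
        seen.computeIfAbsent(groupId, ignored -> new HashSet<>()).add(value);
    }

    @Override
    public long evaluateFinal(int groupId)
    {
        Set<Long> values = seen.get(groupId);
        return values == null ? 0 : values.size();
    }
}

// Spillable wrapper: its "intermediate state" is simply the raw rows buffered per group.
class ToySpillableFinalOnlyGroupedAccumulator implements ToyGroupedAccumulator
{
    private final ToyFinalOnlyGroupedAccumulator delegate;
    // HashMap stands in for the ObjectBigArray-backed table suggested in the review.
    private final Map<Integer, List<Long>> rowsByGroup = new HashMap<>();

    ToySpillableFinalOnlyGroupedAccumulator(ToyFinalOnlyGroupedAccumulator delegate)
    {
        this.delegate = delegate;
    }

    @Override
    public void addInput(int groupId, long value)
    {
        rowsByGroup.computeIfAbsent(groupId, ignored -> new ArrayList<>()).add(value);
    }

    @Override
    public void addIntermediate(int groupId, List<Long> rows)
    {
        // Unspilled rows are merged back without losing any information.
        rowsByGroup.computeIfAbsent(groupId, ignored -> new ArrayList<>()).addAll(rows);
    }

    @Override
    public List<Long> evaluateIntermediate(int groupId)
    {
        // A copy of the buffered rows: this is what would be written out on spill.
        List<Long> copy = new ArrayList<>();
        List<Long> rows = rowsByGroup.get(groupId);
        if (rows != null) {
            copy.addAll(rows);
        }
        return copy;
    }

    @Override
    public long evaluateFinal(int groupId)
    {
        // Replay the buffered rows into the delegate exactly once, then finalize.
        List<Long> rows = rowsByGroup.remove(groupId);
        if (rows != null) {
            for (long value : rows) {
                delegate.addInput(groupId, value);
            }
        }
        return delegate.evaluateFinal(groupId);
    }
}
```

In this sketch, spilling and unspilling a group is lossless because the wrapper never aggregates; only evaluateFinal hands the rows to the delegate. The cost, as the review notes, is that the buffered rows may use a lot of memory.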
3912702 to 9909c3d
9909c3d to 6639b91
highker
left a comment
Let's clean up the PR with the design proposed. That will make the review easier.
6639b91 to 2073f62
@highker I'm planning to do a full refactor with the proposed design once distinct is working. I pushed my work so far; I'm having some trouble with array_agg and will update when it works.
2073f62 to 2e869d8
sachdevs
left a comment
Distinct and order by spilling work now. There is one glaring issue to iron out before we continue: we need to figure out how to reliably detect whether unspill is happening. Due to the current logic, things like array_agg(DISTINCT x ORDER BY y) don't work as well as simple things like "SELECT count(distinct x), y FROM t GROUP BY y", since we look at the last block of the unspilt page to see if it is an array block. See the attached comment for details.
2e869d8 to ac1d598
highker
left a comment
Did a fast skim through DistinctingGroupedAccumulator. The logic looks legit. (Didn't go through the details line by line.)
Regarding if (page.getBlock(page.getChannelCount() - 1) instanceof ArrayBlock) {: we might be able to add a new interface to GroupedAccumulator to hint whether the input is new input or unspilled input. But I would hold off on this new interface until we have the abstraction ready. Then we can better evaluate how to make the interface change.
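To make the trade-off concrete, here is a small illustration of inferring the input kind from the page shape versus passing an explicit hint. Everything in it is hypothetical: ToyInputKind, ToyInputClassifier and ToyHintedAccumulator are not part of the PR, a page is modeled as a list of channels, and a long[] channel stands in for an ArrayBlock. The misclassified case shown is only one assumed way a shape check could go wrong.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical hint; the PR does not define this type.
enum ToyInputKind
{
    NEW_INPUT,
    UNSPILLED_INPUT
}

final class ToyInputClassifier
{
    private ToyInputClassifier() {}

    // Builds a toy "page" as a list of channels.
    static List<Object> page(Object... channels)
    {
        return Arrays.asList(channels);
    }

    // Shape-based guess, analogous to checking whether the last block is an ArrayBlock.
    static ToyInputKind guessFromShape(List<Object> page)
    {
        Object lastChannel = page.get(page.size() - 1);
        return lastChannel instanceof long[] ? ToyInputKind.UNSPILLED_INPUT : ToyInputKind.NEW_INPUT;
    }
}

// Accumulator that is told what it receives instead of inferring it.
class ToyHintedAccumulator
{
    int newPages;
    int unspilledPages;

    void addInput(List<Object> page, ToyInputKind kind)
    {
        if (kind == ToyInputKind.UNSPILLED_INPUT) {
            unspilledPages++;
        }
        else {
            newPages++;
        }
    }
}
```

In this toy model, a new input page whose last channel happens to be an array is guessed to be unspilled by guessFromShape, while the hinted accumulator counts it correctly because the caller states the kind.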
7dc0fb3 to ecd5f18
sachdevs
left a comment
Summarized the main issues with the design so far.
ecd5f18 to ec43556
3691c6e to e7b5eb5
I will address these comments and look into page compaction for the next iteration. EDIT: as discussed offline, page compaction/memory reduction for distinct has been made into its own task.
e7b5eb5 to 14268e4
...o-main/src/main/java/com/facebook/presto/operator/aggregation/GenericAccumulatorFactory.java
Yeah, since updateMemory() sets the memory usage to the size of the empty hash agg builder. This is not correct, since we never spilled in the first place in startMemoryRevoke.
If I understand correctly, the original logic can only be hit when an operator that has never spilled has just started to build final results and a revoke request then comes in. Am I right? If that is the case, shall we add a comment here?
Yeah, so essentially we decline the memory revoke when the hash agg builder has already completed. This is only set to true when InMemoryHashAggregationBuilder.buildResult is called, NOT InMemoryHashAggregationBuilder.buildHashSortedResult. After buildResult, spilling should be impossible, because the builder can no longer process any more input anyway.
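The behavior discussed in this thread can be modeled with a minimal sketch. The Toy* classes below are hypothetical and heavily simplified (no futures, no real memory contexts); they only show the idea that a revoke request arriving after final output has started is declined, and that the memory reservation is left alone when nothing was spilled.

```java
// Toy stand-in for the in-memory hash aggregation builder.
class ToyAggregationBuilder
{
    private long sizeInBytes;
    private boolean outputStarted;

    void addInput(long bytes)
    {
        if (outputStarted) {
            throw new IllegalStateException("no input accepted after buildResult()");
        }
        sizeInBytes += bytes;
    }

    // Final output: after this the builder cannot take input, so spilling makes no sense.
    void buildResult()
    {
        outputStarted = true;
    }

    boolean isOutputStarted()
    {
        return outputStarted;
    }

    // Pretends to write everything out and returns the number of bytes released.
    long spill()
    {
        long released = sizeInBytes;
        sizeInBytes = 0;
        return released;
    }
}

class ToySpillableOperator
{
    final ToyAggregationBuilder builder = new ToyAggregationBuilder();
    long reservedBytes;
    private boolean spillStarted;

    void addInput(long bytes)
    {
        builder.addInput(bytes);
        reservedBytes += bytes;
    }

    // Returns true only if a spill was actually started.
    boolean startMemoryRevoke()
    {
        if (builder.isOutputStarted()) {
            // Decline: the builder is already producing final results.
            return false;
        }
        builder.spill();
        spillStarted = true;
        return true;
    }

    void finishMemoryRevoke()
    {
        if (!spillStarted) {
            // Nothing was spilled, so the reservation must not be reset.
            return;
        }
        reservedBytes = 0;
        spillStarted = false;
    }
}
```

Without the spillStarted guard, finishMemoryRevoke would reset the reservation even though the builder still holds all of its data, which is the accounting error described above.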
f69f151 to 0086104
highker
left a comment
I would be surprised if spillEnabled cannot be tunneled from LocalExecutionPlanner... Can we give it a try and see what will happen?
0086104 to 8f9f6ea
8f9f6ea to 58dddc4
Last update should fix any failing checks.
...o-main/src/main/java/com/facebook/presto/operator/aggregation/GenericAccumulatorFactory.java
Is this right? The function is meant to create an IntermediateAccumulator, but here it delegates to GroupedAccumulator and then uses createDefaultGroupedAccumulator instead of createDefaultGroupedIntermediateAccumulator?
Yes, since we do the same logic in either case: we do not care whether the underlying accumulator works on intermediate values in our FinalOnlyGroupedAccumulator/SpillableFinalOnlyGroupedAccumulator delegate. If I were to separate these functions, the resulting logic would be the same. They were originally separate, but I noticed they could be simplified.
25d84bd to 6107334
...o-main/src/main/java/com/facebook/presto/operator/aggregation/GenericAccumulatorFactory.java
Actually, is it even possible we need createSpillableGroupedIntermediateAccumulator()? Can we check if the following is good enough?
    @Override
    public GroupedAccumulator createGroupedIntermediateAccumulator()
    {
        checkState(!hasDistinct() || !hasOrderBy(), "distinct or order by cannot have partial aggregation");
        try {
            return groupedAccumulatorConstructor.newInstance(stateDescriptors, ImmutableList.of(), Optional.empty(), lambdaProviders);
        }
        catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException(e);
        }
    }
We do need this, since:
- createGroupedIntermediateAccumulator used to never be called (in the case of ORDER BY or DISTINCT), since order by and distinct did not support intermediate results. Hence, when spill is enabled, we use createGroupedIntermediateAccumulator to recreate the intermediate version of the accumulator with the spillable wrapper when unspilling and creating intermediate accumulators.
- checkState(!hasDistinct() || !hasOrderBy()): not sure what this does exactly in this context, since having distinct AND order by shouldn't be a state failure. createGroupedIntermediateAccumulator can be called with hasDistinct/hasOrderBy set to true.
- return groupedAccumulatorConstructor.newInstance would be a bug, since this references the underlying accumulator (NOT order by or distinct, but the accumulator inside of order by or distinct). This means that during intermediate accumulation we would get non-distinct, non-ordered values.
I actually tried writing this code without this section in a previous iteration, but I realized we need the spillable wrapper in the intermediate case.
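The third point can be illustrated with a toy factory. The names here (ToyAccumulator, ToyCount, ToyDistinct, ToyAccumulatorFactory) are hypothetical and much simpler than GenericAccumulatorFactory; ToyDistinct stands in for the distinct/order-by wrapper, and replay models feeding unspilled rows back into an accumulator. It is a sketch of the concern, assuming unspilled rows are replayed as raw values.

```java
import java.util.HashSet;
import java.util.Set;

interface ToyAccumulator
{
    void add(long value);
    long result();
}

// The "underlying" accumulator: a plain count with no DISTINCT handling.
class ToyCount implements ToyAccumulator
{
    private long count;

    @Override
    public void add(long value)
    {
        count++;
    }

    @Override
    public long result()
    {
        return count;
    }
}

// Wrapper that applies DISTINCT before forwarding to the underlying accumulator.
class ToyDistinct implements ToyAccumulator
{
    private final ToyAccumulator inner;
    private final Set<Long> seen = new HashSet<>();

    ToyDistinct(ToyAccumulator inner)
    {
        this.inner = inner;
    }

    @Override
    public void add(long value)
    {
        if (seen.add(value)) {
            inner.add(value);
        }
    }

    @Override
    public long result()
    {
        return inner.result();
    }
}

class ToyAccumulatorFactory
{
    private final boolean distinct;

    ToyAccumulatorFactory(boolean distinct)
    {
        this.distinct = distinct;
    }

    // Analogous to instantiating the underlying accumulator directly.
    ToyAccumulator createUnderlying()
    {
        return new ToyCount();
    }

    // Used when replaying unspilled rows: must keep the DISTINCT wrapper.
    ToyAccumulator createIntermediate()
    {
        return distinct ? new ToyDistinct(createUnderlying()) : createUnderlying();
    }

    // Feeds rows into the accumulator and returns its result.
    static long replay(ToyAccumulator accumulator, long... rows)
    {
        for (long row : rows) {
            accumulator.add(row);
        }
        return accumulator.result();
    }
}
```

Replaying the rows 1, 1, 2 through createIntermediate() counts two distinct values, while replaying them through createUnderlying() counts three, which is the non-distinct result the comment warns about.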
6107334 to b53600f
This PR implements ORDER BY and DISTINCT spilling for use in aggregation functions. I will publish docs on the implementation details and update this PR in the future.