Row wise group by on fixed width types #10706
lukasz-stec wants to merge 15 commits into trinodb:master
Conversation
You can extract the first commit to a separate PR and merge it.
ok, I'm gonna prepare the PR
Get rid of the static import for create
I'd rather throw exception here
This is used to check whether we can use the new GroupByHash implementation, so it is control flow in the normal case; I think Optional is better for this than an exception.
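As a sketch of the Optional-based control flow described above (the class name, method, and width check below are illustrative assumptions, not the PR's actual API):

```java
import java.util.Optional;

// Hypothetical factory: returns Optional.empty() when the row-wise
// implementation cannot handle the given channel widths, so the caller can
// fall back to another GroupByHash without exception-driven control flow.
public class FixedWidthGroupByHashFactory
{
    public static Optional<String> tryCreate(int[] channelWidthsBytes)
    {
        for (int width : channelWidthsBytes) {
            if (width <= 0 || width > 16) {
                // unsupported (non-fixed-width) channel: signal via Optional
                return Optional.empty();
            }
        }
        return Optional.of("row-wise hash for " + channelWidthsBytes.length + " channels");
    }
}
```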
Just use long[] and then construct a block directly
I wanted to be consistent with MultiChannelGroupByHash but I agree it makes sense to use array directly.
Will try to do it.
It's harder to change if it's hardcoded. E.g. if the generated classes are not compiling for some reason (like API change) it's easy to delete and regenerate them if they are not referenced directly.
It's also easy to make a hard-to-spot mistake in hand-written code here.
I don't think 20 is needed. I've never seen a group by using more than a few columns.
Unless we want to generate it with a bytecode generator.
tpcds has queries with group by 8 - 10 columns e.g. q24. I think 20 is a reasonable number. It covers most of the cases and it's not too big (e.g. it's not much runtime overhead).
Have you considered aligning values to cache lines? That may (or may not) speed up things significantly
I haven't tried it. It's a good idea to test it.
For entries smaller than a cache line, it would make sure we access only one cache line per entry instead of possibly two. We may access multiple entries per lookup in case of a collision, so this is not a clear win.
Obviously, this would increase memory usage, but it may be a good trade-off.
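A minimal sketch of what cache-line alignment could look like; the 64-byte line size and the rounding helper are assumptions for illustration, not code from the PR:

```java
// Round each entry size up to a multiple of the (assumed) 64-byte cache line
// so that a single entry never straddles two lines. This trades extra memory
// for at most one cache-line load per entry access.
public class CacheLinePadding
{
    private static final int CACHE_LINE_BYTES = 64;

    public static int paddedEntrySize(int entrySizeBytes)
    {
        // round up to the next multiple of 64
        return (entrySizeBytes + CACHE_LINE_BYTES - 1) / CACHE_LINE_BYTES * CACHE_LINE_BYTES;
    }
}
```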
This can be done faster but I guess it's not a hot code path
Well, it's kind of in the hot path because it's used during rehash (i.e. when the current table is too small and we have to extend it).
How can this be sped up?
Anywhere between Arrays.fill and System.arraycopy of already filled parts.
But I don't think it's worth the effort
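The System.arraycopy variant hinted at above can be sketched like this (a generic doubling fill, not the PR's code): fill one element, then repeatedly copy the already-filled prefix onto the unfilled tail, doubling the copied span each pass.

```java
// Fill an array by copying already-filled parts with System.arraycopy,
// doubling the span each iteration; O(log n) arraycopy calls instead of
// n element writes.
public class DoublingFill
{
    public static void fill(long[] array, long value)
    {
        if (array.length == 0) {
            return;
        }
        array[0] = value;
        int filled = 1;
        while (filled < array.length) {
            int toCopy = Math.min(filled, array.length - filled);
            System.arraycopy(array, 0, array, filled, toCopy);
            filled += toCopy;
        }
    }
}
```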
Force-pushed from 8dcda03 to 87953e0
lukasz-stec left a comment:
I addressed some comments, plus:
- extracted GroupByHashFactory commit
- added dict and RLE support
- added VarHandleFastByteBuffer
Your benchmark PDF is missing results for partitioned/unpartitioned data (for such a big change) and peak memory metrics.
partitioned is there. I will run unpartitioned.
I updated the partitioned benchmark with peak memory statistics.
unpartitioned benchmark results added.
"enhanced" is not descriptive enough.
This can be done in a separate PR
See if you can do that using instanceof FixedWidthType and getFixedSize method
Isn't every type so far fixed size?
What about other operators? There is a reason for decoupling data (block) type and operators based on that data.
Should any type have a specific equals operator, this will fail.
Do these caches actually help?
Without it, we would start with interpreted code for every aggregation, every split, even every page. This would both slow the query down and increase pressure on the JIT. I did not benchmark it, though.
I understand that the other FastByteBuffer will be deleted?
yes, possibly we could have another one (off-heap) if we need to handle large arrays
This implementation limits the possible number of groups. If the aggregation uses several long fields, an entry can easily reach 100 bytes per row, making this array unusable above ~20M groups (2^31 bytes / 100 bytes).
that's a valid point.
This is not as bad as it sounds: for partial aggregation we will never hit 20M groups because of the memory limit, and the final step is partitioned, so the total number of groups is roughly groups per single hash table × number of nodes × number of cores (e.g. ~3B for 6 nodes with 32 cores).
Also, 100 bytes corresponds to 8 bigints, so it is not the common case.
That said, it needs to be handled. One way is to switch to OffHeapFastByteBuffer once we hit this limit (we would then have to manage off-heap memory). Another is to switch to the old columnar implementation.
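The arithmetic behind this limit, under the assumption of a single on-heap byte array capped at 2^31 bytes:

```java
// Assumed capacity model: one on-heap byte[] holds all entries, so the
// maximum group count is the 2^31-byte array limit divided by the entry size.
// At 100 bytes per entry this gives ~21.5M groups, matching the ~20M estimate.
public class GroupLimit
{
    public static long maxGroups(long entrySizeBytes)
    {
        return (1L << 31) / entrySizeBytes;
    }
}
```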
@lukasz-stec how come the peak memory didn't go up (or even dropped)? Are we correctly accounting memory in this PR?
@sopel39 I suspect that peak memory is not "caused" by the
This uses an additional column to decide which set of columns the final aggregation should use
Extend BenchmarkPartitionedOutputOperator.pollute with row-wise code path pollution
BIGINT.getLong is called with multiple Block types, i.e. not only LongArrayBlock but also DictionaryBlock and RunLengthEncodedBlock. This sometimes confuses the JIT into making a virtual call in PrecomputedHashGenerator, even if PrecomputedHashGenerator is only called with LongArrayBlock and the call to PrecomputedHashGenerator is inlined.
To avoid block type pollution, use manual dispatch instead of class isolation.
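A sketch of the manual-dispatch idea from this commit message: branch on the concrete block type explicitly so each call site stays monomorphic for the JIT, instead of relying on a megamorphic virtual call. The Block hierarchy below is a simplified stand-in for Trino's real block types, not the actual API.

```java
public class ManualDispatch
{
    public interface Block
    {
        long getLong(int position);
    }

    public static final class LongArrayBlock
            implements Block
    {
        private final long[] values;

        public LongArrayBlock(long[] values)
        {
            this.values = values;
        }

        @Override
        public long getLong(int position)
        {
            return values[position];
        }
    }

    public static final class RunLengthEncodedBlock
            implements Block
    {
        private final long value;

        public RunLengthEncodedBlock(long value)
        {
            this.value = value;
        }

        @Override
        public long getLong(int position)
        {
            return value;
        }
    }

    // Manual dispatch: each instanceof branch contains a monomorphic call
    // site, so the type profile of one branch cannot pollute the others.
    public static long getLong(Block block, int position)
    {
        if (block instanceof LongArrayBlock) {
            return ((LongArrayBlock) block).getLong(position);
        }
        if (block instanceof RunLengthEncodedBlock) {
            return ((RunLengthEncodedBlock) block).getLong(position);
        }
        return block.getLong(position);
    }
}
```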
Force-pushed from 87953e0 to f185141
I ran a benchmark for this (tpch/tpcds orc part sf1K) on top of the latest master that includes adaptive partial aggregation.
I think we can start with having
@lukasz-stec Rather than generating source code, we can use
There are pros and cons to both approaches. It's way easier to read, understand, and profile generated source code.
👋 @lukasz-stec @sopel39 .. is this still being worked on or can we close this PR?
Let's close it. Project hummingbird is going in this direction anyway. |

This adds another GroupByHash implementation that works only on multiple fixed-width types and can store hash table state in a row-wise manner.
Hash table implementation
I tested two hash table implementations. One stores almost everything in one big array (SingleTableHashTableData); the second stores only group ids in the hash table and the group values in a separate array (SeparateTableHashTableData). The single table is better for CPU because it does only one random read per row during hash table population (putIfAbsent). The separate table is better for memory utilization (it does not waste memory on empty hash table slots) but does at least two random memory reads (one to get the groupId from the hash table, a second to get the values for that group to compare with the current row). For this reason, SingleTableHashTableData is used. SeparateTableHashTableData could potentially be used to store variable-width data (VARCHARs).
Memory layout per hash table entry looks like this:
Code generation
To avoid virtual method calls (and to have more independent code), the current implementation uses one-time source code generation plus classloader isolation of multiple classes, instead of runtime bytecode generation.
This is mainly to improve readability and maintainability. It's far easier to manually improve the generated code and then potentially change the generator. It's also easier to understand the code and analyze its performance in tools like a profiler.
The main issue is that class isolation is complicated now. I think it's possible to improve it.
Tpch/tpcds benchmarks
There is a slight (~2%) but stable CPU improvement (i.e. for queries that improve, the improvement is much bigger than the variability).
Partitioned orc sf1000
fixed-width-varhandle-single-table_with_mem_stats.pdf
Unpartitioned orc sf1000
fixed-width-varhandle-single-table_unpart.pdf
JMH benchmark results
Generally a win across the board, but the significant improvement is for cases with multiple columns and many groups (i.e. the hash table does not fit in L3).
Cases with 1 column are here for illustration only, as a single BIGINT column is handled differently anyway (BigintGroupByHash).
Experiments
The best gains from this change are for queries that group by many columns with the number of groups in the tens of thousands.
It would be even better for a larger number of groups, but the current partial aggregation memory limit (16MB) makes this not as good as it could be.
Below is a simple group by on 8 BIGINT columns with 25K groups. It shows a ~30% CPU drop and a ~25% wall clock duration drop.
trino:tpch_sf1000_orc_part> set session use_enhanced_group_by=false;
SET SESSION
trino:tpch_sf1000_orc_part> EXPLAIN ANALYZE VERBOSE
                         -> select orderkey % 100000, orderkey % 100000 + 1, orderkey % 100000 + 2, orderkey % 100000 + 3, orderkey % 100000 + 4, orderkey % 100000 + 5, orderkey % 100000 + 6, orderkey % 100000 + 7, count(*)
                         -> from hive.tpch_sf1000_orc.lineitem
                         -> group by orderkey % 100000, orderkey % 100000 + 1, orderkey % 100000 + 2, orderkey % 100000 + 3, orderkey % 100000 + 4, orderkey % 100000 + 5, orderkey % 100000 + 6, orderkey % 100000 + 7;
...
Query 20220201_161708_00117_du6m2, FINISHED, 7 nodes
http://localhost:8081/ui/query.html?20220201_161708_00117_du6m2
Splits: 3,260 total, 3,260 done (100.00%)
CPU Time: 2871.9s total, 2.09M rows/s, 1.77MB/s, 48% active
Per Node: 24.4 parallelism, 51M rows/s, 43.1MB/s
Parallelism: 170.8
Peak Memory: 79.2MB
16.81 [6B rows, 4.96GB] [357M rows/s, 302MB/s]

trino:tpch_sf1000_orc_part> set session use_enhanced_group_by=true;
SET SESSION
trino:tpch_sf1000_orc_part> EXPLAIN ANALYZE VERBOSE
                         -> select orderkey % 100000, orderkey % 100000 + 1, orderkey % 100000 + 2, orderkey % 100000 + 3, orderkey % 100000 + 4, orderkey % 100000 + 5, orderkey % 100000 + 6, orderkey % 100000 + 7, count(*)
                         -> from hive.tpch_sf1000_orc.lineitem
                         -> group by orderkey % 100000, orderkey % 100000 + 1, orderkey % 100000 + 2, orderkey % 100000 + 3, orderkey % 100000 + 4, orderkey % 100000 + 5, orderkey % 100000 + 6, orderkey % 100000 + 7;
...
Query 20220201_161740_00119_du6m2, FINISHED, 7 nodes
http://localhost:8081/ui/query.html?20220201_161740_00119_du6m2
Splits: 3,260 total, 3,260 done (100.00%)
CPU Time: 2065.2s total, 2.91M rows/s, 2.46MB/s, 47% active
Per Node: 23.7 parallelism, 68.7M rows/s, 58.1MB/s
Parallelism: 165.6
Peak Memory: 264MB
12.47 [6B rows, 4.96GB] [481M rows/s, 407MB/s]