Batch join lookup source #13352
Benchmark-joinbatching.pdf

Could you fix #12618 (comment) and #12618 (comment) as a prerequisite?
lukasz-stec left a comment
I took a quick look.
Generally, this looks like a step in the right direction, but I wonder if there is gonna be a follow-up to batch PageJoiner, e.g. getNextJoinPosition.
core/trino-main/src/main/java/io/trino/operator/join/BigintPagesHash.java
}

private boolean currentRowContainsNull()
private long[] fillCache()
This looks like a workaround for not batching PageJoiner. Do you plan to add batching there at some point?

It is more complicated than it looks. There is no 1:1 relation between the probe and build sides, and JoinProbe is the object responsible for handling that relation. So batching on any other level can be done only under certain conditions.
I do plan to add some quasi-batching capabilities to PageJoiner, but I do not consider this JoinProbe batching a workaround.
I will post a draft PR tomorrow with some changes in PageJoiner so you can take a look.
8b7b2fb to a5e2dde
a5e2dde to 8e9bceb

I rebased the PR on top of #13432.

8e9bceb to 36cd16b
cd8837d to 3314d66
radek-kondziolka left a comment
Extremely good benchmark results.
LGTM.
3314d66 to f6d4ac3
raunaqmorarka left a comment
I've only reviewed the first 4 commits; feel free to address the comments in #13432.
core/trino-main/src/main/java/io/trino/operator/join/BigintPagesHash.java
int blockPosition = decodePosition(address);
long value = joinChannelBlocks.get(blockIndex).getLong(blockPosition, 0);

int pos = getHashPosition(value, mask);
Would it help to store the hash position in an array here and do the lookups in a separate loop?

I don't want to optimize the build side here, just the probe side. But, indeed, this makes sense. I'll add it to my join to-do list.
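
The suggestion above can be sketched roughly as follows. This is a minimal illustration, not code from the PR: the class name `TwoPhaseInsertSketch`, the `hashPosition` mixer, and the `find` helper are all hypothetical, and a plain linear-probing table over `long` keys stands in for the real build-side structure. The point is only the split into one loop that computes hash positions into an array and a second loop that performs the random-access lookups.

```java
import java.util.Arrays;

// Hypothetical stand-in for a build-side hash table over long join keys.
final class TwoPhaseInsertSketch
{
    private final long[] values; // join key of each build row
    private final int[] slots;   // slot -> build row index, -1 when empty
    private final int mask;

    TwoPhaseInsertSketch(long[] values, int hashSize)
    {
        if (Integer.bitCount(hashSize) != 1 || hashSize <= values.length) {
            throw new IllegalArgumentException("hashSize must be a power of two larger than the row count");
        }
        this.values = values;
        this.slots = new int[hashSize];
        this.mask = hashSize - 1;
        Arrays.fill(slots, -1);
        insertAll();
    }

    // Assumed mixer for illustration only; not the hash function used by Trino.
    private static int hashPosition(long value, int mask)
    {
        long mixed = value * 0x9E3779B97F4A7C15L;
        return (int) (mixed >>> 32) & mask;
    }

    private void insertAll()
    {
        // Loop 1: sequential reads and arithmetic only, results kept in an array.
        int[] hashPositions = new int[values.length];
        for (int row = 0; row < values.length; row++) {
            hashPositions[row] = hashPosition(values[row], mask);
        }
        // Loop 2: the random accesses into the slot array (linear probing).
        for (int row = 0; row < values.length; row++) {
            int pos = hashPositions[row];
            while (slots[pos] != -1) {
                pos = (pos + 1) & mask;
            }
            slots[pos] = row;
        }
    }

    // Returns the build row index holding the key, or -1 if the key is absent.
    int find(long key)
    {
        int pos = hashPosition(key, mask);
        while (slots[pos] != -1) {
            if (values[slots[pos]] == key) {
                return slots[pos];
            }
            pos = (pos + 1) & mask;
        }
        return -1;
    }
}
```

Whether the split actually helps on the build side would need to be measured; the thread only notes it as a possible follow-up.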
f6d4ac3 to c1afc4c

benchmark-join-batching.pdf

c1afc4c to ee3821a

ping
core/trino-main/src/main/java/io/trino/operator/join/LookupSource.java
@@ -32,8 +32,28 @@ public interface LookupSource

long getJoinPosition(int position, Page hashChannelsPage, Page allChannelsPage, long rawHash);
We don't need this method for the unspilled path.

PartitionedLookupSource uses it if the page is small.
core/trino-main/src/main/java/io/trino/operator/join/unspilled/PartitionedLookupSource.java
}

@Override
public long[] getJoinPosition(int[] positions, Page hashChannelsPage, Page allChannelsPage, long[] rawHashes)
I removed the threshold of 8 in PartitionedLookupSource, so now it always does batching, regardless of the page size.
That makes TestHashJoinOperator a pretty good unit-ish test for this method.
27d1866 to cb9551f
lukasz-stec left a comment
Some comments on the 'Calculate JoinProbe position in batch' commit.
94df5e6 to d34e52b
core/trino-main/src/main/java/io/trino/operator/join/BigintPagesHash.java
// At this point for any reasonable load factor of a hash array (< .75), there is no more than
// 10 - 15% of positions left. We search for them in a sequential order and update the result array.
for (int i = 0; i < remainingCount; i++) {
Do you think it's a good idea to extract this part to a separate method? It would be useful to see the impact of this part in the profiler output.

It will get inlined anyway. As for readability, I prefer those perf-optimised methods to be a bit longer but with some comments. I believe that is more readable than the standard clean-code approach, given that the code is not easy to understand and never will be.

"It will get inlined anyway"
Probably, but profilers can figure out the original method in most cases and attribute the time correctly, e.g. in the flame graph.

WDYM? If the method is inlined it does not exist in the JFR. Or am I missing something?
int[] foundKeys = new int[positionCount];

// Search for positions in the hash array. This is the most CPU-consuming part as
// it relies on random memory accesses

There is also values[foundKeys[index]] at line 228, which is a random load, right?

Correct. But it only fetches positions that have been found, which is often only a fraction.
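
The staged probe discussed in these threads can be sketched as below. This is a simplified illustration under stated assumptions, not the PR's implementation: the class `BatchProbeSketch`, its `hashPosition` mixer, and the linear-probing layout are hypothetical, and only the array names `foundKeys` and `values` echo the quoted code. It shows the general shape: compute hash positions for the whole batch, do the random loads from the hash array in a tight loop, compare the first candidate, and resolve the leftover collisions sequentially.

```java
import java.util.Arrays;

// Hypothetical batched probe over a linear-probing table of long keys.
final class BatchProbeSketch
{
    private final long[] values; // build row index -> join key
    private final int[] keys;    // slot -> build row index, -1 when empty
    private final int mask;

    BatchProbeSketch(long[] values, int hashSize)
    {
        if (Integer.bitCount(hashSize) != 1 || hashSize <= values.length) {
            throw new IllegalArgumentException("hashSize must be a power of two larger than the row count");
        }
        this.values = values;
        this.keys = new int[hashSize];
        this.mask = hashSize - 1;
        Arrays.fill(keys, -1);
        for (int row = 0; row < values.length; row++) {
            int pos = hashPosition(values[row], mask);
            while (keys[pos] != -1) {
                pos = (pos + 1) & mask;
            }
            keys[pos] = row;
        }
    }

    // Assumed mixer for illustration only; not the hash function used by Trino.
    private static int hashPosition(long value, int mask)
    {
        long mixed = value * 0x9E3779B97F4A7C15L;
        return (int) (mixed >>> 32) & mask;
    }

    // Returns, for each probe key, the matching build row index or -1.
    int[] getJoinPositions(long[] probeKeys)
    {
        int positionCount = probeKeys.length;
        int[] hashPositions = new int[positionCount];
        int[] foundKeys = new int[positionCount];
        int[] result = new int[positionCount];
        int[] remaining = new int[positionCount];
        int remainingCount = 0;

        // Stage 1: pure arithmetic, no table access.
        for (int i = 0; i < positionCount; i++) {
            hashPositions[i] = hashPosition(probeKeys[i], mask);
        }
        // Stage 2: random loads from the hash array, nothing else in the loop.
        for (int i = 0; i < positionCount; i++) {
            foundKeys[i] = keys[hashPositions[i]];
        }
        // Stage 3: check the first candidate; values[] is only read for non-empty slots.
        for (int i = 0; i < positionCount; i++) {
            int found = foundKeys[i];
            if (found == -1) {
                result[i] = -1;
            }
            else if (values[found] == probeKeys[i]) {
                result[i] = found;
            }
            else {
                remaining[remainingCount++] = i;
            }
        }
        // Stage 4: the leftover collisions are resolved one by one.
        for (int r = 0; r < remainingCount; r++) {
            int i = remaining[r];
            int pos = (hashPositions[i] + 1) & mask;
            int match = -1;
            while (keys[pos] != -1) {
                if (values[keys[pos]] == probeKeys[i]) {
                    match = keys[pos];
                    break;
                }
                pos = (pos + 1) & mask;
            }
            result[i] = match;
        }
        return result;
    }
}
```

In this sketch the `values[found]` read in stage 3 is the second random load mentioned above; it is skipped for probe keys that land on an empty slot.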
int[] foundKeys = new int[positionCount];

// Search for positions in the hash array. This is the most CPU-consuming part as
// it relies on random memory accesses

positionEqualsCurrentRowIgnoreNulls can actually be way more expensive: depending on the number of channels, it can have a lot more random memory accesses, and it also has to do the equals logic.

Correct. Batching some methods from PagesHashStrategy might be a good follow-up.
d34e52b to 3fb2393
sopel39 left a comment
lgtm up to Implement batch API and PLS changes
3fb2393 to a3a3b96
a3a3b96 to 9487168
core/trino-main/src/main/java/io/trino/operator/join/unspilled/JoinProbe.java
This looks suspicious; should this be Block block = nullableBlocks.get(i)?
Would it be simpler to pass in a list of indexes of the nullable blocks of the probe page rather than the list of nullable blocks?

"This looks suspicious, should this be Block block = nullableBlocks.get(i) ?"
You are right. Fixed.
"Would it be simpler to pass in list of indexes of nullable blocks of the probe page rather than the list of nullable blocks ?"
I prefer my way, because there is no two-level indexing.

Can we add a test which would've caught the above issue?
0171069 to ad8ab1a
5beca33 to 2733ba3
Is it possible to avoid duplicating this block of code in the else branch? The only difference appears to be in the contents of positions.

The first one has an additional hashes argument.
The tests in this class initialize JoinProbe without really using it. After introducing cache to JoinProbe, the LookupSource is being used in the constructor and invokes this method.
- Replace int[] -> List<Integer> in constructor
- Pass value by constructor rather than method
- Add requireNonNull in constructors
- Remove redundant field
Additionally add default implementations and calculate JoinProbe positions using the new methods.
2733ba3 to cd10ada
I just tried to test it, and it appears that the bug would not return bad data. The only symptom is that rows containing null values may be propagated further, so more rows will be eligible for a join. But those rows will not be joined anyway, since they have null values. So this will at most result in some performance degradation, which is not really testable.
if (!nullableBlocks.isEmpty()) {
    Arrays.fill(joinPositionCache, -1);
    boolean[] isNull = new boolean[positionCount];
    int nonNullCount = getIsNull(nullableBlocks, positionCount, isNull);
Why do we need to go through the boolean[] isNull array as an intermediate step?
Why can't we immediately compute the int[] positions array (especially if there is a single nullable block)?
We can pre-allocate int[] positions as positions = new int[positionCount].

We had a discussion along similar lines here: #13352 (comment)
The code which subsequently uses the array of positions takes an exactly sized array as its input parameter, so an array copy is needed to satisfy the current API. Maybe we can change that, but let's leave it as a follow-up.

I'm not sure the follow-up will happen :). It would be better to have the interfaces in a more or less stable form from the get-go.
Going through boolean[] isNull seems like a waste.

Maybe boolean[] isNull makes sense for multi-channel joins (rare), but for single-channel joins it is just overhead.
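
The two alternatives debated in this thread can be sketched side by side. This is an illustration only: `NullFilterSketch` and both method names are hypothetical, and a plain `boolean[]` null mask stands in for a nullable Block, so nothing here reflects the actual signature of getIsNull. The first method goes through an intermediate `isNull` array; the second fills a pre-allocated `positions` array directly and trims it with a copy, which matches the remark that the downstream code expects an exactly sized array.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: each boolean[] plays the role of one nullable probe block,
// with mask[p] == true meaning position p is null in that block.
final class NullFilterSketch
{
    private NullFilterSketch() {}

    // Variant 1: mark null rows in an intermediate isNull array, then collect positions.
    static int[] positionsViaIsNull(List<boolean[]> nullableBlocks, int positionCount)
    {
        boolean[] isNull = new boolean[positionCount];
        int nonNullCount = positionCount;
        for (int i = 0; i < nullableBlocks.size(); i++) {
            // Index with i so every nullable block is inspected, not just one of them.
            boolean[] block = nullableBlocks.get(i);
            for (int p = 0; p < positionCount; p++) {
                if (block[p] && !isNull[p]) {
                    isNull[p] = true;
                    nonNullCount--;
                }
            }
        }
        int[] positions = new int[nonNullCount];
        int count = 0;
        for (int p = 0; p < positionCount; p++) {
            if (!isNull[p]) {
                positions[count++] = p;
            }
        }
        return positions;
    }

    // Variant 2: fill a pre-allocated positions array directly, then trim it.
    static int[] positionsDirect(List<boolean[]> nullableBlocks, int positionCount)
    {
        int[] positions = new int[positionCount];
        int count = 0;
        for (int p = 0; p < positionCount; p++) {
            boolean rowHasNull = false;
            for (boolean[] block : nullableBlocks) {
                if (block[p]) {
                    rowHasNull = true;
                    break;
                }
            }
            if (!rowHasNull) {
                positions[count++] = p;
            }
        }
        // The copy produces an exactly sized array for callers that require one.
        return Arrays.copyOf(positions, count);
    }
}
```

Variant 1 walks each block sequentially, which may suit multi-channel joins, while variant 2 avoids the intermediate array at the cost of touching every block per row; which one is faster in the join operator is not something this sketch can answer.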
Handle probe side of join in batches, as opposed to row-by-row. Benchmarks in the comment below
Description
improvement
core query engine
Improve performance of join operator
Related issues, pull requests, and links
Documentation
(x) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
( ) No release notes entries required.
(x) Release notes entries required with the following suggested text: