Improve HashBuilderOperator unspill parallelism #26076
Conversation
cc @osscm
pettyjamesm left a comment
@sopel39 - I'm not sure I understand the purpose of segmenting the individual files at 64MB boundaries. The code comments mention that we avoid materializing more than one segment at a time, but the code already avoided materializing more than one page at a time from the input stream on read-back. Segmenting the files adds a good bit of complexity to the implementation, and it seems unnecessary on a first read-through.
Currently, when a join hits spilling, its parallelism is reduced to a single IO stream (build-side unspilling) and a single CPU thread (reconstruction of the lookup source). This PR addresses the first issue (the single IO stream). It was observed that this PR reduces the IO part of join unspilling from minutes to seconds.
Segments are deterministic. When reading segments back, it's possible to restore the original page order without any extra index. This allows reading segments back in parallel, which is the purpose of this PR.
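To illustrate the idea of deterministic segments described above, here is a minimal sketch (illustrative names only, not Trino's actual classes): pages are cut into segments at a fixed cumulative-size boundary, segments can be loaded concurrently, and the original page order is recovered simply by concatenating segments in index order, with no extra index persisted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: segment boundaries are computed deterministically from
// cumulative page size, so a reader can restore total order from the segment
// index alone, even when segments are read back in parallel.
public class SegmentSketch {
    // Deterministically split pages into segments of roughly segmentBytes each.
    static List<List<byte[]>> toSegments(List<byte[]> pages, long segmentBytes) {
        List<List<byte[]>> segments = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        long bytes = 0;
        for (byte[] page : pages) {
            if (bytes + page.length > segmentBytes && !current.isEmpty()) {
                segments.add(current); // cut at the deterministic boundary
                current = new ArrayList<>();
                bytes = 0;
            }
            current.add(page);
            bytes += page.length;
        }
        if (!current.isEmpty()) {
            segments.add(current);
        }
        return segments;
    }

    // Read segments concurrently; concatenating results in segment index
    // order restores the original page order.
    static List<byte[]> readBack(List<List<byte[]>> segments) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<List<byte[]>>> futures = new ArrayList<>();
            for (List<byte[]> segment : segments) {
                Callable<List<byte[]>> load = () -> new ArrayList<>(segment);
                futures.add(pool.submit(load));
            }
            List<byte[]> pages = new ArrayList<>();
            for (Future<List<byte[]>> future : futures) {
                pages.addAll(future.get());
            }
            return pages;
        }
        finally {
            pool.shutdown();
        }
    }
}
```

This is only a sketch of the ordering argument, assuming a 64MB-style size cutoff; the real spiller works against files on disk rather than in-memory lists.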
pettyjamesm left a comment
This still seems massively more complicated than necessary. Wouldn't it be simpler to cut files at the segment boundary and support "pipelined" parallelism instead (i.e.: allow concurrent read-back of up to n files)? It seems like that would give you more control over the read back concurrency and simplify the logic significantly.
You mean create a new file for each segment? That would create a massive number of files that have to be tracked, which is not ideal if you want to keep memory and resources constrained. It also wouldn't necessarily be simpler: you still have to keep track of segment size, but now you also have to keep track of potentially hundreds of thousands of spill files.
I think this can be simplified using the approach James suggested. For the spill, you can create a directory (in each target) for the spill and then number the files sequentially. Each file is 64MB, and together they form a total order. If the file count becomes a problem, you could scale up the file size as more files are created, but I don't expect this to be a problem on local disks. The memory requirement is just the directory name and the max sequence number. This also makes cleanup on local disks trivial, since you can just delete the directories. You could also delete in parallel, but I don't expect that to be a problem on local disks. I think this might also make unspill easier to auto-tune, since you can have parallel loaders and simply monitor the ordered output queue of pages: when the queue is not empty, you can reduce readers. You would also want to reduce readers if the time spent waiting for reads is high. Anyway, a future optimization.
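The directory-per-spill layout described in this comment can be sketched as follows (all names are illustrative, not Trino's actual code): files are numbered with zero-padded sequence numbers so lexicographic order is the total order, and cleanup is just deleting the directory contents.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of a directory-per-spill layout: sequentially numbered
// fixed-size files form a total order, and cleanup is a directory delete.
public class DirectorySpillSketch {
    final Path spillDirectory;
    int nextFileIndex;

    DirectorySpillSketch(Path target) throws IOException {
        spillDirectory = Files.createTempDirectory(target, "spill-");
    }

    // Zero-padded sequence number keeps lexicographic order == write order.
    Path nextFile() {
        return spillDirectory.resolve(String.format("%08d.bin", nextFileIndex++));
    }

    void writeChunk(byte[] chunk) throws IOException {
        Files.write(nextFile(), chunk);
    }

    // Total order is recovered purely from the sorted file names, so files
    // could also be loaded in parallel and merged by sequence number.
    List<byte[]> readAll() throws IOException {
        List<Path> sorted = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(spillDirectory)) {
            files.forEach(sorted::add);
        }
        Collections.sort(sorted);
        List<byte[]> chunks = new ArrayList<>();
        for (Path file : sorted) {
            chunks.add(Files.readAllBytes(file));
        }
        return chunks;
    }

    // Cleanup is trivial: delete every file, then the directory itself.
    void close() throws IOException {
        try (DirectoryStream<Path> files = Files.newDirectoryStream(spillDirectory)) {
            for (Path file : files) {
                Files.delete(file);
            }
        }
        Files.delete(spillDirectory);
    }
}
```

The memory cost matches the comment's claim: only the directory path and the next sequence number need to be tracked in memory.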
@dain @pettyjamesm there are multiple ways to tackle this issue, as we can see. I don't see one or the other as dramatically simpler or harder. I think we all understand the approach I've proposed, so clearly it's not that complex. Unless this approach is fundamentally broken, I would rather keep it. BTW: you are mostly arguing with AI-generated code. None of the lines here were written by me, but the concept and the review of the final code come from me.
I don't agree that the approaches are fundamentally equivalent in terms of complexity. Tracking the current byte offset within a fixed number of open iterators and switching between them during read-back (while maintaining parallelism) is definitely harder to reason about than "reading fully" from N files, with each input file being the unit of parallelism. As noted in our offline conversation, there are other upsides to the file-level approach, like the ability to delete spill data from disk incrementally and the ability to tune the level of parallelism higher than 1 thread per spill path.
It's really tracking page size, something that has to be done in any approach during spill. For unspill, you suggest replacing page-size tracking with tracking multiple spill files in spill directories (a new concept in spill!). I would argue that's even more complex, with possible side effects from the OS (what-ifs?).
Iterators won't go away regardless of approach. In the directory approach they would be nested too (per directory, per file). No complexity saved.
It's a marginal difference and not really a contract of
We would rather reduce parallelism sometimes, which is possible with this solution. Overall, I still don't see any fundamental flaw in this approach and refuse to refactor PRs (or redo them completely) based on taste :)
I find the approach quite complex and I think this can and should be done in a much simpler way. For the spilling code, there are very few maintainers, and we by necessity need to keep this as simple as possible. Therefore, I do not think we should merge this code, and we should focus on the simpler design. |
I spent the last few posts explaining why it's not more complex than other approaches. I think the only alternative here is not to land this or any subsequent PRs that improve spill and join performance. That is also an option. I could also add a feature toggle.
In general, there are very few active OSS maintainers left. Does that mean the project should stall? That is an option too.
This pull request has gone a while without any activity. Ask for help on #core-dev on Trino slack. |
Closing this pull request, as it has been stale for six weeks. Feel free to re-open at any time. |
Force-pushed from 1ec5446 to 04aceb0
@pettyjamesm PTAL. I've simplified the approach and used round-robin (RR) to distribute pages across files.
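The round-robin distribution mentioned here can be sketched as follows (a simplified model with illustrative names, not the PR's actual code): page i is written to file i mod n, and read-back visits the files in the same cycle, so each file can be read by its own thread while the merge restores the original order without any extra index.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of round-robin page distribution across n spill files.
// Because the write order is deterministic (page i -> file i % n), the
// reader recovers total order just by cycling over the files again.
public class RoundRobinSpillSketch {
    static List<List<String>> spill(List<String> pages, int fileCount) {
        List<List<String>> files = new ArrayList<>();
        for (int i = 0; i < fileCount; i++) {
            files.add(new ArrayList<>());
        }
        for (int i = 0; i < pages.size(); i++) {
            files.get(i % fileCount).add(pages.get(i)); // round-robin write
        }
        return files;
    }

    static List<String> unspill(List<List<String>> files) {
        // Each file could be read by its own IO thread; merging just takes
        // the next page from each file in the original round-robin cycle.
        List<String> pages = new ArrayList<>();
        List<Iterator<String>> iterators = new ArrayList<>();
        for (List<String> file : files) {
            iterators.add(file.iterator());
        }
        boolean progress = true;
        while (progress) {
            progress = false;
            for (Iterator<String> iterator : iterators) {
                if (iterator.hasNext()) {
                    pages.add(iterator.next());
                    progress = true;
                }
            }
        }
        return pages;
    }
}
```

The in-memory lists stand in for spill files; the point is only the ordering invariant that makes parallel read-back safe.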
Force-pushed from 81540e0 to df32212
Force-pushed from df32212 to 044494a
HashBuilderOperator unspills sequentially, partition by partition. This commit improves join unspilling performance by making FileSingleStreamSpiller unspill a single partition in parallel. FileSingleStreamSpiller is enhanced so it can spill to and unspill from multiple files.
Force-pushed from 044494a to 806728b
pettyjamesm left a comment
LGTM, thanks @sopel39
@pettyjamesm thanks for the review
```java
        return create(types, spillContext, memoryContext, false);
    }

    SingleStreamSpiller create(List<Type> types, SpillContext spillContext, LocalMemoryContext memoryContext, boolean parallelSpill);
```
SingleStreamSpiller maintains the order of data being spilled and read back (FIFO).
Is that still the case when it's created with parallelSpill?
If yes, we should deprecate the old method and use the new one.
If no, the SingleStreamSpiller interface needs an update.
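The deprecation path suggested in this comment is the standard interface-evolution pattern: keep the old overload as a default method delegating to the new one with parallelSpill = false, and mark it deprecated. A minimal sketch with stand-in types (SpillerFactorySketch and SpillerSketch are illustrative names, not Trino's real interfaces):

```java
import java.util.List;

// Hypothetical sketch: the legacy factory method becomes a deprecated
// default that delegates to the new overload with parallelSpill = false,
// which is valid only if the parallel variant still preserves FIFO order.
interface SpillerSketch {
}

interface SpillerFactorySketch {
    @Deprecated
    default SpillerSketch create(List<String> types) {
        return create(types, false); // same behavior as before
    }

    SpillerSketch create(List<String> types, boolean parallelSpill);
}
```

With this shape, existing callers keep compiling unchanged while new callers opt into parallel spill explicitly.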
Description
HashBuilderOperator unspills sequentially, partition by partition. This commit improves join unspilling performance by making FileSingleStreamSpiller unspill a single partition in parallel. FileSingleStreamSpiller is enhanced so it can spill to and unspill from multiple files.
This improves join unspilling time from minutes to seconds on larger machines.
Additional context and related issues
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text:
Fixes #26007