Convert the retrieval of the active data files to streaming#20054
Convert the retrieval of the active data files to streaming#20054ebyhr merged 5 commits intotrinodb:masterfrom
Conversation
alexjo2144
left a comment
There was a problem hiding this comment.
We may want to consider standardizing all the code here on Iterators rather than Streams. Going back and forth has some overhead and generally I think we prefer iterators for performance sensitive things.
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
Outdated
Show resolved
Hide resolved
|
Do we have a potential memory leak with this change? The pageSource is opened, but if the entries are not consumed to the very last one, the pageSource is not being closed. Do we maybe need a concept similar to https://github.com/apache/iceberg/blob/263b530502e5597b19b6b5e282917af8eede7600/api/src/main/java/org/apache/iceberg/io/CloseableIterator.java#L29 ? |
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
Outdated
Show resolved
Hide resolved
...-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java
Outdated
Show resolved
Hide resolved
We should. One possible alternative could be to use that streams are |
fa485f7 to
417003e
Compare
417003e to
1e33c29
Compare
1e33c29 to
055c7bd
Compare
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
Outdated
Show resolved
Hide resolved
055c7bd to
704400b
Compare
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
Outdated
Show resolved
Hide resolved
0f02eb6 to
7b29295
Compare
findepi
left a comment
There was a problem hiding this comment.
"Rewrite data file filtering logic for OPTIMIZE statements"
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
Outdated
Show resolved
Hide resolved
71b50e4 to
8a2edbf
Compare
|
Rebased on |
8a2edbf to
4f8c6ef
Compare
4f8c6ef to
20fe80a
Compare
|
#20121 merged, please rebase. |
63bfb21 to
624ee09
Compare
findepi
left a comment
There was a problem hiding this comment.
"Add close logic for the Delta Lake splits"
-- add some color to the commit description
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitSource.java
Outdated
Show resolved
Hide resolved
...c/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java
Outdated
Show resolved
Hide resolved
findepi
left a comment
There was a problem hiding this comment.
"Process lazily multi-part checkpoint files"
...n/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java
Outdated
Show resolved
Hide resolved
...n/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java
Outdated
Show resolved
Hide resolved
...n/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java
Outdated
Show resolved
Hide resolved
624ee09 to
5ff2514
Compare
There was a problem hiding this comment.
Could there be a potential resource leak here (which might not be introduced as part of this PR)? We call TrinoInputFile.newInput() within ParquetPageSourceFactory.createPageSource as part of the constructor of CheckpointEntryIterator. If some later part of the constructor fails, we likely won't get to close the file.
There was a problem hiding this comment.
We call TrinoInputFile.newInput() ...
I'm assuming you are refferring to
If some later part of the constructor fails, we likely won't get to close the file.
There is actually exception handling in place for this specific scenario. See
I'm adding though exception handling in the CheckpointEntryIterator constructor to make sure that the page source gets closed in case that there are eventual exceptions happening after initializing the pageSource field.
Instead of waiting to get in bulk the collection of all the active data files of a table relevant to a query, provide a stream of active add entries so that the split manager can start providing to the engine splits right away. The active add entries stream is closeable in order to ensure that the underlying checkpoint parquet page source gets closed even when not all the entries from the checkpoint are consumed.
5ff2514 to
ce75821
Compare
|
Looks good to me. How about using |
I tried adding |
|
can |
Description
Instead of waiting to get in bulk the collection of all
the active data files of a table relevant to a query,
provide a stream of active add entries so that the
split manager can start providing to the engine splits right away.
The active add entries stream is closeable in order to ensure that
the underlying checkpoint parquet page source gets closed
even when not all the entries from the checkpoint are consumed.
These changes should provide visible improvements in terms of reduced memory requirements when dealing with Delta Lake tables having many active
addfiles, because as of now these files will not be held anymore in memory withinDeltaLakeSplitManagerdue to the fact that theaddfiles are processed as a consequence of the current changes now one by one.Potential downside The checkpoint entry iterator will keep open the parquet page source corresponding to the checkpoint (multipart) file until all the
addentries are processed by theDeltaLakeSplitSource.Additional context and related issues
The access to the active add stream needs to be done within a try-with-resources statement to make sure that the checkpoint iterator does indeed release the opened resources.
The key output of these changes is that in the
DeltaLakeSplitManager&DeltaLakeSplitSourcedon't block anymore until all theaddentries relevant to the query are collected, but instead the splits are provided gradually (in parallel with the query execution).This change should particularly speed up the planning of the queries involving Delta Lake tables where the table statistics are not required because the split generation can start right away.
Fixes #20088
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text: