Implement distributed processing for iceberg $files system table #25677
SystemTable supports ALL_NODES distribution. Could you please explain why we need to add a new table function? I'm not a fan of this approach. |
If I recall correctly, I tried to implement this as an It also seemed like the paradigm for |
b397410 to
d17720e
Compare
raunaqmorarka
left a comment
I don't think there should be two separate ways of querying $files table. We should improve the existing implementation.
I think you should be able to implement this by overriding io.trino.spi.connector.SystemTable#splitSource for io.trino.plugin.iceberg.FilesTable.
Yeah, I had originally tried to do that but the SPI expects a If folks are ok with changing above, I can throw something together for review. |
It's okay to change the SPI if we need to. We should make whatever changes are needed to make this work in the system tables framework.
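For readers following the thread, the idea under discussion (a system table optionally supplying its own splits, with the engine keeping the existing single-split path when it does not) can be sketched roughly as below. This is only an illustration under stated assumptions, not the Trino SPI: `SketchSystemTable`, `Split`, `LegacyTable`, `FilesLikeTable`, and `planSplits` are hypothetical names invented for the example, and the real `SystemTable#splitSource` signature may well differ.

```java
import java.util.List;
import java.util.Optional;

public class SystemTableSplitSketch {
    // Hypothetical stand-in for a unit of work; NOT Trino's ConnectorSplit.
    public record Split(String description) {}

    // Hypothetical stand-in for a system table; NOT io.trino.spi.connector.SystemTable.
    public interface SketchSystemTable {
        String name();

        // Empty means "no custom splits": the planner keeps its existing behavior.
        default Optional<List<Split>> splitSource() {
            return Optional.empty();
        }
    }

    // A table that does not override splitSource() and is planned as one split.
    public record LegacyTable(String name) implements SketchSystemTable {}

    // A $files-like table that emits one split per manifest (assumed granularity).
    public record FilesLikeTable(String name, List<String> manifests) implements SketchSystemTable {
        @Override
        public Optional<List<Split>> splitSource() {
            return Optional.of(manifests.stream()
                    .map(manifest -> new Split(name + ":" + manifest))
                    .toList());
        }
    }

    // Use the table's own splits when present, otherwise fall back to a single split.
    public static List<Split> planSplits(SketchSystemTable table) {
        return table.splitSource()
                .orElseGet(() -> List.of(new Split(table.name() + ":whole-table")));
    }

    public static void main(String[] args) {
        System.out.println(planSplits(new LegacyTable("history")));
        System.out.println(planSplits(new FilesLikeTable("files", List.of("m1.avro", "m2.avro"))));
    }
}
```

The point of the `Optional` default in this sketch is that existing system tables need no changes, while a table such as `FilesTable` can opt in to distributed processing.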
d17720e to
5689635
Compare
|
I applied the last commit, 5689635. The problem I encountered is in On the coordinator everything works. I dug a little into how it's done on the coordinator, and I see that to fetch this table we use
But as far as I understand, on the worker Maybe you have some ideas on how we can overcome this issue.
|
Thanks for validating this @vlad-lyutenko! I came to roughly the same conclusion as well. I also realized the current commit doesn't work in I have a few thoughts around possible solutions but probably want to iterate on them a bit/chat over slack with folks if possible. |
|
So, I had two general approaches that I think could work, but I'm unsure whether either of them is the "right" thing to do. I wanted to run them by the community before going with either, as I'm sure I'm missing something.
|
Maybe let's start with the first one, just as a draft: play with it and understand its limitations. Then we can decide whether we want to dive deeper (proper testing, etc.), or whether it's not possible at all and we should go with option 2.
5689635 to
d69b5b8
Compare
|
I pushed something that at least passes one of the |
Big thanks! I will try to play/experiment with this.
|
@tbaeg big thanks, I forgot that I cannot force-push to your fork. @ebyhr @raunaqmorarka please take a look at this PR - #25861. Should we move in this direction?
d69b5b8 to
ca9864e
Compare
|
@vlad-lyutenko I pushed an update that tries not to change too much of the SPI and instead delegates through the existing I need to clean up the rest of the actual implementation of the |
ca9864e to
eb612dc
Compare
eb612dc to
0ca172d
Compare
529d39b to
c714772
Compare
core/trino-main/src/main/java/io/trino/connector/system/SystemTablesProvider.java
// This means there is no known static table, but that doesn't mean a dynamic table must exist.
// This node could have a different config that causes that table to not exist.
if (!isCoordinatorTransaction(session)) {
What if the user enables scheduling on the coordinator (node-scheduler.include-coordinator)? Will this break?
AFAIK, there shouldn't be an issue.
Just make sure we have a test which runs a query on $files in both modes. My guess is the existing tests already use the default node-scheduler.include-coordinator of true
10ce16a to
c9c1654
Compare
raunaqmorarka
left a comment
please keep your commits squashed
we can use github to compare the pushed changes as long as any rebases to master are not mixed with pushes for addressing review comments
7d2b6ae to
f0e2613
Compare
raunaqmorarka
left a comment
lgtm % comments
956244c to
471a2cb
Compare
471a2cb to
f29b93e
Compare
|
Looks like the |
4cff52a to
1c1ed90
Compare
- Add support for SystemTables to delegate split processing to connectors
- Add support for distributed processing for $files
1c1ed90 to
e647356
Compare
Pull Request Overview
This PR enables distributed execution of system tables implementing the SystemTable interface, with a focus on improving performance and resource utilization for the $files metadata table. The implementation introduces a new split-based approach for distributing work across the cluster.
- Updates SystemTable distribution mode for ALL_NODES by delegating processing to ConnectorPageSourceProvider
- Refactors the FilesTable implementation to use distributed splits instead of single-coordinator execution
- Introduces new split and page source classes for distributed $files table processing
Reviewed Changes
Copilot reviewed 26 out of 26 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTable.java | New distributed implementation of FilesTable using split-based architecture |
| plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTableSplit.java | New split class for distributing manifest processing across nodes |
| plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTablePageSource.java | New page source for processing individual manifest files |
| plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java | Updated to handle FilesTableSplit instances |
| core/trino-spi/src/main/java/io/trino/spi/connector/SystemTable.java | Added splitSource() method for distributed system table support |
| core/trino-main/src/main/java/io/trino/connector/system/SystemTablesProvider.java | Refactored from interface to concrete class |
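As a rough illustration of the split-based approach summarized above (one split per manifest, each read independently, results combined), here is a minimal self-contained sketch. It is not the PR's code: `ManifestSplit`, `readSplit`, and `listFiles` are hypothetical names, a local thread pool stands in for worker nodes, and an in-memory map stands in for Iceberg manifests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ManifestFanOutSketch {
    // Hypothetical split: one manifest path per unit of work (not FilesTableSplit).
    public record ManifestSplit(String manifestPath) {}

    // Hypothetical "page source": turns one split into rows (here, data file paths).
    static List<String> readSplit(ManifestSplit split, Map<String, List<String>> manifestContents) {
        return manifestContents.getOrDefault(split.manifestPath(), List.of());
    }

    // Fan the splits out to a pool of workers, then concatenate rows in submission order.
    public static List<String> listFiles(Map<String, List<String>> manifestContents, int workers)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<List<String>>> pending = new ArrayList<>();
            for (String path : manifestContents.keySet()) {
                ManifestSplit split = new ManifestSplit(path);
                pending.add(pool.submit(() -> readSplit(split, manifestContents)));
            }
            List<String> rows = new ArrayList<>();
            for (Future<List<String>> result : pending) {
                rows.addAll(result.get());
            }
            return rows;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // TreeMap keeps manifest order deterministic for the example.
        Map<String, List<String>> manifests = new TreeMap<>();
        manifests.put("m1.avro", List.of("a.parquet", "b.parquet"));
        manifests.put("m2.avro", List.of("c.parquet"));
        System.out.println(listFiles(manifests, 2));
    }
}
```

In this sketch the coordinator-side work shrinks to enumerating manifests, while the per-manifest reads run in parallel.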
Description
This update allows connectors implementing a SystemTable to distribute and better parallelize work across the cluster. There should be a noticeable speed improvement, as well as a substantial reduction in coordinator resource utilization.
In the original table function implementation, I saw a 60x speedup on a $files query (4.5 minutes to 4.5 seconds), as well as a massive reduction in CPU/memory utilization of 80-95%. I can't deploy this specific iteration, but I expect similar results.
- ALL_NODES distribution mode for SystemTables by delegating processing to ConnectorPageSourceProvider.
- FilesTable in trino-iceberg was updated to illustrate the usage.
Additional context and related issues
https://trinodb.slack.com/archives/CJ6UC075E/p1745437019159079
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text: