Expire Snapshot and Remove Orphan files for Iceberg together with coordinator-only execute #10810
Conversation
Force-pushed from 8ff40d4 to 82c940f.
The "else" is redundant since the if block returns
Format intentionally empty methods like:

```java
public void finish() {}
```
(Outdated review thread, now resolved: core/trino-main/src/main/java/io/trino/operator/TableCoordinatorOnlyExecuteOperator.java)
(Outdated review thread, now resolved: plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java)
```java
table.expireSnapshots()
        .expireOlderThan(expireTimestamp)
        .cleanExpiredFiles(true)
        .commit();
```
We normally do `throws Exception` for test methods that throw any checked exceptions.
This is flaky, as it relies on the INSERT, SELECTs and EXECUTE all running in under 1 second. Or if we don't care, then there is no need to sleep or have a threshold greater than zero.
That is not correct.
The sleep is there to make sure at least 1 second has passed after we inserted the first row.
Can you just drop the sleep and make retention 0?
Force-pushed from 82c940f to d51f5a8.
It feels to me that the output schema of all ALTER TABLE ... EXECUTE statements should be the same. So I would make it a single BIGINT column and return no rows.
I do not think we need a status. Either the operation completes correctly, or we get an error, which is reported via an exception.
No need for a separate class; you can inline the fields, IMO.
Name is cumbersome. Maybe NonReadingTableExecuteNode?
node.getTarget does not have toString, but I suggest dropping the class altogether.
You have analysis.getTableExecuteHandle().orElseThrow() assigned to a variable one line above.
(Outdated review thread, now resolved: core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PropertyDerivations.java)
(Outdated review thread, now resolved: core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java)
"Remove only files older than threshold"
Can you add a check at the file level to see whether the vacuum operation was effective?
You could use the $files metadata table (while the snapshot to be expired is still active) to gather the file(s) relevant to the snapshot.
You could use the $snapshots / $history metadata tables to actually check that the snapshot was expired.
Subsequently, you could check at the file level that the files you previously recorded are also removed, as in the sketch below.
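A hedged sketch of that check over JDBC (the table name, connection URL, and retention parameter are illustrative assumptions, not taken from this PR's tests):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

public class ExpireSnapshotsFileLevelCheck
{
    public static void main(String[] args) throws Exception
    {
        try (Connection connection = DriverManager.getConnection(
                        "jdbc:trino://localhost:8080/iceberg/tpch", "admin", null);
                Statement statement = connection.createStatement()) {
            // 1. While the old snapshot is still active, record its data files via $files
            List<String> filesBefore = new ArrayList<>();
            try (ResultSet resultSet = statement.executeQuery(
                    "SELECT file_path FROM \"orders$files\"")) {
                while (resultSet.next()) {
                    filesBefore.add(resultSet.getString(1));
                }
            }

            // 2. Expire old snapshots (the parameter name is an assumption)
            statement.execute(
                    "ALTER TABLE orders EXECUTE expire_snapshots(retention_threshold => '0s')");

            // 3. Verify via $snapshots that only the current snapshot remains
            try (ResultSet resultSet = statement.executeQuery(
                    "SELECT count(*) FROM \"orders$snapshots\"")) {
                resultSet.next();
                System.out.println("snapshots remaining: " + resultSet.getLong(1));
            }

            // 4. A file-level check would then assert that the recorded paths belonging
            //    only to the expired snapshot no longer exist in storage
            filesBefore.forEach(path -> System.out.println("recorded file: " + path));
        }
    }
}
```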
I added counting physical files, as it is more reliable.
Either here or in a follow-up PR, it would be beneficial to have documentation for this newly introduced operation.
Force-pushed from d51f5a8 to f2d9283.
Force-pushed from f2d9283 to c991b8e.
I am having a hard time understanding what readsData is about, and why the method io.trino.spi.connector.TableProcedureExecutionMode#coordinatorOnly uses readsData set to false.
It is already removed from here.
Let's make the constructor private and only have static factories. The boolean arguments are confusing, especially because certain combinations are not valid.
You can add Javadoc to the coordinatorOnly() factory:
Table procedure that does not read any table data and only executes on the coordinator.
Such procedures are useful for custom DDL-type operations.
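Putting the two suggestions above together, a hedged sketch of the shape being proposed (the field names and the second factory are assumptions drawn from this thread, not necessarily the final SPI):

```java
public final class TableProcedureExecutionMode
{
    private final boolean readsData;
    private final boolean supportsFilter;

    // Private constructor: only the named factories below can build instances,
    // which rules out the invalid boolean combinations
    private TableProcedureExecutionMode(boolean readsData, boolean supportsFilter)
    {
        this.readsData = readsData;
        this.supportsFilter = supportsFilter;
    }

    /**
     * Table procedure that does not read any table data and only executes on the coordinator.
     * Such procedures are useful for custom DDL-type operations.
     */
    public static TableProcedureExecutionMode coordinatorOnly()
    {
        return new TableProcedureExecutionMode(false, false);
    }

    public static TableProcedureExecutionMode distributedWithFilteringAndRepartitioning()
    {
        return new TableProcedureExecutionMode(true, true);
    }

    public boolean isReadsData()
    {
        return readsData;
    }

    public boolean supportsFilter()
    {
        return supportsFilter;
    }
}
```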
I find the name distributedWithFilteringAndRepartitioning() confusing. Would supportsFiltering=false also do repartitioning? More generally, how does filtering relate to repartitioning?
is the field name still consistent with the type?
is the javadoc change intentional?
Force-pushed from 4e7ab2c to 9488255.
@losipiuk I addressed all the comments, improved the existing test, and added a new one.
URI.getPath is a bad conversion (lossy); is it a file:// URI?
Path.of(tableDataDir) should suffice.
This is for the local file system. Path is from java.nio; it doesn't work with a protocol, I've just checked.
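A small Java illustration of why URI.getPath is lossy for non-local URIs (a hypothetical example, not code from this PR):

```java
import java.net.URI;
import java.nio.file.Path;

public class UriGetPathLossiness
{
    public static void main(String[] args)
    {
        URI remote = URI.create("s3://my-bucket/warehouse/orders/data.parquet");
        // Prints "/warehouse/orders/data.parquet": the scheme and bucket are dropped,
        // so the original location cannot be reconstructed from getPath() alone
        System.out.println(remote.getPath());

        // For a plain local path, java.nio works directly and no URI round-trip is
        // needed, but it has no notion of a protocol such as s3:// or hdfs://
        Path local = Path.of("/tmp/warehouse/orders");
        System.out.println(local.toAbsolutePath());
    }
}
```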
Force-pushed from 112da80 to b94fca0.
I've added 2 tests: testTrinoReadsTrinoTableWithSparkDeletesAfterOptimizeAndCleanUp and testCleaningUpIcebergTableFailsForTableV2.
But does it make sense to let the coordinator (which should coordinate Trino tasks) execute actual data or metadata mutations? Wouldn't it be more in line with Trino's design to just create a task and assign it to a single worker node?
@synhershko Take a look at this comment: #10810 (comment)
Force-pushed from d8e4ebe to 38a19e6.
@homar please rebase, there are conflicts.
Force-pushed from 38a19e6 to 4ac2ec9.
@findepi done
Add a comment explaining why we decided to use URI.getPath only here. Perhaps a reference to the Iceberg issue.
(Three outdated review threads, now resolved: ...roduct-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java)
Force-pushed from 4ac2ec9 to 2c3c3d7.
This commit also adds com.starburstdata.presto.plugin.deltalake.procedure.Procedures#checkProcedureArgument, which makes it easier to validate input parameters and throw a Trino exception.
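A hedged sketch of what such a helper typically looks like (this body is an assumption; the actual method in the commit may differ):

```java
import io.trino.spi.TrinoException;

import static io.trino.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT;
import static java.lang.String.format;

final class Procedures
{
    private Procedures() {}

    // Validates a procedure argument; on failure throws a TrinoException carrying
    // a standard error code instead of a bare IllegalArgumentException
    static void checkProcedureArgument(boolean condition, String message, Object... args)
    {
        if (!condition) {
            throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, format(message, args));
        }
    }
}
```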
Force-pushed from 2c3c3d7 to 69e0b1e.
Does
Currently it is not parallel. It may change in the future.
Two new procedures are added to the Iceberg connector: expire_snapshots and remove_orphan_files.
Trino lacked the infrastructure to run a specific task on a single node, though the necessity for such behaviour was anticipated when table execute was introduced.
This PR adds the necessary changes to run tasks on the coordinator only.
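For illustration, a hedged usage sketch over JDBC (the table name, connection details, and the retention_threshold parameter name are assumptions, not confirmed by this PR's diff):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class IcebergMaintenanceExample
{
    public static void main(String[] args) throws Exception
    {
        try (Connection connection = DriverManager.getConnection(
                        "jdbc:trino://localhost:8080/iceberg/tpch", "admin", null);
                Statement statement = connection.createStatement()) {
            // Expire snapshots older than the retention threshold; with this PR
            // the operation runs on the coordinator only
            statement.execute(
                    "ALTER TABLE orders EXECUTE expire_snapshots(retention_threshold => '7d')");
            // Remove files no longer referenced by any table snapshot
            statement.execute(
                    "ALTER TABLE orders EXECUTE remove_orphan_files(retention_threshold => '7d')");
        }
    }
}
```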