Add copy-on-write mode for Iceberg row-level DELETE, UPDATE, and MERGE #28958
kaveti wants to merge 1 commit into trinodb:master
Conversation
@coderabbitai review
✅ Actions performed: Review triggered.
📝 Walkthrough: This pull request adds Copy-on-Write (CoW) support for row-level mutations (DELETE, UPDATE, MERGE) in the Trino Iceberg connector.
Warning: There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.
🔧 ast-grep (0.42.0): plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
Actionable comments posted: 7
🧹 Nitpick comments (2)
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java (1)
200-220: Consider centralizing this IcebergMetadata test fixture. This positional constructor block is now duplicated across the Nessie, REST, and Glue catalog tests, including the extra executor and trailing null. A small helper on BaseTrinoCatalogTest would make future signature changes much less error-prone.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java` around lines 200 - 220, Extract the duplicated IcebergMetadata positional constructor block into a single helper method on BaseTrinoCatalogTest (e.g., createTestIcebergMetadata or buildIcebergMetadataFixture) and update TestTrinoNessieCatalog and the REST and Glue catalog tests to call that helper; preserve the existing arguments used (PLANNER_CONTEXT.getTypeManager(), jsonCodec(CommitTaskData.class), catalog, the UnsupportedOperationException file IO lambda, TABLE_STATISTICS_READER, new TableStatisticsWriter(new NodeVersion("test-version")), UNSUPPORTED_DELETION_VECTOR_WRITER, Optional.empty(), false, _ -> false, newDirectExecutorService(), directExecutor(), newDirectExecutorService(), newDirectExecutorService(), newDirectExecutorService(), 0, ZERO, null) so future constructor signature changes are localized to BaseTrinoCatalogTest.
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCopyOnWrite.java (1)
625-642: Avoid hard-coding exact physical file layout in these CoW assertions.
`doesNotContainAnyElementsOf(filesBefore)`, `hasSize(3)`, `hasSize(2)`, and `containsAll(filesAfter)` assume one stable data file per write batch and a specific row-to-file distribution. Writer parallelism/scaling changes can break those checks without any CoW regression, so these tests would be less flaky if they asserted only logical invariants and touched-file behavior.
Also applies to: 665-676
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCopyOnWrite.java` around lines 625 - 642, In TestIcebergCopyOnWrite, avoid asserting exact physical layout (counts or specific membership) of data files: replace checks like hasSize(...), containsAll(...), and doesNotContainAnyElementsOf(filesBefore) with logical/touch-based assertions — e.g., call getDataFilePaths(tableName) before and after a DELETE/UPDATE and assert the resulting set is non-empty and not equal to the original set, and keep assertNoDeleteFiles(tableName) to ensure no delete files were produced; apply the same change for the similar block around lines 665-676 so tests assert only that files changed (set inequality) and remain non-empty rather than relying on specific file counts or exact file identity.
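The logical-invariant style this comment asks for can be sketched with plain JDK sets. The helper name `assertFilesRewritten` and the file names are hypothetical illustrations, not the test's actual AssertJ assertions:

```java
import java.util.Set;

public class CowAssertionSketch {
    // Hypothetical stand-in for comparing getDataFilePaths(tableName) before/after a DELETE.
    static void assertFilesRewritten(Set<String> filesBefore, Set<String> filesAfter) {
        // Logical invariants only: data files still exist, and the set changed.
        if (filesAfter.isEmpty()) {
            throw new AssertionError("expected data files to remain after CoW rewrite");
        }
        if (filesAfter.equals(filesBefore)) {
            throw new AssertionError("expected the data file set to change after CoW rewrite");
        }
        // Deliberately no hasSize(...) or exact-membership checks: writer scaling may
        // change how many files a write batch produces without breaking CoW semantics.
    }

    public static void main(String[] args) {
        Set<String> before = Set.of("data-00.parquet", "data-01.parquet");
        Set<String> after = Set.of("data-02.parquet");
        assertFilesRewritten(before, after); // non-empty and not equal, so it passes
        System.out.println("ok");
    }
}
```

Set inequality plus non-emptiness is stable under writer parallelism changes, which is exactly why the review prefers it over exact file counts.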
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@plugin/trino-iceberg/COPY_ON_WRITE_DESIGN.md`:
- Around line 51-56: The design doc uses two executor names which creates
ambiguity—update the finishMerge() / finishCopyOnWrite() sections that currently
reference icebergScanExecutor so they consistently reference copyOnWriteExecutor
instead; replace any mention of icebergScanExecutor with copyOnWriteExecutor
(and note the DI tag `@ForIcebergCopyOnWrite` where relevant) so the threading
model matches the wiring described elsewhere.
In
`@plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CopyOnWriteFileRewriter.java`:
- Around line 225-235: In CopyOnWriteFileRewriter's catch(Throwable t) (the
block that calls writer.rollback()), do not convert Errors or RuntimeExceptions
(including TrinoException) into a generic RuntimeException: after attempting
rollback, if t is an IOException wrap it in UncheckedIOException as before; if t
is an Error or RuntimeException rethrow it unchanged; for any other checked
Throwable (i.e., Throwable that is an Exception but not a RuntimeException) wrap
it in a RuntimeException with the same message including originalPath.
- Around line 101-113: The rewriteFile(...) method currently uses the single
IcebergFileFormat parameter for both reading source files and writing targets
and ignores any pre-existing delete files; update the method signature to accept
a separate source file format (e.g., IcebergFileFormat sourceFileFormat) and a
collection representing pre-existing delete files or a visible-row bitmap (e.g.,
List<DeleteFile> preExistingDeleteFiles or BitSet visibleRows) in addition to
the target write IcebergFileFormat fileFormat, then: when opening/reading the
original file use sourceFileFormat (not fileFormat) and apply
preExistingDeleteFiles/visibleRows to compute visible rows so deleted rows
aren’t resurrected, while still writing the output using fileFormat; propagate
these new parameters to all other affected call sites and to the other related
method ranges noted (around the blocks at 133-140, 192-214, 240-250) so callers
supply the source format and applicable delete files from the file’s
metadata/snapshots.
In
`@plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java`:
- Around line 3677-3780: The rewrite rollback tracking is incomplete because
rollbackActions are only added after future.get() succeeds in submission order
and are never executed if later commitUpdate()/commitTransaction() fails; fix by
collecting rollback actions as soon as each rewrite completes (use a
CompletionService or Future.get in completion order) and ensure all futures are
awaited (or cancelled) and their rollback actions gathered, then run/clear
rollbackActions in a single catch/finally that covers rewrite completion,
append/commit phases and commitTransaction failures; reference rewriteFutures,
rollbackActions, CopyOnWriteFileRewriter.RewriteResult, commitUpdate, and
commitTransaction to locate the code paths to change.
- Around line 3661-3667: The CoW path using RewriteFiles from
transaction.newRewrite() currently only calls validateFromSnapshot(...) and
scanManifestsWith(...); update the CoW commit path in IcebergMetadata to perform
the same row-level conflict checks used by the MOR path (see
finishWrite/RowDelta logic): apply conflictDetectionFilter(...) with the same
predicate, call validateNoConflictingDataFiles(...) when SERIALIZABLE isolation
is configured, and also call validateDeletedFiles(...) and
validateNoConflictingDeleteFiles() before committing the RewriteFiles; ensure
you reference the same transaction/newRewrite(), validateFromSnapshot(...), and
scanManifestsWith(...) sequence and add these additional validation calls to
prevent commits that would bypass row-level conflict detection.
- Around line 3649-3654: The CoW rewrite currently only passes new worker
deletion vectors (aggregatedDeletions) to CopyOnWriteFileRewriter.rewriteFile(),
which drops pre-existing delete semantics; fix by collecting existing delete
metadata from the original data file referenced by deletionTaskDetails (e.g.,
the DataFile / FileScanTask attached to the task) and include those
position-delete files and any equality-delete files when invoking
CopyOnWriteFileRewriter.rewriteFile() instead of passing List.of(); merge or
apply existing deletion vectors with DeletionVector.builder().addAll(...) so the
rewriter receives both existing and new deletions (use aggregatedDeletions,
deletionTaskDetails, DeletionVector.builder(), and
CopyOnWriteFileRewriter.rewriteFile() as anchors).
In
`@plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCopyOnWrite.java`:
- Line 103: The test uses assertQuery("SELECT * FROM " + tableName, "SELECT *
FROM nation WHERE ...") but the session created in createQueryRunner() is pinned
to ICEBERG_CATALOG.tpch so unqualified "nation" resolves to the Iceberg catalog
and causes TABLE NOT FOUND; update the expected query strings in the assertQuery
calls (the ones at the shown locations and the other occurrences at the lines
noted) to fully qualify the reference to the TPCH source (e.g., tpch.tiny.nation
or the exact catalog.schema.table used by the TPCH setup) so the right catalog
is queried; search for the assertQuery invocations in TestIcebergCopyOnWrite and
replace bare "nation" references accordingly.
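The completion-order rollback collection described in the IcebergMetadata comment above can be sketched with plain JDK primitives. `RewriteResult` and its `rollback` Runnable are placeholders for the PR's actual types, and a full fix would also cancel or await any still-running futures:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class RollbackInCompletionOrder {
    // Placeholder for CopyOnWriteFileRewriter.RewriteResult.
    record RewriteResult(String newPath, Runnable rollback) {}

    static List<RewriteResult> awaitAll(List<Callable<RewriteResult>> rewrites, ExecutorService executor)
            throws InterruptedException {
        CompletionService<RewriteResult> completion = new ExecutorCompletionService<>(executor);
        rewrites.forEach(completion::submit);

        List<RewriteResult> results = new ArrayList<>();
        List<Runnable> rollbackActions = new ArrayList<>();
        try {
            for (int i = 0; i < rewrites.size(); i++) {
                // Completion order, not submission order: a finished rewrite's rollback
                // action is recorded even if an earlier-submitted task fails later.
                RewriteResult result = completion.take().get();
                rollbackActions.add(result.rollback());
                results.add(result);
            }
            return results;
        }
        catch (ExecutionException e) {
            // Every file already written gets cleaned up, not just the earlier submissions.
            rollbackActions.forEach(Runnable::run);
            throw new RuntimeException("rewrite failed", e.getCause());
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Callable<RewriteResult>> ok = List.of(
                () -> new RewriteResult("f1", () -> {}),
                () -> new RewriteResult("f2", () -> {}));
        System.out.println(awaitAll(ok, executor).size()); // 2
        executor.shutdown();
    }
}
```

The same `rollbackActions` list would then be run from a catch/finally that also covers the append and commitTransaction phases, as the comment suggests.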
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 8e0e439c-5862-439e-92dd-1d05ff2b9a0c
📒 Files selected for processing (19)
- plugin/trino-iceberg/COPY_ON_WRITE_DESIGN.md
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CopyOnWriteFileRewriter.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ForIcebergCopyOnWrite.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergExecutorModule.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeTableHandle.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCopyOnWrite.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java
- plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java
- plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java
- plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java
public RewriteResult rewriteFile(
        ConnectorSession session,
        String originalPath,
        DeletionVector deletionVector,
        Schema schema,
        PartitionSpec partitionSpec,
        Optional<PartitionData> partitionData,
        Optional<String> partitionDataJson,
        IcebergFileFormat fileFormat,
        MetricsConfig metricsConfig,
        Map<String, String> tableProperties,
        LocationProvider locationProvider,
        Optional<NameMapping> nameMapping)
🧩 Analysis chain
🌐 Web query:
Does Apache Iceberg allow a table to contain historical data files in different formats after write.format.default changes, and are delete files tracked separately from data files in snapshot metadata?
💡 Result:
Yes, Apache Iceberg allows a table to contain historical data files in different formats even after changing write.format.default. The table property write.format.default only sets the default file format for new writes (parquet, avro, or orc), but existing historical data files written in previous formats remain valid and tracked in snapshots. Iceberg tracks files individually via manifests regardless of their format, as confirmed by the spec and configuration docs. Writers can also explicitly override the format for new files. Yes, delete files are tracked separately from data files in snapshot metadata. Manifests explicitly distinguish between data manifests (content=0) and delete manifests (content=1), ensuring delete files (position or equality deletes) are handled distinctly during scan planning and application to data files.
Citations:
- 1: https://iceberg.apache.org/docs/latest/configuration/
- 2: https://iceberg.apache.org/spec/?h=equality+delete
- 3: https://iceberg.apache.org/spec/
- 4: https://iceberg.apache.org/spec/?h=manifest+lis
- 5: https://iceberg.apache.org/javadoc/nightly/org/apache/iceberg/DeleteFiles.html
Thread the source file's actual format and pre-existing delete files separately from the target write format.
The rewriteFile() method receives a single IcebergFileFormat fileFormat parameter that is used for both reading the source file and writing the target, but does not receive the source file's actual format or any pre-existing delete files. This creates two problems:
- After a write.format.default change, historical data files remain in their original format in the table. Using the current table format to read a source file that was written in a different format will fail or produce incorrect results.
- Apache Iceberg tracks delete files separately from data files in snapshot metadata. Pre-existing delete files applicable to the source file are not provided to this method, so rows that were already hidden by earlier delete files will be resurrected once RewriteFiles swaps in the rewritten file.
Pass the source file's actual format (from its metadata) and applicable delete files (or an equivalent visible-row bitmap) separately from the target write format.
Also applies to: 133-140, 192-214, 240-250
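The "visible-row bitmap" alternative mentioned in this comment can be illustrated with a plain BitSet. The real connector's DeletionVector type and row representation differ; this sketch just shows why deletes from both sources must be unioned before copying rows:

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

public class VisibleRowSketch {
    // Rows to copy = all positions minus pre-existing deletes minus this query's new deletes.
    static List<String> visibleRows(List<String> sourceRows, BitSet preExistingDeletes, BitSet newDeletes) {
        BitSet deleted = (BitSet) preExistingDeletes.clone();
        deleted.or(newDeletes); // union: a row hidden by either vector stays hidden
        List<String> visible = new ArrayList<>();
        for (int pos = 0; pos < sourceRows.size(); pos++) {
            if (!deleted.get(pos)) {
                visible.add(sourceRows.get(pos));
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("r0", "r1", "r2", "r3");
        BitSet existing = new BitSet();
        existing.set(1); // r1 was deleted by an earlier snapshot's delete file
        BitSet fresh = new BitSet();
        fresh.set(3);    // r3 is deleted by the current DELETE
        System.out.println(visibleRows(rows, existing, fresh)); // [r0, r2]
    }
}
```

If only `fresh` were applied, r1 would reappear in the rewritten file, which is exactly the resurrection bug the review describes.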
catch (Throwable t) {
    try {
        writer.rollback();
    }
    catch (Throwable rollbackFailure) {
        t.addSuppressed(rollbackFailure);
    }
    if (t instanceof IOException ioException) {
        throw new UncheckedIOException("Failed to rewrite data file: " + originalPath, ioException);
    }
    throw new RuntimeException("Failed to rewrite data file: " + originalPath, t);
Do not downgrade TrinoException and Error into plain RuntimeException.
This turns every non-IOException failure from the page source or writer into a generic RuntimeException. That drops Trino error codes and can misclassify fatal JVM errors as generic internal failures. After writer.rollback(), rethrow unchecked failures as-is and only wrap truly checked exceptions.
Suggested fix
catch (Throwable t) {
try {
writer.rollback();
}
catch (Throwable rollbackFailure) {
t.addSuppressed(rollbackFailure);
}
if (t instanceof IOException ioException) {
throw new UncheckedIOException("Failed to rewrite data file: " + originalPath, ioException);
}
- throw new RuntimeException("Failed to rewrite data file: " + originalPath, t);
+ if (t instanceof RuntimeException runtimeException) {
+ throw runtimeException;
+ }
+ if (t instanceof Error error) {
+ throw error;
+ }
+ throw new RuntimeException("Failed to rewrite data file: " + originalPath, t);
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
catch (Throwable t) {
    try {
        writer.rollback();
    }
    catch (Throwable rollbackFailure) {
        t.addSuppressed(rollbackFailure);
    }
    if (t instanceof IOException ioException) {
        throw new UncheckedIOException("Failed to rewrite data file: " + originalPath, ioException);
    }
    if (t instanceof RuntimeException runtimeException) {
        throw runtimeException;
    }
    if (t instanceof Error error) {
        throw error;
    }
    throw new RuntimeException("Failed to rewrite data file: " + originalPath, t);
}
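The classification order in the suggested fix generalizes to a small reusable pattern. This sketch uses only JDK types; `propagate` is an illustrative helper name, not part of the PR:

```java
import java.io.IOException;
import java.io.UncheckedIOException;

public class Rethrow {
    // Preserve unchecked failures as-is; wrap only truly checked exceptions.
    static RuntimeException propagate(Throwable t, String context) {
        if (t instanceof Error error) {
            throw error;                        // never mask JVM errors (OutOfMemoryError, etc.)
        }
        if (t instanceof RuntimeException runtimeException) {
            throw runtimeException;             // keeps subtypes like TrinoException, with their error codes
        }
        if (t instanceof IOException ioException) {
            throw new UncheckedIOException(context, ioException);
        }
        throw new RuntimeException(context, t); // only checked, non-IO exceptions get a generic wrapper
    }

    public static void main(String[] args) {
        try {
            propagate(new IllegalStateException("boom"), "rewriting file");
        }
        catch (IllegalStateException e) {
            System.out.println("unchecked preserved: " + e.getMessage());
        }
        try {
            propagate(new Exception("checked"), "rewriting file");
        }
        catch (RuntimeException e) {
            System.out.println("checked wrapped: " + e.getCause().getMessage());
        }
    }
}
```

Declaring a RuntimeException return type lets callers write `throw propagate(t, ...)` so the compiler knows the branch terminates.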
DeletionVector workerDeletion = DeletionVector.builder()
        .deserialize(Slices.wrappedBuffer(task.serializedDeletionVector().orElseThrow()))
        .build()
        .orElseThrow(() -> new VerifyException("Empty deletion vector in CoW fragment"));
aggregatedDeletions.computeIfAbsent(filePath, _ -> DeletionVector.builder()).addAll(workerDeletion);
deletionTaskDetails.putIfAbsent(filePath, task);
CoW rewrites drop pre-existing delete semantics on rewritten data files.
aggregatedDeletions only contains new worker deletion vectors from the commit tasks. When CopyOnWriteFileRewriter.rewriteFile() is called, it receives only these new deletes and passes an empty list (List.of()) for existing delete files. The rewriter filters rows using only the new deletion vector, ignoring any existing equality or position delete files attached to the original file. This allows previously deleted rows to be resurrected in the rewritten file.
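The merge the comment calls for can be sketched with per-file BitSets standing in for DeletionVector builders (the real type, serialization, and file metadata lookup are simplified away):

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

public class DeletionMergeSketch {
    // Simplified analogue of aggregatedDeletions.computeIfAbsent(path, _ -> builder()).addAll(...).
    static void addDeletes(Map<String, BitSet> aggregated, String filePath, BitSet deletes) {
        aggregated.computeIfAbsent(filePath, path -> new BitSet()).or(deletes);
    }

    public static void main(String[] args) {
        Map<String, BitSet> aggregated = new HashMap<>();

        BitSet workerDeletes = new BitSet();
        workerDeletes.set(5); // new delete produced by this query's commit task

        BitSet existingDeletes = new BitSet();
        existingDeletes.set(2); // position delete already attached to the file in snapshot metadata

        // The fix: fold both the worker vector AND the file's pre-existing deletes into
        // the vector handed to the rewriter, so previously deleted rows stay deleted.
        addDeletes(aggregated, "data-00.parquet", workerDeletes);
        addDeletes(aggregated, "data-00.parquet", existingDeletes);

        System.out.println(aggregated.get("data-00.parquet")); // {2, 5}
    }
}
```

Feeding only `workerDeletes` to the rewriter would leave position 2 visible in the rewritten file, which is the resurrection hazard described above.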
RewriteFiles rewriteFiles = transaction.newRewrite();
OptionalLong baseSnapshotId = table.getSnapshotId();
if (baseSnapshotId.isPresent()) {
    rewriteFiles.validateFromSnapshot(icebergTable.snapshot(baseSnapshotId.getAsLong()).snapshotId());
}
rewriteFiles.scanManifestsWith(icebergScanExecutor);
The CoW path skips row-level conflict checks that the MOR path performs.
The CoW implementation (lines 3661-3667) only calls validateFromSnapshot() and scanManifestsWith() on RewriteFiles. In contrast, the MOR path (finishWrite) performs multiple additional conflict validations on RowDelta:
- `conflictDetectionFilter()` with predicate-based filtering
- `validateNoConflictingDataFiles()` (when SERIALIZABLE isolation is configured)
- `validateDeletedFiles()`
- `validateNoConflictingDeleteFiles()`
Under concurrent INSERT/UPDATE/DELETE activity, the CoW path can commit stale rewrites where the MOR path would correctly fail with conflict errors.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java`
around lines 3661 - 3667, The CoW path using RewriteFiles from
transaction.newRewrite() currently only calls validateFromSnapshot(...) and
scanManifestsWith(...); update the CoW commit path in IcebergMetadata to perform
the same row-level conflict checks used by the MOR path (see
finishWrite/RowDelta logic): apply conflictDetectionFilter(...) with the same
predicate, call validateNoConflictingDataFiles(...) when SERIALIZABLE isolation
is configured, and also call validateDeletedFiles(...) and
validateNoConflictingDeleteFiles() before committing the RewriteFiles; ensure
you reference the same transaction/newRewrite(), validateFromSnapshot(...), and
scanManifestsWith(...) sequence and add these additional validation calls to
prevent commits that would bypass row-level conflict detection.
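Conceptually, the missing row-level validations amount to an overlap check at commit time between the files the rewrite read at its scan snapshot and the files that concurrent commits changed. A toy stand-alone model of that idea (all names here are illustrative; the real checks are the Iceberg `RowDelta`/`RewriteFiles` methods discussed in the finding):

```java
import java.util.Set;

// Toy model of row-level conflict detection: a CoW commit should fail when a
// concurrent commit touched any file that this rewrite read at its scan
// snapshot. Illustrative only; not the Iceberg API.
public class ConflictCheckSketch
{
    public static void validateNoConflicts(Set<String> filesReadAtScanSnapshot, Set<String> filesChangedByConcurrentCommits)
    {
        for (String changed : filesChangedByConcurrentCommits) {
            if (filesReadAtScanSnapshot.contains(changed)) {
                throw new IllegalStateException("conflicting commit touched " + changed);
            }
        }
    }
}
```

A commit that skips this check (as the original CoW path did) would silently overwrite concurrent changes instead of failing.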
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java (outdated review comment, resolved)
```java
assertThat(newDataFileCount).isGreaterThanOrEqualTo(1);

// Verify data correctness
assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey != 1");
```
These expected queries resolve to the wrong catalog.
The session created in createQueryRunner() is pinned to ICEBERG_CATALOG.tpch, so bare nation here resolves to the Iceberg catalog, not tpch.tiny.nation. Unless another test creates tpch.nation, these assertions fail with TABLE NOT FOUND.
Also applies to: 173-173, 212-212, 345-345, 456-456, 521-522
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCopyOnWrite.java`
at line 103, The test uses assertQuery("SELECT * FROM " + tableName, "SELECT *
FROM nation WHERE ...") but the session created in createQueryRunner() is pinned
to ICEBERG_CATALOG.tpch so unqualified "nation" resolves to the Iceberg catalog
and causes TABLE NOT FOUND; update the expected query strings in the assertQuery
calls (the ones at the shown locations and the other occurrences at the lines
noted) to fully qualify the reference to the TPCH source (e.g., tpch.tiny.nation
or the exact catalog.schema.table used by the TPCH setup) so the right catalog
is queried; search for the assertQuery invocations in TestIcebergCopyOnWrite and
replace bare "nation" references accordingly.
- Fix CopyOnWriteFileRewriter: rethrow RuntimeException/Error unchanged in catch block
- Fix IcebergMetadata: move commit calls inside try block, collect all futures' rollback actions in catch
- Fix TestIcebergCopyOnWrite: replace exact file count assertions with logical invariants
- Centralize IcebergMetadata test fixture in BaseTrinoCatalogTest + clean up unused imports
- Update 4 catalog test subclasses to use centralized helper
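The first fix above follows Java's precise-rethrow idiom: unchecked failures propagate unchanged, and only checked exceptions get wrapped. A minimal self-contained sketch (the class and method names are illustrative, not the PR's actual code):

```java
import java.util.concurrent.Callable;

// Precise-rethrow pattern: RuntimeException and Error are rethrown as-is so
// callers see the original type; checked exceptions are wrapped. With
// multi-catch, the compiler allows "throw e" here because only unchecked
// types can reach that handler.
public class RethrowSketch
{
    public static <T> T run(Callable<T> task)
    {
        try {
            return task.call();
        }
        catch (RuntimeException | Error e) {
            throw e; // rethrown unchanged, preserving the original type
        }
        catch (Exception e) {
            throw new RuntimeException("task failed", e);
        }
    }
}
```

Without the unchecked-rethrow arm, an `IllegalStateException` from the task would reach callers wrapped in a generic `RuntimeException`, losing its type.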
Implement Copy-on-Write mode for DELETE, UPDATE, and MERGE operations in the Iceberg connector

When a table has write.delete.mode, write.update.mode, or write.merge.mode set to 'copy-on-write', the connector rewrites affected data files instead of writing position delete files.

Architecture:
- Workers emit serialized deletion vectors to the coordinator
- Coordinator aggregates deletions across workers per data file
- CopyOnWriteFileRewriter reads original files, filters deleted rows, and writes replacement files
- Iceberg RewriteFiles API atomically replaces old files with new ones

This coordinator-side rewrite approach correctly handles distributed execution where multiple workers may process different rows from the same data file.

Add parallel CoW file rewrite, comprehensive tests, and file-path verification
- Parallelize coordinator-side file rewrites using icebergScanExecutor thread pool
- Add 24 comprehensive Copy-on-Write tests covering DELETE, UPDATE, MERGE, partitioned tables, sequential DML, complex expressions, subquery deletes, time travel, nullable columns, multiple data files, larger datasets, and more
- Add testCowReplacesOriginalDataFiles verifying original file paths are fully replaced after CoW operations (no delete files, no stale data files)
- Use ConcurrentLinkedQueue for thread-safe rollback action tracking

Add metrics and logging for CoW rewrite operations
- Add RewriteMetrics record to CopyOnWriteFileRewriter tracking per-file stats: original/new record counts, deleted rows, file sizes, rewrite duration
- Add per-file debug logging in CopyOnWriteFileRewriter
- Add aggregate info logging in finishCopyOnWrite summarizing total files rewritten/removed, rows deleted, bytes read/written, and insert count

Add configurable CoW concurrency, metrics tests, and design doc updates
- Add iceberg.copy-on-write-threads config property with dedicated executor
- Add ForIcebergCopyOnWrite binding annotation and executor provider
- Wire copyOnWriteExecutor through IcebergMetadataFactory to IcebergMetadata
- Add testCowMetricsFileCountAfterMultiFileDelete and testCowMetricsRowCountConsistency
- Update COPY_ON_WRITE_DESIGN.md with parallel rewrite config, metrics, and test coverage

Remove unused variable to fix error-prone UnusedVariable check

Add missing CopyOnWriteFileRewriter binding in LakehouseIcebergModule

Address review comments on CoW PR trinodb#28958
- Fix CopyOnWriteFileRewriter: rethrow RuntimeException/Error unchanged in catch block
- Fix IcebergMetadata: move commit calls inside try block, collect all futures' rollback actions in catch
- Fix TestIcebergCopyOnWrite: replace exact file count assertions with logical invariants
- Centralize IcebergMetadata test fixture in BaseTrinoCatalogTest + clean up unused imports
- Update 4 catalog test subclasses to use centralized helper

Fix high-priority CoW limitations: conflict detection, file format, pre-existing deletes
- Switch from RewriteFiles to OverwriteFiles API for full conflict detection (conflictDetectionFilter, validateNoConflictingData/Deletes matching MoR path)
- Read source file format from file path extension instead of table default, fixing mixed-format table rewrites (separate sourceFileFormat/writeFileFormat)
- Merge pre-existing delete files into deletion vectors before CoW rewrite, preventing row resurrection when switching from MoR to CoW mode
- Add mergePreExistingDeletes to DeletionVectorWriter interface + implementation in DefaultDeletionVectorWriter (reuses existing manifest scanning logic)

Add comprehensive tests for CoW features: mixed format, pre-existing deletes, conflict detection

Tests added (16 new, 42 total):

Mixed file format (source format detection):
- testCowDeleteOnMixedFormatTable: ORC→PARQUET format change, CoW delete on ORC files
- testCowUpdateOnMixedFormatTable: CoW update on ORC data when table default is PARQUET
- testCowMergeOnMixedFormatTable: CoW merge on ORC data when table default is PARQUET

Pre-existing delete files (MoR→CoW mode switch):
- testCowAfterMorDeleteV2: v2 position deletes honored after CoW switch
- testCowAfterMorUpdateV2: MoR update deletes preserved through CoW update
- testCowAfterMorDeleteV3: v3 deletion vectors honored after CoW switch
- testCowAfterMultipleMorDeletes: accumulated MoR deletes all honored
- testCowMergeAfterMorDelete: MoR delete + CoW merge on same data file
- testCowOnPartitionedTableWithMorDeletesInDifferentPartitions
- testCowOnPartitionedTableWithMorDeletesInSamePartition
- testCowDeleteAllRowsFromFileWithMorDeletes: remaining rows after MoR
- testCowV3WithDvsAndMixedFormat: combined v3 DVs + ORC→PARQUET + CoW

Conflict detection (OverwriteFiles API):
- testCowWithConcurrentInsert: INSERT + CoW DELETE on different partitions
- testCowSnapshotIsolation: snapshot progression and time travel
- testCowProducesOverwriteSnapshot: verifies overwrite/append snapshot types
- testFullMorToCowMigration: end-to-end MoR→CoW migration workflow

Remove design doc from tracked files

Fix rollback tracking: clear rollback actions on successful commit

After commitTransaction succeeds, clear the rollbackActions queue so that committed files are not accidentally cleaned up. This also prevents duplicate cleanup if the catch block re-iterates already-processed futures when a commit failure occurs after all rewrites completed successfully.
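The rollback-tracking behavior described in the last commit above can be sketched stand-alone. Everything here is an illustrative model, not the actual Trino code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative model of the rollback-tracking fix: every written file
// registers a cleanup action; the actions run only when the commit fails,
// and the queue is cleared after a successful commit so committed files are
// never cleaned up (and never cleaned up twice).
public class RollbackTrackingSketch
{
    private final ConcurrentLinkedQueue<Runnable> rollbackActions = new ConcurrentLinkedQueue<>();
    private final List<String> cleanedUpFiles = new ArrayList<>();

    public void recordWrittenFile(String path)
    {
        rollbackActions.add(() -> cleanedUpFiles.add(path)); // stand-in for deleting the file
    }

    public void commit(boolean simulateFailure)
    {
        try {
            if (simulateFailure) {
                throw new RuntimeException("commit failed");
            }
            rollbackActions.clear(); // success: keep committed files
        }
        catch (RuntimeException e) {
            rollbackActions.forEach(Runnable::run); // failure: clean up written files
            rollbackActions.clear();
            throw e;
        }
    }

    public List<String> cleanedUpFiles()
    {
        return cleanedUpFiles;
    }
}
```

Clearing the queue on success is the key step: without it, a later failure path could delete files that an earlier commit already made live.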
Force-pushed from f6bef79 to 60bf7d4
@raunaqmorarka I have addressed all CodeRabbit review comments.
Add copy-on-write mode for Iceberg row-level DELETE, UPDATE, and MERGE
Description
Implement copy-on-write (CoW) mode for row-level operations in the Iceberg connector.
When a table sets
`write.delete.mode`, `write.update.mode`, or `write.merge.mode` to `copy-on-write`, affected data files are rewritten without deleted rows instead of producing positional delete files.
Motivation
Iceberg supports two strategies for row-level modifications: merge-on-read (MoR), which writes delete files that readers must reconcile with data files, and copy-on-write (CoW), which rewrites the affected data files at write time.
Read-heavy workloads (dashboards, BI, repeated scans) benefit significantly from CoW
because queries never need to reconcile delete files with data files at read time.
This is particularly valuable for:
- workloads where frequent `OPTIMIZE` is undesirable or insufficient

Spark, Flink, and PyIceberg already support CoW. This brings Trino to parity.
How it works
Users opt in per-operation via Iceberg table properties (no Trino config change needed):
Or at table creation:
Execution flow:

1. Workers emit serialized deletion vectors (not positional delete files) back to the coordinator as commit fragments.
2. The coordinator aggregates the deletion vectors across workers, per data file; this is necessary because multiple workers may process different rows from the same data file.
3. `CopyOnWriteFileRewriter` reads each affected original file, filters out deleted positions page-by-page, and writes surviving rows to a new data file. If all rows are deleted, no new file is written (the file is simply dropped).
4. The `RewriteFiles` API atomically replaces old files with new ones in a single commit, with snapshot validation for conflict detection.
5. New rows produced by UPDATE or MERGE are committed via an `AppendFiles` operation within the same transaction.
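The aggregate-then-filter core of this flow can be modeled stand-alone. In this toy model, lists stand in for data files and `BitSet`s for deletion vectors; the class and method names are illustrative, not the connector's actual code:

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Toy model of the coordinator-side CoW rewrite: merge per-worker deletion
// vectors per data file, then filter each affected file's rows. A file whose
// rows are all deleted produces no replacement file.
public class CowRewriteSketch
{
    // Aggregate deletion vectors from all workers, keyed by data file path.
    public static Map<String, BitSet> mergeDeletionVectors(List<Map<String, BitSet>> workerVectors)
    {
        Map<String, BitSet> merged = new HashMap<>();
        for (Map<String, BitSet> fromWorker : workerVectors) {
            fromWorker.forEach((file, positions) ->
                    merged.computeIfAbsent(file, f -> new BitSet()).or(positions));
        }
        return merged;
    }

    // Rewrite one file by dropping deleted positions; an empty result means
    // the original file is simply removed with no replacement.
    public static Optional<List<String>> rewriteFile(List<String> rows, BitSet deleted)
    {
        List<String> surviving = new ArrayList<>();
        for (int pos = 0; pos < rows.size(); pos++) {
            if (!deleted.get(pos)) {
                surviving.add(rows.get(pos));
            }
        }
        return surviving.isEmpty() ? Optional.empty() : Optional.of(surviving);
    }
}
```

Merging per-file before rewriting is what makes the approach correct under distributed execution: a single data file may accumulate deleted positions from several workers.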
File rewrites are submitted in parallel to a dedicated `copyOnWriteExecutor` thread pool, sized via the `iceberg.copy-on-write-threads` configuration property.
Additional context and related issues
Release notes
(x) Release notes are required, with the following suggested text: