diff --git a/api/src/main/java/org/apache/iceberg/RewriteFiles.java b/api/src/main/java/org/apache/iceberg/RewriteFiles.java index 62ba58e828d1..5777f2e1fdb0 100644 --- a/api/src/main/java/org/apache/iceberg/RewriteFiles.java +++ b/api/src/main/java/org/apache/iceberg/RewriteFiles.java @@ -73,4 +73,16 @@ RewriteFiles rewriteFiles(Set dataFilesToReplace, Set dele * @return this for method chaining */ RewriteFiles validateFromSnapshot(long snapshotId); + + /** + * Add data file paths that must not be rewritten by conflicting commits for this operation to succeed. + *

+ * If any path has been rewritten in a replace by a conflicting commit in the table since the snapshot passed to + * {@link #validateFromSnapshot(long)}, the operation will fail with a + * {@link org.apache.iceberg.exceptions.ValidationException}. + * + * @param referencedFiles file paths that are referenced by a position delete file + * @return this for method chaining + */ + RewriteFiles validateDataFilesNotRewritten(Iterable referencedFiles); } diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java index f94bcd157d0c..33f64cf36778 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java @@ -22,10 +22,13 @@ import java.util.Set; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.CharSequenceSet; class BaseRewriteFiles extends MergingSnapshotProducer implements RewriteFiles { private final Set replacedDataFiles = Sets.newHashSet(); private Long startingSnapshotId = null; + private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); + private boolean validateDeletes = false; BaseRewriteFiles(String tableName, TableOperations ops) { super(tableName, ops); @@ -97,11 +100,21 @@ public RewriteFiles validateFromSnapshot(long snapshotId) { return this; } + @Override + public RewriteFiles validateDataFilesNotRewritten(Iterable referencedFiles) { + referencedFiles.forEach(referencedDataFiles::add); + return this; + } + @Override protected void validate(TableMetadata base) { - if (replacedDataFiles.size() > 0) { + if (!replacedDataFiles.isEmpty()) { // if there are replaced data files, there cannot be any new row-level deletes for those data files validateNoNewDeletesForDataFiles(base, startingSnapshotId, replacedDataFiles); } + + if (!referencedDataFiles.isEmpty()) { + validateDataFilesNotRewritten(base, startingSnapshotId, referencedDataFiles); + } } } diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index f26412b27bbd..4ba878ddcc74 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -66,6 +66,8 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { // delete files can be added in "overwrite" or "delete" operations private static final Set VALIDATE_REPLACED_DATA_FILES_OPERATIONS = ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.DELETE); + private static final Set VALIDATE_DATA_FILES_NOT_REWRITTEN_OPERATIONS = + ImmutableSet.of(DataOperations.REPLACE); private final String tableName; private final TableOperations ops; @@ -316,18 +318,26 @@ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startin } } - @SuppressWarnings("CollectionUndefinedEquality") + protected void validateDataFilesNotRewritten(TableMetadata base, Long startingSnapshotId, + CharSequenceSet requiredDataFiles) { + validateDataFilesExist(base, startingSnapshotId, requiredDataFiles, VALIDATE_DATA_FILES_NOT_REWRITTEN_OPERATIONS); + } + protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotId, CharSequenceSet requiredDataFiles, boolean skipDeletes) { + validateDataFilesExist(base, startingSnapshotId, requiredDataFiles, skipDeletes ? + VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS : + VALIDATE_DATA_FILES_EXIST_OPERATIONS); + } + + @SuppressWarnings("CollectionUndefinedEquality") + private void validateDataFilesExist(TableMetadata base, Long startingSnapshotId, + CharSequenceSet requiredDataFiles, Set matchingOperations) { // if there is no current table state, no files have been removed if (base.currentSnapshot() == null) { return; } - Set matchingOperations = skipDeletes ? - VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS : - VALIDATE_DATA_FILES_EXIST_OPERATIONS; - Pair, Set> history = validationHistory(base, startingSnapshotId, matchingOperations, ManifestContent.DATA); List manifests = history.first(); diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java index 19b97fad91e3..9310fefb6755 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java @@ -641,4 +641,115 @@ public void testNewDeleteFile() { .rewriteFiles(Sets.newSet(FILE_A), Sets.newSet(FILE_A2)) .apply(); } + + @Test + public void testRewriteReferencedDataFile() { + Assume.assumeTrue("Delete files are only supported in v2", formatVersion > 1); + + table.newAppend() + .appendFile(FILE_A) + .commit(); + + table.newRowDelta() + .addDeletes(FILE_A_DELETES) + .commit(); + + long snapshotBeforeDeleteRewrite = table.currentSnapshot().snapshotId(); + + // simulate rewrite deletes in FILE_A_DELETES to FILE_B_DELETES + table.newRewrite() + .validateFromSnapshot(snapshotBeforeDeleteRewrite) + .validateDataFilesNotRewritten(Sets.newSet(FILE_A.path())) + .rewriteFiles(Sets.newSet(), Sets.newSet(FILE_A_DELETES), Sets.newSet(), Sets.newSet(FILE_B_DELETES)) + .commit(); + + long snapshotBeforeRewriteFileA = table.currentSnapshot().snapshotId(); + + // rewrite FILE_A as FILE_A2 + table.newRewrite() + .validateFromSnapshot(table.currentSnapshot().snapshotId()) + .rewriteFiles(Sets.newSet(FILE_A), Sets.newSet(FILE_A2)) + .commit(); + + AssertHelpers.assertThrows("Should fail because a referenced file was rewritten", + ValidationException.class, "Cannot commit, missing data files", + () -> table.newRewrite() + .validateFromSnapshot(snapshotBeforeRewriteFileA) + .validateDataFilesNotRewritten(Sets.newSet(FILE_A.path())) + .rewriteFiles(Sets.newSet(), Sets.newSet(FILE_B_DELETES), Sets.newSet(), Sets.newSet(FILE_A_DELETES)) + .apply()); + } + + @Test + public void testOverwriteReferencedDataFile() { + Assume.assumeTrue("Delete files are only supported in v2", formatVersion > 1); + + table.newAppend() + .appendFile(FILE_A) + .commit(); + + table.newRowDelta() + .addDeletes(FILE_A_DELETES) + .commit(); + + long snapshotBeforeDeleteRewrite = table.currentSnapshot().snapshotId(); + + // simulate rewrite deletes in FILE_A_DELETES to FILE_B_DELETES + table.newRewrite() + .validateFromSnapshot(snapshotBeforeDeleteRewrite) + .validateDataFilesNotRewritten(Sets.newSet(FILE_A.path())) + .rewriteFiles(Sets.newSet(), Sets.newSet(FILE_A_DELETES), Sets.newSet(), Sets.newSet(FILE_B_DELETES)) + .commit(); + + long snapshotBeforeOverwriteFileA = table.currentSnapshot().snapshotId(); + + // overwrite FILE_A with FILE_A2 + table.newOverwrite() + .deleteFile(FILE_A) + .addFile(FILE_A2) + .commit(); + + // the rewrite succeeds because the overwrite is required to read FILE_A correctly + table.newRewrite() + .validateFromSnapshot(snapshotBeforeOverwriteFileA) + .validateDataFilesNotRewritten(Sets.newSet(FILE_A.path())) + .rewriteFiles(Sets.newSet(), Sets.newSet(FILE_B_DELETES), Sets.newSet(), Sets.newSet(FILE_A_DELETES)) + .apply(); + } + + @Test + public void testDeleteReferencedDataFile() { + Assume.assumeTrue("Delete files are only supported in v2", formatVersion > 1); + + table.newAppend() + .appendFile(FILE_A) + .commit(); + + table.newRowDelta() + .addDeletes(FILE_A_DELETES) + .commit(); + + long snapshotBeforeDeleteRewrite = table.currentSnapshot().snapshotId(); + + // simulate rewrite deletes in FILE_A_DELETES to FILE_B_DELETES + table.newRewrite() + .validateFromSnapshot(snapshotBeforeDeleteRewrite) + .validateDataFilesNotRewritten(Sets.newSet(FILE_A.path())) + .rewriteFiles(Sets.newSet(), Sets.newSet(FILE_A_DELETES), Sets.newSet(), Sets.newSet(FILE_B_DELETES)) + .commit(); + + long snapshotBeforeDeleteFileA = table.currentSnapshot().snapshotId(); + + // delete FILE_A + table.newDelete() + .deleteFile(FILE_A) + .commit(); + + // rewrite deletes, but ignore that FILE_A was removed + table.newRewrite() + .validateFromSnapshot(snapshotBeforeDeleteFileA) + .validateDataFilesNotRewritten(Sets.newSet(FILE_A.path())) + .rewriteFiles(Sets.newSet(), Sets.newSet(FILE_B_DELETES), Sets.newSet(), Sets.newSet(FILE_A_DELETES)) + .apply(); + } }