Allow RewriteFiles to replace delete files as well as data files.

RewriteFiles gains a four-argument rewriteFiles(...) taking separate sets of
data files and delete files to remove and to add; the existing two-argument
method becomes a default method that delegates to it with empty delete-file
sets. BaseRewriteFiles validates the four sets before staging any change, and
TestRewriteFiles covers the new validation, failure, recovery and
delete-file-only paths (format v2 only).

diff --git a/api/src/main/java/org/apache/iceberg/RewriteFiles.java b/api/src/main/java/org/apache/iceberg/RewriteFiles.java
index 49df7fc9ff1d..f09008a15d8f 100644
--- a/api/src/main/java/org/apache/iceberg/RewriteFiles.java
+++ b/api/src/main/java/org/apache/iceberg/RewriteFiles.java
@@ -21,6 +21,7 @@
 
 import java.util.Set;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 
 /**
  * API for replacing files in a table.
@@ -35,11 +36,30 @@
  */
 public interface RewriteFiles extends SnapshotUpdate<RewriteFiles> {
   /**
-   * Add a rewrite that replaces one set of files with another set that contains the same data.
+   * Add a rewrite that replaces one set of data files with another set that contains the same data.
    *
    * @param filesToDelete files that will be replaced (deleted), cannot be null or empty.
-   * @param filesToAdd files that will be added, cannot be null or empty.
+   * @param filesToAdd    files that will be added, cannot be null or empty.
    * @return this for method chaining
    */
-  RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> filesToAdd);
+  default RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> filesToAdd) {
+    return rewriteFiles(
+        filesToDelete,
+        ImmutableSet.of(),
+        filesToAdd,
+        ImmutableSet.of()
+    );
+  }
+
+  /**
+   * Add a rewrite that replaces one set of files with another set that contains the same data.
+   *
+   * @param dataFilesToDelete   data files that will be replaced (deleted).
+   * @param deleteFilesToDelete delete files that will be replaced (deleted).
+   * @param dataFilesToAdd      data files that will be added.
+   * @param deleteFilesToAdd    delete files that will be added.
+   * @return this for method chaining.
+   */
+  RewriteFiles rewriteFiles(Set<DataFile> dataFilesToDelete, Set<DeleteFile> deleteFilesToDelete,
+                            Set<DataFile> dataFilesToAdd, Set<DeleteFile> deleteFilesToAdd);
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
index d2daac5f9b39..c7732ef8a5a9 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
@@ -40,19 +40,56 @@ protected String operation() {
     return DataOperations.REPLACE;
   }
 
+  /**
+   * Validates the arguments of a rewrite before any file is staged.
+   * <p>
+   * All four sets must be non-null and at least one file must be deleted. When no delete files are
+   * being rewritten, the rewrite must add at least one data file and must not add delete files.
+   *
+   * @throws NullPointerException if any set is null
+   * @throws IllegalArgumentException if the combination of sets is not a valid rewrite
+   */
+  private void verifyInputAndOutputFiles(Set<DataFile> dataFilesToDelete, Set<DeleteFile> deleteFilesToDelete,
+                                         Set<DataFile> dataFilesToAdd, Set<DeleteFile> deleteFilesToAdd) {
+    Preconditions.checkNotNull(dataFilesToDelete, "Data files to delete can not be null");
+    Preconditions.checkNotNull(deleteFilesToDelete, "Delete files to delete can not be null");
+    Preconditions.checkNotNull(dataFilesToAdd, "Data files to add can not be null");
+    Preconditions.checkNotNull(deleteFilesToAdd, "Delete files to add can not be null");
+
+    int filesToDelete = 0;
+    filesToDelete += dataFilesToDelete.size();
+    filesToDelete += deleteFilesToDelete.size();
+
+    Preconditions.checkArgument(filesToDelete > 0, "Files to delete cannot be null or empty");
+
+    if (deleteFilesToDelete.isEmpty()) {
+      // When there are no delete files to rewrite, data files must be added and delete files must not be.
+      Preconditions.checkArgument(dataFilesToAdd.size() > 0,
+          "Data files to add can not be empty because there's no delete file to be rewritten");
+      Preconditions.checkArgument(deleteFilesToAdd.isEmpty(),
+          "Delete files to add must be empty because there's no delete file to be rewritten");
+    }
+  }
+
   @Override
-  public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> filesToAdd) {
-    Preconditions.checkArgument(filesToDelete != null && !filesToDelete.isEmpty(),
-        "Files to delete cannot be null or empty");
-    Preconditions.checkArgument(filesToAdd != null && !filesToAdd.isEmpty(),
-        "Files to add can not be null or empty");
-
-    for (DataFile toDelete : filesToDelete) {
-      delete(toDelete);
+  public RewriteFiles rewriteFiles(Set<DataFile> dataFilesToDelete, Set<DeleteFile> deleteFilesToDelete,
+                                   Set<DataFile> dataFilesToAdd, Set<DeleteFile> deleteFilesToAdd) {
+    verifyInputAndOutputFiles(dataFilesToDelete, deleteFilesToDelete, dataFilesToAdd, deleteFilesToAdd);
+
+    for (DataFile dataFile : dataFilesToDelete) {
+      delete(dataFile);
+    }
+
+    for (DeleteFile deleteFile : deleteFilesToDelete) {
+      delete(deleteFile);
+    }
+
+    for (DataFile dataFile : dataFilesToAdd) {
+      add(dataFile);
     }
 
-    for (DataFile toAdd : filesToAdd) {
-      add(toAdd);
+    for (DeleteFile deleteFile : deleteFilesToAdd) {
+      add(deleteFile);
     }
 
     return this;
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 7907ce7ea0cb..4bb6f66c6a6a 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -73,6 +73,14 @@ public class TableTestBase {
       .withPartitionPath("data_bucket=0") // easy way to set partition data for now
       .withRecordCount(1)
       .build();
+  // Equality delete files.
+  static final DeleteFile FILE_A2_DELETES = FileMetadata.deleteFileBuilder(SPEC)
+      .ofEqualityDeletes(3)
+      .withPath("/path/to/data-a2-deletes.parquet")
+      .withFileSizeInBytes(10)
+      .withPartitionPath("data_bucket=0")
+      .withRecordCount(1)
+      .build();
   static final DataFile FILE_B = DataFiles.builder(SPEC)
       .withPath("/path/to/data-b.parquet")
       .withFileSizeInBytes(10)
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
index fa3240f76a5e..2bdc9f2c4940 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
@@ -21,9 +21,13 @@
 
 import java.io.File;
 import java.util.Collections;
+import java.util.List;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -37,7 +41,7 @@
 public class TestRewriteFiles extends TableTestBase {
   @Parameterized.Parameters(name = "formatVersion = {0}")
   public static Object[] parameters() {
-    return new Object[] { 1, 2 };
+    return new Object[] {1, 2};
   }
 
   public TestRewriteFiles(int formatVersion) {
@@ -57,6 +61,14 @@ public void testEmptyTable() {
         () -> table.newRewrite()
             .rewriteFiles(Sets.newSet(FILE_A), Sets.newSet(FILE_B))
             .commit());
+
+    AssertHelpers.assertThrows("Expected an exception",
+        ValidationException.class,
+        "Missing required files to delete: /path/to/data-a-deletes.parquet",
+        () -> table.newRewrite()
+            .rewriteFiles(ImmutableSet.of(), ImmutableSet.of(FILE_A_DELETES),
+                ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_B_DELETES))
+            .commit());
   }
 
   @Test
@@ -65,10 +77,26 @@ public void testAddOnly() {
 
     AssertHelpers.assertThrows("Expected an exception",
         IllegalArgumentException.class,
-        "Files to add can not be null or empty",
+        "Data files to add can not be empty because there's no delete file to be rewritten",
         () -> table.newRewrite()
             .rewriteFiles(Sets.newSet(FILE_A), Collections.emptySet())
             .apply());
+
+    AssertHelpers.assertThrows("Expected an exception",
+        IllegalArgumentException.class,
+        "Data files to add can not be empty because there's no delete file to be rewritten",
+        () -> table.newRewrite()
+            .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(),
+                ImmutableSet.of(), ImmutableSet.of(FILE_A_DELETES))
+            .apply());
+
+    AssertHelpers.assertThrows("Expected an exception",
+        IllegalArgumentException.class,
+        "Delete files to add must be empty because there's no delete file to be rewritten",
+        () -> table.newRewrite()
+            .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(),
+                ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_B_DELETES))
+            .apply());
   }
 
   @Test
@@ -81,6 +109,21 @@ public void testDeleteOnly() {
         () -> table.newRewrite()
             .rewriteFiles(Collections.emptySet(), Sets.newSet(FILE_A))
             .apply());
+
+    AssertHelpers.assertThrows("Expected an exception",
+        IllegalArgumentException.class,
+        "Files to delete cannot be null or empty",
+        () -> table.newRewrite()
+            .rewriteFiles(ImmutableSet.of(), ImmutableSet.of(), ImmutableSet.of(), ImmutableSet.of(FILE_A_DELETES))
+            .apply());
+
+    AssertHelpers.assertThrows("Expected an exception",
+        IllegalArgumentException.class,
+        "Files to delete cannot be null or empty",
+        () -> table.newRewrite()
+            .rewriteFiles(ImmutableSet.of(), ImmutableSet.of(),
+                ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES))
+            .apply());
   }
 
   @Test
@@ -164,6 +207,66 @@ public void testAddAndDelete() {
     Assert.assertEquals("Only 3 manifests should exist", 3, listManifestFiles().size());
   }
 
+  @Test
+  public void testRewriteDataAndDeleteFiles() {
+    Assume.assumeTrue("Rewriting delete files is only supported in iceberg format v2. ", formatVersion > 1);
+    Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
+
+    table.newRowDelta()
+        .addRows(FILE_A)
+        .addRows(FILE_B)
+        .addRows(FILE_C)
+        .addDeletes(FILE_A_DELETES)
+        .addDeletes(FILE_B_DELETES)
+        .commit();
+
+    TableMetadata base = readMetadata();
+    Snapshot baseSnap = base.currentSnapshot();
+    long baseSnapshotId = baseSnap.snapshotId();
+    Assert.assertEquals("Should create 2 manifests for initial write", 2, baseSnap.allManifests().size());
+    List<ManifestFile> initialManifests = baseSnap.allManifests();
+
+    validateManifestEntries(initialManifests.get(0),
+        ids(baseSnapshotId, baseSnapshotId, baseSnapshotId),
+        files(FILE_A, FILE_B, FILE_C),
+        statuses(ADDED, ADDED, ADDED));
+    validateDeleteManifest(initialManifests.get(1),
+        seqs(1, 1),
+        ids(baseSnapshotId, baseSnapshotId),
+        files(FILE_A_DELETES, FILE_B_DELETES),
+        statuses(ADDED, ADDED));
+
+    // Rewrite the files.
+    Snapshot pending = table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES),
+            ImmutableSet.of(FILE_D), ImmutableSet.of())
+        .apply();
+
+    Assert.assertEquals("Should contain 3 manifest", 3, pending.allManifests().size());
+    Assert.assertFalse("Should not contain manifest from initial write",
+        pending.allManifests().stream().anyMatch(initialManifests::contains));
+
+    long pendingId = pending.snapshotId();
+    validateManifestEntries(pending.allManifests().get(0),
+        ids(pendingId),
+        files(FILE_D),
+        statuses(ADDED));
+
+    validateManifestEntries(pending.allManifests().get(1),
+        ids(pendingId, baseSnapshotId, baseSnapshotId),
+        files(FILE_A, FILE_B, FILE_C),
+        statuses(DELETED, EXISTING, EXISTING));
+
+    validateDeleteManifest(pending.allManifests().get(2),
+        seqs(2, 1),
+        ids(pendingId, baseSnapshotId),
+        files(FILE_A_DELETES, FILE_B_DELETES),
+        statuses(DELETED, EXISTING));
+
+    // 2 manifests from the initial write plus the 3 written by the pending rewrite.
+    Assert.assertEquals("Only 5 manifests should exist", 5, listManifestFiles().size());
+  }
+
   @Test
   public void testFailure() {
     table.newAppend()
@@ -195,6 +298,58 @@ public void testFailure() {
     Assert.assertEquals("Only 1 manifest should exist", 1, listManifestFiles().size());
   }
 
+  @Test
+  public void testFailureWhenRewriteBothDataAndDeleteFiles() {
+    Assume.assumeTrue("Rewriting delete files is only supported in iceberg format v2. ", formatVersion > 1);
+
+    table.newRowDelta()
+        .addRows(FILE_A)
+        .addRows(FILE_B)
+        .addRows(FILE_C)
+        .addDeletes(FILE_A_DELETES)
+        .addDeletes(FILE_B_DELETES)
+        .commit();
+
+    long baseSnapshotId = readMetadata().currentSnapshot().snapshotId();
+    table.ops().failCommits(5);
+
+    RewriteFiles rewrite = table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES, FILE_B_DELETES),
+            ImmutableSet.of(FILE_D), ImmutableSet.of());
+    Snapshot pending = rewrite.apply();
+
+    Assert.assertEquals("Should produce 3 manifests", 3, pending.allManifests().size());
+    ManifestFile manifest1 = pending.allManifests().get(0);
+    ManifestFile manifest2 = pending.allManifests().get(1);
+    ManifestFile manifest3 = pending.allManifests().get(2);
+
+    validateManifestEntries(pending.allManifests().get(0),
+        ids(pending.snapshotId()),
+        files(FILE_D),
+        statuses(ADDED));
+
+    validateManifestEntries(pending.allManifests().get(1),
+        ids(pending.snapshotId(), baseSnapshotId, baseSnapshotId),
+        files(FILE_A, FILE_B, FILE_C),
+        statuses(DELETED, EXISTING, EXISTING));
+
+    validateDeleteManifest(pending.allManifests().get(2),
+        seqs(2, 2),
+        ids(pending.snapshotId(), pending.snapshotId()),
+        files(FILE_A_DELETES, FILE_B_DELETES),
+        statuses(DELETED, DELETED));
+
+    AssertHelpers.assertThrows("Should retry 4 times and throw last failure",
+        CommitFailedException.class, "Injected failure", rewrite::commit);
+
+    Assert.assertFalse("Should clean up new manifest", new File(manifest1.path()).exists());
+    Assert.assertFalse("Should clean up new manifest", new File(manifest2.path()).exists());
+    Assert.assertFalse("Should clean up new manifest", new File(manifest3.path()).exists());
+
+    // The commit failed, so every manifest written by the rewrite should have been cleaned up.
+    Assert.assertEquals("Only 2 manifest should exist", 2, listManifestFiles().size());
+  }
+
   @Test
   public void testRecovery() {
     table.newAppend()
@@ -228,6 +383,163 @@ public void testRecovery() {
     Assert.assertEquals("Only 3 manifests should exist", 3, listManifestFiles().size());
   }
 
+  @Test
+  public void testRecoverWhenRewriteBothDataAndDeleteFiles() {
+    Assume.assumeTrue("Rewriting delete files is only supported in iceberg format v2. ", formatVersion > 1);
+
+    table.newRowDelta()
+        .addRows(FILE_A)
+        .addRows(FILE_B)
+        .addRows(FILE_C)
+        .addDeletes(FILE_A_DELETES)
+        .addDeletes(FILE_B_DELETES)
+        .commit();
+
+    long baseSnapshotId = readMetadata().currentSnapshot().snapshotId();
+    table.ops().failCommits(3);
+
+    RewriteFiles rewrite = table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES, FILE_B_DELETES),
+            ImmutableSet.of(FILE_D), ImmutableSet.of());
+    Snapshot pending = rewrite.apply();
+
+    Assert.assertEquals("Should produce 3 manifests", 3, pending.allManifests().size());
+    ManifestFile manifest1 = pending.allManifests().get(0);
+    ManifestFile manifest2 = pending.allManifests().get(1);
+    ManifestFile manifest3 = pending.allManifests().get(2);
+
+    validateManifestEntries(manifest1,
+        ids(pending.snapshotId()),
+        files(FILE_D),
+        statuses(ADDED));
+
+    validateManifestEntries(manifest2,
+        ids(pending.snapshotId(), baseSnapshotId, baseSnapshotId),
+        files(FILE_A, FILE_B, FILE_C),
+        statuses(DELETED, EXISTING, EXISTING));
+
+    validateDeleteManifest(manifest3,
+        seqs(2, 2),
+        ids(pending.snapshotId(), pending.snapshotId()),
+        files(FILE_A_DELETES, FILE_B_DELETES),
+        statuses(DELETED, DELETED));
+
+    rewrite.commit();
+
+    Assert.assertTrue("Should reuse new manifest", new File(manifest1.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new File(manifest2.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new File(manifest3.path()).exists());
+
+    TableMetadata metadata = readMetadata();
+    List<ManifestFile> committedManifests = Lists.newArrayList(manifest1, manifest2, manifest3);
+    Assert.assertEquals("Should committed the manifests",
+        committedManifests, metadata.currentSnapshot().allManifests());
+
+    // The commit succeeded, so every manifest written by the rewrite should still exist.
+    Assert.assertEquals("Only 5 manifest should exist", 5, listManifestFiles().size());
+  }
+
+  @Test
+  public void testReplaceEqualityDeletesWithPositionDeletes() {
+    Assume.assumeTrue("Rewriting delete files is only supported in iceberg format v2. ", formatVersion > 1);
+
+    table.newRowDelta()
+        .addRows(FILE_A2)
+        .addDeletes(FILE_A2_DELETES)
+        .commit();
+
+    TableMetadata metadata = readMetadata();
+    long baseSnapshotId = metadata.currentSnapshot().snapshotId();
+
+    // Apply and commit the rewrite transaction.
+    RewriteFiles rewrite = table.newRewrite().rewriteFiles(
+        ImmutableSet.of(), ImmutableSet.of(FILE_A2_DELETES),
+        ImmutableSet.of(), ImmutableSet.of(FILE_B_DELETES)
+    );
+    Snapshot pending = rewrite.apply();
+
+    Assert.assertEquals("Should produce 3 manifests", 3, pending.allManifests().size());
+    ManifestFile manifest1 = pending.allManifests().get(0);
+    ManifestFile manifest2 = pending.allManifests().get(1);
+    ManifestFile manifest3 = pending.allManifests().get(2);
+
+    validateManifestEntries(manifest1,
+        ids(baseSnapshotId),
+        files(FILE_A2),
+        statuses(ADDED));
+
+    validateDeleteManifest(manifest2,
+        seqs(2),
+        ids(pending.snapshotId()),
+        files(FILE_B_DELETES),
+        statuses(ADDED));
+
+    validateDeleteManifest(manifest3,
+        seqs(2),
+        ids(pending.snapshotId()),
+        files(FILE_A2_DELETES),
+        statuses(DELETED));
+
+    rewrite.commit();
+
+    Assert.assertTrue("Should reuse new manifest", new File(manifest1.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new File(manifest2.path()).exists());
+    Assert.assertTrue("Should reuse new manifest", new File(manifest3.path()).exists());
+
+    metadata = readMetadata();
+    List<ManifestFile> committedManifests = Lists.newArrayList(manifest1, manifest2, manifest3);
+    Assert.assertEquals("Should committed the manifests",
+        committedManifests, metadata.currentSnapshot().allManifests());
+
+    // The commit succeeded, so every manifest written by the rewrite should still exist.
+    Assert.assertEquals("4 manifests should exist", 4, listManifestFiles().size());
+  }
+
+  @Test
+  public void testRemoveAllDeletes() {
+    Assume.assumeTrue("Rewriting delete files is only supported in iceberg format v2. ", formatVersion > 1);
+
+    table.newRowDelta()
+        .addRows(FILE_A)
+        .addDeletes(FILE_A_DELETES)
+        .commit();
+
+    // Apply and commit the rewrite transaction.
+    RewriteFiles rewrite = table.newRewrite().rewriteFiles(
+        ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES),
+        ImmutableSet.of(), ImmutableSet.of()
+    );
+    Snapshot pending = rewrite.apply();
+
+    Assert.assertEquals("Should produce 2 manifests", 2, pending.allManifests().size());
+    ManifestFile manifest1 = pending.allManifests().get(0);
+    ManifestFile manifest2 = pending.allManifests().get(1);
+
+    validateManifestEntries(manifest1,
+        ids(pending.snapshotId()),
+        files(FILE_A),
+        statuses(DELETED));
+
+    validateDeleteManifest(manifest2,
+        seqs(2),
+        ids(pending.snapshotId()),
+        files(FILE_A_DELETES),
+        statuses(DELETED));
+
+    rewrite.commit();
+
+    Assert.assertTrue("Should reuse the new manifest", new File(manifest1.path()).exists());
+    Assert.assertTrue("Should reuse the new manifest", new File(manifest2.path()).exists());
+
+    TableMetadata metadata = readMetadata();
+    List<ManifestFile> committedManifests = Lists.newArrayList(manifest1, manifest2);
+    Assert.assertTrue("Should committed the manifests",
+        metadata.currentSnapshot().allManifests().containsAll(committedManifests));
+
+    // The commit succeeded, so every manifest written by the rewrite should still exist.
+    Assert.assertEquals("4 manifests should exist", 4, listManifestFiles().size());
+  }
+
   @Test
   public void testDeleteNonExistentFile() {
     Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());