diff --git a/core/src/main/java/org/apache/iceberg/MergeAppend.java b/core/src/main/java/org/apache/iceberg/MergeAppend.java index 1781e95e9db6..3ef553ba7832 100644 --- a/core/src/main/java/org/apache/iceberg/MergeAppend.java +++ b/core/src/main/java/org/apache/iceberg/MergeAppend.java @@ -48,6 +48,12 @@ public MergeAppend appendFile(DataFile file) { return this; } + @Override + public MergeAppend toBranch(String branch) { + targetBranch(branch); + return this; + } + @Override public AppendFiles appendManifest(ManifestFile manifest) { Preconditions.checkArgument( diff --git a/core/src/main/java/org/apache/iceberg/StreamingDelete.java b/core/src/main/java/org/apache/iceberg/StreamingDelete.java index 493c4e44c8ee..8ff7bb831ec9 100644 --- a/core/src/main/java/org/apache/iceberg/StreamingDelete.java +++ b/core/src/main/java/org/apache/iceberg/StreamingDelete.java @@ -59,4 +59,10 @@ public StreamingDelete deleteFromRowFilter(Expression expr) { deleteByRowFilter(expr); return this; } + + @Override + public StreamingDelete toBranch(String branch) { + targetBranch(branch); + return this; + } } diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index ffe909e9bd86..65461b465e9c 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -349,6 +349,45 @@ void validateSnapshot(Snapshot old, Snapshot snap, long sequenceNumber, DataFile validateSnapshot(old, snap, (Long) sequenceNumber, newFiles); } + @SuppressWarnings("checkstyle:HiddenField") + Snapshot commit(Table table, SnapshotUpdate snapshotUpdate, String branch) { + Snapshot snapshot; + if (branch.equals(SnapshotRef.MAIN_BRANCH)) { + snapshotUpdate.commit(); + snapshot = table.currentSnapshot(); + } else { + ((SnapshotProducer) snapshotUpdate.toBranch(branch)).commit(); + snapshot = table.snapshot(branch); + } + + return snapshot; + } + + Snapshot apply(SnapshotUpdate snapshotUpdate, String branch) { + if (branch.equals(SnapshotRef.MAIN_BRANCH)) { + return ((SnapshotProducer) snapshotUpdate).apply(); + } else { + return ((SnapshotProducer) snapshotUpdate.toBranch(branch)).apply(); + } + } + + @SuppressWarnings("checkstyle:HiddenField") + Snapshot latestSnapshot(Table table, String branch) { + if (branch.equals(SnapshotRef.MAIN_BRANCH)) { + return table.currentSnapshot(); + } + + return table.snapshot(branch); + } + + Snapshot latestSnapshot(TableMetadata metadata, String branch) { + if (branch.equals(SnapshotRef.MAIN_BRANCH)) { + return metadata.currentSnapshot(); + } + + return metadata.snapshot(metadata.ref(branch).snapshotId()); + } + void validateSnapshot(Snapshot old, Snapshot snap, Long sequenceNumber, DataFile... newFiles) { Assert.assertEquals( "Should not change delete manifests", diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java index 58d4352626a5..1ee3b663bcb8 100644 --- a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java +++ b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java @@ -25,6 +25,7 @@ import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.junit.Assert; import org.junit.Assume; import org.junit.Test; @@ -68,38 +69,44 @@ public class TestDeleteFiles extends TableTestBase { )) .build(); - @Parameterized.Parameters(name = "formatVersion = {0}") + private final String branch; + + @Parameterized.Parameters(name = "formatVersion = {0}, branch = {1}") public static Object[] parameters() { - return new Object[] {1, 2}; + return new Object[][] { + new Object[] {1, "main"}, + new Object[] {1, "testBranch"}, + new Object[] {2, "main"}, + new Object[] {2, "testBranch"} + }; } - public TestDeleteFiles(int formatVersion) { + public TestDeleteFiles(int formatVersion, String branch) { super(formatVersion); + this.branch = branch; } @Test public void testMultipleDeletes() { - table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C).commit(); - + commit( + table, table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C), branch); + Snapshot append = latestSnapshot(readMetadata(), branch); Assert.assertEquals("Metadata should be at version 1", 1L, (long) version()); - Snapshot append = readMetadata().currentSnapshot(); validateSnapshot(null, append, FILE_A, FILE_B, FILE_C); - table.newDelete().deleteFile(FILE_A).commit(); + commit(table, table.newDelete().deleteFile(FILE_A), branch); + Snapshot delete1 = latestSnapshot(readMetadata(), branch); Assert.assertEquals("Metadata should be at version 2", 2L, (long) version()); - Snapshot delete = readMetadata().currentSnapshot(); - Assert.assertEquals("Should have 1 manifest", 1, delete.allManifests(FILE_IO).size()); + Assert.assertEquals("Should have 1 manifest", 1, delete1.allManifests(FILE_IO).size()); validateManifestEntries( - delete.allManifests(table.io()).get(0), - ids(delete.snapshotId(), append.snapshotId(), append.snapshotId()), + delete1.allManifests(table.io()).get(0), + ids(delete1.snapshotId(), append.snapshotId(), append.snapshotId()), files(FILE_A, FILE_B, FILE_C), statuses(Status.DELETED, Status.EXISTING, Status.EXISTING)); - table.newDelete().deleteFile(FILE_B).commit(); - + Snapshot delete2 = commit(table, table.newDelete().deleteFile(FILE_B), branch); Assert.assertEquals("Metadata should be at version 3", 3L, (long) version()); - Snapshot delete2 = readMetadata().currentSnapshot(); Assert.assertEquals("Should have 1 manifest", 1, delete2.allManifests(FILE_IO).size()); validateManifestEntries( delete2.allManifests(FILE_IO).get(0), @@ -147,9 +154,12 @@ public void testAlreadyDeletedFilesAreIgnoredDuringDeletesByRowFilter() { .build(); // add both data files - table.newFastAppend().appendFile(firstDataFile).appendFile(secondDataFile).commit(); + Snapshot initialSnapshot = + commit( + table, + table.newFastAppend().appendFile(firstDataFile).appendFile(secondDataFile), + branch); - Snapshot initialSnapshot = table.currentSnapshot(); Assert.assertEquals("Should have 1 manifest", 1, initialSnapshot.allManifests(FILE_IO).size()); validateManifestEntries( initialSnapshot.allManifests(FILE_IO).get(0), @@ -158,9 +168,7 @@ public void testAlreadyDeletedFilesAreIgnoredDuringDeletesByRowFilter() { statuses(Status.ADDED, Status.ADDED)); // delete the first data file - table.newDelete().deleteFile(firstDataFile).commit(); - - Snapshot deleteSnapshot = table.currentSnapshot(); + Snapshot deleteSnapshot = commit(table, table.newDelete().deleteFile(firstDataFile), branch); Assert.assertEquals("Should have 1 manifest", 1, deleteSnapshot.allManifests(FILE_IO).size()); validateManifestEntries( deleteSnapshot.allManifests(FILE_IO).get(0), @@ -170,9 +178,9 @@ public void testAlreadyDeletedFilesAreIgnoredDuringDeletesByRowFilter() { // delete the second data file using a row filter // the commit should succeed as there is only one live data file - table.newDelete().deleteFromRowFilter(Expressions.lessThan("id", 7)).commit(); + Snapshot finalSnapshot = + commit(table, table.newDelete().deleteFromRowFilter(Expressions.lessThan("id", 7)), branch); - Snapshot finalSnapshot = table.currentSnapshot(); Assert.assertEquals("Should have 1 manifest", 1, finalSnapshot.allManifests(FILE_IO).size()); validateManifestEntries( finalSnapshot.allManifests(FILE_IO).get(0), @@ -184,13 +192,15 @@ public void testAlreadyDeletedFilesAreIgnoredDuringDeletesByRowFilter() { @Test public void testDeleteSomeFilesByRowFilterWithoutPartitionPredicates() { // add both data files - table - .newFastAppend() - .appendFile(DATA_FILE_BUCKET_0_IDS_0_2) - .appendFile(DATA_FILE_BUCKET_0_IDS_8_10) - .commit(); + Snapshot initialSnapshot = + commit( + table, + table + .newFastAppend() + .appendFile(DATA_FILE_BUCKET_0_IDS_0_2) + .appendFile(DATA_FILE_BUCKET_0_IDS_8_10), + branch); - Snapshot initialSnapshot = table.currentSnapshot(); Assert.assertEquals("Should have 1 manifest", 1, initialSnapshot.allManifests(FILE_IO).size()); validateManifestEntries( initialSnapshot.allManifests(FILE_IO).get(0), @@ -199,9 +209,10 @@ public void testDeleteSomeFilesByRowFilterWithoutPartitionPredicates() { statuses(Status.ADDED, Status.ADDED)); // delete the second one using a metrics filter (no partition filter) - table.newDelete().deleteFromRowFilter(Expressions.greaterThan("id", 5)).commit(); + Snapshot deleteSnapshot = + commit( + table, table.newDelete().deleteFromRowFilter(Expressions.greaterThan("id", 5)), branch); - Snapshot deleteSnapshot = table.currentSnapshot(); Assert.assertEquals("Should have 1 manifest", 1, deleteSnapshot.allManifests(FILE_IO).size()); validateManifestEntries( deleteSnapshot.allManifests(FILE_IO).get(0), @@ -213,13 +224,15 @@ public void testDeleteSomeFilesByRowFilterWithoutPartitionPredicates() { @Test public void testDeleteSomeFilesByRowFilterWithCombinedPredicates() { // add both data files - table - .newFastAppend() - .appendFile(DATA_FILE_BUCKET_0_IDS_0_2) - .appendFile(DATA_FILE_BUCKET_0_IDS_8_10) - .commit(); + Snapshot initialSnapshot = + commit( + table, + table + .newFastAppend() + .appendFile(DATA_FILE_BUCKET_0_IDS_0_2) + .appendFile(DATA_FILE_BUCKET_0_IDS_8_10), + branch); - Snapshot initialSnapshot = table.currentSnapshot(); Assert.assertEquals("Should have 1 manifest", 1, initialSnapshot.allManifests(FILE_IO).size()); validateManifestEntries( initialSnapshot.allManifests(FILE_IO).get(0), @@ -231,9 +244,8 @@ public void testDeleteSomeFilesByRowFilterWithCombinedPredicates() { Expression partPredicate = Expressions.equal(Expressions.bucket("data", 16), 0); Expression rowPredicate = Expressions.greaterThan("id", 5); Expression predicate = Expressions.and(partPredicate, rowPredicate); - table.newDelete().deleteFromRowFilter(predicate).commit(); - - Snapshot deleteSnapshot = table.currentSnapshot(); + Snapshot deleteSnapshot = + commit(table, table.newDelete().deleteFromRowFilter(predicate), branch); Assert.assertEquals("Should have 1 manifest", 1, deleteSnapshot.allManifests(FILE_IO).size()); validateManifestEntries( deleteSnapshot.allManifests(FILE_IO).get(0), @@ -262,18 +274,22 @@ public void testCannotDeleteFileWhereNotAllRowsMatchPartitionFilter() { .withPartitionPath("data_trunc_2=aa") .build(); - table.newFastAppend().appendFile(dataFile).commit(); + commit(table, table.newFastAppend().appendFile(dataFile), branch); AssertHelpers.assertThrows( "Should reject as not all rows match filter", ValidationException.class, "Cannot delete file where some, but not all, rows match filter", - () -> table.newDelete().deleteFromRowFilter(Expressions.equal("data", "aa")).commit()); + () -> + commit( + table, + table.newDelete().deleteFromRowFilter(Expressions.equal("data", "aa")), + branch)); } @Test public void testDeleteCaseSensitivity() { - table.newFastAppend().appendFile(DATA_FILE_BUCKET_0_IDS_0_2).commit(); + commit(table, table.newFastAppend().appendFile(DATA_FILE_BUCKET_0_IDS_0_2), branch); Expression rowFilter = Expressions.lessThan("iD", 5); @@ -281,17 +297,22 @@ public void testDeleteCaseSensitivity() { "Should use case sensitive binding by default", ValidationException.class, "Cannot find field 'iD'", - () -> table.newDelete().deleteFromRowFilter(rowFilter).commit()); + () -> commit(table, table.newDelete().deleteFromRowFilter(rowFilter), branch)); AssertHelpers.assertThrows( "Should fail with case sensitive binding", ValidationException.class, "Cannot find field 'iD'", - () -> table.newDelete().deleteFromRowFilter(rowFilter).caseSensitive(true).commit()); + () -> + commit( + table, + table.newDelete().deleteFromRowFilter(rowFilter).caseSensitive(true), + branch)); - table.newDelete().deleteFromRowFilter(rowFilter).caseSensitive(false).commit(); + Snapshot deleteSnapshot = + commit( + table, table.newDelete().deleteFromRowFilter(rowFilter).caseSensitive(false), branch); - Snapshot deleteSnapshot = table.currentSnapshot(); Assert.assertEquals("Should have 1 manifest", 1, deleteSnapshot.allManifests(FILE_IO).size()); validateManifestEntries( deleteSnapshot.allManifests(FILE_IO).get(0), @@ -300,6 +321,34 @@ public void testDeleteCaseSensitivity() { statuses(Status.DELETED)); } + @Test + public void testDeleteFilesOnIndependentBranches() { + String testBranch = "testBranch"; + table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C).commit(); + Snapshot initialSnapshot = table.currentSnapshot(); + // Delete A on test branch + table.newDelete().deleteFile(FILE_A).toBranch(testBranch).commit(); + Snapshot testBranchTip = table.snapshot(testBranch); + + // Delete B and C on main + table.newDelete().deleteFile(FILE_B).deleteFile(FILE_C).commit(); + Snapshot delete2 = table.currentSnapshot(); + + // Verify B and C on testBranch + validateManifestEntries( + Iterables.getOnlyElement(testBranchTip.allManifests(FILE_IO)), + ids(testBranchTip.snapshotId(), initialSnapshot.snapshotId(), initialSnapshot.snapshotId()), + files(FILE_A, FILE_B, FILE_C), + statuses(Status.DELETED, Status.EXISTING, Status.EXISTING)); + + // Verify A on main + validateManifestEntries( + Iterables.getOnlyElement(delete2.allManifests(FILE_IO)), + ids(initialSnapshot.snapshotId(), delete2.snapshotId(), delete2.snapshotId()), + files(FILE_A, FILE_B, FILE_C), + statuses(Status.EXISTING, Status.DELETED, Status.DELETED)); + } + private static ByteBuffer longToBuffer(long value) { return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value); } diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java index ec71c856a349..8e9d4ab13677 100644 --- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java @@ -39,13 +39,21 @@ @RunWith(Parameterized.class) public class TestMergeAppend extends TableTestBase { - @Parameterized.Parameters(name = "formatVersion = {0}") + private final String branch; + + @Parameterized.Parameters(name = "formatVersion = {0}, branch = {1}") public static Object[] parameters() { - return new Object[] {1, 2}; + return new Object[][] { + new Object[] {1, "main"}, + new Object[] {1, "testBranch"}, + new Object[] {2, "main"}, + new Object[] {2, "testBranch"} + }; } - public TestMergeAppend(int formatVersion) { + public TestMergeAppend(int formatVersion, String branch) { super(formatVersion); + this.branch = branch; } @Test @@ -56,10 +64,10 @@ public void testEmptyTableAppend() { Assert.assertNull("Should not have a current snapshot", base.currentSnapshot()); Assert.assertEquals("Last sequence number should be 0", 0, base.lastSequenceNumber()); - table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + Snapshot committedSnapshot = + commit(table, table.newAppend().appendFile(FILE_A).appendFile(FILE_B), branch); - Snapshot committedSnapshot = table.currentSnapshot(); - Assert.assertNotNull("Should create a snapshot", table.currentSnapshot()); + Assert.assertNotNull("Should create a snapshot", committedSnapshot); V1Assert.assertEquals( "Last sequence number should be 0", 0, table.ops().current().lastSequenceNumber()); V2Assert.assertEquals( @@ -89,10 +97,9 @@ public void testEmptyTableAppendManifest() throws IOException { Assert.assertEquals("Last sequence number should be 0", 0, base.lastSequenceNumber()); ManifestFile manifest = writeManifest(FILE_A, FILE_B); - table.newAppend().appendManifest(manifest).commit(); + Snapshot committedSnapshot = commit(table, table.newAppend().appendManifest(manifest), branch); - Snapshot committedSnapshot = table.currentSnapshot(); - Assert.assertNotNull("Should create a snapshot", table.currentSnapshot()); + Assert.assertNotNull("Should create a snapshot", committedSnapshot); V1Assert.assertEquals( "Last sequence number should be 0", 0, table.ops().current().lastSequenceNumber()); V2Assert.assertEquals( @@ -126,10 +133,13 @@ public void testEmptyTableAppendFilesAndManifest() throws IOException { Assert.assertEquals("Last sequence number should be 0", 0, base.lastSequenceNumber()); ManifestFile manifest = writeManifest(FILE_A, FILE_B); - table.newAppend().appendFile(FILE_C).appendFile(FILE_D).appendManifest(manifest).commit(); + Snapshot committedSnapshot = + commit( + table, + table.newAppend().appendFile(FILE_C).appendFile(FILE_D).appendManifest(manifest), + branch); - Snapshot committedSnapshot = table.currentSnapshot(); - Assert.assertNotNull("Should create a snapshot", table.currentSnapshot()); + Assert.assertNotNull("Should create a snapshot", committedSnapshot); V1Assert.assertEquals( "Last sequence number should be 0", 0, table.ops().current().lastSequenceNumber()); V2Assert.assertEquals( @@ -164,23 +174,27 @@ public void testAppendWithManifestScanExecutor() { Assert.assertNull("Should not have a current snapshot", base.currentSnapshot()); Assert.assertEquals("Last sequence number should be 0", 0, base.lastSequenceNumber()); AtomicInteger scanThreadsIndex = new AtomicInteger(0); - table - .newAppend() - .appendFile(FILE_A) - .appendFile(FILE_B) - .scanManifestsWith( - Executors.newFixedThreadPool( - 1, - runnable -> { - Thread thread = new Thread(runnable); - thread.setName("scan-" + scanThreadsIndex.getAndIncrement()); - thread.setDaemon( - true); // daemon threads will be terminated abruptly when the JVM exits - return thread; - })) - .commit(); + Snapshot snapshot = + commit( + table, + table + .newAppend() + .appendFile(FILE_A) + .appendFile(FILE_B) + .scanManifestsWith( + Executors.newFixedThreadPool( + 1, + runnable -> { + Thread thread = new Thread(runnable); + thread.setName("scan-" + scanThreadsIndex.getAndIncrement()); + thread.setDaemon( + true); // daemon threads will be terminated abruptly when the JVM + // exits + return thread; + })), + branch); Assert.assertTrue("Thread should be created in provided pool", scanThreadsIndex.get() > 0); - Assert.assertNotNull("Should create a snapshot", table.currentSnapshot()); + Assert.assertNotNull("Should create a snapshot", snapshot); } @Test @@ -195,10 +209,13 @@ public void testMergeWithAppendFilesAndManifest() throws IOException { Assert.assertEquals("Last sequence number should be 0", 0, base.lastSequenceNumber()); ManifestFile manifest = writeManifest(FILE_A, FILE_B); - table.newAppend().appendFile(FILE_C).appendFile(FILE_D).appendManifest(manifest).commit(); + Snapshot committedSnapshot = + commit( + table, + table.newAppend().appendFile(FILE_C).appendFile(FILE_D).appendManifest(manifest), + branch); - Snapshot committedSnapshot = table.currentSnapshot(); - Assert.assertNotNull("Should create a snapshot", table.currentSnapshot()); + Assert.assertNotNull("Should create a snapshot", committedSnapshot); V1Assert.assertEquals( "Last sequence number should be 0", 0, table.ops().current().lastSequenceNumber()); V2Assert.assertEquals( @@ -224,16 +241,16 @@ public void testMergeWithExistingManifest() { Assert.assertEquals("Last sequence number should be 0", 0, readMetadata().lastSequenceNumber()); Assert.assertEquals("Table should start empty", 0, listManifestFiles().size()); - table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + Snapshot commitBefore = + commit(table, table.newAppend().appendFile(FILE_A).appendFile(FILE_B), branch); - Assert.assertNotNull("Should create a snapshot", table.currentSnapshot()); + Assert.assertNotNull("Should create a snapshot", commitBefore); V1Assert.assertEquals( "Last sequence number should be 0", 0, table.ops().current().lastSequenceNumber()); V2Assert.assertEquals( "Last sequence number should be 1", 1, table.ops().current().lastSequenceNumber()); TableMetadata base = readMetadata(); - Snapshot commitBefore = table.currentSnapshot(); long baseId = commitBefore.snapshotId(); validateSnapshot(null, commitBefore, 1, FILE_A, FILE_B); @@ -241,7 +258,7 @@ public void testMergeWithExistingManifest() { "Should create 1 manifest for initial write", 1, commitBefore.allManifests(table.io()).size()); - ManifestFile initialManifest = base.currentSnapshot().allManifests(table.io()).get(0); + ManifestFile initialManifest = commitBefore.allManifests(table.io()).get(0); validateManifest( initialManifest, seqs(1, 1), @@ -249,14 +266,13 @@ public void testMergeWithExistingManifest() { files(FILE_A, FILE_B), statuses(Status.ADDED, Status.ADDED)); - table.newAppend().appendFile(FILE_C).appendFile(FILE_D).commit(); + Snapshot committedAfter = + commit(table, table.newAppend().appendFile(FILE_C).appendFile(FILE_D), branch); V1Assert.assertEquals( "Last sequence number should be 0", 0, table.ops().current().lastSequenceNumber()); V2Assert.assertEquals( "Last sequence number should be 2", 2, table.ops().current().lastSequenceNumber()); - Snapshot committedAfter = table.currentSnapshot(); - Assert.assertEquals( "Should contain 1 merged manifest for second write", 1, @@ -294,14 +310,16 @@ public void testManifestMergeMinCount() throws IOException { ManifestFile manifest = writeManifest(FILE_A); ManifestFile manifest2 = writeManifestWithName("FILE_C", FILE_C); ManifestFile manifest3 = writeManifestWithName("FILE_D", FILE_D); - table - .newAppend() - .appendManifest(manifest) - .appendManifest(manifest2) - .appendManifest(manifest3) - .commit(); + Snapshot snap1 = + commit( + table, + table + .newAppend() + .appendManifest(manifest) + .appendManifest(manifest2) + .appendManifest(manifest3), + branch); - Snapshot snap1 = table.currentSnapshot(); long commitId1 = snap1.snapshotId(); base = readMetadata(); V2Assert.assertEquals("Snapshot sequence number should be 1", 1, snap1.sequenceNumber()); @@ -312,7 +330,7 @@ public void testManifestMergeMinCount() throws IOException { Assert.assertEquals( "Should contain 2 merged manifest for first write", 2, - readMetadata().currentSnapshot().allManifests(table.io()).size()); + snap1.allManifests(table.io()).size()); validateManifest( snap1.allManifests(table.io()).get(0), seqs(1), @@ -326,13 +344,15 @@ public void testManifestMergeMinCount() throws IOException { files(FILE_C, FILE_D), statuses(Status.ADDED, Status.ADDED)); - table - .newAppend() - .appendManifest(manifest) - .appendManifest(manifest2) - .appendManifest(manifest3) - .commit(); - Snapshot snap2 = table.currentSnapshot(); + Snapshot snap2 = + commit( + table, + table + .newAppend() + .appendManifest(manifest) + .appendManifest(manifest2) + .appendManifest(manifest3), + branch); long commitId2 = snap2.snapshotId(); base = readMetadata(); V2Assert.assertEquals("Snapshot sequence number should be 2", 2, snap2.sequenceNumber()); @@ -343,7 +363,7 @@ public void testManifestMergeMinCount() throws IOException { Assert.assertEquals( "Should contain 3 merged manifest for second write", 3, - readMetadata().currentSnapshot().allManifests(table.io()).size()); + snap2.allManifests(table.io()).size()); validateManifest( snap2.allManifests(table.io()).get(0), seqs(2), @@ -367,14 +387,13 @@ public void testManifestMergeMinCount() throws IOException { Assert.assertEquals( "Summary metadata should include 3 added files", "3", - readMetadata().currentSnapshot().summary().get("added-data-files")); + snap2.summary().get("added-data-files")); } @Test public void testManifestsMergeIntoOne() throws IOException { Assert.assertEquals("Table should start empty", 0, listManifestFiles().size()); - table.newAppend().appendFile(FILE_A).commit(); - Snapshot snap1 = table.currentSnapshot(); + Snapshot snap1 = commit(table, table.newAppend().appendFile(FILE_A), branch); TableMetadata base = readMetadata(); V2Assert.assertEquals("Snapshot sequence number should be 1", 1, snap1.sequenceNumber()); V2Assert.assertEquals("Last sequence number should be 1", 1, base.lastSequenceNumber()); @@ -390,8 +409,7 @@ public void testManifestsMergeIntoOne() throws IOException { files(FILE_A), statuses(Status.ADDED)); - table.newAppend().appendFile(FILE_B).commit(); - Snapshot snap2 = table.currentSnapshot(); + Snapshot snap2 = commit(table, table.newAppend().appendFile(FILE_B), branch); long commitId2 = snap2.snapshotId(); base = readMetadata(); V2Assert.assertEquals("Snapshot sequence number should be 2", 2, snap2.sequenceNumber()); @@ -413,12 +431,15 @@ public void testManifestsMergeIntoOne() throws IOException { files(FILE_A), statuses(Status.ADDED)); - table - .newAppend() - .appendManifest( - writeManifest("input-m0.avro", manifestEntry(ManifestEntry.Status.ADDED, null, FILE_C))) - .commit(); - Snapshot snap3 = table.currentSnapshot(); + Snapshot snap3 = + commit( + table, + table + .newAppend() + .appendManifest( + writeManifest( + "input-m0.avro", manifestEntry(ManifestEntry.Status.ADDED, null, FILE_C))), + branch); base = readMetadata(); V2Assert.assertEquals("Snapshot sequence number should be 3", 3, snap3.sequenceNumber()); @@ -449,12 +470,15 @@ public void testManifestsMergeIntoOne() throws IOException { table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1").commit(); - table - .newAppend() - .appendManifest( - writeManifest("input-m1.avro", manifestEntry(ManifestEntry.Status.ADDED, null, FILE_D))) - .commit(); - Snapshot snap4 = table.currentSnapshot(); + Snapshot snap4 = + commit( + table, + table + .newAppend() + .appendManifest( + writeManifest( + "input-m1.avro", manifestEntry(ManifestEntry.Status.ADDED, null, FILE_D))), + branch); base = readMetadata(); V2Assert.assertEquals("Snapshot sequence number should be 4", 4, snap4.sequenceNumber()); @@ -485,27 +509,28 @@ public void testManifestDoNotMergeMinCount() throws IOException { ManifestFile manifest = writeManifest(FILE_A, FILE_B); ManifestFile manifest2 = writeManifestWithName("FILE_C", FILE_C); ManifestFile manifest3 = writeManifestWithName("FILE_D", FILE_D); - table - .newAppend() - .appendManifest(manifest) - .appendManifest(manifest2) - .appendManifest(manifest3) - .commit(); - - Assert.assertNotNull("Should create a snapshot", table.currentSnapshot()); + Snapshot committed = + commit( + table, + table + .newAppend() + .appendManifest(manifest) + .appendManifest(manifest2) + .appendManifest(manifest3), + branch); + + Assert.assertNotNull("Should create a snapshot", committed); V1Assert.assertEquals( "Last sequence number should be 0", 0, table.ops().current().lastSequenceNumber()); V2Assert.assertEquals( "Last sequence number should be 1", 1, table.ops().current().lastSequenceNumber()); - Snapshot committed = table.currentSnapshot(); - Assert.assertEquals( "Should contain 3 merged manifest after 1st write write", 3, committed.allManifests(table.io()).size()); - long snapshotId = table.currentSnapshot().snapshotId(); + long snapshotId = committed.snapshotId(); validateManifest( committed.allManifests(table.io()).get(0), @@ -541,18 +566,15 @@ public void testMergeWithExistingManifestAfterDelete() { Assert.assertEquals("Table should start empty", 0, listManifestFiles().size()); Assert.assertEquals("Last sequence number should be 0", 0, readMetadata().lastSequenceNumber()); - table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + Snapshot snap = commit(table, table.newAppend().appendFile(FILE_A).appendFile(FILE_B), branch); - Snapshot snap = table.currentSnapshot(); validateSnapshot(null, snap, 1, FILE_A, FILE_B); TableMetadata base = readMetadata(); - long baseId = base.currentSnapshot().snapshotId(); + long baseId = snap.snapshotId(); Assert.assertEquals( - "Should create 1 manifest for initial write", - 1, - base.currentSnapshot().allManifests(table.io()).size()); - ManifestFile initialManifest = base.currentSnapshot().allManifests(table.io()).get(0); + "Should create 1 manifest for initial write", 1, snap.allManifests(table.io()).size()); + ManifestFile initialManifest = snap.allManifests(table.io()).get(0); validateManifest( initialManifest, seqs(1, 1), @@ -560,9 +582,8 @@ public void testMergeWithExistingManifestAfterDelete() { files(FILE_A, FILE_B), statuses(Status.ADDED, Status.ADDED)); - table.newDelete().deleteFile(FILE_A).commit(); + Snapshot deleteSnapshot = commit(table, table.newDelete().deleteFile(FILE_A), branch); - Snapshot deleteSnapshot = table.currentSnapshot(); V2Assert.assertEquals( "Snapshot sequence number should be 2", 2, deleteSnapshot.sequenceNumber()); V2Assert.assertEquals( @@ -571,12 +592,12 @@ public void testMergeWithExistingManifestAfterDelete() { "Table should end with last-sequence-number 0", 0, readMetadata().lastSequenceNumber()); TableMetadata delete = readMetadata(); - long deleteId = delete.currentSnapshot().snapshotId(); + long deleteId = latestSnapshot(table, branch).snapshotId(); Assert.assertEquals( "Should create 1 filtered manifest for delete", 1, - delete.currentSnapshot().allManifests(table.io()).size()); - ManifestFile deleteManifest = delete.currentSnapshot().allManifests(table.io()).get(0); + latestSnapshot(table, branch).allManifests(table.io()).size()); + ManifestFile deleteManifest = deleteSnapshot.allManifests(table.io()).get(0); validateManifest( deleteManifest, @@ -585,9 +606,9 @@ public void testMergeWithExistingManifestAfterDelete() { files(FILE_A, FILE_B), statuses(Status.DELETED, Status.EXISTING)); - table.newAppend().appendFile(FILE_C).appendFile(FILE_D).commit(); + Snapshot committedSnapshot = + commit(table, table.newAppend().appendFile(FILE_C).appendFile(FILE_D), branch); - Snapshot committedSnapshot = table.currentSnapshot(); V2Assert.assertEquals( "Snapshot sequence number should be 3", 3, committedSnapshot.sequenceNumber()); V2Assert.assertEquals( @@ -621,23 +642,18 @@ public void testMinMergeCount() { Assert.assertEquals("Last sequence number should be 0", 0, readMetadata().lastSequenceNumber()); Assert.assertEquals("Table should start empty", 0, listManifestFiles().size()); - table.newFastAppend().appendFile(FILE_A).commit(); - Snapshot snap1 = table.currentSnapshot(); + Snapshot snap1 = commit(table, table.newFastAppend().appendFile(FILE_A), branch); long idFileA = snap1.snapshotId(); validateSnapshot(null, snap1, 1, FILE_A); - table.newFastAppend().appendFile(FILE_B).commit(); - Snapshot snap2 = table.currentSnapshot(); + Snapshot snap2 = commit(table, table.newFastAppend().appendFile(FILE_B), branch); long idFileB = snap2.snapshotId(); validateSnapshot(snap1, snap2, 2, FILE_B); Assert.assertEquals( - "Should have 2 manifests from setup writes", - 2, - readMetadata().currentSnapshot().allManifests(table.io()).size()); + "Should have 2 manifests from setup writes", 2, snap2.allManifests(table.io()).size()); - table.newAppend().appendFile(FILE_C).commit(); - Snapshot snap3 = table.currentSnapshot(); + Snapshot snap3 = commit(table, table.newAppend().appendFile(FILE_C), branch); long idFileC = snap3.snapshotId(); validateSnapshot(snap2, snap3, 3, FILE_C); @@ -645,11 +661,11 @@ public void testMinMergeCount() { Assert.assertEquals( "Should have 3 unmerged manifests", 3, - base.currentSnapshot().allManifests(table.io()).size()); - Set unmerged = Sets.newHashSet(base.currentSnapshot().allManifests(table.io())); + latestSnapshot(table, branch).allManifests(table.io()).size()); + Set unmerged = + Sets.newHashSet(latestSnapshot(table, branch).allManifests(table.io())); - table.newAppend().appendFile(FILE_D).commit(); - Snapshot committed = table.currentSnapshot(); + Snapshot committed = commit(table, table.newAppend().appendFile(FILE_D), branch); V2Assert.assertEquals("Snapshot sequence number should be 4", 4, committed.sequenceNumber()); V2Assert.assertEquals( "Last sequence number should be 4", 4, readMetadata().lastSequenceNumber()); @@ -681,18 +697,15 @@ public void testMergeSizeTargetWithExistingManifest() { Assert.assertEquals("Last sequence number should be 0", 0, readMetadata().lastSequenceNumber()); Assert.assertEquals("Table should start empty", 0, listManifestFiles().size()); - table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + Snapshot snap = commit(table, table.newAppend().appendFile(FILE_A).appendFile(FILE_B), branch); - Snapshot snap = table.currentSnapshot(); validateSnapshot(null, snap, 1, FILE_A, FILE_B); TableMetadata base = readMetadata(); - long baseId = base.currentSnapshot().snapshotId(); + long baseId = snap.snapshotId(); Assert.assertEquals( - "Should create 1 manifest for initial write", - 1, - base.currentSnapshot().allManifests(table.io()).size()); - ManifestFile initialManifest = base.currentSnapshot().allManifests(table.io()).get(0); + "Should create 1 manifest for initial write", 1, snap.allManifests(table.io()).size()); + ManifestFile initialManifest = snap.allManifests(table.io()).get(0); validateManifest( initialManifest, seqs(1, 1), @@ -700,8 +713,8 @@ public void testMergeSizeTargetWithExistingManifest() { files(FILE_A, FILE_B), statuses(Status.ADDED, Status.ADDED)); - table.newAppend().appendFile(FILE_C).appendFile(FILE_D).commit(); - Snapshot committed = table.currentSnapshot(); + Snapshot committed = + commit(table, table.newAppend().appendFile(FILE_C).appendFile(FILE_D), branch); V2Assert.assertEquals("Snapshot sequence number should be 2", 2, committed.sequenceNumber()); V2Assert.assertEquals( @@ -735,18 +748,15 @@ public void testMergeSizeTargetWithExistingManifest() { @Test public void testChangedPartitionSpec() { - table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + Snapshot snap = commit(table, table.newAppend().appendFile(FILE_A).appendFile(FILE_B), branch); - Snapshot snap = table.currentSnapshot(); long commitId = snap.snapshotId(); validateSnapshot(null, snap, 1, FILE_A, FILE_B); TableMetadata base = readMetadata(); Assert.assertEquals( - "Should create 1 manifest for initial write", - 1, - base.currentSnapshot().allManifests(table.io()).size()); - ManifestFile initialManifest = base.currentSnapshot().allManifests(table.io()).get(0); + "Should create 1 manifest for initial write", 1, snap.allManifests(table.io()).size()); + ManifestFile initialManifest = snap.allManifests(table.io()).get(0); validateManifest( initialManifest, seqs(1, 1), @@ -760,7 +770,7 @@ public void testChangedPartitionSpec() { // commit the new partition spec to the table manually table.ops().commit(base, base.updatePartitionSpec(newSpec)); - Snapshot snap2 = table.currentSnapshot(); + Snapshot snap2 = latestSnapshot(table, branch); V2Assert.assertEquals("Snapshot sequence number should be 1", 1, snap2.sequenceNumber()); V2Assert.assertEquals( "Last sequence number should be 1", 1, readMetadata().lastSequenceNumber()); @@ -775,9 +785,8 @@ public void testChangedPartitionSpec() { .withRecordCount(1) .build(); - table.newAppend().appendFile(newFileY).commit(); + Snapshot lastSnapshot = commit(table, table.newAppend().appendFile(newFileY), branch); - Snapshot lastSnapshot = table.currentSnapshot(); V2Assert.assertEquals("Snapshot sequence number should be 2", 2, lastSnapshot.sequenceNumber()); V2Assert.assertEquals( "Last sequence number should be 2", 2, readMetadata().lastSequenceNumber()); @@ -803,23 +812,20 @@ public void testChangedPartitionSpec() { @Test public void testChangedPartitionSpecMergeExisting() { - table.newAppend().appendFile(FILE_A).commit(); + Snapshot snap1 = commit(table, table.newAppend().appendFile(FILE_A), branch); - Snapshot snap1 = table.currentSnapshot(); long id1 = snap1.snapshotId(); validateSnapshot(null, snap1, 1, FILE_A); // create a second compatible manifest - table.newFastAppend().appendFile(FILE_B).commit(); + Snapshot snap2 = commit(table, table.newFastAppend().appendFile(FILE_B), branch); - Snapshot snap2 = table.currentSnapshot(); long id2 = snap2.snapshotId(); validateSnapshot(snap1, snap2, 2, FILE_B); TableMetadata base = readMetadata(); - Assert.assertEquals( - "Should contain 2 manifests", 2, base.currentSnapshot().allManifests(table.io()).size()); - ManifestFile manifest = base.currentSnapshot().allManifests(table.io()).get(0); + Assert.assertEquals("Should contain 2 manifests", 2, snap2.allManifests(table.io()).size()); + ManifestFile manifest = snap2.allManifests(table.io()).get(0); // build the new spec using the table's schema, which uses fresh IDs PartitionSpec newSpec = @@ -827,7 +833,7 @@ public void testChangedPartitionSpecMergeExisting() { // commit the new partition spec to the table manually table.ops().commit(base, base.updatePartitionSpec(newSpec)); - Snapshot snap3 = table.currentSnapshot(); + Snapshot snap3 = latestSnapshot(table, branch); V2Assert.assertEquals("Snapshot sequence number should be 2", 2, snap3.sequenceNumber()); V2Assert.assertEquals( "Last sequence number should be 2", 2, readMetadata().lastSequenceNumber()); @@ -842,8 +848,7 @@ public void testChangedPartitionSpecMergeExisting() { .withRecordCount(1) .build(); - table.newAppend().appendFile(newFileY).commit(); - Snapshot lastSnapshot = table.currentSnapshot(); + Snapshot lastSnapshot = commit(table, table.newAppend().appendFile(newFileY), branch); V2Assert.assertEquals("Snapshot sequence number should be 3", 3, lastSnapshot.sequenceNumber()); V2Assert.assertEquals( "Last sequence number should be 3", 3, readMetadata().lastSequenceNumber()); @@ -876,21 +881,21 @@ public void testFailure() { table.updateProperties().set("commit.manifest.min-count-to-merge", "1").commit(); Assert.assertEquals("Last sequence number should be 0", 0, readMetadata().lastSequenceNumber()); - table.newAppend().appendFile(FILE_A).commit(); + Snapshot snap = commit(table, table.newAppend().appendFile(FILE_A), branch); TableMetadata base = readMetadata(); - long baseId = base.currentSnapshot().snapshotId(); + long baseId = snap.snapshotId(); V2Assert.assertEquals("Last sequence number should be 1", 1, base.lastSequenceNumber()); V1Assert.assertEquals( "Table should end with last-sequence-number 0", 0, base.lastSequenceNumber()); - ManifestFile initialManifest = base.currentSnapshot().allManifests(table.io()).get(0); + ManifestFile initialManifest = snap.allManifests(table.io()).get(0); validateManifest(initialManifest, seqs(1), ids(baseId), files(FILE_A), statuses(Status.ADDED)); table.ops().failCommits(5); AppendFiles append = table.newAppend().appendFile(FILE_B); - Snapshot pending = append.apply(); + Snapshot pending = apply(append, branch); Assert.assertEquals("Should merge to 1 manifest", 1, pending.allManifests(table.io()).size()); ManifestFile newManifest = pending.allManifests(table.io()).get(0); @@ -905,7 +910,7 @@ public void testFailure() { "Should retry 4 times and throw last failure", CommitFailedException.class, "Injected failure", - append::commit); + () -> commit(table, append, branch)); V2Assert.assertEquals( "Last sequence number should be 1", 1, readMetadata().lastSequenceNumber()); @@ -914,10 +919,10 @@ public void testFailure() { Assert.assertEquals( "Should only contain 1 manifest file", 1, - table.currentSnapshot().allManifests(table.io()).size()); + latestSnapshot(table, branch).allManifests(table.io()).size()); validateManifest( - table.currentSnapshot().allManifests(table.io()).get(0), + latestSnapshot(table, branch).allManifests(table.io()).get(0), seqs(1), ids(baseId), files(initialManifest), @@ -934,7 +939,7 @@ public void testAppendManifestCleanup() throws IOException { ManifestFile manifest = writeManifest(FILE_A, FILE_B); AppendFiles append = table.newAppend().appendManifest(manifest); - Snapshot pending = append.apply(); + Snapshot pending = apply(append, branch); ManifestFile newManifest = pending.allManifests(table.io()).get(0); Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists()); @@ -942,7 +947,7 @@ public void testAppendManifestCleanup() throws IOException { "Should retry 4 times and throw last failure", CommitFailedException.class, "Injected failure", - append::commit); + () -> commit(table, append, branch)); V2Assert.assertEquals( "Last sequence number should be 0", 0, readMetadata().lastSequenceNumber()); V1Assert.assertEquals( @@ -958,21 +963,21 @@ public void testRecovery() { Assert.assertEquals("Last sequence number should be 0", 0, readMetadata().lastSequenceNumber()); - table.newAppend().appendFile(FILE_A).commit(); + Snapshot current = commit(table, table.newAppend().appendFile(FILE_A), branch); TableMetadata base = readMetadata(); - long baseId = base.currentSnapshot().snapshotId(); + long baseId = current.snapshotId(); V2Assert.assertEquals( "Last sequence number should be 1", 1, readMetadata().lastSequenceNumber()); V1Assert.assertEquals( "Table should end with last-sequence-number 0", 0, readMetadata().lastSequenceNumber()); - ManifestFile initialManifest = base.currentSnapshot().allManifests(table.io()).get(0); + ManifestFile initialManifest = current.allManifests(table.io()).get(0); validateManifest(initialManifest, seqs(1), ids(baseId), files(FILE_A), statuses(Status.ADDED)); table.ops().failCommits(3); AppendFiles append = table.newAppend().appendFile(FILE_B); - Snapshot pending = append.apply(); + Snapshot pending = apply(append, branch); Assert.assertEquals("Should merge to 1 manifest", 1, pending.allManifests(table.io()).size()); ManifestFile newManifest = pending.allManifests(table.io()).get(0); @@ -984,17 +989,15 @@ public void testRecovery() { concat(files(FILE_B), files(initialManifest))); V2Assert.assertEquals( - "Snapshot sequence number should be 1", 1, table.currentSnapshot().sequenceNumber()); + "Snapshot sequence number should be 1", 1, latestSnapshot(table, branch).sequenceNumber()); V2Assert.assertEquals( "Last sequence number should be 1", 1, readMetadata().lastSequenceNumber()); V1Assert.assertEquals( "Table should end with last-sequence-number 0", 0, readMetadata().lastSequenceNumber()); - append.commit(); - Snapshot snapshot = table.currentSnapshot(); + Snapshot snapshot = commit(table, append, branch); long snapshotId = snapshot.snapshotId(); - V2Assert.assertEquals( - "Snapshot sequence number should be 2", 2, table.currentSnapshot().sequenceNumber()); + V2Assert.assertEquals("Snapshot sequence number should be 2", 2, snapshot.sequenceNumber()); V2Assert.assertEquals( "Last sequence number should be 2", 2, readMetadata().lastSequenceNumber()); V1Assert.assertEquals( @@ -1005,12 +1008,10 @@ public void testRecovery() { Assert.assertEquals( "Should commit the same new manifest during retry", Lists.newArrayList(newManifest), - metadata.currentSnapshot().allManifests(table.io())); + snapshot.allManifests(table.io())); Assert.assertEquals( - "Should only contain 1 merged manifest file", - 1, - table.currentSnapshot().allManifests(table.io()).size()); + "Should only contain 1 merged manifest file", 1, snapshot.allManifests(table.io()).size()); ManifestFile manifestFile = snapshot.allManifests(table.io()).get(0); validateManifest( manifestFile, @@ -1031,13 +1032,12 @@ public void testAppendManifestWithSnapshotIdInheritance() throws IOException { Assert.assertNull("Should not have a current snapshot", base.currentSnapshot()); ManifestFile manifest = writeManifest(FILE_A, FILE_B); - table.newAppend().appendManifest(manifest).commit(); + Snapshot snapshot = commit(table, table.newAppend().appendManifest(manifest), branch); - Snapshot snapshot = table.currentSnapshot(); long snapshotId = snapshot.snapshotId(); validateSnapshot(null, snapshot, 1, FILE_A, FILE_B); - List manifests = table.currentSnapshot().allManifests(table.io()); + List manifests = snapshot.allManifests(table.io()); Assert.assertEquals("Should have 1 committed manifest", 1, manifests.size()); ManifestFile manifestFile = snapshot.allManifests(table.io()).get(0); validateManifest( @@ -1079,15 +1079,14 @@ public void testMergedAppendManifestCleanupWithSnapshotIdInheritance() throws IO table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1").commit(); ManifestFile manifest1 = writeManifestWithName("manifest-file-1.avro", FILE_A, FILE_B); - table.newAppend().appendManifest(manifest1).commit(); + Snapshot snap1 = commit(table, table.newAppend().appendManifest(manifest1), branch); - Snapshot snap1 = table.currentSnapshot(); long commitId1 = snap1.snapshotId(); validateSnapshot(null, snap1, 1, FILE_A, FILE_B); Assert.assertEquals("Should have only 1 manifest", 1, snap1.allManifests(table.io()).size()); validateManifest( - table.currentSnapshot().allManifests(table.io()).get(0), + snap1.allManifests(table.io()).get(0), seqs(1, 1), ids(commitId1, commitId1), files(FILE_A, FILE_B), @@ -1096,12 +1095,10 @@ public void testMergedAppendManifestCleanupWithSnapshotIdInheritance() throws IO "Unmerged append manifest should not be deleted", new File(manifest1.path()).exists()); ManifestFile manifest2 = writeManifestWithName("manifest-file-2.avro", FILE_C, FILE_D); - table.newAppend().appendManifest(manifest2).commit(); + Snapshot snap2 = commit(table, table.newAppend().appendManifest(manifest2), branch); - Snapshot snap2 = table.currentSnapshot(); long commitId2 = snap2.snapshotId(); - V2Assert.assertEquals( - "Snapshot sequence number should be 2", 2, table.currentSnapshot().sequenceNumber()); + V2Assert.assertEquals("Snapshot sequence number should be 2", 2, snap2.sequenceNumber()); V2Assert.assertEquals( "Last sequence number should be 2", 2, readMetadata().lastSequenceNumber()); V1Assert.assertEquals( @@ -1110,7 +1107,7 @@ public void testMergedAppendManifestCleanupWithSnapshotIdInheritance() throws IO Assert.assertEquals( "Manifests should be merged into 1", 1, snap2.allManifests(table.io()).size()); validateManifest( - table.currentSnapshot().allManifests(table.io()).get(0), + latestSnapshot(table, branch).allManifests(table.io()).get(0), seqs(2, 2, 1, 1), ids(commitId2, commitId2, commitId1, commitId1), files(FILE_C, FILE_D, FILE_A, FILE_B), @@ -1140,7 +1137,10 @@ public void testAppendManifestFailureWithSnapshotIdInheritance() throws IOExcept append.appendManifest(manifest); AssertHelpers.assertThrows( - "Should reject commit", CommitFailedException.class, "Injected failure", append::commit); + "Should reject commit", + CommitFailedException.class, + "Injected failure", + () -> commit(table, append, branch)); Assert.assertEquals("Last sequence number should be 0", 0, readMetadata().lastSequenceNumber()); Assert.assertTrue("Append manifest should not be deleted", new File(manifest.path()).exists()); @@ -1159,7 +1159,7 @@ public void testInvalidAppendManifest() throws IOException { "Should reject commit", IllegalArgumentException.class, "Cannot append manifest with existing files", - () -> table.newAppend().appendManifest(manifestWithExistingFiles).commit()); + () -> commit(table, table.newAppend().appendManifest(manifestWithExistingFiles), branch)); Assert.assertEquals("Last sequence number should be 0", 0, readMetadata().lastSequenceNumber()); ManifestFile manifestWithDeletedFiles = @@ -1168,7 +1168,7 @@ public void testInvalidAppendManifest() throws IOException { "Should reject commit", IllegalArgumentException.class, "Cannot append manifest with deleted files", - () -> table.newAppend().appendManifest(manifestWithDeletedFiles).commit()); + () -> commit(table, table.newAppend().appendManifest(manifestWithDeletedFiles), branch)); Assert.assertEquals("Last sequence number should be 0", 0, readMetadata().lastSequenceNumber()); } @@ -1217,18 +1217,15 @@ public void testUpdatePartitionSpecFieldIdsForV1Table() { @Test public void testManifestEntryFieldIdsForChangedPartitionSpecForV1Table() { - table.newAppend().appendFile(FILE_A).commit(); + Snapshot snap = commit(table, table.newAppend().appendFile(FILE_A), branch); - Snapshot snap = table.currentSnapshot(); long commitId = snap.snapshotId(); validateSnapshot(null, snap, 1, FILE_A); TableMetadata base = readMetadata(); Assert.assertEquals( - "Should create 1 manifest for initial write", - 1, - base.currentSnapshot().allManifests(table.io()).size()); - ManifestFile initialManifest = base.currentSnapshot().allManifests(table.io()).get(0); + "Should create 1 manifest for initial write", 1, snap.allManifests(table.io()).size()); + ManifestFile initialManifest = snap.allManifests(table.io()).get(0); validateManifest( initialManifest, seqs(1), ids(commitId), files(FILE_A), statuses(Status.ADDED)); @@ -1252,8 +1249,7 @@ public void testManifestEntryFieldIdsForChangedPartitionSpecForV1Table() { .withRecordCount(1) .build(); - table.newAppend().appendFile(newFile).commit(); - Snapshot committedSnapshot = table.currentSnapshot(); + Snapshot committedSnapshot = commit(table, table.newAppend().appendFile(newFile), branch); V2Assert.assertEquals( "Snapshot sequence number should be 2", 2, committedSnapshot.sequenceNumber()); diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java index e8fb5e265813..53e5af520d91 100644 --- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java @@ -27,6 +27,7 @@ import java.util.stream.Collectors; import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -1150,10 +1151,7 @@ public void testExpireWithDeleteFiles() { .add(firstSnapshot.manifestListLocation()) .add(secondSnapshot.manifestListLocation()) .add(thirdSnapshot.manifestListLocation()) - .addAll( - secondSnapshot.allManifests(FILE_IO).stream() - .map(ManifestFile::path) - .collect(Collectors.toList())) + .addAll(manifestPaths(secondSnapshot, table.io())) .addAll( manifestOfDeletedFiles.stream() .map(ManifestFile::path) @@ -1455,6 +1453,64 @@ public void testMaxSnapshotAgeMultipleBranches() { Assert.assertNull(table.ops().current().snapshot(initialSnapshotId)); } + @Test + public void testRetainFilesOnRetainedBranches() { + // Append a file to main and test branch + String testBranch = "test-branch"; + table.newAppend().appendFile(FILE_A).commit(); + Snapshot appendA = table.currentSnapshot(); + table.manageSnapshots().createBranch(testBranch, appendA.snapshotId()).commit(); + + // Delete A from main + table.newDelete().deleteFile(FILE_A).commit(); + Snapshot deletionA = table.currentSnapshot(); + // Add B to main + table.newAppend().appendFile(FILE_B).commit(); + long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis()); + + Set deletedFiles = Sets.newHashSet(); + Set expectedDeletes = Sets.newHashSet(); + + // Only deletionA's manifest list and manifests should be removed + expectedDeletes.add(deletionA.manifestListLocation()); + expectedDeletes.addAll(manifestPaths(deletionA, table.io())); + table.expireSnapshots().expireOlderThan(tAfterCommits).deleteWith(deletedFiles::add).commit(); + + Assert.assertEquals(2, Iterables.size(table.snapshots())); + Assert.assertEquals(expectedDeletes, deletedFiles); + + // Delete A on test branch + table.newDelete().deleteFile(FILE_A).toBranch(testBranch).commit(); + Snapshot branchDelete = table.snapshot(testBranch); + + // Append C on test branch + table.newAppend().appendFile(FILE_C).toBranch(testBranch).commit(); + Snapshot testBranchHead = table.snapshot(testBranch); + + deletedFiles = Sets.newHashSet(); + expectedDeletes = Sets.newHashSet(); + + waitUntilAfter(testBranchHead.timestampMillis()); + table + .expireSnapshots() + .expireOlderThan(testBranchHead.timestampMillis()) + .deleteWith(deletedFiles::add) + .commit(); + + expectedDeletes.add(appendA.manifestListLocation()); + expectedDeletes.addAll(manifestPaths(appendA, table.io())); + expectedDeletes.add(branchDelete.manifestListLocation()); + expectedDeletes.addAll(manifestPaths(branchDelete, table.io())); + expectedDeletes.add(FILE_A.path().toString()); + + Assert.assertEquals(2, Iterables.size(table.snapshots())); + Assert.assertEquals(expectedDeletes, deletedFiles); + } + + private Set manifestPaths(Snapshot snapshot, FileIO io) { + return snapshot.allManifests(io).stream().map(ManifestFile::path).collect(Collectors.toSet()); + } + private RemoveSnapshots removeSnapshots(Table table) { RemoveSnapshots removeSnapshots = (RemoveSnapshots) table.expireSnapshots(); return (RemoveSnapshots) removeSnapshots.withIncrementalCleanup(incrementalCleanup);