diff --git a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java index 55bbad7750bf..b829be7ccf19 100644 --- a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java @@ -111,7 +111,7 @@ public BaseOverwriteFiles toBranch(String branch) { } @Override - protected void validate(TableMetadata base, Snapshot snapshot) { + protected void validate(TableMetadata base, Snapshot parent) { if (validateAddedFilesMatchOverwriteFilter) { PartitionSpec spec = dataSpec(); Expression rowFilter = rowFilter(); @@ -139,19 +139,19 @@ protected void validate(TableMetadata base, Snapshot snapshot) { } if (validateNewDataFiles) { - validateAddedDataFiles(base, startingSnapshotId, dataConflictDetectionFilter(), snapshot); + validateAddedDataFiles(base, startingSnapshotId, dataConflictDetectionFilter(), parent); } if (validateNewDeletes) { if (rowFilter() != Expressions.alwaysFalse()) { Expression filter = conflictDetectionFilter != null ? conflictDetectionFilter : rowFilter(); - validateNoNewDeleteFiles(base, startingSnapshotId, filter, snapshot); - validateDeletedDataFiles(base, startingSnapshotId, filter, snapshot); + validateNoNewDeleteFiles(base, startingSnapshotId, filter, parent); + validateDeletedDataFiles(base, startingSnapshotId, filter, parent); } if (deletedDataFiles.size() > 0) { validateNoNewDeletesForDataFiles( - base, startingSnapshotId, conflictDetectionFilter, deletedDataFiles, snapshot); + base, startingSnapshotId, conflictDetectionFilter, deletedDataFiles, parent); } } } diff --git a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java index 327a276a87cf..d3a8edbc7cdd 100644 --- a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java +++ b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java @@ -86,25 +86,25 @@ public BaseReplacePartitions toBranch(String branch) { } @Override - public void validate(TableMetadata currentMetadata, Snapshot snapshot) { + public void validate(TableMetadata currentMetadata, Snapshot parent) { if (validateConflictingData) { if (dataSpec().isUnpartitioned()) { validateAddedDataFiles( - currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), snapshot); + currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent); } else { - validateAddedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, snapshot); + validateAddedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent); } } if (validateConflictingDeletes) { if (dataSpec().isUnpartitioned()) { validateDeletedDataFiles( - currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), snapshot); + currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent); validateNoNewDeleteFiles( - currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), snapshot); + currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent); } else { - validateDeletedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, snapshot); - validateNoNewDeleteFiles(currentMetadata, startingSnapshotId, replacedPartitions, snapshot); + validateDeletedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent); + validateNoNewDeleteFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent); } } } diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java index 05a446a14cb5..54996e831117 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java @@ -116,11 +116,11 @@ public BaseRewriteFiles toBranch(String branch) { } @Override - protected void validate(TableMetadata base, Snapshot snapshot) { + protected void validate(TableMetadata base, Snapshot parent) { if (replacedDataFiles.size() > 0) { // if there are replaced data files, there cannot be any new row-level deletes for those data // files - validateNoNewDeletesForDataFiles(base, startingSnapshotId, replacedDataFiles, snapshot); + validateNoNewDeletesForDataFiles(base, startingSnapshotId, replacedDataFiles, parent); } } } diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java index c7d36bdc0078..679a66c587fa 100644 --- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java @@ -27,7 +27,9 @@ import org.apache.iceberg.HistoryEntry; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -394,4 +396,42 @@ public static Schema schemaFor(Table table, Long snapshotId, Long timestampMilli return table.schema(); } + + /** + * Fetch the snapshot at the head of the given branch in the given table. + * + *
This method calls {@link Table#currentSnapshot()} instead of using branch API {@link + * Table#snapshot(String)} for the main branch so that existing code still goes through the old + * code path to ensure backwards compatibility. + * + * @param table a {@link Table} + * @param branch branch name of the table + * @return the latest snapshot for the given branch + */ + public static Snapshot latestSnapshot(Table table, String branch) { + if (branch.equals(SnapshotRef.MAIN_BRANCH)) { + return table.currentSnapshot(); + } + + return table.snapshot(branch); + } + + /** + * Fetch the snapshot at the head of the given branch in the given table. + * + *
This method calls {@link TableMetadata#currentSnapshot()} instead of using branch API {@link + * TableMetadata#ref(String)}} for the main branch so that existing code still goes through the + * old code path to ensure backwards compatibility. + * + * @param metadata a {@link TableMetadata} + * @param branch branch name of the table metadata + * @return the latest snapshot for the given branch + */ + public static Snapshot latestSnapshot(TableMetadata metadata, String branch) { + if (branch.equals(SnapshotRef.MAIN_BRANCH)) { + return metadata.currentSnapshot(); + } + + return metadata.snapshot(metadata.ref(branch).snapshotId()); + } } diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index 0914f1d77370..c03fb8cbd777 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -384,23 +384,6 @@ Snapshot apply(SnapshotUpdate snapshotUpdate, String branch) { } } - @SuppressWarnings("checkstyle:HiddenField") - Snapshot latestSnapshot(Table table, String branch) { - if (branch.equals(SnapshotRef.MAIN_BRANCH)) { - return table.currentSnapshot(); - } - - return table.snapshot(branch); - } - - Snapshot latestSnapshot(TableMetadata metadata, String branch) { - if (branch.equals(SnapshotRef.MAIN_BRANCH)) { - return metadata.currentSnapshot(); - } - - return metadata.snapshot(metadata.ref(branch).snapshotId()); - } - void validateSnapshot(Snapshot old, Snapshot snap, Long sequenceNumber, DataFile... newFiles) { Assert.assertEquals( "Should not change delete manifests", diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java index 1ee3b663bcb8..ab10c5b8c516 100644 --- a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java +++ b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg; +import static org.apache.iceberg.util.SnapshotUtil.latestSnapshot; + import java.nio.ByteBuffer; import java.nio.ByteOrder; import org.apache.iceberg.ManifestEntry.Status; diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java index 212bab3c136b..46dd9ecbc59f 100644 --- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java @@ -19,6 +19,7 @@ package org.apache.iceberg; import static org.apache.iceberg.relocated.com.google.common.collect.Iterators.concat; +import static org.apache.iceberg.util.SnapshotUtil.latestSnapshot; import java.io.File; import java.io.IOException; diff --git a/core/src/test/java/org/apache/iceberg/TestOverwrite.java b/core/src/test/java/org/apache/iceberg/TestOverwrite.java index 7df76961f3c3..4f6a184e9ec1 100644 --- a/core/src/test/java/org/apache/iceberg/TestOverwrite.java +++ b/core/src/test/java/org/apache/iceberg/TestOverwrite.java @@ -23,6 +23,7 @@ import static org.apache.iceberg.expressions.Expressions.lessThan; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.iceberg.util.SnapshotUtil.latestSnapshot; import java.io.File; import java.io.IOException; @@ -182,16 +183,16 @@ public void testOverwriteFailsDelete() { @Test public void testOverwriteWithAppendOutsideOfDelete() { TableMetadata base = TestTables.readMetadata(TABLE_NAME); - long baseId = - latestSnapshot(base, branch) == null ? -1 : latestSnapshot(base, branch).snapshotId(); + Snapshot latestSnapshot = latestSnapshot(base, branch); + long baseId = latestSnapshot == null ? -1 : latestSnapshot.snapshotId(); commit( table, table .newOverwrite() .overwriteByRowFilter(equal("date", "2018-06-08")) - .addFile(FILE_10_TO_14), - branch); // in 2018-06-09, NOT in 2018-06-08 + .addFile(FILE_10_TO_14), // in 2018-06-09, NOT in 2018-06-08 + branch); long overwriteId = latestSnapshot(table, branch).snapshotId(); diff --git a/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java b/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java index 0b7ed030252a..b7d194377ef6 100644 --- a/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java +++ b/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java @@ -25,6 +25,7 @@ import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.iceberg.util.SnapshotUtil.latestSnapshot; import java.io.File; import java.io.IOException; diff --git a/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java b/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java index dc74099028cd..0fe760d96e16 100644 --- a/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java +++ b/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg; +import static org.apache.iceberg.util.SnapshotUtil.latestSnapshot; + import java.io.File; import java.io.IOException; import org.apache.iceberg.ManifestEntry.Status; @@ -89,12 +91,12 @@ public TestReplacePartitions(int formatVersion, String branch) { @Test public void testReplaceOnePartition() { - table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).toBranch(branch).commit(); + commit(table, table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B), branch); TableMetadata base = readMetadata(); long baseId = latestSnapshot(base, branch).snapshotId(); - table.newReplacePartitions().addFile(FILE_E).toBranch(branch).commit(); + commit(table, table.newReplacePartitions().addFile(FILE_E), branch); long replaceId = latestSnapshot(readMetadata(), branch).snapshotId(); Assert.assertNotEquals("Should create a new snapshot", baseId, replaceId); @@ -122,12 +124,12 @@ public void testReplaceAndMergeOnePartition() { // ensure the overwrite results in a merge table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1").commit(); - table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).toBranch(branch).commit(); + commit(table, table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B), branch); TableMetadata base = readMetadata(); long baseId = latestSnapshot(base, branch).snapshotId(); - table.newReplacePartitions().addFile(FILE_E).toBranch(branch).commit(); + commit(table, table.newReplacePartitions().addFile(FILE_E), branch); long replaceId = latestSnapshot(table, branch).snapshotId(); Assert.assertNotEquals("Should create a new snapshot", baseId, replaceId); @@ -155,7 +157,7 @@ public void testReplaceWithUnpartitionedTable() throws IOException { Assert.assertEquals( "Table version should be 0", 0, (long) TestTables.metadataVersion("unpartitioned")); - unpartitioned.newAppend().appendFile(FILE_A).toBranch(branch).commit(); + commit(table, unpartitioned.newAppend().appendFile(FILE_A), branch); // make sure the data was successfully added Assert.assertEquals( "Table version should be 1", 1, (long) TestTables.metadataVersion("unpartitioned")); @@ -234,24 +236,19 @@ public void testReplaceAndMergeWithUnpartitionedTable() throws IOException { @Test public void testValidationFailure() { - table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).toBranch(branch).commit(); + commit(table, table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B), branch); TableMetadata base = readMetadata(); long baseId = latestSnapshot(base, branch).snapshotId(); ReplacePartitions replace = - table - .newReplacePartitions() - .addFile(FILE_F) - .addFile(FILE_G) - .toBranch(branch) - .validateAppendOnly(); + table.newReplacePartitions().addFile(FILE_F).addFile(FILE_G).validateAppendOnly(); AssertHelpers.assertThrows( "Should reject commit with file not matching delete expression", ValidationException.class, "Cannot commit file that conflicts with existing partition", - replace::commit); + () -> commit(table, replace, branch)); Assert.assertEquals( "Should not create a new snapshot", @@ -261,12 +258,12 @@ public void testValidationFailure() { @Test public void testValidationSuccess() { - table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).toBranch(branch).commit(); + commit(table, table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B), branch); TableMetadata base = readMetadata(); long baseId = latestSnapshot(base, branch).snapshotId(); - table.newReplacePartitions().addFile(FILE_G).validateAppendOnly().toBranch(branch).commit(); + commit(table, table.newReplacePartitions().addFile(FILE_G).validateAppendOnly(), branch); long replaceId = latestSnapshot(readMetadata(), branch).snapshotId(); Assert.assertNotEquals("Should create a new snapshot", baseId, replaceId); @@ -291,24 +288,26 @@ public void testValidationSuccess() { @Test public void testValidationNotInvoked() { - table.newFastAppend().appendFile(FILE_A).toBranch(branch).commit(); + commit(table, table.newFastAppend().appendFile(FILE_A), branch); TableMetadata base = readMetadata(); // Two concurrent ReplacePartitions with No Validation Enabled - table - .newReplacePartitions() - .addFile(FILE_E) - .validateFromSnapshot(latestSnapshot(base, branch).snapshotId()) - .toBranch(branch) - .commit(); - table - .newReplacePartitions() - .addFile(FILE_A) // Replaces FILE_E which becomes Deleted - .addFile(FILE_B) - .validateFromSnapshot(latestSnapshot(base, branch).snapshotId()) - .toBranch(branch) - .commit(); + commit( + table, + table + .newReplacePartitions() + .addFile(FILE_E) + .validateFromSnapshot(latestSnapshot(base, branch).snapshotId()), + branch); + commit( + table, + table + .newReplacePartitions() + .addFile(FILE_A) // Replaces FILE_E which becomes Deleted + .addFile(FILE_B) + .validateFromSnapshot(latestSnapshot(base, branch).snapshotId()), + branch); long replaceId = latestSnapshot(readMetadata(), branch).snapshotId(); Assert.assertEquals( @@ -329,7 +328,7 @@ public void testValidationNotInvoked() { @Test public void testValidateWithDefaultSnapshotId() { - table.newReplacePartitions().addFile(FILE_A).toBranch(branch).commit(); + commit(table, table.newReplacePartitions().addFile(FILE_A), branch); // Concurrent Replace Partitions should fail with ValidationException ReplacePartitions replace = table.newReplacePartitions(); @@ -339,24 +338,25 @@ public void testValidateWithDefaultSnapshotId() { "Found conflicting files that can contain records matching partitions " + "[data_bucket=0, data_bucket=1]: [/path/to/data-a.parquet]", () -> - replace - .addFile(FILE_A) - .addFile(FILE_B) - .validateNoConflictingData() - .validateNoConflictingDeletes() - .toBranch(branch) - .commit()); + commit( + table, + replace + .addFile(FILE_A) + .addFile(FILE_B) + .validateNoConflictingData() + .validateNoConflictingDeletes(), + branch)); } @Test public void testConcurrentReplaceConflict() { - table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).toBranch(branch).commit(); + commit(table, table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B), branch); TableMetadata base = readMetadata(); long baseId = latestSnapshot(base, branch).snapshotId(); // Concurrent Replace Partitions should fail with ValidationException - table.newReplacePartitions().addFile(FILE_A).toBranch(branch).commit(); + commit(table, table.newReplacePartitions().addFile(FILE_A), branch); AssertHelpers.assertThrows( "Should reject commit with file matching partitions replaced", @@ -364,36 +364,38 @@ public void testConcurrentReplaceConflict() { "Found conflicting files that can contain records matching partitions " + "[data_bucket=0, data_bucket=1]: [/path/to/data-a.parquet]", () -> - table - .newReplacePartitions() - .validateFromSnapshot(baseId) - .addFile(FILE_A) - .addFile(FILE_B) - .validateNoConflictingData() - .validateNoConflictingDeletes() - .toBranch(branch) - .commit()); + commit( + table, + table + .newReplacePartitions() + .validateFromSnapshot(baseId) + .addFile(FILE_A) + .addFile(FILE_B) + .validateNoConflictingData() + .validateNoConflictingDeletes(), + branch)); } @Test public void testConcurrentReplaceNoConflict() { - table.newFastAppend().appendFile(FILE_A).toBranch(branch).commit(); + commit(table, table.newFastAppend().appendFile(FILE_A), branch); TableMetadata base = readMetadata(); long id1 = latestSnapshot(base, branch).snapshotId(); // Concurrent Replace Partitions should not fail if concerning different partitions - table.newReplacePartitions().addFile(FILE_A).toBranch(branch).commit(); + commit(table, table.newReplacePartitions().addFile(FILE_A), branch); long id2 = latestSnapshot(readMetadata(), branch).snapshotId(); - table - .newReplacePartitions() - .validateFromSnapshot(id1) - .validateNoConflictingData() - .validateNoConflictingDeletes() - .toBranch(branch) - .addFile(FILE_B) - .commit(); + commit( + table, + table + .newReplacePartitions() + .validateFromSnapshot(id1) + .validateNoConflictingData() + .validateNoConflictingDeletes() + .addFile(FILE_B), + branch); long id3 = latestSnapshot(readMetadata(), branch).snapshotId(); Assert.assertEquals( @@ -417,13 +419,13 @@ public void testConcurrentReplaceConflictNonPartitioned() { Table unpartitioned = TestTables.create( tableDir, "unpartitioned", SCHEMA, PartitionSpec.unpartitioned(), formatVersion); - unpartitioned.newAppend().appendFile(FILE_UNPARTITIONED_A).toBranch(branch).commit(); + commit(table, unpartitioned.newAppend().appendFile(FILE_UNPARTITIONED_A), branch); TableMetadata replaceMetadata = TestTables.readMetadata("unpartitioned"); long replaceBaseId = latestSnapshot(replaceMetadata, branch).snapshotId(); // Concurrent ReplacePartitions should fail with ValidationException - unpartitioned.newReplacePartitions().addFile(FILE_UNPARTITIONED_A).toBranch(branch).commit(); + commit(table, unpartitioned.newReplacePartitions().addFile(FILE_UNPARTITIONED_A), branch); AssertHelpers.assertThrows( "Should reject commit with file matching partitions replaced", @@ -431,25 +433,26 @@ public void testConcurrentReplaceConflictNonPartitioned() { "Found conflicting files that can contain records matching true: " + "[/path/to/data-unpartitioned-a.parquet]", () -> - unpartitioned - .newReplacePartitions() - .validateFromSnapshot(replaceBaseId) - .validateNoConflictingData() - .validateNoConflictingDeletes() - .addFile(FILE_UNPARTITIONED_A) - .toBranch(branch) - .commit()); + commit( + table, + unpartitioned + .newReplacePartitions() + .validateFromSnapshot(replaceBaseId) + .validateNoConflictingData() + .validateNoConflictingDeletes() + .addFile(FILE_UNPARTITIONED_A), + branch)); } @Test public void testAppendReplaceConflict() { - table.newFastAppend().appendFile(FILE_A).toBranch(branch).commit(); + commit(table, table.newFastAppend().appendFile(FILE_A), branch); TableMetadata base = readMetadata(); long baseId = latestSnapshot(base, branch).snapshotId(); // Concurrent Append and ReplacePartition should fail with ValidationException - table.newFastAppend().appendFile(FILE_B).toBranch(branch).commit(); + commit(table, table.newFastAppend().appendFile(FILE_B), branch); AssertHelpers.assertThrows( "Should reject commit with file matching partitions replaced", @@ -457,37 +460,39 @@ public void testAppendReplaceConflict() { "Found conflicting files that can contain records matching partitions " + "[data_bucket=0, data_bucket=1]: [/path/to/data-b.parquet]", () -> - table - .newReplacePartitions() - .validateFromSnapshot(baseId) - .validateNoConflictingData() - .validateNoConflictingDeletes() - .addFile(FILE_A) - .addFile(FILE_B) - .toBranch(branch) - .commit()); + commit( + table, + table + .newReplacePartitions() + .validateFromSnapshot(baseId) + .validateNoConflictingData() + .validateNoConflictingDeletes() + .addFile(FILE_A) + .addFile(FILE_B), + branch)); } @Test public void testAppendReplaceNoConflict() { - table.newFastAppend().appendFile(FILE_A).toBranch(branch).commit(); + commit(table, table.newFastAppend().appendFile(FILE_A), branch); TableMetadata base = readMetadata(); long id1 = latestSnapshot(base, branch).snapshotId(); // Concurrent Append and ReplacePartition should not conflict if concerning different partitions - table.newFastAppend().appendFile(FILE_B).toBranch(branch).commit(); + commit(table, table.newFastAppend().appendFile(FILE_B), branch); long id2 = latestSnapshot(readMetadata(), branch).snapshotId(); - table - .newReplacePartitions() - .validateFromSnapshot(id1) - .validateNoConflictingData() - .validateNoConflictingDeletes() - .toBranch(branch) - .addFile(FILE_A) - .commit(); + commit( + table, + table + .newReplacePartitions() + .validateFromSnapshot(id1) + .validateNoConflictingData() + .validateNoConflictingDeletes() + .addFile(FILE_A), + branch); long id3 = latestSnapshot(readMetadata(), branch).snapshotId(); Assert.assertEquals( @@ -516,13 +521,13 @@ public void testAppendReplaceConflictNonPartitioned() { Table unpartitioned = TestTables.create( tableDir, "unpartitioned", SCHEMA, PartitionSpec.unpartitioned(), formatVersion); - unpartitioned.newAppend().appendFile(FILE_UNPARTITIONED_A).toBranch(branch).commit(); + commit(table, unpartitioned.newAppend().appendFile(FILE_UNPARTITIONED_A), branch); TableMetadata replaceMetadata = TestTables.readMetadata("unpartitioned"); long replaceBaseId = latestSnapshot(replaceMetadata, branch).snapshotId(); // Concurrent Append and ReplacePartitions should fail with ValidationException - unpartitioned.newAppend().appendFile(FILE_UNPARTITIONED_A).toBranch(branch).commit(); + commit(table, unpartitioned.newAppend().appendFile(FILE_UNPARTITIONED_A), branch); AssertHelpers.assertThrows( "Should reject commit with file matching partitions replaced", @@ -530,31 +535,28 @@ public void testAppendReplaceConflictNonPartitioned() { "Found conflicting files that can contain records matching true: " + "[/path/to/data-unpartitioned-a.parquet]", () -> - unpartitioned - .newReplacePartitions() - .validateFromSnapshot(replaceBaseId) - .validateNoConflictingData() - .validateNoConflictingDeletes() - .addFile(FILE_UNPARTITIONED_A) - .toBranch(branch) - .commit()); + commit( + table, + unpartitioned + .newReplacePartitions() + .validateFromSnapshot(replaceBaseId) + .validateNoConflictingData() + .validateNoConflictingDeletes() + .addFile(FILE_UNPARTITIONED_A), + branch)); } @Test public void testDeleteReplaceConflict() { Assume.assumeTrue(formatVersion == 2); - table.newFastAppend().appendFile(FILE_A).toBranch(branch).commit(); + commit(table, table.newFastAppend().appendFile(FILE_A), branch); TableMetadata base = readMetadata(); long baseId = latestSnapshot(base, branch).snapshotId(); // Concurrent Delete and ReplacePartition should fail with ValidationException - table - .newRowDelta() - .addDeletes(FILE_A_DELETES) - .validateFromSnapshot(baseId) - .toBranch(branch) - .commit(); + commit( + table, table.newRowDelta().addDeletes(FILE_A_DELETES).validateFromSnapshot(baseId), branch); AssertHelpers.assertThrows( "Should reject commit with file matching partitions replaced", @@ -562,14 +564,15 @@ public void testDeleteReplaceConflict() { "Found new conflicting delete files that can apply to records matching " + "[data_bucket=0]: [/path/to/data-a-deletes.parquet]", () -> - table - .newReplacePartitions() - .validateFromSnapshot(baseId) - .validateNoConflictingData() - .validateNoConflictingDeletes() - .toBranch(branch) - .addFile(FILE_A) - .commit()); + commit( + table, + table + .newReplacePartitions() + .validateFromSnapshot(baseId) + .validateNoConflictingData() + .validateNoConflictingDeletes() + .addFile(FILE_A), + branch)); } @Test @@ -579,13 +582,13 @@ public void testDeleteReplaceConflictNonPartitioned() { Table unpartitioned = TestTables.create( tableDir, "unpartitioned", SCHEMA, PartitionSpec.unpartitioned(), formatVersion); - unpartitioned.newAppend().appendFile(FILE_A).toBranch(branch).commit(); + commit(table, unpartitioned.newAppend().appendFile(FILE_A), branch); TableMetadata replaceMetadata = TestTables.readMetadata("unpartitioned"); long replaceBaseId = latestSnapshot(replaceMetadata, branch).snapshotId(); // Concurrent Delete and ReplacePartitions should fail with ValidationException - unpartitioned.newRowDelta().addDeletes(FILE_UNPARTITIONED_A_DELETES).toBranch(branch).commit(); + commit(table, unpartitioned.newRowDelta().addDeletes(FILE_UNPARTITIONED_A_DELETES), branch); AssertHelpers.assertThrows( "Should reject commit with file matching partitions replaced", @@ -593,42 +596,47 @@ public void testDeleteReplaceConflictNonPartitioned() { "Found new conflicting delete files that can apply to records matching true: " + "[/path/to/data-unpartitioned-a-deletes.parquet]", () -> - unpartitioned - .newReplacePartitions() - .validateFromSnapshot(replaceBaseId) - .validateNoConflictingData() - .validateNoConflictingDeletes() - .addFile(FILE_UNPARTITIONED_A) - .toBranch(branch) - .commit()); + commit( + table, + unpartitioned + .newReplacePartitions() + .validateFromSnapshot(replaceBaseId) + .validateNoConflictingData() + .validateNoConflictingDeletes() + .addFile(FILE_UNPARTITIONED_A), + branch)); } @Test public void testDeleteReplaceNoConflict() { Assume.assumeTrue(formatVersion == 2); - table.newFastAppend().appendFile(FILE_A).toBranch(branch).commit(); + commit(table, table.newFastAppend().appendFile(FILE_A), branch); long id1 = latestSnapshot(readMetadata(), branch).snapshotId(); // Concurrent Delta and ReplacePartition should not conflict if concerning different partitions - table - .newRowDelta() - .addDeletes(FILE_A_DELETES) - .validateFromSnapshot(id1) - .validateNoConflictingDataFiles() - .validateNoConflictingDeleteFiles() - .validateFromSnapshot(id1) - .toBranch(branch) - .commit(); + commit( + table, + table + .newRowDelta() + .addDeletes(FILE_A_DELETES) + .validateFromSnapshot(id1) + .validateNoConflictingDataFiles() + .validateNoConflictingDeleteFiles() + .validateFromSnapshot(id1), + branch); + long id2 = latestSnapshot(readMetadata(), branch).snapshotId(); - table - .newReplacePartitions() - .validateNoConflictingData() - .validateNoConflictingDeletes() - .validateFromSnapshot(id1) - .addFile(FILE_B) - .toBranch(branch) - .commit(); + commit( + table, + table + .newReplacePartitions() + .validateNoConflictingData() + .validateNoConflictingDeletes() + .validateFromSnapshot(id1) + .addFile(FILE_B), + branch); + long id3 = latestSnapshot(readMetadata(), branch).snapshotId(); Assert.assertEquals( @@ -657,13 +665,13 @@ public void testDeleteReplaceNoConflict() { @Test public void testOverwriteReplaceConflict() { Assume.assumeTrue(formatVersion == 2); - table.newFastAppend().appendFile(FILE_A).toBranch(branch).commit(); + commit(table, table.newFastAppend().appendFile(FILE_A), branch); TableMetadata base = readMetadata(); long baseId = latestSnapshot(base, branch).snapshotId(); // Concurrent Overwrite and ReplacePartition should fail with ValidationException - table.newOverwrite().deleteFile(FILE_A).toBranch(branch).commit(); + commit(table, table.newOverwrite().deleteFile(FILE_A), branch); AssertHelpers.assertThrows( "Should reject commit with file matching partitions replaced", @@ -671,36 +679,38 @@ public void testOverwriteReplaceConflict() { "Found conflicting deleted files that can apply to records matching " + "[data_bucket=0]: [/path/to/data-a.parquet]", () -> - table - .newReplacePartitions() - .validateFromSnapshot(baseId) - .validateNoConflictingData() - .validateNoConflictingDeletes() - .addFile(FILE_A) - .toBranch(branch) - .commit()); + commit( + table, + table + .newReplacePartitions() + .validateFromSnapshot(baseId) + .validateNoConflictingData() + .validateNoConflictingDeletes() + .addFile(FILE_A), + branch)); } @Test public void testOverwriteReplaceNoConflict() { Assume.assumeTrue(formatVersion == 2); - table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).toBranch(branch).commit(); + commit(table, table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B), branch); TableMetadata base = readMetadata(); long baseId = latestSnapshot(base, branch).snapshotId(); // Concurrent Overwrite and ReplacePartition should not fail with if concerning different // partitions - table.newOverwrite().deleteFile(FILE_A).toBranch(branch).commit(); + commit(table, table.newOverwrite().deleteFile(FILE_A), branch); - table - .newReplacePartitions() - .validateNoConflictingData() - .validateNoConflictingDeletes() - .validateFromSnapshot(baseId) - .addFile(FILE_B) - .toBranch(branch) - .commit(); + commit( + table, + table + .newReplacePartitions() + .validateNoConflictingData() + .validateNoConflictingDeletes() + .validateFromSnapshot(baseId) + .addFile(FILE_B), + branch); long finalId = latestSnapshot(readMetadata(), branch).snapshotId(); @@ -728,13 +738,13 @@ public void testOverwriteReplaceConflictNonPartitioned() { TestTables.create( tableDir, "unpartitioned", SCHEMA, PartitionSpec.unpartitioned(), formatVersion); - unpartitioned.newAppend().appendFile(FILE_UNPARTITIONED_A).toBranch(branch).commit(); + commit(table, unpartitioned.newAppend().appendFile(FILE_UNPARTITIONED_A), branch); TableMetadata replaceMetadata = TestTables.readMetadata("unpartitioned"); long replaceBaseId = latestSnapshot(replaceMetadata, branch).snapshotId(); // Concurrent Overwrite and ReplacePartitions should fail with ValidationException - unpartitioned.newOverwrite().deleteFile(FILE_UNPARTITIONED_A).toBranch(branch).commit(); + commit(table, unpartitioned.newOverwrite().deleteFile(FILE_UNPARTITIONED_A), branch); AssertHelpers.assertThrows( "Should reject commit with file matching partitions replaced", @@ -742,31 +752,33 @@ public void testOverwriteReplaceConflictNonPartitioned() { "Found conflicting deleted files that can contain records matching true: " + "[/path/to/data-unpartitioned-a.parquet]", () -> - unpartitioned - .newReplacePartitions() - .validateFromSnapshot(replaceBaseId) - .validateNoConflictingData() - .validateNoConflictingDeletes() - .addFile(FILE_UNPARTITIONED_A) - .toBranch(branch) - .commit()); + commit( + table, + unpartitioned + .newReplacePartitions() + .validateFromSnapshot(replaceBaseId) + .validateNoConflictingData() + .validateNoConflictingDeletes() + .addFile(FILE_UNPARTITIONED_A), + branch)); } @Test public void testValidateOnlyDeletes() { - table.newAppend().appendFile(FILE_A).toBranch(branch).commit(); + commit(table, table.newAppend().appendFile(FILE_A), branch); long baseId = latestSnapshot(readMetadata(), branch).snapshotId(); // Snapshot Isolation mode: appends do not conflict with replace - table.newAppend().appendFile(FILE_B).toBranch(branch).commit(); - - table - .newReplacePartitions() - .validateFromSnapshot(baseId) - .validateNoConflictingDeletes() - .addFile(FILE_B) - .toBranch(branch) - .commit(); + commit(table, table.newAppend().appendFile(FILE_B), branch); + + commit( + table, + table + .newReplacePartitions() + .validateFromSnapshot(baseId) + .validateNoConflictingDeletes() + .addFile(FILE_B), + branch); long finalId = latestSnapshot(readMetadata(), branch).snapshotId(); Assert.assertEquals( diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java index 5f126678fe3a..d1259eda2f05 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java @@ -21,6 +21,7 @@ import static org.apache.iceberg.ManifestEntry.Status.ADDED; import static org.apache.iceberg.ManifestEntry.Status.DELETED; import static org.apache.iceberg.ManifestEntry.Status.EXISTING; +import static org.apache.iceberg.util.SnapshotUtil.latestSnapshot; import java.io.File; import java.util.Collections; diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index e0db24df228e..ab2ec5f71363 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -26,6 +26,7 @@ import static org.apache.iceberg.SnapshotSummary.TOTAL_DATA_FILES_PROP; import static org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP; import static org.apache.iceberg.SnapshotSummary.TOTAL_POS_DELETES_PROP; +import static org.apache.iceberg.util.SnapshotUtil.latestSnapshot; import java.util.List; import java.util.Map;