Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/MergeAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public MergeAppend appendFile(DataFile file) {
return this;
}

@Override
public MergeAppend toBranch(String branch) {
targetBranch(branch);
return this;
}

@Override
public AppendFiles appendManifest(ManifestFile manifest) {
Preconditions.checkArgument(
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/StreamingDelete.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,10 @@ public StreamingDelete deleteFromRowFilter(Expression expr) {
deleteByRowFilter(expr);
return this;
}

@Override
public StreamingDelete toBranch(String branch) {
targetBranch(branch);
return this;
}
}
39 changes: 39 additions & 0 deletions core/src/test/java/org/apache/iceberg/TableTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,45 @@ void validateSnapshot(Snapshot old, Snapshot snap, long sequenceNumber, DataFile
validateSnapshot(old, snap, (Long) sequenceNumber, newFiles);
}

@SuppressWarnings("checkstyle:HiddenField")
Snapshot commit(Table table, SnapshotUpdate snapshotUpdate, String branch) {
Snapshot snapshot;
if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
snapshotUpdate.commit();
snapshot = table.currentSnapshot();
} else {
((SnapshotProducer) snapshotUpdate.toBranch(branch)).commit();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just pass a SnapshotProducer to this method? That would eliminate casting right?

Copy link
Contributor Author

@amogh-jahagirdar amogh-jahagirdar Oct 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AppendFiles doesn't extend SnapshotProducer, so the casting would just have to shift to the caller.

snapshot = table.snapshot(branch);
}

return snapshot;
}

Snapshot apply(SnapshotUpdate snapshotUpdate, String branch) {
if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
return ((SnapshotProducer) snapshotUpdate).apply();
} else {
return ((SnapshotProducer) snapshotUpdate.toBranch(branch)).apply();
}
}

@SuppressWarnings("checkstyle:HiddenField")
Snapshot latestSnapshot(Table table, String branch) {
if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
return table.currentSnapshot();
}

return table.snapshot(branch);
}

Snapshot latestSnapshot(TableMetadata metadata, String branch) {
if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
return metadata.currentSnapshot();
}

return metadata.snapshot(metadata.ref(branch).snapshotId());
}

void validateSnapshot(Snapshot old, Snapshot snap, Long sequenceNumber, DataFile... newFiles) {
Assert.assertEquals(
"Should not change delete manifests",
Expand Down
139 changes: 94 additions & 45 deletions core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
Expand Down Expand Up @@ -68,38 +69,44 @@ public class TestDeleteFiles extends TableTestBase {
))
.build();

@Parameterized.Parameters(name = "formatVersion = {0}")
private final String branch;

@Parameterized.Parameters(name = "formatVersion = {0}, branch = {1}")
public static Object[] parameters() {
return new Object[] {1, 2};
return new Object[][] {
new Object[] {1, "main"},
Copy link
Contributor

@namrathamyske namrathamyske Aug 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am little confused regarding parameters. Does each {1, "testBranch"} translate to {formatVersion, branchname}?.

Copy link
Contributor Author

@amogh-jahagirdar amogh-jahagirdar Aug 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct, we test for every combination of format version + branch. I'll need to update the "Parameters" annotation so that we can get nicer test names. The branch parameter gets injected as an argument in the test class constructor.

new Object[] {1, "testBranch"},
new Object[] {2, "main"},
new Object[] {2, "testBranch"}
};
}

public TestDeleteFiles(int formatVersion) {
public TestDeleteFiles(int formatVersion, String branch) {
super(formatVersion);
this.branch = branch;
}

@Test
public void testMultipleDeletes() {
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C).commit();

commit(
table, table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C), branch);
Snapshot append = latestSnapshot(readMetadata(), branch);
Assert.assertEquals("Metadata should be at version 1", 1L, (long) version());
Snapshot append = readMetadata().currentSnapshot();
validateSnapshot(null, append, FILE_A, FILE_B, FILE_C);

table.newDelete().deleteFile(FILE_A).commit();
commit(table, table.newDelete().deleteFile(FILE_A), branch);
Snapshot delete1 = latestSnapshot(readMetadata(), branch);

Assert.assertEquals("Metadata should be at version 2", 2L, (long) version());
Snapshot delete = readMetadata().currentSnapshot();
Assert.assertEquals("Should have 1 manifest", 1, delete.allManifests(FILE_IO).size());
Assert.assertEquals("Should have 1 manifest", 1, delete1.allManifests(FILE_IO).size());
validateManifestEntries(
delete.allManifests(table.io()).get(0),
ids(delete.snapshotId(), append.snapshotId(), append.snapshotId()),
delete1.allManifests(table.io()).get(0),
ids(delete1.snapshotId(), append.snapshotId(), append.snapshotId()),
files(FILE_A, FILE_B, FILE_C),
statuses(Status.DELETED, Status.EXISTING, Status.EXISTING));

table.newDelete().deleteFile(FILE_B).commit();

Snapshot delete2 = commit(table, table.newDelete().deleteFile(FILE_B), branch);
Assert.assertEquals("Metadata should be at version 3", 3L, (long) version());
Snapshot delete2 = readMetadata().currentSnapshot();
Assert.assertEquals("Should have 1 manifest", 1, delete2.allManifests(FILE_IO).size());
validateManifestEntries(
delete2.allManifests(FILE_IO).get(0),
Expand Down Expand Up @@ -147,9 +154,12 @@ public void testAlreadyDeletedFilesAreIgnoredDuringDeletesByRowFilter() {
.build();

// add both data files
table.newFastAppend().appendFile(firstDataFile).appendFile(secondDataFile).commit();
Snapshot initialSnapshot =
commit(
table,
table.newFastAppend().appendFile(firstDataFile).appendFile(secondDataFile),
branch);

Snapshot initialSnapshot = table.currentSnapshot();
Assert.assertEquals("Should have 1 manifest", 1, initialSnapshot.allManifests(FILE_IO).size());
validateManifestEntries(
initialSnapshot.allManifests(FILE_IO).get(0),
Expand All @@ -158,9 +168,7 @@ public void testAlreadyDeletedFilesAreIgnoredDuringDeletesByRowFilter() {
statuses(Status.ADDED, Status.ADDED));

// delete the first data file
table.newDelete().deleteFile(firstDataFile).commit();

Snapshot deleteSnapshot = table.currentSnapshot();
Snapshot deleteSnapshot = commit(table, table.newDelete().deleteFile(firstDataFile), branch);
Assert.assertEquals("Should have 1 manifest", 1, deleteSnapshot.allManifests(FILE_IO).size());
validateManifestEntries(
deleteSnapshot.allManifests(FILE_IO).get(0),
Expand All @@ -170,9 +178,9 @@ public void testAlreadyDeletedFilesAreIgnoredDuringDeletesByRowFilter() {

// delete the second data file using a row filter
// the commit should succeed as there is only one live data file
table.newDelete().deleteFromRowFilter(Expressions.lessThan("id", 7)).commit();
Snapshot finalSnapshot =
commit(table, table.newDelete().deleteFromRowFilter(Expressions.lessThan("id", 7)), branch);

Snapshot finalSnapshot = table.currentSnapshot();
Assert.assertEquals("Should have 1 manifest", 1, finalSnapshot.allManifests(FILE_IO).size());
validateManifestEntries(
finalSnapshot.allManifests(FILE_IO).get(0),
Expand All @@ -184,13 +192,15 @@ public void testAlreadyDeletedFilesAreIgnoredDuringDeletesByRowFilter() {
@Test
public void testDeleteSomeFilesByRowFilterWithoutPartitionPredicates() {
// add both data files
table
.newFastAppend()
.appendFile(DATA_FILE_BUCKET_0_IDS_0_2)
.appendFile(DATA_FILE_BUCKET_0_IDS_8_10)
.commit();
Snapshot initialSnapshot =
commit(
table,
table
.newFastAppend()
.appendFile(DATA_FILE_BUCKET_0_IDS_0_2)
.appendFile(DATA_FILE_BUCKET_0_IDS_8_10),
branch);

Snapshot initialSnapshot = table.currentSnapshot();
Assert.assertEquals("Should have 1 manifest", 1, initialSnapshot.allManifests(FILE_IO).size());
validateManifestEntries(
initialSnapshot.allManifests(FILE_IO).get(0),
Expand All @@ -199,9 +209,10 @@ public void testDeleteSomeFilesByRowFilterWithoutPartitionPredicates() {
statuses(Status.ADDED, Status.ADDED));

// delete the second one using a metrics filter (no partition filter)
table.newDelete().deleteFromRowFilter(Expressions.greaterThan("id", 5)).commit();
Snapshot deleteSnapshot =
commit(
table, table.newDelete().deleteFromRowFilter(Expressions.greaterThan("id", 5)), branch);

Snapshot deleteSnapshot = table.currentSnapshot();
Assert.assertEquals("Should have 1 manifest", 1, deleteSnapshot.allManifests(FILE_IO).size());
validateManifestEntries(
deleteSnapshot.allManifests(FILE_IO).get(0),
Expand All @@ -213,13 +224,15 @@ public void testDeleteSomeFilesByRowFilterWithoutPartitionPredicates() {
@Test
public void testDeleteSomeFilesByRowFilterWithCombinedPredicates() {
// add both data files
table
.newFastAppend()
.appendFile(DATA_FILE_BUCKET_0_IDS_0_2)
.appendFile(DATA_FILE_BUCKET_0_IDS_8_10)
.commit();
Snapshot initialSnapshot =
commit(
table,
table
.newFastAppend()
.appendFile(DATA_FILE_BUCKET_0_IDS_0_2)
.appendFile(DATA_FILE_BUCKET_0_IDS_8_10),
branch);

Snapshot initialSnapshot = table.currentSnapshot();
Assert.assertEquals("Should have 1 manifest", 1, initialSnapshot.allManifests(FILE_IO).size());
validateManifestEntries(
initialSnapshot.allManifests(FILE_IO).get(0),
Expand All @@ -231,9 +244,8 @@ public void testDeleteSomeFilesByRowFilterWithCombinedPredicates() {
Expression partPredicate = Expressions.equal(Expressions.bucket("data", 16), 0);
Expression rowPredicate = Expressions.greaterThan("id", 5);
Expression predicate = Expressions.and(partPredicate, rowPredicate);
table.newDelete().deleteFromRowFilter(predicate).commit();

Snapshot deleteSnapshot = table.currentSnapshot();
Snapshot deleteSnapshot =
commit(table, table.newDelete().deleteFromRowFilter(predicate), branch);
Assert.assertEquals("Should have 1 manifest", 1, deleteSnapshot.allManifests(FILE_IO).size());
validateManifestEntries(
deleteSnapshot.allManifests(FILE_IO).get(0),
Expand Down Expand Up @@ -262,36 +274,45 @@ public void testCannotDeleteFileWhereNotAllRowsMatchPartitionFilter() {
.withPartitionPath("data_trunc_2=aa")
.build();

table.newFastAppend().appendFile(dataFile).commit();
commit(table, table.newFastAppend().appendFile(dataFile), branch);

AssertHelpers.assertThrows(
"Should reject as not all rows match filter",
ValidationException.class,
"Cannot delete file where some, but not all, rows match filter",
() -> table.newDelete().deleteFromRowFilter(Expressions.equal("data", "aa")).commit());
() ->
commit(
table,
table.newDelete().deleteFromRowFilter(Expressions.equal("data", "aa")),
branch));
}

@Test
public void testDeleteCaseSensitivity() {
table.newFastAppend().appendFile(DATA_FILE_BUCKET_0_IDS_0_2).commit();
commit(table, table.newFastAppend().appendFile(DATA_FILE_BUCKET_0_IDS_0_2), branch);

Expression rowFilter = Expressions.lessThan("iD", 5);

AssertHelpers.assertThrows(
"Should use case sensitive binding by default",
ValidationException.class,
"Cannot find field 'iD'",
() -> table.newDelete().deleteFromRowFilter(rowFilter).commit());
() -> commit(table, table.newDelete().deleteFromRowFilter(rowFilter), branch));

AssertHelpers.assertThrows(
"Should fail with case sensitive binding",
ValidationException.class,
"Cannot find field 'iD'",
() -> table.newDelete().deleteFromRowFilter(rowFilter).caseSensitive(true).commit());
() ->
commit(
table,
table.newDelete().deleteFromRowFilter(rowFilter).caseSensitive(true),
branch));

table.newDelete().deleteFromRowFilter(rowFilter).caseSensitive(false).commit();
Snapshot deleteSnapshot =
commit(
table, table.newDelete().deleteFromRowFilter(rowFilter).caseSensitive(false), branch);

Snapshot deleteSnapshot = table.currentSnapshot();
Assert.assertEquals("Should have 1 manifest", 1, deleteSnapshot.allManifests(FILE_IO).size());
validateManifestEntries(
deleteSnapshot.allManifests(FILE_IO).get(0),
Expand All @@ -300,6 +321,34 @@ public void testDeleteCaseSensitivity() {
statuses(Status.DELETED));
}

@Test
public void testDeleteFilesOnIndependentBranches() {
String testBranch = "testBranch";
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C).commit();
Snapshot initialSnapshot = table.currentSnapshot();
// Delete A on test branch
table.newDelete().deleteFile(FILE_A).toBranch(testBranch).commit();
Snapshot testBranchTip = table.snapshot(testBranch);

// Delete B and C on main
table.newDelete().deleteFile(FILE_B).deleteFile(FILE_C).commit();
Snapshot delete2 = table.currentSnapshot();

// Verify B and C on testBranch
validateManifestEntries(
Iterables.getOnlyElement(testBranchTip.allManifests(FILE_IO)),
ids(testBranchTip.snapshotId(), initialSnapshot.snapshotId(), initialSnapshot.snapshotId()),
files(FILE_A, FILE_B, FILE_C),
statuses(Status.DELETED, Status.EXISTING, Status.EXISTING));

// Verify A on main
validateManifestEntries(
Iterables.getOnlyElement(delete2.allManifests(FILE_IO)),
ids(initialSnapshot.snapshotId(), delete2.snapshotId(), delete2.snapshotId()),
files(FILE_A, FILE_B, FILE_C),
statuses(Status.EXISTING, Status.DELETED, Status.DELETED));
}

private static ByteBuffer longToBuffer(long value) {
return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value);
}
Expand Down
Loading