Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
private Long startingSnapshotId = null;
private Expression conflictDetectionFilter = null;
private boolean validateNewDataFiles = false;
private boolean validateNewDeleteFiles = false;
private boolean validateNewDeletes = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the rename here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cause we not only validate added delete files but also deleted data files. We have two types of deletes to cover.


protected BaseOverwriteFiles(String tableName, TableOperations ops) {
super(tableName, ops);
Expand Down Expand Up @@ -98,7 +98,7 @@ public OverwriteFiles validateNoConflictingData() {

@Override
public OverwriteFiles validateNoConflictingDeletes() {
this.validateNewDeleteFiles = true;
this.validateNewDeletes = true;
failMissingDeletePaths();
return this;
}
Expand Down Expand Up @@ -134,10 +134,11 @@ protected void validate(TableMetadata base) {
validateAddedDataFiles(base, startingSnapshotId, dataConflictDetectionFilter());
}

if (validateNewDeleteFiles) {
if (validateNewDeletes) {
if (rowFilter() != Expressions.alwaysFalse()) {
Expression filter = conflictDetectionFilter != null ? conflictDetectionFilter : rowFilter();
validateNoNewDeleteFiles(base, startingSnapshotId, filter);
validateDeletedDataFiles(base, startingSnapshotId, filter);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

if (deletedDataFiles.size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,60 @@ public void testConcurrentConflictingPositionDeletesOverwriteByFilter() {
overwrite::commit);
}

@Test
public void testConcurrentConflictingDataFileDeleteOverwriteByFilter() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test would previously fail as the commit would succeed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error here is because file 2 is both modified and deleted correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we concurrently removed data from a partition what we are trying to overwrite.

Assert.assertNull("Should be empty table", table.currentSnapshot());

table.newAppend()
.appendFile(FILE_DAY_1)
.appendFile(FILE_DAY_2)
.commit();

Snapshot firstSnapshot = table.currentSnapshot();

OverwriteFiles overwrite = table.newOverwrite()
.overwriteByRowFilter(EXPRESSION_DAY_2)
.addFile(FILE_DAY_2_MODIFIED)
.validateFromSnapshot(firstSnapshot.snapshotId())
.validateNoConflictingData()
.validateNoConflictingDeletes();

table.newOverwrite()
.deleteFile(FILE_DAY_2)
.commit();

AssertHelpers.assertThrows("Should reject commit",
ValidationException.class, "Found conflicting deleted files",
overwrite::commit);
}

@Test
public void testConcurrentNonConflictingDataFileDeleteOverwriteByFilter() {
Assert.assertNull("Should be empty table", table.currentSnapshot());

table.newAppend()
.appendFile(FILE_DAY_1)
.appendFile(FILE_DAY_2)
.commit();

Snapshot firstSnapshot = table.currentSnapshot();

OverwriteFiles overwrite = table.newOverwrite()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this one is alright because file 2 is modified, but file 1 is deleted.

.overwriteByRowFilter(EXPRESSION_DAY_2)
.addFile(FILE_DAY_2_MODIFIED)
.validateFromSnapshot(firstSnapshot.snapshotId())
.validateNoConflictingData()
.validateNoConflictingDeletes();

table.newOverwrite()
.deleteFile(FILE_DAY_1)
.commit();

overwrite.commit();

validateTableFiles(table, FILE_DAY_2_MODIFIED);
}

@Test
public void testConcurrentNonConflictingPositionDeletes() {
Assume.assumeTrue(formatVersion == 2);
Expand Down