diff --git a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java index 8a1371311b30..4d80b01a6324 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java +++ b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java @@ -91,7 +91,8 @@ public RowDelta validateNoConflictingAppends(Expression newConflictDetectionFilt protected void validate(TableMetadata base) { if (base.currentSnapshot() != null) { if (!referencedDataFiles.isEmpty()) { - validateDataFilesExist(base, startingSnapshotId, referencedDataFiles, !validateDeletes); + validateDataFilesExist( + base, startingSnapshotId, referencedDataFiles, !validateDeletes, conflictDetectionFilter); } // TODO: does this need to check new delete files? diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index f26412b27bbd..f907314dad69 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -318,7 +318,8 @@ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startin @SuppressWarnings("CollectionUndefinedEquality") protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotId, - CharSequenceSet requiredDataFiles, boolean skipDeletes) { + CharSequenceSet requiredDataFiles, boolean skipDeletes, + Expression conflictDetectionFilter) { // if there is no current table state, no files have been removed if (base.currentSnapshot() == null) { return; @@ -339,6 +340,10 @@ protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotI .specsById(base.specsById()) .ignoreExisting(); + if (conflictDetectionFilter != null) { + matchingDeletesGroup.filterData(conflictDetectionFilter); + } + try (CloseableIterator> deletes = matchingDeletesGroup.entries().iterator()) { if (deletes.hasNext()) { throw new ValidationException("Cannot commit, missing data files: %s", diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index 6a5c43cd25ee..c1dae7da46ee 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -21,6 +21,7 @@ import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -629,4 +630,124 @@ public void testFastAppendDoesNotRemoveStaleDeleteFiles() { files(FILE_A_DELETES), statuses(Status.ADDED)); } + + @Test + public void testValidateDataFilesExistWithConflictDetectionFilter() { + // change the spec to be partitioned by data + table.updateSpec() + .removeField(Expressions.bucket("data", 16)) + .addField(Expressions.ref("data")) + .commit(); + + // add a data file to partition A + DataFile dataFile1 = DataFiles.builder(table.spec()) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=a") + .withRecordCount(1) + .build(); + + table.newAppend() + .appendFile(dataFile1) + .commit(); + + // add a data file to partition B + DataFile dataFile2 = DataFiles.builder(table.spec()) + .withPath("/path/to/data-b.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=b") + .withRecordCount(1) + .build(); + + table.newAppend() + .appendFile(dataFile2) + .commit(); + + // use this snapshot as the starting snapshot in rowDelta + Snapshot baseSnapshot = table.currentSnapshot(); + + // add a delete file for partition A + DeleteFile deleteFile = FileMetadata.deleteFileBuilder(table.spec()) + .ofPositionDeletes() + .withPath("/path/to/data-a-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=a") + .withRecordCount(1) + .build(); + + Expression conflictDetectionFilter = Expressions.equal("data", "a"); + RowDelta rowDelta = table.newRowDelta() + .addDeletes(deleteFile) + .validateDataFilesExist(ImmutableList.of(dataFile1.path())) + .validateDeletedFiles() + .validateFromSnapshot(baseSnapshot.snapshotId()) + .validateNoConflictingAppends(conflictDetectionFilter); + + // concurrently delete the file for partition B + table.newDelete() + .deleteFile(dataFile2) + .commit(); + + // commit the delta for partition A + rowDelta.commit(); + + Assert.assertEquals("Table should have one new delete manifest", + 1, table.currentSnapshot().deleteManifests().size()); + ManifestFile deletes = table.currentSnapshot().deleteManifests().get(0); + validateDeleteManifest(deletes, + seqs(4), + ids(table.currentSnapshot().snapshotId()), + files(deleteFile), + statuses(Status.ADDED)); + } + + @Test + public void testValidateDataFilesDoNotExistWithConflictDetectionFilter() { + // change the spec to be partitioned by data + table.updateSpec() + .removeField(Expressions.bucket("data", 16)) + .addField(Expressions.ref("data")) + .commit(); + + // add a data file to partition A + DataFile dataFile1 = DataFiles.builder(table.spec()) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=a") + .withRecordCount(1) + .build(); + + table.newAppend() + .appendFile(dataFile1) + .commit(); + + // use this snapshot as the starting snapshot in rowDelta + Snapshot baseSnapshot = table.currentSnapshot(); + + // add a delete file for partition A + DeleteFile deleteFile = FileMetadata.deleteFileBuilder(table.spec()) + .ofPositionDeletes() + .withPath("/path/to/data-a-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=a") + .withRecordCount(1) + .build(); + + Expression conflictDetectionFilter = Expressions.equal("data", "a"); + RowDelta rowDelta = table.newRowDelta() + .addDeletes(deleteFile) + .validateDataFilesExist(ImmutableList.of(dataFile1.path())) + .validateDeletedFiles() + .validateFromSnapshot(baseSnapshot.snapshotId()) + .validateNoConflictingAppends(conflictDetectionFilter); + + // concurrently delete the file for partition A + table.newDelete() + .deleteFile(dataFile1) + .commit(); + + AssertHelpers.assertThrows("Should fail to add deletes because data file is missing", + ValidationException.class, "Cannot commit, missing data files", + rowDelta::commit); + } }