diff --git a/api/src/main/java/org/apache/iceberg/OverwriteFiles.java b/api/src/main/java/org/apache/iceberg/OverwriteFiles.java index e2f3d62a24e3..3f9cc7d6b11b 100644 --- a/api/src/main/java/org/apache/iceberg/OverwriteFiles.java +++ b/api/src/main/java/org/apache/iceberg/OverwriteFiles.java @@ -107,7 +107,7 @@ public interface OverwriteFiles extends SnapshotUpdate { OverwriteFiles caseSensitive(boolean caseSensitive); /** - * Enables validation that files added concurrently do not conflict with this commit's operation. + * Enables validation that data files added concurrently do not conflict with this commit's operation. *

* This method should be called when the table is queried to determine which files to delete/append. * If a concurrent operation commits a new file after the data was read and that file might @@ -145,4 +145,23 @@ public interface OverwriteFiles extends SnapshotUpdate { */ @Deprecated OverwriteFiles validateNoConflictingAppends(Long readSnapshotId, Expression conflictDetectionFilter); + + /** + * Enables validation that delete files added concurrently do not conflict with this commit's operation. + *

+ * Validating concurrently added delete files is required during DELETE, UPDATE and MERGE operations. + * If a concurrent operation adds a new delete file that applies to one of the data files being + * overwritten, the overwrite operation must be aborted as it may undelete rows that were removed + * concurrently. + *

+ * Calling this method with a correct conflict detection filter is required to maintain + * serializable isolation for overwrite operations. Otherwise, the isolation level + * will be snapshot isolation. + *

+ * Validation applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}. + * + * @param conflictDetectionFilter an expression on rows in the table + * @return this for method chaining + */ + OverwriteFiles validateNoConflictingDeleteFiles(Expression conflictDetectionFilter); } diff --git a/api/src/main/java/org/apache/iceberg/RowDelta.java b/api/src/main/java/org/apache/iceberg/RowDelta.java index 765145d804cb..52bf671676f2 100644 --- a/api/src/main/java/org/apache/iceberg/RowDelta.java +++ b/api/src/main/java/org/apache/iceberg/RowDelta.java @@ -94,7 +94,7 @@ public interface RowDelta extends SnapshotUpdate { RowDelta validateDeletedFiles(); /** - * Enables validation that files added concurrently do not conflict with this commit's operation. + * Enables validation that data files added concurrently do not conflict with this commit's operation. *

* This method should be called when the table is queried to determine which files to delete/append. * If a concurrent operation commits a new file after the data was read and that file might @@ -111,4 +111,19 @@ public interface RowDelta extends SnapshotUpdate { * @return this for method chaining */ RowDelta validateNoConflictingAppends(Expression conflictDetectionFilter); + + /** + * Enables validation that delete files added concurrently do not conflict with this commit's operation. + *

+ * This method must be called when the table is queried to produce a row delta for UPDATE and + * MERGE operations independently of the isolation level. Calling this method isn't required + * for DELETE operations as it is OK when a particular record we are trying to delete + * was deleted concurrently. + *

+ * Validation applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}. + * + * @param conflictDetectionFilter an expression on rows in the table + * @return this for method chaining + */ + RowDelta validateNoConflictingDeleteFiles(Expression conflictDetectionFilter); } diff --git a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java index e1fbb0f942c5..1bb83bdbde9e 100644 --- a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java @@ -19,17 +19,22 @@ package org.apache.iceberg; +import java.util.Set; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.Projections; import org.apache.iceberg.expressions.StrictMetricsEvaluator; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; public class BaseOverwriteFiles extends MergingSnapshotProducer implements OverwriteFiles { + private final Set deletedDataFiles = Sets.newHashSet(); private boolean validateAddedFilesMatchOverwriteFilter = false; private Long startingSnapshotId = null; - private Expression conflictDetectionFilter = null; + private Expression appendConflictDetectionFilter = null; + private Expression deleteConflictDetectionFilter = null; private boolean caseSensitive = true; protected BaseOverwriteFiles(String tableName, TableOperations ops) { @@ -60,6 +65,7 @@ public OverwriteFiles addFile(DataFile file) { @Override public OverwriteFiles deleteFile(DataFile file) { + deletedDataFiles.add(file); delete(file); return this; } @@ -95,11 +101,18 @@ public OverwriteFiles caseSensitive(boolean isCaseSensitive) { @Override public OverwriteFiles validateNoConflictingAppends(Expression newConflictDetectionFilter) { Preconditions.checkArgument(newConflictDetectionFilter != null, "Conflict detection filter cannot be null"); - this.conflictDetectionFilter = newConflictDetectionFilter; + this.appendConflictDetectionFilter = newConflictDetectionFilter; failMissingDeletePaths(); return this; } + @Override + public OverwriteFiles validateNoConflictingDeleteFiles(Expression newConflictDetectionFilter) { + Preconditions.checkArgument(newConflictDetectionFilter != null, "Conflict detection filter cannot be null"); + this.deleteConflictDetectionFilter = newConflictDetectionFilter; + return this; + } + @Override protected void validate(TableMetadata base) { if (validateAddedFilesMatchOverwriteFilter) { @@ -127,8 +140,20 @@ protected void validate(TableMetadata base) { } } - if (conflictDetectionFilter != null && base.currentSnapshot() != null) { - validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive); + if (appendConflictDetectionFilter != null && base.currentSnapshot() != null) { + validateAddedDataFiles(base, startingSnapshotId, appendConflictDetectionFilter, caseSensitive); + } + + boolean validateNewDeletes = deleteConflictDetectionFilter != null && base.currentSnapshot() != null; + boolean overwriteByFilter = rowFilter() != Expressions.alwaysFalse(); + + if (validateNewDeletes && overwriteByFilter) { + validateNoNewDeletes(base, startingSnapshotId, deleteConflictDetectionFilter, caseSensitive); + } else if (validateNewDeletes && deletedDataFiles.size() > 0) { + // it is sufficient to ensure we don't have new delete files only for overwritten data files + validateNoNewDeletesForDataFiles( + base, startingSnapshotId, deleteConflictDetectionFilter, + deletedDataFiles, caseSensitive); } } } diff --git a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java index 4d80b01a6324..3d77e0c03d6a 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java +++ b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java @@ -27,7 +27,8 @@ class BaseRowDelta extends MergingSnapshotProducer implements RowDelta private Long startingSnapshotId = null; // check all versions by default private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); private boolean validateDeletes = false; - private Expression conflictDetectionFilter = null; + private Expression appendConflictDetectionFilter = null; + private Expression deleteConflictDetectionFilter = null; private boolean caseSensitive = true; BaseRowDelta(String tableName, TableOperations ops) { @@ -83,7 +84,14 @@ public RowDelta validateDataFilesExist(Iterable referenc @Override public RowDelta validateNoConflictingAppends(Expression newConflictDetectionFilter) { Preconditions.checkArgument(newConflictDetectionFilter != null, "Conflict detection filter cannot be null"); - this.conflictDetectionFilter = newConflictDetectionFilter; + this.appendConflictDetectionFilter = newConflictDetectionFilter; + return this; + } + + @Override + public RowDelta validateNoConflictingDeleteFiles(Expression newConflictDetectionFilter) { + Preconditions.checkArgument(newConflictDetectionFilter != null, "Conflict detection filter cannot be null"); + this.deleteConflictDetectionFilter = newConflictDetectionFilter; return this; } @@ -92,12 +100,15 @@ protected void validate(TableMetadata base) { if (base.currentSnapshot() != null) { if (!referencedDataFiles.isEmpty()) { validateDataFilesExist( - base, startingSnapshotId, referencedDataFiles, !validateDeletes, conflictDetectionFilter); + base, startingSnapshotId, referencedDataFiles, !validateDeletes, appendConflictDetectionFilter); + } + + if (appendConflictDetectionFilter != null) { + validateAddedDataFiles(base, startingSnapshotId, appendConflictDetectionFilter, caseSensitive); } - // TODO: does this need to check new delete files? - if (conflictDetectionFilter != null) { - validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive); + if (deleteConflictDetectionFilter != null) { + validateNoNewDeletes(base, startingSnapshotId, deleteConflictDetectionFilter, caseSensitive); } } } diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java index 1c3ce122bedd..bf7a864b8e72 100644 --- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java +++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java @@ -86,6 +86,20 @@ public boolean isEmpty() { return (globalDeletes == null || globalDeletes.length == 0) && sortedDeletesByPartition.isEmpty(); } + public List referencedDeleteFiles() { + List deleteFiles = Lists.newArrayList(); + + if (globalDeletes != null) { + deleteFiles.addAll(Arrays.asList(globalDeletes)); + } + + sortedDeletesByPartition.forEach((key, partitionDeletes) -> { + deleteFiles.addAll(Arrays.asList(partitionDeletes.second())); + }); + + return deleteFiles; + } + private StructLikeWrapper newWrapper(int specId) { return StructLikeWrapper.forType(partitionTypeById.get(specId)); } diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index f907314dad69..ed83b8f4593b 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -64,7 +64,7 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { private static final Set VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS = ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.REPLACE); // delete files can be added in "overwrite" or "delete" operations - private static final Set VALIDATE_REPLACED_DATA_FILES_OPERATIONS = + private static final Set VALIDATE_ADDED_DELETE_FILES_OPERATIONS = ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.DELETE); private final String tableName; @@ -293,20 +293,33 @@ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotI */ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId, Iterable dataFiles) { + validateNoNewDeletesForDataFiles(base, startingSnapshotId, null, dataFiles, true); + } + + /** + * Validates that no new delete files that must be applied to the given data files have been added to the table since + * a starting snapshot. + * + * @param base table metadata to validate + * @param startingSnapshotId id of the snapshot current at the start of the operation + * @param dataFilter a data filter + * @param dataFiles data files to validate have no new row deletes + * @param caseSensitive whether expression binding should be case-sensitive + */ + protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId, + Expression dataFilter, Iterable dataFiles, + boolean caseSensitive) { // if there is no current table state, no files have been added if (base.currentSnapshot() == null) { return; } Pair, Set> history = - validationHistory(base, startingSnapshotId, VALIDATE_REPLACED_DATA_FILES_OPERATIONS, ManifestContent.DELETES); + validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES); List deleteManifests = history.first(); - long startingSequenceNumber = startingSnapshotId == null ? 0 : base.snapshot(startingSnapshotId).sequenceNumber(); - DeleteFileIndex deletes = DeleteFileIndex.builderFor(ops.io(), deleteManifests) - .afterSequenceNumber(startingSequenceNumber) - .specsById(ops.current().specsById()) - .build(); + long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId); + DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter, caseSensitive); for (DataFile dataFile : dataFiles) { // if any delete is found that applies to files written in or before the starting snapshot, fail @@ -316,6 +329,57 @@ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startin } } + /** + * Validates that no delete files matching a filter have been added to the table since a starting snapshot. + * + * @param base table metadata to validate + * @param startingSnapshotId id of the snapshot current at the start of the operation + * @param dataFilter an expression used to find new conflicting delete files + * @param caseSensitive whether expression evaluation should be case-sensitive + */ + protected void validateNoNewDeletes(TableMetadata base, Long startingSnapshotId, + Expression dataFilter, boolean caseSensitive) { + // if there is no current table state, no files have been added + if (base.currentSnapshot() == null) { + return; + } + + Pair, Set> history = + validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES); + List deleteManifests = history.first(); + + long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId); + DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter, caseSensitive); + + ValidationException.check(deletes.isEmpty(), + "Found new conflicting delete files that can apply to records matching %s: %s", + dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path)); + } + + // use 0 as a starting seq number if the starting snapshot is not set or expired + private long startingSequenceNumber(TableMetadata metadata, Long staringSnapshotId) { + if (staringSnapshotId != null && metadata.snapshot(staringSnapshotId) != null) { + Snapshot startingSnapshot = metadata.snapshot(staringSnapshotId); + return startingSnapshot.sequenceNumber(); + } else { + return 0; + } + } + + private DeleteFileIndex buildDeleteFileIndex(List deleteManifests, long startingSequenceNumber, + Expression dataFilter, boolean caseSensitive) { + DeleteFileIndex.Builder builder = DeleteFileIndex.builderFor(ops.io(), deleteManifests) + .afterSequenceNumber(startingSequenceNumber) + .caseSensitive(caseSensitive) + .specsById(ops.current().specsById()); + + if (dataFilter != null) { + builder.filterData(dataFilter); + } + + return builder.build(); + } + @SuppressWarnings("CollectionUndefinedEquality") protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotId, CharSequenceSet requiredDataFiles, boolean skipDeletes, diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index 16a6bfd7aee5..c9a230b48de9 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -77,7 +77,7 @@ public class TableTestBase { .build(); // Equality delete files. static final DeleteFile FILE_A2_DELETES = FileMetadata.deleteFileBuilder(SPEC) - .ofEqualityDeletes(3) + .ofEqualityDeletes(1) .withPath("/path/to/data-a2-deletes.parquet") .withFileSizeInBytes(10) .withPartitionPath("data_bucket=0") @@ -364,6 +364,20 @@ void validateTableFiles(Table tbl, DataFile... expectedFiles) { Assert.assertEquals("Files should match", expectedFilePaths, actualFilePaths); } + void validateTableDeleteFiles(Table tbl, DeleteFile... expectedFiles) { + Set expectedFilePaths = Sets.newHashSet(); + for (DeleteFile file : expectedFiles) { + expectedFilePaths.add(file.path()); + } + Set actualFilePaths = Sets.newHashSet(); + for (FileScanTask task : tbl.newScan().planFiles()) { + for (DeleteFile file : task.deletes()) { + actualFilePaths.add(file.path()); + } + } + Assert.assertEquals("Delete files should match", expectedFilePaths, actualFilePaths); + } + List paths(DataFile... dataFiles) { List paths = Lists.newArrayListWithExpectedSize(dataFiles.length); for (DataFile file : dataFiles) { diff --git a/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java b/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java index 26358864a76a..0def46e8298e 100644 --- a/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java +++ b/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java @@ -29,6 +29,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.types.Types; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -71,6 +72,14 @@ public class TestOverwriteWithValidation extends TableTestBase { )) .build(); + private static final DeleteFile FILE_DAY_1_POS_DELETES = FileMetadata.deleteFileBuilder(PARTITION_SPEC) + .ofPositionDeletes() + .withPath("/path/to/data-1-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("date=2018-06-08") + .withRecordCount(1) + .build(); + private static final DataFile FILE_DAY_2 = DataFiles .builder(PARTITION_SPEC) .withPath("/path/to/data-2.parquet") @@ -85,6 +94,22 @@ public class TestOverwriteWithValidation extends TableTestBase { )) .build(); + private static final DeleteFile FILE_DAY_2_EQ_DELETES = FileMetadata.deleteFileBuilder(PARTITION_SPEC) + .ofEqualityDeletes() + .withPath("/path/to/data-2-eq-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("date=2018-06-09") + .withRecordCount(1) + .build(); + + private static final DeleteFile FILE_DAY_2_POS_DELETES = FileMetadata.deleteFileBuilder(PARTITION_SPEC) + .ofPositionDeletes() + .withPath("/path/to/data-2-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("date=2018-06-09") + .withRecordCount(1) + .build(); + private static final DataFile FILE_DAY_2_MODIFIED = DataFiles .builder(PARTITION_SPEC) .withPath("/path/to/data-3.parquet") @@ -113,6 +138,21 @@ public class TestOverwriteWithValidation extends TableTestBase { )) .build(); + private static final DeleteFile FILE_DAY_2_ANOTHER_RANGE_EQ_DELETES = FileMetadata.deleteFileBuilder(PARTITION_SPEC) + .ofEqualityDeletes() + .withPath("/path/to/data-3-eq-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("date=2018-06-09") + .withRecordCount(1) + .withMetrics(new Metrics(1L, + null, // no column sizes + ImmutableMap.of(1, 1L, 2, 1L), // value count + ImmutableMap.of(1, 0L, 2, 0L), // null count + ImmutableMap.of(1, longToBuffer(10L)), // lower bounds + ImmutableMap.of(1, longToBuffer(10L)) // upper bounds + )) + .build(); + private static final Expression EXPRESSION_DAY_2 = equal("date", "2018-06-09"); private static final Expression EXPRESSION_DAY_2_ID_RANGE = and( @@ -611,4 +651,181 @@ public void testTransactionIncompatibleAdditionValidated() { Assert.assertEquals("Should not create a new snapshot", committedSnapshotId, table.currentSnapshot().snapshotId()); } + + @Test + public void testConcurrentConflictingPositionDeletes() { + Assume.assumeTrue(formatVersion == 2); + + Assert.assertNull("Should be empty table", table.currentSnapshot()); + + table.newAppend() + .appendFile(FILE_DAY_1) + .appendFile(FILE_DAY_2) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + + OverwriteFiles overwrite = table.newOverwrite() + .deleteFile(FILE_DAY_2) + .addFile(FILE_DAY_2_MODIFIED) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .validateNoConflictingAppends(EXPRESSION_DAY_2) + .validateNoConflictingDeleteFiles(EXPRESSION_DAY_2); + + table.newRowDelta() + .addDeletes(FILE_DAY_2_POS_DELETES) + .commit(); + + AssertHelpers.assertThrows("Should reject commit", + ValidationException.class, "found new delete", + overwrite::commit); + } + + @Test + public void testConcurrentConflictingPositionDeletesOverwriteByFilter() { + Assume.assumeTrue(formatVersion == 2); + + Assert.assertNull("Should be empty table", table.currentSnapshot()); + + table.newAppend() + .appendFile(FILE_DAY_1) + .appendFile(FILE_DAY_2) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + + OverwriteFiles overwrite = table.newOverwrite() + .overwriteByRowFilter(EXPRESSION_DAY_2) + .addFile(FILE_DAY_2_MODIFIED) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .validateNoConflictingAppends(EXPRESSION_DAY_2) + .validateNoConflictingDeleteFiles(EXPRESSION_DAY_2); + + table.newRowDelta() + .addDeletes(FILE_DAY_2_POS_DELETES) + .commit(); + + AssertHelpers.assertThrows("Should reject commit", + ValidationException.class, "Found new conflicting delete", + overwrite::commit); + } + + @Test + public void testConcurrentNonConflictingPositionDeletes() { + Assume.assumeTrue(formatVersion == 2); + + Assert.assertNull("Should be empty table", table.currentSnapshot()); + + table.newAppend() + .appendFile(FILE_DAY_1) + .appendFile(FILE_DAY_2) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + + OverwriteFiles overwrite = table.newOverwrite() + .deleteFile(FILE_DAY_2) + .addFile(FILE_DAY_2_MODIFIED) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .validateNoConflictingAppends(EXPRESSION_DAY_2) + .validateNoConflictingDeleteFiles(EXPRESSION_DAY_2); + + table.newRowDelta() + .addDeletes(FILE_DAY_1_POS_DELETES) + .commit(); + + overwrite.commit(); + + validateTableFiles(table, FILE_DAY_1, FILE_DAY_2_MODIFIED); + validateTableDeleteFiles(table, FILE_DAY_1_POS_DELETES); + } + + @Test + public void testConcurrentNonConflictingPositionDeletesOverwriteByFilter() { + Assume.assumeTrue(formatVersion == 2); + + Assert.assertNull("Should be empty table", table.currentSnapshot()); + + table.newAppend() + .appendFile(FILE_DAY_1) + .appendFile(FILE_DAY_2) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + + OverwriteFiles overwrite = table.newOverwrite() + .overwriteByRowFilter(EXPRESSION_DAY_2) + .addFile(FILE_DAY_2_MODIFIED) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .validateNoConflictingAppends(EXPRESSION_DAY_2) + .validateNoConflictingDeleteFiles(EXPRESSION_DAY_2); + + table.newRowDelta() + .addDeletes(FILE_DAY_1_POS_DELETES) + .commit(); + + overwrite.commit(); + + validateTableFiles(table, FILE_DAY_1, FILE_DAY_2_MODIFIED); + validateTableDeleteFiles(table, FILE_DAY_1_POS_DELETES); + } + + @Test + public void testConcurrentConflictingEqualityDeletes() { + Assume.assumeTrue(formatVersion == 2); + + Assert.assertNull("Should be empty table", table.currentSnapshot()); + + table.newAppend() + .appendFile(FILE_DAY_1) + .appendFile(FILE_DAY_2) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + + OverwriteFiles overwrite = table.newOverwrite() + .deleteFile(FILE_DAY_2) + .addFile(FILE_DAY_2_MODIFIED) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .validateNoConflictingAppends(EXPRESSION_DAY_2) + .validateNoConflictingDeleteFiles(EXPRESSION_DAY_2); + + table.newRowDelta() + .addDeletes(FILE_DAY_2_EQ_DELETES) + .commit(); + + AssertHelpers.assertThrows("Should reject commit", + ValidationException.class, "found new delete", + overwrite::commit); + } + + @Test + public void testConcurrentNonConflictingEqualityDeletes() { + Assume.assumeTrue(formatVersion == 2); + + Assert.assertNull("Should be empty table", table.currentSnapshot()); + + table.newAppend() + .appendFile(FILE_DAY_2) + .appendFile(FILE_DAY_2_ANOTHER_RANGE) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + + OverwriteFiles overwrite = table.newOverwrite() + .deleteFile(FILE_DAY_2) + .addFile(FILE_DAY_2_MODIFIED) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .validateNoConflictingAppends(EXPRESSION_DAY_2_ID_RANGE) + .validateNoConflictingDeleteFiles(EXPRESSION_DAY_2_ID_RANGE); + + table.newRowDelta() + .addDeletes(FILE_DAY_2_ANOTHER_RANGE_EQ_DELETES) + .commit(); + + overwrite.commit(); + + validateTableFiles(table, FILE_DAY_2_ANOTHER_RANGE, FILE_DAY_2_MODIFIED); + validateTableDeleteFiles(table, FILE_DAY_2_ANOTHER_RANGE_EQ_DELETES); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index c1dae7da46ee..f363059d30c3 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -750,4 +750,131 @@ public void testValidateDataFilesDoNotExistWithConflictDetectionFilter() { ValidationException.class, "Cannot commit, missing data files", rowDelta::commit); } + + @Test + public void testConcurrentConflictingRowDelta() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + + Expression conflictDetectionFilter = Expressions.alwaysTrue(); + + // mock a MERGE operation with serializable isolation + RowDelta rowDelta = table.newRowDelta() + .addRows(FILE_B) + .addDeletes(FILE_A_DELETES) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .validateNoConflictingAppends(conflictDetectionFilter) + .validateNoConflictingDeleteFiles(conflictDetectionFilter); + + table.newRowDelta() + .addDeletes(FILE_A_DELETES) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .validateNoConflictingAppends(conflictDetectionFilter) + .commit(); + + AssertHelpers.assertThrows("Should reject commit", + ValidationException.class, "Found new conflicting delete files", + rowDelta::commit); + } + + @Test + public void testConcurrentConflictingRowDeltaWithoutAppendValidation() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + + Expression conflictDetectionFilter = Expressions.alwaysTrue(); + + // mock a MERGE operation with snapshot isolation (i.e. no append validation) + RowDelta rowDelta = table.newRowDelta() + .addDeletes(FILE_A_DELETES) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .validateNoConflictingDeleteFiles(conflictDetectionFilter); + + table.newRowDelta() + .addDeletes(FILE_A_DELETES) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .validateNoConflictingAppends(conflictDetectionFilter) + .commit(); + + AssertHelpers.assertThrows("Should reject commit", + ValidationException.class, "Found new conflicting delete files", + rowDelta::commit); + } + + @Test + public void testConcurrentNonConflictingRowDelta() { + // change the spec to be partitioned by data + table.updateSpec() + .removeField(Expressions.bucket("data", 16)) + .addField(Expressions.ref("data")) + .commit(); + + // add a data file to partition A + DataFile dataFile1 = DataFiles.builder(table.spec()) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=a") + .withRecordCount(1) + .build(); + + table.newAppend() + .appendFile(dataFile1) + .commit(); + + // add a data file to partition B + DataFile dataFile2 = DataFiles.builder(table.spec()) + .withPath("/path/to/data-b.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=b") + .withRecordCount(1) + .build(); + + table.newAppend() + .appendFile(dataFile2) + .commit(); + + Snapshot baseSnapshot = table.currentSnapshot(); + + Expression conflictDetectionFilter = Expressions.equal("data", "a"); + + // add a delete file for partition A + DeleteFile deleteFile1 = FileMetadata.deleteFileBuilder(table.spec()) + .ofPositionDeletes() + .withPath("/path/to/data-a-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=a") + .withRecordCount(1) + .build(); + + // mock a DELETE operation with serializable isolation + RowDelta rowDelta = table.newRowDelta() + .addDeletes(deleteFile1) + .validateFromSnapshot(baseSnapshot.snapshotId()) + .validateNoConflictingAppends(conflictDetectionFilter) + .validateNoConflictingDeleteFiles(conflictDetectionFilter); + + // add a delete file for partition B + DeleteFile deleteFile2 = FileMetadata.deleteFileBuilder(table.spec()) + .ofPositionDeletes() + .withPath("/path/to/data-b-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=b") + .withRecordCount(1) + .build(); + + table.newRowDelta() + .addDeletes(deleteFile2) + .validateFromSnapshot(baseSnapshot.snapshotId()) + .commit(); + + rowDelta.commit(); + + validateTableDeleteFiles(table, deleteFile1, deleteFile2); + } } diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 4cc37c8cfd7f..e5069bc4a628 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -365,6 +365,7 @@ private void commitWithSerializableIsolation(OverwriteFiles overwriteFiles, Expression conflictDetectionFilter = conflictDetectionFilter(); overwriteFiles.validateNoConflictingAppends(conflictDetectionFilter); + overwriteFiles.validateNoConflictingDeleteFiles(conflictDetectionFilter); String commitMsg = String.format( "overwrite of %d data files with %d new data files, scanSnapshotId: %d, conflictDetectionFilter: %s", @@ -375,6 +376,14 @@ private void commitWithSerializableIsolation(OverwriteFiles overwriteFiles, private void commitWithSnapshotIsolation(OverwriteFiles overwriteFiles, int numOverwrittenFiles, int numAddedFiles) { + Long scanSnapshotId = scan.snapshotId(); + if (scanSnapshotId != null) { + overwriteFiles.validateFromSnapshot(scanSnapshotId); + } + + Expression conflictDetectionFilter = conflictDetectionFilter(); + overwriteFiles.validateNoConflictingDeleteFiles(conflictDetectionFilter); + String commitMsg = String.format( "overwrite of %d data files with %d new data files", numOverwrittenFiles, numAddedFiles);