diff --git a/api/src/main/java/org/apache/iceberg/RowDelta.java b/api/src/main/java/org/apache/iceberg/RowDelta.java
index 765145d804cb..dcf250aff12f 100644
--- a/api/src/main/java/org/apache/iceberg/RowDelta.java
+++ b/api/src/main/java/org/apache/iceberg/RowDelta.java
@@ -94,7 +94,7 @@ public interface RowDelta extends SnapshotUpdate<RowDelta> {
   RowDelta validateDeletedFiles();
 
   /**
-   * Enables validation that files added concurrently do not conflict with this commit's operation.
+   * Enables validation that data files added concurrently do not conflict with this commit's operation.
    * <p>
   * This method should be called when the table is queried to determine which files to delete/append.
   * If a concurrent operation commits a new file after the data was read and that file might
@@ -109,6 +109,54 @@ public interface RowDelta extends SnapshotUpdate<RowDelta> {
    *
    * @param conflictDetectionFilter an expression on rows in the table
    * @return this for method chaining
+   * @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #conflictDetectionFilter(Expression)} and
+   *             {@link #validateNoConflictingDataFiles()} instead.
    */
-  RowDelta validateNoConflictingAppends(Expression conflictDetectionFilter);
+  @Deprecated
+  default RowDelta validateNoConflictingAppends(Expression conflictDetectionFilter) {
+    conflictDetectionFilter(conflictDetectionFilter);
+    return validateNoConflictingDataFiles();
+  }
+
+  /**
+   * Sets a conflict detection filter used to validate concurrently added data and delete files.
+   * <p>
+   * If not called, a true literal will be used as the conflict detection filter.
+   *
+   * @param conflictDetectionFilter an expression on rows in the table
+   * @return this for method chaining
+   */
+  RowDelta conflictDetectionFilter(Expression conflictDetectionFilter);
+
+  /**
+   * Enables validation that data files added concurrently do not conflict with this commit's operation.
+   * <p>
+   * This method should be called when the table is queried to determine which files to delete/append.
+   * If a concurrent operation commits a new file after the data was read and that file might
+   * contain rows matching the specified conflict detection filter, this operation
+   * will detect this during retries and fail.
+   * <p>
+   * Calling this method is required to maintain serializable isolation for update/delete operations.
+   * Otherwise, the isolation level will be snapshot isolation.
+   * <p>
+   * Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
+   * applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
+   *
+   * @return this for method chaining
+   */
+  RowDelta validateNoConflictingDataFiles();
+
+  /**
+   * Enables validation that delete files added concurrently do not conflict with this commit's operation.
+   * <p>
+   * This method must be called when the table is queried to produce a row delta for UPDATE and
+   * MERGE operations independently of the isolation level. Calling this method isn't required
+   * for DELETE operations as it is OK to delete a record that is also deleted concurrently.
+   * <p>
+   * Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
+   * applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
+   *
+   * @return this for method chaining
+   */
+  RowDelta validateNoConflictingDeleteFiles();
 }
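A minimal sketch of how a caller might chain the new API for a MERGE commit under serializable isolation. The table, snapshot id, files, and the "dep" predicate below are placeholders for illustration, not part of this patch:

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

class SerializableMergeSketch {
  // Serializable isolation for a MERGE: validate both concurrently added data
  // files and concurrently added delete files, scoped to the rows the operation
  // read and to snapshots committed after the one the operation planned against.
  static void commitMerge(Table table, long readSnapshotId, DataFile newData, DeleteFile newDeletes) {
    Expression scanFilter = Expressions.equal("dep", "hr"); // hypothetical scan predicate
    table.newRowDelta()
        .addRows(newData)
        .addDeletes(newDeletes)
        .validateFromSnapshot(readSnapshotId)
        .conflictDetectionFilter(scanFilter)
        .validateNoConflictingDataFiles()
        .validateNoConflictingDeleteFiles()
        .commit();
  }
}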
diff --git a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
index 4d80b01a6324..ab71b554c405 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg;
 
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.util.CharSequenceSet;
 
@@ -27,7 +28,9 @@ class BaseRowDelta extends MergingSnapshotProducer<RowDelta> implements RowDelta {
   private Long startingSnapshotId = null; // check all versions by default
   private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
   private boolean validateDeletes = false;
-  private Expression conflictDetectionFilter = null;
+  private Expression conflictDetectionFilter = Expressions.alwaysTrue();
+  private boolean validateNewDataFiles = false;
+  private boolean validateNewDeleteFiles = false;
   private boolean caseSensitive = true;
 
   BaseRowDelta(String tableName, TableOperations ops) {
@@ -81,12 +84,24 @@ public RowDelta validateDataFilesExist(Iterable<? extends CharSequence> referencedFiles) {
   }
 
   @Override
-  public RowDelta validateNoConflictingAppends(Expression newConflictDetectionFilter) {
+  public RowDelta conflictDetectionFilter(Expression newConflictDetectionFilter) {
     Preconditions.checkArgument(newConflictDetectionFilter != null, "Conflict detection filter cannot be null");
     this.conflictDetectionFilter = newConflictDetectionFilter;
     return this;
   }
 
+  @Override
+  public RowDelta validateNoConflictingDataFiles() {
+    this.validateNewDataFiles = true;
+    return this;
+  }
+
+  @Override
+  public RowDelta validateNoConflictingDeleteFiles() {
+    this.validateNewDeleteFiles = true;
+    return this;
+  }
+
   @Override
   protected void validate(TableMetadata base) {
     if (base.currentSnapshot() != null) {
@@ -95,10 +110,13 @@ protected void validate(TableMetadata base) {
           base, startingSnapshotId, referencedDataFiles, !validateDeletes, conflictDetectionFilter);
       }
 
-      // TODO: does this need to check new delete files?
-      if (conflictDetectionFilter != null) {
+      if (validateNewDataFiles) {
         validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);
       }
+
+      if (validateNewDeleteFiles) {
+        validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);
+      }
     }
   }
 }
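Because the filter now defaults to Expressions.alwaysTrue() and each check is gated by its own flag, callers opt in to each validation explicitly instead of keying everything off a non-null filter. A sketch of the snapshot-isolation variant of the same MERGE, again with placeholder names:

import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;

class SnapshotIsolationMergeSketch {
  // Snapshot isolation for a MERGE: skip validateNoConflictingDataFiles() so
  // concurrent appends succeed, but keep delete-file validation, which the
  // RowDelta Javadoc requires for UPDATE/MERGE at any isolation level.
  static void commitMerge(Table table, long readSnapshotId, Expression scanFilter, DeleteFile newDeletes) {
    table.newRowDelta()
        .addDeletes(newDeletes)
        .validateFromSnapshot(readSnapshotId)
        .conflictDetectionFilter(scanFilter)
        .validateNoConflictingDeleteFiles()
        .commit();
  }
}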
diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index 1c3ce122bedd..79c42f7bc8ae 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -86,6 +87,20 @@ public boolean isEmpty() {
     return (globalDeletes == null || globalDeletes.length == 0) && sortedDeletesByPartition.isEmpty();
   }
 
+  public Iterable<DeleteFile> referencedDeleteFiles() {
+    Iterable<DeleteFile> deleteFiles = Collections.emptyList();
+
+    if (globalDeletes != null) {
+      deleteFiles = Iterables.concat(deleteFiles, Arrays.asList(globalDeletes));
+    }
+
+    for (Pair<long[], DeleteFile[]> partitionDeletes : sortedDeletesByPartition.values()) {
+      deleteFiles = Iterables.concat(deleteFiles, Arrays.asList(partitionDeletes.second()));
+    }
+
+    return deleteFiles;
+  }
+
   private StructLikeWrapper newWrapper(int specId) {
     return StructLikeWrapper.forType(partitionTypeById.get(specId));
   }
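The new accessor returns a lazy Iterables.concat view over the global deletes plus every partition bucket, so nothing is materialized; it exists to feed the error message in the validation below. A trivial consumption sketch (DeleteFileIndex is internal to org.apache.iceberg, so this would only make sense inside that package; the index is assumed to be already built):

package org.apache.iceberg;

class ReferencedDeleteFilesSketch {
  // `index` would be a DeleteFileIndex built from the delete manifests
  // committed after the starting sequence number.
  static void printPaths(DeleteFileIndex index) {
    for (DeleteFile file : index.referencedDeleteFiles()) {
      System.out.println(file.path());
    }
  }
}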
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index e36b5da3e221..f325edf98582 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -66,7 +66,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   private static final Set<String> VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS =
       ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.REPLACE);
   // delete files can be added in "overwrite" or "delete" operations
-  private static final Set<String> VALIDATE_REPLACED_DATA_FILES_OPERATIONS =
+  private static final Set<String> VALIDATE_ADDED_DELETE_FILES_OPERATIONS =
       ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.DELETE);
 
   private final String tableName;
@@ -297,20 +297,33 @@ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotId,
    */
   protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId,
                                                   Iterable<DataFile> dataFiles) {
+    validateNoNewDeletesForDataFiles(base, startingSnapshotId, null, dataFiles, true);
+  }
+
+  /**
+   * Validates that no new delete files that must be applied to the given data files have been added to the table since
+   * a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the operation
+   * @param dataFilter a data filter
+   * @param dataFiles data files to validate have no new row deletes
+   * @param caseSensitive whether expression binding should be case-sensitive
+   */
+  protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId,
+                                                  Expression dataFilter, Iterable<DataFile> dataFiles,
+                                                  boolean caseSensitive) {
     // if there is no current table state, no files have been added
-    if (base.currentSnapshot() == null) {
+    if (base.currentSnapshot() == null || base.formatVersion() < 2) {
       return;
     }
 
     Pair<List<ManifestFile>, Set<Long>> history =
-        validationHistory(base, startingSnapshotId, VALIDATE_REPLACED_DATA_FILES_OPERATIONS, ManifestContent.DELETES);
+        validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
     List<ManifestFile> deleteManifests = history.first();
 
-    long startingSequenceNumber = startingSnapshotId == null ? 0 : base.snapshot(startingSnapshotId).sequenceNumber();
-    DeleteFileIndex deletes = DeleteFileIndex.builderFor(ops.io(), deleteManifests)
-        .afterSequenceNumber(startingSequenceNumber)
-        .specsById(ops.current().specsById())
-        .build();
+    long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId);
+    DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter, caseSensitive);
 
     for (DataFile dataFile : dataFiles) {
       // if any delete is found that applies to files written in or before the starting snapshot, fail
@@ -320,6 +333,56 @@ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId,
     }
   }
 
+  /**
+   * Validates that no delete files matching a filter have been added to the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the operation
+   * @param dataFilter an expression used to find new conflicting delete files
+   * @param caseSensitive whether expression evaluation should be case-sensitive
+   */
+  protected void validateNoNewDeleteFiles(TableMetadata base, Long startingSnapshotId,
+                                          Expression dataFilter, boolean caseSensitive) {
+    // if there is no current table state, no files have been added
+    if (base.currentSnapshot() == null || base.formatVersion() < 2) {
+      return;
+    }
+
+    Pair<List<ManifestFile>, Set<Long>> history =
+        validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
+    List<ManifestFile> deleteManifests = history.first();
+
+    long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId);
+    DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter, caseSensitive);
+
+    ValidationException.check(deletes.isEmpty(),
+        "Found new conflicting delete files that can apply to records matching %s: %s",
+        dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
+  }
+
+  private long startingSequenceNumber(TableMetadata metadata, Long startingSnapshotId) {
+    if (startingSnapshotId != null && metadata.snapshot(startingSnapshotId) != null) {
+      Snapshot startingSnapshot = metadata.snapshot(startingSnapshotId);
+      return startingSnapshot.sequenceNumber();
+    } else {
+      return TableMetadata.INITIAL_SEQUENCE_NUMBER;
+    }
+  }
+
+  private DeleteFileIndex buildDeleteFileIndex(List<ManifestFile> deleteManifests, long startingSequenceNumber,
+                                               Expression dataFilter, boolean caseSensitive) {
+    DeleteFileIndex.Builder builder = DeleteFileIndex.builderFor(ops.io(), deleteManifests)
+        .afterSequenceNumber(startingSequenceNumber)
+        .caseSensitive(caseSensitive)
+        .specsById(ops.current().specsById());
+
+    if (dataFilter != null) {
+      builder.filterData(dataFilter);
+    }
+
+    return builder.build();
+  }
+
   @SuppressWarnings("CollectionUndefinedEquality")
   protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotId,
                                         CharSequenceSet requiredDataFiles, boolean skipDeletes,
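The sequence-number floor is what keeps the check incremental: only delete files with a sequence number above the starting snapshot's can conflict, and an unknown (e.g. expired) starting snapshot falls back to INITIAL_SEQUENCE_NUMBER, re-checking the whole history rather than silently skipping validation. A self-contained toy illustration of the cutoff, with made-up sequence numbers:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class SequenceNumberFloorSketch {
  public static void main(String[] args) {
    long startingSequenceNumber = 5; // sequence number of the snapshot the operation planned against
    List<Long> deleteFileSequenceNumbers = Arrays.asList(4L, 6L, 7L); // 4 existed before the operation started
    List<Long> conflicting = deleteFileSequenceNumbers.stream()
        .filter(seq -> seq > startingSequenceNumber) // afterSequenceNumber(5) keeps only 6 and 7
        .collect(Collectors.toList());
    System.out.println(conflicting); // prints [6, 7]
  }
}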
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 7325cfe6032e..1a742326e116 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -79,7 +79,7 @@ public class TableTestBase {
       .build();
   // Equality delete files.
   static final DeleteFile FILE_A2_DELETES = FileMetadata.deleteFileBuilder(SPEC)
-      .ofEqualityDeletes(3)
+      .ofEqualityDeletes(1)
       .withPath("/path/to/data-a2-deletes.parquet")
       .withFileSizeInBytes(10)
       .withPartitionPath("data_bucket=0")
@@ -366,6 +366,20 @@ void validateTableFiles(Table tbl, DataFile... expectedFiles) {
     Assert.assertEquals("Files should match", expectedFilePaths, actualFilePaths);
   }
 
+  void validateTableDeleteFiles(Table tbl, DeleteFile... expectedFiles) {
+    Set<CharSequence> expectedFilePaths = Sets.newHashSet();
+    for (DeleteFile file : expectedFiles) {
+      expectedFilePaths.add(file.path());
+    }
+    Set<CharSequence> actualFilePaths = Sets.newHashSet();
+    for (FileScanTask task : tbl.newScan().planFiles()) {
+      for (DeleteFile file : task.deletes()) {
+        actualFilePaths.add(file.path());
+      }
+    }
+    Assert.assertEquals("Delete files should match", expectedFilePaths, actualFilePaths);
+  }
+
   List<String> paths(DataFile... dataFiles) {
     List<String> paths = Lists.newArrayListWithExpectedSize(dataFiles.length);
     for (DataFile file : dataFiles) {
diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
index 0675b45dc342..11b745eef137 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
@@ -1003,4 +1003,135 @@ public void testAbortMultipleSpecs() {
     // we should clean up 1 manifest list and 2 delete manifests
     Assert.assertEquals("Should delete 3 files", 3, deletedFiles.size());
   }
+
+  @Test
+  public void testConcurrentConflictingRowDelta() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    Expression conflictDetectionFilter = Expressions.alwaysTrue();
+
+    // mock a MERGE operation with serializable isolation
+    RowDelta rowDelta = table.newRowDelta()
+        .addRows(FILE_B)
+        .addDeletes(FILE_A_DELETES)
+        .validateFromSnapshot(firstSnapshot.snapshotId())
+        .conflictDetectionFilter(conflictDetectionFilter)
+        .validateNoConflictingDataFiles()
+        .validateNoConflictingDeleteFiles();
+
+    table.newRowDelta()
+        .addDeletes(FILE_A_DELETES)
+        .validateFromSnapshot(firstSnapshot.snapshotId())
+        .validateNoConflictingAppends(conflictDetectionFilter)
+        .commit();
+
+    AssertHelpers.assertThrows("Should reject commit",
+        ValidationException.class, "Found new conflicting delete files",
+        rowDelta::commit);
+  }
+
+  @Test
+  public void testConcurrentConflictingRowDeltaWithoutAppendValidation() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    Expression conflictDetectionFilter = Expressions.alwaysTrue();
+
+    // mock a MERGE operation with snapshot isolation (i.e. no append validation)
+    RowDelta rowDelta = table.newRowDelta()
+        .addDeletes(FILE_A_DELETES)
+        .validateFromSnapshot(firstSnapshot.snapshotId())
+        .conflictDetectionFilter(conflictDetectionFilter)
+        .validateNoConflictingDeleteFiles();
+
+    table.newRowDelta()
+        .addDeletes(FILE_A_DELETES)
+        .validateFromSnapshot(firstSnapshot.snapshotId())
+        .conflictDetectionFilter(conflictDetectionFilter)
+        .validateNoConflictingDataFiles()
+        .commit();
+
+    AssertHelpers.assertThrows("Should reject commit",
+        ValidationException.class, "Found new conflicting delete files",
+        rowDelta::commit);
+  }
+
+  @Test
+  public void testConcurrentNonConflictingRowDelta() {
+    // change the spec to be partitioned by data
+    table.updateSpec()
+        .removeField(Expressions.bucket("data", 16))
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    // add a data file to partition A
+    DataFile dataFile1 = DataFiles.builder(table.spec())
+        .withPath("/path/to/data-a.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("data=a")
+        .withRecordCount(1)
+        .build();
+
+    table.newAppend()
+        .appendFile(dataFile1)
+        .commit();
+
+    // add a data file to partition B
+    DataFile dataFile2 = DataFiles.builder(table.spec())
+        .withPath("/path/to/data-b.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("data=b")
+        .withRecordCount(1)
+        .build();
+
+    table.newAppend()
+        .appendFile(dataFile2)
+        .commit();
+
+    Snapshot baseSnapshot = table.currentSnapshot();
+
+    Expression conflictDetectionFilter = Expressions.equal("data", "a");
+
+    // add a delete file for partition A
+    DeleteFile deleteFile1 = FileMetadata.deleteFileBuilder(table.spec())
+        .ofPositionDeletes()
+        .withPath("/path/to/data-a-deletes.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("data=a")
+        .withRecordCount(1)
+        .build();
+
+    // mock a DELETE operation with serializable isolation
+    RowDelta rowDelta = table.newRowDelta()
+        .addDeletes(deleteFile1)
+        .validateFromSnapshot(baseSnapshot.snapshotId())
+        .conflictDetectionFilter(conflictDetectionFilter)
+        .validateNoConflictingDataFiles()
+        .validateNoConflictingDeleteFiles();
+
+    // add a delete file for partition B
+    DeleteFile deleteFile2 = FileMetadata.deleteFileBuilder(table.spec())
+        .ofPositionDeletes()
+        .withPath("/path/to/data-b-deletes.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("data=b")
+        .withRecordCount(1)
+        .build();
+
+    table.newRowDelta()
+        .addDeletes(deleteFile2)
+        .validateFromSnapshot(baseSnapshot.snapshotId())
+        .commit();
+
+    rowDelta.commit();
+
+    validateTableDeleteFiles(table, deleteFile1, deleteFile2);
+  }
 }
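Pulling the Javadoc rules together, one plausible way an engine could map operation and isolation level to the validation calls exercised by these tests; the Operation and IsolationLevel enums here are hypothetical helpers for illustration, not part of Iceberg:

import org.apache.iceberg.RowDelta;

class ValidationMappingSketch {
  enum Operation { DELETE, UPDATE, MERGE }       // hypothetical
  enum IsolationLevel { SERIALIZABLE, SNAPSHOT } // hypothetical

  // DELETE may skip delete-file validation (deleting an already-deleted row is
  // fine); UPDATE/MERGE always need it; data-file validation is what upgrades
  // any of the three operations to serializable isolation.
  static RowDelta configure(RowDelta delta, Operation op, IsolationLevel isolation) {
    if (op != Operation.DELETE) {
      delta.validateNoConflictingDeleteFiles();
    }
    if (isolation == IsolationLevel.SERIALIZABLE) {
      delta.validateNoConflictingDataFiles();
    }
    return delta;
  }
}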