-
Notifications
You must be signed in to change notification settings - Fork 3k
Core: Validate conflicting delete files in RowDelta and OverwriteFiles #3069
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -94,7 +94,7 @@ public interface RowDelta extends SnapshotUpdate<RowDelta> { | |
| RowDelta validateDeletedFiles(); | ||
|
|
||
| /** | ||
| * Enables validation that files added concurrently do not conflict with this commit's operation. | ||
| * Enables validation that data files added concurrently do not conflict with this commit's operation. | ||
| * <p> | ||
| * This method should be called when the table is queried to determine which files to delete/append. | ||
| * If a concurrent operation commits a new file after the data was read and that file might | ||
|
|
@@ -111,4 +111,19 @@ public interface RowDelta extends SnapshotUpdate<RowDelta> { | |
| * @return this for method chaining | ||
| */ | ||
| RowDelta validateNoConflictingAppends(Expression conflictDetectionFilter); | ||
|
|
||
| /** | ||
| * Enables validation that delete files added concurrently do not conflict with this commit's operation. | ||
| * <p> | ||
| * This method must be called when the table is queried to produce a row delta for UPDATE and | ||
| * MERGE operations independently of the isolation level. Calling this method isn't required | ||
| * for DELETE operations as it is OK when a particular record we are trying to delete | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: use of "we" in javadoc is unnecessary. It is simpler to say "it is OK to delete a record that is also deleted concurrently". |
||
| * was deleted concurrently. | ||
| * <p> | ||
| * Validation applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}. | ||
| * | ||
| * @param conflictDetectionFilter an expression on rows in the table | ||
| * @return this for method chaining | ||
| */ | ||
| RowDelta validateNoConflictingDeleteFiles(Expression conflictDetectionFilter); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: should we make it consistent with the method above (i.e., omit 'files' from the name)?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did that in the first place, but then I started to worry it may be confusing. For example, here we refer to concurrently added delete files, as opposed to concurrent delete operations that removed data files. I prefer consistency too, but I am not sure whether it would be confusing. What do you think, @szehon-ho?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes fine with me then, thanks for clarifying |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,17 +19,22 @@ | |
|
|
||
| package org.apache.iceberg; | ||
|
|
||
| import java.util.Set; | ||
| import org.apache.iceberg.exceptions.ValidationException; | ||
| import org.apache.iceberg.expressions.Evaluator; | ||
| import org.apache.iceberg.expressions.Expression; | ||
| import org.apache.iceberg.expressions.Expressions; | ||
| import org.apache.iceberg.expressions.Projections; | ||
| import org.apache.iceberg.expressions.StrictMetricsEvaluator; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; | ||
|
|
||
| public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles> implements OverwriteFiles { | ||
| private final Set<DataFile> deletedDataFiles = Sets.newHashSet(); | ||
| private boolean validateAddedFilesMatchOverwriteFilter = false; | ||
| private Long startingSnapshotId = null; | ||
| private Expression conflictDetectionFilter = null; | ||
| private Expression appendConflictDetectionFilter = null; | ||
| private Expression deleteConflictDetectionFilter = null; | ||
| private boolean caseSensitive = true; | ||
|
|
||
| protected BaseOverwriteFiles(String tableName, TableOperations ops) { | ||
|
|
@@ -60,6 +65,7 @@ public OverwriteFiles addFile(DataFile file) { | |
|
|
||
| @Override | ||
| public OverwriteFiles deleteFile(DataFile file) { | ||
| deletedDataFiles.add(file); | ||
| delete(file); | ||
| return this; | ||
| } | ||
|
|
@@ -95,11 +101,18 @@ public OverwriteFiles caseSensitive(boolean isCaseSensitive) { | |
| @Override | ||
| public OverwriteFiles validateNoConflictingAppends(Expression newConflictDetectionFilter) { | ||
| Preconditions.checkArgument(newConflictDetectionFilter != null, "Conflict detection filter cannot be null"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Should this be "Append conflict detection filter cannot be null", so the message is not left unqualified?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll update if it fits on the same line. |
||
| this.conflictDetectionFilter = newConflictDetectionFilter; | ||
| this.appendConflictDetectionFilter = newConflictDetectionFilter; | ||
| failMissingDeletePaths(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: Does this call to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll double check. |
||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public OverwriteFiles validateNoConflictingDeleteFiles(Expression newConflictDetectionFilter) { | ||
| Preconditions.checkArgument(newConflictDetectionFilter != null, "Conflict detection filter cannot be null"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Same comment about saying "Delete conflict detection filter cannot be null" instead of leaving it unqualified and ambiguous. |
||
| this.deleteConflictDetectionFilter = newConflictDetectionFilter; | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| protected void validate(TableMetadata base) { | ||
| if (validateAddedFilesMatchOverwriteFilter) { | ||
|
|
@@ -127,8 +140,20 @@ protected void validate(TableMetadata base) { | |
| } | ||
| } | ||
|
|
||
| if (conflictDetectionFilter != null && base.currentSnapshot() != null) { | ||
| validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive); | ||
| if (appendConflictDetectionFilter != null && base.currentSnapshot() != null) { | ||
| validateAddedDataFiles(base, startingSnapshotId, appendConflictDetectionFilter, caseSensitive); | ||
| } | ||
|
|
||
| boolean validateNewDeletes = deleteConflictDetectionFilter != null && base.currentSnapshot() != null; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that the behavior here should be slightly different. There are two concerns: 1) whether to check delete files for snapshot isolation and 2) what conflict detection filter to use. Basing I don't think there is a case where we don't want to validate delete files if we have called Then, if we are validating delete files, we should have two separate checks. First, if there are any files in I think that makes the API more understandable and consistent.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here's what I changed this to locally while thinking through it: // validateDeletes is set to true in validateFromSnapshot. Maybe we should default it if that method isn't called?
if (validateDeletes) {
if (deletedDataFiles.size() > 0) {
validateNoNewDeletesForDataFiles(
base, startingSnapshotId, deleteConflictDetectionFilter,
deletedDataFiles, caseSensitive);
}
if (rowFilter() != Expressions.alwaysFalse()) {
if (deleteConflictDetectionFilter != null) {
validateNoNewDeletes(base, startingSnapshotId, deleteConflictDetectionFilter, caseSensitive);
} else {
validateNoNewDeletes(base, startingSnapshotId, rowFilter(), caseSensitive);
}
}
}
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If I got you correctly, you are proposing that Also, in your snippet, why call |
||
| boolean overwriteByFilter = rowFilter() != Expressions.alwaysFalse(); | ||
|
|
||
| if (validateNewDeletes && overwriteByFilter) { | ||
| validateNoNewDeletes(base, startingSnapshotId, deleteConflictDetectionFilter, caseSensitive); | ||
| } else if (validateNewDeletes && deletedDataFiles.size() > 0) { | ||
| // it is sufficient to ensure we don't have new delete files only for overwritten data files | ||
| validateNoNewDeletesForDataFiles( | ||
| base, startingSnapshotId, deleteConflictDetectionFilter, | ||
| deletedDataFiles, caseSensitive); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,7 +27,8 @@ class BaseRowDelta extends MergingSnapshotProducer<RowDelta> implements RowDelta | |
| private Long startingSnapshotId = null; // check all versions by default | ||
| private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); | ||
| private boolean validateDeletes = false; | ||
| private Expression conflictDetectionFilter = null; | ||
| private Expression appendConflictDetectionFilter = null; | ||
| private Expression deleteConflictDetectionFilter = null; | ||
| private boolean caseSensitive = true; | ||
|
|
||
| BaseRowDelta(String tableName, TableOperations ops) { | ||
|
|
@@ -83,7 +84,14 @@ public RowDelta validateDataFilesExist(Iterable<? extends CharSequence> referenc | |
| @Override | ||
| public RowDelta validateNoConflictingAppends(Expression newConflictDetectionFilter) { | ||
| Preconditions.checkArgument(newConflictDetectionFilter != null, "Conflict detection filter cannot be null"); | ||
| this.conflictDetectionFilter = newConflictDetectionFilter; | ||
| this.appendConflictDetectionFilter = newConflictDetectionFilter; | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public RowDelta validateNoConflictingDeleteFiles(Expression newConflictDetectionFilter) { | ||
| Preconditions.checkArgument(newConflictDetectionFilter != null, "Conflict detection filter cannot be null"); | ||
| this.deleteConflictDetectionFilter = newConflictDetectionFilter; | ||
| return this; | ||
| } | ||
|
|
||
|
|
@@ -92,12 +100,15 @@ protected void validate(TableMetadata base) { | |
| if (base.currentSnapshot() != null) { | ||
| if (!referencedDataFiles.isEmpty()) { | ||
| validateDataFilesExist( | ||
| base, startingSnapshotId, referencedDataFiles, !validateDeletes, conflictDetectionFilter); | ||
| base, startingSnapshotId, referencedDataFiles, !validateDeletes, appendConflictDetectionFilter); | ||
| } | ||
|
|
||
| if (appendConflictDetectionFilter != null) { | ||
| validateAddedDataFiles(base, startingSnapshotId, appendConflictDetectionFilter, caseSensitive); | ||
| } | ||
|
|
||
| // TODO: does this need to check new delete files? | ||
| if (conflictDetectionFilter != null) { | ||
| validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive); | ||
| if (deleteConflictDetectionFilter != null) { | ||
| validateNoNewDeletes(base, startingSnapshotId, deleteConflictDetectionFilter, caseSensitive); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This check is quite trivial. For example, we won't be able to resolve conflicts within the same partition. I have outlined a way to optimize it here.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you mean it cannot resolve conflicts within the same data file (I thought we were passing a data filter), or within the same partition? Also, for my learning: do you mean it will be over-aggressive and report false positives even if rows do not actually conflict, until we make the optimization?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yeah, it may report false positives. The data filter is helpful, but I think it won't help much within the same partition. Position deletes are scoped to a partition, so the data filter should help us when there is a concurrent delete in another partition. Within the partition, though, most position deletes will match that row filter, as we don't persist the deleted row (by default).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A bit late to the whole discussion. Regarding the check, I read the outlined way to optimize it, just want to share some thoughts based on what I am doing for position deletes of my internal distribution as of today. In my system, each position delete file written contains exactly 1 When I started to compact position delete files to contain multiple |
||
| } | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -86,6 +86,20 @@ public boolean isEmpty() { | |
| return (globalDeletes == null || globalDeletes.length == 0) && sortedDeletesByPartition.isEmpty(); | ||
| } | ||
|
|
||
| public List<DeleteFile> referencedDeleteFiles() { | ||
| List<DeleteFile> deleteFiles = Lists.newArrayList(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Optional comment: small optimization can be done by knowing the initial length, and checking isEmpty
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do!
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was about to implement this but then I realized |
||
|
|
||
| if (globalDeletes != null) { | ||
| deleteFiles.addAll(Arrays.asList(globalDeletes)); | ||
| } | ||
|
|
||
| sortedDeletesByPartition.forEach((key, partitionDeletes) -> { | ||
| deleteFiles.addAll(Arrays.asList(partitionDeletes.second())); | ||
| }); | ||
|
|
||
| return deleteFiles; | ||
| } | ||
|
|
||
| private StructLikeWrapper newWrapper(int specId) { | ||
| return StructLikeWrapper.forType(partitionTypeById.get(specId)); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -64,7 +64,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> { | |
| private static final Set<String> VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS = | ||
| ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.REPLACE); | ||
| // delete files can be added in "overwrite" or "delete" operations | ||
| private static final Set<String> VALIDATE_REPLACED_DATA_FILES_OPERATIONS = | ||
| private static final Set<String> VALIDATE_ADDED_DELETE_FILES_OPERATIONS = | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I renamed it to match |
||
| ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.DELETE); | ||
|
|
||
| private final String tableName; | ||
|
|
@@ -293,20 +293,33 @@ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotI | |
| */ | ||
| protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId, | ||
| Iterable<DataFile> dataFiles) { | ||
| validateNoNewDeletesForDataFiles(base, startingSnapshotId, null, dataFiles, true); | ||
| } | ||
|
|
||
| /** | ||
| * Validates that no new delete files that must be applied to the given data files have been added to the table since | ||
| * a starting snapshot. | ||
| * | ||
| * @param base table metadata to validate | ||
| * @param startingSnapshotId id of the snapshot current at the start of the operation | ||
| * @param dataFilter a data filter | ||
| * @param dataFiles data files to validate have no new row deletes | ||
| * @param caseSensitive whether expression binding should be case-sensitive | ||
| */ | ||
| protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId, | ||
| Expression dataFilter, Iterable<DataFile> dataFiles, | ||
| boolean caseSensitive) { | ||
| // if there is no current table state, no files have been added | ||
| if (base.currentSnapshot() == null) { | ||
| return; | ||
| } | ||
|
|
||
| Pair<List<ManifestFile>, Set<Long>> history = | ||
| validationHistory(base, startingSnapshotId, VALIDATE_REPLACED_DATA_FILES_OPERATIONS, ManifestContent.DELETES); | ||
| validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES); | ||
| List<ManifestFile> deleteManifests = history.first(); | ||
|
|
||
| long startingSequenceNumber = startingSnapshotId == null ? 0 : base.snapshot(startingSnapshotId).sequenceNumber(); | ||
| DeleteFileIndex deletes = DeleteFileIndex.builderFor(ops.io(), deleteManifests) | ||
| .afterSequenceNumber(startingSequenceNumber) | ||
| .specsById(ops.current().specsById()) | ||
| .build(); | ||
| long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId); | ||
| DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter, caseSensitive); | ||
|
|
||
| for (DataFile dataFile : dataFiles) { | ||
| // if any delete is found that applies to files written in or before the starting snapshot, fail | ||
|
|
@@ -316,6 +329,57 @@ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startin | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Validates that no delete files matching a filter have been added to the table since a starting snapshot. | ||
| * | ||
| * @param base table metadata to validate | ||
| * @param startingSnapshotId id of the snapshot current at the start of the operation | ||
| * @param dataFilter an expression used to find new conflicting delete files | ||
| * @param caseSensitive whether expression evaluation should be case-sensitive | ||
| */ | ||
| protected void validateNoNewDeletes(TableMetadata base, Long startingSnapshotId, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think a slightly more accurate name would be |
||
| Expression dataFilter, boolean caseSensitive) { | ||
| // if there is no current table state, no files have been added | ||
| if (base.currentSnapshot() == null) { | ||
| return; | ||
| } | ||
|
|
||
| Pair<List<ManifestFile>, Set<Long>> history = | ||
| validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES); | ||
| List<ManifestFile> deleteManifests = history.first(); | ||
|
|
||
| long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId); | ||
| DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter, caseSensitive); | ||
|
|
||
| ValidationException.check(deletes.isEmpty(), | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for adding this! |
||
| "Found new conflicting delete files that can apply to records matching %s: %s", | ||
| dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path)); | ||
| } | ||
|
|
||
| // use 0 as a starting seq number if the starting snapshot is not set or expired | ||
| private long startingSequenceNumber(TableMetadata metadata, Long staringSnapshotId) { | ||
| if (staringSnapshotId != null && metadata.snapshot(staringSnapshotId) != null) { | ||
| Snapshot startingSnapshot = metadata.snapshot(staringSnapshotId); | ||
| return startingSnapshot.sequenceNumber(); | ||
| } else { | ||
| return 0; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: can use |
||
| } | ||
| } | ||
|
|
||
| private DeleteFileIndex buildDeleteFileIndex(List<ManifestFile> deleteManifests, long startingSequenceNumber, | ||
| Expression dataFilter, boolean caseSensitive) { | ||
| DeleteFileIndex.Builder builder = DeleteFileIndex.builderFor(ops.io(), deleteManifests) | ||
| .afterSequenceNumber(startingSequenceNumber) | ||
| .caseSensitive(caseSensitive) | ||
| .specsById(ops.current().specsById()); | ||
|
|
||
| if (dataFilter != null) { | ||
| builder.filterData(dataFilter); | ||
| } | ||
|
|
||
| return builder.build(); | ||
| } | ||
|
|
||
| @SuppressWarnings("CollectionUndefinedEquality") | ||
| protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotId, | ||
| CharSequenceSet requiredDataFiles, boolean skipDeletes, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.