Merged (changes from 1 commit)
52 changes: 50 additions & 2 deletions api/src/main/java/org/apache/iceberg/RowDelta.java
@@ -94,7 +94,7 @@ public interface RowDelta extends SnapshotUpdate<RowDelta> {
RowDelta validateDeletedFiles();

/**
- * Enables validation that files added concurrently do not conflict with this commit's operation.
+ * Enables validation that data files added concurrently do not conflict with this commit's operation.
* <p>
* This method should be called when the table is queried to determine which files to delete/append.
* If a concurrent operation commits a new file after the data was read and that file might
@@ -109,6 +109,54 @@ public interface RowDelta extends SnapshotUpdate<RowDelta> {
*
* @param conflictDetectionFilter an expression on rows in the table
* @return this for method chaining
+ * @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #conflictDetectionFilter(Expression)} and
+ *             {@link #validateNoConflictingDataFiles()} instead.
   */
-  RowDelta validateNoConflictingAppends(Expression conflictDetectionFilter);
+  @Deprecated
+  default RowDelta validateNoConflictingAppends(Expression conflictDetectionFilter) {
+    conflictDetectionFilter(conflictDetectionFilter);
+    return validateNoConflictingDataFiles();
+  }
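For callers, the migration described in the @deprecated note is mechanical. A hypothetical before/after sketch (rowDelta and filter are placeholder names, not code from this PR):

  // before: deprecated as of 0.13.0
  rowDelta.validateNoConflictingAppends(filter);

  // after: set the conflict detection filter once, then opt in to data file validation
  rowDelta.conflictDetectionFilter(filter)
      .validateNoConflictingDataFiles();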

/**
* Sets a conflict detection filter used to validate concurrently added data and delete files.
* <p>
* If not called, a true literal will be used as the conflict detection filter.
*
* @param conflictDetectionFilter an expression on rows in the table
* @return this for method chaining
*/
RowDelta conflictDetectionFilter(Expression conflictDetectionFilter);
[Review comment] aokolnychyi (Contributor, Author), Sep 28, 2021:
This is actually different from the implementation in PR #3069. I think this one is slightly better as we set the conflict detection filter only once and then enable data/delete file validation.

[Review comment] aokolnychyi (Contributor, Author), Sep 28, 2021:
Otherwise, we would need to reason about what happens if the filters are different, which one to use for validating referenced data files, etc.

[Review comment] Contributor:
This seems good to me.


/**
* Enables validation that data files added concurrently do not conflict with this commit's operation.
* <p>
* This method should be called when the table is queried to determine which files to delete/append.
* If a concurrent operation commits a new file after the data was read and that file might
* contain rows matching the specified conflict detection filter, this operation
* will detect this during retries and fail.
* <p>
* Calling this method is required to maintain serializable isolation for update/delete operations.
* Otherwise, the isolation level will be snapshot isolation.
* <p>
* Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
* applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
*
* @return this for method chaining
*/
RowDelta validateNoConflictingDataFiles();

/**
* Enables validation that delete files added concurrently do not conflict with this commit's operation.
* <p>
* This method must be called when the table is queried to produce a row delta for UPDATE and
* MERGE operations independently of the isolation level. Calling this method isn't required
* for DELETE operations as it is OK to delete a record that is also deleted concurrently.
* <p>
* Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
* applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
*
* @return this for method chaining
*/
RowDelta validateNoConflictingDeleteFiles();
}
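Taken together, a hypothetical MERGE-style commit built on the new API could look like the sketch below (table, startingSnapshotId, rowFilter, newDataFile, and positionDeleteFile are placeholders, not code from this PR):

  table.newRowDelta()
      .addRows(newDataFile)                     // rewritten rows
      .addDeletes(positionDeleteFile)           // position deletes for replaced rows
      .validateFromSnapshot(startingSnapshotId)
      .conflictDetectionFilter(rowFilter)       // rows this operation may touch
      .validateNoConflictingDataFiles()         // needed for serializable isolation
      .validateNoConflictingDeleteFiles()       // required for UPDATE/MERGE
      .commit();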
26 changes: 22 additions & 4 deletions core/src/main/java/org/apache/iceberg/BaseRowDelta.java
@@ -20,14 +20,17 @@
package org.apache.iceberg;

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.CharSequenceSet;

class BaseRowDelta extends MergingSnapshotProducer<RowDelta> implements RowDelta {
private Long startingSnapshotId = null; // check all versions by default
private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
private boolean validateDeletes = false;
-  private Expression conflictDetectionFilter = null;
+  private Expression conflictDetectionFilter = Expressions.alwaysTrue();
private boolean validateNewDataFiles = false;
private boolean validateNewDeleteFiles = false;
private boolean caseSensitive = true;

BaseRowDelta(String tableName, TableOperations ops) {
@@ -81,12 +84,24 @@ public RowDelta validateDataFilesExist(Iterable<? extends CharSequence> referenc
}

@Override
-  public RowDelta validateNoConflictingAppends(Expression newConflictDetectionFilter) {
+  public RowDelta conflictDetectionFilter(Expression newConflictDetectionFilter) {
Preconditions.checkArgument(newConflictDetectionFilter != null, "Conflict detection filter cannot be null");
this.conflictDetectionFilter = newConflictDetectionFilter;
return this;
}

@Override
public RowDelta validateNoConflictingDataFiles() {
this.validateNewDataFiles = true;
return this;
}

@Override
public RowDelta validateNoConflictingDeleteFiles() {
this.validateNewDeleteFiles = true;
return this;
}

@Override
protected void validate(TableMetadata base) {
if (base.currentSnapshot() != null) {
@@ -95,10 +110,13 @@ protected void validate(TableMetadata base) {
base, startingSnapshotId, referencedDataFiles, !validateDeletes, conflictDetectionFilter);
}

-    // TODO: does this need to check new delete files?
-    if (conflictDetectionFilter != null) {
+    if (validateNewDataFiles) {
validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);
}

if (validateNewDeleteFiles) {
validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);
}
}
}
}
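Note that with the filter now defaulting to Expressions.alwaysTrue(), enabling validation without ever calling conflictDetectionFilter treats any concurrently added file as a conflict. A sketch of that default behavior (hypothetical snippet; deleteFile and snapshotId are placeholders):

  // no conflictDetectionFilter(...) call: the filter defaults to alwaysTrue(),
  // so any data file added concurrently after snapshotId fails the validation
  table.newRowDelta()
      .addDeletes(deleteFile)
      .validateFromSnapshot(snapshotId)
      .validateNoConflictingDataFiles()
      .commit();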
14 changes: 14 additions & 0 deletions core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -86,6 +86,20 @@ public boolean isEmpty() {
return (globalDeletes == null || globalDeletes.length == 0) && sortedDeletesByPartition.isEmpty();
}

public List<DeleteFile> referencedDeleteFiles() {
[Review comment] Contributor:
Would Iterable<DeleteFile> work instead? That would make this lazy.

[Review comment] aokolnychyi (Contributor, Author):
Updated.

List<DeleteFile> deleteFiles = Lists.newArrayList();

if (globalDeletes != null) {
deleteFiles.addAll(Arrays.asList(globalDeletes));
}

sortedDeletesByPartition.forEach((key, partitionDeletes) -> {
deleteFiles.addAll(Arrays.asList(partitionDeletes.second()));
});

return deleteFiles;
}
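A lazy variant along the lines of the Iterable<DeleteFile> suggestion above might look like the sketch below. It assumes, as in the eager version, that sortedDeletesByPartition values are pairs whose second() holds the partition's DeleteFile[], and uses the Guava Iterables utilities already relocated into Iceberg; this illustrates the reviewer's idea and is not necessarily the code that was merged:

  public Iterable<DeleteFile> referencedDeleteFiles() {
    // flatten the per-partition delete file arrays lazily
    Iterable<DeleteFile> partitionDeletes = Iterables.concat(
        Iterables.transform(sortedDeletesByPartition.values(), pair -> Arrays.asList(pair.second())));

    // prepend global deletes, if any, without copying into a new list
    return globalDeletes == null
        ? partitionDeletes
        : Iterables.concat(Arrays.asList(globalDeletes), partitionDeletes);
  }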

private StructLikeWrapper newWrapper(int specId) {
return StructLikeWrapper.forType(partitionTypeById.get(specId));
}
77 changes: 70 additions & 7 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -66,7 +66,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private static final Set<String> VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS =
ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.REPLACE);
// delete files can be added in "overwrite" or "delete" operations
-  private static final Set<String> VALIDATE_REPLACED_DATA_FILES_OPERATIONS =
+  private static final Set<String> VALIDATE_ADDED_DELETE_FILES_OPERATIONS =
ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.DELETE);

private final String tableName;
@@ -297,20 +297,33 @@ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotI
*/
protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId,
Iterable<DataFile> dataFiles) {
validateNoNewDeletesForDataFiles(base, startingSnapshotId, null, dataFiles, true);
}

/**
* Validates that no new delete files that must be applied to the given data files have been added to the table since
* a starting snapshot.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param dataFilter a data filter
* @param dataFiles data files to validate have no new row deletes
* @param caseSensitive whether expression binding should be case-sensitive
*/
protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId,
Expression dataFilter, Iterable<DataFile> dataFiles,
boolean caseSensitive) {
// if there is no current table state, no files have been added
if (base.currentSnapshot() == null) {
return;
}

Pair<List<ManifestFile>, Set<Long>> history =
-        validationHistory(base, startingSnapshotId, VALIDATE_REPLACED_DATA_FILES_OPERATIONS, ManifestContent.DELETES);
+        validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
List<ManifestFile> deleteManifests = history.first();

-    long startingSequenceNumber = startingSnapshotId == null ? 0 : base.snapshot(startingSnapshotId).sequenceNumber();
-    DeleteFileIndex deletes = DeleteFileIndex.builderFor(ops.io(), deleteManifests)
-        .afterSequenceNumber(startingSequenceNumber)
-        .specsById(ops.current().specsById())
-        .build();
+    long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId);
+    DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter, caseSensitive);

for (DataFile dataFile : dataFiles) {
// if any delete is found that applies to files written in or before the starting snapshot, fail
@@ -320,6 +333,56 @@ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startin
}
}

/**
* Validates that no delete files matching a filter have been added to the table since a starting snapshot.
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
* @param dataFilter an expression used to find new conflicting delete files
* @param caseSensitive whether expression evaluation should be case-sensitive
*/
protected void validateNoNewDeleteFiles(TableMetadata base, Long startingSnapshotId,
Expression dataFilter, boolean caseSensitive) {
// if there is no current table state, no files have been added
if (base.currentSnapshot() == null) {
[Review comment] Contributor:
Can you add the check for base.formatVersion() < 2?

[Review comment] aokolnychyi (Contributor, Author):
Added to both methods. Could you check, @rdblue?

return;
}

Pair<List<ManifestFile>, Set<Long>> history =
validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
List<ManifestFile> deleteManifests = history.first();

long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId);
DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter, caseSensitive);

ValidationException.check(deletes.isEmpty(),
"Found new conflicting delete files that can apply to records matching %s: %s",
dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
}
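Per the review exchange above, a format-version guard was added to both validation methods; presumably something like the sketch below at the top of each (v1 tables cannot contain delete files, so there is nothing to validate; the exact merged lines are not shown in this commit view):

  // assumed shape of the guard requested in review: delete files require format v2,
  // so there can be no conflicting delete files in a v1 table
  if (base.formatVersion() < 2) {
    return;
  }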

  private long startingSequenceNumber(TableMetadata metadata, Long startingSnapshotId) {
    if (startingSnapshotId != null && metadata.snapshot(startingSnapshotId) != null) {
      Snapshot startingSnapshot = metadata.snapshot(startingSnapshotId);
      return startingSnapshot.sequenceNumber();
    } else {
      return TableMetadata.INITIAL_SEQUENCE_NUMBER;
    }
  }

private DeleteFileIndex buildDeleteFileIndex(List<ManifestFile> deleteManifests, long startingSequenceNumber,
Expression dataFilter, boolean caseSensitive) {
DeleteFileIndex.Builder builder = DeleteFileIndex.builderFor(ops.io(), deleteManifests)
.afterSequenceNumber(startingSequenceNumber)
.caseSensitive(caseSensitive)
.specsById(ops.current().specsById());

if (dataFilter != null) {
builder.filterData(dataFilter);
}

return builder.build();
}

@SuppressWarnings("CollectionUndefinedEquality")
protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotId,
CharSequenceSet requiredDataFiles, boolean skipDeletes,
16 changes: 15 additions & 1 deletion core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -79,7 +79,7 @@ public class TableTestBase {
.build();
// Equality delete files.
static final DeleteFile FILE_A2_DELETES = FileMetadata.deleteFileBuilder(SPEC)
-      .ofEqualityDeletes(3)
+      .ofEqualityDeletes(1)
.withPath("/path/to/data-a2-deletes.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=0")
@@ -366,6 +366,20 @@ void validateTableFiles(Table tbl, DataFile... expectedFiles) {
Assert.assertEquals("Files should match", expectedFilePaths, actualFilePaths);
}

void validateTableDeleteFiles(Table tbl, DeleteFile... expectedFiles) {
Set<CharSequence> expectedFilePaths = Sets.newHashSet();
for (DeleteFile file : expectedFiles) {
expectedFilePaths.add(file.path());
}
Set<CharSequence> actualFilePaths = Sets.newHashSet();
for (FileScanTask task : tbl.newScan().planFiles()) {
for (DeleteFile file : task.deletes()) {
actualFilePaths.add(file.path());
}
}
Assert.assertEquals("Delete files should match", expectedFilePaths, actualFilePaths);
}
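A test built on this helper might look like the following sketch (hypothetical test body; table, FILE_A, and FILE_A2_DELETES are fixtures defined in this class, and equality deletes require a format v2 table):

  @Test
  public void testDeleteFilesVisibleInScan() {
    // commit a data file together with an equality delete file against it
    table.newRowDelta()
        .addRows(FILE_A)
        .addDeletes(FILE_A2_DELETES)
        .commit();

    // the scan-based helper should report exactly the committed delete file
    validateTableDeleteFiles(table, FILE_A2_DELETES);
  }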

List<String> paths(DataFile... dataFiles) {
List<String> paths = Lists.newArrayListWithExpectedSize(dataFiles.length);
for (DataFile file : dataFiles) {