Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 58 additions & 5 deletions api/src/main/java/org/apache/iceberg/OverwriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,28 @@ public interface OverwriteFiles extends SnapshotUpdate<OverwriteFiles> {
OverwriteFiles caseSensitive(boolean caseSensitive);

/**
* Enables validation that files added concurrently do not conflict with this commit's operation.
* Enables validation that data files added concurrently do not conflict with this commit's operation.
* <p>
* This method should be called when the table is queried to determine which files to delete/append.
* This method should be called while committing non-idempotent overwrite operations.
* If a concurrent operation commits a new file after the data was read and that file might
* contain rows matching the specified conflict detection filter, the overwrite operation
* will detect this during retries and fail.
* will detect this and fail.
* <p>
* Calling this method with a correct conflict detection filter is required to maintain
* serializable isolation for eager update/delete operations. Otherwise, the isolation level
* serializable isolation for overwrite operations. Otherwise, the isolation level
* will be snapshot isolation.
* <p>
* Validation applies to files added to the table since the snapshot passed to {@link #validateFromSnapshot(long)}.
*
* @param conflictDetectionFilter an expression on rows in the table
* @return this for method chaining
* @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #conflictDetectionFilter(Expression)} and
* {@link #validateNoConflictingData()} instead.
*/
OverwriteFiles validateNoConflictingAppends(Expression conflictDetectionFilter);
@Deprecated
default OverwriteFiles validateNoConflictingAppends(Expression conflictDetectionFilter) {
return conflictDetectionFilter(conflictDetectionFilter).validateNoConflictingData();
}

/**
* Enables validation that files added concurrently do not conflict with this commit's operation.
Expand All @@ -145,4 +150,52 @@ public interface OverwriteFiles extends SnapshotUpdate<OverwriteFiles> {
*/
@Deprecated
OverwriteFiles validateNoConflictingAppends(Long readSnapshotId, Expression conflictDetectionFilter);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you remove this as well? Looks like it was supposed to be removed already.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll clean up all interfaces in the api module in a separate PR.


/**
* Sets a conflict detection filter used to validate concurrently added data and delete files.
*
* @param conflictDetectionFilter an expression on rows in the table
* @return this for method chaining
*/
OverwriteFiles conflictDetectionFilter(Expression conflictDetectionFilter);

/**
* Enables validation that data added concurrently does not conflict with this commit's operation.
* <p>
* This method should be called while committing non-idempotent overwrite operations.
* If a concurrent operation commits a new file after the data was read and that file might
* contain rows matching the specified conflict detection filter, the overwrite operation
* will detect this and fail.
* <p>
* Calling this method with a correct conflict detection filter is required to maintain
* isolation for non-idempotent overwrite operations.
* <p>
* Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
* applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
* If the conflict detection filter is not set, any new data added concurrently will fail this
* overwrite operation.
*
* @return this for method chaining
*/
OverwriteFiles validateNoConflictingData();

/**
* Enables validation that deletes that happened concurrently do not conflict with this commit's operation.
* <p>
* Validating concurrent deletes is required during non-idempotent overwrite operations.
* If a concurrent operation deletes data in one of the files being overwritten, the overwrite
* operation must be aborted as it may undelete rows that were removed concurrently.
* <p>
* Calling this method with a correct conflict detection filter is required to maintain
* isolation for non-idempotent overwrite operations.
* <p>
* Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I right to understand that we use rowFilter to do the detection, if conflictDetectionFilter is not set? Should we mention it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is not set, the validation is tricky. I added a few sentences. Let me know if that makes sense, @szehon-ho.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea its a lot better.

* applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
* If the conflict detection filter is not set, this operation will use the row filter provided
* in {@link #overwriteByRowFilter(Expression)} to check for new delete files and will ensure
* there are no conflicting deletes for data files removed via {@link #deleteFile(DataFile)}.
*
* @return this for method chaining
*/
OverwriteFiles validateNoConflictingDeletes();
}
50 changes: 47 additions & 3 deletions core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,23 @@

package org.apache.iceberg;

import java.util.Set;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles> implements OverwriteFiles {
private final Set<DataFile> deletedDataFiles = Sets.newHashSet();
private boolean validateAddedFilesMatchOverwriteFilter = false;
private Long startingSnapshotId = null;
private Expression conflictDetectionFilter = null;
private boolean validateNewDataFiles = false;
private boolean validateNewDeleteFiles = false;
private boolean caseSensitive = true;

protected BaseOverwriteFiles(String tableName, TableOperations ops) {
Expand Down Expand Up @@ -60,6 +66,7 @@ public OverwriteFiles addFile(DataFile file) {

@Override
public OverwriteFiles deleteFile(DataFile file) {
deletedDataFiles.add(file);
delete(file);
return this;
}
Expand Down Expand Up @@ -93,9 +100,22 @@ public OverwriteFiles caseSensitive(boolean isCaseSensitive) {
}

@Override
public OverwriteFiles validateNoConflictingAppends(Expression newConflictDetectionFilter) {
public OverwriteFiles conflictDetectionFilter(Expression newConflictDetectionFilter) {
Preconditions.checkArgument(newConflictDetectionFilter != null, "Conflict detection filter cannot be null");
this.conflictDetectionFilter = newConflictDetectionFilter;
return this;
}

@Override
public OverwriteFiles validateNoConflictingData() {
this.validateNewDataFiles = true;
failMissingDeletePaths();
return this;
}

@Override
public OverwriteFiles validateNoConflictingDeletes() {
this.validateNewDeleteFiles = true;
failMissingDeletePaths();
return this;
}
Expand Down Expand Up @@ -127,8 +147,32 @@ protected void validate(TableMetadata base) {
}
}

if (conflictDetectionFilter != null && base.currentSnapshot() != null) {
validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);

if (validateNewDataFiles) {
validateAddedDataFiles(base, startingSnapshotId, dataConflictDetectionFilter(), caseSensitive);
}

if (validateNewDeleteFiles) {
if (rowFilter() != Expressions.alwaysFalse()) {
Expression filter = conflictDetectionFilter != null ? conflictDetectionFilter : rowFilter();
validateNoNewDeleteFiles(base, startingSnapshotId, filter, caseSensitive);
}

if (deletedDataFiles.size() > 0) {
validateNoNewDeletesForDataFiles(
base, startingSnapshotId, conflictDetectionFilter,
deletedDataFiles, caseSensitive);
}
}
}

private Expression dataConflictDetectionFilter() {
if (conflictDetectionFilter != null) {
return conflictDetectionFilter;
} else if (rowFilter() != Expressions.alwaysFalse() && deletedDataFiles.isEmpty()) {
return rowFilter();
} else {
return Expressions.alwaysTrue();
}
}
}
Loading