Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 58 additions & 5 deletions api/src/main/java/org/apache/iceberg/OverwriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,28 @@ public interface OverwriteFiles extends SnapshotUpdate<OverwriteFiles> {
OverwriteFiles caseSensitive(boolean caseSensitive);

/**
* Enables validation that files added concurrently do not conflict with this commit's operation.
* Enables validation that data files added concurrently do not conflict with this commit's operation.
* <p>
* This method should be called when the table is queried to determine which files to delete/append.
* This method should be called while committing non-idempotent overwrite operations.
* If a concurrent operation commits a new file after the data was read and that file might
* contain rows matching the specified conflict detection filter, the overwrite operation
* will detect this during retries and fail.
* will detect this and fail.
* <p>
* Calling this method with a correct conflict detection filter is required to maintain
* serializable isolation for eager update/delete operations. Otherwise, the isolation level
* serializable isolation for overwrite operations. Otherwise, the isolation level
* will be snapshot isolation.
* <p>
* Validation applies to files added to the table since the snapshot passed to {@link #validateFromSnapshot(long)}.
*
* @param conflictDetectionFilter an expression on rows in the table
* @return this for method chaining
* @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #conflictDetectionFilter(Expression)} and
* {@link #validateNoConflictingData()} instead.
*/
OverwriteFiles validateNoConflictingAppends(Expression conflictDetectionFilter);
@Deprecated
default OverwriteFiles validateNoConflictingAppends(Expression conflictDetectionFilter) {
return conflictDetectionFilter(conflictDetectionFilter).validateNoConflictingData();
}

/**
* Enables validation that files added concurrently do not conflict with this commit's operation.
Expand All @@ -145,4 +150,52 @@ public interface OverwriteFiles extends SnapshotUpdate<OverwriteFiles> {
*/
@Deprecated
OverwriteFiles validateNoConflictingAppends(Long readSnapshotId, Expression conflictDetectionFilter);

/**
* Sets a conflict detection filter used to validate concurrently added data and delete files.
*
* @param conflictDetectionFilter an expression on rows in the table
* @return this for method chaining
*/
OverwriteFiles conflictDetectionFilter(Expression conflictDetectionFilter);

/**
* Enables validation that data added concurrently does not conflict with this commit's operation.
* <p>
* This method should be called while committing non-idempotent overwrite operations.
* If a concurrent operation commits a new file after the data was read and that file might
* contain rows matching the specified conflict detection filter, the overwrite operation
* will detect this and fail.
* <p>
* Calling this method with a correct conflict detection filter is required to maintain
* isolation for non-idempotent overwrite operations.
* <p>
* Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
* applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
* If the conflict detection filter is not set, any new data added concurrently will fail this
* overwrite operation.
*
* @return this for method chaining
*/
OverwriteFiles validateNoConflictingData();

/**
* Enables validation that deletes that happened concurrently do not conflict with this commit's operation.
* <p>
* Validating concurrent deletes is required during non-idempotent overwrite operations.
* If a concurrent operation deletes data in one of the files being overwritten, the overwrite
* operation must be aborted as it may undelete rows that were removed concurrently.
* <p>
* Calling this method with a correct conflict detection filter is required to maintain
* isolation for non-idempotent overwrite operations.
* <p>
* Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
* applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
* If the conflict detection filter is not set, this operation will use the row filter provided
* in {@link #overwriteByRowFilter(Expression)} to check for new delete files and will ensure
* there are no conflicting deletes for data files removed via {@link #deleteFile(DataFile)}.
*
* @return this for method chaining
*/
OverwriteFiles validateNoConflictingDeletes();
}
52 changes: 50 additions & 2 deletions api/src/main/java/org/apache/iceberg/RowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public interface RowDelta extends SnapshotUpdate<RowDelta> {
RowDelta validateDeletedFiles();

/**
* Enables validation that files added concurrently do not conflict with this commit's operation.
* Enables validation that data files added concurrently do not conflict with this commit's operation.
* <p>
* This method should be called when the table is queried to determine which files to delete/append.
* If a concurrent operation commits a new file after the data was read and that file might
Expand All @@ -109,6 +109,54 @@ public interface RowDelta extends SnapshotUpdate<RowDelta> {
*
* @param conflictDetectionFilter an expression on rows in the table
* @return this for method chaining
* @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #conflictDetectionFilter(Expression)} and
* {@link #validateNoConflictingDataFiles()} instead.
*/
RowDelta validateNoConflictingAppends(Expression conflictDetectionFilter);
@Deprecated
default RowDelta validateNoConflictingAppends(Expression conflictDetectionFilter) {
conflictDetectionFilter(conflictDetectionFilter);
return validateNoConflictingDataFiles();
}

/**
* Sets a conflict detection filter used to validate concurrently added data and delete files.
* <p>
* If not called, a true literal will be used as the conflict detection filter.
*
* @param conflictDetectionFilter an expression on rows in the table
* @return this for method chaining
*/
RowDelta conflictDetectionFilter(Expression conflictDetectionFilter);

/**
* Enables validation that data files added concurrently do not conflict with this commit's operation.
* <p>
* This method should be called when the table is queried to determine which files to delete/append.
* If a concurrent operation commits a new file after the data was read and that file might
* contain rows matching the specified conflict detection filter, this operation
* will detect this during retries and fail.
* <p>
* Calling this method is required to maintain serializable isolation for update/delete operations.
* Otherwise, the isolation level will be snapshot isolation.
* <p>
* Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
* applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
*
* @return this for method chaining
*/
RowDelta validateNoConflictingDataFiles();

/**
* Enables validation that delete files added concurrently do not conflict with this commit's operation.
* <p>
* This method must be called when the table is queried to produce a row delta for UPDATE and
* MERGE operations independently of the isolation level. Calling this method isn't required
* for DELETE operations as it is OK to delete a record that is also deleted concurrently.
* <p>
* Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
* applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
*
* @return this for method chaining
*/
RowDelta validateNoConflictingDeleteFiles();
}
50 changes: 47 additions & 3 deletions core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,23 @@

package org.apache.iceberg;

import java.util.Set;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles> implements OverwriteFiles {
private final Set<DataFile> deletedDataFiles = Sets.newHashSet();
private boolean validateAddedFilesMatchOverwriteFilter = false;
private Long startingSnapshotId = null;
private Expression conflictDetectionFilter = null;
private boolean validateNewDataFiles = false;
private boolean validateNewDeleteFiles = false;
private boolean caseSensitive = true;

protected BaseOverwriteFiles(String tableName, TableOperations ops) {
Expand Down Expand Up @@ -60,6 +66,7 @@ public OverwriteFiles addFile(DataFile file) {

@Override
public OverwriteFiles deleteFile(DataFile file) {
deletedDataFiles.add(file);
delete(file);
return this;
}
Expand Down Expand Up @@ -93,9 +100,22 @@ public OverwriteFiles caseSensitive(boolean isCaseSensitive) {
}

@Override
public OverwriteFiles validateNoConflictingAppends(Expression newConflictDetectionFilter) {
public OverwriteFiles conflictDetectionFilter(Expression newConflictDetectionFilter) {
Preconditions.checkArgument(newConflictDetectionFilter != null, "Conflict detection filter cannot be null");
this.conflictDetectionFilter = newConflictDetectionFilter;
return this;
}

@Override
public OverwriteFiles validateNoConflictingData() {
this.validateNewDataFiles = true;
failMissingDeletePaths();
return this;
}

@Override
public OverwriteFiles validateNoConflictingDeletes() {
this.validateNewDeleteFiles = true;
failMissingDeletePaths();
Comment on lines +111 to 119
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that both of these have failMissingDeletePaths makes me think something happened with the cherry-picks. Investigating further.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this is the way both of these functions look in master - Is this intentional?:

public OverwriteFiles validateNoConflictingData() {
this.validateNewDataFiles = true;
failMissingDeletePaths();
return this;
}
@Override
public OverwriteFiles validateNoConflictingDeletes() {
this.validateNewDeleteFiles = true;
failMissingDeletePaths();
return this;
}

Wondering if this is correct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind. This looks correct. Here's the actual function called:

protected void failMissingDeletePaths() {
filterManager.failMissingDeletePaths();
deleteFilterManager.failMissingDeletePaths();
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is intentional. This was to preserve the old behavior.

return this;
}
Expand Down Expand Up @@ -127,8 +147,32 @@ protected void validate(TableMetadata base) {
}
}

if (conflictDetectionFilter != null && base.currentSnapshot() != null) {
validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);

if (validateNewDataFiles) {
validateAddedDataFiles(base, startingSnapshotId, dataConflictDetectionFilter(), caseSensitive);
}

if (validateNewDeleteFiles) {
if (rowFilter() != Expressions.alwaysFalse()) {
Expression filter = conflictDetectionFilter != null ? conflictDetectionFilter : rowFilter();
validateNoNewDeleteFiles(base, startingSnapshotId, filter, caseSensitive);
}

if (deletedDataFiles.size() > 0) {
validateNoNewDeletesForDataFiles(
base, startingSnapshotId, conflictDetectionFilter,
deletedDataFiles, caseSensitive);
}
}
}

private Expression dataConflictDetectionFilter() {
if (conflictDetectionFilter != null) {
return conflictDetectionFilter;
} else if (rowFilter() != Expressions.alwaysFalse() && deletedDataFiles.isEmpty()) {
return rowFilter();
} else {
return Expressions.alwaysTrue();
}
}
}
29 changes: 24 additions & 5 deletions core/src/main/java/org/apache/iceberg/BaseRowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
package org.apache.iceberg;

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.CharSequenceSet;

class BaseRowDelta extends MergingSnapshotProducer<RowDelta> implements RowDelta {
private Long startingSnapshotId = null; // check all versions by default
private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
private boolean validateDeletes = false;
private Expression conflictDetectionFilter = null;
private Expression conflictDetectionFilter = Expressions.alwaysTrue();
private boolean validateNewDataFiles = false;
private boolean validateNewDeleteFiles = false;
private boolean caseSensitive = true;

BaseRowDelta(String tableName, TableOperations ops) {
Expand Down Expand Up @@ -81,23 +84,39 @@ public RowDelta validateDataFilesExist(Iterable<? extends CharSequence> referenc
}

@Override
public RowDelta validateNoConflictingAppends(Expression newConflictDetectionFilter) {
public RowDelta conflictDetectionFilter(Expression newConflictDetectionFilter) {
Preconditions.checkArgument(newConflictDetectionFilter != null, "Conflict detection filter cannot be null");
this.conflictDetectionFilter = newConflictDetectionFilter;
return this;
}

@Override
public RowDelta validateNoConflictingDataFiles() {
this.validateNewDataFiles = true;
return this;
}

@Override
public RowDelta validateNoConflictingDeleteFiles() {
this.validateNewDeleteFiles = true;
return this;
}

@Override
protected void validate(TableMetadata base) {
if (base.currentSnapshot() != null) {
if (!referencedDataFiles.isEmpty()) {
validateDataFilesExist(base, startingSnapshotId, referencedDataFiles, !validateDeletes);
validateDataFilesExist(
base, startingSnapshotId, referencedDataFiles, !validateDeletes, conflictDetectionFilter);
}

// TODO: does this need to check new delete files?
if (conflictDetectionFilter != null) {
if (validateNewDataFiles) {
validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);
}

if (validateNewDeleteFiles) {
validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);
}
}
}
}
15 changes: 15 additions & 0 deletions core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -86,6 +87,20 @@ public boolean isEmpty() {
return (globalDeletes == null || globalDeletes.length == 0) && sortedDeletesByPartition.isEmpty();
}

public Iterable<DeleteFile> referencedDeleteFiles() {
Iterable<DeleteFile> deleteFiles = Collections.emptyList();

if (globalDeletes != null) {
deleteFiles = Iterables.concat(deleteFiles, Arrays.asList(globalDeletes));
}

for (Pair<long[], DeleteFile[]> partitionDeletes : sortedDeletesByPartition.values()) {
deleteFiles = Iterables.concat(deleteFiles, Arrays.asList(partitionDeletes.second()));
}

return deleteFiles;
}

private StructLikeWrapper newWrapper(int specId) {
return StructLikeWrapper.forType(partitionTypeById.get(specId));
}
Expand Down
Loading