11 changes: 0 additions & 11 deletions api/src/main/java/org/apache/iceberg/OverwriteFiles.java
@@ -101,17 +101,6 @@ default OverwriteFiles deleteFiles(
*/
OverwriteFiles validateAddedFilesMatchOverwriteFilter();

/**
* Set the snapshot ID used in any reads for this operation.
*
* <p>Validations will check changes after this snapshot ID. If the from snapshot is not set, all
* ancestor snapshots through the table's initial snapshot are validated.
*
* @param snapshotId a snapshot ID
* @return this for method chaining
*/
OverwriteFiles validateFromSnapshot(long snapshotId);

/**
* Enables or disables case sensitive expression binding for validations that accept expressions.
*
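For context, a hedged caller-side sketch of an OverwriteFiles commit that still chains validateFromSnapshot (now inherited from SnapshotUpdate) together with an expression-based conflict filter, matching what the deleted javadoc above describes; the `table`, `dataFile`, and `"day"` column are assumptions for illustration, not from this diff:

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

class OverwriteFilesSketch {
  // Overwrite one day of data, validating against any snapshot committed after
  // the snapshot this writer read from.
  static void overwriteDay(Table table, DataFile dataFile) {
    long readSnapshotId = table.currentSnapshot().snapshotId(); // assumes a non-empty table
    Expression dayFilter = Expressions.equal("day", "2024-01-01");

    table
        .newOverwrite()
        .overwriteByRowFilter(dayFilter)
        .addFile(dataFile)
        .validateFromSnapshot(readSnapshotId) // moved up to SnapshotUpdate by this change
        .conflictDetectionFilter(dayFilter)   // only conflicts in this row range matter
        .validateNoConflictingData()
        .validateNoConflictingDeletes()
        .commit();
  }
}
```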
16 changes: 0 additions & 16 deletions api/src/main/java/org/apache/iceberg/ReplacePartitions.java
@@ -55,22 +55,6 @@ public interface ReplacePartitions extends SnapshotUpdate<ReplacePartitions> {
*/
ReplacePartitions validateAppendOnly();

/**
* Set the snapshot ID used in validations for this operation.
*
* <p>All validations will check changes after this snapshot ID. If this is not called, validation
* will occur from the beginning of the table's history.
*
* <p>This method should be called before this operation is committed. If a concurrent operation
* committed a data or delta file or removed a data file after the given snapshot ID that might
* contain rows matching a partition marked for deletion, validation will detect this and fail.
*
* @param snapshotId a snapshot ID, it should be set to when this operation started to read the
* table.
* @return this for method chaining
*/
ReplacePartitions validateFromSnapshot(long snapshotId);

/**
* Enables validation that deletes that happened concurrently do not conflict with this commit's
* operation.
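Similarly, a hedged sketch of a dynamic partition overwrite that keeps the behaviour the removed javadoc describes: files added or removed concurrently after the given snapshot ID in the replaced partitions fail validation. `table` and `dataFile` are assumed to exist and are illustrative only:

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

class ReplacePartitionsSketch {
  // Replace the partitions covered by dataFile, rejecting concurrent changes to
  // those partitions committed after readSnapshotId.
  static void replacePartitions(Table table, DataFile dataFile) {
    long readSnapshotId = table.currentSnapshot().snapshotId(); // assumes a non-empty table

    table
        .newReplacePartitions()
        .addFile(dataFile)
        .validateFromSnapshot(readSnapshotId) // inherited from SnapshotUpdate after this change
        .validateNoConflictingData()          // concurrent appends to replaced partitions fail
        .validateNoConflictingDeletes()       // concurrent deletes in replaced partitions fail
        .commit();
  }
}
```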
11 changes: 0 additions & 11 deletions api/src/main/java/org/apache/iceberg/RewriteFiles.java
@@ -173,15 +173,4 @@ RewriteFiles rewriteFiles(
Set<DeleteFile> deleteFilesToReplace,
Set<DataFile> dataFilesToAdd,
Set<DeleteFile> deleteFilesToAdd);

/**
* Set the snapshot ID used in any reads for this operation.
*
* <p>Validations will check changes after this snapshot ID. If this is not called, all ancestor
* snapshots through the table's initial snapshot are validated.
*
* @param snapshotId a snapshot ID
* @return this for method chaining
*/
RewriteFiles validateFromSnapshot(long snapshotId);
}
11 changes: 0 additions & 11 deletions api/src/main/java/org/apache/iceberg/RowDelta.java
@@ -68,17 +68,6 @@ default RowDelta removeDeletes(DeleteFile deletes) {
getClass().getName() + " does not implement removeDeletes");
}

/**
* Set the snapshot ID used in any reads for this operation.
*
* <p>Validations will check changes after this snapshot ID. If the from snapshot is not set, all
* ancestor snapshots through the table's initial snapshot are validated.
*
* @param snapshotId a snapshot ID
* @return this for method chaining
*/
RowDelta validateFromSnapshot(long snapshotId);

/**
* Enables or disables case sensitive expression binding for validations that accept expressions.
*
28 changes: 28 additions & 0 deletions api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
@@ -71,4 +71,32 @@ default ThisT toBranch(String branch) {
"Cannot commit to branch %s: %s does not support branch commits",
branch, this.getClass().getName()));
}

/**
* Enables snapshot validation with a user-provided function, which must throw a {@link
* org.apache.iceberg.exceptions.ValidationException} on validation failures.
Contributor comment: I think this should document how the validator is called. Is the latest snapshot first or last? I think that affects how validations are written so it seems important.
*
* <p>Clients can use this method to validate summary and other metadata of parent snapshots.
*
* @param snapshotValidator a user function to validate parent snapshots
* @return this for method chaining
*/
default ThisT validateWith(Consumer<Snapshot> snapshotValidator) {
Contributor comment: I think the use of the ancestor iterable is a more elegant solution than making a large refactor and calling the validator in a loop.
throw new UnsupportedOperationException(
getClass().getName() + " does not implement validateWith");
}

/**
* Set the snapshot ID used in any reads for this operation.
*
* <p>Validations will check changes after this snapshot ID. If the from snapshot is not set, all
* ancestor snapshots through the table's initial snapshot are validated.
*
* @param snapshotId a snapshot ID
* @return this for method chaining
*/
default ThisT validateFromSnapshot(long snapshotId) {
throw new UnsupportedOperationException(
getClass().getName() + " does not implement validateFromSnapshot");
}
}
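To make the two new hooks concrete, here is a hedged usage sketch against the API proposed in this diff, using RowDelta as the operation. The validator arbitrarily rejects concurrent replace snapshots, and the surrounding names (`table`, `newDataFile`) are illustrative assumptions:

```java
import java.util.function.Consumer;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;

class ValidateWithSketch {
  static void commitWithCustomValidation(Table table, DataFile newDataFile) {
    // Snapshot this writer based its work on; validations only inspect newer parents.
    long readSnapshotId = table.currentSnapshot().snapshotId();

    // Per-snapshot check: throws ValidationException to abort the commit.
    Consumer<Snapshot> rejectReplaces =
        snapshot ->
            ValidationException.check(
                !"replace".equals(snapshot.operation()),
                "Found conflicting %s snapshot %s",
                snapshot.operation(),
                snapshot.snapshotId());

    table
        .newRowDelta()
        .addRows(newDataFile)
        .validateFromSnapshot(readSnapshotId) // limit validation to snapshots after this ID
        .validateWith(rejectReplaces)         // user-provided per-snapshot check
        .commit();
  }
}
```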
17 changes: 6 additions & 11 deletions core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
@@ -32,7 +32,6 @@ public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
implements OverwriteFiles {
private final DataFileSet deletedDataFiles = DataFileSet.create();
private boolean validateAddedFilesMatchOverwriteFilter = false;
private Long startingSnapshotId = null;
private Expression conflictDetectionFilter = null;
private boolean validateNewDataFiles = false;
private boolean validateNewDeletes = false;
@@ -98,12 +97,6 @@ public OverwriteFiles validateAddedFilesMatchOverwriteFilter() {
return this;
}

@Override
public OverwriteFiles validateFromSnapshot(long snapshotId) {
this.startingSnapshotId = snapshotId;
return this;
}

@Override
public OverwriteFiles conflictDetectionFilter(Expression newConflictDetectionFilter) {
Preconditions.checkArgument(
@@ -134,6 +127,8 @@ public BaseOverwriteFiles toBranch(String branch) {

@Override
protected void validate(TableMetadata base, Snapshot parent) {
super.validate(base, parent);

if (validateAddedFilesMatchOverwriteFilter) {
PartitionSpec spec = dataSpec();
Expression rowFilter = rowFilter();
@@ -161,19 +156,19 @@ protected void validate(TableMetadata base, Snapshot parent) {
}

if (validateNewDataFiles) {
validateAddedDataFiles(base, startingSnapshotId, dataConflictDetectionFilter(), parent);
validateAddedDataFiles(base, startingSnapshotId(), dataConflictDetectionFilter(), parent);
}

if (validateNewDeletes) {
if (rowFilter() != Expressions.alwaysFalse()) {
Expression filter = conflictDetectionFilter != null ? conflictDetectionFilter : rowFilter();
validateNoNewDeleteFiles(base, startingSnapshotId, filter, parent);
validateDeletedDataFiles(base, startingSnapshotId, filter, parent);
validateNoNewDeleteFiles(base, startingSnapshotId(), filter, parent);
validateDeletedDataFiles(base, startingSnapshotId(), filter, parent);
}

if (!deletedDataFiles.isEmpty()) {
validateNoNewDeletesForDataFiles(
base, startingSnapshotId, conflictDetectionFilter, deletedDataFiles, parent);
base, startingSnapshotId(), conflictDetectionFilter, deletedDataFiles, parent);
}
}
}
21 changes: 8 additions & 13 deletions core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
@@ -27,7 +27,6 @@ public class BaseReplacePartitions extends MergingSnapshotProducer<ReplacePartit
implements ReplacePartitions {

private final PartitionSet replacedPartitions;
private Long startingSnapshotId;
private boolean validateConflictingData = false;
private boolean validateConflictingDeletes = false;

@@ -61,12 +60,6 @@ public ReplacePartitions validateAppendOnly() {
return this;
}

@Override
public ReplacePartitions validateFromSnapshot(long newStartingSnapshotId) {
this.startingSnapshotId = newStartingSnapshotId;
return this;
}

@Override
public ReplacePartitions validateNoConflictingDeletes() {
this.validateConflictingDeletes = true;
@@ -87,24 +80,26 @@ public BaseReplacePartitions toBranch(String branch) {

@Override
public void validate(TableMetadata currentMetadata, Snapshot parent) {
super.validate(currentMetadata, parent);

if (validateConflictingData) {
if (dataSpec().isUnpartitioned()) {
validateAddedDataFiles(
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
currentMetadata, startingSnapshotId(), Expressions.alwaysTrue(), parent);
Contributor comment: This is a good example of what I mean in the comment above. There are quite a few changes here (and in other classes) to refactor so that startingSnapshotId can be reused.
} else {
validateAddedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent);
validateAddedDataFiles(currentMetadata, startingSnapshotId(), replacedPartitions, parent);
}
}

if (validateConflictingDeletes) {
if (dataSpec().isUnpartitioned()) {
validateDeletedDataFiles(
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
currentMetadata, startingSnapshotId(), Expressions.alwaysTrue(), parent);
validateNoNewDeleteFiles(
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
currentMetadata, startingSnapshotId(), Expressions.alwaysTrue(), parent);
} else {
validateDeletedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent);
validateNoNewDeleteFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent);
validateDeletedDataFiles(currentMetadata, startingSnapshotId(), replacedPartitions, parent);
validateNoNewDeleteFiles(currentMetadata, startingSnapshotId(), replacedPartitions, parent);
}
}
}
11 changes: 3 additions & 8 deletions core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
@@ -25,7 +25,6 @@

class BaseRewriteFiles extends MergingSnapshotProducer<RewriteFiles> implements RewriteFiles {
private final DataFileSet replacedDataFiles = DataFileSet.create();
private Long startingSnapshotId = null;

BaseRewriteFiles(String tableName, TableOperations ops) {
super(tableName, ops);
Expand Down Expand Up @@ -119,12 +118,6 @@ public RewriteFiles rewriteFiles(
return this;
}

@Override
public RewriteFiles validateFromSnapshot(long snapshotId) {
this.startingSnapshotId = snapshotId;
return this;
}

@Override
public BaseRewriteFiles toBranch(String branch) {
targetBranch(branch);
@@ -133,11 +126,13 @@ public BaseRewriteFiles toBranch(String branch) {

@Override
protected void validate(TableMetadata base, Snapshot parent) {
super.validate(base, parent);

validateReplacedAndAddedFiles();
if (!replacedDataFiles.isEmpty()) {
// if there are replaced data files, there cannot be any new row-level deletes for those data
// files
validateNoNewDeletesForDataFiles(base, startingSnapshotId, replacedDataFiles, parent);
validateNoNewDeletesForDataFiles(base, startingSnapshotId(), replacedDataFiles, parent);
}
}

25 changes: 10 additions & 15 deletions core/src/main/java/org/apache/iceberg/BaseRowDelta.java
@@ -29,7 +29,6 @@
import org.apache.iceberg.util.SnapshotUtil;

public class BaseRowDelta extends MergingSnapshotProducer<RowDelta> implements RowDelta {
private Long startingSnapshotId = null; // check all versions by default
private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
private final DataFileSet removedDataFiles = DataFileSet.create();
private boolean validateDeletes = false;
@@ -80,12 +79,6 @@ public RowDelta removeDeletes(DeleteFile deletes) {
return this;
}

@Override
public RowDelta validateFromSnapshot(long snapshotId) {
this.startingSnapshotId = snapshotId;
return this;
}

@Override
public RowDelta validateDeletedFiles() {
this.validateDeletes = true;
@@ -126,18 +119,20 @@ public RowDelta toBranch(String branch) {

@Override
protected void validate(TableMetadata base, Snapshot parent) {
super.validate(base, parent);

if (parent != null) {
if (startingSnapshotId != null) {
if (startingSnapshotId() != null) {
Preconditions.checkArgument(
SnapshotUtil.isAncestorOf(parent.snapshotId(), startingSnapshotId, base::snapshot),
SnapshotUtil.isAncestorOf(parent.snapshotId(), startingSnapshotId(), base::snapshot),
"Snapshot %s is not an ancestor of %s",
startingSnapshotId,
startingSnapshotId(),
parent.snapshotId());
}
if (!referencedDataFiles.isEmpty()) {
validateDataFilesExist(
base,
startingSnapshotId,
startingSnapshotId(),
referencedDataFiles,
!validateDeletes,
conflictDetectionFilter,
@@ -149,23 +144,23 @@ protected void validate(TableMetadata base, Snapshot parent) {
}

if (validateNewDataFiles) {
validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, parent);
validateAddedDataFiles(base, startingSnapshotId(), conflictDetectionFilter, parent);
}

if (validateNewDeleteFiles) {
// validate that explicitly deleted files have not had added deletes
if (!removedDataFiles.isEmpty()) {
validateNoNewDeletesForDataFiles(
base, startingSnapshotId, conflictDetectionFilter, removedDataFiles, parent);
base, startingSnapshotId(), conflictDetectionFilter, removedDataFiles, parent);
}

// validate that previous deletes do not conflict with added deletes
validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter, parent);
validateNoNewDeleteFiles(base, startingSnapshotId(), conflictDetectionFilter, parent);
}

validateNoConflictingFileAndPositionDeletes();

validateAddedDVs(base, startingSnapshotId, conflictDetectionFilter, parent);
validateAddedDVs(base, startingSnapshotId(), conflictDetectionFilter, parent);
}
}

@@ -156,6 +156,8 @@ public Object updateEvent() {

@Override
protected void validate(TableMetadata base, Snapshot snapshot) {
super.validate(base, snapshot);

// this is only called after apply() passes off to super, but check fast-forward status just in
// case
if (!isFastForward(base)) {
28 changes: 27 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -48,6 +48,7 @@
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptingFileIO;
import org.apache.iceberg.events.CreateSnapshotEvent;
Expand Down Expand Up @@ -117,6 +118,8 @@ public void accept(String file) {
private TableMetadata base;
private boolean stageOnly = false;
private Consumer<String> deleteFunc = defaultDelete;
@Nullable private Long startingSnapshotId = null; // check all versions by default
Contributor comment: I like the original PR's approach to iterate backward through history rather than this approach that iterates through more by default. I also like that the other approach can reuse work by running the validation once with a loop, rather than calling the validation in a loop.
Author reply: Both the original PR and this approach iterate backward; the original PR loops through the whole history for every validation run, while this PR only checks new parent Snapshots. And on reusing work, as I said above, the other approach runs validations for all parent snapshots in a loop, whereas this PR limits the scope of validations to new Snapshots.
@Nullable private Consumer<Snapshot> snapshotValidator = null;
Contributor comment: As I commented on the original PR, we don't want to leave the exception class to the implementation. A validation should return a boolean to indicate that something was wrong. That way, the exception class is uniform (ValidationException) and acceptance is explicit.
private ExecutorService workerPool;
private String targetBranch = SnapshotRef.MAIN_BRANCH;
@@ -211,6 +214,23 @@ public ThisT deleteWith(Consumer<String> deleteCallback) {
return self();
}

@Override
public ThisT validateWith(Consumer<Snapshot> validator) {
this.snapshotValidator = validator;
return self();
}

@Override
public ThisT validateFromSnapshot(long startSnapshotId) {
this.startingSnapshotId = startSnapshotId;
return self();
}

@Nullable
protected Long startingSnapshotId() {
return this.startingSnapshotId;
}

/**
* Clean up any uncommitted manifests that were created.
*
@@ -238,7 +258,13 @@ public ThisT deleteWith(Consumer<String> deleteCallback) {
* @param currentMetadata current table metadata to validate
* @param snapshot ending snapshot on the lineage which is being validated
*/
protected void validate(TableMetadata currentMetadata, Snapshot snapshot) {}
protected void validate(TableMetadata currentMetadata, Snapshot snapshot) {
if (snapshotValidator != null && snapshot != null) {
SnapshotUtil.ancestorsBetween(
snapshot.snapshotId(), startingSnapshotId, currentMetadata::snapshot)
.forEach(snapshotValidator);
}
}

/**
* Apply the update's changes to the given metadata and snapshot. Return the new manifest list.
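As a reading aid for the ordering question raised above, the new default validate() roughly expands to the loop below. This assumes SnapshotUtil.ancestorsBetween walks parent links starting from the commit's parent, so the newest ancestor is visited first and traversal stops before (and excludes) startingSnapshotId; the delete-snapshot check is an arbitrary example validator, not part of this diff:

```java
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.util.SnapshotUtil;

class AncestorValidationSketch {
  // Roughly what the forEach above does: newest parent first, back to (but
  // excluding) startingSnapshotId; a null startingSnapshotId keeps walking
  // through the table's first snapshot.
  static void validateAncestors(
      TableMetadata currentMetadata, Snapshot parent, Long startingSnapshotId) {
    for (Snapshot ancestor :
        SnapshotUtil.ancestorsBetween(
            parent.snapshotId(), startingSnapshotId, currentMetadata::snapshot)) {
      if ("delete".equals(ancestor.operation())) {
        throw new ValidationException(
            "Found conflicting delete snapshot %s", ancestor.snapshotId());
      }
    }
  }
}
```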
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/StreamingDelete.java
@@ -70,6 +70,8 @@ public StreamingDelete toBranch(String branch) {

@Override
protected void validate(TableMetadata base, Snapshot parent) {
super.validate(base, parent);
Contributor comment: I considered whether it is a good idea to override validate and concluded that I wouldn't recommend this option. Part of the problem is that it needlessly touches a lot of files, but it is also more brittle than adding a call because we may miss an implementation. And there are some custom operations out there that extend Iceberg classes that this would break.
if (validateFilesToDeleteExist) {
failMissingDeletePaths();
}
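One possible reading of that last concern, sketched as a purely hypothetical restructuring that is not part of this diff: let the base producer own the user hook and always run it before delegating to a subclass-specific method, so no override can accidentally skip it. All class and method names below are invented for illustration:

```java
import java.util.function.Consumer;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.util.SnapshotUtil;

// Hypothetical shape of the alternative: the base producer runs the user-provided
// validator itself, then delegates to the subclass-specific checks.
abstract class ValidatingProducerSketch {
  private Consumer<Snapshot> snapshotValidator; // set via validateWith(...)
  private Long startingSnapshotId;              // set via validateFromSnapshot(...)

  // Framework entry point: subclasses cannot skip the user validator because
  // they override validateConflicts(...) instead of this method.
  final void runValidations(TableMetadata currentMetadata, Snapshot parent) {
    if (snapshotValidator != null && parent != null) {
      SnapshotUtil.ancestorsBetween(
              parent.snapshotId(), startingSnapshotId, currentMetadata::snapshot)
          .forEach(snapshotValidator);
    }
    validateConflicts(currentMetadata, parent);
  }

  // Subclass-specific conflict detection (what validate(...) does today).
  protected abstract void validateConflicts(TableMetadata currentMetadata, Snapshot parent);
}
```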