diff --git a/api/src/main/java/org/apache/iceberg/OverwriteFiles.java b/api/src/main/java/org/apache/iceberg/OverwriteFiles.java index bd4fa17d50b3..20127abd7332 100644 --- a/api/src/main/java/org/apache/iceberg/OverwriteFiles.java +++ b/api/src/main/java/org/apache/iceberg/OverwriteFiles.java @@ -101,17 +101,6 @@ default OverwriteFiles deleteFiles( */ OverwriteFiles validateAddedFilesMatchOverwriteFilter(); - /** - * Set the snapshot ID used in any reads for this operation. - * - *

Validations will check changes after this snapshot ID. If the from snapshot is not set, all - * ancestor snapshots through the table's initial snapshot are validated. - * - * @param snapshotId a snapshot ID - * @return this for method chaining - */ - OverwriteFiles validateFromSnapshot(long snapshotId); - /** * Enables or disables case sensitive expression binding for validations that accept expressions. * diff --git a/api/src/main/java/org/apache/iceberg/ReplacePartitions.java b/api/src/main/java/org/apache/iceberg/ReplacePartitions.java index 7e8ab65304c5..c153e3fd54a2 100644 --- a/api/src/main/java/org/apache/iceberg/ReplacePartitions.java +++ b/api/src/main/java/org/apache/iceberg/ReplacePartitions.java @@ -55,22 +55,6 @@ public interface ReplacePartitions extends SnapshotUpdate { */ ReplacePartitions validateAppendOnly(); - /** - * Set the snapshot ID used in validations for this operation. - * - *

All validations will check changes after this snapshot ID. If this is not called, validation - * will occur from the beginning of the table's history. - * - *

This method should be called before this operation is committed. If a concurrent operation - * committed a data or delta file or removed a data file after the given snapshot ID that might - * contain rows matching a partition marked for deletion, validation will detect this and fail. - * - * @param snapshotId a snapshot ID, it should be set to when this operation started to read the - * table. - * @return this for method chaining - */ - ReplacePartitions validateFromSnapshot(long snapshotId); - /** * Enables validation that deletes that happened concurrently do not conflict with this commit's * operation. diff --git a/api/src/main/java/org/apache/iceberg/RewriteFiles.java b/api/src/main/java/org/apache/iceberg/RewriteFiles.java index 2ed6368905ce..f16efb7d7f5c 100644 --- a/api/src/main/java/org/apache/iceberg/RewriteFiles.java +++ b/api/src/main/java/org/apache/iceberg/RewriteFiles.java @@ -173,15 +173,4 @@ RewriteFiles rewriteFiles( Set deleteFilesToReplace, Set dataFilesToAdd, Set deleteFilesToAdd); - - /** - * Set the snapshot ID used in any reads for this operation. - * - *

Validations will check changes after this snapshot ID. If this is not called, all ancestor - * snapshots through the table's initial snapshot are validated. - * - * @param snapshotId a snapshot ID - * @return this for method chaining - */ - RewriteFiles validateFromSnapshot(long snapshotId); } diff --git a/api/src/main/java/org/apache/iceberg/RowDelta.java b/api/src/main/java/org/apache/iceberg/RowDelta.java index f5f10b70f9ce..6dffd59bbc46 100644 --- a/api/src/main/java/org/apache/iceberg/RowDelta.java +++ b/api/src/main/java/org/apache/iceberg/RowDelta.java @@ -68,17 +68,6 @@ default RowDelta removeDeletes(DeleteFile deletes) { getClass().getName() + " does not implement removeDeletes"); } - /** - * Set the snapshot ID used in any reads for this operation. - * - *

Validations will check changes after this snapshot ID. If the from snapshot is not set, all - * ancestor snapshots through the table's initial snapshot are validated. - * - * @param snapshotId a snapshot ID - * @return this for method chaining - */ - RowDelta validateFromSnapshot(long snapshotId); - /** * Enables or disables case sensitive expression binding for validations that accept expressions. * diff --git a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java index cc6b02dee474..739a8eb735f0 100644 --- a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java +++ b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java @@ -71,4 +71,32 @@ default ThisT toBranch(String branch) { "Cannot commit to branch %s: %s does not support branch commits", branch, this.getClass().getName())); } + + /** + * Enables snapshot validation with a user-provided function, which must throw a {@link + * org.apache.iceberg.exceptions.ValidationException} on validation failures. + * + *

Clients can use this method to validate summary and other metadata of parent snapshots. + * + * @param snapshotValidator a user function to validate parent snapshots + * @return this for method chaining + */ + default ThisT validateWith(Consumer snapshotValidator) { + throw new UnsupportedOperationException( + getClass().getName() + " does not implement validateWith"); + } + + /** + * Set the snapshot ID used in any reads for this operation. + * + *

Validations will check changes after this snapshot ID. If the from snapshot is not set, all + * ancestor snapshots through the table's initial snapshot are validated. + * + * @param snapshotId a snapshot ID + * @return this for method chaining + */ + default ThisT validateFromSnapshot(long snapshotId) { + throw new UnsupportedOperationException( + getClass().getName() + " does not implement validateFromSnapshot"); + } } diff --git a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java index fb8c309d0bce..7e4b8c8a553a 100644 --- a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java @@ -32,7 +32,6 @@ public class BaseOverwriteFiles extends MergingSnapshotProducer implements OverwriteFiles { private final DataFileSet deletedDataFiles = DataFileSet.create(); private boolean validateAddedFilesMatchOverwriteFilter = false; - private Long startingSnapshotId = null; private Expression conflictDetectionFilter = null; private boolean validateNewDataFiles = false; private boolean validateNewDeletes = false; @@ -98,12 +97,6 @@ public OverwriteFiles validateAddedFilesMatchOverwriteFilter() { return this; } - @Override - public OverwriteFiles validateFromSnapshot(long snapshotId) { - this.startingSnapshotId = snapshotId; - return this; - } - @Override public OverwriteFiles conflictDetectionFilter(Expression newConflictDetectionFilter) { Preconditions.checkArgument( @@ -134,6 +127,8 @@ public BaseOverwriteFiles toBranch(String branch) { @Override protected void validate(TableMetadata base, Snapshot parent) { + super.validate(base, parent); + if (validateAddedFilesMatchOverwriteFilter) { PartitionSpec spec = dataSpec(); Expression rowFilter = rowFilter(); @@ -161,19 +156,19 @@ protected void validate(TableMetadata base, Snapshot parent) { } if (validateNewDataFiles) { - validateAddedDataFiles(base, startingSnapshotId, dataConflictDetectionFilter(), parent); + validateAddedDataFiles(base, startingSnapshotId(), dataConflictDetectionFilter(), parent); } if (validateNewDeletes) { if (rowFilter() != Expressions.alwaysFalse()) { Expression filter = conflictDetectionFilter != null ? conflictDetectionFilter : rowFilter(); - validateNoNewDeleteFiles(base, startingSnapshotId, filter, parent); - validateDeletedDataFiles(base, startingSnapshotId, filter, parent); + validateNoNewDeleteFiles(base, startingSnapshotId(), filter, parent); + validateDeletedDataFiles(base, startingSnapshotId(), filter, parent); } if (!deletedDataFiles.isEmpty()) { validateNoNewDeletesForDataFiles( - base, startingSnapshotId, conflictDetectionFilter, deletedDataFiles, parent); + base, startingSnapshotId(), conflictDetectionFilter, deletedDataFiles, parent); } } } diff --git a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java index 892257b51b0c..8c3e6bf73c1b 100644 --- a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java +++ b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java @@ -27,7 +27,6 @@ public class BaseReplacePartitions extends MergingSnapshotProducer implements RewriteFiles { private final DataFileSet replacedDataFiles = DataFileSet.create(); - private Long startingSnapshotId = null; BaseRewriteFiles(String tableName, TableOperations ops) { super(tableName, ops); @@ -119,12 +118,6 @@ public RewriteFiles rewriteFiles( return this; } - @Override - public RewriteFiles validateFromSnapshot(long snapshotId) { - this.startingSnapshotId = snapshotId; - return this; - } - @Override public BaseRewriteFiles toBranch(String branch) { targetBranch(branch); @@ -133,11 +126,13 @@ public BaseRewriteFiles toBranch(String branch) { @Override protected void validate(TableMetadata base, Snapshot parent) { + super.validate(base, parent); + validateReplacedAndAddedFiles(); if (!replacedDataFiles.isEmpty()) { // if there are replaced data files, there cannot be any new row-level deletes for those data // files - validateNoNewDeletesForDataFiles(base, startingSnapshotId, replacedDataFiles, parent); + validateNoNewDeletesForDataFiles(base, startingSnapshotId(), replacedDataFiles, parent); } } diff --git a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java index b819d03dd5f8..d5a34572be0f 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java +++ b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java @@ -29,7 +29,6 @@ import org.apache.iceberg.util.SnapshotUtil; public class BaseRowDelta extends MergingSnapshotProducer implements RowDelta { - private Long startingSnapshotId = null; // check all versions by default private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); private final DataFileSet removedDataFiles = DataFileSet.create(); private boolean validateDeletes = false; @@ -80,12 +79,6 @@ public RowDelta removeDeletes(DeleteFile deletes) { return this; } - @Override - public RowDelta validateFromSnapshot(long snapshotId) { - this.startingSnapshotId = snapshotId; - return this; - } - @Override public RowDelta validateDeletedFiles() { this.validateDeletes = true; @@ -126,18 +119,20 @@ public RowDelta toBranch(String branch) { @Override protected void validate(TableMetadata base, Snapshot parent) { + super.validate(base, parent); + if (parent != null) { - if (startingSnapshotId != null) { + if (startingSnapshotId() != null) { Preconditions.checkArgument( - SnapshotUtil.isAncestorOf(parent.snapshotId(), startingSnapshotId, base::snapshot), + SnapshotUtil.isAncestorOf(parent.snapshotId(), startingSnapshotId(), base::snapshot), "Snapshot %s is not an ancestor of %s", - startingSnapshotId, + startingSnapshotId(), parent.snapshotId()); } if (!referencedDataFiles.isEmpty()) { validateDataFilesExist( base, - startingSnapshotId, + startingSnapshotId(), referencedDataFiles, !validateDeletes, conflictDetectionFilter, @@ -149,23 +144,23 @@ protected void validate(TableMetadata base, Snapshot parent) { } if (validateNewDataFiles) { - validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, parent); + validateAddedDataFiles(base, startingSnapshotId(), conflictDetectionFilter, parent); } if (validateNewDeleteFiles) { // validate that explicitly deleted files have not had added deletes if (!removedDataFiles.isEmpty()) { validateNoNewDeletesForDataFiles( - base, startingSnapshotId, conflictDetectionFilter, removedDataFiles, parent); + base, startingSnapshotId(), conflictDetectionFilter, removedDataFiles, parent); } // validate that previous deletes do not conflict with added deletes - validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter, parent); + validateNoNewDeleteFiles(base, startingSnapshotId(), conflictDetectionFilter, parent); } validateNoConflictingFileAndPositionDeletes(); - validateAddedDVs(base, startingSnapshotId, conflictDetectionFilter, parent); + validateAddedDVs(base, startingSnapshotId(), conflictDetectionFilter, parent); } } diff --git a/core/src/main/java/org/apache/iceberg/CherryPickOperation.java b/core/src/main/java/org/apache/iceberg/CherryPickOperation.java index 439524deaf24..a5c50c113cb2 100644 --- a/core/src/main/java/org/apache/iceberg/CherryPickOperation.java +++ b/core/src/main/java/org/apache/iceberg/CherryPickOperation.java @@ -156,6 +156,8 @@ public Object updateEvent() { @Override protected void validate(TableMetadata base, Snapshot snapshot) { + super.validate(base, snapshot); + // this is only called after apply() passes off to super, but check fast-forward status just in // case if (!isFastForward(base)) { diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index d11f466434ec..24b06ba7ff7f 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -48,6 +48,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.function.Consumer; import java.util.function.Function; +import javax.annotation.Nullable; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptingFileIO; import org.apache.iceberg.events.CreateSnapshotEvent; @@ -117,6 +118,8 @@ public void accept(String file) { private TableMetadata base; private boolean stageOnly = false; private Consumer deleteFunc = defaultDelete; + @Nullable private Long startingSnapshotId = null; // check all versions by default + @Nullable private Consumer snapshotValidator = null; private ExecutorService workerPool; private String targetBranch = SnapshotRef.MAIN_BRANCH; @@ -211,6 +214,23 @@ public ThisT deleteWith(Consumer deleteCallback) { return self(); } + @Override + public ThisT validateWith(Consumer validator) { + this.snapshotValidator = validator; + return self(); + } + + @Override + public ThisT validateFromSnapshot(long startSnapshotId) { + this.startingSnapshotId = startSnapshotId; + return self(); + } + + @Nullable + protected Long startingSnapshotId() { + return this.startingSnapshotId; + } + /** * Clean up any uncommitted manifests that were created. * @@ -238,7 +258,13 @@ public ThisT deleteWith(Consumer deleteCallback) { * @param currentMetadata current table metadata to validate * @param snapshot ending snapshot on the lineage which is being validated */ - protected void validate(TableMetadata currentMetadata, Snapshot snapshot) {} + protected void validate(TableMetadata currentMetadata, Snapshot snapshot) { + if (snapshotValidator != null && snapshot != null) { + SnapshotUtil.ancestorsBetween( + snapshot.snapshotId(), startingSnapshotId, currentMetadata::snapshot) + .forEach(snapshotValidator); + } + } /** * Apply the update's changes to the given metadata and snapshot. Return the new manifest list. diff --git a/core/src/main/java/org/apache/iceberg/StreamingDelete.java b/core/src/main/java/org/apache/iceberg/StreamingDelete.java index 81621164e4af..4818ebce3ed7 100644 --- a/core/src/main/java/org/apache/iceberg/StreamingDelete.java +++ b/core/src/main/java/org/apache/iceberg/StreamingDelete.java @@ -70,6 +70,8 @@ public StreamingDelete toBranch(String branch) { @Override protected void validate(TableMetadata base, Snapshot parent) { + super.validate(base, parent); + if (validateFilesToDeleteExist) { failMissingDeletePaths(); } diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java index c3e238e3bc93..ede3e0dd9e9f 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java @@ -19,10 +19,19 @@ package org.apache.iceberg; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; -public class TestSnapshotProducer { +public class TestSnapshotProducer extends TestBase { @Test public void testManifestFileGroupSize() { @@ -69,6 +78,95 @@ public void testManifestFileGroupSize() { "Must limit parallelism to avoid tiny manifests"); } + @TestTemplate + void testCommitValidationFailsOnExistingWapIdInSnapshotHistory() { + String stagedWapId = "12345"; + table + .newFastAppend() + .appendFile(FILE_A) + .set(SnapshotSummary.STAGED_WAP_ID_PROP, stagedWapId) + .commit(); + + String validationExceptionMessage = + String.format("Duplicate %s: %s", SnapshotSummary.STAGED_WAP_ID_PROP, stagedWapId); + TestingSnapshotProducer producer = + new TestingSnapshotProducer(table.ops()) + .validateWith(new WapIdValidator(stagedWapId, validationExceptionMessage)); + + assertThatThrownBy(producer::commit) + .isInstanceOf(ValidationException.class) + .hasMessage(validationExceptionMessage); + } + + @TestTemplate + void testCommitValidationStartsFromConfiguredSnapshot() { + String stagedWapId = "12345"; + table + .newFastAppend() + .appendFile(TestBase.FILE_A) + .set(SnapshotSummary.STAGED_WAP_ID_PROP, stagedWapId) + .commit(); + + TestingSnapshotProducer producer = + new TestingSnapshotProducer(table.ops()) + .validateFromSnapshot(table.currentSnapshot().snapshotId()) + .validateWith(new WapIdValidator(stagedWapId, "empty")); + + assertThatNoException().isThrownBy(producer::commit); + assertThat(Iterables.size(table.snapshots())).isEqualTo(2); + } + + private static class WapIdValidator implements Consumer { + private final String stagedWapId; + private final String validationErrorMessage; + + private WapIdValidator(String stagedWapId, String validationErrorMessage) { + this.stagedWapId = stagedWapId; + this.validationErrorMessage = validationErrorMessage; + } + + @Override + public void accept(Snapshot snapshot) { + if (stagedWapId.equals(snapshot.summary().get(SnapshotSummary.STAGED_WAP_ID_PROP))) { + throw new ValidationException(validationErrorMessage); + } + } + } + + private static class TestingSnapshotProducer extends SnapshotProducer { + private TestingSnapshotProducer(TableOperations ops) { + super(ops); + } + + @Override + protected TestingSnapshotProducer self() { + return this; + } + + @Override + protected void cleanUncommitted(Set committed) {} + + @Override + protected String operation() { + return ""; + } + + @Override + protected List apply(TableMetadata metadataToUpdate, Snapshot snapshot) { + return List.of(); + } + + @Override + protected Map summary() { + return Map.of(); + } + + @Override + public TestingSnapshotProducer set(String property, String value) { + return null; + } + } + private void assertManifestWriterCount( int workerPoolSize, int fileCount, int expectedManifestWriterCount, String errMsg) { int writerCount = SnapshotProducer.manifestWriterCount(workerPoolSize, fileCount);