diff --git a/api/src/main/java/org/apache/iceberg/SnapshotAncestryValidator.java b/api/src/main/java/org/apache/iceberg/SnapshotAncestryValidator.java new file mode 100644 index 000000000000..64b579a1a377 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/SnapshotAncestryValidator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.function.Function; +import javax.annotation.Nonnull; + +/** + * Interface to support validating snapshot ancestry during the commit process. + * + *

Validation will be called after the table metadata is refreshed to pick up any changes to the + * table state. + */ +@FunctionalInterface +public interface SnapshotAncestryValidator extends Function, Boolean> { + + SnapshotAncestryValidator NON_VALIDATING = baseSnapshots -> true; + + /** + * Validate the snapshots based on the refreshed table state. + * + * @param baseSnapshots ancestry of the base table metadata snapshots + * @return boolean for whether the update is valid + */ + @Override + Boolean apply(Iterable baseSnapshots); + + /** + * Validation message that will be included when throwing {@link + * org.apache.iceberg.exceptions.ValidationException} + * + * @return message + */ + @Nonnull + default String errorMessage() { + return "error message not provided"; + } +} diff --git a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java index cc6b02dee474..73509c15384f 100644 --- a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java +++ b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java @@ -71,4 +71,9 @@ default ThisT toBranch(String branch) { "Cannot commit to branch %s: %s does not support branch commits", branch, this.getClass().getName())); } + + default ThisT validateWith(SnapshotAncestryValidator validator) { + throw new UnsupportedOperationException( + "Snapshot validation not supported by " + this.getClass().getName()); + } } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index d11f466434ec..ce02637d98d3 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -56,6 +56,7 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.metrics.CommitMetrics; import org.apache.iceberg.metrics.CommitMetricsResult; @@ -117,6 +118,8 @@ public void accept(String file) { private TableMetadata base; private boolean stageOnly = false; private Consumer deleteFunc = defaultDelete; + private SnapshotAncestryValidator snapshotAncestryValidator = + SnapshotAncestryValidator.NON_VALIDATING; private ExecutorService workerPool; private String targetBranch = SnapshotRef.MAIN_BRANCH; @@ -159,6 +162,20 @@ public ThisT scanManifestsWith(ExecutorService executorService) { return self(); } + /** + * Set a validator to check snapshot ancestry before committing changes. + * + *

If there is no parent snapshot, an empty iterable will be supplied to the validator. + * + * @param validator a validator to check snapshot ancestry validity + * @return this for method chaining + */ + @Override + public ThisT validateWith(SnapshotAncestryValidator validator) { + this.snapshotAncestryValidator = validator; + return self(); + } + protected TableOperations ops() { return ops; } @@ -257,7 +274,8 @@ public Snapshot apply() { long sequenceNumber = base.nextSequenceNumber(); Long parentSnapshotId = parentSnapshot == null ? null : parentSnapshot.snapshotId(); - validate(base, parentSnapshot); + runValidations(parentSnapshot); + List manifests = apply(base, parentSnapshot); OutputFile manifestList = manifestListPath(); @@ -327,6 +345,20 @@ public Snapshot apply() { writer.toManifestListFile().encryptionKeyID()); } + private void runValidations(Snapshot parentSnapshot) { + validate(base, parentSnapshot); + + // Validate snapshot ancestry + Iterable snapshotAncestry = + parentSnapshot != null + ? SnapshotUtil.ancestorsOf(parentSnapshot.snapshotId(), base::snapshot) + : List.of(); + + boolean valid = snapshotAncestryValidator.apply(snapshotAncestry); + ValidationException.check( + valid, "Snapshot ancestry validation failed: %s", snapshotAncestryValidator.errorMessage()); + } + protected abstract Map summary(); /** Returns the snapshot summary from the implementation and updates totals. */ diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java index c3e238e3bc93..956242f66e33 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java @@ -18,11 +18,22 @@ */ package org.apache.iceberg; +import static org.apache.iceberg.SnapshotSummary.PUBLISHED_WAP_ID_PROP; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; -public class TestSnapshotProducer { +@ExtendWith(ParameterizedTestExtension.class) +public class TestSnapshotProducer extends TestBase { @Test public void testManifestFileGroupSize() { @@ -74,4 +85,77 @@ private void assertManifestWriterCount( int writerCount = SnapshotProducer.manifestWriterCount(workerPoolSize, fileCount); assertThat(writerCount).as(errMsg).isEqualTo(expectedManifestWriterCount); } + + @TestTemplate + public void testCommitValidationPreventsCommit() throws IOException { + table.newAppend().commit(); + String validationMessage = "Validation force failed"; + + // Create a CommitValidator that will reject commits + SnapshotAncestryValidator validator = + new SnapshotAncestryValidator() { + @Override + public Boolean apply(Iterable baseSnapshots) { + return false; + } + + @Nonnull + @Override + public String errorMessage() { + return validationMessage; + } + }; + + // Test that the validator rejects commit + AppendFiles append1 = table.newAppend().validateWith(validator).appendFile(FILE_A); + assertThatThrownBy(append1::commit) + .isInstanceOf(ValidationException.class) + .hasMessage("Snapshot ancestry validation failed: " + validationMessage); + + // Verify the file was not committed + assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(0); + } + + @TestTemplate + public void testCommitValidationWithCustomSummaryProperties() throws IOException { + String wapId = "wap-12345-staging-audit"; + + // Create a validator that checks custom summary properties + SnapshotAncestryValidator customPropertyValidator = + baseSnapshots -> { + List publishedWapIds = + Streams.stream(baseSnapshots) + .filter(snapshot -> snapshot.summary().containsKey(PUBLISHED_WAP_ID_PROP)) + .map(snapshot -> snapshot.summary().get(PUBLISHED_WAP_ID_PROP)) + .collect(Collectors.toList()); + + return !publishedWapIds.contains(wapId); + }; + + // Add a file with and set a published WAP id + table + .newFastAppend() + .validateWith(customPropertyValidator) + .appendFile(FILE_A) + .set(PUBLISHED_WAP_ID_PROP, wapId) + .commit(); + + // Verify the current state of the table + assertThat(table.currentSnapshot().summary().get(PUBLISHED_WAP_ID_PROP)).isEqualTo(wapId); + + // Attempt to add the same published WAP id + AppendFiles append2 = + table + .newFastAppend() + .validateWith(customPropertyValidator) + .appendFile(FILE_A) + .set(PUBLISHED_WAP_ID_PROP, wapId); + + assertThatThrownBy(append2::commit) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Snapshot ancestry validation failed"); + + // Verify the table wasn't updated + assertThat(table.snapshots()).hasSize(1); + } }