diff --git a/api/src/main/java/org/apache/iceberg/PendingUpdate.java b/api/src/main/java/org/apache/iceberg/PendingUpdate.java index f47b98238de0..6de3432f6157 100644 --- a/api/src/main/java/org/apache/iceberg/PendingUpdate.java +++ b/api/src/main/java/org/apache/iceberg/PendingUpdate.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg; +import java.util.List; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.ValidationException; @@ -54,6 +55,26 @@ public interface PendingUpdate { */ void commit(); + /** + * Apply the pending changes, validate the current version of the table, and commit. + * + *

Changes are committed by calling the underlying table's commit method. + * + *

Once the commit is successful, the updated table will be refreshed. + * + * @param validations A list of {@link Validation} which will be used to test whether it is safe + * to commit the pending changes to the current version of the table at commit time. + * @throws ValidationException If the update cannot be applied to the current table metadata. + * @throws UnsupportedOperationException If any of the supplied validations attempt to modify the + * table it is given. + * @throws CommitFailedException If the update cannot be committed due to conflicts. + * @throws CommitStateUnknownException If the update success or failure is unknown, no cleanup + * should be done in this case. + */ + default void commitIf(List validations) { + throw new UnsupportedOperationException(); + } + /** * Generates update event to notify about metadata changes * diff --git a/api/src/main/java/org/apache/iceberg/Validation.java b/api/src/main/java/org/apache/iceberg/Validation.java new file mode 100644 index 000000000000..85c4448f15de --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/Validation.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import com.google.errorprone.annotations.FormatMethod; +import java.util.function.Predicate; +import org.apache.iceberg.exceptions.ValidationException; + +public class Validation { + private final Predicate predicate; + private final String message; + private final Object[] args; + + /** + * @param predicate The predicate the table needs to satisfy. + * @param message The message that will be included in the {@link ValidationException} that will + * be thrown by {@link Validation#validate} if the predicate is not satisfied. + * @param args The arguments referenced by the format specifiers in the message, if any. + */ + @FormatMethod + public Validation(Predicate
predicate, String message, Object... args) { + this.predicate = predicate; + this.message = message; + this.args = args; + } + + /** + * Ensures that the given table is valid according to the predicate. + * + * @param table The table to test. + * @throws ValidationException If the predicate is not satisfied by the given table. + */ + @SuppressWarnings("FormatStringAnnotation") + public void validate(Table table) { + ValidationException.check(predicate.test(table), message, args); + } +} diff --git a/core/src/main/java/org/apache/iceberg/BasePendingUpdate.java b/core/src/main/java/org/apache/iceberg/BasePendingUpdate.java new file mode 100644 index 000000000000..bfab91d32fa2 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/BasePendingUpdate.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +abstract class BasePendingUpdate implements PendingUpdate { + private final List pendingValidations = Lists.newArrayList(); + + @Override + public void commitIf(List validations) { + this.pendingValidations.addAll(validations); + commit(); + } + + protected final void validate(TableMetadata base) { + Table currentTable = new BaseTable(new StaticTableOperations(base), null); + this.pendingValidations.forEach(validation -> validation.validate(currentTable)); + } +} diff --git a/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java b/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java index 2311c1b017d9..0bdeedee3fe9 100644 --- a/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java +++ b/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java @@ -31,7 +31,7 @@ import org.apache.iceberg.expressions.Term; import org.apache.iceberg.util.Tasks; -public class BaseReplaceSortOrder implements ReplaceSortOrder { +public class BaseReplaceSortOrder extends BasePendingUpdate implements ReplaceSortOrder { private final TableOperations ops; private final SortOrder.Builder builder; private TableMetadata base; @@ -62,6 +62,7 @@ public void commit() { this.base = ops.refresh(); SortOrder newOrder = apply(); TableMetadata updated = base.replaceSortOrder(newOrder); + validate(base); taskOps.commit(base, updated); }); } diff --git a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java index c69f6f3844f9..21781f86a01b 100644 --- a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java +++ b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java @@ -41,7 +41,8 @@ import org.apache.iceberg.types.Type; import org.apache.iceberg.util.Pair; -class BaseUpdatePartitionSpec implements UpdatePartitionSpec { +class 
BaseUpdatePartitionSpec extends BasePendingUpdate + implements UpdatePartitionSpec { private final TableOperations ops; private final TableMetadata base; private final int formatVersion; @@ -342,6 +343,7 @@ public void commit() { } else { update = base.addPartitionSpec(apply()); } + validate(base); ops.commit(base, update); } diff --git a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java index 35338a689205..20868d290c68 100644 --- a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java +++ b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java @@ -35,7 +35,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.Tasks; -class PropertiesUpdate implements UpdateProperties { +class PropertiesUpdate extends BasePendingUpdate> implements UpdateProperties { private final TableOperations ops; private final Map updates = Maps.newHashMap(); private final Set removals = Sets.newHashSet(); @@ -110,6 +110,7 @@ public void commit() { taskOps -> { Map newProperties = apply(); TableMetadata updated = base.replaceProperties(newProperties); + validate(base); taskOps.commit(base, updated); }); } diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index 7558ea7d8a3e..0cdd20c8e39a 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings("UnnecessaryAnonymousClass") -class RemoveSnapshots implements ExpireSnapshots { +class RemoveSnapshots extends BasePendingUpdate> implements ExpireSnapshots { private static final Logger LOG = LoggerFactory.getLogger(RemoveSnapshots.class); // Creates an executor service that runs each task in the thread that invokes execute/submit. 
@@ -306,6 +306,7 @@ public void commit() { .run( item -> { TableMetadata updated = internalApply(); + validate(base); ops.commit(base, updated); }); LOG.info("Committed snapshot changes"); diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java index 069097778606..23662b281eed 100644 --- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java +++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java @@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory; /** Schema evolution API implementation. */ -class SchemaUpdate implements UpdateSchema { +class SchemaUpdate extends BasePendingUpdate implements UpdateSchema { private static final Logger LOG = LoggerFactory.getLogger(SchemaUpdate.class); private static final int TABLE_ROOT_ID = -1; @@ -445,6 +445,7 @@ public Schema apply() { @Override public void commit() { TableMetadata update = applyChangesToMetadata(base.updateSchema(apply(), lastColumnId)); + validate(base); ops.commit(base, update); } diff --git a/core/src/main/java/org/apache/iceberg/SetLocation.java b/core/src/main/java/org/apache/iceberg/SetLocation.java index 148e4b8bc8be..7efcf2059339 100644 --- a/core/src/main/java/org/apache/iceberg/SetLocation.java +++ b/core/src/main/java/org/apache/iceberg/SetLocation.java @@ -30,7 +30,7 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.util.Tasks; -public class SetLocation implements UpdateLocation { +public class SetLocation extends BasePendingUpdate implements UpdateLocation { private final TableOperations ops; private String newLocation; @@ -61,6 +61,10 @@ public void commit() { base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) - .run(taskOps -> taskOps.commit(base, base.updateLocation(newLocation))); + .run( + taskOps -> { + validate(base); + taskOps.commit(base, base.updateLocation(newLocation)); + }); 
} } diff --git a/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java b/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java index 0f80b4e1f233..367a3f498278 100644 --- a/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java +++ b/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java @@ -40,7 +40,7 @@ *

This update is not exposed though the Table API. Instead, it is a package-private part of the * Transaction API intended for use in {@link ManageSnapshots}. */ -class SetSnapshotOperation implements PendingUpdate { +class SetSnapshotOperation extends BasePendingUpdate implements PendingUpdate { private final TableOperations ops; private TableMetadata base; @@ -123,6 +123,8 @@ public void commit() { .setBranchSnapshot(snapshot.snapshotId(), SnapshotRef.MAIN_BRANCH) .build(); + validate(base); + // Do commit this operation even if the metadata has not changed, as we need to // advance the hasLastOpCommited for the transaction's commit to work properly. // (Without any other operations in the transaction, the commitTransaction() call diff --git a/core/src/main/java/org/apache/iceberg/SetStatistics.java b/core/src/main/java/org/apache/iceberg/SetStatistics.java index 41c7254d6cdc..333617147a59 100644 --- a/core/src/main/java/org/apache/iceberg/SetStatistics.java +++ b/core/src/main/java/org/apache/iceberg/SetStatistics.java @@ -24,7 +24,8 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -public class SetStatistics implements UpdateStatistics { +public class SetStatistics extends BasePendingUpdate> + implements UpdateStatistics { private final TableOperations ops; private final Map> statisticsToSet = Maps.newHashMap(); @@ -54,6 +55,7 @@ public List apply() { public void commit() { TableMetadata base = ops.current(); TableMetadata newMetadata = internalApply(base); + validate(base); ops.commit(base, newMetadata); } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotManager.java b/core/src/main/java/org/apache/iceberg/SnapshotManager.java index bb7ca4b11c11..54dc4349dcb6 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotManager.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotManager.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg; +import 
java.util.List; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public class SnapshotManager implements ManageSnapshots { @@ -181,4 +182,12 @@ public void commit() { transaction.commitTransaction(); } } + + @Override + public void commitIf(List validations) { + commitIfRefUpdatesExist(); + // Add a no-op UpdateProperties to add given validations to transaction + transaction.updateProperties().commitIf(validations); + commit(); + } } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index f750e88e86d9..7dbacfa352d1 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -79,7 +79,8 @@ import org.slf4j.LoggerFactory; @SuppressWarnings("UnnecessaryAnonymousClass") -abstract class SnapshotProducer implements SnapshotUpdate { +abstract class SnapshotProducer extends BasePendingUpdate + implements SnapshotUpdate { private static final Logger LOG = LoggerFactory.getLogger(SnapshotProducer.class); static final int MIN_FILE_GROUP_SIZE = 10_000; static final Set EMPTY_SET = Sets.newHashSet(); @@ -405,6 +406,7 @@ public void commit() { } TableMetadata updated = update.build(); + validate(base); if (updated.changes().isEmpty()) { // do not commit if the metadata has not changed. 
for example, this may happen // when setting the current diff --git a/core/src/main/java/org/apache/iceberg/StaticTableOperations.java b/core/src/main/java/org/apache/iceberg/StaticTableOperations.java index 77ee0920edc9..2595e2f92ee1 100644 --- a/core/src/main/java/org/apache/iceberg/StaticTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/StaticTableOperations.java @@ -37,6 +37,13 @@ public StaticTableOperations(String metadataFileLocation, FileIO io) { this(metadataFileLocation, io, null); } + public StaticTableOperations(TableMetadata metadata) { + this.metadataFileLocation = metadata.metadataFileLocation(); + this.staticMetadata = metadata; + this.io = null; + this.locationProvider = null; + } + public StaticTableOperations( String metadataFileLocation, FileIO io, LocationProvider locationProvider) { this.io = io; diff --git a/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java b/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java index f7ccea747a60..991da7cfa433 100644 --- a/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java +++ b/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java @@ -27,7 +27,8 @@ * ToDo: Add SetSnapshotOperation operations such as setCurrentSnapshot, rollBackTime, rollbackTo to * this class so that we can support those operations for refs. 
*/ -class UpdateSnapshotReferencesOperation implements PendingUpdate> { +class UpdateSnapshotReferencesOperation extends BasePendingUpdate> + implements PendingUpdate> { private final TableOperations ops; private final Map updatedRefs; @@ -47,6 +48,7 @@ public Map apply() { @Override public void commit() { TableMetadata updated = internalApply(); + validate(base); ops.commit(base, updated); } diff --git a/core/src/test/java/org/apache/iceberg/TestCustomValidations.java b/core/src/test/java/org/apache/iceberg/TestCustomValidations.java new file mode 100644 index 000000000000..470a77b11584 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestCustomValidations.java @@ -0,0 +1,1425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.File; +import java.util.Objects; +import java.util.Set; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestCustomValidations extends V2TableTestBase { + + private final Validation alwaysPassValidation = + new Validation(currentTable -> true, "Always pass."); + + private final String alwaysFailMessage = "Always fail."; + private final Validation alwaysFailValidation = + new Validation(currentTable -> false, alwaysFailMessage); + + private final String watermarkKey = "watermark"; + + private void setWatermarkProperty(Table table, int watermarkValue) { + table.updateProperties().set(watermarkKey, Integer.toString(watermarkValue)).commit(); + } + + private final String watermarkFailMessagePattern = + "Current watermark value not equal to expected value=%s"; + + private Validation watermarkValidation(int expectedValue) { + return new Validation( + currentTable -> + Objects.equals( + currentTable.properties().get(watermarkKey), 
Integer.toString(expectedValue)), + watermarkFailMessagePattern, + expectedValue); + } + + private final Validation illegalValidation = + new Validation( + currentTable -> { + // illegal table modification inside validation predicate + currentTable.updateProperties().set(watermarkKey, Integer.toString(0)).commit(); + return true; + }, + "Predicate returned false."); + + @TestTemplate + public void testCherryPickPassesValidation() { + table.newAppend().appendFile(FILE_A).commit(); + table.newOverwrite().deleteFile(FILE_A).addFile(FILE_B).stageOnly().commit(); + long overwriteSnapshotId = + Streams.stream(table.snapshots()) + .filter(snap -> DataOperations.OVERWRITE.equals(snap.operation())) + .findFirst() + .get() + .snapshotId(); + validateTableFiles(table, FILE_A); + + new CherryPickOperation(table.name(), table.operations()) + .cherrypick(overwriteSnapshotId) + .commitIf(ImmutableList.of(alwaysPassValidation)); + + assertThat(table.currentSnapshot().snapshotId()).isEqualTo(overwriteSnapshotId); + validateTableFiles(table, FILE_B); + } + + @TestTemplate + public void testCherryPickFailsValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + table.newOverwrite().deleteFile(FILE_A).addFile(FILE_B).stageOnly().commit(); + long overwriteSnapshotId = + Streams.stream(table.snapshots()) + .filter(snap -> DataOperations.OVERWRITE.equals(snap.operation())) + .findFirst() + .get() + .snapshotId(); + validateTableFiles(table, FILE_A); + + assertThatThrownBy( + () -> + new CherryPickOperation(table.name(), table.operations()) + .cherrypick(overwriteSnapshotId) + .commitIf(ImmutableList.of(alwaysFailValidation))) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(firstSnapshotId).isEqualTo(table.currentSnapshot().snapshotId()); + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testCherryPickFailsValidationDueToConcurrentCommit() { + 
table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + table.newOverwrite().deleteFile(FILE_A).addFile(FILE_B).stageOnly().commit(); + long overwriteSnapshotId = + Streams.stream(table.snapshots()) + .filter(snap -> DataOperations.OVERWRITE.equals(snap.operation())) + .findFirst() + .get() + .snapshotId(); + validateTableFiles(table, FILE_A); + + setWatermarkProperty(table, 0); + + CherryPickOperation pendingUpdate = + new CherryPickOperation(table.name(), table.operations()).cherrypick(overwriteSnapshotId); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + assertThat(firstSnapshotId).isEqualTo(table.currentSnapshot().snapshotId()); + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testCherryPickFailsDueToIllegalTableModificationInsideValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + table.newOverwrite().deleteFile(FILE_A).addFile(FILE_B).stageOnly().commit(); + long overwriteSnapshotId = + Streams.stream(table.snapshots()) + .filter(snap -> DataOperations.OVERWRITE.equals(snap.operation())) + .findFirst() + .get() + .snapshotId(); + validateTableFiles(table, FILE_A); + + assertThatThrownBy( + () -> + new CherryPickOperation(table.name(), table.operations()) + .cherrypick(overwriteSnapshotId) + .commitIf(ImmutableList.of(illegalValidation))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(firstSnapshotId).isEqualTo(table.currentSnapshot().snapshotId()); + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testDeleteFilesPassesValidation() { + 
table.newFastAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + table.newDelete().deleteFile(FILE_A).commitIf(ImmutableList.of(alwaysPassValidation)); + + validateTableFiles(table); + } + + @TestTemplate + public void testDeleteFilesFailsValidation() { + table.newFastAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + assertThatThrownBy( + () -> + table + .newDelete() + .deleteFile(FILE_A) + .commitIf(ImmutableList.of(alwaysFailValidation))) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testDeleteFilesFailsValidationDueToConcurrentCommit() { + table.newFastAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + setWatermarkProperty(table, 0); + + PendingUpdate pendingUpdate = table.newDelete().deleteFile(FILE_A); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testDeleteFilesFailsDueToIllegalTableModificationInsideValidation() { + table.newFastAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + assertThatThrownBy( + () -> + table.newDelete().deleteFile(FILE_A).commitIf(ImmutableList.of(illegalValidation))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testExpireSnapshotsPassesValidation() { + table.newAppend().appendFile(FILE_A).commit(); + Snapshot firstSnapshot = table.currentSnapshot(); + table.newAppend().appendFile(FILE_B).commit(); + Set deletedFiles = Sets.newHashSet(); + + table + 
.expireSnapshots() + .expireSnapshotId(firstSnapshot.snapshotId()) + .deleteWith(deletedFiles::add) + .commitIf(ImmutableList.of(alwaysPassValidation)); + + assertThat(deletedFiles) + .as("Should remove the expired manifest list location") + .containsExactly(firstSnapshot.manifestListLocation()); + } + + @TestTemplate + public void testExpireSnapshotsFailsValidation() { + table.newAppend().appendFile(FILE_A).commit(); + Snapshot firstSnapshot = table.currentSnapshot(); + table.newAppend().appendFile(FILE_B).commit(); + Set deletedFiles = Sets.newHashSet(); + + assertThatThrownBy( + () -> + table + .expireSnapshots() + .expireSnapshotId(firstSnapshot.snapshotId()) + .deleteWith(deletedFiles::add) + .commitIf(ImmutableList.of(alwaysFailValidation))) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(deletedFiles).isEmpty(); + } + + @TestTemplate + public void testExpireSnapshotsFailsValidationDueToConcurrentCommit() { + table.newAppend().appendFile(FILE_A).commit(); + Snapshot firstSnapshot = table.currentSnapshot(); + table.newAppend().appendFile(FILE_B).commit(); + Set deletedFiles = Sets.newHashSet(); + + setWatermarkProperty(table, 0); + + PendingUpdate pendingUpdate = + table + .expireSnapshots() + .expireSnapshotId(firstSnapshot.snapshotId()) + .deleteWith(deletedFiles::add); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + assertThat(deletedFiles).isEmpty(); + } + + @TestTemplate + public void testExpireSnapshotsFailsDueToIllegalTableModificationInsideValidation() { + table.newAppend().appendFile(FILE_A).commit(); + Snapshot firstSnapshot = table.currentSnapshot(); + table.newAppend().appendFile(FILE_B).commit(); + Set deletedFiles = 
Sets.newHashSet(); + + assertThatThrownBy( + () -> + table + .expireSnapshots() + .expireSnapshotId(firstSnapshot.snapshotId()) + .deleteWith(deletedFiles::add) + .commitIf(ImmutableList.of(illegalValidation))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(deletedFiles).isEmpty(); + } + + @TestTemplate + public void testFastAppendPassesValidation() { + validateTableFiles(table); + + table.newFastAppend().appendFile(FILE_A).commitIf(ImmutableList.of(alwaysPassValidation)); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testFastAppendFailsValidation() { + validateTableFiles(table); + + assertThatThrownBy( + () -> + table + .newFastAppend() + .appendFile(FILE_A) + .commitIf(ImmutableList.of(alwaysFailValidation))) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + validateTableFiles(table); + } + + @TestTemplate + public void testFastAppendFailsValidationDueToConcurrentCommit() { + validateTableFiles(table); + + setWatermarkProperty(table, 0); + + PendingUpdate pendingUpdate = table.newFastAppend().appendFile(FILE_A); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + validateTableFiles(table); + } + + @TestTemplate + public void testFastAppendFailsDueToIllegalTableModificationInsideValidation() { + validateTableFiles(table); + + assertThatThrownBy( + () -> + table + .newFastAppend() + .appendFile(FILE_A) + .commitIf(ImmutableList.of(illegalValidation))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + validateTableFiles(table); + } + + @TestTemplate + public void testManageSnapshotsPassesValidation() { + 
table.newAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + String tagName = "tag1"; + assertThat(table.refs().get(tagName)).isNull(); + + table + .manageSnapshots() + .createTag(tagName, snapshotId) + .commitIf(ImmutableList.of(alwaysPassValidation)); + + assertThat(table.refs().get(tagName)) + .isNotNull() + .isEqualTo(SnapshotRef.tagBuilder(snapshotId).build()); + } + + @TestTemplate + public void testManageSnapshotsFailsValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + String tagName = "tag1"; + assertThat(table.refs().get(tagName)).isNull(); + + assertThatThrownBy( + () -> + table + .manageSnapshots() + .createTag(tagName, snapshotId) + .commitIf(ImmutableList.of(alwaysFailValidation))) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.refs().get(tagName)).isNull(); + } + + @TestTemplate + public void testManageSnapshotsFailsDueToConcurrentCommit() { + table.newAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + String tagName = "tag1"; + assertThat(table.refs().get(tagName)).isNull(); + + setWatermarkProperty(table, 0); + + ManageSnapshots pendingUpdate = table.manageSnapshots().createTag(tagName, snapshotId); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) + .isInstanceOf(CommitFailedException.class) + .hasMessage("Table metadata refresh is required"); + + assertThat(table.refs().get(tagName)).isNull(); + } + + @TestTemplate + public void testManageSnapshotsFailsDueToIllegalTableModificationInsideValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + String tagName = "tag1"; + 
assertThat(table.refs().get(tagName)).isNull(); + + assertThatThrownBy( + () -> + table + .manageSnapshots() + .createTag(tagName, snapshotId) + .commitIf(ImmutableList.of(illegalValidation))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.refs().get(tagName)).isNull(); + } + + @TestTemplate + public void testMergeAppendPassesValidation() { + validateTableFiles(table); + + table.newAppend().appendFile(FILE_A).commitIf(ImmutableList.of(alwaysPassValidation)); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testMergeAppendFailsValidation() { + validateTableFiles(table); + + assertThatThrownBy( + () -> + table + .newAppend() + .appendFile(FILE_A) + .commitIf(ImmutableList.of(alwaysFailValidation))) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + validateTableFiles(table); + } + + @TestTemplate + public void testMergeAppendFailsValidationDueToConcurrentCommit() { + validateTableFiles(table); + + setWatermarkProperty(table, 0); + + AppendFiles pendingUpdate = table.newAppend().appendFile(FILE_A); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + validateTableFiles(table); + } + + @TestTemplate + public void testMergeAppendFailsDueToIllegalTableModificationInsideValidation() { + validateTableFiles(table); + + assertThatThrownBy( + () -> + table.newAppend().appendFile(FILE_A).commitIf(ImmutableList.of(illegalValidation))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + validateTableFiles(table); + } + + @TestTemplate + public void testOverwritePassesValidation() { + table.newFastAppend().appendFile(FILE_A).commit(); 
+ validateTableFiles(table, FILE_A); + + table + .newOverwrite() + .overwriteByRowFilter(Expressions.alwaysTrue()) + .addFile(FILE_B) + .commitIf(ImmutableList.of(alwaysPassValidation)); + + validateTableFiles(table, FILE_B); + } + + @TestTemplate + public void testOverwriteFailsValidation() { + table.newFastAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + assertThatThrownBy( + () -> + table + .newOverwrite() + .overwriteByRowFilter(Expressions.alwaysTrue()) + .addFile(FILE_B) + .commitIf(ImmutableList.of(alwaysFailValidation))) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testOverwriteFailsValidationDueToConcurrentCommit() { + table.newFastAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + setWatermarkProperty(table, 0); + + OverwriteFiles pendingUpdate = + table.newOverwrite().overwriteByRowFilter(Expressions.alwaysTrue()).addFile(FILE_B); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testOverwriteFailsDueToIllegalTableModificationInsideValidation() { + table.newFastAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + assertThatThrownBy( + () -> + table + .newOverwrite() + .overwriteByRowFilter(Expressions.alwaysTrue()) + .addFile(FILE_B) + .commitIf(ImmutableList.of(illegalValidation))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testReplacePartitionsPassesValidation() { + validateTableFiles(table); + + 
table.newReplacePartitions().addFile(FILE_A).commitIf(ImmutableList.of(alwaysPassValidation)); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testReplacePartitionsFailsValidation() { + validateTableFiles(table); + + assertThatThrownBy( + () -> + table + .newReplacePartitions() + .addFile(FILE_A) + .commitIf(ImmutableList.of(alwaysFailValidation))) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + validateTableFiles(table); + } + + @TestTemplate + public void testReplacePartitionsFailsValidationDueToConcurrentCommit() { + validateTableFiles(table); + + setWatermarkProperty(table, 0); + + ReplacePartitions pendingUpdate = table.newReplacePartitions().addFile(FILE_A).addFile(FILE_B); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + validateTableFiles(table); + } + + @TestTemplate + public void testReplacePartitionsFailsDueToIllegalTableModificationInsideValidation() { + validateTableFiles(table); + + assertThatThrownBy( + () -> + table + .newReplacePartitions() + .addFile(FILE_A) + .commitIf(ImmutableList.of(illegalValidation))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + validateTableFiles(table); + } + + @TestTemplate + public void testReplaceSortOrderPassesValidation() { + assertThat(table.sortOrder()).isEqualTo(SortOrder.unsorted()); + + table.replaceSortOrder().asc("data").commitIf(ImmutableList.of(alwaysPassValidation)); + + assertThat(table.sortOrder()) + .as("Table should reflect new sort order") + .isEqualTo(SortOrder.builderFor(table.schema()).asc("data").build()); + } + + @TestTemplate + public void testReplaceSortOrderFailsValidation() { + 
assertThat(table.sortOrder()).isEqualTo(SortOrder.unsorted()); + + assertThatThrownBy( + () -> + table + .replaceSortOrder() + .asc("data") + .commitIf(ImmutableList.of(alwaysFailValidation))) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.sortOrder()).isEqualTo(SortOrder.unsorted()); + } + + @TestTemplate + public void testReplaceSortOrderFailsValidationDueToConcurrentCommit() { + assertThat(table.sortOrder()).isEqualTo(SortOrder.unsorted()); + + setWatermarkProperty(table, 0); + + ReplaceSortOrder pendingUpdate = table.replaceSortOrder().asc("data"); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + assertThat(table.sortOrder()).isEqualTo(SortOrder.unsorted()); + } + + @TestTemplate + public void testReplaceSortOrderFailsDueToIllegalTableModificationInsideValidation() { + assertThat(table.sortOrder()).isEqualTo(SortOrder.unsorted()); + + assertThatThrownBy( + () -> + table.replaceSortOrder().asc("data").commitIf(ImmutableList.of(illegalValidation))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.sortOrder()).isEqualTo(SortOrder.unsorted()); + } + + @TestTemplate + public void testRewriteFilesPassesValidation() { + table.newAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + table + .newRewrite() + .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_B)) + .commitIf(ImmutableList.of(alwaysPassValidation)); + + validateTableFiles(table, FILE_B); + } + + @TestTemplate + public void testRewriteFilesFailsValidation() { + table.newAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + assertThatThrownBy( + () -> + table 
+ .newRewrite() + .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_B)) + .commitIf(ImmutableList.of(alwaysFailValidation))) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testRewriteFilesFailsValidationDueToConcurrentCommit() { + table.newAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + setWatermarkProperty(table, 0); + + RewriteFiles pendingUpdate = + table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_B)); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testRewriteFilesFailsDueToIllegalTableModificationInsideValidation() { + table.newAppend().appendFile(FILE_A).commit(); + validateTableFiles(table, FILE_A); + + assertThatThrownBy( + () -> + table + .newRewrite() + .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_B)) + .commitIf(ImmutableList.of(illegalValidation))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testRewriteManifestsPassesValidation() { + table.newAppend().appendFile(FILE_A).commit(); + table.newAppend().appendFile(FILE_B).commit(); + assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2); + + table + .rewriteManifests() + .clusterBy(dataFile -> "") + .commitIf(ImmutableList.of(alwaysPassValidation)); + + assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(1); + } + + @TestTemplate + public void testRewriteManifestsFailsValidation() { + 
table.newAppend().appendFile(FILE_A).commit(); + table.newAppend().appendFile(FILE_B).commit(); + assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2); + + assertThatThrownBy( + () -> + table + .rewriteManifests() + .clusterBy(dataFile -> "") + .commitIf(ImmutableList.of(alwaysFailValidation))) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2); + } + + @TestTemplate + public void testRewriteManifestsFailsValidationDueToConcurrentCommit() { + table.newAppend().appendFile(FILE_A).commit(); + table.newAppend().appendFile(FILE_B).commit(); + assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2); + + setWatermarkProperty(table, 0); + + RewriteManifests pendingUpdate = table.rewriteManifests().clusterBy(dataFile -> ""); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2); + } + + @TestTemplate + public void testRewriteManifestsFailsDueToIllegalTableModificationInsideValidation() { + table.newAppend().appendFile(FILE_A).commit(); + table.newAppend().appendFile(FILE_B).commit(); + assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2); + + assertThatThrownBy( + () -> + table + .rewriteManifests() + .clusterBy(dataFile -> "") + .commitIf(ImmutableList.of(illegalValidation))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2); + } + + @TestTemplate + public void testRowDeltaPassesValidation() { + validateTableFiles(table); + + 
table.newRowDelta().addRows(FILE_A).commitIf(ImmutableList.of(alwaysPassValidation)); + + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void testRowDeltaFailsValidation() { + validateTableFiles(table); + + assertThatThrownBy( + () -> + table + .newRowDelta() + .addRows(FILE_A) + .commitIf(ImmutableList.of(alwaysFailValidation))) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + validateTableFiles(table); + } + + @TestTemplate + public void testRowDeltaFailsValidationDueToConcurrentCommit() { + validateTableFiles(table); + + setWatermarkProperty(table, 0); + + RowDelta pendingUpdate = table.newRowDelta().addRows(FILE_A); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + validateTableFiles(table); + } + + @TestTemplate + public void testRowDeltaFailsDueToIllegalTableModificationInsideValidation() { + validateTableFiles(table); + + assertThatThrownBy( + () -> table.newRowDelta().addRows(FILE_A).commitIf(ImmutableList.of(illegalValidation))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + validateTableFiles(table); + } + + @TestTemplate + public void testSetSnapshotOperationPassesValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + table.newAppend().appendFile(FILE_B).commit(); + validateTableFiles(table, FILE_A, FILE_B); + + new SetSnapshotOperation(table.operations()) + .setCurrentSnapshot(firstSnapshotId) + .commitIf(ImmutableList.of(alwaysPassValidation)); + + assertThat(table.currentSnapshot().snapshotId()).isEqualTo(firstSnapshotId); + validateTableFiles(table, FILE_A); + } + + @TestTemplate + public void 
testSetSnapshotOperationFailsValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + table.newAppend().appendFile(FILE_B).commit(); + long secondSnapshotId = table.currentSnapshot().snapshotId(); + validateTableFiles(table, FILE_A, FILE_B); + + assertThatThrownBy( + () -> + new SetSnapshotOperation(table.operations()) + .setCurrentSnapshot(firstSnapshotId) + .commitIf(ImmutableList.of(alwaysFailValidation))) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.currentSnapshot().snapshotId()).isEqualTo(secondSnapshotId); + validateTableFiles(table, FILE_A, FILE_B); + } + + @TestTemplate + public void testSetSnapshotOperationFailsValidationDueToConcurrentCommit() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + table.newAppend().appendFile(FILE_B).commit(); + long secondSnapshotId = table.currentSnapshot().snapshotId(); + validateTableFiles(table, FILE_A, FILE_B); + + setWatermarkProperty(table, 0); + + SetSnapshotOperation pendingUpdate = + new SetSnapshotOperation(table.operations()).setCurrentSnapshot(firstSnapshotId); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + assertThat(table.currentSnapshot().snapshotId()).isEqualTo(secondSnapshotId); + validateTableFiles(table, FILE_A, FILE_B); + } + + @TestTemplate + public void testSetSnapshotOperationFailsDueToIllegalTableModificationInsideValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + table.newAppend().appendFile(FILE_B).commit(); + long secondSnapshotId = 
table.currentSnapshot().snapshotId(); + validateTableFiles(table, FILE_A, FILE_B); + + assertThatThrownBy( + () -> + new SetSnapshotOperation(table.operations()) + .setCurrentSnapshot(firstSnapshotId) + .commitIf(ImmutableList.of(illegalValidation))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.currentSnapshot().snapshotId()).isEqualTo(secondSnapshotId); + validateTableFiles(table, FILE_A, FILE_B); + } + + @TestTemplate + public void testUpdateSnapshotReferencesOperationPassesValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + String branchName = "feature-develop"; + assertThat(table.ops().refresh().ref(branchName)).isNull(); + + new UpdateSnapshotReferencesOperation(table.operations()) + .createBranch(branchName, firstSnapshotId) + .commitIf(ImmutableList.of(alwaysPassValidation)); + + assertThat(table.ops().refresh().ref(branchName)) + .isEqualTo(SnapshotRef.branchBuilder(firstSnapshotId).build()); + } + + @TestTemplate + public void testUpdateSnapshotReferencesOperationFailsValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + String branchName = "feature-develop"; + assertThat(table.ops().refresh().ref(branchName)).isNull(); + + assertThatThrownBy( + () -> + new UpdateSnapshotReferencesOperation(table.operations()) + .createBranch(branchName, firstSnapshotId) + .commitIf(ImmutableList.of(alwaysFailValidation))) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.ops().refresh().ref(branchName)).isNull(); + } + + @TestTemplate + public void testUpdateSnapshotReferencesOperationFailsDueToConcurrentCommit() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + String branchName = "feature-develop"; + 
assertThat(table.ops().refresh().ref(branchName)).isNull(); + + setWatermarkProperty(table, 0); + + UpdateSnapshotReferencesOperation pendingUpdate = + new UpdateSnapshotReferencesOperation(table.operations()) + .createBranch(branchName, firstSnapshotId); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) + .isInstanceOf(CommitFailedException.class) + .hasMessage("Cannot commit changes based on stale metadata"); + + assertThat(table.ops().refresh().ref(branchName)).isNull(); + } + + @TestTemplate + public void + testUpdateSnapshotReferencesOperationFailsDueToIllegalTableModificationInsideValidation() { + table.newAppend().appendFile(FILE_A).commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + String branchName = "feature-develop"; + assertThat(table.ops().refresh().ref(branchName)).isNull(); + + assertThatThrownBy( + () -> + new UpdateSnapshotReferencesOperation(table.operations()) + .createBranch(branchName, firstSnapshotId) + .commitIf(ImmutableList.of(illegalValidation))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.ops().refresh().ref(branchName)).isNull(); + } + + private static GenericStatisticsFile genericStatisticsFile(Snapshot currentSnapshot) { + return new GenericStatisticsFile( + currentSnapshot.snapshotId(), + "/some/statistics/file.puffin", + 100, + 42, + ImmutableList.of( + new GenericBlobMetadata( + "stats-type", + currentSnapshot.snapshotId(), + currentSnapshot.sequenceNumber(), + ImmutableList.of(1, 2), + ImmutableMap.of("a-property", "some-property-value")))); + } + + @TestTemplate + public void testUpdateStatisticsPassesValidation() { + table.newFastAppend().commit(); + Snapshot currentSnapshot = table.currentSnapshot(); + assertThat(table.statisticsFiles()).isEmpty(); + + 
GenericStatisticsFile statisticsFile = genericStatisticsFile(currentSnapshot); + table + .updateStatistics() + .setStatistics(currentSnapshot.snapshotId(), statisticsFile) + .commitIf(ImmutableList.of(alwaysPassValidation)); + + assertThat(table.statisticsFiles()) + .as("Table should have statistics files") + .containsExactly(statisticsFile); + } + + @TestTemplate + public void testUpdateStatisticsFailsValidation() { + table.newFastAppend().commit(); + Snapshot currentSnapshot = table.currentSnapshot(); + assertThat(table.statisticsFiles()).isEmpty(); + + assertThatThrownBy( + () -> + table + .updateStatistics() + .setStatistics( + currentSnapshot.snapshotId(), genericStatisticsFile(currentSnapshot)) + .commitIf(ImmutableList.of(alwaysFailValidation))) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.statisticsFiles()).isEmpty(); + } + + @TestTemplate + public void testUpdateStatisticsFailsValidationDueToConcurrentCommit() { + table.newFastAppend().commit(); + Snapshot currentSnapshot = table.currentSnapshot(); + assertThat(table.statisticsFiles()).isEmpty(); + + setWatermarkProperty(table, 0); + + UpdateStatistics pendingUpdate = + table + .updateStatistics() + .setStatistics(currentSnapshot.snapshotId(), genericStatisticsFile(currentSnapshot)); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + assertThat(table.statisticsFiles()).isEmpty(); + } + + @TestTemplate + public void testUpdateStatisticsFailsDueToIllegalTableModificationInsideValidation() { + table.newFastAppend().commit(); + Snapshot currentSnapshot = table.currentSnapshot(); + assertThat(table.statisticsFiles()).isEmpty(); + + assertThatThrownBy( + () -> + table + .updateStatistics() + 
.setStatistics( + currentSnapshot.snapshotId(), genericStatisticsFile(currentSnapshot)) + .commitIf(ImmutableList.of(illegalValidation))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.statisticsFiles()).isEmpty(); + } + + @TestTemplate + public void testUpdateLocationPassesValidation(@TempDir File tempDir) { + String newLocation = tempDir.getAbsolutePath(); + assertThat(table.location()).isNotEqualTo(newLocation); + + table + .updateLocation() + .setLocation(newLocation) + .commitIf(ImmutableList.of(alwaysPassValidation)); + + assertThat(table.location()).isEqualTo(newLocation); + } + + @TestTemplate + public void testUpdateLocationFailsValidation(@TempDir File tempDir) { + String originalLocation = table.location(); + String newLocation = tempDir.getAbsolutePath(); + assertThat(originalLocation).isNotEqualTo(newLocation); + + assertThatThrownBy( + () -> + table + .updateLocation() + .setLocation(newLocation) + .commitIf(ImmutableList.of(alwaysFailValidation))) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.location()).isEqualTo(originalLocation); + } + + @TestTemplate + public void testUpdateLocationFailsValidationDueToConcurrentCommit(@TempDir File tempDir) { + String originalLocation = table.location(); + String newLocation = tempDir.getAbsolutePath(); + assertThat(originalLocation).isNotEqualTo(newLocation); + + setWatermarkProperty(table, 0); + + UpdateLocation pendingUpdate = table.updateLocation().setLocation(newLocation); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + assertThat(table.location()).isEqualTo(originalLocation); + } + + @TestTemplate + public void 
testUpdateLocationFailsDueToIllegalTableModificationInsideValidation( + @TempDir File tempDir) { + String originalLocation = table.location(); + String newLocation = tempDir.getAbsolutePath(); + assertThat(originalLocation).isNotEqualTo(newLocation); + + assertThatThrownBy( + () -> + table + .updateLocation() + .setLocation(newLocation) + .commitIf(ImmutableList.of(illegalValidation))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.location()).isEqualTo(originalLocation); + } + + @TestTemplate + public void testUpdatePropertiesPassesValidation() { + String key = "newKey"; + String value = "newValue"; + assertThat(table.properties().get(key)).isNull(); + + table.updateProperties().set(key, value).commitIf(ImmutableList.of(alwaysPassValidation)); + + assertThat(table.properties().get(key)).isEqualTo(value); + } + + @TestTemplate + public void testUpdatePropertiesFailsValidation() { + String key = "newKey"; + String value = "newValue"; + assertThat(table.properties().get(key)).isNull(); + + assertThatThrownBy( + () -> + table + .updateProperties() + .set(key, value) + .commitIf(ImmutableList.of(alwaysFailValidation))) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.properties().get(key)).isNull(); + } + + @TestTemplate + public void testUpdatePropertiesFailsValidationDueToConcurrentCommit() { + String key = "newKey"; + String value = "newValue"; + assertThat(table.properties().get(key)).isNull(); + + setWatermarkProperty(table, 0); + + UpdateProperties pendingUpdate = table.updateProperties().set(key, value); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) + .isInstanceOf(ValidationException.class) + .hasMessage(watermarkFailMessagePattern, 0); + + 
assertThat(table.properties().get(key)).isNull(); + } + + @TestTemplate + public void testUpdatePropertiesFailsDueToIllegalTableModificationInsideValidation() { + String key = "newKey"; + String value = "newValue"; + assertThat(table.properties().get(key)).isNull(); + + assertThatThrownBy( + () -> + table + .updateProperties() + .set(key, value) + .commitIf(ImmutableList.of(illegalValidation))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.properties().get(key)).isNull(); + } + + private static final Schema ORIGINAL_SCHEMA = + new Schema( + required(1, "id", Types.IntegerType.get()), required(2, "data", Types.StringType.get())); + + @TestTemplate + public void testUpdateSchemaPassesValidation() { + assertThat(table.schema().sameSchema(ORIGINAL_SCHEMA)).isTrue(); + + table + .updateSchema() + .addColumn("bool", Types.BooleanType.get()) + .commitIf(ImmutableList.of(alwaysPassValidation)); + + assertThat( + table + .schema() + .sameSchema( + new Schema( + required(1, "id", Types.IntegerType.get()), + required(2, "data", Types.StringType.get()), + optional(3, "bool", Types.BooleanType.get())))) + .as("Should include new bucket") + .isTrue(); + } + + @TestTemplate + public void testUpdateSchemaFailsValidation() { + assertThat(table.schema().sameSchema(ORIGINAL_SCHEMA)).isTrue(); + + assertThatThrownBy( + () -> + table + .updateSchema() + .addColumn("bool", Types.BooleanType.get()) + .commitIf(ImmutableList.of(alwaysFailValidation))) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.schema().sameSchema(ORIGINAL_SCHEMA)).isTrue(); + } + + @TestTemplate + public void testUpdateSchemaFailsDueToConcurrentCommit() { + assertThat(table.schema().sameSchema(ORIGINAL_SCHEMA)).isTrue(); + + setWatermarkProperty(table, 0); + + UpdateSchema pendingUpdate = table.updateSchema().addColumn("bool", Types.BooleanType.get()); + + // concurrent update to the table 
which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) + .isInstanceOf(CommitFailedException.class) + .hasMessage("Cannot commit changes based on stale metadata"); + + assertThat(table.schema().sameSchema(ORIGINAL_SCHEMA)).isTrue(); + } + + @TestTemplate + public void testUpdateSchemaFailsDueToIllegalTableModificationInsideValidation() { + assertThat(table.schema().sameSchema(ORIGINAL_SCHEMA)).isTrue(); + + assertThatThrownBy( + () -> + table + .updateSchema() + .addColumn("bool", Types.BooleanType.get()) + .commitIf(ImmutableList.of(illegalValidation))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.schema().sameSchema(ORIGINAL_SCHEMA)).isTrue(); + } + + private static final PartitionSpec ORIGINAL_SPEC = + PartitionSpec.builderFor(ORIGINAL_SCHEMA) + .bucket("data", BUCKETS_NUMBER, "data_bucket") + .withSpecId(0) + .build(); + + @TestTemplate + public void testUpdateSpecPassesValidation() { + assertThat(table.spec()).isEqualTo(ORIGINAL_SPEC); + + table + .updateSpec() + .addField("id_bucket", Expressions.bucket("id", BUCKETS_NUMBER)) + .commitIf(ImmutableList.of(alwaysPassValidation)); + + assertThat(table.spec()) + .as("Should include new bucket") + .isEqualTo( + PartitionSpec.builderFor(table.schema()) + .bucket("data", BUCKETS_NUMBER, "data_bucket") + .bucket("id", BUCKETS_NUMBER, "id_bucket") + .withSpecId(1) + .build()); + } + + @TestTemplate + public void testUpdateSpecFailsValidation() { + assertThat(table.spec()).isEqualTo(ORIGINAL_SPEC); + + assertThatThrownBy( + () -> + table + .updateSpec() + .addField("id_bucket", Expressions.bucket("id", BUCKETS_NUMBER)) + .commitIf(ImmutableList.of(alwaysFailValidation))) + .isInstanceOf(ValidationException.class) + .hasMessage(alwaysFailMessage); + + assertThat(table.spec()).isEqualTo(ORIGINAL_SPEC); + } + + 
/** Builds a partition spec update after setWatermarkProperty(table, 0), then calls setWatermarkProperty(table, 1) as the concurrent table update, and expects commitIf with watermarkValidation(0) to throw CommitFailedException while table.spec() stays equal to ORIGINAL_SPEC. setWatermarkProperty and watermarkValidation are helpers defined outside this chunk. */ @TestTemplate + public void testUpdateSpecFailsDueToConcurrentCommit() { + assertThat(table.spec()).isEqualTo(ORIGINAL_SPEC); + + setWatermarkProperty(table, 0); + + UpdatePartitionSpec pendingUpdate = + table.updateSpec().addField("id_bucket", Expressions.bucket("id", BUCKETS_NUMBER)); + + // concurrent update to the table which advances our watermark value before we're able to commit + setWatermarkProperty(table, 1); + + /* the asserted failure is a CommitFailedException with the stale metadata message, not a ValidationException */ assertThatThrownBy(() -> pendingUpdate.commitIf(ImmutableList.of(watermarkValidation(0)))) + .isInstanceOf(CommitFailedException.class) + .hasMessage("Cannot commit changes based on stale metadata"); + + assertThat(table.spec()).isEqualTo(ORIGINAL_SPEC); + } + + /** Expects commitIf with illegalValidation to throw UnsupportedOperationException and table.spec() to stay equal to ORIGINAL_SPEC. illegalValidation is defined outside this chunk; presumably it tries to modify the table it is given (confirm against its definition). */ @TestTemplate + public void testUpdateSpecFailsDueToIllegalTableModificationInsideValidation() { + assertThat(table.spec()).isEqualTo(ORIGINAL_SPEC); + + assertThatThrownBy( + () -> + table + .updateSpec() + .addField("id_bucket", Expressions.bucket("id", BUCKETS_NUMBER)) + .commitIf(ImmutableList.of(illegalValidation))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + assertThat(table.spec()).isEqualTo(ORIGINAL_SPEC); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java index 8fed7134fae1..427fd6816947 100644 --- a/core/src/test/java/org/apache/iceberg/TestTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java @@ -26,12 +26,15 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.UUID; import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; 
import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; @@ -714,4 +717,100 @@ public void testTransactionRecommit() { assertThat(paths).isEqualTo(expectedPaths); assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2); } + + /** Appends FILE_A inside a transaction through commitIf with a Validation whose predicate always returns true, commits the transaction, then calls validateTableFiles(table, FILE_A). validateTableFiles is defined outside this chunk; presumably it asserts the table holds exactly the listed files. */ @TestTemplate + public void testTransactionPassesValidation() { + validateTableFiles(table); + + Transaction transaction = table.newTransaction(); + transaction + .newAppend() + .appendFile(FILE_A) + .commitIf( + ImmutableList.of(new Validation(currentTable -> true, "Custom validation failed."))); + transaction.commitTransaction(); + + validateTableFiles(table, FILE_A); + } + + /** Expects commitIf with a Validation whose predicate always returns false to throw ValidationException carrying the message given to that Validation. The exception is asserted on the commitIf call itself; commitTransaction is never invoked in this test. */ @TestTemplate + public void testTransactionFailsValidation() { + validateTableFiles(table); + + Transaction transaction = table.newTransaction(); + assertThatThrownBy( + () -> + transaction + .newAppend() + .appendFile(FILE_A) + .commitIf( + ImmutableList.of( + new Validation(currentTable -> false, "Custom validation failed."))), + "Transaction commit should fail") + .isInstanceOf(ValidationException.class) + .hasMessage("Custom validation failed."); + + validateTableFiles(table); + } + + /** Sets custom_watermark to 1 on the table, stages an append plus a property update to 2 inside a transaction (the property update goes through commitIf with a Validation requiring the current value to equal 1), then commits the value 2 directly on the table. commitTransaction is expected to throw ValidationException with the formatted message ending in value=1, and the table property is expected to equal the value written by the direct commit. */ @TestTemplate + public void testTransactionFailsValidationDueToConcurrentCommit() { + validateTableFiles(table); + + String watermarkKey = "custom_watermark"; + String currentWatermarkValue = "1"; + String nextWatermarkValue = "2"; + + table.updateProperties().set(watermarkKey, currentWatermarkValue).commit(); + + Transaction transaction = table.newTransaction(); + transaction.newAppend().appendFile(FILE_A).commit(); + transaction + .updateProperties() + .set(watermarkKey, nextWatermarkValue) + .commitIf( + ImmutableList.of( + new Validation( + currentTable -> + Objects.equals( + currentTable.properties().get(watermarkKey), currentWatermarkValue), + "Current watermark value not equal to expected value=%s", + currentWatermarkValue))); + + // 
concurrent update to the table which advances our watermark value before we're able to commit + table.updateProperties().set(watermarkKey, nextWatermarkValue).commit(); + + assertThatThrownBy(transaction::commitTransaction, "Transaction commit should fail") + .isInstanceOf(ValidationException.class) + .hasMessage("Current watermark value not equal to expected value=1"); + + validateTableFiles(table); + assertThat(table.properties().get(watermarkKey)).isEqualTo(nextWatermarkValue); + } + + /** Passes commitIf a Validation whose predicate commits a property update on the table it receives, and expects UnsupportedOperationException with the static table message; validateTableFiles(table) is called before and after the attempt. */ @TestTemplate + public void testTransactionFailsDueToIllegalTableModificationInsideValidation() { + validateTableFiles(table); + + assertThatThrownBy( + () -> + table + .newTransaction() + .newAppend() + .appendFile(FILE_A) + .commitIf( + ImmutableList.of( + new Validation( + currentTable -> { + // illegal action + currentTable.updateProperties().set("key", "value").commit(); + return true; + }, + "Custom validation failed."))), + "Any attempts to modify a table inside a validation should throw an exception") + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot modify a static table"); + + validateTableFiles(table); + } }