Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions api/src/main/java/org/apache/iceberg/PendingUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg;

import java.util.List;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
Expand Down Expand Up @@ -54,6 +55,26 @@ public interface PendingUpdate<T> {
*/
void commit();

/**
* Apply the pending changes, validate the current version of the table, and commit.
*
* <p>Changes are committed by calling the underlying table's commit method.
*
* <p>Once the commit is successful, the updated table will be refreshed.
*
* @param validations A list of {@link Validation} which will be used to test whether it is safe
* to commit the pending changes to the current version of the table at commit time.
* @throws ValidationException If the update cannot be applied to the current table metadata.
* @throws UnsupportedOperationException If any of the supplied validations attempt to modify the
* table it is given.
* @throws CommitFailedException If the update cannot be committed due to conflicts.
* @throws CommitStateUnknownException If the update success or failure is unknown, no cleanup
* should be done in this case.
*/
default void commitIf(List<Validation> validations) {
throw new UnsupportedOperationException();
}

/**
* Generates update event to notify about metadata changes
*
Expand Down
53 changes: 53 additions & 0 deletions api/src/main/java/org/apache/iceberg/Validation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import com.google.errorprone.annotations.FormatMethod;
import java.util.function.Predicate;
import org.apache.iceberg.exceptions.ValidationException;

public class Validation {
private final Predicate<Table> predicate;
private final String message;
private final Object[] args;

/**
* @param predicate The predicate the table needs to satisfy.
* @param message The message that will be included in the {@link ValidationException} that will
* be thrown by {@link Validation#validate} if the predicate is not satisfied.
* @param args The arguments referenced by the format specifiers in the message, if any.
*/
@FormatMethod
public Validation(Predicate<Table> predicate, String message, Object... args) {
this.predicate = predicate;
this.message = message;
this.args = args;
}

/**
* Ensures that the given table is valid according to the predicate.
*
* @param table The table to test.
* @throws ValidationException If the predicate is not satisfied by the given table.
*/
@SuppressWarnings("FormatStringAnnotation")
public void validate(Table table) {
ValidationException.check(predicate.test(table), message, args);
}
}
37 changes: 37 additions & 0 deletions core/src/main/java/org/apache/iceberg/BasePendingUpdate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.util.List;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

abstract class BasePendingUpdate<T> implements PendingUpdate<T> {
private final List<Validation> pendingValidations = Lists.newArrayList();

@Override
public void commitIf(List<Validation> validations) {
this.pendingValidations.addAll(validations);
commit();
}

protected final void validate(TableMetadata base) {
Table currentTable = new BaseTable(new StaticTableOperations(base), null);
this.pendingValidations.forEach(validation -> validation.validate(currentTable));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.iceberg.expressions.Term;
import org.apache.iceberg.util.Tasks;

public class BaseReplaceSortOrder implements ReplaceSortOrder {
public class BaseReplaceSortOrder extends BasePendingUpdate<SortOrder> implements ReplaceSortOrder {
private final TableOperations ops;
private final SortOrder.Builder builder;
private TableMetadata base;
Expand Down Expand Up @@ -62,6 +62,7 @@ public void commit() {
this.base = ops.refresh();
SortOrder newOrder = apply();
TableMetadata updated = base.replaceSortOrder(newOrder);
validate(base);
taskOps.commit(base, updated);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.Pair;

class BaseUpdatePartitionSpec implements UpdatePartitionSpec {
class BaseUpdatePartitionSpec extends BasePendingUpdate<PartitionSpec>
implements UpdatePartitionSpec {
private final TableOperations ops;
private final TableMetadata base;
private final int formatVersion;
Expand Down Expand Up @@ -342,6 +343,7 @@ public void commit() {
} else {
update = base.addPartitionSpec(apply());
}
validate(base);
ops.commit(base, update);
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/PropertiesUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Tasks;

class PropertiesUpdate implements UpdateProperties {
class PropertiesUpdate extends BasePendingUpdate<Map<String, String>> implements UpdateProperties {
private final TableOperations ops;
private final Map<String, String> updates = Maps.newHashMap();
private final Set<String> removals = Sets.newHashSet();
Expand Down Expand Up @@ -110,6 +110,7 @@ public void commit() {
taskOps -> {
Map<String, String> newProperties = apply();
TableMetadata updated = base.replaceProperties(newProperties);
validate(base);
taskOps.commit(base, updated);
});
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings("UnnecessaryAnonymousClass")
class RemoveSnapshots implements ExpireSnapshots {
class RemoveSnapshots extends BasePendingUpdate<List<Snapshot>> implements ExpireSnapshots {
private static final Logger LOG = LoggerFactory.getLogger(RemoveSnapshots.class);

// Creates an executor service that runs each task in the thread that invokes execute/submit.
Expand Down Expand Up @@ -306,6 +306,7 @@ public void commit() {
.run(
item -> {
TableMetadata updated = internalApply();
validate(base);
ops.commit(base, updated);
});
LOG.info("Committed snapshot changes");
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/SchemaUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.slf4j.LoggerFactory;

/** Schema evolution API implementation. */
class SchemaUpdate implements UpdateSchema {
class SchemaUpdate extends BasePendingUpdate<Schema> implements UpdateSchema {
private static final Logger LOG = LoggerFactory.getLogger(SchemaUpdate.class);
private static final int TABLE_ROOT_ID = -1;

Expand Down Expand Up @@ -445,6 +445,7 @@ public Schema apply() {
@Override
public void commit() {
TableMetadata update = applyChangesToMetadata(base.updateSchema(apply(), lastColumnId));
validate(base);
ops.commit(base, update);
}

Expand Down
8 changes: 6 additions & 2 deletions core/src/main/java/org/apache/iceberg/SetLocation.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.util.Tasks;

public class SetLocation implements UpdateLocation {
public class SetLocation extends BasePendingUpdate<String> implements UpdateLocation {
private final TableOperations ops;
private String newLocation;

Expand Down Expand Up @@ -61,6 +61,10 @@ public void commit() {
base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
2.0 /* exponential */)
.onlyRetryOn(CommitFailedException.class)
.run(taskOps -> taskOps.commit(base, base.updateLocation(newLocation)));
.run(
taskOps -> {
validate(base);
taskOps.commit(base, base.updateLocation(newLocation));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
* <p>This update is not exposed though the Table API. Instead, it is a package-private part of the
* Transaction API intended for use in {@link ManageSnapshots}.
*/
class SetSnapshotOperation implements PendingUpdate<Snapshot> {
class SetSnapshotOperation extends BasePendingUpdate<Snapshot> implements PendingUpdate<Snapshot> {

private final TableOperations ops;
private TableMetadata base;
Expand Down Expand Up @@ -123,6 +123,8 @@ public void commit() {
.setBranchSnapshot(snapshot.snapshotId(), SnapshotRef.MAIN_BRANCH)
.build();

validate(base);

// Do commit this operation even if the metadata has not changed, as we need to
// advance the hasLastOpCommited for the transaction's commit to work properly.
// (Without any other operations in the transaction, the commitTransaction() call
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/iceberg/SetStatistics.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class SetStatistics implements UpdateStatistics {
public class SetStatistics extends BasePendingUpdate<List<StatisticsFile>>
implements UpdateStatistics {
private final TableOperations ops;
private final Map<Long, Optional<StatisticsFile>> statisticsToSet = Maps.newHashMap();

Expand Down Expand Up @@ -54,6 +55,7 @@ public List<StatisticsFile> apply() {
public void commit() {
TableMetadata base = ops.current();
TableMetadata newMetadata = internalApply(base);
validate(base);
ops.commit(base, newMetadata);
}

Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg;

import java.util.List;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class SnapshotManager implements ManageSnapshots {
Expand Down Expand Up @@ -181,4 +182,12 @@ public void commit() {
transaction.commitTransaction();
}
}

@Override
public void commitIf(List<Validation> validations) {
commitIfRefUpdatesExist();
// Add a no-op UpdateProperties to add given validations to transaction
transaction.updateProperties().commitIf(validations);
commit();
}
Comment on lines +186 to +192
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SnapshotManager is the only PendingUpdate implementation where I have to implement the commitIf method "by hand" i.e. I can't just extend BasePendingUpdate like all the other implementations. This is because of the way SnapshotManager is implemented in terms of Transaction which means I don't have access to any base TableMetadata to validate directly. Instead, I add a conditional, no-op UpdateProperties to the underlying transaction which then validates the current table state as part of the Transaction commit process.

}
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings("UnnecessaryAnonymousClass")
abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
abstract class SnapshotProducer<ThisT> extends BasePendingUpdate<Snapshot>
implements SnapshotUpdate<ThisT> {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotProducer.class);
static final int MIN_FILE_GROUP_SIZE = 10_000;
static final Set<ManifestFile> EMPTY_SET = Sets.newHashSet();
Expand Down Expand Up @@ -405,6 +406,7 @@ public void commit() {
}

TableMetadata updated = update.build();
validate(base);
if (updated.changes().isEmpty()) {
// do not commit if the metadata has not changed. for example, this may happen
// when setting the current
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ public StaticTableOperations(String metadataFileLocation, FileIO io) {
this(metadataFileLocation, io, null);
}

public StaticTableOperations(TableMetadata metadata) {
this.metadataFileLocation = metadata.metadataFileLocation();
this.staticMetadata = metadata;
this.io = null;
this.locationProvider = null;
}

public StaticTableOperations(
String metadataFileLocation, FileIO io, LocationProvider locationProvider) {
this.io = io;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
* ToDo: Add SetSnapshotOperation operations such as setCurrentSnapshot, rollBackTime, rollbackTo to
* this class so that we can support those operations for refs.
*/
class UpdateSnapshotReferencesOperation implements PendingUpdate<Map<String, SnapshotRef>> {
class UpdateSnapshotReferencesOperation extends BasePendingUpdate<Map<String, SnapshotRef>>
implements PendingUpdate<Map<String, SnapshotRef>> {

private final TableOperations ops;
private final Map<String, SnapshotRef> updatedRefs;
Expand All @@ -47,6 +48,7 @@ public Map<String, SnapshotRef> apply() {
@Override
public void commit() {
TableMetadata updated = internalApply();
validate(base);
ops.commit(base, updated);
}

Expand Down
Loading